前言
官方文档
SpringBoot 整合 Kafka
本节将介绍 SpringBoot 项目如何整合 Kafka,包括发送消息到 Kafka 和从 Kafka 消费消息。值得一提的是,本节给出的配置和代码同样适用于 SpringCloud 项目整合 Kafka,具体案例请看 这里。
代码下载
本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-19
。
版本说明
组件 | 版本 | 说明 |
---|
SpringBoot | 2.7.18 | 本节的案例代码兼容 SpringBoot 3.2.0 版本 |
引入依赖
- SpringBoot 整合 Kafka 时,最关键的是引入
spring-kafka
依赖,而 spring-kafka
又会将 kafka-clients
依赖引入进来
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.18</version> <relativePath/> </parent>
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> </dependencies>
|
配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| server: port: 9999
spring: kafka: bootstrap-servers: 127.0.0.1:9092 producer: acks: all batch-size: 16384 buffer-memory: 33554432 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer retries: 3 compression-type: gzip properties: linger.ms: 100 consumer: group-id: test auto-offset-reset: earliest enable-auto-commit: false auto-commit-interval: 1s key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer max-poll-records: 100 properties: session.timeout.ms: 120000 max.poll.interval.ms: 300000 request.timeout.ms: 60000 allow.auto.create.topics: false heartbeat.interval.ms: 40000 listener: ack-mode: manual_immediate missing-topics-fatal: true type: batch concurrency: 3 template: default-topic: "test"
|
影响生产吞吐量(速度)的核心参数
spring.kafka.producer.batch-size
:单次发送消息的批量大小(以字节为单位,默认 16K),当多个消息累积到指定大小时,生产者会批量发送以提升效率。spring.kafka.producer.buffer-memory
:生产者内存缓冲区的大小(以字节为单位,默认 32M),用于存储等待发送的消息。spring.kafka.producer.compression-type
:生产者发送的所有数据的压缩方式。默认值为 none
,也就是不压缩。支持压缩类型:none
、gzip
、snappy
、lz4
和 zstd
。spring.kafka.producer.properties.linger.ms
:如果数据量迟迟未达到 batch.size
大小,Sender 线程等待 linger.ms
之后就会发送数据。单位是 ms
,默认值为 0ms
,表示没有延迟。生产环境建议该值大小为 5 ~ 100ms 之间。
影响消费吞吐量(速度)的核心参数
spring.kafka.consumer.max-poll-records
:消费者每次调用 poll()
方法时,一次最多能拉取的消息数量,默认值为 500spring.kafka.listener.concurrency
:并发线程数,决定了创建多少个消费者实例(等于消费线程数),建议设置为小于或等于 Topic 的分区数spring.kafka.listener.type
:消费者的消费模式,包括 single
单条消费和 batch
批量消费;当设置为批量消费时,需要配合 consumer.max-poll-records
参数设置一次最多能拉取的消息数量。
工具类的代码
1 2 3 4 5 6 7 8 9 10 11 12 13
| public class KafkaConstants {
public static final String TOPIC_TEST = "test";
public static final String GROUP_ID = "test";
}
|
生产者的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| import com.clay.kafka.config.KafkaConstants; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;
@RestController @RequestMapping("/kafka") public class KafkaProducerController {
@Autowired private KafkaTemplate<String, String> kafkaTemplate;
@GetMapping("/produce") public String produce(String msg) { try { kafkaTemplate.send(KafkaConstants.TOPIC_TEST, msg); } catch (Exception e) { e.printStackTrace(); } return "success"; }
}
|
消费者的代码
特别注意
- (1) 当启用自动提交 Offest,消费者拉取消息后,需要处理业务,这期间可能会发生消息丢失。因此,建议关闭自动提交 Offset,使用手动提交 Offset 来避免消息丢失的问题。
- (2) 为了提高消费者的吞掉量(消费速率),一般情况下,建议使用批量消费,而不是使用单条消费。
自动提交 + 单条消费
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| spring: kafka: consumer: enable-auto-commit: true listener:
type: single
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| import com.clay.kafka.config.KafkaConstants; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component;
@Component public class KafkaMsgConsumer {
@KafkaListener(topics = KafkaConstants.TOPIC_TEST, groupId = KafkaConstants.GROUP_ID) public void receive(ConsumerRecord<String, String> record) { try { System.out.println("Group: " + KafkaConstants.GROUP_ID + ", Topic: " + record.topic() + ", Msg: " + record.value()); } catch (Exception e) { System.err.println("Error processing message: " + e.getMessage()); } }
}
|
自动提交 + 批量消费
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| spring: kafka: consumer: enable-auto-commit: true listener:
type: batch
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| import com.clay.kafka.config.KafkaConstants; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import java.util.List;
@Component public class KafkaMsgConsumer {
@KafkaListener(topics = KafkaConstants.TOPIC_TEST, groupId = KafkaConstants.GROUP_ID) public void receive(List<ConsumerRecord<String, String>> records) { try { for (ConsumerRecord<String, String> record : records) { System.out.println("Receive Msg ==> Group: " + KafkaConstants.GROUP_ID + ", Topic: " + record.topic() + ", Msg: " + record.value()); } } catch (Exception e) { System.err.println("Error processing message: " + e.getMessage()); } }
}
|
手动提交 + 单条消费
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| spring: kafka: consumer: enable-auto-commit: false listener: ack-mode: manual_immediate type: single
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| import com.clay.kafka.config.KafkaConstants; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component;
@Component public class KafkaMsgConsumer {
@KafkaListener(topics = KafkaConstants.TOPIC_TEST, groupId = KafkaConstants.GROUP_ID) public void receive(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) { try { System.out.println("Group: " + KafkaConstants.GROUP_ID + ", Topic: " + record.topic() + ", Msg: " + record.value()); acknowledgment.acknowledge(); } catch (Exception e) { System.err.println("Error processing message: " + e.getMessage()); } }
}
|
手动提交 + 批量消费
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| spring: kafka: consumer: enable-auto-commit: false listener: ack-mode: manual_immediate type: batch
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| import com.clay.kafka.config.KafkaConstants; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component;
import java.util.List;
@Component public class KafkaMsgConsumer {
@KafkaListener(topics = KafkaConstants.TOPIC_TEST, groupId = KafkaConstants.GROUP_ID) public void receive(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment) { try { for (ConsumerRecord<String, String> record : records) { System.out.println("Receive Msg ==> Group: " + KafkaConstants.GROUP_ID + ", Topic: " + record.topic() + ", Msg: " + record.value()); } acknowledgment.acknowledge(); } catch (Exception e) { System.err.println("Error processing message: " + e.getMessage()); } }
}
|
SpringCloud 整合 Kafka
SpringCloud 整合 Kafka 有以下两种方式:
- (1) 使用 SpringBoot 整合 Kafka
- (2) 使用 SpringCloud Stream 整合 Kafka
SpringBoot 整合方式
这种整合方式其实就是上面介绍的 SpringBoot 整合 Kafka,依赖于 SpringBoot 和 Spring Kafka 提供的功能。SpringCloud 本身并没有直接对 Kafka 提供的特殊支持,而是通过 Spring Kafka 提供对 Kafka 的集成。这种方式最重要的是引入 SpringBoot 与 spring-kafka
的 Maven 坐标。
代码下载
本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-21
。
版本说明
组件 | 版本 | 说明 |
---|
SpringBoot | 3.2.0 | |
SpringCloud | 2023.0.0 | |
引入依赖
- SpringCloud 整合 Kafka 时,最关键的是引入 SpringBoot 与
spring-kafka
依赖,而 spring-kafka
又会将 kafka-clients
依赖引入进来
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.2.0</version> <relativePath/> </parent>
<dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>2023.0.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> </dependencies>
|
配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| server: port: 9999
spring: kafka: bootstrap-servers: 127.0.0.1:9092 producer: acks: all batch-size: 16384 buffer-memory: 33554432 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer retries: 3 compression-type: gzip properties: linger.ms: 100 consumer: group-id: test auto-offset-reset: earliest enable-auto-commit: false auto-commit-interval: 1s key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer max-poll-records: 100 properties: session.timeout.ms: 120000 max.poll.interval.ms: 300000 request.timeout.ms: 60000 allow.auto.create.topics: false heartbeat.interval.ms: 40000 listener: ack-mode: manual_immediate missing-topics-fatal: true type: batch concurrency: 3 template: default-topic: "test"
|
影响生产吞吐量(速度)的核心参数
spring.kafka.producer.batch-size
:单次发送消息的批量大小(以字节为单位,默认 16K),当多个消息累积到指定大小时,生产者会批量发送以提升效率。spring.kafka.producer.buffer-memory
:生产者内存缓冲区的大小(以字节为单位,默认 32M),用于存储等待发送的消息。spring.kafka.producer.compression-type
:生产者发送的所有数据的压缩方式。默认值为 none
,也就是不压缩。支持压缩类型:none
、gzip
、snappy
、lz4
和 zstd
。spring.kafka.producer.properties.linger.ms
:如果数据量迟迟未达到 batch.size
大小,Sender 线程等待 linger.ms
之后就会发送数据。单位是 ms
,默认值为 0ms
,表示没有延迟。生产环境建议该值大小为 5 ~ 100ms 之间。
影响消费吞吐量(速度)的核心参数
spring.kafka.consumer.max-poll-records
:消费者每次调用 poll()
方法时,一次最多能拉取的消息数量,默认值为 500spring.kafka.listener.concurrency
:并发线程数,决定了创建多少个消费者实例(等于消费线程数),建议设置为小于或等于 Topic 的分区数spring.kafka.listener.type
:消费者的消费模式,包括 single
单条消费和 batch
批量消费;当设置为批量消费时,需要配合 consumer.max-poll-records
参数设置一次最多能拉取的消息数量。
工具类的代码
1 2 3 4 5 6 7 8 9 10 11 12 13
| public class KafkaConstants {
public static final String TOPIC_TEST = "test";
public static final String GROUP_ID = "test";
}
|
生产者的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| import com.clay.kafka.config.KafkaConstants; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;
@RestController @RequestMapping("/kafka") public class KafkaProducerController {
@Autowired private KafkaTemplate<String, String> kafkaTemplate;
@GetMapping("/produce") public String produce(String msg) { try { kafkaTemplate.send(KafkaConstants.TOPIC_TEST, msg); } catch (Exception e) { e.printStackTrace(); } return "success"; }
}
|
消费者的代码
特别注意
- (1) 当启用自动提交 Offest,消费者拉取消息后,需要处理业务,这期间可能会发生消息丢失。因此,建议关闭自动提交 Offset,使用手动提交 Offset 来避免消息丢失的问题。
- (2) 为了提高消费者的吞掉量(消费速率),一般情况下,建议使用批量消费,而不是使用单条消费。
自动提交 + 单条消费
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| spring: kafka: consumer: enable-auto-commit: true listener:
type: single
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| import com.clay.kafka.config.KafkaConstants; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component;
@Component public class KafkaMsgConsumer {
@KafkaListener(topics = KafkaConstants.TOPIC_TEST, groupId = KafkaConstants.GROUP_ID) public void receive(ConsumerRecord<String, String> record) { try { System.out.println("Group: " + KafkaConstants.GROUP_ID + ", Topic: " + record.topic() + ", Msg: " + record.value()); } catch (Exception e) { System.err.println("Error processing message: " + e.getMessage()); } }
}
|
自动提交 + 批量消费
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| spring: kafka: consumer: enable-auto-commit: true listener:
type: batch
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| import com.clay.kafka.config.KafkaConstants; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import java.util.List;
@Component public class KafkaMsgConsumer {
@KafkaListener(topics = KafkaConstants.TOPIC_TEST, groupId = KafkaConstants.GROUP_ID) public void receive(List<ConsumerRecord<String, String>> records) { try { for (ConsumerRecord<String, String> record : records) { System.out.println("Receive Msg ==> Group: " + KafkaConstants.GROUP_ID + ", Topic: " + record.topic() + ", Msg: " + record.value()); } } catch (Exception e) { System.err.println("Error processing message: " + e.getMessage()); } }
}
|
手动提交 + 单条消费
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| spring: kafka: consumer: enable-auto-commit: false listener: ack-mode: manual_immediate type: single
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| import com.clay.kafka.config.KafkaConstants; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component;
@Component public class KafkaMsgConsumer {
@KafkaListener(topics = KafkaConstants.TOPIC_TEST, groupId = KafkaConstants.GROUP_ID) public void receive(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) { try { System.out.println("Group: " + KafkaConstants.GROUP_ID + ", Topic: " + record.topic() + ", Msg: " + record.value()); acknowledgment.acknowledge(); } catch (Exception e) { System.err.println("Error processing message: " + e.getMessage()); } }
}
|
手动提交 + 批量消费
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| spring: kafka: consumer: enable-auto-commit: false listener: ack-mode: manual_immediate type: batch
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| import com.clay.kafka.config.KafkaConstants; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component;
import java.util.List;
@Component public class KafkaMsgConsumer {
@KafkaListener(topics = KafkaConstants.TOPIC_TEST, groupId = KafkaConstants.GROUP_ID) public void receive(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment) { try { for (ConsumerRecord<String, String> record : records) { System.out.println("Receive Msg ==> Group: " + KafkaConstants.GROUP_ID + ", Topic: " + record.topic() + ", Msg: " + record.value()); } acknowledgment.acknowledge(); } catch (Exception e) { System.err.println("Error processing message: " + e.getMessage()); } }
}
|
SpringCloud Stream 整合方式
代码下载
本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-20
。
SpringCloud Stream 的概述
SpringCloud Stream 是一个构建消息驱动微服务的框架,抽象了 MQ 的使用方式,提供统一的 API 操作。SpringCloud Stream 通过 Binder、Inputs/Outputs Channel 完成应用程序和 MQ 的解耦。SpringCloud Stream 的模型如下图:
- Binder:负责绑定应用程序和 MQ 中间件,即指定应用程序是和 KafKa 交互还是和 RabbitMQ 交互,又或者和其他的 MQ 中间件交互。
- Inputs/Outputs Channel:抽象发布 / 订阅消息的方式,即无论是什么类型的 MQ 应用程序都通过统一的方式发布 / 订阅消息。
SpringCloud Stream 的核心配置:
binder
:绑定 MQ 中间件及配置bindings
:管理所有的 Topicdestination
:指定发布 / 订阅的 TopiccontentType
:指定发布 / 订阅消息的格式group
:指定消费者组(一条消息只能被一组消息者中的一个消息者消费)
SpringCloud Stream 的整合
在 SpringCloud Stream 3.X 之后,官方不建议使用 @Binding(Source.class)
、@StreamListener(Sink.class)
这样的注解,改成推荐使用函数式编程的方式实现。高版本的 SpringCloud Stream 提供两种使用方式,一种是使用 YML 配置的方式绑定生产 / 消费者,另一种是通过 Function 的方式绑定生产 / 消费者。
版本说明
组件 | 版本 | 说明 |
---|
SpringBoot | 3.2.0 | |
SpringCloud | 2023.0.0 | |
整合案例一
案例目标
SpringCloud Stream 整合 Kafka,并使用 YML 配置的方式绑定消费者,同时通过 StreamBridge 发送消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.2.0</version> <relativePath/> </parent>
<dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>2023.0.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency> </dependencies>
|
提示
根据 SpringCloud 官方文档,引入 spring-cloud-starter-stream-kafka
或者引入 spring-cloud-stream-binder-kafka
都是可以的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| server: port: 9999
spring: application: name: kafka-test cloud: stream: kafka: binder: brokers: 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 auto-create-topics: false bindings: receiveMsg-in-0: destination: test content-type: application/json
|
配置参数说明
spring.cloud.stream.kafka.binder
下面有很多配置,比如:默认自动提交偏移量、默认的分区和副本数量等配置,对于当前案例来说,默认值即可满足。spring.cloud.stream.kafka.bindings
主要是为了配置消费者以及生产者信息的,其中 xxx-in/out-yy
这个配置主要是为了声明当前是生产者还是消费者的,而 destination
用于定义 Topic 的名称。
1 2 3 4 5 6 7 8 9 10 11 12
| @Data @NoArgsConstructor @AllArgsConstructor public class Person implements Serializable {
private Long id;
private String name;
private int age;
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
@Component public class MessageProducer {
@Autowired private StreamBridge streamBridge;
public void sendMessage(String topic, Object data) { streamBridge.send(topic, data); }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
@Component public class MessageConsumer {
@Bean public Consumer<Person> receiveMsg() { return person -> { if (person != null) { System.out.println("name: " + person.getName() + ", age: " + person.getAge()); } }; }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @RestController @RequestMapping("/kafka") public class KafkaProducerController {
private static final String TOPIC = "test";
@Autowired private MessageProducer messageProducer;
@PostMapping("/produce") public String produce(@RequestBody Person person) { try { messageProducer.sendMessage(TOPIC, person); } catch (Exception e) { e.printStackTrace(); } return "success"; }
}
|
整合案例二
案例目标
SpringCloud Stream 整合 Kafka,使用 YML 配置的方式绑定消费者,同时通过 Function 的方式绑定生产者。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.2.0</version> <relativePath/> </parent>
<dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>2023.0.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency> </dependencies>
|
提示
根据 SpringCloud 官方文档,引入 spring-cloud-starter-stream-kafka
或者引入 spring-cloud-stream-binder-kafka
都是可以的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| server: port: 9999
spring: application: name: kafka-test cloud: stream: kafka: binder: brokers: 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 auto-create-topics: false bindings: receiveMsg-in-0: binder: kafka destination: test content-type: application/json group: test sendMsg-out-0: binder: kafka destination: test content-type: application/json group: test function: definition: receiveMsg;sendMsg
|
配置参数说明
spring.cloud.stream.kafka.binder
下面有很多配置,比如:默认自动提交偏移量、默认的分区和副本数量等配置,对于当前案例来说,默认值即可满足。spring.cloud.stream.kafka.bindings
主要是为了配置消费者以及生产者信息的,其中 xxx-in/out-yy
这个配置主要是为了声明当前是生产者还是消费者的,而 destination
用于定义 Topic 的名称。
YML 配置文件是否需要指定 binder 属性
SpringCloud Stream 默认支持 Kafka 和 RabbitMQ 两种 Binder,如果在项目中只引入了 Kafka 的依赖,比如 spring-cloud-starter-stream-kafka
,那么 SpringCloud Stream 会自动将 Kafka 作为默认的 Binder,即不需要在 spring.cloud.stream.bindings
下显式配置 binder: kafka
。如果在项目中同时引入了多个 Binder(例如 Kafka 和 RabbitMQ),那么就必须明确配置每个绑定对应的 Binder,比如配置 binder: kafka
。另外,还可以通过 spring.cloud.stream.default-binder
配置项为全局指定默认的 Binder。
YML 配置文件是否需要指定 function.definition 属性
- 如果在项目中只有一个函数(如 Supplier 或 Consumer),SpringCloud Stream 会自动发现它,并将其绑定到配置的通道,在这种情况下,不需要配置
spring.cloud.function.definition
属性。 - 如果在项目中有多个函数(Supplier、Consumer 或 Function),SpringCloud Stream 无法确定要绑定哪一个函数,在这种情况下,必须通过
spring.cloud.function.definition
属性指定要绑定的函数。
1 2 3 4 5 6 7 8 9 10 11 12
| @Data @NoArgsConstructor @AllArgsConstructor public class Person implements Serializable {
private Long id;
private String name;
private int age;
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| @Component public class MessageProducer {
private final BlockingQueue<Person> messageQueue = new LinkedBlockingQueue<>(1000);
@Bean public Supplier<Message<Person>> sendMsg() { return () -> { Person person = messageQueue.poll(); if (person != null) { return MessageBuilder.withPayload(person).build(); } return null; }; }
public void sendPersonMessage(Person person) { try { messageQueue.put(person); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("Failed to enqueue message", e); } }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Component public class MessageConsumer {
@Bean public Consumer<Message<String>> receiveMsg() { return message -> { System.out.println("Receive Msg: " + message.getPayload()); }; }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| @RestController @RequestMapping("/kafka") public class KafkaProducerController {
@Autowired private StreamBridge streamBridge;
@Autowired private MessageProducer messageProducer;
@PostMapping("/produce") public String produce(@RequestBody Person person) { try { streamBridge.send("sendMsg-out-0", person); } catch (Exception e) { e.printStackTrace(); } return "success"; }
@PostMapping("/sendMsg") public String send(@RequestBody Person person) { try { messageProducer.sendPersonMessage(person); } catch (Exception e) { e.printStackTrace(); } return "success"; }
}
|
两种消息发送方式的对比
方式 | 优点 | 缺点 |
---|
StreamBridge | 动态灵活,可发送到任意绑定目标,无需额外接口或类。 | 与函数式模型的代码解耦,可能增加绑定管理的复杂性。 |
函数式编程 | 符合现代函数式风格,自动化程度高,代码简洁。 | 一旦绑定,动态调整困难,适合静态绑定的场景。 |
提示
- 在 SpringCloud Stream 中,使用函数式编程(定义 Supplier)发送消息时,SpringCloud 的消息发送机制并不是基于任务调度的定时发送,而是通过 Reactive Streams 的发布 / 订阅机制驱动的。
- 当定义一个 Supplier 作为消息源时,SpringCloud Stream 会自动将这个 Supplier 包装成一个消息生成器,并且默认以无限流的形式调用它。
- SpringCloud Stream 会将 Supplier 的返回值转换为一个无限流,也就是包装为一个 Reactor 的
Flux
,并持续调用 messageSupplier()
,然后将这个流中的每个元素作为消息发送到 Kafka 的绑定目标。 - SpringCloud Stream 默认会通过内部的 Reactive Streams 驱动,每次从 Supplier 中获取一条消息并立即发送。默认情况下,这个过程是连续触发的,不带任何延迟,适合数据流量持续、需要高频发送的场景。
参考资料