SpringBoot 与 SpringCloud 整合 Kafka

前言

官方文档

SpringBoot 整合 Kafka

本节将介绍 SpringBoot 项目如何整合 Kafka,包括发送消息到 Kafka 和从 Kafka 消费消息。值得一提的是,本节给出的配置和代码同样适用于 SpringCloud 项目整合 Kafka,具体案例请看 这里

代码下载

本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-19

版本说明

组件版本说明
SpringBoot2.7.18本节的案例代码兼容 SpringBoot 3.2.0 版本

引入依赖

  • SpringBoot 整合 Kafka 时,最关键的是引入 spring-kafka 依赖,而 spring-kafka 又会将 kafka-clients 依赖引入进来
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.18</version>
<relativePath/>
</parent>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
server:
port: 9999

spring:
kafka:
# Kafka 集群的地址
bootstrap-servers: 127.0.0.1:9092
producer:
# 设置生产者需要等待多少个分区副本收到消息的确认,可选值: 0 | 1 | all,其中 all 表示所有分区副本都需要确认,确保消息不丢失
acks: all
# 单次发送消息的批量大小(以字节为单位,默认16K),当多个消息累积到指定大小时,生产者会批量发送以提升效率
batch-size: 16384
# 生产者内存缓冲区的大小(以字节为单位,默认32M),用于存储等待发送的消息
buffer-memory: 33554432
# Key 的序列化器
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# Value 的序列化器
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 设置生产者在遇到发送失败时的重试次数0 表示不进行重试
retries: 3
# 压缩类型,支持的压缩类型:none、gzip、snappy、lz4、zstd
compression-type: gzip
# 生产者的其他核心配置
properties:
# 如果数据量迟迟未达到 batch.size 大小,Sender 线程等待 linger.ms 之后就会发送数据。单位是 ms,默认值为 0ms,表示没有延迟。生产环境建议该值大小为 5 ~ 100ms 之间
linger.ms: 100
consumer:
# 消费者组 ID
group-id: test
# 从哪个偏移量 offset 开始消费,可选值:earliest | latest
auto-offset-reset: earliest
# 是否自动提交偏移量 offset
enable-auto-commit: false
# 自动提交的频率,生效的前提是 enable-auto-commit=true
auto-commit-interval: 1s
# Key 的反序列化器
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# Value 的反序列化器
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 消费者每次调用 poll() 方法时,一次最多能拉取的消息数量,默认值为 500
max-poll-records: 100
# 消费者的其他核心配置
properties:
# 如果在这个时间内(默认45秒),协调器没有收到心跳,该消费者会被踢出消费者组并触发分区再平衡
session.timeout.ms: 120000
# 最大消费时间,该参数决定了获取消息后提交偏移量的最长时间,超过设定的时间(默认5分钟),服务端会认为该消费者失效,然后将其踢出消费者组并触发分区再平衡
max.poll.interval.ms: 300000
# 客户端等待请求响应的最长时间如果在超时之前没有收到响应,客户端将在必要时重新发送请求,或者如果重试次数用尽,则请求失败
request.timeout.ms: 60000
# 订阅或分配主题时,是否允许自动创建主题(生产环境建议设置为 false)
allow.auto.create.topics: false
# poll() 方法向协调器发送心跳的频率(默认每隔3秒发送一次),建议设置为 session.timeout.ms 的三分之一
heartbeat.interval.ms: 40000
# 指定每个分区里返回的记录最多不超的字节数
# max.partition.fetch.bytes=1048576
listener:
# 消费者的消息确认模式
# 当 enable.auto.commit 设置为 false 时,此配置才会生效
# 当设置为 manual_immediate,则表示手动确认消息,即调用 Acknowledgment.acknowledge() 后立即提交偏移量
ack-mode: manual_immediate
# 是否严格检查 topic 的存在性
# true: 如果配置的 topic 不存在,则启动失败
# false: 忽略不存在的 topic,继续启动
missing-topics-fatal: true
# 消费者的消费模式
# single: 单条消费,每次拉取一条消息
# batch: 批量消费,每次拉取多条消息,需要配合 consumer.max-poll-records 参数设置拉取的消息数量
type: batch
# 并发线程数,决定了创建多少个消费者实例(等于消费线程数)
# 建议设置为小于或等于 Topic 的分区数
# 每个线程可消费一个分区,线程数多于分区时,多余的线程将处于空闲状态
concurrency: 3
template:
default-topic: "test"
  • 影响生产吞吐量(速度)的核心参数

    • spring.kafka.producer.batch-size:单次发送消息的批量大小(以字节为单位,默认 16K),当多个消息累积到指定大小时,生产者会批量发送以提升效率。
    • spring.kafka.producer.buffer-memory:生产者内存缓冲区的大小(以字节为单位,默认 32M),用于存储等待发送的消息。
    • spring.kafka.producer.compression-type:生产者发送的所有数据的压缩方式。默认值为 none,也就是不压缩。支持压缩类型:nonegzipsnappylz4zstd
    • spring.kafka.producer.properties.linger.ms:如果数据量迟迟未达到 batch.size 大小,Sender 线程等待 linger.ms 之后就会发送数据。单位是 ms,默认值为 0ms,表示没有延迟。生产环境建议该值大小为 5 ~ 100ms 之间。
  • 影响消费吞吐量(速度)的核心参数

    • spring.kafka.consumer.max-poll-records:消费者每次调用 poll() 方法时,一次最多能拉取的消息数量,默认值为 500
    • spring.kafka.listener.concurrency:并发线程数,决定了创建多少个消费者实例(等于消费线程数),建议设置为小于或等于 Topic 的分区数
    • spring.kafka.listener.type:消费者的消费模式,包括 single 单条消费和 batch 批量消费;当设置为批量消费时,需要配合 consumer.max-poll-records 参数设置一次最多能拉取的消息数量。

工具类的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
public class KafkaConstants {

/**
* 主题
*/
public static final String TOPIC_TEST = "test";

/*
* 消费者组 ID
*/
public static final String GROUP_ID = "test";

}

生产者的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import com.clay.kafka.config.KafkaConstants;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/kafka")
public class KafkaProducerController {

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

/**
* 发送消息
*/
@GetMapping("/produce")
public String produce(String msg) {
try {
kafkaTemplate.send(KafkaConstants.TOPIC_TEST, msg);
} catch (Exception e) {
e.printStackTrace();
}
return "success";
}

}

消费者的代码

特别注意

  • (1) 当启用自动提交 Offest,消费者拉取消息后,需要处理业务,这期间可能会发生消息丢失。因此,建议关闭自动提交 Offset,使用手动提交 Offset 来避免消息丢失的问题。
  • (2) 为了提高消费者的吞掉量(消费速率),一般情况下,建议使用批量消费,而不是使用单条消费。

自动提交 + 单条消费

  • 核心配置(自动提交 Offset + 单条消费)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
spring:
kafka:
consumer:
# 是否自动提交偏移量 offset
enable-auto-commit: true
listener:
# 消费者的消息确认模式
# 当 enable.auto.commit 设置为 false 时,此配置才会生效
# 当设置为 manual_immediate,则表示手动确认消息,即调用 Acknowledgment.acknowledge() 后立即提交偏移量
# ack-mode: manual_immediate

# 消费者的消费模式
# single: 单条消费,每次拉取一条消息
# batch: 批量消费,每次拉取多条消息,需要配合 consumer.max-poll-records 参数设置拉取的消息数量
type: single
  • 核心代码(自动提交 Offset + 单条消费)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import com.clay.kafka.config.KafkaConstants;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaMsgConsumer {

/**
* 消费消息(自动提交 Offset + 单条消费)
*/
@KafkaListener(topics = KafkaConstants.TOPIC_TEST, groupId = KafkaConstants.GROUP_ID)
public void receive(ConsumerRecord<String, String> record) {
try {
// 处理消息
System.out.println("Group: " + KafkaConstants.GROUP_ID + ", Topic: " + record.topic() + ", Msg: " + record.value());
} catch (Exception e) {
// 消息处理失败
System.err.println("Error processing message: " + e.getMessage());
}
}

}

自动提交 + 批量消费

  • 核心配置(自动提交 Offset + 批量消费)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
spring:
kafka:
consumer:
# 是否自动提交偏移量 offset
enable-auto-commit: true
listener:
# 消费者的消息确认模式
# 当 enable.auto.commit 设置为 false 时,此配置才会生效
# 当设置为 manual_immediate,则表示手动确认消息,即调用 Acknowledgment.acknowledge() 后立即提交偏移量
# ack-mode: manual_immediate

# 消费者的消费模式
# single: 单条消费,每次拉取一条消息
# batch: 批量消费,每次拉取多条消息,需要配合 consumer.max-poll-records 参数设置拉取的消息数量
type: batch
  • 核心配置(自动提交 Offset + 批量消费)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import com.clay.kafka.config.KafkaConstants;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.List;

@Component
public class KafkaMsgConsumer {

/**
* 消费消息(自动提交 Offset + 批量消费)
*/
@KafkaListener(topics = KafkaConstants.TOPIC_TEST, groupId = KafkaConstants.GROUP_ID)
public void receive(List<ConsumerRecord<String, String>> records) {
try {
for (ConsumerRecord<String, String> record : records) {
// 处理消息
System.out.println("Receive Msg ==> Group: " + KafkaConstants.GROUP_ID + ", Topic: " + record.topic() + ", Msg: " + record.value());
}
} catch (Exception e) {
// 消息处理失败
System.err.println("Error processing message: " + e.getMessage());
}
}

}

手动提交 + 单条消费

  • 核心配置(手动提交 Offset + 单条消费)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
spring:
kafka:
consumer:
# 是否自动提交偏移量 offset
enable-auto-commit: false
listener:
# 消费者的消息确认模式
# 当 enable.auto.commit 设置为 false 时,此配置才会生效
# 当设置为 manual_immediate,则表示手动确认消息,即调用 Acknowledgment.acknowledge() 后立即提交偏移量
ack-mode: manual_immediate
# 消费者的消费模式
# single: 单条消费,每次拉取一条消息
# batch: 批量消费,每次拉取多条消息,需要配合 consumer.max-poll-records 参数设置拉取的消息数量
type: single
  • 核心代码(手动提交 Offset + 单条消费)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import com.clay.kafka.config.KafkaConstants;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class KafkaMsgConsumer {

/**
* 消费消息(手动提交 Offset + 单条消费)
*/
@KafkaListener(topics = KafkaConstants.TOPIC_TEST, groupId = KafkaConstants.GROUP_ID)
public void receive(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
try {
// 处理消息
System.out.println("Group: " + KafkaConstants.GROUP_ID + ", Topic: " + record.topic() + ", Msg: " + record.value());
// 手动提交偏移量
acknowledgment.acknowledge();
} catch (Exception e) {
// 消息处理失败,选择不提交偏移量,保证消息再次消费
System.err.println("Error processing message: " + e.getMessage());
}
}

}

手动提交 + 批量消费

  • 核心配置(手动提交 Offset + 批量消费)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
spring:
kafka:
consumer:
# 是否自动提交偏移量 offset
enable-auto-commit: false
listener:
# 消费者的消息确认模式
# 当 enable.auto.commit 设置为 false 时,此配置才会生效
# 当设置为 manual_immediate,则表示手动确认消息,即调用 Acknowledgment.acknowledge() 后立即提交偏移量
ack-mode: manual_immediate
# 消费者的消费模式
# single: 单条消费,每次拉取一条消息
# batch: 批量消费,每次拉取多条消息,需要配合 consumer.max-poll-records 参数设置拉取的消息数量
type: batch
  • 核心代码(手动提交 Offset + 批量消费)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import com.clay.kafka.config.KafkaConstants;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class KafkaMsgConsumer {

/**
* 消费消息(手动提交 Offset + 批量消费)
*/
@KafkaListener(topics = KafkaConstants.TOPIC_TEST, groupId = KafkaConstants.GROUP_ID)
public void receive(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment) {
try {
for (ConsumerRecord<String, String> record : records) {
// 处理消息
System.out.println("Receive Msg ==> Group: " + KafkaConstants.GROUP_ID + ", Topic: " + record.topic() + ", Msg: " + record.value());
}
// 手动提交偏移量
acknowledgment.acknowledge();
} catch (Exception e) {
// 消息处理失败,选择不提交偏移量,保证消息再次消费
System.err.println("Error processing message: " + e.getMessage());
}
}

}

SpringCloud 整合 Kafka

SpringCloud 整合 Kafka 有以下两种方式:

  • (1) 使用 SpringBoot 整合 Kafka
  • (2) 使用 SpringCloud Stream 整合 Kafka

SpringBoot 整合方式

这种整合方式其实就是上面介绍的 SpringBoot 整合 Kafka,依赖于 SpringBoot 和 Spring Kafka 提供的功能。SpringCloud 本身并没有直接对 Kafka 提供的特殊支持,而是通过 Spring Kafka 提供对 Kafka 的集成。这种方式最重要的是引入 SpringBoot 与 spring-kafka 的 Maven 坐标。

代码下载

本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-21

版本说明

组件版本说明
SpringBoot3.2.0
SpringCloud2023.0.0

引入依赖

  • SpringCloud 整合 Kafka 时,最关键的是引入 SpringBoot 与 spring-kafka 依赖,而 spring-kafka 又会将 kafka-clients 依赖引入进来
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.0</version>
<relativePath/>
</parent>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>2023.0.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
server:
port: 9999

spring:
kafka:
# Kafka 集群的地址
bootstrap-servers: 127.0.0.1:9092
producer:
# 设置生产者需要等待多少个分区副本收到消息的确认,可选值: 0 | 1 | all,其中 all 表示所有分区副本都需要确认,确保消息不丢失
acks: all
# 单次发送消息的批量大小(以字节为单位,默认16K),当多个消息累积到指定大小时,生产者会批量发送以提升效率
batch-size: 16384
# 生产者内存缓冲区的大小(以字节为单位,默认32M),用于存储等待发送的消息
buffer-memory: 33554432
# Key 的序列化器
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# Value 的序列化器
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 设置生产者在遇到发送失败时的重试次数0 表示不进行重试
retries: 3
# 压缩类型,支持的压缩类型:none、gzip、snappy、lz4、zstd
compression-type: gzip
# 生产者的其他核心配置
properties:
# 如果数据量迟迟未达到 batch.size 大小,Sender 线程等待 linger.ms 之后就会发送数据。单位是 ms,默认值为 0ms,表示没有延迟。生产环境建议该值大小为 5 ~ 100ms 之间
linger.ms: 100
consumer:
# 消费者组 ID
group-id: test
# 从哪个偏移量 offset 开始消费,可选值:earliest | latest
auto-offset-reset: earliest
# 是否自动提交偏移量 offset
enable-auto-commit: false
# 自动提交的频率,生效的前提是 enable-auto-commit=true
auto-commit-interval: 1s
# Key 的反序列化器
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# Value 的反序列化器
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 消费者每次调用 poll() 方法时,一次最多能拉取的消息数量,默认值为 500
max-poll-records: 100
# 消费者的其他核心配置
properties:
# 如果在这个时间内(默认45秒),协调器没有收到心跳,该消费者会被踢出消费者组并触发分区再平衡
session.timeout.ms: 120000
# 最大消费时间,该参数决定了获取消息后提交偏移量的最长时间,超过设定的时间(默认5分钟),服务端会认为该消费者失效,然后将其踢出消费者组并触发分区再平衡
max.poll.interval.ms: 300000
# 客户端等待请求响应的最长时间如果在超时之前没有收到响应,客户端将在必要时重新发送请求,或者如果重试次数用尽,则请求失败
request.timeout.ms: 60000
# 订阅或分配主题时,是否允许自动创建主题(生产环境建议设置为 false)
allow.auto.create.topics: false
# poll() 方法向协调器发送心跳的频率(默认每隔3秒发送一次),建议设置为 session.timeout.ms 的三分之一
heartbeat.interval.ms: 40000
# 指定每个分区里返回的记录最多不超的字节数
# max.partition.fetch.bytes=1048576
listener:
# 消费者的消息确认模式
# 当 enable.auto.commit 设置为 false 时,此配置才会生效
# 当设置为 manual_immediate,则表示手动确认消息,即调用 Acknowledgment.acknowledge() 后立即提交偏移量
ack-mode: manual_immediate
# 是否严格检查 topic 的存在性
# true: 如果配置的 topic 不存在,则启动失败
# false: 忽略不存在的 topic,继续启动
missing-topics-fatal: true
# 消费者的消费模式
# single: 单条消费,每次拉取一条消息
# batch: 批量消费,每次拉取多条消息,需要配合 consumer.max-poll-records 参数设置拉取的消息数量
type: batch
# 并发线程数,决定了创建多少个消费者实例(等于消费线程数)
# 建议设置为小于或等于 Topic 的分区数
# 每个线程可消费一个分区,线程数多于分区时,多余的线程将处于空闲状态
concurrency: 3
template:
default-topic: "test"
  • 影响生产吞吐量(速度)的核心参数

    • spring.kafka.producer.batch-size:单次发送消息的批量大小(以字节为单位,默认 16K),当多个消息累积到指定大小时,生产者会批量发送以提升效率。
    • spring.kafka.producer.buffer-memory:生产者内存缓冲区的大小(以字节为单位,默认 32M),用于存储等待发送的消息。
    • spring.kafka.producer.compression-type:生产者发送的所有数据的压缩方式。默认值为 none,也就是不压缩。支持压缩类型:nonegzipsnappylz4zstd
    • spring.kafka.producer.properties.linger.ms:如果数据量迟迟未达到 batch.size 大小,Sender 线程等待 linger.ms 之后就会发送数据。单位是 ms,默认值为 0ms,表示没有延迟。生产环境建议该值大小为 5 ~ 100ms 之间。
  • 影响消费吞吐量(速度)的核心参数

    • spring.kafka.consumer.max-poll-records:消费者每次调用 poll() 方法时,一次最多能拉取的消息数量,默认值为 500
    • spring.kafka.listener.concurrency:并发线程数,决定了创建多少个消费者实例(等于消费线程数),建议设置为小于或等于 Topic 的分区数
    • spring.kafka.listener.type:消费者的消费模式,包括 single 单条消费和 batch 批量消费;当设置为批量消费时,需要配合 consumer.max-poll-records 参数设置一次最多能拉取的消息数量。

工具类的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
public class KafkaConstants {

/**
* 主题
*/
public static final String TOPIC_TEST = "test";

/*
* 消费者组 ID
*/
public static final String GROUP_ID = "test";

}

生产者的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import com.clay.kafka.config.KafkaConstants;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/kafka")
public class KafkaProducerController {

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

/**
* 发送消息
*/
@GetMapping("/produce")
public String produce(String msg) {
try {
kafkaTemplate.send(KafkaConstants.TOPIC_TEST, msg);
} catch (Exception e) {
e.printStackTrace();
}
return "success";
}

}

消费者的代码

特别注意

  • (1) 当启用自动提交 Offest,消费者拉取消息后,需要处理业务,这期间可能会发生消息丢失。因此,建议关闭自动提交 Offset,使用手动提交 Offset 来避免消息丢失的问题。
  • (2) 为了提高消费者的吞掉量(消费速率),一般情况下,建议使用批量消费,而不是使用单条消费。
自动提交 + 单条消费
  • 核心配置(自动提交 Offset + 单条消费)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
spring:
kafka:
consumer:
# 是否自动提交偏移量 offset
enable-auto-commit: true
listener:
# 消费者的消息确认模式
# 当 enable.auto.commit 设置为 false 时,此配置才会生效
# 当设置为 manual_immediate,则表示手动确认消息,即调用 Acknowledgment.acknowledge() 后立即提交偏移量
# ack-mode: manual_immediate

# 消费者的消费模式
# single: 单条消费,每次拉取一条消息
# batch: 批量消费,每次拉取多条消息,需要配合 consumer.max-poll-records 参数设置拉取的消息数量
type: single
  • 核心代码(自动提交 Offset + 单条消费)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import com.clay.kafka.config.KafkaConstants;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaMsgConsumer {

/**
* 消费消息(自动提交 Offset + 单条消费)
*/
@KafkaListener(topics = KafkaConstants.TOPIC_TEST, groupId = KafkaConstants.GROUP_ID)
public void receive(ConsumerRecord<String, String> record) {
try {
// 处理消息
System.out.println("Group: " + KafkaConstants.GROUP_ID + ", Topic: " + record.topic() + ", Msg: " + record.value());
} catch (Exception e) {
// 消息处理失败
System.err.println("Error processing message: " + e.getMessage());
}
}

}
自动提交 + 批量消费
  • 核心配置(自动提交 Offset + 批量消费)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
spring:
kafka:
consumer:
# 是否自动提交偏移量 offset
enable-auto-commit: true
listener:
# 消费者的消息确认模式
# 当 enable.auto.commit 设置为 false 时,此配置才会生效
# 当设置为 manual_immediate,则表示手动确认消息,即调用 Acknowledgment.acknowledge() 后立即提交偏移量
# ack-mode: manual_immediate

# 消费者的消费模式
# single: 单条消费,每次拉取一条消息
# batch: 批量消费,每次拉取多条消息,需要配合 consumer.max-poll-records 参数设置拉取的消息数量
type: batch
  • 核心配置(自动提交 Offset + 批量消费)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import com.clay.kafka.config.KafkaConstants;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.List;

@Component
public class KafkaMsgConsumer {

/**
* 消费消息(自动提交 Offset + 批量消费)
*/
@KafkaListener(topics = KafkaConstants.TOPIC_TEST, groupId = KafkaConstants.GROUP_ID)
public void receive(List<ConsumerRecord<String, String>> records) {
try {
for (ConsumerRecord<String, String> record : records) {
// 处理消息
System.out.println("Receive Msg ==> Group: " + KafkaConstants.GROUP_ID + ", Topic: " + record.topic() + ", Msg: " + record.value());
}
} catch (Exception e) {
// 消息处理失败
System.err.println("Error processing message: " + e.getMessage());
}
}

}
手动提交 + 单条消费
  • 核心配置(手动提交 Offset + 单条消费)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
spring:
kafka:
consumer:
# 是否自动提交偏移量 offset
enable-auto-commit: false
listener:
# 消费者的消息确认模式
# 当 enable.auto.commit 设置为 false 时,此配置才会生效
# 当设置为 manual_immediate,则表示手动确认消息,即调用 Acknowledgment.acknowledge() 后立即提交偏移量
ack-mode: manual_immediate
# 消费者的消费模式
# single: 单条消费,每次拉取一条消息
# batch: 批量消费,每次拉取多条消息,需要配合 consumer.max-poll-records 参数设置拉取的消息数量
type: single
  • 核心代码(手动提交 Offset + 单条消费)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import com.clay.kafka.config.KafkaConstants;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class KafkaMsgConsumer {

/**
* 消费消息(手动提交 Offset + 单条消费)
*/
@KafkaListener(topics = KafkaConstants.TOPIC_TEST, groupId = KafkaConstants.GROUP_ID)
public void receive(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
try {
// 处理消息
System.out.println("Group: " + KafkaConstants.GROUP_ID + ", Topic: " + record.topic() + ", Msg: " + record.value());
// 手动提交偏移量
acknowledgment.acknowledge();
} catch (Exception e) {
// 消息处理失败,选择不提交偏移量,保证消息再次消费
System.err.println("Error processing message: " + e.getMessage());
}
}

}
手动提交 + 批量消费
  • 核心配置(手动提交 Offset + 批量消费)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
spring:
kafka:
consumer:
# 是否自动提交偏移量 offset
enable-auto-commit: false
listener:
# 消费者的消息确认模式
# 当 enable.auto.commit 设置为 false 时,此配置才会生效
# 当设置为 manual_immediate,则表示手动确认消息,即调用 Acknowledgment.acknowledge() 后立即提交偏移量
ack-mode: manual_immediate
# 消费者的消费模式
# single: 单条消费,每次拉取一条消息
# batch: 批量消费,每次拉取多条消息,需要配合 consumer.max-poll-records 参数设置拉取的消息数量
type: batch
  • 核心代码(手动提交 Offset + 批量消费)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import com.clay.kafka.config.KafkaConstants;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class KafkaMsgConsumer {

/**
* 消费消息(手动提交 Offset + 批量消费)
*/
@KafkaListener(topics = KafkaConstants.TOPIC_TEST, groupId = KafkaConstants.GROUP_ID)
public void receive(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment) {
try {
for (ConsumerRecord<String, String> record : records) {
// 处理消息
System.out.println("Receive Msg ==> Group: " + KafkaConstants.GROUP_ID + ", Topic: " + record.topic() + ", Msg: " + record.value());
}
// 手动提交偏移量
acknowledgment.acknowledge();
} catch (Exception e) {
// 消息处理失败,选择不提交偏移量,保证消息再次消费
System.err.println("Error processing message: " + e.getMessage());
}
}

}

SpringCloud Stream 整合方式

代码下载

本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-20

SpringCloud Stream 的概述

SpringCloud Stream 是一个构建消息驱动微服务的框架,抽象了 MQ 的使用方式,提供统一的 API 操作。SpringCloud Stream 通过 Binder、Inputs/Outputs Channel 完成应用程序和 MQ 的解耦。SpringCloud Stream 的模型如下图:

  • Binder:负责绑定应用程序和 MQ 中间件,即指定应用程序是和 KafKa 交互还是和 RabbitMQ 交互,又或者和其他的 MQ 中间件交互。
  • Inputs/Outputs Channel:抽象发布 / 订阅消息的方式,即无论是什么类型的 MQ 应用程序都通过统一的方式发布 / 订阅消息。

SpringCloud Stream 的核心配置:

  • binder:绑定 MQ 中间件及配置
  • bindings:管理所有的 Topic
  • destination:指定发布 / 订阅的 Topic
  • contentType:指定发布 / 订阅消息的格式
  • group:指定消费者组(一条消息只能被一组消息者中的一个消息者消费)

SpringCloud Stream 的整合

在 SpringCloud Stream 3.X 之后,官方不建议使用 @Binding(Source.class)@StreamListener(Sink.class) 这样的注解,改成推荐使用函数式编程的方式实现。高版本的 SpringCloud Stream 提供两种使用方式,一种是使用 YML 配置的方式绑定生产 / 消费者,另一种是通过 Function 的方式绑定生产 / 消费者。

版本说明
组件版本说明
SpringBoot3.2.0
SpringCloud2023.0.0
整合案例一

案例目标

SpringCloud Stream 整合 Kafka,并使用 YML 配置的方式绑定消费者,同时通过 StreamBridge 发送消息。

  • 引入依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.0</version>
<relativePath/>
</parent>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>2023.0.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
</dependencies>

提示

根据 SpringCloud 官方文档,引入 spring-cloud-starter-stream-kafka 或者引入 spring-cloud-stream-binder-kafka 都是可以的。

  • 配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
server:
port: 9999

spring:
application:
name: kafka-test
cloud:
stream:
kafka:
binder:
# Kafka 集群的地址
brokers: 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
# 是否允许自动创建主题
auto-create-topics: false
bindings:
# 对应 MessageConsumer 中的 receiveMsg 函数,格式为:xxx-in/out-yy,其中 xxx 是函数名称,in/out 是接收消息/发送消息,yy 是消费者的索引
receiveMsg-in-0:
# 指定 Topic
destination: test
# 指定消息的格式
content-type: application/json

配置参数说明

  • spring.cloud.stream.kafka.binder 下面有很多配置,比如:默认自动提交偏移量、默认的分区和副本数量等配置,对于当前案例来说,默认值即可满足。
  • spring.cloud.stream.kafka.bindings 主要是为了配置消费者以及生产者信息的,其中 xxx-in/out-yy 这个配置主要是为了声明当前是生产者还是消费者的,而 destination 用于定义 Topic 的名称。
  • 实体类
1
2
3
4
5
6
7
8
9
10
11
12
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Person implements Serializable {

private Long id;

private String name;

private int age;

}
  • Kafka 生产者类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 消息生产者
*/
@Component
public class MessageProducer {

@Autowired
private StreamBridge streamBridge;

/**
* 发送消息
*
* @param topic 主题
* @param data 消息
*/
public void sendMessage(String topic, Object data) {
streamBridge.send(topic, data);
}

}
  • Kafka 消费者类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 消息消费者
*/
@Component
public class MessageConsumer {

/**
* 消费消息
* <p> 对应 YML 配置文件中的 receiveMsg-in-0
*/
@Bean
public Consumer<Person> receiveMsg() {
return person -> {
if (person != null) {
System.out.println("name: " + person.getName() + ", age: " + person.getAge());
}
};
}

}
  • 测试代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@RestController
@RequestMapping("/kafka")
public class KafkaProducerController {

private static final String TOPIC = "test";

@Autowired
private MessageProducer messageProducer;

/**
* 发送消息
*/
@PostMapping("/produce")
public String produce(@RequestBody Person person) {
try {
// 发送消息到指定的 Topic
messageProducer.sendMessage(TOPIC, person);
} catch (Exception e) {
e.printStackTrace();
}
return "success";
}

}
整合案例二

案例目标

SpringCloud Stream 整合 Kafka,使用 YML 配置的方式绑定消费者,同时通过 Function 的方式绑定生产者。

  • 引入依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.0</version>
<relativePath/>
</parent>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>2023.0.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
</dependencies>

提示

根据 SpringCloud 官方文档,引入 spring-cloud-starter-stream-kafka 或者引入 spring-cloud-stream-binder-kafka 都是可以的。

  • 配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
server:
port: 9999

spring:
application:
name: kafka-test
cloud:
stream:
kafka:
binder:
# Kafka 集群的地址
brokers: 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
# 是否允许自动创建主题
auto-create-topics: false
bindings:
receiveMsg-in-0:
# 指定 Binder
binder: kafka
# Topic 的名称
destination: test
# 消息的格式
content-type: application/json
# 消费组的名称
group: test
sendMsg-out-0:
# 绑定 Kafka
binder: kafka
# Topic 的名称
destination: test
# 消息的格式
content-type: application/json
# 消费组的名称
group: test
function:
# 指定要绑定的函数(多个函数使用分号分隔)
definition: receiveMsg;sendMsg

配置参数说明

  • spring.cloud.stream.kafka.binder 下面有很多配置,比如:默认自动提交偏移量、默认的分区和副本数量等配置,对于当前案例来说,默认值即可满足。
  • spring.cloud.stream.kafka.bindings 主要是为了配置消费者以及生产者信息的,其中 xxx-in/out-yy 这个配置主要是为了声明当前是生产者还是消费者的,而 destination 用于定义 Topic 的名称。

YML 配置文件是否需要指定 binder 属性

SpringCloud Stream 默认支持 Kafka 和 RabbitMQ 两种 Binder,如果在项目中只引入了 Kafka 的依赖,比如 spring-cloud-starter-stream-kafka,那么 SpringCloud Stream 会自动将 Kafka 作为默认的 Binder,即不需要在 spring.cloud.stream.bindings 下显式配置 binder: kafka。如果在项目中同时引入了多个 Binder(例如 Kafka 和 RabbitMQ),那么就必须明确配置每个绑定对应的 Binder,比如配置 binder: kafka。另外,还可以通过 spring.cloud.stream.default-binder 配置项为全局指定默认的 Binder。

YML 配置文件是否需要指定 function.definition 属性

  • 如果在项目中只有一个函数(如 Supplier 或 Consumer),SpringCloud Stream 会自动发现它,并将其绑定到配置的通道,在这种情况下,不需要配置 spring.cloud.function.definition 属性。
  • 如果在项目中有多个函数(Supplier、Consumer 或 Function),SpringCloud Stream 无法确定要绑定哪一个函数,在这种情况下,必须通过 spring.cloud.function.definition 属性指定要绑定的函数。
  • 实体类
1
2
3
4
5
6
7
8
9
10
11
12
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Person implements Serializable {

private Long id;

private String name;

private int age;

}
  • Kafka 生产者类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Component
public class MessageProducer {

private final BlockingQueue<Person> messageQueue = new LinkedBlockingQueue<>(1000);

/**
* 发送消息(基于函数式编程定义 Supplier)
* <p> 对应 YML 配置文件中的 sendMsg-out-0
* <p> 对应 YML 配置文件中的 spring.cloud.function.definition = sendMsg
*/
@Bean
public Supplier<Message<Person>> sendMsg() {
return () -> {
// 从队列中取出消息(非阻塞操作)
Person person = messageQueue.poll();
if (person != null) {
// 构建消息
return MessageBuilder.withPayload(person).build();
}
return null;
};
}

/**
* 提供外部方法发送消息,将 Person 对象加入队列
*/
public void sendPersonMessage(Person person) {
try {
// 将消息放入到队列(阻塞操作)
messageQueue.put(person);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Failed to enqueue message", e);
}
}

}
  • Kafka 消费者类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Component
public class MessageConsumer {

/**
* 消费消息(基于函数式编程定义 Consumer)
* <p> 对应 YML 配置文件中的 receiveMsg-in-0
*/
@Bean
public Consumer<Message<String>> receiveMsg() {
return message -> {
System.out.println("Receive Msg: " + message.getPayload());
};
}

}
  • 测试代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@RestController
@RequestMapping("/kafka")
public class KafkaProducerController {

@Autowired
private StreamBridge streamBridge;

@Autowired
private MessageProducer messageProducer;

/**
* 第一种方式发送消息
* <p> 依赖 StreamBridge
* <p> 依赖 spring.cloud.stream.bindings 的配置
* <p> 不依赖 spring.cloud.function.definition 的配置,但建议加上对应的配置
*/
@PostMapping("/produce")
public String produce(@RequestBody Person person) {
try {
// 发送消息到指定的绑定目标(Binding)
streamBridge.send("sendMsg-out-0", person);
} catch (Exception e) {
e.printStackTrace();
}
return "success";
}

/**
* 第二种方式发送消息
* <p> 依赖函数式编程(定义 Supplier)
* <p> 依赖 spring.cloud.stream.bindings 的配置
* <p> 依赖 spring.cloud.function.definition 的配置
* <p> 不依赖 StreamBridge
*/
@PostMapping("/sendMsg")
public String send(@RequestBody Person person) {
try {
messageProducer.sendPersonMessage(person);
} catch (Exception e) {
e.printStackTrace();
}
return "success";
}

}

两种消息发送方式的对比

方式优点缺点
StreamBridge 动态灵活,可发送到任意绑定目标,无需额外接口或类。与函数式模型的代码解耦,可能增加绑定管理的复杂性。
函数式编程符合现代函数式风格,自动化程度高,代码简洁。一旦绑定,动态调整困难,适合静态绑定的场景。

提示

  • 在 SpringCloud Stream 中,使用函数式编程(定义 Supplier)发送消息时,SpringCloud 的消息发送机制并不是基于任务调度的定时发送,而是通过 Reactive Streams 的发布 / 订阅机制驱动的。
  • 当定义一个 Supplier 作为消息源时,SpringCloud Stream 会自动将这个 Supplier 包装成一个消息生成器,并且默认以无限流的形式调用它。
  • SpringCloud Stream 会将 Supplier 的返回值转换为一个无限流,也就是包装为一个 Reactor 的 Flux,并持续调用 messageSupplier(),然后将这个流中的每个元素作为消息发送到 Kafka 的绑定目标。
  • SpringCloud Stream 默认会通过内部的 Reactive Streams 驱动,每次从 Supplier 中获取一条消息并立即发送。默认情况下,这个过程是连续触发的,不带任何延迟,适合数据流量持续、需要高频发送的场景。

参考资料