Kafka 入门教程之二

大纲

前言

学习资源

Kafka 生产者

生产者消息发送流程

生产者消息发送原理

Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程 — main 线程和 sender 线程。在 main 线程中,会创建一个双端队列 RecordAccumulator。值得一提的是,main 线程将消息发送给 RecordAccumulator 时,sender 线程会不断从 RecordAccumulator 中拉取消息并发送到 Kafka Broker。

生产者重要参数列表

生产者异步发送 API

普通的异步发送

提示

本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-01

  • Maven 依赖
1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.1</version>
</dependency>
  • Java 代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class CustomerProducer {

public static void main(String[] args) {
Properties properties = new Properties();
// 指定Kafka的连接信息(若是Kafka集群,多个节点之间使用逗号分隔)
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// 指定序列化器(必须)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 5; i++) {
// 异步发送消息
producer.send(new ProducerRecord<>("test", "hello kafka " + i));
}
// 关闭资源
producer.close();
}

}
  • 测试代码

第一步:启动 Kafka 的命令行消费者:

1
$ ./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test

第二步:在 IDE 工具中执行代码,观察命令行消费者中是否接收到消息,如下所示:

1
2
3
4
5
hello kafka 0
hello kafka 1
hello kafka 2
hello kafka 3
hello kafka 4

带回调函数的异步发送

回调方法会在 Producer 收到 ack 时调用,且为异步调用;该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception)。如果 Exceptionnull,则说明消息发送成功,如果 Exception 不为 null,则说明消息发送失败。值得一提的是,消息发送失败会自动重试发送,不需要在回调函数中手动重试发送。

  • Java 代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class CustomerProducer {

public static void main(String[] args) {
Properties properties = new Properties();
// 指定Kafka的连接信息(若是Kafka集群,多个节点之间使用逗号分隔)
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// 指定序列化器(必须)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 5; i++) {
// 异步发送消息(带回调函数)
producer.send(new ProducerRecord<>("test", "hello kafka " + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println("topic: " + recordMetadata.topic() + ", partition: " + recordMetadata.partition());
}
}
});
}
// 关闭资源
producer.close();
}

}
  • 测试代码

除了在 Kafka 的命令行消费者中接收到消息之外,还可以在 IDE 的控制台看到如下的输出信息:

1
2
3
4
5
topic: test, partition: 0
topic: test, partition: 0
topic: test, partition: 0
topic: test, partition: 0
topic: test, partition: 0

生产者同步发送 API

提示

本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-02

普通的同步发送

同步发送的意思就是,当一条消息发送之后,会阻塞当前线程,直至收到 ack 应答。由于 send() 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,只需调用 Future 对象的 get() 方法即可实现同步发送。

  • Maven 依赖
1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.1</version>
</dependency>
  • Java 代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class CustomerProducer {

public static void main(String[] args) {
Properties properties = new Properties();
// 指定Kafka的连接信息(若是Kafka集群,多个节点之间使用逗号分隔)
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// 指定序列化器(必须)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 5; i++) {
// 同步发送消息
try {
producer.send(new ProducerRecord<>("test", "hello kafka " + i)).get();
} catch (Exception e) {
e.printStackTrace();
}
}
// 关闭资源
producer.close();
}

}
  • 测试代码

第一步:启动 Kafka 的命令行消费者:

1
$ ./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test

第二步:在 IDE 工具中执行代码,观察命令行消费者中是否接收到消息,如下所示:

1
2
3
4
5
hello kafka 0
hello kafka 1
hello kafka 2
hello kafka 3
hello kafka 4

带回调函数的同步发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class CustomerProducer {

public static void main(String[] args) {
Properties properties = new Properties();
// 指定Kafka的连接信息(若是Kafka集群,多个节点之间使用逗号分隔)
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// 指定序列化器(必须)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 5; i++) {
// 同步发送消息(带回调函数)
try {
producer.send(new ProducerRecord<>("test", "hello kafka " + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
if (exception == null) {
System.out.println("topic: " + recordMetadata.topic() + ", partition: " + recordMetadata.partition());
}
}
}).get();
} catch (Exception e) {
e.printStackTrace();
}
}
// 关闭资源
producer.close();
}

}
  • 测试代码

除了在 Kafka 的命令行消费者中接收到消息之外,还可以在 IDE 的控制台看到如下的输出信息:

1
2
3
4
5
topic: test, partition: 0
topic: test, partition: 2
topic: test, partition: 0
topic: test, partition: 1
topic: test, partition: 2

生产者分区发送

生产者分区发送的优点

  • (1) 提高并行度,生产者支持以分区为单位发送数据,消费者支持以分区为单位消费数据。
  • (2) 便于合理使用存储资源,每个 Partition 在一台 Broker 上存储,可以把海量的数据按照分区切割成一块一块的数据并存储在多台 Broker 上。合理控制分区的任务,可以实现负载均衡的效果。

生产者发送消息的分区策略

默认的分区器类是 DefaultPartitioner,部分源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a partition based on a hash of the key
* <li>If no partition or key is present choose the sticky partition that changes when the batch is full.
*
* See KIP-480 for details about sticky partitioning.
*/
public class DefaultPartitioner implements Partitioner {

......

}

通过 KafkaProducer 类的 send() 方法发送消息时,需要指定 ProducerRecord 对象作为参数,ProducerRecord 类的构造方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ProducerRecord<K, V> {

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
......
}

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
......
}

public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
......
}

public ProducerRecord(String topic, Integer partition, K key, V value) {
......
}

public ProducerRecord(String topic, K key, V value) {
......
}

public ProducerRecord(String topic, V value) {
......
}

调用 ProducerRecord 类不同的构造方法时,有以下几种分区策略:

  • 在指明 partition 的情况下,直接将指明的值作为 partition 值。例如:partition=0,那么数据会被写入分区 0。

  • 在没有指明 partition 值,但有指定 key 的情况下,将 key 的 Hash 值与 topicpartition 数进行取余来得到 partition 值。例如:key 的 Hash 值是 5,topicpartition 数是 2,那么 key 对应的 value 会被写入 1 号分区。

  • 在既没有指明 partition 值,又没有指定 key 的情况下,Kafka 会采用 Sticky Partition 黏性分区器,也就是会随机选择一个分区,并尽可能一直使用该分区,等该分区的 batch 已满或者已完成,Kafka 再随机一个分区进行使用(和上一次选的分区不同)。例如:第一次随机选择 0 号分区,等 0 号分区当前批次满了(默认 16K 大小)或者 linger.ms 设置的时间到了,Kafka 会再随机选择一个分区进行使用(如果还是 0 分区会继续随机选择一个分区)。

自定义生产者发送的分区器

开发人员可以根据业务需求自定义分区器,只需要实现 Partitioner 接口即可。

提示

本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-03

  • 自定义分区器类,实现 Partitioner 接口,并重写 partition() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* 自定义分区器
*/
public class CustomPartitioner implements Partitioner {

/**
* 返回消息对应的分区
*
* @param topic 主题
* @param key 消息的 key
* @param keyBytes 消息的 key 序列化后的字节数组
* @param value 消息的 value
* @param valueBytes 消息的 value 序列化后的字节数组
* @param cluster 集群元数据可以查看分区信息
* @return
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 获取消息内容
String msgValue = value.toString();

// 定义分区号
int partition;

if (msgValue.contains("order")) {
partition = 0;
} else {
partition = 1;
}
// 返回分区号
return partition;
}

/**
* 关闭资源
*/
@Override
public void close() {

}

/**
* 配置信息
*
* @param configs
*/
@Override
public void configure(Map<String, ?> configs) {

}

}
  • 在生产者的配置中添加分区器参数,以此来指定自定义分区器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class CustomerProducer {

public static void main(String[] args) {
Properties properties = new Properties();
// 指定Kafka的连接信息(若是Kafka集群,多个节点之间使用逗号分隔)
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// 指定序列化器(必须)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 指定自定义分区器
properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());

// 创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
for (int i = 0; i < 5; i++) {
// 异步发送消息
producer.send(new ProducerRecord<>("test", "hello kafka " + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
System.out.println("Partition : " + metadata.partition());
}
});
}
// 关闭资源
producer.close();
}

}
  • 测试代码

除了在 Kafka 的命令行消费者中接收到消息之外,还可以在 IDE 的控制台看到如下的输出信息:

1
2
3
4
5
Partition : 1
Partition : 1
Partition : 1
Partition : 1
Partition : 1

生产者最佳实践

生产者如何提高吞吐量

参数优化

为了让生产者提高吞吐量(发送消息的效率),可以优化以下几个参数:

  • (1) 在生产者端设置 batch.size:批次大小,默认 16k
  • (2) 在生产者端设置 linger.ms:等待时间,默认 0ms,修改为 5-100ms
  • (3) 在生产者端设置 compression.type:压缩方式,默认是 none,修改过为 snappy
  • (4) 在生产者端设置 RecordAccumulator:缓冲区(双端队列)大小,默认是 32m,修改为 64m

特别注意

  • (1) 上述的四个参数值并不是设置得越大就越好,设置得过大会导致 Kafka 中的消息被延迟消费。
  • (2) 当 Topic 的分区数量比较多的时候,可以适当增加 RecordAccumulator(缓冲区) 的大小。
参数说明

示例代码

提示

本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-04

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class CustomerProducer {

public static void main(String[] args) {
Properties properties = new Properties();
// 指定Kafka的连接信息(若是Kafka集群,多个节点之间使用逗号分隔)
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// 指定序列化器(必须)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 等待时间(默认 0ms)
properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
// 批次大小(默认 16K),单位是字节
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
// 压缩方式(默认 none)
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
// 缓冲区大小(默认 32M),单位是字节
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64 * 1024 * 1024);

// 创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 5; i++) {
// 异步发送消息
producer.send(new ProducerRecord<>("test", "hello kafka " + i));
}
// 关闭资源
producer.close();
}

}

生产者如何保证数据可靠性

这里的数据可靠性是指生产者如何保证消息可以发送给 Kafka,并保证 Kafka 可以持久化消息内容。

消息发送流程

ISR 介绍

Leader 维护了一个动态的 in-sync replica set (ISR),意为与 Leader 保持同步的 Follower + Leader 集合(leader: 0isr: 0, 1, 2)。

ACK 应答原理


数据可靠性总结

Kafka 至少配置以下 4 个核心参数,就可以保证生产者发送的消息不会丢失(数据可靠性):

  • (1) 给 Topic 设置 replication.factor 参数:这个值必须大于等于 2,即要求每个 Partition 必须至少有 2 个副本
  • (2) 在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于等于 2,即要求一个 Leader 至少要有 1 个 Follower 还跟自己保持着同步,这样才能确保 Leader 宕机了,还有一个 Follower 可以使用
  • (3) 在生产者端设置 acks=all:这是要求每条消息,必须是写入到所有 Replica(副本)之后,才能认为是发送成功
  • (4) 在生产者端设置 retries=MAX(可以是一个很大很大的值,表示无限次重试的意思):这个是要求一旦消息发送失败,就无限重试

思考:为什么要求每个分区必须至少有 2 个副本?

在 Kafka 中,为了保证数据的可靠性,要求分区的副本数(replication.factor)必须大于等于 2,原因如下:

  • 副本数为 1 的问题

    • (1) 单点故障:如果只有一个副本(Leader),当该副本宕机时,分区的数据将无法访问,导致数据不可用或丢失。
    • (2) 无法容灾:没有其他副本可供替代,数据的高可用性和可靠性完全依赖于单一节点,风险极高。
  • 副本数 ≥ 2 的优势

    • (1) 高可用性:当 Leader 副本宕机时,ISR 中的其他副本可以迅速选举出新的 Leader,保证数据的可用性。
    • (2) 数据冗余:多个副本存储相同的数据,即使一个副本发生故障,其他副本仍然保存着数据,降低了数据丢失的风险。
    • (3) 容灾能力:即使一个副本所在的物理节点出现故障或数据损坏,其他副本仍能确保系统正常运行。

特别注意

当 ACK 级别设置为 all 的时候,虽然可以完全保证数据的可靠性,但会存在数据重复的情况,详细的介绍可以看 这里

示例代码

提示

本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-05

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class CustomerProducer {

public static void main(String[] args) {
Properties properties = new Properties();
// 指定Kafka的连接信息(若是Kafka集群,多个节点之间使用逗号分隔)
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// 指定序列化器(必须)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 设置 ACK 应答级别,默认值是 all
properties.put(ProducerConfig.ACKS_CONFIG, "all");
// 设置重试次数,默认值是 int 类型的最大值 2147483647
properties.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

// 创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 5; i++) {
// 异步发送消息
producer.send(new ProducerRecord<>("test", "hello kafka " + i));
}
// 关闭资源
producer.close();
}

}

生产者如何处理数据重复

Kafka 从 0.11 版本以后,引入了一项重大特性:幂等性和事务,可用于解决生产者数据重复(即生产者重复发送消息)的问题。

数据重复分析

数据传递语义
  • 至少一次 (At Least Once) = ACK 级别设置为 all + 分区副本大于等于 2 + ISR 里应答的最小副本数量大于等于 2
  • 最多一次 (At Most Once) = ACK 级别设置为 0

提示

  • 至少一次 (At Least Once) 可以保证数据不丢失,但是不能保证数据不重复。
  • 最多一次 (At Most Once) 可以保证数据不重复,但是不能保证数据不丢失。
  • 精确一次 (Exactly Once):对于一些非常重要的信息,比如和金额相关的数据,要求数据既不能重复,也不能丢失。
幂等性使用
幂等性介绍
  • Kafka 的幂等性是指 Producer (生产者) 无论向 Broker 发送多少条重复消息,Broker 端都只会持久化一条消息,保证了消息不重复。
  • Kafka 保证精确一次 (Exactly Once) = 启用幂等性 + 至少一次 (acks = all + 分区副本数 >= 2 + ISR 里应答的最小副本数量 >= 2)。
  • 重复数据的判断标准:具有 <PID, Partition, SeqNumber> 相同主键的消息提交时,Broker 只会持久化一条。其中 PID 是 Kafka 每次重启都会分配一个新的,Partition 表示分区号,Sequence Number 是单调自增的序列号,所以幂等性只能保证数据在单分区单会话内不重复,这里的单会话是相对于 Kafka 单次重启来说。如果需要解决多分区多会话的数据重复问题,需要结合幂等性与事务来解决。

幂等性开启
  • 在 Kafka 中开启幂等性的配置参数是 enable.idempotence,默认值为 true,设置 false 会关闭幂等性。如果没有设置冲突的配置,默认情况下会启用幂等性。如果设置了冲突的配置,并且未显式启用幂等性,则会禁用幂等性。如果显式启用了幂等性,并且设置了冲突的配置,则会抛出 ConfigException 异常。

  • 当在生产者端设置 enable.idempotencetrue 时,生产者将确保数据流中只写入每条消息的一个副本,即可以确保生产者不会写入重复消息。如果设置为 false,由于代理失败等原因,生产者重试发送消息,可能会在数据流中写入重试消息的副本,即生产者可能会写入重复消息。

  • 特别注意,Kafka 启用幂等性要至少满足以下 4 个条件:

    • (1) 在生产者端设置 enable.idempotence 参数:这个值必须为 true,即开启幂等性
    • (2) 在生产者端设置 max.in.flight.requests.per.connection 参数:这个值必须小于或等于 5
    • (3) 在生产者端设置 acks=all:这是要求每条消息,必须是写入到所有 Replica(副本)之后,才能认为是发送成功
    • (4) 在生产者端设置 retries:这个值必须大于 0,即必须要有重试机制

提示

Kafka 官方文档中的幂等性详细介绍可以看 这里

示例代码

提示

本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-06

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class CustomerProducer {

public static void main(String[] args) {
Properties properties = new Properties();
// 指定Kafka的连接信息(若是Kafka集群,多个节点之间使用逗号分隔)
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// 指定序列化器(必须)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 设置启用幂等性
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
// 设置单个连接上最多可以发送的未确认(ACK)请求的数量
properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
// 设置 ACK 应答级别,默认值是 all
properties.put(ProducerConfig.ACKS_CONFIG, "all");
// 设置重试次数,默认值是 int 类型的最大值 2147483647
properties.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

// 创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 5; i++) {
// 异步发送消息
producer.send(new ProducerRecord<>("test", "hello kafka " + i));
}
// 关闭资源
producer.close();
}

}
事务使用

特别注意

  1. 由于事务的底层是基于幂等性,因此在生产者使用事务之前,必须开启幂等性(默认开启)。
  2. 更多关于 Kafka 事务的使用介绍可以看 这里 的教程。
事务原理介绍

事务流程介绍

如上图所示,整个事务流程分为以下几个步骤:

  • 事务初始化:initTransactions()
  • 事务启动:beginTransaction()
  • 发送消息:一般发送多条消息,可以向 1 个或多个 Topic 发送消息
  • 事务提交:commitTransaction()
  • 事务回滚:abortTransaction()
  • 消费消息

当 Producer 发送多条事务消息时:

  • 事务初始化是一次性的
  • 事务开始、发送消息、事务提交 / 回滚则会一直循环运行
事务的常用 API
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 1.初始化事务
void initTransactions();

// 2.开启事务
void beginTransaction() throws ProducerFencedException;

// 3.在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException;

// 4.提交事务
void commitTransaction() throws ProducerFencedException;

// 5.放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;
示例代码

提示

本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-07

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class CustomerProducer {

public static void main(String[] args) {
Properties properties = new Properties();
// 指定Kafka的连接信息(若是Kafka集群,多个节点之间使用逗号分隔)
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// 指定序列化器(必须)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 设置事务 ID(必须)
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_01");

// 创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

// 初始化事务
producer.initTransactions();

// 开启事务
producer.beginTransaction();

try {
// 发送数据
for (int i = 0; i < 5; i++) {
producer.send(new ProducerRecord<>("test", "hello kafka " + i));
}
// 提交事务
producer.commitTransaction();
} catch (Exception e) {
// 放弃事务
producer.abortTransaction();
} finally {
// 关闭资源
producer.close();
}

}

}

生产者如何保证数据有序

在消息队列中,消息有序是指消息的生产和消费遵循特定的顺序,即消息的顺序不会因为传输、存储或消费过程而被打乱。

数据有序分析

提示

  • 在单个分区内,数据是有序的(需要符合一定的条件)。
  • 当存在多个分区时,分区与分区之间的数据是无序。
  • 如果要求多分区的数据有序,可以让消费者读取多个分区的数据,并存储在本地内存中,然后对内存中的数据进行排序,最后再进一步统一处理数据。但这会引申出一个问题,也就是消费者需要等待所有分区的数据都读取完了才能进一步处理数据,这会导致数据的处理效率比较低。

保证数据有序
  • Kafka 在 1.x 版本之前可以保证数据单分区的有序性,条件如下:

    • max.in.flight.requests.per.connection = 1,不需要考虑是否开启幂等性。
  • Kafka 在 1.x 及以后版本可以保证数据单分区的有序性,条件如下:

    • 未开启幂等性
      • max.in.flight.requests.per.connection 需要必须设置为 1。
    • 开启幂等性
      • max.in.flight.requests.per.connection 的值必须设置在 1 ~ 5 之间。
      • 原因说明:因为在 Kafka 1.x 版本以后,启用幂等性后,Kafka 服务端会缓存 Producer 发来的最近 5 个 Request 的元数据,因此无论如何,都可以保证最近 5 个 Request 的数据都是有序的(如下图所示)。

常见解决方案

特别注意

在消息队列中,保证消息的顺序性,最关键的是严格保证生产者、MQ 队列、消费者这三者是一对一的关系。

  • 要保证一个生产者只对应一个 Topic,一个 Topic 只对应一个 Partition,并且一个 Partition 只对应一个消费者。

  • 比如,生产者在发送消息的时候,可以指定一个 Key,比如指定某个订单 的 ID 作为 Key,那么这个订单相关的所有消息,一定都会被分发到同一个 Partition 中去,而且这个 Partition 中的数据一定是有顺序的。当消费者从 Partition 中读取消息的时候,也一定是有顺序的。另外,如果消费者是单线程进行消费处理,而处理比较耗时的话,假设处理一条消息耗时几十毫秒,那么 1 秒钟只能处理几十条消息,这吞吐量太低了。建议在消费者内部使用单线程进行消费时,将消息写入 N 个内存队列,并且通过哈希算法将拥有相同 Key 的消息都写入到同一个内存队列里面;最后启动 N 个线程,每个线程分别消费一个内存队列即可,这就能保证消息的顺序性,也能大大提高消费消息的吞吐量。