Kafka 入门教程之五

大纲

前言

学习资源

Kafka 配置更新模式

在 Kafka 官方文档 中,提到 Kafka 的配置更新有三种模式,如下所示:

从 Kafka 的 1.1 版本开始,一些 Broker 配置可以在无需重启 Broker 的情况下进行更新。可以参考 Broker Configs 中的 Dynamic Update Mode(动态更新模式),了解每个 Broker 配置的更新模式。

  • read-only(只读):需要重启 Broker 才能更新配置。
  • per-broker(单个 Broker 动态更新):可以为每个 Broker 动态更新配置,不需要重启 Broker。
  • cluster-wide(集群范围动态更新):可以作为集群范围的默认值动态更新,也可以为单个 Broker 设置不同的值(例如用于测试),不需要重启 Broker。

Kafka 生产环境调优

硬件配置选择

业务场景说明

  • 每天的数据量:100 万日活,每人每天 100 条日志,每天总共的日志条数是 100 万 * 100 条 = 1 亿条(中型规模的公司)。
  • 每秒钟的日志条数:1 亿 / 24 小时 / 60 分 / 60 秒 = 1150 条 / 每秒钟。
  • 每条日志的大小:0.5k ~ 2k(取 1k)。
  • 每天的数据大小:1 亿条 * 1k ≈ 100g。
  • 每秒钟的数据量:1150 条 / 每秒钟 * 1k ≈ 1m/s。
  • 高峰期每秒钟的日志条数:1150 条 * 20 倍 = 23000 条。
  • 高峰期每秒钟的数据量:20MB/s。

服务器数量选择

  • 计算公式:服务器数量 = 2 * (生产者每秒钟的峰值生产速率 * 副本数量 / 100) + 1
  • 比如:3 台 = 2 * (20m/s * 2 / 100) + 1

磁盘选择

  • Kafka 底层主要是顺序写,固态硬盘和机械硬盘的顺序写速度差不多,建议选择普通的机械硬盘。
  • 每天的数据大小:1 亿条 * 1k ≈ 100g。
  • 每天的总数据大小:100g * 2 个副本 * 日志保存时间 3 天 / 磁盘使用率 0.7 ≈ 1T。
  • 根据以上计算结果,建议三台服务器的硬盘总大小大于等于 1T。

CPU 选择

  • num.io.threads = 8:负责写磁盘的线程数量,这个参数要占 CPU 总核数的 50%。
  • num.replica.fetchers = 1:副本同步时用于拉取数据的线程数量,这个参数占 CPU 总核数的 50% 的 1/3。
  • num.network.threads = 3:处理网络请求的线程数量,这个参数占 CPU 总核数的 50% 的 2/3。

提示

建议使用 32 核的 CPU,这样可以最大程度地发挥 Kafka 的性能。

网络选择

  • 选择千兆网卡即可。
  • 网络带宽 = 峰值吞吐量 ≈ 20MB/s。
  • 100Mbps 的单位是 bit;1byte = 8bit;10M/s 的单位是 byte;100Mbps/8 = 12.5M/s。
  • 一般百兆的网卡(100Mbps)、千兆的网卡(1000Mbps)、万兆的网卡(10000Mbps)。

内存选择

提示

Kafka 的内存组成:堆内存(Kafka 内部配置) + 页缓存(操作系统内存)。

堆内存查看
  • 查看 Kafka 的进程号
1
$ jps
1
2
3
58882 Kafka
5255 Jps
1931 QuorumPeerMain
  • 根据 Kafka 进程号,查看 Kafka 的 GC 情况
1
$ jstat -gc 58882 1s 10
1
2
3
4
5
6
7
8
9
10
11
S0C    S1C    S0U    S1U      EC       EU        OC         OU       MC     MU    CCSC   CCSU   YGC     YGCT    FGC    FGCT     GCT   
0.0 2048.0 0.0 1515.7 53248.0 15360.0 993280.0 145686.5 36032.0 35748.0 4160.0 4020.3 4 0.033 0 0.000 0.033
0.0 2048.0 0.0 1515.7 53248.0 15360.0 993280.0 145686.5 36032.0 35748.0 4160.0 4020.3 4 0.033 0 0.000 0.033
0.0 2048.0 0.0 1515.7 53248.0 15360.0 993280.0 145686.5 36032.0 35748.0 4160.0 4020.3 4 0.033 0 0.000 0.033
0.0 2048.0 0.0 1515.7 53248.0 15360.0 993280.0 145686.5 36032.0 35748.0 4160.0 4020.3 4 0.033 0 0.000 0.033
0.0 2048.0 0.0 1515.7 53248.0 15360.0 993280.0 145686.5 36032.0 35748.0 4160.0 4020.3 4 0.033 0 0.000 0.033
0.0 2048.0 0.0 1515.7 53248.0 15360.0 993280.0 145686.5 36032.0 35748.0 4160.0 4020.3 4 0.033 0 0.000 0.033
0.0 2048.0 0.0 1515.7 53248.0 16384.0 993280.0 145686.5 36032.0 35748.0 4160.0 4020.3 4 0.033 0 0.000 0.033
0.0 2048.0 0.0 1515.7 53248.0 16384.0 993280.0 145686.5 36032.0 35748.0 4160.0 4020.3 4 0.033 0 0.000 0.033
0.0 2048.0 0.0 1515.7 53248.0 16384.0 993280.0 145686.5 36032.0 35748.0 4160.0 4020.3 4 0.033 0 0.000 0.033
0.0 2048.0 0.0 1515.7 53248.0 16384.0 993280.0 145686.5 36032.0 35748.0 4160.0 4020.3 4 0.033 0 0.000 0.033
参数描述
S0C 第一个幸存区的大小
S1C 第二个幸存区的大小
S0U 第一个幸存区的使用大小
S1U 第二个幸存区的使用大小
EC 伊甸园区的大小
EU 伊甸园区的使用大小
OC 老年代大小
OU 老年代使用大小
MC 方法区大小
MU 方法区使用大小
CCSC 压缩类空间大小
CCSU 压缩类空间使用大小
YGC 年轻代垃圾回收次数
YGCT 年轻代垃圾回收消耗时间
FGC 老年代垃圾回收次数
FGCT 老年代垃圾回收消耗时间
GCT 垃圾回收消耗总时间
  • 根据 Kafka 进程号,查看 Kafka 的堆内存信息
1
2
3
4
5
# 较低版本的 JDK(比如 JDK 8)
$ jmap -heap 58882

# 较高版本的 JDK(比如 JDK 11)
$ jhsdb jmap --heap --pid 58882
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
Attaching to process ID 58882, please wait...
Debugger attached successfully.
Server compiler detected.
JVM version is 1.8.0_361-b09

using parallel threads in the new generation.
using thread-local object allocation.
Parallel GC with 8 thread(s)

Heap Configuration:
MinHeapFreeRatio = 40
MaxHeapFreeRatio = 70
MaxHeapSize = 4294967296 (4096.0MB)
NewSize = 268435456 (256.0MB)
MaxNewSize = 1431306240 (1365.0MB)
OldSize = 543948800 (518.75MB)
NewRatio = 2
SurvivorRatio = 8
PermSize = 67108864 (64.0MB)
MaxPermSize = 268435456 (256.0MB)
G1HeapRegionSize = 0 (0.0MB)

Heap Usage:
PS Young Generation
Eden Space:
capacity = 1530920960 (1460.0MB)
used = 322122560 (307.0MB)
free = 1208798400 (1153.0MB)
21.0% used
From Space:
capacity = 255852544 (244.0MB)
used = 0 (0.0MB)
free = 255852544 (244.0MB)
0.0% used
To Space:
capacity = 255852544 (244.0MB)
used = 0 (0.0MB)
free = 255852544 (244.0MB)
0.0% used
PS Old Generation
capacity = 2348810240 (2240.0MB)
used = 1610612736 (1536.0MB)
free = 738197504 (704.0MB)
68.6% used

50449 interned Strings occupying 4156344 bytes.

Heap Configuration(堆内存的配置)的参数说明:

参数描述
MinHeapFreeRatio 堆中空闲空间的最小比率。
MaxHeapFreeRatio 堆中空闲空间的最大比率。
MaxHeapSize 堆的最大大小。
NewSize 年轻代的初始大小。
MaxNewSize 年轻代的最大大小。
OldSize 老年代的初始大小。
NewRatio 年轻代与老年代容量的比率。
SurvivorRatioEden 区与 Survivor 区容量的比率。
PermSize 永久代的初始大小(适用于 Java 8 及之前)。
MaxPermSize 永久代的最大大小(适用于 Java 8 及之前)。
G1HeapRegionSizeG1 垃圾回收器的堆区域大小(如果使用 G1 收集器)。

Heap Usage(堆内存的使用情况)的参数说明:

参数描述
PS Young Generation 年轻代的堆使用情况。
Eden SpaceEden 区的容量、已用空间和空闲空间。
From SpaceFrom Survivor 区的容量、已用空间和空闲空间。
To SpaceTo Survivor 区的容量、已用空间和空闲空间。
PS Old Generation 老年代的堆使用情况。
Capacity (Old Generation) 老年代的总容量。
Used (Old Generation) 老年代的已用空间。
Free (Old Generation) 老年代的空闲空间。

Interned Strings 的参数说明:

参数描述
String Count 输出 JVM 中字符串池中缓存的字符串数量。
Total Size 字符串池中缓存字符串的总大小(以字节为单位)。
  • 或者使用 jcmd 工具,查看 Kafka 的堆内存信息
1
$ jcmd 58882 GC.heap_info
1
2
3
4
garbage-first heap   total 1048576K, used 185072K [0x00000000c0000000, 0x0000000100000000)
region size 1024K, 27 young (27648K), 2 survivors (2048K)
Metaspace used 39203K, committed 39616K, reserved 1114112K
class space used 4512K, committed 4672K, reserved 1048576K
堆内存调整
  • Kafka 堆内存建议每个节点设置 10G ~ 15G,堆内存大小可以在 kafka-server-start.sh 脚本文件中修改:
1
2
3
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx10G -Xms10G"
fi
页缓存估算
  • 页缓存是 Linux 操作系统的内存,一般只需要保证 1 个 SeGment(1G 大小)中 25% 的数据可以存放在内存中就行了。
  • 计算公式:每个节点的页缓存大小 = (分区数量 * 1G * 25%) / 节点数。
  • 比如:3 个节点和 10 个分区,那么页缓存大小 = (10 * 1G * 25%) / 3 ≈ 1G。
  • 根据以上计算结果,建议服务器内存大于等于 11G,其中有 10G 是 Kafka 的堆内存使用,剩余的 1G 是页缓存使用。

Kafka 生产者调优

生产者的核心参数

Kafka 生产者的工作流程

Kafka 生产者的核心参数

生产者提高吞吐量

为了让生产者提高吞吐量(发送消息的效率),可以优化以下几个参数:

参数名称参数描述
buffer.memoryRecordAccumulator 缓冲区的总大小,默认值为 32m
batch.size缓冲区中一批数据的最大大小,默认值为 16k。适当增加该值,可以提高吞吐量;但是,如果该值设置得太大,会导致数据传输延迟增加。
linger.ms如果数据量迟迟未达到 batch.size,Sender 线程等待 linger.ms 之后就会发送数据。单位是 ms,默认值为 0ms,表示没有延迟。生产环境建议该值大小为 5 ~ 100ms 之间。
compression.type生产者发送的所有数据的压缩方式。默认值为 none,也就是不压缩。支持压缩类型:nonegzipsnappylz4zstd

生产者保证数据可靠性

参数名称参数描述
acks0:生产者发送过来的数据,不需要等数据落盘才应答。
1:生产者发送过来的数据,Leader 收到数据后才应答。
-1 (all):生产者发送过来的数据,Leader 和 ISR 队列里面的所有节点收到数据后才应答。默认值是 -1,而且 -1all 是等价的。
retries发送消息的失败重试次数,建议设置一个很大很大的值,表示一旦消息发送失败,就无限重试。

提示

至少一次(At Least Once) = ACK 级别设置为 -1 + 分区副本数量大于等于 2 + ISR 里应答的最小副本数量大于等于 2。

生产者保证数据有序性

  • 在 Kafka 单分区内,数据是有序(需要满足一定的条件)。
    • 在单个分区内,Kafka 保证消息是按生产者写入的顺序进行存储的,并且消费者在读取时也是按这个顺序读取的。因此,只要满足一定的条件,单分区内的数据是有序的。
    • 为了保证单分区的有序性,需要满足以下任意一个条件:
      • 第一种情况: max.in.flight.requests.per.connection=1,不需要开启幂等性,但会影响生产者的吞吐量。
      • 第二种情况:开启幂等性 enable.idempotence=true,并且设置 max.in.flight.requests.per.connection 的值在 1 ~ 5 之间。
  • 在 Kafka 多分区内,分区与分区之间的数据是无序。
    • 在多分区的情况下,Kafka 会将消息分布到不同的分区中。由于分区之间是并行处理的,Kafka 不会保证分区之间的消息顺序。因此,从全局视角来看,多分区的数据是无序的。
  • 在 MQ(比如 Kafka)中,保证消息的顺序性,最关键的是严格保证生产者、MQ 队列、消费者这三者是一对一的关系。
    • 要保证一个生产者只对应一个 Topic,一个 Topic 只对应一个 Partition,并且一个 Partition 只对应一个消费者。
    • 比如,生产者在发送消息的时候,可以指定一个 Key,比如指定某个订单 的 ID 作为 Key,那么这个订单相关的所有消息,一定都会被分发到同一个 Partition 中去,而且这个 Partition 中的数据一定是有顺序的。当消费者从 Partition 中读取消息的时候,也一定是有顺序的。另外,如果消费者是单线程进行消费处理,而处理比较耗时的话,假设处理一条消息耗时几十毫秒,那么 1 秒钟只能处理几十条消息,这吞吐量太低了。建议在消费者内部使用单线程进行消费时,将消息写入 N 个内存队列,并且通过哈希算法将拥有相同 Key 的消息都写入到同一个内存队列里面;最后启动 N 个线程,每个线程分别消费一个内存队列即可,这就能保证消息的顺序性,也能大大提高消费消息的吞吐量。整个处理流程如下图所示:

生产者避免单分区乱序

Kafka 生产者解决单分区内的乱序问题,主要是依赖以下两个参数:

参数名称参数描述
enable.idempotence是否开启幂等性,默认值为 true, 表示默认开启幂等性。
max.in.flight.requests.per.connection允许最多没有返回 ACK 应答的次数,默认为 5,开启幂等性后必须保证该参数值在 1 ~ 5 范围内。

注意事项

  • 只开启 enable.idempotence 可以防止失败重试时的出现消息重复,但可能不能完全解决单分区内的乱序问题(这取决于 max.in.flight.requests.per.connection 参数的设置)。
  • 要彻底解决 Kafka 单分区内的乱序问题,enable.idempotencemax.in.flight.requests.per.connection 这两个参数需要配合使用,这样才可以确保消息的顺序性和幂等性,但是会牺牲一定的生产吞吐量。
  • 特别注意,这两个配置参数只能保证 Kafka 单分区内的数据顺序,多分区之间的数据顺序 Kafka 无法保证。

参数说明

  • max.in.flight.requests.per.connection 是 Kafka Producer 的一个配置参数,用于控制在同一个 TCP 连接上未被 Broker 确认(ACK)的请求的最大数量。默认值是 5,意味着在同一个连接上,最多可以有 5 个未确认的请求。
  • 如果启用了消息重试(retries > 0,默认启用重试机制),并且设置的 max.in.flight.requests.per.connection > 1,在某些情况下可能会导致单分区内的消息乱序。例如:一个较早的请求失败并被重试时,后续的请求可能已经被成功发送并确认。
  • enable.idempotence=true(启用幂等性),max.in.flight.requests.per.connection 的值不能超过 5,否则 Kafka 会拒绝启动 Producer。在启用幂等性后默认配置 max.in.flight.requests.per.connection=5,这是经过优化的平衡设置。

版本区别

  • Kafka 在 1.x 版本之前可以保证数据单分区的有序性,条件如下:

    • max.in.flight.requests.per.connection = 1,不需要考虑是否开启幂等性。
  • Kafka 在 1.x 及以后版本可以保证数据单分区的有序性,条件如下:

    • 未开启幂等性
      • max.in.flight.requests.per.connection 需要必须设置为 1。
    • 开启幂等性
      • max.in.flight.requests.per.connection 的值必须设置在 1 ~ 5 之间。
      • 原因说明:因为在 Kafka 1.x 版本以后,启用幂等性后,Kafka 服务端会缓存 Producer 发来的最近 5 个 Request 的元数据,因此无论如何,都可以保证最近 5 个 Request 的数据都是有序的(如下图所示)。

情景分析

  • max.in.flight.requests.per.connection 的值为 1(最严格的保证)

    • 每次只有一个请求正在处理,前一个请求完成后,才会发送下一个请求。
    • 在这种配置下,绝对可以保证单分区的有序性,但会牺牲生产者的吞吐量。
  • max.in.flight.requests.per.connection 的值在 1 ~ 5 之间

    • 配置的前提条件:必须设置 enable.idempotence=trueacks=all
    • 当满足上述前提条件时,Kafka 的生产者重试机制会确保消息的顺序,即使发生失败重试,也能按顺序处理请求。
    • Kafka 1.1 及以后版本,启用幂等性后,生产者可以保证单分区的顺序性,只要 max.in.flight.requests.per.connection 的值在 1 ~ 5 之间即可。
    • 配置风险:
      • 如果 enable.idempotence=false,同时 max.in.flight.requests.per.connection > 1,则单分区内可能会出现消息乱序。这是因为并行发送的请求,在发送失败后会重新发送,后发送的请求可能先完成,从而打破顺序。
      • 如果 enable.idempotence=false,为了保证单分区的顺序性,max.in.flight.requests.per.connection 的值必须设置为 1。

总结

  • enable.idempotence=trueacks=all 时,max.in.flight.requests.per.connection 的值在 1 ~ 5 之间,Kafka 仍然可以保证单分区的有序性。
  • 设置为 max.in.flight.requests.per.connection=1 时,这是最简单、最安全的配置,但会影响生产者的吞吐量。
  • 设置为 max.in.flight.requests.per.connection > 1 时,能保证单分区的有序性,同时可以提高吞吐量,但需要依赖 enable.idempotence=true(启用幂等性)。
  • 如果 enable.idempotence=false,即没有启用幂等性,那么 max.in.flight.requests.per.connection 的值必须设置为 1,这样才能保证单分区的有序性。

生产者避免消息重复发送

生产者避免消息重复发送有两种实现方案,包括开启幂等性和使用事务机制。

生产者开启幂等性

什么是 Kafka 的幂等性

  • Kafka 的幂等性是指 Producer (生产者) 无论向 Broker 发送多少条重复消息,Broker 端都只会持久化一条消息,保证了消息不重复。
  • 精确一次 (Exactly Once) = 开启幂等性 + 至少一次 (acks = -1 + 分区副本数 >= 2 + ISR 里应答的最小副本数量 >= 2)。
  • Kafka 官方文档中的幂等性详细介绍可以看 这里
参数名称参数描述
enable.idempotence是否开启幂等性,默认值为 true, 表示默认开启幂等性。
max.in.flight.requests.per.connection允许最多没有返回 ACK 应答的次数,默认为 5,开启幂等性后必须保证该参数值在 1 ~ 5 范围内。
  • 在 Kafka 中开启幂等性的配置参数是 enable.idempotence,默认值为 true,设置 false 会关闭幂等性。如果没有设置冲突的配置,默认情况下会启用幂等性。如果设置了冲突的配置,并且未显式启用幂等性,则会禁用幂等性。如果显式启用了幂等性,并且设置了冲突的配置,则会抛出 ConfigException 异常。

  • 当在生产者端设置 enable.idempotencetrue 时,生产者将确保数据流中只写入每条消息的一个副本,即可以确保生产者不会写入重复消息。如果设置为 false,由于代理失败等原因,生产者重试发送消息,可能会在数据流中写入重试消息的副本,即生产者可能会写入重复消息。

Kafka 启用幂等性要至少满足以下 4 个条件

  • (1) 在生产者端设置 enable.idempotence 参数:这个值必须为 true,即开启幂等性
  • (2) 在生产者端设置 max.in.flight.requests.per.connection 参数:这个值必须小于或等于 5
  • (3) 在生产者端设置 acks = -1:这是要求每条消息,必须是写入到所有 Replica(副本)之后,才能认为是发送成功
  • (4) 在生产者端设置 retries:这个值必须大于 0,即必须要有重试机制
生产者使用事务机制

Kafa 除了可以使用幂等性来解决生产者重复发送消息之外,还可以使用事务机制来解决,关于 Kafka 事务机制的详细介绍请看 这里。Kafka 的事务一共有以下 5 个 API:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 1.初始化事务
void initTransactions();

// 2.开启事务
void beginTransaction() throws ProducerFencedException;

// 3.在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException;

// 4.提交事务
void commitTransaction() throws ProducerFencedException;

// 5.放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

Kafka Broker 调优

Broker 的核心参数

Broker 的工作流程

Broker 的核心参数

Broker 服役新节点

Broker 退役旧节点

Broker 增加分区数量

  • 通过 kafka-topics.sh 脚本增加 Topic 的分区数量
1
$ kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --alter --topic first --partitions 3

特别注意

Kafka 的分区数量只能增加,不能减少。值得一提的是,Kafka 同样不支持减少已创建的主题的副本数量,但是可以通过 kafka-reassign-partitions.sh 脚本重新分配副本的方式来实现。

Broker 自动创建主题

  • 如果将 Broker 端的配置参数 auto.create.topics.enable 设置为 true(默认值是 true),那么当生产者向一个未创建的主题发送消息时,Broker 会自动创建一个分区数为 num.partitions(默认值为 1)、副本因子为 default.replication.factor(默认值为 1)的主题。

  • 除此之外,当一个消费者开始从未知主题中读取消息时,或者当任意一个客户端向未知主题发送元数据请求时,都会自动创建一个相应的主题。这种创建主题的方式是非预期的,增加了主题管理和维护的难度。在生产环境下,强烈建议将 auto.create.topics.enable 设置为 false

Kafka 消费者调优

消费者的核心参数

消费者组的初始化流程

消费者组的完整消费流程

消费者的核心参数

消费者提高吞吐量

Kafka 如何提高消费者的消费速度呢?

  • (1) 如果是 Kafka 消费能力不足,则可以考虑增加 Topic 的分区数量,并且同时增加消费组的消费者数量,这两个条件缺一不可,即 消费组的消费者数量 = Topic 的分区数量

  • (2) 如果是下游的数据处理不及时(有较大延迟),则可以提高消费者每批次拉取消息的数量(默认每批次拉取 500 条消息)。每批次拉取数据过少(拉取的数据量 / 数据处理时间 < 生产速度),会使消费者处理数据的速度小于生产者生产数据的速度,从而可能导致消息积压。

  • (3) 消费者提高吞吐量的相关配置参数
参数名称参数描述
fetch.max.bytes消费者获取服务器端一批消息的最大字节数,默认值为 50m。如果服务器端一批次的消息大于该值,仍然可以将这批消息拉取回来,所以这不是一个绝对最大值。消费者拉取一批次消息的大小受 message.max.bytes(Broker 配置)或者 max.message.bytes(Topic 配置)影响。
max.poll.records消费者每次调用 poll() 方法时,最多能拉取的消息数量,默认值为 500

消费者分区再平衡的优化

Kafka 消费者的分区再平衡(Partition Rebalancing)是指当消费者组中的成员发生变更时(如消费者加入、退出或故障),Kafka 自动调整分区与消费者之间的映射关系,以确保所有分区都会被消费者组中的成员消费。

消费者组的初始化流程

分区再平衡相关的配置参数

分区再平衡的优缺点

  • 优点:
    • 动态调整分区分配,支持弹性扩展。
    • 确保分区始终被有效消费。
  • 缺点:
    • 短暂中断:再平衡期间会暂停消费,导致短时间内消息未被处理。
    • 分区切换开销:消费者需要重新建立分区的连接和状态。

分区再平衡的优化措施

  • 使用静态成员 ID(group.instance.id),减少分区再平衡的频率。
  • 控制心跳和会话超时(heartbeat.interval.mssession.timeout.ms),避免误触发分区再平衡。
  • 通过配置参数 partition.assignment.strategy 来调整分区分配策略,以适应具体场景需求。

提示

  • 更多关于 Kafka 消费者的分区再平衡介绍请看 这里

Kafka 整体性能调优

提高整体吞吐量

为了提高 Kafka 整体的吞吐量,可以从以下四个角度进行优化。

(1) 增加 Topic 的分区数量

  • 通过 kafka-topics.sh 脚本增加 Topic 的分区数量(注意:分区数量只能增加,不能减少)
1
$ kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --alter --topic first --partitions 3

(2) 提高生产吞吐量

参数描述
buffer.memory发送消息的缓冲区大小,默认值是 32m,生产环境可以增加到 64m
batch.size缓冲区中一批数据的最大大小,默认值是 16k。如果 Batch 设置太小,会导致频繁发送网络请求,吞吐量下降;如果 Batch 设置太大,会导致一条消息需要等待很久才能够被发送出去,增加网络延迟。
linger.ms如果数据迟迟未达到 batch.size,那么 Sender 等待 linger.ms 之后就会发送消息。默认值是 0,表示立刻发送消息。生产环境建议设置 5 ~ 100 毫秒之间。如果 linger.ms 设置太小,会导致频繁发送网络请求,吞吐量下降;如果 linger.ms 设置太大,会导致一条消息需要等待很久才能被发送出去,增加网络延迟。
compression.type默认值是 none,表示不压缩直接发送消息。支持的压缩类型:nonegzipsnappylz4zstd。生产环境也可以使用 gzip 压缩,效率还是不错的,压缩之后可以减小数据量,提升吞吐量,但是会加大 Producer 端的 CPU 开销。

(3) 提高消费吞吐量

参数名称参数描述
fetch.max.bytes消费者获取服务器端一批消息的最大字节数,默认值为 50m。如果服务器端一批次的消息大于该值,仍然可以将这批消息拉取回来,所以这不是一个绝对最大值。消费者拉取一批次消息的大小受 message.max.bytes(Broker 配置)或者 max.message.bytes(Topic 配置)影响。
max.poll.records消费者每次调用 poll() 方法时,最多能拉取的消息数量,默认值为 500

(4) 增加消费者的数量

当增加了 Topic 的分区数量后,还需要同时增加消费组的消费者数量,这两个条件缺一不可,即 消费组的消费者数量 = Topic 的分区数量,这样才能提高消费吞吐量。

数据精确一次

在 Kafka 中,为了保证消息被精确一次发送和消费,必须满足以下全部条件,缺一不可。

(1) 生产者角度

  • acks 参数设置为 -1
  • 开启幂等性 enable.idempotence = true,默认开启。
  • 在生产端使用事务机制发送消息。

(2) Broker 角度

  • 分区的副本数量大于等于 2(--replication-factor 2)。
  • ISR 里应答的最小副本数量大于等于 2 (min.insync.replicas = 2)。

(3) 消费者角度

  • 事务机制 + 手动提交 Offset(enable.auto.commit = false)。
  • 消费者输出的目标存储系统必须支持事务(比如 MySQL、Kafka)。

合理设置分区数量

如何为某个 Topic 合理设置分区数量呢?以下是估算分区数量的方法:

  • (1) 创建一个只有 1 个分区的 Topic。
  • (2) 测试这个 Topic 的 Producer 吞吐量和 Consumer 吞吐量。
  • (3) 假设它们吞吐量的值分别是 Tp 和 Tc,单位是 MB/s。
  • (4) 然后,假设总的目标吞吐量是 Tt,那么 分区数 = Tt /min (Tp, Tc)
  • (5) 比如:Producer 吞吐量是 20m/s,Consumer 吞吐量是 50m/s,期望吞吐量 100m/s,那么 分区数 = 100 / min (20, 50) = 5 个分区

提示

  • Kafka 的分区数一般设置为:3 ~ 10 个。
  • Kafka 的分区数并不是越多越好,也不是越少越好,需要搭建完 Kafka 集群,然后对集群进行压测,再根据压测结果调整分区的数量。

单条消息大于 1M

当 Kafka 单条消息的大小大于 1M,那么可以通过以下参数进行优化:

Broker 节点宕机

在 Kafka 生产环境(集群部署)中,如果某个 Broker 节点挂掉,正常的处理方法如下:

  • (1) 先尝试重新启动 Broker 节点,如果能够正常启动,那问题直接解决。
  • (2) 如果无法正常重启,考虑增加内存、增加 CPU、增加网络带宽。
  • (3) 如果是将整个 Broker 节点误删除掉
    • 如果分区的副本数大于等于 2,那么可以按照 Kafka 服役新节点的方式重新服役一个新节点,并执行负载均衡,将部分数据迁移到新节点。
    • 如果分区的副本只有一个,那么只能 “批量重导”,也就是写个临时程序将丢失的那批数据查询出来,然后重新将消息写入 Kafka 里面。

Kafka 集群压力测试

Kafka 的性能调优(包括生产者、Broker、消费者)并不是一成不变的,需要根据不同的业务场景来调整参数。比如,A 项目单条消息的大小为 1K,而 B 项目单条消息的大小为 1M,那么两者的性能调优参数是不一样的。为了得到更符合具体场景的性能调优参数,Kafka 官方提供了压测脚本,可以很方便地对 Kafka 进行压测。

  • 生产者压测脚本:kafka-producer-perf-test.sh
  • 消费者压测脚本:kafka-consumer-perf-test.sh

上述脚本支持对 Kafka 集群进行压测,如下图所示:

生产者压力测试
  • (1) 创建一个新的 Topic,并设置 3 个分区和 3 个副本
1
$ ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --replication-factor 3 --partitions 3 --topic bench
  • (2) 执行生产者的压测脚本
1
$ ./kafka-producer-perf-test.sh --topic bench --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 batch.size=16384 linger.ms=0

生产者压测脚本的参数:

参数名称参数描述
--record-size表示单条信息有多大,单位是字节(Byte)。
--num-records表示总共发送多少条信息。
--throughput表示每秒发送多少条信息。当设置为 -1,则表示不限流,即尽可能快地生产数据,可以测出生产者的最大吞吐量。
--producer-props用于配置生产者的相关参数,比如 batch.size 配置为 16k

生产者压测的输出结果:

1
2
3
4
5
6
7
8
37021 records sent, 7401.2 records/sec (7.23 MB/sec), 1136.0 ms avg latency, 1453.0 ms max latency.
50535 records sent, 10107.0 records/sec (9.87 MB/sec), 1199.5 ms avg latency, 1404.0 ms max latency.
47835 records sent, 9567.0 records/sec (9.34 MB/sec), 1350.8 ms avg latency, 1570.0 ms max latency.
......
42390 records sent, 8444.2 records/sec (8.25 MB/sec), 3372.6 ms avg latency, 4008.0 ms max latency.
37800 records sent, 7558.5 records/sec (7.38 MB/sec), 4079.7 ms avg latency, 4758.0 ms max latency.
33570 records sent, 6714.0 records/sec (6.56 MB/sec), 4549.0 ms avg latency, 5049.0 ms max latency.
1000000 records sent, 9180.713158 records/sec (8.97 MB/sec), 1894.78 ms avg latency, 5049.00 ms max latency, 1335 ms 50th, 4128 ms 95th, 4719 ms 99th, 5030 ms 99.9th
  • (3) 调整 batch.size 大小为 32k,然后进行压测
1
$ ./kafka-producer-perf-test.sh --topic bench --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 batch.size=32768 linger.ms=0
1
2
3
4
5
6
7
49922 records sent, 9978.4 records/sec (9.74 MB/sec), 64.2 ms avg latency, 340.0 ms max latency.
49940 records sent, 9988.0 records/sec (9.75 MB/sec), 15.3 ms avg latency, 31.0 ms max latency.
50018 records sent, 10003.6 records/sec (9.77 MB/sec), 16.4 ms avg latency, 52.0 ms max latency.
......
49960 records sent, 9992.0 records/sec (9.76 MB/sec), 17.2 ms avg latency, 40.0 ms max latency.
50090 records sent, 10016.0 records/sec (9.78 MB/sec), 16.9 ms avg latency, 47.0 ms max latency.
1000000 records sent, 9997.600576 records/sec (9.76 MB/sec), 20.20 ms avg latency, 340.00 ms max latency, 16 ms 50th, 30 ms 95th, 168 ms 99th, 249ms 99.9th.
  • (4) 调整 batch.size 大小为 4k,然后进行压测
1
$ ./kafka-producer-perf-test.sh --topic bench --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 batch.size=4096 linger.ms=0
1
2
3
4
5
6
15598 records sent 3117.1 records/sec (3.04 MB/sec) 1878.3 ms avg latency, 3458.0 ms max latency.
17748 records sent 3549.6 records/sec (3.47 MB/sec) 5072.5 ms avg latency, 6705.0 ms max latency.
18675 records sent 3733.5 records/sec (3.65 MB/sec) 6800.9 ms avg latency, 7052.0 ms max latency.
......
19125 records sent, 3825.0 records/sec (3.74 MB/sec), 6416.5 ms avg latency, 7023.0 ms max latency.
1000000 records sent, 3660.201531 records/sec (3.57 MB/sec), 6576.68 ms avg latency, 7677.00 ms max latency, 6745 ms 50th, 7298 ms 95th, 7507 ms 99th, 7633 ms 99.9th.
  • (5) 调整 linger.ms 时间为 50ms,然后进行压测
1
$ ./kafka-producer-perf-test.sh --topic bench --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 batch.size=4096 linger.ms=50
1
2
3
4
5
6
7
16804 records sent, 3360.1 records/sec (3.28 MB/sec), 1841.6 ms avg latency, 3338.0 ms max latency·
18972 records sent, 3793.6 records/sec (3.70 MB/sec), 4877.7 ms avg latency, 6453.0 ms max latency.
19269 records sent, 3852.3 records/sec (3,76 MB/sec), 6477.9 ms avg latency, 6686.0 ms max latency.
......
17073 records sent, 3414.6 records/sec (3.33 MB/sec), 6987.7 ms avg latency, 7353.0 ms max latency.
19326 records sent, 3865.2 records/sec (3.77 MB/sec), 6756.5 ms avg latency, 7357.0 ms max latency.
1000000 records sent, 3842.754486 records/sec (3.75 MB/sec), 6272.49 ms avg latency, 7437.00 ms max latency, 6308 ms 50th, 6880 ms 95th, 7289 ms 99th, 7387 ms 99.9th.
  • (6) 调整 compression.typesnappy,然后进行压测
1
$ ./kafka-producer-perf-test.sh --topic bench --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 batch.size=4096 linger.ms=50 compression.type=snappy
1
2
3
4
5
6
7
17244 records sent, 3446.0 records/sec (3.37 MB/sec), 5207.0 ms avg latency, 6861.0 ms max latency.
18873 records sent, 3774.6 records/sec (3.69 MB/sec), 6865.0 ms avg latency, 7094.0 ms max latency·
18378 records sent, 3674.1 records/sec (3.59 MB/sec), 6579.2 ms avg latency, 6738.0 ms max latency.
......
17631 records sent, 3526.2 records/sec (3.44 MB/sec), 6671.3 ms avg latency, 7566.0 ms max latency.
19116 records sent, 3823.2 records/sec (3.73 MB/sec), 6739.4 ms avg latency, 7630.0 ms max latency.
1000000 records sent, 3722.925028 records/sec (3.64 MB/sec), 6467.75 ms avg latency, 7727.00 ms max latency, 6440 ms 50th, 7308 ms 95th, 7553 ms 99th, 7665 ms 99.9th.
  • (7) 调整 compression.typezstd,然后进行压测
1
$ ./kafka-producer-perf-test.sh --topic bench --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 batch.size=4096 linger.ms=50 compression.type=zstd
1
2
3
4
5
6
7
23820 records sent, 4763.0 records/sec (4.65 MB/sec), 1580.2 ms avg latency, 2651.0 ms max latency.
29340 records sent, 5868.0 records/sec (5.73 MB/sec), 3666.0 ms avg latency, 4752.0 ms max latency·
28950 records sent, 5788.8 records/sec (5.65 MB/sec), 5785.2 ms avg latency, 6865.0 ms max latency.
......
29580 records sent, 5916.0 records/sec (5.78 MB/sec), 6907.6 ms avg latency, 7432.0 ms max latency.
29925 records sent, 5981.4 records/sec (5.84 MB/sec), 6948.9 ms avg latency, 7541.0 ms max latency.
1000000 records sent, 5733.583318 records/sec (5.60 MB/sec), 6824.75 ms avg latency, 7595.00 ms max latency, 7067 ms 50th, 7400 ms 95th, 7500 ms 99th, 7552 ms 99.9th.
  • (8) 调整 compression.typegzip,然后进行压测
1
$ ./kafka-producer-perf-test.sh --topic bench --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 batch.size=4096 linger.ms=50 compression.type=gzip
1
2
3
4
5
6
7
8
27170 records sent, 5428.6 records/sec (5.30 MB/sec), 1374.0 ms avg latency, 2311.0 ms max latency.
31050 records sent, 6210.0 records/sec (6.06 MB/sec), 3183.8 ms avg latency, 4228.0 ms max latency.
32145 records sent, 6427.7 records/sec (6.28 MB/sec), 5028.1 ms avg latency, 6042.0 ms max latency.
......
31710 records sent, 6342.0 records/sec (6.19 MB/sec), 6457.1 ms avg latency, 6777.0 ms max latency.
31755 records sent, 6348.5 records/sec (6.20 MB/sec), 6498.7 ms avg latency, 6780.0 ms max latency.
32760 records sent, 6548.1 records/sec (6.39 MB/sec), 6375.7 ms avg latency, 6822.0 ms max latency.
1000000 records sent, 6320.153706 records/sec (6.17 MB/sec), 6155.42 ms avg latency, 6943.00 ms max latency, 6437 ms 50th, 6774 ms 95th, 6863 ms 99th, 6912 ms 99.9th.
  • (9) 调整 compression.typelz4,然后进行压测
1
$ ./kafka-producer-perf-test.sh --topic bench --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 batch.size=4096 linger.ms=50 compression.type=lz4
1
2
3
4
5
6
7
8
16696 records sent, 3339.2 records/sec (3.26 MB/sec), 1924.5 ms avg latency, 3355.0 ms max latency.
19647 records sent, 3928.6 records/sec (3.84 MB/sec), 4841.5 ms avg latency, 6320.0 ms max latency·
20142 records sent, 4028.4 records/sec (3.93 MB/sec), 6203.2 ms avg latency, 6378.0 ms max latency.
......
20130 records sent, 4024.4 records/sec (3.93 MB/sec), 6073.6 ms avg latency, 6396.0 ms max latency.
19449 records sent, 3889.8 records/sec (3.80 MB/sec), 6195.6 ms avg latency, 6500.0 ms max latency.
19872 records sent, 3972.8 records/sec (3.88 MB/sec), 6274.5 ms avg latency, 6565.0 ms max latency.
1000000 records sent, 3956.087430 records/sec (3.86 MB/sec), 6085.62 ms avg latency, 6745.00 ms max latency, 6212 ms 50th, 6524 ms 95th, 6610 ms 99th, 6695 ms 99.9th.
  • (10) 调整 buffer.memory64m,然后进行压测
1
$ ./kafka-producer-perf-test.sh --topic bench --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 batch.size=4096 linger.ms=50 buffer.memory=67108864
1
2
3
4
5
6
7
8
20170 records sent, 4034.0 records/sec (3.94 MB/sec), 1669.5 ms avg latency, 3040.0 ms max latency.
21996 records sent, 4399.2 records/sec (4.30 MB/sec), 4407.9 ms avg latency, 5806.0 ms max latency.
22113 records sent, 4422.6 records/sec (4.32 MB/sec), 7189.0 ms avg latency, 8623.0 ms max latency.
......
19818 records sent, 3963.6 records/sec (3.87 MB/sec), 12416.0 ms avg latency, 12847.0 ms max latency.
20331 records sent, 4062.9 records/sec (3.97 MB/sec), 12400.4 ms avg latency, 12874.0 ms max latency.
19665 records sent, 3933.0 records/sec (3.84 MB/sec), 12303.9 ms avg latency, 12838.0 ms max latency.
1000000 records sent, 4020.100503 records/sec (3.93 MB/sec), 11692.17 ms avg latency, 13796.00 ms max latency, 12238 ms 50th, 12949 ms 95th, 13691 ms 99th, 13766 ms 99.9th.

生产者压测结果汇总

生产者核心参数生产者的吞吐量说明
batch.size=16384 linger.ms=08.97 MB/sec
batch.size=32768 linger.ms=09.76 MB/sec
batch.size=4096 linger.ms=03.57 MB/sec
batch.size=4096 linger.ms=503.75 MB/sec 在测试环境进行压测时,可能感受不到 linger.ms 参数调整带来的影响;但在生产环境下,linger.ms 参数对生产吞吐量的影响较大。
batch.size=4096 linger.ms=50 compression.type=snappy3.64 MB/sec
batch.size=4096 linger.ms=50 compression.type=zstd5.60 MB/sec
batch.size=4096 linger.ms=50 compression.type=gzip6.17 MB/sec
batch.size=4096 linger.ms=50 compression.type=lz43.86 MB/sec
batch.size=4096 linger.ms=50 buffer.memory=671088643.93 MB/sec
消费者压力测试
  • (1) 执行消费者的压测脚本
1
$ ./kafka-consumer-perf-test.sh --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 --topic bench --messages 1000000 --consumer.config ../config/consumer.properties

消费者压测脚本的参数:

参数名称参数描述
--bootstrap-server指定 Kafka 集群地址。
--topic指定 Topic 的名称。
--messages总共要消费的消息数量。
--consumer.config指定消费者使用的配置文件。

消费者压测的输出结果:

1
2
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2022-09-08 21:58:26:171, 2022-09-08 21:58:33:321, 977.0166, 136.6457, 1000465, 139925.1748, 415, 6735, 145.0656, 148547.1418
  • (2) 在 Kafka 的配置文件 config/consumer.properties 中,更改消费者一次拉取消息的最大数量为 2000,然后执行压测
1
max.poll.records=2000
1
$ ./kafka-consumer-perf-test.sh --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 --topic bench --messages 1000000 --consumer.config ../config/consumer.properties
1
2
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2022-09-08 21:18:06:268, 2022-09-08 21:18:12:863, 977.5146, 148.2206, 1000975, 151777.8620, 358, 6237, 156.7283, 160489.8188
  • (3) 在 Kafka 的配置文件 config/consumer.properties 中,更改消费者拉取一批数据的最大大小为 100m,然后执行压测
1
fetch.max.bytes=104857600
1
$ ./kafka-consumer-perf-test.sh --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 --topic bench --messages 1000000 --consumer.config ../config/consumer.properties
1
2
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2022-09-08 21:26:13:203, 2022-09-08 21:26:19:662, 977.5146, 151.3415, 1000975, 154973.6801, 362, 6097, 160.3272, 164175.0041

消费者压测结果汇总

消费者的核心参数消费者的吞吐量说明
max.poll.records=500 fetch.max.bytes=52428800136.6457 MB/sec
max.poll.records=2000 fetch.max.bytes=52428800148.2206 MB/sec
max.poll.records=2000 fetch.max.bytes=104857600151.3415 MB/sec