Kafka 入门教程之五
大纲
前言
学习资源
Kafka 配置更新模式
在 Kafka 官方文档 中,提到 Kafka 的配置更新有三种模式,如下所示:
从 Kafka 的 1.1
版本开始,一些 Broker 配置可以在无需重启 Broker 的情况下进行更新。可以参考 Broker Configs 中的 Dynamic Update Mode
(动态更新模式),了解每个 Broker 配置的更新模式。
- read-only(只读):需要重启 Broker 才能更新配置。
- per-broker(单个 Broker 动态更新):可以为每个 Broker 动态更新配置,不需要重启 Broker。
- cluster-wide(集群范围动态更新):可以作为集群范围的默认值动态更新,也可以为单个 Broker 设置不同的值(例如用于测试),不需要重启 Broker。
Kafka 生产环境调优
硬件配置选择
业务场景说明
- 每天的数据量:100 万日活,每人每天 100 条日志,每天总共的日志条数是 100 万 * 100 条 = 1 亿条(中型规模的公司)。
- 每秒钟的日志条数:1 亿 / 24 小时 / 60 分 / 60 秒 = 1150 条 / 每秒钟。
- 每条日志的大小:0.5k ~ 2k(取 1k)。
- 每天的数据大小:1 亿条 * 1k ≈ 100g。
- 每秒钟的数据量:1150 条 / 每秒钟 * 1k ≈ 1m/s。
- 高峰期每秒钟的日志条数:1150 条 * 20 倍 = 23000 条。
- 高峰期每秒钟的数据量:20MB/s。
服务器数量选择
- 计算公式:服务器数量 = 2 * (生产者每秒钟的峰值生产速率 * 副本数量 / 100) + 1
- 比如:3 台 = 2 * (20m/s * 2 / 100) + 1
磁盘选择
- Kafka 底层主要是顺序写,固态硬盘和机械硬盘的顺序写速度差不多,建议选择普通的机械硬盘。
- 每天的数据大小:1 亿条 * 1k ≈ 100g。
- 每天的总数据大小:100g * 2 个副本 * 日志保存时间 3 天 / 磁盘使用率 0.7 ≈ 1T。
- 根据以上计算结果,建议三台服务器的硬盘总大小大于等于 1T。
CPU 选择
num.io.threads = 8
:负责写磁盘的线程数量,这个参数要占 CPU 总核数的 50%。num.replica.fetchers = 1
:副本同步时用于拉取数据的线程数量,这个参数占 CPU 总核数的 50% 的 1/3。num.network.threads = 3
:处理网络请求的线程数量,这个参数占 CPU 总核数的 50% 的 2/3。
提示
建议使用 32 核的 CPU,这样可以最大程度地发挥 Kafka 的性能。
网络选择
- 选择千兆网卡即可。
- 网络带宽 = 峰值吞吐量 ≈ 20MB/s。
- 100Mbps 的单位是 bit;1byte = 8bit;10M/s 的单位是 byte;100Mbps/8 = 12.5M/s。
- 一般百兆的网卡(100Mbps)、千兆的网卡(1000Mbps)、万兆的网卡(10000Mbps)。
内存选择
提示
Kafka 的内存组成:堆内存(Kafka 内部配置) + 页缓存(操作系统内存)。
堆内存查看
- 查看 Kafka 的进程号
1 | $ jps |
1 | 58882 Kafka |
- 根据 Kafka 进程号,查看 Kafka 的 GC 情况
1 | $ jstat -gc 58882 1s 10 |
1 | S0C S1C S0U S1U EC EU OC OU MC MU CCSC CCSU YGC YGCT FGC FGCT GCT |
参数 | 描述 |
---|---|
S0C | 第一个幸存区的大小 |
S1C | 第二个幸存区的大小 |
S0U | 第一个幸存区的使用大小 |
S1U | 第二个幸存区的使用大小 |
EC | 伊甸园区的大小 |
EU | 伊甸园区的使用大小 |
OC | 老年代大小 |
OU | 老年代使用大小 |
MC | 方法区大小 |
MU | 方法区使用大小 |
CCSC | 压缩类空间大小 |
CCSU | 压缩类空间使用大小 |
YGC | 年轻代垃圾回收次数 |
YGCT | 年轻代垃圾回收消耗时间 |
FGC | 老年代垃圾回收次数 |
FGCT | 老年代垃圾回收消耗时间 |
GCT | 垃圾回收消耗总时间 |
- 根据 Kafka 进程号,查看 Kafka 的堆内存信息
1 | # 较低版本的 JDK(比如 JDK 8) |
1 | Attaching to process ID 58882, please wait... |
Heap Configuration(堆内存的配置)的参数说明:
参数 | 描述 |
---|---|
MinHeapFreeRatio | 堆中空闲空间的最小比率。 |
MaxHeapFreeRatio | 堆中空闲空间的最大比率。 |
MaxHeapSize | 堆的最大大小。 |
NewSize | 年轻代的初始大小。 |
MaxNewSize | 年轻代的最大大小。 |
OldSize | 老年代的初始大小。 |
NewRatio | 年轻代与老年代容量的比率。 |
SurvivorRatio | Eden 区与 Survivor 区容量的比率。 |
PermSize | 永久代的初始大小(适用于 Java 8 及之前)。 |
MaxPermSize | 永久代的最大大小(适用于 Java 8 及之前)。 |
G1HeapRegionSize | G1 垃圾回收器的堆区域大小(如果使用 G1 收集器)。 |
Heap Usage(堆内存的使用情况)的参数说明:
参数 | 描述 |
---|---|
PS Young Generation | 年轻代的堆使用情况。 |
Eden Space | Eden 区的容量、已用空间和空闲空间。 |
From Space | From Survivor 区的容量、已用空间和空闲空间。 |
To Space | To Survivor 区的容量、已用空间和空闲空间。 |
PS Old Generation | 老年代的堆使用情况。 |
Capacity (Old Generation) | 老年代的总容量。 |
Used (Old Generation) | 老年代的已用空间。 |
Free (Old Generation) | 老年代的空闲空间。 |
Interned Strings 的参数说明:
参数 | 描述 |
---|---|
String Count | 输出 JVM 中字符串池中缓存的字符串数量。 |
Total Size | 字符串池中缓存字符串的总大小(以字节为单位)。 |
- 或者使用
jcmd
工具,查看 Kafka 的堆内存信息
1 | $ jcmd 58882 GC.heap_info |
1 | garbage-first heap total 1048576K, used 185072K [0x00000000c0000000, 0x0000000100000000) |
堆内存调整
- Kafka 堆内存建议每个节点设置 10G ~ 15G,堆内存大小可以在
kafka-server-start.sh
脚本文件中修改:
1 | if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then |
页缓存估算
- 页缓存是 Linux 操作系统的内存,一般只需要保证 1 个 SeGment(1G 大小)中 25% 的数据可以存放在内存中就行了。
- 计算公式:每个节点的页缓存大小 = (分区数量 * 1G * 25%) / 节点数。
- 比如:3 个节点和 10 个分区,那么页缓存大小 = (10 * 1G * 25%) / 3 ≈ 1G。
- 根据以上计算结果,建议服务器内存大于等于 11G,其中有 10G 是 Kafka 的堆内存使用,剩余的 1G 是页缓存使用。
Kafka 生产者调优
生产者的核心参数
Kafka 生产者的工作流程
Kafka 生产者的核心参数
生产者提高吞吐量
为了让生产者提高吞吐量(发送消息的效率),可以优化以下几个参数:
参数名称 | 参数描述 |
---|---|
buffer.memory | RecordAccumulator 缓冲区的总大小,默认值为 32m 。 |
batch.size | 缓冲区中一批数据的最大大小,默认值为 16k 。适当增加该值,可以提高吞吐量;但是,如果该值设置得太大,会导致数据传输延迟增加。 |
linger.ms | 如果数据量迟迟未达到 batch.size ,Sender 线程等待 linger.ms 之后就会发送数据。单位是 ms ,默认值为 0ms ,表示没有延迟。生产环境建议该值大小为 5 ~ 100ms 之间。 |
compression.type | 生产者发送的所有数据的压缩方式。默认值为 none ,也就是不压缩。支持压缩类型:none 、gzip 、snappy 、lz4 和 zstd 。 |
生产者保证数据可靠性
参数名称 | 参数描述 |
---|---|
acks | 0 :生产者发送过来的数据,不需要等数据落盘才应答。1 :生产者发送过来的数据,Leader 收到数据后才应答。-1 (all) :生产者发送过来的数据,Leader 和 ISR 队列里面的所有节点收到数据后才应答。默认值是 -1 ,而且 -1 和 all 是等价的。 |
retries | 发送消息的失败重试次数,建议设置一个很大很大的值,表示一旦消息发送失败,就无限重试。 |
提示
至少一次(At Least Once) = ACK 级别设置为 -1
+ 分区副本数量大于等于 2 + ISR 里应答的最小副本数量大于等于 2。
生产者保证数据有序性
- 在 Kafka 单分区内,数据是有序(需要满足一定的条件)。
- 在单个分区内,Kafka 保证消息是按生产者写入的顺序进行存储的,并且消费者在读取时也是按这个顺序读取的。因此,只要满足一定的条件,单分区内的数据是有序的。
- 为了保证单分区的有序性,需要满足以下任意一个条件:
- 第一种情况:
max.in.flight.requests.per.connection=1
,不需要开启幂等性,但会影响生产者的吞吐量。 - 第二种情况:开启幂等性
enable.idempotence=true
,并且设置max.in.flight.requests.per.connection
的值在 1 ~ 5 之间。
- 第一种情况:
- 在 Kafka 多分区内,分区与分区之间的数据是无序。
- 在多分区的情况下,Kafka 会将消息分布到不同的分区中。由于分区之间是并行处理的,Kafka 不会保证分区之间的消息顺序。因此,从全局视角来看,多分区的数据是无序的。
- 在 MQ(比如 Kafka)中,保证消息的顺序性,最关键的是严格保证生产者、MQ 队列、消费者这三者是一对一的关系。
- 要保证一个生产者只对应一个 Topic,一个 Topic 只对应一个 Partition,并且一个 Partition 只对应一个消费者。
- 比如,生产者在发送消息的时候,可以指定一个 Key,比如指定某个订单 的 ID 作为 Key,那么这个订单相关的所有消息,一定都会被分发到同一个 Partition 中去,而且这个 Partition 中的数据一定是有顺序的。当消费者从 Partition 中读取消息的时候,也一定是有顺序的。另外,如果消费者是单线程进行消费处理,而处理比较耗时的话,假设处理一条消息耗时几十毫秒,那么 1 秒钟只能处理几十条消息,这吞吐量太低了。建议在消费者内部使用单线程进行消费时,将消息写入 N 个内存队列,并且通过哈希算法将拥有相同 Key 的消息都写入到同一个内存队列里面;最后启动 N 个线程,每个线程分别消费一个内存队列即可,这就能保证消息的顺序性,也能大大提高消费消息的吞吐量。整个处理流程如下图所示:
生产者避免单分区乱序
Kafka 生产者解决单分区内的乱序问题,主要是依赖以下两个参数:
参数名称 | 参数描述 |
---|---|
enable.idempotence | 是否开启幂等性,默认值为 true , 表示默认开启幂等性。 |
max.in.flight.requests.per.connection | 允许最多没有返回 ACK 应答的次数,默认为 5 ,开启幂等性后必须保证该参数值在 1 ~ 5 范围内。 |
注意事项
- 只开启
enable.idempotence
可以防止失败重试时的出现消息重复,但可能不能完全解决单分区内的乱序问题(这取决于max.in.flight.requests.per.connection
参数的设置)。 - 要彻底解决 Kafka 单分区内的乱序问题,
enable.idempotence
和max.in.flight.requests.per.connection
这两个参数需要配合使用,这样才可以确保消息的顺序性和幂等性,但是会牺牲一定的生产吞吐量。 - 特别注意,这两个配置参数只能保证 Kafka 单分区内的数据顺序,多分区之间的数据顺序 Kafka 无法保证。
参数说明
max.in.flight.requests.per.connection
是 Kafka Producer 的一个配置参数,用于控制在同一个 TCP 连接上未被 Broker 确认(ACK)的请求的最大数量。默认值是5
,意味着在同一个连接上,最多可以有 5 个未确认的请求。- 如果启用了消息重试(
retries > 0
,默认启用重试机制),并且设置的max.in.flight.requests.per.connection > 1
,在某些情况下可能会导致单分区内的消息乱序。例如:一个较早的请求失败并被重试时,后续的请求可能已经被成功发送并确认。 - 当
enable.idempotence=true
(启用幂等性),max.in.flight.requests.per.connection
的值不能超过5
,否则 Kafka 会拒绝启动 Producer。在启用幂等性后默认配置max.in.flight.requests.per.connection=5
,这是经过优化的平衡设置。
版本区别
Kafka 在
1.x
版本之前可以保证数据单分区的有序性,条件如下:max.in.flight.requests.per.connection = 1
,不需要考虑是否开启幂等性。
Kafka 在
1.x
及以后版本可以保证数据单分区的有序性,条件如下:- 未开启幂等性
max.in.flight.requests.per.connection
需要必须设置为 1。
- 开启幂等性
max.in.flight.requests.per.connection
的值必须设置在 1 ~ 5 之间。- 原因说明:因为在 Kafka
1.x
版本以后,启用幂等性后,Kafka 服务端会缓存 Producer 发来的最近 5 个 Request 的元数据,因此无论如何,都可以保证最近 5 个 Request 的数据都是有序的(如下图所示)。
- 未开启幂等性
情景分析
max.in.flight.requests.per.connection
的值为 1(最严格的保证)- 每次只有一个请求正在处理,前一个请求完成后,才会发送下一个请求。
- 在这种配置下,绝对可以保证单分区的有序性,但会牺牲生产者的吞吐量。
max.in.flight.requests.per.connection
的值在 1 ~ 5 之间- 配置的前提条件:必须设置
enable.idempotence=true
和acks=all
。 - 当满足上述前提条件时,Kafka 的生产者重试机制会确保消息的顺序,即使发生失败重试,也能按顺序处理请求。
- Kafka
1.1
及以后版本,启用幂等性后,生产者可以保证单分区的顺序性,只要max.in.flight.requests.per.connection
的值在 1 ~ 5 之间即可。 - 配置风险:
- 如果
enable.idempotence=false
,同时max.in.flight.requests.per.connection > 1
,则单分区内可能会出现消息乱序。这是因为并行发送的请求,在发送失败后会重新发送,后发送的请求可能先完成,从而打破顺序。 - 如果
enable.idempotence=false
,为了保证单分区的顺序性,max.in.flight.requests.per.connection
的值必须设置为 1。
- 如果
- 配置的前提条件:必须设置
总结
- 当
enable.idempotence=true
且acks=all
时,max.in.flight.requests.per.connection
的值在 1 ~ 5 之间,Kafka 仍然可以保证单分区的有序性。 - 设置为
max.in.flight.requests.per.connection=1
时,这是最简单、最安全的配置,但会影响生产者的吞吐量。 - 设置为
max.in.flight.requests.per.connection > 1
时,能保证单分区的有序性,同时可以提高吞吐量,但需要依赖enable.idempotence=true
(启用幂等性)。 - 如果
enable.idempotence=false
,即没有启用幂等性,那么max.in.flight.requests.per.connection
的值必须设置为 1,这样才能保证单分区的有序性。
生产者避免消息重复发送
生产者避免消息重复发送有两种实现方案,包括开启幂等性和使用事务机制。
生产者开启幂等性
什么是 Kafka 的幂等性
- Kafka 的幂等性是指 Producer (生产者) 无论向 Broker 发送多少条重复消息,Broker 端都只会持久化一条消息,保证了消息不重复。
- 精确一次 (Exactly Once) = 开启幂等性 + 至少一次 (
acks = -1
+分区副本数 >= 2
+ISR 里应答的最小副本数量 >= 2
)。 - Kafka 官方文档中的幂等性详细介绍可以看 这里。
参数名称 | 参数描述 |
---|---|
enable.idempotence | 是否开启幂等性,默认值为 true , 表示默认开启幂等性。 |
max.in.flight.requests.per.connection | 允许最多没有返回 ACK 应答的次数,默认为 5 ,开启幂等性后必须保证该参数值在 1 ~ 5 范围内。 |
在 Kafka 中开启幂等性的配置参数是
enable.idempotence
,默认值为true
,设置false
会关闭幂等性。如果没有设置冲突的配置,默认情况下会启用幂等性。如果设置了冲突的配置,并且未显式启用幂等性,则会禁用幂等性。如果显式启用了幂等性,并且设置了冲突的配置,则会抛出 ConfigException 异常。当在生产者端设置
enable.idempotence
为true
时,生产者将确保数据流中只写入每条消息的一个副本,即可以确保生产者不会写入重复消息。如果设置为false
,由于代理失败等原因,生产者重试发送消息,可能会在数据流中写入重试消息的副本,即生产者可能会写入重复消息。
Kafka 启用幂等性要至少满足以下 4 个条件
- (1) 在生产者端设置
enable.idempotence
参数:这个值必须为true
,即开启幂等性 - (2) 在生产者端设置
max.in.flight.requests.per.connection
参数:这个值必须小于或等于 5 - (3) 在生产者端设置
acks = -1
:这是要求每条消息,必须是写入到所有 Replica(副本)之后,才能认为是发送成功 - (4) 在生产者端设置
retries
:这个值必须大于 0,即必须要有重试机制
生产者使用事务机制
Kafa 除了可以使用幂等性来解决生产者重复发送消息之外,还可以使用事务机制来解决,关于 Kafka 事务机制的详细介绍请看 这里。Kafka 的事务一共有以下 5 个 API:
1 | // 1.初始化事务 |
Kafka Broker 调优
Broker 的核心参数
Broker 的工作流程
Broker 的核心参数
Broker 服役新节点
Broker 退役旧节点
Broker 增加分区数量
- 通过
kafka-topics.sh
脚本增加 Topic 的分区数量
1 | $ kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --alter --topic first --partitions 3 |
特别注意
Kafka 的分区数量只能增加,不能减少。值得一提的是,Kafka 同样不支持减少已创建的主题的副本数量,但是可以通过 kafka-reassign-partitions.sh
脚本重新分配副本的方式来实现。
Broker 自动创建主题
如果将 Broker 端的配置参数
auto.create.topics.enable
设置为true
(默认值是true
),那么当生产者向一个未创建的主题发送消息时,Broker 会自动创建一个分区数为num.partitions
(默认值为 1)、副本因子为default.replication.factor
(默认值为 1)的主题。除此之外,当一个消费者开始从未知主题中读取消息时,或者当任意一个客户端向未知主题发送元数据请求时,都会自动创建一个相应的主题。这种创建主题的方式是非预期的,增加了主题管理和维护的难度。在生产环境下,强烈建议将
auto.create.topics.enable
设置为false
。
Kafka 消费者调优
消费者的核心参数
消费者组的初始化流程
消费者组的完整消费流程
消费者的核心参数
消费者提高吞吐量
Kafka 如何提高消费者的消费速度呢?
(1) 如果是 Kafka 消费能力不足,则可以考虑增加 Topic 的分区数量,并且同时增加消费组的消费者数量,这两个条件缺一不可,即
消费组的消费者数量 = Topic 的分区数量
。(2) 如果是下游的数据处理不及时(有较大延迟),则可以提高消费者每批次拉取消息的数量(默认每批次拉取 500 条消息)。每批次拉取数据过少(拉取的数据量 / 数据处理时间 < 生产速度),会使消费者处理数据的速度小于生产者生产数据的速度,从而可能导致消息积压。
- (3) 消费者提高吞吐量的相关配置参数
参数名称 | 参数描述 |
---|---|
fetch.max.bytes | 消费者获取服务器端一批消息的最大字节数,默认值为 50m 。如果服务器端一批次的消息大于该值,仍然可以将这批消息拉取回来,所以这不是一个绝对最大值。消费者拉取一批次消息的大小受 message.max.bytes (Broker 配置)或者 max.message.bytes (Topic 配置)影响。 |
max.poll.records | 消费者每次调用 poll() 方法时,最多能拉取的消息数量,默认值为 500 。 |
消费者分区再平衡的优化
Kafka 消费者的分区再平衡(Partition Rebalancing)是指当消费者组中的成员发生变更时(如消费者加入、退出或故障),Kafka 自动调整分区与消费者之间的映射关系,以确保所有分区都会被消费者组中的成员消费。
消费者组的初始化流程
分区再平衡相关的配置参数
分区再平衡的优缺点
- 优点:
- 动态调整分区分配,支持弹性扩展。
- 确保分区始终被有效消费。
- 缺点:
- 短暂中断:再平衡期间会暂停消费,导致短时间内消息未被处理。
- 分区切换开销:消费者需要重新建立分区的连接和状态。
分区再平衡的优化措施
- 使用静态成员 ID(
group.instance.id
),减少分区再平衡的频率。 - 控制心跳和会话超时(
heartbeat.interval.ms
和session.timeout.ms
),避免误触发分区再平衡。 - 通过配置参数
partition.assignment.strategy
来调整分区分配策略,以适应具体场景需求。
提示
- 更多关于 Kafka 消费者的分区再平衡介绍请看 这里。
Kafka 整体性能调优
提高整体吞吐量
为了提高 Kafka 整体的吞吐量,可以从以下四个角度进行优化。
(1) 增加 Topic 的分区数量
- 通过
kafka-topics.sh
脚本增加 Topic 的分区数量(注意:分区数量只能增加,不能减少)
1 | $ kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --alter --topic first --partitions 3 |
(2) 提高生产吞吐量
参数 | 描述 |
---|---|
buffer.memory | 发送消息的缓冲区大小,默认值是 32m ,生产环境可以增加到 64m 。 |
batch.size | 缓冲区中一批数据的最大大小,默认值是 16k 。如果 Batch 设置太小,会导致频繁发送网络请求,吞吐量下降;如果 Batch 设置太大,会导致一条消息需要等待很久才能够被发送出去,增加网络延迟。 |
linger.ms | 如果数据迟迟未达到 batch.size ,那么 Sender 等待 linger.ms 之后就会发送消息。默认值是 0 ,表示立刻发送消息。生产环境建议设置 5 ~ 100 毫秒之间。如果 linger.ms 设置太小,会导致频繁发送网络请求,吞吐量下降;如果 linger.ms 设置太大,会导致一条消息需要等待很久才能被发送出去,增加网络延迟。 |
compression.type | 默认值是 none ,表示不压缩直接发送消息。支持的压缩类型:none 、gzip 、snappy 、lz4 和 zstd 。生产环境也可以使用 gzip 压缩,效率还是不错的,压缩之后可以减小数据量,提升吞吐量,但是会加大 Producer 端的 CPU 开销。 |
(3) 提高消费吞吐量
参数名称 | 参数描述 |
---|---|
fetch.max.bytes | 消费者获取服务器端一批消息的最大字节数,默认值为 50m 。如果服务器端一批次的消息大于该值,仍然可以将这批消息拉取回来,所以这不是一个绝对最大值。消费者拉取一批次消息的大小受 message.max.bytes (Broker 配置)或者 max.message.bytes (Topic 配置)影响。 |
max.poll.records | 消费者每次调用 poll() 方法时,最多能拉取的消息数量,默认值为 500 。 |
(4) 增加消费者的数量
当增加了 Topic 的分区数量后,还需要同时增加消费组的消费者数量,这两个条件缺一不可,即 消费组的消费者数量 = Topic 的分区数量
,这样才能提高消费吞吐量。
数据精确一次
在 Kafka 中,为了保证消息被精确一次发送和消费,必须满足以下全部条件,缺一不可。
(1) 生产者角度
- 将
acks
参数设置为-1
。 - 开启幂等性
enable.idempotence = true
,默认开启。 - 在生产端使用事务机制发送消息。
(2) Broker 角度
- 分区的副本数量大于等于 2(
--replication-factor 2
)。 - ISR 里应答的最小副本数量大于等于 2 (
min.insync.replicas = 2
)。
(3) 消费者角度
- 事务机制 + 手动提交 Offset(
enable.auto.commit = false
)。 - 消费者输出的目标存储系统必须支持事务(比如 MySQL、Kafka)。
合理设置分区数量
如何为某个 Topic 合理设置分区数量呢?以下是估算分区数量的方法:
- (1) 创建一个只有 1 个分区的 Topic。
- (2) 测试这个 Topic 的 Producer 吞吐量和 Consumer 吞吐量。
- (3) 假设它们吞吐量的值分别是 Tp 和 Tc,单位是 MB/s。
- (4) 然后,假设总的目标吞吐量是 Tt,那么
分区数 = Tt /min (Tp, Tc)
。 - (5) 比如:Producer 吞吐量是 20m/s,Consumer 吞吐量是 50m/s,期望吞吐量 100m/s,那么
分区数 = 100 / min (20, 50) = 5 个分区
。
提示
- Kafka 的分区数一般设置为:3 ~ 10 个。
- Kafka 的分区数并不是越多越好,也不是越少越好,需要搭建完 Kafka 集群,然后对集群进行压测,再根据压测结果调整分区的数量。
单条消息大于 1M
当 Kafka 单条消息的大小大于 1M,那么可以通过以下参数进行优化:
Broker 节点宕机
在 Kafka 生产环境(集群部署)中,如果某个 Broker 节点挂掉,正常的处理方法如下:
- (1) 先尝试重新启动 Broker 节点,如果能够正常启动,那问题直接解决。
- (2) 如果无法正常重启,考虑增加内存、增加 CPU、增加网络带宽。
- (3) 如果是将整个 Broker 节点误删除掉
- 如果分区的副本数大于等于 2,那么可以按照 Kafka 服役新节点的方式重新服役一个新节点,并执行负载均衡,将部分数据迁移到新节点。
- 如果分区的副本只有一个,那么只能 “批量重导”,也就是写个临时程序将丢失的那批数据查询出来,然后重新将消息写入 Kafka 里面。
Kafka 集群压力测试
Kafka 的性能调优(包括生产者、Broker、消费者)并不是一成不变的,需要根据不同的业务场景来调整参数。比如,A 项目单条消息的大小为 1K,而 B 项目单条消息的大小为 1M,那么两者的性能调优参数是不一样的。为了得到更符合具体场景的性能调优参数,Kafka 官方提供了压测脚本,可以很方便地对 Kafka 进行压测。
- 生产者压测脚本:
kafka-producer-perf-test.sh
- 消费者压测脚本:
kafka-consumer-perf-test.sh
上述脚本支持对 Kafka 集群进行压测,如下图所示:
生产者压力测试
- (1) 创建一个新的 Topic,并设置 3 个分区和 3 个副本
1 | $ ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --replication-factor 3 --partitions 3 --topic bench |
- (2) 执行生产者的压测脚本
1 | $ ./kafka-producer-perf-test.sh --topic bench --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 batch.size=16384 linger.ms=0 |
生产者压测脚本的参数:
参数名称 | 参数描述 |
---|---|
--record-size | 表示单条信息有多大,单位是字节(Byte)。 |
--num-records | 表示总共发送多少条信息。 |
--throughput | 表示每秒发送多少条信息。当设置为 -1 ,则表示不限流,即尽可能快地生产数据,可以测出生产者的最大吞吐量。 |
--producer-props | 用于配置生产者的相关参数,比如 batch.size 配置为 16k 。 |
生产者压测的输出结果:
1 | 37021 records sent, 7401.2 records/sec (7.23 MB/sec), 1136.0 ms avg latency, 1453.0 ms max latency. |
- (3) 调整
batch.size
大小为32k
,然后进行压测
1 | $ ./kafka-producer-perf-test.sh --topic bench --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 batch.size=32768 linger.ms=0 |
1 | 49922 records sent, 9978.4 records/sec (9.74 MB/sec), 64.2 ms avg latency, 340.0 ms max latency. |
- (4) 调整
batch.size
大小为4k
,然后进行压测
1 | $ ./kafka-producer-perf-test.sh --topic bench --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 batch.size=4096 linger.ms=0 |
1 | 15598 records sent 3117.1 records/sec (3.04 MB/sec) 1878.3 ms avg latency, 3458.0 ms max latency. |
- (5) 调整
linger.ms
时间为50ms
,然后进行压测
1 | $ ./kafka-producer-perf-test.sh --topic bench --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 batch.size=4096 linger.ms=50 |
1 | 16804 records sent, 3360.1 records/sec (3.28 MB/sec), 1841.6 ms avg latency, 3338.0 ms max latency· |
- (6) 调整
compression.type
为snappy
,然后进行压测
1 | $ ./kafka-producer-perf-test.sh --topic bench --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 batch.size=4096 linger.ms=50 compression.type=snappy |
1 | 17244 records sent, 3446.0 records/sec (3.37 MB/sec), 5207.0 ms avg latency, 6861.0 ms max latency. |
- (7) 调整
compression.type
为zstd
,然后进行压测
1 | $ ./kafka-producer-perf-test.sh --topic bench --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 batch.size=4096 linger.ms=50 compression.type=zstd |
1 | 23820 records sent, 4763.0 records/sec (4.65 MB/sec), 1580.2 ms avg latency, 2651.0 ms max latency. |
- (8) 调整
compression.type
为gzip
,然后进行压测
1 | $ ./kafka-producer-perf-test.sh --topic bench --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 batch.size=4096 linger.ms=50 compression.type=gzip |
1 | 27170 records sent, 5428.6 records/sec (5.30 MB/sec), 1374.0 ms avg latency, 2311.0 ms max latency. |
- (9) 调整
compression.type
为lz4
,然后进行压测
1 | $ ./kafka-producer-perf-test.sh --topic bench --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 batch.size=4096 linger.ms=50 compression.type=lz4 |
1 | 16696 records sent, 3339.2 records/sec (3.26 MB/sec), 1924.5 ms avg latency, 3355.0 ms max latency. |
- (10) 调整
buffer.memory
为64m
,然后进行压测
1 | $ ./kafka-producer-perf-test.sh --topic bench --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 batch.size=4096 linger.ms=50 buffer.memory=67108864 |
1 | 20170 records sent, 4034.0 records/sec (3.94 MB/sec), 1669.5 ms avg latency, 3040.0 ms max latency. |
生产者压测结果汇总
生产者核心参数 | 生产者的吞吐量 | 说明 |
---|---|---|
batch.size=16384 linger.ms=0 | 8.97 MB/sec | |
batch.size=32768 linger.ms=0 | 9.76 MB/sec | |
batch.size=4096 linger.ms=0 | 3.57 MB/sec | |
batch.size=4096 linger.ms=50 | 3.75 MB/sec | 在测试环境进行压测时,可能感受不到 linger.ms 参数调整带来的影响;但在生产环境下,linger.ms 参数对生产吞吐量的影响较大。 |
batch.size=4096 linger.ms=50 compression.type=snappy | 3.64 MB/sec | |
batch.size=4096 linger.ms=50 compression.type=zstd | 5.60 MB/sec | |
batch.size=4096 linger.ms=50 compression.type=gzip | 6.17 MB/sec | |
batch.size=4096 linger.ms=50 compression.type=lz4 | 3.86 MB/sec | |
batch.size=4096 linger.ms=50 buffer.memory=67108864 | 3.93 MB/sec |
消费者压力测试
- (1) 执行消费者的压测脚本
1 | $ ./kafka-consumer-perf-test.sh --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 --topic bench --messages 1000000 --consumer.config ../config/consumer.properties |
消费者压测脚本的参数:
参数名称 | 参数描述 |
---|---|
--bootstrap-server | 指定 Kafka 集群地址。 |
--topic | 指定 Topic 的名称。 |
--messages | 总共要消费的消息数量。 |
--consumer.config | 指定消费者使用的配置文件。 |
消费者压测的输出结果:
1 | start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec |
- (2) 在 Kafka 的配置文件
config/consumer.properties
中,更改消费者一次拉取消息的最大数量为2000
,然后执行压测
1 | 2000 = |
1 | $ ./kafka-consumer-perf-test.sh --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 --topic bench --messages 1000000 --consumer.config ../config/consumer.properties |
1 | start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec |
- (3) 在 Kafka 的配置文件
config/consumer.properties
中,更改消费者拉取一批数据的最大大小为100m
,然后执行压测
1 | 104857600 = |
1 | $ ./kafka-consumer-perf-test.sh --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 --topic bench --messages 1000000 --consumer.config ../config/consumer.properties |
1 | start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec |
消费者压测结果汇总
消费者的核心参数 | 消费者的吞吐量 | 说明 |
---|---|---|
max.poll.records=500 fetch.max.bytes=52428800 | 136.6457 MB/sec | |
max.poll.records=2000 fetch.max.bytes=52428800 | 148.2206 MB/sec | |
max.poll.records=2000 fetch.max.bytes=104857600 | 151.3415 MB/sec |