Kafka 入门教程之三

大纲

前言

学习资源

Kafka Broker

Zookeeper 存储结构

提示

在 Kafka 的 ZooKeeper 模式架构中,会在集群中选举一个 Broker 作为唯一的 Controller,该 Controller 负责管理集群 Broker 的上下线,包括所有 Topic 的分区副本分配和分区副本 Leader 选举等工作。另外,Controller 的信息同步工作是依赖于 Zookeeper 的,其中 Kafka 在 Zookeeper 中的存储结构如下图所示:

整体的工作流程图

Broker 的核心参数

Broker 的文件存储

文件存储机制

在 Kafka 中,Topic 是逻辑上的概念,而 Partition 是物理上的概念,每个 Partition 对应于一个 Log 文件,该 Log 文件中存储的就是 Producer 生产的数据。Producer 生产的数据会被不断追加到该 Log 文件末端,为防止 Log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制,将每个 Partition 分为多个 Segment。每个 Segment 都包含了 .log 文件、.index 文件和 .timeindex 等文件。这些文件位于同一个文件夹下,该文件夹的命名规则为 Topic 名称 + 分区序号,比如 first-0

文件存储位置

思考问题

当生产者发送消息到 Kafka 后,Topic 数据(消息数据)到底存储在什么位置?

(1) 启动生产者,并发送消息

1
[centos@node01 kafka]$ bin/kafka-console-producer.sh --bootstrap-server node01:9092 --topic first

(2) 查看 node01(或者 node02、node03)的 kafka/datas/first-0(或者 first-1、first-2) 路径上的文件列表

1
2
3
4
5
6
7
[centos@node01 kafka]$ ls ./datas/first-0

00000000000000000000.index
00000000000000000000.log
00000000000000000000.timeindex
leader-epoch-checkpoint
partition.metadata

(3) 直接查看 .log 文件的内容,发现是乱码

1
[centos@node01 first-0]$ cat 00000000000000000000.log

(4) 通过工具查看 .log 文件的内容,输出字段的介绍请看 这里

1
[centos@node01 kafka]$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ./datas/first-0/00000000000000000000.log --print-data-log
1
2
3
4
5
6
7
8
Dumping ../datas/first-0/00000000000000000000.log
Log starting offset: 0
baseOffset: 18 lastOffset: 18 count: 1 baseSequence: 2 lastSequence: 2 producerId: 1001 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 4040 CreateTime: 1734184179506 size: 68 magic: 2 compresscodec: none crc: 799683847 isvalid: true
| offset: 18 CreateTime: 1734184179506 keySize: -1 valueSize: 0 sequence: 2 headerKeys: [] payload: java
baseOffset: 19 lastOffset: 19 count: 1 baseSequence: 3 lastSequence: 3 producerId: 1001 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 4108 CreateTime: 1734184184516 size: 69 magic: 2 compresscodec: none crc: 769781493 isvalid: true
| offset: 19 CreateTime: 1734184184516 keySize: -1 valueSize: 1 sequence: 3 headerKeys: [] payload: python
baseOffset: 20 lastOffset: 20 count: 1 baseSequence: 0 lastSequence: 0 producerId: 2000 producerEpoch: 0 partitionLeaderEpoch: 4 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 4177 CreateTime: 1734414249682 size: 73 magic: 2 compresscodec: none crc: 3581754326 isvalid: true
| offset: 20 CreateTime: 1734414249682 keySize: -1 valueSize: 5 sequence: 0 headerKeys: [] payload: golang

(5) 通过工具查看 .index 文件的内容

1
[centos@node01 kafka]$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ./datas/first-0/00000000000000000000.index
1
2
Dumping ../datas/first-0/00000000000000000000.index
offset: 0 position: 0

文件内容详解

Kafka 通过 offset 定位 Log 文件中的数据的流程如下:

Kafka 日志存储的配置参数如下:

参数描述
log.segment.bytesKafka 中 Log 日志是分成一块块存储的,此配置是指 Log 日志划分成块的大小,默认值是 1G
log.index.interval.bytes默认值是 4kb,表示在 Kafka 里面每当写入了 4kb 大小的日志(.log), 然后就会往 .index 文件里面记录一个索引(稀疏索引)。

文件清除策略

Kafka 中默认的日志(即消息数据)保存时间为 7 天,可以通过调整如下参数来修改保存时间。

  • log.retention.hours:小时(最低优先级),默认 7 天。
  • log.retention.minutes:分钟。
  • log.retention.ms:毫秒(最高优先级)。
  • log.retention.check.interval.ms:日志检查周期,默认 5 分钟。

思考:Kafka 中的日志一旦超过了设置的保存时间,那么日志会被怎么处理呢?

Kafka 中提供的日志清理策略有两种:deletecompact

  • delete(日志删除)
    • 会将过期的日志文件删除掉
    • log.cleanup.policy=delete:该配置参数表示所有日志数据都启用删除策略
      • 基于时间删除:默认开启。以 Segment 中所有记录中的最大时间戳作为该文件时间戳。
      • 基于大小删除:默认关闭。当超过设置的所有日志总大小,就删除最早的 Segment。配置参数是 log.retention.bytes,默认值是 -1,表示无穷大。

特别注意

  • 如果一个 Segment 中有一部分数据过期,但一部分数据没有过期(如果所示),那 Kafka 会怎么处理?
  • 由于 Kafka 默认是基于时间删除日志数据的,因此会以一个 Segment 中所有记录中的最大时间戳作为该文件时间戳,只有该时间戳过期了,才会删除该 Segment 中的所有记录。简而言之,只有等该 Segment 中的数据全部过期了,Kafka 才会删除数据。
  • compact(日志压缩)
    • 针对相同 key 的不同 value 值,只保留最后一个版本(如下图所示)。
    • log.cleanup.policy=compact:该配置参数表示所有日志数据都启用压缩策略。
    • 日志压缩后的 offset 可能是不连续的,比如上图中没有数据 6。当从这些 offset 消费消息时,将会拿到比这个 offset 大的 offset 对应的消息,比如实际上会拿到 offset 为 7 的消息,并从这个位置开始消费。

特别注意

日志压缩策略在生产环境中用得极少,只适合特殊场景。比如,消息的 key 是用户 ID,而 value 则是用户的资料;通过这种压缩策略,整个消息集里就保存了所有用户最新的资料。

文件高效读写

Kafka 读写文件(如 .log 文件)非常高效,其主要原因有以下几个:

  • (1) Kafka 本身支持分布式集群,可以采用分区技术,并行度较高。
  • (2) Kafka 读取数据采用了稀疏索引,可以快速定位需要消费的数据。
  • (3) Kafka 是顺序写磁盘,写效率较高。
    • Kafka 的 Producer 生产数据,要写入到 Log 文件时,写的过程是一直追加到文件末端,也就是顺序写。
    • Kafka 官网有数据表明,同样的磁盘,顺序写能到 600M/s, 而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间,如图所示
  • (4) Kafka 使用了页缓存 + 零拷贝技术
    • 页缓存
      • Kafka 重度依赖底层操作系统提供的页缓存(PageCache)功能。实际上,PageCache 是将尽可能多的空闲内存都当做了磁盘缓存来使用。
      • 当上层有写操作时,操作系统只是将数据写入 PageCache;当读操作发生时,先从 PageCache 中查找,如果找不到,再去磁盘中读取。
    • 零拷贝技术
      • 零拷贝技术主要用在 Kafka 的 Broker 中,尤其是在 Broker 将数据从磁盘读取并发送给消费者的过程中。
      • 首先,数据从磁盘读取后被拷贝到内核空间,也就是操作系统内核的页缓存(Page Cache)。然后,Kafka 通过操作系统的 sendfile 系统调用,将数据直接从页缓存发送到网络缓冲区,避免了用户空间的拷贝操作(如下图所示),从而提升了数据传输性能和资源利用效率。

思考:Kafka 重度依赖底层操作系统提供的页缓存(PageCache)功能,那么在写入数据时,如果 PageCache 没来得及落盘(刷写到磁盘),系统就宕机了(如断电),这是否有可能丢失数据呢?

在写入数据时,如果 PageCache 还没来得及将数据刷写到磁盘,并且在此期间发生了系统故障(如断电或崩溃),确实可能会导致数据丢失。不过,Kafka 通过以下机制尽量降低这种风险:

  • Kafka 主动刷盘(fsync)

    • Kafka 允许使用配置参数 log.flush.interval.messageslog.flush.interval.ms,可以控制写入日志的消息数量或时间间隔后强制调用 fsync,确保 PageCache 中的数据被同步到磁盘。
  • 操作系统的刷盘机制

    • 操作系统会定期将 PageCache 中的数据刷写到磁盘(例如 Linux 的 dirty_writeback_interval 配置),但这不是实时的,因此仍可能有短时间窗口导致数据未落盘。
  • 数据副本机制

    • Kafka 的 ISR(In-Sync Replica)机制 会在多个副本中同步数据,只有数据被至少一个副本确认后,生产者才会收到 ACK。这意味着,即使 PageCache 中的数据未落盘,但副本中的数据可以作为备份。
参数描述
log.flush.interval.messages每个分区在写入指定数量的消息后触发一次刷盘操作(将数据从页缓存刷入磁盘),默认值是 Long 的最大值 9223372036854775807,表示不会基于消息数量自动触发刷盘。一般不建议修改,交给系统自己管理。
log.flush.interval.ms每隔多长时间,触发一次刷盘操作(将数据从页缓存刷入磁盘),默认值是 null。一般不建议修改,交给系统自己管理。

总结

尽管 PageCache 提供了性能优化,但 Kafka 的可靠性主要依赖副本机制。如果系统故障发生在数据落盘之前,单个节点上的数据可能会丢失,但 Kafka 通过多副本机制保证了整体的数据可靠性。

模拟 Broker 上下线

假设 Kafka 集群有三个节点(Broker),这里模拟 Kafka 上下线,然后观察 Zookeeper 中的数据变化。

(1) 查看 /kafka/brokers/ids 路径上的节点。

1
2
3
[zk: localhost:2181(CONNECTED) 2] ls /kafka/brokers/ids

[0, 1, 2]

(2) 查看 /kafka/controller 路径上的数据。

1
2
3
[zk: localhost:2181(CONNECTED) 2] get /kafka/controller

{"version":1,"brokerid":0,"timestamp":"1637292471777"}

(3) 查看 /kafka/brokers/topics/first/partitions/0/state 路径上的数据。

1
2
3
[zk: localhost:2181(CONNECTED) 2] get /kafka/brokers/topics/first/partitions/0/state

{"controller_epoch":24,"leader":0,"version":1,"leader_epoch":18,"isr":[0,1,2]}

(4) 关闭 node03 节点上的 Kafka。

1
[centos@node03 kafka]$ bin/kafka-server-stop.sh

(5) 再次查看 /kafka/brokers/ids 路径上的节点。

1
2
3
[zk: localhost:2181(CONNECTED) 2] ls /kafka/brokers/ids

[0, 1]

(6) 再次查看 /kafka/controller 路径上的数据。

1
2
3
[zk: localhost:2181(CONNECTED) 2] get /kafka/controller

{"version":1,"brokerid":0,"timestamp":"1637292471777"}

(7) 再次查看 /kafka/brokers/topics/first/partitions/0/state 路径上的数据。

1
2
3
[zk: localhost:2181(CONNECTED) 2] get /kafka/brokers/topics/first/partitions/0/state

{"controller_epoch":24,"leader":0,"version":1,"leader_epoch":18,"isr":[0,1]}

(8) 启动 node03 节点上的 Kafka。

1
[centos@node03 kafka]$ bin/kafka-server-start.sh -daemon ./config/server.properties

(9) 再次观察 (1)、(2)、(3) 步骤中的内容。

Kafka 副本

副本的概念

  • Kafka 副本的作用是提高数据可靠性。
  • Kafka 默认的副本数量为 1 个,生产环境一般配置为 2 个,这可以保证数据可靠性;但太多副本会增加磁盘存储空间,增加网络上的数据传输,降低运行效率。
  • Kafka 中副本分为 Leader 副本和 Follower 副本。Kafka 生产者只会将数据发往 Leader 副本,然后 Follower 副本会从 Leader 副本那里进行数据同步。
  • Kafka 分区中的所有副本统称为 AR(Assigned Repllicas)。
    • AR = ISR + OSR
    • ISR:表示和 Leader 保持同步的 Follower + Leader 集合,简称 同步副本集合。如果 Follower 长时间未向 Leader 发送通信请求或者同步数据,则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms 配置参数设定,默认 30秒。当 Leader 发生故障之后,就会从 ISR 中选举新的 Leader。
    • OSR:表示 Follower 与 Leader 同步数据时,数据同步延迟过高的副本。

副本的选举

副本的 Leader 选举流程

Kafka 集群中每一个 Broker 都含有一个 Controller,其中有一个 Broker 的 Controller 会被选举为 Controller Leader,负责管理集群 Broker 的上下线,包括所有 Topic 的分区副本分配和分区副本 Leader 选举等工作。另外,Controller 的信息同步工作是依赖于 Zookeeper 的。

模拟副本的 Leader 选举

这里假设 Kafka 集群有 4 个节点(Broker),分别是 broker0、broker1、broker2、broker3。

(1) 创建一个新的 Topic,且拥有 4 个分区和 4 个副本

1
[centos@node02 kafka]$ bin/kafka-topics.sh --bootstrap-server node02:9092 --create --topic first --partitions 4 --replication-factor 4

(2) 查看 Leader 的分布情况,其中的 Replicas 就是 AR(Assigned Repllicas)

1
2
3
4
5
6
7
8
[centos@node02 kafka]$ bin/kafka-topics.sh --bootstrap-server node02:9092 --describe --topic first

Topic: first TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: first Partition: 0 Leader: 3 Replicas: 3,0,2,1 Isr: 3,0,2,1
Topic: first Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,3,0
Topic: first Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,3,1,2
Topic: first Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 2,1,0,3

(3) 关闭掉 node04 节点(broker3)上的 Kafka 进程

1
[centos@node04 kafka]$ bin/kafka-server-stop.sh

(4) 查看 Leader 的分布情况,可以发现分区 0 的 Leader 从 broker3 更换为 broker0

1
2
3
4
5
6
7
8
[centos@node02 kafka]$ bin/kafka-topics.sh --bootstrap-server node02:9092 --describe --topic first

Topic: first TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: first Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,2,1
Topic: first Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,0
Topic: first Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,2
Topic: first Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 2,1,0

副本的分配

模拟副本分配

假设 Kafka 集群有 4 个节点(Broker),分别是 broker0、broker1、broker2、broker3,当设置 Kafka 的分区数大于节点数时,Kafka 底层是如何分配存储副本呢?

(1) 创建一个新的 Topic,且拥有 16 分区与 3 个副本

1
[centos@node02 kafka]$ bin/kafka-topics.sh --bootstrap-server node02:9092 --create --partitions 16 --replication-factor 3 --topic first

(2) 查看分区和副本情况

1
[centos@node02 kafka]$ bin/kafka-topics.sh --bootstrap-server node02:9092 --describe --topic first

手动调整副本存储

提示

在生产环境中,每台服务器的硬件配置和性能不一致,但是 Kafka 只会根据自己的代码规则创建对应的副本,就会导致个别服务器存储压力较大,所以往往需要手动调整副本的存储。

假设 Kafka 集群有 4 个节点(Broker),分别是 broker0、broker1、broker2、broker3,在这基础上创建一个新的 Topic,名称为 first,拥有 4 个分区,2 个副本。最终需要将该 Topic 的所有副本都存储到 broker0 和 broker1 两台服务器上(如下图所示)。

(1) 创建一个新的 Topic,名称为 first,拥有 4 个分区,2 个副本

1
[centos@node02 kafka]$ bin/kafka-topics.sh --bootstrap-server node02:9092 --create --topic first --partitions 4 --replication-factor 2

(2) 查看副本的存储情况

1
[centos@node02 kafka]$ bin/kafka-topics.sh --bootstrap-server node02:9092 --describe --topic first
1
2
3
4
5
Topic: first	TopicId: TCI_nvlpST28lUqUMDiaHw	PartitionCount: 4	ReplicationFactor: 2	Configs: segment.bytes=1073741824
Topic: first Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: first Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: first Partition: 2 Leader: 3 Replicas: 3,2 Isr: 3,2
Topic: first Partition: 3 Leader: 1 Replicas: 1,3 Isr: 1,3

(3) 创建副本存储计划,将所有副本都存储在 broker0、broker1 中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
[centos@node02 kafka]$ vim increase-replication-factor.json

{
"version": 1,
"partitions": [
{
"topic": "first",
"partition": 0,
"replicas": [0, 1]
},
{
"topic": "first",
"partition": 1,
"replicas": [0, 1]
},
{
"topic": "first",
"partition": 2,
"replicas": [0, 1]
},
{
"topic": "first",
"partition": 3,
"replicas": [0, 1]
}
]
}

特别注意

  • 在上述 JSON 配置文件中,replicas 是指分区副本所在的 Broker ID。
  • 比如,"replicas": [0, 1] 表示该分区的副本分别存储在 Broker 0 和 Broker 1 上。

(4) 执行副本存储计划

1
[centos@node02 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server node02:9092 --reassignment-json-file increase-replication-factor.json --execute

(5) 验证副本存储计划的执行

1
[centos@node02 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server node02:9092 --reassignment-json-file increase-replication-factor.json --verify

(6) 查看副本的存储情况

1
[centos@node02 kafka]$ bin/kafka-topics.sh --bootstrap-server node02:9092 --describe --topic first

可以发现所有副本都存储到 broker0 和 broker1 两台服务器上

1
2
3
4
5
Topic: first	TopicId: TCI_nvlpST28lUqUMDiaHw	PartitionCount: 4	ReplicationFactor: 2	Configs: segment.bytes=1073741824
Topic: first Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: first Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: first Partition: 2 Leader: 1 Replicas: 1,0 Isr: 0,1
Topic: first Partition: 3 Leader: 1 Replicas: 1,0 Isr: 1,0

副本的故障处理

副本的 Leader 故障处理

副本的 Follower 故障处理

Kafka Broker 的最佳实践

服役新节点

服役新节点指的是往 Kafka 集群中动态添加新的节点(Broker)。

特别注意

将新节点(Broker)加入到 Kafka 集群后,需要手动将分区、副本的数据迁移到新节点上,否则新节点形同虚设。

创建新节点

创建新的 Kafka 节点(Broker),并将其加入到已有的 Kafka 集群中,具体操作步骤这里不再累述。

执行数据迁移

这里假设 Kafka 集群原本有 3 个节点(broker0、broker1、broker2),然后新增了一个节点(broker3)。

(1) 创建一个配置文件,指定要迁移数据的 Topic

1
2
3
4
5
6
7
8
9
10
[centos@node02 kafka]$ vim topics-to-move.json

{
"topics": [
{
"topic": "first"
}
],
"version": 1
}

(2) 生成一个数据迁移计划,其中 --broker-list "0,1,2,3" 用于指定 Kafka 集群节点的 ID 列表,也就是说要将数据迁移到这几个节点上

1
[centos@node02 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server node02:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate
1
2
3
4
5
Current partition replica assignment
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[0,2,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[2,1,0],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[1,0,2],"log_dirs":["any"," any","any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any"," any","any"]}]}

(3) 拷贝步骤 (2) 生成的数据迁移计划,以此创建副本存储计划(所有副本存储在 broker0、broker1、broker2、broker3 中)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[centos@node02 kafka]$ vim increase-replication-factor.json

{
"version": 1,
"partitions": [{
"topic": "first",
"partition": 0,
"replicas": [2, 3, 0],
"log_dirs": ["any", "any", "any"]
}, {
"topic": "first",
"partition": 1,
"replicas": [3, 0, 1],
"log_dirs": ["any", "any", "any"]
}, {
"topic": "first",
"partition": 2,
"replicas": [0, 1, 2],
"log_dirs": ["any", " any", "any"]
}]
}

特别注意

  • 在上述 JSON 配置文件中,replicas 是指分区副本所在的 Broker ID。
  • 比如,"replicas": [0, 1, 2] 表示该分区的副本分别存储在 Broker 0、Broker 1 和 Broker 2 上。

(4) 执行副本存储计划

1
[centos@node02 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server node02:9092 --reassignment-json-file increase-replication-factor.json --execute

(5) 验证副本存储计划的执行

1
[centos@node02 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server node02:9092 --reassignment-json-file increase-replication-factor.json --verify
1
2
3
4
5
6
7
Status of partition reassignment:
Reassignment of partition first-0 is complete.
Reassignment of partition first-1 is complete.
Reassignment of partition first-2 is complete.

Clearing broker-level throttles on brokers 0,1,2,3
Clearing topic-level throttles on topic first

(6) 查看主题的详细信息,可以发现各个分区的副本数据会存储在新的 Kafka 节点(broker3)上

1
[centos@node02 kafka]$ bin/kafka-topics.sh --bootstrap-server node02:9092 --topic first --describe
1
2
3
4
Topic: first     TopicId: _h3inqW0T5ye8kif1P1c3A PartitionCount: 3       ReplicationFactor: 3    Configs: segment.bytes=1073741824
Topic: first Partition: 0 Leader: 0 Replicas: 0,3,1 Isr: 0,1,3
Topic: first Partition: 1 Leader: 1 Replicas: 1,0,2 Isr: 2,0,1
Topic: first Partition: 2 Leader: 2 Replicas: 2,1,3 Isr: 1,2,3

退役旧节点

退役旧节点指的是从 Kafka 集群移除某个正在运行的节点(Broker)。

特别注意

在移除某个正在运行的节点(Broker)之前,需要手动对分区、副本的数据进行迁移,否则可能会影响 Kafka 集群的正常运行。

执行数据迁移

这里假设 Kafka 集群原本有 4 个节点(broker0、broker1、broker2、broker3),先按照退役一台节点(如 broker3)来生成执行计划,然后按照节点服役时的操作流程来执行数据迁移操作。

(1) 创建一个配置文件,指定要迁移数据的主题

1
2
3
4
5
6
7
8
[centos@node02 kafka]$ vim topics-to-move.json

{
"topics": [{
"topic": "first"
}],
"version": 1
}

(2) 生成一个数据迁移计划,其中 --broker-list "0,1,2" 用于指定 Kafka 集群节点的 ID 列表(因为要退役 broker3,所以列表里只有 0,1,2 这三个节点),也就是说要将数据迁移到这几个节点上

1
[atguigu@node02 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server node02:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate
1
2
3
4
5
Current partition replica assignment
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[0,2,3],"log_dirs":["any"," any","any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[1,2,0],"log_dirs":["any"," any","any"]}]}

(3) 拷贝步骤 (2) 生成的数据迁移计划,以此创建副本存储计划(所有副本存储在 broker0、broker1、broker2 中)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[centos@node02 kafka]$ vim increase-replication-factor.json

{
"version": 1,
"partitions": [{
"topic": "first",
"partition": 0,
"replicas": [2, 0, 1],
"log_dirs": ["any", "any", "any"]
}, {
"topic": "first",
"partition": 1,
"replicas": [0, 1, 2],
"log_dirs": ["any", "any", "any"]
}, {
"topic": "first",
"partition": 2,
"replicas": [1, 2, 0],
"log_dirs": ["any", " any", "any"]
}]
}

特别注意

  • 在上述 JSON 配置文件中,replicas 是指分区副本所在的 Broker ID。
  • 比如,"replicas": [0, 1, 2] 表示该分区的副本分别存储在 Broker 0、Broker 1 和 Broker 2 上。

(4) 执行副本存储计划

1
[centos@node02 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server node02:9092 --reassignment-json-file increase-replication-factor.json --execute

(5) 验证副本存储计划的执行

1
[centos@node02 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server node02:9092 --reassignment-json-file increase-replication-factor.json --verify
1
2
3
4
5
6
7
Status of partition reassignment:
Reassignment of partition first-0 is complete.
Reassignment of partition first-1 is complete.
Reassignment of partition first-2 is complete.

Clearing broker-level throttles on brokers 0,1,2,3
Clearing topic-level throttles on topic first

(6) 查看主题的详细信息,可以发现各个分区的副本数据不会再存储在需要退役的 Kafka 节点(broker3)上

1
[centos@node02 kafka]$ bin/kafka-topics.sh --bootstrap-server node02:9092 --topic first --describe
1
2
3
4
Topic: first     TopicId: _h3inqW0T5ye8kif1P1c3A PartitionCount: 3       ReplicationFactor: 3    Configs: segment.bytes=1073741824
Topic: first Partition: 0 Leader: 0 Replicas: 1,2,0 Isr: 0,1,2
Topic: first Partition: 1 Leader: 1 Replicas: 2,0,1 Isr: 2,0,1
Topic: first Partition: 2 Leader: 2 Replicas: 0,1,2 Isr: 1,2,0

关闭旧节点

在需要退役的节点上执行关闭命令,最终 Kafka 集群只剩下 3 个节点(broker0、broker1、broker2)

1
[centos@node04 kafka]$ bin/kafka-server-stop.sh

增加分区数量

1
[centos@node02 kafka]$ kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --alter --topic first --partitions 3

特别注意

Kafka 的分区数量只能增加,不能减少。值得一提的是,Kafka 同样不支持减少已创建的主题的副本数量,但是可以通过 kafka-reassign-partitions.sh 脚本重新分配副本的方式来实现。

自动创建主题

  • 如果将 Broker 端的配置参数 auto.create.topics.enable 设置为 true(默认值是 true),那么当生产者向一个未创建的主题发送消息时,Broker 会自动创建一个分区数为 num.partitions(默认值为 1)、副本因子为 default.replication.factor(默认值为 1)的主题。

  • 除此之外,当一个消费者开始从未知主题中读取消息时,或者当任意一个客户端向未知主题发送元数据请求时,都会自动创建一个相应的主题。这种创建主题的方式是非预期的,增加了主题管理和维护的难度。在生产环境下,强烈建议将 auto.create.topics.enable 设置为 false

Kafka 副本的最佳实践

动态增加副本的数量

提示

在生产环境中,由于某个 Topic 的重要等级需要提升,此时可以考虑增加该 Topic 的副本数(也叫增加副本因子)。值得一提的是,副本数的增加需要先制定存储计划,然后根据存储计划执行。

这里假设 Kafka 集群有 3 个节点(Broker),分别是 broker0、broker1、broker2。

(1) 创建一个新的 Topic,名称为 four,拥有 3 个分区,1 个副本

1
[centos@node02 kafka]$ bin/kafka-topics.sh --bootstrap-server node02:9092 --create --partitions 3 --replication-factor 1 --topic four

(2) 查看副本的存储情况

1
[centos@node02 kafka]$ bin/kafka-topics.sh --bootstrap-server node02:9092 --describe --topic four
1
2
3
4
Topic: four	TopicId: G3To2JZmTfWk7w1pki8Rmw	PartitionCount: 3	ReplicationFactor: 1	Configs: segment.bytes=1073741824
Topic: four Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: four Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: four Partition: 2 Leader: 2 Replicas: 2 Isr: 2

(3) 创建副本存储计划,将每个分区的副本数量增加到 3 个(所有副本都指定存储在 broker0、broker1、broker2 中)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[centos@node02 kafka]$ vim increase-replication-factor.json

{
"version": 1,
"partitions": [{
"topic": "four",
"partition": 0,
"replicas": [0, 1, 2]
}, {
"topic": "four",
"partition": 1,
"replicas": [0, 1, 2]
}, {
"topic": "four",
"partition": 2,
"replicas": [0, 1, 2]
}]
}

特别注意

  • 在上述 JSON 配置文件中,replicas 是指分区副本所在的 Broker ID。
  • 比如,"replicas": [0, 1, 2] 表示该分区的副本分别存储在 Broker 0、Broker 1 和 Broker 2 上。

(4) 执行副本存储计划

1
[centos@node02 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server node02:9092 --reassignment-json-file increase-replication-factor.json --execute

(5) 验证副本存储计划的执行

1
[centos@node02 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server node02:9092 --reassignment-json-file increase-replication-factor.json --verify

(6) 查看副本的存储情况

1
[centos@node02 kafka]$ bin/kafka-topics.sh --bootstrap-server node02:9092 --describe --topic four
1
2
3
4
Topic: four	TopicId: G3To2JZmTfWk7w1pki8Rmw	PartitionCount: 3	ReplicationFactor: 3	Configs: segment.bytes=1073741824
Topic: four Partition: 0 Leader: 1 Replicas: 0,1,2 Isr: 1,2,0
Topic: four Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: four Partition: 2 Leader: 2 Replicas: 0,1,2 Isr: 2,1,0

Leader Partition 负载平衡

在正常情况下,Kafka 本身会自动将 Leader Partition(即 Leader 副本)均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的。但是,如果某些 Broker 宕机,会导致 Leader Partition 过于集中在其他少部分几台 Broker 上,这会导致少数几台 Broker 的读写请求压力过高;当其他宕机的 Broker 恢复之后都是 Follower Partition,读写请求压力都很低,这就会造成集群负载不均衡。

参数名称描述
auto.leader.rebalance.enable是否启用 Leader Partition 自动平衡,默认值是 true在生产环境中,Leader 重新选举的代价比较大,可能会带来性能影响,建议设置为 false
leader.imbalance.per.broker.percentage每个 Broker 允许的不平衡的 Leader 的比率,默认值是 10%。如果每个 Broker 超过了这个值,控制器(Controller)会触发 Leader 的负载平衡。
leader.imbalance.check.interval.seconds检查 Leader 负载是否平衡的间隔时间(秒),默认值是 300

下面拿一个 Topic 举例说明,假设 Kafka 集群(4 个节点、4 个分区、每个分区有 4 个副本)只有一个主题,如下图所示:

针对 broker0 节点,分区 2 的 AR 优先副本是 broker0 节点,但是 broker0 节点却不是 Leader 节点,所以 broker0 节点的不平衡数加 1。因为 AR 副本总数是 4,所以 broker0 节点不平衡率为 1/4 > 10%,需要再平衡。另外,broker2 节点和 broker3 节点和 broker0 节点的不平衡率一样,因此也需要再平衡;broker1 的不平衡数为 0,不需要再平衡。