Kafka 入门教程之三

大纲

前言

学习资源

Kafka Broker

Broker 整体工作流程

Zookeeper 存储信息

提示

Kafka 集群中每一个 Broker 都含有一个 Controller,其中有一个 Broker 的 Controller 会被选举为 Controller Leader,负责管理集群 Broker 的上下线,包括所有 Topic 的分区副本分配和分区副本 Leader 选举等工作。另外,Controller 的信息同步工作是依赖于 Zookeeper 的。

整体的工作流程图

Broker 重要参数

模拟 Kafka 上下线

假设 Kafka 集群有三个节点(Broker),这里模拟 Kafka 上下线,然后观察 Zookeeper 中的数据变化。

(1) 查看 /kafka/brokers/ids 路径上的节点。

1
2
3
[zk: localhost:2181(CONNECTED) 2] ls /kafka/brokers/ids

[0, 1, 2]

(2) 查看 /kafka/controller 路径上的数据。

1
2
3
[zk: localhost:2181(CONNECTED) 2] get /kafka/controller

{"version":1,"brokerid":0,"timestamp":"1637292471777"}

(3) 查看 /kafka/brokers/topics/first/partitions/0/state 路径上的数据。

1
2
3
[zk: localhost:2181(CONNECTED) 2] get /kafka/brokers/topics/first/partitions/0/state

{"controller_epoch":24,"leader":0,"version":1,"leader_epoch":18,"isr":[0,1,2]}

(4) 关闭 node03 节点上的 Kafka。

1
[centos@node03 kafka]$ bin/kafka-server-stop.sh

(5) 再次查看 /kafka/brokers/ids 路径上的节点。

1
2
3
[zk: localhost:2181(CONNECTED) 2] ls /kafka/brokers/ids

[0, 1]

(6) 再次查看 /kafka/controller 路径上的数据。

1
2
3
[zk: localhost:2181(CONNECTED) 2] get /kafka/controller

{"version":1,"brokerid":0,"timestamp":"1637292471777"}

(7) 再次查看 /kafka/brokers/topics/first/partitions/0/state 路径上的数据。

1
2
3
[zk: localhost:2181(CONNECTED) 2] get /kafka/brokers/topics/first/partitions/0/state

{"controller_epoch":24,"leader":0,"version":1,"leader_epoch":18,"isr":[0,1]}

(8) 启动 node03 节点上的 Kafka。

1
[centos@node03 kafka]$ bin/kafka-server-start.sh -daemon ./config/server.properties

(9) 再次观察 (1)、(2)、(3) 步骤中的内容。

Kafka 副本

副本的概念

  • Kafka 副本作用:提高数据可靠性。
  • Kafka 默认的副本数量为 1 个,生产环境一般配置为 2 个,这可以保证数据可靠性;但太多副本会增加磁盘存储空间,增加网络上的数据传输,降低运行效率。
  • Kafka 中副本分为 Leader 副本和 Follower 副本。Kafka 生产者只会将数据发往 Leader 副本,然后 Follower 副本会从 Leader 副本那里进行数据同步。
  • Kafka 分区中的所有副本统称为 AR(Assigned Repllicas)。
    • AR = ISR + OSR
    • ISR:表示和 Leader 保持同步的 Follower + Leader 集合。如果 Follower 长时间未向 Leader 发送通信请求或者同步数据,则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms 参数设定,默认 30s。当 Leader 发生故障之后,就会从 ISR 中选举新的 Leader。
    • OSR:表示 Follower 与 Leader 同步数据时,数据同步延迟过高的副本。

副本的选举

副本的 Leader 选举流程

Kafka 集群中每一个 Broker 都含有一个 Controller,其中有一个 Broker 的 Controller 会被选举为 Controller Leader,负责管理集群 Broker 的上下线,包括所有 Topic 的分区副本分配和分区副本 Leader 选举等工作。另外,Controller 的信息同步工作是依赖于 Zookeeper 的。

模拟副本的 Leader 选举

这里假设 Kafka 集群有 4 个节点(Broker),分别是 broker0、broker1、broker2、broker3。

(1) 创建一个新的 Topic,且拥有 4 个分区和 4 个副本

1
[centos@node02 kafka]$ bin/kafka-topics.sh --bootstrap-server node02:9092 --create --topic first --partitions 4 --replication-factor 4

(2) 查看 Leader 的分布情况,其中的 Replicas 就是 AR(Assigned Repllicas)

1
2
3
4
5
6
7
8
[centos@node02 kafka]$ bin/kafka-topics.sh --bootstrap-server node02:9092 --describe --topic first

Topic: first TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: first Partition: 0 Leader: 3 Replicas: 3,0,2,1 Isr: 3,0,2,1
Topic: first Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,3,0
Topic: first Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,3,1,2
Topic: first Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 2,1,0,3

(3) 关闭掉 node04 节点(broker3)上的 Kafka 进程

1
[centos@node04 kafka]$ bin/kafka-server-stop.sh

(4) 查看 Leader 的分布情况,可以发现分区 0 的 Leader 从 broker3 更换为 broker0

1
2
3
4
5
6
7
8
[centos@node02 kafka]$ bin/kafka-topics.sh --bootstrap-server node02:9092 --describe --topic first

Topic: first TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: first Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,2,1
Topic: first Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,0
Topic: first Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,2
Topic: first Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 2,1,0

副本的分配

模拟副本分配

假设 Kafka 集群有 4 个节点(Broker),分别是 broker0、broker1、broker2、broker3,当设置 Kafka 的分区数大于节点数时,Kafka 底层是如何分配存储副本呢?

(1) 创建一个新的 Topic,且拥有 16 分区与 3 个副本

1
[centos@node02 kafka]$ bin/kafka-topics.sh --bootstrap-server node02:9092 --create --partitions 16 --replication-factor 3 --topic first

(2) 查看分区和副本情况

1
[centos@node02 kafka]$ bin/kafka-topics.sh --bootstrap-server node02:9092 --describe --topic first

手动调整副本存储

提示

在生产环境中,每台服务器的硬件配置和性能不一致,但是 Kafka 只会根据自己的代码规则创建对应的副本,就会导致个别服务器存储压力较大,所以往往需要手动调整副本的存储。

假设 Kafka 集群有 4 个节点(Broker),分别是 broker0、broker1、broker2、broker3,在这基础上创建一个新的 Topic,名称为 first,拥有 4 个分区,2 个副本。最终需要将该 Topic 的所有副本都存储到 broker0 和 broker1 两台服务器上(如下图所示)。

(1) 创建一个新的 Topic,名称为 first,拥有 4 个分区,2 个副本

1
[centos@node02 kafka]$ bin/kafka-topics.sh --bootstrap-server node02:9092 --create --topic first --partitions 4 --replication-factor 2

(2) 查看副本的存储情况

1
[centos@node02 kafka]$ bin/kafka-topics.sh --bootstrap-server node02:9092 --describe --topic first
1
2
3
4
5
Topic: first	TopicId: TCI_nvlpST28lUqUMDiaHw	PartitionCount: 4	ReplicationFactor: 2	Configs: segment.bytes=1073741824
Topic: first Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: first Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: first Partition: 2 Leader: 3 Replicas: 3,2 Isr: 3,2
Topic: first Partition: 3 Leader: 1 Replicas: 1,3 Isr: 1,3

(3) 创建副本存储计划,将所有副本都存储在 broker0、broker1 中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
[centos@node02 kafka]$ vim increase-replication-factor.json

{
"version": 1,
"partitions": [
{
"topic": "first",
"partition": 0,
"replicas": [
0,
1
]
},
{
"topic": "first",
"partition": 1,
"replicas": [
0,
1
]
},
{
"topic": "first",
"partition": 2,
"replicas": [
0,
1
]
},
{
"topic": "first",
"partition": 3,
"replicas": [
0,
1
]
}
]
}

(4) 执行副本存储计划

1
[centos@node02 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server node02:9092 --reassignment-json-file increase-replication-factor.json --execute

(5) 验证副本存储计划的执行

1
[centos@node02 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server node02:9092 --reassignment-json-file increase-replication-factor.json --verify

(6) 查看副本的存储情况

1
[centos@node02 kafka]$ bin/kafka-topics.sh --bootstrap-server node02:9092 --describe --topic first

可以发现所有副本都存储到 broker0 和 broker1 两台服务器上

1
2
3
4
5
Topic: first	TopicId: TCI_nvlpST28lUqUMDiaHw	PartitionCount: 4	ReplicationFactor: 2	Configs: segment.bytes=1073741824
Topic: first Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: first Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: first Partition: 2 Leader: 1 Replicas: 1,0 Isr: 0,1
Topic: first Partition: 3 Leader: 1 Replicas: 1,0 Isr: 1,0

副本的故障处理

副本的 Leader 故障处理

副本的 Follower 故障处理

Kafka Broker 最佳实践

服役新节点

服役新节点指的是往 Kafka 集群中动态添加新的节点(Broker)。

特别注意

将新节点(Broker)加入到 Kafka 集群后,需要手动将分区、副本的数据迁移到新节点上,否则新节点形同虚设。

创建新节点

创建新的 Kafka 节点(Broker),并将其加入到已有的 Kafka 集群中,具体操作步骤这里不再累述。

执行数据迁移

这里假设 Kafka 集群原本有 3 个节点(broker0、broker1、broker2),然后新增了一个节点(broker3)。

(1) 创建一个配置文件,指定要迁移数据的 Topic

1
2
3
4
5
6
7
8
9
10
[centos@node02 kafka]$ vim topics-to-move.json

{
"topics": [
{
"topic": "first"
}
],
"version": 1
}

(2) 生成一个数据迁移计划,其中 --broker-list "0,1,2,3" 用于指定 Kafka 集群节点的 ID 列表,也就是说要将数据迁移到这几个节点上

1
[centos@node02 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server node02:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate
1
2
3
4
5
Current partition replica assignment
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[0,2,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[2,1,0],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[1,0,2],"log_dirs":["any"," any","any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any"," any","any"]}]}

(3) 拷贝步骤 (2) 生成的数据迁移计划,以此创建副本存储计划(所有副本存储在 broker0、broker1、broker2、broker3 中)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[centos@node02 kafka]$ vim increase-replication-factor.json

{
"version": 1,
"partitions": [{
"topic": "first",
"partition": 0,
"replicas": [2, 3, 0],
"log_dirs": ["any", "any", "any"]
}, {
"topic": "first",
"partition": 1,
"replicas": [3, 0, 1],
"log_dirs": ["any", "any", "any"]
}, {
"topic": "first",
"partition": 2,
"replicas": [0, 1, 2],
"log_dirs": ["any", " any", "any"]
}]
}

(4) 执行副本存储计划

1
[centos@node02 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server node02:9092 --reassignment-json-file increase-replication-factor.json --execute

(5) 验证副本存储计划的执行

1
[centos@node02 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server node02:9092 --reassignment-json-file increase-replication-factor.json --verify
1
2
3
4
5
6
7
Status of partition reassignment:
Reassignment of partition first-0 is complete.
Reassignment of partition first-1 is complete.
Reassignment of partition first-2 is complete.

Clearing broker-level throttles on brokers 0,1,2,3
Clearing topic-level throttles on topic first

(6) 查看主题的详细信息,可以发现各个分区的副本数据会存储在新的 Kafka 节点(broker3)上

1
[centos@node02 kafka]$ bin/kafka-topics.sh --bootstrap-server node02:9092 --topic first --describe
1
2
3
4
Topic: first     TopicId: _h3inqW0T5ye8kif1P1c3A PartitionCount: 3       ReplicationFactor: 3    Configs: segment.bytes=1073741824
Topic: first Partition: 0 Leader: 0 Replicas: 0,3,1 Isr: 0,1,3
Topic: first Partition: 1 Leader: 1 Replicas: 1,0,2 Isr: 2,0,1
Topic: first Partition: 2 Leader: 2 Replicas: 2,1,3 Isr: 1,2,3

退役旧节点

退役旧节点指的是从 Kafka 集群移除某个正在运行的节点(Broker)。

特别注意

在移除某个正在运行的节点(Broker)之前,需要手动对分区、副本的数据进行迁移,否则可能会影响 Kafka 集群的正常运行。

执行数据迁移

这里假设 Kafka 集群原本有 4 个节点(broker0、broker1、broker2、broker3),先按照退役一台节点(如 broker3)来生成执行计划,然后按照节点服役时的操作流程来执行数据迁移操作。

(1) 创建一个配置文件,指定要迁移数据的主题

1
2
3
4
5
6
7
8
[centos@node02 kafka]$ vim topics-to-move.json

{
"topics": [{
"topic": "first"
}],
"version": 1
}

(2) 生成一个数据迁移计划,其中 --broker-list "0,1,2" 用于指定 Kafka 集群节点的 ID 列表(因为要退役 broker3,所以列表里只有 0,1,2 这三个节点),也就是说要将数据迁移到这几个节点上

1
[atguigu@node02 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server node02:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate
1
2
3
4
5
Current partition replica assignment
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[0,2,3],"log_dirs":["any"," any","any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[1,2,0],"log_dirs":["any"," any","any"]}]}

(3) 拷贝步骤 (2) 生成的数据迁移计划,以此创建副本存储计划(所有副本存储在 broker0、broker1、broker2 中)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[centos@node02 kafka]$ vim increase-replication-factor.json

{
"version": 1,
"partitions": [{
"topic": "first",
"partition": 0,
"replicas": [2, 0, 1],
"log_dirs": ["any", "any", "any"]
}, {
"topic": "first",
"partition": 1,
"replicas": [0, 1, 2],
"log_dirs": ["any", "any", "any"]
}, {
"topic": "first",
"partition": 2,
"replicas": [1, 2, 0],
"log_dirs": ["any", " any", "any"]
}]
}

(4) 执行副本存储计划

1
[centos@node02 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server node02:9092 --reassignment-json-file increase-replication-factor.json --execute

(5) 验证副本存储计划的执行

1
[centos@node02 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server node02:9092 --reassignment-json-file increase-replication-factor.json --verify
1
2
3
4
5
6
7
Status of partition reassignment:
Reassignment of partition first-0 is complete.
Reassignment of partition first-1 is complete.
Reassignment of partition first-2 is complete.

Clearing broker-level throttles on brokers 0,1,2,3
Clearing topic-level throttles on topic first

(6) 查看主题的详细信息,可以发现各个分区的副本数据不会再存储在需要退役的 Kafka 节点(broker3)上

1
[centos@node02 kafka]$ bin/kafka-topics.sh --bootstrap-server node02:9092 --topic first --describe
1
2
3
4
Topic: first     TopicId: _h3inqW0T5ye8kif1P1c3A PartitionCount: 3       ReplicationFactor: 3    Configs: segment.bytes=1073741824
Topic: first Partition: 0 Leader: 0 Replicas: 1,2,0 Isr: 0,1,2
Topic: first Partition: 1 Leader: 1 Replicas: 2,0,1 Isr: 2,0,1
Topic: first Partition: 2 Leader: 2 Replicas: 0,1,2 Isr: 1,2,0
关闭旧节点

在需要退役的节点上执行关闭命令,最终 Kafka 集群只剩下 3 个节点(broker0、broker1、broker2)

1
[centos@node04 kafka]$ bin/kafka-server-stop.sh