Kafka 入门教程之三
大纲
前言
学习资源
Kafka Broker
Broker 整体工作流程
Zookeeper 存储信息
提示
Kafka 集群中每一个 Broker 都含有一个 Controller,其中有一个 Broker 的 Controller 会被选举为 Controller Leader,负责管理集群 Broker 的上下线,包括所有 Topic 的分区副本分配和分区副本 Leader 选举等工作。另外,Controller 的信息同步工作是依赖于 Zookeeper 的。
整体的工作流程图
Broker 重要参数
模拟 Kafka 上下线
假设 Kafka 集群有三个节点(Broker),这里模拟 Kafka 上下线,然后观察 Zookeeper 中的数据变化。
(1) 查看 /kafka/brokers/ids
路径上的节点。
1 | [zk: localhost:2181(CONNECTED) 2] ls /kafka/brokers/ids |
(2) 查看 /kafka/controller
路径上的数据。
1 | [zk: localhost:2181(CONNECTED) 2] get /kafka/controller |
(3) 查看 /kafka/brokers/topics/first/partitions/0/state
路径上的数据。
1 | [zk: localhost:2181(CONNECTED) 2] get /kafka/brokers/topics/first/partitions/0/state |
(4) 关闭 node03 节点上的 Kafka。
1 | [centos@node03 kafka]$ bin/kafka-server-stop.sh |
(5) 再次查看 /kafka/brokers/ids
路径上的节点。
1 | [zk: localhost:2181(CONNECTED) 2] ls /kafka/brokers/ids |
(6) 再次查看 /kafka/controller
路径上的数据。
1 | [zk: localhost:2181(CONNECTED) 2] get /kafka/controller |
(7) 再次查看 /kafka/brokers/topics/first/partitions/0/state
路径上的数据。
1 | [zk: localhost:2181(CONNECTED) 2] get /kafka/brokers/topics/first/partitions/0/state |
(8) 启动 node03 节点上的 Kafka。
1 | [centos@node03 kafka]$ bin/kafka-server-start.sh -daemon ./config/server.properties |
(9) 再次观察 (1)、(2)、(3) 步骤中的内容。
Kafka 副本
副本的概念
- Kafka 副本作用:提高数据可靠性。
- Kafka 默认的副本数量为 1 个,生产环境一般配置为 2 个,这可以保证数据可靠性;但太多副本会增加磁盘存储空间,增加网络上的数据传输,降低运行效率。
- Kafka 中副本分为 Leader 副本和 Follower 副本。Kafka 生产者只会将数据发往 Leader 副本,然后 Follower 副本会从 Leader 副本那里进行数据同步。
- Kafka 分区中的所有副本统称为 AR(Assigned Repllicas)。
- AR = ISR + OSR
ISR
:表示和 Leader 保持同步的 Follower + Leader 集合。如果 Follower 长时间未向 Leader 发送通信请求或者同步数据,则该 Follower 将被踢出 ISR。该时间阈值由replica.lag.time.max.ms
参数设定,默认 30s。当 Leader 发生故障之后,就会从 ISR 中选举新的 Leader。OSR
:表示 Follower 与 Leader 同步数据时,数据同步延迟过高的副本。
副本的选举
副本的 Leader 选举流程
Kafka 集群中每一个 Broker 都含有一个 Controller,其中有一个 Broker 的 Controller 会被选举为 Controller Leader,负责管理集群 Broker 的上下线,包括所有 Topic 的分区副本分配和分区副本 Leader 选举等工作。另外,Controller 的信息同步工作是依赖于 Zookeeper 的。
模拟副本的 Leader 选举
这里假设 Kafka 集群有 4 个节点(Broker),分别是 broker0、broker1、broker2、broker3。
(1) 创建一个新的 Topic,且拥有 4 个分区和 4 个副本
1 | [centos@node02 kafka]$ bin/kafka-topics.sh --bootstrap-server node02:9092 --create --topic first --partitions 4 --replication-factor 4 |
(2) 查看 Leader 的分布情况,其中的 Replicas
就是 AR(Assigned Repllicas)
1 | [centos@node02 kafka]$ bin/kafka-topics.sh --bootstrap-server node02:9092 --describe --topic first |
(3) 关闭掉 node04 节点(broker3
)上的 Kafka 进程
1 | [centos@node04 kafka]$ bin/kafka-server-stop.sh |
(4) 查看 Leader 的分布情况,可以发现分区 0 的 Leader 从 broker3
更换为 broker0
1 | [centos@node02 kafka]$ bin/kafka-topics.sh --bootstrap-server node02:9092 --describe --topic first |
副本的分配
模拟副本分配
假设 Kafka 集群有 4 个节点(Broker),分别是 broker0、broker1、broker2、broker3,当设置 Kafka 的分区数大于节点数时,Kafka 底层是如何分配存储副本呢?
(1) 创建一个新的 Topic,且拥有 16 分区与 3 个副本
1 | [centos@node02 kafka]$ bin/kafka-topics.sh --bootstrap-server node02:9092 --create --partitions 16 --replication-factor 3 --topic first |
(2) 查看分区和副本情况
1 | [centos@node02 kafka]$ bin/kafka-topics.sh --bootstrap-server node02:9092 --describe --topic first |
手动调整副本存储
提示
在生产环境中,每台服务器的硬件配置和性能不一致,但是 Kafka 只会根据自己的代码规则创建对应的副本,就会导致个别服务器存储压力较大,所以往往需要手动调整副本的存储。
假设 Kafka 集群有 4 个节点(Broker),分别是 broker0、broker1、broker2、broker3,在这基础上创建一个新的 Topic,名称为
first
,拥有 4 个分区,2 个副本。最终需要将该 Topic 的所有副本都存储到 broker0 和 broker1 两台服务器上(如下图所示)。
(1) 创建一个新的 Topic,名称为 first
,拥有 4 个分区,2 个副本
1 | [centos@node02 kafka]$ bin/kafka-topics.sh --bootstrap-server node02:9092 --create --topic first --partitions 4 --replication-factor 2 |
(2) 查看副本的存储情况
1 | [centos@node02 kafka]$ bin/kafka-topics.sh --bootstrap-server node02:9092 --describe --topic first |
1 | Topic: first TopicId: TCI_nvlpST28lUqUMDiaHw PartitionCount: 4 ReplicationFactor: 2 Configs: segment.bytes=1073741824 |
(3) 创建副本存储计划,将所有副本都存储在 broker0、broker1 中
1 | [centos@node02 kafka]$ vim increase-replication-factor.json |
(4) 执行副本存储计划
1 | [centos@node02 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server node02:9092 --reassignment-json-file increase-replication-factor.json --execute |
(5) 验证副本存储计划的执行
1 | [centos@node02 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server node02:9092 --reassignment-json-file increase-replication-factor.json --verify |
(6) 查看副本的存储情况
1 | [centos@node02 kafka]$ bin/kafka-topics.sh --bootstrap-server node02:9092 --describe --topic first |
可以发现所有副本都存储到 broker0 和 broker1 两台服务器上
1 | Topic: first TopicId: TCI_nvlpST28lUqUMDiaHw PartitionCount: 4 ReplicationFactor: 2 Configs: segment.bytes=1073741824 |
副本的故障处理
副本的 Leader 故障处理
副本的 Follower 故障处理
Kafka Broker 最佳实践
服役新节点
服役新节点指的是往 Kafka 集群中动态添加新的节点(Broker)。
特别注意
将新节点(Broker)加入到 Kafka 集群后,需要手动将分区、副本的数据迁移到新节点上,否则新节点形同虚设。
创建新节点
创建新的 Kafka 节点(Broker),并将其加入到已有的 Kafka 集群中,具体操作步骤这里不再累述。
执行数据迁移
这里假设 Kafka 集群原本有 3 个节点(broker0、broker1、broker2),然后新增了一个节点(
broker3
)。
(1) 创建一个配置文件,指定要迁移数据的 Topic
1 | [centos@node02 kafka]$ vim topics-to-move.json |
(2) 生成一个数据迁移计划,其中 --broker-list "0,1,2,3"
用于指定 Kafka 集群节点的 ID 列表,也就是说要将数据迁移到这几个节点上
1 | [centos@node02 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server node02:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate |
1 | Current partition replica assignment |
(3) 拷贝步骤 (2)
生成的数据迁移计划,以此创建副本存储计划(所有副本存储在 broker0、broker1、broker2、broker3 中)
1 | [centos@node02 kafka]$ vim increase-replication-factor.json |
(4) 执行副本存储计划
1 | [centos@node02 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server node02:9092 --reassignment-json-file increase-replication-factor.json --execute |
(5) 验证副本存储计划的执行
1 | [centos@node02 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server node02:9092 --reassignment-json-file increase-replication-factor.json --verify |
1 | Status of partition reassignment: |
(6) 查看主题的详细信息,可以发现各个分区的副本数据会存储在新的 Kafka 节点(broker3)上
1 | [centos@node02 kafka]$ bin/kafka-topics.sh --bootstrap-server node02:9092 --topic first --describe |
1 | Topic: first TopicId: _h3inqW0T5ye8kif1P1c3A PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824 |
退役旧节点
退役旧节点指的是从 Kafka 集群移除某个正在运行的节点(Broker)。
特别注意
在移除某个正在运行的节点(Broker)之前,需要手动对分区、副本的数据进行迁移,否则可能会影响 Kafka 集群的正常运行。
执行数据迁移
这里假设 Kafka 集群原本有 4 个节点(broker0、broker1、broker2、broker3),先按照退役一台节点(如
broker3
)来生成执行计划,然后按照节点服役时的操作流程来执行数据迁移操作。
(1) 创建一个配置文件,指定要迁移数据的主题
1 | [centos@node02 kafka]$ vim topics-to-move.json |
(2) 生成一个数据迁移计划,其中 --broker-list "0,1,2"
用于指定 Kafka 集群节点的 ID 列表(因为要退役 broker3,所以列表里只有 0,1,2
这三个节点),也就是说要将数据迁移到这几个节点上
1 | [atguigu@node02 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server node02:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate |
1 | Current partition replica assignment |
(3) 拷贝步骤 (2)
生成的数据迁移计划,以此创建副本存储计划(所有副本存储在 broker0、broker1、broker2 中)
1 | [centos@node02 kafka]$ vim increase-replication-factor.json |
(4) 执行副本存储计划
1 | [centos@node02 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server node02:9092 --reassignment-json-file increase-replication-factor.json --execute |
(5) 验证副本存储计划的执行
1 | [centos@node02 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server node02:9092 --reassignment-json-file increase-replication-factor.json --verify |
1 | Status of partition reassignment: |
(6) 查看主题的详细信息,可以发现各个分区的副本数据不会再存储在需要退役的 Kafka 节点(broker3)上
1 | [centos@node02 kafka]$ bin/kafka-topics.sh --bootstrap-server node02:9092 --topic first --describe |
1 | Topic: first TopicId: _h3inqW0T5ye8kif1P1c3A PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824 |
关闭旧节点
在需要退役的节点上执行关闭命令,最终 Kafka 集群只剩下 3 个节点(broker0、broker1、broker2)
1 | [centos@node04 kafka]$ bin/kafka-server-stop.sh |