Kafka 入门教程之七
大纲
前言
学习资源
Kafka-KRaft 模式
KRaft 模式的概述
- 左图为 Kafka 的 ZooKeeer 模式架构,元数据存储在 Zookeeper 中,运行时会动态选举一个 Broker 节点作为 Controller(唯一),由 Controller 进行 Kafka 集群管理。
- 左图为 Kafka 的 KRaft 模式架构,不再依赖 Zookeeper 集群,而是使用多个 Controller 节点来代替 Zookeeper,元数据保存在 Controller 中,由 Controller 直接管理 Kafka 集群。
特性 | ZooKeeper 模式 | KRaft 模式 |
---|---|---|
Leader 选举依赖性 | 需要 ZooKeeper 提供元数据存储和通知机制 | 不依赖 ZooKeeper,完全有 Kafka 自己自行实现 |
元数据管理 | 由 ZooKeeper 管理 | 由 Kafka 自己的 Raft 集群管理 |
一致性保障 | ZooKeeper 提供一致性 | Raft 协议保障一致性 |
引入版本 | Kafka 早期版本(默认是 ZooKeeper 模式) | Kafka 2.8.0+ (KRaft 模式) |
版本说明
从 Kafka 2.8.0
版本开始,Kafka 自身实现了 Raft 分布式一致性机制,这意味着 Kafka 集群可以脱离 ZooKeeper 独立运行。
KRaft 模式的优势
- Kafka 不再依赖外部服务,而是能够独立运行。
- Controller 管理集群时,不再需要从 Zookeeper 中先读取数据,提高了集群性能。
- 由于不依赖 Zookeeper,因此 Kafka 集群扩展时不再受到 Zookeeper 读写能力的限制。
- Controller 节点不再是通过动态选举来决定,而是由配置文件指定,这样开发者可以有针对性地加强。
- Controller 节点支持通过配置来指定,而不是像以前一样对随机 Controller 节点的高负载束手无策。
Kafka 选举的概念
ZooKeeper 模式下的选举
在 ZooKeeper 模式下,Controller 的责职
- Controller 负责管理和分发 Kafka 集群的元数据信息,例如 Topic、分区和副本的状态。
- Controller 负责管理集群 Broker 的上下线,包括所有 Topic 的分区副本分配和分区副本 Leader 选举等工作。
- 当分区的 Leader 副本失效时,Controller 负责触发分区副本的 Leader 选举,并将新的 Leader 信息更新到 ZooKeeper 和其他 Broker。
Controller 的选举
- 选举机制
- Kafka 集群的 Controller 是通过 ZooKeeper 进行选举的。
- 在 Broker 启动时,每个 Broker 都会尝试在 ZooKeeper 的
/controller
节点上创建一个临时节点。 - 成功创建该节点的 Broker 就成为 Controller,其它 Broker 则成为普通 Broker。
- 触发条件
- 如果当前的 Controller 因故障宕机(例如网络中断),ZooKeeper 会删除该 Controller 所在 Broker 的临时节点,然后触发新的 Controller 选举。
- 选举机制
分区副本的 Leader 选举
- 选举机制
- 由 Controller 从 ZooKeeper 获取 ISR 列表,选出新的分区副本 Leader 并更新元数据。
- 优先从 ISR 列表中选择健康的副本作为 Leader。
- 触发条件
- 当分区的 Leader 副本失效(如 Broker 宕机)。
- Controller 检测到 ISR(同步副本列表)发生变化。
- 更新通知
- 选举完成后,元数据更新至所有 Broker,客户端根据新 Leader 继续读写操作。
- 选举机制
两种选举的区别
- Controller 的选举:由 ZooKeeper 决定,目的是选出集群的管理者。
- 分区副本的 Leader 选举:由 Controller 负责发起,目的是为每个分区在 ISR 列表中选出一个新的 Leader 副本。
特别注意
- 在 ZooKeeper 模式下,Kafka 集群中的 Controller 是通过 ZooKeeper 选举产生的,集群中同一时刻只能有一个活跃的 Controller(唯一)。其他 Broker 则处于非 Controller 状态,只处理分区的读写请求。
- 在 ZooKeeper 模式下,Kafka 集群中同一时刻只能有一个活跃的 Controller(唯一),这种单点的设计简化了管理,但也增加了 Controller 故障时的切换开销和延迟。
- 在 KRaft 模式下,Kafka 使用 Raft 协议来支持运行多个 Quorum Controller 节点,从而提高了可用性和一致性。
KRaft 模式下的选举
在 KRaft 模式下,Controller 的责职
- Controller 是负责管理元数据的专用节点。
- Quorum Controller 是一个逻辑角色,可以分布在多个 Kafka Broker 上。
- Quorum Controller 集群是通过 Raft 共识协议来保证元数据的一致性和高可用性的。
Controller 的选举
- 在 KRaft 模式下,Controller 的选举是通过 Raft 共识算法完成的,而不再依赖 ZooKeeper。
- 在多个 Controller 节点中,会有一个被选举为 Leader Controller,负责处理元数据更新和变更请求。
- 其他 Controller 节点(Follower Controller)则通过 Raft 协议复制 Leader Controller 的元数据日志。
分区副本的 Leader 选举
- Controller 使用 Raft 协议直接管理元数据并选举副本 Leader,无需依赖 ZooKeeper。
- Controller 根据副本状态(如 ISR 列表)决定新的副本 Leader。
- 元数据变更后,通过 Raft 协议同步到所有 Quorum Controller 节点,无需依赖 ZooKeeper。
Kafka 生产者源码分析
提示
- 在阅读和调试 Kafka 的源码之前,必须先搭建 Kafka 源码的阅读环境,详细教程请看 这里。
- 验证 Kafka 源码阅读成果的两个标志:第一个是能够自行调试源码,第二个是能够在源码上独立开发高阶功能。
生产者的工作原理
发送消息的流程
Main 线程的初始化
Sender 线程的初始化
发送数据到缓冲区
Sender 线程发送数据
生产者的源码分析
Main 线程的初始化
这里的核心类是 clients
模块下的 KafkaProducer
类,最核心的是 KafkaProducer()
构造方法,如下所示:
1 | KafkaProducer(ProducerConfig config, |
Sender 线程的初始化
这里的核心类是 clients
模块下的 KafkaProducer
类,最核心的是 newSender()
方法,如下所示:
1 | Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) { |
Sendler 类实现了 Runnable 接口,Main 线程会将 Sender 对象放到一个线程(KafkaThread 类)中启动。Sender 类的 run()
方法如下所示:
1 |
|
Main 线程发送数据到缓冲区
发送总体流程
这里的核心类是 clients
模块下的 KafkaProducer
类,最核心的是 send()
方法,如下所示:
1 |
|
send()
会调用 onSend()
方法让拦截器处理待发送的数据,如下所示:
1 | public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) { |
send()
会调用 doSend()
方法将数据发送给 Kafka,如下所示:
1 | private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) { |
分区的选择
这里的核心类是 clients
模块下的 KafkaProducer
类,最核心的是 partition()
方法,如下所示:
1 | private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { |
分区器都实现了 Partitioner
接口,默认的分区器实现类是 DefaultPartitioner
,如下所示:
1 | public class DefaultPartitioner implements Partitioner { |
如果在发送数据时没有指定 Key,那么会调用 StickyPartitionCache.partition()
方法来获取分区号,如下所示:
1 | public class StickyPartitionCache { |
Kafka 的分区策略
- (1) 在指明
partition
的情况下,直接将指明的值作为partition
值。例如:partition=0
,那么数据会被写入分区 0。 - (2) 在没有指明
partition
值,但有指定key
的情况下,将key
的 Hash 值与topic
的partition
数进行取余来得到partition
值。例如:key
的 Hash 值是 5,topic
的partition
数是 2,那么key
对应的value
会被写入 1 号分区。 - (3) 在既没有指明
partition
值,又没有指定key
的情况下,Kafka 会采用Sticky Partition
黏性分区器,也就是会随机选择一个分区,并尽可能一直使用该分区,等该分区的batch
已满或者已完成,Kafka 再随机一个分区进行使用(和上一次选的分区不同)。例如:第一次随机选择 0 号分区,等 0 号分区当前批次满了(默认 16K 大小)或者linger.ms
设置的时间到了,Kafka 会再随机选择一个分区进行使用(如果还是 0 分区会继续随机选择一个分区)。
发送数据大小校验
这里的核心类是 clients
模块下的 KafkaProducer
类,最核心的是 ensureValidRecordSize()
方法,如下所示:
1 | private void ensureValidRecordSize(int size) { |
内存池的使用
在 clients
模块下的 KafkaProducer
类的 doSend()
方法中,有以下一段代码:
1 | RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, |
RecordAccumulator 类的 append()
方法的代码如下:
1 | public RecordAppendResult append(TopicPartition tp, |
RecordAccumulator 类的 getOrCreateDeque()
方法的代码如下:
1 | private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches; |
Sender 线程发送数据到服务端
在 clients
模块下的 KafkaProducer
类的 doSend()
方法中,有以下一段代码:
1 | if (result.batchIsFull || result.newBatchCreated) { |
Sendler 类实现了 Runnable
接口,查看 Sender 类的 run()
方法,代码如下:
1 |
|
Sender 类的 runOnce()
方法,代码如下:
1 | void runOnce() { |
Sender 类的 sendProducerData()
方法,代码如下:
1 | private long sendProducerData(long now) { |
RecordAccumulator 类的 ready()
方法,代码如下:
1 | // 检查缓冲区(默认大小是 32m)的数据是否准备好 |
RecordAccumulator 类的 drain()
方法,代码如下:
1 | // 将发往同一个 Broker 节点的数据,封装为一个请求批次 |
Sender 类的 sendProduceRequest()
方法,代码如下:
1 | /** |
NetworkClient 的 send()
方法,代码如下:
1 | /** |
NetworkClient 的 poll()
方法,代码如下:
1 | // 获取发送的响应结果 |
Kafka 消费者源码分析
提示
- 在阅读和调试 Kafka 的源码之前,必须先搭建 Kafka 源码的阅读环境,详细教程请看 这里。
- 验证 Kafka 源码阅读成果的两个标志:第一个是能够自行调试源码,第二个是能够在源码上独立开发高阶功能。
消费者的工作原理
消费者的初始化
消费者订阅主题
消费者拉取和处理消息
消费者组的消费流程
拉取和处理消息的流程
消费者提交 Offset
消费者组的初始化流程
消费者的源码分析
消费者的初始化
这里的核心类是 clients
模块下的 KafkaConsumer 类,最核心的是 KafkaConsumer()
构造方法,如下所示:
1 | KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) { |
消费者订阅主题
这里的核心类是 clients
模块下的 KafkaConsumer 类,最核心的是 subscribe()
方法,如下所示:
1 |
|
SubscriptionState 的 subscribe()
方法,代码如下:
1 | public synchronized boolean subscribe(Set<String> topics, ConsumerRebalanceListener listener) { |
SubscriptionState 的 changeSubscription()
方法,代码如下:
1 | private boolean changeSubscription(Set<String> topicsToSubscribe) { |
Metadata 类的 requestUpdateForNewTopics()
方法,代码如下:
1 | // 如果订阅的主题和以前订阅的不一致,就需要更新元数据信息 |
消费者拉取和处理消息
消费者拉取消息
这里的核心类是 clients
模块下的 KafkaConsumer 类,最核心的是 poll()
方法,如下所示:
1 |
|
KafkaConsumer 类的 pollForFetches()
方法,代码如下:
1 | private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) { |
Fetcher 类的 sendFetches()
方法,代码如下:
1 | public synchronized int sendFetches() { |
Fetcher 类的 fetchedRecords()
方法,代码如下:
1 | // 从本地的队列(内存)里拉取数据,一次默认最多拉取 500 条数据 |
消费者处理消息
在 KafkaConsumer 中的 poll()
方法中,有以下一段代码:
1 | // 开始拉取数据 |
ConsumerInterceptors 的 onConsume()
方法,代码如下:
1 | public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) { |
消费者提交 Offset
手动同步提交 Offset
这里的核心类是 clients
模块下的 KafkaConsumer 类,最核心的是 commitSync()
方法,如下所示:
1 |
|
ConsumerCoordinator 的 commitOffsetsSync()
方法,代码如下:
1 | // 同步提交 Offset |
手动异步提交 Offset
这里的核心类是 clients
模块下的 KafkaConsumer 类,最核心的是 commitAsync()
方法,如下所示:
1 |
|
ConsumerCoordinator 的 commitOffsetsAsync()
方法,代码如下:
1 | public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) { |
消费者组的初始化流程
这里的核心类是 clients
模块下的 KafkaConsumer 类,最核心的是 updateAssignmentMetadataIfNeeded()
方法,如下所示:
1 | boolean updateAssignmentMetadataIfNeeded(final Timer timer, final boolean waitForJoinGroup) { |
ConsumerCoordinator 的 poll()
方法,代码如下:
1 | public boolean poll(Timer timer, boolean waitForJoinGroup) { |
AbstractCoordinator 的 ensureCoordinatorReady()
方法,代码如下:
1 | protected synchronized boolean ensureCoordinatorReady(final Timer timer) { |
Kafka 服务端源码分析
提示
- 在阅读和调试 Kafka 的源码之前,必须先搭建 Kafka 源码的阅读环境,详细教程请看 这里。
- 验证 Kafka 源码阅读成果的两个标志:第一个是能够自行调试源码,第二个是能够在源码上独立开发高阶功能。
- Kafka 的服务端(Broker)主要是基于 Scala 语言开发的,因此在阅读 Kafka 服务端的源码之前,必须熟练掌握 Scala 语言。
服务端的工作原理
服务端的整体工作流程
服务端的源码分析
服务端的主启动类
Kafka 服务端的主启动类是 core/src/main/scala/kafka/Kafka.scala
,在 IntelliJ IDEA 中打开该主启动类,并启动 main()
方法即可运行服务端(前提是 Kafka 的源码阅读环境已经搭建好),如下图所示:
Kafka 服务端主启动类中的 main()
方法,代码如下:
1 | def main(args: Array[String]): Unit = { |