大纲
前言
本文将使用多台物理机器(至少三台),基于 KRaft 模式搭建 Kafka 集群,适用于在 CentOS/Debian/Ubuntu 等发行版。
Kafka-KRaft 模式介绍
KRaft 模式的概述
- 左图为 Kafka 的 ZooKeeer 模式架构,元数据存储在 Zookeeper 中,运行时会动态选举一个 Broker 节点作为 Controller(唯一),由 Controller 进行 Kafka 集群管理。
- 左图为 Kafka 的 KRaft 模式架构,不再依赖 Zookeeper 集群,而是使用多个 Controller 节点来代替 Zookeeper,元数据保存在 Controller 中,由 Controller 直接管理 Kafka 集群。
特性 | ZooKeeper 模式 | KRaft 模式 |
---|
Leader 选举依赖性 | 需要 ZooKeeper 提供元数据存储和通知机制 | 不依赖 ZooKeeper,完全有 Kafka 自己自行实现 |
元数据管理 | 由 ZooKeeper 管理 | 由 Kafka 自己的 Raft 集群管理 |
一致性保障 | ZooKeeper 提供一致性 | Raft 协议保障一致性 |
引入版本 | Kafka 早期版本(默认是 ZooKeeper 模式) | Kafka 2.8.0+ (KRaft 模式) |
版本说明
从 Kafka 2.8.0
版本开始,Kafka 自身实现了 Raft 分布式一致性机制,这意味着 Kafka 集群可以脱离 ZooKeeper 独立运行。
KRaft 模式的优势
- Kafka 不再依赖外部服务,而是能够独立运行。
- Controller 管理集群时,不再需要从 Zookeeper 中先读取数据,提高了集群性能。
- 由于不依赖 Zookeeper,因此 Kafka 集群扩展时不再受到 Zookeeper 读写能力的限制。
- Controller 节点不再是通过动态选举来决定,而是由配置文件指定,这样开发者可以有针对性地加强。
- Controller 节点支持通过配置来指定,而不是像以前一样对随机 Controller 节点的高负载束手无策。
Kafka-KRaft 集群部署
准备工作
集群部署规划
节点 | 节点 ID | 主机名 | 角色 | IP 地址 | Broker 端口 | Controller 端口 | 版本 |
---|
Kafka 集群节点一 | 1 | kafka01 | broker 、controller | 192.168.2.127 | 9092 | 9093 | 3.8.1 |
Kafka 集群节点二 | 2 | kafka02 | broker 、controller | 192.168.2.150 | 9092 | 9093 | 3.8.1 |
Kafka 集群节点三 | 3 | kafka03 | broker 、controller | 192.168.2.203 | 9092 | 9093 | 3.8.1 |
提示
- 对于 Kafka-KRaft 集群,可以在配置文件中通过
process.roles
配置项来指定某个节点单独作为 Broker 或者单独作为 Controller,还可以指定某个节点同时作为 Broker 和 Controller。 - 特别注意,为了保证 Kafka-KRaft 集群的高可用性,要求至少要有 3 个 Controller。
添加主机名映射
在所有 Kafka-KRaft 集群节点中,分别通过修改 /etc/hosts
配置文件来添加主机名和 IP 地址的映射关系,用于将主机名解析到指定的 IP 地址。
1 2 3
| 192.168.2.127 kafka01 192.168.2.150 kafka02 192.168.2.203 kafka03
|
集群搭建
Kafka 下载地址
- (1) Kafka 的安装包可以从 官网 下载。
- (2) 以下载得到的压缩文件
kafka_2.13-3.8.1.tgz
为例,2.11
是 Scala 的版本号,3.2.1
是 Kafka 的版本号。 - (3) 值得一提的是,Kafka 的 Broker 组件是使用 Scala 开发的,而 Producer 组件和 Consumer 组件是使用 Java 开发的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
|
最关键的配置项是 process.roles
、node.id
、controller.quorum.voters
、advertised.listeners
、log.dirs
。在生产阶段,其他配置项可以根据业务需求适当调整具体的参数值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| process.roles=broker,controller
node.id=1
controller.quorum.voters=1@kafka01:9093,2@kafka02:9093,3@kafka03:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://kafka01:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
log.dirs=/usr/local/kafka-cluster/kafka-node01/data
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=3
num.recovery.threads.per.data.dir=1
auto.create.topics.enable=false
default.replication.factor=3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
|
特别注意
- (1) 这里需要更改是 Kafka-KRaft 的配置文件
config/kraft/server.properties
,而不是 Kafka 的默认配置文件 config/server.properties
。 - (2) 上述部分配置默认是不存在于 Kafka-KRaft 的配置文件
config/kraft/server.properties
中的,因此需要手动添加缺少的配置项,比如 default.replication.factor
。
通过 scp
命令拷贝两份上面已经配置好的 Kafka 节点一安装目录到其他 Kafka 集群节点上,以此作为集群另外两个节点的安装文件。
安装目录拷贝完成后,还需要更改另外两个集群节点里的 Kafka-KRaft 配置文件 config/kraft/server.properties
中的 node.id
、advertised.listeners
、log.dirs
参数。节点二和节点三的核心配置如下:
1 2 3 4 5 6 7 8 9 10
| node.id=2 process.roles=broker,controller controller.quorum.voters=1@kafka01:9093,2@kafka02:9093,3@kafka03:9093 listeners=PLAINTEXT://:9092,CONTROLLER://:9093 inter.broker.listener.name=PLAINTEXT advertised.listeners=PLAINTEXT://kafka02:9092 controller.listener.names=CONTROLLER listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL log.dirs=/usr/local/kafka-cluster/kafka-node02/data
|
1 2 3 4 5 6 7 8 9 10
| node.id=3 process.roles=broker,controller controller.quorum.voters=1@kafka01:9093,2@kafka02:9093,3@kafka03:9093 listeners=PLAINTEXT://:9092,CONTROLLER://:9093 inter.broker.listener.name=PLAINTEXT advertised.listeners=PLAINTEXT://kafka03:9092 controller.listener.names=CONTROLLER listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL log.dirs=/usr/local/kafka-cluster/kafka-node03/data
|
集群启动
初始化存储目录
- 首先,在任意一个集群节点(比如节点一)中生成一个唯一的集群 ID
- 然后,在所有 Kafka-KRaft 集群节点中,分别用上面生成的集群 ID 来初始化 Kafka 的存储目录
启动各个集群节点
在所有 Kafka-KRaft 集群节点中,分别在后台启动 Kafka 节点
查看启动日志信息
在所有 Kafka-KRaft 集群节点中,分别查看 Broker 和 Controller 的启动日志信息,观察 Kafka 节点是否正常启动
前台启动 Kafka 集群节点
若希望更直观地观察 Kafka-KRaft 集群节点的启动日志信息,可以去掉上述节点启动命令中的 -daemon
参数,这样就可以使用前台方式启动 Kafka-KRaft 集群节点。
集群管理
查看状态
在所有 Kafka-KRaft 集群节点中,分别使用以下命令查看集群节点的运行状态。如果发现集群节点启动失败,则可以根据 Kafka 的日志文件来定位问题。
集群关闭
集群重建
若希望重建 Kafka-KRaft 集群,可以按照以下步骤进行操作(清空数据目录的操作不可恢复,生产环境下慎用)。
- (1) 关闭所有 Kafka-KRaft 集群节点
- (2) 在所有 Kafka-KRaft 集群节点中,分别清空
data
数据目录和 logs
日志目录里的文件 - (3) 在所有 Kafka-KRaft 集群节点中,分别重新执行
kafka-storage.sh format
命令来初始化 Kafka 的存储目录 - (4) 重新启动所有 Kafka-KRaft 集群节点
集群测试
- 进入任意节点(比如节点一)的安装目录下的
bin
目录
- 在生产者的控制台手动输入
hello kafka
,消费者就可以消费到生产者的消息,并在控制台输出 hello kafka
,这表示消费者成功消费了生产者发送的消息!
Kafka-KRaft 集群调优
调整 Kafka 的堆内存大小
若希望调整 Kafka 的堆内存大小,可以找到各个 Kafka-KRaft 集群节点的服务启动脚本文件 bin/kafka-server-start.sh
,然后在脚本文件中修改如下参数值:
1 2 3
| if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx2G -Xms2G" fi
|