Linux 生产环境搭建 Kafka-KRaft 集群

大纲

前言

本文将使用多台物理机器(至少三台),基于 KRaft 模式搭建 Kafka 集群,适用于在 CentOS/Debian/Ubuntu 等发行版。

Kafka-KRaft 模式介绍

KRaft 模式的概述

  • 左图为 Kafka 的 ZooKeeer 模式架构,元数据存储在 Zookeeper 中,运行时会动态选举一个 Broker 节点作为 Controller(唯一),由 Controller 进行 Kafka 集群管理。
  • 左图为 Kafka 的 KRaft 模式架构,不再依赖 Zookeeper 集群,而是使用多个 Controller 节点来代替 Zookeeper,元数据保存在 Controller 中,由 Controller 直接管理 Kafka 集群。
特性 ZooKeeper 模式 KRaft 模式
Leader 选举依赖性需要 ZooKeeper 提供元数据存储和通知机制不依赖 ZooKeeper,完全有 Kafka 自己自行实现
元数据管理由 ZooKeeper 管理由 Kafka 自己的 Raft 集群管理
一致性保障 ZooKeeper 提供一致性 Raft 协议保障一致性
引入版本 Kafka 早期版本(默认是 ZooKeeper 模式)Kafka 2.8.0+(KRaft 模式)

版本说明

从 Kafka 2.8.0 版本开始,Kafka 自身实现了 Raft 分布式一致性机制,这意味着 Kafka 集群可以脱离 ZooKeeper 独立运行。

KRaft 模式的优势

  • Kafka 不再依赖外部服务,而是能够独立运行。
  • Controller 管理集群时,不再需要从 Zookeeper 中先读取数据,提高了集群性能。
  • 由于不依赖 Zookeeper,因此 Kafka 集群扩展时不再受到 Zookeeper 读写能力的限制。
  • Controller 节点不再是通过动态选举来决定,而是由配置文件指定,这样开发者可以有针对性地加强。
  • Controller 节点支持通过配置来指定,而不是像以前一样对随机 Controller 节点的高负载束手无策。

Kafka-KRaft 集群部署

准备工作

集群部署规划

节点节点 ID 主机名角色 IP 地址 Broker 端口 Controller 端口版本
Kafka 集群节点一 1kafka01brokercontroller192.168.2.127909290933.8.1
Kafka 集群节点二 2kafka02brokercontroller192.168.2.150909290933.8.1
Kafka 集群节点三 3kafka03brokercontroller192.168.2.203909290933.8.1

提示

  • 对于 Kafka-KRaft 集群,可以在配置文件中通过 process.roles 配置项来指定某个节点单独作为 Broker 或者单独作为 Controller,还可以指定某个节点同时作为 Broker 和 Controller。
  • 特别注意,为了保证 Kafka-KRaft 集群的高可用性,要求至少要有 3 个 Controller。

添加主机名映射

在所有 Kafka-KRaft 集群节点中,分别通过修改 /etc/hosts 配置文件来添加主机名和 IP 地址的映射关系,用于将主机名解析到指定的 IP 地址。

1
2
# 编辑系统配置文件,添加以下配置内容
# vi /etc/hosts
1
2
3
192.168.2.127   kafka01
192.168.2.150 kafka02
192.168.2.203 kafka03

集群搭建

Kafka 下载地址

  • (1) Kafka 的安装包可以从 官网 下载。
  • (2) 以下载得到的压缩文件 kafka_2.13-3.8.1.tgz 为例,2.11 是 Scala 的版本号,3.2.1 是 Kafka 的版本号。
  • (3) 值得一提的是,Kafka 的 Broker 组件是使用 Scala 开发的,而 Producer 组件和 Consumer 组件是使用 Java 开发的。
  • Kafka 安装
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 创建安装目录
# mkdir -p /usr/local/kafka-cluster

# 进入安装目录
# cd /usr/local/kafka-cluster

# 下载文件
# wget https://downloads.apache.org/kafka/3.8.1/kafka_2.13-3.8.1.tgz

# 解压文件
# tar -xvf kafka_2.13-3.8.1.tgz

# 重命名目录(作为节点一的安装目录)
# mv kafka_2.13-3.8.1 kafka-node01

# 删除文件
# rm -rf kafka_2.13-3.8.1.tgz
  • Kafka 基础配置
1
2
3
4
5
6
7
8
9
``` sh
# 进入节点一的安装目录
# cd /usr/local/kafka-cluster/kafka-node01

# 创建数据存储目录
# mkdir data

# 编辑 KRaft 的配置文件(更改或添加以下内容即可)
# vi config/kraft/server.properties

最关键的配置项是 process.rolesnode.idcontroller.quorum.votersadvertised.listenerslog.dirs。在生产阶段,其他配置项可以根据业务需求适当调整具体的参数值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# 指定节点的角色,可以单独指定为 broker 或者 controller,还可以两者同时指定
process.roles=broker,controller
# Kafka 节点的 ID,必须在集群中唯一
node.id=1
# 定义控制器选举的投票者,格式为 node.id@host:port
controller.quorum.voters=1@kafka01:9093,2@kafka02:9093,3@kafka03:9093
# 配置 Kafka 使用的监听地址和端口
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
# 定义 Broker 之间通信使用的监听器名称
inter.broker.listener.name=PLAINTEXT
# 定义 Broker 对外暴露的监听地址,供客户端连接
advertised.listeners=PLAINTEXT://kafka01:9092
# 定义 Controller 之间通信使用的监听器名称
controller.listener.names=CONTROLLER
# 定义监听器与安全协议的映射关系
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# 指定 Kafka 使用的日志(数据)存储目录
log.dirs=/usr/local/kafka-cluster/kafka-node01/data
# 处理网络请求的线程数量
num.network.threads=3
# 用来处理磁盘 IO 的线程数量
num.io.threads=8
# 发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
# 接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
# 请求套接字的缓冲区大小
socket.request.max.bytes=104857600
# 每个 Topic 在创建时的分区数量,默认是 1 个分区
num.partitions=3
# 用来恢复和清理 Data 下数据的线程数量
num.recovery.threads.per.data.dir=1
# 是否启用自动创建主题的功能,默认是启用,禁用后可以避免因拼写错误或误用主题名而自动创建不必要的主题
auto.create.topics.enable=false
# 每个 Topic 在创建时的分区副本数量,默认是 1 个副本
default.replication.factor=3
# 设置内置主题 __consumer_offsets 的副本数量,默认是 1 个副本,用于存储消费者组的偏移量
offsets.topic.replication.factor=3
# 设置事务状态日志主题 __transaction_state 的副本数量,默认是 1 个副本
transaction.state.log.replication.factor=3
# 设置事务状态日志主题的最小同步副本数(ISR),确保至少有多个副本成功同步数据才能提交事务
transaction.state.log.min.isr=2
# 每个 Segment 文件保留的最长时间,超时将被删除,默认保留 7 天
log.retention.hours=168
# 每个 Segment 文件的大小,默认最大 1G
log.segment.bytes=1073741824
# 检查过期数据的时间,默认 5 分钟检查一次数据是否过期
log.retention.check.interval.ms=300000

特别注意

  • (1) 这里需要更改是 Kafka-KRaft 的配置文件 config/kraft/server.properties,而不是 Kafka 的默认配置文件 config/server.properties
  • (2) 上述部分配置默认是不存在于 Kafka-KRaft 的配置文件 config/kraft/server.properties 中的,因此需要手动添加缺少的配置项,比如 default.replication.factor
  • Kafka 创建多个节点

通过 scp 命令拷贝两份上面已经配置好的 Kafka 节点一安装目录到其他 Kafka 集群节点上,以此作为集群另外两个节点的安装文件。

1
2
3
4
5
# 拷贝安装目录到节点二
# scp -r /usr/local/kafka-cluster/kafka-node01 root@kafka02:/usr/local/kafka-cluster/kafka-node02

# 拷贝安装目录到节点三
# scp -r /usr/local/kafka-cluster/kafka-node01 root@kafka03:/usr/local/kafka-cluster/kafka-node03

安装目录拷贝完成后,还需要更改另外两个集群节点里的 Kafka-KRaft 配置文件 config/kraft/server.properties 中的 node.idadvertised.listenerslog.dirs 参数。节点二和节点三的核心配置如下:

1
2
3
4
5
6
7
8
9
10
# 节点二的核心配置
node.id=2
process.roles=broker,controller
controller.quorum.voters=1@kafka01:9093,2@kafka02:9093,3@kafka03:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://kafka02:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
log.dirs=/usr/local/kafka-cluster/kafka-node02/data
1
2
3
4
5
6
7
8
9
10
# 节点三的核心配置
node.id=3
process.roles=broker,controller
controller.quorum.voters=1@kafka01:9093,2@kafka02:9093,3@kafka03:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://kafka03:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
log.dirs=/usr/local/kafka-cluster/kafka-node03/data

集群启动

初始化存储目录

  • 首先,在任意一个集群节点(比如节点一)中生成一个唯一的集群 ID
1
2
# 节点一生成一个唯一的集群 ID
# /usr/local/kafka-cluster/kafka-node01/bin/kafka-storage.sh random-uuid
1
rNh-6V2lSvy4kC0LmYOl6w
  • 然后,在所有 Kafka-KRaft 集群节点中,分别用上面生成的集群 ID 来初始化 Kafka 的存储目录
1
2
# 初始化节点一的存储目录
# /usr/local/kafka-cluster/kafka-node01/bin/kafka-storage.sh format -t rNh-6V2lSvy4kC0LmYOl6w -c /usr/local/kafka-cluster/kafka-node01/config/kraft/server.properties
1
2
# 初始化节点二的存储目录
# /usr/local/kafka-cluster/kafka-node02/bin/kafka-storage.sh format -t rNh-6V2lSvy4kC0LmYOl6w -c /usr/local/kafka-cluster/kafka-node02/config/kraft/server.properties
1
2
# 初始化节点三的存储目录
# /usr/local/kafka-cluster/kafka-node03/bin/kafka-storage.sh format -t rNh-6V2lSvy4kC0LmYOl6w -c /usr/local/kafka-cluster/kafka-node03/config/kraft/server.properties

启动各个集群节点

在所有 Kafka-KRaft 集群节点中,分别在后台启动 Kafka 节点

1
2
# 启动节点一
# /usr/local/kafka-cluster/kafka-node01/bin/kafka-server-start.sh -daemon /usr/local/kafka-cluster/kafka-node01/config/kraft/server.properties
1
2
# 启动节点二
# /usr/local/kafka-cluster/kafka-node02/bin/kafka-server-start.sh -daemon /usr/local/kafka-cluster/kafka-node02/config/kraft/server.properties
1
2
# 启动节点三
# /usr/local/kafka-cluster/kafka-node03/bin/kafka-server-start.sh -daemon /usr/local/kafka-cluster/kafka-node03/config/kraft/server.properties

查看启动日志信息

在所有 Kafka-KRaft 集群节点中,分别查看 Broker 和 Controller 的启动日志信息,观察 Kafka 节点是否正常启动

1
2
3
4
5
# 查看节点一的 Brokder 日志
# vim /usr/local/kafka-cluster/kafka-node01/logs/server.log

# 查看节点一的 Controller 日志
# vim /usr/local/kafka-cluster/kafka-node01/logs/controller.log
1
2
3
4
5
# 查看节点二的 Brokder 日志
# vim /usr/local/kafka-cluster/kafka-node02/logs/server.log

# 查看节点二的 Controller 日志
# vim /usr/local/kafka-cluster/kafka-node02/logs/controller.log
1
2
3
4
5
# 查看节点三的 Brokder 日志
# vim /usr/local/kafka-cluster/kafka-node03/logs/server.log

# 查看节点三的 Controller 日志
# vim /usr/local/kafka-cluster/kafka-node03/logs/controller.log

前台启动 Kafka 集群节点

若希望更直观地观察 Kafka-KRaft 集群节点的启动日志信息,可以去掉上述节点启动命令中的 -daemon 参数,这样就可以使用前台方式启动 Kafka-KRaft 集群节点。

集群管理

查看状态

在所有 Kafka-KRaft 集群节点中,分别使用以下命令查看集群节点的运行状态。如果发现集群节点启动失败,则可以根据 Kafka 的日志文件来定位问题。

1
2
3
4
5
6
7
8
# 查看端口占用情况
# netstat -nplt | grep -E ':9092|:9093'

# 查看 Kafka 进程
# ps -aux | grep kafka

# 查看所有 Java 进程
# jps -l

集群关闭

1
2
# 关闭节点一
# /usr/local/kafka-cluster/kafka-node01/bin/kafka-server-stop.sh stop
1
2
# 关闭节点二
# /usr/local/kafka-cluster/kafka-node02/bin/kafka-server-stop.sh stop
1
2
# 关闭节点三
# /usr/local/kafka-cluster/kafka-node03/bin/kafka-server-stop.sh stop

集群重建

若希望重建 Kafka-KRaft 集群,可以按照以下步骤进行操作(清空数据目录的操作不可恢复,生产环境下慎用)。

  • (1) 关闭所有 Kafka-KRaft 集群节点
  • (2) 在所有 Kafka-KRaft 集群节点中,分别清空 data 数据目录和 logs 日志目录里的文件
  • (3) 在所有 Kafka-KRaft 集群节点中,分别重新执行 kafka-storage.sh format 命令来初始化 Kafka 的存储目录
  • (4) 重新启动所有 Kafka-KRaft 集群节点

集群测试

  • 进入任意节点(比如节点一)的安装目录下的 bin 目录
1
2
# 进入安装目录
# cd /usr/local/kafka-cluster/kafka-node01/bin
  • 创建主题
1
2
# 创建主题
# ./kafka-topics.sh --bootstrap-server kafka02:9092 --create --partitions 1 --replication-factor 3 --topic test
  • 查看主题列表
1
2
# 查看主题列表
# ./kafka-topics.sh --bootstrap-server kafka02:9092 --list
  • 查看主题详细信息
1
2
# 查看主题详细信息
# ./kafka-topics.sh --bootstrap-server kafka02:9092 --topic test --describe
  • 启动控制台消费者
1
2
# 启动消费者
# ./kafka-console-consumer.sh --bootstrap-server kafka02:9092 --topic test --from-beginning
  • 启动控制台生产者
1
2
# 启动生产者
# ./kafka-console-producer.sh --broker-list kafka02:9092 --topic test
  • 在生产者的控制台手动输入 hello kafka,消费者就可以消费到生产者的消息,并在控制台输出 hello kafka,这表示消费者成功消费了生产者发送的消息!

Kafka-KRaft 集群调优

调整 Kafka 的堆内存大小

若希望调整 Kafka 的堆内存大小,可以找到各个 Kafka-KRaft 集群节点的服务启动脚本文件 bin/kafka-server-start.sh,然后在脚本文件中修改如下参数值:

1
2
3
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx2G -Xms2G"
fi