Linux 单机搭建 Kafka 集群

前言

本文适用于在 Centos/Debian/Ubuntu 等 Linux 发行版系统上,使用单机搭建 Kafka 集群。

Zookeeper 集群搭建

本文的 Kafka 集群搭建依赖于 Zookeeper,因此需要将 Zookeeper 单机集群提前搭建起来。值得一提的是,从 Kafka 2.8.0 版本开始,Kafka 自身实现了 Raft 分布式一致性机制,这意味着 Kafka 集群是可以脱离 ZooKeeper 独立运行的。

集群规划

节点 IP 地址端口版本号
Zookeeper 节点 1127.0.0.121813.4.10
Zookeeper 节点 2127.0.0.121823.4.10
Zookeeper 节点 3127.0.0.121833.4.10

集群搭建

由于篇幅有限,Linux 单机搭建 Zookeeper 集群的内容这里不再累述,详细教程可看 这里

Kafka 集群搭建

集群规划

节点 IP 地址端口版本号
Kafka 节点 1127.0.0.190922.13-3.2.1
Kafka 节点 2127.0.0.190932.13-3.2.1
Kafka 节点 3127.0.0.190942.13-3.2.1

集群搭建

  • Kafka 下载
  1. Kafka 的安装包可以从 官网 下载。
  2. 以下载得到的压缩文件 kafka_2.13-3.2.1.tgz 为例,2.11 是 Scala 的版本号,3.2.1 是 Kafka 的版本号。
  3. Kafka 的 Broker 组件是使用 Scala 开发的,而 Producer 组件和 Consumer 组件是使用 Java 开发的。
  • Kafka 安装
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 创建安装目录
# mkdir -p /usr/local/kafka-cluster

# 进入安装目录
# cd /usr/local/kafka-cluster

# 下载文件
# wget https://downloads.apache.org/kafka/3.2.1/kafka_2.13-3.2.1.tgz

# 解压文件
# tar -xvf kafka_2.13-3.2.1.tgz

# 重命名目录
# mv kafka_2.13-3.2.1 kafka-node1

# 删除文件
# rm -rf kafka_2.13-3.2.1.tgz
  • Kafka 基础配置
1
2
3
4
5
6
7
8
# 进入安装目录
# cd /usr/local/kafka-cluster/kafka-node1

# 创建日志目录(数据存储目录)
# mkdir logs

# 编辑配置文件(指定以下内容即可)
# vim config/server.properties

最关键的配置内容是 broker.idlog.dirszookeeper.connect,其中的 zookeeper.connect 是 Zookeeper 连接地址,建议使用 /kafka 作为后缀,这样方便日后在 Zookeeper 里统一管理 Kafka 的数据。在开发测试阶段,其他配置内容暂时可以使用默认值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# broker 的全局唯一编号,不能重复
broker.id=1
# 处理网络请求的线程数量
num.network.threads=3
# 用来处理磁盘 IO 的线程数量
num.io.threads=8
# 发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
# 接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
# 请求套接字的缓冲区大小
socket.request.max.bytes=104857600
# 运行日志存放的路径
log.dirs=/usr/local/kafka-cluster/kafka-node1/logs
# topic 在当前 broker 上的分区个数
num.partitions=1
# 用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
# 每个 topic 创建时的副本数,默认是 1 个副本
offsets.topic.replication.factor=1
# 每个 segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
# 每个 segment 文件的大小,默认最大 1G
log.segment.bytes=1073741824
# 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
# 配置连接 Zookeeper 集群地址
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183/kafka
  • Kafka 端口配置

单机搭建 Kafka 集群时,为了解决端口冲突的问题,还需要指定 Kafka 监听的端口,必须将下述各个配置文件里的端口都更改掉。

1
2
3
4
# vim config/server.properties

# 默认端口号
listeners=PLAINTEXT://:9092
1
2
3
4
# vim config/connect-standalone.properties

# 单机环境的端口号
bootstrap.servers=localhost:9092
1
2
3
4
# vim config/connect-distributed.properties

# 集群环境的端口号
bootstrap.servers=localhost:9092
1
2
3
4
# vim config/producer.properties

# 发布端的端口号
bootstrap.servers=localhost:9092
1
2
3
4
# vim config/consumer.properties

# 消费端的端口号
bootstrap.servers=localhost:9092
  • Kafka 创建多个节点

拷贝两份上面已经配置好的 Kafka 安装目录,以此作为集群另外两个节点的安装文件,例如 kafka-node2kafka-node3。安装目录拷贝完成后,还需要为另外两个节点按照以下步骤更改对应的内容:

第一步:创建另外两个节点的日志目录(数据存储目录)
第二步:更改 server.properties 配置文件里的 broker.idlog.dirs
第三步:更改 Kafka 监听的端口,包括更改上述的 server.propertiesconnect-standalone.propertiesconnect-distributed.propertiesproducer.propertiesconsumer.properties 配置文件

上述两个步骤完成后,节点二和节点三的最终配置如下:

1
2
3
4
5
6
# 节点二的配置
broker.id=2
listeners=PLAINTEXT://:9093
log.dirs=/usr/local/kafka-cluster/kafka-node2/logs

bootstrap.servers=localhost:9093
1
2
3
4
5
6
# 节点三的配置
broker.id=3
listeners=PLAINTEXT://:9094
log.dirs=/usr/local/kafka-cluster/kafka-node3/logs

bootstrap.servers=localhost:9094

集群管理

  • 集群启动

注意

启动 Kafka 集群之前,必须确保 Zookeeper 集群已经启动成功,这是因为本文搭建的 Kafka 集群依赖于 Zookeeper 集群。

1
2
3
4
5
6
7
8
9
# 后台启动
# /usr/local/kafka-cluster/kafka-node1/bin/kafka-server-start.sh -daemon /usr/local/kafka-cluster/kafka-node1/config/server.properties
# /usr/local/kafka-cluster/kafka-node2/bin/kafka-server-start.sh -daemon /usr/local/kafka-cluster/kafka-node2/config/server.properties
# /usr/local/kafka-cluster/kafka-node3/bin/kafka-server-start.sh -daemon /usr/local/kafka-cluster/kafka-node3/config/server.properties

# 或者前台启动(可直接查看启动时输出的日志信息)
# /usr/local/kafka-cluster/kafka-node1/bin/kafka-server-start.sh /usr/local/kafka-cluster/kafka-node1/config/server.properties
# /usr/local/kafka-cluster/kafka-node2/bin/kafka-server-start.sh /usr/local/kafka-cluster/kafka-node2/config/server.properties
# /usr/local/kafka-cluster/kafka-node3/bin/kafka-server-start.sh /usr/local/kafka-cluster/kafka-node3/config/server.properties
  • 查看状态

集群启动后,可以使用以下命令查看集群的运行状态。如果发现集群启动失败,则可以使用前台的方式再次启动集群,然后根据终端输出的错误日志信息来定位问题。

1
2
3
4
5
6
7
8
9
10
# 查看端口占用情况
# netstat -nplt | grep 9092
# netstat -nplt | grep 9093
# netstat -nplt | grep 9094

# 查看Kafka进程
# ps -aux | grep kafka

# 查看Java进程
# jps -l
  • 集群关闭

注意

关闭 Kafka 集群时,一定要等 Kafka 所有节点进程全部关闭后再关闭 Zookeeper 集群。因为 Zookeeper 集群当中记录着 Kafka 集群的相关信息,Zookeeper 集群一旦先关闭,Kafka 集群就没有办法再获取关闭进程的信息,此时只能手动强制杀死 Kafka 进程。

1
2
3
4
5
# 在单机搭建集群的环境下,任意执行以下一个命令都会关闭所有集群节点(这跟 Shell 脚本的代码有关),如果希望只单独关闭某个节点,建议使用 kill 命令

# /usr/local/kafka-cluster/kafka-node1/bin/kafka-server-stop.sh stop
# /usr/local/kafka-cluster/kafka-node2/bin/kafka-server-stop.sh stop
# /usr/local/kafka-cluster/kafka-node3/bin/kafka-server-stop.sh stop
  • 清空数据

若希望清空 Kafka 集群的数据,则可以按照以下步骤操作。清空数据的操作不可恢复,生产环境下慎用。

第一步:关闭 Kafka 集群
第二步:连接 Zookeeper 集群,然后删除 /kafka 目录
第三步:删除 Kafka 各个集群节点安装目录下的 logs 目录
第四步:重启 Kafka 集群

集群测试

  • 进入任意节点的安装目录下的 bin 目录
1
2
# 进入安装目录
# cd /usr/local/kafka-cluster/kafka-node1/bin
  • 创建主题
1
2
# 创建主题
# ./kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 3 --partitions 1 --topic test
  • 查看主题列表
1
2
# 查看主题列表
# ./kafka-topics.sh --list --bootstrap-server 127.0.0.1:9092
  • 查看主题详细信息
1
2
# 查看主题详细信息
# ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic test --describe
  • 启动控制台消费者
1
2
# 启动消费者
# ./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning
  • 启动控制台生产者
1
2
# 启动生产者
# ./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test

生产者正常启动后,在生产者的控制台手动输入 hello kafka,消费者的控制台就可以消费到生产者的消息,并输出 hello kafka,这表示消费者成功消费了生产者发送的消息!

参考博客