前言
本文适用于在 Centos/Debian/Ubuntu 等 Linux 发行版系统上,使用多台物理机器(至少三台)搭建 Kafka 集群。
Zookeeper 集群搭建
本文的 Kafka 集群搭建依赖于 Zookeeper,因此生产环境需要将 Zookeeper 集群提前搭建起来。值得一提的是,从 Kafka 2.8.0
版本开始,Kafka 自身实现了 Raft
分布式一致性机制,这意味着 Kafka 集群是可以脱离 ZooKeeper 独立运行的。
集群规划
节点 | IP 地址 | 端口 | 版本号 |
---|
Zookeeper 节点 1 | 192.168.1.1 | 2181 | 3.4.10 |
Zookeeper 节点 2 | 192.168.1.2 | 2181 | 3.4.10 |
Zookeeper 节点 3 | 192.168.1.3 | 2181 | 3.4.10 |
集群部署
由于篇幅有限,Linux 生产环境搭建 Zookeeper 集群的内容这里不再累述,详细教程可看 这里。
Kafka 集群搭建
集群规划
节点 | IP 地址 | 端口 | 版本号 |
---|
Kafka 节点 1 | 192.168.1.1 | 9092 | 2.13-3.2.1 |
Kafka 节点 2 | 192.168.1.2 | 9092 | 2.13-3.2.1 |
Kafka 节点 3 | 192.168.1.3 | 9092 | 2.13-3.2.1 |
集群搭建
- Kafka 的安装包可以从 官网 下载。
- 以下载得到的压缩文件
kafka_2.13-3.2.1.tgz
为例,2.11
是 Scala 的版本号,3.2.1
是 Kafka 的版本号。 - Kafka 的 Broker 组件是使用 Scala 开发的,而 Producer 组件和 Consumer 组件是使用 Java 开发的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
|
最关键的配置内容是 broker.id
、log.dirs
、zookeeper.connect
,其中的 zookeeper.connect
是 Zookeeper 连接地址,建议使用 /kafka
作为后缀,这样方便日后在 Zookeeper 里统一管理 Kafka 的数据。在开发测试阶段,其他配置内容暂时可以使用默认值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| broker.id=1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka-cluster/kafka-node1/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181/kafka
|
拷贝两份上面已经配置好的 Kafka 安装目录到其他服务器节点上,以此作为集群另外两个节点的安装文件,例如 kafka-node2
、kafka-node3
。安装目录拷贝完成后,还需要更改另外两个节点里的 server.properties
配置文件的 broker.id
、log.dirs
。节点二和节点三的最终配置如下:
1 2 3
| # 节点二的配置 broker.id=2 log.dirs=/usr/local/kafka-cluster/kafka-node2/logs
|
1 2 3
| # 节点三的配置 broker.id=3 log.dirs=/usr/local/kafka-cluster/kafka-node3/logs
|
集群管理
注意
启动 Kafka 集群之前,必须确保 Zookeeper 集群已经启动成功,这是因为本文搭建的 Kafka 集群依赖于 Zookeeper 集群。
集群启动后,可以使用以下命令查看集群的运行状态。如果发现集群启动失败,则可以使用前台的方式再次启动集群,然后根据终端输出的错误日志信息来定位问题。
注意
关闭 Kafka 集群时,一定要等 Kafka 所有节点进程全部关闭后再关闭 Zookeeper 集群。因为 Zookeeper 集群当中记录着 Kafka 集群的相关信息,Zookeeper 集群一旦先关闭,Kafka 集群就没有办法再获取关闭进程的信息,此时只能手动强制杀死 Kafka 进程。
若希望清空 Kafka 集群的数据,则可以按照以下步骤操作。清空数据的操作不可恢复,生产环境下慎用。
第一步:关闭 Kafka 集群
第二步:连接 Zookeeper 集群,然后删除 /kafka
目录
第三步:删除 Kafka 各个集群节点的安装目录下的 logs
目录(文件夹)
第四步:重启 Kafka 集群
集群测试
生产者正常启动后,在生产者的控制台手动输入 hello kafka
,消费者的控制台就可以消费到生产者的消息,并输出 hello kafka
,这表示消费者成功消费了生产者发送的消息!
Kafka 更改端口(可选)
若希望更改 Kafka 的默认端口(9092),可以按照以下步骤更改 Kafka 安装目录下 config
子目录里的各个配置文件。例如可以将 Kafka 的默认端口更改为 9090
,如下所示:
1 2 3 4
|
listeners=PLAINTEXT://:9090
|
1 2 3 4
|
bootstrap.servers=localhost:9090
|
1 2 3 4
|
bootstrap.servers=localhost:9090
|
1 2 3 4
|
bootstrap.servers=localhost:9090
|
1 2 3 4
|
bootstrap.servers=localhost:9090
|
参考博客