Kafka 源码阅读环境搭建
前言
验证 Kafka 源码阅读成果的两个标志:第一个是能够自行调试源码,第二个是能够在源码上独立开发高阶功能。
学习资源
版本说明
软件 | 版本 | 说明 |
---|---|---|
OpenJDK | 1.8 | |
Gradle | 7.1.1 | |
Scala | 2.13.6 | |
Kafka | 3.0.0 |
特别注意
上述各软件的版本必须互相匹配,比如当 Kafka 的版本升级为 3.8.0
,那么 Scala、Gradle、OpenJDK 的版本可能也要跟着升级,具体的版本要求请查阅 Kafka 的下载说明。
下载源码
- 在 Kafka 官网 下载 Kafka 源码的压缩包,并解压
1 | # 下载源码 |
提示
- 若希望学习 Kafka 最新版本的代码,可以通过 Git 将 Kafka 的 GitHub 仓库 的
trunk
分支拉取下来,这里的trunk
分支是 Kafka 的主干分支。 - 特别注意,如果学习的是 Kafka 最新版本的代码,那么 Scala、Gradle、OpenJDK 的版本可能要升级,具体的版本要求请查阅 Kafka 的下载说明。
本地安装 JDK
提示
- Kafka 源码的编译需要依赖 JDK 8 或更高版本(比如 JDK 11)。
- 值得一提的是,Kafka 的 Broker 是基于 Scala 开发的,而 Producer 和 Consumer 是基于 Java 开发的。
- 特别注意,JDK 的版本和 Kafka 源码的版本必须相匹配(如图所示),否则可能会导致无法正常编译 Kafka 源码。
- 在 OpenJDK 官网 下载 OpenJDK 的压缩包,并解压文件
1 | # 下载文件 |
- 添加系统环境变量
1 | # 编辑配置文件,添加环境变量 |
- 验证安装
1 | # 查看 JDK 的版本号 |
本地安装 Scala
提示
- Kafka 源码的编译需要依赖 Scala。
- 值得一提的是,Kafka 的 Broker 是基于 Scala 开发的,而 Producer 和 Consumer 是基于 Java 开发的。
- 特别注意,Scala 的版本和 Kafka 源码的版本必须相匹配(如图所示),否则可能会导致无法正常编译 Kafka 源码。
- 在 Scala 官网 下载 Scala 的压缩包,并解压文件
1 | # 下载文件 |
- 添加系统环境变量
1 | # 编辑配置文件,添加环境变量 |
- 验证安装
1 | # 查看 Scala 的版本号 |
本地安装 Gradle
提示
- Kafka 使用 Gradle 作为构建工具,如果想避免在本地安装 Gradle,可以直接使用 Gradle Wrapper(Kafka 源码目录下的
gradlew
脚本)。 - 在 Kafka 的源码目录下,打开
gradle/wrapper/gradle-wrapper.properties
配置文件,就可以知道当前的 Kafka 源码依赖哪个版本的 Gradle。 - 特别注意,Gradle 的版本和 JDK 的版本必须相匹配,否则可能会导致无法正常编译 Kafka 源码。
- 在 Gradle 官网 下载 Gradle 的压缩包,并解压文件
1 | # 下载文件 |
- 添加系统环境变量
1 | # 编辑配置文件,添加环境变量 |
- 验证安装
1 | # 查看 Gradle 的版本号 |
IDE 工具导入源码
为了 Kafka 开发和调试方便,这里使用 IntelliJ IDEA + Scala Plugin 作为 Scala 的 IDE 工具。
IDE 导入源码
Scala 没有专门的 IDE 开发工具,可以使用以下任意一种 IDE 工具组合导入 Kafka 的源码目录。
若使用 IntelliJ IDEA + Scala Plugin 作为 IDE 工具,那么在导入 Kafka 源码时,直接选中 Kafka 源码目录下的 build.gradle
文件即可。
添加阿里镜像
在 IntelliJ IDEA 导入 Kafka 源码后,由于 Gradle 支持使用 Maven 依赖,为了加快 Gradle 拉取依赖的速度,建议编辑 Kafka 源码目录中的 build.gradle
配置文件,然后添加阿里云的 Maven 镜像地址 https://maven.aliyun.com/nexus/content/groups/public/
,如下图所示:
1 | repositories { |
更改依赖的版本
在 Kafka 3.0.0
版本的源码中,依赖了 4.1.0
版本的 grgit-core
。但是,在 Maven 中央仓库中只有 4.1.1
版本的 grgit-core
,因此在默认情况下 Gradle 将无法正常构建 Kafka 的源码,会出现以下错误信息:
1 | A problem occurred configuring root project 'kafka-3.0.0-src'. |
解决办法是,更改 Kafka 源码目录下的 gradle/dependencies.gradle
配置文件,将 grgit:"4.1.0"
更改为 grgit: "4.1.1"
,如下图所示:
使用本地的 JDK
在 IntelliJ IDEA 导入 Kafka 源码后,菜单栏导航到 File > Project Structure -> Project Settings
,然后设置本地已安装的 JDK 版本,如下图所示:
使用本地的 Scala
在 IntelliJ IDEA 导入 Kafka 源码后,编辑 Kafka 源码目录中的 gradle.properties
配置文件,然后将 Scala 的版本号更改为本地已安装的 Scala 版本,如下图所示:
在 IntelliJ IDEA 的主界面中,菜单栏导航到 File > Project Structure -> Platform Settings
,然后添加本地已安装的 Scala 版本,如下图所示:
使用本地的 Gradle
在 IntelliJ IDEA 导入 Kafka 源码后,IDEA 可能会自动下载 Gradle 的压缩包。这是因为 Kafka 使用了 Gradle Wrapper,并且 IDEA 默认会优先使用 Gradle Wrapper 来管理构建工具的版本。Gradle Wrapper 是 Gradle 提供的一种机制,用于确保构建环境的一致性,它包含一个 gradlew
可执行脚本和 gradle/wrapper/gradle-wrapper.properties
配置文件,并且在 gradle-wrapper.properties
配置文件中指定了 Gradle 的精确版本(如下图所示)。在 IntelliJ IDEA 导入 Gradle 项目时,Gradle 的使用优先级如下:
Gradle Wrapper(默认使用)
:IDEA 会优先使用项目中已配置的 Gradle Wrapper,以确保构建工具版本与项目的要求一致。本地已安装的 Gradle
:当选择不使用 Gradle Wrapper 时,IntelliJ IDEA 才会使用本地已安装的 Gradle 版本。
为了让 IntelliJ IDEA 默认使用本地已安装的 Gradle 版本,菜单栏导航到 File > Settings > Build, Execution, Deployment > Build Tools > Gradle
,然后在 Use Gradle from
下拉列表中选择 Specified location
,最后手动选择本地已安装的 Gradle 版本,如下图所示:
IDE 重新构建源码
在完成上述配置步骤后,重启 IntelliJ IDEA,然后 IDEA 就会自动构建 Kafka 项目的源码。另外,还可以在 IDEA 主界面的右侧工具栏中,点击 Gradle 的 重载
按钮,将整个项目重新构建一次,如下图所示:
当 IntelliJ IDEA 成功构建 Kafka 项目的源码后,会输出如下的日志信息:
1 | Starting Gradle Daemon... |
特别注意
在 IntelliJ IDEA 中构建源码的操作,只是为了让 IntelliJ IDEA 可以正常导入 / 加载 Gradle 项目(Kafka),并不代表项目的源码可以正常编译。
命令行编译源码
在终端可以执行 gradlew
命令直接编译 Kafka 的源码(如下所示),这里的 gradlew
是 Kafka 提供的 Gradle Wrapper 脚本。当 Kafka 源码编译成功后,所有生成的 JAR 文件和分发包一般都会位于 build
目录下。
1 | # 编译源码 |
或者使用本地已安装的 Gradle 来编译 Kafka 的源码。
1 | # 编译源码 |
IDE 工具启动源码
准备工作
(1) 在默认情况下,Kafka Broker 的运行需要依赖于 ZooKeeper,因此需要提前安装并启动 ZooKeeper,这里不再累述。
(2) 编辑 Kafka 源码目录下的
config/server.properties
配置文件,更改 ZooKeeper 的连接地址,如下所示:
1 | # ZooKeeper 连接地址(单个节点) |
或者指定多个 ZooKeeper 集群节点,使用逗号分割
1 | # ZooKeeper 连接地址(多个节点) |
提示
值得一提的是,从 Kafka 2.8.0
版本开始,Kafka 自身实现了 Raft
分布式一致性机制,这意味着 Kafka(包括集群)是可以脱离 ZooKeeper 独立运行的。
启动源码
Kafka Broker 的主启动类是 core/src/main/scala/kafka/Kafka.scala
,在 IntelliJ IDEA 中打开该主启动类,并启动 main
方法即可,如下图所示:
在默认情况下,Kafka Broker 会启动失败,因为在启动的时候没有指定 server.properties
配置文件,IntelliJ IDEA 会输出如下的错误信息:
因此,在 IntelliJ IDEA 中需要修改 Kafka Broker 的启动配置,也就是在 Program arguments
中添加 server.properties
配置文件的相对路径(或者绝对路径),如下图所示:
获取 server.properties
配置文件的相对路径(或者绝对路径),然后在添加到 Program arguments
里面,如下图所示:
这样 Kafka Broker 就可以正常启动了,如果控制台输出的 SL4J 的警告信息,那么可以忽略掉那些警告信息,或者根据 这里 的教程配置 Kafka 的日志输出,如下图所示:
测试源码
- 创建主题
1 | kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --topic test --partitions 1 --replication-factor 1 |
- 查看主题列表
1 | kafka-topics.sh --list --bootstrap-server 127.0.0.1:9092 |
- 查看主题详情
1 | kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic test |
- 发送消息
1 | kafka-console-producer.sh --topic test --broker-list 127.0.0.1:9092 |
- 接收消息
1 | kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server 127.0.0.1:9092 |
常见错误解决
Kafka 源码启动失败
- 在 IntelliJ IDEA 中,通过主启动类
core/src/main/scala/kafka/Kafka.scala
启动 Kafka Broker,出现以下错误信息:
- 这可能是因为在启动 Kafka Broker 时,没有通过环境变量指定
server.properties
配置文件导致的,解决方案请看 这里。
特别注意
如果在 Broker 启动时指定了 server.properties
配置文件也还是启动失败,那么建议根据 这里 的教程,打印 Kafka Broker 启动时的详细错误日志信息,以此定位问题。
找不到 grgit-core 依赖
- 在 Kafka
3.0.0
版本的源码中,依赖了4.1.0
版本的grgit-core
。但是,在 Maven 仓库中只有4.1.1
版本的grgit-core
,因此在默认情况下 Gradle 将无法正常构建 Kafka 的源码,会出现以下错误信息:
1 | A problem occurred configuring root project 'kafka-3.0.0-src'. |
- 解决办法是,更改 Kafka 源码目录下的
gradle/dependencies.gradle
配置文件,将grgit:"4.1.0"
更改为grgit: "4.1.1"
,然后重新构建项目,如下图所示:
启动失败没有输出错误日志
在 IntelliJ IDEA 中启动 Kafka Broker 失败后,控制台没有输出具体的错误日志,只简单显示以下错误信息(如下图所示),导致很难定位问题
解决办法是在 Kafka 源码目录下,拷贝 config/log4j.properties
配置文件到 core/src/main/resources
目录下,如下图所示:
然后,编辑 Kafka 源码目录下的 build.gradle
配置文件,找到 project(':core')
,然后在里面添加以下依赖,如下图所示:
1 | implementation libs.slf4jApi |
集群 ID 不匹配导致启动失败
在 IntelliJ IDEA 中启动 Kafka Broker 时,出现以下错误信息:
1 | [2022-10-23 21:05:39,738] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) |
这个错误是由于 Kafka Broker 的集群 ID 与其日志目录下存储的 meta.properties
文件中的集群 ID 不匹配导致的。解决方案是,删除 Kafka 日志目录 /tmp/kafka-logs
下的所有文件,然后重新启动 Kafka Broker。
1 | $ rm -rf /tmp/kafka-logs |
提示
在使用 IntelliJ IDEA 启动 Kafka 源码时,默认日志目录(即 Kafka 的 log.dirs
配置)可以在 server.properties
配置文件中查看到。如果未明确设置,则 Kafka 会使用 /tmp/kafka-logs
作为默认日志路径。