Kafka 源码阅读环境搭建

前言

验证 Kafka 源码阅读成果的两个标志:第一个是能够自行调试源码,第二个是能够在源码上独立开发高阶功能。

学习资源

版本说明

软件版本说明
OpenJDK1.8
Gradle7.1.1
Scala2.13.6
Kafka3.0.0

特别注意

上述各软件的版本必须互相匹配,比如当 Kafka 的版本升级为 3.8.0,那么 Scala、Gradle、OpenJDK 的版本可能也要跟着升级,具体的版本要求请查阅 Kafka 的下载说明

下载源码

  • Kafka 官网 下载 Kafka 源码的压缩包,并解压
1
2
3
4
5
# 下载源码
$ wget https://archive.apache.org/dist/kafka/3.0.0/kafka-3.0.0-src.tgz

# 解压源码
$ tar -xvf kafka-3.0.0-src.tgz

提示

  • 若希望学习 Kafka 最新版本的代码,可以通过 Git 将 Kafka 的 GitHub 仓库trunk 分支拉取下来,这里的 trunk 分支是 Kafka 的主干分支。
  • 特别注意,如果学习的是 Kafka 最新版本的代码,那么 Scala、Gradle、OpenJDK 的版本可能要升级,具体的版本要求请查阅 Kafka 的下载说明

本地安装 JDK

提示

  • Kafka 源码的编译需要依赖 JDK 8 或更高版本(比如 JDK 11)。
  • 值得一提的是,Kafka 的 Broker 是基于 Scala 开发的,而 Producer 和 Consumer 是基于 Java 开发的。
  • 特别注意,JDK 的版本和 Kafka 源码的版本必须相匹配(如图所示),否则可能会导致无法正常编译 Kafka 源码。
1
2
3
4
5
6
7
8
9
10
11
# 下载文件
$ wget https://download.java.net/openjdk/jdk8u44/ri/openjdk-8u44-linux-x64.tar.gz

# 创建解压目录
$ sudo mkdir -p /usr/lib/jvm

# 解压文件
$ sudo tar -xvf openjdk-8u44-linux-x64.tar.gz -C /usr/lib/jvm

# 重命名文件
$ sudo mv /usr/lib/jvm/java-se-8u44-ri /usr/lib/jvm/openjdk-8u44
  • 添加系统环境变量
1
2
3
4
5
6
7
# 编辑配置文件,添加环境变量
$ vi ~/.bashrc
export JAVA_HOME=/usr/lib/jvm/openjdk-8u44
export PATH=$PATH:$JAVA_HOME/bin

# 使环境变量生效
$ source ~/.bashrc
  • 验证安装
1
2
# 查看 JDK 的版本号
$ java -version

本地安装 Scala

提示

  • Kafka 源码的编译需要依赖 Scala。
  • 值得一提的是,Kafka 的 Broker 是基于 Scala 开发的,而 Producer 和 Consumer 是基于 Java 开发的。
  • 特别注意,Scala 的版本和 Kafka 源码的版本必须相匹配(如图所示),否则可能会导致无法正常编译 Kafka 源码。
  • Scala 官网 下载 Scala 的压缩包,并解压文件
1
2
3
4
5
6
7
8
# 下载文件
$ wget https://downloads.lightbend.com/scala/2.13.6/scala-2.13.6.tgz

# 解压文件
$ tar -xvf scala-2.13.6.tgz

# 移动文件
$ sudo mv scala-2.13.6 /usr/local
  • 添加系统环境变量
1
2
3
4
5
6
7
# 编辑配置文件,添加环境变量
$ vi ~/.bashrc
export SCALA_HOME=/usr/local/scala-2.13.6
export PATH=$PATH:$SCALA_HOME/bin

# 使环境变量生效
$ source ~/.bashrc
  • 验证安装
1
2
# 查看 Scala 的版本号
$ scala -version

本地安装 Gradle

提示

  • Kafka 使用 Gradle 作为构建工具,如果想避免在本地安装 Gradle,可以直接使用 Gradle Wrapper(Kafka 源码目录下的 gradlew 脚本)。
  • 在 Kafka 的源码目录下,打开 gradle/wrapper/gradle-wrapper.properties 配置文件,就可以知道当前的 Kafka 源码依赖哪个版本的 Gradle。
  • 特别注意,Gradle 的版本和 JDK 的版本必须相匹配,否则可能会导致无法正常编译 Kafka 源码。
1
2
3
4
5
6
7
8
# 下载文件
$ wget https://services.gradle.org/distributions/gradle-7.1.1-bin.zip

# 创建解压目录
$ sudo mkdir /usr/local/gradle

# 解压文件
$ sudo unzip -d /usr/local/gradle gradle-7.1.1-bin.zip
  • 添加系统环境变量
1
2
3
4
5
6
7
# 编辑配置文件,添加环境变量
$ vi ~/.bashrc
export GRADLE_HOME=/usr/local/gradle/gradle-7.1.1
export PATH=$PATH:$GRADLE_HOME/bin

# 使环境变量生效
$ source ~/.bashrc
  • 验证安装
1
2
# 查看 Gradle 的版本号
$ gradle -version

IDE 工具导入源码

为了 Kafka 开发和调试方便,这里使用 IntelliJ IDEA + Scala Plugin 作为 Scala 的 IDE 工具。

IDE 导入源码

Scala 没有专门的 IDE 开发工具,可以使用以下任意一种 IDE 工具组合导入 Kafka 的源码目录。

若使用 IntelliJ IDEA + Scala Plugin 作为 IDE 工具,那么在导入 Kafka 源码时,直接选中 Kafka 源码目录下的 build.gradle 文件即可。

添加阿里镜像

在 IntelliJ IDEA 导入 Kafka 源码后,由于 Gradle 支持使用 Maven 依赖,为了加快 Gradle 拉取依赖的速度,建议编辑 Kafka 源码目录中的 build.gradle 配置文件,然后添加阿里云的 Maven 镜像地址 https://maven.aliyun.com/nexus/content/groups/public/,如下图所示:

1
2
3
4
repositories {
maven { url 'https://maven.aliyun.com/nexus/content/groups/public/' }
mavenCentral()
}

更改依赖的版本

在 Kafka 3.0.0 版本的源码中,依赖了 4.1.0 版本的 grgit-core。但是,在 Maven 中央仓库中只有 4.1.1 版本的 grgit-core,因此在默认情况下 Gradle 将无法正常构建 Kafka 的源码,会出现以下错误信息:

1
2
3
A problem occurred configuring root project 'kafka-3.0.0-src'.
> Could not resolve all artifacts for configuration ':classpath'.
> Could not find org.ajoberstar.grgit:grgit-core:4.1.0.

解决办法是,更改 Kafka 源码目录下的 gradle/dependencies.gradle 配置文件,将 grgit:"4.1.0" 更改为 grgit: "4.1.1",如下图所示:

使用本地的 JDK

在 IntelliJ IDEA 导入 Kafka 源码后,菜单栏导航到 File > Project Structure -> Project Settings,然后设置本地已安装的 JDK 版本,如下图所示:

使用本地的 Scala

在 IntelliJ IDEA 导入 Kafka 源码后,编辑 Kafka 源码目录中的 gradle.properties 配置文件,然后将 Scala 的版本号更改为本地已安装的 Scala 版本,如下图所示:

在 IntelliJ IDEA 的主界面中,菜单栏导航到 File > Project Structure -> Platform Settings,然后添加本地已安装的 Scala 版本,如下图所示:

使用本地的 Gradle

在 IntelliJ IDEA 导入 Kafka 源码后,IDEA 可能会自动下载 Gradle 的压缩包。这是因为 Kafka 使用了 Gradle Wrapper,并且 IDEA 默认会优先使用 Gradle Wrapper 来管理构建工具的版本。Gradle Wrapper 是 Gradle 提供的一种机制,用于确保构建环境的一致性,它包含一个 gradlew 可执行脚本和 gradle/wrapper/gradle-wrapper.properties 配置文件,并且在 gradle-wrapper.properties 配置文件中指定了 Gradle 的精确版本(如下图所示)。在 IntelliJ IDEA 导入 Gradle 项目时,Gradle 的使用优先级如下:

  • Gradle Wrapper(默认使用):IDEA 会优先使用项目中已配置的 Gradle Wrapper,以确保构建工具版本与项目的要求一致。
  • 本地已安装的 Gradle:当选择不使用 Gradle Wrapper 时,IntelliJ IDEA 才会使用本地已安装的 Gradle 版本。

为了让 IntelliJ IDEA 默认使用本地已安装的 Gradle 版本,菜单栏导航到 File > Settings > Build, Execution, Deployment > Build Tools > Gradle,然后在 Use Gradle from 下拉列表中选择 Specified location,最后手动选择本地已安装的 Gradle 版本,如下图所示:

IDE 重新构建源码

在完成上述配置步骤后,重启 IntelliJ IDEA,然后 IDEA 就会自动构建 Kafka 项目的源码。另外,还可以在 IDEA 主界面的右侧工具栏中,点击 Gradle 的 重载 按钮,将整个项目重新构建一次,如下图所示:

当 IntelliJ IDEA 成功构建 Kafka 项目的源码后,会输出如下的日志信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
Starting Gradle Daemon...
Gradle Daemon started in 1 s 8 ms

> Configure project :
Starting build with version 3.0.0 using Gradle 7.1.1, Java 1.8 and Scala 2.13.6

Deprecated Gradle features were used in this build, making it incompatible with Gradle 8.0.

You can use '--warning-mode all' to show the individual deprecation warnings and determine if they come from your own scripts or plugins.

See https://docs.gradle.org/7.1.1/userguide/command_line_interface.html#sec:command_line_warnings

BUILD SUCCESSFUL in 13s

特别注意

在 IntelliJ IDEA 中构建源码的操作,只是为了让 IntelliJ IDEA 可以正常导入 / 加载 Gradle 项目(Kafka),并不代表项目的源码可以正常编译。

命令行编译源码

在终端可以执行 gradlew 命令直接编译 Kafka 的源码(如下所示),这里的 gradlew 是 Kafka 提供的 Gradle Wrapper 脚本。当 Kafka 源码编译成功后,所有生成的 JAR 文件和分发包一般都会位于 build 目录下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 编译源码
./gradlew build

# 编译源码,先执行清理
./gradlew clean build

# 编译源码,并跳过测试
./gradlew build -x test

# 仅编译 core 模块的源码
./gradlew core:build

# 编译生成二进制包
./gradlew kafka-dist:assemble

或者使用本地已安装的 Gradle 来编译 Kafka 的源码。

1
2
3
4
5
# 编译源码
gradle build

# 编译源码,并跳过测试
gradle build -x test

IDE 工具启动源码

准备工作

  • (1) 在默认情况下,Kafka Broker 的运行需要依赖于 ZooKeeper,因此需要提前安装并启动 ZooKeeper,这里不再累述。

  • (2) 编辑 Kafka 源码目录下的 config/server.properties 配置文件,更改 ZooKeeper 的连接地址,如下所示:

1
2
# ZooKeeper 连接地址(单个节点)
zookeeper.connect=127.0.0.1:2181

或者指定多个 ZooKeeper 集群节点,使用逗号分割

1
2
# ZooKeeper 连接地址(多个节点)
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183/kafka

提示

值得一提的是,从 Kafka 2.8.0 版本开始,Kafka 自身实现了 Raft 分布式一致性机制,这意味着 Kafka(包括集群)是可以脱离 ZooKeeper 独立运行的。

启动源码

Kafka Broker 的主启动类是 core/src/main/scala/kafka/Kafka.scala,在 IntelliJ IDEA 中打开该主启动类,并启动 main 方法即可,如下图所示:

在默认情况下,Kafka Broker 会启动失败,因为在启动的时候没有指定 server.properties 配置文件,IntelliJ IDEA 会输出如下的错误信息:

因此,在 IntelliJ IDEA 中需要修改 Kafka Broker 的启动配置,也就是在 Program arguments 中添加 server.properties 配置文件的相对路径(或者绝对路径),如下图所示:

获取 server.properties 配置文件的相对路径(或者绝对路径),然后在添加到 Program arguments 里面,如下图所示:

这样 Kafka Broker 就可以正常启动了,如果控制台输出的 SL4J 的警告信息,那么可以忽略掉那些警告信息,或者根据 这里 的教程配置 Kafka 的日志输出,如下图所示:

测试源码

  • 创建主题
1
kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --topic test --partitions 1 --replication-factor 1
  • 查看主题列表
1
kafka-topics.sh --list --bootstrap-server 127.0.0.1:9092
  • 查看主题详情
1
kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic test
  • 发送消息
1
kafka-console-producer.sh --topic test --broker-list 127.0.0.1:9092
  • 接收消息
1
kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server 127.0.0.1:9092

常见错误解决

Kafka 源码启动失败

  • 在 IntelliJ IDEA 中,通过主启动类 core/src/main/scala/kafka/Kafka.scala 启动 Kafka Broker,出现以下错误信息:

  • 这可能是因为在启动 Kafka Broker 时,没有通过环境变量指定 server.properties 配置文件导致的,解决方案请看 这里

特别注意

如果在 Broker 启动时指定了 server.properties 配置文件也还是启动失败,那么建议根据 这里 的教程,打印 Kafka Broker 启动时的详细错误日志信息,以此定位问题。

找不到 grgit-core 依赖

  • 在 Kafka 3.0.0 版本的源码中,依赖了 4.1.0 版本的 grgit-core。但是,在 Maven 仓库中只有 4.1.1 版本的 grgit-core,因此在默认情况下 Gradle 将无法正常构建 Kafka 的源码,会出现以下错误信息:
1
2
3
A problem occurred configuring root project 'kafka-3.0.0-src'.
> Could not resolve all artifacts for configuration ':classpath'.
> Could not find org.ajoberstar.grgit:grgit-core:4.1.0.
  • 解决办法是,更改 Kafka 源码目录下的 gradle/dependencies.gradle 配置文件,将 grgit:"4.1.0" 更改为 grgit: "4.1.1",然后重新构建项目,如下图所示:

启动失败没有输出错误日志

在 IntelliJ IDEA 中启动 Kafka Broker 失败后,控制台没有输出具体的错误日志,只简单显示以下错误信息(如下图所示),导致很难定位问题

解决办法是在 Kafka 源码目录下,拷贝 config/log4j.properties 配置文件到 core/src/main/resources 目录下,如下图所示:

然后,编辑 Kafka 源码目录下的 build.gradle 配置文件,找到 project(':core'),然后在里面添加以下依赖,如下图所示:

1
2
3
implementation libs.slf4jApi
implementation libs.slf4jlog4j
implementation libs.log4j

集群 ID 不匹配导致启动失败

在 IntelliJ IDEA 中启动 Kafka Broker 时,出现以下错误信息:

1
2
3
4
5
6
7
[2022-10-23 21:05:39,738] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.InconsistentClusterIdException: The Cluster ID pvKvK1WTSeGJyE0w6qUvrQ doesn't match stored clusterId Some(gvucjXWLQxqChk_mvH7DAQ) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
at kafka.server.KafkaServer.startup(KafkaServer.scala:223)
at kafka.Kafka$.main(Kafka.scala:109)
at kafka.Kafka.main(Kafka.scala)
[2022-10-23 21:05:39,741] INFO shutting down (kafka.server.KafkaServer)
[2022-10-23 21:05:39,744] INFO [feature-zk-node-event-process-thread]: Shutting down (kafka.server

这个错误是由于 Kafka Broker 的集群 ID 与其日志目录下存储的 meta.properties 文件中的集群 ID 不匹配导致的。解决方案是,删除 Kafka 日志目录 /tmp/kafka-logs 下的所有文件,然后重新启动 Kafka Broker。

1
$ rm -rf /tmp/kafka-logs

提示

在使用 IntelliJ IDEA 启动 Kafka 源码时,默认日志目录(即 Kafka 的 log.dirs 配置)可以在 server.properties 配置文件中查看到。如果未明确设置,则 Kafka 会使用 /tmp/kafka-logs 作为默认日志路径。

参考资料