ZooKeeper 入门教程之五
大纲
前言
学习资源
版本说明
软件 | 版本 | 说明 |
---|---|---|
ZooKeeper 的源码 | 3.5.7 | 本文给出的 ZooKeeper 源码分析,都是基于 ZooKeeper 的 3.5.7 版本进行讲解。 |
ZK 辅助源码分析
ZK 持久化源码
Leader 和 Follower 中的数据会在内存和磁盘中各保存一份,所以需要将内存中的数据持久化到磁盘中(如下图所示)。在 org.apache.zookeeper.server.persistence
包下的相关类都是持久化相关的代码。
- (1) ZooKeeper 中的数据模型是一棵树(DataTree),每个节点叫做 DataNode。
- (2) ZooKeeper 集群中的 DataTree 时刻都保持状态同步。
- (3) Zookeeper 集群中每个节点的数据在内存和磁盘中都有一份完整的数据。
- 内存数据:DataTree
- 磁盘数据:快照文件 + 编辑日志
- 快照接口
1 | public interface SnapShot { |
- 日志接口
1 | public interface TxnLog { |
- 处理持久化的核心类
ZK 序列化源码
zookeeper-jute
模块是 ZooKeeper 序列化相关的源码。
- 序列化和反序列化方法
1 | public interface Record { |
- 迭代接口
1 | public interface Index { |
- 序列化支持的数据类型
1 | public interface OutputArchive { |
- 反序列化支持的数据类型
1 | public interface InputArchive { |
ZK 服务端初始化源码分析
ZK 服务端启动脚本
ZooKeeper 服务端的启动命令是 zkServer.sh start
,其中 zkServer.sh
服务端脚本的核心源码如下:
1 |
|
由于 zkServer.sh start
命令在底层实际执行内容如下:
1 | nohup "$JAVA" |
所以 ZooKeeper 服务端的启动入口是 QuorumPeerMain
类。
ZK 服务端启动入口
- 服务端的启动入口类
QuorumPeerMain
1 | package org.apache.zookeeper.server.quorum; |
QuorumPeerMain.initializeAndRun()
方法
1 | protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException |
解析配置参数
提示
- ZooKeeper 在启动时,会解析
zoo.cfg
和myid
配置文件(集群模式才需要有myid
文件),从而获取配置参数。 zoo.cfg
配置文件,定义了整个 ZooKeeper 集群的配置参数,比如端口号、数据存储路径和服务器列表。myid
节点标识文件,存放了当前节点的唯一编号,用于区分集群中的各个节点。
QuorumPeerConfig.parse()
方法
1 | public void parse(String path) throws ConfigException { |
QuorumPeerConfig.parseProperties()
方法
1 | public void parseProperties(Properties zkProp) throws IOException, ConfigException { |
QuorumPeerConfig.setupQuorumPeerConfig()
方法
1 | void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode) throws IOException, ConfigException { |
QuorumPeerConfig.setupMyId()
方法
1 | private void setupMyId() throws IOException { |
过期快照删除
QuorumPeerMain.initializeAndRun()
方法
1 | protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException |
DatadirCleanupManager.start()
方法
1 | public void start() { |
PurgeTask
任务类
1 | static class PurgeTask extends TimerTask { |
PurgeTxnLog.purge()
方法
1 | public static void purge(File dataDir, File snapDir, int num) throws IOException { |
初始化通信组件
I/O 模型默认为 NIO 通信
QuorumPeerMain.initializeAndRun()
方法
1 | protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException |
QuorumPeerMain.runFromConfig()
方法
1 | public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException |
ServerCnxnFactory.createFactory()
方法
1 | public static final String ZOOKEEPER_SERVER_CNXN_FACTORY = "zookeeper.serverCnxnFactory"; |
在 zookeeper-docs
模块的 zookeeperAdmin.md
文件中,有以下这么一段说明:
1 | serverCnxnFactory : |
初始化 NIO 服务端的 Socket
提示
这里初始化 NIO 服务端的 Socket 时,ZooKeeper 并未启动。
NIOServerCnxnFactory.configure()
方法
1 |
|
ZK 服务端加载数据源码分析
ZK 的数据存储模型
Leader 和 Follower 中的数据会在内存和磁盘中各保存一份,所以需要将内存中的数据持久化到磁盘中(如下图所示)。
- (1) ZooKeeper 中的数据模型是一棵树(DataTree),每个节点叫做 DataNode。
- (2) ZooKeeper 集群中的 DataTree 时刻都保持状态同步。
- (3) Zookeeper 集群中每个节点的数据在内存和磁盘中都有一份完整的数据。
- 内存数据:DataTree
- 磁盘数据:快照文件 + 编辑日志
ZK 集群节点的启动
QuorumPeerMain.initializeAndRun()
方法
1 | protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException |
QuorumPeerMain.runFromConfig()
方法
1 | public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException |
冷启动数据恢复快照数据
QuorumPeer.start()
方法
1 |
|
QuorumPeer.loadDataBase()
方法
1 | private void loadDataBase() { |
ZKDatabase.loadDataBase()
方法
1 | public long loadDataBase() throws IOException { |
FileTxnSnapLog.restore()
方法
1 | public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException { |
FileSnap.deserialize()
方法
1 | public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException { |
FileSnap.deserialize()
方法
1 | public void deserialize(DataTree dt, Map<Long, Integer> sessions, InputArchive ia) throws IOException { |
SerializeUtils.deserializeSnapshot()
方法
1 | public static void deserializeSnapshot(DataTree dt, InputArchive ia, Map<Long, Integer> sessions) throws IOException { |
DataTree.deserialize()
方法
1 | public void deserialize(InputArchive ia, String tag) throws IOException { |
冷启动数据恢复编辑日志
QuorumPeer.start()
方法
1 |
|
QuorumPeer.loadDataBase()
方法
1 | private void loadDataBase() { |
ZKDatabase.loadDataBase()
方法
1 | public long loadDataBase() throws IOException { |
FileTxnSnapLog.restore()
方法
1 | public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException { |
FileTxnSnapLog.fastForwardFromEdits()
方法
1 | public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException { |
FileTxnSnapLog.processTransaction()
方法
1 | public void processTransaction(TxnHeader hdr, DataTree dt, Map<Long, Integer> sessions, Record txn) throws KeeperException.NoNodeException { |
FileTxnSnapLog.processTxn()
方法
1 | public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) |
ZK 集群选举源码分析
集群选举机制
ZooKeeper 集群的选举机制是高频面试题,其中分为第一次启动和非第一次启动两种情况。
集群第一次启动
(1) 服务器 1 启动,发起一次选举。服务器 1 投自己一票,此时服务器 1 的票数是一票,不够半数以上(一共有 5 票),选举无法完成,服务器 1 的状态保持为 LOOKING。
(2) 服务器 2 启动,再发起一次选举。服务器 1 和 2 分别投自己一票并交换选票信息,此时服务器 1 发现服务器 2 的
myid
比自己目前投票推举的(服务器 1)大,更改选票为推举服务器 2。此时服务器 1 的票数是 0 票,服务器 2 的票数是 2 票,两者都没有半数以上的票数,选举无法完成,服务器 1 和 服务器 2 的状态保持 LOOKING。(3) 服务器 3 启动,发起一次选举。此时服务器 1 和 2 都会更改选票为服务器 3。此次投票结果:服务器 1 有 0 票,服务器 2 有 0 票,服务器 3 有 3 票。此时服务器 3 的票数已经超过半数,服务器 3 当选 Leader。服务器 3 将状态更改为 LEADING,服务器 1 和 服务器 2 将状态更改为 FOLLOWING。
(4) 服务器 4 启动,发起一次选举。此时服务器 1、2、3 已经不是 LOOKING 状态,不会再更改选票信息。交换选票信息后的结果:服务器 3 有 3 票,服务器 4 有 1 票。此时服务器 4 少数服从多数,更改选票信息为服务器 3,并将自己的状态更改为 FOLLOWING。
(5) 服务器 5 启动,跟服务器 4 一样更改选票信息为服务器 3,并将自己的状态更改为 FOLLOWING。
特别注意
一旦 ZooKeeper 集群中已经存在 Leader 节点,那么后面加入集群的节点都只会成为 Follower 节点。
集群非第一次启动
(1) 当 ZooKeeper 集群中的任意一台服务器出现以下两种情况之一时,就会开始进入 Leader 选举:
- 服务器初始化启动。
- 服务器运行期间无法和 Leader 保持连接。
(2) 当一台服务器进入 Leader 选举流程时,当前集群也可能会处于以下两种状态之一:
集群中本来就已经存在一个 Leader
- 对于集群已经存在 Leader 的情况,当服务器试图去选举 Leader 时,会被告知当前服务器的 Leader 信息;对于该服务器来说,仅仅需要和 Leader 服务器建立连接,并进行状态同步即可。
集群中确实不存在 Leader
- 对于集群不存在 Leader 的情况,整个集群会进入选举流程。
- 假设 ZooKeeper 集群由 5 台服务器组成,SID 分别为 1、2、3、4、5,ZXID 分别为 8、8、8、7、7, 并且此时 SID 为 3 的服务器是 Leader。在某一时刻,SID 为 3 和 5 的服务器出现故障,由于 Leader 挂掉了,因此开始进行 Leader 选举。SID 为 1、2、4 的服务器的最终投票情况如下:
1
2(EPOCH, ZXID, SID ) (EPOCH, ZXID, SID ) (EPOCH, ZXID, SID )
(1, 8, 1) (1, 8, 2) (1, 7, 4) - 选举 Leader 的规则:①EPOCH 大的直接胜出 ②EPOCH 相同,ZXID(事务 ID)大的胜出 ③ZXID(事务 ID) 相同,SID(服务器 ID,即 myid) 大的胜出
- 选举 Leader 的结果:SID 为 2 的服务器会被选举 Leader,SID 为 1 和 4 的服务器作为 Follower。
集群选举机制总结
ZooKeeper 使用半数投票机制来实现选举,超过半数的投票通过,即通过。
集群第一次启动时的选举规则
- 投票过半数时,服务器 ID 大的胜出
集群非第一次启动时的选举规则
- (1) EPOCH 大的直接胜出
- (2) EPOCH 相同,事务 ID(ZXID)大的胜出
- (3) 事务 ID(ZXID)相同,服务器 ID(SID)大的胜出
集群选举准备
QuorumPeerMain.initializeAndRun()
方法
1 | protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException |
QuorumPeerMain.runFromConfig()
方法
1 | public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException |
QuorumPeer.start()
方法
1 |
|
QuorumPeer.startLeaderElection()
方法
1 | synchronized public void startLeaderElection() { |
QuorumPeer.createElectionAlgorithm()
方法
1 | protected Election createElectionAlgorithm(int electionAlgorithm) { |
网络通信组件初始化
QuorumPeer.createCnxnManager()
方法
1 | public QuorumCnxManager createCnxnManager() { |
QuorumCnxManager.QuorumCnxManager()
方法
1 | public QuorumCnxManager(QuorumPeer self, |
监听线程初始化
QuorumCnxManager.Listener.run()
方法
1 |
|
选举准备
FastLeaderElection.FastLeaderElection()
方法
1 | public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){ |
FastLeaderElection.starter()
方法
1 | private void starter(QuorumPeer self, QuorumCnxManager manager) { |
集群选举执行
选举整体流程
提示
FastLeaderElection
类专门用于管理集群选举的。它有两个核心的内部线程类,包括WorkerSender(发送选举投票消息)
和WorkerReceiver(接收选举投票消息)
。QuorumCnxManager
类专门用于管理集群之间的通信,比如集群节点之间的消息发送与消息接收。它有两个核心的内部线程类,包括SendWorker(发送消息)
、RecvWorker(接收消息)
。
第一部分代码
QuorumPeerMain.initializeAndRun()
方法
1 | protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException |
QuorumPeerMain.runFromConfig()
方法
1 | public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException |
QuorumPeer.start()
方法
1 |
|
QuorumPeer.run()
方法。值得一提的是,上面执行super.start()
就相当于执行QuorumPeer
类中的run()
方法,表示开始执行选举。
1 |
|
FastLeaderElection.lookForLeader()
方法
1 | public Vote lookForLeader() throws InterruptedException { |
FastLeaderElection.sendNotifications()
方法,负责广播选票,将自己的选票发送给其他集群节点。
1 | private void sendNotifications() { |
FastLeaderElection.Messenger.WorkerSender
线程类,负责将选举投票消息发送给其他集群节点。
1 | class WorkerSender extends ZooKeeperThread { |
QuorumCnxManager.toSend()
方法,发送消息给其他集群节点。
1 | public void toSend(Long sid, ByteBuffer b) { |
第二部分代码
QuorumCnxManager.addToRecvQueue()
方法,将接收到的消息添加到接收队列。
1 | public void addToRecvQueue(Message msg) { |
QuorumCnxManager.addToSendQueue()
方法,将需要发送的消息添加到发送队列。
1 | private void addToSendQueue(ArrayBlockingQueue<ByteBuffer> queue, |
QuorumCnxManager.connectOne()
方法,与要发送消息的集群节点建立通信连接。
1 | synchronized void connectOne(long sid){ |
QuorumCnxManager.connectOne()
方法,与要发送消息的集群节点建立通信连接。
1 | synchronized private boolean connectOne(long sid, InetSocketAddress electionAddr){ |
QuorumCnxManager.initiateConnection()
方法,建立通信连接
1 | public void initiateConnection(final Socket sock, final Long sid) { |
QuorumCnxManager.startConnection()
方法,创建并启动发送器线程和接收器线程。
1 | private boolean startConnection(Socket sock, Long sid) throws IOException { |
QuorumCnxManager.SendWorker.run()
方法,发送器线程的核心处理逻辑。
1 |
|
QuorumCnxManager.SendWorker.send()
方法,将消息发送出去。
1 | synchronized void send(ByteBuffer b) throws IOException { |
QuorumCnxManager.RecvWorker.run()
方法,接收器线程的核心处理逻辑。
1 |
|
QuorumCnxManager.addToRecvQueue()
方法,将接收到的消息添加到接收队列中。
1 | public void addToRecvQueue(Message msg) { |
FastLeaderElection.Messenger.WorkerReceiver
线程类,负责接收其他集群节点发送过来的选举投票消息。
1 | class WorkerReceiver extends ZooKeeperThread { |