ZooKeeper 入门教程之五

大纲

前言

学习资源

版本说明

软件版本说明
ZooKeeper 源码版本:3.5.7。本文给出的 ZooKeeper 源码分析,均基于 ZooKeeper 3.5.7 版本进行讲解。

ZK 辅助源码分析

ZK 持久化源码

Leader 和 Follower 中的数据会在内存和磁盘中各保存一份,所以需要将内存中的数据持久化到磁盘中(如下图所示)。在 org.apache.zookeeper.server.persistence 包下的相关类都是持久化相关的代码。
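内存数据落盘后会产生两类文件:快照文件和编辑日志(事务日志)。以目录结构示意(文件名后缀为十六进制的 zxid,具体路径仅为举例):快照文件通常位于 dataDir/version-2 目录下,形如 snapshot.96d,后缀是生成该快照时最后处理的 zxid;编辑日志通常位于 dataLogDir/version-2 目录下,形如 log.96e,后缀是该日志文件中第一条事务的 zxid。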

  • (1) ZooKeeper 中的数据模型是一棵树(DataTree),每个节点叫做 DataNode。
  • (2) ZooKeeper 集群中的 DataTree 时刻都保持状态同步。
  • (3) Zookeeper 集群中每个节点的数据在内存和磁盘中都有一份完整的数据。
    • 内存数据:DataTree
    • 磁盘数据:快照文件 + 编辑日志

  • 快照接口
public interface SnapShot {

// 反序列化方法
long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException;

// 序列化方法
void serialize(DataTree dt, Map<Long, Integer> sessions, File name) throws IOException;

// 查找最近的快照文件
File findMostRecentSnapshot() throws IOException;

// 释放资源
void close() throws IOException;

}
  • 日志接口
public interface TxnLog {

// 设置服务状态
void setServerStats(ServerStats serverStats);

// 滚动日志
void rollLog() throws IOException;

// 追加日志
boolean append(TxnHeader hdr, Record r) throws IOException;

// 读取数据
TxnIterator read(long zxid) throws IOException;

// 获取最后一个 zxid
long getLastLoggedZxid() throws IOException;

// 删除日志
boolean truncate(long zxid) throws IOException;

// 获取 DbId
long getDbId() throws IOException;

// 提交
void commit() throws IOException;

// 日志同步时间
long getTxnLogSyncElapsedTime();

// 关闭日志
void close() throws IOException;

// 读取日志的接口
public interface TxnIterator {

// 获取头信息
TxnHeader getHeader();

// 获取传输的内容
Record getTxn();

// 下一条记录
boolean next() throws IOException;

// 关闭资源
void close() throws IOException;

// 获取存储的大小
long getStorageSize() throws IOException;

}

}
  • 处理持久化的核心类:FileSnap(实现 SnapShot 接口,负责快照文件的读写)、FileTxnLog(实现 TxnLog 接口,负责事务日志/编辑日志的读写),以及将两者组合起来、统一对外提供数据持久化与恢复能力的 FileTxnSnapLog。

ZK 序列化源码

zookeeper-jute 模块是 ZooKeeper 序列化相关的源码。

  • 序列化和反序列化方法
public interface Record {

// 序列化方法
public void serialize(OutputArchive archive, String tag) throws IOException;

// 反序列化方法
public void deserialize(InputArchive archive, String tag) throws IOException;

}
  • 迭代接口
public interface Index {

// 结束
public boolean done();

// 下一个
public void incr();

}
  • 序列化支持的数据类型
public interface OutputArchive {

public void writeByte(byte b, String tag) throws IOException;
public void writeBool(boolean b, String tag) throws IOException;
public void writeInt(int i, String tag) throws IOException;
public void writeLong(long l, String tag) throws IOException;
public void writeFloat(float f, String tag) throws IOException;
public void writeDouble(double d, String tag) throws IOException;
public void writeString(String s, String tag) throws IOException;
public void writeBuffer(byte buf[], String tag) throws IOException;
public void writeRecord(Record r, String tag) throws IOException;
public void startRecord(Record r, String tag) throws IOException;
public void endRecord(Record r, String tag) throws IOException;
public void startVector(List<?> v, String tag) throws IOException;
public void endVector(List<?> v, String tag) throws IOException;
public void startMap(TreeMap<?,?> v, String tag) throws IOException;
public void endMap(TreeMap<?,?> v, String tag) throws IOException;

}
  • 反序列化支持的数据类型
public interface InputArchive {

public byte readByte(String tag) throws IOException;
public boolean readBool(String tag) throws IOException;
public int readInt(String tag) throws IOException;
public long readLong(String tag) throws IOException;
public float readFloat(String tag) throws IOException;
public double readDouble(String tag) throws IOException;
public String readString(String tag) throws IOException;
public byte[] readBuffer(String tag) throws IOException;
public void readRecord(Record r, String tag) throws IOException;
public void startRecord(String tag) throws IOException;
public void endRecord(String tag) throws IOException;
public Index startVector(String tag) throws IOException;
public void endVector(String tag) throws IOException;
public Index startMap(String tag) throws IOException;
public void endMap(String tag) throws IOException;

}
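为了更直观地理解 Record 与 OutputArchive/InputArchive 的配合方式,下面给出一段演示性质的示例代码(假设工程中已引入 ZooKeeper 3.5.7 依赖;JuteDemo 这个类名是本文为演示而取的,TxnHeader 则是 jute 自动生成的一个 Record 实现),展示先序列化再反序列化的完整流程:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.jute.BinaryInputArchive;
import org.apache.jute.BinaryOutputArchive;
import org.apache.zookeeper.txn.TxnHeader;

public class JuteDemo {
    public static void main(String[] args) throws IOException {
        // 构造一个事务头,clientId、cxid、zxid、time、type 均为演示值
        TxnHeader hdr = new TxnHeader(1L, 2, 0x100000001L, System.currentTimeMillis(), 1);

        // 序列化:Record.serialize() 把对象按 jute 二进制格式写入输出流
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        BinaryOutputArchive oa = BinaryOutputArchive.getArchive(bos);
        hdr.serialize(oa, "hdr");

        // 反序列化:Record.deserialize() 从输入流中还原出对象
        BinaryInputArchive ia = BinaryInputArchive.getArchive(
                new ByteArrayInputStream(bos.toByteArray()));
        TxnHeader copy = new TxnHeader();
        copy.deserialize(ia, "hdr");

        // 验证序列化前后的 zxid 一致
        System.out.println("zxid 是否一致:" + (hdr.getZxid() == copy.getZxid()));
    }
}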

ZK 服务端初始化源码分析

ZK 服务端启动脚本

ZooKeeper 服务端的启动命令是 zkServer.sh start,其中 zkServer.sh 服务端脚本的核心源码如下:

#!/usr/bin/env bash

# use POSTIX interface, symlink is followed automatically
ZOOBIN="${BASH_SOURCE-$0}"
ZOOBIN="$(dirname "${ZOOBIN}")"
ZOOBINDIR="$(cd "${ZOOBIN}"; pwd)"

if [ -e "$ZOOBIN/../libexec/zkEnv.sh" ]; then
. "$ZOOBINDIR"/../libexec/zkEnv.sh
else
. "$ZOOBINDIR"/zkEnv.sh # 相当于获取 zkEnv.sh 中的环境变量(ZOOCFG="zoo.cfg")
fi

# See the following page for extensive details on setting
# up the JVM to accept JMX remote management:
# http://java.sun.com/javase/6/docs/technotes/guides/management/agent.html
# by default we allow local JMX connections
if [ "x$JMXLOCALONLY" = "x" ]
then
JMXLOCALONLY=false
fi

if [ "x$JMXDISABLE" = "x" ] || [ "$JMXDISABLE" = 'false' ]
then
echo "ZooKeeper JMX enabled by default" >&2
if [ "x$JMXPORT" = "x" ]
then
# for some reason these two options are necessary on jdk6 on Ubuntu
# accord to the docs they are not necessary, but otw jconsole cannot
# do a local attach
ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY org.apache.zookeeper.server.quorum.QuorumPeerMain"
else
if [ "x$JMXAUTH" = "x" ]
then
JMXAUTH=false
fi
if [ "x$JMXSSL" = "x" ]
then
JMXSSL=false
fi
if [ "x$JMXLOG4J" = "x" ]
then
JMXLOG4J=true
fi
echo "ZooKeeper remote JMX Port set to $JMXPORT" >&2
echo "ZooKeeper remote JMX authenticate set to $JMXAUTH" >&2
echo "ZooKeeper remote JMX ssl set to $JMXSSL" >&2
echo "ZooKeeper remote JMX log4j set to $JMXLOG4J" >&2
ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=$JMXPORT -Dcom.sun.management.jmxremote.authenticate=$JMXAUTH -Dcom.sun.management.jmxremote.ssl=$JMXSSL -Dzookeeper.jmx.log4j.disable=$JMXLOG4J org.apache.zookeeper.server.quorum.QuorumPeerMain"
fi
else
echo "JMX disabled by user request" >&2
ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain"
fi

if [ "x$SERVER_JVMFLAGS" != "x" ]
then
JVMFLAGS="$SERVER_JVMFLAGS $JVMFLAGS"
fi

......

case $1 in
start)
echo -n "Starting zookeeper ... "
if [ -f "$ZOOPIDFILE" ]; then
if kill -0 `cat "$ZOOPIDFILE"` > /dev/null 2>&1; then
echo $command already running as process `cat "$ZOOPIDFILE"`.
exit 1
fi
fi
nohup "$JAVA" $ZOO_DATADIR_AUTOCREATE "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" \
"-Dzookeeper.log.file=${ZOO_LOG_FILE}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
-XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError='kill -9 %p' \
-cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG" > "$_ZOO_DAEMON_OUT" 2>&1 < /dev/null &

......

;;
stop)
echo -n "Stopping zookeeper ... "
if [ ! -f "$ZOOPIDFILE" ]
then
echo "no zookeeper to stop (could not find file $ZOOPIDFILE)"
else
$KILL $(cat "$ZOOPIDFILE")
rm "$ZOOPIDFILE"
sleep 1
echo STOPPED
fi
exit 0
;;
restart)
shift
"$0" stop ${@}
sleep 3
"$0" start ${@}
;;
status)

.....

;;
*)
echo "Usage: $0 [--config <conf-dir>] {start|start-foreground|stop|restart|status|print-cmd}" >&2

esac

由于 zkServer.sh start 命令在底层实际执行的内容如下:

nohup "$JAVA"
+ 一堆提交参数
+ $ZOOMAIN (org.apache.zookeeper.server.quorum.QuorumPeerMain)
+ "$ZOOCFG" (zkEnv.sh 文件中 ZOOCFG="zoo.cfg")

所以 ZooKeeper 服务端的启动入口是 QuorumPeerMain 类。

ZK 服务端启动入口

  • 服务端的启动入口类 QuorumPeerMain
package org.apache.zookeeper.server.quorum;

public class QuorumPeerMain {

public static void main(String[] args) {
// 创建一个 ZK 节点
QuorumPeerMain main = new QuorumPeerMain();

try {
// 初始化节点并运行,args 相当于提交参数中的 zoo.cfg
main.initializeAndRun(args);
} catch (IllegalArgumentException e)
{
......
}

LOG.info("Exiting normally");
System.exit(0);
}

}
  • QuorumPeerMain.initializeAndRun() 方法
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException
{
// 管理 ZK 的配置信息
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
// 解析参数,包括 zoo.cfg 和 myid 文件
config.parse(args[0]);
}

// 启动定时任务,对过期的快照执行删除(该功能默认关闭)
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
.getDataDir(), config.getDataLogDir(), config
.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();

if (args.length == 1 && config.isDistributed()) {
// 集群模式启动
runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running "
+ " in standalone mode");
// 单机模式启动
ZooKeeperServerMain.main(args);
}
}

解析配置参数

提示

  • ZooKeeper 在启动时,会解析 zoo.cfg 和 myid 配置文件(集群模式才需要有 myid 文件),从而获取配置参数(两个文件的示例见本小节末尾)。
  • zoo.cfg 配置文件,定义了整个 ZooKeeper 集群的配置参数,比如端口号、数据存储路径和服务器列表。
  • myid 节点标识文件,存放了当前节点的唯一编号,用于区分集群中的各个节点。
  • QuorumPeerConfig.parse() 方法
public void parse(String path) throws ConfigException {
LOG.info("Reading configuration from: " + path);

try {
File configFile = (new VerifyingFileFactory.Builder(LOG)
.warnForRelativePath()
.failForNonExistingPath()
.build()).create(path);

Properties cfg = new Properties();
FileInputStream in = new FileInputStream(configFile);
try {
// 加载配置文件
cfg.load(in);
configFileStr = path;
} finally {
in.close();
}

// 解析配置文件
parseProperties(cfg);
} catch (IOException e) {
throw new ConfigException("Error processing " + path, e);
} catch (IllegalArgumentException e) {
throw new ConfigException("Error processing " + path, e);
}

......
}
  • QuorumPeerConfig.parseProperties() 方法
public void parseProperties(Properties zkProp) throws IOException, ConfigException {
int clientPort = 0;
int secureClientPort = 0;
String clientPortAddress = null;
String secureClientPortAddress = null;

// 读取 zoo.cfg 文件中的配置参数,并赋值给 QuorumPeerConfig 的类对象
VerifyingFileFactory vff = new VerifyingFileFactory.Builder(LOG).warnForRelativePath().build();
for (Entry<Object, Object> entry : zkProp.entrySet()) {
String key = entry.getKey().toString().trim();
String value = entry.getValue().toString().trim();
if (key.equals("dataDir")) {
dataDir = vff.create(value);
} else if (key.equals("dataLogDir")) {
dataLogDir = vff.create(value);
} else if (key.equals("clientPort")) {
clientPort = Integer.parseInt(value);
} else if (key.equals("localSessionsEnabled")) {
localSessionsEnabled = Boolean.parseBoolean(value);
} else if (key.equals("localSessionsUpgradingEnabled")) {
localSessionsUpgradingEnabled = Boolean.parseBoolean(value);
} else if (key.equals("clientPortAddress")) {
clientPortAddress = value.trim();
} else if (key.equals("secureClientPort")) {
secureClientPort = Integer.parseInt(value);
} else if (key.equals("secureClientPortAddress")){
secureClientPortAddress = value.trim();
} else if (key.equals("tickTime")) {
tickTime = Integer.parseInt(value);
} else if (key.equals("maxClientCnxns")) {
maxClientCnxns = Integer.parseInt(value);
} else if (key.equals("minSessionTimeout")) {
minSessionTimeout = Integer.parseInt(value);
} else if (key.equals("maxSessionTimeout")) {
maxSessionTimeout = Integer.parseInt(value);
} else if (key.equals("initLimit")) {
initLimit = Integer.parseInt(value);
} else if (key.equals("syncLimit")) {
syncLimit = Integer.parseInt(value);
}

......
}

......

if (dynamicConfigFileStr == null) {
setupQuorumPeerConfig(zkProp, true);
if (isDistributed() && isReconfigEnabled()) {
backupOldConfig();
}
}
}
  • QuorumPeerConfig.setupQuorumPeerConfig() 方法
void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode) throws IOException, ConfigException {
quorumVerifier = parseDynamicConfig(prop, electionAlg, true, configBackwardCompatibilityMode);
// 初始化 myid
setupMyId();
setupClientPort();
setupPeerType();
checkValidity();
}
  • QuorumPeerConfig.setupMyId() 方法
private void setupMyId() throws IOException {
File myIdFile = new File(dataDir, "myid");
// standalone server doesn't need myid file.
if (!myIdFile.isFile()) {
return;
}
BufferedReader br = new BufferedReader(new FileReader(myIdFile));
String myIdString;
try {
myIdString = br.readLine();
} finally {
br.close();
}
try {
// 解析 myid 文件中的 id,并赋值给 serverId
serverId = Long.parseLong(myIdString);
MDC.put("myid", myIdString);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("serverid " + myIdString
+ " is not a number");
}
}
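结合上面的解析流程,下面给出一份最小化的三节点集群配置示例(仅为示意,其中的主机名与路径均为假设值):

# zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/zookeeper/data
clientPort=2181
server.1=zk1:2888:3888
server.2=zk2:2888:3888
server.3=zk3:2888:3888

# myid 文件(位于 dataDir 目录下,内容只有一个数字,即当前节点的编号)
1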

过期快照删除

  • QuorumPeerMain.initializeAndRun() 方法
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException
{
// 管理 ZK 的配置信息
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
// 解析参数,包括 zoo.cfg 和 myid 文件
config.parse(args[0]);
}

// 启动定时任务,对过期的快照执行删除(该功能默认关闭)
// snapRetainCount = 3,最少保留的快照数量
// purgeInterval = 0,默认值为 0,表示默认关闭快照清除任务
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
.getDataDir(), config.getDataLogDir(), config
.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();

if (args.length == 1 && config.isDistributed()) {
// 集群模式启动
runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running "
+ " in standalone mode");
// 单机模式启动
ZooKeeperServerMain.main(args);
}
}
  • DatadirCleanupManager.start() 方法
public void start() {
if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
LOG.warn("Purge task is already running.");
return;
}
// 默认情况 purgeInterval 的值为 0,也就是删除任务默认会关闭,直接返回
if (purgeInterval <= 0) {
LOG.info("Purge task is not scheduled.");
return;
}

// 创建一个定时器
timer = new Timer("PurgeTask", true);
// 创建一个清理快照的任务
TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
// 如果 purgeInterval 的值设置为 1,则表示 1 小时检查一次是否有快照过期,有则删除快照
timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));

purgeTaskStatus = PurgeTaskStatus.STARTED;
}
  • PurgeTask 任务类
static class PurgeTask extends TimerTask {
private File logsDir;
private File snapsDir;
private int snapRetainCount;

public PurgeTask(File dataDir, File snapDir, int count) {
logsDir = dataDir;
snapsDir = snapDir;
snapRetainCount = count;
}

@Override
public void run() {
LOG.info("Purge task started.");
try {
// 清理过期快照
PurgeTxnLog.purge(logsDir, snapsDir, snapRetainCount);
} catch (Exception e) {
LOG.error("Error occurred while purging.", e);
}
LOG.info("Purge task completed.");
}
}
  • PurgeTxnLog.purge() 方法
public static void purge(File dataDir, File snapDir, int num) throws IOException {
if (num < 3) {
throw new IllegalArgumentException(COUNT_ERR_MSG);
}

FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);

// 获取最近的快照
List<File> snaps = txnLog.findNRecentSnapshots(num);
int numSnaps = snaps.size();
if (numSnaps > 0) {
// 清理旧快照
purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1));
}
}
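如果希望开启这个自动清理功能,可以在 zoo.cfg 中配置下面两个参数(示例值仅供参考):

# 至少保留的快照个数,最小值为 3
autopurge.snapRetainCount=3
# 清理任务的触发间隔,单位为小时,大于 0 时才会开启清理任务
autopurge.purgeInterval=1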

初始化通信组件

I/O 模型默认为 NIO 通信

  • QuorumPeerMain.initializeAndRun() 方法
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException
{
// 管理 ZK 的配置信息
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
// 解析参数,包括 zoo.cfg 和 myid 文件
config.parse(args[0]);
}

// 启动定时任务,对过期的快照执行删除(该功能默认关闭)
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
.getDataDir(), config.getDataLogDir(), config
.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();

if (args.length == 1 && config.isDistributed()) {
// 集群模式启动
runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running "
+ " in standalone mode");
// 单机模式启动
ZooKeeperServerMain.main(args);
}
}
  • QuorumPeerMain.runFromConfig() 方法
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException
{
try {
ManagedUtil.registerLog4jMBeans();
} catch (JMException e) {
LOG.warn("Unable to register log4j JMX control", e);
}

LOG.info("Starting quorum peer");
try {
ServerCnxnFactory cnxnFactory = null;
ServerCnxnFactory secureCnxnFactory = null;

// 通信组件初始化,默认是使用 NIO 通信(可以支持 Netty)
if (config.getClientPortAddress() != null) {
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns(),
false);
}

if (config.getSecureClientPortAddress() != null) {
// 初始化 NIO 服务端的 Socket,绑定 2181 端口
secureCnxnFactory = ServerCnxnFactory.createFactory();
secureCnxnFactory.configure(config.getSecureClientPortAddress(),
config.getMaxClientCnxns(),
true);
}

// 将解析到的参数赋值给该 ZK 集群节点
quorumPeer = getQuorumPeer();
quorumPeer.setTxnFactory(new FileTxnSnapLog(
config.getDataLogDir(),
config.getDataDir()));
quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
quorumPeer.enableLocalSessionsUpgrading(
config.isLocalSessionsUpgradingEnabled());
//quorumPeer.setQuorumPeers(config.getAllMembers());
quorumPeer.setElectionType(config.getElectionAlg());
quorumPeer.setMyid(config.getServerId());
quorumPeer.setTickTime(config.getTickTime());
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
quorumPeer.setInitLimit(config.getInitLimit());
quorumPeer.setSyncLimit(config.getSyncLimit());
quorumPeer.setConfigFileName(config.getConfigFilename());
// 管理 ZK 数据的存储
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
if (config.getLastSeenQuorumVerifier()!=null) {
quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
}
quorumPeer.initConfigInZKDatabase();
// 管理 ZK 的网络通信
quorumPeer.setCnxnFactory(cnxnFactory);
quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
quorumPeer.setSslQuorum(config.isSslQuorum());
quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.setSyncEnabled(config.getSyncEnabled());
quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
if (config.sslQuorumReloadCertFiles) {
quorumPeer.getX509Util().enableCertFileReloading();
}

// sets quorum sasl authentication configurations
quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
if(quorumPeer.isQuorumSaslAuthEnabled()){
quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
}
quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
quorumPeer.initialize();

// 启动 ZK 集群节点
quorumPeer.start();
quorumPeer.join();
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Quorum Peer interrupted", e);
}
}
  • ServerCnxnFactory.createFactory() 方法
public static final String ZOOKEEPER_SERVER_CNXN_FACTORY = "zookeeper.serverCnxnFactory";

static public ServerCnxnFactory createFactory() throws IOException {
String serverCnxnFactoryName =
System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
if (serverCnxnFactoryName == null) {
serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
}
try {
ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
.getDeclaredConstructor().newInstance();
LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
return serverCnxnFactory;
} catch (Exception e) {
IOException ioe = new IOException("Couldn't instantiate "
+ serverCnxnFactoryName);
ioe.initCause(e);
throw ioe;
}
}

zookeeper-docs 模块的 zookeeperAdmin.md 文件中,有以下这么一段说明:

serverCnxnFactory :
(Java system property: zookeeper.serverCnxnFactory)
Specifies ServerCnxnFactory implementation.
This should be set to `NettyServerCnxnFactory` in order to use TLS based server communication.
Default is `NIOServerCnxnFactory`.
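也就是说,如果想让服务端改用 Netty 通信,可以在启动时通过 JVM 参数指定实现类,例如(示例写法,NettyServerCnxnFactory 是 ZooKeeper 自带的 Netty 实现):

-Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory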

初始化 NIO 服务端的 Socket

提示

这里初始化 NIO 服务端的 Socket 时,ZooKeeper 并未启动。

  • NIOServerCnxnFactory.configure() 方法
@Override
public void configure(InetSocketAddress addr, int maxcc, boolean secure) throws IOException {
if (secure) {
throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn");
}
configureSaslLogin();

maxClientCnxns = maxcc;
sessionlessCnxnTimeout = Integer.getInteger(
ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
// We also use the sessionlessCnxnTimeout as expiring interval for
// cnxnExpiryQueue. These don't need to be the same, but the expiring
// interval passed into the ExpiryQueue() constructor below should be
// less than or equal to the timeout.
cnxnExpiryQueue =
new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout);
expirerThread = new ConnectionExpirerThread();

int numCores = Runtime.getRuntime().availableProcessors();
// 32 cores sweet spot seems to be 4 selector threads
numSelectorThreads = Integer.getInteger(
ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
Math.max((int) Math.sqrt((float) numCores/2), 1));
if (numSelectorThreads < 1) {
throw new IOException("numSelectorThreads must be at least 1");
}

numWorkerThreads = Integer.getInteger(
ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
workerShutdownTimeoutMS = Long.getLong(
ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);

LOG.info("Configuring NIO connection handler with "
+ (sessionlessCnxnTimeout/1000) + "s sessionless connection"
+ " timeout, " + numSelectorThreads + " selector thread(s), "
+ (numWorkerThreads > 0 ? numWorkerThreads : "no")
+ " worker threads, and "
+ (directBufferBytes == 0 ? "gathered writes." :
("" + (directBufferBytes/1024) + " kB direct buffers.")));
for(int i=0; i<numSelectorThreads; ++i) {
selectorThreads.add(new SelectorThread(i));
}

// 初始化 NIO 服务端的 Socket,绑定 2181 端口,可以接收客户端请求
this.ss = ServerSocketChannel.open();
ss.socket().setReuseAddress(true);
LOG.info("binding to port " + addr);
// 绑定 2181 端口
ss.socket().bind(addr);
ss.configureBlocking(false);
acceptThread = new AcceptThread(ss, addr, selectorThreads);
}
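按照上面代码中的默认公式,selector 线程数为 max(√(CPU 核数 / 2), 1)(向下取整),例如 4 核机器为 1 个、8 核为 2 个、32 核为 4 个;worker 线程数默认为 2 × CPU 核数。这些默认值都可以通过代码中对应的系统属性覆盖。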

ZK 服务端加载数据源码分析

ZK 的数据存储模型

Leader 和 Follower 中的数据会在内存和磁盘中各保存一份,所以需要将内存中的数据持久化到磁盘中(如下图所示)。

  • (1) ZooKeeper 中的数据模型是一棵树(DataTree),每个节点叫做 DataNode。
  • (2) ZooKeeper 集群中的 DataTree 时刻都保持状态同步。
  • (3) Zookeeper 集群中每个节点的数据在内存和磁盘中都有一份完整的数据。
    • 内存数据:DataTree
    • 磁盘数据:快照文件 + 编辑日志

ZK 集群节点的启动

  • QuorumPeerMain.initializeAndRun() 方法
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException
{
// 管理 ZK 的配置信息
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
// 解析参数,包括 zoo.cfg 和 myid 文件
config.parse(args[0]);
}

// 启动定时任务,对过期的快照执行删除(该功能默认关闭)
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
.getDataDir(), config.getDataLogDir(), config
.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();

if (args.length == 1 && config.isDistributed()) {
// 集群模式启动
runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running "
+ " in standalone mode");
// 单机模式启动
ZooKeeperServerMain.main(args);
}
}
  • QuorumPeerMain.runFromConfig() 方法
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException
{
try {
ManagedUtil.registerLog4jMBeans();
} catch (JMException e) {
LOG.warn("Unable to register log4j JMX control", e);
}

LOG.info("Starting quorum peer");
try {
ServerCnxnFactory cnxnFactory = null;
ServerCnxnFactory secureCnxnFactory = null;

// 通信组件初始化,默认是使用 NIO 通信(可以支持 Netty)
if (config.getClientPortAddress() != null) {
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns(),
false);
}

if (config.getSecureClientPortAddress() != null) {
// 初始化 NIO 服务端的 Socket,绑定 2181 端口
secureCnxnFactory = ServerCnxnFactory.createFactory();
secureCnxnFactory.configure(config.getSecureClientPortAddress(),
config.getMaxClientCnxns(),
true);
}

// 将解析到的参数赋值给该 ZK 集群节点
quorumPeer = getQuorumPeer();
quorumPeer.setTxnFactory(new FileTxnSnapLog(
config.getDataLogDir(),
config.getDataDir()));
quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
quorumPeer.enableLocalSessionsUpgrading(
config.isLocalSessionsUpgradingEnabled());
//quorumPeer.setQuorumPeers(config.getAllMembers());
quorumPeer.setElectionType(config.getElectionAlg());
quorumPeer.setMyid(config.getServerId());
quorumPeer.setTickTime(config.getTickTime());
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
quorumPeer.setInitLimit(config.getInitLimit());
quorumPeer.setSyncLimit(config.getSyncLimit());
quorumPeer.setConfigFileName(config.getConfigFilename());
// 管理 ZK 数据的存储
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
if (config.getLastSeenQuorumVerifier()!=null) {
quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
}
quorumPeer.initConfigInZKDatabase();
// 管理 ZK 的网络通信
quorumPeer.setCnxnFactory(cnxnFactory);
quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
quorumPeer.setSslQuorum(config.isSslQuorum());
quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.setSyncEnabled(config.getSyncEnabled());
quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
if (config.sslQuorumReloadCertFiles) {
quorumPeer.getX509Util().enableCertFileReloading();
}

// sets quorum sasl authentication configurations
quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
if(quorumPeer.isQuorumSaslAuthEnabled()){
quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
}
quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
quorumPeer.initialize();

// 启动 ZK 集群节点
quorumPeer.start();
quorumPeer.join();
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Quorum Peer interrupted", e);
}
}

冷启动数据恢复快照数据

  • QuorumPeer.start() 方法
@Override
public synchronized void start() {
if (!getView().containsKey(myid)) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
// 冷启动恢复数据
loadDataBase();
startServerCnxnFactory();
try {
// 启动通信工厂实例对象
adminServer.start();
} catch (AdminServerException e) {
LOG.warn("Problem starting AdminServer", e);
System.out.println(e);
}
// 准备选举环境
startLeaderElection();
// 执行选举
super.start();
}
  • QuorumPeer.loadDataBase() 方法
private void loadDataBase() {
try {
// ZK 的操作分两种:事务操作和非事务操作
// 事务操作:如 zk.create(),每个事务操作都会被分配一个全局唯一的 zxid。zxid 共 64 位:前 32 位是 epoch(每个 Leader 任期的代号),后 32 位是该任期内的事务计数(txid)
// 非事务操作:zk.getData()

// 加载磁盘数据到内存,恢复 DataTree,数据恢复过程:
// (1) 从快照文件中恢复大部分数据,并得到一个 lastProcessZXid
// (2) 再从编辑日志中执行 replay,执行到最后一条日志并更新 lastProcessZXid
// (3) 最终得到 DataTree 和 lastProcessZXid,表示数据恢复完成
zkDb.loadDataBase();

// load the epochs
long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
try {
currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
} catch(FileNotFoundException e) {
// pick a reasonable epoch number
// this should only happen once when moving to a
// new code version
currentEpoch = epochOfZxid;
LOG.info(CURRENT_EPOCH_FILENAME
+ " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
currentEpoch);
writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
}
if (epochOfZxid > currentEpoch) {
throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid);
}
try {
acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
} catch(FileNotFoundException e) {
// pick a reasonable epoch number
// this should only happen once when moving to a
// new code version
acceptedEpoch = epochOfZxid;
LOG.info(ACCEPTED_EPOCH_FILENAME
+ " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
acceptedEpoch);
writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
}
if (acceptedEpoch < currentEpoch) {
throw new IOException("The accepted epoch, " + ZxidUtils.zxidToString(acceptedEpoch) + " is less than the current epoch, " + ZxidUtils.zxidToString(currentEpoch));
}
} catch(IOException ie) {
LOG.error("Unable to load database on disk", ie);
throw new RuntimeException("Unable to run quorum server ", ie);
}
}
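上面注释提到 zxid 的高 32 位是 epoch、低 32 位是事务计数器,下面用一小段演示代码说明这种拆分方式(ZxidDemo 是本文为演示而取的类名,拆分思路与源码中的 ZxidUtils 工具类一致):

public class ZxidDemo {
    public static void main(String[] args) {
        // 假设某个 zxid:epoch 为 2,该任期内的事务计数为 5
        long zxid = (2L << 32) | 5L;

        long epoch = zxid >> 32;            // 高 32 位:Leader 任期的代号
        long counter = zxid & 0xffffffffL;  // 低 32 位:该任期内的事务计数

        System.out.println("zxid = 0x" + Long.toHexString(zxid));
        System.out.println("epoch = " + epoch + ", counter = " + counter);
    }
}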
  • ZKDatabase.loadDataBase() 方法
public long loadDataBase() throws IOException {
long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
initialized = true;
return zxid;
}
  • FileTxnSnapLog.restore() 方法
public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
// 恢复快照文件数据到内存(DataTree)
long deserializeResult = snapLog.deserialize(dt, sessions);
FileTxnLog txnLog = new FileTxnLog(dataDir);

RestoreFinalizer finalizer = () -> {
// 恢复编辑日志数据到内存(DataTree)
long highestZxid = fastForwardFromEdits(dt, sessions, listener);
return highestZxid;
};

if (-1L == deserializeResult) {
/* this means that we couldn't find any snapshot, so we need to
* initialize an empty database (reported in ZOOKEEPER-2325) */
if (txnLog.getLastLoggedZxid() != -1) {
// ZOOKEEPER-3056: provides an escape hatch for users upgrading
// from old versions of zookeeper (3.4.x, pre 3.5.3).
if (!trustEmptySnapshot) {
throw new IOException(EMPTY_SNAPSHOT_WARNING + "Something is broken!");
} else {
LOG.warn("{}This should only be allowed during upgrading.", EMPTY_SNAPSHOT_WARNING);
return finalizer.run();
}
}
/* TODO: (br33d) we should either put a ConcurrentHashMap on restore()
* or use Map on save() */
save(dt, (ConcurrentHashMap<Long, Integer>)sessions);
/* return a zxid of zero, since we the database is empty */
return 0;
}

return finalizer.run();
}
  • FileSnap.deserialize() 方法
public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException {
// we run through 100 snapshots (not all of them)
// if we cannot get it running within 100 snapshots
// we should give up
List<File> snapList = findNValidSnapshots(100);
if (snapList.size() == 0) {
return -1L;
}
File snap = null;
boolean foundValid = false;
// 依次遍历每一个快照的数据
for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
snap = snapList.get(i);
LOG.info("Reading snapshot " + snap);
// 反序列化环境准备
try (InputStream snapIS = new BufferedInputStream(new FileInputStream(snap));
CheckedInputStream crcIn = new CheckedInputStream(snapIS, new Adler32())) {
InputArchive ia = BinaryInputArchive.getArchive(crcIn);
// 反序列化,恢复快照文件数据到内存中的 DataTree
deserialize(dt, sessions, ia);
long checkSum = crcIn.getChecksum().getValue();
long val = ia.readLong("val");
if (val != checkSum) {
throw new IOException("CRC corruption in snapshot : " + snap);
}
foundValid = true;
break;
} catch (IOException e) {
LOG.warn("problem reading snap file " + snap, e);
}
}
if (!foundValid) {
throw new IOException("Not able to find valid snapshots in " + snapDir);
}
dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
return dt.lastProcessedZxid;
}
  • FileSnap.deserialize() 方法
public void deserialize(DataTree dt, Map<Long, Integer> sessions, InputArchive ia) throws IOException {
FileHeader header = new FileHeader();
header.deserialize(ia, "fileheader");
if (header.getMagic() != SNAP_MAGIC) {
throw new IOException("mismatching magic headers "
+ header.getMagic() +
" != " + FileSnap.SNAP_MAGIC);
}
// 反序列化,恢复快照文件数据到内存中的 DataTree
SerializeUtils.deserializeSnapshot(dt,ia,sessions);
}
  • SerializeUtils.deserializeSnapshot() 方法
public static void deserializeSnapshot(DataTree dt, InputArchive ia, Map<Long, Integer> sessions) throws IOException {
int count = ia.readInt("count");
while (count > 0) {
long id = ia.readLong("id");
int to = ia.readInt("timeout");
sessions.put(id, to);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
"loadData --- session in archive: " + id
+ " with timeout: " + to);
}
count--;
}
// 反序列化,恢复快照文件数据到内存中的 DataTree
dt.deserialize(ia, "tree");
}
  • DataTree.deserialize() 方法
public void deserialize(InputArchive ia, String tag) throws IOException {
aclCache.deserialize(ia);
nodes.clear();
pTrie.clear();
String path = ia.readString("path");
while (!"/".equals(path)) {
// 每次循环创建一个 DataNode 对象
DataNode node = new DataNode();
ia.readRecord(node, "node");
// 将 DataNode 恢复到 DataTree
nodes.put(path, node);
synchronized (node) {
aclCache.addUsage(node.acl);
}
int lastSlash = path.lastIndexOf('/');
if (lastSlash == -1) {
root = node;
} else {
// 处理父节点
String parentPath = path.substring(0, lastSlash);
DataNode parent = nodes.get(parentPath);
if (parent == null) {
throw new IOException("Invalid Datatree, unable to find " +
"parent " + parentPath + " of path " + path);
}

// 处理子节点
parent.addChild(path.substring(lastSlash + 1));

// 处理临时节点和持久节点
long eowner = node.stat.getEphemeralOwner();
EphemeralType ephemeralType = EphemeralType.get(eowner);
if (ephemeralType == EphemeralType.CONTAINER) {
containers.add(path);
} else if (ephemeralType == EphemeralType.TTL) {
ttls.add(path);
} else if (eowner != 0) {
HashSet<String> list = ephemerals.get(eowner);
if (list == null) {
list = new HashSet<String>();
ephemerals.put(eowner, list);
}
list.add(path);
}
}
path = ia.readString("path");
}
nodes.put("/", root);
// we are done with deserializing the
// the datatree
// update the quotas - create path trie
// and also update the stat nodes
setupQuota();

aclCache.purgeUnused();
}

冷启动数据恢复编辑日志

  • QuorumPeer.start() 方法
@Override
public synchronized void start() {
if (!getView().containsKey(myid)) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
// 冷启动恢复数据
loadDataBase();
startServerCnxnFactory();
try {
// 启动通信工厂实例对象
adminServer.start();
} catch (AdminServerException e) {
LOG.warn("Problem starting AdminServer", e);
System.out.println(e);
}
// 准备选举环境
startLeaderElection();
// 执行选举
super.start();
}
  • QuorumPeer.loadDataBase() 方法
private void loadDataBase() {
try {
// ZK 的操作分两种:事务操作和非事务操作
// 事务操作:如 zk.create(),每个事务操作都会被分配一个全局唯一的 zxid。zxid 共 64 位:前 32 位是 epoch(每个 Leader 任期的代号),后 32 位是该任期内的事务计数(txid)
// 非事务操作:zk.getData()

// 加载磁盘数据到内存,恢复 DataTree,数据恢复过程:
// (1) 从快照文件中恢复大部分数据,并得到一个 lastProcessZXid
// (2) 再从编辑日志中执行 replay,执行到最后一条日志并更新 lastProcessZXid
// (3) 最终得到 DataTree 和 lastProcessZXid,表示数据恢复完成
zkDb.loadDataBase();

// load the epochs
long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
try {
currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
} catch(FileNotFoundException e) {
// pick a reasonable epoch number
// this should only happen once when moving to a
// new code version
currentEpoch = epochOfZxid;
LOG.info(CURRENT_EPOCH_FILENAME
+ " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
currentEpoch);
writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
}
if (epochOfZxid > currentEpoch) {
throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid);
}
try {
acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
} catch(FileNotFoundException e) {
// pick a reasonable epoch number
// this should only happen once when moving to a
// new code version
acceptedEpoch = epochOfZxid;
LOG.info(ACCEPTED_EPOCH_FILENAME
+ " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
acceptedEpoch);
writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
}
if (acceptedEpoch < currentEpoch) {
throw new IOException("The accepted epoch, " + ZxidUtils.zxidToString(acceptedEpoch) + " is less than the current epoch, " + ZxidUtils.zxidToString(currentEpoch));
}
} catch(IOException ie) {
LOG.error("Unable to load database on disk", ie);
throw new RuntimeException("Unable to run quorum server ", ie);
}
}
  • ZKDatabase.loadDataBase() 方法
public long loadDataBase() throws IOException {
long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
initialized = true;
return zxid;
}
  • FileTxnSnapLog.restore() 方法
public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
// 恢复快照文件数据到内存(DataTree)
long deserializeResult = snapLog.deserialize(dt, sessions);
FileTxnLog txnLog = new FileTxnLog(dataDir);

RestoreFinalizer finalizer = () -> {
// 恢复编辑日志数据到内存(DataTree)
long highestZxid = fastForwardFromEdits(dt, sessions, listener);
return highestZxid;
};

if (-1L == deserializeResult) {
/* this means that we couldn't find any snapshot, so we need to
* initialize an empty database (reported in ZOOKEEPER-2325) */
if (txnLog.getLastLoggedZxid() != -1) {
// ZOOKEEPER-3056: provides an escape hatch for users upgrading
// from old versions of zookeeper (3.4.x, pre 3.5.3).
if (!trustEmptySnapshot) {
throw new IOException(EMPTY_SNAPSHOT_WARNING + "Something is broken!");
} else {
LOG.warn("{}This should only be allowed during upgrading.", EMPTY_SNAPSHOT_WARNING);
return finalizer.run();
}
}
/* TODO: (br33d) we should either put a ConcurrentHashMap on restore()
* or use Map on save() */
save(dt, (ConcurrentHashMap<Long, Integer>)sessions);
/* return a zxid of zero, since we the database is empty */
return 0;
}

return finalizer.run();
}
  • FileTxnSnapLog.fastForwardFromEdits() 方法
public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
// 在此之前,已经从快照文件中恢复了大部分数据,接下来只需要从快照的 zxid + 1 位置开始恢复
TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
// 快照中最大的 zxid,在执行编辑日志时,这个值会不断更新,直到所有恢复操作执行完成
long highestZxid = dt.lastProcessedZxid;
TxnHeader hdr;
try {
// 从 lastProcessedZxid 事务编号器开始,不断地从编辑日志中恢复剩下的还没有恢复的数据
while (true) {
// iterator points to
// the first valid txn when initialized
// 获取事务头信息(含有 zxid)
hdr = itr.getHeader();
if (hdr == null) {
//empty logs
return dt.lastProcessedZxid;
}
if (hdr.getZxid() < highestZxid && highestZxid != 0) {
LOG.error("{}(highestZxid) > {}(next log) for type {}",
highestZxid, hdr.getZxid(), hdr.getType());
} else {
highestZxid = hdr.getZxid();
}
try {
// 根据编辑日志恢复数据到内存(DataTree)
processTransaction(hdr,dt,sessions, itr.getTxn());
} catch(KeeperException.NoNodeException e) {
throw new IOException("Failed to process transaction type: " +
hdr.getType() + " error: " + e.getMessage(), e);
}
listener.onTxnLoaded(hdr, itr.getTxn());
if (!itr.next())
break;
}
} finally {
if (itr != null) {
itr.close();
}
}
return highestZxid;
}
  • FileTxnSnapLog.processTransaction() 方法
public void processTransaction(TxnHeader hdr, DataTree dt, Map<Long, Integer> sessions, Record txn) throws KeeperException.NoNodeException {
ProcessTxnResult rc;
switch (hdr.getType()) {
case OpCode.createSession:
sessions.put(hdr.getClientId(),
((CreateSessionTxn) txn).getTimeOut());
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
"playLog --- create session in log: 0x"
+ Long.toHexString(hdr.getClientId())
+ " with timeout: "
+ ((CreateSessionTxn) txn).getTimeOut());
}
// give dataTree a chance to sync its lastProcessedZxid
rc = dt.processTxn(hdr, txn);
break;
case OpCode.closeSession:
sessions.remove(hdr.getClientId());
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
"playLog --- close session in log: 0x"
+ Long.toHexString(hdr.getClientId()));
}
rc = dt.processTxn(hdr, txn);
break;
default:
// 创建节点、删除节点和其他的各种事务操作
rc = dt.processTxn(hdr, txn);
}

/**
* Snapshots are lazily created. So when a snapshot is in progress,
* there is a chance for later transactions to make into the
* snapshot. Then when the snapshot is restored, NONODE/NODEEXISTS
* errors could occur. It should be safe to ignore these.
*/
if (rc.err != Code.OK.intValue()) {
LOG.debug(
"Ignoring processTxn failure hdr: {}, error: {}, path: {}",
hdr.getType(), rc.err, rc.path);
}
}
  • DataTree.processTxn() 方法(即上面 processTransaction() 中调用的 dt.processTxn())
public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn)
{
ProcessTxnResult rc = new ProcessTxnResult();

try {
rc.clientId = header.getClientId();
rc.cxid = header.getCxid();
rc.zxid = header.getZxid();
rc.type = header.getType();
rc.err = 0;
rc.multiResult = null;
switch (header.getType()) {
case OpCode.create:
CreateTxn createTxn = (CreateTxn) txn;
rc.path = createTxn.getPath();
createNode(
createTxn.getPath(),
createTxn.getData(),
createTxn.getAcl(),
createTxn.getEphemeral() ? header.getClientId() : 0,
createTxn.getParentCVersion(),
header.getZxid(), header.getTime(), null);
break;
case OpCode.create2:
CreateTxn create2Txn = (CreateTxn) txn;
rc.path = create2Txn.getPath();
Stat stat = new Stat();
createNode(
create2Txn.getPath(),
create2Txn.getData(),
create2Txn.getAcl(),
create2Txn.getEphemeral() ? header.getClientId() : 0,
create2Txn.getParentCVersion(),
header.getZxid(), header.getTime(), stat);
rc.stat = stat;
break;
case OpCode.createTTL:
CreateTTLTxn createTtlTxn = (CreateTTLTxn) txn;
rc.path = createTtlTxn.getPath();
stat = new Stat();
createNode(
createTtlTxn.getPath(),
createTtlTxn.getData(),
createTtlTxn.getAcl(),
EphemeralType.TTL.toEphemeralOwner(createTtlTxn.getTtl()),
createTtlTxn.getParentCVersion(),
header.getZxid(), header.getTime(), stat);
rc.stat = stat;
break;
case OpCode.createContainer:
CreateContainerTxn createContainerTxn = (CreateContainerTxn) txn;
rc.path = createContainerTxn.getPath();
stat = new Stat();
createNode(
createContainerTxn.getPath(),
createContainerTxn.getData(),
createContainerTxn.getAcl(),
EphemeralType.CONTAINER_EPHEMERAL_OWNER,
createContainerTxn.getParentCVersion(),
header.getZxid(), header.getTime(), stat);
rc.stat = stat;
break;
case OpCode.delete:
case OpCode.deleteContainer:
DeleteTxn deleteTxn = (DeleteTxn) txn;
rc.path = deleteTxn.getPath();
deleteNode(deleteTxn.getPath(), header.getZxid());
break;
case OpCode.reconfig:
case OpCode.setData:
SetDataTxn setDataTxn = (SetDataTxn) txn;
rc.path = setDataTxn.getPath();
rc.stat = setData(setDataTxn.getPath(), setDataTxn
.getData(), setDataTxn.getVersion(), header
.getZxid(), header.getTime());
break;
case OpCode.setACL:
SetACLTxn setACLTxn = (SetACLTxn) txn;
rc.path = setACLTxn.getPath();
rc.stat = setACL(setACLTxn.getPath(), setACLTxn.getAcl(),
setACLTxn.getVersion());
break;
case OpCode.closeSession:
killSession(header.getClientId(), header.getZxid());
break;
case OpCode.error:
ErrorTxn errTxn = (ErrorTxn) txn;
rc.err = errTxn.getErr();
break;
case OpCode.check:
CheckVersionTxn checkTxn = (CheckVersionTxn) txn;
rc.path = checkTxn.getPath();
break;
case OpCode.multi:
MultiTxn multiTxn = (MultiTxn) txn ;
List<Txn> txns = multiTxn.getTxns();
rc.multiResult = new ArrayList<ProcessTxnResult>();
boolean failed = false;
for (Txn subtxn : txns) {
if (subtxn.getType() == OpCode.error) {
failed = true;
break;
}
}

......
}
} catch (KeeperException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Failed: " + header + ":" + txn, e);
}
rc.err = e.code().intValue();
} catch (IOException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Failed: " + header + ":" + txn, e);
}
}

......

return rc;
}

ZK 集群选举源码分析

集群选举机制

ZooKeeper 集群的选举机制是高频面试题,可分为集群第一次启动和非第一次启动两种情况。

集群第一次启动

  • (1) 假设集群由 5 台服务器组成。服务器 1 启动,发起一次选举:服务器 1 投自己一票,此时服务器 1 只有一票,未超过半数(共 5 票),选举无法完成,服务器 1 的状态保持为 LOOKING。

  • (2) 服务器 2 启动,再发起一次选举。服务器 1 和 2 分别投自己一票并交换选票信息,此时服务器 1 发现服务器 2 的 myid 比自己目前投票推举的(服务器 1)大,更改选票为推举服务器 2。此时服务器 1 的票数是 0 票,服务器 2 的票数是 2 票,两者都没有半数以上的票数,选举无法完成,服务器 1 和 服务器 2 的状态保持 LOOKING。

  • (3) 服务器 3 启动,发起一次选举。此时服务器 1 和 2 都会更改选票为服务器 3。此次投票结果:服务器 1 有 0 票,服务器 2 有 0 票,服务器 3 有 3 票。此时服务器 3 的票数已经超过半数,服务器 3 当选 Leader。服务器 3 将状态更改为 LEADING,服务器 1 和 服务器 2 将状态更改为 FOLLOWING。

  • (4) 服务器 4 启动,发起一次选举。此时服务器 1、2、3 已经不是 LOOKING 状态,不会再更改选票信息。交换选票信息后的结果:服务器 3 有 3 票,服务器 4 有 1 票。此时服务器 4 少数服从多数,更改选票信息为服务器 3,并将自己的状态更改为 FOLLOWING。

  • (5) 服务器 5 启动,跟服务器 4 一样更改选票信息为服务器 3,并将自己的状态更改为 FOLLOWING。

特别注意

一旦 ZooKeeper 集群中已经存在 Leader 节点,那么后面加入集群的节点都只会成为 Follower 节点。

集群非第一次启动

  • (1) 当 ZooKeeper 集群中的任意一台服务器出现以下两种情况之一时,就会开始进入 Leader 选举:

    • 服务器初始化启动。
    • 服务器运行期间无法和 Leader 保持连接。
  • (2) 当一台服务器进入 Leader 选举流程时,当前集群也可能会处于以下两种状态之一:

    • 集群中本来就已经存在一个 Leader

      • 对于集群已经存在 Leader 的情况,当服务器试图去选举 Leader 时,会被告知当前服务器的 Leader 信息;对于该服务器来说,仅仅需要和 Leader 服务器建立连接,并进行状态同步即可。
    • 集群中确实不存在 Leader

      • 对于集群不存在 Leader 的情况,整个集群会进入选举流程。
      • 假设 ZooKeeper 集群由 5 台服务器组成,SID 分别为 1、2、3、4、5,ZXID 分别为 8、8、8、7、7, 并且此时 SID 为 3 的服务器是 Leader。在某一时刻,SID 为 3 和 5 的服务器出现故障,由于 Leader 挂掉了,因此开始进行 Leader 选举。SID 为 1、2、4 的服务器的最终投票情况如下:
        各服务器最终选票的组成为 (EPOCH, ZXID, SID):
        服务器 1:(1, 8, 1)    服务器 2:(1, 8, 2)    服务器 4:(1, 7, 4)
      • 选举 Leader 的规则:① EPOCH 大的直接胜出;② EPOCH 相同时,ZXID(事务 ID)大的胜出;③ ZXID 相同时,SID(服务器 ID,即 myid)大的胜出。
      • 选举 Leader 的结果:SID 为 2 的服务器当选 Leader,SID 为 1 和 4 的服务器作为 Follower。

集群选举机制总结

ZooKeeper 采用过半投票机制来实现选举:某台服务器获得超过半数的选票,即当选 Leader。

  • 集群第一次启动时的选举规则

    • 投票过半数时,服务器 ID 大的胜出
  • 集群非第一次启动时的选举规则

    • (1) EPOCH 大的直接胜出
    • (2) EPOCH 相同,事务 ID(ZXID)大的胜出
    • (3) 事务 ID(ZXID)相同,服务器 ID(SID)大的胜出
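把上面的比较规则直接翻译成代码,大致是下面这个判断(示意写法,方法名为本文虚构;ZooKeeper 源码中对应的是 FastLeaderElection.totalOrderPredicate() 方法,实际实现还会结合选票权重做校验):

// 当收到的选票 (newEpoch, newZxid, newId) 比当前选票 (curEpoch, curZxid, curId) 更"大"时,
// 当前节点就改投收到的这张选票
static boolean shouldChangeVote(long newEpoch, long newZxid, long newId,
                                long curEpoch, long curZxid, long curId) {
    return (newEpoch > curEpoch)
        || (newEpoch == curEpoch && newZxid > curZxid)
        || (newEpoch == curEpoch && newZxid == curZxid && newId > curId);
}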

集群选举准备

  • QuorumPeerMain.initializeAndRun() 方法
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException
{
// 管理 ZK 的配置信息
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
// 解析参数,包括 zoo.cfg 和 myid 文件
config.parse(args[0]);
}

// 启动定时任务,对过期的快照执行删除(该功能默认关闭)
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
.getDataDir(), config.getDataLogDir(), config
.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();

if (args.length == 1 && config.isDistributed()) {
// 集群模式启动
runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running "
+ " in standalone mode");
// 单机模式启动
ZooKeeperServerMain.main(args);
}
}
  • QuorumPeerMain.runFromConfig() 方法
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException
{
try {
ManagedUtil.registerLog4jMBeans();
} catch (JMException e) {
LOG.warn("Unable to register log4j JMX control", e);
}

LOG.info("Starting quorum peer");
try {
ServerCnxnFactory cnxnFactory = null;
ServerCnxnFactory secureCnxnFactory = null;

// 通信组件初始化,默认是使用 NIO 通信(可以支持 Netty)
if (config.getClientPortAddress() != null) {
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns(),
false);
}

if (config.getSecureClientPortAddress() != null) {
// 初始化 NIO 服务端的 Socket,绑定 2181 端口
secureCnxnFactory = ServerCnxnFactory.createFactory();
secureCnxnFactory.configure(config.getSecureClientPortAddress(),
config.getMaxClientCnxns(),
true);
}

// 将解析到的参数赋值给该 ZK 集群节点
quorumPeer = getQuorumPeer();
quorumPeer.setTxnFactory(new FileTxnSnapLog(
config.getDataLogDir(),
config.getDataDir()));
quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
quorumPeer.enableLocalSessionsUpgrading(
config.isLocalSessionsUpgradingEnabled());
//quorumPeer.setQuorumPeers(config.getAllMembers());
quorumPeer.setElectionType(config.getElectionAlg());
quorumPeer.setMyid(config.getServerId());

......

// 启动 ZK 集群节点
quorumPeer.start();
quorumPeer.join();
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Quorum Peer interrupted", e);
}
}
  • QuorumPeer.start() 方法
@Override
public synchronized void start() {
if (!getView().containsKey(myid)) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
// 冷启动恢复数据
loadDataBase();
startServerCnxnFactory();
try {
// 启动通信工厂实例对象
adminServer.start();
} catch (AdminServerException e) {
LOG.warn("Problem starting AdminServer", e);
System.out.println(e);
}
// 选举准备
startLeaderElection();
// 执行选举
super.start();
}
  • QuorumPeer.startLeaderElection() 方法
synchronized public void startLeaderElection() {
try {
if (getPeerState() == ServerState.LOOKING) {
// 创建选票
// (1) 选票组件:epoch(Leader 的任期代号)、zxid(某个 Leader 当选期间执行的事务编号)、myid(ServerID)
// (2) 开始投票时,每个节点都是先投自己一票
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
}
} catch(IOException e) {
RuntimeException re = new RuntimeException(e.getMessage());
re.setStackTrace(e.getStackTrace());
throw re;
}

// if (!getView().containsKey(myid)) {
// throw new RuntimeException("My id " + myid + " not in the peer list");
//}
if (electionType == 0) {
try {
udpSocket = new DatagramSocket(getQuorumAddress().getPort());
responder = new ResponderThread();
responder.start();
} catch (SocketException e) {
throw new RuntimeException(e);
}
}
// 创建选举算法实例
this.electionAlg = createElectionAlgorithm(electionType);
}
  • QuorumPeer.createElectionAlgorithm() 方法
protected Election createElectionAlgorithm(int electionAlgorithm) {
Election le=null;

//TODO: use a factory rather than a switch
switch (electionAlgorithm) {
case 0:
le = new LeaderElection(this);
break;
case 1:
le = new AuthFastLeaderElection(this);
break;
case 2:
le = new AuthFastLeaderElection(this, true);
break;
case 3:
// (1) 创建 QuorumCnxnManager,负责选举过程中的所有网络通信
QuorumCnxManager qcm = createCnxnManager();
QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
if (oldQcm != null) {
LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
oldQcm.halt();
}
QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null){
// (2) 启动监听线程
listener.start();
// (3) 准备开始选举
FastLeaderElection fle = new FastLeaderElection(this, qcm);
fle.start();
le = fle;
} else {
LOG.error("Null listener when initializing cnx manager");
}
break;
default:
assert false;
}
return le;
}

网络通信组件初始化

  • QuorumPeer.createCnxnManager() 方法
public QuorumCnxManager createCnxnManager() {
return new QuorumCnxManager(this,
this.getId(),
this.getView(),
this.authServer,
this.authLearner,
this.tickTime * this.syncLimit,
this.getQuorumListenOnAllIPs(),
this.quorumCnxnThreadsSize,
this.isQuorumSaslAuthEnabled());
}
  • QuorumCnxManager.QuorumCnxManager() 方法
public QuorumCnxManager(QuorumPeer self,
final long mySid,
Map<Long,QuorumPeer.QuorumServer> view,
QuorumAuthServer authServer,
QuorumAuthLearner authLearner,
int socketTimeout,
boolean listenOnAllIPs,
int quorumCnxnThreadsSize,
boolean quorumSaslAuthEnabled) {
// 创建各种队列
this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();

String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
if(cnxToValue != null){
this.cnxTO = Integer.parseInt(cnxToValue);
}

this.self = self;

this.mySid = mySid;
this.socketTimeout = socketTimeout;
this.view = view;
this.listenOnAllIPs = listenOnAllIPs;

initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize,
quorumSaslAuthEnabled);

// Starts listener thread that waits for connection requests
listener = new Listener();
listener.setName("QuorumPeerListener");
}

监听线程初始化

  • QuorumCnxManager.Listener.run() 方法
@Override
public void run() {
int numRetries = 0;
InetSocketAddress addr;
Socket client = null;
Exception exitException = null;
while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
try {
if (self.shouldUsePortUnification()) {
LOG.info("Creating TLS-enabled quorum server socket");
ss = new UnifiedServerSocket(self.getX509Util(), true);
} else if (self.isSslQuorum()) {
LOG.info("Creating TLS-only quorum server socket");
ss = new UnifiedServerSocket(self.getX509Util(), false);
} else {
ss = new ServerSocket();
}

ss.setReuseAddress(true);

if (self.getQuorumListenOnAllIPs()) {
int port = self.getElectionAddress().getPort();
addr = new InetSocketAddress(port);
} else {
// Resolve hostname for this server in case the
// underlying ip address has changed.
self.recreateSocketAddresses(self.getId());
addr = self.getElectionAddress();
}
LOG.info("My election bind port: " + addr.toString());
setName(addr.toString());
// 绑定服务器地址
ss.bind(addr);
// 死循环
while (!shutdown) {
try {
// 阻塞等待处理请求
client = ss.accept();
setSockOpts(client);
LOG.info("Received connection request "
+ formatInetAddr((InetSocketAddress)client.getRemoteSocketAddress()));
// Receive and handle the connection request
// asynchronously if the quorum sasl authentication is
// enabled. This is required because sasl server
// authentication process may take few seconds to finish,
// this may delay next peer connection requests.
if (quorumSaslAuthEnabled) {
receiveConnectionAsync(client);
} else {
receiveConnection(client);
}
numRetries = 0;
} catch (SocketTimeoutException e) {
LOG.warn("The socket is listening for the election accepted "
+ "and it timed out unexpectedly, but will retry."
+ "see ZOOKEEPER-2836");
}
}
} catch (IOException e) {
if (shutdown) {
break;
}
LOG.error("Exception while listening", e);
exitException = e;
numRetries++;
try {
ss.close();
Thread.sleep(1000);
} catch (IOException ie) {
LOG.error("Error closing server socket", ie);
} catch (InterruptedException ie) {
LOG.error("Interrupted while sleeping. " +
"Ignoring exception", ie);
}
closeSocket(client);
}
}

......
}
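
Stripped of the TLS/port-unification branches and the address re-resolution, the listener boils down to an accept loop wrapped in a bounded bind-retry loop. The sketch below shows only that control flow; the port, retry limit and handle() hook are made-up illustrations, not the real fields.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Simplified shape of an election listener: retry binding a bounded number of
// times, then accept connections until asked to shut down.
class ListenerSketch extends Thread {
    private volatile boolean shutdown = false;
    private final int port = 3888;           // illustrative election port
    private final int maxBindRetries = 3;    // illustrative retry limit

    @Override
    public void run() {
        int retries = 0;
        while (!shutdown && retries < maxBindRetries) {
            try (ServerSocket ss = new ServerSocket()) {
                ss.setReuseAddress(true);
                ss.bind(new InetSocketAddress(port));
                while (!shutdown) {
                    Socket client = ss.accept();   // block until a peer connects
                    handle(client);                // hand off the connection
                    retries = 0;                   // a successful accept resets the counter
                }
            } catch (IOException e) {
                retries++;                         // bind or accept failed; pause and retry
                try { Thread.sleep(1000); } catch (InterruptedException ie) { return; }
            }
        }
    }

    private void handle(Socket client) { /* receiveConnection(client) in the real code */ }
}
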

Election preparation

  • The FastLeaderElection.FastLeaderElection() constructor
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){
this.stop = false;
this.manager = manager;
starter(self, manager);
}
  • The FastLeaderElection.starter() method
private void starter(QuorumPeer self, QuorumCnxManager manager) {
this.self = self;
proposedLeader = -1;
proposedZxid = -1;

// Initialize the send/receive queues and the messenger
sendqueue = new LinkedBlockingQueue<ToSend>();
recvqueue = new LinkedBlockingQueue<Notification>();
this.messenger = new Messenger(manager);
}

Cluster election execution

Overall election flow

Notes

  • The FastLeaderElection class is dedicated to managing the cluster leader election. It has two core inner thread classes: WorkerSender (sends election vote messages) and WorkerReceiver (receives election vote messages).
  • The QuorumCnxManager class is dedicated to managing communication between cluster nodes, i.e. sending and receiving messages between peers. It has two core inner thread classes: SendWorker (sends messages) and RecvWorker (receives messages). A small sketch of how the two layers hand messages off to each other follows this list.
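
To make this division of labor concrete, here is a rough sketch of how the two layers hand work off to each other: the election layer drops a vote into its send queue, a WorkerSender-like thread drains that queue, and the connection layer enqueues the serialized bytes on the per-peer queue that a SendWorker later drains. Every class and field name in this sketch is illustrative, not the real one.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Election layer: an unbounded queue of votes to send.
// Connection layer: one bounded byte-buffer queue per destination peer.
class HandoffSketch {
    final LinkedBlockingQueue<String> voteSendQueue = new LinkedBlockingQueue<>();
    final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> perPeerQueues =
            new ConcurrentHashMap<>();

    // Plays the role of WorkerSender: drain votes and hand them to the connection layer.
    Thread workerSender(long targetSid) {
        return new Thread(() -> {
            try {
                while (true) {
                    String vote = voteSendQueue.poll(3, TimeUnit.SECONDS);
                    if (vote == null) continue;
                    ByteBuffer bytes = ByteBuffer.wrap(vote.getBytes(StandardCharsets.UTF_8));
                    // Plays the role of QuorumCnxManager.toSend(sid, buffer).
                    perPeerQueues
                        .computeIfAbsent(targetSid, k -> new ArrayBlockingQueue<>(1))
                        .offer(bytes);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "WorkerSenderSketch");
    }
}
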

Part 1 of the code

  • The QuorumPeerMain.initializeAndRun() method
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException
{
// Manages the ZooKeeper configuration
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
// Parse the arguments, including zoo.cfg and the myid file
config.parse(args[0]);
}

// Start the scheduled task that purges expired snapshots (disabled by default)
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
.getDataDir(), config.getDataLogDir(), config
.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();

if (args.length == 1 && config.isDistributed()) {
// Start in quorum (cluster) mode
runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running "
+ " in standalone mode");
// Start in standalone mode
ZooKeeperServerMain.main(args);
}
}
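
Conceptually, DatadirCleanupManager is a timer that periodically deletes old snapshots and transaction logs, keeping only the newest snapRetainCount of them. A rough stand-in built on a ScheduledExecutorService is sketched below; the actual purge work (done by PurgeTxnLog in ZooKeeper) is only hinted at, and the class and method names here are illustrative.

import java.io.File;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Conceptual stand-in for DatadirCleanupManager: run a purge task on a fixed interval.
class PurgeTaskSketch {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    void start(File dataDir, File dataLogDir, int snapRetainCount, int purgeIntervalHours) {
        if (purgeIntervalHours <= 0) {
            return; // mirrors the default: purging stays disabled unless an interval is configured
        }
        scheduler.scheduleAtFixedRate(
                () -> purgeOldSnapshots(dataDir, dataLogDir, snapRetainCount),
                purgeIntervalHours, purgeIntervalHours, TimeUnit.HOURS);
    }

    private void purgeOldSnapshots(File dataDir, File dataLogDir, int retain) {
        // In ZooKeeper this is handled by PurgeTxnLog; here it is just a placeholder.
    }
}
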
  • The QuorumPeerMain.runFromConfig() method
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException
{
try {
ManagedUtil.registerLog4jMBeans();
} catch (JMException e) {
LOG.warn("Unable to register log4j JMX control", e);
}

LOG.info("Starting quorum peer");
try {
ServerCnxnFactory cnxnFactory = null;
ServerCnxnFactory secureCnxnFactory = null;

// Initialize the client communication component; NIO is the default (Netty is also supported)
if (config.getClientPortAddress() != null) {
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns(),
false);
}

if (config.getSecureClientPortAddress() != null) {
// Initialize the server socket for the secure client port
secureCnxnFactory = ServerCnxnFactory.createFactory();
secureCnxnFactory.configure(config.getSecureClientPortAddress(),
config.getMaxClientCnxns(),
true);
}

// Apply the parsed configuration to this quorum peer
quorumPeer = getQuorumPeer();
quorumPeer.setTxnFactory(new FileTxnSnapLog(
config.getDataLogDir(),
config.getDataDir()));
quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
quorumPeer.enableLocalSessionsUpgrading(
config.isLocalSessionsUpgradingEnabled());
//quorumPeer.setQuorumPeers(config.getAllMembers());
quorumPeer.setElectionType(config.getElectionAlg());
quorumPeer.setMyid(config.getServerId());

......

// Start this ZooKeeper quorum peer
quorumPeer.start();
quorumPeer.join();
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Quorum Peer interrupted", e);
}
}
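
ServerCnxnFactory.createFactory() picks the communication implementation reflectively and falls back to the NIO implementation when nothing else is configured. The sketch below shows that selection pattern; the property and class names follow ZooKeeper 3.5.x as far as I can tell, but treat them as assumptions rather than guaranteed API.

// Simplified shape of how a connection factory can be chosen at runtime.
class FactorySelectionSketch {
    static Object createFactory() throws ReflectiveOperationException {
        String clazz = System.getProperty(
                "zookeeper.serverCnxnFactory",                       // property consulted first
                "org.apache.zookeeper.server.NIOServerCnxnFactory"); // NIO is the default
        return Class.forName(clazz).getDeclaredConstructor().newInstance();
    }
}
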
  • The QuorumPeer.start() method
@Override
public synchronized void start() {
if (!getView().containsKey(myid)) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
// Restore data from snapshot and transaction log on startup
loadDataBase();
// Start the client connection factory (NIO/Netty)
startServerCnxnFactory();
try {
// Start the AdminServer
adminServer.start();
} catch (AdminServerException e) {
LOG.warn("Problem starting AdminServer", e);
System.out.println(e);
}
// Prepare the election environment
startLeaderElection();
// Run the election: super.start() starts the thread, so run() executes next
super.start();
}
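
The final super.start() call matters because QuorumPeer is ultimately a Thread subclass: start() spawns the thread, and the JVM then invokes run(), which contains the election loop shown next. A minimal illustration of that start()/run() relationship (an illustrative class, not the real QuorumPeer):

// Minimal illustration: calling start() on a Thread subclass causes the JVM
// to invoke its run() method on a new thread.
class PeerLikeThread extends Thread {
    @Override
    public synchronized void start() {
        // ... prepare state (load data, set up the election) ...
        super.start();   // spawns the thread; run() executes next
    }

    @Override
    public void run() {
        // ... main loop (LOOKING / FOLLOWING / LEADING) ...
    }
}
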
  • The QuorumPeer.run() method. Note that calling super.start() above is what causes the QuorumPeer.run() method to execute, which kicks off the election.
@Override
public void run() {
updateThreadName();

LOG.debug("Starting quorum peer");
try {
jmxQuorumBean = new QuorumBean(this);
MBeanRegistry.getInstance().register(jmxQuorumBean, null);
for(QuorumServer s: getView().values()){
ZKMBeanInfo p;
if (getId() == s.id) {
p = jmxLocalPeerBean = new LocalPeerBean(this);
try {
MBeanRegistry.getInstance().register(p, jmxQuorumBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxLocalPeerBean = null;
}
} else {
RemotePeerBean rBean = new RemotePeerBean(this, s);
try {
MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
jmxRemotePeerBean.put(s.id, rBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
}
}
}
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxQuorumBean = null;
}

try {
/*
* Main loop
*/
while (running) {
switch (getPeerState()) {
case LOOKING:
LOG.info("LOOKING");

if (Boolean.getBoolean("readonlymode.enabled")) {
LOG.info("Attempting to start ReadOnlyZooKeeperServer");

// Create read-only server but don't start it immediately
final ReadOnlyZooKeeperServer roZk =
new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);

// Instead of starting roZk immediately, wait some grace
// period before we decide we're partitioned.
//
// Thread is used here because otherwise it would require
// changes in each of election strategy classes which is
// unnecessary code coupling.
Thread roZkMgr = new Thread() {
public void run() {
try {
// lower-bound grace period to 2 secs
sleep(Math.max(2000, tickTime));
if (ServerState.LOOKING.equals(getPeerState())) {
roZk.startup();
}
} catch (InterruptedException e) {
LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
} catch (Exception e) {
LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
}
}
};
try {
roZkMgr.start();
reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
}
// Run the election; when it finishes, the winning vote (the one that elected the Leader) is returned
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
} finally {
// If the thread is in the the grace period, interrupt
// to come out of waiting.
roZkMgr.interrupt();
roZk.shutdown();
}
} else {
try {
reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
}
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
}
}
break;
case OBSERVING:
try {
LOG.info("OBSERVING");
setObserver(makeObserver(logFactory));
observer.observeLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e );
} finally {
observer.shutdown();
setObserver(null);
updateServerState();
}
break;
case FOLLOWING:
try {
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
follower.shutdown();
setFollower(null);
updateServerState();
}
break;
case LEADING:
LOG.info("LEADING");
try {
setLeader(makeLeader(logFactory));
leader.lead();
setLeader(null);
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
}
updateServerState();
}
break;
}
start_fle = Time.currentElapsedTime();
}
} finally {
LOG.warn("QuorumPeer main thread exited");
MBeanRegistry instance = MBeanRegistry.getInstance();
instance.unregister(jmxQuorumBean);
instance.unregister(jmxLocalPeerBean);

for (RemotePeerBean remotePeerBean : jmxRemotePeerBean.values()) {
instance.unregister(remotePeerBean);
}

jmxQuorumBean = null;
jmxLocalPeerBean = null;
jmxRemotePeerBean = null;
}
}
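
The body of run() is a state machine: while the peer is running it acts according to its current state and drops back to LOOKING whenever its current role ends. The condensed sketch below captures only that loop; the types and method bodies are placeholders, not the real implementation.

// Condensed shape of the QuorumPeer main loop.
enum PeerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

class PeerLoopSketch {
    volatile boolean running = true;
    volatile PeerState state = PeerState.LOOKING;

    void mainLoop() {
        while (running) {
            switch (state) {
                case LOOKING:
                    state = lookForLeader();   // blocks until this election round ends
                    break;
                case FOLLOWING:
                    followLeader();            // returns when the leader connection breaks
                    state = PeerState.LOOKING;
                    break;
                case LEADING:
                    lead();                    // returns when quorum support is lost
                    state = PeerState.LOOKING;
                    break;
                case OBSERVING:
                    observeLeader();
                    state = PeerState.LOOKING;
                    break;
            }
        }
    }

    PeerState lookForLeader() { return PeerState.FOLLOWING; } // placeholder
    void followLeader() { }
    void lead() { }
    void observeLeader() { }
}
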
  • The FastLeaderElection.lookForLeader() method; a sketch of its vote-comparison rule follows the code.
public Vote lookForLeader() throws InterruptedException {
try {
self.jmxLeaderElectionBean = new LeaderElectionBean();
MBeanRegistry.getInstance().register(
self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
self.jmxLeaderElectionBean = null;
}
if (self.start_fle == 0) {
self.start_fle = Time.currentElapsedTime();
}
try {
// While this node is starting up, every other cluster node sends it a vote.
// recvset holds the latest valid vote received from each cluster node.
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

// Holds votes that arrive from outside the current (valid) election round
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

// Maximum wait time for one election round; the default is 200 ms (0.2 s)
int notTimeout = finalizeWait;

// Each new election round increments logicalclock
// Until valid epoch data is available, the logical clock is used in its place
// Leader election rule: compare epoch (term number), then zxid (transaction ID), then server ID (myid); the larger value wins
synchronized(this){
// Update the logical clock; it is incremented once per election round
logicalclock.incrementAndGet();
// Update this node's proposed vote (server id, zxid, epoch)
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}

LOG.info("New election. My id = " + self.getId() +
", proposed zxid=0x" + Long.toHexString(proposedZxid));
// Broadcast this node's vote to the other cluster nodes
sendNotifications();

/*
* Loop in which we exchange notifications until we find a leader
*/
// Keep running election rounds until a leader is elected
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){
......
}
return null;
} finally {
......
}
}
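
The comment above states the ordering rule: compare epoch first, then zxid, then server id, and the larger value wins. In the source this comparison lives in FastLeaderElection.totalOrderPredicate(); the snippet below is a simplified, self-contained version of the same rule (it omits the quorum-verifier weight check that the real method also performs).

// Simplified version of FastLeaderElection.totalOrderPredicate(): decides whether
// the proposed vote should replace the current one.
class VoteComparisonSketch {
    static boolean supersedes(long newId, long newZxid, long newEpoch,
                              long curId, long curZxid, long curEpoch) {
        if (newEpoch != curEpoch) {
            return newEpoch > curEpoch;   // 1. higher epoch wins
        }
        if (newZxid != curZxid) {
            return newZxid > curZxid;     // 2. higher zxid wins
        }
        return newId > curId;             // 3. higher server id (myid) wins
    }
}

For example, a vote with epoch 1 and zxid 0x100000005 beats a vote with epoch 1 and zxid 0x100000003 regardless of the server ids, because the zxid comparison is decided before the server ids are ever looked at.
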
  • The FastLeaderElection.sendNotifications() method, which broadcasts this node's vote to the other cluster nodes.
private void sendNotifications() {
// Iterate over the voting participants and send this node's vote to each of them
for (long sid : self.getCurrentAndNextConfigVoters()) {
// Build the vote notification to send
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(ToSend.mType.notification,
proposedLeader,
proposedZxid,
logicalclock.get(),
QuorumPeer.ServerState.LOOKING,
sid,
proposedEpoch, qv.toString().getBytes());
if(LOG.isDebugEnabled()){
LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) +
" (n.round), " + sid + " (recipient), " + self.getId() +
" (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
}
// Put the outgoing vote into the send queue
sendqueue.offer(notmsg);
}
}
  • The FastLeaderElection.Messenger.WorkerSender thread class, which sends the election vote messages to the other cluster nodes.
class WorkerSender extends ZooKeeperThread {
volatile boolean stop;
QuorumCnxManager manager;

WorkerSender(QuorumCnxManager manager){
super("WorkerSender");
this.stop = false;
this.manager = manager;
}

public void run() {
while (!stop) {
try {
// Poll the send queue (blocking up to 3 s) for the next vote to send
ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
if(m == null) continue;

// Process the vote to be sent
process(m);
} catch (InterruptedException e) {
break;
}
}
LOG.info("WorkerSender is down");
}

/**
* Called by run() once there is a new message to send.
*
* @param m message to send
*/
void process(ToSend m) {
ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
m.leader,
m.zxid,
m.electionEpoch,
m.peerEpoch,
m.configData);
// Hand the vote to the connection manager for delivery
manager.toSend(m.sid, requestBuffer);

}
}
  • The QuorumCnxManager.toSend() method, which sends a message to another cluster node.
public void toSend(Long sid, ByteBuffer b) {
/*
* If sending message to myself, then simply enqueue it (loopback).
*/
// If the message is addressed to this node itself, put it straight into the local receive queue (loopback)
if (this.mySid == sid) {
b.position(0);
addToRecvQueue(new Message(b.duplicate(), sid));
/*
* Otherwise send to the corresponding thread to send.
*/
} else {
/*
* Start a new connection if doesn't have one already.
*/
// If the message is addressed to another cluster node, fetch (or create) that node's send queue and put the message into it
ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(
SEND_CAPACITY);
ArrayBlockingQueue<ByteBuffer> oldq = queueSendMap.putIfAbsent(sid, bq);
if (oldq != null) {
addToSendQueue(oldq, b);
} else {
addToSendQueue(bq, b);
}
connectOne(sid);

}
}

Part 2 of the code

  • The QuorumCnxManager.addToRecvQueue() method, which adds a received message to the receive queue; a sketch of its drop-oldest queue pattern follows the code.
public void addToRecvQueue(Message msg) {
synchronized(recvQLock) {
if (recvQueue.remainingCapacity() == 0) {
try {
recvQueue.remove();
} catch (NoSuchElementException ne) {
// element could be removed by poll()
LOG.debug("Trying to remove from an empty " +
"recvQueue. Ignoring exception " + ne);
}
}
try {
// Add the message addressed to this node to the recvQueue
recvQueue.add(msg);
} catch (IllegalStateException ie) {
// This should never happen
LOG.error("Unable to insert element in the recvQueue " + ie);
}
}
}
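
The pattern here is a bounded queue with drop-oldest semantics: when the queue is full, the oldest element is discarded so that the newest message always gets in, since a fresh vote matters more than a stale one. A generic, self-contained sketch of the same pattern (an illustrative class, not the real code):

import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;

// Bounded queue that discards the oldest element when full, so that the most
// recent message is never lost. Mirrors the remove-then-add pattern above.
class DropOldestQueue<T> {
    private final ArrayBlockingQueue<T> queue;
    private final Object lock = new Object();

    DropOldestQueue(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    void put(T element) {
        synchronized (lock) {
            if (queue.remainingCapacity() == 0) {
                try {
                    queue.remove();   // drop the oldest entry
                } catch (NoSuchElementException ignored) {
                    // a consumer emptied the queue in the meantime
                }
            }
            queue.add(element);       // only put() adds while holding the lock, so this succeeds
        }
    }
}
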
  • The QuorumCnxManager.addToSendQueue() method, which adds an outgoing message to the send queue.
private void addToSendQueue(ArrayBlockingQueue<ByteBuffer> queue,
ByteBuffer buffer) {
if (queue.remainingCapacity() == 0) {
try {
queue.remove();
} catch (NoSuchElementException ne) {
// element could be removed by poll()
LOG.debug("Trying to remove from an empty " +
"Queue. Ignoring exception " + ne);
}
}
try {
// Add the outgoing message to the send queue
queue.add(buffer);
} catch (IllegalStateException ie) {
// This should never happen
LOG.error("Unable to insert an element in the queue " + ie);
}
}
  • The QuorumCnxManager.connectOne(long sid) method, which establishes a connection to the cluster node the message is addressed to.
synchronized void connectOne(long sid){
if (senderWorkerMap.get(sid) != null) {
LOG.debug("There is a connection already for server " + sid);
return;
}
synchronized (self.QV_LOCK) {
boolean knownId = false;
// Resolve hostname for the remote server before attempting to
// connect in case the underlying ip address has changed.
self.recreateSocketAddresses(sid);
Map<Long, QuorumPeer.QuorumServer> lastCommittedView = self.getView();
QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
Map<Long, QuorumPeer.QuorumServer> lastProposedView = lastSeenQV.getAllMembers();
if (lastCommittedView.containsKey(sid)) {
knownId = true;
if (connectOne(sid, lastCommittedView.get(sid).electionAddr))
return;
}
if (lastSeenQV != null && lastProposedView.containsKey(sid)
&& (!knownId || (lastProposedView.get(sid).electionAddr !=
lastCommittedView.get(sid).electionAddr))) {
knownId = true;
if (connectOne(sid, lastProposedView.get(sid).electionAddr))
return;
}
if (!knownId) {
LOG.warn("Invalid server id: " + sid);
return;
}
}
}
  • The QuorumCnxManager.connectOne(long sid, InetSocketAddress electionAddr) overload, which opens the actual socket connection to that node's election address.
synchronized private boolean connectOne(long sid, InetSocketAddress electionAddr){
if (senderWorkerMap.get(sid) != null) {
LOG.debug("There is a connection already for server " + sid);
return true;
}

Socket sock = null;
try {
LOG.debug("Opening channel to server " + sid);
if (self.isSslQuorum()) {
SSLSocket sslSock = self.getX509Util().createSSLSocket();
setSockOpts(sslSock);
sslSock.connect(electionAddr, cnxTO);
sslSock.startHandshake();
sock = sslSock;
LOG.info("SSL handshake complete with {} - {} - {}", sslSock.getRemoteSocketAddress(), sslSock.getSession().getProtocol(), sslSock.getSession().getCipherSuite());
} else {
sock = new Socket();
setSockOpts(sock);
sock.connect(electionAddr, cnxTO);

}
LOG.debug("Connected to server " + sid);
// Sends connection request asynchronously if the quorum
// sasl authentication is enabled. This is required because
// sasl server authentication process may take few seconds to
// finish, this may delay next peer connection requests.
if (quorumSaslAuthEnabled) {
initiateConnectionAsync(sock, sid);
} else {
// Initiate the connection handshake
initiateConnection(sock, sid);
}
return true;
} catch (UnresolvedAddressException e) {
......
}
......
}
  • The QuorumCnxManager.initiateConnection() method, which initiates the communication connection.
public void initiateConnection(final Socket sock, final Long sid) {
try {
startConnection(sock, sid);
} catch (IOException e) {
LOG.error("Exception while connecting, id: {}, addr: {}, closing learner connection",
new Object[] { sid, sock.getRemoteSocketAddress() }, e);
closeSocket(sock);
return;
}
}
  • The QuorumCnxManager.startConnection() method, which creates and starts the sender and receiver threads; a sketch of its duplicate-connection tie-break follows the code.
private boolean startConnection(Socket sock, Long sid) throws IOException {
DataOutputStream dout = null;
DataInputStream din = null;
try {
// Use BufferedOutputStream to reduce the number of IP packets. This is
// important for x-DC scenarios.
// Send data to the peer through the output stream
BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
dout = new DataOutputStream(buf);

// Sending id and challenge
// represents protocol version (in other words - message type)
dout.writeLong(PROTOCOL_VERSION);
dout.writeLong(self.getId());
String addr = formatInetAddr(self.getElectionAddress());
byte[] addr_bytes = addr.getBytes();
dout.writeInt(addr_bytes.length);
dout.write(addr_bytes);
dout.flush();

// Read the peer's messages through the input stream
din = new DataInputStream(
new BufferedInputStream(sock.getInputStream()));
} catch (IOException e) {
LOG.warn("Ignoring exception reading or writing challenge: ", e);
closeSocket(sock);
return false;
}

// authenticate learner
QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
if (qps != null) {
// TODO - investigate why reconfig makes qps null.
authLearner.authenticate(sock, qps.hostname);
}

// If lost the challenge, then drop the new connection
// If the peer's myid is larger than this node's, this node is not entitled to initiate the connection, so it closes its own client socket
if (sid > self.getId()) {
LOG.info("Have smaller server identifier, so dropping the " +
"connection: (" + sid + ", " + self.getId() + ")");
closeSocket(sock);
// Otherwise proceed with the connection
} else {
// Initialize the sender and receiver workers
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
sw.setRecv(rw);

SendWorker vsw = senderWorkerMap.get(sid);

if(vsw != null)
vsw.finish();

senderWorkerMap.put(sid, sw);
queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(
SEND_CAPACITY));

// Start the sender and receiver threads
sw.start();
rw.start();

return true;

}
return false;
}
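
The sid > self.getId() check is a tie-break: between any two peers, only the side with the larger server id may keep a connection it initiated, so each pair of servers ends up with exactly one election connection. A tiny sketch of that decision (an illustrative helper, not a real method):

// Tie-break used when both sides of a pair try to connect to each other.
class ConnectionTieBreakSketch {
    // Mirrors: if (sid > self.getId()) closeSocket(sock);
    static boolean mayKeepInitiatedConnection(long myId, long remoteSid) {
        return myId > remoteSid;
    }
}

In a three-node ensemble this means server 3 initiates the connections to servers 1 and 2, while servers 1 and 2 simply accept.
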
  • The QuorumCnxManager.SendWorker.run() method, the core processing logic of the sender thread.
@Override
public void run() {
threadCnt.incrementAndGet();
try {
/**
* If there is nothing in the queue to send, then we
* send the lastMessage to ensure that the last message
* was received by the peer. The message could be dropped
* in case self or the peer shutdown their connection
* (and exit the thread) prior to reading/processing
* the last message. Duplicate messages are handled correctly
* by the peer.
*
* If the send queue is non-empty, then we have a recent
* message than that stored in lastMessage. To avoid sending
* stale message, we should send the message in the send queue.
*/
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
if (bq == null || isSendQueueEmpty(bq)) {
ByteBuffer b = lastMessageSent.get(sid);
if (b != null) {
LOG.debug("Attempting to send lastMessage to sid=" + sid);
send(b);
}
}
} catch (IOException e) {
LOG.error("Failed to send last message. Shutting down thread.", e);
this.finish();
}

try {
// Keep running as long as the connection is alive
while (running && !shutdown && sock != null) {

ByteBuffer b = null;
try {
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap
.get(sid);
if (bq != null) {
// Keep polling the SendQueue for the next message to send
b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
} else {
LOG.error("No queue of incoming messages for " +
"server " + sid);
break;
}

if(b != null){
// Record the most recent message sent to node sid
lastMessageSent.put(sid, b);
// Send the message out
send(b);
}
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for message on queue",
e);
}
}
} catch (Exception e) {
LOG.warn("Exception when using channel: for id " + sid
+ " my id = " + QuorumCnxManager.this.mySid
+ " error = " + e);
}
this.finish();
LOG.warn("Send worker leaving thread " + " id " + sid + " my id = " + self.getId());
}
  • The QuorumCnxManager.SendWorker.send() method, which writes the message out; a sketch of the length-prefixed framing follows the code.
synchronized void send(ByteBuffer b) throws IOException {
byte[] msgBytes = new byte[b.capacity()];
try {
b.position(0);
b.get(msgBytes);
} catch (BufferUnderflowException be) {
LOG.error("BufferUnderflowException ", be);
return;
}
// Write the message out through the output stream, prefixed with its length
dout.writeInt(b.capacity());
dout.write(b.array());
dout.flush();
}
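
send() and the RecvWorker.run() shown next are the two halves of a simple length-prefixed wire format: a 4-byte length followed by the payload bytes. The sketch below shows both halves together over generic data streams; it assumes the same int-length framing but is not the actual worker code.

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// The two halves of the length-prefixed framing used between the sender and receiver.
class FramingSketch {

    // Write side: 4-byte length prefix, then the payload bytes.
    static void writeFrame(DataOutputStream out, byte[] payload) throws IOException {
        out.writeInt(payload.length);
        out.write(payload);
        out.flush();
    }

    // Read side: read the length, validate it, then read exactly that many bytes.
    static byte[] readFrame(DataInputStream in, int maxPacketSize) throws IOException {
        int length = in.readInt();
        if (length <= 0 || length > maxPacketSize) {
            throw new IOException("Received packet with invalid length: " + length);
        }
        byte[] payload = new byte[length];
        in.readFully(payload, 0, length);
        return payload;
    }
}
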
  • The QuorumCnxManager.RecvWorker.run() method, the core processing logic of the receiver thread.
@Override
public void run() {
threadCnt.incrementAndGet();
try {
// Keep running as long as the connection is alive
while (running && !shutdown && sock != null) {
/**
* Reads the first int to determine the length of the
* message
*/
int length = din.readInt();
if (length <= 0 || length > PACKETMAXSIZE) {
throw new IOException(
"Received packet with invalid packet: "
+ length);
}
/**
* Allocates a new ByteBuffer to receive the message
*/
byte[] msgArray = new byte[length];
// Read the incoming message body from the input stream
din.readFully(msgArray, 0, length);
ByteBuffer message = ByteBuffer.wrap(msgArray);
// Put the received message into the receive queue
addToRecvQueue(new Message(message.duplicate(), sid));
}
} catch (Exception e) {
LOG.warn("Connection broken for id " + sid + ", my id = "
+ QuorumCnxManager.this.mySid + ", error = " , e);
} finally {
LOG.warn("Interrupting SendWorker");
sw.finish();
closeSocket(sock);
}
}
  • The QuorumCnxManager.addToRecvQueue() method, which adds the received message to the receive queue.
public void addToRecvQueue(Message msg) {
synchronized(recvQLock) {
if (recvQueue.remainingCapacity() == 0) {
try {
recvQueue.remove();
} catch (NoSuchElementException ne) {
// element could be removed by poll()
LOG.debug("Trying to remove from an empty " +
"recvQueue. Ignoring exception " + ne);
}
}
try {
// Put the received message into the receive queue
recvQueue.add(msg);
} catch (IllegalStateException ie) {
// This should never happen
LOG.error("Unable to insert element in the recvQueue " + ie);
}
}
}
  • The FastLeaderElection.Messenger.WorkerReceiver thread class, which receives the election vote messages sent by the other cluster nodes.
class WorkerReceiver extends ZooKeeperThread  {
volatile boolean stop;
QuorumCnxManager manager;

WorkerReceiver(QuorumCnxManager manager) {
super("WorkerReceiver");
this.stop = false;
this.manager = manager;
}

public void run() {

Message response;
while (!stop) {
// Sleeps on receive
try {
// Take the election vote messages sent by other cluster nodes out of the receive queue
response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);

......

} catch (InterruptedException e) {
LOG.warn("Interrupted Exception while waiting for new message" +
e.toString());
}
}
LOG.info("WorkerReceiver is down");
}
}