ZooKeeper 入门教程之五

大纲

前言

学习资源

版本说明

软件版本说明
ZooKeeper 源码3.5.7

ZK 辅助源码分析

ZK 持久化源码

Leader 和 Follower 中的数据会在内存和磁盘中各保存一份,所以需要将内存中的数据持久化到磁盘中(如下图所示)。在 org.apache.zookeeper.server.persistence 包下的相关类都是持久化相关的代码。

  • 快照接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Persistence contract for snapshots of the in-memory data tree.
 */
public interface SnapShot {

    /**
     * Restores a data tree and its session table from a persisted snapshot.
     *
     * @param dt       data tree to populate
     * @param sessions session table to populate
     * @return a long supplied by the implementation (presumably the last zxid
     *         contained in the snapshot - TODO confirm against the implementation)
     * @throws IOException if the snapshot cannot be read
     */
    long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException;

    /**
     * Writes the data tree and session table to the given file.
     *
     * @param dt       data tree to persist
     * @param sessions session table to persist
     * @param name     target snapshot file
     * @throws IOException if the snapshot cannot be written
     */
    void serialize(DataTree dt, Map<Long, Integer> sessions, File name) throws IOException;

    /**
     * Locates the most recent snapshot file.
     *
     * @return the most recent snapshot file
     * @throws IOException if the lookup fails
     */
    File findMostRecentSnapshot() throws IOException;

    /**
     * Releases the resources held by this snapshot handler.
     *
     * @throws IOException if releasing the resources fails
     */
    void close() throws IOException;

}
  • 日志接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
 * Persistence contract for the transaction log.
 */
public interface TxnLog {

    /** Injects the server statistics holder used by the log. */
    void setServerStats(ServerStats serverStats);

    /**
     * Rolls the current log over.
     *
     * @throws IOException if rolling fails
     */
    void rollLog() throws IOException;

    /**
     * Appends one transaction to the log.
     *
     * @param hdr transaction header
     * @param r   transaction body
     * @return a boolean supplied by the implementation (presumably whether the
     *         entry was appended - TODO confirm)
     * @throws IOException if the append fails
     */
    boolean append(TxnHeader hdr, Record r) throws IOException;

    /**
     * Opens an iterator over the logged transactions for the given zxid.
     *
     * @param zxid transaction id used as the read position
     * @return iterator over the logged transactions
     * @throws IOException if the log cannot be read
     */
    TxnIterator read(long zxid) throws IOException;

    /**
     * Returns the last zxid recorded in the log.
     *
     * @throws IOException if the log cannot be read
     */
    long getLastLoggedZxid() throws IOException;

    /**
     * Truncates (deletes) log content relative to the given zxid.
     *
     * @param zxid transaction id marking the truncation point
     * @return a boolean supplied by the implementation (presumably whether the
     *         truncation succeeded - TODO confirm)
     * @throws IOException if the truncation fails
     */
    boolean truncate(long zxid) throws IOException;

    /**
     * Returns the database id associated with this log.
     *
     * @throws IOException if the id cannot be read
     */
    long getDbId() throws IOException;

    /**
     * Commits the pending log content.
     *
     * @throws IOException if the commit fails
     */
    void commit() throws IOException;

    /** Returns the elapsed time of the log sync. */
    long getTxnLogSyncElapsedTime();

    /**
     * Closes the log.
     *
     * @throws IOException if closing fails
     */
    void close() throws IOException;

    /**
     * Cursor used to read the transaction log entry by entry.
     */
    interface TxnIterator {

        /** Returns the header of the current entry. */
        TxnHeader getHeader();

        /** Returns the transaction body of the current entry. */
        Record getTxn();

        /**
         * Moves to the next entry.
         *
         * @return a boolean supplied by the implementation (presumably
         *         {@code false} once the log is exhausted - TODO confirm)
         * @throws IOException if the log cannot be read
         */
        boolean next() throws IOException;

        /**
         * Releases the resources held by this iterator.
         *
         * @throws IOException if closing fails
         */
        void close() throws IOException;

        /**
         * Returns the size of the underlying storage.
         *
         * @throws IOException if the size cannot be determined
         */
        long getStorageSize() throws IOException;

    }

}
  • 处理持久化的核心类

ZK 序列化源码

zookeeper-jute 模块是 ZooKeeper 序列化相关的源码。

  • 序列化和反序列化方法
1
2
3
4
5
6
7
8
9
/**
 * Contract implemented by every type that can be written to and read from an archive.
 */
public interface Record {

    /**
     * Writes this record to the given output archive.
     *
     * @param archive destination archive
     * @param tag     tag passed through to the archive for this record
     * @throws IOException if writing fails
     */
    void serialize(OutputArchive archive, String tag) throws IOException;

    /**
     * Populates this record from the given input archive.
     *
     * @param archive source archive
     * @param tag     tag passed through to the archive for this record
     * @throws IOException if reading fails
     */
    void deserialize(InputArchive archive, String tag) throws IOException;

}
  • 迭代接口
1
2
3
4
5
6
7
8
9
/**
 * Minimal cursor used while iterating over a serialized collection.
 */
public interface Index {

    /** Returns {@code true} once the iteration has finished. */
    boolean done();

    /** Advances the cursor to the next element. */
    void incr();

}
  • 序列化支持的数据类型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Serialization sink: one write method per supported data type, plus
 * start/end markers for the composite types (record, vector, map).
 * Every method takes a tag that accompanies the written value.
 */
public interface OutputArchive {

    // Primitive and simple values.
    void writeByte(byte b, String tag) throws IOException;

    void writeBool(boolean b, String tag) throws IOException;

    void writeInt(int i, String tag) throws IOException;

    void writeLong(long l, String tag) throws IOException;

    void writeFloat(float f, String tag) throws IOException;

    void writeDouble(double d, String tag) throws IOException;

    void writeString(String s, String tag) throws IOException;

    void writeBuffer(byte[] buf, String tag) throws IOException;

    // Nested records.
    void writeRecord(Record r, String tag) throws IOException;

    void startRecord(Record r, String tag) throws IOException;

    void endRecord(Record r, String tag) throws IOException;

    // Vectors (lists).
    void startVector(List<?> v, String tag) throws IOException;

    void endVector(List<?> v, String tag) throws IOException;

    // Maps.
    void startMap(TreeMap<?, ?> v, String tag) throws IOException;

    void endMap(TreeMap<?, ?> v, String tag) throws IOException;

}
  • 反序列化支持的数据类型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Deserialization source: one read method per supported data type, plus
 * start/end markers for the composite types (record, vector, map).
 * Every method takes the tag of the value being read.
 */
public interface InputArchive {

    // Primitive and simple values.
    byte readByte(String tag) throws IOException;

    boolean readBool(String tag) throws IOException;

    int readInt(String tag) throws IOException;

    long readLong(String tag) throws IOException;

    float readFloat(String tag) throws IOException;

    double readDouble(String tag) throws IOException;

    String readString(String tag) throws IOException;

    byte[] readBuffer(String tag) throws IOException;

    // Nested records.
    void readRecord(Record r, String tag) throws IOException;

    void startRecord(String tag) throws IOException;

    void endRecord(String tag) throws IOException;

    // Vectors: startVector hands back the cursor used to walk the elements.
    Index startVector(String tag) throws IOException;

    void endVector(String tag) throws IOException;

    // Maps: startMap hands back the cursor used to walk the entries.
    Index startMap(String tag) throws IOException;

    void endMap(String tag) throws IOException;

}

ZK 服务端初始化源码分析

ZK 服务端启动脚本

ZooKeeper 服务端的启动命令是 zkServer.sh start,其中 zkServer.sh 服务端脚本的核心源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#!/usr/bin/env bash

# Excerpt of the ZooKeeper server control script: it builds the JVM main-class
# string (ZOOMAIN) and dispatches on the sub-command given as $1.
# Lines consisting only of dots mark code omitted from this excerpt.

# use POSIX interface, symlink is followed automatically
ZOOBIN="${BASH_SOURCE-$0}"
ZOOBIN="$(dirname "${ZOOBIN}")"
ZOOBINDIR="$(cd "${ZOOBIN}"; pwd)"

# Source zkEnv.sh, preferring the copy under ../libexec when it exists.
if [ -e "$ZOOBIN/../libexec/zkEnv.sh" ]; then
. "$ZOOBINDIR"/../libexec/zkEnv.sh
else
. "$ZOOBINDIR"/zkEnv.sh # pulls in the environment variables defined by zkEnv.sh (e.g. ZOOCFG="zoo.cfg")
fi

# See the following page for extensive details on setting
# up the JVM to accept JMX remote management:
# http://java.sun.com/javase/6/docs/technotes/guides/management/agent.html
# by default we allow local JMX connections
if [ "x$JMXLOCALONLY" = "x" ]
then
JMXLOCALONLY=false
fi

# JMX stays enabled unless JMXDISABLE is set to something other than 'false'.
if [ "x$JMXDISABLE" = "x" ] || [ "$JMXDISABLE" = 'false' ]
then
echo "ZooKeeper JMX enabled by default" >&2
if [ "x$JMXPORT" = "x" ]
then
# for some reason these two options are necessary on jdk6 on Ubuntu
# accord to the docs they are not necessary, but otw jconsole cannot
# do a local attach
ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY org.apache.zookeeper.server.quorum.QuorumPeerMain"
else
# Remote JMX: fill in defaults for every option the user left unset.
if [ "x$JMXAUTH" = "x" ]
then
JMXAUTH=false
fi
if [ "x$JMXSSL" = "x" ]
then
JMXSSL=false
fi
if [ "x$JMXLOG4J" = "x" ]
then
JMXLOG4J=true
fi
echo "ZooKeeper remote JMX Port set to $JMXPORT" >&2
echo "ZooKeeper remote JMX authenticate set to $JMXAUTH" >&2
echo "ZooKeeper remote JMX ssl set to $JMXSSL" >&2
echo "ZooKeeper remote JMX log4j set to $JMXLOG4J" >&2
ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=$JMXPORT -Dcom.sun.management.jmxremote.authenticate=$JMXAUTH -Dcom.sun.management.jmxremote.ssl=$JMXSSL -Dzookeeper.jmx.log4j.disable=$JMXLOG4J org.apache.zookeeper.server.quorum.QuorumPeerMain"
fi
else
echo "JMX disabled by user request" >&2
# Without JMX, ZOOMAIN is just the main class name.
ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain"
fi

# Server-specific JVM flags are placed in front of the generic JVMFLAGS.
if [ "x$SERVER_JVMFLAGS" != "x" ]
then
JVMFLAGS="$SERVER_JVMFLAGS $JVMFLAGS"
fi

......

# Dispatch on the requested sub-command.
case $1 in
start)
echo -n "Starting zookeeper ... "
# Refuse to start when the pid file points at a process that is still alive.
if [ -f "$ZOOPIDFILE" ]; then
if kill -0 `cat "$ZOOPIDFILE"` > /dev/null 2>&1; then
echo $command already running as process `cat "$ZOOPIDFILE"`.
exit 1
fi
fi
# Launch the JVM in the background: the main class comes from $ZOOMAIN and
# the configuration file ($ZOOCFG) is passed as its only program argument.
nohup "$JAVA" $ZOO_DATADIR_AUTOCREATE "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" \
"-Dzookeeper.log.file=${ZOO_LOG_FILE}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
-XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError='kill -9 %p' \
-cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG" > "$_ZOO_DAEMON_OUT" 2>&1 < /dev/null &

......

;;
stop)
echo -n "Stopping zookeeper ... "
if [ ! -f "$ZOOPIDFILE" ]
then
echo "no zookeeper to stop (could not find file $ZOOPIDFILE)"
else
# Signal the recorded pid, then remove the pid file.
$KILL $(cat "$ZOOPIDFILE")
rm "$ZOOPIDFILE"
sleep 1
echo STOPPED
fi
exit 0
;;
restart)
# Drop the "restart" word and re-invoke this script with the remaining arguments.
shift
"$0" stop ${@}
sleep 3
"$0" start ${@}
;;
status)

.....

;;
*)
echo "Usage: $0 [--config <conf-dir>] {start|start-foreground|stop|restart|status|print-cmd}" >&2

esac

由于 zkServer.sh start 命令在底层实际执行内容如下:

1
2
3
4
nohup "$JAVA"
+ 一堆提交参数
+ $ZOOMAIN (org.apache.zookeeper.server.quorum.QuorumPeerMain)
+ "$ZOOCFG" (zkEnv.sh 文件中 ZOOCFG="zoo.cfg")

所以 ZooKeeper 服务端的启动入口是 QuorumPeerMain 类。

ZK 服务端启动入口

  • 服务端的启动入口类 QuorumPeerMain
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package org.apache.zookeeper.server.quorum;

/**
 * Server-side entry point: this is the main class that zkServer.sh passes to the JVM.
 * Lines consisting only of dots mark code omitted from this excerpt.
 */
public class QuorumPeerMain {

/**
 * Creates the launcher, delegates to initializeAndRun and exits the JVM with status 0
 * when that call returns normally.
 *
 * @param args program arguments; the start script passes the configuration file (zoo.cfg)
 */
public static void main(String[] args) {
// Create the launcher object for this ZooKeeper node
QuorumPeerMain main = new QuorumPeerMain();

try {
// Initialize the node and run it; args carries the zoo.cfg argument from the start script
main.initializeAndRun(args);
} catch (IllegalArgumentException e)
{
......
}

LOG.info("Exiting normally");
System.exit(0);
}

}
  • QuorumPeerMain.initializeAndRun() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * Parses the configuration, starts the snapshot purge scheduler and then boots the
 * server in either cluster (quorum) or standalone mode.
 *
 * @param args program arguments; when exactly one is given it is used as the path of
 *             the configuration file (zoo.cfg)
 */
protected void initializeAndRun(String[] args)
        throws ConfigException, IOException, AdminServerException {
    final boolean hasConfigPath = args.length == 1;

    // Holder for all ZooKeeper settings; parsing covers zoo.cfg and the myid file.
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (hasConfigPath) {
        config.parse(args[0]);
    }

    // Scheduled deletion of expired snapshots (switched off by default).
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
            config.getDataDir(),
            config.getDataLogDir(),
            config.getSnapRetainCount(),
            config.getPurgeInterval());
    purgeMgr.start();

    if (!hasConfigPath || !config.isDistributed()) {
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // Standalone mode.
        ZooKeeperServerMain.main(args);
        return;
    }

    // Cluster mode.
    runFromConfig(config);
}

解析配置参数

提示

  • ZooKeeper 在启动时,会解析 zoo.cfg 和 myid 配置文件(集群模式才需要有 myid 文件),从而获取核心配置参数。
  • zoo.cfg 配置文件,定义了整个 ZooKeeper 集群的配置参数,比如端口号、数据存储路径和服务器列表。
  • myid 节点标识文件,存放了当前节点的唯一编号,用于区分集群中的各个节点。
  • QuorumPeerConfig.parse() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
 * Loads the configuration file at the given path into a Properties object and hands it
 * to parseProperties. Lines consisting only of dots mark code omitted from this excerpt.
 *
 * @param path path of the configuration file (zoo.cfg)
 * @throws ConfigException if the file cannot be processed; wraps the underlying
 *                         IOException or IllegalArgumentException
 */
public void parse(String path) throws ConfigException {
LOG.info("Reading configuration from: " + path);

try {
// Build a File for the path: warn when it is relative, fail when it does not exist
File configFile = (new VerifyingFileFactory.Builder(LOG)
.warnForRelativePath()
.failForNonExistingPath()
.build()).create(path);

Properties cfg = new Properties();
FileInputStream in = new FileInputStream(configFile);
try {
// Load the configuration file
cfg.load(in);
configFileStr = path;
} finally {
// Always close the stream, even when load() fails
in.close();
}

// Interpret the loaded key/value pairs
parseProperties(cfg);
} catch (IOException e) {
throw new ConfigException("Error processing " + path, e);
} catch (IllegalArgumentException e) {
throw new ConfigException("Error processing " + path, e);
}

......
}
  • QuorumPeerConfig.parseProperties() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
 * Copies the settings found in the given properties into the fields of this
 * configuration object and then sets up the quorum-related configuration.
 * Lines consisting only of dots mark code omitted from this excerpt.
 *
 * @param zkProp key/value pairs loaded from zoo.cfg
 * @throws IOException     declared by the method; raised by the helpers it calls
 * @throws ConfigException declared by the method; raised by the helpers it calls
 */
public void parseProperties(Properties zkProp) throws IOException, ConfigException {
int clientPort = 0;
int secureClientPort = 0;
String clientPortAddress = null;
String secureClientPortAddress = null;

// Read each setting from zoo.cfg and assign it to the matching QuorumPeerConfig field
VerifyingFileFactory vff = new VerifyingFileFactory.Builder(LOG).warnForRelativePath().build();
for (Entry<Object, Object> entry : zkProp.entrySet()) {
// Keys and values are compared and parsed after trimming surrounding whitespace
String key = entry.getKey().toString().trim();
String value = entry.getValue().toString().trim();
if (key.equals("dataDir")) {
dataDir = vff.create(value);
} else if (key.equals("dataLogDir")) {
dataLogDir = vff.create(value);
} else if (key.equals("clientPort")) {
clientPort = Integer.parseInt(value);
} else if (key.equals("localSessionsEnabled")) {
localSessionsEnabled = Boolean.parseBoolean(value);
} else if (key.equals("localSessionsUpgradingEnabled")) {
localSessionsUpgradingEnabled = Boolean.parseBoolean(value);
} else if (key.equals("clientPortAddress")) {
// value was already trimmed above, so this trim() is a no-op
clientPortAddress = value.trim();
} else if (key.equals("secureClientPort")) {
secureClientPort = Integer.parseInt(value);
} else if (key.equals("secureClientPortAddress")){
// value was already trimmed above, so this trim() is a no-op
secureClientPortAddress = value.trim();
} else if (key.equals("tickTime")) {
tickTime = Integer.parseInt(value);
} else if (key.equals("maxClientCnxns")) {
maxClientCnxns = Integer.parseInt(value);
} else if (key.equals("minSessionTimeout")) {
minSessionTimeout = Integer.parseInt(value);
} else if (key.equals("maxSessionTimeout")) {
maxSessionTimeout = Integer.parseInt(value);
} else if (key.equals("initLimit")) {
initLimit = Integer.parseInt(value);
} else if (key.equals("syncLimit")) {
syncLimit = Integer.parseInt(value);
}

......
}

......

// Without a separate dynamic config file, the quorum setup is derived from zoo.cfg itself
if (dynamicConfigFileStr == null) {
setupQuorumPeerConfig(zkProp, true);
if (isDistributed() && isReconfigEnabled()) {
backupOldConfig();
}
}
}
  • QuorumPeerConfig.setupQuorumPeerConfig() 方法
1
2
3
4
5
6
7
8
/**
 * Builds the quorum verifier from the given properties and then runs the remaining
 * setup steps in order: myid, client port, peer type and a final validity check.
 *
 * @param prop                            properties holding the quorum configuration
 * @param configBackwardCompatibilityMode passed through unchanged to parseDynamicConfig
 */
void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode) throws IOException, ConfigException {
quorumVerifier = parseDynamicConfig(prop, electionAlg, true, configBackwardCompatibilityMode);
// Initialize the server id from the myid file
setupMyId();
setupClientPort();
setupPeerType();
checkValidity();
}
  • QuorumPeerConfig.setupMyId() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
 * Reads the server id from the {@code myid} file in the data directory.
 *
 * <p>When {@code <dataDir>/myid} is not a regular file the method returns without
 * touching {@code serverId}; a standalone server does not need that file. Otherwise
 * the first line of the file is parsed as a {@code long}, stored in {@code serverId}
 * and published to the logging MDC under the key {@code "myid"}.
 *
 * @throws IOException              if the myid file cannot be read
 * @throws IllegalArgumentException if the first line is not a valid long (this includes
 *                                  an empty file, where the line is {@code null}); the
 *                                  original {@link NumberFormatException} is kept as cause
 */
private void setupMyId() throws IOException {
    File myIdFile = new File(dataDir, "myid");
    // standalone server doesn't need myid file.
    if (!myIdFile.isFile()) {
        return;
    }

    String myIdString;
    // try-with-resources closes the reader on every path, including read failures.
    try (BufferedReader br = new BufferedReader(new FileReader(myIdFile))) {
        myIdString = br.readLine();
    }

    try {
        // Parse the id stored in the myid file and assign it to serverId.
        serverId = Long.parseLong(myIdString);
        MDC.put("myid", myIdString);
    } catch (NumberFormatException e) {
        // Keep the parse failure as the cause so the root error is not lost.
        throw new IllegalArgumentException("serverid " + myIdString
                + " is not a number", e);
    }
}

过期快照删除

  • QuorumPeerMain.initializeAndRun() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
 * Parses the configuration, starts the snapshot purge scheduler and then boots the
 * server in either cluster (quorum) or standalone mode.
 *
 * @param args program arguments; when exactly one is given it is used as the path of
 *             the configuration file (zoo.cfg)
 */
protected void initializeAndRun(String[] args)
        throws ConfigException, IOException, AdminServerException {
    final boolean hasConfigPath = args.length == 1;

    // Holder for all ZooKeeper settings; parsing covers zoo.cfg and the myid file.
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (hasConfigPath) {
        config.parse(args[0]);
    }

    // Scheduled deletion of expired snapshots (switched off by default):
    //   snapRetainCount = 3 -> minimum number of snapshots that are kept
    //   purgeInterval   = 0 -> default value, meaning the purge task is disabled
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
            config.getDataDir(),
            config.getDataLogDir(),
            config.getSnapRetainCount(),
            config.getPurgeInterval());
    purgeMgr.start();

    if (!hasConfigPath || !config.isDistributed()) {
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // Standalone mode.
        ZooKeeperServerMain.main(args);
        return;
    }

    // Cluster mode.
    runFromConfig(config);
}
  • DatadirCleanupManager.start() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
 * Schedules the periodic purge task, unless it is already running or purging is
 * disabled ({@code purgeInterval <= 0}, which is the default).
 */
public void start() {
    if (purgeTaskStatus == PurgeTaskStatus.STARTED) {
        LOG.warn("Purge task is already running.");
        return;
    }
    // purgeInterval defaults to 0, so by default nothing is scheduled here.
    if (purgeInterval <= 0) {
        LOG.info("Purge task is not scheduled.");
        return;
    }

    // Timer named "PurgeTask"; the second argument marks its thread as a daemon.
    timer = new Timer("PurgeTask", true);

    // purgeInterval is expressed in hours: a value of 1 means the snapshots are
    // checked (and expired ones removed) once per hour, starting immediately.
    final long periodMillis = TimeUnit.HOURS.toMillis(purgeInterval);
    timer.scheduleAtFixedRate(
            new PurgeTask(dataLogDir, snapDir, snapRetainCount), 0, periodMillis);

    purgeTaskStatus = PurgeTaskStatus.STARTED;
}
  • PurgeTask 任务类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * Timer task that delegates one purge run to {@code PurgeTxnLog.purge}.
 */
static class PurgeTask extends TimerTask {
    private final File logsDir;
    private final File snapsDir;
    private final int snapRetainCount;

    /**
     * @param dataDir directory stored as the log directory of this task
     * @param snapDir directory stored as the snapshot directory of this task
     * @param count   number of snapshots to retain
     */
    public PurgeTask(File dataDir, File snapDir, int count) {
        this.logsDir = dataDir;
        this.snapsDir = snapDir;
        this.snapRetainCount = count;
    }

    /**
     * Runs one purge. Any exception is logged and not rethrown, so a failed run
     * does not propagate out of the task.
     */
    @Override
    public void run() {
        LOG.info("Purge task started.");
        try {
            // Remove expired snapshots.
            PurgeTxnLog.purge(logsDir, snapsDir, snapRetainCount);
        } catch (Exception e) {
            LOG.error("Error occurred while purging.", e);
        }
        LOG.info("Purge task completed.");
    }
}
  • PurgeTxnLog.purge() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Purges old snapshots, keeping the {@code num} most recent ones.
 *
 * @param dataDir directory passed as first argument to {@code FileTxnSnapLog}
 * @param snapDir directory passed as second argument to {@code FileTxnSnapLog}
 * @param num     number of recent snapshots to keep; must be at least 3
 * @throws IllegalArgumentException if {@code num} is smaller than 3
 * @throws IOException              if the snapshot files cannot be accessed
 */
public static void purge(File dataDir, File snapDir, int num) throws IOException {
    if (num < 3) {
        throw new IllegalArgumentException(COUNT_ERR_MSG);
    }

    final FileTxnSnapLog snapLog = new FileTxnSnapLog(dataDir, snapDir);

    // The most recent snapshots, at most num of them.
    final List<File> retained = snapLog.findNRecentSnapshots(num);
    if (!retained.isEmpty()) {
        // The last entry of the list is the reference point for removing older snapshots.
        purgeOlderSnapshots(snapLog, retained.get(retained.size() - 1));
    }
}

初始化通信组件

I/O 模型默认为 NIO 通信

  • QuorumPeerMain.initializeAndRun() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * Parses the configuration, starts the snapshot purge scheduler and then boots the
 * server in either cluster (quorum) or standalone mode.
 *
 * @param args program arguments; when exactly one is given it is used as the path of
 *             the configuration file (zoo.cfg)
 */
protected void initializeAndRun(String[] args)
        throws ConfigException, IOException, AdminServerException {
    final boolean hasConfigPath = args.length == 1;

    // Holder for all ZooKeeper settings; parsing covers zoo.cfg and the myid file.
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (hasConfigPath) {
        config.parse(args[0]);
    }

    // Scheduled deletion of expired snapshots (switched off by default).
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
            config.getDataDir(),
            config.getDataLogDir(),
            config.getSnapRetainCount(),
            config.getPurgeInterval());
    purgeMgr.start();

    if (!hasConfigPath || !config.isDistributed()) {
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // Standalone mode.
        ZooKeeperServerMain.main(args);
        return;
    }

    // Cluster mode.
    runFromConfig(config);
}
  • QuorumPeerMain.runFromConfig() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
/**
 * Cluster-mode start-up: creates the client connection factories, copies the parsed
 * configuration onto the quorum peer, then starts the peer and waits for it to finish.
 *
 * @param config parsed configuration (zoo.cfg and myid)
 */
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException
{
try {
ManagedUtil.registerLog4jMBeans();
} catch (JMException e) {
// Failing to register the log4j MBeans is not fatal; start-up continues
LOG.warn("Unable to register log4j JMX control", e);
}

LOG.info("Starting quorum peer");
try {
ServerCnxnFactory cnxnFactory = null;
ServerCnxnFactory secureCnxnFactory = null;

// Initialize the client communication component. ServerCnxnFactory.createFactory()
// uses NIO by default; a different implementation (e.g. Netty) can be selected
// through the zookeeper.serverCnxnFactory system property.
// configure() is where the server socket for the client port is set up.
if (config.getClientPortAddress() != null) {
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns(),
false);
}

if (config.getSecureClientPortAddress() != null) {
// Same set-up for the secure client port, with the secure flag set to true.
// NOTE(review): NIOServerCnxnFactory.configure() throws
// UnsupportedOperationException when secure is true, so this branch
// presumably requires a non-NIO factory - confirm.
secureCnxnFactory = ServerCnxnFactory.createFactory();
secureCnxnFactory.configure(config.getSecureClientPortAddress(),
config.getMaxClientCnxns(),
true);
}

// Copy the parsed settings onto this cluster node
quorumPeer = getQuorumPeer();
quorumPeer.setTxnFactory(new FileTxnSnapLog(
config.getDataLogDir(),
config.getDataDir()));
quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
quorumPeer.enableLocalSessionsUpgrading(
config.isLocalSessionsUpgradingEnabled());
//quorumPeer.setQuorumPeers(config.getAllMembers());
quorumPeer.setElectionType(config.getElectionAlg());
quorumPeer.setMyid(config.getServerId());
quorumPeer.setTickTime(config.getTickTime());
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
quorumPeer.setInitLimit(config.getInitLimit());
quorumPeer.setSyncLimit(config.getSyncLimit());
quorumPeer.setConfigFileName(config.getConfigFilename());
// ZKDatabase manages the storage of the ZooKeeper data
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
if (config.getLastSeenQuorumVerifier()!=null) {
quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
}
quorumPeer.initConfigInZKDatabase();
// The connection factories manage the network communication with clients
quorumPeer.setCnxnFactory(cnxnFactory);
quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
quorumPeer.setSslQuorum(config.isSslQuorum());
quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.setSyncEnabled(config.getSyncEnabled());
quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
if (config.sslQuorumReloadCertFiles) {
quorumPeer.getX509Util().enableCertFileReloading();
}

// sets quorum sasl authentication configurations
quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
if(quorumPeer.isQuorumSaslAuthEnabled()){
quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
}
quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
quorumPeer.initialize();

// Start the cluster node and block until it terminates
quorumPeer.start();
quorumPeer.join();
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Quorum Peer interrupted", e);
}
}
  • ServerCnxnFactory.createFactory() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/** Name of the system property that selects the connection factory implementation. */
public static final String ZOOKEEPER_SERVER_CNXN_FACTORY = "zookeeper.serverCnxnFactory";

/**
 * Instantiates the server connection factory through its no-argument constructor.
 * The class name is taken from the {@code zookeeper.serverCnxnFactory} system
 * property and defaults to {@code NIOServerCnxnFactory} when the property is unset.
 *
 * @return the newly created factory
 * @throws IOException if the class cannot be loaded, instantiated or cast; the
 *                     underlying exception is attached as the cause
 */
public static ServerCnxnFactory createFactory() throws IOException {
    final String configuredName = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
    final String factoryClassName = configuredName != null
            ? configuredName
            : NIOServerCnxnFactory.class.getName();
    try {
        final Object instance = Class.forName(factoryClassName)
                .getDeclaredConstructor()
                .newInstance();
        final ServerCnxnFactory factory = (ServerCnxnFactory) instance;
        LOG.info("Using {} as server connection factory", factoryClassName);
        return factory;
    } catch (Exception e) {
        throw new IOException("Couldn't instantiate " + factoryClassName, e);
    }
}

zookeeper-docs 模块的 zookeeperAdmin.md 文件中,有以下这么一段说明:

1
2
3
4
5
serverCnxnFactory :
(Java system property: zookeeper.serverCnxnFactory)
Specifies ServerCnxnFactory implementation.
This should be set to `NettyServerCnxnFactory` in order to use TLS based server communication.
Default is `NIOServerCnxnFactory`.

初始化 NIO 服务端的 Socket

提示

这里初始化 NIO 服务端的 Socket 时,ZooKeeper 并未启动。

  • NIOServerCnxnFactory.configure() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
 * Prepares the NIO connection factory: reads the tuning values from system properties,
 * creates the expirer, selector and accept thread objects, and binds the non-blocking
 * server socket to the given address.
 *
 * @param addr   address (host and port) the server socket is bound to
 * @param maxcc  value stored as the maximum number of client connections
 * @param secure must be false; SSL is not supported by this implementation
 * @throws UnsupportedOperationException if secure is true
 * @throws IOException if fewer than one selector thread is configured, or if opening
 *                     or binding the socket fails
 */
@Override
public void configure(InetSocketAddress addr, int maxcc, boolean secure) throws IOException {
if (secure) {
throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn");
}
configureSaslLogin();

maxClientCnxns = maxcc;
// Timeout in milliseconds (the log message below divides by 1000 to print seconds); defaults to 10000
sessionlessCnxnTimeout = Integer.getInteger(
ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
// We also use the sessionlessCnxnTimeout as expiring interval for
// cnxnExpiryQueue. These don't need to be the same, but the expiring
// interval passed into the ExpiryQueue() constructor below should be
// less than or equal to the timeout.
cnxnExpiryQueue =
new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout);
expirerThread = new ConnectionExpirerThread();

int numCores = Runtime.getRuntime().availableProcessors();
// 32 cores sweet spot seems to be 4 selector threads
// Default is sqrt(numCores / 2), but never less than 1
numSelectorThreads = Integer.getInteger(
ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
Math.max((int) Math.sqrt((float) numCores/2), 1));
if (numSelectorThreads < 1) {
throw new IOException("numSelectorThreads must be at least 1");
}

// Worker threads default to twice the number of cores; shutdown timeout defaults to 5000
numWorkerThreads = Integer.getInteger(
ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
workerShutdownTimeoutMS = Long.getLong(
ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);

LOG.info("Configuring NIO connection handler with "
+ (sessionlessCnxnTimeout/1000) + "s sessionless connection"
+ " timeout, " + numSelectorThreads + " selector thread(s), "
+ (numWorkerThreads > 0 ? numWorkerThreads : "no")
+ " worker threads, and "
+ (directBufferBytes == 0 ? "gathered writes." :
("" + (directBufferBytes/1024) + " kB direct buffers.")));
// Create the selector thread objects here; they are not started in this method
for(int i=0; i<numSelectorThreads; ++i) {
selectorThreads.add(new SelectorThread(i));
}

// Initialize the NIO server socket that receives client requests
this.ss = ServerSocketChannel.open();
ss.socket().setReuseAddress(true);
LOG.info("binding to port " + addr);
// Bind to the configured client address (port 2181 in the setup this walkthrough describes)
ss.socket().bind(addr);
// Non-blocking mode
ss.configureBlocking(false);
acceptThread = new AcceptThread(ss, addr, selectorThreads);
}

ZK 服务端加载数据源码分析

冷启动数据恢复快照数据