ZooKeeper 入门教程之六

大纲

前言

学习资源

版本说明

软件版本说明
ZooKeeper 的源码3.5.7本文给出的 ZooKeeper 源码分析,都是基于 ZooKeeper 的 3.5.7 版本进行讲解。

Leader 和 Follower 状态同步源码分析

Leader 与 Follower 的状态同步细节

当选举结束后,每个节点都需要根据自己的角色更新自己的状态。选举出来的 Leader 更新自己的状态为 Leader,其他节点更新自己的状态为 Follower。

  • Leader 更新状态的入口:leader.lead()
  • Follower 更新状态的入口:follower.followerLeader()

特别注意的事项:

  • (1) Follower 必须要让 Leader 知道自己的状态,包括:epoch、zxid、sid

    • 必须找出谁是 Leader
    • 发起请求去连接 Leader
    • 发送自己的信息给 Leader
    • Leader 接收到信息后,必须返回对应的信息给 Follower
  • (2) 当 Leader 得知 Follower 的状态后,就可以确定需要做何种方式的数据同步,包括:DIFF、TRUNC、SNAP

  • (3) Leader 和 Follower 之间执行数据同步

  • (4) 当 Leader 接收到超过半数 Follower 的 ACK 之后,进入正常工作状态,此时集群启动已完成。

同步策略的总结:

  • (1) DIFF(差异化同步策略),咱两一样,不需要做什么
  • (2) TRUNC(回滚同步策略),Follower 的 zxid 比 Leader 的 zxid 大,所以 Follower 要执行回滚
  • (3) COMMIT,Leader 的 zxid 比 Follower 的 zxid 大,发送 Proposal(提案)给 Foloower 执行提交
  • (4) SNAP(全量同步策略),如果 Follower 并没有任何数据,直接将数据全部序列给 Follower

Leader 等待接收 Follower 的状态同步申请

  • Leader.lead() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
void lead() throws IOException, InterruptedException {
self.end_fle = Time.currentElapsedTime();
long electionTimeTaken = self.end_fle - self.start_fle;
self.setElectionTimeTaken(electionTimeTaken);
LOG.info("LEADING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
QuorumPeer.FLE_TIME_UNIT);
self.start_fle = 0;
self.end_fle = 0;

zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);

try {
self.tick.set(0);
// 恢复数据到内存,在 ZK 启动时,其实已经加载过数据了
zk.loadData();

leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());

// Start thread that waits for connection requests from
// new followers.
// 等待其他 Follower 节点向 Leader 节点发送同步状态
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();

long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());

zk.setZxid(ZxidUtils.makeZxid(epoch, 0));

synchronized(this){
lastProposed = zk.getZxid();
}

newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
null, null);


if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
LOG.info("NEWLEADER proposal has Zxid of "
+ Long.toHexString(newLeaderProposal.packet.getZxid()));
}

......

} finally {
zk.unregisterJMX(this);
}
}
  • LearnerCnxAcceptor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class LearnerCnxAcceptor extends ZooKeeperCriticalThread {
private volatile boolean stop = false;

public LearnerCnxAcceptor() {
super("LearnerCnxAcceptor-" + ss.getLocalSocketAddress(), zk
.getZooKeeperServerListener());
}

@Override
public void run() {
try {
while (!stop) {
Socket s = null;
boolean error = false;
try {
// 等待接收 Follower 节点的状态同步申请
s = ss.accept();

// start with the initLimit, once the ack is processed
// in LearnerHandler switch to the syncLimit
s.setSoTimeout(self.tickTime * self.initLimit);
s.setTcpNoDelay(nodelay);

BufferedInputStream is = new BufferedInputStream(
s.getInputStream());
// 一旦接收到 Follower 的请求,就创建 LearnerHandler 对象来处理请求
LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
// 启动线程
fh.start();
} catch (SocketException e) {
......
}

......

}
} catch (Exception e) {
LOG.warn("Exception while accepting follower", e.getMessage());
handleException(this.getName(), e);
}
}

public void halt() {
stop = true;
}

}
  • Leader.Leader() 方法,上述的 ss 的初始化是在创建 Leader 对象时进行的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private final ServerSocket ss;

Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException {
this.self = self;
this.proposalStats = new BufferStats();
try {
if (self.shouldUsePortUnification() || self.isSslQuorum()) {
boolean allowInsecureConnection = self.shouldUsePortUnification();
if (self.getQuorumListenOnAllIPs()) {
ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection, self.getQuorumAddress().getPort());
} else {
ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection);
}
} else {
if (self.getQuorumListenOnAllIPs()) {
ss = new ServerSocket(self.getQuorumAddress().getPort());
} else {
ss = new ServerSocket();
}
}
ss.setReuseAddress(true);
if (!self.getQuorumListenOnAllIPs()) {
ss.bind(self.getQuorumAddress());
}
} catch (BindException e) {
if (self.getQuorumListenOnAllIPs()) {
LOG.error("Couldn't bind to port " + self.getQuorumAddress().getPort(), e);
} else {
LOG.error("Couldn't bind to " + self.getQuorumAddress(), e);
}
throw e;
}
this.zk = zk;
this.learnerSnapshotThrottler = createLearnerSnapshotThrottler(
maxConcurrentSnapshots, maxConcurrentSnapshotTimeout);
}

Follower 查找并连接 Leader

  • Follower.followLeader() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
void followLeader() throws InterruptedException {
self.end_fle = Time.currentElapsedTime();
long electionTimeTaken = self.end_fle - self.start_fle;
self.setElectionTimeTaken(electionTimeTaken);
LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
QuorumPeer.FLE_TIME_UNIT);
self.start_fle = 0;
self.end_fle = 0;
fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
try {
// Follower 查找 Leader
QuorumServer leaderServer = findLeader();
try {
// Follower 连接 Leader
connectToLeader(leaderServer.addr, leaderServer.hostname);
// Follower 向 Leader 注册
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
if (self.isReconfigStateChange())
throw new Exception("learned about role change");
//check to see if the leader zxid is lower than ours
//this should never happen but is just a safety check
long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
if (newEpoch < self.getAcceptedEpoch()) {
LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
+ " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
throw new IOException("Error: Epoch of leader is lower");
}
syncWithLeader(newEpochZxid);
QuorumPacket qp = new QuorumPacket();
while (this.isRunning()) {
readPacket(qp);
processPacket(qp);
}
} catch (Exception e) {
LOG.warn("Exception when following the leader", e);
try {
sock.close();
} catch (IOException e1) {
e1.printStackTrace();
}

// clear pending revalidations
pendingRevalidations.clear();
}
} finally {
zk.unregisterJMX((Learner)this);
}
}
  • Follower.findLeader() 方法,Follower 查找 Leader。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
protected QuorumServer findLeader() {
QuorumServer leaderServer = null;
// Find the leader by id
// 选举投票的时候记录的,最后推荐的 Leader 的 sid
Vote current = self.getCurrentVote();
// 如果这个 sid 在启动的所有集群节点范围内
for (QuorumServer s : self.getView().values()) {
if (s.id == current.getId()) {
// Ensure we have the leader's correct IP address before
// attempting to connect.
// 尝试连接 Leader 的 IP 地址
s.recreateSocketAddresses();
leaderServer = s;
break;
}
}
if (leaderServer == null) {
LOG.warn("Couldn't find the leader with id = "
+ current.getId());
}
return leaderServer;
}
  • Learner.connectToLeader() 方法,Follower 连接 Leader。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
protected void connectToLeader(InetSocketAddress addr, String hostname) throws IOException, InterruptedException, X509Exception {
this.sock = createSocket();

int initLimitTime = self.tickTime * self.initLimit;
int remainingInitLimitTime = initLimitTime;
long startNanoTime = nanoTime();

for (int tries = 0; tries < 5; tries++) {
try {
// recalculate the init limit time because retries sleep for 1000 milliseconds
remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000);
if (remainingInitLimitTime <= 0) {
LOG.error("initLimit exceeded on retries.");
throw new IOException("initLimit exceeded on retries.");
}

sockConnect(sock, addr, Math.min(self.tickTime * self.syncLimit, remainingInitLimitTime));
if (self.isSslQuorum()) {
((SSLSocket) sock).startHandshake();
}
sock.setTcpNoDelay(nodelay);
break;
} catch (IOException e) {
remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000);

if (remainingInitLimitTime <= 1000) {
LOG.error("Unexpected exception, initLimit exceeded. tries=" + tries +
", remaining init limit=" + remainingInitLimitTime +
", connecting to " + addr,e);
throw e;
} else if (tries >= 4) {
LOG.error("Unexpected exception, retries exceeded. tries=" + tries +
", remaining init limit=" + remainingInitLimitTime +
", connecting to " + addr,e);
throw e;
} else {
LOG.warn("Unexpected exception, tries=" + tries +
", remaining init limit=" + remainingInitLimitTime +
", connecting to " + addr,e);
this.sock = createSocket();
}
}
Thread.sleep(1000);
}

self.authLearner.authenticate(sock, hostname);

leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(
sock.getInputStream()));
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
}

Leader 创建 LearnerHandler 对象

  • Leader.lead() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
void lead() throws IOException, InterruptedException {
self.end_fle = Time.currentElapsedTime();
long electionTimeTaken = self.end_fle - self.start_fle;
self.setElectionTimeTaken(electionTimeTaken);
LOG.info("LEADING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
QuorumPeer.FLE_TIME_UNIT);
self.start_fle = 0;
self.end_fle = 0;

zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);

try {
self.tick.set(0);
// 恢复数据到内存,在 ZK 启动时,其实已经加载过数据了
zk.loadData();

leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());

// Start thread that waits for connection requests from
// new followers.
// 等待其他 Follower 节点向 Leader 节点发送同步状态
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();

long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());

zk.setZxid(ZxidUtils.makeZxid(epoch, 0));

synchronized(this){
lastProposed = zk.getZxid();
}

newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
null, null);


if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
LOG.info("NEWLEADER proposal has Zxid of "
+ Long.toHexString(newLeaderProposal.packet.getZxid()));
}

......

} finally {
zk.unregisterJMX(this);
}
}
  • LearnerCnxAcceptor 类,创建 LearnerHandler 对象。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class LearnerCnxAcceptor extends ZooKeeperCriticalThread {
private volatile boolean stop = false;

public LearnerCnxAcceptor() {
super("LearnerCnxAcceptor-" + ss.getLocalSocketAddress(), zk
.getZooKeeperServerListener());
}

@Override
public void run() {
try {
while (!stop) {
Socket s = null;
boolean error = false;
try {
// 等待接收 Follower 节点的状态同步申请
s = ss.accept();

// start with the initLimit, once the ack is processed
// in LearnerHandler switch to the syncLimit
s.setSoTimeout(self.tickTime * self.initLimit);
s.setTcpNoDelay(nodelay);

BufferedInputStream is = new BufferedInputStream(
s.getInputStream());
// 一旦接收到 Follower 的请求,就创建 LearnerHandler 对象来处理请求
LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
// 启动线程
fh.start();
} catch (SocketException e) {
......
}

......

}
} catch (Exception e) {
LOG.warn("Exception while accepting follower", e.getMessage());
handleException(this.getName(), e);
}
}

public void halt() {
stop = true;
}

}
  • LearnerHandler.run() 方法,LearnerHandler 是一个线程类,上述的 fh.start() 执行的是 LearnerHandler 中的 run() 方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
@Override
public void run() {
try {
leader.addLearnerHandler(this);
// 心跳处理
tickOfNextAckDeadline = leader.self.tick.get()
+ leader.self.initLimit + leader.self.syncLimit;

ia = BinaryInputArchive.getArchive(bufferedInput);
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
oa = BinaryOutputArchive.getArchive(bufferedOutput);

// 从网络中接收消息,并反序列化为 Packet
QuorumPacket qp = new QuorumPacket();
ia.readRecord(qp, "packet");
// 选举结束后,Observer 和 Follower 都应该给 Leader 发送一个标志信息:FOLLOWERINFO 或者 OBSERVERINFO
if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO){
LOG.error("First packet " + qp.toString()
+ " is not FOLLOWERINFO or OBSERVERINFO!");
return;
}

byte learnerInfoData[] = qp.getData();
if (learnerInfoData != null) {
ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
if (learnerInfoData.length >= 8) {
this.sid = bbsid.getLong();
}
if (learnerInfoData.length >= 12) {
this.version = bbsid.getInt(); // protocolVersion
}
if (learnerInfoData.length >= 20) {
long configVersion = bbsid.getLong();
if (configVersion > leader.self.getQuorumVerifier().getVersion()) {
throw new IOException("Follower is ahead of the leader (has a later activated configuration)");
}
}
} else {
this.sid = leader.followerCounter.getAndDecrement();
}

if (leader.self.getView().containsKey(this.sid)) {
LOG.info("Follower sid: " + this.sid + " : info : "
+ leader.self.getView().get(this.sid).toString());
} else {
LOG.info("Follower sid: " + this.sid + " not in the current config " + Long.toHexString(leader.self.getQuorumVerifier().getVersion()));
}

if (qp.getType() == Leader.OBSERVERINFO) {
learnerType = LearnerType.OBSERVER;
}

// 读取 Follower 发送过来的 lastAcceptedEpoch
// 选举过程中,所使用的 epoch(任期代号),其实还是上一任 Leader 的 epoch
long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());

long peerLastZxid;
StateSummary ss = null;

// 读取 Follower 发送过来的 zxid
long zxid = qp.getZxid();

// Leader 根据从 Follower 获取到的 sid 和旧的 epoch,构建新的 epoch
long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);

if (this.getVersion() < 0x10000) {
// we are going to have to extrapolate the epoch information
long epoch = ZxidUtils.getEpochFromZxid(zxid);
ss = new StateSummary(epoch, zxid);
// fake the message
leader.waitForEpochAck(this.getSid(), ss);
} else {
byte ver[] = new byte[4];
ByteBuffer.wrap(ver).putInt(0x10000);
// Leader 向 Follower 发送信息,包含:zxid 和 newEpoch
QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
oa.writeRecord(newEpochPacket, "packet");
bufferedOutput.flush();
QuorumPacket ackEpochPacket = new QuorumPacket();
ia.readRecord(ackEpochPacket, "packet");
if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
LOG.error(ackEpochPacket.toString()
+ " is not ACKEPOCH");
return;
}
ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
leader.waitForEpochAck(this.getSid(), ss);
}
peerLastZxid = ss.getLastZxid();

// Take any necessary action if we need to send TRUNC or DIFF
// startForwarding() will be called in all cases
boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);

/* if we are not truncating or sending a diff just send a snapshot */
if (needSnap) {
boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
LearnerSnapshot snapshot =
leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
try {
long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
bufferedOutput.flush();

LOG.info("Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, "
+ "send zxid of db as 0x{}, {} concurrent snapshots, "
+ "snapshot was {} from throttle",
Long.toHexString(peerLastZxid),
Long.toHexString(leaderLastZxid),
Long.toHexString(zxidToSend),
snapshot.getConcurrentSnapshotNumber(),
snapshot.isEssential() ? "exempt" : "not exempt");
// Dump data to peer
leader.zk.getZKDatabase().serializeSnapshot(oa);
oa.writeString("BenWasHere", "signature");
bufferedOutput.flush();
} finally {
snapshot.close();
}
}

LOG.debug("Sending NEWLEADER message to " + sid);
// the version of this quorumVerifier will be set by leader.lead() in case
// the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
// we got here, so the version was set
if (getVersion() < 0x10000) {
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
newLeaderZxid, null, null);
oa.writeRecord(newLeaderQP, "packet");
} else {
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
newLeaderZxid, leader.self.getLastSeenQuorumVerifier()
.toString().getBytes(), null);
queuedPackets.add(newLeaderQP);
}
bufferedOutput.flush();

// Start thread that blast packets in the queue to learner
startSendingPackets();

/*
* Have to wait for the first ACK, wait until
* the leader is ready, and only then we can
* start processing messages.
*/
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
if(qp.getType() != Leader.ACK){
LOG.error("Next packet was supposed to be an ACK,"
+ " but received packet: {}", packetToString(qp));
return;
}

if(LOG.isDebugEnabled()){
LOG.debug("Received NEWLEADER-ACK message from " + sid);
}
leader.waitForNewLeaderAck(getSid(), qp.getZxid());

syncLimitCheck.start();

// now that the ack has been processed expect the syncLimit
sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);

/*
* Wait until leader starts up
*/
synchronized(leader.zk){
while(!leader.zk.isRunning() && !this.isInterrupted()){
leader.zk.wait(20);
}
}
// Mutation packets will be queued during the serialize,
// so we need to mark when the peer can actually start
// using the data
//
LOG.debug("Sending UPTODATE message to " + sid);
queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));

while (true) {
qp = new QuorumPacket();
ia.readRecord(qp, "packet");

long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
if (qp.getType() == Leader.PING) {
traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
}
if (LOG.isTraceEnabled()) {
ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
}
tickOfNextAckDeadline = leader.self.tick.get() + leader.self.syncLimit;


ByteBuffer bb;
long sessionId;
int cxid;
int type;

switch (qp.getType()) {
case Leader.ACK:
if (this.learnerType == LearnerType.OBSERVER) {
if (LOG.isDebugEnabled()) {
LOG.debug("Received ACK from Observer " + this.sid);
}
}
syncLimitCheck.updateAck(qp.getZxid());
leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
break;
case Leader.PING:
// Process the touches
ByteArrayInputStream bis = new ByteArrayInputStream(qp
.getData());
DataInputStream dis = new DataInputStream(bis);
while (dis.available() > 0) {
long sess = dis.readLong();
int to = dis.readInt();
leader.zk.touch(sess, to);
}
break;
case Leader.REVALIDATE:
bis = new ByteArrayInputStream(qp.getData());
dis = new DataInputStream(bis);
long id = dis.readLong();
int to = dis.readInt();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
dos.writeLong(id);
boolean valid = leader.zk.checkIfValidGlobalSession(id, to);
if (valid) {
try {
//set the session owner
// as the follower that
// owns the session
leader.zk.setOwner(id, this);
} catch (SessionExpiredException e) {
LOG.error("Somehow session " + Long.toHexString(id) +
" expired right after being renewed! (impossible)", e);
}
}
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.SESSION_TRACE_MASK,
"Session 0x" + Long.toHexString(id)
+ " is valid: "+ valid);
}
dos.writeBoolean(valid);
qp.setData(bos.toByteArray());
queuedPackets.add(qp);
break;
case Leader.REQUEST:
bb = ByteBuffer.wrap(qp.getData());
sessionId = bb.getLong();
cxid = bb.getInt();
type = bb.getInt();
bb = bb.slice();
Request si;
if(type == OpCode.sync){
si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
} else {
si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
}
si.setOwner(this);
leader.zk.submitLearnerRequest(si);
break;
default:
LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));
break;
}
}
} catch (IOException e) {
if (sock != null && !sock.isClosed()) {
LOG.error("Unexpected exception causing shutdown while sock "
+ "still open", e);
//close the socket to make sure the
//other side can see it being close
try {
sock.close();
} catch(IOException ie) {
// do nothing
}
}
} catch (InterruptedException e) {
LOG.error("Unexpected exception causing shutdown", e);
} catch (SnapshotThrottleException e) {
LOG.error("too many concurrent snapshots: " + e);
} finally {
LOG.warn("******* GOODBYE "
+ (sock != null ? sock.getRemoteSocketAddress() : "<null>")
+ " ********");
shutdown();
}
}

Follower 向 Leader 注册

  • Follower.followLeader() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
void followLeader() throws InterruptedException {
self.end_fle = Time.currentElapsedTime();
long electionTimeTaken = self.end_fle - self.start_fle;
self.setElectionTimeTaken(electionTimeTaken);
LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
QuorumPeer.FLE_TIME_UNIT);
self.start_fle = 0;
self.end_fle = 0;
fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
try {
// Follower 查找 Leader
QuorumServer leaderServer = findLeader();
try {
// Follower 连接 Leader
connectToLeader(leaderServer.addr, leaderServer.hostname);
// Follower 向 Leader 注册
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
if (self.isReconfigStateChange())
throw new Exception("learned about role change");
//check to see if the leader zxid is lower than ours
//this should never happen but is just a safety check
long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
if (newEpoch < self.getAcceptedEpoch()) {
LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
+ " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
throw new IOException("Error: Epoch of leader is lower");
}
syncWithLeader(newEpochZxid);
QuorumPacket qp = new QuorumPacket();
// 循环等待接收消息
while (this.isRunning()) {
// 读取 Packet 消息
readPacket(qp);
// 处理 Packet 消息
processPacket(qp);
}
} catch (Exception e) {
LOG.warn("Exception when following the leader", e);
try {
sock.close();
} catch (IOException e1) {
e1.printStackTrace();
}

// clear pending revalidations
pendingRevalidations.clear();
}
} finally {
zk.unregisterJMX((Learner)this);
}
}
  • Learner.registerWithLeader() 方法,Follower 向 Leader 注册
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
protected long registerWithLeader(int pktType) throws IOException{
/*
* Send follower info, including last zxid and sid
*/
long lastLoggedZxid = self.getLastLoggedZxid();
QuorumPacket qp = new QuorumPacket();
qp.setType(pktType);
qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));

/*
* Add sid to payload
*/
LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion());
ByteArrayOutputStream bsid = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
boa.writeRecord(li, "LearnerInfo");
qp.setData(bsid.toByteArray());

// 发送 Follower 的信息(FollowerInfo)给 Leader
writePacket(qp, true);

// 读取 Leader 返回的结果(LeaderInfo)
readPacket(qp);

final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
// 如果接收到 Leader 返回的结果
if (qp.getType() == Leader.LEADERINFO) {
// we are connected to a 1.0 server so accept the new epoch and read the next packet
leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
byte epochBytes[] = new byte[4];
final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
// 接收 Leader 的 epoch
if (newEpoch > self.getAcceptedEpoch()) {
// 将自己原来的 epoch 保存在 wrappedEpochBytes 里面
wrappedEpochBytes.putInt((int)self.getCurrentEpoch());
// 将 Leader 发送过来的 epoch 保存起来
self.setAcceptedEpoch(newEpoch);
} else if (newEpoch == self.getAcceptedEpoch()) {
// since we have already acked an epoch equal to the leaders, we cannot ack
// again, but we still need to send our lastZxid to the leader so that we can
// sync with it if it does assume leadership of the epoch.
// the -1 indicates that this reply should not count as an ack for the new epoch
wrappedEpochBytes.putInt(-1);
} else {
throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch());
}
// 发送 ackepoch 给 Leader,包含了 Follwoer 自己的 epoch 和 zxid
QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
writePacket(ackNewEpoch, true);
return ZxidUtils.makeZxid(newEpoch, 0);
} else {
if (newEpoch > self.getAcceptedEpoch()) {
self.setAcceptedEpoch(newEpoch);
}
if (qp.getType() != Leader.NEWLEADER) {
LOG.error("First packet should have been NEWLEADER");
throw new IOException("First packet should have been NEWLEADER");
}
return qp.getZxid();
}
}

Leader 接收 Follower 的状态信息,根据同步方式发送同步消息

  • LearnerHandler.run() 方法,处理 Follower 的请求。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
@Override
public void run() {
try {
leader.addLearnerHandler(this);
// 心跳处理
tickOfNextAckDeadline = leader.self.tick.get()
+ leader.self.initLimit + leader.self.syncLimit;

ia = BinaryInputArchive.getArchive(bufferedInput);
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
oa = BinaryOutputArchive.getArchive(bufferedOutput);

// 从网络中接收消息,并反序列化为 Packet
QuorumPacket qp = new QuorumPacket();
ia.readRecord(qp, "packet");
// 选举结束后,Observer 和 Follower 都应该给 Leader 发送一个标志信息:FOLLOWERINFO 或者 OBSERVERINFO
if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO){
LOG.error("First packet " + qp.toString()
+ " is not FOLLOWERINFO or OBSERVERINFO!");
return;
}

byte learnerInfoData[] = qp.getData();
if (learnerInfoData != null) {
ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
if (learnerInfoData.length >= 8) {
this.sid = bbsid.getLong();
}
if (learnerInfoData.length >= 12) {
this.version = bbsid.getInt(); // protocolVersion
}
if (learnerInfoData.length >= 20) {
long configVersion = bbsid.getLong();
if (configVersion > leader.self.getQuorumVerifier().getVersion()) {
throw new IOException("Follower is ahead of the leader (has a later activated configuration)");
}
}
} else {
this.sid = leader.followerCounter.getAndDecrement();
}

if (leader.self.getView().containsKey(this.sid)) {
LOG.info("Follower sid: " + this.sid + " : info : "
+ leader.self.getView().get(this.sid).toString());
} else {
LOG.info("Follower sid: " + this.sid + " not in the current config " + Long.toHexString(leader.self.getQuorumVerifier().getVersion()));
}

if (qp.getType() == Leader.OBSERVERINFO) {
learnerType = LearnerType.OBSERVER;
}

// 读取 Follower 发送过来的 lastAcceptedEpoch
// 选举过程中,所使用的 epoch(任期代号),其实还是上一任 Leader 的 epoch
long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());

long peerLastZxid;
StateSummary ss = null;

// 读取 Follower 发送过来的 zxid
long zxid = qp.getZxid();

// Leader 根据从 Follower 获取到的 sid 和旧的 epoch,构建新的 epoch
long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);

if (this.getVersion() < 0x10000) {
// we are going to have to extrapolate the epoch information
long epoch = ZxidUtils.getEpochFromZxid(zxid);
ss = new StateSummary(epoch, zxid);
// fake the message
leader.waitForEpochAck(this.getSid(), ss);
} else {
byte ver[] = new byte[4];
ByteBuffer.wrap(ver).putInt(0x10000);
// Leader 向 Follower 发送信息,包含:zxid 和 newEpoch
QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
oa.writeRecord(newEpochPacket, "packet");
bufferedOutput.flush();
// 接收到 Follower 应答的 ackepoch
QuorumPacket ackEpochPacket = new QuorumPacket();
ia.readRecord(ackEpochPacket, "packet");
if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
LOG.error(ackEpochPacket.toString()
+ " is not ACKEPOCH");
return;
}
ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
// 保存了对方 Follower 或者 Observer 的状态信息,包括:epoch 和 zxid
ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
leader.waitForEpochAck(this.getSid(), ss);
}
peerLastZxid = ss.getLastZxid();

// Take any necessary action if we need to send TRUNC or DIFF
// startForwarding() will be called in all cases
// 判断 Leader 和 Follower 之间是否需要进行同步
boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);

/* if we are not truncating or sending a diff just send a snapshot */
if (needSnap) {
boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
LearnerSnapshot snapshot =
leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
try {
long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
bufferedOutput.flush();

LOG.info("Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, "
+ "send zxid of db as 0x{}, {} concurrent snapshots, "
+ "snapshot was {} from throttle",
Long.toHexString(peerLastZxid),
Long.toHexString(leaderLastZxid),
Long.toHexString(zxidToSend),
snapshot.getConcurrentSnapshotNumber(),
snapshot.isEssential() ? "exempt" : "not exempt");
// Dump data to peer
leader.zk.getZKDatabase().serializeSnapshot(oa);
oa.writeString("BenWasHere", "signature");
bufferedOutput.flush();
} finally {
snapshot.close();
}
}

LOG.debug("Sending NEWLEADER message to " + sid);
// the version of this quorumVerifier will be set by leader.lead() in case
// the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
// we got here, so the version was set
if (getVersion() < 0x10000) {
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
newLeaderZxid, null, null);
oa.writeRecord(newLeaderQP, "packet");
} else {
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
newLeaderZxid, leader.self.getLastSeenQuorumVerifier()
.toString().getBytes(), null);
queuedPackets.add(newLeaderQP);
}
bufferedOutput.flush();

// Start thread that blast packets in the queue to learner
startSendingPackets();

......

} catch (IOException e) {
......
} finally {
......
}
}
  • LearnerHandler.syncFollower() 方法,判断 Leader 和 Follower 之间是否需要进行同步。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
public boolean syncFollower(long peerLastZxid, ZKDatabase db, Leader leader) {
/*
* When leader election is completed, the leader will set its
* lastProcessedZxid to be (epoch < 32). There will be no txn associated
* with this zxid.
*
* The learner will set its lastProcessedZxid to the same value if
* it get DIFF or SNAP from the leader. If the same learner come
* back to sync with leader using this zxid, we will never find this
* zxid in our history. In this case, we will ignore TRUNC logic and
* always send DIFF if we have old enough history
*/
boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
// Keep track of the latest zxid which already queued
long currentZxid = peerLastZxid;
boolean needSnap = true;
boolean txnLogSyncEnabled = db.isTxnLogSyncEnabled();
ReentrantReadWriteLock lock = db.getLogLock();
ReadLock rl = lock.readLock();
try {
rl.lock();
long maxCommittedLog = db.getmaxCommittedLog();
long minCommittedLog = db.getminCommittedLog();
long lastProcessedZxid = db.getDataTreeLastProcessedZxid();

LOG.info("Synchronizing with Follower sid: {} maxCommittedLog=0x{}"
+ " minCommittedLog=0x{} lastProcessedZxid=0x{}"
+ " peerLastZxid=0x{}", getSid(),
Long.toHexString(maxCommittedLog),
Long.toHexString(minCommittedLog),
Long.toHexString(lastProcessedZxid),
Long.toHexString(peerLastZxid));

if (db.getCommittedLog().isEmpty()) {
/*
* It is possible that committedLog is empty. In that case
* setting these value to the latest txn in leader db
* will reduce the case that we need to handle
*
* Here is how each case handle by the if block below
* 1. lastProcessZxid == peerZxid -> Handle by (2)
* 2. lastProcessZxid < peerZxid -> Handle by (3)
* 3. lastProcessZxid > peerZxid -> Handle by (5)
*/
minCommittedLog = lastProcessedZxid;
maxCommittedLog = lastProcessedZxid;
}

/*
* Here are the cases that we want to handle
*
* 1. Force sending snapshot (for testing purpose)
* 2. Peer and leader is already sync, send empty diff
* 3. Follower has txn that we haven't seen. This may be old leader
* so we need to send TRUNC. However, if peer has newEpochZxid,
* we cannot send TRUNC since the follower has no txnlog
* 4. Follower is within committedLog range or already in-sync.
* We may need to send DIFF or TRUNC depending on follower's zxid
* We always send empty DIFF if follower is already in-sync
* 5. Follower missed the committedLog. We will try to use on-disk
* txnlog + committedLog to sync with follower. If that fail,
* we will send snapshot
*/

if (forceSnapSync) {
// Force leader to use snapshot to sync with follower
LOG.warn("Forcing snapshot sync - should not see this in production");
} else if (lastProcessedZxid == peerLastZxid) {
// Follower is already sync with us, send empty diff
LOG.info("Sending DIFF zxid=0x" + Long.toHexString(peerLastZxid) +
" for peer sid: " + getSid());
queueOpPacket(Leader.DIFF, peerLastZxid);
needOpPacket = false;
needSnap = false;
} else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
// Newer than committedLog, send trunc and done
LOG.debug("Sending TRUNC to follower zxidToSend=0x" +
Long.toHexString(maxCommittedLog) +
" for peer sid:" + getSid());
queueOpPacket(Leader.TRUNC, maxCommittedLog);
currentZxid = maxCommittedLog;
needOpPacket = false;
needSnap = false;
} else if ((maxCommittedLog >= peerLastZxid)
&& (minCommittedLog <= peerLastZxid)) {
// Follower is within commitLog range
LOG.info("Using committedLog for peer sid: " + getSid());
Iterator<Proposal> itr = db.getCommittedLog().iterator();
currentZxid = queueCommittedProposals(itr, peerLastZxid,
null, maxCommittedLog);
needSnap = false;
} else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
// Use txnlog and committedLog to sync

// Calculate sizeLimit that we allow to retrieve txnlog from disk
long sizeLimit = db.calculateTxnLogSizeLimit();
// This method can return empty iterator if the requested zxid
// is older than on-disk txnlog
Iterator<Proposal> txnLogItr = db.getProposalsFromTxnLog(
peerLastZxid, sizeLimit);
if (txnLogItr.hasNext()) {
LOG.info("Use txnlog and committedLog for peer sid: " + getSid());
currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid,
minCommittedLog, maxCommittedLog);

LOG.debug("Queueing committedLog 0x" + Long.toHexString(currentZxid));
Iterator<Proposal> committedLogItr = db.getCommittedLog().iterator();
currentZxid = queueCommittedProposals(committedLogItr, currentZxid,
null, maxCommittedLog);
needSnap = false;
}
// closing the resources
if (txnLogItr instanceof TxnLogProposalIterator) {
TxnLogProposalIterator txnProposalItr = (TxnLogProposalIterator) txnLogItr;
txnProposalItr.close();
}
} else {
LOG.warn("Unhandled scenario for peer sid: " + getSid());
}
LOG.debug("Start forwarding 0x" + Long.toHexString(currentZxid) +
" for peer sid: " + getSid());
leaderLastZxid = leader.startForwarding(this, currentZxid);
} finally {
rl.unlock();
}

if (needOpPacket && !needSnap) {
// This should never happen, but we should fall back to sending
// snapshot just in case.
LOG.error("Unhandled scenario for peer sid: " + getSid() +
" fall back to use snapshot");
needSnap = true;
}

return needSnap;
}

Follower 应答 Leader 同步结果

  • Follower.processPacket() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
protected void processPacket(QuorumPacket qp) throws Exception{
switch (qp.getType()) {
case Leader.PING:
ping(qp);
break;
case Leader.PROPOSAL:
TxnHeader hdr = new TxnHeader();
Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
if (hdr.getZxid() != lastQueued + 1) {
LOG.warn("Got zxid 0x"
+ Long.toHexString(hdr.getZxid())
+ " expected 0x"
+ Long.toHexString(lastQueued + 1));
}
lastQueued = hdr.getZxid();

if (hdr.getType() == OpCode.reconfig){
SetDataTxn setDataTxn = (SetDataTxn) txn;
QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
self.setLastSeenQuorumVerifier(qv, true);
}

fzk.logRequest(hdr, txn);
break;
case Leader.COMMIT:
fzk.commit(qp.getZxid());
break;

case Leader.COMMITANDACTIVATE:
// get the new configuration from the request
Request request = fzk.pendingTxns.element();
SetDataTxn setDataTxn = (SetDataTxn) request.getTxn();
QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));

// get new designated leader from (current) leader's message
ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
long suggestedLeaderId = buffer.getLong();
boolean majorChange =
self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
// commit (writes the new config to ZK tree (/zookeeper/config)
fzk.commit(qp.getZxid());
if (majorChange) {
throw new Exception("changes proposed in reconfig");
}
break;
case Leader.UPTODATE:
LOG.error("Received an UPTODATE message after Follower started");
break;
case Leader.REVALIDATE:
revalidate(qp);
break;
case Leader.SYNC:
fzk.sync();
break;
default:
LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
break;
}
}
  • FollowerZooKeeperServer.commit() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void commit(long zxid) {
if (pendingTxns.size() == 0) {
LOG.warn("Committing " + Long.toHexString(zxid)
+ " without seeing txn");
return;
}
long firstElementZxid = pendingTxns.element().zxid;
if (firstElementZxid != zxid) {
LOG.error("Committing zxid 0x" + Long.toHexString(zxid)
+ " but next pending txn 0x"
+ Long.toHexString(firstElementZxid));
System.exit(12);
}
Request request = pendingTxns.remove();
commitProcessor.commit(request);
}

synchronized public void sync(){
if(pendingSyncs.size() ==0){
LOG.warn("Not expecting a sync.");
return;
}

Request r = pendingSyncs.remove();
commitProcessor.commit(r);
}

Leader 应答 Follower

  • LearnerHandler.run() 方法,处理 Follower 的请求。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void run()
{
try
{

......

LOG.debug("Sending UPTODATE message to " + sid);
queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
while(true) {
......
}
} catch(IOException e) {
......
} finally {
......
}
}

ZK 服务端 Leader 启动源码分析

  • Leader.lead() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
void lead() throws IOException, InterruptedException {

......

waitForEpochAck(self.getId(), leaderStateSummary);
self.setCurrentEpoch(epoch);

try {
waitForNewLeaderAck(self.getId(), zk.getZxid());
} catch (InterruptedException e) {
shutdown("Waiting for a quorum of followers, only synced with sids: [ "
+ newLeaderProposal.ackSetsToString() + " ]");
HashSet<Long> followerSet = new HashSet<Long>();

for(LearnerHandler f : getLearners()) {
if (self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())){
followerSet.add(f.getSid());
}
}
boolean initTicksShouldBeIncreased = true;
for (Proposal.QuorumVerifierAcksetPair qvAckset:newLeaderProposal.qvAcksetPairs) {
if (!qvAckset.getQuorumVerifier().containsQuorum(followerSet)) {
initTicksShouldBeIncreased = false;
break;
}
}
if (initTicksShouldBeIncreased) {
LOG.warn("Enough followers present. "+
"Perhaps the initTicks need to be increased.");
}
return;
}

// 启动 ZK 服务端
startZkServer();

......

}
  • Leader.startZkServer() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
final LeaderZooKeeperServer zk;

private synchronized void startZkServer() {
// Update lastCommitted and Db's zxid to a value representing the new epoch
lastCommitted = zk.getZxid();
LOG.info("Have quorum of supporters, sids: [ "
+ newLeaderProposal.ackSetsToString()
+ " ]; starting up and setting last processed zxid: 0x{}",
Long.toHexString(zk.getZxid()));

/*
* ZOOKEEPER-1324. the leader sends the new config it must complete
* to others inside a NEWLEADER message (see LearnerHandler where
* the NEWLEADER message is constructed), and once it has enough
* acks we must execute the following code so that it applies the
* config to itself.
*/
QuorumVerifier newQV = self.getLastSeenQuorumVerifier();

Long designatedLeader = getDesignatedLeader(newLeaderProposal, zk.getZxid());

self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
if (designatedLeader != self.getId()) {
allowedToCommit = false;
}

// 启动 ZK 服务端
zk.startup();

/*
* Update the election vote here to ensure that all members of the
* ensemble report the same vote to new servers that start up and
* send leader election notifications to the ensemble.
*
* @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
*/
self.updateElectionVote(getEpoch());

zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
}
  • LeaderZooKeeperServer.startup() 方法
1
2
3
4
5
6
7
8
@Override
public synchronized void startup() {
// 启动 ZK 服务端
super.startup();
if (containerManager != null) {
containerManager.start();
}
}
  • ZooKeeperServer.startup() 方法
1
2
3
4
5
6
7
8
9
10
11
12
public synchronized void startup() {
if (sessionTracker == null) {
createSessionTracker();
}
startSessionTracker();
setupRequestProcessors();

registerJMX();

setState(State.RUNNING);
notifyAll();
}
  • ZooKeeperServer.setupRequestProcessors() 方法
1
2
3
4
5
6
7
8
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor syncProcessor = new SyncRequestProcessor(this,
finalProcessor);
((SyncRequestProcessor)syncProcessor).start();
firstProcessor = new PrepRequestProcessor(this, syncProcessor);
((PrepRequestProcessor)firstProcessor).start();
}
  • PrepRequestProcessor.run(),PrepRequestProcessor 是一个线程类,核心逻辑在 run() 方法中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {

@Override
public void run() {
try {
while (true) {
// 接收各种请求
Request request = submittedRequests.take();
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
if (request.type == OpCode.ping) {
traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
}
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
}
if (Request.requestOfDeath == request) {
break;
}
// 处理各种请求
pRequest(request);
}
} catch (RequestProcessorException e) {
if (e.getCause() instanceof XidRolloverException) {
LOG.info(e.getCause().getMessage());
}
handleException(this.getName(), e);
} catch (Exception e) {
handleException(this.getName(), e);
}
LOG.info("PrepRequestProcessor exited loop!");
}

}
  • PrepRequestProcessor.pRequest() 方法,处理各种请求。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
protected void pRequest(Request request) throws RequestProcessorException {
// LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
// request.type + " id = 0x" + Long.toHexString(request.sessionId));
request.setHdr(null);
request.setTxn(null);

try {
switch (request.type) {
case OpCode.createContainer:
case OpCode.create:
case OpCode.create2:
CreateRequest create2Request = new CreateRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
break;
case OpCode.createTTL:
CreateTTLRequest createTtlRequest = new CreateTTLRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest, true);
break;
case OpCode.deleteContainer:
case OpCode.delete:
DeleteRequest deleteRequest = new DeleteRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
break;
case OpCode.setData:
SetDataRequest setDataRequest = new SetDataRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
break;
case OpCode.reconfig:
ReconfigRequest reconfigRequest = new ReconfigRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, reconfigRequest);
pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);
break;
case OpCode.setACL:
SetACLRequest setAclRequest = new SetACLRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);
break;
case OpCode.check:
CheckVersionRequest checkRequest = new CheckVersionRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);
break;
case OpCode.multi:
MultiTransactionRecord multiRequest = new MultiTransactionRecord();
try {
ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
} catch(IOException e) {
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(),
Time.currentWallTime(), OpCode.multi));
throw e;
}
List<Txn> txns = new ArrayList<Txn>();
//Each op in a multi-op must have the same zxid!
long zxid = zks.getNextZxid();
KeeperException ke = null;

//Store off current pending change records in case we need to rollback
Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);

for(Op op: multiRequest) {
Record subrequest = op.toRequestRecord();
int type;
Record txn;

/* If we've already failed one of the ops, don't bother
* trying the rest as we know it's going to fail and it
* would be confusing in the logfiles.
*/
if (ke != null) {
type = OpCode.error;
txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
}

/* Prep the request and convert to a Txn */
else {
try {
pRequest2Txn(op.getType(), zxid, request, subrequest, false);
type = request.getHdr().getType();
txn = request.getTxn();
} catch (KeeperException e) {
ke = e;
type = OpCode.error;
txn = new ErrorTxn(e.code().intValue());

if (e.code().intValue() > Code.APIERROR.intValue()) {
LOG.info("Got user-level KeeperException when processing {} aborting" +
" remaining multi ops. Error Path:{} Error:{}",
request.toString(), e.getPath(), e.getMessage());
}

request.setException(e);

/* Rollback change records from failed multi-op */
rollbackPendingChanges(zxid, pendingChanges);
}
}

//FIXME: I don't want to have to serialize it here and then
// immediately deserialize in next processor. But I'm
// not sure how else to get the txn stored into our list.
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
txn.serialize(boa, "request") ;
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());

txns.add(new Txn(type, bb.array()));
}

request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
Time.currentWallTime(), request.type));
request.setTxn(new MultiTxn(txns));

break;

//create/close session don't require request record
case OpCode.createSession:
case OpCode.closeSession:
if (!request.isLocalSession()) {
pRequest2Txn(request.type, zks.getNextZxid(), request,
null, true);
}
break;

//All the rest don't need to create a Txn - just verify session
case OpCode.sync:
case OpCode.exists:
case OpCode.getData:
case OpCode.getACL:
case OpCode.getChildren:
case OpCode.getChildren2:
case OpCode.ping:
case OpCode.setWatches:
case OpCode.checkWatches:
case OpCode.removeWatches:
zks.sessionTracker.checkSession(request.sessionId,
request.getOwner());
break;
default:
LOG.warn("unknown type " + request.type);
break;
}
} catch (KeeperException e) {
if (request.getHdr() != null) {
request.getHdr().setType(OpCode.error);
request.setTxn(new ErrorTxn(e.code().intValue()));
}

if (e.code().intValue() > Code.APIERROR.intValue()) {
LOG.info("Got user-level KeeperException when processing {} Error Path:{} Error:{}",
request.toString(), e.getPath(), e.getMessage());
}
request.setException(e);
} catch (Exception e) {
// log at error level as we are returning a marshalling
// error to the user
LOG.error("Failed to process " + request, e);

StringBuilder sb = new StringBuilder();
ByteBuffer bb = request.request;
if(bb != null){
bb.rewind();
while (bb.hasRemaining()) {
sb.append(Integer.toHexString(bb.get() & 0xff));
}
} else {
sb.append("request buffer is null");
}

LOG.error("Dumping request buffer: 0x" + sb.toString());
if (request.getHdr() != null) {
request.getHdr().setType(OpCode.error);
request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
}
}
request.zxid = zks.getZxid();
nextProcessor.processRequest(request);
}

ZK 服务端 Follower 启动源码分析

  • Follower.followLeader() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
void followLeader() throws InterruptedException {
self.end_fle = Time.currentElapsedTime();
long electionTimeTaken = self.end_fle - self.start_fle;
self.setElectionTimeTaken(electionTimeTaken);
LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
QuorumPeer.FLE_TIME_UNIT);
self.start_fle = 0;
self.end_fle = 0;
fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
try {
// Follower 查找 Leader
QuorumServer leaderServer = findLeader();
try {
// Follower 连接 Leader
connectToLeader(leaderServer.addr, leaderServer.hostname);
// Follower 向 Leader 注册
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
if (self.isReconfigStateChange())
throw new Exception("learned about role change");
//check to see if the leader zxid is lower than ours
//this should never happen but is just a safety check
long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
if (newEpoch < self.getAcceptedEpoch()) {
LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
+ " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
throw new IOException("Error: Epoch of leader is lower");
}
syncWithLeader(newEpochZxid);
QuorumPacket qp = new QuorumPacket();
while (this.isRunning()) {
// 读取 Packet 消息
readPacket(qp);
// 处理 Packet 消息
processPacket(qp);
}
} catch (Exception e) {
LOG.warn("Exception when following the leader", e);
try {
sock.close();
} catch (IOException e1) {
e1.printStackTrace();
}

// clear pending revalidations
pendingRevalidations.clear();
}
} finally {
zk.unregisterJMX((Learner)this);
}
}
  • Follower.readPacket() 方法,读取消息。
1
2
3
4
5
6
7
8
9
10
11
12
void readPacket(QuorumPacket pp) throws IOException {
synchronized (leaderIs) {
leaderIs.readRecord(pp, "packet");
}
if (LOG.isTraceEnabled()) {
final long traceMask =
(pp.getType() == Leader.PING) ? ZooTrace.SERVER_PING_TRACE_MASK
: ZooTrace.SERVER_PACKET_TRACE_MASK;

ZooTrace.logQuorumPacket(LOG, traceMask, 'i', pp);
}
}
  • Follower.processPacket() 方法,处理消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
protected void processPacket(QuorumPacket qp) throws Exception{
switch (qp.getType()) {
case Leader.PING:
ping(qp);
break;
case Leader.PROPOSAL:
TxnHeader hdr = new TxnHeader();
Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
if (hdr.getZxid() != lastQueued + 1) {
LOG.warn("Got zxid 0x"
+ Long.toHexString(hdr.getZxid())
+ " expected 0x"
+ Long.toHexString(lastQueued + 1));
}
lastQueued = hdr.getZxid();

if (hdr.getType() == OpCode.reconfig){
SetDataTxn setDataTxn = (SetDataTxn) txn;
QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
self.setLastSeenQuorumVerifier(qv, true);
}

fzk.logRequest(hdr, txn);
break;
case Leader.COMMIT:
fzk.commit(qp.getZxid());
break;

case Leader.COMMITANDACTIVATE:
// get the new configuration from the request
Request request = fzk.pendingTxns.element();
SetDataTxn setDataTxn = (SetDataTxn) request.getTxn();
QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));

// get new designated leader from (current) leader's message
ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
long suggestedLeaderId = buffer.getLong();
boolean majorChange =
self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
// commit (writes the new config to ZK tree (/zookeeper/config)
fzk.commit(qp.getZxid());
if (majorChange) {
throw new Exception("changes proposed in reconfig");
}
break;
case Leader.UPTODATE:
LOG.error("Received an UPTODATE message after Follower started");
break;
case Leader.REVALIDATE:
revalidate(qp);
break;
case Leader.SYNC:
fzk.sync();
break;
default:
LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
break;
}
}

ZK 客户端启动源码分析

ZK 客户端启动脚本

ZK 客户端启动的脚本是 ZkCli.sh,其源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ZOOBIN="${BASH_SOURCE-$0}"
ZOOBIN="$(dirname "${ZOOBIN}")"
ZOOBINDIR="$(cd "${ZOOBIN}"; pwd)"

if [ -e "$ZOOBIN/../libexec/zkEnv.sh" ]; then
. "$ZOOBINDIR"/../libexec/zkEnv.sh
else
. "$ZOOBINDIR"/zkEnv.sh
fi

ZOO_LOG_FILE=zookeeper-$USER-cli-$HOSTNAME.log

"$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" "-Dzookeeper.log.file=${ZOO_LOG_FILE}" \
-cp "$CLASSPATH" $CLIENT_JVMFLAGS $JVMFLAGS \
org.apache.zookeeper.ZooKeeperMain "$@"

ZkCli.sh 脚本启动 Zookeeper 时,会调用 ZooKeeperMain 类,所以 ZooKeeper 客户端的启动入口是 ZooKeeperMain 类。

ZK 客户端启动入口

  • ZooKeeperMain 类,ZK 客户端的启动入口。
1
2
3
4
5
6
7
8
9
10
11
12
public class ZooKeeperMain {

public static void main(String args[]) throws CliException, IOException, InterruptedException
{
// 创建 ZooKeeperMain 对象
ZooKeeperMain main = new ZooKeeperMain(args);
main.run();
}

......

}

创建 ZookeeperMain 对象

  • ZooKeeperMain
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ZooKeeperMain {

public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
cl.parseOptions(args);
System.out.println("Connecting to " + cl.getOption("server"));
// 连接 ZK 服务端
connectToZK(cl.getOption("server"));
}

protected void connectToZK(String newHost) throws InterruptedException, IOException {
if (zk != null && zk.getState().isAlive()) {
zk.close();
}

host = newHost;
boolean readOnly = cl.getOption("readonly") != null;
if (cl.getOption("secure") != null) {
System.setProperty(ZKClientConfig.SECURE_CLIENT, "true");
System.out.println("Secure connection is enabled");
}
// 创建 ZooKeeperAdmin 对象
zk = new ZooKeeperAdmin(host, Integer.parseInt(cl.getOption("timeout")), new MyWatcher(), readOnly);
}

......

}
  • ZooKeeperAdmin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class ZooKeeperAdmin extends ZooKeeper {

public ZooKeeperAdmin(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws IOException {
super(connectString, sessionTimeout, watcher, canBeReadOnly);
}

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws IOException {
this(connectString, sessionTimeout, watcher, canBeReadOnly,
createDefaultHostProvider(connectString));
}

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider) throws IOException {
this(connectString, sessionTimeout, watcher, canBeReadOnly,
aHostProvider, null);
}

......

}

初始化监听器

  • ZooKeeperAdmin.ZooKeeperAdmin() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly, HostProvider aHostProvider,
ZKClientConfig clientConfig) throws IOException {
LOG.info("Initiating client connection, connectString=" + connectString
+ " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

if (clientConfig == null) {
clientConfig = new ZKClientConfig();
}
this.clientConfig = clientConfig;
watchManager = defaultWatchManager();
// 赋值 watcher 给默认的 defaultWatcher
watchManager.defaultWatcher = watcher;
// 解析客户端的连接地址
ConnectStringParser connectStringParser = new ConnectStringParser(
connectString);
hostProvider = aHostProvider;

// 创建客户端与服务器通信的连接
cnxn = createConnection(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
cnxn.start();
}

解析连接地址

  • ConnectStringParser.ConnectStringParser() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public ConnectStringParser(String connectString) {
// parse out chroot, if any
int off = connectString.indexOf('/');
if (off >= 0) {
String chrootPath = connectString.substring(off);
// ignore "/" chroot spec, same as null
if (chrootPath.length() == 1) {
this.chrootPath = null;
} else {
PathUtils.validatePath(chrootPath);
this.chrootPath = chrootPath;
}
connectString = connectString.substring(0, off);
} else {
this.chrootPath = null;
}

// 多个集群节点使用逗号分割,比如:zookeeper01:2181,zookeeper02:2181,zookeeper03:2181
List<String> hostsList = split(connectString,",");
for (String host : hostsList) {
int port = DEFAULT_PORT;
int pidx = host.lastIndexOf(':');
if (pidx >= 0) {
// otherwise : is at the end of the string, ignore
if (pidx < host.length() - 1) {
port = Integer.parseInt(host.substring(pidx + 1));
}
host = host.substring(0, pidx);
}
serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
}
}
  • InetSocketAddress
1
2
3
4
5
6
7
8
9
10
11
12
13
public class InetSocketAddress extends SocketAddress {

// Private implementation class pointed to by all public methods.
private static class InetSocketAddressHolder {
// The hostname of the Socket Address,主机名称
private String hostname;
// The IP address of the Socket Address,通信地址
private InetAddress addr;
// The port number of the Socket Address,端口号
private int port;
}

}

创建通信连接

第一部分代码

  • ZooKeeperAdmin.ZooKeeperAdmin() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly, HostProvider aHostProvider,
ZKClientConfig clientConfig) throws IOException {
LOG.info("Initiating client connection, connectString=" + connectString
+ " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

if (clientConfig == null) {
clientConfig = new ZKClientConfig();
}
this.clientConfig = clientConfig;
watchManager = defaultWatchManager();
// 赋值 watcher 给默认的 defaultWatcher
watchManager.defaultWatcher = watcher;
// 解析客户端的连接地址
ConnectStringParser connectStringParser = new ConnectStringParser(
connectString);
hostProvider = aHostProvider;

// 创建客户端与服务器通信的连接
cnxn = createConnection(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
cnxn.start();
}
  • ZooKeeper.getClientCnxnSocket() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private ClientCnxnSocket getClientCnxnSocket() throws IOException {
// ZOOKEEPER_CLIENT_CNXN_SOCKET = "zookeeper.clientCnxnSocket";
String clientCnxnSocketName = getClientConfig().getProperty(
ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
if (clientCnxnSocketName == null) {
clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
}
try {
// 通过反射获取 clientCxnSocket 对象
Constructor<?> clientCxnConstructor = Class.forName(clientCnxnSocketName).getDeclaredConstructor(ZKClientConfig.class);
ClientCnxnSocket clientCxnSocket = (ClientCnxnSocket) clientCxnConstructor.newInstance(getClientConfig());
return clientCxnSocket;
} catch (Exception e) {
IOException ioe = new IOException("Couldn't instantiate "
+ clientCnxnSocketName);
ioe.initCause(e);
throw ioe;
}
}
  • ZooKeeper.createConnection() 方法
1
2
3
4
5
6
7
protected ClientCnxn createConnection(String chrootPath,
HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
boolean canBeReadOnly) throws IOException {
return new ClientCnxn(chrootPath, hostProvider, sessionTimeout, this,
watchManager, clientCnxnSocket, canBeReadOnly);
}
  • ClientCnxn
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class ClientCnxn {

public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly)
throws IOException {
this(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher,
clientCnxnSocket, 0, new byte[16], canBeReadOnly);
}

public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
this.sessionTimeout = sessionTimeout;
this.hostProvider = hostProvider;
this.chrootPath = chrootPath;

connectTimeout = sessionTimeout / hostProvider.size();
readTimeout = sessionTimeout * 2 / 3;
readOnly = canBeReadOnly;

// 创建两个线程
sendThread = new SendThread(clientCnxnSocket);
eventThread = new EventThread();
this.clientConfig=zooKeeper.getClientConfig();
initRequestTimeout();
}

public void start() {
sendThread.start();
eventThread.start();
}

......

}
  • ClientCnxn.SendThread 类,由于 SendThread 是一个线程类,因此其核心逻辑在 run() 方法中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
class SendThread extends ZooKeeperThread {

SendThread(ClientCnxnSocket clientCnxnSocket) {
super(makeThreadName("-SendThread()"));
state = States.CONNECTING;
this.clientCnxnSocket = clientCnxnSocket;
setDaemon(true);
}

@Override
public void run() {
clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = Time.currentElapsedTime();
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
InetSocketAddress serverAddress = null;
// 循环发送/接收消息
while (state.isAlive()) {
try {
if (!clientCnxnSocket.isConnected()) {
// don't re-establish connection if we are closing
if (closing) {
break;
}
if (rwServerAddress != null) {
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
serverAddress = hostProvider.next(1000);
}
// 开始连接服务端
startConnect(serverAddress);
clientCnxnSocket.updateLastSendAndHeard();
}

if (state.isConnected()) {
......
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}

if (to <= 0) {
String warnInfo;
warnInfo = "Client session timed out, have not heard from server in "
+ clientCnxnSocket.getIdleRecv()
+ "ms"
+ " for sessionid 0x"
+ Long.toHexString(sessionId);
LOG.warn(warnInfo);
throw new SessionTimeoutException(warnInfo);
}
if (state.isConnected()) {
//1000(1 second) is to prevent race condition missing to send the second ping
//also make sure not to send too many pings when readTimeout is small
int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
//send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
sendPing();
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing < to) {
to = timeToNextPing;
}
}
}

// If we are in read-only mode, seek for read/write server
if (state == States.CONNECTEDREADONLY) {
long now = Time.currentElapsedTime();
int idlePingRwServer = (int) (now - lastPingRwServer);
if (idlePingRwServer >= pingRwTimeout) {
lastPingRwServer = now;
idlePingRwServer = 0;
pingRwTimeout =
Math.min(2*pingRwTimeout, maxPingRwTimeout);
pingRwServer();
}
to = Math.min(to, pingRwTimeout - idlePingRwServer);
}

// 接收服务端响应,并作出相应处理
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
} catch (Throwable e) {
......
}
}
synchronized (state) {
// When it comes to this point, it guarantees that later queued
// packet to outgoingQueue will be notified of death.
cleanup();
}
clientCnxnSocket.close();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
Event.KeeperState.Disconnected, null));
}
eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
Event.KeeperState.Closed, null));
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
"SendThread exited loop for session: 0x"
+ Long.toHexString(getSessionId()));
}

}

第二部分代码

  • ClientCnxn.SendThread.startConnect() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private void startConnect(InetSocketAddress addr) throws IOException {
// initializing it for new connection
saslLoginFailed = false;
if(!isFirstConnect){
try {
Thread.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
state = States.CONNECTING;

String hostPort = addr.getHostString() + ":" + addr.getPort();
MDC.put("myid", hostPort);
setName(getName().replaceAll("\\(.*\\)", "(" + hostPort + ")"));
if (clientConfig.isSaslClientEnabled()) {
try {
if (zooKeeperSaslClient != null) {
zooKeeperSaslClient.shutdown();
}
zooKeeperSaslClient = new ZooKeeperSaslClient(SaslServerPrincipal.getServerPrincipal(addr, clientConfig),
clientConfig);
} catch (LoginException e) {
// An authentication error occurred when the SASL client tried to initialize:
// for Kerberos this means that the client failed to authenticate with the KDC.
// This is different from an authentication error that occurs during communication
// with the Zookeeper server, which is handled below.
LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without "
+ "SASL authentication, if Zookeeper server allows it.");
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null));
saslLoginFailed = true;
}
}
logStartConnect(addr);

// 客户端与服务端建立连接
clientCnxnSocket.connect(addr);
}
  • ClientCnxnSocketNIO.connect() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
void connect(InetSocketAddress addr) throws IOException {
SocketChannel sock = createSock();
try {
registerAndConnect(sock, addr);
} catch (IOException e) {
LOG.error("Unable to open socket to " + addr);
sock.close();
throw e;
}
initialized = false;

/*
* Reset incomingBuffer
*/
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
  • ClientCnxnSocketNIO.registerAndConnect() 方法
1
2
3
4
5
6
7
void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException {
sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
boolean immediateConnect = sock.connect(addr);
if (immediateConnect) {
sendThread.primeConnection();
}
}
  • ClientCnxnSocketNIO.primeConnection() 方法
1
2
3
4
5
6
7
8
9
10
11
void primeConnection() throws IOException {
LOG.info("Socket connection established, initiating session, client: {}, server: {}",
clientCnxnSocket.getLocalSocketAddress(),
clientCnxnSocket.getRemoteSocketAddress());

// 标记不是第一次连接
isFirstConnect = false;

......

}

第三部分代码

  • ClientCnxnSocketNIO.doTransport() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Override
void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn)
throws IOException, InterruptedException {
selector.select(waitTimeOut);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
// Everything below and until we get back to the select is
// non blocking, so time is effectively a constant. That is
// Why we just have to do this once, here
updateNow();
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (sc.finishConnect()) {
updateLastSendAndHeard();
updateSocketAddresses();
sendThread.primeConnection();
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
doIO(pendingQueue, cnxn);
}
}
if (sendThread.getZkState().isConnected()) {
if (findSendablePacket(outgoingQueue,
sendThread.tunnelAuthInProgress()) != null) {
enableWrite();
}
}
selected.clear();
}
  • ClientCnxnSocketNIO.doIO() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
void doIO(List<Packet> pendingQueue, ClientCnxn cnxn)
throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
if (sockKey.isReadable()) {
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException(
"Unable to read additional data from server sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely server has closed socket");
}
if (!incomingBuffer.hasRemaining()) {
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
recvCount.getAndIncrement();
readLength();
} else if (!initialized) {
readConnectResult();
enableRead();
if (findSendablePacket(outgoingQueue,
sendThread.tunnelAuthInProgress()) != null) {
// Since SASL authentication has completed (if client is configured to do so),
// outgoing packets waiting in the outgoingQueue can now be sent.
enableWrite();
}
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
initialized = true;
} else {
// 读取服务端的响应
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
if (sockKey.isWritable()) {
Packet p = findSendablePacket(outgoingQueue,
sendThread.tunnelAuthInProgress());

if (p != null) {
updateLastSend();
// If we already started writing p, p.bb will already exist
if (p.bb == null) {
if ((p.requestHeader != null) &&
(p.requestHeader.getType() != OpCode.ping) &&
(p.requestHeader.getType() != OpCode.auth)) {
p.requestHeader.setXid(cnxn.getXid());
}
p.createBB();
}
sock.write(p.bb);
if (!p.bb.hasRemaining()) {
sentCount.getAndIncrement();
outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {
pendingQueue.add(p);
}
}
}
}
if (outgoingQueue.isEmpty()) {
// No more packets to send: turn off write interest flag.
// Will be turned on later by a later call to enableWrite(),
// from within ZooKeeperSaslClient (if client is configured
// to attempt SASL authentication), or in either doIO() or
// in doTransport() if not.
disableWrite();
} else if (!initialized && p != null && !p.bb.hasRemaining()) {
// On initial connection, write the complete connect request
// packet, but then disable further writes until after
// receiving a successful connection response. If the
// session is expired, then the server sends the expiration
// response and immediately closes its end of the socket. If
// the client is simultaneously writing on its end, then the
// TCP stack may choose to abort with RST, in which case the
// client would never receive the session expired event. See
// http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
disableWrite();
} else {
// Just in case
enableWrite();
}
}
}

执行 ZooKeeperMain.run () 方法

  • ZooKeeperMain 类,ZK 客户端的启动入口。
1
2
3
4
5
6
7
8
9
10
11
12
public class ZooKeeperMain {

public static void main(String args[]) throws CliException, IOException, InterruptedException
{
// 创建 ZooKeeperMain 对象
ZooKeeperMain main = new ZooKeeperMain(args);
main.run();
}

......

}
  • ZooKeeperMain.run() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
void run() throws CliException, IOException, InterruptedException {
if (cl.getCommand() == null) {
System.out.println("Welcome to ZooKeeper!");

boolean jlinemissing = false;
// only use jline if it's in the classpath
try {
Class<?> consoleC = Class.forName("jline.console.ConsoleReader");
Class<?> completorC =
Class.forName("org.apache.zookeeper.JLineZNodeCompleter");

System.out.println("JLine support is enabled");

Object console =
consoleC.getConstructor().newInstance();

Object completor =
completorC.getConstructor(ZooKeeper.class).newInstance(zk);
Method addCompletor = consoleC.getMethod("addCompleter",
Class.forName("jline.console.completer.Completer"));
addCompletor.invoke(console, completor);

String line;
Method readLine = consoleC.getMethod("readLine", String.class);
while ((line = (String)readLine.invoke(console, getPrompt())) != null) {
executeLine(line);
}
} catch (ClassNotFoundException e) {
LOG.debug("Unable to start jline", e);
jlinemissing = true;
} catch (NoSuchMethodException e) {
LOG.debug("Unable to start jline", e);
jlinemissing = true;
} catch (InvocationTargetException e) {
LOG.debug("Unable to start jline", e);
jlinemissing = true;
} catch (IllegalAccessException e) {
LOG.debug("Unable to start jline", e);
jlinemissing = true;
} catch (InstantiationException e) {
LOG.debug("Unable to start jline", e);
jlinemissing = true;
}

if (jlinemissing) {
System.out.println("JLine support is disabled");
BufferedReader br =
new BufferedReader(new InputStreamReader(System.in));

String line;
while ((line = br.readLine()) != null) {
// 一行一行地读取命令
executeLine(line);
}
}
} else {
// Command line args non-null. Run what was passed.
processCmd(cl);
}
System.exit(exitCode);
}
  • ZooKeeperMain.executeLine() 方法
1
2
3
4
5
6
7
8
9
public void executeLine(String line) throws CliException, InterruptedException, IOException {
if (!line.equals("")) {
cl.parseCommand(line);
addToHistory(commandCount,line);
// 处理客户端命令
processCmd(cl);
commandCount++;
}
}
  • ZooKeeperMain.processCmd() 方法
1
2
3
4
5
6
7
8
9
10
11
12
protected boolean processCmd(MyCommandOptions co) throws CliException, IOException, InterruptedException {
boolean watch = false;
try {
// 解析客户端命令
watch = processZKCmd(co);
exitCode = 0;
} catch (CliException ex) {
exitCode = ex.getExitCode();
System.err.println(ex.getMessage());
}
return watch;
}
  • ZooKeeperMain.processZKCmd() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
protected boolean processZKCmd(MyCommandOptions co) throws CliException, IOException, InterruptedException {
String[] args = co.getArgArray();
String cmd = co.getCommand();
if (args.length < 1) {
usage();
throw new MalformedCommandException("No command entered");
}

if (!commandMap.containsKey(cmd)) {
usage();
throw new CommandNotFoundException("Command not found " + cmd);
}

boolean watch = false;
LOG.debug("Processing " + cmd);


if (cmd.equals("quit")) {
zk.close();
System.exit(exitCode);
} else if (cmd.equals("redo") && args.length >= 2) {
Integer i = Integer.decode(args[1]);
if (commandCount <= i || i < 0) { // don't allow redoing this redo
throw new MalformedCommandException("Command index out of range");
}
cl.parseCommand(history.get(i));
if (cl.getCommand().equals("redo")) {
throw new MalformedCommandException("No redoing redos");
}
history.put(commandCount, history.get(i));
processCmd(cl);
} else if (cmd.equals("history")) {
for (int i = commandCount - 10; i <= commandCount; ++i) {
if (i < 0) continue;
System.out.println(i + " - " + history.get(i));
}
} else if (cmd.equals("printwatches")) {
if (args.length == 1) {
System.out.println("printwatches is " + (printWatches ? "on" : "off"));
} else {
printWatches = args[1].equals("on");
}
} else if (cmd.equals("connect")) {
if (args.length >= 2) {
connectToZK(args[1]);
} else {
connectToZK(host);
}
}

// Below commands all need a live connection
if (zk == null || !zk.getState().isAlive()) {
System.out.println("Not connected");
return false;
}

// execute from commandMap
CliCommand cliCmd = commandMapCli.get(cmd);
if(cliCmd != null) {
cliCmd.setZk(zk);
watch = cliCmd.parse(args).exec();
} else if (!commandMap.containsKey(cmd)) {
usage();
}
return watch;
}