ZooKeeper 入门教程之三

大纲

前言

学习资源

服务器动态上下线案例

本节将演示如何使用 ZooKeeper 实现服务器动态上下线的监听,即类似 Eureka、Nacos 的服务注册与发现功能。

版本说明

组件版本说明
Zookeeper 服务器3.5.7
Zookeeper 客户端3.6.3

需求分析

在某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。

代码实现

  • 引入 Maven 依赖
1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.3</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.17.1</version>
</dependency>
  • 日志文件(log4j.properties
1
2
3
4
5
6
7
8
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
  • 分布式服务端的 Java 代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class DistributeServer {

// ZooKeeper 连接地址(多个集群节点使用逗号分割)
private static final String ADDRESS = "192.168.2.235:2181,192.168.2.235:2182,192.168.2.235:2183";

// ZooKeeper 会话超时时间
private static final int SESSION_TIMEOUT = 2000;

// 根节点的路径
private static final String ROOT_PATH = "/servers";

// ZooKeeper 客户端
private ZooKeeper client;

/**
* 初始化服务器
*/
private void init() throws Exception {
if (client == null) {
client = new ZooKeeper(ADDRESS, SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {

}
});
}
}

/**
* 注册服务器
*/
private void register(String hostname) throws Exception {
// 创建临时顺序节点
client.create(ROOT_PATH + "/server", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("===> " + hostname + " is online.");
}

/**
* 业务逻辑处理
*/
private void process() throws Exception {
// 模拟业务处理
Thread.sleep(Integer.MAX_VALUE);
}

/**
* 关闭服务器
*/
private void close() throws Exception {
if (client != null) {
client.close();
}
}

public static void main(String[] args) throws Exception {
DistributeServer distributeServer = new DistributeServer();
distributeServer.init();
distributeServer.register("192.168.1.103");
distributeServer.process();
distributeServer.close();
}

}
  • 分布式客户端的 Java 代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.util.ArrayList;
import java.util.List;

public class DistributeClient {

// ZooKeeper 连接地址(多个集群节点使用逗号分割)
private static final String ADDRESS = "192.168.2.235:2181,192.168.2.235:2182,192.168.2.235:2183";

// ZooKeeper 会话超时时间
private static final int SESSION_TIMEOUT = 2000;

// 根节点的路径
private static final String ROOT_PATH = "/servers";

// ZooKeeper 客户端
private ZooKeeper client;

/**
* 初始化客户端
*/
private void init() throws Exception {
if (client == null) {
client = new ZooKeeper(ADDRESS, SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("===> " + event.getType() + " -- " + event.getPath());
try {
// 监听器是一次性的,当接收到监听事件后,需要重新设置监听器
getServerList();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}

/**
* 获取服务器列表,并监听服务列表的变化
*/
private void getServerList() throws Exception {
List<String> resultList = new ArrayList<>();

// 获取子节点,并监听子节点的变化
List<String> children = client.getChildren("/servers", true);
for (String child : children) {
// 获取子节点的数据
byte[] data = client.getData(ROOT_PATH + "/" + child, false, null);
if (data != null) {
resultList.add(new String(data));
}
}

// 打印服务器列表
resultList.forEach(System.out::println);
}

/**
* 业务逻辑处理
*/
private void process() throws Exception {
// 模拟业务处理
Thread.sleep(Integer.MAX_VALUE);
}

/**
* 关闭客户端
*/
private void close() throws Exception {
if (client != null) {
client.close();
}
}

public static void main(String[] args) throws Exception {
DistributeClient distributeClient = new DistributeClient();
distributeClient.init();
distributeClient.getServerList();
distributeClient.process();
distributeClient.close();
}

}

代码下载

  • 完整的案例代码可以直接从 GitHub 下载对应章节 zookeeper-lesson-02

原生 API 实现分布式锁案例

本节将介绍如何使用 ZooKeeper 原生的 Java API 来实现分布式锁。

概念介绍

什么是分布式锁呢?比如说 “进程 1” 在使用该资源的时候,会先去获得锁,” 进程 1” 获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源。” 进程 1” 用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,就可以保证分布式系统中多个进程能够有序地访问该临界资源(共享资源)。通常将分布式环境下的这个锁叫作分布式锁。

底层原理

  • (1) 客户端往 ZooKeeper 发送请求后,在 /locks 节点下创建一个临时顺序节点。
  • (2) 客户端判断自己是不是 /locks 节点下序号最小的节点,如果是最小,则获取到锁;如果不是最小,则对前一个节点进行监听。
  • (3) 当客户端获取到锁,处理完业务后,通过删除自己创建的节点来释放锁,接着后面的节点将收到通知,重复第 2 步判断。

代码实现

  • 引入 Maven 依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.3</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.17.1</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.8.2</version>
</dependency>
  • 日志文件(log4j.properties
1
2
3
4
5
6
7
8
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
  • 核心代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class DistributeLock {

// ZooKeeper 连接地址(多个集群节点使用逗号分割)
private static final String ADDRESS = "192.168.2.235:2181,192.168.2.235:2182,192.168.2.235:2183";

// ZooKeeper 会话超时时间
private static final int SESSION_TIMEOUT = 2000;

// 根节点的路径
private static final String ROOT_PATH = "/locks";

// 子节点的路径前缀
private static final String PREFIX_CHILD_PATH = "seq-";

// 等待客户端建立连接的锁
private CountDownLatch connectLatch = new CountDownLatch(1);

// 等待监听前一个节点的锁
private CountDownLatch watchPreNodeLatch = new CountDownLatch(1);

// ZooKeeper 客户端
private ZooKeeper client;

// 当前客户端创建的子节点的路径
private String currentChildNodePath;

// 当前客户端监听的前一个节点的路径
private String preChildNodePath;

public DistributeLock() throws Exception {
// 初始化客户端
client = new ZooKeeper(ADDRESS, SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 监听到客户端已建立连接
if (event.getState() == Event.KeeperState.SyncConnected) {
connectLatch.countDown();
}

// 监听到前一个节点的删除(释放锁)
if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(preChildNodePath)) {
watchPreNodeLatch.countDown();
}
}
});

// 阻塞等待客户端建立连接
connectLatch.await();

// 如果根节点不存在,则创建根节点,根节点类型为永久节点
if (client.exists(ROOT_PATH, false) == null) {
client.create(ROOT_PATH, "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}

/**
* 加锁
*/
public void lock() throws Exception {
// 创建当前子节点(子节点的类型为临时顺序节点)
currentChildNodePath = client.create(ROOT_PATH + "/" + PREFIX_CHILD_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

// 获取所有子节点
List<String> children = client.getChildren(ROOT_PATH, false);

// 如果只有一个子节点,则当前客户端直接获得锁
if (children.size() == 1) {
return;
}

// 子节点根据序号进行排序
Collections.sort(children);

// 获取当前子节点的名称,如:seq-0000000009
String currentChildNodeName = currentChildNodePath.substring((ROOT_PATH + "/").length());

// 通过子节点名称,获取当前子节点在子节点集合中的位置
int currentChildNodeIndex = children.indexOf(currentChildNodeName);
if (currentChildNodeIndex == -1) {
// 抛出异常
throw new RuntimeException("Current child node not in children list.");
} else if (currentChildNodeIndex == 0) {
// 如果当前子节点是第一个子节点,则当前客户端直接获得锁
return;
} else {
// 如果当前子节点是第一个子节点,则监听前一个子节点
preChildNodePath = ROOT_PATH + "/" + children.get(currentChildNodeIndex - 1);
client.getData(preChildNodePath, true, null);
// 阻塞等待监听事件(进入等待锁状态)
watchPreNodeLatch.await();
return;
}
}

/**
* 释放锁
*/
public void unlock() {
// 删除子节点(如果指定的版本为 -1,则它与任何节点的版本匹配)
try {
client.delete(currentChildNodePath, -1);
} catch (Exception e) {
e.printStackTrace();
}
}

}
  • 测试代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class DistributeLockTest {

public static void main(String[] args) throws Exception {
DistributeLock lock1 = new DistributeLock();
DistributeLock lock2 = new DistributeLock();

new Thread(new Runnable() {
@Override
public void run() {
try {
// 获取锁
lock1.lock();
System.out.println("Thread 1 获得锁");
Thread.sleep(10000);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放锁
lock1.unlock();
System.out.println("Thread 1 释放锁");
}
}
}).start();

new Thread(new Runnable() {
@Override
public void run() {
try {
// 获取锁
lock2.lock();
System.out.println("Thread 2 获得锁");
Thread.sleep(10000);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放锁
lock2.unlock();
System.out.println("Thread 2 释放锁");
}
}
}).start();
}

}

程序执行后的输出结果:

1
2
3
4
Thread 1 获得锁
Thread 1 释放锁
Thread 2 获得锁
Thread 2 释放锁

代码下载

  • 完整的案例代码可以直接从 GitHub 下载对应章节 zookeeper-lesson-03

Curator 框架实现分布式锁案例

本节将介绍如何使用 Curator 框架来实现分布式锁,Curator 的详细介绍请看 这里

需求分析

使用 ZooKeeper 的原生 Java API 实现分布式锁时存在以下问题,因此强烈建议使用 Curator 来替代。

  • 代码复杂性高

    • 使用 ZooKeeper 的原生 API 实现分布式锁需要手动处理锁的创建、竞争、释放,以及会话失效等场景。
    • 涉及大量重复性代码(如监听器的注册和事件处理),增加开发和维护成本。
  • 会话超时处理复杂

    • ZooKeeper 会话超时后,锁可能仍然存在,其他客户端无法获取锁,导致锁的 “僵尸” 问题。
    • 需要额外实现超时检测和清理机制,这在 ZooKeeper 的原生 API 中实现较为复杂。
  • 锁重入支持困难

    • ZooKeeper 的原生 API 不直接支持重入锁功能。
    • 如果需要重入锁,必须自己实现额外的机制来追踪同一客户端对同一锁的多次加锁请求。
  • 监听事件的实现繁琐

    • ZooKeeper 的原生 API 提供了 Watcher 机制,但需要开发者自行编写逻辑来处理节点事件通知(如 NodeDeleted)。
    • 容易出现遗漏或处理不当的情况,导致锁状态不一致。
  • 锁竞争效率低

    • 使用 ZooKeeper 的原生 API 时,锁竞争通常是基于顺序节点的,客户端需要轮询或监听前序节点的删除事件。
    • 如果监听实现不当(例如频繁重连或轮询),会对 ZooKeeper 集群造成较大的压力。
  • 容错性差

    • ZooKeeper 的原生 API 不会提供高层次的容错机制,比如自动重试或连接恢复。
    • 在网络异常或 ZooKeeper 集群变更时,容易导致分布式锁不可用或误释放。
  • 缺乏高级功能

    • ZooKeeper 的原生 API 不提供诸如锁超时、锁泄漏检测等高级功能,必须由开发者自行实现。
    • 开发者需要手动处理租约机制,以避免因客户端长时间离线导致的锁占用问题。
  • 测试和调试困难

    • 原生实现涉及底层细节较多,测试分布式锁的边界情况(如会话失效、网络分区等)较为复杂。
    • 一旦出现问题,排查和定位也更加困难。

代码实现

引入依赖

  • 引入 Maven 依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>5.4.0</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.8.2</version>
</dependency>

日志配置

  • 日志文件(log4j.properties
1
2
3
4
5
6
7
8
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

代码实现一

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.jupiter.api.Test;

/**
* 使用 Curator 实现分布式锁
*/
public class DistributeLockTest {

// ZooKeeper 连接地址(多个集群节点使用逗号分割)
private static final String ADDRESS = "192.168.2.235:2181,192.168.2.235:2182,192.168.2.235:2183";

// ZooKeeper 会话超时时间
private static final int SESSION_TIMEOUT = 2000;

// 根节点的路径
private static final String ROOT_PATH = "/locks";

/**
* 获取客户端
*/
public CuratorFramework getCuratorFramework() {
// 重试策略
ExponentialBackoffRetry backoffRetry = new ExponentialBackoffRetry(3000, 3);

// 创建客户端
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(ADDRESS)
.connectionTimeoutMs(SESSION_TIMEOUT)
.retryPolicy(backoffRetry)
.build();

// 启动客户端
client.start();

return client;
}

@Test
public void lockTest() throws InterruptedException {
// 创建分布式锁
InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), ROOT_PATH);
InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), ROOT_PATH);

new Thread(new Runnable() {
@Override
public void run() {
try {
// 获取锁
lock1.acquire();
System.out.println("Thread 1 获得锁");
Thread.sleep(10000);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
// 释放锁
lock1.release();
System.out.println("Thread 1 释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}
}).start();

new Thread(new Runnable() {
@Override
public void run() {
try {
// 获取锁
lock2.acquire();
System.out.println("Thread 2 获得锁");
Thread.sleep(10000);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
// 释放锁
lock2.release();
System.out.println("Thread 2 释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}
}).start();

Thread.sleep(Integer.MAX_VALUE);

// TODO 释放资源(关闭 ZooKeeper 客户端)
}

}

程序执行后的输出结果:

1
2
3
4
Thread 2 获得锁
Thread 2 释放锁
Thread 1 获得锁
Thread 1 释放锁

代码实现二

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.jupiter.api.Test;

/**
* 验证 Curator 实现的分布式锁是可重入锁
*/
public class DistributeLockTest {

// ZooKeeper 连接地址(多个集群节点使用逗号分割)
private static final String ADDRESS = "192.168.2.235:2181,192.168.2.235:2182,192.168.2.235:2183";

// ZooKeeper 会话超时时间
private static final int SESSION_TIMEOUT = 2000;

// 根节点的路径
private static final String ROOT_PATH = "/locks";

/**
* 获取客户端
*/
public CuratorFramework getCuratorFramework() {
// 重试策略
ExponentialBackoffRetry backoffRetry = new ExponentialBackoffRetry(3000, 3);

// 创建客户端
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(ADDRESS)
.connectionTimeoutMs(SESSION_TIMEOUT)
.retryPolicy(backoffRetry)
.build();

// 启动客户端
client.start();

return client;
}

@Test
public void lockTest() throws InterruptedException {
// 创建分布式锁
InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), ROOT_PATH);
InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), ROOT_PATH);

new Thread(new Runnable() {
@Override
public void run() {
try {
// 获取锁
lock1.acquire();
System.out.println("Thread 1 获得锁");

// 再次获取锁(测试锁重入)
lock1.acquire();
System.out.println("Thread 1 再次获得锁");

Thread.sleep(10000);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
// 释放锁
lock1.release();
System.out.println("Thread 1 释放锁");

// 再次释放锁
lock1.release();
System.out.println("Thread 1 再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}
}).start();

new Thread(new Runnable() {
@Override
public void run() {
try {
// 获取锁
lock2.acquire();
System.out.println("Thread 2 获得锁");

// 再次获取锁(测试锁重入)
lock2.acquire();
System.out.println("Thread 2 再次获得锁");

Thread.sleep(10000);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
// 释放锁
lock2.release();
System.out.println("Thread 2 释放锁");

// 再次释放锁
lock2.release();
System.out.println("Thread 2 再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}
}).start();

Thread.sleep(Integer.MAX_VALUE);

// TODO 释放资源(关闭 ZooKeeper 客户端)
}

}

程序执行后的输出结果:

1
2
3
4
5
6
7
8
Thread 1 获得锁
Thread 1 再次获得锁
Thread 1 释放锁
Thread 1 再次释放锁
Thread 2 获得锁
Thread 2 再次获得锁
Thread 2 释放锁
Thread 2 再次释放锁

特别注意

  • 对于可重入锁,无论是 Java 提供的 ReentrantLock,还是 Curator 实现的分布式锁,获取多少次锁就必须释放多少次锁。

代码下载

  • 完整的案例代码可以直接从 GitHub 下载对应章节 zookeeper-lesson-04