大纲
前言
学习资源
服务器动态上下线案例
本节将演示如何使用 ZooKeeper 实现服务器动态上下线的监听,即类似 Eureka、Nacos 的服务注册与发现功能。
版本说明
| 组件 | 版本 | 说明 |
|---|---|---|
| Zookeeper 服务器 | 3.5.7 | |
| Zookeeper 客户端 | 3.6.3 | |
需求分析
在某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。
代码实现
1 2 3 4 5 6 7 8 9 10
| <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.6.3</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.17.1</version> </dependency>
|
1 2 3 4 5 6 7 8
| log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper;
public class DistributeServer {
private static final String ADDRESS = "192.168.2.235:2181,192.168.2.235:2182,192.168.2.235:2183";
private static final int SESSION_TIMEOUT = 2000;
private static final String ROOT_PATH = "/servers";
private ZooKeeper client;
private void init() throws Exception { if (client == null) { client = new ZooKeeper(ADDRESS, SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent event) {
} }); } }
private void register(String hostname) throws Exception { client.create(ROOT_PATH + "/server", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("===> " + hostname + " is online."); }
private void process() throws Exception { Thread.sleep(Integer.MAX_VALUE); }
private void close() throws Exception { if (client != null) { client.close(); } }
public static void main(String[] args) throws Exception { DistributeServer distributeServer = new DistributeServer(); distributeServer.init(); distributeServer.register("192.168.1.103"); distributeServer.process(); distributeServer.close(); }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
| import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper;
import java.util.ArrayList; import java.util.List;
public class DistributeClient {
private static final String ADDRESS = "192.168.2.235:2181,192.168.2.235:2182,192.168.2.235:2183";
private static final int SESSION_TIMEOUT = 2000;
private static final String ROOT_PATH = "/servers";
private ZooKeeper client;
private void init() throws Exception { if (client == null) { client = new ZooKeeper(ADDRESS, SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent event) { System.out.println("===> " + event.getType() + " -- " + event.getPath()); try { getServerList(); } catch (Exception e) { e.printStackTrace(); } } }); } }
private void getServerList() throws Exception { List<String> resultList = new ArrayList<>();
List<String> children = client.getChildren("/servers", true); for (String child : children) { byte[] data = client.getData(ROOT_PATH + "/" + child, false, null); if (data != null) { resultList.add(new String(data)); } }
resultList.forEach(System.out::println); }
private void process() throws Exception { Thread.sleep(Integer.MAX_VALUE); }
private void close() throws Exception { if (client != null) { client.close(); } }
public static void main(String[] args) throws Exception { DistributeClient distributeClient = new DistributeClient(); distributeClient.init(); distributeClient.getServerList(); distributeClient.process(); distributeClient.close(); }
}
|
代码下载
- 完整的案例代码可以直接从 GitHub 下载对应章节
zookeeper-lesson-02
。
原生 API 实现分布式锁案例
本节将介绍如何使用 ZooKeeper 原生的 Java API 来实现分布式锁。
概念介绍
什么是分布式锁呢?比如说 “进程 1” 在使用该资源的时候,会先去获得锁,“进程 1” 获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源。“进程 1” 用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,就可以保证分布式系统中多个进程能够有序地访问该临界资源(共享资源)。通常将分布式环境下的这个锁叫作分布式锁。
底层原理
- (1) 客户端往 ZooKeeper 发送请求后,在 `/locks` 节点下创建一个临时顺序节点。
- (2) 客户端判断自己是不是 `/locks` 节点下序号最小的节点,如果是最小,则获取到锁;如果不是最小,则对前一个节点进行监听。
- (3) 当客户端获取到锁,处理完业务后,通过删除自己创建的节点来释放锁,接着后面的节点将收到通知,重复第 2 步判断。
代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.6.3</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.17.1</version> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter</artifactId> <version>5.8.2</version> </dependency>
|
1 2 3 4 5 6 7 8
| log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
| import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper;
import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch;
public class DistributeLock {
private static final String ADDRESS = "192.168.2.235:2181,192.168.2.235:2182,192.168.2.235:2183";
private static final int SESSION_TIMEOUT = 2000;
private static final String ROOT_PATH = "/locks";
private static final String PREFIX_CHILD_PATH = "seq-";
private CountDownLatch connectLatch = new CountDownLatch(1);
private CountDownLatch watchPreNodeLatch = new CountDownLatch(1);
private ZooKeeper client;
private String currentChildNodePath;
private String preChildNodePath;
public DistributeLock() throws Exception { client = new ZooKeeper(ADDRESS, SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { connectLatch.countDown(); }
if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(preChildNodePath)) { watchPreNodeLatch.countDown(); } } });
connectLatch.await();
if (client.exists(ROOT_PATH, false) == null) { client.create(ROOT_PATH, "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } }
public void lock() throws Exception { currentChildNodePath = client.create(ROOT_PATH + "/" + PREFIX_CHILD_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
List<String> children = client.getChildren(ROOT_PATH, false);
if (children.size() == 1) { return; }
Collections.sort(children);
String currentChildNodeName = currentChildNodePath.substring((ROOT_PATH + "/").length());
int currentChildNodeIndex = children.indexOf(currentChildNodeName); if (currentChildNodeIndex == -1) { throw new RuntimeException("Current child node not in children list."); } else if (currentChildNodeIndex == 0) { return; } else { preChildNodePath = ROOT_PATH + "/" + children.get(currentChildNodeIndex - 1); client.getData(preChildNodePath, true, null); watchPreNodeLatch.await(); return; } }
public void unlock() { try { client.delete(currentChildNodePath, -1); } catch (Exception e) { e.printStackTrace(); } }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| public class DistributeLockTest {
public static void main(String[] args) throws Exception { DistributeLock lock1 = new DistributeLock(); DistributeLock lock2 = new DistributeLock();
new Thread(new Runnable() { @Override public void run() { try { lock1.lock(); System.out.println("Thread 1 获得锁"); Thread.sleep(10000); } catch (Exception e) { e.printStackTrace(); } finally { lock1.unlock(); System.out.println("Thread 1 释放锁"); } } }).start(); new Thread(new Runnable() { @Override public void run() { try { lock2.lock(); System.out.println("Thread 2 获得锁"); Thread.sleep(10000); } catch (Exception e) { e.printStackTrace(); } finally { lock2.unlock(); System.out.println("Thread 2 释放锁"); } } }).start(); }
}
|
程序执行后的输出结果:
1 2 3 4
| Thread 1 获得锁 Thread 1 释放锁 Thread 2 获得锁 Thread 2 释放锁
|
代码下载
- 完整的案例代码可以直接从 GitHub 下载对应章节
zookeeper-lesson-03
。
Curator 框架实现分布式锁案例
本节将介绍如何使用 Curator 框架来实现分布式锁,Curator 的详细介绍请看 这里。
需求分析
使用 ZooKeeper 的原生 Java API 实现分布式锁时存在以下问题,因此强烈建议使用 Curator 来替代。
代码复杂性高
- 使用 ZooKeeper 的原生 API 实现分布式锁需要手动处理锁的创建、竞争、释放,以及会话失效等场景。
- 涉及大量重复性代码(如监听器的注册和事件处理),增加开发和维护成本。
会话超时处理复杂
- ZooKeeper 会话超时后,锁可能仍然存在,其他客户端无法获取锁,导致锁的 “僵尸” 问题。
- 需要额外实现超时检测和清理机制,这在 ZooKeeper 的原生 API 中实现较为复杂。
锁重入支持困难
- ZooKeeper 的原生 API 不直接支持重入锁功能。
- 如果需要重入锁,必须自己实现额外的机制来追踪同一客户端对同一锁的多次加锁请求。
监听事件的实现繁琐
- ZooKeeper 的原生 API 提供了 `Watcher` 机制,但需要开发者自行编写逻辑来处理节点事件通知(如 `NodeDeleted`)。
- 容易出现遗漏或处理不当的情况,导致锁状态不一致。
锁竞争效率低
- 使用 ZooKeeper 的原生 API 时,锁竞争通常是基于顺序节点的,客户端需要轮询或监听前序节点的删除事件。
- 如果监听实现不当(例如频繁重连或轮询),会对 ZooKeeper 集群造成较大的压力。
容错性差
- ZooKeeper 的原生 API 不会提供高层次的容错机制,比如自动重试或连接恢复。
- 在网络异常或 ZooKeeper 集群变更时,容易导致分布式锁不可用或误释放。
缺乏高级功能
- ZooKeeper 的原生 API 不提供诸如锁超时、锁泄漏检测等高级功能,必须由开发者自行实现。
- 开发者需要手动处理租约机制,以避免因客户端长时间离线导致的锁占用问题。
测试和调试困难
- 原生实现涉及底层细节较多,测试分布式锁的边界情况(如会话失效、网络分区等)较为复杂。
- 一旦出现问题,排查和定位也更加困难。
代码实现
引入依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>5.4.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>5.4.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>5.4.0</version> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter</artifactId> <version>5.8.2</version> </dependency>
|
日志配置
1 2 3 4 5 6 7 8
| log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
|
代码实现一
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
| import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; import org.junit.jupiter.api.Test;
public class DistributeLockTest {

    private static final String ADDRESS = "192.168.2.235:2181,192.168.2.235:2182,192.168.2.235:2183";

    private static final int SESSION_TIMEOUT = 2000;

    private static final String ROOT_PATH = "/locks";

    /**
     * Builds and starts a Curator client.
     *
     * <p>Fix: the original passed {@code SESSION_TIMEOUT} only to
     * {@code connectionTimeoutMs} and never configured the session timeout at
     * all; both are now set explicitly so the constant does what its name says.
     *
     * @return a started {@link CuratorFramework} instance
     */
    public CuratorFramework getCuratorFramework() {
        // Retry with exponential backoff: base 3 s, at most 3 attempts.
        ExponentialBackoffRetry backoffRetry = new ExponentialBackoffRetry(3000, 3);

        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(ADDRESS)
                .sessionTimeoutMs(SESSION_TIMEOUT)
                .connectionTimeoutMs(SESSION_TIMEOUT)
                .retryPolicy(backoffRetry)
                .build();

        client.start();

        return client;
    }

    /**
     * Two threads compete for the same {@link InterProcessMutex} path; whichever
     * acquires it first holds the lock for ten seconds before releasing.
     */
    @Test
    public void lockTest() throws InterruptedException {
        InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), ROOT_PATH);
        InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), ROOT_PATH);

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock1.acquire();
                    System.out.println("Thread 1 获得锁");
                    Thread.sleep(10000);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    try {
                        lock1.release();
                        System.out.println("Thread 1 释放锁");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock2.acquire();
                    System.out.println("Thread 2 获得锁");
                    Thread.sleep(10000);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    try {
                        lock2.release();
                        System.out.println("Thread 2 释放锁");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();

        // Keep the JUnit thread alive so the worker threads can finish.
        Thread.sleep(Integer.MAX_VALUE);
    }
}
|
程序执行后的输出结果:
1 2 3 4
| Thread 2 获得锁 Thread 2 释放锁 Thread 1 获得锁 Thread 1 释放锁
|
代码实现二
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
| import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; import org.junit.jupiter.api.Test;
public class DistributeLockTest {

    private static final String ADDRESS = "192.168.2.235:2181,192.168.2.235:2182,192.168.2.235:2183";

    private static final int SESSION_TIMEOUT = 2000;

    private static final String ROOT_PATH = "/locks";

    /**
     * Builds and starts a Curator client.
     *
     * <p>Fix: the original passed {@code SESSION_TIMEOUT} only to
     * {@code connectionTimeoutMs} and never configured the session timeout at
     * all; both are now set explicitly so the constant does what its name says.
     *
     * @return a started {@link CuratorFramework} instance
     */
    public CuratorFramework getCuratorFramework() {
        // Retry with exponential backoff: base 3 s, at most 3 attempts.
        ExponentialBackoffRetry backoffRetry = new ExponentialBackoffRetry(3000, 3);

        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(ADDRESS)
                .sessionTimeoutMs(SESSION_TIMEOUT)
                .connectionTimeoutMs(SESSION_TIMEOUT)
                .retryPolicy(backoffRetry)
                .build();

        client.start();

        return client;
    }

    /**
     * Demonstrates reentrancy: each thread acquires the same
     * {@link InterProcessMutex} twice and must therefore release it twice -
     * acquire count and release count must always match.
     */
    @Test
    public void lockTest() throws InterruptedException {
        InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), ROOT_PATH);
        InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), ROOT_PATH);

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock1.acquire();
                    System.out.println("Thread 1 获得锁");

                    lock1.acquire();
                    System.out.println("Thread 1 再次获得锁");

                    Thread.sleep(10000);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    try {
                        lock1.release();
                        System.out.println("Thread 1 释放锁");

                        lock1.release();
                        System.out.println("Thread 1 再次释放锁");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock2.acquire();
                    System.out.println("Thread 2 获得锁");

                    lock2.acquire();
                    System.out.println("Thread 2 再次获得锁");

                    Thread.sleep(10000);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    try {
                        lock2.release();
                        System.out.println("Thread 2 释放锁");

                        lock2.release();
                        System.out.println("Thread 2 再次释放锁");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();

        // Keep the JUnit thread alive so the worker threads can finish.
        Thread.sleep(Integer.MAX_VALUE);
    }
}
|
程序执行后的输出结果:
1 2 3 4 5 6 7 8
| Thread 1 获得锁 Thread 1 再次获得锁 Thread 1 释放锁 Thread 1 再次释放锁 Thread 2 获得锁 Thread 2 再次获得锁 Thread 2 释放锁 Thread 2 再次释放锁
|
特别注意
- 对于可重入锁,无论是 Java 提供的 ReentrantLock,还是 Curator 实现的分布式锁,获取多少次锁就必须释放多少次锁。
代码下载
- 完整的案例代码可以直接从 GitHub 下载对应章节
zookeeper-lesson-04
。