前言
学习资源
Curator 介绍
Curator 是一个基于 ZooKeeper 的高层次客户端框架,简化了分布式应用程序与 ZooKeeper 的交互。它封装了 ZooKeeper 的原生 Java API,提供了更高的抽象级别和丰富的功能组件,特别适合实现分布式协调任务。
Curator 的特点
高效简化操作
- 封装了 ZooKeeper 的低级 API,减少了开发复杂性。
- 提供了简洁的 API 和直观的编程模型,降低了使用门槛。
丰富的分布式工具
- 提供开箱即用的分布式协调组件,例如分布式锁、领导选举、队列、计数器等。
自动化重试机制
- 内置连接管理和断线重连功能,支持可配置的重试策略(如指数退避重试),提高了容错性。
监听机制优化
- 更高效的事件监听和通知机制,避免了原生 Watcher 的复杂性和使用限制。
会话和节点管理
- 处理会话超时问题,提供自动化的临时节点管理,避免锁 “僵尸化” 问题。
模块化设计
- 分为核心模块和扩展模块(如 Curator Recipes),开发者可以按需使用。
社区支持和文档完善
- 拥有丰富的文档、教程以及社区支持,使其易于学习和使用。
Curator 的优点
简化开发
- 减少了直接使用原生 ZooKeeper API 的样板代码和复杂性。
高可靠性
- 内置重试机制和连接管理,提升分布式系统的容错性和稳定性。
功能丰富
生产级支持
灵活性高
Curator 的核心模块
Curator Client
- 功能:
- 这是 Curator 的基础模块,封装了 ZooKeeper 的原生 API,简化了与 ZooKeeper 的交互。
- 特性:
- 提供连接管理,包括自动重连和会话恢复。
- 内置可配置的重试机制(如指数退避、固定次数重试)。
- 提供异步和同步操作支持。
- 用途:
- 开发者可以使用 Curator Client 进行节点的基本 CRUD 操作(创建、读取、更新、删除),无需直接使用低级别的 ZooKeeper 原生 Java API。
Curator Framework
- 功能:
- 在 Curator Client 的基础上提供更高级的抽象,适合开发者直接使用。
- 特性:
- 包括对 Watcher(监听器)的优化和增强。
- 提供高效的路径缓存机制(Path Cache),支持节点的递归监听。
- 用途:
- 简化对 ZooKeeper 节点的管理,同时支持数据监听等功能。
Curator Recipes
- 功能:
- 提供分布式系统常见的协调工具,是 Curator 的亮点模块。
- 组件:
- 分布式锁(Distributed Lock):支持可重入锁、读写锁等。
- 领导选举(Leader Election):实现分布式系统中的主从模式(主节点选举)。
- 分布式计数器(Distributed Counter):分布式环境下的计数器实现,类似于 Java 的
AtomicInteger
。 - 分布式队列(Distributed Queue):支持普通队列和优先级队列。
- 分布式屏障(Distributed Barrier):实现同步屏障,协调多个客户端的执行顺序。
- 用途:
Curator Discovery
- 功能:
- 特性:
- 支持动态服务注册、发现和管理。
- 提供负载均衡等功能。
- 用途:
Curator Testing
- 功能:
- 特性:
- 提供嵌入式 ZooKeeper 测试服务器。
- 支持模拟真实的 ZooKeeper 集群环境。
- 用途:
- 方便开发者在本地或 CI/CD 环境中测试 ZooKeeper 相关功能。
Curator 的适用场景
Curator 实现分布式锁
需求分析
使用 ZooKeeper 的原生 Java API 实现分布式锁时存在以下问题,因此强烈建议使用 Curator 来替代。
代码复杂性高
- 使用 ZooKeeper 的原生 API 实现分布式锁需要手动处理锁的创建、竞争、释放,以及会话失效等场景。
- 涉及大量重复性代码(如监听器的注册和事件处理),增加开发和维护成本。
会话超时处理复杂
- ZooKeeper 会话超时后,锁可能仍然存在,其他客户端无法获取锁,导致锁的 “僵尸” 问题。
- 需要额外实现超时检测和清理机制,这在 ZooKeeper 的原生 API 中实现较为复杂。
锁重入支持困难
- ZooKeeper 的原生 API 不直接支持重入锁功能。
- 如果需要重入锁,必须自己实现额外的机制来追踪同一客户端对同一锁的多次加锁请求。
监听事件的实现繁琐
- ZooKeeper 的原生 API 提供了
Watcher
机制,但需要开发者自行编写逻辑来处理节点事件通知(如 NodeDeleted
)。 - 容易出现遗漏或处理不当的情况,导致锁状态不一致。
锁竞争效率低
- 使用 ZooKeeper 的原生 API 时,锁竞争通常是基于顺序节点的,客户端需要轮询或监听前序节点的删除事件。
- 如果监听实现不当(例如频繁重连或轮询),会对 ZooKeeper 集群造成较大的压力。
容错性差
- ZooKeeper 的原生 API 不会提供高层次的容错机制,比如自动重试或连接恢复。
- 在网络异常或 ZooKeeper 集群变更时,容易导致分布式锁不可用或误释放。
缺乏高级功能
- ZooKeeper 的原生 API 不提供诸如锁超时、锁泄漏检测等高级功能,必须由开发者自行实现。
- 开发者需要手动处理租约机制,以避免因客户端长时间离线导致的锁占用问题。
测试和调试困难
- 原生实现涉及底层细节较多,测试分布式锁的边界情况(如会话失效、网络分区等)较为复杂。
- 一旦出现问题,排查和定位也更加困难。
代码实现
引入依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>5.4.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>5.4.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>5.4.0</version> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter</artifactId> <version>5.8.2</version> </dependency>
|
日志配置
1 2 3 4 5 6 7 8
| log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
|
代码实现一
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
| import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; import org.junit.jupiter.api.Test;
public class DistributeLockTest {
private static final String ADDRESS = "192.168.2.235:2181,192.168.2.235:2182,192.168.2.235:2183";
private static final int SESSION_TIMEOUT = 2000;
private static final String ROOT_PATH = "/locks";
public CuratorFramework getCuratorFramework() { ExponentialBackoffRetry backoffRetry = new ExponentialBackoffRetry(3000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(ADDRESS) .connectionTimeoutMs(SESSION_TIMEOUT) .retryPolicy(backoffRetry) .build();
client.start();
return client; }
@Test public void lockTest() throws InterruptedException { InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), ROOT_PATH); InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), ROOT_PATH);
new Thread(new Runnable() { @Override public void run() { try { lock1.acquire(); System.out.println("Thread 1 获得锁"); Thread.sleep(10000); } catch (Exception e) { e.printStackTrace(); } finally { try { lock1.release(); System.out.println("Thread 1 释放锁"); } catch (Exception e) { e.printStackTrace(); } } } }).start();
new Thread(new Runnable() { @Override public void run() { try { lock2.acquire(); System.out.println("Thread 2 获得锁"); Thread.sleep(10000); } catch (Exception e) { e.printStackTrace(); } finally { try { lock2.release(); System.out.println("Thread 2 释放锁"); } catch (Exception e) { e.printStackTrace(); } } } }).start();
Thread.sleep(Integer.MAX_VALUE);
}
}
|
程序执行后的输出结果:
1 2 3 4
| Thread 2 获得锁 Thread 2 释放锁 Thread 1 获得锁 Thread 1 释放锁
|
代码实现二
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
| import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; import org.junit.jupiter.api.Test;
public class DistributeLockTest {
private static final String ADDRESS = "192.168.2.235:2181,192.168.2.235:2182,192.168.2.235:2183";
private static final int SESSION_TIMEOUT = 2000;
private static final String ROOT_PATH = "/locks";
public CuratorFramework getCuratorFramework() { ExponentialBackoffRetry backoffRetry = new ExponentialBackoffRetry(3000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(ADDRESS) .connectionTimeoutMs(SESSION_TIMEOUT) .retryPolicy(backoffRetry) .build();
client.start();
return client; }
@Test public void lockTest() throws InterruptedException { InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), ROOT_PATH); InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), ROOT_PATH);
new Thread(new Runnable() { @Override public void run() { try { lock1.acquire(); System.out.println("Thread 1 获得锁");
lock1.acquire(); System.out.println("Thread 1 再次获得锁");
Thread.sleep(10000); } catch (Exception e) { e.printStackTrace(); } finally { try { lock1.release(); System.out.println("Thread 1 释放锁");
lock1.release(); System.out.println("Thread 1 再次释放锁"); } catch (Exception e) { e.printStackTrace(); } } } }).start();
new Thread(new Runnable() { @Override public void run() { try { lock2.acquire(); System.out.println("Thread 2 获得锁");
lock2.acquire(); System.out.println("Thread 2 再次获得锁");
Thread.sleep(10000); } catch (Exception e) { e.printStackTrace(); } finally { try { lock2.release(); System.out.println("Thread 2 释放锁");
lock2.release(); System.out.println("Thread 2 再次释放锁"); } catch (Exception e) { e.printStackTrace(); } } } }).start();
Thread.sleep(Integer.MAX_VALUE);
}
}
|
程序执行后的输出结果:
1 2 3 4 5 6 7 8
| Thread 1 获得锁 Thread 1 再次获得锁 Thread 1 释放锁 Thread 1 再次释放锁 Thread 2 获得锁 Thread 2 再次获得锁 Thread 2 释放锁 Thread 2 再次释放锁
|
特别注意
- 对于可重入锁,无论是 Java 提供的 ReentrantLock,还是 Curator 实现的分布式锁,获取多少次锁就必须释放多少次锁。
代码下载
- 完整的案例代码可以直接从 GitHub 下载对应章节
zookeeper-lesson-04
。
预览: