Curator 入门使用教程

前言

学习资源

Curator 介绍

Curator 是一个基于 ZooKeeper 的高层次客户端框架,简化了分布式应用程序与 ZooKeeper 的交互。它封装了 ZooKeeper 的原生 Java API,提供了更高的抽象级别和丰富的功能组件,特别适合实现分布式协调任务。

Curator 的特点

  • 高效简化操作

    • 封装了 ZooKeeper 的低级 API,减少了开发复杂性。
    • 提供了简洁的 API 和直观的编程模型,降低了使用门槛。
  • 丰富的分布式工具

    • 提供开箱即用的分布式协调组件,例如分布式锁、领导选举、队列、计数器等。
  • 自动化重试机制

    • 内置连接管理和断线重连功能,支持可配置的重试策略(如指数退避重试),提高了容错性。
  • 监听机制优化

    • 更高效的事件监听和通知机制,避免了原生 Watcher 的复杂性和使用限制。
  • 会话和节点管理

    • 处理会话超时问题,提供自动化的临时节点管理,避免锁 “僵尸化” 问题。
  • 模块化设计

    • 分为核心模块和扩展模块(如 Curator Recipes),开发者可以按需使用。
  • 社区支持和文档完善

    • 拥有丰富的文档、教程以及社区支持,使其易于学习和使用。

Curator 的优点

  • 简化开发

    • 减少了直接使用原生 ZooKeeper API 的样板代码和复杂性。
  • 高可靠性

    • 内置重试机制和连接管理,提升分布式系统的容错性和稳定性。
  • 功能丰富

    • 提供了多种分布式协调工具,适用于不同场景。
  • 生产级支持

    • 经过大量生产环境验证,性能和稳定性都很出色。
  • 灵活性高

    • 提供模块化组件,开发者可以按需选择使用。

Curator 的核心模块

  • Curator Client

    • 功能:
      • 这是 Curator 的基础模块,封装了 ZooKeeper 的原生 API,简化了与 ZooKeeper 的交互。
    • 特性:
      • 提供连接管理,包括自动重连和会话恢复。
      • 内置可配置的重试机制(如指数退避、固定次数重试)。
      • 提供异步和同步操作支持。
    • 用途:
      • 开发者可以使用 Curator Client 进行节点的基本 CRUD 操作(创建、读取、更新、删除),无需直接使用低级别的 ZooKeeper 原生 Java API。
  • Curator Framework

    • 功能:
      • 在 Curator Client 的基础上提供更高级的抽象,适合开发者直接使用。
    • 特性:
      • 包括对 Watcher(监听器)的优化和增强。
      • 提供高效的路径缓存机制(Path Cache),支持节点的递归监听。
    • 用途:
      • 简化对 ZooKeeper 节点的管理,同时支持数据监听等功能。
  • Curator Recipes

    • 功能:
      • 提供分布式系统常见的协调工具,是 Curator 的亮点模块。
    • 组件:
      • 分布式锁(Distributed Lock):支持可重入锁、读写锁等。
      • 领导选举(Leader Election):实现分布式系统中的主从模式(主节点选举)。
      • 分布式计数器(Distributed Counter):分布式环境下的计数器实现,类似于 Java 的 AtomicInteger
      • 分布式队列(Distributed Queue):支持普通队列和优先级队列。
      • 分布式屏障(Distributed Barrier):实现同步屏障,协调多个客户端的执行顺序。
    • 用途:
      • 适用于分布式协调任务,比如资源互斥、任务调度等。
  • Curator Discovery

    • 功能:
      • 服务注册与发现模块。
    • 特性:
      • 支持动态服务注册、发现和管理。
      • 提供负载均衡等功能。
    • 用途:
      • 微服务架构中实现动态服务目录,维护服务状态。
  • Curator Testing

    • 功能:
      • 专用于测试分布式系统的工具模块。
    • 特性:
      • 提供嵌入式 ZooKeeper 测试服务器。
      • 支持模拟真实的 ZooKeeper 集群环境。
    • 用途:
      • 方便开发者在本地或 CI/CD 环境中测试 ZooKeeper 相关功能。

Curator 的适用场景

  • 分布式锁
    • 实现分布式系统中资源的互斥访问。
  • 领导选举
    • 在分布式环境中选出一个唯一的主节点。
  • 服务注册与发现
    • 动态维护服务列表,支持微服务架构。
  • 任务队列
    • 实现任务的分布式生产和消费(即分布式任务队列)。
  • 配置管理
    • 实时更新分布式系统中的配置。

Curator 实现分布式锁

需求分析

使用 ZooKeeper 的原生 Java API 实现分布式锁时存在以下问题,因此强烈建议使用 Curator 来替代。

  • 代码复杂性高

    • 使用 ZooKeeper 的原生 API 实现分布式锁需要手动处理锁的创建、竞争、释放,以及会话失效等场景。
    • 涉及大量重复性代码(如监听器的注册和事件处理),增加开发和维护成本。
  • 会话超时处理复杂

    • ZooKeeper 会话超时后,锁可能仍然存在,其他客户端无法获取锁,导致锁的 “僵尸” 问题。
    • 需要额外实现超时检测和清理机制,这在 ZooKeeper 的原生 API 中实现较为复杂。
  • 锁重入支持困难

    • ZooKeeper 的原生 API 不直接支持重入锁功能。
    • 如果需要重入锁,必须自己实现额外的机制来追踪同一客户端对同一锁的多次加锁请求。
  • 监听事件的实现繁琐

    • ZooKeeper 的原生 API 提供了 Watcher 机制,但需要开发者自行编写逻辑来处理节点事件通知(如 NodeDeleted)。
    • 容易出现遗漏或处理不当的情况,导致锁状态不一致。
  • 锁竞争效率低

    • 使用 ZooKeeper 的原生 API 时,锁竞争通常是基于顺序节点的,客户端需要轮询或监听前序节点的删除事件。
    • 如果监听实现不当(例如频繁重连或轮询),会对 ZooKeeper 集群造成较大的压力。
  • 容错性差

    • ZooKeeper 的原生 API 不会提供高层次的容错机制,比如自动重试或连接恢复。
    • 在网络异常或 ZooKeeper 集群变更时,容易导致分布式锁不可用或误释放。
  • 缺乏高级功能

    • ZooKeeper 的原生 API 不提供诸如锁超时、锁泄漏检测等高级功能,必须由开发者自行实现。
    • 开发者需要手动处理租约机制,以避免因客户端长时间离线导致的锁占用问题。
  • 测试和调试困难

    • 原生实现涉及底层细节较多,测试分布式锁的边界情况(如会话失效、网络分区等)较为复杂。
    • 一旦出现问题,排查和定位也更加困难。

代码实现

引入依赖

  • 引入 Maven 依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>5.4.0</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.8.2</version>
</dependency>

日志配置

  • 日志文件(log4j.properties
1
2
3
4
5
6
7
8
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

代码实现一

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.jupiter.api.Test;

/**
* 使用 Curator 实现分布式锁
*/
public class DistributeLockTest {

// ZooKeeper 连接地址(多个集群节点使用逗号分割)
private static final String ADDRESS = "192.168.2.235:2181,192.168.2.235:2182,192.168.2.235:2183";

// ZooKeeper 会话超时时间
private static final int SESSION_TIMEOUT = 2000;

// 根节点的路径
private static final String ROOT_PATH = "/locks";

/**
* 获取客户端
*/
public CuratorFramework getCuratorFramework() {
// 重试策略
ExponentialBackoffRetry backoffRetry = new ExponentialBackoffRetry(3000, 3);

// 创建客户端
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(ADDRESS)
.connectionTimeoutMs(SESSION_TIMEOUT)
.retryPolicy(backoffRetry)
.build();

// 启动客户端
client.start();

return client;
}

@Test
public void lockTest() throws InterruptedException {
// 创建分布式锁
InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), ROOT_PATH);
InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), ROOT_PATH);

new Thread(new Runnable() {
@Override
public void run() {
try {
// 获取锁
lock1.acquire();
System.out.println("Thread 1 获得锁");
Thread.sleep(10000);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
// 释放锁
lock1.release();
System.out.println("Thread 1 释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}
}).start();

new Thread(new Runnable() {
@Override
public void run() {
try {
// 获取锁
lock2.acquire();
System.out.println("Thread 2 获得锁");
Thread.sleep(10000);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
// 释放锁
lock2.release();
System.out.println("Thread 2 释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}
}).start();

Thread.sleep(Integer.MAX_VALUE);

// TODO 释放资源(关闭 ZooKeeper 客户端)
}

}

程序执行后的输出结果:

1
2
3
4
Thread 2 获得锁
Thread 2 释放锁
Thread 1 获得锁
Thread 1 释放锁

代码实现二

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.jupiter.api.Test;

/**
* 验证 Curator 实现的分布式锁是可重入锁
*/
public class DistributeLockTest {

// ZooKeeper 连接地址(多个集群节点使用逗号分割)
private static final String ADDRESS = "192.168.2.235:2181,192.168.2.235:2182,192.168.2.235:2183";

// ZooKeeper 会话超时时间
private static final int SESSION_TIMEOUT = 2000;

// 根节点的路径
private static final String ROOT_PATH = "/locks";

/**
* 获取客户端
*/
public CuratorFramework getCuratorFramework() {
// 重试策略
ExponentialBackoffRetry backoffRetry = new ExponentialBackoffRetry(3000, 3);

// 创建客户端
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(ADDRESS)
.connectionTimeoutMs(SESSION_TIMEOUT)
.retryPolicy(backoffRetry)
.build();

// 启动客户端
client.start();

return client;
}

@Test
public void lockTest() throws InterruptedException {
// 创建分布式锁
InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), ROOT_PATH);
InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), ROOT_PATH);

new Thread(new Runnable() {
@Override
public void run() {
try {
// 获取锁
lock1.acquire();
System.out.println("Thread 1 获得锁");

// 再次获取锁(测试锁重入)
lock1.acquire();
System.out.println("Thread 1 再次获得锁");

Thread.sleep(10000);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
// 释放锁
lock1.release();
System.out.println("Thread 1 释放锁");

// 再次释放锁
lock1.release();
System.out.println("Thread 1 再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}
}).start();

new Thread(new Runnable() {
@Override
public void run() {
try {
// 获取锁
lock2.acquire();
System.out.println("Thread 2 获得锁");

// 再次获取锁(测试锁重入)
lock2.acquire();
System.out.println("Thread 2 再次获得锁");

Thread.sleep(10000);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
// 释放锁
lock2.release();
System.out.println("Thread 2 释放锁");

// 再次释放锁
lock2.release();
System.out.println("Thread 2 再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}
}).start();

Thread.sleep(Integer.MAX_VALUE);

// TODO 释放资源(关闭 ZooKeeper 客户端)
}

}

程序执行后的输出结果:

1
2
3
4
5
6
7
8
Thread 1 获得锁
Thread 1 再次获得锁
Thread 1 释放锁
Thread 1 再次释放锁
Thread 2 获得锁
Thread 2 再次获得锁
Thread 2 释放锁
Thread 2 再次释放锁

特别注意

  • 对于可重入锁,无论是 Java 提供的 ReentrantLock,还是 Curator 实现的分布式锁,获取多少次锁就必须释放多少次锁。

代码下载

  • 完整的案例代码可以直接从 GitHub 下载对应章节 zookeeper-lesson-04