ZooKeeper 入门教程之二

大纲

前言

学习资源

ZooKeeper 客户端的命令

客户端命令行语法

  • 启动 ZooKeeper 客户端
1
$ ./zkCli.sh -server 127.0.0.1:2181
  • 查看 ZooKeeper 的命令帮助手册
1
[zk: 127.0.0.1:2181(CONNECTED) 0] help

ZNode 节点的类型

四种常见节点类型

ZooKeeper 的节点(ZNode)是其数据存储的基本单元,每个节点有自己的路径、数据和元信息。根据用途和特性,ZooKeeper 的节点分为以下四种类型:

  • 持久节点(Persistent ZNode)

    • 定义:
      • 创建持久节点后,除非是显示删除,否则该节点将一直存在于 ZooKeeper 中。
      • 即使创建节点的客户端断开连接,节点也不会被删除。
    • 特点:
      • 不受客户端会话状态的影响,即使客户端断开连接,节点仍然存在。
      • 适用于存储长期存在的数据,如配置信息、元信息等。
    • 适用场景:
      • 配置信息存储:在分布式系统中,可以使用持久节点来存储各种配置信息,例如数据库连接信息、系统参数等。这些配置信息不会因为客户端的连接状态而改变,因此适合使用持久节点存储。
  • 临时节点(Ephemeral ZNode)

    • 定义:
      • 临时节点的生命周期与创建它的客户端会话绑定。如果创建临时节点的客户端会话结束(例如客户端与 ZooKeeper 服务器的连接断开),那么这个节点就会被自动删除。
      • 临时节点非常适合用于表示临时状态或临时任务。
    • 特点:
      • 当客户端断开会话(例如关闭连接或会话超时),节点会自动删除。
      • 无法创建临时节点的子节点(子节点必须是持久节点)。
    • 适用场景:
      • 服务注册中心:每个服务在启动时,连接到 ZooKeeper 并创建一个唯一的临时节点,节点名称通常包括服务的标识(如 IP、端口等)。客户端查询指定服务的父节点,即可获取所有在线服务的列表。ZooKeeper 的 Watcher 机制可用于实时监听服务的上下线变化。当服务实例正常关闭或因故宕机导致会话断开时,ZooKeeper 会自动删除对应的临时节点,确保注册中心能够反映实时的服务状态。
  • 持久顺序节点(Persistent Sequential ZNode)

    • 定义:
      • 持久顺序节点与持久节点类似,但是它会在节点名称后面自动添加一个单调递增的序号。这个序号是一个由 ZooKeeper 自动生成的数字,用于保持节点的顺序。
      • 持久顺序节点在创建后也不会被自动删除,除非是显式删除。
    • 特点:
      • 节点在创建后会一直存在,直到被明确删除。
      • 节点名称带有自动生成的顺序后缀,用于有序标识。
    • 适用场景:
      • 分布式任务调度:在分布式任务调度系统中,多个任务需要按顺序执行且长期存储,适合使用持久化顺序节点来实现。任务提交时,在 ZooKeeper 的 /tasks 路径下创建持久顺序节点,如 /tasks/task000000001,自动带有递增顺序号,确保任务能够按顺序处理。工作节点按顺序获取任务并处理,完成后删除对应节点。即使系统崩溃,节点数据仍存在,确保任务不会丢失。
  • 临时顺序节点(Ephemeral Sequential ZNode)

    • 定义:
      • 临时顺序节点结合了临时节点和顺序节点的特点。它会在节点名称后面自动添加一个单调递增的序号,并且在创建它的客户端会话结束后自动删除。
      • 临时顺序节点常用于实现分布式锁或者分布式队列。
    • 特点:
      • 节点的生命周期与客户端会话绑定,会话结束时自动删除。
      • 节点名称带有自动生成的顺序后缀,用于有序标识。
    • 适用场景:
      • 分布式锁:在分布式系统中,可以使用临时顺序节点来实现分布式锁。每个客户端尝试获取锁时,都创建一个临时顺序节点,然后检查自己创建的是否是最小的节点。如果是最小的节点,则表示获取了锁;否则,客户端需要监听前一个节点的删除事件,等待锁的释放。

提示

  • 创建 ZNode 时设置顺序标识,ZNode 的名称后面会附加一个顺序号,这顺序号是一个单调递增的计数器,由父节点维护。
  • 特别注意:在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序。

三种新的节点类型

在 ZooKeeper 3.5.5 或更高版本中,新增了以下三种节点类型:

  • 容器节点(Container ZNode)

    • 定义:
      • 容器节点是一个标记节点类型,它的用途是将节点创建为一个 “容器”。
    • 特点:
      • 容器节点本身不会存储数据,而是作为一个逻辑上的分隔符或者容器来组织其它节点。
      • 容器节点没有真正的数据内容,但它可以有子节点。
      • 容器节点本质上是持久节点,但会在以下条件下被自动删除:
        • 容器节点的子节点全部被删除。
        • 在一段时间内(默认由 ZooKeeper 内部机制决定),容器节点没有新的子节点被创建。
    • 创建命令:
      • 节点创建:create -c /book "python"
    • 适用场景:
      • 当需要在 ZooKeeper 中组织节点,或者以分层方式管理节点时,可以使用容器节点。
      • 适用于组织节点的逻辑结构,尤其是在需要短期存储子节点但不希望长期保留父节点的场景,比如分布式锁或分布式队列的根节点。
      • 容器节点可以用作命名空间的划分或分组的基础结构。例如,可以使用容器节点来管理多个服务或配置,便于层级化管理。
    • 注意事项:
      • 容器节点的删除是由 ZooKeeper 后台任务完成,可能存在一定延迟。
      • 容器节点仅在子节点被清空后,ZooKeeper 后台任务才会尝试删除掉它。
  • 带生存时间的持久节点(Persistent With TTL ZNode)

    • 定义:
      • 一种持久节点类型,但带有 “TTL(生存时间)” 特性。
      • TTL 表示该节点将会在指定的时间过期后自动删除。过期时间由客户端在创建节点时设置,这种节点的生命周期是有限的。
    • 特点:
      • 自动删除:节点会在指定的 TTL 到期后自动被 ZooKeeper 删除。
      • 持久性:节点会在 ZooKeeper 重启后继续存在,直到 TTL 到期。
      • 可以用于存储临时的数据,但与普通的临时节点不同,它允许节点具有一个持久性的特征,只是在一定时间后会被清理。
    • 创建命令:
      • 节点创建:create -t 60000 /book "python"
    • 适用场景:
      • 当需要存储一些临时性的、具有时效性的数据,并希望它们在过期后自动删除时。
      • 比如,可以用来存储某些配置项或缓存数据,允许系统在数据过期时自动清理,而不需要人工干预。
  • 带生存时间的持久顺序节点(Persistent Sequential With TTL ZNode)

    • 定义:
      • 一种结合了持久性、顺序性和 TTL 的节点类型。
      • 节点会被赋予一个唯一的顺序标识符(通常是数字后缀),并且也具有 “TTL(生存时间)” 特性。
    • 特点:
      • 持久性:节点会在 ZooKeeper 重启后继续存在,直到 TTL 到期。
      • 顺序性:ZooKeeper 会为该节点分配一个自增的序列号,这个序列号会在节点名称后附加。
      • 自动删除:节点会在 TTL 到期后被自动删除。
    • 创建命令:
      • 节点创建:create -s -t 60000 /book "python"
    • 适用场景:
      • 适用于限时的分布式锁、分布式队列等,保证在特定时间后节点过期并清理。
      • 适用于需要生成顺序节点,并且这些节点具有时效性的业务场景。例如,在分布式任务调度中,可以为每个任务分配一个顺序编号,并且在任务完成后自动删除这些顺序编号的节点。

ZNode 节点的创建

持久节点的创建

  • 创建持久节点
1
2
[zk: 127.0.0.1:2181(CONNECTED) 0] create /backend "backend"
Created /backend
  • 获取节点的值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[zk: 127.0.0.1:2181(CONNECTED) 1] get /backend
backend

[zk: 127.0.0.1:2181(CONNECTED) 2] get -s /backend
backend
cZxid = 0x300000020
ctime = Sun Dec 29 17:11:26 CST 2021
mZxid = 0x300000020
mtime = Sun Dec 29 17:11:26 CST 2021
pZxid = 0x300000021
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 7
numChildren = 1

临时节点的创建

  • 创建临时节点
1
2
[zk: 127.0.0.1:2181(CONNECTED) 0] create -e /backend "backend"
Created /backend
  • 查看根目录下的节点列表
1
2
[zk: 127.0.0.1:2181(CONNECTED) 1] ls /
[backend, zookeeper]
  • 退出客户端
1
[zk: 127.0.0.1:2181(CONNECTED) 2] quit
  • 客户端重新连接后,查看临时节点是否被删除
1
2
[zk: 127.0.0.1:2181(CONNECTED) 3] ls /
[zookeeper]

持久顺序节点的创建

  • 首先创建一个普通的根节点(持久节点)
1
2
[zk: 127.0.0.1:2181(CONNECTED) 0] create /backend "backend"
Created /backend
  • 创建持久顺序节点
1
2
3
4
5
6
7
8
[zk: 127.0.0.1:2181(CONNECTED) 1] create -s /backend/python "python"
Created /backend/python0000000000

[zk: 127.0.0.1:2181(CONNECTED) 2] create -s /backend/python "python"
Created /backend/python0000000001

[zk: 127.0.0.1:2181(CONNECTED) 3] create -s /backend/python "python"
Created /backend/python0000000002
  • 查看子节点列表
1
2
[zk: 127.0.0.1:2181(CONNECTED) 4] ls /backend
[python0000000000, python0000000001, python0000000002]

临时顺序节点的创建

  • 首先创建一个普通的根节点(持久节点)
1
2
[zk: 127.0.0.1:2181(CONNECTED) 0] create /backend "backend"
Created /backend
  • 创建临时顺序节点
1
2
3
4
5
6
7
8
[zk: 127.0.0.1:2181(CONNECTED) 1] create -e -s /backend/python "python"
Created /backend/python0000000000

[zk: 127.0.0.1:2181(CONNECTED) 2] create -e -s /backend/python "python"
Created /backend/python0000000001

[zk: 127.0.0.1:2181(CONNECTED) 3] create -e -s /backend/python "python"
Created /backend/python0000000002
  • 查看子节点列表
1
2
[zk: 127.0.0.1:2181(CONNECTED) 4] ls /backend
[python0000000000, python0000000001, python0000000002]
  • 退出客户端
1
[zk: 127.0.0.1:2181(CONNECTED) 5] quit
  • 客户端重新连接后,查看临时节点是否被删除
1
2
[zk: 127.0.0.1:2181(CONNECTED) 6] ls /backend
[]

ZNode 节点的删除

  • 删除节点
1
[zk: 127.0.0.1:2181(CONNECTED) 0] delete /backend
  • 递归删除节点
1
[zk: 127.0.0.1:2181(CONNECTED) 1] delete /backend/python

修改 ZNode 节点的值

  • 首先创建一个节点
1
[zk: 127.0.0.1:2181(CONNECTED) 0] create /backend "backend"
  • 获取节点的值
1
2
3
4
5
6
7
8
9
10
11
12
13
[zk: 127.0.0.1:2181(CONNECTED) 1] get -s /backend
backend
cZxid = 0x300000055
ctime = Sun Dec 29 17:59:20 CST 2021
mZxid = 0x300000055
mtime = Sun Dec 29 17:59:20 CST 2021
pZxid = 0x300000055
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 7
numChildren = 0
  • 修改节点的值
1
[zk: 127.0.0.1:2181(CONNECTED) 2] set /backend "backend_update"
  • 再次获取节点的值
1
2
3
4
5
6
7
8
9
10
11
12
13
[zk: 127.0.0.1:2181(CONNECTED) 3] get -s /backend
backend_update
cZxid = 0x300000055
ctime = Sun Dec 29 17:59:20 CST 2021
mZxid = 0x300000056
mtime = Sun Dec 29 18:01:13 CST 2021
pZxid = 0x300000055
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 14
numChildren = 0

获取 ZNode 节点的值

  • 首先创建一个节点
1
[zk: 127.0.0.1:2181(CONNECTED) 0] create /backend "backend"
  • 获取节点的值
1
2
[zk: 127.0.0.1:2181(CONNECTED) 1] get /backend
backend
  • 获取节点的值(附带详细信息)
1
2
3
4
5
6
7
8
9
10
11
12
13
[zk: 127.0.0.1:2181(CONNECTED) 2] get -s /backend
backend
cZxid = 0x300000055
ctime = Sun Dec 29 17:59:20 CST 2021
mZxid = 0x300000055
mtime = Sun Dec 29 17:59:20 CST 2021
pZxid = 0x300000055
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 7
numChildren = 0

获取 ZNode 节点的信息

  • 查看节点中所包含的子节点
1
2
[zk: 127.0.0.1:2181(CONNECTED) 0] ls /
[zookeeper]
  • 查看节点的详细信息
1
2
3
4
5
6
7
8
9
10
11
12
[zk: 127.0.0.1:2181(CONNECTED) 1] ls -s /
[zookeeper]cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x30000001d
cversion = 3
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1

查看 ZNode 节点的状态

  • 首先创建一个节点
1
[zk: 127.0.0.1:2181(CONNECTED) 0] create /backend "backend"
  • 查看节点的状态
1
2
3
4
5
6
7
8
9
10
11
12
[zk: 127.0.0.1:2181(CONNECTED) 0] stat /backend
cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x30000005d
cversion = 32
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 2
  • 字段信息说明
字段含义
czxid 节点创建时的事务 ID(ZXID 是事务的唯一标识)
ctime 节点创建的时间戳(以毫秒为单位,从 1970 年开始)
mzxid 节点最后一次更新时的事务 ID(ZXID)
mtime 节点最后一次更新的时间戳(以毫秒为单位,从 1970 年开始)
pzxid 最后一次修改子节点列表的事务 ID(ZXID)
cversion 节点子节点的版本号(即子节点修改的次数)
dataVersion 节点数据的版本号
aclVersion 节点访问控制列表(ACL)的版本号
ephemeralOwner 如果是临时节点,表示创建该节点的会话 ID(Session ID);如果不是临时节点,则是 0
dataLength 节点存储的数据大小(以字节为单位)
numChildren 节点的直接子节点数量

监听 ZNode 节点的变化

客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变、节点删除、子目录节点增加 / 删除)时,ZooKeeper 服务器会通知客户端。监听机制可以保证 ZooKeeper 保存的任何数据的任何改变,都能快速地反应给监听了该节点的应用程序。

监听器的原理

  • 监听器的工作原理
    • (1) 首先要有一个 main 线程。
    • (2) 在 main 线程中创建 Zookeeper 客户端,这时就会创建两个线程,一个负责网络连接通信(Connect),一个负责监听(Listener)。
    • (3) 通过 Connect 线程将注册的监听事件发送给 Zookeeper 服务器。
    • (4) 在 Zookeeper 服务器中,将客户端注册的监听事件添加到监听器列表。
    • (5) 当 Zookeeper 服务器监听到有数据或子节点发生变化,就会将这个消息发送给客户端的 Listener 线程。
    • (6) 在客户端的 Listener 线程内,调用 process() 方法进行消息处理。

监听命令的语法

  • 监听节点数据的变化

    • get -w path
  • 监听子节点增减的变化

    • ls -w path

监听节点的值变化

  • 首先创建一个节点
1
2
[zk: 127.0.0.1:2181(CONNECTED) 0] create /backend "backend"
Created /backend
  • 在集群节点一上注册监听 /backend 节点的数据变化
1
[zk: 127.0.0.1:2181(CONNECTED) 1] get -w /backend
  • 在集群节点二上修改 /backend 节点的数据
1
[zk: 127.0.0.1:2181(CONNECTED) 2] set /backend "backend_changed"
  • 观察集群节点一上接收到数据变化的监听
1
2
3
WATCHER::

WatchedEvent state:SyncConnected type:NodeDataChanged path:/backend

特别注意

ZooKeeper 的监听器注册一次,只能监听一次。如果想再次监听,则需要再次注册。

监听节点的子节点变化

  • 首先创建一个节点
1
2
[zk: 127.0.0.1:2181(CONNECTED) 0] create /backend "backend"
Created /backend
  • 在集群节点一上注册监听 /backend 节点的子节点变化
1
2
[zk: 127.0.0.1:2181(CONNECTED) 1] ls -w /backend
[]
  • 在集群节点二上为 /backend 节点创建子节点
1
2
[zk: 127.0.0.1:2181(CONNECTED) 2] create /backend/java
Created /backend/java
  • 观察集群节点一上接收到子节点变化的监听
1
2
3
WATCHER::

WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/backend

特别注意

ZooKeeper 的监听器注册一次,只能监听一次。如果想再次监听,则需要再次注册。

ZooKeeper 客户端的 API

本节将演示如何使用 ZooKeeper 原生客户端(并不是 Curator 客户端)的 API,比如创建节点、监听子节点变化、判断节点是否存在,完整的案例代码可以直接从 GitHub 下载对应章节 zookeeper-lesson-01

  • 引入 Maven 依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.3</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.17.1</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.8.2</version>
</dependency>
  • 日志文件(log4j.properties
1
2
3
4
5
6
7
8
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
  • Java 案例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public class ZKClientTest {

// ZooKeeper 连接地址(多个集群节点使用逗号分割)
private static final String ADDRESS = "192.168.2.235:2181,192.168.2.235:2182,192.168.2.235:2183";

// ZooKeeper 会话超时时间
private static final int SESSION_TIMEOUT = 2000;

// ZooKeeper 客户端
private static ZooKeeper client;

/**
* 初始化 ZK 客户端
*/
@BeforeAll
public static void init() throws Exception {
// 创建 ZK 客户端,并设置全局监听器
client = new ZooKeeper(ADDRESS, SESSION_TIMEOUT, new Watcher() {

/**
* 接收到监听事件后的回调函数(用户的业务逻辑)
*/
@Override
public void process(WatchedEvent event) {
// 获取监听事件的信息
System.out.println("===> Watcher: " + event.getType() + " -- " + event.getPath());

// 监听器是一次性的,当接收到监听事件后,需要重新设置监听器
try {
List<String> children = client.getChildren("/", true);
for (String path : children) {
System.out.println("===> " + path);
}
} catch (Exception e) {
e.printStackTrace();
}
}

});
}

/**
* 创建节点
*/
@Test
public void createNode() throws Exception {
String path = client.create("/java", "Hello World".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("===> " + path);
}

/**
* 获取子节点,并监听子节点的变化
* <p> 监听器是一次性的,当接收到监听事件后,需要重新设置监听器
*/
@Test
public void getChildren() throws Exception {
List<String> children = client.getChildren("/", true);
for (String path : children) {
System.out.println("===> " + path);
}
// 延时阻塞
System.in.read();
}

/**
* 判断节点是否存在
*/
@Test
public void exist() throws Exception {
Stat stat = client.exists("/java", false);
System.out.println(stat == null ? "not exist" : "exist");
}

}

特别注意

  • ZooKeeper 的监听器是一次性的,当客户端接收到监听事件后,需要让客户端重新设置监听器。
  • 上述案例代码中,ZooKeeper 客户端的版本是 3.6.3,而 ZooKeeper 服务器的版本是 3.5.7

ZooKeeper 客户端的写入原理

当 ZooKeeper 客户端将写请求直接发送给 Leader 节点时,ZooKeeper 服务器的处理流程如下图所示

  • (1) Leader 节点让 Follwer 节点写入数据。
  • (2) 只要满足一半以上节点写入成功(半数写入),Leader 节点就会立刻返回 ACK(确认消息)给客户端。
  • (3) 最后 Leader 节点继续让其他 Follower 节点写入数据。

当 ZooKeeper 客户端将写请求发送给 Follower 节点,ZooKeeper 服务器的处理流程如下图所示

  • (1) Follower 节点会将写请求交给 Leader 节点。
  • (2) Leader 节点让 Follwer 节点写入数据。
  • (3) 只要满足一半以上节点写入成功(半数写入),Leader 节点就会立刻返回 ACK(确认消息)给 Follower 节点,然后 Follower 节点再返回 ACK(确认消息)给客户端。
  • (4) 最后 Leader 节点继续让其他 Follower 节点写入数据。