55. Zookeeper

环境安装

ubuntu 安装 java8 参考

1
2
apt update
apt install openjdk-8-jdk

Zookeeper 下载,官方地址 apache-zookeeper-3.6.0-bin.tar.gz

解压文件

1
tar -zxvf apache-zookeeper-3.6.0-bin.tar.gz

进去解压目录重新命名配置文件

1
mv zoo_sample.cfg zoo.cfg

修改数据文件目录

1
dataDir=/users/yanrs/zookeeper

服务端启动

1
2
./bin/zkServer.sh start  # 后台启动
# ./bin/zkServer.sh start-foreground # 前台启动

客户端启动

1
./bin/zkCli.sh

可以使用以下命令查看服务是否正常启动。

1
2
3
ps -ef | grep zookeeper  # 方法1
lsof -i:2181 # 方法2,查端口是否被占用
netstat -anp tcp | grep 2181 # 方法3,查端口是否被占用

简单操作

查看根节点信息

使用 ls / 查看根节点下所有的 znode

1
2
3
[zk: localhost:2181(CONNECTED) 0] ls /
[zookeeper]
[zk: localhost:2181(CONNECTED) 1]

创建节点

使用 create 创建 worders 节点,并且指定了一个空字符串,这说明我们在创建的时候不希望在 worders 节点中保存数据。这种方式创建的是持久化节点。

1
2
3
4
5
[zk: localhost:2181(CONNECTED) 1] create /worders ""
Created /worders
[zk: localhost:2181(CONNECTED) 2] ls /
[worders, zookeeper]
[zk: localhost:2181(CONNECTED) 3]

创建有序的持久化节点(只需加上 -s 即可)

1
2
3
[zk: localhost:2181(CONNECTED) 15] create -s /test testdata
Created /test0000000001
[zk: localhost:2181(CONNECTED) 16]

创建临时节点(客户端断开,临时节点就会消失)

1
2
[zk: localhost:2181(CONNECTED) 21] create -e /test_tmp testdata
Created /test_tmp

创建有序的临时节点

1
2
[zk: localhost:2181(CONNECTED) 22] create -e -s /test_tmp testdata
Created /test_tmp0000000006

总结:创建有序节点只需加上 -s 即可,创建临时节点只需加上 -e 即可

删除节点

使用 delete 加路径来删除节点信息

1
2
3
4
[zk: localhost:2181(CONNECTED) 1] delete /worders
[zk: localhost:2181(CONNECTED) 2] ls /
[zookeeper]
[zk: localhost:2181(CONNECTED) 3]

可以使用 deleteall 来递归删除节点信息

1
[zk: localhost:2181(CONNECTED) 49] deleteall /test  # 不必担心test下面是否还有子路径,全都删除

停止服务

1
./bin/zkServer.sh stop

会话

会话状态的转换依赖于发生在客户端与服务之间的各种事情

一个会话从 NOT_CONNECTED 状态开始,当 ZooKeeper 客户端初始化后转换到 CONNECTING 状态(图中的箭头1)。正常情况下,成功与 ZooKeeper 服务器建立连接后,会话转换到 CONNECTED 状态(箭头2)。当客户端与 ZooKeeper 服务器断开连接或者无法收到服务器的响应时,它就会转换回 CONNECTING 状态(箭头3)并尝试发现其他 ZooKeeper 服务器。如果可以发现另一个服务器或重连到原来的服务器,当服务器确认会话有效后,状态又会转换回 CONNECTED 状态。否则,它将会声明会话过期,然后转换到 CLOSED状态(箭头4)。应用也可以显式地关闭会话(箭头4和箭头5)。

创建一个会话时,你需要设置会话超时这个重要的参数,这个参数设置了 ZooKeeper 服务允许会话被声明为超时之前存在的时间。如果经过时间 t 之后服务接收不到这个会话的任何消息,服务就会声明会话过期。而在客户端侧,如果经过 t/3 的时间未收到任何消息,客户端将向服务器发送心跳消息。在经过 2t/3 时间后,ZooKeeper 客户端开始寻找其他的服务器,而此时它还有 t/3 时间去寻找。

在仲裁模式下,客户端有多个服务器可以连接,而在独立模式下,客户端只能尝试重新连接单个服务器。在仲裁模式中,应用需要传递可用的服务器列表给客户端,告知客户端可以连接的服务器信息并选择一个进行连接。当尝试连接到一个不同的服务器时,非常重要的是,这个服务器的 ZooKeeper 状态要与最后连接的服务器的 ZooKeeper 状态保持最新。客户端不能连接到这样的服务器:它未发现更新而客户端却已经发现的更新。ZooKeeper 通过在服务中排序更新操作来决定状态是否最新。ZooKeeper 确保每一个变化相对于所有其他已执行的更新是完全有序的。因此,如果一个客户端在位置 i 观察到一个更新,它就不能连接到只观察到 i<i 的服务器上。在 ZooKeeper 实现中,系统根据每一个更新建立的顺序来分配给事务标识符。

仲裁模式

搭建集群

在 conf/zoo.cfg 下添加配置信息

1
2
3
server.1=127.0.0.1:2222:2223
server.2=127.0.0.1:3333:4444
server.3=127.0.0.1:4444:4445

每一个 server.n 项指定了编号为 n 的 ZooKeeper 服务器使用的地址和端口号。每个 server.n 项通过冒号分隔为三部分,第一部分为服务器 n 的 IP 地址或主机名(hostname),第二部分和第三部分为 TCP 端口号,分别用于仲裁通信和群首选举。因为我们在同一个机器上运行三个服务器进程,所以我们需要在每一项中使用不同的端口号。通常,我们在不同的服务器上运行每个服务器进程,因此每个服务器项的配置可以使用相同的端口号。

复制 conf/zoo.cfg 文件,在 conf 下创建 z1/z1.cfg。复制 conf/zoo.cfg 文件,在 conf 下创建 z2/z2.cfg,并将其中的端口改为 2182,复制 conf/zoo.cfg 文件,在 conf 下创建 z3/z3.cfg,并将其中的端口改为 2183,目录结构如下:

1
2
3
4
5
6
7
8
9
10
11
[email protected]:~/apache-zookeeper-3.6.0-bin/conf# tree .
.
├── configuration.xsl
├── log4j.properties
├── z1
│?? └── z1.cfg
├── z2
│?? └── z2.cfg
├── z3
│?? └── z3.cfg
└── zoo.cfg

切换到 dataDir 所在的目录,分别创建以下目录,用于存放数据文件

1
2
3
4
5
6
mkdir z1
mkdir z1/data
mkdir z2
mkdir z2/data
mkdir z3
mkdir z3/data

当启动一个服务器时,我们需要知道启动的是哪个服务器。一个服务器通过读取data目录下一个名为myid的文件来获取服务器ID信息。可以通过以下命令来创建这些文件:

1
2
3
echo 1 > z1/data/myid
echo 2 > z2/data/myid
echo 3 > z3/data/myid

目录结构如下:

1
2
3
4
5
6
7
8
9
10
11
[email protected]:/users/yanrs/zookeeper# tree .
.
├── z1
│?? └── data
│?? └── myid
├── z2
│?? └── data
│?? └── myid
└── z3
└── data
└── myid

切换到 conf/z1 下启动 z1 服务

1
../../bin/zkServer.sh start ./z1.cfg

查看 log 文件夹中的日志,因为我们只启动了三个 ZooKeepe r服务器中的一个,所以整个服务还无法运行。在日志中我们将会看到以下形式的记录:

这个服务器疯狂地尝试连接到其他服务器,然后失败,如果我们启动另一个服务器,我们可以构成仲裁的法定人数。切换到 conf/z2 目录,执行以下命令,启动 z2。

1
../../bin/zkServer.sh start ./z2.cfg

查看 z1 和 z2 的状态,可以看到 z2 成为了 leader,z1 成为了 follower

访问集群

在 bin 目录下访问集群,访问集群时,我们需要加上 server 的地址,连接后使用 Ctrl + c 进行停止,反复多次连接会发现端口在 2181 和 2182 之间切换,还会注意到尝试 2183 端口后连接失败的消息,之后为成功连接到某一个服务器端口的消息。客户端以随机顺序连接到连接串中的服务器。这样可以用 ZooKeeper 来实现一个简单的负载均衡。不过,客户端无法指定优先选择的服务器来进行连接。

1
./zkCli.sh -server 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183

分布式锁

假设有一个应用由 n 个进程组成,这些进程尝试获取一个锁。再次强调,ZooKeeper 并未直接暴露原语,因此我们使用 ZooKeeper 的接口来管理 znode,以此来实现锁。为了获得一个锁,每个进程 p 尝试创建 znode,名为 /lock。如果进程 p 成功创建了 znode,就表示它获得了锁并可以继续执行其临界区域的代码。不过一个潜在的问题是进程 p 可能崩溃,导致这个锁永远无法释放。在这种情况下,没有任何其他进程可以再次获得这个锁,整个系统可能因死锁而失灵。为了避免这种情况,我们不得不在创建这个节点时指定 /lock 为临时节点。

其他进程因 znode 存在而创建 /lock 失败。当收到 /lock 删除的通知时,如果进程 p 还需要继续获取锁,它就继续尝试创建 /lock 的步骤,如果其他进程已经创建了,就继续监听节点。

实现主从模式

主节点

创建主节点

因为只有一个进程会成为主节点,所以一个进程成为 ZooKeeper 的主节点后必须锁定管理权。为此,进程需要创建一个临时 znode,名为 /master:

1
./zkCli.sh -server 127.0.0.1:2181

连接 2181 并创建 master 节点,其中 -e 指定该节点为临时节点,一个临时节点会在会话过期或关闭时自动被删除。,”” 引号中的为节点数据信息。

1
2
3
[zk: 127.0.0.1:2181(CONNECTED) 0] create -e /master "test data: im 2181"
Created /master
[zk: 127.0.0.1:2181(CONNECTED) 1]

查看创建的节点的信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
[zk: 127.0.0.1:2181(CONNECTED) 7] ls /  # 查看 zk 树的节点信息
[master, zookeeper]
[zk: 127.0.0.1:2181(CONNECTED) 8] get /master # 查看 /master 节点信息
test data: im 2181
[zk: 127.0.0.1:2181(CONNECTED) 9] stat /master # 查看 /master 的元数据信息
cZxid = 0x10000000f
ctime = Tue Apr 28 18:31:40 CST 2020
mZxid = 0x10000000f
mtime = Tue Apr 28 18:31:40 CST 2020
pZxid = 0x10000000f
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x100004792a60003
dataLength = 18
numChildren = 0
[zk: 127.0.0.1:2181(CONNECTED) 10]
抢占主节点

连接 2182 并尝试创建 master 节点

1
./zkCli.sh -server 127.0.0.1:2181

会提示节点已经存在

1
2
3
[zk: 127.0.0.1:2182(CONNECTED) 1] create -e /master "test data: im 2182"
Node already exists: /master
[zk: 127.0.0.1:2182(CONNECTED) 2]
监视主节点

使用 stat -w + 节点名称来为节点创建监视点

1
2
3
4
5
6
7
8
9
10
11
12
13
[zk: 127.0.0.1:2182(CONNECTED) 5] stat -w /master  # 为 /master 节点创建监视点
cZxid = 0x10000000f
ctime = Tue Apr 28 18:31:40 CST 2020
mZxid = 0x10000000f
mtime = Tue Apr 28 18:31:40 CST 2020
pZxid = 0x10000000f
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x100004792a60003
dataLength = 18
numChildren = 0
[zk: 127.0.0.1:2182(CONNECTED) 6]

删除 /master 节点,会看到监视点发出节点被删除的信息

1
2
3
4
5
6
[zk: 127.0.0.1:2182(CONNECTED) 7] delete /master   # 删除节点

WATCHER::

WatchedEvent state:SyncConnected type:NodeDeleted path:/master # 监听器效果
[zk: 127.0.0.1:2182(CONNECTED) 8]
抢占主节点

在次运行创建主节点信息时,就能创建了。因为之前的已经被删除了。

1
2
3
[zk: 127.0.0.1:2182(CONNECTED) 8] create -e /master "test data: im 2182"
Created /master
[zk: 127.0.0.1:2182(CONNECTED) 9]

从节点,任务和分配

创建三个父znode,这三个节点为持久性节点,不包含任何数据

1
2
3
4
create /workers ""
create /tasks ""
create /assign ""
ls /

在真实的应用中,这些 znode 可能由主进程在分配任务前创建,也可能由一个引导程序创建,不管这些节点是如何创建的,一旦这些节点存在了,主节点就需要监视 /workers 和 /tasks 的子节点的变化情况:

1
2
ls -w /workers
ls -w /tasks

从节点首先要通知主节点,告知从节点可以执行任务。从节点通过在 /workers 子节点下创建临时性的 znode 来进行通知,并在子节点中使用庄机名来标识自己:

1
create -e /workers/zk.yanrs.com "zk.yanrs.com:2181"

从节点创建后主节点就能收到监听信息

1
2
3
WATCHER::

WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/workers

后面操作略,主要流程信息如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
从节点告知主节点可以执行任务,并传递自己的身份(便于后续主节点分配任务给自己)
create -e /workers/zk.yanrs.com "zk.yanrs.com:2181"

从节点创建 /assign/zk.yanrs.com 节点来接收分配的任务,并监听状态
create /assign/zk.yanrs.com ""
ls -w /assign/zk.yanrs.com

客户端添加任务,并监听任务状态
create -s /tasks/task- "cmd"
ls -w /tasks/task-0000000000

主节点分配任务,这时从节点会接收到通知
create /assign/zk.yanrs.com/task-0000000000 ""

如果任务是给自己的,那么就执行任务,完成后添加 status 子节点,并标记为完成
create /tasks/task-0000000000/status "done"

# 客户端检查执行结果
get /tasks/task-0000000000

Stat 结构体

Znode 维护了一个 stat 结构,这个 stat 包含数据变化的版本号、访问控制列表变化、还有时间戳。版本号和时间戳一起,可让 Zookeeper 验证缓存和协调更新。每次 znode 的数据发生了变化,版本号就增加。例如,无论何时客户端检索数据,它也一起检索数据的版本号。并且当客户端执行更新或删除时,客户端必须提供他正在改变的znode 的版本号。如果它提供的版本号和真实的数据版本号不一致,更新将会失败。

查看某个目录的 stat 信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[zk: localhost:2181(CONNECTED) 25] stat test_tmp
Path must start with / character
[zk: localhost:2181(CONNECTED) 26] stat /test_tmp
cZxid = 0x9
ctime = Sun May 17 21:16:29 CST 2020
mZxid = 0x9
mtime = Sun May 17 21:16:29 CST 2020
pZxid = 0x9
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x10010118bfa0000
dataLength = 8
numChildren = 0

Stat 结构体:

1
2
3
4
5
6
7
8
9
10
11
czxid - 创建节点的事务的 zxid(ZooKeeper Transaction Id),即创建节点的时候自动生成的一个 ID
ctime - znode 被创建的毫秒数(从1970年开始)
mzxid - znode 最后更新的zxid
mtime - znode 最后修改的毫秒数(从1970年开始)
pZxid - znode 最后更新的子节点 zxid
cversion - znode子节点变化号,znode子节点修改次数
dataversion - znode数据变化号
aclVersion - znode访问控制列表的变化号
ephemeralOwner - 如果是临时节点,这个是znode拥有者的session id。如果不是临时节点则是0。
dataLength - znode的数据长度
numChildren - znode子节点数量

Znode

Znode = path + data + stat

Java API

简单操作

演示连接,创建节点,获取节点信息。源码地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package com.atguigu.zookeeper;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.rmi.server.ExportException;

/**
* ZK 基础使用
*/
public class Demo1 {
private static final String CONNECTSRING="127.0.0.1:2181";
private static final String PATH="/root";
private static final int TIMEOUT= 20 * 1000;

/**
* 获取 ZK 实例连接实例
* @return
* @throws IOException
*/
public ZooKeeper startZK() throws IOException {
return new ZooKeeper(CONNECTSRING, TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("连接监听");
}
});
}

/**
* 关闭 ZK
* @param zooKeeper
* @throws InterruptedException
*/
public void stopZk(ZooKeeper zooKeeper) throws InterruptedException {
if(zooKeeper!=null){
zooKeeper.close();
}
}

/**
* 创建 ZNode 节点
* @param zooKeeper
* @param path
* @param data
* @return
* @throws KeeperException
* @throws InterruptedException
*/
public String createZNode(ZooKeeper zooKeeper, String path, String data) throws KeeperException, InterruptedException {
// ZooDefs.Ids.OPEN_ACL_UNSAFE 代表无 ACL
// CreateMode.PERSISTENT 表示非临时节点
return zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}

/**
* 获取数据
* @param zooKeeper
* @param path
* @return
* @throws KeeperException
* @throws InterruptedException
*/
public String getZNode(ZooKeeper zooKeeper, String path) throws KeeperException, InterruptedException {
// 这里没有 watch,所以 watch 为 false
// 第三个参数为 Stat,传入一个空对象即可
byte[] data = zooKeeper.getData(path, false, new Stat());
return new String(data);
}

public static void main(String[] args) throws Exception {
Demo1 demo1 = new Demo1();
ZooKeeper zooKeeper = demo1.startZK();
if (zooKeeper.exists(PATH, false)==null){
demo1.createZNode(zooKeeper, PATH, "test data");
String zNode = demo1.getZNode(zooKeeper, PATH);
System.out.println(zNode);
}else{
System.out.println("znode exists!!");
}
}
}

监听器

在获取值的时候加入一个监听器,如果值发生变化就触发 triggerValue 函数。源码地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package com.atguigu.zookeeper;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;

public class Watch1 {
private static final String CONNECTSRING="127.0.0.1:2181";
private static final String PATH="/root";
private static final int TIMEOUT= 20 * 1000;

private ZooKeeper zooKeeper;

public ZooKeeper getZooKeeper() {
return zooKeeper;
}

public void setZooKeeper(ZooKeeper zooKeeper) {
this.zooKeeper = zooKeeper;
}

/**
* 获取 ZK 实例连接实例
* @return
* @throws IOException
*/
public ZooKeeper startZK() throws IOException {
return new ZooKeeper(CONNECTSRING, TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("连接监听");
}
});
}

/**
* 关闭 ZK
* @throws InterruptedException
*/
public void stopZk() throws InterruptedException {
if(this.zooKeeper!=null){
this.zooKeeper.close();
}
}

/**
* 创建 ZNode 节点
* @param path
* @param data
* @return
* @throws KeeperException
* @throws InterruptedException
*/
public String createZNode(String path, String data) throws KeeperException, InterruptedException {
// ZooDefs.Ids.OPEN_ACL_UNSAFE 代表无 ACL
// CreateMode.PERSISTENT 表示非临时节点
return this.zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}

/**
* 获取数据
* @param path
* @return
* @throws KeeperException
* @throws InterruptedException
*/
public String getZNode(String path) throws KeeperException, InterruptedException {
// 这里添加一个监听器
// 第三个参数为 Stat,传入一个空对象即可
byte[] data = this.zooKeeper.getData(path, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
String newValue = null;
try {
newValue = triggerValue(path);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("newValue:" + newValue);
}
}, new Stat());
return new String(data);
}

private String triggerValue(String path) throws KeeperException, InterruptedException {
byte[] data = this.zooKeeper.getData(path, false, new Stat());
return new String(data);
}

public static void main(String[] args) throws Exception {
Watch1 watch1 = new Watch1();
watch1.setZooKeeper(watch1.startZK());
if(watch1.getZooKeeper().exists(PATH, false)==null){
watch1.createZNode(PATH, "test data");
String zNode = watch1.getZNode(PATH);
System.out.println(zNode);
}else{
System.out.println("znode exists!!");
}

Thread.sleep(Integer.MAX_VALUE);
}
}

测试的时候,先将之前的 znode 节点删除,然后运行程序重新创建,创建后重新设置值,就能看到监听器监听到的变化,但是如果再次设置值的时候就监听不到了,原因是每个监听器只通知一次。

运行程序

查看监听器的输出

上面中的监听器通知一次后就结束了,如果一直监听某个节点呢?源码地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package com.atguigu.zookeeper;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;

public class Watch2 {
private static final String CONNECTSRING="127.0.0.1:2181";
private static final String PATH="/root";
private static final int TIMEOUT= 20 * 1000;

private ZooKeeper zooKeeper;

public ZooKeeper getZooKeeper() {
return zooKeeper;
}

public void setZooKeeper(ZooKeeper zooKeeper) {
this.zooKeeper = zooKeeper;
}

/**
* 获取 ZK 实例连接实例
* @return
* @throws IOException
*/
public ZooKeeper startZK() throws IOException {
return new ZooKeeper(CONNECTSRING, TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("连接监听");
}
});
}

/**
* 关闭 ZK
* @throws InterruptedException
*/
public void stopZk() throws InterruptedException {
if(this.zooKeeper!=null){
this.zooKeeper.close();
}
}

/**
* 创建 ZNode 节点
* @param path
* @param data
* @return
* @throws KeeperException
* @throws InterruptedException
*/
public String createZNode(String path, String data) throws KeeperException, InterruptedException {
// ZooDefs.Ids.OPEN_ACL_UNSAFE 代表无 ACL
// CreateMode.PERSISTENT 表示非临时节点
return this.zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}

/**
* 获取数据
* @param path
* @return
* @throws KeeperException
* @throws InterruptedException
*/
public String getZNode(String path) throws KeeperException, InterruptedException {
// 这里添加一个监听器
// 第三个参数为 Stat,传入一个空对象即可
byte[] data = this.zooKeeper.getData(path, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
String newValue = null;
try {
triggerValue(path);
} catch (Exception e) {
e.printStackTrace();
}
}
}, new Stat());
String oldValue = new String(data);
System.out.println("old value: " + oldValue);
return oldValue;
}

private String triggerValue(String path) throws KeeperException, InterruptedException {
byte[] data = this.zooKeeper.getData(path, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
try {
// 再次调用自己,为 path 节点添加监听器
triggerValue(path);
} catch (Exception e) {
e.printStackTrace();
}
}
}, new Stat());
String newValue = new String(data);
System.out.println("newValue: " + newValue);
return newValue;
}

public static void main(String[] args) throws Exception {
Watch2 watch1 = new Watch2();
watch1.setZooKeeper(watch1.startZK());
if(watch1.getZooKeeper().exists(PATH, false)==null){
watch1.createZNode(PATH, "test data");
String zNode = watch1.getZNode(PATH);
System.out.println(zNode);
}else{
System.out.println("znode exists!!");
}

Thread.sleep(Integer.MAX_VALUE);
}
}

上面程序只要一直改变 /root 节点的值,就会一直处罚监听器,并且在监听器中再次设置一个监听器,下次改变的时候就能发现。这样就实现了一直监听某个节点的需求。