ZooKeeper系统模型
数据模型ZNode
在ZooKeeper中,数据信息被保存在⼀个个数据节点上,这些节点被称为ZNode。ZNode 是ZooKeeper的最小数据单元,在ZNode下面又可以再挂ZNode。这样一层一层下去就形成了一个ZNode树,我们称之为ZNode Tree,它采用了类似文件系统的层级树状结构进行管理。
在ZooKeeper中,每一个数据节点都是一个ZNode,上图根目录下有两个节点,分别是app1和app2。其中app1下面又有三个子节点,所有的ZNode按层次进行组织,形成一棵树。ZNode的节点路径标识方式和Unix文件系统路径非常相似,都是由一系列使用斜杠(/)进行分割的路径表示,开发人员可以向这个节点写入数据,也可以在该节点下创建子节点。
ZNode的类型
刚刚我们已经了解到,ZooKeeper的ZNode Tree是由一系列数据节点组成的。接下来,我们对数据节点的类型做一下介绍。ZooKeeper节点类型可以分为三大类:持久性节点(Persistent)、临时性节点(Ephemeral)和顺序性节点(Sequential)。
在开发中在创建节点的时候通过组合可以生成以下四种节点类型:持久节点、持久顺序节点、临时节点、临时顺序节点。不同类型的节点则会有不同的生命周期:
- 持久节点:是ZooKeeper中最常见的一种节点类型。所谓持久节点,就是指节点被创建后会一直存在服务器,知道删除惭怍主动清除
- 持久顺序节点:就是有顺序的持久节点,节点性质和持久节点是一样的,只是额外特性表现在顺序上。顺序特性实质是在创建节点的时候,会在节点名后面加上一个数字后缀,来表示其顺序
- 临时节点:就是会被自动清理掉的节点。它的生命周期和客户端会话绑在一起,客户端会话结束之后,节点被自动删除。与持久性节点不同的是,临时节点不能创建子节点
- 临时顺序节点:就是有顺序的临时节点。和持久顺序节点相同,在其创建的时候会在名字后面加上数字后缀
事务ID
事务是对物理和抽象的应用状态上的操作集合。往往在现在的概念中,狭义上的事务通常指的是数据库事务,一般包含了一系列对数据库有序的读写操作。这些数据库书屋具有的所谓的ACID特性,即原子性(Atomic)、一致性(Consistency)、隔离性(Isolation)和持久性(Durability)。
而在ZooKeeper中,事务是指能够改变ZooKeeper服务器状态的操作,我们也成之为事务操作或更新操作。一般包括数据节点创建与删除、数据节点内容更新等操作。对于每一个事务请求,ZooKeeper都会为其分配一个全局唯一的事务ID,用ZXID来表示,通常是一个64位的数字。每一个ZXID对应一次更新操作,从这些ZXID中可以间接地识别出ZooKeeper处理这些更新操作的全局顺序。
ZNode的状态信息

整个ZNode节点内容包括两个部分:节点数据内容和节点状态信息。图中quota是数据内容,其他的属于状态信息,详情如下:
- cZxid:就是Create ZXID,表示节点被创建时的事务ID
- ctime:就是Create Time,表示节点的创建时间
- mZxid:就是Modified ZXID,表示节点最后一次被修改时的事务ID
- mtime:就是Modified Time,表示节后最后一次修改的时间
- pZxid:表示该节点额子节点列表最后一个被修改时的事务ID。只有子节点列表变更才会更新pZxid,子节点内容变更不会更新该属性
- cversion:表示子节点的版本号
- dataVersion:表示内容版本号
- aclVersion:表示acl版本
- ephemeralOwner:表示创建该临时节点时的会话session ID,如果是持久性节点那么值为0
- dataLength:表示数据长度
- numChildren:表示直系子节点个数
Watcher-数据变更通知
Zookeeper使用Watcher机制实现分布式数据的发布/订阅功能。⼀个典型的发布/订阅模型系统定义了⼀种⼀对多的订阅关系,能够让多个订阅者同时监听某⼀个主题对象,当这个主题对象自身状态变化时,会通知所有订阅者,使它们能够做出相应的处理。
在ZooKeeper中,引入了Watcher机制来实现这种分布式的通知功能。ZooKeeper允许客户端向服务端注册一个Watcher监听,当服务端的一些指定事件触发了这个Watcher,那么就会向指定客户端发送一个事件通知来实现分布式的通知功能。整个Watcher注册与通知过程如图所示:
ZooKeeper的Watcher机制主要包括客户端线程、客户端WatcherManager和ZooKeeper服务器三部分。具体工作流程为:客户端在向ZooKeeper服务器注册的同时,会将Watcher对象存储在客户端额WatcherManager当中。当ZooKeeper服务器触发Watcher事件后,会向客户端发送通知,客户端线程从WatcherManager中取出对应的Watcher对象来执行回调逻辑。
ACL-保障数据的安全
ZooKeeper作为一个分布式协调框架,其内部存储了分布式系统运行时状态的元数据。这些元数据会直接影响基于ZooKeeper进行构造的分布式系统的运行状态。因此,如何保障系统中数据的安全,从而避免因误操作所带来的数据随意变更而导致的数据库异常十分重要。在ZooKeeper中,提供了一套完善的ACL(Access Control List)权限控制机制来保障数据的安全。
我们可以从三个方面来理解ACl机制:权限模式(Scheme)、授权对象(ID)、权限(Permission)。通常使用“scheme:id:permission”来标识一个有效的ACL信息。
权限模式用来确定权限验证过程中使用的检验策略,有如下四种模式:
- IP:IP模式就是通过IP地址粒度来进行权限控制,如“ip:192.168.1.110”表示权限控制针对该IP地址。同时IP模式可以支持按照网段方式进行配置,入“ip:192.168.0.1/24”表示针对192.168.0.*这个网段进行权限控制
- Digest:Digest是最常用的权限控制模式,要更符合我们对权限控制的认识,其使用"username:password"形式的权限标识来进行权限配置,便于区分不同应用来进行权限控制。当我们通过“username:password”形式配置了权限标识后,ZooKeeper会先后对其进行SHA-1加密和BASE64编码
- World:World是⼀种最开放的权限控制模式,这种权限控制⽅式几乎没有任何作用,数据节点的访问权限对所有用户开放,即所有用户可以在不进行任何权限校验的情况下操作ZooKeeper上的数据。另外,World模式也可以看做一种特殊的Digest模式,它只有一个一个权限标识,即“world:anyone”
- Super:Super模式,顾名思义就是超级用户的意思,也是一种特殊的Digest模式。在Super模式下,超级用户可以对任意ZooKeeper上的数据节点进行任何操作
授权对象指的是权限赋予的用户或指定的一个实体,例如IP地址或是机器等。在不同的权限模式下,授权对象是不同的,下表列出了各个权限模式和授权对象之间的对应关系。
权限模式 | 授权对象 |
IP | 通常是⼀个IP地址或IP段:例如:192.168.10.110 或192.168.10.1/24 |
Digest | ⾃定义,通常是username:BASE64(SHA-1(username:password))例如:zm:sdfndsllndlksfn7c= |
World | 只有⼀个ID :anyone |
Super | 超级用户 |
权限就是指那些通过权限检查后可以被允许执行的操作。在ZooKeeper中,所有对数据的操作权限分为以下五大类:
- CREATE(C):数据节点的创建权限,允许授权对象在该数据节点下创建子节点
- DELETE(D):子节点的删除权限,允许授权对象删除该数据节点的子节点
- READ(R):数据节点的读取权限,允许授权对象访问该数据节点并读取其数据内容或子节点列表等
- WRITE(W):数据节点的更新权限,允许授权对象对该数据节点进行更新操作
- ADMIN(A):数据节点的管理权限,允许授权对象对该数据节点进行 ACL 相关的设置操作
ZooKeeper命令行操作
现在已经搭建起了⼀个能够正常运行的ZooKeeper服务了,所以接下来,就是来借助客户端来对 ZooKeeper 的数据节点进行操作。首先进入到ZooKeeper的bin目录,通过以下命令启动一个客户端:
./zkcli.sh 连接本地的zookeeper服务器
./zkCli.sh -server ip:port 连接指定的服务器
连接成功之后,系统会输出Zookeeper的相关环境及配置信息等信息。输入help之后,屏幕会输出可用的ZooKeeper命令,如图:
创建节点
使用create命令,可以创建⼀个ZooKeeper节点, 如
create [-s][-e] path data acl
其中,-s或-e分别指定节点特性,顺序或临时节点,若不指定,则创建持久节点;acl用来进行权限控制。
读取节点
与读取相关的命令有ls 命令和get命令。
ls命令可以列出ZooKeeper指定节点下的所有子节点,但只能查看指定节点下的第⼀级的所有子节点:
ls path
其中,path表示的是指定数据节点的节点路径
get命令可以获取ZooKeeper指定节点的数据内容和属性信息:
get path
更新节点
使用set命令,可以更新指定节点的数据内容,用法如下:
set path data [version]
其中,data就是要更新的新内容,version表示数据版本.在ZooKeeper中,节点的数据是有版本概念的,这个参数用于指定本次更新操作是基于ZNode的哪⼀个数据版本进行的,如将/zk-permanent节点的数据更新为456,可以使用如下命令:set /zk-permanent 456
现在dataVersion已经变为1了,表示进行了更新。
删除节点
使用delete命令可以删除ZooKeeper上的指定节点,用法如下:
delete path [version]
其中version也是表示数据版本,使用delete /zk-permanent 命令即可删除/zk-permanent节点:
可以看到,已经成功删除/zk-permanent节点。值得注意的是,若删除节点存在子节点,那么无法删除该节点,必须先删除子节点,再删除父节点。
ZooKeeper官方API
ZooKeeper作为一个分布式框架,主要用来解决分布式一致性问题,它提供了简单的分布式原语,并且对多种编程语言提供了API。我们接下来重点关注Java客户端API的使用方式。
ZooKeeper API共包含五个包,分别是:
- org.apache.zookeeper
- org.apache.zookeeper.data
- org.apache.zookeeper.server
- org.apache.zookeeper.server.quorum
- org.apache.zookeeper.server.upgrade
其中org.apache.zookeeper包下包含ZooKeeper类,该类是我们编程时最常用的类。下面我们通过增删改查的示例来演示一下官方API的使用。
准备工作
导入如下依赖:
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
</dependency>
建立会话
/**
* 创建会话
*/
public class CreateSession implements Watcher {
private final static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws IOException, InterruptedException {
/**
* 客户端可以通过创建⼀个zk实例来连接zk服务器
* new Zookeeper(connectString,sesssionTimeOut,Wather)
* connectString: 连接地址:IP:端⼝
* sessionTimeOut:会话超时时间:单位毫秒
* Watcher:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
*/
ZooKeeper zooKeeper = new ZooKeeper("10.10.8.13:2181,10.10.8.13:2182,10.10.8.13:2183", 5000, new CreateSession());
System.out.println(zooKeeper.getState());
countDownLatch.await();
//表示会话真正建⽴
System.out.println("=========Client Connected to zookeeper==========");
System.out.println(zooKeeper.getState());
zooKeeper.close();
}
@Override
public void process(WatchedEvent watchedEvent) {
// 当连接创建了,服务端发送给客户端SyncConnected事件
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
}
}
创建节点
/**
* 创建节点
*/
public class CreateNode implements Watcher {
//countDownLatch这个类使⼀个线程等待,主要不让main⽅法结束
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private static ZooKeeper zooKeeper;
public static void main(String[] args) throws Exception {
/**
* 客户端可以通过创建⼀个zk实例来连接zk服务器
* new Zookeeper(connectString,sesssionTimeOut,Wather)
* connectString: 连接地址:IP:端⼝
* sessionTimeOut:会话超时时间:单位毫秒
* Watcher:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
*/
zooKeeper =
new ZooKeeper("10.10.8.13:2181,10.10.8.13:2182,10.10.8.13:2183", 5000, new CreateNode());
System.out.println(zooKeeper.getState());
countDownLatch.await();
//表示会话真正建⽴
System.out.println("=========Client Connected to zookeeper==========");
System.out.println(zooKeeper.getState());
// 调用创建节点方法
createNodeSync();
}
@Override
public void process(WatchedEvent watchedEvent) {
// 当连接创建了,服务端发送给客户端SyncConnected事件
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
}
/**
* 同步创建节点
*
* @throws Exception
*/
private static void createNodeSync() throws Exception {
/**
* path :节点创建的路径
* data[] :节点创建要保存的数据,是个byte类型的
* acl :节点创建的权限信息(4种类型)
* ANYONE_ID_UNSAFE : 表示任何⼈
* AUTH_IDS :此ID仅可⽤于设置ACL。它将被客户机验证的ID替换。
* OPEN_ACL_UNSAFE :这是⼀个完全开放的ACL(常⽤)-->world:anyone
* CREATOR_ALL_ACL :此ACL授予创建者身份验证ID的所有权限
* createMode :创建节点的类型(4种类型)
* PERSISTENT:持久节点
* PERSISTENT_SEQUENTIAL:持久顺序节点
* EPHEMERAL:临时节点
* EPHEMERAL_SEQUENTIAL:临时顺序节点String node = zookeeper.create(path,data,acl,createMode);
*/
String node_PERSISTENT = zooKeeper
.create("/rubin_persistent", "持久节点内容".getBytes("utf-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
String node_PERSISTENT_SEQUENTIAL =
zooKeeper.create("/rubin_persistent_sequential", "持久顺序节点内容".getBytes("utf-8"),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL);
String node_EPERSISTENT = zooKeeper
.create("/rubin_ephemeral", "临时节点内容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
String node_EPERSISTENT_SEQUENTIAL = zooKeeper
.create("/rubin_ephemeral_sequential", "临时顺序节点内容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
// 获取节点数据
System.out.println("创建的持久节点是:" + node_PERSISTENT);
System.out.println("创建的持久顺序节点是:" + node_PERSISTENT_SEQUENTIAL);
System.out.println("创建的临时节点是:" + node_EPERSISTENT);
System.out.println("创建的临时顺序节点是:" + node_EPERSISTENT_SEQUENTIAL);
zooKeeper.close();
}
}
删除节点
public class DeleteNode implements Watcher {
private static ZooKeeper zooKeeper;
private static final CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
/**
* 客户端可以通过创建⼀个zk实例来连接zk服务器
* new Zookeeper(connectString,sesssionTimeOut,Wather)
* connectString: 连接地址:IP:端⼝
* sessionTimeOut:会话超时时间:单位毫秒
* Watcher:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
*/
zooKeeper =
new ZooKeeper("10.10.8.13:2181,10.10.8.13:2182,10.10.8.13:2183", 5000, new DeleteNode());
countDownLatch.await();
deleteNodeSync();
}
@Override
public void process(WatchedEvent watchedEvent) {
// 当连接创建了,服务端发送给客户端SyncConnected事件
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
System.out.println("=========Client Connected to zookeeper==========");
countDownLatch.countDown();
}
}
private static void deleteNodeSync() throws KeeperException, InterruptedException {
/*
* zooKeeper.exists(path,watch) :判断节点是否存在
* zookeeper.delete(path,version) : 删除节点
*/
Stat exists = zooKeeper.exists("/rubin_persistent", false);
System.out.println(exists == null? "该节点不存在" : "该节点存在");
zooKeeper.delete("/rubin_persistent", -1);
Stat exists2 = zooKeeper.exists("/rubin_persistent", false);
System.out.println(exists2 == null? "该节点不存在" : "该节点存在");
zooKeeper.close();
}
}
获取子节点以及节点数据
public class GetNodeData implements Watcher {
private static ZooKeeper zooKeeper;
private static Boolean connected = false;
public static void main(String[] args) throws IOException, InterruptedException {
/**
* 客户端可以通过创建⼀个zk实例来连接zk服务器
* new Zookeeper(connectString,sesssionTimeOut,Wather)
* connectString: 连接地址:IP:端⼝
* sessionTimeOut:会话超时时间:单位毫秒
* Watcher:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
*/
zooKeeper =
new ZooKeeper("10.10.8.13:2181,10.10.8.13:2182,10.10.8.13:2183", 5000, new GetNodeData());
Thread.sleep(Integer.MAX_VALUE);
}
@Override
public void process(WatchedEvent watchedEvent) {
// 子节点列表发生变化时,服务器会发出NodeChildrenChanged通知,但不会把变化情况告诉给客户端
// 需要客户端自行获取,且通知是⼀次性的,需反复注册监听
if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
// 再次获取节点数据
try {
getChildren(watchedEvent.getPath());
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (watchedEvent.getType() == EventType.NodeDataChanged) {
try {
// 调用获取单个节点数据方法
getNoteData(watchedEvent.getPath());
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
// 当连接创建了,服务端发送给客户端SyncConnected事件
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
if (!connected) {
System.out.println("=========Client Connected to zookeeper==========");
try {
getNoteData(null);
getChildren(null);
} catch (Exception e) {
e.printStackTrace();
}
connected = true;
}
}
}
private static void getNoteData(String path) throws Exception {
/**
* path : 获取数据的路径
* watch : 是否开启监听
* stat : 节点状态信息
* null: 表示获取最新版本的数据
* zk.getData(path, watch, stat);
*/
if (StringUtils.isBlank(path)) {
path = "/rubin_persistent";
}
byte[] data = zooKeeper.getData(path, true, null);
System.out.println(new String(data, "utf-8"));
}
private static void getChildren(String path) throws KeeperException,
InterruptedException {
/**
* path:路径
* watch:是否要启动监听,当子节点列表发生变化,会触发监听
* zooKeeper.getChildren(path, watch);
*/
if (StringUtils.isBlank(path)) {
path = "/rubin_persistent";
}
List<String> children = zooKeeper.getChildren(path, true);
System.out.println(children);
}
}
修改节点数据
public class UpdateNode implements Watcher {
private static ZooKeeper zooKeeper;
private static final CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
/**
* 客户端可以通过创建⼀个zk实例来连接zk服务器
* new Zookeeper(connectString,sesssionTimeOut,Wather)
* connectString: 连接地址:IP:端⼝
* sessionTimeOut:会话超时时间:单位毫秒
* Watcher:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
*/
zooKeeper =
new ZooKeeper("10.10.8.13:2181,10.10.8.13:2182,10.10.8.13:2183", 5000, new UpdateNode());
countDownLatch.await();
updateNodeSync();
}
@Override
public void process(WatchedEvent watchedEvent) {
// 当连接创建了,服务端发送给客户端SyncConnected事件
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
System.out.println("=========Client Connected to zookeeper==========");
countDownLatch.countDown();
}
}
private static void updateNodeSync() throws Exception {
/**
* path:路径
* data:要修改的内容 byte[]
* version:为-1,表示对最新版本的数据进⾏修改
* zooKeeper.setData(path, data,version);
*/
byte[] data = zooKeeper.getData("/rubin_persistent", false, null);
System.out.println("修改前的值:" + new String(data));
// 修改 stat:状态信息对象 -1:最新版本
Stat stat = zooKeeper.setData("/rubin_persistent", "客户端修改内容".getBytes(), -1);
byte[] data2 = zooKeeper.getData("/rubin_persistent", false, null);
System.out.println("修改后的值:" + new String(data2));
zooKeeper.close();
}
}
ZkClient开源客户端
ZkClient是Github上⼀个开源的ZooKeeper客户端,在ZooKeeper原生API接口之上进行了包装,是⼀个更易用的ZooKeeper客户端。同时,zkClient在内部还实现了诸如Session超时重连、Watcher反复注册等功能。
准备工作
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.2</version>
</dependency>
创建连接
public class CreateSession {
/**
* 创建⼀个zkClient实例来进行连接 注意:zkClient通过对zookeeperAPI内部包装,将这个异步的会话创建过程同步化了
*
* @param args
*/
public static void main(String[] args) {
ZkClient zkClient = new ZkClient("10.10.8.13:2181,10.10.8.13:2182,10.10.8.13:2183");
System.out.println("ZooKeeper session established.");
zkClient.close();
}
}
创建节点
public class CreateNode {
/**
* 创建⼀个zkClient实例来进行连接 注意:zkClient通过对zookeeperAPI内部包装,将这个异步的会话创建过程同步化了
*
* @param args
*/
public static void main(String[] args) {
ZkClient zkClient = new ZkClient("10.10.8.13:2181,10.10.8.13:2182,10.10.8.13:2183");
System.out.println("ZooKeeper session established.");
// 后面的参数标明如果父节点不存在会递归创建父节点
zkClient.createPersistent("/rubin-zkClient/rubin-c1", true);
System.out.println("success create znode.");
zkClient.close();
}
}
删除节点
public class DeleteNode {
public static void main(String[] args) {
ZkClient zkClient = new ZkClient("10.10.8.13:2181,10.10.8.13:2182,10.10.8.13:2183");
System.out.println("ZooKeeper session established.");
// 如果该节点下存在子节点 会自动地柜删除所有子节点之后 删除该节点
zkClient.deleteRecursive("/rubin-zkClient");
System.out.println("success delete znode.");
zkClient.close();
}
}
获取子节点列表
public class GetChildrenNodes {
public static void main(String[] args) throws InterruptedException {
ZkClient zkClient = new ZkClient("10.10.8.13:2181,10.10.8.13:2182,10.10.8.13:2183");
System.out.println("ZooKeeper session established.");
//注册监听事件
zkClient.subscribeChildChanges("/rubin-zkClient", new IZkChildListener() {
public void handleChildChange(String parentPath, List<String> currentChildren) throws Exception {
System.out.println(parentPath + " 's child changed, currentChildren:" + currentChildren);
}
});
zkClient.createPersistent("/rubin-zkClient");
Thread.sleep(1000);
zkClient.createPersistent("/rubin-zkClient/c1");
Thread.sleep(1000);
zkClient.delete("/rubin-zkClient/c1");
Thread.sleep(1000);
zkClient.delete("/rubin-zkClient");
zkClient.close();
}
}
获取节点数据(包含了节点的创建、更新和删除)
public class GetNodeData {
public static void main(String[] args) throws InterruptedException {
String path = "/rubin-zkClient";
ZkClient zkClient = new ZkClient("10.10.8.13:2181,10.10.8.13:2182,10.10.8.13:2183");
System.out.println("ZooKeeper session established.");
if (!zkClient.exists(path)) {
zkClient.createPersistent(path);
}
// 注册监听
zkClient.subscribeDataChanges(path, new IZkDataListener() {
public void handleDataChange(String path, Object data) throws Exception {
System.out.println(path + "该节点内容被更新,更新后的内容" + data);
}
public void handleDataDeleted(String s) throws Exception {
System.out.println(s + " 该节点被删除");
}
});
// 获取节点内容
Object o = zkClient.readData(path);
System.out.println(o);
// 更新
zkClient.writeData(path, "4567");
Thread.sleep(1000);
// 删除
zkClient.delete(path);
Thread.sleep(1000);
zkClient.close();
}
}
Curator客户端
Curator 是Netflix公司开源的⼀套ZooKeeper客户端框架和ZKClient⼀样,Curator解决了很多 ZooKeeper 客户端⾮常底层的细节开发工作,包括连接重连,反复注册Watcher和NodeExistsException异常等。该客户端是最流行的 ZooKeeper客户端之⼀。从编码风格上来讲,它提供了基于Fluent的编程风格支持。
准备工作
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
创建连接
public class CreateSession {
public static void main(String[] args) {
/**
* retryPolicy:失败重试策略 包含ExponentialBackoffRetry(基于backoff的重连策略)、RetryNTimes(重连N次策略)、RetryForever(永远重试策略)
* ExponentialBackoffRetry:构造器含有三个参数 ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
* baseSleepTimeMs:初始的sleep时间,用于计算之后的每次重试的sleep时间,计算公式:当前sleep时间=baseSleepTimeMs*Math.max(1,random.nextInt(1<<(retryCount+1)))
* maxRetries:最⼤重试次数
* maxSleepMs:最⼤sleep时间,如果上述的当前sleep计算出来比这个大,那么sleep用这个时间,默认的最大时间是Integer.MAX_VALUE毫秒。
* 其他,查看org.apache.curator.RetryPolicy接⼝的实现类
*/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
// 工厂模式打开客户端
CuratorFramework client = CuratorFrameworkFactory.newClient("10.10.8.13:2181,10.10.8.13:2182,10.10.8.13:2183", 5000, 3000, retryPolicy);
// 启动客户端
client.start();
System.out.println("Zookeeper session1 established. ");
// 构造者模式构造客户端细节
CuratorFramework client1 = CuratorFrameworkFactory.builder()
.connectString("10.10.8.13:2181,10.10.8.13:2182,10.10.8.13:2183") //server地址
.sessionTimeoutMs(5000) // 会话超时时间
.connectionTimeoutMs(3000) // 连接超时时间
.retryPolicy(retryPolicy) // 重试策略
.namespace("base") // 独⽴命名空间/base 即客户端对Zookeeper上数据节点的任何操作都是相对/base⽬录进⾏的,这有利于实现不同的Zookeeper的业务之间的隔离
.build();
// 启动客户端
client1.start();
System.out.println("Zookeeper session2 established. ");
client.close();
client1.close();
}
}
创建节点
public class CreateNode {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(
"10.10.8.13:2181,10.10.8.13:2182,10.10.8.13:2183") //server地址
.sessionTimeoutMs(5000) // 会话超时时间
.connectionTimeoutMs(3000) // 连接超时时间
.retryPolicy(new ExponentialBackoffRetry(1000, 5)) // 重试策略
.build();
client.start();
System.out.println("Zookeeper session established. ");
// 添加节点
String path = "/rubin-curator/c1";
// 创建一个持久并带有数据的节点 如果没有父节点则自动递归创建
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, "init".getBytes());
Thread.sleep(1000);
System.out.println("success create znode" + path);
client.close();
}
}
删除节点
public class DeleteNode {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(
"10.10.8.13:2181,10.10.8.13:2182,10.10.8.13:2183") //server地址
.sessionTimeoutMs(5000) // 会话超时时间
.connectionTimeoutMs(3000) // 连接超时时间
.retryPolicy(new ExponentialBackoffRetry(1000, 5)) // 重试策略
.build();
client.start();
System.out.println("Zookeeper session established. ");
// 删除最新版本的节点 并递归删除其下面的所有子节点
client.delete().deletingChildrenIfNeeded().withVersion(-1).forPath("/rubin-curator");
System.out.println("success create znode /rubin-curator");
client.close();
}
}
获取节点数据
public class GetNodeData {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(
"10.10.8.13:2181,10.10.8.13:2182,10.10.8.13:2183") //server地址
.sessionTimeoutMs(5000) // 会话超时时间
.connectionTimeoutMs(3000) // 连接超时时间
.retryPolicy(new ExponentialBackoffRetry(1000, 5)) // 重试策略
.build();
client.start();
System.out.println("Zookeeper session established. ");
// 添加节点
String path = "/rubin-curator/c1";
client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT).forPath(path, "init".getBytes());
System.out.println("success create znode" + path);
// 获取节点数据
Stat stat = new Stat();
byte[] bytes = client.getData().storingStatIn(stat).forPath(path);
System.out.println(new String(bytes));
client.close();
}
}
更新节点数据
public class UpdateNode {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(
"10.10.8.13:2181,10.10.8.13:2182,10.10.8.13:2183") //server地址
.sessionTimeoutMs(5000) // 会话超时时间
.connectionTimeoutMs(3000) // 连接超时时间
.retryPolicy(new ExponentialBackoffRetry(1000, 5)) // 重试策略
.build();
client.start();
System.out.println("Zookeeper session established. ");
String path = "/rubin-curator/c1";
// 获取节点数据
Stat stat = new Stat();
byte[] bytes = client.getData().storingStatIn(stat).forPath(path);
System.out.println(new String(bytes));
// 更新节点数据
int version = client.setData().withVersion(stat.getVersion()).forPath(path).getVersion();
System.out.println("Success set node for : " + path + ", new version: "+version);
client.setData().withVersion(version).forPath(path).getVersion();
client.close();
}
}
以上就是本博文的全部内容,欢迎留言交流~~~
文章评论