文章13 | 阅读 5133 | 点赞0
上一篇文章学习了用zkClient客户端对zookeeper进行操作,下面我们将学习由Netflix公司的Jordan Zimmerman开发的一套开源的
Zookeeper客户端框架Curator。Curator与zkClient一样解决了底层的细节开发工作,包括session过期重连、反复注册、异常报错
的封装等。除此之外,Curator目前是Apache基金会的顶级项目之一,Curator具有更加完善的文档,另外还提供了一套易用性和
可读性更强的Fluent风格的客户端API框架,提供了更加多的应用场景(Recipe,共享锁服务、Master选举和分布式计算的封装
等),方便用户使用和开发。目前很多项目均使用Curator作为zookeeper客户端进行开发,如阿里的开源分布式事务管理框架seata。
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.1</version>
</dependency>
package com.szwn.curator;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.DefaultACLProvider;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
/**
 * Demonstrates synchronous znode creation with Curator's fluent API.
 *
 * <p>Two ephemeral znodes are created under {@code /curator-create}; missing parent
 * nodes are created on demand. The second creation also passes an explicit ACL.
 */
public class CreateNode {
    /**
     * Connects to the local three-node ensemble, creates the two znodes and prints a
     * confirmation for each.
     *
     * @param args unused
     * @throws Exception if any ZooKeeper operation fails
     */
    public static void main(String[] args) throws Exception {
        String path1 = "/curator-create/c1";
        // Build the client: ensemble 127.0.0.1:2181-2183, session timeout 5000 ms.
        // Retry policy (not a callback policy): ExponentialBackoffRetry with a 1000 ms
        // base sleep time and at most 3 retries.
        // try-with-resources guarantees the client is closed even when a call throws.
        try (CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
                .sessionTimeoutMs(5000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .aclProvider(new DefaultACLProvider())
                .build()) {
            // Start connecting.
            client.start();

            // Create an EPHEMERAL node, creating parent nodes if they are missing.
            // The charset is explicit so the stored bytes do not depend on the platform default.
            client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL)
                    .forPath(path1, "init".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            System.out.println("success create znode: " + path1);

            String path2 = "/curator-create/c2";
            // Same as above, additionally setting the node's ACL explicitly.
            client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL)
                    .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                    .forPath(path2, "init2".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            System.out.println("success create znode: " + path2);
        }
    }
}
package com.szwn.curator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
public class CreateNodeBackground {
static String path = "/curator-create-backGround/c1";
// 创建CuratorFramework 客户端实例,集群服务器地址为127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183,session timeout5000ms
// 回调策略为ExponentialBackoffRetry,即为retries 3 times with increasing 1000 sleep time between retries
static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
.sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
// 2个信号量,BackGround 回调callBack成功再停止
static CountDownLatch semaphore = new CountDownLatch(2);
// 固定线程池,2个线程
static ExecutorService es = Executors.newFixedThreadPool(2);
public static void main(String[] args) throws Exception {
// 开始连接
client.start();
System.out.println("Main thread: " + Thread.currentThread().getName());
// 创建 EPHEMERAL类型节点,并创建父节点,使用一部线程background方式处理创建结果
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
// 打印事件信息
System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]" + ", Thread of processResult: " + Thread.currentThread().getName());
System.out.println();
semaphore.countDown();
}
}, es).forPath(path, "init".getBytes());
// 创建相同路径节点
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
// 打印事件信息
System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]" + ", Thread of processResult: " + Thread.currentThread().getName());
semaphore.countDown();
}
}).forPath(path, "init".getBytes());
semaphore.await();
// 关闭线程池
es.shutdown();
// 关闭连接
client.close();
}
}
运行结果:两次创建中一个成功,code为0;另一个失败,code为-110(节点已经存在)
package com.szwn.curator;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
/**
 * Demonstrates reading a znode's data (and its {@link Stat}) with Curator.
 */
public class GetData {
    /**
     * Creates an ephemeral znode holding {@code "init"}, reads it back and prints the content.
     *
     * @param args unused
     * @throws Exception if any ZooKeeper operation fails
     */
    public static void main(String[] args) throws Exception {
        String path = "/curator-create-getData/c1";
        // Build the client: ensemble 127.0.0.1:2181-2183, session timeout 5000 ms.
        // Retry policy: ExponentialBackoffRetry with a 1000 ms base sleep time and at most 3 retries.
        // try-with-resources guarantees the client is closed even when a call throws.
        try (CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
                .sessionTimeoutMs(5000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build()) {
            // Start connecting.
            client.start();

            // Create an EPHEMERAL node, creating parent nodes if they are missing.
            client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL)
                    .forPath(path, "init".getBytes(java.nio.charset.StandardCharsets.UTF_8));

            Stat stat = new Stat();
            // Read the node data; the node's stat is stored into the Stat object passed in.
            byte[] data = client.getData().storingStatIn(stat).forPath(path);
            // Decode with the same explicit charset used when writing.
            System.out.println(new String(data, java.nio.charset.StandardCharsets.UTF_8));
        }
    }
}
package com.szwn.curator;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
public class SetData {
public static void main(String[] args) throws Exception {
String path = "/curator-create-setData/c1";
// 创建CuratorFramework 客户端实例,集群服务器地址为127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183,session timeout5000ms
// 回调策略为ExponentialBackoffRetry,即为retries 3 times with increasing 1000 sleep time between retries
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
.sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
// 开始连接
client.start();
// 创建 EPHEMERAL类型节点,并创建父节点
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());
Stat stat = new Stat();
// 获取节点数据,存储stat在new Stat()中,路径为/curator-create-setData/c1
client.getData().storingStatIn(stat).forPath(path);
// 设置数据,版本号为获取数据的版本号,路径为/curator-create-setData/c1
Stat stat1 = client.setData().withVersion(stat.getVersion()).forPath(path);
System.out.println("Success set node for : " + path + ", new version: "
+ stat1.getVersion());
try {
// 重新设置数据,版本号为获取数据的版本号,路径为/curator-create-setData/c1
client.setData().withVersion(stat.getVersion()).forPath(path);
} catch (Exception e) {
// 版本不一致报错
System.out.println("Fail set node due to " + e.getMessage());
}
}
}
package com.szwn.curator;
import java.util.List;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.DefaultACLProvider;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
/**
 * Demonstrates listing the children of a znode with Curator's {@code getChildren()}.
 */
public class Children {
    /**
     * Creates two ephemeral children under {@code /curator-children} and prints the
     * parent's child list.
     *
     * @param args unused
     * @throws Exception if any ZooKeeper operation fails
     */
    public static void main(String[] args) throws Exception {
        String path = "/curator-children";
        String path1 = path + "/c1";
        // Build the client: ensemble 127.0.0.1:2181-2183, session timeout 5000 ms.
        // Retry policy: ExponentialBackoffRetry with a 1000 ms base sleep time and at most 3 retries.
        // try-with-resources guarantees the client is closed even when a call throws.
        try (CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
                .sessionTimeoutMs(5000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .aclProvider(new DefaultACLProvider())
                .build()) {
            // Start connecting.
            client.start();

            // Create the first EPHEMERAL child, creating the parent node if it is missing.
            client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL)
                    .forPath(path1, "init".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            System.out.println("success create znode: " + path1);

            String path2 = path + "/c2";
            // Create the second EPHEMERAL child (no explicit ACL is passed on this call).
            client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL)
                    .forPath(path2, "init2".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            System.out.println("success create znode: " + path2);

            // List the children of /curator-children.
            List<String> list = client.getChildren().forPath(path);
            System.out.println(list);
        }
    }
}
package com.szwn.curator;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;
import org.apache.zookeeper.CreateMode;
/**
 * Demonstrates Curator's {@link EnsurePath} utility, which makes sure a path exists
 * before child nodes are created beneath it.
 *
 * <p>NOTE(review): {@code EnsurePath} is marked deprecated in recent Curator releases;
 * confirm against the Curator version in use before copying this pattern.
 */
public class EnsureZkPath {
    static String path = "/curator_ensure_path";
    // Build the client: ensemble 127.0.0.1:2181-2183, session timeout 5000 ms.
    // Retry policy: ExponentialBackoffRetry with a 1000 ms base sleep time and at most 3 retries.
    static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
            .sessionTimeoutMs(5000)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    /**
     * Ensures the parent path exists, then creates two ephemeral children directly
     * (without {@code creatingParentsIfNeeded()}) and prints a confirmation for each.
     *
     * @param args unused
     * @throws Exception if any ZooKeeper operation fails
     */
    public static void main(String[] args) throws Exception {
        client.start();
        try {
            EnsurePath ensurePath = new EnsurePath(path);
            // First call: syncs and creates the path if needed.
            ensurePath.ensure(client.getZookeeperClient());

            // Create child c1 directly; the parent exists thanks to the ensure() call above.
            String path1 = path + "/c1";
            client.create()
                    .withMode(CreateMode.EPHEMERAL)
                    .forPath(path1, "init".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            System.out.println("success create znode: " + path1);

            // Subsequent calls are no-ops because the path already exists.
            ensurePath.ensure(client.getZookeeperClient());

            // Create child c2 directly.
            String path2 = path + "/c2";
            client.create()
                    .withMode(CreateMode.EPHEMERAL)
                    .forPath(path2, "init2".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            // Report the node that was just created (path2).
            System.out.println("success create znode: " + path2);
        } finally {
            // Always release the connection.
            client.close();
        }
    }
}
package com.szwn.curator;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.ZKPaths;
import org.apache.curator.utils.ZKPaths.PathAndNode;
import org.apache.zookeeper.ZooKeeper;
/**
 * Demonstrates the static helpers of Curator's {@link ZKPaths} utility class: path
 * composition and splitting, plus node creation, listing and recursive deletion
 * through a raw {@link ZooKeeper} handle.
 */
public class ZKPath {
    static String path = "/curator_zkPath";
    // Build the client: ensemble 127.0.0.1:2181-2183, session timeout 5000 ms.
    // Retry policy: ExponentialBackoffRetry with a 1000 ms base sleep time and at most 3 retries.
    static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
            .sessionTimeoutMs(5000)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    /**
     * Runs each {@code ZKPaths} helper in turn and prints its result.
     *
     * @param args unused
     * @throws Exception if any ZooKeeper operation fails
     */
    public static void main(String[] args) throws Exception {
        // Start connecting.
        client.start();
        try {
            // Obtain the native ZooKeeper handle managed by Curator; reused for every call below.
            ZooKeeper zookeeper = client.getZookeeperClient().getZooKeeper();

            // Apply the namespace /curator_zkPath to the node path.
            // NOTE(review): the path is given in absolute form ("/sub") because some Curator
            // versions appear to validate this argument as a full path; confirm for the
            // version in use. The composed result is the same either way.
            System.out.println(ZKPaths.fixForNamespace(path, "/sub"));
            // Join the parent path and the child name "sub" into one path.
            System.out.println(ZKPaths.makePath(path, "sub"));
            // Extract the node name (last path segment) from a full path.
            System.out.println(ZKPaths.getNodeFromPath("/curator_zkPath/sub1"));
            // Split a full path into its parent path and node name.
            PathAndNode pn = ZKPaths.getPathAndNode("/curator_zkPath/sub1");
            System.out.println(pn.getPath());
            System.out.println(pn.getNode());

            String dir1 = path + "/child1";
            String dir2 = path + "/child2";
            // Create the nodes (without data).
            ZKPaths.mkdirs(zookeeper, dir1);
            ZKPaths.mkdirs(zookeeper, dir2);
            // List the children of path in sorted order.
            System.out.println(ZKPaths.getSortedChildren(zookeeper, path));
            // Delete all children of path; the trailing 'true' also deletes path itself.
            ZKPaths.deleteChildren(zookeeper, path, true);
        } finally {
            // Always release the connection.
            client.close();
        }
    }
}
package com.szwn.curator;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
/**
 * Demonstrates a version-checked, recursive delete with Curator's {@code delete()}.
 */
public class DeleteData {
    /**
     * Creates two ephemeral children under {@code /zk-curator}, prints the child list,
     * then deletes the parent together with its children and exits.
     *
     * @param args unused
     * @throws Exception if any ZooKeeper operation fails
     */
    public static void main(String[] args) throws Exception {
        String path = "/zk-curator";
        // Build the client: ensemble 127.0.0.1:2181-2183, session timeout 5000 ms.
        // Retry policy: ExponentialBackoffRetry with a 1000 ms base sleep time and at most 3 retries.
        // try-with-resources guarantees the client is closed even when a call throws.
        try (CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
                .sessionTimeoutMs(5000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build()) {
            client.start();

            String path1 = path + "/c1";
            client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL)
                    .forPath(path1, "init".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            String path2 = path + "/c2";
            client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL)
                    .forPath(path2, "init".getBytes(java.nio.charset.StandardCharsets.UTF_8));

            // Show the children of path.
            System.out.println(client.getChildren().forPath(path));

            // Read the parent's actual stat so the version check below uses the node's
            // real version rather than the default value of an unpopulated Stat.
            Stat stat = new Stat();
            client.getData().storingStatIn(stat).forPath(path);

            // Delete the node and all of its children, guarded by the version just read.
            client.delete().deletingChildrenIfNeeded().withVersion(stat.getVersion()).forPath(path);
            System.out.println("success delete znode " + path);
        }
    }
}
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/qq_33513250/article/details/102677481
内容来源于网络,如有侵权,请联系作者删除!