文章13 | 阅读 5409 | 点赞0
zookeeper 可以通过java api连接操作,进行ZNode的创建删除,数据的获取设置,子节点的获取已经状态的观察,权限的设置。并且zookeeper客户端提供了异步操作,并有监听机制,更为方便的提供对zookeeper数据的监听和维护。
上一篇文章提到的可以通过控制命令行操作zookeeper数据节点,同样,zookeeper官方为我们提供了相应的java api 操作。
https://blog.csdn.net/qq_33513250/article/details/101618808
客户端连接后我们可以做如下操作:
创建znode、获取数据、设置数据、获取子节点、权限设置、节点存在判断、删除节点
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
package com.szwn.zk;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import java.util.concurrent.CountDownLatch;
public class CreateNode implements Watcher{
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
// 1.连接zookeeper 客户端,参数分别为服务器连接地址,集群用逗号隔开,sessionTimeout,事件监听器
ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, new CreateNode());
System.out.println(zookeeper.getState());
// 2.等待连接完成
connectedSemaphore.await();
// 3.创建节点,节点类型为persistent
String path1 = zookeeper.create("/create-test-ephemeral-", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("Success create znode: " + path1);
// 4.创建节点,节点类型为ephemeral-sequential
String path2 = zookeeper.create("/create-test-ephemeral-", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("Success create znode: " + path2);
}
/**
* WatchedEvent 事件监听
* 服务器回调监听器连接成功
* @param event
*/
public void process(WatchedEvent event) {
if (Event.KeeperState.SyncConnected == event.getState()) {
connectedSemaphore.countDown();
}
}
}
package com.szwn.zk;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
public class GetData implements Watcher {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private static ZooKeeper zk = null;
private static Stat stat = new Stat();
public static void main(String[] args) throws Exception {
String path = "/zk-book";
// 1.连接zookeeper 客户端,参数分别为服务器连接地址,集群用逗号隔开,sessionTimeout,事件监听器
zk = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, new GetData());
// 2.等待连接完成
connectedSemaphore.await();
// 3.创建节点,节点类型为ephemeral,数据为123
zk.create(path, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("success create znode: " + path);
// 4.获取数据,且为监听模式,并把数据状态存入 stat中
System.out.println("the data of znode " + path + " is : " + new String(zk.getData(path, true, stat)));
System.out.println("czxID: " + stat.getCzxid() + ", mzxID: " + stat.getMzxid() + ", version: " + stat.getVersion());
// 5.根据版本号-1 修改数据1234
zk.setData(path, "1234".getBytes(), -1);
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent event) {
if (Event.KeeperState.SyncConnected == event.getState()) {
// 连接监听,连接状态,空事件则为连接session成功
if (Event.EventType.None == event.getType() && null == event.getPath()) {
connectedSemaphore.countDown();
} else if (event.getType() == Event.EventType.NodeDataChanged) {
// node data changed 监听
try {
System.out.println("the data of znode " + event.getPath() + " is : " + new String(zk.getData(event.getPath(), true, stat)));
System.out.println("czxID: " + stat.getCzxid() + ", mzxID: " + stat.getMzxid() + ", version: " + stat.getVersion());
} catch (Exception e) {
}
}
}
}
}
package com.szwn.zk;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.CountDownLatch;
public class SetData implements Watcher {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private static ZooKeeper zk;
public static void main(String[] args) throws Exception {
String path = "/zk-book-set";
// 1.连接zookeeper 客户端,参数分别为服务器连接地址,集群用逗号隔开,sessionTimeout,事件监听器
zk = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, new SetData());
// 2.等待连接完成
connectedSemaphore.await();
// 3.创建节点,节点类型为ephemeral,数据为123
zk.create(path, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("success create znode: " + path);
// 4.获取数据,且为监听模式
zk.getData(path, true, null);
// 5.根据版本号-1 修改数据456
Stat stat = zk.setData(path, "456".getBytes(), -1);
System.out.println("czxID: " + stat.getCzxid() + ", mzxID: " + stat.getMzxid() + ", version: " + stat.getVersion());
// 6.根据版本号,上一次修改后的版本号,设置数据456
Stat stat2 = zk.setData(path, "456".getBytes(), stat.getVersion());
System.out.println("czxID: " + stat2.getCzxid() + ", mzxID: " + stat2.getMzxid() + ", version: " + stat2.getVersion());
try {
// 7.错误版本号设置数据,报错
zk.setData(path, "456".getBytes(), stat.getVersion());
} catch (KeeperException e) {
System.out.println("Error: " + e.code() + "," + e.getMessage());
}
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent event) {
if (Event.KeeperState.SyncConnected == event.getState()) {
// 连接监听,连接状态,空事件则为连接session成功
if (Event.EventType.None == event.getType() && null == event.getPath()) {
connectedSemaphore.countDown();
}
}
}
}
package com.szwn.zk;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class GetChildren implements Watcher{
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private static ZooKeeper zk = null;
public static void main(String[] args) throws Exception {
String path = "/zk-book-1";
// 1.连接zookeeper 客户端,参数分别为服务器连接地址,集群用逗号隔开,sessionTimeout,事件监听器
zk = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, new GetChildren());
// 2.等待连接完成
connectedSemaphore.await();
// 3.创建节点,节点类型为persistent,数据为空
zk.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("success create znode: " + path);
// 4.创建子节点,路径为path/c1,节点类型为ephemeral,数据为空
zk.create(path + "/c1", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("success create znode: " + path + "/c1");
// 5.获取path的子节点,并监听变化
List<String> childrenList = zk.getChildren(path, true);
System.out.println(childrenList);
// 4.创建子节点,路径为path/c2,节点类型为ephemeral,数据为空
zk.create(path + "/c2", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("success create znode: " + path + "/c2");
Thread.sleep(1000);
// 4.创建子节点,路径为path/c3,节点类型为ephemeral,数据为空
zk.create(path + "/c3", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("success create znode: " + path + "/c3");
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent event) {
if (Event.KeeperState.SyncConnected == event.getState()) {
// 连接监听,连接状态,空事件则为连接session成功
if (Event.EventType.None == event.getType() && null == event.getPath()) {
connectedSemaphore.countDown();
// 监听node children changed event
} else if (event.getType() == Event.EventType.NodeChildrenChanged) {
try {
System.out.println("ReGet Child:" + zk.getChildren(event.getPath(), true));
} catch (Exception e) {
}
}
}
}
}
package com.szwn.zk;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
public class Auth {
final static String PATH = "/zk-book-auth_test";
public static void main(String[] args) throws Exception {
// 1.连接zookeeper 客户端,参数分别为服务器连接地址,集群用逗号隔开,sessionTimeout,事件监听器
ZooKeeper zookeeper1 = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, null);
// 2.连接添加权限信息,scheme和data
zookeeper1.addAuthInfo("digest", "foo:true".getBytes());
// 3.创建节点,数据为init
// 权限为 CREATOR_ALL_ACL(This ACL gives the creators authentication id's all permissions.)
zookeeper1.create(PATH, "init".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.EPHEMERAL);
System.out.println("success create znode: " + PATH);
// 4.创建第二个session
ZooKeeper zookeeper2 = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, null);
// 5.获取第一个数据
byte[] data = zookeeper2.getData(PATH, false, null);
System.out.println(new String(data));
}
}
第二个客户端没有path的权限,运行结果为
package com.szwn.zk;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
public class Exist implements Watcher {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private static ZooKeeper zk;
public static void main(String[] args) throws Exception {
String path = "/zk-book";
// 1.连接zookeeper 客户端,参数分别为服务器连接地址,集群用逗号隔开,sessionTimeout,事件监听器
zk = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, new Exist());
// 2.等待连接完成
connectedSemaphore.await();
// 3.判断path节点是否存在,并启动监听
Stat exists = zk.exists(path, true);
System.out.println(exists);
// 4.创建path节点并设置数据为123
zk.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.setData(path, "123".getBytes(), -1);
// 5.创建path/c1子节点
zk.create(path + "/c1", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("success create znode: " + path + "/c1");
// 6.删除path/c1子节点
zk.delete(path + "/c1", -1);
zk.delete(path, -1);
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent event) {
try {
if (Event.KeeperState.SyncConnected == event.getState()) {
// 连接监听,连接状态,空事件则为连接session成功
if (Event.EventType.None == event.getType() && null == event.getPath()) {
connectedSemaphore.countDown();
// node created 监听,因为对节点的操作监听为下一次有效,所以设置监听为true,继续监听节点下次状态变化
} else if (Event.EventType.NodeCreated == event.getType()) {
System.out.println("success create znode: " + event.getPath());
zk.exists(event.getPath(), true);
// node deleted 监听
} else if (Event.EventType.NodeDeleted == event.getType()) {
System.out.println("success delete znode: " + event.getPath());
zk.exists(event.getPath(), true);
// node data changed 监听
} else if (Event.EventType.NodeDataChanged == event.getType()) {
System.out.println("data changed of znode: " + event.getPath());
zk.exists(event.getPath(), true);
}
}
} catch (Exception e) {
}
}
}
package com.szwn.zk;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import java.util.concurrent.CountDownLatch;
public class DeleteNode implements Watcher {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private static ZooKeeper zk;
public static void main(String[] args) throws Exception {
String path = "/zk-book";
// 1.连接zookeeper 客户端,参数分别为服务器连接地址,集群用逗号隔开,sessionTimeout,事件监听器
zk = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000,
new DeleteNode());
// 2.等待连接完成
connectedSemaphore.await();
// 3.创建节点,节点类型为persistent
zk.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("success create znode: " + path);
// 4.创建子节点,节点类型为persistent
zk.create(path + "/c1", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("success create znode: " + path + "/c1");
try {
// 5.删除父节点,因为有子节点,报错
zk.delete(path, -1);
} catch (Exception e) {
System.out.println("fail to delete znode: " + path);
}
// 6.删除子节点
zk.delete(path + "/c1", -1);
System.out.println("success delete znode: " + path + "/c1");
// 7.删除父节点
zk.delete(path, -1);
System.out.println("success delete znode: " + path);
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent event) {
if (Event.KeeperState.SyncConnected == event.getState()) {
if (Event.EventType.None == event.getType() && null == event.getPath()) {
connectedSemaphore.countDown();
}
}
}
}
运行结果
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/qq_33513250/article/details/102289689
内容来源于网络,如有侵权,请联系作者删除!