zookeeper java api 操作(一) | 同步处理结果

x33g5p2x  于2021-12-20 转载在 其他  
字(12.2k)|赞(0)|评价(0)|浏览(379)

一、zookeeper java api

      zookeeper 可以通过java api连接操作,进行ZNode的创建删除,数据的获取设置,子节点的获取已经状态的观察,权限的设置。并且zookeeper客户端提供了异步操作,并有监听机制,更为方便的提供对zookeeper数据的监听和维护。

     上一篇文章提到的可以通过控制命令行操作zookeeper数据节点,同样,zookeeper官方为我们提供了相应的java api 操作。

      https://blog.csdn.net/qq_33513250/article/details/101618808

      客户端连接后我们可以做如下操作:

           创建znode、获取数据、设置数据、获取子节点、权限设置、节点存在判断、删除节点

二、pom 文件

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>

三、同步操作

** 1.创建节点**

package com.szwn.zk;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

import java.util.concurrent.CountDownLatch;

public class CreateNode implements Watcher{
    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        // 1.连接zookeeper 客户端,参数分别为服务器连接地址,集群用逗号隔开,sessionTimeout,事件监听器
        ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, new CreateNode());
        System.out.println(zookeeper.getState());
        // 2.等待连接完成
        connectedSemaphore.await();
        
        // 3.创建节点,节点类型为persistent
        String path1 = zookeeper.create("/create-test-ephemeral-", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("Success create znode: " + path1);

        // 4.创建节点,节点类型为ephemeral-sequential
        String path2 = zookeeper.create("/create-test-ephemeral-", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("Success create znode: " + path2);
    }

    /**
     * WatchedEvent 事件监听
     * 服务器回调监听器连接成功
     * @param event
     */
    public void process(WatchedEvent event) {
        if (Event.KeeperState.SyncConnected == event.getState()) {
            connectedSemaphore.countDown();
        }
    }
}

 2**.获取数据**

package com.szwn.zk;

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class GetData implements Watcher {
    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
    private static ZooKeeper zk = null;
    private static Stat stat = new Stat();

    public static void main(String[] args) throws Exception {
        String path = "/zk-book";
        // 1.连接zookeeper 客户端,参数分别为服务器连接地址,集群用逗号隔开,sessionTimeout,事件监听器
        zk = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, new GetData());
        // 2.等待连接完成
        connectedSemaphore.await();
        // 3.创建节点,节点类型为ephemeral,数据为123
        zk.create(path, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("success create znode: " + path);

        // 4.获取数据,且为监听模式,并把数据状态存入 stat中
        System.out.println("the data of znode " + path + " is : " + new String(zk.getData(path, true, stat)));
        System.out.println("czxID: " + stat.getCzxid() + ", mzxID: " + stat.getMzxid() + ", version: " + stat.getVersion());

        // 5.根据版本号-1 修改数据1234
        zk.setData(path, "1234".getBytes(), -1);

        Thread.sleep(Integer.MAX_VALUE);
    }

    public void process(WatchedEvent event) {
        if (Event.KeeperState.SyncConnected == event.getState()) {
            // 连接监听,连接状态,空事件则为连接session成功
            if (Event.EventType.None == event.getType() && null == event.getPath()) {
                connectedSemaphore.countDown();
            } else if (event.getType() == Event.EventType.NodeDataChanged) {
                // node data changed 监听
                try {
                    System.out.println("the data of znode " + event.getPath() + " is : " + new String(zk.getData(event.getPath(), true, stat)));
                    System.out.println("czxID: " + stat.getCzxid() + ", mzxID: " + stat.getMzxid() + ", version: " + stat.getVersion());
                } catch (Exception e) {
                }
            }
        }
    }
}

 3.设置数据

package com.szwn.zk;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.util.concurrent.CountDownLatch;

public class SetData implements Watcher {
    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
    private static ZooKeeper zk;

    public static void main(String[] args) throws Exception {
        String path = "/zk-book-set";
        // 1.连接zookeeper 客户端,参数分别为服务器连接地址,集群用逗号隔开,sessionTimeout,事件监听器
        zk = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, new SetData());
        // 2.等待连接完成
        connectedSemaphore.await();

        // 3.创建节点,节点类型为ephemeral,数据为123
        zk.create(path, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("success create znode: " + path);

        // 4.获取数据,且为监听模式
        zk.getData(path, true, null);

        // 5.根据版本号-1 修改数据456
        Stat stat = zk.setData(path, "456".getBytes(), -1);
        System.out.println("czxID: " + stat.getCzxid() + ", mzxID: " + stat.getMzxid() + ", version: " + stat.getVersion());
        // 6.根据版本号,上一次修改后的版本号,设置数据456
        Stat stat2 = zk.setData(path, "456".getBytes(), stat.getVersion());
        System.out.println("czxID: " + stat2.getCzxid() + ", mzxID: " + stat2.getMzxid() + ", version: " + stat2.getVersion());
        try {
            // 7.错误版本号设置数据,报错
            zk.setData(path, "456".getBytes(), stat.getVersion());
        } catch (KeeperException e) {
            System.out.println("Error: " + e.code() + "," + e.getMessage());
        }
        Thread.sleep(Integer.MAX_VALUE);
    }

    public void process(WatchedEvent event) {
        if (Event.KeeperState.SyncConnected == event.getState()) {
            // 连接监听,连接状态,空事件则为连接session成功
            if (Event.EventType.None == event.getType() && null == event.getPath()) {
                connectedSemaphore.countDown();
            }
        }
    }
}

 4.获取子节点

package com.szwn.zk;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

import java.util.List;
import java.util.concurrent.CountDownLatch;

public class GetChildren implements Watcher{
    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
    private static ZooKeeper zk = null;

    public static void main(String[] args) throws Exception {
        String path = "/zk-book-1";
        // 1.连接zookeeper 客户端,参数分别为服务器连接地址,集群用逗号隔开,sessionTimeout,事件监听器
        zk = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, new GetChildren());
        // 2.等待连接完成
        connectedSemaphore.await();

        // 3.创建节点,节点类型为persistent,数据为空
        zk.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("success create znode: " + path);
        // 4.创建子节点,路径为path/c1,节点类型为ephemeral,数据为空
        zk.create(path + "/c1", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("success create znode: " + path + "/c1");
        // 5.获取path的子节点,并监听变化
        List<String> childrenList = zk.getChildren(path, true);
        System.out.println(childrenList);

        // 4.创建子节点,路径为path/c2,节点类型为ephemeral,数据为空
        zk.create(path + "/c2", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("success create znode: " + path + "/c2");
        Thread.sleep(1000);
        // 4.创建子节点,路径为path/c3,节点类型为ephemeral,数据为空
        zk.create(path + "/c3", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("success create znode: " + path + "/c3");
        Thread.sleep(Integer.MAX_VALUE);
    }

    public void process(WatchedEvent event) {
        if (Event.KeeperState.SyncConnected == event.getState()) {
            // 连接监听,连接状态,空事件则为连接session成功
            if (Event.EventType.None == event.getType() && null == event.getPath()) {
                connectedSemaphore.countDown();
                // 监听node children changed event
            } else if (event.getType() == Event.EventType.NodeChildrenChanged) {
                try {
                    System.out.println("ReGet Child:" + zk.getChildren(event.getPath(), true));
                } catch (Exception e) {
                }
            }
        }
    }
}

5.节点权限控制

package com.szwn.zk;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class Auth {
    final static String PATH = "/zk-book-auth_test";

    public static void main(String[] args) throws Exception {

        // 1.连接zookeeper 客户端,参数分别为服务器连接地址,集群用逗号隔开,sessionTimeout,事件监听器
        ZooKeeper zookeeper1 = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, null);
        // 2.连接添加权限信息,scheme和data
        zookeeper1.addAuthInfo("digest", "foo:true".getBytes());
        // 3.创建节点,数据为init
        // 权限为 CREATOR_ALL_ACL(This ACL gives the creators authentication id's all permissions.)
        zookeeper1.create(PATH, "init".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.EPHEMERAL);
        System.out.println("success create znode: " + PATH);
        // 4.创建第二个session
        ZooKeeper zookeeper2 = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, null);
        // 5.获取第一个数据
        byte[] data = zookeeper2.getData(PATH, false, null);
        System.out.println(new String(data));
    }
}

   第二个客户端没有path的权限,运行结果为

  

6.节点存在

package com.szwn.zk;

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class Exist implements Watcher {
    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
    private static ZooKeeper zk;

    public static void main(String[] args) throws Exception {
        String path = "/zk-book";
        // 1.连接zookeeper 客户端,参数分别为服务器连接地址,集群用逗号隔开,sessionTimeout,事件监听器
        zk = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, new Exist());
        // 2.等待连接完成
        connectedSemaphore.await();

        // 3.判断path节点是否存在,并启动监听
        Stat exists = zk.exists(path, true);
        System.out.println(exists);

        // 4.创建path节点并设置数据为123
        zk.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk.setData(path, "123".getBytes(), -1);

        // 5.创建path/c1子节点
        zk.create(path + "/c1", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("success create znode: " + path + "/c1");

        // 6.删除path/c1子节点
        zk.delete(path + "/c1", -1);
        zk.delete(path, -1);

        Thread.sleep(Integer.MAX_VALUE);
    }

    public void process(WatchedEvent event) {
        try {
            if (Event.KeeperState.SyncConnected == event.getState()) {
                // 连接监听,连接状态,空事件则为连接session成功
                if (Event.EventType.None == event.getType() && null == event.getPath()) {
                    connectedSemaphore.countDown();
                    // node created 监听,因为对节点的操作监听为下一次有效,所以设置监听为true,继续监听节点下次状态变化
                } else if (Event.EventType.NodeCreated == event.getType()) {
                    System.out.println("success create znode: " + event.getPath());
                    zk.exists(event.getPath(), true);
                    // node deleted 监听
                } else if (Event.EventType.NodeDeleted == event.getType()) {
                    System.out.println("success delete znode: " + event.getPath());
                    zk.exists(event.getPath(), true);
                    // node data changed 监听
                } else if (Event.EventType.NodeDataChanged == event.getType()) {
                    System.out.println("data changed of znode: " + event.getPath());
                    zk.exists(event.getPath(), true);
                }
            }
        } catch (Exception e) {
        }
    }
}

7.删除节点

package com.szwn.zk;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

import java.util.concurrent.CountDownLatch;

public class DeleteNode implements Watcher {
    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
    private static ZooKeeper zk;

    public static void main(String[] args) throws Exception {
        String path = "/zk-book";
        // 1.连接zookeeper 客户端,参数分别为服务器连接地址,集群用逗号隔开,sessionTimeout,事件监听器
        zk = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000,
                new DeleteNode());
        // 2.等待连接完成
        connectedSemaphore.await();

        // 3.创建节点,节点类型为persistent
        zk.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("success create znode: " + path);
        // 4.创建子节点,节点类型为persistent
        zk.create(path + "/c1", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("success create znode: " + path + "/c1");
        try {
            // 5.删除父节点,因为有子节点,报错
            zk.delete(path, -1);
        } catch (Exception e) {
            System.out.println("fail to delete znode: " + path);
        }

        // 6.删除子节点
        zk.delete(path + "/c1", -1);
        System.out.println("success delete znode: " + path + "/c1");
        // 7.删除父节点
        zk.delete(path, -1);
        System.out.println("success delete znode: " + path);

        Thread.sleep(Integer.MAX_VALUE);
    }

    public void process(WatchedEvent event) {
        if (Event.KeeperState.SyncConnected == event.getState()) {
            if (Event.EventType.None == event.getType() && null == event.getPath()) {
                connectedSemaphore.countDown();
            }
        }
    }
}

运行结果

相关文章