Springboot2.3整合Zookeeper3.6实现基本操作

x33g5p2x  于2021-08-25 转载在 Zookeeper  
字(4.5k)|赞(0)|评价(0)|浏览(537)

本文主要介绍Springboot3.4整合Zookeeper3.6版本,需提前安装好zookeeper开发环境,有不清楚的小伙伴,请参考Zookeeper3.6搭建单机版和集群版

1. 引入pom依赖

  1. <dependency>
  2. <groupId>org.apache.zookeeper</groupId>
  3. <artifactId>zookeeper</artifactId>
  4. <version>3.6.0</version>
  5. </dependency>

2. zookeeper配置文件

  1. @Configuration
  2. public class ZookeeperConfig {
  3. //zookeeper连接信息
  4. @Value("${zookeeper.connect.address}")
  5. private String connectStr;
  6. //session超时时间(毫秒)
  7. @Value("${zookeeper.connect.time-out}")
  8. private int timeOut;
  9. private CountDownLatch connectLatch = new CountDownLatch(1);
  10. /** * 初始化zookeeper连接 * @return * @throws IOException * @throws InterruptedException */
  11. @Bean
  12. public ZooKeeper getZkClient() throws IOException, InterruptedException {
  13. ZooKeeper zkClient = new ZooKeeper(connectStr, timeOut, new Watcher() {
  14. @Override
  15. public void process(WatchedEvent event) {
  16. if (event.getState() == Event.KeeperState.SyncConnected) {
  17. connectLatch.countDown();
  18. }
  19. }
  20. });
  21. connectLatch.await();
  22. return zkClient;
  23. }
  24. }

3. Zookeeper接口封装

  1. public interface ZookeeperService {
  2. /** * 创建节点 * @param path 节点路径 * @param data 节点数据 * @param perm 节点权限 * @param nodeType 节点类型 * @return * @throws KeeperException * @throws InterruptedException */
  3. String createNode(String path, String data, ArrayList<ACL> perm, CreateMode nodeType) throws KeeperException, InterruptedException;
  4. /** * 判断节点是否存在 * @param path 节点路径 * @param nodeWatch 是否复用zookeeper中默认的watch * @return * @throws KeeperException * @throws InterruptedException */
  5. Stat existNode(String path, boolean nodeWatch) throws KeeperException, InterruptedException;
  6. /** * 判断节点是否存在 * @param path 节点路径 * @param watcher 监听类型 创建/删除/更新 * @return * @throws KeeperException * @throws InterruptedException */
  7. Stat existNode(String path, Watcher watcher) throws KeeperException, InterruptedException;
  8. /** * 修改节点 * @param path 节点路径 * @param data 节点数据 * @return * @throws KeeperException * @throws InterruptedException */
  9. Stat updateNode(String path, String data) throws KeeperException, InterruptedException;
  10. /** * 删除节点 * @param path 节点路径 * @throws KeeperException * @throws InterruptedException */
  11. void deleteNode(String path) throws KeeperException, InterruptedException;
  12. /** * 获取节点数据 * @param path 节点路径 * @return * @throws KeeperException * @throws InterruptedException */
  13. String getNodeData(String path) throws KeeperException, InterruptedException;
  14. /** * 获取子节点 * @param path 节点路径 * @param nodeWatch 是否复用zookeeper中默认的watch * @return * @throws KeeperException * @throws InterruptedException */
  15. List<String> getChildrenNode(String path, boolean nodeWatch) throws KeeperException, InterruptedException;
  16. /** * 获取子节点 * @param path 节点路径 * @param watcher 监听类型 创建/删除/更新 * @return * @throws KeeperException * @throws InterruptedException */
  17. List<String> getChildrenNode(String path, Watcher watcher) throws KeeperException, InterruptedException;
  18. }

4. Zookeeper接口实现

  1. @Service
  2. public class ZookeeperServiceImpl implements ZookeeperService {
  3. @Autowired
  4. private ZooKeeper zkClient;
  5. @Override
  6. public String createNode(String path, String data, ArrayList<ACL> perm, CreateMode nodeType) throws KeeperException, InterruptedException {
  7. String nodeStr = zkClient.create(path, data.getBytes(), perm, nodeType);
  8. return nodeStr;
  9. }
  10. @Override
  11. public Stat existNode(String path, boolean nodeWatch) throws KeeperException, InterruptedException {
  12. Stat exists = zkClient.exists(path, nodeWatch);
  13. return exists;
  14. }
  15. @Override
  16. public Stat existNode(String path, Watcher watcher) throws KeeperException, InterruptedException {
  17. Stat exists = zkClient.exists(path, watcher);
  18. return exists;
  19. }
  20. /** * 更新节点 * @param path 节点路径 * @param data 节点数据 * version 版本号,如果为-1,表示忽略版本检查 * @return * @throws KeeperException * @throws InterruptedException */
  21. @Override
  22. public Stat updateNode(String path, String data) throws KeeperException, InterruptedException {
  23. Stat stat = zkClient.setData(path, data.getBytes(), 0);
  24. return stat;
  25. }
  26. @Override
  27. public void deleteNode(String path) throws KeeperException, InterruptedException {
  28. zkClient.delete(path, 0);
  29. }
  30. @Override
  31. public String getNodeData(String path) throws KeeperException, InterruptedException {
  32. Stat stat = new Stat();
  33. byte[] data = zkClient.getData(path, true, stat);
  34. String dataStr = new String(data);
  35. return dataStr;
  36. }
  37. @Override
  38. public List<String> getChildrenNode(String path, boolean nodeWatch) throws KeeperException, InterruptedException {
  39. List<String> childrenList = zkClient.getChildren(path, nodeWatch);
  40. return childrenList;
  41. }
  42. @Override
  43. public List<String> getChildrenNode(String path, Watcher watcher) throws KeeperException, InterruptedException {
  44. List<String> childrenList = zkClient.getChildren(path, watcher);
  45. return childrenList;
  46. }
  47. }

5. 节点类型

Zookeeper中节点类型主要有两类,即持久节点(Persistent)和临时节点(Ephemeral),每种节点又分为有序和无序
持久无序节点:客户端与Zookeeper断开连接后,该节点依旧存在
持久有序节点:客户端与Zookeeper端口连接后,该节点依旧存在,Zookeeper会给该节点名称进行顺序编号
临时无序节点:客户端与Zookeeper断开连接后,该节点被删除
临时有序节点:客户端与Zookeeper断开连接后,该节点被删除,Zookeeper会给该节点名称进行顺序编号

6. 客户端向服务端写数据流程

写流程之写入请求发送给Leader节点

写入请求发送给Leader节点

写流程之写入请求发送给follower节点

写入请求发送给follower节点

相关文章

最新文章

更多