基于Zookeeper实现分布式锁实践

x33g5p2x  于2021-12-10 转载在 Zookeeper  
字(12.6k)|赞(0)|评价(0)|浏览(469)

基于Zookeeper实现分布式锁实践

1、什么是Zookeeper?

Zookeeper是一个分布式的,开源的分布式应用程序协调服务,是Hadoop和hbase的重要组件。

引用官网的图例:

特征:

  1. zookeeper的数据机构是一种节点树的数据结构,zNode是基本的单位,znode是一种和unix文件系统相似的节点,可以往这个节点存储或向这个节点获取数据
  2. 通过客户端可以对znode进行数据操作,还可以注册watcher监控znode的改变

2、Zookeeper节点类型

  • 持久节点(Persistent)
  • 持久顺序节点(Persistent_Sequential)
  • 临时节点(Ephemeral)
  • 临时顺序节点(Ephemeral_Sequential)

3、Zookeeper环境搭建

下载zookeeper,官网链接,https://zookeeper.apache.org/releases.html#download,去官网找到对应的软件下载到本地

修改配置文件,${ZOOKEEPER_HOME}\conf,找到zoo_sample.cfg文件,先备份一份,另外一份修改为zoo.cfg

解压后点击zkServer.cmd运行服务端:

4、Zookeeper基本使用

在cmd窗口或者直接在idea编辑器里的terminal输入命令:

  1. zkCli.cmd -server 127.0.0.1:2181

输入命令help查看帮助信息:

  1. ZooKeeper -server host:port -client-configuration properties-file cmd args
  2. addWatch [-m mode] path # optional mode is one of [PERSISTENT, PERSISTENT_RECURSIVE] - default is PERSISTENT_RECURSIVE
  3. addauth scheme auth
  4. close
  5. config [-c] [-w] [-s]
  6. connect host:port
  7. create [-s] [-e] [-c] [-t ttl] path [data] [acl]
  8. delete [-v version] path
  9. deleteall path [-b batch size]
  10. delquota [-n|-b|-N|-B] path
  11. get [-s] [-w] path
  12. getAcl [-s] path
  13. getAllChildrenNumber path
  14. getEphemerals path
  15. history
  16. listquota path
  17. ls [-s] [-w] [-R] path
  18. printwatches on|off
  19. quit
  20. reconfig [-s] [-v version] [[-file path] | [-members serverID=host:port1:port2;port3[,...]*]] | [-add serverId=host:port1:port2;port3[,...]]* [-remove serverId[,...]*]
  21. redo cmdno
  22. removewatches path [-c|-d|-a] [-l]
  23. set [-s] [-v version] path data
  24. setAcl [-s] [-v version] [-R] path acl
  25. setquota -n|-b|-N|-B val path
  26. stat [-w] path
  27. sync path
  28. version
  29. whoami

create [-s] [-e] [-c] [-t ttl] path [data] [acl]-s表示顺序节点,-e表示临时节点,若不指定表示持久节点,acl是来进行权限控制的

  1. [zk: 127.0.0.1:2181(CONNECTED) 1] create -s /zk-test 0
  2. Created /zk-test0000000000

查看

  1. [zk: 127.0.0.1:2181(CONNECTED) 4] ls /
  2. [zk-test0000000000, zookeeper]

设置修改节点数据

  1. set /zk-test 123

获取节点数据

  1. get /zk-test

ps,zookeeper命令详情查看help帮助文档,也可以去官网看看文档

ok,然后java写个例子,进行watcher监听

  1. package com.example.concurrent.zkSample;
  2. import org.I0Itec.zkclient.IZkDataListener;
  3. import org.I0Itec.zkclient.ZkClient;
  4. /** * <pre> * Zookeeper 例子 * </pre> * * <pre> * @author mazq * 修改记录 * 修改后版本: 修改人: 修改日期: 2021/12/09 16:57 修改内容: * </pre> */
  5. public class ZookeeperSample {
  6. public static void main(String[] args) {
  7. ZkClient client = new ZkClient("localhost:2181");
  8. client.setZkSerializer(new MyZkSerializer());
  9. client.subscribeDataChanges("/zk-test", new IZkDataListener() {
  10. @Override
  11. public void handleDataChange(String dataPath, Object data) throws Exception {
  12. System.out.println("监听到节点数据改变!");
  13. }
  14. @Override
  15. public void handleDataDeleted(String dataPath) throws Exception {
  16. System.out.println("监听到节点数据被删除了");
  17. }
  18. });
  19. try {
  20. Thread.sleep(1000 * 60 * 2);
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. }

5、Zookeeper应用场景

Zookeeper有什么典型的应用场景:

  1. 注册中心(Dubbo)
  2. 命名服务
  3. Master选举
  4. 集群管理
  5. 分布式队列
  6. 分布式锁

6、Zookeeper分布式锁

Zookeeper适合用来做分布式锁,然后具体实现是利用什么原理?我们知道zookeeper是类似于unix的文件系统,文件系统我们也知道在一个文件夹下面,会有文件名称不能一致的特性的,也就是互斥的特性。同样zookeeper也有这个特性,在同个znode节点下面,子节点命名不能重复。所以利用这个特性可以来实现分布式锁

业务场景:在高并发的情况下面进行订单场景,这是一个典型的电商场景

自定义的Zookeeper序列化类:

  1. package com.example.concurrent.zkSample;
  2. import org.I0Itec.zkclient.exception.ZkMarshallingError;
  3. import org.I0Itec.zkclient.serialize.ZkSerializer;
  4. import java.io.UnsupportedEncodingException;
  5. public class MyZkSerializer implements ZkSerializer {
  6. private String charset = "UTF-8";
  7. @Override
  8. public byte[] serialize(Object o) throws ZkMarshallingError {
  9. return String.valueOf(o).getBytes();
  10. }
  11. @Override
  12. public Object deserialize(byte[] bytes) throws ZkMarshallingError {
  13. try {
  14. return new String(bytes , charset);
  15. } catch (UnsupportedEncodingException e) {
  16. throw new ZkMarshallingError();
  17. }
  18. }
  19. }

订单编号生成器类,因为SimpleDateFormat是线程不安全的,所以还是要加上ThreadLocal

  1. package com.example.concurrent.zkSample;
  2. import java.text.DateFormat;
  3. import java.text.SimpleDateFormat;
  4. import java.util.Date;
  5. import java.util.concurrent.atomic.AtomicInteger;
  6. public class OrderCodeGenerator {
  7. private static final String DATE_FORMAT = "yyyyMMddHHmmss";
  8. private static AtomicInteger ai = new AtomicInteger(0);
  9. private static int i = 0;
  10. private static ThreadLocal<SimpleDateFormat> threadLocal = new ThreadLocal<SimpleDateFormat>() {
  11. @Override
  12. protected SimpleDateFormat initialValue() {
  13. return new SimpleDateFormat(DATE_FORMAT);
  14. }
  15. };
  16. public static DateFormat getDateFormat() {
  17. return (DateFormat) threadLocal.get();
  18. }
  19. public static String generatorOrderCode() {
  20. try {
  21. return getDateFormat().format(new Date(System.currentTimeMillis()))
  22. + i++;
  23. } finally {
  24. threadLocal.remove();
  25. }
  26. }
  27. }

pom.xml加上zookeeper客户端的配置:

  1. <dependency>
  2. <groupId>com.101tec</groupId>
  3. <artifactId>zkclient</artifactId>
  4. <version>0.10</version>
  5. </dependency>

实现一个zookeeper分布式锁,思路是获取节点,这个是多线程竞争的,能获取到锁,也就是创建节点成功,就执行业务,其它抢不到锁的线程,阻塞等待,注册watcher监听锁是否释放了,释放了,取消注册watcher,继续抢锁

  1. package com.example.concurrent.zkSample;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.I0Itec.zkclient.IZkDataListener;
  4. import org.I0Itec.zkclient.ZkClient;
  5. import org.I0Itec.zkclient.exception.ZkNodeExistsException;
  6. import java.util.concurrent.CountDownLatch;
  7. import java.util.concurrent.TimeUnit;
  8. import java.util.concurrent.locks.Condition;
  9. import java.util.concurrent.locks.Lock;
  10. @Slf4j
  11. public class ZKDistributeLock implements Lock {
  12. private String localPath;
  13. private ZkClient zkClient;
  14. ZKDistributeLock(String localPath) {
  15. super();
  16. this.localPath = localPath;
  17. zkClient = new ZkClient("localhost:2181");
  18. zkClient.setZkSerializer(new MyZkSerializer());
  19. }
  20. @Override
  21. public void lock() {
  22. while (!tryLock()) {
  23. waitForLock();
  24. }
  25. }
  26. private void waitForLock() {
  27. // 创建countdownLatch协同
  28. CountDownLatch countDownLatch = new CountDownLatch(1);
  29. // 注册watcher监听
  30. IZkDataListener listener = new IZkDataListener() {
  31. @Override
  32. public void handleDataChange(String path, Object o) throws Exception {
  33. //System.out.println("zookeeper data has change!!!");
  34. }
  35. @Override
  36. public void handleDataDeleted(String s) throws Exception {
  37. // System.out.println("zookeeper data has delete!!!");
  38. // 监听到锁释放了,释放线程
  39. countDownLatch.countDown();
  40. }
  41. };
  42. zkClient.subscribeDataChanges(localPath , listener);
  43. // 线程等待
  44. if (zkClient.exists(localPath)) {
  45. try {
  46. countDownLatch.await();
  47. } catch (InterruptedException e) {
  48. e.printStackTrace();
  49. }
  50. }
  51. // 取消注册
  52. zkClient.unsubscribeDataChanges(localPath , listener);
  53. }
  54. @Override
  55. public void unlock() {
  56. zkClient.delete(localPath);
  57. }
  58. @Override
  59. public boolean tryLock() {
  60. try {
  61. zkClient.createEphemeral(localPath);
  62. } catch (ZkNodeExistsException e) {
  63. return false;
  64. }
  65. return true;
  66. }
  67. @Override
  68. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
  69. return false;
  70. }
  71. @Override
  72. public void lockInterruptibly() throws InterruptedException {
  73. }
  74. @Override
  75. public Condition newCondition() {
  76. return null;
  77. }
  78. }

订单服务api

  1. package com.example.concurrent.zkSample;
  2. public interface OrderService {
  3. void createOrder();
  4. }

订单服务实现类,加上zookeeper分布式锁

  1. package com.example.concurrent.zkSample;
  2. import java.util.concurrent.locks.Lock;
  3. public class OrderServiceInvoker implements OrderService{
  4. @Override
  5. public void createOrder() {
  6. Lock zkLock = new ZKDistributeLock("/zk-test");
  7. //Lock zkLock = new ZKDistributeImproveLock("/zk-test");
  8. String orderCode = null;
  9. try {
  10. zkLock.lock();
  11. orderCode = OrderCodeGenerator.generatorOrderCode();
  12. } finally {
  13. zkLock.unlock();
  14. }
  15. System.out.println(String.format("thread name : %s , orderCode : %s" ,
  16. Thread.currentThread().getName(),
  17. orderCode));
  18. }
  19. }

因为搭建分布式环境比较繁琐,所以这里使用juc里的并发协同工具类,CyclicBarrier模拟多线程并发的场景,模拟分布式环境的高并发场景

  1. package com.example.concurrent.zkSample;
  2. import java.util.concurrent.BrokenBarrierException;
  3. import java.util.concurrent.CyclicBarrier;
  4. public class ConcurrentDistributeTest {
  5. public static void main(String[] args) {
  6. // 多线程数
  7. int threadSize = 30;
  8. // 创建多线程循环屏障
  9. CyclicBarrier cyclicBarrier = new CyclicBarrier(threadSize , ()->{
  10. System.out.println("准备完成!");
  11. }) ;
  12. // 模拟分布式集群的场景
  13. for (int i = 0 ; i < threadSize ; i ++) {
  14. new Thread(()->{
  15. OrderService orderService = new OrderServiceInvoker();
  16. // 所有线程都等待
  17. try {
  18. cyclicBarrier.await();
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. } catch (BrokenBarrierException e) {
  22. e.printStackTrace();
  23. }
  24. // 模拟并发请求
  25. orderService.createOrder();
  26. }).start();
  27. }
  28. }
  29. }

跑多几次,没有发现订单号重复的情况,分布式锁还是有点效果的

  1. thread name : Thread-6 , orderCode : 202112100945110
  2. thread name : Thread-1 , orderCode : 202112100945111
  3. thread name : Thread-13 , orderCode : 202112100945112
  4. thread name : Thread-11 , orderCode : 202112100945113
  5. thread name : Thread-14 , orderCode : 202112100945114
  6. thread name : Thread-0 , orderCode : 202112100945115
  7. thread name : Thread-8 , orderCode : 202112100945116
  8. thread name : Thread-17 , orderCode : 202112100945117
  9. thread name : Thread-10 , orderCode : 202112100945118
  10. thread name : Thread-5 , orderCode : 202112100945119
  11. thread name : Thread-2 , orderCode : 2021121009451110
  12. thread name : Thread-16 , orderCode : 2021121009451111
  13. thread name : Thread-19 , orderCode : 2021121009451112
  14. thread name : Thread-4 , orderCode : 2021121009451113
  15. thread name : Thread-18 , orderCode : 2021121009451114
  16. thread name : Thread-3 , orderCode : 2021121009451115
  17. thread name : Thread-9 , orderCode : 2021121009451116
  18. thread name : Thread-12 , orderCode : 2021121009451117
  19. thread name : Thread-15 , orderCode : 2021121009451118
  20. thread name : Thread-7 , orderCode : 2021121009451219

注释加锁的代码,再加大并发数,模拟一下

  1. package com.example.concurrent.zkSample;
  2. import java.util.concurrent.locks.Lock;
  3. public class OrderServiceInvoker implements OrderService{
  4. @Override
  5. public void createOrder() {
  6. //Lock zkLock = new ZKDistributeLock("/zk-test");
  7. //Lock zkLock = new ZKDistributeImproveLock("/zk-test");
  8. String orderCode = null;
  9. try {
  10. //zkLock.lock();
  11. orderCode = OrderCodeGenerator.generatorOrderCode();
  12. } finally {
  13. //zkLock.unlock();
  14. }
  15. System.out.println(String.format("thread name : %s , orderCode : %s" ,
  16. Thread.currentThread().getName(),
  17. orderCode));
  18. }
  19. }

跑多几次,发现出现订单号重复的情况,所以分布式锁是可以保证分布式环境的线程安全的

7、公平式Zookeeper分布式锁

上面例子是一种非公平锁的方式,一旦监听到锁释放了,所有线程都会去抢锁,所以容易出现“惊群效应”

  • 巨大的服务器性能损耗
  • 网络冲击
  • 可能造成宕机

所以,需要改进分布式锁,改成一种公平锁的模式

流程图:

代码改进:

  1. package com.example.concurrent.zkSample;
  2. import org.I0Itec.zkclient.IZkDataListener;
  3. import org.I0Itec.zkclient.ZkClient;
  4. import org.I0Itec.zkclient.exception.ZkNodeExistsException;
  5. import java.util.Collections;
  6. import java.util.List;
  7. import java.util.concurrent.CountDownLatch;
  8. import java.util.concurrent.TimeUnit;
  9. import java.util.concurrent.locks.Condition;
  10. import java.util.concurrent.locks.Lock;
  11. public class ZKDistributeImproveLock implements Lock {
  12. private String localPath;
  13. private ZkClient zkClient;
  14. private String currentPath;
  15. private String beforePath;
  16. ZKDistributeImproveLock(String localPath) {
  17. super();
  18. this.localPath = localPath;
  19. zkClient = new ZkClient("localhost:2181");
  20. zkClient.setZkSerializer(new MyZkSerializer());
  21. if (!zkClient.exists(localPath)) {
  22. try {
  23. this.zkClient.createPersistent(localPath);
  24. } catch (ZkNodeExistsException e) {
  25. }
  26. }
  27. }
  28. @Override
  29. public void lock() {
  30. while (!tryLock()) {
  31. waitForLock();
  32. }
  33. }
  34. private void waitForLock() {
  35. CountDownLatch countDownLatch = new CountDownLatch(1);
  36. // 注册watcher
  37. IZkDataListener listener = new IZkDataListener() {
  38. @Override
  39. public void handleDataChange(String dataPath, Object data) throws Exception {
  40. }
  41. @Override
  42. public void handleDataDeleted(String dataPath) throws Exception {
  43. // 监听到锁释放,唤醒线程
  44. countDownLatch.countDown();
  45. }
  46. };
  47. zkClient.subscribeDataChanges(beforePath, listener);
  48. // 线程等待
  49. if (zkClient.exists(beforePath)) {
  50. try {
  51. countDownLatch.await();
  52. } catch (InterruptedException e) {
  53. e.printStackTrace();
  54. }
  55. }
  56. // 取消注册
  57. zkClient.unsubscribeDataChanges(beforePath , listener);
  58. }
  59. @Override
  60. public void unlock() {
  61. zkClient.delete(this.currentPath);
  62. }
  63. @Override
  64. public boolean tryLock() {
  65. if (this.currentPath == null) {
  66. currentPath = zkClient.createEphemeralSequential(localPath +"/" , "123");
  67. }
  68. // 获取Znode节点下面的所有子节点
  69. List<String> children = zkClient.getChildren(localPath);
  70. // 列表排序
  71. Collections.sort(children);
  72. if (currentPath.equals(localPath + "/" + children.get(0))) { // 当前节点是第1个节点
  73. return true;
  74. } else {
  75. //得到当前的索引号
  76. int index = children.indexOf(currentPath.substring(localPath.length() + 1));
  77. //取到前一个
  78. beforePath = localPath + "/" + children.get(index - 1);
  79. }
  80. return false;
  81. }
  82. @Override
  83. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
  84. return false;
  85. }
  86. @Override
  87. public void lockInterruptibly() throws InterruptedException {
  88. }
  89. @Override
  90. public Condition newCondition() {
  91. return null;
  92. }
  93. }
  1. thread name : Thread-13 , orderCode : 202112100936140
  2. thread name : Thread-3 , orderCode : 202112100936141
  3. thread name : Thread-14 , orderCode : 202112100936142
  4. thread name : Thread-16 , orderCode : 202112100936143
  5. thread name : Thread-1 , orderCode : 202112100936144
  6. thread name : Thread-9 , orderCode : 202112100936145
  7. thread name : Thread-4 , orderCode : 202112100936146
  8. thread name : Thread-5 , orderCode : 202112100936147
  9. thread name : Thread-7 , orderCode : 202112100936148
  10. thread name : Thread-2 , orderCode : 202112100936149
  11. thread name : Thread-17 , orderCode : 2021121009361410
  12. thread name : Thread-15 , orderCode : 2021121009361411
  13. thread name : Thread-0 , orderCode : 2021121009361412
  14. thread name : Thread-10 , orderCode : 2021121009361413
  15. thread name : Thread-18 , orderCode : 2021121009361414
  16. thread name : Thread-19 , orderCode : 2021121009361415
  17. thread name : Thread-8 , orderCode : 2021121009361416
  18. thread name : Thread-12 , orderCode : 2021121009361417
  19. thread name : Thread-11 , orderCode : 2021121009361418
  20. thread name : Thread-6 , orderCode : 2021121009361419

8、zookeeper和Redis锁对比?

Redis和Zookeeper都可以用来实现分布式锁,两者可以进行对比:

  • 基于Redis实现分布式锁

  • 实现比较复杂

  • 存在死锁的可能

  • 性能比较好,基于内存 ,而且保证的是高可用,redis优先保证的是AP(分布式CAP理论)

  • 基于Zookeeper实现分布式锁

  • 实现相对简单

  • 可靠性高,因为zookeeper保证的是CP(分布式CAP理论)

  • 性能相对较好 并发1~2万左右,并发太高,还是redis性能好

本博客代码可以在GitHub找到下载链接

附录参考资料

相关文章

最新文章

更多