基于Mongodb分布式锁简单实现,解决定时任务并发执行问题

x33g5p2x  于9个月前 转载在 Go  
字(2.0k)|赞(0)|评价(0)|浏览(1357)

前言

我们日常开发过程,会有一些定时任务的代码来统计一些系统运行数据,但是我们应用有需要部署多个实例,传统的通过配置文件来控制定时任务是否启动又太过繁琐,而且还经常出错,导致一些异常数据的产生

网上有很多分布式锁的实现方案,基于redis、zk、等有很多,但是我的就是一个用了mysql和mongo的小应用,不准备引入其他三方中间件来解决这个问题,撸一个简单的分布式锁来解决定时任务并发执行的问题,加锁操作的原子性和防死锁也都要支持,这里我使用mongodb写了AllInOne的工具类

All in one Code

先上代码

  1. @Component
  2. @Slf4j
  3. public class MongoDBLock {
  4. private static final int DEFAULT_LOCK_TIMEOUT = 30;//锁的默认超时时间,单位秒
  5. private MongoTemplate mongoTemplate;
  6. private int lockTimeout;
  7. public MongoDBLock(MongoTemplate mongoTemplate) {
  8. this.mongoTemplate = mongoTemplate;
  9. this.lockTimeout = DEFAULT_LOCK_TIMEOUT;
  10. }
  11. /**
  12. * 尝试获取分布式锁
  13. *
  14. * @param lockKey 锁的key
  15. * @return true:获取锁成功,false:获取锁失败
  16. */
  17. private boolean acquireLock(String lockKey) {
  18. LockDocument document = new LockDocument();
  19. document.setId(lockKey);
  20. document.setExpireAt(Instant.ofEpochMilli(Instant.now().toEpochMilli() + lockTimeout * 1000));
  21. try {
  22. mongoTemplate.insert(document);
  23. return true;
  24. } catch (Exception e) {
  25. }
  26. return false;
  27. }
  28. /**
  29. * 释放分布式锁
  30. *
  31. * @param lockKey 锁的key
  32. */
  33. private void releaseLock(String lockKey) {
  34. Query query = new Query(Criteria.where("_id").is(lockKey));
  35. mongoTemplate.remove(query, LockDocument.class);
  36. log.info("程序执行成功,释放分布式锁,lockKey:{}",lockKey);
  37. }
  38. /**
  39. * 分布式锁入口方法,参数lockName为锁的名称,lockKey为需要加锁的key,执行完成后自动释放锁
  40. *
  41. * @param lockKey
  42. * @param task
  43. * @param <T>
  44. * @throws Exception
  45. */
  46. public <T> void executeWithLock(String lockKey, ITask<T> task) throws Exception {
  47. boolean locked = acquireLock(lockKey);
  48. if (locked) {
  49. log.info("获取分布式锁成功,lockKey:{}",lockKey);
  50. try {
  51. task.execute();
  52. } finally {
  53. releaseLock(lockKey);
  54. }
  55. } else {
  56. log.warn("获取分布式锁失败,lockKey:{}", lockKey);
  57. throw new AppException("获取分布式锁失败!");
  58. }
  59. }
  60. @Data
  61. @Document(collection = "lock_collection")
  62. static class LockDocument {
  63. @Id
  64. private String id;
  65. @Indexed(expireAfterSeconds = DEFAULT_LOCK_TIMEOUT)
  66. private Instant expireAt;
  67. }
  68. @FunctionalInterface
  69. public interface ITask<T> {
  70. T execute() throws Exception;
  71. }
  72. }

调用示例

  1. @Resource
  2. MongoDBLock mongoDBLock;
  3. mongoDBLock.executeWithLock("key", () -> {
  4. // do some thing
  5. return null;
  6. });

原理

  • 使用key作为主键,利用mongodb的insert原子性保障LockDocument不会重复插入
  • LockDocument中expireAt字段利用的mongodb索引过期机制,解决死锁问题,这里设置超时时间是30秒,并在执行完成之后会主动释放锁

相关文章

最新文章

更多