IT俱乐部 Python 基于Mongodb分布式锁解决定时任务并发执行问题

基于Mongodb分布式锁解决定时任务并发执行问题

前言

我们日常开发过程,会有一些定时任务的代码来统计一些系统运行数据,但是我们应用有需要部署多个实例,传统的通过配置文件来控制定时任务是否启动又太过繁琐,而且还经常出错,导致一些异常数据的产生

网上有很多分布式锁的实现方案,基于redis、zk、等有很多,但是我的就是一个用了mysql和mongo的小应用,不准备引入其他三方中间件来解决这个问题,撸一个简单的分布式锁来解决定时任务并发执行的问题,加锁操作的原子性和防死锁也都要支持,这里我使用mongodb写了AllInOne的工具类

All in one Code

先上代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
@Component
@Slf4j
public class MongoDBLock {
 
    private static final int DEFAULT_LOCK_TIMEOUT = 30;//锁的默认超时时间,单位秒
 
    private MongoTemplate mongoTemplate;
    private int lockTimeout;
 
    public MongoDBLock(MongoTemplate mongoTemplate) {
        this.mongoTemplate = mongoTemplate;
        this.lockTimeout = DEFAULT_LOCK_TIMEOUT;
    }
 
    /**
     * 尝试获取分布式锁
     *
     * @param lockKey 锁的key
     * @return true:获取锁成功,false:获取锁失败
     */
    private boolean acquireLock(String lockKey) {
        LockDocument document = new LockDocument();
        document.setId(lockKey);
        document.setExpireAt(Instant.ofEpochMilli(Instant.now().toEpochMilli() + lockTimeout * 1000));
        try {
            mongoTemplate.insert(document);
            return true;
        } catch (Exception e) {
 
        }
        return false;
    }
 
    /**
     * 释放分布式锁
     *
     * @param lockKey 锁的key
     */
    private void releaseLock(String lockKey) {
        Query query = new Query(Criteria.where("key").is(lockKey));
        mongoTemplate.remove(query, LockDocument.class);
        log.info("程序执行成功,释放分布式锁,lockKey:{}",lockKey);
    }
 
    /**
     * 分布式锁入口方法,参数lockName为锁的名称,lockKey为需要加锁的key,执行完成后自动释放锁
     *
     * @param lockKey
     * @param task
     * @param
     * @throws Exception
     */
    public  void executeWithLock(String lockKey, ITask task) throws Exception {
        boolean locked = acquireLock(lockKey);
        if (locked) {
            log.info("获取分布式锁成功,lockKey:{}",lockKey);
            try {
                task.execute();
            } finally {
                releaseLock(lockKey);
            }
        } else {
            log.warn("获取分布式锁失败,lockKey:{}", lockKey);
            throw new AppException("获取分布式锁失败!");
        }
    }
 
    @Data
    @Document(collection = "lock_collection")
    static class LockDocument {
        @Id
        private String id;
        @Indexed(expireAfterSeconds = DEFAULT_LOCK_TIMEOUT)
        private Instant expireAt;
    }
 
    @FunctionalInterface
    public interface ITask {
        T execute() throws Exception;
    }
}

调用示例

1
2
3
4
5
6
7
@Resource
MongoDBLock mongoDBLock;
 
mongoDBLock.executeWithLock("key", () -> {
    // do some thing
    return null;
});

原理

  • 使用key作为主键,利用mongodb的insert原子性保障LockDocument不会重复插入
  • LockDocument中expireAt字段利用的mongodb索引过期机制,解决死锁问题,这里设置超时时间是30秒,并在执行完成之后会主动释放锁

到此这篇关于基于Mongodb分布式锁简单实现,解决定时任务并发执行问题的文章就介绍到这了,更多相关Mongodb分布式锁内容请搜索IT俱乐部以前的文章或继续浏览下面的相关文章希望大家以后多多支持IT俱乐部!

本文收集自网络,不代表IT俱乐部立场,转载请注明出处。https://www.2it.club/code/python/7764.html
上一篇
下一篇
联系我们

联系我们

在线咨询: QQ交谈

邮箱: 1120393934@qq.com

工作时间:周一至周五,9:00-17:30,节假日休息

关注微信
微信扫一扫关注我们

微信扫一扫关注我们

返回顶部