IT俱乐部 Redis redis延时队列的项目实践

redis延时队列的项目实践

引入

3.15.5org.redissonredisson-spring-boot-starter${redisson.version}

放入延时队列

  import org.redisson.api.RedissonClient;
  
  @Autowired
  private RedissonClient redissonClient;
  public static final String CardKitMessageDelayQueue = "QUEUE:CARD_KIT";

      // 发送延时消息
      RBlockingDeque blockingDeque = redissonClient
              .getBlockingDeque(CardKitMessageDelayQueue);
      RDelayedQueue delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
      // 计算时间戳
      long delayInSeconds = calculateDifference(model.getSendTime(), LocalDateTime.now());
      CardKitRedisBo cardKitRedisBo = new CardKitRedisBo();
      cardKitRedisBo.setId(model.getId()).setTemplateId(model.getTemplateId());
      delayedQueue.offer(cardKitRedisBo, delayInSeconds, TimeUnit.SECONDS);

监听延时队列

import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

@Slf4j
@Component
public class CardKitMessageListener implements ApplicationRunner {

  public static final String CardKitMessageDelayQueue = "QUEUE:CARD_KIT";

  public static final String CardKitMessageDelayLock = "LOCK:CARD_KIT";

  @Resource
  private RedissonClient redissonClient;

  @Autowired
  private Tracer tracer;

  @Autowired
  private CardKitService cardKitService;

  @Override
  public void run(ApplicationArguments args) {
    new Thread(() -> {
      RBlockingDeque blockingDeque = redissonClient.getBlockingDeque(CardKitMessageDelayQueue);
      while (true) {
        // 获取定时任务锁
        RLock rLock = redissonClient.getLock(CardKitMessageDelayLock);
        try {
          // 最多等待5秒
          boolean isLocked = rLock.tryLock(5, TimeUnit.SECONDS);
          if (isLocked) {
            Span span = tracer.nextSpan().name("OccupationMessage").start();
            try (Tracer.SpanInScope ws = tracer.withSpan(span)) {
              CardKitRedisBo poll = blockingDeque.take();
              log.info("获取延时消息:{}", JSONUtil.toJsonStr(poll));
              // 消费消息
              cardKitService.sendCardKit(poll);
            } finally {
              try {
                rLock.unlock();
              } catch (Exception ex) {
                log.warn("锁释放失败:" + ex.getMessage());
              }
              try {
                span.end();
              } catch (Exception ex) {
				log.error("失败", ex)
              }
            }
          }
        } catch (Exception ex) {
          log.error("延迟消息处理异常:" + ex.getMessage(), ex);
        }
      }
    }).start();
  }
}

到此这篇关于redis延时队列的文章就介绍到这了,更多相关redis延时队列内容请搜索IT俱乐部以前的文章或继续浏览下面的相关文章希望大家以后多多支持IT俱乐部! 

本文收集自网络,不代表IT俱乐部立场,转载请注明出处。https://www.2it.club/database/redis/15697.html
上一篇
下一篇
联系我们

联系我们

在线咨询: QQ交谈

邮箱: 1120393934@qq.com

工作时间:周一至周五,9:00-17:30,节假日休息

关注微信
微信扫一扫关注我们

微信扫一扫关注我们

返回顶部