当前位置:首页 > 科技  > 软件

开发实战:使用Redisson实现分布式延时消息,订单30分钟关闭的另外一种实现!

来源: 责编: 时间:2024-09-10 09:51:04 44观看
导读前言因为工作中需要用到分布式的延时队列,调研了一段时间,选择使用 Redisson DelayedQueue,为了搞清楚内部运行流程,特记录下来。总体流程大概是图中的这个样子,初看一眼有点不知从何下手,接下来我会通过以下几点来分析流程

前言

因为工作中需要用到分布式的延时队列,调研了一段时间,选择使用 Redisson DelayedQueue,为了搞清楚内部运行流程,特记录下来。dQx28资讯网——每日最新资讯28at.com

总体流程大概是图中的这个样子,初看一眼有点不知从何下手,接下来我会通过以下几点来分析流程,相信看完本文你能了解整个运行流程。dQx28资讯网——每日最新资讯28at.com

  • 基本使用
  • 内部数据结构介绍
  • 基本流程
  • 发送延时消息
  • 获取延时消息
  • 初始化延时队列

图片图片dQx28资讯网——每日最新资讯28at.com

基本使用

发送延迟消息代码如下,发送了一条延迟时间为 5s 的消息。dQx28资讯网——每日最新资讯28at.com

public void produce() {  String queuename = "delay-queue";  RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename);  RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);  delayedQueue.offer("测试延迟消息", 5, TimeUnit.SECONDS);}

接收消息代码如下,可以看到 delayedQueue 是没有用到的,那么为什么要加这一行呢,这个后面总结部分回答。dQx28资讯网——每日最新资讯28at.com

public void consume() throws InterruptedException { String queuename = "delay-queue";  RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename);  RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);  String msg = blockingQueue.take();  //收到消息进行处理...}

这两段代码可以写在两个不同的 Java 工程里,只要连接的是同一个 Redis 就行。dQx28资讯网——每日最新资讯28at.com

调用 comsume() 之后,如果队列里没有消息,会阻塞等待队列里有消息并且取到了才会返回。之所以这么说是因为可能有别的 Java 进程也在跟你一样取同一个队列里的消息,如果消息被另一个抢完了,那这时就还得阻塞等待。dQx28资讯网——每日最新资讯28at.com

这时看上去的原理是这样的:dQx28资讯网——每日最新资讯28at.com

生产者调用 offer() 后,自己内部开启一个定时器,等到了时间在发送到 redis 的 list 里。dQx28资讯网——每日最新资讯28at.com

图片图片dQx28资讯网——每日最新资讯28at.com


dQx28资讯网——每日最新资讯28at.com

如果是这样设计的话,相信大家都能看出来一个很简单的问题,要是延时时间还没到,生产者自己挂了,那样消息就丢了。所以,还是让我们接着往下看。dQx28资讯网——每日最新资讯28at.com

内部数据结构介绍

redisson 源码里一共创建了三个队列:【消息延时队列】、【消息顺序队列】、【消息目标队列】。dQx28资讯网——每日最新资讯28at.com

图片图片dQx28资讯网——每日最新资讯28at.com


dQx28资讯网——每日最新资讯28at.com

假设在同一时间按照 msg1、msg2、msg3 的顺序发消息到延时队列,这三条消息就会被保存在【消息延时队列】和【消息顺序队列】。dQx28资讯网——每日最新资讯28at.com

可以看到【消息延时队列】的顺序是按照到期时间升序排列的,而不是像【消息顺序队列】按照插入顺序排。dQx28资讯网——每日最新资讯28at.com

消息到期后会将消息从前两个队列移除(怎么移?谁来移?),插入【消息目标队列】,也就是图中第三个队列。dQx28资讯网——每日最新资讯28at.com

消费者也是阻塞在【消息目标队列】上取消息。dQx28资讯网——每日最新资讯28at.com

这时可以简单说明下每个队列的作用:dQx28资讯网——每日最新资讯28at.com

  • 【消息延时队列】利用按照到期时间排序的特性,可以很快找到下一个要到期的消息,客户端内部自己定时到【消息目标队列】取
  • 【消息顺序队列】这个队列对分析的流程关联不大,可以忽略
  • 【消息目标队列】存放到期的消息,供消费端取

其实【消息延时队列】队列里存的时间(也就是 zet 的 score)是到期的时间戳,为了画图方便,图里就画的是延迟的时间,不过不影响理解。dQx28资讯网——每日最新资讯28at.com

理解好这几个队列的名字和作用,后面还会一直用到,如果忘了可以翻回来回顾下。dQx28资讯网——每日最新资讯28at.com

因为书写理解方便和【消息顺序队列】在本文没涉及到,后面部分好几次提到的内容:把到期的消息从【消息延时队列】移到【消息目标队列】里,这句话实际的代码逻辑是这样:把【消息延时队列】和【消息顺序队列】里的到期消息移除,把它们插入到【消息目标队列】。dQx28资讯网——每日最新资讯28at.com

基本流程

知道了内部所使用到的数据结构后,这里可以简单说下整体的基本流程。dQx28资讯网——每日最新资讯28at.com

先说发送延迟消息,发送的延迟消息会先存在【消息延时队列】和【消息顺序队列】,如果【消息延时队列】原本是空的,会发布订阅信息提醒有新的消息。dQx28资讯网——每日最新资讯28at.com

获取延迟消息只需要从【消息目标队列】阻塞的取就行了,因为里面都是到期数据。dQx28资讯网——每日最新资讯28at.com

那么问题就只剩下怎么样判断时间到了,把【消息延时队列】里的消息移动到【消息目标队列】里呢?dQx28资讯网——每日最新资讯28at.com

这部分工作交给了初始化延时队列来处理。dQx28资讯网——每日最新资讯28at.com

这里面会定时从【消息延时队列】查询最新到期时间,定时去把【消息延时队列】里的消息移动到【消息目标队列】里。dQx28资讯网——每日最新资讯28at.com

如果【消息延时队列】是空的,就不会再定时查,而是等待发布订阅信息提醒,再定时把【消息延时队列】里的消息移动到【消息目标队列】里。dQx28资讯网——每日最新资讯28at.com

刚开始看可能有点抽象,可以看完底下一节内容之后,再回头来看这里对应的流程总结,可能会比较清晰。dQx28资讯网——每日最新资讯28at.com

发送延时消息

发送延时消息的逻辑比较简单,先看下发送的代码。dQx28资讯网——每日最新资讯28at.com

public void produce() {  String queuename = "delay-queue";  RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename);  RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);  delayedQueue.offer("测试延迟消息", 5, TimeUnit.SECONDS);}

从 delayedQueue.offer 方法开始,最终会执行到 RedissonDelayedQueue 的 offerAsync 方法里。dQx28资讯网——每日最新资讯28at.com

offerAsync 方法的作用就是发送一段脚本给 redis 执行,脚本内容是:dQx28资讯网——每日最新资讯28at.com

  1. 将消息和到期时间插入【消息延时队列】和【消息顺序队列】
  2. 如果最近到期的消息是刚刚插入的消息,则对指定主题发布到期时间,目的是为了让客户端定时去把【消息延时队列】里的到期数据移动到【消息目标队列】
@Overridepublic RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {  if (delay < 0) {   throw new IllegalArgumentException("Delay can't be negative");  }  long delayInMs = timeUnit.toMillis(delay);  long timeout = System.currentTimeMillis() + delayInMs;  long randomId = ThreadLocalRandom.current().nextLong();  return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_VOID,  "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);"   + "redis.call('zadd', KEYS[2], ARGV[1], value);"  + "redis.call('rpush', KEYS[3], value);"  // if new object added to queue head when publish its startTime   // to all scheduler workers   + "local v = redis.call('zrange', KEYS[2], 0, 0); "  + "if v[1] == value then "  + "redis.call('publish', KEYS[4], ARGV[1]); "  + "end;",  Arrays.<Object>asList(getRawName(), timeoutSetName, queueName, channelName),  timeout, randomId, encode(e));}

获取延时消息

获取延时消息是本文最简单的一部分。dQx28资讯网——每日最新资讯28at.com

public void consume() throws InterruptedException {  String queuename = "delay-queue";  RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename);  RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);  String msg = blockingQueue.take();  //收到消息进行处理...}

blockingQueue.take() 方法其实只是对【消息目标队列】执行 blpop 阻塞的获取到期消息dQx28资讯网——每日最新资讯28at.com

初始化延时队列

看一下初始化的代码。dQx28资讯网——每日最新资讯28at.com

public void init() {    String queuename = "delay-queue";    RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename);    RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);}

入口就是在 redissonClient.getDelayedQueue(blockingQueue) 中,创建了 RedissonDelayedQueue 对象,并执行了构造方法里的逻辑。dQx28资讯网——每日最新资讯28at.com

那么这里面主要做了什么事呢?dQx28资讯网——每日最新资讯28at.com

主要是调用了 QueueTransferTask 的 start() 方法。dQx28资讯网——每日最新资讯28at.com

public void start() {  RTopic schedulerTopic = getTopic();  statusListenerId = schedulerTopic.addListener(new BaseStatusListener() {      @Override    public void onSubscribe(String channel) {      pushTask();    } }); messageListenerId = schedulerTopic.addListener(Long.class, new MessageListener<Long>() {      @Override      public void onMessage(CharSequence channel, Long startTime) {     scheduleTask(startTime);   } });}

这段代码主要是设置了指定主题(主题名:redisson_delay_queue_channel:{queuename})两个发布订阅的监听器。dQx28资讯网——每日最新资讯28at.com

  1. 当指定主题有新订阅时调用 pushTask() 方法,里面又会调用 pushTaskAsync() 方法
  2. 当指定主题有新消息时调用 scheduleTask(startTime) 方法

需要注意的是,这里会先订阅指定主题,然后触发执行 onSubscribe() 方法。dQx28资讯网——每日最新资讯28at.com

所以我们主要搞懂这三个方法都是做什么的,那么整个初始化流程就明白了。dQx28资讯网——每日最新资讯28at.com

因为这三个方法是相互调用的,只看文字的话容易云里雾里,这里有个流程图,看方法解释文字的时候可以对照着流程图看比较有印象。dQx28资讯网——每日最新资讯28at.com

图片图片dQx28资讯网——每日最新资讯28at.com

三个方法调用流程图.drawio.pngdQx28资讯网——每日最新资讯28at.com

  • scheduleTask()
    这个方法看起来多,但核心内容就是根据方法参数指定的时间调用 pushTask()。
private void scheduleTask(final Long startTime) {  TimeoutTask oldTimeout = lastTimeout.get();  if (startTime == null) {    return;  }  if (oldTimeout != null) {    oldTimeout.getTask().cancel();  }  long delay = startTime - System.currentTimeMillis();  if (delay > 10) {    Timeout timeout = connectionManager.newTimeout(new TimerTask() {                          @Override      public void run(Timeout timeout) throws Exception {        pushTask();        TimeoutTask currentTimeout = lastTimeout.get();        if (currentTimeout.getTask() == timeout) {          lastTimeout.compareAndSet(currentTimeout, null);        }      }    }, delay, TimeUnit.MILLISECONDS);    if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) {      timeout.cancel();    }  } else {    pushTask();  }}
  • pushTaskAsync()
    这个方法是抽象方法,在创建 RedissonDelayedQueue 对象的时候传进来的,代码如下:
@Overrideprotected RFuture<Long> pushTaskAsync() {  return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,  "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "  + "if #expiredValues > 0 then "  + "for i, v in ipairs(expiredValues) do "  + "local randomId, value = struct.unpack('dLc0', v);"  + "redis.call('rpush', KEYS[1], value);"  + "redis.call('lrem', KEYS[3], 1, v);"  + "end; "  + "redis.call('zrem', KEYS[2], unpack(expiredValues));"  + "end; "  // get startTime from scheduler queue head task  + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "  + "if v[1] ~= nil then "  + "return v[2]; "  + "end "  + "return nil;",  Arrays.<Object>asList(getRawName(), timeoutSetName, queueName),  System.currentTimeMillis(), 100);}

看不懂也不要紧,听我解释下就明白了。dQx28资讯网——每日最新资讯28at.com

这里发送了一段脚本给 redis 执行:dQx28资讯网——每日最新资讯28at.com

我的理解就是初始化的时候dQx28资讯网——每日最新资讯28at.com

1是为了处理旧的消息,比如生产者1发送了消息,然后时间没到自己下线了,这时如果没有其他客户端在线,就没有人能把数据从【消息目标队列】移到【消息目标队列】了。dQx28资讯网——每日最新资讯28at.com

2是返回的这个时间戳,会拿这个定时,等时间到了去【消息目标队列】拉去到期的消息。dQx28资讯网——每日最新资讯28at.com

简单总结就是这个方法是把到期消息从【消息延时队列】放到【消息目标队列】里,并且返回了最近要到期消息的时间戳。dQx28资讯网——每日最新资讯28at.com

从【消息延时队列】取出前一百条到期的消息,如果有的话,添加到【消息目标队列】里,并将这些消息从【消息延时队列】和【消息顺序队列】中移除dQx28资讯网——每日最新资讯28at.com

从【消息延时队列】取出下一条要到期的消息,返回它的到期时间戳(如果队列里没消息返回空)。dQx28资讯网——每日最新资讯28at.com

  • pushTask()
private void pushTask() {  RFuture<Long> startTimeFuture = pushTaskAsync();  startTimeFuture.whenComplete((res, e) -> {    if (e != null) {      if (e instanceof RedissonShutdownException) {        return;      }      log.error(e.getMessage(), e);      scheduleTask(System.currentTimeMillis() + 5 * 1000L);      return;    }    if (res != null) {      scheduleTask(res);    }  });}

这个代码看起来就比较简单,调用了 pushTaskAsync() 获取最近要到期消息的时间戳(异步封装了一下)。dQx28资讯网——每日最新资讯28at.com

有异常的话就调用 scheduleTask() 五秒后再执行一次 pushTask()。dQx28资讯网——每日最新资讯28at.com

没有异常的话如果有最近要到期消息的时间戳(说明【消息延时队列】里还有未到期消息),用这个最新到期时间调用 scheduleTask(),在这个指定的时间调用 pushTask()。dQx28资讯网——每日最新资讯28at.com

这个方法简单总结就是决定了要不要调用、什么时候再调用 pushTask(),主要操作逻辑都在 pushTaskAsync() 里(把到期的消息从【消息延时队列】移到【消息目标队列】供消费端消费)。dQx28资讯网——每日最新资讯28at.com

了解了上面几个方法的流程和含义,还记得一开头提到的添加了两个发布订阅的监听器吗?dQx28资讯网——每日最新资讯28at.com

1.当指定主题有新订阅时调用 pushTask() 方法,里面又会调用 pushTaskAsync() 方法dQx28资讯网——每日最新资讯28at.com

2.当指定主题有新消息时调用 scheduleTask(startTime) 方法dQx28资讯网——每日最新资讯28at.com

需要注意的是,这里会先订阅指定主题,然后触发执行 onSubscribe() 方法dQx28资讯网——每日最新资讯28at.com

  1. 在初始化延时队列刚启动的时候,处理到期旧数据:把到期的消息从【消息延时队列】移到【消息目标队列】供消费端消费;处理新数据:获取下次到期时间决定下次调用 pushTask() 的时间。
    上面讲的这种情况是站在当前客户端的视角,但毕竟这是监听订阅信息,如果启动不止一个客户端的话(就算是1个生产者1个消费者,也算两个客户端),总有一个客户端的订阅信息回调函数,会不会有问题?
    仔细想想是没有的,处理到期旧数据:之前启动的客户端已经处理完了;处理新数据:获取最近到期时间,在 scheduleTask() 里,如果之前有正在定时的任务,会把原来正在定时的任务取消掉。这个被取消的任务,时间要么就是当前这个时间,要嘛是之后的时间,取消掉不会影响逻辑。
  2. 为了应对原本【消息延时队列】里没消息了这种情况,流程结束了,重启定时去调用 pushTask() ,把到期的消息从【消息延时队列】移到【消息目标队列】供消费端消费。

总结

再放一下开头的图总体流程图:dQx28资讯网——每日最新资讯28at.com

图片图片dQx28资讯网——每日最新资讯28at.com

  1. 初始化延时队列时会把【消息延时队列】里的到期数据移动到【消息目标队列】,没有也有可能;然后是找最近要到期的消息时间,定时去拉,这个刚启动也是可能没有的,不过不要紧,这两步是为了处理滞留在【消息延时队列】的旧数据(在发送了延时消息后,还没到期时所有客户端都下线了,这样就没人能把【消息延时队列】里的到期数据移动到【消息目标队列】里,就会出现这种情况);
    最主要的还是设置了发布订阅监听器,当有人发送延时消息的时候能收到通知,定时去将【消息延时队列】里的到期数据移动到【消息目标队列】。
  2. 发送延时消息会先发送到【消息延时队列】和【消息顺序队列】,如果【消息延时队列】里没有数据,则将刚发送的到期时间发布到指定主题,提醒其他客户端有新消息。
  3. 初始化延时队列时设置的发布订阅监听器把【消息延时队列】里的到期数据移动到【消息目标队列】里。
  4. 获取延迟消息只需要执行 blpop 阻塞的获取【消息目标队列】的消息就可以了。

这里回答开头部分说的问题,到这看完了本文,你可以试着自己想一想这个问题的答案。dQx28资讯网——每日最新资讯28at.com

接收消息代码如下,可以看到 delayedQueue 是没有用到的,那么为什么要加这一行呢,这个后面总结部分回答。dQx28资讯网——每日最新资讯28at.com

public void consume() throws InterruptedException {    String queuename = "delay-queue";    RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename);    RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);    String msg = blockingQueue.take();    //收到消息进行处理...}

其实这个问题也是我开发过程中遇到的一个奇怪的地方,接收方代码没有初始化延时队列。dQx28资讯网——每日最新资讯28at.com

首先再啰嗦一句,初始化延时队列的作用是会定时去把【消息延时队列】里的到期数据移动到【消息目标队列】。dQx28资讯网——每日最新资讯28at.com

如果只有发送方初始化延时队列:dQx28资讯网——每日最新资讯28at.com

  1. 发送方发送了延迟消息,在到期之前下线了(它就不能把【消息延时队列】里的到期数据移动到【消息目标队列】),而且没有其他发送方。
  2. 接收方不管有多少个,都没人能把【消息延时队列】里的到期数据移动到【消息目标队列】。

所以接收方代码里也初始化延时队列能够避免一部分数据丢失问题。dQx28资讯网——每日最新资讯28at.com

- End-DailyMart是一个基于 DDD 和Spring Cloud Alibaba的微服务商城系统,采用SpringBoot3.x以及JDK17。旨在为开发者提供集成式的学习体验,并将其无缝地应用于实际项目中。该专栏包含领域驱动设计(DDD)、Spring Cloud Alibaba企业级开发实践、设计模式实际应用场景解析、分库分表战术及实用技巧等内容。如果你对这个系列感兴趣,可在本公众号回复关键词 DDD 获取完整文档以及相关源码。


dQx28资讯网——每日最新资讯28at.com

本文链接:http://www.28at.com/showinfo-26-112785-0.html开发实战:使用Redisson实现分布式延时消息,订单30分钟关闭的另外一种实现!

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com

上一篇: 这应该是全网最详细的Vue3.5版本解读

下一篇: SpringBoot整合RabbitMQ实现邮件异步发送

标签:
  • 热门焦点
  • K60至尊版刚预热 一加Ace2 Pro正面硬刚

    K60至尊版刚预热 一加Ace2 Pro正面硬刚

    Redmi这边刚如火如荼的宣传了K60 Ultra的各种技术和硬件配置,作为竞品的一加也坐不住了。一加中国区总裁李杰发布了两条微博,表示在自家的一加Ace2上早就已经采用了和PixelWo
  • 俄罗斯:将审查iPhone等外国公司设备 保数据安全

    俄罗斯:将审查iPhone等外国公司设备 保数据安全

    iPhone和特斯拉都属于在各自领域领头羊的品牌,推出的产品也也都是数一数二的,但对于一些国家而言,它们的产品可靠性和安全性还是在限制范围内。近日,俄罗斯联邦通信、信息技术
  • 红魔电竞平板评测:大屏幕硬实力

    红魔电竞平板评测:大屏幕硬实力

    前言:三年的疫情因为要上网课的原因激活了平板市场,如今网课的时代已经过去,大家的生活都恢复到了正轨,这也就意味着,真正考验平板电脑生存的环境来了。也就是面对着这种残酷的
  • Temu起诉SHEIN,跨境电商战事升级

    Temu起诉SHEIN,跨境电商战事升级

    来源 | 伯虎财经(bohuFN)作者 | 陈平安日前据外媒报道,拼多多旗下跨境电商平台Temu正对竞争对手SHEIN提起新诉讼,诉状称Shein&ldquo;利用市场支配力量强迫服装厂商与之签订独家
  • 破圈是B站头上的紧箍咒

    破圈是B站头上的紧箍咒

    来源 | 光子星球撰文 | 吴坤谚编辑 | 吴先之每年的暑期档都少不了瞄准追剧女孩们的古偶剧集,2021年有优酷的《山河令》,2022年有爱奇艺的《苍兰诀》,今年却轮到小破站抓住了追
  • 签约井川里予、何丹彤,单视频点赞近千万,MCN黑马永恒文希快速崛起!

    签约井川里予、何丹彤,单视频点赞近千万,MCN黑马永恒文希快速崛起!

    来源:视听观察永恒文希传媒作为一家MCN公司,说起它的名字来,可能大家会觉得有点儿陌生,但是说出来下面一串的名字之后,或许大家就会感到震惊,原来这么多网红,都签约这家公司了。根
  • 东方甄选单飞:有些鸟注定是关不住的

    东方甄选单飞:有些鸟注定是关不住的

    作者:彭宽鸿来源:华尔街科技眼&zwj;&zwj;&zwj;&zwj;&zwj;&zwj;&zwj;&zwj;&zwj;&zwj;东方甄选创始人俞敏洪带队的&ldquo;7天甘肃行&rdquo;直播活动已在近日顺利收官。成立后一
  • 网传小米汽车开始筛选交付中心 建筑面积不低于3000平方米

    网传小米汽车开始筛选交付中心 建筑面积不低于3000平方米

    7月7日消息,近日有微博网友@长三角行健者爆料称,据经销商集团反馈,小米汽车目前已经开始了交付中心的筛选工作,要求候选场地至少有120个车位,建筑不能低
  • Meta盲目扩张致超万人被裁,重金押注元宇宙而前景未明

    Meta盲目扩张致超万人被裁,重金押注元宇宙而前景未明

    图片来源:图虫创意日前,Meta创始人兼CEO 马克&middot;扎克伯发布公开信,宣布Meta计划裁员超11000人,占其员工总数13%。他公开承认了自己的预判失误:&ldquo;不仅
Top