消息队列是一种典型的发布/订阅模式,是专门为异步化应用和分布式系统设计的,具有高性能、稳定性及可伸缩性的特点,是开发分布式系统和应用系统必备的技术之一。目前,针对不同的业务场景,比较成熟可靠的消息中间件产品有RocketMQ、Kafka、RabbitMq等,基于Redis再去实现一个消息队列少有提及,那么已经有很成熟的产品可以选择,还有必要再基于Redis自己来实现一个消息队列吗?基于Redis实现的消息队列有什么特别的地方吗?
先来回顾一个Redis有哪些特性:
总结一下:redis的特点就是:快、简单、稳定;
以RocketMQ为代表,作为专业的消息中间件而言,有哪些特性呢:
总结一下:RocketMQ的特点就是除了性能非常高、系统本身的功能比较专业、完善,能适应非常多的场景;
从上述分析可以看出,Redis队列和MQ消息队列各有优势,Redis的最大特点就是快,所以基于Redis的消息队列相比MQ消息队列而言,更适合实时处理,但是基于Redis的消息队列更易受服务器内存限制;而RocketMQ消息队列作为专业的消息中间件产品,功能更完善,更适合应用于比较复杂的业务场景,可以实现离线消息发送、消息可靠投递以及消息的安全性,但MQ消息队列的读写性能略低于Redis队列。在技术选型时,除了上述的因素外,还有一个需要注意:大多数系统都会引入Redis作为基础的缓存中间件使用,如果要选用RocketMQ的话,还需要额外再申请资源进行部署。
很多时候,所谓的优点和缺点,只是针对特定场景而言,如果场景不一样了,优点可能会变成缺点,缺点也可能会变成优点。因此,除了专业的消息中间件外,基于Redis实现一个消息队列也是有必要的,在某些特殊的业务场景,比如一些并发量不是很高的管理系统,某些业务流程需要异步化处理,这时选择基于Redis自己实现一个消息队列,也是一个比较好的选择。这也是本篇文章主要分享的内容。
队列(Queue)是一种数据结构,遵循先进先出(FIFO)的原则。在队列中,元素被添加到末尾(入队),并从开头移除(出队)。
Java中有哪些队列?
以LinkedBlockingQueue为例,其使用方法是这样的:
创建了一个生产者线程和一个消费者线程,生产者线程和消费者线程分别对同一个LinkedBlockingQueue对象进行操作。生产者线程通过调用put()方法将元素添加到队列中,而消费者线程通过调用take()方法从队列中取出元素。这两个方法都会阻塞线程,直到队列中有元素可供取出或有空间可供添加元素。
import java.util.concurrent.LinkedBlockingQueue; public class LinkedBlockingQueueExample { public static void main(String[] args) { LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(); // 生产者线程 new Thread(() -> { for (int i = 0; i < 10; i++) { try { queue.put("Element " + i); System.out.println("Produced: Element " + i); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); // 消费者线程 new Thread(() -> { for (int i = 0; i < 10; i++) { try { String element = queue.take(); System.out.println("Consumed: " + element); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } }
图片
无法持久化保存消息,如果 Redis 服务器宕机或重启,那么所有的消息将会丢失;
发布订阅模式是“发后既忘”的工作模式,如果有订阅者离线重连之后就不能消费之前的历史消息;
不支持消费者确认机制,稳定性不能得到保证,例如当消费者获取到消息之后,还没来得及执行就宕机了。因为没有消费者确认机制,Redis 就会误以为消费者已经执行了,因此就不会重复发送未被正常消费的消息了,这样整体的 Redis 稳定性就被没有办法得到保障了。
基于Stream 类型实现:使用 Stream 的 xadd 和 xrange 来实现消息的存入和读取了,并且 Stream 提供了 xack 手动确认消息消费的命令,用它我们就可以实现消费者确认的功能了,使用命令如下:
127.0.0.1:6379> xack mq group1 1580959593553-0(integer) 1
消费确认增加了消息的可靠性,一般在业务处理完成之后,需要执行 ack 确认消息已经被消费完成,整个流程的执行如下图所示:
其中“Group”为群组,消费者也就是接收者需要订阅到群组才能正常获取到消息。
以上就是基于Redis实现消息队列的几种方式的简单对比介绍,下面主要是分享一下基于Redis的List数据类型实现,其他几种方式,有兴趣的小伙可以自己尝试一下。
基于Redis的List数据类型实现消费队列的工作原理是什么?
Redis基于List结构实现队列的原理主要依赖于List的push和pop操作。
在Redis中,你可以使用LPUSH命令将一个或多个元素推入列表的左边,也就是列表头部。同样,你可以使用RPUSH命令将一个或多个元素推入列表的右边,也就是列表尾部。
对于队列来说,新元素总是从队列的头部进入,而读取操作总是从队列的尾部开始。因此,当你想将一个新元素加入队列时,你可以使用LPUSH命令。当你想从队列中取出一个元素时,你可以使用RPOP命令。
此外,Redis还提供了BRPOP命令,这是一个阻塞的RPOP版本。如果给定列表内没有任何元素可供弹出的话,将阻塞连接直到等待超时或发现可弹出元素为止。
需要注意的是,虽然Redis能够提供原子性的push和pop操作,但是在并发环境下使用队列时,仍然需要考虑线程安全和并发控制的问题。你可能需要使用Lua脚本或者其他机制来确保并发操作的正确性。
总的来说,Redis通过提供List数据结构以及一系列相关命令,可以很方便地实现队列的功能。
下面是Redis关于List数据结构操作的命令主要包括以下几种:
以一个实际需求为例,演示一个基于Redis的延迟队列是怎么使用的?
有一个XX任务管理的功能,主要的业务过程:
1、创建任务后;
2、不断检查任务的状态,任务的状态有三种:待执行、执行中、执行完成;
3、如果任务状态是执行完成后,主动获取任务执行结果,对任务执行结果进行处理;如果任务状态是待执行、执行中,则延迟5秒后,再次查询任务执行状态;
图片
1、依赖引入
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-redis</artifactId> <version>1.4.7.RELEASE</version></dependency><dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.23.1</version></dependency>
2、定义三个延迟队列BeforeQueue、RunningQueue、CompleteQueue,对队列的任务进行存取,BeforeQueue用于对待执行状态的任务的存取,Running用于对执行中状态的任务的存取,CompleteQueue用于对执行完成状态的任务的存取,在三个任务队列中,取出元素是阻塞的,即如果队列中没有新的任务,当前线程会一直阻塞等待,直到有新的任务进入;如果是队列中还有元素,则遵循先进先出的原则逐个取出进行处理;
@Component@Slf4jpublic class BeforeQueue { @Autowired private RedissonClient redissonClient; /** * <p>取出元素</p> * <p>如果队列中没有元素,就阻塞等待,直</p> * @return */ public Object take(){ RBlockingQueue<Object> queue1 = redissonClient.getBlockingQueue("queue1"); Object obj = null; try { obj = queue1.take(); log.info("从myqueue1取出元素:{}",obj.toString()); } catch (InterruptedException e) { e.printStackTrace(); } return obj; } /** * <p>放入元素</p> * @param obj */ public void offer(Object obj){ RBlockingDeque<Object> queue1 = redissonClient.getBlockingDeque("queue1"); RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(queue1); delayedQueue.offer(obj,5, TimeUnit.SECONDS); log.info("向myqueue1设置元素:{}",obj.toString()); }}
@Component@Slf4jpublic class RunningQueue { @Autowired private RedissonClient redissonClient; public Object take(){ RBlockingQueue<Object> queue1 = redissonClient.getBlockingQueue("queue2"); Object obj = null; try { obj = queue1.take(); log.info("从myqueue2取出元素:{}",obj.toString()); } catch (InterruptedException e) { e.printStackTrace(); } return obj; } public void offer(Object obj){ RBlockingDeque<Object> queue1 = redissonClient.getBlockingDeque("queue2"); RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(queue1); delayedQueue.offer(obj,5, TimeUnit.SECONDS); log.info("向myqueue2设置元素:{}",obj.toString()); }}
@Component@Slf4jpublic class CompleteQueue { @Autowired private RedissonClient redissonClient; public Object take(){ RBlockingQueue<Object> queue1 = redissonClient.getBlockingQueue("queue3"); Object obj = null; try { obj = queue1.take(); log.info("从CompleteQueue取出元素:{}",obj.toString()); } catch (InterruptedException e) { e.printStackTrace(); } return obj; } public void offer(Object obj){ RBlockingQueue<Object> queue1 = redissonClient.getBlockingDeque("queue3"); RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(queue1); delayedQueue.offer(obj,5, TimeUnit.SECONDS); log.info("向CompleteQueue设置元素:{}",obj.toString()); }}
3、定义三个监听器BeforeQueueListener、RunningQueueListener、CompleteQueueListener,监听器的主要作用主要就是负责监听三个队列中是否有新的任务 元素进入,如果有,则立即取出消费;如果没有,则阻塞等待新的元素进入,具体的实现逻辑是:新创建的任务会先放置到BeforeQueue中,BeforeQueueListener监听到有新的任务进入,会取出任务作一些业务处理,业务处理完一放入到RunningQueue中,RunningQueueListener监听到有新的任务进入,会取出任务再进行处理,这里的处理主要是查询任务执行状态,查询状态结果主要分两种情况:1、执行中、待执行状态,则把任务重新放入RunningQueue队列中,延迟5秒;2、执行完成状态,则把任务放置到CompleteQueue中;CompleteQueueListener监听到有新的任务进入后,会主动获取任务执行结果,作最后业务处理;
4、监听器在在处理队列中的数据相关的业务时,如果发生异常,则需要把取出的元素再重新入入到当前队列中,等待下一轮的重试;
@Component@Slf4jpublic class BeforeQueueListener implements Listener{ @Autowired private BeforeQueue beforeQueue; @Autowired private RunningQueue runningQueue; @Override public void start() { new Thread(new Runnable() { @Override public void run() { while (true){ log.info("监听器进入阻塞:BeforeQueueListener"); Object obj = beforeQueue.take(); if (ObjectUtil.isNotNull(obj)) { try { log.info("开始休眠1s模拟业务处理:BeforeQueueListener,元素:{}",obj.toString()); Thread.currentThread().sleep(1000); log.info("业务处理完成:BeforeQueueListener,元素:{}",obj.toString()); runningQueue.offer(obj); } catch (InterruptedException e) { log.error("业务处理发生异常,重置元素到BeforeQueue队列中"); log.error(e.getMessage()); beforeQueue.offer(obj); } } } } }).start(); }}
@Component@Slf4jpublic class RunningQueueListener implements Listener { @Autowired private RunningQueue runningQueue; @Autowired private CompleteQueue completeQueue; @Override public void start() { new Thread(new Runnable() { @Override public void run() { while (true) { log.info("监听器进入阻塞:RunningQueueListener"); Object obj = runningQueue.take(); if (ObjectUtil.isNotNull(obj)) { try { log.info("开始休眠1s模拟业务处理:RunningQueueListener,元素:{}", obj.toString()); Thread.currentThread().sleep(1000); Random random = new Random(); int i = random.nextInt(2); if (i==0) { test(); } log.info("业务处理完成:RunningQueueListener,元素:{}", obj.toString()); completeQueue.offer(obj); } catch (Exception e) { log.error("业务处理发生异常,重置元素到RunningQueue队列中"); log.error(e.getMessage()); runningQueue.offer(obj); } } } } }).start(); } public void test(){ try { int i=1/0; } catch (Exception e) { throw new RuntimeException("除数异常"); } }}
@Component@Slf4jpublic class CompleteQueueListener implements Listener{ @Autowired private CompleteQueue completeQueue; @Override public void start() { new Thread(new Runnable() { @Override public void run() { while (true){ log.info("监听器进入阻塞:CompleteQueueListener"); Object obj = completeQueue.take(); if (ObjectUtil.isNotNull(obj)) { try { log.info("开始休眠1s模拟业务处理:CompleteQueueListener,元素:{}",obj.toString()); Thread.currentThread().sleep(1000); log.info("业务处理完成:listener3,元素:{}",obj.toString()); } catch (InterruptedException e) { log.error("业务处理发生异常,重置元素到CompleteQueue队列中"); log.error(e.getMessage()); completeQueue.offer(obj); } log.info("CompleteQueueListener任务结束,元素:{}",obj.toString()); } } } }).start(); }}
5、利用Springboot的扩展点ApplicationRunner,在项目启动完成后,分别启动BeforeQueueListener、RunningQueueListener、CompleteQueueListener,让三个监听器进入阻塞监听状态
@Componentpublic class MyRunner implements ApplicationRunner { @Autowired private ApplicationContext applicationContext; @Override public void run(ApplicationArguments args) throws Exception { Map<String, Listener> beansOfType = applicationContext.getBeansOfType(Listener.class); for (String s : beansOfType.keySet()) { Listener listener = beansOfType.get(s); listener.start(); } }}
结果验证
图片
三个任务队列分别有三个线程来进行阻塞监听,即如果任务队列中有任务元素,则取出进行处理;如果没有,则阻塞等待,主线程只负责把任务设置到任务队列中,出现的问题是:控制台的日志输出显示任务元素已经放置到第一个BeforeQueue中,按照预期的结果应该是,控制台的日志输出会显示,从BeforeQueue取出元素进行业务处理、以及业务处理的日志,然后放置到RunningQueue中,再从RunningQueue中取出进行业务处理,接着放置到CompleteQueue队列中,最后从CompleteQueue中取出进行业务处理,最后结束;实际情况是:总是缺少从BeforeQueue取出元素进行业务处理、以及业务处理的日志,其他的日志输出都很正常、执行结果也正常;
经过排查分析,最后找到了原因:
是logback线程安全问题, Logback 的大部分组件都是线程安全的,但某些特定的配置可能会导致线程安全问题。例如,如果你在同一个 Appender 中处理多个线程的日志事件,那么可能会出现线程安全问题,导致某些日志事件丢失。
问题原因找到了,其实解决方法也就找到,具体就是logback的异步日志,logback.xml配置如下:
<?xml versinotallow="1.0" encoding="UTF-8"?><configuration scan="true" scanPeriod="60 seconds" debug="false"> <!-- 日志存放路径 --> <property name="log.path" value="logs/"/> <!-- 日志输出格式 --> <property name="console.log.pattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) %magenta(${PID:-}) - %green([%-21thread]) %cyan(%-35logger{30}) %msg%n"/> <!-- 控制台输出 --> <appender name="console" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>${console.log.pattern}</pattern> <charset>utf-8</charset> </encoder> </appender> <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender"> <queueSize>500</queueSize> <discardingThreshold>0</discardingThreshold> <neverBlock>true</neverBlock> <appender-ref ref="console" /> </appender> <!--系统操作日志--> <root level="info"> <appender-ref ref="ASYNC" /> </root></configuration>
文章中展示了关键性代码,示例全部代码地址:https://gitcode.net/fox9916/redisson-demo.git
本文链接:http://www.28at.com/showinfo-26-55129-0.html基于Redis实现消息队列的实践
声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com