本文给大家介绍一下在 Spring Boot 项目中如何集成消息队列 RabbitMQ,包含对 RibbitMQ 的架构介绍、应用场景、坑点解析以及代码实战。最后文末有免费领取龙年红包封面以及腾讯云社区答题领奖福利,欢迎大家领取。
我将使用 waynboot-mall 项目作为代码讲解,项目地址:https://github.com/wayn111/waynboot-mall。本文大纲如下,
图片
图片
RibbitMQ 是一个基于 AMQP 协议的开源消息队列系统,具有高性能、高可用、高扩展等特点。通常作为在系统间传递消息的中间件,它可以实现异步处理、应用解耦、流量削峰等功能。
图片
RibbitMQ 的主要组件介绍如下,
图片
RabbitMQ 是一个非常强大和灵活的消息中间件,它可以应用于多种场景和需求。以下是一些常见的 RabbitMQ 应用场景和实战经验:
图片
在使用 RabbitMQ 的过程中,有一些常见的问题需要注意:
在 waynboot-mall 项目中,消息层包含两个模块 waynboot-message-core 以及 waynboot-message-consumer,目录结构如下,
|-- waynboot-message-core // 核心消息配置,供其他服务集成使用| |-- config| |-- constant| |-- dto|-- waynboot-message-consumer // 消息消费服务,订阅队列接收消息,调用其他服务执行一些具体的业务逻辑| |-- api| |-- config| |-- consumer
waynboot-message-core 包目录说明如下,
waynboot-message-consumer 包目录说明如下,
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>${spring-boot.version}</version></dependency>
图片
在 waynboot-mall 项目中,通过 yml 文件的 spring.rabbitmq.virtual-host=“/” 属性来指定虚拟主机名称。
建议大家在使用 RabbitMQ 时都配置好自己项目的虚拟主机名称,来达到各系统资源隔离的目的。当然如果 RabbitMQ 服务只有一个项目在用,那就用默认的 / 作为虚拟主机名称也是可以的。
小知识:出于多租户和安全因素设计的,vhost 把 AMQP 的基本组件划分到一个虚拟的分组中。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换机、绑定和权限机制。当多个不同的用户使用同一个 RabbitMQ 服务器时,可以划分出多个虚拟主机。RabbitMQ 默认的虚拟主机路径是 /。
在 waynboot-mall 项目中,用订单消息来举例,生产者发送消息需要经过三个步骤
public class MQConstants { public static final String ORDER_DIRECT_QUEUE = "order_direct_queue"; public static final String ORDER_DIRECT_EXCHANGE = "order_direct_exchange"; public static final String ORDER_DIRECT_ROUTING = "order_direct_routing";}@Configurationpublic class BusinessRabbitConfig { @Bean public Queue orderDirectQueue() { return new Queue(MQConstants.ORDER_DIRECT_QUEUE); } @Bean DirectExchange orderDirectExchange() { return new DirectExchange(MQConstants.ORDER_DIRECT_EXCHANGE); } @Bean Binding bindingOrderDirect() { return BindingBuilder.bind(orderDirectQueue()).to(orderDirectExchange()).with(MQConstants.ORDER_DIRECT_ROUTING); }}
在 BusinessRabbitConfig 中,我们创建了订单交换机、队列以及路由绑定关系。在 Spring 项目中,项目启动时,就会自动在 RabbitMQ 服务器上创建好这些东西。
交换机列表
队列列表
生产者的消息发送确认主要包含两部分,
producter -> rabbitmq broker exchange -> queue
waynboot-mall 项目的 yml 中关于 RabbitMQ 的相关配置如下,
spring: # 配置rabbitMq 服务器 rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest # 消息确认配置项 # 确认消息已发送到交换机(Exchange) publisher-confirm-type: correlated # 确认消息已发送到队列(Queue) publisher-returns: true # 虚拟主机名称 virtual-host: /
可以看到,我们设置了 publisher-confirm-type 属性为 correlated,表示开启发布确认模式,用来确认消息已发送到交换机,publisher-confirm-type 有三个选项:
在 RabbitMQ 中,消息发送到交换机中也不代表消费者一定能接收到消息,所以我们还需要设置 publisher-returns 为 true 来表示确认交换机中消息已经发送到队列里。true 表示开启失败回调,开启后当消息无法路由到指定队列时会触发 ReturnCallback 回调。
接着是 RabbitTemplateConfig 的代码,这里面会定义前面提到的 confirmCallBack、returnCallBack 相关代码,
@Slf4j@Componentpublic class RabbitTemplateConfig { @Bean public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); // 设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数 rabbitTemplate.setMandatory(true); // 交换机收到消息回调 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause)); // 队列收到消息回调,如果失败的话会进行 returnCallback 的回调处理,反之成功就不会回调。 rabbitTemplate.setReturnsCallback(returned -> { log.info("returnCallback: " + "消息:" + returned.getMessage()); log.info("returnCallback: " + "回应码:" + returned.getReplyCode()); log.info("returnCallback: " + "回应信息:" + returned.getReplyText()); log.info("returnCallback: " + "交换机:" + returned.getExchange()); log.info("returnCallback: " + "路由键:" + returned.getRoutingKey()); }); return rabbitTemplate; }}
在 RabbitTemplateConfig 类代码里,我们可以设置 confirmCallBack、returnCallBack 回调函数后,监控生产者发送消息是否被交换机接收、以及交换机是否把消息发送到队列中。
在 Spring Boot 项目中,集成了 spring-boot-starter-amqp 依赖后,就可以直接注入 RabbitTemplate 来发送消息。
这里用 waynboot-mall 项目中的异步下单流程举例,代码如下,
@Slf4j@Service@AllArgsConstructorpublic class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements IOrderService { private RabbitTemplate rabbitTemplate; @Override public R asyncSubmit(OrderVO orderVO) { OrderDTO orderDTO = new OrderDTO(); ... // 开始异步下单 String uid = IdUtil.getUid(); // 1. 创建消息ID,确认机制发送消息时,需要给每个消息设置一个全局唯一 id,以区分不同消息,避免 ack 冲突 CorrelationData correlationData = new CorrelationData(uid); // 2. 创建消息载体 Message ,AMQP 规范中定义的消息承载类,用来在生产者和消费者之前传递消息 Map<String, Object> map = new HashMap<>(); map.put("order", orderDTO); map.put("notifyUrl", WaynConfig.getMobileUrl() + "/callback/order/submit"); try { Message message = MessageBuilder .withBody(JSON.toJSONString(map).getBytes(Constants.UTF_ENCODING)) .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN) .setDeliveryMode(MessageDeliveryMode.PERSISTENT) .build(); // 3. 发送消息到 RabbitMQ 服务器,需要指定交换机、路由键、消息载体以及消息ID rabbitTemplate.convertAndSend(MQConstants.ORDER_DIRECT_EXCHANGE, MQConstants.ORDER_DIRECT_ROUTING, message, correlationData); } catch (UnsupportedEncodingException e) { log.error(e.getMessage(), e); } return R.success().add("actualPrice", actualPrice).add("orderSn", orderSn); }}
waynboot-mall 项目中在使用 rabbitTemplate 发送消息时,按照如下步骤,大家可以参考
以上就是生产者发送消息时所有相关代码了,接着我们看下消费者处理消息的相关代码。
在 waynboot-mall 项目中,还是用订单消息来举例,消费者 yml 配置如下,
在 RabbitMQ 的消息消费环节,需要注意的一点就是,如果需要确保消费者不出现漏消费,则需要开启消费者的手动 ack 模式。
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest ... listener: simple: # 消息确认方式,其有三种配置方式,分别是none、manual(手动ack) 和auto(自动ack) 默认auto acknowledge-mode: manual # 一个消费者最多可处理的nack(未确认)消息数量,默认是250 prefetch: 250 # 设置消费者数量 concurrency: 1
在 yml 文件的消费者配置中,acknowledge-mode 属性用于指定消息确认模式,有三种模式:
如果消费者在消费的过程中没有抛出异常,则自动确认。
当消费者消费的过程中抛出 AmqpRejectAndDontRequeueException 异常的时候,则消息会被拒绝,且该消息不会重回队列。
当抛出 ImmediateAcknowledgeAmqpException 异常,消息会被确认。
如果抛出其他的异常,则消息会被拒绝,但是与前两个不同的是,该消息会重回队列,如果此时只有一个消费者监听该队列,那么该消息重回队列后又会推送给该消费者,会造成死循环的情况。
消费者配置中,prefetch 属性用于指定消费者每次从队列获取的消息数量。
每个 customer 会在 MQ 预取一些消息放入内存的 LinkedBlockingQueue 中进行消费,这个值越高,消息传递的越快,但非顺序处理消息的风险更高。如果 ack 模式为 none,则忽略。
prefetch 默认值以前是 1,这可能会导致高效使用者的利用率不足。从 spring-amqp 2.0 版开始,默认的 prefetch 值是 250,这将使消费者在大多数常见场景中保持忙碌,从而提高吞吐量。
不过在有些情况下,尤其是处理速度比较慢的大消息,消息可能在内存中大量堆积,消耗大量内存;以及对于一些严格要求顺序的消息,prefetch 的值应当设置为 1。
对于低容量消息和多个消费者的情况(也包括单 listener 容器的 concurrency 配置)希望在多个使用者之间实现更均匀的消息分布,建议在手动 ack 下并设置 prefetch=1。
如果要保证消息的可靠不丢失,当 prefetch 大于 1 时,可能会出现因为服务宕机引起的数据丢失,故建议将 prefetch=1。
消费者配置中,concurrency 属性设置的是对每个 listener 在初始化的时候设置的并发消费者的个数。在上面的 yml 配置中,cnotallow=1,即每个 Listener 容器将开启一个线程去处理消息。在 2.0 以后的版本中,可以在注解中配置该参数,实例代码如下,
@RabbitListener(queues = MQConstants.ORDER_DIRECT_QUEUE, concurrency = "2")public void process(Channel channel, Message message) throws IOException { String body = new String(message.getBody()); log.info("OrderPayConsumer 消费者收到消息: {}", body); ...}
在 waynboot-mall 项目中,消费者监听队列代码如下,
@Slf4j@Componentpublic class OrderPayConsumer { @Resource private RedisCache redisCache; @Resource private MobileApi mobileApi; @RabbitListener(queues = MQConstants.ORDER_DIRECT_QUEUE) public void process(Channel channel, Message message) throws IOException { // 1. 转换订单消息 String body = new String(message.getBody()); log.info("OrderPayConsumer 消费者收到消息: {}", body); // 2. 获取消息ID String msgId = message.getMessageProperties().getHeader("spring_returned_message_correlation"); // 3. 获取发送tag long deliveryTag = message.getMessageProperties().getDeliveryTag(); // 4. 消费消息幂等性处理 if (redisCache.getCacheObject(ORDER_CONSUMER_MAP.getKey()) != null) { // redis中包含该 key,说明该消息已经被消费过 log.error("msgId: {},消息已经被消费", msgId); channel.basicAck(deliveryTag, false);// 确认消息已消费 return; } try { // 5. 下单处理 mobileApi.submitOrder(body); // 6. 手动ack,消息成功确认 channel.basicAck(deliveryTag, false); // 7. 设置消息已被消费标识 redisCache.setCacheObject(ORDER_CONSUMER_MAP.getKey(), msgId, ORDER_CONSUMER_MAP.getExpireSecond()); } catch (Exception e) { channel.basicNack(deliveryTag, false, false); log.error(e.getMessage(), e); } }}
waynboot-mall 项目中在使用 RabbitListener 注解消费消息时,按照如下步骤,大家可以参考
其中参数 long deliveryTag 为消息的唯一序号也就是第三步获取的发送 tag,第二个 boolean multiple 参数表示是否一次消费多条消息,false 表示只确认该序列号对应的消息,true 则表示确认该序列号对应的消息以及比该序列号小的所有消息,比如我先发送 2 条消息,他们的序列号分别为 2,3,并且他们都没有被确认,还留在队列中,那么如果当前消息序列号为 4,那么当 multiple 为 true,则序列号为 2、3 的消息也会被一同确认。
这篇文章给大家讲解了在 Spring Boot 项目中如何集成消息队列 RabbitMQ 用于业务逻辑解耦,有架构介绍、应用场景、坑点解析、代码实战 4 个部分,能带领大家比较全面的了解一波 RabbitMQ。大家在自己的项目中如果需要引入 RabbitMQ 时,都可以参考本文的代码实战配置,帮助大家快速集成、避免踩坑。
本文链接:http://www.28at.com/showinfo-26-70467-0.htmlSpring Boot项目集成RabbitMQ实战以及坑点讲解
声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com
上一篇: 利用Nacos实现Seata事务模式(XA与AT)的快速配置与灵活切换
下一篇: 字节码增强技术,不止有 Java Proxy、 Cglib 和 Javassist 还有 Byte Buddy