当前位置:首页 > 科技  > 软件

深入浅出RabbitMQ:顺序消费、死信队列和延时队列

来源: 责编: 时间:2023-11-03 17:07:48 269观看
导读大家好,我是小❤,一个漂泊江湖多年的 985 非科班程序员,曾混迹于国企、互联网大厂和创业公司的后台开发攻城狮。1. 引言在今天的文章中,我们来聊一聊 RabbitMQ,这是小 ❤ 在工作中用的最早的消息中间件,主要用于大量数据的

大家好,我是小❤,一个漂泊江湖多年的 985 非科班程序员,曾混迹于国企、互联网大厂和创业公司的后台开发攻城狮。8Yv28资讯网——每日最新资讯28at.com

1. 引言

在今天的文章中,我们来聊一聊 RabbitMQ,这是小 ❤ 在工作中用的最早的消息中间件,主要用于大量数据的异步消费。8Yv28资讯网——每日最新资讯28at.com

2. RabbitMQ

2.1 核心组件

RabbitMQ 是一个开源的消息中间件,它实现了高级消息队列协议(AMQP),同时提供了各种重要组件来支持消息的生产、传输和消费。8Yv28资讯网——每日最新资讯28at.com

图片图片8Yv28资讯网——每日最新资讯28at.com

  • Producer(生产者): 生产者是消息的发送方,负责将消息发布到 RabbitMQ 服务器。消息可以包含任何内容,例如任务、日志、通知等。
  • Channel(信道):消息推送与接收时使用的通道。
  • Exchange(交换机): 交换机是消息的中转站,它接收来自生产者的消息并将其路由到一个或多个队列。不同类型的交换机,如 fanout,direct,topic,headers,支持不同的路由规则。
  • Queue(队列): 队列是消息的缓冲区,消息在发送到消费者之前存储在队列中,消费者从队列中获取消息并进行处理。
  • Consumer(消费者): 消费者是消息的接收方,它从队列中获取消息并进行处理。消费者可以是多个,它们可以在不同的应用程序或服务器上运行。

2.2 工作流程

RabbitMQ 的工作方式是基于生产者、交换机和队列之间的协作。这是一个简单的消息传递过程:8Yv28资讯网——每日最新资讯28at.com

  • 将队列与交换机绑定(Binding)起来,定义了消息的路由规则;
  • 生产者将消息发布到交换机,交换机根据绑定规则将消息路由到一个或多个队列;
  • 消费者从队列中获取消息并进行处理。

这种模型具有高度的灵活性,可以轻松处理大量消息,同时确保消息的可靠传递。8Yv28资讯网——每日最新资讯28at.com

2.3 特性

说到消息中间件,很多人首先想到的就是 Kafka,但 RabbitMQ 也是许多金融或互联网公司构建可靠、可伸缩和高性能系统的首选。8Yv28资讯网——每日最新资讯28at.com

这是为什么呢?8Yv28资讯网——每日最新资讯28at.com

主要得从 RabbitMQ 的特性说起,主要有二:一个是功能强大,另一个是可靠性!8Yv28资讯网——每日最新资讯28at.com

RabbitMQ 注重消息的可靠性和灵活性,适合任务排队和消息传递。而 Kafka 是分布式流式平台,注重日志存储和数据分发。8Yv28资讯网——每日最新资讯28at.com

顺序消费也是可靠性的一种,RabbitMQ 可以使用单一队列或多个单一队列来确保顺序消费。8Yv28资讯网——每日最新资讯28at.com

除此之外,RabbitMQ 还提供持久性队列和消息,以确保消息在 RabbitMQ 服务器宕机后不会丢失。另外,生产者可以使用发布确认机制来确认消息是否被接收。8Yv28资讯网——每日最新资讯28at.com

RabbitMQ 相对 kafka 可靠性更好,数据更不易丢失,这对于一些数据敏感型的业务来说,显然更适合用前者。8Yv28资讯网——每日最新资讯28at.com

并且,RabbitMQ 中原生支持死信队列,可以更好地处理未完成的业务消息,以及实现延时队列等特性,接下来我们一一介绍。8Yv28资讯网——每日最新资讯28at.com

3. 保证顺序消费

RabbitMQ 提供了多个队列模型来保证消息的顺序消费。这对于某些应用程序非常重要,例如处理订单、支付和库存管理。8Yv28资讯网——每日最新资讯28at.com

消息错乱消费的场景

图片图片8Yv28资讯网——每日最新资讯28at.com

如上图所示,有三条业务消息分别是删除、增加和修改操作,但是 Consumer 没有按顺序消费,最终存储的顺序是增加、修改和删除,就会发生数据错乱。8Yv28资讯网——每日最新资讯28at.com

针对消息有序性的问题,RabbitMQ 的解决方法是分三个阶段来保证。8Yv28资讯网——每日最新资讯28at.com

  • 发送消息:入队列

消息发送时,需要业务来保证顺序性,就是保证生产者入队的顺序是有序的。8Yv28资讯网——每日最新资讯28at.com

在分布式的场景下如果难以保证各个服务器的入队顺序,则可以加分布式锁的方式来解决。或者在业务生产方的消息里带上消息递增 ID,以及消息产生的时间戳。8Yv28资讯网——每日最新资讯28at.com

  • 队列中的消息

在 RabbitMQ 的消息会保存在队列(Queue)中,在同一个队列里的消息是先进先出(FIFO)的,这个由 RabbitMQ 来帮我们保证顺序。8Yv28资讯网——每日最新资讯28at.com

而不同队列中的消息,RabbitMQ 无法保证其顺序性,就像我们在食堂打饭一样,站在不同的排队队列,我们也无法保证会比其他队列的人先打上饭。8Yv28资讯网——每日最新资讯28at.com

  • 消费消息:出队

一般来说,出队后的顺序消费交给消费者去保证。我们说的保证消费顺序,通常也是指消费者消费消息的顺序。8Yv28资讯网——每日最新资讯28at.com

有多个消费者的情况下,通常是无法保证消息顺序的。8Yv28资讯网——每日最新资讯28at.com

这就相当于我们在排队打饭时,有多个打饭阿姨,但是每个阿姨打饭的速度不一致,对应我们消费者的消费能力也不同。8Yv28资讯网——每日最新资讯28at.com

所以,为了保证消息的顺序性,我们可以只使用一个消费者来接收业务消息。8Yv28资讯网——每日最新资讯28at.com

就好比只有一个阿姨在打饭,来得早就一定能早点打上饭。但很明显,这样效率不是很高,所以在使用时我们需要权衡利弊:看业务更需要顺序性,还是更需要消费效率。8Yv28资讯网——每日最新资讯28at.com

优先级队列

在保证顺序消费时,另一个迂回策略是可以使用优先级队列(Priority Queue)。8Yv28资讯网——每日最新资讯28at.com

在 RabbitMQ3.5 之后,当消费者数量较少,如果服务器检测到消费者不能及时消费消息的情况下,优先级队列就会生效。8Yv28资讯网——每日最新资讯28at.com

具体有两种优先级策略:8Yv28资讯网——每日最新资讯28at.com

  • 设置队列的优先级
  • 设置消息的优先级

在声明队列时,我们可以通过 x-max-priority 属性来设置队列的最大优先级,或通过 Priority 属性来设置消息的优先级,从 1~10。8Yv28资讯网——每日最新资讯28at.com

Golang 实现代码如下:8Yv28资讯网——每日最新资讯28at.com

// 队列属性props := make(map[string]interface{})// 设置队列最大优先级props["x-max-priority"] = 10ch.Publish(   "tizi365",     // 交换机   "", // 路由参数   false,   false,   amqp.Publishing{       Priority:5, // 设置消息优先级       DeliveryMode:2,  // 消息投递模式,1代表非持久化,2代表持久化,       ContentType: "text/plain",       Body:       []byte(body),  })

当优先级队列消费生效时,会首先消费高优先级队列中的优先级高的消息,以此来实现顺序消费。8Yv28资讯网——每日最新资讯28at.com

但需要注意的是,优先级队列触发的条件比较苛刻,在需要严格保证业务消息顺序的情况下最好不要使用!8Yv28资讯网——每日最新资讯28at.com

4. 死信队列

RabbitMQ 里,当消息在队列中变成死信(消费者无法正常处理的消息)之后,它会被重新投递到一个交换机上(即死信交换机),死信交换机上绑定的消费队列就是死信队列。8Yv28资讯网——每日最新资讯28at.com

图片图片8Yv28资讯网——每日最新资讯28at.com

死信的产生

死信产生需要满足如下条件:8Yv28资讯网——每日最新资讯28at.com

  • 消息被消费者手动拒绝接收,并且 requeue(重新加入队列)策略为 False;
  • 消息已经过期(TTL);
  • 队列达到最大长度,消息装不下了。

死信的处理步骤

当死信产生时,如果我们定义了一个死信交换机(其实就是一个普通的交换机,只是用于处理死信,所以叫死信交换机),然后在死信交换机上绑定了一个队列(称作死信队列)。8Yv28资讯网——每日最新资讯28at.com

最后,如果死信队列有消费者监听时,死信消息的处理就会和正常业务消息一样,从交换机到队列,再由死信消费者(监听死信队列的消费者)正常消费。8Yv28资讯网——每日最新资讯28at.com

5. 延时队列

RabbitMQ 本身不支持延时队列,但是我们可以通过 RabbitMQ 的插件 rabbitmq-delayed-message-exchange,或者使用 死信队列 + 消息过期 的方式来实现。8Yv28资讯网——每日最新资讯28at.com

5.1 应用场景

当我们在电商里购物,或者在 12306 买票时,大概都会遇到这样一个场景:每次下订单后,到支付订单中间有一段商品锁定时间,超过时间后未支付订单就会关闭。8Yv28资讯网——每日最新资讯28at.com

状态转换图如下:8Yv28资讯网——每日最新资讯28at.com

图片图片8Yv28资讯网——每日最新资讯28at.com

5.2 插件实现

  • 安装插件

Github 地址:8Yv28资讯网——每日最新资讯28at.com

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

从 github 的 release 页面的 assets, 下载 rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez 文件,把文件放到 rabbitmq 插件目录(plugins目录)8Yv28资讯网——每日最新资讯28at.com

提示:版本号可能跟本教程不一样,如果你的 rabbitmq 就是最新版本,插件也选择最新版本就行。8Yv28资讯网——每日最新资讯28at.com

  • 激活插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • 定义交换机

通过 x-delayed-type 设置自定义交换机属性,支持发送延迟消息:8Yv28资讯网——每日最新资讯28at.com

props := make(map[string]interface{})   //关键参数,支持发送延迟消息   props["x-delayed-type"] = "direct"   // 声明交换机   err = ch.ExchangeDeclare(       "delay.queue",   // 交换机名字       "fanout", // 交换机类型       true,     // 是否持久化       false,           false,       false,       props,      // 设置属性  )
  • 发送延迟消息

通过消息头(x-delay),设置消息延迟时间。8Yv28资讯网——每日最新资讯28at.com

msgHeaders := make(map[string]interface{})       // 通过消息头,设置消息延迟时间,单位毫秒       msgHeaders["x-delay"] = 6000       err = ch.Publish(           "delay.queue",     // 交换机名字           "", // 路由参数           false,           false,           amqp.Publishing{               Headers:msgHeaders, // 设置消息头               ContentType: "text/plain",               Body:       []byte(body),          })

5.3 死信队列 + 消息过期方案

该方案的核心思想是,先创建死信交换机、队列和消费者,来监听死信消息。8Yv28资讯网——每日最新资讯28at.com

然后创建定时过期的消息,比如订单支付的时间为 30min,则将消息的 TTL(最大存活时间)设置为 30min,将消息放到一个没有消费者消费的队列中,当消息过期后就会成为死信。8Yv28资讯网——每日最新资讯28at.com

死信消息被重新发送到死信交换机,然后我们在死信队列中消费该消息,根据商品 ID 判断该商品是否被支付。8Yv28资讯网——每日最新资讯28at.com

如果没有支付,就取消订单,修改订单状态为待下单。如果已经支付,就将商品状态修改为已完成,并丢掉这条死信消息。8Yv28资讯网——每日最新资讯28at.com

5. 小结

RabbitMQ 是一个功能强大的消息中间件,它在许多互联网应用中扮演了关键角色,比如华为摄像机 SDK 的监控图像数据上报,大部分电商系统的异步消费等等。8Yv28资讯网——每日最新资讯28at.com

本文链接:http://www.28at.com/showinfo-26-16867-0.html深入浅出RabbitMQ:顺序消费、死信队列和延时队列

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com

上一篇: Swift 中 User Defaults 的读取和写入

下一篇: 你还在使用Python Django的ORM吗?原始SQL能实现更复杂操作!

标签:
  • 热门焦点
  • 鸿蒙OS 4.0公测机型公布:甚至连nova6都支持

    鸿蒙OS 4.0公测机型公布:甚至连nova6都支持

    华为全新的HarmonyOS 4.0操作系统将于今天下午正式登场,官方在发布会之前也已经正式给出了可升级的机型产品,这意味着这些机型会率先支持升级享用。这次的HarmonyOS 4.0支持
  • 28个SpringBoot项目中常用注解,日常开发、求职面试不再懵圈

    28个SpringBoot项目中常用注解,日常开发、求职面试不再懵圈

    前言在使用SpringBoot开发中或者在求职面试中都会使用到很多注解或者问到注解相关的知识。本文主要对一些常用的注解进行了总结,同时也会举出具体例子,供大家学习和参考。注解
  • 虚拟键盘 API 的妙用

    虚拟键盘 API 的妙用

    你是否在遇到过这样的问题:移动设备上有一个固定元素,当激活虚拟键盘时,该元素被隐藏在了键盘下方?多年来,这一直是 Web 上的默认行为,在本文中,我们将探讨这个问题、为什么会发生
  • 一文搞定Java NIO,以及各种奇葩流

    一文搞定Java NIO,以及各种奇葩流

    大家好,我是哪吒。很多朋友问我,如何才能学好IO流,对各种流的概念,云里雾里的,不求甚解。用到的时候,现百度,功能虽然实现了,但是为什么用这个?不知道。更别说效率问题了~下次再遇到,
  • 小红书1周涨粉49W+,我总结了小白可以用的N条涨粉笔记

    小红书1周涨粉49W+,我总结了小白可以用的N条涨粉笔记

    作者:黄河懂运营一条性教育视频,被54万人“珍藏”是什么体验?最近,情感博主@公主是用鲜花做的,火了!仅仅凭借一条视频,光小红书就有超过128万人,为她疯狂点赞!更疯狂的是,这
  • 本地生活这块肥肉,拼多多也想吃一口

    本地生活这块肥肉,拼多多也想吃一口

    出品/壹览商业 作者/李彦编辑/木鱼拼多多也看上本地生活这块蛋糕了。近期,拼多多在App首页“充值中心”入口上线了本机生活界面。壹览商业发现,该界面目前主要
  • 重估百度丨大模型,能撑起百度的“今天”吗?

    重估百度丨大模型,能撑起百度的“今天”吗?

    自象限原创 作者|程心 罗辑2023年之前,对于自己的“今天”,百度也很迷茫。“新业务到 2022 年底还是 0,希望 2023 年出来一个 1。”这是2022年底,李彦宏
  • 2纳米决战2025

    2纳米决战2025

    集微网报道 从三强争霸到四雄逐鹿,2nm的厮杀声已然隐约传来。无论是老牌劲旅台积电、三星,还是誓言重回先进制程领先地位的英特尔,甚至初成立不久的新
  • 亲历马斯克血洗Twitter,硅谷的苦日子在后头

    亲历马斯克血洗Twitter,硅谷的苦日子在后头

    文/刘哲铭  编辑/李薇  马斯克再次挥下裁员大刀。  美国时间11月14日,Twitter约4400名外包员工遭解雇,此次被解雇的员工的主要工作为内容审核等。此前,T
Top