当前位置:首页 > 科技  > 软件

消息队列批量收发消息,请避开这五个坑!

来源: 责编: 时间:2023-11-30 09:26:20 356观看
导读大家好,我是君哥。使用消息队列时,为了提高生产和消费的性能,有时会开启批量处理。在生产端,生产者发送的消息先发送到一个消息列表,积累到一定的消息量之后再批量发送给 Broker,如下图:在消费端,消费者拉取消息后先不立即处

efV28资讯网——每日最新资讯28at.com

大家好,我是君哥。efV28资讯网——每日最新资讯28at.com

使用消息队列时,为了提高生产和消费的性能,有时会开启批量处理。efV28资讯网——每日最新资讯28at.com

在生产端,生产者发送的消息先发送到一个消息列表,积累到一定的消息量之后再批量发送给 Broker,如下图:efV28资讯网——每日最新资讯28at.com

efV28资讯网——每日最新资讯28at.com

在消费端,消费者拉取消息后先不立即处理,而是把消息转存到一个内存队列或数据库,由业务线程去处理,如下图:efV28资讯网——每日最新资讯28at.com

efV28资讯网——每日最新资讯28at.com

无论是生产者做批量发送,还是消费者做批量处理,都需要考虑使用批量消息的业务场景,避免踩坑。下面看一下批量操作可能会遇到哪些坑。efV28资讯网——每日最新资讯28at.com

批量大小

当生产者采用批量发送的方式来提高发送性能时,一定要考虑发送消息的批量大小。下面是 RocketMQ 批量发送的官方示例:efV28资讯网——每日最新资讯28at.com

String topic = "BatchTest";List<Message> messages = new ArrayList<>();messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));try {    producer.send(messages);} catch (Exception e) {    e.printStackTrace();    //handle the error}

RocketMQ 默认消息大小是 4M,由 maxMessageSize 参数控制,如果批量消息大小超过 maxMessageSize,则会抛出异常。efV28资讯网——每日最新资讯28at.com

如果遇到消息大小超过 maxMessageSize 的情况时,可以用下面方法进行处理:efV28资讯网——每日最新资讯28at.com

  • 把这个参数改大,但需要考虑 Broker 的性能和网络带宽;
  • 将消息进行拆分后分批发送;
  • 对消息进行压缩处理。

RabbitMQ 相关的 API 则提供了更加灵活的批量控制,对消息数量和消息大小都做了控制,下面看一下源码:efV28资讯网——每日最新资讯28at.com

efV28资讯网——每日最新资讯28at.com

efV28资讯网——每日最新资讯28at.com

幂等

消费端可以批量拉取消息进行消费,这样可以减少拉取消息时的 RPC 次数,提升消费性能。比如在 RocketMQ 中,可以通过 Consumer 中的 pullBatchSize 来设置一次拉取的消息数量,通过 consumeMessageBatchMaxSize 参数来设置一次消费的消息数量。efV28资讯网——每日最新资讯28at.com

但需要注意的是,如果批量消息中一条消息消费失败了,这一批消息都需要进行重试,已经消费成功的消息会被重复消费,带来业务问题。efV28资讯网——每日最新资讯28at.com

为了不对业务造成影响,必须考虑幂等。一个简单的方法是在消息中增加全局唯一 id 属性,对消息消费结果进行记录,消费成功后保存 id。这样在消费消息之前先查询是否存在消费成功的记录,如果存在则直接返回处理成功。efV28资讯网——每日最新资讯28at.com

时延

在使用消息队列进行批量操作时,必须要考虑到时延问题。比如我们设置一个批次 100 条消息,积累够 100 条消息后再发送,在消息量小的情况下,可能积累够 100 条消息会很长时间,导致消费端拉取到一条消息时延很大。efV28资讯网——每日最新资讯28at.com

虽然消息队列的一个重要作用是削峰填谷,但在一些场景下,对消息的实时性也有要求。比如在车联网的充电场景,车联网平台需要实时感知充电桩的状态,如果充电桩积累够一批消息再上报平台,平台获取到的状态会不准确,如果心跳消息延时太久,平台会认为充电桩离线。efV28资讯网——每日最新资讯28at.com

对于有时延要求又需要批量操作的场景,可以设置一个超时时间,超时后即使消息数量不够,也会发送出去。看下 RabbitMQ 的处理:efV28资讯网——每日最新资讯28at.com

public synchronized void send(String exchange, String routingKey, Message message, CorrelationData correlationData)  throws AmqpException { if (correlationData != null) {  //...  super.send(exchange, routingKey, message, correlationData); } else {  if (this.scheduledTask != null) {   this.scheduledTask.cancel(false);  }  MessageBatch batch = this.batchingStrategy.addToBatch(exchange, routingKey, message);  if (batch != null) {   super.send(batch.getExchange(), batch.getRoutingKey(), batch.getMessage(), null);  }  //这里获取到超时时间,到达超时时间后使用定时器将消息发送出去  Date next = this.batchingStrategy.nextRelease();  if (next != null) {   this.scheduledTask = this.scheduler.schedule((Runnable) () -> releaseBatches(), next);  } }}

可靠性

使用批处理一定要考虑可靠性的问题。efV28资讯网——每日最新资讯28at.com

在消费端,消费者批量拉取一批消息后把消息暂存到一个内存临时队列,然后多线程去临时队列消费消息,如果服务宕机,临时队列中的消息会丢失。efV28资讯网——每日最新资讯28at.com

为了避免宕机引发的损失,可以拉取一批消息后保存到数据库,然后给 Broker 返回 ACK,之后业务代码去数据库查询消息并消费,不过要考虑数据库大事务、锁竞争等问题。efV28资讯网——每日最新资讯28at.com

当然,对于一些消息丢失不敏感的场景,比如日志收集之类的,可靠性这个指标是不用太关注的。efV28资讯网——每日最新资讯28at.com

特殊场景

因为批量消息有一些复杂性,消息队列的部分特性不支持。efV28资讯网——每日最新资讯28at.com

事务消息

批量消息会增加消息重试的难度,所以对于事务消息,建议使用单条消息,一条消息对应一个事务。efV28资讯网——每日最新资讯28at.com

顺序消息

顺序消息的实现思路一般是生产者将消息发送到同一个分区,消费者绑定这个分区并使用单线程消费这个分区的消息。如果对同一个 Topic 下的同一个分区来实现批量发送,难度会增大。所以建议顺序消息使用单条消息进行发送。efV28资讯网——每日最新资讯28at.com

延时消息

如果延时消息使用批量进行发送,这一批消息的延时时间必须相同,同时要考虑批量消息的超时时间,超时时间太大会影响延时时间的准确性,生产端实现复杂度大大增加。efV28资讯网——每日最新资讯28at.com

总结

使用批量消息,在一定程度上可以提高性能和吞吐量,但是确实也会存在一些问题,使用的时候要结合业务场景避开这些坑。efV28资讯网——每日最新资讯28at.com

本文链接:http://www.28at.com/showinfo-26-35258-0.html消息队列批量收发消息,请避开这五个坑!

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com

上一篇: 利用属性选择器对外部链接进行样式设计

下一篇: 全网最详细的OpenFeign讲解,肯定有你不知道的

标签:
  • 热门焦点
  • K60 Pro官方停产 第三方瞬间涨价

    虽然没有官方宣布,但Redmi的一些高管也已经透露了,Redmi K60 Pro已经停产且不会补货,这一切都是为了即将到来的K60 Ultra铺路,属于厂家的正常操作。但有意思的是该机在停产之后
  • 一文看懂为苹果Vision Pro开发应用程序

    译者 | 布加迪审校 | 重楼苹果的Vision Pro是一款混合现实(MR)头戴设备。Vision Pro结合了虚拟现实(VR)和增强现实(AR)的沉浸感。其高分辨率显示屏、先进的传感器和强大的处理能力
  • .NET 程序的 GDI 句柄泄露的再反思

    一、背景1. 讲故事上个月我写过一篇 如何洞察 C# 程序的 GDI 句柄泄露 文章,当时用的是 GDIView + WinDbg 把问题搞定,前者用来定位泄露资源,后者用来定位泄露代码,后面有朋友反
  • 年轻人的“职场羞耻感”,无处不在

    作者:冯晓亭 陶 淘 李 欣 张 琳 马舒叶来源:燃次元&ldquo;人在职场,应该选择什么样的着装?&rdquo;近日,在网络上,一个与着装相关的帖子引发关注,在该帖子里,一位在高级写字楼亚洲金
  • 冯提莫签约抖音公会 前“斗鱼一姐”消失在直播间

    来源:直播观察提起&ldquo;冯提莫&rdquo;这个名字,很多网友或许听过,但应该不记得她是哪位主播了。其实,作为曾经的&ldquo;斗鱼一姐&rdquo;,冯提莫在游戏直播的年代影响力不输于现
  • 支持aptX Lossless无损传输 iQOO TWS 1赛道版发布限时优惠价369元

    2023年7月4日,“无损音质,声动人心”iQOO TWS 1正式发布,支持aptX Lossless无损传输,限时优惠价369元。iQOO TWS 1耳机率先支持端到端aptX Lossless无
  • 2299元起!iQOO Pad开启预售:性能最强天玑平板

    5月23日,iQOO如期举行了新品发布会,除了首发安卓最强旗舰处理器的iQOO Neo8系列新机外,还在发布会上推出了旗下首款平板电脑——iQOO Pad,其搭载了天玑
  • 中关村论坛11月25日开幕,15位诺奖级大咖将发表演讲

    11月18日,记者从2022中关村论坛新闻发布会上获悉,中关村论坛将于11月25至30日在京举行。本届中关村论坛由科学技术部、国家发展改革委、工业和信息化部、国务
  • 北京:科技教育体验基地开始登记

      北京“科技馆之城”科技教育体验基地登记和认证工作日前启动。首批北京科技教育体验基地拟于2023年全国科普日期间挂牌,后续还将开展常态化登记。  北京科技教育体验基
Top