当前位置:首页 > 科技  > 软件

深入理解 RocketMQ 广播消费

来源: 责编: 时间:2023-09-28 10:09:12 441观看
导读这篇文章我们聊聊广播消费,因为广播消费在某些场景下真的有奇效。笔者会从基础概念、实现机制、实战案例三个方面一一展开,希望能帮助到大家。1 基础概念RocketMQ 支持两种消息模式:集群消费( Clustering )和广播消费( Broa

这篇文章我们聊聊广播消费,因为广播消费在某些场景下真的有奇效。笔者会从基础概念、实现机制、实战案例三个方面一一展开,希望能帮助到大家。hmE28资讯网——每日最新资讯28at.com

1 基础概念

RocketMQ 支持两种消息模式:集群消费( Clustering )和广播消费( Broadcasting )。hmE28资讯网——每日最新资讯28at.com

集群消费:hmE28资讯网——每日最新资讯28at.com

同一 Topic 下的一条消息只会被同一消费组中的一个消费者消费。也就是说,消息被负载均衡到了同一个消费组的多个消费者实例上。hmE28资讯网——每日最新资讯28at.com

图片图片hmE28资讯网——每日最新资讯28at.com

广播消费:hmE28资讯网——每日最新资讯28at.com

当使用广播消费模式时,每条消息推送给集群内所有的消费者,保证消息至少被每个消费者消费一次。hmE28资讯网——每日最新资讯28at.com

图片图片hmE28资讯网——每日最新资讯28at.com

2 源码解析

首先下图展示了广播消费的代码示例。hmE28资讯网——每日最新资讯28at.com

public class PushConsumer {    public static final String CONSUMER_GROUP = "myconsumerGroup";    public static final String DEFAULT_NAMESRVADDR = "localhost:9876";    public static final String TOPIC = "mytest";    public static final String SUB_EXPRESSION = "TagA || TagC || TagD";    public static void main(String[] args) throws InterruptedException, MQClientException {        // 定义 DefaultPushConsumer         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);        // 定义名字服务地址        consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);        // 定义消费读取位点        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);        // 定义消费模式        consumer.setMessageModel(MessageModel.BROADCASTING);        // 订阅主题信息        consumer.subscribe(TOPIC, SUB_EXPRESSION);        // 订阅消息监听器        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {            try {                for (MessageExt messageExt : msgs) {                    System.out.println(new String(messageExt.getBody()));                }            }catch (Exception e) {                e.printStackTrace();            }            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;        });        consumer.start();        System.out.printf("Broadcast Consumer Started.%n");    }}

和集群消费不同的点在于下面的代码:hmE28资讯网——每日最新资讯28at.com

consumer.setMessageModel(MessageModel.BROADCASTING);

接下来,我们从源码角度来看看广播消费和集群消费有哪些差异点 ?hmE28资讯网——每日最新资讯28at.com

首先进入 DefaultMQPushConsumerImpl 类的 start 方法 , 分析启动流程中他们两者的差异点:hmE28资讯网——每日最新资讯28at.com

图片图片hmE28资讯网——每日最新资讯28at.com

▍ 差异点1:拷贝订阅关系hmE28资讯网——每日最新资讯28at.com

private void copySubscription() throws MQClientException {    try {       Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();       if (sub != null) {          for (final Map.Entry<String, String> entry : sub.entrySet()) {              final String topic = entry.getKey();              final String subString = entry.getValue();              SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subString);                this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);            }        }       if (null == this.messageListenerInner) {          this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();       }       // 注意下面的代码 , 集群模式下自动订阅重试主题        switch (this.defaultMQPushConsumer.getMessageModel()) {           case BROADCASTING:               break;           case CLUSTERING:                final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);                this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);                break;            default:                break;        }    } catch (Exception e) {        throw new MQClientException("subscription exception", e);    }}

在集群模式下,会自动订阅重试队列,而广播模式下,并没有这段代码。也就是说广播模式下,不支持消息重试。hmE28资讯网——每日最新资讯28at.com

▍ 差异点2:本地进度存储hmE28资讯网——每日最新资讯28at.com

switch (this.defaultMQPushConsumer.getMessageModel()) {    case BROADCASTING:        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());        break;    case CLUSTERING:        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());        break;    default:        break;}this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);

我们可以看到消费进度存储的对象是:LocalFileOffsetStore , 进度文件存储在如下的主目录 /{用户主目录}/.rocketmq_offsets。hmE28资讯网——每日最新资讯28at.com

public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(    "rocketmq.client.localOffsetStoreDir",    System.getProperty("user.home") + File.separator + ".rocketmq_offsets");

进度文件是 /mqClientId/{consumerGroupName}/offsets.json 。hmE28资讯网——每日最新资讯28at.com

this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator + this.mQClientFactory.getClientId() + File.separator + this.groupName + File.separator + "offsets.json";

笔者创建了一个主题 mytest , 包含4个队列,进度文件内容如下:hmE28资讯网——每日最新资讯28at.com

图片图片hmE28资讯网——每日最新资讯28at.com

消费者启动后,我们可以将整个流程简化如下图,并继续整理差异点:hmE28资讯网——每日最新资讯28at.com

图片图片hmE28资讯网——每日最新资讯28at.com

▍ 差异点3:负载均衡消费该主题的所有 MessageQueuehmE28资讯网——每日最新资讯28at.com

进入负载均衡抽象类 RebalanceImpl 的rebalanceByTopic方法 。hmE28资讯网——每日最新资讯28at.com

private void rebalanceByTopic(final String topic, final boolean isOrder) {    switch (messageModel) {        case BROADCASTING: {            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);            if (mqSet != null) {                boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);                // 省略代码            } else {                log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);            }            break;        }        case CLUSTERING: {            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);            List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);            // 省略代码            if (mqSet != null && cidAll != null) {                List<MessageQueue> mqAll = new ArrayList<MessageQueue>();                mqAll.addAll(mqSet);                Collections.sort(mqAll);                Collections.sort(cidAll);                AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;                List<MessageQueue> allocateResult = null;                try {                     allocateResult = strategy.allocate(                            this.consumerGroup,                            this.mQClientFactory.getClientId(),                            mqAll,                            cidAll);                    } catch (Throwable e) {                        // 省略日志打印代码                        return;                    }                Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();                if (allocateResult != null) {                    allocateResultSet.addAll(allocateResult);                }                boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);                //省略代码            }            break;        }        default:            break;    }}

从上面代码我们可以看到消息模式为广播消费模式时,消费者会消费该主题下所有的队列,这一点也可以从本地的进度文件 offsets.json 得到印证。hmE28资讯网——每日最新资讯28at.com

▍ 差异点4:不支持顺序消息hmE28资讯网——每日最新资讯28at.com

我们知道消费消息顺序服务会向 Borker 申请锁 。消费者根据分配的队列 messageQueue ,向 Borker 申请锁 ,如果申请成功,则会拉取消息,如果失败,则定时任务每隔 20 秒会重新尝试。hmE28资讯网——每日最新资讯28at.com

if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {        @Override        public void run() {            try {                ConsumeMessageOrderlyService.this.lockMQPeriodically();            } catch (Throwable e) {                log.error("scheduleAtFixedRate lockMQPeriodically exception", e);            }        }    }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);}

但是从上面的代码,我们发现只有在集群消费的时候才会定时申请锁,这样就会导致广播消费时,无法为负载均衡的队列申请锁,导致拉取消息服务一直无法获取消息数据。hmE28资讯网——每日最新资讯28at.com

笔者修改消费例子,在消息模式为广播模式的场景下,将消费模式从并发消费修改为顺序消费。hmE28资讯网——每日最新资讯28at.com

consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {    try {        for (MessageExt messageExt : msgs) {            System.out.println(new String(messageExt.getBody()));        }    }catch (Exception e) {        e.printStackTrace();    }    return ConsumeOrderlyStatus.SUCCESS;});

图片图片hmE28资讯网——每日最新资讯28at.com

推送服务是一个 TCP 服务(自定义协议),同时也是一个消费者服务,消息模式是广播消费。hmE28资讯网——每日最新资讯28at.com

司机打开司机端 APP 后,APP 会通过负载均衡和推送服务创建长连接,推送服务会保存 TCP 连接引用 (比如司机编号和 TCP channel 的引用)。hmE28资讯网——每日最新资讯28at.com

派单服务是生产者,将派单数据发送到 MetaQ ,  每个推送服务都会消费到该消息,推送服务判断本地内存中是否存在该司机的 TCP channel , 若存在,则通过 TCP 连接将数据推送给司机端。hmE28资讯网——每日最新资讯28at.com

肯定有同学会问:假如网络原因,推送失败怎么处理 ?有两个要点:hmE28资讯网——每日最新资讯28at.com

  1. 司机端 APP 定时主动拉取派单信息;
  2. 当推送服务没有收到司机端的 ACK 时 ,也会一定时限内再次推送,达到阈值后,不再推送。

3.2 缓存同步

高并发场景下,很多应用使用本地缓存,提升系统性能 。hmE28资讯网——每日最新资讯28at.com

本地缓存可以是 HashMap 、ConcurrentHashMap ,也可以是缓存框架 Guava Cache 或者 Caffeine cache 。hmE28资讯网——每日最新资讯28at.com

图片图片hmE28资讯网——每日最新资讯28at.com

如上图,应用A启动后,作为一个 RocketMQ 消费者,消息模式设置为广播消费。为了提升接口性能,每个应用节点都会将字典表加载到本地缓存里。hmE28资讯网——每日最新资讯28at.com

当字典表数据变更时,可以通过业务系统发送一条消息到 RocketMQ ,每个应用节点都会消费消息,刷新本地缓存。hmE28资讯网——每日最新资讯28at.com

4 总结

集群消费和广播消费模式下,各功能的支持情况如下:hmE28资讯网——每日最新资讯28at.com

功能hmE28资讯网——每日最新资讯28at.com

集群消费hmE28资讯网——每日最新资讯28at.com

广播消费hmE28资讯网——每日最新资讯28at.com

顺序消息hmE28资讯网——每日最新资讯28at.com

支持hmE28资讯网——每日最新资讯28at.com

不支持hmE28资讯网——每日最新资讯28at.com

重置消费位点hmE28资讯网——每日最新资讯28at.com

支持hmE28资讯网——每日最新资讯28at.com

不支持hmE28资讯网——每日最新资讯28at.com

消息重试hmE28资讯网——每日最新资讯28at.com

支持hmE28资讯网——每日最新资讯28at.com

不支持hmE28资讯网——每日最新资讯28at.com

消费进度hmE28资讯网——每日最新资讯28at.com

服务端维护hmE28资讯网——每日最新资讯28at.com

客户端维护hmE28资讯网——每日最新资讯28at.com

广播消费主要用于两种场景:消息推送和缓存同步。hmE28资讯网——每日最新资讯28at.com

参考资料 :hmE28资讯网——每日最新资讯28at.com

https://www.51cto.com/article/714277.htmlhmE28资讯网——每日最新资讯28at.com

https://ost.51cto.com/posts/21100hmE28资讯网——每日最新资讯28at.com

本文链接:http://www.28at.com/showinfo-26-11888-0.html深入理解 RocketMQ 广播消费

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com

上一篇: 消息队列技术选型:这七种消息场景一定要考虑!

下一篇: 从零开发可视化大屏制作平台

标签:
  • 热门焦点
  • K60至尊版刚预热 一加Ace2 Pro正面硬刚

    Redmi这边刚如火如荼的宣传了K60 Ultra的各种技术和硬件配置,作为竞品的一加也坐不住了。一加中国区总裁李杰发布了两条微博,表示在自家的一加Ace2上早就已经采用了和PixelWo
  • Rust中的高吞吐量流处理

    作者 | Noz编译 | 王瑞平本篇文章主要介绍了Rust中流处理的概念、方法和优化。作者不仅介绍了流处理的基本概念以及Rust中常用的流处理库,还使用这些库实现了一个流处理程序
  • 虚拟键盘 API 的妙用

    你是否在遇到过这样的问题:移动设备上有一个固定元素,当激活虚拟键盘时,该元素被隐藏在了键盘下方?多年来,这一直是 Web 上的默认行为,在本文中,我们将探讨这个问题、为什么会发生
  • 重估百度丨“晚熟”的百度云,能等到春天吗?

    &copy;自象限原创作者|程心排版|王喻可2016年7月13日,百度云计算战略发布会在北京举行,宣告着百度智能云的正式启程。彼时的会场座无虚席,甚至排队排到了门外,在场的所有人几乎都
  • 猿辅导与新东方的两种“归途”

    作者|卓心月 出品|零态LT(ID:LingTai_LT)如何成为一家伟大企业?答案一定是对&ldquo;势&rdquo;的把握,这其中最关键的当属对企业战略的制定,且能够站在未来看现在,即使这其中的
  • 小米公益基金会捐赠2500万元驰援北京、河北暴雨救灾

    8月2日消息,今日小米科技创始人雷军在其微博上发布消息称,小米公益基金会宣布捐赠2500万元驰援北京、河北暴雨救灾。携手抗灾,京冀安康!以下为公告原文
  • 携众多高端产品亮相ChinaJoy,小米带来一场科技与人文的视听盛宴

    7月28日,全球数字娱乐领域最具知名度与影响力的年度盛会中国国际数码互动娱乐展览会(简称ChinaJoy)在上海新国际博览中心盛大开幕。作为全球领先的科
  • 联想YOGA 16s 2022笔记本将要推出,屏幕支持触控功能

    联想此前宣布,将于11月2日19:30召开联想秋季轻薄新品发布会,推出联想 YOGA 16s 2022 笔记本等新品。官方称,YOGA 16s 2022 笔记本将搭载 16 英寸屏幕,并且是一
  • 华为举行春季智慧办公新品发布会 首次推出电子墨水屏平板

    北京时间2月27日晚,华为在巴塞罗那举行春季智慧办公新品发布会,在海外市场推出之前已经在中国市场上市的笔记本、平板、激光打印机等办公产品,并首次推出搭载
Top