当前位置:首页 > 科技  > 软件

完美解决,RocketMQ如何支持多事务消息?

来源: 责编: 时间:2024-02-04 17:24:39 303观看
导读今天我们将解决使用RocketMQ事务消息时可能遇到的一个常见问题:如何让其支持多事务消息?1. 问题背景在实际开发中,我们常常会面临多事务消息的场景,例如在DailyMart的订单模块中,用户支付后需要调用库存服务进行库存扣减,而

今天我们将解决使用RocketMQ事务消息时可能遇到的一个常见问题:如何让其支持多事务消息?2vk28资讯网——每日最新资讯28at.com

1. 问题背景

在实际开发中,我们常常会面临多事务消息的场景,例如在DailyMart的订单模块中,用户支付后需要调用库存服务进行库存扣减,而在订单确认收货后需要调用用户服务实现积分赠送。这两个业务逻辑都需要通过事务消息来保证分布式事务。2vk28资讯网——每日最新资讯28at.com

为了处理这种情况,我们可能会考虑在订单模块中创建两个事务消息监听器,分别用于处理库存扣减和积分赠送的事务处理和事务回查。2vk28资讯网——每日最新资讯28at.com

@Component@Slf4j//处理订单支付的事务监听器public class OrderPaidTransactionListener implements RocketMQLocalTransactionListener {  @Override  public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {    ......    //处理订单支付逻辑   }  @Override  public RocketMQLocalTransactionState checkLocalTransaction(Message message) {      ......      //检查订单处理逻辑   }}@Component@Slf4j//处理订单收货的事务监听器public class OrderReceivedTransactionListener implements RocketMQLocalTransactionListener {  @Override  public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {    ......   }  @Override  public RocketMQLocalTransactionState checkLocalTransaction(Message message) {      ......   }}

然而,当我们信心满满地完成业务逻辑编写并启动服务时,可能会遇到如下错误:rocketMQTemplate already exists RocketMQLocalTransactionListener2vk28资讯网——每日最新资讯28at.com

图片图片2vk28资讯网——每日最新资讯28at.com

在rocketmq-spring-boot-starter版本低于2.1.0的项目中,可以使用多个 @RocketMQTransactionListener 监听不同的 txProducerGroup 来发送不同类型的事务消息到topic。然而,从 RocketMQ-Spring 2.1.0 版本开始,注解 @RocketMQTransactionListener 不能设置 txProducerGroup、ak、sk,这些值均需与对应的 RocketMQTemplate 保持一致。通过阅读源码 RocketMQTransactionConfiguration#registerTransactionListener() 方法,也可得知在RocketMQ如果已经存在了 RocketMQTransactionListener 则会出现上述错误。2vk28资讯网——每日最新资讯28at.com

图片图片2vk28资讯网——每日最新资讯28at.com

2. 如何解决

为了在保证系统只有一个 RocketMQTransactionListener 的前提下实现多事务消息,我们可以将 RocketMQLocalTransactionListener 不处理具体业务逻辑,而是将其作为一个分发器使用。2vk28资讯网——每日最新资讯28at.com

在生产者发送事务消息时指定对应的事务处理器 ,并将事务处理器放置在消息头上发送出去,在 RocketMQTransactionListener 中根据消息头选择具体的事务处理器来实现业务逻辑。2vk28资讯网——每日最新资讯28at.com

具体实现如下:2vk28资讯网——每日最新资讯28at.com

2.1 定义事务消息处理接口

首先,定义公共的事务消息处理接口,所有事务消息都实现此接口而非 RocketMQ 默认的 RocketMQLocalTransactionListener。2vk28资讯网——每日最新资讯28at.com

public interface TransactionMessageHandler {        /**    * 执行本地事务    * @param payload 消息体    * @param arg 参数    */    RocketMQLocalTransactionState executeLocalTransaction(Object payload, Object arg);        /**     * 检查本地执行状态     * @param payload 消息体     * @return 执行结果     */    RocketMQLocalTransactionState checkLocalTransaction(Object payload);    }

2.2 修改事务消息发送工具类,指定消息处理器

public <T extends RemoteDomainEvent> TransactionSendResult sendTransaction(String topic, String tag, T message, Class<? extends TransactionMessageHandler> transactionMessageListener) {    if(transactionMessageListener == null){    throw new IllegalArgumentException("transactionMessageListener must not null");  }    String destination = buildDestination(topic, tag);  Message<T> sendMessage = MessageBuilder.withPayload(message)    .setHeader(RocketMQHeaders.KEYS, message.getKey())    .setHeader(SOURCE_HEADER, message.getSource())    .setHeader(TRANSACTION_MESSAGE_HEADER, transactionMessageListener.getSimpleName())    .build();  TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, sendMessage, null);  log.info("[{}]事务消息[{}]发送结果[{}]", destination, JSONObject.toJSON(message),JSONObject.toJSON(sendResult));  return sendResult;}

2.3 修改RocketMQ事务消息监听器

@Slf4j@RocketMQTransactionListenerpublic class DefaultRocketMQTransactionListener implements RocketMQLocalTransactionListener {        private final Map<String, TransactionMessageHandler> transactionMessageHandlerMap;        public DefaultRocketMQTransactionListener(Map<String, TransactionMessageHandler> transactionMessageHandlerMap) {        this.transactionMessageHandlerMap = transactionMessageHandlerMap;    }        @Override    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {        log.info("消费者收到事务消息[{}]", JSONObject.toJSON(message));        String listenerName = (String) message.getHeaders().get(MessageHeaderConstant.TRANSACTION_MESSAGE_HEADER);                if (null == listenerName) {            throw new RuntimeException("not params transactionMessageListener");        }                RocketMQLocalTransactionState state;        Object payload = message.getPayload();        try {            TransactionMessageHandler messageHandler = transactionMessageHandlerMap.get(listenerName);            if (null == messageHandler) {                throw new RuntimeException("not match condition TransactionMessageHandler");            }            state = messageHandler.executeLocalTransaction(payload, arg);        } catch (Exception e) {            log.error("rocket transaction message executeLocal error:{}", e.getMessage());            return RocketMQLocalTransactionState.ROLLBACK;        }                return state;    }        @Override    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {        log.info("消费者收到事务回查消息[{}]", JsonUtils.obj2String(message.getHeaders()));        String listenerName = (String) message.getHeaders().get(MessageHeaderConstant.TRANSACTION_MESSAGE_HEADER);        if (null == listenerName) {            throw new RuntimeException("not params transactionMessageListener");        }        RocketMQLocalTransactionState state;        try {            TransactionMessageHandler messageHandler = transactionMessageHandlerMap.get(listenerName);            if (null == messageHandler) {                throw new RuntimeException("not match condition TransactionMessageHandler");            }            state = messageHandler.checkLocalTransaction(message.getPayload());        } catch (Exception e) {            log.error("rocket transaction message executeLocal error:{}", e.getMessage());            return RocketMQLocalTransactionState.ROLLBACK;        }                return state;    }    }

在上述代码中,根据消息头中的TRANSACTION_MESSAGE_HEADER参数选择对应的事务处理器来处理事务消息。2vk28资讯网——每日最新资讯28at.com

在 DailyMart 中有一个公共组件 dailymart-rocketmq-spring-boot-starter 专门用于 RocketMQ 消息发送监听的封装,因此我们也将事务消息的处理逻辑封装到了此组件中。2vk28资讯网——每日最新资讯28at.com

图片图片2vk28资讯网——每日最新资讯28at.com

2.4 修改事务消息处理逻辑

所有的事务消息处理逻辑都实现 TransactionMessageHandler 接口,以订单支付的处理逻辑为例:2vk28资讯网——每日最新资讯28at.com

@Component@Slf4jpublic class OrderPaidTransactionConsumer implements TransactionMessageHandler {        @Resource    private TransactionTemplate transactionTemplate;            @Override    public RocketMQLocalTransactionState executeLocalTransaction(Object payload, Object arg) {        final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) payload, OrderPaidEvent.class);        ...    }        @Override    public RocketMQLocalTransactionState checkLocalTransaction(Object payload) {        final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) payload, OrderPaidEvent.class);        ...    }    }

2.5 修改事务消息发送逻辑,指定事务处理器

TransactionSendResult sendResult = enhanceTemplate.sendTransaction("TRADE-ORDER", "ORDER-PAID", orderPaidEvent, OrderPaidTransactionConsumer.class);

小结

本文解决了在 RocketMQ 2.1.0 版本以后,无法简单使用多个 @RocketMQTransactionListener 的问题。通过引入事务消息处理接口 TransactionMessageHandler,我们将原有的事务处理器改造成了一个分发器,使得在 DailyMart 项目中可以轻松处理多事务消息的场景。2vk28资讯网——每日最新资讯28at.com

本文链接:http://www.28at.com/showinfo-26-73332-0.html完美解决,RocketMQ如何支持多事务消息?

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com

上一篇: .NET 中优秀依赖注入框架Autofac看一篇就够了

下一篇: 一场「跨时空」的小年夜直播:以数字技术助力戏曲「焕活」传承

标签:
  • 热门焦点
  • K60 Pro官方停产 第三方瞬间涨价

    虽然没有官方宣布,但Redmi的一些高管也已经透露了,Redmi K60 Pro已经停产且不会补货,这一切都是为了即将到来的K60 Ultra铺路,属于厂家的正常操作。但有意思的是该机在停产之后
  • 对标苹果的灵动岛 华为带来实况窗功能

    继苹果的灵动岛之后,华为也在今天正式推出了“实况窗”功能。据今天鸿蒙OS 4.0的现场演示显示,华为的实况窗可以更高效的展现出实时通知,比如锁屏上就能看到外卖、打车、银行
  • 一文看懂为苹果Vision Pro开发应用程序

    译者 | 布加迪审校 | 重楼苹果的Vision Pro是一款混合现实(MR)头戴设备。Vision Pro结合了虚拟现实(VR)和增强现实(AR)的沉浸感。其高分辨率显示屏、先进的传感器和强大的处理能力
  • K8S | Service服务发现

    一、背景在微服务架构中,这里以开发环境「Dev」为基础来描述,在K8S集群中通常会开放:路由网关、注册中心、配置中心等相关服务,可以被集群外部访问;图片对于测试「Tes」环境或者
  • 一年经验在二线城市面试后端的经验分享

    忠告这篇文章只适合2年内工作经验、甚至没有工作经验的朋友阅读。如果你是2年以上工作经验,请果断划走,对你没啥帮助~主人公这篇文章内容来自 「升职加薪」星球星友 的投稿,坐
  • 雅柏威士忌多款单品价格大跌,泥煤顶流也不香了?

    来源 | 烈酒商业观察编 | 肖海林今年以来,威士忌市场开始出现了降温迹象,越来越多不断暴涨的网红威士忌也开始悄然回归市场理性。近日,LVMH集团旗下苏格兰威士忌品牌雅柏(Ardbeg
  • 消费结构调整丨巨头低价博弈,拼多多还卷得动吗?

    来源:征探财经作者:陈香羽随着流量红利的退潮,电商的存量博弈越来越明显。曾经主攻中高端与品质的淘宝天猫、京东重拾&ldquo;低价&rdquo;口号。而过去与他们错位竞争的拼多多,靠
  • 与兆芯合作 联想推出全新旗舰版笔记本电脑开天N7系列

    联想与兆芯合作推出全新联想旗舰版笔记本电脑开天 N7系列。这个系列采用兆芯KX-6640MA处理器平台,KX-6640MA 处理器是采用了陆家嘴架构,16nm 工艺,4 核 4 线
  • 世界人工智能大会国际日开幕式活动在世博展览馆开启

    30日上午,世界人工智能大会国际日开幕式活动在世博展览馆开启,聚集国际城市代表、重量级院士专家、国际创新企业代表,共同打造人工智能交流平台。上海市副市
Top