当前位置:首页 > 科技  > 软件

实战与原理:如何基于RocketMQ实现分布式事务?

来源: 责编: 时间:2024-01-26 08:59:37 141观看
导读使用事务消息在DailyMart系统中,用户发起支付后,订单系统需要调用库存服务执行库存扣减逻辑。由于这是跨服务调用,因此会产生分布式事务。在这里,我们使用RocketMQ的事务消息来实现分布式事务。1、首先,在订单服务的应用服

使用事务消息

在DailyMart系统中,用户发起支付后,订单系统需要调用库存服务执行库存扣减逻辑。图片22n28资讯网——每日最新资讯28at.com

由于这是跨服务调用,因此会产生分布式事务。在这里,我们使用RocketMQ的事务消息来实现分布式事务。22n28资讯网——每日最新资讯28at.com

1、首先,在订单服务的应用服务层处理支付逻辑,并调用RocketMQ发送事务消息:22n28资讯网——每日最新资讯28at.com

@Overridepublic String payment(String orderSn) {    // todo 集成支付宝支付    // 支付流水号    String outOrderNo = IdUtils.get32UUID();    TradeOrder tradeOrder = Optional.ofNullable(tradeOrderService.getByOrderSn(orderSn)).orElseThrow(() -> new BusinessException("订单编号不存在"));    // 如果订单处于待支付状态    if (Objects.equals(tradeOrder.getStatus(), OrderStatusEnum.WAITING_PAYMENT.getStatus())) {        OrderPaidEvent orderPaidEvent = new OrderPaidEvent(orderSn, outOrderNo);        TransactionSendResult sendResult = enhanceTemplate.sendTransaction("TRADE-ORDER", "ORDER-PAID");        if (SendStatus.SEND_OK == sendResult.getSendStatus() && sendResult.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) {            return tradeOrder.getOrderSn();        } else {            throw new BusinessException("支付失败...");        }    } else {        throw new BusinessException("订单已支付,请勿重复提交...");    }}

22n28资讯网——每日最新资讯28at.com

2、在订单服务的基础设施层,创建一个类实现 RocketMQLocalTransactionListener 接口:22n28资讯网——每日最新资讯28at.com

该接口有两个方法:22n28资讯网——每日最新资讯28at.com

  • executeLocalTransaction:用于执行本地事务。
  • checkLocalTransaction:在RocketMQ执行消息回查时检查本地事务执行结果,用于确定消息提交还是回滚。
@Component@Slf4jpublic class OrderPaidTransactionConsumer implements RocketMQLocalTransactionListener {        @Resource    private TransactionTemplate transactionTemplate;    @Resource    private TradeOrderService tradeOrderService;         /**     * 执行本地事务     * 将订单状态修改成已支付     */    @Override    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {                final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) message.getPayload(), OrderPaidEvent.class);        try {            // 放到同一个本地事务中            this.transactionTemplate.executeWithoutResult(status -> {                String orderSn = orderPaidEvent.getOrderSn();                // 修改成待发货                tradeOrderService.changeOrderStatus(orderSn, OrderStatusEnum.AWAITING_SHIPMENT);            });            return RocketMQLocalTransactionState.COMMIT;        } catch (Exception e) {            log.error("修改订单状态失败", e);            // ROLLBACK 则回滚消息,rocketmq将废弃这条消息            return RocketMQLocalTransactionState.ROLLBACK;            // 如果是UNKNOWN, 则触发回查        }    }    /**     * 检查本地事务执行状态     * 消息回查时,对于正在进行中的事务不要返回Rollback或Commit结果,应继续保持Unknown的状态。     */    @Override    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {        final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) message.getPayload(), OrderPaidEvent.class);        String orderSn = orderPaidEvent.getOrderSn();        TradeOrder tradeOrder = tradeOrderService.getByOrderSn(orderSn);        // 如果已经修改成待发货说明本地事务执行成功,此时消费端可以直接消费        if (Objects.equals(tradeOrder.getStatus(), OrderStatusEnum.AWAITING_SHIPMENT.getStatus())) {            return RocketMQLocalTransactionState.COMMIT;        } else {            // 这里查不到的时候返回 UNKNOWN在于,有可能事务还没有提交,回查就开始了            return RocketMQLocalTransactionState.UNKNOWN;        }    }}

22n28资讯网——每日最新资讯28at.com

3、在库存服务的基础设施层,监听消息以执行库存扣减逻辑:22n28资讯网——每日最新资讯28at.com

@Component@Slf4j@RocketMQMessageListener(consumerGroup = "dailymart_inventory_group", topic = "TRADE-ORDER", selectorExpression = "ORDER-PAID")public class InventoryDeductionConsumer extends EnhanceMessageHandler<OrderPaidEvent> implements RocketMQListener<OrderPaidEvent> {        @Resource    private InventoryDomainService inventoryDomainService;        @Override    public void onMessage(OrderPaidEvent orderPaidEvent) {        super.dispatchMessage(orderPaidEvent);    }        @Override    protected void handleMessage(OrderPaidEvent orderPaidEvent) throws Exception {        // 执行库存扣减逻辑        String orderSn = orderPaidEvent.getOrderSn();        inventoryDomainService.deductionInventory(orderSn);    }}

通过以上步骤,我们完成了RocketMQ事务消息的发送,利用事务消息的特性保证分布式事务的最终一致性。与普通消息相比,事务消息在处理时需要实现 RocketMQLocalTransactionListener 接口,这是事务消息的核心。22n28资讯网——每日最新资讯28at.com

介绍完事务消息的使用,接下来我们再来聊聊事务消息的原理。22n28资讯网——每日最新资讯28at.com

事务消息的原理

首先,让我们思考一下,如果不使用事务消息会有什么问题。22n28资讯网——每日最新资讯28at.com

很容易想到的一个问题就是消息丢失。当保存订单后由于网络问题导致消息丢失,如下图所示:22n28资讯网——每日最新资讯28at.com

图片图片22n28资讯网——每日最新资讯28at.com

在不使用RocketMQ的情况下,我们往往会通过 本地消息表 + 补偿重试 的机制来保证消息一定会发送出去。其原理可以参考上篇文章 [Dailymart26:微服务中躲不过的坑 - 分布式事务]。22n28资讯网——每日最新资讯28at.com

那RocketMQ是如何解决这个问题的呢?22n28资讯网——每日最新资讯28at.com

1. 发送half消息,探测MQ是否正常

在基于RocketMQ的事务消息中,我们不是先执行自身的订单支付逻辑,而是先让订单系统发送一条 half消息 到MQ去。这个half消息本质上是一个订单支付成功的消息,只不过此时库存系统是看不见这个half消息的。然后,我们等待接收这个half消息写入成功的响应通知。22n28资讯网——每日最新资讯28at.com

图片图片22n28资讯网——每日最新资讯28at.com

发送half消息的本质其实是为了探测MQ是否仍然正常运行。但问题来了,如上所述,消息会发生丢失,那么half消息丢失怎么办呢?22n28资讯网——每日最新资讯28at.com

2. half消息发送失败

在发送half消息时,由于网络原因或者MQ直接挂了,就会导致half消息发送失败。这个时候订单系统需要执行一系列的回滚操作。在我们的场景中,应该执行退款操作,将钱退还给用户,并告知用户交易失败。22n28资讯网——每日最新资讯28at.com

3. half消息成功,订单系统执行自己的业务逻辑

如果成功收到half消息的正常响应,此时订单系统应该执行自己的业务逻辑。在我们这个场景中,就是修改订单数据库状态,将其修改为待发货状态。这部分逻辑就对应上述代码中的executeLocalTransaction()方法。22n28资讯网——每日最新资讯28at.com

图片图片22n28资讯网——每日最新资讯28at.com

4. 订单本地事务执行失败

如果订单系统执行本地事务失败,则需要发送一个rollback请求给MQ,让其删除这条half消息。22n28资讯网——每日最新资讯28at.com

图片图片22n28资讯网——每日最新资讯28at.com

5. 订单本地事务执行成功

如果订单系统的本地事务执行正常,此时需要发送一个commit请求给MQ,要求MQ对之前的half消息进行commit操作,这样库存系统就可以消费这条消息了。22n28资讯网——每日最新资讯28at.com

图片图片22n28资讯网——每日最新资讯28at.com

订单创建消息处于half状态时,库存系统是看不见它的。必须等到订单系统执行commit请求,消息被commit后,库存系统才能看到并获取这条消息进行后续处理。22n28资讯网——每日最新资讯28at.com

6. half消息发送成功,但是没收到half的响应

以上就是RocketMQ事务消息的正向流程。22n28资讯网——每日最新资讯28at.com

然而,还有一个问题:如果订单系统发送half消息成功后却没有收到half消息的响应,该如何处理呢?22n28资讯网——每日最新资讯28at.com

在这种情况下,订单系统可能会误以为是发送half消息到MQ失败了。订单系统就会执行回滚流程,退还支付金额,关闭订单。22n28资讯网——每日最新资讯28at.com

图片图片22n28资讯网——每日最新资讯28at.com

然而,此时MQ系统中已经存在了一条half消息。这条half消息又该如何处理呢?22n28资讯网——每日最新资讯28at.com

在RocketMQ中,有一套补偿流程。RocketMQ会定期扫描处于half状态的消息。如果一直没有对这个消息执行 commit/rollback 操作,超过了一定的时间,RocketMQ就会回调你的订单系统的一个接口,用以确认你本地事务的情况。22n28资讯网——每日最新资讯28at.com

当订单系统收到MQ的回查请求时,就需要检索一下数据库,根据订单状态决定执行commit还是rollback。22n28资讯网——每日最新资讯28at.com

这部分逻辑就对应上述代码中checkLocalTransaction()方法。22n28资讯网——每日最新资讯28at.com

图片图片22n28资讯网——每日最新资讯28at.com

7. rollback 或者 commit 失败怎么办?

通过上述说明,可以看到,RocketMQ是根据rollback或commit操作来决定half消息的状态的。如果业务系统执行了commit操作,则将half消息设置为可见,库存系统可以消费;如果业务系统执行了rollback操作,MQ就会删除half消息。那么问题来了:如果订单系统在执行rollback或commit操作时失败又该如何处理呢?22n28资讯网——每日最新资讯28at.com

这时候仍然依赖于前文提到的回查机制。22n28资讯网——每日最新资讯28at.com

由于此时MQ中的消息一直处于half状态,超过一定的超时时间后,MQ会发现这个half消息有问题,然后回调你的订单系统的接口。此时订单系统需要根据订单状态来决定执行commit请求还是rollback请求。22n28资讯网——每日最新资讯28at.com

以上,就是RocketMQ事务消息的原理。结合文章开头的代码,是不是已经很清晰了呢?22n28资讯网——每日最新资讯28at.com

本文链接:http://www.28at.com/showinfo-26-68319-0.html实战与原理:如何基于RocketMQ实现分布式事务?

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com

上一篇: Python Pathlib模块:一站式解决文件路径难题

下一篇: 我爱说实话,Mica-Http 超好用!

标签:
  • 热门焦点
  • 红魔电竞平板评测:大屏幕硬实力

    红魔电竞平板评测:大屏幕硬实力

    前言:三年的疫情因为要上网课的原因激活了平板市场,如今网课的时代已经过去,大家的生活都恢复到了正轨,这也就意味着,真正考验平板电脑生存的环境来了。也就是面对着这种残酷的
  • 中兴AX5400Pro+上手体验:再升级 双2.5G网口+USB 3.0这次全都有

    中兴AX5400Pro+上手体验:再升级 双2.5G网口+USB 3.0这次全都有

    2021年11月的时候,中兴先后发布了两款路由器产品,中兴AX5400和中兴AX5400 Pro,从产品命名上就不难看出这是隶属于同一系列的,但在外观设计上这两款产品可以说是完全没一点关系
  • Redmi Pad评测:红米充满野心的一次尝试

    Redmi Pad评测:红米充满野心的一次尝试

    从Note系列到K系列,从蓝牙耳机到笔记本电脑,红米不知不觉之间也已经形成了自己颇有竞争力的产品体系,在中端和次旗舰市场上甚至要比小米新机的表现来得更好,正所谓“大丈夫生居
  • 摸鱼心法第一章——和配置文件说拜拜

    摸鱼心法第一章——和配置文件说拜拜

    为了能摸鱼我们团队做了容器化,但是带来的问题是服务配置文件很麻烦,然后大家在群里进行了“亲切友好”的沟通图片图片图片图片对比就对比,简单对比下独立配置中心和k8s作为配
  • 2023 年的 Node.js 生态系统

    2023 年的 Node.js 生态系统

    随着技术的不断演进和创新,Node.js 在 2023 年达到了一个新的高度。Node.js 拥有一个庞大的生态系统,可以帮助开发人员更快地实现复杂的应用。本文就来看看 Node.js 最新的生
  • 得物效率前端微应用推进过程与思考

    得物效率前端微应用推进过程与思考

    一、背景效率工程随着业务的发展,组织规模的扩大,越来越多的企业开始意识到协作效率对于企业团队的重要性,甚至是决定其在某个行业竞争中突围的关键,是企业长久生存的根本。得物
  • Flowable工作流引擎的科普与实践

    Flowable工作流引擎的科普与实践

    一.引言当我们在日常工作和业务中需要进行各种审批流程时,可能会面临一系列技术和业务上的挑战。手动处理这些审批流程可能会导致开发成本的增加以及业务复杂度的上升。在这
  • 学习JavaScript的10个理由...

    学习JavaScript的10个理由...

    作者 | Simplilearn编译 | 王瑞平当你决心学习一门语言的时候,很难选择到底应该学习哪一门,常用的语言有Python、Java、JavaScript、C/CPP、PHP、Swift、C#、Ruby、Objective-
  • 拼多多APP上线本地生活入口,群雄逐鹿万亿市场

    拼多多APP上线本地生活入口,群雄逐鹿万亿市场

    Tech星球(微信ID:tech618)文 | 陈桥辉 Tech星球独家获悉,拼多多在其APP内上线了&ldquo;本地生活&rdquo;入口,位置较深,位于首页的&ldquo;充值中心&rdquo;内,目前主要售卖美食相关的
Top