今天我们将解决使用RocketMQ事务消息时可能遇到的一个常见问题:如何让其支持多事务消息?
在实际开发中,我们常常会面临多事务消息的场景,例如在DailyMart的订单模块中,用户支付后需要调用库存服务进行库存扣减,而在订单确认收货后需要调用用户服务实现积分赠送。这两个业务逻辑都需要通过事务消息来保证分布式事务。
为了处理这种情况,我们可能会考虑在订单模块中创建两个事务消息监听器,分别用于处理库存扣减和积分赠送的事务处理和事务回查。
@Component@Slf4j//处理订单支付的事务监听器public class OrderPaidTransactionListener implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { ...... //处理订单支付逻辑 } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { ...... //检查订单处理逻辑 }}@Component@Slf4j//处理订单收货的事务监听器public class OrderReceivedTransactionListener implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { ...... } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { ...... }}
然而,当我们信心满满地完成业务逻辑编写并启动服务时,可能会遇到如下错误:rocketMQTemplate already exists RocketMQLocalTransactionListener
图片
在rocketmq-spring-boot-starter版本低于2.1.0的项目中,可以使用多个 @RocketMQTransactionListener 监听不同的 txProducerGroup 来发送不同类型的事务消息到topic。然而,从 RocketMQ-Spring 2.1.0 版本开始,注解 @RocketMQTransactionListener 不能设置 txProducerGroup、ak、sk,这些值均需与对应的 RocketMQTemplate 保持一致。通过阅读源码 RocketMQTransactionConfiguration#registerTransactionListener() 方法,也可得知在RocketMQ如果已经存在了 RocketMQTransactionListener 则会出现上述错误。
图片
为了在保证系统只有一个 RocketMQTransactionListener 的前提下实现多事务消息,我们可以将 RocketMQLocalTransactionListener 不处理具体业务逻辑,而是将其作为一个分发器使用。
在生产者发送事务消息时指定对应的事务处理器 ,并将事务处理器放置在消息头上发送出去,在 RocketMQTransactionListener 中根据消息头选择具体的事务处理器来实现业务逻辑。
具体实现如下:
首先,定义公共的事务消息处理接口,所有事务消息都实现此接口而非 RocketMQ 默认的 RocketMQLocalTransactionListener。
public interface TransactionMessageHandler { /** * 执行本地事务 * @param payload 消息体 * @param arg 参数 */ RocketMQLocalTransactionState executeLocalTransaction(Object payload, Object arg); /** * 检查本地执行状态 * @param payload 消息体 * @return 执行结果 */ RocketMQLocalTransactionState checkLocalTransaction(Object payload); }
public <T extends RemoteDomainEvent> TransactionSendResult sendTransaction(String topic, String tag, T message, Class<? extends TransactionMessageHandler> transactionMessageListener) { if(transactionMessageListener == null){ throw new IllegalArgumentException("transactionMessageListener must not null"); } String destination = buildDestination(topic, tag); Message<T> sendMessage = MessageBuilder.withPayload(message) .setHeader(RocketMQHeaders.KEYS, message.getKey()) .setHeader(SOURCE_HEADER, message.getSource()) .setHeader(TRANSACTION_MESSAGE_HEADER, transactionMessageListener.getSimpleName()) .build(); TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, sendMessage, null); log.info("[{}]事务消息[{}]发送结果[{}]", destination, JSONObject.toJSON(message),JSONObject.toJSON(sendResult)); return sendResult;}
@Slf4j@RocketMQTransactionListenerpublic class DefaultRocketMQTransactionListener implements RocketMQLocalTransactionListener { private final Map<String, TransactionMessageHandler> transactionMessageHandlerMap; public DefaultRocketMQTransactionListener(Map<String, TransactionMessageHandler> transactionMessageHandlerMap) { this.transactionMessageHandlerMap = transactionMessageHandlerMap; } @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) { log.info("消费者收到事务消息[{}]", JSONObject.toJSON(message)); String listenerName = (String) message.getHeaders().get(MessageHeaderConstant.TRANSACTION_MESSAGE_HEADER); if (null == listenerName) { throw new RuntimeException("not params transactionMessageListener"); } RocketMQLocalTransactionState state; Object payload = message.getPayload(); try { TransactionMessageHandler messageHandler = transactionMessageHandlerMap.get(listenerName); if (null == messageHandler) { throw new RuntimeException("not match condition TransactionMessageHandler"); } state = messageHandler.executeLocalTransaction(payload, arg); } catch (Exception e) { log.error("rocket transaction message executeLocal error:{}", e.getMessage()); return RocketMQLocalTransactionState.ROLLBACK; } return state; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { log.info("消费者收到事务回查消息[{}]", JsonUtils.obj2String(message.getHeaders())); String listenerName = (String) message.getHeaders().get(MessageHeaderConstant.TRANSACTION_MESSAGE_HEADER); if (null == listenerName) { throw new RuntimeException("not params transactionMessageListener"); } RocketMQLocalTransactionState state; try { TransactionMessageHandler messageHandler = transactionMessageHandlerMap.get(listenerName); if (null == messageHandler) { throw new RuntimeException("not match condition TransactionMessageHandler"); } state = messageHandler.checkLocalTransaction(message.getPayload()); } catch (Exception e) { log.error("rocket transaction message executeLocal error:{}", e.getMessage()); return RocketMQLocalTransactionState.ROLLBACK; } return state; } }
在上述代码中,根据消息头中的TRANSACTION_MESSAGE_HEADER参数选择对应的事务处理器来处理事务消息。
在 DailyMart 中有一个公共组件 dailymart-rocketmq-spring-boot-starter 专门用于 RocketMQ 消息发送监听的封装,因此我们也将事务消息的处理逻辑封装到了此组件中。
图片
所有的事务消息处理逻辑都实现 TransactionMessageHandler 接口,以订单支付的处理逻辑为例:
@Component@Slf4jpublic class OrderPaidTransactionConsumer implements TransactionMessageHandler { @Resource private TransactionTemplate transactionTemplate; @Override public RocketMQLocalTransactionState executeLocalTransaction(Object payload, Object arg) { final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) payload, OrderPaidEvent.class); ... } @Override public RocketMQLocalTransactionState checkLocalTransaction(Object payload) { final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) payload, OrderPaidEvent.class); ... } }
TransactionSendResult sendResult = enhanceTemplate.sendTransaction("TRADE-ORDER", "ORDER-PAID", orderPaidEvent, OrderPaidTransactionConsumer.class);
本文解决了在 RocketMQ 2.1.0 版本以后,无法简单使用多个 @RocketMQTransactionListener 的问题。通过引入事务消息处理接口 TransactionMessageHandler,我们将原有的事务处理器改造成了一个分发器,使得在 DailyMart 项目中可以轻松处理多事务消息的场景。
本文链接:http://www.28at.com/showinfo-26-73332-0.html完美解决,RocketMQ如何支持多事务消息?
声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com