当前位置:首页 > 科技  > 软件

消息队列,聊聊发送消息的四种姿势~

来源: 责编: 时间:2023-12-18 09:46:55 164观看
导读微服务开发中经常会使用消息队列进行跨服务通信。在一个典型场景中,服务A执行一个业务逻辑,需要保存数据库,然后通知服务B执行相应的业务逻辑。在这种场景下,我们需要考虑如何发送消息。图片1. 基础版首先,我们可能会考虑

微服务开发中经常会使用消息队列进行跨服务通信。在一个典型场景中,服务A执行一个业务逻辑,需要保存数据库,然后通知服务B执行相应的业务逻辑。在这种场景下,我们需要考虑如何发送消息。jN928资讯网——每日最新资讯28at.com

图片图片jN928资讯网——每日最新资讯28at.com

1. 基础版

首先,我们可能会考虑将数据库操作和消息发送放在同一个事务中,以下是伪代码示例:jN928资讯网——每日最新资讯28at.com

@Transactional  public void saveWithMessage(BusinessDO businessDO){  String id = IdUtils.nextId(); businessDO.setId(id);    xxxRepository.save(businessDO);          BusinessMessage businessMessage = new BusinessMessage();      businessMessage.setKey(id);      SendResult send = rocketMQTemplate.syncSend("test-topic", sendMessage);}

在这段代码里通过@Transactional注解将数据库的操作以及发送消息放到一个事务中,如果数据库的保存或者消息发送失败,则回滚事务。jN928资讯网——每日最新资讯28at.com

乍一看似乎没什么问题,但稍微推敲一下就会发现此方式有如下两个缺陷:jN928资讯网——每日最新资讯28at.com

1.1 数据不一致

首先最容易想到的是,这种消息发送方式无法保证数据的最终一致性。jN928资讯网——每日最新资讯28at.com

这里先让我来解释一下基于消息队列,生产者发送消息到消费者消费消息的过程:jN928资讯网——每日最新资讯28at.com

  1. 生产者发送消息
  2. MQ收到消息并将数据持久化,在存储中新增一条记录
  3. 返回ACK给生产者
  4. MQ 推送消息给对应的消费者,等待消费者返回ACK
  5. 如果消费者在指定时间内成功返回ACK,那么MQ则认为消费成功,执行第6步删除消息,如果MQ在指定时间内没有收到ACK,则认为消息消费失败,会重新推送消息,重复执行第4、5、6步操作。
  6. 删除消息。

图片图片jN928资讯网——每日最新资讯28at.com

好,现在回到上面发送消息的场景,假设数据库处理成功,消息消费成功,但是MQ由于某些原因处理超时,导致ACK确认失败,此时整个事务回滚,结果出现数据不一致问题。jN928资讯网——每日最新资讯28at.com

这种数据不一致的问题在RPC调用的场景下也经常出现,其根本的原因在于:远程调用,结果最终可能为成功、失败、超时;而对于超时的情况,处理方最终的结果可能是成功,也可能是失败,调用方是无法知晓的。jN928资讯网——每日最新资讯28at.com

1.2 事务未提交

其次,使用以上方式还存在另一个问题,即消费者在处理消息时可能读不到刚刚保存的数据,即消费者消费速度快于事务提交的速度。jN928资讯网——每日最新资讯28at.com

举例:jN928资讯网——每日最新资讯28at.com

假设服务B需要通过消息中的数据ID获取服务A数据库保存的数据。这种情况下,数据库操作与消息发送在同一事务中。可能出现服务B在处理消息时,服务A事务还未提交,导致服务B获取的数据是空数据,无法执行相应业务逻辑。jN928资讯网——每日最新资讯28at.com

1.3 适用场景

尽管这种发送方式存在上述两个问题,但在某些场景下仍然适用。例如,消费者在处理时不依赖生产者的数据,且对数据一致性要求不高,这种情况下消息发送和数据库保存可以不在同一个事务中。jN928资讯网——每日最新资讯28at.com

2. 进阶版

为解决事务未提交问题,我们可以确保事务提交后再发送消息。在SpringBoot项目中,有两种常见解决方案。jN928资讯网——每日最新资讯28at.com

2.1 事务同步器

基于事务同步器的方法:jN928资讯网——每日最新资讯28at.com

@Transactional  public void saveWithMessage(BusinessDO businessDO){      xxxRepository.save(businessDO);          TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {          @Override          public void afterCommit() {              BusinessMessage businessMessage = new BusinessMessage();              businessMessage.setXXX();              rocketMQTemplate.syncSend("test-topic", sendMessage);          }      });  }

TransactionSynchronizationManager.registerSynchronization 是 Spring 框架中用于注册事务同步的方法。通过这个方法,你可以在事务提交、回滚或完成时执行一些额外的逻辑。jN928资讯网——每日最新资讯28at.com

在上述代码中,使用了afterCommit方法,在事务成功提交后执行发送消息操作,确保在数据库操作成功且事务稳定的情况下发送消息。jN928资讯网——每日最新资讯28at.com

2.2 消息监听器

另一种方法是基于ApplicationEventPublisher,在保存数据库操作后发布一个事件,并通过@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)注解监听事件。这样可以确保消息在数据库事务提交之后再发送。jN928资讯网——每日最新资讯28at.com

@Transactional  public void saveWithMessage(BusinessDO businessDO){      xxxRepository.save(businessDO);   eventPublisher.publishEvent(new UserCreatedEvent(registerUser));}@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) public void handleUserRegisteredEvent(UserCreatedEvent userCreatedEvent) {      rocketMQTemplate.syncSend("test-topic", sendMessage);  }

这里需要说明一下:在默认情况下Spring的事件监听机制并不是异步的(上次群友弄错了),而是同步的将代码进行解耦,@TransactionalEventListener也是通过同步的方式,但是加入了回调的方式来解决,这样就能够控制事务进行Commited、Rollback时才进行事件的处理,来达到事务同步的目的。jN928资讯网——每日最新资讯28at.com

2.3 适用范围

通过以上方式,相较于基础版,可以确保消息在事务提交后发送,解决了消费者读取空数据的问题。但仍然无法保证数据的一致性,适用于对数据一致性要求不高的场景。jN928资讯网——每日最新资讯28at.com

3. 本地消息表+补偿重试

如果需要保证最终一致性而非强一致性,可以采用本地消息表+补偿重试的方式来发送消息。jN928资讯网——每日最新资讯28at.com

这种方式的执行原理如下:jN928资讯网——每日最新资讯28at.com

  1. 在执行业务操作的同时,在本地消息表中插入一条状态为待发送的记录,业务数据的记录与消息记录必须在同一个事务中完成,这是此方案的核心原则。由于消息表与业务表在同一个库中,事务可以通过数据库来保证。
  2. 事务提交后发送消息,如果消息发送成功,则将消息状态标记为发送成功或删除消息
  3. 在生产者服务中会创建一个定时任务,定时从消息表中检索待发送的消息重新发送。
  4. 对于消费者消费失败,则依赖MQ本身的重试机制来完成,保证数据的最终一致性。

图片图片jN928资讯网——每日最新资讯28at.com

核心代码如下:jN928资讯网——每日最新资讯28at.com

@Transactional  public void saveWithMessage(BusinessDO businessDO){      TransactionMessage transactionMessage = new TransactionMessage();      transactionMessage.setStaus(MessageStatus.WAITING_SEND);      transactionMessage.setMessageKey(businessDO.getId());      ...        xxxRepository.save(businessDO);      messageRepository.save(TransactionMessage);      TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {          @Override          public void afterCommit() {              messageService.sendMessage(transactionMessage,businessDO);          }      });  }  public void sendMessage(TransactionMessage transactionMessage,BusinessDO businessDO){      BusinessMessage businessMessage = new BusinessMessage();      businessMessage.setXXX();      try{          rocketMQTemplate.syncSend("test-topic", sendMessage);          transactionMessage.setStatus(MessageStatus.SUCCESS);          messageRepository.update(transactionMessage);      }catch (Exception e){          // 执行失败的业务逻辑      }    }

3.1 问题

虽然这种方式能够保证消息在事务提交后发送,且能够保证最终一致性,但仍然存在一些缺陷:jN928资讯网——每日最新资讯28at.com

首先,需要额外的消息表,增加了系统复杂度。(针对此问题,我们又可以将该功能单独提取出来,做成一个消息服务来统一处理,考虑篇幅问题,这里暂不展开。)jN928资讯网——每日最新资讯28at.com

其次,通过定时任务轮询消息表,对于处理成功但ACK超时的数据会重新发送消息,这对下游系统产生了强烈的幂等性保障要求,消费者的处理逻辑必须做好幂等控制。关于幂等的处理方案,我在[[Dailymart17:并发与幂等的实现方案]]一文中有详细的说明,欢迎翻阅。jN928资讯网——每日最新资讯28at.com

4. 基于事务消息发送

目前,RocketMQ是主流MQ中唯一一个支持事务消息的,如果你们项目恰好使用的是RocketMQ,可以采用事务消息来发送。jN928资讯网——每日最新资讯28at.com

有关RocketMQ事务消息的详细信息,可以参考我之前的文章[SpringCloud基于RocketMQ实现分布式事务,这里不再赘述。同时,由于在Dailymart项目中使用的是RocketMQ,也可以参考Dailymart的代码实现。jN928资讯网——每日最新资讯28at.com

小结

本文详细介绍了微服务开发中常用的4种消息发送方式。对于那些对数据一致性要求不高的场景,可以选择使用进阶版的消息发送方式。而对于需要保证最终一致性的情况,推荐采用事务消息和本地消息表的方式进行消息发送。jN928资讯网——每日最新资讯28at.com

在文中涉及的相关代码,你可以在我星球 DDD&微服务 系列专栏中找到相应的实现,欢迎加入我们的讨论。jN928资讯网——每日最新资讯28at.com

DailyMart是一个基于 DDD 和Spring Cloud Alibaba的微服务商城系统,采用SpringBoot3.x以及JDK17。旨在为开发者提供集成式的学习体验,并将其无缝地应用于实际项目中。该专栏包含领域驱动设计(DDD)、Spring Cloud Alibaba企业级开发实践、设计模式实际应用场景解析、分库分表战术及实用技巧等内容。如果你对这个系列感兴趣,可在本公众号回复关键词 DDD 获取完整文档以及相关源码。jN928资讯网——每日最新资讯28at.com

本文链接:http://www.28at.com/showinfo-26-48351-0.html消息队列,聊聊发送消息的四种姿势~

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com

上一篇: Eslint 会被 Oxlint 干掉吗?

下一篇: 神技能!一招教你免费搞定PDF转word

标签:
  • 热门焦点
Top