当前位置:首页 > 科技  > 软件

SpringBoot整合RabbitMQ实现邮件异步发送

来源: 责编: 时间:2024-09-10 09:51:06 47观看
导读本篇文章将介绍另一种高可用的服务架构,以便实现邮件 100% 被投递成功。类似的短信推送等服务,实现逻辑也大体类似。01、先来一张流程图图片本文内容主要围绕这个流程图展开,利用 RabbitMQ 消息队列来实现邮件 100% 被投

本篇文章将介绍另一种高可用的服务架构,以便实现邮件 100% 被投递成功。类似的短信推送等服务,实现逻辑也大体类似。2A328资讯网——每日最新资讯28at.com

01、先来一张流程图

图片图片2A328资讯网——每日最新资讯28at.com

本文内容主要围绕这个流程图展开,利用 RabbitMQ 消息队列来实现邮件 100% 被投递,内容涵盖了 RabbitMQ 很多知识点,如:2A328资讯网——每日最新资讯28at.com

  • 生产者和消费者模型
  • 消息发送机制
  • 消费确认机制
  • 消息的重新投递
  • 消息消费失败的处理方案

02、实现思路

  • 1.准备一台电脑,并安装 RabbitMQ 服务
  • 2.开放 QQ 邮箱或者其它邮箱授权码,用于发送邮件
  • 3.创建邮件发送项目并编写代码
  • 4.发送邮件测试
  • 5.消息消费失败的处理介绍

03、环境准备

3.1、安装 RabbitMQ 服务

安装 RabbitMQ 服务,这一步比较简单,可以访问下面的官方地址,下载软件包并依次按照步骤进行安装即可。2A328资讯网——每日最新资讯28at.com

https://rabbitmq.org.cn/docs/download

安装成功之后,登陆 RabbitMQ 控制台,可以看到类似于如下界面。2A328资讯网——每日最新资讯28at.com

图片图片2A328资讯网——每日最新资讯28at.com

3.1.1、创建交换器

点击“Exchanges”菜单,进入“交换器”管理界面。2A328资讯网——每日最新资讯28at.com

图片图片2A328资讯网——每日最新资讯28at.com

进入之后,点击最下方“Add a new exchange”按钮,创建一个类型为topic,名称叫mail.exchange的交换器,并提交。2A328资讯网——每日最新资讯28at.com

图片图片2A328资讯网——每日最新资讯28at.com

3.1.2、创建消息队列

接着,点击“Queues”菜单,进入消息队列管理界面。2A328资讯网——每日最新资讯28at.com

图片图片2A328资讯网——每日最新资讯28at.com

同样的,点击最下方“Add a new queue”按钮,创建一个名称叫mq.mail.ack的消息队列,并提交。2A328资讯网——每日最新资讯28at.com

图片图片2A328资讯网——每日最新资讯28at.com

保存之后,在列表中可以看到刚刚创建的消息队列,然后点击进入详情。2A328资讯网——每日最新资讯28at.com

图片图片2A328资讯网——每日最新资讯28at.com

在详情中,将当前消息队列与上文创建的交换器进行绑定,便于后续通过交换器来发送消息到队列,操作如下。2A328资讯网——每日最新资讯28at.com

图片图片2A328资讯网——每日最新资讯28at.com

对于topic类型的交换器,通常不直接与消息队列进行交互,而是通过一个路由键,将消息路由到目标消息队列,这样设计的目的是让消息投递更加灵活。路由键,可以简单理解为类似于路由器,对数据进行路由分发处理。2A328资讯网——每日最新资讯28at.com

3.2、配置邮箱发送服务器

为了实现邮件自动发送功能,我们还需要准备一个邮箱发送服务器,这一步在之前的文章中已经详细的介绍过,在此,我们简单的再介绍一下。2A328资讯网——每日最新资讯28at.com

以 QQ 邮箱为例,登陆进去之后,在设置里面开启 POP3/SMTP 服务,并获取授权码记录下来。2A328资讯网——每日最新资讯28at.com

图片图片2A328资讯网——每日最新资讯28at.com

图片图片2A328资讯网——每日最新资讯28at.com

该授权码,就是下文配置文件中spring.mail.password需要的密码!2A328资讯网——每日最新资讯28at.com

04、方案实践

4.1、构建项目

在 IDEA 下创建一个名称为smail的 Spring Boot 项目,pom文件中加入amqp和mail相关依赖包,示例如下:2A328资讯网——每日最新资讯28at.com

<!--mail 支持--><dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-mail</artifactId></dependency><!--amqp 支持--><dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-amqp</artifactId></dependency>

4.2、添加相关配置

在application.properties中添加 rabbitmq、邮箱相关配置,示例如下:2A328资讯网——每日最新资讯28at.com

# 配置邮件发送主机地址spring.mail.host=smtp.exmail.qq.com# 配置邮件发送服务端口号spring.mail.port=465# 配置邮件发送服务协议spring.mail.protocol=smtp# 配置邮件发送者用户名或者账户spring.mail.username=xxxx# 配置邮件发送者密码或者授权码spring.mail.password=xxxx# 配置邮件默认编码spring.mail.default-encoding=UTF-8# 配置smtp相关属性spring.mail.properties.mail.smtp.auth=truespring.mail.properties.mail.smtp.ssl.enable=truespring.mail.properties.mail.smtp.ssl.required=true#rabbitmq配置spring.rabbitmq.host=127.0.0.1spring.rabbitmq.port=5672spring.rabbitmq.virtual-host=/spring.rabbitmq.username=testspring.rabbitmq.password=test# 开启confirms回调 P -> Exchangespring.rabbitmq.publisher-cnotallow=true# 开启returnedMessage回调 Exchange -> Queuespring.rabbitmq.publisher-returns=true# 设置手动确认(ack) Queue -> Cspring.rabbitmq.listener.simple.acknowledge-mode=manualspring.rabbitmq.listener.simple.prefetch=100

其中,spring.mail.username和spring.mail.password指的就是上文中创建的邮箱账号和授权码,将其配置进去即可。2A328资讯网——每日最新资讯28at.com

4.3、编写 RabbitMQ 配置类

编写一个 RabbitMQ 配置类,用于监听消息的发送情况,示例如下。2A328资讯网——每日最新资讯28at.com

@Configurationpublic class RabbitConfig {    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitConfig.class);    @Autowired    private CachingConnectionFactory connectionFactory;    @Bean    public RabbitTemplate rabbitTemplate() {        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);        // 设置消息转换器为json格式        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());        // 消息是否成功发送到Exchange        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {            if (ack) {                LOGGER.info("消息发送到Exchange成功,{}", correlationData);            } else {                LOGGER.error("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);            }        });        // 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调        rabbitTemplate.setMandatory(true);        // 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {            LOGGER.error("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);        });        return rabbitTemplate;    }}

4.4、编写生产者服务

在 Spring Boot 中,我们可以利用RabbitTemplate工具,将数据通过交换器发送到目标消息队列,示例如下。2A328资讯网——每日最新资讯28at.com

@Servicepublic class ProduceService {    @Autowired    private RabbitTemplate rabbitTemplate;    /**     * 发送消息     * @param mail     * @return     */    public boolean sendByAck(Mail mail) {        // 创建uuid        String msgId = UUID.randomUUID().toString().replaceAll("-", "");        mail.setMsgId(msgId);        // 发送消息到mq服务器中(附带消息ID)        CorrelationData correlationData = new CorrelationData(msgId);        rabbitTemplate.convertAndSend("mail.exchange", "route.mail.ack", MessageHelper.objToMsg(mail), correlationData);        return true;    }}

4.5、编写消费者服务

在 Spring Boot 中,我们可以利用@RabbitListener注解,监听指定的消息队列,如果队列中有消息会第一时间收到回调,示例如下。2A328资讯网——每日最新资讯28at.com

@Componentpublic class ConsumerService {    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerService.class);    @Autowired    private SendMailService sendMailService;    /**     * 监听消息队列,手动确认模式,必须手动调用ack或者nack方法     * 配置参数:spring.rabbitmq.listener.simple.acknowledge-mode=manual     * @param message     * @param channel     * @throws IOException     */    @RabbitListener(queues = {"mq.mail.ack"})    public void consumeFromAck(Message message, Channel channel) throws IOException {        LOGGER.info("收到消息:{}", message.toString());        //将消息转化为对象        Mail mail = MessageHelper.msgToObj(message, Mail.class);        // 手动确认模式        long tag = message.getMessageProperties().getDeliveryTag();        boolean success = sendMailService.send(mail);        if (success) {            // 消费成功,消息会被删除            channel.basicAck(tag, false);        } else {            // 消费失败,重新返回队列            channel.basicNack(tag, false, true);        }    }}

4.6、编写邮件发送服务

正如之前的文章中所介绍的,在 Spring Boot 中,我们可以利用JavaMailSender工具来实现邮件的自动推送,示例如下。2A328资讯网——每日最新资讯28at.com

@Servicepublic class SendMailService {    private static final Logger LOGGER = LoggerFactory.getLogger(SendMailService.class);    @Value("${spring.mail.username}")    private String from;    @Autowired    private JavaMailSender mailSender;    /**     * 发送简单邮件     *     * @param mail     */    public boolean send(Mail mail) {        String to = mail.getTo();// 目标邮箱        String title = mail.getTitle();// 邮件标题        String content = mail.getContent();// 邮件正文        SimpleMailMessage message = new SimpleMailMessage();        message.setFrom(from);        message.setTo(to);        message.setSubject(title);        message.setText(content);        try {            mailSender.send(message);            LOGGER.info("邮件发送成功");            return true;        } catch (MailException e) {            LOGGER.error("邮件发送失败, to: {}, title: {}", to, title, e);            return false;        }    }}

4.7、编写 controller 接口

接着,编写一个 controller 接口,将邮件发送服务暴露出去,示例如下:2A328资讯网——每日最新资讯28at.com

@RestControllerpublic class MailController {    @Autowired    private ProduceService produceService;    @PostMapping("send")    public String sendMail(Mail mail) {        boolean result = produceService.sendByAck(mail);        return result ? "success": "fail";    }}

4.8、服务测试

最后,启动 SpringBoot 服务,用 postman 来测试一下。2A328资讯网——每日最新资讯28at.com

图片图片2A328资讯网——每日最新资讯28at.com

查看控制台信息。2A328资讯网——每日最新资讯28at.com

图片图片2A328资讯网——每日最新资讯28at.com

查询接受者邮件信息。2A328资讯网——每日最新资讯28at.com

图片图片2A328资讯网——每日最新资讯28at.com

可以清楚的看到,邮件发送成功!2A328资讯网——每日最新资讯28at.com

当大批量的发送邮件,也不用担心,因为整个邮件的发送都是异步的,不会阻塞主流程的运行。2A328资讯网——每日最新资讯28at.com

2A328资讯网——每日最新资讯28at.com

05、消费失败的处理方案

虽然以上的方案非常可靠,可以保证发出的消息 100% 被消费,但是其实也有弊端。2A328资讯网——每日最新资讯28at.com

试想一下,按照上面的处理逻辑,假设其中有一条消息,因为某种原因一直发送失败,会出现什么样的情况?2A328资讯网——每日最新资讯28at.com

此时,这条消息会重新返回队列,然后一直重试,会导致其它的消息可能会无法被消费。2A328资讯网——每日最新资讯28at.com

针对这种情况,最简单粗暴的办法就是,当重试失败之后将消息丢弃,不会阻碍其它的消息被正常处理,不过会丢失数据。2A328资讯网——每日最新资讯28at.com

那么如何正确的处理消息消费失败的问题呢?2A328资讯网——每日最新资讯28at.com

可以借助数据库来记录消费失败的数据,针对系统无法成功处理的消息,人工进行干预。2A328资讯网——每日最新资讯28at.com

实践过程如下!2A328资讯网——每日最新资讯28at.com

5.1、创建一张消息日志表

首先,在数据库中创建一张消息日志表,用于跟踪消息数据的状态,示例如下:2A328资讯网——每日最新资讯28at.com

CREATE TABLE `msg_log` (  `msg_id` varchar(255) NOT NULL DEFAULT '' COMMENT '消息唯一标识',  `exchange` varchar(100) NOT NULL DEFAULT '' COMMENT '交换机',  `route_key` varchar(100) NOT NULL DEFAULT '' COMMENT '路由键',  `queue_name` varchar(100) NOT NULL DEFAULT '' COMMENT '队列名称',  `msg` text COMMENT '消息体, json格式化',  `result` varchar(255) DEFAULT NULL COMMENT '处理结果',  `status` int(11) NOT NULL DEFAULT '0' COMMENT '状态,0:等待消费,1:消费成功,2:消费失败,9:重试失败',  `try_count` int(11) NOT NULL DEFAULT '0' COMMENT '重试次数',  `next_try_time` datetime DEFAULT NULL COMMENT '下一次重试时间',  `create_time` datetime DEFAULT NULL COMMENT '创建时间',  `update_time` datetime DEFAULT NULL COMMENT '更新时间',  PRIMARY KEY (`msg_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='mq消息日志';

5.2、改写生产者逻辑

在生产者服务类中,先将消息数据写入数据库,再向 rabbitMQ 服务中发消息,示例如下:2A328资讯网——每日最新资讯28at.com

@Servicepublic class ProduceService {    @Autowired    private RabbitTemplate rabbitTemplate;    @Autowired    private MsgLogService msgLogService;    /**     * 发送消息     * @param mail     * @return     */    public boolean sendByAuto(Mail mail) {        String msgId = UUID.randomUUID().toString().replaceAll("-", "");        mail.setMsgId(msgId);        // 1.存储要消费的数据        msgLogService.save("mail.exchange", "route.mail.auto", "mq.mail.auto", msgId, mail);        // 2.发送消息到mq服务器中(附带消息ID)        CorrelationData correlationData = new CorrelationData(msgId);        rabbitTemplate.convertAndSend("mail.exchange", "route.mail.auto", MessageHelper.objToMsg(mail), correlationData);        return true;    }}

5.3、改写消费者逻辑

在消费者服务类中,收到消息之后,不管处理成功还是失败,都只会修改数据库中的消息状态,并且消息处理失败时,不再重新返回队列。2A328资讯网——每日最新资讯28at.com

@Componentpublic class ConsumerService {    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerService.class);    @Autowired    private SendMailService sendMailService;    @Autowired    private MsgLogService msgLogService;    /**     * 监听消息队列,自动确认模式,无需调用ack或者nack方法,当程序执行时才删除消息     * 配置参数:spring.rabbitmq.listener.simple.acknowledge-mode=auto     * @param message     */    @RabbitListener(queues = {"mq.mail.auto"})    public void consumeFromAuto(Message message) {        LOGGER.info("收到消息:{}", message.toString());        // 获取消息ID        Mail mail = MessageHelper.msgToObj(message, Mail.class);        // 消息幂等性处理,如果已经处理成功,无需重复消费        MsgLog queryObj = msgLogService.selectByMsgId(mail.getMsgId());        if(Objects.nonNull(queryObj) && Constant.SUCCESS.equals(queryObj.getStatus())){            return;        }        // 发送邮件        boolean success = sendMailService.send(mail);        if(success){            msgLogService.updateStatus(mail.getMsgId(), Constant.SUCCESS, "邮件发送成功");        } else {            msgLogService.updateStatus(mail.getMsgId(), Constant.FAIL, "邮件发送失败");        }    }}

因为此处采用自动确认模式,因此还需要修改application.properties中的配置参数,内容如下:2A328资讯网——每日最新资讯28at.com

# 设置自动确认(默认此模式)spring.rabbitmq.listener.simple.acknowledge-mode=auto

5.4、编写定时任务对失败消息进行补偿投递

当消息消费失败时,会自动记录到数据库。2A328资讯网——每日最新资讯28at.com

实际上,不可能每条数据都需要我们进行干预,有的可能重试一次就好了,因此可以编写一个定时任务,将消费失败的数据筛选出来,重新放入到消息队列中,只有当消费次数达到设置的最大值,此时进入人工干预阶段,可以节省不少的工作。2A328资讯网——每日最新资讯28at.com

示例如下:2A328资讯网——每日最新资讯28at.com

@Componentpublic class ScheduledTask {    private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTask.class);    /**     * 最大投递次数     */    private static final int MAX_TRY_COUNT = 3;    @Autowired    private MsgLogService msgLogService;    @Autowired    private RabbitTemplate rabbitTemplate;    /**     * 每30s拉取消费失败的消息, 重新投递     */    @Scheduled(cron = "0/30 * * * * ?")    public void retry() {        LOGGER.info("开始执行重新投递消费失败的消息!");        // 查询需要重新投递的消息        List<MsgLog> msgLogs = msgLogService.selectFailMsg();        for (MsgLog msgLog : msgLogs) {            if (msgLog.getTryCount() >= MAX_TRY_COUNT) {                msgLogService.updateStatus(msgLog.getMsgId(), Constant.RETRY_FAIL, msgLog.getResult());                LOGGER.info("超过最大重试次数, msgId: {}", msgLog.getMsgId());                break;            }            // 重新投递消息            CorrelationData correlationData = new CorrelationData(msgLog.getMsgId());            rabbitTemplate.convertAndSend("", msgLog.getQueueName(), MessageHelper.objToMsg(msgLog.getMsg()), correlationData);            // 更新下次重试时间            msgLogService.updateNextTryTime(msgLog.getMsgId(), msgLog.getTryCount());        }    }}

最后别忘了,在Application类上添加@EnableScheduling,以便让定时调度生效,示例如下:2A328资讯网——每日最新资讯28at.com

@EnableScheduling@SpringBootApplicationpublic class Application {    public static void main(String[] args) {        SpringApplication.run(Application.class,args);    }}

利用定时任务,对投递失败的消息进行补偿投递,基本可以保证消息 100% 消费成功!2A328资讯网——每日最新资讯28at.com

06、小结

本文主要以实现邮件自动推送这个业务场景为例,通过 Springboot 整合 rabbitMQ 技术来实现高可用的效果。2A328资讯网——每日最新资讯28at.com

当然,解决这个业务需求的技术方案还有很多,例如 Springboot 整合 rocketMQ 也可以实现这个效果,不管怎么变,底层的实现思路基本都一样。2A328资讯网——每日最新资讯28at.com

希望本篇的知识总结,对大家有所帮助。2A328资讯网——每日最新资讯28at.com

最后,代码都经过自测,想要获取项目源码的同学,可以点击如下地址获取。2A328资讯网——每日最新资讯28at.com

示例代码地址:2A328资讯网——每日最新资讯28at.com

https://gitee.com/pzblogs/spring-boot-example-demo

2A328资讯网——每日最新资讯28at.com

本文链接:http://www.28at.com/showinfo-26-112786-0.htmlSpringBoot整合RabbitMQ实现邮件异步发送

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com

上一篇: 开发实战:使用Redisson实现分布式延时消息,订单30分钟关闭的另外一种实现!

下一篇: Vue3 无所不能!我用 Vue3 写接口给前端用你们信吗?

标签:
  • 热门焦点
  • 小米官宣:2023年上半年出货量中国第一!

    小米官宣:2023年上半年出货量中国第一!

    今日早间,小米电视官方微博带来消息,称2023年小米电视上半年出货量达到了中国第一,同时还表示小米电视的巨屏风暴即将开始。“公布一个好消息2023年#小米电视上半年出货量中国
  • 破圈是B站头上的紧箍咒

    破圈是B站头上的紧箍咒

    来源 | 光子星球撰文 | 吴坤谚编辑 | 吴先之每年的暑期档都少不了瞄准追剧女孩们的古偶剧集,2021年有优酷的《山河令》,2022年有爱奇艺的《苍兰诀》,今年却轮到小破站抓住了追
  • 腾讯盖楼,字节拆墙

    腾讯盖楼,字节拆墙

    来源 | 光子星球撰文 | 吴坤谚编辑 | 吴先之&ldquo;想重温暴刷深渊、30+技能搭配暴搓到爽的游戏体验吗?一起上晶核,即刻暴打!&rdquo;曾凭借直播腾讯旗下代理格斗游戏《DNF》一
  • 自律,给不了Keep自由!

    自律,给不了Keep自由!

    来源 | 互联网品牌官作者 | 李大为编排 | 又耳 审核 | 谷晓辉自律能不能给用户自由暂时不好说,但大概率不能给Keep自由。近日,全球最大的在线健身平台Keep正式登陆港交所,努力
  • “又被陈思诚骗了”

    “又被陈思诚骗了”

    作者|张思齐 出品|众面(ID:ZhongMian_ZM)如今的国产悬疑电影,成了陈思诚的天下。最近大爆电影《消失的她》票房突破30亿断层夺魁暑期档,陈思诚再度风头无两。你可以说陈思诚的
  • 网传小米汽车开始筛选交付中心 建筑面积不低于3000平方米

    网传小米汽车开始筛选交付中心 建筑面积不低于3000平方米

    7月7日消息,近日有微博网友@长三角行健者爆料称,据经销商集团反馈,小米汽车目前已经开始了交付中心的筛选工作,要求候选场地至少有120个车位,建筑不能低
  • 滴滴违法违规被罚80.26亿 共存在16项违法事实

    滴滴违法违规被罚80.26亿 共存在16项违法事实

    滴滴违法违规被罚80.26亿 存在16项违法事实开始于2121年7月,历经一年时间,网络安全审查办公室对“滴滴出行”网络安全审查终于有了一个暂时的结束。据“网信
  • 英特尔Xe HPG游戏显卡:拥有512EU,单风扇版本

    英特尔Xe HPG游戏显卡:拥有512EU,单风扇版本

    据10 月 30 日外媒 TheVerge 消息报道,英特尔 Xe HPG Arc Alchemist 的正面实被曝光,不仅拥有 512 EU 版显卡,还拥有 128EU 的单风扇版本。另外,这款显卡 PCB
  • 2021中国国际消费电子博览会与青岛国际软件融合创新博览会新闻发布会隆重举行

    2021中国国际消费电子博览会与青岛国际软件融合创新博览会新闻发布会隆重举行

    9月18日,2021中国国际消费电子博览会与青岛国际软件融合创新博览会新闻发布会在青岛国际新闻中心隆重举行。发布会上青岛市政府领导联袂出席,对本次双展会情
Top