当前位置:首页 > 科技  > 软件

三分钟学会在 RabbitMQ 中实现发布订阅模式

来源: 责编: 时间:2024-03-28 09:28:45 113观看
导读在这个充满挑战和收获的60天学习之旅中,你将迅速提升成为一名全栈工程师。专注于Spring Boot框架,我们将深入研究高级特性,从项目初始化到微服务架构,再到性能优化和持续集成部署。无论你是初学者还是有一定经验的开发者,

在这个充满挑战和收获的60天学习之旅中,你将迅速提升成为一名全栈工程师。专注于Spring Boot框架,我们将深入研究高级特性,从项目初始化到微服务架构,再到性能优化和持续集成部署。无论你是初学者还是有一定经验的开发者,这个专题都将带你穿越从零到全面掌握Spring Boot的学习曲线。jCQ28资讯网——每日最新资讯28at.com

jCQ28资讯网——每日最新资讯28at.com

Day 32 ~ Springboot3.1.x|3分钟学会在 RabbitMQ 中实现发布订阅模式jCQ28资讯网——每日最新资讯28at.com

实现发布与订阅消息模式

发布-订阅模式是一种消息传递方式,其中发送者(发布者)不会将消息直接发送到特定的接收者(订阅者)。发布者类别定义了哪些订阅者因为订阅者匹配了发布者的类别而接收消息。jCQ28资讯网——每日最新资讯28at.com

以下是使用RabbitMQ实现发布-订阅模式的一种例子,我们将使用RabbitMQ的Fanout Exchange。jCQ28资讯网——每日最新资讯28at.com

Producer

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;public class EmitLog {  private static final String EXCHANGE_NAME = "logs";  public static void main(String[] argv) throws Exception {    ConnectionFactory factory = new ConnectionFactory();    factory.setHost("localhost");    try (Connection connection = factory.newConnection();         Channel channel = connection.createChannel()) {        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");        String message = "Log message...";        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));        System.out.println("Sent '" + message + "'");    }  }}

在上述代码的channel.exchangeDeclare(EXCHANGE_NAME, "fanout"),我们声明一个名为"log"的exchange,同时我们定义其类型为"fanout",意味着它会将接收到的所有消息广播给所有它所知道的队列。jCQ28资讯网——每日最新资讯28at.com

Consumer

每一个订阅者都需要拥有一个queue,因此,我们需要在客户端中创建queue。jCQ28资讯网——每日最新资讯28at.com

import com.rabbitmq.client.*;import java.io.IOException;public class ReceiveLogs {  private static final String EXCHANGE_NAME = "logs";  public static void main(String[] argv) throws Exception {    ConnectionFactory factory = new ConnectionFactory();    factory.setHost("localhost");    Connection connection = factory.newConnection();    Channel channel = connection.createChannel();    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");    String queueName = channel.queueDeclare().getQueue();    channel.queueBind(queueName, EXCHANGE_NAME, "");    DeliverCallback deliverCallback = (consumerTag, delivery) -> {        String message = new String(delivery.getBody(), StandardCharsets.UTF_8);        System.out.println("Received '" + message + "'");    };    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });  }}

在这个例子中,我们声明一个新的queue,并将其与"logs"的exchange绑定。然后我们定义了消息的接收以及处理方式。jCQ28资讯网——每日最新资讯28at.com

处理消息发送失败的情况

在使用消息中间件的过程中,消息发送失败是无法避免的情况。因此,我们需要对此进行正确的处理以避免因此而导致的系统问题。jCQ28资讯网——每日最新资讯28at.com

对于消息发送失败的处理,有以下几种常用的方案:jCQ28资讯网——每日最新资讯28at.com

  • 重试: 对于有些暂时的问题,比如网络波动,可以通过简单的重试来解决。
  • 消息持久化:将消息存储在某处(例如数据库),只有当消息成功发送后,再删除它。
  • 死信队列:把无法处理的消息放入"死信队列",然后由专门的消费者来进行处理。

RabbitMQ中的消息确认(publisher confirms)和消费者应答(Consumer Acknowledgements)就是为了解决此类问题。jCQ28资讯网——每日最新资讯28at.com

ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection()) {    Channel channel = connection.createChannel();    String queueName = "test";    String message = "Hello world";    try {        channel.queueDeclare(queueName, false, false, false, null);        channel.confirmSelect();        channel.basicPublish("", queueName, null, message.getBytes());        if (!channel.waitForConfirms()) {            System.out.println("消息发送失败");        }    } catch (Exception e) {        System.out.println("错误: " + e.getMessage());    }}

上述代码中执行channel.confirmSelect();后,当前channel被设置为publisher confirm模式。在此模式下,当消息被RabbitMQ成功接收后,会发送一个确认给生产者。如果RabbitMQ没有发送确认,那么生产者可以认定该消息发送失败。jCQ28资讯网——每日最新资讯28at.com

结论:掌握发布-订阅模式和消息发送失败处理策略,对于掌握消息队列的使用至关重要,可为系统的稳定性和扩展性提供保障。jCQ28资讯网——每日最新资讯28at.com

jCQ28资讯网——每日最新资讯28at.com

本文链接:http://www.28at.com/showinfo-26-79990-0.html三分钟学会在 RabbitMQ 中实现发布订阅模式

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com

上一篇: SpringCloud项目开发中实用技巧总结

下一篇: 2024 年,这些 VS Code 插件可以卸载了!

标签:
  • 热门焦点
Top