当前位置:首页 > 科技  > 软件

RabbitMQ工作模式-Publish/Subscribe发布与订阅模式

来源: 责编: 时间:2023-11-10 17:08:11 214观看
导读订阅模式类型订阅模式示例图:前面2个案例中,只有3个角色:P:生产者,也就是要发送消息的程序C:消费者:消息的接受者,会一直等待消息到来。queue:消息队列,图中红色部分而在订阅模型中,多了一个exchange角色,而且过程略有变化:P:生产者

G2S28资讯网——每日最新资讯28at.com

订阅模式类型

订阅模式示例图:G2S28资讯网——每日最新资讯28at.com

G2S28资讯网——每日最新资讯28at.com

前面2个案例中,只有3个角色:G2S28资讯网——每日最新资讯28at.com

  • P:生产者,也就是要发送消息的程序
  • C:消费者:消息的接受者,会一直等待消息到来。
  • queue:消息队列,图中红色部分

而在订阅模型中,多了一个exchange角色,而且过程略有变化:G2S28资讯网——每日最新资讯28at.com

  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
  • C:消费者,消息的接受者,会一直等待消息到来。
  • Queue:消息队列,接收消息、缓存消息。
  • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
  • Fanout:广播,将消息交给所有绑定到交换机的队列
  • Direct:定向,把消息交给符合指定routing key 的队列
  • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!G2S28资讯网——每日最新资讯28at.com

Publish/Subscribe发布与订阅模式

1、模式说明

G2S28资讯网——每日最新资讯28at.com

发布订阅模式:G2S28资讯网——每日最新资讯28at.com

每个消费者监听自己的队列。G2S28资讯网——每日最新资讯28at.com

生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收 到消息G2S28资讯网——每日最新资讯28at.com

2、案例

(1)生产者

G2S28资讯网——每日最新资讯28at.com

package com.lijw.producer;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * @author Aron.li * @date 2022/3/3 8:16 */public class Producer_PubSub {    public static void main(String[] args) throws IOException, TimeoutException {        //1.创建连接工厂        ConnectionFactory factory = new ConnectionFactory();        //2. 设置参数        factory.setHost("127.0.0.1"); // ip  默认值 localhost        factory.setPort(5672); //端口  默认值 5672        factory.setVirtualHost("/test"); //虚拟机 默认值 /        factory.setUsername("libai"); // 用户名 默认 guest        factory.setPassword("libai"); //密码 默认值 guest        //3. 创建连接 Connection        Connection connection = factory.newConnection();        //4. 创建Channel        Channel channel = connection.createChannel();        //5. 创建交换机        /*           exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)           参数:            1. exchange:交换机名称            2. type:交换机类型                DIRECT("direct"):定向                FANOUT("fanout"):扇形(广播),发送消息到每一个与之绑定队列。                TOPIC("topic") 通配符的方式                HEADERS("headers") 参数匹配            3. durable:是否持久化            4. autoDelete:自动删除            5. internal:内部使用。 一般false            6. arguments:参数        */        String exchangeName = "test_fanout";        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);        //6. 创建队列        String queue1Name = "test_fanout_queue1";        String queue2Name = "test_fanout_queue2";        channel.queueDeclare(queue1Name, true, false, false, null);        channel.queueDeclare(queue2Name, true, false, false, null);        // 7. 绑定队列和交换机        /*            queueBind(String queue, String exchange, String routingKey)            参数:                1. queue:队列名称                2. exchange:交换机名称                3. routingKey:路由键,绑定规则                    如果交换机的类型为fanout ,routingKey设置为""         */        channel.queueBind(queue1Name, exchangeName, "");        channel.queueBind(queue2Name, exchangeName, "");        //8. 发送消息至交换机,由交换机分发消息        String body = "日志信息: 肥仔白调用了findAll方法...日志级别: INFO....";        channel.basicPublish(exchangeName, "", null, body.getBytes());        //9. 释放资源        channel.close();        connection.close();            }}

执行生产者,我们可以查看一下创建的 交换机 以及 队列信息:G2S28资讯网——每日最新资讯28at.com

G2S28资讯网——每日最新资讯28at.com

G2S28资讯网——每日最新资讯28at.com

下面再来看看队列,如下:G2S28资讯网——每日最新资讯28at.com

G2S28资讯网——每日最新资讯28at.com

G2S28资讯网——每日最新资讯28at.com

下面我们继续来写两个消费者接收消息。G2S28资讯网——每日最新资讯28at.com

(2)消费者1:读取队列1的消息

G2S28资讯网——每日最新资讯28at.com

package com.lijw.consumer;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * @author Aron.li * @date 2022/3/2 16:16 */public class Consumer_PubSub1 {    //定义接收队列的名称    final static String queueName = "test_fanout_queue1";    public static void main(String[] args) throws IOException, TimeoutException {        //1.创建连接工厂        ConnectionFactory factory = new ConnectionFactory();        //2. 设置参数        factory.setHost("127.0.0.1"); // ip  默认值 localhost        factory.setPort(5672); //端口  默认值 5672        factory.setVirtualHost("/test"); //虚拟机 默认值 /        factory.setUsername("libai"); // 用户名 默认 guest        factory.setPassword("libai"); //密码 默认值 guest        //3. 创建连接 Connection        Connection connection = factory.newConnection();        //4. 创建Channel        Channel channel = connection.createChannel();        //5. 创建队列Queue        /*        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)        参数:            1. queue:队列名称            2. durable:是否持久化,当mq重启之后,还在            3. exclusive:                * 是否独占。只能有一个消费者监听这队列                * 当Connection关闭时,是否删除队列            4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉            5. arguments:参数。         */        channel.queueDeclare(queueName, true, false, false, null);        /*        basicConsume(String queue, boolean autoAck, Consumer callback)        参数:            1. queue:队列名称            2. autoAck:是否自动确认            3. callback:回调对象         */        // 接收消息        Consumer consumer = new DefaultConsumer(channel){            /*                回调方法,当收到消息后,会自动执行该方法                1. consumerTag:标识                2. envelope:获取一些信息,交换机,路由key...                3. properties:配置信息                4. body:数据             */            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                System.out.println("接收队列的数据 body: " + new String(body));            }        };        channel.basicConsume(queueName,true,consumer);        //不需要关闭资源,因为消费者需要持续监听队列信息    }}

(3)消费者2:读取队列2的消息

G2S28资讯网——每日最新资讯28at.com

package com.lijw.consumer;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * @author Aron.li * @date 2022/3/2 16:16 */public class Consumer_PubSub2 {    //定义接收队列的名称    final static String queueName = "test_fanout_queue2";    public static void main(String[] args) throws IOException, TimeoutException {        //1.创建连接工厂        ConnectionFactory factory = new ConnectionFactory();        //2. 设置参数        factory.setHost("127.0.0.1"); // ip  默认值 localhost        factory.setPort(5672); //端口  默认值 5672        factory.setVirtualHost("/test"); //虚拟机 默认值 /        factory.setUsername("libai"); // 用户名 默认 guest        factory.setPassword("libai"); //密码 默认值 guest        //3. 创建连接 Connection        Connection connection = factory.newConnection();        //4. 创建Channel        Channel channel = connection.createChannel();        //5. 创建队列Queue        /*        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)        参数:            1. queue:队列名称            2. durable:是否持久化,当mq重启之后,还在            3. exclusive:                * 是否独占。只能有一个消费者监听这队列                * 当Connection关闭时,是否删除队列            4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉            5. arguments:参数。         */        channel.queueDeclare(queueName, true, false, false, null);        /*        basicConsume(String queue, boolean autoAck, Consumer callback)        参数:            1. queue:队列名称            2. autoAck:是否自动确认            3. callback:回调对象         */        // 接收消息        Consumer consumer = new DefaultConsumer(channel){            /*                回调方法,当收到消息后,会自动执行该方法                1. consumerTag:标识                2. envelope:获取一些信息,交换机,路由key...                3. properties:配置信息                4. body:数据             */            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                System.out.println("接收队列的数据 body: " + new String(body));            }        };        channel.basicConsume(queueName,true,consumer);        //不需要关闭资源,因为消费者需要持续监听队列信息    }}

3、测试

启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台可以查看到生产者发送的所有消息;到达广播的效果。G2S28资讯网——每日最新资讯28at.com

  • 消费者1接收到的消息:

G2S28资讯网——每日最新资讯28at.com

  • 消费者2接收到的消息:

G2S28资讯网——每日最新资讯28at.com

从结果来看,生产者只需要发送一条消息,其余的消费者全部收到了消息,达到了广播的效果。G2S28资讯网——每日最新资讯28at.com

4、小结

交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。G2S28资讯网——每日最新资讯28at.com

发布订阅模式与工作队列模式的区别:G2S28资讯网——每日最新资讯28at.com

  • 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
  • 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
  • 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机 。

本文链接:http://www.28at.com/showinfo-26-20057-0.htmlRabbitMQ工作模式-Publish/Subscribe发布与订阅模式

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com

上一篇: Oracle数据库调优实战:优化SQL查询的黄金法则!

下一篇: 学会使用Java的远程调试工具,解决难题

标签:
  • 热门焦点
Top