订阅模式示例图:
前面2个案例中,只有3个角色:
而在订阅模型中,多了一个exchange角色,而且过程略有变化:
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
发布订阅模式:
每个消费者监听自己的队列。
生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收 到消息
package com.lijw.producer;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * @author Aron.li * @date 2022/3/3 8:16 */public class Producer_PubSub { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2. 设置参数 factory.setHost("127.0.0.1"); // ip 默认值 localhost factory.setPort(5672); //端口 默认值 5672 factory.setVirtualHost("/test"); //虚拟机 默认值 / factory.setUsername("libai"); // 用户名 默认 guest factory.setPassword("libai"); //密码 默认值 guest //3. 创建连接 Connection Connection connection = factory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); //5. 创建交换机 /* exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) 参数: 1. exchange:交换机名称 2. type:交换机类型 DIRECT("direct"):定向 FANOUT("fanout"):扇形(广播),发送消息到每一个与之绑定队列。 TOPIC("topic") 通配符的方式 HEADERS("headers") 参数匹配 3. durable:是否持久化 4. autoDelete:自动删除 5. internal:内部使用。 一般false 6. arguments:参数 */ String exchangeName = "test_fanout"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null); //6. 创建队列 String queue1Name = "test_fanout_queue1"; String queue2Name = "test_fanout_queue2"; channel.queueDeclare(queue1Name, true, false, false, null); channel.queueDeclare(queue2Name, true, false, false, null); // 7. 绑定队列和交换机 /* queueBind(String queue, String exchange, String routingKey) 参数: 1. queue:队列名称 2. exchange:交换机名称 3. routingKey:路由键,绑定规则 如果交换机的类型为fanout ,routingKey设置为"" */ channel.queueBind(queue1Name, exchangeName, ""); channel.queueBind(queue2Name, exchangeName, ""); //8. 发送消息至交换机,由交换机分发消息 String body = "日志信息: 肥仔白调用了findAll方法...日志级别: INFO...."; channel.basicPublish(exchangeName, "", null, body.getBytes()); //9. 释放资源 channel.close(); connection.close(); }}
执行生产者,我们可以查看一下创建的 交换机 以及 队列信息:
下面再来看看队列,如下:
下面我们继续来写两个消费者接收消息。
package com.lijw.consumer;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * @author Aron.li * @date 2022/3/2 16:16 */public class Consumer_PubSub1 { //定义接收队列的名称 final static String queueName = "test_fanout_queue1"; public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2. 设置参数 factory.setHost("127.0.0.1"); // ip 默认值 localhost factory.setPort(5672); //端口 默认值 5672 factory.setVirtualHost("/test"); //虚拟机 默认值 / factory.setUsername("libai"); // 用户名 默认 guest factory.setPassword("libai"); //密码 默认值 guest //3. 创建连接 Connection Connection connection = factory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); //5. 创建队列Queue /* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 参数: 1. queue:队列名称 2. durable:是否持久化,当mq重启之后,还在 3. exclusive: * 是否独占。只能有一个消费者监听这队列 * 当Connection关闭时,是否删除队列 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉 5. arguments:参数。 */ channel.queueDeclare(queueName, true, false, false, null); /* basicConsume(String queue, boolean autoAck, Consumer callback) 参数: 1. queue:队列名称 2. autoAck:是否自动确认 3. callback:回调对象 */ // 接收消息 Consumer consumer = new DefaultConsumer(channel){ /* 回调方法,当收到消息后,会自动执行该方法 1. consumerTag:标识 2. envelope:获取一些信息,交换机,路由key... 3. properties:配置信息 4. body:数据 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收队列的数据 body: " + new String(body)); } }; channel.basicConsume(queueName,true,consumer); //不需要关闭资源,因为消费者需要持续监听队列信息 }}
package com.lijw.consumer;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * @author Aron.li * @date 2022/3/2 16:16 */public class Consumer_PubSub2 { //定义接收队列的名称 final static String queueName = "test_fanout_queue2"; public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2. 设置参数 factory.setHost("127.0.0.1"); // ip 默认值 localhost factory.setPort(5672); //端口 默认值 5672 factory.setVirtualHost("/test"); //虚拟机 默认值 / factory.setUsername("libai"); // 用户名 默认 guest factory.setPassword("libai"); //密码 默认值 guest //3. 创建连接 Connection Connection connection = factory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); //5. 创建队列Queue /* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 参数: 1. queue:队列名称 2. durable:是否持久化,当mq重启之后,还在 3. exclusive: * 是否独占。只能有一个消费者监听这队列 * 当Connection关闭时,是否删除队列 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉 5. arguments:参数。 */ channel.queueDeclare(queueName, true, false, false, null); /* basicConsume(String queue, boolean autoAck, Consumer callback) 参数: 1. queue:队列名称 2. autoAck:是否自动确认 3. callback:回调对象 */ // 接收消息 Consumer consumer = new DefaultConsumer(channel){ /* 回调方法,当收到消息后,会自动执行该方法 1. consumerTag:标识 2. envelope:获取一些信息,交换机,路由key... 3. properties:配置信息 4. body:数据 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收队列的数据 body: " + new String(body)); } }; channel.basicConsume(queueName,true,consumer); //不需要关闭资源,因为消费者需要持续监听队列信息 }}
启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台可以查看到生产者发送的所有消息;到达广播的效果。
从结果来看,生产者只需要发送一条消息,其余的消费者全部收到了消息,达到了广播的效果。
交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
发布订阅模式与工作队列模式的区别:
本文链接:http://www.28at.com/showinfo-26-20057-0.htmlRabbitMQ工作模式-Publish/Subscribe发布与订阅模式
声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com
上一篇: Oracle数据库调优实战:优化SQL查询的黄金法则!
下一篇: 学会使用Java的远程调试工具,解决难题