当前位置:首页 > 科技  > 软件

Springboot实现Rabbitmq死信队列以及延迟队列的优化

来源: 责编: 时间:2023-10-10 18:30:35 183观看
导读导入依赖:后续延迟队列优化用Springboot整合,先理解死信队列<!--RabbitMQ依赖--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <ver

导入依赖:

后续延迟队列优化用Springboot整合,先理解死信队列yvr28资讯网——每日最新资讯28at.com

<!--RabbitMQ依赖-->        <dependency>            <groupId>com.rabbitmq</groupId>            <artifactId>amqp-client</artifactId>            <version>5.12.0</version>        </dependency>

死信队列

由于特定原因导致队列中的消息不能被消费,这样的消息如果没有后续处理就可以放入死信队列中,例如一个订单如果超时未被支付从而自动失效,就将这个订单放到死信队列中。(死信队列中的消息是可以被消费的)yvr28资讯网——每日最新资讯28at.com

yvr28资讯网——每日最新资讯28at.com

死信队列产生的原因

消息TTL过期

就是在规定的时间内消息没有被消费,(和延迟队列不同,延迟队列时表示到达时间消息才可以被消费)yvr28资讯网——每日最新资讯28at.com

在生产者代码中设置消息过期时间:yvr28资讯网——每日最新资讯28at.com

//生产者发送消息,将消息设置为TTL消息        AMQP.BasicProperties properties =                new AMQP.BasicProperties().builder().expiration("10000").build();

修改队列参数argument的特殊属性:yvr28资讯网——每日最新资讯28at.com

arguments.put("x-dead-letter-exchange", EXCHANGE_DIRECT_DEAD);//死信交换机arguments.put("x-dead-letter-routing-key", "routingkey_direct-dead");//死信rotingkeyarguments.put("x-message-TTL", 10000);//设置过期时间(单位毫秒)  //将死信交换机与死信队列绑定

模拟代码:

消费者1yvr28资讯网——每日最新资讯28at.com

public class Consumer01 {    public static final String EXCHANGE_DIRECT = "exchange_direct";//普通交换机的名称    public static final String EXCHANGE_DIRECT_DEAD = "exchange_direct_dead";//死信交换机的名称    public static final String QUEUE_PLAIN = "queue_plain";//普通队列的名称    public static final String QUEUE_PLAIN_DEAD = "queue_plain_dead";//死信队列的名称    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {        Channel channel = RabbitMqUtils.createChannel();        //声明死信交换机和普通交换机        channel.exchangeDeclare(EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT);        channel.exchangeDeclare(EXCHANGE_DIRECT_DEAD, BuiltinExchangeType.DIRECT);        //声明普通队列(绑定普通队列与死信交换机的关系,在通过rotingkey绑定死信队列        Map<String, Object> arguments = new HashMap<>();        arguments.put("x-dead-letter-exchange", EXCHANGE_DIRECT_DEAD);//死信交换机        arguments.put("x-dead-letter-routing-key", "routingkey_direct-dead");//死信rotingkey        //设置过期时间(单位毫秒)        arguments.put("x-message-TTL", 10000);        channel.queueDeclare(QUEUE_PLAIN, false, false, false, arguments);        //声明死信队列        channel.queueDeclare(QUEUE_PLAIN_DEAD, false, false, false, null);        //普通交换机和队列的绑定        channel.queueBind(QUEUE_PLAIN, EXCHANGE_DIRECT, "routingkey_direct");        //死信交换机和死信队列的绑定        channel.queueBind(QUEUE_PLAIN_DEAD, EXCHANGE_DIRECT_DEAD, "routingkey_direct-dead");        //模拟超时时间消息未被消费        Thread.sleep(1000000);        channel.basicConsume(QUEUE_PLAIN, true, (consumerTag, message) -> {            System.out.println("Consumer01.main接受到消息:" + new String(message.getBody()));        }, (consumerTag, sig) -> {        });    }}

生产者yvr28资讯网——每日最新资讯28at.com

public class Produce {    public static void main(String[] args) throws IOException, TimeoutException {        Channel channel = RabbitMqUtils.createChannel();        //生产者发送消息,将消息设置为TTL消息        AMQP.BasicProperties properties =                new AMQP.BasicProperties().builder().expiration("10000").build();        for (int i = 0; i < 10; i++) {            String message = i + "";            channel.basicPublish(Consumer01.EXCHANGE_DIRECT,"routingkey_direct",properties,message.getBytes(StandardCharsets.UTF_8));        }    }}

消费者2yvr28资讯网——每日最新资讯28at.com

public class Consumer2 {    public static void main(String[] args) throws IOException, TimeoutException {        Channel channel = RabbitMqUtils.createChannel();        channel.basicConsume(Consumer01.QUEUE_PLAIN_DEAD, true, (consumerTag, message) -> {            System.out.println("Consumer2.main接受死信队列的消息:" + new String(message.getBody()));        }, (consumerTag, sig) -> {        });    }}/**输出结果:Consumer2.main接受死信队列的消息:0Consumer2.main接受死信队列的消息:1Consumer2.main接受死信队列的消息:2Consumer2.main接受死信队列的消息:3Consumer2.main接受死信队列的消息:4Consumer2.main接受死信队列的消息:5Consumer2.main接受死信队列的消息:6Consumer2.main接受死信队列的消息:7Consumer2.main接受死信队列的消息:8Consumer2.main接受死信队列的消息:9    */

队列达到了最大长度

将RabbiMQ的队列的argument属性的键设置为 x-max-length 表示队列可以容纳的最大条数yvr28资讯网——每日最新资讯28at.com

消息被拒绝

将自动应答设为falseyvr28资讯网——每日最新资讯28at.com

在消费者调一个Channel.basicReject,设置参数requeue为false,表示不重新排队,将消息丢到死信队列yvr28资讯网——每日最新资讯28at.com

延迟队列优化

延迟队列就是讲一个消息延迟发送,例如消息在队列中10s后才能被取出,可以通过RabbitMQ的插件或者死信队列来实现yvr28资讯网——每日最新资讯28at.com

用死信队列实现延迟队列的思路:yvr28资讯网——每日最新资讯28at.com

在于死信队列绑定的普通队列不设置消费者,利用TTL延迟消息,当TTL时间过期后,到达死信队列被消费这样就形成一个延迟队列。yvr28资讯网——每日最新资讯28at.com

延迟队列的使用场景:①典型的就是流量削峰,对于不重要的消息,可以延迟消费,有助于减轻数据库的压力,强化分布式系统的高可用和并发性能。②还可以实现一个消息提醒,例如用户三天未登录发送一个消息提醒。yvr28资讯网——每日最新资讯28at.com

yvr28资讯网——每日最新资讯28at.com

在实际生产中可能存在很多不同的延迟时间要求,不可能每一个延迟要求就创造一个队列,我们可以用生产者实现延迟信息,而队列不设置TTL就可以根据生产的延迟消息进行延迟发送。yvr28资讯网——每日最新资讯28at.com

但是此方法虽然实现了一个队列就可以转发不同延时时间的消息,但是有缺陷,队列中的消息是排队发送的,也就是说如果我第一条消息发送20s延时,接着第二条消息发送2s延时。最后却是20s消息先消费,而2s消息后消费,因为RabbitMQ在检测一条消息时发生了20s的阻塞。如下:yvr28资讯网——每日最新资讯28at.com

###GET http://localhost:8080/ttl/sendExpirationMessage/aaaaa/20000###GET http://localhost:8080/ttl/sendExpirationMessage/bbbbb/2000最后输出结果是先消费aaaa后消费bbbb

可以通过RabbitMQ的插件实现延时队列,此方法没有这缺陷yvr28资讯网——每日最新资讯28at.com

从官网上下载对应版本的延迟插件,下载后如图:交换机类型会多出一个 x-delayed-messageyvr28资讯网——每日最新资讯28at.com

yvr28资讯网——每日最新资讯28at.com


yvr28资讯网——每日最新资讯28at.com

在我们自定义的交换机中,这是一种新的交换机类型,该类型消息支持延迟投递机制,消息传递后并不会立即投递到目标队列中,而是存储在mnesia(一个分布式数据系统)表中,当达到投递时间时,才会投递到目标队列中。yvr28资讯网——每日最新资讯28at.com

代码实例:yvr28资讯网——每日最新资讯28at.com

配置类:yvr28资讯网——每日最新资讯28at.com

@Configurationpublic class RabbitDelayedConfig {    //延迟交换机    public static final String DELAYED_EXCHANGE = "delayed.exchange";    //延迟队列b    public static final String DELAYED_QUEUE = "delayed.queue";    //延迟交换机和队列的routingkey    public static final String DELAYED_ROTINGKEY = "delayed.routingkey";    //public CustomExchange(String name, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) {    //		super(name, durable, autoDelete, arguments);    //		this.type = type;    //	}    @Bean    public CustomExchange delayedExchange() {        Map<String, Object> arguments = new HashMap<>();        //定义延迟消息类型由那种交换机规则处置        arguments.put("x-delayed-type", "direct");        return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", false, false, arguments);    }    @Bean    public Queue delayedQueue() {        return QueueBuilder                .nonDurable(DELAYED_QUEUE)                .build();    }    @Bean    public Binding delayedBinding() {        return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(DELAYED_ROTINGKEY).noargs();    }}

生产者:yvr28资讯网——每日最新资讯28at.com

/*延迟交换机发送消息*/    @GetMapping("/sendDelayedMessage/{message}/{delayedTTL}")    public void sendDelayedMessage(@PathVariable String message, @PathVariable Integer delayedTTL) {        log.info("当前时间:{},发送一条延迟时间为{}的延迟消息给延迟队列:{}", new Date().toString(), delayedTTL, message);        rabbitTemplate.convertAndSend(RabbitDelayedConfig.DELAYED_EXCHANGE,                RabbitDelayedConfig.DELAYED_ROTINGKEY,                message,                msg -> {                    msg.getMessageProperties().setDelay(delayedTTL);//设置消息的延迟消息时间                    return msg;                });    }

消费者:yvr28资讯网——每日最新资讯28at.com

@Slf4j@Componentpublic class DelayedQueueConsumer {    @RabbitListener(queues = RabbitDelayedConfig.DELAYED_QUEUE)    public void queue(Message message) {        log.info("接受到延迟队列的消息,当前时间:{},消息:{}",new Date().toString(),new String(message.getBody()));    }}

本文链接:http://www.28at.com/showinfo-26-12687-0.htmlSpringboot实现Rabbitmq死信队列以及延迟队列的优化

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com

上一篇: 福利来啦,一键部署:轻松掌握Docker及Docker-Compose的安装方法

下一篇: 自己动手用Python实现一个保卫果实小游戏【完整版】

标签:
  • 热门焦点
  • K60至尊版刚预热 一加Ace2 Pro正面硬刚

    K60至尊版刚预热 一加Ace2 Pro正面硬刚

    Redmi这边刚如火如荼的宣传了K60 Ultra的各种技术和硬件配置,作为竞品的一加也坐不住了。一加中国区总裁李杰发布了两条微博,表示在自家的一加Ace2上早就已经采用了和PixelWo
  • 小米平板5 Pro 12.4简评:多专多能 兼顾影音娱乐的大屏利器

    小米平板5 Pro 12.4简评:多专多能 兼顾影音娱乐的大屏利器

    疫情带来了网课,网课盘活了安卓平板,安卓平板市场虽然中途停滞了几年,但好的一点就是停滞的这几年行业又有了新的发展方向,例如超窄边框、高刷新率、多摄镜头组合等,这就让安卓
  • Raft算法:保障分布式系统共识的稳健之道

    Raft算法:保障分布式系统共识的稳健之道

    1. 什么是Raft算法?Raft 是英文”Reliable、Replicated、Redundant、And Fault-Tolerant”(“可靠、可复制、可冗余、可容错”)的首字母缩写。Raft算法是一种用于在分布式系统
  • Golang 中的 io 包详解:组合接口

    Golang 中的 io 包详解:组合接口

    io.ReadWriter// ReadWriter is the interface that groups the basic Read and Write methods.type ReadWriter interface { Reader Writer}是对Reader和Writer接口的组合,
  • 拼多多APP上线本地生活入口,群雄逐鹿万亿市场

    拼多多APP上线本地生活入口,群雄逐鹿万亿市场

    Tech星球(微信ID:tech618)文 | 陈桥辉 Tech星球独家获悉,拼多多在其APP内上线了&ldquo;本地生活&rdquo;入口,位置较深,位于首页的&ldquo;充值中心&rdquo;内,目前主要售卖美食相关的
  • 自律,给不了Keep自由!

    自律,给不了Keep自由!

    来源 | 互联网品牌官作者 | 李大为编排 | 又耳 审核 | 谷晓辉自律能不能给用户自由暂时不好说,但大概率不能给Keep自由。近日,全球最大的在线健身平台Keep正式登陆港交所,努力
  • 8月见!小米MIX Fold 3获得3C认证:支持67W快充

    8月见!小米MIX Fold 3获得3C认证:支持67W快充

    这段时间以来,包括三星、一加、荣耀等等有不少品牌旗下的最新折叠屏旗舰都得到了不少爆料,而小米新一代折叠屏旗舰——小米MIX Fold 3此前也屡屡被传
  • iQOO Neo8 Pro抢先上架:首发天玑9200+ 安卓性能之王

    iQOO Neo8 Pro抢先上架:首发天玑9200+ 安卓性能之王

    经过了一段时间的密集爆料,昨日iQOO官方如期对外宣布:将于5月23日推出全新的iQOO Neo8系列新品,官方称这是一款拥有旗舰级性能调校的作品。随着发布时
  • 滴滴违法违规被罚80.26亿 共存在16项违法事实

    滴滴违法违规被罚80.26亿 共存在16项违法事实

    滴滴违法违规被罚80.26亿 存在16项违法事实开始于2121年7月,历经一年时间,网络安全审查办公室对“滴滴出行”网络安全审查终于有了一个暂时的结束。据“网信
Top