(mq)rabbitmq死信队列与延时消息

weblog 1054 0 0

什么是死信队列?

死信队列:DLXdead-letter-exchange  

利用DLX,当消息在一个队列中变成死信 (dead message) 之后,它能被重新publish到另一个Exchange(死信交换机),这个Exchange就是DLX

死信交换机本质上也是一个普通交换机,和一般的Exchange没有区别,只不过它处理消息的特殊性,所以称之为死信。它能在任何的队列上被指定,实际上就是设置某个队列的属性,当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列,可以监听这个队列中的消息做相应的处理。

什么样的消息会成为死信?

大致有三种情况

  • 消息被拒绝(basic.reject / basic.nack),并且requeue = false
  • 消息TTL过期
  • 队列达到最大长度

延时队列测试代码:

配置类:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @author: Jiajiajia
* @Date: 2021/8/16
* @Description: 
*/
@Configuration
public class MqConfig {
    /**
     * 死信队列
     * @return
     */
    @Bean
    public Queue dlxQueue() {
        return new Queue("dlxQueue",true);
    }

    /**
     * 直连交换机(死信交换机DLX)
     * @return
     */
    @Bean
    DirectExchange dlxExchange() {
        return new DirectExchange("dlxExchange",true,false);
    }

    /**
     * 队列和交换机绑定
     * @return
     */
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(dlxQueue()).
                to(dlxExchange()).with("dlxRouting");
    }

    /**
     * 普通交换机
     * @return
     */
    @Bean
    DirectExchange exchange() {
        return (DirectExchange) ExchangeBuilder
                .directExchange("exchange") .durable(true) .build();
    }

    /**
     * 普通队列
     * @return
     */
    @Bean
    Queue queue() {
        return QueueBuilder.durable("queue")
                // 限制队列中最大消息数量
                .withArgument("x-max-length", 5)
                // 配置到期后转发的交换
                .withArgument("x-dead-letter-exchange","dlxExchange")
                // 配置到期后转发的路由键
                .withArgument("x-dead-letter-routing-key", "dlxRouting")
                .build();
    }

    /**
     * 队列和交换机绑定
     * @return
     */
    @Bean
    public Binding Binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder .bind(queue) .to(exchange)
                .with("routing");
    }
}

生产者

/**
     * 发送延迟消息
     * @return
     */
    @GetMapping("testTtl")
    private String testTtl(){
        rabbitTemplate.convertAndSend("exchange","routing","message info",
            message ->{
                //延迟3秒处理
                message.getMessageProperties().setExpiration(String.valueOf(3*1000));
                return message;
        });
        return "ok";
}

消费者:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

@Component
public class DelayMessageReceiver {

    @RabbitListener(queues = "dlxQueue")
    public void cfgUserReceiveDealy(String message) throws IOException {
        System.out.println("消息:"+message);
    }
}

当queue队列中消息数量超过5,或者queue队列中的消息超时的时候,都会将消息提交到dlxQueue队列。


猜你喜欢
official 1232 上一篇《(mq)rabbitmq安装插件实现1》文章中介绍了rabbitmq安装插件。本编将继续结合代码来实现(基于springboot项目)。下方所有源代码均已上传
official 1246 之前分享了在docker中安装rabbitmq的方法参考:http://www.jiajiajia.club/official/weblog/cb1bmf408189/234本文将续之前的文章,继
official 1249 是通过回调方法,无论发送到交换机是否成功,或是否成功路由到,都会通过回调方法来感知这些事件。配置文件修改spring: rabbitmq: publisher-confirm-type:corr
数据结构与算法 1583 ,放入后台的一个执行中,后台可以慢慢执行,当中没有业务数据,使该执行线程进入等待状态。当业务数据添加进中后唤醒处于等待状态的执行线程,继续处理业务。一、阻塞的实现packagecom.
java基础 4408 1.什么是阻塞阻塞是一个在基础上又支持了两个附加操作的。2.支持阻塞的插入方法:会阻塞插入元素的线程,直到不满。1.支持阻塞的移除方法:,获取元素的线程会等
mqtt协议 1590 一、概述二、设计规范三、主要特性四、协议原理mqtt客户端mqtt服务器一、概述  MQTT(MessageQueuingTelemetryTransport,遥测传输协议),是一种基于发布
official 1921 《操作系统》间片轮转算法思想:公平地、轮流地为各个进程服务,让每个进程在一定间间隔内都可以得到响应算法规则:按照各进程到达就绪的顺序,轮流让各个进程执行一个间片(如100ms)。若进程未在
official 1291 上一篇文章《(mq)rabbitmq发送确认》介绍了发布的确认方案,本篇文章将介绍,费确认的方法。和确认发布一样,费者有也需要确认,rabbitmq有三种确认模式
目录
没有一个冬天不可逾越,没有一个春天不会来临。最慢的步伐不是跬步,而是徘徊,最快的脚步不是冲刺,而是坚持。