(mq)rabbitmq死信队列与延时消息
什么是死信队列?
死信队列:DLX,dead-letter-exchange
利用DLX,当消息在一个队列中变成死信 (dead message) 之后,它能被重新publish到另一个Exchange(死信交换机),这个Exchange就是DLX
死信交换机本质上也是一个普通交换机,和一般的Exchange没有区别,只不过它处理消息的特殊性,所以称之为死信。它能在任何的队列上被指定,实际上就是设置某个队列的属性,当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列,可以监听这个队列中的消息做相应的处理。
什么样的消息会成为死信?
大致有三种情况
- 消息被拒绝(basic.reject / basic.nack),并且requeue = false
- 消息TTL过期
- 队列达到最大长度
延时队列测试代码:
配置类:
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author: Jiajiajia
* @Date: 2021/8/16
* @Description:
*/
@Configuration
public class MqConfig {
/**
* 死信队列
* @return
*/
@Bean
public Queue dlxQueue() {
return new Queue("dlxQueue",true);
}
/**
* 直连交换机(死信交换机DLX)
* @return
*/
@Bean
DirectExchange dlxExchange() {
return new DirectExchange("dlxExchange",true,false);
}
/**
* 队列和交换机绑定
* @return
*/
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(dlxQueue()).
to(dlxExchange()).with("dlxRouting");
}
/**
* 普通交换机
* @return
*/
@Bean
DirectExchange exchange() {
return (DirectExchange) ExchangeBuilder
.directExchange("exchange") .durable(true) .build();
}
/**
* 普通队列
* @return
*/
@Bean
Queue queue() {
return QueueBuilder.durable("queue")
// 限制队列中最大消息数量
.withArgument("x-max-length", 5)
// 配置到期后转发的交换
.withArgument("x-dead-letter-exchange","dlxExchange")
// 配置到期后转发的路由键
.withArgument("x-dead-letter-routing-key", "dlxRouting")
.build();
}
/**
* 队列和交换机绑定
* @return
*/
@Bean
public Binding Binding(Queue queue, DirectExchange exchange) {
return BindingBuilder .bind(queue) .to(exchange)
.with("routing");
}
}
生产者
/**
* 发送延迟消息
* @return
*/
@GetMapping("testTtl")
private String testTtl(){
rabbitTemplate.convertAndSend("exchange","routing","message info",
message ->{
//延迟3秒处理
message.getMessageProperties().setExpiration(String.valueOf(3*1000));
return message;
});
return "ok";
}
消费者:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class DelayMessageReceiver {
@RabbitListener(queues = "dlxQueue")
public void cfgUserReceiveDealy(String message) throws IOException {
System.out.println("消息:"+message);
}
}
当queue队列中消息数量超过5,或者queue队列中的消息超时的时候,都会将消息提交到dlxQueue队列。
猜你喜欢
official
1232
上一篇《(mq)rabbitmq安装延时队列插件实现延时消息1》文章中介绍了rabbitmq安装延时队列插件。本编将继续结合代码来实现延时队列(基于springboot项目)。下方所有源代码均已上传
official
1246
之前分享了在docker中安装rabbitmq的方法参考:http://www.jiajiajia.club/official/weblog/cb1bmf408189/234本文将延续之前的文章,继
official
1249
是通过回调方法,无论消息发送到交换机是否成功,或是否成功路由到队列,都会通过回调方法来感知这些事件。配置文件修改spring:
rabbitmq:
publisher-confirm-type:corr
blog
java阻塞队列实现 生产者消费者 模型
数据结构与算法
1583
,放入后台的一个执行队列中,后台可以慢慢执行,当队列中没有业务数据时,使该执行线程进入等待状态。当业务数据添加进队列中后唤醒处于等待状态的执行线程,继续处理业务。一、阻塞队列的实现packagecom.
blog
阻塞队列及其原理
java基础
4408
1.什么是阻塞队列阻塞队列是一个在队列基础上又支持了两个附加操作的队列。2.支持阻塞的插入方法:队列满时,队列会阻塞插入元素的线程,直到队列不满。1.支持阻塞的移除方法:队列空时,获取元素的线程会等
blog
mqtt 协议的概念和理解
mqtt协议
1590
一、概述二、设计规范三、主要特性四、协议原理mqtt客户端mqtt服务器一、概述 MQTT(MessageQueuingTelemetryTransport,消息队列遥测传输协议),是一种基于发布
official
1921
《操作系统》时间片轮转算法思想:公平地、轮流地为各个进程服务,让每个进程在一定时间间隔内都可以得到响应算法规则:按照各进程到达就绪队列的顺序,轮流让各个进程执行一个时间片(如100ms)。若进程未在
official
1291
上一篇文章《(mq)rabbitmq消息发送确认》介绍了消息发布时的确认方案,本篇文章将介绍,消息消费确认的方法。和确认发布一样,消费者有时也需要确认,rabbitmq有三种确认模式
目录
没有一个冬天不可逾越,没有一个春天不会来临。最慢的步伐不是跬步,而是徘徊,最快的脚步不是冲刺,而是坚持。