【RabbitMQ】 RabbitMQ 消息的延迟 —— 深入探索 RabbitMQ 的死信交换机,消息的 TTL 以及延迟队列-CSDN博客
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
文章目录
消息队列是现代分布式应用中的关键组件用于实现异步通信、解耦系统组件以及处理高并发请求。消息队列可以用于各种应用场景包括任务调度、事件通知、日志处理等。在消息队列的应用中有时需要实现消息的延迟处理、处理未能成功消费的消息等功能。
本文将介绍一些与消息队列相关的关键概念和技术包括死信交换机Dead Letter Exchange、消息的 TTLTime To Live生存时间、以及使用 DelayExchange 插件实现消息的延迟处理。通过深入理解这些概念和技术将能帮助我们更好地设计和构建具有高可用性和可靠性的消息队列系统。
首先我将介绍死信交换机以及它的作用然后讨论如何创建死信交换机和死信队列。随后将深入研究消息的TTL了解它的作用和如何配置。最后将探讨如何使用 DelayExchange 插件来实现消息的延迟处理以满足各种应用需求。
一、死信交换机
1.1 什么是死信和死信交换机
在了解什么是死信交换机之前让我们首先来了解一下什么是死信。 在消息队列系统中死信Dead Letter是指未能被成功消费的消息。这些消息通常由于多种原因而变为死信一些主要的原因如下
-
消费失败 当消息被消费者consumer拒绝
reject
或未能被确认acknowledge
并且针对与处理失败的消息没有设置重新入队requeue
参数时它们可能成为死信。这可能是因为消息格式错误、业务处理失败、或者其他原因导致消费者无法处理消息。 -
消息超时 消息在队列中等待消费但在一定时间内未被消费者处理。这个时间限制通常由消息的 TTLTime To Live生存时间来定义。当消息超过其 TTL 后它就变为死信。
-
队列堆积满 当消息队列积累了大量消息无法容纳更多消息时最早的消息可能成为死信因为它们无法被及时处理。
因此为了处理这些死信消息消息队列系统引入了 死信交换机Dead Letter Exchange。死信交换机是一个特殊的交换机它接收死信消息并根据规则将这些消息路由到死信队列。通过使用死信交换机系统可以将死信消息从正常队列中分离出来以便进一步处理或分析。
死信交换机通常与队列绑定当队列中的消息变为死信时它们会被发送到与之相关联的死信交换机然后再路由到死信队列。这种机制使得系统能够更好地处理消息的异常情况确保消息不会被永久丢失。
给队列绑定死信交换机的方法
- 给队列设置
dead-letter-exchange
属性指定一个交换机 - 给队列设置
dead-letter-routing-key
属性设置死信交换机与死信队列的RoutingKey
。
如下图所示
在上图中simple.queue
就与死信交换机 dl.direct
绑定最后路由到死信队列dl.queue
后续就可以编写其他逻辑来处理死信队列中的消息。
死信和死信交换机是构建可靠消息处理系统的重要组成部分它们能够帮助我们跟踪和处理未能成功消费的消息确保数据不会遗失同时提供更好的可用性和可维护性。
1.2 死信交换机和死信队列的创建方式
- 使用
@Bean
的方式创建
// 声明普通的 simple.queue 队列并且为其指定死信交换机dl.direct
@Bean
public Queue simpleQueue(){
return QueueBuilder.durable("simple.queue") // 指定队列名称并持久化
.deadLetterExchange("dl.direct") // 指定死信交换机
.build();
}
// 声明死信交换机 dl.direct
@Bean
public DirectExchange dlExchange(){
return new DirectExchange("dl.direct", true, false);
}
// 声明存储死信的队列 dl.queue
@Bean
public Queue dlQueue(){
return new Queue("dl.queue", true);
}
// 将死信队列 与 死信交换机绑定
@Bean
public Binding dlBinding(){
return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
}
- 使用
@RabbitListener
注解的方式创建
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dl.queue", durable = "true"),
exchange = @Exchange(name = "dl.direct"),
key = "dl"
))
public void listenDLQueue(String msg) {
log.info("消费者接收到 dl.queue 的延迟消息" + msg);
}
在这种情况下注意需要在创建 simple.queue
是时候绑定死信交换机。
二、消息的 TTL
2.1 什么是消息的 TTL
消息的TTL全称为"Time To Live"是消息队列系统中的一个重要概念。它定义了消息在队列中存活的时间也就是消息在被发送到队列后允许存留在队列中的时间长度。一旦消息的TTL超过设定的时间消息将被认为已过期消息队列系统将会将其标记为死信Dead Letter并将其路由到相关的死信队列。
在消息队列中消息的超时分为两种情况
-
消息所在的队列设置了储存消息的超时时间
-
消息本身设置了超时时间
但是不管哪种情况一定消息超时了都会成为死信如下图所示
对上图的简单解释
- 上图中设置了
ttl.queue
的超时时间为 10000 毫秒意味着一个消息在该队列中储存的时间不会超过这么长的时间 - 另外也可以在发送消息的时候给这个消息设置在队列中的超时时间例如 5000 毫秒。
- 无论是哪种情况一旦消息超时了都会发送到死信交换机然后再路由死信队列最后由处理死信的逻辑处理这些消息。
2.2 基于死信交换机和 TTL 实现消息的延迟
根据上面的死信交换机和 TTL 的特点我们可以实现延迟处理消息的功能TTL 和 死信的交换机及其队列的结构图示如下
下面就使用 Spring AMQP 来声明和实现这些交换机和队列
-
首先通过
@RabbitListener
注解声明一组死信交换机和死信队列并指定处理死信的逻辑@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "dl.queue", durable = "true"), exchange = @Exchange(name = "dl.direct"), key = "dl" )) public void listenDLQueue(String msg) { log.info("消费者接收到 dl.queue 的延迟消息" + msg); }
-
然后通过
@Bean
的方式声明一组 TTL 的交换机和队列/** * 声明 TTL 交换机 */ @Bean public DirectExchange ttlDirectExchange() { return new DirectExchange("ttl.direct", true, false); } /** * 声明 TTL 队列 * 1. 指定消息的 TTL * 2. 指定死信交换机 * 3. 指定死信交换机的 RoutingKey */ @Bean public Queue ttlQueue() { return QueueBuilder .durable("ttl.queue") // 指定队列的名称 .ttl(10_000) // 指定 TTL 为 10 秒 .deadLetterExchange("dl.direct") // 指定死信交换机 .deadLetterRoutingKey("dl") // 指定死信交换机的 RoutingKey .build(); } /** * 绑定 TTL 交换机和队列 */ @Bean public Binding ttlBinding() { return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl"); }
-
最后在
publisher
中编写发送消息的逻辑@Test public void testTTLMessage() { // 1. 创建消息 Message message = MessageBuilder.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8)) .setDeliveryMode(MessageDeliveryMode.PERSISTENT) .build(); // 2. 创建消息ID CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); // 3. 发送消息 rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData); log.info("发送延迟消息成功消息ID: {}", correlationData.getId()); }
- 验证延迟消息
在创建ttl.queue
的时候指定了消息在队列中的 TTL 不超过 10 秒因此预测当发送消息 10s 后才会被消费者接收
首先启动 consumer
并清除控制台日志然后再发送消息
通过对比控制台日志的时间可以发现成功将消息延迟了 10 秒。
另外也可以在发送消息时设置超时时间可以通过 MessageBuilder
中的 setExpiration
设置消息的超时时间这里设置为 5 秒
再次发送消息并对比观察控制台日志的输出时间
可以发现此时消息延迟了 5 秒通过上面的对比演示可以得出结论那就是在同时指定了消息的过期时间以及队列的超时时间将会以短的那个时间为准。
三、基于 DelayExchang 插件实现延迟队列
3.1 安装 DelayExchang 插件
- 下载插件
RabbitMQ 有一个官方的插件社区地址为https://www.rabbitmq.com/community-plugins.html。其中包含各种各样的插件包括我们要使用的 DelayExchange 插件
这里我选择的是 3.8.9 的版本
- 上传插件
这里我的 RabbitMQ 是基于 Docker 安装的因此需要先查看 RabbitMQ 的插件目录对应的数据卷
然后直接进入数据卷挂载点目录
可以发现这个目录下其实以及有很多的插件的了然后上传刚才下载的插件到这个目录
- 安装插件
最后就是安装了安装时需要进入 MQ 容器内部来执行安装。我的容器名为mq
所以执行下面命令
docker exec -it mq bash
然后执行安装的命令
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
最后出现下面的日志就说明安装 DelayExchang 插件成功了
3.2 DelayExchang 实现消息延迟的原理
DelayExchange 是一个用于实现消息延迟发送的插件可以在消息队列系统中非常有用。其工作原理如下
-
创建 DelayExchange首先需要创建一个 DelayExchange这是一个特殊的交换机用于处理延迟消息。通常可以使用消息队列系统的管理工具或API如 Spring AMQP 的API来声明和配置 DelayExchange。
-
发送消息到 DelayExchange当需要发送一个延迟消息时将消息发送到 DelayExchange而不是直接发送到目标队列。在发送消息时需要为消息设置一个属性通常称为
x-delay
它表示消息的延迟时间。这个属性的值通常以毫秒为单位定义了消息应该延迟多长时间才会被投递。 -
DelayExchange 检查 x-delay 属性当消息到达DelayExchange时它会检查消息的
x-delay
属性。如果该属性存在说明这是一个延迟消息。DelayExchange会将消息持久化到硬盘并记录x-delay
的值作为延迟时间。 -
返回 Routing Not FoundDelayExchange 会向消息的发送者返回 “Routing Not Found” 的响应意味着消息当前没有目标队列可以接收。这是因为消息不会立即被投递而是需要等待一定的延迟时间。因此如果设置了生产者消息确认的
publisher-return
的ReturnCallback
就需要进行额外的处理以避免错误的提示。 -
延迟时间到期经过预定的延迟时间后DelayExchange 会重新检查已存储的消息查看是否有消息已经到达或超过了其设定的延迟时间。
-
重新投递消息一旦消息的延迟时间到期DelayExchange将重新投递消息到指定的目标队列允许消费者最终接收和处理消息。
通过 DelayExchange 的这一机制可以实现消息的延迟发送非常适合需要进行任务调度、处理延迟任务或者在时间敏感任务的应用中使用。它有助于减轻系统负载提高消息传递的可靠性以及更好地满足特定的应用需求。
3.3 使用 DelayExchang 实现消息的延迟
-
首先使用
@RabbitListener
注解声明一组延迟交换机和队列以及延迟消息的处理逻辑。@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "delay.queue", durable = "true"), exchange = @Exchange(name = "delay.direct", delayed = "true"), key = "delay" )) public void listenDelayExchange(String msg) { log.info("消费者接收到了 delay.queue 的消息" + msg); }
这里使用 @RabbitListener
注解声明交换机和队列和前面的操作基本一致唯一的区别在于声明交换机的时候额外设置了一个 delayed
参数表明声明的是一个延迟交换机。
-
在
publisher
中发送延迟消息@Test public void testDelayMessage() { // 1. 创建消息 Message message = MessageBuilder.withBody("hello, delay message".getBytes(StandardCharsets.UTF_8)) .setDeliveryMode(MessageDeliveryMode.PERSISTENT) .setHeader("x-delay", 5000) // 添加 x-delay 头信息 .build(); // 2. 创建消息ID CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); // 3. 发送消息 rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData); log.info("通过延迟交换机发送延迟消息成功消息ID: {}", correlationData.getId()); }
同样此处发送消息的逻辑也和前面基本一致只是在 MessageBuilder
中使用 setHeader
额外设置了一个x-delay
的头信息表明了该消息是延迟消息同时也指定了消息的超时时间。
- 验证延迟消息
同样的首先启动 consumer
清除控制台日志然后向延迟交换机发送消息
通过日志可以看到成功发送了延迟消息但是却出现了错误的日志信息告诉我们是delay.direct
交换机没有成功将消息路由到 delay.queue
中但是通过 consumer
的控制台在延迟 5 秒后发现成功接收并处理了这个消息
出现上面错误日志的原则在上文的 DelayExchang 实现消息延迟的原理中的第 4 点已经提到了使用 DelayExchang 实现消息的延迟是会在达到了设置延迟时间再将消息发送给队列的。但是由于交换机在收到消息的时候没有立即路由给队列在返回确认消息给生产者的就是“Routing Not Found”因此就会使得生产者误以为路由失败了。
另外在上面的错误日志中可以发现有一个 receivedDelay
参数的值是 5000也就是延迟的时间我们可以根据这个参数在 RetuenCallback
中排除发送延迟消息时产生的的错误提示
然后再次发送延迟消息到延迟交换机就不会出现上面的错误提示了
至此我们便成功使用 DelayExchang 实现了发送延迟消息的功能。可以发现使用 DelayExchang 插件实现延迟消息比前面使用死信交换机和 TTL 来实现延迟消息更加的简单。
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |