【学习笔记】RabbitMQ04:延迟队列的原理以及实现代码-CSDN博客

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

七、延迟队列

7.1 什么是延迟队列

正常的MQ应用场景中我们希望消息可以快速稳定的传递。但是有一些场景中希望在指定的延迟后再消费信息比如订单支付场景订单15部分内未支付则关闭订单。

这类实现延迟任务的场景就可以采用延迟队列来实现。

以下介绍一下其他的一些方法。

7.2 延迟队列的解决方案

7.2.1 定时任务

每隔n秒扫描一次数据库查询数据库装为过期的订单进行处理。

实现方式

spring schedule、quartz、xxljob等

优点

简单容易实现

缺点

  1. 存在延迟受定时器延迟时间限制
  2. 性能较差每次扫描数据库如果订单量交大会给数据库造成较大压力。
7.2.2 被动取消

当用户主动查询订单时判断订单是否超时超时则取消

  • 优点服务器压力小
  • 缺点如果用户长时间不查询则会造成统计异常而且用户打开订单页面会变慢严重的话会影响用户体验
7.2.3 JDK的延迟队列

DelayedQueue无界阻塞队列该队列只有在延迟期满后才能从中获取元素。

优点

实现简单任务的延迟低。

缺点

  • 服务器重启宕机数据会丢失
  • 只适用于单机版
  • 订单量大时可能会造成内存不足OOM
7.2.3 采用消息中间件rabbitMQ

RabbitMQ 本身不支持延迟队列可以使用 TTL 结合 DLX 的方式来实现消息的延迟投递前面提到的死信队列。.

image-20231017141210411

把 DLX 跟某个队列绑定到了指定时间消息过期后就会从 DLX 路由到这个队列消费者可以从DLX的队列中取走消息。

7.2.3.1 适用专门优化后的死信队列实现延迟队列

在上面的mq方案中存在两个不同的交换机我们可以利用直连交换机的特性将交换机优化成一个交换机同时通过不同的routingKey指定普通队列和死信队列。

image-20231017141445269

思路解释

  1. 生产者发送消息到交换机X并指定ttl的key
  2. 消息被交换机传递到ttl队列中指定了消息过期时间的队列
  3. 同时ttl队列还指定的死信交换机DLX为自身的交换机X但是指定的routingKey为死信队列的key
  4. 这样当消息在ttl队列中到期后这条消息就会被传递到死信队列中提供给消费者
7.2.3.2 ⭐️实例代码

为了便于测试将发送和接收写在同一个服务中

配置信息

@Configuration
public class DelayExchangeConfig {
    public static String exchangeName = "order.ttl.exchange";
    public static String orderQ = "order.ttl.queue";
    public static String dlxQ = "order.dlx.queue";

    @Bean
    public DirectExchange delayedExchange(){
        return ExchangeBuilder.directExchange(exchangeName).build();
    }

    @Bean
    public Queue orderQueue(){
        // 指定该队列的过期时间和死信队列
        Map<String , Object> properties = new HashMap<>();
        properties.put("x-message-ttl" , 15000);
        properties.put("x-dead-letter-exchange" , exchangeName);
        properties.put("x-dead-letter-routing-key" , "dead-letter");
        return QueueBuilder.durable(orderQ).withArguments(properties).build();
    }

    @Bean
    public Queue dlxQueue(){
        return QueueBuilder.durable(dlxQ).build();
    }

    @Bean
    public Binding dlxBinding1(){
        return BindingBuilder.bind(dlxQueue()).to(this.delayedExchange()).with("dead-letter");
    }

    @Bean
    public Binding ttlBinding1(){
        return BindingBuilder.bind(dlxQueue()).to(this.delayedExchange()).with("order");
    }

}

测试代码

@RestController
@RequestMapping("/delay")
@Slf4j
public class DelayedController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/{msg}")
    public void sentErrorMsg(@PathVariable("msg") String msg) {
        log.info("(延迟队列)准备发送的信息{}  路由键 :{}", msg, "order");
        // 发送到普通的延时列表中
        rabbitTemplate.convertAndSend(exchangeName, "order", msg.getBytes(StandardCharsets.UTF_8));
        log.info("(延迟队列)成功发送发送时间{}" , LocalDateTimeUtil.now());
    }

    @RabbitListener(queues = "order.dlx.queue")
    public void receiveDelayedMsg(Message message){
        log.info("(延迟队列)接受到的消息是{}" , new String(message.getBody()));
    }
}
7.2.3.2 测试结果

配置正确

image-20231017144033384

控制台打印正确15秒后接收到的了之前发送的信息

image-20231017144116843


7.2.4 使用rabbitmq_delayed_message_exchange插件.
7.2.4.1 插件下载

插件下载地址

  • https://www.rabbitmq.com/community-plugins.html
  • https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
    • 根据自己的rabbit版本我这里用的是3.9
7.2.4.2 ⭐️如何在docker环境下安装插件

参考文章https://juejin.cn/post/7138717546894589966

  1. 将下载到的文件移动到容器内

    docker cp rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins
    

image-20231017153230781

  1. 进入容器bash指令并启动插件

    docker exec -it rabbitmq bash
    
    root@rabbit:/# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    
    # 使用下面的指令查看插件列表
    rabbitmq-plugins list
    

image-20231017153257970

进入控制台新建交换机能查看到新的交换机类型

image-20231017154024943

7.2.4.3 ⭐️ 代码示例如何使用该插件

官方说明文档https://github.com/rabbitmq/rabbitmq-delayed-message-exchange#usage

image-20231017153803323

理解原理delay exchange在接受到消息后会先存在内部数据库中检查x-delay延迟时间头部

image-20231017154940504

代码使用思路

  1. 要创建自定义的交换机类型要使用CustomExchange()来创建。几个参数的解释如下

    • namerabbit中交换机的名称
    • type交换机类型 x-delayed-message
    • durable是否持久
    • autoDelete是否自动删除
    • arguments参数信息
  2. arguments参数信息从官方文档中获取

    // ... elided code ...
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-delayed-type", "direct");
    channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
    // ... more code ...
    
  3. 交换机创建好后只需要创建一条队列即可并进行绑定

  4. 注意消息发送需要在头部存放信息headers.put("x-delay", 延迟时间)。不需要使用自带的expiration来控制延迟时间了

配置类

@Configuration
public class DelayPluginConfig {
    public static String exchangeName = "delay-x-plugin.x";
    public static String key = "demo";
    @Bean
    public CustomExchange customExchange(){
        // 参考官方文档创建插件提供的自定义交换机
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        // public CustomExchange(String name, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments)
        return new CustomExchange(exchangeName, "x-delayed-message" , true , false , args);
    }

    @Bean
    public Queue delayDemoQueue(){
        return QueueBuilder.durable("delay-x-plugin.queue.demo").build();
    }

    @Bean
    public Binding delayPluginBinding(){
        return BindingBuilder
                .bind(delayDemoQueue())
                .to(customExchange())
                .with(key)
                .noargs();
    }
}

生产者

@RestController
@RequestMapping("/delay/plugin")
@Slf4j
public class DelayedPluginController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/{delay}/{msg}")
    public void sentErrorMsg(@PathVariable("msg") String msg, @PathVariable("delay") Long delay) {
        log.info("(延迟插件队列)准备发送的信息{} 延迟时间{} 路由键 :{}", msg, delay , "demo");
        // 在头部设置过期时间
        MessageProperties properties = new MessageProperties();
        properties.setHeader("x-delay", delay);
        Message message = MessageBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8)).andProperties(properties).build();
        // 发送信息
        rabbitTemplate.convertAndSend(exchangeName, "demo", message);
        log.info("(延迟插件队列)成功发送发送时间:{}", new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date()));
    }

    @RabbitListener(queues = "delay-x-plugin.queue.demo")
    public void receiveDelayedMsg(Message message) {
        log.info("(延迟插件队列)接受到的消息是{}", new String(message.getBody()));
    }
}

7.2.4.4 测试结果

生成交换机和队列

image-20231017160126659image-20231017160147125

访问路径/delay/plugin/25000/一条25秒过期的信息查看日志打印成功

image-20231017160422203

7.3 问题多个消息的延迟时间不同该如何解决

由于队列先进先出的特性如果不同消息的延迟时间不同一旦出现后进的消息延迟时间小于先进的队列那么消息过期的时间就会出错。

7.3.1 解决方案一用延迟队列区分

要解决这个问题就需要将队列的延迟时间统一将不同的延迟的消息发送到对应延迟的队列中。

保证队列的延迟时间和消息的延迟时间是一样的即可。

如下

image-20231017144817671

7.3.2 使用延迟队列插件rabbitmq_delayed_message_exchange

由于该插件的原理并不是单纯的队列实现而是使用rabbit内部数据库时间所以可以很好的解决问题。

可以进行一个简单测试验证

  • 先发送一条25秒过期的信息再发送3条5秒过期的信息

  • 查看结果正常消费解决问题

    image-20231017160917110

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: RabbitMQ

“【学习笔记】RabbitMQ04:延迟队列的原理以及实现代码-CSDN博客” 的相关文章