RabbitMQ 高级知识

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

RabbitMQ 高级特性

image.png

5.1 消息可靠性🔥🔥🔥

消息从发送到消费者接收会经理多个过程如下所示
image.png
其中的每一步都可能导致消息丢失常见的丢失原因包括

  • 发送时丢失
    • 生产者发送的消息未送达 exchange
    • 消息到达 exchange 后未到达 queue
  • MQ 宕机queue 将消息丢失
  • consumer 接收到消息后未消费就宕机

针对这些问题RabbitMQ 分别给出了解决方案

  • 生产者确认机制
  • mq 持久化
  • 消费者确认机制
  • 失败重试机制

我们以一个 Demo 进行演示

5.1.1 生产者消息确认

RabbitMQ 提供了 publisher confirm 机制来避免消息发送到 MQ 过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到 MQ 以后会返回一个结果给发送者表示消息是否处理成功。
返回结果有两种方式

  • publisher-confirm发送者确认
    • 消息成功投递到交换机返回 ack
    • 消息未投递到交换机返回 nack
  • publisher-return发送者回执
    • 消息投递到交换机了但是没有路由到队列。返回ACK及路由失败原因。

注意确认机制发送消息时需要给每个消息设置一个全局唯一 id以区分不同消息避免 ack 冲突

image.png
修改 publisher 服务中的配置

spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true

说明

  • publish-confirm-type开启 publisher-confirm这里支持两种类型
    • simple同步等待 confirm 结果直到超时
    • correlated异步回调定义 ConfirmCallback ,MQ 返回结果时会回调这个 ConfirmCallback
  • publish-returns开启 publish-return 功能同样是基于callback机制不过是定义ReturnCallback
  • template.mandatory定义消息路由失败时的策略。true则调用ReturnCallbackfalse则直接丢弃消息

定义回调函数

// 设置发送者确认回调函数
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* @param correlationData 自定义的数据 一般是消息的 UUID
* @param b 是否确认 true:消息发送到 exchange 中  false:消息未发送到 exchange 中
* @param s 原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
log.info("发送确认回调触发  消息的ID===> {}", correlationData.getId());
if (b) {
log.info("消息成功发送到交换机中");
} else {
log.error("消息发送到交换机中失败原因{}", s);
// 可以重发
}
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* 只要这个方法被调用代表消息没能正确路由到队列被 mq 返还回来了
* @param message 返回的消息
* @param i 回复状态码
* @param s 回复内容
* @param s1 交换机
* @param s2 路由 key
*/
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
i, s, s1, s2, message.toString());
// 如果有业务需要可以重发
}
});

5.1.2 消息持久化

生产者确认可以确保消息投递到RabbitMQ的队列中但是消息发送到RabbitMQ以后如果突然宕机也可能导致消息丢失。
要想确保消息在RabbitMQ中安全保存必须开启消息持久化机制。

  • 交换机持久化
  • 队列持久化
  • 消息持久化

默认情况下由 SpringAMQP 声明的交换机都是持久化的

@Bean
public FanoutExchange fanoutExchange() {
    // 三个参数交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
    return new FanoutExchange("fanout.exchange", true, false);
}

由 SpringAMQP 声明的队列都是持久化的

@Bean
public Queue queue() {
    return new Queue("fanout.queue");
}

利用 SpringAMQP 发送消息时可以设置消息的属性MessageProperties指定 delivery-mode

  • 1非持久化
  • 2持久化

默认情况下SpringAMQP 发出的任何消息都是持久化的不用特意指定

@Test
public void testSendDurableMessage() throws InterruptedException {
    // 1.消息体
    Message message = MessageBuilder.
            withBody("hello, spring amqp!".getBytes(StandardCharsets.UTF_8))
            .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
            .build();
    // 2.发送消息
    rabbitTemplate.convertAndSend("simple.queue", message);
}

5.1.3 消费者消息确认

设想这样的场景

  • RabbitMQ 投递消息给消费者
  • 消费者获取消息后返回 ACK 给 RabbitMQ
  • RabbitMQ 删除消息
  • 消费者宕机消息尚未处理

这样消息就丢失了。因此消费者返回 ACK 的时机非常重要
而 SpringAMQP 则允许配置三种确认模式

  • manual手动 ack需要在业务代码结束后调用 ap i发送 ack。
  • auto自动 ack由 spring 监测 listener 代码是否出现异常没有异常则返回 ack抛出异常则返回nack
  • none关闭 ackMQ 假定消费者获取消息后会成功处理因此消息投递后立即被删除

由此可知

  • none模式下消息投递是不可靠的可能丢失
  • auto模式类似事务机制出现异常时返回nack消息回滚到mq没有异常返回ack
  • manual自己根据业务情况判断什么时候该ack

一般我们都是使用默认的auto即可
手动ack:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual # 手动ack
@RabbitListener(
       bindings = {
               @QueueBinding(
                       value = @Queue,
                       exchange = @Exchange(value = "boot-topic-exchange",type = "topic"),
                       key = {"black.*.#"}
               )
       }
)
public void getMessage3(String msg, Channel channel, Message message) throws IOException {
   System.out.println("接收到消息3" + msg);

  // 手动 ack
   channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}

auto 模式
首先我们修改消费者的 yml 配置文件

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 自动ack

在异常位置打断点再次发送消息程序卡在断点时可以发现此时消息状态为unack未确定状态
image.png
抛出异常后因为Spring会自动返回nack所以消息恢复至Ready状态并且没有被RabbitMQ删除
image.png

5.1.4 消费失败重试机制

当消费者出现异常后消息会不断 requeue重入队到队列再重新发送给消费者然后再次异常再次 requeue无限循环导致 mq 的消息处理飙升带来不必要的压力我们怎么办
我们可以利用 Spring 的 retry 机制本地重试在消费者出现异常时利用本地重试而不是无限制的 requeue 到 mq 队列。
修改 consumer 服务的 application.yml 文件添加内容

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初始的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态false有状态。如果业务中包含事务这里改为false

重启 consumer 服务重复之前的测试。可以发现

  • 在重试3次后SpringAMQP会抛出异常 AmqpRejectAndDontRequeueException说明本地重试触发了
  • 查看 RabbitMQ 控制台发现消息被删除了说明最后 SpringAMQP 返回的是ackmq删除消息了

由上述的发现可得知开启本地重试后最终消息还是会丢失这个我们需要怎么解决
这个时候我们可以自定义失败策略
在之前的测试中达到最大重试次数后消息会被丢弃这是由 Spring 内部机制决定的
在开启重试模式后重试次数耗尽如果消息依然失败则需要有 MessageRecovery 接口来处理它包含三种不同的实现

  • RejectAndDontRequeueRecoverer重试耗尽后直接 reject丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer重试耗尽后返回 nack消息重新入队这种也类似无限循环
  • ** RepublishMessageRecoverer重试耗尽后将失败消息投递到指定的交换机 **

比较优雅的一种处理方案是 RepublishMessageRecoverer失败后将消息投递到一个指定的专门存放异常消息的队列后续由人工集中处理。
image.png
在 consumer 中定义处理失败消息的交换机和队列

@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
    return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

定义一个 RepublishMessageRecoverer关联队列和交换机

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

5.2 死信交换机

5.2.1 什么是死信交换机

当一个队列中的消息满足下列情况之一时可以成为死信dead letter

  • 消费者使用 basic.reject 或 basic.nack 声明消费失败并且消息的 requeue 参数设置为false
  • 消息是一个过期消息超时无人消费
  • 要投递的队列消息满了无法投递

如果这个包含死信的队列配置了dead-letter-exchange属性指定了一个交换机那么队列中的死信就会投递到这个交换机中而这个交换机称为死信交换机Dead Letter Exchange检查DLX
image.png
另外队列将死信投递给死信交换机时必须知道两个信息

  • 死信交换机名称
  • 死信交换机与死信队列绑定的RoutingKey

这样才能确保投递的消息能到达死信交换机并且正确的路由到死信队列
在失败重试策略中默认的 RejectAndDontRequeueRecoverer 会在本地重试次数耗尽后发送 reject 给RabbitMQ消息变成死信被丢弃。
我们可以给 simple.queue 添加一个死信交换机给死信交换机绑定一个队列。这样消息变成死信后也不会丢弃而是最终投递到死信交换机路由到与死信交换机绑定的队列这也是一种防止信息丢失的方法不过我们经常用的是失败策略而不用死信交换机原因是因为配置麻烦

@Bean
public Queue simpleQueue() {
    // 配置死信交换机
    return QueueBuilder.durable("simple.queue")
            .deadLetterExchange("dl.exchange")
            .deadLetterRoutingKey("dl")
            .build();
}

@Bean
public Queue dlQueue() {
    return new Queue("dl.queue");
}

@Bean
public DirectExchange dlExchange() {
    return new DirectExchange("dl.exchange");
}

@Bean
public Binding dlBinding() {
    return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("dl");
}

5.2.2 TTL

一个队列中的消息如果超时未消费则会变为死信超时分为两种情况

  • 消息所在的队列设置了超时时间
  • 消息本身设置了超时时间

image.png

/**
 * 基于注解方式声明一组死信交换机和队列
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "dl.queue"),
        exchange = @Exchange(name = "dl.direct"),
        key = "dl"
))
public void listenDlQueue(String msg) {
    log.info("接收到 ttl.queue的延迟消息{}", msg);
}
@Bean
public DirectExchange ttlExchange() {
    return new DirectExchange("ttl.direct");
}

@Bean
public Queue ttlQueue() {
    return QueueBuilder.durable("ttl.queue")
            .ttl(10000) // 设置队列的超时时间 10s
            .deadLetterExchange("dl.direct") // 指定死信交换机
            .deadLetterRoutingKey("dl") // 指定死信 RoutingKey
            .build();
}

@Bean
public Binding ttlBinding() {
    return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}
@Test
public void testTTLMsg() {
    // 创建消息
    Message message = MessageBuilder
            .withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
            .setExpiration("5000") // 设置消息的过期时间 5s
            .build();
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
}

总结
消息超时的两种方式是

  • 给队列设置ttl属性进入队列后超过ttl时间的消息变为死信
  • 给消息设置ttl属性队列接收到消息超过ttl时间后变为死信

如何实现发送一个消息20秒后消费者才收到消息

  • 给消息的目标队列指定死信交换机
  • 将消费者监听的队列绑定到死信交换机
  • 发送消息时给消息设置超时时间为20秒

5.2.3 延迟队列

上面的死信交换机和 TTL 在我们项目中一般不使用我们一般使用延迟队列来进行实现延迟发送效果
因为延迟队列的需求非常多所以RabbitMQ的官方也推出了一个插件原生支持延迟队列效果。
这个插件就是DelayExchange插件。参考RabbitMQ的插件列表页面https://www.rabbitmq.com/community-plugins.html
在使用的时候我们首先需要安装这里不再演示
DelayExchange 原理
DelayExchange需要将一个交换机声明为 delayed 类型。当我们发送消息到 delayExchange 时流程如下

  • 接收消息
  • 判断消息是否具备 x-delay 属性
  • 如果有 x-delay 属性说明是延迟消息持久化到硬盘读取 x-delay 值作为延迟时间
  • 返回routing not found结果给消息发送者
  • x-delay 时间到期后重新投递消息到指定队列

使用 DelayExchange

  1. 首先声明 DelayExchange 交换机

注解声明推荐

@RabbitListener(bindings = @QueueBinding(
        value = @Queue("delay.queue"), // 队列
        exchange = @Exchange(value = "delay.direct", delayed = "true"), // Dealay 交换机
        key = "delay"
))
public void listenDelayQueue(String msg) {
    log.info("接收到 delay.queue的延迟消息{}", msg);
}

基于 Bean 方式
image.png

  1. 发送消息

发送消息时一定要携带 x-delay 属性指定延迟的时间

@Test
public void testDelayMsg() {
    // 创建消息
    Message message = MessageBuilder
            .withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
            .setHeader("x-delay", 10000)
            .build();
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);
}

5.3 惰性队列

5.3.1 消息堆积问题🔥🔥🔥

当生产者发送消息的速度超过了消费者处理消息的速度就会导致队列中的消息堆积直到队列存储消息达到上限。之后发送的消息就会成为死信可能会被丢弃这就是消息堆积问题
image.png
解决消息堆积有三种种思路

  • 增加更多消费者提高消费速度
  • 在消费者内开启线程池加快消息处理速度
  • 扩大队列容积提高堆积上限

5.3.2 惰性队列

从 RabbitMQ 的 3.6.0 版本开始就增加了 Lazy Queues 的概念也就是惰性队列。惰性队列的特征如下

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储

惰性队列的优点有哪些

  • 基于磁盘存储消息上限高
  • 没有间歇性的page-out性能比较稳定

惰性队列的缺点有哪些

  • 基于磁盘存储消息时效性会降低
  • 性能受限于磁盘IO

基于命令行设置lazy-queue
而要设置一个队列为惰性队列只需要在声明队列时指定x-queue-mode属性为lazy即可。可以通过命令行将一个运行中的队列修改为惰性队列

rabbitmqctl set_policy Lazy "^simple.queue$" '{"queue-mode":"lazy"}' --apply-to queues

命令解读

  • rabbitmqctl RabbitMQ的命令行工具
  • set_policy 添加一个策略
  • Lazy 策略名称可以自定义
  • "^lazy-queue$" 用正则表达式匹配队列的名字
  • '{"queue-mode":"lazy"}' 设置队列模式为lazy模式
  • --apply-to queues策略的作用对象是所有的队列

基于@Bean声明lazy-queue

@Bean
public Queue lazyQueue() {
    return QueueBuilder.durable("lazy.queue")
            .lazy() // 开启 x-queue-mode 为 lazy
            .build();
}

基于@RabbitListener声明LazyQueue

@RabbitListener(queuesToDeclare = @Queue(
        value = "lazy.queue",
        durable = "true",
        arguments = @Argument(name = "x-queue-mode", value = "lazy") // 惰性队列
))
public void listenLazyQueue(String msg) {
    log.info("接收到 lazy.queue的消息{}", msg);
}

5.4 MQ 集群

集群搭建这部分一般开发不会搭建而是运维搭建了解即可但是我们需要知道 MQ 的集群以及特点
RabbitMQ的是基于Erlang语言编写而Erlang又是一个面向并发的语言天然支持集群模式。RabbitMQ的集群有两种模式

  • 普通模式普通模式集群提高了并发能力但是不进行数据同步每个MQ都有自己的队列、数据信息其它元数据信息如交换机等会同步。例如我们有2个MQmq1和mq2如果你的消息在mq1而你连接到了mq2那么mq2会去mq1拉取消息然后返回给你。如果mq1宕机消息就会丢失。
  • 镜像模式与普通模式不同队列会在各个mq的镜像节点之间同步因此你连接到任何一个镜像节点均可获取到消息提高了数据的可用性。而且如果一个节点宕机并不会导致数据丢失。不过这种方式增加了数据同步的带宽消耗。

镜像集群虽然支持主从但主从同步并不是强一致的某些情况下可能有数据丢失的风险。因此在RabbitMQ的3.8版本以后推出了新的功能仲裁队列来代替镜像集群底层采用Raft协议确保主从的数据一致性。

5.4.1 普通集群

普通集群或者叫标准集群classic cluster具备下列特征

  • 会在集群的各个节点间共享部分数据包括交换机、队列元信息。不包含队列中的消息
  • 当访问集群某节点时如果队列不在该节点会从数据所在节点传递到当前节点并返回
  • 队列所在节点宕机队列中的消息就会丢失

image.png

5.4.2 镜像集群

在普通集群中一旦创建队列的主机宕机队列就会不可用。不具备高可用能力。如果要解决这个问题必须使用官方提供的镜像集群方案
官方文档地址https://www.rabbitmq.com/ha.html
镜像集群本质是主从模式具备下面的特征

  • 交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份。
  • 创建队列的节点被称为该队列的主节点备份到的其它节点叫做该队列的镜像节点。
  • 一个队列的主节点可能是另一个队列的镜像节点
  • 所有操作都是主节点完成然后同步给镜像节点
  • 主宕机后镜像节点会替代成新的主节点如果在主从同步完成前主就已经宕机可能出现数据丢失

image.png

5.4.3 仲裁队列

仲裁队列仲裁队列是3.8版本以后才有的新功能用来替代镜像队列具备下列特征

  • 与镜像队列一样都是主从模式支持主从数据同步
  • 使用非常简单没有复杂的配置
  • 主从同步基于Raft协议强一致

用 Java 代码创建仲裁队列

@Bean
public Queue quorumQueue() {
    return QueueBuilder
        .durable("quorum.queue") // 持久化
        .quorum() // 仲裁队列
        .build();
}

SpringAMQP 连接 MQ 集群

spring:
  rabbitmq:
    addresses: 192.168.80.128:8071, 192.168.80.128:8072, 192.168.80.128:8073
    username: muziteng
    password: 806823
    virtual-host: /

注意这里用 address 来代替 host、port 方式

更多知识在我的语雀知识库https://www.yuque.com/ambition-bcpii/muziteng

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: RabbitMQ