RabbitMQ消息中间件-CSDN博客

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

一、初始MQ

首先了解一下微服务间通讯有同步和异步两种方式

- 同步通讯是指两个或多个系统在进行信息交换时必须在同一时刻进行操作
- 异步通讯是指两个或多个系统之间的通讯方式其中发送方和接收方不是在同一时刻进行操作。

同步调用的优点

- 时效性较强可以立即得到结果

同步调用的缺点

- 多个系统间耦合扩展及后续维护繁琐
- 性能和吞吐能力下降
- 有额外的资源消耗
- 有级联失败风险

异步通讯
优势一服务解耦
优势二性能提升吞吐量提高
优势三服务没有强依赖不担心级联失败问题
优势四流量削峰

缺点

- 架构复杂了业务没有明显的流程线不好管理对程序员的技术要求高了
- 需要依赖于Broker的可靠、安全、性能搭建集群

1. 技术对比

MQ中文是消息队列Message Queue字面来看就是存放消息的队列。
比较常见的MQ实现也被称为消息中间件:

- ActiveMQ
- **RabbitMQ**
- **RocketMQ**
- Kafka

几种常见MQ的对比
对比RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQPXMPP SMTPSTOMPOpenWire,STOMP REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般
选择原则

- 追求可用性Kafka、 RocketMQ 、RabbitMQ
- 追求可靠性RabbitMQ、RocketMQ
- 追求吞吐能力RocketMQ、Kafka
- 追求消息低延迟RabbitMQ、Kafka

2. 原生JavaAPI实现MQ

在这之前先认识RabbitMQ中的一些角色

- publisher生产者使用Java代码发送消息
- consumer消费者使用Java代码接收消息
- exchange交换机负责消息路由
- queue队列存储消息
- virtualHost虚拟主机隔离不同租户的exchange、queue、消息
2.1 MQ的消息模型
- 简单队列
- 工作队列模式
- 发布订阅模式
- Fanout广播
- Direct定向模式
- Topic主题
- 消息转换器

下面使用原生API只展示简单队列模式
2.2 原生JavaAPI实现简单队列
简单队列模式的模型图
Publisher
Queue
Consumer
- publisher消息发布者将消息发送到队列queue
- queue消息队列负责接收并缓存消息
- consumer订阅队列处理队列中的消息
下面使用的是官方提供的原生JavaAPI完成的不用自己手敲代码练习下面有利用Spring简化开发的方案
//生产端publisher实现
public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数分别是主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.200.130");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("用户名");//设置自己的用户名和密码
        factory.setPassword("*****");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.发送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功【" + message + "】");

        // 5.关闭通道和连接
        channel.close();
        connection.close();

    }
}
/***********************************************************************************************/
//消费端consumer实现

public class ConsumerTest {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数分别是主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.200.130");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("用户");  //用户密码和上面的生产端保持一致
        factory.setPassword("*****");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.订阅消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, 
                                       Envelope envelope,
                                       AMQP.BasicProperties properties, 
                                       byte[] body) throws IOException {
                // 5.处理消息
                String message = new String(body);
                System.out.println("接收到消息【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }
}

小结

基本消息队列的消息发送流程

1. 建立connection

2. 创建channel

3. 利用channel声明队列

4. 利用channel向队列发送消息

基本消息队列的消息接收流程

1. 建立connection

2. 创建channel

3. 利用channel声明队列

4. 定义consumer的消费行为handleDelivery()

5. 利用channel将消费者与队列绑定

3.基于SpringAMQP实现MQ

SpringAMQP是基于RabbitMQ封装的一套模板并且利用SpringBoot对其实现了自动装配使用起来非常方便。

SpringAmqp的官方地址https://spring.io/projects/spring-amqp

SpringAMQP提供了三个功能

- 自动声明队列、交换机及其绑定关系代码+注解
- 封装了RabbitTemplate工具用于发送消息 rabbitTemplate.convertAndSend()
- 基于注解的监听器模式异步接收消息@RabbitListener
Publisher
Queue
Consumer
在父工程中引入依赖
<!--AMQP依赖包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1.简单队列
消息发送 

首先配置MQ地址在publisher服务的application.yml中添加配置
spring:
  rabbitmq:
    host: 192.168.200.130 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: 用户名 # 自己的用户名不能为中文和密码
    password: *****
    在publisher服务中编写测试类SpringAmqpTest并利用RabbitTemplate实现消息发送。
    代码实现如下
@SpringBootTest
public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTempslate;

    @Test //不要导错包用比较长的import org.junit.jupiter.api.Test;
    public void testSimpleQueue() {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 发送消息此处并不会自动创建队列
        rabbitTemplate.convertAndSend(queueName, message);
    }
}
消息接收:
    首先配置MQ地址在consumer服务的application.yml中添加配置
    
spring:
  rabbitmq:
    host: 192.168.200.130 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: itcast # 用户名
    password: 123321 # 密码    
在consumer服务的中新建一个类SpringRabbitListener
@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) {
        System.out.println("spring 消费者接收到消息【" + msg + "】");
    }
}
2.工作队列(Work queues)
    当消息处理比较耗时的时候可能生产消息的速度会远远大于消息的消费速度。长此以往
    消息就会堆积越来越多无法及时处理。如何解决呢
    
  - 那我们可以让多个消费者绑定到一个队列共同消费队列中的消息。
 这个就称为Work queues也被称为Task queues任务模型。可以使用work 模型
 多个消费者共同处理消息处理速度就能大大提高了。
Publisher
Queue
Consumer 1
Consumer 2
消息发送:

    在publisher服务中的SpringAmqpTest类中添加一个测试方法
/**
     * workQueue
     * 向队列中不停发送消息模拟消息堆积。
     */
@Test
public void testWorkQueue() throws InterruptedException {
    // 队列名称
    String queueName = "simple.queue";
    // 消息
    String message = "hello, message_";
    for (int i = 1; i <= 50; i++) {
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message + i);
        Thread.sleep(20);
    }
}
消息接收:
     要模拟多个消费者绑定同一个队列我们在consumer中添加2个新的方法
//@RabbitListener(queues = "simple.queue")
//public void listenSimpleQueueMessage(String msg) {
//    System.out.println(msg);
//}

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
    System.out.println(LocalTime.now() + "消费者1" + msg);
    Thread.sleep(20);
}

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
    System.err.println(LocalTime.now() + "消费者2" + msg);
    Thread.sleep(200);
}
运算之后得到结果消息是平均分配给每个消费者并没有考虑到消费者的处理能力。这样显然是有问题的。
怎样解决这个问题呢

我们可以修改consumer服务的application.yml文件添加配置

spring:
  rabbitmq:
    host: 192.168.200.130 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: 用户名 # 自己的用户名和密码
    password: **** 
    listener: #监听
      simple: #简单消息模型
        prefetch: 1  #每次只能获取一条消息处理完成才能获取下一个消息
        
 Work模型的使用

- 多个消费者绑定到一个队列同一条消息只会被一个消费者处理
- 通过设置prefetch来控制消费者预取的消息数量
3.发布/订阅
发布订阅的模型如图
Publisher
exchange
Queue1
Consumer 1
Consumer 2
Queue2
Consumer 3
在订阅模型中多了一个exchange角色而且过程略有变化

- Publisher生产者也就是要发送消息的程序但是不再发送到队列中而是发给exchage交换机

- Consumer消费者与以前一样订阅队列没有变化

- Queue消息队列也与以前一样接收消息、缓存消息。

- Exchange交换机消息路由。一方面接收生产者发送的消息。另一方面知道如何处理消息
- 例如递交给某个特别队列、递交给所有队列、或将消息丢弃。到底如何操作取决于Exchange的类型。

  Exchange有以下3种类型

  - Fanout扇出广播将消息交给所有绑定到交换机的队列
  - Direct定向把消息交给符合指定routing key 的队列
  - Topic通配符把消息交给符合routing pattern路由模式 的队列

Exchange交换机只负责转发消息不具备存储消息的能力因此如果没有任何队列与Exchange绑定
或者没有符合路由规则的队列那么消息会丢失
4.Fanout广播
Fanout英文翻译是扇出在MQ中理解成广播更合适。
Publisher
exchange
Queue1
Consumer 1
Queue2
Consumer 2
在广播模式下消息发送流程是这样的

- 1  可以有多个队列
- 2  每个队列都要绑定到Exchange交换机
- 3  生产者发送的消息只能发送到交换机交换机来决定要发给哪个队列生产者无法决定
- 4  交换机把消息发送给绑定过的所有队列
- 5  订阅队列的消费者都能拿到消息

声明队列和交换机

Spring提供了一个接口Exchange来表示所有不同类型的交换机UML类图

在这里插入图片描述

在consumer服务中创建一个类声明队列和交换机
    
@Configuration
public class FanoutConfig {
    /**
     * 声明交换机
     * @return Fanout类型交换机
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("itcast.fanout");
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean // 方法中的参数从IoC容器中获取
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
} 
消息发送:
在publisher服务的SpringAmqpTest类中添加测试方法
@Test
public void testFanoutExchange() {
    // 交换机名称
    String exchangeName = "itcast.fanout";
    // 消息
    String message = "hello, everyone!";
    rabbitTemplate.convertAndSend(exchangeName, "", message);
}
消息接收
在consumer服务的SpringRabbitListener中添加两个方法作为消费者

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
    System.out.println("消费者1接收到Fanout消息【" + msg + "】");
}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
    System.out.println("消费者2接收到Fanout消息【" + msg + "】");
}
交换机的作用是什么

- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
- 不能缓存消息路由失败消息丢失
- FanoutExchange的会将消息路由到每个绑定的队列

声明队列、交换机、绑定关系的Bean是什么

- Queue
- FanoutExchange
- Binding
5.Direct定向
在Fanout模式中一条消息会被所有订阅的队列都消费。
但是在某些场景下我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
key:blue
key:red
Publisher
exchange
Queue
Consumer 1
Queue2
Consumer 2
 在Direct模型下

- 队列与交换机的绑定不能是任意绑定了而是要指定一个`RoutingKey`路由key。
- 消息的发送方在 向 Exchange发送消息时也必须指定消息的 `RoutingKey`。
- Exchange不再把消息交给每一个绑定的队列而是根据消息的`Routing Key`进行判断
只有队列的`Routingkey`与消息的 `Routing key`完全一致才会接收到消息。
案例需求如下:

1. 利用@RabbitListener声明Exchange、Queue、RoutingKey

2. 在consumer服务中编写两个消费者方法分别监听direct.queue1和direct.queue2

3. 在publisher中编写测试方法向itcast. direct发送消息
声明队列和交换机
    基于@Bean的方式声明队列和交换机比较麻烦Spring还提供了基于注解方式来声明。
    
    在consumer的SpringRabbitListener中添加两个消费者同时基于注解来声明队列和交换机
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"), //创建队列
    exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),//创建交换机
    key = {"red", "blue"} //绑定接受消息的key
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者接收到direct.queue1的消息【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消费者接收到direct.queue2的消息【" + msg + "】");
}
消息发送

在publisher服务的SpringAmqpTest类中添加测试方法

@Test
public void testSendDirectExchange() {
    // 交换机名称
    String exchangeName = "itcast.direct";
    // 消息
    String message = "红色警报日本乱排核废水导致海洋生物变异惊现哥斯拉";
    // 发送消息key=red两个消费者都能收到消息
    rabbitTemplate.convertAndSend(exchangeName, "red", message);
    
    // 发送消息key=blue消费者1 能收到消息
    rabbitTemplate.convertAndSend(exchangeName, "blue", message);
    
    // 发送消息key=yellow消费者2 能收到消息
    rabbitTemplate.convertAndSend(exchangeName, "yellow", message);
}
总结
Direct交换机与Fanout交换机的差异

- Fanout交换机将消息路由给每一个与之绑定的队列
- Direct交换机根据RoutingKey判断路由给哪个队列
- 如果多个队列具有相同的RoutingKey则与Fanout功能类似

基于@RabbitListener注解声明队列和交换机的常见注解

- 开始声明bindings= ? 
- 指定一个绑定关系 @QueueBinding
- 声明队列value = @Queue
- 声明交换机exchange = @Exchange
- 指定路由keykey = {一个或多个}
6.Topic主题
`Topic`类型的`Exchange`与`Direct`相比

- 相同点都可以根据`RoutingKey`把消息路由到不同的队列
- 不同点`Topic`类型`Exchange`可以让队列在绑定`Routing key` 的时候使用通配符

> `Routingkey` 一般都是有一个或多个单词组成多个单词之间以”.”分割
例如 `item.insert`, item.del

> 通配符规则
>
> `#`匹配零个一个或多个词任意多个【常用】
>
> `*`匹配不多不少必须是1个词
topic
bindingKey1
bindingKey2
bindingKey3
bindingKey4
Publisher
exchange
Queue1
Consumer 1
Queue2
Consumer 2
Queue3
Consumer 3
Queue4
Consumer 4
举例
demo.#能够匹配demo, demo.spu, demo.spu.insert
demo.*只能匹配demo.spu
实现思路如下

1. 并利用@RabbitListener声明Exchange、Queue、RoutingKey

2. 在consumer服务中编写两个消费者方法分别监听topic.queue1和topic.queue2

3. 在publisher中编写测试方法向itcast. topic发送消息

- Queue1假设绑定的是`china.#` 因此凡是以 `china.`开头的`routing key` 都会被匹配到。
- 包括china.news和china.weather
- Queue2假设绑定的是`#.news` 因此凡是以 `.news`结尾的 `routing key` 都会被匹配。
- 包括china.news和japan.news
消息接收
    在consumer服务的SpringRabbitListener中添加方法

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "demo.queue1"),
    exchange = @Exchange(name = "demo.topic", type = ExchangeTypes.TOPIC),
    key = "china.#"
))
public void listenTopicQueue1(String msg){
    System.out.println("消费者接收到topic.queue1的消息【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "demo.queue2"),
    exchange = @Exchange(name = "demo.topic", type = ExchangeTypes.TOPIC),
    key = "#.news"
))
public void listenTopicQueue2(String msg){
    System.out.println("消费者接收到demo.queue2的消息【" + msg + "】");
}
消息发送
    在publisher服务的SpringAmqpTest类中添加测试方法:
/**
     * topicExchange
     */
@Test
public void testSendTopicExchange() {
    // 交换机名称
    String exchangeName = "demo.topic";
    // 消息
    String message = "喜报孙悟空大战哥斯拉胜!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
    
     // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.weather", "明天天气晴20-36度");
}
# 总结

描述下Direct交换机与Topic交换机的差异

- Topic交换机接收的消息RoutingKey必须是多个单词以 `**.**` 分割
- Topic交换机与队列绑定时的bindingKey可以指定通配符
  - `#`代表0个1个或多个词
  - `*`代表1个词
7.消息转换器
Spring会把你发送的消息序列化为字节发送给MQ接收消息的时候还会把字节反序列化为下面的message对象。
void convertAndSend(String exchange, String routingKey,Object message) throw AmqpException;
默认情况下Spring采用的序列化方式是JDK序列化。众所周知JDK序列化存在下列问题

- 数据体积过大
- 可读性差
测试默认转换器:

@Test
public void testSendMap() throws InterruptedException {
    // 准备消息
    Map<String,Object> msg = new HashMap<>();
    msg.put("name", "Jack");
    msg.put("age", 21);
    // 发送消息
    rabbitTemplate.convertAndSend("simple.queue", msg);
}
1、执行前先停止consumer服务防止消息被消费掉无法在RabbitMQ控制台看到
2、MQ服务上没有simple.queue临时通过管理端快速创建一个
发送消息后查看控制台

在这里插入图片描述

# 配置JSON转换器

   显然JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高因此可以使用JSON方式来做序列化
   和反序列化。
   
1、在publisher和consumer两个服务中都引入依赖因此咱们选择在父工程添加
<!-- mq-demo的pom.xmljacksonSpringBoot用的 -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
2、配置消息转换器

#在PublisherApplication和ConsumerApplication两个启动类中都添加一个Bean
PublisherApplication作用Java对象 =====》JSON字符串

import org.springframework.amqp.support.converter.MessageConverter;

@SpringBootApplication
public class PublisherApplication {
    public static void main(String[] args) {
        SpringApplication.run(PublisherApplication.class);
    }

    @Bean //注意导包org.springframework.amqp.support.converter.MessageConverter
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}
3、重新发送消息通过管理平台查询效果

在这里插入图片描述

4、接受消息SpringRabbitListener
@RabbitListener(queues = "simple.queue")
public void listenObjectQueue(Map<String,Object> msg){
    System.out.println("接收到object.queue的消息" + msg);
}

二、MQ高级

1.消息可靠性

消息从发送到消费者接收会经历多个过程
Publisher
exchange
Queue1
Consumer 1
Queue2
Consumer 2
其中的每一步都可能导致消息丢失常见的丢失原因包括

- 发送时丢失
  - 生产者发送的消息未送达exchange
  - 消息到达exchange后未到达queue
- MQ宕机queue将消息丢失
- consumer接收到消息后未消费就宕机

针对这些问题RabbitMQ分别给出了解决方案

- 生产者确认机制发送时丢失
- 消息持久化MQ宕机
- 消费者确认机制消费者宕机
- 失败重试机制消费失败
1.1.生产者消息确认
    RabbitMQ提供了生产者确认机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。
    消息发送到MQ以后会返回一个结果给发送者表示消息是否处理成功。
    
返回结果有两种方式

- publisher-confirm发送者确认
  - 消息成功投递到交换机返回ack
  - 消息未投递到交换机返回nack
  
- publisher-return发送者回执
  - 消息投递到交换机了但是没有路由到队列。返回通知及路由失败原因。
  - 正常到达队列没有任何回复没有回复就是成功
确认机制发送消息时需要给每个消息设置一个全局唯一Id以区分不同消息避免ack冲突。
举个栗子

在这里插入图片描述

修改publisher服务中的application.yml文件添加下面的内容

spring:
  rabbitmq:
    host: 192.168.200.130 # rabbitMQ的ip地址
    port: 5672 # 端口
    username: itcast
    password: 123321
    virtual-host: /
    publisher-confirm-type: correlated #判断是否到达交换机异步通知
    publisher-returns: true #判断是否到达队列
    template:
      mandatory: true #定义消息路由失败时的策略
解释说明一下

- `publish-confirm-type`开启publisher-confirm这里支持两种类型
  - `simple`同步等待confirm结果直到超时【一般不使用影响性能】
  - `correlated`异步回调定义ConfirmCallbackMQ返回结果时会回调这个ConfirmCallback
  
- `publish-returns`
  - `true`开启publish-return功能同样是基于callback机制不过是定义ReturnCallback
  
  - `false`关闭publish-return功能
- `template.mandatory`定义消息路由失败时的策略。
  - true则调用ReturnCallback
  - false则直接丢弃消息
修改consumer服务中的application.yml改为自己的虚拟机IP

spring:
  rabbitmq:
    host: 192.168.200.130 # rabbitMQ的ip地址
    port: 5672 # 端口
    username: itcast
    password: 123321
    virtual-host: /
定义Return回调:
    每个RabbitTemplate只能配置一个ReturnCallback因此需要在项目启动过程中配置
作用 如果消息没有到达队列会执行回调方法

修改publisher服务添加一个ReturnCallback
package cn.itcast.mq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
//ApplicationContextAware: 在Spring容器Bean工厂创建好的时候通知咱们
public class CommonConfig implements ApplicationContextAware {
    
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 设置ReturnCallback先用匿名内部类
        rabbitTemplate.setReturnCallback(
            (message, replyCode, replyText, exchange, routingKey) -> {
            // 投递失败没有到达队列记录日志
            log.error("消息队列接收失败应答码{}原因{}交换机{}路由键{},消息{}",
                     replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有业务需要可以重发消息
            //rabbitTemplate.convertAndSend(exchange, routingKey, message);
        });
    }
}
定义Confirm回调:

    ConfirmCallback可以在发送消息时指定因为每个业务处理confirm成功或失败的逻辑不一定相同。
    在publisher服务的cn.itcast.mq.spring.SpringAmqpTest类中定义一个单元测试方法
@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {
    // 1.消息体
    String message = "hello, spring amqp!";
    // 2.全局唯一的消息ID需要封装到CorrelationData中
    //uuid, 雪花算法
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 3.添加callback
    correlationData.getFuture().addCallback(
            result -> {
                if(result.isAck()){
                    // 3.1.ack消息成功
                    log.debug("消息发送到交换机成功, ID:{}", correlationData.getId());
                }else{
                    // 3.2.nack消息失败
                    log.error("消息发送到交换机失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());
                }
            },
            ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
    );
    // 4.发送消息其中"simple.test"是路由key
    rabbitTemplate.convertAndSend(
            "amq.topic", "simple.test", message, correlationData);

    // 休眠一会儿等待ack回执
    //如果不休眠程序就直接结束了RabbitMQ服务器就无法回调咱们写的代码
    Thread.sleep(2000);
}
登录到MQ的管理端
# 测试

1、发送到一个不存在的交换机camq.topic
rabbitTemplate.convertAndSend(
      "camq.topic", "simple.test", message, correlationData);

//查看日志会有一个没有到达交换机的信息
2、发送到一个已经存在的交换机amq.topic系统自带的但没有绑定指定的路由
rabbitTemplate.convertAndSend(
      "amq.topic", "simple.test", message, correlationData);

//查看日志没有路由到队列
3、通过管理端指定amq.topic交换机的路由key到simple.queue

在这里插入图片描述

rabbitTemplate.convertAndSend(
      "amq.topic", "simple.test", message, correlationData);

//成功发送需要到管理端查看一下队列中是否有消息
1.2.消息持久化
生产者确认可以确保消息投递到RabbitMQ的队列中但是消息发送到RabbitMQ以后如果突然宕机
也可能导致消息丢失。

要想确保消息在RabbitMQ中安全保存必须开启消息持久化机制。

- 交换机持久化
- 队列持久化
- 消息持久化
# 交换机持久化

RabbitMQ中交换机默认是非持久化的mq重启后就丢失。

SpringAMQP中可以通过代码指定交换机持久化
@Bean
public DirectExchange simpleExchange(){
    // 三个参数交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
    //durable: 持久化
    return new DirectExchange("simple.direct", true, false);
    
    //默认创建就是持久化的交换机
    //return new DirectExchange("simple.direct");
}
提示由SpringAMQP声明的交换机都是持久化的
可以在RabbitMQ控制台看到持久化的交换机都会带上`D`的标示

在这里插入图片描述

# 队列持久化

RabbitMQ中队列默认是非持久化的mq重启后就丢失。

SpringAMQP中可以通过代码指定交换机持久化
@Bean
public Queue simpleQueue(){
    // 使用QueueBuilder构建队列durable就是持久化的
    return QueueBuilder.durable("simple.queue").build();
    
    //return new Queue("simple.queue");
}
提示由SpringAMQP声明的交换机都是持久化的

在这里插入图片描述

# 消息持久化

利用SpringAMQP发送消息时可以设置消息的属性MessageProperties指定delivery-mode

- 非持久化MessageDeliveryMode.NON_PERSISTENT
- 持久化MessageDeliveryMode.PERSISTENT

用java代码指定
@Test
public void testDurableMessage() {
    // 1.准备消息
    Message message = MessageBuilder
            .withBody("hello, spring".getBytes(StandardCharsets.UTF_8))
            .setDeliveryMode(MessageDeliveryMode.PERSISTENT) //设置消息的属性持久化
            .build();
    // 2.发送消息
    rabbitTemplate.convertAndSend("simple.queue", message);
}
提示由SpringAMQP声明的交换机都是持久化的
1.3.消费者消息确认
RabbitMQ确认消息被消费者消费后会立刻删除。

而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的消费者获取消息后应该向RabbitMQ发送ACK回
执表明自己已经处理消息。

设想这样的场景

- 1RabbitMQ投递消息给消费者
- 2消费者获取消息后返回ACK给RabbitMQ
- 3RabbitMQ删除消息
- 4消费者宕机消息尚未处理

这样消息就丢失了。因此消费者返回ACK的时机非常重要。

/********************************************************************************

而SpringAMQP则允许配置三种确认模式

- manual手动ack需要在处理完消息后调用api发送ack【麻烦一般不使用】。
- auto自动ack由spring监测listener代码是否出现异常没有异常则返回ack抛出异常则返回nack
- none关闭ackMQ假定消费者获取消息后肯定会成功处理因此消息投递后立即被删除



由此可知

- manual自己根据业务情况判断什么时候该ack太麻烦不使用
- auto模式类似事务机制出现异常时返回nack消息回滚到mq没有异常返回ack
- none模式下消息投递是不可靠的可能丢失不适合用在项目中

因此我们都是使用默认的auto即可。
# none模式
修改consumer服务的application.yml文件添加下面内容

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: none # 关闭ack
修改consumer服务的SpringRabbitListener类中的方法模拟一个消息处理异常
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
    log.info("消费者接收到simple.queue的消息【{}】", msg);
    // 模拟异常 ->给MQ返回nack
    System.out.println(1 / 0);
    log.debug("消息成功处理完成");
}

//  测试可以发现当消息处理抛异常时消息依然被RabbitMQ删除了
# auto模式
# 再次把确认机制修改为auto:

spring:
  rabbitmq:
    listener:
      simple:
        #消费成功返回ack
        #消费失败返回nack
        acknowledge-mode: auto # 根据异常自动ack
在异常位置打断点再次发送消息程序卡在断点时可以发现此时消息状态为unack未确定状态

在这里插入图片描述

抛出异常后因为Spring会自动返回nack所以消息恢复至Ready状态并且没有被RabbitMQ删除

在这里插入图片描述

1.4.消费失败重试机制
当消费者出现异常后消息会不断requeue重入队到队列再重新发送给消费者然后再次异常再次requeue
无限循环导致mq的消息处理飙升带来不必要的压力

在这里插入图片描述

怎么办呢
# 本地重试

我们可以利用Spring的retry机制在消费者出现异常时利用本地重试而不是无限制的requeue到mq队列。

修改consumer服务的application.yml文件添加内容
spring:
  rabbitmq:
    listener:
      simple:
        retry: #本地重试
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 # 初始的失败等待时长为1秒
          multiplier: 2 # 失败的等待时长倍数下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数包含服务器推送的第一次
          stateless: true # true无状态false有状态。如果业务中包含事务这里改为false
重启consumer服务重复之前的测试。可以发现

- 在重试3次后SpringAMQP会抛出异常AmqpRejectAndDontRequeueException说明本地重试触发了

- 查看RabbitMQ控制台发现消息被删除了RejectAndDontRequeue说明最后SpringAMQP返回的是ack
mq删除消息了

reject: 拒绝
don't re queue: 不要重新放到队列


# 结论

- 开启本地重试时消息处理过程中抛出异常不会requeue到队列而是在消费者本地重试
- 重试达到最大次数后Spring会返回ack给MQ服务器(reject+ not re queue)消息会被丢弃
失败策略:

在之前的测试中达到最大重试次数后消息会被丢弃这是由Spring内部机制决定的。

在开启重试模式后重试次数耗尽如果消息依然失败会有MessageRecoverer接口来处理
它包含三种不同的实现
- RejectAndDontRequeueRecoverer重试耗尽后直接reject丢弃消息。默认就是这种方式

- ImmediateRequeueMessageRecoverer本地重试耗尽后返回nack消息重新入队重新推送消息

- RepublishMessageRecoverer【最优方法】重试耗尽后将失败消息投递到指定的交换机后续人工介入来处理


处理方案是RepublishMessageRecoverer失败后将消息投递到一个指定的专门存放异常消息的队列
后续由人工集中处理。
1在consumer服务中定义处理失败消息的交换机和队列
    
package cn.itcast.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ErrorMessageConfig {
    
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }
    
    //TODO 指定失败处理策略
}
/**************************************************************************************************/
2定义一个RepublishMessageRecoverer关联队列和交换机
    
@Bean //非常特殊方法上有@Bean方法中所有的参数自动就有一个@Autowired
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    //最终效果将重试失败的消息重新发送到指定的交换机+路由key
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

/*********************************************************************************************************/
完整代码  
    
package cn.itcast.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;

@Configuration
public class ErrorMessageConfig {
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }

    @Bean //修改本地重试耗尽之后消息处理策略把消息发到指定的交换+key
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}
1.5.总结
如何确保RabbitMQ消息的可靠性

- 开启生产者确认机制确保生产者的消息能到达交换机和队列
- 开启持久化功能确保消息未消费前在队列中不会丢失
- 开启消费者确认机制为auto由spring确认消息处理成功后完成ack
- 开启消费者失败本地重试机制并设置MessageRecoverer多次重试失败后将消息投递到异常交换机
- 交由人工处理

2.死信交换机

2.1.认识死信
当一个队列中的消息满足下列情况之一时可以成为死信dead letter

- 消费者使用basic.reject或 basic.nack声明消费失败并且消息的requeue参数设置为false
- 消息是一个过期消息超时无人消费【利用此机制实现延迟消息】
- 要投递的队列消息满了无法投递

如果一个消息被消费者拒绝了变成了死信

在这里插入图片描述

如果这个包含死信的队列配置了`dead-letter-exchange`属性指定了一个交换机那么队列中的死信就会投递到
这个交换机中而这个交换机称为死信交换机Dead Letter Exchange简称DLX。

如果这个死信交换机也绑定了一个队列则消息最终会进入这个只存放死信的队列

在这里插入图片描述

因为simple.queue绑定了死信交换机 dl.direct并且设置了路由key因此死信最终会经过死信交换机路由给死信队列。

- 指定死信交换机名称dl.direct
- 指定死信交换机与死信队列绑定的RoutingKeydl

这样才能确保投递的消息能到达死信交换机并且正确的路由到死信队列。

> 下边代码只是为了演示对应图片中的配置不用添加到项目中
@Bean
public Queue simpleQueue(){
    //return new Queue("simple.queue");
    return QueueBuilder
        .durable("simple.queue") // 指定队列名称并持久化
        .deadLetterExchange("dl.direct") // 指定死信交换机
        .deadLetterRoutingKey("dl") //指定路由key
        .build();
}
# 总结:

什么样的消息会成为死信

- 消息被消费者reject或者返回nack并且设置了requeue=false
- 消息超时未消费
- 队列满了

死信交换机的使用场景是什么

- 如果队列绑定了死信交换机死信会投递到死信交换机
- 可以利用死信交换机收集所有消费者处理失败的消息死信交由人工处理进一步提高消息队列的可靠性。
2.2.TTL(过期时间)
TTL也就是Time-To-Live过期时间。如果一个队列中的消息TTL结束仍未消费则会变为死信。

TTL超时分为两种情况

- 消息本身设置了超时时间
- 消息所在的队列设置了超时时间

> 思考为什么要给消息或者队列设置过期时间呢

> 目的实现延迟任务的功能

> 比如要实现如下功能
>
> - 延迟10分钟发送短信给用户ttl = 10分钟
> - 用户下单如果用户在15 分钟内未支付则自动取消
> - 预约工作会议20分钟后自动通知所有参会人员
# 创建死信交换机

在consumer服务的SpringRabbitListener中定义一个新的消费者并且声明死信交换机 dl.direct、
死信队列 dl.queue
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "dl.queue", durable = "true"),
    exchange = @Exchange(name = "dl.direct"),
    key = "dl"
))
public void listenDlQueue(String msg){
    log.info("接收到 dl.queue的延迟消息{}", msg);
}
# 声明队列指定超时时间

在consumer服务中新建TTLMessageConfig创建ttl队列

- 设置超时时间ttl(10000)
- 指定死信交换机deadLetterExchange("dl.direct")
- 指定死信的路由keydeadLetterRoutingKey("dl")
package cn.itcast.mq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TTLMessageConfig {

    @Bean
    public Queue ttlQueue(){
        return QueueBuilder
                .durable("ttl.queue") // 指定队列名称并持久化
                .ttl(10000) // 设置队列的超时时间10秒
                .deadLetterExchange("dl.direct") // 指定死信交换机
                .deadLetterRoutingKey("dl")
                .build();
    }
    
    /**
     * 声明交换机将ttl队列与交换机绑定
     */
    @Bean
    public DirectExchange ttlExchange(){
        return new DirectExchange("ttl.direct");
    }
    
    @Bean
    public Binding ttlBinding(Queue ttlQueue, DirectExchange ttlExchange){
        return BindingBuilder.bind(ttlQueue).to(ttlExchange).with("ttl");
    }	
}
在publisher服务中发送消息
    
@Test
public void testTTLQueue() {
    // 创建消息
    String message = "hello, ttl queue";
    // 消息ID需要封装到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 发送消息
    rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
    // 记录日志
    log.debug("发送消息成功");
}
注意先启动消费者再发送消息

执行完之后观察时间戳可以看到消息发送与接收之间的时差大概是10秒。
# 发送消息时设定TTL

在发送消息时也可以指定TTL
@Test
public void testTTLMsg() {
    // 创建消息
    Message message = MessageBuilder
        .withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
        //setex : set expire
        .setExpiration("5000") //设置过期时间
        .build();
    // 消息ID需要封装到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 发送消息
    rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
    log.debug("发送消息成功");
}
查看发送消息日志

在这里插入图片描述

接收消息日志

在这里插入图片描述

这次发送与接收的延迟只有5秒。说明当队列、消息都设置了TTL时任意一个到期就会成为死信。
# 总结

消息超时的两种方式是

- 给队列设置ttl属性进入队列后超过ttl时间的消息变为死信
- 给消息设置ttl属性队列接收到消息超过ttl时间后变为死信

如何实现发送一个消息20秒后消费者才收到消息

- 给消息的目标队列指定死信交换机
- 将消费者监听的队列绑定到死信交换机
- 发送消息时给消息设置超时时间为20秒
2.3.延迟交换机插件
利用TTL结合死信交换机我们实现了消息发出后消费者延迟收到消息的效果。这种消息模式就称为延迟队列Delay Queue模式。

延迟队列的使用场景包括

- 延迟发送短信
- 用户下单如果用户在15 分钟内未支付则自动取消
- 预约工作会议20分钟后自动通知所有参会人员

因为延迟队列的需求非常多所以RabbitMQ的官方也推出了DelayExchange插件原生支持延迟队列效果。

参考RabbitMQ的插件列表页面https://www.rabbitmq.com/community-plugins.html

在这里插入图片描述

使用方式可以参考官网地址https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq
# 使用DelayExchange

插件的使用也非常简单

- 声明一个交换机交换机的类型可以是任意类型
- 设定delayed属性为true
- 声明队列与其绑定

# 1声明DelayExchange交换机

基于注解方式【常用】

注意如果MQ容器没有安装DelayExchange插件直接指定delayed=true启动项目时会报错
@RabbitListener(bindings = @QueueBinding(
          value = @Queue(name = "delay.queue",durable = "true"),
          exchange = @Exchange(name = "delay.direct",delayed = "true"),
           key="delay"
))
public void listenDelayedQueue(String msg){
    log.info("接受到 delay.queue的延迟消息 {}"msg);
}
在consumer服务的SpringRabbitListener中添加

> 优势代码简单

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenDelayExchange(String msg) {
    log.info("消费者接收到了delay.queue的延迟消息{}", msg);
}
第二种方式也可以基于@Bean的方式

> 优势清晰明了

在这里插入图片描述

# 2发送消息

发送消息时一定要携带x-delay属性指定延迟的时间
@Test
public void testDelayedMsg(){
    Message message = MessageBuilder
        .withBody("hello,delayed message",getBytes(StandardCharsets.UTF_8))
        .setHeader("x-delay",10000)
        .build();
    CorrelationData correlationData = new CorrelationData(UUID.random.UUID().toString());
    rabbitTemplate.convertAndSend("delay.direct","delay",message,correlationData);
    log.debug("发送消息成功");
}
@Test
public void testSendDelayMessage() throws InterruptedException {
    // 1.准备消息
    Message message = MessageBuilder
            .withBody("hello, delayed messsage".getBytes(StandardCharsets.UTF_8))
            .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
            .setHeader("x-delay", 10000) //时间必须是数字不能是字符串
            .build();
    // 2.准备CorrelationData
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 3.发送消息
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);

    log.info("发送消息成功");
}
此时idea控制台会有一个报错信息

在这里插入图片描述

原因很简单在之前课程中我们添加了定义发送者Return回调如果消息发送之后没有到达队列就会报错。

当使用插件发送消息时设置了x-delay=10000那消息只要没有到过期时间就不会路由到队列中
而是存在一个叫Mnesia的分布式数据库管理系统中。

在这里插入图片描述

因此需要在publisher服务的CommonConfig中判断是否为延迟消息

在这里插入图片描述

// 判断是否是延迟消息
Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
if (receivedDelay != null && receivedDelay > 0) {
    // 是一个延迟消息忽略这个错误提示
    return;
}
# 总结

延迟队列插件的使用步骤包括哪些

- 声明一个交换机添加delayed属性为true
- 发送消息时添加x-delay头值为超时时间必须是int值

3.惰性队列

# 消息堆积问题

   当生产者发送消息的速度超过了消费者处理消息的速度就会导致队列中的消息堆积
   直到队列存储消息达到上限。之后发送的消息就会成为死信可能会被丢弃这就是消息堆积问题。
队列满丢弃
Publisher
Queue
Consumer
死信
解决消息堆积有三种思路

- 增加更多消费者提高消费速度
- 在消费者内开启线程池多线程处理加快消息处理速度
- 惰性队列扩大队列容积提高堆积上限
从RabbitMQ的3.6.0版本开始就增加了Lazy Queues的概念也就是惰性队列。惰性队列的特征如下

- 接收到消息后直接存入磁盘而非内存缺点速度会变慢
- 消费者要消费消息时才会从磁盘中读取并加载到内存最终推送给消费者
- 支持数百万条的消息存储
3.1.基于命令行设置lazy-queue
(本操作是Linux操作系统进行的)

设置一个队列为惰性队列只需要在声明队列时指定x-queue-mode属性为lazy即可。

可以通过命令行将一个运行中的队列修改为惰性队列


#进入MQ容器
docker exec -it mq bash

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues  
命令解读

- `rabbitmqctl` RabbitMQ的命令行工具
- `set_policy` 添加一个策略
- `Lazy` 策略名称可以自定义
- `"^lazy-queue$"` 用正则表达式匹配队列的名字
- `'{"queue-mode":"lazy"}'` 设置队列模式为lazy模式
- `--apply-to queues  `策略的作用对象是所有的队列
3.2.@Bean声明lazy-queue

@Bean
public Queue lazyQueue(){
    return QueueBuilder
          .durable("lazy.queue")
          .lazy()//开启x-queue-mode为lazy
          .build();
}
package cn.itcast.mq.config;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class LazyConfig {

    @Bean
    public Queue lazyQueue() {
        return QueueBuilder
            	.durable("lazy.queue")
                .lazy() //指定是惰性队列
                .build();
    }
    
    @Bean
    public Queue normalQueue() {
        return QueueBuilder.durable("normal.queue").build();
    }
}
重启cousumer服务确认已经创建了以上两个队列

在这里插入图片描述

3.3.注解声明LazyQueue
此处没有给队列绑定交换机因此使用的是queuesToDeclare = ?而不是bindings = 
@RabbitListener(queuesToDeclare = @Queue(
        name = "lazy.queue",
        durable = "true",
        arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listLazyQueue(String msg) {
    log.info("接收到 lazy.queue 的消息{}", msg);
}
3.4.测试
先把cousumer服务停掉不然发送的消息都被消费掉了无法观察效果
1、在publisher服务的SpringAmqpTest中发送消息到惰性队列
@Test
public void testLazyQueue() throws InterruptedException {
    long b = System.currentTimeMillis();
    for (int i = 0; i < 1000; i++) {
        // 1.准备消息
        Message message = MessageBuilder
                .withBody("hello, Spring".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
                .build();
        // 2.发送消息
        rabbitTemplate.convertAndSend("lazy.queue", message);
    }
    long e = System.currentTimeMillis();
    System.out.println(e - b);
}
发现消息都在磁盘中

在这里插入图片描述

2、也可以发送到普通队列做为对比
@Test
public void testNormalQueue() throws InterruptedException {
    long b = System.currentTimeMillis();
    for (int i = 0; i < 1000; i++) {
        // 1.准备消息
        Message message = MessageBuilder
                .withBody("hello, Spring".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
                .build();
        // 2.发送消息
        rabbitTemplate.convertAndSend("normal.queue", message);
    }
    long e = System.currentTimeMillis();
    System.out.println(e - b);
}
发现消息都在内存中

在这里插入图片描述

# 总结

消息堆积问题的解决方案

- 队列上绑定多个消费者提高消费速度
- 在消费者内开启线程池多线程处理加快消息处理速度
- 使用惰性队列可以再mq中保存更多消息

惰性队列的优点有哪些

- 基于磁盘存储消息上限高
- 没有间歇性的page-out性能比较稳定

惰性队列的缺点有哪些

- 基于磁盘存储消息时效性会降低
- 性能受限于磁盘的IO

4.MQ集群

4.1.集群分类
RabbitMQ的是基于Erlang语言编写而Erlang又是一个面向并发的语言天然支持集群模式。

RabbitMQ的集群有两种模式

- 普通集群是一种分布式集群将队列分散到集群的各个节点从而提高整个集群的并发能力。
- 镜像集群是一种主从集群在普通集群的基础上添加了主从备份功能提高集群的数据可用性。

镜像集群虽然支持主从但主从同步并不是强一致的某些情况下可能有数据丢失的风险。

因此在RabbitMQ的3.8版本以后推出了新的功能仲裁队列来代替镜像集群底层采用Raft协议确保
主从的数据一致性。
4.2.普通集群
# 集群结构和特征

普通集群或者叫标准集群classic cluster具备下列特征

- 会在集群的各个节点间共享部分数据包括交换机、队列元信息。不包含队列中的消息。
- 当访问集群某节点时如果队列不在该节点会从数据所在节点传递到当前节点并返回
- 队列所在节点宕机队列中的消息就会丢失

结构如图

在这里插入图片描述

4.3.镜像集群
# 集群结构和特征

镜像集群本质是主从模式具备下面的特征

- 交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份。
- 创建队列的节点被称为该队列的**主节点**备份到的其它节点叫做该队列的**镜像**节点。
- 一个队列的主节点可能是另一个队列的镜像节点
- 所有操作都是主节点完成然后同步给镜像节点
- 主宕机后镜像节点会替代成新的主

结构如图

在这里插入图片描述

4.4.仲裁队列
# 集群特征

镜像集群虽然支持主从但主从同步并不是强一致的某些情况下可能有数据丢失的风险。

仲裁队列仲裁队列是3.8版本以后才有的新功能用来替代镜像队列底层采用Raft协议确保主从的数据一致性
具备下列特征

- 与镜像队列一样都是主从模式支持主从数据同步
- 使用非常简单没有复杂的配置
- 主从同步基于Raft协议强一致
Java代码创建仲裁队列
    
@Bean
public Queue quorumQueue() {
    return QueueBuilder
        .durable("quorum.queue2") // 持久化
        //.layzy() //惰性队列
        .quorum() // 仲裁队列
        .build();
}
# SpringAMQP连接MQ集群

注意这里用address来代替host、port方式
spring:
  rabbitmq:
	#host: 192.168.200.130
    #port: 5672
    addresses: 192.168.200.130:8071, 192.168.200.130:8072, 192.168.200.130:8073
    username: itcast
    password: 123321
    virtual-host: /
注意因为重新创建的3MQ集群还没有安装延迟队列插件因此原来练习延迟队列的代码需要注释掉
1、创建交换机时
    
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenDelayExchange(String msg) {
    log.info("消费者接收到了delay.queue的延迟消息{}", msg);
}
/******************************************************************************************/
2、发送消息时
    
@Test
public void testSendDelayMessage() throws InterruptedException {
    // 1.准备消息
    Message message = MessageBuilder
            .withBody("hello, delayed messsage".getBytes(StandardCharsets.UTF_8))
            .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
            .setHeader("x-delay", 10000)
            .build();
    // 2.准备CorrelationData
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 3.发送消息
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);

    log.info("发送消息成功");
}
阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: RabbitMQ