【RabbitMQ】高级篇,学习纪录+笔记

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

目录

一.高级特性

1.1消息的可靠投递

2.1Consumer Ack

3.1消费端限流

4.1TTL

5.1死信队列

6.1延迟队列

7.1日志与监控

7.1.1日志

7.1.2监控

8.1消息追踪

8.1.1Firehose

8.1.2rabbitmq_tracing

9.1消息可靠性保障思路

9.2消息幂等性保障思路


一.高级特性

1.1消息的可靠投递

在使用 RabbitMQ 的时候作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提 供了两种方式用来控制消息的投递可靠性模式。

①confirm确认模式

②return退回模式

rabbitmq 整个消息投递的路径为 producer--->rabbitmqbroker--->exchange--->queue--->consumer 

消息从 producer 到 exchange 则会返回一个 confirmCallback 。 

消息从 exchange-->queue 投递失败则会返回一个 returnCallback 。

我们将利用这两个 callback 控制消息的可靠性投递

接基础篇在生产者中RabttitMQConfig中配置

    public static final String CONFIRM_EXCHANGE_NAME = "test_exchange_confirm";
    public static final String CONFIRM_QUEUE_NAME = "test_queue_confirm";
    @Bean("confirmExchange")
    public Exchange confirmExchange(){
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true).build();
    }

    @Bean("confirmQueue")
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    @Bean
    public Binding bindConfirmExchangeQueue(@Qualifier("confirmExchange") Exchange exchange,
                                            @Qualifier("confirmQueue") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("confirm").noargs();
    }

在yml中开启

#配置RabbitMQ的基本信息
spring:
  rabbitmq:
    host: ip
    username: admin
    password: admin
    port: 5672
    virtual-host: /
    publisher-confirms: true //开启确认模式这个是老版本的写法已经废弃了新版本在后面加type
    publisher-returns: true //开启退回模式

测试类

/*
    *
    * 确认模式步骤
    * 1.确认模式开启在yml中配置
    * 2.在rabbitTemplate定义ConfirmCallback回调函数
    *
    * */
@Test
    public void testConfirm(){
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             *
             * @param correlationData 相关的配置信息
             * @param ack exchange交换机 是否成功收到了消息true成功false失败
             * @param cause 如果ack为false这是失败原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if(ack){
                    //接收成功
                    System.out.println("接收成功消息"+cause);
                }else {
                    //接收失败
                    System.out.println("接收失败消息"+cause);
                    //做一些处理,再次发送

                }
                System.out.println("Confirm方法被执行……");
            }
        });
        rabbitTemplate.convertAndSend(RabbitMQConfig.CONFIRM_EXCHANGE_NAME,"confirm","message..confirm...");
    }

以上配置方法也可以写在配置类中

回退方式测试类

/**
     * 回退模式当消息发送给Exchange后Exchange路由到Queue失败时才会执行ReturnCallback
     * 1.开启回退模式
     * 2.设置ReturnCallback
     * 3.设置Exchange处理消息的模式
     *   消息没有路由到Queue丢弃消息
     *   如果消息没有路由到Queue返回给消息发送方
     */
    @Test
    public void testCallback() throws InterruptedException {
        //设置交换机处理失败消息的模式
        rabbitTemplate.setMandatory(true);

        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replycode, String replyText, String exchange, String routingKey) {
                System.out.println("return执行了……");
                System.out.println(message);
                System.out.println(replycode);
                System.out.println(replyText);
                System.out.println(exchange);
                System.out.println(routingKey);
            }
        });
        rabbitTemplate.convertAndSend(RabbitMQConfig.CONFIRM_EXCHANGE_NAME,"confirm","message..confirm...");
        Thread.sleep(3000);
    }

消费者确认再继续看下面

2.1Consumer Ack

ack指Acknowledge确认。 表示消费端收到消息后的确认方式。

有三种确认方式 自动确认acknowledge="none" •

手动确认acknowledge="manual" •

根据异常情况确认acknowledge="auto"这种方式使用麻烦不作讲解

其中自动确认是指当消息一旦被Consumer接收到则自动确认收到并将相应 message 从 RabbitMQ 的 消息缓存中移除。但是在实际业务处理中很可能消息接收到业务处理出现异常那么该消息就会丢失。

如果设置了手动确认方式则需要在业务处理成功后调用channel.basicAck()手动签收如果出现异常则 调用channel.basicNack()方法让其自动重新发送消息

在消费者工程中新建监听者

/**
 * Counsumer ACK
 * 1.设置手动签收 在yml
 * 2.让监听器类实现ChannelAwareMessageListener
 * 3.如果消息成功处理调用channel的basicAck进行签收
 * 4.basicNack拒收 broker重新发送给counsumer
 */
@Component
public class ACKListener  {

    @RabbitListener(queues = "test_queue_confirm")
    public void ListenerQueue2(Message message,Channel channel) throws IOException {
        System.out.println("test_queue_confirm----->"+message);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("处理业务逻辑");
            Thread.sleep(3000);
            int i = 10/0;
            /**
             * long deliveryTag,收到消息的标识
             * boolean multiple,是否签收多条消息
             */
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            //拒绝签收 第三个参数设置为true消息重新回到queuebroker重新发送该消息给消费端
            // channel.basicNack(deliveryTag,true,true);

            // 如果真得出现了异常我们采用消息重投,获取redelivered判断是否为重投: false没有重投true重投
            Boolean redelivered = message.getMessageProperties().getRedelivered();
            System.out.println("redelivered = " + redelivered);
            try {
                // (已重投)拒绝确认
                if (redelivered) {
                    /**
                     * 拒绝确认从队列中删除该消息防止队列阻塞(消息堆积)
                     * boolean requeue: false不重新入队列(丢弃消息)
                     */
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
                    System.out.println();
                } else { // (没有重投) 消息重投
                    /**
                     * 消息重投重新把消息放回队列中
                     * boolean multiple: 单条或批量
                     * boolean requeue: true重回队列
                     */
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                    System.out.println("=========消息重投了=======");
                }
            } catch (Exception ee) {
                ee.printStackTrace();
            }

        }
    }


}

配置yml

#配置RabbitMQ的基本信息
spring:
  rabbitmq:
    host: ip
    username: admin
    password: admin
    port: 5672
    virtual-host: /
    listener:
      simple:
        # 并发消费每个侦听器线程的最小数量具体数值根据系统性能配置一般为系统cpu核数
        concurrency: 2
        # 并发消费每个侦听器线程的最大数量具体数值根据系统性能配置一般为系统cpu核数*2
        max-concurrency: 4
        # 每次只能获取一条消息处理完成才能获取下一个消息,避免照成消息堆积在一个消费线程上
        prefetch: 1
        #acknowledge-mode: manual         # 消费者开启手动ack消息确认需要测试请看示例请AckConsumer,所有队列都会生效
        #default-requeue-rejected: false  # 设置为false会重发消息到死信队列防止手动ack确认失败的消息堆积需要测试请示例AckConsumer,所有队列都会生效
        retry:
          enabled: true                   # 解决消息死循环问题-启用重试
          max-attempts: 3                 # 最大重试3次(默认)超过就丢失或放到死信队列中防止消息堆积
          multiplier: 2                   # 乘子
          initial-interval: 3000          # 第一次和第二次之间的重试间隔后面的用乘子计算 3s 6s 12s
          max-interval: 16000             # 最大重试时间间隔16s
      direct:
        acknowledge-mode: manual
        default-requeue-rejected: false

小结

在rabbit:listener-container标签中设置acknowledge属性设置ack方式 none自动确认manual手动确认 

如果在消费端没有出现异常则调用channel.basicAck(deliveryTag,false);方法确认签收消息 

如果出现异常则在catch中调用 basicNack或 basicReject拒绝消息让MQ重新发送消息。

持久化 exchange要持久化

queue要持久化

message要持久化

生产方确认Confirm

消费方确认Ack

Broker高可用

3.1消费端限流

在yml中配置 prefetch属性设置消费端一次拉取多少消息 

prefetch: 1

消费端的确认模式一定为手动确认。

acknowledge-mode: manual

详细配置见上面消费者的yml

4.1TTL

TTL 全称 Time To Live存活时间/过期时间。 

当消息到达存活时间后还没有被消费会被自动清除。 

RabbitMQ可以对消息设置过期时间也可以对整个队列Queue设置过期时间。

可以直接在RabbitMQ的web端直接对队列进行设置

也可以通过java代码对其进行设置具体代码见后面死信队列与延迟队列

小结

设置队列过期时间使用参数x-message-ttl单位ms(毫秒)会对整个队列消息统一过期。 

设置消息过期时间使用参数expiration。单位ms(毫秒)当该消息在队列头部时消费时会单独判断 这一消息是否过期。 

如果两者都进行了设置以时间短的为准。

5.1死信队列

死信队列英文缩写DLX 。Dead Letter Exchange死信交换机当消息成为Dead message后可以 被重新发送到另一个交换机这个交换机就是DLX

 

消息成为死信的三种情况

1. 队列消息长度到达限制

2. 消费者拒接消费消息basicNack/basicReject,并且不把消息重新放入原目标列,requeue=false

3. 原队列存在消息过期设置消息到达超时时间未被消费 

 

正常队列绑定死信交换机 给队列设置参数 x-dead-letter-exchange 和 x-dead-letter-routing-key

 

生产者

RabbitMQConfig中配置一对正常交换机队列、一堆死信交换机队列、正常队列与死信交换机绑定

 //死信
    //先声明正常的交换机(test_exchange_dlx)和队列(test_queue_dlx)
    //声明死信交换机(exchange_dlx)和队列(queue_dlx)
    //正常队列绑定死信交换机
    //设置两个参数
    //x-dead-letter-exchange 死信交换机名称
    //x-dead-letter-routing-key 发送给死信交换机的routing key
    @Bean("test_exchange_dlx")
    public Exchange test_exchange_dlx(){
        return ExchangeBuilder.topicExchange("test_exchange_dlx").durable(true).build();
    }

    @Bean("test_queue_dlx")
    public Queue test_queue_dlx(){
        Map<String, Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange","exchange_dlx");
        map.put("x-dead-letter-routing-key","dlx.hehe");
        map.put("x-message-ttl",10000);
        map.put("x-max-length",10);
        return QueueBuilder.durable("test_queue_dlx").withArguments(map).build();
    }

    @Bean
    public Binding dlxCommonBinding(@Qualifier("test_exchange_dlx") Exchange exchange,
                              @Qualifier("test_queue_dlx") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("test.dlx.#").noargs();
    }

    @Bean("exchange_dlx")
    public Exchange exchange_dlx(){
        return ExchangeBuilder.topicExchange("exchange_dlx").durable(true).build();
    }

    @Bean("queue_dlx")
    public Queue queue_dlx(){
        return QueueBuilder.durable("queue_dlx").build();
    }

    @Bean
    public Binding dlxBinding(@Qualifier("exchange_dlx") Exchange exchange,
                              @Qualifier("queue_dlx") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("dlx.#").noargs();
    }

测试类

    /**
     * 发送测试死信消息
     * 1.过期时间
     * 2.长度限制
     * 3.消息拒收
     */
    @Test
    public void testDlx(){
        //1.过期时间
//        rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息我会死亡");

        //2.长度限制
//        for (int i = 1; i <= 20 ; i++) {
//            rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息我会死亡");
//        }

        //3.消息拒收
        rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息我会死亡");

    }

消费者监听监听正常队列

@Component
public class DLXListener {

    @RabbitListener(queues = "test_queue_dlx")
    public void ListenerQueue3(Message message,Channel channel) throws IOException {
        System.out.println("test_queue_dlx----->"+message);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("处理业务逻辑");
            Thread.sleep(3000);
            int i = 10/0;
            /**
             * long deliveryTag,收到消息的标识
             * boolean multiple,是否签收多条消息
             */
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            //拒绝签收 第三个参数设置为true消息重新回到queuebroker重新发送该消息给消费端
            // channel.basicNack(deliveryTag,true,true);
            // 如果真得出现了异常我们采用消息重投,获取redelivered判断是否为重投: false没有重投true重投
            Boolean redelivered = message.getMessageProperties().getRedelivered();
            System.out.println("redelivered = " + redelivered);
            try {
                // (已重投)拒绝确认
                if (redelivered) {
                    System.out.println("拒绝了不重投");
                    /**
                     * 拒绝确认从队列中删除该消息防止队列阻塞(消息堆积)
                     * boolean requeue: false不重新入队列(丢弃消息)
                     */
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
                    System.out.println();
                } else { // (没有重投) 消息重投
                    /**
                     * 消息重投重新把消息放回队列中
                     * boolean multiple: 单条或批量
                     * boolean requeue: true重回队列
                     */
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                    System.out.println("=========消息拒绝接收重投了=======");
                }
            } catch (Exception ee) {
                ee.printStackTrace();
            }

        }
    }


}

观察死信队列中消息数量变化

6.1延迟队列

延迟队列即消息进入队列后不会立即被消费只有到达指定时间后才会被消费。

需求

1. 下单后30分钟未支付取消订单回滚库存。

2. 新用户注册成功7天后发送短信问候。

实现方式 1. 定时器 2. 延迟队列

定时器对于性能消耗很大所以采取延迟队列

由于RabbitMQ没有直接的延迟队列故而采取ttl+死信队列来实现

生产者

RabbitMQConfig中配置

//延迟
    //先声明正常的交换机(order_exchange)和队列(order_queue)
    //声明死信交换机(order_exchange_dlx)和队列(order_queue_dlx)
    //正常队列绑定死信交换机
    //设置两个参数
    //x-dead-letter-exchange 死信交换机名称
    //x-dead-letter-routing-key 发送给死信交换机的routing key
    @Bean("order_exchange")
    public Exchange order_exchange(){
        return ExchangeBuilder.topicExchange("order_exchange").durable(true).build();
    }

    @Bean("order_queue")
    public Queue order_queue(){
        Map<String, Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange","order_exchange_dlx");
        map.put("x-dead-letter-routing-key","dlx.order.cancel");
        map.put("x-message-ttl",10000);
        map.put("x-max-length",10);
        return QueueBuilder.durable("order_queue").withArguments(map).build();
    }

    @Bean
    public Binding yanchiCommonBinding(@Qualifier("order_exchange") Exchange exchange,
                                    @Qualifier("order_queue") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
    }

    @Bean("order_exchange_dlx")
    public Exchange order_exchange_dlx(){
        return ExchangeBuilder.topicExchange("order_exchange_dlx").durable(true).build();
    }

    @Bean("order_queue_dlx")
    public Queue order_queue_dlx(){
        return QueueBuilder.durable("order_queue_dlx").build();
    }

    @Bean
    public Binding yanchiBinding(@Qualifier("order_exchange_dlx") Exchange exchange,
                              @Qualifier("order_queue_dlx") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("dlx.order.#").noargs();
    }

测试类

@Test
    public void testDelay() throws InterruptedException {
        //发送订单消息
        rabbitTemplate.convertAndSend("order_exchange","order.msg","订单信息 id1");
        //打印倒计时10s
        for (int i = 0; i < 10; i++) {
            System.out.println(i+"...");
            Thread.sleep(1000);
        }

    }

消费者监听监听的是死信队列

@Component
public class OrderListener {

    @RabbitListener(queues = "order_queue_dlx")
    public void ListenerQueue4(Message message,Channel channel) throws IOException {
        System.out.println("order_queue_dlx----->"+message);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("处理业务逻辑");
            Thread.sleep(3000);
            System.out.println("根据订单id查询状态");
            System.out.println("判断状态是否为支付成功");
            System.out.println("取消订单回滚……");
            /**
             * long deliveryTag,收到消息的标识
             * boolean multiple,是否签收多条消息
             */
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            //拒绝签收 第三个参数设置为true消息重新回到queuebroker重新发送该消息给消费端
            // channel.basicNack(deliveryTag,true,true);
            // 如果真得出现了异常我们采用消息重投,获取redelivered判断是否为重投: false没有重投true重投
            Boolean redelivered = message.getMessageProperties().getRedelivered();
            System.out.println("redelivered = " + redelivered);
            try {
                // (已重投)拒绝确认
                if (redelivered) {
                    System.out.println("拒绝了不重投");
                    /**
                     * 拒绝确认从队列中删除该消息防止队列阻塞(消息堆积)
                     * boolean requeue: false不重新入队列(丢弃消息)
                     */
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
                    System.out.println();
                } else { // (没有重投) 消息重投
                    /**
                     * 消息重投重新把消息放回队列中
                     * boolean multiple: 单条或批量
                     * boolean requeue: true重回队列
                     */
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                    System.out.println("=========消息拒绝接收重投了=======");
                }
            } catch (Exception ee) {
                ee.printStackTrace();
            }

        }
    }


}

小结

1.延迟队列 指消息进入队列后可以被延迟一定时间再进行消费。

2. RabbitMQ没有提供延迟队列功能但是可以使用 TTL + DLX 来实现延迟队列效果。

7.1日志与监控

7.1.1日志

RabbitMQ默认日志存放路径 /var/log/rabbitmq/rabbit@xxx.log

日志包含了RabbitMQ的版本号、Erlang的版本号、RabbitMQ服务节点名称、cookie的hash值、 RabbitMQ配置文件地址、内存限制、磁盘限制、默认账户guest的创建以及权限配置等等

7.1.2监控

web管控台监控

rabbitmqctl管理和监控

8.1消息追踪

在使用任何消息中间件的过程中难免会出现某条消息异常丢失的情况。对于RabbitMQ而言可能 是因为生产者或消费者与RabbitMQ断开了连接而它们与RabbitMQ又采用了不同的确认机制也 有可能是因为交换器与队列之间不同的转发策略甚至是交换器并没有与任何队列进行绑定生产者又不感知或者没有采取相应的措施另外RabbitMQ本身的集群策略也可能导致消息的丢失。这个时候就需要有一个较好的机制跟踪记录消息的投递过程以此协助开发和运维人员进行问题的定位。

在RabbitMQ中可以使用Firehoserabbitmq_tracing插件功能来实现消息追踪

8.1.1Firehose

firehose的机制是将生产者投递给rabbitmq的消息rabbitmq投递给消费者的消息按照指定的格式 发送到默认的exchange上。这个默认的exchange的名称为amq.rabbitmq.trace它是一个topic类 型的exchange。发送到这个exchange上的消息的routing key为 publish.exchangename 和 deliver.queuename。其中exchangename和queuename为实际exchange和queue的名称分别对应生产者投递到exchange的消息和消费者从queue上获取的消息。

注意打开 trace 会影响消息写入功能适当打开后请关闭。

rabbitmqctl trace_on开启Firehose命令

rabbitmqctl trace_off关闭Firehose命令

步骤

1.创建一个新的队列然后将这个队列绑定到默认交换机上Routing Key写#表示所有消息都监听

2.然后在这个新的队列发一条消息然后再查发现只有一条

3.然后开启rabbitmqctl trace_on

4.再发一条消息再查发现有额外的消息

8.1.2rabbitmq_tracing

rabbitmq_tracing和Firehose在实现上如出一辙只不过rabbitmq_tracing的方式比Firehose多了一 层GUI的包装更容易使用和管理。

启用插件rabbitmq-plugins enable rabbitmq_tracing

然后即可在web端右侧看到Tracing点进去即可查看与操作可以创建一个新的Pattern写#

9.1消息可靠性保障思路

由于比较复杂只提供一个思路思路多种多样这个不一定好看看就好

 

9.2消息幂等性保障思路

幂等性指一次和多次请求某一个资源对于资源本身应该具有同样的结果。

也就是说其任 意多次执行对资源本身所产生的影响均与一次执行的影响相同。

在MQ中指消费多条相同的消息得到与消费该消息一次相同的结果。

比如发送两条相同的扣款信息不能扣两次款。

采取乐观锁机制。

 

本文只是本人在学习复习过程中的笔记有什么不足之处请指教。

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: RabbitMQ