springBoot使用rabbitmq并保证消息可靠性

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

一、理论说明

1.1、数据的丢失问题可能出现在生产者、MQ、消费者中

1、如下图
在这里插入图片描述

1.2、生产者弄丢了数据

1、生产者将数据发送到 RabbitMQ 的时候可能数据就在半路给搞丢了因为网络问题啥的都有可能。此时可以选择用RabbitMQ 提供的事务功能就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect 然后发送消息如果消息没有成功RabbitMQ 接收到那么生产者会收到异常报错此时就可以回滚事务 channel.txRollback 然后重试发送消息;如果收到了消息那么可以提交事务 channel.txCommit
在这里插入图片描述
2、但是问题是RabbitMQ 事务机制同步一搞基本上吞吐量会下来因为太耗性能。所以一般来说如果你要确保说 RabbitMQ 的消息别丢可以开启 confirm 模式在生产者那里设置开启 confirm 模式之后你每次写的消息都会分配一个唯一的 id然后如果写入了 RabbitMQ 中RabbitMQ 会给你回传一个 ack 消息告诉你说这个消息 ok 了。

3、如果RabbitMQ 没能处理这个消息会回调你的一个 nack 接口告诉你这个消息接收失败你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态如果超过一定时间还没接收到这个消息的回调那么你可以重发。

4、事务机制和 confirm 机制最大的不同在于事务机制是同步的你提交一个事务之后会阻塞在那儿但是 confirm 机制是异步的你发送个消息之后就可以发送下一个消息然后那个消息 RabbitMQ 接收了之后会异步回调你的一个接口通知你这个消息接收到了。所以一般在生产者这块避免数据丢失都是用 confirm 机制的。

1.3、RabbitMQ 弄丢了数据

1、就是 RabbitMQ 自己弄丢了数据这个你必须开启 RabbitMQ 的持久化就是消息写入之后会持久化到磁盘哪怕RabbitMQ 自己挂了恢复之后会自动读取之前存储的数据一般数据不会丢。除非极其罕见的是RabbitMQ 还没持久化自己就挂了可能导致少量数据丢失但是这个概率较小。

设置持久化有两个步骤:

  • 创建 queue 的时候将其设置为持久化

这样就可以保证 RabbitMQ 持久化 queue 的元数据但是它是不会持久化 queue 里的数据的。

  • 第二个是发送消息的时候将消息的 deliveryMode 设置为 2

就是将消息设置为持久化此时 RabbitMQ 就会将消息持久化到磁盘上去。

2、必须要同时设置这两个持久化才行RabbitMQ 哪怕是挂了再次重启也会从磁盘上重启恢复queue恢复这个 queue 里的数据。注意哪怕是你给 RabbitMQ 开启了持久化机制也有一种可能就是这个消息写到了RabbitMQ 中但是还没来得及持久化到磁盘上结果不巧此时 RabbitMQ 挂了就会导致内存里的一点点数据丢失。所以持久化可以跟生产者那边的 confirm 机制配合起来只有消息被持久化到磁盘之后才会通知生产者 ack 了所以哪怕是在持久化到磁盘之前RabbitMQ 挂了数据丢了生产者收不到 ack 你也是可以自己重发的

1.4、消费端弄丢了数据

RabbitMQ 如果丢失了数据主要是因为你消费的时候刚消费到自己还没处理结果自己的进程挂了比如重启了那么就尴尬了RabbitMQ 认为你都消费了这数据就丢了。这个时候得用 RabbitMQ 提供的 ack 机制简单来说就是你必须关闭 RabbitMQ 的自动 ack 可以通过一个 api 来调用就行然后每次你自己代码里确保处理完的时候再在程序里 ack 一把。这样的话如果你还没处理完不就没有 ack 了?那 RabbitMQ 就认为你还没处理完这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理消息是不会丢的。

1.5、总结

在这里插入图片描述

二、结合springBoot并保证消息可靠性

1、模拟业务我们希望会员服务在多个实例的情况下每个实例只需要收到一次消息

在这里插入图片描述

2、项目结构
在这里插入图片描述

3、生产者和消费者引入pom依赖

		 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

4、生产者和消费者配置rabbitmq信息

spring:
  # rabbitmq 配置信息
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

2.1、保证生产者到rabbitmq阶段消息投递的安全

1、RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。

  • confirm 确认模式
  • return 退回模式

2、我们先来了解一下 rabbitmq 整个消息投递的路径如下:

producer—>rabbitmq broker—>exchange—>queue—>consumer

3、消息从 producer 到 exchange 则会返回一个 confirmCallback 。

4、消息从 exchange–>queue 投递失败则会返回一个 returnCallback 。

我们将利用这两个 callback 控制消息的可靠性投递

2.1.1、【生产者】开启confirm和return 配置

1、在配置文件中直接开启即可

spring:
  # rabbitmq 配置信息
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    #1、确保消息从发送端到服务端投递可靠分为以下两个步骤
    #1.1、确认消息已发送到交换机(Exchange) 可以把publisher-confirms: true 替换为  publisher-confirm-type: correlate
    publisher-confirm-type: correlated
    #1.2、确认消息从交换机中到队列中
    publisher-returns: true

在这里插入图片描述

2.1.2、【生产者】配置交换器队列以及routingKey三件套

1、生产者项目结构

在这里插入图片描述

2、DirectRabbitConfig 这里根据前面的需求我们创建Direct类型的交换器即可至于其他类型的交换器以及区别后面有说明

package cn.gxm.producer.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author GXM
 * @version 1.0.0
 * @Description 创建direct类型的交换机
 * @createTime 2023年01月03日
 */
@Slf4j
@Configuration
public class DirectRabbitConfig {


    private static final String QUEUE = "TestDirectQueue";
    private static final String EXCHANGE = "TestDirectExchange";
    private static final String ROUTING_KEY = "TestDirectRouting";

    /**
     * 创建一个名为TestDirectQueue的队列
     *
     * @return
     */
    @Bean
    public Queue testDirectQueue() {
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上当消息代理重启时仍然存在暂存队列:当前连接有效
        // exclusive:默认也是false只能被当前创建的连接使用而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除有消息者订阅本队列然后所有消费者都解除订阅此队列会自动删除。
        // arguments:队列携带的参数比如设置队列的死信队列消息的过期时间等等。
        return new Queue(QUEUE, true);
    }

    /**
     * 创建一个名为TestDirectExchange的Direct类型的交换机
     *
     * @return
     */
    @Bean
    public DirectExchange testDirectExchange() {
        // durable:是否持久化,默认是false,持久化交换机。
        // autoDelete:是否自动删除交换机先有队列或者其他交换机绑定的时候然后当该交换机没有队列或其他交换机绑定的时候会自动删除。
        // arguments:交换机设置的参数比如设置交换机的备用交换机Alternate Exchange当消息不能被路由到该交换机绑定的队列上时会自动路由到备用交换机
        return new DirectExchange(EXCHANGE, true, false);
    }

    /**
     * 绑定交换机和队列
     *
     * @return
     */
    @Bean
    public Binding bindingDirect() {
        //bind队列to交换机中with路由keyrouting key
        return BindingBuilder.bind(testDirectQueue()).to(testDirectExchange()).with(ROUTING_KEY);
    }
}

3、RabbitTemplate配置配置这里有一个点非常重要

  • rabbitTemplate.setMandatory(true);必须得设置成true否则无法回调ReturnsCallback
  • Mandatory:为true时,消息通过交换器无法匹配到队列会返回给生产者 并触发MessageReturn而为false时,匹配不到会直接被丢弃
package cn.gxm.producer.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class RabbitConfig {

    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);

        //设置消息投递失败的策略有两种策略:自动删除或返回到客户端。
        //我们既然要做可靠性当然是设置为返回到客户端(true是返回客户端false是自动删除)
        rabbitTemplate.setMandatory(true);

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    log.info("ConfirmCallback 关联数据:{},投递成功,确认情况:{}", correlationData, ack);
                } else {
                    log.info("ConfirmCallback 关联数据:{},投递失败,确认情况:{}原因:{}", correlationData, ack, cause);
                }
            }
        });

        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                log.info("ReturnsCallback 消息:{},回应码:{},回应信息:{},交换机:{},路由键:{}"
                        , returnedMessage.getMessage(), returnedMessage.getReplyCode()
                        , returnedMessage.getReplyText(), returnedMessage.getExchange()
                        , returnedMessage.getRoutingKey());
            }
        });

        return rabbitTemplate;
    }
}

4、写一个TestController用作消息发送一共发送5条消息其中一条触发confirmCallback的失败一条触发returnCallback但是五条都会触发confirmCallback他们的关系是并行的.

package cn.gxm.producer.controller;

import cn.gxm.producer.vo.User;
import com.alibaba.fastjson.JSON;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

/**
 * @author GXM
 * @version 1.0.0
 * @Description TODO
 * @createTime 2023年01月03日
 */
@RestController
public class TestController {

    @Autowired
    RabbitTemplate rabbitTemplate;


    @GetMapping("/test")
    public String test() {
        return "producer ok";
    }

    @GetMapping("/push")
    public String push() {
        for (int i = 1; i <= 5; i++) {
            //这个参数是用来做消息的唯一标识
            //发布消息时使用存储在消息的headers中
            User user = new User(i, "汪涵");
            // 关联的数据可以用在消息投递失败的时候作为一个线索比如我把当前用户的id放进去如果user消息投递失败
            // 我后面可以根据id再找到user再次投递数据
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString().concat("-") + i);
            if (i == 2) {
                //故意把交换机写错演示 confirmCallback
                rabbitTemplate.convertAndSend("TestDirectExchange_111", "TestDirectRouting",
                        JSON.toJSONString(user), correlationData);
            } else if (i == 3) {
                //故意把路由键写错演示 returnCallback
                rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting_111",
                        JSON.toJSONString(user), correlationData);
            } else {
                //正常发送
                rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting",
                        JSON.toJSONString(user), correlationData);
            }
        }
        return "producer push ok";
    }
}

5、开始请求测试打印日志如下你会发现和测试controller的注释写的一样这里就不再多说了。

2023-01-03 16:34:14.912  INFO 27080 --- [nio-6072-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2023-01-03 16:34:14.912  INFO 27080 --- [nio-6072-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2023-01-03 16:34:14.913  INFO 27080 --- [nio-6072-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 1 ms
2023-01-03 16:34:14.975  INFO 27080 --- [nio-6072-exec-1] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [127.0.0.1:5672]
2023-01-03 16:34:14.992  INFO 27080 --- [nio-6072-exec-1] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#5f95f1e1:0/SimpleConnection@12b7544d [delegate=amqp://guest@127.0.0.1:5672/, localPort= 63178]
2023-01-03 16:34:15.020 ERROR 27080 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'TestDirectExchange_111' in vhost '/', class-id=60, method-id=40)
2023-01-03 16:34:15.022  INFO 27080 --- [nectionFactory2] cn.gxm.producer.config.RabbitConfig      : ConfirmCallback 关联数据:CorrelationData [id=0ba3b21e-e4fc-44cf-84d7-17461c8c9c18-1],投递成功,确认情况:true
2023-01-03 16:34:15.022  INFO 27080 --- [nectionFactory3] cn.gxm.producer.config.RabbitConfig      : ConfirmCallback 关联数据:CorrelationData [id=54c62476-6847-4193-8f11-2a3efd90c284-2],投递失败,确认情况:false原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'TestDirectExchange_111' in vhost '/', class-id=60, method-id=40)
2023-01-03 16:34:15.023  INFO 27080 --- [nectionFactory3] cn.gxm.producer.config.RabbitConfig      : ReturnsCallback 消息:(Body:'{"id":3,"name":"汪涵"}' MessageProperties [headers={spring_returned_message_correlation=d91bb792-b6b0-4bea-a909-6eb8d0997b74-3}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),回应码:312,回应信息:NO_ROUTE,交换机:TestDirectExchange,路由键:TestDirectRouting_111
2023-01-03 16:34:15.024  INFO 27080 --- [nectionFactory2] cn.gxm.producer.config.RabbitConfig      : ConfirmCallback 关联数据:CorrelationData [id=d91bb792-b6b0-4bea-a909-6eb8d0997b74-3],投递成功,确认情况:true
2023-01-03 16:34:15.025  INFO 27080 --- [nectionFactory2] cn.gxm.producer.config.RabbitConfig      : ConfirmCallback 关联数据:CorrelationData [id=70852973-9d21-49a4-aa07-67ca8f11aed8-4],投递成功,确认情况:true
2023-01-03 16:34:15.026  INFO 27080 --- [nectionFactory2] cn.gxm.producer.config.RabbitConfig      : ConfirmCallback 关联数据:CorrelationData [id=9dc23013-5b97-47b4-8ebf-3f842615c2f7-5],投递成功,确认情况:true

6、因为我们此刻是没有启动消费者的所以在控制台是能看到三条数据的id为2和id为3的数据是有问题的都分别通过confirmCallbackreturnCallback来进行回调了则具体的补偿措施可以根据自身的业务来处理。

在这里插入图片描述

2.2、保证rabbitmq阶段消息存储的安全

1、这个阶段我们只要保证队列交换机以及发送数据的持久化就🆗。

  • 队列持久化第二个持久化的参数为true即可即使rabbitmq重启这个队列还是存在的
/**
     * 创建一个名为TestDirectQueue的队列
     *
     * @return
     */
    @Bean
    public Queue testDirectQueue() {
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上当消息代理重启时仍然存在暂存队列:当前连接有效
        // exclusive:默认也是false只能被当前创建的连接使用而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除有消息者订阅本队列然后所有消费者都解除订阅此队列会自动删除。
        // arguments:队列携带的参数比如设置队列的死信队列消息的过期时间等等。
        return new Queue(QUEUE, true);
    }
  • 交换机持久化第二个持久化的参数为true即可即使rabbitmq重启这个交换机还是存在的
/**
     * 创建一个名为TestDirectExchange的Direct类型的交换机
     *
     * @return
     */
    @Bean
    public DirectExchange testDirectExchange() {
        // durable:是否持久化,默认是false,持久化交换机。
        // autoDelete:是否自动删除交换机先有队列或者其他交换机绑定的时候然后当该交换机没有队列或其他交换机绑定的时候会自动删除。
        // arguments:交换机设置的参数比如设置交换机的备用交换机Alternate Exchange当消息不能被路由到该交换机绑定的队列上时会自动路由到备用交换机
        return new DirectExchange(EXCHANGE, true, false);
    }
  • 数据持久化如果你使用原生方式设置deliveryMode参数为2即可
//消息持久化测试
Builder builder = new Builder();
builder.deliveryMode(2);
BasicProperties properties = builder.build();
channel.basicPublish("", queue_name, properties, string.getBytes());

其中针对BasicProperties中的源码信息为:

public static class BasicProperties extends
com.rabbitmq.client.impl.AMQBasicProperties {
        private String contentType;//消息类型如:text/plain
        private String contentEncoding;//编码
        private Map<String,Object> headers;
        private Integer deliveryMode;//1:nonpersistent 不持久 2:persistent 持久
        private Integer priority;//优先级
        private String correlationId;
        private String replyTo;//反馈队列
        private String expiration;//expiration到期时间
        private String messageId;
        private Date timestamp;
        private String type;
        private String userId;
        private String appId;
        private String clusterId;
...

而我们如果使用springBoot的RabbitTemplate则默认会进行数据持久化具体springBoot的持久化封装可以查看文章 Springboot 2.x ——RabbitTemplate为什么会默认消息持久化? 所以如下发送数据会默认执行数据持久化

rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting",
                        JSON.toJSONString(user), correlationData);

2.3、保证消费端消费数据

1、根据前面的分析我们在消费端只需要手动ACK即可这样就能保证消费端的数据可靠性了

2、消息确认模式有:

  • AcknowledgeMode.NONE:自动确认
  • AcknowledgeMode.AUTO:根据情况确认
  • AcknowledgeMode.MANUAL:手动确认

2.3.1、【消费者】开启手动ack模式

  • 默认情况下消息消费者是自动 ack 确认消息的如果要手动 ack确认则需要修改确认模式为 manual

如果设置了自动应答又进行手动应答会出现 double ack那么程序会报错。
注意这里使用的是simplet的其实还有direct的simple主要包括两种工作模式direct主要包括四种根据你的使用模式来选择配置即可

在这里插入图片描述

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual
  • 或在 RabbitListenerContainerFactory 中进行开启手动 ack
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(new Jackson2JsonMessageConverter());
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);             //开启手动 ack
    return factory;
}
  • 最后可以手动确认消息以下两种方式都可以
@RabbitHandler
public void processMessage2(String message,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) {
    System.out.println(message);
    try {
        channel.basicAck(tag,false);            // 确认消息
    } catch (IOException e) {
        e.printStackTrace();
    }
}
@RabbitListener(queues = "TestDirectQueue")
@Component
@Slf4j
public class DirectConsumer {

    @RabbitHandler
    public void process(Object data, Channel channel, Message message) throws IOException {
        log.info("消费者接受到的消息是:{},消息体为:{}", data, message);
        //由于配置设置了手动应答所以这里要进行一个手动应答。注意:如果设置了自动应答这里又进行手动应答会出现double ack那么程序会报错。
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
  • 需要注意的 basicAck 方法需要传递两个参数

    • deliveryTag唯一标识 ID:当一个消费者向 RabbitMQ 注册后会建立起一个 Channel RabbitMQ 会用 basic.deliver 方法向消费者推送消息这个方法携带了一个 delivery tag 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID是一个单调递增的正整数delivery tag 的范围仅限于 Channel

    • multiple:为了减少网络流量手动确认可以被批处理当该参数为 true 时则可以一次性确认 delivery_tag 小于等于传入值的所有消息

2.3.2、【消费者】开启ack后拒绝消息 nack

1、以下示例表示了如果header中没有error字段我们则否认消息不接受消息而一旦该消息被 nack 后则该消息会一直重新入队列然后一直重新消费所以这个很重要我们最好设置重试的次数不然消息一直nack一直重复消费如果消息一直在累加则会越来越多的无法消费的数据最终会拖垮rabbitmq。

也可以拒绝该消息消息会被丢弃不会重回队列,这样就会避免上述拖垮rabbitmq的情况出现
channel.basicReject((Long)map.get(AmqpHeaders.DELIVERY_TAG),false); //拒绝消息

@RabbitHandler
public void processMessage2(String message, Channel channel,@Headers Map<String,Object> map) {
    System.out.println(message);
    if (map.get("error")!= null){
        System.out.println("错误的消息");
        try {
            channel.basicNack((Long)map.get(AmqpHeaders.DELIVERY_TAG),false,true);      //否认消息
            return;
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    try {
        channel.basicAck((Long)map.get(AmqpHeaders.DELIVERY_TAG),false);            //确认消息
    } catch (IOException e) {
        e.printStackTrace();
    }
}

在这里插入图片描述

在这里插入图片描述

三、Exchange的三种模式:DirectFanoutTopic。

1、可以参考 Springboot+RabbitMq整合使用含配置详解等

总结:

在Exchange中有三种模式:DirectFanoutTopic。

  • Direct模式只会将消息转发到符合绑定routing key的队列中如果没有符合routing key的队列那么消息会丢失。而且Direct发送的消息是唯一的也就是说再Direct中的一个消息最后只会发送到一个队列中被消费。

  • Fanout模式会无视routing key会把消息转发到所有绑定到该交换机上的队列中。所以Fanout中的一个消息会转发到所有的队列中也就是如果绑定了多个队列那么一个相同的消息会在多个队列中。

  • Topic模式有一套转发的routing key规则只会把消息转发到符合routing key 的队列中。所以在Topic中的一个消息有可能也会被转发到多个队列中进行消费。

四、注意消息模式是广播还是工作队列

1、消息广播和我们平时说的“广播”意思差不多就是希望同一条消息不同消费者都能分别消费;而队列模式就是不同消费者共享消费同一个队列的数据相同消息只能被某一个消费者消费一次。

2、比如同一个用户的注册消息会员服务需要监听以发送欢迎短信营销服务同样需要监听以发送新用户小礼物。但是会员服务、营销服务都可能有多个实例我们期望的是同一个用户的消息可以同时广播给不同的服务广播模式但对于同一个服务的不同实例比如会员服务 1 和会员服务 2不管哪个实例来处理处理一次即可工作队列模式:

在这里插入图片描述

3、可以参考文章 异步处理好用但非常容易用错

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: SpringRabbitMQ