RabbitMQ常见场景问题

6种工作模式

1.直连模式

没有交换机,根据routing key直连队列

在这里插入图片描述

application.properties

server.port=8081
spring.rabbitmq.host=39.99.141.194
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
spring.mvc.pathmatch.matching-strategy=ant-path-matcher

RabbitmqConfig

@Configuration
public class RabbitMqConfig {
    //1.工作队列模式
    //声明队列同时交给spring
    @Bean(name = "work-queue")
    public Queue queue0(){
        return new Queue("work-queue");
    }
  }

send

@SpringBootTest
class RabbitmqApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void direct() {
        rabbitTemplate.convertAndSend("direct", "这是直连模式");
    }

}

consumer

@Controller
public class Consumer1 {
    @RabbitListener(queues = "direct")
    public void workQueue(String str){
        System.out.println("当前监听到了:"+str);
    }
}

控制台

image-20230131140125353

2.发布订阅模式

在这里插入图片描述

发布订阅模式:

1、每个消费者监听自己的队列。

2、生产者将消息发给broker由交换机将消息转发到绑定此交换机的每个队列每个绑定交换机的队列都将接收到消息

RabbitmqConfig

//2.发布订阅模式
    //声明了队列
    @Bean(name = "queue1")
    public Queue queue(){
        return new Queue("publish-queue1");
    }

    @Bean(name = "queue2")
    public Queue queue2(){
        return new Queue("publish-queue2");
    }
    //广播的交换机
    //声明交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("publish-exchange");
    }
    //将队列绑定到交换机
    @Bean
    Binding bindQueue1ToFanoutExchange(@Qualifier("queue1")Queue queue, FanoutExchange  fanoutExchange){
        return   BindingBuilder.bind(queue).to(fanoutExchange);
    }
    //将队列绑定到交换机
    @Bean
    Binding bindQueue2ToFanoutExchange(@Qualifier("queue2")Queue queue,FanoutExchange  fanoutExchange){
        return   BindingBuilder.bind(queue).to(fanoutExchange);
    }

send

@Test
public void testSendPublish(){
    Map map=new HashMap<>();
    map.put("name","张三");
    map.put("age",18);
    //1.交换机的名称  2.你的规则发布订阅模式为空 3.消息的主题
    rabbitTemplate.convertAndSend("publish-exchange","",map);
}

consumer

@Controller
public class Consumer2 {
    @RabbitListener(queues = "publish-queue1")
    public void workQueue1(Map str1){
        System.out.println("publish-queue1当前监听到了:"+str1);
    }
    @RabbitListener(queues = "publish-queue2")
    public void workQueue2(Map str2){
        System.out.println("publish-queue2当前监听到了:"+str2);
    }
    @RabbitListener(queues = "publish-queue3")
    public void workQueue3(Map str3){
        System.out.println("publish-queue3当前监听到了:"+str3);
    }
}

控制台

image-20230131161219640

3.Routing路由模式

在这里插入图片描述

路由模式:

1、每个消费者监听自己的队列并且设置routingkey。

2、生产者将消息发给交换机由交换机根据routingkey来转发消息到指定的队列。

RabbitMQConfig

    //3.routing模式 -路由模式
    //声明了3个队列
    @Bean(name = "queue4")
    public Queue queue4(){
        return new Queue("routing-queue1");
    }
    @Bean(name = "queue5")
    public Queue queue5(){
        return new Queue("routing-queue2");
    }
    @Bean(name = "queue6")
    public Queue queue6(){
        return new Queue("routing-queue3");
    }
    //声明交换机路由模式 DirectExchange
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("routing-exchange");
    }
    //建立队列与交换机的关系
    @Bean
    public Binding bindQueue1ToDirectExchange(@Qualifier("queue4")Queue queue,DirectExchange directExchange){
        return BindingBuilder.bind(queue).to(directExchange).with("info");
    }
    @Bean
    public Binding bindQueue2ToDirectExchange(@Qualifier("queue5")Queue queue,DirectExchange directExchange){
        return BindingBuilder.bind(queue).to(directExchange).with("waring");
    }
    @Bean
    public Binding bindQueue3ToDirectExchange(@Qualifier("queue6")Queue queue,DirectExchange directExchange){
        return BindingBuilder.bind(queue).to(directExchange).with("error");
    }

send

@Test
public void testRouting() {
    //1.交换机的名称  2.你的规则发布订阅模式为空 3.消息的主题
    rabbitTemplate.convertAndSend("routing-exchange", "info", "这是info");
    rabbitTemplate.convertAndSend("routing-exchange", "warning", "这是warning");
//        rabbitTemplate.convertAndSend("routing-exchange", "error", "这是error");
}

consumer

@Controller
public class Consumer3 {
    @RabbitListener(queues = "routing-queue1")
    public void routing1(String string) {
        System.out.println("routing-queue1接收到:" + string);
    }

    @RabbitListener(queues = "routing-queue2")
    public void routing2(String string) {
        System.out.println("routing-queue2接收到:" + string);
    }

    @RabbitListener(queues = "routing-queue3")
    public void routing3(String string) {
        System.out.println("routing-queue3接收到:" + string);
    }
}

控制台

只有info和warning收到了

image-20230131163212117

4.Topic通配符模式

在这里插入图片描述

路由模式:

1、每个消费者监听自己的队列并且设置带统配符的routingkey。

2、生产者将消息发给broker由交换机根据routingkey来转发消息到指定的队列。

RabbitMQConfig

    //4.topic模式 -主题模式
    //声明了3个队列
    @Bean(name = "queue7")
    public Queue queue7() {
        return new Queue("topic-queue1");
    }

    @Bean(name = "queue8")
    public Queue queue8() {
        return new Queue("topic-queue2");
    }

    @Bean(name = "queue9")
    public Queue queue9() {
        return new Queue("topic-queue3");
    }

    //声明交换机路由模式 DirectExchange
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topic-exchange");
    }

    @Bean
    public Binding bindQueue1ToTopicExchange(@Qualifier("queue7") Queue queue, TopicExchange topicExchange) {
        return BindingBuilder.bind(queue).to(topicExchange).with("ex.123.123");
    }

    @Bean
    public Binding bindQueue2ToTopicExchange(@Qualifier("queue8") Queue queue, TopicExchange topicExchange) {
        return BindingBuilder.bind(queue).to(topicExchange).with("ex.*");
    }

    @Bean
    public Binding bindQueue3ToTopicExchange(@Qualifier("queue9") Queue queue, TopicExchange topicExchange) {
        return BindingBuilder.bind(queue).to(topicExchange).with("ex.#");
    }

send

@Test
public void testTopic() {
    //1.交换机的名称  2.你的规则发布订阅模式为空 3.消息的主题
    rabbitTemplate.convertAndSend("topic-exchange", "ex.123.123", "这是ex.123.123");
}

consumer

@Controller
public class Consumer4 {
    @RabbitListener(queues = "topic-queue1")
    public void routing1(String string) {
        System.out.println("topic-queue1接收到:" + string);
    }

    @RabbitListener(queues = "topic-queue2")
    public void routing2(String string) {
        System.out.println("topic-queue2接收到:" + string);
    }

    @RabbitListener(queues = "topic-queue3")
    public void routing3(String string) {
        System.out.println("topic-queue3接收到:" + string);
    }
}

控制台

队列1,3可以收到

image-20230131170824007

5.Header模式

6.RPC

消息不丢失

消息发送到交换机失败

1.配置文件开启发布确认

spring.rabbitmq.publisher-confirm-type=correlated

2.配置回调函数

RabbitMQConfig:

@Slf4j
@Configuration
public class RabbitMQConfig {


    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("触发confirm回调,交换机接收到了");
            } else {
                log.info("触发confirm回调函数,交换机收不到信息,原因:" + cause);
                log.info("消息对应的的CorrelationData id:" + correlationData.getId());
            }
        });
        return rabbitTemplate;
    }
}

3.测试

随便给交换机发送一条消息

image-20230131222932467

4.如何处理失败消息

在ConfirmCallback监听中,当消息发送失败,ack失败时,我们又能拿到消息的CorrelationData,所以通过CorrelationData与消息之间的关系,我们在回调函数中通过CorrelationData来获取发送失败的消息,进而对其进行下一步操作(记录或重发等)

我们可以在发消息之前,将CorrelationData作为key,消息作为value,持久化起来(例如用redis数据库),当消息成功发送到交换机,ack为true时,我们再把他从持久化层中删除,这样的话,当消息发送失败时,我们就可以通过CorrelationData,从持久层中拿到发送失败的消息了

代码改造如下:

(CacheService是封装的redis操作工具类)

RabbitMQConfig:

@Slf4j
@Configuration
public class RabbitMQConfig {
    @Autowired
    private CacheService cacheService;
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        rabbitTemplate.setReturnsCallback(returnedMessage -> {
            log.info("消息主体 message : " + returnedMessage.getMessage());
            log.info("消息主体 message : " + returnedMessage.getReplyCode());
            log.info("描述:" + returnedMessage.getReplyText());
            log.info("消息使用的交换器 exchange : " + returnedMessage.getExchange());
            log.info("消息使用的路由键 routing : " + returnedMessage.getRoutingKey());

        });
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("触发confirm回调,交换机接收到了");
                Long result = cacheService.hDelete("rabbitmq:" + correlationData.getId(), "exchange", "routingKey", "message");
                log.info("已清除redis消息备份:"+result);
            } else {
                log.info("触发confirm回调函数,交换机收不到信息,原因:" + cause);
                log.info("消息对应的的CorrelationData id:" + correlationData.getId());
                Map<Object, Object> map = cacheService.hGetAll("rabbitmq:" + correlationData.getId());
                String exchange = (String) map.get("exchange");
                String routingKey = (String) map.get("routingKey");
                String message = (String) map.get("message");
                rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
            }
        });
        return rabbitTemplate;
    }

send:

@Test
public void testConfirm() {
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    String exchange = "routing-exchange";
    String routingKey = "info";
    String message = "这是info";
    Map hashMap = new HashMap<>();
    hashMap.put("exchange", exchange);
    hashMap.put("routingKey", routingKey);
    hashMap.put("message", message);
    cacheService.hPutAll("rabbitmq:" + correlationData.getId(), hashMap);
    rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
}

模拟一个失败情况(向不存在的exchange发消息),就会自动进行重试,redis中会保存下所有的失败消息,我们定时做人工处理也是可以的

控制台

image-20230131224734521

image-20230131230312860

RabbitMQ服务器故障

持久化

开启交换机,队列,消息的持久化,可将其存储在磁盘上,可在服务重启后恢复

  1. 交换机持久化

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("publish-exchange",true,false);
    }
    
  2. 队列持久化

    @Bean("direct")
    public Queue queue0() {
        return new Queue("direct",true);
    }
    
  3. 消息持久化

    //spring的rabbitTemplate默认开启消息持久化
    rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
    

消息发送到队列失败

1.配置文件开启消费确认

spring.rabbitmq.publisher-returns=true

2.配置回调函数

//消息消费确认
//mandatory:交换器无法根据自身类型和路由键找到一个符合条件的队列时的处理方式
//true:RabbitMQ会调用Basic.Return命令将消息返回给生产者
//false:RabbitMQ会把消息直接丢弃
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(returnedMessage -> {
    log.info("消息主体 message : " + returnedMessage.getMessage());
    log.info("消息主体 message : " + returnedMessage.getReplyCode());
    log.info("描述:" + returnedMessage.getReplyText());
    log.info("消息使用的交换器 exchange : " + returnedMessage.getExchange());
    log.info("消息使用的路由键 routing : " + returnedMessage.getRoutingKey());

});

3.测试

发送一个不存在的routingKey

image-20230201181451259

4.如何处理失败消息

returnedMessage包含消息的所有信息,可以进行例如上面使用的redis进行持久化,再进行处理

消息消费失败

rabbitMQ有 ack 签收机制简单来说就是三种模式:

AcknowledgeMode.NONE:默认推送的所有消息都已经消费成功会不断地向消费端推送消息。所以推送出去的消息不会暂存在server端

AcknowledgeMode.AUTO: 由 spring-rabbit 依据消息处理逻辑是否抛出异常自动发送 ack(无异常或 nack(异常到 server 端。

AcknowledgeMode.MANUAL:模式需要人为地获取到 channel 之后调用方法向 server 发送 ack (或消费失败时的 nack 信息

消费结果结果批量操作
ack表示成功确认使用此回执方法后消息会被rabbitmq broker 删除
void basicAck(long deliveryTag, boolean multiple)
允许
nack表示失败确认一般在消费消息业务异常时用到此方法可以将消息重新投递入队列。
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
允许
reject拒绝消息与 basicNack 区别在于不能进行批量操作其他用法很相似。
void basicReject(long deliveryTag, boolean requeue)
不允许
  • deliveryTag:表示消息投递序号每次消费消息或者消息重新投递后deliveryTag 都会递增。手动消息确认模式下我们可以对指定deliveryTag的消息进行ack、nack、reject等操作。
  • multiple:为了减少网络流量手动确认可以被批处理值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。举个栗子: 假设我先发送三条消息deliveryTag分别是5、6、7可它们都没有被确认当我发第四条消息此时deliveryTag为8multiple设置为 true会将5、6、7、8的消息全部进行确认。
  1. 配置文件开启手动签收

    #多消费者轮询模式,每个消费者都能收到的未被消费的最大消息数量
    spring.rabbitmq.listener.simple.prefetch=1
    #设置消费端手动,返回分为:ack(无异常nack(存在异常reject(存在异常
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    #开启重试
    spring.rabbitmq.listener.simple.retry.enabled=true
    
  2. 消费者开启手动签收,并手动抛出异常用于测试

    @RabbitListener(queues = "routing-queue1")
    public void routing1(String string, Channel channel, Message message) throws IOException {
        try {
            System.out.println("routing-queue1接收到:" + string);
            throw new RuntimeException("手动抛出异常,测试");
        } catch (Exception e) {
            log.error("消息消费出现异常,重新入队伍");
            /**
             * 出现异常,把消息重新投递回队列中,如一直有异常会一直循环投递
             * deliveryTag:表示消息投递序号。
             * multiple:是否批量确认。
             * requeue:值为 true 消息将重新入队列。
             */
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true);
        }
        /**
         * 消息确认 ACK
         * deliveryTag:表示消息投递序号每次消费消息或者消息重新投递后deliveryTag都会增加
         * multiple:是否批量确认值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。
         */
        log.info("ACK消息消费确认.....");
    
        // 消息确认 basicAck
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    
        // 消息拒绝 basicReject
        //channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
    
    }
    
  3. 测试

    send

    @Test
    public void testConfirm() {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        String exchange = "routing-exchange";
        String routingKey = "info";
        String message = "这是info";
        Map hashMap = new HashMap<>();
        hashMap.put("exchange", exchange);
        hashMap.put("routingKey", routingKey);
        hashMap.put("message", message);
        cacheService.hPutAll("rabbitmq:" + correlationData.getId(), hashMap);
        rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
    }
    

    会不停进行重新投递消费

    image-20230201191913345

消息幂等性(重复消费)

问题

保证MQ消息不重复的情况下消费者消费消息成功后在给MQ发送消息确认的时候出现了网络异常(或者是服务中断)MQ没有接收到确认此时MQ不会将发送的消息删除
为了保证消息被消费当消费者网络稳定后MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息。

测试重复消费场景

发送5000条数据到queue,消费端自动应答

send

@Test
public void testAgain() {
    String exchange = "routing-exchange";
    String routingKey = "warning";
    for (int i = 1; i <= 5000; i++) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        String message = "这是warning:" + i;
        rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
    }
}

image-20230201205325114

启动consumer,随后中断重启

@RabbitListener(queues = "routing-queue2")
public void routing2(String string, Channel channel, Message message) throws IOException {
    System.out.println("routing-queue2接收到:" + string);
    // 消息确认 basicAck
    //channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

image-20230201205724867

这个我测不出来,但理论上是可能出现问题的(QAQ)

解决

如何解决消息重复消费的问题:
为了保证消息不被重复消费首先要保证每个消息是唯一的所以可以给每一个消息携带一个全局唯一的id流程如下:

  1. 消费者监听到消息后获取id先去查询这个id是否存中

  2. 如果不存在则正常消费消息并把消息的id存入 数据库或者redis中(下面的编码示例使用redis

  3. 如果存在则丢弃此消息

消费者改造,以消息id为key消息内容为value存入setnx中设置过期时间(可承受的redis服务器异常时间比如设置过期时间为10分钟如果redis服务器断了20分钟那么未消费的数据都会丢了

/**
 * setnx,如果redis中有记录,就会返回false,说明已经消费过了,无法写入
 * @param string
 * @param channel
 * @param message
 * @throws IOException
 */
@RabbitListener(queues = "routing-queue2")
public void routing2(String string, Channel channel, Message message) throws IOException, InterruptedException {
    boolean b = cacheService.setIfAbsent("RabbitmqConsumer:"+message.getMessageProperties().getMessageId(), string, 10L, TimeUnit.MINUTES);
    if (!b) {
        return;
    }
    System.out.println("routing-queue2接收到:" + string);

}

测试,已经全部存入了redis

image-20230201220102907

消息有序

消息消费顺序错乱原因

  1. 一个queue有多个consumer去消费这样就会造成顺序的错误consumer从MQ里面读取数据是有序的但是每个consumer的执行时间是不固定的无法保证先读到消息的consumer一定先完成操作这样就会出现消息并没有按照顺序执行造成数据顺序错误。

  2. 一个queue对应一个consumer但是consumer里面进行了多线程消费这样也会造成消息消费顺序错误。

解决

在必须保证顺序消费的业务中,单个队列对应单个消费者,单线程消费

消息堆积

消息堆积原因

  • 消息堆积即消息没及时被消费是生产者生产消息速度快于消费者消费的速度导致的。
  • 消费者消费慢可能是因为:本身逻辑耗费时间较长、阻塞了。

解决

  1. 增加消费者消费能力,消费者内开启多线程处理消息(注意无法保证顺序消费)

  2. 建立新的queue消费者同时订阅新旧queue采用订阅模式

  3. 默认情况下rabbitmq消费者为单线程串行消费(org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer类的concurrentConsumers与txSize(对应prefetchCount都是1设置并发消费两个关键属性concurrentConsumers和prefetchCount。concurrentConsumers:设置的是对每个listener在初始化的时候设置的并发消费者的个数;prefetchCount:每次从broker里面取的待消费的消息的个数。
    配置方法:修改application.properties:

    spring.rabbitmq.listener.concurrency=m
    spring.rabbitmq.listener.prefetch=n
    
阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: RabbitMQ