SpringBoot下RabbitMQ的实战应用:动态创建和动态监控队列、死信、备份交换机

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

一、应用场景

  • 业务中心根据业务需求向特定用户发送消息发送前不确定由哪个用户接收

在这里插入图片描述

  • 特定用户接收特定消息用户可以退出再切换别的用户登录用户登录后只接收与自已对应的消息

在这里插入图片描述

二、总体要求

项目要足够稳健消息不能丢失

  • 交换机、队列、消息持久化

  • 队列有容量限制如3000

  • 消息发送后需要确认非自动确认

  • 未发送成功的消息由缓存保存定时重发

  • 交换机收到消息但无法投递时转发至备份交换机再广播至对应队列

  • 费时操作采用异步方式

三、架构图

在这里插入图片描述

四、安装RabbitMQ

参考如下三篇文章

五、搭建SpringBoot项目

  • java1.8

  • spring-boot 2.6.7

1、依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.7</version>
    </parent>

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.tuwer</groupId>
    <artifactId>mq</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <!-- amqp-client -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- amqp-client Java原生依赖 -->
<!--        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.14.2</version>
        </dependency>-->
        <!-- hutool-all -->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.2</version>
        </dependency>
        <!-- jackson -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.13.3</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
            <version>2.13.3</version>
        </dependency>
        <dependency>
            <groupId>jakarta.json</groupId>
            <artifactId>jakarta.json-api</artifactId>
            <version>2.0.1</version>
        </dependency>
        <!-- fastjson -->
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.3</version>
        </dependency>
        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>
        <!-- 工具类 -->
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
        </dependency>
        <!-- 测试 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

2、application.yml

spring:
  rabbitmq:
    host: 192.168.3.174
    port: 5672
    username: admin
    password: admin
    virtual-host: /
    # 交换机接收确认
    publisher-confirm-type: correlated
    # 交换机回退消息
    #publisher-returns: true

2、启动类

@EnableAsync 开启异步操作

package com.tuwer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

/**
 * @author 土味儿
 * Date 2023/1/4
 * @version 1.0
 */
@EnableAsync
@SpringBootApplication
public class MqApp {
    public static void main(String[] args) {
        SpringApplication.run(MqApp.class, args);
    }
}

3、基础类

3.1、常量类

package com.tuwer.constant;

/**
 * <p>系统常量类</p>
 *
 * @author 土味儿
 * Date 2023/1/4
 * @version 1.0
 */
public class Constants {
    /**
     * 队列容量、通道预取值
     * 队列容量应根据项目需要设置合适的值
     * 本案例中为了测试方便设为5
     */
    public static final int QUEUE_CAPACITY = 5;
    public static final int PRE_FETCH_SIZE = 10;

    /**
     * 交换机
     */
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    public static final String BACKUP_EXCHANGE = "backup_exchange";

    /**
     * 队列
     */
    public static final String BACKUP_QUEUE = "backup_queue";
}

3.2、雪花算法工具类

获取Long型idSnowflakeUtil.getInstance().nextId()

package com.tuwer.util;

import lombok.extern.slf4j.Slf4j;

import java.text.MessageFormat;

/**
 * <p>雪花算法工具类</p>
 *
 * @author 土味儿
 * Date 2022/6/2
 * @version 1.0
 */
@Slf4j
@SuppressWarnings("all")
public class SnowflakeUtil {
    // ==============================Fields===========================================
    /**
     * 开始时间戳 (2000-01-01 00:00:00)
     */
    private static final long TWEPOCH = 946656000000L;

    /**
     * 机器id所占的位数 5
     */
    private static final long WORKER_ID_BITS = 5L;

    /**
     * 数据标识id所占的位数 5
     */
    private static final long DATA_CENTER_ID_BITS = 5L;

    /**
     * 支持的最大机器id结果是 31
     */
    private static final long MAX_WORKER_ID = ~(-1L << WORKER_ID_BITS);

    /**
     * 支持的最大数据标识id结果是 31
     */
    private static final long MAX_DATA_CENTER_ID = ~(-1L << DATA_CENTER_ID_BITS);

    /**
     * 序列在id中占的位数
     */
    private static final long SEQUENCE_BITS = 12L;

    /**
     * 机器ID向左移12位
     */
    private static final long WORKER_ID_SHIFT = SEQUENCE_BITS;

    /**
     * 数据标识id向左移17位(12+5)
     */
    private static final long DATA_CENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS;

    /**
     * 时间戳向左移22位(5+5+12)
     */
    private static final long TIMESTAMP_LEFT_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATA_CENTER_ID_BITS;

    /**
     * 生成序列的掩码这里为4095 (0b111111111111=0xfff=4095)
     */
    private static final long SEQUENCE_MASK = ~(-1L << SEQUENCE_BITS);

    /**
     * 步长 1024
     */
    private static final long STEP_SIZE = 1024;

    /**
     * unsigned int max value
     */
    private static final long UINT_MAX_VALUE = 0xffffffffL;

    /**
     * 工作机器ID(0~31)
     */
    private long workerId;

    /**
     * 工作机器ID 计数器
     */
    private long workerIdFlags = 0L;

    /**
     * 数据中心ID(0~31)
     */
    private long dataCenterId;

    /**
     * 数据中心ID 计数器
     */
    private long dataCenterIdFlags = 0L;

    /**
     * 毫秒内序列(0~4095)
     */
    private long sequence = 0L;

    /**
     * 毫秒内序列基数[0|1024|2048|3072]
     */
    private long basicSequence = 0L;

    /**
     * 上次生成ID的时间戳
     */
    private long lastTimestamp = -1L;

    /**
     * 工作模式
     */
    private final WorkMode workMode;

    public enum WorkMode {NON_SHARED, RATE_1024, RATE_4096;}

    //==============================单例模式静态内部类=====================================
    private static class InnerClass{
        private static final SnowflakeUtil INNER_DEMO = new SnowflakeUtil();
    }
    public static SnowflakeUtil getInstance(){
        return InnerClass.INNER_DEMO;
    }

    //==============================Constructors=====================================

    public SnowflakeUtil() {
        this(0, 0, WorkMode.RATE_4096);
    }

    /**
     * 构造函数
     *
     * @param workerId     工作ID (0~31)
     * @param dataCenterId 数据中心ID (0~31)
     */
    public SnowflakeUtil(long workerId, long dataCenterId) {
        this(workerId, dataCenterId, WorkMode.RATE_4096);
    }

    /**
     * 构造函数
     *
     * @param workerId     工作ID (0~31)
     * @param dataCenterId 数据中心ID (0~31)
     * @param workMode     工作模式
     */
    public SnowflakeUtil(long workerId, long dataCenterId, WorkMode workMode) {
        this.workMode = workMode;
        if (workerId > MAX_WORKER_ID || workerId < 0) {
            throw new IllegalArgumentException(MessageFormat.format("worker Id can't be greater than {0} or less than 0", MAX_WORKER_ID));
        }
        if (dataCenterId > MAX_DATA_CENTER_ID || dataCenterId < 0) {
            throw new IllegalArgumentException(MessageFormat.format("datacenter Id can't be greater than {0} or less than 0", MAX_DATA_CENTER_ID));
        }
        this.workerId = workerId;
        this.workerIdFlags = setSpecifiedBitTo1(this.workerIdFlags, this.workerId);
        this.dataCenterId = dataCenterId;
        this.dataCenterIdFlags = setSpecifiedBitTo1(this.dataCenterIdFlags, this.dataCenterId);
    }

    // ==============================Methods==========================================

    /**
     * 获取机器id
     *
     * @return 所属机器的id
     */
    public long getWorkerId() {
        return workerId;
    }

    /**
     * 获取数据中心id
     *
     * @return 所属数据中心id
     */
    public long getDataCenterId() {
        return dataCenterId;
    }

    /**
     * 获得下一个ID (该方法是线程安全的)
     *
     * @return SnowflakeId
     */
    public synchronized long nextId() {
        long timestamp = timeGen();
        //如果当前时间小于上一次ID生成的时间戳说明系统时钟回退过这个时候应当抛出异常
        if (timestamp < this.lastTimestamp) {
            if (timestamp > TWEPOCH) {
                if (WorkMode.NON_SHARED == this.workMode) {
                    nonSharedClockBackwards(timestamp);
                } else if (WorkMode.RATE_1024 == this.workMode) {
                    rate1024ClockBackwards(timestamp);
                } else {
                    throw new RuntimeException(MessageFormat.format("Clock moved backwards. Refusing to generate id for {0} milliseconds", lastTimestamp - timestamp));
                }
            } else {
                throw new RuntimeException(MessageFormat.format("Clock moved backwards. Refusing to generate id for {0} milliseconds", lastTimestamp - timestamp));
            }
        }
        //如果是同一时间生成的则进行毫秒内序列
        if (this.lastTimestamp == timestamp) {
            this.sequence = (this.sequence + 1) & SEQUENCE_MASK;
            //毫秒内序列溢出
            if (this.sequence == 0) {
                //阻塞到下一个毫秒,获得新的时间戳
                timestamp = tilNextMillis(this.lastTimestamp);
            }
        }
        //时间戳改变毫秒内序列重置
        else {
            this.sequence = this.basicSequence;
        }
        //上次生成ID的时间戳
        this.lastTimestamp = timestamp;
        //移位并通过或运算拼到一起组成64位的ID
        return ((timestamp - TWEPOCH) << TIMESTAMP_LEFT_SHIFT)
                | (this.dataCenterId << DATA_CENTER_ID_SHIFT)
                | (this.workerId << WORKER_ID_SHIFT)
                | this.sequence;
    }

    /**
     * 阻塞到下一个毫秒直到获得新的时间戳
     *
     * @param lastTimestamp 上次生成ID的时间戳
     * @return 当前时间戳
     */
    protected long tilNextMillis(long lastTimestamp) {
        long timestamp0;
        do {
            timestamp0 = timeGen();
        } while (timestamp0 <= lastTimestamp);
        return timestamp0;
    }

    /**
     * 返回以毫秒为单位的当前时间
     *
     * @return 当前时间(毫秒)
     */
    protected long timeGen() {
        return System.currentTimeMillis();
    }

    /**
     * 尝试解决时钟回拨<br>【* 仅用于 单机生成不对外 的情况 *】
     *
     * @param timestamp 当前时间戳
     * @return void
     */
    private void nonSharedClockBackwards(long timestamp) {
        if (this.dataCenterIdFlags >= UINT_MAX_VALUE && this.workerIdFlags >= UINT_MAX_VALUE) {
            throw new RuntimeException(MessageFormat.format("Clock moved backwards. Refusing to generate id for {0} milliseconds", lastTimestamp - timestamp));
        } else {
            //如果仅用于生成不重复的数值尝试变更 dataCenterId 或 workerId 修复时钟回拨问题

            log.warn("Clock moved backwards. Refusing to generate id for {} milliseconds", lastTimestamp - timestamp);
            //先尝试变更 dataCenterId当 dataCenterId 轮询一遍之后尝试变更 workerId 并重置 dataCenterId
            if (this.dataCenterIdFlags >= UINT_MAX_VALUE) {
                if (++this.workerId > MAX_WORKER_ID) {
                    this.workerId = 0L;
                }
                this.workerIdFlags = setSpecifiedBitTo1(this.workerIdFlags, this.workerId);
                // 重置 dataCenterId 和 dataCenterIdFlags
                this.dataCenterIdFlags = this.dataCenterId = 0L;
            } else {
                if (++this.dataCenterId > MAX_DATA_CENTER_ID) {
                    this.dataCenterId = 0L;
                }
            }
            this.dataCenterIdFlags = setSpecifiedBitTo1(this.dataCenterIdFlags, this.dataCenterId);
            this.lastTimestamp = -1L;
            log.warn("Try to fix the clock moved backwards. timestamp : {}, worker Id : {}, datacenter Id : {}", timestamp, workerId, dataCenterId);
        }
    }

    /**
     * 尝试解决时钟回拨<br>【* 仅用于每毫秒生成量 不大于 1024 的情况 *】
     *
     * @param timestamp 当前时间戳
     * @return void
     */
    private void rate1024ClockBackwards(long timestamp) {
        if (this.basicSequence > (SEQUENCE_MASK - STEP_SIZE)) {
            throw new RuntimeException(MessageFormat.format("Clock moved backwards. Refusing to generate id for {0} milliseconds", lastTimestamp - timestamp));
        } else {
            log.warn("Clock moved backwards. Refusing to generate id for {} milliseconds", lastTimestamp - timestamp);
            this.basicSequence += STEP_SIZE;
            this.lastTimestamp = -1L;
            log.warn("Try to fix the clock moved backwards. timestamp : {}, basicSequence : {}", timestamp, basicSequence);
        }
    }

    /**
     * Set the specified bit to 1
     *
     * @param value raw long value
     * @param index bit index (From 0~31)
     * @return long value
     */
    private long setSpecifiedBitTo1(long value, long index) {
        return value |= (1L << index);
    }

    /**
     * Set the specified bit to 0
     *
     * @param value raw long value
     * @param index bit index (From 0~31)
     * @return long value
     */
    private long setSpecifiedBitTo0(long value, long index) {
        return value &= ~(1L << index);
    }

    /**
     * Get the specified bit
     *
     * @param value raw long value
     * @param index bit index(From 0-31)
     * @return 0 or 1
     */
    private int getSpecifiedBit(long value, long index) {
        return (value & (1L << index)) == 0 ? 0 : 1;
    }

}

3.3、缓存模型类

缓存操作不是本文的重点用模型类代替实际部署时可换作redis

package com.tuwer.cache;

import java.util.Set;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/**
 * 模拟缓存
 *
 * @author 土味儿
 * Date 2023/1/3
 * @version 1.0
 */
public class CacheModel {
    /**
     * 并发Map
     */
    private static ConcurrentSkipListMap<Long, String> cache = new ConcurrentSkipListMap<>();

    /**
     * 存入缓存
     *
     * @param key
     * @param value
     */
    public static void put(Long key, String value) {
        cache.put(key, value);
        System.out.println("存入缓存【key】" + key + "【value】" + value);
        print();
    }

    /**
     * 获取value
     *
     * @param key
     * @return
     */
    public static String get(Long key) {
        String v = cache.get(key);
        System.out.println("从缓存中获取【key】" + key + "【value】" + v);
        return v;
    }

    /**
     * 删除key
     *
     * @param key
     */
    public static void del(Long key) {
        String v = cache.remove(key);
        System.out.println("从缓存中删除【key】" + key + "【value】" + v);
        print();
    }

    /**
     * 删除小于等于key的多个值
     *
     * @param key
     */
    public static void delMany1(Long key) {
        Set<Long> keys = cache.keySet();
        int n = 0;
        for (Long k : keys) {
            if (k <= key) {
                cache.remove(k);
                n++;
            }
        }
        System.out.println("从缓存中删除小于等于【" + key + "】的多个值共有 " + n + " 个");
        print();
    }

    /**
     * 删除小于等于key的多个值
     * ConcurrentNavigableMap
     * @param key
     */
    public static void delMany(Long key) {
        // 得到批量确认信息Map只能得到小于key的值
        ConcurrentNavigableMap<Long, String> confirmMap = cache.headMap(key);
        System.out.println("从缓存中删除小于等于【" + key + "】的多个值共有 " + (confirmMap.size() + 1) + " 个");
        // 清空已经确认的
        confirmMap.clear();

        // 单独再删除等于key的值
        cache.remove(key);

        print();
    }

    public static void print() {
        System.out.println("当前缓存大小" + cache.size());
        Set<Long> keys = cache.keySet();
        for (Long key : keys) {
            System.out.print(key);
            System.out.print(" | ");
            //System.out.println(cache.get(key));
        }
        System.out.println();
    }
}

4、配置类 MqConfig.java

package com.tuwer.config;

import com.tuwer.constant.Constants;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

/**
 * <p>配置类</p>
 *
 * @author 土味儿
 * Date 2023/1/5
 * @version 1.0
 */
@Configuration
public class MqConfig {
    @Resource
    private CachingConnectionFactory connectionFactory;
    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){        
        System.out.println("初始化...");
        // 删除普通空队列排除掉备份队列减轻RabbitMQ的压力
   }

    /**
     * 获取普通交换机
     *
     * @return
     */
    @Bean("normalExchange")
    public DirectExchange getNormalExchange() {
        return ExchangeBuilder
                .directExchange(Constants.NORMAL_EXCHANGE)
                // 持久化
                .durable(true)
                // 备份(候补)交换机
                .withArgument("alternate-exchange", Constants.BACKUP_EXCHANGE)
                .build();
    }

    /**
     * 备份交换机
     * 类型fanout
     *
     * @return
     */
    @Bean("backupExchange")
    public FanoutExchange getBackupExchange() {
        return new FanoutExchange(Constants.BACKUP_EXCHANGE);
    }

    /**
     * 获取备份队列
     *
     * @return
     */
    @Bean("backupQueue")
    public Queue getBackupQueue() {
        return QueueBuilder.durable(Constants.BACKUP_QUEUE).build();
    }

    /**
     * 备份队列 绑定 备份交换机
     *
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding backupQueueBindBackupExchange(
            @Qualifier("backupQueue") Queue queue,
            @Qualifier("backupExchange") FanoutExchange exchange
    ) {
        return BindingBuilder.bind(queue).to(exchange);
    }

    /**
     * 用于动态创建队列、交换机并绑定
     * @return
     */
    @Bean
    public RabbitAdmin rabbitAdmin(){
        //return new RabbitAdmin(connectionFactory);
        return new RabbitAdmin(rabbitTemplate);
    }

    /**
     * 用于设置动态监听
     * @return
     */
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        container.setConcurrentConsumers(10);
        container.setMaxConcurrentConsumers(50);
        // 手动确认
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // 预取值
        container.setPrefetchCount(Constants.PRE_FETCH_SIZE);

        // 创建队列在用户登录后创建
        //createNormalQueueAndBind.create(username);
        //listenerContainer.setQueueNames("q_" + username);
        //listenerContainer.setMessageListener(myAckReceiver);

        return container;
    }
}

5、普通队列动态创建类

package com.tuwer.service;

import com.tuwer.constant.Constants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Locale;
import java.util.Objects;
import java.util.Properties;

/**
 * <p>动态创建队列</p>
 *
 * @author 土味儿
 * Date 2023/1/7
 * @version 1.0
 */
@Component
@Slf4j
public class CreateNormalQueueAndBind {
    @Resource
    private DirectExchange normalExchange;
    @Resource
    private RabbitAdmin rabbitAdmin;

    /**
     * 动态创建普通队列并绑定至普通交换机
     *
     * @param routingKey
     */
    public void create(String routingKey) {
        String key = routingKey.toLowerCase(Locale.ROOT);
        String queueName = "q_" + key;

        // 创建普通队列
        Queue queue = null;

        // 查询队列属性为null时表示队列不存在
        Properties queueProperties = rabbitAdmin.getQueueProperties(queueName);

        // 队列不存在时创建
        if (Objects.isNull(queueProperties)) {
            log.info("队列【{}】不存在创建中...", queueName);
            queue = QueueBuilder.durable(queueName).maxLength(Constants.QUEUE_CAPACITY).build();
            rabbitAdmin.declareQueue(queue);
        }
        // 绑定至普通交换机
        if (Objects.nonNull(queue)) {
            Binding binding = BindingBuilder.bind(queue).to(normalExchange).with(key);
            rabbitAdmin.declareBinding(binding);
        }
    }
}

经过测试如果队列满了的时候再向队列发送消息时最老的消息被丢弃且不会启用备份交换机为了防止信息丢失加入死信交换机死信队列当前队列满了的时候最老的信息进入死信交换机再转至死信队列

  • 原代码
// 原
queue = QueueBuilder.durable(queueName)
      .maxLength(Constants.QUEUE_CAPACITY)
      .build();
  • 新代码
// 新
queue = QueueBuilder.durable(queueName)
      // 设置队列长度
      .maxLength(Constants.QUEUE_CAPACITY)
      // 设置死信交换机
      .deadLetterExchange(Constants.BACKUP_EXCHANGE)
      // 设置死信RoutingKey死信队列
      .deadLetterRoutingKey(Constants.BACKUP_QUEUE)
      // 改变溢出规则当队列溢出时拒绝接收新消息
      //.overflow(QueueBuilder.Overflow.rejectPublish)
      .build();

6、发布确认回调类

package com.tuwer.service;

import com.tuwer.cache.CacheModel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

/**
 * <p>发布确认回调类</p>
 *
 * @author 土味儿
 * Date 2023/1/5
 * @version 1.0
 */
@Slf4j
@Component
public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback {
    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 把当前类注入到 RabbitTemplate
     *
     * @PostConstruct 表示在执行当前类的构造时运行
     * 因为 ConfirmCallback接口是 RabbitTemplate的内部类
     */
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * 交换机确认回调方法
     *
     * @param correlationData 回调消息
     * @param ack             交换机是否确认收到了消息true:收到了false:没有收到
     * @param cause           没有收到消息的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // 消息ID
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            // 收到
            log.info("交换机收到了消息编号{}", id);
            // 从缓存中删除
            CacheModel.del(Long.parseLong(id));
        } else {
            // 未收到
            log.info("交换机没有收到编号为{} 的消息原因{}", id, cause);
        }
    }
}

从缓存中删除的优化操作

  • 采用Redis后发现的问题部分信息确认后没有从Redis中删除成功

    • 分析原因确认回调方法是异步执行可能在发送时信息还没有存入缓存中回调方法就已经执行 这时删除操作将会失败信息会一直留在缓存中

    • 解决方法休眠几秒待信息存入缓存后重新删除

在这里插入图片描述

  • 代码优化
    @Override
    @SneakyThrows
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // 消息ID
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            // 收到
            log.info("交换机收到了消息编号{}", id);
            // 从缓存中删除
            RedisUtil<String> redisUtil = new RedisUtil<>(redisTemplate);
            Long n = redisUtil.rkey.del(id);
            if (n > 0) {
                log.info("从缓存中删除消息编号{}", id);
            } else {
                /*
                 * 确认回调方法是异步执行可能在发送时信息还没有存入缓存中回调方法就已经执行
                 * 这时删除操作将会失败信息会一直留在缓存中
                 * 解决方法休眠几秒待信息存入缓存后重新删除
                 */
                TimeUnit.SECONDS.sleep(3);
                n = redisUtil.rkey.del(id);
                if (n > 0) {
                    log.info("从缓存中首次删除消息失败再次尝试删除成功编号{}", id);
                }
            }
        } else {
            // 未收到
            log.info("交换机没有收到编号为{} 的消息原因{}", id, cause);
        }
    }

7、生产者服务类

发送消息采用异步方式 @Async

package com.tuwer.service;


import com.tuwer.cache.CacheModel;
import com.tuwer.constant.Constants;
import com.tuwer.util.SnowflakeUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.*;

/**
 * <p>生产者</p>
 *
 * @author 土味儿
 * Date 2023/1/4
 * @version 1.0
 */
@Component
@Slf4j
public class Producer {
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Resource
    private CreateNormalQueueAndBind createNormalQueueAndBind;

    /**
     * 异步发送消息
     *
     * @param msg
     */
    @Async
    public void sendMsg(String msg, String routingKey) {
        String key = routingKey.toLowerCase(Locale.ROOT);
        // 创建普通队列并绑定至普通交换机
        createNormalQueueAndBind.create(key);

        long id = SnowflakeUtil.getInstance().nextId();
        CorrelationData correlationData = new CorrelationData(String.valueOf(id));
        rabbitTemplate.convertAndSend(
                Constants.NORMAL_EXCHANGE,
                key,
                msg,
                correlationData);

        // 存入缓存
        CacheModel.put(id, msg);

        log.info("消息【{}】已发送编号{}", msg, correlationData.getId());
    }
}

8、消费者服务类

8.1、备份队列消费者

备份队列固定不变config中配置系统启动后自动创建

package com.tuwer.service;

import com.tuwer.config.MqConfig;
import com.tuwer.constant.Constants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * <p>备份消费者</p>
 *
 * @author 土味儿
 * Date 2023/1/5
 * @version 1.0
 */
@Slf4j
@Component
public class BackupConsumer {
    /**
     * 监听备份队列消息
     * @param message
     */
    @RabbitListener(queues = Constants.BACKUP_QUEUE)
    public void receiveMsg(Message message){
        String msg = new String(message.getBody());
        log.info("接收到备份队列的消息【{}】", msg);
    }
}

8.2、普通队列消费者

  • 队列不确定用户登录后动态切换要监控的队列

  • 如果用户退出后也要更新监控列表省略

package com.tuwer.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;


/**
 * <p>普通消费者</p>
 *
 * @author 土味儿
 * Date 2023/1/5
 * @version 1.0
 */
@Slf4j
@Component
public class NormalConsumer {
    @Resource
    private SimpleMessageListenerContainer listenerContainer;
    @Resource
    private MyAckReceiver myAckReceiver;
    @Resource
    private CreateNormalQueueAndBind createNormalQueueAndBind;

    /**
     * 监听普通队列消息
     * 动态切换要监控的队列   
     *   
     * @param username
     */
    //@RabbitListener(queues = "队列名称")
    public void receiveMsg(String username) {
        // 创建队列
        createNormalQueueAndBind.create(username);
        // 设置要监听的队列用set不是add
        listenerContainer.setQueueNames("q_" + username);
        // 设置消息接收器
        listenerContainer.setMessageListener(myAckReceiver);

        // 当前监听的队列列表
        String[] queueNames = listenerContainer.getQueueNames();
        System.out.println("当前监听的队列");
        for (String queueName : queueNames) {
            System.out.println(queueName);
        }
        System.out.println("----------");
    }
}
  • 消息接收器
package com.tuwer.service;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

/**
 * <p>消息接收器</p>
 *
 * @author 土味儿
 * Date 2023/1/7
 * @version 1.0
 */
@Slf4j
@Component
public class MyAckReceiver implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            String consumerQueue = message.getMessageProperties().getConsumerQueue();
            String msg = new String(message.getBody());
            log.info("MyAckReceiver的【{}】队列收到的消息【{}】", consumerQueue, msg);
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            channel.basicReject(deliveryTag, false);
            e.printStackTrace();
        }
    }
}

9、Controller接口类

9.1、发送消息

http://localhost:8080/sendMsg/用户名/消息

package com.tuwer.controller;

import com.tuwer.service.Producer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * <p>发送消息</p>
 *
 * @author 土味儿
 * Date 2023/1/5
 * @version 1.0
 */
@Slf4j
@RestController
public class ProducerController {
    @Resource
    private Producer producer;

    /**
     * 发送消息
     *
     * @param msg      消息内容
     * @param username 接收消息的用户
     * @return
     */
    @GetMapping("/sendMsg/{username}/{msg}")
    public String sendMsg(
            @PathVariable("msg") String msg,
            @PathVariable("username") String username
    ) {
        producer.sendMsg(msg, username);

        return "OK!";
    }
}

9.2、模拟用户登录

http://localhost:8080/login/用户名

package com.tuwer.controller;

import com.tuwer.service.NormalConsumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * <p>模拟登录</p>
 *
 * @author 土味儿
 * Date 2023/1/7
 * @version 1.0
 */
@Slf4j
@RestController
@RequestMapping("/login")
public class LoginController {
    @Resource
    private NormalConsumer consumer;

    /**
     * 登录
     *
     * @param username
     * @return
     */
    @GetMapping("/{username}")
    public String login(
            @PathVariable("username") String username
    ) {
        log.info("用户 {} 已登录", username);

        // 接收该用户的队列消息
        consumer.receiveMsg(username);

        return username;
    }
}

10、定时重发

缓存中未得到确认的消息由定时器重新发送

在这里插入图片描述

信息重发调度器这是系统优化后增加的类

  • 增加了信息实体类 MqMsg

  • 增加了异步发送多条信息方法 sendMsg(List<MqMsg> listOfMqMsg)

package com.tuwer.service;

import com.alibaba.fastjson2.JSON;
import com.tuwer.pojo.po.MqMsg;
import com.tuwer.util.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.time.Instant;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * <p>调度器</p>
 *
 * @author 土味儿
 * Date 2023/1/13
 * @version 1.0
 */
@Slf4j
@Component
public class Scheduler {
    @Resource
    private RedisTemplate redisTemplate;
    @Resource
    private Producer producer;

    /**
     * 缓存信息重发每30分钟检查一次
     */
    @Scheduled(cron = "0 0/30 * * * ?")
    public void resendForCache() {
        log.info("开始执行缓存信息重发...");
        // 1、从缓存中获取全部信息key集合
        RedisUtil<String> redisUtil = new RedisUtil<>(redisTemplate);
        Set<String> keys = redisUtil.rkey.keys("*");
        int s1 = keys.size();
        if (s1 < 1) {
            log.info("缓存中没有信息");
            return;
        }
        log.info("从缓存中获取到 {} 条信息!", s1);

        // 2、遍历分析信息
        String msgJson;
        MqMsg mqMsg;
        List<MqMsg> list = new ArrayList<>();
        // 需要重发的时间差秒
        long n = 10 * 60;
        // 当前时刻距离原点时刻的秒值
        long nowSecond = Instant.now().getEpochSecond();

        log.info("开始检查...");
        for (String key : keys) {
            // 得到当前key的值
            msgJson = redisUtil.rstring.get(key);
            // 解析成MqMsg对象
            mqMsg = JSON.parseObject(msgJson, MqMsg.class);

            // 当前信息距离原点时刻的秒值
            long msgSecond = mqMsg.getTime().atZone(ZoneId.systemDefault()).toInstant().getEpochSecond();

            // 判断是否已达到重发的要求
            if ((nowSecond - msgSecond) > n) {
                /*
                 * 需要重发
                 * -------------------
                 * 把信息封装进list中
                 * list中元素是MqMsgVO对象
                 */
                list.add(mqMsgVO);
                log.info("信息{} 已过去 {} 秒封装进集合中", key, n);
            }
        }

        // 3、重发调用信息发送方法
        int s2 = list.size();
        if (s2 > 0) {
            log.info("共有 {} 条信息需要重发", s2);
            producer.sendMsg(list);
            log.info("重发指令已发出");
            return;
        }
        log.info("没有需要重发的信息");
    }
}
  • 信息实体类
package com.tuwer.pojo.po;

import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import com.tuwer.util.SnowflakeUtil;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.time.LocalDateTime;

/**
 * <p>消息队列消息实体类</p>
 *
 * @author 土味儿
 * Date 2023/1/9
 * @version 1.0
 */
@Data
@NoArgsConstructor
public class MqMsg {
    private Long id;
    private String from;
    private String to;
    private int type;
    private String title;
    private String content;

    @JsonDeserialize(using = LocalDateTimeDeserializer.class)
    @JsonSerialize(using = LocalDateTimeSerializer.class)
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    private LocalDateTime time;

    private boolean read;

    public MqMsg(String from, String to, int type, String title, String content) {
        // 雪花算法生成id
        this.id = SnowflakeUtil.getInstance().nextId();

        this.from = from;
        this.to = to;
        this.type = type;
        this.title = title;
        this.content = content;

        // 当前时间
        this.time = LocalDateTime.now();
        this.read = false;
    }
}
  • 异步发送多条信息
/**
 * <p>消息生产者</p>
 *
 * @author 土味儿
 * Date 2023/1/4
 * @version 1.0
 */
@Component
@Slf4j
public class Producer {
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Resource
    private CreateNormalQueueAndBind createNormalQueueAndBind;
    @Resource
    private RedisTemplate redisTemplate;

    /**
     * 异步发送单条消息
     *
     * @param msgJson    消息实体的json字符串
     * @param routingKey
     */
    @Async
    public void sendMsg(String msgJson, String routingKey) {
        // 略
    }

    /**
     * 异步发送多条消息
     *
     * @param listOfMqMsg list集合元素为MqMsg
     */
    @Async
    public void sendMsg(List<MqMsg> listOfMqMsg) {
        if (CollUtil.isEmpty(listOfMqMsg)) {
            log.info("空集合没有可发送的消息");
            return;
        }

        long id;
        String to, msgJson;

        // redis工具类
        RedisUtil<String> redisUtil = new RedisUtil<>(redisTemplate);

        for (MqMsg msg : listOfMqMsg) {
            // id
            id = msg.getId();
            // 接收者 to
            to = msg.getTo();
            // json
            msgJson = JSON.toJSONString(msg);
            // 创建普通队列并绑定至普通交换机
            createNormalQueueAndBind.create(to);

            // 发布确认回调序号
            CorrelationData correlationData = new CorrelationData(String.valueOf(id));
            rabbitTemplate.convertAndSend(
                    MqConstants.NORMAL_EXCHANGE,
                    to,
                    msgJson,
                    correlationData);

            // 存入缓存
            if (redisUtil.rstring.add(correlationData.getId(), msgJson)) {
                log.info("消息{} 已存入缓存", correlationData.getId());
            }
        }

        log.info("{} 条消息已发送", listOfMqMsg.size());
    }
}
  • FastJson2 中 List< MqMsg > 的序列化与反序列化
// 序列化
List<MqMsg> list = new ArrayList<>();
list.add(new MqMsg(...));
list.add(new MqMsg(...));
list.add(new MqMsg(...));
String listJson = JSON.toJSONString(list)


// 反序列化
List<MqMsg> list = JSON.parseArray(listJson, MqMsg.class);

11、项目结构图

在这里插入图片描述

六、测试

1、启动后自动创建交换机、队列

  • 启动前

在这里插入图片描述

在这里插入图片描述

  • 启动后

在这里插入图片描述

在这里插入图片描述

2、向用户user1发送消息

http://localhost:8080/sendMsg/user1/测试1

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

3、登录user1接收消息

http://localhost:8080/login/user1

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

4、向user2user3分别发消息

当前登录用户是user1向用户user2user3发送消息时user1是接收不到的消息会存储在相应队列中

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

5、切换用户user3

user1切换到user3同时监控的队列由q_user1动态切换到q_user3队列q_user3中的消息将被消费掉

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

6、再次向user1发消息

当前登录用户是user3这时再向上一个登录用户user1发消息消息应该不会被消费存在q_user1队列中

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

7、备份交换机接收消息

  • 本案例中发送消息前先动态创建队列一般不会出现信息无法路由的情况也就不会因为无法路由而启动备份交换机除非一些极端的情况

  • 当普通队列满的时候再向其发送消息最老的信息变为死信进入死信交换机本案例中把备份交换机当成死信交换机备份队列当成死信队列。

  • user2连发10条信息且user2不登录队列q_user2容量为5溢出后最老的信息进入备份交换机发送前先删除原来的q_user2队列

在这里插入图片描述

在这里插入图片描述

  • 启动测试

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

  • 改变溢出规则默认是丢弃最老(开头)的消息

在这里插入图片描述

在创建队列时加上overflow(QueueBuilder.Overflow.rejectPublish)改为拒绝接收新消息此时生产者会收到拒绝接收的消息提示缓存中的消息将不会被删除而是定时重发备份交换机也不会启用拒绝的信息不会进入备份队列不符合设计要求所以使用默认的溢出规则。

七、不足

  • 每个用户对应一个普通队列当用户过多时相应的队列也会很多并且队列是持久化的会占用较多的系统资源解决思路定时删除空队列

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: SpringRabbitMQ

“SpringBoot下RabbitMQ的实战应用:动态创建和动态监控队列、死信、备份交换机” 的相关文章