SpringCloud系列(十二)[MQ 篇] - 一篇文章搞定 RabbitMQ 的使用

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

上篇文章已经讲过了 MQ 的概念及 RabbitMQ 的用途和部署, 本篇文章将对 RabbitMQ 进行深一步的理解和使用, 主要写的内容是 RabbitMQ 的结构及消息模型是什么样的, 并且以例子的形式来讲解 SpringAMQP 的使用.

RabbitMQ

🐇 RabbitMQ 的结构

  关于 RabbitMQ 的基本结构如下图所示:

在这里插入图片描述



🐇🐇RabbitMQ 的消息模型

在这里插入图片描述
补充 Headers Exchange (头交换机):
除此之外还有头交换机, 关于头交换机的模型用到的不做, 只需要知道其原理即可;有时消息的路由操作会涉及到多个属性此时使用消息头就比用路由键更容易表达头交换机(headers exchange)就是为此而生的。头交换机使用多个消息属性来代替路由键建立路由规则, 通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。
我们可以绑定一个队列到头交换机上并给他们之间的绑定使用多个用于匹配的头(header)。这个案例中消息代理得从应用开发者那儿取到更多一段信息换句话说它需要考虑某条消息(message)是需要部分匹配还是全部匹配。当然会有一个 “x-match” 参数, 当 “x-match” 设置为 “any” 时, 消息头的任意一个值被匹配就可以满足条件而当 “x-match” 设置为 “all” 的时候就需要消息头的所有值都匹配成功才能满足条件。
  另外, 头交换机可以视为直连交换机的另一种表现形式。头交换机能够像直连交换机一样工作不同之处在于头交换机的路由规则是建立在头属性值之上而不是路由键。路由键必须是一个字符串而头属性值则没有这个约束它们甚至可以是整数或者哈希值(字典)等。



🐇🐇🐇SpringAMQP

SpringAMQP 是基于 RabbitMQ 封装的一套模板, 并且还利用 SpringBoot 对其实现了自动装配, 关于 SpringAMQP 的官方网址:☞SpringAMQP.

关于 AMQP 和 SpringAMQP 的概念:

  • AMQP: 全称 Advanced Message Queuing Protocol, 是用于在应用程序之间传递业务消息的开放标准, 该协议与语言和平台无关, 更符合微服务中独立性的要求;
  • SpringAMQP: 其是基于 AMQP 协议定义的一套 API 规范, 提供了模板来发送和接收消息, 主要包含两部分的内容: spring-amqp 为基础抽象; spring-rabbit 是底层的默认实现.主要提供了以下三个功能:
    1 自动声明队列 / 交换机及其绑定关系;
    2 基于注解的监听器模式, 异步接收消息;
    3 封装了 RabbitTemplate 工具, 用于发送消息.



🐇🐇🐇🐇各种模型的具体实现

各种模型的具体实现的前提是你的虚拟机已经部署了 RabbitMQ 并启动.具体部署步骤请看此文章:SpringCloud系列十[MQ 篇] - RabbitMQ 初步学习及详细部署步骤.

前提: 启动 RabbitMQ;
在这里插入图片描述
如果没有启动, 则执行指令 docker ps -a 找到容器的 ID, 然后执行 docker start [ID] 启动.
启动成功后进行登录, 如下图所示:
在这里插入图片描述


🐰 BasicQueue 简单队列模型

模型:
在这里插入图片描述


前提: 新建一个 simple.queue 队列;
在这里插入图片描述


消息发送:
步骤一: 在父工程 pom.xml 中引入 AMQP 依赖 (注意这里面已经包含了 RabbitMQ);

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

步骤二: 配置 RabbitMQ 地址, 在发布者(Publisher) 服务的 yml 文件中添加配置:

logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS
spring:
  rabbitmq:
    host: 172.16.**.**  # rabbitMQ 的 ip 地址, 也就是你的虚拟机 ip 地址
    port: 5672  # 端口号
    username: myRabbitMQ
    password: 123456 # 这里的用户名和密码是 docker 运行 RabbitMQ 容器时自己设定的, 具体指令看上篇文章
    virtual-host: /

步骤三: 在 Publisher 服务中编写测试类, 并利用 RabbitTemplate 实现消息的发送;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage2SimpleQueue() {
        String queueName = "simple.queue";
        String message = "hello, spring amqp!";
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

消息接收:
步骤一: 配置 RabbitMQ 地址, 在消费者(Consumer) 服务的 yml 文件中添加配置:

logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS
spring:
  rabbitmq:
    host: 172.16.**.***  # rabbitMQ 的 ip 地址
    port: 5672  # 端口
    username: myRabbitMQ
    password: 123456
    virtual-host: /
    listener:
      direct:
        prefetch: 1

步骤二: 在 consumer 服务中新建 listener 包, 并新建 SpringRabbitListener 类用来接收消息;

@Component
public class SpringRabbitListener {
    @RabbitListener(queues="simple.queue")
    public void listenSimpleQueue(String msg) {
        System.out.println("消费者接收到 simple.queue 的消息: [" + msg + "]");
    }
}

测试:
启动 consumer 服务, 运行 publisher 中的测试代码, 进行消息的发送和接收;
在这里插入图片描述
在这里插入图片描述


🐰🐰 WorkQueue 工作消息队列模型

WorkQueue 又称 TaskQueue 任务模型, 也就是说让多个消费者绑定到一个队列来共同消费队列中的消息; 主要针对的问题是当消息处理比较耗时的时候, 可能生产消息的速度远远大于消息的消费速度, 这样队列中会堆积越来越多的消息无法及时处理, 而此模型多个消费者共同处理消息, 速度就能大大提高.
模型:
在这里插入图片描述


前提: 新建一个 work.queue 队列;
在这里插入图片描述


消息发送:
因为要解决的就是大量消息的堆积问题, 因此这里我们循环发送信息;

@Test
    public void testSendMessage2WorkQueue() throws InterruptedException {
        String queueName = "work.queue";
        String message = "hello, message --- ";
        for (int i = 0; i <= 50; i++) {
            rabbitTemplate.convertAndSend(queueName, message + i);
            Thread.sleep(20);
        }
    }

消息接收:
消息的接收也要模拟两个消费者共同绑定一个队列, 于是在 consumer 的 SpringRabbitListener 中添加了两个方法:

@RabbitListener(queues="work.queue")
    public void listenSimpleQueue1(String msg) throws InterruptedException {
        System.out.println("消费者11111接收到 work.queue 的消息: [" + msg + "]" + LocalTime.now());
        Thread.sleep(20);
    }

    @RabbitListener(queues="work.queue")
    public void listenSimpleQueue2(String msg) throws InterruptedException {
        System.err.println("消费者22222接收到 work.queue 的消息: [" + msg + "]" + LocalTime.now());
        Thread.sleep(200);
    }

测试:
在这里插入图片描述
在这里插入图片描述
通过上面的结果可以看得出来, consumer1 很快就完成了自己的消息, 而 consumer2 在缓慢的处理自己的消息; 也就是说 consumer1 和 consumer2 都是处理相同数量的消息, 但是如果考虑到消费者的处理能力, 这样显然是有问题的, 因此最好的方式就是"能者多劳", 处理能力强的 consumer 处理的数量多才对, 因此需要在 consumer 的 yml 文件中需要添加以下配置, 目的是每个 consumer 每次只能获取一个消息, 只有处理完成了才能获取下一个消息.
在这里插入图片描述
测试结果如下:
在这里插入图片描述
很明显, consumer1 的处理能力比较强, 因此处理的消息也就比 consumer2 多得多.


总之:

  • 多个消费者绑定到一个队列上, 同一条消息只会被一个消费者处理;
  • 通过在 yml 中设置 prefetch 来控制消费者预取的消息数量.


🐰🐰🐰 Fanout Exchange 扇形交换机(广播模型)

在这里插入图片描述


消息发送:
在广播模式下, 消息的发送流程如下:

  • 可以有多个队列, 但是多个队列都绑定到了 Exchange 交换机上;
  • 发布者发送的消息只能发送到交换机, 交换机来决定要发给哪个队列, 发布者无法决定;
  • 交换机将消息发送到绑定过的所有队列, 订阅这些队列的消费者就可以拿到消息;
  • 交换机不能缓存消息, 当路由失败时, 消息也就丢失了.

步骤一: 声明队列和交换机
在 consumer 的 config 包中新建 FanoutConfig 类, 声明两个队列和一个交换机;

@Configuration
public class FanoutConfig {
    /**
     * 声明交换机
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("myrabbitmq.fanout");
    }

    /**
     * 声明第一个队列
     */
    @Bean
    public Queue fanoutQueue1() {
        return new Queue("fanout.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    /**
     * 声明第二个队列
     */
    @Bean
    public Queue fanoutQueue2() {
        return new Queue("fanout.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

步骤二: 消息发送
在 publisher 测试类中添加消息发送的测试方法;

@Test
    public void testFanoutExchange() {
        // 交换机名称
        String exchangeName = "myrabbitmq.fanout";
        String message = "您好, 我是 FanoutExchange";
        // FanoutExchange 路由键设置为空即可
        rabbitTemplate.convertAndSend(exchangeName,"",message);
    }

步骤三: 消息接收

/**
     * FanoutExchange 消息接收
     */
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg) {
        System.out.println("consumer1 接收到 Fanout 的消息: [" + msg + "]");
    }
    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg) {
        System.out.println("consumer2 接收到 Fanout 的消息: [" + msg + "]");
    }

测试:
在这里插入图片描述
在这里插入图片描述
在 RabbitMQ 管理网站上也可以看到我们的交换机和队列信息:
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述


🐰🐰🐰🐰 Direct Exchange 直连交换机(路由模型)

在广播模式下我们可以知道一条消息可以被多个消费者订阅, 但是在一些场景下我们希望不同的消息被不同的队列消费, 这时候 Direct Exchange 就出现了.
在这里插入图片描述


**消息发送:**

在 Direct Exchange 模式下:

  • 队列与交换机的绑定不再是任意绑定了, 而是需要指定一个 路由键 (RoutingKey);
  • 发布者向 Exchange 发送消息时, 也必须指定消息的路由键;
  • 交换机不再讲消息给每一个绑定的队列, 而是根据消息的路由键进行判断, 只有队列的路由键和消息的路由键完全一致才会接收到消息.

步骤一: 在 publisher 服务的测试类中添加消息发送测试;

/**
     * DirectExchange
     */
    @Test
    public void testSendDirectExchange() {
        // 交换机名称
        String exchangeName = "myrabbitmq.direct";
        String message = "您好, 这里是 DirectExchange";

        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName,"routingkey1",message);
    }

消息接收:
步骤一: 声明队列和交换机 (这次我们基于注解的形式来进行声明)
在 consumer 的 SpringRabbitListener 中添加两个消费者同时基于注解来声明队列和交换机;

/**
     * 1. 基于注解的方式声明队列和交换机
     * 2. DirectExchange 消息接收
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "myrabbitmq.direct", type = ExchangeTypes.DIRECT),
            key = {"routingkey1","routingkey2"} // 可以设置任意个路由键
    ))
    public void listenDirectQueue1(String msg) {
        System.out.println("消费者接收到了 direct.queue1 的消息: [" + msg + "]");
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "myrabbitmq.direct", type = ExchangeTypes.DIRECT),
            key = {"routingkey3","routingkey4"} // 可以设置任意个路由键
    ))
    public void listenDirectQueue2(String msg) {
        System.out.println("消费者接收到了 direct.queue2 的消息: [" + msg + "]");
    }

测试:
路由键为 routingkey1 时发送消息:
在这里插入图片描述
路由键为 routingkey3 时发送消息:
在这里插入图片描述
在 RabbitMQ 管理网站上也可以看到我们的交换机和队列信息:
在这里插入图片描述
绑定信息:
在这里插入图片描述
在这里插入图片描述

🐰🐰🐰🐰🐰 Topic Exchange 主题交换机(主题模型)

TopicExchange 类型的 Exchange 与 DirectExchange 相比都是可以根据 RoutingKey 把消息路由到不同的队列。只不过TopicExchange 类型的 Exchange 可以让队列在绑定 Routing key 的同时使用通配符
在这里插入图片描述
这里的 RoutingKey 一般都是由一个或者多个单词组成, 多个单词之间以 “.” 分割, 规则如下:

  • #: 匹配一个或多个词, 如 topic.# 能够匹配 topic.aa.bb 或者 topic.aa;
  • *: 匹配一个词, 只能匹配成 topic.aa 这种形式

消息发送:

/**
     * TopicExchange
     */
    @Test
    public void testSendTopicExchange() {
        // 交换机名称
        String exchangeName = "myrabbitmq.topic";
        String message = "您好, 这里是 TopicExchange";
        
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName,"AABBcc.key1",message);
    }

消息接收:

/**
     * 1. 基于注解的方式声明队列和交换机
     * 2. TopicExchange 消息接收
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = "myrabbitmq.topic", type = ExchangeTypes.TOPIC),
            key = "#.key1" // 所有后缀是 key1 的都可以接收到
    ))
    public void listenTopicQueue1(String msg) {
        System.out.println("消费者接收到了 topic.queue1 的消息: [" + msg + "]");
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = "myrabbitmq.topic", type = ExchangeTypes.TOPIC),
            key = "key2.*" // 类似于 key2.aa  /  key2.bb 这样的可以接收到
    ))
    public void listenTopicQueue2(String msg) {
        System.out.println("消费者接收到了 topic.queue2 的消息: [" + msg + "]");
    }

测试:
在这里插入图片描述
在这里插入图片描述
在 RabbitMQ 管理网站上也可以看到我们的交换机和队列信息:
在这里插入图片描述
在这里插入图片描述


🐇🐇🐇🐇🐇 关于消息的序列化问题

我们在消息发送的时候使用 convertAndSend 这个函数关于信息是 Object 类型, Spring会把你发送的消息序列化为字节发送给 RabbitMQ 接收消息的时候, 会把字节反序列化为 Java 对象, 也就是说使用的是 Spring 默认的 JDK 序列化方式, 但是会存在以下问题:

  • 数据体积过大;
  • 有安全漏洞;
  • 可读性差劲.

解决方式:


使用 JSON 的方式来序列化和反序列化, 具体方式如下:
1 在 publisher 和 consumer 两个服务中引入依赖:

 `<dependencies>
        <dependency>
            <groupId>com.fasterxml.jackson.dataformat</groupId>
            <artifactId>jackson-dataformat-xml</artifactId>
            <version>2.9.10</version>
        </dependency>
  </dependencies>`

2 在启动类中添加 Bean

@Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: SpringRabbitMQ

“SpringCloud系列(十二)[MQ 篇] - 一篇文章搞定 RabbitMQ 的使用” 的相关文章