RabbitMQ入门指南
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
人生永没有终点。只有等到你瞑目的那一刻才能说你走完了人生路在此之前新的第一次始终有新的挑战依然在新的感悟不断涌现。
文章目录
一、MQ与RabbitMQ概述
1. MQ简述
MQ全称Message Queue直译是消息队列是基础数据结构中 “先进先出” 的一种数据结构也是在消息的传输过程中保存消息的容器中间件多用于分布式系统之间进行通信。
一般MQ用来解决系统耦合、异步消息、流量削峰等问题实现高性能、高可用、可伸缩和最终一致性架构。AP架构
总结
-
消息队列MQ是一种中间件用于存储和传递消息。
-
分布式系统有两种通信方式直接远程调用(如OpenFeign) 和 借助第三方完成间接通信如RabbitMQ。
-
发送方称为生产者接收方称为消费者。
2. MQ的优势
MQ的优势(应用解耦、异步、削峰)
- 应用解耦提高系统容错性和可维护性
- 异步提速提升用户体验和系统吞吐量
- 削峰填谷提高系统稳定性。
1、应用解耦
2、异步提速
3、削峰填谷秒杀
使用MQ之后限制消费消息的速度为1000这样一来高峰期产生的数据势必会被积压在MQ中高峰就被“削”掉了。
但是因为消息积压在高峰期过后的一段时间内消费消息的速度还是会维持在1000直到消费完积压的消息这就叫做“填谷”从而提升系统的稳定性。
3. MQ的劣势
引入MQ会遇到下列问题
- 消息可靠性问题如何确保发送的消息至少被消费者消费一次避免消息丢失问题
- 延迟消息问题 如何实现消息的延迟投递解决方案使用延时队列、TTL、延迟队列插件实现
- 高可用问题如何避免单点MQ故障而导致的不可用问题解决方案搭建MQ集群
- 消息堆积问题如何解决数百万消息堆积无法及时消费的问题
4. 常见的MQ产品
市面上有很多MQ产品例如RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ、EMQ(物联网) 等也有直接使用Redis充当消息队列的场景。在实际技术选型时需要结合自身需求及MQ产品特点来综合考虑。
- Kafka: 是一个高可用性、高吞吐量的分布式消息系统。它具有持久化、持续性、可扩展性和副本机制并支持多分区和多消费者组。Kafka适用于大规模的数据流处理如日志聚合、流处理和实时数据流。在追求可用性和高吞吐能力方面Kafka是一个不错的选择。
- RocketMQ: 是一个低延迟、高吞吐量的分布式消息队列系统。它提供了可靠的消息传递机制支持高并发和高可用性的消息发布和订阅。RocketMQ适用于大规模的消息处理和异步通信场景。在追求可用性、可靠性和吞吐能力方面RocketMQ是一个较好的选择。
- RabbitMQ: 是一个可靠性较高、低延迟的开源消息队列系统。它采用AMQP协议支持多种消息模式和消息确认机制。RabbitMQ适用于可靠性要求较高的任务和通信场景。在追求可用性、可靠性和低延迟方面RabbitMQ是一个合适的选择。
追求可用性(高->低)Kafka、 RocketMQ 、RabbitMQ
追求可靠性RabbitMQ、RocketMQ
追求吞吐能力RocketMQ、Kafka
追求消息低延迟RabbitMQ、Kafka。
5. RabbitMQ兔子MQ
RabbitMQ官网http://www.rabbitmq.com/
RabbitMQ是基于AMQP协议使用Erlang语言开发的一款消息队列产品。
AMQP (全称Advanced Message Queuing Protocol表示高级消息队列协议是一个网络协议是应用层协议的一个开放标、准为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息并不受客户端/中间件不同产品不同的开发语言等条件的限制。2006年AMQP规范发布。类比HTTP。同类的协议还有MQTT用于物联网场景下
在RabbitMQ中有以下一些角色
-
Producer生产者生产者是指发送消息到RabbitMQ的应用程序。它创建消息并将其发送到交换器。
-
Consumer消费者消费者是指从RabbitMQ接收消息的应用程序。它基于订阅的方式从队列中获取消息并进行处理。
-
Exchange交换器交换器是消息的路由中心。当生产者发送消息时通过交换器将消息路由到一个或多个队列。
-
Queue队列队列是RabbitMQ中存储消息的地方。消费者从队列中接收消息并进行处理。
-
Binding绑定绑定将交换器与队列相关联。它定义了消息从交换器到队列的路由规则。
-
Broker代理服务器代理服务器是RabbitMQ的核心组件负责接收和传递消息。它负责处理交换器、队列、消息的路由和转发。
-
Channel信道信道是RabbitMQ使用的通信通道生产者和消费者通过信道与代理服务器进行交互。
-
Virtual Host虚拟主机虚拟主机在RabbitMQ中用于将不同的应用隔离开来。每个虚拟主机具有自己的交换器、队列和绑定。
这些角色共同组成了RabbitMQ的基本架构。生产者发送消息到交换器通过绑定将消息路由到队列消费者从队列中接收消息并进行处理。代理服务器负责消息的接收、传递和路由。信道用于生产者和消费者与代理服务器的通信而虚拟主机提供了应用隔离的环境。
RabbitMQ工作模式
文档地址https://www.rabbitmq.com/getstarted.html
RabbitMQ提供了6种工作模型但是我们常用的只有5种简单队列模型、工作队列模型、发布订阅模型广播、路由、主题。第6种RPC远程调用不属于MQ
JMS Sun公司提供一套Java操做消息队列的接口
- JMSJavaMessage ServiceJava消息服务应用程序接口即Java操作消息中间件的API
- JMS是JavaEE规范的一种类比JDBC
- 很多消息中间件都实现了JMS规范例如ActiveMQ。RabbitMQ官方没有提供JMS的实现包但是开源社区有提供。
二、RabbitMQ安装与配置
1. 基于docker快速安装RabbitMQ
扩展docker-compose安装rabbitmqhttps://gitee.com/aopmin/docker-compose/blob/master/Linux/RabbitMQ/docker-compose.yml
1、拉取镜像
docker pull rabbitmq:3.8-management
2、运行容器
docker run -di \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=123456 \
-v mq-plugins:/plugins \
--name rabbitmq \
--hostname my-rabbit \
-p 15672:15672 \
-p 5672:5672 \
--restart=always \
rabbitmq:3.8-management
- \ 代表换行
- -e 指定环境变量
- -e RABBITMQ_DEFAULT_USER=admin 用户名
- -e RABBITMQ_DEFAULT_PASS=123456 密码
- -v 挂载数据卷
- -p 15672:15672 用于web管理页面使用的端口 (管理员页面端口15672)
- -p 5672:5672 用于生产和消费端使用的端口通信端口也就是在代码里要使用的
- -di d后台运行i打开控制台交互
- –name mq 容器名字
- –hostname mq 这个参数在单机版mq配不配置都可以用来设置主机名搭建集群会用到
扩展启动xxx插件后面会用到这个命令
# 进入容器
docker exec -it rabbitmq /bin/bash
# 启动xxx插件
rabbitmq-plugins enable xxx
RabbitMQ管理端
管理端访问地址http://192.168.150.103:15672/
2. 创建用户和虚拟机
1、添加一个新用户
添加成功后列表会显示该用户但是这个用户没有操作权限需要为他创建一个虚拟机
2、创建虚拟机
为指定用户授权
最后该用户就可以操作这个虚拟机了
三、RabbitMQ快速入门
使用传统写法完成简单模的消息传递特点一条消息只能被一个消费者消费
官方的HelloWorld示例是基于简单消息队列模来实现的其中包括三个角色
- publisher消息发布者将消息发送到队列queue
- queue消息队列负责接受并缓存消息
- consumer订阅队列处理队列中的消息。
1. 基础环境搭建
1、创建maven工程并在pom文件中导入如下依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.9.RELEASE</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--SpringAMQP依赖,可以操作RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--单元测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
2、创建子模块publisher(生产者)、consumer消费者并编写启动类和yml配置文件
# 日志输出格式配置
logging:
pattern:
dateformat: MM-dd HH:mm:ss:SSS
2. publisher消息发布者实现
消息收发流程Connection连接、Channel通道、queue队列和exchange 交换机。
publisher消息发布者实现思路
- 建立连接
- 创建Channel
- 声明队列
- 发送消息
- 关闭连接和channel
1、编写publisher测试代码
package cn.aopmin.mq.test;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 生产者传统写法
*
* @author 白豆五
* @version 2023/07/2
* @since JDK8
*/
public class PublisherTest {
/**
* 发送消息
*
* @throws IOException
* @throws TimeoutException
*/
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数分别是主机名、端口号、vhost虚拟主机、用户名、密码
factory.setHost("192.168.150.103");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue"; // 队列名称
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功【" + message + "】");
// 5.关闭通道和连接
channel.close();
connection.close();
}
}
2、在建立连接处打断点并以debug方式启动方便观察每个组件的创建
查看连接信息
回到IDEA继续按F8查看通道信息
继续按F8查看队列信息
最后直接放行程序查看队列中的消息
3. consumer消费者实现
consumer消费者实现思路
- 建立连接
- 创建Channel
- 声明队列
- 订阅消息
1、编写消费者代码
package cn.aopmin.mq.test;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者传统写法
* @author 白豆五
* @version 2023/04/27
* @since JDK8
*/
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数分别是主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.150.103");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
// 接收消息的回调函数
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
}
2、测试消费者启动程序后会一直执行不用的时候将程序结束即可
四、SpringAMQP与RabbitMQ工作模型
1. SpringAMQP概述
AMQP是消息中间件收发消息的协议(规范)具体实现由各个消息中间厂商实现例如 RabbitMQ
SpringAMQP是基于RabbitMQ封装的一套模板并且还利用SpringBoot对其实现了自动装配使用起来非常方便。
SpringAMQP的官方地址https://spring.io/projects/spring-amqp
SpringAMQP提供了三个功能
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听器模式异步接收消息
- 封装了RabbitTemplate工具用于发送消息 。
RabbitMQ工作模型简单队列模型、工作队列模型、发布订阅模型广播、路由、主题。
2. BasicQueue 基本模型简单模型
使用SpringAMQP实现简单模型的消息收发
1、在父工程中引入spring-amqp起步依赖
<!--SpringAMQP:可以操作RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
完整的pom.xml配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.aopmin</groupId>
<artifactId>rabbitmq02-BasicQueue</artifactId>
<version>1.0.0</version>
<packaging>pom</packaging>
<description>springAMQP实现简单模型消息传递</description>
<modules>
<module>publisher</module>
<module>consumer</module>
</modules>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.2</version>
<relativePath/>
</parent>
<dependencies>
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- SpringAMQP:可以操作RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- 单元测试 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
<!-- 打包插件 -->
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
2、消息发送
2.1、在publisher服务的application.yml中添加rabbitmq配置
# RabbitMQ配置
spring:
rabbitmq:
host: 192.168.150.103 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: admin # 用户名
password: 123456 # 密码
# 日志配置
logging:
pattern:
dateformat: MM-dd HH:mm:ss:SSS
2.2、在publisher服务中编写测试类SpringAmqpTest并利用RabbitTemplate实现消息发送
package cn.aopmin.mq.test;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import org.springframework.test.context.junit4.SpringRunner;
/**
* 使用SpringAMQP实现简单模型的消息发送
*
* @author 白豆五
* @version 2023/07/2
* @since JDK8
*/
@SpringBootTest
// @RequiredArgsConstructor // 生成构造方法构造器注入,要求注入的字段必须final修饰
public class SpringAmqpTest {
/**
* RabbitTemplate是SpringAMQP中的核心类用于实现消息的发送和接收
*/
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 测试简单模型的消息发送
*/
@Test
public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}
3、消息接收
3.1、在consumer服务的application.yml中添加rabbitmq配置
# RabbitMQ配置
spring:
rabbitmq:
host: 192.168.150.103 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: admin # 用户名
password: 123456 # 密码
# 日志配置
logging:
pattern:
dateformat: MM-dd HH:mm:ss:SSS
3.2、在consumer服务的com.baidou.mq.listener
包中创建SpringRabbitListener类
package cn.aopmin.mq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 消息监听类
*
* @author 白豆五
* @version 2023/07/2
* @since JDK8
*/
@Component
public class SpringRabbitListener {
/**
* 订阅消息
*
* @param msg 消息
* @throws InterruptedException
*/
@RabbitListener(queues = "simple.queue") // 配置要监听的队列: simple.queue
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("消费者接收到消息【" + msg + "】");
}
}
4、测试
先启动consumer服务(启动类)然后再运行publisher服务中发送消息的测试代码。
3. WorkQueue 工作模型
工作队列模型Work Queue Mode消息按照一定的策略分配给多个消费者来解决消息堆积问题适用于任务分发和负载均衡场景。
角色生产者、队列、消费者
使用SpringAMQP实现工作队列模型的消息收发
1、在消费者监听类中编写两个方法监听同一个队列模拟多个消费者。
package cn.aopmin.mq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 消息监听类
*
* @author 白豆五
* @version 2023/07/2
* @since JDK8
*/
@Component
public class SpringRabbitListener {
/*
编写两个方法监听同一个队列可以实现多个消费者同时消费一个队列的消息
*/
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("消费者1接收到消息【" + msg + "】");
}
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage2(String msg) throws InterruptedException {
System.out.println("消费者2接收到消息【" + msg + "】");
}
}
2、模拟生产者发多条消息
/**
* 测试工作模型发消息
*/
@Test
public void testWork() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello,rabbitmq";
// 模拟发送100条消息
for (int i = 1; i <= 100; i++) {
rabbitTemplate.convertAndSend(queueName, message + i);
}
System.out.println("消息发送完毕!");
}
3、测试先启动消费者服务再执行生产者发送消息的代码。
消费者预取消息限制
工作模型默认一人一半消息可以通过修改消费者application.yml文件配置prefetch属性控制消费者预取消息的上限
# RabbitMQ配置
spring:
rabbitmq:
host: 192.168.150.103 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: admin # 用户名
password: 123456 # 密码
listener:
simple:
prefetch: 1 # 消息预取策略(每次获取一条消息处理完后再获取下一条)
prefetch属性用于指定消费者一次从RabbitMQ服务器预取的消息数量。通过限制预取消息的数量你可以控制每个消费者同时处理的消息数量从而实现负载均衡和资源控制。
4. Publish、Subscribe 发布订阅模型
发布订阅模型特点 可以通过交换机(exchange)将一条消息发给多个队列(消费者)进行处理。
常见的exchange类型包括Fanout广播、Direct路由、Topic主题。
交换机的主要作用
- 接收生产者发送的消息
- 将消息按照规则路由到绑定过的队列中
- 它不能缓存消息路由失败消息丢失
SpringAMQP提供了一个Exchange接口来表示所有不同类型的交换机
4.1 Fanout 广播模型
Fanout Exchange交换机会把收到的消息发送给绑定过的所有队列。队列需要与交换机建立关系然后才能收到对应消息
接下来使用SpringAMQP演示Fanout Exchange收发消息
1、在consumer服务中利用代码声明队列、交换机并将两者绑定
package cn.aopmin.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 在消费端声明队列、交换机、绑定关系,这样就不用在rabbitmq管理页面手动创建了,这样在服务启动后springAMQP会自动创建
*
* @author 白豆五
* @version 2023/07/2
* @since JDK8
*/
@Configuration
public class MqConfig {
/**
* 声明Fanout交换机
* 交换机名: exchange.fanout
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("exchange.fanout");
}
/**
* 声明队列
* 队列名: fanout.queue1
*/
@Bean
public Queue queue1() {
return new Queue("fanout.queue1");
}
/**
* 声明队列
* 队列名: fanout.queue2
*/
@Bean
public Queue queue2() {
return new Queue("fanout.queue2");
}
/**
* 绑定关系
* 将队列1绑定到Fanout交换机上
*/
@Bean
public Binding binding1(FanoutExchange fanoutExchange, Queue queue1) {
return BindingBuilder.bind(queue1).to(fanoutExchange);
}
/**
* 绑定关系
* 将队列2绑定到Fanout交换机上
*/
@Bean
public Binding binding2(FanoutExchange fanoutExchange, Queue queue2) {//参数注入,即参数名就是bean的名字
return BindingBuilder.bind(queue2).to(fanoutExchange);
}
}
消费者application.yml配置
# RabbitMQ配置
spring:
rabbitmq:
host: 192.168.150.103 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: admin # 用户名
password: 123456 # 密码
# 日志配置
logging:
pattern:
dateformat: MM-dd HH:mm:ss:SSS
2、在consumer服务中编写两个消费者方法分别监听fanout.queue1和fanout.queue2
package cn.aopmin.mq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 消息监听类
*
* @author 白豆五
* @version 2023/07/2
* @since JDK8
*/
@Component
public class FanoutListener {
/*
编写两个方法,分别监听队列1和队列2
*/
@RabbitListener(queues = "fanout.queue1")
public void listenerFanoutQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到消息【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenerFanoutQueue2(String msg) throws InterruptedException {
System.out.println("消费者2接收到消息【" + msg + "】");
}
}
编写监听类后启动消费者服务会自动创建交换机和队列组件
3、在publisher的测试类中编写向exchange.fanout发消息的代码
/**
* 测试广播模式发送消息
*/
@Test
public void testFanout() {
// 交换机名称
String exchangeName = "exchange.fanout";
// 消息
String message = "hello,rabbitmq";
// 发送消息
// 第一个参数是交换机名称
// 第二个参数是routingKey(路由key),在广播模式下不需要指定
// 第三个参数是消息
rabbitTemplate.convertAndSend(exchangeName, "", message);
System.out.println("消息发送完毕!");
}
执行测试方法查看运行结果
4.2 Direct 路由模型
Direct exchange会将接收到的消息按照规则Routing key转发到指定的队列因此称为路由模式(routes)
在交换机上做了一层规则判断操作。
Fanou模型要求
- 每一个Queue都与Exchange设置一个BindingKey
- 发布者发送消息时指定消息的RoutingKey
- Exchange会将消息路由到BindingKey与消息RoutingKey一致的队列上。
基于AMQP演示Direct模型
1、在consumer服务的监听类中编写两个消费者方法并在方法上通过@RabbitListener组合注解声明Exchange、Queue、RoutingKey然后分别监听direct.queue1和direct.queue2队列中的消息
package cn.aopmin.mq.listener;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import javax.xml.ws.BindingType;
/**
* 消息监听类 (通过注解的方式声明交换机和队列、及绑定关系)
*
* @author 白豆五
* @version 2023/07/2
* @since JDK8
*/
@Component
public class DirectListener {
/**
* 在监听方法上通过注解的方式声明交换机和队列、及绑定关系
* 队列: 通过@Queue注解创建队列
* 交换机: 通过@Exchange注解创建交换机
* 绑定关系: 通过bindingkey绑定{"blue", "red"}
*
* @param msg
* @throws InterruptedException
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),//创建队列
exchange = @Exchange(name = "exchange.direct", type = ExchangeTypes.DIRECT),//创建direct交换机
key = {"blue", "red"} // bindingkey
))
public void listenDirect1(String msg) throws InterruptedException {
System.out.println("消费者1接收到消息【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),//创建队列
exchange = @Exchange(name = "exchange.direct", type = ExchangeTypes.DIRECT),//创建direct交换机
key = {"yellow", "red"} // bindingkey
))
public void listenDirect2(String msg) throws InterruptedException {
System.out.println("消费者2接收到消息【" + msg + "】");
}
}
2、在publisher中编写测试方法向exchange.direct发送消息
/**
* 测试路由模式发送消息
*/
@Test
public void testDirect() {
// 交换机名称
String exchangeName = "exchange.direct";
// 消息
String message = "helloworld!";
// 发送消息
// 第一个参数是交换机名称
// 第二个参数是routingKey(路由key,发消息时候用的),在路由模式下需要指定
// 第三个参数是消息
rabbitTemplate.convertAndSend(exchangeName, "blue", "routingKey:blue ---" + message);
rabbitTemplate.convertAndSend(exchangeName, "red", "routingKey:red ---" + message);
System.out.println("消息发送完毕!");
}
3、测试启动消费者服务创建交换机和队列然后执行生产者发消息方法
小节
1、Direct与Fanout交换机的区别
-
Fanout相对于Direct更灵活些。
-
Fanout交换机不做判断收到消息就会广播给绑定的队列。
-
Direct交换机会根据RouthingKey判断然后路由给满足规则的队列。
-
在Direct模型中如果多个队列都有相同的RouthingKey则与Fanout功能类似。
2、基于@RabbitListeneri注解声明队列和交换机有哪些常见注解
- @QueueBinding 绑定关系
- @Queue 队列
- @Exchange 交换机
4.3 Topic 主题模型
Topic Exchange 与 Direct Exchange类似区别在与RoutingKey必须是多个单词组成并且以==.== 分割。用的最多
队列与交换机指定BindingKey时可以使用通配符
#
表示匹配0或多个单词例如 china.# 、#.new*
表示匹配1个单词
- Queue1绑定的是
java.#
因此凡是以java.
开头的routing key
都会被匹配到例如 java.news、java.blog - Queue2绑定的是
#.news
因此凡是以.news
结尾的routing key
都会被匹配到例如java.news、weather.news、heihe.weather.news。
基于AMQP演示Topic 模型
1、在消费端的监听方法上声明交换机和队列、及绑定关系
package cn.aopmin.mq.listener;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import javax.xml.ws.BindingType;
/**
* 消息监听类 (通过注解的方式声明交换机和队列、及绑定关系)
*
* @author 白豆五
* @version 2023/07/2
* @since JDK8
*/
@Component
public class TopicListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),//创建队列
exchange = @Exchange(name = "exchange.topic", type = ExchangeTypes.TOPIC),//创建direct交换机
key = "java.#" // bindingkey
))
public void listenTopic1(String msg) throws InterruptedException {
System.out.println("消费者1接收到消息【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),//创建队列
exchange = @Exchange(name = "exchange.topic", type = ExchangeTypes.TOPIC),//创建direct交换机
key = "#.news" // bindingkey
))
public void listenTopic2(String msg) throws InterruptedException {
System.out.println("消费者2接收到消息【" + msg + "】");
}
}
2、在生产端编写发消息方法
/**
* 测试主题模式发送消息
*/
@Test
public void testTopic() {
// 交换机名称
String exchangeName = "exchange.topic";
// 消息
String message = "helloworld!";
// 发送消息
// 第一个参数是交换机名称
// 第二个参数是routingKey(路由key),在路由模式下需要指定
// 第三个参数是消息
rabbitTemplate.convertAndSend(exchangeName, "java.blog", message);
rabbitTemplate.convertAndSend(exchangeName, "java.news", message);
System.out.println("消息发送完毕!");
}
3、测试
小节
1、Direct和Topic交换机的区别
- 相同点两个交换机都会key进行判断。即消息的路由key与队列的绑定key进行比较
- 不同点Topic队列的绑定key支持通配符更加灵活。Direct队列的绑定key不支持通配符只能匹配具体key的消息。
5. 消息转换器
默认情况下Spring会帮我们把发送的任意对象类型消息序列化为字节发送给MQ接收消息的时候还会把字节反序列化为Java对象。
但是Spring默认使用的是JDK序列化JDK序列化会存在一些问题
- 数据体积过大
- 有安全隐患
- 可读性差。
5.1 使用默认消息转换器发送Object类型消息
1、声明队列、编写监听方法
package cn.aopmin.mq.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 声明队列
*
* @author 白豆五
* @version 2023/07/3
* @since JDK8
*/
@Configuration
public class MqConfig {
@Bean
public Queue ObjectQueue() {
return new Queue("object.queue");
}
}
@Component
public class ObjectListener {
@RabbitListener(queues = "object.queue")
public void listenObj(Object obj) {
System.out.println("收到消息:" + obj.toString());
}
}
2、编写发消息代码
/**
* 测试默认消息转换器收发消息(JDK序列化)
*/
@Test
public void testDefault() {
// 队列名称
String queueName = "object.queue";
// 对象消息
Map<String, Object> message = new HashMap<>();
message.put("name", "张三");
message.put("age", 18);
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
System.out.println("消息发送完毕!");
}
测试查看队列数据默认情况下JDK序列化的结果不直观可以把消息转成json格式发送
5.2 使用Jackson消息转换器收发JSON消息
Spring提供了org.springframework.amqp.support.converter.MessageConverter
接口来处理对象消息的转换。在AMQP中默认实现是SimpleMessageConverter而SimpleMessageConverter它基于JDK的ObjectOutputStream完成序列化。
如果我们不想使用默认的消息转换器只需在生产端和消费端配置MessageConverter类型的Bean即可。
1、生产端和消费端都引入jackson依赖
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
2、生产端和消费的都配置消息转换器
package cn.aopmin.mq;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
//生产端
@SpringBootApplication
public class PublisherApp {
public static void main(String[] args) {
SpringApplication.run(PublisherApp.class, args);
}
// 使用json序列化机制进行消息转换
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
}
package cn.aopmin.mq;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
//消费端
@SpringBootApplication
public class ConsumerApp {
public static void main(String[] args) {
SpringApplication.run(ConsumerApp.class, args);
}
// 使用json序列化机制进行消息转换
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
}
3、修改消费端的监听方法
@Component
public class ObjectListener {
@RabbitListener(queues = "object.queue")
public void listenObj(Map<String,Object> msg) {
System.out.println("收到消息:" + msg);
}
}
4、测试
5.3 使用默认消息转换器收发JSON消息
上一种方案配来配去非常麻烦而且一旦消息转换器不一样就不能达到想要的结果。默认情况下对于字符串类型的消息默认的JDK消息转换器会使用UTF-8编码将字符串转换为字节数组并将其作为消息体进行发送。
这样我们在发消息的时候手工将对象序列化为json字符串在接收消息时再序列化为Java对象即可。
JSON工具类
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
常用方法
- 序列化JSON.toJSONString(xxx);
- 反序列化JSONObject(str,Xxx.class);
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |