spring boot RabbitMq基础教程-CSDN博客
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
RabbitMq
由于RabbitMQ
采用了AMQP协议因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息都可以与RabbitMQ
交互。并且RabbitMQ
官方也提供了各种不同语言的客户端。
但是RabbitMQ官方提供的Java客户端编码相对复杂一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具SpringAMQP。并且还基于SpringBoot对其实现了自动装配使用起来非常方便。
SpringAMQP提供了三个功能
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听器模式异步接收消息
- 封装了RabbitTemplate工具用于发送消息
概念
**publisher**
生产者也就是发送消息的一方**consumer**
消费者也就是消费消息的一方**queue**
队列存储消息。生产者投递的消息会暂存在消息队列中等待消费者处理**exchange**
交换机负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。**virtual host**
虚拟主机起到数据隔离的作用。每个虚拟主机相互独立有各自的exchange、queue
交换机
我们打开Exchanges选项卡可以看到已经存在很多交换机
我们点击任意交换机即可进入交换机详情页面。仍然会利用控制台中的publish message 发送一条消息
这里是由控制台模拟了生产者发送的消息。由于没有消费者存在最终消息丢失了这样说明交换机没有存储消息的能力。
队列
我们打开Queues
选项卡新建一个队列
命名为hello.queue1
再以相同的方式创建一个队列密码为hello.queue2
最终队列列表如下
此时我们再次向amq.fanout
交换机发送一条消息。会发现消息依然没有到达队列
发送到交换机的消息只会路由到与其绑定的队列因此仅仅创建队列是不够的我们还需要将其与交换机绑定。
绑定关系
点击Exchanges
选项卡点击amq.fanout
交换机进入交换机详情页然后点击Bindings
菜单在表单中填写要绑定的队列名称
相同的方式将hello.queue2也绑定到改交换机。
最终绑定结果如下
发送消息
再次回到exchange页面找到刚刚绑定的amq.fanout
点击进入详情页再次发送一条消息
回到Queues
页面可以发现hello.queue
中已经有一条消息了
点击队列名称进入详情页查看队列详情这次我们点击get message
可以看到消息到达队列了
这个时候如果有消费者监听了MQ的hello.queue1
或hello.queue2
队列自然就能接收到消息了。
用户管理
点击Admin
选项卡首先会看到RabbitMQ控制台的用户管理界面
这里的用户都是RabbitMQ的管理或运维人员。目前只有安装RabbitMQ时添加的itheima
这个用户。仔细观察用户表格中的字段如下
Name
admin
也就是用户名Tags
administrator
说明itheima
用户是超级管理员拥有所有权限Can access virtual host
/
可以访问的virtual host
这里的/
是默认的virtual host
对于小型企业而言出于成本考虑我们通常只会搭建一套MQ集群公司内的多个不同项目同时使用。这个时候为了避免互相干扰 我们会利用virtual host
的隔离特性将不同项目隔离。一般会做两件事情
- 给每个项目创建独立的运维账号将管理权限分离。
- 给每个项目创建不同的
virtual host
将每个项目的数据隔离。
交换机
。而一旦引入交换机消息发送的模式会有很大变化
可以看到在订阅模型中多了一个exchange角色而且过程略有变化
- Publisher生产者不再发送消息到队列中而是发给交换机
- Exchange交换机一方面接收生产者发送的消息。另一方面知道如何处理消息例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作取决于Exchange的类型。
- Queue消息队列也与以前一样接收消息、缓存消息。不过队列一定要与交换机绑定。
- Consumer消费者与以前一样订阅队列没有变化
- Exchange交换机只负责转发消息不具备存储消息的能力因此如果没有任何队列与Exchange绑定或者没有符合路由规则的队列那么消息会丢失
交换机的类型有四种
- Fanout广播将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
- Direct订阅基于RoutingKey路由key发送给订阅了消息的队列
- Topic通配符订阅与Direct类似只不过RoutingKey可以使用通配符
- Headers头匹配基于MQ的消息头匹配用的较少。
Fanout交换机
Fanout英文翻译是扇出我觉得在MQ中叫广播更合适。
在广播模式下消息发送流程是这样的
- 1 可以有多个队列
- 2 每个队列都要绑定到Exchange交换机
- 3 生产者发送的消息只能发送到交换机
- 4 交换机把消息发送给绑定过的所有队列
- 5 订阅队列的消费者都能拿到消息
我们的计划是这样的
- 创建一个名为
hmall.fanout
的交换机类型是Fanout
- 创建两个队列
fanout.queue1
和fanout.queue2
绑定到交换机hmall.fanout
消息发送
在publisher服务的SpringAmqpTest类中添加测试方法
@Test
public void testFanoutExchange() {
// 交换机名称
String exchangeName = "hmall.fanout";
// 消息
String message = "hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
消息接收
在consumer服务的SpringRabbitListener中添加两个方法作为消费者
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到Fanout消息【" + msg + "】");
}
Direct交换机
在Fanout模式中一条消息会被所有订阅的队列都消费。但是在某些场景下我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下
- 队列与交换机的绑定不能是任意绑定了而是要指定一个
RoutingKey
路由key - 消息的发送方在 向 Exchange发送消息时也必须指定消息的
RoutingKey
。 - Exchange不再把消息交给每一个绑定的队列而是根据消息的
Routing Key
进行判断只有队列的Routingkey
与消息的Routing key
完全一致才会接收到消息
案例需求如图
- 声明一个名为
hmall.direct
的交换机 - 声明队列
direct.queue1
绑定hmall.direct
bindingKey
为blud
和red
- 声明队列
direct.queue2
绑定hmall.direct
bindingKey
为yellow
和red
- 在
consumer
服务中编写两个消费者方法分别监听direct.queue1和direct.queue2 - 在publisher中编写测试方法向
hmall.direct
发送消息
声明队列和交换机
首先在控制台声明两个队列direct.queue1
和direct.queue2
然后声明一个direct类型的交换机命名为hmall.direct
:
然后使用red
和blue
作为key绑定direct.queue1
到hmall.direct
同理使用red
和yellow
作为key绑定direct.queue2
到hmall.direct
步骤略最终结果
消息接收
在consumer服务的SpringRabbitListener中添加方法
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {
System.out.println("消费者1接收到direct.queue1的消息【" + msg + "】");
}
@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {
System.out.println("消费者2接收到direct.queue2的消息【" + msg + "】");
}
消息发送
在publisher服务的SpringAmqpTest类中添加测试方法
@Test
public void testSendDirectExchange() {
// 交换机名称
String exchangeName = "hmall.direct";
// 消息
String message = "红色警报日本乱排核废水导致海洋生物变异惊现哥斯拉";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
由于使用的red这个key所以两个消费者都收到了消息
我们再切换为blue这个key
@Test
public void testSendDirectExchange() {
// 交换机名称
String exchangeName = "hmall.direct";
// 消息
String message = "最新报道哥斯拉是居民自治巨型气球虚惊一场";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}
你会发现只有消费者1收到了消息
总结
描述下Direct交换机与Fanout交换机的差异
- Fanout交换机将消息路由给每一个与之绑定的队列
- Direct交换机根据RoutingKey判断路由给哪个队列
- 如果多个队列具有相同的RoutingKey则与Fanout功能类似
交换机
说明
Topic
类型的Exchange
与Direct
相比都是可以根据RoutingKey
把消息路由到不同的队列。
只不过Topic
类型Exchange
可以让队列在绑定BindingKey
的时候使用通配符
BindingKey
一般都是有一个或多个单词组成多个单词之间以.
分割例如 item.insert
通配符规则
#
匹配一个或多个词*
匹配不多不少恰好1个词
举例
item.#
能够匹配item.spu.insert
或者item.spu
item.*
只能匹配item.spu
图示
假如此时publisher发送的消息使用的RoutingKey
共有四种
china.news
代表有中国的新闻消息china.weather
代表中国的天气消息japan.news
则代表日本新闻japan.weather
代表日本的天气消息
解释
topic.queue1
绑定的是china.#
凡是以china.
开头的routing key
都会被匹配到包括china.news
china.weather
topic.queue2
绑定的是#.news
凡是以.news
结尾的routing key
都会被匹配。包括:china.news
japan.news
接下来我们就按照上图所示来演示一下Topic交换机的用法。
首先在控制台按照图示例子创建队列、交换机并利用通配符绑定队列和交换机。此处步骤略。最终结果如下
消息发送
在publisher服务的SpringAmqpTest类中添加测试方法
/**
* topicExchange
*/
@Test
public void testSendTopicExchange() {
// 交换机名称
String exchangeName = "hmall.topic";
// 消息
String message = "喜报孙悟空大战哥斯拉胜!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
消息接收
在consumer服务的SpringRabbitListener中添加方法
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg){
System.out.println("消费者1接收到topic.queue1的消息【" + msg + "】");
}
@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg){
System.out.println("消费者2接收到topic.queue2的消息【" + msg + "】");
}
声明队列和交换机
在之前我们都是基于RabbitMQ控制台来创建队列、交换机。但是在实际开发时队列和交换机是程序员定义的将来项目上线又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来交给运维。在这个过程中是很容易出现错误的。
因此推荐的做法是由程序启动时检查队列和交换机是否存在如果不存在自动创建。
fanout示例
在consumer中创建一个类声明队列和交换机
package com.itheima.consumer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfig {
/**
* 声明交换机
* @return Fanout类型交换机
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("hmall.fanout");
}
/**
* 第1个队列
*/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
/**
* 第2个队列
*/
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
direct示例
direct模式由于要绑定多个KEY会非常麻烦每一个Key都要编写一个binding
package com.itheima.consumer.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DirectConfig {
/**
* 声明交换机
* @return Direct类型交换机
*/
@Bean
public DirectExchange directExchange(){
return ExchangeBuilder.directExchange("hmall.direct").build();
}
/**
* 第1个队列
*/
@Bean
public Queue directQueue1(){
return new Queue("direct.queue1");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
}
/**
* 第2个队列
*/
@Bean
public Queue directQueue2(){
return new Queue("direct.queue2");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){
return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){
return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
}
}
基于注解声明
基于@Bean的方式声明队列和交换机比较麻烦Spring还提供了基于注解方式来声明。
例如我们同样声明Direct模式的交换机和队列
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者1接收到direct.queue1的消息【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者2接收到direct.queue2的消息【" + msg + "】");
}
是不是简单多了。
再试试Topic模式
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消费者1接收到topic.queue1的消息【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.out.println("消费者2接收到topic.queue2的消息【" + msg + "】");
}
消息转换器
Spring的消息发送代码接收的消息体是一个Object
而在数据传输时它会把你发送的消息序列化为字节发送给MQ接收消息的时候还会把字节反序列化为Java对象。
只不过默认情况下Spring采用的序列化方式是JDK序列化。众所周知JDK序列化存在下列问题
- 数据体积过大
- 有安全漏洞
- 可读性差
配置JSON转换器
显然JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高因此可以使用JSON方式来做序列化和反序列化。
在publisher
和consumer
两个服务中都引入依赖
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
注意如果项目中引入了spring-boot-starter-web
依赖则无需再次引入Jackson
依赖。
配置消息转换器在publisher
和consumer
两个服务的启动类中添加一个Bean即可
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id用于识别不同消息也可以在业务中基于ID判断是否是重复消息
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
消息转换器中添加的messageId可以便于我们将来做幂等性判断。
总结
以上的代码已上传到Github
https://github.com/onenewcode/mq-demo
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |