RabbitMQ基础篇 笔记-CSDN博客
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
RabbitMQ
余额支付
同步调用
一步一步的来支付业务写完后如果之后加需求还需要增加代码不符合开闭原则。
性能上也有问题openfeign是同步调用性能太差。
同步调用耦合太多。
同步的优势是可以立即得到结果例如查询查到了就能知道结果。但是拓展性差性能下降级联失败。
异步调用
异步调用时基于消息通知的方式一般包括3个角色。
-
消息发送者
-
消息代理
-
消息接收值
发送者发送东西到消息代理消息接受者监听消息代理。类似于外卖柜
故障也会隔离。
缓存消息起到流量削峰填谷的功能。流量整形
缺点
拿不到对方执行的结果。不确定有没有执行成功。
安全依赖于mq的可靠性。broker
对于运行结果不关心的场景性能要求较高就可以使用异步。
MQ技术选型
Message QueueMQ先进先出的消息队列。MQ的技术有很多实现方案就用RabbitMQ erlang编译的
消费者与queue绑定。
生产者与exchange交换机绑定
交换机路由给queueexchange与queue构成了brokerRabbitMQ
公司可能搭建一个mq然后所有的服务都用这个。因此为了隔离创建了VirtualHost类似于数据库
快速入门
交换机是负责路由转发消息的它本身没有存储消息的能力。
必须让队列和交换机产生关系绑定
数据隔离
有虚拟主机的概念
首先创建一个用户。
不同的虚拟主机是互相隔离的。创建一个新的虚拟主机那么所有的东西都是新的类似与数据库。
使用Java操作
Spring AMQP
amqp是一种消息通信协议它是协议。Spring 提供了一套统一的amqp协议标准。定义了接口没实现只有Spring Rabbit实现了。
- 引入依赖
- 在每个微服务中引入MQ服务端信息5672端口。
- rabbitTemplate发送和接收。
生产者代码
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void testSendMessage2Queue() {
String queueName = "simple.queue";
String msg = "hello, amqp!";
rabbitTemplate.convertAndSend(queueName, msg);
}
}
消费者代码
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg){
System.out.println("消费者收到了simple.queue的消息【" + msg +"】");
}
Work Queues
多个消费者绑定同一个queue。
每个消息只会被一个消费者消费掉。
生产者投递的消息不会考虑到消费者的处理能力。 所以需要添加 prefetch 参数。
每次只能取一条消息处理完成才能获得下一个。这样就是能者多劳。
解决消息堆积问题。
Fanout 交换机广播
真正的环境一定会有交换机的而不是直接发送到队列。交换机是有路由功能的比如多个服务监听队列只有一个服务能收到信息。但是交换机不同多个服务都可以监听到队列。
他会将接收到的消息分发给与它绑定的每一个队列。
可以为每个微服务创建队列这样每个微服务就都收到了。
@Test
void testSendFanout() {
String exchangeName = "hmall.fanout2";
String msg = "hello, everyone!";
//交换机名字routingKey消息
rabbitTemplate.convertAndSend(exchangeName, null, msg);
}
交换机就是接受消息路由转发消息fanout就是广播。
Direct交换机定向路由
消息发给不同的人。
每一个Queue都与Exchange 设置一个 routingKey。
发送者发送消息时制定消息的 routingKey
交换机只会给相同的 routingKey的队列投递消息。
Topic 交换机
与Direct交换机类似区别是routingKey可以是多个单词的列表并且按照.分割。#类似于正则中的*。而* 则代表一个单词。
例如china.*表示所有关于china的内容。
使用SpringAMQP创建交换机队列
@Configuration
public class FanoutConfiguration {
@Bean
public FanoutExchange fanoutExchange(){
// ExchangeBuilder.fanoutExchange("").build();
return new FanoutExchange("hmall.fanout2");
}
@Bean
public Queue fanoutQueue3(){
// QueueBuilder.durable("ff").build();
return new Queue("fanout.queue3");
}
@Bean
public Binding fanoutBinding3(Queue fanoutQueue3, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);
}
@Bean
public Queue fanoutQueue4(){
return new Queue("fanout.queue4");
}
@Bean
public Binding fanoutBinding4(){
return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());
}
}
这种绑定太麻烦可以通过注解绑定
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1", durable = "true"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg) throws InterruptedException {
System.out.println("消费者1 收到了 direct.queue1的消息【" + msg +"】");
}
消息转换器
发送一个对象类型的消息接受后变成了一堆乱码使用了Java的序列化方式。我们需要将它转为JSON格式的。
在有 @Configuration 注解的类上
@Bean
public MessageConverter jacksonMessageConvertor(){
return new Jackson2JsonMessageConverter();
}
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |