微服务的异步通信技术RabbitMQ
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
文章目录
前言
MQ的出现进一步降低了微服务模块之间的耦合度相比于同步通信而言减少了关联服务的等待时间使消息的传递更加多变灵活
不管什么东西只要被Spring整合就会变得十分简单RabbitMQ也不例外
使用SpringAMQP来实现消息收发不需要重复地配置连接参数解决了一部分“硬编码”的问题。可以说和MyBatis整合JDBC非常相似。
在以前使用原生的RabbitMQ收发消息是这样的
使用SpringAMQP后收发消息是这样的
这就是一个基本队列Basic-Queue
可以看到只要引入依赖spring-boot-starter-amqp
写好yml配置文件建立连接、创建通道的工作Spring都为我们做好了而我们要做的仅仅就是利用工具类发送、监听消息可以说相当的方便
针对不同的场景我们要使用不同的队列模型
1.WorkQueue工作队列
对于单一消费者情况简单队列当生产者每秒发送50条消息消费者每秒处理40条消息这样每秒钟就会多出10条消息无法处理由此就会产生生产过剩而导致消息堆积在队列中一旦达到队列内存的上限新来的消息就无法被处理而被丢弃。
为了提高消息处理的速度避免队列中消息的堆积可以将队列绑定多个消费者即WorkQueue
为了方便观察控制台一般这样设计
生产者
@Test
public void testSendMessage2WorkQueue() throws InterruptedException {
String queueName = "work.queue";
String message = "hello, MQQ";
for (int i = 1; i <= 50; i++) {
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}
}
消费者
为了尽可能的模拟真实场景消费者处理消息的能力不同所以设置两个消费者的sleep参数为不同的两个时间
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到消息——【" + msg + "】" + "At "+LocalTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者2接收到消息——【" + msg + "】" + "At "+ LocalTime.now());
Thread.sleep(200);
}
消息预取机制
运行后观察控制台发现所有的消息处理完时间竟然花费了差不多五秒钟很显然这样的效率是非常低下的
为何绑定了两个消费者消费消息的速度不快反慢了呢
仔细观察控制台会发现50条消息是被平均分配的两个消费者分别消费id为偶数、奇数的消息这样一想好像是处理能力差的消费者拖了后腿(200msX25=5000ms=5s)为什么会出现这样的情况
这是由于MQ存在消息预取机制即消费者会在处理之前预先拿到消息的通道然后逐个处理消息这个过程是与处理消息相隔离的
如果还有人不明白就想想使用原生的RabbitMQ时我们是怎么处理消息的
当执行完回调函数有可能消息都不会被处理这时程序会继续向下执行过段时间才会开始处理消息其实我认为这也是体现RabbitMQ异步的一个地方
这样的机制存在是保证异步性的关键通过人为的设置参数也可以将消息预取的方式做出调整来保证处理的效率就像这样
listener:
simple:
prefetch: 1
通过prefetch参数来保证消费者每次获取消息的个数以及处理完成后才能获取下一个批次的消息
进行数据预取设置后消费者在一秒之内处理完了所有的消息
由此可见对于WorkQueue中消费者的设置要进行“按劳分配”的策略才较为完美
使用工作队列WorkQueue之后处理消息的效率得到了很大的提升并且也不会出现消息堆积的情况
2.Publish&Subscribe发布-订阅
对于简单队列和工作队列模型生产者发布消息消费者一旦消费完消息就会被销毁。这样无法做到将一个消息同时发送给多个消费者。
对于一个微服务项目在支付订单的模型中当支付服务完成会发消息同时去通知短信服务、订单服务…这就需要保证消息的高可用不能一个服务消费完消息就被销毁而导致其他服务接收不到消息
如何做到将同一消息发送给多个消费者并让其各自接收到采用发布&订阅的工作模型即可
通过交换机exchange将消息路由到不同的队列中再由消费者来消费各自订阅队列中的消息
针对不同的交换机种类会有不同的发布策略
1.Fanout广播
SpringAMQPA提供了声明交换机、队列、绑定关系的API
所以使用Exchange接口下的实现类就可以实现将消息路由到每一个绑定的Queue中使用代码就会变得非常简单声明交换机并绑定队列即可
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("yu7.fanout");
}
// fanout.queue1
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
// 绑定队列1到交换机
@Bean
public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder
.bind(fanoutQueue1)
.to(fanoutExchange);
}
// fanout.queue2
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
// 绑定队列2到交换机
@Bean
public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder
.bind(fanoutQueue2)
.to(fanoutExchange);
}
注意交换机只能用作消息的转发路由不能用作消息的存储一旦路由失败消息就会丢失
2.DirectExchange路由
DirectExchange会将接收到的消息根据规则路由到指定的Queue中生产者发布消息时指定消息的RoutingKey与消费者声明的bindingKey相匹配从而达到“精确制导”
为了简化开发不使用声明Bean的方式来完成配置通过@RabbitListener注解即可一键完成所以根本不需要使用配置类
生产者
@Test
public void testSendDirectExchange() {
// 交换机名称
String exchangeName = "yu7.direct";
// 消息
String message = "hello, MQ!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "A", message);
}
消费者
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "yu7.direct", type = ExchangeTypes.DIRECT),
key = {"A", "B"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者接收到direct.queue1的消息【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "yu7.direct", type = ExchangeTypes.DIRECT),
key = {"A", "C"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者接收到direct.queue2的消息【" + msg + "】");
}
当队列的bindingKey都相同时就变成了广播模型
3.TopicExchange话题
Topic与Direct非常相似他允许RoutingKey-BindingKey以通配符的形式进行匹配这样就可以更加有针对性的路由、订阅更多消息并且以前用多个BindingKey的情况现在只需要用一个就能解决
#代表0或者多个单词
*代指一个单词
MQ的优点
1、耦合度低:每次有新需求只需要添加对应的订阅即可
2、吞吐量提升:各自处理自己订阅的事件不需要等待执行完毕后再释放资源
3、故障隔离:因为没有强依赖中间某一环节出了问题不会影响整个流程
4、流量削峰:MQ就像—根管道大量请求来了你们给我排好队依次执行