MQ消息队列详解

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

MQ消息队列详解

WWW原则

一、MQ是什么

MQ全称为Message Queue, 消息队列MQ是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息针对应用程序的数据来通信而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信而不是通过直接调用彼此来通信直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。——百度百科

1. 解耦

系统之间的耦合度越高维护起来就会越麻烦如果要降低耦合度就需要通过一种方式来达到解耦的目的而MQ就能达到此目的通过MQ多个系统之间的依赖关系由之前的强耦合变成弱耦合将关注点转嫁放到了MQ上。
总结通过一个 MQPub/Sub 发布订阅消息这么一个模型A 系统就跟其它系统彻底解耦了。

2. 异步

顾名思义通过异步的方式能够达到缩短系统响应时间异步的相反是同步同步就是请求的处理过程是串行的只有上一个处理成功后下一个才会继续处理所有中间过程处理成功了最终才返回成功。而异步是指将响应先返回给前台然后后台继续处理直到成功。

3. 削峰

一般的 MySQL扛到每秒 2k 个请求就差不多了如果每秒请求到 5k 的话可能就直接把 MySQL 给打死了导致系统崩溃用户也就没法再使用系统了。
如果使用 MQ每秒 5k 个请求写入 MQA 系统每秒钟最多处理 2k 个请求因为 MySQL 每秒钟最多处理 2k 个。A 系统从 MQ 中慢慢拉取请求每秒钟就拉取 2k 个请求不要超过自己每秒能处理的最大请求数量就 ok这样下来哪怕是高峰期的时候A 系统也绝对不会挂掉。而 MQ 每秒钟 5k 个请求进来就 2k 个请求出去结果就导致在中午高峰期1 个小时可能有几十万甚至几百万的请求积压在 MQ 中。

三、什么时候用MQ

如果系统之间耦合度很高维护起来很麻烦建议使用MQ。

消息队列的有哪些

一、ActiveMQ

二、RabbitMQ

三、RocketMQ

四、Kafka

消息队列对比

ActiveMQ

特性ActiveMQ
单机吞吐量万级比 RocketMQ、Kafka 低一个数量级
topic 数量对吞吐量的影响
时效性微秒级这是 RabbitMQ 的一大特点延迟最低
可用性高基于主从架构实现高可用
消息可靠性基本不丢
功能支持基于 erlang 开发并发能力很强性能极好延时很低

RabbitMQ

特性ActiveMQ
单机吞吐量万级比 RocketMQ、Kafka 低一个数量级
topic 数量对吞吐量的影响
时效性ms 级
可用性高基于主从架构实现高可用
消息可靠性有较低的概率丢失数据
功能支持MQ 领域的功能极其完备

RocketMQ

特性ActiveMQ
单机吞吐量10 万级支撑高吞吐
topic 数量对吞吐量的影响topic 可以达到几百/几千的级别吞吐量会有较小幅度的下降这是 RocketMQ 的一大优势在同等机器下可以支撑大量的 topic
时效性ms 级
可用性非常高分布式架构
消息可靠性经过参数优化配置可以做到 0 丢失
功能支持MQ 功能较为完善还是分布式的扩展性好

kafka

特性ActiveMQ
单机吞吐量10 万级高吞吐一般配合大数据类的系统来进行实时数据计算、日志采集等场景
topic 数量对吞吐量的影响topic 从几十到几百个时候吞吐量会大幅度下降在同等机器下Kafka 尽量保证 topic 数量不要过多如果要支撑大规模的 topic需要增加更多的机器资源
时效性延迟在 ms 级以内
可用性非常高分布式一个数据多个副本少数机器宕机不会丢失数据不会导致不可用
消息可靠性同 RocketMQ
功能支持功能较为简单主要支持简单的 MQ 功能在大数据领域的实时计算以及日志采集被大规模使用

综上各种对比之后有如下建议

  1. 一般的业务系统要引入 MQ最早大家都用ActiveMQ但是现在确实大家用的不多了没经过大规模吞吐量场景的验证社区也不是很活跃。

  2. 后来开始用 RabbitMQerlang 语言阻止了大量的 Java工程师去深入研究和掌控它几乎处于不可控的状态但是确实人家是开源的比较稳定的支持活跃度也高。

  3. 现在确实越来越多的公司会去用 RocketMQ确实很不错毕竟是阿里出品但社区可能有停止维护的风险目前 RocketMQ 已捐给Apache但 GitHub 上的活跃度其实不算高。

  4. 中小型公司技术实力较为一般技术挑战不是特别高用 RabbitMQ 是不错的选择大型公司基础架构研发实力较强用 RocketMQ 是很好的选择。

  5. 如果是大数据领域的实时计算、日志采集等场景用 Kafka是业内标准的绝对没问题社区活跃度很高几乎是全世界这个领域的事实性规范。

消息队列有哪些优缺点

缺点有以下几个

一、系统可用性降低

系统引入的外部依赖越多越容易挂掉。引入MQ组件会对原有系统增加不可确定性因素MQ组件本身也存在突然挂掉的风险会对依赖它的系统造成影响。

二、系统复杂度提高

引入MQ会造成系统的复杂度可能提升一个量级同时也会存在很多问题需要通过架构层面来进行规避
常见的问题如下

1. 重复消费问题

首先比如 RabbitMQ、RocketMQ、Kafka都有可能会出现消息重复消费的问题正常。因为这问题通常不是 MQ 自己保证的是由我们开发来保证的。挑一个 Kafka 来举个例子说说怎么重复消费吧。

Kafka 实际上有个 offset 的概念就是每个消息写进去都有一个 offset代表消息的序号然后 consumer 消费了数据之后每隔一段时间定时定期会把自己消费过的消息的 offset 提交一下表示“我已经消费过了下次我要是重启啥的你就让我继续从上次消费到的 offset 来继续消费吧”。
但是凡事总有意外比如我们之前生产经常遇到的就是你有时候重启系统看你怎么重启了如果碰到点着急的直接 kill 进程了再重启。这会导致 consumer 有些消息处理了但是没来得及提交 offset尴尬了。重启之后少数消息会再次消费一次。
举个栗子。
有这么个场景。数据 1/2/3 依次进入 KafkaKafka 会给这三条数据每条分配一个 offset代表这条数据的序号我们就假设分配的 offset 依次是 152/153/154。消费者从 Kafka 去消费的时候也是按照这个顺序去消费。假如当消费者消费了 offset=153 的这条数据刚准备去提交 offset 到 Zookeeper此时消费者进程被重启了。那么此时消费过的数据 1/2 的 offset 并没有提交Kafka 也就不知道你已经消费了 offset=153 这条数据。那么重启之后消费者会找 Kafka 说嘿哥儿们你给我接着把上次我消费到的那个地方后面的数据继续给我传递过来。由于之前的 offset 没有提交成功那么数据 1/2 会再次传过来如果此时消费者没有去重的话那么就会导致重复消费。

注意新版的 Kafka 已经将 offset 的存储从 Zookeeper 转移至 Kafka brokers并使用内部位移主题 __consumer_offsets 进行存储。

怎么保证消息队列消费的幂等性

比如你拿个数据要写库你先根据主键查一下如果这数据都有了你就别插入了update 一下好吧。
比如你是写 Redis那没问题了反正每次都是 set天然幂等性。
比如你不是上面两个场景那做的稍微复杂一点你需要让生产者发送每条数据的时候里面加一个全局唯一的 id类似订单 id 之类的东西然后你这里消费到了之后先根据这个 id 去比如 Redis 里查一下之前消费过吗如果没有消费过你就处理然后这个 id 写 Redis。如果消费过了那你就别处理了保证别重复处理相同的消息即可。
比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了重复数据插入只会报错不会导致数据库中出现脏数据。

2. 消息丢失问题

用 MQ 有个基本原则就是数据不能多一条也不能少一条不能多就是前面说的重复消费和幂等性问题。不能少就是说这数据别搞丢了。那这个问题你必须得考虑一下。

数据的丢失问题可能出现在生产者、MQ、消费者中咱们从 RabbitMQ 和 Kafka 分别来分析一下吧。

2.1. RabbitMQ
2.1.1. 生产者弄丢了数据

生产者将数据发送到 RabbitMQ 的时候可能数据就在半路给搞丢了因为网络问题啥的都有可能。
此时可以选择用 RabbitMQ 提供的事务功能就是生产者发送数据之前开启 RabbitMQ 事务 channel.txSelect() 然后发送消息如果消息没有成功被 RabbitMQ 接收到那么生产者会收到异常报错此时就可以回滚事务 channel.txRollback() 然后重试发送消息如果收到了消息那么可以提交事务 channel.txCommit() 。

try {
    // 通过工厂创建连接
    connection = factory.newConnection();
    // 获取通道
    channel = connection.createChannel();
    // 开启事务
    channel.txSelect();

    // 这里发送消息
    channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());

    // 模拟出现异常
    int result = 1 / 0;

    // 提交事务
    channel.txCommit();
} catch (IOException | TimeoutException e) {
    // 捕捉异常回滚事务
    channel.txRollback();
}

但是问题是RabbitMQ 事务机制同步一搞基本上吞吐量会下来因为太耗性能。

所以一般来说如果你要确保说写 RabbitMQ 的消息别丢可以开启 confirm 模式在生产者那里设置开启 confirm 模式之后你每次写的消息都会分配一个唯一的 id然后如果写入了 RabbitMQ 中RabbitMQ 会给你回传一个 ack 消息告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息会回调你的一个 nack 接口告诉你这个消息接收失败你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态如果超过一定时间还没接收到这个消息的回调那么你可以重发。
事务机制和 confirm 机制最大的不同在于事务机制是同步的你提交一个事务之后会阻塞在那儿但是 confirm 机制是异步的你发送个消息之后就可以发送下一个消息然后那个消息 RabbitMQ 接收了之后会异步回调你的一个接口通知你这个消息接收到了。

所以一般在生产者这块避免数据丢失都是用 confirm 机制的。

已经在 transaction 事务模式的 channel 是不能再设置成 confirm 模式的即这两种模式是不能共存的。

客户端实现生产者 confirm 有 3 种方式

  1. 普通 confirm 模式每发送一条消息后调用 waitForConfirms() 方法等待服务器端 confirm如果服务端返回 false 或者在一段时间内都没返回客户端可以进行消息重发。
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
if (!channel.waitForConfirms()) {
    // 消息发送失败
    // ...
}
  1. 批量 confirm 模式每发送一批消息后调用 waitForConfirms() 方法等待服务端 confirm。
channel.confirmSelect();
for (int i = 0; i < batchCount; ++i) {
    channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
}
if (!channel.waitForConfirms()) {
    // 消息发送失败
    // ...
}
  1. 异步 confirm 模式提供一个回调方法服务端 confirm 了一条或者多条消息后客户端会回调这个方法。
SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        if (multiple) {
            confirmSet.headSet(deliveryTag + 1).clear();
        } else {
            confirmSet.remove(deliveryTag);
        }
    }

    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
        if (multiple) {
            confirmSet.headSet(deliveryTag + 1).clear();
        } else {
            confirmSet.remove(deliveryTag);
        }
    }
});

while (true) {
    long nextSeqNo = channel.getNextPublishSeqNo();
    channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
    confirmSet.add(nextSeqNo);
}
2.1.2. RabbitMQ 弄丢了数据

就是 RabbitMQ 自己弄丢了数据这个你必须开启 RabbitMQ 的持久化就是消息写入之后会持久化到磁盘哪怕是 RabbitMQ 自己挂了恢复之后会自动读取之前存储的数据一般数据不会丢。除非极其罕见的是RabbitMQ 还没持久化自己就挂了可能导致少量数据丢失但是这个概率较小。

设置持久化有两个步骤

  1. 创建 queue 的时候将其设置为持久化。这样就可以保证 RabbitMQ 持久化 queue 的元数据但是它是不会持久化 queue 里的数据的。

  2. 第二个是发送消息的时候将消息的 deliveryMode 设置为 2。就是将消息设置为持久化的此时 RabbitMQ 就会将消息持久化到磁盘上去。

必须要同时设置这两个持久化才行RabbitMQ 哪怕是挂了再次重启也会从磁盘上重启恢复 queue恢复这个 queue 里的数据。

注意哪怕是你给 RabbitMQ 开启了持久化机制也有一种可能就是这个消息写到了 RabbitMQ 中但是还没来得及持久化到磁盘上结果不巧此时 RabbitMQ 挂了就会导致内存里的一点点数据丢失。

所以持久化可以跟生产者那边的 confirm 机制配合起来只有消息被持久化到磁盘之后才会通知生产者 ack 了所以哪怕是在持久化到磁盘之前RabbitMQ 挂了数据丢了生产者收不到 ack 你也是可以自己重发的。

2.1.3. 消费端弄丢了数据

RabbitMQ 如果丢失了数据主要是因为你消费的时候刚消费到还没处理结果进程挂了比如重启了那么就尴尬了RabbitMQ 认为你都消费了这数据就丢了。

这个时候得用 RabbitMQ 提供的 ack 机制简单来说就是你必须关闭 RabbitMQ 的自动 ack 可以通过一个 api 来调用就行然后每次你自己代码里确保处理完的时候再在程序里 ack 一把。这样的话如果你还没处理完不就没有 ack 了那 RabbitMQ 就认为你还没处理完这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理消息是不会丢的。

为了保证消息从队列中可靠地到达消费者RabbitMQ 提供了消息确认机制。消费者在声明队列时可以指定 noAck 参数当 noAck=falseRabbitMQ 会等待消费者显式发回 ack 信号后才从内存和磁盘如果是持久化消息中移去消息。否则一旦消息被消费者消费RabbitMQ 会在队列中立即删除它。

2.2. Kafka

2.2.1. 消费端弄丢了数据

唯一可能导致消费者弄丢数据的情况就是说你消费到了这个消息然后消费者那边自动提交了 offset让 Kafka 以为你已经消费好了这个消息但其实你才刚准备处理这个消息你还没处理你自己就挂了此时这条消息就丢咯。

这不是跟 RabbitMQ 差不多吗大家都知道 Kafka 会自动提交 offset那么只要关闭自动提交 offset在处理完之后自己手动提交 offset就可以保证数据不会丢。但是此时确实还是可能会有重复消费比如你刚处理完还没提交 offset结果自己挂了此时肯定会重复消费一次自己保证幂等性就好了。

生产环境碰到的一个问题就是说我们的 Kafka 消费者消费到了数据之后是写到一个内存的 queue 里先缓冲一下结果有的时候你刚把消息写入内存 queue然后消费者会自动提交 offset。然后此时我们重启了系统就会导致内存 queue 里还没来得及处理的数据就丢失了。

2.2.2. Kafka 弄丢了数据

这块比较常见的一个场景就是 Kafka 某个 broker 宕机然后重新选举 partition 的 leader。大家想想要是此时其他的 follower 刚好还有些数据没有同步结果此时 leader 挂了然后选举某个 follower 成 leader 之后不就少了一些数据这就丢了一些数据啊。

生产环境也遇到过我们也是之前 Kafka 的 leader 机器宕机了将 follower 切换为 leader 之后就会发现说这个数据就丢了。

所以此时一般是要求起码设置如下 4 个参数

  1. 给 topic 设置 replication.factor 参数这个值必须大于 1要求每个 partition 必须有至少 2 个副本。
  2. 在 Kafka 服务端设置 min.insync.replicas 参数这个值必须大于 1这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系没掉队这样才能确保 leader 挂了还有一个 follower 吧。
  3. 在 producer 端设置 acks=all 这个是要求每条数据必须是写入所有 replica 之后才能认为是写成功了。
  4. 在 producer 端设置 retries=MAX 很大很大很大的一个值无限次重试的意思这个是要求一旦写入失败就无限重试卡在这里了。
    我们生产环境就是按照上述要求配置的这样配置之后至少在 Kafka broker 端就可以保证在 leader 所在 broker 发生故障进行 leader 切换时数据不会丢失。
2.2.3. 生产者会不会弄丢数据

如果按照上述的思路设置了 acks=all 一定不会丢要求是你的 leader 接收到消息所有的 follower 都同步到了消息之后才认为本次写成功了。如果没满足这个条件生产者会自动不断的重试重试无限次。

3. 消息传递顺序问题

三、一致性问题

可能存在系统A处理完数据直接返回成功了但系统B,C,D只有BC系统处理成功了系统D处理失败了导致数据最终不一致。

所以消息队列实际是一种非常复杂的架构你引入它有很多好处但是也得针对它带来的坏处做各种额外的技术方案和架构来规避掉做好之后你会发现系统复杂度提升了一个数量级也许是复杂了 10 倍。

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6