如果我是核酸系统架构师,我会这么用MQ。。。

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
V-xinruyuan0330 获得600+页原创精品文章汇总PDF

目录

  • 一、前情提示
  • 二、保证投递消息不丢失的confirm机制
  • 三、confirm机制的代码实现
  • 四、confirm机制投递消息的高延迟性
  • 五、高并发下如何投递消息才能不丢失
  • 六、消息中间件全链路100%数据不丢失能做到吗

一、前情提示

上篇文章《选Redis做MQ的人是脑子里缺根弦儿吗》我们分析了RabbitMQ开启手动ack机制保证消费端数据不丢失的时候prefetch机制对消费者的吞吐量以及内存消耗的影响。

通过分析我们知道了prefetch过大容易导致内存溢出prefetch过小又会导致消费吞吐量过低所以在实际项目中需要慎重测试和设置。

这篇文章我们转移到消息中间件的生产端一起来看看如何保证投递到MQ的数据不丢失。

如果投递出去的消息在网络传输过程中丢失或者在RabbitMQ的内存中还没写入磁盘的时候宕机都会导致生产端投递到MQ的数据丢失。

而且丢失之后生产端自己还感知不到同时还没办法来补救。

下面的图就展示了这个问题。

在这里插入图片描述

所以本文呢我们就来逐步分析一下。


二、保证投递消息不丢失的confirm机制

其实要解决这个问题相信大家看过之前的消费端ack机制之后也都猜到了。

很简单就是生产端比如上图的订单服务首先需要开启一个confirm模式接着投递到MQ的消息如果MQ一旦将消息持久化到磁盘之后必须也要回传一个confirm消息给生产端。

这样的话如果生产端的服务接收到了这个confirm消息就知道是已经持久化到磁盘了。

否则如果没有接收到confirm消息那么就说明这条消息半路可能丢失了此时你就可以重新投递消息到MQ去确保消息不要丢失。

而且一旦你开启了confirm模式之后每次消息投递也同样是有一个delivery tag的也是起到唯一标识一次消息投递的作用。


这样MQ回传ack给生产端的时候会带上这个delivery tag。你就知道具体对应着哪一次消息投递了可以删除这条消息。

此外如果RabbitMQ接收到一条消息之后结果内部出错发现无法处理这条消息那么他会回传一个nack消息给生产端。此时你就会感知到这条消息可能处理有问题你可以选择重新再次投递这条消息到MQ去。

或者另一种情况如果某条消息很长时间都没给你回传ack/nack那可能是极端意外情况发生了数据也丢了你也可以自己重新投递消息到MQ去。

通过这套confirm机制就可以实现生产端投递消息不会丢失的效果。大家来看看下面的图一起来感受一下。

在这里插入图片描述

三、confirm机制的代码实现

下面我们再来看看confirm机制的代码实现

在这里插入图片描述

四、confirm机制投递消息的高延迟性

这里有一个很关键的点就是一旦启用了confirm机制投递消息到MQ之后MQ是不保证什么时候会给你一个ack或者nack的。

因为RabbitMQ自己内部将消息持久化到磁盘本身就是通过异步批量的方式来进行的。

正常情况下你投递到RabbitMQ的消息都会先驻留在内存里然后过了几百毫秒的延迟时间之后再一次性批量把多条消息持久化到磁盘里去。

这样做是为了兼顾高并发写入的吞吐量和性能的因为要是你来一条消息就写一次磁盘那么性能会很差每次写磁盘都是一次fsync强制刷入磁盘的操作是很耗时的。

所以正是因为这个原因你打开了confirm模式之后很可能你投递出去一条消息要间隔几百毫秒之后MQ才会把消息写入磁盘接着你才会收到MQ回传过来的ack消息这个就是所谓confirm机制投递消息的高延迟性

大家看看下面的图一起来感受一下。

在这里插入图片描述

五、高并发下如何投递消息才能不丢失

大家可以考虑一下在生产端高并发写入MQ的场景下你会面临两个问题

  • 1、你每次写一条消息到MQ为了等待这条消息的ack必须把消息保存到一个存储里。

并且这个存储不建议是内存因为高并发下消息是很多的每秒可能都几千甚至上万的消息投递出去消息的ack要等几百毫秒的话放内存可能有内存溢出的风险。

  • 2、绝对不能以同步写消息 + 等待ack的方式来投递那样会导致每次投递一个消息都同步阻塞等待几百毫秒会导致投递性能和吞吐量大幅度下降。

针对这两个问题相对应的方案其实也呼之欲出了。


首先用来临时存放未ack消息的存储需要承载高并发写入而且我们不需要什么复杂的运算操作这种存储首选绝对不是MySQL之类的数据库而建议采用kv存储。kv存储承载高并发能力极强而且kv操作性能很高。

其次投递消息之后等待ack的过程必须是异步的也就是类似上面那样的代码已经给出了一个初步的异步回调的方式。

消息投递出去之后这个投递的线程其实就可以返回了至于每个消息的异步回调是通过在channel注册一个confirm监听器实现的。

收到一个消息ack之后就从kv存储中删除这条临时消息收到一个消息nack之后就从kv存储提取这条消息然后重新投递一次即可也可以自己对kv存储里的消息做监控如果超过一定时长没收到ack就主动重发消息。

大家看看下面的图一起来体会一下
在这里插入图片描述

六、消息中间件全链路100%数据不丢失能做到吗

到此为止我们已经把生产端和消费端如何保证消息不丢失的相关技术方案结合RabbitMQ这种中间件都给大家分析过了。

其实架构思想是通用的 无论你用的是哪一种MQ中间件他们提供的功能是不太一样的但是你都需要考虑如下几点

  1. 生产端如何保证投递出去的消息不丢失消息在半路丢失或者在MQ内存中宕机导致丢失此时你如何基于MQ的功能保证消息不要丢失
  2. MQ自身如何保证消息不丢失起码需要让MQ对消息是有持久化到磁盘这个机制。
  3. 消费端如何保证消费到的消息不丢失如果你处理到一半消费端宕机导致消息丢失此时怎么办

目前来说我们初步的借着RabbitMQ举例已经把从前到后一整套技术方案的原理、设计和实现都给大家分析了一遍了。


但是此时真的能做到100%数据不丢失吗恐怕未必大家再考虑一下个特殊的场景。

生产端投递了消息到MQ而且持久化到磁盘并且回传ack给生产端了。

但是此时MQ还没投递消息给消费端结果MQ部署的机器突然宕机而且因为未知的原因磁盘损坏了直接在物理层面导致MQ持久化到磁盘的数据找不回来了。

这个大家千万别以为是开玩笑的大家如果留意留意行业新闻这种磁盘损坏导致数据丢失的是真的有的。

那么此时即使你把MQ重启了磁盘上的数据也丢失了数据是不是还是丢失了

你说我可以用MQ的集群机制啊给一个数据做多个副本比如后面我们就会给大家分析RabbitMQ的镜像集群机制确实可以做到数据多副本。

但是即使数据多副本一定可以做到100%数据不丢失


比如说你的机房突然遇到地震结果机房里的机器全部没了数据是不是还是全丢了

说这个并不是说要抬杠。而是告诉大家技术这个东西100%都是理论上的期望。

应该说我们凡事都朝着100%去做但是理论上是不可能完全做到100%保证的可能就是做到99.9999%的可能性数据不丢失但是还是有千万分之一的概率会丢失。

当然从实际的情况来说能做到这种地步其实基本上已经基本数据不会丢失了。

V-xinruyuan0330 获得600+页原创精品文章汇总PDF

另外推荐儒猿课堂的1元系列课程给您欢迎加入一起学习~

互联网Java工程师面试突击课1元专享

SpringCloudAlibaba零基础入门到项目实战1元专享

亿级流量下的电商详情页系统实战项目1元专享

Kafka消息中间件内核源码精讲1元专享

12个实战案例带你玩转Java并发编程1元专享

Elasticsearch零基础入门到精通1元专享

基于Java手写分布式中间件系统实战1元专享

基于ShardingSphere的分库分表实战课1元专享

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6