RocketMQ 存储优化技术 解析——图解、源码级解析

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

🍊 Java学习:Java从入门到精通总结

🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

🍊 绝对不一样的职场干货:大厂最佳实践经验指南


📆 最近更新:2023年1月13日

🍊 个人简介:通信工程本硕 for NJU💪、Java程序员🌕。做过科研paper发过专利优秀的程序员不应该只是CRUD

🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力


文章目录

RocketMQ里的存储优化

内存预分配

写入消息时CommitLog会先从MappedFileQueue(队列中获取一个MappedFileMappedFile对象的预分配过程如下图所示:

在这里插入图片描述

MappedFile的创建过程是将构建好的AllocateRequest请求添加至队列中后台运行的AllocateMappedFileService服务线程根据队列里存在的请求执行MappedFile映射文件的创建和预分配工作。

 static class AllocateRequest implements Comparable<AllocateMappedFileService.AllocateRequest> {
        private String filePath;
        private int fileSize;
        private CountDownLatch countDownLatch = new CountDownLatch(1);
        private volatile MappedFile mappedFile = null;

分配时有两种策略:

  1. 使用mmap的方式创建MappedFile实例
  2. TransientStorePool堆外内存池中获取相应的DirectByteBuffer来构建MappedFile实例

在创建并分配完成MappedFile实例后系统还会同时将下一个MappedFile实例也预先创建出来并保存到请求队列里下次请求时可以跳过创建MappedFile实例的时间直接返回。


mlock系统调用

该系统调用的功能是可以将进程使用的部分或所有的地址空间锁定在物理内存里防止其被交换到swap空间。

对于一款消息中间件来说追求的一定是消息读写的低延时因为内存页面调出调入的时间延迟可能太长或难以预知所以就希望尽可能多地使用物理内存以提高数据读写的效率。


文件预热

做预热主要是基于如下考虑:

  1. 仅仅分配内存并执行mlock系统调用之后并不会为程序完全锁定这些内存其中的分页仍然可能是copy-on-write的因此RocketMQ在创建MappedFile实例的时候会先写入一些随机值到mmap映射出的内存空间里。

  1. 使用mmap进行内存映射之后操作系统只是建立虚拟内存地址到物理地址的映射表但没有加载任何文件至内存中。程序想要访问数据时操作系统会检查该部分的分页是否已经在内存里如果不在的话则会发出一次缺页中断。RocketMQ在做mmap内存映射的同时进行madvise系统调用目的是使操作系统做一次内存映射后对应的文件数据尽可能多地预加载进内存从而实现预热。

x86 Linux中的一个标准页面大小为4KB1G的commitLog需要发生256次缺页中断才能将数据完全加载进物理内存中


存储模型

1. CommitLog

消息主体及元数据的存储主体存储Producer端写入的消息主体内容。单个文件默认大小为1GB文件名长度为20位

00000000000000000000表示第一个文件起始偏移量为0文件大小为1GB = 1073741824;00000000001073741824表示第二个文件写入是顺序写

public class CommitLog {
    // Message's MAGIC CODE daa320a7
    public final static int MESSAGE_MAGIC_CODE = -626843481;
    protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    // End of file empty MAGIC CODE cbd43194
    protected final static int BLANK_MAGIC_CODE = -875286124;
    protected final MappedFileQueue mappedFileQueue;
    protected final DefaultMessageStore defaultMessageStore;
    private final FlushCommitLogService flushCommitLogService;

    //If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
    private final FlushCommitLogService commitLogService;

    private final AppendMessageCallback appendMessageCallback;
    private final ThreadLocal<PutMessageThreadLocal> putMessageThreadLocal;
    protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
    protected volatile long confirmOffset = -1L;

    private volatile long beginTimeInLock = 0;

    protected final PutMessageLock putMessageLock;
}

2. ConsumeQueue

消息消费的逻辑队列其中包含了这个MessageQueueCommitLog中的起始物理位置偏移量offset、消息实体内容的大小和Message Tag的哈希值。从实际物理存储来说ConsumeQueue对应每个TopicQueueId下面的所有文件每个文件默认大小为6MB差不多可以存储30万条消息也是顺序写入。

public class ConsumeQueue {
    private static final InternalLogger log = InternalLoggerFactory.getLogger("RocketmqStore");
    public static final int CQ_STORE_UNIT_SIZE = 20;
    private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger("RocketmqStoreError");
    private final DefaultMessageStore defaultMessageStore;
    private final MappedFileQueue mappedFileQueue;
    private final String topic;
    private final int queueId;
    private final ByteBuffer byteBufferIndex;
    private final String storePath;
    private final int mappedFileSize;
    private long maxPhysicOffset = -1L;
    private volatile long minLogicOffset = 0L;
    private ConsumeQueueExt consumeQueueExt = null;
}

3. IndexFile

用于为生成的索引文件提供访问通过消息Key值查询消息真正的实体内容。在实际的物理存储上文件名是以创建的时间戳命名的一个IndexFile可以保存2000万个索引。

public class IndexFile {
    private static final InternalLogger log = InternalLoggerFactory.getLogger("RocketmqStore");
    private static int hashSlotSize = 4;
    private static int indexSize = 20;
    private static int invalidIndex = 0;
    private final int hashSlotNum;
    private final int indexNum;
    private final MappedFile mappedFile;
    private final FileChannel fileChannel;
    private final MappedByteBuffer mappedByteBuffer;
    private final IndexHeader indexHeader;
}

4. MappedFileQueue

对连续物理存储的封装类代码中可以通过消息存储的物理偏移量快速定位offset对应的MappedFile

public class MappedFileQueue {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);

    private static final int DELETE_FILES_BATCH_MAX = 10;

    private final String storePath;

    private final int mappedFileSize;

    private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();

    private final AllocateMappedFileService allocateMappedFileService;

    private long flushedWhere = 0;
    private long committedWhere = 0;

    private volatile long storeTimestamp = 0;
}

5. MappedFile

文件存储的直接内存映射封装类通过该类的实例可以把消息写入PageCache或者将消息刷盘。

public class MappedFile extends ReferenceResource {
    public static final int OS_PAGE_SIZE = 1024 * 4;
    protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

    private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);

    private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
    protected final AtomicInteger committedPosition = new AtomicInteger(0);
    private final AtomicInteger flushedPosition = new AtomicInteger(0);
    protected int fileSize;
    protected FileChannel fileChannel;
    /**
     * Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
     */
    protected ByteBuffer writeBuffer = null;
    protected TransientStorePool transientStorePool = null;
    private String fileName;
    private long fileFromOffset;
    private File file;
    private MappedByteBuffer mappedByteBuffer;
    private volatile long storeTimestamp = 0;
    private boolean firstCreateInQueue = false;
}

刷盘流程

同步刷盘

在这里插入图片描述
只有在消息成功写入磁盘之后Broker才会给Producer返回一个ACK响应。RocketMQ中主线程会首先创建一个刷盘请求实例GroupCommitRequest并将其放到刷盘队列之后使用同步刷盘线程GroupCommitService来执行刷盘动作。

GroupCommitService里使用到CountDownLatch来控制线程间同步

RocketMQ里还使用了两个队列分别负责读和写操作实现读写分离提高并发量

同步刷盘可以保障较好的一致性一般适用于金融业务领域。


异步刷盘

在这里插入图片描述

只要消息写入到Pagecache之后就可以直接返回ACK给Producer之后后台异步线程负责将消息刷盘此时主线程并不会阻塞降低了读写延迟从而达到提高MQ性能和吞吐量的目的。


阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6