RocketMQ 存储优化技术 解析——图解、源码级解析
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
🍊 Java学习:Java从入门到精通总结
🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想
🍊 绝对不一样的职场干货:大厂最佳实践经验指南
📆 最近更新:2023年1月13日
🍊 个人简介:通信工程本硕 for NJU💪、Java程序员🌕。做过科研paper发过专利优秀的程序员不应该只是CRUD
🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力
文章目录
RocketMQ里的存储优化
内存预分配
写入消息时CommitLog
会先从MappedFileQueue
(队列中获取一个MappedFile
MappedFile
对象的预分配过程如下图所示:
MappedFile
的创建过程是将构建好的AllocateRequest
请求添加至队列中后台运行的AllocateMappedFileService
服务线程根据队列里存在的请求执行MappedFile
映射文件的创建和预分配工作。
static class AllocateRequest implements Comparable<AllocateMappedFileService.AllocateRequest> {
private String filePath;
private int fileSize;
private CountDownLatch countDownLatch = new CountDownLatch(1);
private volatile MappedFile mappedFile = null;
分配时有两种策略:
- 使用
mmap
的方式创建MappedFile
实例 - 从
TransientStorePool
堆外内存池中获取相应的DirectByteBuffer
来构建MappedFile
实例
在创建并分配完成MappedFile
实例后系统还会同时将下一个MappedFile
实例也预先创建出来并保存到请求队列里下次请求时可以跳过创建MappedFile
实例的时间直接返回。
mlock
系统调用
该系统调用的功能是可以将进程使用的部分或所有的地址空间锁定在物理内存里防止其被交换到swap
空间。
对于一款消息中间件来说追求的一定是消息读写的低延时因为内存页面调出调入的时间延迟可能太长或难以预知所以就希望尽可能多地使用物理内存以提高数据读写的效率。
文件预热
做预热主要是基于如下考虑:
- 仅仅分配内存并执行
mlock
系统调用之后并不会为程序完全锁定这些内存其中的分页仍然可能是copy-on-write
的因此RocketMQ
在创建MappedFile
实例的时候会先写入一些随机值到mmap
映射出的内存空间里。
- 使用
mmap
进行内存映射之后操作系统只是建立虚拟内存地址到物理地址的映射表但没有加载任何文件至内存中。程序想要访问数据时操作系统会检查该部分的分页是否已经在内存里如果不在的话则会发出一次缺页中断。RocketMQ
在做mmap
内存映射的同时进行madvise
系统调用目的是使操作系统做一次内存映射后对应的文件数据尽可能多地预加载进内存从而实现预热。
x86 Linux中的一个标准页面大小为4KB1G的commitLog需要发生256次缺页中断才能将数据完全加载进物理内存中
存储模型
1. CommitLog
消息主体及元数据的存储主体存储Producer
端写入的消息主体内容。单个文件默认大小为1GB文件名长度为20位
00000000000000000000表示第一个文件起始偏移量为0文件大小为1GB = 1073741824;00000000001073741824表示第二个文件写入是顺序写
public class CommitLog {
// Message's MAGIC CODE daa320a7
public final static int MESSAGE_MAGIC_CODE = -626843481;
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
// End of file empty MAGIC CODE cbd43194
protected final static int BLANK_MAGIC_CODE = -875286124;
protected final MappedFileQueue mappedFileQueue;
protected final DefaultMessageStore defaultMessageStore;
private final FlushCommitLogService flushCommitLogService;
//If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
private final FlushCommitLogService commitLogService;
private final AppendMessageCallback appendMessageCallback;
private final ThreadLocal<PutMessageThreadLocal> putMessageThreadLocal;
protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
protected volatile long confirmOffset = -1L;
private volatile long beginTimeInLock = 0;
protected final PutMessageLock putMessageLock;
}
2. ConsumeQueue
:
消息消费的逻辑队列其中包含了这个MessageQueue
在CommitLog
中的起始物理位置偏移量offset
、消息实体内容的大小和Message Tag
的哈希值。从实际物理存储来说ConsumeQueue
对应每个Topic
和QueueId
下面的所有文件每个文件默认大小为6MB差不多可以存储30万条消息也是顺序写入。
public class ConsumeQueue {
private static final InternalLogger log = InternalLoggerFactory.getLogger("RocketmqStore");
public static final int CQ_STORE_UNIT_SIZE = 20;
private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger("RocketmqStoreError");
private final DefaultMessageStore defaultMessageStore;
private final MappedFileQueue mappedFileQueue;
private final String topic;
private final int queueId;
private final ByteBuffer byteBufferIndex;
private final String storePath;
private final int mappedFileSize;
private long maxPhysicOffset = -1L;
private volatile long minLogicOffset = 0L;
private ConsumeQueueExt consumeQueueExt = null;
}
3. IndexFile
:
用于为生成的索引文件提供访问通过消息Key值查询消息真正的实体内容。在实际的物理存储上文件名是以创建的时间戳命名的一个IndexFile
可以保存2000万个索引。
public class IndexFile {
private static final InternalLogger log = InternalLoggerFactory.getLogger("RocketmqStore");
private static int hashSlotSize = 4;
private static int indexSize = 20;
private static int invalidIndex = 0;
private final int hashSlotNum;
private final int indexNum;
private final MappedFile mappedFile;
private final FileChannel fileChannel;
private final MappedByteBuffer mappedByteBuffer;
private final IndexHeader indexHeader;
}
4. MappedFileQueue
:
对连续物理存储的封装类代码中可以通过消息存储的物理偏移量快速定位offset
对应的MappedFile
。
public class MappedFileQueue {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);
private static final int DELETE_FILES_BATCH_MAX = 10;
private final String storePath;
private final int mappedFileSize;
private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
private final AllocateMappedFileService allocateMappedFileService;
private long flushedWhere = 0;
private long committedWhere = 0;
private volatile long storeTimestamp = 0;
}
5. MappedFile
:
文件存储的直接内存映射封装类通过该类的实例可以把消息写入PageCache
或者将消息刷盘。
public class MappedFile extends ReferenceResource {
public static final int OS_PAGE_SIZE = 1024 * 4;
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
protected final AtomicInteger committedPosition = new AtomicInteger(0);
private final AtomicInteger flushedPosition = new AtomicInteger(0);
protected int fileSize;
protected FileChannel fileChannel;
/**
* Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
*/
protected ByteBuffer writeBuffer = null;
protected TransientStorePool transientStorePool = null;
private String fileName;
private long fileFromOffset;
private File file;
private MappedByteBuffer mappedByteBuffer;
private volatile long storeTimestamp = 0;
private boolean firstCreateInQueue = false;
}
刷盘流程
同步刷盘
只有在消息成功写入磁盘之后Broker
才会给Producer
返回一个ACK响应。RocketMQ
中主线程会首先创建一个刷盘请求实例GroupCommitRequest
并将其放到刷盘队列之后使用同步刷盘线程GroupCommitService
来执行刷盘动作。
GroupCommitService
里使用到CountDownLatch
来控制线程间同步
RocketMQ
里还使用了两个队列分别负责读和写操作实现读写分离提高并发量
同步刷盘可以保障较好的一致性一般适用于金融业务领域。
异步刷盘
只要消息写入到Pagecache
之后就可以直接返回ACK给Producer
之后后台异步线程负责将消息刷盘此时主线程并不会阻塞降低了读写延迟从而达到提高MQ性能和吞吐量的目的。