RocketMQ 是如何刷盘的?
你好,我是猿java。
在 RocketMQ 如何保证发送消息不丢失? 这篇文章中提到了刷盘,这篇文章,我们将详细介绍 RocketMQ 的刷盘机制,包括它写了哪些文件,如何写入磁盘,以及相关的源码分析和示例代码。
本文源码基于 RocketMQ 5.0
RocketMQ 刷盘原理
RocketMQ 的刷盘流程主要涉及以下几类文件:
- CommitLog 文件:存储所有消息,支持顺序写入和随机读取。
- ConsumeQueue 文件:存储消息的逻辑索引,加速消息消费。
- Index 文件:存储消息的索引信息,支持根据 Key 或时间区间快速查找消息。
CommitLog 文件
CommitLog
文件是 RocketMQ 存储消息的核心文件,所有的消息首先被顺序写入到CommitLog
文件中。
消息写入CommitLog
文件的过程如下:
- 消息到达 Broker 后,通过 DefaultMessageStore 类的 putMessage 方法写入。
- putMessage 方法调用
CommitLog
类的 putMessage 方法。 CommitLog
将消息写入到内存映射文件(MappedFile)中。- 根据刷盘策略(同步或异步),将数据刷入磁盘。
简化的写入代码如下:
1 | public PutMessageResult putMessage(final MessageExtBrokerInner msg) { |
读取消息时,通过消息的物理偏移量(offset)从 CommitLog
文件中读取,偏移量信息通常存储在ConsumeQueue
文件中。
ConsumeQueue 文件
因为CommitLog
文件包括当前 Broker 所有 Topic的信息,因此,为了消费者能够快速的定位到某个具体 Topic的信息,需要把CommitLog
文件中的消息分别发送到每个 Topic,因此,RocketMQ 使用了一种叫做 ConsumeQueue
的文件。
ConsumeQueue
文件是消息的逻辑索引文件,该文件的每个条目(Entry)具有固定的大小,每个条目占用 20 字节。具体结构如下:
- CommitLog Offset (8 bytes):消息在 CommitLog 文件中的物理偏移量。
- Message Size (4 bytes):消息的大小。
- Tag Hashcode (8 bytes):消息 Tag 的哈希值,用于消息过滤。
当消息写入CommitLog
文件后,RocketMQ 会将消息的偏移量信息写入到相应的ConsumeQueue
文件中,每个 Topic 的每个队列都有一个对应的ConsumeQueue
文件。
简化的写入代码如下:
1 | public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long cqOffset) { |
消费者在消费消息时,会首先从ConsumeQueue
文件中读取消息的物理偏移量,然后根据偏移量从CommitLog
文件中读取消息内容。
Index 文件
Index
文件是消息的索引文件,用于根据消息的 Key 或时间区间快速查找消息。它类似于数据库中的索引。
当消息写入CommitLog
文件后,RocketMQ 会根据消息的 Key 生成索引,并将索引信息写入到Index
文件中。
简化的写入代码如下:
1 | public void putKey(String key, long offset, long storeTimestamp) { |
根据消息的 Key 或时间区间查找消息时,RocketMQ 会首先从Index
文件中查找对应的偏移量,然后根据偏移量从CommitLog
文件中读取消息内容。
刷盘方式
RocketMQ 的刷盘机制主要分为同步刷盘和异步刷盘两种方式:
- 同步刷盘:消息写入 CommitLog 文件后,立即将数据刷入磁盘,然后返回写入成功的响应给生产者。同步刷盘的可靠性高,但性能相对较低。
- 异步刷盘:消息写入 CommitLog 文件后,立即返回写入成功的响应给生产者,后台线程负责将数据批量刷入磁盘。异步刷盘的性能高,但可靠性相对较低。
源码分析
这个部分,我们将详细的分析 RocketMQ 关于刷盘机制的核心源码。
CommitLog 文件刷盘
CommitLog 文件的刷盘逻辑主要在 CommitLog.java
类中,以下是简化的代码片段:
1 | public class CommitLog { |
代码解释:
- 消息写入:当消息到达 CommitLog 时,首先调用 putMessage 方法,这里的 mappedFile.appendMessage 方法将消息写入到内存映射文件(MappedByteBuffer)中。
- 触发刷盘:接下来调用 handleDiskFlush 方法触发刷盘操作,
- 刷盘:如果配置为同步刷盘(FlushDiskType.SYNC_FLUSH),则会创建一个 GroupCommitRequest 请求,并将其提交到 GroupCommitService;如果配置为异步刷盘(FlushDiskType.ASYNC_FLUSH),则会唤醒 FlushRealTimeService 或 CommitLogService 执行刷盘操作。
FlushCommitLogService 类
FlushCommitLogService
是一个抽象类,RocketMQ 提供了 FlushRealTimeService
和 GroupCommitService
两个具体实现。
- FlushRealTimeService:用于异步刷盘。
- GroupCommitService:用于同步刷盘。
以下是 FlushRealTimeService
的简化代码:
1 | class FlushRealTimeService extends FlushCommitLogService { |
FlushRealTimeService
是 RocketMQ 中用于异步刷盘的核心类,它通过后台线程定时将内存中的数据刷入磁盘,以保证数据的持久化。其主要逻辑包括:
- 定时检查是否需要刷盘;
- 根据配置参数执行刷盘操作;
- 更新刷盘时间戳;
- 等待下一个刷盘周期;
异步刷盘的触发条件
异步刷盘主要在以下几种情况下被触发:
消息写入到 CommitLog 后:当一条消息被写入到 CommitLog 文件后,如果配置为异步刷盘,RocketMQ 会唤醒异步刷盘服务,让其尽快将数据刷入磁盘。
定时触发:RocketMQ 的异步刷盘服务通常会在一个固定的时间间隔内被定时触发,以确保数据在一定的时间内被刷入磁盘。
达到某个阈值:当内存中的数据量达到某个阈值时,也会触发异步刷盘。这种阈值通常是通过配置来控制的,例如内存缓冲区的大小。
总结
RocketMQ 的刷盘机制通过同步刷盘和异步刷盘两种方式,确保消息在高性能和高可靠性之间找到平衡。理解其刷盘机制及源码实现,对于优化和调试RocketMQ系统具有重要意义。因此,还是建议有时间读读 RocketMQ的源码。
学习交流
如果你觉得文章有帮助,请帮忙转发给更多的好友,或关注公众号:猿java,持续输出硬核文章。