RocketMQ 是如何刷盘的?

你好,我是猿java。

RocketMQ 如何保证发送消息不丢失? 这篇文章中提到了刷盘,这篇文章,我们将详细介绍 RocketMQ 的刷盘机制,包括它写了哪些文件,如何写入磁盘,以及相关的源码分析和示例代码。

本文源码基于 RocketMQ 5.0

RocketMQ 刷盘原理

RocketMQ 的刷盘流程主要涉及以下几类文件:

  1. CommitLog 文件:存储所有消息,支持顺序写入和随机读取。
  2. ConsumeQueue 文件:存储消息的逻辑索引,加速消息消费。
  3. Index 文件:存储消息的索引信息,支持根据 Key 或时间区间快速查找消息。

CommitLog 文件

CommitLog文件是 RocketMQ 存储消息的核心文件,所有的消息首先被顺序写入到CommitLog文件中。

消息写入CommitLog文件的过程如下:

  • 消息到达 Broker 后,通过 DefaultMessageStore 类的 putMessage 方法写入。
  • putMessage 方法调用CommitLog类的 putMessage 方法。
  • CommitLog将消息写入到内存映射文件(MappedFile)中。
  • 根据刷盘策略(同步或异步),将数据刷入磁盘。

简化的写入代码如下:

1
2
3
4
5
6
7
8
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// 省略部分代码...
// 写入消息到 MappedByteBuffer
AppendMessageResult appendMessageResult = mappedFile.appendMessage(msg, this.appendMessageCallback);
// 触发刷盘
handleDiskFlush(appendMessageResult, putMessageResult, msg);
return putMessageResult;
}

读取消息时,通过消息的物理偏移量(offset)从 CommitLog文件中读取,偏移量信息通常存储在ConsumeQueue文件中。

ConsumeQueue 文件

因为CommitLog文件包括当前 Broker 所有 Topic的信息,因此,为了消费者能够快速的定位到某个具体 Topic的信息,需要把CommitLog文件中的消息分别发送到每个 Topic,因此,RocketMQ 使用了一种叫做 ConsumeQueue的文件。

ConsumeQueue文件是消息的逻辑索引文件,该文件的每个条目(Entry)具有固定的大小,每个条目占用 20 字节。具体结构如下:

  • CommitLog Offset (8 bytes):消息在 CommitLog 文件中的物理偏移量。
  • Message Size (4 bytes):消息的大小。
  • Tag Hashcode (8 bytes):消息 Tag 的哈希值,用于消息过滤。

当消息写入CommitLog文件后,RocketMQ 会将消息的偏移量信息写入到相应的ConsumeQueue文件中,每个 Topic 的每个队列都有一个对应的ConsumeQueue文件。

简化的写入代码如下:

1
2
3
4
5
6
7
8
9
public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long cqOffset) {
// 省略部分代码...

// 写入 ConsumeQueue
this.mappedFile.appendMessage(positionInfo, this.appendMessageCallback);

// 触发刷盘
handleDiskFlush();
}

消费者在消费消息时,会首先从ConsumeQueue文件中读取消息的物理偏移量,然后根据偏移量从CommitLog文件中读取消息内容。

Index 文件

Index文件是消息的索引文件,用于根据消息的 Key 或时间区间快速查找消息。它类似于数据库中的索引。

当消息写入CommitLog文件后,RocketMQ 会根据消息的 Key 生成索引,并将索引信息写入到Index文件中。

简化的写入代码如下:

1
2
3
4
5
6
7
public void putKey(String key, long offset, long storeTimestamp) {
// 计算 Key 的哈希值
int keyHash = key.hashCode();

// 写入索引信息
this.mappedFile.appendMessage(new IndexEntry(keyHash, offset, storeTimestamp));
}

根据消息的 Key 或时间区间查找消息时,RocketMQ 会首先从Index文件中查找对应的偏移量,然后根据偏移量从CommitLog文件中读取消息内容。

img

刷盘方式

RocketMQ 的刷盘机制主要分为同步刷盘和异步刷盘两种方式:

  1. 同步刷盘:消息写入 CommitLog 文件后,立即将数据刷入磁盘,然后返回写入成功的响应给生产者。同步刷盘的可靠性高,但性能相对较低。
  2. 异步刷盘:消息写入 CommitLog 文件后,立即返回写入成功的响应给生产者,后台线程负责将数据批量刷入磁盘。异步刷盘的性能高,但可靠性相对较低。

源码分析

这个部分,我们将详细的分析 RocketMQ 关于刷盘机制的核心源码。

CommitLog 文件刷盘

CommitLog 文件的刷盘逻辑主要在 CommitLog.java 类中,以下是简化的代码片段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class CommitLog {
// 刷盘服务类
private final FlushCommitLogService flushCommitLogService;

public CommitLog() {
this.flushCommitLogService = new FlushRealTimeService();
}

// 消息写入方法
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// 省略部分代码...

// 写入消息到 MappedByteBuffer
appendMessageResult = mappedFile.appendMessage(msg, this.appendMessageCallback);

// 触发刷盘
handleDiskFlush(appendMessageResult, putMessageResult, msg);

return putMessageResult;
}

// 处理刷盘逻辑
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// 同步刷盘
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
boolean flushOK = request.future().get();
if (!flushOK) {
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
}
// 异步刷盘
else {
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
this.flushCommitLogService.wakeup();
} else {
this.commitLogService.wakeup();
}
}
}
}

代码解释:

  • 消息写入:当消息到达 CommitLog 时,首先调用 putMessage 方法,这里的 mappedFile.appendMessage 方法将消息写入到内存映射文件(MappedByteBuffer)中。
  • 触发刷盘:接下来调用 handleDiskFlush 方法触发刷盘操作,
  • 刷盘:如果配置为同步刷盘(FlushDiskType.SYNC_FLUSH),则会创建一个 GroupCommitRequest 请求,并将其提交到 GroupCommitService;如果配置为异步刷盘(FlushDiskType.ASYNC_FLUSH),则会唤醒 FlushRealTimeService 或 CommitLogService 执行刷盘操作。

FlushCommitLogService 类

FlushCommitLogService 是一个抽象类,RocketMQ 提供了 FlushRealTimeServiceGroupCommitService 两个具体实现。

  • FlushRealTimeService:用于异步刷盘。
  • GroupCommitService:用于同步刷盘。

以下是 FlushRealTimeService 的简化代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class FlushRealTimeService extends FlushCommitLogService {
@Override
public void run() {
while (!this.isStopped()) {
this.waitForRunning(10);
this.doFlush();
}
}

private void doFlush() {
// 省略部分代码...
mappedFileQueue.flush(0);
}
}

FlushRealTimeService 是 RocketMQ 中用于异步刷盘的核心类,它通过后台线程定时将内存中的数据刷入磁盘,以保证数据的持久化。其主要逻辑包括:

  • 定时检查是否需要刷盘;
  • 根据配置参数执行刷盘操作;
  • 更新刷盘时间戳;
  • 等待下一个刷盘周期;

异步刷盘的触发条件

异步刷盘主要在以下几种情况下被触发:

  • 消息写入到 CommitLog 后:当一条消息被写入到 CommitLog 文件后,如果配置为异步刷盘,RocketMQ 会唤醒异步刷盘服务,让其尽快将数据刷入磁盘。

  • 定时触发:RocketMQ 的异步刷盘服务通常会在一个固定的时间间隔内被定时触发,以确保数据在一定的时间内被刷入磁盘。

  • 达到某个阈值:当内存中的数据量达到某个阈值时,也会触发异步刷盘。这种阈值通常是通过配置来控制的,例如内存缓冲区的大小。

总结

RocketMQ 的刷盘机制通过同步刷盘和异步刷盘两种方式,确保消息在高性能和高可靠性之间找到平衡。理解其刷盘机制及源码实现,对于优化和调试RocketMQ系统具有重要意义。因此,还是建议有时间读读 RocketMQ的源码。

学习交流

如果你觉得文章有帮助,请帮忙转发给更多的好友,或关注公众号:猿java,持续输出硬核文章。

drawing