RocketMQ的消息存储主要分为3个级别:
- CommitLog:消息主体存储位置,所有发送到Broker的消息都会被追加写入CommitLog。CommitLog采用预分配固定大小文件方式实现,每个文件默认1G。
- ConsumeQueue:消息逻辑队列,由CommitLog物理存储文件映射逻辑队列,实现按消息Topic与QueueId在ConsumeQueue划分存储。实现快速查找与删除消息。
- IndexFile:消息索引文件,记录每个ConsumeQueue中的消息在CommitLog中的起始offset与结束offset。用于快速查找ConsumeQueue中的消息在CommitLog中的地址。
消息存储主要流程:
- Producer发送消息后,Broker将消息追加写入当前正在写入的CommitLog文件末尾。
- Producer发送消息时带有Topic与QueueId,Broker根据这两个属性为消息生成ConsumeQueue链表节点,并插入到相应ConsumeQueue的尾部。
- 生成ConsumeQueue节点时,会记录该消息在CommitLog中的起始offset与结束offset,生成一个IndexFile的索引项。
- 消费者向Broker拉取消息时,Broker先在相应ConsumeQueue中找出该消费者未拉取的首条消息。
- Broker然后根据该消息的CommitLog起始offset在CommitLog文件中查找出完整消息内容返回给消费者。
- 消费者确认消费后,Broker删除ConsumeQueue中相应的消息节点与IndexFile索引项。
RocketMQ通过CommitLog实现持久化存储,ConsumeQueue与IndexFile实现快速查找,结合生成物理存储文件与逻辑消息队列映射的方式实现高效查询与删除,来达到高性能与高吞吐的消息存储。