我们知道RocketMQ主要分为消息 生产、存储(消息堆积)、消费 三大块领域。
那接下来,我们白话一下,RocketMQ是如何存储消息的,揭秘消息存储全过程。
注意,如果白话中不小心提到相关代码配置与类名,请参考RocketMQ 4.9.4版本
RocketMQ使用了一种基于日志的存储方式,将消息以顺序写入的方式追加到文件中,从而实现高性能的消息存储和读取。
RocketMQ的消息存储方式可以分为两个类型:CommitLog 和ConsumeQueue 。
图片
还有一个文件类型是indexfile,主要用于控制台消息检索,不影响消息的写入与消费,我们就不展开了。
CommitLog文件存储了Producer端写入的消息主体内容,它以追加写入的方式将消息存储到磁盘上的文件中。
单个文件大小默认1G ,文件名长度为20位(左边补零,剩余为起始偏移量),当文件满了,写入下一个文件。
比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。
它的主要特点是:顺序写,但是随机读(被ConsumeQueue读取)。
虽然是随机读,但是利用package机制,可以批量地从磁盘读取,作为cache存到内存中,加速后续的读取速度。
Broker单个实例下所有的队列共用一个日志数据文件CommitLog来存储。而Kafka采用的是独立型的存储结构,每个队列一个文件。
ConsumeQueue文件是用于支持消息消费的存储结构。保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。
消费者 通过 顺序读取 ConsumeQueue文件,可以快速定位到消息在CommitLog中的物理存储位置,从而实现快速消息的拉取和消费。
从实际物理存储的角度来看,每个主题Topic下的每个队列Queue对应一个ConsumeQueue文件。
生产者端的消息是顺序写入CommitLog,消费者端是顺序读取ConsumeQueue。但是根据ConsumeQueue的起始物理位置偏移量offset读取消息真实内容,实际是随机读取CommitLog。实现了 消息生产与消息消费、数据存储和数据索引 相互分离。
Broker在把消息写入日志文件的过程中,如果在刚收到消息时,Broker异常宕机了,那么内存中尚未写入磁盘的消息就会丢失了。
因此,RocketMQ持久化消息分为两种:同步刷盘和异步刷盘(默认配置)。
异步刷盘是指Broker收到消息后先存储到PageCache,然后立即通知Producer消息已存储成功,可以继续处理业务逻辑。此后,Broker会启动一个异步线程将消息持久化到磁盘。然而,如果Broker在持久化到磁盘之前发生故障,消息将会丢失。
## 刷盘策略配置flushDiskType = ASYNC_FLUSH
注意,写入PageCache后,应用服务宕机消息不丢失,只有机器断电或宕机会有少量消息丢失。
相比之下,同步刷盘的方式是在消息存储到缓存后不立即通知Producer,而是等待消息被持久化到磁盘后再通知Producer。这种方式确保了消息不会丢失,但性能不如异步刷盘高。一般用于金融业务。
## 刷盘策略配置flushDiskType = SYNC_FLUSH
在选择刷盘方式时,需要根据业务场景进行权衡。
即使Broker采用同步刷盘策略,但如果刷盘完成后磁盘损坏,会导致所有存储在磁盘上的消息丢失。
即使采用了主从复制,如果主节点在刷盘完成后还没有来得及将数据同步给从节点就发生了磁盘故障,同样会导致数据丢失。
所以我们可以配置同步机制,等待从节点复制完成主节点的消息后,才去通知Producer完成了消息存储。
## 主从同步策略配置brokerRole=SYNC_MASTER
RocketMQ通过使用内存映射文件(包括CommitLog、 ConsumeQueue等文件)来提高IO访问性能,也就是我们常说的零拷贝技术。
Java在NIO包里,引入了sendFile(FileChannel类)和MMAP(MappedByteBuffer类)两种实现方式的零拷贝技术。
主流的MQ都会使用零拷贝技术,来提升IO:
在不开启RocketMQ的内存映射增强方案时,RocketMQ的读和写都只会简单直接使用MMAP。
但是,MappedByteBuffer也存在一些缺陷:
为此,RocketMQ通过transientStorePoolEnable参数控制,对写入进行了优化。
如果开启了这个参数,会将写入拆分为两步, 写入缓冲区 + 异步刷盘 的增强策略。
## 刷盘策略配置flushDiskType = ASYNC_FLUSH transientStorePoolEnable = true
MappedFile会提前申请一块直接内存用作缓冲区,放弃使用mmap直接写文件。
数据先写入缓冲区,然后异步线程每200ms(且脏数据达到16K,commitCommitLogLeastPages = 4)将缓冲区的数据commit写入FileChannel。
再唤醒定时服务(FlushRealTimeService类)将FileChannel里的数据持久化到磁盘。flush函数和commit一样也可以传入一个刷盘页数,当脏页数量达到16K时(flushLeastPages = 4),会进行刷盘操作,调用FileChannel的force将内存中的数据持久化到磁盘。
开启transientStorePoolEnable参数后,性能最好,但是相对来说持久化最不可靠
RocketMQ 使用存储时长作为消息存储的依据,即每个节点对外承诺消息的存储时长。在存储时长范围内的消息都会被保留,无论消息是否被消费;超过时长限制的消息则会被清理掉。
需要注意的是,在RocketMQ中,消息存储时长并不能完整控制消息的实际保存时间。
因为消息存储仍然使用本地磁盘,本地磁盘空间不足时,为保证服务稳定性消息仍然会被强制清理,导致消息的实际保存时长小于设置的保存时长。
建议在存储成本可控的前提下,尽可能延长消息存储时长。延长消息存储时长,可以为紧急故障恢复、应急问题排查和消息回溯带来更多的可操作空间。
本文链接:http://www.28at.com/showinfo-26-5709-0.html三分钟白话RocketMQ系列—— 如何存储消息
声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com
上一篇: 编程的思辨力:程序员们解析技术背后的思想