1 前言
上一章节,我们大致上了解了消息的存储机制,其中在消息写入时,RocketMQ有两种不同的写入机制:
- 常规方式:通过
MMAP
(Memory Map,内存映射)写内存映射文件
(Memory-Mapped File,MMF),然后通过同步或异步刷盘机制
写入文件; - 堆外缓冲区:若
开关开启
,Broker主节点
且配置为异步刷盘
,则先写堆外内存池,再MMAP
写内存,最后异步刷盘
写入文件。 本章,从消息发送角度,详细了解下对于堆外缓冲区
方式的流程及原理。
2 开启条件与限制
要利用堆外内存进行消息的写入,首先,Broker的配置文件需要配置:
transientStorePoolEnable=true
因为默认条件下(版本:4.8.0),该参数值为false。
另外,还需要设置刷盘方式为:异步刷盘。
flushDiskType = ASYNC_FLUSH
最后,broker必须为主节点。我们定位到代码,实际看看开启的条件:
//MessageStoreConfig.java
public boolean isTransientStorePoolEnable() {
//开启堆外内存缓冲区,必须是异步刷盘+主节点
return transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == getFlushDiskType() && BrokerRole.SLAVE != getBrokerRole();
}
3 TransientStorePool概要设计
在 RocketMQ 中,TransientStorePool
是一种优化磁盘 I/O 性能的机制。它通过预分配内存块,将消息写入预分配的内存块(直接内存),然后使用内存映射
将内存块中的数据刷到磁盘,从而提高写入性能。因此,当你需要提高 RocketMQ 的消息写入性能时,可以考虑开启TransientStorePool
。
TransientStorePool
,从名字上来看,类似线程池的定义。
public class TransientStorePool {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
/**
* 缓冲池大小,默认值:5
*/
private final int poolSize;
/**
* 每个bytebuffer大小,默认值:1G
*/
private final int fileSize;
/**
* ByteBuffer容器,双端队列
*/
private final Deque<ByteBuffer> availableBuffers;
private final MessageStoreConfig storeConfig;
public TransientStorePool(final MessageStoreConfig storeConfig) {
this.storeConfig = storeConfig;
this.poolSize = storeConfig.getTransientStorePoolSize();
this.fileSize = storeConfig.getMappedFileSizeCommitLog();
this.availableBuffers = new ConcurrentLinkedDeque<>();
}
/**
* It's a heavy init method.
* 初始化
*/
public void init() {
for (int i = 0; i < poolSize; i++) {
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
final long address = ((DirectBuffer) byteBuffer).address();
Pointer pointer = new Pointer(address);
LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
availableBuffers.offer(byteBuffer);
}
}
//.....................省略
}
当开启条件达成时,会初始化堆外内存,这些内存会一直锁定在内存中,不会被进程将其换至磁盘。另外,由于是堆外内存,这么设计可以避免频繁的GC。
再看看其调用的地方,通过borrowBuffer()
,对外提供。
既然
MMAP
可以解决写入性能问题,为什么还会出现TransientStorePool
?
1) 减少 GC 压力: 单独使用 MMAP 确实可以降低 GC 压力,因为 MMAP 使用的是堆外内存
。当文件被映射到内存地址空间时,文件的读写操作实际上是在内存中进行的,这部分内存位于堆外,不受 Java 垃圾回收(GC)机制的影响。
然而,在实际应用中,仅依赖 MMAP 可能无法完全避免 GC 压力。这是因为在某些场景中,如高并发写入时,仍然需要在堆内存中创建临时对象,这些对象会受到 GC 的影响。
当我们结合使用 TransientStorePool 和 MMAP 时,可以进一步降低 GC 压力。RocketMQ 将消息写入 TransientStorePool 中的 DirectByteBuffer(直接内存缓冲区),DirectByteBuffer 是一种堆外内存,不受 GC 影响。这样,写入操作实际上发生在堆外内存,既减少了堆内存的分配和回收操作,也降低了 GC 压力。
2) 异步刷盘:使用 TransientStorePool 和 MMAP 结合,可以实现异步刷盘。首先,将消息写入 TransientStorePool 中的内存块,然后,将这些内存块映射到MMF
。最后,通过后台刷盘服务将内存中的数据异步写入磁盘。这样可以降低磁盘 I/O 的延迟,提高响应速度。
3.1 TransientStorePool技术的设计原理
RocketMQ中的TransientStorePool设计是为了提高消息写入磁盘的性能。它的原理主要基于两个技术:内存池
和MMAP
。
- 内存池:TransientStorePool是一个预先分配的固定大小的内存池。它用来暂存消息数据,以减少频繁的系统内存分配和回收,从而提高性能。当生产者发送消息时,消息首先会被写入到这个内存池中,而不是直接写入到磁盘。内存池中的每个内存块大小与RocketMQ中CommitLog文件的一个映射内存块大小相同。
- MMAP:一种将文件或者文件的一部分映射到进程内存地址空间的技术。RocketMQ使用MMAP技术将CommitLog文件映射到内存中,这样就可以通过直接操作内存来读写CommitLog文件,避免了磁盘IO操作的性能开销。
3.2 工作流程
结合上述的两个技术,RocketMQ的TransientStorePool设计实现了以下工作流程:
- 当生产者发送消息时,消息首先被写入到内存池中的一个内存块。
- 当内存块被写满时,将整个内存块的数据通过MMAP的方式刷入到CommitLog文件对应的映射内存块中。
- 最后,通过调用操作系统的
msync
方法,将映射内存中的数据刷入磁盘。
这种设计可以有效地减少磁盘IO操作,提高RocketMQ的消息写入性能。同时,由于内存池和MMAP的使用,RocketMQ可以充分利用操作系统的缓存机制,进一步优化性能。
4 串联消息发送流程
有了上面的知识,我们就可以确定,在MappedFile
中,如果writeBuffer
不为null,要么就一定开启了堆外内存缓冲!!!
再结合消息的发送流程。 数据到了存储层,最终会调用MappedFile
的appendMessagesInner()
进行消息的存储。
归结下来,消息的发送就有两个路径,如下图:
- 1.走传统的
MMAP
内存映射,数据写mappedByteBuffer
,然后通过flush刷盘
; - 2.走堆外内存缓冲区,数据先写
writeBuffer
,再通过commit
提交到FileChannel
中,最后再flush
刷盘
以上两种方式,处理的都是基于bytebuffer
的实现,所以都通过put方法
可以写入内存。
所以对应前面讲的刷盘,你会发现为什么异步刷盘线程有两个。
- 一个是针对的
MMAP
刷盘; - 一个是针对的
堆外内存缓冲
的提交
刷盘。
同步刷盘线程:GroupCommitService
异步刷盘线程:CommitLog.CommitRealTimeService
、CommitLog.FlushRealTimeService
。
刷盘会在
CommitLog#asyncPutMessage
方法中进行。
所以,堆外内存缓冲区一定是要异步操作。另外,上图【两条发送路径】中,commit
的是针对堆外内存缓冲的提交。flush
则是针对MMAP
的内存映射的处理。
5 两种写入机制对比
- 默认方式,
MMAP+PageCache
的方式,读写消息都走的是pageCache
,这样子读写都在pagecache
里面不可避免会有锁的问题
。 - 堆外缓冲区,
DirectByteBuffer
(堆外内存)+PageCache
的两层架构方式,这样子可以实现读写消息分离
,写入消息时候写到的是DirectByteBuffer
——堆外内存中,读消息走的是PageCache
(对于DirectByteBuffer
是两步刷盘,一步是刷到PageCache,还有一步是刷到磁盘文件中),带来的好处就是,避免了内存操作的很多容易堵的地方,降低了延时,比如说缺页中断降低,内存加锁,污染页的回写。 所以使用堆外缓冲区的方式相对来说会比较好,但是肯定的是,需要消耗一定的内存,如果服务器内存吃紧就不推荐这种模式,同时的话,堆外缓冲区的话也需要配合异步刷盘
才能使用。
6 应用场景
- 高性能场景:对于具有较高消息吞吐量要求的场景,开启 TransientStorePool 可以减少磁盘 I/O,提高写入性能。
- 大量写入操作:在大量写入操作的场景下,使用 TransientStorePool 可以减少频繁的系统内存分配和回收操作,从而提高性能。
- 集群环境:在集群环境中,开启 TransientStorePool 可以帮助提高整个集群的性能和稳定性,尤其是在集群节点较多时。
- 高峰期:在系统高峰期(比如双十一、黑五等)时,消息流量和写入操作可能会急剧增加。开启 TransientStorePool 可以帮助应对这种临时的高负载压力,提高整个系统的处理能力。
- 异步刷盘:在异步刷盘的场景下,将消息先写入内存缓冲区可以提高响应速度。开启 TransientStorePool 可以使得消息先被写入直接内存,然后通过后台刷盘服务异步写入磁盘,从而降低延迟。
- 低延迟要求:对于对延迟有严格要求的场景,开启 TransientStorePool 可以减少磁盘 I/O 操作,降低消息写入的延迟。