【RocketMQ学习】10.源码之堆外内存


1 前言

上一章节,我们大致上了解了消息的存储机制,其中在消息写入时,RocketMQ有两种不同的写入机制:

  • 常规方式:通过MMAP(Memory Map,内存映射)写内存映射文件(Memory-Mapped File,MMF),然后通过同步或异步刷盘机制写入文件;
  • 堆外缓冲区:若开关开启Broker主节点且配置为异步刷盘,则先写堆外内存池,再MMAP写内存,最后异步刷盘写入文件。
    MMAP零拷贝技术
    本章,从消息发送角度,详细了解下对于堆外缓冲区方式的流程及原理。

2 开启条件与限制

要利用堆外内存进行消息的写入,首先,Broker的配置文件需要配置:

transientStorePoolEnable=true

因为默认条件下(版本:4.8.0),该参数值为false。

另外,还需要设置刷盘方式为:异步刷盘。

flushDiskType = ASYNC_FLUSH

最后,broker必须为主节点。我们定位到代码,实际看看开启的条件:

堆外内存存储开启条件
//MessageStoreConfig.java
public boolean isTransientStorePoolEnable() {
    //开启堆外内存缓冲区,必须是异步刷盘+主节点
    return transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == getFlushDiskType() && BrokerRole.SLAVE != getBrokerRole();
}

3 TransientStorePool概要设计

在 RocketMQ 中,TransientStorePool 是一种优化磁盘 I/O 性能的机制。它通过预分配内存块,将消息写入预分配的内存块(直接内存),然后使用内存映射将内存块中的数据刷到磁盘,从而提高写入性能。因此,当你需要提高 RocketMQ 的消息写入性能时,可以考虑开启TransientStorePool

TransientStorePool,从名字上来看,类似线程池的定义。

public class TransientStorePool {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    /**
     * 缓冲池大小,默认值:5
     */
    private final int poolSize;
    /**
     * 每个bytebuffer大小,默认值:1G
     */
    private final int fileSize;
    /**
     * ByteBuffer容器,双端队列
     */
    private final Deque<ByteBuffer> availableBuffers;
    private final MessageStoreConfig storeConfig;

    public TransientStorePool(final MessageStoreConfig storeConfig) {
        this.storeConfig = storeConfig;
        this.poolSize = storeConfig.getTransientStorePoolSize();
        this.fileSize = storeConfig.getMappedFileSizeCommitLog();
        this.availableBuffers = new ConcurrentLinkedDeque<>();
    }

    /**
     * It's a heavy init method.
     * 初始化
     */
    public void init() {
        for (int i = 0; i < poolSize; i++) {
            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);

            final long address = ((DirectBuffer) byteBuffer).address();
            Pointer pointer = new Pointer(address);
            LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));

            availableBuffers.offer(byteBuffer);
        }
    }
    //.....................省略
}

当开启条件达成时,会初始化堆外内存,这些内存会一直锁定在内存中,不会被进程将其换至磁盘。另外,由于是堆外内存,这么设计可以避免频繁的GC。

再看看其调用的地方,通过borrowBuffer(),对外提供。

调用位置

既然MMAP可以解决写入性能问题,为什么还会出现TransientStorePool?

  1. 减少 GC 压力: 单独使用 MMAP 确实可以降低 GC 压力,因为 MMAP 使用的是堆外内存。当文件被映射到内存地址空间时,文件的读写操作实际上是在内存中进行的,这部分内存位于堆外,不受 Java 垃圾回收(GC)机制的影响。
    然而,在实际应用中,仅依赖 MMAP 可能无法完全避免 GC 压力。这是因为在某些场景中,如高并发写入时,仍然需要在堆内存中创建临时对象,这些对象会受到 GC 的影响。
    当我们结合使用 TransientStorePool 和 MMAP 时,可以进一步降低 GC 压力。RocketMQ 将消息写入 TransientStorePool 中的 DirectByteBuffer(直接内存缓冲区),DirectByteBuffer 是一种堆外内存,不受 GC 影响。这样,写入操作实际上发生在堆外内存,既减少了堆内存的分配和回收操作,也降低了 GC 压力。
  2. 异步刷盘:使用 TransientStorePool 和 MMAP 结合,可以实现异步刷盘。首先,将消息写入 TransientStorePool 中的内存块,然后,将这些内存块映射到 MMF。最后,通过后台刷盘服务将内存中的数据异步写入磁盘。这样可以降低磁盘 I/O 的延迟,提高响应速度。

3.1 TransientStorePool技术的设计原理

RocketMQ中的TransientStorePool设计是为了提高消息写入磁盘的性能。它的原理主要基于两个技术:内存池MMAP

  • 内存池:TransientStorePool是一个预先分配的固定大小的内存池。它用来暂存消息数据,以减少频繁的系统内存分配和回收,从而提高性能。当生产者发送消息时,消息首先会被写入到这个内存池中,而不是直接写入到磁盘。内存池中的每个内存块大小与RocketMQ中CommitLog文件的一个映射内存块大小相同。
  • MMAP:一种将文件或者文件的一部分映射到进程内存地址空间的技术。RocketMQ使用MMAP技术将CommitLog文件映射到内存中,这样就可以通过直接操作内存来读写CommitLog文件,避免了磁盘IO操作的性能开销。

3.2 工作流程

结合上述的两个技术,RocketMQ的TransientStorePool设计实现了以下工作流程:

  • 当生产者发送消息时,消息首先被写入到内存池中的一个内存块。
  • 当内存块被写满时,将整个内存块的数据通过MMAP的方式刷入到CommitLog文件对应的映射内存块中。
  • 最后,通过调用操作系统的msync方法,将映射内存中的数据刷入磁盘。

这种设计可以有效地减少磁盘IO操作,提高RocketMQ的消息写入性能。同时,由于内存池和MMAP的使用,RocketMQ可以充分利用操作系统的缓存机制,进一步优化性能。

4 串联消息发送流程

有了上面的知识,我们就可以确定,在MappedFile中,如果writeBuffer不为null,要么就一定开启了堆外内存缓冲!!!

再结合消息的发送流程。 数据到了存储层,最终会调用MappedFileappendMessagesInner()进行消息的存储。

使用位置

归结下来,消息的发送就有两个路径,如下图:

两条发送路径
  • 1.走传统的MMAP内存映射,数据写mappedByteBuffer,然后通过flush刷盘
  • 2.走堆外内存缓冲区,数据先写writeBuffer,再通过commit提交到FileChannel中,最后再flush刷盘
    以上两种方式,处理的都是基于bytebuffer的实现,所以都通过put方法可以写入内存。

所以对应前面讲的刷盘,你会发现为什么异步刷盘线程有两个。

  • 一个是针对的MMAP刷盘;
  • 一个是针对的堆外内存缓冲提交刷盘。
    刷盘线程

同步刷盘线程:GroupCommitService
异步刷盘线程:CommitLog.CommitRealTimeServiceCommitLog.FlushRealTimeService

刷盘机制

刷盘会在CommitLog#asyncPutMessage方法中进行。

所以,堆外内存缓冲区一定是要异步操作。另外,上图【两条发送路径】中,commit的是针对堆外内存缓冲的提交。flush则是针对MMAP的内存映射的处理。

5 两种写入机制对比

  • 默认方式,MMAP+PageCache的方式,读写消息都走的是pageCache,这样子读写都在pagecache里面不可避免会有锁的问题
  • 堆外缓冲区,DirectByteBuffer(堆外内存)+PageCache的两层架构方式,这样子可以实现读写消息分离写入消息时候写到的是DirectByteBuffer——堆外内存中,读消息走的是PageCache(对于DirectByteBuffer是两步刷盘,一步是刷到PageCache,还有一步是刷到磁盘文件中),带来的好处就是,避免了内存操作的很多容易堵的地方,降低了延时,比如说缺页中断降低,内存加锁,污染页的回写。
    读写分离
    所以使用堆外缓冲区的方式相对来说会比较好,但是肯定的是,需要消耗一定的内存,如果服务器内存吃紧就不推荐这种模式,同时的话,堆外缓冲区的话也需要配合异步刷盘才能使用。

6 应用场景

  • 高性能场景:对于具有较高消息吞吐量要求的场景,开启 TransientStorePool 可以减少磁盘 I/O,提高写入性能。
  • 大量写入操作:在大量写入操作的场景下,使用 TransientStorePool 可以减少频繁的系统内存分配和回收操作,从而提高性能。
  • 集群环境:在集群环境中,开启 TransientStorePool 可以帮助提高整个集群的性能和稳定性,尤其是在集群节点较多时。
  • 高峰期:在系统高峰期(比如双十一、黑五等)时,消息流量和写入操作可能会急剧增加。开启 TransientStorePool 可以帮助应对这种临时的高负载压力,提高整个系统的处理能力。
  • 异步刷盘:在异步刷盘的场景下,将消息先写入内存缓冲区可以提高响应速度。开启 TransientStorePool 可以使得消息先被写入直接内存,然后通过后台刷盘服务异步写入磁盘,从而降低延迟。
  • 低延迟要求:对于对延迟有严格要求的场景,开启 TransientStorePool 可以减少磁盘 I/O 操作,降低消息写入的延迟。

文章作者: Kezade
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Kezade !
评论
  目录