[RocketMQ Learning] 9. Source Code: the Store Module


1 Overall Flow

Figure: overall message storage flow
Figure: message storage architecture

2 Core Classes

2.1 DefaultMessageStore

Figure: DefaultMessageStore

Key fields:

private final MessageStoreConfig messageStoreConfig;	//message store configuration
private final CommitLog commitLog;		//implementation of CommitLog file storage
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;	//consume queue cache table, grouped by topic
private final FlushConsumeQueueService flushConsumeQueueService;	//consume queue flush thread
private final CleanCommitLogService cleanCommitLogService;	//service that cleans up expired CommitLog files
private final CleanConsumeQueueService cleanConsumeQueueService;	//service that cleans up expired ConsumeQueue files
private final IndexService indexService;	//index file service
private final AllocateMappedFileService allocateMappedFileService;	//MappedFile pre-allocation service
private final ReputMessageService reputMessageService;//dispatches CommitLog entries to build the ConsumeQueue and IndexFile
private final HAService haService;	//storage HA (master/slave replication) service
private final ScheduleMessageService scheduleMessageService;	//delayed (scheduled) message service
private final StoreStatsService storeStatsService;	//store statistics service
private final TransientStorePool transientStorePool;	//off-heap buffer pool for messages
private final BrokerStatsManager brokerStatsManager;	//broker statistics manager
private final MessageArrivingListener messageArrivingListener;	//message-arrival listener for long-polling pull requests
private final BrokerConfig brokerConfig;	//broker configuration
private StoreCheckpoint storeCheckpoint;	//flush checkpoint
private final LinkedList<CommitLogDispatcher> dispatcherList;	//CommitLog dispatch handlers

Following the flow chart above and taking an ordinary message as the example, we trace the source code starting from DefaultMessageStore#asyncPutMessage.

public class DefaultMessageStore implements MessageStore {
    @Override
    public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
        //1. Check the store status
        PutMessageStatus checkStoreStatus = this.checkStoreStatus();
        if (checkStoreStatus != PutMessageStatus.PUT_OK) {
            return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
        }
        //2. Validate the message itself
        PutMessageStatus msgCheckStatus = this.checkMessage(msg);
        if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
            return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
        }

        long beginTime = this.getSystemClock().now();
        //3. Append the message to the CommitLog
        CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);

        putResultFuture.thenAccept((result) -> {
            long elapsedTime = this.getSystemClock().now() - beginTime;
            if (elapsedTime > 500) {
                log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
            }
            //4. Record how long the putMessage call took
            this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

            if (null == result || !result.isOk()) {
                this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
            }
        });

        return putResultFuture;
    }

    /** Check the store status */
    private PutMessageStatus checkStoreStatus() {
        //1. Has the store been shut down?
        if (this.shutdown) {
            log.warn("message store has shutdown, so putMessage is forbidden");
            return PutMessageStatus.SERVICE_NOT_AVAILABLE;
        }
        //2. Check the broker role; a slave must not accept writes
        if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) {
                log.warn("broke role is slave, so putMessage is forbidden");
            }
            return PutMessageStatus.SERVICE_NOT_AVAILABLE;
        }
        //3. Check whether the store is currently writable; if not, reject the write
        if (!this.runningFlags.isWriteable()) {
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) {
                log.warn("the message store is not writable. It may be caused by one of the following reasons: " +
                        "the broker's disk is full, write to logic queue error, write to index file error, etc");
            }
            return PutMessageStatus.SERVICE_NOT_AVAILABLE;
        } else {
            this.printTimes.set(0);
        }
        //4. Check whether the OS page cache is busy
        if (this.isOSPageCacheBusy()) {
            return PutMessageStatus.OS_PAGECACHE_BUSY;
        }
        return PutMessageStatus.PUT_OK;
    }

    /** Validate the message itself */
    private PutMessageStatus checkMessage(MessageExtBrokerInner msg) {
        //1. Reject topics longer than the maximum (127)
        if (msg.getTopic().length() > Byte.MAX_VALUE) {
            log.warn("putMessage message topic length too long " + msg.getTopic().length());
            return PutMessageStatus.MESSAGE_ILLEGAL;
        }
        //2. Reject properties longer than the maximum (32767)
        if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
            log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
            return PutMessageStatus.MESSAGE_ILLEGAL;
        }
        return PutMessageStatus.PUT_OK;
    }
}

Here checkStoreStatus verifies the store status and checkMessage validates the message itself. Once both checks pass, CommitLog#asyncPutMessage is called to persist the message.
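To orient the walkthrough, here is a minimal sketch of how a caller (in the broker, the send-message processing path) would hand a message to the store. Only the asyncPutMessage entry point comes from the code above; the field values and the messageStore variable are illustrative assumptions:

MessageExtBrokerInner msg = new MessageExtBrokerInner();        // broker-internal message wrapper
msg.setTopic("TopicTest");                                      // topic must be <= 127 characters (checkMessage)
msg.setQueueId(0);
msg.setBody("hello store".getBytes(StandardCharsets.UTF_8));
msg.setBornTimestamp(System.currentTimeMillis());
msg.setBornHost(new InetSocketAddress("127.0.0.1", 10911));     // addresses are placeholders
msg.setStoreHost(new InetSocketAddress("127.0.0.1", 10911));

messageStore.asyncPutMessage(msg)                               // messageStore: a started DefaultMessageStore
        .thenAccept(result -> System.out.println("put status: " + result.getPutMessageStatus()));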

2.2 CommitLog

public class CommitLog {
    public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
        // 1. Record the store timestamp
        msg.setStoreTimestamp(System.currentTimeMillis());
        // 2.Set the message body BODY CRC (consider the most appropriate setting on the client)
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        // Back to Results
        AppendMessageResult result = null;
        //3. Get the store statistics service
        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

        String topic = msg.getTopic();
        int queueId = msg.getQueueId();

        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // 4. Delayed (scheduled) message handling
            if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }
                //5. Replace the topic of a delayed message with the broker's fixed topic SCHEDULE_TOPIC_XXXX
                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                //6. Replace the queueId with (delayLevel - 1); each delay level has its own queue
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                //7. Back up the original topic/queueId in properties so they can be restored later
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
                //8. Set the delay topic on the message so consumers subscribed to the original topic cannot consume it yet;
                //once the delay expires, the original topic is restored and the message becomes consumable
                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

        long elapsedTimeInLock = 0;
        MappedFile unlockMappedFile = null;
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
        try {
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;

            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            msg.setStoreTimestamp(beginLockTimestamp);
            //9. If the current mappedFile is null or full, create a new one
            if (null == mappedFile || mappedFile.isFull()) {
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            }
            //10. If creation failed, return immediately
            if (null == mappedFile) {
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
            }
            //11. Append the message to the mappedFile (not flushed to disk yet)
            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            //...........
        } finally {
            putMessageLock.unlock();
        }
        //...........
        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

        // Statistics
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
        //12. Flush to disk
        CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
        //13. Replicate to the slave (HA)
        CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
        return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
            if (flushStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(flushStatus);
            }
            if (replicaStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(replicaStatus);
                if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {
                    log.error("do sync transfer other node, wait return, but failed, topic: {} tags: {} client address: {}",
                            msg.getTopic(), msg.getTags(), msg.getBornHostNameString());
                }
            }
            return putMessageResult;
        });
    }
}

Note that step 11 only writes the message into the mappedFile buffer; nothing is flushed to disk until step 12.
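To make steps 5-7 concrete, here is how a delayed message would be rewritten, assuming the broker's default messageDelayLevel table (the level table and values below are illustrative defaults, not taken from the code above):

// Default delay levels (1-based): "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
// A message sent with delayTimeLevel = 3 (10s delay) would be rewritten as:
//   topic   : "TopicTest"  ->  TopicValidator.RMQ_SYS_SCHEDULE_TOPIC ("SCHEDULE_TOPIC_XXXX")
//   queueId : 0            ->  2   (delayLevel2QueueId = delayTimeLevel - 1)
// The original topic and queueId are kept in the REAL_TOPIC / REAL_QID properties,
// so they can be restored once the delay expires and the message is re-delivered.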

2.3 MappedFile

MappedFile#appendMessage(msg, this.appendMessageCallback) writes the message into a ByteBuffer. The appendMessageCallback is an inner class of CommitLog (DefaultAppendMessageCallback): MappedFile delegates the actual serialization back to CommitLog through this callback.

public class MappedFile extends ReferenceResource {
    public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) {
        return appendMessagesInner(msg, cb);
    }

    public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
        assert messageExt != null;
        assert cb != null;
        //1. Get the current write position
        int currentPos = this.wrotePosition.get();
        //Only append if the write position is still inside the file; otherwise fall through to the error return
        if (currentPos < this.fileSize) {
            ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
            byteBuffer.position(currentPos);
            AppendMessageResult result;
            //2. Write into the buffer via the append callback
            if (messageExt instanceof MessageExtBrokerInner) {
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
            } else if (messageExt instanceof MessageExtBatch) {
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
            } else {
                return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
            }
            //3. Advance the write position
            this.wrotePosition.addAndGet(result.getWroteBytes());
            this.storeTimestamp = result.getStoreTimestamp();
            return result;
        }
        log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }
}

Step 2 writes the buffer through a callback: CommitLog's inner class DefaultAppendMessageCallback does the actual serialization. The method is fairly long.

class DefaultAppendMessageCallback implements AppendMessageCallback {
    public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner) {
        // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>

        // PHY OFFSET
        long wroteOffset = fileFromOffset + byteBuffer.position();

        int sysflag = msgInner.getSysFlag();

        int bornHostLength = (sysflag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
        int storeHostLength = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
        ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
        ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);

        this.resetByteBuffer(storeHostHolder, storeHostLength);
        String msgId;
        //Generate the message ID
        if ((sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
            msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
        } else {
            msgId = MessageDecoder.createMessageId(this.msgIdV6Memory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
        }

        // Record ConsumeQueue information
        keyBuilder.setLength(0);
        keyBuilder.append(msgInner.getTopic());
        keyBuilder.append('-');
        keyBuilder.append(msgInner.getQueueId());
        String key = keyBuilder.toString();
        //Get the message's logical offset within its consume queue
        Long queueOffset = CommitLog.this.topicQueueTable.get(key);
        if (null == queueOffset) {
            queueOffset = 0L;
            CommitLog.this.topicQueueTable.put(key, queueOffset);
        }

        // Transaction messages that require special handling
        final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
        switch (tranType) {
            // Prepared and Rollback message is not consumed, will not enter the
            // consumer queuec
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                queueOffset = 0L;
                break;
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
            default:
                break;
        }

        /**
         * Serialize message
         */
        //Length of the serialized message properties
        final byte[] propertiesData = msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);

        final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;

        if (propertiesLength > Short.MAX_VALUE) {
            log.warn("putMessage message properties length too long. length={}", propertiesData.length);
            return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
        }
        //Length of the topic
        final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
        final int topicLength = topicData.length;
        //Length of the message body
        final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
        //Total length of the serialized message record
        final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);

        // Exceeds the maximum message
        //The message may not exceed the configured maximum size (4 MB by default)
        if (msgLen > this.maxMessageSize) {
            CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
                    + ", maxMessageSize: " + this.maxMessageSize);
            return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
        }

        // Determines whether there is sufficient free space
        //If there is not enough free space left in this file, write an end-of-file marker so a new CommitLog file will be used
        if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
            this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
            // 1 TOTALSIZE
            this.msgStoreItemMemory.putInt(maxBlank);
            // 2 MAGICCODE
            this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
            // 3 The remaining space may be any value
            // Here the length of the specially set maxBlank
            final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
            byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
            return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
                    queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
        }

        // Initialization of storage space
        this.resetByteBuffer(msgStoreItemMemory, msgLen);
        // 1 TOTALSIZE
        this.msgStoreItemMemory.putInt(msgLen);
        // 2 MAGICCODE
        this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
        // 3 BODYCRC
        this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
        // 4 QUEUEID
        this.msgStoreItemMemory.putInt(msgInner.getQueueId());
        // 5 FLAG
        this.msgStoreItemMemory.putInt(msgInner.getFlag());
        // 6 QUEUEOFFSET
        this.msgStoreItemMemory.putLong(queueOffset);
        // 7 PHYSICALOFFSET
        this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
        // 8 SYSFLAG
        this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
        // 9 BORNTIMESTAMP
        this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
        // 10 BORNHOST
        this.resetByteBuffer(bornHostHolder, bornHostLength);
        this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
        // 11 STORETIMESTAMP
        this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
        // 12 STOREHOSTADDRESS
        this.resetByteBuffer(storeHostHolder, storeHostLength);
        this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
        // 13 RECONSUMETIMES
        this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
        // 14 Prepared Transaction Offset
        this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
        // 15 BODY
        this.msgStoreItemMemory.putInt(bodyLength);
        if (bodyLength > 0)
            this.msgStoreItemMemory.put(msgInner.getBody());
        // 16 TOPIC
        this.msgStoreItemMemory.put((byte) topicLength);
        this.msgStoreItemMemory.put(topicData);
        // 17 PROPERTIES
        this.msgStoreItemMemory.putShort((short) propertiesLength);
        if (propertiesLength > 0) {
            this.msgStoreItemMemory.put(propertiesData);
        }

        //Copy the serialized message into the ByteBuffer and return an AppendMessageResult
        final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
        // Write messages to the queue buffer
        byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);

        AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
                msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);

        switch (tranType) {
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                break;
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                // The next update ConsumeQueue information
                CommitLog.this.topicQueueTable.put(key, ++queueOffset);
                break;
            default:
                break;
        }
        return result;
    }
}
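The calMsgLength call in the middle of doAppend is simply the sum of the 17 fields written above. For reference, a sketch of that helper (it lives in the same CommitLog class; the field sizes are reproduced from memory, so verify them against your RocketMQ version):

// Companion helper from the same CommitLog class (sketch, for reference only).
protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {
    int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
    int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;
    return 4                                                  // 1  TOTALSIZE
         + 4                                                  // 2  MAGICCODE
         + 4                                                  // 3  BODYCRC
         + 4                                                  // 4  QUEUEID
         + 4                                                  // 5  FLAG
         + 8                                                  // 6  QUEUEOFFSET
         + 8                                                  // 7  PHYSICALOFFSET
         + 4                                                  // 8  SYSFLAG
         + 8                                                  // 9  BORNTIMESTAMP
         + bornhostLength                                     // 10 BORNHOST (IPv4: 4+4, IPv6: 16+4)
         + 8                                                  // 11 STORETIMESTAMP
         + storehostAddressLength                             // 12 STOREHOSTADDRESS
         + 4                                                  // 13 RECONSUMETIMES
         + 8                                                  // 14 PREPARED TRANSACTION OFFSET
         + 4 + (bodyLength > 0 ? bodyLength : 0)              // 15 BODY
         + 1 + topicLength                                    // 16 TOPIC
         + 2 + (propertiesLength > 0 ? propertiesLength : 0); // 17 PROPERTIES
}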

Back in CommitLog#asyncPutMessage, once step 11 has written the message into the mappedFile buffer, flushing and master-slave replication follow:

//12. Flush to disk
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
//13. Replicate to the slave (HA)
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);

3 Storage Files

The structure of these storage files is covered below, starting with the CommitLog file in section 3.1.

3.1 CommitLog Files

First, the role of MappedFile, as shown below:

Figure: where MappedFile sits in the storage stack

Next, look at how the files are created in the source, back at step 9 of CommitLog#asyncPutMessage():

//9. If the current mappedFile is null or full, create a new one
if (null == mappedFile || mappedFile.isFull()) {
    mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}

In earlier versions this logic was called protected MappedFile tryCreateMappedFile(long createOffset) {}. Here, let's look at the mappedFile naming rule, i.e. the UtilAll.offset2FileName(createOffset) method:

public static String offset2FileName(final long offset) {
    final NumberFormat nf = NumberFormat.getInstance();
    //file name length: 20 digits
    nf.setMinimumIntegerDigits(20);
    nf.setMaximumFractionDigits(0);
    nf.setGroupingUsed(false);
    return nf.format(offset);
}
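In other words, each CommitLog file is named after the 20-digit, zero-padded physical offset of its first byte. A small illustration, assuming the default 1 GB (1073741824-byte) mappedFileSizeCommitLog:

// File names are the starting physical offset, left-padded with zeros to 20 digits.
UtilAll.offset2FileName(0L);           // "00000000000000000000" -> first CommitLog file
UtilAll.offset2FileName(1073741824L);  // "00000000001073741824" -> second file (default 1 GB file size)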

Inside MappedFileQueue#getLastMappedFile(startOffset, true), the mappedFile is created through AllocateMappedFileService#putRequestAndReturnMappedFile, which is an asynchronous operation: AllocateMappedFileService, like PullMessageService and RebalanceService, extends ServiceThread, and as noted in the previous chapter, ServiceThread extends Thread.

Focus on the following code in AllocateMappedFileService#mmapOperation():

MappedFile mappedFile;
//If transientStorePoolEnable is true (and asynchronous flush is on), messages are first written to off-heap memory and a background thread later commits them to the PageCache
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
    try {
        mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
        //Initialize the file and map it into a MappedByteBuffer, establishing the disk-file/memory mapping
        mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
    } catch (RuntimeException e) {
        log.warn("Use default implementation.");
        mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
    }
}
//If transientStorePoolEnable is false, messages are written directly to the PageCache
else {
    mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
}

When writing a message:

  • If isTransientStorePoolEnable returns true, the message is first written to off-heap memory, and a background thread later commits that data to the PageCache.
  • If it returns false, the message is written directly to the PageCache. Either way, the PageCache is later persisted to disk according to the flush policy, as shown below:
    Figure: message write flow

Either way, the file data is mapped into a MappedByteBuffer. The difference is that when messages are first written to off-heap memory, the MappedByteBuffer is mainly used for reading while the off-heap buffer is used for writing. This gives a degree of read/write separation and reduces write pressure on the PageCache.

Now look at mappedFile.init(). For the two cases above, the first case simply adds one extra step: borrowing an off-heap writeBuffer. (Note: this is where zero-copy comes into play.)

To make CommitLog operations more efficient, RocketMQ uses mmap to map the on-disk log file into user-space memory, avoiding extra copies of log data between disk and user-space memory.

public class MappedFile extends ReferenceResource {
    //Used when transientStorePoolEnable and asynchronous flush are enabled
    public void init(final String fileName, final int fileSize, final TransientStorePool transientStorePool) throws IOException {
        init(fileName, fileSize);
        //Borrow an off-heap writeBuffer from the pool
        this.writeBuffer = transientStorePool.borrowBuffer();
        this.transientStorePool = transientStorePool;
    }

    //Used when transientStorePoolEnable is off or synchronous flush is configured (also called by the overload above)
    private void init(final String fileName, final int fileSize) throws IOException {
        this.fileName = fileName;
        this.fileSize = fileSize;
        this.file = new File(fileName);
        this.fileFromOffset = Long.parseLong(this.file.getName());
        boolean ok = false;

        ensureDirOK(this.file.getParent());

        try {
            this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
            //mmap the file; the actual flush is triggered later from CommitLog#asyncPutMessage(final MessageExtBrokerInner msg)
            this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
            TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
            TOTAL_MAPPED_FILES.incrementAndGet();
            ok = true;
        } catch (FileNotFoundException e) {
            log.error("Failed to create file " + this.fileName, e);
            throw e;
        } catch (IOException e) {
            log.error("Failed to map file " + this.fileName, e);
            throw e;
        } finally {
            if (!ok && this.fileChannel != null) {
                this.fileChannel.close();
            }
        }
    }
}

Remember writeBuffer? See section 2.3.

3.2 ConsumeQueue/Index Files

The consume queue files (ConsumeQueue) and the message index files (IndexFile) are both built from the CommitLog. When a message submitted by a producer is stored in the CommitLog, the ConsumeQueue and IndexFile must be updated promptly; otherwise the message cannot be consumed in time, and looking up messages by their properties would also lag significantly.

RocketMQ starts a ReputMessageService thread that forwards CommitLog update events in near real time; the corresponding dispatch handlers then update the ConsumeQueue and IndexFile accordingly.
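Each ConsumeQueue entry written by this dispatching is a fixed 20-byte record: an 8-byte CommitLog offset, a 4-byte message size and an 8-byte tags hash code (ConsumeQueue.CQ_STORE_UNIT_SIZE). A minimal decoding sketch; the entry buffer is a hypothetical stand-in for a slice of a ConsumeQueue mapped file:

ByteBuffer entry = consumeQueueSlice;   // hypothetical: positioned at one 20-byte store unit
long commitLogOffset = entry.getLong(); // where the full message lives in the CommitLog
int  msgSize         = entry.getInt();  // how many bytes to read from that offset
long tagsCode        = entry.getLong(); // hash of the message tag, used for broker-side filtering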

Figure: RocketMQ dispatcher

When DefaultMessageStore is initialized it starts a ReputMessageService thread. Its logic is an endless loop that runs once every 1 ms, reading newly appended messages from the CommitLog and writing the ConsumeQueue and IndexFile.

Figure: ReputMessageService initialization

It is essentially just another thread:

class ReputMessageService extends ServiceThread{
    @Override
    public void run() {
        DefaultMessageStore.log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                Thread.sleep(1);
                this.doReput();
            } catch (Exception e) {
                DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }

        DefaultMessageStore.log.info(this.getServiceName() + " service end");
    }
}

Call path: ReputMessageService#doReput() -> DefaultMessageStore#doDispatch() -> CommitLogDispatcher#dispatch().

public void doDispatch(DispatchRequest req) {
    for (CommitLogDispatcher dispatcher : this.dispatcherList) {
        dispatcher.dispatch(req);
    }
}

CommitLogDispatcher has two implementations (a sketch of the interface shape follows the list):

  • CommitLogDispatcherBuildConsumeQueue: writes the ConsumeQueue;
  • CommitLogDispatcherBuildIndex: writes the IndexFile.
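Both implement the single-method CommitLogDispatcher interface iterated by doDispatch above. As a sketch of that extension point, a hypothetical dispatcher might look like this (the class name and body are purely illustrative):

class CommitLogDispatcherLogging implements CommitLogDispatcher {
    @Override
    public void dispatch(DispatchRequest request) {
        // DispatchRequest carries the message metadata parsed out of the CommitLog entry.
        System.out.printf("dispatch topic=%s queueId=%d commitLogOffset=%d size=%d%n",
                request.getTopic(), request.getQueueId(),
                request.getCommitLogOffset(), request.getMsgSize());
    }
}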

The dispatcherList is built as follows:

public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
        final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
        //dispatch handlers for ConsumeQueue and Index
        this.dispatcherList = new LinkedList<>();
        this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
        this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
}

As you can see, even if the broker crashes, the ConsumeQueue and Index files can be rebuilt as long as the CommitLog is intact.

The call flow above is illustrated below:

Figure: forwarding CommitLog update events
Figure: how the ConsumeQueue is produced
Figure: how the Index is produced

4 Recovering the Consume Queue and Index Files

RocketMQ first stores the complete message in the CommitLog and only then asynchronously generates dispatch tasks to update the ConsumeQueue and Index files. If a message has been stored in the CommitLog but the broker crashes before the dispatch task runs, the CommitLog, ConsumeQueue and IndexFile become inconsistent. Without a recovery mechanism, some messages would exist in the CommitLog yet, never having been forwarded to the ConsumeQueue, could never be consumed.

Figure: consume queue and index file recovery process

4.1 Loading the Storage Files

DefaultMessageStore#load determines whether the last shutdown was abnormal. The mechanism: the broker creates an abort file on startup and removes it through a JVM shutdown hook on a clean exit. If the abort file still exists at the next startup, the broker exited abnormally, the CommitLog and ConsumeQueue may be inconsistent, and repair is required.
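The check itself is tiny. A sketch of the mechanism behind isTempFileExist, assuming the usual StorePathConfigHelper helper for the abort-file path:

// The broker creates ${storePathRootDir}/abort at startup and deletes it in a JVM
// shutdown hook; if the file is still there on the next start, the last exit was abnormal.
String abortPath = StorePathConfigHelper.getAbortFile(messageStoreConfig.getStorePathRootDir());
boolean lastExitOK = !new File(abortPath).exists();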

Figure: storage file loading

public class DefaultMessageStore implements MessageStore {
    //DefaultMessageStore#load
    public boolean load() {
        boolean result = true;
        try {
            //Check whether the abort (temp) file exists
            boolean lastExitOK = !this.isTempFileExist();
            log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
            //Load the delayed-message (schedule) service data
            if (null != scheduleMessageService) {
                result = result && this.scheduleMessageService.load();
            }

            // Load the CommitLog files
            result = result && this.commitLog.load();

            // 加载 Consume Queue
            result = result && this.loadConsumeQueue();

            if (result) {
                //Load the store checkpoint, which records the flush points of the CommitLog, ConsumeQueue and Index files
                this.storeCheckpoint = new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
                //Load the index files
                this.indexService.load(lastExitOK);
                //Run a different recovery strategy depending on whether the broker exited abnormally
                this.recover(lastExitOK);

                log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
            }
        } catch (Exception e) {
            log.error("load exception", e);
            result = false;
        }

        if (!result) {
            this.allocateMappedFileService.shutdown();
        }

        return result;
    }

    private void recover(final boolean lastExitOK) {
        //Get the maximum physical offset recorded in the ConsumeQueue
        long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
        //If the abort file did not exist, recover normally; otherwise run abnormal recovery
        if (lastExitOK) {
            this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
        } else {
            this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
        }
        //Restore, in the CommitLog, the current logical offset of every consume queue
        this.recoverTopicQueueTable();
    }
}

4.2 Normal Recovery

public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
        boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        if (!mappedFiles.isEmpty()) {
            // Began to recover from the last third file
            //After a clean shutdown, recovery starts from the third-to-last file; if there are fewer than 3 files, it starts from the first
            int index = mappedFiles.size() - 3;
            if (index < 0) {
                index = 0;
            }

            MappedFile mappedFile = mappedFiles.get(index);
            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
            long processOffset = mappedFile.getFileFromOffset();
            //Offset within the current file that has been verified so far
            long mappedFileOffset = 0;
            while (true) {
                DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
                int size = dispatchRequest.getMsgSize();
                // Normal data
                //Success with size > 0 means a valid message; advance mappedFileOffset by its length
                if (dispatchRequest.isSuccess() && size > 0) {
                    mappedFileOffset += size;
                }
                // Come the end of the file, switch to the next file Since the
                // return 0 representatives met last hole,
                // this can not be included in truncate offset
                //Success with size == 0 means the end of this file was reached; if there is a next file, reset processOffset and mappedFileOffset and continue with it, otherwise break out of the loop
                else if (dispatchRequest.isSuccess() && size == 0) {
                    index++;
                    if (index >= mappedFiles.size()) {
                        // Current branch can not happen
                        log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
                        break;
                    } else {
                        mappedFile = mappedFiles.get(index);
                        byteBuffer = mappedFile.sliceByteBuffer();
                        processOffset = mappedFile.getFileFromOffset();
                        mappedFileOffset = 0;
                        log.info("recover next physics file, " + mappedFile.getFileName());
                    }
                }
                // Intermediate file read error
                // Failure means this is where the valid data ends (the file is not fully filled); break out of the loop
                else if (!dispatchRequest.isSuccess()) {
                    log.info("recover physics file end, " + mappedFile.getFileName());
                    break;
                }
            }
            //Update the MappedFileQueue's flushedWhere and committedWhere pointers
            processOffset += mappedFileOffset;
            this.mappedFileQueue.setFlushedWhere(processOffset);
            this.mappedFileQueue.setCommittedWhere(processOffset);
            //Truncate: delete everything beyond processOffset
            this.mappedFileQueue.truncateDirtyFiles(processOffset);

            // Clear ConsumeQueue redundant data
            if (maxPhyOffsetOfConsumeQueue >= processOffset) {
                log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
                this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
            }
        } else {
            // Commitlog case files are deleted
            log.warn("The commitlog files are deleted, and delete the consume queue files");
            this.mappedFileQueue.setFlushedWhere(0);
            this.mappedFileQueue.setCommittedWhere(0);
            this.defaultMessageStore.destroyLogics();
        }
    }

4.3 Abnormal Recovery

@Deprecated
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
    //.................... omitted
    if (!mappedFiles.isEmpty()) {
        // Looking beginning to recover from which file
        int index = mappedFiles.size() - 1;
        MappedFile mappedFile = null;
        for (; index >= 0; index--) {
            mappedFile = mappedFiles.get(index);
            //Check whether this mapped file is a valid starting point for recovery
            if (this.isMappedFileMatchedRecover(mappedFile)) {
                log.info("recover from this mapped file " + mappedFile.getFileName());
                break;
            }
        }
        //If none matched, fall back to the first mapped file
        if (index < 0) {
            index = 0;
            mappedFile = mappedFiles.get(index);
        }
        //.......... verify message validity and re-dispatch messages to the consume queue and index files (omitted)
           
    } else {
        //No mapped files at all: reset flushedWhere and committedWhere to 0 and destroy the consume queue files
        this.mappedFileQueue.setFlushedWhere(0);
        this.mappedFileQueue.setCommittedWhere(0);
        this.defaultMessageStore.destroyLogics();
    }
}

5 Flush Mechanism

RocketMQ's storage is built on the JDK NIO memory-mapping mechanism (MappedByteBuffer): messages are first appended in memory and are then written to disk at different times according to the configured flush policy.

5.1 Synchronous Flush

After the message is appended in memory, the data is flushed to the disk file immediately. The call starts from CommitLog#asyncPutMessage(); the method is shown below, focusing on the synchronous-flush branch:

public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
        // Synchronous flush
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            //Flush service (GroupCommitService)
            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
            if (messageExt.isWaitStoreMsgOK()) {
                //Build the flush request
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(), this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                //Submit the flush request
                service.putRequest(request);
                //Return the future; it completes when the flush finishes
                return request.future();
            } else {
                service.wakeup();
                return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
            }
        }
        // Asynchronous flush
        else {
            //If transientStorePoolEnable is off, messages are appended directly to the memory-mapped file and later flushed to disk
            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                flushCommitLogService.wakeup();
            } else  {
                //If transientStorePoolEnable is on, RocketMQ allocates off-heap memory the same size as the target CommitLog file, locked in memory so it cannot be swapped out; messages are first appended there, then committed into the file's memory mapping, and finally flushed to disk
                commitLogService.wakeup();
            }
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
    }

After the flush request is submitted, GroupCommitService (which also extends ServiceThread) processes it in its run() method; the part we care about is doCommit():

private void doCommit() {
            synchronized (this.requestsRead) {
                if (!this.requestsRead.isEmpty()) {
                    for (GroupCommitRequest req : this.requestsRead) {
                        // There may be a message in the next file, so a maximum of
                        // two times the flush
                        //If the flushed pointer is already at or beyond the requested offset, the flush has succeeded; a message may span two files, so flush at most twice
                        boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                        for (int i = 0; i < 2 && !flushOK; i++) {
                            CommitLog.this.mappedFileQueue.flush(0);
                            flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                        }

                        req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
                    }

                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                    if (storeTimestamp > 0) {
                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                    }

                    this.requestsRead.clear();
                } else {
                    // Because of individual messages is set to not sync flush, it
                    // will come to this process
                    CommitLog.this.mappedFileQueue.flush(0);
                }
            }
        }

The full synchronous flush flow:

Figure: synchronous flush
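Which branch of submitFlushRequest is taken is controlled by the broker's flush settings. A hedged broker.conf fragment (both property names are standard MessageStoreConfig fields; the values shown are examples):

# SYNC_FLUSH: the producer is acknowledged only after GroupCommitService has flushed to disk
# ASYNC_FLUSH (default): acknowledge immediately and let the flush threads catch up later
flushDiskType=SYNC_FLUSH
# Only takes effect with ASYNC_FLUSH on a master: stage writes in off-heap buffers first
transientStorePoolEnable=false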

5.2 Asynchronous Flush

After the message is appended in memory, success is returned to the producer immediately.

  • If transientStorePoolEnable is on, RocketMQ allocates a piece of off-heap memory the same size as the target CommitLog file and locks it in memory so it cannot be swapped out; messages are first appended to this off-heap buffer, then committed into the file's memory mapping, and finally flushed to disk.
  • If transientStorePoolEnable is off, messages are appended directly to the memory-mapped file and then flushed to disk.
    Figure: asynchronous flush

With transientStorePoolEnable enabled, the asynchronous flush proceeds as follows (a conceptual sketch follows the list):

  • The message is appended directly to the ByteBuffer (off-heap memory).
  • Every 200 ms the CommitRealTimeService thread commits the newly appended ByteBuffer content into the file's memory mapping (PageCache).
  • The mapped file takes the committed content and the wrotePosition pointer moves forward.
  • Once the commit returns successfully, the committedPosition is updated accordingly.
  • Every 500 ms (by default) the FlushRealTimeService thread flushes the newly committed content to disk. Code: CommitLog$CommitRealTimeService#run.
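A condensed, purely conceptual sketch of that two-stage pipeline; the intervals are the defaults mentioned above and the loop bodies are simplified from CommitRealTimeService and FlushRealTimeService:

// Stage 1 - CommitRealTimeService (roughly every 200 ms by default):
// copy newly appended bytes from the off-heap writeBuffer into the FileChannel (PageCache).
mappedFileQueue.commit(commitLeastPages);   // advances committedWhere
// Stage 2 - FlushRealTimeService (roughly every 500 ms by default):
// force the dirty PageCache pages onto disk.
mappedFileQueue.flush(flushLeastPages);     // advances flushedWhere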

6 Expired File Deletion

Because RocketMQ operates on the CommitLog and ConsumeQueue files through memory mapping and loads all files under the CommitLog and ConsumeQueue directories at startup, keeping messages on the broker forever would waste memory and disk. A mechanism is therefore needed to delete expired files.

RocketMQ writes the CommitLog and ConsumeQueue files sequentially: all writes land on the last CommitLog or ConsumeQueue file, and earlier files are never updated again once the next file has been created.

RocketMQ's rule for deleting expired files is:

If a file has not been updated within a certain time interval, it is considered expired and can be deleted; RocketMQ does not check whether all messages in that file have been consumed.

By default each file expires after 72 hours; this can be changed by setting fileReservedTime (in hours) in the broker configuration file.
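A hedged broker.conf fragment for the retention settings discussed here; fileReservedTime comes from the text above, while the two companion properties are commonly used defaults that should be verified against your RocketMQ version:

# Keep CommitLog files for 72 hours before they become eligible for deletion
fileReservedTime=72
# Run the scheduled deletion check at 04:00 every day
deleteWhen=04
# Also trigger deletion when the commit log disk partition exceeds this usage percentage
diskMaxUsedSpaceRatio=75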

