1 整体流程
2 核心类
2.1 DefaultMessageStore
部分属性:
private final MessageStoreConfig messageStoreConfig; //消息配置属性
private final CommitLog commitLog; //CommitLog文件存储的实现类
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable; //消息队列存储缓存表,按照消息主题分组
private final FlushConsumeQueueService flushConsumeQueueService; //消息队列文件刷盘线程
private final CleanCommitLogService cleanCommitLogService; //清除CommitLog文件服务
private final CleanConsumeQueueService cleanConsumeQueueService; //清除ConsumeQueue队列文件服务
private final IndexService indexService; //索引实现类
private final AllocateMappedFileService allocateMappedFileService; //MappedFile分配服务
private final ReputMessageService reputMessageService;//CommitLog消息分发,根据CommitLog文件构建ConsumeQueue、IndexFile文件
private final HAService haService; //存储HA机制
private final ScheduleMessageService scheduleMessageService; //消息服务调度线程
private final StoreStatsService storeStatsService; //消息存储统计服务
private final TransientStorePool transientStorePool; //消息堆外内存缓存
private final BrokerStatsManager brokerStatsManager; //Broker状态管理器
private final MessageArrivingListener messageArrivingListener; //消息拉取长轮询模式下的消息到达监听器
private final BrokerConfig brokerConfig; //Broker配置类
private StoreCheckpoint storeCheckpoint; //文件刷盘监测点
private final LinkedList<CommitLogDispatcher> dispatcherList; //CommitLog文件转发请求
以流程图为准,以普通消息为例,从 DefaultMessageStore#asyncPutMessage 出发进行源码追踪。
public class DefaultMessageStore implements MessageStore {
@Override
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
//1.检查Store状态
PutMessageStatus checkStoreStatus = this.checkStoreStatus();
if (checkStoreStatus != PutMessageStatus.PUT_OK) {
return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
}
//2.检查消息本体
PutMessageStatus msgCheckStatus = this.checkMessage(msg);
if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
}
long beginTime = this.getSystemClock().now();
//3.将消息写入CommitLog文件
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
putResultFuture.thenAccept((result) -> {
long elapsedTime = this.getSystemClock().now() - beginTime;
if (elapsedTime > 500) {
log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
}
//4.putMessage过程耗时记录
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
}
});
return putResultFuture;
}
/** 检查Store状态 */
private PutMessageStatus checkStoreStatus() {
//1.是否关机
if (this.shutdown) {
log.warn("message store has shutdown, so putMessage is forbidden");
return PutMessageStatus.SERVICE_NOT_AVAILABLE;
}
//2.判断Broker角色,如果是从节点,则无需写入
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("broke role is slave, so putMessage is forbidden");
}
return PutMessageStatus.SERVICE_NOT_AVAILABLE;
}
//3.判断当前存储是否可写,如果不可写则不能继续
if (!this.runningFlags.isWriteable()) {
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("the message store is not writable. It may be caused by one of the following reasons: " +
"the broker's disk is full, write to logic queue error, write to index file error, etc");
}
return PutMessageStatus.SERVICE_NOT_AVAILABLE;
} else {
this.printTimes.set(0);
}
//4.判断操作系统PageCache是否繁忙
if (this.isOSPageCacheBusy()) {
return PutMessageStatus.OS_PAGECACHE_BUSY;
}
return PutMessageStatus.PUT_OK;
}
/** 检查消息本体 */
private PutMessageStatus checkMessage(MessageExtBrokerInner msg) {
//1.判断消息主题长度是否超过最大限制
if (msg.getTopic().length() > Byte.MAX_VALUE) {
log.warn("putMessage message topic length too long " + msg.getTopic().length());
return PutMessageStatus.MESSAGE_ILLEGAL;
}
//2.判断消息属性长度是否超过限制
if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
return PutMessageStatus.MESSAGE_ILLEGAL;
}
return PutMessageStatus.PUT_OK;
}
}
其中,checkStoreStatus 检查Store状态,checkMessage 检查消息本体。检查通过后,调用 CommitLog#asyncPutMessage 进行消息落地。
2.2 CommitLog
public class CommitLog {
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// 1.记录消息存储时间
msg.setStoreTimestamp(System.currentTimeMillis());
// 2.Set the message body BODY CRC (consider the most appropriate setting on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;
//3.获取消息存储服务
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
String topic = msg.getTopic();
int queueId = msg.getQueueId();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// 4.延迟消息
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
//5.将延迟消息的topic替换为broker固定的topic: SCHEDULE_TOPIC_XXXX
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
//6.将queueid替换为(延迟级别-1),每个延迟级别都对应一条队列
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
//7.备份原始的topic/queueid, 留着后面解析
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
//8.将消息topic替换为延迟topic,这样订阅原topic的消费者暂时无法消费
//等延迟时间到达后,再把延迟topic替换回原始topic,消费者就可以消费了
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
long elapsedTimeInLock = 0;
MappedFile unlockMappedFile = null;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
msg.setStoreTimestamp(beginLockTimestamp);
//9.判断mappedFile,如果为空或者已满,创建新的mappedFile文件
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
//10.如果创建失败,直接返回
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
}
//11.写入消息到mappedFile中(还未刷盘)
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
//...........
} finally {
putMessageLock.unlock();
}
//...........
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
//12.flush,刷盘
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
//13.主从同步
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
if (flushStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(flushStatus);
}
if (replicaStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(replicaStatus);
if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {
log.error("do sync transfer other node, wait return, but failed, topic: {} tags: {} client address: {}",
msg.getTopic(), msg.getTags(), msg.getBornHostNameString());
}
}
return putMessageResult;
});
}
}
其中,第11步写入mappedFile时,只是写缓冲区,并未刷盘,刷盘要等到第12步进行。
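另外,第6步中延迟级别到队列ID的映射很简单:默认配置下共18个延迟级别(messageDelayLevel 默认为 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h),队列ID即延迟级别减一。下面是一个等价的小示例(演示性质,非源码本身):
// 延迟级别 -> SCHEDULE_TOPIC_XXXX中的队列ID,与ScheduleMessageService.delayLevel2QueueId等价
public static int delayLevel2QueueId(final int delayTimeLevel) {
    return delayTimeLevel - 1; // 延迟级别从1开始计数,队列ID从0开始
}
// 例如:delayTimeLevel=3(默认对应10s)的消息会被写入queueId=2的延迟队列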
2.3 MappedFile
MappedFile#appendMessage(msg, this.appendMessageCallback) 开始写入缓冲区 ByteBuffer。其中 appendMessageCallback 是 CommitLog 的内部类 DefaultAppendMessageCallback 的实例,MappedFile 在写缓冲区时会回调它完成消息的序列化与写入,结果再返回给 CommitLog#asyncPutMessage。
public class MappedFile extends ReferenceResource {
public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) {
return appendMessagesInner(msg, cb);
}
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
assert messageExt != null;
assert cb != null;
//1.获取写指针
int currentPos = this.wrotePosition.get();
//写指针小于文件大小才能写入,否则说明文件已写满,返回错误
if (currentPos < this.fileSize) {
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
byteBuffer.position(currentPos);
AppendMessageResult result;
//2.写缓冲,通过回调方法写入
if (messageExt instanceof MessageExtBrokerInner) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
} else if (messageExt instanceof MessageExtBatch) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
//3.更新写指针
this.wrotePosition.addAndGet(result.getWroteBytes());
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
}
第2步写缓冲采用了回调,调用 CommitLog 的内部类 DefaultAppendMessageCallback#doAppend 进行写入,方法有点长。
class DefaultAppendMessageCallback implements AppendMessageCallback {
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner) {
// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
// PHY OFFSET
long wroteOffset = fileFromOffset + byteBuffer.position();
int sysflag = msgInner.getSysFlag();
int bornHostLength = (sysflag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
int storeHostLength = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
this.resetByteBuffer(storeHostHolder, storeHostLength);
String msgId;
//生成消息ID
if ((sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
} else {
msgId = MessageDecoder.createMessageId(this.msgIdV6Memory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
}
// Record ConsumeQueue information
keyBuilder.setLength(0);
keyBuilder.append(msgInner.getTopic());
keyBuilder.append('-');
keyBuilder.append(msgInner.getQueueId());
String key = keyBuilder.toString();
//获得该消息在消息队列中的偏移量
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
if (null == queueOffset) {
queueOffset = 0L;
CommitLog.this.topicQueueTable.put(key, queueOffset);
}
// Transaction messages that require special handling
final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
switch (tranType) {
// Prepared and Rollback message is not consumed, will not enter the
// consumer queuec
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
queueOffset = 0L;
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
default:
break;
}
/**
* Serialize message
*/
//获得消息属性长度
final byte[] propertiesData = msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
if (propertiesLength > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long. length={}", propertiesData.length);
return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
}
//获得消息主题大小
final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
final int topicLength = topicData.length;
//获得消息体大小
final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
//计算消息总长度
final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);
// Exceeds the maximum message
//消息长度不能超过4M
if (msgLen > this.maxMessageSize) {
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
+ ", maxMessageSize: " + this.maxMessageSize);
return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
}
// Determines whether there is sufficient free space
//消息如果没有足够的存储空间则新创建CommitLog文件
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(maxBlank);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
// 3 The remaining space may be any value
// Here the length of the specially set maxBlank
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
// Initialization of storage space
this.resetByteBuffer(msgStoreItemMemory, msgLen);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(msgLen);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC
this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
// 4 QUEUEID
this.msgStoreItemMemory.putInt(msgInner.getQueueId());
// 5 FLAG
this.msgStoreItemMemory.putInt(msgInner.getFlag());
// 6 QUEUEOFFSET
this.msgStoreItemMemory.putLong(queueOffset);
// 7 PHYSICALOFFSET
this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
// 8 SYSFLAG
this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
// 9 BORNTIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
// 10 BORNHOST
this.resetByteBuffer(bornHostHolder, bornHostLength);
this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
// 11 STORETIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
// 12 STOREHOSTADDRESS
this.resetByteBuffer(storeHostHolder, storeHostLength);
this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
// 13 RECONSUMETIMES
this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
// 14 Prepared Transaction Offset
this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
// 15 BODY
this.msgStoreItemMemory.putInt(bodyLength);
if (bodyLength > 0)
this.msgStoreItemMemory.put(msgInner.getBody());
// 16 TOPIC
this.msgStoreItemMemory.put((byte) topicLength);
this.msgStoreItemMemory.put(topicData);
// 17 PROPERTIES
this.msgStoreItemMemory.putShort((short) propertiesLength);
if (propertiesLength > 0) {
this.msgStoreItemMemory.put(propertiesData);
}
//将消息存储到ByteBuffer中,返回AppendMessageResult
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
// Write messages to the queue buffer
byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
switch (tranType) {
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// The next update ConsumeQueue information
CommitLog.this.topicQueueTable.put(key, ++queueOffset);
break;
default:
break;
}
return result;
}
}
回到 CommitLog#asyncPutMessage 的第11步,写入 mappedFile 缓冲区完成后,再进行刷盘和主从同步。
//12.flush,刷盘
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
//13.主从同步
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
3 存储文件
存储文件结构见下文各小节。
3.1 CommitLog文件
首先说说MappedFile的作用,如下图:
再从创建文件看看源码,定位到CommitLog#asyncPutMessage()
第9~11步:
//9.判断mappedFile,如果为空或者已满,创建新的mappedFile文件
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
这段创建逻辑在老版本中叫 protected MappedFile tryCreateMappedFile(long createOffset) {}。这里我们看看 mappedFile 的命名规则,见 UtilAll.offset2FileName(createOffset) 方法:
public static String offset2FileName(final long offset) {
final NumberFormat nf = NumberFormat.getInstance();
//文件名长度:20
nf.setMinimumIntegerDigits(20);
nf.setMaximumFractionDigits(0);
nf.setGroupingUsed(false);
return nf.format(offset);
}
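也就是说,文件名是20位、不足补零的物理偏移量(CommitLog 单个文件默认1G)。下面的小示例展示命名效果:
// 文件名即该文件中第一条消息的全局物理偏移量,固定20位,不足左补0
System.out.println(UtilAll.offset2FileName(0L));          // 00000000000000000000
System.out.println(UtilAll.offset2FileName(1073741824L)); // 00000000001073741824,即第二个CommitLog文件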
在 MappedFileQueue#getLastMappedFile(startOffset, true) 中,通过 AllocateMappedFileService#putRequestAndReturnMappedFile 创建 mappedFile,这是一个异步操作。因为 AllocateMappedFileService 和 PullMessageService、RebalanceService 一样,继承自 ServiceThread,而在上一章节中我们说过 ServiceThread 继承自 Thread。
关注 AllocateMappedFileService#mmapOperation() 中如下代码即可:
MappedFile mappedFile;
//TransientStorePoolEnable为true并且开启异步刷盘,则消息数据先写入堆外内存,然后异步线程把堆外内存数据刷到 PageCache
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
try {
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
//写入MappedByteBuffer缓冲区,建立磁盘文件-内存映射关系
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
} catch (RuntimeException e) {
log.warn("Use default implementation.");
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
}
}
//TransientStorePoolEnable为false,则直接写入 PageCache
else {
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
}
写入消息时:
- 如果 isTransientStorePoolEnable 方法返回 true,则消息数据先写入堆外内存,然后由异步线程把堆外内存数据刷到 PageCache;
- 如果返回 false,则直接写入 PageCache。
之后根据刷盘策略把 PageCache 中的数据持久化到磁盘。如下图:
但无论是先写入堆外内存还是直接写 PageCache,文件数据都会映射到 MappedByteBuffer。不同的是,如果消息先写入堆外内存,则 MappedByteBuffer 主要用来读消息,堆外内存用来写消息。这在一定程度上实现了读写分离,减少了 PageCache 的写入压力。
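为了直观理解堆外内存池的作用,下面给出一个极简的思路示意(仅为示意,真实的 TransientStorePool 还会通过 mlock 锁定内存、预热页面等,这里的类名与参数均为假设):
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
// 极简示意:预先分配若干块堆外内存,写消息时借出,文件写满归还复用
public class SimpleTransientStorePool {
    private final Deque<ByteBuffer> availableBuffers = new ArrayDeque<>();
    public SimpleTransientStorePool(int poolSize, int fileSize) {
        for (int i = 0; i < poolSize; i++) {
            // 堆外内存,大小与CommitLog文件一致
            availableBuffers.offer(ByteBuffer.allocateDirect(fileSize));
        }
    }
    /** 借出一块堆外内存给MappedFile作为writeBuffer */
    public ByteBuffer borrowBuffer() {
        return availableBuffers.pollFirst();
    }
    /** MappedFile写满后归还,供后续文件复用 */
    public void returnBuffer(ByteBuffer buffer) {
        buffer.position(0);
        buffer.limit(buffer.capacity());
        availableBuffers.offerFirst(buffer);
    }
}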
再来看看 mappedFile.init() 方法。对于上述两种情况,其实第一种只是多走了一步写堆外内存(PS:这里有零拷贝出现,赶紧标记下)。
为了让 CommitLog 操作效率更高,RocketMQ 使用了 mmap 将磁盘上的日志文件映射到用户态的内存地址中,减少日志文件从磁盘到用户态内存之间的数据拷贝。
public class MappedFile extends ReferenceResource {
//开启transientStorePoolEnable和异步刷盘,走这里
public void init(final String fileName, final int fileSize, final TransientStorePool transientStorePool) throws IOException {
init(fileName, fileSize);
//初始化writeBuffer
this.writeBuffer = transientStorePool.borrowBuffer();
this.transientStorePool = transientStorePool;
}
//未开启transientStorePoolEnable或者同步刷盘,走这里
private void init(final String fileName, final int fileSize) throws IOException {
this.fileName = fileName;
this.fileSize = fileSize;
this.file = new File(fileName);
this.fileFromOffset = Long.parseLong(this.file.getName());
boolean ok = false;
ensureDirOK(this.file.getParent());
try {
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
//MMAP,刷盘会在CommitLog#asyncPutMessage(final MessageExtBrokerInner msg)方法中落实
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
TOTAL_MAPPED_FILES.incrementAndGet();
ok = true;
} catch (FileNotFoundException e) {
log.error("Failed to create file " + this.fileName, e);
throw e;
} catch (IOException e) {
log.error("Failed to map file " + this.fileName, e);
throw e;
} finally {
if (!ok && this.fileChannel != null) {
this.fileChannel.close();
}
}
}
}
writeBuffer 还有印象吗?看看2.3节。
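关于 mmap,可以用下面这个独立的小例子感受一下 MappedByteBuffer 的写入与落盘(与 RocketMQ 无关的演示,文件路径为示例值):
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
public class MmapDemo {
    public static void main(String[] args) throws Exception {
        int fileSize = 1024;
        try (RandomAccessFile raf = new RandomAccessFile("/tmp/mmap-demo", "rw");
             FileChannel channel = raf.getChannel()) {
            // 建立磁盘文件与内存的映射关系,之后对buffer的写入即写入PageCache
            MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
            buffer.put("hello rocketmq".getBytes(StandardCharsets.UTF_8));
            // force()相当于刷盘,把PageCache中的脏页写回磁盘
            buffer.force();
        }
    }
}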
3.2 ConsumeQueue/Index文件
消息消费队列文件(ConsumeQueue)、消息属性索引文件(IndexFile)都是基于 CommitLog 文件构建的。当消息生产者提交的消息存储到 CommitLog 文件中后,ConsumeQueue、IndexFile 需要及时更新,否则消息无法及时被消费,根据消息属性查找消息也会出现较大延迟。
RocketMQ 通过开启一个线程 ReputMessageService 来准实时转发 CommitLog 文件更新事件,相应的任务处理器根据转发的消息及时更新 ConsumeQueue、IndexFile 文件。
在 DefaultMessageStore 初始化的时候会启动一个线程 ReputMessageService,这个线程在死循环中每隔 1ms 执行一次,从 CommitLog 中获取消息然后写入 ConsumeQueue 和 IndexFile 文件。
其本质上也是一个线程:
class ReputMessageService extends ServiceThread{
@Override
public void run() {
DefaultMessageStore.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
Thread.sleep(1);
this.doReput();
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
DefaultMessageStore.log.info(this.getServiceName() + " service end");
}
}
调用路径:ReputMessageService#doReput() -> DefaultMessageStore.this.doDispatch() -> CommitLogDispatcher#dispatch()。
public void doDispatch(DispatchRequest req) {
for (CommitLogDispatcher dispatcher : this.dispatcherList) {
dispatcher.dispatch(req);
}
}
而 CommitLogDispatcher 有两个实现类:
- CommitLogDispatcherBuildConsumeQueue:写 ConsumeQueue;
- CommitLogDispatcherBuildIndex:写 Index。
其中,dispatcherList 如下构建:
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
//ConsumeQueue、Index分发处理器
this.dispatcherList = new LinkedList<>();
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
}
可以看到,即使 Broker 挂了,只要 CommitLog 在,就可以重新构建出 ConsumeQueue 和 Index 文件。
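之所以可以重建,是因为 ConsumeQueue 本身不存消息内容,每个条目固定 20 字节,只记录消息在 CommitLog 中的位置信息。下面用一小段代码示意一个条目的构造(示意性质,commitLogOffset、msgSize、tagsCode 为假设变量,非源码):
// ConsumeQueue条目固定20字节:commitLogOffset(8) + size(4) + tagsCode(8)
ByteBuffer cqUnit = ByteBuffer.allocate(20);
cqUnit.putLong(commitLogOffset); // 消息在CommitLog中的物理偏移量
cqUnit.putInt(msgSize);          // 消息长度
cqUnit.putLong(tagsCode);        // 消息tag的hashcode,供消费端做tag过滤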
上述调用流程图如下:
4 消息队列和索引文件恢复
由于 RocketMQ 存储首先将消息全量存储在 CommitLog 文件中,然后异步生成转发任务更新 ConsumeQueue 和 Index 文件。如果消息成功存储到 CommitLog 文件中,但转发任务未成功执行,此时消息服务器 Broker 由于某个原因宕机,就会导致 CommitLog、ConsumeQueue、IndexFile 文件数据不一致。如果不加以人工修复,会有一部分消息即便在 CommitLog 文件中存在,但由于没有转发到 ConsumeQueue,将永远无法被消费者消费。
4.1 存储文件加载
DefaultMessageStore#load 判断上一次是否异常退出。实现机制是 Broker 在启动时创建 abort 文件,在退出时通过 JVM 钩子函数删除 abort 文件。如果下次启动时存在 abort 文件,说明 Broker 是异常退出的,CommitLog 与 ConsumeQueue 数据有可能不一致,需要进行修复。
public class DefaultMessageStore implements MessageStore {
//DefaultMessageStore#load
public boolean load() {
boolean result = true;
try {
//判断临时文件是否存在
boolean lastExitOK = !this.isTempFileExist();
log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
//加载延时队列
if (null != scheduleMessageService) {
result = result && this.scheduleMessageService.load();
}
// 加载CommitLog文件
result = result && this.commitLog.load();
// 加载 Consume Queue
result = result && this.loadConsumeQueue();
if (result) {
//加载存储监测点,监测点主要记录CommitLog文件、ConsumerQueue文件、Index索引文件的刷盘点
this.storeCheckpoint = new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
//加载index文件
this.indexService.load(lastExitOK);
//根据Broker是否异常退出,执行不同的恢复策略
this.recover(lastExitOK);
log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
}
} catch (Exception e) {
log.error("load exception", e);
result = false;
}
if (!result) {
this.allocateMappedFileService.shutdown();
}
return result;
}
private void recover(final boolean lastExitOK) {
//获得ConsumeQueue最大的物理偏移
long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
//abort文件不存在(上次正常退出),则正常恢复;否则异常恢复
if (lastExitOK) {
this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
} else {
this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
}
//在CommitLog中保存每个消息消费队列当前的存储逻辑偏移量
this.recoverTopicQueueTable();
}
}
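abort 文件机制本身很简单,下面是一个等价思路的简化示意(路径为假设值,仅说明“启动时创建、正常退出时由 JVM 钩子删除”这一机制):
import java.io.File;
import java.io.IOException;
public class AbortFileDemo {
    public static void main(String[] args) throws IOException {
        File abortFile = new File("/tmp/store/abort"); // 示例路径
        abortFile.getParentFile().mkdirs();
        // 启动时创建abort文件
        abortFile.createNewFile();
        // 正常退出时通过JVM钩子删除;如果进程被kill -9或宕机,文件会残留,
        // 下次启动检测到abort文件即可判定为异常退出
        Runtime.getRuntime().addShutdownHook(new Thread(abortFile::delete));
    }
}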
4.2 正常恢复
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
// Began to recover from the last third file
//Broker正常停止再重启时,从倒数第三个开始恢复,如果不足3个文件,则从第一个文件开始恢复。
int index = mappedFiles.size() - 3;
if (index < 0) {
index = 0;
}
MappedFile mappedFile = mappedFiles.get(index);
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
long processOffset = mappedFile.getFileFromOffset();
//代表当前已校验通过的offset
long mappedFileOffset = 0;
while (true) {
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
int size = dispatchRequest.getMsgSize();
// Normal data
//查找结果为true,并且消息长度大于0,表示消息正确. mappedFileOffset向前移动本消息长度
if (dispatchRequest.isSuccess() && size > 0) {
mappedFileOffset += size;
}
// Come the end of the file, switch to the next file Since the
// return 0 representatives met last hole,
// this can not be included in truncate offset
//如果查找结果为true且消息长度等于0,表示已到该文件末尾,如果还有下一个文件,则重置processOffset和MappedFileOffset重复查找下一个文件,否则跳出循环。
else if (dispatchRequest.isSuccess() && size == 0) {
index++;
if (index >= mappedFiles.size()) {
// Current branch can not happen
log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next physics file, " + mappedFile.getFileName());
}
}
// Intermediate file read error
// 查找结果为false,表明该文件未填满所有消息,跳出循环,结束遍历
else if (!dispatchRequest.isSuccess()) {
log.info("recover physics file end, " + mappedFile.getFileName());
break;
}
}
//更新MappedFileQueue的flushedWhere和committedWhere指针
processOffset += mappedFileOffset;
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
//删除offset之后的所有文件
this.mappedFileQueue.truncateDirtyFiles(processOffset);
// Clear ConsumeQueue redundant data
if (maxPhyOffsetOfConsumeQueue >= processOffset) {
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}
} else {
// Commitlog case files are deleted
log.warn("The commitlog files are deleted, and delete the consume queue files");
this.mappedFileQueue.setFlushedWhere(0);
this.mappedFileQueue.setCommittedWhere(0);
this.defaultMessageStore.destroyLogics();
}
}
4.3 异常恢复
@Deprecated
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
//....................省略
if (!mappedFiles.isEmpty()) {
// Looking beginning to recover from which file
int index = mappedFiles.size() - 1;
MappedFile mappedFile = null;
for (; index >= 0; index--) {
mappedFile = mappedFiles.get(index);
//判断消息文件是否是一个正确的文件
if (this.isMappedFileMatchedRecover(mappedFile)) {
log.info("recover from this mapped file " + mappedFile.getFileName());
break;
}
}
//根据索引取出mappedFile文件
if (index < 0) {
index = 0;
mappedFile = mappedFiles.get(index);
}
//..........验证消息的合法性,并将消息转发到消息消费队列和索引文件
} else {
//未找到mappedFile,重置flushedWhere、committedWhere为0,销毁消息队列文件
this.mappedFileQueue.setFlushedWhere(0);
this.mappedFileQueue.setCommittedWhere(0);
this.defaultMessageStore.destroyLogics();
}
}
5 刷盘机制
RocketMQ的存储是基于JDK NIO
的内存映射机制
(MappedByteBuffer)的,消息存储首先将消息追加到内存,再根据配置的刷盘策略在不同时间进行刷写磁盘。
5.1 同步刷盘
消息追加到内存后,立即将数据刷写到磁盘文件。调用从 CommitLog#asyncPutMessage() 开始,调用方法如下,看同步刷盘部分:
public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
// Synchronization flush,同步刷盘
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
//刷盘服务
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {
//封装刷盘请求
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(), this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
//提交刷盘请求
service.putRequest(request);
//异步返回
return request.future();
} else {
service.wakeup();
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
// Asynchronous flush,异步刷盘
else {
//如果未开启transientStorePoolEnable,消息直接追加到物理文件直接映射文件中,然后刷写到磁盘中
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
//如果开启transientStorePoolEnable,RocketMQ会单独申请一个与目标物理文件(commitLog)同样大小的堆外内存,该堆外内存将使用内存锁定,确保不会被置换到虚拟内存中去,消息首先追加到堆外内存,然后提交到物理文件的内存映射中,然后刷写到磁盘。
commitLogService.wakeup();
}
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
提交刷盘请求后,由于 GroupCommitService 也继承自 ServiceThread,因而执行刷盘请求要看 run() 方法,这里我们关注 doCommit():
private void doCommit() {
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) {
for (GroupCommitRequest req : this.requestsRead) {
// There may be a message in the next file, so a maximum of
// two times the flush
//已刷盘指针大于等于本次请求的待刷盘指针,则证明刷盘成功。消息可能跨文件存储,所以最多刷盘两次
boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
for (int i = 0; i < 2 && !flushOK; i++) {
CommitLog.this.mappedFileQueue.flush(0);
flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
}
req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
this.requestsRead.clear();
} else {
// Because of individual messages is set to not sync flush, it
// will come to this process
CommitLog.this.mappedFileQueue.flush(0);
}
}
}
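GroupCommitService 采用读写两个请求队列交换的方式,避免刷盘线程与提交请求的生产者互相阻塞:putRequest 只往写队列放请求,刷盘线程每次被唤醒后先交换读写队列,再遍历读队列执行刷盘。下面是这一模式的简化示意(类名与字段为假设,省略了等待/唤醒细节):
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
// 简化示意:同步刷盘请求的读写队列交换模式
class SimpleGroupCommitService {
    private List<CompletableFuture<Boolean>> requestsWrite = new LinkedList<>();
    private List<CompletableFuture<Boolean>> requestsRead = new LinkedList<>();
    /** 生产者线程提交刷盘请求,只操作写队列 */
    public synchronized CompletableFuture<Boolean> putRequest() {
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        requestsWrite.add(future);
        return future;
    }
    /** 刷盘线程每次醒来先交换读写队列,之后遍历读队列,不与putRequest竞争 */
    private synchronized void swapRequests() {
        List<CompletableFuture<Boolean>> tmp = requestsWrite;
        requestsWrite = requestsRead;
        requestsRead = tmp;
    }
    /** 刷盘线程主逻辑:交换队列 -> 执行刷盘 -> 逐个唤醒等待者 */
    public void doCommit() {
        swapRequests();
        boolean flushOK = true; // 这里省略真正的mappedFileQueue.flush(0)
        for (CompletableFuture<Boolean> req : requestsRead) {
            req.complete(flushOK);
        }
        requestsRead.clear();
    }
}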
整个同步刷盘流程如下:
5.2 异步刷盘
在消息追加到内存后,立即返回给消息发送端。
- 如果开启 transientStorePoolEnable,RocketMQ 会单独申请一个与目标物理文件(CommitLog)同样大小的堆外内存,该堆外内存将使用内存锁定,确保不会被置换到虚拟内存中去,消息首先追加到堆外内存,然后提交到物理文件的内存映射中,再刷写到磁盘。
- 如果未开启 transientStorePoolEnable,消息直接追加到物理文件的内存映射文件中,然后刷写到磁盘。
开启 transientStorePoolEnable 后异步刷盘步骤:
- 将消息直接追加到 ByteBuffer(堆外内存);
- CommitRealTimeService 线程每隔 200ms 将 ByteBuffer 中新追加的内容提交到 MappedByteBuffer 中;
- MappedByteBuffer 在内存中追加提交的内容,wrotePosition 指针向后移动;
- commit 操作成功返回,更新 committedPosition 位置;
- FlushRealTimeService 线程默认每 500ms 将 MappedByteBuffer 中新追加的内容刷写到磁盘。
代码:CommitLog$CommitRealTimeService#run,下面给出一个简化示意。
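该节奏可以用下面的伪示例理解,“先 commit、后 flush”由两个后台线程分别负责(间隔、mappedFileQueue 引用均为示意,真实逻辑见 CommitRealTimeService 与 FlushRealTimeService):
// 简化示意:mappedFileQueue指CommitLog持有的MappedFileQueue(此处为假设引用)
Runnable commitTask = () -> {          // 对应CommitRealTimeService
    while (!Thread.currentThread().isInterrupted()) {
        try { Thread.sleep(200); } catch (InterruptedException e) { break; }
        // 把writeBuffer(堆外内存)中新写入的数据提交到fileChannel,至少凑够4页(16KB)
        mappedFileQueue.commit(4);
    }
};
Runnable flushTask = () -> {           // 对应FlushRealTimeService
    while (!Thread.currentThread().isInterrupted()) {
        try { Thread.sleep(500); } catch (InterruptedException e) { break; }
        // 把已提交到PageCache的数据刷写到磁盘,至少凑够4页
        mappedFileQueue.flush(4);
    }
};
new Thread(commitTask, "CommitRealTimeService-demo").start();
new Thread(flushTask, "FlushRealTimeService-demo").start();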
6 过期文件删除机制
由于 RocketMQ 操作 CommitLog、ConsumeQueue 文件是基于内存映射机制,并且在启动的时候会加载 CommitLog、ConsumeQueue 目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久存储在消息服务器上,所以要引入一种机制来删除已过期的文件。
RocketMQ 顺序写 CommitLog、ConsumeQueue 文件,所有写操作全部落在最后一个 CommitLog 或 ConsumeQueue 文件上,之前的文件在下一个文件创建后将不会再被更新。
RocketMQ 清除过期文件的方法是:如果非当前写文件在一定时间间隔内没有再次被更新,则认为是过期文件,可以被删除,RocketMQ 不会关注这个文件上的消息是否全部被消费。
默认每个文件的过期时间为72小时,可以通过在 Broker 配置文件中设置 fileReservedTime 来改变过期时间,单位为小时。
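过期判断本质上就是“文件最后更新时间 + 保留时长”是否早于当前时间,下面是一个等价思路的简化示意(方法与参数为假设):
import java.io.File;
// 简化示意:判断一个CommitLog文件是否过期(默认保留72小时,可由fileReservedTime配置)
public static boolean isExpired(File commitLogFile, long fileReservedHours) {
    long liveMaxTimestamp = commitLogFile.lastModified() + fileReservedHours * 60 * 60 * 1000L;
    return System.currentTimeMillis() >= liveMaxTimestamp;
}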