1 demo准备
1.1 Producer
/**
* @author zyx
* 延时消息-生产者
*/
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
// 实例化一个生产者来产生延时消息
DefaultMQProducer producer = new DefaultMQProducer("ScheduledProducer");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
int totalMessagesToSend = 10;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("ScheduledTopic", ("Hello scheduled message " + i).getBytes());
// 设置延时等级3,这个消息将在10s之后投递给消费者(详看delayTimeLevel)
// delayTimeLevel:(1~18个等级)"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
message.setDelayTimeLevel(4);
// 发送消息
producer.send(message);
}
// 关闭生产者
producer.shutdown();
}
}
1.2 Consumer
/**
* @author zyx
* 延时消息-消费者
*/
public class ScheduledMessageConsumer {
public static void main(String[] args) throws Exception {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ScheduledConsumer");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topics
consumer.subscribe("ScheduledTopic", "*");
// 注册消息监听者
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
// Print approximate delay time period
System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
+ (message.getStoreTimestamp() - message.getBornTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
}
}
2 源码追踪
Apache RocketMQ 一共支持18个等级
的延迟投递,具体时间如下:
投递等级(delay level) | 延迟时间 | 投递等级(delay level) | 延迟时间 |
---|---|---|---|
1 | 1s | 10 | 6min |
2 | 5s | 11 | 7min |
3 | 10s | 12 | 8min |
4 | 30s | 13 | 9min |
5 | 1min | 14 | 10min |
6 | 2min | 15 | 20min |
7 | 3min | 16 | 30min |
8 | 4min | 17 | 1h |
9 | 5min | 18 | 2h |
对于生产者而言,延迟消息与普通消息没有多大差异,就按照普通消息发送的。只是把消息对象Message的setDelayTimeLevel(int level)
方法设置一个延迟等级,这样该条消息就是一个延迟消息了。
2.1 Broker保存
通过前面的学习,我们也大概了解了消息发送的过程,这里就会省略一些步骤了。
Broker处理生产者发送过来的消息,请求代号为:SEND_MESSAGE_V2
,其对应的处理器为:SendMessageProcessor
。依次通过processRequest()
、asyncProcessRequest()
、asyncSendMessage()
等方法的处理,然后调用DefaultMessageStore#asyncPutMessage()
进行消息的落地。
处理过程与普通消息大同小异,就是一些小的方面做了些处理:
//CommitLog
public class CommitLog {
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// 1.记录消息存储时间
msg.setStoreTimestamp(System.currentTimeMillis());
// 2.Set the message body BODY CRC (consider the most appropriate setting on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;
//3.获取消息存储服务
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
String topic = msg.getTopic();
int queueId = msg.getQueueId();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// 4.延迟消息
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
//5.将延迟消息的topic替换为broker固定的topic: SCHEDULE_TOPIC_XXXX
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
//6.将queueid替换为(延迟级别-1),每个延迟级别都对应一条队列
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
//7.备份原始的topic/queueid, 留着后面解析
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
//8.将消息topic设置为延迟topic,这样订阅该topic的消费者不能及时去消费了
//等到延迟时间到了,将延迟topic再替换成原始topic,消费者就可以消费了
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
//..........省略
}
}
总结下该方法处理的过程:
- 将原始topic替换为延迟消息固定的topic:
SCHEDULE_TOPIC_XXXX
,所有的延时消息共用这一个topic; - 将原始queueid替换为:
延迟级别 - 1
,相同延迟级别的消息会在同一个队列中;这样做最大的好处就是避免了排序,举个简单的例子:上午10:00,broker收到了一条
延迟消息1
,延迟级别为5;然后在10:02又收到了一条延迟消息2
,延迟级别也为5,由于延迟级别相同,他们会存储在同一条队列中;
由于队列天生有序,入队时间先按送达broker的时间先后进行排序,而同一队列上延迟时间也相同,因此延迟消息1
一定会在延迟消息2
前进行消消费,后面如果有消息再进入该队列中,也会按照先进先出
的方式进行消费。 - 备份原始topic/queueid, 保存到原始消息的properties属性至propertiesString。
这样就处理完了一条延迟消息,然后就是存储消息,和普通一样,这里就不展示了。
不过在消息分发(构建消息索引)时,将索引单元的tag hashcode
替换为消息的投递时间
,处理方法为CommitLog#checkMessageAndReturnSize()
,该方法在目前我们熟知的有两个地方(DLedger的后面另说)有调用:
- 消息恢复,重建索引;
- 定时任务,更新ConsumeQueue、IndexFile,即
ReputMessageService
。CommitLog#checkMessageAndReturnSize()
主要看以下的代码://CommitLog#checkMessageAndReturnSize() // Timing message processing { String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL); if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic) && t != null) { int delayLevel = Integer.parseInt(t); if (delayLevel > this.defaultMessageStore.getMaxDelayLevel()) { delayLevel = this.defaultMessageStore.getMaxDelayLevel(); } if (delayLevel > 0) { //如果是延迟消息,构建tagcode=消息写入时间 + 延迟级别对应的时间 tagsCode = this.defaultMessageStore.computeDeliverTimestamp(delayLevel, storeTimestamp); } } }
2.2 投递与持久化
既然消息已经更改了topic,那消费者肯定是拉取不到的。只有到了时间,Broker才会将该消息还原为原来的topic,让消费者能够拉取到并消费掉。
处理这个过程的类为:ScheduleMessageService
。其会在BrokerController执行initialize()
时,创建DefaultMessageStore
实例时初始化一个实例。其本身继承自ConfigManager
,我们可以看看其start()
干了啥。
public class ScheduleMessageService extends ConfigManager {
/**
* 延迟消息服务的启动方式
*/
public void start() {
// CAS 锁机制保证必须 shutdown 后才能再次start
if (started.compareAndSet(false, true)) {
this.timer = new Timer("ScheduleMessageTimerThread", true);
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
//1.定时处理延迟消息
if (timeDelay != null) {
//每一个延迟级别对应一个DeliverDelayedMessageTimerTask
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
// 2.每隔10s,将延迟消息的相关信息持久化到硬盘中
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
if (started.get()) {
ScheduleMessageService.this.persist();
}
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
}
}
主要做了两件事:
- 遍历所有的延迟级别,为每个延迟级别在延迟
FIRST_DELAY_TIME
毫秒后就处理延迟消息的投递操作
; - 开启执久化定时任务:定时将延迟消息的相关信息
持久化
到硬盘中.
2.2.1 消息投递
投递主要看DeliverDelayedMessageTimerTask
,其实质是个线程类,我们看看其run()
方法:
//DeliverDelayedMessageTimerTask
@Override
public void run() {
try {
if (isStarted()) {
//重点
this.executeOnTimeup();
}
} catch (Exception e) {
// 10s后,重新加入任务队列
log.error("ScheduleMessageService, executeOnTimeup exception", e);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
}
}
关键看executeOnTimeup()
方法:
public void executeOnTimeup() {
//根据延迟topic和延迟queueid 去获取Consumequeue
ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(
TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel)
);
long failScheduleOffset = offset;
if (cq != null) {
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
long nextOffset = offset;
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();
if (cq.isExtAddr(tagsCode)) {
if (cq.getExt(tagsCode, cqExtUnit)) {
tagsCode = cqExtUnit.getTagsCode();
} else {
//can't find ext content.So re compute tags code.
log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
tagsCode, offsetPy, sizePy);
// 1. 消息的写入的时间
long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
// 2. 计算投递时间,投递时间 = 消息写入时间 + 延迟级别对应的时间
tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
}
}
// 处理投递时间,保证投递时间必须小于(当前时间 + 延迟级别对应的时间)
long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long countdown = deliverTimestamp - now;
// 小于等于0,表示消费需要投递
if (countdown <= 0) {
//去broker中将消息读取出来
MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
if (msgExt != null) {
try {
//构建新的消息体,将原来的消息信息设置到这里,并将topic和queueid还原为原始的topic和queueid(前面备份过)
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}", msgInner.getTopic(), msgInner);
continue;
}
//投递,将消息再次写入commitlog中,topic是原始topic,这样消费者就可以去消费了
PutMessageResult putMessageResult = ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);
if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
continue;
} else {
// XXX: warn and notify me
log.error(
"ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
msgExt.getTopic(), msgExt.getMsgId());
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel,
nextOffset), DELAY_FOR_A_PERIOD);
ScheduleMessageService.this.updateOffset(this.delayLevel,
nextOffset);
return;
}
} catch (Exception e) {
/*
* XXX: warn and notify me
*/
log.error(
"ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
+ msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
+ offsetPy + ",sizePy=" + sizePy, e);
}
}
} else {
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
countdown);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
} // end of for
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally {
bufferCQ.release();
}
} // end of if (bufferCQ != null)
else {
long cqMinOffset = cq.getMinOffsetInQueue();
if (offset < cqMinOffset) {
failScheduleOffset = cqMinOffset;
log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
+ cqMinOffset + ", queueId=" + cq.getQueueId());
}
}
} // end of if (cq != null)
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
}
总结下这个方法的作用及过程:
- 根据延迟topic和延迟queueid获取consumequeue,并从队列中读取索引单元;
- 计算消息的投递时间。从索引单元中取出消息的保存时间(延迟消息的索引单元会将
tag hashcode
替换为消息的存储时间),然后根据延迟等级获取出延迟时间,然后二者相加就是消息的投递时间,即:投递时间 = 消息写入时间 + 延迟级别对应的时间
; - 如果投递时间到了,
- 则根据索引单元中的
commitlog offset
和msg size
将该条消息A
从commitlog中读取出来; - 将读取出来的消息属性复制到一个新的
消息B
中,将消息A
中备份的原始topic
,queueid
读取出来重新设置到消息B
中,并清除延迟属性,使其成为一条普通消息; - 调用
CommitLog#putMessage(msg)
方法,再次将消息B
写入到commitlog中。这样消费者就可以消费到订阅了该topic的消息。
- 则根据索引单元中的
- 如果投递时间没到
- 计算剩余投递时间
countdown
(投递时间-当前时间),然后开启一个JDK的Timer延迟任务,延迟时间就是countdown,继续执行DeliverDelayedMessageTimerTask
的逻辑;
- 计算剩余投递时间
- 更新延迟消息队列的消费进度(后面持久化也就是指的它)。
2.2.2 消息持久化
回到ScheduleMessageService#start()
,关键的第2步就是持久化任务。
//ScheduleMessageService#start()
// 2.每隔10s,将延迟消息的相关信息持久化到硬盘中
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
if (started.get()) {
ScheduleMessageService.this.persist();
}
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
该任务会定期执行ConfigManager#persist
方法进行持久化操作,即定期将延迟队列的消费位置偏移量持久化到文件中:
public abstract class ConfigManager {
public synchronized void persist() {
String jsonString = this.encode(true);
if (jsonString != null) {
String fileName = this.configFilePath();
try {
//将json字符串写入到文件中
MixAll.string2File(jsonString, fileName);
} catch (IOException e) {
log.error("persist file " + fileName + " exception", e);
}
}
}
}
3 总结
- 发送消息时,通过
setDelayTimeLevel(int level)
来设置延迟等级,RocketMQ默认支持18种延迟等级
,每个延迟等级对应不同的延迟时间; - 所有延迟消息共用一个topic:
SCHEDULE_TOPIC_XXXX
; - 相同延迟等级的消息会放到同一个队列中(queueid=delayLevel - 1);
- 相同等级的消息会被同一个线程去处理。