【RocketMQ学习】14.源码之延迟消息


1 demo准备

1.1 Producer

/**
 * @author zyx
 * 延时消息-生产者
 */
public class ScheduledMessageProducer {
    public static void main(String[] args) throws Exception {
        // 实例化一个生产者来产生延时消息
        DefaultMQProducer producer = new DefaultMQProducer("ScheduledProducer");
        // 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动Producer实例
        producer.start();
        int totalMessagesToSend = 10;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message("ScheduledTopic", ("Hello scheduled message " + i).getBytes());
            // 设置延时等级3,这个消息将在10s之后投递给消费者(详看delayTimeLevel)
            // delayTimeLevel:(1~18个等级)"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
            message.setDelayTimeLevel(4);
            // 发送消息
            producer.send(message);
        }
        // 关闭生产者
        producer.shutdown();
    }
}

1.2 Consumer

/**
 * @author zyx
 * 延时消息-消费者
 */
public class ScheduledMessageConsumer {
    public static void main(String[] args) throws Exception {
        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ScheduledConsumer");
        // 指定Namesrv地址信息.
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅Topics
        consumer.subscribe("ScheduledTopic", "*");
        // 注册消息监听者
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    // Print approximate delay time period
                    System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
                            + (message.getStoreTimestamp() - message.getBornTimestamp()) + "ms later");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者
        consumer.start();
    }
}

2 源码追踪

Apache RocketMQ 一共支持18个等级的延迟投递,具体时间如下:

投递等级(delay level)延迟时间投递等级(delay level)延迟时间
11s106min
25s117min
310s128min
430s139min
51min1410min
62min1520min
73min1630min
84min171h
95min182h

对于生产者而言,延迟消息与普通消息没有多大差异,就按照普通消息发送的。只是把消息对象Message的setDelayTimeLevel(int level)方法设置一个延迟等级,这样该条消息就是一个延迟消息了。

2.1 Broker保存

通过前面的学习,我们也大概了解了消息发送的过程,这里就会省略一些步骤了。

Broker处理生产者发送过来的消息,请求代号为:SEND_MESSAGE_V2,其对应的处理器为:SendMessageProcessor。依次通过processRequest()asyncProcessRequest()asyncSendMessage()等方法的处理,然后调用DefaultMessageStore#asyncPutMessage()进行消息的落地。

处理过程与普通消息大同小异,就是一些小的方面做了些处理:

//CommitLog
public class CommitLog {
    public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
        // 1.记录消息存储时间
        msg.setStoreTimestamp(System.currentTimeMillis());
        // 2.Set the message body BODY CRC (consider the most appropriate setting on the client)
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        // Back to Results
        AppendMessageResult result = null;
        //3.获取消息存储服务
        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

        String topic = msg.getTopic();
        int queueId = msg.getQueueId();

        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // 4.延迟消息
            if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }
                //5.将延迟消息的topic替换为broker固定的topic: SCHEDULE_TOPIC_XXXX
                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                //6.将queueid替换为(延迟级别-1),每个延迟级别都对应一条队列
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                //7.备份原始的topic/queueid, 留着后面解析
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
                //8.将消息topic设置为延迟topic,这样订阅该topic的消费者不能及时去消费了
                //等到延迟时间到了,将延迟topic再替换成原始topic,消费者就可以消费了
                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }
        //..........省略
    }
}

总结下该方法处理的过程:

  • 将原始topic替换为延迟消息固定的topic:SCHEDULE_TOPIC_XXXX,所有的延时消息共用这一个topic;
  • 将原始queueid替换为:延迟级别 - 1,相同延迟级别的消息会在同一个队列中;

    这样做最大的好处就是避免了排序,举个简单的例子:上午10:00,broker收到了一条延迟消息1,延迟级别为5;然后在10:02又收到了一条延迟消息2,延迟级别也为5,由于延迟级别相同,他们会存储在同一条队列中;
    由于队列天生有序,入队时间先按送达broker的时间先后进行排序,而同一队列上延迟时间也相同,因此延迟消息1一定会在延迟消息2前进行消消费,后面如果有消息再进入该队列中,也会按照先进先出的方式进行消费。

  • 备份原始topic/queueid, 保存到原始消息的properties属性至propertiesString。

这样就处理完了一条延迟消息,然后就是存储消息,和普通一样,这里就不展示了。

不过在消息分发(构建消息索引)时,将索引单元的tag hashcode替换为消息的投递时间,处理方法为CommitLog#checkMessageAndReturnSize(),该方法在目前我们熟知的有两个地方(DLedger的后面另说)有调用:

  • 消息恢复,重建索引;
  • 定时任务,更新ConsumeQueue、IndexFile,即ReputMessageService
    CommitLog#checkMessageAndReturnSize()主要看以下的代码:
    //CommitLog#checkMessageAndReturnSize()
    // Timing message processing
                    {
                        String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
                        if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic) && t != null) {
                            int delayLevel = Integer.parseInt(t);
    
                            if (delayLevel > this.defaultMessageStore.getMaxDelayLevel()) {
                                delayLevel = this.defaultMessageStore.getMaxDelayLevel();
                            }
    
                            if (delayLevel > 0) {
                            //如果是延迟消息,构建tagcode=消息写入时间 + 延迟级别对应的时间
                                tagsCode = this.defaultMessageStore.computeDeliverTimestamp(delayLevel, storeTimestamp);
                            }
                        }
                    }
    ConsumeQueue条目变化

2.2 投递与持久化

既然消息已经更改了topic,那消费者肯定是拉取不到的。只有到了时间,Broker才会将该消息还原为原来的topic,让消费者能够拉取到并消费掉。

处理这个过程的类为:ScheduleMessageService。其会在BrokerController执行initialize()时,创建DefaultMessageStore实例时初始化一个实例。其本身继承自ConfigManager,我们可以看看其start()干了啥。

public class ScheduleMessageService extends ConfigManager {
    /**
     * 延迟消息服务的启动方式
     */
    public void start() {
        // CAS 锁机制保证必须 shutdown 后才能再次start
        if (started.compareAndSet(false, true)) {
            this.timer = new Timer("ScheduleMessageTimerThread", true);
            for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
                Integer level = entry.getKey();
                Long timeDelay = entry.getValue();
                Long offset = this.offsetTable.get(level);
                if (null == offset) {
                    offset = 0L;
                }
                //1.定时处理延迟消息
                if (timeDelay != null) {
                    //每一个延迟级别对应一个DeliverDelayedMessageTimerTask
                    this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
                }
            }
            // 2.每隔10s,将延迟消息的相关信息持久化到硬盘中
            this.timer.scheduleAtFixedRate(new TimerTask() {
                @Override
                public void run() {
                    try {
                        if (started.get()) {
                            ScheduleMessageService.this.persist();
                        }
                    } catch (Throwable e) {
                        log.error("scheduleAtFixedRate flush exception", e);
                    }
                }
            }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
        }
    }
}

主要做了两件事:

  • 遍历所有的延迟级别,为每个延迟级别在延迟FIRST_DELAY_TIME毫秒后就处理延迟消息的投递操作;
  • 开启执久化定时任务:定时将延迟消息的相关信息持久化到硬盘中.

2.2.1 消息投递

投递主要看DeliverDelayedMessageTimerTask,其实质是个线程类,我们看看其run()方法:

//DeliverDelayedMessageTimerTask
@Override
        public void run() {
            try {
                if (isStarted()) {
                    //重点
                    this.executeOnTimeup();
                }
            } catch (Exception e) {
                // 10s后,重新加入任务队列
                log.error("ScheduleMessageService, executeOnTimeup exception", e);
                ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                    this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
            }
        }

关键看executeOnTimeup()方法:

public void executeOnTimeup() {
            //根据延迟topic和延迟queueid 去获取Consumequeue
            ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(
                    TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel)
            );
            long failScheduleOffset = offset;

            if (cq != null) {
                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
                if (bufferCQ != null) {
                    try {
                        long nextOffset = offset;
                        int i = 0;
                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            long offsetPy = bufferCQ.getByteBuffer().getLong();
                            int sizePy = bufferCQ.getByteBuffer().getInt();
                            long tagsCode = bufferCQ.getByteBuffer().getLong();

                            if (cq.isExtAddr(tagsCode)) {
                                if (cq.getExt(tagsCode, cqExtUnit)) {
                                    tagsCode = cqExtUnit.getTagsCode();
                                } else {
                                    //can't find ext content.So re compute tags code.
                                    log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                                        tagsCode, offsetPy, sizePy);
                                    // 1. 消息的写入的时间
                                    long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                                    // 2. 计算投递时间,投递时间 = 消息写入时间 + 延迟级别对应的时间
                                    tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                                }
                            }
                            // 处理投递时间,保证投递时间必须小于(当前时间 + 延迟级别对应的时间)
                            long now = System.currentTimeMillis();
                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

                            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                            long countdown = deliverTimestamp - now;
                            // 小于等于0,表示消费需要投递
                            if (countdown <= 0) {
                                //去broker中将消息读取出来
                                MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);

                                if (msgExt != null) {
                                    try {
                                        //构建新的消息体,将原来的消息信息设置到这里,并将topic和queueid还原为原始的topic和queueid(前面备份过)
                                        MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                        if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                                            log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}", msgInner.getTopic(), msgInner);
                                            continue;
                                        }
                                        //投递,将消息再次写入commitlog中,topic是原始topic,这样消费者就可以去消费了
                                        PutMessageResult putMessageResult = ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);

                                        if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                            continue;
                                        } else {
                                            // XXX: warn and notify me
                                            log.error(
                                                "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                                                msgExt.getTopic(), msgExt.getMsgId());
                                            ScheduleMessageService.this.timer.schedule(
                                                new DeliverDelayedMessageTimerTask(this.delayLevel,
                                                    nextOffset), DELAY_FOR_A_PERIOD);
                                            ScheduleMessageService.this.updateOffset(this.delayLevel,
                                                nextOffset);
                                            return;
                                        }
                                    } catch (Exception e) {
                                        /*
                                         * XXX: warn and notify me
                                         */
                                        log.error(
                                            "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                                                + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
                                                + offsetPy + ",sizePy=" + sizePy, e);
                                    }
                                }
                            } else {
                                ScheduleMessageService.this.timer.schedule(
                                    new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                                    countdown);
                                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                                return;
                            }
                        } // end of for

                        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                            this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    } finally {

                        bufferCQ.release();
                    }
                } // end of if (bufferCQ != null)
                else {

                    long cqMinOffset = cq.getMinOffsetInQueue();
                    if (offset < cqMinOffset) {
                        failScheduleOffset = cqMinOffset;
                        log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                            + cqMinOffset + ", queueId=" + cq.getQueueId());
                    }
                }
            } // end of if (cq != null)

            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                failScheduleOffset), DELAY_FOR_A_WHILE);
        }

总结下这个方法的作用及过程:

  • 根据延迟topic和延迟queueid获取consumequeue,并从队列中读取索引单元;
  • 计算消息的投递时间。从索引单元中取出消息的保存时间(延迟消息的索引单元会将tag hashcode替换为消息的存储时间),然后根据延迟等级获取出延迟时间,然后二者相加就是消息的投递时间,即:投递时间 = 消息写入时间 + 延迟级别对应的时间
  • 如果投递时间到了,
    • 则根据索引单元中的commitlog offsetmsg size将该条消息A从commitlog中读取出来;
    • 将读取出来的消息属性复制到一个新的消息B中,将消息A中备份的原始topic, queueid读取出来重新设置到消息B中,并清除延迟属性,使其成为一条普通消息;
    • 调用CommitLog#putMessage(msg)方法,再次将消息B写入到commitlog中。这样消费者就可以消费到订阅了该topic的消息。
  • 如果投递时间没到
    • 计算剩余投递时间countdown(投递时间-当前时间),然后开启一个JDK的Timer延迟任务,延迟时间就是countdown,继续执行DeliverDelayedMessageTimerTask的逻辑;
  • 更新延迟消息队列的消费进度(后面持久化也就是指的它)。
延迟队列概览

2.2.2 消息持久化

回到ScheduleMessageService#start(),关键的第2步就是持久化任务。

//ScheduleMessageService#start()
            // 2.每隔10s,将延迟消息的相关信息持久化到硬盘中
            this.timer.scheduleAtFixedRate(new TimerTask() {
                @Override
                public void run() {
                    try {
                        if (started.get()) {
                            ScheduleMessageService.this.persist();
                        }
                    } catch (Throwable e) {
                        log.error("scheduleAtFixedRate flush exception", e);
                    }
                }
            }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());

该任务会定期执行ConfigManager#persist方法进行持久化操作,即定期将延迟队列的消费位置偏移量持久化到文件中

public abstract class ConfigManager {
  public synchronized void persist() {
    String jsonString = this.encode(true);
    if (jsonString != null) {
      String fileName = this.configFilePath();
      try {
        //将json字符串写入到文件中
        MixAll.string2File(jsonString, fileName);
      } catch (IOException e) {
        log.error("persist file " + fileName + " exception", e);
      }
    }
  }
}

3 总结

  • 发送消息时,通过setDelayTimeLevel(int level)来设置延迟等级,RocketMQ默认支持18种延迟等级,每个延迟等级对应不同的延迟时间;
  • 所有延迟消息共用一个topic: SCHEDULE_TOPIC_XXXX
  • 相同延迟等级的消息会放到同一个队列中(queueid=delayLevel - 1);
  • 相同等级的消息会被同一个线程去处理。

文章作者: Kezade
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Kezade !
评论
  目录