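1 Preface
RocketMQ supports message filtering and message retry. Filtering comes in two forms:
- TAG filtering: a tag works like a sub-topic. Messages of the same topic are filtered once more by tag and dispatched to the matching consumers.
- SQL filtering: supports SQL92-style expressions that filter on the property values set by the producer. The expression can use the built-in TAGS property as well as user properties added by the producer, such as the property a in the sample later on. For the syntax, see section 3.1.2 of the basics article.
When consumption fails on the consumer side and the message is not acknowledged, the message goes to the retry queue by default. The name of the retry queue is tied to the consumer group rather than the topic, because one topic can be consumed by several groups.
This chapter looks at the principles and the source code behind both features in more detail.
First, some notes on setting up a local debugging environment. I fell into a few pits here, so it is worth writing down. Many of the so-called tutorials online are of little use; treat them as reference only.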
1.1 Getting the source
Any released version can be downloaded. The RocketMQ version I use is 4.8.0.
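1.2 Environment variable
Even without source-level debugging, running a packaged build locally requires ROCKETMQ_HOME to be configured. I therefore set it as an environment variable; it can also be set in the run configurations of NameServer and Broker, as shown in the figure below: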
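1.3 Required directories
In the RocketMQ home directory, that is, the directory ROCKETMQ_HOME points to, create three folders: conf, logs, and store.
My ROCKETMQ_HOME is /Users/zyxelva/Documents/rocketmq-all-4.5.0-bin-release. This is not a version conflict; it is only a directory name.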
1.4 Configuration files
Copy broker.conf, logback_broker.xml, and logback_namesrv.xml from the RocketMQ distribution directory into the conf directory.
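1.5 Configuration changes
In logback_namesrv.xml and logback_broker.xml, replace every occurrence of ${user.home} with the directory configured in the environment variable. Note that all occurrences must be replaced.
Then edit broker.conf as follows. The changes are mainly the storage directories, plus enabling SQL filtering: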
brokerClusterName = DefaultCluster
brokerName = broker-zyxelva
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# NameServer address; separate multiple addresses with semicolons
namesrvAddr=172.104.126.76:9876
# Enable SQL (property) filtering
enablePropertyFilter=true
# Storage root path
storePathRootDir=/Users/zyxelva/Documents/rocketmq-all-4.5.0-bin-release/store
# CommitLog storage path
storePathCommitLog=/Users/zyxelva/Documents/rocketmq-all-4.5.0-bin-release/store/commitlog
# ConsumeQueue storage path
storePathConsumeQueue=/Users/zyxelva/Documents/rocketmq-all-4.5.0-bin-release/store/consumequeue
# Message index storage path
storePathIndex=/Users/zyxelva/Documents/rocketmq-all-4.5.0-bin-release/store/index
# checkpoint file path
storeCheckpoint=/Users/zyxelva/Documents/rocketmq-all-4.5.0-bin-release/store/checkpoint
# abort file path
abortFile=/Users/zyxelva/Documents/rocketmq-all-4.5.0-bin-release/store/abort
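1.6 Starting NameServer and Broker
NameServer can be started by simply running its main(). For Broker, be sure to pass -c with the path of broker.conf. Otherwise you end up in the same pit I did: the broker always starts with the default configuration. See the figure in section 1.2.
The following lines in the IDEA console indicate that NameServer started successfully: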
Connected to the target VM, address: '127.0.0.1:55891', transport: 'socket'
The Name Server boot success. serializeType=JSON
Likewise, once Broker has started it prints the following. If the NameServer address appears in the output, the start has most likely succeeded; if not, check broker.log under logs:
Connected to the target VM, address: '127.0.0.1:53061', transport: 'socket'
The broker[broker-zyxelva, 172.104.126.76:10911] boot success. serializeType=JSON and name server is 172.104.126.76:9876
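2 Message filtering
RocketMQ supports the two kinds of message filtering described above, and the source package ships samples for both. The sections below start from those samples and trace into the source.
On the producer side nothing special happens for filtering: the producer sends the message as an ordinary message.
2.1 TAG filtering
2.1.1 Producer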
/**
* @author zyxelva
* TAG filtering: producer
*/
public class TagFilterProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("TagFilterProducer");
// Set the NameServer address.
producer.setNamesrvAddr("106.55.246.66:9876");
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC"};
for (int i = 0; i < 60; i++) {
Message msg = new Message("TagFilterTest",
tags[i % tags.length],
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
2.1.2 Consumer
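The consumer below passes consumer.subscribe() a tag expression in which several tags are joined with ||, so it subscribes to the topic TagFilterTest and receives messages carrying any of the listed tags. Tag matching is case-sensitive, so every tag in the expression has to be spelled exactly as the producer sets it (TagA, TagB, TagC in section 2.1.1). In production, the RocketMQ best practices suggest that one application can use a single Topic and distinguish its different business flows by TAG.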
/**
* @author zyxelva
* TAG filtering: consumer
*/
public class TagFilterConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException, IOException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagFilterConsumer");
// Set the NameServer address.
consumer.setNamesrvAddr("106.55.246.66:9876");
// Tags to filter on, separated by ||. Tags are case-sensitive and must match the producer's spelling.
consumer.subscribe("TagFilterTest", "TagA || TagB || TagC");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String msgPro = msg.getProperty("a");
String tags = msg.getTags();
System.out.println("Received message: topic: " + topic + ", tags: " + tags + ", a: " + msgPro + ", msg: " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
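Why the spelling of each tag matters becomes clearer when looking at how a tag expression is turned into a set of tags and a set of hash codes. The following is a minimal plain-Java sketch of that idea, modeled on what I understand the client to do when it builds the subscription data. The class TagExpressionSketch and its methods are hypothetical names for illustration, not RocketMQ API.

```java
import java.util.LinkedHashSet;
import java.util.Set;

final class TagExpressionSketch {
    /** Splits an expression such as "TagA || TagB" into trimmed tags; "*" or empty means "all". */
    static Set<String> parseTags(String expression) {
        Set<String> tags = new LinkedHashSet<>();
        if (expression == null || expression.isEmpty() || "*".equals(expression)) {
            return tags; // an empty set stands for "subscribe to everything"
        }
        for (String tag : expression.split("\\|\\|")) {
            String trimmed = tag.trim();
            if (!trimmed.isEmpty()) {
                tags.add(trimmed);
            }
        }
        return tags;
    }

    /** Hash codes of the tags; a broker-side check would compare against these. */
    static Set<Integer> toCodeSet(Set<String> tags) {
        Set<Integer> codes = new LinkedHashSet<>();
        for (String tag : tags) {
            codes.add(tag.hashCode());
        }
        return codes;
    }

    public static void main(String[] args) {
        Set<String> tags = parseTags("TagA || TAGB || TAGC");
        System.out.println(tags);
        // "TagB" sent by the producer is neither in the tag set nor in the code set
        System.out.println(tags.contains("TagB"));
        System.out.println(toCodeSet(tags).contains("TagB".hashCode()));
    }
}
```

With the expression "TagA || TAGB || TAGC", only TagA lines up with what the producer in 2.1.1 sends, because both the string comparison and the hash code comparison are case-sensitive.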
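2.2 SQL filtering
To use SQL filtering, set enablePropertyFilter=true in the broker configuration; it takes effect only after a restart.
In addition, user property names must not clash with the built-in ones. See the static initializer that populates MessageConst.STRING_HASH_SET.
2.2.1 Producer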
/**
* @author zyxelva
* SQL filtering: producer (adds a message property)
*/
public class SqlFilterProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("SqlFilterProducer");
// Set the NameServer address.
producer.setNamesrvAddr("106.55.246.66:9876");
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC"};
for (int i = 0; i < 10; i++) {
Message msg = new Message("SqlFilterTest",
tags[i % tags.length],
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// Set a user property.
// Besides the tag, msg.putUserProperty("a", String.valueOf(i)) attaches a custom property to the message.
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
2.2.2 Consumer
/**
* @author zyxelva
* SQL filtering: consumer
*/
public class SqlFilterConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SqlFilterConsumer");
// Set the NameServer address.
consumer.setNamesrvAddr("106.55.246.66:9876");
// Don't forget to set enablePropertyFilter=true in broker
// SQL filter
consumer.subscribe("SqlFilterTest",
MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
"and (a is not null and a between 0 and 3)"));
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
try {
for (MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String msgPro = msg.getProperty("a");
String tags = msg.getTags();
System.out.println("Received message: topic: " + topic + ", tags: " + tags + ", a: " + msgPro + ", msg: " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
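As the code above shows, the consumer specifies the filter rule and thereby tells the broker which messages it can accept; the broker then returns only the matching messages.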
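To make the effect of the sample expression concrete, here is a small plain-Java sketch that replays the ten messages built by SqlFilterProducer and applies the same condition by hand. It only mirrors the logic of the expression; the real evaluation happens inside the broker. SqlFilterSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SqlFilterSketch {
    static final String[] TAGS = {"TagA", "TagB", "TagC"};

    /** Hand-written equivalent of: TAGS in ('TagA','TagB') and a between 0 and 3. */
    static boolean matches(String tag, String a) {
        if (tag == null || a == null) {
            return false; // the "is not null" parts of the expression
        }
        boolean tagOk = Arrays.asList("TagA", "TagB").contains(tag);
        int value = Integer.parseInt(a);
        return tagOk && value >= 0 && value <= 3; // BETWEEN is inclusive on both ends
    }

    /** Replays the producer loop (i = 0..9) and collects the values of a that pass. */
    static List<Integer> acceptedValues() {
        List<Integer> accepted = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            if (matches(TAGS[i % TAGS.length], String.valueOf(i))) {
                accepted.add(i);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(acceptedValues()); // prints [0, 1, 3]
    }
}
```

The message with a = 2 is dropped because it carries TagC, and everything from a = 4 upward falls outside the range, so only three of the ten messages should reach the consumer.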
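2.3 Source tracing
How a producer sends messages is not repeated here; see the article 【RocketMQ学习】7-源码之Producer.
The focus here is on how the consumer side and the Broker handle filtering.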
2.3.1 Consumer
2.3.1.1 Subscription
A Consumer reports its subscriptions to the Broker from two places:
- consumer.subscribe(): the call path is DefaultMQPushConsumer#subscribe(), DefaultMQPushConsumerImpl#subscribe(), MQClientInstance#sendHeartbeatToAllBroker().
- The heartbeat scheduled task started by consumer.start(): the call path is DefaultMQPushConsumer#start(), DefaultMQPushConsumerImpl#start(), MQClientInstance#start(), MQClientInstance#startScheduledTask().
//MQClientInstance
private void startScheduledTask() {
    // Scheduled task 2: send heartbeats to the brokers periodically, every 30s
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.cleanOfflineBroker();
                // Send the heartbeat
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
            }
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
}
In both cases the subscription is ultimately reported by sending a heartbeat to the broker, in MQClientInstance#sendHeartbeatToAllBroker().
//MQClientInstance
public void sendHeartbeatToAllBrokerWithLock() {
if (this.lockHeartbeat.tryLock()) {
try {
// Send the heartbeat
this.sendHeartbeatToAllBroker();
// Deprecated
this.uploadFilterClassSource();
} catch (final Exception e) {
log.error("sendHeartbeatToAllBroker exception", e);
} finally {
this.lockHeartbeat.unlock();
}
} else {
log.warn("lock heartBeat, but failed. [{}]", this.clientId);
}
}
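The pattern used here, a fixed-rate task guarded by tryLock so that overlapping heartbeats are skipped rather than queued, can be tried out in isolation. The sketch below uses only JDK classes; HeartbeatSketch, sentCount, and the counter that stands in for the network call are hypothetical and not part of RocketMQ.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

final class HeartbeatSketch {
    private final ReentrantLock lockHeartbeat = new ReentrantLock();
    private final AtomicInteger sent = new AtomicInteger();

    /** Sends one heartbeat unless another thread is already sending; returns whether it was sent. */
    boolean sendHeartbeatWithLock() {
        if (!lockHeartbeat.tryLock()) {
            return false; // another thread holds the lock: skip instead of blocking
        }
        try {
            sent.incrementAndGet(); // stand-in for the real network call
            return true;
        } finally {
            lockHeartbeat.unlock();
        }
    }

    int sentCount() {
        return sent.get();
    }

    /** Schedules the heartbeat: first run after 1s, then every intervalMillis. */
    ScheduledExecutorService start(long intervalMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(this::sendHeartbeatWithLock, 1000, intervalMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

Because subscribe() and the scheduled task can both trigger a heartbeat, a non-blocking tryLock is a reasonable choice: a skipped heartbeat is simply covered by the next one.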
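The heartbeat flow is shown in the figure below:
2.3.1.2 Pulling
Section 2.1 (Pulling) of the Consumer source article explained that pullMessageService is the thread service that pulls messages from the broker.
That is left aside for now. First look at how the Broker handles messages; once that has been traced, we return to the filtering flow on the consumer side.
2.3.2 Broker
2.3.2.1 Receiving and storing messages
Go straight to the place where the message is sent, DefaultMQProducerImpl#sendKernelImpl():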
//DefaultMQProducerImpl
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
SendMessageContext context = null;
if (brokerAddr != null) {
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
byte[] prevBody = msg.getBody();
try {
// omitted...
// Build the request header, which carries the properties used for filtering
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setFlag(msg.getFlag());
// Set the message properties (the tag is among them)
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
SendResult sendResult = null;
switch (communicationMode) {
// omitted...
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout - costTimeSync,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}
return sendResult;
} finally {
msg.setBody(prevBody);
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
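The details of sending are not listed here. In the end the request goes out through NettyRemotingClient and is received by the Broker with request code SEND_MESSAGE. The processor on the Broker side is SendMessageProcessor. What follows is message storage, involving CommitLog, ConsumeQueue, and IndexFile; see the article 【RocketMQ学习】9.源码之Store.
2.3.2.2 Maintaining subscription relationships
Here we follow the heartbeat request code, HEART_BEAT, to see how the broker handles a Consumer heartbeat.
Start at BrokerController#registerProcessor(). The processor that handles heartbeats is ClientManageProcessor. The processor that handles pull requests is listed as well, since it is needed later when tracing the filtering flow: PullMessageProcessor, with request code PULL_MESSAGE.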
/**
* ClientManageProcessor
*/
ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
/**
* PullMessageProcessor
*/
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
ClientManageProcessor is registered here. When a Consumer sends a heartbeat, it is handled by its processRequest:
public class ClientManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.HEART_BEAT:
return this.heartBeat(ctx, request);
case RequestCode.UNREGISTER_CLIENT:
return this.unregisterClient(ctx, request);
case RequestCode.CHECK_CLIENT_CONFIG:
return this.checkClientConfig(ctx, request);
default:
break;
}
return null;
}
}
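Eventually the register method of ConsumerFilterManager is called to store the subscription. Only SQL filtering information is stored here:
//ConsumerFilterManager
/**
* Caches the filter data of non-TAG subscriptions
* @param topic topic
* @param consumerGroup consumer group name
* @param expression SQL expression
* @param type filter type, TAG or SQL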
* @param clientVersion
* @return success?
*/
public boolean register(final String topic, final String consumerGroup, final String expression,
final String type, final long clientVersion) {
// TAG filtering needs no registration; this method is mainly for SQL filtering
if (ExpressionType.isTagType(type)) {
return false;
}
if (expression == null || expression.length() == 0) {
return false;
}
FilterDataMapByTopic filterDataMapByTopic = this.filterDataByTopic.get(topic);
if (filterDataMapByTopic == null) {
FilterDataMapByTopic temp = new FilterDataMapByTopic(topic);
FilterDataMapByTopic prev = this.filterDataByTopic.putIfAbsent(topic, temp);
filterDataMapByTopic = prev != null ? prev : temp;
}
BloomFilterData bloomFilterData = bloomFilter.generate(consumerGroup + "#" + topic);
return filterDataMapByTopic.register(consumerGroup, expression, type, bloomFilterData, clientVersion);
}
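The core of register is a two-level map (topic, then consumer group) that is filled with the get / putIfAbsent idiom so that concurrent heartbeats do not overwrite each other's map. A simplified sketch of just that part follows. It leaves out the bloom filter data, the client version, and the compiled expression, and FilterRegistrySketch with its lookup helper is a hypothetical name.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class FilterRegistrySketch {
    // topic -> (consumerGroup -> expression)
    private final ConcurrentMap<String, ConcurrentMap<String, String>> byTopic = new ConcurrentHashMap<>();

    /** Stores the expression for non-TAG subscriptions; returns false when nothing is stored. */
    boolean register(String topic, String group, String expression, String type) {
        if (type == null || type.isEmpty() || "TAG".equals(type)) {
            return false; // TAG subscriptions need no broker-side registration
        }
        if (expression == null || expression.isEmpty()) {
            return false;
        }
        ConcurrentMap<String, String> byGroup = byTopic.get(topic);
        if (byGroup == null) {
            ConcurrentMap<String, String> fresh = new ConcurrentHashMap<>();
            ConcurrentMap<String, String> prev = byTopic.putIfAbsent(topic, fresh);
            byGroup = prev != null ? prev : fresh; // keep whichever map won the race
        }
        byGroup.put(group, expression);
        return true;
    }

    /** Returns the stored expression, or null when the group has none for this topic. */
    String lookup(String topic, String group) {
        ConcurrentMap<String, String> byGroup = byTopic.get(topic);
        return byGroup == null ? null : byGroup.get(group);
    }
}
```

For example, registering ("SqlFilterTest", "SqlFilterConsumer", "a between 0 and 3", "SQL92") stores the expression, while the same call with type "TAG" returns false and stores nothing.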
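The flow in which the broker receives a heartbeat and stores the subscription is as follows:
2.3.2.3 Message filtering on the Broker
The Consumer sends pull requests to the Broker through pullMessageService. PullMessageService was mentioned in the Consumer article: it extends ServiceThread and runs the message pulling task.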
public class PullMessageService extends ServiceThread {
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
private void pullMessage(final PullRequest pullRequest) {
// Select a consumer by consumer group name
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
// Execute the pull request: immediately or with a delay
impl.pullMessage(pullRequest);
} else {
log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
}
}
}
Next, DefaultMQPushConsumerImpl#pullMessage is called:
//DefaultMQPushConsumerImpl
public void pullMessage(final PullRequest pullRequest) {
// omitted...
// Callback logic
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
}
break;
// omitted
default:
break;
}
}
}
};
String subExpression = null;
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
try {
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {}
}
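The rough flow is:
- First, define the callback that runs once messages have been pulled from the broker;
- Then call PullAPIWrapper#pullKernelImpl to pull the messages, which triggers the callback;
- Put the pulled messages into the local queue for consumption. consumeMessageService.submitConsumeRequest triggers the local consumption logic, which eventually calls the MessageListener we registered.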
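The three steps can be imitated with a tiny single-threaded simulation: a queue of pull requests, a fake broker that returns a batch, and a callback that hands the batch to "consumption" and schedules the next pull. Everything in this sketch (PullLoopSketch, its nested types, the in-memory broker list) is hypothetical and heavily simplified. In particular, the sketch stops once the broker has nothing left, which is an assumption made only so that the example terminates.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class PullLoopSketch {
    static final class PullRequest {
        long nextOffset;
    }

    interface PullCallback {
        void onSuccess(List<String> found, long nextOffset);
    }

    private final List<String> brokerQueue;                     // stand-in for the broker-side queue
    private final Deque<PullRequest> pullRequestQueue = new ArrayDeque<>();
    final List<String> consumed = new ArrayList<>();            // what the "listener" has seen

    PullLoopSketch(List<String> brokerQueue) {
        this.brokerQueue = brokerQueue;
    }

    /** Step 2: "remote" pull of up to batchSize messages, then fire the callback. */
    private void pullKernel(PullRequest request, int batchSize, PullCallback callback) {
        int from = (int) request.nextOffset;
        int to = Math.min(from + batchSize, brokerQueue.size());
        callback.onSuccess(new ArrayList<>(brokerQueue.subList(from, to)), to);
    }

    /** Runs the loop until a pull comes back empty; returns the number of pulls issued. */
    int run(int batchSize) {
        pullRequestQueue.add(new PullRequest());
        int pulls = 0;
        while (!pullRequestQueue.isEmpty()) {
            PullRequest request = pullRequestQueue.poll();
            pulls++;
            // Step 1: define what happens once messages arrive
            PullCallback callback = (found, nextOffset) -> {
                request.nextOffset = nextOffset;
                if (!found.isEmpty()) {
                    consumed.addAll(found);         // Step 3: hand the batch to consumption
                    pullRequestQueue.add(request);  // and schedule the next pull
                }
            };
            pullKernel(request, batchSize, callback);
        }
        return pulls;
    }
}
```

With five messages on the fake broker and a batch size of 2, the loop issues four pulls: three that return messages and a final empty one.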
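That was the Consumer issuing the pull request. Next comes the Broker's handling of the request once it arrives.
The request code for pulling messages is PULL_MESSAGE, and as mentioned above, the processor for PULL_MESSAGE is PullMessageProcessor.
When the Broker handles a pull request it calls the PullMessageProcessor#processRequest method: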
//PullMessageProcessor
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
throws RemotingCommandException {
// omitted...
MessageFilter messageFilter;
if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
this.brokerController.getConsumerFilterManager());
} else {
messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
this.brokerController.getConsumerFilterManager());
}
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
// omitted...
}
This builds the message filter and fetches the messages by calling DefaultMessageStore#getMessage(). Filtering happens in two steps:
- TAG filtering in the ConsumeQueue: the ConsumeQueue is located by offset. Why there? Recall the structure of a ConsumeQueue entry: a fixed 20 bytes made up of an 8-byte CommitLog offset, a 4-byte message size, and an 8-byte tag hash code. Filtering calls isMatchedByConsumeQueue of the ExpressionMessageFilter that was passed in, which filters messages by tagsCode (it does more than that; it also handles bitmap filtering):
//ExpressionMessageFilter
@Override
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
    if (null == subscriptionData) {
        return true;
    }
    if (subscriptionData.isClassFilterMode()) {
        return true;
    }
    // by tags code.
    // TAG filtering: only the hashCode of the tag is compared here. Different tags can share a hashCode,
    // so the real tag filtering is done in the consumer.
    if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
        if (tagsCode == null) {
            return true;
        }
        if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
            return true;
        }
        return subscriptionData.getCodeSet().contains(tagsCode.intValue());
    } else {
        // omitted
    }
}
- SQL filtering: afterwards, ExpressionMessageFilter#isMatchedByCommitLog() is called to do the SQL filtering:
//ExpressionMessageFilter
public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
    // some content omitted ...
    Object ret = null;
    try {
        MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);
        // Evaluate the expression, that is, run the SQL
        ret = realFilterData.getCompiledExpression().evaluate(context);
    } catch (Throwable e) {
        log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);
    }
    log.debug("Pull eval result: {}, {}, {}", ret, realFilterData, tempProperties);
    if (ret == null || !(ret instanceof Boolean)) {
        return false;
    }
    return (Boolean) ret;
}
realFilterData holds the SQL from the Consumer sample:
(TAGS is not null and TAGS in ('TagA', 'TagB')) and (a is not null and a between 0 and 3)
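The reason the first filtering step can run against the ConsumeQueue without touching the CommitLog is that each ConsumeQueue entry already carries the tag hash code. A fixed-size entry of 20 bytes (8-byte CommitLog offset, 4-byte message size, 8-byte tag hash code) can be sketched with a ByteBuffer as below. ConsumeQueueEntrySketch and its methods are hypothetical names, and the sketch ignores the extended-unit case.

```java
import java.nio.ByteBuffer;

final class ConsumeQueueEntrySketch {
    static final int ENTRY_SIZE = 8 + 4 + 8; // CommitLog offset + message size + tag hash code

    /** Encodes one entry; a message without a tag gets hash code 0 in this sketch. */
    static ByteBuffer encode(long commitLogOffset, int size, String tag) {
        ByteBuffer buffer = ByteBuffer.allocate(ENTRY_SIZE);
        buffer.putLong(commitLogOffset);
        buffer.putInt(size);
        buffer.putLong(tag == null ? 0L : tag.hashCode());
        buffer.flip();
        return buffer;
    }

    /** Reads only the tag hash code, which is all the first filtering step needs. */
    static long tagsCode(ByteBuffer entry) {
        return entry.getLong(12); // absolute read: skip the 8-byte offset and the 4-byte size
    }

    public static void main(String[] args) {
        ByteBuffer entry = encode(1024L, 200, "TagA");
        System.out.println(entry.remaining());
        System.out.println(tagsCode(entry) == "TagA".hashCode());
    }
}
```

Only messages whose entry passes this cheap check need to be read from the CommitLog, which is where the SQL step then evaluates the full set of properties.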
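2.3.2.4 Message filtering on the Consumer
Next comes the filtering done by the Consumer, in the callback that was defined before the pull request was issued in the previous section: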
//DefaultMQPushConsumerImpl
public void pullMessage(final PullRequest pullRequest) {
// ...omitted
// Callback for the pull; this method is entered once messages have been pulled
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
// Process the messages: decode the binary data into Java objects and also filter by tag
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(
pullRequest.getMessageQueue(), pullResult, subscriptionData);
// ...omitted
}
// ...omitted
}
// ...omitted
}
}
Step into the PullAPIWrapper#processPullResult method:
//PullAPIWrapper
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult, final SubscriptionData subscriptionData) {
PullResultExt pullResultExt = (PullResultExt) pullResult;
this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
if (PullStatus.FOUND == pullResult.getPullStatus()) {
// Decode the binary data into objects
ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
List<MessageExt> msgListFilterAgain = msgList;
// Filter by TAG
if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
for (MessageExt msg : msgList) {
if (msg.getTags() != null) {
// Keep the message only if its tag is subscribed
if (subscriptionData.getTagsSet().contains(msg.getTags())) {
msgListFilterAgain.add(msg);
}
}
}
}
// ...omitted
}
// ...omitted
}
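As the code shows, whether a message is added to msgListFilterAgain depends on whether its TAG is in TagsSet, and msgListFilterAgain is the list of messages that survive the filter.
2.4 Summary
RocketMQ message filtering supports two modes, TAG and SQL.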
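2.4.1 TAG mode
When the broker fetches messages, it does a first pass of filtering by the hashCode of the TAG. The messages obtained this way are not necessarily limited to the specified TAG, so the consumer has to filter further.
For example, suppose a consumer subscribes to messages with TAG tag1, and tag1 and tag11 both happen to have hashCode 100. Filtering by hashCode on the broker sends the messages of both tags to the consumer, so the consumer has to compare the actual TAG values again to pick out the messages it really wants.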
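The two stages can be reproduced with a real collision. In Java, the strings "Aa" and "BB" have the same hashCode (2112), so they make a convenient stand-in for the tag1 / tag11 pair of the example. The sketch below is plain Java and only illustrates the principle; TwoStageTagFilterSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class TwoStageTagFilterSketch {
    /** Broker stage: compares hash codes only, so colliding tags slip through. */
    static List<String> brokerStage(List<String> messageTags, Set<String> subscribed) {
        Set<Integer> codeSet = new HashSet<>();
        for (String tag : subscribed) {
            codeSet.add(tag.hashCode());
        }
        List<String> passed = new ArrayList<>();
        for (String tag : messageTags) {
            if (codeSet.contains(tag.hashCode())) {
                passed.add(tag);
            }
        }
        return passed;
    }

    /** Consumer stage: compares the actual tag strings and removes false positives. */
    static List<String> consumerStage(List<String> pulledTags, Set<String> subscribed) {
        List<String> accepted = new ArrayList<>();
        for (String tag : pulledTags) {
            if (tag != null && subscribed.contains(tag)) {
                accepted.add(tag);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        Set<String> subscribed = new HashSet<>();
        subscribed.add("Aa");
        List<String> sent = List.of("Aa", "BB", "Cc");
        List<String> fromBroker = brokerStage(sent, subscribed);
        System.out.println(fromBroker);                            // prints [Aa, BB]
        System.out.println(consumerStage(fromBroker, subscribed)); // prints [Aa]
    }
}
```

The hash comparison is cheap and can be done from the ConsumeQueue entry alone, while the exact string comparison requires the decoded message, which is why the exact check is left to the consumer.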
2.4.2 SQL mode
SQL filtering is done only on the Broker.
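3 Message retry
3.1 Producer retry
A Producer never retries or resends messages sent one-way. For synchronous and asynchronous sends, the Producer has several parameters:
- retryTimesWhenSendFailed: number of resend attempts after a synchronous send fails. The default is 2, so the producer tries at most retryTimesWhenSendFailed + 1 times. A retry does not pick the broker that just failed; it tries another broker, to minimize the chance of losing the message. When the retry count is exceeded, an exception is thrown and it is up to the client to make sure the message is not lost. A resend happens on RemotingException, MQClientException, and some MQBrokerException cases.
- retryTimesWhenSendAsyncFailed: number of retries after an asynchronous send fails. An asynchronous retry does not pick another broker; it retries on the same broker only, so it does not guarantee that the message is not lost.
- retryAnotherBrokerWhenNotStoreOK: whether to try another broker when the flush to disk (master or slave) times out or the slave is unavailable (the returned status is not SEND_OK). The default is false. It can be turned on for very important messages.
These can be configured after creating the Producer instance: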
DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
defaultMQProducer.setRetryTimesWhenSendFailed(2);
defaultMQProducer.setRetryTimesWhenSendAsyncFailed(2);
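The synchronous retry rule (at most retryTimesWhenSendFailed + 1 attempts, avoiding the broker that just failed) can be sketched without any RocketMQ classes. In the sketch below each list element stands for one message queue and names the broker that hosts it, so several queues may belong to the same broker. SyncRetrySketch, selectBroker, and the failing set are hypothetical constructs for illustration, not the client's real selection code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class SyncRetrySketch {
    final List<String> attempts = new ArrayList<>(); // brokers tried, in order
    private int cursor = 0;                          // round-robin position over the queues

    /** Picks the next queue round-robin, skipping queues on the broker that just failed. */
    private String selectBroker(List<String> queueBrokers, String lastFailed) {
        for (int i = 0; i < queueBrokers.size(); i++) {
            String candidate = queueBrokers.get(cursor++ % queueBrokers.size());
            if (!candidate.equals(lastFailed)) {
                return candidate;
            }
        }
        // every queue is on the failed broker: fall back to plain round-robin
        return queueBrokers.get(cursor++ % queueBrokers.size());
    }

    /** Tries up to retryTimesWhenSendFailed + 1 times; returns the broker that accepted the message. */
    String send(List<String> queueBrokers, Set<String> failing, int retryTimesWhenSendFailed) {
        int timesTotal = 1 + retryTimesWhenSendFailed;
        String lastFailed = null;
        for (int times = 0; times < timesTotal; times++) {
            String broker = selectBroker(queueBrokers, lastFailed);
            attempts.add(broker);
            if (!failing.contains(broker)) {
                return broker; // stand-in for a successful send
            }
            lastFailed = broker;
        }
        throw new IllegalStateException("send failed after " + timesTotal + " attempts");
    }
}
```

With queues hosted on brokers a, a, b, b and broker a failing, the first attempt goes to a and the retry skips the second queue on a and succeeds on b. If both brokers fail, the default of 2 retries gives three attempts before the exception reaches the caller.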
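3.2 Consumer retry
Apache RocketMQ has two consumption models: clustering and broadcasting.
- Message retry only takes effect in clustering mode;
- Broadcasting mode provides no failure retry: a failed message is not retried, it is only recorded in a warning log, and consumption moves on to new messages.
RocketMQ push consumption also offers two ways of consuming: concurrent and orderly.
Their retry mechanisms differ:
- After an orderly consumption failure, the message is first retried locally on the client until the maximum retry count is reached. This keeps the failed message from being skipped and the next one consumed, which would break the order.
- After a concurrent consumption failure, the failed message is sent back to the server and the client waits for the server to deliver it again. Meanwhile the messages behind it in the queue are consumed normally.
A message that fails concurrent consumption is not sent back to its original Topic but to a special Topic named %RETRY%ConsumerGroupName. In clustering mode with concurrent consumption, every ConsumerGroup has one such Topic and subscribes to it.
Where does that subscription happen? When DefaultMQPushConsumerImpl#start() runs, it calls copySubscription(), which does two things:
- copies the subscriptions of DefaultMQPushConsumer into RebalanceImpl;
- in cluster mode, if a topic has been subscribed, automatically subscribes to the topic %RETRY%+ConsumerGroupName.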
//DefaultMQPushConsumerImpl
/**
* Copies the subscriptions into RebalanceImpl, including the retry topic
*
* @throws MQClientException on failure
*/
private void copySubscription() throws MQClientException {
try {
Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
// 1. Copy the subscriptions of DefaultMQPushConsumer into RebalanceImpl
if (sub != null) {
for (final Map.Entry<String, String> entry : sub.entrySet()) {
final String topic = entry.getKey();
final String subString = entry.getValue();
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), topic, subString);
// 2. Store the subscription in RebalanceImpl
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}
}
if (null == this.messageListenerInner) {
this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
break;
case CLUSTERING:
// 3. In push mode with clustering, every consumer group automatically subscribes to a retry topic: %RETRY%+ConsumerGroupName
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), retryTopic, SubscriptionData.SUB_ALL);
// 4. Store the subscription of the retry topic in RebalanceImpl
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
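Stripped of the RocketMQ types, copySubscription boils down to copying a map and, in clustering mode only, adding one extra entry for the retry topic. A minimal sketch under that reading follows. RetrySubscriptionSketch is a hypothetical class, and the boolean flag stands in for the MessageModel switch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class RetrySubscriptionSketch {
    static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";
    static final String SUB_ALL = "*";

    /** Name of the retry topic that belongs to a consumer group. */
    static String retryTopic(String consumerGroup) {
        return RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
    }

    /** Returns topic -> sub expression, adding the retry topic only in clustering mode. */
    static Map<String, String> copySubscription(Map<String, String> userSubscriptions,
                                                String consumerGroup,
                                                boolean clustering) {
        Map<String, String> inner = new LinkedHashMap<>(userSubscriptions);
        if (clustering) {
            inner.put(retryTopic(consumerGroup), SUB_ALL); // retry messages are never tag-filtered here
        }
        return inner;
    }

    public static void main(String[] args) {
        Map<String, String> subs = new LinkedHashMap<>();
        subs.put("TagFilterTest", "TagA || TagB || TagC");
        System.out.println(copySubscription(subs, "TagFilterConsumer", true).keySet());
    }
}
```

This also explains why the retry queue is named after the consumer group: the retry topic is derived from the group name alone, so all topics consumed by one group share it.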
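The parameters of orderly and concurrent consumption differ as follows:
Consumption type | Retry interval | Maximum retry count |
---|---|---|
Orderly | Configurable through SuspendCurrentQueueTimeMillis | Configurable through MaxReconsumeTimes, with no upper limit. If not set, the default is Integer.MAX_VALUE |
Concurrent | Steps up with the retry count, from 10 seconds to 2 hours. Not configurable | Configurable through MaxReconsumeTimes, with no upper limit. The default is 16; keeping the default is recommended |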
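The retry intervals for concurrent consumption (unordered messages) over the TCP protocol are as follows:
Retry number | Interval since the previous attempt | Retry number | Interval since the previous attempt |
---|---|---|---|
1 | 10 seconds | 9 | 7 minutes |
2 | 30 seconds | 10 | 8 minutes |
3 | 1 minute | 11 | 9 minutes |
4 | 2 minutes | 12 | 10 minutes |
5 | 3 minutes | 13 | 20 minutes |
6 | 4 minutes | 14 | 30 minutes |
7 | 5 minutes | 15 | 1 hour |
8 | 6 minutes | 16 | 2 hours |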
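- If the maximum retry count is less than or equal to 16, the interval steps up with each retry, as listed in the table of TCP unordered message retry intervals above.
- If the maximum retry count is greater than 16, every retry beyond the 16th uses an interval of 2 hours.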
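The table and the two rules translate directly into a lookup with a cap. The sketch below encodes the intervals from the table in seconds; RetryDelaySketch and its methods are hypothetical helper names.

```java
final class RetryDelaySketch {
    // Intervals from the table above, in seconds; index = retry number - 1
    private static final long[] DELAY_SECONDS = {
        10, 30, 60, 120, 180, 240, 300, 360,
        420, 480, 540, 600, 1200, 1800, 3600, 7200
    };

    /** Interval before the given retry (1-based); retries beyond the 16th stay at 2 hours. */
    static long delaySeconds(int retryNumber) {
        if (retryNumber < 1) {
            throw new IllegalArgumentException("retryNumber starts at 1");
        }
        int index = Math.min(retryNumber, DELAY_SECONDS.length) - 1;
        return DELAY_SECONDS[index];
    }

    /** Total waiting time if every retry up to maxRetries fails. */
    static long totalSeconds(int maxRetries) {
        long total = 0;
        for (int retry = 1; retry <= maxRetries; retry++) {
            total += delaySeconds(retry);
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(delaySeconds(1));
        System.out.println(delaySeconds(20));
        System.out.println(totalSeconds(16));
    }
}
```

Summing the sixteen default intervals gives 17140 seconds, so a message that keeps failing is retried over a span of roughly 4 hours and 46 minutes before the retries are exhausted.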
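The status codes of orderly and concurrent consumption also show how RocketMQ handles message retry:
// Orderly consumption: status codes returned by the listener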
public enum ConsumeOrderlyStatus {
/**
* Success consumption
*/
SUCCESS,
/**
* Rollback consumption(only for binlog consumption)
*/
@Deprecated
ROLLBACK,
/**
* Commit offset(only for binlog consumption)
*/
@Deprecated
COMMIT,
/**
* Suspend current queue a moment
*/
SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
// Concurrent consumption: status codes returned by the listener
public enum ConsumeConcurrentlyStatus {
/**
* Success consumption
*/
CONSUME_SUCCESS,
/**
* Failure consumption,later try to consume
*/
RECONSUME_LATER;
}
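3.2.1 Retry after a concurrent consumption failure
Starting from the consumer's listener and working backwards, let us find where the Consumer handles this failure logic, using the Consumer of section 2.1.2 as the example.
As the figure shows, the listener is invoked and its consumption result handled in two places:
- consumeMessageDirectly: called from DefaultMQAdminExtImpl. Judging by the name, my guess is that it is there for monitoring platforms such as the Console. Readers who know better are welcome to leave a comment.
- ConsumeRequest#run(): a Runnable task class inside ConsumeMessageConcurrentlyService, dedicated to handling the consumption result, which in the end is sent back to the Broker.
Let us trace how ConsumeRequest handles the result. Further down it certainly sends something to the Broker, but first look at where it is called from.
Since ConsumeRequest is a Runnable, the question is where the task is submitted: either it runs on a schedule or it is submitted to a task queue. RocketMQ submits it to the queue of a thread pool.
Remember the consumer callback from earlier? Right, the PullCallback of section 2.3.2.3. That method is very long, so the screenshot keeps only the part that matters for consumption:
The callback handles the result after the consumer has pulled messages. Tracing where PullCallback is actually invoked, it is passed along the path PullAPIWrapper#pullKernelImpl(), MQClientAPIImpl#pullMessage(), MQClientAPIImpl#pullMessageAsync().
Back to the task queue: ConsumeRequest is eventually submitted to a task queue, provided that messages were pulled successfully. When a pull does not succeed, RocketMQ pulls again.
Successfully pulled messages are consumed and the result is reported back.
Since ConsumeRequest is a Runnable, look at its run() method: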
class ConsumeRequest implements Runnable {
// ...omitted
@Override
public void run() {
if (this.processQueue.isDropped()) {
log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
return;
}
// Get the listener defined by the consumer
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
ConsumeMessageContext consumeMessageContext = null;
// Hook
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setProps(new HashMap<String, String>());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}
long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
}
}
// Consume; this returns the result code, a ConsumeConcurrentlyStatus
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
}
long consumeRT = System.currentTimeMillis() - beginTimestamp;
if (null == status) {
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;
} else {
returnType = ConsumeReturnType.RETURNNULL;
}
} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
returnType = ConsumeReturnType.TIME_OUT;
} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
returnType = ConsumeReturnType.FAILED;
} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS;
}
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
}
if (null == status) {
log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}
ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
if (!processQueue.isDropped()) {
// Report the consumption result to the Broker
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
}
// ...omitted
}
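First it obtains the listener defined by the consumer, the one registered by the Consumer in section 2.1.2. Once consumption has run, the important consumption result, status, is returned. This status is passed on to the Broker by the method processConsumeResult.
// Report the consumption result to the Broker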
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty()) {
return;
}
switch (status) {
// 1. Consumption succeeded. Note ackIndex: it decides whether a retry happens, and it is ultimately driven by the consumption result status
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
// 2. Retry consumption: ackIndex = -1
case RECONSUME_LATER:
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
// 3. Broadcast mode: a failure is not handled, only logged
case BROADCASTING:
// 4. Broadcasting: with ackIndex = -1 the loop runs, and all it does is print a log line
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
// 5. Clustering mode: on RECONSUME_LATER consumption failed and ackIndex = -1, so the loop below runs. For CONSUME_SUCCESS, ackIndex = consumeRequest.getMsgs().size() - 1 and the loop is skipped
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
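// 6. Cluster mode: send the message back to the broker for redelivery (retry)
// Reaching this point means the status is RECONSUME_LATER: return the message to the Broker, that is, the ACK (retry)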
boolean result = this.sendMessageBack(msg, context);
// 7. The ACK can fail; record the messages whose ACK failed
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
// 8. Some ACKs failed: hand those messages to the thread pool to be consumed again after a 5s delay
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
// 9. Sending back to the broker failed, so try local consumption again
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
// Remove the messages that were cached before consumption
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
// 10. Update the consumption offset. Note that this happens for both CONSUME_SUCCESS and RECONSUME_LATER
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
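The role of ackIndex is easy to miss in the listing above, so here is the index arithmetic on its own. The sketch assumes, as I understand the client, that a fresh ConsumeConcurrentlyContext starts with ackIndex at Integer.MAX_VALUE. AckIndexSketch and its enum are hypothetical stand-ins for the real types.

```java
import java.util.ArrayList;
import java.util.List;

final class AckIndexSketch {
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    /** Returns the batch indices that would be sent back to the broker in clustering mode. */
    static List<Integer> indicesToSendBack(Status status, int ackIndex, int batchSize) {
        if (status == Status.CONSUME_SUCCESS) {
            if (ackIndex >= batchSize) {
                ackIndex = batchSize - 1; // default case: the whole batch counts as acknowledged
            }
        } else {
            ackIndex = -1;                // RECONSUME_LATER: nothing in the batch is acknowledged
        }
        List<Integer> failed = new ArrayList<>();
        for (int i = ackIndex + 1; i < batchSize; i++) {
            failed.add(i);
        }
        return failed;
    }

    public static void main(String[] args) {
        System.out.println(indicesToSendBack(Status.CONSUME_SUCCESS, Integer.MAX_VALUE, 4)); // prints []
        System.out.println(indicesToSendBack(Status.RECONSUME_LATER, Integer.MAX_VALUE, 4)); // prints [0, 1, 2, 3]
        System.out.println(indicesToSendBack(Status.CONSUME_SUCCESS, 1, 4));                 // prints [2, 3]
    }
}
```

The third call shows a detail the prose does not spell out: a listener that sets ackIndex itself and returns CONSUME_SUCCESS acknowledges only the messages up to that index, and the rest of the batch is sent back for retry.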
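From the source above we can conclude:
- Step 4 shows that in broadcasting mode a message is not retried even when consumption fails; only a warning log is printed;
- Only messages whose consumption failed (anything that does not return CONSUME_SUCCESS counts as a failure) need to be sent back (ACK) for retry;
- If sending the message back fails (the book 《RocketMQ技术内幕》 calls this an ACK failure), the message is consumed again locally after a 5s delay (which goes back into the listener callback of the Consumer);
- The Consumer offset is updated whether the message was consumed successfully or not.
ConsumeMessageConcurrentlyService.sendMessageBack prepares to send the result back to the Broker: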
public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
// Delay level, 0 by default
int delayLevel = context.getDelayLevelWhenNextConsume();
// Wrap topic with namespace before sending back message.
msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));
try {
// First send the CONSUMER_SEND_MSG_BACK command so that the broker redelivers; if that fails, send the message to the broker under the topic %RETRY%+consumerGroup
log.info("=======delayLevel: {}", delayLevel);
this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
return true;
} catch (Exception e) {
log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
}
return false;
}
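sendMessageBack does two things:
- It first sends a request with code CONSUMER_SEND_MSG_BACK to the Broker and lets the Broker handle it; what the Broker does with it is described in detail below.
- If sending that request throws an exception, the message is rebuilt and sent to the topic %RETRY%ConsumerGroupName, so that the Broker schedules the retry with a delay.
This is what the beginning of section 3.2 referred to:
After a concurrent consumption failure the message is not delivered back to the original Topic but to a special Topic named %RETRY%ConsumerGroupName. In cluster mode each ConsumerGroup under concurrent consumption has one such special Topic and subscribes to it.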
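
The two-step behaviour described above can be sketched without any RocketMQ dependency. This is an illustration under assumptions, not the client's implementation: `SendBackFallbackSketch`, `retryTopic` and `sendBack` are hypothetical names, and the two callbacks stand in for the real network calls. Only the `%RETRY%` prefix and the "try the request first, fall back to the retry topic on an exception" order come from the text.

```java
import java.util.function.Consumer;

// Hypothetical sketch, not RocketMQ code. The callbacks stand in for the
// CONSUMER_SEND_MSG_BACK request and for a normal send to a topic.
class SendBackFallbackSketch {

    static final String RETRY_PREFIX = "%RETRY%";

    // The retry topic is derived from the consumer group, not from the business topic.
    static String retryTopic(String consumerGroup) {
        return RETRY_PREFIX + consumerGroup;
    }

    // Tries the send-back request first; if it throws, publishes to the retry topic.
    // Returns a label for the path that was taken.
    static String sendBack(String consumerGroup, Runnable sendBackRequest, Consumer<String> publishToTopic) {
        try {
            sendBackRequest.run();
            return "SEND_BACK";
        } catch (RuntimeException e) {
            publishToTopic.accept(retryTopic(consumerGroup));
            return "RETRY_TOPIC";
        }
    }

    public static void main(String[] args) {
        // Simulate a failing send-back request: the fallback publishes to %RETRY%orderGroup.
        String path = sendBack("orderGroup",
                () -> { throw new IllegalStateException("broker unreachable"); },
                topic -> System.out.println("published to " + topic));
        System.out.println(path);
    }
}
```

Because the topic name depends only on the group, two groups consuming the same business topic get separate retry topics and do not see each other's retries.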
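3.2.2 Retry after ordered consumption failure
When consuming an ordered message fails, the listener must return the status SUSPEND_CURRENT_QUEUE_A_MOMENT, which means: pause the current queue for a moment and try again later.
The main difference from concurrent consumption lies in how ConsumeRequest processes the messages. The source is not listed here; readers can trace it themselves.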
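
To make "pause for a moment and try again" concrete, here is a simplified model, not the actual ConsumeRequest logic. Everything in it is hypothetical (`OrderlyRetrySketch`, its local `Status` enum, the `suspendMillis` and `maxAttempts` parameters); it only illustrates the idea that the same message is retried in place, so later messages in the queue cannot overtake it.

```java
import java.util.function.Supplier;

// Hypothetical model, not RocketMQ code: it only illustrates "pause, then retry
// the same message" so that ordering within the queue is preserved.
class OrderlyRetrySketch {

    // Local stand-in for the two outcomes an ordered listener can report.
    enum Status { SUCCESS, SUSPEND_CURRENT_QUEUE_A_MOMENT }

    // Calls the listener until it succeeds or maxAttempts is reached.
    // Returns the number of attempts that were made.
    static int consumeInOrder(Supplier<Status> listener, long suspendMillis, int maxAttempts) {
        int attempts = 0;
        while (attempts < maxAttempts) {
            attempts++;
            if (listener.get() == Status.SUCCESS) {
                break;
            }
            try {
                // The queue stays paused; nothing else from it is consumed meanwhile.
                Thread.sleep(suspendMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // The listener fails twice and succeeds on the third call.
        int attempts = consumeInOrder(
                () -> ++calls[0] < 3 ? Status.SUSPEND_CURRENT_QUEUE_A_MOMENT : Status.SUCCESS,
                10L, 5);
        System.out.println("attempts = " + attempts);
    }
}
```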
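3.3 Retry handling on the Broker
Take concurrent consumption, i.e. unordered messages, as the example. When the broker receives a request with code CONSUMER_SEND_MSG_BACK from a consumer, it starts the message retry logic.
The processor that handles this request is SendMessageProcessor.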
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final SendMessageContext mqtraceContext;
switch (request.getCode()) {
// Handle a client's request to redeliver a message it failed to consume
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.asyncConsumerSendMsgBack(ctx, request);
default:
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null) {
return CompletableFuture.completedFuture(null);
}
mqtraceContext = buildMsgContext(ctx, requestHeader);
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
if (requestHeader.isBatch()) {
return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
} else {
return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
}
}
}
SendMessageProcessor#asyncConsumerSendMsgBack():
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
private CompletableFuture<RemotingCommand> asyncConsumerSendMsgBack(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
//........omitted
//1. New topic: %RETRY% + consumerGroupName
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
// 2. Pick the retry queue; the number of retry queues is 1 by default
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}
//........omitted
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic,
subscriptionGroupConfig.getRetryQueueNums(),
PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
if (null == msgExt) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("look message by offset failed, " + requestHeader.getOffset());
return CompletableFuture.completedFuture(response);
}
final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (null == retryTopic) {
MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
}
msgExt.setWaitStoreMsgOK(false);
//3. Delay level, initially 0
int delayLevel = requestHeader.getDelayLevel();
//4. Maximum number of retries, 16 by default
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}
//5. Retry limit reached: route the message to the dead-letter queue
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes || delayLevel < 0) {
// Dead-letter topic: %DLQ% + consumerGroupName
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE, 0);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return CompletableFuture.completedFuture(response);
}
} else {
if (0 == delayLevel) {
delayLevel = 3 + msgExt.getReconsumeTimes();
}
msgExt.setDelayTimeLevel(delayLevel);
}
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(newTopic);
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
msgInner.setQueueId(queueIdInt);
msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(msgExt.getStoreHost());
// Increment the reconsume count. The consumer reports it back when it consumes the new message, and it is compared in step 5
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
//6. Store it in the CommitLog as a new message; originMsgId is persisted as well
CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
return putMessageResult.thenApply((r) -> {
if (r != null) {
switch (r.getPutMessageStatus()) {
case PUT_OK:
String backTopic = msgExt.getTopic();
String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (correctTopic != null) {
backTopic = correctTopic;
}
this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
default:
break;
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(r.getPutMessageStatus().name());
return response;
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("putMessageResult is null");
return response;
});
}
}
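It mainly does the following:
- Changes the message's Topic to "%RETRY%" + ConsumerGroupName and computes the queueId (the retry queue; there is 1 queue by default).
- If the message has been retried >= 16 times (the default), changes the Topic again to the dead-letter Topic "%DLQ%" + ConsumerGroupName, with a single consume queue (a dead-letter topic has only one queue).
- If the message has not become a dead letter, computes its delay level.
- Copies the original Msg into a newly created Msg, hands it to the message store obtained from BrokerController, and persists it in the CommitLog; the original message ID (originMsgId) is persisted too.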
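
The routing and delay-level rules in the steps above can be checked with plain values. This is a minimal sketch, not broker code: `RetryRoutingSketch` and its methods are hypothetical names that mirror the `if/else` branch shown in asyncConsumerSendMsgBack. The delay table is an assumption: it is what I understand to be the broker's default `messageDelayLevel` setting, and a broker configured differently will map levels to other durations.

```java
// Hypothetical sketch, not RocketMQ code: it mirrors the branch shown in
// asyncConsumerSendMsgBack using plain values.
class RetryRoutingSketch {

    static final String RETRY_PREFIX = "%RETRY%";
    static final String DLQ_PREFIX = "%DLQ%";

    // Assumption: the default messageDelayLevel table (levels are 1-based).
    static final String[] DEFAULT_DELAYS = {
        "1s", "5s", "10s", "30s", "1m", "2m", "3m", "4m", "5m",
        "6m", "7m", "8m", "9m", "10m", "20m", "30m", "1h", "2h"
    };

    // Step 5: the dead-letter topic is chosen once the retry limit is reached
    // or when the client asks for a negative delay level.
    static String targetTopic(String group, int reconsumeTimes, int maxReconsumeTimes, int delayLevel) {
        boolean dead = reconsumeTimes >= maxReconsumeTimes || delayLevel < 0;
        return (dead ? DLQ_PREFIX : RETRY_PREFIX) + group;
    }

    // A requested level of 0 means "let the broker decide": 3 + reconsumeTimes.
    static int effectiveDelayLevel(int requestedLevel, int reconsumeTimes) {
        return requestedLevel == 0 ? 3 + reconsumeTimes : requestedLevel;
    }

    // Looks up the duration for a 1-based delay level in the assumed default table.
    static String delayFor(int level) {
        return DEFAULT_DELAYS[level - 1];
    }

    public static void main(String[] args) {
        int max = 16;
        for (int times = 0; times <= max; times++) {
            String topic = targetTopic("demoGroup", times, max, 0);
            String delay = times < max ? delayFor(effectiveDelayLevel(0, times)) : "n/a";
            System.out.println("reconsumeTimes=" + times + " -> " + topic + " delay=" + delay);
        }
    }
}
```

Under these assumptions the first retry is scheduled at level 3 and the sixteenth at level 18, the last entry of the table; the next failure reaches the limit and is routed to the dead-letter topic instead.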
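3.4 Dead-letter queue
When a message fails to be consumed the first time, RocketMQ retries it automatically. If it still fails after the maximum number of retries, the consumer evidently cannot consume this message under normal circumstances.
The message is not discarded immediately at that point; it is sent to a special queue belonging to that consumer. Such messages are called dead-letter messages (Dead-Letter Message), and the special queue that stores them is called the dead-letter queue (Dead-Letter Queue). The dead-letter queue is the single queue under the dead-letter Topic. Once a dead-letter message is produced, the dead-letter Topic of the corresponding ConsumerGroup is named %DLQ%+ConsumerGroupName, and messages in the dead-letter queue are not consumed again. The dead-letter messages can be looked up with the RocketMQ Admin tool or in RocketMQ Dashboard.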
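In summary:
- It is a separate queue (DLQ) in the Broker that stores messages the Consumer still failed to consume after 16 retries.
- The DLQ has write permission only, no read permission. It therefore cannot be consumed again by a Consumer; it takes manual intervention to redeliver the messages (this can be done in Rocket-MQ-Console).
- In the DLQ the message's Topic is renamed to "%DLQ%" + ConsumerGroupName.
- Physically, the DLQ is simply the queue under the Topic directory named "%DLQ%" + ConsumerGroupName inside the consumequeue folder.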
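
Why a write-only topic cannot be consumed becomes clear once permissions are viewed as bit flags. The sketch below is an assumption-laden illustration: `PermSketch` is a hypothetical class, and the flag values are what I believe RocketMQ's `PermName` uses, so verify them against your version before relying on them. The two permission combinations are the ones passed to createTopicInSendMessageBackMethod in the source above.

```java
// Hypothetical sketch, not RocketMQ code. The flag values are assumed to match
// PermName; check them against the RocketMQ version you use.
class PermSketch {

    static final int PERM_READ = 0x1 << 2;
    static final int PERM_WRITE = 0x1 << 1;

    // Permissions used in the source above for the retry topic and the DLQ topic.
    static final int RETRY_TOPIC_PERM = PERM_WRITE | PERM_READ;
    static final int DLQ_TOPIC_PERM = PERM_WRITE;

    static boolean isReadable(int perm) {
        return (perm & PERM_READ) == PERM_READ;
    }

    static boolean isWriteable(int perm) {
        return (perm & PERM_WRITE) == PERM_WRITE;
    }

    public static void main(String[] args) {
        System.out.println("retry topic readable: " + isReadable(RETRY_TOPIC_PERM));
        System.out.println("DLQ topic readable:   " + isReadable(DLQ_TOPIC_PERM));
        System.out.println("DLQ topic writeable:  " + isWriteable(DLQ_TOPIC_PERM));
    }
}
```

The retry topic carries both flags, so consumers can pull from it; the DLQ topic carries only the write flag, which is why its messages stay put until someone intervenes manually.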
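4 Notes
- No matter how many times a message is retried, the Message ID of these retry messages does not change.
- Message retry only takes effect in cluster consumption mode; broadcast consumption mode offers no failure retry, i.e. a failed message is not retried and consumption moves on to new messages.
- Dead-letter messages cannot be consumed again by a Consumer; they require manual intervention and redelivery.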