1 Consumer样例
这里以集群消费、并发消费为例,采用一般的推模式进行消息消费。
public class BalanceConsumer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者,指定组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("mark");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topic
consumer.setMaxReconsumeTimes(1);
consumer.subscribe("TopicTest", "tagA|TagB|TagC");
//负载均衡模式消费
consumer.setMessageModel(MessageModel.CLUSTERING);
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}2 启动流程
启动消费者从consumer.start();开始。
//DefaultMQPushConsumer
@Override
public void start() throws MQClientException {
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
this.defaultMQPushConsumerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}其最终会调用MQClientInstance.start()方法。上一章讲过,MQClientInstance 封装了RocketMQ网络处理的API,是消息生产者与消费者同NameServer、Broker打交道的唯一途径。
其实,Consumer也是一个NettyRemotingClient,通过netty与NameServer、Broker进行通讯。因而其他流程类似Producer,这里,我们对拉取、负载进行下代码跟踪。如下图所示:
这里顺便提一嘴
MQClientInstance启动前的定时任务:MQClientInstance的4个定时任务
2.1 拉取
这里简要说说消息的拉取过程。
pullMessageService其实是个线程,和下面的rebalanceService一样,都继承自ServiceThread,
public abstract class ServiceThread implements Runnable{};
public class PullMessageService extends ServiceThread {};在构造MQClientInstance时,就会创建pullMessageService实例。当执行pullMessageService.start()后,就是线程开始工作,拉取队列中的消息,但真正干活儿的还是Consumer:
public class PullMessageService extends ServiceThread {
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
private void pullMessage(final PullRequest pullRequest) {
//根据消费者组名,选择一个消费者
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
//执行拉取请求:立即执行、延迟执行
impl.pullMessage(pullRequest);
} else {
log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
}
}
}2.2 负载
同pullMessageService,同样继承自ServiceThread,rebalanceService任务就是消费者的负载均衡,即给consumer重新调整和分配queue。当发生Broker掉线、消费者实例掉线、Topic扩容等各种突发情况时,消费者组中的消费者实例是怎么重平衡,以支持全部队列的正常消费。
触发形式:
- 定时触发(20sec)
- 接口触发:
- 1)收到broker的consumer list发生变化通知后需要重新做负载均衡,比如同一个group中新加入了consumer或者有consumer下线;
- 2)consumer启动的时候。
具体功能点如下:
2.2.1 定时触发负载均衡操作
public class RebalanceService extends ServiceThread {
//任务间隔时长
private static long waitInterval = Long.parseLong(System.getProperty("rocketmq.client.rebalance.waitInterval", "20000"));
private final InternalLogger log = ClientLogger.getLog();
private final MQClientInstance mqClientFactory;
public RebalanceService(MQClientInstance mqClientFactory) {
this.mqClientFactory = mqClientFactory;
}
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
//默认间隔20秒执行一次
this.waitForRunning(waitInterval);
//交由MQClientInstance执行负载均衡
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
@Override
public String getServiceName() {
return RebalanceService.class.getSimpleName();
}
}2.2.2 实现负载均衡算法
不管是DefaultMQPushConsumerImpl的rebalancePushImpl,还是DefaultMQPullConsumerImpl的rebalancePullImpl,调用都是父类RebalanceImpl的实现。
分配策略:
先排序的目的是:如果同一个分组的多个客户端,分布在不同的机器上(消费者的机器上),每台客户端都单独算,使得算出来的效果是一致的。
总体消费就是让每一个 Consumer有同样的一个
MessageQueue的视图,因为每个消费者的视图是一致的,那么在每个客户端算负载,算出来的结果当然就是一致的。

对于Consumer1和Consumer2,经过统一的排序,在Consumer1客户端也好,还是Consumer2的客户端也好,算出来的结果是一致的。
对比
Kafka,在消费的时候依赖Zookeeper,broker变动还要走选举,如果选不出或者比较卡,这个是否会导致负载不正常,负载不成功就不能正常的工作。而RocketMQ的这种方式简单,并且高可用。
这里提一嘴消息队列的分配策略AllocateMessageQueueStrategy,由于本章开头声明过,消费者模式是push,因而在构造消费者时,默认就已经确定分配策略是AllocateMessageQueueAveragely:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("mark");
public DefaultMQPushConsumer(final String consumerGroup) {
this(null, consumerGroup, null, new AllocateMessageQueueAveragely());
}举例:
AllocateMessageQueueAveragely,平均分配
8个队列:q1,q2,q3,q4,q5,q6,q7,q8,消费者3个:c1,c2,c3
分配如下:
c1:q1,q2,q3
c2:q4,q5,q6
c3:q7,q8
AllocateMessageQueueAveragelyByCircle,平均轮询分配
8个队列:q1,q2,q3,q4,q5,q6,q7,q8,消费者3个:c1,c2,c3
分配如下:
c1:q1,q4,q7
c2:q2,q5,q8
c3:q3,q6其他分配策略还有:AllocateMessageQueueAveragelyByCircle、AllocateMessageQueueByConfig等,这里不多列举了,之前在5.2 Consumer负载均衡一节中提到过。
2.2.3 更新消费者组分布信息
详见方法:rebalanceByTopic()。
2.3 初始化offsetStore
由于本例是集群消费,所以初始化offsetStore走的是RemoteBrokerOffsetStore,对于广播消费,其offset是存储于消费端的。
两种消费模式下的load()实现:
RemoteBrokerOffsetStore
public class RemoteBrokerOffsetStore implements OffsetStore { @Override public void load() { } }LocalFileOffsetStore
public class LocalFileOffsetStore implements OffsetStore { @Override public void load() throws MQClientException { OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset(); if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) { offsetTable.putAll(offsetSerializeWrapper.getOffsetTable()); for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) { AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq); log.info("load consumer's offset, {} {} {}", this.groupName, mq, offset.get()); } } } }
2.4 注册消费者,监听,消费者客户端启动

2.5 其他
在前面一些创建,服务启动,注册等动作完成后,接下来还有:
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
//.................省略
default:
break;
}
//14、更新路由信息
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
//15、如果是SQL过滤,检查broker是否支持SQL过滤,默认是支持TAG过滤的。服务端处理见ClientManageProcessor#checkClientConfig()
this.mQClientFactory.checkClientInBroker();
//发送心跳(30s),同步broker列表,同步consumer配置到broker,同步FilterClass到FilterServer(PushConsumer)
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
//16、rebalanceService执行一次
this.mQClientFactory.rebalanceImmediately();
}其中,
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();,用于消费者或生产者向Broker发送心跳,并且若Broker有变化,会更新自己本地的缓存。2.6 消息消费流程
2.6.1 获取Broker的消费位置-offset
在前面一些列动作,Consumer拿到了group、topic、通过rebalanceService又拿到了queue,准备消费了,还得要确定消费位置,也就是客户端向Broker发起查询offset请求,请求代号为QUERY_CONSUMER_OFFSET。
调用的地方为:RemoteBrokerOffsetStore类中fetchConsumeOffsetFromBroker():
private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
//查找一个broker Address不为空的Broker
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult) {
//没找到,则从NameServer拉取路由
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
//再次发起查找一个broker Address不为空的Broker请求
findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
}
if (findBrokerResult != null) {
QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
requestHeader.setTopic(mq.getTopic());
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());
//向broker发起查询offset请求
return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
} else {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
}2.6.2 拉取消息
如2.2节所述,pullMessageService是一个单独的线程去拉取消息的。确定了消费者的group、topic、还有queue和需要获取Queue的Offset,就要正式开始拉取消息了。
送入的信息包括:topic、queueid、offset,调用的方法为PullAPIWrapper.pullKernelImpl(),最终交由MQClientAPIImpl.pullMessage()执行,请求代号:PULL_MESSAGE。
2.6.3 消费消息
拉到消息后,消费者就要进行消息的消费了。消费完了之后,要更新offset,这个时候也要发起调用。
请求代号:UPDATE_CONSUMER_OFFSET
这个地方要注意有两种方式:
1、 定时,默认5s提交
对于集群消费模式,同步offset至Broker;对于广播消费模式,则是同步offset至本地。
2、 前面步骤的拉取消息时会带入参数:commitoffset,这个时候也会更新。
RebalancePushImpl中计算offset的地方:
DefaultMQPushConsumerImpl.pullMessage()中的commitoffset:
3 整体流程



