1 Consumer样例
这里以集群消费
、并发消费
为例,采用一般的推模式
进行消息消费。
public class BalanceConsumer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者,指定组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("mark");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topic
consumer.setMaxReconsumeTimes(1);
consumer.subscribe("TopicTest", "tagA|TagB|TagC");
//负载均衡模式消费
consumer.setMessageModel(MessageModel.CLUSTERING);
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
2 启动流程
启动消费者从consumer.start();
开始。
//DefaultMQPushConsumer
@Override
public void start() throws MQClientException {
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
this.defaultMQPushConsumerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
其最终会调用MQClientInstance.start()
方法。上一章讲过,MQClientInstance 封装了RocketMQ网络处理的API,是消息生产者与消费者同NameServer、Broker打交道的唯一途径
。
其实,Consumer也是一个NettyRemotingClient
,通过netty与NameServer、Broker进行通讯。因而其他流程类似Producer,这里,我们对拉取、负载进行下代码跟踪。如下图所示:
这里顺便提一嘴
MQClientInstance
启动前的定时任务:
2.1 拉取
这里简要说说消息的拉取过程。
pullMessageService
其实是个线程,和下面的rebalanceService
一样,都继承自ServiceThread
,
public abstract class ServiceThread implements Runnable{};
public class PullMessageService extends ServiceThread {};
在构造MQClientInstance
时,就会创建pullMessageService
实例。当执行pullMessageService.start()
后,就是线程开始工作,拉取队列中的消息,但真正干活儿的还是Consumer
:
public class PullMessageService extends ServiceThread {
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
private void pullMessage(final PullRequest pullRequest) {
//根据消费者组名,选择一个消费者
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
//执行拉取请求:立即执行、延迟执行
impl.pullMessage(pullRequest);
} else {
log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
}
}
}
2.2 负载
同pullMessageService
,同样继承自ServiceThread
,rebalanceService
任务就是消费者的负载均衡,即给consumer重新调整和分配queue。当发生Broker掉线、消费者实例掉线、Topic扩容等各种突发情况时,消费者组中的消费者实例是怎么重平衡,以支持全部队列的正常消费。
触发形式:
- 定时触发(20sec)
- 接口触发:
- 1)收到broker的consumer list发生变化通知后需要重新做负载均衡,比如同一个group中新加入了consumer或者有consumer下线;
- 2)consumer启动的时候。
具体功能点如下:
2.2.1 定时触发负载均衡操作
public class RebalanceService extends ServiceThread {
//任务间隔时长
private static long waitInterval = Long.parseLong(System.getProperty("rocketmq.client.rebalance.waitInterval", "20000"));
private final InternalLogger log = ClientLogger.getLog();
private final MQClientInstance mqClientFactory;
public RebalanceService(MQClientInstance mqClientFactory) {
this.mqClientFactory = mqClientFactory;
}
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
//默认间隔20秒执行一次
this.waitForRunning(waitInterval);
//交由MQClientInstance执行负载均衡
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
@Override
public String getServiceName() {
return RebalanceService.class.getSimpleName();
}
}
2.2.2 实现负载均衡算法
不管是DefaultMQPushConsumerImpl
的rebalancePushImpl
,还是DefaultMQPullConsumerImpl
的rebalancePullImpl
,调用都是父类RebalanceImpl
的实现。
分配策略:
先排序的目的是:如果同一个分组的多个客户端,分布在不同的机器上(消费者的机器上),每台客户端都单独算,使得算出来的效果是一致的
。
总体消费就是让每一个 Consumer有同样的一个
MessageQueue
的视图,因为每个消费者的视图是一致的,那么在每个客户端算负载,算出来的结果当然就是一致的。
对于Consumer1
和Consumer2
,经过统一的排序
,在Consumer1客户端也好,还是Consumer2的客户端也好,算出来的结果是一致的。
对比
Kafka
,在消费的时候依赖Zookeeper
,broker变动还要走选举
,如果选不出或者比较卡,这个是否会导致负载不正常,负载不成功就不能正常的工作。而RocketMQ的这种方式简单,并且高可用。
这里提一嘴消息队列的分配策略AllocateMessageQueueStrategy
,由于本章开头声明过,消费者模式是push,因而在构造消费者时,默认就已经确定分配策略是AllocateMessageQueueAveragely
:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("mark");
public DefaultMQPushConsumer(final String consumerGroup) {
this(null, consumerGroup, null, new AllocateMessageQueueAveragely());
}
举例:
AllocateMessageQueueAveragely,平均分配
8个队列:q1,q2,q3,q4,q5,q6,q7,q8,消费者3个:c1,c2,c3
分配如下:
c1:q1,q2,q3
c2:q4,q5,q6
c3:q7,q8
AllocateMessageQueueAveragelyByCircle,平均轮询分配
8个队列:q1,q2,q3,q4,q5,q6,q7,q8,消费者3个:c1,c2,c3
分配如下:
c1:q1,q4,q7
c2:q2,q5,q8
c3:q3,q6
其他分配策略还有:AllocateMessageQueueAveragelyByCircle
、AllocateMessageQueueByConfig
等,这里不多列举了,之前在5.2 Consumer负载均衡一节中提到过。
2.2.3 更新消费者组分布信息
详见方法:rebalanceByTopic()
。
2.3 初始化offsetStore
由于本例是集群消费
,所以初始化offsetStore走的是RemoteBrokerOffsetStore
,对于广播消费
,其offset是存储于消费端的。
两种消费模式下的load()
实现:
RemoteBrokerOffsetStore
public class RemoteBrokerOffsetStore implements OffsetStore { @Override public void load() { } }
LocalFileOffsetStore
public class LocalFileOffsetStore implements OffsetStore { @Override public void load() throws MQClientException { OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset(); if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) { offsetTable.putAll(offsetSerializeWrapper.getOffsetTable()); for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) { AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq); log.info("load consumer's offset, {} {} {}", this.groupName, mq, offset.get()); } } } }
2.4 注册消费者,监听,消费者客户端启动
2.5 其他
在前面一些创建,服务启动,注册等动作完成后,接下来还有:
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
//.................省略
default:
break;
}
//14、更新路由信息
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
//15、如果是SQL过滤,检查broker是否支持SQL过滤,默认是支持TAG过滤的。服务端处理见ClientManageProcessor#checkClientConfig()
this.mQClientFactory.checkClientInBroker();
//发送心跳(30s),同步broker列表,同步consumer配置到broker,同步FilterClass到FilterServer(PushConsumer)
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
//16、rebalanceService执行一次
this.mQClientFactory.rebalanceImmediately();
}
其中,this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
,用于消费者或生产者向Broker
发送心跳,并且若Broker
有变化,会更新自己本地的缓存。
2.6 消息消费流程
2.6.1 获取Broker的消费位置-offset
在前面一些列动作,Consumer拿到了group、topic、通过rebalanceService又拿到了queue,准备消费了,还得要确定消费位置,也就是客户端向Broker发起查询offset请求,请求代号为QUERY_CONSUMER_OFFSET
。
调用的地方为:RemoteBrokerOffsetStore类
中fetchConsumeOffsetFromBroker()
:
private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
//查找一个broker Address不为空的Broker
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult) {
//没找到,则从NameServer拉取路由
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
//再次发起查找一个broker Address不为空的Broker请求
findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
}
if (findBrokerResult != null) {
QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
requestHeader.setTopic(mq.getTopic());
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());
//向broker发起查询offset请求
return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
} else {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
}
2.6.2 拉取消息
如2.2节所述,pullMessageService是一个单独的线程去拉取消息的。确定了消费者的group
、topic
、还有queue
和需要获取Queue的Offset
,就要正式开始拉取消息了。
送入的信息包括:topic
、queueid
、offset
,调用的方法为PullAPIWrapper.pullKernelImpl()
,最终交由MQClientAPIImpl.pullMessage()
执行,请求代号:PULL_MESSAGE
。
2.6.3 消费消息
拉到消息后,消费者就要进行消息的消费了。消费完了之后,要更新offset
,这个时候也要发起调用。
请求代号:UPDATE_CONSUMER_OFFSET
这个地方要注意有两种方式:
1、 定时,默认5s提交
对于集群消费模式,同步offset至Broker;对于广播消费模式,则是同步offset至本地。
2、 前面步骤的拉取消息时会带入参数:commitoffset
,这个时候也会更新。
RebalancePushImpl
中计算offset
的地方:
DefaultMQPushConsumerImpl.pullMessage()中的commitoffset
: