【RocketMQ学习】8.源码之Consumer


1 Consumer样例

这里以集群消费并发消费为例,采用一般的推模式进行消息消费。

public class BalanceConsumer {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者,指定组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("mark");
        // 指定Namesrv地址信息.
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅Topic
        consumer.setMaxReconsumeTimes(1);
        consumer.subscribe("TopicTest", "tagA|TagB|TagC");
        //负载均衡模式消费
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // 注册回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    for (MessageExt msg : msgs) {
                        String topic = msg.getTopic();
                        String msgBody = new String(msg.getBody(), "utf-8");
                        String tags = msg.getTags();
                        System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消息者
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

2 启动流程

启动消费者从consumer.start();开始。

//DefaultMQPushConsumer
@Override
public void start() throws MQClientException {
    setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
    this.defaultMQPushConsumerImpl.start();
    if (null != traceDispatcher) {
        try {
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}

其最终会调用MQClientInstance.start()方法。上一章讲过,MQClientInstance 封装了RocketMQ网络处理的API,是消息生产者与消费者同NameServer、Broker打交道的唯一途径

其实,Consumer也是一个NettyRemotingClient,通过netty与NameServer、Broker进行通讯。因而其他流程类似Producer,这里,我们对拉取、负载进行下代码跟踪。如下图所示:

Consumer的拉取与负载

这里顺便提一嘴MQClientInstance启动前的定时任务:

MQClientInstance的4个定时任务

2.1 拉取

这里简要说说消息的拉取过程。

pullMessageService其实是个线程,和下面的rebalanceService一样,都继承自ServiceThread

public abstract class ServiceThread implements Runnable{};

public class PullMessageService extends ServiceThread {};

在构造MQClientInstance时,就会创建pullMessageService实例。当执行pullMessageService.start()后,就是线程开始工作,拉取队列中的消息,但真正干活儿的还是Consumer

public class PullMessageService extends ServiceThread {
    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                PullRequest pullRequest = this.pullRequestQueue.take();
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }

    private void pullMessage(final PullRequest pullRequest) {
        //根据消费者组名,选择一个消费者
        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
        if (consumer != null) {
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            //执行拉取请求:立即执行、延迟执行
            impl.pullMessage(pullRequest);
        } else {
            log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
        }
    }
}

2.2 负载

pullMessageService,同样继承自ServiceThreadrebalanceService任务就是消费者的负载均衡,即给consumer重新调整和分配queue。当发生Broker掉线、消费者实例掉线、Topic扩容等各种突发情况时,消费者组中的消费者实例是怎么重平衡,以支持全部队列的正常消费。

触发形式:

  • 定时触发(20sec)
  • 接口触发:
    • 1)收到broker的consumer list发生变化通知后需要重新做负载均衡,比如同一个group中新加入了consumer或者有consumer下线;
    • 2)consumer启动的时候。

具体功能点如下:

2.2.1 定时触发负载均衡操作

public class RebalanceService extends ServiceThread {
    //任务间隔时长
    private static long waitInterval = Long.parseLong(System.getProperty("rocketmq.client.rebalance.waitInterval", "20000"));
    private final InternalLogger log = ClientLogger.getLog();
    private final MQClientInstance mqClientFactory;

    public RebalanceService(MQClientInstance mqClientFactory) {
        this.mqClientFactory = mqClientFactory;
    }

    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            //默认间隔20秒执行一次
            this.waitForRunning(waitInterval);
            //交由MQClientInstance执行负载均衡
            this.mqClientFactory.doRebalance();
        }

        log.info(this.getServiceName() + " service end");
    }

    @Override
    public String getServiceName() {
        return RebalanceService.class.getSimpleName();
    }
}

2.2.2 实现负载均衡算法

不管是DefaultMQPushConsumerImplrebalancePushImpl,还是DefaultMQPullConsumerImplrebalancePullImpl,调用都是父类RebalanceImpl的实现。

Push模式下的doRebalance()

分配策略:

Push模式下的分配策略

先排序的目的是:如果同一个分组的多个客户端,分布在不同的机器上(消费者的机器上),每台客户端都单独算,使得算出来的效果是一致的

总体消费就是让每一个 Consumer有同样的一个MessageQueue的视图,因为每个消费者的视图是一致的,那么在每个客户端算负载,算出来的结果当然就是一致的。

算法一致性

对于Consumer1Consumer2,经过统一的排序,在Consumer1客户端也好,还是Consumer2的客户端也好,算出来的结果是一致的。

对比Kafka,在消费的时候依赖Zookeeper,broker变动还要走选举,如果选不出或者比较卡,这个是否会导致负载不正常,负载不成功就不能正常的工作。而RocketMQ的这种方式简单,并且高可用。

这里提一嘴消息队列的分配策略AllocateMessageQueueStrategy,由于本章开头声明过,消费者模式是push,因而在构造消费者时,默认就已经确定分配策略是AllocateMessageQueueAveragely

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("mark");

public DefaultMQPushConsumer(final String consumerGroup) {
    this(null, consumerGroup, null, new AllocateMessageQueueAveragely());
}

举例:

AllocateMessageQueueAveragely,平均分配
8个队列:q1,q2,q3,q4,q5,q6,q7,q8,消费者3个:c1,c2,c3
分配如下:
c1:q1,q2,q3
c2:q4,q5,q6
c3:q7,q8

AllocateMessageQueueAveragelyByCircle,平均轮询分配
8个队列:q1,q2,q3,q4,q5,q6,q7,q8,消费者3个:c1,c2,c3
分配如下:
c1:q1,q4,q7
c2:q2,q5,q8
c3:q3,q6

其他分配策略还有:AllocateMessageQueueAveragelyByCircleAllocateMessageQueueByConfig等,这里不多列举了,之前在5.2 Consumer负载均衡一节中提到过。

2.2.3 更新消费者组分布信息

详见方法:rebalanceByTopic()

2.3 初始化offsetStore

由于本例是集群消费,所以初始化offsetStore走的是RemoteBrokerOffsetStore,对于广播消费,其offset是存储于消费端的。

集群模式下的offsetStore初始化

两种消费模式下的load()实现:

  • RemoteBrokerOffsetStore

    public class RemoteBrokerOffsetStore implements OffsetStore {
        @Override
        public void load() {
        }
    }
  • LocalFileOffsetStore

    public class LocalFileOffsetStore implements OffsetStore {
        @Override
        public void load() throws MQClientException {
            OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
            if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
                offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
    
                for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {
                    AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
                    log.info("load consumer's offset, {} {} {}",
                            this.groupName,
                            mq,
                            offset.get());
                }
            }
        }
    }

2.4 注册消费者,监听,消费者客户端启动

注册消费者、监听、消费者客户端启动

2.5 其他

在前面一些创建,服务启动,注册等动作完成后,接下来还有:

public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                //.................省略
            default:
                break;
        }
        //14、更新路由信息
        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
        //15、如果是SQL过滤,检查broker是否支持SQL过滤,默认是支持TAG过滤的。服务端处理见ClientManageProcessor#checkClientConfig()
        this.mQClientFactory.checkClientInBroker();
        //发送心跳(30s),同步broker列表,同步consumer配置到broker,同步FilterClass到FilterServer(PushConsumer)
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        //16、rebalanceService执行一次
        this.mQClientFactory.rebalanceImmediately();
}

其中,this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();,用于消费者或生产者向Broker发送心跳,并且若Broker有变化,会更新自己本地的缓存。

2.6 消息消费流程

2.6.1 获取Broker的消费位置-offset

在前面一些列动作,Consumer拿到了group、topic、通过rebalanceService又拿到了queue,准备消费了,还得要确定消费位置,也就是客户端向Broker发起查询offset请求,请求代号为QUERY_CONSUMER_OFFSET

调用的地方为:RemoteBrokerOffsetStore类fetchConsumeOffsetFromBroker()

private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
        InterruptedException, MQClientException {
        //查找一个broker Address不为空的Broker
        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
        if (null == findBrokerResult) {
            //没找到,则从NameServer拉取路由
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            //再次发起查找一个broker Address不为空的Broker请求
            findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
        }

        if (findBrokerResult != null) {
            QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setConsumerGroup(this.groupName);
            requestHeader.setQueueId(mq.getQueueId());
            //向broker发起查询offset请求
            return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
                findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
        } else {
            throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
        }
    }

2.6.2 拉取消息

如2.2节所述,pullMessageService是一个单独的线程去拉取消息的。确定了消费者的grouptopic、还有queue和需要获取Queue的Offset,就要正式开始拉取消息了。
送入的信息包括:topicqueueidoffset,调用的方法为PullAPIWrapper.pullKernelImpl(),最终交由MQClientAPIImpl.pullMessage()执行,请求代号:PULL_MESSAGE

请求拉取消息

2.6.3 消费消息

拉到消息后,消费者就要进行消息的消费了。消费完了之后,要更新offset,这个时候也要发起调用。
请求代号:UPDATE_CONSUMER_OFFSET
这个地方要注意有两种方式:
1、 定时,默认5s提交

定时持久化会同步offset至Broker

对于集群消费模式,同步offset至Broker;对于广播消费模式,则是同步offset至本地。

2、 前面步骤的拉取消息时会带入参数:commitoffset,这个时候也会更新。

PullAPIWrapper.pullKernelImpl()中的commitoffset

RebalancePushImpl中计算offset的地方:

RebalancePushImpl中的offset

DefaultMQPushConsumerImpl.pullMessage()中的commitoffset

DefaultMQPushConsumerImpl中的offset

3 整体流程

消费者流程图

文章作者: Kezade
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Kezade !
评论
  目录