【RocketMQ学习】5.源码之NameServer和Broker启动


1 准备

JDK:1.8.0_181
Maven:apache-maven-3.5.3
RocketMQ:4.8.0(官网目前最新版本是5.1.3),参见 快速指南 ,各版本源码及二进制执行文件下载参见 这里下载
环境变量:ROCKETMQ_HOME=/Users/zyxelva/Documents/rocketmq-all-4.8.0-bin-release

2 源码学习流程

2.1 RocketMQ源码包结构说明

  • broker:主要的业务逻辑,消息收发,主从同步,page cache
  • client:客户端接口,比如生产者和消费者
  • example:示例,比如生产者和消费者
  • common:公用数据结构等等
  • distribution:编译模块,编译输出等
  • fliter:进行Broker过滤的不感兴趣的消息传输,减小带宽压力
  • logappenderlogging:日志相关
  • namesrv:Namesrver服务,用于服务协调
  • openmessaging:对外提供服务
  • remoting:远程调用接口,封装Netty底层通信
  • srvutil:提供-些公用的工具方法,比如解析命令行参数
  • store:消息存储
  • tools:管理工具,比如有名的mqadmin工具

2.2 流程

3 打包与部署

使用如下命令对源码进行编译:

mvn -Prelease-all -DskipTests clean install -U -e -Dcheckstyle.skip=true

编译成功最后会输出如下内容,所有模块的编译结果都是SUCCESS状态:

[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO] 
[INFO] Apache RocketMQ 4.8.0 4.8.0 ........................ SUCCESS [  3.571 s]
[INFO] rocketmq-logging 4.8.0 ............................. SUCCESS [  3.588 s]
[INFO] rocketmq-remoting 4.8.0 ............................ SUCCESS [  2.614 s]
[INFO] rocketmq-common 4.8.0 .............................. SUCCESS [  5.597 s]
[INFO] rocketmq-client 4.8.0 .............................. SUCCESS [  4.363 s]
[INFO] rocketmq-store 4.8.0 ............................... SUCCESS [  2.325 s]
[INFO] rocketmq-srvutil 4.8.0 ............................. SUCCESS [  0.487 s]
[INFO] rocketmq-filter 4.8.0 .............................. SUCCESS [  1.247 s]
[INFO] rocketmq-acl 4.8.0 ................................. SUCCESS [  1.070 s]
[INFO] rocketmq-broker 4.8.0 .............................. SUCCESS [  2.547 s]
[INFO] rocketmq-tools 4.8.0 ............................... SUCCESS [  1.913 s]
[INFO] rocketmq-namesrv 4.8.0 ............................. SUCCESS [  1.072 s]
[INFO] rocketmq-logappender 4.8.0 ......................... SUCCESS [  1.358 s]
[INFO] rocketmq-test 4.8.0 ................................ SUCCESS [  1.649 s]
[INFO] rocketmq-openmessaging 4.8.0 ....................... SUCCESS [  0.831 s]
[INFO] rocketmq-example 4.8.0 ............................. SUCCESS [  1.081 s]
[INFO] rocketmq-distribution 4.8.0 4.8.0 .................. SUCCESS [01:19 min]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 01:55 min
[INFO] Finished at: 2023-07-21T11:12:38+08:00
[INFO] ------------------------------------------------------------------------

然后在distribution/target/目录下,可以看到编译打包好的可分发包:

[zyxelva@Taeyeons-MacBook-Pro 11:33:56 ~/Downloads/rocketmq-all-4.8.0-source-release]$ ls distribution/target/
archive-tmp                     rocketmq-4.8.0                  rocketmq-4.8.0.zip
maven-shared-archive-resources  rocketmq-4.8.0.tar.gz

distribution/target/rocketmq-4.8.0/rocketmq-4.8.0/即为可以用以启动RocketMQ的文件目录:

[zyxelva@Taeyeons-MacBook-Pro 12:01:47 ~/Downloads/rocketmq-all-4.8.0-source-release]$ ll distribution/target/rocketmq-4.8.0/rocketmq-4.8.0/
total 64
drwxr-xr-x   9 zyxelva  staff   288B Jul 21 11:12 .
drwxr-xr-x   3 zyxelva  staff    96B Jul 21 11:12 ..
-rw-rw-r--   1 zyxelva  staff    17K Dec  8  2020 LICENSE
-rw-rw-r--   1 zyxelva  staff   1.3K Dec  8  2020 NOTICE
-rw-rw-r--   1 zyxelva  staff   5.0K Dec  8  2020 README.md
drwxr-xr-x   7 zyxelva  staff   224B Jul 21 11:12 benchmark
drwxrwxr-x  30 zyxelva  staff   960B Dec  8  2020 bin
drwxrwxr-x  12 zyxelva  staff   384B Dec  8  2020 conf
drwxr-xr-x  36 zyxelva  staff   1.1K Jul 21 11:12 lib

当然如果要上传到服务器,可以采用已经打包好的文件:rocketmq-4.8.0.zip.

之所以要源码打包部署,是为了后面源码阅读,通过打印些必要的日志,完成源码的追踪学习。具体的部署流程可以参考官网

4 NameServer启动流程

4.1 概览

NameServer启动流程

核心方法:NamesrvStartup 类中main()->main0-> createNamesrvController->start() -> initialize()

4.2 步骤

4.2.1 解析配置文件,填充NameServerConfigNettyServerConfig属性值,并创建NamesrvController

NamesrvController类中createNamesrvController方法如下:

public class NamesrvStartup {

    private static InternalLogger log;
    private static Properties properties = null;
    private static CommandLine commandLine = null;

    public static void main(String[] args) {
        main0(args);
    }

    public static NamesrvController main0(String[] args) {

        try {
            //一、创建 NamesrvController
            NamesrvController controller = createNamesrvController(args);
            //二、初始化NamesrvController,再启动
            start(controller);
            String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }
        return null;
    }

    public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
        //。。。。。。省略代码
        //创建 NamesrvConfig
        final NamesrvConfig namesrvConfig = new NamesrvConfig();
        //创建 NettyServerConfig
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        // 设置 NameServer 监听端口
        nettyServerConfig.setListenPort(9876);
        // 解析启动参数 -c 后面的东东,比如生产上一般要指定broker.conf的配置路径,还有就是本地调试Broker的时候需要-c 指定配置的路径
        if (commandLine.hasOption('c')) {
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                properties.load(in);
                MixAll.properties2Object(properties, namesrvConfig);
                MixAll.properties2Object(properties, nettyServerConfig);

                namesrvConfig.setConfigStorePath(file);

                System.out.printf("load config properties file OK, %s%n", file);
                in.close();
            }
        }
        //解析启动参数 -p后面的东东
        if (commandLine.hasOption('p')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
            MixAll.printObjectProperties(console, namesrvConfig);
            MixAll.printObjectProperties(console, nettyServerConfig);
            //如果开启-p,则RocketMQ真的只会打印参数列表,不会启动nameserver
            System.exit(0);
        }

        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
        //需要启动前配置好 RocketMQ 的安装目录 ROCKETMQ_HOME
        if (null == namesrvConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }

        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
        //logback日志文件
        configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

        log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

        MixAll.printObjectProperties(log, namesrvConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);
        //创建 NamesrvController
        final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

        // remember all configs to prevent discard
        controller.getConfiguration().registerConfig(properties);

        return controller;
    }
    //。。。。。。省略代码
}

如果你要查看NameServer的各种参数,直接在启动个参数中送入-p就可以打印这个NameServer的所有的参数信息。(以IDEA为例)

打印NameServer参数配置

打印结果:

13:58:27.674 [main] INFO  RocketmqNamesrvConsole - rocketmqHome=/Users/zyxelva/Documents/rocketmq-all-4.8.0-bin-release
13:58:27.732 [main] INFO  RocketmqNamesrvConsole - kvConfigPath=/Users/zyxelva/namesrv/kvConfig.json
13:58:27.734 [main] INFO  RocketmqNamesrvConsole - configStorePath=/Users/zyxelva/namesrv/namesrv.properties
13:58:27.735 [main] INFO  RocketmqNamesrvConsole - productEnvName=center
13:58:27.737 [main] INFO  RocketmqNamesrvConsole - clusterTest=false
13:58:27.737 [main] INFO  RocketmqNamesrvConsole - orderMessageEnable=false
13:58:27.739 [main] INFO  RocketmqNamesrvConsole - listenPort=9876
13:58:27.740 [main] INFO  RocketmqNamesrvConsole - serverWorkerThreads=8
13:58:27.740 [main] INFO  RocketmqNamesrvConsole - serverCallbackExecutorThreads=0
13:58:27.740 [main] INFO  RocketmqNamesrvConsole - serverSelectorThreads=3
13:58:27.740 [main] INFO  RocketmqNamesrvConsole - serverOnewaySemaphoreValue=256
13:58:27.740 [main] INFO  RocketmqNamesrvConsole - serverAsyncSemaphoreValue=64
13:58:27.740 [main] INFO  RocketmqNamesrvConsole - serverChannelMaxIdleTimeSeconds=120
13:58:27.740 [main] INFO  RocketmqNamesrvConsole - serverSocketSndBufSize=65535
13:58:27.740 [main] INFO  RocketmqNamesrvConsole - serverSocketRcvBufSize=65535
13:58:27.740 [main] INFO  RocketmqNamesrvConsole - serverPooledByteBufAllocatorEnable=true
13:58:27.740 [main] INFO  RocketmqNamesrvConsole - useEpollNativeSelector=false

4.2.2 初始化

即在NamesrvController实例创建后,初始化相关信息,包括配置文件保存路径加载、NettyRemotingServer创建、线程池创建,最重要是两个定时任务创建和启动:

  • 每隔10s扫描一次Broker,移除不活跃的Broker
  • 每隔10min打印一次KV配置
public class NamesrvStartup {
    public static NamesrvController start(final NamesrvController controller) throws Exception {
        if (null == controller) {
            throw new IllegalArgumentException("NamesrvController is null");
        }
        //三、初始化 NamesrvController
        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }
        //四、先在JVM关闭前注册钩子,关闭NettyServer、线程池、定时任务
        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                controller.shutdown();
                return null;
            }
        }));
        //五、启动
        controller.start();
        return controller;
    }
}

public class NamesrvController {
    public boolean initialize() {
        //加载KV配置
        this.kvConfigManager.load();
        //创建NettyRemotingServer实例
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
        //线程池创建
        this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
        //向NettyRemotingServer注册NamesrvController和线程池
        this.registerProcessor();
        //每隔10s扫描一次为活跃Broker
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);
        //每隔10min打印一次KV配置
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);

       //。。。。。。省略代码

        return true;
    }
}

4.2.3 在JVM进程关闭之前,先将线程池关闭,及时释放资源。再启动。

public class NamesrvStartup {
    public static NamesrvController start(final NamesrvController controller) throws Exception {
        //此处省略一万字
        //四、先在JVM关闭前注册钩子,关闭NettyServer、线程池、定时任务
        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                controller.shutdown();
                return null;
            }
        }));
        //五、启动
        controller.start();
        return controller;
    }
}

4.3 时序图

NameServer启动流程时序图

5 Broker启动流程

5.1 概览

Broker启动流程

5.2 时序图

Broker启动时序图

大致流程与NameServer相差无几,这里主要讲讲Broker注册Broker删除路由发现

5.3 Broker注册(路由注册)

RocketMQ路由注册是通过Broker与NameServer的心跳功能实现的。

Broker启动时向集群中所有的NameServer发送心跳信息,每隔30s向集群中所有NameServer发送心跳包,NameServer收到心跳包时会更新brokerLiveTable缓存中BrokerLiveInfo的lastUpdateTimeStamp信息,然后NameServer每隔10s扫描brokerLiveTable,如果连续120S没有收到心跳包,NameServer将移除Broker的路由信息同时关闭Socket连接。

5.3.1 BrokerController#start()

定位到BrokerController#start()方法。

public class BrokerController {
    public void start() throws Exception {
        //......省略各种start()
        
        //开启DLedger多副本主从切换
        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            //开启事务状态回查处理器
            startProcessorByHa(messageStoreConfig.getBrokerRole());
            //处理从节点的元数据同步
            handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
            this.registerBrokerAll(true, false, true);
        }
        //broker每隔30s向NameServer发送心跳包
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    //注意这里第二个入参为false,即不是oneway方式发送
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

        //.......
    }
}

主要跟踪registerBrokerAll()。依次经历BrokerController#registerBrokerAll->BrokerController#doRegisterBrokerAll->BrokerOuterAPI#registerBrokerAll

5.3.2 BrokerOuterAPI#registerBrokerAll()

public class BrokerOuterAPI {
    public List<RegisterBrokerResult> registerBrokerAll(final String clusterName,
                                                        final String brokerAddr,
                                                        final String brokerName,
                                                        final long brokerId,
                                                        final String haServerAddr,
                                                        final TopicConfigSerializeWrapper topicConfigWrapper,
                                                        final List<String> filterServerList,
                                                        final boolean oneway,
                                                        final int timeoutMills,
                                                        final boolean compressed) {
        final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
        if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
            //心跳包格式:
            //1.头部:brokerAddr、brokerId、brokerName、clusterName、haServerAddr、compressed
            final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
            requestHeader.setBrokerAddr(brokerAddr);
            requestHeader.setBrokerId(brokerId);
            requestHeader.setBrokerName(brokerName);
            requestHeader.setClusterName(clusterName);
            requestHeader.setHaServerAddr(haServerAddr);
            requestHeader.setCompressed(compressed);
            //2.请求体:主题配置、过滤服务器配置、CRC32校验和
            RegisterBrokerBody requestBody = new RegisterBrokerBody();
            requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
            requestBody.setFilterServerList(filterServerList);
            final byte[] body = requestBody.encode(compressed);
            final int bodyCrc32 = UtilAll.crc32(body);
            requestHeader.setBodyCrc32(bodyCrc32);
            
            //使用CountDownLatch向多个NameServer发起注册
            final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
            for (final String namesrvAddr : nameServerAddressList) {
                brokerOuterExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            //多线程注册
                            RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                            if (result != null) {
                                registerBrokerResultList.add(result);
                            }
                            log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                        } catch (Exception e) {
                            log.warn("registerBroker Exception, {}", namesrvAddr, e);
                        } finally {
                            countDownLatch.countDown();
                        }
                    }
                });
            }
            try {
                countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
            }
        }
        return registerBrokerResultList;
    }

    private RegisterBrokerResult registerBroker(
            final String namesrvAddr,
            final boolean oneway,
            final int timeoutMills,
            final RegisterBrokerRequestHeader requestHeader,
            final byte[] body
    ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
            InterruptedException {
        //构建注册实体,注册代号:RequestCode.REGISTER_BROKER
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
        request.setBody(body);
        //不会走这里,因为oneway为false
        if (oneway) {
            try {
                //发起注册
                this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
            } catch (RemotingTooMuchRequestException e) {
                // Ignore
            }
            return null;
        }
        //走这里
        RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);

        //...............
    }
}

Broker注册至NameServer,是一个客户端请求操作,会走NettyRemotingClient#invokeSync(),实际上的处理类是NettyRemotingAbstract负责公共的发送动作(也包括接收动作)。

5.3.3 NettyServerHandler#channelRead0()

由于是Netty负责网络这块,因而在服务端,通过NettyRemotingServer进行接收,Handler为其内置类NettyServerHandler,如同上述所说,实际为NettyRemotingAbstract去处理。

@ChannelHandler.Sharable
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        //NettyRemotingAbstract#processMessageReceived
        processMessageReceived(ctx, msg);
    }
}

5.3.4 NettyRemotingAbstract#processRequestCommand()

定位到NettyRemotingAbstractprocessRequestCommand()

public abstract class NettyRemotingAbstract {
    public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
        //.............
    }
}

还记得NameSrvController注册的处理器吗?

public class NamesrvController {
    public boolean initialize() {
        //加载KV配置
        this.kvConfigManager.load();
        //创建NettyRemotingServer实例
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
        //线程池创建
        this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
        //就是这里。。。。。向NettyRemotingServer注册NamesrvController和线程池
        this.registerProcessor();
        //..............
    }
}

其最终注册的就是DefaultRequestProcessor类型。因而NameServer处理Broker注册的任务就归它管了。

5.3.5 DefaultRequestProcessor#processRequest()

然后定位到DefaultRequestProcessor#processRequest(),根据注册代号即可找到NameServer处理Broker注册请求:

public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
                                          RemotingCommand request) throws RemotingCommandException {
        //............
        switch (request.getCode()) {
            case RequestCode.PUT_KV_CONFIG:
                return this.putKVConfig(ctx, request);
            case RequestCode.GET_KV_CONFIG:
                return this.getKVConfig(ctx, request);
            case RequestCode.DELETE_KV_CONFIG:
                return this.deleteKVConfig(ctx, request);
            case RequestCode.QUERY_DATA_VERSION:
                return queryBrokerTopicConfig(ctx, request);
            //判断是注册Broker信息
            case RequestCode.REGISTER_BROKER:
                Version brokerVersion = MQVersion.value2Version(request.getVersion());
                //这里用于组装注册请求实体
                //版本高于3.0.11,由此注册broker,我的版本4.8.0,因此从此进
                if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                    return this.registerBrokerWithFilterServer(ctx, request);
                } else {
                    //版本低于3.0.11,注册Broker信息
                    return this.registerBroker(ctx, request);
                };
                //....................
            default:
                break;
        }
        return null;
    }
}

5.3.6 NameServer.RouteInfoManager#registerBroker()

虽然有版本区分,但最终走的都是NameServer.RouteInfoManager#registerBroker()

public class RouteInfoManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    //读写锁
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

    //注册Broker核心方法
    public RegisterBrokerResult registerBroker(
            final String clusterName,
            final String brokerAddr,
            final String brokerName,
            final long brokerId,
            final String haServerAddr,
            final TopicConfigSerializeWrapper topicConfigWrapper,
            final List<String> filterServerList,
            final Channel channel) {
        RegisterBrokerResult result = new RegisterBrokerResult();
        try {
            try {
                //注册路由信息使用写锁
                this.lock.writeLock().lockInterruptibly();
                //维护clusterAddrTable
                Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
                if (null == brokerNames) {
                    brokerNames = new HashSet<String>();
                    this.clusterAddrTable.put(clusterName, brokerNames);
                }
                brokerNames.add(brokerName);

                boolean registerFirst = false;
                //维护brokerAddrTable
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                //第一次注册,则创建brokerData
                if (null == brokerData) {
                    registerFirst = true;
                    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                    this.brokerAddrTable.put(brokerName, brokerData);
                }
                //非第一次注册,更新Broker
                Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
                //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
                //The same IP:PORT must only have one record in brokerAddrTable
                Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
                while (it.hasNext()) {
                    Entry<Long, String> item = it.next();
                    if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                        it.remove();
                    }
                }

                String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
                registerFirst = registerFirst || (null == oldAddr);
                //维护topicQueueTable
                if (null != topicConfigWrapper
                        && MixAll.MASTER_ID == brokerId) {
                    if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                            || registerFirst) {
                        ConcurrentMap<String, TopicConfig> tcTable =
                                topicConfigWrapper.getTopicConfigTable();
                        if (tcTable != null) {
                            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                                this.createAndUpdateQueueData(brokerName, entry.getValue());
                            }
                        }
                    }
                }

                BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                        new BrokerLiveInfo(
                                System.currentTimeMillis(),
                                topicConfigWrapper.getDataVersion(),
                                channel,
                                haServerAddr));
                if (null == prevBrokerLiveInfo) {
                    log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
                }

                if (filterServerList != null) {
                    if (filterServerList.isEmpty()) {
                        this.filterServerTable.remove(brokerAddr);
                    } else {
                        this.filterServerTable.put(brokerAddr, filterServerList);
                    }
                }

                if (MixAll.MASTER_ID != brokerId) {
                    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                    if (masterAddr != null) {
                        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                        if (brokerLiveInfo != null) {
                            result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                            result.setMasterAddr(masterAddr);
                        }
                    }
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("registerBroker Exception", e);
        }
        return result;
    }
}

5.4 Broker删除(路由删除)

Broker每隔30s向NameServer发送一个心跳包,心跳包包含:

  • BrokerId
  • Broker地址
  • Broker名称
  • Broker所属集群名称
  • Broker关联的FilterServer列表

但是如果Broker宕机,NameServer无法收到心跳包,此时NameServer如何来剔除这些失效的Broker呢?

NameServer会每隔10s扫描brokerLiveTable状态表(见4.2.2 初始化),如果BrokerLiveInfo的lastUpdateTimestamp的时间戳距当前时间超过120s,则认为Broker失效,移除该Broker,关闭与Broker连接,同时更新topicQueueTablebrokerAddrTablebrokerLiveTablefilterServerTable

RocketMQ有两个触发点来删除路由信息:

  • NameServer定期扫描brokerLiveTable检测上次心跳包与当前系统的时间差,如果时间超过120s,则需要移除broker。
    //NamesrvController#initialize()
        //每隔10s扫描一次为活跃Broker
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                    NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);
    
    //RouteInfoManager#scanNotActiveBroker()
    public class RouteInfoManager {
        //超过120s,则认为Broker失效
        public void scanNotActiveBroker() {
            //获得brokerLiveTable
            Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
            //遍历brokerLiveTable
            while (it.hasNext()) {
                Entry<String, BrokerLiveInfo> next = it.next();
                long last = next.getValue().getLastUpdateTimestamp();
                //如果收到心跳包的时间距当时时间是否超过120s
                if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
                    //关闭连接
                    RemotingUtil.closeChannel(next.getValue().getChannel());
                    //移除broker
                    it.remove();
                    log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
                    //维护路由表(更新路由表)
                    this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
                }
            }
        }
    }
  • Broker在正常关闭的情况下,会执行unregisterBrokerAll指令。请求代号:UNREGISTER_BROKER
    //BrokerController#shutdown()
    public void shutdown() {
            this.unregisterBrokerAll();
    }

这两种方式路由删除的方法都是一样的,就是从相关路由表中删除与该broker相关的信息。

5.5 路由发现

RocketMQ路由发现是非实时的,当Topic路由出现变化后,NameServer不会主动推送给客户端,而是由客户端定时拉取主题最新的路由。

这里,客户端是指生产者消费者,他两都持有MQClientInstance实例(单例),客户端启动时,会开启定时任务,如下图定时任务一

MQClientInstance的4个定时任务

其会向NameServer发起代号为GET_ROUTEINFO_BY_TOPIC的请求,获取最新的路由信息。

而在NameServer端,通过其启动时注册的处理器DefaultRequestProcessor,处理客户端发来的请求:

//DefaultRequestProcessor
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final GetRouteInfoRequestHeader requestHeader = (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
        //调用RouteInfoManager的方法,从路由表topicQueueTable、brokerAddrTable、filterServerTable中分别填充TopicRouteData的List<QueueData>、List<BrokerData>、filterServer
        TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
        //如果找到主题对应你的路由信息并且该主题为顺序消息,则从NameServer KVConfig中获取关于顺序消息相关的配置填充路由信息
        if (topicRouteData != null) {
            if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
                String orderTopicConf =
                    this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                        requestHeader.getTopic());
                topicRouteData.setOrderTopicConf(orderTopicConf);
            }

            byte[] content = topicRouteData.encode();
            response.setBody(content);
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }

        response.setCode(ResponseCode.TOPIC_NOT_EXIST);
        response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
            + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
        return response;
    }

6 总结

broker检测机制流程

NameServer存储信息:

  • topicQueueTable:Topic消息队列路由信息,消息发送时根据路由表进行负载均衡;
  • brokerAddrTable:Broker基础信息,包括brokerName、所属集群名称、主备Broker地址;
  • clusterAddrTable:Broker集群信息,存储集群中所有Broker名称;
  • brokerLiveTable:Broker状态信息,NameServer每次收到心跳包是会替换该信息;
  • filterServerTable:Broker上的FilterServer列表,用于类模式消息过滤;

NameServer的存储实现基于内存(HashMap),NameServer并不会持久化路由信息,持久化的重任是交给Broker来完成。


文章作者: Kezade
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Kezade !
评论
  目录