1 准备
JDK:1.8.0_181
Maven:apache-maven-3.5.3
RocketMQ:4.8.0(官网目前最新版本是5.1.3),参见 快速指南 ,各版本源码及二进制执行文件下载参见 这里下载
环境变量:ROCKETMQ_HOME=/Users/zyxelva/Documents/rocketmq-all-4.8.0-bin-release
2 源码学习流程
2.1 RocketMQ源码包结构说明
broker
:主要的业务逻辑,消息收发,主从同步,page cacheclient
:客户端接口,比如生产者和消费者example
:示例,比如生产者和消费者common
:公用数据结构等等distribution
:编译模块,编译输出等fliter
:进行Broker过滤的不感兴趣的消息传输,减小带宽压力logappender
、logging
:日志相关namesrv
:Namesrver服务,用于服务协调openmessaging
:对外提供服务remoting
:远程调用接口,封装Netty底层通信srvutil
:提供-些公用的工具方法,比如解析命令行参数store
:消息存储tools
:管理工具,比如有名的mqadmin
工具
2.2 流程
- 【RocketMQ学习】5.源码之NameServer和Broker启动
- 【RocketMQ学习】6.源码之Remoting
- 【RocketMQ学习】7-源码之Producer
- 【RocketMQ学习】8.源码之Consumer
- 【RocketMQ学习】9.源码之Store
- 【RocketMQ学习】10.源码之堆外内存
- 【RocketMQ学习】11.源码之事务消息
- 【RocketMQ学习】12.源码之数据同步及异步机制
- 【RocketMQ学习】13.源码之消息过滤与重试
3 打包与部署
使用如下命令对源码进行编译:
mvn -Prelease-all -DskipTests clean install -U -e -Dcheckstyle.skip=true
编译成功最后会输出如下内容,所有模块的编译结果都是SUCCESS状态:
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Apache RocketMQ 4.8.0 4.8.0 ........................ SUCCESS [ 3.571 s]
[INFO] rocketmq-logging 4.8.0 ............................. SUCCESS [ 3.588 s]
[INFO] rocketmq-remoting 4.8.0 ............................ SUCCESS [ 2.614 s]
[INFO] rocketmq-common 4.8.0 .............................. SUCCESS [ 5.597 s]
[INFO] rocketmq-client 4.8.0 .............................. SUCCESS [ 4.363 s]
[INFO] rocketmq-store 4.8.0 ............................... SUCCESS [ 2.325 s]
[INFO] rocketmq-srvutil 4.8.0 ............................. SUCCESS [ 0.487 s]
[INFO] rocketmq-filter 4.8.0 .............................. SUCCESS [ 1.247 s]
[INFO] rocketmq-acl 4.8.0 ................................. SUCCESS [ 1.070 s]
[INFO] rocketmq-broker 4.8.0 .............................. SUCCESS [ 2.547 s]
[INFO] rocketmq-tools 4.8.0 ............................... SUCCESS [ 1.913 s]
[INFO] rocketmq-namesrv 4.8.0 ............................. SUCCESS [ 1.072 s]
[INFO] rocketmq-logappender 4.8.0 ......................... SUCCESS [ 1.358 s]
[INFO] rocketmq-test 4.8.0 ................................ SUCCESS [ 1.649 s]
[INFO] rocketmq-openmessaging 4.8.0 ....................... SUCCESS [ 0.831 s]
[INFO] rocketmq-example 4.8.0 ............................. SUCCESS [ 1.081 s]
[INFO] rocketmq-distribution 4.8.0 4.8.0 .................. SUCCESS [01:19 min]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 01:55 min
[INFO] Finished at: 2023-07-21T11:12:38+08:00
[INFO] ------------------------------------------------------------------------
然后在distribution/target/
目录下,可以看到编译打包好的可分发包:
[zyxelva@Taeyeons-MacBook-Pro 11:33:56 ~/Downloads/rocketmq-all-4.8.0-source-release]$ ls distribution/target/
archive-tmp rocketmq-4.8.0 rocketmq-4.8.0.zip
maven-shared-archive-resources rocketmq-4.8.0.tar.gz
而distribution/target/rocketmq-4.8.0/rocketmq-4.8.0/
即为可以用以启动RocketMQ的文件目录:
[zyxelva@Taeyeons-MacBook-Pro 12:01:47 ~/Downloads/rocketmq-all-4.8.0-source-release]$ ll distribution/target/rocketmq-4.8.0/rocketmq-4.8.0/
total 64
drwxr-xr-x 9 zyxelva staff 288B Jul 21 11:12 .
drwxr-xr-x 3 zyxelva staff 96B Jul 21 11:12 ..
-rw-rw-r-- 1 zyxelva staff 17K Dec 8 2020 LICENSE
-rw-rw-r-- 1 zyxelva staff 1.3K Dec 8 2020 NOTICE
-rw-rw-r-- 1 zyxelva staff 5.0K Dec 8 2020 README.md
drwxr-xr-x 7 zyxelva staff 224B Jul 21 11:12 benchmark
drwxrwxr-x 30 zyxelva staff 960B Dec 8 2020 bin
drwxrwxr-x 12 zyxelva staff 384B Dec 8 2020 conf
drwxr-xr-x 36 zyxelva staff 1.1K Jul 21 11:12 lib
当然如果要上传到服务器,可以采用已经打包好的文件:rocketmq-4.8.0.zip
.
之所以要源码打包部署,是为了后面源码阅读,通过打印些必要的日志,完成源码的追踪学习。具体的部署流程可以参考官网 。
4 NameServer启动流程
4.1 概览
核心方法:NamesrvStartup 类中main()
->main0
-> createNamesrvController
->start()
-> initialize()
4.2 步骤
4.2.1 解析配置文件,填充NameServerConfig
、NettyServerConfig
属性值,并创建NamesrvController
NamesrvController
类中createNamesrvController
方法如下:
public class NamesrvStartup {
private static InternalLogger log;
private static Properties properties = null;
private static CommandLine commandLine = null;
public static void main(String[] args) {
main0(args);
}
public static NamesrvController main0(String[] args) {
try {
//一、创建 NamesrvController
NamesrvController controller = createNamesrvController(args);
//二、初始化NamesrvController,再启动
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
//。。。。。。省略代码
//创建 NamesrvConfig
final NamesrvConfig namesrvConfig = new NamesrvConfig();
//创建 NettyServerConfig
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
// 设置 NameServer 监听端口
nettyServerConfig.setListenPort(9876);
// 解析启动参数 -c 后面的东东,比如生产上一般要指定broker.conf的配置路径,还有就是本地调试Broker的时候需要-c 指定配置的路径
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
//解析启动参数 -p后面的东东
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
//如果开启-p,则RocketMQ真的只会打印参数列表,不会启动nameserver
System.exit(0);
}
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
//需要启动前配置好 RocketMQ 的安装目录 ROCKETMQ_HOME
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
//logback日志文件
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
//创建 NamesrvController
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
return controller;
}
//。。。。。。省略代码
}
如果你要查看NameServer
的各种参数,直接在启动个参数中送入-p
就可以打印这个NameServer
的所有的参数信息。(以IDEA为例)
打印结果:
13:58:27.674 [main] INFO RocketmqNamesrvConsole - rocketmqHome=/Users/zyxelva/Documents/rocketmq-all-4.8.0-bin-release
13:58:27.732 [main] INFO RocketmqNamesrvConsole - kvConfigPath=/Users/zyxelva/namesrv/kvConfig.json
13:58:27.734 [main] INFO RocketmqNamesrvConsole - configStorePath=/Users/zyxelva/namesrv/namesrv.properties
13:58:27.735 [main] INFO RocketmqNamesrvConsole - productEnvName=center
13:58:27.737 [main] INFO RocketmqNamesrvConsole - clusterTest=false
13:58:27.737 [main] INFO RocketmqNamesrvConsole - orderMessageEnable=false
13:58:27.739 [main] INFO RocketmqNamesrvConsole - listenPort=9876
13:58:27.740 [main] INFO RocketmqNamesrvConsole - serverWorkerThreads=8
13:58:27.740 [main] INFO RocketmqNamesrvConsole - serverCallbackExecutorThreads=0
13:58:27.740 [main] INFO RocketmqNamesrvConsole - serverSelectorThreads=3
13:58:27.740 [main] INFO RocketmqNamesrvConsole - serverOnewaySemaphoreValue=256
13:58:27.740 [main] INFO RocketmqNamesrvConsole - serverAsyncSemaphoreValue=64
13:58:27.740 [main] INFO RocketmqNamesrvConsole - serverChannelMaxIdleTimeSeconds=120
13:58:27.740 [main] INFO RocketmqNamesrvConsole - serverSocketSndBufSize=65535
13:58:27.740 [main] INFO RocketmqNamesrvConsole - serverSocketRcvBufSize=65535
13:58:27.740 [main] INFO RocketmqNamesrvConsole - serverPooledByteBufAllocatorEnable=true
13:58:27.740 [main] INFO RocketmqNamesrvConsole - useEpollNativeSelector=false
4.2.2 初始化
即在NamesrvController
实例创建后,初始化相关信息,包括配置文件保存路径加载、NettyRemotingServer创建、线程池创建,最重要是两个定时任务创建和启动:
- 每隔
10s
扫描一次Broker,移除不活跃的Broker - 每隔
10min
打印一次KV配置
public class NamesrvStartup {
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
//三、初始化 NamesrvController
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
//四、先在JVM关闭前注册钩子,关闭NettyServer、线程池、定时任务
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
//五、启动
controller.start();
return controller;
}
}
public class NamesrvController {
public boolean initialize() {
//加载KV配置
this.kvConfigManager.load();
//创建NettyRemotingServer实例
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
//线程池创建
this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
//向NettyRemotingServer注册NamesrvController和线程池
this.registerProcessor();
//每隔10s扫描一次为活跃Broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
//每隔10min打印一次KV配置
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
//。。。。。。省略代码
return true;
}
}
4.2.3 在JVM进程关闭之前,先将线程池关闭,及时释放资源。再启动。
public class NamesrvStartup {
public static NamesrvController start(final NamesrvController controller) throws Exception {
//此处省略一万字
//四、先在JVM关闭前注册钩子,关闭NettyServer、线程池、定时任务
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
//五、启动
controller.start();
return controller;
}
}
4.3 时序图
5 Broker启动流程
5.1 概览
5.2 时序图
大致流程与NameServer相差无几,这里主要讲讲Broker注册
、Broker删除
、路由发现
。
5.3 Broker注册(路由注册)
RocketMQ路由注册是通过Broker与NameServer的心跳功能
实现的。
Broker启动时向集群中所有的NameServer发送心跳信息,每隔30s
向集群中所有NameServer发送心跳包
,NameServer收到心跳包时会更新brokerLiveTable
缓存中BrokerLiveInfo的lastUpdateTimeStamp
信息,然后NameServer每隔10s
扫描brokerLiveTable
,如果连续120S
没有收到心跳包,NameServer将移除Broker的路由信息同时关闭Socket连接。
5.3.1 BrokerController#start()
定位到BrokerController#start()
方法。
public class BrokerController {
public void start() throws Exception {
//......省略各种start()
//开启DLedger多副本主从切换
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
//开启事务状态回查处理器
startProcessorByHa(messageStoreConfig.getBrokerRole());
//处理从节点的元数据同步
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
this.registerBrokerAll(true, false, true);
}
//broker每隔30s向NameServer发送心跳包
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
//注意这里第二个入参为false,即不是oneway方式发送
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
//.......
}
}
主要跟踪registerBrokerAll()
。依次经历BrokerController#registerBrokerAll
->BrokerController#doRegisterBrokerAll
->BrokerOuterAPI#registerBrokerAll
5.3.2 BrokerOuterAPI#registerBrokerAll()
public class BrokerOuterAPI {
public List<RegisterBrokerResult> registerBrokerAll(final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final boolean oneway,
final int timeoutMills,
final boolean compressed) {
final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
//心跳包格式:
//1.头部:brokerAddr、brokerId、brokerName、clusterName、haServerAddr、compressed
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);
//2.请求体:主题配置、过滤服务器配置、CRC32校验和
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
requestHeader.setBodyCrc32(bodyCrc32);
//使用CountDownLatch向多个NameServer发起注册
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
//多线程注册
RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
if (result != null) {
registerBrokerResultList.add(result);
}
log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
}
});
}
try {
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
}
return registerBrokerResultList;
}
private RegisterBrokerResult registerBroker(
final String namesrvAddr,
final boolean oneway,
final int timeoutMills,
final RegisterBrokerRequestHeader requestHeader,
final byte[] body
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
InterruptedException {
//构建注册实体,注册代号:RequestCode.REGISTER_BROKER
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
request.setBody(body);
//不会走这里,因为oneway为false
if (oneway) {
try {
//发起注册
this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
} catch (RemotingTooMuchRequestException e) {
// Ignore
}
return null;
}
//走这里
RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
//...............
}
}
Broker注册至NameServer,是一个客户端请求操作,会走NettyRemotingClient#invokeSync()
,实际上的处理类是NettyRemotingAbstract
负责公共的发送动作(也包括接收动作)。
5.3.3 NettyServerHandler#channelRead0()
由于是Netty负责网络这块,因而在服务端,通过NettyRemotingServer
进行接收,Handler为其内置类NettyServerHandler
,如同上述所说,实际为NettyRemotingAbstract
去处理。
@ChannelHandler.Sharable
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
//NettyRemotingAbstract#processMessageReceived
processMessageReceived(ctx, msg);
}
}
5.3.4 NettyRemotingAbstract#processRequestCommand()
定位到NettyRemotingAbstract
的processRequestCommand()
:
public abstract class NettyRemotingAbstract {
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
//.............
}
}
还记得NameSrvController注册的处理器吗?
public class NamesrvController {
public boolean initialize() {
//加载KV配置
this.kvConfigManager.load();
//创建NettyRemotingServer实例
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
//线程池创建
this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
//就是这里。。。。。向NettyRemotingServer注册NamesrvController和线程池
this.registerProcessor();
//..............
}
}
其最终注册的就是DefaultRequestProcessor
类型。因而NameServer
处理Broker注册
的任务就归它管了。
5.3.5 DefaultRequestProcessor#processRequest()
然后定位到DefaultRequestProcessor#processRequest()
,根据注册代号即可找到NameServer
处理Broker
注册请求:
public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
//............
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.QUERY_DATA_VERSION:
return queryBrokerTopicConfig(ctx, request);
//判断是注册Broker信息
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
//这里用于组装注册请求实体
//版本高于3.0.11,由此注册broker,我的版本4.8.0,因此从此进
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
//版本低于3.0.11,注册Broker信息
return this.registerBroker(ctx, request);
};
//....................
default:
break;
}
return null;
}
}
5.3.6 NameServer.RouteInfoManager#registerBroker()
虽然有版本区分,但最终走的都是NameServer.RouteInfoManager#registerBroker()
。
public class RouteInfoManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
//读写锁
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
//注册Broker核心方法
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
//注册路由信息使用写锁
this.lock.writeLock().lockInterruptibly();
//维护clusterAddrTable
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);
boolean registerFirst = false;
//维护brokerAddrTable
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
//第一次注册,则创建brokerData
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
//非第一次注册,更新Broker
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
//The same IP:PORT must only have one record in brokerAddrTable
Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> item = it.next();
if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
it.remove();
}
}
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
//维护topicQueueTable
if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
}
return result;
}
}
5.4 Broker删除(路由删除)
Broker每隔30s
向NameServer发送一个心跳包,心跳包
包含:
BrokerId
Broker地址
Broker名称
Broker所属集群名称
Broker关联的FilterServer列表
。
但是如果Broker宕机,NameServer无法收到心跳包,此时NameServer如何来剔除这些失效的Broker呢?
NameServer会每隔10s
扫描brokerLiveTable
状态表(见4.2.2 初始化),如果BrokerLiveInfo的lastUpdateTimestamp
的时间戳距当前时间超过120s
,则认为Broker失效,移除该Broker,关闭与Broker连接,同时更新topicQueueTable
、brokerAddrTable
、brokerLiveTable
、filterServerTable
。
RocketMQ有两个触发点
来删除路由信息:
- NameServer定期扫描
brokerLiveTable
检测上次心跳包与当前系统的时间差,如果时间超过120s,则需要移除broker。//NamesrvController#initialize() //每隔10s扫描一次为活跃Broker this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); //RouteInfoManager#scanNotActiveBroker() public class RouteInfoManager { //超过120s,则认为Broker失效 public void scanNotActiveBroker() { //获得brokerLiveTable Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator(); //遍历brokerLiveTable while (it.hasNext()) { Entry<String, BrokerLiveInfo> next = it.next(); long last = next.getValue().getLastUpdateTimestamp(); //如果收到心跳包的时间距当时时间是否超过120s if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) { //关闭连接 RemotingUtil.closeChannel(next.getValue().getChannel()); //移除broker it.remove(); log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME); //维护路由表(更新路由表) this.onChannelDestroy(next.getKey(), next.getValue().getChannel()); } } } }
- Broker在正常关闭的情况下,会执行
unregisterBrokerAll
指令。请求代号:UNREGISTER_BROKER
//BrokerController#shutdown() public void shutdown() { this.unregisterBrokerAll(); }
这两种方式路由删除的方法都是一样的,就是从相关路由表中删除与该broker相关的信息。
5.5 路由发现
RocketMQ路由发现是非实时的
,当Topic路由出现变化后,NameServer不会主动推送给客户端,而是由客户端定时拉取
主题最新的路由。
这里,客户端
是指生产者
和消费者
,他两都持有MQClientInstance
实例(单例),客户端启动时,会开启定时任务,如下图定时任务一
:
其会向NameServer发起代号为GET_ROUTEINFO_BY_TOPIC
的请求,获取最新的路由信息。
而在NameServer端,通过其启动时注册的处理器DefaultRequestProcessor
,处理客户端发来的请求:
//DefaultRequestProcessor
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetRouteInfoRequestHeader requestHeader = (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
//调用RouteInfoManager的方法,从路由表topicQueueTable、brokerAddrTable、filterServerTable中分别填充TopicRouteData的List<QueueData>、List<BrokerData>、filterServer
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
//如果找到主题对应你的路由信息并且该主题为顺序消息,则从NameServer KVConfig中获取关于顺序消息相关的配置填充路由信息
if (topicRouteData != null) {
if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
String orderTopicConf =
this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
requestHeader.getTopic());
topicRouteData.setOrderTopicConf(orderTopicConf);
}
byte[] content = topicRouteData.encode();
response.setBody(content);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
6 总结
NameServer存储信息:
topicQueueTable
:Topic消息队列路由信息,消息发送时根据路由表进行负载均衡;brokerAddrTable
:Broker基础信息,包括brokerName、所属集群名称、主备Broker地址;clusterAddrTable
:Broker集群信息,存储集群中所有Broker名称;brokerLiveTable
:Broker状态信息,NameServer每次收到心跳包是会替换该信息;filterServerTable
:Broker上的FilterServer列表,用于类模式消息过滤;
NameServer的存储实现基于
内存
(HashMap),NameServer并不会持久化路由信息,持久化的重任是交给Broker来完成。