0 前言
RocketMQ消息队列集群主要包括NameServer
、Broker
(Master/Slave)、Producer
、Consumer
4个角色。在【RocketMQ学习】1.基础知识一文中,我们了解了其物理架构,本章将深入了解其各个组件的通信机制及其对于Netty的整合。
1 通信过程
先来了解下上述4个组件的通信过程:
- (1)
Broker
启动后需要完成一次将自己注册至NameServer
的操作;随后每隔30s
时间定时向NameServer上报Topic路由信息
。 - (2) 消息生产者
Producer
作为客户端发送消息时候,需要根据消息的Topic
从本地缓存的TopicPublishInfoTable
获取路由信息。如果没有则更新路由信息会从NameServer
上重新拉取,同时Producer会默认每隔30s
向NameServer拉取一次路由信息。 - (3) 消息生产者
Producer
根据(2)
中获取的路由信息选择一个队列(MessageQueue)进行消息发送;Broker
作为消息的接收者接收消息
并落盘存储
。 - (4) 消息消费者
Consumer
根据(2)
中获取的路由信息,并再完成客户端的负载均衡后,选择其中的某一个或者某几个消息队列来拉取消息
并进行消费
。
从上面(1)~(3)中可以看出在消息生产者Producer
, Broker
和NameServer
之间都会发生通信(这里只说了MQ的部分通信),因此如何设计一个良好的网络通信模块在MQ中至关重要,它将决定RocketMQ集群整体的消息传输能力与最终的性能。
rocketmq-remoting
模块是RocketMQ消息队列中负责网络通信的模块,它几乎被其他所有需要网络通信的模块(诸如rocketmq-client、rocketmq-broker、rocketmq-namesrv)所依赖和引用。为了实现客户端与服务器之间高效的数据请求与接收,RocketMQ消息队列自定义了通信协议并在Netty
的基础之上扩展了通信模块。
其中RemotingService
提供了公共接口,RemotingClient
、RemotingServer
接口封装了一些功能接口,包括注册每种消息类型的处理器及对应的线程池
、消息发送的三种实现方式[^1]:同步
(sync)、异步
(async)、单向
(oneway),NettyRemotingClient
、NettyRemotingServer
则分别为其各自的具体实现类。
2 协议设计与编解码
前面学习Netty时,我们已经知道,要使得组件之间能够通信,能够识别发送过来的消息,需要进行一种消息的约定,这种约定规定了消息的结构,编解码方式,才能使得消息被正确接收和解读。
在RocketMQ中,RemotingCommand
这个类在消息传输过程中对所有数据内容的封装,不但包含了所有的数据结构
,还包含了编码解码操作
。
下表为Header部分字段说明:
Header字段 | 类型 | Request说明 | Response说明 |
---|---|---|---|
code | int | 请求操作码,应答方根据不同的请求码进行不同的业务处理 | 应答响应码。0表示成功,非0则表示各种错误 |
language | LanguageCode | 请求方实现的语言 | 应答方实现的语言 |
version | int | 请求方程序的版本 | 应答方程序的版本 |
opaque | int | 相当于requestId,在同一个连接上的不同请求标识码,与响应消息中的相对应 | 应答不做修改直接返回 |
flag | int | 区分是普通RPC还是onewayRPC得标志 | 区分是普通RPC还是onewayRPC得标志 |
remark | String | 传输自定义文本信息 | 传输自定义文本信息 |
extFields | HashMap<String, String> | 请求自定义扩展信息 | 响应自定义扩展信息 |
传输内容主要可以分为以下4部分:
- (1) 消息长度:
总长度
,四个字节
存储,占用一个int类型; - (2) 序列化类型&消息头长度:同样占用一个int类型,
第一个字节
表示序列化类型
,后面三个字节
表示消息头长度
; - (3) 消息头数据:经过
序列化
后的消息头数据; - (4) 消息主体数据:消息主体的
二进制字节数据内容
;
编解码器分为两大部分:一部分是对整个消息的编解码,一个是对消息头数据的编解码,以下为消息主体的编码实现:
public ByteBuffer encode() {
// 1> 消息长度占4字节
int length = 4;
// 2> 消息头数据长度
byte[] headerData = this.headerEncode();
length += headerData.length;
// 3> 消息主体数据长度
if (this.body != null) {
length += body.length;
}
ByteBuffer result = ByteBuffer.allocate(4 + length);
// 消息总长
result.putInt(length);
// 消息头总长+序列化类型
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// 序列化后的消息头数据
result.put(headerData);
// 序列化后的消息主体数据
if (this.body != null) {
result.put(this.body);
}
result.flip();
return result;
}
3 NettyRemotingServer
3.1 启动
回顾下【Netty学习】4.Netty的一个例子中,NettyServer启动过程。
首先需要一个ServerBootstrap
、其次是两个EventLoopGroup
、然后是通信模式(NIO or BIO or IO Multiplexing)、最后是绑定端口、注册Handler。
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
//NettyRemotingServer#start()
@Override
public void start() {
//........省略一万字
ServerBootstrap childHandler =
//两个EventLoopGroup
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
//通信模式
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
//绑定端口
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
//注册Handler
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
//异步绑定到服务器,sync()会阻塞到完成
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
//....................
}
}
3.2 发送方式
NettyRemotingServer
不光是一个NettyServer
,还提供了RocketMQ消息发送方式的具体实现,先来看看其直接父级接口:
public interface RemotingServer extends RemotingService {
//注册每种消息类型的处理器及对应的线程池
void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
final ExecutorService executor);
//注册默认消息类型的处理器及对应的线程池
void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);
int localListenPort();
Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);
//同步发送
RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
RemotingTimeoutException;
//异步发送
void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback) throws InterruptedException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
//单向发送
void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException;
}
三种方式具体调用仍然是抽象父类NettyRemotingAbstract
的实现。下图为消息发送之异步发送
的流程:
3.3 消息类型及其处理器、线程池注册
同上。
4 NettyRemotingClient
类似NettyRemotingServer
,其直接父级接口如下:
public interface RemotingClient extends RemotingService {
//更新服务端地址列表
void updateNameServerAddressList(final List<String> addrs);
//获取服务端地址列表
List<String> getNameServerAddressList();
//同步发送
RemotingCommand invokeSync(final String addr, final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException;
//异步发送
void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
//单向发送
void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
RemotingTimeoutException, RemotingSendRequestException;
//注册每种消息类型的处理器及对应的线程池
void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
final ExecutorService executor);
void setCallbackExecutor(final ExecutorService callbackExecutor);
ExecutorService getCallbackExecutor();
boolean isChannelWritable(final String addr);
}
5 RocketMQ中的Reactor
RocketMQ的RPC通信采用Netty组件作为底层通信库,同样也遵循了Reactor多线程模型,同时又在这之上做了一些扩展和优化。
上面的框图中可以大致了解RocketMQ中NettyRemotingServer
的Reactor多线程模型
。
以下是官方解释流程:
- 一个
Reactor
主线程(eventLoopGroupBoss,即为上面的1)负责监听 TCP网络连接请求,建立好连接,创建SocketChannel
,并注册到selector上。 - RocketMQ的源码中会自动根据OS的类型选择
NIO
和Epoll
,也可以通过参数配置),然后监听真正的网络数据。 - 拿到网络数据后,再丢给Worker线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为3),在真正执行业务逻辑之前需要进行
SSL验证
、编解码
、空闲检查
、网络连接管理
,这些工作交给defaultEventExecutorGroup
(即为上面的“M1”,源码中默认设置为8)去做。 - 而处理业务操作放在业务线程池中执行,根据
RemotingCommand
的业务请求码code去processorTable
这个本地缓存变量中找到对应的processor
,然后封装成task任务后,提交给对应的业务processor处理线程池来执行(sendMessageExecutor,以发送消息为例,即为上面的 “M2”)。 - 从入口到业务逻辑的几个步骤中线程池一直再增加,这跟每一步逻辑复杂性相关,越复杂,需要的并发通道越宽。
参阅:
[^1]: 2.1 普通消息发送方式