【RocketMQ学习】6.源码之Remoting


0 前言

RocketMQ消息队列集群主要包括NameServerBroker(Master/Slave)、ProducerConsumer4个角色。在【RocketMQ学习】1.基础知识一文中,我们了解了其物理架构,本章将深入了解其各个组件的通信机制及其对于Netty的整合。

1 通信过程

先来了解下上述4个组件的通信过程:

  • (1) Broker启动后需要完成一次将自己注册NameServer的操作;随后每隔30s时间定时向NameServer上报Topic路由信息
  • (2) 消息生产者Producer作为客户端发送消息时候,需要根据消息的Topic从本地缓存的TopicPublishInfoTable获取路由信息。如果没有则更新路由信息会从NameServer上重新拉取,同时Producer会默认每隔30s向NameServer拉取一次路由信息。
  • (3) 消息生产者Producer根据(2)中获取的路由信息选择一个队列(MessageQueue)进行消息发送;Broker作为消息的接收者接收消息落盘存储
  • (4) 消息消费者Consumer根据(2)中获取的路由信息,并再完成客户端的负载均衡后,选择其中的某一个或者某几个消息队列来拉取消息并进行消费

从上面(1)~(3)中可以看出在消息生产者Producer, BrokerNameServer之间都会发生通信(这里只说了MQ的部分通信),因此如何设计一个良好的网络通信模块在MQ中至关重要,它将决定RocketMQ集群整体的消息传输能力与最终的性能。

RocketMQ核心组件通信概览

rocketmq-remoting模块是RocketMQ消息队列中负责网络通信的模块,它几乎被其他所有需要网络通信的模块(诸如rocketmq-client、rocketmq-broker、rocketmq-namesrv)所依赖和引用。为了实现客户端与服务器之间高效的数据请求与接收,RocketMQ消息队列自定义了通信协议并在Netty的基础之上扩展了通信模块。

Remoting通信类结构

其中RemotingService提供了公共接口,RemotingClientRemotingServer接口封装了一些功能接口,包括注册每种消息类型的处理器及对应的线程池、消息发送的三种实现方式[^1]:同步(sync)、异步(async)、单向(oneway),NettyRemotingClientNettyRemotingServer则分别为其各自的具体实现类。

2 协议设计与编解码

前面学习Netty时,我们已经知道,要使得组件之间能够通信,能够识别发送过来的消息,需要进行一种消息的约定,这种约定规定了消息的结构,编解码方式,才能使得消息被正确接收和解读。

在RocketMQ中,RemotingCommand这个类在消息传输过程中对所有数据内容的封装,不但包含了所有的数据结构,还包含了编码解码操作

消息的组成

下表为Header部分字段说明:

Header字段类型Request说明Response说明
codeint请求操作码,应答方根据不同的请求码进行不同的业务处理应答响应码。0表示成功,非0则表示各种错误
languageLanguageCode请求方实现的语言应答方实现的语言
versionint请求方程序的版本应答方程序的版本
opaqueint相当于requestId,在同一个连接上的不同请求标识码,与响应消息中的相对应应答不做修改直接返回
flagint区分是普通RPC还是onewayRPC得标志区分是普通RPC还是onewayRPC得标志
remarkString传输自定义文本信息传输自定义文本信息
extFieldsHashMap<String, String>请求自定义扩展信息响应自定义扩展信息

传输内容主要可以分为以下4部分:

  • (1) 消息长度:总长度四个字节存储,占用一个int类型;
  • (2) 序列化类型&消息头长度:同样占用一个int类型,第一个字节表示序列化类型后面三个字节表示消息头长度
  • (3) 消息头数据:经过序列化后的消息头数据;
  • (4) 消息主体数据:消息主体的二进制字节数据内容

编解码器分为两大部分:一部分是对整个消息的编解码,一个是对消息头数据的编解码,以下为消息主体的编码实现:

public ByteBuffer encode() {
        // 1> 消息长度占4字节
        int length = 4;

        // 2> 消息头数据长度
        byte[] headerData = this.headerEncode();
        length += headerData.length;

        // 3> 消息主体数据长度
        if (this.body != null) {
            length += body.length;
        }

        ByteBuffer result = ByteBuffer.allocate(4 + length);

        // 消息总长
        result.putInt(length);

        // 消息头总长+序列化类型
        result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

        // 序列化后的消息头数据
        result.put(headerData);

        // 序列化后的消息主体数据
        if (this.body != null) {
            result.put(this.body);
        }

        result.flip();

        return result;
    }

3 NettyRemotingServer

3.1 启动

回顾下【Netty学习】4.Netty的一个例子中,NettyServer启动过程。

首先需要一个ServerBootstrap、其次是两个EventLoopGroup、然后是通信模式(NIO or BIO or IO Multiplexing)、最后是绑定端口、注册Handler。

public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
    //NettyRemotingServer#start()
    @Override
    public void start() {
        //........省略一万字

        ServerBootstrap childHandler =
                //两个EventLoopGroup
                this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                        //通信模式
                        .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        .option(ChannelOption.SO_REUSEADDR, true)
                        .option(ChannelOption.SO_KEEPALIVE, false)
                        .childOption(ChannelOption.TCP_NODELAY, true)
                        .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                        .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                        //绑定端口
                        .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                        //注册Handler
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline()
                                        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                                        .addLast(defaultEventExecutorGroup,
                                                encoder,
                                                new NettyDecoder(),
                                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                                connectionManageHandler,
                                                serverHandler
                                        );
                            }
                        });
        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

        try {
            //异步绑定到服务器,sync()会阻塞到完成
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }
        
        //....................
    }
}

3.2 发送方式

NettyRemotingServer不光是一个NettyServer,还提供了RocketMQ消息发送方式的具体实现,先来看看其直接父级接口:

public interface RemotingServer extends RemotingService {
    //注册每种消息类型的处理器及对应的线程池
    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
                           final ExecutorService executor);
    //注册默认消息类型的处理器及对应的线程池
    void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);

    int localListenPort();

    Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);
    //同步发送
    RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
                               final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
            RemotingTimeoutException;
    //异步发送
    void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
                     final InvokeCallback invokeCallback) throws InterruptedException,
            RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
    //单向发送
    void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
            throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
            RemotingSendRequestException;

}

三种方式具体调用仍然是抽象父类NettyRemotingAbstract的实现。下图为消息发送之异步发送的流程:

异步发送流程

3.3 消息类型及其处理器、线程池注册

同上。

4 NettyRemotingClient

类似NettyRemotingServer,其直接父级接口如下:

public interface RemotingClient extends RemotingService {
    //更新服务端地址列表
    void updateNameServerAddressList(final List<String> addrs);

    //获取服务端地址列表
    List<String> getNameServerAddressList();

    //同步发送
    RemotingCommand invokeSync(final String addr, final RemotingCommand request,
                               final long timeoutMillis) throws InterruptedException, RemotingConnectException,
            RemotingSendRequestException, RemotingTimeoutException;

    //异步发送
    void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
                     final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
            RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;

    //单向发送
    void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
            throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
            RemotingTimeoutException, RemotingSendRequestException;

    //注册每种消息类型的处理器及对应的线程池
    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
                           final ExecutorService executor);

    void setCallbackExecutor(final ExecutorService callbackExecutor);

    ExecutorService getCallbackExecutor();

    boolean isChannelWritable(final String addr);
}

5 RocketMQ中的Reactor

RocketMQ的RPC通信采用Netty组件作为底层通信库,同样也遵循了Reactor多线程模型,同时又在这之上做了一些扩展和优化。

RocketMQ中的Reactor

上面的框图中可以大致了解RocketMQ中NettyRemotingServerReactor多线程模型
以下是官方解释流程:

  • 一个 Reactor主线程(eventLoopGroupBoss,即为上面的1)负责监听 TCP网络连接请求,建立好连接,创建SocketChannel,并注册到selector上。
  • RocketMQ的源码中会自动根据OS的类型选择NIOEpoll,也可以通过参数配置),然后监听真正的网络数据。
  • 拿到网络数据后,再丢给Worker线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为3),在真正执行业务逻辑之前需要进行SSL验证编解码空闲检查网络连接管理,这些工作交给defaultEventExecutorGroup(即为上面的“M1”,源码中默认设置为8)去做。
  • 而处理业务操作放在业务线程池中执行,根据RemotingCommand的业务请求码code去processorTable这个本地缓存变量中找到对应的processor,然后封装成task任务后,提交给对应的业务processor处理线程池来执行(sendMessageExecutor,以发送消息为例,即为上面的 “M2”)。
  • 从入口到业务逻辑的几个步骤中线程池一直再增加,这跟每一步逻辑复杂性相关,越复杂,需要的并发通道越宽。

参阅:
[^1]: 2.1 普通消息发送方式


文章作者: Kezade
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Kezade !
评论
  目录