【Netty学习】6.编解码器


1 定义

每个网络应用程序都必须定义如何解析在两个节点之间来回传输的原始字节,以及如何将其和目标应用程序的数据格式相互转换。这种转换逻辑由编解码器处理。

编解码器由编码器解码器组成,它们每种都可以将字节流从一种格式转换为另一种格式。

2 解码器

负责将入站数据从一种格式转换到另一种格式的,所以Netty的解码器实现了ChannelInboundHandler

  • 将字节解码为消息:ByteToMessageDecoderReplayingDecoder;
  • 将一种消息类型解码为另一种:MessageToMessageDecoder

每当需要为ChannelPipeline 中的下一个ChannelInboundHandler 转换入站数据时会用到。

2.1 抽象类 ByteToMessageDecoder

ByteToMessageDecoder是Netty将字节解码消息的基类。下表给出了它最重要的两个方法。

方法描述
decode(ChannelHandlerContext ctx, ByteBuf in, Listout)decode()方法被调用时将会传入一个包含了传入数据的 ByteBuf,以及一个用来添加解码消息的 List
decodeLast(ChannelHandlerContext ctx, ByteBuf in, Listout)Netty提供的这个默认实现只是简单地调用了decode()方法。 当Channel的状态变为非活动时,这个方法将会被调用一次。 可以重写该方法以提供特殊的处理

我自己实现聊天应用时,利用Protocol Buffer实现了消息的序列化。对于消息头,我进行了一些加工,比如添加了一个魔数(short),版本(short),然后才是消息的长度(int)。因此,对于消息头处理时,需要判断及读取特定长度的数据。

public class ProtocolBufferDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 标记一下当前的readIndex的位置
        in.markReaderIndex();
        // 判断包头长度
        if (in.readableBytes() < 8) {
            // 不够包头
            return;
        }
        //读取魔数
        short magic = in.readShort();
        if (magic != ProtoInstant.MAGIC_CODE) {
            String error = "客户端口令不对:" + ctx.channel().remoteAddress();
            throw new BusinessException(error);
        }
        //读取版本
        short version = in.readShort();
        // 读取传送过来的消息的长度。
        int length = in.readInt();

        //此处省略一万字符......
    }
}

虽然 ByteToMessageDecoder 使得可以很简单地实现这种模式,但是你可能会发现,在调用 readInt()方法前不得不验证所输入的 ByteBuf 是否具有足够的数据有点繁琐。因而,Netty也提供了更加简便的解码器。

2.2 抽象类 ReplayingDecoder

ReplayingDecoder扩展了ByteToMessageDecoder类,使得我们不必调用readableBytes()方法。它通过使用一个自定义的ByteBuf实现ReplayingDecoderByteBuf,包装传入的ByteBuf实现了这一点,其将在内部执行调用readableBytes()方法

这个类的完整声明是:

public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder

类型参数 S 指定了用于状态管理的类型,其中 Void 代表不需要状态管理。

请注意 ReplayingDecoderByteBuf 的下面这些方面:

  • 并不是所有的 ByteBuf 操作都被支持,如果调用了一个不被支持的方法,将会抛出一个 UnsupportedOperationException;
  • ReplayingDecoder 稍慢于 ByteToMessageDecoder

在真实的、更加复杂的情况下,使用一种或者另一种作为基类所带来的差异可能是很显著的。这里有一个简单的准则:如果使用 ByteToMessageDecoder 不会引入太多的复杂性,那么请使用它;否则,请使用 ReplayingDecoder

2.3 抽象类 MessageToMessageDecoder

完整声明如下:

public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter

类型参数 I 指定了 decode()方法的输入参数 msg 的类型,它是你必须实现的唯一方法。如下表所示:

方法描述
decode(ChannelHandlerContext ctx, I msg, Listout)对于每个需要被解码为另一种格式的入站消息来说,该方法都将会被调用。解码消息随后会被传递给 ChannelPipeline 中的下一个 ChannelInboundHandler。

比如IntegerToStringDecoder用于将integer类型转化为String类型。如下所示:

public class IntegerToStringDecoder extends MessageToMessageDecoder<Integer> {
    @Override
    public void decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
        out.add(String.valueOf(msg));
    }
}

2.4 TooLongFrameException 类

由于 Netty 是一个异步框架,所以需要在字节可以解码之前在内存中缓冲它们。因此,不能让解码器缓冲大量的数据以至于耗尽可用的内存。为了解除这个常见的顾虑,Netty 提供了 TooLongFrameException 类,其将由解码器在帧超出指定的大小限制时抛出。

为了避免这种情况,你可以设置一个最大字节数的阈值,如果超出该阈值,则会导致抛出一个 TooLongFrameException(随后会被 ChannelHandler.exceptionCaught()方法捕获)。然后,如何处理该异常则完全取决于该解码器的用户。某些协议(如 HTTP)可能允许你返回一个特殊的响应。而在其他的情况下,唯一的选择可能就是关闭对应的连接

下面代码展示了 ByteToMessageDecoder 是如何使用 TooLongFrameException 来通知 ChannelPipeline 中的其他 ChannelHandler 发生了帧大小溢出的。需要注意的是,如果你正在使用一个可变帧大小的协议,那么这种保护措施将是尤为重要的。

public class SafeByteToMessageDecoder extends ByteToMessageDecoder {
    private static final int MAX_FRAME_SIZE = 1024;

    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int readable = in.readableBytes();
        if (readable > MAX_FRAME_SIZE) {
            in.skipBytes(readable);
        }
        throw new TooLongFrameException("Frame too big!");
// do something ...
    }
}

3 编码器

与解码器相反,其实现了 ChannelOutboundHandler,并将出站数据从一种格式转换为另一种格式。Netty 提供了一组类,用于帮助你编写具有以下功能的编码器:

  • 将消息编码为字节
  • 将消息编码为消息

3.1 抽象类 MessageToByteEncoder

其API如下表所示:

方法描述
encode(ChannelHandlerContext ctx, I msg, ByteBuf out)encode()方法是你需要实现的唯一抽象方法。它被调用时将会传入要被该类编码为 ByteBuf 的(类型为 I 的)出站消息。该 ByteBuf 随后将会被转发给 ChannelPipeline 中的下一个 ChannelOutboundHandler

这个类只有一个方法,而解码器有两个。原因是解码器通常需要在 Channel 关闭之后产生最后一个消息(因此也就有了 decodeLast()方法)。这显然不适用编码器的场景——在连接被关闭之后仍然产生一个消息是毫无意义的

3.2 抽象类 MessageToMessageEncoder

方法描述
encode(ChannelHandlerContext ctx, I msg, Listout)这是你需要实现的唯一方法。每个通过 write()方法写入的消息都将会被传递给 encode()方法,以编码为一个或者多个出站消息。随后,这些出站消息将会被转发给 ChannelPipeline 中的下一个 ChannelOutboundHandler

4 抽象的编解码器类

我们一直将解码器和编码器作为单独的实体讨论,但是你有时将会发现在同一个类中管理入站和出站数据和消息的转换是很有用的。Netty 的抽象编解码器类正好用于这个目的,因为它们每个都将捆绑一个解码器/编码器对。这些类同时实现了 ChannelInboundHandlerChannelOutboundHandler 接口。

为什么我们并没有一直优先于单独的解码器和编码器使用这些复合类呢? 因为通过尽可能地将这两种功能分开,最大化了代码的可重用性可扩展性,这是 Netty 设计的一个基本原则。

相关的类:

  • 抽象类 ByteToMessageCodec
  • 抽象类 MessageToMessageCodec

4.1 ByteToMessageCodec

方法名称描述
decode(ChannelHandlerContext ctx, ByteBuf in, List)只要有字节可以被消费,这个方法就将会被调用。它将入站 ByteBuf 转换为指定的消息格式,并将其转发给 ChannelPipeline 中的下一个 ChannelInboundHandler
decodeLast(ChannelHandlerContext ctx, ByteBuf in, Listout)这个方法的默认实现委托给了 decode()方法。它只会在 Channel 的状态变为非活动时被调用一次。它可以被重写以实现特殊的处理
encode(ChannelHandlerContext ctx, I msg, ByteBuf out)对于每个将被编码并写入出站 ByteBuf 的(类型为 I 的)消息来说,这个方法都将会被调用

4.2 MessageToMessageCodec

MessageToMessageCodec 是一个参数化的类,定义如下:

public abstract class MessageToMessageCodec<INBOUND_IN,OUTBOUND_IN>

方法名称描述
protected abstract decode( ChannelHandlerContext ctx, INBOUND_IN msg, Listout)这个方法被调用时会被传入 INBOUND_IN 类型的消息。 它将把它们解码为 OUTBOUND_IN 类型的消息,这些消息将被转发给 ChannelPipeline 中的下一个 ChannelInboundHandler
protected abstract encode( ChannelHandlerContext ctx, OUTBOUND_IN msg, Listout)对于每个 OUTBOUND_IN 类型的消息,这个方法都将会被调用。这些消息将会被编码为 INBOUND_IN 类型的消息,然后被转发给 ChannelPipeline 中的下一个 ChannelOutboundHandler

5 内置的编解码器

5.1 通过SSL/TLS 保护Netty 应用程序

为了支持 SSL/TLS,Java 提供了 javax.net.ssl 包,它的 SSLContextSSLEngine 类使得实现解密和加密相当简单直接。Netty 通过一个名为 SslHandlerChannelHandler 实现利用了这个 API,其中 SslHandler 在内部使用 SSLEngine 来完成实际的工作。

通过 SslHandler 进行解密和加密的数据流

在大多数情况下,SslHandler将是ChannelPipeline中的第一个ChannelHandler

5.2 HTTP 系列

HTTP 是基于请求/响应模式的:客户端向服务器发送一个 HTTP 请求,然后服务器将会返回一个 HTTP 响应。Netty 提供了多种编码器和解码器以简化对这个协议的使用。

HttpRequest组成
HttpResponse组成

一个HTTP请求/响应可能由多个数据部分组成,FullHttpRequestFullHttpResponse 消息是特殊的子类型,分别代表了完整的请求和响应。所有类型的 HTTP 消息(FullHttpRequestLastHttpContent 等等)都实现了 HttpObject 接口

下表概要地介绍了处理和生成这些消息的 HTTP 解码器和编码器。

名称描述
HttpRequestEncoderHttpRequestHttpContentLastHttpContent 消息编码字节
HttpResponseEncoderHttpResponseHttpContentLastHttpContent 消息编码字节
HttpRequestDecoder字节解码HttpRequestHttpContentLastHttpContent 消息
HttpResponseDecoder字节解码HttpResponseHttpContentLastHttpContent 消息

5.3 WebSocket

WebSocket协议

5.4 空闲的连接和超时

检测空闲连接以及超时对于及时释放资源来说是至关重要的。由于这是一项常见的任务,Netty 特地为它提供了几个 ChannelHandler 实现。下表给出了它们的概述。

名称描述
IdleStateHandler当连接空闲时间太长时,将会触发一个 IdleStateEvent 事件。然后,你可以通过在你的 ChannelInboundHandler重写 userEventTriggered()方法来处理该 IdleStateEvent 事件。
ReadTimeoutHandler如果在指定的时间间隔内没有收到任何的入站数据,则抛出一个 ReadTimeoutException 并关闭对应的 Channel。可以通过重写你的 ChannelHandler 中的 exceptionCaught()方法来检测该 ReadTimeoutException
WriteTimeoutHandler如果在指定的时间间隔内没有任何出站数据写入,则抛出一个 WriteTimeoutException 并关闭对应的 Channel。可以通过重写你的 ChannelHandlerexceptionCaught()方法检测该 WriteTimeoutException

5.5 序列化

5.5.1 JDK 序列化

JDK 提供了 ObjectOutputStreamObjectInputStream,用于通过网络对 POJO基本数据类型进行序列化反序列化

下表中列出了 Netty提供的用于和JDK进行互操作的序列化类。

名称描述
CompatibleObjectDecoder和使用 JDK 序列化的非基于 Netty 的远程节点进行互操作的解码器
CompatibleObjectEncoder和使用 JDK 序列化的非基于 Netty 的远程节点进行互操作的编码器
ObjectDecoder构建于 JDK 序列化之上的使用自定义的序列化来解码的解码器;当没有其他的外部依赖时,它提供了速度上的改进。否则其他的序列化实现更加可取。
ObjectEncoder构建于 JDK 序列化之上的使用自定义的序列化来编码的编码器;当没有其他的外部依赖时,它提供了速度上的改进。否则其他的序列化实现更加可取。

5.5.2 JBoss Marshalling 序列化

如果你可以自由地使用外部依赖,那么JBoss Marshalling将是个理想的选择:它比JDK序列化最多快 3 倍,而且也更加紧凑

JBoss Marshalling 是一种可选的序列化 API,它修复了在 JDK 序列化 API 中所发现的许多问题,同时保留了与 java.io.Serializable 及其相关类的兼容性,并添加 了几个新的可调优参数以及额外的特性,所有的这些都是可以通过工厂配置(如外部序列化器、类/实例查找表、类解析以及对象替换等)实现可插拔的。

Netty 通过下表所示的两组解码器/编码器对为 Boss Marshalling 提供了支持。

名称描述
CompatibleMarshallingDecoder/CompatibleMarshallingEncoder与只使用 JDK 序列化的远程节点兼容
MarshallingDecoder/MarshallingEncoder适用于使用 JBoss Marshalling 的节点。这些类必须一起使用

5.5.3 Protocol Buffers

Protocol Buffers 是一种由Google公司开发的、现在已经开源的数据交换格式。它以一种紧凑高效的方式对结构化的数据进行编码以及解码。它具有许多的编程语言绑定,使得它很适合跨语言的项目。

下表展示了 Netty 为支持 protobuf 所提供的 ChannelHandler 实现

名称描述
ProtobufDecoder使用 protobuf 对消息进行解码
ProtobufEncoder使用 protobuf 对消息进行编码
ProtobufVarint32FrameDecoder根据消息中的 Google Protocol BuffersBase 128 Varints整型长度字段值动态地分割所接收到的 ByteBuf
ProtobufVarint32LengthFieldPrependerByteBuf前追加一个Google Protocol BuffersBase 128 Varints整型的长度字段值

文章作者: Kezade
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Kezade !
评论
  目录