netty编解码器与序列化框架分析
netty编解码器分析
编码(Encode)也称为序列化(serialization),它将对象序列化为字节数组,用于网络传输、数据持久化或者其它用途。
反之,解码(Decode)也称为反序列化(deserialization),用于把从网络、磁盘等读取的字节数组还原成原始对象(通常是原始对象的拷贝),以方便后续的业务逻辑操作。
进行远程跨进程服务调用时(例如 RPC 调用),需要使用特定的编解码技术,对需要进行网络传输的对象做编码或者解码,以便完成远程调用。
Java序列化
Java默认提供的序列化机制,需要序列化的Java对象只需要实现java.io.Serializable接口并生成序列化ID,这个类就能够通过java.io.ObjectInputStream和java.io.ObjectOutputStrem序列化和反序列化。
但是由于它自身存在很多缺点,因此大多数的RPC框架并没有选择它。Java序列化的主要缺点如下:
- 无法跨语言:是Java序列化最致命的问题。对于跨进程的服务调用,服务提供者可能会使用C++或者其它语言开发,当我们需要和异构语言进程交互时,Java序列化就难以胜任。由于Java序列化技术是Java语言内部的私有协议,其它语言并不支持,对于用户来说它完全是黑盒。Java序列化后的字节数组,别的语言无法进行反序列化,这就严重阻碍了它的应用范围。
- 序列化后的码流太大,例如使用二进制编解码技术对同一个复杂的POJO对象进行编码,它的码流仅仅为Java序列化之后的20%左右;目前主流的编解码框架,序列化之后的码流都远远小于原生的Java序列化。
- 序列化性能太低。
在netty中使用java的序列化,只需要添加编码器ObjectEncoder和解码器ObjectDecoder即可,netty都封装好了。
服务器端代码:
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ObjectEncoder()); // 编码
ch.pipeline().addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader()))); // 解码
ch.pipeline().addLast(new ServerHandler());
}
});
服务器端业务代码:
public class ServerHandler extends SimpleChannelInboundHandler {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
System.out.println("receive from client: " + msg);
UserResponse response = new UserResponse();
response.setCode(200);
response.setMessage("success");
ctx.writeAndFlush(response);
}
}
思考:不需要解决粘包半包问题?ObjectEncoder和ObjectDecoder中已经对粘包半包问题进行了处理,使用的是自定义消息长度的方式,下面看源代码。
io.netty.handler.codec.serialization.ObjectEncoder的源代码:
public class ObjectEncoder extends MessageToByteEncoder<Serializable> {
private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
@Override
protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception {
int startIdx = out.writerIndex();
ByteBufOutputStream bout = new ByteBufOutputStream(out);
ObjectOutputStream oout = null;
try {
bout.write(LENGTH_PLACEHOLDER); // 先占位4个字节
oout = new CompactObjectOutputStream(bout);
oout.writeObject(msg);
oout.flush();
} finally {
if (oout != null) {
oout.close();
} else {
bout.close();
}
}
int endIdx = out.writerIndex();
out.setInt(startIdx, endIdx - startIdx - 4); // 计算报文的长度,放在开始的4个字节里面
}
}
io.netty.handler.codec.serialization.ObjectDecoder解码器的源代码:
public class ObjectDecoder extends LengthFieldBasedFrameDecoder {
... ...
public ObjectDecoder(int maxObjectSize, ClassResolver classResolver) {
super(maxObjectSize, 0, 4, 0, 4);
this.classResolver = classResolver;
}
... ...
}
发现ObjectDecoder继承LengthFieldBasedFrameDecoder,构造方法中调用了父类的构造方法super(maxObjectSize, 0, 4, 0, 4);
,指定了解码时前4个字节为报文的长度,解码后丢弃前面的4个字节,传递给后面的Handler。
Google的Protobuf
Protobuf全称Google Protocol Buffers,它由谷歌开源而来,在谷歌内部久经考验。它将数据结构以.proto 文件进行描述,通过代码生成工具可以生成对应数据结构的POJO对象和Protobuf相关的方法和属性。
它的特点如下:
- 结构化数据存储格式(XML,JSON等)。
- 高效的编解码性能。
- 语言无关、平台无关、扩展性好。
使用方法如下:
先下载对应操作系统的工具,下载地址:https://github.com/protocolbuffers/protobuf/releases,这里使用的是`protoc-3.13.0-win64.zip`,解压缩后bin目录下有个protoc.exe。
编写UserResponse.proto文件:
syntax = "proto3";
option java_package = "com.morris.netty.serialize.protobuf";
option java_outer_classname = "UserResponseProto";
message UserResponse {
int32 code = 1;
string message = 2;
}
然后使用protoc.exe生成对应的实体类:
> protoc.exe --java_out=. UserResponse.proto
把生成的实体类,拷贝到工程对应的目录。
项目中引入protobuf的maven依赖:
dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.6.1</version>
</dependency>
服务器端代码:
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); // 处理粘包半包的解码器
ch.pipeline().addLast(new ProtobufDecoder(UserRequestProto.UserRequest.getDefaultInstance())); // protobuf解码器
ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); // 处理粘包半包的编码器
ch.pipeline().addLast(new ProtobufEncoder()); // protobuf编码器
ch.pipeline().addLast(new ServerHandler()); // 具体业务处理
}
});
服务器端业务代码:
public class ServerHandler extends SimpleChannelInboundHandler {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("receive from client: " + msg);
UserResponseProto.UserResponse response = UserResponseProto.UserResponse.newBuilder().setCode(200).setMessage("success").buildPartial();
ctx.writeAndFlush(response);
}
}
JBoss Marshalling
JBoss Marshalling是一个Java对象的序列化API包,修正了JDK自带的序列化包的很多问题,但又保持跟java.io.Serializable接口的兼容;同时增加了一些可调的参数和附加的特性,并且这些参数和特性可通过工厂类进行配置。
相比于传统的Java序列化机制,它的优点如下:
- 可插拔的类解析器,提供更加便捷的类加载定制策略,通过一个接口即可实现定制;
- 可插拔的对象替换技术,不需要通过继承的方式;
- 可插拔的预定义类缓存表,可以减小序列化的字节数组长度,提升常用类型的对象序列化性能;
- 无须实现java.io.Serializable接口,即可实现Java序列化;
- 通过缓存技术提升对象的序列化性能。
使用方法如下:
引入marshalling的maven依赖:
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling-serial</artifactId>
<version>2.0.6.Final</version>
</dependency>
创建一个生产marshalling编码器和marshalling解码器的工厂类:
package com.morris.netty.serialize.marshalling;
import io.netty.handler.codec.marshalling.*;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;
public final class MarshallingCodeCFactory {
public static MarshallingDecoder buildMarshallingDecoder() {
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
return decoder;
}
public static MarshallingEncoder buildMarshallingEncoder() {
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
MarshallingEncoder encoder = new MarshallingEncoder(provider);
return encoder;
}
}
服务器端代码:
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); // 解码器
ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); // 编码器
ch.pipeline().addLast(new ServerHandler());
}
});
这里不需要处理粘包和半包问题的原因与java序列化不需要处理的原因一致,具体可以参考MarshallingEncoder和MarshallingDecoder的源代码。
服务器端业务代码与java序列化的服务器端业务代码一致。
相比于前面介绍的两种编解码框架,JBoss Marshalling更多是在JBoss内部使用,应用范围有限,netty是JBoss公司的,所以netty要支持一下自己家的产品。
自定义编解码器MessagePack
编码器相关基类:
- 将消息编码为字节:MessageToByteEncoder。
- 将消息编码为消息:MessageToMessageEncoder,T代表源数据的类型。
解码器相关基类:
- 将字节解码为消息:ByteToMessageDecoder。
- 将一种消息类型解码为另一种消息类型:MessageToMessageDecoder。
编解码器类:
- ByteToMessageCodec。
- MessageToMessageCodec。
下面通过自定义编码器和解码器,将MessagePack作为序列化框架嵌入到项目中来:
引入MessagePack的maven依赖:
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<version>0.6.12</version>
</dependency>
编码器:
package com.morris.netty.serialize.messagepack;
import com.morris.netty.serialize.pojo.UserRequest;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.msgpack.MessagePack;
public class MessagePackEncoder extends MessageToByteEncoder<UserRequest> {
@Override
protected void encode(ChannelHandlerContext ctx, UserRequest msg, ByteBuf out)
throws Exception {
MessagePack messagePack = new MessagePack();
byte[] raw = messagePack.write(msg);
out.writeBytes(raw);
}
}
解码器:
package com.morris.netty.serialize.messagepack;
import com.morris.netty.serialize.pojo.UserRequest;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import org.msgpack.MessagePack;
import java.io.Serializable;
import java.util.List;
public class MessagePackDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
byte[] bytes = new byte[in.readableBytes()];
in.readBytes(bytes);
MessagePack messagePack = new MessagePack();
Serializable serializable = messagePack.read(bytes, UserRequest.class);
out.add(serializable);
}
}
实体类,注意加上@Message注解:
package com.morris.netty.serialize.pojo;
import lombok.Data;
import org.msgpack.annotation.Message;
import java.io.Serializable;
@Message
@Data
public class UserRequest implements Serializable {
private int age;
private String name;
}
服务器端启动类关键代码:
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2)); // 解决粘包半包问题
ch.pipeline().addLast(new MessagePackDecoder()); // 解码器
ch.pipeline().addLast(new ServerHandler());
}
});
服务器端业务处理类:
package com.morris.netty.serialize.messagepack;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class ServerHandler extends SimpleChannelInboundHandler {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
System.out.println("receive from client: " + msg);
ctx.writeAndFlush(Unpooled.copiedBuffer("success\n".getBytes()));
ctx.fireChannelRead(msg);
}
}
客户端启动类关键代码:
Bootstrap b = new Bootstrap();
b.group(workerGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LengthFieldPrepender(2)); // 解决粘包半包问题
// 服务器端发送过来的是以换行符分割的文本,所以这里使用LineBasedFrameDecoder处理粘包半包问题
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new MessagePackEncoder()); // 编码器
ch.pipeline().addLast(new ClientHandler());
}
});
客户端业务处理类:
package com.morris.netty.serialize.messagepack;
import com.morris.netty.serialize.pojo.UserRequest;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class ClientHandler extends SimpleChannelInboundHandler {
@Override
public void channelActive(ChannelHandlerContext ctx) {
for (int i = 0; i < 100; i++) {
UserRequest request = new UserRequest();
request.setAge(i);
request.setName("morris");
ctx.writeAndFlush(request);
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
ByteBuf receiveByteBuf = (ByteBuf) msg;
byte[] bytes = new byte[receiveByteBuf.readableBytes()];
receiveByteBuf.readBytes(bytes);
System.out.println("receive from server: " + new String(bytes));
}
}
当然,除了上述介绍的编解码框架和技术之外,比较常用的还有kryo、hession和Json等。
还没有评论,来说两句吧...