Netty编解码器框架(十一) Myth丶恋晨 2022-10-29 12:29 355阅读 0赞 今天分享Netty编解码器框架 一、什么是编解码器 每个网络应用程序都必须定义如何解析在两个节点之间来回传输的原始字节,以及如何 将其和目标应用程序的数据格式做相互转换。这种转换逻辑由编解码器处理,编解码器由编 码器和解码器组成,它们每种都可以将字节流从一种格式转换为另一种格式。那么它们的区 别是什么呢? 如果将消息看作是对于特定的应用程序具有具体含义的结构化的字节序列—它的数据。 那么编码器是将消息转换为适合于传输的格式(最有可能的就是字节流);而对应的解码器 则是将网络字节流转换回应用程序的消息格式。因此,编码器操作出站数据,而解码器处理入站数据。我们前面所学的解决粘包半包的其实也是编解码器框架的一部分。 1、解码器 将字节解码为消息—— ByteToMessageDecoder 将一种消息类型解码为另一种—— MessageToMessageDecoder 。 因为解码器是负责将入站数据从一种格式转换到另一种格式的,所以 Netty 的解码器实现了 ChannelInboundHandler 。 什么时候会用到解码器呢?很简单:每当需要为 ChannelPipeline 中的下一个ChannelInboundHandler 转换入站数据时会用到。此外,得益于 ChannelPipeline 的设计,可 以将多个解码器链接在一起,以实现任意复杂的转换逻辑。 2、将字节解码为消息 抽象类 ByteToMessageDecoder 将字节解码为消息(或者另一个字节序列)是一项如此常见的任务,以至于 Netty 为它 提供了一个抽象的基类:ByteToMessageDecoder 。由于你不可能知道远程节点是否会一次性 地发送一个完整的消息,所以这个类会对入站数据进行缓冲,直到它准备好处理。 它最重要方法 decode(ChannelHandlerContext ctx,ByteBuf in,List out) 这是你必须实现的唯一抽象方法。 decode() 方法被调用时将会传入一个包含了传入数据 的 ByteBuf ,以及一个用来添加解码消息的 List 。对这个方法的调用将会重复进行,直到确 定没有新的元素被添加到该 List ,或者该 ByteBuf 中没有更多可读取的字节时为止。然后,如果该 List 不为空,那么它的内容将会被传递给 ChannelPipeline 中的下一个 ChannelInboundHandler。 3、将一种消息类型解码为另一种 在两个消息格式之间进行转换(例如,从 String->Integer )decode(ChannelHandlerContext ctx,I msg,List out) 对于每个需要被解码为另一种格式的入站消息来说,该方法都将会被调用。解码消息随后会被传递给 ChannelPipeline 中的下一个 ChannelInboundHandler MessageToMessageDecoder, T 代表源数据的类型 4、TooLongFrameException 由于 Netty 是一个异步框架,所以需要在字节可以解码之前在内存中缓冲它们。因此, 不能让解码器缓冲大量的数据以至于耗尽可用的内存。为了解除这个常见的顾虑,Netty 提 供了 TooLongFrameException 类,其将由解码器在帧超出指定的大小限制时抛出。 为了避免这种情况,你可以设置一个最大字节数的阈值,如果超出该阈值,则会导致抛 出一个 TooLongFrameException (随后会被 ChannelHandler.exceptionCaught() 方法捕获)。然后,如何处理该异常则完全取决于该解码器的用户。某些协议(如 HTTP )可能允许你返回一个特殊的响应。而在其他的情况下,唯一的选择可能就是关闭对应的连接。如下代码: public class TooLongExSample extends ByteToMessageDecoder { private static final int MAX_SIZE = 1024; @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { int readable = in.readableBytes(); if(readable>MAX_SIZE){ ctx.close(); throw new TooLongFrameException("传入的数据太多"); } }}5、编码器 解码器的功能正好相反。 Netty 提供了一组类,用于帮助你编写具有以下功能的编码器: 将消息编码为字节; MessageToByteEncoder 将消息编码为消息: MessageToMessageEncoder , T 代表源数据的类型 6、将消息编码为字节 encode(ChannelHandlerContext ctx,I msg,ByteBuf out) encode() 方法是你需要实现的唯一抽象方法。它被调用时将会传入要被该类编码为ByteBuf 的出站消息(类型为 I 的)。该 ByteBuf 随后将会被转发给 ChannelPipeline 中的下一个 ChannelOutboundHandler 7、将消息编码为消息 encode(ChannelHandlerContext ctx,I msg,List out) 这是你需要实现的唯一方法。每个通过 write() 方法写入的消息都将会被传递给 encode() 方法,以编码为一个或者多个出站消息。随后,这些出站消息将会被转发给 ChannelPipeline 中的下一个 ChannelOutboundHandler 8、编解码器类 我们一直将解码器和编码器作为单独的实体讨论,但是你有时将会发现在同一个类中管理入站和出站数据和消息的转换是很有用的。Netty 的抽象编解码器类正好用于这个目的,因为它们每个都将捆绑一个解码器/ 编码器对。这些类同时实现了 ChannelInboundHandler 和 ChannelOutboundHandler 接口。 为什么我们并没有一直优先于单独的解码器和编码器使用这些复合类呢?因为通过尽可能地将这两种功能分开,最大化了代码的可重用性和可扩展性,这是 Netty 设计的一个基本原则。 相关的类: 抽象类 ByteToMessageCodec 抽象类 MessageToMessageCodec 9、Netty 内置的编解码器和 ChannelHandler Netty 为许多通用协议提供了编解码器和处理器,几乎可以开箱即用,这减少了你在那 些相当繁琐的事务上本来会花费的时间与精力。 二、通过 SSL/TLS 保护 Netty 应用程序 SSL 和 TLS 这样的安全协议,它们层叠在其他协议之上,用以实现数据安全。我们在访问安全网站时遇到过这些协议,但是它们也可用于其他不是基于 HTTP 的应用程序,如安全 SMTP( SMTPS )邮件服务器甚至是关系型数据库系统。 为了支持 SSL/TLS , Java 提供了 javax.net.ssl 包,它的 SSLContext 和 SSLEngine 类使得 实现解密和加密相当简单直接。Netty 通过一个名为 SslHandler 的 ChannelHandler 实现利用了这个 API ,其中 SslHandler 在内部使用 SSLEngine 来完成实际的工作。 在大多数情况下,SslHandler 将是 ChannelPipeline 中的第一个 ChannelHandler 。 1、HTTP 系列 HTTP 是基于请求 / 响应模式的:客户端向服务器发送一个 HTTP 请求,然后服务器将会返回一个 HTTP 响应。 Netty 提供了多种编码器和解码器以简化对这个协议的使用。 一个 HTTP 请求 / 响应可能由多个数据部分组成,FullHttpRequest 和 FullHttpResponse 消息是特殊的子类型,分别代表了完整的请求和响应。所有类型的 HTTP 消息( FullHttpRequest 、 LastHttpContent 等等)都实现了 HttpObject 接口。 HttpRequestEncoder 将 HttpRequest 、 HttpContent 和 LastHttpContent 消息编码为字节 HttpResponseEncoder 将 HttpResponse 、 HttpContent 和 LastHttpContent 消息编码为字节 HttpRequestDecoder 将字节解码为 HttpRequest 、 HttpContent 和 LastHttpContent 消息 HttpResponseDecoder 将字节解码为 HttpResponse 、 HttpContent 和 LastHttpContent 消息 HttpClientCodec 和HttpServerCodec 则将请求和响应做了一个组合。 HttpClientCodec 客户端类: public final class HttpClientCodec extends CombinedChannelDuplexHandler<HttpResponseDecoder, HttpRequestEncoder> implements SourceCodec { private final Queue<HttpMethod> queue; private final boolean parseHttpAfterConnectRequest; private boolean done; private final AtomicLong requestResponseCounter; private final boolean failOnMissingResponse; public HttpClientCodec() { this(4096, 8192, 8192, false); } public HttpClientCodec(int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) { this(maxInitialLineLength, maxHeaderSize, maxChunkSize, false); } public HttpClientCodec(int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean failOnMissingResponse) { this(maxInitialLineLength, maxHeaderSize, maxChunkSize, failOnMissingResponse, true); } public HttpClientCodec(int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean failOnMissingResponse, boolean validateHeaders) { this(maxInitialLineLength, maxHeaderSize, maxChunkSize, failOnMissingResponse, validateHeaders, false); } public HttpClientCodec(int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean failOnMissingResponse, boolean validateHeaders, boolean parseHttpAfterConnectRequest) { this.queue = new ArrayDeque(); this.requestResponseCounter = new AtomicLong(); this.init(new HttpClientCodec.Decoder(maxInitialLineLength, maxHeaderSize, maxChunkSize, validateHeaders), new HttpClientCodec.Encoder()); this.failOnMissingResponse = failOnMissingResponse; this.parseHttpAfterConnectRequest = parseHttpAfterConnectRequest; } public HttpClientCodec(int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean failOnMissingResponse, boolean validateHeaders, int initialBufferSize) { this(maxInitialLineLength, maxHeaderSize, maxChunkSize, failOnMissingResponse, validateHeaders, initialBufferSize, false); } public HttpClientCodec(int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean failOnMissingResponse, boolean validateHeaders, int initialBufferSize, boolean parseHttpAfterConnectRequest) { this.queue = new ArrayDeque(); this.requestResponseCounter = new AtomicLong(); this.init(new HttpClientCodec.Decoder(maxInitialLineLength, maxHeaderSize, maxChunkSize, validateHeaders, initialBufferSize), new HttpClientCodec.Encoder()); this.parseHttpAfterConnectRequest = parseHttpAfterConnectRequest; this.failOnMissingResponse = failOnMissingResponse; }}HttpServerCodec 服务类: public final class HttpServerCodec extends CombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder> implements SourceCodec { private final Queue<HttpMethod> queue; public HttpServerCodec() { this(4096, 8192, 8192); } public HttpServerCodec(int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) { this.queue = new ArrayDeque(); this.init(new HttpServerCodec.HttpServerRequestDecoder(maxInitialLineLength, maxHeaderSize, maxChunkSize), new HttpServerCodec.HttpServerResponseEncoder()); } public HttpServerCodec(int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean validateHeaders) { this.queue = new ArrayDeque(); this.init(new HttpServerCodec.HttpServerRequestDecoder(maxInitialLineLength, maxHeaderSize, maxChunkSize, validateHeaders), new HttpServerCodec.HttpServerResponseEncoder()); } public HttpServerCodec(int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean validateHeaders, int initialBufferSize) { this.queue = new ArrayDeque(); this.init(new HttpServerCodec.HttpServerRequestDecoder(maxInitialLineLength, maxHeaderSize, maxChunkSize, validateHeaders, initialBufferSize), new HttpServerCodec.HttpServerResponseEncoder()); }}2、聚合 HTTP 消息 由于 HTTP 的请求和响应可能由许多部分组成,因此你需要聚合它们以形成完整的消息。为了消除这项繁琐的任务,Netty 提供了一个聚合器 HttpObjectAggregator ,它可以将多个消息部分合并为 FullHttpRequest 或者 FullHttpResponse 消息。通过这样的方式,你将总是看到完整的消息内容。 3、HTTP 压缩 当使用 HTTP 时,建议开启压缩功能以尽可能多地减小传输数据的大小。虽然压缩会带 来一些 CPU 时钟周期上的开销,但是通常来说它都是一个好主意,特别是对于文本数据来说。Netty 为压缩和解压缩提供了 ChannelHandler 实现,它们同时支持 gzip 和 deflate 编码。 4、使用 HTTPS 启用 HTTPS 只需要将 SslHandler 添加到 ChannelPipeline 的 ChannelHandler 组合中。SSL 和 HTTP 的代码参见模块 netty-http 视频中实现步骤: 1) 、首先实现 Http 服务器并浏览器访问; public class HttpServer { public static final int port = 6789; //设置服务端端口 private static EventLoopGroup group = new NioEventLoopGroup(); // 通过nio方式来接收连接和处理连接 private static ServerBootstrap b = new ServerBootstrap(); private static final boolean SSL = false;/*是否开启SSL模式*/ /** * Netty创建全部都是实现自AbstractBootstrap。 * 客户端的是Bootstrap,服务端的则是ServerBootstrap。 **/ public static void main(String[] args) throws Exception { final SslContext sslCtx; if(SSL){ SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); }else{ sslCtx = null; } try { b.group(group); b.channel(NioServerSocketChannel.class); b.childHandler(new ServerHandlerInit(sslCtx)); //设置过滤器 // 服务器绑定端口监听 ChannelFuture f = b.bind(port).sync(); System.out.println("服务端启动成功,端口是:"+port); // 监听服务器关闭监听 f.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); //关闭EventLoopGroup,释放掉所有资源包括创建的线程 } }}业务操作: public class ServerHandlerInit extends ChannelInitializer<SocketChannel> { private final SslContext sslCtx; public ServerHandlerInit(SslContext sslCtx) { this.sslCtx = sslCtx; } @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline ph = ch.pipeline(); if(sslCtx!=null){ ph.addLast(sslCtx.newHandler(ch.alloc())); } /*把应答报文 编码*/ ph.addLast("encoder",new HttpResponseEncoder()); /*把请求报文 解码*/ ph.addLast("decoder",new HttpRequestDecoder()); /*聚合http为一个完整的报文*/ ph.addLast("aggregator", new HttpObjectAggregator(10*1024*1024)); /*把应答报文 压缩,非必要*/ ph.addLast("compressor",new HttpContentCompressor()); ph.addLast(new BusiHandler()); }}业务返回: public class BusiHandler extends ChannelInboundHandlerAdapter { /** * 发送的返回值 * @param ctx 返回 * @param context 消息 * @param status 状态 */ private void send(ChannelHandlerContext ctx, String context, HttpResponseStatus status) { FullHttpResponse response = new DefaultFullHttpResponse( HttpVersion.HTTP_1_1,status, Unpooled.copiedBuffer(context,CharsetUtil.UTF_8) ); response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain;charset=UTF-8"); ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String result=""; FullHttpRequest httpRequest = (FullHttpRequest)msg; System.out.println(httpRequest.headers()); try{ //获取路径 String path=httpRequest.uri(); //获取body String body = httpRequest.content().toString(CharsetUtil.UTF_8); //获取请求方法 HttpMethod method=httpRequest.method(); System.out.println("接收到:"+method+" 请求"); //如果不是这个路径,就直接返回错误 if(!"/test".equalsIgnoreCase(path)){ result="非法请求!"+path; send(ctx,result,HttpResponseStatus.BAD_REQUEST); return; } //如果是GET请求 if(HttpMethod.GET.equals(method)){ //接受到的消息,做业务逻辑处理... System.out.println("body:"+body); result="GET请求,应答:"+RespConstant.getNews(); send(ctx,result,HttpResponseStatus.OK); return; } //如果是其他类型请求,如post if(HttpMethod.POST.equals(method)){ //接受到的消息,做业务逻辑处理... //.... return; } }catch(Exception e){ System.out.println("处理请求失败!"); e.printStackTrace(); }finally{ //释放请求 httpRequest.release(); } } /* * 建立连接时,返回消息 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("连接的客户端地址:" + ctx.channel().remoteAddress()); }}字典数据类: public class RespConstant { private static final String[] NEWS = { "她那时候还太年轻,不知道所有命运赠送的礼物,早已在暗中标好了价格。——斯蒂芬·茨威格《断头皇后》", "这是一个最好的时代,也是一个最坏的时代;这是一个智慧的年代,这是一个愚蠢的年代;\n" + "这是一个信任的时期,这是一个怀疑的时期;这是一个光明的季节,这是一个黑暗的季节;\n" + "这是希望之春,这是失望之冬;人们面前应有尽有,人们面前一无所有;\n" + "人们正踏上天堂之路,人们正走向地狱之门。 —— 狄更斯《双城记》", }; private static final Random R = new Random(); public static String getNews(){ return NEWS[R.nextInt(NEWS.length)]; }}期待服务浏览器访问: 正常访问: 2)、实现客户端并访问。 public class HttpClient { public static final String HOST = "127.0.0.1"; private static final boolean SSL = false; public void connect(String host, int port) throws Exception { EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(workerGroup); b.channel(NioSocketChannel.class); b.option(ChannelOption.SO_KEEPALIVE, true); b.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new HttpClientCodec()); /*聚合http为一个完整的报文*/ ch.pipeline().addLast("aggregator", new HttpObjectAggregator(10*1024*1024)); /*解压缩*/ ch.pipeline().addLast("decompressor", new HttpContentDecompressor()); ch.pipeline().addLast(new HttpClientInboundHandler()); } }); // Start the client. ChannelFuture f = b.connect(host, port).sync(); f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { HttpClient client = new HttpClient(); client.connect("127.0.0.1", HttpServer.port); }}客户端业务操作: public class HttpClientInboundHandler extends ChannelInboundHandlerAdapter { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { FullHttpResponse httpResponse = (FullHttpResponse)msg; System.out.println(httpResponse.status()); System.out.println(httpResponse.headers()); ByteBuf buf = httpResponse.content(); System.out.println(buf.toString(CharsetUtil.UTF_8)); httpResponse.release(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { URI uri = new URI("/test"); String msg = "Hello"; DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toASCIIString(), Unpooled.wrappedBuffer(msg.getBytes("UTF-8"))); // 构建http请求 request.headers().set(HttpHeaderNames.HOST, HttpClient.HOST); request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); request.headers().set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes()); // 发送http请求 ctx.writeAndFlush(request); }}执行启动客户端: 5、空闲的连接和超时 检测空闲连接以及超时对于及时释放资源来说是至关重要的。由于这是一项常见的任务, Netty 特地为它提供了几个 ChannelHandler 实现。 IdleStateHandler 当连接空闲时间太长时,将会触发一个 IdleStateEvent 事件。然后,你可以通过在你的 ChannelInboundHandler 中重写 userEventTriggered() 方法来处理该 IdleStateEvent 事件。 ReadTimeoutHandler 如果在指定的时间间隔内没有收到任何的入站数据,则抛出一个 Read-TimeoutException 并关闭对应的 Channel 。可以通过重写你的 ChannelHandler 中的exceptionCaught()方法来检测该 Read-TimeoutException 。 WriteTimeoutHandler 如果在指定的时间间隔内没有任何出站数据写入,则抛出一个Write-TimeoutException 并关闭对应的 Channel 。可以通过重写你的 ChannelHandler 的 exceptionCaught()方法检测该 WriteTimeout-Exception 。 Netty编解码器框架分析到此结束,更具体的使用会在 netty 后期 实战项目中看到,敬请期待!
相关 netty编解码器与序列化框架分析 netty编解码器分析 编码(Encode)也称为序列化(serialization),它将对象序列化为字节数组,用于网络传输、数据持久化或者其它用途。 反之,解码(D 分手后的思念是犯贱/ 2022年12月20日 03:30/ 0 赞/ 221 阅读
相关 Netty 编解码器 Netty 编解码 在网络中数据是以字节码二进制的形式传输的,所以我们在使用 Netty 传输数据的时候,需要将我们传输的数据转换为 二进制的形式 进行传输,所以不管是我 梦里梦外;/ 2022年11月05日 06:24/ 0 赞/ 324 阅读
相关 Netty编解码器框架(十一) 今天分享Netty编解码器框架 一、什么是编解码器 每个网络应用程序都必须定义如何解析在两个节点之间来回传输的原始字节,以及如何 将其和目标应用程序的数据格式做相互转换。 Myth丶恋晨/ 2022年10月29日 12:29/ 0 赞/ 356 阅读
相关 Netty出入站机制与编解码器 从Socket读取数据到客户端/服务器为入站操作,入站操作管道中事件传播顺序是从前往后;将客户端/服务器数据写入到Socket为出站操作,出站操作管道中事件传播 刺骨的言语ヽ痛彻心扉/ 2022年09月09日 00:23/ 0 赞/ 339 阅读
相关 netty的编解码器介绍 本blog主要介绍: 1. Codec 编解码器 2. Decoder 解码器 3. Encoder 编码器 netty提供了强大的编解码器框架,使得我们编写自定 迷南。/ 2022年07月11日 04:19/ 0 赞/ 353 阅读
相关 Netty(编解码器框架) 每个网络应用程序都必须定义如何解析在两个节点之间来回传输的原始字节,以及如何将其和目标应用程序的数据格式做相互转换。这种转换逻辑由编解码器处理,编解码器 由编码器和解 古城微笑少年丶/ 2022年04月17日 06:07/ 0 赞/ 423 阅读
相关 Netty(预置的ChannelHandler和编解码器) 通过SSL/TLS保护Netty应用程序 为了支持SSL/TLS,Java提供了javax.net.ssl包,它的SSLContext和SSLEngine类使得实 港控/mmm°/ 2022年04月16日 02:27/ 0 赞/ 508 阅读
相关 Netty详解(七):Netty 编解码以及消息头编解码器 1. MessagePack 概述 MessagePack是一个高效的二进制序列化框架,像JSON一样支持不同语言间的数据交换,速度更快,序列化之后的码流更小。 Mes 一时失言乱红尘/ 2022年01月29日 13:01/ 0 赞/ 589 阅读
相关 Netty——编解码器 什么是编解码器 每个网络应用程序都必须定义如何解析在两个节点之间来回传输的原始字节,以及如何将其和 目标应用程序的数据格式做相互转换。这种转换逻辑由编解码器处理,编解码器 比眉伴天荒/ 2021年08月30日 23:41/ 0 赞/ 646 阅读
相关 Netty——预置的ChannelHandler和编解码器(一) 预置的ChannelHandler和编解码器 Netty 为许多通用协议提供了编解码器和处理器,几乎可以开箱即用,这减少了你在那些相 当繁琐的事务上本来会花费的时间与精力 桃扇骨/ 2021年08月30日 20:04/ 0 赞/ 518 阅读
还没有评论,来说两句吧...