Netty实现websocket示例

谁借莪1个温暖的怀抱¢ 2022-03-12 10:08 696阅读 0赞

本文构建一个 WebSocket 服务端,客户端用 HTML 页面来模拟。网上有很多用 Netty 实现 WebSocket 的示例,但大多不完整,也没有演示效果;这里给出一个完整的示例,既包含服务端的实现,也包含用页面模拟客户端聊天功能的实现。

工程结构:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2ZlaW5pZmk_size_16_color_FFFFFF_t_70

pom.xml


<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.27.Final</version>
</dependency>

NettyConfig.java

package com.xxx.netty.websocket;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
/**
 * Holds the channel group shared by the WebSocket handlers.
 *
 * <p>The group is backed by {@link GlobalEventExecutor#INSTANCE}. The field is {@code final}
 * so the single shared instance cannot be swapped out while handlers are using it.
 */
public class NettyConfig {
    /** Shared group of channels; messages written to it go to every member channel. */
    public static final ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}

WebSocketChannelHandler.java

  1. package com.xxx.netty.websocket;
  2. import java.util.Date;
  3. import io.netty.buffer.ByteBuf;
  4. import io.netty.buffer.Unpooled;
  5. import io.netty.channel.ChannelFuture;
  6. import io.netty.channel.ChannelFutureListener;
  7. import io.netty.channel.ChannelHandlerContext;
  8. import io.netty.channel.SimpleChannelInboundHandler;
  9. import io.netty.handler.codec.http.*;
  10. import io.netty.handler.codec.http.websocketx.*;
  11. import io.netty.util.CharsetUtil;
  12. public class WebSocketChannelHandler extends SimpleChannelInboundHandler<Object> {
  13. private WebSocketServerHandshaker handshaker;
  14. private static final String web_socket_url="ws://127.0.0.1:8888/websocket";
  15. @Override
  16. protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
  17. if(msg instanceof FullHttpMessage) {
  18. handleHttpRequest(ctx,(FullHttpRequest)msg);
  19. }else if(msg instanceof WebSocketFrame) {
  20. handleWebSocketFrame(ctx,(WebSocketFrame)msg);
  21. }
  22. }
  23. private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
  24. if(frame instanceof CloseWebSocketFrame) {
  25. handshaker.close(ctx.channel(), (CloseWebSocketFrame)frame.retain());
  26. }
  27. if(frame instanceof PingWebSocketFrame) {
  28. ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
  29. return;
  30. }
  31. if(!(frame instanceof TextWebSocketFrame)) {
  32. System.out.println("目前我们不支持二进制消息");
  33. new RuntimeException("["+this.getClass().getName()+"]不支持消息");
  34. }
  35. String request = ((TextWebSocketFrame)frame).text();
  36. System.out.println("服务端接收到消息==>"+request);
  37. TextWebSocketFrame res = new TextWebSocketFrame(new Date().toString()+"-"+ctx.channel().id()+"===>"+request);
  38. NettyConfig.group.writeAndFlush(res);
  39. }
  40. private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) {
  41. if(!request.decoderResult().isSuccess()||!"websocket".equals(request.headers().get("Upgrade"))) {
  42. sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
  43. return;
  44. }
  45. WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(web_socket_url, null, false);
  46. handshaker = factory.newHandshaker(request);
  47. if(handshaker==null) {
  48. WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
  49. }else {
  50. handshaker.handshake(ctx.channel(), request);
  51. }
  52. }
  53. private void sendHttpResponse(ChannelHandlerContext ctx,FullHttpRequest request,DefaultFullHttpResponse response) {
  54. if(response.status().code()!=200) {
  55. ByteBuf buf = Unpooled.copiedBuffer(response.status().toString(),CharsetUtil.UTF_8);
  56. response.content().writeBytes(buf);
  57. buf.release();
  58. }
  59. ChannelFuture future = ctx.channel().writeAndFlush(response);
  60. if(response.status().code()!=200) {
  61. future.addListener(ChannelFutureListener.CLOSE);
  62. }
  63. }
  64. @Override
  65. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  66. System.out.println("客户端与服务端建立连接");
  67. NettyConfig.group.add(ctx.channel());
  68. }
  69. @Override
  70. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  71. System.out.println("客户端与服务端断开连接");
  72. NettyConfig.group.remove(ctx.channel());
  73. }
  74. @Override
  75. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  76. ctx.flush();
  77. }
  78. }

WebSocketChildChannelHandler.java

package com.xxx.netty.websocket;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;
public class WebSocketChildChannelHandler extends ChannelInitializer {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(“http-codec”,new HttpServerCodec());
ch.pipeline().addLast(“aggregator”,new HttpObjectAggregator(65536));
ch.pipeline().addLast(“http-chunked”,new ChunkedWriteHandler());
ch.pipeline().addLast(“handler”,new WebSocketChannelHandler());
}
}

Main.java

package com.xxx.netty.websocket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class Main {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new WebSocketChildChannelHandler());
System.out.println(“server running and waiting for client to connect.”);
Channel channel = bootstrap.bind(8888).sync().channel();
channel.closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}

index.html

  <!DOCTYPE html>
  <html lang="en">
  <head>
      <meta charset="UTF-8">
      <title>WebSocket客户端</title>
      <script type="text/javascript">
          // Connection handle shared between the setup code and send().
          var socket;

          // Fall back to the vendor-prefixed constructor when the standard one is absent.
          if (!window.WebSocket) {
              window.WebSocket = window.MozWebSocket;
          }

          if (!window.WebSocket) {
              alert("您的浏览器不支持WebSocket.")
          } else {
              socket = new WebSocket("ws://127.0.0.1:8888/websocket");

              // Connection established: replace the textarea content with a ready notice.
              socket.onopen = function (event) {
                  document.getElementById("messageContent").value =
                      "您当前的浏览器支持WebSocket,请进行后续操作\r\n";
              };

              // Incoming message: append it as a new line.
              socket.onmessage = function (event) {
                  var output = document.getElementById("messageContent");
                  output.value = output.value + event.data + "\r\n";
              };

              // Connection closed: discard the previous content and show a closed notice.
              socket.onclose = function (event) {
                  document.getElementById("messageContent").value = "WebSocket连接已经关闭\r\n";
              };
          }

          // Sends the given text over the socket if the connection is open.
          function send(message) {
              if (!window.WebSocket) {
                  return;
              }
              if (socket.readyState != WebSocket.OPEN) {
                  alert("WebSocket连接未成功。")
                  return;
              }
              socket.send(message);
          }
      </script>
  </head>
  <body>
      <form onsubmit="return false;">
          <input type="text" name="message"/>
          <input type="button" value="发送消息" onclick="send(this.form.message.value)"/>
          <hr color="red"/>
          <textarea style="width:1024px;height:300px;" id="messageContent"></textarea>
      </form>
  </body>
  </html>

前端页面:

20190305183222731.gif

服务端打印信息:

2019030518303961.png

这里使用的是netty-all-4.1.27.Final版本,其中handler需要实现的方法是channelRead0(ctx,msg);如果使用的是netty-all-5.0.0.Alpha1版本,方法变为messageReceived(ctx,msg)。

发表评论

表情:
评论列表 (有 0 条评论,696人围观)

还没有评论,来说两句吧...

相关阅读