Netty Source Code Study, Part 1: Pipeline, FrameDecoder, and the Reactor Multithreading Model



  • A minimal Netty example
  • How messages flow through the ChannelPipeline
  • Understanding Netty's FrameDecoder codec
  • Understanding Netty's Reactor model through the NIO Reactor multithreading model

A minimal Netty example

Server side: Server

```java
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;

public class Server {

    public static void main(String[] args) {
        // bootstrap helper
        ServerBootstrap bootstrap = new ServerBootstrap();

        // boss threads accept connections, worker threads handle reads and writes
        ExecutorService boss = Executors.newCachedThreadPool();
        ExecutorService worker = Executors.newCachedThreadPool();

        // NIO server socket channel factory
        bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));

        // pipeline factory
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("decoder", new StringDecoder());
                pipeline.addLast("encoder", new StringEncoder());
                pipeline.addLast("helloHandler", new HelloHandler());
                return pipeline;
            }
        });

        bootstrap.bind(new InetSocketAddress(10101));
        System.out.println("start!!!");
    }
}
```

  HelloHandler extends SimpleChannelHandler, the parent class for handlers that process received messages and writes. To make it easier to observe when messageReceived, exceptionCaught, channelConnected, channelDisconnected and channelClosed run, each method prints a line.

```java
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

public class HelloHandler extends SimpleChannelHandler {

    /** Receives a message. */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        // the server pipeline contains a string decoder/encoder, so the message can be cast directly
        String s = (String) e.getMessage();
        System.out.println(s);
        // write a reply
        ctx.getChannel().write("hi");
        super.messageReceived(ctx, e);
    }

    /** Called when messageReceived throws an exception. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        System.out.println("exceptionCaught");
        super.exceptionCaught(ctx, e);
    }

    /** New connection. */
    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        System.out.println("channelConnected");
        super.channelConnected(ctx, e);
    }

    /** Fired only when an established connection is closed. */
    @Override
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        System.out.println("channelDisconnected");
        super.channelDisconnected(ctx, e);
    }

    /** Fired when the channel is closed. */
    @Override
    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        System.out.println("channelClosed");
        super.channelClosed(ctx, e);
    }
}
```

Client:

```java
import java.net.InetSocketAddress;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;

public class Client {

    public static void main(String[] args) {
        // bootstrap helper
        ClientBootstrap bootstrap = new ClientBootstrap();

        // thread pools
        ExecutorService boss = Executors.newCachedThreadPool();
        ExecutorService worker = Executors.newCachedThreadPool();

        // NIO client socket channel factory
        bootstrap.setFactory(new NioClientSocketChannelFactory(boss, worker));

        // pipeline factory
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("decoder", new StringDecoder());
                pipeline.addLast("encoder", new StringEncoder());
                pipeline.addLast("hiHandler", new HiHandler());
                return pipeline;
            }
        });

        // connect to the server
        ChannelFuture connect = bootstrap.connect(new InetSocketAddress("127.0.0.1", 10101));
        Channel channel = connect.getChannel();
        System.out.println("client start");

        Scanner scanner = new Scanner(System.in);
        while (true) {
            System.out.println("please input:");
            channel.write(scanner.next());
        }
    }
}
```

  The client also needs a handler that extends SimpleChannelHandler.

```java
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

public class HiHandler extends SimpleChannelHandler {

    /** Receives a message. */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        String s = (String) e.getMessage();
        System.out.println(s);
        super.messageReceived(ctx, e);
    }

    /** Catches exceptions. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        System.out.println("exceptionCaught");
        super.exceptionCaught(ctx, e);
    }

    /** New connection. */
    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        System.out.println("channelConnected");
        super.channelConnected(ctx, e);
    }

    /** Fired only when an established connection is closed. */
    @Override
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        System.out.println("channelDisconnected");
        super.channelDisconnected(ctx, e);
    }

    /** Fired when the channel is closed. */
    @Override
    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        System.out.println("channelClosed");
        super.channelClosed(ctx, e);
    }
}
```

Run results:

  When the server is started and the client connects, both the client's HiHandler and the server's HelloHandler execute channelConnected.

  When the client sends "hello", the server receives it in messageReceived, prints the message, and writes back "hi".

  When the client disconnects abruptly, the server's messageReceived fires but cannot read any data and throws an exception; the exception is caught by exceptionCaught, so "exceptionCaught" is printed, and channelDisconnected and channelClosed then run in turn. Note the difference between channelDisconnected and channelClosed: channelDisconnected is only called when a previously established connection is torn down, whereas channelClosed is called to release resources when the channel closes, whether or not the connection ever succeeded.

After extending SimpleChannelHandler, the following three methods typically serve distinct purposes:

  • messageReceived: receive a message and write back a response
  • channelConnected: a new connection; commonly used to check whether the client IP is on a blacklist
  • channelDisconnected: the connection has closed; a good place to clear the user's cached data when the user goes offline
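As an illustration of the last two points, here is a minimal sketch (not part of the original example; the class name and the hard-coded blacklist are made up for illustration) of a SimpleChannelHandler that rejects blacklisted IPs on connect and cleans up on disconnect:

```java
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

public class BlacklistHandler extends SimpleChannelHandler {

    // hypothetical blacklist, hard-coded here for illustration only
    private static final Set<String> BLACKLIST =
            new HashSet<String>(Arrays.asList("192.168.1.100"));

    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        InetSocketAddress remote = (InetSocketAddress) ctx.getChannel().getRemoteAddress();
        if (BLACKLIST.contains(remote.getAddress().getHostAddress())) {
            // blacklisted client: close the channel instead of letting the event flow on
            ctx.getChannel().close();
            return;
        }
        super.channelConnected(ctx, e);
    }

    @Override
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        // a good place to clear per-user cached data when the user goes offline
        super.channelDisconnected(ctx, e);
    }
}
```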

How messages flow through the ChannelPipeline

  In Netty, a Channel is the carrier of communication, and ChannelHandlers hold the logic applied to a Channel. Each Channel owns one ChannelPipeline; all ChannelHandlers are registered in the ChannelPipeline and organized in order. A ChannelEvent is the carrier of data or state: transferred data corresponds to a MessageEvent, and a state change to a ChannelStateEvent. When an operation is performed on a Channel, a ChannelEvent is produced and sent into the ChannelPipeline, which picks a ChannelHandler to process it. That handler may in turn produce a new ChannelEvent, which flows on to the next handler.

  A piece of data starts out as a MessageEvent carrying the raw, undecoded ChannelBuffer; some handler decodes it into a data object, wraps that object in a new MessageEvent, and passes it on for further processing.

  Netty's ChannelPipeline has two paths: upstream and downstream. Upstream is the inbound direction: received messages and passively triggered state changes. Downstream is the outbound direction: messages being sent and actively initiated state changes. The ChannelPipeline interface has two corresponding methods, sendUpstream(ChannelEvent e) and sendDownstream(ChannelEvent e).

  Accordingly, there are two kinds of handlers in a ChannelPipeline: ChannelUpstreamHandler and ChannelDownstreamHandler. The handlers on the two paths are independent of each other, and each interface has a single method: ChannelUpstreamHandler.handleUpstream and ChannelDownstreamHandler.handleDownstream.

  The official javadoc of the ChannelPipeline interface contains a diagram of this layout.

  The downstream path ends in a special ChannelSink, whose key method, ChannelSink.eventSunk, accepts any ChannelEvent. A "sink" is where things flow down into, and that is exactly its role; put another way, it is the catch-all handler at the very end of the pipeline.

  Note that within one path, a ChannelEvent does not automatically flow through every handler. It moves on only when the previous handler explicitly calls ChannelPipeline.sendUpstream or sendDownstream, which hands the event to the next handler. In other words, after a handler finishes with an event, it must call sendUpstream or sendDownstream again if processing should continue; if it does not, the flow stops there, even if handlers remain after it. This gives maximum flexibility, but it also makes the order of handlers significant.
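All the handlers in this article sit on the upstream path. For comparison, a downstream handler might look roughly like the sketch below (a hypothetical logging handler, written only to show the handleDownstream contract); note that it must call ctx.sendDownstream(e) so the event keeps moving toward the ChannelSink:

```java
import org.jboss.netty.channel.ChannelDownstreamHandler;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;

public class LoggingDownstreamHandler implements ChannelDownstreamHandler {

    @Override
    public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
        if (e instanceof MessageEvent) {
            // log outbound messages as they travel toward the ChannelSink
            System.out.println("writing: " + ((MessageEvent) e).getMessage());
        }
        // keep the event flowing; without this call the write would stop here
        ctx.sendDownstream(e);
    }
}
```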
An example of upstream propagation. The server:

```java
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

public class Server {

    public static void main(String[] args) {
        // bootstrap helper
        ServerBootstrap bootstrap = new ServerBootstrap();

        // boss threads accept connections, worker threads handle reads and writes
        ExecutorService boss = Executors.newCachedThreadPool();
        ExecutorService worker = Executors.newCachedThreadPool();

        // NIO server socket channel factory
        bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));

        // pipeline factory
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("handler1", new MyHandler1());
                pipeline.addLast("handler2", new MyHandler2());
                return pipeline;
            }
        });

        bootstrap.bind(new InetSocketAddress(30000));
        System.out.println("start!!!");
    }
}
```

After receiving a message, handler1 uses the ChannelHandlerContext to forward messages to the next handler:

```java
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.UpstreamMessageEvent;

public class MyHandler1 extends SimpleChannelHandler {

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
        byte[] array = buffer.array();
        String message = new String(array);
        System.out.println("handler1:" + message);
        // forward two new events to the next upstream handler
        ctx.sendUpstream(new UpstreamMessageEvent(ctx.getChannel(), "abc", e.getRemoteAddress()));
        ctx.sendUpstream(new UpstreamMessageEvent(ctx.getChannel(), "efg", e.getRemoteAddress()));
    }
}
```

handler2 reads the messages forwarded by handler1:

```java
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

public class MyHandler2 extends SimpleChannelHandler {

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        String message = (String) e.getMessage();
        System.out.println("handler2:" + message);
    }
}
```

Client:

```java
import java.net.Socket;

public class Client {

    public static void main(String[] args) throws Exception {
        Socket socket = new Socket("127.0.0.1", 30000);
        socket.getOutputStream().write("hello".getBytes());
        socket.close();
    }
}
```

Run results:

  Finally, note that upstream handlers execute from front to back, while downstream handlers execute from back to front, so the order in which handlers are added to the ChannelPipeline in the server matters. If handler1 and handler2 in the server above were added in the opposite order, then, because handler2 never calls sendUpstream(), the message would be received only once, in handler2.
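To make the ordering point concrete, here is a sketch of the swapped registration (a hypothetical SwappedPipelineFactory, written only to illustrate the effect described above):

```java
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;

public class SwappedPipelineFactory implements ChannelPipelineFactory {

    @Override
    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline pipeline = Channels.pipeline();
        // MyHandler2 now sits at the head of the upstream path
        pipeline.addLast("handler2", new MyHandler2());
        pipeline.addLast("handler1", new MyHandler1());
        // MyHandler2 never calls ctx.sendUpstream(...), so upstream events stop there
        // and MyHandler1.messageReceived is never invoked
        return pipeline;
    }
}
```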

Understanding Netty's FrameDecoder codec

  FrameDecoder extends SimpleChannelUpstreamHandler. What makes FrameDecoder special is that the object returned by a subclass's decode() method is exactly the object that gets passed to the next handler, through an indirect call to sendUpstream, while FrameDecoder's own messageReceived drives the process. Here is FrameDecoder's messageReceived implementation:

```java
@Override
public void messageReceived(
        ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    Object m = e.getMessage();
    if (!(m instanceof ChannelBuffer)) {
        ctx.sendUpstream(e);
        return;
    }

    ChannelBuffer input = (ChannelBuffer) m;
    if (!input.readable()) {
        return;
    }

    ChannelBuffer cumulation = cumulation(ctx);
    if (cumulation.readable()) {
        cumulation.discardReadBytes();
        cumulation.writeBytes(input);
        callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress());
    } else {
        callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
        if (input.readable()) {
            cumulation.writeBytes(input);
        }
    }
}
```

  What happens when the buffer has not been fully consumed? FrameDecoder keeps a cumulation buffer, which is itself a ChannelBuffer. If cumulation is readable, cumulation.discardReadBytes() releases the space between index 0 and readerIndex and re-marks readerIndex and writerIndex; the newly received data is then appended and callDecode runs on the accumulated buffer. If cumulation is empty (not readable), callDecode is invoked directly on the incoming buffer, and any bytes still unread afterwards are copied into the cumulation buffer.

The cumulation implementation:

```java
private ChannelBuffer cumulation(ChannelHandlerContext ctx) {
    ChannelBuffer c = cumulation;
    if (c == null) {
        c = ChannelBuffers.dynamicBuffer(
                ctx.getChannel().getConfig().getBufferFactory());
        cumulation = c;
    }
    return c;
}
```

The callDecode implementation:

```java
private void callDecode(
        ChannelHandlerContext context, Channel channel,
        ChannelBuffer cumulation, SocketAddress remoteAddress) throws Exception {

    while (cumulation.readable()) {
        int oldReaderIndex = cumulation.readerIndex();
        Object frame = decode(context, channel, cumulation);
        if (frame == null) {
            if (oldReaderIndex == cumulation.readerIndex()) {
                // Seems like more data is required.
                // Let us wait for the next notification.
                break;
            } else {
                // Previous data has been discarded.
                // Probably it is reading on.
                continue;
            }
        } else if (oldReaderIndex == cumulation.readerIndex()) {
            throw new IllegalStateException(
                    "decode() method must read at least one byte " +
                    "if it returned a frame (caused by: " + getClass() + ")");
        }

        unfoldAndFireMessageReceived(context, remoteAddress, frame);
    }

    if (!cumulation.readable()) {
        this.cumulation = null;
    }
}
```

  This is a loop. First the reader index is saved. decode() is an abstract method implemented by subclasses; it contains the actual framing algorithm and returns null when a complete frame could not be read. The check that follows distinguishes two cases: if nothing at all was consumed, there is no point in trying again, so the loop breaks and waits for the next messageReceived notification; if some bytes were consumed but no frame was produced, the loop continues and tries to decode the next frame. If a frame was decoded, it is fired upstream with unfoldAndFireMessageReceived (the "unfold" part controls whether a returned collection or array is unwrapped into individual messages). Finally, if the buffer is no longer readable, cumulation is set to null.
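To make the decode() contract concrete, here is a minimal sketch of a FrameDecoder subclass (a hypothetical length-prefixed decoder, not taken from Netty itself): returning null tells callDecode to wait for more data, and a returned object is fired upstream as the next MessageEvent.

```java
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;

// Each frame is assumed to be a 4-byte length header followed by the payload.
public class LengthPrefixedDecoder extends FrameDecoder {

    @Override
    protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
        if (buffer.readableBytes() < 4) {
            return null;                  // header incomplete: callDecode breaks and waits
        }
        buffer.markReaderIndex();         // remember the position in case the body is incomplete
        int length = buffer.readInt();
        if (buffer.readableBytes() < length) {
            buffer.resetReaderIndex();    // roll back so no bytes appear consumed
            return null;
        }
        // a complete frame: the returned buffer is passed upstream as a new MessageEvent
        return buffer.readBytes(length);
    }
}
```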

Understanding Netty's Reactor model through the NIO Reactor multithreading model

  The NIO Reactor multithreading model follows the same idea as Netty's implementation: a ServerSocketChannel registers the OP_ACCEPT event with one Selector, and the thread owning that Selector registers every SocketChannel it accepts with another Selector for OP_READ, wrapping each SocketChannel registration in a task handed to the thread pool.

  Think of a restaurant as the server and its customers as the clients: the front door is the ServerSocketChannel, and a waiter is a thread holding a Selector. The NIO multithreading model is simply a restaurant with several such waiters. One waiter, holding a Selector, greets customers at the door: the ServerSocketChannel is registered with that Selector for OP_ACCEPT (the waiter is assigned to watch this particular door). When the greeter accepts a customer coming through the door, it registers the resulting SocketChannel, with OP_READ, on one of the waiters working the dining hall. The client registers its own SocketChannel for OP_READ as well. When a client has a request (a customer at some table wants something), that customer's waiter returns from select(), finds the data the customer sent via the SelectionKey, and handles it.

  Because several greeters might try to register SocketChannel OP_READ events with the same hall waiter at the same time, each hall waiter keeps its own thread-safe task queue: to register a SocketChannel's OP_READ event, a greeter submits a Runnable task into the hall waiter's queue.

  How does a hall waiter work? Many SocketChannels with OP_READ interest are registered on it. The hall waiter is itself a thread; in its run method it loops, using select(500) so it does not block indefinitely, checking whether any of its SocketChannels have I/O ready, and then checking whether its task queue contains registrations to perform.

  In this example, then, the greeter hands SocketChannels to a hall waiter by submitting Runnable tasks; the hall waiter maintains a task queue of channels whose OP_READ interest has not been registered yet, and keeps looping over two jobs: handle any I/O ready on its Selector, and run any registration tasks submitted by the greeter.

  A simplified, Netty-style server implementation follows.

  AbstractNioSelector is the abstract base class for both the greeter (boss) and the hall waiter (worker); it implements Runnable and holds an Executor. When a subclass instance is created, NioServerBoss (the greeter) or NioServerWorker (the hall waiter), the parent constructor calls openSelector(), and openSelector() in turn calls executor.execute(this); so the moment a boss or worker is created, it starts running as a thread in the pool. Every such thread keeps cycling through the following four steps:

```java
wakenUp.set(false);
select(selector);
processTaskQueue();
process(selector);
```

Here, wakenUp is used when a SocketChannel registration task is queued for the Selector, as a flag marking whether the Selector has already been woken up.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

import com.cn.pool.NioSelectorRunnablePool;

public abstract class AbstractNioSelector implements Runnable {

    /** Thread pool. */
    private final Executor executor;

    /** Selector. */
    protected Selector selector;

    /** wakenUp state flag for the selector. */
    protected final AtomicBoolean wakenUp = new AtomicBoolean();

    /** Task queue. */
    private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();

    /** Thread name. */
    private String threadName;

    /** Thread management object. */
    protected NioSelectorRunnablePool selectorRunnablePool;

    AbstractNioSelector(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
        this.executor = executor;
        this.threadName = threadName;
        this.selectorRunnablePool = selectorRunnablePool;
        openSelector();
    }

    /** Opens the selector and starts this runnable in the thread pool. */
    private void openSelector() {
        try {
            this.selector = Selector.open();
        } catch (IOException e) {
            throw new RuntimeException("Failed to create a selector.");
        }
        executor.execute(this);
    }

    @Override
    public void run() {
        Thread.currentThread().setName(this.threadName);
        while (true) {
            try {
                wakenUp.set(false);
                select(selector);
                processTaskQueue();
                process(selector);
            } catch (Exception e) {
                // ignore
            }
        }
    }

    /** Registers a task and wakes up the selector. */
    protected final void registerTask(Runnable task) {
        taskQueue.add(task);
        Selector selector = this.selector;
        if (selector != null) {
            if (wakenUp.compareAndSet(false, true)) {
                selector.wakeup();
            }
        } else {
            taskQueue.remove(task);
        }
    }

    /** Runs the queued tasks. */
    private void processTaskQueue() {
        for (;;) {
            final Runnable task = taskQueue.poll();
            if (task == null) {
                break;
            }
            task.run();
        }
    }

    /** Returns the thread management object. */
    public NioSelectorRunnablePool getSelectorRunnablePool() {
        return selectorRunnablePool;
    }

    /** The select strategy, implemented by subclasses. */
    protected abstract int select(Selector selector) throws IOException;

    /** Subclass-specific processing of the selector's ready keys. */
    protected abstract void process(Selector selector) throws IOException;
}
```

  The greeter (boss): its Selector does a different job from the hall waiter's, so the two process(Selector selector) implementations differ.

```java
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;

import com.cn.pool.Boss;
import com.cn.pool.NioSelectorRunnablePool;
import com.cn.pool.Worker;

public class NioServerBoss extends AbstractNioSelector implements Boss {

    public NioServerBoss(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
        super(executor, threadName, selectorRunnablePool);
    }

    @Override
    protected void process(Selector selector) throws IOException {
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        if (selectedKeys.isEmpty()) {
            return;
        }
        for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
            SelectionKey key = i.next();
            i.remove();

            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            // accept the new client
            SocketChannel channel = server.accept();
            // switch to non-blocking mode
            channel.configureBlocking(false);
            // pick a worker
            Worker nextworker = getSelectorRunnablePool().nextWorker();
            // hand the new channel to that worker as a registration task
            nextworker.registerNewChannelTask(channel);

            System.out.println("new client connected");
        }
    }

    public void registerAcceptChannelTask(final ServerSocketChannel serverChannel) {
        final Selector selector = this.selector;
        registerTask(new Runnable() {
            @Override
            public void run() {
                try {
                    // register the server channel with this boss's selector
                    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    @Override
    protected int select(Selector selector) throws IOException {
        return selector.select();
    }
}
```

  The hall waiter (worker): polls the read-ready keys on its Selector and reads the data.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;

import com.cn.pool.NioSelectorRunnablePool;
import com.cn.pool.Worker;

public class NioServerWorker extends AbstractNioSelector implements Worker {

    public NioServerWorker(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
        super(executor, threadName, selectorRunnablePool);
    }

    @Override
    protected void process(Selector selector) throws IOException {
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        if (selectedKeys.isEmpty()) {
            return;
        }
        Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
        while (ite.hasNext()) {
            SelectionKey key = (SelectionKey) ite.next();
            // remove the key to avoid processing it twice
            ite.remove();

            // the socket channel on which the event occurred
            SocketChannel channel = (SocketChannel) key.channel();

            // number of bytes read
            int ret = 0;
            boolean failure = true;
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            // read data
            try {
                ret = channel.read(buffer);
                failure = false;
            } catch (Exception e) {
                // ignore
            }

            // check whether the connection has been closed
            if (ret <= 0 || failure) {
                key.cancel();
                System.out.println("client disconnected");
            } else {
                System.out.println("received: " + new String(buffer.array()));
                // write a response back to the client
                ByteBuffer outBuffer = ByteBuffer.wrap("received\n".getBytes());
                channel.write(outBuffer);
            }
        }
    }

    /** Registers a new client socket with this worker. */
    public void registerNewChannelTask(final SocketChannel channel) {
        final Selector selector = this.selector;
        registerTask(new Runnable() {
            @Override
            public void run() {
                try {
                    // register the client channel with this worker's selector
                    channel.register(selector, SelectionKey.OP_READ);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    @Override
    protected int select(Selector selector) throws IOException {
        return selector.select(500);
    }
}
```

  A helper class binds the port to a ServerSocketChannel; this example uses only one ServerSocketChannel and one greeter (boss).

```java
import java.net.SocketAddress;
import java.nio.channels.ServerSocketChannel;

import com.cn.pool.Boss;
import com.cn.pool.NioSelectorRunnablePool;

public class ServerBootstrap {

    private NioSelectorRunnablePool selectorRunnablePool;

    public ServerBootstrap(NioSelectorRunnablePool selectorRunnablePool) {
        this.selectorRunnablePool = selectorRunnablePool;
    }

    /** Binds the given local address. */
    public void bind(final SocketAddress localAddress) {
        try {
            // open a server socket channel
            ServerSocketChannel serverChannel = ServerSocketChannel.open();
            // set the channel to non-blocking mode
            serverChannel.configureBlocking(false);
            // bind the underlying ServerSocket to the port
            serverChannel.socket().bind(localAddress);

            // pick a boss thread
            Boss nextBoss = selectorRunnablePool.nextBoss();
            // register the server channel with that boss
            nextBoss.registerAcceptChannelTask(serverChannel);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

  The hall waiter (Worker) and greeter (Boss) interfaces:

```java
import java.nio.channels.SocketChannel;

public interface Worker {

    /** Adds a new client session. */
    public void registerNewChannelTask(SocketChannel channel);
}
```

```java
import java.nio.channels.ServerSocketChannel;

public interface Boss {

    /** Adds a new ServerSocketChannel. */
    public void registerAcceptChannelTask(ServerSocketChannel serverChannel);
}
```

  The NioSelectorRunnablePool class configures how many greeters (bosses) and hall waiters (workers) there are, and passes the thread pools into them.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

import com.cn.NioServerBoss;
import com.cn.NioServerWorker;

public class NioSelectorRunnablePool {

    /** Boss thread array. */
    private final AtomicInteger bossIndex = new AtomicInteger();
    private Boss[] bosses;

    /** Worker thread array. */
    private final AtomicInteger workerIndex = new AtomicInteger();
    private Worker[] workeres;

    public NioSelectorRunnablePool(Executor boss, Executor worker) {
        initBoss(boss, 1);
        initWorker(worker, Runtime.getRuntime().availableProcessors() * 2);
    }

    /** Initializes the boss threads. */
    private void initBoss(Executor boss, int count) {
        this.bosses = new NioServerBoss[count];
        for (int i = 0; i < bosses.length; i++) {
            bosses[i] = new NioServerBoss(boss, "boss thread " + (i + 1), this);
        }
    }

    /** Initializes the worker threads. */
    private void initWorker(Executor worker, int count) {
        this.workeres = new NioServerWorker[count];
        for (int i = 0; i < workeres.length; i++) {
            workeres[i] = new NioServerWorker(worker, "worker thread " + (i + 1), this);
        }
    }

    /** Returns the next worker, round-robin. */
    public Worker nextWorker() {
        return workeres[Math.abs(workerIndex.getAndIncrement() % workeres.length)];
    }

    /** Returns the next boss, round-robin. */
    public Boss nextBoss() {
        return bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)];
    }
}
```

  Test: create two thread pools and pass them into NioSelectorRunnablePool.

```java
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import com.cn.pool.NioSelectorRunnablePool;

public class Start {

    public static void main(String[] args) {
        // initialize the boss/worker threads
        NioSelectorRunnablePool nioSelectorRunnablePool =
                new NioSelectorRunnablePool(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
        // create the bootstrap
        ServerBootstrap bootstrap = new ServerBootstrap(nioSelectorRunnablePool);
        // bind the port
        bootstrap.bind(new InetSocketAddress(10101));
        System.out.println("start");
    }
}
```

Run results:
