Netty Boss线程是为什么都是写1
文章目录
- Netty Boss线程是为什么都是写1
- 中间件使用
- Dubbo中的使用方法:NettyServer#doOpen
- RxServer 的使用方法
- 为什么呢?
- 总结
Netty Boss线程是为什么都是写1
先来看看常见的中间件、框架的使用方式。
中间件使用
Dubbo中的使用方法:NettyServer#doOpen
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
RxServer 的使用方法
serverBootstrap.group(RxNetty.getRxEventLoopProvider().globalServerParentEventLoop(true),
RxNetty.getRxEventLoopProvider().globalServerEventLoop(true));
public static RxEventLoopProvider getRxEventLoopProvider() {
return rxEventLoopProvider;
}
private static volatile RxEventLoopProvider rxEventLoopProvider =
new SingleNioLoopProvider(1, Runtime.getRuntime().availableProcessors());
看一下对应的实现 SingleNioLoopProvider
public SingleNioLoopProvider() {
this(Runtime.getRuntime().availableProcessors());
}
public SingleNioLoopProvider(int threadCount) {
eventLoop = new SharedNioEventLoopGroup(threadCount);
parentEventLoop = eventLoop;
parentEventLoopCount = childEventLoopCount = threadCount;
nativeEventLoop = new AtomicReference<EpollEventLoopGroup>();
nativeParentEventLoop = nativeEventLoop;
}
public SingleNioLoopProvider(int parentEventLoopCount, int childEventLoopCount) {
this.parentEventLoopCount = parentEventLoopCount;
this.childEventLoopCount = childEventLoopCount;
parentEventLoop = new SharedNioEventLoopGroup(parentEventLoopCount);
eventLoop = new SharedNioEventLoopGroup(childEventLoopCount);
nativeParentEventLoop = new AtomicReference<EpollEventLoopGroup>();
nativeEventLoop = new AtomicReference<EpollEventLoopGroup>();
}
为什么呢?
可以看到这里的boss线程池的线程数都是一个,权且思考一下:
从上层看的话,每个Server启动的时候需要绑定一个端口,那么无论是NIO还是BIO,我们都需要在服务器进行接收,而一个端口对应的是一个Selector
或者ServerSocket
对象,那么如果是多个线程的话,会不会存在竞争关系?并且因为本来这个Boss线程池(Acceptor)也只是接收一下客户端TCP连接
,并创建一个Channel对象发送给work线程池去进行真正的读写,难道是怕线程竞争?
看一下对应的代码
/** * Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These * {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and * {@link Channel}'s. */
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
Server初始化代码&处理
ServerBootStrap#init
void init(Channel channel) throws Exception {
// 省略...
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 初始化Acceptor
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
可以看到这里
ServerBootstrapAcceptor
是pipeline的处理器。下面看看他的实现逻辑:开启Selector
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
NioEventLoop接收到请求,派发请求到Worker处理
NioEventLoop#processSelectedKey
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// 省略
// 如果当前通道可读
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
- 读取配置并封装成Channel
NioMessageUnsafe#doReadMessages
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
// 将NioSocketChannel作为上游参数传到下游处理器
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
将Channel派发的Worker线程池
ServerBootstrapAcceptor#channelRead
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// Nio处理完之后是一个channel
final Channel child = (Channel) msg;
// 为当前channel添加处理器
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
// 注册到channe到WorkerNioEventLoop让Worker进行通道的读写。
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
总结
看到了现在,仿佛1的这个魔法值还仅仅是因为是一个端口的原因。而线程池的存在是为了在启动的时候可以绑定多个端口,方便扩展。
但是根本上还是一个端口对应一个boss线程池的一条处理线程,即一条线程持有一个端口对应的selector。当然如果我们启动不仅仅是一个端口的话,仍然需要写对应的端口个数。以上是Netty实现的原理。
继续复盘我们刚才说的线程竞争会不会导致selector的变慢之类的,其实是因为selector的特性导致的,我们只能有一个线程持久selector,即便是我们在多个线程进行持有selector,如果想要获取通道上面的事件就需要调用Selector#select()
方法。看一下他对应的介绍。
This method performs a blocking selection operation. It returns only after at least one channel is selected, this selector’s wakeup method is invoked, or the current thread is interrupted, whichever comes first
这个操作是阻塞的,同一时间只能有一个线程进行处理,所以天生不支持多线程。
– 以上是Netty4的源码
还没有评论,来说两句吧...