前面的两篇博客分别分析了NioEventLoopGroup和NioEventLoop这两个类的创建以及重要功能,这为这篇博客全面分析Netty的启动类ServerBootstrap奠定了基础。
先回顾一下Server的核心启动代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 EventLoopGroup bossGroup = new NioEventLoopGroup(1 ); EventLoopGroup workerGroup = new NioEventLoopGroup(); final EchoServerHandler serverHandler = new EchoServerHandler();try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100 ) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel (SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(serverHandler); } }); ChannelFuture f = b.bind(PORT).sync(); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }
可以看到创建ServerBootstrap的过程,最后调用b.bind(PORT)
将服务绑定到端口上。
ServerBootstrap的构造函数什么也没有干,核心的内容都在bind方法中,所以接下来我们重点分析bind方法,看看netty究竟是怎么启动的。
bind方法 所有的重载的bind方法最终都调用了doBind方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 private ChannelFuture doBind (final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null ) { return regFuture; } if (regFuture.isDone()) { ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete (ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null ) { promise.setFailure(cause); } else { promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }
这里面干了两件事情:
调用initAndRegister方法创建Channel并注册Nio事件
调用doBind0触发pipeline中的事件链
initAndRegister方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 final ChannelFuture initAndRegister () { Channel channel = null ; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { if (channel != null ) { channel.unsafe().closeForcibly(); return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null ) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
这个方法里面主要干了三件事情:
使用channelFactory创建channel
使用init方法初始化channel
最后调用config().group().register(channel)将刚刚创建的channel绑定到一个EventLoop上(就是一个NioEventLoop上)
1. channelFactory创建channel 首先弄清楚channelFactory是什么,当我们调用ServerBootstrap的channel方法时:
1 2 b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class)
调用的实际上是父类AbstractBootstrap的方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public B channel (Class<? extends C> channelClass) { if (channelClass == null ) { throw new NullPointerException("channelClass" ); } return channelFactory(new ReflectiveChannelFactory<C>(channelClass)); } public B channelFactory (ChannelFactory<? extends C> channelFactory) { if (channelFactory == null ) { throw new NullPointerException("channelFactory" ); } if (this .channelFactory != null ) { throw new IllegalStateException("channelFactory set already" ); } this .channelFactory = channelFactory; return self(); }
调用AbstractBootstrap#channel方法时,创建了一个ReflectiveChannelFactory对象,并最终赋值给了channelFactory。
到这里我们就知道了channelFactory其实是一个ReflectiveChannelFactory实例,那继续看看ReflectiveChannelFactory是什么东西:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public class ReflectiveChannelFactory <T extends Channel > implements ChannelFactory <T > { private final Class<? extends T> clazz; public ReflectiveChannelFactory (Class<? extends T> clazz) { if (clazz == null ) { throw new NullPointerException("clazz" ); } this .clazz = clazz; } @Override public T newChannel () { try { return clazz.getConstructor().newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); } } @Override public String toString () { return StringUtil.simpleClassName(clazz) + ".class" ; } }
ReflectiveChannelFactory其实就是用反射去创建一个新的Channel,也就是我们传入的NioServerSocketChannel,所以看到这里就明白了,initAndRegister方法初始化和注册的是NioServerSocketChannel这个Channel。
看一下NioServerSocketChannel创建的大致过程:
1 2 3 4 public NioServerSocketChannel (ServerSocketChannel channel) { super (null , channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this , javaChannel().socket()); }
这里调用了父类的构造函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 protected AbstractNioChannel (Channel parent, SelectableChannel ch, int readInterestOp) { super (parent); this .ch = ch; this .readInterestOp = readInterestOp; try { ch.configureBlocking(false ); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket." , e2); } } throw new ChannelException("Failed to enter non-blocking mode." , e); } }
这里最重要的是,将SelectionKey.OP_ACCEPT赋值给了readInterestOp属性,这个在之后向Selector中注册的时候会用到。
这里分析一下为什么只关心SelectionKey.OP_ACCEPT事件,记得ServerBootstrap传入了两个EventLoopGroup,分别命名为bossGroup和workerGroup,代码如下:
1 2 EventLoopGroup bossGroup = new NioEventLoopGroup(1 ); EventLoopGroup workerGroup = new NioEventLoopGroup();
bossGroup和workerGroup的职责不同,bossGroup专门负责接收客户端的链接,一旦连接建立,就会把接下来的io读写工作交给workerGroup,这也是为什么bossGroup在new的时候只需要一个线程了。workerGroup才是执行io读写工作的线程,所以命名为工作线程。NioServerSocketChannel是处理客户端连接的Channel,所以它关心的事件只有SelectionKey.OP_ACCEPT。
在AbstractNioChannel的构造函数中还调用了父类AbstractChannel的构造函数,AbstractChannel构造函数干了三件事情:
1 2 3 id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline();
比较重要的是创建了unsafe实例和pipeline实例,这里的unsafe不同于sun包中的unsafe,后面会仔细分析,这里只用知道这两个实例被初始化了。pipeline是Netty中另一个核心构建,如果说NioEventLoop是心脏,那pipeline就是血管了。这里创建的pipeline是DefaultChannelPipeline,记住这一点后,后面分析的pipeline的方法实现都在DefaultChannelPipeline中。这里简化了我们的分析,不用去找多个父类和多个实现才知道最终的方法。
当使用channelFactory创建完Channel之后,调用init方法去初始化这个channel。
2. init方法初始化channel 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Override void init (Channel channel) throws Exception { ChannelPipeline p = channel.pipeline(); p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel (final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null ) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run () { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
剔除掉一些不管心的方法之后,这里的功能就比较清晰了,首先通过channel.pipeline()方法获取pipeline,这里的pipeline就是上文中创建的DefaultChannelPipeline。
然后pipeline中增加了一个ChannelHandle,ChannelHandle在对应事件触发的时候回调ChannelHandle里面的一些方法,所以这个类里面的方法是异步执行的。
稍微看一下这个ChannelInitializer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Sharable public abstract class ChannelInitializer <C extends Channel > extends ChannelInboundHandlerAdapter {PlatformDependent.newConcurrentHashMap(); protected abstract void initChannel (C ch) throws Exception ; @Override @SuppressWarnings("unchecked") public final void channelRegistered (ChannelHandlerContext ctx) throws Exception { if (initChannel(ctx)) { ctx.pipeline().fireChannelRegistered(); } else { ctx.fireChannelRegistered(); } }
initChannel方法实际是在channelRegistered被回调的时候调用的,这里mark一下。
在initChannel方法中,最重要的是往自己的事件处理连中添加了一个ServerBootstrapAcceptor。
上文已经分析过了,NioServerSocketChannel的作用其实就是监听并接受客户端的连接请求,连接建立完成之后,就会扔给worker线程。这里的ServerBootstrapAcceptor就是干这个用的,看一下ServerBootstrapAcceptor的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Override @SuppressWarnings("unchecked") public void channelRead (ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete (ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }
当NioServerSocketChannel监听到SelectionKey.OP_ACCEPT时间后,会触发pipeline中的channelRead事件链,最终会执行ServerBootstrapAcceptor中的channelRead方法。这个方法调用childGroup.register(child)将客户端和服务端建立的连接,也就是这里传进来的msg(一个Channel实例)注册进childGroup。childGroup(一个NioEventLoopGroup实例)会从自己的children(一个NioEventLoop数组)中选出一个NioEventLoop去接纳这个新的Channel。
到这里init方法就执行完了,注意这里的很多动作其实都仅仅注册了一个回调函数,还没有被真正的执行。
接下来看看config().group().register(channel)。
3. config().group().register(channel) config方法返回的是一个ServerBootstrapConfig实例,这部分代码朋友们可以自己跟踪,比较简单。
ServerBootstrapConfig#group方法返回的其实就是ServerBootstrap中的group,这里的group是bossGroup,同样大家跟踪一下这部分代码,不再赘述了。关键我们看group.register(channel)干了什么:
首先,调用NioEventLoopGroup#register方法实际调用了父类MultithreadEventLoopGroup#register方法:
1 2 3 4 5 @Override public ChannelFuture register (Channel channel) { return next().register(channel); }
然后先调用了一个next方法,调用的是父类MultithreadEventExecutorGroup#next方法:
1 2 3 4 @Override public EventExecutor next () { return chooser.next(); }
这里调用了chooser.next(),看过上篇博客的朋友们肯定有印象,这个chooser其实就是从NioEventLoopGroup的children数组中选出一个NioEventLoop。
回到MultithreadEventLoopGroup#register方法中,这里的next().register(channel)其实调用的是NioEventLoop#register方法:
(这里比较绕,多看几遍这部分)
1 2 3 4 5 6 7 8 9 10 11 12 @Override public ChannelFuture register (Channel channel) { return register(new DefaultChannelPromise(channel, this )); } @Override public ChannelFuture register (final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise" ); promise.channel().unsafe().register(this , promise); return promise; }
然后调用的是unsafe的register方法:
这里调用register方法时,将自己传进去了,也就是一个EventLoop,这步的作用其实就是将这个Channel和一个EventLoop绑定起来了,或者说将Channel注册到了EventLoop中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 @Override public final void register (EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null ) { throw new NullPointerException("eventLoop" ); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already" )); return ; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return ; } AbstractChannel.this .eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run () { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}" , AbstractChannel.this , t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
这里面有一个AbstractChannel.this.eventLoop = eventLoop;
,可以印证我们刚刚分析的,这里将传进来的EventLoop赋值到了自己的eventLoop对象上,还记的上面的init方法中,最后有一段代码:
1 2 3 4 5 6 7 ch.eventLoop().execute(new Runnable() { @Override public void run () { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } });
这里ch.eventLoop()拿到的其实就是刚刚赋值的eventLoop对象。
哎这里是不是很奇怪,ch.eventLoop()这步是在eventLoop对象被赋值之前调用的啊,这时拿到的eventLoop难道不是null嘛?这就是上面反复提到过的,这里的ch.eventLoop()其实并没有执行,只是注册了一个回调函数,当它真正被调用执行的时候,eventLoop已经被赋值了。
继续看AbstractChannel的register方法,这个方法调用了register0:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private void register0 (ChannelPromise promise) { try { if (!promise.setUncancellable() || !ensureOpen(promise)) { return ; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false ; registered = true ; pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
这段代码干了两件重要的事情:
这两个后面分析。这里还做了一件重要的事情,调用了pipeline.fireChannelRegistered();
,这里触发了pipeline中的注册事件链,注册事件链比较特殊,上文提到过,ChannelInitializer的initChannel方法实际是由channelRegistered方法触发的。
1.doRegister 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Override protected void doRegister () throws Exception { boolean selected = false ; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0 , this ); return ; } catch (CancelledKeyException e) { if (!selected) { eventLoop().selectNow(); selected = true ; } else { throw e; } } } }
看到了重要的一个调用:
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
,这里selectionKey是JDK的selectionKey,javaChannel返回的是JDK的Channel,然后将eventLoop中的Selector注册进了这个Channel中,到这里总算是落地到JDK的代码上了,到这里,Nio Server算是真正的启动了。
这里还没完,这里注册的interestOps是0,0不是任何一个Nio事件,所以这里其实是借助register方法初始化selectionKey,并没有开始真正的监听Nio事件。这里将自己作为attachment传进SelectionKey,之后会反过来从SelectionKey中取这个NettyChannel。
2.beginRead 刚刚说了,doRegister方法中并没有开启真正的事件监听,那唯一的可能就是在beginRead中开启监听了:
进过一顿寻找,发现最终beginRead调用了这个方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Override protected void doBeginRead () throws Exception { final SelectionKey selectionKey = this .selectionKey; if (!selectionKey.isValid()) { return ; } readPending = true ; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0 ) { selectionKey.interestOps(interestOps | readInterestOp); } }
这里将刚刚创建的selectionKey中的interestOps换成了在AbstractNioChannel构造的时候传进来的interestOps,文章开头看到了,最初传进来的interestOps是SelectionKey.OP_ACCEPT事件,所以最后selectionKey绑定的事件就是SelectionKey.OP_ACCEPT事件。
doBind方法 这个方法就很简单了,就是触发了pipeline中的bind事件链,并最终调用JDK绑定到端口,这里最后调用JDK的代码比较难找,最开始我找了好久也没有找到是在哪里调用的,仔细梳理了一遍之后发现,触发pipeline中的bind事件链中,bind事件被定义为出站事件,所以事件会从tail流到head,我们去看那一下DefaultChannelPipeline中的headContext:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler , ChannelInboundHandler { private final Unsafe unsafe; HeadContext(DefaultChannelPipeline pipeline) { super (pipeline, null , HEAD_NAME, false , true ); unsafe = pipeline.channel().unsafe(); setAddComplete(); } @Override public ChannelHandler handler () { return this ; } @Override public void handlerAdded (ChannelHandlerContext ctx) throws Exception { } @Override public void handlerRemoved (ChannelHandlerContext ctx) throws Exception { } @Override public void bind ( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.bind(localAddress, promise); } }
最终委托给了unsafe#bind方法,后面的代码跟踪就比较简单了。
总结 我们看到,在注册这一步的时候,绕了好大一个弯,从NioEventLoopGroup#register到NioEventLoop#register再到Unsafe#register方法。
从NioEventLoopGroup#register到NioEventLoop#register这步,其实是从EventLoopGroup选出一个EventLoop(通过调用next方法),NioEventLoop#register到Unsafe#register这步,其实是将NioEventLoop绑定到Channel,这里Unsafe是Channel的内部类,最终,Unsafe调用了JDK的Nio register方法创建了一个selectionKey。Unsafe最后还点燃了pipeline的register事件链,并最终绑定了SelectionKey.OP_ACCEPT事件。
这里分一下Unsafe这个命名,与JDK打交道的功能封装在Unsafe中,它是连接Netty Nio与JDK Nio的桥梁,那为什么要命名为Unsafe呢?这里就很有趣了:
我们熟悉Sun提供的Unsafe工具,这个工具可以与一些底层直接进行交互,比如CAS,比如堆外内存的使用,这里命名为Unsafe的意思是说,这些东西都不属于JVM管理的,请知晓,JVM是不保证这些操作的安全性的!
在Netty中就很有意思了,Netty是在说谁Unsafe呢?其实说的是JDK,Netty说JDK是Unsafe的,因为对于Netty来说,JDK中的代码是不受自己控制的,调用JDK出了问题Netty是无能为力的,Netty同样不能保证JDK的安全!所以与JDK打交道的代码被称为Unsafe。
这样的话,在以后自己写框架的过程中,如果需要对其他的依赖的框架做一些封装,那这个封装的类也可以被命名为Unsafe,告诉别人,这个类里面的功能都是别人的!我不能保证安全性!我只是提供一层封装!(虽然我自己可能不会这么去做^_^)。