前面我们仔细分析过NioEventLoop的源码,以及找到了Netty事件驱动的源头代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 private void processSelectedKey (SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { return ; } if (eventLoop != this || eventLoop == null ) { return ; } unsafe.close(unsafe.voidPromise()); return ; } try { int readyOps = k.readyOps(); if ((readyOps & SelectionKey.OP_CONNECT) != 0 ) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } if ((readyOps & SelectionKey.OP_WRITE) != 0 ) { ch.unsafe().forceFlush(); } if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0 ) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
讲NioEventLoop的时候,后面就没有接着讲事件被触发之后的操作了,我们当时只知道了,这里是Nio事件触发的源头,是监听事件的地方。
这篇博客我们来看一看,当SelectionKey.OP_READ | SelectionKey.OP_ACCEPT这两个事件被Netty监听到之后,Netty会怎么操作。
先看看SelectionKey.OP_ACCEPT。看到这篇文章应该知道了,Server端的是有两个层次的:boss和worker,boss用来接收ACCEPT事件,worker用来持有建立的连接以及继续监听连接的读写事件。
所以这里SelectionKey.OP_ACCEPT事件触发后,最后一定会创建一个连接,并交给worker线程池。下面我们分析源码:
ServerBootstrapAcceptor 先必须要回忆一下ServerBootstrapAcceptor,这个类是一个ChannelHanlder,是boss Channel创建的时候注册到pipeline中去的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel (final Channel ch) throws Exception { ch.eventLoop().execute(new Runnable() { @Override public void run () { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } });
最后调用pipeline.addLast,将一个ServerBootstrapAcceptor实例注册进了pipeline。
ServerBootstrapAcceptor.java:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter { @Override @SuppressWarnings("unchecked") public void channelRead (ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete (ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } } }
这个类主要实现了channelRead方法,这个方法会被pipeline#fireChannelRead方法回调。也就是说,read事件链被触发的时候,这个方法会被回调,这里记一下这个方法被触发的地方。
先看一下这个channelRead方法干了什么,首先传进来的参数msg被强制转换为Channel类型,这个应该是在调用方保证的,可以猜想到这个Channel是刚刚Client和Server建立的连接。接下来调用child.pipeline().addLast(childHandler)
往子Channel的pipeline中注册一个事件处理类:childHandler,这个类是在调用ServerBootstrap#childHandler的时候设置进来的。最后将Channel注册进childGroup,这个childGroup是一个NioEventLoopGroup,也就是worker线程池。
这里验证了我们的猜想,总结一下:
初始化NioServerSocketChannel(init方法)的时候,在NioServerSocketChannel的pipeline中注册了一个ServerBootstrapAcceptor,当这个类的channelRead方法被回调时,建立的连接,也就是一个新的Channel被注册到worker线程组中。
这就是NioServerSocketChannel负责的功能啦。
好了,大致流程已经分析清楚了,接下来仔仔细细的看一下整个流程:
SelectionKey.OP_ACCEPT事件 这个事件被触发的时候,调用了unsafe.read(),这个unsafe就是NioServerSocketChannel对应的unsafe,调用的unsafe#read方法实际是调用NioServerSocketChannel的父类AbstractNioMessageChannel中的内部类NioMessageUnsafe#read方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 private final class NioMessageUnsafe extends AbstractNioUnsafe { private final List<Object> readBuf = new ArrayList<Object>(); @Override public void read () { assert eventLoop () .inEventLoop () ; final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); boolean closed = false ; Throwable exception = null ; try { try { do { int localRead = doReadMessages(readBuf); if (localRead == 0 ) { break ; } if (localRead < 0 ) { closed = true ; break ; } allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0 ; i < size; i ++) { readPending = false ; pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (exception != null ) { closed = closeOnReadError(exception); pipeline.fireExceptionCaught(exception); } if (closed) { inputShutdown = true ; if (isOpen()) { close(voidPromise()); } } } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } } } }
这里面干了两件事:
doReadMessages(readBuf)
pipeline.fireChannelRead(readBuf.get(i))
惯例一个个看:
1. doReadMessages 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Override protected int doReadMessages (List<Object> buf) throws Exception { SocketChannel ch = SocketUtils.accept(javaChannel()); try { if (ch != null ) { buf.add(new NioSocketChannel(this , ch)); return 1 ; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket." , t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket." , t2); } } return 0 ; }
这个方法在子类NioServerSocketChannel中,很简单的调用了SocketUtils.accept(javaChannel()),其实就是调用JDK原生的accept方法,接纳一个新的客户端,并返回一个客户端的句柄SocketChannel,然后包装成Netty中的NioSocketChannel类,add到buf中。
当这个方法返回之后,List<Object> buf中被填充了所有刚刚Accept的Client端的连接。
2. pipeline.fireChannelRead(readBuf.get(i)) 这个方法在一个for循环中,挨个触发channelRead事件链。这里的pipeline是NioServerSocketChannel这个类的pipeline,所以最终会调用ServerBootstrapAcceptor的channelRead方法,传入的参数是NioSocketChannel实例。
这里调用完后,后面的逻辑上面已经讲过了,ServerBootstrapAcceptor将这个客户端连接实例注册到了worker线程组中,开始监听并处理之后的读写事件。
总结 这里比较容易疑惑的是,SelectionKey.OP_READ | SelectionKey.OP_ACCEPT这两个事件被同时监听,并都触发的是unsafe.read()事件,但是,如果调用了NioServerSocketChannel的pipeline的channelRead事件链的话,可以保证一定是SelectionKey.OP_ACCEPT事件,因为NioServerSocketChannel监听的只有OP_ACCEPT事件,所以NioServerSocketChannel绑定的EventLoop中触发出来的事件只可能是SelectionKey.OP_ACCEPT事件被触发。
趁热打铁看看Read事件 Read事件由刚刚accept之后,new出来的NioSocketChannel来负责监听。看看这个类的创建:
NioSocketChannel.java构造函数:
1 2 3 4 public NioSocketChannel (Channel parent, SocketChannel socket) { super (parent, socket); config = new NioSocketChannelConfig(this , socket.socket()); }
调用了父类AbstractNioByteChannel的构造函数,注意:NioSocketChannel.java的父类是AbstractNioByteChannel,NioServerSocketChannel的父类是AbstractNioMessageChannel,这两个父类是不一样的,名字很相似 。
【BTW】NioServerSocketChannel和NioSocketChannel两个句柄前面是服务端的,后面是客户端的,和JDK命名规则一样。
看AbstractNioByteChannel的构造:
1 2 3 protected AbstractNioByteChannel (Channel parent, SelectableChannel ch) { super (parent, ch, SelectionKey.OP_READ); }
好了这里得到了一个重要的信息,NioSocketChannel监听的是OP_READ事件。剩下两个参数很好理解,parent就是NioServerSocketChannel实例,ch是Accept事件被处理之后,创建的java原生的SelectableChannel。
所以当监听到OP_READ事件之后,会调用unsafe.read()
,这里的unsafe是在AbstractNioByteChannel中实现的unsafe,看看它的NioByteUnsafe#read方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 @Override public final void read () { final ChannelConfig config = config(); if (shouldBreakReadReady(config)) { clearReadPending(); return ; } final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null ; boolean close = false ; try { do { byteBuf = allocHandle.allocate(allocator); allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0 ) { byteBuf.release(); byteBuf = null ; close = allocHandle.lastBytesRead() < 0 ; if (close) { readPending = false ; } break ; } allocHandle.incMessagesRead(1 ); readPending = false ; pipeline.fireChannelRead(byteBuf); byteBuf = null ; } while (allocHandle.continueReading()); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } } }
对应于NioMessageUnsafe的read方法,这里同样干了两个重要的事情:
doReadBytes
fireChannelRead
doReadBytes 这个方法是一个模板方法,在NioSocketChannel中实现:
1 2 3 4 5 protected int doReadBytes (ByteBuf byteBuf) throws Exception { final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.attemptedBytesRead(byteBuf.writableBytes()); return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead()); }
这里其实就是将javaChannel中的数据读到了ByteBuf中,然后返回了,具体过程不再分析。
fireChannelRead 在获取到javaChannel中的读取的数据之后,就发起了channelRead事件链,这里的pipeline是NioSocketChannel的事件链(看pipeline事件触发时,要看清楚pipeline是属于哪个Channel,这将影响后面的逻辑分析)。
最终会运行childHandler的channelRead方法,也就是开发者自定义的handler,这里其实就是把事件传给了开发人员的逻辑里面了。
以Netty提供的Echo例子为例,它的childHandler是这样的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Sharable public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) { ctx.write(msg); } @Override public void channelReadComplete (ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
直接将读到的数据使用ctx.write(msg)方法写回到客户端,也就是echo服务。
这里解释一下@Sharable:
@Sharable
表示一个ChannelHandler是否是可以多个Pipeline共享的,可以和:是否是可重入的、是否是可以并发调用的、是否是线程安全的、是否是单例的这几个问题结合起来理解Sharable。
1️⃣