- 浏览: 10299 次
- 性别:
- 来自: 成都
最新评论
我们继续EventLoop。走起!
在前一节我们谈到了一个eventloop负责两个工作,第一作为IO线程,负责处理相应的IO操作;第二作为任务线程,执行队列中的任务。
下面我们先来看看负责IO线程中的一个TCP数据是如何从socket中传递到netty的handler中的。
我在netty探索之旅二的时候说了一下Java NIO中的selector的使用流程:
1,通过Selector.open()打开一个Selector。
2,将Channel注册到Selector中,并设置需要监听的事件。
3,不断循环:
调用select()方法---阻塞
调用selector.selectedKeys()获取SelectionKey
迭代每一个selectedkey
判断是哪些IO事件就绪,强调一下OP_ACCEPT事件:如果是OP_ACCEPT事件,就调用 SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept()获取SocketChannel,将其设置为非阻塞后,注册到Selector中。
从selectedkey中获取对应的channel和附加对象。
根据业务更改selectedkey的监听事件(key.interestOps(OP_READ | SelectionKey.OP_WRITE);)
将已经处理过的key从selectedkeys集合中删除。
以上的流程翻译成代码就是:
在netty探索之旅三文章中的NioEventLoop的实例化时,会通过provider.openSelector()获取一个selector对象。赋值给NioEventLoop的selector变量中。
然后在调用SelectorProvider.openSocketChannel()打开一个新的 Java NIO SocketChannel:
将Channel注册到Selector中,在netty探索之旅三的channel注册过程中已经看过了这个注册过程:
Bootstrap.initAndRegister-->AbstractBootstrap.initAndRegister--> MultithreadEventLoopGroup.register-->SingleThreadEventLoop.register-->
AbstractUnsafe.register-->AbstractUnsafe.register0-->AbstractNioChannel.doRegister
AbstractNioChannel.doRegister:
以上把SocketChannel对象注册到selector中。
那么等待事件就绪的循环在哪里喃?
当EventLoop.execute第一次被调用时,触发startThread()的调用,进而把EventLoop所对应的本地Java线程启动。
线程启动后,就会调用run方法:
SingleThreadEventExecutor.this.run()是个抽象的方法。具体的实现方法是在NioEventLoop中。
来了来了,循环来了。
IO事件的轮询
第一步会调用hasTasks()
判断一下任务队列(taskQueue)是否为空,任务队列我们后面讨论,
selectStrategy.calculateStrategy计算出适合的策略:DefaultSelectStrategy
如果队列不是空就调用selectSupplier.get(),IntSupplier对象在NioEventLoop类中创建的:
否则就是返回SelectStrategy.SELECT,返回到主方法中:
细看一个select方法:
代码里面写了点注释,可以大致的了解到这个select方法在做什么:大概就是在执行阻塞select方法前,先进行判断的队列,如果发现有任务加入或者是定时任务快到了的时候,就执行selector.selectNow()(selectNow()是立即返回的, 不会阻塞当前线程),否则就执行一个有超时时间的阻塞select方法(selector.select(timeoutMillis))。
IO事件的处理
在NioEventLoop.run()方法中,第一步是通过select/selectNow调用查询当前是否有就绪的IO事件.那么当有IO事件就绪时,就需要处理这些IO事件,接着看run()方法余下的代码部分:
processSelectedKeys查询就绪的IO事件,处理它们。runAllTasks运行taskQueue中的任务。
ioRatio变量字面意思是IO占比,它表示此线程分配给IO操作所占的时间比(即运行 processSelectedKeys耗时在整个循环中所占用的时间)。ioRatio的默认值是50,表示IO操作和执行task的所占用的线程执行时间比是1:1。
接着我们看processSelectedKeys方法:
根据selectedKeys字段是否为空,而分别调用processSelectedKeysOptimized或 processSelectedKeysPlain。selectedKeys字段的值根据JVM平台的不同,而有设置不同的值。下面我们就以processSelectedKeysOptimized来分析
迭代selectedKeys获取就绪的IO事件,然后为每个事件都调用processSelectedKey来处理它。k.attachment()代码获取的是什么附加对象喃?还记得在客户端的channel注册过程中的以下代码吗?
这个this就是我们的附加对象,就是NioSocketChannel对象。
回到上面的代码中,后面接着调用processSelectedKey方法来处理IO事件:
是不是比较熟悉了,processSelectedKey处理三个事件:
OP_READ,可读事件,即 Channel 中收到了新数据可供上层读取.
OP_WRITE,可写事件,即上层可以向Channel 写入数据.
OP_CONNECT,连接建立事件,即TCP连接已经建立,Channel处于active状态.
OP_READ事件处理:
调用unsafe的read方法,unsafe这个变量谁喃?这个变量通过ch(NioSocketChannel).unsafe()获取:
在NioSocketChannel的父AbstractChannel中对unsafe进行了赋值:
newUnsafe()方法在AbstractNioByteChannel类中实现:
回到OP_READ事件处理中:
unsafe.read()方法就是调用NioByteUnsafe的read方法:
read方法做了如下工作:
1,分配ByteBuf。2,从SocketChannel中读取数据。3,调用pipeline.fireChannelRead发送一个inbound事件。
看到pipeline.fireChannelRead(byteBuf);这句代码没,这句代码就是把读取到的数据,放在pipeline中,后续的事情就是pipeline的inbound事件后续的事情了。哈哈,这样就和pipeline结合起来了。
OP_WRITE处理
OP_CONNECT处理
将OP_CONNECT从就绪事件集中清除。调用unsafe.finishConnect()通知上层连接已建立。
unsafe.finishConnect()调用最后会调用到pipeline().fireChannelActive(),产生一个 inbound事件,通知pipeline中的各个handler TCP通道已建立(即 ChannelInboundHandler.channelActive方法会被调用)
Netty的任务队列机制
Task添加
普通Runnable任务
NioEventLoop继承的SingleThreadEventExecutor中有变量Queue<Runnable> taskQueue,用于存放添加的task。当我们需要将一个Runnable添加到taskQueue中时,我们可以进行如下操作:
eventLoop.execute方法,实际上是调用了SingleThreadEventExecutor.execute()方法
addTask方法:
taskQueue是存放着待执行的任务的队列。
schedule任务
通过调用eventLoop.scheduleXXX之类的方法来添加一个定时任务。schedule功能的实现是在 SingleThreadEventExecutor的父类AbstractScheduledEventExecutor中,此类中有一个变量:Queue<ScheduledFutureTask<?>> scheduledTaskQueue。scheduledTaskQueue是一个队列(Queue),存放的元素是ScheduledFutureTask.而ScheduledFutureTask是对Schedule任务的一个抽象。
看一下AbstractScheduledEventExecutor所实现的schedule方法:
当一个Runnable传递进来后,会被封装为一个ScheduledFutureTask对象,此对象会记录下这个 Runnable在何时运行,已何种频率运行等信息。
构建完了ScheduledFutureTask,继续调用另一个重载的schedule方法:
ScheduledFutureTask对象就会被添加到scheduledTaskQueue中了。
任务的执行
当一个任务被添加到taskQueue后,它是怎么被EventLoop执行的呢?
还记得NioEventLoop.run()方法不,会分别调用processSelectedKeys()和runAllTasks()方法。runAllTasks一看就是来执行任务的。
fetchFromScheduledTaskQueue()将scheduledTaskQueue中已经可以执行的(即定时时间已到的 schedule 任务)添加到taskQueue中,作为可执行的task等待被调度执行。
AbstractScheduledEventExecutor中的pollScheduledTask:
最后runAllTasks()方法就会不断调用task = pollTask() 从taskQueue中获取一个可执行的 task,然后调用它的run()方法来运行此task。
在前一节我们谈到了一个eventloop负责两个工作,第一作为IO线程,负责处理相应的IO操作;第二作为任务线程,执行队列中的任务。
下面我们先来看看负责IO线程中的一个TCP数据是如何从socket中传递到netty的handler中的。
我在netty探索之旅二的时候说了一下Java NIO中的selector的使用流程:
1,通过Selector.open()打开一个Selector。
2,将Channel注册到Selector中,并设置需要监听的事件。
3,不断循环:
调用select()方法---阻塞
调用selector.selectedKeys()获取SelectionKey
迭代每一个selectedkey
判断是哪些IO事件就绪,强调一下OP_ACCEPT事件:如果是OP_ACCEPT事件,就调用 SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept()获取SocketChannel,将其设置为非阻塞后,注册到Selector中。
从selectedkey中获取对应的channel和附加对象。
根据业务更改selectedkey的监听事件(key.interestOps(OP_READ | SelectionKey.OP_WRITE);)
将已经处理过的key从selectedkeys集合中删除。
以上的流程翻译成代码就是:
public class NioEchoServer { private static final int BUF_SIZE = 256; private static final int TIMEOUT = 3000; public static void main(String args[]) throws Exception { // 打开服务端 Socket ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 打开 Selector Selector selector = Selector.open(); // 服务端 Socket 监听8080端口, 并配置为非阻塞模式 serverSocketChannel.socket().bind(new InetSocketAddress(8080)); serverSocketChannel.configureBlocking(false); // 将 channel 注册到 selector 中. // 通常我们都是先注册一个OP_ACCEPT事件,然后在OP_ACCEPT到来时,再将这个Channel的OP_READ,注册到Selector中. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { // 通过调用select方法, 阻塞地等待channel I/O可操作 // TIMEOUT:阻塞的时间 if (selector.select(TIMEOUT) == 0) { System.out.print("."); continue; } // 获取I/O操作就绪的SelectionKey,通过SelectionKey可以知道哪些Channel 的哪类I/O操作已经就绪. Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); // 当获取一个SelectionKey后, 就要将它删除,表示我们已经对这IO 事件进行了处理. keyIterator.remove(); if (key.isAcceptable()) { // 当 OP_ACCEPT 事件到来时, 我们就有从ServerSocketChannel中获取一个SocketChannel, // 代表客户端的连接 // 注意, 在OP_ACCEPT事件中,从key.channel()返回的Channel是 ServerSocketChannel. // 而在OP_WRITE和OP_READ中,从key.channel()返回的是 SocketChannel. SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept(); clientChannel.configureBlocking(false); //在OP_ACCEPT到来时,再将这个Channel的OP_READ注册到Selector中. // 注意,这里我们如果没有设置OP_READ的话,即interest set仍然是 OP_CONNECT的话,那么select方法会一直直接返回. clientChannel.register(key.selector(),OP_READ, ByteBuffer.allocate(BUF_SIZE)); } if (key.isReadable()) { SocketChannel clientChannel = (SocketChannel) key.channel(); ByteBuffer buf = (ByteBuffer) key.attachment(); long bytesRead = clientChannel.read(buf); if (bytesRead == -1) { clientChannel.close(); } else if (bytesRead > 0) { key.interestOps(OP_READ | SelectionKey.OP_WRITE); System.out.println("Get data length: " + bytesRead); } } if (key.isValid() && key.isWritable()) { ByteBuffer buf = (ByteBuffer) key.attachment(); buf.flip(); SocketChannel clientChannel = (SocketChannel) key.channel(); clientChannel.write(buf); if (!buf.hasRemaining()) { key.interestOps(OP_READ); } buf.compact(); } } } } }
在netty探索之旅三文章中的NioEventLoop的实例化时,会通过provider.openSelector()获取一个selector对象。赋值给NioEventLoop的selector变量中。
然后在调用SelectorProvider.openSocketChannel()打开一个新的 Java NIO SocketChannel:
private static SocketChannel newSocket(SelectorProvider provider) { try { return provider.openSocketChannel(); } catch (IOException e) { throw new ChannelException("Failed to open a socket.", e); } }
将Channel注册到Selector中,在netty探索之旅三的channel注册过程中已经看过了这个注册过程:
Bootstrap.initAndRegister-->AbstractBootstrap.initAndRegister--> MultithreadEventLoopGroup.register-->SingleThreadEventLoop.register-->
AbstractUnsafe.register-->AbstractUnsafe.register0-->AbstractNioChannel.doRegister
AbstractNioChannel.doRegister:
protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().selector, 0, this); return; } catch (CancelledKeyException e) { if (!selected) { eventLoop().selectNow(); selected = true; } else { throw e; } } } }
以上把SocketChannel对象注册到selector中。
那么等待事件就绪的循环在哪里喃?
当EventLoop.execute第一次被调用时,触发startThread()的调用,进而把EventLoop所对应的本地Java线程启动。
线程启动后,就会调用run方法:
thread = threadFactory.newThread(new Runnable() { @Override public void run() { boolean success = false; updateLastExecutionTime(); try { SingleThreadEventExecutor.this.run(); success = true; ......................
SingleThreadEventExecutor.this.run()是个抽象的方法。具体的实现方法是在NioEventLoop中。
protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { processSelectedKeys(); } finally { runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } } }
来了来了,循环来了。
IO事件的轮询
第一步会调用hasTasks()
protected boolean hasTasks() { assert inEventLoop(); return !taskQueue.isEmpty(); }
判断一下任务队列(taskQueue)是否为空,任务队列我们后面讨论,
selectStrategy.calculateStrategy计算出适合的策略:DefaultSelectStrategy
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception { return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT; }
如果队列不是空就调用selectSupplier.get(),IntSupplier对象在NioEventLoop类中创建的:
private final IntSupplier selectNowSupplier = new IntSupplier() { @Override public int get() throws Exception { return selectNow(); } };
否则就是返回SelectStrategy.SELECT,返回到主方法中:
case SelectStrategy.SELECT: select(wakenUp.getAndSet(false));
细看一个select方法:
private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { int selectCnt = 0; long currentTimeNanos = System.nanoTime(); long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); for (;;) { //判断当前的定时任务队列中的第一个定时任务的延迟时间是不是快到了 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) { //如果快到了就跳出循环,在跳出循环的时候发现还没有进行过select,就调用selectNow if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } // 轮询过程中发现有任务加入,中断本次轮询 if (hasTasks() && wakenUp.compareAndSet(false, true)) { selector.selectNow(); selectCnt = 1; break; } //以上的判断是为了让任务队列能够及时执行,在进行阻塞select操作的之前满足以上的条件,就执行一次非阻塞select操作,跳出循环 //到这里说明netty任务队列里面队列为空,并且所有定时任务延迟时间还未到 //就进行一个阻塞式的select操作,截止到第一个定时任务的截止时间 int selectedKeys = selector.select(timeoutMillis); selectCnt ++; if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { break; } if (Thread.interrupted()) { if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely because " + "Thread.currentThread().interrupt() was called. Use " + "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop."); } selectCnt = 1; break; } long time = System.nanoTime(); if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { selectCnt = 1; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { logger.warn( "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.", selectCnt, selector); rebuildSelector(); selector = this.selector; selector.selectNow(); selectCnt = 1; break; } currentTimeNanos = time; } if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) { if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector); } } } catch (CancelledKeyException e) { if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, e); } } }
代码里面写了点注释,可以大致的了解到这个select方法在做什么:大概就是在执行阻塞select方法前,先进行判断的队列,如果发现有任务加入或者是定时任务快到了的时候,就执行selector.selectNow()(selectNow()是立即返回的, 不会阻塞当前线程),否则就执行一个有超时时间的阻塞select方法(selector.select(timeoutMillis))。
IO事件的处理
在NioEventLoop.run()方法中,第一步是通过select/selectNow调用查询当前是否有就绪的IO事件.那么当有IO事件就绪时,就需要处理这些IO事件,接着看run()方法余下的代码部分:
cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { processSelectedKeys(); } finally { runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } }
processSelectedKeys查询就绪的IO事件,处理它们。runAllTasks运行taskQueue中的任务。
ioRatio变量字面意思是IO占比,它表示此线程分配给IO操作所占的时间比(即运行 processSelectedKeys耗时在整个循环中所占用的时间)。ioRatio的默认值是50,表示IO操作和执行task的所占用的线程执行时间比是1:1。
接着我们看processSelectedKeys方法:
private void processSelectedKeys() { if (selectedKeys != null) { processSelectedKeysOptimized(selectedKeys.flip()); } else { processSelectedKeysPlain(selector.selectedKeys()); } }
根据selectedKeys字段是否为空,而分别调用processSelectedKeysOptimized或 processSelectedKeysPlain。selectedKeys字段的值根据JVM平台的不同,而有设置不同的值。下面我们就以processSelectedKeysOptimized来分析
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { for (int i = 0;; i ++) { final SelectionKey k = selectedKeys[i]; if (k == null) { break; } selectedKeys[i] = null; final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (needsToSelectAgain) { for (;;) { i++; if (selectedKeys[i] == null) { break; } selectedKeys[i] = null; } selectAgain(); selectedKeys = this.selectedKeys.flip(); i = -1; } } }
迭代selectedKeys获取就绪的IO事件,然后为每个事件都调用processSelectedKey来处理它。k.attachment()代码获取的是什么附加对象喃?还记得在客户端的channel注册过程中的以下代码吗?
protected void doRegister() throws Exception { selectionKey = javaChannel().register(eventLoop().selector, 0, this); }
这个this就是我们的附加对象,就是NioSocketChannel对象。
回到上面的代码中,后面接着调用processSelectedKey方法来处理IO事件:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { return; } if (eventLoop != this || eventLoop == null) { return; } unsafe.close(unsafe.voidPromise()); return; } try { int readyOps = k.readyOps(); if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
是不是比较熟悉了,processSelectedKey处理三个事件:
OP_READ,可读事件,即 Channel 中收到了新数据可供上层读取.
OP_WRITE,可写事件,即上层可以向Channel 写入数据.
OP_CONNECT,连接建立事件,即TCP连接已经建立,Channel处于active状态.
OP_READ事件处理:
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); }
调用unsafe的read方法,unsafe这个变量谁喃?这个变量通过ch(NioSocketChannel).unsafe()获取:
@Override public Unsafe unsafe() { return unsafe; }
在NioSocketChannel的父AbstractChannel中对unsafe进行了赋值:
protected AbstractChannel(Channel parent) { this.parent = parent; unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
newUnsafe()方法在AbstractNioByteChannel类中实现:
protected AbstractNioUnsafe newUnsafe() { return new NioByteUnsafe(); }
回到OP_READ事件处理中:
unsafe.read()方法就是调用NioByteUnsafe的read方法:
@Override public final void read() { final ChannelConfig config = config(); if (!config.isAutoRead() && !isReadPending()) { removeReadOp(); return; } final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final int maxMessagesPerRead = config.getMaxMessagesPerRead(); RecvByteBufAllocator.Handle allocHandle = this.allocHandle; if (allocHandle == null) { this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle(); } ByteBuf byteBuf = null; int messages = 0; boolean close = false; try { int totalReadAmount = 0; boolean readPendingReset = false; do { byteBuf = allocHandle.allocate(allocator); int writable = byteBuf.writableBytes(); int localReadAmount = doReadBytes(byteBuf); if (localReadAmount <= 0) { byteBuf.release(); byteBuf = null; close = localReadAmount < 0; break; } if (!readPendingReset) { readPendingReset = true; setReadPending(false); } pipeline.fireChannelRead(byteBuf); byteBuf = null; if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) { totalReadAmount = Integer.MAX_VALUE; break; } totalReadAmount += localReadAmount; if (!config.isAutoRead()) { break; } if (localReadAmount < writable) { break; } } while (++ messages < maxMessagesPerRead); pipeline.fireChannelReadComplete(); allocHandle.record(totalReadAmount); if (close) { closeOnRead(pipeline); close = false; } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close); } finally { if (!config.isAutoRead() && !isReadPending()) { removeReadOp(); } } }
read方法做了如下工作:
1,分配ByteBuf。2,从SocketChannel中读取数据。3,调用pipeline.fireChannelRead发送一个inbound事件。
看到pipeline.fireChannelRead(byteBuf);这句代码没,这句代码就是把读取到的数据,放在pipeline中,后续的事情就是pipeline的inbound事件后续的事情了。哈哈,这样就和pipeline结合起来了。
OP_WRITE处理
if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); }
OP_CONNECT处理
if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); }
将OP_CONNECT从就绪事件集中清除。调用unsafe.finishConnect()通知上层连接已建立。
unsafe.finishConnect()调用最后会调用到pipeline().fireChannelActive(),产生一个 inbound事件,通知pipeline中的各个handler TCP通道已建立(即 ChannelInboundHandler.channelActive方法会被调用)
Netty的任务队列机制
Task添加
普通Runnable任务
NioEventLoop继承的SingleThreadEventExecutor中有变量Queue<Runnable> taskQueue,用于存放添加的task。当我们需要将一个Runnable添加到taskQueue中时,我们可以进行如下操作:
EventLoop eventLoop = channel.eventLoop(); eventLoop.execute(new Runnable() { @Override public void run() { System.out.println("Hello, Netty!"); } });
eventLoop.execute方法,实际上是调用了SingleThreadEventExecutor.execute()方法
public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } boolean inEventLoop = inEventLoop(); if (inEventLoop) { addTask(task); } else { startThread(); addTask(task); if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
addTask方法:
protected void addTask(Runnable task) { if (task == null) { throw new NullPointerException("task"); } if (!offerTask(task)) { rejectedExecutionHandler.rejected(task, this); } } final boolean offerTask(Runnable task) { if (isShutdown()) { reject(); } return taskQueue.offer(task); }
taskQueue是存放着待执行的任务的队列。
schedule任务
通过调用eventLoop.scheduleXXX之类的方法来添加一个定时任务。schedule功能的实现是在 SingleThreadEventExecutor的父类AbstractScheduledEventExecutor中,此类中有一个变量:Queue<ScheduledFutureTask<?>> scheduledTaskQueue。scheduledTaskQueue是一个队列(Queue),存放的元素是ScheduledFutureTask.而ScheduledFutureTask是对Schedule任务的一个抽象。
看一下AbstractScheduledEventExecutor所实现的schedule方法:
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { ObjectUtil.checkNotNull(command, "command"); ObjectUtil.checkNotNull(unit, "unit"); if (delay < 0) { throw new IllegalArgumentException( String.format("delay: %d (expected: >= 0)", delay)); } return schedule(new ScheduledFutureTask<Void>( this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay)))); }
当一个Runnable传递进来后,会被封装为一个ScheduledFutureTask对象,此对象会记录下这个 Runnable在何时运行,已何种频率运行等信息。
构建完了ScheduledFutureTask,继续调用另一个重载的schedule方法:
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) { if (inEventLoop()) { scheduledTaskQueue().add(task); } else { execute(new Runnable() { @Override public void run() { scheduledTaskQueue().add(task); } }); } return task; }
ScheduledFutureTask对象就会被添加到scheduledTaskQueue中了。
任务的执行
当一个任务被添加到taskQueue后,它是怎么被EventLoop执行的呢?
还记得NioEventLoop.run()方法不,会分别调用processSelectedKeys()和runAllTasks()方法。runAllTasks一看就是来执行任务的。
protected boolean runAllTasks() { boolean fetchedAll; do { fetchedAll = fetchFromScheduledTaskQueue(); Runnable task = pollTask(); if (task == null) { return false; } for (;;) { try { task.run(); } catch (Throwable t) { logger.warn("A task raised an exception.", t); } task = pollTask(); if (task == null) { break; } } } while (!fetchedAll); lastExecutionTime = ScheduledFutureTask.nanoTime(); return true; }
fetchFromScheduledTaskQueue()将scheduledTaskQueue中已经可以执行的(即定时时间已到的 schedule 任务)添加到taskQueue中,作为可执行的task等待被调度执行。
private boolean fetchFromScheduledTaskQueue() { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); Runnable scheduledTask = pollScheduledTask(nanoTime); while (scheduledTask != null) { if (!taskQueue.offer(scheduledTask)) { scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask); return false; } scheduledTask = pollScheduledTask(nanoTime); } return true; }
AbstractScheduledEventExecutor中的pollScheduledTask:
protected final Runnable pollScheduledTask(long nanoTime) { assert inEventLoop(); Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); if (scheduledTask == null) { return null; } if (scheduledTask.deadlineNanos() <= nanoTime) { scheduledTaskQueue.remove(); return scheduledTask; } return null; }
最后runAllTasks()方法就会不断调用task = pollTask() 从taskQueue中获取一个可执行的 task,然后调用它的run()方法来运行此task。
发表评论
-
netty探索之旅七
2017-03-06 16:28 361前面我们分析了Pipeline,还有一个东西值得我们去研究研究 ... -
netty探索之旅六
2017-03-05 15:45 395netty中的管道--ChannelPipeline的事件传输 ... -
netty探索之旅五
2017-03-02 11:09 391netty中的管道--ChannelPipeline机制 前 ... -
netty探索之旅三
2017-02-17 16:00 496下面就开始我们的探索 ... -
netty探索之旅二
2017-02-13 13:30 315上一篇只是简单的介绍了一下NIO中的Selector。 这里我 ... -
netty探索之旅四
2017-02-27 16:19 356上一篇我们研究了netty的客户端代码,这一篇我们研究一下服务 ... -
netty探索之旅一
2017-02-12 16:38 335其实一直都在关注NETTY,前面也花了点时间去看过,但是 ...
相关推荐
Java进阶技术-netty进阶之路
《Netty进阶之路-跟着案例学Netty》是由知名技术专家李林峰撰写的一本专为Java开发者深入理解Netty框架而准备的书籍。这本书旨在通过实例教学,帮助读者全面掌握Netty的核心特性和实战技巧,提升网络编程的能力。 ...
Netty进阶之路 跟着案例学Netty 整本书无密码,Netty进阶之路 跟着案例学Netty
《Netty进阶之路:跟着案例学Netty》中的案例涵盖了Netty的启动和停止、内存、并发多线程、性能、可靠性、安全等方面,囊括了Netty绝大多数常用的功能及容易让人犯错的地方。在案例的分析过程中,还穿插讲解了Netty...
精选自1000多个一线业务实际案例,从原理到实践全景式讲解Netty项目实践,快速领悟Netty专家花大量时间积累的经验,提高编程水平及分析解决问题的能力,《Netty木又威指南》作者力作,众专家力荐 Netty将Java NIO...
Netty学习之ServerChannel Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本篇中,我们将深入探讨ServerChannel这一核心概念,它是Netty中用于接收客户端...
读书笔记:Netty权威指南学习之旅
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本文中,我们将深入探讨Netty的基本概念,通过“Hello World”范例来理解其工作原理。 首先,让我们理解...
在《Netty进阶之路:跟着案例学Netty》中,作者将在过去几年实践中遇到的问题,以及Netty学习者咨询的相关问题,进行了归纳和总结,以问题案例做牵引,通过对案例进行剖析,讲解问题背后的原理,并结合Netty源码分析...
《Netty实战》这本书是针对Java网络编程框架Netty的一本深入实践教程,旨在帮助读者掌握Netty的核心特性和实际应用。Netty是一款高性能、异步事件驱动的网络应用程序框架,广泛应用于各种分布式系统、微服务架构以及...
Netty基础,用于学习Netty,参考黑马程序员的netty教程
《跟闪电侠学Netty:Netty即时聊天实战与底层原理》是一本深入浅出的Netty技术指南,旨在...通过学习这本书,你不仅可以学会Netty的基本使用,还能深入了解其设计思想和优化手段,为你的Java网络编程之路打下坚实基础。
8. **异常处理**:Netty提供了一套完整的异常处理机制,使得在发生错误时能够优雅地恢复或关闭连接。 9. **丰富的API**:Netty的API设计简洁且强大,易于理解和使用,降低了开发网络应用的门槛。 10. **社区活跃**...
ChannelHandlerAdapter 4.X版本和5.X版本的差别很大。ChannelRead是属于5.X版本的4.X版本没有这个方法,所以如果要用ChannelRead。可以更换5.X版本的Netty。
**Netty 深度解析** Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。它广泛应用于各种领域,如分布式系统、云计算、游戏服务器、大数据传输等。Netty 的设计...
在Netty中,最重要的概念之一是“Boss线程”和“Worker线程”的模型。Boss线程负责接收新的连接请求,而Worker线程则处理这些连接后的读写操作,这种模型能够有效提高系统的并发能力。Netty的NIO(非阻塞I/O)模型...
Netty (netty-netty-5.0.0.Alpha2.tar.gz)是一个 NIO 客户端服务器框架,可以快速轻松地开发协议服务器和客户端等网络应用程序。它极大地简化和流线了网络编程,例如 TCP 和 UDP 套接字服务器。 “快速和简单”并...
Netty是Java领域的一款高性能、异步事件驱动的网络应用框架,主要用于快速开发可维护的高性能协议服务器和客户端。在本精讲中,我们将深入探讨Netty的核心概念、设计模式以及实际应用场景,帮助你全面理解并掌握...
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本文中,我们将深入探讨 Netty 实时通讯的原理与应用,以及如何利用它构建 WebSocket 服务。 WebSocket 是...
netty案例,netty4.1中级拓展篇八《Netty心跳服务与断线重连》源码 https://mp.weixin.qq.com/s?__biz=MzIxMDAwMDAxMw==&mid=2650724845&idx=1&sn=8631c590ff4876ba0b7af64df16fc54b&scene=19#wechat_redirect