NioEventLoop 内部启用一个线程,处理两件事:
1.负责注册到此类的Channel的IO事件,channel connect ,channel read selector.select()或selector.selectNow()
2.处理一些IO任务
一个线程处理两件事,两件事的使用CUP的时间比例由ioRatio属性决定,默认为各为50%
/** * EventExecutor是一个特殊的EventExecutorGroup * EventExecutorGroup是一个线程池(组),池中每一个线程都是EventExecutor * EventExecutorGroup 聚合了多个(线程数)EventExecutor,每一个EventExecutor都是一条线程 * */ public interface EventExecutor extends EventExecutorGroup { /** * 返回它自己 */ @Override EventExecutor next(); /** *返回线程池组 */ EventExecutorGroup parent(); /** * 判断当前调用方法的线程(Thread.currentThread()),是不是 EventExecutor内部启动的线程 * 详见:SingleThreadEventExecutor */ boolean inEventLoop(); /** * 判断参数线程 是不是 EventExecutor内部启动的线程 */ boolean inEventLoop(Thread thread); /** * 创建一个Promise */ <V> Promise<V> newPromise(); /** * 创建一个ProgressivePromise */ <V> ProgressivePromise<V> newProgressivePromise(); /** *创建一个newSucceededFuture,即默认的isSuccess()==true,而此Future.addListener的监听器FutureListener将会被 * 立即调用(without blocking)。 */ <V> Future<V> newSucceededFuture(V result); /** *创建一个newFailedFuture,即默认的isSuccess()==false,而此Future.addListener的监听器FutureListener将会被 * 立即调用(without blocking)。 */ <V> Future<V> newFailedFuture(Throwable cause); } /** * AbstractEventExecutor 是EventExecutor一个基础的实现。 * 它扩展了AbstractExecutorService,具备了线程池的基本方法:shutdown,sumit,invokeAll,shutdown,void execute(Runnable command)等 */ public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor { //返回默认的DefaultPromise public <V> Promise<V> newPromise() { return new DefaultPromise<V>(this); } //返回默认的DefaultProgressivePromise public <V> ProgressivePromise<V> newProgressivePromise() { return new DefaultProgressivePromise<V>(this); } //返回默认的SucceededFuture public <V> Future<V> newSucceededFuture(V result) { return new SucceededFuture<V>(this, result); } //返回默认的FailedFuture== public <V> Future<V> newFailedFuture(Throwable cause) { return new FailedFuture<V>(this, cause); } /** * 改写了AbstractExecutorService.newTaskFor,原返回FutureTask,现返回PromiseTask */ protected final <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new PromiseTask<T>(this, runnable, value); } /** * 改写了AbstractExecutorService.newTaskFor,原返回FutureTask,现返回PromiseTask */ protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new PromiseTask<T>(this, callable); } /** * 调用Runable.run方法,由子类在线程中调用 */ protected static void safeExecute(Runnable task) { try { task.run(); } catch (Throwable t) { logger.warn("A task raised an exception. Task: {}", task, t); } } } /** * AbstractScheduledEventExecutor 在AbstractEventExecutor基础上,提供了shedule方法 * */ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor { /** * 一个保存调度任务的队列。默认为 PriorityQueue<ScheduledFutureTask<?>>(); * ScheduledFutureTask 实现了Comparable方法,PriorityQueue可以对其内部的ScheduledFutureTask进排序 * 最近要执行的调度排在前面 */ Queue<ScheduledFutureTask<?>> scheduledTaskQueue; /** * 以下三个方法,改写了AbstractExecutorService,提供了定时调度的功能, * 三个方法内主要创建 ScheduledFutureTask对象,再调用 schedule方法(见下面),添加到队列中(scheduledTaskQueue) * */ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) ; /** * 此方法将上面三个方法生成的ScheduledFutureTask,添加调度队列中。 * 由于调度队列PriorityQueue,不是线程安全的,不应多线程访添加队列 * 所以首先判断是否是内部线程(inEventLoop)(子类SingleThreadEventExecutor只创建一条线程),如果是,直接添加--》单线程访问 * 否则将添加的代码转换为Runnable,并调用线程池的execute方法--》添加到另一任务队列,内部线程轮询队列,执行 * Runnable--》实现添加ScheduledFutureTask到PriorityQueue中--》单线程访问 * 即外部线程转换为内调线程调用--》单线程添加任务到PriorityQueue中。 */ <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) { if (inEventLoop()) { scheduledTaskQueue().add(task); } else { execute(new Runnable() { @Override public void run() { scheduledTaskQueue().add(task); } }); } return task; } /** * 此类还提供了一些对调度队列查询的功能 **/ protected final Runnable pollScheduledTask(); protected final long nextScheduledTaskNano(); final ScheduledFutureTask<?> peekScheduledTask(); protected final boolean hasScheduledTasks() ; } /** * 启用单线程处理NIO及内部任务 **/ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { //执行线程池,默认入参为: private final Executor executor; //任务队列 初始值=new LinkedBlockingQueue<Runnable>(maxPendingTasks); private final Queue<Runnable> taskQueue; //本类启动一条线程,并把线程赋值于此属性,用于inEvenLoop方法,判断执行方法线程是否为本线程 Thread.currentThread == this.thread? private volatile Thread thread; public boolean inEventLoop(Thread thread) { return thread == this.thread; } //线程中断标志 private volatile boolean interrupted; /** * 对于任务队列的一些操作:添加,移除等 *// protected void addTask(Runnable task) ; final boolean offerTask(Runnable task) ; protected boolean removeTask(Runnable task) ; protected Runnable pollTask() ; /** * 执行任务 * **/ protected boolean runAllTasks() { assert inEventLoop();//断言是在由内部线程执行 boolean fetchedAll; boolean ranAtLeastOne = false; do { fetchedAll = fetchFromScheduledTaskQueue();//调度队列任务====》执行队列 if (runAllTasksFrom(taskQueue)) {//执行队列-->任务--》执行 ranAtLeastOne = true; } } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks. if (ranAtLeastOne) { lastExecutionTime = ScheduledFutureTask.nanoTime(); } afterRunningAllTasks(); return ranAtLeastOne; } /** * 从父类调度任务队列中,获取所有需要执行的调度任务,(取调度时间最近的,并且已未超过预计执行的时间点的任务) * 添加到执行队列taskQueue中 * 调度队列====》执行队列 * 调度队列:按时间排序,未到时间点不取出 * 执行队列:当前需要被执行的任务 **/ private boolean fetchFromScheduledTaskQueue() { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); Runnable scheduledTask = pollScheduledTask(nanoTime); while (scheduledTask != null) { if (!taskQueue.offer(scheduledTask)) { // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again. scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask); return false; } scheduledTask = pollScheduledTask(nanoTime); } return true; } /** * 从执行队列中取出所有任务执行 **/ protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) { Runnable task = pollTaskFrom(taskQueue); if (task == null) { return false; } for (;;) { safeExecute(task);//直接调用Runnable.run方法 task = pollTaskFrom(taskQueue); if (task == null) { return true; } } } /** * Execute 接口的Execute方法,只是将任务添加到执行队列中 * 如果线程未启动,则启动线程 **/ public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } boolean inEventLoop = inEventLoop(); if (inEventLoop) {//内部线程添加,直接存入执行队列 addTask(task); } else { startThread();//未启动线程就启动 addTask(task);//添加任务到执行队列 if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } } /** * 未启动线程就启动 **/ private void startThread() { if (STATE_UPDATER.get(this) == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { doStartThread(); } } } /** * 启动线程 * executor 对象默认为:ThreadPerTaskExecutor * 创建一个新的线程并启动 *public void execute(Runnable command) { * threadFactory.newThread(command).start(); *} * * */ private void doStartThread() { assert thread == null; executor.execute(new Runnable() { @Override public void run() { try { SingleThreadEventExecutor.this.run();//由子类NioEventLoop实现 success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { } } }); } public final class NioEventLoop extends SingleThreadEventLoop { NioEventLoop:字面上的意思为:Nio事件循环,即对NIO的事件循环处理。 初始化时,实现对jdk nio的优化。 它持有: Selector selector; //优化JDKnio 时,通过反射获取 jdk内部的selectedKeys,也就是Selector.select 之后,有相应的事件,则 //selectedKeys 就有值了,而不需要通过Selector.selectedKeys()获取了。 //SelectedSelectionKeySet 内部例用两个数组实现存储多个SelectKey,写入-A数据,读取A数组,写入B数组,读取B数组,写入A数组。。。切换进行 private SelectedSelectionKeySet selectedKeys; private final SelectorProvider provider; //在selector上注册channel及IO操作类型,当IO操作发生时,NioTask 被触发 public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task){ ch.register(selector, interestOps, task); } 主要职责: 1.循环调用到selector.select()方法,对已注册的channel,监控相应的感兴趣的(interestOps)网络事件 2.处理IO任务 每一个NioEventLoop启动一个线程,线程内调用run方法。 ioRatio io操作与任务的时间比例 ,默认为各为 50% @Override protected void run() { for (;;) {//无限循环 try { //selectStrategy 默认为DefaultSelectStrategy , //没有任务时(hasTasks()==false)?调用select.select()阻塞方法:否则调用selector.selectNow()方法立即返回(calculateStrategy方法内部) // switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { processSelectedKeys();//处理IO事件 } finally { // Ensure we always run tasks. runAllTasks();//执行任务:见上面执行任务 } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); //见上面执行任务,区别在于增加执行时间限制 } } } catch (Throwable t) { handleLoopException(t); } //忽略 isShutDown代码 } } /** * 处理每一个SelectKey 对应的IO事件 **/ private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { for (int i = 0;; i ++) { final SelectionKey k = selectedKeys[i]; if (k == null) { break; } final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } } } /** * **/ private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); try { int readyOps = k.readyOps(); // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise // the NIO JDK channel implementation may throw a NotYetConnectedException. if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops);//移除OP_CONNECT unsafe.finishConnect();//完成socket连接 } // Process OP_WRITE first as we may be able to write some queued buffers and so free memory. if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush();//flush 缓存 } // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read();//完成读取操作 } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } } private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) { int state = 0; try { task.channelReady(k.channel(), k); state = 1; } catch (Exception e) { k.cancel(); invokeChannelUnregistered(task, k, e); state = 2; } finally { switch (state) { case 0: k.cancel(); invokeChannelUnregistered(task, k, null); break; case 1: if (!k.isValid()) { // Cancelled by channelReady() invokeChannelUnregistered(task, k, null); } break; } } } /** *实现 channel 事件的注册 *// public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) { }
相关推荐
Netty4.1的源码,欢迎大家下载。.............................................................................................................................................................................
这个“netty4.1源码”压缩包包含的是Netty框架4.1版本的源代码,对于深入理解Netty的工作原理、性能优化以及自定义功能扩展非常有帮助。 Netty的核心特性包括: 1. **异步非阻塞I/O**:Netty基于Java NIO(非阻塞I...
这个"Netty-4.1 源码包"包含了Netty框架的源代码,允许开发者深入理解其内部工作原理,优化自定义实现,或者排查问题。 在Netty 4.1 版本中,主要包含以下关键知识点: 1. **NIO (Non-blocking I/O)**: Netty 使用...
在本文中,我们将深入探讨 Netty 4.1 的中文API帮助文档和用户指南,以及如何利用这些资源来提升你的网络编程技能。 首先,Netty 4.1 中文API帮助文档是理解 Netty 内部机制的关键工具。它包含了详细的类、接口、...
这个“netty 4.1 中文.CHM”文件是一个压缩包,包含的是Netty 4.1版本的中文版帮助文档,对于开发者来说是一个非常宝贵的资源,特别是对于那些中文为母语的开发者,它提供了方便的理解和学习Netty的途径。...
Netty-4.1.97.Final源码提供了对Netty内部机制的深度洞察,对于Java程序员尤其是希望提升网络编程能力或进行定制化开发的人来说,是一份极其宝贵的资料。 首先,让我们从整体上了解Netty的架构设计。Netty采用了...
赠送jar包:netty-common-4.1.65.Final.jar; 赠送原API文档:netty-common-4.1.65.Final-javadoc.jar; 赠送源代码:netty-common-4.1.65.Final-sources.jar; 赠送Maven依赖信息文件:netty-common-4.1.65.Final....
netty案例,netty4.1中级拓展篇八《Netty心跳服务与断线重连》源码 https://mp.weixin.qq.com/s?__biz=MzIxMDAwMDAxMw==&mid=2650724845&idx=1&sn=8631c590ff4876ba0b7af64df16fc54b&scene=19#wechat_redirect
这个压缩包包含的是Netty 4.1的英文API帮助文档和用户指南,对于理解和使用Netty框架非常有帮助。 首先,我们来看`netty 4.1.CHM`文件,这是一个CHM(Compiled Help Manual)格式的帮助文档,通常包含了详细的API...
赠送jar包:netty-all-4.1.68.Final.jar; 赠送原API文档:netty-all-4.1.68.Final-javadoc.jar; 赠送源代码:netty-all-4.1.68.Final-sources.jar; 赠送Maven依赖信息文件:netty-all-4.1.68.Final.pom; 包含...
赠送jar包:netty-all-4.1.68.Final.jar; 赠送原API文档:netty-all-4.1.68.Final-javadoc.jar; 赠送源代码:netty-all-4.1.68.Final-sources.jar; 赠送Maven依赖信息文件:netty-all-4.1.68.Final.pom; 包含...
在本文中,我们将深入分析 Netty 4.1 源码中的 EchoServer 示例,以理解其核心组件和工作原理。 首先,我们关注 EchoServer 服务端的初始化,这涉及到两个关键组件:`bossGroup` 和 `workerGroup`。它们都是 `...
标题 "netty-netty-4.1.32.final-remark.zip" 提到了 Netty 的版本号 4.1.32.Final,这表明这是一个关于 Netty 4.1.32.Final 版本的资料包。"final" 表示这是该版本的最终发布,通常意味着经过了充分测试和稳定。...
这个“netty-netty-4.1.79.Final.tar.gz”文件是一个包含Netty 4.1.79.Final版本的压缩包,通常用于Java开发环境。解压后,我们可以得到Netty的源代码、库文件和其他相关资源。 Netty的核心特性包括: 1. **异步...
netty案例,netty4.1中级拓展篇十三《Netty基于SSL实现信息传输过程中双向加密验证》源码 ...
netty案例,netty4.1中级拓展篇一《Netty与SpringBoot整合》源码 https://mp.weixin.qq.com/s?__biz=MzIxMDAwMDAxMw==&mid=2650724796&idx=1&sn=ce5dc3c913d464b0e2e4e429a17bb01e&scene=19#wechat_redirect
netty-buffer-4.1.32.Final-sources.jar netty-buffer-4.1.32.Final.jar netty-build-22-sources.jar netty-build-22.jar netty-codec-4.1.32.Final-sources.jar netty-codec-4.1.32.Final.jar netty-codec-...
这个“netty-netty-4.1.69.Final.tar.gz”文件是Netty的最新稳定版本,版本号为4.1.69.Final,它是一个压缩包文件,通常包含源码、编译后的类库、文档和其他相关资源。 Netty的核心特点包括: 1. **异步事件驱动**...
netty案例,netty4.1基础入门篇十一《netty udp通信方式案例Demo》源码 https://mp.weixin.qq.com/s?__biz=MzIxMDAwMDAxMw==&mid=2650724927&idx=1&sn=a16bc8e98d6a27816da0896adcc83778&scene=19#wechat_redirect