阅读本文之前,你需要对java的线程池有一定的了解,因为这里不会过多的讲解。
今天我们主要的任务就是看下netty中一个非常重要的类EventLoop,通过这系列文章,你应该了解EventLoop适用的场景,不会滥用它而导致你的应用缓慢。Netty使用了典型的Reactor模型结构,这其中一个很重要的角色就是EventLoop,它使用循环的方式来处理IO或者其他事件。
上图是EventLoop的接口继承关系,其中Executor、ExecutorService、ScheduledExecutorService是java提供的线程池管理接口:
ScheduledExecutorService:提供执行计划任务的接口;
EventExecutorGroup:提供管理EventExecutor的能力,他通过next()来为任务分配执行线程,同时也提供了shutdownGracefully这一优雅下线的接口;
shutdownGraceFully | 优雅关闭(何为优化关闭后面会介绍) |
isShuttingDown | 其管理的所有EventExecutor是否关闭 |
terminationFuture | 返回接收该线程池彻底关闭事件的Future |
children | 含所有管理的EventExecutor |
next | 通过该方法来为任务分配一个EventExecutor |
EventExecutor:实际的事件执行者
parent | 管理它的EventExecutorGroup |
isEventLoop | 当前线程与EventExecutor的执行线程是否是同一个线程,如果是则此处返回true |
newPromise | 创建一个Promise,由该EventExecutor来执行Promise中的listener |
EventLoopGroup: EventLoopGroup和EventLoop的关系与EventExecutorGroup和EventExecutor的关系类似
register | 将channel注册到该EventLoopGroup,注册后EventLoop会负责该channel的相关io事件 |
EventLoop:处理所有的IO操作。EventLoop继承了EventLoopGroup接口,可以被当做一个single的线程池看到(虽然模式差不多,但其实和java的single线程池区别很大)。
我们找一个最常用的EventLoop实现类来介绍:NioEventLoop。介绍它之前我们得先介绍NioEventLoopGroup,一个连接被它分配到对应的NioEventLoop并进行一系列的后续操作。先看看NioEventLoopGroup的构造函数,最终调用的是下面这个构造方法:
private MultithreadEventExecutorGroup(int nEventExecutors, Executor executor, boolean shutdownExecutor, Object... args) { if (nEventExecutors <= 0) { throw new IllegalArgumentException( String.format("nEventExecutors: %d (expected: > 0)", nEventExecutors)); } if (executor == null) { executor = newDefaultExecutorService(nEventExecutors); shutdownExecutor = true; } // 根据nEventExecutors确定EventExecutor的数量 children = new EventExecutor[nEventExecutors]; // 用了两种不同的方式来为一个任务分配EventExecutor,。 // 两种实现结果是相同的,但是第一种利用的位运算,相对效率更高点。。。 // 具体的实现是从children的第一个开始获取,从0->size-1依次取child,到达最后一个后回到第一个child,最终形成一个环形数组。 if (isPowerOfTwo(children.length)) { chooser = new PowerOfTwoEventExecutorChooser(); } else { chooser = new GenericEventExecutorChooser(); } // 开始初始化每个EventExecutor for (int i = 0; i < nEventExecutors; i ++) { boolean success = false; try { // 实际的初始化由子类自己实现,如NioEventLoopGroup的实现为: // return new NioEventLoop(this, executor, (SelectorProvider) args[0]); children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { // 如果初始化的过程中发生异常,则将初始化好的EventExecutor全部关闭 if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; // 等待关闭完成 try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } final boolean shutdownExecutor0 = shutdownExecutor; final Executor executor0 = executor; final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { // 最后一个关闭完成则标记future完成 if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); if (shutdownExecutor0) { // This cast is correct because shutdownExecutor0 is only try if // executor0 is of type ExecutorService. ((ExecutorService) executor0).shutdown(); } } } }; // 下面的代码比较简单,不过多介绍 xxxxxxxxxxxxxxxxxxxxxxxx } // 看看这两个实现类的差异,这效率扣得不要不要的啊! private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser { @Override public EventExecutor next() { return children[childIndex.getAndIncrement() & children.length - 1]; } } private final class GenericEventExecutorChooser implements EventExecutorChooser { @Override public EventExecutor next() { return children[Math.abs(childIndex.getAndIncrement() % children.length)]; } }
我们在使用NioEventGroupLoop的时候,一般都是直接使用默认构造方法,此时第一个参数nEventExecutors=cpu核数 x 2。NioEventGroupLoop中有很大部分的io操作,这个默认值比较靠谱,不用用户再去修改。
private static final int DEFAULT_EVENT_LOOP_THREADS; static { DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2)); if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS); } }
构造方法的第二个参数executor,它是执行EventExecutor中的任务的实际线程池。默认使用的是netty实现的ForkJoinPool(比较复杂,有空再回过头来分析)。可以看出NioEventLoop本身是不负责线程的创建销毁的,他把执行逻辑封装在Runnable中交给executor处理,这里的模型和netty4已经不太一样,4的EventLoop对应一个固定线程,而5的EventLoop并未固定到一个线程。这也是我困惑的地方,executor的线程数与EventLoop个数相同,能保证每个EventLoop都有线程去执行,但是每个EventLoop不再是固定的Thread了,它带来的问题是一些ThreadLocal的cache可能会失效。不知道为何会这样设计,先在这留个疑问吧,等release版本出来了再看看。
构造方法的最后一个参数args[0]=SelectorProvider.provider(); SelectorProvider根据不同的操作系统创建出对应的provider,如linux下创建的是sun.nio.ch.EPollSelectorProvider。该参数在NioEventLoop初始化的时候被传入,用于创建Selector(这里有一篇Selector的介绍)。
往EventLoopGroup中提交一个任务,实际上就是交给其child(即EventLoop)处理:
public <T> Future<T> submit(Callable<T> task) { return next().submit(task); } public EventExecutor next() { return chooser.next(); } // next方法的其中一个实现,这样可以保证每个child中处理的连接数基本相同 public EventExecutor next() { return children[childIndex.getAndIncrement() & children.length - 1]; }
EventLoopGroup提供了一个注册Channel(表示一个连接)的接口:
public ChannelFuture register(Channel channel) { return next().register(channel); }
就一句代码,背后隐藏的信息却非常重要:连接在注册时就绑定了一个固定的EventLoop,绑定的方式为将channel注册到EventLoop所在的Selector,此连接的读写及其他相关操作(如编码解码、超时管理)都交给了这个EventLoop;因此正常情况下一个连接涉及到的方法(如读/写/编解码/超时管理)都在一个EventLoop中进行,意味着所有这些操作都是线程安全的。还记得netty3吗,超时管理是交给HashedWheelTimer进行管理的,由于超时任务和读写任务是在不同的线程执行,如果超时的同时读入或写出数据也到达,可能会产生非预期的效果。然而由于线程模型的修改,虽然能保证线程安全,但却不再保证这些操作都在一个线程里,这种情况下ThreadLocal的相关功能可能会失效。
这里又引申出一个问题,由于一个连接是和一个EventLoop绑定的,如果EventLoop中存在一个执行时间很长的任务,那该EventLoop后续的所有连接都会被hold住得不到处理,因此应用不要在handler中添加会阻塞或者执行时间很长的操作。网上看到很多文章说是把业务操作放到io线程里去执行,导致系统处理慢甚至挂掉,希望看了这篇文章的同学不要犯同样的错。
/** * 设置I/O操作在EventLoop中占的时间比,(0-100),默认为50,即执行I/O的时间与非I/O的时间相同 */ public void setIoRatio(int ioRatio) { for (EventExecutor e: children()) { ((NioEventLoop) e).setIoRatio(ioRatio); } } /** * 当epoll占用100%时(早期jdk的bug,不知道现在彻底处理了没),使用此方法来用新的Selector替代老的EventLoop中的Selector */ public void rebuildSelectors() { for (EventExecutor e: children()) { ((NioEventLoop) e).rebuildSelector(); } }
好了,到这里NioEventLoopGroup的基本功能就这么多,实现不复杂,但初始化使用的几个默认值需要关注。看了这里你是不是大概明白了上一篇文章中那幅图的意思了。不过我还是准备强调下这几点(重要的事情多说一遍):
1、NioEventLoopGroup下默认的NioEventLoop个数为cpu核数 * 2,因为有很多的io处理;
2、NioEventLoop和java的single线程池在5里差异变大了,它本身不负责线程的创建销毁,而是由外部传入的线程池管理。后面的文章会介绍,他的处理逻辑都封装到Runnable中了;
3、channel和EventLoop是绑定的,即一旦连接被分配到EventLoop,其相关的I/O、编解码、超时处理都在同一个EventLoop中,这样可以确保这些操作都是线程安全的,而不像netty3中可能会出现非预期的执行结果。但和netty4不同的是netty5中并不能保证一个连接的所有操作在同一个线程中,因此和ThreadLocal相关的功能可能会失效(比如内存池的PoolThreadCache在这种情况下无法达到最佳效果)。
补充:
关于executor的引入的一个讨论:https://github.com/netty/netty/issues/2250,从这个问题中,我们可以大概了解为什么netty5要这么改:
1、希望能留给用户更多空间来定制化I/O的执行
2、希望用到fork/jion框架的stealing机制,避免因个别连接问题导致整个任务链阻塞。目前还在思考如何修改netty的架构来达到这个目的。
3、目前的默认实现能够保证效率和之前的一样,能保证线程安全,但内存池之类的效率受到了挑战,这个也是这次改动需要考虑的。
如果最终这个改动成功,那么netty可能的变化:
1、整个线程模型改变;
2、用户可以自定义线程池的实现;
3、内存池的相应修改;
4、一个连接的阻塞不会影响其他连接(如果大量连接阻塞就没办法了);
5、有可能可以直接在netty的线程池中执行长任务(执行时间长),而不用对业务处理单独开连接池。
6、一个连接的操作会保证线程安全,但不一定是在同一个线程中执行,因此如果有在I/O handler中使用ThreadLocal的同学,可以提前想下应对方案。
...等等...
想想有点小激动,不过挑战挺多的,是一次很大的底层改动。仰望大神!
相关推荐
赠送jar包:netty-codec-http2-4.1.74.Final.jar; 赠送原API文档:netty-codec-http2-4.1.74.Final-javadoc.jar; 赠送源代码:netty-codec-http2-4.1.74.Final-sources.jar; 赠送Maven依赖信息文件:netty-codec-...
赠送jar包:netty-codec-http2-4.1.73.Final.jar; 赠送原API文档:netty-codec-http2-4.1.73.Final-javadoc.jar; 赠送源代码:netty-codec-http2-4.1.73.Final-sources.jar; 赠送Maven依赖信息文件:netty-codec-...
赠送jar包:netty-transport-native-unix-common-4.1.73.Final.jar; 赠送原API文档:netty-transport-native-unix-common-4.1.73.Final-javadoc.jar; 赠送源代码:netty-transport-native-unix-common-4.1.73....
赠送jar包:netty-codec-mqtt-4.1.73.Final.jar; 赠送原API文档:netty-codec-mqtt-4.1.73.Final-javadoc.jar; 赠送源代码:netty-codec-mqtt-4.1.73.Final-sources.jar; 赠送Maven依赖信息文件:netty-codec-...
赠送jar包:netty-transport-classes-epoll-4.1.73.Final.jar; 赠送原API文档:netty-transport-classes-epoll-4.1.73.Final-javadoc.jar; 赠送源代码:netty-transport-classes-epoll-4.1.73.Final-sources.jar;...
赠送jar包:netty-resolver-dns-4.1.65.Final.jar; 赠送原API文档:netty-resolver-dns-4.1.65.Final-javadoc.jar; 赠送源代码:netty-resolver-dns-4.1.65.Final-sources.jar; 赠送Maven依赖信息文件:netty-...
赠送jar包:netty-codec-http-4.1.27.Final.jar; 赠送原API文档:netty-codec-http-4.1.27.Final-javadoc.jar; 赠送源代码:netty-codec-http-4.1.27.Final-sources.jar; 赠送Maven依赖信息文件:netty-codec-...
赠送jar包:netty-transport-native-unix-common-4.1.74.Final.jar; 赠送原API文档:netty-transport-native-unix-common-4.1.74.Final-javadoc.jar; 赠送源代码:netty-transport-native-unix-common-4.1.74....
赠送jar包:netty-transport-classes-epoll-4.1.74.Final.jar; 赠送原API文档:netty-transport-classes-epoll-4.1.74.Final-javadoc.jar; 赠送源代码:netty-transport-classes-epoll-4.1.74.Final-sources.jar;...
赠送jar包:netty-transport-classes-epoll-4.1.73.Final.jar; 赠送原API文档:netty-transport-classes-epoll-4.1.73.Final-javadoc.jar; 赠送源代码:netty-transport-classes-epoll-4.1.73.Final-sources.jar;...
赠送jar包:netty-codec-dns-4.1.65.Final.jar; 赠送原API文档:netty-codec-dns-4.1.65.Final-javadoc.jar; 赠送源代码:netty-codec-dns-4.1.65.Final-sources.jar; 赠送Maven依赖信息文件:netty-codec-dns-...
赠送jar包:netty-transport-native-unix-common-4.1.68.Final.jar; 赠送原API文档:netty-transport-native-unix-common-4.1.68.Final-javadoc.jar; 赠送源代码:netty-transport-native-unix-common-4.1.68....
赠送jar包:netty-transport-rxtx-4.1.74.Final.jar; 赠送原API文档:netty-transport-rxtx-4.1.74.Final-javadoc.jar; 赠送源代码:netty-transport-rxtx-4.1.74.Final-sources.jar; 赠送Maven依赖信息文件:...
赠送jar包:netty-codec-haproxy-4.1.73.Final.jar; 赠送原API文档:netty-codec-haproxy-4.1.73.Final-javadoc.jar; 赠送源代码:netty-codec-haproxy-4.1.73.Final-sources.jar; 赠送Maven依赖信息文件:netty-...
赠送jar包:netty-resolver-dns-4.1.65.Final.jar; 赠送原API文档:netty-resolver-dns-4.1.65.Final-javadoc.jar; 赠送源代码:netty-resolver-dns-4.1.65.Final-sources.jar; 赠送Maven依赖信息文件:netty-...
赠送jar包:netty-codec-redis-4.1.73.Final.jar; 赠送原API文档:netty-codec-redis-4.1.73.Final-javadoc.jar; 赠送源代码:netty-codec-redis-4.1.73.Final-sources.jar; 赠送Maven依赖信息文件:netty-codec-...
赠送jar包:netty-codec-http-4.1.68.Final.jar; 赠送原API文档:netty-codec-http-4.1.68.Final-javadoc.jar; 赠送源代码:netty-codec-http-4.1.68.Final-sources.jar; 赠送Maven依赖信息文件:netty-codec-...
赠送jar包:netty-resolver-dns-4.1.73.Final.jar; 赠送原API文档:netty-resolver-dns-4.1.73.Final-javadoc.jar; 赠送源代码:netty-resolver-dns-4.1.73.Final-sources.jar; 赠送Maven依赖信息文件:netty-...
赠送jar包:netty-codec-mqtt-4.1.74.Final.jar; 赠送原API文档:netty-codec-mqtt-4.1.74.Final-javadoc.jar; 赠送源代码:netty-codec-mqtt-4.1.74.Final-sources.jar; 赠送Maven依赖信息文件:netty-codec-...
赠送jar包:netty-tcnative-classes-2.0.46.Final.jar; 赠送原API文档:netty-tcnative-classes-2.0.46.Final-javadoc.jar; 赠送源代码:netty-tcnative-classes-2.0.46.Final-sources.jar; 赠送Maven依赖信息...