`

Netty 4.x学习笔记 – 线程模型

阅读更多

转自http://hongweiyi.com/2014/01/netty-4-x-thread-model/

Netty 4.x学习笔记 – 线程模型

1、前言

前面两篇学习笔记已经说完了ByteBufChannel和Pipeline,这篇开始讲讲前面欠的债——线程模型(EventLoop和EventExecutor)。

 

 

2、Netty线程模型

将具体代码实现前,先来谈谈Netty的线程模型。正如许多博客所提到的,Netty采用了Reactor模式,但是许多博客也只是提到了而已,同时大家 也不会忘记附上几张Doug Lee大神的图,但是并不会深入的解释。为了更好的学习和理解Netty的线程模型,我在这里稍微详细的说一下我对它的理解。

Reactor模式有多个变种,Netty基于Multiple Reactors模式(如下图)做了一定的修改,Mutilple Reactors模式有多个reactor:mainReactor和subReactor,其中mainReactor负责客户端的连接请求,并将请求 转交给subReactor,后由subReactor负责相应通道的IO请求,非IO请求(具体逻辑处理)的任务则会直接写入队列,等待worker threads进行处理。

 

Netty的线程模型基于Multiple Reactors模式,借用了mainReactor和subReactor的结构,但是从代码里看来,它并没有Thread Pool这个东东。Netty的subReactor与worker thread是同一个线程,采用IO多路复用机制,可以使一个subReactor监听并处理多个channel的IO请求,我给称之为:「Single Thread with many Channel」。我根据代码整理出下面这种Netty线程模型图:

上图中的parentGroup和childGroup是Bootstrap构造方法中传入的两个对象,这两个group均是线程 池,childGroup线程池会被各个subReactor充分利用,parentGroup线程池则只是在bind某个端口后,获得其中一个线程作为 mainReactor。上图我将subReactor和worker thread合并成了一个个的loop,具体的请求操作均在loop中完成,下文会对loop有个稍微详细的解释。 

以上均是Nio情况下。Oio采用的是Thread per Channel机制,即每个连接均创建一个线程负责该连接的所有事宜。

Doug Lee大神的Reactor介绍:Scalable IO in Java

 

3、EventLoop和EventExecutor实现

EventLoop和EventExecutor实现共有4个主要逻辑接口,EventLoop、EventLoopGroup、 EventExecutor、EventExecutorGroup,内部实现、继承的逻辑表示无法直视,有种擦边球的感觉。具体的类图如下:

EventLoopGroup:

主要方法是newChild,我理解为EventLoop的工厂类。**EventLoopGroup.newChild创建**EventLoop对 象。OioEventLoopGroup除外,它没有实现newChild方法,调用父类的并创建ThreadPerChannelEventLoop对 象。

EventLoop:

主要方法是run(),是整个Netty执行过程的逻辑代码实现,后面细说。

EventExecutorGroup:

线程池实现,主要成员是children数组,主要方法是next(),获得线程池中的一个线程,由子类调用。由于Oio采用的是Thread per Channel机制,所以没有实现前面两个。

EventExecutor:

Task的执行类,主要成员是taskQueue以及真正的运行线程对象executor,主要方法是taskQueue操作方法execute、takeTask、addTask等,以及doStartThread方法,后面细说。

 

4、NioEventLoopGroup实现

这里以常用的NioEventLoopGroup为例。NioEventLoopGroup在Bootstrap初始化时作为参数传入构造方法,由于NioEventLoopGroup涉及的代码较多,就不大篇幅的贴代码了,只写流程性的文字或相应类和方法:

mainReactor:

1. Bootstrap.bind(port)

2. Bootstrap.initAndRegister()

    2.1 Bootstrap.createChannel()

根据EventLoop创建相应的Channel,EventLoop从group().next()中获得。

    2.2 Boostrap.init()

初始化Channel,配置Channel参数,以及Pipeline。其中初始化Pipeline中,需要插入 ServerBootstrapAcceptor对象用作acceptor接收客户端连接请求,acceptor也是一种 ChannelInboundHandlerAdapter。

1 p.addLast(new ChannelInitializer<Channel>() {
2     @Override
3     public void initChannel(Channel ch) throws Exception {
4         ch.pipeline().addLast(new ServerBootstrapAcceptor(currentChildHandler, currentChildOptions,
5                 currentChildAttrs));
6     }
7 });

调用channel的unsafe对象注册selector,具体实现类为AbstractChannel$AbstractUnsafe.register。如下:

1 public final void register(final ChannelPromise promise) {
2     if (eventLoop.inEventLoop()) {  // 是否在Channel的loop中
3         register0(promise);
4     } else // 不在
5         try {
6             eventLoop.execute(new Runnable() {  // EventLoop执行一个任务
7                 @Override
8                 public void run() {
9                    register0(promise);
10                 }
11             });
12         } catch (Throwable t) {
13             // ...
14         }
15     }
16 }

eventLoop.execute(runnable);是比较重要的一个方法。在没有启动真正线程时,它会启动线程并 将待执行任务放入执行队列里面。启动真正线程(startThread())会判断是否该线程已经启动,如果已经启动则会直接跳过,达到线程复用的目的。 启动的线程,主要调用方法是NioEventLoop的run()方法,run()方法在下面有详细介绍:

1 public void execute(Runnable task) {
2     if (task == null) {
3         throw new NullPointerException("task");
4     }
5  
6     boolean inEventLoop = inEventLoop();
7     if (inEventLoop) {
8         addTask(task);
9     } else {
10         startThread();  // 启动线程
11         addTask(task);  // 添加任务队列
12  
13         // ...
14              
15     }
16  
17     if (!addTaskWakesUp) {
18         wakeup(inEventLoop);
19     }
20 }

3. 接收连接请求

由NioEventLoop.run()接收到请求:

    3.1 AbstractNioMessageChannel$NioMessageUnsafe.read()

    3.2 NioServerSocketChannel.doReadMessages()

获得childEventLoopGroup中的EventLoop,并依据该loop创建新的SocketChannel对象。

    3.3 pipeline.fireChannelRead(readBuf.get(i));

readBuf.get(i)就是3.2中创建的SocketChannel对象。在2.2初始化Bootstrap的时候,已经将acceptor处理器插入pipeline中,所以理所当然,这个SocketChannel对象由acceptor处理器处理。

    3.4 ServerBootstrapAcceptor$ServerBootstrapAcceptor.channelRead();

该方法流程与2.2、2.3类似,初始化子channel,并注册到相应的selector。注册的时候,也会调用eventLoop.execute用以执行注册任务,execute时,启动子线程。即启动了subReactor。

subReactor:

subReactor的流程较为简单,主体完全依赖于loop,用以执行read、write还有自定义的NioTask操作,就不深入了,直接跳过解释loop过程。

loop:

loop是我自己提出来的组件,仅是代表subReactor的主要运行逻辑。例子可以参考NioEventLoop.run()。

loop会不断循环一个过程:select -> processSelectedKeys(IO操作) -> runAllTasks(非IO操作),如下代码:

1 protected void run() {
2     for (;;) {   
3         // ...
4         try {
5             if (hasTasks()) { // 如果队列中仍有任务
6                 selectNow();
7             } else {
8                 select();
9                 // ...
10             }
11  
12             // ...
13  
14             final long ioStartTime = System.nanoTime();  // 用以控制IO任务与非IO任务的运行时间比
15             needsToSelectAgain = false;
16             // IO任务
17             if (selectedKeys != null) {
18                 processSelectedKeysOptimized(selectedKeys.flip());
19             } else {
20                 processSelectedKeysPlain(selector.selectedKeys());
21             }
22             final long ioTime = System.nanoTime() - ioStartTime;
23  
24             final int ioRatio = this.ioRatio;
25             // 非IO任务
26             runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
27  
28             if (isShuttingDown()) {
29                 closeAll();
30                 if (confirmShutdown()) {
31                     break;
32                 }
33             }
34         } catch (Throwable t) {
35             // ...
36          
37         }
38     }
39 }

就目前而言,基本上IO任务都会走processSelectedKeysOptimized方法,该方法即代表使用了优化的SelectedKeys。除非采用了比较特殊的JDK实现,基本都会走该方法。

selectedKeys在openSelector()方法中初始化,Netty通过反射修改了Selector的selectedKeys成员和publicSelectedKeys成员。替换成了自己的实现——SelectedSelectionKeySet。

从OpenJDK 6/7的SelectorImpl中可以看到,selectedKeys和publicSeletedKeys均采用了HashSet实现。 HashSet采用HashMap实现,插入需要计算Hash并解决Hash冲突并挂链,而SelectedSelectionKeySet实现使用了双 数组,每次插入尾部,扩展策略为double,调用flip()则返回当前数组并切换到另外一个数据。

ByteBuf中去掉了flip,在这里是否也可以呢?

processSelectedKeysOptimized主要流程如下:

1 final Object a = k.attachment();
2  
3 if (a instanceof AbstractNioChannel) {
4     processSelectedKey(k, (AbstractNioChannel) a);
5 } else {
6     @SuppressWarnings("unchecked")
7     NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
8     processSelectedKey(k, task);
9 }

在获得attachment后,判断是Channel呢还是其他,其他则是NioTask。找遍代码并没有发现Netty有注册NioTask的行为,同 时也没发现NioTask的实现类。只有在NioEventLoop.register方法中有注册NioTask至selector的行为,便判断该行 为是由用户调用,可以针对某个Channel注册自己的NioTask。这里就只讲第一个processSelectdKey(k, (AbstractNioChannel) a),但代码就不贴了。

和常规的NIO代码类似,processSelectdKey是判断SeletedKeys的readyOps,并做出相应的操作。操作均是unsafe 做的。如read可以参考:AbstractNioByteChannel$NioByteUnsafe.read()。IO操作的流程大致都是:

  • 获得数据
  • 调用pipeline的方法,fireChannel***
  • 插入任务队列

unsafe这名字实在是太难看了,又没有任何逻辑意义,也没讲明白哪里不safe。

执行完所有IO操作后,开始执行非IO任务(runAllTasks)。Netty会控制IO和非IO任务的比例,ioTime * (100 – ioRatio) / ioRatio,默认ioRatio为50。runAllTasks乃是父类SingleThreadExecutor的方法。方法主体很简单,将任务从 TaskQueue拎出来,直接调用任务的run方法即可。

代码调用的是task.run(),而不是task.start()。即是单线程执行所有任务

1 protected boolean runAllTasks(long timeoutNanos) {
2     fetchFromDelayedQueue(); 
3     Runnable task = pollTask();
4     if (task == null) {
5         return false;
6     }
7  
8     // 控制时间
9     final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
10     long runTasks = 0;
11     long lastExecutionTime;
12     for (;;) {
13         try {
14             task.run();
15         } catch (Throwable t) {
16             logger.warn("A task raised an exception.", t);
17         }
18  
19         runTasks ++;
20  
21         // Check timeout every 64 tasks because nanoTime() is relatively expensive.
22         // XXX: Hard-coded value - will make it configurable if it is really a problem.
23         if ((runTasks & 0x3F) == 0) {
24             lastExecutionTime = ScheduledFutureTask.nanoTime();
25             if (lastExecutionTime >= deadline) {
26                 break;
27             }
28         }
29  
30         task = pollTask();
31         if (task == null) {
32             lastExecutionTime = ScheduledFutureTask.nanoTime();
33             break;
34         }
35     }
36  
37     this.lastExecutionTime = lastExecutionTime;
38     return true;
39 }

 

5、总结

以上内容从设计和代码层面总结Netty线程模型的大致内容,中间有很多我的不成熟的思考与理解,请朋友轻拍与指正。

看源码过程中是比较折磨人的。首先得了解你学习东西的业务价值是哪里?即你学了这个之后能用在哪里,只是不考虑场景仅仅为了看代码而看代码比较难以深入理 解其内涵;其次,看代码一定一定得从逻辑、结构层面看,从细节层面看只会越陷越深,有种一叶障目不见泰山的感觉;最后,最好是能够将代码逻辑、结构画出 来,或者整理出思维导图啥的,可以用以理清思路。前面两篇文章思维道路较为清晰,线程模型的导图有一些但是比较混乱,就不贴出来了,用作自己参考,有兴趣 的可以找我要噢。

分享到:
评论

相关推荐

    netty资料.rar

    4. **线程模型**:Netty 的事件循环模型使用单线程处理多个连接,减少了线程上下文切换的开销,提高了系统性能。 5. **强大的API**:Netty 提供了一套简单易用且强大的API,使得编写网络应用变得直观而简洁。 6. *...

    基于Netty网络编程项目实战笔记.7z

    4. **Netty的线程模型**:分析Netty的多线程模型,理解BossGroup和WorkerGroup的区别。 5. **Netty的实战应用**:可能包含Web服务器、聊天室、分布式系统等案例,展示Netty在实际项目中的应用。 6. **性能优化**:...

    Netty学习资料.zip

    这个“Netty学习资料.zip”压缩包包含了韩顺平老师关于 Netty 的一系列教学资源,包括资料、笔记、课件、代码和软件,这些都是深入理解和实践 Netty 技术的重要参考资料。 首先,资料部分可能包含了一些关于 Netty ...

    java多线程tcpsocketserver源码-netty-learning:网络学习

    高度可定制的线程模型 表现 高吞吐量,低延迟 更少的资源消耗 最小化内存拷贝 安全SSL/TLS 支持 建筑学 核 事件模型 通用通信 API 支持零复制的富字节缓冲区 io.netty.buffer, io.netty.common, io.netty.resolver, ...

    Netty全套学习资源(包括源码、笔记、学习文档等)

    笔记可能涵盖 Netty 的安装配置、基本使用、线程模型、缓冲区操作、编解码器的实现以及异常处理等方面,帮助读者逐步建立起对 Netty 的整体认知。 三、Netty 学习文档 文档通常是对 Netty API 的详细解释,包括官方...

    自动netty笔记111

    这个“自动Netty笔记111”可能是某个开发者或学习者记录的一系列关于Netty学习过程中的关键点和理解。下面将详细阐述Netty的相关知识点。 1. **异步事件驱动模型**: Netty采用了非阻塞I/O模型,基于Java NIO(Non...

    极客时间《Netty源码剖析与实战》学习记录-boy-learning-netty.zip

    - **线程模型**:Netty 使用了 EventLoop(事件循环)和 EventLoopGroup(事件循环组)来管理线程,有效降低了线程切换的开销。 2. **Netty 的组件体系** - **Bootstrap**:启动引导类,用于配置和启动服务器或...

    netty源码剖析视频.zip

    此外,还将探讨Netty的线程模型,包括EventLoop的运行机制,以及如何通过配置优化性能。 第二部分,NIO+Netty5各种RPC架构实战演练,此章节将理论与实践相结合,通过多个具体的RPC(远程过程调用)架构实例,展示...

    Netty4.0学习笔记系列之六:多种通讯协议支持

    在本篇Netty4.0学习笔记系列之六中,我们将深入探讨Netty框架如何支持多种通讯协议,以及它在实现高效、灵活的网络通信中的关键特性。Netty是一个高性能、异步事件驱动的网络应用框架,适用于开发服务器和客户端的...

    Netty4.0学习笔记系列之三:构建简单的http服务

    - 接着,配置`EventLoopGroup`,这是Netty的I/O线程模型,通常包括BossGroup(负责接收连接)和WorkerGroup(处理I/O事件)。 - 配置`ChannelInitializer`,在这里可以添加自定义的管道处理器,处理进来的连接请求...

    Netty4.0学习笔记系列之一:Server与Client的通讯

    在本文中,我们将深入探讨Netty 4.0的学习笔记,特别是关于Server与Client之间的通信机制。 首先,我们要理解Netty的核心概念——NIO(非阻塞I/O)。Netty基于Java NIO库构建,它提供了更高级别的API,简化了多路...

    Netty源码分析总结.rar

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的...在分析源码的过程中,我们通常会关注类的设计模式、线程模型、内存管理以及性能优化等方面,这对于提升网络编程和系统架构能力大有裨益。

    NIO+Netty5视频教程与Netty源码剖析视频教程

    这部分教程主要讲解Netty框架的内部工作机制,包括其设计模式、事件驱动模型、线程模型以及缓冲区等核心概念。Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。...

    集合、NIO、Netty、Thread、MySql、Hive、HBase、Kafka、Spark、Fink等学习笔记.zip

    集合是Java编程中不可或缺的一部分,...这些学习笔记涵盖了Java开发、分布式系统、大数据处理和实时流计算等多个领域,是提升技术能力的好资源。通过深入学习和实践,你可以构建扎实的技术基础,适应不断变化的IT环境。

    study-netty:netty 学习之路

    study-netty netty 学习之路 项目笔记 项目说明 1、所谓BIO编程,就是使用JDK1.4之前的...这实际上就是最简化的reactor线程模型,实际上netty使用也是这种模型,只不过稍微复杂了一点点。 accpetor thread只负责与clie

    study_netty:netty代码学习总结

    "study_netty:netty代码学习总结" 表明这是一个关于 Netty 框架的学习笔记或项目,可能包含作者对 Netty 源码的理解、使用经验和实践案例。描述中的 "netty代码学习总结" 进一步强调了这个项目是关于 Netty 代码的...

    study-netty:学习Netty

    Netty应运而生,它基于NIO(非阻塞I/O)和Boss-Worker线程模型,提供了更高级别的API,简化了网络编程。Netty的核心组件包括Bootstrap、Channel、EventLoopGroup、Pipeline等。 Bootstrap是启动器,用于配置服务器...

    netty-learning:netty 学习记录

    异步事件驱动意味着 Netty 不是通过传统的线程模型来处理网络连接,而是基于事件循环(Event Loop)和事件处理器(Event Handler)的概念。这种方式提高了系统的并发能力,减少了线程上下文切换的开销。 Netty 的...

    Netty-learn:netty学习记录

    这个“Netty-learn”项目显然是一份关于Netty的学习笔记,记录了作者在探索Netty框架过程中的理解和实践。下面将详细阐述Netty的核心概念和重要特性,以及它在网络编程中的应用。 Netty 的主要特点: 1. **异步...

    java-demo:java学习笔记,包含多线程,java IO流,java 阻塞IO,非阻塞IO,netty demo

    在这个"java-demo"项目中,我们可以深入学习Java技术,特别是关于多线程、IO流以及两种不同的IO模型——阻塞IO(BIO)和非阻塞IO。此外,还涉及到Netty框架的应用,这是一个高性能、异步事件驱动的网络应用框架,常...

Global site tag (gtag.js) - Google Analytics