`
海浪儿
  • 浏览: 274109 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

netty4服务端启动源码分析-线程的创建

阅读更多

本文为原创,转载请注明出处

netty4服务端启动源码分析-线程的创建

 

本文分析Netty中boss和worker的线程的创建过程:

以下代码是服务端的启动代码,线程的创建就发生在其中。

EventLoopGroup bossGroup = new NioEventLoopGroup();

 

NioEventLoopGroup的类关系图如下:



 构造方法执行过程如下:

 

// NioEventLoopGroup
public NioEventLoopGroup() {
        this(0);
    }

public NioEventLoopGroup(int nThreads) {
        this(nThreads, null);
    }

public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
        this(nThreads, threadFactory, SelectorProvider.provider());
    }

public NioEventLoopGroup(
            int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
        super(nThreads, threadFactory, selectorProvider);
    }

 看下父类MultithreadEventLoopGroup的构造方法

// MultithreadEventLoopGroup
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
    }

 注:如果没有指定创建的线程数量,则默认创建的线程个数为DEFAULT_EVENT_LOOP_THREADS,该数值为:处理器数量x2

 

再来看MultithreadEventLoopGroup的父类MultithreadEventExecutorGroup的构造方法

 

 /**
     * Create a new instance.
     *
     * @param nThreads          the number of threads that will be used by this instance.
     * @param threadFactory     the ThreadFactory to use, or {@code null} if the default should be used.
     * @param args              arguments which will passed to each {@link #newChild(ThreadFactory, Object...)} call
     */
    protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (threadFactory == null) {
            threadFactory = newDefaultThreadFactory();
        }

        children = new SingleThreadEventExecutor[nThreads];
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(threadFactory, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }
    }

 变量children就是用来存放创建的线程的数组,里面每一个元素都通过children[i] = newChild(threadFactory, args)创建。而newChild方法则由子类NioEventLoopGroup实现

 

// NioEventLoopGroup
  protected EventExecutor newChild(
            ThreadFactory threadFactory, Object... args) throws Exception {
        return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
    }

 每个元素的真实类型为NioEventLoop,而NioEventLoop的类关系图如下

 



 (注:其实有点变态的,譬如EventLoop继承EventLoopGroup,不知道是啥原因,仅仅是因为EventLoop里有个方法parent(),返回EventLoopGroup这个功能吗?后续待确认)

 

接着看NioEventLoop的构造函数:

 

// NioEventLoop
 NioEven NioEventLoop tLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
        super(parent, threadFactory, false);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        provider = selectorProvider;
        selector = openSelector();
    }

 首先分析一下selector = openSelector()

 

 

// NioEventLoop
 private Selector openSelector() {
        final Selector selector;
        try {
            selector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }

        if (DISABLE_KEYSET_OPTIMIZATION) {
            return selector;
        }

        try {
            SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

            Class<?> selectorImplClass =
                    Class.forName("sun.nio.ch.SelectorImpl", false, ClassLoader.getSystemClassLoader());
            selectorImplClass.isAssignableFrom(selector.getClass());
            Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
            Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

            selectedKeysField.setAccessible(true);
            publicSelectedKeysField.setAccessible(true);

            selectedKeysField.set(selector, selectedKeySet);
            publicSelectedKeysField.set(selector, selectedKeySet);

            selectedKeys = selectedKeySet;
            logger.trace("Instrumented an optimized java.util.Set into: {}", selector);
        } catch (Throwable t) {
            selectedKeys = null;
            logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, t);
        }

        return selector;
    }

       这里对sun.nio.ch.SelectorImpl中的selectedKeys和publicSelectedKeys做了优化,NioEventLoop中的变量selectedKeys的类型是SelectedSelectionKeySet,有哪些优化呢?(内部用两个数组存储?初始分配数组大小置为1024避免频繁扩容?当大小超过1024时,对数组进行双倍扩容?)。

       利用反射,当注册到selector中的selectionKey已准备就绪时,selectedKeys中的元素就不会为空,后面会根据selectedKeys进行分发。

 

最后分析super(parent, threadFactory, false),即父类SingleThreadEventExecutor的构造函数

 /**
     * Create a new instance
     *
     * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
     * @param threadFactory     the {@link ThreadFactory} which will be used for the used {@link Thread}
     * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
     *                          executor thread
     */
    protected SingleThreadEventExecutor(
            EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {

        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }

        this.parent = parent;
        this.addTaskWakesUp = addTaskWakesUp;

        thread = threadFactory.newThread(new Runnable() {
            @Override
            public void run() {
                boolean success = false;
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    if (state < ST_SHUTTING_DOWN) {
                        state = ST_SHUTTING_DOWN;
                    }

                    // Check if confirmShutdown() was called at the end of the loop.
                    if (success && gracefulShutdownStartTime == 0) {
                        logger.error(
                                "Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                                "before run() implementation terminates.");
                    }

                    try {
                        // Run all remaining tasks and shutdown hooks.
                        for (;;) {
                            if (confirmShutdown()) {
                                break;
                            }
                        }
                    } finally {
                        try {
                            cleanup();
                        } finally {
                            synchronized (stateLock) {
                                state = ST_TERMINATED;
                            }
                            threadLock.release();
                            if (!taskQueue.isEmpty()) {
                                logger.warn(
                                        "An event executor terminated with " +
                                        "non-empty task queue (" + taskQueue.size() + ')');
                            }

                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });

        taskQueue = newTaskQueue();
    }

    /**
     * Create a new {@link Queue} which will holds the tasks to execute. This default implementation will return a
     * {@link LinkedBlockingQueue} but if your sub-class of {@link SingleThreadEventExecutor} will not do any blocking
     * calls on the this {@link Queue} it may make sense to {@code @Override} this and return some more performant
     * implementation that does not support blocking operations at all.
     */
    protected Queue<Runnable> newTaskQueue() {
        return new LinkedBlockingQueue<Runnable>();
    }

 boss线程就在此处创建:thread = threadFactory.newThread(new Runnable()

同时也创建了线程的任务队列,是一个LinkedBlockingQueue结构。

SingleThreadEventExecutor.this.run()由子类NioEventLoop实现,后面的文章再进行分析

 

总结:

EventLoopGroup bossGroup = new NioEventLoopGroup()发生了以下事情:

      1、 为NioEventLoopGroup创建数量为:处理器个数 x 2的,类型为NioEventLoop的实例。每个NioEventLoop实例 都持有一个线程,以及一个类型为LinkedBlockingQueue的任务队列

      2、线程的执行逻辑由NioEventLoop实现

      3、每个NioEventLoop实例都持有一个selector,并对selector进行优化。

  • 大小: 22.3 KB
  • 大小: 27.6 KB
分享到:
评论
8 楼 kyorisvc2012 2018-01-18  
EventLoopGroup和EventLoop的继承关系感觉就是为了形成一个组合模式
7 楼 至尊宝_唯一 2016-10-25  
还是没有能理解selectedKeys的优化。。。
6 楼 zhouliangyc1121 2016-05-30  
受教啦 
5 楼 congqingbin 2014-08-23  
4.0.9包下载下来后怎么看源代码呢?
4 楼 Peng-Lee 2014-01-11  
写得不错,楼主
3 楼 海浪儿 2013-12-03  
谢谢
2 楼 rentaoshu 2013-12-03  
  好文,有种相见恨晚的感觉
1 楼 lxzh504 2013-08-16  
呵呵,这么多人浏览过就没有人评论?这么好的文章。顶一个。

相关推荐

    netty源码分析之服务端启动全解析

    - ServerBootstrap:用于配置和启动Netty服务端的辅助类,是Bootstrap的子类。 - Channel:是Netty网络操作的抽象,代表一个打开的连接。 - EventLoop:负责处理网络事件的线程,它与Channel绑定,并在一个循环中...

    netty源码深入分析

    《Netty源码深入分析》是由美团基础架构部的闪电侠老师所分享的一系列关于Netty源码解析的视频教程。以下将根据标题、描述、标签以及部分内容等信息,对Netty及其源码进行深入剖析。 ### Netty简介 Netty是基于...

    使用jboss netty 创建高性能webservice客户端及服务端

    最后,标签中的"源码"提示我们要关注Netty的底层实现,通过阅读和分析源码,我们可以学习到更多关于网络编程的高级技巧。而"工具"则表明Netty是一个强大的工具,可以帮助开发者快速、高效地完成网络应用的开发。对于...

    Netty框架网络编程实战-Netty_chat.zip

    - **ServerBootstrap**:服务器启动器,专门用于创建服务端的Channel。 - **ChannelHandler**:处理器,定义了事件处理逻辑,可以是处理器链。 - **ChannelPipeline**:管道,用于管理多个ChannelHandler,事件会...

    socket长连接,netty服务器与android源码

    Netty的核心是其NIO(非阻塞I/O)模型,它允许在一个线程中处理多个连接,大大提升了系统的并发能力。Netty提供了丰富的API和预定义的协议编解码器,使得网络编程变得更加简单。 在Android客户端使用Socket通信时,...

    netty源码 4.*版本

    8. **Bootstrap** 和 **ServerBootstrap**:Bootstrap 用于创建客户端连接,ServerBootstrap 用于启动服务端监听。 9. **Future** 和 **Promise**:Netty 的 Future 和 Promise 用于异步结果的处理,它们提供了丰富...

    Netty3.x 源码解析

    由于Netty源码较庞大,所以文章建议通过分析几个关键组件来构建对整个框架的理解,从而避免在庞大的源码海洋中迷失方向。通过理解Netty的关键点,包括NIO的使用、事件处理机制、ChannelPipeline的运作以及Handler的...

    高清Netty5.0架构剖析和源码解读

    NIO客户端13 3.Netty源码分析16 3.1. 服务端创建16 3.1.1. 服务端启动辅助类ServerBootstrap16 3.1.2. NioServerSocketChannel 的注册21 3.1.3. 新的客户端接入25 3.2. 客户端创建28 3.2.1. 客户端连接辅助类...

    以netty4.1源码中的EchoServer为例对netty的源码进行分析.docx

    在本文中,我们将深入分析 Netty 4.1 源码中的 EchoServer 示例,以理解其核心组件和工作原理。 首先,我们关注 EchoServer 服务端的初始化,这涉及到两个关键组件:`bossGroup` 和 `workerGroup`。它们都是 `...

    netty源码解析视频

    #### 五、Netty源码分析实战案例 1. **ChannelHandlerContext与ChannelHandlerAdaptor详解**: - 分析`ChannelHandlerContext`的生命周期及其与`ChannelHandler`之间的交互方式。 - 深入理解`...

    java Netty 框架例子源码.rar

    Java Netty 是一个高性能、异步...通过分析这些示例源码,我们可以深入理解 Netty 的工作原理,掌握如何在实际项目中构建高性能的网络应用。同时,这也将有助于我们了解如何处理异常、实现安全性以及优化网络通信性能。

    Netty源码剖析与实战配套代码.zip

    这个"Netty源码剖析与实战配套代码.zip"文件包含了对Netty框架的深入源码分析和实战应用示例。下面将详细探讨Netty的核心概念和关键功能,以及它如何在实际项目中发挥作用。 1. **Netty概述** - Netty是由JBOSS...

    javanetty源码-java_study-:netty+java(源代码参考)

    通过阅读和分析Netty源码,开发者能够更好地理解其内部工作原理,优化网络应用,以及定制适合自己项目的特性和功能。对于Java开发者来说,深入学习Netty源码不仅可以提升网络编程技能,也有助于解决实际问题,比如...

    netty5.0架构剖析和源码解读

    ***ty源码分析 Netty源码的分析主要围绕服务端和客户端的创建、读写操作等方面进行。服务端的启动涉及到ServerBootstrap类,以及NioServerSocketChannel的注册,新的客户端接入,客户端连接的建立通过Bootstrap类...

    netty权威指南例子源码(第2-7章)

    - Bootstrap是客户端启动器,ServerBootstrap是服务端启动器,它们都是Netty应用的入口,用于配置并启动I/O线程、管道工厂等。 3. **Channel与ChannelHandler**: - Channel是I/O操作的载体,可以进行读写操作。 ...

    netty视频详解(90节大长篇)

    - **源码分析**: - **初始化过程**:分析如何创建Channel、EventLoop等组件,并配置ChannelPipeline。 - **事件循环机制**:深入了解EventLoop的工作原理,包括如何调度任务、处理I/O事件等。 - **编解码器设计*...

Global site tag (gtag.js) - Google Analytics