`
yychao
  • 浏览: 98401 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

ThreadPoolExecutor分析

 
阅读更多

ThreadPoolExecutor 状态:

 

RUNNING:接受新任务,并处理队列的任务

SHUTDOWN:不接受新任务,但是处理等待队列的任务

STOP:不接受新任务,不处理等待队列的任务,并且中断正在执行的任务

TERMINATED:和STOP相同,并附加所有线程已终结

 

状态转换:

 

RUNNING --> SHUTDOWN :  调用shutdown(),或者再finalize()方法中隐式调用shutdown();

 

RUNNING/SHUTDOWN --> STOP : 调用shutdownNow()

 

SHUTDOWN --> TERMINATED : 队列和线程池都为空

 

STOP --> TERMINATED : 线程池为空

 

ThreadPoolExecutor任务执行步骤:

 


 

ThreadPoolExecutor 关闭操作:

 

shutdown(): 修改runStatus=SHUTDOWN,并中断空闲线程(等待getTask返回)并退出;若没有任何线程或任务,则调用tryTerminate尝试关闭线程池;否则,最后一个worker线程关系线程池

 

步骤:

 

  1. SecurityManager检查是否有关闭线程权限
  2. 如果通过第一步,检测SecurityManager对每个线程是否特殊对待,通过检查后,修改runStatus=SHUTDOWN
  3. 如果1、2都通过但interrupt()线程时抛出SecurityException,此时,需要恢复runStatus状态;一些线程再non-shutdown状态被关闭,此时可能通过tryTerminate启动一个worker线程

PS:shutdown通过interruptIfIdle()终止空闲线程

 

shutdownNow():

 

与shutdown()不同点:

 

  1. runStatus设置为STOP
  2. 所有的工作线程被中断,而非空闲线程
  3. drain等待队列,并返回任务列表

Worker线程执行:

 

        public void run() {
            try {
                Runnable task = firstTask;
                firstTask = null;
                while (task != null || (task = getTask()) != null) {
                    runTask(task);
                    task = null;
                }
            } finally {
                workerDone(this);
            }
        }
    }

worker线程执行步骤:

 

  1. worker执行第一个提交任务,or从任务队列中获取任务(取得任务or等待任务返回)
  2. 执行任务,执行完毕后;重新获取任务
  3. 从getTask()返回null,线程终结;执行workerDone清理线程

任务获取:

 

    Runnable getTask() {
        for (;;) {
            try {
                int state = runState;
                if (state > SHUTDOWN)
                    return null;
                Runnable r;
                if (state == SHUTDOWN)  // Help drain queue
                    r = workQueue.poll();
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                else
                    r = workQueue.take();
                if (r != null)
                    return r;
                if (workerCanExit()) {
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();
                    return null;
                }
                // Else retry
            } catch (InterruptedException ie) {
                // On interruption, re-check runState
            }
        }
    }
 

任务获取步骤:

 

  1. runStatus > SHUTDOWN时,返回null;终止线程
  2. runStatus == SHUTDOWN,workQueue.poll()返回任务队列任务
  3. poolSize > corePoolSize || allowCoreThreadTimeOut时,workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),等待任务keepAliveTime时间
  4. 否则,等待任务队列workQueue.take()

如果上述2、3、4返回结果不为null,则返回任务;

否则判断线程时候可以退出:

 

    private boolean workerCanExit() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        boolean canExit;
        try {
            canExit = runState >= STOP ||
                workQueue.isEmpty() ||
                (allowCoreThreadTimeOut &&
                 poolSize > Math.max(1, corePoolSize));
        } finally {
            mainLock.unlock();
        }
        return canExit;
    }

 任务获取退出:

 

  • runState >=STOP 或者 workQueue.isEmpty() 或者(allowCoreThreadTimeOut &&poolSize > Math.max(1, corePoolSize))
  • 当workerCanExit()为true 且runState >= SHUTDOWN时,唤醒空闲线程interruptIdleWorkers
  • 最后返回null 工作线程退出

线程执行任务:

 

 

        private void runTask(Runnable task) {
            final ReentrantLock runLock = this.runLock;
            runLock.lock();
            try {
                /*
                 * Ensure that unless pool is stopping, this thread
                 * does not have its interrupt set. This requires a
                 * double-check of state in case the interrupt was
                 * cleared concurrently with a shutdownNow -- if so,
                 * the interrupt is re-enabled.
                 */
                if (runState < STOP &&
                    Thread.interrupted() &&
                    runState >= STOP)
                    thread.interrupt();
                /*
                 * Track execution state to ensure that afterExecute
                 * is called only if task completed or threw
                 * exception. Otherwise, the caught runtime exception
                 * will have been thrown by afterExecute itself, in
                 * which case we don't want to call it again.
                 */
                boolean ran = false;
                beforeExecute(thread, task);
                try {
                    task.run();
                    ran = true;
                    afterExecute(task, null);
                    ++completedTasks;
                } catch (RuntimeException ex) {
                    if (!ran)
                        afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                runLock.unlock();
            }
        }
 

任务执行之前、之后回调方法:beforeExecute、afterExecute,方便用户扩展

 

线程退出清理工作:

 

 

    void workerDone(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
            if (--poolSize == 0)
                tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

 线程退出处理步骤:

 

  1. 收集已完成任务数
  2. 线程缓存队列清理工作线程
  3. poolSize ==0时,tryTerminate()尝试关闭线程池
线程池终止操作:
    private void tryTerminate() {
        if (poolSize == 0) {
            int state = runState;
            if (state < STOP && !workQueue.isEmpty()) {
                state = RUNNING; // disable termination check below
                Thread t = addThread(null);
                if (t != null)
                    t.start();
            }
            if (state == STOP || state == SHUTDOWN) {
                runState = TERMINATED;
                termination.signalAll();
                terminated();
            }
        }
    }
 线程池终止操作 在三个点调用:
  1. 最后一个线程退出的workerDone()中调用
  2. 直接调用shutdown(),且没有存活线程时
  3. 或者直接调用shutdownNow(),且没有存活线程时
线程池终止操作步骤:

  1. state < STOP && !workQueue.isEmpty(),设置state=RUNNING(关闭线程池终止操作),并新增一个活动线程,保证任务队列处理
  2. state == STOP || state == SHUTDOWN时,转换runState=TERMINATED;并回调terminated()方法

terminated()方法作为回调接口,可以扩展此接口,再线程池终止时,扩展操作

 

 

分享到:
评论

相关推荐

    12、线程池ThreadPoolExecutor实战及其原理分析(下)

    线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor...

    ThreadPoolExecutor源码解析.md

    ThreadPoolExecutor源码解析.md

    JDK之ThreadPoolExecutor源码分析1

    《JDK之ThreadPoolExecutor源码分析1》 在Java编程中,线程池是一种高效的管理线程的方式,它通过复用已存在的线程来避免频繁创建和销毁线程带来的开销。ThreadPoolExecutor作为Java中的线程池实现,其内部机制相当...

    线程池ThreadPoolExecutor底层原理源码分析

    线程池ThreadPoolExecutor底层原理源码分析

    线程池ThreadPoolExecutor原理源码分析.md

    ### 线程池 `ThreadPoolExecutor` 原理源码分析 #### 一、概述 线程池作为 Java 并发编程中的重要组件,在实际应用中被广泛使用。其核心类 `ThreadPoolExecutor` 实现了对线程的管理、调度等功能。本文将围绕 `...

    11、线程池ThreadPoolExecutor实战及其原理分析(上)

    线程池ThreadPoolExecutor实战及其原理分析(上)

    Python线程池模块ThreadPoolExecutor用法分析

    Python线程池模块`ThreadPoolExecutor`是Python标准库`concurrent.futures`的一部分,它提供了一种方便且高效的方式来管理线程,以便并发地执行多个任务。线程池的概念允许我们预先创建一组线程,然后根据需要分配...

    java 中ThreadPoolExecutor原理分析

    "java 中ThreadPoolExecutor 原理分析" ThreadPoolExecutor 是 Java 并发编程中的一种高级线程池实现,它提供了一个灵活的线程池管理机制,允许开发者根据需要配置线程池的参数以满足不同的需求。在这篇文章中,...

    11-线程池ThreadPoolExecutor底层原理源码分析(上)-周瑜.pdf

    根据提供的文件信息,我们可以深入探讨线程池`ThreadPoolExecutor`的工作原理及其实现细节,同时也会涉及并发编程中的一些关键概念和技术。 ### 线程池`ThreadPoolExecutor`概述 `ThreadPoolExecutor`是Java中非常...

    12-线程池ThreadPoolExecutor底层原理源码分析(下)-周瑜.pdf

    ### 二、线程池的五种状态变换源码分析 线程池状态的变化主要由以下几个状态表示: - **RUNNING**:线程池接受新的任务并将队列中的任务提交给worker线程。 - **SHUTDOWN**:不再接受新任务,但继续处理队列中的...

    使用线程池ThreadPoolExecutor 抓取论坛帖子列表

    本文将详细讲解如何使用Java中的`ThreadPoolExecutor`来抓取论坛帖子列表,结合源码分析和实用工具的应用。 首先,我们要了解线程池的基本原理。线程池是由一组预先创建的线程组成的,这些线程可以复用,而不是每次...

    framework-analysis:对使用Hibernate出现的问题进行分析;Spring、MyBatis、AQS、ThreadPoolExecutor、CountDownLatch核心源码分析

    于是乎到现在的Hibernate、MyBatis、Spring、Spring MVC、AQS、ThreadPoolExecutor、CountDownLatch使用场景和核心源码分析。 感觉自己还是真的菜鸡,有太多框架的底层实现都不怎么了解。 当山头被一座一座攻克时,...

    Java进阶之ThreadPoolExecutor

    Java线程池使用无外乎如下几种:  · 使用自定义ThreadPoolExecutor  · 使用Executors....我们来分析下ThreadPoolExecutor的构造参数以及内部实现。  构造参数  ThreadPoolExecutor完整的构造方法如下(其

    线程池源码分析

    线程池ThreadPoolExecutor的源码分析,含中文注释,深入了解线程池的构造

    designpattern.zip

    11-线程池 ThreadPoolExecutor 底层原理源码分析(上)-周瑜.pdf 12-线程池 ThreadPoolExecutor底层原理源码分析(下)-周瑜.pdf 13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、...

    forkjoin.zip

    11-线程池 ThreadPoolExecutor 底层原理源码分析(上)-周瑜.pdf 12-线程池 ThreadPoolExecutor底层原理源码分析(下)-周瑜.pdf 13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、...

    disruptor.zip

    11-线程池 ThreadPoolExecutor 底层原理源码分析(上)-周瑜.pdf 12-线程池 ThreadPoolExecutor底层原理源码分析(下)-周瑜.pdf 13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、...

    jmm(1).zip

    11-线程池 ThreadPoolExecutor 底层原理源码分析(上)-周瑜.pdf 12-线程池 ThreadPoolExecutor底层原理源码分析(下)-周瑜.pdf 13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、...

    JUC并发编程与源码分析视频课.zip

    4. **Executor框架**:介绍ExecutorService、ThreadPoolExecutor和ScheduledExecutorService等,理解任务调度和线程池的管理,学习如何合理配置线程池参数以优化系统性能。 5. **并发工具类**:包括CountDownLatch...

Global site tag (gtag.js) - Google Analytics