ThreadPoolExecutor 实现了 ExecutorService 接口,从常用的方法来看一下 ThreadPoolExecutor 内部实现的大致流程.
Future submit(Callable task)
这个方法的实现在父类 AbstractExecutorService 里面:
1. 用 Callable task去创建 FutureTask 实例。FutureTask 是 RunnableFuture 接口的实现类,即实现了 Runnable 接口,也实现了 Future 接口。
2. 用 FutureTask 实例调用子类 execute(Runnable command)。
3. 把 FutureTask 实例返回
Future submit(Runnable task, T result)
这个方法的实现也在父类 AbstractExecutorService 里面:
1. 用 Runnable task 和 result 去创建 FutureTask 实例(成功执行 task 后,返回 result)。
2. 用 FutureTask 实例调用子类 execute(Runnable command)。
3. 把 FutureTask 实例返回
void execute(Runnable command)
1. 获取当前线程数量。
2. 如果当前线程数量少于 corePoolSize,调用 addWorker(command, true)去增加新的核心线程。
3. 如果添加核心线程失败(因为并发原因不能再添加核心线程),或当前线程数量大于等于 corePoolSize,把 command 放进 workQueue。
4. 再次检查线程池状态,如果被关闭,从 workQueue 移除 command ,调用 handler.rejectedExecution(command)。如果第2次检查查线程池没有被关闭,并且没有 worker 在工作,则调用 addWorker(null, false) 新增线程。
5. 如果 command 无法放进 workQueue,调用 addWorker(command, false) 增加新的 线程。
6. 新增线程失败(失败可能是因为线程池被关闭或者线程数量已经达到 maximumPoolSize ),则调用 handler.rejectedExecution(command) 。
(源代码往 workQueue 增加task调用的是 offer 方法,也就是说 workQueue 如果已经满了,当前线程不会被blocking。在默认 AbortPolicy 策略下,抛出 RejectedExecutionException。 但是有的时候,我们并不希望得到一个 RejectedExecutionException,而是希望阻塞当前以及后续的线程提交新的 task 。可以实现 customized RejectedExecutionHandler,调用workQueue的put方法以达到这个目的。)
从上面的流程可以看出,只有在 workQueue 满了以后,才会增加非核心的线程。如果 workQueue 的 size 很大,那么很可能一直工作的只有 corePoolSize 数量的线程。
boolean addWorker(Runnable firstTask, boolean core)
1. 获取当前的线程数量,线程池状态
2. 如果状态显示线程池已经被关闭,或者正在被关闭,workQueue 已经空了,则直接返回false(创建新的线程失败)
3. 如果当前是添加核心线程,线程池数量大于等于 corePoolSize,返回false。
4. 如果当前是添加非核心线程,线程池数量大于等于 maximumPoolSize,则直接返回false
5. 使用 CAS(compareAndSet) 去增加当前的线程数量+1,CAS失败则从step 3开始重试。成功则再次获得线程池状态,如果线程池状态已经改变,则从step 1开始重试
6. 使用 firstTask创建一个worker,这个过程会调用 ThreadFactory 去创建新的 Worker 线程
7. 设置 workerStarted = false
8. 获取线程池的 mainLock,再次检查线程池状态,如果还是 RUNNING,或者是 SHUTDOWN 但是 firstTask 是空的(处于正在关闭中,需要新的线程把已有任务处理完毕),则继续
9. 把新的 worker 保存在 workers,更新 largestPoolSize,释放mainLock
10. 启动新线程,设置workerStarted = true。(如果线程池状态已经在step 8改变,或者启动新线程报错(OOM),workerStarted还是false) 新的线程会调用 Worker.run(),代理调用到 runWorker() 方法。
11. 再次检查 workerStarted,如果是 false,则 call addWorkerFailed(worker)去移除 worker,减少当前的线程数量
12. 返回当前 workerStarted
从这里可以看到,对于当前的线程数量增加,主要还是通过 CAS 操作去更新。但是对于 workers,largestPoolSize 的维护,则是通过mainLock完成的。由于线程池状态的更新没有使用 mainLock,即便在获取 mainLock 的情况下,也需要反复 check。另外一点要指出的是,如果 step 4 CAS 操作成功的话,当前的线程数量+1。在step 8获取 mainLock 后,由于线程池状态的改变也是可能失败的,则在后面 addWorkerFailed(worker) 中会-1。
void addWorkerFailed(Worker w)
1. 获取线程池的mainLock
2. CAS操作减去线程数量-1
3. call tryTerminate() 尝试关闭线程池 ( tryTerminate()的流程请看后续)
4. 释放mainLock
class Worker
Worker 类是 ThreadPoolExecutor 内部私有类,继承于 AbstractQueuedSynchronizer,并且实现了 Runnable 接口。在前面 ThreadPoolExecutor.addWorker(Runnable firstTask, boolean core) 中step 10,work 的线程被启动,将调用 Worker.run()方法执行task。在Worker.run()里面,代理调用ThreadPoolExecutor.runWorker(Worker w)。所以 Worker 主要还是实现了 AbstractQueuedSynchronizer 的 isHeldExclusively(), tryAcquire(int i), tryRelease(int i)方法,并提供了 lock() , isLocked() , tryLock() , unlock() 给 ThreadPoolExecutor 调用(Adapter模式),以及保持了 volatile long completedTasks 属性为统计使用
void runWorker(Worker w)
1. 从 Worker 里获取 firstTask(第一次)
2. 或者调用 getTask() 从 workQueue 里面获取任务
3. 调用 w.lock() 获取锁
4. 检查线程池状态,如果是 STOP,TIDYING,TERMINATED,则直接 call Thread.interrupt() worker 线程
5. 调用 task.run() 执行 task 。final阶段会把调用 completedTasks++ 统计完成任务数量。这里保留了 beforeExecute(wt, task) 和 afterExecute(task, thrown) 2个方法没有实现(模板模式),以便子类扩展
6. 2-5重复执行,如果 step 2 从 workQueue 获取null,则跳出循环,completedAbruptly = false。如果执行task过程中遇到任何Error或者Exception, completedAbruptly = true
7. 调用 processWorkerExit(w, completedAbruptly);
Runnable getTask()
1. 获取当前的线程池状态,如果是STOP,TIDYING或TERMINATED,调用decrementWorkerCount()减去worker数量,返回null
2. 获取当前works数量
3. 如果数量已经大于 maximumPoolSize,并且数量大于1或者 workQueue 已经空了,调用 compareAndDecrementWorkerCount(c) CAS操作减少works数量1,返回null
4. 如果线程池允许核心线程超时并且上次poll已经超时,并且数量大于1或者 workQueue 已经空了,调用 compareAndDecrementWorkerCount(c) CAS 操作减少 works 数量1,返回null
5. CAS操作失败则重新从step 1开始
6. 如果线程池允许核心线程超时,或者当前是非核心线程,调用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 从 workQueue 获取任务。反之则调用 workQueue.take() 获取任务。获得任务后直接返回,超时则标记timedOut = true,重新从step 1开始
7. 期间如果线程被打断(一般是被调用shutdown()、shutdownNow()),则标记timedOut = false,重新从step 1开始
当线程池idle的时候,核心线程调用 workQueue.take() 在这里被阻塞。非核心线程则调用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),超时后线程退出。
void processWorkerExit(Worker w, boolean completedAbruptly)
1. 如果是异常退出 completedAbruptly = true,调用 decrementWorkerCount() 减去 worker 数量
2. 获取 mainLock,把 worker 完成任务数量统计到线程池维护的计数器里,completedTaskCount += w.completedTasks
3. 从 workers 删除 worker,释放 mainLock
4. 调用 tryTerminate() 尝试关闭线程池
5. 如果当前的线程池状态是 RUNNING 或者 SHUTDOWN,继续
6. 如果 worker 是正常退出,并且线程池是允许核心线程超时的或者 worker 数量已经大于等于 corePoolSize,直接返回。否则调用 addWorker(null, false) 创建一个新的线程
Java ThreadPoolExecutor 学习笔记(一)
Java ThreadPoolExecutor 学习笔记(三)
分享到:
相关推荐
Java专题学习笔记主要涵盖了Java语言的核心概念、进阶特性以及实际应用中的问题解析。这份笔记是结合了讲师的讲解和个人的整理,旨在为热爱Java编程的朋友们提供丰富的学习资源。以下将详细介绍其中可能包含的知识点...
【良葛格Java学习笔记】 本笔记主要涵盖了Java编程语言的核心概念和技术,旨在帮助初学者以及有一定基础的开发者深入理解并掌握Java。Java作为一种广泛应用于企业级应用开发、移动开发(尤其是Android)以及大数据...
Java学习笔记源码MD.rar是一个压缩包,其中包含了一系列关于Java编程语言的学习资料,特别是针对JVM(Java虚拟机),Spring框架以及Java多线程等内容的深入探讨。这些主题是Java开发中的核心部分,对于任何想要提升...
Java面试题及学习笔记概述 Java作为一种广泛应用的编程语言,其面试题库广泛且深入,涵盖了从基础语法到高级特性的各个层面。本篇将基于常见的Java面试问题,结合学习笔记,深入探讨Java的核心概念和技术。 一、...
这篇学习笔记将深入探讨Java多线程的核心概念、实现方式以及相关工具的使用。 一、多线程基础 1. 线程与进程:在操作系统中,进程是资源分配的基本单位,而线程是程序执行的基本单位。每个进程至少有一个主线程,...
Java线程学习笔记涉及了Java多线程编程的多个关键知识点,本篇知识点整理将详细解释每个概念及其在Java中的实现方式。 基本知识部分包含了Java线程编程的基础内容,它们是并发编程的基石。 任务Runnable是一个接口...
Java公司培训经典学习笔记是针对Java编程语言进行深入学习的一份宝贵资料,涵盖了从基础到高级的诸多知识点,旨在帮助开发者提升技能,适应企业级项目开发的需求。以下将详细阐述这些笔记中的关键点: 1. **Java...
Java JDK1.6学习笔记是一份详实的资料,涵盖了Java开发工具集(JDK)1.6版本的核心概念和技术。这份笔记对于Java初学者以及有一定经验的开发者来说都是一份宝贵的参考资料,它可能包含了从基础语法到高级特性的讲解...
这篇“java学习笔记”可能包含了从基础到进阶的各种Java编程概念和技术。以下是对这些笔记可能涵盖的一些关键知识点的详细说明: 1. **Java基础知识**: - **语法**:包括变量声明、数据类型(如整型、浮点型、...
在这个阶段的学习中,尚硅谷提供了2023年的学习笔记和面试题,帮助你提升Java技能并为求职做好准备。 首先,多线程是Java的一个关键特性,允许程序同时执行多个任务。理解线程的创建(通过Thread类或实现Runnable...
本篇学习笔记将深入解析Java线程池的框架、结构、原理以及相关源码,帮助读者全面理解线程池的工作机制。 1. 线程池模块结构 线程池框架分为多层结构,其中包括核心实现类、辅助类和接口等组件。例如,`sun.nio.ch....
Java的ExecutorService接口提供了线程池服务,其中包括ThreadPoolExecutor和ScheduledThreadPoolExecutor等实现。 7. **Volatile**: Volatile关键字保证了变量的可见性,但不保证原子性。当多个线程访问volatile...
这篇“Java线程编程学习笔记(二)”很可能是对Java并发编程深入探讨的一部分,特别是涉及多线程示例的实践应用。我们将从标题、描述以及标签来推测可能涵盖的知识点,并结合"Multi-Threads Demo"这一压缩包文件名来...
这份学习笔记旨在帮助读者深入理解和掌握Java编程技术,以下是笔记中的主要知识点: 1. **Java基础** - **变量与数据类型**:Java有基本数据类型(如int、char、boolean等)和引用数据类型(类、接口、数组)。...
以上只是Java学习笔记中可能涉及的部分知识点,实际上,完整的笔记还会包含更深入的Java特性,如注解、模块系统、JDBC数据库访问、Spring框架等内容,以及实际项目开发中的最佳实践。这份笔记是初级Java程序员系统...
张龙老师的JAVA圣思园笔记深入浅出地涵盖了JAVA-SE的基础知识点,帮助学习者构建扎实的Java编程基础。 1. **IO(输入/输出)**: Java IO是处理数据输入和输出的重要模块,包括文件操作、流处理等。笔记中可能讲解...
Java学习笔记基础与框架 Java是一种广泛使用的面向对象的编程语言,它的设计目标是具有简单性、面向对象、健壮性、安全性、可移植性等特点。这份“java学习笔记基础和框架”涵盖了从Java的基础概念到高级特性的全...
这份“达内的Java学习笔记”涵盖了从基础到高级的Java知识体系,旨在帮助初学者和有经验的开发者深入理解Java的核心概念和技术。 1. **Java基础** - **语法结构**:了解Java的基本语法,包括数据类型(如整型、...
这份"Java demo 算法笔记"集合了Java开发中的多种关键知识点,包括但不限于基础语法、框架源码解析、算法实现以及并发处理等内容,对于学习和提升Java编程技能具有极大的帮助。 首先,我们来探讨Java的基础部分。...
### Java并发编程学习笔记知识点详解 #### 一、Java并发编程概述 Java并发编程是指在Java应用程序中同时执行多个操作的技术。它通过多线程、线程池等机制实现资源的有效利用,提高程序运行效率。Java并发编程的...