`
Vampiredx
  • 浏览: 3632 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

Java ThreadPoolExecutor 学习笔记(二)

阅读更多
ThreadPoolExecutor 实现了 ExecutorService 接口,从常用的方法来看一下 ThreadPoolExecutor 内部实现的大致流程.

Future submit(Callable task)
这个方法的实现在父类 AbstractExecutorService 里面:
1. 用 Callable task去创建 FutureTask 实例。FutureTask 是 RunnableFuture 接口的实现类,即实现了 Runnable 接口,也实现了 Future 接口。
2. 用 FutureTask 实例调用子类 execute(Runnable command)。
3. 把 FutureTask 实例返回

Future submit(Runnable task, T result)
这个方法的实现也在父类 AbstractExecutorService 里面:
1. 用 Runnable task 和 result 去创建 FutureTask 实例(成功执行 task 后,返回 result)。
2. 用 FutureTask 实例调用子类 execute(Runnable command)。
3. 把 FutureTask 实例返回

void execute(Runnable command)
1. 获取当前线程数量。
2. 如果当前线程数量少于 corePoolSize,调用 addWorker(command, true)去增加新的核心线程。
3. 如果添加核心线程失败(因为并发原因不能再添加核心线程),或当前线程数量大于等于 corePoolSize,把 command 放进 workQueue。
4. 再次检查线程池状态,如果被关闭,从 workQueue 移除 command ,调用 handler.rejectedExecution(command)。如果第2次检查查线程池没有被关闭,并且没有 worker 在工作,则调用 addWorker(null, false) 新增线程。
5. 如果 command 无法放进 workQueue,调用 addWorker(command, false) 增加新的 线程。
6. 新增线程失败(失败可能是因为线程池被关闭或者线程数量已经达到 maximumPoolSize ),则调用 handler.rejectedExecution(command) 。(源代码往 workQueue 增加task调用的是 offer 方法,也就是说 workQueue 如果已经满了,当前线程不会被blocking。在默认 AbortPolicy 策略下,抛出 RejectedExecutionException。 但是有的时候,我们并不希望得到一个 RejectedExecutionException,而是希望阻塞当前以及后续的线程提交新的 task 。可以实现 customized RejectedExecutionHandler,调用workQueue的put方法以达到这个目的。)

从上面的流程可以看出,只有在 workQueue 满了以后,才会增加非核心的线程。如果 workQueue 的 size 很大,那么很可能一直工作的只有 corePoolSize 数量的线程。

boolean addWorker(Runnable firstTask, boolean core)
1. 获取当前的线程数量,线程池状态
2. 如果状态显示线程池已经被关闭,或者正在被关闭,workQueue 已经空了,则直接返回false(创建新的线程失败)
3. 如果当前是添加核心线程,线程池数量大于等于 corePoolSize,返回false。
4. 如果当前是添加非核心线程,线程池数量大于等于 maximumPoolSize,则直接返回false
5. 使用 CAS(compareAndSet) 去增加当前的线程数量+1,CAS失败则从step 3开始重试。成功则再次获得线程池状态,如果线程池状态已经改变,则从step 1开始重试
6. 使用 firstTask创建一个worker,这个过程会调用 ThreadFactory 去创建新的 Worker 线程
7. 设置 workerStarted = false
8. 获取线程池的 mainLock,再次检查线程池状态,如果还是 RUNNING,或者是 SHUTDOWN 但是 firstTask 是空的(处于正在关闭中,需要新的线程把已有任务处理完毕),则继续
9. 把新的 worker 保存在 workers,更新 largestPoolSize,释放mainLock
10. 启动新线程,设置workerStarted = true。(如果线程池状态已经在step 8改变,或者启动新线程报错(OOM),workerStarted还是false) 新的线程会调用 Worker.run(),代理调用到 runWorker() 方法。
11. 再次检查 workerStarted,如果是 false,则 call addWorkerFailed(worker)去移除 worker,减少当前的线程数量
12. 返回当前 workerStarted

从这里可以看到,对于当前的线程数量增加,主要还是通过 CAS 操作去更新。但是对于 workers,largestPoolSize 的维护,则是通过mainLock完成的。由于线程池状态的更新没有使用 mainLock,即便在获取 mainLock 的情况下,也需要反复 check。另外一点要指出的是,如果 step 4 CAS 操作成功的话,当前的线程数量+1。在step 8获取 mainLock 后,由于线程池状态的改变也是可能失败的,则在后面 addWorkerFailed(worker) 中会-1。

void addWorkerFailed(Worker w)
1. 获取线程池的mainLock
2. CAS操作减去线程数量-1
3. call tryTerminate() 尝试关闭线程池 ( tryTerminate()的流程请看后续)
4. 释放mainLock



class Worker
Worker 类是 ThreadPoolExecutor 内部私有类,继承于 AbstractQueuedSynchronizer,并且实现了 Runnable 接口。在前面 ThreadPoolExecutor.addWorker(Runnable firstTask, boolean core) 中step 10,work 的线程被启动,将调用 Worker.run()方法执行task。在Worker.run()里面,代理调用ThreadPoolExecutor.runWorker(Worker w)。所以 Worker 主要还是实现了 AbstractQueuedSynchronizer 的 isHeldExclusively(), tryAcquire(int i), tryRelease(int i)方法,并提供了 lock() , isLocked() , tryLock() , unlock() 给 ThreadPoolExecutor 调用(Adapter模式),以及保持了 volatile long completedTasks 属性为统计使用

void runWorker(Worker w)
1. 从 Worker 里获取 firstTask(第一次)
2. 或者调用 getTask() 从 workQueue 里面获取任务
3. 调用 w.lock() 获取锁
4. 检查线程池状态,如果是 STOP,TIDYING,TERMINATED,则直接 call Thread.interrupt() worker 线程
5. 调用 task.run() 执行 task 。final阶段会把调用 completedTasks++ 统计完成任务数量。这里保留了 beforeExecute(wt, task) 和 afterExecute(task, thrown) 2个方法没有实现(模板模式),以便子类扩展
6. 2-5重复执行,如果 step 2 从 workQueue 获取null,则跳出循环,completedAbruptly = false。如果执行task过程中遇到任何Error或者Exception, completedAbruptly = true
7. 调用 processWorkerExit(w, completedAbruptly);

Runnable getTask()
1. 获取当前的线程池状态,如果是STOP,TIDYING或TERMINATED,调用decrementWorkerCount()减去worker数量,返回null
2. 获取当前works数量
3. 如果数量已经大于 maximumPoolSize,并且数量大于1或者 workQueue 已经空了,调用 compareAndDecrementWorkerCount(c) CAS操作减少works数量1,返回null
4. 如果线程池允许核心线程超时并且上次poll已经超时,并且数量大于1或者 workQueue 已经空了,调用 compareAndDecrementWorkerCount(c) CAS 操作减少 works 数量1,返回null
5. CAS操作失败则重新从step 1开始
6. 如果线程池允许核心线程超时,或者当前是非核心线程,调用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 从 workQueue 获取任务。反之则调用 workQueue.take() 获取任务。获得任务后直接返回,超时则标记timedOut = true,重新从step 1开始
7. 期间如果线程被打断(一般是被调用shutdown()、shutdownNow()),则标记timedOut = false,重新从step 1开始

当线程池idle的时候,核心线程调用 workQueue.take() 在这里被阻塞。非核心线程则调用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),超时后线程退出。


void processWorkerExit(Worker w, boolean completedAbruptly)
1. 如果是异常退出 completedAbruptly = true,调用 decrementWorkerCount() 减去 worker 数量
2. 获取 mainLock,把 worker 完成任务数量统计到线程池维护的计数器里,completedTaskCount += w.completedTasks
3. 从 workers 删除 worker,释放 mainLock
4. 调用 tryTerminate() 尝试关闭线程池
5. 如果当前的线程池状态是 RUNNING 或者 SHUTDOWN,继续
6. 如果 worker 是正常退出,并且线程池是允许核心线程超时的或者 worker 数量已经大于等于 corePoolSize,直接返回。否则调用 addWorker(null, false) 创建一个新的线程

Java ThreadPoolExecutor 学习笔记(一)
Java ThreadPoolExecutor 学习笔记(三)

分享到:
评论

相关推荐

    Java专题学习笔记

    Java专题学习笔记主要涵盖了Java语言的核心概念、进阶特性以及实际应用中的问题解析。这份笔记是结合了讲师的讲解和个人的整理,旨在为热爱Java编程的朋友们提供丰富的学习资源。以下将详细介绍其中可能包含的知识点...

    良葛格Java学习笔记

    【良葛格Java学习笔记】 本笔记主要涵盖了Java编程语言的核心概念和技术,旨在帮助初学者以及有一定基础的开发者深入理解并掌握Java。Java作为一种广泛应用于企业级应用开发、移动开发(尤其是Android)以及大数据...

    java学习笔记源码MD.rar

    Java学习笔记源码MD.rar是一个压缩包,其中包含了一系列关于Java编程语言的学习资料,特别是针对JVM(Java虚拟机),Spring框架以及Java多线程等内容的深入探讨。这些主题是Java开发中的核心部分,对于任何想要提升...

    java面试题 学习笔记

    Java面试题及学习笔记概述 Java作为一种广泛应用的编程语言,其面试题库广泛且深入,涵盖了从基础语法到高级特性的各个层面。本篇将基于常见的Java面试问题,结合学习笔记,深入探讨Java的核心概念和技术。 一、...

    JAVA 多线程学习笔记

    这篇学习笔记将深入探讨Java多线程的核心概念、实现方式以及相关工具的使用。 一、多线程基础 1. 线程与进程:在操作系统中,进程是资源分配的基本单位,而线程是程序执行的基本单位。每个进程至少有一个主线程,...

    java线程学习笔记

    Java线程学习笔记涉及了Java多线程编程的多个关键知识点,本篇知识点整理将详细解释每个概念及其在Java中的实现方式。 基本知识部分包含了Java线程编程的基础内容,它们是并发编程的基石。 任务Runnable是一个接口...

    Java公司培训经典学习笔记

    Java公司培训经典学习笔记是针对Java编程语言进行深入学习的一份宝贵资料,涵盖了从基础到高级的诸多知识点,旨在帮助开发者提升技能,适应企业级项目开发的需求。以下将详细阐述这些笔记中的关键点: 1. **Java...

    Java JDK1.6学习笔记

    Java JDK1.6学习笔记是一份详实的资料,涵盖了Java开发工具集(JDK)1.6版本的核心概念和技术。这份笔记对于Java初学者以及有一定经验的开发者来说都是一份宝贵的参考资料,它可能包含了从基础语法到高级特性的讲解...

    java学习笔记,好好学习

    这篇“java学习笔记”可能包含了从基础到进阶的各种Java编程概念和技术。以下是对这些笔记可能涵盖的一些关键知识点的详细说明: 1. **Java基础知识**: - **语法**:包括变量声明、数据类型(如整型、浮点型、...

    Java 基础 第3阶段:高级应用-尚硅谷学习笔记(含面试题) 2023年

    在这个阶段的学习中,尚硅谷提供了2023年的学习笔记和面试题,帮助你提升Java技能并为求职做好准备。 首先,多线程是Java的一个关键特性,允许程序同时执行多个任务。理解线程的创建(通过Thread类或实现Runnable...

    JAVA课程学习笔记.doc

    本篇学习笔记将深入解析Java线程池的框架、结构、原理以及相关源码,帮助读者全面理解线程池的工作机制。 1. 线程池模块结构 线程池框架分为多层结构,其中包括核心实现类、辅助类和接口等组件。例如,`sun.nio.ch....

    Java并发编程学习笔记

    Java的ExecutorService接口提供了线程池服务,其中包括ThreadPoolExecutor和ScheduledThreadPoolExecutor等实现。 7. **Volatile**: Volatile关键字保证了变量的可见性,但不保证原子性。当多个线程访问volatile...

    Java线程编程学习笔记(二)

    这篇“Java线程编程学习笔记(二)”很可能是对Java并发编程深入探讨的一部分,特别是涉及多线程示例的实践应用。我们将从标题、描述以及标签来推测可能涵盖的知识点,并结合"Multi-Threads Demo"这一压缩包文件名来...

    《Core Java》学习笔记

    这份学习笔记旨在帮助读者深入理解和掌握Java编程技术,以下是笔记中的主要知识点: 1. **Java基础** - **变量与数据类型**:Java有基本数据类型(如int、char、boolean等)和引用数据类型(类、接口、数组)。...

    Java学习超强笔记

    以上只是Java学习笔记中可能涉及的部分知识点,实际上,完整的笔记还会包含更深入的Java特性,如注解、模块系统、JDBC数据库访问、Spring框架等内容,以及实际项目开发中的最佳实践。这份笔记是初级Java程序员系统...

    张龙JAVA圣思园笔记

    张龙老师的JAVA圣思园笔记深入浅出地涵盖了JAVA-SE的基础知识点,帮助学习者构建扎实的Java编程基础。 1. **IO(输入/输出)**: Java IO是处理数据输入和输出的重要模块,包括文件操作、流处理等。笔记中可能讲解...

    java学习笔记基础和框架

    Java学习笔记基础与框架 Java是一种广泛使用的面向对象的编程语言,它的设计目标是具有简单性、面向对象、健壮性、安全性、可移植性等特点。这份“java学习笔记基础和框架”涵盖了从Java的基础概念到高级特性的全...

    达内的Java学习笔记

    这份“达内的Java学习笔记”涵盖了从基础到高级的Java知识体系,旨在帮助初学者和有经验的开发者深入理解Java的核心概念和技术。 1. **Java基础** - **语法结构**:了解Java的基本语法,包括数据类型(如整型、...

    Java demo 算法笔记

    这份"Java demo 算法笔记"集合了Java开发中的多种关键知识点,包括但不限于基础语法、框架源码解析、算法实现以及并发处理等内容,对于学习和提升Java编程技能具有极大的帮助。 首先,我们来探讨Java的基础部分。...

    java并发编程学习笔记

    ### Java并发编程学习笔记知识点详解 #### 一、Java并发编程概述 Java并发编程是指在Java应用程序中同时执行多个操作的技术。它通过多线程、线程池等机制实现资源的有效利用,提高程序运行效率。Java并发编程的...

Global site tag (gtag.js) - Google Analytics