来源 :【java并发】juc Executor框架详解 Java线程池架构原理和源码解析(ThreadPoolExecutor)
1. Executor整体架构:
AbstractExecutorService 是抽象类,主要实现了 ExecutorService 和 futureTask 相关的一些任务创建和提交的方法。
ThreadPoolExecutor 是最核心的一个类,是线程池的内部实现。线程池的功能都在这里实现了,平时用的最多的基本就是这个了。其源码很精练,远没当时想象的多。
ScheduledThreadPoolExecutor 在 ThreadPoolExecutor 的基础上提供了支持定时调度的功能。线程任务可以在一定延时时间后才被触发执行。
2. ThreadPoolExecutor 原理 :
2.1 ThreadPoolExecutor内部的几个重要属性
新版的JDk中还有一个重要的原子变量 , ctl ,
1. 线程池本身的状态 // 不是单个线程
volatile int runState; static final int RUNNING = 0; static final int SHUTDOWN = 1; static final int STOP = 2; static final int TERMINATED = 3;
2. 等待任务队列和工作集
private final BlockingQueue<Runnable> workQueue; //等待被执行的Runnable任务 private final HashSet<Worker> workers = new HashSet<Worker>(); //正在被执行的Worker任务集
3. 线程的存活时间和大小
private volatile long keepAliveTime;// 线程存活时间 private volatile boolean allowCoreThreadTimeOut;// 是否允许核心线程存活 如果==false,核心线程即使idle也会keep alive private volatile int corePoolSize;// 核心池大小 private volatile int maximumPoolSize; // 最大池大小 private volatile int poolSize; //当前池大小 private int largestPoolSize; //最大池大小,区别于maximumPoolSize,是用于记录线程池曾经达到过的最大并发,理论上小于等于maximumPoolSize。
poolSize corePoolSize maximumPoolSize keepAliveTime分别表示什么?
poolSize : 当前工作的线程
corePoolSize : 维护的最少工作线程
MaximunPoolSize : 最多允许工作的线程
keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
// Timeout in nanoseconds for idle threads waiting for work.
* Threads use this timeout when there are more than corePoolSize
* present or if allowCoreThreadTimeOut. Otherwise they wait
* forever for new work.
4. 线程工厂和拒绝策略
private volatile RejectedExecutionHandler handler;// 拒绝策略,用于当线程池无法承载新线程是的处理策略。 private volatile ThreadFactory threadFactory;// 线程工厂,用于在线程池需要新创建线程的时候创建线程
5. 线程池完成任务数
private long completedTaskCount;//线程池运行到当前完成的任务数总和
2.2 ThreadPoolExecutor 的内部工作原理
有了以上定义好的数据,下面来看看内部是如何实现的 。 Doug Lea 的整个思路总结起来就是 5 句话:
2. 如果当前池大小 poolSize 大于 corePoolSize ,且等待队列未满,则进入等待队列
3. 如果当前池大小 poolSize 大于 corePoolSize 且小于 maximumPoolSize ,且等待队列已满,则创建新线程执行任务。
4. 如果当前池大小 poolSize 大于 corePoolSize 且大于 maximumPoolSize ,且等待队列已满,则调用拒绝策略来处理该任务。
5. 线程池里的每个线程执行完任务后不会立刻退出,而是会去检查下等待队列里是否还有线程任务需要执行,如果在 keepAliveTime 里等不到新的任务了,那么线程就会退出。
最核心的execute() 方法 :
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { // 如果小于 corePool 就会创建线程 并运行 if (runState == RUNNING && workQueue.offer(command)) { // 如果大于corePool ,并且线程池是运行的,试图添加到等待队列 if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) // reject处理 多种策略 可指定 默认AbortPolicy reject(command); // is shutdown or saturated } }
这段代码看似简单,其实有点难懂,很多人也是这里没看懂,没事,我一个if一个if说:
首先第一个判定空操作就不用说了,下面判定的poolSize >= corePoolSize成立时候会进入if的区域,当然它不成立也有可能会进入,他会判定addIfUnderCorePoolSize是否返回false,如果返回false就会进去;
我们先来看下addIfUnderCorePoolSize方法的源码是什么:
源码段3:
private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < corePoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; t.start(); // 注意此处的start() , 这就是线程池中的线程运行的起点 return true; }
可以发现,这段源码是如果发现小雨corePoolSize就会创建一个新的线程,并且调用线程的start()方法将线程运行起来:这个addThread()方法,我们先不考虑细节,因为我们还要先看到前面是怎么进去的,这里可以发信啊,只有没有创建成功Thread才会返回false,也就是当当前的poolSize > corePoolSize的时候,或线程池已经不是在running状态的时候才会出现;
注意:这里在外部判定一次poolSize和corePoolSize只是初步判定,内部是加锁后判定的,以得到更为准确的结果,而外部初步判定如果是大于了,就没有必要进入这段有锁的代码了。
此时我们知道了,当前线程数量大于corePoolSize的时候,就会进入【代码段2】的第一个if语句中,回到【源码段2】,继续看if语句中的内容:
这里标记为
源码段4:
if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) reject(command); // is shutdown or saturated
第一个if,也就是当当前状态为running的时候,就会去执行workQueue.offer(command),这个workQueue其实就是一个BlockingQueue,offer()操作就是在队列的尾部写入一个对象,此时写入的对象为线程的对象而已;所以你可以认为只有线程池在RUNNING状态,才会在队列尾部插入数据,否则就执行else if,其实else if可以看出是要做一个是否大于MaximumPoolSize的判定,如果大于这个值,就会做reject的操作,关于reject的说明,我们在【源码段1】的解释中已经非常明确的说明,这里可以简单看下源码,以应征结果:
源码段5:
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < maximumPoolSize && runState == RUNNING) //在corePoolSize = maximumPoolSize下,该代码几乎不可能运行 t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; } void reject(Runnable command) { handler.rejectedExecution(command, this); }
也就是如果线程池满了,而且线程池调用了shutdown后,还在调用execute方法时,就会抛出上面说明的异常:RejectedExecutionException
再回头来看下【代码段4】中进入到等待队列后的操作:
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
这段代码是要在线程池运行状态不是RUNNING或poolSize == 0才会调用,他是干啥呢?
他为什么会不等于RUNNING呢?外面那一层不是判定了他== RUNNING了么,其实有时间差就是了,如果是poolSize == 0也会执行这段代码,但是里面的判定条件是如果不是RUNNING,就做reject操作,在第一个线程进去的时候,会将第一个线程直接启动起来;很多人也是看这段代码很绕,因为不断的循环判定类似的判定条件,你主要记住他们之间有时间差,要取最新的就好了。
此时貌似代码看完了?咦,此时有问题了:// 前面我们看到的时,只是new 出了一个新的线程,并没有调用他的start()方法!
1、 等待中的线程在后来是如何跑起来的呢?线程池是不是有类似Timer一样的守护进程不断扫描线程队列和等待队列?还是利用某种锁机制,实现类似wait和notify实现的?
2、 线程池的运行队列和等待队列是如何管理的呢?这里还没看出影子呢!
NO,NO,NO!
Java在实现这部分的时候,使用了怪异的手段,神马手段呢,还要再看一部分代码才晓得。
在前面【源码段3】中,我们看到了一个方法叫:addThread(),也许很少有人会想到关键在这里,其实关键就是在这里:
我们看看addThread()方法到底做了什么。
源码段6:
private Thread addThread(Runnable firstTask) { Worker w = new Worker(firstTask); Thread t = threadFactory.newThread(w); if (t != null) { w.thread = t; workers.add(w); int nt = ++poolSize; if (nt > largestPoolSize) largestPoolSize = nt; } return t; }
Work : // 新版的Work , 可能更下面的代码有些出入!!!!!!
private final class Worker extends AbstractQueuedSynchronizer // 实现了AQS implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { // 这个是我的jdk中的源代码,下面的run()方法,是从别的网页粘贴过来的,应当以前版本的,不过差别不大,只是新的版本中,把 runWorker(this); // while循环部分都放入了runWorker()中 } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } // 注意 , 在他内部自己实现了独占锁 public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
这里创建了一个Work,其余的操作,就是讲poolSize叠加,然后将将其放入workers的运行队列等操作;
我们主要关心Worker是干什么的,因为这个threadFactory对我们用途不大,只是做了Thread的命名处理;而Worker你会发现它的定义也是一个Runnable,外部开始在代码段中发现了调用哪个这个Worker的start()方法,也就是线程的启动方法,其实也就是调用了Worker的run()方法,那么我们重点要关心run方法是如何处理的
源码段7:
public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); } }
FirstTask其实就是开始在创建work的时候,由外部传入的Runnable对象,也就是你自己的Thread,你会发现它如果发现task为空,就会调用getTask()方法再判定,直到两者为空,并且是一个while循环体。
那么看看getTask()方法的实现为:
源码段8:
Runnable getTask() { for (;;) { try { int state = runState; if (state > SHUTDOWN) return null; Runnable r; if (state == SHUTDOWN) // Help drain queue r = workQueue.poll(); else if (poolSize > corePoolSize || allowCoreThreadTimeOut) r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); else r = workQueue.take(); if (r != null) return r; if (workerCanExit()) { if (runState >= SHUTDOWN) // Wake up others interruptIdleWorkers(); return null; } // Else retry } catch (InterruptedException ie) { // On interruption, re-check runState } }
你会发现它是从workQueue队列中,也就是等待队列中获取一个元素出来并返回!
回过头来根据代码段6理解下:
当前线程运行完后,在到workQueue中去获取一个task出来,继续运行,这样就保证了线程池中有一定的线程一直在运行;此时若跳出了while循环,只有workQueue队列为空才会出现或出现了类似于shutdown的操作,自然运行队列会减少1,当再有新的线程进来的时候,就又开始向worker里面放数据了,这样以此类推,实现了线程池的功能。
这里可以看下run方法的finally中调用的workerDone方法为:
源码段9:
void workerDone(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); if (--poolSize == 0) tryTerminate(); } finally { mainLock.unlock(); } }
注意这里将workers.remove(w)掉,并且调用了—poolSize来做操作。
至于tryTerminate是做了更多关于回收方面的操作。
最后我们还要看一段代码就是在【源码段6】中出现的代码调用为:runTask(task);这个方法也是运行的关键。
源码段10:
- private void runTask(Runnable task) {
- final ReentrantLock runLock = this.runLock;
- runLock.lock();
- try {
- if (runState < STOP &&
- Thread.interrupted() &&
- runState >= STOP)
- thread.interrupt();
- boolean ran = false;
- beforeExecute(thread, task);
- try {
- task.run(); // run()
- ran = true;
- afterExecute(task, null);
- ++completedTasks;
- } catch (RuntimeException ex) {
- if (!ran)
- afterExecute(task, ex);
- throw ex;
- }
- } finally {
- runLock.unlock();
- }
- }
你可以看到,这里面的task为传入的task信息,调用的不是start方法,而是run方法,因为run方法直接调用不会启动新的线程,也是因为这样,导致了你无法获取到你自己的线程的状态,因为线程池是直接调用的run方法,而不是start方法来运行。
这里有个beforeExecute和afterExecute方法,分别代表在执行前和执行后,你可以做一段操作,在这个类中,这两个方法都是【空body】的,因为普通线程池无需做更多的操作。
如果你要实现类似暂停等待通知的或其他的操作,可以自己extends后进行重写构造;
如何实现线程复用?
Doug Lea 的实现思路是 线程池里的每个线程执行完任务后不立刻退出,而是去检查下等待队列里是否还有线程任务需要执行,如果在 keepAliveTime 里等不到新的任务了,那么线程就会退出。这个功能的实现 关键在于Worker 。线程池在执行 Runnable 任务的时候,并不单纯把 Runnable 任务交给创建一个 Thread 。而是会把Runnable 任务封装成 Worker 任务。
下面看看 Worker 的实现:
代码很简单,可以看出, worker 里面包装了 firstTask 属性,在构造worker 的时候传进来的那个 Runnable 任务就是 firstTask 。 同时也实现了Runnable接口,所以是个代理模式,看看代理增加了哪些功能。 关键看 woker 的 run方法:
public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); } }
可以看出 worker 的 run 方法是一个循环,第一次循环运行的必然是 firstTask ,在运行完 firstTask 之后,并不会立刻结束,而是会调用 getTask 获取新的任务( getTask 会从等待队列里获取等待中的任务),如果keepAliveTime 时间内得到新任务则继续执行,得不到新任务则那么线程才会退出。这样就保证了多个任务可以复用一个线程,而不是每次都创建新任务。 keepAliveTime 的逻辑在哪里实现的呢?主要是利用了 BlockingQueue的 poll 方法支持等待。可看 getTask 的代码段:
if (state == SHUTDOWN) // Help drain queue r = workQueue.poll(); else if (poolSize > corePoolSize || allowCoreThreadTimeOut) r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); //等候特定的时间再移除 else r = workQueue.take();
3. 线程池的使用策略:
1. newSingleThreadExecutor
创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
2. newFixedThreadPool
创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
3. newCachedThreadPool
创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,
那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。
4. newScheduledThreadPool
创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
相关推荐
### 线程池 `ThreadPoolExecutor` 原理源码分析 #### 一、概述 线程池作为 Java 并发编程中的重要组件,在实际应用中被广泛使用。其核心类 `ThreadPoolExecutor` 实现了对线程的管理、调度等功能。本文将围绕 `...
"java 中ThreadPoolExecutor 原理分析" ThreadPoolExecutor 是 Java 并发编程中的一种高级线程池实现,它提供了一个灵活的线程池管理机制,允许开发者根据需要配置线程池的参数以满足不同的需求。在这篇文章中,...
Java线程池ThreadPoolExecutor原理及使用实例 Java线程池ThreadPoolExecutor是Java并发编程中的一种基本机制,主要用于管理和执行任务的线程池。下面对其原理和使用实例进行详细介绍。 线程池概述 线程池是一个...
线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor...
线程池ThreadPoolExecutor底层原理源码分析
ThreadPoolExecutor 的工作原理可以分为以下几个步骤: 1. 创建线程池:创建一个 ThreadPoolExecutor 实例,并设置其核心参数,例如 corePoolSize、maxPoolSize 和 keep-alive time。 2. 提交任务:提交一个任务给...
线程池原理-ThreadPoolExecutor源码解析 1.构造方法及参数 2.阻塞对列: BlockingQueue 3.线程工厂: DefaultThreadFactory 4.拒绝策略: RejectedExecutionHandler 5.执行线程 Executor
线程池ThreadPoolExecutor实战及其原理分析(上)
根据提供的文件信息,我们可以深入探讨线程池`ThreadPoolExecutor`的工作原理及其实现细节,同时也会涉及并发编程中的一些关键概念和技术。 ### 线程池`ThreadPoolExecutor`概述 `ThreadPoolExecutor`是Java中非常...
根据给定文件的信息,我们可以深入探讨Java中`ThreadPoolExecutor`线程池的底层实现原理,特别是其核心数据结构`ctl`以及线程池的各种状态转换。以下是对这些知识点的详细解释: ### 一、线程池`ThreadPoolExecutor...
线程池的实现原理主要包括以下几个步骤: 1. 当提交一个任务到线程池时,线程池会检查当前运行的线程是否少于核心线程数`corePoolSize`,如果是,则创建新的线程执行任务。 2. 如果线程数量达到`corePoolSize`,新...
ThreadPoolExecutor线程池原理及其execute方法详解 ThreadPoolExecutor是Java并发包中提供的线程池类,用于管理和执行异步任务。ThreadPoolExecutor的执行原理可以分为四个步骤: 1.核心线程池:...
本文将深入解析ThreadPoolExecutor的execute()方法执行流程,以帮助我们理解线程池的工作原理。 当一个任务被提交到线程池,线程池的执行策略主要分为四步: 1. 首先,线程池会检查当前的核心线程数是否已达到设定...
首先,我们要了解线程池的基本原理。线程池是由一组预先创建的线程组成的,这些线程可以复用,而不是每次执行任务时都创建新的线程。`ThreadPoolExecutor`是Java并发包`java.util.concurrent`中的核心类,用于实现...
11-线程池 ThreadPoolExecutor 底层原理源码分析(上)-周瑜.pdf 12-线程池 ThreadPoolExecutor底层原理源码分析(下)-周瑜.pdf 13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、...
11-线程池 ThreadPoolExecutor 底层原理源码分析(上)-周瑜.pdf 12-线程池 ThreadPoolExecutor底层原理源码分析(下)-周瑜.pdf 13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、...
11-线程池 ThreadPoolExecutor 底层原理源码分析(上)-周瑜.pdf 12-线程池 ThreadPoolExecutor底层原理源码分析(下)-周瑜.pdf 13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、...
11-线程池 ThreadPoolExecutor 底层原理源码分析(上)-周瑜.pdf 12-线程池 ThreadPoolExecutor底层原理源码分析(下)-周瑜.pdf 13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、...
"线程池的实现以及底层原理" 线程池是Java多线程编程中的一种常见技术,能够提高系统的性能和可靠性。本文将详细介绍线程池的实现原理、线程池的优势、线程池的类型、线程池的创建方式以及线程池的使用注意事项。 ...
2. 线程池(ThreadPoolExecutor)的概念与优势: 线程池是一种多线程处理形式,它预先创建了若干数量的可执行线程并放在一个池子中,需要的时候直接拿来使用,使用完毕后再放回池中。线程池技术的优势在于:能够减少...