目录
- FutureTask
- ExecutorCompletionService
- AbstractExecutorService
- ThreadPoolExecutor
FutureTask
FutureTask类 结构
FutureTask实现了RunnableFuture接口,而RunnableFuture继承了Runnable和Future,也就是说FutureTask既是Runnable,也是Future。
主要成员变量
//任务运行状态 private volatile int state; //预定义多7种状态 private static final int NEW = 0; //任务新建和执行中 private static final int COMPLETING = 1; //任务将要执行完毕中 private static final int NORMAL = 2; //任务正常执行结束 private static final int EXCEPTIONAL = 3; //任务异常 private static final int CANCELLED = 4; //任务取消 private static final int INTERRUPTING = 5; //任务线程即将被中断 private static final int INTERRUPTED = 6; //任务线程已中断 //可能的状态转换 //NEW -> COMPLETING -> NORMAL // NEW -> COMPLETING -> EXCEPTIONAL // NEW -> CANCELLED //NEW -> INTERRUPTING -> INTERRUPTED private Callable callable;//被提交的任务 private Object outcome; //任务执行结果或者任务异常 private volatile Thread runner;//执行任务的线程 private volatile WaitNode waiters;//等待节点,关联等待线程
run
public void run() { //如果运行状态不是NEW 或者CAS设置运行线程为当前线程失败则返回 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { //当前被提交的任务 Callable c = callable; //如果任务不为null同时运行状态是NEW if (c != null && state == NEW) { V result; //运行正常标记 boolean ran; try { //执行提交的任务,并将结果赋值给result,同时将运行正常标记设置为true result = c.call(); ran = true; } catch (Throwable ex) { //如果执行提交任务异常,则将结果设置为null,运行正常标记设置为false,同时设置异常 result = null; ran = false; setException(ex); } //如果运行正常,则将设置结果 if (ran) set(result); } } finally { //退出前将当前运行线程设置为null runner = null; //重置runner后必须重新读取state防止有泄漏的中断 int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } protected void set(V v) { // CAS将运行状态从NEW更新为COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { //更新成功将结果赋值给outcome outcome = v; //将状态更新为NORMAL,终态 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); //结束处理,删除或唤醒等待线程等 finishCompletion(); } } //删除或唤醒所有等待线程,待用done()同时将callable重置为null private void finishCompletion() { for (WaitNode q; (q = waiters) != null;) { //CAS将等待节点更新为null if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { //获取等待节点的线程 Thread t = q.thread; //如果等待节点电池不为null,则将其设置为null并唤醒线程 if (t != null) { q.thread = null; LockSupport.unpark(t); } //获取等待线程的直接后继节点 WaitNode next = q.next; //如果直接后继为null则表示所有等待线程都处理完,跳出循环 if (next == null) break; //将节点从链表中删除 q.next = null; // unlink to help gc //继续处理后继节点 q = next; } //跳出循环 break; } } //调用done done(); //将callable重置为null callable = null; } protected void setException(Throwable t) { // CAS 将运行状态从NEW 更新为 COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { //将异常赋值给outcome outcome = t; //将运行状态更新为终态 EXCEPTIONAL UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); //结束处理 finishCompletion(); } } //等待状态从INTERRUPTING变为终态 private void handlePossibleCancellationInterrupt(int s) { if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); }get
public V get() throws InterruptedException, ExecutionException { //运行状态 int s = state; //如果还没有结束 if (s <= COMPLETING) //等待结束 s = awaitDone(false, 0L); //返回结果 return report(s); } private int awaitDone(boolean timed, long nanos) throws InterruptedException { //如果有超时限制,则计算deadLine final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; //入队标记 boolean queued = false; //循环 for (;;) { //如果当前线程已经中断,则将该节点从等待队列中删除,并抛出异常 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } //运行状态 int s = state; if (s > COMPLETING) { //任务可能已经完成或者被取消了 if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // 可能任务线程被阻塞了,主线程让出CPU Thread.yield(); else if (q == null) // 等待线程节点为空,则初始化新节点并关联当前线程 q = new WaitNode(); else if (!queued) //如果没有入队过,等待线程入队列,成功则queued=true queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) {//限制超时 nanos = deadline - System.nanoTime(); //如果超时移除等待节点 if (nanos <= 0L) { removeWaiter(q); return state; } //没有超时则线程挂起 LockSupport.parkNanos(this, nanos); } else 线程挂起 LockSupport.park(this); } } private void removeWaiter(WaitNode node) { //如果要删除的节点不为null,则将其节点线程设置为null if (node != null) { node.thread = null; retry: for (;;) { //q初始化为等待节点 for (WaitNode pred = null, q = waiters, s; q != null; q = s) { //q的直接后继 s = q.next; //如果q的线程不为null,表示q不是需要删除的节点,则将q赋值给pred if (q.thread != null) pred = q; else if (pred != null) { //如果pred不为null则表示pred是有效节点, //需要删除的后继节点作为pred的直接后继,进而将需要删除的节点从队列中删除 pred.next = s; if (pred.thread == null) // 竞态检查 continue retry; }//如果头节点是需要删除的节点则CAS更新 else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; } break; } } }
ExecutorCompletionService
public class ExecutorCompletionService<V> implements CompletionService<V> { //具体执行任务的线程池 private final Executor executor; private final AbstractExecutorService aes; //任务执行完成的阻塞队列 private final BlockingQueue<Future<V>> completionQueue; private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } //FutureTask执行结束的时候会被调用,将task添加到阻塞队列 protected void done() { completionQueue.add(task); } private final Future<V> task; } private RunnableFuture<V> newTaskFor(Callable<V> task) { //如果aes为null则将task包装成一个新的FutureTask,否则调用aes newTaskFor方法进行包装 if (aes == null) return new FutureTask<V>(task); else return aes.newTaskFor(task); } public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); } public Future<V> submit(Callable<V> task) { //如果提交的任务为null则抛出NPE if (task == null) throw new NullPointerException(); //将当前任务包装为一个FutureTask RunnableFuture<V> f = newTaskFor(task); //再将FutureTask包装为QueueingFuture 后执行 executor.execute(new QueueingFuture(f)); return f; } public Future<V> take() throws InterruptedException { return completionQueue.take(); } public Future<V> poll() { return completionQueue.poll(); } }
ExecutorCompletionService
AbstractExecutorService 抽象类实现了 ExecutorService 接口,然后在其基础上实现了 invokeAny 方法和 invokeAll 方法,这里的 newTaskFor 方法也比较有用,用于将任务包装成 FutureTask。
invokeAny
public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { try { return doInvokeAny(tasks, false, 0); } catch (TimeoutException cannotHappen) { assert false; return null; } } private T doInvokeAny(Collection> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException { //如果提交的任务集合为null则抛NPE if (tasks == null) throw new NullPointerException(); //提交的任务数量 int ntasks = tasks.size(); //如果提交的任务数量为0则抛一个IllegalArgumentException if (ntasks == 0) throw new IllegalArgumentException(); //创建一个结果集合列表 ArrayList<future> futures = new ArrayList<future>(ntasks); //创建一个ExecutorCompletionService ExecutorCompletionService ecs = new ExecutorCompletionService(this); try { ExecutionException ee = null; //如果有限制超时时间则计算出deadLine final long deadline = timed ? System.nanoTime() + nanos : 0L; Iterator> it = tasks.iterator(); //执行第一个任务 futures.add(ecs.submit(it.next())); //任务数自减 --ntasks; //活动任务数为1 int active = 1; for (;;) { //获取执行完成结果 Future f = ecs.poll(); //如果没有结果,则表示任务尚未执行完 if (f == null) { //剩余任务数大于0,则继续执行 if (ntasks > 0) { --ntasks; futures.add(ecs.submit(it.next())); ++active; } else if (active == 0)//如果活动任务数为0,且没有获取到成功成功结果,则表示所有任务都失败了 break; else if (timed) {// 如果限制超时时间 f = ecs.poll(nanos, TimeUnit.NANOSECONDS); if (f == null) throw new TimeoutException(); nanos = deadline - System.nanoTime(); } else //如果没有要提交的任务,线程池中任务还没有结束阻塞等待结果 f = ecs.take(); } //如果获取到了结果,活动任务数自减 if (f != null) { --active; try { //返回结果 return f.get(); } catch (ExecutionException eex) { ee = eex; } catch (RuntimeException rex) { ee = new ExecutionException(rex); } } } if (ee == null) ee = new ExecutionException(); throw ee; } finally { //退出前取消所有任务 for (int i = 0, size = futures.size(); i < size; i++) futures.get(i).cancel(true); } }ThreadPoolExecutor
ThreadPoolExecutor 是 JDK 中的线程池实现,这个类实现了一个线程池需要的各个方法,它实现了任务提交、线程管理、监控等等方法
首先看一下构造方法/** * 使用给定的初始化参数创建一个新的线程池 * * @param corePoolSize 线程池中保持的线程数量,即使这些线程是空闲的;除非设置了allowCoreThreadTimeOut * @param maximumPoolSize 线程池中允许的最大线程数 * @param keepAliveTime 当线程池中的线程数大于corePoolSize时,这些多余空闲线程等待新任务的最大时间 * @param unit keepAliveTime的时间单位 * @param workQueue 用来在线程执行前持有线程的队列。这个队列只持有通过executor方法提交的runnable任务 * @param threadFactory 线程池创建新线程的线程工厂 * @param handler 线程池关闭或饱和时的处理策略 */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }除了构造方法中的这几个初始化参数外还有一个比较重要的属性需要理解
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 这里 COUNT_BITS 设置为 29(32-3),为什么减去三呢?这是因为线程池有5种状态,需要用到三位,高三位用来表示线程池的状态 private static final int COUNT_BITS = Integer.SIZE - 3; // 000 11111111111111111111111111111 // 这里得到的是 29 个 1,也就是说线程池的最大线程数是 2^29-1 大约是5亿 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 我们说了,线程池的状态存放在高 3 位中 // 111 00000000000000000000000000000 private static final int RUNNING = -1 << COUNT_BITS; // 000 00000000000000000000000000000 private static final int SHUTDOWN = 0 << COUNT_BITS; // 001 00000000000000000000000000000 private static final int STOP = 1 << COUNT_BITS; // 010 00000000000000000000000000000 private static final int TIDYING = 2 << COUNT_BITS; // 011 00000000000000000000000000000 private static final int TERMINATED = 3 << COUNT_BITS; // 对CAPACITY 按位去反高三位全部为1在进行按位与运算,就将地位全部变为0 private static int runStateOf(int c) { return c & ~CAPACITY; } // 按位与允许将高三位全部变为0就得到了当前线程池中的线程数量 private static int workerCountOf(int c) { return c & CAPACITY; } //按位或运算就得到了ctl private static int ctlOf(int rs, int wc) { return rs | wc; }
在介绍完基本属性后接下来要看主要方法executor(Runnable),它处理过程分为如下三步:
- 如果小于corePoolSize的线程在运行,尝试使用给定的任务作为工作线程的第一个任务去去创建一个工作线程。 调用addWorker方法以原子方式检查runState和workeCount,从而防止虚假报警,在返回false时会在不应该的时候添加线程
- 如果任务可以排队成功,那么我们仍然要双重检查是否应该添加一个新的线程,因为自上次检查以来可能现有线程已经死亡或自进入这个方法以来线程池已经关闭。所以需要双重检查,如果有关闭了可以对队列进行回滚,如果线程池没有线程可以开启一个新线程
- 如果任务不能排队,则尝试创建一个新的线程,如果创建新线程失败就知道线程已经关闭或已经饱和了,因此执行拒绝策略
public void execute(Runnable command) { //如果任务为null则抛出NPE if (command == null) throw new NullPointerException(); //获取ctl int c = ctl.get(); //如果线程池的线程数小于corePoolSize if (workerCountOf(c) < corePoolSize) { //添加新工作线程,如果添加成功则返回 if (addWorker(command, true)) return; //添加失败再次获取ctl,进行重复检查 c = ctl.get(); } //如果线程池还是运行的,则对当前任务进行排队;如果排队成功 if (isRunning(c) && workQueue.offer(command)) { //再次获取ctl int recheck = ctl.get(); //如果线程池已经关闭则将任务从队列中删除,如果删除成功对该任务执行饱和策略 if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0)//如果没有运行线程,则创建一个新线程 addWorker(null, false); }//执行到此表示,线程池已经关闭或排队失败;则尝试创建新的工作线程,如果添加新的工作线程则执行饱和策略 else if (!addWorker(command, false)) reject(command); }
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { //运行在Worker中的线程,真正执行任务的线程 final Thread thread; //初始化的创建的第一个任务,可以为null Runnable firstTask; //用于存放此线程完全的任务数 volatile long completedTasks; // 构造方法 Worker(Runnable firstTask) { setState(-1); this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); } final void runWorker(Worker w) { //获取当前线程 Thread wt = Thread.currentThread(); //获取worker中的firstTask Runnable task = w.firstTask; //将firstTask设置为null,因为这个任务是创建worker时候指定的,稍后就会执行这个任务 w.firstTask = null; //对worker进行解锁,允许中断 w.unlock(); boolean completedAbruptly = true; try { //如果任务不为null或者getTask()中可以去到任务 while (task != null || (task = getTask()) != null) { //对工作线程加锁不允许中断 w.lock(); //如果线程池状态大于等于 STOP,那么意味着该线程也要中断 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //执行beforeExecute,这是一个扩展点 beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //执行afterExecute,这里也是一个扩展点 afterExecute(task, thrown); } } finally { // 置空 task,准备 getTask 获取下一个任务 task = null; //完成的任务数自增 w.completedTasks++; // 释放掉 worker 的独占锁 w.unlock(); } } completedAbruptly = false; } finally { // 如果到这里,需要执行线程关闭: // 1. 说明 getTask 返回 null,没有任务要执行了 // 2. 任务执行过程中发生了异常 processWorkerExit(w, completedAbruptly); } } private Runnable getTask() { boolean timedOut = false; for (;;) { int c = ctl.get(); //获取运行状态 int rs = runStateOf(c); // 如果线程池已经关闭 同时队列为空,则减少工作线程数 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } //工作线程数 int wc = workerCountOf(c); //是否需要计时处理,如果设置了allowCoreThreadTimeOut或当前工作线程数量大于corePoolSize 则需要计时处理 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //(wc > maximumPoolSize || (timed && timedOut)) 工作线程大于线程池的最大值 或需要计时且已经超时 //(wc > 1 || workQueue.isEmpty()) 工作线程数大于1或任务队列为空 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { //工作线程数量减少 返回null if (compareAndDecrementWorkerCount(c)) return null; //如果CAS更新数量失败则继续 continue; } try { //如果需要计时,则使用超时poll否则阻塞获取任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; //r 为null则是因为超时导致的 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } } private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // 如果是意外结束,则工作线程自减 decrementWorkerCount(); //加锁处理 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //将当前工作线程执行的任务数加到线程池的执行的任务数上 completedTaskCount += w.completedTasks; //将当前工作线程从工作hashSet中删除 workers.remove(w); } finally { mainLock.unlock(); } //尝试结束线程池 tryTerminate(); //获取ctl int c = ctl.get(); //如果线程池运行状态小于STOP 即RUNNING或SHUTDOWN if (runStateLessThan(c, STOP)) { //如果不是意外结束 if (!completedAbruptly) { //如果设置了allowCoreThreadTimeOut 则线程池最少持有线程则为0否则为corePoolSize int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //如果min==0但是队列不为空则min=1 if (min == 0 && ! workQueue.isEmpty()) min = 1; //如果工作线程大于或等于min返回 if (workerCountOf(c) >= min) return; } //此时表示工作线程小于线程池最小线程数则添加一个新的工作线程,不指定firstTask addWorker(null, false); } }
相关推荐
本文将深入探讨Java线程池的源码分析,并对比不同类型的线程池,以帮助开发者更好地理解和利用这一强大的工具。 首先,我们要理解Java线程池的核心类`java.util.concurrent.ThreadPoolExecutor`,它是所有自定义...
java线程池源码 cThreadPool 项目描述:对java.util.concurrent包下线程池相关源码进行重新实现,深入研究和学习线程池超时机制、饱和策略、生命周期等知识 ThreadPoolExecutor类下部分方法和内部类介绍: 1、Worker...
线程池是Java多线程编程中不可或缺的一部分,它通过管理一组可重用线程来提高应用程序的性能和...通过分析这个源码,我们可以深入理解线程池的运作机制,并学习如何根据具体需求调整线程池参数,以实现高效的并发编程。
### 详解Java线程池的ctl(线程池控制状态) #### 0. 综述 在Java的并发编程中,`ThreadPoolExecutor`类扮演着非常重要的角色,它提供了创建和管理线程池的能力。线程池的核心在于如何有效地管理和调度线程资源,而...
本项目中分享的是一套经过实际项目验证的、性能稳定的线程池源码,包括ThreadPool.java、PooledThread.java和ThreadTask.java三个核心组件。 ThreadPool.java是线程池的核心类,它主要负责线程池的初始化、任务的...
3:对线程池的基本使用及其部分源码的分析(注意:这里的源码分析是基于jdk1.6;) a:线程池的状态 volatile int runState; static final int RUNNING = 0; 运行状态 static final int SHUTDOWN = 1; 关闭状态;...
同时,分析线程池源码能让我们更好地理解其内部机制,解决实际问题,例如处理拒绝策略、监控线程池状态等。 在`ThreadPoolExecutor`源码中,可以看到关键方法如`execute()`、`afterExecute()`、`beforeExecute()`...
通过分析Java线程池源码,我们可以学习到如何合理配置线程池参数,如何选择合适的工作队列,以及如何处理拒绝策略,从而在实际开发中更好地利用多线程来提高程序效率。此外,源码阅读也有助于理解Java并发库的设计...
3. 源码分析:如何在Java源码中查找线程池和消息队列的实现。 4. 文件路径管理:如何在Java中管理代码源路径,遍历目录结构。 5. 类和对象:`CodeReader`和`SourcePathManager`类的设计和实现。 为了深入理解这些...
源码分析** 阅读源码可以帮助我们更好地理解线程池的内部机制。例如,`ThreadPoolExecutor.execute()`方法是如何处理任务的提交,`ThreadPoolExecutor.shutdown()`和`shutdownNow()`又是如何优雅地关闭线程池。 **...
在Java编程领域,源码分析是一项至关重要的技能,它能帮助开发者深入理解框架的工作原理,提升软件设计和问题解决的能力。"Java源码分析 【源码笔记】"是一系列专门针对Java后端框架进行源码解析的文章集合,旨在...
java线程池源码解读 GRIDSS - 基因组重排识别软件套件 GRIDSS 是一个模块软件套件,包含用于检测基因组重排的工具。 GRIDSS 包括一个全基因组断裂端组装器,以及一个用于 Illumina 测序数据的结构变异调用器。 ...
### 线程池 `ThreadPoolExecutor` 原理源码分析 #### 一、概述 线程池作为 Java 并发编程中的重要组件,在实际应用中被广泛使用。其核心类 `ThreadPoolExecutor` 实现了对线程的管理、调度等功能。本文将围绕 `...
"java并发源码分析之实战编程"这个主题深入探讨了Java平台上的并发处理机制,旨在帮助开发者理解并有效地利用这些机制来提高程序性能和可扩展性。在这个专题中,我们将围绕Java并发库、线程管理、锁机制、并发容器...
在Java、C++等编程语言中,线程池被广泛应用,用于管理线程的生命周期,避免频繁创建和销毁线程带来的开销。本文将基于提供的"ThreadPool.cpp"和"ThreadPool.h"文件,深入解析线程池的实现原理及其自动管理线程创建...
在Java编程中,线程池是一种管理线程的机制,它可以帮助我们更有效地调度和执行并发任务。...结合`SurgeryThreadPool.java`源码分析和`Java.jpg`中的示例,我们可以进一步理解线程池在实际项目中的具体实现和应用。
`标签`中的“源码”提示我们可以深入研究Spring线程池的实现,例如分析`ThreadPoolTaskExecutor`的源代码,了解其内部如何处理任务提交、线程管理以及异常处理等。而“工具”则可能是指Spring提供的一些工具类或辅助...
通过上述对`ThreadPoolExecutor`线程池底层实现原理的解析,我们可以看到Java线程池的强大之处在于其高效的状态管理和任务调度机制。通过对`ctl`变量的巧妙利用,线程池能够有效地管理线程状态和数量,从而实现高...
Java源码分析是软件开发过程中一个重要的学习环节,它能帮助开发者深入理解代码背后的逻辑,提升编程技巧,以及优化程序性能。在这个过程中,我们通常会关注类的设计、算法的应用、数据结构的选择,以及如何利用Java...
本项目以"java多线程实现大批量数据导入源码"为题,旨在通过多线程策略将大量数据切分,并进行并行处理,以提高数据处理速度。 首先,我们需要理解Java中的线程机制。Java通过`Thread`类来创建和管理线程。每个线程...