继续ThreadPoolExecutor 线程池的分析~~~~~
2.工作线程(WorkThread): 线程池中线程。
ThreadPoolExecutor 实现的核心的思想也不例外:WorkThread 一直从BlockingQueue中获取新任务,如果获得那么执行,如果获得不了那么进入等待,直到获得新的任务。这样就保证线程可以执行多个任务。
线程池任务执行的流程:appThread 把Task 交给 ThreadPoolManager ,ThreadPoolManager 把Task交给 BlockingQueue,WorkThread 一直在等待从BlockingQueue 中获得Task执行。
注意:appThread 和 workThread 是通过 BlockingQueue 进行解耦,实现了异步执行。
那么下面我们分析下java中ThreadPoolExecutor 的实现,以更好的理解线程池~
任务队列:本实现中使用的Java中的任务队列是BlockingQueue 接口,可以是ArrayBlockingQueue 也可以是LinkedBlockingQueue等。
/** *任务队列 */ private final BlockingQueue<Runnable> workQueue;
/** * 工作线程 */ private final class Worker implements Runnable { /** * 当task被执行时候,runLock 必须获取,并在执行完后释放。主要是用来,通过打断工作线程来取消task执行的场景。 */ private final ReentrantLock runLock = new ReentrantLock(); /** * 进入run loop之前 初始化的第一个任务 */ private Runnable firstTask; /** 每个线程完成的任务的计数,最后termination的时候,统计到completedTaskCount */ volatile long completedTasks; /** * worker 运行的线程 */ Thread thread; Worker(Runnable firstTask) { this.firstTask = firstTask; } boolean isActive() { return runLock.isLocked(); } /** * 如果当前线程没有执行任务,那么中断线程(关键是runLock 锁,一旦能获得锁,就意味着没有任务在执行) */ void interruptIfIdle() { final ReentrantLock runLock = this.runLock; if (runLock.tryLock()) { try { if (thread != Thread.currentThread()) thread.interrupt(); } finally { runLock.unlock(); } } } /** * Interrupts thread even if running a task. */ void interruptNow() { thread.interrupt(); } /** * Runs a single task between before/after methods. */ private void runTask(Runnable task) { final ReentrantLock runLock = this.runLock; runLock.lock(); try { /* * Ensure that unless pool is stopping, this thread * does not have its interrupt set. This requires a * double-check of state in case the interrupt was * cleared concurrently with a shutdownNow -- if so, * the interrupt is re-enabled. */ if (runState < STOP && Thread.interrupted() && runState >= STOP) thread.interrupt(); /* ran: 是用来保证afterExecute 肯定会被执行!! */ boolean ran = false; beforeExecute(thread, task); //任务执行前的扩展点 try { task.run(); //执行任务 ran = true; afterExecute(task, null); //任务执行后的扩展点 ++completedTasks; } catch (RuntimeException ex) { if (!ran) afterExecute(task, ex); throw ex; } } finally { runLock.unlock(); } } /** * thread(start ->run ) -->work(->run)->runTask */ public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null //注意:如果 task != null 那么runTask 当前任务!!! || (task = getTask()) != null) { //如果task == null,并且getTask不为null 那么runTask 新任务 runTask(task); task = null; } } finally { workerDone(this); } } }
package java.util.concurrent; import java.util.concurrent.locks.*; import java.util.*; public class ThreadPoolExecutor extends AbstractExecutorService { /** * Permission for checking shutdown */ private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread"); /** * runState 功能:提供线程池生命周期的控制 * * RUNNING: 可以接受新任务,并处理队列中的任务。 * SHUTDOWN: 不再接受新任务,但是会处理队列中的任务。 * STOP: 不再接受新任务,也不处理队列中的人,并且打断正在处理中的任务 * TERMINATED: 和stop一样,并且关闭所有的线程 * * 状态转换: * RUNNING -> SHUTDOWN * On invocation of shutdown(), perhaps implicitly in finalize() * (RUNNING or SHUTDOWN) -> STOP * On invocation of shutdownNow() * SHUTDOWN -> TERMINATED * When both queue and pool are empty * STOP -> TERMINATED: * When pool is empty */ volatile int runState; static final int RUNNING = 0; static final int SHUTDOWN = 1; static final int STOP = 2; static final int TERMINATED = 3; /** * 持有此锁 来更新o poolSize, corePoolSize, * maximumPoolSize, runState, and workers set. */ private final ReentrantLock mainLock = new ReentrantLock(); /** * 支持awaitTermination 的条件 */ private final Condition termination = mainLock.newCondition(); /** * 线程池中的所有工作线程,必须在持有mainlock的情况下更新 */ private final HashSet<Worker> workers = new HashSet<Worker>(); /** * allowCoreThreadTimeOut = ture时的参考量 */ private volatile long keepAliveTime; /** * 是否允许CoreThread 过期,默认是false ,core thread 会一直alive 即便是空闲。如果为ture,那么会等待keepAliveTime ,超时会关闭 */ private volatile boolean allowCoreThreadTimeOut; /** * 核心线程数目。小于corePoolSize 会一直创建新线程来处理任务。 */ private volatile int corePoolSize; /** * 最大线程数目 */ private volatile int maximumPoolSize; /** * 持有mainLock 锁定情况下才能更新的 当前线程数目 */ private volatile int poolSize; /** * 任务丢弃策略 */ private volatile RejectedExecutionHandler handler; /** * 线程工厂 */ private volatile ThreadFactory threadFactory; /** * 线程池中的最大数目 */ private int largestPoolSize; /** * 已经完成的任务数 */ private long completedTaskCount; /** * 默认的丢弃策略 */ private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); /** *创建线程池的构造函数 * * @param corePoolSize 池中所保存的线程数,包括空闲线程。 * @param maximumPoolSize 池中允许的最大线程数。超过最大线程数,将执行RejectedExecutionHandler 逻辑 * @param keepAliveTime 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。 * @param unit 参数的时间单位。 * @param workQueue 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。 * @param threadFactory 执行程序创建新线程时使用的工厂。 * @param handler 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。 */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } /* * Support for execute(). * * 方法 execute() 处理各种提交一个新task情况,execute 分三个步骤执行 * * 1. 当线程数目小于corePoolSize 时,会根据提供的command命令启动一个新的线程,command 会是第一个执行的任务 * 此时会调用addIfUnderCorePoolSize 来检测 runState and pool size * * 2. 把任务放到队列里。 * * 3. 如果队列不能接受command任务。那么我们试图新增一个线程, 通过addIfUnderMaximumPoolSize 来完成创建新线程来执行任务。 * * 如果失败,那么交由RejectedExecutionHandler来处理拒绝的逻辑。 */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { //如果 poolSize >= corePoolSize 失败,那么addIfUnderCorePoolSize 进行创建新线程处理 if (runState == RUNNING && workQueue.offer(command)) { //如果poolsize > =corePoolSize,或者创建新线程失败了,那么向workQueue队列里放置任务 if (runState != RUNNING || poolSize == 0) //如果runState状态发生变化,那么调用;如果状态没有变化,那么无所谓又其他线程执行! ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) //如果向workQueue队列放置任务失败,那么执行addIfUnderMaximumPoolSize 添加新新线程执行command任务 reject(command); // 如果addIfUnderMaximumPoolSize 失败,那么走拒绝任务执行流程 } } /** * 创建一个(执行首个任务为firstTask)的线程,并返回 * @param firstTask 当前线程执行的第一个任务。firstTask 可以为null */ private Thread addThread(Runnable firstTask) { Worker w = new Worker(firstTask); Thread t = threadFactory.newThread(w); //注意:非常关键thread.run() 委托给了 Worker.run() 调用。woker的run调用是循环获得任务(有可能等待的)! if (t != null) { w.thread = t; //添加工作线程 workers.add(w); int nt = ++poolSize; if (nt > largestPoolSize) //调节最大线程数 largestPoolSize = nt; } return t; } /** pool 没有shutdown,并且poolSize(当前活动的线程数) <corePoolSize,创建一个新线程 */ private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < corePoolSize && runState == RUNNING) //注意条件!! t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; } /** * 创建线程 当前线程数小于 maximumPoolSize ,并且线程池没有关闭 */ private boolean addIfUnderMaximumPoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < maximumPoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; } /** * 插入一个任务后,如果pool state 的状态发生变化(可能被并发的调用shutdownNow),再次做检查 */ private void ensureQueuedTaskHandled(Runnable command) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); boolean reject = false; Thread t = null; try { int state = runState; if (state != RUNNING && workQueue.remove(command)) //如果 state != RUNNING 意味着 可能已经关闭。 reject = true; else if (state < STOP && //此处只可能是shutdown,那么如果 poolSize <corePoolSize,并且workQueue.isEmpty()=false ,那么新生成线程来处理任务。 poolSize < Math.max(corePoolSize, 1) && !workQueue.isEmpty()) t = addThread(null); } finally { mainLock.unlock(); } if (reject) reject(command); //丢弃这个任务 else if (t != null) t.start(); } /** * 丢弃一个任务 */ void reject(Runnable command) { handler.rejectedExecution(command, this); } /* Utilities for worker thread control */ Runnable getTask() { for (;;) { try { int state = runState; if (state > SHUTDOWN) //注意:如果线程池已经关闭(此时为shutdownNow状态,不分配任务),那么不继续执行其他任务 return null; Runnable r; if (state == SHUTDOWN) //注意: 如果是shutdown ,此时为shutdown状态,需要把队列中的任务执行完毕!!! r = workQueue.poll(); else if (poolSize > corePoolSize || allowCoreThreadTimeOut) // 如果当前线程poolSize >corePoolSize(任务比较多哈) 或者允许核心线程过期, 那么从队列中取限时任务 r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); else r = workQueue.take(); //注意:线addIfUnderMaximumPoolSize程池的最关键的实现在这里,如果没有任务,那么进入等待!!!! if (r != null) return r; //如果没有取到数据,则表示是不是需要关闭线程池里面的一些线程了 if (workerCanExit()) { //如果工作线程允许退出(调用shutdownnow,队列为空,允许线程过期并且线程数目多于1个) if (runState >= SHUTDOWN) // Wake up others interruptIdleWorkers(); //终端当前线程 return null; } // Else retry } catch (InterruptedException ie) { // On interruption, re-check runState } } } /** * 检查是否一个worker线程没有获得一个task的时候,允许退出。如果pool 是stop 或者队列为空,或者允许核心线程过期并且线程数多于1个。 */ private boolean workerCanExit() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); boolean canExit; try { canExit = runState >= STOP || workQueue.isEmpty() || (allowCoreThreadTimeOut && poolSize > Math.max(1, corePoolSize)); } finally { mainLock.unlock(); } return canExit; } /** * 唤醒所有的出于等待任务的线程。 */ void interruptIdleWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfIdle(); } finally { mainLock.unlock(); } } /** * 记录已经退出的worker 完成的任务数量 */ void workerDone(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); if (--poolSize == 0) tryTerminate(); } finally { mainLock.unlock(); } } /* Termination support. */ /** * 终止线程池 */ private void tryTerminate() { if (poolSize == 0) { int state = runState; if (state < STOP && !workQueue.isEmpty()) { //如果活动能够队列不为空,那么不能终止线程池 state = RUNNING; // disable termination check below Thread t = addThread(null); if (t != null) t.start(); } if (state == STOP || state == SHUTDOWN) { runState = TERMINATED; termination.signalAll(); terminated(); } } } public void shutdown() { SecurityManager security = System.getSecurityManager(); if (security != null) security.checkPermission(shutdownPerm); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (security != null) { // Check if caller can modify our threads for (Worker w : workers) security.checkAccess(w.thread); } int state = runState; if (state < SHUTDOWN) runState = SHUTDOWN; //保证不再接受新的任务 try { for (Worker w : workers) { w.interruptIfIdle();//关闭空闲的worker线程 } } catch (SecurityException se) { // Try to back out runState = state; // tryTerminate() here would be a no-op throw se; } tryTerminate(); // Terminate now if pool and queue empty } finally { mainLock.unlock(); } } /** * 立即关闭线程池 */ public List<Runnable> shutdownNow() { /* * shutdownNow differs from shutdown only in that * 1. runState is set to STOP, * 2. all worker threads are interrupted, not just the idle ones, and * 3. the queue is drained and returned. */ SecurityManager security = System.getSecurityManager(); if (security != null) security.checkPermission(shutdownPerm); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (security != null) { // Check if caller can modify our threads for (Worker w : workers) security.checkAccess(w.thread); } int state = runState; if (state < STOP) runState = STOP; try { for (Worker w : workers) { //所有工作线程立即关闭! w.interruptNow(); } } catch (SecurityException se) { // Try to back out runState = state; // tryTerminate() here would be a no-op throw se; } List<Runnable> tasks = drainQueue(); tryTerminate(); // Terminate now if pool and queue empty return tasks; } finally { mainLock.unlock(); } } /** * 调用shutdownNow 的时候,把任务列表项组织成新的list */ private List<Runnable> drainQueue() { List<Runnable> taskList = new ArrayList<Runnable>(); workQueue.drainTo(taskList); while (!workQueue.isEmpty()) { Iterator<Runnable> it = workQueue.iterator(); try { if (it.hasNext()) { Runnable r = it.next(); if (workQueue.remove(r)) taskList.add(r); } } catch (ConcurrentModificationException ignore) { } } return taskList; } /** 返回是否在running状态*/ public boolean isShutdown() { return runState != RUNNING; } /** * 返回线程池执行器是否调用shutdown或者shutdownnow */ public boolean isTerminating() { int state = runState; return state == SHUTDOWN || state == STOP; } public boolean isTerminated() { return runState == TERMINATED; } /** 等待结束当前线程池*/ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (;;) { //对于awaitNanos 的情况,一般需要循环!! if (runState == TERMINATED) return true; if (nanos <= 0) return false; nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } } protected void finalize() { shutdown(); } /** * 设置线程生产工厂 */ public void setThreadFactory(ThreadFactory threadFactory) { if (threadFactory == null) throw new NullPointerException(); this.threadFactory = threadFactory; } /** * 获取线程生产工厂 */ public ThreadFactory getThreadFactory() { return threadFactory; } /** * 设置任务丢弃策略 */ public void setRejectedExecutionHandler(RejectedExecutionHandler handler) { if (handler == null) throw new NullPointerException(); this.handler = handler; } /** * 获取任务丢弃策略 */ public RejectedExecutionHandler getRejectedExecutionHandler() { return handler; } /** * Sets the core number of threads. This overrides any value set * in the constructor. If the new value is smaller than the * current value, excess existing threads will be terminated when * they next become idle. If larger, new threads will, if needed, * be started to execute any queued tasks. * * @param corePoolSize the new core size * @throws IllegalArgumentException if <tt>corePoolSize</tt> * less than zero * @see #getCorePoolSize */ public void setCorePoolSize(int corePoolSize) { if (corePoolSize < 0) throw new IllegalArgumentException(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int extra = this.corePoolSize - corePoolSize; this.corePoolSize = corePoolSize; if (extra < 0) { int n = workQueue.size(); // don't add more threads than tasks while (extra++ < 0 && n-- > 0 && poolSize < corePoolSize) { Thread t = addThread(null); if (t != null) t.start(); else break; } } else if (extra > 0 && poolSize > corePoolSize) { try { Iterator<Worker> it = workers.iterator(); while (it.hasNext() && extra-- > 0 && poolSize > corePoolSize && workQueue.remainingCapacity() == 0) it.next().interruptIfIdle(); } catch (SecurityException ignore) { // Not an error; it is OK if the threads stay live } } } finally { mainLock.unlock(); } } /** * 获取核心线程的数量 */ public int getCorePoolSize() { return corePoolSize; } /** * 预先启动一个核心线程 */ public boolean prestartCoreThread() { return addIfUnderCorePoolSize(null); } /** * 预先启动所有的核心线程 */ public int prestartAllCoreThreads() { int n = 0; while (addIfUnderCorePoolSize(null)) ++n; return n; } public boolean allowsCoreThreadTimeOut() { return allowCoreThreadTimeOut; } public void allowCoreThreadTimeOut(boolean value) { if (value && keepAliveTime <= 0) throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); allowCoreThreadTimeOut = value; } /** * Sets the maximum allowed number of threads. This overrides any * value set in the constructor. If the new value is smaller than * the current value, excess existing threads will be * terminated when they next become idle. * * @param maximumPoolSize the new maximum * @throws IllegalArgumentException if the new maximum is * less than or equal to zero, or * less than the {@linkplain #getCorePoolSize core pool size} * @see #getMaximumPoolSize */ public void setMaximumPoolSize(int maximumPoolSize) { if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) throw new IllegalArgumentException(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int extra = this.maximumPoolSize - maximumPoolSize; this.maximumPoolSize = maximumPoolSize; if (extra > 0 && poolSize > maximumPoolSize) { try { Iterator<Worker> it = workers.iterator(); while (it.hasNext() && extra > 0 && poolSize > maximumPoolSize) { it.next().interruptIfIdle(); --extra; } } catch (SecurityException ignore) { } } } finally { mainLock.unlock(); } } /** * 返回最大的线程数 */ public int getMaximumPoolSize() { return maximumPoolSize; } public void setKeepAliveTime(long time, TimeUnit unit) { if (time < 0) throw new IllegalArgumentException(); if (time == 0 && allowsCoreThreadTimeOut()) throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); this.keepAliveTime = unit.toNanos(time); } /** * 返回线程的存活时间 */ public long getKeepAliveTime(TimeUnit unit) { return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS); } /* User-level queue utilities */ /** *返回任务队列。(调试用) * * @return the task queue */ public BlockingQueue<Runnable> getQueue() { return workQueue; } public boolean remove(Runnable task) { return getQueue().remove(task); } /** * Tries to remove from the work queue all {@link Future} * tasks that have been cancelled. This method can be useful as a * storage reclamation operation, that has no other impact on * functionality. Cancelled tasks are never executed, but may * accumulate in work queues until worker threads can actively * remove them. Invoking this method instead tries to remove them now. * However, this method may fail to remove tasks in * the presence of interference by other threads. */ public void purge() { // Fail if we encounter interference during traversal try { Iterator<Runnable> it = getQueue().iterator(); while (it.hasNext()) { Runnable r = it.next(); if (r instanceof Future<?>) { Future<?> c = (Future<?>)r; if (c.isCancelled()) it.remove(); } } } catch (ConcurrentModificationException ex) { return; } } /** * 返回池中当前的线程数量 */ public int getPoolSize() { return poolSize; } /** * 返回活动的线程数量 */ public int getActiveCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int n = 0; for (Worker w : workers) { if (w.isActive()) ++n; } return n; } finally { mainLock.unlock(); } } /** * 返回最大的poolsize值。 */ public int getLargestPoolSize() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { return largestPoolSize; } finally { mainLock.unlock(); } } /** * 返回所有提交过的任务数量(近似值) */ public long getTaskCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { long n = completedTaskCount; for (Worker w : workers) { n += w.completedTasks; //累加池中每个线程已经完成的任务数 if (w.isActive()) //如果线程是活跃的,表明其正在执行任务 ++n; } return n + workQueue.size();//累加上缓冲队列里的任务 } finally { mainLock.unlock(); } } /** * 返回所有已经执行完的任务的数量(近似值) */ public long getCompletedTaskCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { long n = completedTaskCount; for (Worker w : workers) n += w.completedTasks; return n; } finally { mainLock.unlock(); } } }
/* Extension hooks */ /** * 任务执行请执行的方法。子类可以覆盖此方法 */ protected void beforeExecute(Thread t, Runnable r) { } /** * 每个任务执行完毕执行的方法.子类可以覆盖此方法 */ protected void afterExecute(Runnable r, Throwable t) { } /** * Executor 关闭时,执行的方法。子类可以覆盖此方法 */ protected void terminated() { }
/** * 在当前的调用线程中执行此任务的策略 */ public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } } /** * 抛出异常的丢弃 */ public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException(); } } /** * 直接丢弃不做任何处理 */ public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } } /** * 用于被拒绝任务的处理程序,它放弃最旧的未处理请求,然后重试 execute;如果执行程序已关闭,则会丢弃该任务。 */ public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
