`
wbj0110
  • 浏览: 1610955 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

ThreadPoolExecutor源码分析

    博客分类:
  • Java
阅读更多

package java.util.concurrent;

import java.util.concurrent.locks.*;

import java.util.*;

 

 

public class ThreadPoolExecutor extends AbstractExecutorService {

 

 

    /**

     * runState provides the main lifecyle control, taking on values:

     *

     *   RUNNING:  Accept new tasks and process queued tasks

     *   SHUTDOWN: Don't accept new tasks, but process queued tasks

     *   STOP:     Don't accept new tasks, don't process queued tasks,

     *             and interrupt in-progress tasks

     *   TERMINATED: Same as STOP, plus all threads have terminated

     *

     * The numerical order among these values matters, to allow

     * ordered comparisons. The runState monotonically increases over

     * time, but need not hit each state. The transitions are:

     *

     * RUNNING -> SHUTDOWN

     *    On invocation of shutdown(), perhaps implicitly in finalize()

     * (RUNNING or SHUTDOWN) -> STOP

     *    On invocation of shutdownNow()

     * SHUTDOWN -> TERMINATED

     *    When both queue and pool are empty

     * STOP -> TERMINATED

     *    When pool is empty

     */

    volatile int runState; // 当前状态

    static final int RUNNING    = 0; // 运行状态

    static final int SHUTDOWN   = 1; // 关闭状态

    static final int STOP       = 2; // 强制关闭状态

    static final int TERMINATED = 3; // 结束状态

 

    // 任务队列

    private final BlockingQueue<Runnable> workQueue;

 

    /**

     * Lock held on updates to poolSize, corePoolSize,

     * maximumPoolSize, runState, and workers set.

     */

 

    // 同步锁

    private final ReentrantLock mainLock = new ReentrantLock();

 

    /**

     * Wait condition to support awaitTermination

     */

    // 线程通信

    private final Condition termination = mainLock.newCondition();

 

    /**

     * Set containing all worker threads in pool. Accessed only when

     * holding mainLock.

     */

 

    // Worker集合

    private final HashSet<Worker> workers = new HashSet<Worker>();

 

    private volatile long  keepAliveTime; // 线程池维护线程所允许的空闲时间

 

    /**

     * If false (default) core threads stay alive even when idle.  If

     * true, core threads use keepAliveTime to time out waiting for

     * work.

     */

 

    // 空闲的时候让线程等待keepAliveTime,timeout后使得poolSize能够降为0

    private volatile boolean allowCoreThreadTimeOut;

 

    /**

     * Core pool size, updated only while holding mainLock, but

     * volatile to allow concurrent readability even during updates.

     */

    private volatile int   corePoolSize; // 线程池维护线程的最少数量

 

    /**

     * Maximum pool size, updated only while holding mainLock but

     * volatile to allow concurrent readability even during updates.

     */

    private volatile int   maximumPoolSize; // 线程池维护线程的最大数量

 

    /**

     * Current pool size, updated only while holding mainLock but

     * volatile to allow concurrent readability even during updates.

     */

    private volatile int   poolSize; // 线程池当前线程数量

 

    /**

     * Handler called when saturated or shutdown in execute.

     */

    private volatile RejectedExecutionHandler handler; // 线程池对拒绝任务的处理策略

 

    private volatile ThreadFactory threadFactory; // 线程创建工厂

 

    /**

     * Tracks largest attained pool size.

     */

    private int largestPoolSize; // 工作线程的最大数量计数

 

    /**

     * Counter for completed tasks. Updated only on termination of

     * worker threads.

     */

    private long completedTaskCount; // 完成任务计数

 

    /**

     * The default rejected execution handler

     */

    private static final RejectedExecutionHandler defaultHandler =

        new AbortPolicy(); // 默认线程池拒绝任务的处理类

 

     

    /*

     * 1.核心线程(任务):我们定义的线程,即实现了Runnable接口的类,是我们将要放到线程池中执行的类.

     * 2.工作线程:由线程池中创建的线程,是用来获得核心线程并执行核心线程的线程(比较拗口哦,具体看代码就知道是什么东东了).

     */

 

    public ThreadPoolExecutor(int corePoolSize,

                              int maximumPoolSize,

                              long keepAliveTime,

                              TimeUnit unit,

                              BlockingQueue<Runnable> workQueue) {

        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,

             Executors.defaultThreadFactory(), defaultHandler);

    }

 

 

    public ThreadPoolExecutor(int corePoolSize,

                              int maximumPoolSize,

                              long keepAliveTime,

                              TimeUnit unit,

                              BlockingQueue<Runnable> workQueue,

                              ThreadFactory threadFactory) {

        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,

             threadFactory, defaultHandler);

    }

 

    public ThreadPoolExecutor(int corePoolSize,

                              int maximumPoolSize,

                              long keepAliveTime,

                              TimeUnit unit,

                              BlockingQueue<Runnable> workQueue,

                              RejectedExecutionHandler handler) {

        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,

             Executors.defaultThreadFactory(), handler);

    }

 

    public ThreadPoolExecutor(int corePoolSize,

                              int maximumPoolSize,

                              long keepAliveTime,

                              TimeUnit unit,

                              BlockingQueue<Runnable> workQueue,

                              ThreadFactory threadFactory,

                              RejectedExecutionHandler handler) {

        if (corePoolSize < 0 ||

            maximumPoolSize <= 0 ||

            maximumPoolSize < corePoolSize ||

            keepAliveTime < 0)

            throw new IllegalArgumentException();

        if (workQueue == null || threadFactory == null || handler == null)

            throw new NullPointerException();

        this.corePoolSize = corePoolSize;

        this.maximumPoolSize = maximumPoolSize;

        this.workQueue = workQueue;

        this.keepAliveTime = unit.toNanos(keepAliveTime);

        this.threadFactory = threadFactory;

        this.handler = handler;

    }

 

 

    /*

     * 核心和最大池大小

     * ThreadPoolExecutor 将根据 corePoolSize(参见 getCorePoolSize())和 maximumPoolSize(参见 getMaximumPoolSize())

     * 设置的边界自动调整池大小。当新任务在方法 execute(java.lang.Runnable) 中提交时,

     * 如果运行的线程少于 corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的。

     * 如果运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程。

     * 如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池。

     * 如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务。

     * 在大多数情况下,核心和最大池大小仅基于构造来设置,不过也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 进行动态更改。

     */

 

    public void execute(Runnable command) {

        if (command == null)

            throw new NullPointerException();

 

        // 如果工作线程数大于等于线程池最大线程数或者工作线程数量小于线程池内最小线程数

        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {

            // 如果为运行状态则只不在创建工作队列而只向工作队列添加任务

            if (runState == RUNNING && workQueue.offer(command)) {

                // 当状态为非运行或者第一个核心线程执行

                if (runState != RUNNING || poolSize == 0)

                    ensureQueuedTaskHandled(command);

            }

            else if (!addIfUnderMaximumPoolSize(command))

                reject(command); // is shutdown or saturated

        }

    }

 

 

    // 添加工作线程

    private Thread addThread(Runnable firstTask) {

        // 创建Worker(包装Runnable)

        Worker w = new Worker(firstTask);

        // 创建运行线程

        Thread t = threadFactory.newThread(w);

 

        // 向工作队列添加Worker并将当前线程池计数+1

        // 重新设置线程池最大计数

        if (t != null) {

            w.thread = t;

            workers.add(w);

            int nt = ++poolSize;

            if (nt > largestPoolSize)

                largestPoolSize = nt;

        }

        return t;

    }

 

    // 线程池中线程数量小于最小线程处理方法

    private boolean addIfUnderCorePoolSize(Runnable firstTask) {

        Thread t = null;

 

        // 获得锁

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

 

        try {

            // 当前池中线程数量小于最小线程数并且状态为运行

            if (poolSize < corePoolSize && runState == RUNNING)

                t = addThread(firstTask); // 创建新工作线程

        } finally {

            mainLock.unlock(); // 释放锁

        }

 

        // 如果线程对象不为空则运行

        if (t == null) return false;

        t.start();

        return true;

    }

 

 

    // 线程池中线程数量小于最大线程池线程数

    private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {

        Thread t = null;

        // 获得锁

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

 

        try {

            // 当前工作线程数小于最大线程池大小并且状态为运行

            if (poolSize < maximumPoolSize && runState == RUNNING)

                t = addThread(firstTask); // 创建新工作线程

        } finally {

            mainLock.unlock(); // 释放锁

        }

        // 如果线程对象不为空运行线程

        if (t == null) return false;

        t.start();

        return true;

    }

 

    // 当状态为非运行或者第一个核心线程执行

    private void ensureQueuedTaskHandled(Runnable command) {

        // 获得锁

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

 

        boolean reject = false;

        Thread t = null;

 

        try {

            // 在非运行状态向???

            int state = runState;

            if (state != RUNNING && workQueue.remove(command))

                reject = true; // ???

 

            // 如果状态为非强制关闭 并且

            // 当前工作线程大小小于最小线程池大小

            // 并且 任务队列为非空

            else if (state < STOP && poolSize < Math.max(corePoolSize, 1) && !workQueue.isEmpty())

                t = addThread(null); // 创建工作线程

        } finally {

            mainLock.unlock(); // 释放锁

        }

        if (reject) reject(command); // ???

        else if (t != null) t.start(); // 工作线程不为空,则执行

    }

 

    /**

     * Invokes the rejected execution handler for the given command.

     */

    // 用处理类处理溢出的任务

    void reject(Runnable command) {

        handler.rejectedExecution(command, this);

    }

 

 

    private final class Worker implements Runnable {

        /**

         * The runLock is acquired and released surrounding each task

         * execution. It mainly protects against interrupts that are

         * intended to cancel the worker thread from instead

         * interrupting the task being run.

         */

        private final ReentrantLock runLock = new ReentrantLock();

 

        /**

         * Initial task to run before entering run loop. Possibly null.

         */

        private Runnable firstTask; // 首任务

 

        /**

         * Per thread completed task counter; accumulated

         * into completedTaskCount upon termination.

         */

        volatile long completedTasks;

 

        /**

         * Thread this worker is running in.  Acts as a final field,

         * but cannot be set until thread is created.

         */

        Thread thread;

 

        Worker(Runnable firstTask) {

            this.firstTask = firstTask;

        }

 

        // 工作线程是否正在运行

        boolean isActive() {

            return runLock.isLocked();

        }

 

        /**

         * Interrupts thread if not running a task.

         */

        void interruptIfIdle() {

            final ReentrantLock runLock = this.runLock;

            // ? 注意这个条件,摆明的就是要等Worker中runTask()方法运行完后才成立。

            if (runLock.tryLock()) {

                try {

                    thread.interrupt();

                } finally {

                    runLock.unlock();

                }

            }

        }

 

        /**

         * Interrupts thread even if running a task.

         */

 

        // 中断当前工作线程

        void interruptNow() {

            thread.interrupt();

        }

 

        /**

         * Runs a single task between before/after methods.

         */

        private void runTask(Runnable task) {

 

            // 获取锁

            final ReentrantLock runLock = this.runLock;

            runLock.lock();

 

            try {

                /*

                 * Ensure that unless pool is stopping, this thread

                 * does not have its interrupt set. This requires a

                 * double-check of state in case the interrupt was

                 * cleared concurrently with a shutdownNow -- if so,

                 * the interrupt is re-enabled.

                 */

 

                // ???

                if (runState < STOP && Thread.interrupted() && runState >= STOP)

                    thread.interrupt();

 

                /*

                 * Track execution state to ensure that afterExecute

                 * is called only if task completed or threw

                 * exception. Otherwise, the caught runtime exception

                 * will have been thrown by afterExecute itself, in

                 * which case we don't want to call it again.

                 */

                boolean ran = false; // 是否执行

                beforeExecute(thread, task); // 当前任务执行前调用

                try

                    task.run(); // 执行任务

                    ran = true;

                    afterExecute(task, null); // 当前任务执行后调用

                    ++completedTasks; // 更新执行完成任务计数

                } catch (RuntimeException ex) {

                    if (!ran) // 保证alterExcute被执行

                        afterExecute(task, ex);

                    throw ex;

                }

            } finally {

                runLock.unlock(); // 释放锁

            }

        }

 

        /**

         * Main run loop

         */

        public void run() {

            try {

                Runnable task = firstTask;

                firstTask = null;

 

                // 如果任务为空则尝试获取下一个任务

                while (task != null || (task = getTask()) != null) {

                    runTask(task); // 运行任务

                    task = null;

                }

            } finally {

                workerDone(this); // 如果当前线程池大小为0则尝关闭

            }

        }

    }

 

 

     // 从队列中获取工作任务

    Runnable getTask() {

        for (;;) {

            try {

                int state = runState; // 获取运行状态

                if (state > SHUTDOWN) // 如果正在关闭

                    return null; // 不再获取任务

 

                Runnable r;

                if (state == SHUTDOWN)  // SHUTDOWN状态,需要把剩下的任务执行完成

                    r = workQueue.poll(); // 取任务

                else if (poolSize > corePoolSize || allowCoreThreadTimeOut) // 池中线程数目大于核心池数目,任务较多. 或者允许核心线程池获取任务超时

                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); // 快速获取任务(过期时间纳秒),过期返回null

                else

                    r = workQueue.take(); // 任务较少使用阻塞方法取任务

 

                if (r != null) return r; // 返回任务

 

                if (workerCanExit()) {

                    // 如果整个线程池状态变为SHUTDOWN和TERMINATED之间,那么中断所有Worker(interrupt)

                    if (runState >= SHUTDOWN) // Wake up others

                        interruptIdleWorkers();

                    return null;

                }

                // Else retry

            } catch (InterruptedException ie) {

                // 获取中断异常并忽略

                // On interruption, re-check runState

            }

        }

    }

 

    // Worker从工作队列中去不到任务时调用此方法判断是否跳出循环并结束Worker的运行

    private boolean workerCanExit() {

        // 获取并加锁

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

 

        boolean canExit; // 是否可以关闭

        try {

            // 运行状态是否为STOP和SHUTDOWN 或者任务队列为空

            // 或者允许线程池线程空闲超时并且当前池大于核心池

            canExit = runState >= STOP || workQueue.isEmpty() ||

                (allowCoreThreadTimeOut && poolSize > Math.max(1, corePoolSize));

        } finally {

            mainLock.unlock(); // 总是释放锁

        }

        return canExit; // 返回是否能关闭(boolean)

    }

 

    // 中断工作队列中所有线程

    void interruptIdleWorkers() {

        // 获取锁并加锁

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

 

        // 循环工作队列中所有Worker并执行中断

        try {

            for (Worker w : workers)

                w.interruptIfIdle();

        } finally {

            mainLock.unlock(); // 释放锁

        }

    }

 

    // Worker执行完成

    void workerDone(Worker w) {

        // 获取并加锁

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

 

        try {

            // 累加完成任务总计

            completedTaskCount += w.completedTasks;

            workers.remove(w); // 从工作队列中移出当前Worker

 

            // 如果当前线程池大小等于0时尝试终止

            if (--poolSize == 0) tryTerminate();

        } finally {

            mainLock.unlock(); // 释放锁

        }

    }

 

    private void tryTerminate() {

        if (poolSize == 0) { // 如果当前线程数为0

            int state = runState;

            // 如果运行状态小于STOP 并且工作队列不为空

            if (state < STOP && !workQueue.isEmpty()) {

                // 运行状态更改为RUNNING

                state = RUNNING; // disable termination check below

                // 向工作队列添加新的Worker并执行

                Thread t = addThread(null);

                if (t != null) t.start();

            }

            if (state == STOP || state == SHUTDOWN) {

                // 如果运行状态为STOP或者SHUTDOWN时

                // 更改状态为TERMINATED并通知等待线程,和结束运行

                runState = TERMINATED;

                termination.signalAll();

                terminated();

            }

        }

    }

 

 

    public void shutdown() {

 

 

    SecurityManager security = System.getSecurityManager();

    if (security != null)

            security.checkPermission(shutdownPerm);

 

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try {

            if (security != null) { // Check if caller can modify our threads

                for (Worker w : workers)

                    security.checkAccess(w.thread);

            }

 

            int state = runState;

            if (state < SHUTDOWN)

                runState = SHUTDOWN;

 

            try {

                for (Worker w : workers) {

                    w.interruptIfIdle();

                }

            } catch (SecurityException se) { // Try to back out

                runState = state;

                // tryTerminate() here would be a no-op

                throw se;

            }

 

            tryTerminate(); // Terminate now if pool and queue empty

        } finally {

            mainLock.unlock();

        }

    }

 

 

    public List<Runnable> shutdownNow() {

        /*

         * shutdownNow differs from shutdown only in that

         * 1. runState is set to STOP,

         * 2. all worker threads are interrupted, not just the idle ones, and

         * 3. the queue is drained and returned.

         */

    SecurityManager security = System.getSecurityManager();

    if (security != null)

            security.checkPermission(shutdownPerm);

 

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try {

            if (security != null) { // Check if caller can modify our threads

                for (Worker w : workers)

                    security.checkAccess(w.thread);

            }

 

            int state = runState;

            if (state < STOP)

                runState = STOP;

 

            try {

                for (Worker w : workers) {

                    w.interruptNow();

                }

            } catch (SecurityException se) { // Try to back out

                runState = state;

                // tryTerminate() here would be a no-op

                throw se;

            }

 

            List<Runnable> tasks = drainQueue();

            tryTerminate(); // Terminate now if pool and queue empty

            return tasks;

        } finally {

            mainLock.unlock();

        }

    }

 

    private List<Runnable> drainQueue() {

        List<Runnable> taskList = new ArrayList<Runnable>();

        workQueue.drainTo(taskList);

        /*

         * If the queue is a DelayQueue or any other kind of queue

         * for which poll or drainTo may fail to remove some elements,

         * we need to manually traverse and remove remaining tasks.

         * To guarantee atomicity wrt other threads using this queue,

         * we need to create a new iterator for each element removed.

         */

        while (!workQueue.isEmpty()) {

            Iterator<Runnable> it = workQueue.iterator();

            try {

                if (it.hasNext()) {

                    Runnable r = it.next();

                    if (workQueue.remove(r))

                        taskList.add(r);

                }

            } catch (ConcurrentModificationException ignore) {

            }

        }

        return taskList;

    }

 

    public boolean isShutdown() {

        return runState != RUNNING;

    }

 

    // 判断是否处在关闭但尚未完成的过程中

    public boolean isTerminating() {

        int state = runState;

        return state == SHUTDOWN || state == STOP;

    }

 

    // 判断是否已关闭并且所有任务都完成

    public boolean isTerminated() {

        return runState == TERMINATED;

    }

 

    // 请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,直到所有任务完成执行.

    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {

        long nanos = unit.toNanos(timeout);

 

        // 获取锁

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

 

        try {

            for (;;) {

                // 如果已经关闭返回true

                if (runState == TERMINATED) return true;

                // 等待超时返回false

                if (nanos <= 0) return false;

                // 释放锁并挂起等待关闭方法的通知

                nanos = termination.awaitNanos(nanos);

            }

        } finally {

            mainLock.unlock(); // 释放锁

        }

    }

 

    protected void finalize()  {

        shutdown();

    }

 

    public void setThreadFactory(ThreadFactory threadFactory) {

        if (threadFactory == null)

            throw new NullPointerException();

        this.threadFactory = threadFactory;

    }

 

    public ThreadFactory getThreadFactory() {

        return threadFactory;

    }

 

    public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {

        if (handler == null)

            throw new NullPointerException();

        this.handler = handler;

    }

 

    public RejectedExecutionHandler getRejectedExecutionHandler() {

        return handler;

    }

 

    public void setCorePoolSize(int corePoolSize) {

        if (corePoolSize < 0)

            throw new IllegalArgumentException();

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try {

            int extra = this.corePoolSize - corePoolSize;

            this.corePoolSize = corePoolSize;

            if (extra < 0) {

                int n = workQueue.size(); // don't add more threads than tasks

                while (extra++ < 0 && n-- > 0 && poolSize < corePoolSize) {

                    Thread t = addThread(null);

                    if (t != null)

                        t.start();

                    else

                        break;

                }

            }

            else if (extra > 0 && poolSize > corePoolSize) {

                try {

                    Iterator<Worker> it = workers.iterator();

                    while (it.hasNext() &&

                           extra-- > 0 &&

                           poolSize > corePoolSize &&

                           workQueue.remainingCapacity() == 0)

                        it.next().interruptIfIdle();

                } catch (SecurityException ignore) {

                    // Not an error; it is OK if the threads stay live

                }

            }

        } finally {

            mainLock.unlock();

        }

    }

 

    public int getCorePoolSize() {

        return corePoolSize;

    }

 

    public boolean prestartCoreThread() {

        return addIfUnderCorePoolSize(null);

    }

 

    public int prestartAllCoreThreads() {

        int n = 0;

        while (addIfUnderCorePoolSize(null))

            ++n;

        return n;

    }

 

    public boolean allowsCoreThreadTimeOut() {

        return allowCoreThreadTimeOut;

    }

 

 

    public void allowCoreThreadTimeOut(boolean value) {

        if (value && keepAliveTime <= 0)

            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");

 

        allowCoreThreadTimeOut = value;

    }

 

 

    public void setMaximumPoolSize(int maximumPoolSize) {

        if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)

            throw new IllegalArgumentException();

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try {

            int extra = this.maximumPoolSize - maximumPoolSize;

            this.maximumPoolSize = maximumPoolSize;

            if (extra > 0 && poolSize > maximumPoolSize) {

                try {

                    Iterator<Worker> it = workers.iterator();

                    while (it.hasNext() &&

                           extra > 0 &&

                           poolSize > maximumPoolSize) {

                        it.next().interruptIfIdle();

                        --extra;

                    }

                } catch (SecurityException ignore) {

                    // Not an error; it is OK if the threads stay live

                }

            }

        } finally {

            mainLock.unlock();

        }

    }

 

    public int getMaximumPoolSize() {

        return maximumPoolSize;

    }

 

 

    public void setKeepAliveTime(long time, TimeUnit unit) {

        if (time < 0)

            throw new IllegalArgumentException();

        if (time == 0 && allowsCoreThreadTimeOut())

            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");

        this.keepAliveTime = unit.toNanos(time);

    }

 

    public long getKeepAliveTime(TimeUnit unit) {

        return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);

    }

 

 

    // 返回执行程序的任务队列

    public BlockingQueue<Runnable> getQueue() {

        return workQueue;

    }

 

 

    // 从执行程序的内部队列中移除此任务(如果存在),从而如果尚未开始,则其不再运行。

    public boolean remove(Runnable task) {

        return getQueue().remove(task);

    }

 

 

    public void purge() {

        // Fail if we encounter interference during traversal

        try {

            Iterator<Runnable> it = getQueue().iterator();

            while (it.hasNext()) {

                Runnable r = it.next();

                if (r instanceof Future<?>) {

                    Future<?> c = (Future<?>)r;

                    if (c.isCancelled())

                        it.remove();

                }

            }

        }

        catch (ConcurrentModificationException ex) {

            return;

        }

    }

 

 

    // 返回当前线程数

    public int getPoolSize() {

        return poolSize;

    }

 

 

    // 返回主动执行任务的近似线程数

    public int getActiveCount() {

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try {

            int n = 0;

            for (Worker w : workers) {

                if (w.isActive())

                    ++n;

            }

            return n;

        } finally {

            mainLock.unlock();

        }

    }

 

 

    // 返回曾经同时位于池中的最大线程数。

    public int getLargestPoolSize() {

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try {

            return largestPoolSize;

        } finally {

            mainLock.unlock();

        }

    }

 

 

    // 返回曾计划执行的近似任务总数

    public long getTaskCount() {

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try {

            long n = completedTaskCount;

            for (Worker w : workers) {

                n += w.completedTasks;

                if (w.isActive())

                    ++n;

            }

            return n + workQueue.size();

        } finally {

            mainLock.unlock();

        }

    }

 

 

    // 返回已完成执行的近似任务总数

    public long getCompletedTaskCount() {

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try {

            long n = completedTaskCount;

            for (Worker w : workers)

                n += w.completedTasks;

            return n;

        } finally {

            mainLock.unlock();

        }

    }

 

    /* Extension hooks */

 

 

 

    // 线程执行之前执行(需用户自行实现)

    protected void beforeExecute(Thread t, Runnable r) { }

 

    // 线程执行之后执行(需用户自行实现)

    protected void afterExecute(Runnable r, Throwable t) { }

 

    protected void terminated() { }

 

    public static class CallerRunsPolicy implements RejectedExecutionHandler {

        public CallerRunsPolicy() { }

 

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

            if (!e.isShutdown()) {

                r.run();

            }

        }

    }

 

 

    // 默认线程池拒绝任务的处理类

    public static class AbortPolicy implements RejectedExecutionHandler {

        public AbortPolicy() { }

 

        /**

         * Always throws RejectedExecutionException.

         * @param r the runnable task requested to be executed

         * @param e the executor attempting to execute this task

         * @throws RejectedExecutionException always.

         */

        // 处理方法(抛出异常)

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

            throw new RejectedExecutionException();

        }

    }

 

    public static class DiscardPolicy implements RejectedExecutionHandler {

        /**

         * Creates a <tt>DiscardPolicy</tt>.

         */

        public DiscardPolicy() { }

 

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

        }

    }

 

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {

 

        public DiscardOldestPolicy() { }

 

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

            if (!e.isShutdown()) {

                e.getQueue().poll();

                e.execute(r);

            }

        }

    }

}         

分享到:
评论

相关推荐

    JDK之ThreadPoolExecutor源码分析1

    《JDK之ThreadPoolExecutor源码分析1》 在Java编程中,线程池是一种高效的管理线程的方式,它通过复用已存在的线程来避免频繁创建和销毁线程带来的开销。ThreadPoolExecutor作为Java中的线程池实现,其内部机制相当...

    ThreadPoolExecutor源码解析.md

    ThreadPoolExecutor源码解析.md

    线程池ThreadPoolExecutor底层原理源码分析

    线程池ThreadPoolExecutor底层原理源码分析

    线程池ThreadPoolExecutor原理源码分析.md

    ### 线程池 `ThreadPoolExecutor` 原理源码分析 #### 一、概述 线程池作为 Java 并发编程中的重要组件,在实际应用中被广泛使用。其核心类 `ThreadPoolExecutor` 实现了对线程的管理、调度等功能。本文将围绕 `...

    12-线程池ThreadPoolExecutor底层原理源码分析(下)-周瑜.pdf

    ### 二、线程池的五种状态变换源码分析 线程池状态的变化主要由以下几个状态表示: - **RUNNING**:线程池接受新的任务并将队列中的任务提交给worker线程。 - **SHUTDOWN**:不再接受新任务,但继续处理队列中的...

    线程池源码分析

    线程池ThreadPoolExecutor的源码分析,含中文注释,深入了解线程池的构造

    11-线程池ThreadPoolExecutor底层原理源码分析(上)-周瑜.pdf

    根据提供的文件信息,我们可以深入探讨线程池`ThreadPoolExecutor`的工作原理及其实现细节,同时也会涉及并发编程中的一些关键概念和技术。 ### 线程池`ThreadPoolExecutor`概述 `ThreadPoolExecutor`是Java中非常...

    JUC并发编程与源码分析视频课.zip

    《JUC并发编程与源码分析视频课》是一门深入探讨Java并发编程的课程,主要聚焦于Java Util Concurrency(JUC)库的使用和源码解析。JUC是Java平台提供的一组高级并发工具包,它极大地简化了多线程编程,并提供了更...

    framework-analysis:对使用Hibernate出现的问题进行分析;Spring、MyBatis、AQS、ThreadPoolExecutor、CountDownLatch核心源码分析

    于是乎到现在的Hibernate、MyBatis、Spring、Spring MVC、AQS、ThreadPoolExecutor、CountDownLatch使用场景和核心源码分析。 感觉自己还是真的菜鸡,有太多框架的底层实现都不怎么了解。 当山头被一座一座攻克时,...

    jdk1.8-source:JDK1.8源码分析包

    JDK1.8源码分析 相关的原始码分析结果会以注解的形式体现到原始码中 已完成部分: ReentrantLock CountDownLatch Semaphore HashMap TreeMap LinkedHashMap ConcurrentHashMap 执行器 ...

    Android 上百实例源码分析以及开源分析 集合打包下

    这个压缩包"Android 上百实例源码分析以及开源分析 集合打包下"提供了丰富的资源,旨在帮助开发者们通过实例学习和剖析Android应用的实现细节。下面将详细讨论这个资源包中的主要知识点。 1. **Android基础知识**:...

    java并发源码分析之实战编程

    "java并发源码分析之实战编程"这个主题深入探讨了Java平台上的并发处理机制,旨在帮助开发者理解并有效地利用这些机制来提高程序性能和可扩展性。在这个专题中,我们将围绕Java并发库、线程管理、锁机制、并发容器...

    java线程池的源码分析.zip

    本文将深入探讨Java线程池的源码分析,并对比不同类型的线程池,以帮助开发者更好地理解和利用这一强大的工具。 首先,我们要理解Java线程池的核心类`java.util.concurrent.ThreadPoolExecutor`,它是所有自定义...

    designpattern.zip

    11-线程池 ThreadPoolExecutor 底层原理源码分析(上)-周瑜.pdf 12-线程池 ThreadPoolExecutor底层原理源码分析(下)-周瑜.pdf 13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、...

    forkjoin.zip

    11-线程池 ThreadPoolExecutor 底层原理源码分析(上)-周瑜.pdf 12-线程池 ThreadPoolExecutor底层原理源码分析(下)-周瑜.pdf 13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、...

    disruptor.zip

    11-线程池 ThreadPoolExecutor 底层原理源码分析(上)-周瑜.pdf 12-线程池 ThreadPoolExecutor底层原理源码分析(下)-周瑜.pdf 13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、...

    jmm(1).zip

    11-线程池 ThreadPoolExecutor 底层原理源码分析(上)-周瑜.pdf 12-线程池 ThreadPoolExecutor底层原理源码分析(下)-周瑜.pdf 13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、...

    使用线程池ThreadPoolExecutor 抓取论坛帖子列表

    本文将详细讲解如何使用Java中的`ThreadPoolExecutor`来抓取论坛帖子列表,结合源码分析和实用工具的应用。 首先,我们要了解线程池的基本原理。线程池是由一组预先创建的线程组成的,这些线程可以复用,而不是每次...

    Java并发包源码分析(JDK1.8)

    Java并发包源码分析(JDK1.8):囊括了java.util.concurrent包中大部分类的源码分析,其中涉及automic包,locks包(AbstractQueuedSynchronizer、ReentrantLock、ReentrantReadWriteLock、LockSupport等),queue...

    深入理解高并发编程-核心技术原理

    书中还特别提及了线程池的创建和管理,以及通过ThreadPoolExecutor类的源码分析,揭示线程池的工作原理。 在**基础案例篇**,作者通过实际问题,如并发编程中的常见问题和挑战,引导读者理解并发编程中的可见性和...

Global site tag (gtag.js) - Google Analytics