Wikipedia's definition of a thread pool: In computer programming, a thread pool is a software design pattern for achieving concurrency of execution in a computer program. Often also called a replicated workers or worker-crew model, a thread pool maintains multiple threads waiting for tasks to be allocated for concurrent execution by the supervising program. By maintaining a pool of threads, the model increases performance and avoids latency in execution due to frequent creation and destruction of threads for short-lived tasks. The number of available threads is tuned to the computing resources available to the program, such as parallel processors, cores, memory, and network sockets.

In my own words: a thread pool is a software design pattern for concurrent execution. Also known as the "replicated workers" or "worker-crew" model, it keeps a number of threads waiting for the supervising program to hand them tasks. Because the pool holds on to its threads, short-lived tasks no longer pay for creating and destroying a thread each time, which improves performance.


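  • Threads are relatively expensive resources; a pool avoids creating and destroying them over and over, and callers no longer have to manage thread creation and destruction themselves
  • Submitted tasks can be picked up promptly, which improves response time
  • Threads become easier to monitor and manage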



ExecutorService executorService = Executors.newFixedThreadPool(5);   
executorService.submit(()-> System.out.println("执行线程"));           
executorService.submit(()-> System.out.println("执行线程2"));          
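The snippet above never shuts the pool down, and with the default thread factory the worker threads are non-daemon, so a real program would keep running after main returns. The following is a minimal sketch of the same usage with a result and an orderly shutdown; the class name SubmitDemo and the method computeOnPool are made up for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitDemo {
    // Submits one Callable to a fixed pool and returns its result.
    static int computeOnPool() {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        try {
            // An expression lambda that yields a value is treated as a Callable.
            Future<Integer> future = pool.submit(() -> 6 * 7);
            return future.get(); // blocks until a worker has run the task
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown(); // stop accepting tasks so the workers can exit
        }
    }

    public static void main(String[] args) {
        System.out.println(computeOnPool()); // prints 42
    }
}
```

submit hands back a Future, so the caller can wait for the result; shutdown lets already submitted tasks finish before the threads exit.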




ExecutorService executorService = Executors.newFixedThreadPool(5);   

The implementation of Executors.newFixedThreadPool(5) looks like this:

/**
 * Creates a thread pool that reuses a fixed number of threads
 * operating off a shared unbounded queue.  At any point, at most
 * {@code nThreads} threads will be active processing tasks.
 * If additional tasks are submitted when all threads are active,
 * they will wait in the queue until a thread is available.
 * If any thread terminates due to a failure during execution
 * prior to shutdown, a new one will take its place if needed to
 * execute subsequent tasks.  The threads in the pool will exist
 * until it is explicitly {@link ExecutorService#shutdown shutdown}.
 * @param nThreads the number of threads in the pool
 * @return the newly created thread pool
 * @throws IllegalArgumentException if {@code nThreads <= 0}
 */
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>());
}



    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default thread factory and rejected execution handler.
     * It may be more convenient to use one of the {@link Executors} factory
     * methods instead of this general purpose constructor.
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }


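  • corePoolSize: the number of threads kept in the pool (from here on, "the pool" means the thread pool) even when they are idle, unless allowCoreThreadTimeOut is set, in which case idle core threads are also terminated after keepAliveTime;
  • maximumPoolSize: the maximum number of threads the pool may hold;
  • keepAliveTime: when the pool has more than corePoolSize threads, the longest an excess idle thread waits for a new task before it is terminated;
  • unit: the time unit of keepAliveTime;
  • workQueue (important): the queue holding tasks that are waiting to be executed; it only ever contains the Runnable tasks handed to execute;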


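newFixedThreadPool passes keepAliveTime = 0L, so an excess idle thread would not wait at all and would be terminated immediately. Since corePoolSize equals maximumPoolSize here, the pool never has excess threads, so as long as these settings are left alone the value does not come into play.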
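To see these parameters on a live object, the sketch below inspects the pool that the factory returns. It assumes the returned ExecutorService can be cast to ThreadPoolExecutor, which matches the source quoted above but is not promised by the method's declared return type; FixedPoolInspection and describe are illustrative names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class FixedPoolInspection {
    // Reports the configuration of a freshly created fixed pool.
    static String describe(int nThreads) {
        ExecutorService service = Executors.newFixedThreadPool(nThreads);
        try {
            // Assumption: the factory returns a plain ThreadPoolExecutor.
            ThreadPoolExecutor pool = (ThreadPoolExecutor) service;
            return "core=" + pool.getCorePoolSize()
                 + " max=" + pool.getMaximumPoolSize()
                 + " keepAliveMs=" + pool.getKeepAliveTime(TimeUnit.MILLISECONDS)
                 + " unboundedQueue=" + (pool.getQueue() instanceof LinkedBlockingQueue)
                 + " live=" + pool.getPoolSize();
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints "core=5 max=5 keepAliveMs=0 unboundedQueue=true live=0"
        System.out.println(describe(5));
    }
}
```

Note live=0: the constructor only records the settings, and threads are created later, as tasks arrive.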


this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);



public static ThreadFactory defaultThreadFactory() {
    return new DefaultThreadFactory();
}
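The thread factory is simply the object the pool asks for a new Thread whenever it needs a worker, so it can be swapped for one that, say, gives the threads recognisable names. A minimal sketch, using the two-argument Executors.newFixedThreadPool overload; the factory named(...) and the "report" prefix are invented for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class NamedThreadFactoryDemo {
    // Hypothetical factory: names threads "<prefix>-1", "<prefix>-2", ...
    static ThreadFactory named(String prefix) {
        AtomicInteger counter = new AtomicInteger(1);
        return runnable -> new Thread(runnable, prefix + "-" + counter.getAndIncrement());
    }

    // Runs one task and reports the name of the worker that executed it.
    static String firstWorkerName() {
        ExecutorService pool = Executors.newFixedThreadPool(2, named("report"));
        try {
            Callable<String> whoAmI = () -> Thread.currentThread().getName();
            return pool.submit(whoAmI).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstWorkerName()); // prints "report-1"
    }
}
```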

defaultHandler is defined as follows. As its type suggests, it is the rejection policy (a rather blunt name; it is explained later):

/**
 * The default rejected execution handler
 */
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();


    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }





executorService.submit(() -> System.out.println("执行线程"));
executorService.submit(() -> System.out.println("执行线程2"));



public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}


protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

Internally this simply creates a FutureTask (value is the result the Future returns once the task has completed). Next, the execute(ftask) call:

/**
 * Executes the given task sometime in the future.  The task
 * may execute in a new thread or in an existing pooled thread.
 * If the task cannot be submitted for execution, either because this
 * executor has been shutdown or because its capacity has been reached,
 * the task is handled by the current {@code RejectedExecutionHandler}.
 * @param command the task to execute
 * @throws RejectedExecutionException at discretion of
 *         {@code RejectedExecutionHandler}, if the task
 *         cannot be accepted for execution
 * @throws NullPointerException if {@code command} is null
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}


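  • If fewer than corePoolSize threads are running, try to start a new thread with the current task as its first task. addWorker checks runState and workerCount atomically and returns false when a thread should not be added;
  • If the task is queued successfully, we still need to double-check whether a thread should have been added (existing workers may have died since the last check) or whether the pool was shut down in the meantime. So the state is rechecked: if the pool has stopped, the task is taken back out of the queue and rejected; if there are no threads left, a new one is started;
  • If the task cannot be queued, try to add a new thread. If that fails, the pool is either shut down or saturated, so the task is rejected;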
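The three steps can be observed from the outside with a deliberately small pool. The sketch below assumes core size 1, maximum size 2 and a queue with a single slot (values chosen only for the demo, as are the names ExecuteStepsDemo and trace) and submits four tasks that block until released.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteStepsDemo {
    // Submits four blocking tasks and records what the pool did with each.
    static String trace() {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the demo is over
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        // Demo values: core = 1, max = 2, queue capacity = 1
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1));
        StringBuilder out = new StringBuilder();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    out.append("task").append(i)
                       .append(": threads=").append(pool.getPoolSize())
                       .append(" queued=").append(pool.getQueue().size())
                       .append('\n');
                } catch (RejectedExecutionException e) {
                    out.append("task").append(i).append(": rejected\n");
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.print(trace());
        // task1: threads=1 queued=0   (step 1: new core thread)
        // task2: threads=1 queued=1   (step 2: queued)
        // task3: threads=2 queued=1   (step 3: queue full, extra thread)
        // task4: rejected             (step 3: saturated, default AbortPolicy)
    }
}
```

The point worth noticing is the order: the queue is tried before the pool grows beyond corePoolSize, so with an unbounded queue such as the one used by newFixedThreadPool the third step never adds a thread.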
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }


int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
        return;
    c = ctl.get();
}

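If the worker count is below corePoolSize, a worker thread is created. (Extracting the current count is a little convoluted: right after construction, workerCountOf(c) = ((-1 << (Integer.SIZE - 3)) | 0) & ((1 << (Integer.SIZE - 3)) - 1) = 0, i.e. the pool starts with zero threads.) How the worker is created is left for later because the implementation is fairly involved; if creation succeeds, execute returns normally.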
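The packing of run state and worker count into one int is easier to follow with concrete numbers. The sketch below copies the constants and the three helper methods quoted above into a standalone class (CtlPackingDemo is an illustrative name, not part of the JDK) so the arithmetic can be run directly.

```java
class CtlPackingDemo {
    // Copied from the ThreadPoolExecutor excerpt above, for illustration only.
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29 bits for the worker count
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // low 29 bits set
    static final int RUNNING    = -1 << COUNT_BITS;       // high 3 bits = 111
    static final int SHUTDOWN   =  0 << COUNT_BITS;       // high 3 bits = 000
    static final int STOP       =  1 << COUNT_BITS;       // high 3 bits = 001

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        int initial = ctlOf(RUNNING, 0);
        System.out.println(Integer.toBinaryString(initial)); // 11100000000000000000000000000000
        System.out.println(workerCountOf(initial));          // 0

        int busy = ctlOf(RUNNING, 3);                        // same state, three workers
        System.out.println(workerCountOf(busy));             // 3
        System.out.println(runStateOf(busy) == RUNNING);     // true
        System.out.println(CAPACITY);                        // 536870911
    }
}
```

Because RUNNING is negative and the other states are zero or positive, comparisons such as rs >= SHUTDOWN or isRunning(c) reduce to plain integer comparisons on the packed value.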

if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}


else if (!addWorker(command, false))
    reject(command);



    /**
     * Checks if a new worker can be added with respect to current
     * pool state and the given bound (either core or maximum). If so,
     * the worker count is adjusted accordingly, and, if possible, a
     * new worker is created and started, running firstTask as its
     * first task. This method returns false if the pool is stopped or
     * eligible to shut down. It also returns false if the thread
     * factory fails to create a thread when asked.  If the thread
     * creation fails, either due to the thread factory returning
     * null, or due to an exception (typically OutOfMemoryError in
     * Thread.start()), we roll back cleanly.
     * @param firstTask the task the new thread should run first (or
     * null if none). Workers are created with an initial first task
     * (in method execute()) to bypass queuing when there are fewer
     * than corePoolSize threads (in which case we always start one),
     * or when the queue is full (in which case we must bypass queue).
     * Initially idle threads are usually created via
     * prestartCoreThread or to replace other dying workers.
     * @param core if true use corePoolSize as bound, else
     * maximumPoolSize. (A boolean indicator is used here rather than a
     * value to ensure reads of fresh values after checking other pool
     * state).
     * @return true if successful
     */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false; // whether the worker thread has been started
    boolean workerAdded = false;   // whether the worker has been added to the workers set
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}


    /**
     * Rolls back the worker thread creation.
     * - removes worker from workers, if present
     * - decrements worker count
     * - rechecks for termination, in case the existence of this
     *   worker was holding up termination
     */
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            workers.remove(w);
        decrementWorkerCount();
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}


Recall that addWorker creates a worker with new Worker(firstTask) and then calls start() on the Thread it wraps.



    /**
     * Class Worker mainly maintains interrupt control state for
     * threads running tasks, along with other minor bookkeeping.
     * This class opportunistically extends AbstractQueuedSynchronizer
     * to simplify acquiring and releasing a lock surrounding each
     * task execution.  This protects against interrupts that are
     * intended to wake up a worker thread waiting for a task from
     * instead interrupting a task being run.  We implement a simple
     * non-reentrant mutual exclusion lock rather than use
     * ReentrantLock because we do not want worker tasks to be able to
     * reacquire the lock when they invoke pool control methods like
     * setCorePoolSize.  Additionally, to suppress interrupts until
     * the thread actually starts running tasks, we initialize lock
     * state to a negative value, and clear it upon start (in
     * runWorker).
     */
    private final class Worker extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }


Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}



if (workerAdded) {
    t.start();
    workerStarted = true;
}


/** Delegates main run loop to outer runWorker  */
public void run() {
    runWorker(this);
}


final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
}


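while (task != null || (task = getTask()) != null) {
    w.lock();
    try {
        beforeExecute(wt, task);
        Throwable thrown = null;
        try {
            task.run();
        } finally {
            afterExecute(task, thrown);
        }
    } finally {
        task = null;
        w.completedTasks++;
        w.unlock();
    }
}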

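The code above shows that on first entry the worker normally carries a task (although workers can also be created without one), so it can call the task's run method right away; once run returns, the worker's completed-task counter is incremented. The key point comes next: the thread keeps looping, and whenever task == null it calls getTask to pull the next task from the queue and then runs that one.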
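Stripped of locking, hooks and state checks, the loop has a very small core. The following is a simplified sketch of that shape and not the JDK implementation: TinyWorkerLoop and all of its members are made-up names, and an interrupt while idle stands in for getTask returning null.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class TinyWorkerLoop {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
    private final AtomicInteger completed = new AtomicInteger();

    // Starts one worker that runs firstTask (if any) and then keeps taking from the queue.
    Thread startWorker(Runnable firstTask) {
        Thread t = new Thread(() -> {
            Runnable task = firstTask;
            try {
                // Same shape as runWorker: use the task in hand, else block for the next one.
                while (task != null || (task = queue.take()) != null) {
                    try {
                        task.run();
                    } finally {
                        task = null;                 // forces the next iteration to hit the queue
                        completed.incrementAndGet(); // counterpart of w.completedTasks++
                    }
                }
            } catch (InterruptedException e) {
                // Interrupted while idle: this sketch treats it as the signal to exit.
            }
        });
        t.start();
        return t;
    }

    void enqueue(Runnable r) { queue.add(r); }

    int completedCount() { return completed.get(); }

    // Runs one first task plus two queued tasks on a single worker.
    static int demo() {
        TinyWorkerLoop loop = new TinyWorkerLoop();
        CountDownLatch done = new CountDownLatch(3);
        Thread worker = loop.startWorker(done::countDown);
        loop.enqueue(done::countDown);
        loop.enqueue(done::countDown);
        try {
            done.await();       // all three tasks have run
            worker.interrupt(); // wake the idle worker so it exits
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return loop.completedCount();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 3
    }
}
```

One thread ends up executing three tasks, which is the whole point of pooling: the thread outlives each task and goes back to waiting on the queue.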


     * Main worker run loop.  Repeatedly gets tasks from queue and
     * executes them, while coping with a number of issues:
     * 1. We may start out with an initial task, in which case we
     * don't need to get the first one. Otherwise, as long as pool is
     * running, we get tasks from getTask. If it returns null then the
     * worker exits due to changed pool state or configuration
     * parameters.  Other exits result from exception throws in
     * external code, in which case completedAbruptly holds, which
     * usually leads processWorkerExit to replace this thread.
     * 2. Before running any task, the lock is acquired to prevent
     * other pool interrupts while the task is executing, and then we
     * ensure that unless pool is stopping, this thread does not have
     * its interrupt set.
     * 3. Each task run is preceded by a call to beforeExecute, which
     * might throw an exception, in which case we cause thread to die
     * (breaking loop with completedAbruptly true) without processing
     * the task.
     * 4. Assuming beforeExecute completes normally, we run the task,
     * gathering any of its thrown exceptions to send to afterExecute.
     * We separately handle RuntimeException, Error (both of which the
     * specs guarantee that we trap) and arbitrary Throwables.
     * Because we cannot rethrow Throwables within Runnable.run, we
     * wrap them within Errors on the way out (to the thread's
     * UncaughtExceptionHandler).  Any thrown exception also
     * conservatively causes thread to die.
     * 5. After task.run completes, we call afterExecute, which may
     * also throw an exception, which will also cause thread to
     * die. According to JLS Sec 14.20, this exception is the one that
     * will be in effect even if task.run throws.
     * The net effect of the exception mechanics is that afterExecute
     * and the thread's UncaughtExceptionHandler have as accurate
     * information as we can provide about any problems encountered by
     * user code.
     * @param w the worker


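  1. A worker may start with an initial task, in which case it does not need to fetch the first one from the queue. Otherwise, as long as the pool is running, it gets tasks from getTask. If getTask returns null, the worker exits because the pool state or configuration has changed. Any other exit is caused by an exception thrown from external code; completedAbruptly is then true, which usually makes processWorkerExit replace the thread.
  2. Before running a task, the worker acquires its lock so that other pool interrupts cannot hit it while the task executes. It then makes sure the thread is interrupted if the pool is stopping, and not interrupted otherwise.
  3. Each task is preceded by a call to beforeExecute, which may throw. If it does, the thread dies (the loop is left with completedAbruptly still true) and the task is not run.
  4. If beforeExecute completes normally, the task is run and anything it throws is collected and passed to afterExecute. RuntimeException, Error and other Throwables are handled separately. Runnable.run declares no checked exceptions, so an arbitrary Throwable cannot be rethrown as it is; it is wrapped in an Error on the way out, which then reaches the thread's UncaughtExceptionHandler. Any thrown exception also causes the thread to die.
  5. After task.run completes, afterExecute is called. It may throw as well, which also kills the thread.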
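The hook mechanics described in these points can be tried out by overriding afterExecute in a subclass. The sketch below is one possible way to observe a task failure; AfterExecuteDemo, observedFailure and the "boom" message are invented for the example, and the thread factory only exists to keep the rethrown exception from being printed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class AfterExecuteDemo {
    // Runs a failing task via execute() and reports what afterExecute received.
    static String observedFailure() {
        AtomicReference<Throwable> seen = new AtomicReference<Throwable>();
        CountDownLatch hookRan = new CountDownLatch(1);

        // Swallow the exception that runWorker rethrows, to keep the demo output quiet.
        ThreadFactory quiet = r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, ex) -> { });
            return t;
        };

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), quiet) {
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                seen.set(t);         // t is whatever task.run() threw, or null
                hookRan.countDown();
            }
        };
        try {
            pool.execute(() -> { throw new IllegalStateException("boom"); });
            hookRan.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        Throwable t = seen.get();
        return t == null ? "none" : t.getClass().getSimpleName() + ": " + t.getMessage();
    }

    public static void main(String[] args) {
        System.out.println(observedFailure()); // prints "IllegalStateException: boom"
    }
}
```

This only works as shown with execute. A task passed to submit is wrapped in a FutureTask, which captures the exception itself, so afterExecute receives null and the failure surfaces from Future.get instead.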



    /**
     * Performs blocking or timed wait for a task, depending on
     * current configuration settings, or returns null if this worker
     * must exit because of any of:
     * 1. There are more than maximumPoolSize workers (due to
     *    a call to setMaximumPoolSize).
     * 2. The pool is stopped.
     * 3. The pool is shutdown and the queue is empty.
     * 4. This worker timed out waiting for a task, and timed-out
     *    workers are subject to termination (that is,
     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
     *    both before and after the timed wait, and if the queue is
     *    non-empty, this worker is not the last thread in the pool.
     * @return task, or null if the worker must exit, in which case
     *         workerCount is decremented
     */
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling? For a fixed pool left at its defaults
        // this is false: allowCoreThreadTimeOut is off and wc never exceeds corePoolSize.
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
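The timed branch of getTask is what lets idle threads go away. A rough way to watch it, assuming a 50 ms keep-alive and allowCoreThreadTimeOut(true) (both picked for the demo; KeepAliveDemo and poolSizeAfterIdle are illustrative names), is to let the pool go idle and poll its size:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {
    // Starts two workers, lets them idle, and returns the pool size afterwards.
    static int poolSizeAfterIdle() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 50L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        pool.allowCoreThreadTimeOut(true); // core threads now use the timed poll as well
        try {
            pool.execute(() -> { }); // first worker
            pool.execute(() -> { }); // second worker (count was still below corePoolSize)

            // Wait (up to 5 seconds) for the idle workers to time out in getTask.
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return pool.getPoolSize();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("threads left after idling: " + poolSizeAfterIdle());
    }
}
```

Without the allowCoreThreadTimeOut call the same pool would keep both threads blocked in workQueue.take() indefinitely.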



    /**
     * Performs cleanup and bookkeeping for a dying worker. Called
     * only from worker threads. Unless completedAbruptly is set,
     * assumes that workerCount has already been adjusted to account
     * for exit.  This method removes thread from worker set, and
     * possibly terminates the pool or replaces the worker if either
     * it exited due to user task exception or if fewer than
     * corePoolSize workers are running or queue is non-empty but
     * there are no workers.
     * @param w the worker
     * @param completedAbruptly if the worker died due to user exception
     */
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // exited because of an exception, so workerCount was not adjusted yet
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }



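final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // Return without terminating when:
        // 1. the pool is still running;
        // 2. the pool is already TIDYING or TERMINATED, so termination is under way and need not be repeated;
        // 3. the pool is SHUTDOWN but the task queue is not empty, so it cannot terminate yet
        if (isRunning(c) || runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}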
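From the caller's side, this state machine is visible through shutdown, awaitTermination, isShutdown, isTerminated and the terminated hook. A small sketch under the assumption that the single trivial task finishes well within the five-second wait; LifecycleDemo and lifecycle are illustrative names.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class LifecycleDemo {
    // Walks a pool from RUNNING to TERMINATED and records what the public API reports.
    static String lifecycle() {
        AtomicBoolean hookCalled = new AtomicBoolean(false);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void terminated() {
                hookCalled.set(true); // invoked by tryTerminate while the pool is TIDYING
            }
        };
        StringBuilder out = new StringBuilder();
        pool.execute(() -> { });
        out.append("running: isShutdown=").append(pool.isShutdown())
           .append(" isTerminated=").append(pool.isTerminated()).append('\n');

        pool.shutdown(); // RUNNING -> SHUTDOWN; queued tasks still run
        out.append("after shutdown(): isShutdown=").append(pool.isShutdown()).append('\n');

        boolean finished = false;
        try {
            finished = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        out.append("after awaitTermination: finished=").append(finished)
           .append(" isTerminated=").append(pool.isTerminated())
           .append(" terminatedHook=").append(hookCalled.get()).append('\n');
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.print(lifecycle());
    }
}
```

awaitTermination waits on the same termination condition that tryTerminate signals, which is why the hook has already run by the time it returns true.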


private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}



public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}


final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}


private volatile RejectedExecutionHandler handler;


public interface RejectedExecutionHandler {

    /**
     * Method that may be invoked by a {@link ThreadPoolExecutor} when
     * {@link ThreadPoolExecutor#execute execute} cannot accept a
     * task.  This may occur when no more threads or queue slots are
     * available because their bounds would be exceeded, or upon
     * shutdown of the Executor.
     * <p>In the absence of other alternatives, the method may throw
     * an unchecked {@link RejectedExecutionException}, which will be
     * propagated to the caller of {@code execute}.
     * @param r the runnable task requested to be executed
     * @param executor the executor attempting to execute this task
     * @throws RejectedExecutionException if there is no remedy
     */
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}


  1. AbortPolicy simply throws an exception:

    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

  2. CallerRunsPolicy checks whether the pool has been shut down. If it has not, the task is run directly in the thread that called execute; if it has, the task is discarded:

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

  3. DiscardOldestPolicy also checks whether the pool has been shut down. If it has not, the oldest task in the queue (the one at the head) is dropped and the current task is submitted again through execute; if it has, the current task is discarded:

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

  4. DiscardPolicy silently drops the task:

    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }


    private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
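The four built-in policies are easiest to compare side by side. The sketch below saturates a pool that has one thread and a one-slot queue (sizes chosen for the demo) and then submits a third task C, so the handler has to decide; RejectionPolicyDemo and overflow are illustrative names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionPolicyDemo {
    // Submits tasks A, B and C to a saturated pool and reports which ones ran, in order.
    static String overflow(RejectedExecutionHandler handler) {
        Thread caller = Thread.currentThread();
        CountDownLatch release = new CountDownLatch(1);
        List<String> ran = Collections.synchronizedList(new ArrayList<String>());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1), handler);
        String outcome = "accepted";
        try {
            // A occupies the only worker until the latch is released.
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                ran.add("A");
            });
            pool.execute(() -> ran.add("B")); // fills the single queue slot
            // C finds both the worker and the queue busy, so the handler decides.
            pool.execute(() -> ran.add(Thread.currentThread() == caller ? "C(caller)" : "C"));
        } catch (RejectedExecutionException e) {
            outcome = "rejected";
        } finally {
            release.countDown();
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return outcome + " " + ran;
    }

    public static void main(String[] args) {
        System.out.println(overflow(new ThreadPoolExecutor.AbortPolicy()));         // rejected [A, B]
        System.out.println(overflow(new ThreadPoolExecutor.CallerRunsPolicy()));    // accepted [C(caller), A, B]
        System.out.println(overflow(new ThreadPoolExecutor.DiscardPolicy()));       // accepted [A, B]
        System.out.println(overflow(new ThreadPoolExecutor.DiscardOldestPolicy())); // accepted [A, C]
    }
}
```

Only AbortPolicy tells the caller that something was lost. DiscardPolicy and DiscardOldestPolicy drop C and B respectively without any signal, and CallerRunsPolicy slows the submitter down by making it do the work itself.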



A thread pool is a group of pre-instantiated, idle threads which stand ready to be given work. These are preferred over instantiating new threads for each task when there is a large number of short tasks to be done rather than a small number of long ones. This prevents having to incur the overhead of creating a thread a large number of times.

Implementation will vary by environment, but in simplified terms, you need the following:

  • A way to create threads and hold them in an idle state. This can be accomplished by having each thread wait at a barrier until the pool hands it work. (This could be done with mutexes as well.)
  • A container to store the created threads, such as a queue or any other structure that has a way to add a thread to the pool and pull one out.
  • A standard interface or abstract class for the threads to use in doing work. This might be an abstract class called Task with an execute() method that does the work and then returns.

When the thread pool is created, it will either instantiate a certain number of threads to make available or create new ones as needed depending on the needs of the implementation.

When the pool is handed a Task, it takes a thread from the container (or waits for one to become available if the container is empty), hands it a Task, and meets the barrier. This causes the idle thread to resume execution, invoking the execute() method of the Task it was given. Once execution is complete, the thread hands itself back to the pool to be put into the container for re-use and then meets its barrier, putting itself to sleep until the cycle repeats.





  • Creating too many threads wastes resources and costs time creating the unused threads.
  • Destroying too many threads requires more time later when creating them again.
  • Creating threads too slowly might result in poor client performance (long wait times).
  • Destroying threads too slowly may starve other processes of resources.




