`
Donald_Draper
  • 浏览: 979814 次
社区版块
存档分类
最新评论

ThreadPoolExecutor解析三(线程池执行提交任务)

    博客分类:
  • JUC
阅读更多
Executor接口的定义:http://donald-draper.iteye.com/blog/2365625
ExecutorService接口定义:http://donald-draper.iteye.com/blog/2365738
Future接口定义:http://donald-draper.iteye.com/blog/2365798
FutureTask解析:http://donald-draper.iteye.com/blog/2365980
CompletionService接口定义:http://donald-draper.iteye.com/blog/2366239
ExecutorCompletionService解析:http://donald-draper.iteye.com/blog/2366254
AbstractExecutorService解析:http://donald-draper.iteye.com/blog/2366348
ScheduledExecutorService接口定义:http://donald-draper.iteye.com/blog/2366436
ThreadPoolExecutor解析一(核心线程池数量、线程池状态等) :
http://donald-draper.iteye.com/blog/2366934
ThreadPoolExecutor解析二(线程工厂、工作线程,拒绝策略等):
http://donald-draper.iteye.com/blog/2367064
上一篇文章我们看了一下线程工厂、工作线程,拒绝策略,先回顾一下:
   默认的线程池拒绝任务处理策略AbortPolicy,直接抛出RejectedExecutionException;直接丢弃策略DiscardPolicy,丢弃旧的任务DiscardOldestPolicy,调用者执行任务CallerRunsPolicy。DiscardOldestPolicy和CallerRunsPolicy都是在线程池没关闭时,策略才生效,否则关闭直接丢弃任务。
拒绝策略都为ThreadPoolExecutor的内部类。
   默认的线程工厂DefaultThreadFactory为Executors的内部类, 用于创建线程,工厂创建分组相同的线程,交由执行器执行。如果有java.lang.SecurityManager,则用System#getSecurityManager线程组,否则用调用者的线程组。创建的新线程为非守护模式,优先级在 MIN_PRIORITY和MAX_PRIORITY之间,默认为NORM_PRIORITY。可以通过Thread#getName获取线程name,默认为pool-N-thread-M,N为线程池编号,M为线程编号。
   Worker包装了任务线程,主要是为了维护中断控制状态和其他次要状态记录,及任务的执行。Worker同时继承了AQS,在任务线程执行前lock,任务执行完unlock。加锁的目的主要是保护任务线程的执行,线程池唤醒一个任务线程等待任务,而不是中断当前正在执行任务的线程去执行任务。Worker使用了一个 非重入互质锁,而不是ReentrantLock,这样做的目的是以防在任务执行的过程,线程池控制方法的改变,对任务线程执行的影响,比如setCorePoolSize方法。另外为了防止任务线程在实际执行前被中断,我们初始化锁状态为-1,在runWorker方法中,我们会清除它。runWorker执行任务时,首先释放锁,此时锁打开,允许中断,如果线程池正在stop,确保线程池已中断,否则
做执行前工作,执行任务,做执行后工作,如果任务被中断,则工作线程数量减1;
如果任务完成,则更新完成任务数量,从工作任务集中移除工作线程,尝试结束线程池。
    尝试结束线程池,首先检查线程池运行状态如果为运行中,关闭但任务队列不为空,
或线程池工作线程为0,任务队列为空,则直接返回;否则查看工作线程是否为0,不为0,则根据onlyOne参数确定中断多少空闲线程,如果onlyOne为true,中断一个,否则中断所有空闲线程。
今天我们来看任务的提交和执行,如果篇幅够的话,把线程池的关闭也说一下。
先来看任务的提交执行。
首先提交任务:
//AbstractExecutorService
 public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

提交任务方法是在AbstractExecutorService中实现 ,具体的执行任务在
execute方法中,这个方法在Executor其中为抽象方法,
ThreadPoolExecutor重写了这个方法,来看execute方法
/**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
	 1.如果工作线程小于核心线程池数量,尝试新建一个工作线程执行任务addWorker。
         addWorker将会自动检查线程池状态和工作线程数,以防在添加工作线程的过程中,
	 线程池被关闭。
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
	 2.如果创建工作线程执行任务失败,则任务入队列,如果入队列成功,
	 我们仍需要二次检查线程池状态,以防在入队列的过程中,线程池关闭。
	 如果线程池关闭,则回滚任务。
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
	 如果任务入队列失败,则尝试创建一个工作线程执行任务
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
	    //如果当前工作线程数小于核心线程池数量,则添加新的工作线程执行任务
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
	//如果当前工作线程数大于核心线程池数量,检查运行状态,如果是正在运行,则添加任务到任务队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
	    //重新检查线程池运行状态,如果线程池非处于运行状态,则移除任务
            if (! isRunning(recheck) && remove(command))
                reject(command);//移除成功,则进行拒绝任务处理
            else if (workerCountOf(recheck) == 0)
	        //如线程池已关闭,且工作线程为0,则创建一个空闲工作线程
                addWorker(null, false);
        }
       //根据最大线程池数量,判断是否应该添加工作线程,如果当前工作线程数量小于最大线程池数量,则尝试添加
       //工作线程线程执行任务,如果尝试失败,则拒绝任务处理
        else if (!addWorker(command, false))
            reject(command);
    }

执行任务方法中我们与3点要看:
1.工作线程数量小于核心线程池数量,添加工作线程,执行任务;
 int c = ctl.get();
 if (workerCountOf(c) < corePoolSize) {
    //如果当前工作线程数小于核心线程池数量,则添加新的工作线程执行任务
     if (addWorker(command, true))
         return;
     c = ctl.get();
}

我们来看addWorker添加工作线程执行任务;
 /**
     * Checks if a new worker can be added with respect to current
     * pool state and the given bound (either core or maximum). If so,
     * the worker count is adjusted accordingly, and, if possible, a
     * new worker is created and started, running firstTask as its
     * first task. This method returns false if the pool is stopped or
     * eligible to shut down. It also returns false if the thread
     * factory fails to create a thread when asked.  If the thread
     * creation fails, either due to the thread factory returning
     * null, or due to an exception (typically OutOfMemoryError in
     * Thread#start), we roll back cleanly.
     *
     根据当前线程池状态和核心线程池数量与最大线程池数量,检查是否应该,
     添加工作线程执行任务。如果应该添加工作线程,则更新工作线程数,
     如果调整成功,则创建工作线程,执行任务。如果线程是已关闭或正在关闭,
     则添加工作线程失败。如果线程工厂创建线程失败,则返回false,如果由于
     线程工厂返回null或OutOfMemoryError等原因,执行回滚清除工作。
     * @param firstTask the task the new thread should run first (or
     * null if none). Workers are created with an initial first task
     * (in method execute()) to bypass queuing when there are fewer
     * than corePoolSize threads (in which case we always start one),
     * or when the queue is full (in which case we must bypass queue).
     * Initially idle threads are usually created via
     * prestartCoreThread or to replace other dying workers.
     *
     * @param core if true use corePoolSize as bound, else
     * maximumPoolSize. (A boolean indicator is used here rather than a
     * value to ensure reads of fresh values after checking other pool
     * state).
     * @return true if successful
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
		//如果线程池已关闭或线程池正在关闭,提交的任务为null且任务队列不为空,则直接返回false
		//添加工作线程失败。
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
		    //如果工作线程数量大于线程池容量,
		    //或当前工作线程数量大于core(如果core,为true,则为corePoolSize,否则maximumPoolSize)
                    return false;
                if (compareAndIncrementWorkerCount(c))
		    //CAS操作工作线程数,即原子操作工作线程数+1,成功则跳出自旋
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
		    //如果在判断是否应该添加工作线程执行任务和CAS操作工作线程数,
		    //线程状态改变,跳出本次自旋
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        boolean workerStarted = false;//工作线程是否开始
        boolean workerAdded = false;//工作线程是否添加成功
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
		    //如果线程池是正在运行或线程池正在关闭,任务为null
                        if (t.isAlive()) // precheck that t is startable
			    //线程存活,抛出非法线程状态异常
                            throw new IllegalThreadStateException();
			//添加工作线程,到工作线程集
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
			    //更新最大线程池数量
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
		    //添加工作线程成功,则执行任务
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
	        //执行任务失败,则回滚工作线程和工作线程数
                addWorkerFailed(w);
        }
        return workerStarted;
    }

再来看执行失败回滚处理:
 if (! workerStarted)
//执行任务失败,则回滚工作线程和工作线程数
 addWorkerFailed(w);

 /**
     * Rolls back the worker thread creation.
     * - removes worker from workers, if present
     * - decrements worker count
     * - rechecks for termination, in case the existence of this
     *   worker was holding up termination
     */
    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
	        //从工作线程集移除工作线程
                workers.remove(w);
            //工作线程数减-1
            decrementWorkerCount();
	    //检查是否线程池关闭,关闭则执行相关工作
	    //这个我们在前面说过,则里简单回顾一下
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

//tryTerminate
 /**
     * Transitions to TERMINATED state if either (SHUTDOWN and pool
     * and queue empty) or (STOP and pool empty).  If otherwise
     * eligible to terminate but workerCount is nonzero, interrupts an
     * idle worker to ensure that shutdown signals propagate. This
     * method must be called following any action that might make
     * termination possible -- reducing worker count or removing tasks
     * from the queue during shutdown. The method is non-private to
     * allow access from ScheduledThreadPoolExecutor.
     */
    final void tryTerminate() {
        //自旋尝试关闭线程池
        for (;;) {
            int c = ctl.get();
	    //如果线程池正在运行,或正在关闭且队列不为空,则返回
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
	        //如果工作线程不为空,则中断空闲工作线程
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
	        //线程池已关闭,任务队列为空,工作线程为0,更新线程池状态为TIDYING
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
		        //执行结束工作
                        terminated();
                    } finally {
		        //线程池已结束
                        ctl.set(ctlOf(TERMINATED, 0));
			//唤醒等待线程池结束的线程
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

来看尝试结束线程池的这一点,
//如果工作线程不为空,则中断空闲工作线程
interruptIdleWorkers(ONLY_ONE);
  /**
     * Interrupts threads that might be waiting for tasks (as
     * indicated by not being locked) so they can check for
     * termination or configuration changes. Ignores
     * SecurityExceptions (in which case some threads may remain
     * uninterrupted).
     *
     中断等待任务的空闲非锁住状态的工作线程
     * @param onlyOne If true, interrupt at most one worker. This is
     * called only from tryTerminate when termination is otherwise
     * enabled but there are still other workers.  In this case, at
     * most one waiting worker is interrupted to propagate shutdown
     * signals in case all threads are currently waiting.
     * Interrupting any arbitrary thread ensures that newly arriving
     * workers since shutdown began will also eventually exit.
     * To guarantee eventual termination, it suffices to always
     * interrupt only one idle worker, but shutdown() interrupts all
     * idle workers so that redundant workers exit promptly, not
     * waiting for a straggler task to finish.
     如果onlyOne为true以为着,只中断最多一个空闲工作线程,这个在关闭线程池时,
     调用或关闭的过程中,工作线程完成任务调用。
     */
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
	        //遍历工作线程集
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {//锁打开说明,工作线程空闲
                    try {
		        //如果工作线程非中断,且空闲,尝试获取锁,获取锁成功,则中断工作线程
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
		    //如果是只中断一个空闲线程,则结束本次中断空闲线程任务
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

再看
//执行结束工作
terminated();

/**
     * Method invoked when the Executor has terminated.  Default
     * implementation does nothing. Note: To properly nest multiple
     * overridings, subclasses should generally invoke
     待子类扩展
     * {@code super.terminated} within this method.
     */
    protected void terminated() { }

小节一下:
根据当前线程池状态和核心线程池数量与最大线程池数量,检查是否应该,
添加工作线程执行任务。如果应该添加工作线程,则更新工作线程数,
如果调整成功,则创建工作线程,执行任务。如果线程是已关闭或正在关闭,
则添加工作线程失败。如果线程工厂创建线程失败,则返回false,如果由于
线程工厂返回null或OutOfMemoryError等原因,执行回滚清除工作。
回滚清除工作主要是工作线程和工作线程数。最后检查线程是是否关闭,
如果线程池正在运行,或正在关闭且队列不为空,则直接返回,否则及线程池已关闭
,检查工作线程是否为0,不为零根据ONLY_ONE判断中断一个空闲线程还是多个。


2.如果添加工作线程失败,则添加任务到队列,并进行双检查,如果在上述期间,线程池关闭,
回滚任务,从队列中移除任务;
//如果当前工作线程数大于核心线程池数量,检查运行状态,如果是正在运行,则添加任务到任务队列
 if (isRunning(c) && workQueue.offer(command)) {
     int recheck = ctl.get();
    //重新检查线程池运行状态,如果线程池非处于运行状态,则移除任务
     if (! isRunning(recheck) && remove(command))
         reject(command);//移除成功,则进行拒绝任务处理
     else if (workerCountOf(recheck) == 0)
        //如线程池非运行状态,且工作线程为0,则创建一个空闲工作线程
	//即线程池正在关闭之后的状态,且任务队列不为空
         addWorker(null, false);
 }

这部在看了第一点之后,没有什么可看的了,一看就明白,
来一下 reject(command);
/**
     * Invokes the rejected execution handler for the given command.
     * Package-protected for use by ScheduledThreadPoolExecutor.
     */
    final void reject(Runnable command) {
        //调用拒绝任务处理器处理任务
        handler.rejectedExecution(command, this);
    }

3.如果任务入队列失败,根据工作线程数量是否大于最大线程池数量,来判断是否应该添加工作线程执行任务;
如果工作线程小于最大线程池数量,则CAS操作workCount,成功创建工作线程执行任务。
 //根据最大线程池数量,判断是否应该添加工作线程,如果当前工作线程数量小于最大线程池数量,则尝试添加
//工作线程线程执行任务,如果尝试失败,则拒绝任务处理
 else if (!addWorker(command, false))
      reject(command);

有了前面的两点,这一点很容量理解。
在前面一篇文章中,我们讲了工作线程,这一篇我们简单看了一下线程池执行任务,
我们回到上一篇的线程执行,还有一点我们没有看:
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();//当前线程
        Runnable task = w.firstTask;//工作线程任务
        w.firstTask = null;
	//任务线程的锁状态默认为-1,此时解锁+1,变为0,即锁打开状态,允许中断,在任务未执行之前,不允许中断。
        w.unlock(); // allow interrupts,
        boolean completedAbruptly = true;//完成后是否可以中断
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted; 
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
		//如果线程池正在Stop,则确保线程中断;
		//如果非处于Stop之后的状态,则判断是否中断,如果中断则判断线程池是否已关闭
		//如果线程池正在关闭,但没有中断,则中断线程池
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
		   //执行前工作
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
		        //执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
		       //执行后工作
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
		    //任务线程完成任务数量加1,释放锁
                    w.completedTasks++;
                    w.unlock();
                }
            }
	    //任务已执行完不可以中断
            completedAbruptly = false;
        } finally {
	    //处理任务完成后的工作
            processWorkerExit(w, completedAbruptly);
        }
    }

我们来看任务的执行
while (task != null || (task = getTask()) != null) {

如果任务不为null,即创建工作线程成功,并执行任务,如果为null(即在线程池执行任务的时候,创建工作线程失败,任务入队列),从任务队列取一个任务。
来看从任务队列取任务:
 /**
     * Performs blocking or timed wait for a task, depending on
     * current configuration settings, or returns null if this worker
     * must exit because of any of:
     * 1. There are more than maximumPoolSize workers (due to
     *    a call to setMaximumPoolSize).
     * 2. The pool is stopped.
     * 3. The pool is shutdown and the queue is empty.
     * 4. This worker timed out waiting for a task, and timed-out
     *    workers are subject to termination (that is,
     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
     *    both before and after the timed wait.
     *
     * @return task, or null if the worker must exit, in which case
     *         workerCount is decremented
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
	        如果线程池处于关闭之后或已关闭任务队列为空,则重置工作线程数
                decrementWorkerCount();
                return null;//返回null任务
            }

            boolean timed;      // Are workers subject to culling?

            for (;;) {
                int wc = workerCountOf(c);
		//如果线程池正在运行,根据是否允许空闲线程等待任务和
		//当前工作线程与核心线程池数量比较值,判断是否需要超时等待任务
                timed = allowCoreThreadTimeOut || wc > corePoolSize;
                if (wc <= maximumPoolSize && ! (timedOut && timed))
		    //如果当前工作线程数,小于最大线程数,空闲工作线程不需要超时等待任务,
		    //则跳出自旋,即在当前工作线程小于最大线程池的情况下,有工作线程可用,
		    //任务队列为空。
                    break;
                if (compareAndDecrementWorkerCount(c))
		    //减少工作线程数量失败,返回null
                    return null;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
		    //如果与自旋前状态不一致,跳出本次自旋
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }

            try {
	        //如果非超时则直接take,否则等待keepAliveTime时间,poll任务
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

再来看线程池执行任务的第二点,
2.如果添加工作线程失败,则添加任务到队列,并进行双检查,如果在上述期间,线程池关闭,回滚任务,从队列中移除任务;
//如果当前工作线程数大于核心线程池数量,检查运行状态,如果是正在运行,则添加任务到任务队列
 if (isRunning(c) && workQueue.offer(command)) {
     int recheck = ctl.get();
    //重新检查线程池运行状态,如果线程池非处于运行状态,则移除任务
     if (! isRunning(recheck) && remove(command))
         reject(command);//移除成功,则进行拒绝任务处理
     else if (workerCountOf(recheck) == 0)
        //如线程池非运行状态,且工作线程为0,则创建一个空闲工作线程
	//即线程池正在关闭之后的状态,且任务队列不为空
         addWorker(null, false);
 }

关键的一点在addWorker(null, false)
//如线程池非运行状态,且工作线程为0,则创建一个空闲工作线程
//即线程池正在关闭之后的状态,且任务队列不为空
addWorker(null, false);

上述代码,如果成功添加的一个空任务的工作线程,任务为空的话,则
从任务队列取任务执行,这个过程与创建工作线程失败,任务入队列相
对应。

总结:
执行任务的过程为,如果工作线程数量小于核心线程池数量,添加工作线程,执行任务;如果添加工作线程失败,则添加任务到队列,并进行双检查,如果在上述期间,线程池关闭,回滚任务,从队列中移除任务;如果任务入队列失败,根据工作线程数量是否大于最大线程池数量,来判断是否应该添加工作线程执行任务;如果工作线程小于最大线程池数量,
则CAS操作workCount,成功创建工作线程执行任务。添加工作线程的过程为,如果应该添加工作线程,则CAS更新工作线程数,如果更新成功,则创建工作线程,执行任务。如果线程是已关闭或正在关闭,则添加工作线程失败。如果线程工厂创建线程失败,则返回false,如果由于线程工厂返回null或OutOfMemoryError等原因,执行回滚清除工作。回滚清除工作主要是工作线程和工作线程数。最后检查线程是是否关闭,如果线程池正在运行,或正在关闭且队列不为空,则直接返回,否则及线程池已关闭,检查工作线程是否为0,不为零根据ONLY_ONE判断中断一个空闲线程还是多个。

ThreadPoolExecutor解析四(线程池关闭):
http://donald-draper.iteye.com/blog/2367246
0
0
分享到:
评论

相关推荐

    ThreadPoolExecutor源码解析.pdf

    - 当提交新任务时,如果线程池状态允许,任务会被放入workQueue,否则拒绝任务。 - 工作线程通过循环调用`runWorker(this)`方法从workQueue中获取并执行任务。 - 如果线程池状态变为SHUTDOWN或STOP,且workQueue...

    12-线程池ThreadPoolExecutor底层原理源码分析(下)-周瑜.pdf

    通过上述对`ThreadPoolExecutor`线程池底层实现原理的解析,我们可以看到Java线程池的强大之处在于其高效的状态管理和任务调度机制。通过对`ctl`变量的巧妙利用,线程池能够有效地管理线程状态和数量,从而实现高...

    Spring3.2.6定时任务+线程池.docx

    在这个例子中,它简单地输出了一条消息,并使用注入的线程池执行了一个额外的任务。 #### 五、总结 本文详细介绍了如何在Spring3.2.6中配置线程池和定时任务,包括XML配置文件的具体设置以及程序类的实现。通过这种...

    线程池ThreadPoolExecutor原理源码分析.md

    `execute()` 方法是线程池的主要入口,用于提交新的任务到线程池中执行。下面是对该方法的具体分析: 1. **获取当前状态 `ctl`**: - 该方法首先获取线程池的当前状态 `ctl`,以便进行后续操作。 2. **检查线程数...

    java线程池实例

    Java线程池是一种高效管理线程资源的工具,它通过维护一组可重用的...通过`Thread_Project_Test`这个项目,你可以实践并了解线程池的使用,包括创建线程池、提交任务、配置参数等,进一步提升对Java并发编程的理解。

    线程池管理源码 java 源码

    本文将深入解析线程池的管理源码,帮助读者理解其工作原理和优化策略。 在Java中,`java.util.concurrent`包下的`ThreadPoolExecutor`类是线程池的核心实现。它提供了丰富的参数来定制线程池的行为,包括核心线程数...

    ThreadPoolExecutor运转机制介绍

    2. **当池子大小等于 corePoolSize 时**:如果还有新任务提交,但所有核心线程都在执行任务,那么新任务将会被放置到 `workQueue` 阻塞队列中等待执行。 3. **当 workQueue 放不下新入的任务时**:如果 `workQueue` ...

    JDK1.5线程池源码及详细注释

    2. `CallerRunsPolicy`: 调用者运行策略,当前提交任务的线程自己处理这个任务,不创建新线程。 3. `DiscardOldestPolicy`: 抛弃最旧的任务,尝试再次提交当前任务。 4. `DiscardPolicy`: 直接丢弃当前任务,不做...

    面试必备:Java线程池解析.pdf

    线程池的执行流程涉及execute()方法,当提交任务时,线程池会根据核心线程数、任务队列和最大线程数的状态来决定如何处理任务。如果核心线程数小于corePoolSize,则创建新核心线程;如果核心线程数已满,则将任务...

    android线程池案例

    1. **ExecutorService**: Java的`ExecutorService`接口是线程池的主要入口点,它提供了一种管理和控制线程的方法,如提交任务、关闭线程池等。 2. **ThreadPoolExecutor**: `ThreadPoolExecutor`是`ExecutorService`...

    线程池顶层实现原理之线程模型,状态,执行流程,原理

    2. 任务提交:提交一个任务到线程池中。 3. 任务执行:线程池中的线程执行任务。 4. 线程池调整:根据线程池的状态和线程数量,动态调整线程池的大小。 构造函数解析 ThreadPoolExecutor 构造函数的参数包括: * ...

    美团动态线程池实践思路开源项目(DynamicTp),线程池源码解析及通知告警篇.doc

    ThreadPoolExecutor 是 JUC 中的线程池实现类,它继承自 Executor 接口,提供了 execute() 方法来提交任务。 Executor 接口提供了一个 submit() 方法,可以提交任务并返回 Future 对象,以便可以异步地获取任务的...

    JDK之ThreadPoolExecutor源码分析1

    本文将深入解析ThreadPoolExecutor的execute()方法执行流程,以帮助我们理解线程池的工作原理。 当一个任务被提交到线程池,线程池的执行策略主要分为四步: 1. 首先,线程池会检查当前的核心线程数是否已达到设定...

    spring线程池(同步、异步).docx

    `TaskExecutor`是Spring提供的异步任务执行的核心接口,它只有一个方法`execute(Runnable task)`,用于提交任务。Spring提供了多个预定义的`TaskExecutor`实现,以满足不同场景的需求。例如: - `SyncTaskExecutor`...

    ThreadPool UML图

    - ThreadPoolExecutor:线程池执行器,负责创建和管理线程,以及分配任务给工作线程。 - ThreadPoolConfig:线程池配置类,用于设置线程池的参数,如最大线程数、队列容量等。 在实际编程中,线程池的实现方式可能...

    线程池+socket

    - 当Socket连接建立后,将任务提交到线程池执行,主线程继续监听新的客户端连接,实现高并发的Socket服务器。 总之,线程池和Socket的结合使用,能够高效地处理网络服务中的并发连接,提高服务器的吞吐量和响应...

    Java线程池运行状态监控实现解析

    在实际开发过程中,在线程池使用过程中可能会遇到各方面的故障,如线程池阻塞,无法提交新任务等。如果你想监控某一个线程池的执行状态,ThreadPoolExecutor类提供了相关的API,可以实时获取线程池的当前活动线程数...

    Java线程池,正式上线运行的源码,分享欢迎使用并提意见

    `ThreadPoolExecutor`类中,`execute()`方法是提交任务的关键入口,它会根据线程池的状态和当前的工作情况决定如何处理任务。同时,`ThreadPoolExecutor`的内部类`Worker`实现了`Runnable`接口,代表了线程池中的...

    Java线程池深度解析:提高性能的关键

    重要的是,它利用BlockingQueue作为任务队列,平衡了任务提交和执行的速度,提高了效率。线程池的状态管理是其设计的关键。它具有五种状态:RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED,这些状态管理线程池的...

    java线程池详解文档

    - **handler**:当提交任务数量超过`maximumPoolSize`时,线程池会采取的饱和策略。 #### 五、线程池工作流程示例 假设一个银行场景作为线程池工作的模拟案例: - **初始状态**:线程池初始为空,相当于银行尚未...

Global site tag (gtag.js) - Google Analytics