`
freish
  • 浏览: 83853 次
  • 性别: Icon_minigender_1
  • 来自: 摄影帝国
社区版块
存档分类
最新评论

hotspot1.7 ThreadPoolExecutor代码解析

 
阅读更多

写在开头:此文基于hotspot1.7.0(build 1.7.0-b147),1.6及以前的版本与1.7的版本实现上差别很大。线程池的逻辑非常复杂,原因在于线程池是有状态的(不是狭隘的指RUNNING,SHUTDOWN这些状态,而是一个类的状态,可以理解成对象的共享字段),而为了保证可伸缩性与效率,很多地方在访问这些状态的时候都没有使用锁来保证互斥访问,而采用的是多次检测。这意味着会有很多竞态条件的出现,在分析某个方法的时候,要同时想到多线程间多个方法的交互,要考虑它们的交错执行。这里只分析核心重要的方法,其它方法相对简单,就不多言了。限于本人知识、眼界有限,对于一些代码逻辑的解释极可能没有考虑周全,错漏之处也在所难免。看官自己把握,也欢迎留言指正。

线程池内部有一些状态,先来了解下这些状态的机制。以下用代码注释的方式来解释其中的含义。

/*
这个是用一个int来表示workerCount和runState的,其中runState占int的高3位,
其它29位为workerCount的值。

workerCount:当前活动的线程数;
runState:线程池的当前状态。

用AtomicInteger是因为其在并发下使用compareAndSet效率非常高;
当改变当前活动的线程数时只对低29位操作,如每次加一减一,workerCount的值变了,
但不会影响高3位的runState的值。当改变当前状态的时候,只对高3位操作,不会改变低29位的计数值。
这里有一个假设,就是当前活动的线程数不会超过29位能表示的值,即不会超过536870911,
就目前以及可预见的很长一段时间来讲,这个值是足够用了
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

//COUNT_BITS,就是用来表示workerCount占用一个int的位数,其值为前面说的29
private static final int COUNT_BITS = Integer.SIZE - 3;

/*
CAPACITY为29位能表示的最大容量,即workerCount实际能用的最大值。
其值的二进制为:00011111111111111111111111111111(占29位,29个1)
*/
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

/*
以下常量是线程池的状态,状态存储在int的高3位,所以要左移29位。
腾出的低29位来表示workerCount
注意,这5个状态是有大小关系的。RUNNING来判断就可以了
*/

/*
RUNNING的含义:线程池能接受新任务,并且可以运行队列中的任务
-1的二进制为32个1,移位后为:11100000000000000000000000000000
*/
private static final int RUNNING    = -1 << COUNT_BITS;

/*
SHUTDOWN的含义:不再接受新任务,但仍可以执行队列中的任务
0的二进制为32个0,移位后还是全0
*/
private static final int SHUTDOWN   =  0 << COUNT_BITS;

/*
STOP的含义:不再接受新任务,不再执行队列中的任务,而且要中断正在处理的任务
1的二进制为前面31个0,最后一个1,移位后为:00100000000000000000000000000000
*/
private static final int STOP       =  1 << COUNT_BITS;

/*
TIDYING的含义:所有任务均已终止,workerCount的值为0,
转到TIDYING状态的线程即将要执行terminated()钩子方法.
2的二进制为00000000000000000000000000000010
移位后01000000000000000000000000000000
*/
private static final int TIDYING    =  2 << COUNT_BITS;

/*
TERMINATED的含义:terminated()方法执行结束.
3的二进制为00000000000000000000000000000011
移位后01100000000000000000000000000000
*/
private static final int TERMINATED =  3 << COUNT_BITS;
各状态之间可能的转变有以下几种:
RUNNING -> SHUTDOWN
	调用了shutdown方法,线程池实现了finalize方法,在里面调用了shutdown方法,因此shutdown可能是在finalize中被隐式调用的
(RUNNING or SHUTDOWN) -> STOP
	调用了shutdownNow方法
SHUTDOWN -> TIDYING
	当队列和线程池均为空的时候
STOP -> TIDYING
	当线程池为空的时候
TIDYING -> TERMINATED
	terminated()钩子方法调用完毕
/*
传入的参数为存储runState和workerCount的int值,这个方法用于取出runState的值。
~为按位取反操作,~CAPACITY值为:11100000000000000000000000000000,
再同参数做&操作,就将低29位置0了,而高3位还是保持原先的值,也就是runState的值
*/
private static int runStateOf(int c)     { return c & ~CAPACITY; }

/*
传入的参数为存储runState和workerCount的int值,这个方法用于取出workerCount的值。
因为CAPACITY值为:00011111111111111111111111111111,所以&操作将参数的高3位置0了,
保留参数的低29位,也就是workerCount的值。
*/
private static int workerCountOf(int c)  { return c & CAPACITY; }

/*
将runState和workerCount存到同一个int中,这里的rs就是runState,
是已经移位过的值,填充返回值的高3位,wc填充返回值的低29位
*/
private static int ctlOf(int rs, int wc) { return rs | wc; }

首先来看下对外接口中关键的execute方法,其实现如下:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

先判断传入的参数command是否为null,为null就抛出NullPointerException。然后通过workerCountOf方法从ctl所表示的int值中提取出低29位的值,也就是当前活动的线程数,如果当前活动的线程数小于corePoolSize,则增加一个线程(addWorker,接下来会讲解这个方法,其返回值表示是否新增线程成功)来执行新传入的任务。什么概念?也就是说当池中的线程数小于corePoolSize的时候,不管池中的线程是否有空闲的,每次调用该方法都去增加一个线程,直到池中的数目达到corePoolSize为止。如果新增线程成功,则由新线程执行传入的任务command。在这里有可能出现增加线程失败的情况(原因在解释addWorker的时候讲),那就要当做池中当前线程数超过corePoolSize的情况进行处理。也就是进入第三个if里,若当前线程池的状态为RUNNING状态,且将任务command加入队列成功,就会执行if内的逻辑。先讲此if对应的else里的情况,若在执行execute的时候同时有其它线程执行了shutdown方法,而这两个方法不是互斥的,就有竞态条件问题,execute方法之前判断状态为RUNNING,而执行了几条语句后可能池的状态已经变掉了,因此,如果池的状态不为RUNNING或在将command加入队列失败的时候(失败可能是有界队列满了),两种情况要分开处理,当只是状态仍为RUNNING,而队列满的时候,若池中当前活动的线程数小于maximumPoolSize,则会往池中添加线程,若添线程数已经达到了maximumPoolSize或其它原因导致新增线程失败,就会拒绝该任务(reject(command))。 当状态不为RUNNING的时候,if里的addWorker(command, false)操作将直接返回false,使得if条件为true,也会拒绝这个任务。再继续前面往队列里加入任务成功的处理方式。加入任务成功后,会再次检测池的状态是否为RUNNING,若不是,则从池中移出并拒绝该任务,这也就是说,当池的被SHUTDOWN后,将不再接受新任务。这些检查若都没问题,还需要看看池中的活动线程数有没有变成0(执行的任务抛出异常等导致),若为0,则往里加入一个线程,该线程回去队列里拿任务执行。如果一次性往队列里提交了很多任务,而池中的每个任务执行都抛出异常,那么会不会导致剩余的任务得不到执行?显然不会,这个在后面再讲。

接下来介绍上面提到的addWorker方法,方法实现如下:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
 
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
 
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
 
    Worker w = new Worker(firstTask);
    Thread t = w.thread;
 
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Recheck while holding lock.
        // Back out on ThreadFactory failure or if
        // shut down before lock acquired.
        int c = ctl.get();
        int rs = runStateOf(c);
 
        if (t == null ||
            (rs >= SHUTDOWN &&
             ! (rs == SHUTDOWN &&
                firstTask == null))) {
            decrementWorkerCount();
            tryTerminate();
            return false;
        }
 
        workers.add(w);
 
        int s = workers.size();
        if (s > largestPoolSize)
            largestPoolSize = s;
    } finally {
        mainLock.unlock();
    }
 
    t.start();
    if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
        t.interrupt();
 
    return true;
}

这个代码有点儿长,addWorker有两个参数:Runnable类型的firstTask,用于指定新增的线程执行的第一个任务;boolean类型的core,true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize。该方法的返回值代表是否成功新增一个线程。这个方法为for循环加了一个标签,for循环里,做了很多事情。首先通过runStateOf方法取出存储在ctl中的状态值,第一个if里的条件有些小复杂:rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()),又是与又是非的,转换成一个等价实现:rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()),如果已经调用了SHUTDOWN,池的状态改变后,第一个条件rs >= SHUTDOWN就为true,后面括号里只要有一个条件为真就返回一个新增线程失败的标识。rs != SHUTDOWN结合前面的rs >= SHUTDOWN,表示线程池的状态已经由SHUTDOWN转为剩余的三个状态之一了,此时就要拒绝这个传入的任务;括号里的第二个条件表示状态已经为非运行状态了,却传入了一个任务,这个任务也要拒绝;括号里的第三个条件表示线程池的状态不为RUNNING,但队列中没有任务了,就不需要新增线程了。然后使用一个嵌套循环,来解析下这个循环吧,纵观下,有break标签,continue标签,是不是很不好理解?所以,写代码时避免使用标签。跑题了,继续解释。这个嵌套循环里首先用workerCountOf方法取出当前活动的线程数。若当前活动线程数超过低29位能表示的最大值(也就是容量)时就不能再加线程了,因为再加就会影响状态的值了!若传入的参数core参数为true,则当前活动的线程数要小于corePoolSize才能创建新线程,大于或等于corePoolSize就不能再创建了;若core参数为false,则当前活动的线程数要小于maximumPoolSize才能创建新线程,大于或等于maximumPoolSize就不能再创建了。接下来使用CAS操作将当前活动线程数加一(compareAndIncrementWorkerCount方法,使用原子的compareAndSet来替换旧值。但并不保证成功,若成功,该方法返回true;若失败,则返回false),当加一成功,则跳出大循环,进入循环体后面的真正新增线程的地方;若加一不成功,判断下当前状态改变没有,若改变了则重新开始外层循环的下一次迭代,若状态没有改变,只是加一失败,那么就继续内层循环,直到加一成功。往当前活动线程数加一成功后,就会来真的新增线程了(先加一后新增线程可以避免锁的使用,使用CAS原子操作加一后,其它线程看到的就是加一后的值,若达到上限,其它线程就不会去创建新线程了。若先创建线程,再去加一,若不加锁,假如一个使用无界队列的线程池,当前活动线程数为corePoolSize少一,外部线程在执行execute的时候都发现线程数不足corePoolSize,都去创建线程,而最终只能有一个线程进入线程池,其它的都得作废,而加锁可以解决这个问题,但是降低了线程池的可伸缩性)。

接下来看如何新增线程的。Worker w = new Worker(firstTask),在Worker的构造方法中,创建了一个线程对象,但这个线程是没有启动的。在构造方法中启动线程,会导致this对象泄露,让线程看到未完整构建的对象,这个要避免。既然不能在构造方法里启动,那么就把创建的线程对象拿出来吧,也就是赋给了t变量。因为整个过程并不是互斥的,所以创建完线程对象后再来判断下当前池的状态,接下来的判断又比较复杂:t == null || (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null)),转换成一个容易看懂的等价实现:t == null || (rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null))。里面有个t==null?为啥会出现t==null? Worker的构造方法是通过调用getThreadFactory().newThread(this)方法来创建线程的,而newThread方法可能会返回null(threadFactory可以通过ThreadPoolExecutor的构造方法传入,如没有传入,有个默认实现)。当创建线程失败要减少当前活动线程数;当池的状态非RUNNING和SHUTDOWN的时候,也需要减少当前活动线程数,并要尝试终止线程池;当线程池的状态为非RUNNING,且有初始任务要执行的时候,因为这个状态要拒绝新进来的任务,所以这个新增的线程也没有用处了。当状态判断没有问题时,就会将创建的Worker对象加入到workers字段中(线程终止时会从里面移除,后面会讲到),当池中的工作线程创新高时,会将这个数记录到largestPoolSize字段中。然后就可以启动这个线程t了。若start后,状态又变成了SHUTDOWN状态(如调用了shutdownNow方法)且新建的线程没有被中断过,就要中断该线程(shutdownNow方法要求中断正在执行的线程),shutdownNow方法本身也会去中断存储在workers中的所有线程,为什么这里还有自己处理下呢?中断所有线程的时候需要持有mainLock锁,而添加Worker对象到workers字段中也要持有mainLock锁,所以存在这样一种很难出现的场景:在将Worker对象加入workers字段,释放mainLock锁之后,Worker对象中的线程(即t)启动前,shutdownNow获得了mainLock锁并完成了所有中断操作,而当线程对象还没调用start之前调用该线程的interrupt方法是无效的。所以,t启动后的这段小代码就是为了防止这种极端情况的出现。

在继续其他方法之前,先说下Worker这个内部类。Worker实现了Runnable接口,可以在后续作为Thread的构造方法参数用以创建线程。同时,Worker还继承了AbstractQueuedSynchronizer类,只是简化每个Worker对象相关的锁的获取,在每次执行一个任务的时候,都需要持有这个锁。在以前的ThreadPoolExecutor实现中,并没有继承AbstractQueuedSynchronizer,而是在Worker内部声明了一个对象字段private final ReentrantLock runLock = new ReentrantLock(),每次执行一个任务的时候,需要对runLock加锁。

接下来我们看一下每次新增一个线程后这个线程都做了些什么,显然需要看看Worker的run方法:

public void run() {
    runWorker(this);
}

只是简单的调用了runWorker方法,继续看runWorker:

final void runWorker(Worker w) {
    Runnable task = w.firstTask;
    w.firstTask = null;
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            clearInterruptsForTaskRun();
            try {
                beforeExecute(w.thread, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

这个方法逻辑很简单。还记得前面提到的新增线程时指定第一个任务吗?若存在第一个任务,则先执行第一个任务,否则,从队列中拿任务,不断的执行,直到getTask返回null或执行任务出错(中断或任务本身抛出异常),就退出while循环。getTask方法后面会详细讲解。当有任务执行时(之前通过参数传入的第一个任务或从队列中获取的任务),需要做一个状态判断。也就是clearInterruptsForTaskRun方法,来看看这个方法干了什么:

private void clearInterruptsForTaskRun() {
    if (runStateLessThan(ctl.get(), STOP) &&
        Thread.interrupted() &&
        runStateAtLeast(ctl.get(), STOP))
        Thread.currentThread().interrupt();
}

if条件里的内容为runStateLessThan(ctl.get(), STOP) && Thread.interrupted() && runStateAtLeast(ctl.get(), STOP),这里利用了&&的短路特性,当前一个条件为true的时候才去执行后面一个条件。当当前状态小于STOP时,也就是当前状态为RUNNING时,需要清除线程的中断状态(线程池为RUNNING状态,线程却的中断状态却为true,可能在上次执行的任务里调用了类似Thread.currentThread().interrupt()的方法,因此当然不能让接下来执行的任务受之前任务的影响),如果Thread.interrupted()返回false,表示以前没有设置过中断,整个if的结果就是false;如果Thread.interrupted()返回true,那就要考虑为什么会是true了。是RUNNING状态就已经被中断了还是判断第一个条件后另外一个非池中的线程调用了shutdownNow中断了该线程?如果是后者,表示正在执行的任务需要中断,所以第三个条件判断当前池的状态是否不为RUNNING,如果不为RUNNING,那么就要重新中断该线程以维护shutdownNow方法的语义。

在真正执行任务之前,调用了beforeExecute方法,这是一个钩子方法,用户可以继承ThreadPoolExecutor重写beforeExecute方法来做一些事情。接下来就是真正执行任务的时候,执行完了(正常执行结束或抛出异常)会调用afterExecute方法,afterExecute也是个钩子方法,同beforeExecute方法。随后将task变量置为null,让其再从队列里接收任务,若不置为null,就满足while的第一个条件了,结果就是这个任务被死循环执行。然后将该线程完成的任务数自增。只有当线程终止的时候,才会将该线程执行的任务总数加到线程池的completedTaskCount中,所以completedTaskCount这个值并不是一个准确值。在最后有一个将completedAbruptly置为false的操作,如果线程能走到这里来,说明该线程在执行任务过程中没有抛出异常,也就是说该线程并不是异常结束的,而是正常结束的;如果走不到这一步,completedAbruptly的值还是初始值true,表示线程是异常结束的。线程结束时,会调用processWorkerExit方法做一些清理和数据同步的工作:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
 
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
 
    tryTerminate();
 
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

如果线程是异常结束(被中断、任务执行本身异常等),当前活动的线程数减少一个。如果是正常结束的呢?不应该将其也减一吗?不用担心,在runWorker的while最后一次循环中的getTask方法里做掉了。
接下来将该线程执行过的任务数加到completedTaskCount中,这个在前面也提到了。然后从workers中去除该工作线程。如果该线程的中断是因为调用了shutdown、shutdownNow接口而中断的该如何处理?就是这个tryTerminate了,来看下tryTerminate干了什么:

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
 
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

这个方法用来干啥的?当池的状态为SHUTDOWN且任务队列为空,需要将池的状态转变为TERMINATED;当池的状态为STOP且池中的当前活动线程数为0,要将池的状态转换成TERMINATED。这个方法就是用来做这种状态转变的。如果状态是RUNNING,表示线程池还正在提供服务,不需要状态变换;如果状态为TIDYING或TERMINATED,池中的活动线程数已经是0,自然也不需要做什么操作了;若状态为SHUTDWON,但队列中还有任务,此时这些任务还需要做掉,因此池中的线程不能终止,因此,这种情况下也不需要做什么。如果状态为SHUTDWON但队列中已经没有任务了,这里调用了一个interruptIdleWorkers(ONLY_ONE)操作去中断一个空闲线程。这么做是为什么?【关于这个的理解可能有问题】调用这个方法的目的是将shutdown信号传播给其它线程。调用shutdown方法的时候会去中断所有空闲线程,如果这时候池中所有的线程都正在执行任务,那么就不会有线程被中断,调用shutdown方法只是设置了线程池的状态为SHUTDOWN,在取任务(getTask,后面会细说)的时候,假如很多线程都发现队列里还有任务(没有使用锁,存在竞态条件),然后都去调用take,如果任务数小于池中的线程数,那么必然有方法调用take后会一直等待(shutdown的时候这些线程正在执行任务,所以没能调用它的interrupt,其中断状态没有被设置),那么在没有任务且线程池的状态为SHUTDWON的时候,这些等待中的空闲线程就需要被终止iinterruptIdleWorkers(ONLY_ONE)回去中断一个线程,让其从take中退出,然后这个线程也进入同样的逻辑,去终止一个其它空闲线程,直到池中的活动线程数为0。

当状态为SHUTDOWN,且活动线程数为0的时候,就可以进入TIDYING状态了,进入TIDYING状态就可以执行钩子方法terminated(),该方法执行结束就进入了TERMINATED状态(参考前文中各状态的含义以及可能的状态转变)。最后的termination.signalAll()所为何事?当线程池shutdown后,外部可能还有很多线程在等待线程池真正结束,即调用了awaitTermination方法,该方法中,外部线程就是在termination上await的,所以,线程池关闭之前要唤醒这些等待的线程,告诉它们线程池关闭结束了。

继续说processWorkerExit方法中调用tryTerminate之后的代码。如果池的状态仍为RUNNING,而线程是因为执行的任务本身抛出了异常而结束或正常结束时该如何处理?这时候池的状态还是RUNNING呢!那就是接下来的这个if块要做的事儿了。当池的状态还是RUNNING,又要分两种情况,一种是异常结束,一种是正常结束。异常结束比较好弄,直接加个线程替换死掉的线程就好了,也就是最后的addWorker操作。而正常结束又有几种情况了,如果允许core线程超时,也就是allowCoreThreadTimeOut为true,那么在池中没有任务的时候,调用带有时限参数的poll方法时就可能返回null,致使线程正常退出,如果允许core线程超时,池中最小的线程数可为0,如果此时队列又有任务了,那么池中必须要有一个线程,若池中活动的线程数不为0,就不需要新增线程来替换死掉的线程,否则就要新增一个;如果不允许core线程超时,池中的线程必须达到corePoolSize个才能让多的线程退出,而不需要用新的线程替换,否则也需要新增一个线程替换这个死掉的线程。

在runWorker执行任务之前调用了w.lock操作,为什么要在执行任务的时候锁定这个每个线程都有一份的锁呢?原因在于调用了线程池shutdown后(前面说过,SHUTDOWN的含义:不再接受新任务,但仍可以执行队列中的任务),会调用interruptIdleWorkers方法去终止空闲线程,该方法会持有mainLock锁,但此时队列中可能还有很多任务,线程也可能还正在执行任务,就可能有一些线程终止不掉。此时,有些线程可能刚执行任务结束,正准备再去队列中拿任务,有些可能还正在执行任务,有些可能刚拿到一个新的任务,对于仍进入队列中拿任务的线程,最终队列中任务会被拿完,而此时拿任务的线程会发现线程池的状态为SHUTDOWN,就会立马返回一个null,返回null意味着ThreadPoolExecutor.runWorker中的循环退出了,这个线程也就自动终止了;此外拿任务并没有持有mainLock锁,所以在终止空闲线程与线程非执行任务期间(如从队列获取任务)存在竞态条件。有可能已经判断了线程池的状态仍未RUNNING,准备从queue里take任务,而在执行take之前,另一个非池中的线程可能调用了shutdown,并且执行完了interruptIdleWorkers方法(马上就会介绍这个方法),若此时队列中恰好没有任务了,若这个正要调用take的线程阻塞,就不会醒过来了,不用担心,interruptIdleWorkers已经中断了该线程,而take是可以响应中断的,再调用take后会立马抛出异常。 对于正在执行中的任务,其它线程不能直接将这个正在线程中断掉,因此除了mainLock锁,interruptIdleWorkers还需要持有线程执行任务时获取的那把锁(这也是为什么执行任务的时候需要获取那个每个线程都有的锁的原因),如果获取不成功表示线程正在执行任务。看下终止空闲线程的方法实现:

private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

interruptIdleWorkers遍历workers中所有的工作线程,若线程没有被中断且能立即(tryLock)获取到前面提到的那把线程任务锁时,就中断该线程。为什么需要持有mainLock?mainLock是用来保护workers变量的。

shutdown是持有mainLock的,但是runWorker的时候并没有,那么,会不会出现碰巧出现同一时刻池中所有线程都刚好执行完任务,去取任务的时候发现池的状态为SHUTDOWN,就立即返回null并终止线程,而导致队列中的剩下的任务得不到执行?这是不会出现的,来看下getTask的实现:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
 
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
 
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
 
        boolean timed;      // Are workers subject to culling?
 
        for (;;) {
            int wc = workerCountOf(c);
            timed = allowCoreThreadTimeOut || wc > corePoolSize;
 
            if (wc <= maximumPoolSize && ! (timedOut && timed))
                break;
            if (compareAndDecrementWorkerCount(c))
                return null;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
 
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

若进入这个方法的工作线程是即将要终止的线程,该方法就必须返回null,有以下几种情形需要返回null:
1、当前活动线程数超过maximumPoolSize个(调用了setMaximumPoolSize的缘故);
2、线程池已经停止(STOP);
3、线程池已经关闭(SHUTDOWN)且任务队列为空;
4、工作线程获取任务超时,且满足(allowCoreThreadTimeOut || workerCount > corePoolSize)条件

先获取线程池的状态,如果状态大于等于STOP,也就是STOP、TIDYING、TERMINATED之一,这时候不管队列中有没有任务,都不用去执行了;如果线程池的状态为SHUTDOWN且队列中没有任务了,也不用继续执行了;所以这两种场景中获取任务的线程没必要存在了,这里调用了decrementWorkerCount减少活动线程数。前面在processWorkerExit中也提到,如果任务是非正常终止,processWorkerExit里要将活动线程数减一,正常的线程退出,减一是在这里做的。返回null之后,runWorker的while循环就退出了。接下来是个嵌套循环,它的目的就是上述的1和4.后面是从队列中取任务,比较简单,不多说。

以上,核心方法分析结束。欢迎指出理解错漏的地方。

 

更多文章在我的博客:http://www.ticmy.com

0
1
分享到:
评论

相关推荐

    hotspot1.7.rar

    【标题】"hotspot1.7.rar" 是一个包含了Hotspot虚拟机源代码的压缩文件,版本号可能指的是Java 7。Hotspot是Oracle公司开发的一款高性能的Java虚拟机(JVM),它广泛应用于Java应用程序的运行环境中。Hotspot的名字...

    hotspot源代码

    二、Hotspot源代码解析 1. **类加载系统**:Hotspot的类加载器负责查找、加载和初始化类。通过分析其源代码,我们可以了解到类的加载过程,包括双亲委派模型、类加载器的生命周期以及类的验证、准备、解析等阶段。 ...

    openjdk 1.7 windows 64位

    7. 类数据共享:这是一种JVM级别的优化,允许类加载器分享已经解析和初始化的类数据,减少启动时间和内存消耗。 此外,OpenJDK 1.7还引入了Project Coin,它是一组小的语法改进,旨在简化Java编程,如方法引用和...

    Java JDK_1.7下载

    Java JDK 1.7,全称为Java Development Kit 1.7,是Oracle公司提供的用于开发和运行Java应用程序的重要工具集。这个版本在Java的历史上占据了重要的地位,因为它引入了许多新特性、性能优化以及对现有功能的改进,...

    JDK1.7 Windows 32位

    - **编译优化**:JDK1.7的HotSpot虚拟机在JIT(Just-In-Time)编译方面有了进一步的提升,能够更快地识别和优化热点代码。 - **内存管理**:对垃圾回收算法进行了优化,降低了GC停顿时间,提高了系统响应速度。 4...

    hotspot.tar.gz

    《深入解析Hotspot JVM源码》 Hotspot JVM,全称Hotspot Virtual Machine,是Java开发工具包(JDK)中的关键组成部分,负责运行Java应用程序。它由Oracle公司开发,以其出色的性能优化和动态编译能力而闻名。...

    hotspot-d9c3790c85c1.rar

    源代码中,我们可以看到类加载、验证、准备、解析和初始化等阶段的具体实现。 六、性能监控和调试工具 Hotspot提供了一系列内置的性能监控和调试工具,如jconsole、jvisualvm等。通过源代码,我们可以学习到如何...

    JVM Hotspot实现源码

    《OpenJDK中的JVM Hotspot实现源码解析》 在Java世界中,JVM(Java Virtual Machine)是运行Java程序的关键组件,它负责将字节码解释执行或即时编译为机器码,使得Java具备跨平台的能力。Hotspot是Oracle JDK和...

    JAVA jdk1.7 64位

    8. **改进的编译器**:JDK 1.7的Java HotSpot编译器包含了一些优化,如更聪明的类型推断,能够更好地提升编译后的代码性能。 9. **安全性强化**:Java 7增强了安全框架,提高了对网络和沙箱环境的安全防护,尤其是...

    《HotSpot实战》

    3. **即时编译(JIT)**:HotSpot的一大特性是其即时编译机制,它能在运行时将频繁执行的热点代码编译为机器码,以提高执行效率。书中会介绍JIT编译的触发条件、优化策略以及如何查看编译信息。 4. **垃圾收集算法*...

    JDK1.7的64位windows压缩包免安装

    6. **改进的编译器和垃圾收集器**:JDK1.7的编译器(HotSpot)和垃圾收集器(Garbage Collector)进行了优化,提高了运行效率。 7. **元空间(Metaspace)**:替代了永久代(PermGen),解决了内存溢出的问题。 8....

    Hotspot实战-pdf版

    深入解析hotspot实战!内容比较翔实、精辟,值得收藏!

    java 虚拟机 hotspot 源码

    java 虚拟机 hotspot 源码

    Ubuntu12.04安装JDK1.7

    ### Ubuntu 12.04 下安装 JDK 1.7 的详细步骤与解析 #### 一、背景介绍 Ubuntu 12.04 LTS (Precise Pangolin) 是一款非常受欢迎的操作系统,尤其是在服务器领域。它基于 Debian 分支,提供了稳定且安全的平台。...

    [百度网盘] HotSpot实战[完整版][带书签].pdf

    根据提供的文件信息,“HotSpot实战[完整版][带书签].pdf”这本书主要围绕HotSpot虚拟机进行深入探讨,HotSpot作为Java虚拟机的一种实现,是当前最广泛使用的JVM之一,尤其在企业级应用中占据重要地位。下面将根据...

    OpenJDK(HotSpot JVM、Javac)源代码学习研究(包括代码注释、文档、用于代码分析的测试用例)

    OpenJDK(HotSpot JVM、Javac)源代码学习研究(包括代码注释、文档、用于代码分析的测试用例)

    Hotspot VM源码

    HotSpot的基础代码是许多人辛勤劳动的结晶,这个过程迄今已持续了超过10年的时间(当然时间长并不意味着一定好,一半一半吧)。所以到现在为止,他的体积是很大的。有将近1500个C/C++头引用和源代码文件,整个虚拟机...

    jdk1.7和1.8中文版

    在性能方面,JDK1.8对HotSpot虚拟机进行了优化,包括G1垃圾收集器的改进,提高了服务器端应用的性能。同时,新的 Nashorn JavaScript引擎使得Java与JavaScript的交互更为便捷。 对于学习和使用Java的开发者来说,...

    Antamedia-HotSpot.rar_Hotspot

    《Antamedia HotSpot许可证代码样本详解》 在IT领域,网络接入控制是管理网络访问权限的重要手段,其中Antamedia HotSpot是一款广泛使用的热点管理软件。本篇将深入探讨Antamedia HotSpot及其许可证代码文件的核心...

    hotspot-8.rar

    【描述】中提到的"hotspot8官网源码",意味着这个压缩包包含了HotSpot虚拟机8版本的源代码。对于Java开发者来说,深入理解HotSpot源码能够帮助他们更好地理解和优化Java应用程序,因为这涉及到JVM如何处理内存管理、...

Global site tag (gtag.js) - Google Analytics