`
songzi0206
  • 浏览: 158806 次
  • 性别: Icon_minigender_1
  • 来自: 上海
博客专栏
Group-logo
All are from ...
浏览量:33805
Group-logo
Programming w...
浏览量:19678
社区版块
存档分类
最新评论

ThreadPoolExecutor 工作线程调度和回收

阅读更多

 

      ThreadPoolExecutor对任务的提交和异步执行已分析完毕,现在要补充一些关于它对线程池的管理,也就是对其工作线程的调度和回收.

       还记得上一篇“ThreadPoolExecutor execute 方法分”最后一个关于任务异步执行的流程图,虽然分支庞杂,但只有两个条逻辑路径会增加工作线程加入到线程池:一是当前线程池的大小<核心线程池大小(poolSize<corePoolSize),二是任务队列已满,并且poolSize < maxPoolSize。参考“ThreadPoolExecutor execute 方法分”最后的流程图,也就是图右半边两个执行步骤“将command加入核心线程池执行”和“尝试在最大线程池数量限制下执行”会增加新的工作线程。具体对应源代码://加入核心线程池执行

private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < corePoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();
        return true;
}
//在最大核心线程池数量限制下执行
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < maximumPoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();
        return true;
}

         上面两方法主要是调用了私有方法addThread(Runnable firstTask),它是ThreadPoolExecutor中唯一一个增加工作线程到线程池的方法。关于该方法在“ThreadPoolExecutor execute 方法分”中已分析,所以这里主要是分析下当增加工作线程的生命周期。当t.start()以后,Worker线程开始工作,请看Worker.run()方法:public void run() {

            try {
                Runnable task = firstTask;
                firstTask = null;
                while (task != null || (task = getTask()) != null) {
                    runTask(task);
                    task = null;
                }
            } finally {
                workerDone(this);
            }
}

while循环:

       第一轮循环,task必定不等于null,因为是根据提交的任务才新增工作线程的(addThread(firstTask)),可以认为这个firstTask是该线程的“份内工作”,当他执行完份内工作,结束第一轮循环;

        后续循环,会执行“分外工作”,每次都会让getTask()去获取一个待执行的task,然后执行runTask(task)周而复始,这便是它的工作看来得细究两个方法:runTask(task)getTask()

 

先看runTask(task):

 

private void runTask(Runnable task) {
            final ReentrantLock runLock = this.runLock;
            runLock.lock();
            try {
                if (runState < STOP &&
                    Thread.interrupted() &&
                    runState >= STOP)
                    thread.interrupt();
                boolean ran = false;
                beforeExecute(thread, task);
                try {
                    task.run();
                    ran = true;
                    afterExecute(task, null);
                    ++completedTasks;
                } catch (RuntimeException ex) {
                    if (!ran)
                        afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                runLock.unlock();
            }
        }
 

 

分析3

1)   该方法内部是加锁的,锁的原理会到java.util.concurrent.locks中再写,这里只说这个锁的作用。     这个锁是来保护当前线程(也就是thread对象)的,换句话说,该工作线程是不是空闲、是不是

    回收,都是通过这个锁来判断的。

2)    正常执行目标任务之前,设计了两个钩子,beforeExecuteafterExecute,这个可以通过重写

    来做些需要的事情。

3)    beforeExecute之前的if语句有点晦涩,它主要是:在当前ThreadPoolExecutor状态小于STOP

    前,清除该线程中断标识;假如恰好是由shutdownNow中断了线程,那么该线程会重新中断,

    任务也就取消、不会再执行了。

再看getTask()

 

Runnable getTask() {
        for (;;) {
            try {
                int state = runState;
                if (state > SHUTDOWN)
                    return null;
                Runnable r;
                if (state == SHUTDOWN)  // Help drain queue
                    r = workQueue.poll();
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                else
                    r = workQueue.take();
                if (r != null)
                    return r;
                if (workerCanExit()) {
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();
                    return null;
                }
                // Else retry
            } catch (InterruptedException ie) {
                // On interruption, re-check runState
            }
        }
}

 

  这是一个无限循环,跳出有两种可能:

1)  是当前状态是STOPTERMINATE,则返回null,这和上一篇“ThreadPoolExecutor execute 方法

   分”讲到的在这两种状态下任务队列不会再执行是不谋而合的;

2)  是在任务队列中取到了待执行的任务并返回给工作线程执行;

3)  是如果当前线程满足了可以结束的条件,返回null

 

关于它从任务队列中取任务的逻辑也值得思考:

A)   如果当前已经SHUTDOWN了,那意味着任务队列中不会再加入新的任务,所以直

   接workQueue.poll()即可,无需等待,任务没有也就没有了;

B)   否则再看当前线程池的大小,若不大于核心线程池大小,那该线程就不该回收,所以

   用workQueue.take()进行阻塞获取任务,该线程就会一直阻塞在这里直到任务队列中由了

   新任务并被他取到;

C)   倘若不巧当前线程池大小已经大于核心线程池大小,那该线程是“可以”回收的,只不过

   不是直接回收,而是设计了两个标识提供给客户端进行选择:

   allowCoreThreadTimeOutkeepAliveTime只有在前者为true的情况下,线程才可以

   回收,并且只回收空闲时间已经达到keepAliveTime值的那些线程。对应到代码即:       

   allowCoreThreadTimeOuttrue时,

   workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)等待获取。

 

 小结,已经看到了工作线程生命周期结束部分了,满足三个条件时候工作线程回收:

1)  参数allowCoreThreadTimeOuttrue

2)  该线程在keepAliveTime时间内获取不到任务,即空闲这么长时间

3)  当前线程池大小 > 核心线程池大小corePoolSize

 

       当然这只是对当前正在执行的一个工作线程的分析,并不是线程池回收的全部逻辑,其他逻辑

还有:

 

a)   在每个工作线程每次取不到任务的时候,检查发现可以跳出工作,并且当前状态已经 >=

   SHUTDOWN, 中断所有空闲的工作线程。在ThreadPoolExecutor中,一般的,中断工作线程意味

   着,该线程可能抛出中断异常(如果该线程在获取任务中阻塞),也可能不受影响(如果该线

   程正在执行任务)。这里当然是前一种,因为这里是中断空闲线程,空闲线程总是在获取任务

   队列时候阻塞着,从上面代码看该异常catch住但不做任何处理;和前面呼应下,如何判断当前

   线程是空闲的呢?tryLock() ^_^

b)  调用ThreadPoolExecutor.shutdown(),会做两件事情,一设置当前状态为SHUTDOWN使任务队列不

   再增加新任务,这样当忙碌的工作线程将任务执行完之后,通过步骤1) 回收;二是将所有空闲的

   工作线程中断;

c)   调用ThreadPoolExecutor.shutdownNow(),不管三七二十一,将所有工作线程中断,并返回还没

    完成的任务集;

      中断工作线程就是将其回收,看代码,不管是不是有中断异常,最终都将执行方法workerDone(worker),该方法会将工作线程从线程池中remove掉,当全部remove以后,调用tryTerminate(),该方法就不写了,马上下班呵呵。

分享到:
评论

相关推荐

    Java线程池与ThreadPoolExecutor.pdf

    线程池通过ThreadPoolExecutor类实现,这是一个高度可配置的工具,能够根据具体需求定制线程的创建、管理和销毁策略。 ThreadPoolExecutor的核心参数包括: 1. corePoolSize:核心线程数,这是线程池在非繁忙状态下...

    线程池ThreadPoolExecutor

    - `handler`:拒绝策略,当线程池和工作队列都满时,用于处理新任务。 常见的拒绝策略包括: 1. `CallerRunsPolicy`:调用者运行策略,当前线程直接运行被拒绝的任务。 2. `DiscardOldestPolicy`:丢弃最旧任务,将...

    java多线程,对多线程,线程池进行封装,方便使用

    ThreadPoolExecutor的主要参数包括核心线程数(corePoolSize)、最大线程数(maximumPoolSize)、线程空闲时间(keepAliveTime)和工作队列(BlockingQueue&lt;Runnable&gt; workQueue)。合理设置这些参数可以优化线程池...

    java实现内存分配,动态优先级,时间片调度,图形化界面

    Java并不直接支持时间片调度,但可以通过自定义线程调度器实现类似功能,比如创建一个`ThreadPoolExecutor`并定制其`RejectedExecutionHandler`。 4. **图形化界面**:Java提供了丰富的GUI库,如JavaFX和Swing,...

    初中级Android开发社招面试之线程.pdf

    关于`AsyncTask`的原理,它有串行调度器和工作线程池,前者负责任务排队,后者执行实际任务。`InternalHandler`负责将结果从工作线程发送到主线程。由于`AsyncTask`的静态成员`sHandler`需要在主线程初始化,因此`...

    java线程和线程池的使用.docx

    在 Java 中,线程是程序执行的独立路径,它们允许...推荐使用实现 Runnable 接口的方式创建线程,并通过线程池来管理和调度线程。在实际开发中,根据需求选择合适的线程创建方式和线程池配置,以达到最佳的并发性能。

    java多线程Demo

    Java的ExecutorService和ThreadPoolExecutor提供了线程池的概念,它可以有效地管理和控制线程,避免大量创建和销毁线程带来的开销。通过ThreadPoolExecutor,我们可以定制线程池的大小、存活时间、任务队列等参数,...

    java多线程详解

    Java的`ExecutorService`接口和`ThreadPoolExecutor`类提供了创建和管理线程池的功能,有助于提高多线程程序的性能和效率。 #### 知识点九:线程的优先级与调度 虽然线程的优先级可以在一定程度上影响其被调度的...

    Java的多线程编程

    Java提供了一个优先级系统来控制线程调度,但实际效果并不一定如预期,因为线程调度很大程度上依赖于操作系统。 8. 线程中断与异常处理 可以通过interrupt()方法中断线程,配合isInterrupted()和...

    java线程学习笔记

    创建Thread类的实例后,通过start()方法启动线程,线程调度器将自动调用run()方法。 执行器Executor是一个用于执行Runnable或Callable任务的接口,它提供了一种将任务提交与任务如何执行解耦的方法。Executor框架中...

    多线程的例子

    线程调度器根据优先级和策略决定哪个线程获得CPU执行权。 6. 死锁:当两个或更多线程相互等待对方释放资源而形成僵局时,就会发生死锁。避免死锁的方法包括避免循环等待、设置超时、资源预分配等。 7. 线程池:为...

    java 程序多线程设计课件

    9. **线程优先级**:Java线程有三个优先级:`MIN_PRIORITY`, `NORM_PRIORITY`, `MAX_PRIORITY`,但优先级并不保证绝对的执行顺序,因为线程调度由JVM决定。 10. **守护线程(Daemon Thread)**:守护线程不会阻止...

    java多线程设计模式

    - **线程调度**:包括抢占式和时间片轮转,由操作系统决定线程的执行顺序。 8. **线程异常处理** - **未捕获异常终止线程**:线程抛出未捕获异常会立即终止,应妥善处理线程异常,避免程序崩溃。 通过对这些知识...

    线程池管理源码 java 源码

    3. **线程调度**:线程从任务队列中取出任务并执行,执行完后重复此过程。 4. **线程回收**:当线程空闲超过存活时间,且线程池中的线程数大于核心线程数,该线程会被回收。 线程池的扩展类`Executors`提供了一些预...

    经典Java --线程

    但实际操作中,优先级并不总是决定执行顺序,因为线程调度是操作系统的行为。 中断线程:使用Thread的interrupt()方法可以中断线程,但不会立即停止线程,而是设置一个中断标志。线程可以通过检查isInterrupted()或...

    陕西多线程1.6终极版(附查名字).rar

    8. **守护线程**:Daemon线程是一种不会阻止程序退出的线程,如垃圾回收器就是守护线程。主线程结束时,所有非守护线程结束,程序才会退出。 9. **线程间的通信**:Java的BlockingQueue接口和并发工具类如Semaphore...

    ThreadPool2

    2. **线程池的工作流程**:当任务提交到线程池时,线程池会根据当前运行的线程数量和工作队列的状态决定如何处理。如果当前线程数量小于核心线程数,那么会立即创建新线程执行任务;如果线程数量等于或大于核心线程...

    Java多线程应用练习源代码及相关说明资料

    - **线程优先级**:Java为线程定义了10个优先级,但并不保证高优先级的线程一定先执行,因为线程调度受到操作系统的限制。 7. **守护线程** - **守护线程**:如垃圾回收器就是守护线程,当所有非守护线程结束时,...

    Java多线程自学笔记

    - **线程调度**: - 由操作系统负责调度线程,决定哪个线程获得CPU时间。 - Java虚拟机无法直接控制调度策略。 #### 八、守护线程 - **守护线程**: - 被标记为守护线程的线程,在所有非守护线程结束后,JVM会...

    Java多线程

    Java的线程调度策略包括:抢占式调度(优先级较高的线程优先执行)和时间片轮转。Java默认采用抢占式调度,但可以通过Thread.setPriority()设置线程优先级。 六、守护线程(Daemon) 守护线程不会阻止程序的退出,...

Global site tag (gtag.js) - Google Analytics