`

【转】线程池深入分析(上)

阅读更多

转载地址:http://blog.csdn.net/a511596982/article/details/8299108

线程池是并发包里面很重要的一部分,在实际情况中也是使用很多的一个重要组件。

下图描述的是线程池API的一部分。广义上的完整线程池可能还包括Thread/Runnable、Timer/TimerTask等部分。这里只介绍主要的和高级的API以及架构和原理。

大多数并发应用程序是围绕执行任务(Task)进行管理的。所谓任务就是抽象、离散的工作单元(unit of work)。把一个应用程序的工作(work)分离到任务中,可以简化程序的管理;这种分离还在不同事物间划分了自然的分界线,可以方便程序在出现错误时进行恢复;同时这种分离还可以为并行工作提供一个自然的结构,有利于提高程序的并发性。下面通过任务的执行策略来引入Executor相关的介绍。

 

一、任务的执行策略

 

任务的执行策略包括4W3H部分:

  • 任务在什么(What)线程中执行
  • 任务以什么(What)顺序执行(FIFO/LIFO/优先级等)
  • 同时有多少个(How Many)任务并发执行
  • 允许有多少个(How Many)个任务进入执行队列
  • 系统过载时选择放弃哪一个(Which)任务,如何(How)通知应用程序这个动作
  • 任务执行的开始、结束应该做什么(What)处理

在后面的章节中会详细分写这些策略是如何实现的。我们先来简单回答些如何满足上面的条件。

  1. 首先明确一定是在Java里面可以供使用者调用的启动线程类是Thread。因此Runnable或者Timer/TimerTask等都是要依赖Thread来启动的,因此在ThreadPool里面同样也是靠Thread来启动多线程的。
  2. 默认情况下Runnable接口执行完毕后是不能拿到执行结果的,因此在ThreadPool里就定义了一个Callable接口来处理执行结果。
  3. 为了异步阻塞的获取结果,Future可以帮助调用线程获取执行结果。
  4. Executor解决了向线程池提交任务的入口问题,同时ScheduledExecutorService解决了如何进行重复调用任务的问题。
  5. CompletionService解决了如何按照执行完毕的顺序获取结果的问题,这在某些情况下可以提高任务执行的并发,调用线程不必在长时间任务上等待过多时间。
  6. 显然线程的数量是有限的,而且也不宜过多,因此合适的任务队列是必不可少的,BlockingQueue的容量正好可以解决此问题。
  7. 固定任务容量就意味着在容量满了以后需要一定的策略来处理过多的任务(新任务),RejectedExecutionHandler正好解决此问题。
  8. 一定时间内阻塞就意味着有超时,因此TimeoutException就是为了描述这种现象。TimeUnit是为了描述超时时间方便的一个时间单元枚举类。
  9. 有上述问题就意味了配置一个合适的线程池是很复杂的,因此Executors默认的一些线程池配置可以减少这个操作。


二、线程池Executor的类体系结构与常用线程池

 

 

Java里面线程池的顶级接口是Executor,但是严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService

下面这张图完整描述了线程池的类体系结构。

 

首先Executor的execute方法只是执行一个Runnable的任务,当然了从某种角度上将最后的实现类也是在线程中启动此任务的。根据线程池的执行策略最后这个任务可能在新的线程中执行,或者线程池中的某个线程,甚至是调用者线程中执行(相当于直接运行Runnable的run方法)。这点在后面会详细说明。

ExecutorService在Executor的基础上增加了一些方法,其中有两个核心的方法:

  • Future<?> submit(Runnable task)
  • <T> Future<T> submit(Callable<T> task)

这两个方法都是向线程池中提交任务,它们的区别在于Runnable在执行完毕后没有结果,Callable执行完毕后有一个结果。这在多个线程中传递状态和结果是非常有用的。另外他们的相同点在于都返回一个Future对象。Future对象可以阻塞线程直到运行完毕(获取结果,如果有的话),也可以取消任务执行,当然也能够检测任务是否被取消或者是否执行完毕。

 

 

要配置一个线程池是比较复杂的,尤其是对于线程池的原理不是很清楚的情况下,很有可能配置的线程池不是较优的,因此在

Executors类里面提供了一些静态工厂,生成一些常用的线程池

  • newSingleThreadExecutor:创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
  • newFixedThreadPool:创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
  • newCachedThreadPool:创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。
  • newScheduledThreadPool:创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
  • newSingleThreadScheduledExecutor:创建一个单线程的线程池。此线程池支持定时以及周期性执行任务的需求。

 

三、线程池Executor的数据结构

 

 

由于已经看到了ThreadPoolExecutor的源码,因此很容易就看到了ThreadPoolExecutor线程池的数据结构。下图3描述了这种数据结构。

图3 ThreadPoolExecutor 数据结构

其实,即使没有上述图形描述ThreadPoolExecutor的数据结构,我们根据线程池的要求也很能够猜测出其数据结构出来。

  • 线程池需要支持多个线程并发执行,因此有一个线程集合Collection<Thread>来执行线程任务;
  • 涉及任务的异步执行,因此需要有一个集合来缓存任务队列Collection<Runnable>;
  • 很显然在多个线程之间协调多个任务,那么就需要一个线程安全的任务集合,同时还需要支持阻塞、超时操作,那么BlockingQueue是必不可少的;
  • 既然是线程池,出发点就是提高系统性能同时降低资源消耗,那么线程池的大小就有限制,因此需要有一个核心线程池大小(线程个数)和一个最大线程池大小(线程个数),有一个计数用来描述当前线程池大小;
  • 如果是有限的线程池大小,那么长时间不使用的线程资源就应该销毁掉,这样就需要一个线程空闲时间的计数来描述线程何时被销毁;
  • 前面描述过线程池也是有生命周期的,因此需要有一个状态来描述线程池当前的运行状态;
  • 线程池的任务队列如果有边界,那么就需要有一个任务拒绝策略来处理过多的任务,同时在线程池的销毁阶段也需要有一个任务拒绝策略来处理新加入的任务;
  • 上面种的线程池大小、线程空闲实际那、线程池运行状态等等状态改变都不是线程安全的,因此需要有一个全局的锁(mainLock)来协调这些竞争资源;
  • 除了以上数据结构以外,ThreadPoolExecutor还有一些状态用来描述线程池的运行计数,例如线程池运行的任务数、曾经达到的最大线程数,主要用于调试和性能分析。

 

四、线程池Executor生命周期

 

 

线程池Executor是异步的执行任务,因此任何时刻不能够直接获取提交的任务的状态。这些任务有可能已经完成,也有可能正在执行或者还在排队等待执行。因此关闭线程池可能出现一下几种情况:

  • 平缓关闭:已经启动的任务全部执行完毕,同时不再接受新的任务
  • 立即关闭:取消所有正在执行和未执行的任务

另外关闭线程池后对于任务的状态应该有相应的反馈信息。

 

图4 描述了线程池的4种状态。

  • 线程池在构造前(new操作)是初始状态,一旦构造完成线程池就进入了执行状态RUNNING。严格意义上讲线程池构造完成后并没有线程被立即启动,只有进行“预启动”或者接收到任务的时候才会启动线程。这个会后面线程池的原理会详细分析。但是线程池是出于运行状态,随时准备接受任务来执行。
  • 线程池运行中可以通过shutdown()和shutdownNow()来改变运行状态。shutdown()是一个平缓的关闭过程,线程池停止接受新的任务,同时等待已经提交的任务执行完毕,包括那些进入队列还没有开始的任务,这时候线程池处于SHUTDOWN状态;shutdownNow()是一个立即关闭过程,线程池停止接受新的任务,同时线程池取消所有执行的任务和已经进入队列但是还没有执行的任务,这时候线程池处于STOP状态。
  • 一旦shutdown()或者shutdownNow()执行完毕,线程池就进入TERMINATED状态,此时线程池就结束了。
  • isTerminating()描述的是SHUTDOWN和STOP两种状态。
  • isShutdown()描述的是非RUNNING状态,也就是SHUTDOWN/STOP/TERMINATED三种状态。

 

图4

线程池的API如下:

图5

其中shutdownNow()会返回那些已经进入了队列但是还没有执行的任务列表。awaitTermination描述的是等待线程池关闭的时间,如果等待时间线程池还没有关闭将会抛出一个超时异常。

对于关闭线程池期间发生的任务提交情况就会触发一个拒绝执行的操作。这是java.util.concurrent.RejectedExecutionHandler描述的任务操作。下一个小结中将描述这些任务被拒绝后的操作。

 

总结下这个小节

  1. 线程池有运行、关闭、停止、结束四种状态,结束后就会释放所有资源
  2. 平缓关闭线程池使用shutdown()
  3. 立即关闭线程池使用shutdownNow(),同时得到未执行的任务列表
  4. 检测线程池是否正处于关闭中,使用isShutdown()
  5. 检测线程池是否已经关闭使用isTerminated()
  6. 定时或者永久等待线程池关闭结束使用awaitTermination()操作

 

五、线程池Executor任务拒绝策略

 

紧接上面,对于关闭线程池期间发生的任务提交情况就会触发一个拒绝执行的操作。这是java.util.concurrent.RejectedExecutionHandler描述的任务操作。

先来分析下为什么有任务拒绝的情况发生

这里先假设一个前提:线程池有一个任务队列,用于缓存所有待处理的任务,正在处理的任务将从任务队列中移除。因此在任务队列长度有限的情况下就会出现新任务的拒绝处理问题,需要有一种策略来处理应该加入任务队列却因为队列已满无法加入的情况。另外在线程池关闭的时候也需要对任务加入队列操作进行额外的协调处理。

 

RejectedExecutionHandler提供了四种方式来处理任务拒绝策略。

这四种策略是独立无关的,是对任务拒绝处理的四种表现形式。

最简单的方式就是直接丢弃任务。但是却有两种方式,到底是该丢弃哪一个任务,比如可以丢弃当前将要加入队列的任务本身(DiscardPolicy)或者丢弃任务队列中最旧任务(DiscardOldestPolicy)。丢弃最旧任务也不是简单的丢弃最旧的任务,而是有一些额外的处理。除了丢弃任务还可以直接抛出一个异常(RejectedExecutionException),这是比较简单的方式。抛出异常的方式(AbortPolicy)尽管实现方式比较简单,但是由于抛出一个RuntimeException,因此会中断调用者的处理过程。除了抛出异常以外还可以不进入线程池执行,在这种方式(CallerRunsPolicy)中任务将有调用者线程去执行。

 

上面是一些理论知识,下面结合一些例子进行分析讨论。

 

[java] view plaincopy
 
  1. package xylz.study.concurrency;  
  2.   
  3. import java.lang.reflect.Field;  
  4. import java.util.concurrent.ArrayBlockingQueue;  
  5. import java.util.concurrent.ThreadPoolExecutor;  
  6. import java.util.concurrent.TimeUnit;  
  7. import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;  
  8. import java.util.concurrent.ThreadPoolExecutor.DiscardPolicy;  
  9.   
  10. public class ExecutorServiceDemo {  
  11.   
  12.     static void log(String msg) {  
  13.         System.out.println(System.currentTimeMillis() + " -> " + msg);  
  14.     }  
  15.   
  16.     static int getThreadPoolRunState(ThreadPoolExecutor pool) throws Exception {  
  17.         Field f = ThreadPoolExecutor.class.getDeclaredField("runState");  
  18.         f.setAccessible(true);  
  19.         int v = f.getInt(pool);  
  20.         return v;  
  21.     }  
  22.   
  23.     public static void main(String[] args) throws Exception {  
  24.   
  25.         ThreadPoolExecutor pool = new ThreadPoolExecutor(110, TimeUnit.SECONDS,  
  26.                 new ArrayBlockingQueue<Runnable>(1));  
  27.         pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());  
  28.         for (int i = 0; i < 10; i++) {  
  29.             final int index = i;  
  30.             pool.submit(new Runnable() {  
  31.   
  32.                 public void run() {  
  33.                     log("run task:" + index + " -> " + Thread.currentThread().getName());  
  34.                     try {  
  35.                         Thread.sleep(1000L);  
  36.                     } catch (Exception e) {  
  37.                         e.printStackTrace();  
  38.                     }  
  39.                     log("run over:" + index + " -> " + Thread.currentThread().getName());  
  40.                 }  
  41.             });  
  42.         }  
  43.         log("before sleep");  
  44.         Thread.sleep(4000L);  
  45.         log("before shutdown()");  
  46.         pool.shutdown();  
  47.         log("after shutdown(),pool.isTerminated=" + pool.isTerminated());  
  48.         pool.awaitTermination(1000L, TimeUnit.SECONDS);  
  49.         log("now,pool.isTerminated=" + pool.isTerminated() + ", state="  
  50.                 + getThreadPoolRunState(pool));  
  51.     }  
  52.   
  53. }  

 


第一种方式直接丢弃(DiscardPolicy)的输出结果是:

 

[java] view plaincopy
 
  1. 1294494050696 -> run task:0  
  2. 1294494050696 -> before sleep  
  3. 1294494051697 -> run over:0 -> pool-1-thread-1  
  4. 1294494051697 -> run task:1  
  5. 1294494052697 -> run over:1 -> pool-1-thread-1  
  6. 1294494054697 -> before shutdown()  
  7. 1294494054697 -> after shutdown(),pool.isTerminated=false  
  8. 1294494054698 -> now,pool.isTerminated=true, state=3  

 

 

对于上面的结果需要补充几点。

  1. 线程池设定线程大小为1,因此输出的线程就只有一个”pool-1-thread-1”,至于为什么是这个名称,以后会分析。
  2. 任务队列的大小为1,因此可以输出一个任务执行结果。但是由于线程本身可以带有一个任务,因此实际上一共执行了两个任务(task0和task1)。
  3. shutdown()一个线程并不能理解是线程运行状态位terminated,可能需要稍微等待一点时间。尽管这里等待时间参数是1000秒,但是实际上从输出时间来看仅仅等了约1ms。
  4. 直接丢弃任务是丢弃将要进入线程池本身的任务,所以当运行task0是,task1进入任务队列,task2~task9都被直接丢弃了,没有运行。

如果把策略换成丢弃最旧任务(DiscardOldestPolicy),结果会稍有不同。

 

[java] view plaincopy
 
  1. 1294494484622 -> run task:0  
  2. 1294494484622 -> before sleep  
  3. 1294494485622 -> run over:0 -> pool-1-thread-1  
  4. 1294494485622 -> run task:9  
  5. 1294494486622 -> run over:9 -> pool-1-thread-1  
  6. 1294494488622 -> before shutdown()  
  7. 1294494488622 -> after shutdown(),pool.isTerminated=false  
  8. 1294494488623 -> now,pool.isTerminated=true, state=3  

 

 

这里依然只是执行两个任务,但是换成了任务task0和task9。实际上task1~task8还是进入了任务队列,只不过被task9挤出去了。

对于异常策略(AbortPolicy)就比较简单,这回调用线程的任务执行。

对于调用线程执行方式(CallerRunsPolicy),输出的结果就有意思了。

 

[java] view plaincopy
 
  1. 1294496076266 -> run task:2 -> main  
  2. 1294496076266 -> run task:0 -> pool-1-thread-1  
  3. 1294496077266 -> run over:0 -> pool-1-thread-1  
  4. 1294496077266 -> run task:1 -> pool-1-thread-1  
  5. 1294496077266 -> run over:2 -> main  
  6. 1294496077266 -> run task:4 -> main  
  7. 1294496078267 -> run over:4 -> main  
  8. 1294496078267 -> run task:5 -> main  
  9. 1294496078267 -> run over:1 -> pool-1-thread-1  
  10. 1294496078267 -> run task:3 -> pool-1-thread-1  
  11. 1294496079267 -> run over:3 -> pool-1-thread-1  
  12. 1294496079267 -> run over:5 -> main  
  13. 1294496079267 -> run task:7 -> main  
  14. 1294496079267 -> run task:6 -> pool-1-thread-1  
  15. 1294496080267 -> run over:7 -> main  
  16. 1294496080267 -> run task:9 -> main  
  17. 1294496080267 -> run over:6 -> pool-1-thread-1  
  18. 1294496080267 -> run task:8 -> pool-1-thread-1  
  19. 1294496081268 -> run over:9 -> main  
  20. 1294496081268 -> before sleep  
  21. 1294496081268 -> run over:8 -> pool-1-thread-1  
  22. 1294496085268 -> before shutdown()  
  23. 1294496085268 -> after shutdown(),pool.isTerminated=false  
  24. 1294496085269 -> now,pool.isTerminated=true, state=3  

 

参考内容:

深入浅出 Java Concurrency (28): 线程池 part 1 简介
http://www.blogjava.net/xylz/archive/2010/12/19/341098.html
深入浅出 Java Concurrency (29): 线程池 part 2 Executor 以及Executors
http://www.blogjava.net/xylz/archive/2010/12/21/341281.html
深入浅出 Java Concurrency (30): 线程池 part 3 Executor 生命周期
http://www.blogjava.net/xylz/archive/2011/01/04/342316.html
深入浅出 Java Concurrency (31): 线程池 part 4 线程池任务拒绝策略
http://www.blogjava.net/xylz/archive/2011/01/08/342609.html
java的concurrent用法详解
http://blog.csdn.net/a511596982/article/details/8063742

 

分享到:
评论

相关推荐

    线程池源码分析

    线程池ThreadPoolExecutor的源码分析,含中文注释,深入了解线程池的构造

    concurrent线程池的实现技术分析

    【线程池与并发技术分析】 线程池是Java中多线程编程的重要工具,它通过预先创建并管理一组线程来提升系统性能,...深入理解线程池的工作原理和源码,有助于优化系统的并发性能,避免因不当使用导致的性能瓶颈或错误。

    java线程池的源码分析.zip

    本文将深入探讨Java线程池的源码分析,并对比不同类型的线程池,以帮助开发者更好地理解和利用这一强大的工具。 首先,我们要理解Java线程池的核心类`java.util.concurrent.ThreadPoolExecutor`,它是所有自定义...

    C++线程池,带PPT 线程池

    在IT领域,线程池是一种优化并发处理的机制,它在多核或多处理器系统中尤其重要,能够有效地管理和调度线程...通过分析提供的PPT,开发者可以更深入地了解线程池的工作机制,以及如何在实际项目中应用和优化线程池。

    聊聊并发(3)Java线程池的分析和使用Java开发Jav

    在Java编程中,线程池是一种管理线程资源的有效方式,它可以提高...在阅读《聊聊并发(3)Java线程池的分析和使用》这份文档时,你可以学习到更多关于线程池的实践技巧和案例分析,这对于提升Java开发能力大有裨益。

    一个简单线程池的实现

    通过这个简单的线程池实现,我们可以深入学习多线程编程,了解线程池的原理,并且掌握如何在实际项目中优化并发性能。不过要注意,由于存在bug,我们需要仔细分析代码,找出问题所在,并进行修正,以使其更稳定和...

    C++ 线程池源码+demo _android源码分析

    总的来说,这个资源为学习和实践C++线程池提供了丰富的素材,无论是对C++的旧版本还是新版本,都可以深入理解线程池的设计原理和实现技巧,同时也能了解到多线程编程中的挑战和解决方案。通过对这些源码的分析,...

    多线程写法(精易模块线程池和鱼刺模块线程池)

    通过`content.txt`文件,我们可以深入研究这两个线程池实现的源码,理解它们的工作原理和内部机制。源码分析可以帮助我们更好地掌握如何在实际项目中应用这些线程池,优化代码性能,解决并发问题。 总之,多线程...

    vc 线程池技术源码   

    本文将深入探讨VC++线程池的原理、使用方法以及相关源码分析。 线程池是一种管理线程的机制,它预先创建一组线程,并保持在就绪状态。当有新的任务需要执行时,线程池会从已存在的线程中选择一个执行任务,而不是...

    Python的线程池实现

    在本篇文章中,我们将深入探讨Python中的线程池实现,并参考提供的`ThreadPool.py`源码进行分析。 首先,Python标准库提供了一个名为`concurrent.futures`的模块,其中包含`ThreadPoolExecutor`类,它是实现线程池...

    安卓,线程池的使用 ,封装

    本文将深入探讨Android中线程池的使用以及如何进行封装,以实现更灵活、高效的并发处理。 首先,我们需要理解Android中的线程池是如何工作的。线程池是由`java.util.concurrent.ThreadPoolExecutor`类提供的,它...

    Java 线程池原理深入分析

    }固定大小线程池使用了ThreadPoolExecutor,核心线程数和最大线程数都设置为指定的值nThreads,工作队列使用无界队列LinkedBlockingQueue。这意味着线程池会维护一定数量的线程,即使没有任务执行,它们也会存活,...

    简单的线程池模型 (Dephi版),线程池

    线程池是一种在计算机编程中优化并发处理的机制,它允许多个任务或工作单元在一组预先创建的线程上并行执行。在Delphi编程环境中,线程池的实现通常涉及对操作系统提供的API或者VCL(Visual Component Library)组件...

    一个简单的线程池例子

    通过查看和分析这个源代码,我们可以学习到线程池的具体实现细节,如如何创建线程、如何管理任务队列、如何调度任务以及如何控制线程池的生命周期。这有助于我们深入理解线程池的工作原理,从而更好地在实际项目中...

    java线程池实例详细讲解

    例如,通过`ThreadPoolExecutor`的`getPoolSize()`、`getActiveCount()`和`getCompletedTaskCount()`等方法获取线程池状态信息,以便于分析和调整线程池的配置。同时,避免创建过多的线程池,因为每个线程池都会消耗...

    linux线程池c源码

    本文将深入解析一个Linux下的线程池实现,该实现使用C语言编写,并遵循GNU通用公共许可证版本2(或之后版本)。线程池是一种软件设计模式,它可以提高程序执行效率,通过复用已创建的线程来减少创建和销毁线程的开销...

    spring 线程池

    `标签`中的“源码”提示我们可以深入研究Spring线程池的实现,例如分析`ThreadPoolTaskExecutor`的源代码,了解其内部如何处理任务提交、线程管理以及异常处理等。而“工具”则可能是指Spring提供的一些工具类或辅助...

    两种线程池的实现和性能评价

    通过对HH线程池和LF线程池的实现原理及性能特点进行深入分析,并结合实验数据,可以得出以下结论: - 在处理大量短时间任务时,HH线程池能够通过异步机制提高系统性能; - 对于长时间运行的任务或高并发场景,LF...

    线程池示例代码

    线程池是一种多线程处理形式,用于管理并发任务的执行。它通过预先创建一组可重用线程来提高系统效率,避免频繁地创建和销毁线程带来的开销。...因此,深入研究线程池的实现和使用技巧对于提升系统性能至关重要。

    线程池(c++)

    分析这个文件可以帮助我们深入理解线程池的实现细节和优化策略。 总结来说,C++中的线程池是一种高效的并发处理方案,通过合理管理和调度线程,能够提高系统的资源利用率和响应速度。在实现时,需要考虑多线程环境...

Global site tag (gtag.js) - Google Analytics