`
marb
  • 浏览: 422448 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Java 线程池ThreadPoolExecutor

    博客分类:
  • JAVA
 
阅读更多

JDK1.5 开始关于多线程加了很多特性。如:

ConcurrentHashMap: 放弃使用公用锁同步每一个方法,使用了更细化的锁机制,分离锁。对于大数据量的 HashMap 同步操作效率有了较大提升。

CopyOnWriteArrayList: 是同步 List 的一个并发替代品。其线程安全性来源于这样一个事实:只要有效的不可变对象被正确发布,那么访问它将不再需要更多的同步。在每次需要修改时它们会创建并重新发布一个信的容器拷贝,以此来实现可变性。

增加了 Callable Future Callable runnable 的一个可选替代。我们之前用的 Runnable 是不能返回状态的,而 Callable 是可以返回状态,返回的状态保存在泛型 Future<T> 里。

JDK1.5 里面还包含了一个重要的特性就是线程池。通过查看代码可以看出主要都是由大师 Doug Lea 来完成的。本文主要介绍线程池 ThreadPoolExecutor 的使用。

 

JDK1.5 的线程池由 Executor 框架提供。 Executor 框架将处理请求任务的提交和它的执行解耦。可以制定执行策略。在线程池中 执行线程可以重用已经存在的线程,而不是创建新的线程,可以在处理多请求时抵消线程创建、消亡产生的开销。如果线程池过大,会导致内存的高使用量,还可能 耗尽资源。如果过小,会由于存在很多的处理器资源未工作,对吞吐量造成损失。

由于内容较多没有一一研究,只看了较常用的 ThreadPoolExecutor ,所以在这里做个介绍。 ThreadPoolExecutor 的继承关系如下。

Executor->ExecutorService->AbstractExecutorService->ThreadPoolExecutor

核心池大小 (core pool size) 、最大池的大小 (maximum pool size) 、存活时间 (keep-alive time) 共同管理着线程的创建和销毁。

线程池类为 java.util.concurrent.ThreadPoolExecutor ,常用构造方法为:

/**

* Creates a new <tt> ThreadPoolExecutor </tt> with the given initial

* parameters.

*

* @param corePoolSize the number of threads to keep in the

* pool, even if they are idle.

* @param maximumPoolSize the maximum number of threads to allow in the

* pool.

* @param keepAliveTime when the number of threads is greater than

* the core, this is the maximum time that excess idle threads

* will wait for new tasks before terminating.

* @param unit the time unit for the keepAliveTime

* argument.

* @param workQueue the queue to use for holding tasks before they

* are executed. This queue will hold only the <tt> Runnable </tt>

* tasks submitted by the <tt> execute </tt> method.

* @param handler the handler to use when execution is blocked

* because the thread bounds and queue capacities are reached.

* @throws IllegalArgumentException if corePoolSize, or

* keepAliveTime less than zero, or if maximumPoolSize less than or

* equal to zero, or if corePoolSize greater than maximumPoolSize.

* @throws NullPointerException if <tt> workQueue </tt>

* or <tt> handler </tt> are null.

*/

public ThreadPoolExecutor( int corePoolSize,

int maximumPoolSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueue<Runnable> workQueue,

RejectedExecutionHandler handler) {

this (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,

Executors. defaultThreadFactory (), handler);

}

corePoolSize : 线程池维护线程的最少数量,哪怕是空闲的。

maximumPoolSize :线程池维护线程的最大数量。

keepAliveTime : 线程池维护线程所允许的空闲时间。

unit : 线程池维护线程所允许的空闲时间的单位。

workQueue : 线程池所使用的缓冲队列,改缓冲队列的长度决定了能够缓冲的最大数量。

拒绝任务:拒绝任务是指当线程池里面的线程数量达到 maximumPoolSize workQueue 队列已满的情况下被尝试添加进来的任务。

handler : 线程池对拒绝任务的处理策略。在 ThreadPoolExecutor 里面定义了 4 handler 策略,分别是

1. CallerRunsPolicy :这个策略重试添加当前的任务,他会自动重复调用 execute() 方法,直到成功。

2. AbortPolicy :对拒绝任务抛弃处理,并且抛出异常。

3. DiscardPolicy :对拒绝任务直接无声抛弃,没有异常信息。

4. DiscardOldestPolicy :对拒绝任务不抛弃,而是抛弃队列里面等待最久的一个线程,然后把拒绝任务加到队列。

 

一个任务通过 execute(Runnable) 方法被添加到线程池,任务就是一个 Runnable 类型的对象,任务的执行方法就是 Runnable 类型对象的 run() 方法。

当一个任务通过 execute(Runnable) 方法欲添加到线程池时,线程池采用的策略如下:

1. 如果此时线程池中的数量小于 corePoolSize ,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。

2. 如果此时线程池中的数量等于 corePoolSize ,但是缓冲队列 workQueue 未满,那么任务被放入缓冲队列。

3. 如果此时线程池中的数量大于 corePoolSize ,缓冲队列 workQueue 满,并且线程池中的数量小于 maximumPoolSize ,建新的线程来处理被添加的任务。

4. 如果此时线程池中的数量大于 corePoolSize ,缓冲队列 workQueue 满,并且线程池中的数量等于 maximumPoolSize ,那么通过 handler 所指定的策略来处理此任务。

处理任务的优先级为:

核心线程 corePoolSize 、任务队列 workQueue 、最大线程 maximumPoolSize ,如果三者都满了,使用 handler 处理被拒绝的任务。当线程池中的线程数量大于 corePoolSize 时,如果某线程空闲时间超过 keepAliveTime ,线程将被终止。这样,线程池可以动态的调整池中的线程数。

理解了上面关于 ThreadPoolExecutord 的介绍,应该就基本能了解它的一些使用,不过在 ThreadPoolExocutor 里面有个关键的 Worker 类,所有的线程都要经过 Worker 的包装。这样才能够做到线程可以复用而无需重新创建线程。

同时 Executors 类里面有 newFixedThreadPool(),newCachedThreadPool() 等几个方法,实际上也是间接调用了 ThreadPoolExocutor ,不过是传的不同的构造参数。

下面通过一个例子的执行结果来理解

 

代码:

  1.  import java.io.Serializable;  
  2. import java.util.concurrent.ArrayBlockingQueue;  
  3. import java.util.concurrent.ThreadPoolExecutor;  
  4. import java.util.concurrent.TimeUnit;  
  5.   
  6. public class TestThreadPool {  
  7.   
  8. private static int produceTaskSleepTime = 2;  
  9. private static int consumeTaskSleepTime = 2000;  
  10. private static int produceTaskMaxNumber = 9;  
  11.   
  12. public static void main(String[] args) {  
  13.   
  14. // 构造一个线程池  
  15. ThreadPoolExecutor threadPool = new ThreadPoolExecutor(233, TimeUnit. SECONDS ,  
  16. new ArrayBlockingQueue<Runnable>(2), new ThreadPoolExecutor.DiscardOldestPolicy() );  
  17. for ( int i = 1; i <= produceTaskMaxNumber ; i++) {  
  18. try {  
  19. // 产生一个任务,并将其加入到线程池  
  20. String task = "task@ " + i;  
  21. System. out .println( "put " + task);  
  22. threadPool.execute( new ThreadPoolTask(task));  
  23.   
  24. // 便于观察,等待一段时间  
  25. Thread. sleep ( produceTaskSleepTime );  
  26. catch (Exception e) {  
  27. e.printStackTrace();  
  28. }  
  29. }  
  30. }  
  31.   
  32. public static class ThreadPoolTask implements Runnable, Serializable {  
  33. private static final long serialVersionUID = 0;  
  34. // 保存任务所需要的数据  
  35. private Object threadPoolTaskData ;  
  36.   
  37. ThreadPoolTask(Object tasks) {  
  38. this . threadPoolTaskData = tasks;  
  39. }  
  40.   
  41. public void run() {  
  42. // 处理一个任务,这里的处理方式太简单了,仅仅是一个打印语句  
  43. System. out .println( "start .." + threadPoolTaskData );  
  44. try {  
  45. // // 便于观察,等待一段时间  
  46. Thread. sleep ( consumeTaskSleepTime );  
  47. catch (Exception e) {  
  48. e.printStackTrace();  
  49. }  
  50. threadPoolTaskData = null ;  
  51. }  
  52.   
  53. public Object getTask() {  
  54. return this . threadPoolTaskData ;  
  55. }  
  56. }  
  57. }   

 

上面代码定义了一个 corePoolSize 2 maximumPoolSize 3 workerQuene 容量为 3 的线程池,也就是说在饱和状态下,只能容纳 6 个线程, 3 个是运行状态, 3 个在队列里面。同时代码又往线程池里面添加了 9 个线程,每个线程会运行 2 秒,这样必然会到达饱和状态。而饱和状态就涉及到对拒绝任务的处理策略,本处采用了 ThreadPoolExecutor.DiscardOldestPolicy() 运行结果如下:

put task@ 1

start ..task@ 1

put task@ 2

start ..task@ 2

put task@ 3

put task@ 4

put task@ 5

start ..task@ 3

put task@ 6

put task@ 7

put task@ 8

put task@ 9

start ..task@ 8

start ..task@ 9

采用 ThreadPoolExecutor.DiscardOldestPolicy() 运行结果如下:

put task@ 1

start ..task@ 1

put task@ 2

start ..task@ 2

put task@ 3

put task@ 4

put task@ 5

start ..task@ 3

put task@ 6

start ..task@ 6

start ..task@ 4

start ..task@ 5

put task@ 7

start ..task@ 7

put task@ 8

put task@ 9

start ..task@ 8

start ..task@ 9

采用 ThreadPoolExecutor.AbortPolicy() 运行结果如下:

put task@ 1

start ..task@ 1

put task@ 2

start ..task@ 2

put task@ 3

put task@ 4

put task@ 5

start ..task@ 3

put task@ 6

java.util.concurrent.RejectedExecutionException

at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution( ThreadPoolExecutor.java:1477 )

at java.util.concurrent.ThreadPoolExecutor.reject( ThreadPoolExecutor.java:384 )

at java.util.concurrent.ThreadPoolExecutor.execute( ThreadPoolExecutor.java:867 )

at TestThreadPool.main( TestThreadPool.java:22 )

put task@ 7

java.util.concurrent.RejectedExecutionException

at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution( ThreadPoolExecutor.java:1477 )

at java.util.concurrent.ThreadPoolExecutor.reject( ThreadPoolExecutor.java:384 )

at java.util.concurrent.ThreadPoolExecutor.execute( ThreadPoolExecutor.java:867 )

at TestThreadPool.main( TestThreadPool.java:22 )

put task@ 8

java.util.concurrent.RejectedExecutionException

at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution( ThreadPoolExecutor.java:1477 )

at java.util.concurrent.ThreadPoolExecutor.reject( ThreadPoolExecutor.java:384 )

at java.util.concurrent.ThreadPoolExecutor.execute( ThreadPoolExecutor.java:867 )

at TestThreadPool.main( TestThreadPool.java:22 )

put task@ 9

java.util.concurrent.RejectedExecutionException

at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution( ThreadPoolExecutor.java:1477 )

at java.util.concurrent.ThreadPoolExecutor.reject( ThreadPoolExecutor.java:384 )

at java.util.concurrent.ThreadPoolExecutor.execute( ThreadPoolExecutor.java:867 )

at TestThreadPool.main( TestThreadPool.java:22 )

start ..task@ 4

start ..task@ 5

采用 ThreadPoolExecutor.DiscardPolicy() 运行结果如下:

put task@ 1

start ..task@ 1

put task@ 2

start ..task@ 2

put task@ 3

put task@ 4

put task@ 5

start ..task@ 3

put task@ 6

put task@ 7

put task@ 8

put task@ 9

start ..task@ 4

start ..task@ 5

从运行结果可以看出不同的 Handler 策略对拒绝任务的处理方式。

目前还只偏重在使用层面的理解,底层细节的原理还有待日后学习,欢迎交流。

 

 

本文部分内容和例子参考了: http://blog.csdn.net/imicro/archive/2007/08/29/1763955.aspx

分享到:
评论

相关推荐

    java线程池ThreadPoolExecutor类使用详解.docx

    在《阿里巴巴java开发手册》中指出了线程资源必须通过线程池提供,不允许在应用中自行显示的创建线程,这样一方面是线程的创建更加规范,可以合理控制开辟线程的数量;另一方面线程的细节管理交给线程池处理,优化了...

    12、线程池ThreadPoolExecutor实战及其原理分析(下)

    线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor...

    Java线程池ThreadPoolExecutor原理及使用实例

    Java线程池ThreadPoolExecutor原理及使用实例 Java线程池ThreadPoolExecutor是Java并发编程中的一种基本机制,主要用于管理和执行任务的线程池。下面对其原理和使用实例进行详细介绍。 线程池概述 线程池是一个...

    Java线程池与ThreadPoolExecutor.pdf

    Java线程池是Java并发编程中...总结来说,理解并正确使用Java线程池和ThreadPoolExecutor对于优化Java应用程序的并发性能至关重要。通过调整线程池的参数,可以平衡资源利用率和系统响应时间,从而提高整体的系统效率。

    java线程池使用后到底要关闭吗

    java线程池使用后到底要关闭吗 java线程池是一种高效的并发编程技术,可以帮助开发者更好地管理线程资源,提高系统的性能和可靠性。然而,在使用java线程池时,一个常见的问题是:使用完线程池后到底要不要关闭?...

    java 线程池例子ThreadPoolExecutor

    Java 线程池例子 ThreadPoolExecutor Java 中的线程池是指一个容器,里面包含了多个线程,这些线程可以重复使用,以避免频繁创建和销毁线程的开销。ThreadPoolExecutor 是 Java 中一个非常重要的线程池实现类,它...

    11、线程池ThreadPoolExecutor实战及其原理分析(上)

    线程池ThreadPoolExecutor实战及其原理分析(上)

    spring线程池ThreadPoolExecutor配置以及FutureTask的使用

    这个类是Spring对Java内置的`java.util.concurrent.ThreadPoolExecutor`的封装,允许开发者在Spring应用上下文中声明式地定义线程池。在本篇文章中,我们将深入探讨`ThreadPoolTaskExecutor`的配置及其使用,并结合`...

    线程池之ThreadPoolExecutor.docx

    线程池是多线程编程中一种高效管理线程资源的方式,主要由Java的`ThreadPoolExecutor`类实现。线程池的工作机制在于控制线程数量,它会将任务放入队列,然后根据线程池的设定创建并启动线程执行这些任务。如果线程...

    线程池ThreadPoolExecutor原理源码分析.md

    ### 线程池 `ThreadPoolExecutor` 原理源码分析 #### 一、概述 线程池作为 Java 并发编程中的重要组件,在实际应用中被广泛使用。其核心类 `ThreadPoolExecutor` 实现了对线程的管理、调度等功能。本文将围绕 `...

    使用线程池ThreadPoolExecutor 抓取论坛帖子列表

    `ThreadPoolExecutor`是Java并发包`java.util.concurrent`中的核心类,用于实现线程池服务。通过设置线程池参数,我们可以定制线程池的行为,比如核心线程数、最大线程数、线程空闲时间等。 创建一个`...

    线程池ThreadPoolExecutor使用简介与方法实例

    线程池ThreadPoolExecutor是Java并发编程中一个非常重要的概念,它允许开发者将任务提交给线程池,并由线程池来管理这些任务的执行。今天,我们将对线程池ThreadPoolExecutor进行详细的介绍,并提供一些实际的使用...

    java线程池封装j

    Java线程池是一种高效管理线程的技术,它允许开发者预定义一组线程,根据任务的需要灵活调度,而不是每次需要执行任务时都创建新的线程。这种设计模式大大提高了系统的性能,减少了系统资源的消耗,特别是在高并发...

    12-线程池ThreadPoolExecutor底层原理源码分析(下)-周瑜.pdf

    通过上述对`ThreadPoolExecutor`线程池底层实现原理的解析,我们可以看到Java线程池的强大之处在于其高效的状态管理和任务调度机制。通过对`ctl`变量的巧妙利用,线程池能够有效地管理线程状态和数量,从而实现高...

    11-线程池ThreadPoolExecutor底层原理源码分析(上)-周瑜.pdf

    `ThreadPoolExecutor`是Java中非常重要的一个类,它用于创建线程池。线程池可以重用一组预先创建好的线程来执行任务,从而减少了创建和销毁线程所带来的开销,同时也能够控制运行的线程数量,避免由于过多的线程导致...

    java线程池实例详细讲解

    Java线程池的实现主要有`ThreadPoolExecutor`类,它提供了丰富的构造参数来定制线程池的行为: - `corePoolSize`:线程池的基本大小,即当线程池创建后和运行过程中,即使没有任务,也会保持这个数量的线程存活。 -...

    自定义实现Java线程池

    ### 自定义实现Java线程池 #### 一、概述 在深入探讨自定义Java线程池之前,我们先简要回顾一下线程池的基本概念及其重要性。线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动...

    java线程池threadpool简单使用源码

    要理解`java线程池threadpool简单使用源码`,你需要查看`src`目录下的Java文件,了解如何实例化`ThreadPoolExecutor`,设置相关参数,以及如何提交任务到线程池。同时,查看源码中对`ThreadGroup`的使用,理解它如何...

    java线程池的源码分析.zip

    首先,我们要理解Java线程池的核心类`java.util.concurrent.ThreadPoolExecutor`,它是所有自定义线程池的基类。线程池通过维护一个工作线程集合来管理任务的执行,避免了频繁创建和销毁线程的开销,提高了系统的...

    Java线程池文档

    Java线程池是一种高效管理线程的机制,它允许开发者预先设定线程的数量,并通过池化的方式重用已创建的线程,以提高系统性能,减少线程的创建和销毁开销。线程池在Java中是通过`java.util.concurrent`包下的`...

Global site tag (gtag.js) - Google Analytics