- 浏览: 984947 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
Executor接口的定义:http://donald-draper.iteye.com/blog/2365625
ExecutorService接口定义:http://donald-draper.iteye.com/blog/2365738
Future接口定义:http://donald-draper.iteye.com/blog/2365798
FutureTask解析:http://donald-draper.iteye.com/blog/2365980
CompletionService接口定义:http://donald-draper.iteye.com/blog/2366239
ExecutorCompletionService解析:http://donald-draper.iteye.com/blog/2366254
AbstractExecutorService解析:http://donald-draper.iteye.com/blog/2366348
ScheduledExecutorService接口定义:http://donald-draper.iteye.com/blog/2366436
自此我们把线程池ThreadPoolExecutor的java doc使用说明和变量的定义已经看完。
总结:
ThreadPoolExecutor的变量主要有核心线程池数量corePoolSize和最大线程池数量maximumPoolSize,即在当前任务线程数大于核心线程数量时,是否(allowCoreThreadTimeOut)允许空闲任务线程等,保活keepAliveTime时间,等待新任务的到来。一个线程工厂ThreadFactory用于创建任务线程,一个拒绝任务处理器RejectedExecutionHandler,默认的拒绝任务策略为AbortPolicy,抛出运行时异常,当然还有直接丢弃策略DiscardPolicy,丢弃旧的任务DiscardOldestPolicy,还有调用者执行任务策略CallerRunsPolicy。上面的变量为volatile,以便线程池执行操作时,可以使用最新的变量。
一个阻塞的任务队列final BlockingQueue<Runnable> workQueue,阻塞队列可以为Linked,Array,Delay,SynchronousQueue等阻塞类型,具体可以根据场景选择。默认为LinkedBlockingQueue队列,一般判断队列是否为空,用isEmpty方法,LinkedBlockingQueue一般用于任务相互之间独立,没有交叉,可独立执行。如果用SynchronousQueue,则可用poll方法判断,同步队列一般用于任务之间有依赖的关系的场景,一个任务执行依赖于另一个任务的结果。DelayQueue队列用于定时任务。ArrayBlockingQueue队列一般可以用于
资源有限情况,可以避免资源被耗尽。一个AtomicInteger的ctl用于包装线程状态runState和任务线程数workerCount;低29位保存任务线程数,高两位用于存储线程池状态,线程池状态已用有四种RUNNING,SHUTDOWN ,STOP,TIDYING ,TERMINATED。
RUNNING:接受新的任务,处理队列任务;
SHUTDOWN:不在接受新的任务,处理队列任务;
STOP:不在接受新任务,不处理队列任务,中断正在执行的任务线程;
TIDYING:所有的任务已经结束,任务线程为0,线程转换到TIDYING;
TERMINATED:线程池已结束,即terminated()方法执行完。
线程的状态是可以数字化比较的。
一个任务线程集final HashSet<Worker> workers,largestPoolSize记录线程池的最大任务线程数,completedTaskCount为完成任务计数器,在任务线程结束时,更新。一个可重入锁mainLock,用于保护非线程安全的变量如workers,largestPoolSize,completedTaskCount。
一个等待线程池结束条件termination,用于控制超时等待线程池关闭。
ThreadPoolExecutor解析二(线程工厂、工作线程,拒绝策略等):http://donald-draper.iteye.com/blog/2367064
ExecutorService接口定义:http://donald-draper.iteye.com/blog/2365738
Future接口定义:http://donald-draper.iteye.com/blog/2365798
FutureTask解析:http://donald-draper.iteye.com/blog/2365980
CompletionService接口定义:http://donald-draper.iteye.com/blog/2366239
ExecutorCompletionService解析:http://donald-draper.iteye.com/blog/2366254
AbstractExecutorService解析:http://donald-draper.iteye.com/blog/2366348
ScheduledExecutorService接口定义:http://donald-draper.iteye.com/blog/2366436
package java.util.concurrent; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.atomic.AtomicInteger; import java.util.*; /** * An {@link ExecutorService} that executes each submitted task using * one of possibly several pooled threads, normally configured * using {@link Executors} factory methods. * ThreadPoolExecutor用线程池中的线程执行提交的任务,一般通过Executors的 工厂方法创建ThreadPoolExecutor。 * <p>Thread pools address two different problems: they usually * provide improved performance when executing large numbers of * asynchronous tasks, due to reduced per-task invocation overhead, * and they provide a means of bounding and managing the resources, * including threads, consumed when executing a collection of tasks. * Each {@code ThreadPoolExecutor} also maintains some basic * statistics, such as the number of completed tasks. * 线程池主要是解决两类不同的问题:线程用于在需要执行大量异步任务的情况下,改善性能, 者得力于线程池减少了每个任务运行的负载,同时提供了管理执行一个任务集所用的资源,包括线程。 ThreadPoolExecutor提供了一些基本的统计,比如完成任务的数量。 * <p>To be useful across a wide range of contexts, this class * provides many adjustable parameters and extensibility * hooks. However, programmers are urged to use the more convenient * {@link Executors} factory methods {@link * Executors#newCachedThreadPool} (unbounded thread pool, with * automatic thread reclamation), {@link Executors#newFixedThreadPool} * (fixed size thread pool) and {@link * Executors#newSingleThreadExecutor} (single background thread), that * preconfigure settings for the most common usage * scenarios. Otherwise, use the following guide when manually * configuring and tuning this class: * 为了在大部分上下文或场景,保证线程池的高效性,ThreadPoolExecutor提供了一些可调整的参数 和扩展的hooks。开发者可以用更方便的Executors的工厂方法newCachedThreadPool创建一个无界的 原子回收线程的线程池,newFixedThreadPool方法创建一个固定大小的线程池,newSingleThreadExecutor 创建一个单线程的后台线程,这些都是为大部分应用场景设置的默认线程池配置。 如果以上线程方法不足以满足应用场景,可以手动配置和调校线程池ThreadPoolExecutor。 * <dl> * * <dt>Core and maximum pool sizes</dt> * 核心和最大线程池数量 * <dd>A {@code ThreadPoolExecutor} will automatically adjust the * pool size (see {@link #getPoolSize}) * according to the bounds set by * corePoolSize (see {@link #getCorePoolSize}) and * maximumPoolSize (see {@link #getMaximumPoolSize}). * 线程池执行器将会根据核心线程池数量和最大线程池数量自动地调整线程池大小。 * When a new task is submitted in method {@link #execute}, and fewer * than corePoolSize threads are running, a new thread is created to * handle the request, even if other worker threads are idle. If * there are more than corePoolSize but less than maximumPoolSize * threads running, a new thread will be created only if the queue is * full. By setting corePoolSize and maximumPoolSize the same, you * create a fixed-size thread pool. By setting maximumPoolSize to an * essentially unbounded value such as {@code Integer.MAX_VALUE}, you * allow the pool to accommodate an arbitrary number of concurrent * tasks. Most typically, core and maximum pool sizes are set only * upon construction, but they may also be changed dynamically using * {@link #setCorePoolSize} and {@link #setMaximumPoolSize}. </dd> * 当一个新任务提交的线程池执行时,如果当前正在运行的线程数小于核心线程数,即使线程 池中有空闲的线程,仍创建一个新线程处理任务执行请求,即创建一个新的任务线程执行任务。 如果有大于核心线程池数量,小于最大线程池数量的线程在运行,一个新的任务线程将被创建, 直到任务队列满为止。当设置corePoolSize和maximumPoolSize相等时,即创建一个 固定大小的线程池。如果需要设置maximumPoolSize为无界的,比如Integer.MAX_VALUE, 那么将允许线程池容纳任意数量的任务并发执行。在典型的场景中,corePoolSize和maximumPoolSize 仅仅在构造中设置,但是我们也可以动态的调整用#setCorePoolSize和#setMaximumPoolSize函数。 * <dt>On-demand construction</dt> * * <dd> By default, even core threads are initially created and * started only when new tasks arrive, but this can be overridden * dynamically using method {@link #prestartCoreThread} or {@link * #prestartAllCoreThreads}. You probably want to prestart threads if * you construct the pool with a non-empty queue. </dd> * 在默认情况,只有在新任务到达时,才开始创建和启动核心线程, 但是我们可以#prestartCoreThread和 #prestartAllCoreThreads方法动态调整 默认核心线程调度策略。prestartCoreThread方法为创一个空闲任务线程等待任务的到达, prestartAllCoreThreads创建核心线程池数量的空闲任务线程等待任务的到达。 如果我们构造是非空队列的线程池,也许我们想预启动任务线程。 * <dt>Creating new threads</dt> * * <dd>New threads are created using a {@link ThreadFactory}. If not * otherwise specified, a {@link Executors#defaultThreadFactory} is * used, that creates threads to all be in the same {@link * ThreadGroup} and with the same {@code NORM_PRIORITY} priority and * non-daemon status. By supplying a different ThreadFactory, you can * alter the thread's name, thread group, priority, daemon status, * etc. If a {@code ThreadFactory} fails to create a thread when asked * by returning null from {@code newThread}, the executor will * continue, but might not be able to execute any tasks. Threads * should possess the "modifyThread" {@code RuntimePermission}. If * worker threads or other threads using the pool do not possess this * permission, service may be degraded: configuration changes may not * take effect in a timely manner, and a shutdown pool may remain in a * state in which termination is possible but not completed.</dd> * ThreadPoolExecutor用ThreadFactory创建新的任务线程。如果线程工程没有特别的指定, 默认线程工厂为Executors#defaultThreadFactory,默认线程工厂创建线程,具有相同的优先级NORM_PRIORITY, 相同的线程组ThreadGroup,并且为非守护线程。我们可以提供一个不同的线程功能, 用于改变线程名,线程分组,线程优先级,守护状态等。当线程工厂创建线程失败时, 执行器将会,忽略任务,也许可能知道执行器不能够执行任何任务。线程应该可以控制 modifyThread运行时允许。如果工作线程或者其他线程池线程不能够控制RuntimePermission(modifyThread) ,线程池服务将会degraded(退化,降级):配置的改变,也许不能够及时生效,一个关闭的 线程池也许停留在结束状态,但任务为完成。 * <dt>Keep-alive times</dt> * * <dd>If the pool currently has more than corePoolSize threads, * excess threads will be terminated if they have been idle for more * than the keepAliveTime (see {@link #getKeepAliveTime}). This * provides a means of reducing resource consumption when the pool is * not being actively used. If the pool becomes more active later, new * threads will be constructed. This parameter can also be changed * dynamically using method {@link #setKeepAliveTime}. Using a value * of {@code Long.MAX_VALUE} {@link TimeUnit#NANOSECONDS} effectively * disables idle threads from ever terminating prior to shut down. By * default, the keep-alive policy applies only when there are more * than corePoolSizeThreads. But method {@link * #allowCoreThreadTimeOut(boolean)} can be used to apply this * time-out policy to core threads as well, so long as the * keepAliveTime value is non-zero. </dd> 如果当前线程池中的线程数量大于核心线程池数量,如果空闲线程空闲的时间大于 保活时间keepAliveTime的线程,则将会被终止。当线程池没有充分利用的情况下, 此策略可以减少资源的消耗。如果线程池之后,变得更活跃,则新的任务线程将会被闯将。 我们可以用#setKeepAliveTime方法动态的改变保活时间,用一个 Long.MAX_VALU,TimeUnit#NANOSECONDS 作为保活时间,那么空闲的线程可以避免在线程池关闭之前被终止。保活策略只有在当前线程池线程数量大于 核心线程池数量时,才起作用。#allowCoreThreadTimeOut用于控制当任务线程空闲时,是否允许线程等待 keepAliveTime时间,以便在这个过程中,有新的任务进来。 * * <dt>Queuing</dt> * * <dd>Any {@link BlockingQueue} may be used to transfer and hold * submitted tasks. The use of this queue interacts with pool sizing: * BlockingQueu用于存放提交的任务,队列的实际容量与线程池大小相关联。 * [list] * * <li> If fewer than corePoolSize threads are running, the Executor * always prefers adding a new thread * rather than queuing.</li> * 如果当前线程池任务线程数量小于核心线程池数量,执行器总是优先创建一个任务线程, 而不是从线程队列中取一个空闲线程。 * <li> If corePoolSize or more threads are running, the Executor * always prefers queuing a request rather than adding a new * thread.</li> * 如果当前线程池任务线程数量大于核心线程池数量,执行器总是优先从线程队列中取一个空闲线程, 而不是创建一个任务线程。 * <li> If a request cannot be queued, a new thread is created unless * this would exceed maximumPoolSize, in which case, the task will be * rejected.</li> * 如果当前线程池任务线程数量大于核心线程池数量,且队列中无空闲任务线程,将会创建 一个任务线程,直到超出maximumPoolSize,如果超时maximumPoolSize,则任务将会被拒绝。 * [/list] * * There are three general strategies for queuing: * [list=1] * ThreadPoolExecutor有3中出队列策略 * <li> [i] Direct handoffs.[/i] A good default choice for a work * queue is a {@link SynchronousQueue} that hands off tasks to threads * without otherwise holding them. Here, an attempt to queue a task * will fail if no threads are immediately available to run it, so a * new thread will be constructed. This policy avoids lockups when * handling sets of requests that might have internal dependencies. * Direct handoffs generally require unbounded maximumPoolSizes to * avoid rejection of new submitted tasks. This in turn admits the * possibility of unbounded thread growth when commands continue to * arrive on average faster than they can be processed. </li> * 直接handoffs,队列非默认选择为同步队列SynchronousQueue,SynchronousQueue是一个 take同时对应一个put,反之亦然。如果没有线程立刻可用,则任务线程尝试出队列失败, 一个新的线程将会被创建。这种策略用于处理的任务请求对任务队列中的其他任务有依赖的 情况,这种策略可以避免查找。Direct handoffs策略一般需要一个无界的maximumPoolSizes, 为了避免拒绝新任务的提交。这样在任务提交数量平均速度大于线程池可以处理的速度时,可到导致 无限的任务继续提交。 * <li>[i] Unbounded queues.[/i] Using an unbounded queue (for * example a {@link LinkedBlockingQueue} without a predefined * capacity) will cause new tasks to wait in the queue when all * corePoolSize threads are busy. Thus, no more than corePoolSize * threads will ever be created. (And the value of the maximumPoolSize * therefore doesn't have any effect.) This may be appropriate when * each task is completely independent of others, so tasks cannot * affect each others execution; for example, in a web page server. * While this style of queuing can be useful in smoothing out * transient bursts of requests, it admits the possibility of * unbounded work queue growth when commands continue to arrive on * average faster than they can be processed. </li> * Unbounded queues无界的任务队列的默认选择为没有初始化容量的LinkedBlockingQueue, 当所有核心任务线程处于忙碌中时,将会引起新的任务在队列中等待。 这样没有大于核心线程池数量的线程被创建。(因此这种策略下,maximumPoolSize将会无效)。 LinkedBlockingQueue策略可以用于每个任务独立完成,任务之间的执行互不影响; 比如Web服务器。这种策略在突然并发量增大时,平滑的处理突发的并发量的场景中, 比较有效,同时在任务提交数量平均速度大于线程池可以处理的速度时,可到导致 无限的任务继续提交。 * <li>[i]Bounded queues.[/i] A bounded queue (for example, an * {@link ArrayBlockingQueue}) helps prevent resource exhaustion when * used with finite maximumPoolSizes, but can be more difficult to * tune and control. Queue sizes and maximum pool sizes may be traded * off for each other: Using large queues and small pools minimizes * CPU usage, OS resources, and context-switching overhead, but can * lead to artificially low throughput. If tasks frequently block (for * example if they are I/O bound), a system may be able to schedule * time for more threads than you otherwise allow. Use of small queues * generally requires larger pool sizes, which keeps CPUs busier but * may encounter unacceptable scheduling overhead, which also * decreases throughput. </li> * Bounded queues有界队列,可以选择ArrayBlockingQueue,用有限的maximumPoolSizes 阻止资源的浪费,但实际上是很难调校和控制的。队列的size和最大线程池size也许有一个 这种的方案:在CPU的使用率,OS资源,上下文切换负载,可用一个队列容量大一些, maximumPoolSizes小一些,但这样可能导致吞吐量较低。如果任务频繁地阻塞(比如IO限制), 系统可以调度比我们允许的更多的线程。用队列容量小,一般需要maximumPoolSizes大一些, 这样可以保当CUP充分利用,但是可能遇到不可接受的调度负载,这样也会降低吞吐量。 这一段的意思是说,当我们使用ArrayBlockingQueue的时候,我们要在队列容量和maximumPoolSizes 去一个这种,两者谁太大或太小,都可能导致吞吐量的下降。 * [/list] * * </dd> * * <dt>Rejected tasks</dt> * * <dd> New tasks submitted in method {@link #execute} will be * [i]rejected[/i] when the Executor has been shut down, and also * when the Executor uses finite bounds for both maximum threads and * work queue capacity, and is saturated. In either case, the {@code * execute} method invokes the {@link * RejectedExecutionHandler#rejectedExecution} method of its {@link * RejectedExecutionHandler}. Four predefined handler policies are * provided: * 当执行器关闭时,或者执行器用有界的最大线程池数量和任务队列容量饱 和时,新提交的任务将会被拒绝。在其他情况下,execute方法将调用RejectedExecutionHandler 的rejectedExecution方法处理任务。有四种处理策略提供如下: * [list=1] * * <li> In the default {@link ThreadPoolExecutor.AbortPolicy}, the * handler throws a runtime {@link RejectedExecutionException} upon * rejection. </li> * 默认情况下,ThreadPoolExecutor.AbortPolicy策略是,在拒绝任务时,抛出 RejectedExecutionException运行时异常。 * <li> In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread * that invokes {@code execute} itself runs the task. This provides a * simple feedback control mechanism that will slow down the rate that * new tasks are submitted. </li> * CallerRunsPolicy:调用线程将会运行任务,及调用execute方法的线程。 这种策略提供了一个反馈机制减慢新任务的提交速度。 * <li> In {@link ThreadPoolExecutor.DiscardPolicy}, a task that * cannot be executed is simply dropped. </li> * DiscardPolicy:直接丢弃新提交的任务 * <li>In {@link ThreadPoolExecutor.DiscardOldestPolicy}, if the * executor is not shut down, the task at the head of the work queue * is dropped, and then execution is retried (which can fail again, * causing this to be repeated.) </li> * DiscardOldestPolicy:如果执行器没有关闭,队列头的任务将会被丢弃,然后执行器重新 尝试执行任务(如果失败,则重复这一过程) * [/list] * * It is possible to define and use other kinds of {@link * RejectedExecutionHandler} classes. Doing so requires some care * especially when policies are designed to work only under particular * capacity or queuing policies. </dd> * 我们也可以自己定义RejectedExecutionHandler,以适应特殊的容量和队列策略场景中。 * <dt>Hook methods</dt> * * <dd>This class provides {@code protected} overridable {@link * #beforeExecute} and {@link #afterExecute} methods that are called * before and after execution of each task. These can be used to * manipulate the execution environment; for example, reinitializing * ThreadLocals, gathering statistics, or adding log * entries. Additionally, method {@link #terminated} can be overridden * to perform any special processing that needs to be done once the * Executor has fully terminated. * ThreadPoolExecutor提供了#beforeExecute和#afterExecute方法分别在任务被执行前和 执行后调用,我们可重写这两个方法,做些需要的工作。这两个方法可以用于控制执行环境。 比如重新初始化ThreadLocals类变量,统计,添加日志等。#terminated方法可以被重写, 用于执行一些特殊的处理,在执行器完全结束前。 * <p>If hook or callback methods throw exceptions, internal worker * threads may in turn fail and abruptly terminate.</dd> * 如果hook和回调方法抛出异常,内部的任务线程将会失败并结束。 * <dt>Queue maintenance</dt> * * <dd> Method {@link #getQueue} allows access to the work queue for * purposes of monitoring and debugging. Use of this method for any * other purpose is strongly discouraged. Two supplied methods, * {@link #remove} and {@link #purge} are available to assist in * storage reclamation when large numbers of queued tasks become * cancelled.</dd> * #getQueue可以访问任务队列,用于监控和调试。如果这个方法用于其他目的, 强烈不建议。#remove和#purge方法用于回收空间,在大量的队列任务被取消时。 * <dt>Finalization</dt> * * <dd> A pool that is no longer referenced in a program [i]AND[/i] * has no remaining threads will be {@code shutdown} automatically. If * you would like to ensure that unreferenced pools are reclaimed even * if users forget to call {@link #shutdown}, then you must arrange * that unused threads eventually die, by setting appropriate * keep-alive times, using a lower bound of zero core threads and/or * setting {@link #allowCoreThreadTimeOut(boolean)}. </dd> * 当一个线程池不再被引用,且线程池中没有任务线程,线程池将会自动关闭。 如果您忘记了关闭线程池,但想保证无引用的线程池被回收,您可以通过设置 keep-alive保活时间,用一个核心线程数较小的线程池,或设置#allowCoreThreadTimeOut 运行线程空闲时等待或不等待。 * </dl> * * <p> <b>Extension example</b>. Most extensions of this class * override one or more of the protected hook methods. For example, * here is a subclass that adds a simple pause/resume feature: * 扩展实例。大部分的扩展实例会重写一个或多个protected hook方法。 下面一个实例添加了暂停和恢复的特点。 * <pre> {@code * class PausableThreadPoolExecutor extends ThreadPoolExecutor { * private boolean isPaused; * private ReentrantLock pauseLock = new ReentrantLock(); * private Condition unpaused = pauseLock.newCondition(); * * public PausableThreadPoolExecutor(...) { super(...); } * * protected void beforeExecute(Thread t, Runnable r) { * super.beforeExecute(t, r); * pauseLock.lock(); * try { * while (isPaused) unpaused.await(); * } catch (InterruptedException ie) { * t.interrupt(); * } finally { * pauseLock.unlock(); * } * } * * public void pause() { * pauseLock.lock(); * try { * isPaused = true; * } finally { * pauseLock.unlock(); * } * } * * public void resume() { * pauseLock.lock(); * try { * isPaused = false; * unpaused.signalAll(); * } finally { * pauseLock.unlock(); * } * } * }}</pre> * * @since 1.5 * @author Doug Lea */ public class ThreadPoolExecutor extends AbstractExecutorService { /** * The main pool control state, ctl, is an atomic integer packing * two conceptual fields * workerCount, indicating the effective number of threads * runState, indicating whether running, shutting down etc * ctl线程的主要控制状态,包装者两个概念fields,workerCount表示有效的 任务线程数量,runState表示是否运行和关闭。 * In order to pack them into one int, we limit workerCount to * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2 * billion) otherwise representable. If this is ever an issue in * the future, the variable can be changed to be an AtomicLong, * and the shift/mask constants below adjusted. But until the need * arises, this code is a bit faster and simpler using an int. * 为了包装workerCount和runState为一个int,我们限制任务线程数量为 (2^29)-1 大约500百万个线程,而不是(2^31)-1两亿个线程。如果这种策略在 将来有问题,可以将ctl改变为AtomicLong,在调整shift/mask常量。改变为AtomicLong 执行在需要的时候,才会做,本线程池实现类用的为简单的int。 * The workerCount is the number of workers that have been * permitted to start and not permitted to stop. The value may be * transiently different from the actual number of live threads, * for example when a ThreadFactory fails to create a thread when * asked, and when exiting threads are still performing * bookkeeping before terminating. The user-visible pool size is * reported as the current size of the workers set. * workerCount表示允许启动不许停止的任务线程数量,即运行中的任务线程数量。 workerCount也许瞬态与实际的存活线程有所不同,比如当任务提交执行时,线程工厂 创建一个线程失败,退出线程在结束之前,能在执行bookkeeping(记录)。 用户可见的线程池数量用作当前任务线程数量。 * The runState provides the main lifecyle control, taking on values: *runState提供主要的声明周期控制,有一下值 * RUNNING: Accept new tasks and process queued tasks * SHUTDOWN: Don't accept new tasks, but process queued tasks * STOP: Don't accept new tasks, don't process queued tasks, * and interrupt in-progress tasks * TIDYING: All tasks have terminated, workerCount is zero, * the thread transitioning to state TIDYING * will run the terminated() hook method * TERMINATED: terminated() has completed * RUNNING:接受新的任务,处理队列任务; SHUTDOWN:不在接受新的任务,处理队列任务; STOP:不在接受新任务,不处理队列任务,中断正在执行的任务线程; TIDYING:所有的任务已经结束,任务线程为0,线程转换到TIDYING; TERMINATED:线程池已将结束,即terminated()方法执行完。 * The numerical order among these values matters, to allow * ordered comparisons. The runState monotonically increases over * time, but need not hit each state. The transitions are: *几种状态的关系可以数字化的比较。runState随着线程池运行时间的变化, 而增加,但是不必经过每一个状态。状态的转换如下: * RUNNING -> SHUTDOWN(调用shudown方法) * On invocation of shutdown(), perhaps implicitly in finalize() * (RUNNING or SHUTDOWN) -> STOP(调用shutdownNow方法) * On invocation of shutdownNow() * SHUTDOWN -> TIDYING(当任务队列和线程池都为空时) * When both queue and pool are empty * STOP -> TIDYING(当线程池为空) * When pool is empty * TIDYING -> TERMINATED(terminated方法执行完) * When the terminated() hook method has completed * * Threads waiting in awaitTermination() will return when the * state reaches TERMINATED. * 线程调用awaitTermination方法,将会等待线程池状态达到TERMINATED * Detecting the transition from SHUTDOWN to TIDYING is less * straightforward than you'd like because the queue may become * empty after non-empty and vice versa during SHUTDOWN state, but * we can only terminate if, after seeing that it is empty, we see * that workerCount is 0 (which sometimes entails a recheck -- see * below). 在SHUTDOWN转换到TIDYING过程比较是难捕捉的,因为在队列在线程池非空时, 队列可能为空,反之在SHUTDOWN状态,在看到队列为空,任务线程为0(这个有时需要进行recheck)时, 我们可以结束线程池。 */ private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3;//任务线程数量所占的int的位数 private static final int CAPACITY = (1 << COUNT_BITS) - 1;//最大任务线程数量为2^29-1 // runState is stored in the high-order bits 运行状态runState存储在ctl的高位 private static final int RUNNING = -1 << COUNT_BITS;//100溢出(29) private static final int SHUTDOWN = 0 << COUNT_BITS;//00(29) private static final int STOP = 1 << COUNT_BITS;//01(29) private static final int TIDYING = 2 << COUNT_BITS;//10(29) private static final int TERMINATED = 3 << COUNT_BITS;//11(29) // Packing and unpacking ctl,包装和解包ctl private static int runStateOf(int c) { return c & ~CAPACITY; }//运行状态 private static int workerCountOf(int c) { return c & CAPACITY; }//运行的任务线程数 private static int ctlOf(int rs, int wc) { return rs | wc; }//包装运行状态和任务线程数 /* * Bit field accessors that don't require unpacking ctl. * These depend on the bit layout and on workerCount being never negative. 由于任务线程计数器不会为负数,所以比较状态时,就不必要解包ctl */ //是否小于某个状态 private static boolean runStateLessThan(int c, int s) { return c < s; } //是否大于等于某个状态 private static boolean runStateAtLeast(int c, int s) { return c >= s; } //是否是运行状态 private static boolean isRunning(int c) { return c < SHUTDOWN; } /** * Attempt to CAS-increment the workerCount field of ctl. 尝试CAS任务线程数+1 */ private boolean compareAndIncrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect + 1); } /** * Attempt to CAS-decrement the workerCount field of ctl. 尝试CAS任务线程数-1 */ private boolean compareAndDecrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect - 1); } /** * Decrements the workerCount field of ctl. This is called only on * abrupt termination of a thread (see processWorkerExit). Other * decrements are performed within getTask. 在任务线程中断结束时,调用processWorkerExit方法,用于清除当前中断任务线程计数,在getTask也有用到。 */ private void decrementWorkerCount() { do {} while (! compareAndDecrementWorkerCount(ctl.get())); } /** * The queue used for holding tasks and handing off to worker * threads. We do not require that workQueue.poll() returning * null necessarily means that workQueue.isEmpty(), so rely * solely on isEmpty to see if the queue is empty (which we must * do for example when deciding whether to transition from * SHUTDOWN to TIDYING). This accommodates special-purpose * queues such as DelayQueues for which poll() is allowed to * return null even if it may later return non-null when delays * expire. 任务队列用于放提交到线程池的任务,并有任务线程处理。我们一般不用 poll返回null来判断队列是否为null,而是用isEmpty方法判断队列是否为空, 以便判断是否应该将线程池状态从SHUTDOWN切换到TIDYING。但是强烈建议在用 DelayQueues作为任务队列时可以用poll,由于poll的方法允许返回null,即使 在延时时间过期时,返回为非null。 */ private final BlockingQueue<Runnable> workQueue; /** * Lock held on access to workers set and related bookkeeping. * While we could use a concurrent set of some sort, it turns out * to be generally preferable to use a lock. Among the reasons is * that this serializes interruptIdleWorkers, which avoids * unnecessary interrupt storms, especially during shutdown. * Otherwise exiting threads would concurrently interrupt those * that have not yet interrupted. It also simplifies some of the * associated statistics bookkeeping of largestPoolSize etc. We * also hold mainLock on shutdown and shutdownNow, for the sake of * ensuring workers set is stable while separately checking * permission to interrupt and actually interrupting. 当需要访问任务线程集合和相关的记录需要,加锁。当我们用一个并发集合 排序时,以便情况下,最好使用锁。其中有一个原因为interruptIdleWorkers, 即中断空闲任务线程,这样可以在关闭线程池的过程中,避免中断风暴。 否则退出的任务线程将会并发中断还没有中断的任务线程,即有可能发生中断风暴。 也被用于简单的统计largestPoolSize等。在关闭和立即关闭时,我们需要持有锁, 以便在独立检查中断允许和实际中断状态时,保证任务线程集的稳定性。 */ private final ReentrantLock mainLock = new ReentrantLock(); /** * Set containing all worker threads in pool. Accessed only when * holding mainLock. 线程池任务线程集,当持有mainLock锁时,可以访问线程池任务线程集 */ private final HashSet<Worker> workers = new HashSet<Worker>(); /** * Wait condition to support awaitTermination 等待线程池结束条件 */ private final Condition termination = mainLock.newCondition(); /** * Tracks largest attained pool size. Accessed only under * mainLock. 在持有mainLock的情况下,追踪最大线程池 */ private int largestPoolSize; /** * Counter for completed tasks. Updated only on termination of * worker threads. Accessed only under mainLock. 在持有mainLock的情况下,可以访问,completedTaskCount为完成任务计数器, 在任务线程结束时,更新。 */ private long completedTaskCount; /* * All user control parameters are declared as volatiles so that * ongoing actions are based on freshest values, but without need * for locking, since no internal invariants depend on them * changing synchronously with respect to other actions. 所有用于控制参数被修饰为volatiles,以便正在进行的操作,都是基于最新值, 在不需要锁的情况下,相对于其他动作,依赖于这些参数的可变量同步地改变。 即所有需要引用这些参数的变量或动作,可以立即看到参数最新值。 */ /** * Factory for new threads. All threads are created using this * factory (via method addWorker). All callers must be prepared * for addWorker to fail, which may reflect a system or user's * policy limiting the number of threads. Even though it is not * treated as an error, failure to create threads may result in * new tasks being rejected or existing ones remaining stuck in * the queue. * ThreadFactory为创建任务线程的工厂。所有任务线程的创建都是在调用addWorker 的过程中,使用线程工厂创建。所有调用线程工厂创建任务线程的使用者,必要 做好添加任务线程失败的心理准备,这也许会影响系统或用户的线程数量限制策略。 即使不作为错误对待,创建任务线程失败,也许导致新任务被拒绝,或一个任务 阻塞在任务队列中。 * We go further and preserve pool invariants even in the face of * errors such as OutOfMemoryError, that might be thrown while * trying to create threads. Such errors are rather common due to * the need to allocate a native stack in Thread#start, and users * will want to perform clean pool shutdown to clean up. There * will likely be enough memory available for the cleanup code to * complete without encountering yet another OutOfMemoryError. 即使在创建任务线程是可能会有OutOfMemoryError的错误,我们必须尽量保证 线程池的不变性。这种事情的发生,一般是由在我们创建一个线程的本地栈时, 用户想要关闭线程池,清除任务线程。在没有遇到OutOfMemoryError的情况下, 将会有足够的内存用于清理工作。 */ private volatile ThreadFactory threadFactory; /** * Handler called when saturated or shutdown in execute. 当线程池饱和或线程池关闭时,拒绝任务处理handler */ private volatile RejectedExecutionHandler handler; /** * Timeout in nanoseconds for idle threads waiting for work. * Threads use this timeout when there are more than corePoolSize * present or if allowCoreThreadTimeOut. Otherwise they wait * forever for new work. 线程池空闲任务线程,等待任务的时间。如果当前线程数量大于核心线程池数量, 且allowCoreThreadTimeOut为true,任务线程空闲,允许等待keepAliveTime时间, 以便在这个时间范围内,有任务需要执行 */ private volatile long keepAliveTime; /** * If false (default), core threads stay alive even when idle. * If true, core threads use keepAliveTime to time out waiting * for work. 在当前线程数量大于核心线程池数量的情况下,是否允许空闲任务线程等, 保活keepAliveTime时间,等待任务的到来。 */ private volatile boolean allowCoreThreadTimeOut; /** * Core pool size is the minimum number of workers to keep alive * (and not allow to time out etc) unless allowCoreThreadTimeOut * is set, in which case the minimum is zero. 在不允许空闲等待的情况,核心线程池数量,即保活的任务线程最小数量。 如果允许空闲等待,线程池任务线程可能为0。 */ private volatile int corePoolSize; /** * Maximum pool size. Note that the actual maximum is internally * bounded by CAPACITY. 最大线程池数量,如果容量是有界的,实际为CAPACITY */ private volatile int maximumPoolSize; /** * The default rejected execution handler,默认的拒绝任务策略,抛出运行时异常 */ private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); /** * Permission required for callers of shutdown and shutdownNow. * We additionally require (see checkShutdownAccess) that callers * have permission to actually interrupt threads in the worker set * (as governed by Thread.interrupt, which relies on * ThreadGroup.checkAccess, which in turn relies on * SecurityManager.checkAccess). Shutdowns are attempted only if * these checks pass. 当调用者调用shutdown和shutdownNow方法时,需要shutdownPerm运行时允许权限, 以便调用者可以权限中断任务线程,在关闭的时候,首先检查调用者是否有 shutdownPerm运行时权限。通过ThreadGroup.checkAccess是否拥有权限。 * * All actual invocations of Thread.interrupt (see * interruptIdleWorkers and interruptWorkers) ignore * SecurityExceptions, meaning that the attempted interrupts * silently fail. In the case of shutdown, they should not fail * unless the SecurityManager has inconsistent policies, sometimes * allowing access to a thread and sometimes not. In such cases, * failure to actually interrupt threads may disable or delay full * termination. Other uses of interruptIdleWorkers are advisory, * and failure to actually interrupt will merely delay response to * configuration changes so is not handled exceptionally. 实际上调用线程中断(interruptIdleWorkers和interruptWorkers) 忽略SecurityExceptions,意味着尝试中断默认失败。在线程关闭的时候, 除非SecurityManager有不一致的策略(有事允许,有时不允许),否则中断不应该失败。 如果SecurityManager为不一致的策略,线程的中断实际上有可能失败。 */ private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread"); }
自此我们把线程池ThreadPoolExecutor的java doc使用说明和变量的定义已经看完。
总结:
ThreadPoolExecutor的变量主要有核心线程池数量corePoolSize和最大线程池数量maximumPoolSize,即在当前任务线程数大于核心线程数量时,是否(allowCoreThreadTimeOut)允许空闲任务线程等,保活keepAliveTime时间,等待新任务的到来。一个线程工厂ThreadFactory用于创建任务线程,一个拒绝任务处理器RejectedExecutionHandler,默认的拒绝任务策略为AbortPolicy,抛出运行时异常,当然还有直接丢弃策略DiscardPolicy,丢弃旧的任务DiscardOldestPolicy,还有调用者执行任务策略CallerRunsPolicy。上面的变量为volatile,以便线程池执行操作时,可以使用最新的变量。
一个阻塞的任务队列final BlockingQueue<Runnable> workQueue,阻塞队列可以为Linked,Array,Delay,SynchronousQueue等阻塞类型,具体可以根据场景选择。默认为LinkedBlockingQueue队列,一般判断队列是否为空,用isEmpty方法,LinkedBlockingQueue一般用于任务相互之间独立,没有交叉,可独立执行。如果用SynchronousQueue,则可用poll方法判断,同步队列一般用于任务之间有依赖的关系的场景,一个任务执行依赖于另一个任务的结果。DelayQueue队列用于定时任务。ArrayBlockingQueue队列一般可以用于
资源有限情况,可以避免资源被耗尽。一个AtomicInteger的ctl用于包装线程状态runState和任务线程数workerCount;低29位保存任务线程数,高两位用于存储线程池状态,线程池状态已用有四种RUNNING,SHUTDOWN ,STOP,TIDYING ,TERMINATED。
RUNNING:接受新的任务,处理队列任务;
SHUTDOWN:不在接受新的任务,处理队列任务;
STOP:不在接受新任务,不处理队列任务,中断正在执行的任务线程;
TIDYING:所有的任务已经结束,任务线程为0,线程转换到TIDYING;
TERMINATED:线程池已结束,即terminated()方法执行完。
线程的状态是可以数字化比较的。
一个任务线程集final HashSet<Worker> workers,largestPoolSize记录线程池的最大任务线程数,completedTaskCount为完成任务计数器,在任务线程结束时,更新。一个可重入锁mainLock,用于保护非线程安全的变量如workers,largestPoolSize,completedTaskCount。
一个等待线程池结束条件termination,用于控制超时等待线程池关闭。
ThreadPoolExecutor解析二(线程工厂、工作线程,拒绝策略等):http://donald-draper.iteye.com/blog/2367064
发表评论
-
Executors解析
2017-04-07 14:38 1256ThreadPoolExecutor解析一(核心线程池数量、线 ... -
ScheduledThreadPoolExecutor解析三(关闭线程池)
2017-04-06 20:52 4457ScheduledThreadPoolExecutor解析一( ... -
ScheduledThreadPoolExecutor解析二(任务调度)
2017-04-06 12:56 2127ScheduledThreadPoolExecutor解析一( ... -
ScheduledThreadPoolExecutor解析一(调度任务,任务队列)
2017-04-04 22:59 4992Executor接口的定义:http://donald-dra ... -
ThreadPoolExecutor解析四(线程池关闭)
2017-04-03 23:02 9113Executor接口的定义:http: ... -
ThreadPoolExecutor解析三(线程池执行提交任务)
2017-04-03 12:06 6088Executor接口的定义:http://donald-dra ... -
ThreadPoolExecutor解析二(线程工厂、工作线程,拒绝策略等)
2017-04-01 17:12 3041Executor接口的定义:http://donald-dra ... -
ScheduledExecutorService接口定义
2017-03-29 12:53 1511Executor接口的定义:http://donald-dra ... -
AbstractExecutorService解析
2017-03-29 08:27 1081Executor接口的定义:http: ... -
ExecutorCompletionService解析
2017-03-28 14:27 1596Executor接口的定义:http://donald-dra ... -
CompletionService接口定义
2017-03-28 12:39 1069Executor接口的定义:http://donald-dra ... -
FutureTask解析
2017-03-27 12:59 1330package java.util.concurrent; ... -
Future接口定义
2017-03-26 09:40 1198/* * Written by Doug Lea with ... -
ExecutorService接口定义
2017-03-25 22:14 1164Executor接口的定义:http://donald-dra ... -
Executor接口的定义
2017-03-24 23:24 1678package java.util.concurrent; ... -
简单测试线程池拒绝执行任务策略
2017-03-24 22:37 2031线程池多余任务的拒绝执行策略有四中,分别是直接丢弃任务Disc ... -
JAVA集合类简单综述
2017-03-23 22:51 925Queue接口定义:http://donald-draper. ... -
DelayQueue解析
2017-03-23 11:00 1737Queue接口定义:http://donald-draper. ... -
SynchronousQueue解析下-TransferQueue
2017-03-22 22:20 2140Queue接口定义:http://donald-draper. ... -
SynchronousQueue解析上-TransferStack
2017-03-21 22:08 3059Queue接口定义:http://donald-draper. ...
相关推荐
- ctl变量是一个AtomicInteger,它结合了线程池的状态和工作线程的数量,通过位运算实现高效地读写操作。 - 高三位存储运行状态,低29位存储工作线程数量,通过CAPACITY掩码确保数量不超过最大值。 5. **Worker类...
线程池原理-ThreadPoolExecutor源码解析 1.构造方法及参数 2.阻塞对列: BlockingQueue 3.线程工厂: DefaultThreadFactory 4.拒绝策略: RejectedExecutionHandler 5.执行线程 Executor
`ThreadPoolExecutor` 通过灵活配置核心线程数量、最大线程数量、任务队列等参数,实现了对线程的有效管理和任务的高效执行。通过对 `execute()` 方法的深入分析,我们能够更全面地理解 `ThreadPoolExecutor` 在实际...
根据给定文件的信息,我们可以深入探讨Java中`ThreadPoolExecutor`线程池的底层实现原理,特别是其核心数据结构`ctl`以及线程池的各种状态转换。以下是对这些知识点的详细解释: ### 一、线程池`ThreadPoolExecutor...
本文将深入解析线程池的源码,特别关注其中的`AtomicInteger`类以及线程池状态的表示。 `AtomicInteger`是Java并发包`java.util.concurrent.atomic`中的一个原子类型类,它提供了在多线程环境下安全地修改整型变量...
下面将详细解析线程池的设计理念、关键组件以及其实现细节。 ### 设计理念 线程池的设计旨在解决创建和销毁线程带来的高昂开销问题。在多线程环境中,频繁地创建和销毁线程会消耗大量的系统资源,降低系统的整体...
线程池的核心是`ThreadPool`类,它维护了一个线程数组,用于存放空闲线程。 请求处理流程如下: 1. 服务启动时,创建一个最大容量为200的线程数组,并初始化5个空闲线程。 2. 当有用户请求到达时,`threadpool.run...
本文将深入解析线程池的管理源码,帮助读者理解其工作原理和优化策略。 在Java中,`java.util.concurrent`包下的`ThreadPoolExecutor`类是线程池的核心实现。它提供了丰富的参数来定制线程池的行为,包括核心线程数...
在实际开发中,Java还提供了一些预定义的线程池,如`Executors.newFixedThreadPool(int nThreads)`创建固定大小的线程池,`Executors.newSingleThreadExecutor()`创建只有一个线程的线程池等。这些预定义的线程池...
AtomicInteger类型的常量ctl是贯穿线程池整个生命周期的重要属性,它是一个原子类对象,主要用来保存线程的数量和线程池的状态,我们看下与这个属性相关的代码如下所示。 //主要用来保存线程数量和线程池的状态,高...
如果你想监控某一个线程池的执行状态,ThreadPoolExecutor类提供了相关的API,可以实时获取线程池的当前活动线程数、正在排队中的线程数、已经执行完成的线程数、总线程数等信息。 线程池使用示例代码如下: ```...
`ThreadPoolExecutor` 提供了一种灵活的方式来配置线程池,通过控制线程的数量、队列的行为以及对超出容量的任务的处理方式,可以实现高性能的应用程序。 `ThreadPoolExecutor` 的构造函数如下: ```java public ...
Java中的ExecutorService接口及其实现类如ThreadPoolExecutor,允许我们自定义线程池的核心参数,如核心线程数、最大线程数、线程存活时间等。线程池能有效控制并发量,防止过多线程导致系统资源耗尽,同时提供定时...
在Java并发编程中,线程池(ThreadPoolExecutor)是一个至关重要的工具,它允许开发者有效地管理线程资源,提高系统的性能和响应性。JDK 1.5引入了java.util.concurrent包,其中包含了线程池的实现,使得并发编程...
线程池的状态是通过 AtomicInteger ctl 变量来记录的,ctl 变量是一个 32 位的整数,高 3 位用于记录线程池的状态,低 29 位用于记录活动线程的数量。线程池的状态有五种:RUNNING、SHUTDOWN、STOP、TIDYING 和 ...
线程池ThreadPoolExecutor底层原理源码分析
线程池是一种基于池化技术的设计模式,其核心思想是在程序启动时创建一定数量的线程放入池中供后续任务使用,当有新的任务到来时,线程池会分配一个空闲线程来执行该任务。这种方式可以有效减少线程创建和销毁带来的...
1. **线程池的实现**:实验中可能使用了ThreadPoolExecutor来创建线程池,定义了核心线程数和最大线程数,以控制并发处理的请求数量。同时,可能还设置了工作队列(如ArrayBlockingQueue或LinkedBlockingQueue),...
ThreadPoolExecutor提供了灵活的线程管理策略,通过corePoolSize和maximumPoolSize控制线程数量,keepAliveTime和TimeUnit控制非核心线程的空闲存活时间。重要的是,它利用BlockingQueue作为任务队列,平衡了任务...