- 浏览: 980955 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
Executor接口的定义:http://donald-draper.iteye.com/blog/2365625
ExecutorService接口定义:http://donald-draper.iteye.com/blog/2365738
Future接口定义:http://donald-draper.iteye.com/blog/2365798
FutureTask解析:http://donald-draper.iteye.com/blog/2365980
CompletionService接口定义:http://donald-draper.iteye.com/blog/2366239
ExecutorCompletionService解析:http://donald-draper.iteye.com/blog/2366254
AbstractExecutorService解析:http://donald-draper.iteye.com/blog/2366348
ScheduledExecutorService接口定义:http://donald-draper.iteye.com/blog/2366436
ThreadPoolExecutor解析一(核心线程池数量、线程池状态等) :
http://donald-draper.iteye.com/blog/2366934
ThreadPoolExecutor解析二(线程工厂、工作线程,拒绝策略等):
http://donald-draper.iteye.com/blog/2367064
上一篇文章我们看了一下线程工厂、工作线程,拒绝策略,先回顾一下:
默认的线程池拒绝任务处理策略AbortPolicy,直接抛出RejectedExecutionException;直接丢弃策略DiscardPolicy,丢弃旧的任务DiscardOldestPolicy,调用者执行任务CallerRunsPolicy。DiscardOldestPolicy和CallerRunsPolicy都是在线程池没关闭时,策略才生效,否则关闭直接丢弃任务。
拒绝策略都为ThreadPoolExecutor的内部类。
默认的线程工厂DefaultThreadFactory为Executors的内部类, 用于创建线程,工厂创建分组相同的线程,交由执行器执行。如果有java.lang.SecurityManager,则用System#getSecurityManager线程组,否则用调用者的线程组。创建的新线程为非守护模式,优先级在 MIN_PRIORITY和MAX_PRIORITY之间,默认为NORM_PRIORITY。可以通过Thread#getName获取线程name,默认为pool-N-thread-M,N为线程池编号,M为线程编号。
Worker包装了任务线程,主要是为了维护中断控制状态和其他次要状态记录,及任务的执行。Worker同时继承了AQS,在任务线程执行前lock,任务执行完unlock。加锁的目的主要是保护任务线程的执行,线程池唤醒一个任务线程等待任务,而不是中断当前正在执行任务的线程去执行任务。Worker使用了一个 非重入互质锁,而不是ReentrantLock,这样做的目的是以防在任务执行的过程,线程池控制方法的改变,对任务线程执行的影响,比如setCorePoolSize方法。另外为了防止任务线程在实际执行前被中断,我们初始化锁状态为-1,在runWorker方法中,我们会清除它。runWorker执行任务时,首先释放锁,此时锁打开,允许中断,如果线程池正在stop,确保线程池已中断,否则
做执行前工作,执行任务,做执行后工作,如果任务被中断,则工作线程数量减1;
如果任务完成,则更新完成任务数量,从工作任务集中移除工作线程,尝试结束线程池。
尝试结束线程池,首先检查线程池运行状态如果为运行中,关闭但任务队列不为空,
或线程池工作线程为0,任务队列为空,则直接返回;否则查看工作线程是否为0,不为0,则根据onlyOne参数确定中断多少空闲线程,如果onlyOne为true,中断一个,否则中断所有空闲线程。
今天我们来看任务的提交和执行,如果篇幅够的话,把线程池的关闭也说一下。
先来看任务的提交执行。
首先提交任务:
//AbstractExecutorService
提交任务方法是在AbstractExecutorService中实现 ,具体的执行任务在
execute方法中,这个方法在Executor其中为抽象方法,
ThreadPoolExecutor重写了这个方法,来看execute方法
执行任务方法中我们与3点要看:
1.工作线程数量小于核心线程池数量,添加工作线程,执行任务;
我们来看addWorker添加工作线程执行任务;
再来看执行失败回滚处理:
//tryTerminate
来看尝试结束线程池的这一点,
//如果工作线程不为空,则中断空闲工作线程
再看
//执行结束工作
小节一下:
根据当前线程池状态和核心线程池数量与最大线程池数量,检查是否应该,
添加工作线程执行任务。如果应该添加工作线程,则更新工作线程数,
如果调整成功,则创建工作线程,执行任务。如果线程是已关闭或正在关闭,
则添加工作线程失败。如果线程工厂创建线程失败,则返回false,如果由于
线程工厂返回null或OutOfMemoryError等原因,执行回滚清除工作。
回滚清除工作主要是工作线程和工作线程数。最后检查线程是是否关闭,
如果线程池正在运行,或正在关闭且队列不为空,则直接返回,否则及线程池已关闭
,检查工作线程是否为0,不为零根据ONLY_ONE判断中断一个空闲线程还是多个。
2.如果添加工作线程失败,则添加任务到队列,并进行双检查,如果在上述期间,线程池关闭,
回滚任务,从队列中移除任务;
这部在看了第一点之后,没有什么可看的了,一看就明白,
来一下 reject(command);
3.如果任务入队列失败,根据工作线程数量是否大于最大线程池数量,来判断是否应该添加工作线程执行任务;
如果工作线程小于最大线程池数量,则CAS操作workCount,成功创建工作线程执行任务。
有了前面的两点,这一点很容量理解。
在前面一篇文章中,我们讲了工作线程,这一篇我们简单看了一下线程池执行任务,
我们回到上一篇的线程执行,还有一点我们没有看:
我们来看任务的执行
如果任务不为null,即创建工作线程成功,并执行任务,如果为null(即在线程池执行任务的时候,创建工作线程失败,任务入队列),从任务队列取一个任务。
来看从任务队列取任务:
再来看线程池执行任务的第二点,
2.如果添加工作线程失败,则添加任务到队列,并进行双检查,如果在上述期间,线程池关闭,回滚任务,从队列中移除任务;
关键的一点在addWorker(null, false)
上述代码,如果成功添加的一个空任务的工作线程,任务为空的话,则
从任务队列取任务执行,这个过程与创建工作线程失败,任务入队列相
对应。
总结:
执行任务的过程为,如果工作线程数量小于核心线程池数量,添加工作线程,执行任务;如果添加工作线程失败,则添加任务到队列,并进行双检查,如果在上述期间,线程池关闭,回滚任务,从队列中移除任务;如果任务入队列失败,根据工作线程数量是否大于最大线程池数量,来判断是否应该添加工作线程执行任务;如果工作线程小于最大线程池数量,
则CAS操作workCount,成功创建工作线程执行任务。添加工作线程的过程为,如果应该添加工作线程,则CAS更新工作线程数,如果更新成功,则创建工作线程,执行任务。如果线程是已关闭或正在关闭,则添加工作线程失败。如果线程工厂创建线程失败,则返回false,如果由于线程工厂返回null或OutOfMemoryError等原因,执行回滚清除工作。回滚清除工作主要是工作线程和工作线程数。最后检查线程是是否关闭,如果线程池正在运行,或正在关闭且队列不为空,则直接返回,否则及线程池已关闭,检查工作线程是否为0,不为零根据ONLY_ONE判断中断一个空闲线程还是多个。
ThreadPoolExecutor解析四(线程池关闭):
http://donald-draper.iteye.com/blog/2367246
ExecutorService接口定义:http://donald-draper.iteye.com/blog/2365738
Future接口定义:http://donald-draper.iteye.com/blog/2365798
FutureTask解析:http://donald-draper.iteye.com/blog/2365980
CompletionService接口定义:http://donald-draper.iteye.com/blog/2366239
ExecutorCompletionService解析:http://donald-draper.iteye.com/blog/2366254
AbstractExecutorService解析:http://donald-draper.iteye.com/blog/2366348
ScheduledExecutorService接口定义:http://donald-draper.iteye.com/blog/2366436
ThreadPoolExecutor解析一(核心线程池数量、线程池状态等) :
http://donald-draper.iteye.com/blog/2366934
ThreadPoolExecutor解析二(线程工厂、工作线程,拒绝策略等):
http://donald-draper.iteye.com/blog/2367064
上一篇文章我们看了一下线程工厂、工作线程,拒绝策略,先回顾一下:
默认的线程池拒绝任务处理策略AbortPolicy,直接抛出RejectedExecutionException;直接丢弃策略DiscardPolicy,丢弃旧的任务DiscardOldestPolicy,调用者执行任务CallerRunsPolicy。DiscardOldestPolicy和CallerRunsPolicy都是在线程池没关闭时,策略才生效,否则关闭直接丢弃任务。
拒绝策略都为ThreadPoolExecutor的内部类。
默认的线程工厂DefaultThreadFactory为Executors的内部类, 用于创建线程,工厂创建分组相同的线程,交由执行器执行。如果有java.lang.SecurityManager,则用System#getSecurityManager线程组,否则用调用者的线程组。创建的新线程为非守护模式,优先级在 MIN_PRIORITY和MAX_PRIORITY之间,默认为NORM_PRIORITY。可以通过Thread#getName获取线程name,默认为pool-N-thread-M,N为线程池编号,M为线程编号。
Worker包装了任务线程,主要是为了维护中断控制状态和其他次要状态记录,及任务的执行。Worker同时继承了AQS,在任务线程执行前lock,任务执行完unlock。加锁的目的主要是保护任务线程的执行,线程池唤醒一个任务线程等待任务,而不是中断当前正在执行任务的线程去执行任务。Worker使用了一个 非重入互质锁,而不是ReentrantLock,这样做的目的是以防在任务执行的过程,线程池控制方法的改变,对任务线程执行的影响,比如setCorePoolSize方法。另外为了防止任务线程在实际执行前被中断,我们初始化锁状态为-1,在runWorker方法中,我们会清除它。runWorker执行任务时,首先释放锁,此时锁打开,允许中断,如果线程池正在stop,确保线程池已中断,否则
做执行前工作,执行任务,做执行后工作,如果任务被中断,则工作线程数量减1;
如果任务完成,则更新完成任务数量,从工作任务集中移除工作线程,尝试结束线程池。
尝试结束线程池,首先检查线程池运行状态如果为运行中,关闭但任务队列不为空,
或线程池工作线程为0,任务队列为空,则直接返回;否则查看工作线程是否为0,不为0,则根据onlyOne参数确定中断多少空闲线程,如果onlyOne为true,中断一个,否则中断所有空闲线程。
今天我们来看任务的提交和执行,如果篇幅够的话,把线程池的关闭也说一下。
先来看任务的提交执行。
首先提交任务:
//AbstractExecutorService
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
提交任务方法是在AbstractExecutorService中实现 ,具体的执行任务在
execute方法中,这个方法在Executor其中为抽象方法,
ThreadPoolExecutor重写了这个方法,来看execute方法
/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * 1.如果工作线程小于核心线程池数量,尝试新建一个工作线程执行任务addWorker。 addWorker将会自动检查线程池状态和工作线程数,以防在添加工作线程的过程中, 线程池被关闭。 * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * 2.如果创建工作线程执行任务失败,则任务入队列,如果入队列成功, 我们仍需要二次检查线程池状态,以防在入队列的过程中,线程池关闭。 如果线程池关闭,则回滚任务。 * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. 如果任务入队列失败,则尝试创建一个工作线程执行任务 */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { //如果当前工作线程数小于核心线程池数量,则添加新的工作线程执行任务 if (addWorker(command, true)) return; c = ctl.get(); } //如果当前工作线程数大于核心线程池数量,检查运行状态,如果是正在运行,则添加任务到任务队列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //重新检查线程池运行状态,如果线程池非处于运行状态,则移除任务 if (! isRunning(recheck) && remove(command)) reject(command);//移除成功,则进行拒绝任务处理 else if (workerCountOf(recheck) == 0) //如线程池已关闭,且工作线程为0,则创建一个空闲工作线程 addWorker(null, false); } //根据最大线程池数量,判断是否应该添加工作线程,如果当前工作线程数量小于最大线程池数量,则尝试添加 //工作线程线程执行任务,如果尝试失败,则拒绝任务处理 else if (!addWorker(command, false)) reject(command); }
执行任务方法中我们与3点要看:
1.工作线程数量小于核心线程池数量,添加工作线程,执行任务;
int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { //如果当前工作线程数小于核心线程池数量,则添加新的工作线程执行任务 if (addWorker(command, true)) return; c = ctl.get(); }
我们来看addWorker添加工作线程执行任务;
/** * Checks if a new worker can be added with respect to current * pool state and the given bound (either core or maximum). If so, * the worker count is adjusted accordingly, and, if possible, a * new worker is created and started, running firstTask as its * first task. This method returns false if the pool is stopped or * eligible to shut down. It also returns false if the thread * factory fails to create a thread when asked. If the thread * creation fails, either due to the thread factory returning * null, or due to an exception (typically OutOfMemoryError in * Thread#start), we roll back cleanly. * 根据当前线程池状态和核心线程池数量与最大线程池数量,检查是否应该, 添加工作线程执行任务。如果应该添加工作线程,则更新工作线程数, 如果调整成功,则创建工作线程,执行任务。如果线程是已关闭或正在关闭, 则添加工作线程失败。如果线程工厂创建线程失败,则返回false,如果由于 线程工厂返回null或OutOfMemoryError等原因,执行回滚清除工作。 * @param firstTask the task the new thread should run first (or * null if none). Workers are created with an initial first task * (in method execute()) to bypass queuing when there are fewer * than corePoolSize threads (in which case we always start one), * or when the queue is full (in which case we must bypass queue). * Initially idle threads are usually created via * prestartCoreThread or to replace other dying workers. * * @param core if true use corePoolSize as bound, else * maximumPoolSize. (A boolean indicator is used here rather than a * value to ensure reads of fresh values after checking other pool * state). * @return true if successful */ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) //如果线程池已关闭或线程池正在关闭,提交的任务为null且任务队列不为空,则直接返回false //添加工作线程失败。 return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) //如果工作线程数量大于线程池容量, //或当前工作线程数量大于core(如果core,为true,则为corePoolSize,否则maximumPoolSize) return false; if (compareAndIncrementWorkerCount(c)) //CAS操作工作线程数,即原子操作工作线程数+1,成功则跳出自旋 break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) //如果在判断是否应该添加工作线程执行任务和CAS操作工作线程数, //线程状态改变,跳出本次自旋 continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false;//工作线程是否开始 boolean workerAdded = false;//工作线程是否添加成功 Worker w = null; try { final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); int rs = runStateOf(c); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { //如果线程池是正在运行或线程池正在关闭,任务为null if (t.isAlive()) // precheck that t is startable //线程存活,抛出非法线程状态异常 throw new IllegalThreadStateException(); //添加工作线程,到工作线程集 workers.add(w); int s = workers.size(); if (s > largestPoolSize) //更新最大线程池数量 largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //添加工作线程成功,则执行任务 t.start(); workerStarted = true; } } } finally { if (! workerStarted) //执行任务失败,则回滚工作线程和工作线程数 addWorkerFailed(w); } return workerStarted; }
再来看执行失败回滚处理:
if (! workerStarted) //执行任务失败,则回滚工作线程和工作线程数 addWorkerFailed(w);
/** * Rolls back the worker thread creation. * - removes worker from workers, if present * - decrements worker count * - rechecks for termination, in case the existence of this * worker was holding up termination */ private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) //从工作线程集移除工作线程 workers.remove(w); //工作线程数减-1 decrementWorkerCount(); //检查是否线程池关闭,关闭则执行相关工作 //这个我们在前面说过,则里简单回顾一下 tryTerminate(); } finally { mainLock.unlock(); } }
//tryTerminate
/** * Transitions to TERMINATED state if either (SHUTDOWN and pool * and queue empty) or (STOP and pool empty). If otherwise * eligible to terminate but workerCount is nonzero, interrupts an * idle worker to ensure that shutdown signals propagate. This * method must be called following any action that might make * termination possible -- reducing worker count or removing tasks * from the queue during shutdown. The method is non-private to * allow access from ScheduledThreadPoolExecutor. */ final void tryTerminate() { //自旋尝试关闭线程池 for (;;) { int c = ctl.get(); //如果线程池正在运行,或正在关闭且队列不为空,则返回 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; if (workerCountOf(c) != 0) { // Eligible to terminate //如果工作线程不为空,则中断空闲工作线程 interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //线程池已关闭,任务队列为空,工作线程为0,更新线程池状态为TIDYING if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { //执行结束工作 terminated(); } finally { //线程池已结束 ctl.set(ctlOf(TERMINATED, 0)); //唤醒等待线程池结束的线程 termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
来看尝试结束线程池的这一点,
//如果工作线程不为空,则中断空闲工作线程
interruptIdleWorkers(ONLY_ONE); /** * Interrupts threads that might be waiting for tasks (as * indicated by not being locked) so they can check for * termination or configuration changes. Ignores * SecurityExceptions (in which case some threads may remain * uninterrupted). * 中断等待任务的空闲非锁住状态的工作线程 * @param onlyOne If true, interrupt at most one worker. This is * called only from tryTerminate when termination is otherwise * enabled but there are still other workers. In this case, at * most one waiting worker is interrupted to propagate shutdown * signals in case all threads are currently waiting. * Interrupting any arbitrary thread ensures that newly arriving * workers since shutdown began will also eventually exit. * To guarantee eventual termination, it suffices to always * interrupt only one idle worker, but shutdown() interrupts all * idle workers so that redundant workers exit promptly, not * waiting for a straggler task to finish. 如果onlyOne为true以为着,只中断最多一个空闲工作线程,这个在关闭线程池时, 调用或关闭的过程中,工作线程完成任务调用。 */ private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { //遍历工作线程集 Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) {//锁打开说明,工作线程空闲 try { //如果工作线程非中断,且空闲,尝试获取锁,获取锁成功,则中断工作线程 t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) //如果是只中断一个空闲线程,则结束本次中断空闲线程任务 break; } } finally { mainLock.unlock(); } }
再看
//执行结束工作
terminated();
/** * Method invoked when the Executor has terminated. Default * implementation does nothing. Note: To properly nest multiple * overridings, subclasses should generally invoke 待子类扩展 * {@code super.terminated} within this method. */ protected void terminated() { }
小节一下:
根据当前线程池状态和核心线程池数量与最大线程池数量,检查是否应该,
添加工作线程执行任务。如果应该添加工作线程,则更新工作线程数,
如果调整成功,则创建工作线程,执行任务。如果线程是已关闭或正在关闭,
则添加工作线程失败。如果线程工厂创建线程失败,则返回false,如果由于
线程工厂返回null或OutOfMemoryError等原因,执行回滚清除工作。
回滚清除工作主要是工作线程和工作线程数。最后检查线程是是否关闭,
如果线程池正在运行,或正在关闭且队列不为空,则直接返回,否则及线程池已关闭
,检查工作线程是否为0,不为零根据ONLY_ONE判断中断一个空闲线程还是多个。
2.如果添加工作线程失败,则添加任务到队列,并进行双检查,如果在上述期间,线程池关闭,
回滚任务,从队列中移除任务;
//如果当前工作线程数大于核心线程池数量,检查运行状态,如果是正在运行,则添加任务到任务队列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //重新检查线程池运行状态,如果线程池非处于运行状态,则移除任务 if (! isRunning(recheck) && remove(command)) reject(command);//移除成功,则进行拒绝任务处理 else if (workerCountOf(recheck) == 0) //如线程池非运行状态,且工作线程为0,则创建一个空闲工作线程 //即线程池正在关闭之后的状态,且任务队列不为空 addWorker(null, false); }
这部在看了第一点之后,没有什么可看的了,一看就明白,
来一下 reject(command);
/** * Invokes the rejected execution handler for the given command. * Package-protected for use by ScheduledThreadPoolExecutor. */ final void reject(Runnable command) { //调用拒绝任务处理器处理任务 handler.rejectedExecution(command, this); }
3.如果任务入队列失败,根据工作线程数量是否大于最大线程池数量,来判断是否应该添加工作线程执行任务;
如果工作线程小于最大线程池数量,则CAS操作workCount,成功创建工作线程执行任务。
//根据最大线程池数量,判断是否应该添加工作线程,如果当前工作线程数量小于最大线程池数量,则尝试添加 //工作线程线程执行任务,如果尝试失败,则拒绝任务处理 else if (!addWorker(command, false)) reject(command);
有了前面的两点,这一点很容量理解。
在前面一篇文章中,我们讲了工作线程,这一篇我们简单看了一下线程池执行任务,
我们回到上一篇的线程执行,还有一点我们没有看:
final void runWorker(Worker w) { Thread wt = Thread.currentThread();//当前线程 Runnable task = w.firstTask;//工作线程任务 w.firstTask = null; //任务线程的锁状态默认为-1,此时解锁+1,变为0,即锁打开状态,允许中断,在任务未执行之前,不允许中断。 w.unlock(); // allow interrupts, boolean completedAbruptly = true;//完成后是否可以中断 try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt //如果线程池正在Stop,则确保线程中断; //如果非处于Stop之后的状态,则判断是否中断,如果中断则判断线程池是否已关闭 //如果线程池正在关闭,但没有中断,则中断线程池 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //执行前工作 beforeExecute(wt, task); Throwable thrown = null; try { //执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //执行后工作 afterExecute(task, thrown); } } finally { task = null; //任务线程完成任务数量加1,释放锁 w.completedTasks++; w.unlock(); } } //任务已执行完不可以中断 completedAbruptly = false; } finally { //处理任务完成后的工作 processWorkerExit(w, completedAbruptly); } }
我们来看任务的执行
while (task != null || (task = getTask()) != null) {
如果任务不为null,即创建工作线程成功,并执行任务,如果为null(即在线程池执行任务的时候,创建工作线程失败,任务入队列),从任务队列取一个任务。
来看从任务队列取任务:
/** * Performs blocking or timed wait for a task, depending on * current configuration settings, or returns null if this worker * must exit because of any of: * 1. There are more than maximumPoolSize workers (due to * a call to setMaximumPoolSize). * 2. The pool is stopped. * 3. The pool is shutdown and the queue is empty. * 4. This worker timed out waiting for a task, and timed-out * workers are subject to termination (that is, * {@code allowCoreThreadTimeOut || workerCount > corePoolSize}) * both before and after the timed wait. * * @return task, or null if the worker must exit, in which case * workerCount is decremented */ private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { 如果线程池处于关闭之后或已关闭任务队列为空,则重置工作线程数 decrementWorkerCount(); return null;//返回null任务 } boolean timed; // Are workers subject to culling? for (;;) { int wc = workerCountOf(c); //如果线程池正在运行,根据是否允许空闲线程等待任务和 //当前工作线程与核心线程池数量比较值,判断是否需要超时等待任务 timed = allowCoreThreadTimeOut || wc > corePoolSize; if (wc <= maximumPoolSize && ! (timedOut && timed)) //如果当前工作线程数,小于最大线程数,空闲工作线程不需要超时等待任务, //则跳出自旋,即在当前工作线程小于最大线程池的情况下,有工作线程可用, //任务队列为空。 break; if (compareAndDecrementWorkerCount(c)) //减少工作线程数量失败,返回null return null; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) //如果与自旋前状态不一致,跳出本次自旋 continue retry; // else CAS failed due to workerCount change; retry inner loop } try { //如果非超时则直接take,否则等待keepAliveTime时间,poll任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
再来看线程池执行任务的第二点,
2.如果添加工作线程失败,则添加任务到队列,并进行双检查,如果在上述期间,线程池关闭,回滚任务,从队列中移除任务;
//如果当前工作线程数大于核心线程池数量,检查运行状态,如果是正在运行,则添加任务到任务队列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //重新检查线程池运行状态,如果线程池非处于运行状态,则移除任务 if (! isRunning(recheck) && remove(command)) reject(command);//移除成功,则进行拒绝任务处理 else if (workerCountOf(recheck) == 0) //如线程池非运行状态,且工作线程为0,则创建一个空闲工作线程 //即线程池正在关闭之后的状态,且任务队列不为空 addWorker(null, false); }
关键的一点在addWorker(null, false)
//如线程池非运行状态,且工作线程为0,则创建一个空闲工作线程 //即线程池正在关闭之后的状态,且任务队列不为空 addWorker(null, false);
上述代码,如果成功添加的一个空任务的工作线程,任务为空的话,则
从任务队列取任务执行,这个过程与创建工作线程失败,任务入队列相
对应。
总结:
执行任务的过程为,如果工作线程数量小于核心线程池数量,添加工作线程,执行任务;如果添加工作线程失败,则添加任务到队列,并进行双检查,如果在上述期间,线程池关闭,回滚任务,从队列中移除任务;如果任务入队列失败,根据工作线程数量是否大于最大线程池数量,来判断是否应该添加工作线程执行任务;如果工作线程小于最大线程池数量,
则CAS操作workCount,成功创建工作线程执行任务。添加工作线程的过程为,如果应该添加工作线程,则CAS更新工作线程数,如果更新成功,则创建工作线程,执行任务。如果线程是已关闭或正在关闭,则添加工作线程失败。如果线程工厂创建线程失败,则返回false,如果由于线程工厂返回null或OutOfMemoryError等原因,执行回滚清除工作。回滚清除工作主要是工作线程和工作线程数。最后检查线程是是否关闭,如果线程池正在运行,或正在关闭且队列不为空,则直接返回,否则及线程池已关闭,检查工作线程是否为0,不为零根据ONLY_ONE判断中断一个空闲线程还是多个。
ThreadPoolExecutor解析四(线程池关闭):
http://donald-draper.iteye.com/blog/2367246
发表评论
-
Executors解析
2017-04-07 14:38 1245ThreadPoolExecutor解析一(核心线程池数量、线 ... -
ScheduledThreadPoolExecutor解析三(关闭线程池)
2017-04-06 20:52 4451ScheduledThreadPoolExecutor解析一( ... -
ScheduledThreadPoolExecutor解析二(任务调度)
2017-04-06 12:56 2117ScheduledThreadPoolExecutor解析一( ... -
ScheduledThreadPoolExecutor解析一(调度任务,任务队列)
2017-04-04 22:59 4987Executor接口的定义:http://donald-dra ... -
ThreadPoolExecutor解析四(线程池关闭)
2017-04-03 23:02 9100Executor接口的定义:http: ... -
ThreadPoolExecutor解析二(线程工厂、工作线程,拒绝策略等)
2017-04-01 17:12 3036Executor接口的定义:http://donald-dra ... -
ThreadPoolExecutor解析一(核心线程池数量、线程池状态等)
2017-03-31 22:01 20514Executor接口的定义:http://donald-dra ... -
ScheduledExecutorService接口定义
2017-03-29 12:53 1503Executor接口的定义:http://donald-dra ... -
AbstractExecutorService解析
2017-03-29 08:27 1071Executor接口的定义:http: ... -
ExecutorCompletionService解析
2017-03-28 14:27 1586Executor接口的定义:http://donald-dra ... -
CompletionService接口定义
2017-03-28 12:39 1061Executor接口的定义:http://donald-dra ... -
FutureTask解析
2017-03-27 12:59 1325package java.util.concurrent; ... -
Future接口定义
2017-03-26 09:40 1193/* * Written by Doug Lea with ... -
ExecutorService接口定义
2017-03-25 22:14 1159Executor接口的定义:http://donald-dra ... -
Executor接口的定义
2017-03-24 23:24 1672package java.util.concurrent; ... -
简单测试线程池拒绝执行任务策略
2017-03-24 22:37 2025线程池多余任务的拒绝执行策略有四中,分别是直接丢弃任务Disc ... -
JAVA集合类简单综述
2017-03-23 22:51 920Queue接口定义:http://donald-draper. ... -
DelayQueue解析
2017-03-23 11:00 1733Queue接口定义:http://donald-draper. ... -
SynchronousQueue解析下-TransferQueue
2017-03-22 22:20 2133Queue接口定义:http://donald-draper. ... -
SynchronousQueue解析上-TransferStack
2017-03-21 22:08 3048Queue接口定义:http://donald-draper. ...
相关推荐
- 当提交新任务时,如果线程池状态允许,任务会被放入workQueue,否则拒绝任务。 - 工作线程通过循环调用`runWorker(this)`方法从workQueue中获取并执行任务。 - 如果线程池状态变为SHUTDOWN或STOP,且workQueue...
通过上述对`ThreadPoolExecutor`线程池底层实现原理的解析,我们可以看到Java线程池的强大之处在于其高效的状态管理和任务调度机制。通过对`ctl`变量的巧妙利用,线程池能够有效地管理线程状态和数量,从而实现高...
在这个例子中,它简单地输出了一条消息,并使用注入的线程池执行了一个额外的任务。 #### 五、总结 本文详细介绍了如何在Spring3.2.6中配置线程池和定时任务,包括XML配置文件的具体设置以及程序类的实现。通过这种...
`execute()` 方法是线程池的主要入口,用于提交新的任务到线程池中执行。下面是对该方法的具体分析: 1. **获取当前状态 `ctl`**: - 该方法首先获取线程池的当前状态 `ctl`,以便进行后续操作。 2. **检查线程数...
Java线程池是一种高效管理线程资源的工具,它通过维护一组可重用的...通过`Thread_Project_Test`这个项目,你可以实践并了解线程池的使用,包括创建线程池、提交任务、配置参数等,进一步提升对Java并发编程的理解。
本文将深入解析线程池的管理源码,帮助读者理解其工作原理和优化策略。 在Java中,`java.util.concurrent`包下的`ThreadPoolExecutor`类是线程池的核心实现。它提供了丰富的参数来定制线程池的行为,包括核心线程数...
2. **当池子大小等于 corePoolSize 时**:如果还有新任务提交,但所有核心线程都在执行任务,那么新任务将会被放置到 `workQueue` 阻塞队列中等待执行。 3. **当 workQueue 放不下新入的任务时**:如果 `workQueue` ...
2. `CallerRunsPolicy`: 调用者运行策略,当前提交任务的线程自己处理这个任务,不创建新线程。 3. `DiscardOldestPolicy`: 抛弃最旧的任务,尝试再次提交当前任务。 4. `DiscardPolicy`: 直接丢弃当前任务,不做...
线程池的执行流程涉及execute()方法,当提交任务时,线程池会根据核心线程数、任务队列和最大线程数的状态来决定如何处理任务。如果核心线程数小于corePoolSize,则创建新核心线程;如果核心线程数已满,则将任务...
1. **ExecutorService**: Java的`ExecutorService`接口是线程池的主要入口点,它提供了一种管理和控制线程的方法,如提交任务、关闭线程池等。 2. **ThreadPoolExecutor**: `ThreadPoolExecutor`是`ExecutorService`...
2. 任务提交:提交一个任务到线程池中。 3. 任务执行:线程池中的线程执行任务。 4. 线程池调整:根据线程池的状态和线程数量,动态调整线程池的大小。 构造函数解析 ThreadPoolExecutor 构造函数的参数包括: * ...
ThreadPoolExecutor 是 JUC 中的线程池实现类,它继承自 Executor 接口,提供了 execute() 方法来提交任务。 Executor 接口提供了一个 submit() 方法,可以提交任务并返回 Future 对象,以便可以异步地获取任务的...
本文将深入解析ThreadPoolExecutor的execute()方法执行流程,以帮助我们理解线程池的工作原理。 当一个任务被提交到线程池,线程池的执行策略主要分为四步: 1. 首先,线程池会检查当前的核心线程数是否已达到设定...
`TaskExecutor`是Spring提供的异步任务执行的核心接口,它只有一个方法`execute(Runnable task)`,用于提交任务。Spring提供了多个预定义的`TaskExecutor`实现,以满足不同场景的需求。例如: - `SyncTaskExecutor`...
- ThreadPoolExecutor:线程池执行器,负责创建和管理线程,以及分配任务给工作线程。 - ThreadPoolConfig:线程池配置类,用于设置线程池的参数,如最大线程数、队列容量等。 在实际编程中,线程池的实现方式可能...
- 当Socket连接建立后,将任务提交到线程池执行,主线程继续监听新的客户端连接,实现高并发的Socket服务器。 总之,线程池和Socket的结合使用,能够高效地处理网络服务中的并发连接,提高服务器的吞吐量和响应...
在实际开发过程中,在线程池使用过程中可能会遇到各方面的故障,如线程池阻塞,无法提交新任务等。如果你想监控某一个线程池的执行状态,ThreadPoolExecutor类提供了相关的API,可以实时获取线程池的当前活动线程数...
`ThreadPoolExecutor`类中,`execute()`方法是提交任务的关键入口,它会根据线程池的状态和当前的工作情况决定如何处理任务。同时,`ThreadPoolExecutor`的内部类`Worker`实现了`Runnable`接口,代表了线程池中的...
重要的是,它利用BlockingQueue作为任务队列,平衡了任务提交和执行的速度,提高了效率。线程池的状态管理是其设计的关键。它具有五种状态:RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED,这些状态管理线程池的...
- **handler**:当提交任务数量超过`maximumPoolSize`时,线程池会采取的饱和策略。 #### 五、线程池工作流程示例 假设一个银行场景作为线程池工作的模拟案例: - **初始状态**:线程池初始为空,相当于银行尚未...