分析完AbstractExecutorService异步任务提交之后,一直留着一个问题:就是任务提交之后的最终执行方法execute(Runnable)始终没有细究,只知道它会在将来某个时刻去执行任务,也就是所谓的异步执行。 现在可以揭开异步执行方法executor(Runnable command)的真面目了,回到线程池执行器ThreadPoolExecutor,乍看这个方法,蛮精干的:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}
作为异步任务执行的核心方法,看上去短短10行,其实蕴含了许多依赖的组件,这个可以参考文章“ThreadPoolExecutor 分析之类基础架构 ”。
在分析之前,需要再解释几个概念,ThreadPoolExecutor是一个线程池执行器,它里面维护了一个线程池,以及一个等待执行的任务队列。
所谓线程池,其实就是一个Worker对象的集合:HashSet<Worker>
workers = new HashSet<Worker>();至于这个Worker的基本类图如下:
从类图看(当然最好结合代码),Worker本身是一个Runnable,它自己维护了执行它的线程对象thead,又维护了一个Runnable对象firstTask(这个对象就是ThreadPoolExecutor线程池的任务对象了),当ThreadPoolExecutor执行一个任务的时候,先获得 (最直接的方法就是new)一个可用的Thread对象,然后再获得(最直接方法就是new)一个Worker对象,并把Thread对象包装进这个Worker对象中,接着让这个thread对象start就开始执行这个Worker对象的run()方法,而run()方法中会去执行Worker.firstTask.run()方法。这就间接的的执行了目标任务,同时通过worker这个包装(或者说代理)之后,可用做很多额外的工作,比如中断自身执行线程,记录在该线程上执行过的任务数量等。分析addIfUnderCorePoolSize(command)方法时还会分析Worker类。
线程池数量poolSize指工作线程Worker对象的集合workers的实际大小,通过workers.size()可直接获得。
核心线程池数量corePoolSize,可理解为工作线程Worker对象的集合workers的目标大小。如果poolSize
> corePoolSize,那么ThreadPoolExecutor就会有机制在适当的时候回收闲置的线程。
最大线程池数量maxPoolSize,就是工作线程Worker对象的集合workers的大小上限。假如说任务队列满了,再来新任务时,若poolSize还没达到maxPoolSize,则继续创建新的线程来执行新任务,若不幸poolSize达到了上限maxPoolSize,那不能再创建新的线程了,只能采取reject策略来拒绝新任务。
所谓任务队列,就是一个Runnable对象的阻塞队列:BlockingQueue<Runnable> workQueue; 可根据不同需求设置不同的队列类型。
下面分析execute(command)执行流程:
1. 第四行if (poolSize >= corePoolSize ||
!addIfUnderCorePoolSize(command))
解释: 如果当前线程池中线程数量poolSize >= 核心线程数量corePoolSize 成立,那么逻辑或运算符后面的方法addIfUnderCorePoolSize(command)就忽略不做,而直接进花括号内部;如果poolSize
>= corePoolSize不成立,尝试调用addIfUnderCorePoolSize(command)方法,该方法返回true就进花括号,否则整个execute方法就结束。
理解:这很好理解,如果当前线程数量poolSize>=核心线程数量corePoolSize,那当然无法再把当前任务加入到核心线程池中执行了,于是进花括号选择其他的策略执行;如果poolSize没有达到corePoolSize,那很自然是把当前任务放到核心线程池执行,也就是执行逻辑或运算符后的方法addIfUnderCorePoolSize(command)。“放到核心线程池执行”是什么意思呢?就是new 一个新工作线程放到workers集合中,让这个新线程来执行当前的任务command,而这个新线程可以认为是核心线程池中的其中一个线程。
addIfUnderCorePoolSize(command)方法做了什么事情?请看代码:
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < corePoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}
主要就是在poolSize < corePoolSize并且当前状态runState == RUNNING时通过方法addThread(firstTask)返回一个线程t,然后马上t.start()执行任务,返回true。这里有失败的可能,因为虽然在前面execute方法中已经保证了poolSize < corePoolSize进入该方法,但是当时并没有加锁,很有可能到了当前的位置poolSize已经改变了,所以这里必须再次检查并且必须加锁访问。顺便简单看下addThread方法:
private Thread addThread(Runnable firstTask) {
Worker w = new Worker(firstTask);
Thread t = threadFactory.newThread(w);
if (t != null) {
w.thread = t;
workers.add(w);
int nt = ++poolSize;
if (nt > largestPoolSize)
largestPoolSize = nt;
}
return t;
}
直接new一个工作线程Worker对象,封装目标任务firstTask,用线程工厂创建一个新线程t,然后t又赋值给Worker的thread属性,这下对Worker有了新理解吧?线程t 用来执行Worker对象,而Worker对象的thread属性的值=t ,他维护了执行它自己的那个线程。t.start()便开始执行worker.run(),而worker.run中会去执行封装目标任务firstTask.run()方法。当然Worker还没有完,后面还会分析到。
addIfUnderCorePoolSize(command)会返回false吗?如果corePoolSize设置的足够大,基本就不会失败,那execute(command)方法做完这句if语句判断就结束了,压根不需要进入花括号继续。但是不幸的是,addIfUnderCorePoolSize还是经常会失败的。所以接下来还得看看if大括号里面的逻辑。
2. 第五行if (runState == RUNNING &&
workQueue.offer(command))
解释:runState表示这个TheadPoolExecutor的状态,可以有4个状态
a) RUNNING可接收新任务并执行任务队列
b) SHUTDOWN不能接收新任务,但可以继续执行任务队列
c) STOP不能接收新任务,也不在处理任务队列,并且中断正在执行的任务
d) TERMINATED在STOP基础上,所有线程都已终止
程序若到了这一步,说明当前线程数量poolSize >=核心线程数量corePoolSize,这里先判断是不是这个TheadPoolExecutor还是RUNNING状态,若是则试着加入到任务队列workQueue中,无法加入的唯一可能就是队列已经满了。先说没满的情况,就是任务加入到任务队列成功。按照常理,加入了队列以后,只要保证有工作线程就ok了,工作线程会自动去执行任务队列的。所以判断一下if ( runState != RUNNING
|| poolSize == 0),在这个if为true时候,去保证一下任务队列有线程会执行,即执行ensureQueuedTaskHandled(command)方法。这里有两种情况,情况一:runState != RUNNING,这种情况在ensureQueuedTaskHandled方法中会把任务丢给reject拒绝策略处理,情况二:poolSize == 0,这种情况是new一个新线程加入到工作线程集合workers中。
3. 第九行else if (!addIfUnderMaximumPoolSize(command))
解释:程序执行到这个分支,说明上面第五行if条件为false,也就是说当前状态runState != RUNNING,或者任务队列workQueue已经满了。先看第一个条件下,前面解释过runState,除了RUNNING状态,其他三个状态都不能接收新任务,所以当runState != RUNNING时新任务只能根据reject策略拒绝,而这个拒绝的逻辑是在addIfUnderMaximumPoolSize方法中实现的;再看第二个条件下,workQueue已经满,潜在的条件是runState == RUNNING,这种情况怎么处理新任务呢?很简单,若当前线程数量已经poolSize没有达到最大线程数量maxPoolSize,则创建新的线程去执行这个无法加入任务队列的新任务,否则就根据reject策略拒绝,这里的拒绝逻辑就在这个else if条件成立的子句中做的,即第10行reject(command);
到此,任务异步执行的整个过程execute(Runnable command)分析完毕,顺便画个流程图作为本文的结尾,本来应该呼应一下前文,完成对工作线程Worker类的分析,包括选取执行任务队列、及在任务队列为空时将Worker对象回收,但是看来得放到下一篇了。
- 大小: 9.9 KB
- 大小: 43.1 KB
分享到:
相关推荐
通过对 `execute()` 方法的深入分析,我们能够更全面地理解 `ThreadPoolExecutor` 在实际应用场景中的作用和意义。 线程池的设计不仅仅是为了提高程序的并发性能,更重要的是通过合理的配置和管理,避免因过度创建...
ThreadPoolExecutor线程池原理及其execute方法详解 ThreadPoolExecutor是Java并发包中提供的线程池类,用于管理和执行异步任务。ThreadPoolExecutor的执行原理可以分为四个步骤: 1.核心线程池:...
本文将深入解析ThreadPoolExecutor的execute()方法执行流程,以帮助我们理解线程池的工作原理。 当一个任务被提交到线程池,线程池的执行策略主要分为四步: 1. 首先,线程池会检查当前的核心线程数是否已达到设定...
1. **`execute(Runnable command)`**:该方法用于提交一个`Runnable`类型的命令,它不返回任何结果,通常用于执行不需返回值的任务。 2. **`submit(Runnable task)`**:这个方法同样用于提交一个`Runnable`任务,但...
线程池ThreadPoolExecutor使用简介与方法实例 线程池ThreadPoolExecutor是Java并发编程中一个非常重要的概念,它允许开发者将任务提交给线程池,并由线程池来管理这些任务的执行。今天,我们将对线程池...
3. 提交任务:使用execute方法提交任务到ThreadPoolExecutor中。 4. 关闭ThreadPoolExecutor:使用shutdown方法关闭ThreadPoolExecutor,防止新的任务提交。 在使用ThreadPoolExecutor时,还需要注意以下几点: * ...
任务可以通过execute(Runnable)方法被添加到线程池,任务就是一个Runnable类型的对象,任务的执行方法就是Runnable类型对象的run()方法。 当一个任务通过execute(Runnable)方法欲添加到线程池时,线程池会根据当前...
- **任务分发**:使用Java的Jedis或Redisson等客户端库与Redis通信,获取任务后,创建`Runnable`或`Callable`对象,然后调用ThreadPoolExecutor的`execute`或`submit`方法提交任务。 - **线程池配置**:根据系统...
"ThreadPoolExecutor线程池之submit方法详解" 在 Java 中,ThreadPoolExecutor 是一个非常重要的线程池实现类,它提供了多种方式来执行任务,其中 submit 方法是其中一个重要的方法,本文将详细解释 ...
然后,我们使用 ThreadPoolExecutor 的 invokeAll() 方法将这些任务提交到线程池中,并行地执行这些任务。最后,我们使用 Future 对象来获取每个任务的执行结果,并将其添加到一个列表中。 通过使用 ...
1. 当主线程提交任务至线程池(调用execute方法)时,线程池首先检查当前线程数是否小于核心线程数。如果是,就创建新的工作线程(addWorker)并立即执行任务。 2. 如果线程数达到或超过核心线程数,线程池会尝试将...
`ExecutorService`是线程池的主要接口,提供了执行任务的方法,如`execute()`用于执行无返回值的`Runnable`任务,以及`submit()`用于执行有返回值的`Callable`任务。`shutdown()`方法用于关闭线程池,停止接收新任务...
在使用ThreadPoolExecutor执行任务时,我们需要创建一个Callable对象,Callable对象中实现了call()方法,该方法将被线程池执行。例如,我们可以创建一个ThreadPoolTask对象,该对象实现了Callable接口: ```java ...
2. 使用ThreadPoolExecutor构造方法:ThreadPoolExecutor构造方法可以根据实际情况创建线程池,例如指定核心线程数、最大线程数、阻塞队列等。 五、线程池的使用注意事项 在使用线程池时,需要注意以下几点: 1. ...
* RejectedExecutionHandler:饱和策略,最大线程和工作队列容量且已经饱和时 execute 方法都将调用 RejectedExecutionHandler ThreadPoolExecutor 的工作流程 ThreadPoolExecutor 的工作流程可以分为以下四步: ...
首先,需要创建一个ThreadPoolExecutor对象,然后可以通过execute方法来提交任务。例如: ```java ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue(10)...
这些方法使用模板方法模式,内部调用了未实现的`execute()`方法,这个方法由具体的线程池实现类提供。 4. **ThreadPoolExecutor类**:实现了ExecutorService接口,是线程池的具体实现。它包含了一系列参数,如核心...
- `execute()`方法用于提交一个`Runnable`任务到线程池中执行。 #### 五、自定义线程池示例(另一种) ```java @Configuration public class WebMvcConfig implements WebMvcConfigurer { @Bean(name = ...
在`ThreadPoolExecutor`的`execute()`方法中,线程池会根据当前线程数、任务队列状态决定是新建线程、重用线程还是拒绝任务。线程池的扩展性和灵活性使其成为Java多线程编程中的重要工具。 总结,`...