现在可以来记录AbstractExecutorService的异步任务提交了,单刀直入吧,先看三个submit方法提交单个任务:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Object> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
有了上一篇铺垫很好理解,首先将Runnable或Callable任务包装成RunnableFuture ftask,然后就交给execute(ftask)方法,接着就马上返回ftask。这个execute(Runnable)方法将在子类ThreadPoolExecutor中实现,这里先不着急,看下这个Executor接口中对该方法的描述:
/
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the <tt>Executor</tt> implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution.
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
在将来某个时刻执行给定的任务command,说明这个方法是将来会去执行,在主线程(submit方法)中是不会阻塞的。另外,注意到如果提交的是Runnable接口,返回的future对象中其结果是在任务提交时候就指定了的,要么是null,要么是T result。上面的分析可以简单测试下:
//Callable 任务
class CallabelTask implements Callable<String>{
String name;
public CallabelTask(String name){
this.name = name;
}
@Override
public String call() throws Exception {
System.out.println("Start to execute the task " + name);
TimeUnit.SECONDS.sleep(5);
return name + " is done!";
}
}
//测试提交 Callable 任务
public static void submitCallableTask()throws Exception{
ExecutorService executor = Executors.newCachedThreadPool();
List<Future<String>> results = new ArrayList<Future<String>>(5);
for(int i = 0; i < 5; ){
results.add(executor.submit(new CallabelTask("task_"+(++i))));
}
System.out.println("All the tasks have been submited through invokeAll method!");
executor.shutdown();
for(Future<String> f : results)
System.out.println(f.get());
}
submitCallableTask() 调用结果:
public static void main(String[] args) throws Exception { submitCallableTask();
}
//调用submitCallableTask的结果:
All the tasks have been submited through invokeAll method!
Start to execute the task task_1
Start to execute the task task_5
Start to execute the task task_3
Start to execute the task task_4
Start to execute the task task_2
task_1 is done!
task_2 is done!
task_3 is done!
task_4 is done!
task_5 is done!
从返回的结果看,主线程在执行完executor.submit(new CallabelTask("task_"+(++i))) 之后并没有阻塞,而是继续往下执行println语句打印出语句:
All the tasks have been submited through invokeAll method!
而提交的任务将由其他线程在“将来某个时刻“去执行。当然在主线程去获取执行结果f.get() 的时候肯定是要阻塞的,因为既然要获取结果了,当然要等到任务执行完毕返回才有啊!
如果把上面的CallabelTask改变成RunnableTask,则返回的结果将是null,原因是提交的时候在new TaskFor方法中就已经指定了返回结果为null:
class RunnableTask implements Runnable{
String name;
public RunnableTask(String name){
this.name = name;
}
@Override
public void run() {
System.out.println("Start to execute the task " + name);
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//RunnableTask 执行结果:
All the tasks have been submited through invokeAll method!
Start to execute the task task_1
Start to execute the task task_5
Start to execute the task task_3
Start to execute the task task_4
Start to execute the task task_2
null
null
null
null
null
分享到:
相关推荐
`ExecutorService`接口是`java.util.concurrent`包的一部分,用于管理和控制线程的执行,提供了一种将任务提交到线程池并管理它们执行的方式。`AbstractExecutorService`为`ExecutorService`接口中的部分方法提供了...
- **返回**:一个`List<Future<T>>`,其中包含所有已完成任务的结果。 - **invokeAny(Collection<Callable<T>> tasks)** - **描述**:执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果...
`shutdown()`方法用于关闭线程池,停止接收新任务,但会继续执行已提交的任务。 `AbstractExecutorService`类是`ExecutorService`的一个抽象实现,它实现了大部分`ExecutorService`的方法。`ThreadPoolExecutor`是...
new ArrayBlockingQueue<>(1000), // workQueue Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() // handler ); // 提交任务 for (int i = 0; i < 100; i++) { final int index = ...
在上面的代码中,我们首先创建了一个固定大小的线程池,然后提交了一个任务。随后,我们调用 shutdown() 方法来关闭线程池,并使用 awaitTermination() 方法来等待所有任务执行完毕。 使用 shutdownNow() 方法来...
- **任务队列(BlockingQueue<Runnable>)**:存放待执行任务的队列,当线程池和核心线程数都达到上限后,新任务将被放入队列等待。 - **线程存活时间(keepAliveTime)**:非核心线程在空闲时的存活时间,超过这个...
RejectedExecutionHandler是一个策略接口,当新的任务提交给线程池而线程池无法处理时,会通过这个接口的策略来处理拒绝任务的情况。 ThreadFactory是一个接口,它用来创建新线程。它提供了创建线程的方法,可以...
AbstractExecutorService 抽象类继承 ExecutorService 接口,对 ExecutorService 相关方法提供了默认实现,用 RunnableFuture 的实现类 FutureTask 包装 Runnable 任务,交给 execute() 方法执行,然后可以从该 ...
2. 任务提交:当有任务到来时,线程池会创建一个线程来执行任务,如果线程池中的线程数目达到核心池的大小,就会把到达的任务放到缓存队列中。 3. 线程复用:当一个线程执行完一个任务后,不会被销毁,而是可以继续...
ExecutorService 是 Java.util.concurrent 包中的一个接口,它是 Executor 接口的子接口,提供了更丰富的线程管理功能,如提交任务并获取结果、关闭线程池等。 2. **基础线程池**: - **ThreadPoolExecutor**:这...
- **ExecutorService接口**:提供了任务提交和管理的基本方法。 - **AbstractExecutorService抽象类**:实现了部分ExecutorService接口的方法,如shutdown等。 - **ThreadPoolExecutor**:具体的线程池实现,负责...
2. **ExecutorService接口**:继承自Executor接口,增加了管理和控制线程池的功能,如`submit()`用于提交任务,`shutdown()`用于关闭线程池,`shutdownNow()`用于尝试停止所有正在执行的任务等。 3. **...
创建固定大小的线程池newFixedThreadPool每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。...
MyThreadPool的实现通常会继承AbstractExecutorService,然后重写execute()方法来提交任务。同时,需要维护一个线程安全的工作队列和一个线程集合,用于存储工作线程。当线程池创建时,会根据用户配置初始化一定数量...
1. 当有新任务提交时,如果当前线程数小于核心线程数,则创建新线程执行任务。 2. 如果线程数等于核心线程数,但工作队列未满,任务会被放入工作队列。 3. 如果线程数等于核心线程数,且工作队列已满,若线程数小于...