public interface CompletionService<V> { Future<V> submit(Callable<V> task); Future<V> submit(Runnable task, V result); /** * Retrieves and removes the Future representing the next * completed task, waiting if none are yet present. * * @return the Future representing the next completed task * @throws InterruptedException if interrupted while waiting * 阻塞/ Future<V> take() throws InterruptedException; /** * Retrieves and removes the Future representing the next * completed task or <tt>null</tt> if none are present. * * @return the Future representing the next completed task, or * <tt>null</tt> if none are present * 非阻塞/ Future<V> poll(); }
CompletionService 也不是到处都能用,它不适合处理任务数量有限但个数不可知的场景。例如,要统计某个文件夹中的文件个数,在遍历子文件夹的时候也会“递归地”提交新的任务,但最后到底提交了多少,以及在什么时候提交完了所有任务,都是未知数,无论 CompletionService 还是线程池都无法进行判断。这种情况只能直接用线程池来处理。
CompletionService 接口的实例可以充当生产者和消费者的中间处理引擎,从而达到将提交任务和处理结果的代码进行解耦的目的。生产者调用submit 方法提交任务,而消费者调用 poll(非阻塞)或 take(阻塞)方法获取下一个结果:这一特征看起来和阻塞队列(BlockingQueue)类似,两者的区别在于 CompletionService 要负责任务的处理,而阻塞队列则不会。
在 JDK 中,该接口只有一个实现类 ExecutorCompletionService,该类使用创建时提供的 Executor 对象(通常是线程池)来执行任务,然后将结果放入一个阻塞队列中。
Example1:
1. 背景
在Java5的多线程中,可以使用Callable接口来实现具有返回值的线程。使用线程池的submit方法提交Callable任务,利用submit方法返回的Future存根,调用此存根的get方法来获取整个线程池中所有任务的运行结果。
方法一:如果是自己写代码,应该是自己维护一个Collection保存submit方法返回的Future存根,然后在主线程中遍历这个Collection并调用Future存根的get()方法取到线程的返回值。
方法二:使用CompletionService类,它整合了Executor和BlockingQueue的功能。你可以将Callable任务提交给它去执行,然后使用类似于队列中的take方法获取线程的返回值。
2. 实现代码
package com.clzhang.sample.thread; import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; public class ThreadPoolTest4 { // 具有返回值的测试线程 class MyThread implements Callable<String> { private String name; public MyThread(String name) { this.name = name; } @Override public String call() { int sleepTime = new Random().nextInt(1000); try { Thread.sleep(sleepTime); } catch (InterruptedException e) { e.printStackTrace(); } // 返回给调用者的值 String str = name + " sleep time:" + sleepTime; System.out.println(name + " finished..."); return str; } } private final int POOL_SIZE = 5; private final int TOTAL_TASK = 20; // 方法一,自己写集合来实现获取线程池中任务的返回结果 public void testByQueue() throws Exception { // 创建线程池 ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE); BlockingQueue<Future<String>> queue = new LinkedBlockingQueue<Future<String>>(); // 向里面扔任务 for (int i = 0; i < TOTAL_TASK; i++) { Future<String> future = pool.submit(new MyThread("Thread" + i)); queue.add(future); } // 检查线程池任务执行结果 for (int i = 0; i < TOTAL_TASK; i++) { System.out.println("method1:" + queue.take().get()); } // 关闭线程池 pool.shutdown(); } // 方法二,通过CompletionService来实现获取线程池中任务的返回结果 public void testByCompetion() throws Exception { // 创建线程池 ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE); CompletionService<String> cService = new ExecutorCompletionService<String>(pool); // 向里面扔任务 for (int i = 0; i < TOTAL_TASK; i++) { cService.submit(new MyThread("Thread" + i)); } // 检查线程池任务执行结果 for (int i = 0; i < TOTAL_TASK; i++) { Future<String> future = cService.take(); System.out.println("method2:" + future.get()); } // 关闭线程池 pool.shutdown(); } public static void main(String[] args) throws Exception { ThreadPoolTest4 t = new ThreadPoolTest4(); t.testByQueue(); t.testByCompetion(); } }
部分输出:
...
Thread4 finished...
method1:Thread4 sleep time:833
method1:Thread5 sleep time:158
Thread6 finished...
method1:Thread6 sleep time:826
method1:Thread7 sleep time:185
Thread9 finished...
Thread8 finished...
method1:Thread8 sleep time:929
method1:Thread9 sleep time:575
...
Thread11 finished...
method2:Thread11 sleep time:952
Thread18 finished...
method2:Thread18 sleep time:793
Thread19 finished...
method2:Thread19 sleep time:763
Thread16 finished...
method2:Thread16 sleep time:990
...
3. 总结
使用方法一,自己创建一个集合来保存Future存根并循环调用其返回结果的时候,主线程并不能保证首先获得的是最先完成任务的线程返回值。它只是按加入线程池的顺序返回。因为take方法是阻塞方法,后面的任务完成了,前面的任务却没有完成,主程序就那样等待在那儿,只到前面的完成了,它才知道原来后面的也完成了。
使用方法二,使用CompletionService来维护处理线程不的返回结果时,主线程总是能够拿到最先完成的任务的返回值,而不管它们加入线程池的顺序。
Example2:
当向Executor提交批处理任务时,并且希望在它们完成后获得结果,如果用FutureTask,你可以循环获取task,并用future.get()去获取结果,但是如果这个task没有完成,你就得阻塞在这里,这个实效性不高,其实在很多场合,其实你拿第一个任务结果时,此时结果并没有生成并阻塞,其实在阻塞在第一个任务时,第二个task的任务已经早就完成了,显然这种情况用future task不合适的,效率也不高的,实例如下:
实例一:用一个非complete Service完成的批量任务
public class NonCompleteServiceTest { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executorService = Executors.newFixedThreadPool(10); Future<String>[] futures = new FutureTask[10]; /** * 产生一个随机数,模拟不同的任务的处理时间不同 */ for (int i = 0; i < 10; i++) { futures[i] = executorService.submit(new Callable<String>() { public String call(){ int rnt = new Random().nextInt(5); try { Thread.sleep(rnt*1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("run rnt = "+rnt); return String.valueOf(rnt*1000); } }); } /** * 获取结果时,如果任务没有完成,则阻塞,在顺序获取结果时, * 可能别的任务已经完成,显然效率不高 */ for (int i = 0; i < futures.length; i++) { System.out.println(futures[i].get()); } executorService.shutdown(); } }
CompletionService:
JDK:
public interface CompletionService<V>
将生产新的异步任务与使用已完成任务的结果分离开来的服务。生产者 submit 执行的任务。使用者 take 已完成的任务,并按照完成这些任务的顺序处理它们的结果。例如,CompletionService 可以用来管理异步 IO ,执行读操作的任务作为程序或系统的一部分提交,然后,当完成读操作时,会在程序的不同部分执行其他操作,执行操作的顺序可能与所请求的顺序不同。
public class CompleteServiceTest { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executorService = Executors.newFixedThreadPool(10); CompletionService<String> completionService = new ExecutorCompletionService<String>(executorService); /** * 产生一个随机数,模拟不同的任务的处理时间不同 */ for (int i = 0; i < 10; i++) { completionService.submit(new Callable<String>() { public String call(){ int rnt = new Random().nextInt(5); try { Thread.sleep(rnt*1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("run rnt = "+rnt); return String.valueOf(rnt*1000); } }); } /** * 获取结果时,总是先拿到队列上已经存在的对象,这样不用依次等待结果 * 显然效率更高 */ for (int i = 0; i < 10; i++) { Future<String> future = completionService.take(); System.out.println(future.get()); } executorService.shutdown(); } }
实例来源:http://tomyz0223.iteye.com/blog/1040300
JDK:http://www.cjsdn.net/Doc/JDK50/java/util/concurrent/CompletionService.html
其实也可以不使用CompletionService,可以先创建一个装Future类型的集合,用Executor提交的任务返回值添加到集合中,最后遍历集合取出数据
================================================================================
================================================================================
接下来我们来看看CompletionService接口的具体实现:ExecutorCompletionService。
ExecutorCompletionService实现分析
成员变量
ExecutorCompletionService有三个成员变量:
executor:执行task的线程池,创建CompletionService必须指定;
aes:主要用于创建待执行task;
completionQueue:存储已完成状态的task,默认是基于链表结构的阻塞队列LinkedBlockingQueue。
构造方法
ExecutorCompletionService提供两个构造方法,具体的使用具体情况具体分析,使用者可以根据业务场景来进行选择。
task提交
ExecutorCompletionService提供submit方法来提交Callable类型或者Runnable类型的task:
具体的执行流程如下:
1、参数校验,不符合条件的task抛出异常,程序结束;
2、将Callable类型或者Runnable类型的task构造成FutureTask;
3、把构造好的FutureTask交由线程池executor执行。
看到这里可能大家会比较疑惑了,task调用submit方法可以提交,完成的task是什么时候被加入到completionQueue里的呢?
针对这个问题,从submit方法的源码可以看出,在提交到线程池的时候需要将FutureTask封装成QueueingFuture,我们来看看QueueingFuture的具体实现:
从源码可以看出,QueueingFuture是FutureTask的子类,实现了done方法,在task执行完成之后将当前task添加到completionQueue,done方法的具体调用在FutureTask的finishCompletion方法,上篇介绍FutureTask的文章已经做过具体的分析,在这里就不再赘述了。
已完成状态task获取
CompletionService的take方法和poll方法都可以获取已完成状态的task,我们来看看具体的实现:
从源码可以看出,take和poll都是调用BlockingQueue提供的方法。既然take和poll都可以获取到已完成状态的task,那么他们的区别是什么呢?
take()在获取并移除已完成状态的task时,如果目前暂时不存在这样的task,等待,直到存在这样的task;
poll()在获取并移除已完成状态的task时,如果目前暂时不存在这样的task,不等待,直接返回null。
相关推荐
在这个例子中,`trace_func`会执行传入的函数并获取返回值,然后根据返回值更新`ret_flag`。需要注意的是,为了确保在多线程环境下安全地更新`ret_flag`,可能需要使用锁(如`threading.Lock`)来避免数据竞争。 ...
- 在VC++中,主线程可以通过`Joinable`线程类的`join()`成员函数等待子线程结束并获取返回值。 6. **VS2012中的线程支持**: - Visual Studio 2012引入了C++11标准库,其中包含`std::thread`类,提供了一种更现代...
最近有个需求,用多线程比较合适,但是我需要每个线程的返回值,这就需要我在threading.Thread的基础上进行封装 import threading class MyThread(threading.Thread): def __init__(self,func,args=()): super...
然后,在主线程中,使用std::future对象来获取返回值。 下面是一个示例代码: ```cpp #include #include #include #include #include void initializer(std::promise<int> &promiseObj){ std::cout () ; std::...
然而,在使用多线程时,我们往往需要获取每个线程执行完毕后的返回值,以便进行后续处理。这篇教程将详细介绍如何在Python中通过重写`threading`库的`Thread`类来实现这一目标。 首先,为了获取线程的返回值,我们...
在编程领域,线程是程序执行的一个独立路径,它允许程序同时执行多个任务。在易语言这样的编程环境中,处理线程并返回数据是一项常见的需求。本文将深入探讨易语言中线程返回数据的方法,帮助开发者更好地理解和应用...
在C++编程中,获取指定线程的CPU使用率是一项重要的任务,这有助于优化程序性能,监测系统资源消耗。本文将详细介绍如何通过C++来实现这一功能。 首先,我们需要理解CPU使用率的基本概念。CPU使用率是衡量处理器在...
在这个"多线程互斥实例 多线程获取同一变量"的示例中,我们将探讨如何在多个线程中安全地访问共享资源,避免数据不一致性和竞态条件。 首先,我们需要理解多线程中的一些核心概念: 1. **线程**:线程是操作系统...
NULL 博文链接:https://icgemu.iteye.com/blog/467848
使用CompletionService可以将Future对象添加到阻塞队列中,然后使用take()方法来获取Future对象的结果,这样可以实时获取多线程运行结果。 使用Future和CompletionService可以实现异步计算和非阻塞的任务调用,并...
同时,`jdbctest`可能涉及到了如何在多线程环境下使用JDBC,比如在一个线程中执行查询,然后在另一个线程中处理结果,这与我们的主题“有返回值的线程”相吻合。 总之,要实现有返回值的线程,我们可以使用Java的`...
要执行DOS命令并获取返回值,我们可以使用Windows API中的`CreateProcess`函数。这个函数可以启动一个新的进程,并且可以选择接收进程的输出。以下是使用`CreateProcess`的基本步骤: 1. 准备命令行参数:你需要...
但是在C++11 多线程中我们注意到,std::thread对象会忽略顶层函数的返回值。 那问题来了,我们要怎么获得线程的返回值呢? 我们通过一个例子来说明如何实现这个需求。用多个线程计算(a+b)/ (x+y) 的值 有两种方法...
主线程可以使用pthread_join() 函数等待指定线程的结束,并获取线程的返回值。同时,也可以使用pthread_detach() 函数来通知系统对线程的资源进行自动回收,避免了对线程结束的显式等待。 4. 线程的退出:线程在...