`
海浪儿
  • 浏览: 274741 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

ExecutorCompletionService分析及使用

阅读更多

ExecutorCompletionService分析及使用

 

当我们通过Executor提交一组并发执行的任务,并且希望在每一个任务完成后能立即得到结果,有两种方式可以采取:

 

方式一:

通过一个list来保存一组future,然后在循环中轮训这组future,直到每个future都已完成。如果我们不希望出现因为排在前面的任务阻塞导致后面先完成的任务的结果没有及时获取的情况,那么在调用get方式时,需要将超时时间设置为0 

public class CompletionServiceTest {

	static class Task implements Callable<String>{
		private int i;
		
		public Task(int i){
			this.i = i;
		}

		@Override
		public String call() throws Exception {
			Thread.sleep(10000);
			return Thread.currentThread().getName() + "执行完任务:" + i;
		}	
	}
	
	public static void main(String[] args){
		testUseFuture();
	}
	
	private static void testUseFuture(){
		int numThread = 5;
		ExecutorService executor = Executors.newFixedThreadPool(numThread);
		List<Future<String>> futureList = new ArrayList<Future<String>>();
		for(int i = 0;i<numThread;i++ ){
			Future<String> future = executor.submit(new CompletionServiceTest.Task(i));
			futureList.add(future);
		}
				
		while(numThread > 0){
			for(Future<String> future : futureList){
				String result = null;
				try {
					result = future.get(0, TimeUnit.SECONDS);
				} catch (InterruptedException e) {
					e.printStackTrace();
				} catch (ExecutionException e) {
					e.printStackTrace();
				} catch (TimeoutException e) {
					//超时异常直接忽略
				}
				if(null != result){
					futureList.remove(future);
					numThread--;
					System.out.println(result);
					//此处必须break,否则会抛出并发修改异常。(也可以通过将futureList声明为CopyOnWriteArrayList类型解决)
					break;
				}
			}
		}
	}
}

 方式二:

第一种方式显得比较繁琐,通过使用ExecutorCompletionService,则可以达到代码最简化的效果。

public class CompletionServiceTest {

	static class Task implements Callable<String>{
		private int i;
		
		public Task(int i){
			this.i = i;
		}

		@Override
		public String call() throws Exception {
			Thread.sleep(10000);
			return Thread.currentThread().getName() + "执行完任务:" + i;
		}	
	}
	
	public static void main(String[] args) throws InterruptedException, ExecutionException{
		testExecutorCompletionService();
	}
	
	private static void testExecutorCompletionService() throws InterruptedException, ExecutionException{
		int numThread = 5;
		ExecutorService executor = Executors.newFixedThreadPool(numThread);
		CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);
		for(int i = 0;i<numThread;i++ ){
			completionService.submit(new CompletionServiceTest.Task(i));
		}
}
		
		for(int i = 0;i<numThread;i++ ){		
			System.out.println(completionService.take().get());
		}
		
	}

 

ExecutorCompletionService分析:

 CompletionService是Executor和BlockingQueue的结合体。

public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

 任务的提交和执行都是委托给Executor来完成。当提交某个任务时,该任务首先将被包装为一个QueueingFuture,

public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }

 QueueingFutureFutureTask的一个子类,通过改写该子类的done方法,可以实现当任务完成时,将结果放入到BlockingQueue中。

 

private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

 而通过使用BlockingQueue的take或poll方法,则可以得到结果。在BlockingQueue不存在元素时,这两个操作会阻塞,一旦有结果加入,则立即返回。

    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    public Future<V> poll() {
        return completionQueue.poll();
    }
分享到:
评论
3 楼 海浪儿 2016-03-22  
lis1314 写道
private static int count;
public static void main(String[] args) throws Exception {
ExecutorService pool = Executors.newCachedThreadPool();
ConcurrentLinkedQueue<Future<Integer>> queue = new ConcurrentLinkedQueue<>();
for (int i = 0; i < 5; i++) {
Future<Integer> task = pool.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return count++;
}
});
queue.add(task);
}
Integer sum = 0;
while(true){
Future<Integer> future = queue.poll();
if(future == null)break;
sum += future.get();
}
System.out.println(sum);
pool.shutdown();
pool.isTerminated();
}

想说明什么问题?
2 楼 lis1314 2016-03-07  
private static int count;
public static void main(String[] args) throws Exception {
ExecutorService pool = Executors.newCachedThreadPool();
ConcurrentLinkedQueue<Future<Integer>> queue = new ConcurrentLinkedQueue<>();
for (int i = 0; i < 5; i++) {
Future<Integer> task = pool.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return count++;
}
});
queue.add(task);
}
Integer sum = 0;
while(true){
Future<Integer> future = queue.poll();
if(future == null)break;
sum += future.get();
}
System.out.println(sum);
pool.shutdown();
pool.isTerminated();
}
1 楼 qichangleixin 2015-07-07  
为什么用双重循环,会出现并发修改异常呢

相关推荐

    Callable,Future的使用方式

    Callable,Future的使用方式,里面使用了三种使用方式分别是FutureTask,ExecutorService,ExecutorCompletionService

    32 请按到场顺序发言—Completion Service详解.pdf

    在使用`ExecutorCompletionService`时,我们需要创建一个`ExecutorService`实例,然后将这个`ExecutorService`传递给`ExecutorCompletionService`的构造函数。接着,我们可以向`ExecutorCompletionService`提交任务...

    jvaa面试宝典

    - 并发工具类:Semaphore、CyclicBarrier、CountDownLatch、ExecutorCompletionService等。 - Future和Callable接口:理解异步计算,以及如何获取结果。 通过深入学习这些知识点,Java开发者可以更好地准备面试,...

    通过多线程任务处理大批量耗时业务并返回结果

    4. `CompletionService`:可能是`ExecutorCompletionService`,它结合了`ExecutorService`和`BlockingQueue`的功能。我们可以使用`CompletionService.take()`方法获取下一个已完成的任务的结果,而不必等待所有任务...

    ThreadDemo.rar

    在Java编程语言中,`java.util.concurrent`(JUC)包是处理并发和多线程的核心工具包。这个包提供了一系列高效、线程...在分析和调试`ThreadDemo`代码时,要注意线程同步、死锁、活锁等问题,确保程序的正确性和效率。

    JAVA并发编程实践-线程的关闭与取消-学习笔记

    10. **TrackingExecutor任务跟踪**:为了确保任务的正常结束,可以使用`ExecutorCompletionService`来跟踪任务的完成情况,并在必要时取消未完成的任务。 11. **处理异常的线程终止**:线程异常终止时,需要正确...

    CoreJava:核心Java

    14. **并发编程**:深入研究并发工具类(如CountDownLatch, CyclicBarrier, Semaphore, ExecutorCompletionService等),以及并发容器(如ConcurrentHashMap, CopyOnWriteArrayList等)。 15. **垃圾回收与内存管理...

    JAVA课程学习笔记.doc

    - `java.util.concurrent.CompletionService`:允许获取执行任务的结果,`ExecutorCompletionService` 是其具体实现,结合了 `ExecutorService` 和 `Future`。 5. 线程池执行原理 线程池的执行过程主要包括任务提交...

    浅谈Java多线程处理中Future的妙用(附源码)

    在实际开发中,我们常常会遇到需要并发处理某些任务的情况,这时如果使用传统的线程同步机制,会非常复杂和低效。比如在电子商务网站中,我们需要并发处理大量的订单,在这种情况下,如果使用传统的线程同步机制,会...

    ExecutorService与CompletionService对比详解.docx

    在示例中,创建了一个ExecutorCompletionService实例,它继承自CompletionService并且使用ExecutorService作为底层的执行器。提交任务的方式与ExecutorService类似,但获取结果时,不再直接从列表中获取Future,而是...

    多线程相关代码(新)

    包括阻塞队列、阻塞栈、ExecutorService、Future、ExecutorCompletionService、死锁、join、重入锁、读写锁、多线程抢票、信号量、signal/await、ThreadLocal等的实例。

    Java线程池学习资料-全

    `ThreadPoolExecutor`的`submit()`返回`Future`对象,而`ExecutorCompletionService`的`submit()`除了返回`Future`,还支持批量处理结果。 当线程池中的线程抛出异常时,如果使用`submit()`,异常会被捕获并封装在`...

    java并发框架源码-notes:记录各种学习笔记(Java、算法、框架、数据库、并发、源码...)

    `ConcurrentHashMap`是线程安全的哈希表,它的实现使用了分段锁技术,提高了并发性能。`BlockingQueue`接口和它的实现如`ArrayBlockingQueue`、`LinkedBlockingQueue`,在多线程环境中用于存储和传递任务,它们在...

    java多线程编程实践

    - **实现原理**:`ConcurrentHashMap`内部采用了分段锁技术,将整个容器分成若干段,每一段使用一个锁来控制并发访问。这样,在进行修改操作时,只需要锁定当前操作所在的一段,而不是整个容器,从而提高了并发性能...

    java.util.concurrent.uml.pdf

    ExecutorCompletionService类是其实现,它利用线程池执行任务,并帮助开发者获取已经完成的任务结果。 Runnable和Callable是两种任务类型。Runnable是任务的一个简单的执行对象,没有返回值。而Callable接口类似于...

    Java——JUC

    - `AtomicInteger`、`AtomicLong`等原子类提供了一种在不使用锁的情况下实现线程安全的方式,通过底层的CAS(Compare and Swap)操作保证了原子性,适用于简单的原子更新操作。 5. **Future和Callable接口** - `...

    springboot集成amazon aws s3对象存储sdk(javav2)

    以上就是如何在SpringBoot项目中集成AWS S3 SDK,实现基本的S3操作,包括分页查询、文件上传(包括分片上传和断点续传)、下载及批量删除。在实际应用中,可能还需要处理错误、设置权限、优化性能等方面的工作,确保...

    java多线程并发

    - **固定大小的线程池**:`newFixedThreadPool(int nThreads)`创建一个可重用的固定线程数量的线程池,这些线程按需创建,且可以重复使用。 ```java ExecutorService service = Executors.newFixedThreadPool(3);...

    java抢票系统源码-POS:邮政

    5. **异步编程**:Java 8引入了CompletableFuture和ExecutorCompletionService等工具,使得开发者能更高效地处理异步任务,提高系统性能。 6. **Web框架**:为了简化开发,项目可能使用Spring Boot或Struts等Web...

Global site tag (gtag.js) - Google Analytics