`

JDK并发工具包CompletionService和ExecutorCompletionService的好处和使用场景

    博客分类:
  • java
 
阅读更多

《Java并发编程实践》一书6.3.5节CompletionService:Executor和BlockingQueue,有这样一段话:

  "如果向Executor提交了一组计算任务,并且希望在计算完成后获得结果,那么可以保留与每个任务关联的Future,然后反复使用get方法,同时 将参数timeout指定为0,从而通过轮询来判断任务是否完成。这种方法虽然可行,但却有些繁琐。幸运的是,还有一种更好的方法:完成服务 CompletionService。"

这是什么意思呢?我们通过一个例子,分别使用繁琐的做法和 CompletionService来完成,清晰的对比能让我们更好的理解上面的一段话和 CompletionService这个API提供的初衷。考虑这样的场景,有5个Callable任务分别返回5个整数,然后我们在main方法中按照各个任务完成的先后顺序,在控制台打印返回结果。

package net.aty.completeservice;

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

public class ReturnAfterSleepCallable implements Callable<Integer>
{
private int sleepSeconds;

private int returnValue;

public ReturnAfterSleepCallable(int sleepSeconds, int returnValue)
{
this.sleepSeconds = sleepSeconds;
this.returnValue = returnValue;
}

@Override
public Integer call() throws Exception
{
System.out.println("begin to execute.");

TimeUnit.SECONDS.sleep(sleepSeconds);

System.out.println("end to execute.");

return returnValue;
}
}

这个任务会接受2个参数,睡眠指定的时间后,返回指定的结果。睡眠时间越短,意味着任务越先执行完成。

1.繁琐的做法

通过一个List来保存每个任务返回的Future,然后轮询这些Future,直到每个Future都已完成。我们不希望出现因为排在前面的任务阻塞导致后面先完成的任务的结果没有及时获取的情况,所以在调用get方式时,需要将超时时间设置为0。

package net.aty.completeservice;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TraditionalTest
{
public static void main(String[] args)
{
int taskSize = 5;

ExecutorService executor = Executors.newFixedThreadPool(taskSize);

List<Future<Integer>> futureList = new ArrayList<Future<Integer>>();

for (int i = 1; i <= taskSize; i++)
{
int sleep = taskSize - i; // 睡眠时间

int value = i; // 返回结果

// 向线程池提交任务
Future<Integer> future = executor.submit(new ReturnAfterSleepCallable(sleep, value));

// 保留每个任务的Future
futureList.add(future);
}

// 轮询,获取完成任务的返回结果
while(taskSize > 0)
{
for (Future<Integer> future : futureList)
{
Integer result = null;

try
{
result = future.get(0, TimeUnit.SECONDS);
} catch (InterruptedException e)
{
e.printStackTrace();
} catch (ExecutionException e)
{
e.printStackTrace();
} catch (TimeoutException e)
{
// 超时异常需要忽略,因为我们设置了等待时间为0,只要任务没有完成,就会报该异常
}

// 任务已经完成
if(result != null)
{
System.out.println("result=" + result);

// 从future列表中删除已经完成的任务
futureList.remove(future);  
taskSize--;
//此处必须break,否则会抛出并发修改异常。(也可以通过将futureList声明为CopyOnWriteArrayList类型解决) 
break; // 进行下一次while循环
}
}
}

// 所有任务已经完成,关闭线程池
System.out.println("all over.");
executor.shutdown();
}



}

可见轮询future列表非常的复杂,而且还有很多异常需要处理,TimeOutException异常需要忽略;还要通过双重循环和break,防止遍历集合的过程中,出现并发修改异常。这么多需要考虑的细节,程序员很容易犯错。

2.使用CompletionService

package net.aty.completeservice;

import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionServiceTest
{
public static void main(String[] args)
{
int taskSize = 5;

ExecutorService executor = Executors.newFixedThreadPool(taskSize);

// 构建完成服务
CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
executor);

for (int i = 1; i <= taskSize; i++)
{
int sleep = taskSize - i; // 睡眠时间

int value = i; // 返回结果

// 向线程池提交任务
completionService
.submit(new ReturnAfterSleepCallable(sleep, value));
}

// 按照完成顺序,打印结果
for (int i = 0; i < taskSize; i++)
{
try
{
System.out.println(completionService.take().get());
} catch (InterruptedException e)
{
e.printStackTrace();
} catch (ExecutionException e)
{
e.printStackTrace();
}
}

// 所有任务已经完成,关闭线程池
System.out.println("all over.");
executor.shutdown();
}
}

可见使用CompletionService不会有TimeOutExeception的问题,不用遍历future列表,不用担心并发修改异常。

3. Completion Service和ExecutorCompletionService的实现

   JDK源码中CompletionService的javadoc说明如下:

/**
 * A service that decouples the production of new asynchronous tasks
 * from the consumption of the results of completed tasks.  Producers
 * <tt>submit</tt> tasks for execution. Consumers <tt>take</tt>
 * completed tasks and process their results in the order they
 * complete. 
 */
也就是说, CompletionService实现了生产者提交任务和消费者获取结果的解耦,生产者和消费者都不用关心任务的完成顺序,由 CompletionService来保证,消费者一定是按照任务完成的先后顺序来获取执行结果。


ExecutorCompletionService是CompletionService的实现,融合了线程池Executor和阻塞队列BlockingQueue的功能。

public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }
到这里可以推测,按照任务的完成顺序获取结果,就是通过阻塞队列实现的,阻塞队列刚好具有这样的性质:阻塞和有序。

ExecutorCompletionService 任务的提交和执行都是委托给Executor来完成。当提交某个任务时,该任务首先将被包装为一个QueueingFuture

public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
}

QueueingFuture FutureTask 的一个子类,通过改写 FutureTask 类的 done 方法,可以实现当任务完成时,将结果放入到 BlockingQueue 中。

/**
  * FutureTask extension to enqueue upon completion
  */
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}

这里简单说明下:FutureTask.done(),这个方法默认什么都不做,就是一个回调,当提交的线程池中的任务完成时,会被自动调 用。这也就说时候,当任务完成的时候,会自动执行QueueingFuture.done()方法,将返回结果加入到阻塞队列中,加入的顺序就是任务完成 的先后顺序。

至此CompletionService和ExecutorCompletionService的讲解结束。特别感谢http://xw-z1985.iteye.com/blog/1997077这篇博客的作者 海浪儿 ,解决了我在看《java并发编程实践》中的困惑,本文也基本上是照抄那篇博文的。

分享到:
评论

相关推荐

    jdk8工具包

    总结起来,这个"jdk8-windowsx64工具包"是保存和使用Java 8开发环境的重要资源。通过它,开发者可以方便地在Windows 64位系统上搭建开发环境,并享受到Java 8带来的诸多新特性和性能提升。对于那些习惯于Java 8特性...

    jdk17工具包!!!!!

    Java JDK 17工具包是Java开发不可或缺的一部分,它包含了用于编写、编译、调试以及运行Java应用程序所需的所有工具。JDK(Java Development Kit)是Java程序员的核心开发环境,其重要性不言而喻。本资源提供的“jdk-...

    JDK21 开发工具包!!!

    **JDK21 开发工具包详解** JDK(Java Development Kit)是Oracle公司发布的用于开发和运行Java应用程序的重要工具集。JDK21作为Java的最新版本,它包含了Java编译器、Java运行时环境、Java调试工具以及其他必要的...

    JDK1.8安装工具包

    JDK1.8,全称Java Development Kit,是Oracle公司提供的用于开发和运行Java应用程序的重要工具集。在本话题中,我们将详细探讨JDK1.8.0_291版本的安装过程以及它包含的关键组件,这对于任何Java开发者来说都是至关...

    开发工具包jdk1.7.0

    **Java Development Kit (JDK) 1.7.0** JDK 1.7.0,也被称为Java SE 7(Java Standard Edition 7),是Oracle公司推出的一个...安装和配置JDK是开发Java应用的第一步,而掌握其中的工具和特性则是提升开发能力的关键。

    jdk1.6jar包

    9. **并发编程工具**:提供了一系列的并发编程工具,如`java.util.concurrent`包,包含线程池、并发容器和并发工具类,方便开发者构建高并发应用。 10. **JMX (Java Management Extensions)**:增强了对Java应用...

    JDK资源包合集(JDK6+JDK7+JDK8)

    Java开发工具包(Java Development Kit,简称JDK)是Java编程语言的核心组件,它提供了编译、调试和运行Java应用程序所需的所有工具。本资源包集合包括了JDK6、JDK7和JDK8三个重要版本,分别代表了Java发展历程中的...

    jdk11/jdk8开发工具包

    总的来说,JDK 8和JDK 11都是Java开发者不可或缺的工具,它们各自的特点和改进适应了不同场景的需求。无论你是初学者还是经验丰富的开发者,理解这两个版本的区别和特性,都将有助于你更好地利用Java进行软件开发。

    Java开发工具包

    Java开发工具包(Java Development Kit,简称JDK)是Java编程语言的核心组件,它为开发者提供了编译、调试和运行Java应用程序所需的所有工具。JDK1.8.0_66是Oracle公司发布的一个特定版本,它包含了Java运行时环境...

    JDK1.8 64 ,JDK是 Java 语言的软件开发工具包

    在标题中提到的"JDK1.8 64"指的是适用于64位操作系统的Java 8开发工具包。Java 8是Java历史上一个重要的版本,它引入了许多新特性,优化了性能,并对语言进行了现代化改造。 1. **JDK的主要组件:** JDK包含Java...

    JDK工具包(Mac)

    本篇文章将详细解析“JDK工具包(Mac)”,包括JDK 11的运行环境和API文档的安装与使用。 首先,让我们了解JDK 11。JDK 11是Java SE(标准版)平台的第15个长期支持版本,发布于2018年9月。它引入了许多新特性,如...

    JDK12解压版工具包

    **JDK12解压版工具包详解** Java Development Kit(JDK)是Java编程语言的核心组件,它包含了开发和运行Java程序所需的所有工具。JDK12是Oracle公司发布的一个重要版本,提供了许多新特性、增强功能以及性能优化,...

    JDK9,JDK历史版本,32位,for windows Java 开发工具包(SDK)

    JDK(Java Development Kit) 是 Java 语言的软件开发工具包(SDK)。  SE(J2SE),standard edition,标准版,是我们通常用的一个版本,从JDK 5.0开始,改名为Java SE。  EE(J2EE),enterprise edition,企业版,...

    windows快速切换jdk版本工具

    在这个场景下,它可能详细解释了如何配置和使用这个工具,包括设置JDK路径、执行切换命令以及可能出现的问题及解决方案。 在实际使用中,用户首先需要将所有JDK安装目录添加到系统路径,然后根据`readme.txt`的指示...

    jdk8源码包

    JDK 8彻底重写了日期和时间API,引入了java.time包,提供更加易用且功能强大的日期、时间、时区处理工具。 5. **Optional类**: Optional类是用来表示可能为null的对象引用,有助于防止NullPointerException。它...

    Linux JDK8 tar包,解压即可用

    Linux JDK8 tar包是Java开发工具包在Linux操作系统上的一个版本,主要针对Java开发者。这个压缩包文件名为“jdk-8u74-linux-x64.tar”,表明它包含的是Java Development Kit (JDK) 8的第74次更新,且是针对64位Linux...

    java卸载工具 jdk卸载工具 官方卸载工具 干净

    Java开发工具包(Java Development Kit,简称JDK)是Java编程语言的核心组件,包含了Java运行环境、编译器、调试器以及其他工具,是开发者进行Java程序开发的基础。然而,随着时间的推移或系统升级,有时我们需要...

    jdk包含sun包

    jdk源码(包含sun包)jdk源码(包含sun包)jdk源码(包含sun包)jdk源码(包含sun包)jdk源码(包含sun包)jdk源码(包含sun包)jdk源码(包含sun包)jdk源码(包含sun包)

    jdk1.5并发新特性.

    JDK 1.5 引入了一系列重要的并发编程新特性,极大地改善了多线程环境下的程序设计。这些特性使得开发者能够编写更加高效、安全的...通过熟练掌握这些工具,开发者可以更好地应对复杂的并发场景,构建健壮的并发应用。

    jdk8-32位工具包

    JDK,全称Java Development Kit,是Oracle公司提供的用于开发和运行Java应用程序的软件开发工具包。JDK8是Java语言的一个重要版本,它的发布在Java社区中引起了广泛的关注和使用。 【描述】中提到的"刚刚上传了jdk6...

Global site tag (gtag.js) - Google Analytics