我们现在来看看CompletionService :
package ExecutorDemos;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ExecutorBasicDemo {
public static void main(String[] args) throws InterruptedException,
ExecutionException {
//创建ExecutorCompletionService实例
ExecutorService service=Executors.newFixedThreadPool(20);
CompletionService<Object> serv = new ExecutorCompletionService<Object>(service);
//创建50个工作
for (int index = 0; index < 50; index++) {
final int Position = index;
Callable<Object> dosomethings = new Callable<Object>() {
public Object call() throws Exception {
long value=(long) (Math.random() * 10000);
System.out.println("Current sleep time is:"+value+", Position is "+Position);
Thread.sleep(value);
return value;
}
};
serv.submit(dosomethings);
}
Thread.sleep(1000);
for (int index = 0; index < 50; index++) {
//按顺序取出来
Future<Object> task = serv.take();
Object value = task.get();
System.out.println(value);
}
service.shutdown();
}
}
执行的结果是:
你发现结果是按顺序取出结果的,而且可以看出它与Future有关系。在上一篇中,我们看过一个例子:
package ExecutorDemos;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class FutureTaskTest {
private static AtomicInteger Count = new AtomicInteger(0);
@SuppressWarnings("unchecked")
public static void main(String args[]) {
ExecutorService es = Executors.newFixedThreadPool(10);
List<Future<Integer>> tasks = new ArrayList<Future<Integer>>();
for (int i = 0; i <= 10; i++) {
final int iCount=i;
FutureTask<Integer> futureTask = new FutureTask<Integer>(
new Callable() {
@SuppressWarnings("static-access")
public Integer call() throws Exception {
Thread.currentThread().sleep(
(new Random()).nextInt(1000));
System.out.println("Postion is "+iCount);
return Count.getAndIncrement();
}
});
tasks.add(futureTask);
es.submit(futureTask);
}
Integer result = new Integer(0);
try {
for (Future<Integer> task : tasks) {
result += (Integer) task.get();
}
es.shutdown();
System.out.println(result);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
其执行结果是:
从结果看,取数据的时候并不能保证结果数据按照其原始顺序提取!我们对比一下上面这两段代码,发现貌似只是按顺序提取数据的那个代码多了个ExecutorCompletionService,其他部分基本相同,这样我们可以解释ExecutorCompletionService它的作用了,那就是
异步的执行任务,并按原始任务的顺序取得结果! 从ExecutorCompletionService源代码来看:
其构造函数中
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
实例化了一个堵塞队列,那么用这个干什么用哪?接下来看看submit的实现吧:
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}
public Future<V> submit(Runnable task, V result) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task, result);
executor.execute(new QueueingFuture(f));
return f;
}
呵呵,上面的submit实现解释Runnable和Callable是如何转化成Future的,另外还使用了Executor.execute(new QueueingFuture(f))! 而QueueingFuture是ExecutorCompletionService的一个私有内部类,我们看看它的
实现:
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
//completionQueue是使用的外部类的属性,即ExecutorCompletionService中的属性
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
Executor.execute方法会自动调用done方法并将Future加入到堵塞队列中,知道了吧?就是在这里把外部的Runnable或者Callable转化过来的Future和堵塞队列建立的联系(加入队列)completionQueue的类型是LinkedBlockingQueue, LinkedBlockingQueue表明它是一个Queue,这就意味着它的元素是按先进先出(FIFO)的次序进行存储的。以特定次序插入的元素会以相同的次序被取出--但根据插入保证,任何从空队列中取出元素的尝试都会堵塞调用线程直到该元素可被取出时为止。同样地,任何向一个已满队列中插入元素的尝试将会堵塞调用线程直到该队列的存储空间有空余时为止。
对于take()方法的实现:
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
可以明确看到take()时,是从堵塞队列中take(),这可以解释为什么ExecutorCompletionService可以保证结果安装原始顺序输出的了!对于堵塞队列,我将在后面的文章中讨论到.我们在了解其基本原理的基础上,才能很好的使用它,不是吗?
综上,如果您的应用对结果数据的顺序没有要求,你就可以使用FutureTask和ExecutorService! 如果有要求,最好使用CompletionService!
分享到:
相关推荐
Vue 3.0+Vite 2.0+Vue-Router4.0+Element-Plus+Echarts 5.0+Axios 开发的后台管理系统 Vue 3.0+Vite 2.0+Vue-Router4.0+Element-Plus+Echarts 5.0+Axios 开发的后台管理系统 Vue 3.0+Vite 2.0+Vue-Router4.0+...
安装完成后,可以通过该工具连接并管理本地或远程的Redis服务。Redis服务器的启动和配置通常涉及到修改配置文件redis.windows.conf,根据实际需求设置端口、密码保护、持久化策略等参数。 总之,Redis 5.0.14和...
本篇文章将深入探讨"sonar-scanner-cli-5.0.1.3006-linux"的特性、安装与使用方法,以及在Linux环境下如何高效地集成到开发流程中。 首先,SonarScanner 5.0.1.3006是该客户端的一个特定版本,它带来了诸多增强功能...
dotnet-sdk-5.0.408-win-x64
本文将详细介绍dotnet-sdk-5.0.400-linux-x64及其包含的组件,探讨其在跨平台开发中的重要性。 首先,"dotnet-sdk-5.0.400-linux-x64.tar.gz"是一个针对Linux x64架构的.NET SDK压缩包。这个版本5.0.400代表了.NET ...
dotnet-sdk-5.0.202-win-x64.exe
SSH整合是指将Spring、Hibernate和Struts这三大Java开源框架集成在一起,用于构建高效、灵活的企业级Web应用程序。这三个框架分别负责不同的职责:Spring作为应用的基石,提供依赖注入(DI)和面向切面编程(AOP),...
在协议层,PCIe 5.0继续采用TLP(事务层包)和DLLP(数据链接层包)的设计,但对这些包的处理进行了优化,以减少延迟并提高带宽利用率。同时,它还增强了对流控制、错误处理和多路复用的支持。 总的来说,"NCB-PCI_...
这个是安装说明文件,里面要用的安装文件,除了MYSQL 5。0太大,不能上传,其他的都上传了!!安装文件免费共享了,想快速安装成功的兄弟,请下载这个说明文件,我要收3分哦!我也想去下别人的好东西!...
Centos7 el7.x86_64 官方离线 RPM 安装包,安装指令为 sudo rpm -ivh llvm5.0-devel-5.0.1-7.el8.x86_64.rpm
ambari-2.7.5 编译过程中四个大包下载很慢,所以需要提前下载,包含:hbase-2.0.2.3.1.4.0-315-bin.tar.gz ,hadoop-3.1.1.3.1.4.0-315.tar.gz , grafana-6.4.2.linux-amd64.tar.gz ,phoenix-5.0.0.3.1.4.0-315....
winrar 5.0 + key
根据数据库服务器的实际版本,选择5.0+或8.0+的jar包,并正确引入到项目中,可以确保Java应用能够顺利与MySQL进行数据交互。同时,理解JDBC的基本使用方法和最佳实践,有助于提高代码的稳定性和效率。
CentOS-5.0-i386-bin-DVD.part06.rar
【标题】"thinkphp5.0+phpmailer-demo"是一个基于ThinkPHP5.0框架结合PHPMailer库的示例项目,旨在演示如何在ThinkPHP...同时,这也是一个很好的学习实例,可以帮助初学者快速上手并解决实际工作中遇到的邮件发送问题。
Understand-5.0.930-Windows-64bit.exe windows Understand-5.0.930-Windows-64bit.exe
【标题】"达内JAVA TTS5.0 PDF"涵盖了Oracle编程的相关知识,这通常意味着这份资料是关于Java编程与Oracle数据库的结合使用。Oracle编程主要指的是如何利用Java语言进行Oracle数据库的交互,包括数据的增删查改、...
Gradle是一个好用的构建工具 使用它的原因是 1、配置相关依赖代码量少,不会像maven一样xml过多 2、打包编译测试发布都有,而且使用起来方便 3、利用自定义的任务可以完成自己想要的功能
在本文中,我们将深入探讨Gradle 5.0 Milestone 1的主要特性和改进,以及它如何影响开发者的工作流程。 1. **性能提升** - **更快的初始化**:Gradle 5.0引入了更快的项目初始化速度,通过缓存项目元数据来减少...
llvm5.0-libs-5.0.1-7.el7.x86-64.rpm