Future的局限
在使用线程池批量并行执行任务时,有时需要获取任务的返回值,此时一般可以使用Future实现异步接收,关于Future的相关使用以及实现原理在前一篇文章中分析过(如需详细了解,请点击这里)。但在批量等待获取返回结果时,有些局限,下面先看一个真实的场景。
在对一个页面进行并行渲染时,一般的做法是把页面分成多个模块,每个模块作为一个任务单独提交到线程池中进行并行渲染。在主线程中,需要接受每个模块渲染的结果,对页面进行渲染。如果直接使用Future,这时需要遍历每个模块渲染任务对应的Future,调用其get方法阻塞获取渲染结果,此时会存在多次阻塞,遍历伪代码如下:
for(Future oneFuture:futureList){ Object obj = oneFuture.get(); // oneFuture.get() 可能会存在多次阻塞 //省略使用obj渲染渲染页面代码。 }
最理想的方式是futureList是一个排序后的列表,排前面的都是已经执行完成的,这时可以达到最高的并发效果(下一次循环时,有可能刚好下一个任务执行完成,主线程又可以继续执行自己的业务逻辑)。那要如何对这个列表排序呢?庆幸的是我们不必自己排序,java提供了现成的API: CompletionService,直接使用即可。
CompletionService的使用方法
CompletionService是一个接口类,目前其唯一实现类是ExecutorCompletionService。他的主要功能就是在批量提交的任务中,优先获取已经完成的任务(可以简单的理解为对任务执行完成的先后顺序进行排序)。具体使用方式如下:
public class CompletionServiceTest { public static void main(String[] args) throws Exception{ //创建线程池 Executor executor = Executors.newCachedThreadPool(); Callable<String> temp = null; //创建任务列表 Collection<Callable<String>> tasks = new ArrayList<>(); for (int i=0;i<10;i++){ temp = new PrintTask(i+""); tasks.add(temp); } solve(executor,tasks); } public static void solve(Executor e, Collection<Callable<String>> solvers) throws InterruptedException { //构建CompletionService CompletionService<String> ecs = new ExecutorCompletionService<String>(e); int n = solvers.size(); List<Future<String>> futures = new ArrayList<Future<String>>(n); try { for (Callable<String> s : solvers) futures.add(ecs.submit(s));//提交多个任务 //获取任务执行结果 for (int i = 0; i < n; ++i) { try { Future<String> future = ecs.take();//阻塞优先获取已经完成的任务Future String r = future.get();//这步不会阻塞了 System.out.println(r); } catch (ExecutionException ignore) {} } } finally { for (Future<String> f : futures) //判断是否需要取消任务 f.cancel(true); } } } class PrintTask implements Callable<String>{ private String taskname; public PrintTask(String taskname) { this.taskname = taskname; } @Override public String call() throws Exception { System.out.println("任务"+taskname+":执行中"); Thread.sleep(Integer.parseInt(taskname)*1000); return "任务"+taskname+":执行完成;"; } }
本示例中通过CompletionService的take方法,每次获取到的都是最先完成任务对应的Future,由于任务已经完成,后面调用Future的get方法,就不会产生阻塞。这是如何实现的呢?下面开始对ExecutorCompletionService的实现原理进行分析。
ExecutorCompletionService的实现原理
ExecutorCompletionService在其内部维护了一个阻塞队列BlockingQueue,每当有任务执行完成后,都会放入这个队列。ExecutorCompletionService的take方法本质上是调用的BlockingQueue的take方法,如果队列中没有完成的任务,就阻塞;如果队列中有多个完成的任务,由于BlockingQueue(默认是LinkedBlockingQueue)是FIFO队列,每次take取出的都是优先完成的任务。这就是对ExecutorCompletionService实现原理简述,下面来来具体的实现。
构造方法
ExecutorCompletionService的构造方法有两个,一个是使用默认的LinkedBlockingQueue作为完成任务的存放队列,另一是使用传入的BlockingQueue 参数作为完成任务的存放队列:
//默认使用LinkedBlockingQueue作为存放队列 public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); } //使用自定义的BlockingQueue作为存放队列 public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) { if (executor == null || completionQueue == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = completionQueue; }
提交任务submit方法
ExecutorCompletionService中有两个版本的submit方法,一个是Callable类型的参数;另一个是Runnable类型的参数。两个方法的实现基本相同,只是由于Runnable的run方法没有返回值,本质上差异就是需要把Runnable对象使用适配器模式封装成Callable对象(这里的适配器为RunnableAdapter,关于更多适配器模式的理解可以点击这里)。这里只对参数为Callable类型的submit方法进行讲解:
public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); //把Callable对象封装成FutureTask对象 RunnableFuture<V> f = newTaskFor(task); //把FutureTask对象封装成QueueingFuture对象,并提交任务到线程池 executor.execute(new QueueingFuture(f)); return f; }
可以看到submit方法,本质上上只是先把Callable对象封装成FutureTask对象,在封装成QueueingFuture对象,然后提交到线程池中。相比于线程池的submit方法,该方法只多了一步:把把FutureTask对象封装成QueueingFuture对象。再来看下内部类QueueingFuture的实现,非常简单:
private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); } private final Future<V> task; }
QueueingFuture基础自FutureTask,重新了父类的done()方法(该方法在FutureTask中是空的)。done()方法的重新也很简单,只是往阻塞队列中添加该任务。回想下这个done()方式是在什么时候调用的?FutureTask的run方法在任务执行完成后会调用finishCompletion方法,finishCompletion方法末尾调用了done()方法。换句话说,让任务执行完成时会把自身放入ExecutorCompletionService维护的“已完成阻塞队列”:存在于这个队列中的任务都是已经完成的。
take方法
掉用ExecutorCompletionService的take方法,本质上调用的是“已完成阻塞队列”的take方法:
public Future<V> take() throws InterruptedException { return completionQueue.take(); }
另外ExecutorCompletionService还提供了非阻塞的poll方法,以及延时阻塞的poll方法,本质上也是直接调用阻塞队列的对应poll方法:
public Future<V> poll() { return completionQueue.poll(); } public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException { return completionQueue.poll(timeout, unit); }
这两个方法,可以放到一个while循环中,当没有任务执行完成时,主线程可以做一些其他事情(或者sleep一会儿),防止线程阻塞,过一会儿再继续poll 以使并行执行效果最大化。
总结
CompletionService的主要作用就是优先返回线程池中已经执行完成的任务,尽量减少主线程的阻塞时间,是并行最大化。本质上通过维护一个“已完成的阻塞队列”实现。
相关推荐
Java线程池与Lambda表达式是Java开发中的两个重要概念,它们在提高程序效率和代码简洁性方面起着关键作用。 首先,让我们深入理解Java线程。在多核处理器时代,利用多线程能有效利用系统资源,提高程序并发执行的...
Java线程池是Java并发编程中的重要组成部分,它在处理高并发场景时起着至关重要的作用。本篇学习笔记将深入解析Java线程池的框架、结构、原理以及相关源码,帮助读者全面理解线程池的工作机制。 1. 线程池模块结构 ...
ExecutorService 和 CompletionService 是Java并发处理中的两种工具,它们用于管理和执行多个任务。ExecutorService 是一个接口,它是java.util.concurrent.Executor 接口的扩展,提供了一组方法来管理和控制线程池...
【正文】 线程池是Java并发编程中一个重要的...总之,理解并熟练运用线程池是Java程序员必须掌握的技能之一,它可以帮助我们构建高效、稳定的并发程序,处理大量的并发任务,同时确保系统的稳定性和资源的有效利用。
线程池是Java多线程编程中一个非常重要的概念,它是一种有效的管理线程资源、提高系统性能的方式。线程池允许我们预先创建一定数量的线程,这些线程可以在需要时立即执行任务,而无需每次任务来临时都创建新的线程,...
描述中提到了“Java并发编程工具包java.util.concurrent的UML类结构图 PDF”,这强调了文件是一个图表,它可能包括并发包中的线程安全集合、同步器、线程池、执行器等核心组件的类图和接口图。 标签“Java ...
6.3.5 CompletionService:Executor与BlockingQueue 6.3.6 示例:使用CompletionService实现页面渲染器 6.3.7 为任务设置时限 6.3.8 示例:旅行预定门户网站 第7章 取消与关闭 第8章 线程池的使用 第9章 图形...
在Java中,通常通过Future和Callable接口实现异步计算,或者使用ExecutorService和CompletionService来管理和控制异步任务。 三、ExecutorService与ThreadPoolExecutor ExecutorService是Java并发框架中的核心接口...
线程池ThreadPoolExecutor 阻塞队列BlockingQueue,生产者消费者模式 Selector Channel ByteBuffer ProtoStuff 高性能序列化 HttpClient连接池 Spring依赖注入 lombok简化POJO开发 原子变量 内置锁 ...
除了多线程,还包括线程池的使用(ExecutorService,ThreadPoolExecutor),并发工具类(如Future, Callable, CompletionService),以及并发设计模式的应用。 7. **Spring框架**: 了解依赖注入、AOP(面向切面...
总的来说,"java并发(二十四)多线程结果组装"这个主题涵盖了许多高级Java并发技术,如线程池、异步计算、结果合并以及异常处理。理解和掌握这些技术对于编写高效率、健壮的并发代码至关重要。通过实际项目中的实践...
看完《think in java》多线程章节,自己写的多线程文档,还结合了其他的相关网络资料。 线程 一. 线程池 1)为什么要使用线程池 2 2)一个具有线程池的工作队列 3 3)使用线程池的风险: 4 4)有效使用线程池的原则 5...
6.3.5 CompletionService:Executor与BlockingQueue 6.3.6 示例:使用CompletionService实现页面渲染器 6.3.7 为任务设置时限 6.3.8 示例:旅行预定门户网站 第7章 取消与关闭 第8章 线程池的使用 第9章 图形...
描述中提到的"当监测到线程池中存在空闲线程时则动态向线程池中添加新的任务",这是对Java并发工具类`ExecutorService`的一个应用。`ExecutorService`是Java并发框架`java.util.concurrent`包下的核心接口,它允许...
总的来说,Java并发编程实践中的任务执行是一个涉及线程调度、线程池管理、任务生命周期控制、结果处理等多个方面的复杂主题。理解和掌握这些概念和技术,能够帮助开发者编写出更加高效、可靠的并发程序。
可以使用`Future`接口来获取每个子任务的结果,或者使用`CompletionService`来获取已完成的任务。 3. **线程间通信** 如果需要线程间共享数据或进行同步,Java提供了多种机制,如`synchronized`关键字、`wait()`, ...
Java中的Executor接口是Java并发编程的核心组件之一,它位于java.util.concurrent包中,为执行异步任务提供了一种统一的框架。Executor接口定义了一个单一的方法`execute(Runnable command)`,用于提交一个Runnable...
基于java开发的NIO+多线程实现聊天室+源码+项目解析,适合毕业设计、课程设计、项目开发。项目源码已经过严格测试,可以放心参考并在此基础上延申使用~ 项目简介: 涉及到的技术点 线程池ThreadPoolExecutor 阻塞...
### JAVA高质量并发详解知识点概述 #### 一、Java并发编程基础 - **基础知识:** - **线程基本概念:** Java线程是程序执行流的最小单元,一个线程包含一个程序计数器(PC)、虚拟机栈、本地方法栈、线程私有的工作...