`

java线程池之CompletionService

阅读更多

 

Future的局限

 

在使用线程池批量并行执行任务时,有时需要获取任务的返回值,此时一般可以使用Future实现异步接收,关于Future的相关使用以及实现原理在前一篇文章中分析过(如需详细了解,请点击这里)。但在批量等待获取返回结果时,有些局限,下面先看一个真实的场景。

 

在对一个页面进行并行渲染时,一般的做法是把页面分成多个模块,每个模块作为一个任务单独提交到线程池中进行并行渲染。在主线程中,需要接受每个模块渲染的结果,对页面进行渲染。如果直接使用Future,这时需要遍历每个模块渲染任务对应的Future,调用其get方法阻塞获取渲染结果,此时会存在多次阻塞,遍历伪代码如下:

 

for(Future oneFuture:futureList){
   Object obj = oneFuture.get(); // oneFuture.get() 可能会存在多次阻塞
   //省略使用obj渲染渲染页面代码。
}
 

 

 

最理想的方式是futureList是一个排序后的列表,排前面的都是已经执行完成的,这时可以达到最高的并发效果(下一次循环时,有可能刚好下一个任务执行完成,主线程又可以继续执行自己的业务逻辑)。那要如何对这个列表排序呢?庆幸的是我们不必自己排序,java提供了现成的API: CompletionService,直接使用即可。

 

CompletionService的使用方法

 

CompletionService是一个接口类,目前其唯一实现类是ExecutorCompletionService。他的主要功能就是在批量提交的任务中,优先获取已经完成的任务(可以简单的理解为对任务执行完成的先后顺序进行排序)。具体使用方式如下:

 

public class CompletionServiceTest {
    public static void main(String[] args) throws Exception{
        //创建线程池
        Executor executor = Executors.newCachedThreadPool();
        Callable<String> temp = null;
        //创建任务列表
        Collection<Callable<String>> tasks = new ArrayList<>();
        for (int i=0;i<10;i++){
            temp = new PrintTask(i+"");
            tasks.add(temp);
        }
        solve(executor,tasks);
    }
 
    public static void solve(Executor e, Collection<Callable<String>> solvers)
            throws InterruptedException {
        //构建CompletionService
        CompletionService<String> ecs = new ExecutorCompletionService<String>(e);
        int n = solvers.size();
        List<Future<String>> futures = new ArrayList<Future<String>>(n);
        try {
            for (Callable<String> s : solvers)
                futures.add(ecs.submit(s));//提交多个任务
 
            //获取任务执行结果
            for (int i = 0; i < n; ++i) {
                try {
                    Future<String> future = ecs.take();//阻塞优先获取已经完成的任务Future
                    String r = future.get();//这步不会阻塞了
                    System.out.println(r);
                } catch (ExecutionException ignore) {}
            }
        }
        finally {
            for (Future<String> f : futures)
                //判断是否需要取消任务
                f.cancel(true);
        }
    }
}
 
class PrintTask implements Callable<String>{
 
    private String taskname;
 
    public PrintTask(String taskname) {
        this.taskname = taskname;
    }
 
    @Override
    public String call() throws Exception {
        System.out.println("任务"+taskname+":执行中");
        Thread.sleep(Integer.parseInt(taskname)*1000);
        return "任务"+taskname+":执行完成;";
    }
}
 

 

 

本示例中通过CompletionServicetake方法,每次获取到的都是最先完成任务对应的Future,由于任务已经完成,后面调用Futureget方法,就不会产生阻塞。这是如何实现的呢?下面开始对ExecutorCompletionService的实现原理进行分析。

 

ExecutorCompletionService的实现原理

 

ExecutorCompletionService在其内部维护了一个阻塞队列BlockingQueue,每当有任务执行完成后,都会放入这个队列。ExecutorCompletionServicetake方法本质上是调用的BlockingQueuetake方法,如果队列中没有完成的任务,就阻塞;如果队列中有多个完成的任务,由于BlockingQueue(默认是LinkedBlockingQueue)是FIFO队列,每次take取出的都是优先完成的任务。这就是对ExecutorCompletionService实现原理简述,下面来来具体的实现。

 

构造方法

ExecutorCompletionService的构造方法有两个,一个是使用默认的LinkedBlockingQueue作为完成任务的存放队列,另一是使用传入的BlockingQueue 参数作为完成任务的存放队列:

 
//默认使用LinkedBlockingQueue作为存放队列
public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }
 
//使用自定义的BlockingQueue作为存放队列
public ExecutorCompletionService(Executor executor,
                                     BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
}
 

 

提交任务submit方法

ExecutorCompletionService中有两个版本的submit方法,一个是Callable类型的参数;另一个是Runnable类型的参数。两个方法的实现基本相同,只是由于Runnablerun方法没有返回值,本质上差异就是需要把Runnable对象使用适配器模式封装成Callable对象(这里的适配器为RunnableAdapter,关于更多适配器模式的理解可以点击这里)。这里只对参数为Callable类型的submit方法进行讲解:

public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        //把Callable对象封装成FutureTask对象
        RunnableFuture<V> f = newTaskFor(task);
        //把FutureTask对象封装成QueueingFuture对象,并提交任务到线程池
        executor.execute(new QueueingFuture(f));
        return f;
}
 

 

可以看到submit方法,本质上上只是先把Callable对象封装成FutureTask对象,在封装成QueueingFuture对象,然后提交到线程池中。相比于线程池的submit方法,该方法只多了一步:把把FutureTask对象封装成QueueingFuture对象。再来看下内部类QueueingFuture的实现,非常简单:

private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

 

QueueingFuture基础自FutureTask,重新了父类的done()方法(该方法在FutureTask中是空的)。done()方法的重新也很简单,只是往阻塞队列中添加该任务。回想下这个done()方式是在什么时候调用的?FutureTaskrun方法在任务执行完成后会调用finishCompletion方法,finishCompletion方法末尾调用了done()方法。换句话说,让任务执行完成时会把自身放入ExecutorCompletionService维护的已完成阻塞队列:存在于这个队列中的任务都是已经完成的。

 

take方法

掉用ExecutorCompletionServicetake方法,本质上调用的是已完成阻塞队列take方法:

public Future<V> take() throws InterruptedException {
        return completionQueue.take();
}
 

 

另外ExecutorCompletionService还提供了非阻塞的poll方法,以及延时阻塞的poll方法,本质上也是直接调用阻塞队列的对应poll方法:

public Future<V> poll() {
        return completionQueue.poll();
    }
 
    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
}

 

这两个方法,可以放到一个while循环中,当没有任务执行完成时,主线程可以做一些其他事情(或者sleep一会儿),防止线程阻塞,过一会儿再继续poll 以使并行执行效果最大化。

 

总结

 

CompletionService的主要作用就是优先返回线程池中已经执行完成的任务,尽量减少主线程的阻塞时间,是并行最大化。本质上通过维护一个已完成的阻塞队列实现。

 

 

 

0
0
分享到:
评论

相关推荐

    18.【线程池、Lambda表达式】_java线程_lambda线程池_meantbs3_

    Java线程池与Lambda表达式是Java开发中的两个重要概念,它们在提高程序效率和代码简洁性方面起着关键作用。 首先,让我们深入理解Java线程。在多核处理器时代,利用多线程能有效利用系统资源,提高程序并发执行的...

    JAVA课程学习笔记.doc

    Java线程池是Java并发编程中的重要组成部分,它在处理高并发场景时起着至关重要的作用。本篇学习笔记将深入解析Java线程池的框架、结构、原理以及相关源码,帮助读者全面理解线程池的工作机制。 1. 线程池模块结构 ...

    ExecutorService与CompletionService对比详解.docx

    ExecutorService 和 CompletionService 是Java并发处理中的两种工具,它们用于管理和执行多个任务。ExecutorService 是一个接口,它是java.util.concurrent.Executor 接口的扩展,提供了一组方法来管理和控制线程池...

    笔记-6、线程池1

    【正文】 线程池是Java并发编程中一个重要的...总之,理解并熟练运用线程池是Java程序员必须掌握的技能之一,它可以帮助我们构建高效、稳定的并发程序,处理大量的并发任务,同时确保系统的稳定性和资源的有效利用。

    线程池介绍

    线程池是Java多线程编程中一个非常重要的概念,它是一种有效的管理线程资源、提高系统性能的方式。线程池允许我们预先创建一定数量的线程,这些线程可以在需要时立即执行任务,而无需每次任务来临时都创建新的线程,...

    java.util.concurrent.uml.pdf

    描述中提到了“Java并发编程工具包java.util.concurrent的UML类结构图 PDF”,这强调了文件是一个图表,它可能包括并发包中的线程安全集合、同步器、线程池、执行器等核心组件的类图和接口图。 标签“Java ...

    Java并发编程实战

    6.3.5 CompletionService:Executor与BlockingQueue 6.3.6 示例:使用CompletionService实现页面渲染器 6.3.7 为任务设置时限 6.3.8 示例:旅行预定门户网站 第7章 取消与关闭 第8章 线程池的使用 第9章 图形...

    java多线程异步性

    在Java中,通常通过Future和Callable接口实现异步计算,或者使用ExecutorService和CompletionService来管理和控制异步任务。 三、ExecutorService与ThreadPoolExecutor ExecutorService是Java并发框架中的核心接口...

    java多线程程序设计:Java NIO+多线程实现聊天室

    线程池ThreadPoolExecutor 阻塞队列BlockingQueue,生产者消费者模式 Selector Channel ByteBuffer ProtoStuff 高性能序列化 HttpClient连接池 Spring依赖注入 lombok简化POJO开发 原子变量 内置锁 ...

    JAVA核心知识点整理、面试必备

    除了多线程,还包括线程池的使用(ExecutorService,ThreadPoolExecutor),并发工具类(如Future, Callable, CompletionService),以及并发设计模式的应用。 7. **Spring框架**: 了解依赖注入、AOP(面向切面...

    java并发(二十四)多线程结果组装

    总的来说,"java并发(二十四)多线程结果组装"这个主题涵盖了许多高级Java并发技术,如线程池、异步计算、结果合并以及异常处理。理解和掌握这些技术对于编写高效率、健壮的并发代码至关重要。通过实际项目中的实践...

    个人总结的深入java多线程开发

    看完《think in java》多线程章节,自己写的多线程文档,还结合了其他的相关网络资料。 线程 一. 线程池 1)为什么要使用线程池 2 2)一个具有线程池的工作队列 3 3)使用线程池的风险: 4 4)有效使用线程池的原则 5...

    Java 并发编程实战

    6.3.5 CompletionService:Executor与BlockingQueue 6.3.6 示例:使用CompletionService实现页面渲染器 6.3.7 为任务设置时限 6.3.8 示例:旅行预定门户网站 第7章 取消与关闭 第8章 线程池的使用 第9章 图形...

    通过多线程任务处理大批量耗时业务并返回结果

    描述中提到的"当监测到线程池中存在空闲线程时则动态向线程池中添加新的任务",这是对Java并发工具类`ExecutorService`的一个应用。`ExecutorService`是Java并发框架`java.util.concurrent`包下的核心接口,它允许...

    JAVA并发编程实践-线程执行-学习笔记

    总的来说,Java并发编程实践中的任务执行是一个涉及线程调度、线程池管理、任务生命周期控制、结果处理等多个方面的复杂主题。理解和掌握这些概念和技术,能够帮助开发者编写出更加高效、可靠的并发程序。

    java项目史上最简单的多线程使用方法(demo)

    可以使用`Future`接口来获取每个子任务的结果,或者使用`CompletionService`来获取已完成的任务。 3. **线程间通信** 如果需要线程间共享数据或进行同步,Java提供了多种机制,如`synchronized`关键字、`wait()`, ...

    Java中Executor接口用法总结

    Java中的Executor接口是Java并发编程的核心组件之一,它位于java.util.concurrent包中,为执行异步任务提供了一种统一的框架。Executor接口定义了一个单一的方法`execute(Runnable command)`,用于提交一个Runnable...

    基于java开发的NIO+多线程实现聊天室+源码+项目解析(毕业设计&课程设计&项目开发)

    基于java开发的NIO+多线程实现聊天室+源码+项目解析,适合毕业设计、课程设计、项目开发。项目源码已经过严格测试,可以放心参考并在此基础上延申使用~ 项目简介: 涉及到的技术点 线程池ThreadPoolExecutor 阻塞...

    JAVA高质量并发详解,多线程并发深入讲解

    ### JAVA高质量并发详解知识点概述 #### 一、Java并发编程基础 - **基础知识:** - **线程基本概念:** Java线程是程序执行流的最小单元,一个线程包含一个程序计数器(PC)、虚拟机栈、本地方法栈、线程私有的工作...

Global site tag (gtag.js) - Google Analytics