`
bugyun
  • 浏览: 559135 次
社区版块
存档分类
最新评论

CompletionService、Executor、BlockingQueue 比较例子

 
阅读更多

 

转:http://www.cnblogs.com/nayitian/p/3273468.html

 

在Java5的多线程中,可以使用Callable接口来实现具有返回值的线程。使用线程池的submit方法提交Callable任务,利用submit方法返回的Future存根,调用此存根的get方法来获取整个线程池中所有任务的运行结果。

方法一:如果是自己写代码,应该是自己维护一个Collection保存submit方法返回的Future存根,然后在主线程中遍历这个Collection并调用Future存根的get()方法取到线程的返回值。

方法二:使用CompletionService类,它整合了Executor和BlockingQueue的功能。你可以将Callable任务提交给它去执行,然后使用类似于队列中的take方法获取线程的返回值。

2. 实现代码

复制代码
package com.clzhang.sample.thread;

import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class ThreadPoolTest4 {
    // 具有返回值的测试线程
    class MyThread implements Callable<String> {
        private String name;
        public MyThread(String name) {
            this.name = name;
        }

        @Override
        public String call() {
            int sleepTime = new Random().nextInt(1000);
            try {
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            // 返回给调用者的值
            String str = name + " sleep time:" + sleepTime;
            System.out.println(name + " finished...");

            return str;
        }
    }

    private final int POOL_SIZE = 5;
    private final int TOTAL_TASK = 20;

    // 方法一,自己写集合来实现获取线程池中任务的返回结果
    public void testByQueue() throws Exception {
        // 创建线程池
        ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
        BlockingQueue<Future<String>> queue = new LinkedBlockingQueue<Future<String>>();

        // 向里面扔任务
        for (int i = 0; i < TOTAL_TASK; i++) {
            Future<String> future = pool.submit(new MyThread("Thread" + i));
            queue.add(future);
        }

        // 检查线程池任务执行结果
        for (int i = 0; i < TOTAL_TASK; i++) {
            System.out.println("method1:" + queue.take().get());
        }

        // 关闭线程池
        pool.shutdown();
    }

    // 方法二,通过CompletionService来实现获取线程池中任务的返回结果
    public void testByCompetion() throws Exception {
        // 创建线程池
        ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
        CompletionService<String> cService = new ExecutorCompletionService<String>(pool);

        // 向里面扔任务
        for (int i = 0; i < TOTAL_TASK; i++) {
            cService.submit(new MyThread("Thread" + i));
        }

        // 检查线程池任务执行结果
        for (int i = 0; i < TOTAL_TASK; i++) {
            Future<String> future = cService.take();
            System.out.println("method2:" + future.get());
        }

        // 关闭线程池
        pool.shutdown();
    }

    public static void main(String[] args) throws Exception {
        ThreadPoolTest4 t = new ThreadPoolTest4();
        t.testByQueue();
        t.testByCompetion();
    }
}
复制代码

部分输出:

...

Thread4 finished...
method1:Thread4 sleep time:833
method1:Thread5 sleep time:158
Thread6 finished...
method1:Thread6 sleep time:826
method1:Thread7 sleep time:185
Thread9 finished...
Thread8 finished...
method1:Thread8 sleep time:929
method1:Thread9 sleep time:575

...

Thread11 finished...
method2:Thread11 sleep time:952
Thread18 finished...
method2:Thread18 sleep time:793
Thread19 finished...
method2:Thread19 sleep time:763
Thread16 finished...
method2:Thread16 sleep time:990

...

3. 总结

使用方法一,自己创建一个集合来保存Future存根并循环调用其返回结果的时候,主线程并不能保证首先获得的是最先完成任务的线程返回值。它只是按加入线程池的顺序返回。因为take方法是阻塞方法,后面的任务完成了,前面的任务却没有完成,主程序就那样等待在那儿,只到前面的完成了,它才知道原来后面的也完成了。

使用方法二,使用CompletionService来维护处理线程不的返回结果时,主线程总是能够拿到最先完成的任务的返回值,而不管它们加入线程池的顺序。

分享到:
评论

相关推荐

    BlockingQueue队列自定义超时时间取消线程池任务

    在这个例子中,我们将任务放入`BlockingQueue`,然后在`submitTask`方法中等待任务完成或超时。超时后,我们尝试取消任务。这提供了一种灵活的方式来管理和控制异步任务,使其具备超时取消的能力。 总的来说,通过...

    Executor框架使用详解

    `CompletionService`接口扩展了`ExecutorService`,它提供了获取已完成任务的便捷方式,特别适用于处理大量并发任务时,需要按完成顺序处理结果的场景。 `Callable`接口类似于`Runnable`,但它可以返回一个结果。`...

    Executor,Executors,ExecutorService比较.docx

    在这个例子中,`ExecutorService`被用来提交一个计算任务,`submit()`返回的`Future`对象用于获取任务的执行结果。 总结来说,`Executor`定义了任务执行的基本行为,`Executors`是创建线程池的工厂,而`...

    重新编译的Container-executor 目录文件路径/etc/container-executor.cfg

    hadoop自带的Container-executor在配置yarn-kerberos时存在问题,这边给出编译后的Container-executor,默认加载配置文件路径/etc/container-executor.cfg,大家不用再重新编译了

    spark:Executor分配详解

    ### Spark:Executor分配详解 #### 一、Spark的资源分配机制概述 在深入探讨Executor的分配之前,我们先从整体上理解一下Spark集群的工作原理。Spark集群架构主要包括以下几个核心组件:SparkMaster(简称Master)...

    ExecutorService与CompletionService对比详解.docx

    ExecutorService 是一个接口,它是java.util.concurrent.Executor 接口的扩展,提供了一组方法来管理和控制线程池,包括提交任务、关闭线程池以及获取任务执行结果。而CompletionService则是一个接口,它提供了一种...

    Android Executor线程池

    3. 工作队列(BlockingQueue&lt;Runnable&gt; workQueue):存储待执行任务的队列,限制同时执行的任务数量。 4. 空闲线程存活时间(long keepAliveTime):当线程池中的线程数量超过核心线程数时,空闲线程等待新任务的...

    重新编译好的contain-executor文件,指向/etc/hadoop/container-executor.cfg

    hadoop自带的Container-executor在配置yarn-kerberos时存在问题,以及在配置cgroup时需要把container-executor.cfg的上级目录拥有者均改为root,带来不便。 所以需要重新编译Container-executor,这边提供重新编译好...

    Executor 2.1 Pre-Release 11

    Executor 2.1 Pre-Release 11是一款专为非苹果电脑设计的Macintosh OS模拟程序,它允许用户在非Macintosh系统上运行基于Classic Mac OS的应用程序。这个软件版本是一个预发布版,意味着它是公开发布前的最后一个测试...

    Executor

    Executor是一款实用软件,专为提升Windows操作系统的使用体验而设计。这款工具的目的是优化和个性化用户的日常计算体验,使得各种任务执行更为流畅、快捷。Executor的主要功能集中在快速启动、自定义快捷方式以及对...

    Executor快速打开应用

    Executor是一款高效且便捷的应用启动工具,它允许用户通过简单的配置,快速地打开电脑中的任何应用程序,从而极大地提升了桌面管理的效率。在日常工作中,我们常常会安装很多软件,导致桌面图标繁多,查找起来十分...

    Executor-超好用的快捷启动软件

    Executor是一款高效的快捷启动软件,它的主要功能是帮助用户快速启动应用程序、文件或系统命令,极大地提高了用户的工作效率。在IT领域,这样的工具被广泛使用,尤其是对于经常需要频繁切换和执行各种任务的用户来说...

    mybatis中的sqlsession--executor实现.zip

    而`Executor`则是`SqlSession`内部的一个执行器接口,是MyBatis实现数据库操作的核心部分。本文将深入探讨`SqlSession`和`Executor`的实现机制及其在MyBatis中的作用。 `SqlSession`是MyBatis的对外接口,它提供了...

    Python库 | Flask_Executor-0.3.1-py3-none-any.whl

    在上面的例子中,`time_consuming_function` 将在后台执行,而 `long_task` 视图函数会立即返回,允许服务器处理其他请求。 需要注意的是,虽然 **Flask_Executor** 提供了异步任务的能力,但它并不意味着你的整个...

    31i C语言.rar_*fanuc* exe*_C语言_c language executor_fanuc_milkm3l

    FANUC C Language Executor

    executor汉化版

    "Executor汉化版"作为其中的佼佼者,不仅以其炫酷的界面吸引用户,还以其实用的功能满足不同用户的需求。本文将对"Executor汉化版"这一快速启动工具的功能、特点以及使用方式进行详细解读。 首先,"Executor汉化版...

    executor ------

    高效率 快捷操作

    Java基础篇:Executor框架.pdf

    该文档详细记录了Executor框架结构、使用示意图、ThreadPoolExecutor使用示例、线程池原理分析、几种常见线程池(FixedThreadPool、SingleThreadExecutor、CachedThreadPool)的详解以及线程池大小确定等内容

    xxl-job-executor-go-master

    XXL-JOB是一个分布式任务调度平台,而"xxl-job-executor-go-master"则表示这个项目是XXL-JOB的Go语言实现版本的源码仓库。这个仓库是针对XXL-JOB执行器的一个Go语言实现,它使得开发者可以利用Go语言来编写调度任务...

Global site tag (gtag.js) - Google Analytics