`
langgufu
  • 浏览: 2306967 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

使用CompletionService批处理任务(线程池阻塞线程)

阅读更多

如果你向Executor提交了一个批处理任务,并且希望在它们完成后获得结果。为此你可以保存与每个任务相关联的Future,然后不断地调用timeout为零的get,来检验Future是否完成。这样做固然可以,但却相当乏味。幸运的是,还有一个更好的方法:完成服务(Completion service)。

CompletionService整合了Executor和BlockingQueue的功能。你可以将Callable任务提交给它去执行,然后使用类似于队列中的take和poll方法,在结果完整可用时获得这个结果,像一个打包的Future。ExecutorCompletionService是实现CompletionService接口的一个类,并将计算任务委托给一个Executor。

ExecutorCompletionService的实现相当直观。它在构造函数中创建一个BlockingQueue,用它去保持完成的结果。计算完成时会调用FutureTask中的done方法。当提交一个任务后,首先把这个任务包装为一个QueueingFuture,它是FutureTask的一个子类,然后覆写done方法,将结果置入BlockingQueue中,take和poll方法委托给了BlockingQueue,它会在结果不可用时阻塞。

 

[java] view plaincopy
  1. import java.util.Random;  
  2. import java.util.concurrent.BlockingQueue;  
  3. import java.util.concurrent.Callable;  
  4. import java.util.concurrent.CompletionService;  
  5. import java.util.concurrent.ExecutionException;  
  6. import java.util.concurrent.ExecutorCompletionService;  
  7. import java.util.concurrent.ExecutorService;  
  8. import java.util.concurrent.Executors;  
  9. import java.util.concurrent.Future;  
  10. import java.util.concurrent.LinkedBlockingQueue;  
  11.   
  12. public class Test17 {  
  13.     public static void main(String[] args) throws Exception {  
  14.         Test17 t = new Test17();  
  15.         t.count1();  
  16.         t.count2();  
  17.     }  
  18. //使用阻塞容器保存每次Executor处理的结果,在后面进行统一处理  
  19.     public void count1() throws Exception{  
  20.         ExecutorService exec = Executors.newCachedThreadPool();  
  21.         BlockingQueue<Future<Integer>> queue = new LinkedBlockingQueue<Future<Integer>>();  
  22.         for(int i=0; i<10; i++){  
  23.             Future<Integer> future =exec.submit(getTask());  
  24.             queue.add(future);  
  25.         }  
  26.         int sum = 0;  
  27.         int queueSize = queue.size();  
  28.         for(int i=0; i<queueSize; i++){  
  29.             sum += queue.take().get();  
  30.         }  
  31.         System.out.println("总数为:"+sum);  
  32.         exec.shutdown();  
  33.     }  
  34. //使用CompletionService(完成服务)保持Executor处理的结果  
  35.     public void count2() throws InterruptedException, ExecutionException{  
  36.         ExecutorService exec = Executors.newCachedThreadPool();  
  37.         CompletionService<Integer> execcomp = new ExecutorCompletionService<Integer>(exec);  
  38.         for(int i=0; i<10; i++){  
  39.             execcomp.submit(getTask());  
  40.         }  
  41.         int sum = 0;  
  42.         for(int i=0; i<10; i++){  
  43. //检索并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则等待。  
  44.             Future<Integer> future = execcomp.take();  
  45.             sum += future.get();  
  46.         }  
  47.         System.out.println("总数为:"+sum);  
  48.         exec.shutdown();  
  49.     }  
  50.     //得到一个任务  
  51.     public Callable<Integer> getTask(){  
  52.         final Random rand = new Random();  
  53.         Callable<Integer> task = new Callable<Integer>(){  
  54.             @Override  
  55.             public Integer call() throws Exception {  
  56.                 int i = rand.nextInt(10);  
  57.                 int j = rand.nextInt(10);  
  58.                 int sum = i*j;  
  59.                 System.out.print(sum+"\t");  
  60.                 return sum;  
  61.             }  
  62.         };  
  63.         return task;  
  64.     }  
  65.     /** 
  66.      * 执行结果: 
  67.         6   6   14  40  40  0   4   7   0   0   总数为:106 
  68.         12  6   12  54  81  18  14  35  45  35  总数为:312 
  69.      */  
  70. }  

ExecutorCompletionService统一了ExecutorService和BlockingQueue,既有线程池功能,能提交任务,又有阻塞队列功能,能判断所有线程的执行结果。

分享到:
评论

相关推荐

    C#线程池 所有线程运行完毕

    在等待线程池所有任务完成时,可能会涉及到线程同步,如使用Monitor、Mutex或Semaphore等同步原语。 5. **线程池的优缺点**:线程池的优点包括高效、资源管理优化以及自动调整线程数量。缺点是不保证任务的执行顺序...

    阻塞线程池 阻塞线程池 阻塞线程池

    阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池...

    Django异步任务线程池实现原理

    主线程在接收到耗时任务请求后,不会直接处理这个任务,而是将它提交给线程池中的其他线程去执行,主线程则立即返回一个响应给客户端。这样用户就感觉不到任务处理的延迟,提升了用户体验。 2. 线程池...

    线程池管理多线程上传

    - **线程池大小设置**:应根据系统资源和任务特性合理设置,过大可能导致资源浪费,过小可能造成阻塞。 - **拒绝策略选择**:根据业务需求选择合适的策略,防止系统崩溃或丢失数据。 - **监控与调整**:实时监控...

    C#Winform异步多线程和线程池集成的用法

    本文将深入探讨如何在Winform应用中使用异步多线程和线程池。 一、线程基础 线程是操作系统分配CPU时间的基本单元,每个进程至少包含一个线程。在C#中,可以使用`System.Threading.Thread`类来创建和管理线程。通过...

    并发-线程池和阻塞队列.pdf

    还有各种批处理任务、定时任务等,线程池都是一种非常适用的并发控制手段。 在创建线程池时需要注意一些事项。首先,要根据任务类型和特点选择合适的线程池大小。线程池的线程数过多或过少都不利于性能优化。如果...

    异步+线程池+线程+加载网络图片

    在IT行业中,异步编程、线程池和线程管理是优化应用程序性能和响应速度的关键技术,尤其是在处理网络请求如加载图片这样的IO密集型任务时。让我们深入探讨这些概念及其在实际应用中的重要性。 首先,异步编程是一种...

    多线程的使用-一个线程池的Demo

    线程池是一种线程管理机制,它预先创建了一组线程,当需要执行任务时,可以从池中获取空闲线程,而不是每次都创建新的线程,这样可以减少线程创建和销毁的开销。本文将详细讲解线程池的使用,特别是结合UI界面和多...

    JAVA使用线程池查询大批量数据

    除了`ThreadPoolExecutor`,Java还提供了`Executors`工具类,它提供了一些预设的线程池配置,如`newFixedThreadPool`(固定大小线程池)、`newSingleThreadExecutor`(单线程线程池)等,方便开发者快速创建线程池。...

    史上最强多线程面试44题和答案:线程锁+线程池+线程同步等

    - **便于建模**:多线程使得程序设计更加灵活,可以将复杂的问题分解成若干个小任务,每个任务可以由不同的线程来处理。这种方式有助于构建清晰、模块化的软件架构。 #### 3. 创建线程的几种方式 - **继承`Thread`...

    iocp编程 线程池 多线程并发

    传统的同步I/O模型在等待I/O操作完成时会阻塞一个线程,而IOCP则是非阻塞的,它可以将I/O完成的通知发送到一个或多个线程池线程,让这些线程处理完成后的数据,极大地提高了并发处理能力。 - **创建IOCP**: 使用`...

    C#判断线程池中所有的线程是否已经完成

    在这里,我们使用一个`AutoResetEvent`实例作为等待对象,其默认是未设置状态,即线程会立即被阻塞,直到调用`Set`方法。 ```csharp AutoResetEvent autoResetEvent = new AutoResetEvent(false); ...

    C++线程池 多线程 SOCEKT服务器框架

    当有新的任务到来时,线程池会从待用线程中挑选一个执行任务,而不是每次都新建线程。这种方式提高了系统的稳定性和效率。 接下来,我们探讨一下SOCKET,它是网络编程中的一个重要概念。SOCKET是操作系统提供的接口...

    Java中的线程与线程池.pptx

    2. newSingleThreadExecutor:创建一个单线程线程池,所有任务都在一个线程中顺序执行,保证了任务的执行顺序。 3. newFixedThreadPool:创建一个固定大小的线程池,线程数量保持不变,任务排队等待执行。 4. ...

    C# 多线程 线程池 线程同步

    线程池管理一组预创建的线程,当有任务需要执行时,它会从池中分配线程而不是每次都创建新的。这降低了线程创建和销毁的开销,提高了效率。通过`ThreadPool.QueueUserWorkItem`方法可以将工作项加入线程池,由线程池...

    Python 使用threading+Queue实现线程池示例

    在线程池缓存线程可用已有的闲置线程来执行新任务,避免了创建/销毁带来的系统开销。 1.2 线程并发数量过多,抢占系统资源从而导致阻塞。 线程能共享系统资源,如果同时执行的线程过多,就有可能导致系统资源不足而...

    Android中的线程池与任务队列

    `AsyncTask`内部就使用了线程池来管理后台任务,它的执行流程包括了`onPreExecute()`(在主线程运行)、`doInBackground()`(在工作线程运行)和`onPostExecute()`(回到主线程)等步骤。 为了优化线程池的性能,...

    多线程编程线程池

    由于 ASP.NET 使用相同的线程池来处理 HTTP 请求,如果线程池中的所有工作线程都被长时间运行的任务占用,那么将无法处理新的 HTTP 请求,从而导致 Web 服务器响应变慢甚至无法响应。 #### 六、取消线程池中的操作 ...

    非阻塞线程池框架,管理线程,管理连接

    非阻塞线程池框架是一种高效的任务执行机制,它的核心理念是通过避免线程之间的等待,从而提升系统整体的并发性能。在传统的阻塞线程池中,如果一个任务正在执行,其他线程必须等待其完成才能获取CPU资源,这在高...

    linux C++ 实现线程池(避免线程创建的耗时)

    6. **线程退出超时**:线程池可能需要设置一个线程退出的超时时间,防止某些任务阻塞导致线程无法释放。当线程达到预设的超时时,线程池可以强制结束该线程,或者将它从池中移除。 7. **代码实现**:`ThreadPool`很...

Global site tag (gtag.js) - Google Analytics