参考:http://m.blog.csdn.net/article/details?id=51287803
Java SE5的java.util.concurrent包中的执行器(Executor)将为你管理Thread对象,从而简化了并发编程。Executor在客户端和执行任务之间提供了一个间接层,Executor代替客户端执行任务。Executor允许你管理异步任务的执行,而无须显式地管理线程的生命周期。Executor在Java SE5/6中时启动任务的优选方法。Executor引入了一些功能类来管理和使用线程Thread,其中包括线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等
创建线程池
Executors类,提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。
public static ExecutorService newFixedThreadPool(int nThreads)
创建固定数目线程的线程池。
public static ExecutorService newCachedThreadPool()
创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。
public static ExecutorService newSingleThreadExecutor()
创建一个单线程化的Executor。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。
见类图,接口Executor只有一个方法execute,接口ExecutorService扩展了Executor并添加了一些生命周期管理的方法,如shutdown、submit等。一个Executor的生命周期有三种状态,运行 ,关闭 ,终止。
Callable,Future用于返回结果
Future<V>代表一个异步执行的操作,通过get()方法可以获得操作的结果,如果异步操作还没有完成,则,get()会使当前线程阻塞。FutureTask<V>实现了Future<V>和Runable<V>。Callable代表一个有返回值得操作。
实例:用ExecutorService 实现对一个大数组并行求和
package executor; import java.util.*; import java.util.concurrent.*; /* * 并行计算求和. * 本例中,把一个整数数组的求和分解到每个线程中,每个线程求该数值的部分和, * 然后主程序把各个和再次求和就能得到最后的数字。从这个架构上跟mapreduce有点神似。 * */ public class ExecutorServiceParalelSumdemo { private int coreCpuNum; private ExecutorService executor; /* * save the result of each thread's sum calculation * */ private List<FutureTask<Long>> tasks = new ArrayList<FutureTask<Long>>(); public ExecutorServiceParalelSumdemo(){ coreCpuNum = Runtime.getRuntime().availableProcessors(); System.out.println("this host has "+coreCpuNum+ " CPU(s)"); //for before Java 8.0 //executor = Executors.newFixedThreadPool(coreCpuNum); //this CPU parallelism API is Java8 or later ONLY executor = Executors.newWorkStealingPool(coreCpuNum); } /* * thread main body */ class CalculatorTask implements Callable<Long>{ int nums[]; int start; int end; public CalculatorTask(final int nums[],int start,int end){ this.nums = nums; this.start = start; this.end = end; } @Override public Long call() throws Exception { long sum =0; for(int i=start;i<end;i++){ sum += nums[i]; } return sum; } } private long getFinalSum(){ long sum = 0; System.out.println(tasks.size() + " future tasks in pool"); for(int i=0;i<tasks.size();i++){ try { /* * If this future's thread not return its result, * get() will block here. So perf issue introduced. * we can use CompletionService to solve this potential issue. */ sum += tasks.get(i).get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } return sum; } public long ParallelSum(int[] nums){ int start,end,increment; // 根据CPU核心个数拆分任务,创建每个thread和对应的 FutureTask,并提交到ExecutorService中。 for(int i=0;i<coreCpuNum;i++) { increment = (nums.length/coreCpuNum)+1; start = i*increment; end = start+increment; if(end > nums.length){ end = nums.length; } //create thread tasks CalculatorTask calculator = new CalculatorTask(nums, start, end); //create each future result per thread task FutureTask<Long> task = new FutureTask<Long>(calculator); tasks.add(task); if(!executor.isShutdown()){ //execute() can't return result executor.submit(task); } } return getFinalSum(); } public void close(){ executor.shutdown(); } }
在上述例子中,getResult()方法的实现过程中,迭代了FutureTask的数组,如果任务还没有完成则当前线程会阻塞,如果我们希望任意任务完成后就把其结果加到result中,而不用依次等待每个任务完成,可以使用CompletionService。
它与ExecutorService最主要的区别在于submit的task不一定是按照加入时的顺序完成的。CompletionService对ExecutorService进行了包装,内部维护一个保存Future对象的BlockingQueue。只有当这个Future对象状态是结束的时候,才会加入到这个Queue中,take()方法其实就是Producer-Consumer中的Consumer。它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。所以,先完成的必定先被取出。这样就减少了不必要的等待时间。
CompletionService版本的求和例子
package executor; import java.util.*; import java.util.concurrent.*; public class CompletionServiceDemo { /* * 并行计算求和. * 本例中,把一个整数数组的求和分解到每个线程中,每个线程求该数值的部分和, * 然后主程序把各个和再次求和就能得到最后的数字。从这个架构上跟mapreduce有点神似。 * */ private int coreCpuNum; private ExecutorService executor; /* * CompletionService与ExecutorService最主要的区别在于 *前者submit的task不一定是按照加入时的顺序完成的。CompletionService对ExecutorService进行了包装, *内部维护一个保存Future对象的BlockingQueue。 *只有当这个Future对象状态是结束的时候,才会加入到这个Queue中,take()方法其实就是Producer-Consumer中的Consumer。 *它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。 *所以,先完成的必定先被取出。这样就减少了不必要的等待时间。 * */ /* * CompletionService has a internal bloking queue to save the result of each * thread's sum calculation. so List<FutureTask<Long>> tasks appears unnecessary now * */ private CompletionService<Long> mcs; /* * save the result of each thread's sum calculation * */ public CompletionServiceDemo(){ coreCpuNum = Runtime.getRuntime().availableProcessors(); System.out.println("this host has "+coreCpuNum+ " CPU(s)"); //for before Java 8.0 //executor = Executors.newFixedThreadPool(coreCpuNum); //this CPU parallelism API is Java8 or later ONLY executor = Executors.newWorkStealingPool(coreCpuNum); mcs=new ExecutorCompletionService<>(executor); } /* * thread main body */ class CalculatorTask implements Callable<Long>{ int nums[]; int start; int end; public CalculatorTask(final int nums[],int start,int end){ this.nums = nums; this.start = start; this.end = end; } @Override public Long call() throws Exception { long sum =0; for(int i=start;i<end;i++){ sum += nums[i]; } return sum; } } private long getFinalSum(){ long sum = 0; for(int i=0;i<coreCpuNum;i++){ try { /* * get one complete result from CompletionServer internal * blocking queue */ sum += mcs.take().get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } return sum; } public long ParallelSum(int[] nums){ int start,end,increment; // 根据CPU核心个数拆分任务,创建每个thread和对应的 FutureTask,并提交到ExecutorService中。 for(int i=0;i<coreCpuNum;i++) { increment = (nums.length/coreCpuNum)+1; start = i*increment; end = start+increment; if(end > nums.length){ end = nums.length; } //create thread tasks CalculatorTask mthread = new CalculatorTask(nums, start, end); if(!executor.isShutdown()){ mcs.submit(mthread); } } return getFinalSum(); } public void close(){ executor.shutdown(); } }
相关推荐
在示例中,创建了一个ExecutorCompletionService实例,它继承自CompletionService并且使用ExecutorService作为底层的执行器。提交任务的方式与ExecutorService类似,但获取结果时,不再直接从列表中获取Future,而是...
有以下类的实例: ThreadPool ScheduledThread CyclicBarrier BlockingQueue CountDownLatch FutureTask CompletionService Semaphore
例如,`TestCyclicBarrier`和`TestCyclicBarrier2`可能包含了如何创建和使用`CyclicBarrier`来同步多个任务的实例。 2. **BlockingQueue**:阻塞队列是一种线程安全的数据结构,它支持插入和移除操作,并且在队列满...
在使用`ExecutorCompletionService`时,我们需要创建一个`ExecutorService`实例,然后将这个`ExecutorService`传递给`ExecutorCompletionService`的构造函数。接着,我们可以向`ExecutorCompletionService`提交任务...
- 通过`completionService.submit(serv)`提交执行`XMPPIOService.call()`,完成从Socket报文到Packet的转换。 2. **Packet处理**: - 接收的Socket报文通过`XMPPIOService.call()`转换为Packet,这是内部处理的...
- `java.util.concurrent.CompletionService`:允许获取执行任务的结果,`ExecutorCompletionService` 是其具体实现,结合了 `ExecutorService` 和 `Future`。 5. 线程池执行原理 线程池的执行过程主要包括任务提交...
17. **CompletionService** - 提供一种方式,一旦任务完成,就获取并处理结果。 18. **线程服务的优雅停止** - 使用`shutdown()`和平稳关闭线程池,等待已提交任务完成。 - `shutdownNow()`尝试停止所有任务,...
4.2 实例封闭 4.2.1 Java监视器模式 4.2.2 示例:车辆追踪 4.3 线程安全性的委托 4.3.1 示例:基于委托的车辆追踪器 4.3.2 独立的状态变量 4.3.3 当委托失效时 4.3.4 发布底层的状态变量 4.3.5 示例:发布...
4.2 实例封闭 4.2.1 Java监视器模式 4.2.2 示例:车辆追踪 4.3 线程安全性的委托 4.3.1 示例:基于委托的车辆追踪器 4.3.2 独立的状态变量 4.3.3 当委托失效时 4.3.4 发布底层的状态变量 4.3.5 示例:发布...
`Future`常与`ExecutorService`的`submit()`方法一起使用,返回一个`Future`实例,可以通过该实例监控任务状态。 `CompletionService`接口扩展了`ExecutorService`,它提供了获取已完成任务的便捷方式,特别适用于...
Executor框架包括了Executor、ExecutorService、CompletionService、Future、Callable等接口和类。Executors则是Executor框架的一个工具类,用于创建不同类型的线程池。 在实际应用中,线程池的使用场景广泛,例如...
Thread类的子类重写run()方法,而Runnable接口实现者则将run()方法放在实现类中,然后通过Thread类实例化来启动线程。 二、异步执行 异步执行是相对于同步执行而言的。在同步执行中,线程会等待某个操作完成后再...
- **CompletionService**:结合Executor和Future,提供了一种等待多个任务完成的解决方案。 #### 2.2 Java内存模型 - **volatile关键字**:确保了变量的可见性和禁止指令重排。 - **原子变量**:如AtomicInteger等...
此外,Java并发API(java.util.concurrent)提供了一些高级工具,如ExecutorService、Future、Callable和CompletionService,它们简化了多线程编程并提高了性能。 - ExecutorService允许我们创建线程池,管理和控制...
探讨为什么需要同步机制、`synchronized`关键字的工作原理、实例方法和静态方法的同步以及同步块的应用。 - **第3章:Volatile关键字的使用** 解释何时使用`volatile`关键字,以及它如何影响变量的可见性。同时...
在这个项目中,我们可能会看到一系列使用Java 8及更高版本的新特性的实例,例如函数式编程、流API、并发处理、模块化系统以及类型推断等。 1. **函数式编程**:自Java 8以来,函数式编程的概念被引入,使得Java...