- 浏览: 1017755 次
- 性别:
- 来自: 杭州
文章分类
- 全部博客 (826)
- 硬件 (8)
- 软件 (24)
- 软件工程 (34)
- JAVA (229)
- C/C++/C# (77)
- JavaScript (8)
- PHP (1)
- Ruby (3)
- MySQL (14)
- 数据库 (19)
- 心情记事 (12)
- 团队管理 (19)
- Hadoop (1)
- spring (22)
- mybatis(ibatis) (7)
- tomcat (16)
- velocity (0)
- 系统架构 (6)
- JMX (8)
- proxool (1)
- 开发工具 (16)
- python (10)
- JVM (27)
- servlet (5)
- JMS (26)
- ant (2)
- 设计模式 (5)
- 智力题 (2)
- 面试题收集 (1)
- 孙子兵法 (16)
- 测试 (1)
- 数据结构 (7)
- 算法 (22)
- Android (11)
- 汽车驾驶 (1)
- lucene (1)
- memcache (12)
- 技术架构 (7)
- OTP-Erlang (7)
- memcached (17)
- redis (20)
- 浏览器插件 (3)
- sqlite (3)
- Heritrix (9)
- Java线程 (1)
- scala (0)
- Mina (6)
- 汇编 (2)
- Netty (15)
- libevent (0)
- CentOS (12)
- mongod (5)
- mac os (0)
最新评论
-
kingasdfg:
你这里面存在一个错误添加多个任务 应该是这样的 /** * ...
Quartz的任务的临时启动和暂停和恢复【转】 -
kyzeng:
纠正一个错误,long型对应的符号是J,不是L。
Jni中C++和Java的参数传递 -
zhaohaolin:
抱歉,兄弟,只是留下作记录,方便学习,如果觉得资料不好,可以到 ...
netty的个人使用心得【转】 -
cccoooccooco:
谢谢!自己一直以为虚机得使用网线才可以与主机连接呢。。
主机网卡无网线连接与虚拟机通信 -
yuqilin001:
要转别人的东西,请转清楚点嘛,少了这么多类,误人子弟
netty的个人使用心得【转】
Java 7 Fork/Join 并行计算框架概览
文章分类:Java编程
应用程序并行计算遇到的问题
当硬件处理能力不能按摩尔定律垂直发展的时候,选择了水平发展。多核处理器已广泛应用,未来处理器的核心数将进一步发布,甚至达到上百上千的数量。而现在 很多的应用程序在运行在多核心的处理器上并不能得到很好的性能提升,因为应用程序的并发处理能力不强,不能够合理有效地的利用计算资源。线性的计算只能利 用n分之一的计算支援。
要提高应用程序在多核处理器上的执行效率,只能想办法提高应用程序的本身的并行能力。常规的做法就是使用多线程,让更多的任务同时处理,或者让一部分操作 异步执行,这种简单的多线程处理方式在处理器核心数比较少的情况下能够有效地利用处理资源,因为在处理器核心比较少的情况下,让不多的几个任务并行执行即 可。但是当处理器核心数发展很大的数目,上百上千的时候,这种按任务的并发处理方法也不能充分利用处理资源,因为一般的应用程序没有那么多的并发处理任务 (服务器程序是个例外)。所以,只能考虑把一个任务拆分为多个单元,每个单元分别得执行最后合并每个单元的结果。一个任务的并行拆分,一种方法就是寄希望 于硬件平台或者操作系统,但是目前这个领域还没有很好的结果。另一种方案就是还是只有依靠应用程序本身对任务经行拆封执行。
Fork/Join框架
依靠应用程序本身并行拆封任务,如果使用简单的多线程程序的方法,复杂度必然很大。这就需要一个更好的范式或者工具来代程序员处理这类问题。Java 7也意识到了这个问题,才标准库中集成了由Doug Lea开发的Fork/Join并行计算框架。通过使用 Fork/Join 模式,软件开发人员能够方便地利用多核平台的计算能力。尽管还没有做到对软件开发人员完全透明,Fork/Join 模式已经极大地简化了编写并发程序的琐碎工作。对于符合 Fork/Join 模式的应用,软件开发人员不再需要处理各种并行相关事务,例如同步、通信等,以难以调试而闻名的死锁和 data race 等错误也就不会出现,提升了思考问题的层次。你可以把 Fork/Join 模式看作并行版本的 Divide and Conquer 策略,仅仅关注如何划分任务和组合中间结果,将剩下的事情丢给 Fork/Join 框架。但是Fork/Join并行计算框架,并不是银弹,并不能解决所有应用程序在超多核心处理器上的并发问题。
如果一个应用能被分解成多个子任务,并且组合多个子任务的结果就能够获得最终的答案,那么这个应用就适合用 Fork/Join 模式来解决。其原理如下图。
应用程序开发者需要做的就是拆分任务并组合每个子任务的中间结果,而不用再考虑线程和锁的问题。
一个简单的例子
我们首先看一个简单的Fork/Join的任务定义。
这段代码中,定义了一个累加的任务,在compute方法中,判断当前的计算范围是否小于一个值,如果是则计算,如果没有,就把任务拆分为连个子任务,并合并连个子任务的中间结果。程序递归的完成了任务拆分和计算。
任务定义之后就是执行任务,Fork/Join提供一个和Executor框架 的扩展线程池来执行任务。
Fork/Join框架的主要类
RecursiveAction供不需要返回值的任务继续。
RecursiveTask通过泛型参数设置计算的返回值类型。
ForkJoinPool提供了一系列的submit方法,计算任务。ForkJoinPool默认的线程数通过Runtime.availableProcessors()获得,因为在计算密集型的任务中,获得多于处理性核心数的线程并不能获得更多性能提升。
sumit方法返回了task本身,ForkJoinTask实现了Future接口,所以可以通过它等待获得结果。
另一例子
这个例子并行排序数组,不需要返回结果,所以继承了RecursiveAction。
动手尝试
Fork/Join框架的代码已经整合到了最新的JDK7的Binary Snapshot Releases中,可以通过这个地址 下载。
本文中的代码见附件。
当硬件处理能力不能按摩尔定律垂直发展的时候,选择了水平发展。多核处理器已广泛应用,未来处理器的核心数将进一步发布,甚至达到上百上千的数量。而现在 很多的应用程序在运行在多核心的处理器上并不能得到很好的性能提升,因为应用程序的并发处理能力不强,不能够合理有效地的利用计算资源。线性的计算只能利 用n分之一的计算支援。
要提高应用程序在多核处理器上的执行效率,只能想办法提高应用程序的本身的并行能力。常规的做法就是使用多线程,让更多的任务同时处理,或者让一部分操作 异步执行,这种简单的多线程处理方式在处理器核心数比较少的情况下能够有效地利用处理资源,因为在处理器核心比较少的情况下,让不多的几个任务并行执行即 可。但是当处理器核心数发展很大的数目,上百上千的时候,这种按任务的并发处理方法也不能充分利用处理资源,因为一般的应用程序没有那么多的并发处理任务 (服务器程序是个例外)。所以,只能考虑把一个任务拆分为多个单元,每个单元分别得执行最后合并每个单元的结果。一个任务的并行拆分,一种方法就是寄希望 于硬件平台或者操作系统,但是目前这个领域还没有很好的结果。另一种方案就是还是只有依靠应用程序本身对任务经行拆封执行。
Fork/Join框架
依靠应用程序本身并行拆封任务,如果使用简单的多线程程序的方法,复杂度必然很大。这就需要一个更好的范式或者工具来代程序员处理这类问题。Java 7也意识到了这个问题,才标准库中集成了由Doug Lea开发的Fork/Join并行计算框架。通过使用 Fork/Join 模式,软件开发人员能够方便地利用多核平台的计算能力。尽管还没有做到对软件开发人员完全透明,Fork/Join 模式已经极大地简化了编写并发程序的琐碎工作。对于符合 Fork/Join 模式的应用,软件开发人员不再需要处理各种并行相关事务,例如同步、通信等,以难以调试而闻名的死锁和 data race 等错误也就不会出现,提升了思考问题的层次。你可以把 Fork/Join 模式看作并行版本的 Divide and Conquer 策略,仅仅关注如何划分任务和组合中间结果,将剩下的事情丢给 Fork/Join 框架。但是Fork/Join并行计算框架,并不是银弹,并不能解决所有应用程序在超多核心处理器上的并发问题。
如果一个应用能被分解成多个子任务,并且组合多个子任务的结果就能够获得最终的答案,那么这个应用就适合用 Fork/Join 模式来解决。其原理如下图。
应用程序开发者需要做的就是拆分任务并组合每个子任务的中间结果,而不用再考虑线程和锁的问题。
一个简单的例子
我们首先看一个简单的Fork/Join的任务定义。
- public class Calculator extends RecursiveTask<Integer> {
- private static final int THRESHOLD = 100 ;
- private int start;
- private int end;
- public Calculator( int start, int end) {
- this .start = start;
- this .end = end;
- }
- @Override
- protected Integer compute() {
- int sum = 0 ;
- if ((start - end) < THRESHOLD){
- for ( int i = start; i< end;i++){
- sum += i;
- }
- }else {
- int middle = (start + end) / 2 ;
- Calculator left = new Calculator(start, middle);
- Calculator right = new Calculator(middle + 1 , end);
- left.fork();
- right.fork();
- sum = left.join() + right.join();
- }
- return sum;
- }
- }
public class Calculator extends RecursiveTask<Integer> { private static final int THRESHOLD = 100; private int start; private int end; public Calculator(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; if((start - end) < THRESHOLD){ for(int i = start; i< end;i++){ sum += i; } }else{ int middle = (start + end) /2; Calculator left = new Calculator(start, middle); Calculator right = new Calculator(middle + 1, end); left.fork(); right.fork(); sum = left.join() + right.join(); } return sum; } }
这段代码中,定义了一个累加的任务,在compute方法中,判断当前的计算范围是否小于一个值,如果是则计算,如果没有,就把任务拆分为连个子任务,并合并连个子任务的中间结果。程序递归的完成了任务拆分和计算。
任务定义之后就是执行任务,Fork/Join提供一个和Executor框架 的扩展线程池来执行任务。
- @Test
- public void run() throws Exception{
- ForkJoinPool forkJoinPool = new ForkJoinPool();
- Future<Integer> result = forkJoinPool.submit(new Calculator( 0 , 10000 ));
- assertEquals(new Integer( 49995000 ), result.get());
- }
@Test public void run() throws Exception{ ForkJoinPool forkJoinPool = new ForkJoinPool(); Future<Integer> result = forkJoinPool.submit(new Calculator(0, 10000)); assertEquals(new Integer(49995000), result.get()); }
Fork/Join框架的主要类
RecursiveAction供不需要返回值的任务继续。
RecursiveTask通过泛型参数设置计算的返回值类型。
ForkJoinPool提供了一系列的submit方法,计算任务。ForkJoinPool默认的线程数通过Runtime.availableProcessors()获得,因为在计算密集型的任务中,获得多于处理性核心数的线程并不能获得更多性能提升。
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { doSubmit(task); return task; }
sumit方法返回了task本身,ForkJoinTask实现了Future接口,所以可以通过它等待获得结果。
另一例子
这个例子并行排序数组,不需要返回结果,所以继承了RecursiveAction。
- public class SortTask extends RecursiveAction {
- final long [] array;
- final int start;
- final int end;
- private int THRESHOLD = 100 ; //For demo only
- public SortTask( long [] array) {
- this .array = array;
- this .start = 0 ;
- this .end = array.length - 1 ;
- }
- public SortTask( long [] array, int start, int end) {
- this .array = array;
- this .start = start;
- this .end = end;
- }
- protected void compute() {
- if (end - start < THRESHOLD)
- sequentiallySort(array, start, end);
- else {
- int pivot = partition(array, start, end);
- new SortTask(array, start, pivot - 1 ).fork();
- new SortTask(array, pivot + 1 , end).fork();
- }
- }
- private int partition( long [] array, int start, int end) {
- long x = array[end];
- int i = start - 1 ;
- for ( int j = start; j < end; j++) {
- if (array[j] <= x) {
- i++;
- swap(array, i, j);
- }
- }
- swap(array, i + 1 , end);
- return i + 1 ;
- }
- private void swap( long [] array, int i, int j) {
- if (i != j) {
- long temp = array[i];
- array[i] = array[j];
- array[j] = temp;
- }
- }
- private void sequentiallySort( long [] array, int lo, int hi) {
- Arrays.sort(array, lo, hi + 1 );
- }
- }
public class SortTask extends RecursiveAction { final long[] array; final int start; final int end; private int THRESHOLD = 100; //For demo only public SortTask(long[] array) { this.array = array; this.start = 0; this.end = array.length - 1; } public SortTask(long[] array, int start, int end) { this.array = array; this.start = start; this.end = end; } protected void compute() { if (end - start < THRESHOLD) sequentiallySort(array, start, end); else { int pivot = partition(array, start, end); new SortTask(array, start, pivot - 1).fork(); new SortTask(array, pivot + 1, end).fork(); } } private int partition(long[] array, int start, int end) { long x = array[end]; int i = start - 1; for (int j = start; j < end; j++) { if (array[j] <= x) { i++; swap(array, i, j); } } swap(array, i + 1, end); return i + 1; } private void swap(long[] array, int i, int j) { if (i != j) { long temp = array[i]; array[i] = array[j]; array[j] = temp; } } private void sequentiallySort(long[] array, int lo, int hi) { Arrays.sort(array, lo, hi + 1); } }
- @Test
- public void run() throws InterruptedException {
- ForkJoinPool forkJoinPool = new ForkJoinPool();
- Random rnd = new Random();
- long [] array = new long [SIZE];
- for ( int i = 0 ; i < SIZE; i++) {
- array[i] = rnd.nextInt();
- }
- forkJoinPool.submit(new SortTask(array));
- forkJoinPool.shutdown();
- forkJoinPool.awaitTermination(1000 , TimeUnit.SECONDS);
- for ( int i = 1 ; i < SIZE; i++) {
- assertTrue(array[i - 1 ] < array[i]);
- }
- }
@Test public void run() throws InterruptedException { ForkJoinPool forkJoinPool = new ForkJoinPool(); Random rnd = new Random(); long[] array = new long[SIZE]; for (int i = 0; i < SIZE; i++) { array[i] = rnd.nextInt(); } forkJoinPool.submit(new SortTask(array)); forkJoinPool.shutdown(); forkJoinPool.awaitTermination(1000, TimeUnit.SECONDS); for (int i = 1; i < SIZE; i++) { assertTrue(array[i - 1] < array[i]); } }
动手尝试
Fork/Join框架的代码已经整合到了最新的JDK7的Binary Snapshot Releases中,可以通过这个地址 下载。
本文中的代码见附件。
发表评论
-
调试jdk中的源码,查看jdk局部变量
2013-06-15 23:30 1055调试jdk中的源码,查看jdk局部变量 2012-04 ... -
Eclipse快捷键 10个最有用的快捷键<转>
2013-04-11 23:28 1082Eclipse中10个最有用的快捷键组合 一个Eclip ... -
Lucene 3.6 中文分词、分页查询、高亮显示等
2012-12-09 23:35 18241、准备工作 下载lucene 3.6.1 : htt ... -
Maven实战(九)——打包的技巧(转)
2012-10-12 00:41 941“打包“这个词听起 ... -
基于Maven的web工程如何配置嵌入式Jetty Server开发调试环境(转)
2012-10-12 00:28 9421、首先在web工程的POM文件里添加依赖jar包如下: ... -
轻轻松松学Solr(1)--概述及安装[转]
2012-09-18 14:59 998概述 这段时间对企 ... -
分析Netty工作流程[转]
2012-09-04 19:02 893下面以Netty中Echo的例 ... -
让eclipse在ubuntu下面好看一点
2012-03-27 10:17 925<p> </p> <h1 cla ... -
zookeeper安装和应用场合(名字,配置,锁,队列,集群管理)[转]
2012-01-12 17:59 1653安装和配置详解 本文 ... -
Jakarta-Common-BeanUtils使用笔记[转]
2012-01-10 14:13 1160Jakarta-Common-BeanUtils ... -
一个关于Java Thread wait(),notify()的实用例【转】
2012-01-07 16:05 1024///// // ProducerConsume ... -
Java基础:Java中的 assert 关键字解析【转】
2012-01-06 19:50 1066J2SE 1.4在语言上提供了 ... -
一篇不错的讲解Java异常的文章(转载)----感觉很不错,读了以后很有启发[转]
2012-01-06 15:02 1272六种异常处理的陋习 ... -
如何解决HP QC(Quality Center)在Windows 7下不能工作的问题
2011-12-26 10:48 1588HP QC(Quantity Center) 是一款不错的测 ... -
JAVA读写文件,中文乱码 【转】
2011-12-19 23:43 2123最近在做HTML静态生成,需要从硬盘上把模版文件的内容读出来。 ... -
Java 6 JVM参数选项大全(中文版)【转】
2011-12-19 19:51 974Java 6 JVM参数选项大全(中文版) 作者 ... -
使用assembly plugin实现自定义打包【转】
2011-12-13 01:58 975在上一篇文章中,讨论到在对maven的机制不熟悉的情况下,为了 ... -
使用maven ant task实现非标准打包[转]
2011-12-13 01:56 1050maven很强大,但是总有些事情干起来不是得心应手,没有使用a ... -
Java日期转换SimpleDateFormat格式大全【转】
2011-12-08 20:22 131924小时制时间 显示: public clas ... -
使用Spring的表单标签库
2011-11-22 20:08 107813.9. 使用Spring的 ...
相关推荐
- **Fork/Join框架**:这是Java 7中最引人注目的新特性之一。Fork/Join框架为分治算法(divide-and-conquer algorithms)提供了一种高效实现方式,它可以将一个大任务分解成若干个小任务,并在多核处理器上并行执行...
Fork/Join 框架是一种新的并行计算框架,它简化了多线程编程模型。这个框架鼓励开发者将任务分解成更小的任务,然后将结果合并。这种方式非常适合于需要进行大量并行处理的应用场景。 ##### 2. Objects 类 Java 7 ...
- **定义**:Fork/Join是一种并行计算框架,用于高效地分配任务并在多核处理器上执行。 - **CUDA与Fork/Join**:在CUDA中,可以通过类似Fork/Join的方式来实现数据并行处理,但具体实现方式有所不同,更多依赖于CUDA...
Java 8 Stream 并行计算原理** - **Stream API**:Java 8 引入的新特性,支持函数式编程风格。 - **并行流**:利用多核处理器的优势,将数据分割为多个部分并行处理。 - **并行流与串行流对比**:并行流适用于数据...
- Fork/Join框架介绍 - CompletableFuture的使用案例 - Stream API与并行流 - 并发安全的集合类 #### Java多线程基础 **线程的概念与创建方式:** - **概念:** 在计算机科学中,线程是操作系统能够进行运算...
又如,在数据分析场景下,利用`Fork/Join`框架进行并行计算,可以大幅减少数据处理时间。 综上所述,“Java并发编程实战.pdf”这一文档为读者提供了丰富的理论基础和技术实践指导,帮助开发人员更好地理解和掌握...
书中介绍了如何使用Fork/Join框架来实现高效的并行递归算法。 以上内容概述了《Java Concurrency In Practice》这本书中所涉及的关键知识点和技术细节,希望能为读者提供有价值的参考和启示。通过深入学习这些内容...
- **fork/join**:并行执行控制结构。 - **initial**:初始块。 - **delays**:显式延时。 - **UDP**:用户定义的原语。 - **wait**:等待条件。 这些结构通常不被综合工具所支持,因为它们主要用于测试和验证目的...
- **并发控制结构**: 如`fork...join`结构,支持高效的并行处理。 - **通信机制**: 如`mailbox`、`semaphore`和`event`等,用于协调并发进程间的通信。 - **虚接口**: 用于连接设计模块和验证环境,简化了接口的复杂...
### 线程与并发:Ruby并行世界的探索之旅 #### Ruby 语言概览 Ruby 是一种高级的、面向对象的编程语言,由日本开发者松本行弘(Yukihiro "Matz" Matsumoto)于 1995 年创建。其设计初衷旨在实现简单、自然且强大的...
- **高级并发控制**:SystemVerilog提供了多种并发控制机制,包括fork/join、wait、@等操作符,使得开发者能够更容易地处理并发事件。 - **接口和封装**:为了提高代码的可复用性,SystemVerilog支持接口和封装的...
像素处理分为分叉阶段(Fork Phase)和合并阶段(Join Phase),这两个阶段处理像素的着色和其他计算任务。像素着色器(Pixel Shader)或片段着色器(Fragment Shader)在这个过程中计算像素的颜色和其他属性。 ...
- **并行计算**:利用多线程或多进程实现高效的数据处理任务。 - **系统服务开发**:构建后台运行的服务程序,例如守护进程(daemon)。 #### 6. 总结 《UNIX环境高级编程(第2版)》不仅是一本理论性强的技术书籍...
9. **分叉和连接点(Fork and Join Nodes)**:支持并发处理,分叉点启动多个线程,连接点等待所有线程完成后再继续后续流程。 10. **扩展域(Expansion Region)**:一种特殊的区域,允许活动的重复执行,如迭代或...
- **并行执行**:探讨了如何使用`fork-join`结构来编写并行执行的代码,这对于提高系统性能有着重要作用。 #### 六、系统任务 - **显示结果**:介绍了如何使用系统任务(system tasks)来显示仿真结果,包括常用的`$...
为了实现更复杂的控制流,可以在 `initial` 块内部嵌套循环语句(如 `forever`),或者使用 `fork` 和 `join` 来并行执行多个任务,提高仿真效率。 ##### 遍历测试中的for循环应用 对于具有多种工作模式的设计,...
#### 知识点概览 本文档旨在提供一个全面且深入的理解关于Linux环境下的多线程编程技术。主要内容涵盖了多线程的基础概念、线程编程的基本方法、线程属性的配置以及同步机制的使用等关键领域。以下是根据文档标题、...