本文转自:http://www.xiaoyaochong.net/wordpress/?p=314
Java7引入了Fork Join的概念,来更好的支持并行运算。顾名思义,Fork Join类似与流程语言的分支,合并的概念。也就是说Java7 SE原生支持了在一个主线程中开辟多个分支线程,并且根据分支线程的逻辑来等待(或者不等待)汇集,当然你也可以fork的某一个分支线程中再开辟Fork Join,这也就可以实现Fork Join的嵌套。
有两个核心类ForkJoinPool和ForkJoinTask。
ForkJoinPool实现了ExecutorService接口,起到线程池的作用。所以他的用法和Executor框架的使用时一样的,当然Fork Join本身就是Executor框架的扩展。ForkJoinPool有3个关键的方法,来启动线程,execute(...),invoke(...),submit(...)。具体描述如下:
客户端非fork/join调用 | 内部调用fork/join | |
异步执行 | execute(ForkJoinTask) | ForkJoinTask.fork |
等待获取结果 | invoke(ForkJoinTask) | ForkJoinTask.invoke |
执行,获取Futrue | submit(ForkJoinTask) | ForkJoinTask.fork(ForkJoinTasks are Futures) |
ForkJoinTask是分支合并的执行任何,分支合并的业务逻辑使用者可以再继承了这个抽先类之后,在抽象方法exec()中实现。其中exec()的返回结果和ForkJoinPool的执行调用方(execute(...),invoke(...),submit(...)),共同决定着线程是否阻塞,具体请看下面的测试用例。
首先,用户需要创建一个自己的ForkJoinTask。代码如下:
public class MyForkJoinTask<V> extends ForkJoinTask<V> { /** * */ private static final long serialVersionUID = 1L; private V value; private boolean success = false; @Override public V getRawResult() { return value; } @Override protected void setRawResult(V value) { this.value = value; } @Override protected boolean exec() { System.out.println("exec"); return this.success; } public boolean isSuccess() { return success; } public void setSuccess(boolean isSuccess) { this.success = isSuccess; } }
测试ForkJoinPool.invoke(...):
@Test public void testForkJoinInvoke() throws InterruptedException, ExecutionException { ForkJoinPool forkJoinPool = new ForkJoinPool(); MyForkJoinTask<String> task = new MyForkJoinTask<String>(); task.setSuccess(true); task.setRawResult("test"); String invokeResult = forkJoinPool.invoke(task); assertEquals(invokeResult, "test"); } @Test public void testForkJoinInvoke2() throws InterruptedException, ExecutionException { final ForkJoinPool forkJoinPool = new ForkJoinPool(); final MyForkJoinTask<String> task = new MyForkJoinTask<String>(); new Thread(new Runnable() { public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { } task.complete("test"); } }).start(); // exec()返回值是false,此处阻塞,直到另一个线程调用了task.complete(...) String result = forkJoinPool.invoke(task); System.out.println(result); } @Test public void testForkJoinSubmit() throws InterruptedException, ExecutionException { final ForkJoinPool forkJoinPool = new ForkJoinPool(); final MyForkJoinTask<String> task = new MyForkJoinTask<String>(); task.setSuccess(true); // 是否在此任务运行完毕后结束阻塞 ForkJoinTask<String> result = forkJoinPool.submit(task); result.get(); // 如果exec()返回值是false,在此处会阻塞,直到调用complete }
测试ForkJoinPool.submit(...):
@Test public void testForkJoinSubmit() throws InterruptedException, ExecutionException { final ForkJoinPool forkJoinPool = new ForkJoinPool(); final MyForkJoinTask<String> task = new MyForkJoinTask<String>(); task.setSuccess(true); // 是否在此任务运行完毕后结束阻塞 ForkJoinTask<String> result = forkJoinPool.submit(task); result.get(); // 如果exec()返回值是false,在此处会阻塞,直到调用complete } @Test public void testForkJoinSubmit2() throws InterruptedException, ExecutionException { final ForkJoinPool forkJoinPool = new ForkJoinPool(); final MyForkJoinTask<String> task = new MyForkJoinTask<String>(); forkJoinPool.submit(task); Thread.sleep(1000); } @Test public void testForkJoinSubmit3() throws InterruptedException, ExecutionException { final ForkJoinPool forkJoinPool = new ForkJoinPool(); final MyForkJoinTask<String> task = new MyForkJoinTask<String>(); new Thread(new Runnable() { public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { } task.complete("test"); } }).start(); ForkJoinTask<String> result = forkJoinPool.submit(task); // exec()返回值是false,此处阻塞,直到另一个线程调用了task.complete(...) result.get(); Thread.sleep(1000); }
测试ForkJoinPool.execute(...):
@Test public void testForkJoinExecute() throws InterruptedException, ExecutionException { ForkJoinPool forkJoinPool = new ForkJoinPool(); MyForkJoinTask<String> task = new MyForkJoinTask<String>(); forkJoinPool.execute(task); // 异步执行,无视task.exec()返回值。 }
相关推荐
书中涵盖了Java的基础知识,包括语言特性、类库、编程技术和术语,并针对Java 6和7的新功能进行了更新,如Fork-Join框架、新的NIO Files API和Java Servlet 3.0等内容。 本书的目标读者群体广泛,包括计算机专业...
在并发处理方面,Java8推出了Fork/Join框架和Parallel Streams。Fork/Join框架是基于分治策略的并行计算模型,可以将大任务拆分成小任务并行执行,提高计算效率。Parallel Streams是Java8中的并行流,可以自动利用...
7. **并发编程改进**:Java提供了并发工具类如ConcurrentHashMap、CountDownLatch、CyclicBarrier等,以及Fork/Join框架和CompletableFuture,简化了多线程编程。 8. **反射API**:反射API允许程序在运行时动态地...
7. **改进的并发工具**:如Fork/Join框架,用于并行计算,以及新的CountDownLatch和CyclicBarrier。 8. **动态语言支持**:JSR 292(invokedynamic指令)增强了对动态语言的支持。 在这个压缩包中,我们有两个文件...
8. 异步编程:虽然Java基础语法不直接支持异步编程,但可以通过多线程(Thread类)或Java 8以后的并发库(如Fork/Join框架、CompletableFuture)实现。 9. 文件I/O:Java提供了java.io包来处理文件读写,包括File、...
本文档是关于Java中并行化与并发性的入门教材,主要面向大学二年级的学生。教材主要介绍共享内存并行化和并发性的基本概念,并且讨论了与Java相关的特定技术,例如线程和共享内存的使用。教材内容详细,从并行化和...
贡献者可以通过 Fork 仓库、修改文档并提交 Pull Request 的方式参与文档的完善工作。这不仅可以提高文档质量,还能增加社区的活跃度。 #### 三、MyBatis 入门指南 ##### 3.1 安装 - **Jar 包安装**:只需将 `...
JBPM,全称Java Business Process Management,是一款基于Java的企业级工作流管理系统,由JBoss组织开发并开源。它提供了一套完整的解决方案,用于设计、执行和管理业务流程,使得开发者能够轻松地实现业务逻辑的...
JBPM(Java Business Process Management)是一个开源的工作流管理系统,它允许开发者设计、执行、监控和优化业务流程。以下是对文档内容的详细解释: 一、概述 这部分通常会介绍JBPM的基本概念、特点以及其在业务...
- **fork/join**: 分支与合并操作。 - **decision**: 用于做出决策的节点。 - **transition**: 连接不同节点之间的路径。 - **event**: 代表流程中的事件。 - **action/script**: 可以执行的动作或脚本。 - **...
流程节点详解.....................................67 3.1 公共属性.............Fork和Join节点.........................................81 3.7 Decision节点.............................................86 3.8 ...
流程节点详解.....................................67 3.1 公共属性.............Fork和Join节点.........................................81 3.7 Decision节点.............................................86 3.8 ...