`

Java7 ForkJoin入门实例

阅读更多

本文转自:http://www.xiaoyaochong.net/wordpress/?p=314

Java7引入了Fork Join的概念,来更好的支持并行运算。顾名思义,Fork Join类似与流程语言的分支,合并的概念。也就是说Java7 SE原生支持了在一个主线程中开辟多个分支线程,并且根据分支线程的逻辑来等待(或者不等待)汇集,当然你也可以fork的某一个分支线程中再开辟Fork Join,这也就可以实现Fork Join的嵌套。

有两个核心类ForkJoinPool和ForkJoinTask。

ForkJoinPool实现了ExecutorService接口,起到线程池的作用。所以他的用法和Executor框架的使用时一样的,当然Fork Join本身就是Executor框架的扩展。ForkJoinPool有3个关键的方法,来启动线程,execute(...),invoke(...),submit(...)。具体描述如下:

  客户端非fork/join调用 内部调用fork/join
异步执行 execute(ForkJoinTask) ForkJoinTask.fork
等待获取结果 invoke(ForkJoinTask) ForkJoinTask.invoke
执行,获取Futrue submit(ForkJoinTask) ForkJoinTask.fork(ForkJoinTasks are Futures)

 

 ForkJoinTask是分支合并的执行任何,分支合并的业务逻辑使用者可以再继承了这个抽先类之后,在抽象方法exec()中实现。其中exec()的返回结果和ForkJoinPool的执行调用方(execute(...),invoke(...),submit(...)),共同决定着线程是否阻塞,具体请看下面的测试用例。

首先,用户需要创建一个自己的ForkJoinTask。代码如下:

public class MyForkJoinTask<V> extends ForkJoinTask<V> {

    /**
     * 
     */
    private static final long serialVersionUID = 1L;
    
    private V value;
    
    private boolean success = false;
    
    @Override
    public V getRawResult() {
        return value;
    }

    @Override
    protected void setRawResult(V value) {
        this.value = value;
    }

    @Override
    protected boolean exec() {
        System.out.println("exec");
        return this.success;
    }

    public boolean isSuccess() {
        return success;
    }

    public void setSuccess(boolean isSuccess) {
        this.success = isSuccess;
    }

}

 

测试ForkJoinPool.invoke(...):

@Test
    public void testForkJoinInvoke() throws InterruptedException, ExecutionException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        MyForkJoinTask<String> task = new MyForkJoinTask<String>();
        task.setSuccess(true);
        task.setRawResult("test");
        String invokeResult = forkJoinPool.invoke(task);
        assertEquals(invokeResult, "test");
    }
    
    @Test
    public void testForkJoinInvoke2() throws InterruptedException, ExecutionException {
        final ForkJoinPool forkJoinPool = new ForkJoinPool();
        final MyForkJoinTask<String> task = new MyForkJoinTask<String>();
        new Thread(new Runnable() {
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
                
                task.complete("test");
            }
        }).start();
        
        // exec()返回值是false,此处阻塞,直到另一个线程调用了task.complete(...)
        String result = forkJoinPool.invoke(task);
        System.out.println(result);
    }
    
    @Test
    public void testForkJoinSubmit() throws InterruptedException, ExecutionException {
        final ForkJoinPool forkJoinPool = new ForkJoinPool();
        final MyForkJoinTask<String> task = new MyForkJoinTask<String>();
        
        task.setSuccess(true); // 是否在此任务运行完毕后结束阻塞
        ForkJoinTask<String> result = forkJoinPool.submit(task);
        result.get(); // 如果exec()返回值是false,在此处会阻塞,直到调用complete
    }

 

测试ForkJoinPool.submit(...):

@Test
    public void testForkJoinSubmit() throws InterruptedException, ExecutionException {
        final ForkJoinPool forkJoinPool = new ForkJoinPool();
        final MyForkJoinTask<String> task = new MyForkJoinTask<String>();
        
        task.setSuccess(true); // 是否在此任务运行完毕后结束阻塞
        ForkJoinTask<String> result = forkJoinPool.submit(task);
        result.get(); // 如果exec()返回值是false,在此处会阻塞,直到调用complete
    }
    
    @Test
    public void testForkJoinSubmit2() throws InterruptedException, ExecutionException {
        final ForkJoinPool forkJoinPool = new ForkJoinPool();
        final MyForkJoinTask<String> task = new MyForkJoinTask<String>();
        
        forkJoinPool.submit(task);
        Thread.sleep(1000);
    }
    
    @Test
    public void testForkJoinSubmit3() throws InterruptedException, ExecutionException {
        final ForkJoinPool forkJoinPool = new ForkJoinPool();
        final MyForkJoinTask<String> task = new MyForkJoinTask<String>();
        new Thread(new Runnable() {
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
                
                task.complete("test");
            }
        }).start();
        
        ForkJoinTask<String> result = forkJoinPool.submit(task);
        // exec()返回值是false,此处阻塞,直到另一个线程调用了task.complete(...)
        result.get(); 
        Thread.sleep(1000);
    }

 

测试ForkJoinPool.execute(...):

 @Test
    public void testForkJoinExecute() throws InterruptedException, ExecutionException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        MyForkJoinTask<String> task = new MyForkJoinTask<String>();
        forkJoinPool.execute(task); // 异步执行,无视task.exec()返回值。
    }

 

分享到:
评论

相关推荐

    Java学习指南1

    特别值得一提的是,本书深入探讨了Java 6和Java 7引入的新功能,例如Fork-Join框架的使用,它允许开发者更有效地处理并行任务;新的NIO Files API,它为文件和目录提供了更加丰富的操作接口;以及Java Servlet 3.0,...

    Java8零基础入门视频教程

    在并发处理方面,Java8推出了Fork/Join框架和Parallel Streams。Fork/Join框架是基于分治策略的并行计算模型,可以将大任务拆分成小任务并行执行,提高计算效率。Parallel Streams是Java8中的并行流,可以自动利用...

    Java核心技术第9版,基础知识+高级特性

    7. **并发编程改进**:Java提供了并发工具类如ConcurrentHashMap、CountDownLatch、CyclicBarrier等,以及Fork/Join框架和CompletableFuture,简化了多线程编程。 8. **反射API**:反射API允许程序在运行时动态地...

    jdk-7u80-windows-x64.rar

    7. **改进的并发工具**:如Fork/Join框架,用于并行计算,以及新的CountDownLatch和CyclicBarrier。 8. **动态语言支持**:JSR 292(invokedynamic指令)增强了对动态语言的支持。 在这个压缩包中,我们有两个文件...

    Java第一项目

    8. 异步编程:虽然Java基础语法不直接支持异步编程,但可以通过多线程(Thread类)或Java 8以后的并发库(如Fork/Join框架、CompletableFuture)实现。 9. 文件I/O:Java提供了java.io包来处理文件读写,包括File、...

    CS62sophomoricParallelismAndConcurrency

    本文档是关于Java中并行化与并发性的入门教材,主要面向大学二年级的学生。教材主要介绍共享内存并行化和并发性的基本概念,并且讨论了与Java相关的特定技术,例如线程和共享内存的使用。教材内容详细,从并行化和...

    MyBatis_3.4.6中文参考

    贡献者可以通过 Fork 仓库、修改文档并提交 Pull Request 的方式参与文档的完善工作。这不仅可以提高文档质量,还能增加社区的活跃度。 #### 三、MyBatis 入门指南 ##### 3.1 安装 - **Jar 包安装**:只需将 `...

    jbpm 工作流开发指南

    JBPM,全称Java Business Process Management,是一款基于Java的企业级工作流管理系统,由JBoss组织开发并开源。它提供了一套完整的解决方案,用于设计、执行和管理业务流程,使得开发者能够轻松地实现业务逻辑的...

    JBPM工作流开发指南v10-20070706.doc

    JBPM(Java Business Process Management)是一个开源的工作流管理系统,它允许开发者设计、执行、监控和优化业务流程。以下是对文档内容的详细解释: 一、概述 这部分通常会介绍JBPM的基本概念、特点以及其在业务...

    JBPM开发指南(中文)

    JBPM还提供了丰富的API和工具集,如Grahpviz可视化工具,用于设计和调试流程,以及JConsole等监控工具,以便实时查看流程实例的状态和性能。通过这些工具,开发者可以轻松地集成JBPM到现有的Java应用服务器环境中,...

    JBPM 开发指南

    - **fork/join**: 分支与合并操作。 - **decision**: 用于做出决策的节点。 - **transition**: 连接不同节点之间的路径。 - **event**: 代表流程中的事件。 - **action/script**: 可以执行的动作或脚本。 - **...

    JBPM工作流开发指南

    与`fork`相反,`join`用于合并多条分支路径,确保所有分支完成后流程继续前进。 #### 3.1.9 `decision`(决策) `decision`用于根据特定条件决定流程的走向,类似于编程中的if-else语句。 #### 3.1.10 `...

    深入浅出jBPM完整版part1

    流程节点详解.....................................67 3.1 公共属性.............Fork和Join节点.........................................81 3.7 Decision节点.............................................86 3.8 ...

    深入浅出jBPM完整版part2

    流程节点详解.....................................67 3.1 公共属性.............Fork和Join节点.........................................81 3.7 Decision节点.............................................86 3.8 ...

Global site tag (gtag.js) - Google Analytics