`

Java多线程高并发高级篇ForkJoin框架篇(二)---ForkJoinPool中任务分拆Fork解密

 
阅读更多

我们接上篇,继续说下ForkJoinPool中任务分拆方法fork。

一、fork()方法解密

为了更简明的说明整个调用过程,我们把计算范围再次缩小为:【1,500】。

 

package com.sitech.threadPool;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class ForkJoinPoolDemo {
	private static final Integer THRESHOLD = 200;
    static class CountTask extends RecursiveTask<Integer> {
		private static final long serialVersionUID = 1L;
		//用start和end分别表示子任务计算开始和结束的值
        private int start;
        private int end;
 
        public CountTask(int start , int end) {
            this.start = start;
            this.end = end;
        }
 
        @Override
        protected Integer compute() {
        	//为了防止子任务划分层次太深而造成系统资源耗尽,需要加一个阈值
            if(end - start < THRESHOLD) {
                System.out.println("子任务区间达到阈值,开始计算的区间:[" + start + "," + end + "]");
                Integer sum = 0;
                for(int i = start ; i <= end  ; i++) {
                    sum += i;
                }
                return sum;
            }else {
            	//否则再进行任务拆分,拆分成两个任务
            	CountTask left = new CountTask(start, (start + end) / 2);
            	System.out.println("左半部分子任务,开始计算的区间:[" + start + "," + (start + end) / 2 + "]");
                left.fork();
                CountTask right = new CountTask((start + end) / 2 + 1 , end);
                System.out.println("右半部分子任务,开始计算的区间:[" + ((start + end) / 2 + 1) + "," + end + "]");
                right.fork();
                //左右半任务结果合并
                return left.join() + right.join();
            }
        }
    }
 
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinTask<Integer> taskFuture =  pool.submit(new CountTask(1,500));
        try {
            Integer result = taskFuture.get();
            System.out.println("result = " + result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

 当我们执行上面方法方法时,初始,main线程提交任务,将任务【1,500】放到任务队列中submissionQueue,任务提交后,因为线程池中没有线程,因此创建一个工作线程--名字叫Thread[ForkJoinPool-1-worker-1,5,main](这一部分请看上一篇帖子,这里不再赘述),放到线程池的works数组中,然后main线程等待在Integer result = taskFuture.get()---获取结果处。见下图


 

 

在工作线程Thread[ForkJoinPool-1-worker-1,5,main]执行t.start后,工作线程开始处理任务,执行方法run()。

 

public void run() {
        Throwable exception = null;
        try {
            onStart();
            pool.work(this);
        } catch (Throwable ex) {
            exception = ex;
        } finally {
            onTermination(exception);
        }
    }

在run方法中执行了onStart方法和 pool.work(this);我们先看onStart()方法。在onStart方法中,初始化了工作线程的可以存放任务的队列大小(这就说明了我们前面说的,每一个工作线程都有一个任务队列),默认容量大小是8192。

 

 

protected void onStart() {
        queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
        int r = pool.workerSeedGenerator.nextInt();
        seed = (r == 0) ? 1 : r; //  must be nonzero
    }

 

 

  再看另外一个方法pool.work(this);,线程池使用该工作线程开始处理任务。关键方法是:scan(w, a),其中a是经过计算的偏移量,表示处理队列中的第几个任务,w是当前的工作线程。

 

final void work(ForkJoinWorkerThread w) {
        boolean swept = false;                // true on empty scans
        long c;
        while (!w.terminate && (int)(c = ctl) >= 0) {
            int a;                            // active count
            if (!swept && (a = (int)(c >> AC_SHIFT)) <= 0)
                swept = scan(w, a);
            else if (tryAwaitWork(w, c))
                swept = false;
        }
    }

 

 

在scan(w, a)方法中,我们看下源码,它涉及到的关键方法是w.execTask(t);其中t是从任务队列中取到的任务,w还是当前工作线程。

 

private boolean scan(ForkJoinWorkerThread w, int a) {
        int g = scanGuard; // mask 0 avoids useless scans if only one active
        int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;
        ForkJoinWorkerThread[] ws = workers;
        if (ws == null || ws.length <= m)         // staleness check
            return false;
        for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) {
            ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
            ForkJoinWorkerThread v = ws[k & m];
            if (v != null && (b = v.queueBase) != v.queueTop &&
                (q = v.queue) != null && (i = (q.length - 1) & b) >= 0) {
                long u = (i << ASHIFT) + ABASE;
                if ((t = q[i]) != null && v.queueBase == b &&
                    UNSAFE.compareAndSwapObject(q, u, t, null)) {
                    int d = (v.queueBase = b + 1) - v.queueTop;
                    v.stealHint = w.poolIndex;
                    if (d != 0)
                        signalWork();             // propagate if nonempty
                    w.execTask(t);
                }
                r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5);
                return false;                     // store next seed
            }
            else if (j < 0) {                     // xorshift
                r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5;
            }
            else
                ++k;
        }
        if (scanGuard != g)                       // staleness check
            return false;
        else {                                    // try to take submission
            ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
            if ((b = queueBase) != queueTop &&
                (q = submissionQueue) != null &&
                (i = (q.length - 1) & b) >= 0) {
                long u = (i << ASHIFT) + ABASE;
                if ((t = q[i]) != null && queueBase == b &&
                    UNSAFE.compareAndSwapObject(q, u, t, null)) {
                    queueBase = b + 1;
                    w.execTask(t);
                }
                return false;
            }
            return true;                         // all queues empty
        }
    }

 

 

我们看下w.execTask(t);在这个方法中,只要任务不空,一直循环调用处理,调用的关键方法是t.doExec();其中t还是从任务队列中取出的方法,doExec()方法是在ForkJoinTask类中。

 

/**
     * Runs the given task, plus any local tasks until queue is empty
     */
    final void execTask(ForkJoinTask<?> t) {
        currentSteal = t;
        for (;;) {
            if (t != null)
                t.doExec();
            if (queueTop == queueBase)
                break;
            t = locallyFifo ? locallyDeqTask() : popTask();
        }
        ++stealCount;
        currentSteal = null;
    }

 

 

在doExec()方法中,关键源码是completed = exec();,如下:

 

/**
     * Primary execution method for stolen tasks. Unless done, calls
     * exec and records status if completed, but doesn't wait for
     * completion otherwise.
     */
    final void doExec() {
        if (status >= 0) {
            boolean completed;
            try {
                completed = exec();
            } catch (Throwable rex) {
                setExceptionalCompletion(rex);
                return;
            }
            if (completed)
                setCompletion(NORMAL); // must be outside try block
        }
    }

 

在exec()方法中,调用了computer()方法,这里调用的是CountTask 中的computer方法(因为我们在CountTask extends RecursiveTask<Integer>中重写了computer方法)。

 

/**
     * Implements execution conventions for RecursiveTask.
     */
    protected final boolean exec() {
        result = compute();
        return true;
    }

上面的步骤是从任务提交到去计算执行的所有源码部分。很简单明了。

 

在我们写的computer方法中,我们设置阈值200,也就是在end-start>200之前,我们都需要将任务分拆。


 所以,Thread[ForkJoinPool-1-worker-1,5,main]第一次执行到computer方法时,新创建了一个left任务,任务计算区间为【1,250】,然后调用left.fork()方法。我们看下fork方法的源码。

 

public final ForkJoinTask<V> fork() {
        ((ForkJoinWorkerThread) Thread.currentThread())
            .pushTask(this);
        return this;
    }

 在fork方法中,将当前任务(this,任务区间是【1,250】)执行了pushTask方法。我们继续看下pushTask方法的源码,源码中所有的步骤就是一个:将【1,250】任务放到工作线程的任务队列queue中,并执行pool.signalWork();方法唤醒或者新建一个线程去处理该任务。

 

 

 

final void pushTask(ForkJoinTask<?> t) {
        ForkJoinTask<?>[] q; int s, m;
        if ((q = queue) != null) {    // ignore if queue removed
            long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
            UNSAFE.putOrderedObject(q, u, t);
            queueTop = s + 1;         // or use putOrderedInt
            if ((s -= queueBase) <= 2)
                pool.signalWork();
            else if (s == m)
                growQueue();
        }
    }


 在pool.signalWork();方法中,如果存在等待执行work线程,则UNSAFE.unpark(w);(release a waiting worker)----释放一个等待中的工作线程,唤醒一个;如果没有空闲线程,依然调用的是addWorker();方法。它的作用我们上面说过,就是创建新的工作线程。这里新创建的工作线程是Thread[ForkJoinPool-1-worker-2,5,main],并且在pool中执行了int k = pool.registerWorker(this);注册步骤。


因为【1,250】任务区间仍然没有达到阈值,因此,任务进一步拆分。【1,250】拆分成【1,125】,【125,250】。这里要新建两个工作线程,Thread[ForkJoinPool-1-worker-3,5,main],Thread[ForkJoinPool-1-worker-4,5,main]。这时,在Thread[ForkJoinPool-1-worker-2,5,main]的任务队列queue中存在两个计算任务:【1,125】,【126,250】,见下图。


 

这时线程Thread[ForkJoinPool-1-worker-3,5,main]处理的任务【1,125】经过execTask等一系列方法后,调用computer方法,此时达到阈值,调用for循环开始计算结果,最终得到结果7875,并且设置。同理Thread[ForkJoinPool-1-worker-4,5,main]线程处理的任务【126,250】经过计算得到最后的结果23500。


 

从以上过程我们可以看出,fork方法的核心功能就是任务拆分,直到其子任务的计算区间达到阈值范围。

 

 

 

  • 大小: 56.6 KB
  • 大小: 36.9 KB
  • 大小: 39.6 KB
  • 大小: 49.5 KB
  • 大小: 39.8 KB
  • 大小: 80.4 KB
  • 大小: 78.3 KB
分享到:
评论

相关推荐

    Fork-Join框架演示

    Fork-Join框架是Java 7中`java.util.concurrent`包的一部分,该包提供了高级别的并发工具,旨在简化并行编程。Fork-Join框架尤其专注于细粒度并行任务的执行,允许开发者通过简单的接口实现复杂算法的并行化。 ####...

    java Fork Join框架及使用

    Fork/Join框架是Java7引入的一种用于并行任务执行的框架,它允许将复杂任务拆分成多个子任务,并行执行,然后通过join操作将结果聚合。Fork/Join框架特别适合处理可以递归拆分的计算密集型任务,比如大数据集的搜索...

    Java并发Fork-Join框架原理

    Java并发Fork-Join框架原理是Java7中提供的一种并行执行任务的框架,旨在提高程序的执行效率和性能。该框架的核心思想是将大任务分割成若干个小任务,并将其分配给不同的线程执行,以充分利用多核CPU的计算能力。 ...

    ForkJoin并发框架入门示例

    ForkJoin并发框架是Java 7引入的一种高效并行计算框架,它基于分而治之(Divide and Conquer)的策略,适用于处理大量可分割的任务。这个框架的核心类是`ForkJoinPool`和`ForkJoinTask`,它们为开发者提供了创建和...

    eclipse-collections-forkjoin-7.1.2-API文档-中文版.zip

    赠送jar包:eclipse-collections-forkjoin-7.1.2.jar; 赠送原API文档:eclipse-collections-forkjoin-7.1.2-javadoc.jar; 赠送源代码:eclipse-collections-forkjoin-7.1.2-sources.jar; 赠送Maven依赖信息文件:...

    35 拆分你的任务—学习使用Fork-Join框架.pdf

    在Java并发编程中,Fork/Join框架是一个强大的工具,尤其在处理大量数据时能显著提升性能。这个框架从Java 7开始引入,是ExecutorService的一个实现,它基于分而治之的策略,将大任务分解成多个小任务,然后并行地...

    Java多线程ForkJoinPool实例详解

    Java多线程ForkJoinPool实例详解 Java多线程编程中的ForkJoinPool实例详解是Java 7中...ForkJoinPool是Java多线程编程中的一个高效的并发框架,可以高效地执行任务,并且提供了丰富的API来管理和监控任务的执行过程。

    探索Java并发:Future与ForkJoin框架深度解析

    ForkJoin框架 是Java 7中引入的,旨在进一步提高并发程序的性能。它使用了一种称为“工作窃取”的算法,允许线程动态地重分配任务。ForkJoin的核心思想是将大任务分解为更小的任务,然后并行处理这些任务,最后合并...

    67-ForkJoin框架学习笔记1

    ForkJoin框架是Java并发编程中的一个重要工具,它基于分治策略,旨在高效处理大量数据。框架的核心思想是将一个大型任务分解成多个小型任务,然后通过并行执行这些子任务来提高处理效率。ForkJoin框架在Hadoop ...

    Java中的Fork,Join框架深度解析

    Java的Fork/Join框架是一种用于并行计算的框架,它基于分治法的原理,将大任务分解成小任务并行执行,最后再将结果合并。这种框架特别适合于可以分解为多个子任务且子任务可以并行处理的场景。本文将详细介绍Fork/...

    Java并发Fork and join

    Fork/Join框架是Java并发库中的一部分,自Java 7开始引入,它为开发者提供了一种高效的处理大规模计算任务的方法。这个框架基于分治策略,将大任务分解成若干小任务,然后并行执行这些小任务,最后再将结果合并。...

    基于JDK的ForkJoin构建一个简单易用的并发组件1

    在Java编程中,线程池和并发任务执行是常见的方式,但在某些场景下,我们可以利用JDK的ForkJoin框架来构建更加高效和易用的并发组件。ForkJoin框架自Java 7引入,它为处理大型任务提供了一种分解成多个子任务并行...

    Fork Join框架机制详解.docx

    在Java中,Fork/Join框架主要由`ForkJoinPool`线程池和`ForkJoinTask`任务类组成。 1. `ForkJoinPool`:这是Fork/Join框架的工作线程池。它维护着一组工作线程,用于执行`ForkJoinTask`。线程池中的每个工作线程都...

    java多线程与并发1

    Java多线程与并发是Java开发中的重要领域,尤其在现代高性能应用中,对多核处理器的充分利用和高效系统设计离不开并发技术。本主题主要基于《Java多线程编程核心技术》和《Java+7并发编程实战手册》两本书籍的核心...

    Java ForkJoin框架的原理及用法

    第二步:使用ForkJoinPool进行执行,task要通过ForkJoinPool来执行,分割的子任务也会添加到当前工作线程的双端队列中,进入队列的头部。当一个工作线程中没有任务时,会从其他工作线程的队列尾部获取一个任务(工作...

    fork/join 实例

    Fork/Join框架是Java并发处理的一个重要工具,它基于工作窃取算法,设计用于高效地执行并行计算任务。这个框架是Java 7引入的,位于`java.util.concurrent.fork/join`包中,目的是简化多核处理器环境下大规模数据...

    java fork-join框架介绍

    fork/join框架是ExecutorService接口的一个实现,可以帮助开发人员充分利用多核处理器的优势,编写出并行执行的程序,提高应用程序的性能;设计的目的是为了处理那些可以被递归拆分的任务。

    Go-Golang版的fork-join

    Go-Golang版的Fork-Join框架是一种在Golang中实现的并发编程模型,灵感来源于Java的ForkJoin框架。这个框架的核心理念是通过将大任务分解为小任务,然后并行执行这些小任务,最后合并结果,以提高计算效率。在Golang...

    深入浅出Java多线程.pdf

    - **ForkJoinPool 类**:提供了 Fork/Join 的执行服务,可以自动调整线程数量。 - **ForkJoinTask 类**:Fork/Join 框架中任务的基本类型,支持异步执行和结果获取。 **19. Java 8 Stream 并行计算原理** - **...

    汪文君高并发编程实战视频资源下载.txt

    │ 高并发编程第二阶段39讲、多线程Active Objects设计模式(接受异步消息的主动对象)-中.mp4 │ 高并发编程第二阶段40讲、多线程Active Objects设计模式(接受异步消息的主动对象)-下.mp4 │ 高并发编程第二阶段41...

Global site tag (gtag.js) - Google Analytics