`

Java多线程高并发高级篇ForkJoin框架篇(一)---ForkJoinPool解密

 
阅读更多

在JDK1.7中,JDK的并发包下新增了一个框架:ForkJoin框架(它只是一种思想,别一听框架就又懵了)。Fork在英文中是叉子的意思,Join是最后归集的意思,我们可以想象一下,你用叉子(ForkJoin框架)去拿一块牛排,分叉的部分合力将牛排叉住(共同处理任务),把儿的部分(Join)可以让我们用它来拿起牛排放到嘴里(最后的结果)。。。。

 

上面描述了一个过程,我们把这个过程叫做:分而治之。见下图



 

接触过大数据的童鞋应该都知道,分而治之方法是一个非常有效的处理大量数据的方法。大数据中MapReduce就是采用了这种方法,将任务分拆,使用多个节点去处理部分任务,然后再用归集节点处理最后的结果。

所有好的思想都是通用的,它们来源于生活,抽象于生活(都是聪明人,想着怎么高效怎么来)。

 

JDK1.7中提供了一种新的线程池:ForkJoinPool。我们使用前面ThreadPoolExecutor的分析方法,来分析下这个线程池。

一、ForkJoinPool的核心构造函数



 

ForkJoinPool核心构造函数,无一例外的还是最后一个最长的。我们看下它们源码。

public ForkJoinPool() {
        this(Runtime.getRuntime().availableProcessors(),
             defaultForkJoinWorkerThreadFactory, null, false);
    }

public ForkJoinPool(int parallelism) {
        this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
    }
/**
     * Creates a {@code ForkJoinPool} with the given parameters.
     *
     * @param parallelism the parallelism level. For default value,
     * use {@link java.lang.Runtime#availableProcessors}.
     * @param factory the factory for creating new threads. For default value,
     * use {@link #defaultForkJoinWorkerThreadFactory}.
     * @param handler the handler for internal worker threads that
     * terminate due to unrecoverable errors encountered while executing
     * tasks. For default value, use {@code null}.
     * @param asyncMode if true,
     * establishes local first-in-first-out scheduling mode for forked
     * tasks that are never joined. This mode may be more appropriate
     * than default locally stack-based mode in applications in which
     * worker threads only process event-style asynchronous tasks.
     * For default value, use {@code false}.
     * @throws IllegalArgumentException if parallelism less than or
     *         equal to zero, or greater than implementation limit
     * @throws NullPointerException if the factory is null
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        Thread.UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        checkPermission();
        if (factory == null)
            throw new NullPointerException();
        if (parallelism <= 0 || parallelism > MAX_ID)
            throw new IllegalArgumentException();
        this.parallelism = parallelism;
        this.factory = factory;
        this.ueh = handler;
        //设置每个处理线程内任务队列的处理方式,FIFO or LIFO
        this.locallyFifo = asyncMode;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
        //子任务的队列初始大小为INITIAL_QUEUE_CAPACITY,容量为8
        this.submissionQueue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
        // initialize workers array with room for 2*parallelism if possible
        int n = parallelism << 1;
        if (n >= MAX_ID)
            n = MAX_ID;
        else { // See Hackers Delight, sec 3.2, where n < (1 << 16)
            n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8;
        }
        //设置线程池中工作线程初始大小默认为16个,n经过运算后为15
        workers = new ForkJoinWorkerThread[n + 1];
        this.submissionLock = new ReentrantLock();
        this.termination = submissionLock.newCondition();
        StringBuilder sb = new StringBuilder("ForkJoinPool-");
        sb.append(poolNumberGenerator.incrementAndGet());
        sb.append("-worker-");
        this.workerNamePrefix = sb.toString();
    }

 重要参数解释(我们还是结合英文注释来看):

①parallelism:并行度( the parallelism level),默认情况下跟我们机器的cpu个数保持一致,使用 Runtime.getRuntime().availableProcessors()可以得到我们机器运行时可用的CPU个数(For default value,use {@link java.lang.Runtime#availableProcessors})

②factory:创建新线程的工厂( the factory for creating new threads)。默认情况下使用ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory( For default value,use {@link #defaultForkJoinWorkerThreadFactory})

 ③handler:线程异常情况下的处理器(Thread.UncaughtExceptionHandler handler),该处理器在线程执行任务时由于某些无法预料到的错误而导致任务线程中断时进行一些处理,默认情况为null。( the handler for internal worker threads that terminate due to unrecoverable errors encountered while executing tasks. For default value, use {@code null}.)

④asyncMode:这个参数要注意,在ForkJoinPool中,每一个工作线程都有一个独立的任务队列,asyncMode表示工作线程内的任务队列是采用何种方式进行调度,可以是先进先出FIFO,也可以是后进先出LIFO。如果为true,则线程池中的工作线程则使用先进先出方式进行任务调度,默认情况下是false。( if true,establishes local first-in-first-out scheduling mode for forked tasks that are never joined. This mode may be more appropriate than default locally stack-based mode in applications in which worker threads only process event-style asynchronous tasks. For default value, use {@code false}.)

 

二、核心源码解析

在ForkJoinPool中,我们常会使用到3个方法:submit()方法负责大任务提交,fork()负责将任务分拆,join()负责将任务执行结果归集。

以上三个方法都涉及到了任务,在ForkJoinPool中,任务的类型可以是普遍使用的ForkJoinTask,也可以是RecursiveAction和RecursiveTask<V>这两个ForkJoinTask的子类去执行特定类型(Recursive,递归调用)的任务。RecursiveAction表示没有返回值的任务,RecursiveTask<V>表示携带返回值result的任务(The result of the computation.)。


 我们以一个示例开始深入理解ForkJoinPool的工作原理。

示例:计算1-5000的求和。

package com.sitech.threadPool;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class ForkJoinPoolDemo {
	private static final Integer THRESHOLD = 200;
    static class CountTask extends RecursiveTask<Integer> {
		private static final long serialVersionUID = 1L;
		//用start和end分别表示子任务计算开始和结束的值
        private int start;
        private int end;
 
        public CountTask(int start , int end) {
            this.start = start;
            this.end = end;
        }
 
        @Override
        protected Integer compute() {
        	//为了防止子任务划分层次太深而造成系统资源耗尽,需要加一个阈值
            if(end - start < THRESHOLD) {
                System.out.println("子任务区间达到阈值,开始计算的区间:[" + start + "," + end + "]");
                Integer sum = 0;
                for(int i = start ; i <= end  ; i++) {
                    sum += i;
                }
                return sum;
            }else {
            	//否则再进行任务拆分,拆分成两个任务
            	CountTask left = new CountTask(start, (start + end) / 2);
            	System.out.println("左半部分子任务,开始计算的区间:[" + start + "," + (start + end) / 2 + "]");
                left.fork();
                CountTask right = new CountTask((start + end) / 2 + 1 , end);
                System.out.println("右半部分子任务,开始计算的区间:[" + ((start + end) / 2 + 1) + "," + end + "]");
                right.fork();
                //左右半任务结果合并
                return left.join() + right.join();
            }
        }
    }
 
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinTask<Integer> taskFuture =  pool.submit(new CountTask(1,5000));
        try {
            Integer result = taskFuture.get();
            System.out.println("result = " + result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

 上述示例是将一个大的计算任务【1,5000】分拆成两个子任务:左半部分子任务【1,2500】和右半部分子任务【2501,5000】。因为在计算时候没有达到阈值,所以继续分拆:

①左半部分子任务【1,2500】继续拆分成【1,1250】和【1251,2500】

②右半部分子任务【2501,5000】继续拆分成【2501,3750】和【3751,5000】

............

一直持续拆分,直到拆分的子任务区间end-start在阈值(200)范围之内,开始使用循环进行计算,得到结果,然后两个左半部分的子任务结果不断join(最后得到【1,2500】的计算结果),右半部分的子任务结果不断join,得到【2501,5000】的计算结果,然后两部分再做一次join,就得到了【1,5000】的计算结果。

 

这里面涉及到4个重要方法,我们分别看下源码:

1、ForkJoinPool中的submit方法,源码如下:

 在submit方法中,我们提交了大任务task,计算区间【1,5000】

public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
        if (task == null)
            throw new NullPointerException();
        forkOrSubmit(task);
        return task;
    }

 在submit方法中,调用了forkOrSubmit方法,我们看下执行逻辑。这里有一个判断,首先判断当前线程是不是ForkJoinWorkerThread的实例,我们当前提交大任务的线程是main线程Thread[main,5,main],见程序执行图。所以判断条件不成立,走方法addSubmission(task);

 

 

private <T> void forkOrSubmit(ForkJoinTask<T> task) {
        ForkJoinWorkerThread w;
        Thread t = Thread.currentThread();
        if (shutdown)
            throw new RejectedExecutionException();
        if ((t instanceof ForkJoinWorkerThread) &&
            (w = (ForkJoinWorkerThread)t).pool == this)
            w.pushTask(task);
        else
            addSubmission(task);
    }

 

 forkOrSubmit程序执行图如下:

 

在addSubmission方法中,进行子任务添加,在判断条件if ((q = submissionQueue) != null)中,因为submissionQueue在初始化ForkJoinPool时,在构造函数中就已经赋值( this.submissionQueue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];),所以肯定成立。这里要弄明白UNSAFE.putOrderedObject(q, u, t)方法,它的作用是按照顺序将t值(我们提交的任务)按照偏移量u放在q中(submissionQueue).初始情况下queueBase和queueTop都在0位置.这里任务插入,queueTop需要移位+1,表示现在submissionQueue中有queueTop-queueBase个任务(当queueTop到达队列最后一个位置时,这里就要进行扩容,也就是调用方法growSubmissionQueue(),源码可以大家自己看).当任务进入submissionQueue后,就需要唤醒空闲线程进行处理(如果没有就需要新建,这里的线程就是ForkJoinPool中的工作线程:ForkJoinWorkerThread).

 

 

private void addSubmission(ForkJoinTask<?> t) {
        final ReentrantLock lock = this.submissionLock;
        lock.lock();
        try {
            ForkJoinTask<?>[] q; int s, m;
            if ((q = submissionQueue) != null) {    // ignore if queue removed
                long u = (((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE;
                UNSAFE.putOrderedObject(q, u, t);
                queueTop = s + 1;
                if (s - queueBase == m)
                    growSubmissionQueue();
            }
        } finally {
            lock.unlock();
        }
        signalWork();
    }
 我们看下signalWork的源码实现:signalWork的作用就是注释中描述的---- The while condition is true if: (there is are too few total workers OR there is at least one waiter) AND (there are too few active workers OR the pool is terminating). The value of e distinguishes the remaining cases: zero (no waiters)for create, negative if terminating (in which case donothing), else release a waiter.如果条件满足其中之一,工作线程的总数很少或者有至少有一个工作线程等待唤醒(当然前提是线程池没有中断),那么创建一个工作线程或者唤醒一个工作线程(Wakes up or creates a worker.).
    /*     
     * Wakes up or creates a worker.
     */
    final void signalWork() {
        /*
         * The while condition is true if: (there is are too few total
         * workers OR there is at least one waiter) AND (there are too
         * few active workers OR the pool is terminating).  The value
         * of e distinguishes the remaining cases: zero (no waiters)
         * for create, negative if terminating (in which case do
         * nothing), else release a waiter. The secondary checks for
         * release (non-null array etc) can fail if the pool begins
         * terminating after the test, and don't impose any added cost
         * because JVMs must perform null and bounds checks anyway.
         */
        long c; int e, u;
        while ((((e = (int)(c = ctl)) | (u = (int)(c >>> 32))) &
                (INT_SIGN|SHORT_SIGN)) == (INT_SIGN|SHORT_SIGN) && e >= 0) {
            if (e > 0) {                         // release a waiting worker
                int i; ForkJoinWorkerThread w; ForkJoinWorkerThread[] ws;
                if ((ws = workers) == null ||
                    (i = ~e & SMASK) >= ws.length ||
                    (w = ws[i]) == null)
                    break;
                long nc = (((long)(w.nextWait & E_MASK)) |
                           ((long)(u + UAC_UNIT) << 32));
                if (w.eventCount == e &&
                    UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
                    w.eventCount = (e + EC_UNIT) & E_MASK;
                    if (w.parked)
                        UNSAFE.unpark(w);
                    break;
                }
            }
            else if (UNSAFE.compareAndSwapLong
                     (this, ctlOffset, c,
                      (long)(((u + UTC_UNIT) & UTC_MASK) |
                             ((u + UAC_UNIT) & UAC_MASK)) << 32)) {
                addWorker();
                break;
            }
        }
    }
 当然,我们现在还没有工作线程,因此需要创建一个,所以执行步骤走到了addWorker()方法。我们看下addWorker方法的源码,它的作用是在当前线程池对象上(this指java.util.concurrent.ForkJoinPool@1aba308[Running, parallelism = 4, size = 1, active = 1, running = 1, steals = 0, tasks = 0, submissions = 1])创建一个工作线程(注意:ForkJoinWorkerThread继承了Thread),表明这个工作线程工作在这个线程池中(the pool this thread works in)。
 
private void addWorker() {
        Throwable ex = null;
        ForkJoinWorkerThread t = null;
        try {
            t = factory.newThread(this);
        } catch (Throwable e) {
            ex = e;
        }
        if (t == null) {  // null or exceptional factory return
            long c;       // adjust counts
            do {} while (!UNSAFE.compareAndSwapLong
                         (this, ctlOffset, c = ctl,
                          (((c - AC_UNIT) & AC_MASK) |
                           ((c - TC_UNIT) & TC_MASK) |
                           (c & ~(AC_MASK|TC_MASK)))));
            // Propagate exception if originating from an external caller
            if (!tryTerminate(false) && ex != null &&
                !(Thread.currentThread() instanceof ForkJoinWorkerThread))
                UNSAFE.throwException(ex);
        }
        else
            t.start();
    }
 创建工作线程执行的关键方法源码如下:在该方法中,需要把创建好的工作线程进行注册登记,执行方法int k = pool.registerWorker(this)(给线程登记注册,放在works工作线程数组中,并得到下一个工作线程相关内容,注:统一的默认前缀为ForkJoinPool-x-worker-,x为第几个)这里要注意,创建的线程为守护线程,见程序图:
 /**
     * Creates a ForkJoinWorkerThread operating in the given pool.
     *
     * @param pool the pool this thread works in
     * @throws NullPointerException if pool is null
     */
    protected ForkJoinWorkerThread(ForkJoinPool pool) {
        super(pool.nextWorkerName());
        this.pool = pool;
        int k = pool.registerWorker(this);
        poolIndex = k;
        eventCount = ~k & SMASK; // clear wait count
        locallyFifo = pool.locallyFifo;
        Thread.UncaughtExceptionHandler ueh = pool.ueh;
        if (ueh != null)
            setUncaughtExceptionHandler(ueh);
        setDaemon(true);
    }
 程序图:

 创建完线程后,t不为空,那么addWorker()方法执行后,新创建的工作线程就启动了(t.start()).

 2、在工作线程启动后,大任务【1,5000】计算就开始执行computer()方法了。在computer()方法中,我们做了任务分拆,这里就要说到fork()方法了。
  • 大小: 4.4 KB
  • 大小: 14.4 KB
  • 大小: 56.6 KB
  • 大小: 42.5 KB
  • 大小: 53.2 KB
  • 大小: 61.7 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics