`
songzi0206
  • 浏览: 159232 次
  • 性别: Icon_minigender_1
  • 来自: 上海
博客专栏
Group-logo
All are from ...
浏览量:33877
Group-logo
Programming w...
浏览量:19750
社区版块
存档分类
最新评论

AbstractExecutorService任务提交<三>

阅读更多

    最后来看两个invokeAny方法,这个方法和invokeAll的区别在于,invokeAll会阻塞直到所有任务执行完(完成 or 取消 or异常)才会返回(返回的是所有任务的结果),而invokeAny只需要任何一个方法执行完即返回(返回的时候最先执行完的那个任务的结果)。查看代码发现这两个invokeAny方法都是直接调用doInvokeAny方法实现的 

 

public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            assert false;
            return null;
        }
    }

    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    }

 

        一个有超时限制,一个没有。所以主要代码还得看doInvokeAny方法,首先看签名: 

 

private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                            boolean timed, long nanos)

 

       第一个参数tasks就是一个Callable任务集,第二个参数表示是否有超时限制,第三个参数表示若有超时限制,则时间限制在nanos纳秒内。

再看具体实现部分: 

 

if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);

        // For efficiency, especially in executors with limited
        // parallelism, check to see if previously submitted tasks are
        // done before submitting more of them. This interleaving
        // plus the exception mechanics account for messiness of main
        // loop.

        try {
            // Record exceptions so that if we fail to obtain any
            // result, we can throw the last exception we got.
            ExecutionException ee = null;
            long lastTime = (timed)? System.nanoTime() : 0;
            Iterator<? extends Callable<T>> it = tasks.iterator();

            // Start one task for sure; the rest incrementally
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;

            for (;;) {
                Future<T> f = ecs.poll();
                if (f == null) {
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    else if (active == 0)
                        break;
                    else if (timed) {
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        long now = System.nanoTime();
                        nanos -= now - lastTime;
                        lastTime = now;
                    }
                    else
                        f = ecs.take();
                }
                if (f != null) {
                    --active;
                    try {
                        return f.get();
                    } catch (InterruptedException ie) {
                        throw ie;
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }

            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally {
            for (Future<T> f : futures)
                f.cancel(true);
        }

 

    直接看主体( try{…}finally{…} )部分, ntasks表示当前还有几个任务没有提交,active表示当前已经提交了的任务数。在try中先提交一个,紧接着是一个无穷循环for(;;),一切就在这个无穷循环中了,分析这个无穷循环,可分两部分:

 

<一> 提交并执行任务,if (f == null)部分:

 

     1) 首先是提交任务,如果还有任务没有提交(即ntasks > 0)则提交一个任务;

  2) 否则就检查是否有线程在执行(即active == 0),如果active==0成立跳出这个无穷循环,这是异常分支,最终会抛出后续代码throw ee;

    3) 若1) 2)两步都不成立了,就开始尝试获取结果:如果有时间限制即timed==true,则通过ecs.poll(nanos, TimeUnit.NANOSECONDS);获取,获取不到就抛TimeoutException;如果没有时间限制则直接ecs.take获取。

 

(Note:这里特别要注意,如果有任务没有提交的话,是不可能执行到2)和3)的,也就是说,一旦执行到3)说明所有任务已经都提交了,这就保证第三步获取的是所有任务中最先完成的那个任务的结果,是第三步可以阻塞获取的原因。)

 

<二> 获取任务,if (f != null)部分  

  进入循环先是提交一个任务(有任务可提交的前提下),紧接着就会检查f是否为null,若部位null,那么就说明已经有任务执行完了,恭喜,不用做额外的任务了。这里有个极端的情况,假如第一个任务(在for循环外第24行提交的)在第一次for循环第29行就获得了结果,那么第二个任务根本就不会提交。同样可以分析,如果在第3次进循环时候获得了结果,那么获得的结果就是前三个任务中最快的结果,后面的任务压根就不执行。

     至于获取结果就简单了,直接通过future.get()获取结果,这个结果可能是正常结果也可能是抛出一个异常。获得正常结果就return,否则在后续代码抛异常。测试例子比较简单就不贴出来了。

 

      最后在finally中取消所有任务。

 

      这里还是需要进一步分析,这边提交任务不再是当前的AbstractExecutorService对象,而是将当前的线程执行器对象封装到ExecutorCompletionService中:

 

ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this);

        通过这个ecs来提交任务和获取结果,所以再查看ExecutorCompletionService,特别是当前方法中用到的其submit(task), poll(),poll(nanos, TimeUnit.NANOSECONDS),take()方法。从构造方法看,它hold了一个线程执行器Executor就是AbstractExecutorService传过来的this对象,有一个任务完成的队列completionQueueLinkedBlockingQueue对象。仿佛已经很明朗了,不管是take()还是poll(),在获取完成的任务的时候,通过这个阻塞队列获取即可。查看代码果然如此:

 

public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    public Future<V> poll() {
        return completionQueue.poll();
    }

    public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

 

        那任务执行完是如何放到这个阻塞队列的呢?在细看,发现提交任务的时候,这个类中会把RunnableFuture对象进一步包装成QueueingFuture,而再看这个QueueingFuture继承自FutureTask,显然QueueingFuture作为一个FutureTask需要做额外的事情了,还记得上一篇“AbstractExecutorService之异步任务RunnableFuture ”末尾提到:

 

当然你想若是想在cancel时做一些自己的事情,可以通过重写FutureTaskdone()方法,因为在cancel返回之前会调用该方法,这个方法默认不做任何事

 

      这里补充一下,其实除了cancel之外,在执行出现异常进行设置innerSetException、或执行正常结束设置结果值innerSet时,也都会调用done()方法。所以想在任务执行结束后做一点自己的事情,都可以通过重写这个done()方法。查看QueueingFuture代码,果然不出所料:

 

private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

 

      可以看到,当任务执行完(完成 or 取消 or异常)后调用上面的done方法会把当前任务加入到这个阻塞队列中。这下明朗了,呵呵!

再细看那三个获取第一个执行完任务的方法,completionQueue.poll()会去获取队头对象,若队列空就返回null,这个方法不会阻塞;completionQueue.poll(timeout, unit)会阻塞时间timeout,若这段时间还是一个空队列,则返回nullcompletionQueue.take()会一直阻塞直到队列中有东西可以返回为止。

重新回到AbstractExecutorService. doInvokeAny方法,再次看那个无限循环部分代码,先尝试获取一次结果,这次是非阻塞获取,没有结果就马上提交下一个任务,提交完就本次循环结束。直到所有任务都提交了,循环会进入到阻塞获取结果部分的代码:有时间限制就使用poll(timeout, unit)在允许的时间范围内阻塞,否则就用take()永久阻塞方式获取。 而阻塞的逻辑实现就交给了阻塞队列了。

      这部分也记录完,感觉有点乱,不知道写的是否明白。

分享到:
评论

相关推荐

    3_AbstractExecutorService源码阅读1

    `ExecutorService`接口是`java.util.concurrent`包的一部分,用于管理和控制线程的执行,提供了一种将任务提交到线程池并管理它们执行的方式。`AbstractExecutorService`为`ExecutorService`接口中的部分方法提供了...

    AbrastractExecutorService

    - **返回**:一个`List&lt;Future&lt;T&gt;&gt;`,其中包含所有已完成任务的结果。 - **invokeAny(Collection&lt;Callable&lt;T&gt;&gt; tasks)** - **描述**:执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果...

    线程池ThreadPoolExecutor

    `shutdown()`方法用于关闭线程池,停止接收新任务,但会继续执行已提交的任务。 `AbstractExecutorService`类是`ExecutorService`的一个抽象实现,它实现了大部分`ExecutorService`的方法。`ThreadPoolExecutor`是...

    线程池java

    new ArrayBlockingQueue&lt;&gt;(1000), // workQueue Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() // handler ); // 提交任务 for (int i = 0; i &lt; 100; i++) { final int index = ...

    Java使用ExecutorService来停止线程服务

    private final Set&lt;Runnable&gt; taskCancelledAtShutdown = Collections.synchronizedSet(new HashSet&lt;Runnable&gt;()); public TrackingExecutor(ExecutorService executorService) { this.executorService = ...

    我的线程池

    - **任务队列(BlockingQueue&lt;Runnable&gt;)**:存放待执行任务的队列,当线程池和核心线程数都达到上限后,新任务将被放入队列等待。 - **线程存活时间(keepAliveTime)**:非核心线程在空闲时的存活时间,超过这个...

    深入理解Java编程线程池的实现原理

    2. 任务提交:当有任务到来时,线程池会创建一个线程来执行任务,如果线程池中的线程数目达到核心池的大小,就会把到达的任务放到缓存队列中。 3. 线程复用:当一个线程执行完一个任务后,不会被销毁,而是可以继续...

    美团动态线程池实践思路开源项目(DynamicTp),线程池源码解析及通知告警篇.doc

    AbstractExecutorService 抽象类继承 ExecutorService 接口,对 ExecutorService 相关方法提供了默认实现,用 RunnableFuture 的实现类 FutureTask 包装 Runnable 任务,交给 execute() 方法执行,然后可以从该 ...

    java.util.concurrent.uml.pdf

    RejectedExecutionHandler是一个策略接口,当新的任务提交给线程池而线程池无法处理时,会通过这个接口的策略来处理拒绝任务的情况。 ThreadFactory是一个接口,它用来创建新线程。它提供了创建线程的方法,可以...

    徒手实现线程池-1

    ExecutorService 是 Java.util.concurrent 包中的一个接口,它是 Executor 接口的子接口,提供了更丰富的线程管理功能,如提交任务并获取结果、关闭线程池等。 2. **基础线程池**: - **ThreadPoolExecutor**:这...

    Java线程池实战PDF下载 .pdf

    - **ExecutorService接口**:提供了任务提交和管理的基本方法。 - **AbstractExecutorService抽象类**:实现了部分ExecutorService接口的方法,如shutdown等。 - **ThreadPoolExecutor**:具体的线程池实现,负责...

    线程池核心组件源码剖析.docx

    2. **ExecutorService接口**:继承自Executor接口,增加了管理和控制线程池的功能,如`submit()`用于提交任务,`shutdown()`用于关闭线程池,`shutdownNow()`用于尝试停止所有正在执行的任务等。 3. **...

    java线程池源码-java-source:Java源码学习多线程、线程池、集合

    1. 当有新任务提交时,如果当前线程数小于核心线程数,则创建新线程执行任务。 2. 如果线程数等于核心线程数,但工作队列未满,任务会被放入工作队列。 3. 如果线程数等于核心线程数,且工作队列已满,若线程数小于...

    Java并发之线程池Executor框架的深入理解

    创建固定大小的线程池newFixedThreadPool每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。...

    MyThreadPool

    MyThreadPool的实现通常会继承AbstractExecutorService,然后重写execute()方法来提交任务。同时,需要维护一个线程安全的工作队列和一个线程集合,用于存储工作线程。当线程池创建时,会根据用户配置初始化一定数量...

Global site tag (gtag.js) - Google Analytics