`
songzi0206
  • 浏览: 159332 次
  • 性别: Icon_minigender_1
  • 来自: 上海
博客专栏
Group-logo
All are from ...
浏览量:33895
Group-logo
Programming w...
浏览量:19766
社区版块
存档分类
最新评论

AbstractExecutorService任务提交<二>

阅读更多

        submit方法分析完毕,接着看两个invokeAll方法,先看第一个:

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks) {
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    try {
                        f.get();
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (Future<T> f : futures)
                    f.cancel(true);
        }
    }

 

 

       第一个for循环,依次把任务包装成RunnableFuture加入到返回结果集List<Future<T>>并提交执行,没有任何阻塞。但是第二个for循环中就会去获取任务执行的结果f.get(),这个方法是会阻塞直到任务执行完。所以invokeAll方法明显不同与submit方法,invokeAll会阻塞主线程。

   整段代码都在try中,并在finally中判断是否done,如果没有(这种情况一般只有当任务为null才会发生),则必须把所有任务都cancel掉,底层会中断所有线程,回收资源。

      关于invokeAll也可以简单测试下,还是使用“AbstractExecutorService任务提交<一>”中的CallableTask:

public static void invokeAllCallableTask()throws Exception{
		ExecutorService executor = Executors.newCachedThreadPool();
		List<CallabelTask> tasks = new ArrayList<CallabelTask>(5);
		for(int i = 0; i < 5; ){
			tasks.add(new CallabelTask("task_"+(++i)));
		}
		List<Future<String>> results = executor.invokeAll(tasks);
		System.out.println("All the tasks have been submited through invokeAll method!");
		executor.shutdownNow();
		for(Future<String> f : results)
			System.out.println(f.get());
	}

       这段代码会在executor.invokeAll(tasks)行阻塞当前线程,直到所有任务都执行完(完成 or 取消 or异常),才会继续往下执行println语句进行打印。这里可以调用executor.shutdownNow()立马关闭线程执行器而不会有问题,因为在执行这一句的时候所有任务肯定已经执行完了。而在“AbstractExecutorService任务提交<一>”submit方法测试中,若使用shutdownNow()会有异常,因为它会去cancel正在执行的任务,很容易直到这个异常便是线程的中断异常了,^_^

 

     关于第二个invokeAll重载方法,逻辑和第一个invokeAll方法是一样的,不同的是这个方法加了时间的限制:

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null || unit == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));

            long lastTime = System.nanoTime();

            // Interleave time checks and calls to execute in case
            // executor doesn't have any/much parallelism.
            Iterator<Future<T>> it = futures.iterator();
            while (it.hasNext()) {
                execute((Runnable)(it.next()));
                long now = System.nanoTime();
                nanos -= now - lastTime;
                lastTime = now;
                if (nanos <= 0)
                    return futures;
            }

            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    if (nanos <= 0)
                        return futures;
                    try {
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (Future<T> f : futures)
                    f.cancel(true);
        }
    }

       时间限制主要有两部分:

       第一个循环,是依此提交任务阶段,有时间限制。假如有10个任务要提交,那么循环提交的时每次都会检查是否超时,如果提交到第8个发现超时了,那么第9、10两个任务就不会在调用execute了,而是直接返回这批任务;      

       在第二个循环中,这时依此取执行结果阶段,每次也会判断是否超时,一旦发现超时就立马返回这批任务。

       当然如果超时返回,将无法执行后续语句done = true; 这样在finally中就会把所有任务都cancel掉。而客户端(主线程)就会收到CancellationException

 

 

分享到:
评论

相关推荐

    3_AbstractExecutorService源码阅读1

    `ExecutorService`接口是`java.util.concurrent`包的一部分,用于管理和控制线程的执行,提供了一种将任务提交到线程池并管理它们执行的方式。`AbstractExecutorService`为`ExecutorService`接口中的部分方法提供了...

    AbrastractExecutorService

    - **返回**:一个`List&lt;Future&lt;T&gt;&gt;`,其中包含所有已完成任务的结果。 - **invokeAny(Collection&lt;Callable&lt;T&gt;&gt; tasks)** - **描述**:执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果...

    线程池ThreadPoolExecutor

    `shutdown()`方法用于关闭线程池,停止接收新任务,但会继续执行已提交的任务。 `AbstractExecutorService`类是`ExecutorService`的一个抽象实现,它实现了大部分`ExecutorService`的方法。`ThreadPoolExecutor`是...

    线程池java

    new ArrayBlockingQueue&lt;&gt;(1000), // workQueue Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() // handler ); // 提交任务 for (int i = 0; i &lt; 100; i++) { final int index = ...

    Java使用ExecutorService来停止线程服务

    private final Set&lt;Runnable&gt; taskCancelledAtShutdown = Collections.synchronizedSet(new HashSet&lt;Runnable&gt;()); public TrackingExecutor(ExecutorService executorService) { this.executorService = ...

    我的线程池

    - **任务队列(BlockingQueue&lt;Runnable&gt;)**:存放待执行任务的队列,当线程池和核心线程数都达到上限后,新任务将被放入队列等待。 - **线程存活时间(keepAliveTime)**:非核心线程在空闲时的存活时间,超过这个...

    深入理解Java编程线程池的实现原理

    2. 任务提交:当有任务到来时,线程池会创建一个线程来执行任务,如果线程池中的线程数目达到核心池的大小,就会把到达的任务放到缓存队列中。 3. 线程复用:当一个线程执行完一个任务后,不会被销毁,而是可以继续...

    JUC并发编程「公开课第三讲」.pdf

    new LinkedBlockingQueue&lt;&gt;(3), // 工作队列 Executors.defaultThreadFactory(), // 线程工厂 new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略 ); ``` - **解析**: - **核心线程数** (corePoolSize):...

    美团动态线程池实践思路开源项目(DynamicTp),线程池源码解析及通知告警篇.doc

    AbstractExecutorService 抽象类继承 ExecutorService 接口,对 ExecutorService 相关方法提供了默认实现,用 RunnableFuture 的实现类 FutureTask 包装 Runnable 任务,交给 execute() 方法执行,然后可以从该 ...

    java.util.concurrent.uml.pdf

    RejectedExecutionHandler是一个策略接口,当新的任务提交给线程池而线程池无法处理时,会通过这个接口的策略来处理拒绝任务的情况。 ThreadFactory是一个接口,它用来创建新线程。它提供了创建线程的方法,可以...

    徒手实现线程池-1

    ExecutorService 是 Java.util.concurrent 包中的一个接口,它是 Executor 接口的子接口,提供了更丰富的线程管理功能,如提交任务并获取结果、关闭线程池等。 2. **基础线程池**: - **ThreadPoolExecutor**:这...

    Java线程池实战PDF下载 .pdf

    - **ExecutorService接口**:提供了任务提交和管理的基本方法。 - **AbstractExecutorService抽象类**:实现了部分ExecutorService接口的方法,如shutdown等。 - **ThreadPoolExecutor**:具体的线程池实现,负责...

    线程池核心组件源码剖析.docx

    2. **ExecutorService接口**:继承自Executor接口,增加了管理和控制线程池的功能,如`submit()`用于提交任务,`shutdown()`用于关闭线程池,`shutdownNow()`用于尝试停止所有正在执行的任务等。 3. **...

    java线程池源码-java-source:Java源码学习多线程、线程池、集合

    1. 当有新任务提交时,如果当前线程数小于核心线程数,则创建新线程执行任务。 2. 如果线程数等于核心线程数,但工作队列未满,任务会被放入工作队列。 3. 如果线程数等于核心线程数,且工作队列已满,若线程数小于...

    Java并发之线程池Executor框架的深入理解

    创建固定大小的线程池newFixedThreadPool每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。...

    MyThreadPool

    MyThreadPool的实现通常会继承AbstractExecutorService,然后重写execute()方法来提交任务。同时,需要维护一个线程安全的工作队列和一个线程集合,用于存储工作线程。当线程池创建时,会根据用户配置初始化一定数量...

Global site tag (gtag.js) - Google Analytics