`

原码剖析之ThreadPoolExecutor入门

阅读更多
jdk 1.5 开始提供支持线程池的功能。

线程池使用的场景:创建线程的时间和资源耗费较高,线程执行时间较短。
优点:
1. 这样使用线程池可以避免多次创建耗费巨大的线程,去完成一个较小的任务
2. 复用线程,减低系统的资源浪费。
3. 另外就是线程已经创建好等待任务的执行,那么相应性也会大大提高。
4. 通过适当地调整线程池中的线程数目,也就是当请求的数目超过某个阈值时,就强制其它任何新到的请求一直等待,直到获得一个线程来处理为止,从而可以防止资源不足。保障系统的稳定运行。


注意点:
1、线程池的大小对于使用者非常关键,比如对于多并发请求较为频繁的场景,有可能因为线程池的大小导致请求的阻塞。
2、线程池可以让任务的提交和执行隔离开来,达到异步的效果。(核心:FutureTask)
3、线程池维护者任务执行的相关信息,如果执行已经执行完成的数量。
4、以及异常和任务队列满时的丢弃策略。
5、不要对那些同步等待其它任务结果的任务排队,这可能造成死锁。

java线程池类库中关键类的继承关系类图:




核心接口和类的介绍:

1、顶层接口 Executor。

执行已提交的 Runnable 任务的对象。此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。通常使用 Executor 而不是显式地创建线程。

Executor 接口并没有严格地要求执行是异步的。在最简单的情况下,执行程序可以在调用者的线程中立即运行已提交的任务
class DirectExecutor implements Executor {
     public void execute(Runnable r) {
         r.run();
     }
}

更常见的是,任务是在某个不是调用者线程的线程中执行的。以下执行程序将为每个任务生成一个新线程
class ThreadPerTaskExecutor implements Executor {
     public void execute(Runnable r) {
         new Thread(r).start();
     }
}

此包中提供的 Executor 实现实现了 ExecutorService,这是一个使用更广泛的接口。ThreadPoolExecutor 类提供一个可扩展的线程池实现。Executors 类为这些 Executor 提供了便捷的工厂方法。

内存一致性效果:线程中将 Runnable 对象提交到 Executor 之前的操作 happen-before 其执行开始(可能在另一个线程中)。


public interface Executor {
    /**
     *
     * 执行提交的command命令,command 可能在一个新线程中执行,也可以在池中执行,或者在当前调用的线程,详细参考Executor的实现
     * @param command the runnable task
     * @throws RejectedExecutionException 如任务不能被接受,会抛出异常
     */
    void execute(Runnable command);
}



2、执行器通用服务接口:ExecutorService。

1、是一个实现了可以管理线程池功能的Executor 。
2、提供了可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。
3、可以关闭 ExecutorService,这将导致其拒绝新任务shutdown()
4、通过创建并返回一个可用于取消执行和/或等待完成的 Future,方法 submit 扩展了基本方法 Executor.execute(java.lang.Runnable)。方法 invokeAny 和 invokeAll 是批量执行的最常用形式,它们执行任务 collection,然后等待至少一个,或全部任务完成(可使用 ExecutorCompletionService 类来编写这些方法的自定义变体)。

另外:Executors 类提供了用于此包中所提供的执行程序服务的工厂方法。

内存一致性效果:线程中向 ExecutorService 提交 Runnable 或 Callable 任务之前的操作 happen-before 由该任务所提取的所有操作,后者依次 happen-before 通过 Future.get() 获取的结果。

public interface ExecutorService extends Executor {
    /**
     * 启动一次顺序关闭,执行以前提交的任务,但不接受新任务。如果已经关闭,则调用没有其他作用。 
     */
    void shutdown();

    /**
      试图关闭所有活动的正在执行的任务,停止等待认为的处理,并返回等待执行的任务列表。 
     *无法保证能够停止正在处理的活动执行任务,但是会尽力尝试。
      例如,通过 Thread.interrupt() 来取消典型的实现,所以任何任务无法响应中断都可能永远无法终止。
     */
    List<Runnable> shutdownNow();

    /**
     * 返回executor 是否已经shutdown
     */
    boolean isShutdown();

    /**
      如果关闭后所有任务都已完成,则返回 true。注意,除非首先调用 shutdown 或 shutdownNow,否则 isTerminated 永不为 true。 
     */
    boolean isTerminated();

    
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;


    /**
     *提交一个返回值的任务用于执行,返回一个表示任务执行未决结果的 Future。该 Future 的 get 方法在成功完成时将会返回该任务的结果。 
      如果想立即阻塞任务的等待,则可以使用 result = exec.submit(aCallable).get(); 形式的构造。 
     */
    <T> Future<T> submit(Callable<T> task);

    /**
     *提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功完成时将会返回给定的结果。 
     */
    <T> Future<T> submit(Runnable task, T result);

    /**
     * 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功 完成时将会返回 null。 
     */
    Future<?> submit(Runnable task);

    /**
     * 批量执行任务。返回列表的所有元素的 Future.isDone() 为 true
     */

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    /**
     *在特定的时间内执行tasks任务列表,当中断 过期等,即取消尚未完成的任务
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果。一旦正常或异常返回后,则取消尚未完成的任务。
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    /**
     同上
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}


3、执行器服务的默认实现:AbstractExecutorService。
此类提供 ExecutorService 执行方法的默认实现。此类使用 newTaskFor 返回的 RunnableFuture 实现 submit、invokeAny 和 invokeAll 方法,默认情况下,RunnableFuture 是此包中提供的 FutureTask 类。例如,submit(Runnable) 的实现创建了一个关联 RunnableFuture 类,该类将被执行并返回。子类可以重写 newTaskFor 方法,以返回 FutureTask 之外的 RunnableFuture 实现。

AbstractExecutorService 对callable 和 runnable 通过适配器做统一管理。
相关类图如下:



原码分析如下:
public abstract class AbstractExecutorService implements ExecutorService {

    /**
     * 为给定可运行任务和默认值返回一个 RunnableFuture。 
     *
     * @param runnable 将被包装的可运行任务
     * @param 用于所返回的将来任务的默认值
     * @return 在运行的时候,它将运行底层可运行任务,作为 Future 任务,它将生成给定值作为其结果,并为底层任务提供取消操作。
     */
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    /**
     * 同上
     */
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Object> ftask = newTaskFor(task, null); //注意:如果提交的是runnable 的任务,默认值返回的是null
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result); //可以提供返回的默认值result
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask); //执行任务
        return ftask;
    }

		/** 调用所有任务
		执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。返回列表的所有元素的 Future.isDone() 为 true。
		注意,可以正常地或通过抛出异常来终止已完成 任务。如果正在进行此操作时修改了给定的 collection,则此方法的结果是不确定的。 
		*/
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); //定义返回的Futrue list
        boolean done = false; //标志执行是否结束
        try {
            for (Callable<T> t : tasks) {
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f); //执行任务!!
            }
            for (Future<T> f : futures) {
                if (!f.isDone()) { //如果任务没有执行完,那么f.get()将强制进行执行完成。
                    try {
                        f.get();
                    } catch (CancellationException ignore) { //注意:如果此次抛出异常,那么返回的list列表里面将不能保证所有的任务都是完成的!!!
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done) //如果未执行完发生意外,那么所有的任务进行取消!
                for (Future<T> f : futures)
                    f.cancel(true);
        }
    }

		/** 同上,有过期限制*/
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null || unit == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));

            long lastTime = System.nanoTime();

            // Interleave time checks and calls to execute in case
            // executor doesn't have any/much parallelism.
            Iterator<Future<T>> it = futures.iterator();
            while (it.hasNext()) {
                execute((Runnable)(it.next()));
                long now = System.nanoTime();
                nanos -= now - lastTime; //nanos = nanos - 两次执行之间的时间间隔
                lastTime = now; //更改上一次执行的时间
                if (nanos <= 0) // nanos <=0 代表 已经超时
                    return futures;
            }

            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    if (nanos <= 0)
                        return futures;
                    try {
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (Future<T> f : futures)
                    f.cancel(true);
        }
    }

}


  • 大小: 30.1 KB
  • 大小: 45.2 KB
分享到:
评论

相关推荐

    STL原码剖析(PDF完整版)

    STL原码剖析(PDF完整版).part2.rar

    STL原码剖析

    STL原码剖析,源码面前,了无秘密,可以深入了解STL中容器的设计思想

    STL原码剖析-简体版.pdf

    STL原码剖析-简体版.pdf 注意是简体版本

    STL原码剖析 pdf

    《STL原码剖析》是一本深度探讨标准模板库(Standard Template Library,简称STL)实现原理的专业书籍。STL是C++编程语言中的一个重要组成部分,它提供了高效、灵活的容器、迭代器、算法和函数对象,极大地提升了C++...

    java入门很简单原码

    "java入门很简单原码"这个标题暗示了我们将探讨如何轻松地开始学习Java编程,通过理解并实践源代码来掌握基础概念。 在Java入门阶段,首先要了解的是Java的基础语法,包括变量、数据类型、运算符、流程控制语句(如...

    STL原码剖析电子书

    学习STL的很好的入门教材,华中科技大学出版,侯捷著。

    STL原码剖析<庖丁解牛>

    总的来说,《STL原码剖析&lt;庖丁解牛&gt;》是一本深度揭秘STL的权威之作,无论是初学者还是经验丰富的开发者,都能从中获益匪浅。通过阅读这本书,读者不仅能掌握STL的基本使用,还能深入理解其背后的思维方式和编程艺术...

    模拟一位原码乘法

    本文通过对给定的代码片段进行了详细分析,介绍了如何实现一位原码乘法的过程。通过了解这些基础知识,读者可以更好地理解计算机内部是如何执行乘法运算的。此外,熟悉这些底层实现有助于加深对计算机体系结构的理解...

    ArcEngine 开发入门原码

    【ArcEngine开发入门原码】是GIS(地理信息系统)领域中的一个重要学习资源,它主要针对的是C#编程语言的开发者。ArcEngine是Esri公司提供的一个强大的开发平台,用于构建地图应用和地理处理解决方案。本资源的核心...

    ARM Linux内核原码剖析

    本书是多位作者在3年Liunx内核分析经验和庞大资料基础上写成的,收录了其他同类书未曾讲解的内容并进行逐行分析,一扫当前市场中其他理论书带给读者的郁闷。书中详细的代码分析与大量插图能够使读者对Linux内核及ARM...

    原码一位乘法器——组成原理课程设计

    原码一位乘法器是计算机组成原理课程设计的重要组成部分,它们之间的相乘结果的符号为相乘两数符号的异或值,而数值则为两数绝对值之积。本文将讲解原码一位乘法器的设计原理和实现方法。 原码一位乘法器的设计原理...

    Visual c#入门经典(原码)

    Visual c#入门经典(原码)

    snort原码分析还不错的哦!

    snort原码分析,还不错的哦! Snort作为一个轻量级的网络入侵检测系统,在实际中应用可能会有些力不从心,但如果想了解研究IDS的工作原理,仔细研究一下它的源码到是非常不错.首先对snort做一个概括的评论。 从工作...

    STL原码剖析中文版

    学习编程的人都知道,阅读、剖析名家代码乃是提高水平的捷径。源码之前,了无秘密。大师们的缜密思维、经验结晶、技术思路、独到风格,都原原本本体现在源码之中。 这本书所呈现的源码,使读者看到vector的实现、...

    STL原码剖析.zip

    学习编程的人都知道,阅读、剖析名家代码乃是提高水平的捷径。源码之前,了无秘密。大师们的缜密思维、经验结晶、技术思路、独到风格,都原原本本体现在源码之中。这本书所呈现的源码,使读者看到vector的实现、list...

    原码反码补码讲课.pptx

    原码反码补码讲课 计算机中的数值表示是计算机基础知识的重要组成部分。在计算机中,数值可以分为整数和实数两大类。整数又可以分为无符号整数和带符号整数。无符号整数是指不带符号的整数,而带符号整数则是带有...

    原码补码转换的matlab程序

    将原码转换成补码,再将补码转成原码的matlab程序

    计算机基础知识:原码反码补码练习(含答案)

    原码、反码和补码是二进制表示正负数的关键概念,它们主要用于无符号整数和有符号整数的表示。以下是对这些知识点的详细解释: 1. **原码**:原码是最直观的二进制表示,其中最高位(称为符号位)为0表示正数,为1...

Global site tag (gtag.js) - Google Analytics