`
jameswxx
  • 浏览: 776505 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

ThreadPoolExecutor相关类的分析

    博客分类:
  • java
阅读更多

 一:ThreadPoolExecutor  

        从 Java 5 开始,Java 提供了自己的线程池。线程池就是一个线程的容器,每次只执行额定数量的线程java.util.concurrent.ThreadPoolExecutor 就是这样的线程池。它很灵活,但使用起来也比较复杂。 首先是构造函数。以最简单的构造函数为例:

public ThreadPoolExecutor(   

int corePoolSize,   

int maximumPoolSize,   

long keepAliveTime,   

TimeUnit unit,   

BlockingQueue<Runnable> workQueue)    

看起来挺复杂的。这里介绍一下。

corePoolSize 指的是保留的线程池大小。

maximumPoolSize 指的是线程池的最大大小。

keepAliveTime 指的是空闲线程结束的超时时间。

unit 是一个枚举,表示 keepAliveTime 的单位。

workQueue 表示存放任务的队列。

    可以从线程池的工作过程中了解这些参数的意义。线程池的工作过程如下:

1 、线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。

2 、当调用 execute() 方法添加一个任务时,线程池会做如下判断:

 a. 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;

 b. 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列。

 c. 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建线程运行这个任务;

 d. 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常,告诉调用者“我不能再接受任务了”。

3 、当一个线程完成任务时,它会从队列中取下一个任务来执行。

4 、当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。

    这样的过程说明,并不是先加入任务就一定会先执行。假设队列大小为 10,corePoolSize 为 3,maximumPoolSize 为 6,那么当加入 20 个任务时,执行的顺序就是这样的:首先执行任务 1、2、3,然后任务 4~13 被放入队列。这时候队列满了,任务 14、15、16 会被马上执行,而任务 17~20 则会抛出异常。最终顺序是:1、2、3、14、15、16、4、5、6、7、8、9、10、11、12、13。下面是一个线程池使用的例子:

public static void main(String[] args) {   

  BlockingQueue<Runnable> queue = new   LinkedBlockingQueue<Runnable>();   

  ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 1, TimeUnit.DAYS,   queue);   

for (int i = 0; i < 20; i++) {   

executor.execute(new Runnable() {   

  public void run() {   

    try {   

         Thread.sleep(1000);   

    } catch (InterruptedException e) {   

    e.printStackTrace();   

    }   

      System.out.println(String.format("thread %d finished", this.hashCode()));   

 }   

});   

}    

executor.shutdown();   

}  

对这个例子的说明如下:

1、BlockingQueue 只是一个接口,常用的实现类有 LinkedBlockingQueue 和 ArrayBlockingQueue。用 LinkedBlockingQueue 的好处在于没有大小限制。这样的话,因为队列不会满,所以 execute() 不会抛出异常,而线程池中运行的线程数也永远不会超过 corePoolSize 个,keepAliveTime 参数也就没有意义了。

2、shutdown() 方法不会阻塞。调用 shutdown() 方法之后,主线程就马上结束了,而线程池会继续运行直到所有任务执行完才会停止。如果不调用 shutdown() 方法,那么线程池会一直保持下去,以便随时添加新的任务。

   到这里对于这个线程池还只是介绍了一小部分。ThreadPoolExecutor 具有很强的可扩展性,不过扩展它的前提是要熟悉它的工作方式。 java.util.concurrent.ThreadPoolExecutor 类提供了丰富的可扩展性。你可以通过创建它的子类来自定义它的行为。例如,我希望当每个任务结束之后打印一条消息,但我又无法修改任务对象,那么我可以这样写: 除了 afterExecute 方法之外,ThreadPoolExecutor 类还有 beforeExecute() 和 terminated() 方法可以重写,分别是在任务执行之前和整个线程池停止之后执行。

   除了可以添加任务执行前后的动作之外, ThreadPoolExecutor 还允许你自定义当添加任务失败后的执行策略。你可以调用线程池的 setRejectedExecutionHandler() 方法,用自定义的 RejectedExecutionHandler 对象替换现有的策略。 ThreadPoolExecutor 提供 4 个现有的策略,分别是:

ThreadPoolExecutor.AbortPolicy:表示拒绝任务并抛出异常

ThreadPoolExecutor.DiscardPolicy:表示拒绝任务但不做任何动作

ThreadPoolExecutor.CallerRunsPolicy:表示拒绝任务,并在调用者的线程中直接执行该任务

ThreadPoolExecutor.DiscardOldestPolicy:表示先丢弃任务队列中的第一个任务,然后把这个任务加进队列。

这里是一个例子:

ThreadPoolExecutor executor = new ThreadPoolExecutor(size, maxSize, 1, TimeUnit.DAYS, queue);

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());

除此之外,你也可以通过实现 RejectedExecutionHandler 接口来编写自己的策略。下面是一个例子:

ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 1, TimeUnit.SECONDS, queue,
new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(String.format("Task %d rejected.", r.hashCode()));
}
}

);



 二:ThreadPoolExecutor 实现的接口 Executer

          看jdk的javadoc,ThreadPoolExecutor实现了两个接口:Executer,ExecuterService,注意, ExecuterService继承了 Executer, 看看javadoc对 Executer的描述

public interface Executor

执行已提交的 Runnable 任务的对象。此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。通常使用 Executor 而不是显式地创建线程。例如,可能会使用以下方法,而不是为一组任务中的每个任务调用

new Thread(new(RunnableTask())).start():
 Executor executor = anExecutor;
 executor.execute(new RunnableTask1());
 executor.execute(new RunnableTask2());

 ...

 不过,Executor 接口并没有严格地要求执行是异步的。在最简单的情况下,执行程序可以在调用者的线程中立即运行已提交的任务:
 class DirectExecutor implements Executor {
     public void execute(Runnable r) {
         r.run();
     }
 }

更常见的是,任务是在某个不是调用者线程的线程中执行的。以下执行程序将为每个任务生成一个新线程。
 class ThreadPerTaskExecutor implements Executor {
     public void execute(Runnable r) {
         new Thread(r).start();
     }
 }

        许多 Executor 实现都对调度任务的方式和时间强加了某种限制。以下执行程序使任务提交与第二个执行程序保持连续,这说明了一个复合执行程序。
 class SerialExecutor implements Executor {
     final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
     final Executor executor;
     Runnable active;
     SerialExecutor(Executor executor) {
         this.executor = executor;
     }
     public synchronized void execute(final Runnable r) {
         tasks.offer(new Runnable() {
             public void run() {
                 try {
                     r.run();
                 } finally {
                     scheduleNext();
                 }
             }
         });
         if (active == null) {
             scheduleNext();
         }
     }

     protected synchronized void scheduleNext() {
         if ((active = tasks.poll()) != null) {
             executor.execute(active);
         }
     }
 }

            此包中提供的 Executor 实现实现了 ExecutorService,这是一个使用更广泛的接口。ThreadPoolExecutor 类提供一个可扩展的线程池实现。Executors 类为这些 Executor 提供了便捷的工厂方法。



三:ThreadPoolExecutor 实现的接口 ExecutorService

public interface ExecutorService extends Executor

          ExecutorService 提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。 可以关闭 ExecutorService,这将导致其拒绝新任务。提供两个方法来关闭 ExecutorService。shutdown() 方法在终止前允许执行以前提交的任务,而 shutdownNow() 方法阻止等待任务启动并试图停止当前正在执行的任务。在终止时,执行程序没有任务在执行,也没有任务在等待执行,并且无法提交新任务。应该关闭未使用的 ExecutorService 以允许回收其资源。
       通过创建并返回一个可用于取消执行和/或等待完成的 Future,方法 submit 扩展了基本方法 Executor.execute(java.lang.Runnable)。方法 invokeAny 和 invokeAll 是批量执行的最常用形式,它们执行任务 collection,然后等待至少一个,或全部任务完成(可使用 ExecutorCompletionService 类来编写这些方法的自定义变体)。 Executors 类提供了用于此包中所提供的执行程序服务的工厂方法。


用法示例
下面给出了一个网络服务的简单结构,这里线程池中的线程作为传入的请求。它使用了预先配置的 Executors.newFixedThreadPool(int) 工厂方法:
 class NetworkService implements Runnable {
    private final ServerSocket serverSocket;
    private final ExecutorService pool;

    public NetworkService(int port, int poolSize)
        throws IOException {
      serverSocket = new ServerSocket(port);
      pool = Executors.newFixedThreadPool(poolSize);
    }
    public void run() { // run the service
      try {
        for (;;) {
          pool.execute(new Handler(serverSocket.accept()));
        }
      } catch (IOException ex) {
        pool.shutdown();
      }
    }
  }
  class Handler implements Runnable {
    private final Socket socket;
    Handler(Socket socket) { this.socket = socket; }
    public void run() {
      // read and service request on socket
    }
 }

 下列方法分两个阶段关闭 ExecutorService。第一阶段调用 shutdown 拒绝传入任务,然后调用 shutdownNow(如有必要)取消所有遗留的任务:
 void shutdownAndAwaitTermination(ExecutorService pool) {
   pool.shutdown(); // Disable new tasks from being submitted
   try {
     // Wait a while for existing tasks to terminate
     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
       pool.shutdownNow(); // Cancel currently executing tasks
       // Wait a while for tasks to respond to being cancelled
       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
     }
   } catch (InterruptedException ie) {
     // (Re-)Cancel if current thread also interrupted
     pool.shutdownNow();
     // Preserve interrupt status
     Thread.currentThread().interrupt();
   }
 }

 内存一致性效果:线程中向 ExecutorService 提交 Runnable 或 Callable 任务之前的操作 happen-before 由该任务所提取的所有操作,后者依次 happen-before 通过 Future.get() 获取的结果。

 

 

四:异步任务 Future接口

        一般情况下,调用线程池的execute方法执行的任务都是不需要返回结果的,但是在某些情况下,我们可能需要执行异步的任务,就是提交一个任务,然后我做自己的事情,不管它,一会儿我再查看这个任务返回的结果,那么这个时候,就要用到异步任务,异步任务都实现了Future接口。

        它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。计算完成后只能使用 get 方法来获取结果,如有必要,计算完成前可以阻塞此方法。取消则由 cancel 方法来执行。还提供了其他方法,以确定任务是正常完成还是被取消了。一旦计算完成,就不能再取消计算。如果为了可取消性而使用 Future 但又不提供可用的结果,则可以声明 Future<?> 形式类型、并返回 null 作为底层任务的结果。
用法示例(注意,下列各类都是构造好的。)
 interface ArchiveSearcher { String search(String target); }
 class App {
   ExecutorService executor = ...
   ArchiveSearcher searcher = ...
   void showSearch(final String target)
       throws InterruptedException {
     Future<String> future
       = executor.submit(new Callable<String>() {
         public String call() {
             return searcher.search(target);
         }});
     displayOtherThings(); // do other things while searching
     try {
       displayText(future.get()); // use future
     } catch (ExecutionException ex) { cleanup(); return; }
   }
 }

 FutureTask 类是 Future 的一个实现,并且FutureTask 实现 了Runnable接口,所以可通过 Executor 来执行。例如,可用下列内容替换上面带有 submit 的构造:
     FutureTask<String> future =
       new FutureTask<String>(new Callable<String>() {
         public String call() {
           return searcher.search(target);
       }});
     executor.execute(future);

        String result=future.get();

     System.out.println("result is:"+result);

分享到:
评论

相关推荐

    高并发之——通过源码深度解析ThreadPoolExecutor类是如何保证线程池正确运行的

    对于线程池的核心类ThreadPoolExecutor来说,有哪些重要的属性和内部类为线程池的正确运行提供重要的保障呢? ThreadPoolExecutor类中的重要属性 在ThreadPoolExecutor类中,存在几个非常重要的属性和方法,接下来,...

    Java线程池与ThreadPoolExecutor.pdf

    线程池通过ThreadPoolExecutor类实现,这是一个高度可配置的工具,能够根据具体需求定制线程的创建、管理和销毁策略。 ThreadPoolExecutor的核心参数包括: 1. corePoolSize:核心线程数,这是线程池在非繁忙状态下...

    并发容器——BlockingQueue相关类

    分析这个测试文件可以帮助我们更好地理解和应用`BlockingQueue`。实际操作中,我们可以通过监控和调优这些代码来优化并发性能,比如调整队列大小,选择适合的实现类,或者优化生产者和消费者的处理速度。

    多线程相关

    本篇将详细探讨多线程相关的知识点,结合给定的标签"源码"和"工具",我们将讨论多线程的基本原理、在实际开发中的应用以及相关工具的使用。 一、多线程基本概念 1. 线程:线程是操作系统调度的基本单位,每个线程都...

    深入理解高并发编程-核心技术原理

    通过分析如ThreadPoolExecutor等关键类的源码,读者可以理解线程池的创建和执行机制,以及线程的管理方式。 **线程与线程池**部分介绍了操作系统中的线程概念,指出线程是CPU调度的基本单位,而多线程则能充分利用...

    安卓开发框架工具类相关-工具类线程池日志自定义的控件程序崩溃捕捉处.rar

    Java中的ExecutorService和ThreadPoolExecutor可以创建线程池,通过合理调度任务,避免过多线程导致资源浪费和系统不稳定。使用线程池可以控制并发数量,优化内存管理,提高性能。 3. **日志(Log)**: 日志是...

    并发编程、juc工具包源码分析笔记

    synchronized 关键字、ReentrantLock 等)、并发容器(如 ConcurrentHashMap、ConcurrentLinkedQueue 等)、原子变量(AtomicInteger、AtomicReference 等)、线程池(ThreadPoolExecutor 和其相关类)以及线程间的...

    源码深度分析线程池中Worker线程的执行流程

    首先,Worker类是ThreadPoolExecutor的一个内部类,它继承了AbstractQueuedSynchronizer(AQS),这是一个抽象的同步队列,用于实现锁和其他同步组件的基础框架。同时,Worker也实现了Runnable接口,这意味着它是一...

    Java并发包源码分析(JDK1.8)

    AQS相关应用(CountDownLatch、CyclicBarrier、Semaphore等),executor(ThreadPoolExecutor、ScheduledThreadPoolExecutor、FutureTask等),collection(ConcurrentHashMap、CopyOnWriteArrayList等), 对每个类的...

    简易chat room作业

    3. GUI界面编程:由于文件中出现了`JPanel`、`Timer`、`ActionListener`等GUI相关类的引用,这说明编程作业中包括了图形用户界面的设计与实现。用户界面可能是聊天室的可视部分,例如显示消息的窗口和聊天输入区域。...

    net.rar_地图 _自动类

    在Java中,`java.util.concurrent.ExecutorService`和`java.util.concurrent.ThreadPoolExecutor`是实现线程池的主要类。开发者可以自定义线程池的大小、任务队列、拒绝策略等参数,以适应不同的性能需求。 地图...

    JDK 性能

    本文将深入探讨JDK性能相关的知识,主要涉及源码分析和工具使用。 首先,了解JDK的源码对于性能优化至关重要。通过阅读和理解JDK源码,我们可以得知Java内部的运行机制,从而找到性能瓶颈并进行针对性优化。例如,...

    java工具类集合

    - 这个包包含了许多并发编程相关的工具类,如`ExecutorService`, `Semaphore`, `CountDownLatch`, `CyclicBarrier`等,以及线程池`ThreadPoolExecutor`。 7. `java.util.Random`: - 提供随机数生成的功能,广泛...

    java技术指南

    对于I/O操作,文档讨论了java.io包中的基本I/O类,比如用于读写数据的InputStream和OutputStream类、Reader和Writer类,以及序列化和反序列化的相关类。 文档还介绍了java.math包下的BigInteger和BigDecimal类,这...

    Java28个相关包

    18. **java.applet**:Applet小程序相关类,不过现在由于浏览器安全策略,Applet已逐渐被淘汰。 19. **java.rmi**:远程方法调用(Remote Method Invocation),用于分布式计算。 20. **java.awt.image**:图像...

    28个java常用的工具类源码

    12. **ExecutorService 和 ThreadPoolExecutor**: Java并发编程的重要工具,用于管理线程池,提高系统资源利用率。 13. **Optional**: 自Java 8引入,用于表示可能为null的对象,有助于避免空指针异常。 14. **...

    线程池java写的代码

    `ExecutorService`是线程池的主要接口,而`ThreadPoolExecutor`是其主要的实现类,提供了创建、管理和关闭线程池的方法。 ### 2. ThreadPoolExecutor构造参数 `ThreadPoolExecutor`的构造函数接收五个参数: - `...

    java.util.concurrent 测试源文件

    3. **线程池相关类**:除了ThreadPoolExecutor,还有ScheduledThreadPoolExecutor,用于定时或周期性执行任务。WorkStealingPool是ForkJoinPool的一种形式,适合大规模并行计算。 4. **并发集合**:JUC提供了线程...

    线程池相关详解及总结.doc

    在Java中,线程池主要由`java.util.concurrent`包下的`ThreadPoolExecutor`类实现。 `ThreadPoolExecutor`的构造方法接受五个参数,用于定义线程池的行为: 1. `corePoolSize`: 初始化时创建的线程数。 2. `...

    java reflect

    在深入理解Java反射的同时,了解并发处理和线程池的内部机制也是非常有益的,因为这两个主题都与程序的性能和可扩展性密切相关。 总的来说,Java反射提供了强大的能力,让我们能够在运行时动态地操控代码,但同时也...

Global site tag (gtag.js) - Google Analytics