`

Java Concurrent之Executor

 
阅读更多
JDK5-Executor框架

任务是一组逻辑工作单元,而线程则是使任务异步执行的机制。以往我们通常会

  • 将所有的任务交给一个线程,串行执行 缺点:糟糕的吞吐量 和响应速度
  • 将每个任务分配一个线程执行 缺点:资源管理复杂性 线程的频繁建立与销毁过于销毁资源

为了能够更好地控制多线程,JDK5之后提供了(java.util.concurrent)一种灵活的线程池实现作为Executor框架的一部分。任务执行的主要抽象不是 Thread而是Executor。

什么是线程池?

线程池,从字面上看,是指管理一组同构工作线程的资源池。线程池与工作队列(Work Queue)密切相关。工作线程(Work Thread)的任务很简单: 从工作队列中获取任务执行,然后返回线程池等待获取下一个任务。它比“每个任务分配一个线程”更有优势:

  • 线程池在处理多个请求时,重用线程而不是创建线程 从而很好的将多线程的创建与销毁的巨大资源消耗进行分摊
  • 线程重用而不是重新创建,在请求到来时就不用创建线程,大大提高了程序的响应性
  • 通过适当地调整线程池大小,可以创建足够多的线程以便使处理器处于忙碌状态,同时还可以防止过多的线程相互竞争资源而使应用程序内存耗尽。

JDK5类库提供了一个灵活的线程池以及一些有用的默认配置。可以通过调用Executors中的静态工厂方法之一来创建一个线程池:

public class Executors {
 
    /**
     * 创建一个固定长度的线程池,每提交一个任务时就创建一个线程,直到达到线程池最大数量,这时线程池规模将不再
     * 变化(若某个线程发生了未预期的异常,则线程池将会补充一个新的线程)
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if nThreads <= 0
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue());
    }
    /**
     * 单线程的Executor,它创建单个工作线程来执行任务,若这个线程异常结束,会创建另一个线程来替代。
     * 能保证依照任务在任务队列中的顺序来串行执行
     * @return the newly created single-threaded Executor
     */
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue()));
    }
    /**
     * 创建一个可缓存的线程池,若线程池的当前规模超过了处理需求时,则将回收空闲的线程,
     * 当需求增加时,则可以添加新的线程,
     * 线程池的规模不受任何限制
     * @return the newly created thread pool
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue());
    }
    /**
     * 创建一个固定长度的线程池而且以延迟或定时的方式来执行任务,类似于Timer
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle.
     * @param threadFactory the factory to use when the executor
     * creates a new thread.
     * @return a newly created scheduled thread pool
     * @throws IllegalArgumentException if corePoolSize < 0
     * @throws NullPointerException if threadFactory is null
     */
    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }

 测试代码:

   /**
 * 
 * @author Sonicery_D
 */
public class ConcurrentTest {
 
    @Test
    public void test01(){
        Executor threadPool = null;
        /*
         *创建一个固定大小为20的线程池 
         */
        threadPool = Executors.newFixedThreadPool(20);
        /*
         * 创建大小不受限制的可缓存的线程池
         */
        threadPool = Executors.newCachedThreadPool();
        /*
         * 创建一个单工作线程的线程池
         */
        threadPool = Executors.newSingleThreadExecutor();
        /*
         * 创建一个固定长度的线程池而且以延迟或定时的方式来执行任务的线程池
         */
        threadPool = Executors.newScheduledThreadPool(20);
        /*
         * 每个创建线程池的方法都有ThreadFactory的重载
         */
        threadPool = Executors.newFixedThreadPool(20, new ThreadFactory(){
            //控制线程产生的细节操作
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);//设置为守护线程
                thread.setPriority(Thread.MAX_PRIORITY);//设置最大优先级
                return thread;
            }
        });
        /*
         * 提交一个任务
         */
        threadPool.execute(new Runnable(){
 
            @Override
            public void run() {
                // doSomething...
            }
            
        });
        /*我们一般使用ExecutorService,方便我们更好地操作线程池*/
        ExecutorService executorService = (ExecutorService)threadPool;
        /*
         * 提交有返回值的任务
         */
        Future result = executorService.submit(new Callable(){
            @Override
            public String call() throws Exception {
                // to do something...
                return null;
            }
        });
        /*
         * 温柔地关闭线程池
         */
        executorService.shutdown();
        /*
         * 粗暴的关闭线程池
         */
        executorService.shutdownNow();
        
    }
}

 通过源码我们可以看出,Executors类中的newFixedThreadPool、newSingleThreadExecutor、newCachedThreadPool的默认实现都是:

/**
     *
     * @param corePoolSize 指定线程池中线程数量 包括空闲线程
     * @param maximumPoolSize 指定线程池允许最大线程数量
     * @param keepAliveTime  超过corePoolSize 多余的空闲线程的最大存活时间
     * @param unit keepAliveTime的时间单位 
     * @param workQueue 任务队列,被提交但未执行的任务存放容器。
     * @param threadFactory  用于生产线程池中的线程,可以控制线程产生的具体细节
     * @throws IllegalArgumentException if corePoolSize or
     * keepAliveTime less than zero, or if maximumPoolSize less than or
     * equal to zero, or if corePoolSize greater than maximumPoolSize.
     * @throws NullPointerException if workQueue is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue
                              ThreadFactory threadFactory) {
         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
     /**
     * @param handler 拒绝策略,当任务执行由于线程数量超过限制或者任务队列达到最大限度而造成阻塞 时的拒绝策略
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

 RejectedExecutionHandler handler: handler拒绝策略,当任务的数量达到线程池的实际负载能力时,该如何处理提交的任务。 JDK内置了4中拒绝策略:

  • AbortPolicy策略: 该策略会直接抛出异常
  • CallerRunsPolicy策略: 只要线程池未关闭,该策略直接在调用者线程中运行当前被放弃任务
  • DiscardOledestPolicy策略: 该策略丢弃最老的一个请求,即即将被执行的任务,并尝试再提交当前任务
  • DiscardPolicy策略: 该策略丢弃无法处理的任务,不做任何处理

所有拒绝的策略都继承了RejectedExecutionHandler,用户可以通过继承RejectedExecutionHandler扩展自己自定义的拒绝策略

ThreadPoolExecutor内部机制
  • 当线程池中的线程数量小于corePoolSize时,新建线程处理任务
  • 当线程池中的线程数量等于corePoolSize时,将任务放入workQueue中,线程池中的空闲线程从workQueue中取任务执行
  • 当workQueue中放不下新来的任务请求时,创建新线程处理;若线程池中的线程数量大于maximumPoolSize时,使用RejectedExecutionHandler 来做拒绝处理。
  • 当线程池中的线程池数量大于corePoolSize时,多余的线程的存活时间最大为keepAliveTime,若无任务可处理则自行销毁

其内部结构如下: 



 

  • 大小: 24.4 KB
分享到:
评论

相关推荐

    java concurrent 包 详细解析

    2. **Executor框架**:`java.util.concurrent.Executor`是执行任务的核心接口,它定义了运行任务的方法。`ExecutorService`是Executor的一个子接口,提供了管理和控制执行器的额外功能,如`shutdown()`用于关闭执行...

    java的concurrent用法详解

    `Executor`框架是`java.util.concurrent`的核心组件之一,它为任务的执行提供了一个统一的接口。其中最重要的接口是`ExecutorService`,它定义了线程池的行为,使得我们可以将任务(`Runnable`或`Callable`对象)...

    java.util.concurrent

    java.util.concurrent总体概览图。 收取资源分3分。...java.util.concurrent主要包括5个部分executor,colletions,locks,atomic,tools。 该图详细的列举了并发包下面的结构,包含所有接口和具体实现类。

    java.util.concurrent 学习ppt

    Java.util.concurrent是Java 5.0引入的一个重要包,它为多线程编程提供了一组高级并发工具。这个包的设计者是Doug Lea,它的出现是JSR-166的一部分,也被称作Tiger更新。Java.util.concurrent的引入是为了解决传统...

    使用Java并发编程Concurrent Programming Using Java

    - **ExecutorService**:这是Java并发编程中最核心的接口之一,它提供了更高级的线程池管理功能。 - **Future与Callable**:`Future`代表异步计算的结果,`Callable`是一个可以返回结果的接口,常用于与`...

    java_util_concurrent中文版pdf

    在Java并发编程中,`java.util.concurrent`(简称JUC)提供了丰富的类和接口,如Executor框架、线程池、并发集合、同步工具类等。这些工具使得程序员能够更方便地管理线程,避免了传统的锁和同步机制带来的复杂性和...

    Java-Executor并发框架.docx

    Java并发框架中的Executor服务是Java 1.5引入的核心组件,位于`java.util.concurrent`包下,极大地简化了多线程编程。Executor接口虽然历史悠久,但其重要性不言而喻,很多开发者对其背后的原理并不十分了解。本文将...

    Doug Lea, Concurrent Programming in Java Design Principles and Patterns

    4. **Java并发工具**:Doug Lea详细介绍了java.util.concurrent包中的各种工具类,如Executor框架、Semaphore信号量、CountDownLatch倒计时器、CyclicBarrier回环栅栏和Future异步计算接口。这些工具极大地简化了...

    java.util.concurrent 测试源文件

    Java.util.concurrent(JUC)是Java平台中的一个核心包,专门用于处理多线程并发问题。这个包包含了大量的工具类和接口,极大地简化了并发编程的复杂性,提高了程序的性能和可伸缩性。本测试源文件主要是针对JUC并发...

    java.util.concurrent 实现线程池队列

    `java.util.concurrent` 包(简称JUC)是Java提供的一个强大的并发工具包,它提供了丰富的并发组件,如线程池、并发容器、锁和同步机制等,极大地简化了并发编程的复杂性。本篇文章将深入探讨如何使用`java.util....

    掌握并发的钥匙:Java Executor框架深度解析

    为了简化多线程编程,Java标准库提供了`java.util.concurrent`包,其中包含了一系列高级并发工具,而Executor框架就是其中之一。 ### Executor框架概述 Executor框架提供了一种执行异步任务的方法,它允许开发者将...

    backport-util-concurrent_java_backport_

    1. **Executor框架**:Java 5引入的Executor框架提供了一种灵活的方式来管理线程和任务执行。在backport-util-concurrent中,你可以找到类似ThreadPoolExecutor和ScheduledThreadPoolExecutor的实现,它们允许开发者...

    Java的concurrent包动画演示

    1. **Executor框架**:Java `concurrent`包中最核心的概念是`Executor`框架,它将任务的提交和执行分离,简化了多线程应用的设计。`ExecutorService`是执行任务的主要接口,而`ThreadPoolExecutor`是其最常见的实现...

    Java中Executor接口用法总结

    Java中的Executor接口是Java并发编程的核心组件之一,它位于java.util.concurrent包中,为执行异步任务提供了一种统一的框架。Executor接口定义了一个单一的方法`execute(Runnable command)`,用于提交一个Runnable...

    javaconcurrent源码-java_concurrent:javaconcurrent包源代码学习,及相关实践示例

    Java `concurrent` 包是Java提供的一个核心库,它为并发编程提供了丰富的工具和类,帮助开发者编写更加高效和线程安全的代码。本资源——`java_concurrent` 源码,提供了对Java并发包的深入学习材料以及实践示例,...

    java.util.concurrent.uml.pdf

    标题中提到了“java.util.concurrent.uml.pdf”,这表明文件是一份Java并发编程工具包java.util.concurrent的UML(统一建模语言)类结构图的PDF格式文件。UML图能够帮助开发者理解Java并发包中的类、接口及其关系,...

    JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用

    "JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用" JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用是Java多线程编程中的一种重要概念。随着多线程编程的普及,线程池的使用变得...

    java thread and the concurrent utilities

    Java并发工具包`java.util.concurrent.atomic`提供了原子变量类,如`AtomicInteger`、`AtomicLong`等,这些类支持无锁编程模型,提高了并发性能。 #### 四、并发工具库 ##### 1. Executor框架 Executor框架提供了...

    Java实现生产者消费者模型

    Java实现生产者消费者模型 生产者消费者模型,是一般面试题都会考的,下面介绍使用ReetrantLock实现 生产者消费者模型。 定义一个ReentrantLock锁,同时new出两个condition,一...import java.util.concurrent.Executor

    线程池之Executor框架.docx

    import java.util.concurrent.*; public class ExecutorDemo { // 创建 ThreadPoolExecutor 实现类 private static ThreadPoolExecutor executor = new ThreadPoolExecutor( 5, // 核心线程数 10, // 最大...

Global site tag (gtag.js) - Google Analytics