`

java线程池--ThreadPoolExecutor

阅读更多

前言

 

前面几篇文章分别总结了AQS,以及基于AQS实现的几个APICountDownLatchSemaphoreReentrantLock。本篇接着讲解另一个基于AQS的实现:java线程池ThreadPoolExecutor,另外还有一个基于AQS实现的API 读写锁ReentrantReadWriteLock放到下次讲解。

 

java线程池是在日常开发中使用频率较高的一门技术,我们平时的使用一般是使用Executors4类静态方法创建:

1newFixedThreadPool(int nThreads):创建固定线程数的线程池,使用无界队列LinkedBlockingQueue,对应的还有一个带工厂参数的静态方法ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)

 

2newCachedThreadPool():创建缓存线程池,线程数量没有限制,使用SynchronousQueue队列(没有容量,每个 put 必须等待一个 take),对应的还有一个带工厂参数的静态方法newCachedThreadPool(ThreadFactory threadFactory)

 

3newScheduledThreadPool(int corePoolSize):创建可以延迟执行或者以一定频率执行的线程池,可以取代Timer。使用DelayedWorkQueue,线程数量也没有限制(只限制了核心线程数)。对应的还有一个带工厂参数的静态方法newCachedThreadPool(ThreadFactory threadFactory)

 

4newSingleThreadExecutor():创建但线程的线程池,使用无界队列LinkedBlockingQueue对应的还有一个带工厂参数的静态方法newSingleThreadExecutor(ThreadFactory threadFactory)。另外还可以通过newSingleThreadScheduledExecutor()newSingleThreadScheduledExecutor(ThreadFactory threadFactory)创建单线程版本的延迟执行或者以一定频率执行的线程池,第三,四种方式的结合体。

 

不管采用上述哪种方法,这些方法最终会创建ThreadPoolExecutor实例。另外springThreadPoolTaskExecutor本质上也是使用的ThreadPoolExecutor,只是把配置参数抽取出来 方便在xml中配置而已。同样的我们也可以根据ThreadPoolExecutor的构造方法,根据业务需要创建自定义的线程池,这就必须了解ThreadPoolExecutor的基本构成。

 

如果要想熟悉java线程池的基本原理,就必须熟悉ThreadPoolExecutor的实现原理。

 

ThreadPoolExecutor实现原理

 

构造方法

首先来看ThreadPoolExecutor的构造方法,ThreadPoolExecutor4个重载的构造方法,但最终本质上都是调用的同一个,我们只用分析这一个构造方法就行:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
}

 

该构造方法,一共有7个参数,分别一个一个来看:

1corePoolSize:核心线程数,即线程池需要维持的最少线程数。当线程池刚创建时,线程数为0(也可以指定初始化,调用prestartAllCoreThreads方法),向线程池添加任务时,会直接创建线程进行处理,线程个数达到核心线程数。

2maximumPoolSize:线程池运行创建的最大线程数,需要大于等于corePoolSize

3keepAliveTime:当线程数大于corePoolSize,并且有部分线程已经空闲,空闲超过keepAliveTime时长,就会消耗该线程。

4unitkeepAliveTime的时间单位。

5workQueue:任务队列(BlockingQueue类型的阻塞队列),当线程数等于corePoolSize时,再新增任务这时不会新增线程,而是先放到任务队列进行缓冲。当任务队列满时,就会创建新的线程,直达线程数到达允许的最大值maximumPoolSize,如果还有新任务进来,就进入“决绝策略”环节,详见第7个参数handler

6threadFactory:创建线程时使用的工厂实例,要求必须实现ThreadFactory接口,一般用于自定义线程名称、指定优先级等。默认实现是DefaultThreadFactory,可以仿照这个类实现自己的自定义工厂类。

7handler:拒绝策略,当线程数到达maximumPoolSize,并且workQueue任务队列已满的情况下,如果还有新任务加入,就会执行“拒绝策略”。要求必须实现RejectedExecutionHandler接口, ThreadPoolExecutor提供了四个默认实现可以选择:

AbortPolicy:直接抛出RejectedExecutionException异常给提交任务线程;

CallerRunsPolicy:交给提交任务线程执行,即直接执行任务的run方法;

DiscardOldestPolicy:线程池放弃队列中最老的任务,并把这个新任务加入队列;

DiscardPolicy:直接放弃任务,并且不返回异常。

有个默认的构造方法使用的就是AbortPolicy--抛出异常策略。另外 使用Executors创建的线程池都是使用的这个默认策略。

当然,也可以自己实现RejectedExecutionHandler接口,定义自己的拒绝策略

 

到这里构造方法讲解完毕,熟悉了构造方法,相信也就可以实现任意的自定义线程池了。

 

execute提交任务方法

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        //当前线程数小于核心线程数,直接调用addWorker创建核心线程
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //如果线程已经关闭,放弃任务,并执行拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果无法加入队列(队列已满),并且也没办法创建新线程(线程到达最大值),执行决绝策略
        //如果无法加入队列(队列已满),但还可以创建新线程,就创建普通线程(非核心线程)
        else if (!addWorker(command, false))
            reject(command);
}

上面已经对构造方法的参数进行过讲解,结合给出的注释 再来看这个提交任务方法就比较好理解了。这里不再累述,主要需要提出讲的是添加线程方法addWorker(Runnable firstTask, boolean core),第二参数判断是否创建核心线程(非核心线程有可能会被回收),这个方法的主要作用就是实例化一个WorkerThreadPoolExecutor的内部类),并启动Worker中的线程。下面重点讲下Worker类的实现,因为它就是对AQS的实现。

 

WorkerAQS的实现

Worker继承AQS,主要实现了一个排它锁,有人会问为啥不直接用ReentrantLockWorker使用这个锁从队列中获取任务时加锁,每个线程同时只执行一个任务,不需要重入锁(如果是重入锁,就有可能同一个线程同时执行多个任务),这部分内容在runWorker方法中会讲到。首先来看下Worker的实现:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
{
    private static final long serialVersionUID = 6138294804551838833L;
 
    //Worker种的线程
    final Thread thread;
    //线程的初始任务
    Runnable firstTask;
    //该Worker已完成的任务数
    volatile long completedTasks;
   
    //构造方法
    Worker(Runnable firstTask) {
        // 为AQS的state赋值,在执行runWorker之前都是阻塞状态
        setState(-1);
        //初始任务
        this.firstTask = firstTask;
        //调用工厂实例,创建线程
        this.thread = getThreadFactory().newThread(this);
    }
 
    //启动Worker,runWorker中有个死循环 不停的从任务队列中获取任务执行
    public void run() {
        runWorker(this);
    }
   
    //0表示锁可用,1表示已被占用
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
 
    //排它获取锁实现,是非公平实现,主要为了保证性能
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
 
    //释放锁实现
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
 
    //阻塞获取锁
    public void lock()        { acquire(1); }
    //非阻塞尝试获取锁
    public boolean tryLock()  { return tryAcquire(1); }
    //释放锁
    public void unlock()      { release(1); }
    //锁是否被占用
    public boolean isLocked() { return isHeldExclusively(); }
 
    //中断线程方法(注意不是中断获取锁)
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}
 

 

重要的地方都加了注释,结合注释应该可以很好的理解Worker的实现。需要说明的Worker还实现了Runnable接口,在addWorker方法中会启动Worker并执行其run方法,也就会接着执行runWorker方法,该方法会循环从任务队列中获取任务并执行:

 
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 释放锁,记得初始化的时候AQS的state被设置为-1吗?在这里释放
        w.unlock();
        boolean completedAbruptly = true;
        try {
            //不停的获取任务执行
            while (task != null || (task = getTask()) != null) {
                w.lock();//加锁
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                //这一段都是判断是否需要中断线程
                if ((runStateAtLeast(ctl.get(), STOP) ||
                        (Thread.interrupted() &&
                                runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //执行任务前,需要执行的操作,这个可以留给用户自定义
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();//执行任务操作
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //任务执行完成,需要执行的操作,这个可以留给用户自定义
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    //对worker已完成的任务数加1
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //Worker结束时 需要执行的相关清理工作
            processWorkerExit(w, completedAbruptly);
        }
}
 

 

可以看到runWorker方法是一个死循环,只有当Worker线程被外部中断时才会结束。在获取任务执行的过程中需要加锁,这个锁就是Worker中使用AQS实现的锁。或许会有人困惑为什么需要加锁,明明这里只有Worker一个线程在执行(每个Worker实例都有一个私有的锁),这是因为在外部有可能会在其他线程中调用ThreadPoolExecutor的其他方法中断Worker,为了保证任务完整的被执行,必须在获得锁的情况下才能中断Worker,这些方法有两个:interruptIdleWorkers(会shutdown,以及回收空闲Worker线程时等方法调用)、interruptWorkers(会被shutdownNow方法调用)

 

beforeExecuteafterExecute方法

这两个方法是在具体执行某一个任务之前和之后调用,这个两个方法是空方法:

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }

 

并且这个这两个方法都是protected,也就是说专门留给ThreadPoolExecutor的子类实现的。当我们需要在执行每个任务的前后打印一些日志,或者精确的计算每个任务的执行时间,就可以继承ThreadPoolExecutor实现这个两个方法,创建自己的线程池实现。

 

shutdown平滑关闭线程池方法

先来看该方法的实现代码,再说明为什么是平滑关闭线程:

   

 public void shutdown() {
        //主锁,所有Worker共用,注意与Worker中的锁区分开
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();//检查权限
            //设置线程池状态,之后添加的任务直接执行“拒绝策略”
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();//首先中断空闲的线程
            //关闭线程池后,触发的操作,默认是空,预留给子类实现
            onShutdown();
        } finally {
            mainLock.unlock();
        }
        //这个方法的主要作用就是等待Worker把任务队列中的任务执行完成,
        //然后当Worker空闲时,调用interruptIdleWorkers()方法中断线程。
        tryTerminate();
}

 

结合给出的注释应该就很好理解什么是平滑关闭了,首先把线程池状态置为关闭,后续再往队列中添加任务会直接决绝;然后调用interruptIdleWorkers()方法清理所有已经空闲的Worker;最后通过tryTerminate()方法等待Worker把任务队列中的任务执行完成,待Worker状态变为空闲时,在调用interruptIdleWorkers()方法中断Worker线程。其中onShutdown()方法默认是空的,可以在自定义的ThreadPoolExecutor中去实现,比如在线程池关闭时加写日志。另外再重点看下interruptIdleWorkers()方法:

private void interruptIdleWorkers(boolean onlyOne) {
        //主锁,所有Worker公用
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                //尝试获取Worker自己的锁,如果能获取到 说明该Worker已经空闲
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        //中断Worker线程
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

 

方法的实现逻辑很简单,只是需要注意这里用到两个锁,一个是线程池的主锁使用,所用Worker都公用这一个锁;另外一个是Worker自己的锁是通过AQS自己实现的,主要用于判断线程是否空闲,如果空闲就执行中断操作。

 

可以看到shutdown方法的整个实现过程会保证所有已经放入任务队列中的任务最终都被执行,是平滑的关闭线程池方法。

 

shutdownNow暴力关闭方法

相对于shutdown平滑关闭方法,线程池还提供暴力关闭方法shutdownNow:

    

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            //返回任务队列中还没有执行的任务列表
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        //如果任务已经在执行中,会保证任务执行完成后,才中断线程
        tryTerminate();
        return tasks;
    }

 

shutdownNowshutdown方法唯一不同的地方就是通过调用drainQueue()方法,返回给调用方任务队列中还没有执行的任务类别,调用方可以把这些任务保存起来,方便后续处理。同时也会保证已经在执行过程中的任务继续执行完成后,才中断线程。所以shutdownNow虽然暴力,但还是能保证任务状态的一致性(即不存在任务只被执行了一半的状态),同时返回还没有执行的任务列表,为后续恢复执行创造了条件。

 

由于通过interrupt方法中断线程只是打一个标记,线程真正被中断还需要一定的时间。也就是说执行完shutdownNowshutdown方法后线程池不一定马上就中断了,这时可以通过isTerminated()方法来判断线程池是不是已经彻底关闭。

 

至此ThreadPoolExecutor的核心实现讲解完毕。

 

总结

 

我们平时基本都是使用Executors框架来创建线程池,通过本文的总结,相信大家都可以通过ThreadPoolExecutor创建出自己的线程池,必要时还可以继承ThreadPoolExecutor对它进行扩展。

 

 

 

 

0
0
分享到:
评论

相关推荐

    java 线程池例子ThreadPoolExecutor

    Java 线程池例子 ThreadPoolExecutor Java 中的线程池是指一个容器,里面包含了多个线程,这些线程可以重复使用,以避免频繁创建和销毁线程的开销。ThreadPoolExecutor 是 Java 中一个非常重要的线程池实现类,它...

    java线程池使用后到底要关闭吗

    java线程池使用后到底要关闭吗 java线程池是一种高效的并发编程技术,可以帮助开发者更好地管理线程资源,提高系统的性能和可靠性。然而,在使用java线程池时,一个常见的问题是:使用完线程池后到底要不要关闭?...

    12、线程池ThreadPoolExecutor实战及其原理分析(下)

    线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor...

    JAVA并发编程实践-线程池-学习笔记

    Java并发编程实践中的线程池是...总之,理解和掌握Java线程池的原理和配置技巧,能帮助开发者编写出高效、稳定的并发程序,提升系统性能。在实践中,应结合具体业务需求灵活调整线程池参数,以达到最佳的并发执行效果。

    Java线程池与ThreadPoolExecutor.pdf

    Java线程池是Java并发编程中...总结来说,理解并正确使用Java线程池和ThreadPoolExecutor对于优化Java应用程序的并发性能至关重要。通过调整线程池的参数,可以平衡资源利用率和系统响应时间,从而提高整体的系统效率。

    java线程池ThreadPoolExecutor类使用详解.docx

    在《阿里巴巴java开发手册》中指出了线程资源必须通过线程池提供,不允许在应用中自行显示的创建线程,这样一方面是线程的创建更加规范,可以合理控制开辟线程的数量;另一方面线程的细节管理交给线程池处理,优化了...

    徒手实现线程池-1

    在Java并发编程中,线程池(ThreadPoolExecutor)是一个至关重要的工具,它的目的是简化多线程的管理和控制,提高系统的效率和资源利用率。线程池的实现涉及到多种类型的队列和策略,每种都有其特定的适用场景。下面...

    线程池原理-ThreadPoolExecutor源码解析

    线程池原理-ThreadPoolExecutor源码解析 1.构造方法及参数 2.阻塞对列: BlockingQueue 3.线程工厂: DefaultThreadFactory 4.拒绝策略: RejectedExecutionHandler 5.执行线程 Executor

    线程-线程池-锁-集合-Map-队列.docx

    在Java中,使用`ExecutorService`和`ThreadPoolExecutor`来创建线程池,而不是直接使用`Executors`,因为`Executors`创建的线程池可能会导致资源耗尽的问题。 集合是Java中存储数据的主要工具,包括List、Set和Map...

    自定义实现Java线程池1-模拟jdk线程池执行流程1

    【自定义Java线程池实现】 在Java编程中,线程池是一种高效管理线程资源的方式,可以提高系统的性能和响应速度。本篇将探讨如何模拟Java的JDK线程池执行流程,以理解其设计原理。核心知识点包括线程池的执行策略、...

    java线程池封装j

    Java线程池是一种高效管理线程的技术,它允许开发者预定义一组线程,根据任务的需要灵活调度,而不是每次需要执行任务时都创建新的线程。这种设计模式大大提高了系统的性能,减少了系统资源的消耗,特别是在高并发...

    Java 线程池的原理与实现

    Java线程池是一种高级的多线程处理框架,它是Java并发编程中非常重要的一个组件。线程池的原理和实现涉及到操作系统调度、内存管理和并发控制等多个方面。理解线程池的工作原理有助于优化程序性能,避免过度创建和...

    java 线程池实现多并发队列后进先出

    Java线程池是一种高效管理并发任务的机制,它允许开发者预先配置一定数量的线程,以便在处理多个并发任务时能有效地复用这些线程,从而避免了频繁创建和销毁线程带来的开销。在Java中,`java.util.concurrent`包下的...

    java线程池实例详细讲解

    Java线程池的实现主要有`ThreadPoolExecutor`类,它提供了丰富的构造参数来定制线程池的行为: - `corePoolSize`:线程池的基本大小,即当线程池创建后和运行过程中,即使没有任务,也会保持这个数量的线程存活。 -...

    自定义实现Java线程池

    ### 自定义实现Java线程池 #### 一、概述 在深入探讨自定义Java线程池之前,我们先简要回顾一下线程池的基本概念及其重要性。线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动...

    java线程池的源码分析.zip

    首先,我们要理解Java线程池的核心类`java.util.concurrent.ThreadPoolExecutor`,它是所有自定义线程池的基类。线程池通过维护一个工作线程集合来管理任务的执行,避免了频繁创建和销毁线程的开销,提高了系统的...

    java线程池threadpool简单使用源码

    要理解`java线程池threadpool简单使用源码`,你需要查看`src`目录下的Java文件,了解如何实例化`ThreadPoolExecutor`,设置相关参数,以及如何提交任务到线程池。同时,查看源码中对`ThreadGroup`的使用,理解它如何...

    Java线程池文档

    Java线程池是一种高效管理线程的机制,它允许开发者预先设定线程的数量,并通过池化的方式重用已创建的线程,以提高系统性能,减少线程的创建和销毁开销。线程池在Java中是通过`java.util.concurrent`包下的`...

    java线程池实例

    Java线程池是一种高效管理线程资源的工具,它通过维护一组可重用的线程来减少创建和销毁线程的开销。在Java中,`java.util.concurrent`包提供了`ExecutorService`接口和它的实现类,如`ThreadPoolExecutor`,来支持...

    JAVA线程池例子

    Java线程池是一种高效管理线程资源的技术,它允许开发者创建一组可重用的工作线程,从而避免频繁地创建和销毁线程带来的性能开销。线程池在Java中主要通过`java.util.concurrent`包中的`ExecutorService`接口及其...

Global site tag (gtag.js) - Google Analytics