`

java 线程池2

    博客分类:
  • java
 
阅读更多

https://www.cnblogs.com/wangyu19900123/p/11641645.html

最近在看Java并发编程的艺术》回顾线程池的原理和参数的时候发现一个问题,如果 corePoolSize = 0 阻塞队列是无界的。线程池将如何工作?

我们先回顾一下书里面描述线程池execute()工作的逻辑:

  1. 如果当前运行的线程,少于corePoolSize,则创建一个新的线程来执行任务。
  2. 如果运行的线程等于或多于 corePoolSize,将任务加入 BlockingQueue
  3. 如果 BlockingQueue 内的任务超过上限,则创建新的线程来处理任务。
  4. 如果创建的线程数是单钱运行的线程超出 maximumPoolSize,任务将被拒绝策略拒绝。

看了这四个步骤,其实描述上是有一个漏洞的。如果核心线程数是0,阻塞队列也是无界的,会怎样?如果按照上文的逻辑,应该没有线程会被运行,然后线程无限的增加到队列里面。然后呢?

于是我做了一下试验看看到底会怎样?

public class threadTest {

    private final static ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1,0, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());

    public static void main(String[] args) {

        AtomicInteger atomicInteger = new AtomicInteger();

        while (true) {

            executor.execute(() -> {

                System.out.println(atomicInteger.getAndAdd(1));

            });

        }

    }

}

结果里面的System.out.println(atomicInteger.getAndAdd(1));语句执行了,与上面的描述矛盾了。到底发生了什么?线程池创建线程的逻辑是什么?我们还是从源码来看看到底线程池的逻辑是什么?

<!--[if !supportLists]-->3.1             <!--[endif]-->ctl

要了解线程池,我们首先要了解的线程池里面的状态控制的参数 ctl

  • 线程池的ctl是一个原子的 AtomicInteger

<!--[if !supportLists]-->·         <!--[endif]-->这个ctl包含两个参数

    • workerCount 激活的线程数
    • runState 当前线程池的状态

<!--[if !supportLists]-->·         <!--[endif]-->它的低29位用于存放当前的线程数, 因此一个线程池在理论上最大的线程数是 536870911; 3 位是用于表示当前线程池的状态, 其中高三位的值和状态对应如下:

    • 111: RUNNING
    • 000: SHUTDOWN
    • 001: STOP
    • 010: TIDYING
    • 110: TERMINATED

为了能够使用 ctl 线程池提供了三个方法:

    // Packing and unpacking ctl
    // 获取线程池的状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 获取线程池的工作线程数
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // 根据工作线程数和线程池状态获取 ctl
    private static int ctlOf(int rs, int wc) { return rs | wc; }

<!--[if !supportLists]-->3.2           <!--[endif]-->execute

外界通过 execute 这个方法来向线程池提交任务。

先看代码:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
 
        //如果工作线程数小于核心线程数,
        if (workerCountOf(c) < corePoolSize) {
            //执行addWork,提交为核心线程,提交成功return。提交失败重新获取ctl
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
 
        //如果工作线程数大于核心线程数,则检查线程池状态是否是正在运行,且将新线程向阻塞队列提交。
        if (isRunning(c) && workQueue.offer(command)) {
 
            //recheck 需要再次检查,主要目的是判断加入到阻塞队里中的线程是否可以被执行
            int recheck = ctl.get();
 
            //如果线程池状态不为running,将任务从阻塞队列里面移除,启用拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 如果线程池的工作线程为零,则调用addWoker提交任务
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
 
        //添加非核心线程失败,拒绝
        else if (!addWorker(command, false))
            reject(command);
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            //获取线程池状态
            int rs = runStateOf(c);
 
            // Check if queue empty only if necessary.
            // 判断是否可以添加任务。
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
 
            for (;;) {
                //获取工作线程数量
                int wc = workerCountOf(c);
                //是否大于线程池上限,是否大于核心线程数,或者最大线程数
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //CAS 增加工作线程数
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                //如果线程池状态改变,回到开始重新来
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
 
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
 
        //上面的逻辑是考虑是否能够添加线程,如果可以就cas的增加工作线程数量
        //下面正式启动线程
        try {
            //新建worker
            w = new Worker(firstTask);
 
            //获取当前线程
            final Thread t = w.thread;
            if (t != null) {
                //获取可重入锁
                final ReentrantLock mainLock = this.mainLock;
                //锁住
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    // rs < SHUTDOWN ==> 线程处于RUNNING状态
                    // 或者线程处于SHUTDOWN状态,且firstTask == null(可能是workQueue中仍有未执行完成的任务,创建没有初始任务的worker线程执行)
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 当前线程已经启动,抛出异常
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //workers 是一个 HashSet 必须在 lock的情况下操作。
                        workers.add(w);
                        int s = workers.size();
                        //设置 largeestPoolSize 标记workAdded
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //如果添加成功,启动线程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            //启动线程失败,回滚。
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

先看看 addWork() 的两个参数,第一个是需要提交的线程 Runnable firstTask,第二个参数是 boolean 类型,表示是否为核心线程。

execute() 中有三处调用了 addWork() 我们逐一分析。

  • 第一次,条件 if (workerCountOf(c) < corePoolSize) 这个很好理解,工作线程数少于核心线程数,提交任务。所以 addWorker(command, true)
  • 第二次,如果 workerCountOf(recheck) == 0 如果worker的数量为0,那就addWorker(null,false)。为什么这里是 null ?之前已经把 command 提交到阻塞队列了workQueue.offer(command) 。所以提交一个空线程,直接从阻塞队列里面取就可以了。
  • 第三次,如果线程池没有 RUNNING 或者 offer 阻塞队列失败,addWorker(command,false),很好理解,对应的就是,阻塞队列满了,将任务提交到,非核心线程池。与最大线程池比较。

至此,重新归纳execute()的逻辑应该是:

  1. 如果当前运行的线程,少于corePoolSize,则创建一个新的线程来执行任务。
  2. 如果运行的线程等于或多于 corePoolSize,将任务加入 BlockingQueue
  3. 如果加入 BlockingQueue 成功,需要二次检查线程池的状态如果线程池没有处于 Running,则从 BlockingQueue 移除任务,启动拒绝策略。
  4. 如果线程池处于 Running状态,则检查工作线程(worker)是否为0。如果为0,则创建新的线程来处理任务。如果启动线程数大于maximumPoolSize,任务将被拒绝策略拒绝。
  5. 如果加入 BlockingQueue 。失败,则创建新的线程来处理任务。
  6. 如果启动线程数大于maximumPoolSize,任务将被拒绝策略拒绝。

<!--[if !supportLists]-->3.3             <!--[endif]-->总结

回顾我开始提出的问题:

如果 corePoolSize = 0 阻塞队列是无界的。线程池将如何工作?

这个问题应该就不难回答了。

 

<!--[if !supportLists]-->3.4             <!--[endif]-->最后

分享到:
评论

相关推荐

    java线程池完整代码

    2. 线程池的创建:使用 `ConsumeThreadPoolPara` 对象创建线程池,设置线程池的最小和最大线程数。 3. 线程池的管理:使用 Timer 定期扫描线程池,防止线程未激活,并使用 `freeThreadCount` 变量来得到空闲线程的...

    Java简单线程池 线程池中文文档

    简单的线程池程序+中文文档 包结构: com.tangkai.threadpool --SimpleThread.java 工作线程 --TestThreadPool.java 程序入口 --ThreadPoolManager.java 线程池管理类

    自定义实现Java线程池2-完善异常处理和去除同步1

    在Java编程中,自定义线程池是一项常见的需求,它可以帮助我们更有效地管理和控制并发执行的任务。本文将讨论如何在自定义线程池中完善异常处理和去除同步,以提高效率和程序的健壮性。 首先,从提供的代码片段来看...

    java线程池封装j

    Java线程池是一种高效管理线程的技术,它允许开发者预定义一组线程,根据任务的需要灵活调度,而不是每次需要执行任务时都创建新的线程。这种设计模式大大提高了系统的性能,减少了系统资源的消耗,特别是在高并发...

    java线程池知识.ppt

    java线程池知识、

    一个通用的Java线程池类

    2.然后根据提示运行java命令执行示例程序,观看线程池的运行结果 目标:Java中多线程技术是一个难点,但是也是一个核心技术。因为Java本身就是一个多线程语言。本人目前在给46班讲授Swing的网络编程--使用Swing来...

    基于Java线程池技术实现Knock Knock游戏项目.zip

    基于Java线程池技术实现Knock Knock游戏项目.zip 基于Java线程池技术实现Knock Knock游戏项目.zip 基于Java线程池技术实现Knock Knock游戏项目.zip 基于Java线程池技术实现Knock Knock游戏项目.zip 基于Java线程池...

    java线程池实例详细讲解

    Java线程池是一种高效管理线程资源的工具,它能够帮助开发者有效地控制并调度线程,从而提升系统性能,减少系统资源的浪费。在Java中,`ExecutorService`接口是线程池的主要入口,它是`java.util.concurrent`包的一...

    Java 线程池.pptx

    讲述了java线程池的优点,参数,6种线程池的使用场景,线程池用到的handler,线程任务的提交方式等等。

    自定义实现Java线程池

    ### 自定义实现Java线程池 #### 一、概述 在深入探讨自定义Java线程池之前,我们先简要回顾一下线程池的基本概念及其重要性。线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动...

    Java线程池使用说明

    Java线程池是Java并发编程中的重要组件,它能够有效地管理和复用线程,从而提高程序的执行效率和降低资源消耗。在JDK 1.5版本之前,Java对线程池的支持非常有限,而在JDK 1.5之后,加入了java.util.concurrent包,...

    java线程池的源码分析.zip

    Java线程池是Java并发编程中的重要组成部分,它在多线程和高并发场景下扮演着关键角色。本文将深入探讨Java线程池的源码分析,并对比不同类型的线程池,以帮助开发者更好地理解和利用这一强大的工具。 首先,我们要...

    java线程池threadpool简单使用源码

    Java线程池(ThreadPool)是Java并发编程中的一个重要概念,它可以帮助我们有效地管理和控制并发执行的任务,从而提高系统的效率和稳定性。线程池通过复用已存在的线程,避免了频繁创建和销毁线程带来的开销,同时也...

    Java线程池文档

    Java线程池是一种高效管理线程的机制,它允许开发者预先设定线程的数量,并通过池化的方式重用已创建的线程,以提高系统性能,减少线程的创建和销毁开销。线程池在Java中是通过`java.util.concurrent`包下的`...

    java技术学习-基于Java线程池技术实现Knock Knock游戏项目(包含服务端、客户端两部分)

    java技术学习——基于Java线程池技术实现Knock Knock游戏项目(包含服务端、客户端两部分) java技术学习——基于Java线程池技术实现Knock Knock游戏项目(包含服务端、客户端两部分) java技术学习——基于Java...

    Java 线程池.docx

    Java线程池是一种高效管理线程资源的工具,它的出现是为了应对多线程编程中频繁创建和销毁线程带来的性能开销以及资源消耗。在Java中,通过使用线程池,我们可以预先创建一定数量的线程,这些线程在空闲时可以被复用...

    Java线程池与ThreadPoolExecutor.pdf

    Java线程池是Java并发编程中的重要组成部分,它允许开发者管理多个线程并有效地调度任务。线程池通过ThreadPoolExecutor类实现,这是一个高度可配置的工具,能够根据具体需求定制线程的创建、管理和销毁策略。 ...

    java线程池实例

    Java线程池是一种高效管理线程资源的工具,它通过维护一组可重用的线程来减少创建和销毁线程的开销。在Java中,`java.util.concurrent`包提供了`ExecutorService`接口和它的实现类,如`ThreadPoolExecutor`,来支持...

    Java 线程池的原理与实现

    Java线程池是一种高级的多线程处理框架,它是Java并发编程中非常重要的一个组件。线程池的原理和实现涉及到操作系统调度、内存管理和并发控制等多个方面。理解线程池的工作原理有助于优化程序性能,避免过度创建和...

Global site tag (gtag.js) - Google Analytics