`

JUC之 ThreadPoolExecutor 源码解析

 
阅读更多


用下面语句创建一个线程池 ThreadPoolExecutor


ExecutorService executorProducer = Executors. newFixedThreadPool (2);


该段代码主要初始化线程池的一些参数,如: corePoolSize maximumPoolSize workQueue 等。





executorProducer.execute( new Runnable() {

     public void run() {


     }

});


上述代码首先创建一个 Runnable 实例,然后执行 execute() 方法,具体代码如下:


public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize (command)) {
            if (runState == RUNNING && workQueue.offer (command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
    }

 


当线程池已满,或者不能把当前线程加入到线程池中 addIfUnderCorePoolSize() ,则将运行状态的线程加入( workQueue . offer () )到等待队列中去( LinkedBlockingQueue workQueue )。


AddIfUnderCorePoolSize 代码如下:


private boolean addIfUnderCorePoolSize (Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < corePoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();
        return true;
    }


 

其中有个方法 addThread 代码如下:

private Thread addThread(Runnable firstTask) {
        Worker w = new Worker(firstTask);
        Thread t = threadFactory.newThread(w) ;
        if (t != null) {
            w.thread = t;
            workers.add(w);
            int nt = ++poolSize;
            if (nt > largestPoolSize)
                largestPoolSize = nt;
        }
        return t;
    }


 

其所用数据结构如下图所示:


  1. 用户创建一个多线程对象实例( Producer )。

  2. 使用 1 创建出来的实例,创建一个 Worker 实例。 --> worker.firstTask = producer.

  3. 2 的实例作为 target 创建一个线程实例。 Thread t = new Thread(..., worker, ...)

  4. 2 的属性 worker.thread 3 创建的线程实例关联起来。 Worker.thread = t

  5. 如果 3 的实例为 null, 表示未能创建 produder 对性的执行线程 worker 。转 7

  6. 如果 3 创建了一个线程,则启动线程 t.start() 返回 true 表示成功将 producer 分配给线程池一个执行线程 worker

  7. 如果未能创建 worker 执行 producer ,则将 producer 放入队列 workQueue.offer() 中,等待以后执行。

其中 3 创建的 thread 主要目的是为了执行 6 中的 thread.start ()方法,因为 Runnable 只是一个接口,没有办法启动一个线程,所以用 Thread 包装了一下。


由图可知通过 Executors 执行用户的线程流程如下:

ThreadPoolExecutor.executor(Producer) --> Thread.start() --> Thread.run() --> Worker.run() --> Worker.runTask(producer) --> producer.run().


其中 worker.run () 代码如下:


public void run() {
            try {
                Runnable task = firstTask;
                firstTask = null;

               // 如果 worker 有线程,或者可以从等待队列取到线程,则执行该线程

                while (task != null || (task = getTask() ) != null) {
                    runTask(task);
                    task = null;
                }
            } finally {
                workerDone(this);
            }
        }

 

最终执行线程代码如下:


private void runTask (Runnable task) {
            final ReentrantLock runLock = this.runLock;
            runLock.lock();
            try {
                /*
                 * Ensure that unless pool is stopping, this thread
                 * does not have its interrupt set. This requires a
                 * double-check of state in case the interrupt was
                 * cleared concurrently with a shutdownNow -- if so,
                 * the interrupt is re-enabled.
                 */
                if (runState < STOP &&
                    Thread.interrupted() &&
                    runState >= STOP)
                    thread.interrupt();
                /*
                 * Track execution state to ensure that afterExecute
                 * is called only if task completed or threw
                 * exception. Otherwise, the caught runtime exception
                 * will have been thrown by afterExecute itself, in
                 * which case we don't want to call it again.
                 */
                boolean ran = false;
                beforeExecute(thread, task);
                try {
                    task.run(); //线程实际执行的地方
                    ran = true;
                    afterExecute(task, null);
                    ++completedTasks;
                } catch (RuntimeException ex) {
                    if (!ran)
                        afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                runLock.unlock();
            }
        }

 

 

 

最后,总结一下 ThreadPoolExecutor 的执行图如下:

 

 

1. ThreadPoolExecutor 通过 executor方法的 addIfUnderCorePoolSize(Runnable)创建线程池里执行线程 worker[i]

2. 如果需要执行的线程个数小于线程池的最大线程数量,则每个 worker 执行一个线程,结束任务。

3. 如果需要执行的线程个数大于线程池的最大线程数量,则每个 worker 取个一个线程,把剩余需要执行的任务存入等待队列中(workQueue.offer()).

4. worker.run() 设置 task 为 firstTask 或者 getTask()的线程。其中 getTask()即为从等待队列取得一个任务,分给该worker执行。

5. task.run() 执行线程。

 

 

  • 大小: 59 KB
  • 大小: 12.8 KB
分享到:
评论

相关推荐

    ThreadPoolExecutor源码解析.md

    ThreadPoolExecutor源码解析.md

    JUC并发编程与源码分析视频课.zip

    《JUC并发编程与源码分析视频课》是一门深入探讨Java并发编程的课程,主要聚焦于Java Util Concurrency(JUC)库的使用和源码解析。JUC是Java平台提供的一组高级并发工具包,它极大地简化了多线程编程,并提供了更...

    JUC+课程源码+线程操作

    本课程资源主要围绕JUC进行展开,通过样例源码帮助学习者深入理解和掌握线程操作的相关知识。 在JUC中,核心组件包括`ExecutorService`、`Semaphore`、`CountDownLatch`、`CyclicBarrier`、`Future`、`...

    尚硅谷大厂必备技术之JUC并发编程基础视频 配套资料(自己根据视频整理课件,和代码)

    【尚硅谷】大厂必备技术之JUC并发编程视频 配套资料,自己根据视频整理 pdf 课件,和代码 视频地址:...

    并发编程、juc工具包源码分析笔记

    Java 并发库(Java Util Concurrency, JUC)是 Java 平台中用于处理并发问题的核心工具包,提供了丰富的类和接口来简化并发编程。 在计算机系统中,进程是操作系统资源分配的基本单位,它拥有独立的内存空间,而...

    juc源码视频教程最全

    本教程将深入探讨JUC源码,旨在帮助你全面理解其背后的实现原理。 在Java中,JUC(java.util.concurrent)包包含了多种并发控制和同步组件,如线程池、锁、原子变量、并发容器等。这些组件设计精巧,性能优秀,能够...

    juc并发编程脑图以及相关示例代码

    juc并发编程脑图以及相关示例代码

    尚硅谷Java视频_JUC 视频教程

    尚硅谷_JUC线程高级_源码、课件 ·1. 尚硅谷_JUC线程高级_volatile 关键字与内存可见性 ·2. 尚硅谷_JUC线程高级_原子变量与 CAS 算法 ·3. 尚硅谷_JUC线程高级_模拟 CAS 算法 ·4. 尚硅谷_JUC线程高级_同步容器类...

    最详细JUC面试内容解析.docx

    ### 最详细JUC面试内容解析 #### 一、并发基础:内存可见性 在多线程编程中,尤其是在Java语言的并发控制中,**内存可见性**是一个非常关键的概念。简单来说,内存可见性指的是当一个线程修改了一个共享变量的值...

    Fork:join框架与CompleteableFuture源码解析.pptx

    全网第一篇通过图文介绍Fork/Join框架与CompleteableFuture的PPT

    根据尚硅谷JUC并发编程(对标阿里P6-P7)视频自己整理的pdf文档

    1、根据尚硅谷JUC并发编程(对标阿里P6-P7)视频自己整理的pdf文档 2、包含源码 视频地址:https://www.bilibili.com/video/BV1ar4y1x727/?p=1&vd_source=c634d163b940964d44747b4c3976117b 参考资料:...

    尚硅谷JUC视频笔记整理,很详细和全面,帮你迅速掌握JUC

    JUC(Java Util Concurrent),即Java的并发工具包,是Java提供的一套并发编程解决方案,它通过一系列接口和类简化了并发编程的复杂性。本笔记整理涉及了JUC的内存可见性、volatile关键字以及CAS算法和原子变量等多...

    个人学习JUC代码笔记总集

    1. **线程池**:ExecutorService是Java中线程池的核心接口,通过ThreadPoolExecutor和Executors工厂类可以创建不同类型的线程池,如固定大小的线程池、单线程池和缓存线程池。线程池可以有效地管理和控制线程,避免...

    JUC线程锁框架

    在这个深度解析JUC线程锁框架的主题中,我们将探讨其核心组件、设计模式以及如何在实际应用中有效利用。 1. **原子变量(Atomic Variables)** JUC提供了一系列的原子变量类,如AtomicInteger、AtomicLong等,它们...

    美团动态线程池实践思路开源项目(DynamicTp),线程池源码解析及通知告警篇.doc

    美团动态线程池实践思路开源项目(DynamicTp)线程池源码解析及通知告警篇 本文详细介绍了美团动态线程池实践思路开源项目(DynamicTp)的通知告警模块,该模块提供了多种通知告警功能,每一个通知项都可以独立配置...

    尚硅谷Java视频_JUC视频教程

    - **ThreadPoolExecutor**: 是`Executor`框架的核心类之一,提供了创建自定义线程池的能力。 4. **并发工具类** - **Atomic类**: `java.util.concurrent.atomic`包下的原子类提供了基本类型的原子更新操作,如`...

    尚硅谷JUC百度云连接

    根据提供的文件信息,“尚硅谷JUC百度云连接”这一标题和描述主要指向的是尚硅谷教育机构所提供的关于Java并发编程(Java Util Concurrency,简称JUC)的学习资源,并且通过一个百度网盘链接来分享这些资源。...

    juc学习代码。。。。

    其次,`ExecutorService`和`ThreadPoolExecutor`是JUC中线程池的核心组件。`ExecutorService`接口定义了线程池的基本操作,而`ThreadPoolExecutor`是其具体实现,提供了更灵活的配置和管理。线程池可以有效地管理和...

    java高并发源码-java-concurrent:Java高并发,JUC,相关源码。1、马士兵高并发视频源码(听课时练习)

    JUC,全称Java Util Concurrency,是Java平台标准版(Java SE)的一部分,提供了丰富的并发工具类,用于构建高效、可扩展的多线程应用。 首先,我们来了解下Java并发编程的基础知识。在Java中,多线程主要通过...

    狂神说JUC.pdf

    在JUC中,提供了Executors类来创建各种类型的线程池,并且可以通过ThreadPoolExecutor类来更细致地控制线程池的行为。 最后,实际应用中,线程池和锁都是为了解决资源的并发访问问题。在提高系统效率的同时,确保...

Global site tag (gtag.js) - Google Analytics