`
suichangkele
  • 浏览: 199999 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

juc-ThreadPoolExecutor线程池总结

    博客分类:
  • java
阅读更多

之前对线程池的理解很是主观,我的理解是在线程池初始化的时候就生成指定的数量的线程,然后将一些任务添加到一个阻塞队列中,然后多个线程同时从阻塞队列中取任务执行,当没有任务时线程阻塞,今天下午看了下大神的博客以及源码,发现我自己的理解有些偏颇,所以今天再总结一下。

我们创建线程池时一般都是通过使用Executors.newXXX来创建,所以我们从这个方法入手,无论是创建Fixed还是cached,还是single的线程池,都是调用的同一个方法——ThreadPoolExecutor的构造方法,只不过是参数不同。

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,//这两二个参数都是整数,用于限制线程的数量,在后面会用到
                       0L, TimeUnit.MILLISECONDS,//这两个参数都是用来指定从阻塞队列中获取任务的时候的超时时间,more on this later。
                       new LinkedBlockingQueue<Runnable>());//用来保存任务的阻塞队列
}

 上面的构造方法最终调用的构造方法为:

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,//上面这5个已经说过了。
                              ThreadFactory threadFactory,//这个用来产生thread,默认是 Executors.defaultThreadFactory()这个的返回值,他负责产生thread,设置thread的name,优先级,设置为daemon,没有别的作用。
                              RejectedExecutionHandler handler) {//这个用来处理当过多的任务添加到队列时,如果队列无法添加的时候的操作,默认的是AbortPolicy,当发生上述情况时,会抛一个异常。
。。。。//复制操作省略了
    }

我们看一下提交任务的方法execute:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
      
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {//如果当前的worker(也即是工作的线程数量)小于corePoolSize,进入if,这里的corePoolSize也就是我们在构造方法中穿入的第一个参数,很重要,他表示一个线程池只少要保存的线程的个数,这些线程是不会过期的,如果阻塞队列中没有任务,则会
在阻塞队列的poll方法中阻塞,而超过这个值的线程在没有任务的时候就要被回收掉。
            if (addWorker(command, true))//添加一个worker,即一个线程,more on this later,
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {//如果上面的没有return,则进入这个if的判断,条件是正在运行,且这个任务能添加到队列中,则进入if
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))//重新检查,虽然上面应有个isRunning的判断,但是可能有时间差。进入这个if,如果不是running的话
                reject(command);//拒绝这个任务,调用rejectPolicy,默认是抛个异常。
            else if (workerCountOf(recheck) == 0)//如果当前的工作线程个数为0,则调用addWorker添加一个工作线程,如果不为0的话,那么就会有线程不停的从阻塞队列中获得任务,所以一定会有线程执行这个任务的。
                addWorker(null, false);
        }
        else if (!addWorker(command, false))//如果添加到队列中失败,则判断是否可以添加工作线程,如果不可以,则拒绝这个任务。
            reject(command);
    }

 经过上面,我们知道了corePoolSize,他是要保存的最小的工作的线程数(more on this later),但是还没有涉及到maximumPoolSize,我们趁热打铁看看addWorder方法

 private boolean addWorker(Runnable firstTask, boolean core) {//第一个参数表示如果要创建工作线程的话,他要执行的第一个任务,第二个参数是用来做判断的,如果是true,表示和corePoolSize判断,false表示和maximumPoolSize判断。
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&//已经shutdown,不能继续接受任务。
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);//工作线程的个数
                if (wc >= CAPACITY ||//如果现在已经创建的线程的数量太多,
                    wc >= (core ? corePoolSize : maximumPoolSize))//这里显示了core的作用,如果是true,则根据第一个判断,
                    return false;//如果创建的worker太多,则返回false,表示不能继续创建。
                if (compareAndIncrementWorkerCount(c))//使用cas增加c,退出循环,进行创建worker
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);//创建worker
            final Thread t = w.thread;
            if (t != null) {
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);//将创建的worker添加到集合中
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();//调用创建的worker中的Thread的start方法,
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

 下面我们看一下Worker这个类:

Worker(Runnable firstTask) {//这个参数表示当这个worker创建成功后执行的第一个任务。
     setState(-1); // inhibit interrupts until runWorker
     this.firstTask = firstTask;//第一个任务
     this.thread = getThreadFactory().newThread(this);//worker中封装了一个线程,并且这个线程执行的任务是this,因为Worker也是一个runnable,也就是说调用的worker类的run方法
}

 同时我们发现Worker也是一个runnable,他的run方法是调用的ThreadPoolExecutor的runWorker方法,也就是当调用worker.thread.start方法的时候是执行的ThreadPoolExecutor的runWorker方法,参数是worker自己,我们看一下

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;//穿入的第一个任务
        w.firstTask = null;
        w.unlock(); // worker继承了aqs,实现了lock方法,他的目的是害怕并发执行造成不确定性的后果
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {//或者是第一个任务不为null或者是从队列中获得任务不是null,这个getTask方法就是从队列中获得任务,比较重要,等会看
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();//执行获取的任务,执行这个任务的是worker中封装的线程。从这里基本就可以总结出大概了:当线程池添加任务时,会判断worker的数量,如果过小,就会创建worker,添加到集合,用这个worker执行第一个任务,执行完了再从阻塞队列中获取任务。
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);//这个方法也很重要,它用来处理在所有的任务都处理完成之后的操作。
        }
    }

看到这里有两个方法比较重要,一个是getTask,一个是processWorderExit,

getTask方法:

 private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            boolean timed;  // Are workers subject to culling?

            for (;;) {
                int wc = workerCountOf(c);
                timed = allowCoreThreadTimeOut || wc > corePoolSize;//如果现在工作的线程多于corePoolSize就会是true

                if (wc <= maximumPoolSize && ! (timedOut && timed))//刚上来timed是false,跳出第二个for
                    break;
                if (compareAndDecrementWorkerCount(c))
                    return null;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }

            try {
                Runnable r = timed ?//如果当前的工作线程多余corePoolSize,则是true
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ://如果是true,表示有超时时间的等待。从这里就能看出corePoolSize的用处了,如果有超过corePoolSize的线程,那么再从队列中获取任务时就会有时间的等待,而且我们之前传的keepAliveTime是0,也就是说
如果当前队列中没有任务的话,超过corePoolSize那些线程就立即返回,而如果不是超过corePoolSize的线程则会一直等待,
                    workQueue.take();//没有超时时间的等待,会一直阻塞。
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

 在这个getTask方法中,我们看到了corePoolSize的重要作用。

processWorderExit方法:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
   if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
   try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);//这里可以发现,对于那些在获取任务时没有阻塞的线程,在没有任务后就会被移除集合,也就是被垃圾回收掉。但是对于那些阻塞的就不会被回收掉。
    } finally {
        mainLock.unlock();
    }
。。。。。//下面的没看。
}

 

再看一下在Executors中的静态方法:

1、创建一个fixedThreadPool的话,corePoolSize和maximumPoolSize是一样的,因为创建的worker的数量不会超过maximumPoolSize,因此worker的数量最多是corePoolSize,即getTask方法中的timed一定是true 所以所有的线程在从阻塞队列中获取任务时都是会永远阻塞的,不会丢掉worker。并且传入的等待时间是0,所以在从阻塞队列中获得任务时不会阻塞,worker直接直接返回。

 2、创建一个cachedThreadPool,

new ThreadPoolExecutor(0, //线程最小的数量是0,即getTask中所有的timed都是false,所以所有的worker在获取不到任务时都会被抛弃,再有任务时就会创建建立worker
     Integer.MAX_VALUE,//最大值超级大,也就是说创建woker的个数几乎是不受限制的。
     60L, TimeUnit.SECONDS,//每个worker在从阻塞队列中获得任务时可以阻塞60秒。
     new SynchronousQueue<Runnable>()   );//一个同步的queue,比较特殊,插入的线程会阻塞直到一个获取的线程的到来,同样获取的线程也会阻塞直到插入的线程到来。

 可以看出,cachedThreadPool几乎可以被认为是不断创建worker的,worker从阻塞队列中获取任务时如果没有任务时就会阻塞60秒,然后被抛弃。

3、创建一个单线程的线程池:

new ThreadPoolExecutor(1, 1,//核心的数量和最大的数量都是1,也就是永远是1,
                                    0L, TimeUnit.MILLISECONDS,//阻塞直到有任务
                                    new LinkedBlockingQueue<Runnable>())

 

很好的纠正了对线程池的理解。 

 

分享到:
评论

相关推荐

    Java 多线程与并发(7-26)-JUC - 类汇总和学习指南.pdf

    Executors 部分提供了一些线程池类,例如 ThreadPoolExecutor、ScheduledThreadPoolExecutor 等,这些类可以帮助开发者管理线程池。 JUC 框架的类结构可以分为五个部分: * Lock 框架和 Tools 类 * Collections * ...

    个人学习-JUC-笔记

    - **ThreadPoolExecutor**:最常见的线程池实现,包括核心线程数、最大线程数、线程空闲时间等参数。 - **ScheduledExecutorService**:支持定时及周期性任务执行,如ScheduledThreadPoolExecutor。 4. **并发...

    heima-JUC-资料

    1. **线程池**:Java通过`ExecutorService`和相关的实现(如`ThreadPoolExecutor`)提供了线程池的概念。线程池可以有效管理多个线程,避免频繁创建和销毁线程带来的开销,同时可以控制并发执行的任务数量,提高系统...

    JUC-master

    1. **线程池(ExecutorService)**:线程池是JUC中用于管理线程的重要工具,通过ThreadPoolExecutor实现。它可以重用已存在的线程,避免频繁创建销毁线程的开销,提高系统效率。线程池的参数调整(如核心线程数、...

    JavaTutorial:突击 Java 系列教程

    突击并发编程JUC系列-ThreadPoolExecutor 线程池 JVM系统学习之路系列 【JVM系统学习之路系列】 JVM 概述篇 【JVM系统学习之路系列】一篇看懂类加载 【JVM系统学习之路系列】运行时数据区概述和程序计数器 【JVM系统...

    juc-learn:juc相关源码的分析以及使用介绍

    ExecutorService是ThreadPoolExecutor的抽象,它是线程池的核心。通过ExecutorService,我们可以更有效地管理线程,避免频繁创建和销毁线程的开销。ThreadPoolExecutor允许我们配置核心线程数、最大线程数、线程...

    java并发编程:juc线程池

    JUC线程池的核心类是`ThreadPoolExecutor`,它的主要工作原理如下: 1. 当一个任务被提交给线程池时,线程池首先检查当前运行的线程数是否小于`corePoolSize`。如果是,那么就会创建一个新的工作线程来执行任务。...

    juc尚硅谷-自学笔记

    `ExecutorService`接口和其实现类如`ThreadPoolExecutor`提供了线程池的管理和调度功能。 2. **Future和Callable**:`Future`接口代表一个异步计算的结果,而`Callable`接口则用于定义可返回结果的计算任务。两者...

    JUC并发编程学习笔记(硅谷)

    - `ThreadPoolExecutor`:自定义线程池,理解其核心参数如核心线程数、最大线程数、工作队列和拒绝策略。 - `ScheduledExecutorService`:定时任务线程池,可以周期性或延迟执行任务。 7. **并发编程最佳实践** ...

    JUC面试知识点总结1

    6. **线程池**:ExecutorService 和 ThreadPoolExecutor 提供了线程池管理,通过线程池可以有效地控制并发数量,避免大量线程创建和销毁带来的开销,同时线程池内部也实现了线程安全的管理。 7. **并发容器**:Java...

    Java 多线程与并发(16-26)-JUC集合- BlockingQueue详解.pdf

    在实际应用中,`BlockingQueue`和`BlockingDeque`常被用来实现工作队列、缓存、线程池等并发组件,例如`ThreadPoolExecutor`就利用`BlockingQueue`来存储等待执行的任务。 理解`BlockingQueue`和`BlockingDeque`的...

    并发学习并发学习并发学习并发学习并发学习

    其中,`ExecutorService`和`ThreadPoolExecutor`用于管理线程池,`Semaphore`用于控制并发访问的许可数量,`CountDownLatch`和`CyclicBarrier`用于协调线程间的同步。 4. **Fork/Join框架**:Fork/Join框架是JUC的...

    美团动态线程池实践思路开源项目(DynamicTp),线程池源码解析及通知告警篇.doc

    ThreadPoolExecutor 是 JUC 中的线程池实现类,它继承自 Executor 接口,提供了 execute() 方法来提交任务。 Executor 接口提供了一个 submit() 方法,可以提交任务并返回 Future 对象,以便可以异步地获取任务的...

    juc2

    2. **线程池**:ThreadPoolExecutor是实现ExecutorService的主要类,允许定制线程池的大小、任务队列以及拒绝策略。线程池的使用可以避免频繁创建和销毁线程带来的开销。 3. **Future和CompletableFuture**:Future...

    JUC代码收集,java高并发多线程学习

    - `ExecutorService`和`ThreadPoolExecutor`:它们构成了线程池的基础,允许开发者有效地管理和调度线程,避免过度创建线程带来的性能开销。 - `BlockingQueue`:这是一个线程安全的数据结构,常用于线程间数据...

    个人学习JUC代码笔记总集

    1. **线程池**:ExecutorService是Java中线程池的核心接口,通过ThreadPoolExecutor和Executors工厂类可以创建不同类型的线程池,如固定大小的线程池、单线程池和缓存线程池。线程池可以有效地管理和控制线程,避免...

    Java——JUC

    通过`ExecutorService`接口及其实现类如`ThreadPoolExecutor`,可以定制线程池的行为,如线程数量、任务队列类型等,提高系统的并行处理能力。 2. **同步容器(Synchronized Collections)** - 包括`BlockingQueue...

    狂神说JUC代码狂神说JUC代码

    ThreadPoolExecutor是ExecutorService的实现,可以自定义线程池的核心线程数、最大线程数、任务队列等参数。 2. **Future和Callable**:Callable接口允许我们在任务中返回一个结果,与Runnable相比更加灵活。Future...

Global site tag (gtag.js) - Google Analytics