之前对线程池的理解很是主观,我的理解是在线程池初始化的时候就生成指定的数量的线程,然后将一些任务添加到一个阻塞队列中,然后多个线程同时从阻塞队列中取任务执行,当没有任务时线程阻塞,今天下午看了下大神的博客以及源码,发现我自己的理解有些偏颇,所以今天再总结一下。
我们创建线程池时一般都是通过使用Executors.newXXX来创建,所以我们从这个方法入手,无论是创建Fixed还是cached,还是single的线程池,都是调用的同一个方法——ThreadPoolExecutor的构造方法,只不过是参数不同。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads,//这两二个参数都是整数,用于限制线程的数量,在后面会用到 0L, TimeUnit.MILLISECONDS,//这两个参数都是用来指定从阻塞队列中获取任务的时候的超时时间,more on this later。 new LinkedBlockingQueue<Runnable>());//用来保存任务的阻塞队列 }
上面的构造方法最终调用的构造方法为:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,//上面这5个已经说过了。 ThreadFactory threadFactory,//这个用来产生thread,默认是 Executors.defaultThreadFactory()这个的返回值,他负责产生thread,设置thread的name,优先级,设置为daemon,没有别的作用。 RejectedExecutionHandler handler) {//这个用来处理当过多的任务添加到队列时,如果队列无法添加的时候的操作,默认的是AbortPolicy,当发生上述情况时,会抛一个异常。 。。。。//复制操作省略了 }
我们看一下提交任务的方法execute:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) {//如果当前的worker(也即是工作的线程数量)小于corePoolSize,进入if,这里的corePoolSize也就是我们在构造方法中穿入的第一个参数,很重要,他表示一个线程池只少要保存的线程的个数,这些线程是不会过期的,如果阻塞队列中没有任务,则会 在阻塞队列的poll方法中阻塞,而超过这个值的线程在没有任务的时候就要被回收掉。 if (addWorker(command, true))//添加一个worker,即一个线程,more on this later, return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) {//如果上面的没有return,则进入这个if的判断,条件是正在运行,且这个任务能添加到队列中,则进入if int recheck = ctl.get(); if (! isRunning(recheck) && remove(command))//重新检查,虽然上面应有个isRunning的判断,但是可能有时间差。进入这个if,如果不是running的话 reject(command);//拒绝这个任务,调用rejectPolicy,默认是抛个异常。 else if (workerCountOf(recheck) == 0)//如果当前的工作线程个数为0,则调用addWorker添加一个工作线程,如果不为0的话,那么就会有线程不停的从阻塞队列中获得任务,所以一定会有线程执行这个任务的。 addWorker(null, false); } else if (!addWorker(command, false))//如果添加到队列中失败,则判断是否可以添加工作线程,如果不可以,则拒绝这个任务。 reject(command); }
经过上面,我们知道了corePoolSize,他是要保存的最小的工作的线程数(more on this later),但是还没有涉及到maximumPoolSize,我们趁热打铁看看addWorder方法
private boolean addWorker(Runnable firstTask, boolean core) {//第一个参数表示如果要创建工作线程的话,他要执行的第一个任务,第二个参数是用来做判断的,如果是true,表示和corePoolSize判断,false表示和maximumPoolSize判断。 retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN &&//已经shutdown,不能继续接受任务。 ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c);//工作线程的个数 if (wc >= CAPACITY ||//如果现在已经创建的线程的数量太多, wc >= (core ? corePoolSize : maximumPoolSize))//这里显示了core的作用,如果是true,则根据第一个判断, return false;//如果创建的worker太多,则返回false,表示不能继续创建。 if (compareAndIncrementWorkerCount(c))//使用cas增加c,退出循环,进行创建worker break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask);//创建worker final Thread t = w.thread; if (t != null) { mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); int rs = runStateOf(c); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w);//将创建的worker添加到集合中 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start();//调用创建的worker中的Thread的start方法, workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
下面我们看一下Worker这个类:
Worker(Runnable firstTask) {//这个参数表示当这个worker创建成功后执行的第一个任务。 setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask;//第一个任务 this.thread = getThreadFactory().newThread(this);//worker中封装了一个线程,并且这个线程执行的任务是this,因为Worker也是一个runnable,也就是说调用的worker类的run方法 }
同时我们发现Worker也是一个runnable,他的run方法是调用的ThreadPoolExecutor的runWorker方法,也就是当调用worker.thread.start方法的时候是执行的ThreadPoolExecutor的runWorker方法,参数是worker自己,我们看一下
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask;//穿入的第一个任务 w.firstTask = null; w.unlock(); // worker继承了aqs,实现了lock方法,他的目的是害怕并发执行造成不确定性的后果 boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) {//或者是第一个任务不为null或者是从队列中获得任务不是null,这个getTask方法就是从队列中获得任务,比较重要,等会看 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run();//执行获取的任务,执行这个任务的是worker中封装的线程。从这里基本就可以总结出大概了:当线程池添加任务时,会判断worker的数量,如果过小,就会创建worker,添加到集合,用这个worker执行第一个任务,执行完了再从阻塞队列中获取任务。 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly);//这个方法也很重要,它用来处理在所有的任务都处理完成之后的操作。 } }
看到这里有两个方法比较重要,一个是getTask,一个是processWorderExit,
getTask方法:
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } boolean timed; // Are workers subject to culling? for (;;) { int wc = workerCountOf(c); timed = allowCoreThreadTimeOut || wc > corePoolSize;//如果现在工作的线程多于corePoolSize就会是true if (wc <= maximumPoolSize && ! (timedOut && timed))//刚上来timed是false,跳出第二个for break; if (compareAndDecrementWorkerCount(c)) return null; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } try { Runnable r = timed ?//如果当前的工作线程多余corePoolSize,则是true workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ://如果是true,表示有超时时间的等待。从这里就能看出corePoolSize的用处了,如果有超过corePoolSize的线程,那么再从队列中获取任务时就会有时间的等待,而且我们之前传的keepAliveTime是0,也就是说 如果当前队列中没有任务的话,超过corePoolSize那些线程就立即返回,而如果不是超过corePoolSize的线程则会一直等待, workQueue.take();//没有超时时间的等待,会一直阻塞。 if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
在这个getTask方法中,我们看到了corePoolSize的重要作用。
processWorderExit方法:
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w);//这里可以发现,对于那些在获取任务时没有阻塞的线程,在没有任务后就会被移除集合,也就是被垃圾回收掉。但是对于那些阻塞的就不会被回收掉。 } finally { mainLock.unlock(); } 。。。。。//下面的没看。 }
再看一下在Executors中的静态方法:
1、创建一个fixedThreadPool的话,corePoolSize和maximumPoolSize是一样的,因为创建的worker的数量不会超过maximumPoolSize,因此worker的数量最多是corePoolSize,即getTask方法中的timed一定是true 所以所有的线程在从阻塞队列中获取任务时都是会永远阻塞的,不会丢掉worker。并且传入的等待时间是0,所以在从阻塞队列中获得任务时不会阻塞,worker直接直接返回。
2、创建一个cachedThreadPool,
new ThreadPoolExecutor(0, //线程最小的数量是0,即getTask中所有的timed都是false,所以所有的worker在获取不到任务时都会被抛弃,再有任务时就会创建建立worker Integer.MAX_VALUE,//最大值超级大,也就是说创建woker的个数几乎是不受限制的。 60L, TimeUnit.SECONDS,//每个worker在从阻塞队列中获得任务时可以阻塞60秒。 new SynchronousQueue<Runnable>() );//一个同步的queue,比较特殊,插入的线程会阻塞直到一个获取的线程的到来,同样获取的线程也会阻塞直到插入的线程到来。
可以看出,cachedThreadPool几乎可以被认为是不断创建worker的,worker从阻塞队列中获取任务时如果没有任务时就会阻塞60秒,然后被抛弃。
3、创建一个单线程的线程池:
new ThreadPoolExecutor(1, 1,//核心的数量和最大的数量都是1,也就是永远是1, 0L, TimeUnit.MILLISECONDS,//阻塞直到有任务 new LinkedBlockingQueue<Runnable>())
很好的纠正了对线程池的理解。
相关推荐
Executors 部分提供了一些线程池类,例如 ThreadPoolExecutor、ScheduledThreadPoolExecutor 等,这些类可以帮助开发者管理线程池。 JUC 框架的类结构可以分为五个部分: * Lock 框架和 Tools 类 * Collections * ...
- **ThreadPoolExecutor**:最常见的线程池实现,包括核心线程数、最大线程数、线程空闲时间等参数。 - **ScheduledExecutorService**:支持定时及周期性任务执行,如ScheduledThreadPoolExecutor。 4. **并发...
1. **线程池**:Java通过`ExecutorService`和相关的实现(如`ThreadPoolExecutor`)提供了线程池的概念。线程池可以有效管理多个线程,避免频繁创建和销毁线程带来的开销,同时可以控制并发执行的任务数量,提高系统...
1. **线程池(ExecutorService)**:线程池是JUC中用于管理线程的重要工具,通过ThreadPoolExecutor实现。它可以重用已存在的线程,避免频繁创建销毁线程的开销,提高系统效率。线程池的参数调整(如核心线程数、...
突击并发编程JUC系列-ThreadPoolExecutor 线程池 JVM系统学习之路系列 【JVM系统学习之路系列】 JVM 概述篇 【JVM系统学习之路系列】一篇看懂类加载 【JVM系统学习之路系列】运行时数据区概述和程序计数器 【JVM系统...
ExecutorService是ThreadPoolExecutor的抽象,它是线程池的核心。通过ExecutorService,我们可以更有效地管理线程,避免频繁创建和销毁线程的开销。ThreadPoolExecutor允许我们配置核心线程数、最大线程数、线程...
JUC线程池的核心类是`ThreadPoolExecutor`,它的主要工作原理如下: 1. 当一个任务被提交给线程池时,线程池首先检查当前运行的线程数是否小于`corePoolSize`。如果是,那么就会创建一个新的工作线程来执行任务。...
`ExecutorService`接口和其实现类如`ThreadPoolExecutor`提供了线程池的管理和调度功能。 2. **Future和Callable**:`Future`接口代表一个异步计算的结果,而`Callable`接口则用于定义可返回结果的计算任务。两者...
- `ThreadPoolExecutor`:自定义线程池,理解其核心参数如核心线程数、最大线程数、工作队列和拒绝策略。 - `ScheduledExecutorService`:定时任务线程池,可以周期性或延迟执行任务。 7. **并发编程最佳实践** ...
6. **线程池**:ExecutorService 和 ThreadPoolExecutor 提供了线程池管理,通过线程池可以有效地控制并发数量,避免大量线程创建和销毁带来的开销,同时线程池内部也实现了线程安全的管理。 7. **并发容器**:Java...
在实际应用中,`BlockingQueue`和`BlockingDeque`常被用来实现工作队列、缓存、线程池等并发组件,例如`ThreadPoolExecutor`就利用`BlockingQueue`来存储等待执行的任务。 理解`BlockingQueue`和`BlockingDeque`的...
其中,`ExecutorService`和`ThreadPoolExecutor`用于管理线程池,`Semaphore`用于控制并发访问的许可数量,`CountDownLatch`和`CyclicBarrier`用于协调线程间的同步。 4. **Fork/Join框架**:Fork/Join框架是JUC的...
ThreadPoolExecutor 是 JUC 中的线程池实现类,它继承自 Executor 接口,提供了 execute() 方法来提交任务。 Executor 接口提供了一个 submit() 方法,可以提交任务并返回 Future 对象,以便可以异步地获取任务的...
2. **线程池**:ThreadPoolExecutor是实现ExecutorService的主要类,允许定制线程池的大小、任务队列以及拒绝策略。线程池的使用可以避免频繁创建和销毁线程带来的开销。 3. **Future和CompletableFuture**:Future...
- `ExecutorService`和`ThreadPoolExecutor`:它们构成了线程池的基础,允许开发者有效地管理和调度线程,避免过度创建线程带来的性能开销。 - `BlockingQueue`:这是一个线程安全的数据结构,常用于线程间数据...
1. **线程池**:ExecutorService是Java中线程池的核心接口,通过ThreadPoolExecutor和Executors工厂类可以创建不同类型的线程池,如固定大小的线程池、单线程池和缓存线程池。线程池可以有效地管理和控制线程,避免...
通过`ExecutorService`接口及其实现类如`ThreadPoolExecutor`,可以定制线程池的行为,如线程数量、任务队列类型等,提高系统的并行处理能力。 2. **同步容器(Synchronized Collections)** - 包括`BlockingQueue...
ThreadPoolExecutor是ExecutorService的实现,可以自定义线程池的核心线程数、最大线程数、任务队列等参数。 2. **Future和Callable**:Callable接口允许我们在任务中返回一个结果,与Runnable相比更加灵活。Future...