一、序言
关于“池”的概念,我的理解是它是为了让我们更快的获得资源,节省时间,在我所知的所有池(线程池、连接池、常量池、缓存池、对象池等等),都是这个作用,这里我们仅仅分享线程池的相关理解。
1.我们什么时候要用线程池?
在JAVA 里面我们一切都是对象,线程(Thread)同样也是对象,只要是对象那么就要涉及创建、使用、回收等三个主要步骤。通常情况下,创建线程的时间 和 回收(销毁)线程的时间的开销,由JVM控制,而使用过程由我们控制。假设我们在使用时间很短,并且发生频率很高的情况下,那么线程的频繁创建和销毁就会占用大量的的时间,为了减少这种开销,我们利用线程池技术,创建一个线程池,用的时候从里面拿,不用了放回去,再空闲的时间再进行销毁,能节省时间。
并且在一定程度上,无状态的线程是可以复用的,可以减少对象的创建。
二、功能介绍和设计:我们将分析JAVA线程池,然后写一个出来方便理解
1.创建线程池:
// 这是JDK 提供的此线程池的创建 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
a.corePoolSize :线程池基本线程数,因为一般池的创建分为两种,一种是初始化的时候就进行创建默认的最小连数目,还有一种是当使用的时候才创建,直到创建的数目超过基本数值的时候,后面的获取,才根据池里面的数量空闲状态进行返回,在此之前,都是一直创建,直到超过基本值。
b.maximumPoolSize:线程池最大容量
c.keepAliveTime:线程活动保持时间 ,线程在池里面的空闲的时间,相当于外包人员,在长时间没有接到任务的时候,就让他离职!减少负担。
d.unit:时间单位,这个可以参考TimeUtil 提供了很多精确的时间类型
e.workQueue:工作队列,这里负责维护多个任务调度,比如当我们进网吧的时候是需要刷卡的,当人比较多的时候需要排队,而且同时还要应对结账的人员,这里用队列的方式进行管理。
至于BlockingQueue 的实现,我们后面再介绍。
f.threadFactory:线程工厂,这里面提供了多种线程的创建方式
g:RejectedExecutionHandler:饱和策略,和缓存池类似,我们不可能无限制的分配下去,当线程数量到最大值的时候,我们需要用一种策略进行处理。
具体的处理策略我们也留到后面。
我们的设计如下:
2.功能体现:
既然是线程池,里面肯定放的是线程,同时我们肯定要有执行线程的方法execute,先来看看JDK这部分主体代码:
/** * 自己实现线程池,为了方便理解,按照JDK 的进行编写 * * @author Ran */ public class ThreadPool { private volatile int corePoolSize; private volatile int maximumPoolSize; private volatile ThreadFactory threadFactory; private final BlockingQueue<Runnable> workQueue; private volatile long keepAliveTime; private volatile RejectedExecutionHandler handler; // 当前实际线程数 private volatile int poolSize; // 状态锁,主要用于对poolSize,corePoolSize,maximumPoolSize,runState,workers 更新时的锁定 private final ReentrantLock mainLock = new ReentrantLock(); // 线程的一些状态 volatile int runState; static final int RUNNING = 0; // 不接受新任务了,但是已经加入队列的还会执行 static final int SHUTDOWN = 1; // 停止了,队列里面的任务也不执行了 static final int STOP = 2; // 全部停止,会关闭所有正在执行的线程 static final int TERMINATED = 3; // 存放工作线程的集合 private final HashSet<Worker> workers = new HashSet<Worker>(); // 记录线程池到达的最高峰值 的线程数 private int largestPoolSize; // 构造 public ThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } // 执行方法 public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // 如果线程池数,超过我们的基本连接数,直接执行下面 // 如果当前线程数小于基本线程数,就创建并执行 ,返回 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { // 如果创建失败再次检测,如果有正在取线程的就放进去 if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) // 如果线程池不处于运行状态,或者是第一个线程进入,执行 // 确保始终有一个线程执行该任务 ensureQueuedTaskHandled(command); }else if (!addIfUnderMaximumPoolSize(command)) // 否则就执行其他策略 reject(command); // is shutdown or saturated } } // 添加并返回 private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 可以看出,当前线程数是小于基本线程数,并且线程池处于活动状态 // 那么就进行创建 添加操作 if (poolSize < corePoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; //然后会启动该线程 t.start(); return true; } // 添加操作 private Thread addThread(Runnable firstTask) { Worker w = new Worker(firstTask); // 创建,这路的创建,如果传的null, 会new 一个,默认在DefaultThreadFactory // 里面 可以看到 Thread t = threadFactory.newThread(w); if (t != null) { // 可以看出,创建的线程实际是worker 对象,里面封装了很多内容 w.thread = t; // 然后保存进去。返回 workers.add(w); int nt = ++poolSize; if (nt > largestPoolSize) largestPoolSize = nt; } return t; } // 排队后从新检查状态,如果处于非RUNNING 状态,会把刚才队列里面的清掉 // 确保有一个线程来处理这个任务(前提是addThread 要成功) private void ensureQueuedTaskHandled(Runnable command) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); boolean reject = false; Thread t = null; try { // 重新检查状态 int state = runState; // 如果挂了,就把刚才那个移除返回true,执行处理的策略了 if (state != RUNNING && workQueue.remove(command)) reject = true; else if (state < STOP && poolSize < Math.max(corePoolSize, 1) && !workQueue.isEmpty()) // 如果线程池禁止添加新任务 了,并且队列不为空,并且基本数未满 t = addThread(null); } finally { mainLock.unlock(); } if (reject) reject(command); else if (t != null) t.start(); } // 调用拒绝执行的策略 void reject(Runnable command) { //handler.rejectedExecution(command, this); } // 队列添加失败(一般是满了)的时候,在满足条件的情况下,会再次创建新 private boolean addIfUnderMaximumPoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < maximumPoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; } // 这是内部类,里面对封装了,我们对线程 private final class Worker implements Runnable { // 针对每个线程任务进行控制, private final ReentrantLock runLock = new ReentrantLock(); private Runnable firstTask; Thread thread; public Worker(Runnable firstTask) { this.firstTask = firstTask; } @Override public void run() { } } }
执行方法小结:
上面代码,估计大家看着有点乱,这里我进行描述,然后借助图例,然后再回过头进行理解,就清楚了。整个执行方法无非就坐了几件事情:
1.当前线程数poolSize 小于 corePoolSize基本线程数,也就是说连最小、最基本的线程数都没满足,那么就会执行addIfUnderCorePoolSize方法。
1.1 addIfUnderCorePoolSize 主要是把一个线程包装成Worker对象,会执行addThread方法,用工厂创建线程,然后保存在我们的集合里,然后执行该线程,OK 返回true,失败返回false.
2.当上面条件不满足的情况,我们会看workQueue 是否满了,如果workQueue.offer(command) 成功,表示未满,就保存进队列。
2.1成功判断如果是第一个进入的线程,poolSize == 0,那么会执行ensureQueuedTaskHandled方法,该方法会再次验证线程池处于什么状态,如果是非RUNNING了,就把刚才加入队列的线程,移除,然后执行拒绝的策略,如果remove 失败了,会创建一个线程来,保证该
3. 如果workQueue.offer(command) 失败,说明队列满了,会执行addIfUnderMaximumPoolSize 方法,该方法是去判断线程池是否满了,未满的情况下 ,创建(包装)线程,并且执行,返回true, 否则返回false.
执行拒绝策略。
好吧,如果你还无法理解,我还是用实际例子:
假设我有4台电脑,准备让4个人帮我打文件:corePoolSize = 4.这时候来了3个人 3<4. 我就给他们穿上工作服(Worker包装),然后 让他们做事(addIfUnderCorePoolSize),过了一会又来2个人,这时候发现人够了,那么就把新来的放进等候室(队列)(workQueue.offer(command) ),然后人来多了,再让他们去等候间的时候,发现人满了,这是就要用你指定的策略了(RejectedExecutionHandler)。当然还有一种可能是也许第一个人进来的时候,可以add 失败,也许电脑就坏了,或者卡死了(!RUNNING),这时候就会为了确这个人有活干(ensureQueuedTaskHandled)你得重新检查一下,重新弄弄系统什么的吧!
下面我copy 的逻辑图,给大家再理解理解,图片来源:http://ifeve.com/java-threadpool/
3. 细节处理
上面我们初步解释了连接池的工作原理,但是里面线程怎么工作,怎么管理,以及淘汰策略怎么完成的,这些现在进行解释:
3.1 线程如何工作?
我们回到worker 里面的run 方法:
@Override public void run() { try { Runnable task = firstTask; firstTask = null; // 一直从队列里面取,知道执行完成 while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this);// 退出
上面主要有getTask 和 runTask ,我们分别来看看 这里面做了什么吧;
Runnable getTask() { for (;;) { try { int state = runState; // 放弃队列里面任务执行,直接返回 if (state > SHUTDOWN) return null; Runnable r; // 该状态,会返回已经加入队列连的线程 if (state == SHUTDOWN) // Help drain queue r = workQueue.poll(); // 如果超过的基本线程,并且allowCoreThreadTimeOut // 参数允许回收 else if (poolSize > corePoolSize || allowCoreThreadTimeOut) // 获取超时,那么就会回收 r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); else // 否则就一直等待可以线程进入 r = workQueue.take(); if (r != null) return r; // 判断是否可以退出 if (workerCanExit()) { // 如果已经停了,就中断所有工作的线程(除当 // 前线程) if (runState >= SHUTDOWN) // Wake up others interruptIdleWorkers(); return null; } // Else retry } catch (InterruptedException ie) { // On interruption, re-check runState } } }
我们可以看出,getTask主要是返回队列中的任务,过程中根据池的不同状态做不同处理,获得task之后,我我们再看看runTask 的执行。
private void runTask(Runnable task) { final ReentrantLock runLock = this.runLock; runLock.lock(); try { // 这段代码的锁和线程池的锁不一样,用多次判断!我不明所以。。 // 在JDK 1.7 里面的变化挺大的,大家可以去参考里面的。 if (runState < STOP && Thread.interrupted() && runState >= STOP) thread.interrupt(); boolean ran = false; // 这是在任务之前执行方法,方便你重写的的。 beforeExecute(thread, task); try { // 看出来了,还是用的Runable 的run 方法 task.run(); ran = true; // 这里也方便重写 afterExecute(task, null); // 这里会记录完成任务的数量 ++completedTasks; } catch (RuntimeException ex) { if (!ran) afterExecute(task, ex); throw ex; } } finally { runLock.unlock(); } }
虽然解释得不很清楚,我相信至少能理解线程在线程池里面主要的额工作流程了。
小结:
1.上面分析了JDK 线程池的基本实现原理,仅供参考
2.JDK1.7 的变化挺大的,我表示很无奈,以后分析源码,还是往JDK1.8 上面靠吧,不然被淘汰了都不知道!
3.整个流程有些在没加锁的地方,老是喜欢用多次判断的方式,因为volatile的可见性,确实可以这么做,我不得不吐槽,写得并不好,这里JDK 1.7 里面进行的大量重构!
4.有写得不好,或者不明白的地方,欢迎大家一起提出,分享,由于篇幅和知识吸收的关系,里面的其他策略,下次分享,这里仅仅分析主要工作流程。
相关推荐
《JDK之ThreadPoolExecutor源码分析1》 在Java编程中,线程池是一种高效的管理线程的方式,它通过复用已存在的线程来避免频繁创建和销毁线程带来的开销。ThreadPoolExecutor作为Java中的线程池实现,其内部机制相当...
2. **增加工作线程**:如果当前工作的线程数少于核心线程数,则创建一个新的工作线程来执行任务;如果当前工作的线程数已经达到或超过核心线程数,则进入下一步。 3. **使用阻塞队列**:将任务放入阻塞队列中等待...
本文将深入分析Worker线程的执行流程,特别是其源码实现。 首先,Worker类是ThreadPoolExecutor的一个内部类,它继承了AbstractQueuedSynchronizer(AQS),这是一个抽象的同步队列,用于实现锁和其他同步组件的...
《Android应用源码解析——基于SundPoolSample的深度学习》 在当今移动互联网时代,Android作为主流的智能...通过实际操作和源码分析,我们可以将理论知识转化为实战经验,进一步提升我们的编程素养和问题解决能力。
本文将深入探讨“多线程环境下快速登录YY案例源码”这一主题,帮助你理解如何利用多线程优化登录流程,提升用户体验。 标题中的“多线程环境下快速登录YY案例源码”意味着这个示例代码着重展示了如何在多个线程中...
在源码分析方面,`ThreadPoolExecutor`的工作流程主要包括: 1. 如果当前线程数小于核心线程数,会立即创建新线程执行任务。 2. 如果当前线程数等于核心线程数,任务会被放入任务队列。 3. 如果任务队列已满且线程数...
- 对于Java开发者,`jstack`是分析线程状态的常用工具,可以帮助诊断死锁等问题。 综上所述,"Thread(线程)"这一主题涵盖了大量的理论知识和实践经验,包括线程的创建、管理、同步、通信、性能优化等方面。通过...
4. **`RunandReset`源码分析**:`RunandReset`是一个内部操作,用于决定线程是否应该重新执行任务。如果线程在执行过程中被中断,`RunandReset`会决定是否恢复中断标志并重新提交任务。 5. **`set`方法设置返回值**...
通过分析Java线程池源码,我们可以学习到如何合理配置线程池参数,如何选择合适的工作队列,以及如何处理拒绝策略,从而在实际开发中更好地利用多线程来提高程序效率。此外,源码阅读也有助于理解Java并发库的设计...
通过这种方式,可以灵活地控制线程的工作流程,比如优先处理高优先级的任务。 3. **并发控制**:为了避免过多的并发请求导致服务器压力过大,源码可能会包含一些限制并发数的策略,如设置线程池的最大线程数,或者...
线程池是Java多线程编程中不可或缺的一部分,它通过管理一组可重用线程来提高应用程序的性能和效率。在Java中,`java.util.concurrent` 包中的 `ExecutorService` 和其子类如 `ThreadPoolExecutor` 提供了线程池的...
- **WorkerThread**:每个任务都在一个单独的 WorkerThread 中执行,这些线程由 ThreadPoolExecutor 创建并管理。 - **InternalHandler**:AsyncTask 使用 InternalHandler 在后台线程和 UI 线程之间通信,将结果和...
在Java编程中,线程池是一种管理线程的机制,它可以帮助我们更有效地调度和执行并发任务。...结合`SurgeryThreadPool.java`源码分析和`Java.jpg`中的示例,我们可以进一步理解线程池在实际项目中的具体实现和应用。
- **IntentService**:默认在后台服务进程中运行,自动管理线程,适合执行一次性任务。 6. **文件名称列表解析** - "module-initializer-master"可能包含项目的主目录,可能包括源码、配置文件、示例等,通过阅读...
源码分析更是能够深入理解其工作原理,帮助提升编程技能。 首先,我们要明白什么是多线程。在单核CPU系统中,多线程是指在同一个进程中,同时执行多个不同的线程,以此来提高CPU的利用率和程序的响应速度。Java提供...
9. **源码分析**:阅读和理解JDK中Thread类和其他相关类的源码,有助于深入理解线程的工作原理,例如线程调度、中断机制等。 10. **性能优化**:合理设计线程数量、避免过度竞争、合理使用同步机制等,都是提升多...
源码分析** 阅读源码可以帮助我们更好地理解线程池的内部机制。例如,`ThreadPoolExecutor.execute()`方法是如何处理任务的提交,`ThreadPoolExecutor.shutdown()`和`shutdownNow()`又是如何优雅地关闭线程池。 **...
以下是一个简单的Handler-Looper-MessageQueue工作流程: 1. 在工作线程中初始化Looper。 2. 创建一个Handler实例,关联到该Looper。 3. 在主线程中,创建Message对象并设置其目标Handler,然后将消息放入Message...
【标题】基于Java的短信网关平台源码分析 短信网关平台是通信系统中的重要组成部分,它负责处理和传递短信服务请求。本项目以Java为开发语言,提供了丰富的功能和可靠的性能,对于理解Java在大型系统中的应用,以及...
本文将深入分析其源码,了解其工作原理和设计思路。 1. **初始化与配置** - UIL的初始化涉及到`ImageLoaderConfiguration`的构建,这包括内存缓存、磁盘缓存、线程池和图片处理策略等设置。`DiskCacheUtils`和`...