1 、ThreadPoolExecutor常量:
//线程池生命周期的控制标识
volatile int runState;
//接收新任务、处理排队的任务
static final int RUNNING = 0;
//停止接收新任务,但处理队列任务
static final int SHUTDOWN = 1;
//不接受不处理,中断正在处理的任务
static final int STOP = 2;
//与stop相似,终结所有线程
static final int TERMINATED = 3;
2、工作队列:
/**
* 用来hold住任务,然后交给线程池中的工作线程(相当于一个缓冲队列)
* 任务等待队列(当所有工作线程都处于忙碌状态,队列未满,则会放在等待队列中)
*/
private final BlockingQueue<Runnable> workQueue;
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
* 所有的工作线程集合
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
3、连接池构造函数参数:
/**
* 线程池维护线程的最少数量
*/
private volatile int corePoolSize;
/**
* 线程池最大线程数
*/
private volatile int maximumPoolSize;
/**
* 当前线程池大小
*/
private volatile int poolSize;
/**
* 保持空闲线程活跃时间,空闲线程等待工作时间,如果超过这个时间,而且当前线程池
* 线程数大于corePoolSize数,则销毁线程
*/
private volatile long keepAliveTime;
/**
* Handler called when saturated or shutdown in execute.
* 当执行过程中线程池关闭或者任务饱和了会调用这个handler来处理
* 线程池对拒绝任务的处理策略
* (如果线程池已经到了maxSize,而且工作队列已满,此时如果还有新任务过来,就需要根据不同的策略来处理这些任务)
*/
private volatile RejectedExecutionHandler handler;
/**
* Factory for new threads. All threads are created using this
* factory (via method addThread)
* 线程工厂,所有的线程都是通过它来创建的,其实是调用addThread()方法来创建的
*/
private volatile ThreadFactory threadFactory;
/**
* The default rejected execution handler
* 默认的拒绝处理策略(队列满,且线程池线程数已经>=max线程数)
* 默认抛出异常
*/
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
4、handler拒绝处理策略:
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
* Always! 直接就抛出异常
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always.
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException();
}
}
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
* 重新添加并执行这个任务,如果线程池已经关闭了,那这个任务将会被抛弃,
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//这里是判断当前线程池的runState状态是否是RUNNING,如果不是,则不接受新任务,也就不执行这个任务
if (!e.isShutdown()) {
r.run();
}
}
}
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
* 丢弃当前任务,不做任何处理
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
* 丢弃旧的任务(队列的头部)
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//判断线程池状态是否为RUNNING
if (!e.isShutdown()) {
//移除队列的头(最老的任务,队列是先进先出嘛)
e.getQueue().poll();
//任务添加进去
e.execute(r);
}
}
}
5、一个任务是通过execute(Runnable command) 方法添加到线程池中,任务是一个Runnable对象,执行的方法就是Runnable里面重写的run()方法:
一个任务加入到线程池的几种情况:
1)、线程池线程数<corePoolSize(最小线程数),即使线程池中的线程都是空闲状态,也会立刻创建一个新线程来处理当前任务。
2)、线程池线程数等于corePoolSize(最小线程数),但缓冲队列workQueue未满,则任务会被放入到缓冲队列中,等待线程池空闲线程处理。
3)、线程池线程数>corePoolSize(最小线程数),但缓冲队列workQueue已满,并且线程池中的线程数<maximumPoolSize(最大线程数),则建立新的线程来处理当前任务
4)、线程池线程数=maximumPoolSize,且缓冲队列workQueue已满,此时handler就会根据初始化时设置的任务拒绝策略来处理当前任务。
5)、当线程池线程数>corePoolSize,如果某空闲线程空闲时间超过keepAliveTime,则该线程将被终止。
看到一个简短的例子比喻的不错:
ThreadPoolExecutor相当于是个工作小组,这个小组里员工至少是2人,如果他们忙不过来,任务就被放到任务队列中等待处理。如果积压的任务过多,任务队列也满了,则只能雇佣新员工来处理,但是也不能无限的雇佣,毕竟要控制成本嘛,所以最多只能雇佣2人,如果所有的人都在忙,任务队列还是满的,来了新任务那就只能根据老板的意思去处理了(大不了不做你的生意了,直接扔掉任务),当任务少了,有空闲员工在那儿闲了5分钟都没干活了,老板当然不干啦,炒了空闲的员工老板心里就爽多了,当然整个工作小组还是要有人在的,最少不能少于最初的2人,不然没人给干活老板喝西北风啊。
6、ThreadPoolExecutor核心方法execute(Runnable command) 处理任务:
public void execute(Runnable command) {
//为空,则抛出空指针
if (command == null)
throw new NullPointerException();
//判断是否 当前线程池大小>线程池最小值(如果是小于,不满足条件则创建线程)
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
//当前线程大于最小线程,这里来判断任务队列是否已满,返回true,则添加到缓冲队列
if (runState == RUNNING && workQueue.offer(command)) {
//任务排队之后,再复核一下线程池状态
if (runState != RUNNING || poolSize == 0)
//只有当任务重排状态改变之后才会调用此方法
ensureQueuedTaskHandled(command);
}
//此时缓冲队列已满,这时再与最大线程数来比较
//如果小于最大线程数,则创建线程执行,否则由任务拒绝策略来处理
else if (!addIfUnderMaximumPoolSize(command))
//根据拒绝策略处理任务
reject(command); // is shutdown or saturated
}
}
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//当前线程池大小<初始化线程大小 此时创建线程
if (poolSize < corePoolSize && runState == RUNNING)
//创建并返回线程
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
if (t == null)
return false;
//启动任务
t.start();
return true;
}
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//线程池大小小于最大线程数 此时创建线程
if (poolSize < maximumPoolSize && runState == RUNNING)
//创建并返回线程
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
if (t == null)
return false;
//启动任务
t.start();
return true;
}
7、 ThreadPoolExecutor创建新线程的方法:
private Thread addThread(Runnable firstTask) {
Worker w = new Worker(firstTask);
//通过ThreadFactory线程工厂来创建一个新线程
//ThreadFactory接口中唯一一个方法Thread newThread(Runnable r);
Thread t = threadFactory.newThread(w);
if (t != null) {
w.thread = t;
//把刚创建的线程加入到工作线程集合中
workers.add(w);
//当前线程数加1
int nt = ++poolSize;
//如果此时线程数>线程池最大线程到达数
if (nt > largestPoolSize)
largestPoolSize = nt;
}
//返回新创建的线程
return t;
}
分享到:
相关推荐
Java 线程池例子 ThreadPoolExecutor Java 中的线程池是指一个容器,里面包含了多个线程,这些线程可以重复使用,以避免频繁创建和销毁线程的开销。ThreadPoolExecutor 是 Java 中一个非常重要的线程池实现类,它...
《ThreadPoolExecutor源码解析》 ThreadPoolExecutor是Java并发编程中重要的组件,它是ExecutorService接口的实现,用于管理和调度线程的执行。理解其源码有助于我们更好地控制并发环境下的任务执行,提高系统的...
线程池是多线程编程中一种高效管理线程资源的方式,主要由Java的`ThreadPoolExecutor`类实现。线程池的工作机制在于控制线程数量,它会将任务放入队列,然后根据线程池的设定创建并启动线程执行这些任务。如果线程...
ThreadPoolExecutor的使用和Android常见的4种线程池使用介绍
"JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用" JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用是Java多线程编程中的一种重要概念。随着多线程编程的普及,线程池的使用变得...
(转)线程池:java_util_ThreadPoolExecutor 比较详细的介绍了ThreadPoolExecutor用法与属性
ThreadPoolExecutor源码解析.md
1.资源简介:PyQt5中使用多线程模块QThread解决了PyQt5界面程序执行比较耗时操作时,程序卡顿出现的无响应以及界面输出无法实时显示的问题,采用线程池ThreadPoolExecutor解决了ping多个IP多任务耗时问题。...
ThreadPoolExecutor使用和思考
JDK1[1].5中的线程池(ThreadPoolExecutor)使用简介
线程池原理-ThreadPoolExecutor源码解析 1.构造方法及参数 2.阻塞对列: BlockingQueue 3.线程工厂: DefaultThreadFactory 4.拒绝策略: RejectedExecutionHandler 5.执行线程 Executor
### ThreadPoolExecutor 运转机制详解 #### 一、ThreadPoolExecutor 的基本概念与构造函数解析 在Java并发编程中,`ThreadPoolExecutor` 是一种强大的工具,它可以帮助开发者有效地管理和执行线程。`...
线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor...
在IT行业中,Redis和Java的ThreadPoolExecutor是两个非常重要的工具,它们在处理高并发和任务调度方面发挥着关键作用。Redis是一种高效的键值存储系统,常用于缓存、消息队列等场景。而ThreadPoolExecutor是Java并发...
在《阿里巴巴java开发手册》中...另外由于前面几种方法内部也是通过ThreadPoolExecutor方式实现,使用ThreadPoolExecutor有助于大家明确线程池的运行规则,创建符合自己的业务场景需要的线程池,避免资源耗尽的风险。
《JDK之ThreadPoolExecutor源码分析1》 在Java编程中,线程池是一种高效的管理线程的方式,它通过复用已存在的线程来避免频繁创建和销毁线程带来的开销。ThreadPoolExecutor作为Java中的线程池实现,其内部机制相当...
线程池通过ThreadPoolExecutor类实现,这是一个高度可配置的工具,能够根据具体需求定制线程的创建、管理和销毁策略。 ThreadPoolExecutor的核心参数包括: 1. corePoolSize:核心线程数,这是线程池在非繁忙状态下...
ThreadPoolExecutor是Java并发编程中非常重要的一个组件,它位于`java.util.concurrent`包下,用于管理线程资源,实现线程池服务。线程池通过有效地控制并发执行的任务数量,可以提高系统的性能和稳定性。 ...
从Python3.2开始,标准库为我们提供了 concurrent.futures 模块,它提供了 ThreadPoolExecutor (线程池)和ProcessPoolExecutor (进程池)两个类。 相比 threading 等模块,该模块通过 submit 返回的是一个 future ...