关于ThreadPoolExecutor的一些问题。
1.先看看jdk 1.5中ThreadPoolExecutor的execute方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
for (;;) {
if (runState != RUNNING) {
reject(command);
return;
}
if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
return;
if (workQueue.offer(command))
return;
Runnable r = addIfUnderMaximumPoolSize(command);
if (r == command)
return;
if (r == null) {
reject(command);
return;
}
// else retry
}
}
这里没有获取mainLock锁,检查了runState还有poolSize,然后如果poolSize还没超过核心数会调用addIfUnderCorePoolSize方法。
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < corePoolSize)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}
这里对获得了mainLock锁,然后再次对poolSize做了检查,但是却
没有检查runState。这个应该是个错误(addIfUnderMaximumPoolSize方法也一样)。如果在获得mainLock之前,进行了线程调度,另外的线程抢先将runState变为TERMINATED状态就会有问题。这时虽然已经是TERMINATED,但是仍然会新建一个worker,然后worker会执行这个任务,再之后会调用getTask方法,获取新任务。来看一下getTask方法。
Runnable getTask() throws InterruptedException {
for (;;) {
switch(runState) {
case RUNNING: {
if (poolSize <= corePoolSize) // untimed wait if core
return workQueue.take();
long timeout = keepAliveTime;
if (timeout <= 0) // die immediately for 0 timeout
return null;
Runnable r = workQueue.poll(timeout, TimeUnit.NANOSECONDS);
if (r != null)
return r;
if (poolSize > corePoolSize) // timed out
return null;
// else, after timeout, pool shrank so shouldn't die, so retry
break;
}
case SHUTDOWN: {
// Help drain queue
Runnable r = workQueue.poll();
if (r != null)
return r;
// Check if can terminate
if (workQueue.isEmpty()) {
interruptIdleWorkers();
return null;
}
// There could still be delayed tasks in queue.
// Wait for one, re-checking state upon interruption
try {
return workQueue.take();
} catch(InterruptedException ignore) {}
break;
}
case STOP:
return null;
default:
assert false;
}
}
}
没有case TERMINATED,也就是应该进入default。这时内部状态就出问题了。如果开启了断言这时应该会抛出AssertionError,这还算好的。如果没有开断言会忽略掉assert false,然后会retry继续执行for循环,一直循环下去。这样就会有个线程被泄露掉。
可以写段代码试一下
public static void main(String[] args) {
final ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new Runnable() {
public void run() {
try {
TimeUnit.NANOSECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
exec.shutdownNow();
}
});
exec.execute(new Runnable() {
public void run() {
System.out.println("run");
}
});
}
为了使错误可以容易出现,可以在addIfUnderMaximumPoolSize方法中获取mainLock锁之前,设个断点让线程停一下,然后再继续就可以看到问题了。
在jdk 1.6中,ThreadPoolExecutor的相关方法已经被重写了,已经没有这个问题了。
2.如果worker在执行任务时抛出异常会导致池中的一个线程被终结。ThreadPoolExecutor的工作方式是每次提交新任务时如果运行的线程少于corePoolSize,则创建新线程来处理请求;达到corePoolSize之后则加到任务队列中。所以假设线程池已经达到了corePoolSize,后续的任务都加到了任务队列中,然后某个任务抛出了异常使得线程被终结。这时实际上线程池的poolSize已经小于corePoolSize了,如果这时我们又调用方法提交了任务,则肯定会执行addIfUnderCorePoolSize方法,再增加一个worker线程。但是如果没有再提交任务的话,就算任务队列中有任务,线程池是不会主动去增加线程的。如果比较极端,有很多任务在执行中都抛了异常,结果所有的线程都被终结了,那会不会使线程池变成空池,导致任务队列中的任务得不到执行呢?其实是不会的
private void tryTerminate() {
if (poolSize == 0) {
int state = runState;
if (state < STOP && !workQueue.isEmpty()) {
state = RUNNING; // disable termination check below
Thread t = addThread(null);
if (t != null)
t.start();
}
if (state == STOP || state == SHUTDOWN) {
runState = TERMINATED;
termination.signalAll();
terminated();
}
}
}
当worker线程终结时,会调用上边的这个方法。从代码中可以看到如果poolSize是0,但队列不是空的,则会调用addThread新建一个线程。这个是jdk 1.6中的源码,jdk 1.5中和这个不完全一样,但是也会保证线程池不会意外的变成空池,而导致任务队列中的任务得不到执行。但是这样处理也只能保证有一个线程来处理队列中的任务,所以极端的情况下,线程池有可能会只剩下一个worker线程。
只保证有一个线程来执行任务貌似不是太好,如果任务较多的话,一个线程可能太累了,而且任务之间如果有协作关系的话,还有可能会由于线程饥饿而导致死锁。我觉得是不是应该这样更好一些
private void tryTerminate() {
if (poolSize == 0) {
int state = runState;
if (state < STOP && !workQueue.isEmpty()) {
state = RUNNING; // disable termination check below
int n = workQueue.size();
while (n-- > 0 && pooSize < corePoolSize) {
Thread t = addThread(null);
if (t != null)
t.start();
else
break;
}
}
if (state == STOP || state == SHUTDOWN) {
runState = TERMINATED;
termination.signalAll();
terminated();
}
}
}
分享到:
相关推荐
Java 线程池例子 ThreadPoolExecutor Java 中的线程池是指一个容器,里面包含了多个线程,这些线程可以重复使用,以避免频繁创建和销毁线程的开销。ThreadPoolExecutor 是 Java 中一个非常重要的线程池实现类,它...
ThreadPoolExecutor使用和思考
ThreadPoolExecutor源码解析.md
了解ThreadPoolExecutor的这些核心概念和工作流程,能帮助开发者更好地调整线程池参数,优化并发性能,以及在系统出现问题时进行排查和定位。在实际开发中,合理配置线程池参数,避免线程池过载,是保障系统稳定运行...
1.资源简介:PyQt5中使用多线程模块QThread解决了PyQt5界面程序执行比较耗时操作时,程序卡顿出现的无响应以及界面输出无法实时显示的问题,采用线程池ThreadPoolExecutor解决了ping多个IP多任务耗时问题。...
(转)线程池:java_util_ThreadPoolExecutor 比较详细的介绍了ThreadPoolExecutor用法与属性
线程池是多线程编程中一种高效管理线程资源的方式,主要由Java的`ThreadPoolExecutor`类实现。线程池的工作机制在于控制线程数量,它会将任务放入队列,然后根据线程池的设定创建并启动线程执行这些任务。如果线程...
ThreadPoolExecutor的使用和Android常见的4种线程池使用介绍
线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor...
除了通过构造函数自定义线程池外,Java还提供了一些预定义的线程池,它们通过 `Executors` 工具类创建,适用于不同的应用场景。 1. **newFixedThreadPool**:创建一个固定大小的线程池,线程池的大小由参数 `...
NULL 博文链接:https://bijian1013.iteye.com/blog/2284676
"JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用" JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用是Java多线程编程中的一种重要概念。随着多线程编程的普及,线程池的使用变得...
死磕ThreadPoolExecutor线程池.pdf!!死磕ThreadPoolExecutor线程池.pdf死磕ThreadPoolExecutor线程池.pdf死磕ThreadPoolExecutor线程池.pdf
线程池原理-ThreadPoolExecutor源码解析 1.构造方法及参数 2.阻塞对列: BlockingQueue 3.线程工厂: DefaultThreadFactory 4.拒绝策略: RejectedExecutionHandler 5.执行线程 Executor
今天,我们将对线程池ThreadPoolExecutor进行详细的介绍,并提供一些实际的使用示例。 一、线程池ThreadPoolExecutor简介 ThreadPoolExecutor是Java中的一个类,它实现了Executor接口,用于管理线程池中的线程。...
JDK1[1].5中的线程池(ThreadPoolExecutor)使用简介