`
szhnet
  • 浏览: 109449 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

关于ThreadPoolExecutor的一些问题

阅读更多
关于ThreadPoolExecutor的一些问题。
1.先看看jdk 1.5中ThreadPoolExecutor的execute方法
 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        for (;;) {
            if (runState != RUNNING) {
                reject(command);
                return;
            }
            if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
                return;
            if (workQueue.offer(command))
                return;
            Runnable r = addIfUnderMaximumPoolSize(command);
            if (r == command)
                return;
            if (r == null) {
                reject(command);
                return;
            }
            // else retry
        }
    }

    这里没有获取mainLock锁,检查了runState还有poolSize,然后如果poolSize还没超过核心数会调用addIfUnderCorePoolSize方法。
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < corePoolSize)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();
        return true;
    }

    这里对获得了mainLock锁,然后再次对poolSize做了检查,但是却没有检查runState。这个应该是个错误(addIfUnderMaximumPoolSize方法也一样)。如果在获得mainLock之前,进行了线程调度,另外的线程抢先将runState变为TERMINATED状态就会有问题。这时虽然已经是TERMINATED,但是仍然会新建一个worker,然后worker会执行这个任务,再之后会调用getTask方法,获取新任务。来看一下getTask方法。
 Runnable getTask() throws InterruptedException {
        for (;;) {
            switch(runState) {
            case RUNNING: {
                if (poolSize <= corePoolSize)   // untimed wait if core
                    return workQueue.take();
                
                long timeout = keepAliveTime;
                if (timeout <= 0) // die immediately for 0 timeout
                    return null;
                Runnable r =  workQueue.poll(timeout, TimeUnit.NANOSECONDS);
                if (r != null)
                    return r;
                if (poolSize > corePoolSize) // timed out
                    return null;
                // else, after timeout, pool shrank so shouldn't die, so retry
                break;
            }

            case SHUTDOWN: {
                // Help drain queue 
                Runnable r = workQueue.poll();
                if (r != null)
                    return r;
                    
                // Check if can terminate
                if (workQueue.isEmpty()) {
                    interruptIdleWorkers();
                    return null;
                }

                // There could still be delayed tasks in queue.
                // Wait for one, re-checking state upon interruption
                try {
                    return workQueue.take();
                } catch(InterruptedException ignore) {}
                break;
            }

            case STOP:
                return null;
            default:
                assert false; 
            }
        }
    }

    没有case TERMINATED,也就是应该进入default。这时内部状态就出问题了。如果开启了断言这时应该会抛出AssertionError,这还算好的。如果没有开断言会忽略掉assert false,然后会retry继续执行for循环,一直循环下去。这样就会有个线程被泄露掉。
    可以写段代码试一下
public static void main(String[] args) {
		final ExecutorService exec = Executors.newCachedThreadPool();
		exec.execute(new Runnable() {

			public void run() {
				try {
					TimeUnit.NANOSECONDS.sleep(100);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				
				exec.shutdownNow();
			}
			
		});
		
		exec.execute(new Runnable() {

			public void run() {
				System.out.println("run");
			}
			
		});
	}

    为了使错误可以容易出现,可以在addIfUnderMaximumPoolSize方法中获取mainLock锁之前,设个断点让线程停一下,然后再继续就可以看到问题了。
    在jdk 1.6中,ThreadPoolExecutor的相关方法已经被重写了,已经没有这个问题了。

2.如果worker在执行任务时抛出异常会导致池中的一个线程被终结。ThreadPoolExecutor的工作方式是每次提交新任务时如果运行的线程少于corePoolSize,则创建新线程来处理请求;达到corePoolSize之后则加到任务队列中。所以假设线程池已经达到了corePoolSize,后续的任务都加到了任务队列中,然后某个任务抛出了异常使得线程被终结。这时实际上线程池的poolSize已经小于corePoolSize了,如果这时我们又调用方法提交了任务,则肯定会执行addIfUnderCorePoolSize方法,再增加一个worker线程。但是如果没有再提交任务的话,就算任务队列中有任务,线程池是不会主动去增加线程的。如果比较极端,有很多任务在执行中都抛了异常,结果所有的线程都被终结了,那会不会使线程池变成空池,导致任务队列中的任务得不到执行呢?其实是不会的
private void tryTerminate() {
        if (poolSize == 0) {
            int state = runState;
            if (state < STOP && !workQueue.isEmpty()) {
                state = RUNNING; // disable termination check below
                Thread t = addThread(null);
                if (t != null)
                    t.start();
            }
            if (state == STOP || state == SHUTDOWN) {
                runState = TERMINATED;
                termination.signalAll();
                terminated();
            }
        }
    }

    当worker线程终结时,会调用上边的这个方法。从代码中可以看到如果poolSize是0,但队列不是空的,则会调用addThread新建一个线程。这个是jdk 1.6中的源码,jdk 1.5中和这个不完全一样,但是也会保证线程池不会意外的变成空池,而导致任务队列中的任务得不到执行。但是这样处理也只能保证有一个线程来处理队列中的任务,所以极端的情况下,线程池有可能会只剩下一个worker线程。
    只保证有一个线程来执行任务貌似不是太好,如果任务较多的话,一个线程可能太累了,而且任务之间如果有协作关系的话,还有可能会由于线程饥饿而导致死锁。我觉得是不是应该这样更好一些
private void tryTerminate() {
        if (poolSize == 0) {
            int state = runState;
            if (state < STOP && !workQueue.isEmpty()) {
                state = RUNNING; // disable termination check below
                int n = workQueue.size();
                while (n-- > 0 && pooSize < corePoolSize) {
                	 Thread t = addThread(null);
                     if (t != null)
                         t.start();
                     else
                    	 break;
                }
            }
            if (state == STOP || state == SHUTDOWN) {
                runState = TERMINATED;
                termination.signalAll();
                terminated();
            }
        }
    }
0
1
分享到:
评论
1 楼 store88 2010-07-27  
我用了1.6.0_18 linux 32 的,也死锁了

相关推荐

Global site tag (gtag.js) - Google Analytics