论坛首页 Java企业应用论坛

Common Pool对象池的中断策略和ThreadPoolExecutor中断策略

浏览 5837 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
作者 正文
   发表时间:2010-07-04   最后修改:2010-12-18

题记:

     看到摊子里gogole_09同学分享了他阿里的面筋,其中有个问题提到:线程池的中断策略有哪些?各有什么特点?想想自己也看过Common的对象池源码和用过Sun的ThreadPoolExecutor线程池,却没注意到“中断策略”,这是一个所有缓存池需要考虑的异常问题。

 

Common Pool中断策略

    竟然是中断策略,直接看Common Pool中的borrowObject,从池中取对象,当没有可用的对象的时候的策略。

   

    **StackObjectPool(以Stack为存储结构的Pool)

   

   public synchronized Object borrowObject() throws Exception {
     ....
    Object obj = null;// 取对象
   ...
     obj = _factory.makeObject(); // 制造对象
     newlyCreated = true;
     if (obj == null) {
                    throw new NoSuchElementException("PoolableObjectFactory.makeObject() returned null.");
                  }
     ....
     _numActive++;
     return obj;

 

发现_factory.makeObject(); 这个方法是一个abstract,所以猜测Common Pool并未实现自己的中断策略,而由使用者自行扩展。

 

 

ThreadPoolExecutor中断策略

    JDK提供了4种策略,分别是:

    1、CallerRunsPolicy

    2、AbortPolicy 中止任务

    3、DiscardPolicy 丢弃任务

    4、DiscarOldestPolicy 丢弃最老任务

 

分析中断策略,直接查看ThreadPoolExecutor类中的execute(Runnable command)设计,如下:

 /**
     * The default rejected execution handler
     */
private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();
....
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        for (;;) {
            if (runState != RUNNING) {
                reject(command);
                return;
            }
            if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
                return;
            if (workQueue.offer(command))
                return;
            Runnable r = addIfUnderMaximumPoolSize(command);// 当线程大小超过maximumPoolSize时,返回null
            if (r == command)
                return;
            if (r == null) {
                reject(command);// 执行中断策略
                return;
            }
            // else retry
        }
    }

 

reject方法为:

void reject(Runnable command) {
        handler.rejectedExecution(command, this);// 直接调用接口RejectedExecutionHandler的rejectedExecution方法
}

 

4种中断策略的实现,首先,默认的AbortPolicy implements RejectedExecutionHandler

 public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always.
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException();// 直接抛出异常
        }
    }

 这种策略会让程序抛出异常,会一定程度影响系统运行。

 

然后CallerRunsPolicy策略:

public static class CallerRunsPolicy implements RejectedExecutionHandler {
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {// 只要ThreadPoolExecutor没有关闭,就马上执行新线程,注意这里是run,而非start,新线程会立马执行
                r.run();
            }
        }
    }

 这个策略比较保守,不抛出异常,也不忽略线程,而是让线程直接运行抢占CPU而发生阻塞。

 

DiscardPolicy策略(忽略当前线程):

public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a <tt>DiscardPolicy</tt>.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {// 直接忽略
        }
    }

这种策略被忽略,而不给出任何警告信息。

 

最后一个DiscardOldestPolicy策略(忽略最老线程)

 public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();// 直接移除当前队列中头节点
                e.execute(r);// 把新线程加入到队列的尾节点,并调用线程的start方法启动
            }
        }
    }

 

结论:

从代码中看:

    当线程数量小于pool中活动线程数,则创建新线程;

    当线程数量等于pool中活动线程数,则尝试把任务加入到任务队列里面;

    当任务队列满的时候,则执行中断策略进行调整。

    各自的策略都有不同的特点,根据系统的需求选择合适的策略以达到线程利用率的最优。

 

论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics