`
adapterofcoms
  • 浏览: 75458 次
  • 性别: Icon_minigender_1
  • 来自: 苏州
社区版块
存档分类
最新评论

Java线程池的瑕疵,For java util concurrent threadpool Since jdk1.5

阅读更多

    java.util.concurrent的作者是Doug Lea : 世界上对Java影响力最大的个人,在jdk1.5之前大家一定熟悉他的backport-util-concurrent.jar."这个鼻梁挂着眼镜,留着德王威廉二世的胡子,脸上永远挂着谦逊腼腆笑容,服务于纽约州立大学Oswego分校计算器科学系的老大爷。",他可是并发编程的大师级人物哦!

    Since jdk1.5,在java.util.concurrent包下的线程池模型是基于queue的,threadpool只有一个,而queue却有多个

LinkedBlockingQueue,SynchronousQueue,ScheduledThreadPoolExecutor.DelayedWorkQueue等可参见java.util.concurrent.Executors.注意:我下面的问题是针对LinkedBlockingQueue的,参考的src为jdk1.6.

    ThreadPool通过以下的3个属性来标志池中的线程数:
corePoolSize(类似minimumPoolSize),poolSize(当前池中的线程数),maximumPoolSize(最大的线程数).
这3个属性表达的意思是每次新创建或结束一个线程poolSize++/--,在最忙的情况下threadpool创建的线程数不能超过maximumPoolSize,当空闲的情况下poolSize应该降到corePoolSize,当然threadpool如果从创建时它就从来没有处理过一次请求的话,那么poolSize当然为0.

通过以上2段的说明下面我要引出我所要讲的问题:

我们来看一下java.util.concurrent.ThreadPoolExecutor的execute方法:
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
}

它表达的主体意思是:如果当前的poolSize<corePoolSize,那么就增加线程直到poolSize==corePoolSize.
如果poolSize已经到达corePoolSize,那么就把command(task) put to workQueue,如果workQueue为LinkedBlockingQueue的话,那么只有当workQueue offer commands达到workQueue.capacity后,threadpool才会继续增加线程直到maximumPoolSize.
1.*****如果LinkedBlockingQueue.capacity被设置为Integer.MAX_VALUE,那么池中的线程不可能到达maximumPoolSize.*****
所以你如果使用了Executors.newFixedThreadPool的话,那么maximumPoolSize和corePoolSize是一样的并且LinkedBlockingQueue.capacity==Integer.MAX_VALUE,或者如果这样new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,timeUnit,new LinkedBlockingQueue<Runnable>(/*Integer.MAX_VALUE*/))的话,
上述的使用都将导致maximumPoolSize是无效的,也就是说线程池中的线程数不会超出corePoolSize.
这个也让那些tomcat6的开发人员可能也郁闷了,他们不得不改写LinkedBlockingQueue,以tomcat-6.0.20-src为例:

org.apache.tomcat.util.net.NioEndpoint.TaskQueue extends LinkedBlockingQueue<Runnable> override offer method:
 
        public void setParent(ThreadPoolExecutor tp, NioEndpoint ep) {
            parent = tp;
            this.endpoint = ep;
        }
       
        public boolean offer(Runnable o) {
            //we can't do any checks
            if (parent==null) return super.offer(o);
            //we are maxed out on threads, simply queue the object
            if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
            //we have idle threads, just add it to the queue
            //this is an approximation, so it could use some tuning
            if (endpoint.activeSocketProcessors.get()<(parent.getPoolSize())) return super.offer(o);
            //if we have less threads than maximum force creation of a new thread
            if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
            //if we reached here, we need to add it to the queue
            return super.offer(o);
        } 

org.apache.tomcat.util.net.NioEndpoint.start()-->
   TaskQueue taskqueue = new TaskQueue();/***queue.capacity==Integer.MAX_VALUE***/
                     TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-");
                     executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60,TimeUnit.SECONDS,taskqueue, tf);
                     taskqueue.setParent( (ThreadPoolExecutor) executor, this);

2.*****如果把LinkedBlockingQueue.capacity设置为一个适当的值远小于Integer.MAX_VALUE,那么只有put到queue的任务数到达LinkedBlockingQueue的capacity后,才会继续增加池中的线程,使得poolSize超出corePoolSize但不超过maximumPoolSize,这个时候来增加线程数是不是有点晚了呢??????*****.
这样一来reject(command)也可能随之而来了,LinkedBlockingQueue.capacity设置为何值又是个头疼的问题.
所以ThreadPoolExecutor+LinkedBlockingQueue表达的意思是首先会增加线程数到corePoolSize,但只有queue的任务容量到达最大capacity后,才会继续在corePoolSize的基数上增加线程来处理任务,直到maximumPoolSize.

    但为什么我们不能这样呢:将LinkedBlockingQueue.capacity设置为Integer.MAX_VALUE,让task尽可能的得到处理,同时在忙的情况下,增加池中的线程充到maximumPoolSize来尽快的处理这些任务.即便是把LinkedBlockingQueue.capacity设置为一个适当的值<<<远小于Integer.MAX_VALUE,也不一定非得在任务数到达LinkedBlockingQueue的capacity之后才去增加线程使poolSize超出corePoolSize趋向maximumPoolSize.

    所以java util concurrent中的ThreadPoolExecutor+LinkedBlockingQueue组合的缺点也就出来了:如果我们想让线程池尽可能多的处理大量的任务的话,我们会把LinkedBlockingQueue.capacity设置为Integer.MAX_VALUE,但是如果这样的话池中的线程数量就不能充到最大maximumPoolSize,也就不能充分发挥线程池的最大处理能力.如果我们把LinkedBlockingQueue.capacity设置为一个较小的值,那么线程池中的线程数量会充到最大maximumPoolSize,但是如果池中的线程都忙的话,线程池又会reject请求的任务,因为队列已满.
    如果我们把LinkedBlockingQueue.capacity设置为一个较大的值但不是Integer.MAX_VALUE,那么等到线程池的线程数量准备开始超出corePoolSize时,也就是任务队列满了,这个时候才去增加线程的话,请求任务的执行会有一定的延时,也就是没有得到及时的处理.
    其实也就是说ThreadPoolExecutor缺乏灵敏的线程调度机制,没有根据当前任务的执行情况,是忙,还是闲,以及队列中的待处理任务的数量级进行动态的调配线程数,使得它的处理效率受到影响.

 

那么什么是忙的情况的判断呢? 
busy[1]:如果poolSize==corePoolSize,并且现在忙着执行任务的线程数(currentBusyWorkers)等于poolSize.[而不管现在put到queue的任务数是否到达queue.capacity]
busy[2].1:如果poolSize==corePoolSize,并且put到queue的任务数已到达queue.capacity.[queue.capacity是针对有任务队列极限限制的情况]
busy[2].2:线程池的基本目标是尽可能的快速处理大量的请求任务,那么就不一定非得在put到queue的任务数到达queue的capacity之后才判断为忙的情况,只要queue中现有的任务数(task_counter)与poolSize或者maximumPoolSize存在一定的比例时就可以判断为忙情,比如task_counter>=poolSize或者maximumPoolSize的(NumberOfProcessor+1)倍,这样queue.capacity这个限制可以取消了.

在上述busy[1],busy[2]这2种情况下都应增加线程数,直至maximumPoolSize,使请求的任务得到最快的处理. 

 

    前面讲的是忙的时候ThreadPoolExecutor+LinkedBlockingQueue在处理上的瑕疵,那么空闲的时候又要如何呢?
如果corePoolSize<poolSize<maximumPoolSize,那么线程等待keepAliveTime之后应该降为corePoolSize,嘿嘿,这个就真的成了bug了哦,一个很难发现的bug,poolSize是被降下来了,可是很可能降过了头<corePoolSize,甚至降为0也有可能.
ThreadPoolExecutor.Worker.run()-->ThreadPoolExecutor.getTask():
Runnable getTask() {
        for (;;) {
            try {
                int state = runState;
                if (state > SHUTDOWN)
                    return null;
                Runnable r;
                if (state == SHUTDOWN)  // Help drain queue
                    r = workQueue.poll();
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
      /*queue is empty,这里timeout之后,return null,之后call workerCanExit() return true.*/
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                else
                    r = workQueue.take();
                if (r != null)
                    return r;
                if (workerCanExit()) {
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();
                    return null;
                }
                // Else retry
            } catch (InterruptedException ie) {
                // On interruption, re-check runState
            }
        }
}//end getTask.
private boolean workerCanExit() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        boolean canExit;
        try {
            canExit = runState >= STOP ||
                workQueue.isEmpty() ||
                (allowCoreThreadTimeOut &&
                 poolSize > Math.max(1, corePoolSize));
        } finally {
            mainLock.unlock();
        }
        return canExit;
}//end workerCanExit.
在workerCanExit() return true之后,poolSize仍然大于corePoolSize,pooSize的值没有变化,
ThreadPoolExecutor.Worker.run()将结束-->ThreadPoolExecutor.Worker.workerDone-->这个时候才将poolSize--,可惜晚了,在多线程的环境下,poolSize的值将变为小于corePoolSize,而不是等于corePoolSize!!!!!!
例如:如果poolSize(6)大于corePoolSize(5),那么同时timeout的就不一定是一条线程,而是多条,它们都有可能退出run,使得poolSize--减过了corePoolSize.

    提一下java.util.concurrent.ThreadPoolExecutor的allowCoreThreadTimeOut方法, @since 1.6 public void allowCoreThreadTimeOut(boolean value);
它表达的意思是在空闲的时候让线程等待keepAliveTime,timeout后使得poolSize能够降为0.[其实我是希望它降为minimumPoolSize,特别是在服务器的环境下,我们需要线程池保持一定数量的线程来及时处理"零零碎碎,断断续续的不是很有压力的"请求],当然你可以把corePoolSize当作minimumPoolSize,而不调用该方法. 

 

 

 

 

2
1
分享到:
评论
3 楼 2022228 2010-05-12  
貌似的确可以优化这个规则。。。
2 楼 2022228 2010-05-12  
poolSize==corePoolSize之后,修改jdk的规则?究竟是添加到队列,还是开启新的线程?
1 楼 2022228 2010-05-12  
当一个任务通过execute(Runnable)方法欲添加到线程池时:

如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。

如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。

如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。

如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。


这就是规则,你想说什么

相关推荐

    JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用

    "JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用" JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用是Java多线程编程中的一种重要概念。随着多线程编程的普及,线程池的使用变得...

    JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用简介.doc

    JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用简介

    jdk1.5x64位 windows版.zip

    在并发编程方面,JDK1.5引入了并发工具类(java.util.concurrent),包括Semaphore、CyclicBarrier、CountDownLatch等,这些工具极大地简化了多线程编程中的同步和协调。 在内存模型和并发性能上,JDK1.5引入了Java...

    JDK1.5线程池源码及详细注释

    JDK 1.5引入了java.util.concurrent包,其中包含了线程池的实现,使得并发编程更加便捷和高效。线程池的核心在于它的设计策略,包括核心线程数、最大线程数、线程存活时间、工作队列以及拒绝策略。 线程池的主要类...

    backport-util-concurrent(2.2 /3.1)

    backport-util-concurrent提供了ThreadPool,它是Java 5 ThreadPoolExecutor的一个简化版本,用于管理和调度线程,有效地控制并发执行的任务数量。 4. **Future和Callable** - backport-util-concurrent也实现了...

    java线程池threadpool简单使用源码

    在Java中,`java.util.concurrent`包提供了`ExecutorService`接口和相关的实现,如`ThreadPoolExecutor`,它是线程池的核心。`ExecutorService`接口定义了添加任务、启动线程池、关闭线程池等方法。`...

    包含 jdk1.5免安装、jdk1.6免安装、jdk1.8(32和64)

    Java Development Kit (JDK) 是Java编程语言的核心组件,它为开发者提供了编译、调试和运行Java应用程序所需的所有工具。这个压缩包包含了三个不同版本的JDK:JDK 1.5、JDK 1.6和JDK 1.8,其中1.5和1.6是早期版本,...

    java并发工具包 java.util.concurrent中文版用户指南pdf

    1. java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...

    java线程池封装j

    Java线程池由`java.util.concurrent`包中的`ExecutorService`接口和其子类实现。其中,最常用的是`ThreadPoolExecutor`类,它提供了丰富的参数用于定制线程池的行为。线程池的核心组件包括: - **核心线程数...

    JDK 1.5 for Windows

    10. **并发改进**:添加了`java.util.concurrent`包,包含许多线程管理和同步工具类,如Executor框架,Semaphore,CountDownLatch等。 11. **内存模型的增强**:JDK 1.5对Java内存模型(JMM)进行了改进,确保了多...

    jdk1.5 for windows32 安装包

    JDK 1.5引入了java.util.concurrent包,包含了线程池、并发集合和并发工具类,极大地改善了多线程编程的效率和可靠性。 11. **NIO.2(New I/O 2)** 虽然NIO.2是在JDK 7中引入的,但JDK 1.5开始的NIO(非阻塞I/O...

    Java线程池文档

    线程池在Java中是通过`java.util.concurrent`包下的`ThreadPoolExecutor`类实现的。 线程池的主要作用是限制系统中执行线程的数量,通过预先配置好的线程数量,可以避免过多线程导致的资源浪费和系统拥挤,从而提高...

    Java线程池使用说明

    在JDK 1.5版本之前,Java对线程池的支持非常有限,而在JDK 1.5之后,加入了java.util.concurrent包,其中包含了一系列关于线程池的接口和类,极大地丰富了线程池的应用场景和管理方式。 线程池的主要作用是限制系统...

    jdk jdk1.5 windows系统

    12. **并发编程改进**:包括`java.util.concurrent`包的引入,提供了线程池、并发容器、并发工具类等,简化了多线程编程。 13. **XML支持的增强**:JAXB(Java Architecture for XML Binding)的引入,使得Java对象...

    jdk1.5 java

    JDK 1.5引入了`java.util.concurrent`包,提供了线程安全的数据结构和并发编程工具,如`Executor`框架、`Future`接口、`Semaphore`、`CyclicBarrier`等,极大地简化了多线程编程。 ### 10. **Synchronized关键字...

    jdk1.5 64位 免安装

    在并发处理上,JDK1.5引入了并发工具类(java.util.concurrent),包括线程池(ExecutorService)、并发容器(如ConcurrentHashMap)以及Future接口等,这些工具极大地提高了多线程环境下的程序设计效率和性能。...

    IBMJDK1.5linux.zip

    在并发编程方面,JDK 1.5引入了并发工具类(java.util.concurrent package),如Executor框架、Semaphore、CountDownLatch和CyclicBarrier等,这些工具大大简化了多线程编程,提高了并发应用的效率和可靠性。...

    jdk1.5 windows 64位官方正式版,绝对有效

    对于并发编程,JDK1.5引入了java.util.concurrent包,其中包括了线程池、Future、Callable接口以及CyclicBarrier和Semaphore等同步工具类,极大地丰富了并发处理能力,提升了多线程环境下的性能和可维护性。...

    自定义实现Java线程池1-模拟jdk线程池执行流程1

    首先,Java中的线程池设计始于JDK 5.0,主要通过`java.util.concurrent`包中的`Executor`接口实现。这个接口仅有一个`execute()`方法,用于提交执行任务。我们也将遵循这个设计,实现一个简单的线程池类`...

    java线程池实例详细讲解

    在Java中,`ExecutorService`接口是线程池的主要入口,它是`java.util.concurrent`包的一部分,提供了创建、管理和控制线程池的功能。 线程池的核心概念包括以下几点: 1. **工作队列(Work Queue)**:线程池内部...

Global site tag (gtag.js) - Google Analytics