- 浏览: 74667 次
- 性别:
- 来自: 苏州
文章分类
最新评论
-
name327:
LZ文章看似杂乱,但是点点说到NIO要点,需要经过大量的实验才 ...
java nio在多线程环境下的恶梦之终结 -
沙舟狼客:
你好楼主,有空帮忙分析一下mina2.0.7的异常:
java ...
java nio在多线程环境下的恶梦之终结 -
Copperfield:
A non-static nested class (or ' ...
你能通过下面的3道java面试题吗? -
ay8yt:
java端的代码怎么没有啊,tomcat网站里找不到啊
浏览器[IE,Firefox]不支持comet技术-AJAX不能支持服务端推消息 -
t8500071:
引用if remove the current selecte ...
java nio在多线程环境下的恶梦之终结
java.util.concurrent的作者是Doug Lea : 世界上对Java影响力最大的个人,在jdk1.5之前大家一定熟悉他的backport-util-concurrent.jar."这个鼻梁挂着眼镜,留着德王威廉二世的胡子,脸上永远挂着谦逊腼腆笑容,服务于纽约州立大学Oswego分校计算器科学系的老大爷。",他可是并发编程的大师级人物哦!
Since jdk1.5,在java.util.concurrent包下的线程池模型是基于queue的,threadpool只有一个,而queue却有多个
LinkedBlockingQueue,SynchronousQueue,ScheduledThreadPoolExecutor.DelayedWorkQueue等可参见java.util.concurrent.Executors.注意:我下面的问题是针对LinkedBlockingQueue的,参考的src为jdk1.6.
ThreadPool通过以下的3个属性来标志池中的线程数:
corePoolSize(类似minimumPoolSize),poolSize(当前池中的线程数),maximumPoolSize(最大的线程数).
这3个属性表达的意思是每次新创建或结束一个线程poolSize++/--,在最忙的情况下threadpool创建的线程数不能超过maximumPoolSize,当空闲的情况下poolSize应该降到corePoolSize,当然threadpool如果从创建时它就从来没有处理过一次请求的话,那么poolSize当然为0.
通过以上2段的说明下面我要引出我所要讲的问题:
我们来看一下java.util.concurrent.ThreadPoolExecutor的execute方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}
它表达的主体意思是:如果当前的poolSize<corePoolSize,那么就增加线程直到poolSize==corePoolSize.
如果poolSize已经到达corePoolSize,那么就把command(task) put to workQueue,如果workQueue为LinkedBlockingQueue的话,那么只有当workQueue offer commands达到workQueue.capacity后,threadpool才会继续增加线程直到maximumPoolSize.
1.*****如果LinkedBlockingQueue.capacity被设置为Integer.MAX_VALUE,那么池中的线程不可能到达maximumPoolSize.*****
所以你如果使用了Executors.newFixedThreadPool的话,那么maximumPoolSize和corePoolSize是一样的并且LinkedBlockingQueue.capacity==Integer.MAX_VALUE,或者如果这样new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,timeUnit,new LinkedBlockingQueue<Runnable>(/*Integer.MAX_VALUE*/))的话,
上述的使用都将导致maximumPoolSize是无效的,也就是说线程池中的线程数不会超出corePoolSize.
这个也让那些tomcat6的开发人员可能也郁闷了,他们不得不改写LinkedBlockingQueue,以tomcat-6.0.20-src为例:
org.apache.tomcat.util.net.NioEndpoint.TaskQueue extends LinkedBlockingQueue<Runnable> override offer method:
public void setParent(ThreadPoolExecutor tp, NioEndpoint ep) {
parent = tp;
this.endpoint = ep;
}
public boolean offer(Runnable o) {
//we can't do any checks
if (parent==null) return super.offer(o);
//we are maxed out on threads, simply queue the object
if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
//we have idle threads, just add it to the queue
//this is an approximation, so it could use some tuning
if (endpoint.activeSocketProcessors.get()<(parent.getPoolSize())) return super.offer(o);
//if we have less threads than maximum force creation of a new thread
if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
//if we reached here, we need to add it to the queue
return super.offer(o);
}
org.apache.tomcat.util.net.NioEndpoint.start()-->
TaskQueue taskqueue = new TaskQueue();/***queue.capacity==Integer.MAX_VALUE***/
TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-");
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60,TimeUnit.SECONDS,taskqueue, tf);
taskqueue.setParent( (ThreadPoolExecutor) executor, this);
2.*****如果把LinkedBlockingQueue.capacity设置为一个适当的值远小于Integer.MAX_VALUE,那么只有put到queue的任务数到达LinkedBlockingQueue的capacity后,才会继续增加池中的线程,使得poolSize超出corePoolSize但不超过maximumPoolSize,这个时候来增加线程数是不是有点晚了呢??????*****.
这样一来reject(command)也可能随之而来了,LinkedBlockingQueue.capacity设置为何值又是个头疼的问题.
所以ThreadPoolExecutor+LinkedBlockingQueue表达的意思是首先会增加线程数到corePoolSize,但只有queue的任务容量到达最大capacity后,才会继续在corePoolSize的基数上增加线程来处理任务,直到maximumPoolSize.
但为什么我们不能这样呢:将LinkedBlockingQueue.capacity设置为Integer.MAX_VALUE,让task尽可能的得到处理,同时在忙的情况下,增加池中的线程充到maximumPoolSize来尽快的处理这些任务.即便是把LinkedBlockingQueue.capacity设置为一个适当的值<<<远小于Integer.MAX_VALUE,也不一定非得在任务数到达LinkedBlockingQueue的capacity之后才去增加线程使poolSize超出corePoolSize趋向maximumPoolSize.
所以java util concurrent中的ThreadPoolExecutor+LinkedBlockingQueue组合的缺点也就出来了:如果我们想让线程池尽可能多的处理大量的任务的话,我们会把LinkedBlockingQueue.capacity设置为Integer.MAX_VALUE,但是如果这样的话池中的线程数量就不能充到最大maximumPoolSize,也就不能充分发挥线程池的最大处理能力.如果我们把LinkedBlockingQueue.capacity设置为一个较小的值,那么线程池中的线程数量会充到最大maximumPoolSize,但是如果池中的线程都忙的话,线程池又会reject请求的任务,因为队列已满.
如果我们把LinkedBlockingQueue.capacity设置为一个较大的值但不是Integer.MAX_VALUE,那么等到线程池的线程数量准备开始超出corePoolSize时,也就是任务队列满了,这个时候才去增加线程的话,请求任务的执行会有一定的延时,也就是没有得到及时的处理.
其实也就是说ThreadPoolExecutor缺乏灵敏的线程调度机制,没有根据当前任务的执行情况,是忙,还是闲,以及队列中的待处理任务的数量级进行动态的调配线程数,使得它的处理效率受到影响.
那么什么是忙的情况的判断呢?
busy[1]:如果poolSize==corePoolSize,并且现在忙着执行任务的线程数(currentBusyWorkers)等于poolSize.[而不管现在put到queue的任务数是否到达queue.capacity]
busy[2].1:如果poolSize==corePoolSize,并且put到queue的任务数已到达queue.capacity.[queue.capacity是针对有任务队列极限限制的情况]
busy[2].2:线程池的基本目标是尽可能的快速处理大量的请求任务,那么就不一定非得在put到queue的任务数到达queue的capacity之后才判断为忙的情况,只要queue中现有的任务数(task_counter)与poolSize或者maximumPoolSize存在一定的比例时就可以判断为忙情,比如task_counter>=poolSize或者maximumPoolSize的(NumberOfProcessor+1)倍,这样queue.capacity这个限制可以取消了.
在上述busy[1],busy[2]这2种情况下都应增加线程数,直至maximumPoolSize,使请求的任务得到最快的处理.
前面讲的是忙的时候ThreadPoolExecutor+LinkedBlockingQueue在处理上的瑕疵,那么空闲的时候又要如何呢?
如果corePoolSize<poolSize<maximumPoolSize,那么线程等待keepAliveTime之后应该降为corePoolSize,嘿嘿,这个就真的成了bug了哦,一个很难发现的bug,poolSize是被降下来了,可是很可能降过了头<corePoolSize,甚至降为0也有可能.
ThreadPoolExecutor.Worker.run()-->ThreadPoolExecutor.getTask():
Runnable getTask() {
for (;;) {
try {
int state = runState;
if (state > SHUTDOWN)
return null;
Runnable r;
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
/*queue is empty,这里timeout之后,return null,之后call workerCanExit() return true.*/
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
r = workQueue.take();
if (r != null)
return r;
if (workerCanExit()) {
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();
return null;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}//end getTask.
private boolean workerCanExit() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
boolean canExit;
try {
canExit = runState >= STOP ||
workQueue.isEmpty() ||
(allowCoreThreadTimeOut &&
poolSize > Math.max(1, corePoolSize));
} finally {
mainLock.unlock();
}
return canExit;
}//end workerCanExit.
在workerCanExit() return true之后,poolSize仍然大于corePoolSize,pooSize的值没有变化,
ThreadPoolExecutor.Worker.run()将结束-->ThreadPoolExecutor.Worker.workerDone-->这个时候才将poolSize--,可惜晚了,在多线程的环境下,poolSize的值将变为小于corePoolSize,而不是等于corePoolSize!!!!!!
例如:如果poolSize(6)大于corePoolSize(5),那么同时timeout的就不一定是一条线程,而是多条,它们都有可能退出run,使得poolSize--减过了corePoolSize.
提一下java.util.concurrent.ThreadPoolExecutor的allowCoreThreadTimeOut方法, @since 1.6 public void allowCoreThreadTimeOut(boolean value);
它表达的意思是在空闲的时候让线程等待keepAliveTime,timeout后使得poolSize能够降为0.[其实我是希望它降为minimumPoolSize,特别是在服务器的环境下,我们需要线程池保持一定数量的线程来及时处理"零零碎碎,断断续续的不是很有压力的"请求],当然你可以把corePoolSize当作minimumPoolSize,而不调用该方法.
评论
如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。
这就是规则,你想说什么
发表评论
-
Mybatis miscellaneous
2018-07-26 13:13 0一.mybatis utilz 二 ... -
HTML5 WebSocket and Web Messages Pushlet
2015-08-20 07:24 1131jsr-356 Programming Model I ... -
Maven Multi-module Inheritance
2015-07-02 18:03 836所有用Maven管理的真实的项目都应该是分模块的,每 ... -
JAX-WS JWS
2015-07-02 17:52 775e.g: @WebServicepublic int ... -
Spring AOP&AutoProxy&Transactions
2015-06-27 19:50 812AOPAdvice: action taken by an ... -
Spring IOC BeanFactory ApplicationContext
2015-06-27 19:47 734BeanFactoryPostProcessor[e.g: ... -
JAXB JAXP Miscellaneous
2015-03-07 21:12 795schemegenxjcC:\jdk1.8.0_25\bin\ ... -
ThreadLocal&WeakReference
2015-02-26 14:05 1182ThreadLocalThreadLocalMap.Ent ... -
Eclipse&Maven&Nexus
2015-06-26 22:34 1249Nexus2:nexus\bin\jsw\windo ... -
Spring Framework Miscellaneous
2014-12-16 11:09 908-------------------------- ... -
Quartz Scheduler JDBC JobStore Clustering
2014-08-20 09:43 1169#org.quartz.jobStore.isClu ... -
Java Mail Internals
2014-08-05 17:07 737Java EE 7 Technologieshttp:// ... -
ActiveMQ-Client[JMS] Runtime
2014-07-27 13:04 961activemq-client:org.apache.ac ... -
Mybatis-Spring Interactions
2014-07-27 12:55 647Spring side: DataSourceUtils ... -
java深度 GC
2013-05-16 22:28 964java深度 GCjvm reduceinitialcardm ... -
Java正则表达式
2011-07-18 20:08 744表达式意义:1.字符x 字符 x。例如a表示字符a\\ ... -
MyEclipse & Eclipse JEE
2011-04-09 13:56 1872去除Eclipse的JS验证:将windows-&g ... -
MINA,xSocket同样的性能缺陷及陷阱,Grizzly better
2010-03-04 22:06 5219MINA,Grizzly[grizzly-nio-framew ... -
java nio在多线程环境下的恶梦之终结
2010-03-01 16:43 17349有人说java nio在多线程环境下编程简直就是个恶梦,其实你 ... -
DWR在和spring集成时的bug,SpringCreator.getType???
2010-02-04 12:18 3317DWR在和spring集成时,在dwr.xml中将设置crea ...
相关推荐
"JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用" JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用是Java多线程编程中的一种重要概念。随着多线程编程的普及,线程池的使用变得...
JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用简介
在并发编程方面,JDK1.5引入了并发工具类(java.util.concurrent),包括Semaphore、CyclicBarrier、CountDownLatch等,这些工具极大地简化了多线程编程中的同步和协调。 在内存模型和并发性能上,JDK1.5引入了Java...
JDK 1.5引入了java.util.concurrent包,其中包含了线程池的实现,使得并发编程更加便捷和高效。线程池的核心在于它的设计策略,包括核心线程数、最大线程数、线程存活时间、工作队列以及拒绝策略。 线程池的主要类...
Java Development Kit (JDK) 是Java编程语言的核心组件,它为开发者提供了编译、调试和运行Java应用程序所需的所有工具。这个压缩包包含了三个不同版本的JDK:JDK 1.5、JDK 1.6和JDK 1.8,其中1.5和1.6是早期版本,...
1. java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...
Java线程池由`java.util.concurrent`包中的`ExecutorService`接口和其子类实现。其中,最常用的是`ThreadPoolExecutor`类,它提供了丰富的参数用于定制线程池的行为。线程池的核心组件包括: - **核心线程数...
9. **并发改进(Concurrency Enhancements)**:包括`java.util.concurrent`包的引入,提供了线程安全的数据结构和高级并发工具,如`Executor`框架,简化了多线程编程。 10. **NIO.2(New I/O 2)**:虽然JDK 1.5中...
backport-util-concurrent提供了ThreadPool,它是Java 5 ThreadPoolExecutor的一个简化版本,用于管理和调度线程,有效地控制并发执行的任务数量。 4. **Future和Callable** - backport-util-concurrent也实现了...
10. **并发改进**:添加了`java.util.concurrent`包,包含许多线程管理和同步工具类,如Executor框架,Semaphore,CountDownLatch等。 11. **内存模型的增强**:JDK 1.5对Java内存模型(JMM)进行了改进,确保了多...
JDK 1.5引入了java.util.concurrent包,包含了线程池、并发集合和并发工具类,极大地改善了多线程编程的效率和可靠性。 11. **NIO.2(New I/O 2)** 虽然NIO.2是在JDK 7中引入的,但JDK 1.5开始的NIO(非阻塞I/O...
线程池在Java中是通过`java.util.concurrent`包下的`ThreadPoolExecutor`类实现的。 线程池的主要作用是限制系统中执行线程的数量,通过预先配置好的线程数量,可以避免过多线程导致的资源浪费和系统拥挤,从而提高...
在JDK 1.5版本之前,Java对线程池的支持非常有限,而在JDK 1.5之后,加入了java.util.concurrent包,其中包含了一系列关于线程池的接口和类,极大地丰富了线程池的应用场景和管理方式。 线程池的主要作用是限制系统...
12. **并发编程改进**:包括`java.util.concurrent`包的引入,提供了线程池、并发容器、并发工具类等,简化了多线程编程。 13. **XML支持的增强**:JAXB(Java Architecture for XML Binding)的引入,使得Java对象...
JDK 1.5引入了`java.util.concurrent`包,提供了线程安全的数据结构和并发编程工具,如`Executor`框架、`Future`接口、`Semaphore`、`CyclicBarrier`等,极大地简化了多线程编程。 ### 10. **Synchronized关键字...
在并发编程方面,JDK 1.5引入了并发工具类(java.util.concurrent package),如Executor框架、Semaphore、CountDownLatch和CyclicBarrier等,这些工具大大简化了多线程编程,提高了并发应用的效率和可靠性。...
对于并发编程,JDK1.5引入了java.util.concurrent包,其中包括了线程池、Future、Callable接口以及CyclicBarrier和Semaphore等同步工具类,极大地丰富了并发处理能力,提升了多线程环境下的性能和可维护性。...
在Java中,`java.util.concurrent`包提供了`ExecutorService`接口和相关的实现,如`ThreadPoolExecutor`,它是线程池的核心。`ExecutorService`接口定义了添加任务、启动线程池、关闭线程池等方法。`...
首先,Java中的线程池设计始于JDK 5.0,主要通过`java.util.concurrent`包中的`Executor`接口实现。这个接口仅有一个`execute()`方法,用于提交执行任务。我们也将遵循这个设计,实现一个简单的线程池类`...
在Java中,`ExecutorService`接口是线程池的主要入口,它是`java.util.concurrent`包的一部分,提供了创建、管理和控制线程池的功能。 线程池的核心概念包括以下几点: 1. **工作队列(Work Queue)**:线程池内部...