Tomcat的线程池主要使用了ThreadPool.java、ThreadPoolRunnable.java、ThreadWithAttributes.java,其中ThreadPoolRunnable.java是一个接口,所有的需要使用线程池的服务都可以实现这个接口。而实现的核心则再ThreadPool.java中的两个内部类:MonitorRunnable.java和ControlRunnable.java。
MonitorRunnable.java在线程池启动之后定期(60s)的扫描线程数,如果空闲的线程大于最大空闲线程数,则结束多余的线程。
ControlRunnable.java是所有启动的线程,若由任务需要执行,ThreadPool会先找一个空闲的ControlRunnable来执行,若没有空闲的,则创建,若现有的busy线程已经达到最大值,则等待。任务执行结束后通知ControlRunnable继续wait,直到有任务执行或被MonitorRunnable回收。
若要使用线程池可以实现Runnable接口,或者可以实现ThreadPoolRunnable 接口,当然自己还可以扩展这个类,以便实现更多的使用线程池的方式。
ThreadPool.java
package com.xiao.tomcatthreadpool;
import java.util.Hashtable;
public class ThreadPool {
public static final int MAX_THREADS = 10;
public static final int MAX_THREADS_MIN = 4;
public static final int MAX_SPARE_THREADS = 5;
public static final int MIN_SPARE_THREADS = 2;
public static final int WORK_WAIT_TIMEOUT = 10*1000;
private String name = "TP";
private boolean isDaemon;
private boolean stopThePool;
private int maxSpareThreads;
private int minSpareThreads;
private int currentThreadCount;
private int currentThreadsBusy;
private int maxThreads;
private int threadPriority = Thread.NORM_PRIORITY;
private int sequence = 0;
private ControlRunnable[] pool;
private MonitorRunnable monitor;
protected Hashtable threads=new Hashtable();
public ThreadPool() {
maxThreads = MAX_THREADS;
maxSpareThreads = MAX_SPARE_THREADS;
minSpareThreads = MIN_SPARE_THREADS;
currentThreadCount = 0;
currentThreadsBusy = 0;
stopThePool = false;
}
public static ThreadPool createThreadPool() {
return new ThreadPool();
}
public synchronized void start() {
stopThePool = false;
currentThreadCount = 0;
currentThreadsBusy = 0;
pool = new ControlRunnable[maxThreads];
openThreads(minSpareThreads);
if (maxSpareThreads < maxThreads) {
monitor = new MonitorRunnable(this);
}
}
public void run(Runnable r) {
ControlRunnable c = findControlRunnable();
c.runIt(r);
}
public void runIt(ThreadPoolRunnable r) {
if(null == r) {
throw new NullPointerException();
}
ControlRunnable c = findControlRunnable();
c.runIt(r);
}
public ControlRunnable findControlRunnable() {
ControlRunnable c;
if ( stopThePool ) {
throw new IllegalStateException();
}
synchronized(this) {
while (currentThreadsBusy == currentThreadCount) {
if (currentThreadCount < maxThreads) {
int toOpen = currentThreadCount + minSpareThreads;
openThreads(toOpen);
} else {
try {
this.wait();
}
catch(InterruptedException e) {
}
}
}
if(0 == currentThreadCount || stopThePool) {
throw new IllegalStateException();
}
int pos = currentThreadCount - currentThreadsBusy - 1;
c = pool[pos];
pool[pos] = null;
currentThreadsBusy++;
}
return c;
}
public void openThreads(int toOpen) {
if(toOpen > maxThreads) {
toOpen = maxThreads;
}
for(int i=currentThreadCount; i<toOpen; i++) {
pool[i - currentThreadsBusy] = new ControlRunnable(this);
}
currentThreadCount = toOpen;
}
public void addThread(ThreadWithAttributes t, ControlRunnable r) {
threads.put(t, r);
}
public void removeThread(Thread t) {
threads.remove(t);
}
public synchronized void notifyThreadEnd(ControlRunnable r) {
currentThreadCount --;
currentThreadsBusy --;
notify();
}
public synchronized void returnController(ControlRunnable r) {
if(0 == currentThreadCount || stopThePool) {
r.terminate();
return;
}
currentThreadsBusy--;
pool[currentThreadCount - currentThreadsBusy - 1] = r;
notify();
}
public synchronized void checkSpareControllers() {
if(stopThePool) {
return;
}
if((currentThreadCount - currentThreadsBusy) > maxSpareThreads) {
int toFree = currentThreadCount - currentThreadsBusy - maxSpareThreads;
for(int i=0; i<toFree; i++) {
ControlRunnable cr = pool[currentThreadCount-currentThreadsBusy -1];
cr.terminate();
pool[currentThreadCount-currentThreadsBusy -1] = null;
currentThreadCount --;
}
}
}
/**
* MonitorRunnable主要任务是监控线程数
* 如果线程数大于最大线程则回收线程
*/
public static class MonitorRunnable implements Runnable {
ThreadPool tp;
Thread t;
boolean shouldTerminate;
int interval = WORK_WAIT_TIMEOUT;
public MonitorRunnable(ThreadPool tp) {
this.tp = tp;
this.start();
}
public void setInterval(int i) {
interval = i;
}
public void start() {
shouldTerminate = false;
t = new Thread(this);
t.setDaemon(tp.getDaemon());
t.setName(tp.getName() + "-Monitor");
t.start();
}
public void stop() {
terminal();
}
public synchronized void terminal() {
this.shouldTerminate = true;
this.notify();
}
public void run() {
while(true) {
try {
synchronized(this) {
this.wait(interval);
}
if(shouldTerminate) {
break;
}
//System.out.println("currentThreadCount=" + currentThreadCount + " currentThreadsBusy=" + currentThreadsBusy + " ");
tp.checkSpareControllers();
} catch(InterruptedException e) {
}
}
}
}
public static class ControlRunnable implements Runnable {
private ThreadPool tp;
private boolean shouldTerminate;
private ThreadWithAttributes t;
private ThreadPoolRunnable toRun;
private Runnable toRunRunnable;
private boolean shouldRun;
public ControlRunnable(ThreadPool tp) {
toRun = null;
shouldTerminate = false;
shouldRun = false;
this.tp = tp;
t = new ThreadWithAttributes(tp, this);
t.setDaemon(true);
t.setName(tp.getName() + "-Processor" + tp.incSequence());
t.setPriority(tp.getThreadPriority());
tp.addThread(t, this);
t.start();
}
public void run() {
boolean _shouldRun = false;
boolean _shouldTerminate = false;
ThreadPoolRunnable _toRun = null;
try {
while(true) {
try {
synchronized(this) {
System.out.println("shouldRun=" + shouldRun);
while(!shouldRun && !shouldTerminate) {
this.wait();
}
_shouldRun = shouldRun;
_shouldTerminate = shouldTerminate;
_toRun = toRun;
}
if (_shouldTerminate)
break;
try {
if(_shouldRun) {
if (_toRun != null) {
_toRun.runIt(t.getThreadData(tp));
} else if (toRunRunnable != null) {
toRunRunnable.run();
} else {
}
}
} catch(Throwable r) {
_shouldTerminate = true;
_shouldRun = false;
tp.notifyThreadEnd(this);
} finally {
if(_shouldRun) {
shouldRun = false;
tp.returnController(this);
}
}
if (_shouldTerminate) {
break;
}
} catch(InterruptedException e) {
}
}
} finally {
tp.removeThread(Thread.currentThread());
}
}
public synchronized void runIt(Runnable toRun) {
this.toRunRunnable = toRun;
shouldRun = true;
this.notify();
}
public synchronized void runIt(ThreadPoolRunnable toRun) {
this.toRun = toRun;
shouldRun = true;
this.notify();
}
public void stop() {
this.terminate();
}
public void kill() {
t.stop();
}
public synchronized void terminate() {
shouldTerminate = true;
this.notify();
}
}
public String getName() {
return name;
}
public boolean getDaemon() {
return isDaemon;
}
public int getThreadPriority() {
return threadPriority;
}
public int incSequence() {
return sequence ++;
}
public void setThreadPriority(int threadPriority) {
this.threadPriority = threadPriority;
}
}
ThreadWithAttributes.java
package com.xiao.tomcatthreadpool;
import java.util.Hashtable;
public class ThreadWithAttributes extends Thread {
private Object control;
public static int MAX_NOTES=16;
private Object notes[]=new Object[MAX_NOTES];
private Hashtable attributes=new Hashtable();
private String currentStage;
private Object param;
private Object thData[];
public ThreadWithAttributes(Object control, Runnable r) {
super(r);
this.control=control;
}
public final Object[] getThreadData(Object control ) {
return thData;
}
public final void setThreadData(Object control, Object thData[] ) {
this.thData=thData;
}
public final void setNote( Object control, int id, Object value ) {
if( this.control != control ) return;
notes[id]=value;
}
public final String getCurrentStage(Object control) {
if( this.control != control ) return null;
return currentStage;
}
/** Information about the current request ( or the main object
* we are processing )
*/
public final Object getParam(Object control) {
if( this.control != control ) return null;
return param;
}
public final void setCurrentStage(Object control, String currentStage) {
if( this.control != control ) return;
this.currentStage = currentStage;
}
public final void setParam( Object control, Object param ) {
if( this.control != control ) return;
this.param=param;
}
public final Object getNote(Object control, int id ) {
if( this.control != control ) return null;
return notes[id];
}
public final Hashtable getAttributes(Object control) {
if( this.control != control ) return null;
return attributes;
}
}
ThreadPoolRunnable.java
package com.xiao.tomcatthreadpool;
public interface ThreadPoolRunnable {
public Object[] getInitData();
public void runIt(Object thData[]);
}
分享到:
相关推荐
Tomcat提供了两种线程池实现,一种是基于Apache Portable Runtime (APR)的Pool技术,另一种是纯Java实现的ThreadPool。本文主要探讨后者,即Java实现的线程池。 Java实现的线程池位于`tomcat-util.jar`中,初始化时...
Tomcat的线程池实现是基于Apache Commons JMX的ExecutorService,它是对Java标准库ExecutorService的一种扩展,增加了更多的监控和管理功能。 Tomcat的线程池主要由`org.apache.tomcat.util.threads.TaskQueue`和`...
NIO模式下,Tomcat使用一个线程池处理多个连接,提高了并发性能。 五、Session管理 Tomcat提供了内置的Session管理,包括会话创建、有效期设置、会话跟踪、分布式环境下的会话复制等。默认情况下,Session信息存储...
5. **线程模型**:Tomcat使用基于Executor的线程池来处理请求,提高了并发性能。 6. **连接器(Connector)和协议处理**:Tomcat支持多种连接器,如HTTP/1.1的`Http11NioProtocol`和`Http11AprProtocol`,用于处理...
在这个“tomcat实现websocket聊天室”的项目中,我们将深入探讨如何利用Tomcat搭建一个具备单聊、群聊、数据库管理以及用户管理功能的聊天室。 首先,我们需要理解WebSocket API的基本概念。WebSocket协议定义了两...
6. 性能优化:Tomcat可以通过调整连接器配置(如最大连接数、超时设置)和容器配置(如线程池大小、内存分配)来优化性能。此外,还可以启用压缩、缓存等特性以减少网络传输和提高响应速度。 总结来说,“简单的...
- **并发线程模式**:Tomcat使用线程池来处理并发请求,提高性能。 - **接收请求与处理**:Tomcat通过Coyote Connector接收HTTP请求,并将其转化为内部格式供处理组件使用。 - **Servlet容器的实现**:Tomcat的...
Tomcat是一款广泛使用的开源Java Servlet容器,它实现了Java Servlet和JavaServer Pages(JSP)规范,是开发和部署Java Web应用程序的关键工具。Tomcat的运行原理主要包括以下几个方面: 1. **架构概述**:Tomcat的...
在描述中提到的服务端采用Executors线程池实现,这意味着学生将学习如何使用Java的并发库来管理线程,提高并发性能。 4. **线程池**:ExecutorService是Java并发框架的一部分,它可以有效地管理和控制线程。使用...
7. 性能优化:Tomcat 8在内存管理、线程池和垃圾收集等方面进行了优化,提升了服务器的性能。 8. 安全增强:提供了更多的安全特性,如更好的密码加密存储和更强的身份验证机制。 总结来说,Tomcat 7和Tomcat 8在...
4. 优化Tomcat线程池设置,根据实际负载调整。 七、故障排查 1. 查看`logs/catalina.out`、`logs/host-manager.*.log`和`logs/manager.*.log`等日志文件。 2. 使用JMX监控Tomcat运行状态,如内存使用、线程池情况等...
对于性能调优,Tomcat可以通过调整`server.xml`中的参数,例如最大连接数、线程池大小等来提高处理能力。对于Nginx,可以优化工作进程数、连接数限制和超时设置等。还可以编写脚本来自动化这些调优过程,确保系统...
此外,它实现了Java 5及更高版本的Executor接口,可以实现线程池,进一步提高性能。 2. **tomcat-juli.jar**: Juli是Tomcat的一个日志框架,全称为Apache Tomcat Utility for Logging Interface。这个jar文件包含...
源码分析部分可能包括Tomcat的启动过程、请求处理流程、线程池管理、容器结构实现以及特定Servlet的生命周期管理等方面。通过阅读源码,开发者可以深入理解Tomcat如何处理网络请求,如何调度线程,以及如何管理和...
这通常涉及到与Tomcat管理接口的交互,例如通过JMX(Java Management Extensions)来获取服务器的运行信息,如线程池、内存使用、请求处理等指标。 2. **异常检测**:当Tomcat服务出现异常时,监控程序需要能够识别...
8. **JMX (Java Management Extensions)**: Tomcat使用JMX暴露管理MBeans(Managed Beans),允许通过JMX代理远程监控和管理Tomcat服务器的状态,包括启动/停止应用、查看线程池状态、调整配置等。 9. **Clustering...
9. **性能优化**:Tomcat可以通过调整配置参数,如线程池大小、最大连接数、内存分配等,来提高其性能。同时,还可以结合使用缓存和负载均衡技术来进一步提升系统性能。 10. **扩展性**:虽然Tomcat本身是轻量级的...
### Tomcat线程池的整体结构 文章首先提供了ThreadPool的类图,展示了其基本组成和关系。主要类包括`ThreadWithAttributes`、`ControlRunnable`、`ThreadPool`、`MonitorRunnable`和`ThreadPoolListener`等,这些类...
2. **线程池管理**:Tomcat如何使用Executor(`Executor`接口和`ThreadPoolExecutor`实现)来管理线程,提高并发性能。 3. **会话管理**:Tomcat如何实现Session跟踪,包括Cookie、URL重写和基于数据库的持久化策略...
1. **Apache Tomcat**: Tomcat是由Apache软件基金会的Jakarta项目开发的,它是Java Servlet和JavaServer Pages规范的实现。Tomcat6.0.18是其6.0系列的一个版本,支持Servlet 2.5和JSP 2.1标准。 2. **无需安装**: ...