`
xiaoshan5634
  • 浏览: 74140 次
  • 性别: Icon_minigender_1
  • 来自: 东莞
社区版块
存档分类
最新评论

tomcat线程池的实现

阅读更多

Tomcat的线程池主要使用了ThreadPool.java、ThreadPoolRunnable.java、ThreadWithAttributes.java,其中ThreadPoolRunnable.java是一个接口,所有的需要使用线程池的服务都可以实现这个接口。而实现的核心则再ThreadPool.java中的两个内部类:MonitorRunnable.java和ControlRunnable.java。

MonitorRunnable.java在线程池启动之后定期(60s)的扫描线程数,如果空闲的线程大于最大空闲线程数,则结束多余的线程。

ControlRunnable.java是所有启动的线程,若由任务需要执行,ThreadPool会先找一个空闲的ControlRunnable来执行,若没有空闲的,则创建,若现有的busy线程已经达到最大值,则等待。任务执行结束后通知ControlRunnable继续wait,直到有任务执行或被MonitorRunnable回收。

若要使用线程池可以实现Runnable接口,或者可以实现ThreadPoolRunnable 接口,当然自己还可以扩展这个类,以便实现更多的使用线程池的方式。

ThreadPool.java

package com.xiao.tomcatthreadpool;

import java.util.Hashtable;

public class ThreadPool {
	
	public static final int MAX_THREADS = 10;
    public static final int MAX_THREADS_MIN = 4;
    public static final int MAX_SPARE_THREADS = 5;
    public static final int MIN_SPARE_THREADS = 2;
    public static final int WORK_WAIT_TIMEOUT = 10*1000;
    
    private String name = "TP";
    private boolean isDaemon;
    private boolean stopThePool;
    private int maxSpareThreads;
    private int minSpareThreads;
    private int currentThreadCount;
    private int currentThreadsBusy;
    private int maxThreads;
    
    private int threadPriority = Thread.NORM_PRIORITY;
    private int sequence = 0;
    
    private ControlRunnable[] pool;
    private MonitorRunnable monitor;
    
    protected Hashtable threads=new Hashtable();
    
    public ThreadPool() {
    	maxThreads = MAX_THREADS;
    	maxSpareThreads = MAX_SPARE_THREADS;
    	minSpareThreads = MIN_SPARE_THREADS;
    	currentThreadCount = 0;
    	currentThreadsBusy = 0;
    	stopThePool = false;
    }
    
    public static ThreadPool createThreadPool() {
    	return new ThreadPool();
    }
    
    public synchronized void start() {
    	stopThePool = false;
    	currentThreadCount = 0;
    	currentThreadsBusy = 0;
    	pool = new ControlRunnable[maxThreads];
    	openThreads(minSpareThreads);
    	if (maxSpareThreads < maxThreads) {
            monitor = new MonitorRunnable(this);
        }
    }
    
    public void run(Runnable r) {
    	ControlRunnable c = findControlRunnable();
        c.runIt(r);
    }
    
    public void runIt(ThreadPoolRunnable r) {
    	if(null == r) {
            throw new NullPointerException();
        }
        ControlRunnable c = findControlRunnable();
        c.runIt(r);
    }
    
    public ControlRunnable findControlRunnable() {
    	ControlRunnable c;
    	if ( stopThePool ) {
            throw new IllegalStateException();
        }
    	synchronized(this) {
    		while (currentThreadsBusy == currentThreadCount) {
    			if (currentThreadCount < maxThreads) {
                    int toOpen = currentThreadCount + minSpareThreads;
                    openThreads(toOpen);
                } else {
                	try {
                        this.wait();
                    }
                    catch(InterruptedException e) {
                    	
                    }
                }
    		}
    		if(0 == currentThreadCount || stopThePool) {
                throw new IllegalStateException();
            }
    		int pos = currentThreadCount - currentThreadsBusy - 1;
            c = pool[pos];
            pool[pos] = null;
            currentThreadsBusy++;
    	}
    	return c;
    }
    
    public void openThreads(int toOpen) {
    	if(toOpen > maxThreads) {
    		toOpen = maxThreads;
    	}
    	for(int i=currentThreadCount; i<toOpen; i++) {
    		pool[i - currentThreadsBusy] = new ControlRunnable(this);
    	}
    	currentThreadCount = toOpen;
    }
    
    public void addThread(ThreadWithAttributes t, ControlRunnable r) {
    	threads.put(t, r);
    }
    
    public void removeThread(Thread t) {
    	threads.remove(t);
    }
    
    public synchronized void notifyThreadEnd(ControlRunnable r) {
    	currentThreadCount --;
    	currentThreadsBusy --;
    	notify();
    }
    
    public synchronized void returnController(ControlRunnable r) {
    	if(0 == currentThreadCount || stopThePool) {
            r.terminate();
            return;
        }
    	currentThreadsBusy--;
    	pool[currentThreadCount - currentThreadsBusy - 1] = r;
        notify();
    }
    
    public synchronized void checkSpareControllers() {
    	if(stopThePool) {
            return;
        }
    	if((currentThreadCount - currentThreadsBusy) > maxSpareThreads) {
    		int toFree = currentThreadCount - currentThreadsBusy - maxSpareThreads;
    		for(int i=0; i<toFree; i++) {
    			ControlRunnable cr = pool[currentThreadCount-currentThreadsBusy -1];
    			cr.terminate();
    			pool[currentThreadCount-currentThreadsBusy -1] = null;
    			currentThreadCount --;
    		}
    	}
    	
    }
    
    /**
     * MonitorRunnable主要任务是监控线程数
     * 如果线程数大于最大线程则回收线程
     */
    public static class MonitorRunnable implements Runnable {
    	ThreadPool tp;
    	Thread t;
    	boolean shouldTerminate;
    	int interval = WORK_WAIT_TIMEOUT;
    	public MonitorRunnable(ThreadPool tp) {
    		this.tp = tp;
    		this.start();
    	}
    	
    	public void setInterval(int i) {
    		interval = i;
    	}

    	public void start() {
    		shouldTerminate = false;
    		t = new Thread(this);
    		t.setDaemon(tp.getDaemon());
    	    t.setName(tp.getName() + "-Monitor");
    		t.start();
    	}
    	
    	public void stop() {
    		terminal();
    	}
    	
    	public synchronized void terminal() {
    		this.shouldTerminate = true;
    		this.notify();
    	}
    	
    	public void run() {
    		while(true) {
    			try {
    				synchronized(this) {
    					this.wait(interval);
    				}
    				if(shouldTerminate) {
                        break;
                    }
    				//System.out.println("currentThreadCount=" + currentThreadCount + " currentThreadsBusy=" + currentThreadsBusy + " ");
    				tp.checkSpareControllers();
    			} catch(InterruptedException e) {
    				
    			}
    		}
    	}
    }
    
    public static class ControlRunnable implements Runnable {
    	private ThreadPool tp;
    	private boolean shouldTerminate;
    	private ThreadWithAttributes     t;
    	
    	private ThreadPoolRunnable   toRun;
        private Runnable toRunRunnable;
        private boolean    shouldRun;
    	
    	public ControlRunnable(ThreadPool tp) {
    		toRun = null;
    		shouldTerminate = false;
            shouldRun = false;
            this.tp = tp;
            t = new ThreadWithAttributes(tp, this);
            t.setDaemon(true);
            t.setName(tp.getName() + "-Processor" + tp.incSequence());
            t.setPriority(tp.getThreadPriority());
            tp.addThread(t, this);
            t.start();
    	}
    	
    	public void run() {
    		boolean _shouldRun = false;
            boolean _shouldTerminate = false;
            ThreadPoolRunnable _toRun = null;
            try {
            	while(true) {
            		try {
            			synchronized(this) {
            				System.out.println("shouldRun=" + shouldRun);
            				while(!shouldRun && !shouldTerminate) {
            					this.wait();
            				}
            				_shouldRun = shouldRun;
                            _shouldTerminate = shouldTerminate;
                            _toRun = toRun;
            			}
            			if (_shouldTerminate)
                            break;
            			try {
            				if(_shouldRun) {
            					if (_toRun != null) {
            						_toRun.runIt(t.getThreadData(tp));
            					} else if (toRunRunnable != null) {
            						toRunRunnable.run();
            					} else {
            					}
            				}
            			} catch(Throwable r) {
            				_shouldTerminate = true;
                            _shouldRun = false;
                            tp.notifyThreadEnd(this);
            			} finally {
            				if(_shouldRun) {
            					shouldRun = false;
            					tp.returnController(this);
            				}
            				
            			}
            			if (_shouldTerminate) {
                            break;
                        }
            		} catch(InterruptedException e) {
            			
            		}
            	}
            } finally {
            	tp.removeThread(Thread.currentThread());
            }
            
    	}
    	
    	public synchronized void runIt(Runnable toRun) {
    	    this.toRunRunnable = toRun;
            shouldRun = true;
            this.notify();
        }
    	
    	public synchronized void runIt(ThreadPoolRunnable toRun) {
    	    this.toRun = toRun;
            shouldRun = true;
            this.notify();
        }
    	
    	public void stop() {
            this.terminate();
        }

        public void kill() {
            t.stop();
        }
    	
    	public synchronized void terminate() {
    		shouldTerminate = true;
    		this.notify();
    	}
    }
    
    public String getName() {
    	return name;
    }
    
    public boolean getDaemon() {
    	return isDaemon;
    }

	public int getThreadPriority() {
		return threadPriority;
	}
	
	public int incSequence() {
		return sequence ++;
	}

	public void setThreadPriority(int threadPriority) {
		this.threadPriority = threadPriority;
	}

}

 ThreadWithAttributes.java

package com.xiao.tomcatthreadpool;

import java.util.Hashtable;

public class ThreadWithAttributes extends Thread {
    
    private Object control;
    public static int MAX_NOTES=16;
    private Object notes[]=new Object[MAX_NOTES];
    private Hashtable attributes=new Hashtable();
    private String currentStage;
    private Object param;
    
    private Object thData[];

    public ThreadWithAttributes(Object control, Runnable r) {
        super(r);
        this.control=control;
    }
    
    public final Object[] getThreadData(Object control ) {
        return thData;
    }
    
    public final void setThreadData(Object control, Object thData[] ) {
        this.thData=thData;
    }

    public final void setNote( Object control, int id, Object value ) {
        if( this.control != control ) return;
        notes[id]=value;
    }

    public final String getCurrentStage(Object control) {
        if( this.control != control ) return null;
        return currentStage;
    }

    /** Information about the current request ( or the main object
     * we are processing )
     */
    public final Object getParam(Object control) {
        if( this.control != control ) return null;
        return param;
    }

    public final void setCurrentStage(Object control, String currentStage) {
        if( this.control != control ) return;
        this.currentStage = currentStage;
    }

    public final void setParam( Object control, Object param ) {
        if( this.control != control ) return;
        this.param=param;
    }

    public final Object getNote(Object control, int id ) {
        if( this.control != control ) return null;
        return notes[id];
    }

    public final Hashtable getAttributes(Object control) {
        if( this.control != control ) return null;
        return attributes;
    }
}

 ThreadPoolRunnable.java

package com.xiao.tomcatthreadpool;

public interface ThreadPoolRunnable {
	
	public Object[] getInitData();
	
	public void runIt(Object thData[]);

}
 
分享到:
评论

相关推荐

    Tomcat线程池实现简介

    Tomcat提供了两种线程池实现,一种是基于Apache Portable Runtime (APR)的Pool技术,另一种是纯Java实现的ThreadPool。本文主要探讨后者,即Java实现的线程池。 Java实现的线程池位于`tomcat-util.jar`中,初始化时...

    TOMCAT的线程池源码

    Tomcat的线程池实现是基于Apache Commons JMX的ExecutorService,它是对Java标准库ExecutorService的一种扩展,增加了更多的监控和管理功能。 Tomcat的线程池主要由`org.apache.tomcat.util.threads.TaskQueue`和`...

    tomcat 工作原理

    NIO模式下,Tomcat使用一个线程池处理多个连接,提高了并发性能。 五、Session管理 Tomcat提供了内置的Session管理,包括会话创建、有效期设置、会话跟踪、分布式环境下的会话复制等。默认情况下,Session信息存储...

    简单的Tomcat源码实现

    5. **线程模型**:Tomcat使用基于Executor的线程池来处理请求,提高了并发性能。 6. **连接器(Connector)和协议处理**:Tomcat支持多种连接器,如HTTP/1.1的`Http11NioProtocol`和`Http11AprProtocol`,用于处理...

    tomcat实现websocket聊天室

    在这个“tomcat实现websocket聊天室”的项目中,我们将深入探讨如何利用Tomcat搭建一个具备单聊、群聊、数据库管理以及用户管理功能的聊天室。 首先,我们需要理解WebSocket API的基本概念。WebSocket协议定义了两...

    简单的tomcat实现1

    6. 性能优化:Tomcat可以通过调整连接器配置(如最大连接数、超时设置)和容器配置(如线程池大小、内存分配)来优化性能。此外,还可以启用压缩、缓存等特性以减少网络传输和提高响应速度。 总结来说,“简单的...

    Tomcat(二) Tomcat实现:Servlet与web.xml介绍 以及 源码分析Tomcat实现细节1

    - **并发线程模式**:Tomcat使用线程池来处理并发请求,提高性能。 - **接收请求与处理**:Tomcat通过Coyote Connector接收HTTP请求,并将其转化为内部格式供处理组件使用。 - **Servlet容器的实现**:Tomcat的...

    Tomcat深入剖析pdf+源码(Tomcat运行原理)

    Tomcat是一款广泛使用的开源Java Servlet容器,它实现了Java Servlet和JavaServer Pages(JSP)规范,是开发和部署Java Web应用程序的关键工具。Tomcat的运行原理主要包括以下几个方面: 1. **架构概述**:Tomcat的...

    MOOC网络编程实践期末(实现自己的Tomcat)

    在描述中提到的服务端采用Executors线程池实现,这意味着学生将学习如何使用Java的并发库来管理线程,提高并发性能。 4. **线程池**:ExecutorService是Java并发框架的一部分,它可以有效地管理和控制线程。使用...

    tomcat 7 和 tomcat 8

    7. 性能优化:Tomcat 8在内存管理、线程池和垃圾收集等方面进行了优化,提升了服务器的性能。 8. 安全增强:提供了更多的安全特性,如更好的密码加密存储和更强的身份验证机制。 总结来说,Tomcat 7和Tomcat 8在...

    tomcat-jdbc数据源所需jar包tomcat-jdbc.jar+tomcat-juli.jar

    此外,它实现了Java 5及更高版本的Executor接口,可以实现线程池,进一步提高性能。 2. **tomcat-juli.jar**: Juli是Tomcat的一个日志框架,全称为Apache Tomcat Utility for Logging Interface。这个jar文件包含...

    tomcat8.0.5手册,帮助文档

    4. 优化Tomcat线程池设置,根据实际负载调整。 七、故障排查 1. 查看`logs/catalina.out`、`logs/host-manager.*.log`和`logs/manager.*.log`等日志文件。 2. 使用JMX监控Tomcat运行状态,如内存使用、线程池情况等...

    Nginx实现tomcat与weblogic集群的负载均衡及故障处理

    对于性能调优,Tomcat可以通过调整`server.xml`中的参数,例如最大连接数、线程池大小等来提高处理能力。对于Nginx,可以优化工作进程数、连接数限制和超时设置等。还可以编写脚本来自动化这些调优过程,确保系统...

    How Tomcat Works 中文版+例程源码

    源码分析部分可能包括Tomcat的启动过程、请求处理流程、线程池管理、容器结构实现以及特定Servlet的生命周期管理等方面。通过阅读源码,开发者可以深入理解Tomcat如何处理网络请求,如何调度线程,以及如何管理和...

    java tomcat 监控程序

    这通常涉及到与Tomcat管理接口的交互,例如通过JMX(Java Management Extensions)来获取服务器的运行信息,如线程池、内存使用、请求处理等指标。 2. **异常检测**:当Tomcat服务出现异常时,监控程序需要能够识别...

    Tomcat 接口文档 API

    8. **JMX (Java Management Extensions)**: Tomcat使用JMX暴露管理MBeans(Managed Beans),允许通过JMX代理远程监控和管理Tomcat服务器的状态,包括启动/停止应用、查看线程池状态、调整配置等。 9. **Clustering...

    Tomcat Tomcat Tomcat Tomcat

    9. **性能优化**:Tomcat可以通过调整配置参数,如线程池大小、最大连接数、内存分配等,来提高其性能。同时,还可以结合使用缓存和负载均衡技术来进一步提升系统性能。 10. **扩展性**:虽然Tomcat本身是轻量级的...

    Tomat研究之ThreadPool

    ### Tomcat线程池的整体结构 文章首先提供了ThreadPool的类图,展示了其基本组成和关系。主要类包括`ThreadWithAttributes`、`ControlRunnable`、`ThreadPool`、`MonitorRunnable`和`ThreadPoolListener`等,这些类...

    tomcat源码

    2. **线程池管理**:Tomcat如何使用Executor(`Executor`接口和`ThreadPoolExecutor`实现)来管理线程,提高并发性能。 3. **会话管理**:Tomcat如何实现Session跟踪,包括Cookie、URL重写和基于数据库的持久化策略...

    tomcat6.0.18 解压缩版

    1. **Apache Tomcat**: Tomcat是由Apache软件基金会的Jakarta项目开发的,它是Java Servlet和JavaServer Pages规范的实现。Tomcat6.0.18是其6.0系列的一个版本,支持Servlet 2.5和JSP 2.1标准。 2. **无需安装**: ...

Global site tag (gtag.js) - Google Analytics