`
sfjsffjjj
  • 浏览: 11187 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

线程池示例代码,请大家多指教

阅读更多
包结构
src
  test
    TestThreadPool          测试类
  thread
    ThreadPool              线程池类
    WorkThread              工作线程类
    TaskMonitorThread       任务监测线程类
    TaskTimeOutThread       任务超时监测线程类
  task
    TaskManager             任务管理器
    WorkTask                任务接口
    WorkTaskImp             正常任务类
    WorkTaskAImp            异常任务类
    WorkTaskBImp            超时任务类
  event
    AbstractEvent           任务事件类
    BeginTaskEvent          任务执行开始事件类
    EndTaskEvent            任务执行结束事件类
    TaskRunTime             任务运行时间类
    TaskTimeOutEvent        任务执行超时事件类

源代码
package test;

import task.TaskManager;
import task.WorkTask;
import task.WorkTaskAImp;
import task.WorkTaskBImp;
import task.WorkTaskImp;
import thread.ThreadPool;

/**
 * 线程池测试类,测试功能如下:
 * 1、测试线程池创建功能
 * 2、测试处理并发请求功能
 * 3、测试关闭功能
 **/
public class TestThreadPool {
	public static void main(String[] args){
		//创建线程池,开启处理请求服务
		final int threadCount=10;
		ThreadPool pool=ThreadPool.getInstance();
		pool.init(threadCount);
		//接收客户端请求
		WorkTask task1=new WorkTaskBImp("执行超时任务...");
		TaskManager.addTask(task1);
		final int requestCount=20;
		for(int i=0;i<requestCount;i++){
			WorkTask task=new WorkTaskImp("执行第"+i+"个增加用户操作...");
			TaskManager.addTask(task);
		}
		WorkTask task2=new WorkTaskBImp("执行超时任务...");
		TaskManager.addTask(task2);
		for(int i=0;i<requestCount;i++){
			WorkTask task=new WorkTaskAImp("执行第"+i+"个修改用户异常操作...");
			TaskManager.addTask(task);
		}
		for(int i=0;i<requestCount;i++){
			WorkTask task=new WorkTaskImp("执行第"+i+"个删除用户操作...");
			TaskManager.addTask(task);
		}
		//关闭线程池
		try {
			Thread.sleep(2000);//为了显示处理请求效果
			pool.close();
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}

package thread;

import java.util.ArrayList;
import java.util.List;
import java.util.Vector;

import event.BeginTaskEvent;
import event.EndTaskEvent;

import task.TaskManager;
/**
 * 线程池类,功能如下:
 * 1、初始化线程池
 * 2、获取空闲线程
 * 3、任务运行,注册超时监测
 * 4、任务结束,注销超时监测
 * 5、关闭线程池
 */
public class ThreadPool {
	private int threadcount;
	private int GetIdleThreadPollTime=50;//获取空闲线程轮询间隔时间,可配置
	private static ThreadPool pool=new ThreadPool();//线程实例	
	private Vector<WorkThread> threadlist=new Vector<WorkThread>();//工作线程列表
	private TaskMonitorThread mainThread;//任务监测线程
	private TaskTimeOutThread timeThread; //任务超时线程
	private boolean StopGetIdleThread=false;
    //单例模式
	private ThreadPool(){
	}	
	public static synchronized ThreadPool getInstance(){
		return pool;
	}
	private void stopGetIdleThread(){
		StopGetIdleThread = true;
	}
	//初始化线程池
	public  void init(int count){
		System.out.println("开始初始化线程池...");
		threadcount=count;
		for(int i=0;i<count;i++){
			WorkThread t=new WorkThread(new Integer(i));
			threadlist.add(t);
			t.start();
		}
		mainThread=new  TaskMonitorThread(pool);
		mainThread.start();
		timeThread=new TaskTimeOutThread(pool);
		timeThread.start();
		System.out.println("结束初始化线程池...");
	}
	//获取空闲线程
	public  WorkThread getIdleThread(){
			while(true){
				if (StopGetIdleThread) return null;
				synchronized(threadlist){
					for(int i=0;i<threadlist.size();i++){
						WorkThread t=(WorkThread)threadlist.get(i);
						if (t.getMyState().equals(WorkThread.IDlESTATE)){
							return t;
						}
					}
				}
				try {
					Thread.sleep(GetIdleThreadPollTime);//放弃CPU,若干时间后重新获取空闲线程
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
	}

	//任务运行,注册监测
	public  void beginTaskRun(BeginTaskEvent begin){
		timeThread.beginTaskRun(begin);
	}
	//任务结束,注销监视
	public  void endTaskRun(EndTaskEvent end){
		timeThread.endTaskRun(end);
	}
	
	//从工作线程表中移除线程
	public  void removeWorkThread(WorkThread t){
		threadlist.remove(t);
	}
	//添加新的线程
	public void addWorkThread(){
		synchronized(threadlist){
			WorkThread t=new WorkThread(new Integer(++threadcount));
			threadlist.add(t);
			t.start();
		}
	}
	
	//关闭线程池
	public  void close(){
		//停止获取空闲线程
		stopGetIdleThread();
		//关闭任务监测线程,不再接收请求
		mainThread.kill();
		//关闭超时监测线程
		timeThread.kill();
		//关闭工作线程,不再处理任务
		for(int i=0;i<threadlist.size();i++){
			WorkThread t=(WorkThread)threadlist.get(i);
			t.kill();
		}
	}
	
	
}

package thread;

import task.WorkTask;
import event.BeginTaskEvent;
import event.EndTaskEvent;
/**
 * 工作线程类,功能如下:
 * 1、执行业务方法,业务参数可动态设置
 * 2、自身状态可设置、可获取
 * 3、自我唤醒功能
 * 4、自杀功能
 */
public final class WorkThread extends Thread{
	private boolean shutdown=false;
	private String info; //业务参数
	private Object threadKey;//线程标识
	private Object lock=new Object();//锁对象
	private String state; //线程状态
	private int waitExecFinishPollTime=500;//关闭线程时的轮询等待时间,可配置
	public static final String CREATESTATE="1";//创建状态
	public static final String RUNSTATE="2";   //运行状态
	public static final String IDlESTATE="3";  //空闲状态
    private WorkTask nowTask; //当前任务

	//获取线程标识key
	public Object getThreadKey() {
		return threadKey;
	}      
    //设置线程的任务
    public void setWorkTask(WorkTask task){
    	this.nowTask=task;
    }
	//设置是否关闭线程
	private void setShutdown(boolean shutdown) {
		this.shutdown = shutdown;
	}
	//设置线程状态
	private void setMyState(String state){
		this.state=state;
	}
	//获取线程状态
	public String getMyState(){
		return state;
	}
	public WorkThread(Object key){
		System.out.println("正在创建工作线程...线程编号"+key.toString());
		this.threadKey=key;
		this.state=CREATESTATE;
	}
	
	@Override
	public synchronized void start() {
		// TODO Auto-generated method stub
		super.start();
		setMyState(RUNSTATE);
	}
	public void run(){
		while(true){
			try {
				setMyState(IDlESTATE);
				synchronized(this){
					wait(); /*开始等待,直至被激活*/
				}
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			if (shutdown) break;
			try{
				new BeginTaskEvent(this,Thread.currentThread(),nowTask).execute();
				nowTask.execute();//执行业务
				new EndTaskEvent(this,Thread.currentThread(),nowTask).execute();
			}catch(Exception e){
				new EndTaskEvent(this,Thread.currentThread(),nowTask).execute();
				System.out.println(e.getMessage());
			}
		}
	}
	//重新激活线程
	public void activate(){
		synchronized(this){
			setMyState(RUNSTATE);
			notify();
		}
	}
	//关闭线程
	public void kill(){
		synchronized(this){
		    if (this.getMyState().equals(IDlESTATE)){//如果线程处于空闲状态,则直接关掉
				System.out.println("正在关闭工作线程...线程编号"+threadKey.toString());
		    	this.setShutdown(true);
		    	this.activate();
		    }else if (this.getMyState().equals(RUNSTATE)){//如果线程处于运行状态,则执行完后再关掉
				System.out.println("正在等待线程执行业务完成...工作线程编号"+threadKey.toString());
		    	while(this.getMyState().equals(RUNSTATE)){
		    		try {
						Thread.sleep(waitExecFinishPollTime);//放弃CPU,若干时间后再检查线程状态
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
		    	}
				System.out.println("正在关闭工作线程...线程编号"+threadKey.toString());
		    	this.setShutdown(true);
		    	this.activate();
		    }
		}
	}
}


package thread;

import task.TaskManager;
import task.WorkTask;
/**
 * 任务检测线程类
 * 1、自杀功能 
 */
public final class TaskMonitorThread extends Thread {
	private ThreadPool threadPool;
	private int GetWorkTaskPollTime=10;//监测任务轮询时间,可配置
	private boolean shutdown=false; 
	public TaskMonitorThread(ThreadPool pool){
		System.out.println("正在创建任务监测线程...");
		this.threadPool=pool;
	}
	private void setShutDown(boolean b){
		this.shutdown=b;
	}
	@Override
	public void run() {
		// TODO Auto-generated method stub
			while(true){
				if (shutdown) break;
				WorkTask task=TaskManager.getWorkTask();//看是否有任务请求
				if (task==null){
					try {
						Thread.sleep(GetWorkTaskPollTime);
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}else{
					WorkThread t=threadPool.getIdleThread();//获取空闲线程
					if (t==null) break;
					t.setWorkTask(task);//设置线程任务
					task.setTaskThreadKey(t.getThreadKey());//为了显示任务当前线程
					t.activate();//激活空闲线程
					try {
						Thread.sleep(GetWorkTaskPollTime); 
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
			}
	}
	//关闭线程
	public  void kill(){
		System.out.println("正在关闭任务监测线程...");
		this.setShutDown(true);
	}
	
}

package thread;

import java.util.Vector;

import event.BeginTaskEvent;
import event.EndTaskEvent;
import event.TaskRunTime;
import event.TaskTimeOutEvent;
/**
 * 任务超时监测线程类
 * 1、任务开始注册
 * 2、任务完成注销
 * 3、自杀功能 
 */
public class TaskTimeOutThread extends Thread {
	private ThreadPool pool;
	private boolean shutdown=false;
	private Vector<TaskRunTime> taskruntimelist=new Vector<TaskRunTime>();//运行任务列表
	private int pollTime=500; //轮询时间
	private int TaskOutTime=2000; //任务过时时间
	public TaskTimeOutThread(ThreadPool pool){
		this.pool=pool;
	}
	@Override
	public void run() {
		// TODO Auto-generated method stub
			while(!shutdown){
				synchronized(taskruntimelist){
					for(int i=0;i<taskruntimelist.size();i++){
						TaskRunTime t=(TaskRunTime) taskruntimelist.get(i);
						if (t.checkRunTimeOut(TaskOutTime)){
							taskruntimelist.remove(i);
							new TaskTimeOutEvent(t.getEvent()).execute();
							break;
						}
					}
				}
				try {
					sleep(pollTime);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
	}
	
	//任务运行,开始监测
	public  void beginTaskRun(BeginTaskEvent begin){
		taskruntimelist.add(new TaskRunTime(begin));
	}
	//任务正常结束
	public  void endTaskRun(EndTaskEvent end){
		synchronized(taskruntimelist){
			for(int i=0;i<taskruntimelist.size();i++){
				BeginTaskEvent begin=((TaskRunTime) taskruntimelist.get(i)).getEvent();
				if (begin.equals(end)){
					taskruntimelist.remove(i);
					break;
				}
			}
		}
	}

	
	//自杀
	public void kill(){
		System.out.println("正在关闭超时监测线程...");
		while(taskruntimelist.size()>0){
			try {
				Thread.sleep(pollTime);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		shutdown=true;
	}
	
}

package task;

import java.util.ArrayList;
import java.util.List;
/**
 *  任务管理器
 *  1、添加任务
 *  2、监测是否有新任务
 */
public class TaskManager {
	private  static List taskQueue=new ArrayList<WorkTask>(); //任务队列 
	private TaskManager(){
		
	}
	//添加任务                                            
	public synchronized static  void addTask(WorkTask task){
		taskQueue.add(task);
	}
	//判断是否有任务未执行
	public synchronized static WorkTask getWorkTask(){
		if (taskQueue.size()>0){
			return (WorkTask)taskQueue.remove(0);
		}else
			return null;
	}
}

package task;
/**
 * 任务接口 
 * 继承它来定义自己具体的工作任务
 */
public interface WorkTask {
	void execute() throws Exception; //执行工作任务
	void setTaskThreadKey(Object key);//设置任务线程编号
}

package task;
/**
 * 任务类1
 * 正常执行的工作任务 
 */
public class WorkTaskImp implements WorkTask {
	protected String param;
	protected Object threadkey; //为了显示执行线程编号
	protected final int TaskExecTime=500; //任务执行时间
	public void execute() throws Exception {
		// TODO Auto-generated method stub
		System.out.println(param+"工作线程编号"+threadkey.toString());
	    Thread.sleep(TaskExecTime);
	}
	public WorkTaskImp(String param){
		this.param=param;
	}
	public void setTaskThreadKey(Object key){
		this.threadkey=key;
	}
	public String toString(){
		return param+"工作线程编号"+threadkey.toString();
	}
}

package task;
/**
 * 任务类2
 * 执行报异常的工作任务 
 */
public class WorkTaskAImp extends WorkTaskImp{
	public WorkTaskAImp(String param) {
		super(param);
		// TODO Auto-generated constructor stub
	}
	public void execute() throws Exception {
		// TODO Auto-generated method stub
		throw new Exception("运行WorkTaskAImp任务时出错");
	}

}

package task;
/*
 * 任务类3
 * 执行超时的工作任务
 */
public class WorkTaskBImp extends WorkTaskImp{

	public WorkTaskBImp(String param) {
		super(param);
		// TODO Auto-generated constructor stub
	}

	public void execute() throws Exception {
		// TODO Auto-generated method stub
		System.out.println("正在"+param);
	    Thread.sleep(50000); //随便定义
	}

}

package event;

import task.WorkTask;
import thread.WorkThread;
/*
 *任务抽象事件 
 */
public abstract class AbstractEvent {
	protected WorkThread workthread;
	protected Thread nowthread;
	protected WorkTask nowtask;
	//事件触发
	public synchronized void execute(){};
	@Override
	public boolean equals(Object obj) {
		// TODO Auto-generated method stub
		AbstractEvent other=(AbstractEvent)obj;
		return this.workthread==other.workthread&&this.nowtask==this.nowtask;
	};
	
}

package event;

import task.WorkTask;
import thread.ThreadPool;
import thread.WorkThread;
/*
 * 任务开始运行事件
 */
public class BeginTaskEvent extends AbstractEvent{
	public BeginTaskEvent(WorkThread workthread,Thread nowthread,WorkTask task){
		this.workthread=workthread;
		this.nowthread=nowthread;
		this.nowtask=task;
	}
	@Override
	public  void execute() {
		// TODO Auto-generated method stub
		ThreadPool pool=ThreadPool.getInstance();
		pool.beginTaskRun(this);
	}
}

package event;

import task.WorkTask;
import thread.ThreadPool;
import thread.WorkThread;
/*
 * 任务运行结束事件
 */
public class EndTaskEvent extends AbstractEvent {
	public EndTaskEvent(WorkThread workthread,Thread nowthread,WorkTask task){
		this.workthread=workthread;
		this.nowthread=nowthread;
		this.nowtask=task;
	}

	@Override
	public  void execute() {
		// TODO Auto-generated method stub
		ThreadPool pool=ThreadPool.getInstance();
		pool.endTaskRun(this);
	}
	
}

package event;
/*
 * 任务运行时间类
 */
public class TaskRunTime {
		private long begintime;
		private long endtime;
		private BeginTaskEvent event;
		public TaskRunTime(BeginTaskEvent event){
			this.event=event;
			this.begintime=System.currentTimeMillis();
			this.endtime=this.begintime;
		}
		public BeginTaskEvent getEvent() {
			return event;
		}
		//检查是否超时
		public boolean checkRunTimeOut(long maxtime){
			endtime=System.currentTimeMillis();
			long cha=endtime-begintime;
			return cha>=maxtime;
		}
		
	}

package event;

import task.WorkTask;
import thread.ThreadPool;
import thread.WorkThread;
/*
 * 任务超时事件
 */
public class TaskTimeOutEvent  {
	private AbstractEvent event;
	public TaskTimeOutEvent(AbstractEvent event){
		this.event=event;
	}


	@SuppressWarnings("deprecation")
	public  void execute() {
		// TODO Auto-generated method stub
		ThreadPool pool=ThreadPool.getInstance();
		pool.addWorkThread();
		pool.removeWorkThread(event.workthread);
		Object obj=event.workthread.getThreadKey();
		System.out.println("正在停止工作超时线程...线程编号"+obj);
		event.nowthread.stop();
		
	}

}

运行结果
开始初始化线程池...
正在创建工作线程...线程编号0
正在创建工作线程...线程编号1
正在创建工作线程...线程编号2
正在创建工作线程...线程编号3
正在创建工作线程...线程编号4
正在创建工作线程...线程编号5
正在创建工作线程...线程编号6
正在创建工作线程...线程编号7
正在创建工作线程...线程编号8
正在创建工作线程...线程编号9
正在创建任务监测线程...
结束初始化线程池...
正在执行超时任务1...
执行第0个增加用户操作...工作线程编号1
执行第1个增加用户操作...工作线程编号2
执行第2个增加用户操作...工作线程编号3
执行第3个增加用户操作...工作线程编号4
执行第4个增加用户操作...工作线程编号5
执行第5个增加用户操作...工作线程编号6
执行第6个增加用户操作...工作线程编号7
执行第7个增加用户操作...工作线程编号8
执行第8个增加用户操作...工作线程编号9
执行第9个增加用户操作...工作线程编号1
执行第10个增加用户操作...工作线程编号2
执行第11个增加用户操作...工作线程编号3
执行第12个增加用户操作...工作线程编号4
执行第13个增加用户操作...工作线程编号5
执行第14个增加用户操作...工作线程编号6
执行第15个增加用户操作...工作线程编号7
执行第16个增加用户操作...工作线程编号8
执行第17个增加用户操作...工作线程编号9
执行第18个增加用户操作...工作线程编号1
执行第19个增加用户操作...工作线程编号2
正在执行超时任务2...
运行WorkTaskAImp任务时出错
运行WorkTaskAImp任务时出错
运行WorkTaskAImp任务时出错
运行WorkTaskAImp任务时出错
运行WorkTaskAImp任务时出错
运行WorkTaskAImp任务时出错
运行WorkTaskAImp任务时出错
运行WorkTaskAImp任务时出错
运行WorkTaskAImp任务时出错
运行WorkTaskAImp任务时出错
运行WorkTaskAImp任务时出错
运行WorkTaskAImp任务时出错
运行WorkTaskAImp任务时出错
运行WorkTaskAImp任务时出错
运行WorkTaskAImp任务时出错
运行WorkTaskAImp任务时出错
运行WorkTaskAImp任务时出错
运行WorkTaskAImp任务时出错
运行WorkTaskAImp任务时出错
运行WorkTaskAImp任务时出错
执行第0个删除用户操作...工作线程编号4
执行第1个删除用户操作...工作线程编号5
执行第2个删除用户操作...工作线程编号6
执行第3个删除用户操作...工作线程编号7
执行第4个删除用户操作...工作线程编号8
执行第5个删除用户操作...工作线程编号9
执行第6个删除用户操作...工作线程编号1
执行第7个删除用户操作...工作线程编号2
执行第8个删除用户操作...工作线程编号4
执行第9个删除用户操作...工作线程编号5
执行第10个删除用户操作...工作线程编号6
执行第11个删除用户操作...工作线程编号7
执行第12个删除用户操作...工作线程编号8
执行第13个删除用户操作...工作线程编号9
正在关闭任务监测线程...
正在关闭超时监测线程...
正在创建工作线程...线程编号11
正在停止工作超时线程...线程编号0
正在创建工作线程...线程编号12
正在停止工作超时线程...线程编号3
正在关闭工作线程...线程编号1
正在关闭工作线程...线程编号2
正在关闭工作线程...线程编号4
正在关闭工作线程...线程编号5
正在关闭工作线程...线程编号6
正在关闭工作线程...线程编号7
正在关闭工作线程...线程编号8
正在关闭工作线程...线程编号9
正在关闭工作线程...线程编号11
正在关闭工作线程...线程编号12

  • src.rar (5.4 KB)
  • 下载次数: 390
分享到:
评论
36 楼 C_J 2009-08-20  
xujunJ2EE 写道
to C-J:
要看你的status放置到什么地方了,如果你要求Pool中的对象都必须有status这个属性,那么对这些对象就只能侵入了。
或者你可以通过在Pool中放置一个proxy对象,该对象包含你的原始对象和status属性。
但总的来说,我还是感觉common pool的实现比较简单,在容器中直接放置空闲对象。我以前作对象池的思路也是类似自己维护状态,后来想想,其实有更简单的方式为什么不用呢


对,我的意思也就是 status属性有必要么? 恰好我看到2个项目都这样做...所以会提出疑问..

35 楼 sfjsffjjj 2009-08-19  
同意fjlyxx观点,我们就是要探讨不同情况下的对应的最优的线程池模型
fjlyxx兄,看了你的代码有几个问题请教一下
1、代码池的最小空闲线程个数和最大空闲线程个数由什么来决定
2、代码池为什么没有监控请求的线程,而只有监测工作线程状态和优化工作线程个数的监控线程,
   监控请求的线程也应该放在线程池中吧

希望大家可以分享自己研究的现有优秀框架和服务器的线程池方面的心得
到最后每个人都能写出一篇对线程池模型应用的总结
那就是我们结贴的时刻了



34 楼 fjlyxx 2009-08-19  
wmj2003 写道
建议多看看jdk1.5及以上版本的多线程相关的东西,线程池在jdk1.5以后已经不推荐使用了。


不同意这种看法,线程池是死的但是怎么活用它学问就大了.同一个线程池可以用的很好也可以用的很垃圾.
几种经典的线程使用模型还是很有用的.怎么找可用线程,怎么返回线程,创建线程策略,线程和业务的无缝结合,通信等等内容够喝一壶的.
33 楼 wmj2003 2009-08-19  
建议多看看jdk1.5及以上版本的多线程相关的东西,线程池在jdk1.5以后已经不推荐使用了。
32 楼 xujunJ2EE 2009-08-19  
to C-J:
要看你的status放置到什么地方了,如果你要求Pool中的对象都必须有status这个属性,那么对这些对象就只能侵入了。
或者你可以通过在Pool中放置一个proxy对象,该对象包含你的原始对象和status属性。
但总的来说,我还是感觉common pool的实现比较简单,在容器中直接放置空闲对象。我以前作对象池的思路也是类似自己维护状态,后来想想,其实有更简单的方式为什么不用呢
31 楼 C_J 2009-08-19  
xujunJ2EE 写道
LZ的容器放置的是所有的Thread对象,给每个Thread对象一个状态,在getIdleThread()的时候通过遍历Thread的状态来决定是给空闲对象还是等待,这样其实不好。common pool是这样设计的:容器中只保存空闲对象。这样只要判断容器的size就可以知道是否有空闲对象了,当然common pool是不等待的,它在判断如果没有空闲对象的话就直接调用factory的makeObject方法new出一个新对象。LZ可以自己加上wait处理。获得对象后容器remove()该对象。
在return的时候,就直接把对象add()容器中,这样是不是更简单呢 哈哈


1,同意,我感觉pool最原初的设计就应该是这样,它应该不涉及到业务逻辑或者说对象属性,borrow的时候返回的是一个原始的对象,只是这个对象早已经被构造了.
在returnObject()和borrowObject()的时候 应该事先对 对象 进行一些处理或者说成员变量的修正.


2,如果status来标识actived or sleeping,我感觉是不是也有它的好处?
30 楼 xujunJ2EE 2009-08-19  
LZ的容器放置的是所有的Thread对象,给每个Thread对象一个状态,在getIdleThread()的时候通过遍历Thread的状态来决定是给空闲对象还是等待,这样其实不好。common pool是这样设计的:容器中只保存空闲对象。这样只要判断容器的size就可以知道是否有空闲对象了,当然common pool是不等待的,它在判断如果没有空闲对象的话就直接调用factory的makeObject方法new出一个新对象。LZ可以自己加上wait处理。获得对象后容器remove()该对象。
在return的时候,就直接把对象add()容器中,这样是不是更简单呢 哈哈
29 楼 lcllcl987 2009-08-19  
LZ想挑战Doug Lea
28 楼 aswang 2009-08-18  
wahahah
public synchronized void init(int count){   
        System.out.println("开始初始化线程池...");   
        this.threadCount=count;   
        for(int i=0;i<count;i++){   
            WorkThread t=new WorkThread(new Integer(i));   
            threadlist.add(t);   
            t.start();   
        }  
27 楼 aswang 2009-08-18  
public class ThreadPool {   
    private int threadCount; //线程总个数   
    private int GetIdleThreadPollTime=50;//获取空闲线程轮询间隔时间,可配置   
    private static ThreadPool pool=new ThreadPool();//线程实例     
    private List threadlist=new ArrayList();//工作线程列表   
    private TaskMonitorThread mainThread;//任务监测线程   
    private boolean StopGetIdleThread=false;   
    //单例模式   
    private ThreadPool(){   
    }      
    public static  ThreadPool getInstance(){   
        return pool;   
    }   
}


这段代码很好 !
26 楼 lnaigg 2009-08-18  
没事不要搞这种底层组件的山寨轮子,真的。
25 楼 C_J 2009-08-17  

需要的时候去pool中拿thread对象,做完业务后,再放回pool...
如果线程数达到maxSleeping阀值,让业务等待..
我觉得没必要在init的时候就让thread start
24 楼 fjlyxx 2009-08-17  
sfjsffjjj 写道
1、

从上图可以看出:线程池的工作线程总数应该等于业务平均完成时间内的请求总数
2、

从上图可以看出:随着请求频率的变大,线程池的工作线程总数也越来越大
3、

综合分析:1、请求频率超高的情况,通过增加工作线程数是满足不了,因为每个系统都
有个最优线程总数,超过这个线程总数,多线程就没多大意义了。
2、请求频率超高的情况,只能降低请求频率,依据公式【请求频率=业务平均完成时间
内的请求个数/业务平均完成时间】,我们只能降低业务平均完成时间,估计得通过硬件
来解决吧





LZ 我觉得你可以转变一种思路 不要任务/请求 去驱动线程 哪里有一个请求就一个线程的说法
我觉得可以反过来  用线程去做任务  如果你有空闲的线程 那么就让这些线程去跑业务 如果没有 任务就放到队列里面 让它等着 直到它超时 或被调用.
23 楼 benswallow 2009-08-17  
LZ,还在用1.4.换成1.5吧,直接Executor类
22 楼 sfjsffjjj 2009-08-17  
1、

从上图可以看出:线程池的工作线程总数应该等于业务平均完成时间内的请求总数
2、

从上图可以看出:随着请求频率的变大,线程池的工作线程总数也越来越大
3、

综合分析:1、请求频率超高的情况,通过增加工作线程数是满足不了,因为每个系统都
有个最优线程总数,超过这个线程总数,多线程就没多大意义了。
2、请求频率超高的情况,只能降低请求频率,依据公式【请求频率=业务平均完成时间
内的请求个数/业务平均完成时间】,我们只能降低业务平均完成时间,估计得通过硬件
来解决吧



21 楼 niveko 2009-08-17  
建议楼主先去看看1.5里面Doug Lea先生写的ThreadPoolExecutor类的源码,然后再决定自己是否应该再写一个线程池。你现在虽然写的很容易,却有可能会给后来的人维护造成很大的困难!
20 楼 C_J 2009-08-17  
commmons pool的接口定义大致分两块,如下:

/*
 * @author Rodney Waldhoff
 * @author Sandy McArthur
 * @version $Revision: 777748 $ $Date: 2009-05-23 08:00:44 +0800 $
 * @since Pool 1.0
 */

// pool本身接口
public abstract class BaseKeyedObjectPool implements KeyedObjectPool {
    public abstract Object borrowObject(Object key) throws Exception;
    public abstract void returnObject(Object key, Object obj) throws Exception;
    public abstract void invalidateObject(Object key, Object obj) throws Exception;
    public void addObject(Object key) throws Exception, UnsupportedOperationException {
        throw new UnsupportedOperationException();
    }
    public int getNumIdle(Object key) throws UnsupportedOperationException {
        return -1;
    }
    public int getNumActive(Object key) throws UnsupportedOperationException {
        return -1;
    }  
    public int getNumIdle() throws UnsupportedOperationException {
        return -1;
    }
    public int getNumActive() throws UnsupportedOperationException {
        return -1;
    }
    public void clear() throws Exception, UnsupportedOperationException {
        throw new UnsupportedOperationException();
    }   
    public void clear(Object key) throws Exception, UnsupportedOperationException {
        throw new UnsupportedOperationException();
    }   
    public void close() throws Exception {
        closed = true;
    }   
    public void setFactory(KeyedPoolableObjectFactory factory) throws IllegalStateException, UnsupportedOperationException {
        throw new UnsupportedOperationException();
    }   
    protected final boolean isClosed() {
        return closed;
    }   
    protected final void assertOpen() throws IllegalStateException {
        if(isClosed()) {
            throw new IllegalStateException("Pool not open");
        }
    }
    private volatile boolean closed = false;
}


/*
 * @author Rodney Waldhoff
 * @version $Revision: 791907 $ $Date: 2009-07-08 00:56:33 +0800 $
 * @since Pool 1.0
 */

// pool factory接口
public abstract class BaseKeyedPoolableObjectFactory implements KeyedPoolableObjectFactory {
    /**
     * Create an instance that can be served by the pool.
     *
     * @param key the key used when constructing the object
     * @return an instance that can be served by the pool
     */
    public abstract Object makeObject(Object key)
        throws Exception;

    /**
     * Destroy an instance no longer needed by the pool.
     * <p>
     * The default implementation is a no-op.
     * </p>
     *
     * @param key the key used when selecting the instance
     * @param obj the instance to be destroyed
     */
    public void destroyObject(Object key, Object obj)
        throws Exception {
    }

    /**
     * Ensures that the instance is safe to be returned by the pool.
     * <p>
     * The default implementation always returns <tt>true</tt>.
     * </p>
     *
     * @param key the key used when selecting the object
     * @param obj the instance to be validated
     * @return always <code>true</code> in the default implementation
     */ 
    public boolean validateObject(Object key, Object obj) {
        return true;
    }

    /**
     * Reinitialize an instance to be returned by the pool.
     * <p>
     * The default implementation is a no-op.
     * </p>
     *
     * @param key the key used when selecting the object
     * @param obj the instance to be activated
     */
    public void activateObject(Object key, Object obj)
        throws Exception {
    }

    /**
     * Uninitialize an instance to be returned to the idle object pool.
     * <p>
     * The default implementation is a no-op.
     * </p>
     *
     * @param key the key used when selecting the object
     * @param obj the instance to be passivated
     */
    public void passivateObject(Object key, Object obj)
        throws Exception {
    }
}


对factory看得不是很明白....
19 楼 windywany 2009-08-17  
用Excutor不是很好?线程这东西,能少尽量少.
18 楼 luoyahu 2009-08-17  
jdk 1.5不是就有吗?何必再写一个。。。。
17 楼 C_J 2009-08-16  
稍微研究了下commons的代码.
commons的对象池比LZ的要强悍有如下几个方面(不全,望高手补充):
1,pool的每个进出object都进行类型检查
2,提供了一个线程安全的版本.
3,对当前active object和sleeping object的控制
4,对object的获取和释放的时间控制
5,容错处理

其他...
// 以下是commons项目中一个最简单的pool(对象是用Stack存储的),还有其他复杂的存储结构,对象数量控制和时间控制的pool现在还没看懂,呵呵,功力不够啊~ 先分享下晚上的学习成果吧(对这里的factory角色还没理解到位,只是做点简单阅读,还要向大家学习).


package java.org.apache.commons.pool.impl;

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Stack;
import java.org.apache.commons.pool.BaseObjectPool;
import java.org.apache.commons.pool.ObjectPool;
import java.org.apache.commons.pool.PoolableObjectFactory;
/**
 *
 * @author Rodney Waldhoff  //三位作者
 * @author Dirk Verbeeck
 * @author Sandy McArthur
 * @version $Revision: 777748 $ $Date: 2009-05-23 08:00:44 +0800 $ //时间还比较新
 * @since Pool 1.0
 */
public class StackObjectPool extends BaseObjectPool implements ObjectPool {
  /** The default cap on the number of "sleeping" instances in the pool. */
    protected static final int DEFAULT_MAX_SLEEPING  = 8; // 池中最少保留对象数
    protected static final int DEFAULT_INIT_SLEEPING_CAPACITY = 4; // 初始化池中对象数

    /** My pool. */
    protected Stack _pool = null; // 用Stack存储池中对象

    /** My {@link PoolableObjectFactory}. */
    protected PoolableObjectFactory _factory = null; // 工厂对象

    /** The cap on the number of "sleeping" instances in the pool. */
    protected int _maxSleeping = DEFAULT_MAX_SLEEPING;

    /** Number of object borrowed but not yet returned to the pool. */
    protected int _numActive = 0; //当前对象活动数
    // 构造方法1
    public StackObjectPool() {
        this((PoolableObjectFactory)null,DEFAULT_MAX_SLEEPING,DEFAULT_INIT_SLEEPING_CAPACITY);
    }

    // 构造方法2
    public StackObjectPool(int maxIdle) {
        this((PoolableObjectFactory)null,maxIdle,DEFAULT_INIT_SLEEPING_CAPACITY);
    }

    // 构造方法3
    public StackObjectPool(int maxIdle, int initIdleCapacity) {
        this((PoolableObjectFactory)null,maxIdle,initIdleCapacity);
    }

    // 构造方法4
    public StackObjectPool(PoolableObjectFactory factory) {
        this(factory,DEFAULT_MAX_SLEEPING,DEFAULT_INIT_SLEEPING_CAPACITY);
    }

    // 构造方法5
    public StackObjectPool(PoolableObjectFactory factory, int maxIdle) {
        this(factory,maxIdle,DEFAULT_INIT_SLEEPING_CAPACITY);
    }

    /**
     * Create a new <tt>SimpleObjectPool</tt> using
     * the specified <i>factory</i> to create new instances,
     * capping the number of "sleeping" instances to <i>max</i>,
     * and initially allocating a container capable of containing
     * at least <i>init</i> instances.
     *
     * @param factory the {@link PoolableObjectFactory} used to populate the pool
     * @param maxIdle cap on the number of "sleeping" instances in the pool
     * @param initIdleCapacity initial size of the pool (this specifies the size of the container,
     *             it does not cause the pool to be pre-populated.)
     */
    public StackObjectPool(PoolableObjectFactory factory, int maxIdle, int initIdleCapacity) {
        _factory = factory;
        _maxSleeping = (maxIdle < 0 ? DEFAULT_MAX_SLEEPING : maxIdle);
        int initcapacity = (initIdleCapacity < 1 ? DEFAULT_INIT_SLEEPING_CAPACITY : initIdleCapacity);
        _pool = new Stack();
        _pool.ensureCapacity( initcapacity > _maxSleeping ? _maxSleeping : initcapacity); // at least the number of components
    }


     /**
     * Create an object, and place it into the pool.
     * addObject() is useful for "pre-loading" a pool with idle objects.
     * @throws Exception when the {@link #_factory} has a problem creating an object.
     */
    public synchronized void addObject() throws Exception {
        assertOpen(); // 检查pool是否关闭
        if (_factory == null) {
            throw new IllegalStateException("Cannot add objects without a factory.");
        }
        Object obj = _factory.makeObject(); // 创建一个可以被pool服务的对象

        boolean success = true;
        if(!_factory.validateObject(obj)) { // 进行新对象的类型检查
            success = false; 
        } else {
            _factory.passivateObject(obj); // Uninitialize an instance to be returned to the idle object pool.
        }

        boolean shouldDestroy = !success;

        if (success) {
            Object toBeDestroyed = null;
            if(_pool.size() >= _maxSleeping) { //  如果STATCK的大小大于pool的阀值,释放Stack中一个对象
                shouldDestroy = true;
                toBeDestroyed = _pool.remove(0); // remove the stalest object
	
            }
            _pool.push(obj);
            obj = toBeDestroyed; // swap returned obj with the stalest one so it can be destroyed
        }
        notifyAll(); // _numIdle has changed
	

	// 把以下代码写到这里的原因是 _factory可能是为null的,可支持无工厂对象池的创建
        if(shouldDestroy) { // by constructor, shouldDestroy is false when _factory is null
            try {
                _factory.destroyObject(obj); // 这里看得不是很明白
            } catch(Exception e) {
                // ignored
            }
        }
    }
    
    // 从对象池获取对象
    public synchronized Object borrowObject() throws Exception {
        assertOpen();
        Object obj = null;
        boolean newlyCreated = false;
        while (null == obj) {
            if (!_pool.empty()) {
                obj = _pool.pop();
            } else {
                if(null == _factory) {
                    throw new NoSuchElementException();
                } else {
                    obj = _factory.makeObject(); // 创建一个可以被pool服务的对象
                    newlyCreated = true;
                  if (obj == null) {
                    throw new NoSuchElementException("PoolableObjectFactory.makeObject() returned null.");
                  }
                }
            }
            if (null != _factory && null != obj) {
                try {
                    _factory.activateObject(obj); // 重新配置当前对象,准备给上层系统应用
                    if (!_factory.validateObject(obj)) { // 校验类型
                        throw new Exception("ValidateObject failed");
                    }
                } catch (Throwable t) {
                    try {
                        _factory.destroyObject(obj);
                    } catch (Throwable t2) {
                        // swallowed
                    } finally {
                        obj = null;
                    }
                    if (newlyCreated) {
                        throw new NoSuchElementException(
                            "Could not create a validated object, cause: " +
                            t.getMessage());
                    }
                }
            }
        }
        _numActive++; // 活动数量+1
        return obj;
    }
    
    // 返回一个对象给pool
    public synchronized void returnObject(Object obj) throws Exception {
        boolean success = !isClosed();
        if(null != _factory) { 
            if(!_factory.validateObject(obj)) { // 类型校验
                success = false;
            } else {
                try {
                    _factory.passivateObject(obj); // 卸载上层系统的对象,对象准备返回pool
                } catch(Exception e) {
                    success = false;
                }
            }
        }

        boolean shouldDestroy = !success;

        _numActive--;
        if (success) {
            Object toBeDestroyed = null;
            if(_pool.size() >= _maxSleeping) {
                shouldDestroy = true;
                toBeDestroyed = _pool.remove(0); // remove the stalest object
            }
            _pool.push(obj);
            obj = toBeDestroyed; // swap returned obj with the stalest one so it can be destroyed
        }
        notifyAll(); // _numActive has changed

        if(shouldDestroy) { // by constructor, shouldDestroy is false when _factory is null
            try {
                _factory.destroyObject(obj);
            } catch(Exception e) {
                // ignored
            }
        }
    }

    public synchronized void invalidateObject(Object obj) throws Exception {
        _numActive--;
        if (null != _factory) {
            _factory.destroyObject(obj); // 校验类型
        }
        notifyAll(); // _numActive has changed
    }

    /**
     * Return the number of instances
     * currently idle in this pool.
     *
     * @return the number of instances currently idle in this pool
     */
    public synchronized int getNumIdle() {
        return _pool.size();
    }

    /**
     * Return the number of instances currently borrowed from this pool.
     *
     * @return the number of instances currently borrowed from this pool
     */
    public synchronized int getNumActive() {
        return _numActive;
    }

    /**
     * Clears any objects sitting idle in the pool.
     */
    public synchronized void clear() {
        if(null != _factory) {
            Iterator it = _pool.iterator();
            while(it.hasNext()) {
                try {
                    _factory.destroyObject(it.next());
                } catch(Exception e) {
                    // ignore error, keep destroying the rest
                }
            }
        }
        _pool.clear();
    }

    /**
     * Close this pool, and free any resources associated with it.
     * <p>
     * Calling {@link #addObject} or {@link #borrowObject} after invoking
     * this method on a pool will cause them to throw an
     * {@link IllegalStateException}.
     * </p>
     *
     * @throws Exception <strong>deprecated</strong>: implementations should silently fail if not all resources can be freed.
     */
    public void close() throws Exception {
        super.close();
        clear();
    }

   
    /**
     * Sets the {@link PoolableObjectFactory factory} this pool uses
     * to create new instances. Trying to change
     * the <code>factory</code> while there are borrowed objects will
     * throw an {@link IllegalStateException}.
     *
     * @param factory the {@link PoolableObjectFactory} used to create new instances.
     * @throws IllegalStateException when the factory cannot be set at this time
     */
    public synchronized void setFactory(PoolableObjectFactory factory) throws IllegalStateException {
        assertOpen();
        if(0 < getNumActive()) {
            throw new IllegalStateException("Objects are already active");
        } else {
            clear();
            _factory = factory;
        }
    }
}



相关推荐

Global site tag (gtag.js) - Google Analytics