`
拖拖鞋
  • 浏览: 91997 次
  • 性别: Icon_minigender_1
社区版块
存档分类
最新评论

实现一个自己的线程池

阅读更多
        原创!转载请注明出处http://ycde2009.iteye.com/blog/2032605
        一直想写线程池,但由于工作时间紧迫,一直没能腾出时间来写,这几天跳槽,正好把它完结掉。
        因为在java这种面向对象的语言中,创建和销毁对象是很费时间的,因为每创建一个对象不仅要在堆内存中占据掉一个句柄,而且还有可能消耗掉其他的资源,虚拟机也会将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收。所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁。如何利用已有对象来服务就是一个需要解决的关键问题,其实这就是一些"池化资源"技术(或者也叫做缓存资源技术)产生的原因,线程池也就是在这种情况下被建造出来。我的想法是在一个线程池中,里面先放一些已经创建好了的线程,池子里面有一个任务队列,当有任务的时候,就让线程去执行,所有任务执行完了以后,线程就睡眠,这里用到了多线程的生产者消费者模式,而且还有一个线程池的管家,来管理线程池里面的线程。这样一想,一个能基本运行的线程池也就想好了。但是我们发现,这样想的话显得我们的线程池不够灵活,因为,它缺少了伸缩性;比如,他不能根据任务的多少来灵活的增加和减少线程数,某个任务失败后也不能重发,最关键的是任务没有优先级。这样一想的话,我们还得在原来的基础上加点东西。
        主要附加功能,可以维护线程池,在指定的空闲时间后维护,目的是:
1、可以根据任务的多少来相应的增减线程数量
2、处理的任务可以有优先级,优先级高的,最先执行
3、重发的功能
        下面我把主要的线程池和任务池的代码贴出来,废话也不多说了。代码里都有注释。需要完整的文件的话,附件里有,可以下载!
package myThreadPoolV3;

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;

import myThreadPoolV3.inter.ITask;
import myThreadPoolV3.util.DateUtil;
import myThreadPoolV3.util.ThreadPoolException;

/**  
 *   
 * MyThreadPool  
 * 线程池 
 * @author yangchuang  
 * @since 2014-3-15 上午09:45:39    
 * @version 1.0.0  
 * 修改记录
 * 修改日期    修改人    修改后版本    修改内容
 */
public class MyThreadPool {
    /**  
     * maxNumThreadSize:最多的线程数量
     * @since 1.0.0  
     */  
    private int maxNumThreadSize;
    
    /**  
     * minNumThreadSize:最少的线程数量
     * @since 1.0.0  
     */  
    private int minNumThreadSize;
    
    /**  
     * keepAliveTime:线程池维护线程所允许的空闲时间 单位(秒)
     * @since 1.0.0  
     */  
    private int keepAliveTime;
    
    /**  
     * taskPool:任务池
     * 按照优先级来进行存储
     * @since 1.0.0  
     */  
    private TaskPool<ITask> taskPool=new TaskPool<ITask>();
    
    /**  
     * workThreadQuery:线程安全的线程队列
     */  
    private BlockingQueue<WorkThread> workThreadQuery=new LinkedBlockingQueue<WorkThread>();
    
    /**  
     * EXECUTE_TASK_LOCK:执行任务队列的时候,wait()与notiyfAll()的监视对象
     * @since 1.0.0  
     */  
    private static final String EXECUTE_TASK_LOCK=new String("executeTaskLock");
    
    /**  
     * threadFreeSet:空闲线程的ID存放在此
     * @since 1.0.0  
     */  
    private ConcurrentMap<String,String> threadFreeMap=new ConcurrentHashMap<String,String>();
    
    /**  
     * maintainTimer:定时维护线程池的
     * @since 1.0.0  
     */  
    private Timer maintainTimer;
    
      
    /**  
     * 创建一个新的实例 MyThreadPool.  
     *      最多的线程数量=7
     *      最少的线程数量=3
     *      线程池维护线程所允许的空闲时间 单位(秒)=30(秒)
     */
    public MyThreadPool(){
        this.maxNumThreadSize=7;
        this.minNumThreadSize=3;
        this.keepAliveTime=30*DateUtil.DATE_SECOND;
        loadWorkThreadQuery();
        maintainTimer=new Timer();
    }
    
      
    /**  
     * 创建一个新的实例 MyThreadPool.  
     *  
     * @param maxNumThreadSize        最多的线程数量
     * @param minNumThreadSize        最少的线程数量
     * @param keepAliveTime         线程池维护线程所允许的空闲时间 单位(秒)
     * @throws ThreadPoolException  
     */
    public MyThreadPool(int maxNumThreadSize,int minNumThreadSize,int keepAliveTime) throws ThreadPoolException{
        if(minNumThreadSize<1){
            throw new ThreadPoolException("线程池中不能没有线程");
        }
        if(maxNumThreadSize<minNumThreadSize){
            throw new ThreadPoolException("最多的线程数不能少于最少的线程数");
        }
        if(keepAliveTime<1){
            throw new ThreadPoolException("线程池维护线程所允许的空闲时间不能少于1秒");
        }
        this.maxNumThreadSize=maxNumThreadSize;
        this.minNumThreadSize=minNumThreadSize;
        this.keepAliveTime=keepAliveTime*DateUtil.DATE_SECOND;
        loadWorkThreadQuery();
        maintainTimer=new Timer();
    }
    
    private void loadWorkThreadQuery(){
        for(int i=0;i<minNumThreadSize;i++){
            new WorkThread();
        }
    }
    
    private class WorkThread extends Thread{
        private String ID;
        private boolean isWorking;
        private boolean isDied;
        public WorkThread(){
            ID=""+hashCode();
            workThreadQuery.add(this);
            addThreadFreeNum(this.getID());
            System.out.println("新线程被添加++++空闲线程数"+threadFreeMap.size()+"  总的线程数"+workThreadQuery.size()+"  未处理的任务数:"+taskPool.taskSize());
        }
        @Override
        public void run() {
            while(!isDied){
                while(taskPool.taskSize()<1&&!isDied){
                    System.out.println(Thread.currentThread().getName()+"  无任务!睡眠");
                    setStopWorking();
                    try {
                        synchronized(EXECUTE_TASK_LOCK){
                            EXECUTE_TASK_LOCK.wait();
                            System.out.println(Thread.currentThread().getName()+"  被唤醒");
                        }
                    }
                    catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                while(taskPool.taskSize()>0&&!isDied){
                    System.out.println(Thread.currentThread().getName()+"  有任务执行;任务总数:"+taskPool.taskSize());
                    setStartWorking();
                    ITask task = taskPool.removeTask();
                    try {
                        if(task!=null){
                            task.stratWork();
                        }
                    }
                    catch (Exception e) {
                        try {
                            taskPool.rejoinTask(task);
                        }
                        catch (Exception e1) {
                            e1.printStackTrace();
                        }
                        e.printStackTrace();
                    }
                }
            }
            System.out.println(Thread.currentThread().getName()+"  死亡");
        }
        public void setStartWorking() {
            this.isWorking = true;
            subtractThreadFreeNum(this.getID());
        }
        public void setStopWorking() {
            this.isWorking = false;
            addThreadFreeNum(this.getID());
        }
        public void setDied(boolean isDied) {
            this.isDied = isDied;
            workThreadQuery.remove(this);
            subtractThreadFreeNum(this.getID());
            synchronized(EXECUTE_TASK_LOCK){
                EXECUTE_TASK_LOCK.notifyAll();
            }
        }
        public String getID() {
            return ID;
        }
        public void setID(String iD) {
            ID = iD;
        }
        
    }
    
    public void execute(){
        for(WorkThread workThread:workThreadQuery){
            workThread.start();
        }
        maintainTimer.schedule(new TimerTask() {
            @Override
            public void run() {
                maintainPool();
            }
        }, 3000, keepAliveTime);
    }
    
    private void maintainPool(){
        System.out.println("开始维护线程池++++++++++++++++");
        System.out.println(this);
        // 任务数量大于空闲线程数量,并且总线程数量小于线程池的规定的最大值,就新增线程
        if(taskPool.taskSize()>this.threadFreeMap.size()&&workThreadQuery.size()<maxNumThreadSize){
            // 最多再允许创建的线程数
            int maxCreatThreadNum=maxNumThreadSize-workThreadQuery.size();
            // 需要创建的线程数
            int needCreatThreadNum=taskPool.taskSize()-this.threadFreeMap.size();
            // 实际的创建数目
            int creatThreadNum=needCreatThreadNum>maxCreatThreadNum?maxCreatThreadNum:needCreatThreadNum;
            for(int i=0;i<creatThreadNum;i++){
                WorkThread workThread=new WorkThread();
                workThread.start();
            }
        }
        // 空闲线程多于任务队列的数量,并且总的线程多余最小的线程个数,就要减少线程
        if(taskPool.taskSize()<this.threadFreeMap.size()&&workThreadQuery.size()>minNumThreadSize){
            // 允许销毁的最大线程数
            int maxDestroyThreadNum=workThreadQuery.size()-minNumThreadSize;
            // 需要销毁的线程数
            int needDestroyThreadNum=this.threadFreeMap.size()-taskPool.taskSize();
            // 实际的销毁数目
            int destroyThreadNum=needDestroyThreadNum>maxDestroyThreadNum?maxDestroyThreadNum:needDestroyThreadNum;
            for(int i=0;i<destroyThreadNum;i++){
                for(WorkThread workThread:workThreadQuery){
                    if(!workThread.isWorking){
                        workThread.setDied(true);
                        break;
                    }
                }
            }
        }
        System.out.println("维护后的信息");
        System.out.println(this);
        System.out.println("结束维护线程池++++++++++++++++");
    }
    
    public void addTask(ITask task) throws Exception{
        taskPool.addTask(task);
        synchronized(EXECUTE_TASK_LOCK){
            EXECUTE_TASK_LOCK.notifyAll();
        }
    }
    
    public int getMaxNumThreadSize() {
        return maxNumThreadSize;
    }
    public void setMaxNumThreadSize(int maxNumThreadSize) {
        this.maxNumThreadSize = maxNumThreadSize;
    }
    public int getMinNumThreadSize() {
        return minNumThreadSize;
    }
    public void setMinNumThreadSize(int minNumThreadSize) {
        this.minNumThreadSize = minNumThreadSize;
    }
    public int getKeepAliveTime() {
        return keepAliveTime;
    }
    public void setKeepAliveTime(int keepAliveTime) {
        this.keepAliveTime = keepAliveTime;
    }

    public synchronized void addThreadFreeNum(String id) {
        this.threadFreeMap.put(id,id);
    }
    public synchronized void subtractThreadFreeNum(String id) {
        this.threadFreeMap.remove(id,id);
    }
    @Override
    public String toString(){
        String showInfo="%%%%%%%%空闲的线程数"+threadFreeMap.size()+"  总的线程数"+workThreadQuery.size()+"  未处理的任务数:"+taskPool.taskSize();
        showInfo+="\n%%%%%%%%threadFreeSet:"+threadFreeMap;
        showInfo+="\n%%%%%%%%workThreadQuery:"+workThreadQuery;
        return showInfo;
    }
}

package myThreadPoolV3;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;

import myThreadPoolV3.inter.ITask;
import myThreadPoolV3.util.ThreadPoolException;

/**  
 *   
 * TaskPool  
 * 任务池
 * @author yangchuang  
 * @since 2014-3-15 上午09:53:55    
 * @version 1.0.0  
 * 修改记录
 * 修改日期    修改人    修改后版本    修改内容
 */
public class TaskPool<T extends ITask> {
    /**  
     * taskMap:线程安全的任务Map集合
     * 按照优先级来进行存储
     * @since 1.0.0  
     */  
    private ConcurrentMap<Integer,BlockingQueue<T>> taskMap;
    
    /**  
     * rejoinTaskMap:重发记录表,任务执行失败后,再次加入任务队列的时候,现在这里记录
     * @since 1.0.0  
     */  
    private ConcurrentMap<T,Integer> rejoinTaskMap;
      
    /**  
     * 创建一个新的实例 TaskPool.  
     */
    public TaskPool(){
        this.taskMap=new ConcurrentHashMap<Integer,BlockingQueue<T>>();
        this.taskMap.put(ITask.MAX_PRIORITY, new LinkedBlockingQueue<T>());
        this.taskMap.put(ITask.NORM_PRIORITY, new LinkedBlockingQueue<T>());
        this.taskMap.put(ITask.MIN_PRIORITY, new LinkedBlockingQueue<T>());
        this.rejoinTaskMap=new ConcurrentHashMap<T,Integer>();
    }
    
    /**  
     * taskSize 获取当前任务池中的任务数量
     * @return  当前任务池中的任务数量
     * @since  1.0.0  
    */
    public int taskSize(){
        return this.taskMap.get(T.MAX_PRIORITY).size()
                +this.taskMap.get(T.NORM_PRIORITY).size()
                +this.taskMap.get(T.MIN_PRIORITY).size();
    }
    
    /**  
     * addTask      添加任务
     * 任务必须设置优先级,并且优先级只能在ITask接口内的三个优先级的值之一
     *      MAX_PRIORITY:最高优先级
     *      NORM_PRIORITY:默认优先级
     *      MIN_PRIORITY:最低优先级
     * @param task  
     * @throws Exception 当该task任务的优先级不在规定的范围,抛出此异常
     * @since  1.0.0  
    */
    public void addTask(T task) throws Exception{
        if(this.taskMap.containsKey(task.getPriority())){
            this.taskMap.get(task.getPriority()).add(task);
            return;
        }
        throw new ThreadPoolException("优先级的值设置有误");
    }
    
    /**  
     * romoveTask      获取任务从优先级高的开始
     * @param task  
     * @throws Exception 
     * @since  1.0.0  
    */
    public synchronized ITask removeTask(){
        if(this.taskMap.get(T.MAX_PRIORITY).size()>0){
            return this.taskMap.get(T.MAX_PRIORITY).remove();
        }
        if(this.taskMap.get(T.NORM_PRIORITY).size()>0){
            return this.taskMap.get(T.NORM_PRIORITY).remove();
        }
        if(this.taskMap.get(T.MIN_PRIORITY).size()>0){
            return this.taskMap.get(T.MIN_PRIORITY).remove();
        }
        return null;
    }
    
    /**  
     * rejoin   当任务执行失败后,再次将他加入任务池
     * @param task
     * @throws Exception 
     * @since  1.0.0  
    */
    public synchronized void rejoinTask(T task) throws Exception{
        // 如果再次执行的Map集合没有该任务那么就添加进去
        if(!rejoinTaskMap.containsKey(task)){
            rejoinTaskMap.put(task, 1);
        }
        // 如果该任务的再次执行的次数大于他设置的值,那么就将其移除出再次执行的Map集合
        if(rejoinTaskMap.get(task)>task.getMaxAgainExecuteNum()){
            rejoinTaskMap.remove(task);
            return;
        }
        // 否则的话,将其加一,再放入任务队列
        else{
            rejoinTaskMap.put(task, rejoinTaskMap.get(task)+1);
        }
        this.taskMap.get(task.getPriority()).add(task);
    }
    
}


需要完整的文件的话,附件里有,可以下载!
欢迎大家多多交流!谢谢!
分享到:
评论

相关推荐

    一个简单线程池的实现

    当有新的任务提交时,线程池会选择一个空闲的线程来执行任务,而不是每次都需要创建新的线程,这样可以避免频繁的线程创建和销毁带来的开销。 `twork_work.cpp`和`twork_work.h`文件可能定义了一个工作单元(Work ...

    Windows下一个比较完美的线程池实现和示例

    Windows下一个比较完美的线程池实现和示例 本线程池提供了如下功能: 1.能根据任务个数和当前线程的多少在最小/最大线程个数之间自动调整(Vista后的系统有 SetThreadpoolThreadMaximum 等函数有类似功能); 2.能方便...

    这是一个使用C++实现的简易线程池.zip

    这是一个使用C++实现的简易线程池.zip这是一个使用C++实现的简易线程池.zip这是一个使用C++实现的简易线程池.zip这是一个使用C++实现的简易线程池.zip这是一个使用C++实现的简易线程池.zip这是一个使用C++实现的简易...

    线程池原理及创建(C++实现)

    4. **任务接口 (CJob)**:这是一个抽象基类,定义了一个`Run`方法,所有的任务类都需要从它继承并实现这个方法。`Run`方法包含任务的具体逻辑。 5. **任务队列**:用于存储待处理任务的数据结构。线程池中的工作线程...

    Java 自己实现线程池

    Java开发,Android开发,自己实现线程池,明白线程池的实现机制

    linux 实现一个简单的线程池及工作

    4. **Linux下实现线程池的工具和API** - **pthread库**:提供线程创建、同步等函数,如`pthread_create()`、`pthread_join()`、`pthread_mutex_t`和`pthread_cond_t`等。 - **标准C库**:如`queue.h`(链表)或...

    C++实现线程池详解(基于boost源码以及封装等线程池)

    二、实现线程池可以按照以下步骤进行 三、简单的C++线程池代码示例 四、 基于boost编写的源码库 - 线程池 4.1 基于boost编写的源码库地址 4.2 boost线程池的先进先出、后进先出、优先级代码示例 五、看看人家线程池...

    线程池的VC实现例子

    而"threadpool.rar"可能是另一个线程池的实现版本,"VC实现线程池.txt"很可能是对VC环境下线程池实现的文本说明。 通过学习和分析这些文件,我们可以更深入地理解线程池的原理,以及如何在VC环境中有效地利用线程池...

    Java实现通用线程池

    PooledThread是一个线程类,继承自Thread类,用于实现线程池中的线程;ThreadPool是一个线程池类,用于管理线程池中的线程。 ThreadTask接口定义了一个run方法,用于执行线程池任务;PooledThread类继承自Thread类...

    线程池.zip,互斥锁+条件变量+队列,实现线程池,包括线程池的创建,塞任务,和销毁线程池

    在这个项目中,"thread_pool.c"可能是实现线程池核心功能的源代码,"thread_mutex.c"涉及互斥锁的相关操作,"thread_cond.c"实现了条件变量的功能,"main.c"是程序的入口,负责初始化线程池、提交任务以及销毁线程池...

    仿ACE线程池机制实现的线程池类

    CManager维护一个任务队列,根据线程池的策略(如固定大小、动态调整等)决定何时创建新的线程,何时销毁空闲线程,以及如何将任务分发给线程池中的工作者。调度器可能还需要监控线程池的状态,防止过多的线程导致...

    使用Vector实现简单线程池

    在实际项目中,使用标准库提供的`ExecutorService`和`ThreadPoolExecutor`通常比自己实现线程池更安全、更可靠,因为它们经过了大量的测试和优化,具有丰富的功能和良好的社区支持。但是,自定义线程池的实现可以...

    线程池的简单实现

    线程池是多线程编程中的一个重要概念,它在服务器端程序中被广泛使用,以提高资源利用率和系统性能。线程池通过预先创建并管理一定数量的线程,可以有效地减少线程创建和销毁的开销,同时提供了一种灵活的线程调度和...

    C语言实现的简单线程池

    C语言实现的简单线程池

    VC++ 线程池(ThreadPool)实现

    本文将深入探讨VC++中线程池的实现,并提供一个简单的实现示例。 线程池的基本概念: 线程池是由一组工作线程组成的集合,这些线程预先创建并处于待命状态,等待执行由应用程序提交的任务。当一个任务被提交到...

    自定义实现Java线程池1-模拟jdk线程池执行流程1

    我们也将遵循这个设计,实现一个简单的线程池类`SampleThreadPoolExecutor`,它继承自`Executor`接口。 在`SampleThreadPoolExecutor`类中,我们需要关注以下几个关键属性: 1. `wc`: 工作中的线程数量 2. `...

    Qt实现线程池开发实例

    Qt是一个跨平台的C++图形用户界面应用程序开发框架,它提供了丰富的功能和工具,包括对多线程的支持。在实际应用中,线程池是一种有效的管理并发任务的方式,它可以优化系统资源的利用,提高程序的响应速度。本实例...

    简单C++线程池实现

    本文将深入探讨如何实现一个简单的线程池,并通过提供的文件名`_threadpool.cpp`, `_promain.cpp`, `_threadpool.h`来解释其核心概念和组件。 首先,`_threadpool.h`是头文件,通常包含类定义和相关函数声明。...

    C++实现的线程池

    在VC++ 2010环境下实现C++线程池,需要关注该编译器对C++11标准的支持程度,可能需要使用特定的编译选项或者第三方库如PPL(Parallel Pattern Library)来实现线程池功能。对于初学者,理解并实现这样一个线程池可以...

Global site tag (gtag.js) - Google Analytics