`
ispring
  • 浏览: 359420 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

线程池的实现

    博客分类:
  • Java
阅读更多
设计目标
     提供一个线程池的组件,具有良好的伸缩性,当线程够用时,销毁不用线程,当线程不够用时,自动增加线程数量;
     提供一个工作任务接口和工作队列,实际所需要的任务都必须实现这个工作任务接口,然后放入工作队列中;
     线程池中的线程从工作队列中,自动取得工作任务,执行任务。
主要控制类和功能接口设计

线程池管理器 ThreadPoolManager 的功能:
     管理线程池中的各个属性变量
     最大工作线程数
     最小工作线程数
     激活的工作线程总数
     睡眠的工作线程总数
     工作线程总数 (即:激活的工作线程总数+睡眠的工作线程总数)
    创建工作线程
     销毁工作线程
     启动处于睡眠的工作线程
     睡眠处于激活的工作线程
     缩任务:当工作线程总数小于或等于最小工作线程数时,销毁多余的睡眠的工作线程,使得现有工作线程总数等于最小工作任务总数
     伸任务:当任务队列任务总数大于工作线程数时,增加工作线程总数至最大工作线程数
     提供线程池启动接口
     提供线程池销毁接口

工作线程 WorkThread  的功能:
     从工作队列取得工作任务
     执行工作任务接口中的指定任务
工作任务接口 ITask   的功能:
     提供指定任务动作

工作队列 IWorkQueue  的功能:
     提供获取任务接口,并删除工作队列中的任务;
     提供加入任务接口;
     提供删除任务接口;
     提供取得任务总数接口;
     提供自动填任务接口;(当任务总数少于或等于默认总数的25%时,自动装填)
     提供删除所有任务接口;

[ ThreadPoolManager ]
package test.thread.pool1;
import java.util.ArrayList;
import java.util.List;
import test.thread.pool1.impl.MyWorkQueue;

/**
 * <p>Title: 线程池管理器</p>
 * <p>Description: </p>
 * @version 1.0
 */

public class ThreadPoolManager {
    /*最大线程数*/
    private int threads_max_num;

    /*最小线程数*/
    private int threads_min_num;
  
    /* 线程池线程增长步长 */
    private int threads_increase_step = 5;

    /* 任务工作队列 */
    private IWorkQueue queue;
  
    /* 线程池监视狗 */
    private PoolWatchDog poolWatchDog ;
  
    /* 队列线程 */
    private Thread queueThread ;
  
    /* 线程池 封装所有工作线程的数据结构 */
    private List pool = new ArrayList();
  
    /* 线程池中 封装所有钝化后的数据结构*/
    private List passivePool = new ArrayList();
  
    /* 空闲60秒 */
    private static final long IDLE_TIMEOUT = 60000L;
  
    /* 关闭连接池标志位 */
    private boolean close = false;
  
    /**
     * 线程池管理器
     * @param queue 任务队列
     * @param threads_min_num 工作线程最小数
     * @param threads_max_num 工作线程最大数
     */
    public ThreadPoolManager(int threads_max_num
                          ,int threads_min_num
                          ,IWorkQueue queue){
        this.threads_max_num = threads_max_num;
        this.threads_min_num = threads_min_num;
        this.queue = queue;    
    }

    /**
     * 线程池启动
     */
    public void startPool(){
        System.out.println("=== startPool..........");
        poolWatchDog = new PoolWatchDog("PoolWatchDog");
        poolWatchDog.setDaemon(true);
        poolWatchDog.start();
        System.out.println("=== startPool..........over");
  }

    /**
     * 线程池销毁接口
     */
    public void destoryPool(){
        System.out.println("==========================DestoryPool starting ...");
        this.close = true;
        int pool_size = this.pool.size();
    
        //中断队列线程
         System.out.println("===Interrupt queue thread ... ");
        queueThread.interrupt();
        queueThread = null;
    
        System.out.println("===Interrupt thread pool ... ");
        Thread pool_thread = null;
        for(int i=0; i<pool_size; i++){
            pool_thread = (Thread)pool.get(i);
            if(pool_thread !=null 
              && pool_thread.isAlive() 
              && !pool_thread.isInterrupted()){
                  pool_thread.interrupt();
                  System.out.println("Stop pool_thread:"
                          +pool_thread.getName()+"[interrupt] "
                          +pool_thread.isInterrupted());
            }
        }//end for
    
        if(pool != null){
            pool.clear();
        }
        if(passivePool != null){
            pool.clear();
        }
    
        try{
            System.out.println("=== poolWatchDog.join() starting ...");
            poolWatchDog.join();
            System.out.println("=== poolWatchDog.join() is over ...");
        }
        catch(Throwable ex){
            System.out.println("###poolWatchDog ... join method throw a exception ... " + ex.toString());
        }
    
        poolWatchDog =null;
            System.out.println("==============================DestoryPool is over ...");    
    }
  
  
    public static void main(String[] args) throws Exception{
        ThreadPoolManager threadPoolManager1 = new ThreadPoolManager(10,5,new MyWorkQueue(50,30000));
    
        threadPoolManager1.startPool();
        Thread.sleep(60000);
        threadPoolManager1.destoryPool();
    }



[ PoolWatchDog ]
  /**
   * 线程池监视狗
   */
  private class PoolWatchDog extends Thread{
    public PoolWatchDog(String name){
      super(name);
    }
  
    public void run(){
      Thread workThread = null;
      Runnable run = null;
      
      //开启任务队列线程,获取数据--------
      System.out.println("===QueueThread starting ... ... ");
      queueThread = new Thread(new QueueThread(),"QueueThread");
      queueThread.start();
      
      System.out.println("===Initial thread Pool ... ...");
      //初始化线程池的最小线程数,并放入池中
      for(int i=0; i<threads_min_num; i++){
        run = new WorkThread();
        workThread = new Thread(run,"WorkThread_"+System.currentTimeMillis()+i);
        workThread.start();
        if(i == threads_min_num -1){
          workThread = null;
          run = null;
        }
      }
      System.out.println("===Initial thread Pool..........over ,and get pool's size:"+pool.size());

      //线程池线程动态增加线程算法--------------
      while(!close){
      
        //等待5秒钟,等上述线程都启动----------
        synchronized(this){          
          try{
            System.out.println("===Wait the [last time] threads starting ....");
            this.wait(15000);
          }
          catch(Throwable ex){
            System.out.println("###PoolWatchDog invoking is failure ... "+ex);
          }
        }//end synchronized
          
        //开始增加线程-----------------------spread动作
        int queue_size = queue.getTaskSize();
        int temp_size = (queue_size - threads_min_num);
        
        if((temp_size > 0) && (temp_size/threads_increase_step > 2) ){
          System.out.println("================Spread thread pool starting ....");
          for(int i=0; i<threads_increase_step && (pool.size() < threads_max_num); i++){
            System.out.println("=== Spread thread num : "+i);
            run = new WorkThread();
            workThread = new Thread(run,"WorkThread_"+System.currentTimeMillis()+i);
            workThread.start();
          }//end for
          
          workThread = null;
          run = null;    
          System.out.println("===Spread thread pool is over .... and pool size:"+pool.size());
        }//end if
          
        //删除已经多余的睡眠线程-------------shrink动作
        int more_sleep_size = pool.size() - threads_min_num;//最多能删除的线程数
        int sleep_threads_size = passivePool.size();
        if(more_sleep_size >0 && sleep_threads_size >0){
          System.out.println("================Shrink thread pool starting ....");        
          for(int i=0; i < more_sleep_size && i < sleep_threads_size ; i++){
            System.out.println("=== Shrink thread num : "+i);
            Thread removeThread = (Thread)passivePool.get(0);
            if(removeThread != null && removeThread.isAlive() && !removeThread.isInterrupted()){
              removeThread.interrupt();
            }
          }
          System.out.println("===Shrink thread pool is over .... and pool size:"+pool.size());          
        }

        System.out.println("===End one return [shrink - spread operator] ....");    
      }//end while
    }//end run 
  }//end private class



[ WorkThread ]
  /**
   * 工作线程
   */
  class WorkThread implements Runnable{
  
    public WorkThread(){
    }
  
    public void run(){
      String name = Thread.currentThread().getName();
      System.out.println("===Thread.currentThread():"+name);
      pool.add(Thread.currentThread());    
    
      while(true){
      
        //获取任务---------
        ITask task = null;
        try{
          System.out.println("===Get task from queue is starting ... ");
          //看线程是否被中断,如果被中断停止执行任务----
          if(Thread.currentThread().isInterrupted()){
            System.out.println("===Breaking current thread and jump whlie [1] ... ");
            break;
          }
          task = queue.getTask();
        }
        catch(Throwable ex){
          System.out.println("###No task in queue:"+ex);
        }//end tryc
        
        if(task != null){
          //执行任务---------
          try{
            System.out.println("===Execute the task is starting ... ");
            //看线程是否被中断,如果被中断停止执行任务----
            if(Thread.currentThread().isInterrupted()){
              System.out.println("===Breaking current thread and jump whlie [1] ... ");
              break;
            }     
            task.executeTask();
            //任务执行完毕-------
            System.out.println("===Execute the task is over ... ");
          }
          catch(Throwable ex){
            System.out.println("###Execute the task is failure ... "+ex);
          }//end tryc
          
        }else{
          //没有任务,则钝化线程至规定时间--------
          synchronized(this){
            try{
              System.out.println("===Passivate into passivePool ... ");
              
              //看线程是否被中断,如果被中断停止执行任务----
              boolean isInterrupted = Thread.currentThread().isInterrupted();
              if(isInterrupted){
                System.out.println("===Breaking current thread and jump whlie [1] ... ");
                break;
              }
//              passivePool.add(this);
            passivePool.add(Thread.currentThread());

              
              //准备睡眠线程-------
              isInterrupted = Thread.currentThread().isInterrupted();
              if(isInterrupted){
                System.out.println("===Breaking current thread and jump whlie [2] ... ");
                break;
              }              
              this.wait(IDLE_TIMEOUT);
            }
            catch(Throwable ex1){
              System.out.println("###Current Thread passivate is failure ... break while cycle. "+ex1);
              break;
            }
          }          
        }        
      }//end while--------
      
      if(pool.contains(passivePool)){
        pool.remove(this);
      }
      if(passivePool.contains(passivePool)){
        passivePool.remove(this);
      }
      System.out.println("===The thread execute over ... "); 
    }//end run----------
  }



[ QueueThread ]
class QueueThread implements Runnable{
  
    public QueueThread(){
    }
  
    public void run(){
      while(true){
        //自动装在任务--------
        queue.autoAddTask();
        System.out.println("===The size of queue's task is "+queue.getTaskSize());
      
        synchronized(this){
          if(Thread.currentThread().isInterrupted()){
            break;
          }else{
              try{
                this.wait(queue.getLoadDataPollingTime());
              }
              catch(Throwable ex){
                System.out.println("===QueueThread invoked wait is failure ... break while cycle."+ex);
                break;
              }
          }//end if
        }//end synchr
        
      }//end while
    }//end run
  } 
}


[ WorkQueue ]
package test.thread.pool1;

import java.util.LinkedList;
import test.thread.pool1.impl.MyTask;

/**
 * <p>Title: 工作队列对象 </p>
 * <p>Description: </p>
 * <p>Copyright: Copyright (c) 2005</p>
 * <p>Company: </p>
 * @author not attributable
 * @version 1.0
 */

public abstract class WorkQueue implements IWorkQueue{
  /* 预计装载量 */
  private int load_size;
  
  /* 数据装载轮循时间 */
  private long load_polling_time;
  
  /* 队列 */
  private LinkedList queue = new LinkedList();
  
  /**
   * 
   * @param load_size 预计装载量
   * @param load_polling_time 数据装载轮循时间
   */
  public WorkQueue(int load_size,long load_polling_time){
    this.load_size = (load_size <= 10) ? 10 : load_size;
    this.load_polling_time = load_polling_time;
  }

  /* 数据装载轮循时间 */
  public long getLoadDataPollingTime(){
    return this.load_polling_time;
  }


  /*获取任务,并删除队列中的任务*/
  public synchronized ITask getTask(){
    ITask task = (ITask)queue.getFirst();
    queue.removeFirst();
    return task;
  }

  /*加入任务*/
  public void  addTask(ITask task){
    queue.addLast(task);
  }

  /*删除任务*/
  public synchronized void removeTask(ITask task){
    queue.remove(task);
  }

  /*任务总数*/
  public synchronized int getTaskSize(){
    return queue.size();
  }

  /*自动装填任务*/
  public synchronized void autoAddTask(){
  
    synchronized(this){
      float load_size_auto = load_size - getTaskSize() / load_size;
      System.out.println("===load_size_auto:"+load_size_auto);
      
      if(load_size_auto > 0.25){        
        autoAddTask0();
      }
      else {
        System.out.println("=== Not must load new work queue ... Now! ");
      }    
    }
  }

  /*删除所有任务*/
  public synchronized void clearAllTask(){
    queue.clear();
  }
  
  /**
   * 程序员自己实现该方法
   */
  protected abstract void autoAddTask0();
}



[ MyWorkQueue ]
package test.thread.pool1.impl;

import java.util.LinkedList;
import test.thread.pool1.WorkQueue;

/**
 * <p>Title: 例子工作队列对象 </p>
 * <p>Description: </p>
 * <p>Copyright: Copyright (c) 2005</p>
 * <p>Company: </p>
 * @author not attributable
 * @version 1.0
 */

public class MyWorkQueue extends WorkQueue{

  /**
   * @param load_size 预计装载量
   * @param load_polling_time 数据装载轮循时间
   */
  public MyWorkQueue(int load_size,long load_polling_time){
    super(load_size,load_polling_time);
  }

  /**
   * 自动加载任务
   */
  protected synchronized void autoAddTask0(){
    //-------------------
    System.out.println("===MyWorkQueue ...  invoked autoAddTask0() method ...");
    for(int i=0; i<10; i++){
      System.out.println("===add task :"+i);
      this.addTask(new MyTask());
    }    
    //-------------------
  }
}




[ MyTask ]
package test.thread.pool1.impl;
import test.thread.pool1.ITask;

/**
 * <p>Title: 工作任务接口 </p>
 * <p>Description: </p>
 * <p>Copyright: Copyright (c) 2005</p>
 * <p>Company: </p>
 * @author not attributable
 * @version 1.0
 */

public class MyTask implements ITask {

  /**
   * 执行的任务
   * @throws java.lang.Throwable
   */
  public void executeTask() throws Throwable{
    System.out.println("["+this.hashCode()+"] MyTask ... invoked executeTask() method ... ");
  }
}

分享到:
评论
1 楼 hpjianhua 2010-10-08  
学习下...

相关推荐

    Windows下一个比较完美的线程池实现和示例

    Windows下一个比较完美的线程池实现和示例 本线程池提供了如下功能: 1.能根据任务个数和当前线程的多少在最小/最大线程个数之间自动调整(Vista后的系统有 SetThreadpoolThreadMaximum 等函数有类似功能); 2.能方便...

    Django异步任务线程池实现原理

    Django异步任务线程池实现原理主要涉及以下几个核心知识点: 1. 异步任务执行原理: 当Django应用在处理耗时的任务时,通常会阻塞主线程,导致用户在等待处理结果时无法进行其他操作。为了解决这个问题,Django采用...

    通过EasyExcel+线程池实现百万级数据从Excel导入到数据库

    在处理大量数据导入数据库的场景中,使用...通过以上步骤,可以利用EasyExcel和线程池实现百万级数据从Excel导入到数据库的功能。这种方式可以提高数据处理的效率,减少内存占用,并且能够更好地利用多核CPU的优势。

    简单C++线程池实现

    总结来说,这个简单的C++线程池实现是一个学习多线程和并发编程的好起点。它通过封装线程管理和任务调度,为开发者提供了一种更高效、更可控的方式来处理并发任务。在实际应用中,线程池可以被扩展以适应更复杂的...

    Linux C系统编程:使用线程池实现cp命令

    总结起来,Linux C系统编程中使用线程池实现类似`cp`命令的功能,是一个涉及多线程编程、任务调度和同步控制的综合实践。通过这样的实现,我们可以提高文件复制操作的并发性和效率,同时降低系统资源的消耗。在深入...

    c++线程池实现原理分析

    ### C++线程池实现原理分析 #### 一、引言 线程池是一种软件设计模式,用于管理和控制大量线程的创建与销毁,尤其是在处理大量短期任务时,它可以显著提高程序性能。线程池的核心思想是预先创建一组线程,并让它们...

    java 线程池实现多并发队列后进先出

    在"java 线程池实现多并发队列后进先出"这个主题中,我们关注的是线程池如何利用特定类型的队列来实现后进先出(LIFO,Last-In-First-Out)的行为。通常,线程池默认使用先进先出(FIFO,First-In-First-Out)的队列...

    Android下用线程池实现Http下载程序

    5. **异步下载**:通过线程池实现的下载是异步的,这意味着主线程不会被阻塞,用户界面仍然可以保持流畅。这是在Android中进行网络操作时必须遵循的原则,因为网络操作在主线程上执行会导致ANR(应用无响应)错误。 ...

    Python的线程池实现

    在本篇文章中,我们将深入探讨Python中的线程池实现,并参考提供的`ThreadPool.py`源码进行分析。 首先,Python标准库提供了一个名为`concurrent.futures`的模块,其中包含`ThreadPoolExecutor`类,它是实现线程池...

    基于win32的C++线程池实现

    以下将详细讲解基于Win32的C++线程池实现的关键概念和技术。 首先,我们需要理解Win32 API中的线程池接口。Windows提供了CreateThreadpool、SetThreadpoolCallbackPool、QueueUserWorkItem等函数来创建和管理线程池...

    使用线程池实现的Http上传下载实现

    前段时间发布了《Windows下一个比较完美的线程池实现和示例》(http://download.csdn.net/detail/fishjam/5106672),根据下载量和评论来看,对大家还比较有用。 现在发布一个利用该线程池实现的Http上传下载实现,...

    Java版线程池实现

    不过,上述代码展示的是一个自定义的线程池实现,它可能没有使用Java标准库中的`ExecutorService`。 这个自定义线程池的实现包括以下几个关键组件: 1. **线程池参数**: - `reserve`:保留线程数,这些线程不...

    基于UNIX C语言的一种线程池实现.pdf

    "基于UNIX C语言的一种线程池实现" 一、标题解释 "基于UNIX C语言的一种线程池实现",这篇文章介绍了在UNIX操作系统下使用C语言实现的一个线程池方案。线程池是一种常用的设计模式,能够提高系统的性能和可扩展性。...

    c++ 线程池实现及安全队列

    下面是一个简化的线程池实现的伪代码: ```cpp class ThreadPool { public: void enqueue(std::function()&gt; task); // 提交任务 ~ThreadPool(); // 关闭线程池 private: std::vector&lt;std::thread&gt; workers; // ...

    基于win32的C++线程池实现(改进版)

    在"基于win32的C++线程池实现(改进版)"中,开发者已经针对上一版本的问题进行了修复,如崩溃和内存泄漏。这些问题是多线程编程中常见的挑战,崩溃可能是由于线程间的同步问题或者资源管理不当,而内存泄漏则可能导致...

    一个linux下的socket线程池实现

    本项目涉及的核心知识点是“Linux下的socket线程池实现”,这涉及到多个技术层面,包括socket编程、多线程技术和线程池的管理。 首先,让我们了解什么是Socket。Socket是网络通信的基本接口,它允许应用程序通过...

    Tomcat线程池实现简介

    Tomcat提供了两种线程池实现,一种是基于Apache Portable Runtime (APR)的Pool技术,另一种是纯Java实现的ThreadPool。本文主要探讨后者,即Java实现的线程池。 Java实现的线程池位于`tomcat-util.jar`中,初始化时...

    基于现代C++的高效线程池实现源码+项目文档说明(兼容Linux、macOS 和 Windows系统).zip

    基于现代C++的高效线程池实现源码+项目文档说明(兼容Linux、macOS 和 Windows系统).zip 【说明】 【1】项目代码完整且功能都验证ok,确保稳定可靠运行后才上传。欢迎下载使用!在使用过程中,如有问题或建议,请及时...

    线程池实现

    线程池实现

    socket 线程池实现(已经在项目中应用)

    本篇文章将深入讲解如何在Socket服务端实现线程池,以及其在实际项目中的应用。 首先,我们来看`SocketServer.java`。这个文件通常包含了服务器端的主逻辑,其中包括了服务器的启动、监听客户端连接和创建线程池来...

Global site tag (gtag.js) - Google Analytics