`
高级java工程师
  • 浏览: 412280 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

java线程池

阅读更多
Java实现通用线程池

      线程池通俗的描述就是预先创建若干空闲线程,等到需要用多线程去处理事务的时候去唤醒某些空闲线程执行处理任务,这样就省去了频繁创建线程的时间,因为频 繁创建线程是要耗费大量的CPU资源的。如果一个应用程序需要频繁地处理大量并发事务,不断的创建销毁线程往往会大大地降低系统的效率,这时候线程池就派 上用场了。
      本文旨在使用Java语言编写一个通用的线程池。当需要使用线程池处理事务时,只需按照指定规范封装好事务处理对象,然后用已有的线程池对象去自动选择空 闲线程自动调用事务处理对象即可。并实现线程池的动态修改(修改当前线程数,最大线程数等)。下面是实现代码:
//ThreadTask .java
 package polarman.threadpool;
 
  /**
  * 线程任务
  * @author ryang
  * 2006-8-8
  */
  public interface ThreadTask {
     public void run();
 }


//PooledThread.java
 package polarman.threadpool;
 
 import java.util.Collection;
 import java.util.Vector;
 
  /**
  * 接受线程池管理的线程
  * @author ryang
  * 2006-8-8
  */
  public class PooledThread extends Thread {
    
     protected Vector tasks = new Vector();
     protected boolean running = false;
     protected boolean stopped = false;
     protected boolean paused = false;
     protected boolean killed = false;
     private ThreadPool pool;
    
      public PooledThread(ThreadPool pool){
         this.pool = pool;
     }
    
      public void putTask(ThreadTask task){
         tasks.add(task);
     }
    
      public void putTasks(ThreadTask[] tasks){
         for(int i=0; i<tasks.length; i++)
             this.tasks.add(tasks[i]);
     }
    
      public void putTasks(Collection tasks){
         this.tasks.addAll(tasks);
     }
    
      protected ThreadTask popTask(){
         if(tasks.size() > 0)
             return (ThreadTask)tasks.remove(0);
         else
             return null;
     }
    
      public boolean isRunning(){
         return running;
     }
    
      public void stopTasks(){
         stopped = true;
     }
    
      public void stopTasksSync(){
         stopTasks();
          while(isRunning()){
              try {
                 sleep(5);
              } catch (InterruptedException e) {
             }
         }
     }
    
      public void pauseTasks(){
         paused = true;
     }
    
      public void pauseTasksSync(){
         pauseTasks();
          while(isRunning()){
              try {
                 sleep(5);
              } catch (InterruptedException e) {
             }
         }
     }
    
      public void kill(){
         if(!running)
             interrupt();
         else
             killed = true;
     }
    
      public void killSync(){
         kill();
          while(isAlive()){
              try {
                 sleep(5);
              } catch (InterruptedException e) {
             }
         }
     }
    
      public synchronized void startTasks(){
         running = true;
         this.notify();
     }
    
      public synchronized void run(){
          try{
              while(true){
                  if(!running || tasks.size() == 0){
                     pool.notifyForIdleThread();
                     //System.out.println(Thread.currentThread().getId() + ": 空闲");
                     this.wait();
                  }else{
                     ThreadTask task;
                      while((task = popTask()) != null){
                         task.run();
                          if(stopped){
                             stopped = false;
                              if(tasks.size() > 0){
                                 tasks.clear();
                                 System.out.println(Thread.currentThread().getId() + ": Tasks are stopped");
                                 break;
                             }
                         }
                          if(paused){
                             paused = false;
                              if(tasks.size() > 0){
                                 System.out.println(Thread.currentThread().getId() + ": Tasks are paused");
                                 break;
                             }
                         }
                     }
                     running = false;
                 }
 
                  if(killed){
                     killed = false;
                     break;
                 }
             }
          }catch(InterruptedException e){
             return;
         }
        
         //System.out.println(Thread.currentThread().getId() + ": Killed");
     }
 }


//ThreadPool.java
 package polarman.threadpool;
 
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.Vector;
 
  /**
  * 线程池
  * @author ryang
  * 2006-8-8
  */
  public class ThreadPool {
    
     protected int maxPoolSize;
     protected int initPoolSize;
     protected Vector threads = new Vector();
     protected boolean initialized = false;
     protected boolean hasIdleThread = false;
    
      public ThreadPool(int maxPoolSize, int initPoolSize){
         this.maxPoolSize = maxPoolSize;
         this.initPoolSize = initPoolSize;
     }
    
      public void init(){
         initialized = true;
          for(int i=0; i<initPoolSize; i++){
             PooledThread thread = new PooledThread(this);
             thread.start();
             threads.add(thread);
         }
        
         //System.out.println("线程池初始化结束,线程数=" + threads.size() + " 最大线程数=" + maxPoolSize);
     }
    
      public void setMaxPoolSize(int maxPoolSize){
         //System.out.println("重设最大线程数,最大线程数=" + maxPoolSize);
         this.maxPoolSize = maxPoolSize;
         if(maxPoolSize < getPoolSize())
             setPoolSize(maxPoolSize);
     }
    
      /**
      * 重设当前线程数
      * 若需杀掉某线程,线程不会立刻杀掉,而会等到线程中的事务处理完成
      * 但此方法会立刻从线程池中移除该线程,不会等待事务处理结束
      * @param size
      */
      public void setPoolSize(int size){
          if(!initialized){
             initPoolSize = size;
             return;
          }else if(size > getPoolSize()){
              for(int i=getPoolSize(); i<size && i<maxPoolSize; i++){
                 PooledThread thread = new PooledThread(this);
                 thread.start();
                 threads.add(thread);
             }
          }else if(size < getPoolSize()){
              while(getPoolSize() > size){
                 PooledThread th = (PooledThread)threads.remove(0);
                 th.kill();
             }
         }
        
         //System.out.println("重设线程数,线程数=" + threads.size());
     }
    
      public int getPoolSize(){
         return threads.size();
     }
    
      protected void notifyForIdleThread(){
         hasIdleThread = true;
     }
    
      protected boolean waitForIdleThread(){
         hasIdleThread = false;
          while(!hasIdleThread && getPoolSize() >= maxPoolSize){
              try {
                 Thread.sleep(5);
              } catch (InterruptedException e) {
                 return false;
             }
         }
        
         return true;
     }
    
      public synchronized PooledThread getIdleThread(){
          while(true){
              for(Iterator itr=threads.iterator(); itr.hasNext();){
                 PooledThread th = (PooledThread)itr.next();
                 if(!th.isRunning())
                     return th;
             }
            
              if(getPoolSize() < maxPoolSize){
                 PooledThread thread = new PooledThread(this);
                 thread.start();
                 threads.add(thread);
                 return thread;
             }
            
             //System.out.println("线程池已满,等待...");
             if(waitForIdleThread() == false)
                 return null;
         }
     }
    
      public void processTask(ThreadTask task){
         PooledThread th = getIdleThread();
          if(th != null){
             th.putTask(task);
             th.startTasks();
         }
     }
    
      public void processTasksInSingleThread(ThreadTask[] tasks){
         PooledThread th = getIdleThread();
          if(th != null){
             th.putTasks(tasks);
             th.startTasks();
         }
     }
    
      public void processTasksInSingleThread(Collection tasks){
         PooledThread th = getIdleThread();
          if(th != null){
             th.putTasks(tasks);
             th.startTasks();
         }
     }
 }
 
 

下面是线程池的测试程序
//ThreadPoolTest.java
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
 
 import polarman.threadpool.ThreadPool;
 import polarman.threadpool.ThreadTask;
 
  public class ThreadPoolTest {
 
      public static void main(String[] args) {
         System.out.println(""quit" 退出");
         System.out.println(""task A 10" 启动任务A,时长为10秒");
         System.out.println(""size 2" 设置当前线程池大小为2");
         System.out.println(""max 3" 设置线程池最大线程数为3");
         System.out.println();
         
         final ThreadPool pool = new ThreadPool(3, 2);
         pool.init();
         
          Thread cmdThread = new Thread(){
              public void run(){
                 
                 BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
                 
                  while(true){
                      try {
                         String line = reader.readLine();
                         String words[] = line.split(" ");
                          if(words[0].equalsIgnoreCase("quit")){
                             System.exit(0);
                          }else if(words[0].equalsIgnoreCase("size") && words.length >= 2){
                              try{
                                 int size = Integer.parseInt(words[1]);
                                 pool.setPoolSize(size);
                              }catch(Exception e){
                             }
                          }else if(words[0].equalsIgnoreCase("max") && words.length >= 2){
                              try{
                                 int max = Integer.parseInt(words[1]);
                                 pool.setMaxPoolSize(max);
                              }catch(Exception e){
                             }
                          }else if(words[0].equalsIgnoreCase("task") && words.length >= 3){
                              try{
                                 int timelen = Integer.parseInt(words[2]);
                                 SimpleTask task = new SimpleTask(words[1], timelen * 1000);
                                 pool.processTask(task);
                              }catch(Exception e){
                             }
                         }
                         
                      } catch (IOException e) {
                         e.printStackTrace();
                     }
                 }
             }
         };
         
         cmdThread.start();
          /*
         for(int i=0; i<10; i++){
             SimpleTask task = new SimpleTask("Task" + i, (i+10)*1000);
             pool.processTask(task);
         }*/
     }
 
 }
 
  class SimpleTask implements ThreadTask{
     
     private String taskName;
     private int timeLen;
     
      public SimpleTask(String taskName, int timeLen){
         this.taskName = taskName;
         this.timeLen = timeLen;
     }
     
      public void run() {
         System.out.println(Thread.currentThread().getId() +
                 ": START TASK "" + taskName + """);
          try {
             Thread.sleep(timeLen);
          } catch (InterruptedException e) {
         }
         
         System.out.println(Thread.currentThread().getId() +
                 ": END TASK "" + taskName + """);
     }
     
 }


使用此线程池相当简单,下面两行代码初始化线程池:

ThreadPool pool = new ThreadPool(3, 2);
pool.init();

要处理的任务实现ThreadTask接口即可(如测试代码里的SimpleTask),这个接口只有一个方法run()
两行代码即可调用:

ThreadTask task = ... //实例化你的任务对象
pool.processTask(task); 


分享到:
评论

相关推荐

    java线程池使用后到底要关闭吗

    java线程池使用后到底要关闭吗 java线程池是一种高效的并发编程技术,可以帮助开发者更好地管理线程资源,提高系统的性能和可靠性。然而,在使用java线程池时,一个常见的问题是:使用完线程池后到底要不要关闭?...

    java线程池完整代码

    "Java 线程池完整代码解析" Java 线程池是 Java 语言中的一个重要概念,它允许开发者创建和管理多个线程,以提高程序的并发性和性能。下面是对给定文件的解析,包括 title、description、标签和部分内容的解析。 ...

    java线程池封装j

    Java线程池是一种高效管理线程的技术,它允许开发者预定义一组线程,根据任务的需要灵活调度,而不是每次需要执行任务时都创建新的线程。这种设计模式大大提高了系统的性能,减少了系统资源的消耗,特别是在高并发...

    java线程池知识.ppt

    java线程池知识、

    java线程池实例详细讲解

    Java线程池是一种高效管理线程资源的工具,它能够帮助开发者有效地控制并调度线程,从而提升系统性能,减少系统资源的浪费。在Java中,`ExecutorService`接口是线程池的主要入口,它是`java.util.concurrent`包的一...

    Java线程池使用说明

    Java线程池是Java并发编程中的重要组件,它能够有效地管理和复用线程,从而提高程序的执行效率和降低资源消耗。在JDK 1.5版本之前,Java对线程池的支持非常有限,而在JDK 1.5之后,加入了java.util.concurrent包,...

    java线程池threadpool简单使用源码

    Java线程池(ThreadPool)是Java并发编程中的一个重要概念,它可以帮助我们有效地管理和控制并发执行的任务,从而提高系统的效率和稳定性。线程池通过复用已存在的线程,避免了频繁创建和销毁线程带来的开销,同时也...

    自定义实现Java线程池

    ### 自定义实现Java线程池 #### 一、概述 在深入探讨自定义Java线程池之前,我们先简要回顾一下线程池的基本概念及其重要性。线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动...

    java线程池的源码分析.zip

    Java线程池是Java并发编程中的重要组成部分,它在多线程和高并发场景下扮演着关键角色。本文将深入探讨Java线程池的源码分析,并对比不同类型的线程池,以帮助开发者更好地理解和利用这一强大的工具。 首先,我们要...

    Java 线程池.docx

    Java线程池是一种高效管理线程资源的工具,它的出现是为了应对多线程编程中频繁创建和销毁线程带来的性能开销以及资源消耗。在Java中,通过使用线程池,我们可以预先创建一定数量的线程,这些线程在空闲时可以被复用...

    Java线程池与ThreadPoolExecutor.pdf

    Java线程池是Java并发编程中的重要组成部分,它允许开发者管理多个线程并有效地调度任务。线程池通过ThreadPoolExecutor类实现,这是一个高度可配置的工具,能够根据具体需求定制线程的创建、管理和销毁策略。 ...

    Java简单线程池 线程池中文文档

    简单的线程池程序+中文文档 包结构: com.tangkai.threadpool --SimpleThread.java 工作线程 --TestThreadPool.java 程序入口 --ThreadPoolManager.java 线程池管理类

    Java 线程池的原理与实现

    Java线程池是一种高级的多线程处理框架,它是Java并发编程中非常重要的一个组件。线程池的原理和实现涉及到操作系统调度、内存管理和并发控制等多个方面。理解线程池的工作原理有助于优化程序性能,避免过度创建和...

    一个通用的Java线程池类

    2.然后根据提示运行java命令执行示例程序,观看线程池的运行结果 目标:Java中多线程技术是一个难点,但是也是一个核心技术。因为Java本身就是一个多线程语言。本人目前在给46班讲授Swing的网络编程--使用Swing来...

    java 线程池

    ### Java线程池详解 #### 一、线程与线程池的概念 在Java中,线程是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位。一个进程(例如某个Java应用)至少有一个线程,如果线程...

    java 线程池实现多并发队列后进先出

    Java线程池是一种高效管理并发任务的机制,它允许开发者预先配置一定数量的线程,以便在处理多个并发任务时能有效地复用这些线程,从而避免了频繁创建和销毁线程带来的开销。在Java中,`java.util.concurrent`包下的...

    基于Java线程池技术实现Knock Knock游戏项目.zip

    基于Java线程池技术实现Knock Knock游戏项目.zip 基于Java线程池技术实现Knock Knock游戏项目.zip 基于Java线程池技术实现Knock Knock游戏项目.zip 基于Java线程池技术实现Knock Knock游戏项目.zip 基于Java线程池...

    JAVA线程池的原理与实现.pdf

    Java线程池是一种高效利用系统资源、管理并发执行任务的机制。它的原理是通过预先创建一组线程,这些线程在任务到来时可以立即执行,而不是每次需要执行任务时都新建线程,从而降低了线程创建和销毁带来的开销。...

    Java线程池及观察者模式解决多线程意外死亡重启问题

    Java线程池是Java并发编程中的重要组成部分,它允许开发者高效地管理多个并发执行的线程,有效地控制系统的资源消耗,提高系统性能和稳定性。在Java中,`java.util.concurrent`包提供了`ExecutorService`接口及其...

    Java 线程池.pptx

    讲述了java线程池的优点,参数,6种线程池的使用场景,线程池用到的handler,线程任务的提交方式等等。

Global site tag (gtag.js) - Google Analytics