`

Java实现通用线程池

    博客分类:
  • java
阅读更多
线程池通俗的描述就是预先创建若干空闲线程,等到需要用多线程去处理事务的时候去唤醒某些空闲线程执行处理任务,这样就省去了频繁创建线程的时间,因为频繁创建线程是要耗费大量的CPU资源的。如果一个应用程序需要频繁地处理大量并发事务,不断的创建销毁线程往往会大大地降低系统的效率,这时候线程池就派上用场了。
      本文旨在使用Java语言编写一个通用的线程池。当需要使用线程池处理事务时,只需按照指定规范封装好事务处理对象,然后用已有的线程池对象去自动选择空闲线程自动调用事务处理对象即可。并实现线程池的动态修改(修改当前线程数,最大线程数等)。下面是实现代码:
//ThreadTask .java


package polarman.threadpool;

/** *//**
 * 线程任务
 * @author ryang
 * 2006-8-8
 */
public interface ThreadTask ...{
    public void run();
}




//PooledThread.java

package polarman.threadpool;

import java.util.Collection;
import java.util.Vector;

/** *//**
 * 接受线程池管理的线程
 * @author ryang
 * 2006-8-8
 */
public class PooledThread extends Thread ...{
   
    protected Vector tasks = new Vector();
    protected boolean running = false;
    protected boolean stopped = false;
    protected boolean paused = false;
    protected boolean killed = false;
    private ThreadPool pool;
   
    public PooledThread(ThreadPool pool)...{
        this.pool = pool;
    }
   
    public void putTask(ThreadTask task)...{
        tasks.add(task);
    }
   
    public void putTasks(ThreadTask[] tasks)...{
        for(int i=0; i<tasks.length; i++)
            this.tasks.add(tasks[i]);
    }
   
    public void putTasks(Collection tasks)...{
        this.tasks.addAll(tasks);
    }
   
    protected ThreadTask popTask()...{
        if(tasks.size() > 0)
            return (ThreadTask)tasks.remove(0);
        else
            return null;
    }
   
    public boolean isRunning()...{
        return running;
    }
   
    public void stopTasks()...{
        stopped = true;
    }
   
    public void stopTasksSync()...{
        stopTasks();
        while(isRunning())...{
            try ...{
                sleep(5);
            } catch (InterruptedException e) ...{
            }
        }
    }
   
    public void pauseTasks()...{
        paused = true;
    }
   
    public void pauseTasksSync()...{
        pauseTasks();
        while(isRunning())...{
            try ...{
                sleep(5);
            } catch (InterruptedException e) ...{
            }
        }
    }
   
    public void kill()...{
        if(!running)
            interrupt();
        else
            killed = true;
    }
   
    public void killSync()...{
        kill();
        while(isAlive())...{
            try ...{
                sleep(5);
            } catch (InterruptedException e) ...{
            }
        }
    }
   
    public synchronized void startTasks()...{
        running = true;
        this.notify();
    }
   
    public synchronized void run()...{
        try...{
            while(true)...{
                if(!running || tasks.size() == 0)...{
                    pool.notifyForIdleThread();
                    //System.out.println(Thread.currentThread().getId() + ": 空闲");
                    this.wait();
                }else...{
                    ThreadTask task;
                    while((task = popTask()) != null)...{
                        task.run();
                        if(stopped)...{
                            stopped = false;
                            if(tasks.size() > 0)...{
                                tasks.clear();
                                System.out.println(Thread.currentThread().getId() + ": Tasks are stopped");
                                break;
                            }
                        }
                        if(paused)...{
                            paused = false;
                            if(tasks.size() > 0)...{
                                System.out.println(Thread.currentThread().getId() + ": Tasks are paused");
                                break;
                            }
                        }
                    }
                    running = false;
                }

                if(killed)...{
                    killed = false;
                    break;
                }
            }
        }catch(InterruptedException e)...{
            return;
        }
       
        //System.out.println(Thread.currentThread().getId() + ": Killed");
    }
}


//ThreadPool.java

package polarman.threadpool;

import java.util.Collection;
import java.util.Iterator;
import java.util.Vector;

/** *//**
 * 线程池
 * @author ryang
 * 2006-8-8
 */
public class ThreadPool ...{
   
    protected int maxPoolSize;
    protected int initPoolSize;
    protected Vector threads = new Vector();
    protected boolean initialized = false;
    protected boolean hasIdleThread = false;
   
    public ThreadPool(int maxPoolSize, int initPoolSize)...{
        this.maxPoolSize = maxPoolSize;
        this.initPoolSize = initPoolSize;
    }
   
    public void init()...{
        initialized = true;
        for(int i=0; i<initPoolSize; i++)...{
            PooledThread thread = new PooledThread(this);
            thread.start();
            threads.add(thread);
        }
       
        //System.out.println("线程池初始化结束,线程数=" + threads.size() + " 最大线程数=" + maxPoolSize);
    }
   
    public void setMaxPoolSize(int maxPoolSize)...{
        //System.out.println("重设最大线程数,最大线程数=" + maxPoolSize);
        this.maxPoolSize = maxPoolSize;
        if(maxPoolSize < getPoolSize())
            setPoolSize(maxPoolSize);
    }
   
    /** *//**
     * 重设当前线程数
     * 若需杀掉某线程,线程不会立刻杀掉,而会等到线程中的事务处理完成
     * 但此方法会立刻从线程池中移除该线程,不会等待事务处理结束
     * @param size
     */
    public void setPoolSize(int size)...{
        if(!initialized)...{
            initPoolSize = size;
            return;
        }else if(size > getPoolSize())...{
            for(int i=getPoolSize(); i<size && i<maxPoolSize; i++)...{
                PooledThread thread = new PooledThread(this);
                thread.start();
                threads.add(thread);
            }
        }else if(size < getPoolSize())...{
            while(getPoolSize() > size)...{
                PooledThread th = (PooledThread)threads.remove(0);
                th.kill();
            }
        }
       
        //System.out.println("重设线程数,线程数=" + threads.size());
    }
   
    public int getPoolSize()...{
        return threads.size();
    }
   
    protected void notifyForIdleThread()...{
        hasIdleThread = true;
    }
   
    protected boolean waitForIdleThread()...{
        hasIdleThread = false;
        while(!hasIdleThread && getPoolSize() >= maxPoolSize)...{
            try ...{
                Thread.sleep(5);
            } catch (InterruptedException e) ...{
                return false;
            }
        }
       
        return true;
    }
   
    public synchronized PooledThread getIdleThread()...{
        while(true)...{
            for(Iterator itr=threads.iterator(); itr.hasNext();)...{
                PooledThread th = (PooledThread)itr.next();
                if(!th.isRunning())
                    return th;
            }
           
            if(getPoolSize() < maxPoolSize)...{
                PooledThread thread = new PooledThread(this);
                thread.start();
                threads.add(thread);
                return thread;
            }
           
            //System.out.println("线程池已满,等待...");
            if(waitForIdleThread() == false)
                return null;
        }
    }
   
    public void processTask(ThreadTask task)...{
        PooledThread th = getIdleThread();
        if(th != null)...{
            th.putTask(task);
            th.startTasks();
        }
    }
   
    public void processTasksInSingleThread(ThreadTask[] tasks)...{
        PooledThread th = getIdleThread();
        if(th != null)...{
            th.putTasks(tasks);
            th.startTasks();
        }
    }
   
    public void processTasksInSingleThread(Collection tasks)...{
        PooledThread th = getIdleThread();
        if(th != null)...{
            th.putTasks(tasks);
            th.startTasks();
        }
    }
}


下面是线程池的测试程序
//ThreadPoolTest.java

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import polarman.threadpool.ThreadPool;
import polarman.threadpool.ThreadTask;

public class ThreadPoolTest ...{

    public static void main(String[] args) ...{
        System.out.println(""quit" 退出");
        System.out.println(""task A 10" 启动任务A,时长为10秒");
        System.out.println(""size 2" 设置当前线程池大小为2");
        System.out.println(""max 3" 设置线程池最大线程数为3");
        System.out.println();
        
        final ThreadPool pool = new ThreadPool(3, 2);
        pool.init();
        
        Thread cmdThread = new Thread()...{
            public void run()...{
                
                BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
                
                while(true)...{
                    try ...{
                        String line = reader.readLine();
                        String words[] = line.split(" ");
                        if(words[0].equalsIgnoreCase("quit"))...{
                            System.exit(0);
                        }else if(words[0].equalsIgnoreCase("size") && words.length >= 2)...{
                            try...{
                                int size = Integer.parseInt(words[1]);
                                pool.setPoolSize(size);
                            }catch(Exception e)...{
                            }
                        }else if(words[0].equalsIgnoreCase("max") && words.length >= 2)...{
                            try...{
                                int max = Integer.parseInt(words[1]);
                                pool.setMaxPoolSize(max);
                            }catch(Exception e)...{
                            }
                        }else if(words[0].equalsIgnoreCase("task") && words.length >= 3)...{
                            try...{
                                int timelen = Integer.parseInt(words[2]);
                                SimpleTask task = new SimpleTask(words[1], timelen * 1000);
                                pool.processTask(task);
                            }catch(Exception e)...{
                            }
                        }
                        
                    } catch (IOException e) ...{
                        e.printStackTrace();
                    }
                }
            }
        };
        
        cmdThread.start();
        /**//*
        for(int i=0; i<10; i++){
            SimpleTask task = new SimpleTask("Task" + i, (i+10)*1000);
            pool.processTask(task);
        }*/
    }

}

class SimpleTask implements ThreadTask...{
    
    private String taskName;
    private int timeLen;
    
    public SimpleTask(String taskName, int timeLen)...{
        this.taskName = taskName;
        this.timeLen = timeLen;
    }
    
    public void run() ...{
        System.out.println(Thread.currentThread().getId() +
                ": START TASK "" + taskName + """);
        try ...{
            Thread.sleep(timeLen);
        } catch (InterruptedException e) ...{
        }
        
        System.out.println(Thread.currentThread().getId() +
                ": END TASK "" + taskName + """);
    }
    
}



使用此线程池相当简单,下面两行代码初始化线程池:

ThreadPool pool = new ThreadPool(3, 2);
pool.init();

要处理的任务实现ThreadTask...接口即可(如测试代码里的SimpleTask),这个接口只有一个方法run()
两行代码即可调用:

ThreadTask task = ... //实例化你的任务对象
pool.processTask(task);
分享到:
评论

相关推荐

    java实现通用线程池的代码

    本文旨在使用Java语言编写一个通用的线程池。当需要使用线程池处理事务时,只需按照指定规范封装好事务处理对象,然后用已有的线程池对象去自动选择空 闲线程自动调用事务处理对象即可。并实现线程池的动态修改...

    java实现通用的线程池

    java实现通用的线程池,这是我网上找的资料,O(∩_∩)O~希望大家能用的到。

    一个通用的Java线程池类

    环境:Windows XP ...这里本人翻写一个通用的线程池类,它可以用来作为工具类处理许多多线程问题。代码注释非常详尽,一行注释一行代码。 阅读对象:非常熟悉Java的基本概念,并且熟悉命令行编写代码的人员。

    一个可以直接使用的通用线程池Util

    使用步骤: 1.下载解压之后,在控制台运行javac ThreadPoolTest.java 2.然后根据提示运行java命令...这里本人翻写一个通用的线程池类,它可以用来作为工具类处理许多多线程问题。代码注释非常详尽,一行注释一行代码。

    java中通用的线程池实例代码

    Java 中通用的线程池实例代码 Java 中通用的线程池实例代码是指在 Java 编程语言中创建一个通用的线程池实例,以便于在多线程环境下高效地执行任务。下面是该线程池实例代码的详细解释: 一、线程池的概念 在 ...

    仿ACE线程池机制实现的线程池类

    线程池是一种优化资源管理的机制,通过预先创建并维护一组可重用的线程,...本项目提供的线程池实现,借鉴了ACE库的线程池机制,这表明其可能考虑了多种优化策略和最佳实践,有助于在实际项目中实现高性能的并发处理。

    一种类似JAVA线程池的C++线程池实现方法

    不过,需要注意的是,虽然这种实现借鉴了Java线程池的设计思想,但C++标准库并没有提供内置的线程池实现,因此自定义线程池需要考虑平台兼容性、线程同步、错误处理等问题。此外,对于大型项目,可能还需要考虑...

    java常用工具类封装

    Java中的线程池是由`java.util.concurrent`包中的`ExecutorService`接口及其实现类如`ThreadPoolExecutor`提供的。线程池可以有效地管理和控制并发执行的任务数量,避免频繁创建和销毁线程带来的性能开销。通过设置...

    Java并发之线程池Executor框架的深入理解

    Java中的线程池Executor框架是Java并发编程中的一种常见机制,提供了一个通用的执行器接口,用于定义执行Runnable任务的方式。通过使用线程池,可以大大减少线程的创建和销毁开销,从而提高系统的性能和稳定性。

    Java通用范例开发金典源代码

    《Java通用范例开发金典源代码》是一个包含大量Java编程示例的资源集合,旨在帮助开发者深入理解和掌握Java编程语言的各种特性和应用场景。这个压缩包文件中的源代码覆盖了Java语言的基础到高级各个方面,是Java学习...

    java实现csv导出千万级数据实例

    本实例聚焦于“java实现csv导出千万级数据实例”,旨在提供一个高效、稳定的解决方案,避免因数据量过大而导致的性能问题,如Java中的栈溢出(Stack Overflow)。CSV(Comma Separated Values)格式因其简单、通用性...

    徒手实现线程池-1

    - **ThreadPoolExecutor**:这是一个通用的线程池实现,允许自定义核心线程数、最大线程数、线程存活时间、拒绝策略等参数。它可以处理各种类型的工作负载。 - **ScheduledThreadPoolExecutor**:这是用于定时或...

    线程池使用Demo

    1. `ThreadPoolExecutor`:这是最通用的线程池实现,允许自定义工作队列、线程工厂和拒绝策略。 2. `FixedThreadPool`:固定大小的线程池,线程数量不变,多余的提交任务会被放入队列等待。 3. `...

    线程池通用包(源码版)

    本压缩包提供的"线程池通用包(源码版)"是作者自定义实现的一个线程池工具,内部包含了丰富的注释,有助于深入理解线程池的工作原理。 线程池的核心组件包括以下几个部分: 1. **工作队列(Work Queue)**:工作...

    通用多线程模块(jdk线程池的运用)

    介绍一个通用多线程服务模块。是利用jdk线程池,多线程并行处理多任务,以提高执行效率。

    java实现rpc框架

    Java提供了线程池(ExecutorService)和并发集合(如ConcurrentHashMap)等工具来管理并发。另外,还可以利用锁、信号量等同步原语控制并发访问。 **5. 注册中心** 注册中心是RPC框架中的关键组件,它负责服务提供...

    基于Java实现的代理服务器

    Java 实现的代理服务器是一种网络通信工具,它允许客户端通过该服务器作为中介与目标服务器进行交互,从而隐藏了客户端的真实身份或提供了额外的安全性。在本项目中,我们主要关注的是实现了 SOCKS4 和 SOCKS5 协议...

    java实现邮件自动发送

    在你的程序中,你可以将邮件发送功能封装成一个通用的方法,接收必要的参数(如收件人、主题、正文等),然后调用这个方法实现邮件的发送。 8. **注意点** - 安全性:确保使用安全协议,如TLS或SSL,防止密码在...

Global site tag (gtag.js) - Google Analytics