- 浏览: 360008 次
- 性别:
- 来自: 杭州
文章分类
最新评论
-
呆呆DE萌萌:
不可以吗?Timer里不是有一个指定首次运行时间firstDa ...
在Spring中使用 Java Timer 调度任务 -
accpchf:
不太明白,Error jvm都已经停止线程了,怎么还能转译?
深入探索 高效的Java异常处理框架。 -
bo_hai:
讲的详细。谢谢!
详解spring2.0的scope -
hpjianhua:
学习下...
线程池的实现 -
eltonto:
以后你可以来这里看看favicon在线转换
Tomcat中使用Favicon
设计目标
提供一个线程池的组件,具有良好的伸缩性,当线程够用时,销毁不用线程,当线程不够用时,自动增加线程数量;
提供一个工作任务接口和工作队列,实际所需要的任务都必须实现这个工作任务接口,然后放入工作队列中;
线程池中的线程从工作队列中,自动取得工作任务,执行任务。
主要控制类和功能接口设计
线程池管理器 ThreadPoolManager 的功能:
管理线程池中的各个属性变量
最大工作线程数
最小工作线程数
激活的工作线程总数
睡眠的工作线程总数
工作线程总数 (即:激活的工作线程总数+睡眠的工作线程总数)
创建工作线程
销毁工作线程
启动处于睡眠的工作线程
睡眠处于激活的工作线程
缩任务:当工作线程总数小于或等于最小工作线程数时,销毁多余的睡眠的工作线程,使得现有工作线程总数等于最小工作任务总数
伸任务:当任务队列任务总数大于工作线程数时,增加工作线程总数至最大工作线程数
提供线程池启动接口
提供线程池销毁接口
工作线程 WorkThread 的功能:
从工作队列取得工作任务
执行工作任务接口中的指定任务
工作任务接口 ITask 的功能:
提供指定任务动作
工作队列 IWorkQueue 的功能:
提供获取任务接口,并删除工作队列中的任务;
提供加入任务接口;
提供删除任务接口;
提供取得任务总数接口;
提供自动填任务接口;(当任务总数少于或等于默认总数的25%时,自动装填)
提供删除所有任务接口;
[ ThreadPoolManager ]
[ PoolWatchDog ]
[ WorkThread ]
[ QueueThread ]
[ WorkQueue ]
[ MyWorkQueue ]
[ MyTask ]
提供一个线程池的组件,具有良好的伸缩性,当线程够用时,销毁不用线程,当线程不够用时,自动增加线程数量;
提供一个工作任务接口和工作队列,实际所需要的任务都必须实现这个工作任务接口,然后放入工作队列中;
线程池中的线程从工作队列中,自动取得工作任务,执行任务。
主要控制类和功能接口设计
线程池管理器 ThreadPoolManager 的功能:
管理线程池中的各个属性变量
最大工作线程数
最小工作线程数
激活的工作线程总数
睡眠的工作线程总数
工作线程总数 (即:激活的工作线程总数+睡眠的工作线程总数)
创建工作线程
销毁工作线程
启动处于睡眠的工作线程
睡眠处于激活的工作线程
缩任务:当工作线程总数小于或等于最小工作线程数时,销毁多余的睡眠的工作线程,使得现有工作线程总数等于最小工作任务总数
伸任务:当任务队列任务总数大于工作线程数时,增加工作线程总数至最大工作线程数
提供线程池启动接口
提供线程池销毁接口
工作线程 WorkThread 的功能:
从工作队列取得工作任务
执行工作任务接口中的指定任务
工作任务接口 ITask 的功能:
提供指定任务动作
工作队列 IWorkQueue 的功能:
提供获取任务接口,并删除工作队列中的任务;
提供加入任务接口;
提供删除任务接口;
提供取得任务总数接口;
提供自动填任务接口;(当任务总数少于或等于默认总数的25%时,自动装填)
提供删除所有任务接口;
[ ThreadPoolManager ]
package test.thread.pool1; import java.util.ArrayList; import java.util.List; import test.thread.pool1.impl.MyWorkQueue; /** * <p>Title: 线程池管理器</p> * <p>Description: </p> * @version 1.0 */ public class ThreadPoolManager { /*最大线程数*/ private int threads_max_num; /*最小线程数*/ private int threads_min_num; /* 线程池线程增长步长 */ private int threads_increase_step = 5; /* 任务工作队列 */ private IWorkQueue queue; /* 线程池监视狗 */ private PoolWatchDog poolWatchDog ; /* 队列线程 */ private Thread queueThread ; /* 线程池 封装所有工作线程的数据结构 */ private List pool = new ArrayList(); /* 线程池中 封装所有钝化后的数据结构*/ private List passivePool = new ArrayList(); /* 空闲60秒 */ private static final long IDLE_TIMEOUT = 60000L; /* 关闭连接池标志位 */ private boolean close = false; /** * 线程池管理器 * @param queue 任务队列 * @param threads_min_num 工作线程最小数 * @param threads_max_num 工作线程最大数 */ public ThreadPoolManager(int threads_max_num ,int threads_min_num ,IWorkQueue queue){ this.threads_max_num = threads_max_num; this.threads_min_num = threads_min_num; this.queue = queue; } /** * 线程池启动 */ public void startPool(){ System.out.println("=== startPool.........."); poolWatchDog = new PoolWatchDog("PoolWatchDog"); poolWatchDog.setDaemon(true); poolWatchDog.start(); System.out.println("=== startPool..........over"); } /** * 线程池销毁接口 */ public void destoryPool(){ System.out.println("==========================DestoryPool starting ..."); this.close = true; int pool_size = this.pool.size(); //中断队列线程 System.out.println("===Interrupt queue thread ... "); queueThread.interrupt(); queueThread = null; System.out.println("===Interrupt thread pool ... "); Thread pool_thread = null; for(int i=0; i<pool_size; i++){ pool_thread = (Thread)pool.get(i); if(pool_thread !=null && pool_thread.isAlive() && !pool_thread.isInterrupted()){ pool_thread.interrupt(); System.out.println("Stop pool_thread:" +pool_thread.getName()+"[interrupt] " +pool_thread.isInterrupted()); } }//end for if(pool != null){ pool.clear(); } if(passivePool != null){ pool.clear(); } try{ System.out.println("=== poolWatchDog.join() starting ..."); poolWatchDog.join(); System.out.println("=== poolWatchDog.join() is over ..."); } catch(Throwable ex){ System.out.println("###poolWatchDog ... join method throw a exception ... " + ex.toString()); } poolWatchDog =null; System.out.println("==============================DestoryPool is over ..."); } public static void main(String[] args) throws Exception{ ThreadPoolManager threadPoolManager1 = new ThreadPoolManager(10,5,new MyWorkQueue(50,30000)); threadPoolManager1.startPool(); Thread.sleep(60000); threadPoolManager1.destoryPool(); }
[ PoolWatchDog ]
/** * 线程池监视狗 */ private class PoolWatchDog extends Thread{ public PoolWatchDog(String name){ super(name); } public void run(){ Thread workThread = null; Runnable run = null; //开启任务队列线程,获取数据-------- System.out.println("===QueueThread starting ... ... "); queueThread = new Thread(new QueueThread(),"QueueThread"); queueThread.start(); System.out.println("===Initial thread Pool ... ..."); //初始化线程池的最小线程数,并放入池中 for(int i=0; i<threads_min_num; i++){ run = new WorkThread(); workThread = new Thread(run,"WorkThread_"+System.currentTimeMillis()+i); workThread.start(); if(i == threads_min_num -1){ workThread = null; run = null; } } System.out.println("===Initial thread Pool..........over ,and get pool's size:"+pool.size()); //线程池线程动态增加线程算法-------------- while(!close){ //等待5秒钟,等上述线程都启动---------- synchronized(this){ try{ System.out.println("===Wait the [last time] threads starting ...."); this.wait(15000); } catch(Throwable ex){ System.out.println("###PoolWatchDog invoking is failure ... "+ex); } }//end synchronized //开始增加线程-----------------------spread动作 int queue_size = queue.getTaskSize(); int temp_size = (queue_size - threads_min_num); if((temp_size > 0) && (temp_size/threads_increase_step > 2) ){ System.out.println("================Spread thread pool starting ...."); for(int i=0; i<threads_increase_step && (pool.size() < threads_max_num); i++){ System.out.println("=== Spread thread num : "+i); run = new WorkThread(); workThread = new Thread(run,"WorkThread_"+System.currentTimeMillis()+i); workThread.start(); }//end for workThread = null; run = null; System.out.println("===Spread thread pool is over .... and pool size:"+pool.size()); }//end if //删除已经多余的睡眠线程-------------shrink动作 int more_sleep_size = pool.size() - threads_min_num;//最多能删除的线程数 int sleep_threads_size = passivePool.size(); if(more_sleep_size >0 && sleep_threads_size >0){ System.out.println("================Shrink thread pool starting ...."); for(int i=0; i < more_sleep_size && i < sleep_threads_size ; i++){ System.out.println("=== Shrink thread num : "+i); Thread removeThread = (Thread)passivePool.get(0); if(removeThread != null && removeThread.isAlive() && !removeThread.isInterrupted()){ removeThread.interrupt(); } } System.out.println("===Shrink thread pool is over .... and pool size:"+pool.size()); } System.out.println("===End one return [shrink - spread operator] ...."); }//end while }//end run }//end private class
[ WorkThread ]
/** * 工作线程 */ class WorkThread implements Runnable{ public WorkThread(){ } public void run(){ String name = Thread.currentThread().getName(); System.out.println("===Thread.currentThread():"+name); pool.add(Thread.currentThread()); while(true){ //获取任务--------- ITask task = null; try{ System.out.println("===Get task from queue is starting ... "); //看线程是否被中断,如果被中断停止执行任务---- if(Thread.currentThread().isInterrupted()){ System.out.println("===Breaking current thread and jump whlie [1] ... "); break; } task = queue.getTask(); } catch(Throwable ex){ System.out.println("###No task in queue:"+ex); }//end tryc if(task != null){ //执行任务--------- try{ System.out.println("===Execute the task is starting ... "); //看线程是否被中断,如果被中断停止执行任务---- if(Thread.currentThread().isInterrupted()){ System.out.println("===Breaking current thread and jump whlie [1] ... "); break; } task.executeTask(); //任务执行完毕------- System.out.println("===Execute the task is over ... "); } catch(Throwable ex){ System.out.println("###Execute the task is failure ... "+ex); }//end tryc }else{ //没有任务,则钝化线程至规定时间-------- synchronized(this){ try{ System.out.println("===Passivate into passivePool ... "); //看线程是否被中断,如果被中断停止执行任务---- boolean isInterrupted = Thread.currentThread().isInterrupted(); if(isInterrupted){ System.out.println("===Breaking current thread and jump whlie [1] ... "); break; } // passivePool.add(this); passivePool.add(Thread.currentThread()); //准备睡眠线程------- isInterrupted = Thread.currentThread().isInterrupted(); if(isInterrupted){ System.out.println("===Breaking current thread and jump whlie [2] ... "); break; } this.wait(IDLE_TIMEOUT); } catch(Throwable ex1){ System.out.println("###Current Thread passivate is failure ... break while cycle. "+ex1); break; } } } }//end while-------- if(pool.contains(passivePool)){ pool.remove(this); } if(passivePool.contains(passivePool)){ passivePool.remove(this); } System.out.println("===The thread execute over ... "); }//end run---------- }
[ QueueThread ]
class QueueThread implements Runnable{ public QueueThread(){ } public void run(){ while(true){ //自动装在任务-------- queue.autoAddTask(); System.out.println("===The size of queue's task is "+queue.getTaskSize()); synchronized(this){ if(Thread.currentThread().isInterrupted()){ break; }else{ try{ this.wait(queue.getLoadDataPollingTime()); } catch(Throwable ex){ System.out.println("===QueueThread invoked wait is failure ... break while cycle."+ex); break; } }//end if }//end synchr }//end while }//end run } }
[ WorkQueue ]
package test.thread.pool1; import java.util.LinkedList; import test.thread.pool1.impl.MyTask; /** * <p>Title: 工作队列对象 </p> * <p>Description: </p> * <p>Copyright: Copyright (c) 2005</p> * <p>Company: </p> * @author not attributable * @version 1.0 */ public abstract class WorkQueue implements IWorkQueue{ /* 预计装载量 */ private int load_size; /* 数据装载轮循时间 */ private long load_polling_time; /* 队列 */ private LinkedList queue = new LinkedList(); /** * * @param load_size 预计装载量 * @param load_polling_time 数据装载轮循时间 */ public WorkQueue(int load_size,long load_polling_time){ this.load_size = (load_size <= 10) ? 10 : load_size; this.load_polling_time = load_polling_time; } /* 数据装载轮循时间 */ public long getLoadDataPollingTime(){ return this.load_polling_time; } /*获取任务,并删除队列中的任务*/ public synchronized ITask getTask(){ ITask task = (ITask)queue.getFirst(); queue.removeFirst(); return task; } /*加入任务*/ public void addTask(ITask task){ queue.addLast(task); } /*删除任务*/ public synchronized void removeTask(ITask task){ queue.remove(task); } /*任务总数*/ public synchronized int getTaskSize(){ return queue.size(); } /*自动装填任务*/ public synchronized void autoAddTask(){ synchronized(this){ float load_size_auto = load_size - getTaskSize() / load_size; System.out.println("===load_size_auto:"+load_size_auto); if(load_size_auto > 0.25){ autoAddTask0(); } else { System.out.println("=== Not must load new work queue ... Now! "); } } } /*删除所有任务*/ public synchronized void clearAllTask(){ queue.clear(); } /** * 程序员自己实现该方法 */ protected abstract void autoAddTask0(); }
[ MyWorkQueue ]
package test.thread.pool1.impl; import java.util.LinkedList; import test.thread.pool1.WorkQueue; /** * <p>Title: 例子工作队列对象 </p> * <p>Description: </p> * <p>Copyright: Copyright (c) 2005</p> * <p>Company: </p> * @author not attributable * @version 1.0 */ public class MyWorkQueue extends WorkQueue{ /** * @param load_size 预计装载量 * @param load_polling_time 数据装载轮循时间 */ public MyWorkQueue(int load_size,long load_polling_time){ super(load_size,load_polling_time); } /** * 自动加载任务 */ protected synchronized void autoAddTask0(){ //------------------- System.out.println("===MyWorkQueue ... invoked autoAddTask0() method ..."); for(int i=0; i<10; i++){ System.out.println("===add task :"+i); this.addTask(new MyTask()); } //------------------- } }
[ MyTask ]
package test.thread.pool1.impl; import test.thread.pool1.ITask; /** * <p>Title: 工作任务接口 </p> * <p>Description: </p> * <p>Copyright: Copyright (c) 2005</p> * <p>Company: </p> * @author not attributable * @version 1.0 */ public class MyTask implements ITask { /** * 执行的任务 * @throws java.lang.Throwable */ public void executeTask() throws Throwable{ System.out.println("["+this.hashCode()+"] MyTask ... invoked executeTask() method ... "); } }
发表评论
-
Apache+tomcat集群
2009-04-24 22:18 3777一、集群和负载均衡的概念 (一)集群的概念 ... -
OOM和JVM配置优化
2009-04-23 10:01 1706OOM这个缩写就是Jav ... -
简单线程池的实现
2008-09-29 15:17 1465最近看了下JAVA线程相关的资料,顺便写了个自己的线程池的实现 ... -
位运算常用操作总结
2008-08-02 12:13 2002位运算应用口诀 清零取反要用与,某位置一可用或 若要取反和交换 ... -
深入探索 高效的Java异常处理框架。
2008-08-01 11:14 12226摘要:本文从Java异常最基本的概念、语法开始讲述了Java异 ... -
利用Java实现串口全双工通讯
2008-07-16 00:53 2253一个嵌入式系统通常需要通过串口与其主控系统进行全双工通讯,譬如 ... -
使用JNative对条码打印机进行打印
2008-07-16 00:50 2917使用JNative对条码打印机进行打印 因项目需要,对Gode ... -
java.lang.OutOfMemoryError: Permgen space 异常
2008-06-25 17:36 1732PermGen space的全称是Permanent Gene ... -
ThreadLocal的设计与使用(原理篇)
2008-02-18 15:11 20244在jdk1.2推出时开始支持java.lang.ThreadL ... -
Java多线程程序设计详细解析
2008-01-27 16:16 1472一、理解多线程 多线程是这样一种机制,它允许在程序中 ... -
为图片添加版权水印
2008-01-17 00:55 2600因为项目中考虑到添加图片版权的保护,特意看了下水印的处理... ... -
Web文件的ContentType类型大全
2008-01-16 20:39 1082".*"="applicatio ... -
Java 中文件的操作
2008-01-16 16:56 1576/* * Made In iuxi.com * */ ...
相关推荐
Windows下一个比较完美的线程池实现和示例 本线程池提供了如下功能: 1.能根据任务个数和当前线程的多少在最小/最大线程个数之间自动调整(Vista后的系统有 SetThreadpoolThreadMaximum 等函数有类似功能); 2.能方便...
Django异步任务线程池实现原理主要涉及以下几个核心知识点: 1. 异步任务执行原理: 当Django应用在处理耗时的任务时,通常会阻塞主线程,导致用户在等待处理结果时无法进行其他操作。为了解决这个问题,Django采用...
在处理大量数据导入数据库的场景中,使用...通过以上步骤,可以利用EasyExcel和线程池实现百万级数据从Excel导入到数据库的功能。这种方式可以提高数据处理的效率,减少内存占用,并且能够更好地利用多核CPU的优势。
总结来说,这个简单的C++线程池实现是一个学习多线程和并发编程的好起点。它通过封装线程管理和任务调度,为开发者提供了一种更高效、更可控的方式来处理并发任务。在实际应用中,线程池可以被扩展以适应更复杂的...
总结起来,Linux C系统编程中使用线程池实现类似`cp`命令的功能,是一个涉及多线程编程、任务调度和同步控制的综合实践。通过这样的实现,我们可以提高文件复制操作的并发性和效率,同时降低系统资源的消耗。在深入...
### C++线程池实现原理分析 #### 一、引言 线程池是一种软件设计模式,用于管理和控制大量线程的创建与销毁,尤其是在处理大量短期任务时,它可以显著提高程序性能。线程池的核心思想是预先创建一组线程,并让它们...
在"java 线程池实现多并发队列后进先出"这个主题中,我们关注的是线程池如何利用特定类型的队列来实现后进先出(LIFO,Last-In-First-Out)的行为。通常,线程池默认使用先进先出(FIFO,First-In-First-Out)的队列...
5. **异步下载**:通过线程池实现的下载是异步的,这意味着主线程不会被阻塞,用户界面仍然可以保持流畅。这是在Android中进行网络操作时必须遵循的原则,因为网络操作在主线程上执行会导致ANR(应用无响应)错误。 ...
在本篇文章中,我们将深入探讨Python中的线程池实现,并参考提供的`ThreadPool.py`源码进行分析。 首先,Python标准库提供了一个名为`concurrent.futures`的模块,其中包含`ThreadPoolExecutor`类,它是实现线程池...
以下将详细讲解基于Win32的C++线程池实现的关键概念和技术。 首先,我们需要理解Win32 API中的线程池接口。Windows提供了CreateThreadpool、SetThreadpoolCallbackPool、QueueUserWorkItem等函数来创建和管理线程池...
前段时间发布了《Windows下一个比较完美的线程池实现和示例》(http://download.csdn.net/detail/fishjam/5106672),根据下载量和评论来看,对大家还比较有用。 现在发布一个利用该线程池实现的Http上传下载实现,...
不过,上述代码展示的是一个自定义的线程池实现,它可能没有使用Java标准库中的`ExecutorService`。 这个自定义线程池的实现包括以下几个关键组件: 1. **线程池参数**: - `reserve`:保留线程数,这些线程不...
下面是一个简化的线程池实现的伪代码: ```cpp class ThreadPool { public: void enqueue(std::function()> task); // 提交任务 ~ThreadPool(); // 关闭线程池 private: std::vector<std::thread> workers; // ...
在"基于win32的C++线程池实现(改进版)"中,开发者已经针对上一版本的问题进行了修复,如崩溃和内存泄漏。这些问题是多线程编程中常见的挑战,崩溃可能是由于线程间的同步问题或者资源管理不当,而内存泄漏则可能导致...
本项目涉及的核心知识点是“Linux下的socket线程池实现”,这涉及到多个技术层面,包括socket编程、多线程技术和线程池的管理。 首先,让我们了解什么是Socket。Socket是网络通信的基本接口,它允许应用程序通过...
Tomcat提供了两种线程池实现,一种是基于Apache Portable Runtime (APR)的Pool技术,另一种是纯Java实现的ThreadPool。本文主要探讨后者,即Java实现的线程池。 Java实现的线程池位于`tomcat-util.jar`中,初始化时...
本篇文章将深入讲解如何使用Qt的线程池实现多线程HTTP下载,并涵盖限速下载及进度显示的相关知识。 首先,我们要了解Qt中的线程池(QThreadPool)机制。QThreadPool是Qt提供的一个管理线程资源的类,它可以调度和...
基于现代C++的高效线程池实现源码+项目文档说明(兼容Linux、macOS 和 Windows系统).zip 【说明】 【1】项目代码完整且功能都验证ok,确保稳定可靠运行后才上传。欢迎下载使用!在使用过程中,如有问题或建议,请及时...
线程池实现
本篇文章将深入讲解如何在Socket服务端实现线程池,以及其在实际项目中的应用。 首先,我们来看`SocketServer.java`。这个文件通常包含了服务器端的主逻辑,其中包括了服务器的启动、监听客户端连接和创建线程池来...