New a threadPool
ThreadPoolExecutor executorService = new ThreadPoolExecutor(
poolCoreSize, poolMaxSize, timeout,
java.util.concurrent.TimeUnit.MILLISECONDS, buff, handler);
poolcoreSize: Pool至少运行的线程数量
poolMaxSize: Pool最多运行的线程数量
timeout:空闲线程发呆时间,如果某线程发呆时间>timeout 而当前线程数量>poolcoreSize,就会终止该线程
buff:线程池所用的缓冲队列 实现BlockingQueue的那几个都能用,能设置定长和无限长度
handler: 当线程池线程数到达poolMaxSize buff 又满了,有新的任务丢到线程池的时候就会call这个hander进行处理.默认提供了几种处理方式,也可以自己实现,例如
public class PoolRejectHandler implements RejectedExecutionHandler {
/* (non-Javadoc)
* @see java.util.concurrent.RejectedExecutionHandler#rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor)
*/
public void rejectedExecution(Runnable arg0, ThreadPoolExecutor arg1) {
//log process is slower then produce
//put back to inqueue
BaseTask task = (BaseTask)arg0;
//task.pushToInQueue(task.getInObject());
System.out.println("!!!!!!overload!!!");
try {
task.getInQueue().put(task.getInObject());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
这里我把这个task丢回一开始的任务队列了,这里可以实现自己的处理逻辑
New and excute task
任何实现runnable 或者callable接口的类 都可以作为一个任务丢到线程池里面运行
executorService.execute(processTask);
ThreadPool 调度策略
条件: 线程数<poolcoreSize, buff未满,有新的任务提交 结果:新建线程处理这个任务
条件: 线程数>=poolcoreSize, buff未满,有新的任务提交 结果:把任务丢到buff对列中
条件: poolMaxSize>线程数>=poolcoreSize, buff满,有新的任务提交 结果:新建线程
条件: poolMaxSize=线程数>poolcoreSize, buff满,有新的任务提交 结果:调用handler处理这个任务
这里需要讨论的就是最后那个情况,虽然threadpool提供了handler来处理这种超出负荷的情况,但是如果能从源头就直接控制提交到线程池的任务数量,感觉上会好很多
这里我用到一个Manager类和ObjectPool来处理任务的提交
Manager类实现Runnable 和 DispatchListener 接口 ,在Runnable里面处理任务生成和分发,DispatchListener接口中实现任务对象的回收和结果收集(示意代码)
public class DispathManager implements Runnable, DispatchListener {
public void run() {
while (true) {
try {
// get task from taskobjectpool. will wait here if the active taskobjects reach the tasklimit
// so we can control the limit of running task in the threadpool
BaseTask processTask = (BaseTask) taskPool.borrowObject();
//set task params
processTask.setListener(this);
// put in threadpool
executorService.execute(processTask);
} catch (InterruptedException e) {
break;
} catch (Exception e) {
// log dispatch queue exception
e.printStackTrace();
}
}
}
public void recycleTask(BaseTask baseTask) {
// TODO Auto-generated method stub
try {
taskPool.returnObject(baseTask);
} catch (Exception e) {
e.printStackTrace();
}
}
}
然后在Task执行完毕的时候进行回收
public void run() {
try {
//task logic here
// return task
listener.recycleTask(this);
} catch (Exception e) {
listener.onTaskError(e, inObject);
}
}
这样就不必每次都重新new 这个task 对一些重型的task,会提高性能
这里用到了ObjectPool,是Apache Common下的管理ObjectPool的类,使用如下:
New a ObjectPool
ObjectPool taskPool = new GenericObjectPool(new TaskPoolFactory(taskClass), taskLimit,
GenericObjectPool.WHEN_EXHAUSTED_BLOCK, -1);
其中TaskPoolFactory如下 taskClass是重配置文件读入的
public class TaskPoolFactory extends BasePoolableObjectFactory {
/* (non-Javadoc)
* @see org.apache.commons.pool.BasePoolableObjectFactory#makeObject()
*/
private Class c;
public TaskPoolFactory(Class c) {
super();
this.c = c;
}
@Override
public Object makeObject() throws Exception {
// TODO Auto-generated method stub
return c.newInstance();
}
}
taskLimit是允许借出的最大的数目(借出但是没有归还)
当借出的数目已经达到最大,这个时候还有借出请求 GenericObjectPool.WHEN_EXHAUSTED_BLOCK 就提供了策略(还有几种的) 这里设置为无限等待,也就是线程会wait在这里 直到有借出的task被归还
Objectpool使用参照Manager和task类
最后, 控制taskLimit的大小 就可以控制ThreadPool中处理任务的多少,也就是正在处理的task+buff里的task 必定<= taskLimit,这样就在源头控制了提交任务的数量,保证threadpool不会超负荷。
最后推荐大家的设置就是 ThreadPool buff设置为无限大 然后用poolcoresize和taskLimit来控制负荷能力(视不同机器设置不同的值)
欢迎大家拍砖...
分享到:
相关推荐
4. **提交任务**:使用线程池的`schedule`方法提交任务,对于优先级任务,可以使用`pool.schedule_with_priority`,并传入任务和优先级值。例如: ```cpp pool.schedule_with_priority(priority, std::bind(my_...
`threadpool.c`很可能包含了线程池的主体实现,包括线程池的初始化、任务的添加、线程的管理和调度等功能;`threadpool_test.c`则可能是测试代码,用于验证线程池功能的正确性;`threadpool.h`是头文件,可能定义了...
通过线程池,可以有效地管理和控制并发执行的任务数量,避免了频繁地创建和销毁线程带来的开销。 在描述中提供的链接指向了一个CSDN博客文章,虽然具体内容无法直接引用,但通常这类文章会详细讲解线程池的工作原理...
在C#编程中,线程(Thread)、线程池(ThreadPool)和任务(Task)是并行处理和异步操作的重要组成部分。理解它们的工作原理和使用方法对于优化应用程序的性能至关重要。下面将详细阐述这三个概念及其相关知识点。 ...
3. **同步机制**:确保线程安全,如互斥量(`std::mutex`)、条件变量(`std::condition_variable`)和信号量(`std::semaphore`),用于控制对任务队列的访问和通知线程新任务的到来。 4. **提交任务接口**:一个...
在标题"threadpool_informationmcb_threadpool_c++threadpool_"中,"informationmcb"可能代表信息管理和控制块,而"threadpool"显然是指线程池,"c++threadpool"则表明我们关注的是C++实现的线程池。 C++11引入了对...
rust-threadpool, 用于并行任务执行的非常简单的线程池 线程管理线程池,用于在一组固定工作线程上运行许多作业。 用法将这个添加到你的Cargo.toml:[dependencies]threadpool = "1.0"这是你的箱子 root:extern c
C++11是C++语言的一个重要版本更新,它引入了大量的新特性,其中包括对多线程的支持。线程池(ThreadPool)是一种管理...通过合理的线程池设计,开发者可以更好地控制系统的并发行为,提高程序的响应速度和资源利用率。
`concurrent.futures`支持异步执行、取消任务、超时控制等功能,而且它的API设计更加面向对象,易于理解和使用。 ### 五、实际应用示例 在实际应用中,`threadpool`常用于处理大量的I/O密集型任务,如网络请求、...
这种技术可以有效地管理和控制并发执行的任务,提高系统的性能和资源利用率。 在“线程池threadpool_src”中,我们可以看到两个主要的文件:“www.pudn.com.txt”和“ThreadPoolApp”。虽然具体的内容无法在此提供...
本篇文章将围绕“VC++ ThreadPool程序”进行详细讲解,旨在帮助读者理解和掌握如何在VC++环境中构建和使用线程池。 线程池是一种线程使用模式,它预先创建一组线程,等待任务到来时分配给这些线程执行,而不是每次...
`ThreadPool`管理线程的创建和销毁,优化系统资源的使用,尤其适合处理大量短生命周期的任务。本示例将探讨如何在C#中使用`ThreadPool`进行多线程编程。 在C#中,`ThreadPool`类提供了多种方法来调度工作项,其中`...
5. **信号与槽(Signals & Slots)**:QT5的信号与槽机制是线程间通信的关键,任务的提交、执行和完成都会触发相应的信号,工作线程通过连接这些信号来获取和处理任务。 在实际使用中,开发者可以通过调用线程池...
在代码实例中,可能还包含了线程同步和互斥的示例,如synchronized关键字、Lock接口(如ReentrantLock)的使用,以及Semaphore信号量控制并发访问等,这些都是在多线程环境下保证数据安全和程序正确性的关键手段。...
- **ThreadPool.h/cpp**:线程池的核心实现,包括线程池类的声明和实现。 - **Task.h/cpp**:可能定义了一个任务类,封装了任务的执行逻辑。 - **main.cpp**:测试代码,展示了如何使用线程池执行任务。 - **...
1. 资源管理:线程池可以有效地管理和控制系统的并发度,避免过多的线程导致系统资源的浪费。 2. 性能优化:通过线程复用,减少了创建和销毁线程的开销,提高了系统性能。 3. 异常处理:线程池可以提供统一的异常...
本项目"ThreadPool.zip"提供了一个自定义的线程池实现,特别针对Windows平台,它支持普通任务和定时任务,并能确保排队任务的正确释放。 线程池的工作原理: 1. **线程池初始化**:线程池在启动时会预先创建一定...