`
Missing1984
  • 浏览: 12885 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

ThreadPool,ObjectPool和任务控制

阅读更多

New a threadPool

ThreadPoolExecutor executorService = new ThreadPoolExecutor(
				poolCoreSize, poolMaxSize, timeout,
				java.util.concurrent.TimeUnit.MILLISECONDS, buff, handler);

 

poolcoreSize:  Pool至少运行的线程数量

poolMaxSize:   Pool最多运行的线程数量

timeout:空闲线程发呆时间,如果某线程发呆时间>timeout 而当前线程数量>poolcoreSize,就会终止该线程

buff:线程池所用的缓冲队列 实现BlockingQueue的那几个都能用,能设置定长和无限长度

handler: 当线程池线程数到达poolMaxSize buff 又满了,有新的任务丢到线程池的时候就会call这个hander进行处理.默认提供了几种处理方式,也可以自己实现,例如

public class PoolRejectHandler implements RejectedExecutionHandler {

    /* (non-Javadoc)
     * @see java.util.concurrent.RejectedExecutionHandler#rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor)
     */
    public void rejectedExecution(Runnable arg0, ThreadPoolExecutor arg1) {
       //log process is slower then produce
       
       //put back to inqueue
        BaseTask task = (BaseTask)arg0;
       
        //task.pushToInQueue(task.getInObject());
        System.out.println("!!!!!!overload!!!");
        try {
            task.getInQueue().put(task.getInObject());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }    
}

 这里我把这个task丢回一开始的任务队列了,这里可以实现自己的处理逻辑

 

New and excute  task

任何实现runnable 或者callable接口的类 都可以作为一个任务丢到线程池里面运行

executorService.execute(processTask);

 

ThreadPool 调度策略

条件: 线程数<poolcoreSize, buff未满,有新的任务提交   结果:新建线程处理这个任务

条件: 线程数>=poolcoreSize, buff未满,有新的任务提交  结果:把任务丢到buff对列中

条件: poolMaxSize>线程数>=poolcoreSize, buff满,有新的任务提交  结果:新建线程

条件: poolMaxSize=线程数>poolcoreSize, buff满,有新的任务提交  结果:调用handler处理这个任务

 

 

这里需要讨论的就是最后那个情况,虽然threadpool提供了handler来处理这种超出负荷的情况,但是如果能从源头就直接控制提交到线程池的任务数量,感觉上会好很多

 

 

这里我用到一个Manager类和ObjectPool来处理任务的提交

 

Manager类实现Runnable 和 DispatchListener 接口 ,在Runnable里面处理任务生成和分发,DispatchListener接口中实现任务对象的回收和结果收集(示意代码)

public class DispathManager implements Runnable, DispatchListener {
	public void run() {
		while (true) {
			try {
				
				// get task from taskobjectpool. will wait here if the active taskobjects reach the tasklimit
				// so we can  control the limit of running task in the threadpool  
				BaseTask processTask = (BaseTask) taskPool.borrowObject();


				//set task params
				processTask.setListener(this);


				// put in threadpool  
				executorService.execute(processTask);
			} catch (InterruptedException e) {
				break;
			} catch (Exception e) {
				// log dispatch queue exception
				e.printStackTrace();
			}
		}
	}

	public void recycleTask(BaseTask baseTask) {
		// TODO Auto-generated method stub
		try {
			taskPool.returnObject(baseTask);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

}

 

然后在Task执行完毕的时候进行回收

    public void run() {
        try {
	//task logic here

            // return task
            listener.recycleTask(this);
        } catch (Exception e) {
            listener.onTaskError(e, inObject);
        }
    }

 

这样就不必每次都重新new 这个task 对一些重型的task,会提高性能

 

这里用到了ObjectPool,是Apache Common下的管理ObjectPool的类,使用如下:

 

 

New a ObjectPool

ObjectPool taskPool = new GenericObjectPool(new TaskPoolFactory(taskClass), taskLimit,
				GenericObjectPool.WHEN_EXHAUSTED_BLOCK, -1);

 

其中TaskPoolFactory如下 taskClass是重配置文件读入的

public class TaskPoolFactory extends BasePoolableObjectFactory {

    /* (non-Javadoc)
     * @see org.apache.commons.pool.BasePoolableObjectFactory#makeObject()
     */
    private Class c;
    
    public TaskPoolFactory(Class c) {
        super();
        this.c = c;
    }
    @Override
    public Object makeObject() throws Exception {
        // TODO Auto-generated method stub
        return c.newInstance();
    }
}

 

taskLimit是允许借出的最大的数目(借出但是没有归还)

当借出的数目已经达到最大,这个时候还有借出请求 GenericObjectPool.WHEN_EXHAUSTED_BLOCK 就提供了策略(还有几种的) 这里设置为无限等待,也就是线程会wait在这里 直到有借出的task被归还

 

Objectpool使用参照Manager和task类

 

最后, 控制taskLimit的大小 就可以控制ThreadPool中处理任务的多少,也就是正在处理的task+buff里的task 必定<= taskLimit,这样就在源头控制了提交任务的数量,保证threadpool不会超负荷。

 

 

最后推荐大家的设置就是 ThreadPool buff设置为无限大 然后用poolcoresize和taskLimit来控制负荷能力(视不同机器设置不同的值)

 

 

 

欢迎大家拍砖...

0
3
分享到:
评论

相关推荐

    Boost threadpool优先级实例

    4. **提交任务**:使用线程池的`schedule`方法提交任务,对于优先级任务,可以使用`pool.schedule_with_priority`,并传入任务和优先级值。例如: ```cpp pool.schedule_with_priority(priority, std::bind(my_...

    threadpool

    `threadpool.c`很可能包含了线程池的主体实现,包括线程池的初始化、任务的添加、线程的管理和调度等功能;`threadpool_test.c`则可能是测试代码,用于验证线程池功能的正确性;`threadpool.h`是头文件,可能定义了...

    ThreadPool

    通过线程池,可以有效地管理和控制并发执行的任务数量,避免了频繁地创建和销毁线程带来的开销。 在描述中提供的链接指向了一个CSDN博客文章,虽然具体内容无法直接引用,但通常这类文章会详细讲解线程池的工作原理...

    C# Thread、ThreadPool、Task测试

    在C#编程中,线程(Thread)、线程池(ThreadPool)和任务(Task)是并行处理和异步操作的重要组成部分。理解它们的工作原理和使用方法对于优化应用程序的性能至关重要。下面将详细阐述这三个概念及其相关知识点。 ...

    threadpool.tar.gz

    3. **同步机制**:确保线程安全,如互斥量(`std::mutex`)、条件变量(`std::condition_variable`)和信号量(`std::semaphore`),用于控制对任务队列的访问和通知线程新任务的到来。 4. **提交任务接口**:一个...

    threadpool_informationmcb_threadpool_c++threadpool_

    在标题"threadpool_informationmcb_threadpool_c++threadpool_"中,"informationmcb"可能代表信息管理和控制块,而"threadpool"显然是指线程池,"c++threadpool"则表明我们关注的是C++实现的线程池。 C++11引入了对...

    rust-threadpool, 用于并行任务执行的非常简单的线程池.zip

    rust-threadpool, 用于并行任务执行的非常简单的线程池 线程管理线程池,用于在一组固定工作线程上运行许多作业。 用法将这个添加到你的Cargo.toml:[dependencies]threadpool = "1.0"这是你的箱子 root:extern c

    C++11 线程池 ThreadPool

    C++11是C++语言的一个重要版本更新,它引入了大量的新特性,其中包括对多线程的支持。线程池(ThreadPool)是一种管理...通过合理的线程池设计,开发者可以更好地控制系统的并发行为,提高程序的响应速度和资源利用率。

    python threadpool

    `concurrent.futures`支持异步执行、取消任务、超时控制等功能,而且它的API设计更加面向对象,易于理解和使用。 ### 五、实际应用示例 在实际应用中,`threadpool`常用于处理大量的I/O密集型任务,如网络请求、...

    线程池threadpool_src

    这种技术可以有效地管理和控制并发执行的任务,提高系统的性能和资源利用率。 在“线程池threadpool_src”中,我们可以看到两个主要的文件:“www.pudn.com.txt”和“ThreadPoolApp”。虽然具体的内容无法在此提供...

    VC++ThreadPool程序

    本篇文章将围绕“VC++ ThreadPool程序”进行详细讲解,旨在帮助读者理解和掌握如何在VC++环境中构建和使用线程池。 线程池是一种线程使用模式,它预先创建一组线程,等待任务到来时分配给这些线程执行,而不是每次...

    C# ThreadPool 多线程 代码示例

    `ThreadPool`管理线程的创建和销毁,优化系统资源的使用,尤其适合处理大量短生命周期的任务。本示例将探讨如何在C#中使用`ThreadPool`进行多线程编程。 在C#中,`ThreadPool`类提供了多种方法来调度工作项,其中`...

    QT_ThreadPool.rar

    5. **信号与槽(Signals & Slots)**:QT5的信号与槽机制是线程间通信的关键,任务的提交、执行和完成都会触发相应的信号,工作线程通过连接这些信号来获取和处理任务。 在实际使用中,开发者可以通过调用线程池...

    ThreadPool.zip

    在代码实例中,可能还包含了线程同步和互斥的示例,如synchronized关键字、Lock接口(如ReentrantLock)的使用,以及Semaphore信号量控制并发访问等,这些都是在多线程环境下保证数据安全和程序正确性的关键手段。...

    ThreadPool-master.zip

    - **ThreadPool.h/cpp**:线程池的核心实现,包括线程池类的声明和实现。 - **Task.h/cpp**:可能定义了一个任务类,封装了任务的执行逻辑。 - **main.cpp**:测试代码,展示了如何使用线程池执行任务。 - **...

    Threadpool

    1. 资源管理:线程池可以有效地管理和控制系统的并发度,避免过多的线程导致系统资源的浪费。 2. 性能优化:通过线程复用,减少了创建和销毁线程的开销,提高了系统性能。 3. 异常处理:线程池可以提供统一的异常...

    windows 版本的一个线程池实现,支持普通任务,定时任务,可以保证排队的任务正确释放-ThreadPool.zip

    本项目"ThreadPool.zip"提供了一个自定义的线程池实现,特别针对Windows平台,它支持普通任务和定时任务,并能确保排队任务的正确释放。 线程池的工作原理: 1. **线程池初始化**:线程池在启动时会预先创建一定...

Global site tag (gtag.js) - Google Analytics