QueueUserWorkItem,CreateTimerQueueTimer,BindIoCompletionCallback,RegisterWaitForSingleObject你用过吗,性能如何?CreateThreadpool你想用吗,那你就去安装Windows Vista,or 2008吧.
下面的车轮模型来自java util concurrent's thread pool模型.
作者:一个java程序员.
下面的程序没有安装logger,只是简单的printf.
threadpool.h
#ifndef _WHEELSOFT_THREADPOOL_H_
#define _WHEELSOFT_THREADPOOL_H_
#define MAX_THREADS 100
#define MAX_THREADS_MIN 10
#define MIN_WORKER_WAIT_TIMEOUT 60*1000
#define MAX_WORKER_WAIT_TIMEOUT 60*MIN_WORKER_WAIT_TIMEOUT
#define MAX_THREADPOOLS 32
typedef void (* runnable_t)(void);
int initializeThreadPool(void **tpp,int minPoolSize,int maxPoolSize,long keepAliveTime);
int addTaskToThreadPool(void *tp,runnable_t toRun);
int shutdownThreadPool(void *tp);
#endif /* !_WHEELSOFT_THREADPOOL_H_ */
queuedthreadpool.c
#include "threadpool.h"
#include "stdio.h"
#include "windows.h"
/*QTP:QueuedThreadPool Model*/
typedef struct QTPTaskNode{
runnable_t task;
struct QTPTaskNode * next;
}QTPTaskNode;
typedef struct {
volatile int counter;
QTPTaskNode * head;
QTPTaskNode * tail;
}QTPTaskQueue;
typedef struct {
int minPoolSize;
int maxPoolSize;
long keepAliveTime;//wait timeout
volatile int currentPoolSize;
volatile int currentBusyWorkers;
volatile int shouldTerminate;
QTPTaskQueue * p_tasks;
CRITICAL_SECTION putLock;
CRITICAL_SECTION takeLock;
CRITICAL_SECTION poolLock;
HANDLE takeEvent;
HANDLE terminateEvent;
}QueuedThreadPool;
typedef struct {
int counter;
CRITICAL_SECTION mainLock;
QueuedThreadPool * QTPHandlers[MAX_THREADPOOLS];
}QTPMarshall;
static volatile long QTPMarshallerInitToken=0;
static volatile int QTPMarshallerInitedToken=0;
static QTPMarshall QTPMarshaller={0};
static DWORD workerForQueuedThreadPool(LPVOID);
static SYSTEM_INFO QTP_SystemInfoGetter;
/*return index+1;*/
static int isAvailableQTP(void * tp){
if(tp!=NULL){
QTPMarshall * _p=&QTPMarshaller;
int index=0;
for(;index<_p->counter;)
if(_p->QTPHandlers[index++]==tp)
return index+1;
}
return 0;
}
static void * allocQTPFromQTPMarshall(){
QTPMarshall * _p=&QTPMarshaller;
void *tp=NULL;
EnterCriticalSection(&_p->mainLock);
if(_p->counter<MAX_THREADPOOLS){
tp=malloc(sizeof(QueuedThreadPool));
_p->QTPHandlers[_p->counter++]=tp;
}
LeaveCriticalSection(&_p->mainLock);
return tp;
}
static int deleteQTPFromQTPMarshall(void * tp){
if(tp!=NULL){
QTPMarshall * _p=&QTPMarshaller;
int index=0;
EnterCriticalSection(&_p->mainLock);
index=isAvailableQTP(tp);
if(index--){
for(;index<_p->counter-1;index++)
_p->QTPHandlers[index]=_p->QTPHandlers[index+1];
_p->counter--;
LeaveCriticalSection(&_p->mainLock);//attention: don't forget LeaveCriticalSection when return.
return 1;
}
LeaveCriticalSection(&_p->mainLock);
}
return 0;
}
static void freeQTPFromQTPMarshall(void * tp){
deleteQTPFromQTPMarshall(tp);
free(tp);
}
static void adjustQTPLimits(QueuedThreadPool *tp){
/*adjust size and timeout.*/
if(tp->maxPoolSize <= 0) {
tp->maxPoolSize = MAX_THREADS;
} else if (tp->maxPoolSize < MAX_THREADS_MIN) {
tp->maxPoolSize = MAX_THREADS_MIN;
}
if(tp->minPoolSize > tp->maxPoolSize) {
tp->minPoolSize = tp->maxPoolSize;
}
if(tp->minPoolSize <= 0) {
if(1 == tp->maxPoolSize) {
tp->minPoolSize = 1;
} else {
tp->minPoolSize = tp->maxPoolSize/2;
}
}
if(tp->keepAliveTime<MIN_WORKER_WAIT_TIMEOUT)
tp->keepAliveTime=MIN_WORKER_WAIT_TIMEOUT;
if(tp->keepAliveTime>MAX_WORKER_WAIT_TIMEOUT)
tp->keepAliveTime=MAX_WORKER_WAIT_TIMEOUT;
}
typedef struct{
QueuedThreadPool * tp;
HANDLE th;
}QTPWORKER_PARAM;
static int addQTPWorker(QueuedThreadPool *tp){
//don't check the tp's state.
if(tp->currentPoolSize<tp->minPoolSize||
tp->currentPoolSize<tp->maxPoolSize&&
(tp->currentPoolSize==tp->currentBusyWorkers||tp->p_tasks->counter>=((int)(QTP_SystemInfoGetter.dwNumberOfProcessors)+1)*tp->currentPoolSize)){
int _tid;
EnterCriticalSection(&tp->poolLock);
if(tp->currentPoolSize<tp->maxPoolSize){
QTPWORKER_PARAM * _param=malloc(sizeof(QTPWORKER_PARAM));
if(_param){
_param->th=CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)workerForQueuedThreadPool,_param,CREATE_SUSPENDED,&_tid);
_param->tp=tp;
ResumeThread(_param->th);
tp->currentPoolSize++;
}else{
LeaveCriticalSection(&tp->poolLock);
return 0;
}
}
LeaveCriticalSection(&tp->poolLock);
}
return 1;
}
int initializeThreadPool(void **tpp,int minPoolSize,int maxPoolSize,long keepAliveTime){
//init QTPMarshall once.
if(!InterlockedCompareExchange(&QTPMarshallerInitToken,1,0)){
GetSystemInfo(&QTP_SystemInfoGetter);
//QTPMarshaller.counter=0;
InitializeCriticalSection(&QTPMarshaller.mainLock);
QTPMarshallerInitedToken=1;//don't Interlocked ?
}
if(tpp==NULL||!QTPMarshallerInitedToken||QTPMarshaller.counter>=MAX_THREADPOOLS)
return 0;
{//initialize QTP
QueuedThreadPool *tp=allocQTPFromQTPMarshall();
if(tp==NULL)
return 0;
tp->shouldTerminate=0;
tp->minPoolSize=minPoolSize,tp->maxPoolSize=maxPoolSize,tp->keepAliveTime=keepAliveTime,tp->currentPoolSize=tp->currentBusyWorkers=0;
adjustQTPLimits(tp);
{//init taskQueue
QTPTaskNode * nullNode=malloc(sizeof(QTPTaskNode));
QTPTaskQueue * queue=malloc(sizeof(QTPTaskQueue));
nullNode->task=NULL,nullNode->next=NULL;
queue->counter=0,queue->head=queue->tail=nullNode;
tp->p_tasks=queue;
}
InitializeCriticalSection(&tp->putLock);
InitializeCriticalSection(&tp->takeLock);
InitializeCriticalSection(&tp->poolLock);
tp->takeEvent=CreateEvent(NULL,FALSE,FALSE,NULL);
tp->terminateEvent=CreateEvent(NULL,TRUE,FALSE,NULL);
*tpp=tp;//last to do this.
printf("initializeTP error:%d,\n",GetLastError());
}
return 1;
}
static void insertQTPTaskNode(QTPTaskQueue * queue,QTPTaskNode * node){
queue->tail->next=node,queue->tail->task=node->task,queue->tail=node,node->task=NULL;
}
static QTPTaskNode * extractQTPTaskNode(QTPTaskQueue * queue){
QTPTaskNode * res=queue->head;
queue->head=res->next;
res->next=NULL;
return res;
}
int addTaskToThreadPool(void *tp,runnable_t toRun){
if(isAvailableQTP(tp)){
QueuedThreadPool * _tp=(QueuedThreadPool *)tp;
int oldCounter=0;
QTPTaskNode * task=NULL;
if(_tp->shouldTerminate)
return 0;
//addQTPWorker
addQTPWorker(tp);
task=malloc(sizeof(QTPTaskNode));//note: free it when it end.
if(task==NULL) //may be out of memory
return 0;
task->task=toRun,task->next=NULL;
//put task to taskQueue. must check the QTP's status !
EnterCriticalSection(&_tp->putLock);
if(!_tp->shouldTerminate){
insertQTPTaskNode(_tp->p_tasks,task);
oldCounter=InterlockedExchangeAdd(&_tp->p_tasks->counter,1);
LeaveCriticalSection(&_tp->putLock);
}else{
LeaveCriticalSection(&_tp->putLock);
free(task);
return 0;
}
if(oldCounter<=0)
SetEvent(_tp->takeEvent);
return 1;
}else
return 0;
}
static void clearupQTPTaskQueue(QTPTaskQueue * queue){
if(queue->counter<=0){
if(queue->head!=queue->tail||queue->head->next!=NULL||queue->tail->next!=NULL)
;//log error:for test
free(queue->tail);
queue->head=queue->tail=NULL;
}else{
//log error:for test
}
}
/*it is asynchronous.*/
int shutdownThreadPool(void *tp){
if(deleteQTPFromQTPMarshall(tp)){
QueuedThreadPool * _tp=(QueuedThreadPool *)tp;
EnterCriticalSection(&_tp->putLock/*just for prevent putting task .*/);
_tp->shouldTerminate=1;
LeaveCriticalSection(&_tp->putLock);
//notify the last worker to clearup QTP.
SetEvent(_tp->terminateEvent);
return 1;
}else
return 0;
}
//Call it just once for per QTP.
static void clearupQueuedThreadPool(QueuedThreadPool * tp){
if(tp->shouldTerminate){
clearupQTPTaskQueue(tp->p_tasks);
free(tp->p_tasks);
DeleteCriticalSection(&tp->putLock);
DeleteCriticalSection(&tp->takeLock);
DeleteCriticalSection(&tp->poolLock);
CloseHandle(tp->takeEvent);
CloseHandle(tp->terminateEvent);
freeQTPFromQTPMarshall(tp);
}
}
DWORD workerForQueuedThreadPool(LPVOID pparam){
QTPWORKER_PARAM * param=(QTPWORKER_PARAM *)pparam;
QueuedThreadPool * tp=param->tp;
QTPTaskNode * _task=NULL;
HANDLE EVENTS[2];
int curTaskNum=0,shouldTerminate=0,shouldClearupQTP=0;
DWORD waitRtn;
if(!isAvailableQTP(tp))
return 0;
EVENTS[0]=tp->terminateEvent,EVENTS[1]=tp->takeEvent;//note the sequence.
shouldTerminate=tp->shouldTerminate;
while(!shouldTerminate){
if(GetLastError()!=0){
printf("curThreadId:%d,err:%d\n",GetCurrentThreadId(),GetLastError());
//shouldTerminate=1;
//break;
exit(0);//for test.
}
while(1){
EnterCriticalSection(&tp->takeLock);
if(tp->p_tasks->counter<=0)
{
LeaveCriticalSection(&tp->takeLock);
waitRtn=WaitForMultipleObjects(2,EVENTS,FALSE,tp->keepAliveTime);
if(waitRtn==WAIT_OBJECT_0){
if(tp->p_tasks->counter>0)
continue;
else{
shouldTerminate=1;
break;
}
}else if(waitRtn==WAIT_OBJECT_0+1){
continue;
}else if(waitRtn==WAIT_TIMEOUT){
if(tp->currentBusyWorkers<=0&&tp->currentPoolSize>tp->minPoolSize){
EnterCriticalSection(&tp->poolLock);
if(tp->currentPoolSize>tp->minPoolSize){
shouldTerminate=2;
/*Decrement currentPoolSize and shouldClearupQTP ? ,when timeout check.*/
if(!--tp->currentPoolSize&&tp->shouldTerminate){
shouldClearupQTP=1;
}
LeaveCriticalSection(&tp->poolLock);
break;
}else{
LeaveCriticalSection(&tp->poolLock);
continue;
}
}else{
continue;
}
}else{
shouldTerminate=3;
if(GetLastError()!=0){
printf("curThreadId:%d,WaitForMultipleObjects_err:%d,waitRtn=%d\n",GetCurrentThreadId(),GetLastError(),waitRtn);
exit(0);//for test.
}
break;
}//end wait handle.
}else{
//take task to _task
_task=extractQTPTaskNode(tp->p_tasks);
curTaskNum=InterlockedExchangeAdd(&tp->p_tasks->counter,-1);
LeaveCriticalSection(&tp->takeLock);
if(curTaskNum>=2)
SetEvent(tp->takeEvent);
break;
}
}//end get _task loop;
//toRun the _task.
if(_task!=NULL){
InterlockedIncrement(&tp->currentBusyWorkers);
addQTPWorker(tp);//adjust
_task->task();
free(_task);
_task=NULL;
InterlockedDecrement(&tp->currentBusyWorkers);
}
}//end thread main loop.
if(shouldTerminate!=2){
EnterCriticalSection(&tp->poolLock);
if(!--tp->currentPoolSize&&tp->shouldTerminate)
shouldClearupQTP=1;
LeaveCriticalSection(&tp->poolLock);
}
if(shouldClearupQTP)
clearupQueuedThreadPool(tp);
CloseHandle(param->th);
free(param);
//CloseHandle(GetCurrentThread()); //<---it is last error:6
return 1;
}
分享到:
相关推荐
C++11是C++语言的一个重要版本更新,它引入了大量的新特性,其中包括对多线程的支持。线程池(ThreadPool)是一种管理线程资源的有效方式,它在现代并发编程中扮演着至关重要的角色。线程池允许程序预先创建一组线程...
一个C++线程池库,包括智能指针、线程模拟的定时器
在标题"threadpool_informationmcb_threadpool_c++threadpool_"中,"informationmcb"可能代表信息管理和控制块,而"threadpool"显然是指线程池,"c++threadpool"则表明我们关注的是C++实现的线程池。 C++11引入了对...
一个易于使用的C ++ 11线程池。 使用ThreadPool类对自由函数进行排队,并使用std :: for_each()和std :: transform()的并行版本。 可配置为仅用于标头或与库一起使用。 有许多用法示例。
在给定的"ThreadPool"文件中,可能包含了实现这样一个线程池的代码,包括上述各个组件的详细实现。通过对这些代码的分析和学习,可以深入了解线程池的工作原理,以及如何在C++中有效地利用多线程技术。
在"Linux下面C++实现的threadpool代码"中,我们可以从以下几个方面来理解其核心知识点: 1. **线程池概念**:线程池是由多个预创建的线程组成的集合,这些线程等待执行由线程池管理者分配的任务。管理者将新任务放...
线程池(ThreadPool)是多线程编程中的一个重要概念,它是一种线程使用模式,用于高效地管理和调度线程资源。在C++中实现线程池可以帮助开发者优化并发执行的任务,减少线程创建和销毁的开销,提高系统效率。下面...
"threadpool 线程池 C语言版"是针对C语言实现的一种线程池模型,其核心在于任务队列和线程组的管理,以实现线程的复用和优化。 线程池的工作原理是预先创建一定数量的线程,这些线程在池中待命,等待执行任务。当有...
在`threadpool_C++`和`threadpool_c`这两个文件中,可能包含了线程池的C++和C语言实现。C++版本可能利用了STL库提供的容器和算法,而C语言版本则可能更多地依赖于`pthread`库的原始API。这两种实现都可能涉及到线程...
在C++编程中,线程池是一种管理线程资源的有效方式,它可以帮助我们高效地调度和执行并发任务,避免频繁创建和销毁线程带来的开销。这个“C++简单线程池例子”提供了实现线程池的一个实例,通过分析`...
**C++ OOP ThreadPool 开源实现详解** C++中的线程池(ThreadPool)是一种高效的并发编程模式,它通过预先创建并维护一组可重用的工作线程来管理任务的执行,而不是为每个任务创建新的线程。这样的设计可以避免频繁...
c++11封装的项目中使用的线程池,lambda作为线程函数,可以使用任意形式的线程回调函数。单例模式。欢迎下载,评论
《深入理解多线程编程:ThreadPool.zip实例解析》 在计算机科学中,多线程编程是一种重要的技术,它允许多个任务在同一时间执行,从而提高了程序的效率和响应性。在Java等编程语言中,ThreadPool是实现多线程处理的...
在标题提到的"C++实现的线程池源代码threadpool"中,我们可以看到三个压缩包文件:`boost_1_32_0.rar`、`ThreadPoolDemo.rar`、`thread_pool.rar`,它们可能包含了不同实现线程池的C++源代码。 首先,`boost_1_32_0...
本资源包“多线程精品资源--based on C++11 , a mini threadpool , accept varia.zip”显然聚焦于C++11的多线程特性,并提供了一个小型的线程池实现。 首先,我们来深入理解C++11中的多线程。C++11标准库中的`...
在C++编程中,线程同步是一个至关重要的概念,特别是在多线程编程中。当一个程序包含多个执行路径,即线程,有时我们需要确保某个线程执行完毕后再进行下一步操作,这就涉及到“等待线程结束”的功能。本篇文章将...
基于C++使用 epoll + threadpool 实现的 webServer,支持GET、POST C++是一种广泛使用的编程语言,它是由Bjarne Stroustrup于1979年在新泽西州美利山贝尔实验室开始设计开发的。C++是C语言的扩展,旨在提供更...
在给定的文件中,我们看到`threadpool.c`、`threadpool_test.c`、`threadpool.h`和`Makefile`,这表明是一个C语言实现的线程池项目。`threadpool.c`很可能包含了线程池的主体实现,包括线程池的初始化、任务的添加、...
riften::Thiefpool 面向C ++ 20的超快速,轻量级,可窃取工作的线程池。 建立在无锁并发 。用法# include " riften/thiefpool.hpp "// Create thread pool with 4 worker threads.riften::Thiefpool pool ( 4 );// ...
标题中的"ThreadPool"指的是线程池,这是一个编程概念,特别是在多线程编程中非常关键。线程池是一种线程使用模式,它维护着一个工作线程的集合,用于执行一系列的任务。通过线程池,可以有效地管理和控制并发执行的...