`
adapterofcoms
  • 浏览: 74682 次
  • 性别: Icon_minigender_1
  • 来自: 苏州
社区版块
存档分类
最新评论

c[not c++]版 ThreadPool base on winnt

阅读更多

QueueUserWorkItem,CreateTimerQueueTimer,BindIoCompletionCallback,RegisterWaitForSingleObject你用过吗,性能如何?CreateThreadpool你想用吗,那你就去安装Windows Vista,or 2008吧.

下面的车轮模型来自java util concurrent's thread pool模型.

作者:一个java程序员.

下面的程序没有安装logger,只是简单的printf.

 

threadpool.h

#ifndef _WHEELSOFT_THREADPOOL_H_
#define _WHEELSOFT_THREADPOOL_H_

 

#define MAX_THREADS 100
#define MAX_THREADS_MIN  10
#define MIN_WORKER_WAIT_TIMEOUT  60*1000
#define MAX_WORKER_WAIT_TIMEOUT  60*MIN_WORKER_WAIT_TIMEOUT
#define MAX_THREADPOOLS  32


typedef void (* runnable_t)(void);

int initializeThreadPool(void **tpp,int minPoolSize,int maxPoolSize,long  keepAliveTime);
int addTaskToThreadPool(void *tp,runnable_t toRun);
int shutdownThreadPool(void *tp);

 

#endif /* !_WHEELSOFT_THREADPOOL_H_ */

 

queuedthreadpool.c
#include "threadpool.h"
#include "stdio.h"
#include "windows.h"
 
/*QTP:QueuedThreadPool Model*/
 
typedef struct QTPTaskNode{
 runnable_t task;
 struct QTPTaskNode * next;
}QTPTaskNode;

typedef struct {
 volatile int counter;
 QTPTaskNode * head;
 QTPTaskNode * tail;

}QTPTaskQueue;

typedef struct {
 int minPoolSize;
 int maxPoolSize;
 long  keepAliveTime;//wait timeout
 
 volatile int   currentPoolSize;
 volatile int   currentBusyWorkers;
 volatile int shouldTerminate;

 QTPTaskQueue * p_tasks;
 
 CRITICAL_SECTION  putLock;
 CRITICAL_SECTION  takeLock;
 CRITICAL_SECTION  poolLock;
 
 HANDLE takeEvent;
 HANDLE terminateEvent; 

}QueuedThreadPool;

typedef struct {
 int counter;
 CRITICAL_SECTION  mainLock;
 QueuedThreadPool * QTPHandlers[MAX_THREADPOOLS];
 
}QTPMarshall;

static volatile long QTPMarshallerInitToken=0;
static volatile int QTPMarshallerInitedToken=0;

static QTPMarshall QTPMarshaller={0};

static DWORD workerForQueuedThreadPool(LPVOID);

static SYSTEM_INFO QTP_SystemInfoGetter;

 

/*return index+1;*/
static int isAvailableQTP(void * tp){ 
 if(tp!=NULL){
  QTPMarshall * _p=&QTPMarshaller;
  int index=0;
  for(;index<_p->counter;)
   if(_p->QTPHandlers[index++]==tp)
    return index+1;  
 }
 return 0;
}

 

static void * allocQTPFromQTPMarshall(){ 
  QTPMarshall * _p=&QTPMarshaller;
  void *tp=NULL;
  EnterCriticalSection(&_p->mainLock);
  if(_p->counter<MAX_THREADPOOLS){
   tp=malloc(sizeof(QueuedThreadPool));
   _p->QTPHandlers[_p->counter++]=tp;
  }   
  LeaveCriticalSection(&_p->mainLock);  
  return tp;
}


static int deleteQTPFromQTPMarshall(void * tp){
 if(tp!=NULL){
  QTPMarshall * _p=&QTPMarshaller;
  int index=0;
  EnterCriticalSection(&_p->mainLock);
  index=isAvailableQTP(tp);
  if(index--){
   for(;index<_p->counter-1;index++)
   _p->QTPHandlers[index]=_p->QTPHandlers[index+1];
   _p->counter--;
   LeaveCriticalSection(&_p->mainLock);//attention: don't forget LeaveCriticalSection when return.
   return 1;
  }
  LeaveCriticalSection(&_p->mainLock);  
 }
 return 0;
}


static void freeQTPFromQTPMarshall(void * tp){
  deleteQTPFromQTPMarshall(tp);
  free(tp);
}

 

static void adjustQTPLimits(QueuedThreadPool *tp){
  /*adjust size and timeout.*/
  if(tp->maxPoolSize <= 0) {
   tp->maxPoolSize = MAX_THREADS;
        } else if (tp->maxPoolSize < MAX_THREADS_MIN) {           
         tp->maxPoolSize = MAX_THREADS_MIN;
        }
        if(tp->minPoolSize >  tp->maxPoolSize) {
         tp->minPoolSize =  tp->maxPoolSize;
        }
        if(tp->minPoolSize <= 0) {
            if(1 == tp->maxPoolSize) {
             tp->minPoolSize = 1;
            } else {
             tp->minPoolSize = tp->maxPoolSize/2;
            }
        }       
        if(tp->keepAliveTime<MIN_WORKER_WAIT_TIMEOUT)
         tp->keepAliveTime=MIN_WORKER_WAIT_TIMEOUT;
        if(tp->keepAliveTime>MAX_WORKER_WAIT_TIMEOUT)
         tp->keepAliveTime=MAX_WORKER_WAIT_TIMEOUT;       
}

 

typedef struct{
 QueuedThreadPool * tp;
 HANDLE th;
}QTPWORKER_PARAM;

 

static int addQTPWorker(QueuedThreadPool *tp){
 //don't check the tp's state.
 if(tp->currentPoolSize<tp->minPoolSize||
  tp->currentPoolSize<tp->maxPoolSize&&
  (tp->currentPoolSize==tp->currentBusyWorkers||tp->p_tasks->counter>=((int)(QTP_SystemInfoGetter.dwNumberOfProcessors)+1)*tp->currentPoolSize)){
  int _tid; 
  EnterCriticalSection(&tp->poolLock);
  if(tp->currentPoolSize<tp->maxPoolSize){
   QTPWORKER_PARAM * _param=malloc(sizeof(QTPWORKER_PARAM));
   if(_param){
    _param->th=CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)workerForQueuedThreadPool,_param,CREATE_SUSPENDED,&_tid);
    _param->tp=tp;
    ResumeThread(_param->th);
    tp->currentPoolSize++;
   }else{
    LeaveCriticalSection(&tp->poolLock);
    return 0;
   }    

  }  
  LeaveCriticalSection(&tp->poolLock);  
 }  
 return 1;
}

 

int initializeThreadPool(void **tpp,int minPoolSize,int maxPoolSize,long  keepAliveTime){ 
 //init QTPMarshall once.
 if(!InterlockedCompareExchange(&QTPMarshallerInitToken,1,0)){
  GetSystemInfo(&QTP_SystemInfoGetter);
  //QTPMarshaller.counter=0;
  InitializeCriticalSection(&QTPMarshaller.mainLock); 
  QTPMarshallerInitedToken=1;//don't Interlocked ?
 }

 if(tpp==NULL||!QTPMarshallerInitedToken||QTPMarshaller.counter>=MAX_THREADPOOLS)
  return 0;

 {//initialize QTP
  QueuedThreadPool *tp=allocQTPFromQTPMarshall();
  if(tp==NULL)
   return 0;

  tp->shouldTerminate=0;
  tp->minPoolSize=minPoolSize,tp->maxPoolSize=maxPoolSize,tp->keepAliveTime=keepAliveTime,tp->currentPoolSize=tp->currentBusyWorkers=0;
  adjustQTPLimits(tp); 
 
  {//init taskQueue
   QTPTaskNode * nullNode=malloc(sizeof(QTPTaskNode));
   QTPTaskQueue * queue=malloc(sizeof(QTPTaskQueue));  
   
   nullNode->task=NULL,nullNode->next=NULL;
   queue->counter=0,queue->head=queue->tail=nullNode;
   
   tp->p_tasks=queue;   
  }
 
  InitializeCriticalSection(&tp->putLock); 
  InitializeCriticalSection(&tp->takeLock);
  InitializeCriticalSection(&tp->poolLock);
  tp->takeEvent=CreateEvent(NULL,FALSE,FALSE,NULL);
  tp->terminateEvent=CreateEvent(NULL,TRUE,FALSE,NULL); 
      
  *tpp=tp;//last to do this.
  printf("initializeTP error:%d,\n",GetLastError());

 } 
 return 1;
}


static void insertQTPTaskNode(QTPTaskQueue * queue,QTPTaskNode * node){ 
 queue->tail->next=node,queue->tail->task=node->task,queue->tail=node,node->task=NULL;
}

 

static QTPTaskNode * extractQTPTaskNode(QTPTaskQueue * queue){
 QTPTaskNode * res=queue->head;  
 queue->head=res->next;
 res->next=NULL;
 return res;
}


int addTaskToThreadPool(void *tp,runnable_t toRun){
 if(isAvailableQTP(tp)){
  QueuedThreadPool * _tp=(QueuedThreadPool *)tp;  
  int oldCounter=0;  
  QTPTaskNode * task=NULL;
  
  if(_tp->shouldTerminate)
   return 0;
  //addQTPWorker
  addQTPWorker(tp);
  task=malloc(sizeof(QTPTaskNode));//note: free it when it end.
  if(task==NULL) //may be out of memory
   return 0;
  task->task=toRun,task->next=NULL;  
  //put task to taskQueue. must check the QTP's status !
  EnterCriticalSection(&_tp->putLock);
  if(!_tp->shouldTerminate){
   insertQTPTaskNode(_tp->p_tasks,task);
   oldCounter=InterlockedExchangeAdd(&_tp->p_tasks->counter,1);
   LeaveCriticalSection(&_tp->putLock);
  }else{
   LeaveCriticalSection(&_tp->putLock);
   free(task);
   return 0;
  }
  
  if(oldCounter<=0)
   SetEvent(_tp->takeEvent);   
  
  return 1;
 }else
  return 0; 
}


static void clearupQTPTaskQueue(QTPTaskQueue * queue){
 if(queue->counter<=0){
  if(queue->head!=queue->tail||queue->head->next!=NULL||queue->tail->next!=NULL)
  ;//log error:for test

  free(queue->tail);
  queue->head=queue->tail=NULL;
 
 }else{  
  //log error:for test
 }
}


/*it is asynchronous.*/
int shutdownThreadPool(void *tp){
 if(deleteQTPFromQTPMarshall(tp)){
  QueuedThreadPool * _tp=(QueuedThreadPool *)tp;
  
  EnterCriticalSection(&_tp->putLock/*just for prevent putting task .*/);
  _tp->shouldTerminate=1;
  LeaveCriticalSection(&_tp->putLock);

  //notify the last worker to clearup QTP.
  SetEvent(_tp->terminateEvent);
  return 1;
 }else
  return 0;
}

 

//Call it just once for per QTP.
static void clearupQueuedThreadPool(QueuedThreadPool * tp){
 if(tp->shouldTerminate){
 
  clearupQTPTaskQueue(tp->p_tasks);
  free(tp->p_tasks);
  
  DeleteCriticalSection(&tp->putLock);
  DeleteCriticalSection(&tp->takeLock);
  DeleteCriticalSection(&tp->poolLock);
  CloseHandle(tp->takeEvent);
  CloseHandle(tp->terminateEvent);

  freeQTPFromQTPMarshall(tp); 
 }
}

 

DWORD workerForQueuedThreadPool(LPVOID pparam){ 

 QTPWORKER_PARAM * param=(QTPWORKER_PARAM *)pparam;
 QueuedThreadPool * tp=param->tp; 

 QTPTaskNode * _task=NULL;
 HANDLE EVENTS[2];
 int curTaskNum=0,shouldTerminate=0,shouldClearupQTP=0;
 DWORD waitRtn; 
 if(!isAvailableQTP(tp))
  return 0;
 
 EVENTS[0]=tp->terminateEvent,EVENTS[1]=tp->takeEvent;//note the sequence.
 shouldTerminate=tp->shouldTerminate;
 
 while(!shouldTerminate){  
   
  if(GetLastError()!=0){
   printf("curThreadId:%d,err:%d\n",GetCurrentThreadId(),GetLastError());
   //shouldTerminate=1;
   //break;
   exit(0);//for test.
  }   
  
  while(1){  
   EnterCriticalSection(&tp->takeLock);
   if(tp->p_tasks->counter<=0)
   {
    LeaveCriticalSection(&tp->takeLock);    
    waitRtn=WaitForMultipleObjects(2,EVENTS,FALSE,tp->keepAliveTime);

    if(waitRtn==WAIT_OBJECT_0){     
     if(tp->p_tasks->counter>0)
      continue;
     else{
      shouldTerminate=1;
      break;
     }      
      
    }else if(waitRtn==WAIT_OBJECT_0+1){
      continue;
    }else if(waitRtn==WAIT_TIMEOUT){

     if(tp->currentBusyWorkers<=0&&tp->currentPoolSize>tp->minPoolSize){      
      EnterCriticalSection(&tp->poolLock);
      if(tp->currentPoolSize>tp->minPoolSize){
       shouldTerminate=2;
       /*Decrement currentPoolSize and shouldClearupQTP ? ,when timeout check.*/
       if(!--tp->currentPoolSize&&tp->shouldTerminate){
        shouldClearupQTP=1;
       }
       LeaveCriticalSection(&tp->poolLock);
       break;
      }else{
       LeaveCriticalSection(&tp->poolLock);
       continue;
      }      
     }else{
      continue;
     }
     
    }else{
     shouldTerminate=3;
     if(GetLastError()!=0){
      printf("curThreadId:%d,WaitForMultipleObjects_err:%d,waitRtn=%d\n",GetCurrentThreadId(),GetLastError(),waitRtn);
      exit(0);//for test.
     }
     break;
    }//end wait handle.       

   }else{
    //take task to _task
    _task=extractQTPTaskNode(tp->p_tasks); 
    curTaskNum=InterlockedExchangeAdd(&tp->p_tasks->counter,-1);
    LeaveCriticalSection(&tp->takeLock);
    
    if(curTaskNum>=2)
     SetEvent(tp->takeEvent);
    break;
   }
   
  }//end get _task loop;  
  
  //toRun the _task.
  if(_task!=NULL){
   InterlockedIncrement(&tp->currentBusyWorkers);
   addQTPWorker(tp);//adjust
   _task->task();
   free(_task);
   _task=NULL;
   InterlockedDecrement(&tp->currentBusyWorkers);
  }
 
 }//end thread main loop. 
  
 if(shouldTerminate!=2){
  EnterCriticalSection(&tp->poolLock);
  if(!--tp->currentPoolSize&&tp->shouldTerminate)
   shouldClearupQTP=1;
  LeaveCriticalSection(&tp->poolLock); 
 } 

 if(shouldClearupQTP)
  clearupQueuedThreadPool(tp); 

 CloseHandle(param->th);
 free(param);
 //CloseHandle(GetCurrentThread()); //<---it is last error:6
 return 1;
}

 

 

 

1
0
分享到:
评论

相关推荐

    C++11 线程池 ThreadPool

    C++11是C++语言的一个重要版本更新,它引入了大量的新特性,其中包括对多线程的支持。线程池(ThreadPool)是一种管理线程资源的有效方式,它在现代并发编程中扮演着至关重要的角色。线程池允许程序预先创建一组线程...

    threadpool.rar_C 智能指针_Threadpool C++_threadpool + timer_智能指针_线程池

    一个C++线程池库,包括智能指针、线程模拟的定时器

    threadpool_informationmcb_threadpool_c++threadpool_

    在标题"threadpool_informationmcb_threadpool_c++threadpool_"中,"informationmcb"可能代表信息管理和控制块,而"threadpool"显然是指线程池,"c++threadpool"则表明我们关注的是C++实现的线程池。 C++11引入了对...

    C++11 ThreadPool:一个易于使用的C ++ 11线程池。-开源

    一个易于使用的C ++ 11线程池。 使用ThreadPool类对自由函数进行排队,并使用std :: for_each()和std :: transform()的并行版本。 可配置为仅用于标头或与库一起使用。 有许多用法示例。

    ThreadPool C++实现

    在给定的"ThreadPool"文件中,可能包含了实现这样一个线程池的代码,包括上述各个组件的详细实现。通过对这些代码的分析和学习,可以深入了解线程池的工作原理,以及如何在C++中有效地利用多线程技术。

    Linux下面C++实现的threadpool代码

    在"Linux下面C++实现的threadpool代码"中,我们可以从以下几个方面来理解其核心知识点: 1. **线程池概念**:线程池是由多个预创建的线程组成的集合,这些线程等待执行由线程池管理者分配的任务。管理者将新任务放...

    C++ 实现线程池ThreadPool

    线程池(ThreadPool)是多线程编程中的一个重要概念,它是一种线程使用模式,用于高效地管理和调度线程资源。在C++中实现线程池可以帮助开发者优化并发执行的任务,减少线程创建和销毁的开销,提高系统效率。下面...

    threadpool 线程池 C语言版

    "threadpool 线程池 C语言版"是针对C语言实现的一种线程池模型,其核心在于任务队列和线程组的管理,以实现线程的复用和优化。 线程池的工作原理是预先创建一定数量的线程,这些线程在池中待命,等待执行任务。当有...

    线程池(C/C++版)

    在`threadpool_C++`和`threadpool_c`这两个文件中,可能包含了线程池的C++和C语言实现。C++版本可能利用了STL库提供的容器和算法,而C语言版本则可能更多地依赖于`pthread`库的原始API。这两种实现都可能涉及到线程...

    C++简单线程池例子

    在C++编程中,线程池是一种管理线程资源的有效方式,它可以帮助我们高效地调度和执行并发任务,避免频繁创建和销毁线程带来的开销。这个“C++简单线程池例子”提供了实现线程池的一个实例,通过分析`...

    C++ OOP ThreadPool-开源

    **C++ OOP ThreadPool 开源实现详解** C++中的线程池(ThreadPool)是一种高效的并发编程模式,它通过预先创建并维护一组可重用的工作线程来管理任务的执行,而不是为每个任务创建新的线程。这样的设计可以避免频繁...

    threadpool.h

    c++11封装的项目中使用的线程池,lambda作为线程函数,可以使用任意形式的线程回调函数。单例模式。欢迎下载,评论

    ThreadPool.zip

    《深入理解多线程编程:ThreadPool.zip实例解析》 在计算机科学中,多线程编程是一种重要的技术,它允许多个任务在同一时间执行,从而提高了程序的效率和响应性。在Java等编程语言中,ThreadPool是实现多线程处理的...

    c++实现的线程池源代码threadpool

    在标题提到的"C++实现的线程池源代码threadpool"中,我们可以看到三个压缩包文件:`boost_1_32_0.rar`、`ThreadPoolDemo.rar`、`thread_pool.rar`,它们可能包含了不同实现线程池的C++源代码。 首先,`boost_1_32_0...

    多线程精品资源--based on C++11 , a mini threadpool , accept varia.zip

    本资源包“多线程精品资源--based on C++11 , a mini threadpool , accept varia.zip”显然聚焦于C++11的多线程特性,并提供了一个小型的线程池实现。 首先,我们来深入理解C++11中的多线程。C++11标准库中的`...

    C++ 等待线程结束

    在C++编程中,线程同步是一个至关重要的概念,特别是在多线程编程中。当一个程序包含多个执行路径,即线程,有时我们需要确保某个线程执行完毕后再进行下一步操作,这就涉及到“等待线程结束”的功能。本篇文章将...

    基于C++使用 epoll + threadpool 实现的 webServer,支持GET、POST.zip

    基于C++使用 epoll + threadpool 实现的 webServer,支持GET、POST C++是一种广泛使用的编程语言,它是由Bjarne Stroustrup于1979年在新泽西州美利山贝尔实验室开始设计开发的。C++是C语言的扩展,旨在提供更...

    threadpool

    在给定的文件中,我们看到`threadpool.c`、`threadpool_test.c`、`threadpool.h`和`Makefile`,这表明是一个C语言实现的线程池项目。`threadpool.c`很可能包含了线程池的主体实现,包括线程池的初始化、任务的添加、...

    Threadpool:轻便,快速,适用于C ++ 20的线程池

    riften::Thiefpool 面向C ++ 20的超快速,轻量级,可窃取工作的线程池。 建立在无锁并发 。用法# include " riften/thiefpool.hpp "// Create thread pool with 4 worker threads.riften::Thiefpool pool ( 4 );// ...

    ThreadPool

    标题中的"ThreadPool"指的是线程池,这是一个编程概念,特别是在多线程编程中非常关键。线程池是一种线程使用模式,它维护着一个工作线程的集合,用于执行一系列的任务。通过线程池,可以有效地管理和控制并发执行的...

Global site tag (gtag.js) - Google Analytics