先搞个基于windows的线程池设计
基本设计思路:
执行线程
while(true)
{
if(queue.hasJob()){//queue是一个同步队列,因此如果队列空,就持续等待
fetch job;
//选择合适线程---------------à这一步,在windows下可以直接使用QueueUserWorkItem(XP以后),//这里是线程池的关键
Thead.dojob();
}
}
- QueueUserWorkItem:msdn上叙述如下:
This topic describes the original thread pool API. The thread pool API introduced in Windows Vista is simpler, more reliable, has better performance, and provides more flexibility for developers. For information on the current thread pool API, see Thread Pools.
QueueUserWorkItem属于原始的线程池API,vista之后,有更加简单的,可靠的,更高性能,且对开发者提供易于扩展的线程池。
The thread pool is created the first time you call QueueUserWorkItem or BindIoCompletionCallback, or when a timer-queue timer or registered wait operation queues a callback function. By default, the number of threads that can be created in the thread pool is about 500. Each thread uses the default stack size and runs at the default priority.
这里说明,线程池是在第一次QueueUserWorkItem或者BindIoCompletionCallback或者timer-queue timer or registered wait operation调度一个回调函数创建的。缺省情况下,线程池大小为500,每个线程池缺省栈大小和优先级。
-
实现一个等待同步队列
上面的queue是一个同步队列,采用循环队列实现
//具体某个作业 struct CJob{ INT64 id; void* m_data; PTHREAD_PROC m_pFunc; public: CJob() { } CJob(PTHREAD_PROC pFunc,void *data){ this->m_pFunc =pFunc; this->m_data = data; } virtual void run(){ m_pFunc(m_data); } }; //基于临界区的读写锁,vista后,msdn推荐用SRWLock class RWLock{ CRITICAL_SECTION m_mutex; public: RWLock() { InitializeCriticalSectionAndSpinCount(&m_mutex, 1000);//自旋锁临界区,多核 } ~RWLock() { DeleteCriticalSection(&m_mutex); } BOOL tryAndLock(int count=0){//尝试多少次获得锁 int i=0; BOOL bRet = FALSE; do{ bRet = TryEnterCriticalSection(&m_mutex); if(bRet == TRUE) return TRUE; ::Sleep(0); i++; }while(i<count); return FALSE; } void tryAndRelease(){ LeaveCriticalSection(&m_mutex); } void lock(){ ::EnterCriticalSection(&m_mutex); } void unLock(){ ::LeaveCriticalSection(&m_mutex); } }; //封装的信号量 class Semaphore{ HANDLE m_sem; public: explicit Semaphore(int initCount,int maxCount){ m_sem = ::CreateSemaphore(NULL,initCount,maxCount,NULL); } ~Semaphore(){ ::CloseHandle(m_sem); } void V(){ ReleaseSemaphore(m_sem,1,NULL); } void P() { DWORD dwRet = ::WaitForSingleObject(m_sem,INFINITE); switch(dwRet) { case WAIT_ABANDONED: break; case WAIT_OBJECT_0 : break; case WAIT_TIMEOUT: break; case WAIT_FAILED : break; } } }; //存储请求需要的同步队列,现有的都是阻塞模型,尚未添加非阻塞模型 class CBlockQueue{ Semaphore emptySemaphore; Semaphore fullSemaphore; RWLock lock; volatile int front; volatile int rear; CJob** jobArr; int capacity ; public: CBlockQueue(int maxSize=1023):front(0),rear(0),emptySemaphore(maxSize,maxSize),fullSemaphore(0,maxSize),capacity(maxSize+1) { if(maxSize<=0){ // throw "cannot be negative."; } jobArr = new CJob*[maxSize+1]; memset(jobArr,0,sizeof(CJob*)*(1+maxSize)); } ~CBlockQueue(){ delete[]jobArr; } //如果添加失败,将会阻塞,如果实现不好阻塞的可能是主线程 void Put(CJob *pJob) { if(pJob != NULL){ emptySemaphore.P(); lock.lock(); bool bRet = this->enQueue(pJob); lock.unLock(); fullSemaphore.V(); } } //获取任务并从队列头部移走;失败(队列空)会阻塞直到有对象可获得;返回对象 CJob* Take() { fullSemaphore.P(); CJob *pJob = DeQueue(); emptySemaphore.V(); assert(pJob!=NULL); return pJob; } //不移走队列;不阻塞 CJob* peek() { BOOL bRet = lock.tryAndLock(); if(bRet) { int tail = (rear-1+capacity)%capacity; lock.tryAndRelease(); return jobArr[tail]; } return NULL; } private: bool enQueue(CJob *pJob){ //if(isFull()) // return false; jobArr[rear]=pJob; rear = (rear+1)%capacity; return true; } CJob * DeQueue(){ //if(isEmpty())return NULL; CJob* pJob=jobArr[front]; front = (front+1)%capacity; return pJob; } bool isFull(){ return ((front+1+capacity-rear)%capacity==0); } bool isEmpty(){ return front == rear; } };
----------------------------------------------------------------
-
//简单封装的线程池调用,主要为Linux下提供接口
class CThreadsPool { UINT64 m_lKeepAliveTime;//计时函数,单位ms,表示如果请求等待时间超出这个时间,那么移除这个请求,并标记请求失败 int m_nCurThreadNum; int m_nMinThreadNum; int m_nMaxThreadNum; bool m_bAutoBalance; //priorityqueue pq;最小队列记录总的空闲线程 CBlockQueue *m_pBlockingQueue; public: CThreadsPool(CBlockQueue *pBlockingQueue,int minThreadNum=128,int maxThreadNum=1024,bool autoBalance=false,UINT64 keepAliveTime=0) { m_nCurThreadNum= minThreadNum; m_nMinThreadNum = minThreadNum; m_bAutoBalance = autoBalance; m_lKeepAliveTime = keepAliveTime; m_pBlockingQueue = pBlockingQueue; } void InitPool(){ //create threads ,and suspend to wait job. } ~CThreadsPool(void) { } unsigned int DoJob() { //select an available thread //(1)if failed,and current thread size is less than max size,then create a bulk of threads, otherwise ,failed with OVERSIZE //(2)success,go on if(m_pBlockingQueue!= NULL){ do{ CJob* pJob = m_pBlockingQueue->Take(); if(pJob == NULL) { //error } else { //select a thread; BOOL bRet = QueueUserWorkItem((LPTHREAD_START_ROUTINE)(pJob->m_pFunc),pJob->m_data,WT_EXECUTELONGFUNCTION); if(bRet ==FALSE){ DWORD dwError = ::GetLastError(); } else { //这就执行完了? } } }while(TRUE); } return 0; } };
---------------------------基本上就可以调用了,继续测试下看看有木有bug :
CBlockQueue queue; DWORD producer_work(void *pParam) { printf("producer id = %d\n",*(int*)pParam); return 0L; } DWORD Producer(void *pParam) { static int id =1; for(int i=0;i<102343;i++){ Sleep(0);//Sleep是为了更好的看到效果 CJob *pJob = new CJob; pJob->id = id++; pJob->m_pFunc = producer_work; pJob->m_data = &pJob->id; printf("put job id = %d\n",pJob->id); queue.Put(pJob); } return 0L; } DWORD Consumer(void *pParam) { CThreadsPool pool(&queue); pool.InitPool(); pool.DoJob(); return 0L; } int main(int argc,char*argv[]) { DWORD threadID; HANDLE hThread1 = CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)Producer,NULL,0,&threadID); DWORD threadID2; HANDLE hThread2 = CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)Consumer,NULL,0,&threadID2); ::WaitForSingleObject(hThread2,INFINITE); }
后台输出结果:
“ThreadPool.exe”: 已加载“C:\Windows\winsxs\x86_microsoft.vc90.debugcrt_1fc8b3b9a1e18e3b_9.0.30729.1_none_bb1f6aa1308c35eb\msvcr90d.dll”,已加载符号。
线程 'Win32 线程' (0x16bc) 已退出,返回值为 0 (0x0)。
线程 'Win32 线程' (0x9c0) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x14e4) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x1520) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0xa10) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x1680) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x17bc) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x3bc) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x11a4) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x1770) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x1588) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x3a0) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x12e4) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x1498) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0xcd0) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x1314) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x310) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x1634) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0xc04) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x1164) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x908) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x34c) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x175c) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x13bc) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x258) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x1514) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x680) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x4a8) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x9b4) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0xc2c) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x2ec) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0xd40) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x1198) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x1774) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x12f0) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x10f8) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x65c) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x1768) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x152c) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x784) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x16dc) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0xaa8) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x12d0) 已退出,返回值为 -1073741510 (0xc000013a)。
程序“[1848] ThreadPool.exe: 本机”已退出,返回值为 -1073741510 (0xc000013a)。
可见线程确实已经启动了。
输出结果:
上面结果,可以看出,put和take过程确实是异步多线程处理的。
上面实例是生产者——消费者模型
一个生产者,多个消费者,多个消费者通过线程池控制,具体个数依赖于QueueUserWorkItem;线程池有一个单独的线程控制。
因此实际上一共有 :
主线程+生产者线程+控制线程+N个消费者线程。
实际上也可以改成 多生产者——多消费者模型,这类模型通常适合于web服务器处理。
我们在实际过程中,采用的是如下模型:
多生产者(web请求)→单消费者(同时又是单生产者,web请求队列排队,创建合适线程个数)→多消费者(多线程获取web请求队列,处理该请求)。
还不了解web服务器采用的是那种模型。
相关推荐
默认情况下,线程池会自动调整线程数量以适应负载。 3. **System.Threading.ThreadPool类**:C#的标准库提供了`ThreadPool`类,它提供了一种方便的方式来使用线程池。通过`ThreadPool.QueueUserWorkItem`方法可以将...
这个最新的版本“apache-tomcat-8.5.59-windows-x64.zip”是专门为Windows 64位操作系统设计的。让我们深入探讨一下这个版本包含的知识点。 首先,Apache Tomcat 8.5.x系列是Tomcat服务器的一个稳定版本,它在功能...
这个最新的版本“apache-tomcat-8.5.70-windows-x64.zip”是专门为Windows操作系统设计的64位版本。在本文中,我们将深入探讨Apache Tomcat 8.5.70在Windows环境下的安装、配置、管理和优化。 首先,安装过程通常...
标题中的"apache-tomcat-8.5.56-windows-x64.zip"表明这是Apache Tomcat的第8.5.56版本,专为Windows 64位操作系统设计的安装包。 Tomcat 8.5.x系列是基于Java SE 8的,因此在安装和配置前,你需要确保你的系统已经...
"apache-tomcat-8.5.99-windows-x64.zip"表明这是专为Windows 64位操作系统设计的。在64位系统上运行64位版本的Tomcat可以充分利用系统的内存资源,对于处理大型应用或高并发场景更为有利。 **4. 安装与配置** 解压...
`apache-tomcat-9.0.68-windows-x64.zip`是Apache Tomcat 9.0.68的最新版本,专为64位Windows系统设计。这个压缩包包含了运行Tomcat所需的所有文件,包括可执行文件、配置文件、库文件以及相关的文档。 **Tomcat的...
这个版本是专门为Windows 64位操作系统设计的,提供了在该平台上运行Java Web应用的能力。在本文中,我们将深入探讨Apache Tomcat 8.5.43及其与Windows、Apache和Java的关系,以及如何在Windows x64系统上安装和配置...
默认情况下,它会监听80和1935端口,可以通过配置文件修改这些设置。 2. 推流与拉流:ZLMediaKit支持推流和拉流操作。推流者可以将音视频数据发送到ZLMediaKit,拉流者则通过URL从服务器获取流数据。 3. RESTful ...
【Apache Tomcat 6.0.32 for Windows x86】是一款专为Windows操作系统设计的轻量级Java应用服务器,由Apache软件基金会开发并维护。这个版本是针对32位Windows系统的,它主要用来部署和运行Java Servlets和Java...
在Windows 64位环境下运行,它可以充分利用64位操作系统的优势,提供更高的内存管理和更大的数据处理能力。 标题中的"Windows64位 MySql 5.6.19"指的是适用于64位Windows操作系统的MySQL 5.6.19版本。这个版本专为...
"apache-tomcat-9.0.2-windows-x64"指的是Tomcat的第9.0.2版本,针对64位Windows操作系统设计的。这个版本是一个测试版,意味着它可能包含了最新的功能改进和修复,但尚未经过全面的生产环境验证。 1. **Tomcat 9.0...
官方原版的`apache-tomcat-8.5.51-windows-x86.zip`是专门为32位Windows系统设计的版本,它提供了在Windows环境下运行Java Web应用的基础架构。 1. **Tomcat的简介**:Tomcat是由Apache Software Foundation维护的...
本安装包是专为Windows操作系统设计的,支持32位(x86)和64位(x64)架构。MySQL 5.5在Windows平台上的安装过程相对简单,特别是对于初学者和程序员来说,能够快速搭建数据库环境,进行数据存储和管理。 安装包中...
标题中的“iocp_api.dll”指的是一个动态链接库(Dynamic Link Library)文件,...对于学习和理解如何在Windows环境下利用IOCP提升程序性能,以及如何有效管理和使用线程池的开发者来说,这些内容具有很高的参考价值。
Redis-x64-3.2.100是Redis数据库的一个特定版本,专为64位Windows操作系统设计。Redis是一个开源、高性能的键值存储系统,常被用作数据库、缓存和消息代理。它的特点是数据结构丰富,支持字符串、哈希、列表、集合和...
这个"官方原版apache-tomcat-9.0.33-windows-x64.zip"是Apache Tomcat的第9.0.33版本,专为64位Windows操作系统设计。以下是关于这个版本的Apache Tomcat的详细知识: 1. **版本号**:9.0.33代表了Tomcat的版本。每...
Tomcat的日志信息默认存储在`logs`目录下,这对于调试和监控系统行为非常有用。用户可以自定义日志配置,如使用不同的日志框架(例如Log4j)或改变日志级别。 9. **应用部署** 应用程序通常以`.war`文件形式部署...
安装Tomcat 9.0.0.M10在Windows上通常是解压到指定目录,然后通过bin目录下的批处理文件启动和关闭服务。配置包括修改`server.xml`来定制端口、连接器和其他服务器设置,以及设置环境变量指向Tomcat的主目录。 **6....
这款64位版本是专为Windows操作系统设计的,可以充分利用64位系统的优势,如处理大量内存和管理大量并发连接。 Tomcat的7.0版本引入了许多增强功能和性能优化,旨在提高其稳定性和安全性。例如,它支持Servlet 3.0...