- 浏览: 63411 次
- 性别:
- 来自: 武汉
线程池:简单地说,线程池 就是预先创建好一批线程,方便、快速地处理收到的业务。比起传统的到来一个任务,即时创建一个线程来处理,节省了线程的创建和回收的开销,响应更快,效率更高。
在linux中,使用的是posix线程库,首先介绍几个常用的函数:
1 线程的创建和取消函数
pthread_create
创建线程
pthread_join
合并线程
pthread_cancel
取消线程
2 线程同步函数
pthread_mutex_lock
pthread_mutex_unlock
pthread_cond_signal
pthread_cond_wait
关于函数的详细说明,参考man手册
线程池的实现:
线程池的实现主要分为三部分,线程的创建、添加任务到线程池中、工作线程从任务队列中取出任务进行处理。
主要有两个类来实现,CTask,CThreadPool
/**
执行任务的类,设置任务数据并执行
**/
C代码
1. class CTask
2. {
3. protected:
4. string m_strTaskName; //任务的名称
5. void* m_ptrData; //要执行的任务的具体数据
6. public:
7. CTask(){}
8. CTask(string taskName)
9. {
10. this->m_strTaskName = taskName;
11. m_ptrData = NULL;
12. }
13. virtual int Run()= 0;
14. void SetData(void* data); //设置任务数据
15. };
class CTask
{
protected:
string m_strTaskName; //任务的名称
void* m_ptrData; //要执行的任务的具体数据
public:
CTask(){}
CTask(string taskName)
{
this->m_strTaskName = taskName;
m_ptrData = NULL;
}
virtual int Run()= 0;
void SetData(void* data); //设置任务数据
};
任务类是个虚类,所有的任务要从CTask类中继承 ,实现run接口,run接口中需要实现的就是具体解析任务的逻辑。m_ptrData是指向任务数据的指针,可以是简单数据类型,也可以是自定义的复杂数据类型。
线程池类
/**
线程池
**/
Java代码
1. class CThreadPool
2. {
3. private:
4. vector<CTask*> m_vecTaskList; //任务列表
5. int m_iThreadNum; //线程池中启动的线程数
6. static vector<pthread_t> m_vecIdleThread; //当前空闲的线程集合
7. static vector<pthread_t> m_vecBusyThread; //当前正在执行的线程集合
8. static pthread_mutex_t m_pthreadMutex; //线程同步锁
9. static pthread_cond_t m_pthreadCond; //线程同步的条件变量
10. protected:
11. static void* ThreadFunc(void * threadData); //新线程的线程函数
12. static int MoveToIdle(pthread_t tid); //线程执行结束后,把自己放入到空闲线程中
13. static int MoveToBusy(pthread_t tid); //移入到忙碌线程中去
14. int Create(); //创建所有的线程
15. public:
16. CThreadPool(int threadNum);
17. int AddTask(CTask *task); //把任务添加到线程池中
18. int StopAll();
19. };
class CThreadPool
{
private:
vector<CTask*> m_vecTaskList; //任务列表
int m_iThreadNum; //线程池中启动的线程数
static vector<pthread_t> m_vecIdleThread; //当前空闲的线程集合
static vector<pthread_t> m_vecBusyThread; //当前正在执行的线程集合
static pthread_mutex_t m_pthreadMutex; //线程同步锁
static pthread_cond_t m_pthreadCond; //线程同步的条件变量
protected:
static void* ThreadFunc(void * threadData); //新线程的线程函数
static int MoveToIdle(pthread_t tid); //线程执行结束后,把自己放入到空闲线程中
static int MoveToBusy(pthread_t tid); //移入到忙碌线程中去
int Create(); //创建所有的线程
public:
CThreadPool(int threadNum);
int AddTask(CTask *task); //把任务添加到线程池中
int StopAll();
};
当线程池对象创建后,启动一批线程,并把所有的线程放入空闲列表中,当有任务到达时,某一个线程取出任务并进行处理。
线程之间的同步用线程锁和条件变量。
这个类的对外接口有两个:
AddTask函数把任务添加到线程池的任务列表中,并通知线程进行处理。当任务到到时,把任务放入m_vecTaskList任务列表中,并用pthread_cond_signal唤醒一个线程进行处理。
StopAll函数停止所有的线程
Cpp代码
1. ************************************************
2.
3. 代码:
4.
5. ××××××××××××××××××××CThread.h
6.
7.
8.
9. #ifndef __CTHREAD
10. #define __CTHREAD
11. #include <vector>
12. #include <string>
13. #include <pthread.h>
14.
15. using namespace std;
16.
17. /**
18. 执行任务的类,设置任务数据并执行
19. **/
20. class CTask
21. {
22. protected:
23. string m_strTaskName; //任务的名称
24. void* m_ptrData; //要执行的任务的具体数据
25. public:
26. CTask(){}
27. CTask(string taskName)
28. {
29. this->m_strTaskName = taskName;
30. m_ptrData = NULL;
31. }
32. virtual int Run()= 0;
33. void SetData(void* data); //设置任务数据
34. };
35.
36. /**
37. 线程池
38. **/
39. class CThreadPool
40. {
41. private:
42. vector<CTask*> m_vecTaskList; //任务列表
43. int m_iThreadNum; //线程池中启动的线程数
44. static vector<pthread_t> m_vecIdleThread; //当前空闲的线程集合
45. static vector<pthread_t> m_vecBusyThread; //当前正在执行的线程集合
46. static pthread_mutex_t m_pthreadMutex; //线程同步锁
47. static pthread_cond_t m_pthreadCond; //线程同步的条件变量
48. protected:
49. static void* ThreadFunc(void * threadData); //新线程的线程函数
50. static int MoveToIdle(pthread_t tid); //线程执行结束后,把自己放入到空闲线程中
51. static int MoveToBusy(pthread_t tid); //移入到忙碌线程中去
52. int Create(); //创建所有的线程
53. public:
54. CThreadPool(int threadNum);
55. int AddTask(CTask *task); //把任务添加到线程池中
56. int StopAll();
57. };
58.
59. #endif
60.
61.
62.
63.
64.
65.
66.
67. 类的实现为:
68.
69. ××××××××××××××××××××CThread.cpp
70.
71.
72.
73. #include "CThread.h"
74. #include <string>
75. #include <iostream>
76.
77. using namespace std;
78.
79. void CTask::SetData(void * data)
80. {
81. m_ptrData = data;
82. }
83.
84. vector<pthread_t> CThreadPool::m_vecBusyThread;
85. vector<pthread_t> CThreadPool::m_vecIdleThread;
86. pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER;
87. pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;
88.
89. CThreadPool::CThreadPool(int threadNum)
90. {
91. this->m_iThreadNum = threadNum;
92. Create();
93. }
94. int CThreadPool::MoveToIdle(pthread_t tid)
95. {
96. vector<pthread_t>::iterator busyIter = m_vecBusyThread.begin();
97. while(busyIter != m_vecBusyThread.end())
98. {
99. if(tid == *busyIter)
100. {
101. break;
102. }
103. busyIter++;
104. }
105. m_vecBusyThread.erase(busyIter);
106. m_vecIdleThread.push_back(tid);
107. return 0;
108. }
109.
110. int CThreadPool::MoveToBusy(pthread_t tid)
111. {
112. vector<pthread_t>::iterator idleIter = m_vecIdleThread.begin();
113. while(idleIter != m_vecIdleThread.end())
114. {
115. if(tid == *idleIter)
116. {
117. break;
118. }
119. idleIter++;
120. }
121. m_vecIdleThread.erase(idleIter);
122. m_vecBusyThread.push_back(tid);
123. return 0;
124. }
125. void* CThreadPool::ThreadFunc(void * threadData)
126. {
127. pthread_t tid = pthread_self();
128. while(1)
129. {
130. pthread_mutex_lock(&m_pthreadMutex);
131. pthread_cond_wait(&m_pthreadCond,&m_pthreadMutex);
132. cout << "tid:" << tid << " run" << endl;
133. //get task
134. vector<CTask*>* taskList = (vector<CTask*>*)threadData;
135. vector<CTask*>::iterator iter = taskList->begin();
136. while(iter != taskList->end())
137. {
138.
139. MoveToBusy(tid);
140. break;
141. }
142. CTask* task = *iter;
143. taskList->erase(iter);
144. pthread_mutex_unlock(&m_pthreadMutex);
145. cout << "idel thread number:" << CThreadPool::m_vecIdleThread.size() << endl;
146. cout << "busy thread number:" << CThreadPool::m_vecBusyThread.size() << endl;
147. //cout << "task to be run:" << taskList->size() << endl;
148. task->Run();
149.
150. //cout << "CThread::thread work" << endl;
151. cout << "tid:" << tid << " idle" << endl;
152.
153. }
154. return (void*)0;
155. }
156.
157. int CThreadPool::AddTask(CTask *task)
158. {
159. this->m_vecTaskList.push_back(task);
160. pthread_cond_signal(&m_pthreadCond);
161. return 0;
162. }
163. int CThreadPool::Create()
164. {
165. for(int i = 0; i < m_iThreadNum;i++)
166. {
167. pthread_t tid = 0;
168. pthread_create(&tid,NULL,ThreadFunc,&m_vecTaskList);
169. m_vecIdleThread.push_back(tid);
170. }
171. return 0;
172. }
173.
174. int CThreadPool::StopAll()
175. {
176. vector<pthread_t>::iterator iter = m_vecIdleThread.begin();
177. while(iter != m_vecIdleThread.end())
178. {
179. pthread_cancel(*iter);
180. pthread_join(*iter,NULL);
181. iter++;
182. }
183.
184. iter = m_vecBusyThread.begin();
185. while(iter != m_vecBusyThread.end())
186. {
187. pthread_cancel(*iter);
188. pthread_join(*iter,NULL);
189. iter++;
190. }
191.
192. return 0;
193. }
194.
195. 简单示例:
196.
197. ××××××××test.cpp
198.
199. #include "CThread.h"
200. #include <iostream>
201.
202. using namespace std;
203.
204. class CWorkTask: public CTask
205. {
206. public:
207. CWorkTask()
208. {}
209. int Run()
210. {
211. cout << (char*)this->m_ptrData << endl;
212. sleep(10);
213. return 0;
214. }
215. };
216. int main()
217. {
218. CWorkTask taskObj;
219. char szTmp[] = "this is the first thread running,haha success";
220. taskObj.SetData((void*)szTmp);
221. CThreadPool threadPool(10);
222. for(int i = 0;i < 11;i++)
223. {
224. threadPool.AddTask(&taskObj);
225. }
226. while(1)
227. {
228. sleep(120);
229. }
230. return 0;
231. }
************************************************
代码:
××××××××××××××××××××CThread.h
#ifndef __CTHREAD
#define __CTHREAD
#include <vector>
#include <string>
#include <pthread.h>
using namespace std;
/**
执行任务的类,设置任务数据并执行
**/
class CTask
{
protected:
string m_strTaskName; //任务的名称
void* m_ptrData; //要执行的任务的具体数据
public:
CTask(){}
CTask(string taskName)
{
this->m_strTaskName = taskName;
m_ptrData = NULL;
}
virtual int Run()= 0;
void SetData(void* data); //设置任务数据
};
/**
线程池
**/
class CThreadPool
{
private:
vector<CTask*> m_vecTaskList; //任务列表
int m_iThreadNum; //线程池中启动的线程数
static vector<pthread_t> m_vecIdleThread; //当前空闲的线程集合
static vector<pthread_t> m_vecBusyThread; //当前正在执行的线程集合
static pthread_mutex_t m_pthreadMutex; //线程同步锁
static pthread_cond_t m_pthreadCond; //线程同步的条件变量
protected:
static void* ThreadFunc(void * threadData); //新线程的线程函数
static int MoveToIdle(pthread_t tid); //线程执行结束后,把自己放入到空闲线程中
static int MoveToBusy(pthread_t tid); //移入到忙碌线程中去
int Create(); //创建所有的线程
public:
CThreadPool(int threadNum);
int AddTask(CTask *task); //把任务添加到线程池中
int StopAll();
};
#endif
类的实现为:
××××××××××××××××××××CThread.cpp
#include "CThread.h"
#include <string>
#include <iostream>
using namespace std;
void CTask::SetData(void * data)
{
m_ptrData = data;
}
vector<pthread_t> CThreadPool::m_vecBusyThread;
vector<pthread_t> CThreadPool::m_vecIdleThread;
pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;
CThreadPool::CThreadPool(int threadNum)
{
this->m_iThreadNum = threadNum;
Create();
}
int CThreadPool::MoveToIdle(pthread_t tid)
{
vector<pthread_t>::iterator busyIter = m_vecBusyThread.begin();
while(busyIter != m_vecBusyThread.end())
{
if(tid == *busyIter)
{
break;
}
busyIter++;
}
m_vecBusyThread.erase(busyIter);
m_vecIdleThread.push_back(tid);
return 0;
}
int CThreadPool::MoveToBusy(pthread_t tid)
{
vector<pthread_t>::iterator idleIter = m_vecIdleThread.begin();
while(idleIter != m_vecIdleThread.end())
{
if(tid == *idleIter)
{
break;
}
idleIter++;
}
m_vecIdleThread.erase(idleIter);
m_vecBusyThread.push_back(tid);
return 0;
}
void* CThreadPool::ThreadFunc(void * threadData)
{
pthread_t tid = pthread_self();
while(1)
{
pthread_mutex_lock(&m_pthreadMutex);
pthread_cond_wait(&m_pthreadCond,&m_pthreadMutex);
cout << "tid:" << tid << " run" << endl;
//get task
vector<CTask*>* taskList = (vector<CTask*>*)threadData;
vector<CTask*>::iterator iter = taskList->begin();
while(iter != taskList->end())
{
MoveToBusy(tid);
break;
}
CTask* task = *iter;
taskList->erase(iter);
pthread_mutex_unlock(&m_pthreadMutex);
cout << "idel thread number:" << CThreadPool::m_vecIdleThread.size() << endl;
cout << "busy thread number:" << CThreadPool::m_vecBusyThread.size() << endl;
//cout << "task to be run:" << taskList->size() << endl;
task->Run();
//cout << "CThread::thread work" << endl;
cout << "tid:" << tid << " idle" << endl;
}
return (void*)0;
}
int CThreadPool::AddTask(CTask *task)
{
this->m_vecTaskList.push_back(task);
pthread_cond_signal(&m_pthreadCond);
return 0;
}
int CThreadPool::Create()
{
for(int i = 0; i < m_iThreadNum;i++)
{
pthread_t tid = 0;
pthread_create(&tid,NULL,ThreadFunc,&m_vecTaskList);
m_vecIdleThread.push_back(tid);
}
return 0;
}
int CThreadPool::StopAll()
{
vector<pthread_t>::iterator iter = m_vecIdleThread.begin();
while(iter != m_vecIdleThread.end())
{
pthread_cancel(*iter);
pthread_join(*iter,NULL);
iter++;
}
iter = m_vecBusyThread.begin();
while(iter != m_vecBusyThread.end())
{
pthread_cancel(*iter);
pthread_join(*iter,NULL);
iter++;
}
return 0;
}
简单示例:
××××××××test.cpp
#include "CThread.h"
#include <iostream>
using namespace std;
class CWorkTask: public CTask
{
public:
CWorkTask()
{}
int Run()
{
cout << (char*)this->m_ptrData << endl;
sleep(10);
return 0;
}
};
int main()
{
CWorkTask taskObj;
char szTmp[] = "this is the first thread running,haha success";
taskObj.SetData((void*)szTmp);
CThreadPool threadPool(10);
for(int i = 0;i < 11;i++)
{
threadPool.AddTask(&taskObj);
}
while(1)
{
sleep(120);
}
return 0;
}
发表评论
-
C++内存管理
2011-09-19 10:45 8141 内存管理 伟大的Bill G ... -
调试程序
2011-09-14 16:21 741******************************* ... -
有用的网址
2011-08-04 12:44 632http://searchdns.netcraft.com/ -
vim高级应用
2011-08-04 10:29 1011命令模式: “s”为表 ... -
c/c++ 调试
2011-06-08 15:31 843readelf -s a.out 可以查看符号表,就能找到对应 ... -
网站列表
2011-02-25 09:48 699http://www.rosoo.net/ NO ... -
高性能服务器注意事项
2011-01-18 13:09 843对于这里所说的服务器,更精确的定义应该是每秒处理大量离散消息或 ... -
在 Linux 平台中调试 C/C++ 内存泄漏方法
2010-10-20 15:56 923由于 C 和 C++ 程序中完全由程序员自主申请和释放内存 ... -
用socket编写C/S结构程序的流程图
2010-10-08 17:07 24681.面向连接的套接字的系统调用时序图 无连接协议的套接字调用 ... -
将文件读入内存
2010-09-14 17:35 1114将文本文件读入内存。需要用到fseek、fread和ftell ... -
STL中的容器的遍历的使用方法
2010-08-12 17:39 2090STL中的容器按存储方式分为两类,一类是按以数组形式存储的容器 ... -
50 c/c++ 源码网站
2010-08-03 15:47 7961、http://snippets.dzone.com/tag ... -
值得注意的函数
2010-08-03 13:48 560函数 严重性 解决 ... -
调用系统命令
2010-08-02 12:52 650int get_system_info(char* cmdst ... -
结构体对齐
2010-07-30 13:59 13281,比如: struct{ short a1; sh ... -
LINUX c++线程池框架
2010-07-15 11:25 1422本文给出了一个通用的线程池框架,该框架将与线程执行相关的任务进 ... -
文件字符串处理
2010-06-30 09:12 707#include <iostream> #incl ... -
不能用类成员函数作为线程函数
2010-06-29 17:12 870不能用类成员函数作为线程函数 -
c++ 字符串替换
2010-06-24 10:43 997#include <string> #incl ...
相关推荐
简单的说,如果一个应用需要频繁的创建和销毁线程,而任务执行的时间又非常短,这样线程创建和销毁的带来的开销就不容忽视,这时也是线程池该出场的机会了。如果线程创建和销毁时间相比任务执行时间可以忽略不计,则...
本实例将深入探讨如何在Linux下实现一个简单的线程池,并介绍相关的关键知识点。 1. **线程与线程池的概念** - **线程**:是操作系统分配CPU时间片的基本单位,是程序执行的流,一个进程中可以包含多个线程,它们...
以下是一个简单的线程池设计步骤: 1. **初始化线程池**:创建一定数量的工作线程,并初始化任务队列。 2. **创建任务**:定义任务结构体,包含任务的函数指针和参数。 3. **提交任务**:将任务添加到任务队列,...
### Linux线程池C源码解析 #### 一、概览 本文将深入解析一个Linux下的线程池实现,该实现使用C语言编写,并遵循GNU通用公共许可证版本2(或之后版本)。线程池是一种软件设计模式,它可以提高程序执行效率,通过...
在Linux下用C写的一个简易线程池。系统是RedHat 9,gcc版本"gcc version 4.1.2 20071124 (Red Hat 4.1.2-42)"。文件夹里的源码是按工程组织好的,在文件夹下的test目录下面有一个小的测试程序和Makefile,编译后即可...
当有新的任务提交时,线程池会选择一个空闲的线程来执行任务,而不是每次都需要创建新的线程,这样可以避免频繁的线程创建和销毁带来的开销。 `twork_work.cpp`和`twork_work.h`文件可能定义了一个工作单元(Work ...
- 主进程维护一个线程池,预先创建一定数量的线程。 - 当有新的任务到来时,不再创建新的线程,而是将任务放入队列中,由线程池中的空闲线程取出任务进行处理。 - 线程在完成任务后返回线程池等待新的任务,而...
Linux下的线程池是多线程编程中的一个重要概念,它是一种高效的线程管理机制,能够有效地管理和调度系统资源,提高系统的并发性能。本项目是基于C++语言在Linux环境下实现的一个完全抽象的线程池,旨在提供一种灵活...
本教程将深入探讨如何在Linux环境下,使用C++实现一个简单的线程池。 首先,线程池的基本结构通常包括以下几个部分: 1. **线程池容器**:用于存储工作线程的集合,通常使用STL中的`std::vector`来实现。这个容器...
linux pthreadpool实现和线程池的用处 简单易懂 互斥和信号量使用
本文将深入探讨如何在Linux环境下使用C++实现一个线程池,以及它的工作原理和重要性。 首先,线程池是由多个工作线程组成的集合,这些线程预先启动并在需要时执行任务。这样可以避免频繁地创建和销毁线程,减少系统...
在`thread_pool`文件中,可能包含了示例代码,这些代码展示了如何在Linux环境下实现一个简单的线程池。通过阅读和理解这个实例,你可以了解如何创建线程池、添加任务以及管理线程的生命周期。 总之,Linux线程池是...
以下是一个简单的C语言实现线程池的例子: ```c // main.c #include #include #include "thread_pool.h" // 任务处理函数 void *task_test(void *arg) { printf("working on task %d\n", (int)arg); sleep(1); ...
下面展示了一个简单的通用线程池的实现例子: 1. **任务节点定义** - `taskNode` 结构体用于封装待处理的任务数据,如网络连接的句柄 `sockfd`。 2. **任务处理函数** - `printsd` 函数是一个示例性的任务处理...
Linux线程池是一种高效利用系统资源的编程模型,它通过预先创建一组线程,然后将任务分配给这些线程来执行,而不是每次需要时都创建新的线程。这种设计模式可以减少线程创建和销毁的开销,提高系统的响应速度,并...
本资源提供的线程池实现是一个简单而实用的例子,特别适合对C++多线程编程感兴趣或需要在Linux环境中应用线程池技术的开发者。 首先,C++11标准引入了线程库 `<thread>`,使得在C++中进行多线程编程变得更为便捷。...
7. **代码实现**:`ThreadPool`很可能是一个包含线程池管理逻辑的类,它可能包含如初始化线程池、添加任务、启动和停止线程池等功能。类的内部可能维护一个任务队列和线程集合,以及相关的同步原语(如互斥锁、条件...
本文将深入探讨如何在Linux环境下使用Epoll和线程池技术构建一个简单的Web服务器。这个Web服务器的实现是基于Linux内核提供的异步I/O模型,Epoll,以及线程池策略,以优化系统资源的使用和提高并发性能。 首先,让...
线程池的基本思想是预先创建一定数量的线程并将它们置于空闲状态,当有任务到来时,从线程池中选取一个空闲线程执行该任务。任务执行完毕后,线程返回线程池继续等待下一个任务,而不是立即销毁。这种方式可以显著...
本文档介绍了一个简单的Linux C语言实现的线程池示例,该实现主要包括三个部分:`tpool.h`、`tpool.c`以及一个用于测试或实际应用的`.c`文件。 #### tpool.h —— 线程池头文件 `tpool.h`定义了线程池所需的结构体...