`
unsoundboy
  • 浏览: 63411 次
  • 性别: Icon_minigender_1
  • 来自: 武汉
社区版块
存档分类
最新评论

一个简单的linux线程池

    博客分类:
  • c++
阅读更多


线程池:简单地说,线程池 就是预先创建好一批线程,方便、快速地处理收到的业务。比起传统的到来一个任务,即时创建一个线程来处理,节省了线程的创建和回收的开销,响应更快,效率更高。



在linux中,使用的是posix线程库,首先介绍几个常用的函数:

1 线程的创建和取消函数

pthread_create

创建线程

pthread_join

合并线程

pthread_cancel

取消线程

2 线程同步函数

pthread_mutex_lock

pthread_mutex_unlock

pthread_cond_signal

pthread_cond_wait



关于函数的详细说明,参考man手册



线程池的实现:

线程池的实现主要分为三部分,线程的创建、添加任务到线程池中、工作线程从任务队列中取出任务进行处理。

主要有两个类来实现,CTask,CThreadPool

/**
执行任务的类,设置任务数据并执行
**/
C代码

   1. class CTask 
   2. { 
   3. protected: 
   4.  string m_strTaskName;  //任务的名称 
   5.  void* m_ptrData;       //要执行的任务的具体数据 
   6. public: 
   7.  CTask(){} 
   8.  CTask(string taskName) 
   9.  { 
  10.   this->m_strTaskName = taskName; 
  11.   m_ptrData = NULL; 
  12.  } 
  13.  virtual int Run()= 0; 
  14.  void SetData(void* data);    //设置任务数据 
  15. }; 

class CTask
{
protected:
string m_strTaskName;  //任务的名称
void* m_ptrData;       //要执行的任务的具体数据
public:
CTask(){}
CTask(string taskName)
{
  this->m_strTaskName = taskName;
  m_ptrData = NULL;
}
virtual int Run()= 0;
void SetData(void* data);    //设置任务数据
};



任务类是个虚类,所有的任务要从CTask类中继承 ,实现run接口,run接口中需要实现的就是具体解析任务的逻辑。m_ptrData是指向任务数据的指针,可以是简单数据类型,也可以是自定义的复杂数据类型。



线程池类

/**
线程池
**/
Java代码

   1. class CThreadPool 
   2. { 
   3. private: 
   4.  vector<CTask*> m_vecTaskList;         //任务列表 
   5.  int m_iThreadNum;                            //线程池中启动的线程数            
   6.  static vector<pthread_t> m_vecIdleThread;   //当前空闲的线程集合 
   7.  static vector<pthread_t> m_vecBusyThread;   //当前正在执行的线程集合 
   8.  static pthread_mutex_t m_pthreadMutex;    //线程同步锁 
   9.  static pthread_cond_t m_pthreadCond;    //线程同步的条件变量 
  10. protected: 
  11.  static void* ThreadFunc(void * threadData); //新线程的线程函数 
  12.  static int MoveToIdle(pthread_t tid);   //线程执行结束后,把自己放入到空闲线程中 
  13.  static int MoveToBusy(pthread_t tid);   //移入到忙碌线程中去 
  14.  int Create();          //创建所有的线程 
  15. public: 
  16.  CThreadPool(int threadNum); 
  17.  int AddTask(CTask *task);      //把任务添加到线程池中 
  18.  int StopAll(); 
  19. }; 

class CThreadPool
{
private:
vector<CTask*> m_vecTaskList;         //任务列表
int m_iThreadNum;                            //线程池中启动的线程数          
static vector<pthread_t> m_vecIdleThread;   //当前空闲的线程集合
static vector<pthread_t> m_vecBusyThread;   //当前正在执行的线程集合
static pthread_mutex_t m_pthreadMutex;    //线程同步锁
static pthread_cond_t m_pthreadCond;    //线程同步的条件变量
protected:
static void* ThreadFunc(void * threadData); //新线程的线程函数
static int MoveToIdle(pthread_t tid);   //线程执行结束后,把自己放入到空闲线程中
static int MoveToBusy(pthread_t tid);   //移入到忙碌线程中去
int Create();          //创建所有的线程
public:
CThreadPool(int threadNum);
int AddTask(CTask *task);      //把任务添加到线程池中
int StopAll();
};



当线程池对象创建后,启动一批线程,并把所有的线程放入空闲列表中,当有任务到达时,某一个线程取出任务并进行处理。

线程之间的同步用线程锁和条件变量。

这个类的对外接口有两个:

AddTask函数把任务添加到线程池的任务列表中,并通知线程进行处理。当任务到到时,把任务放入m_vecTaskList任务列表中,并用pthread_cond_signal唤醒一个线程进行处理。

StopAll函数停止所有的线程


Cpp代码

   1. ************************************************ 
   2.  
   3. 代码: 
   4.  
   5. ××××××××××××××××××××CThread.h 
   6.  
   7.   
   8.  
   9. #ifndef __CTHREAD 
  10. #define __CTHREAD 
  11. #include <vector> 
  12. #include <string> 
  13. #include <pthread.h> 
  14.  
  15. using namespace std; 
  16.  
  17. /**
  18. 执行任务的类,设置任务数据并执行
  19. **/ 
  20. class CTask 
  21. { 
  22. protected: 
  23.  string m_strTaskName;  //任务的名称 
  24.  void* m_ptrData;       //要执行的任务的具体数据 
  25. public: 
  26.  CTask(){} 
  27.  CTask(string taskName) 
  28.  { 
  29.   this->m_strTaskName = taskName; 
  30.   m_ptrData = NULL; 
  31.  } 
  32.  virtual int Run()= 0; 
  33.  void SetData(void* data);    //设置任务数据 
  34. }; 
  35.  
  36. /**
  37. 线程池
  38. **/ 
  39. class CThreadPool 
  40. { 
  41. private: 
  42.  vector<CTask*> m_vecTaskList;         //任务列表 
  43.  int m_iThreadNum;                            //线程池中启动的线程数            
  44.  static vector<pthread_t> m_vecIdleThread;   //当前空闲的线程集合 
  45.  static vector<pthread_t> m_vecBusyThread;   //当前正在执行的线程集合 
  46.  static pthread_mutex_t m_pthreadMutex;    //线程同步锁 
  47.  static pthread_cond_t m_pthreadCond;    //线程同步的条件变量 
  48. protected: 
  49.  static void* ThreadFunc(void * threadData); //新线程的线程函数 
  50.  static int MoveToIdle(pthread_t tid);   //线程执行结束后,把自己放入到空闲线程中 
  51.  static int MoveToBusy(pthread_t tid);   //移入到忙碌线程中去 
  52.  int Create();          //创建所有的线程 
  53. public: 
  54.  CThreadPool(int threadNum); 
  55.  int AddTask(CTask *task);      //把任务添加到线程池中 
  56.  int StopAll(); 
  57. }; 
  58.  
  59. #endif 
  60.  
  61.   
  62.  
  63.   
  64.  
  65.   
  66.  
  67. 类的实现为: 
  68.  
  69. ××××××××××××××××××××CThread.cpp 
  70.  
  71.   
  72.  
  73. #include "CThread.h" 
  74. #include <string> 
  75. #include <iostream> 
  76.  
  77. using namespace std; 
  78.  
  79. void CTask::SetData(void * data) 
  80. { 
  81.  m_ptrData = data; 
  82. } 
  83.  
  84. vector<pthread_t> CThreadPool::m_vecBusyThread; 
  85. vector<pthread_t> CThreadPool::m_vecIdleThread; 
  86. pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER; 
  87. pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER; 
  88.  
  89. CThreadPool::CThreadPool(int threadNum) 
  90. { 
  91.  this->m_iThreadNum = threadNum; 
  92.  Create(); 
  93. } 
  94. int CThreadPool::MoveToIdle(pthread_t tid) 
  95. { 
  96.  vector<pthread_t>::iterator busyIter = m_vecBusyThread.begin(); 
  97.  while(busyIter != m_vecBusyThread.end()) 
  98.  { 
  99.   if(tid == *busyIter) 
100.   { 
101.    break; 
102.   } 
103.   busyIter++; 
104.  } 
105.  m_vecBusyThread.erase(busyIter); 
106.  m_vecIdleThread.push_back(tid); 
107.  return 0; 
108. } 
109.  
110. int CThreadPool::MoveToBusy(pthread_t tid) 
111. { 
112.  vector<pthread_t>::iterator idleIter = m_vecIdleThread.begin(); 
113.  while(idleIter != m_vecIdleThread.end()) 
114.  { 
115.   if(tid == *idleIter) 
116.   { 
117.    break; 
118.   } 
119.   idleIter++; 
120.  } 
121.  m_vecIdleThread.erase(idleIter); 
122.  m_vecBusyThread.push_back(tid); 
123.  return 0; 
124. } 
125. void* CThreadPool::ThreadFunc(void * threadData) 
126. { 
127.  pthread_t tid = pthread_self(); 
128.  while(1) 
129.  { 
130.   pthread_mutex_lock(&m_pthreadMutex); 
131.   pthread_cond_wait(&m_pthreadCond,&m_pthreadMutex); 
132.   cout << "tid:" << tid << " run" << endl; 
133.   //get task 
134.   vector<CTask*>* taskList = (vector<CTask*>*)threadData; 
135.   vector<CTask*>::iterator iter = taskList->begin(); 
136.   while(iter != taskList->end()) 
137.   { 
138.     
139.    MoveToBusy(tid); 
140.    break; 
141.   } 
142.   CTask* task = *iter; 
143.   taskList->erase(iter); 
144.   pthread_mutex_unlock(&m_pthreadMutex); 
145.   cout << "idel thread number:" << CThreadPool::m_vecIdleThread.size() << endl; 
146.   cout << "busy thread number:" << CThreadPool::m_vecBusyThread.size() << endl; 
147.   //cout << "task to be run:" << taskList->size() << endl; 
148.   task->Run(); 
149.    
150.   //cout << "CThread::thread work" << endl; 
151.   cout << "tid:" << tid << " idle" << endl; 
152.    
153.  } 
154.  return (void*)0; 
155. } 
156.  
157. int CThreadPool::AddTask(CTask *task) 
158. { 
159.  this->m_vecTaskList.push_back(task); 
160.  pthread_cond_signal(&m_pthreadCond); 
161.  return 0; 
162. } 
163. int CThreadPool::Create() 
164. { 
165.  for(int i = 0; i < m_iThreadNum;i++) 
166.  { 
167.   pthread_t tid = 0; 
168.   pthread_create(&tid,NULL,ThreadFunc,&m_vecTaskList); 
169.   m_vecIdleThread.push_back(tid); 
170.  } 
171.  return 0; 
172. } 
173.  
174. int CThreadPool::StopAll() 
175. { 
176.  vector<pthread_t>::iterator iter = m_vecIdleThread.begin(); 
177.  while(iter != m_vecIdleThread.end()) 
178.  { 
179.   pthread_cancel(*iter); 
180.   pthread_join(*iter,NULL); 
181.   iter++; 
182.  } 
183.  
184.  iter = m_vecBusyThread.begin(); 
185.  while(iter != m_vecBusyThread.end()) 
186.  { 
187.   pthread_cancel(*iter); 
188.   pthread_join(*iter,NULL); 
189.   iter++; 
190.  } 
191.   
192.  return 0; 
193. } 
194.  
195. 简单示例: 
196.  
197. ××××××××test.cpp 
198.  
199. #include "CThread.h" 
200. #include <iostream> 
201.  
202. using namespace std; 
203.  
204. class CWorkTask: public CTask 
205. { 
206. public: 
207.  CWorkTask() 
208.  {} 
209.  int Run() 
210.  { 
211.   cout << (char*)this->m_ptrData << endl; 
212.   sleep(10); 
213.   return 0; 
214.  } 
215. }; 
216. int main() 
217. { 
218.  CWorkTask taskObj; 
219.  char szTmp[] = "this is the first thread running,haha success"; 
220.  taskObj.SetData((void*)szTmp); 
221.  CThreadPool threadPool(10); 
222.  for(int i = 0;i < 11;i++) 
223.  { 
224.   threadPool.AddTask(&taskObj); 
225.  } 
226.  while(1) 
227.  { 
228.   sleep(120); 
229.  } 
230.  return 0; 
231. } 

************************************************

代码:

××××××××××××××××××××CThread.h



#ifndef __CTHREAD
#define __CTHREAD
#include <vector>
#include <string>
#include <pthread.h>

using namespace std;

/**
执行任务的类,设置任务数据并执行
**/
class CTask
{
protected:
string m_strTaskName;  //任务的名称
void* m_ptrData;       //要执行的任务的具体数据
public:
CTask(){}
CTask(string taskName)
{
  this->m_strTaskName = taskName;
  m_ptrData = NULL;
}
virtual int Run()= 0;
void SetData(void* data);    //设置任务数据
};

/**
线程池
**/
class CThreadPool
{
private:
vector<CTask*> m_vecTaskList;         //任务列表
int m_iThreadNum;                            //线程池中启动的线程数          
static vector<pthread_t> m_vecIdleThread;   //当前空闲的线程集合
static vector<pthread_t> m_vecBusyThread;   //当前正在执行的线程集合
static pthread_mutex_t m_pthreadMutex;    //线程同步锁
static pthread_cond_t m_pthreadCond;    //线程同步的条件变量
protected:
static void* ThreadFunc(void * threadData); //新线程的线程函数
static int MoveToIdle(pthread_t tid);   //线程执行结束后,把自己放入到空闲线程中
static int MoveToBusy(pthread_t tid);   //移入到忙碌线程中去
int Create();          //创建所有的线程
public:
CThreadPool(int threadNum);
int AddTask(CTask *task);      //把任务添加到线程池中
int StopAll();
};

#endif







类的实现为:

××××××××××××××××××××CThread.cpp



#include "CThread.h"
#include <string>
#include <iostream>

using namespace std;

void CTask::SetData(void * data)
{
m_ptrData = data;
}

vector<pthread_t> CThreadPool::m_vecBusyThread;
vector<pthread_t> CThreadPool::m_vecIdleThread;
pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;

CThreadPool::CThreadPool(int threadNum)
{
this->m_iThreadNum = threadNum;
Create();
}
int CThreadPool::MoveToIdle(pthread_t tid)
{
vector<pthread_t>::iterator busyIter = m_vecBusyThread.begin();
while(busyIter != m_vecBusyThread.end())
{
  if(tid == *busyIter)
  {
   break;
  }
  busyIter++;
}
m_vecBusyThread.erase(busyIter);
m_vecIdleThread.push_back(tid);
return 0;
}

int CThreadPool::MoveToBusy(pthread_t tid)
{
vector<pthread_t>::iterator idleIter = m_vecIdleThread.begin();
while(idleIter != m_vecIdleThread.end())
{
  if(tid == *idleIter)
  {
   break;
  }
  idleIter++;
}
m_vecIdleThread.erase(idleIter);
m_vecBusyThread.push_back(tid);
return 0;
}
void* CThreadPool::ThreadFunc(void * threadData)
{
pthread_t tid = pthread_self();
while(1)
{
  pthread_mutex_lock(&m_pthreadMutex);
  pthread_cond_wait(&m_pthreadCond,&m_pthreadMutex);
  cout << "tid:" << tid << " run" << endl;
  //get task
  vector<CTask*>* taskList = (vector<CTask*>*)threadData;
  vector<CTask*>::iterator iter = taskList->begin();
  while(iter != taskList->end())
  {
  
   MoveToBusy(tid);
   break;
  }
  CTask* task = *iter;
  taskList->erase(iter);
  pthread_mutex_unlock(&m_pthreadMutex);
  cout << "idel thread number:" << CThreadPool::m_vecIdleThread.size() << endl;
  cout << "busy thread number:" << CThreadPool::m_vecBusyThread.size() << endl;
  //cout << "task to be run:" << taskList->size() << endl;
  task->Run();
 
  //cout << "CThread::thread work" << endl;
  cout << "tid:" << tid << " idle" << endl;
 
}
return (void*)0;
}

int CThreadPool::AddTask(CTask *task)
{
this->m_vecTaskList.push_back(task);
pthread_cond_signal(&m_pthreadCond);
return 0;
}
int CThreadPool::Create()
{
for(int i = 0; i < m_iThreadNum;i++)
{
  pthread_t tid = 0;
  pthread_create(&tid,NULL,ThreadFunc,&m_vecTaskList);
  m_vecIdleThread.push_back(tid);
}
return 0;
}

int CThreadPool::StopAll()
{
vector<pthread_t>::iterator iter = m_vecIdleThread.begin();
while(iter != m_vecIdleThread.end())
{
  pthread_cancel(*iter);
  pthread_join(*iter,NULL);
  iter++;
}

iter = m_vecBusyThread.begin();
while(iter != m_vecBusyThread.end())
{
  pthread_cancel(*iter);
  pthread_join(*iter,NULL);
  iter++;
}

return 0;
}

简单示例:

××××××××test.cpp

#include "CThread.h"
#include <iostream>

using namespace std;

class CWorkTask: public CTask
{
public:
CWorkTask()
{}
int Run()
{
  cout << (char*)this->m_ptrData << endl;
  sleep(10);
  return 0;
}
};
int main()
{
CWorkTask taskObj;
char szTmp[] = "this is the first thread running,haha success";
taskObj.SetData((void*)szTmp);
CThreadPool threadPool(10);
for(int i = 0;i < 11;i++)
{
  threadPool.AddTask(&taskObj);
}
while(1)
{
  sleep(120);
}
return 0;
}



分享到:
评论

相关推荐

    linux线程池创建c实现

    简单的说,如果一个应用需要频繁的创建和销毁线程,而任务执行的时间又非常短,这样线程创建和销毁的带来的开销就不容忽视,这时也是线程池该出场的机会了。如果线程创建和销毁时间相比任务执行时间可以忽略不计,则...

    linux 实现一个简单的线程池及工作

    本实例将深入探讨如何在Linux下实现一个简单的线程池,并介绍相关的关键知识点。 1. **线程与线程池的概念** - **线程**:是操作系统分配CPU时间片的基本单位,是程序执行的流,一个进程中可以包含多个线程,它们...

    linux线程池示例程序

    以下是一个简单的线程池设计步骤: 1. **初始化线程池**:创建一定数量的工作线程,并初始化任务队列。 2. **创建任务**:定义任务结构体,包含任务的函数指针和参数。 3. **提交任务**:将任务添加到任务队列,...

    linux线程池c源码

    ### Linux线程池C源码解析 #### 一、概览 本文将深入解析一个Linux下的线程池实现,该实现使用C语言编写,并遵循GNU通用公共许可证版本2(或之后版本)。线程池是一种软件设计模式,它可以提高程序执行效率,通过...

    Linux下C线程池实现

    在Linux下用C写的一个简易线程池。系统是RedHat 9,gcc版本"gcc version 4.1.2 20071124 (Red Hat 4.1.2-42)"。文件夹里的源码是按工程组织好的,在文件夹下的test目录下面有一个小的测试程序和Makefile,编译后即可...

    一个简单线程池的实现

    当有新的任务提交时,线程池会选择一个空闲的线程来执行任务,而不是每次都需要创建新的线程,这样可以避免频繁的线程创建和销毁带来的开销。 `twork_work.cpp`和`twork_work.h`文件可能定义了一个工作单元(Work ...

    Linux环境下的通用线程池设计

    - 主进程维护一个线程池,预先创建一定数量的线程。 - 当有新的任务到来时,不再创建新的线程,而是将任务放入队列中,由线程池中的空闲线程取出任务进行处理。 - 线程在完成任务后返回线程池等待新的任务,而...

    Linux 线程池 C++

    Linux下的线程池是多线程编程中的一个重要概念,它是一种高效的线程管理机制,能够有效地管理和调度系统资源,提高系统的并发性能。本项目是基于C++语言在Linux环境下实现的一个完全抽象的线程池,旨在提供一种灵活...

    简单linux C++线程池

    本教程将深入探讨如何在Linux环境下,使用C++实现一个简单的线程池。 首先,线程池的基本结构通常包括以下几个部分: 1. **线程池容器**:用于存储工作线程的集合,通常使用STL中的`std::vector`来实现。这个容器...

    linux c线程池

    linux pthreadpool实现和线程池的用处 简单易懂 互斥和信号量使用

    c++ 实现的线程池(linux环境下)

    本文将深入探讨如何在Linux环境下使用C++实现一个线程池,以及它的工作原理和重要性。 首先,线程池是由多个工作线程组成的集合,这些线程预先启动并在需要时执行任务。这样可以避免频繁地创建和销毁线程,减少系统...

    Linux 线程池(Thread Pool)的实现原理 实例

    在`thread_pool`文件中,可能包含了示例代码,这些代码展示了如何在Linux环境下实现一个简单的线程池。通过阅读和理解这个实例,你可以了解如何创建线程池、添加任务以及管理线程的生命周期。 总之,Linux线程池是...

    Linux线程池使用.docx

    以下是一个简单的C语言实现线程池的例子: ```c // main.c #include #include #include "thread_pool.h" // 任务处理函数 void *task_test(void *arg) { printf("working on task %d\n", (int)arg); sleep(1); ...

    Linux环境下通用线程池设计

    下面展示了一个简单的通用线程池的实现例子: 1. **任务节点定义** - `taskNode` 结构体用于封装待处理的任务数据,如网络连接的句柄 `sockfd`。 2. **任务处理函数** - `printsd` 函数是一个示例性的任务处理...

    linux线程池

    Linux线程池是一种高效利用系统资源的编程模型,它通过预先创建一组线程,然后将任务分配给这些线程来执行,而不是每次需要时都创建新的线程。这种设计模式可以减少线程创建和销毁的开销,提高系统的响应速度,并...

    linux下c++线程池

    本资源提供的线程池实现是一个简单而实用的例子,特别适合对C++多线程编程感兴趣或需要在Linux环境中应用线程池技术的开发者。 首先,C++11标准引入了线程库 `&lt;thread&gt;`,使得在C++中进行多线程编程变得更为便捷。...

    linux C++ 实现线程池(避免线程创建的耗时)

    7. **代码实现**:`ThreadPool`很可能是一个包含线程池管理逻辑的类,它可能包含如初始化线程池、添加任务、启动和停止线程池等功能。类的内部可能维护一个任务队列和线程集合,以及相关的同步原语(如互斥锁、条件...

    Linux下Epoll+线程池的简单Web服务器

    本文将深入探讨如何在Linux环境下使用Epoll和线程池技术构建一个简单的Web服务器。这个Web服务器的实现是基于Linux内核提供的异步I/O模型,Epoll,以及线程池策略,以优化系统资源的使用和提高并发性能。 首先,让...

    LINUX线程池框

    线程池的基本思想是预先创建一定数量的线程并将它们置于空闲状态,当有任务到来时,从线程池中选取一个空闲线程执行该任务。任务执行完毕后,线程返回线程池继续等待下一个任务,而不是立即销毁。这种方式可以显著...

    Linux C线程池简单实现实例

    本文档介绍了一个简单的Linux C语言实现的线程池示例,该实现主要包括三个部分:`tpool.h`、`tpool.c`以及一个用于测试或实际应用的`.c`文件。 #### tpool.h —— 线程池头文件 `tpool.h`定义了线程池所需的结构体...

Global site tag (gtag.js) - Google Analytics