- 浏览: 156641 次
- 性别:
- 来自: 深圳
文章分类
最新评论
-
lyaqys:
lz实现的OptimisticExclusiveLock有点问 ...
java park/unpark 【java并发】基于JUC CAS原理,自己实现简单独占锁
/*
Thread Pool implementation for unix / linux environments
Copyright (C) 2008 Shobhit Gupta
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <iostream>
#include "threadpool.h"
using namespace std;
#define ITERATIONS 200
class SampleWorkerThread : public WorkerThread
{
public:
int id;
unsigned virtual executeThis()
{
// Instead of sleep() we could do anytime consuming work here.
//Using ThreadPools is advantageous only when the work to be done is really time consuming. (atleast 1 or 2 seconds)
sleep(2);
return(0);
}
SampleWorkerThread(int id) : WorkerThread(id), id(id)
{
// cout << "Creating SampleWorkerThread " << id << "\t address=" << this << endl;
}
~SampleWorkerThread()
{
// cout << "Deleting SampleWorkerThread " << id << "\t address=" << this << endl;
}
};
int main(int argc, char **argv)
{
//ThreadPool(N);
//Create a Threadpool with N number of threads
ThreadPool* myPool = new ThreadPool(25);
myPool->initializeThreads();
//We will count time elapsed after initializeThreads()
time_t t1=time(NULL);
//Lets start bullying ThreadPool with tonnes of work !!!
for(unsigned int i=0;i<ITERATIONS;i++){
SampleWorkerThread* myThread = new SampleWorkerThread(i);
//cout << "myThread[" << myThread->id << "] = [" << myThread << "]" << endl;
myPool->assignWork(myThread);
}
// destroyPool(int maxPollSecs)
// Before actually destroying the ThreadPool, this function checks if all the pending work is completed.
// If the work is still not done, then it will check again after maxPollSecs
// The default value for maxPollSecs is 2 seconds.
// And ofcourse the user is supposed to adjust it for his needs.
myPool->destroyPool(2);
time_t t2=time(NULL);
cout << t2-t1 << " seconds elapsed\n" << endl;
delete myPool;
return 0;
}
----------------------
/*
Thread Pool implementation for unix / linux environments
Copyright (C) 2008 Shobhit Gupta
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h>
#include "threadpool.h"
using namespace std;
pthread_mutex_t ThreadPool::mutexSync = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t ThreadPool::mutexWorkCompletion = PTHREAD_MUTEX_INITIALIZER;
ThreadPool::ThreadPool()
{
ThreadPool(2);
}
ThreadPool::ThreadPool(int maxThreads)
{
if (maxThreads < 1) maxThreads=1;
//mutexSync = PTHREAD_MUTEX_INITIALIZER;
//mutexWorkCompletion = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_lock(&mutexSync);
this->maxThreads = maxThreads;
this->queueSize = maxThreads;
//workerQueue = new WorkerThread *[maxThreads];
workerQueue.resize(maxThreads, NULL);
topIndex = 0;
bottomIndex = 0;
incompleteWork = 0;
sem_init(&availableWork, 0, 0);
sem_init(&availableThreads, 0, queueSize);
pthread_mutex_unlock(&mutexSync);
}
void ThreadPool::initializeThreads()
{
for(int i = 0; i<maxThreads; ++i)
{
pthread_t tempThread;
pthread_create(&tempThread, NULL, &ThreadPool::threadExecute, (void *) this );
//threadIdVec[i] = tempThread;
}
}
ThreadPool::~ThreadPool()
{
workerQueue.clear();
}
void ThreadPool::destroyPool(int maxPollSecs = 2)
{
while( incompleteWork>0 )
{
//cout << "Work is still incomplete=" << incompleteWork << endl;
sleep(maxPollSecs);
}
cout << "All Done!! Wow! That was a lot of work!" << endl;
sem_destroy(&availableWork);
sem_destroy(&availableThreads);
pthread_mutex_destroy(&mutexSync);
pthread_mutex_destroy(&mutexWorkCompletion);
}
bool ThreadPool::assignWork(WorkerThread *workerThread)
{
pthread_mutex_lock(&mutexWorkCompletion);
incompleteWork++;
//cout << "assignWork...incomapleteWork=" << incompleteWork << endl;
pthread_mutex_unlock(&mutexWorkCompletion);
sem_wait(&availableThreads);
pthread_mutex_lock(&mutexSync);
//workerVec[topIndex] = workerThread;
workerQueue[topIndex] = workerThread;
//cout << "Assigning Worker[" << workerThread->id << "] Address:[" << workerThread << "] to Queue index [" << topIndex << "]" << endl;
if(queueSize !=1 )
topIndex = (topIndex+1) % (queueSize-1);
sem_post(&availableWork);
pthread_mutex_unlock(&mutexSync);
return true;
}
bool ThreadPool::fetchWork(WorkerThread **workerArg)
{
sem_wait(&availableWork);
pthread_mutex_lock(&mutexSync);
WorkerThread * workerThread = workerQueue[bottomIndex];
workerQueue[bottomIndex] = NULL;
*workerArg = workerThread;
if(queueSize !=1 )
bottomIndex = (bottomIndex+1) % (queueSize-1);
sem_post(&availableThreads);
pthread_mutex_unlock(&mutexSync);
return true;
}
void *ThreadPool::threadExecute(void *param)
{
WorkerThread *worker = NULL;
while(((ThreadPool *)param)->fetchWork(&worker))
{
if(worker)
{
worker->executeThis();
//cout << "worker[" << worker->id << "]\tdelete address: [" << worker << "]" << endl;
delete worker;
worker = NULL;
}
pthread_mutex_lock( &(((ThreadPool *)param)->mutexWorkCompletion) );
//cout << "Thread " << pthread_self() << " has completed a Job !" << endl;
((ThreadPool *)param)->incompleteWork--;
pthread_mutex_unlock( &(((ThreadPool *)param)->mutexWorkCompletion) );
}
return 0;
}
-------------------------------------
/*
Thread Pool implementation for unix / linux environments
Copyright (C) 2008 Shobhit Gupta
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <pthread.h>
#include <semaphore.h>
#include <iostream>
#include <vector>
using namespace std;
/*
WorkerThread class
This class needs to be sobclassed by the user.
*/
class WorkerThread{
public:
int id;
unsigned virtual executeThis()
{
return 0;
}
WorkerThread(int id) : id(id) {}
virtual ~WorkerThread(){}
};
/*
ThreadPool class manages all the ThreadPool related activities. This includes keeping track of idle threads and ynchronizations between all threads.
*/
class ThreadPool{
public:
ThreadPool();
ThreadPool(int maxThreadsTemp);
virtual ~ThreadPool();
void destroyPool(int maxPollSecs);
bool assignWork(WorkerThread *worker);
bool fetchWork(WorkerThread **worker);
void initializeThreads();
static void *threadExecute(void *param);
static pthread_mutex_t mutexSync;
static pthread_mutex_t mutexWorkCompletion;
private:
int maxThreads;
pthread_cond_t condCrit;
sem_t availableWork;
sem_t availableThreads;
//WorkerThread ** workerQueue;
vector<WorkerThread *> workerQueue;
int topIndex;
int bottomIndex;
int incompleteWork;
int queueSize;
};
Thread Pool implementation for unix / linux environments
Copyright (C) 2008 Shobhit Gupta
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <iostream>
#include "threadpool.h"
using namespace std;
#define ITERATIONS 200
class SampleWorkerThread : public WorkerThread
{
public:
int id;
unsigned virtual executeThis()
{
// Instead of sleep() we could do anytime consuming work here.
//Using ThreadPools is advantageous only when the work to be done is really time consuming. (atleast 1 or 2 seconds)
sleep(2);
return(0);
}
SampleWorkerThread(int id) : WorkerThread(id), id(id)
{
// cout << "Creating SampleWorkerThread " << id << "\t address=" << this << endl;
}
~SampleWorkerThread()
{
// cout << "Deleting SampleWorkerThread " << id << "\t address=" << this << endl;
}
};
int main(int argc, char **argv)
{
//ThreadPool(N);
//Create a Threadpool with N number of threads
ThreadPool* myPool = new ThreadPool(25);
myPool->initializeThreads();
//We will count time elapsed after initializeThreads()
time_t t1=time(NULL);
//Lets start bullying ThreadPool with tonnes of work !!!
for(unsigned int i=0;i<ITERATIONS;i++){
SampleWorkerThread* myThread = new SampleWorkerThread(i);
//cout << "myThread[" << myThread->id << "] = [" << myThread << "]" << endl;
myPool->assignWork(myThread);
}
// destroyPool(int maxPollSecs)
// Before actually destroying the ThreadPool, this function checks if all the pending work is completed.
// If the work is still not done, then it will check again after maxPollSecs
// The default value for maxPollSecs is 2 seconds.
// And ofcourse the user is supposed to adjust it for his needs.
myPool->destroyPool(2);
time_t t2=time(NULL);
cout << t2-t1 << " seconds elapsed\n" << endl;
delete myPool;
return 0;
}
----------------------
/*
Thread Pool implementation for unix / linux environments
Copyright (C) 2008 Shobhit Gupta
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h>
#include "threadpool.h"
using namespace std;
pthread_mutex_t ThreadPool::mutexSync = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t ThreadPool::mutexWorkCompletion = PTHREAD_MUTEX_INITIALIZER;
ThreadPool::ThreadPool()
{
ThreadPool(2);
}
ThreadPool::ThreadPool(int maxThreads)
{
if (maxThreads < 1) maxThreads=1;
//mutexSync = PTHREAD_MUTEX_INITIALIZER;
//mutexWorkCompletion = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_lock(&mutexSync);
this->maxThreads = maxThreads;
this->queueSize = maxThreads;
//workerQueue = new WorkerThread *[maxThreads];
workerQueue.resize(maxThreads, NULL);
topIndex = 0;
bottomIndex = 0;
incompleteWork = 0;
sem_init(&availableWork, 0, 0);
sem_init(&availableThreads, 0, queueSize);
pthread_mutex_unlock(&mutexSync);
}
void ThreadPool::initializeThreads()
{
for(int i = 0; i<maxThreads; ++i)
{
pthread_t tempThread;
pthread_create(&tempThread, NULL, &ThreadPool::threadExecute, (void *) this );
//threadIdVec[i] = tempThread;
}
}
ThreadPool::~ThreadPool()
{
workerQueue.clear();
}
void ThreadPool::destroyPool(int maxPollSecs = 2)
{
while( incompleteWork>0 )
{
//cout << "Work is still incomplete=" << incompleteWork << endl;
sleep(maxPollSecs);
}
cout << "All Done!! Wow! That was a lot of work!" << endl;
sem_destroy(&availableWork);
sem_destroy(&availableThreads);
pthread_mutex_destroy(&mutexSync);
pthread_mutex_destroy(&mutexWorkCompletion);
}
bool ThreadPool::assignWork(WorkerThread *workerThread)
{
pthread_mutex_lock(&mutexWorkCompletion);
incompleteWork++;
//cout << "assignWork...incomapleteWork=" << incompleteWork << endl;
pthread_mutex_unlock(&mutexWorkCompletion);
sem_wait(&availableThreads);
pthread_mutex_lock(&mutexSync);
//workerVec[topIndex] = workerThread;
workerQueue[topIndex] = workerThread;
//cout << "Assigning Worker[" << workerThread->id << "] Address:[" << workerThread << "] to Queue index [" << topIndex << "]" << endl;
if(queueSize !=1 )
topIndex = (topIndex+1) % (queueSize-1);
sem_post(&availableWork);
pthread_mutex_unlock(&mutexSync);
return true;
}
bool ThreadPool::fetchWork(WorkerThread **workerArg)
{
sem_wait(&availableWork);
pthread_mutex_lock(&mutexSync);
WorkerThread * workerThread = workerQueue[bottomIndex];
workerQueue[bottomIndex] = NULL;
*workerArg = workerThread;
if(queueSize !=1 )
bottomIndex = (bottomIndex+1) % (queueSize-1);
sem_post(&availableThreads);
pthread_mutex_unlock(&mutexSync);
return true;
}
void *ThreadPool::threadExecute(void *param)
{
WorkerThread *worker = NULL;
while(((ThreadPool *)param)->fetchWork(&worker))
{
if(worker)
{
worker->executeThis();
//cout << "worker[" << worker->id << "]\tdelete address: [" << worker << "]" << endl;
delete worker;
worker = NULL;
}
pthread_mutex_lock( &(((ThreadPool *)param)->mutexWorkCompletion) );
//cout << "Thread " << pthread_self() << " has completed a Job !" << endl;
((ThreadPool *)param)->incompleteWork--;
pthread_mutex_unlock( &(((ThreadPool *)param)->mutexWorkCompletion) );
}
return 0;
}
-------------------------------------
/*
Thread Pool implementation for unix / linux environments
Copyright (C) 2008 Shobhit Gupta
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <pthread.h>
#include <semaphore.h>
#include <iostream>
#include <vector>
using namespace std;
/*
WorkerThread class
This class needs to be sobclassed by the user.
*/
class WorkerThread{
public:
int id;
unsigned virtual executeThis()
{
return 0;
}
WorkerThread(int id) : id(id) {}
virtual ~WorkerThread(){}
};
/*
ThreadPool class manages all the ThreadPool related activities. This includes keeping track of idle threads and ynchronizations between all threads.
*/
class ThreadPool{
public:
ThreadPool();
ThreadPool(int maxThreadsTemp);
virtual ~ThreadPool();
void destroyPool(int maxPollSecs);
bool assignWork(WorkerThread *worker);
bool fetchWork(WorkerThread **worker);
void initializeThreads();
static void *threadExecute(void *param);
static pthread_mutex_t mutexSync;
static pthread_mutex_t mutexWorkCompletion;
private:
int maxThreads;
pthread_cond_t condCrit;
sem_t availableWork;
sem_t availableThreads;
//WorkerThread ** workerQueue;
vector<WorkerThread *> workerQueue;
int topIndex;
int bottomIndex;
int incompleteWork;
int queueSize;
};
发表评论
-
简单的linux -c http-client
2013-10-23 15:35 4769#include<stdio.h> #includ ... -
linux进程cpu资源分配命令nice,renice,taskset
2013-09-04 14:03 1190nice,renice 指定进程运行的优先级 taskset ... -
c++ 动态内存分配
2013-08-28 22:35 867先看一段代码: [cpp] view plaincopy ... -
c 专家编程
2013-08-13 17:06 703总结: -2> int * a = NUL ... -
Linux中线程与CPU核的绑定
2013-08-09 15:15 2135最近在对项目进行性能 ... -
建议编译的时候加警告 atof
2013-08-07 20:46 738#include <stdlib.h> ... -
feodra 17 安装 chrome
2013-08-04 01:35 7831: 下载:http://www.google.cn/chro ... -
Sudo提权出现:xx用户不在 sudoers 文件中
2013-08-03 20:22 920Sudo提权出现:xx用户不在 sudoers 文件中 症状 ... -
select,epoll,poll比较
2013-07-28 17:13 691select,poll,epoll简介 se ... -
gcc编译程序时,可能会用到“-I”(大写i),“-L”(大写l),“-l”(小写l)等参数
2013-07-22 22:45 938我们用gcc编译程序时,可能会用到“-I”(大写i),“-L” ... -
Linux下如何将进程绑定在特定的CPU上运行
2013-07-22 10:52 997Linux下如何将进程绑定在特定的CPU上运行? 以root用 ... -
linux运维常用命令
2013-07-13 20:40 907推荐一个实用命令:awk '{x+=$2} END {prin ... -
linux 进程通信方式
2013-07-07 20:46 630# 管道( pipe ):管道是一种半双工的通信方式,数据只能 ... -
判断两个一个链表是否存在循环(C专家编程中的问题)
2013-06-24 15:35 930判断两个一个链表是否存在循环(C专家编程中的问题) #incl ... -
atoi源码
2013-05-14 19:32 1291原文: http://blog.csdn.net/eroswa ... -
为重负网络优化 Nginx 和 Node.js
2013-05-13 01:12 1033原文:http://linux.cn/forum.php?mo ... -
c语言特殊字符串复制
2013-05-06 01:59 8772.strcpy和memcpy主要有以下3方面的区别。 2.1 ... -
《APUE》:线程和fork(父子进程锁)
2013-04-29 21:07 1200《Unix环境高级编程》这本书附带了许多短小精美的小程序,我在 ... -
CentOS升级Python到2.7版本
2013-04-23 15:24 949[root@localhost ~] python -V ... -
Linux多线程同步的几种方式
2013-04-22 22:49 797Linux多线程同步的几种方式 线程的最大特点是资 ...
相关推荐
标题中的“免积分C++11写的可复用的线程池类 - 开源”意味着这是一个使用C++11标准库实现的线程池类,它具有可复用性,并且是开源的,允许开发者查看和修改源代码,以便根据项目需求进行定制。 线程池是一种多线程...
2、为什么不使用.Net默认的线程池.Net默认的线程池(ThreadPool)是一个静态类,所以是没办法自己创建一个新的程序池的。默认的线程池与应用程序域 (AppDomain)挂钩,一个AppDomain只有一个线程池。假如在线程池中执行...
线程池,别人开源的,稳定性测试还不错,c语言的,linux下面运行没问题
例如,可以设置一个全局或静态的回调队列,工作线程在完成任务后将回调函数添加到该队列,然后通知主线程去执行。 4. **优化** - **线程数量调整**:线程池中线程过多会增加上下文切换的开销,过少则可能导致CPU...
在计算机编程中,线程池模式(也称为复制工作者或工作人员模型)是创建多个线程... 一旦一个线程完成它的任务,它就会从队列中请求下一个任务,直到所有任务都完成。 然后线程可以终止或Hibernate,直到有新任务可用。
一旦任务分配给一个线程,它就会执行任务并返回到待命状态,直到被分配下一个任务。 3. **任务队列**:用于存储待处理的任务。当有新的任务加入时,它会被放入队列中,由线程池管理器调度分配给空闲线程。 4. **...
这个“非常好用的C语言线程池”是一个纯C代码实现的线程池库,适合在C语言项目中使用,提高多任务处理的效率和资源利用率。 线程池的工作原理: 1. **初始化**:线程池在启动时会预先创建一定数量的线程,这些线程...
在描述中提到的"很经典的一个线程池应用"可能指的是一个开源库,如Android的`AsyncTask`或者更先进的`OkHttp`的调度器,它们都提供了线程池管理的实现。这些框架通常会根据任务类型和系统资源智能地调整线程数量,以...
Quartz 是一个开源的作业调度框架,它允许开发者在 Java 应用程序中安排任务的执行。线程池是 Quartz 的核心组成部分,用于管理和执行触发的任务。本文将深入探讨 Quartz 线程池的工作原理、配置以及如何在实际项目...
其次,鱼刺模块线程池可能是另一个开源项目,其具体细节可能因项目而异,但同样遵循线程池的基本原理。鱼刺模块线程池可能提供了更高级的特性,例如线程优先级、任务队列的管理策略、线程超时控制等,以适应不同场景...
本文详细介绍了美团动态线程池实践思路开源项目(DynamicTp)的通知告警模块,该模块提供了多种通知告警功能,每一个通知项都可以独立配置是否开启、告警阈值、告警间隔时间、平台等。具体代码请看 core 模块 notify...
线程池ThreadPool。
3. **ACE框架**:ACE是一个开源的软件框架,其设计目标是提供一套可移植的、面向对象的中间件服务,用于构建分布式实时和嵌入式系统。它包含了大量网络编程、并发处理、对象间通信和系统资源管理的工具和类。ACE的...
一个易于使用的C ++ 11线程池。 使用ThreadPool类对自由函数进行排队,并使用std :: for_each()和std :: transform()的并行版本。 可配置为仅用于标头或与库一起使用。 有许多用法示例。
线程池是一种多线程处理形式,预先创建一定数量的线程,当任务到来时,线程池会分配一个空闲线程处理任务,而不是每次都创建新的线程。这减少了创建和销毁线程的开销,提高了系统的效率。Tomcat的线程池实现是基于...
"uThreadPool"是针对Pascal语言的一个线程池实现,可能是一个开源库或组件,用于帮助开发者更方便地管理和调度并发任务。 标题中的"uThreadPool线程池最新修正"表明这个库可能有一个更新版本,修复了之前用户在CSDN...
cnpack线程池调用一般步骤。cnpack开源项目中TCnThreadPool线程池使用步骤。
POCO(Portable Class Library)是一个开源的C++库,旨在提供跨平台的网络、文件系统、时间日期、JSON、XML、加密、压缩和线程管理等功能。线程池是POCO库的一部分,位于`Net`模块下,主要用于并发编程。 线程池的...
对于那些需要处理大量异步任务或高并发场景的Spring Boot应用来说,这是一个值得考虑的优秀工具。开发者可以通过项目地址(https://gitee.com/hanxt/zimug-monitor-threadpool)获取更多详细信息,并将其应用于自己...
在上面的例子中,我们首先创建了一个包含4个线程的线程池,然后将10个任务(每个任务都是一个简单的函数`worker_function`)提交到线程池。`thread_pool.wait()`确保所有任务都完成后才结束主线程。 使用预编译好的...