`
weiyuhu
  • 浏览: 235886 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

linux线程池及其测试

阅读更多
/*-------------------------------------------------------------------------
* tpool.h – 线程池定义
* -------------------------------------------------------------------------
*/
#ifndef _TPOOL_H_
#define _TPOOL_H_
#include  <stdio.h>
#include  <pthread.h>
/*工作线程链表*/
typedef struct tpool_work{
void (*handler_routine)();            /*任务函数指针*/
void *arg;                        /*任务函数参数*/
struct tpool_work *next;                /*下一个任务链表*/
} tpool_work_t;
/*线程池结构体*/
typedef struct tpool{
int num_threads;                    /*最大线程数*/
int max_queue_size;                /*最大任务链表数*/
int do_not_block_when_full;            /*当链表满时是否阻塞*/
pthread_t *threads;                /*线程指针*/
int cur_queue_size;
tpool_work_t *queue_head;            /*链表头*/
tpool_work_t *queue_tail;            /*链表尾*/
pthread_mutex_t queue_lock;        /*链表互斥量*/
pthread_cond_t queue_not_full;        /*链表条件量-未满*/
pthread_cond_t queue_not_empty;    /*链表条件量-非空*/
pthread_cond_t queue_empty;        /*链表条件量-空*/
int queue_closed;
int shutdown;
} tpool_t;
/* 初始化连接池 */
extern tpool_t *tpool_init(int num_worker_threads,\
int max_queue_size, int do_not_block_when_full);
/* 添加一个工作线程 */
extern int tpool_add_work(tpool_t *pool, void  (*routine)(), void *arg);
/* 清除线程池*/
extern int tpool_destroy(tpool_t *pool, int finish);
#endif /* _TPOOL_H_ */

/* -------------------------------------------------------------------------
* tpool.c – 线程池的实现
* -------------------------------------------------------------------------
*/
#include  <stdio.h>
#include  <stdlib.h>
#include  <string.h>
#include  <pthread.h>
#include "tpool.h"
#include "log.h"
/* 工作线程 */
void *tpool_thread(void *tpool);
/***************线程池初始化*****************************/
tpool_t *tpool_init(int num_worker_threads,\    /*线程池线程个数*/
int max_queue_size, \        /*最大任务数*/
int do_not_block_when_full)    /*是否阻塞任务满的时候*/
{
int i, rtn;
tpool_t *pool;
lprintf(log, INFO, "init pool begin ...\n");
/* 创建线程池结构体 */
if((pool = (struct tpool *)malloc(sizeof(struct tpool))) == NULL) {
lprintf(log, FATAL, "Unable to malloc() thread pool!\n");
return NULL;
}
/* 设置线程池架构体成员 */
pool->num_threads = num_worker_threads;                      /*工作线程个数*/
pool->max_queue_size = max_queue_size;                       /*任务链表最大长度*/
pool->do_not_block_when_full = do_not_block_when_full;       /*任务链表满时是否等待*/
/* 生成线程池缓存 */
if((pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*num_worker_threads)) == NULL) {
lprintf(log, FATAL,"Unable to malloc() thread info array\n");/*虽然malloc()并不是返回数组指针,但仍然可以这样用,表示数组。calloc返回数组指针*/
return NULL;
}
/* 初始化任务链表 */
pool->cur_queue_size = 0;
pool->queue_head = NULL;
pool->queue_tail = NULL;
pool->queue_closed = 0;
pool->shutdown = 0;
/* 初始化互斥变量,条件变量 用于线程之间的同步 */
if((rtn = pthread_mutex_init(&(pool->queue_lock),NULL)) != 0) {
lprintf(log,FATAL,"pthread_mutex_init %s",strerror(rtn));
return NULL;
}
if((rtn = pthread_cond_init(&(pool->queue_not_empty),NULL)) != 0) {
lprintf(log,FATAL,"pthread_cond_init %s",strerror(rtn));
return NULL;
}
if((rtn = pthread_cond_init(&(pool->queue_not_full),NULL)) != 0) {
lprintf(log,FATAL,"pthread_cond_init %s",strerror(rtn));
return NULL;
}
if((rtn = pthread_cond_init(&(pool->queue_empty),NULL)) != 0) {
lprintf(log,FATAL,"pthread_cond_init %s",strerror(rtn));
return NULL;
}
/* 创建所有的线程 */ /*这里预创建的线程并不能运行,需要客户调度一个存在的函数才能工作。这里创建完线程以后,就放在那睡眠,直到唤醒*/
for(i = 0; i != num_worker_threads; i++) {
if( (rtn=pthread_create(&(pool->threads[i]),NULL,tpool_thread,(void*)pool)) != 0) { /*注意这里的pthread_create(),第一个参数是函数名,这里用malloc产生的指针(数组)来表示,第二个是函数实体tpool_thread,现在的tpool_thread执行部分是空的,就是说没有执行函数体*/
lprintf(log,FATAL,"pthread_create %s\n",strerror(rtn));
return NULL;
}
lprintf(log, INFO, "init pthread  %d!\n",i);
}
lprintf(log, INFO, "init pool end!\n");
return pool;
}
/*下面这个函数负责调度。把一个真实的函数调度到线程池,然后排队或者运行*/
int tpool_add_work(tpool_t *pool,   \           /*线程池指针*/
void (*routine)(void *),\   /*工作线程函数指针*/
void *arg)                  /*工作线程函数参数*/
{
int rtn;
tpool_work_t *workp; /*当前工作线程*/
if((rtn = pthread_mutex_lock(&pool->queue_lock)) != 0){
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
/* 采取独占的形式访问任务链表 */
if((pool->cur_queue_size == pool->max_queue_size) && \
(pool->do_not_block_when_full)) {
if((rtn = pthread_mutex_unlock(&pool->queue_lock)) != 0){
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
return -1;
}
/* 等待任务链表为新线程释放空间 */
while((pool->cur_queue_size == pool->max_queue_size) &&
(!(pool->shutdown || pool->queue_closed))) {
if((rtn = pthread_cond_wait(&(pool->queue_not_full),
&(pool->queue_lock)) ) != 0) {
lprintf(log,FATAL,"pthread cond wait failure\n");
return -1;
}
}
if(pool->shutdown || pool->queue_closed) {
if((rtn = pthread_mutex_unlock(&pool->queue_lock)) != 0)  {
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
return -1;
}
/* 分配工作线程结构体 */
if((workp = (tpool_work_t *)malloc(sizeof(tpool_work_t))) == NULL) {
lprintf(log,FATAL,"unable to create work struct\n");
return -1;
}
workp->handler_routine = routine;
workp->arg = arg;
workp->next = NULL;
if(pool->cur_queue_size == 0) {
pool->queue_tail = pool->queue_head = workp;
if((rtn = pthread_cond_broadcast(&(pool->queue_not_empty))) != 0){
lprintf(log,FATAL,"pthread broadcast error\n");
return -1;
}
}
else {
pool->queue_tail->next = workp;
pool->queue_tail = workp;
}
pool->cur_queue_size++;
/* 释放对任务链表的独占 */
if((rtn = pthread_mutex_unlock(&pool->queue_lock)) != 0) {
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
return 0;
}
int tpool_destroy(tpool_t *pool, int finish)
{
int i, rtn;
tpool_work_t *cur;   /*当前工作线程*/
lprintf(log, INFO, "destroy pool begin!\n");
/* 释放对任务链表的独占 */
if((rtn = pthread_mutex_lock(&(pool->queue_lock))) != 0) {
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
/* 第一步,设置线程退出标记 */
lprintf(log, INFO, "destroy pool begin 1!\n");
if(pool->queue_closed || pool->shutdown) {
if((rtn = pthread_mutex_unlock(&(pool->queue_lock))) != 0) {
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
return 0;
}
/* 第二步,禁止新任务加入任务链表 */
lprintf(log, INFO, "destroy pool begin 2!\n");
pool->queue_closed = 1;
if(finish) {
while(pool->cur_queue_size != 0) {
if((rtn = pthread_cond_wait(&(pool->queue_empty),&(pool->queue_lock))) != 0) {
lprintf(log,FATAL,"pthread_cond_wait %d\n",rtn);
return -1;
}
}
}
/* 第三步,设置线程池销毁标记 */
lprintf(log, INFO, "destroy pool begin 3!\n");
pool->shutdown = 1;
if((rtn = pthread_mutex_unlock(&(pool->queue_lock))) != 0) {
lprintf(log,FATAL,"pthread mutex unlock failure\n");
return -1;
}
/* 第四步,等待所有已建立的线程退出 */
lprintf(log, INFO, "destroy pool begin 4!\n");
if((rtn = pthread_cond_broadcast(&(pool->queue_not_empty))) != 0) {
lprintf(log,FATAL,"pthread_cond_boradcast %d\n",rtn);
return -1;
}
if((rtn = pthread_cond_broadcast(&(pool->queue_not_full)))  != 0) {
lprintf(log,FATAL,"pthread_cond_boradcast %d\n",rtn);
return -1;
}
for(i = 0; i < pool->num_threads; i++) {
if((rtn = pthread_join(pool->threads[i],NULL)) != 0)
{
lprintf(log,FATAL,"pthread_join %d\n",rtn);
return -1;
}
}
/* 第五步,释放线程池所占的内存空间 */
free(pool->threads);
while(pool->queue_head != NULL) {
cur = pool->queue_head->next;
pool->queue_head = pool->queue_head->next;
free(cur);
}
free(pool);
lprintf(log, INFO, "destroy pool end!\n");
return 0;
}
void *tpool_thread(void *tpool)
{
tpool_work_t *my_work;
tpool_t *pool = (struct tpool *)tpool;

for(;;){/* 线程内循环 */
pthread_mutex_lock(&(pool->queue_lock)); 
/* 如果任务列表为0,并且线程池没有关闭,则一直等待,直到任务到来为止  */
while((pool->cur_queue_size == 0) && (!pool->shutdown)) {
pthread_cond_wait(&(pool->queue_not_empty), &(pool->queue_lock));
}
/* 线程池是否已经关闭,如果线程池关闭则线程自己主动关闭 */
if(pool->shutdown){
pthread_mutex_unlock(&(pool->queue_lock));
pthread_exit(NULL);      /*线程退出状态为空,主线程不捕获各副线程状态*/
}
my_work = pool->queue_head;
pool->cur_queue_size--;
/*将任务链表头部去掉,改任务正在处理中*/
if(pool->cur_queue_size == 0)  
pool->queue_head = pool->queue_tail = NULL;
else
pool->queue_head = my_work->next;
/* 任务链表还没有满 */
if((!pool->do_not_block_when_full) &&\
(pool->cur_queue_size == (pool->max_queue_size - 1))) {
pthread_cond_broadcast(&(pool->queue_not_full));
}
/*任务链表为空*/
if(pool->cur_queue_size == 0) {
pthread_cond_signal(&(pool->queue_empty));
}
pthread_mutex_unlock(&(pool->queue_lock));
/*启动线程业务处理逻辑*/
(*(my_work->handler_routine))(my_work->arg);
free(my_work);  /*这里释放的是链表头部的线程结构体*/
}
return(NULL);
}

/* -------------------------------------------------------------------------
* log.h 记录函数定义
* -------------------------------------------------------------------------
*/
#ifndef __LOG_H
#define __LOG_H

#include <stdio.h>
#include <semaphore.h>
/*记录的最大长度*/
#define LOGLINE_MAX 1024
/*记录的等级*/
#define DEBUG 1
#define INFO  2
#define WARN  3
#define ERROR 4
#define FATAL 5
/*记录的类型*/
#define LOG_TRUNC   1<<0
#define LOG_NODATE  1<<1
#define LOG_NOLF    1<<2
#define LOG_NOLVL   1<<3
#define LOG_DEBUG   1<<4
#define LOG_STDERR  1<<5
#define LOG_NOTID   1<<6
typedef struct {
int fd;
sem_t sem;
int flags;
} log_t;

/*
* 功能描述:    记录打印函数,将记录打印至记录文件logfile。
* 参数:        log_t - log_open()函数的返回值
*             level - 可以是: DEBUG, INFO, WARN, ERROR, FATAL
*            fmt  - 记录的内容,格式同printf()函数
* 返回值:    成功返回0,失败返回-1
*/
int lprintf( log_t *log, unsigned int level, char *fmt, ... );
/*
* 功能描述: 初始化记录文件the logfile
*参数:    fname - 记录文件logfile的文件名
*         flags     -  记录格式的选项
*          LOG_TRUNC  -    截断打开的记录文件
*          LOG_NODATE -    忽略记录中的每一行
*          LOG_NOLF    -    自动为每条记录新开一行.
*          LOG_NOLVL   - 不记录消息的等级
*          LOG_STDERR -    将消息同时送到STDERR
*返回值:成功返回log_t(>0),失败返回NULL
*/
log_t *log_open( char *fname, int flags );

/*
* 功能描述:关闭记录文件
* 参数:        * log  - 记录文件的指针
*/
void log_close( log_t *log );

#endif
/* -------------------------------------------------------------------------
* log.c – 记录函数实现
* -------------------------------------------------------------------------
*/
#include <stdio.h>
#include <unistd.h>
#include <semaphore.h>
#include <stdarg.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "log.h"
int lprintf( log_t *log, unsigned int level, char *fmt, ... ) {
int fd;
int rc;
va_list ap;
time_t now;
char date[50];
static char line[LOGLINE_MAX];
static char threadnum[10];
int cnt;
static char *levels[6] = { "[(bad)] ", "[debug] ", "[info ] ", "[warn ] ",  "[error] ", "[fatal] " };
if(!log) return -1;
if( !(log->flags&LOG_DEBUG) && level == DEBUG ) return 0;
fd=log->fd;
/*日期*/
if( !(log->flags&LOG_NODATE) ) {
now=time(NULL);
strcpy(date,ctime(&now));
date[strlen(date)-6]=' ';
date[strlen(date)-5]='\0';
}
/*线程号*/
if( !(log->flags&LOG_NOTID) ) {
sprintf(threadnum, "(TID:%lu) ", pthread_self());
}
cnt = snprintf(line, sizeof(line), "%s%s%s",
log->flags&LOG_NODATE ? "" : date,
log->flags&LOG_NOLVL  ? "" :
(level > FATAL ? levels[0] : levels[level]),
log->flags&LOG_NOTID  ? "" : threadnum);
va_start(ap, fmt);
vsnprintf(line+cnt, sizeof(line)-cnt, fmt, ap);    /*如果输入的日志过长会自动截取*/
va_end(ap);
line[sizeof(line)-1] = '\0';
if( !(log->flags&LOG_NOLF) ) {
/*chomp(line);*/
/*strcpy(line+strlen(line), "\n");*/
}
sem_wait(&log->sem);         /*用信号实现同步*/
rc = write(fd, line, strlen(line));
sem_post(&log->sem);
if( !rc ) errno = 0;
return rc;
}

log_t *log_open( char *fname, int flags ) {
log_t *log = malloc(sizeof(log_t));
if(!log) {
fprintf(stderr, "log_open: Unable to malloc()");
goto log_open_a;
}
log->flags=flags;
if( !strcmp(fname,"-") ) {
log->fd = 2;
} else {
log->fd = open(fname, O_WRONLY|O_CREAT|O_NOCTTY |
(flags&LOG_TRUNC ? O_TRUNC : O_APPEND) , 0666);
}
if( log->fd == -1 ) {
fprintf(stderr, "log_open: Opening logfile %s: %s", fname, strerror(errno));
goto log_open_b;
}
if( sem_init(&log->sem, 0, 1) == -1 ) {
fprintf(stderr, "log_open: Could not initialize log semaphore.");
goto log_open_c;
}
return log;

log_open_c:
close(log->fd);
log_open_b:
free(log);
log_open_a:
return NULL;
}
void log_close( log_t *log ) {
sem_wait(&log->sem);
sem_destroy(&log->sem);
close(log->fd);
free(log);
return;
}
/* -------------------------------------------------------------------------
* testpool.c – 线程池测试程序
* -------------------------------------------------------------------------
*/

#include <pthread.h>
#include "log.h"
#include "tpool.h"

log_t *log;  /*进程全局日志文件句柄*
/*任务*/
void thread(void *arg)
{
char * ptr=(char *)arg;

sleep(1);
printf("hello world! %s\n",ptr);
}

int main(int argc, char *argv[])
{
tpool_t *pool;  /*线程池指针*/

/* 开启记录文件 */
log=log_open("test.log", 0);
/* 创建一个有100个工作线程,最大200个任务队列的线程池 */
pool=tpool_init(100,200,1);
int i;
/* 开启记录文件 */
* 添加100个任务*/
for (i = 0; i<100;i++)  /*这里往线程池里一次性添加100个线程,参数都一样。在实际情况下,参数是可变的*/
tpool_add_work(pool,thread,"test!");
sleep(10);
/*终止线程池*/
tpool_destroy(pool,1);
/* 关闭记录文件 */
log_close(log);
pthread_exit(NULL);
}
分享到:
评论

相关推荐

    Linux C编程一站式学习

    在深入探讨Linux C编程及其相关知识点之前,首先要理解C语言本身的重要性。C语言是一种强大的、通用的编程语言,被广泛用于系统级编程,因为它提供了对硬件的直接访问,并且具有高效的执行性能。在Linux环境下,...

    linux下线程池

    本文将深入探讨线程池的实现原理及其在Linux环境下的应用。 线程池的核心组成部分包括工作线程、任务队列、线程同步机制(如互斥量和条件变量)以及线程管理接口。在给定的描述中提到,这个线程池实现了增加线程、...

    大并发服务器必看-Linux高性能服务器编程

    - **进程池和线程池**: 详细解释了进程池和线程池的概念、工作原理及其优势,特别是在减少资源消耗、提高响应速度方面的应用。 - **高性能I/O框架库Libevent**: Libevent是一个高性能的事件驱动库,用于处理大量并发...

    基于C++11线程池技术简单易用的轻量级网络编程框架源码.zip

    4. **跨平台编译脚本**:文件`build_for_android.sh`, `build_for_mac.sh`, `build_for_ios.sh`, 和 `build_for_linux.sh`表明该框架旨在支持多种操作系统,包括Android、macOS、iOS和Linux。这些脚本通常用于配置和...

    linux 下 跟踪线程运行情况

    - `ps`命令:通过`ps -efH`可以列出所有进程及其线程,`-H`选项表示按线程显示。 - `top`命令:使用`top -H`可以看到每个进程的线程信息,包括CPU使用率、内存使用等。 - `htop`命令:增强版的`top`,图形化界面...

    Linux程序设计权威指南1.pdf

    - **Linux发行版**:Ubuntu、CentOS、Red Hat等不同版本的特点及其适用场景。 - **安装与配置**:如何安装Linux系统及基本的环境配置。 #### 二、Linux基础命令 - **文件与目录管理**:ls、cd、mkdir等常用命令的...

    Advanced Linux Programming

    - 测试与调试技巧:介绍如何有效测试和调试程序,提高软件质量。 **3. 进程管理** - **进程基本概念**:深入讲解进程的概念、生命周期及其状态转换。 - **进程控制**:学习创建、终止进程以及父进程与子进程之间的...

    高效linux的多线程编程经验12条总结

    4. **条件变量的应用**:探讨条件变量的作用及其在同步控制中的应用。 5. **线程同步与通信**:深入理解线程间如何进行同步和通信,以确保数据的一致性和完整性。 6. **线程安全性的保障**:提供避免死锁和竞态条件...

    Linux服务器配置与管理:MariaDB数据库基础.pptx

    - **功能增强**:支持微秒级别的时间戳、线程池、子查询优化、组提交和进度报告等功能。 - **体积小、速度快**:设计轻量级,运行效率高。 - **开源、免费**:开放源代码,无额外费用,安全可靠。 **MariaDB在Linux...

    Linux多线程服务端编程:使用muduo C++网络库

    3.3.2线程池. . . . . . . . . . . . . . . . . . . .. . . . . . . . . . . . . 63 3.3.3推荐模式. . . . . . . . . . . . . . . . . . . .. . . . . . . . . . . . 64 3.4进程间通信只用TCP . . . . . . . . . . ....

    unix/linux多线程编程指南(sun技术文档)

    #### 三、多线程问题及其解决方法 - **竞态条件(Race Conditions):** - 定义:当两个或多个线程访问同一资源且其操作顺序影响最终结果时发生的问题。 - 解决方法: - 使用互斥锁保护共享资源。 - 确保临界区...

    Json、Linux、Spring定时任务

    以下是常用的Linux命令及其简要说明: 1. **`rm –rf 文件夹名称`**:强制删除指定的文件夹及其所有子目录。 2. **`rm –f text.log`**:强制删除指定的文件。 3. **`ls`**:列出当前目录下的所有文件及目录。 4. *...

    test_CPU.rar_源码

    10. **并发模型**:了解线程池、事件驱动、协程等并发模型及其在多线程测试中的应用也是重要的学习点。 综上所述,这个“test_CPU.rar_源码”压缩包包含了一个多线程CPU性能测试工具的源代码,涵盖了多线程编程、...

    weblogic10.0.36 windows linux 64位安装手册以及问题解决,性能优化合集

    - **补丁更新**:Oracle经常发布补丁以修复已知问题,保持WebLogic及其依赖的JDK最新,有助于问题解决。 4. **性能优化** - **JVM调优**:调整JVM堆大小(如`-Xms`和`-Xmx`参数),设置新生代和老年代比例,启用...

    openfire服务端实现

    以下是对 Openfire 服务端在 Linux 上的实现及其相关开发的详细说明: 1. **Linux 服务器上的 Openfire 安装** - 首先,确保你的 Linux 服务器满足 Openfire 的系统需求,通常需要Java运行环境(JRE)和Java开发...

    系统软件需求清单及其技术参数.pdf

    【系统软件需求清单及其技术参数】的文档详细列出了对软件和数据库的严格技术要求,主要涉及以下几个关键知识点: 1. **J2EE兼容性**:软件必须符合J2EE 1.2、1.3和1.4规范,并通过SUN的认证,确保其在多版本的Java...

    WebLogic宕机大全总结

    ### WebLogic宕机问题及其解决策略 #### 一、引言 在现代企业级应用部署中,Oracle WebLogic Server作为一款高性能的企业级Java应用服务器,因其稳定性和强大的功能集受到广泛青睐。然而,在实际生产环境中,...

Global site tag (gtag.js) - Google Analytics