- 浏览: 235918 次
- 性别:
- 来自: 南京
最新评论
-
baby8117628:
vc下mp3 IDv1和IDV2的读取 -
gezexu:
你好,我按照你的步骤一步步进行但是安装libvorbis的时候 ...
linux如何搭建强大的FFMPEG环境 -
ini_always:
帅哥,转载也把格式做好点,另外出处也要注明一下吧。。。
MP3文件格式解析
/*-------------------------------------------------------------------------
* tpool.h – 线程池定义
* -------------------------------------------------------------------------
*/
#ifndef _TPOOL_H_
#define _TPOOL_H_
#include <stdio.h>
#include <pthread.h>
/*工作线程链表*/
typedef struct tpool_work{
void (*handler_routine)(); /*任务函数指针*/
void *arg; /*任务函数参数*/
struct tpool_work *next; /*下一个任务链表*/
} tpool_work_t;
/*线程池结构体*/
typedef struct tpool{
int num_threads; /*最大线程数*/
int max_queue_size; /*最大任务链表数*/
int do_not_block_when_full; /*当链表满时是否阻塞*/
pthread_t *threads; /*线程指针*/
int cur_queue_size;
tpool_work_t *queue_head; /*链表头*/
tpool_work_t *queue_tail; /*链表尾*/
pthread_mutex_t queue_lock; /*链表互斥量*/
pthread_cond_t queue_not_full; /*链表条件量-未满*/
pthread_cond_t queue_not_empty; /*链表条件量-非空*/
pthread_cond_t queue_empty; /*链表条件量-空*/
int queue_closed;
int shutdown;
} tpool_t;
/* 初始化连接池 */
extern tpool_t *tpool_init(int num_worker_threads,\
int max_queue_size, int do_not_block_when_full);
/* 添加一个工作线程 */
extern int tpool_add_work(tpool_t *pool, void (*routine)(), void *arg);
/* 清除线程池*/
extern int tpool_destroy(tpool_t *pool, int finish);
#endif /* _TPOOL_H_ */
/* -------------------------------------------------------------------------
* tpool.c – 线程池的实现
* -------------------------------------------------------------------------
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include "tpool.h"
#include "log.h"
/* 工作线程 */
void *tpool_thread(void *tpool);
/***************线程池初始化*****************************/
tpool_t *tpool_init(int num_worker_threads,\ /*线程池线程个数*/
int max_queue_size, \ /*最大任务数*/
int do_not_block_when_full) /*是否阻塞任务满的时候*/
{
int i, rtn;
tpool_t *pool;
lprintf(log, INFO, "init pool begin ...\n");
/* 创建线程池结构体 */
if((pool = (struct tpool *)malloc(sizeof(struct tpool))) == NULL) {
lprintf(log, FATAL, "Unable to malloc() thread pool!\n");
return NULL;
}
/* 设置线程池架构体成员 */
pool->num_threads = num_worker_threads; /*工作线程个数*/
pool->max_queue_size = max_queue_size; /*任务链表最大长度*/
pool->do_not_block_when_full = do_not_block_when_full; /*任务链表满时是否等待*/
/* 生成线程池缓存 */
if((pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*num_worker_threads)) == NULL) {
lprintf(log, FATAL,"Unable to malloc() thread info array\n");/*虽然malloc()并不是返回数组指针,但仍然可以这样用,表示数组。calloc返回数组指针*/
return NULL;
}
/* 初始化任务链表 */
pool->cur_queue_size = 0;
pool->queue_head = NULL;
pool->queue_tail = NULL;
pool->queue_closed = 0;
pool->shutdown = 0;
/* 初始化互斥变量,条件变量 用于线程之间的同步 */
if((rtn = pthread_mutex_init(&(pool->queue_lock),NULL)) != 0) {
lprintf(log,FATAL,"pthread_mutex_init %s",strerror(rtn));
return NULL;
}
if((rtn = pthread_cond_init(&(pool->queue_not_empty),NULL)) != 0) {
lprintf(log,FATAL,"pthread_cond_init %s",strerror(rtn));
return NULL;
}
if((rtn = pthread_cond_init(&(pool->queue_not_full),NULL)) != 0) {
lprintf(log,FATAL,"pthread_cond_init %s",strerror(rtn));
return NULL;
}
if((rtn = pthread_cond_init(&(pool->queue_empty),NULL)) != 0) {
lprintf(log,FATAL,"pthread_cond_init %s",strerror(rtn));
return NULL;
}
/* 创建所有的线程 */ /*这里预创建的线程并不能运行,需要客户调度一个存在的函数才能工作。这里创建完线程以后,就放在那睡眠,直到唤醒*/
for(i = 0; i != num_worker_threads; i++) {
if( (rtn=pthread_create(&(pool->threads[i]),NULL,tpool_thread,(void*)pool)) != 0) { /*注意这里的pthread_create(),第一个参数是函数名,这里用malloc产生的指针(数组)来表示,第二个是函数实体tpool_thread,现在的tpool_thread执行部分是空的,就是说没有执行函数体*/
lprintf(log,FATAL,"pthread_create %s\n",strerror(rtn));
return NULL;
}
lprintf(log, INFO, "init pthread %d!\n",i);
}
lprintf(log, INFO, "init pool end!\n");
return pool;
}
/*下面这个函数负责调度。把一个真实的函数调度到线程池,然后排队或者运行*/
int tpool_add_work(tpool_t *pool, \ /*线程池指针*/
void (*routine)(void *),\ /*工作线程函数指针*/
void *arg) /*工作线程函数参数*/
{
int rtn;
tpool_work_t *workp; /*当前工作线程*/
if((rtn = pthread_mutex_lock(&pool->queue_lock)) != 0){
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
/* 采取独占的形式访问任务链表 */
if((pool->cur_queue_size == pool->max_queue_size) && \
(pool->do_not_block_when_full)) {
if((rtn = pthread_mutex_unlock(&pool->queue_lock)) != 0){
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
return -1;
}
/* 等待任务链表为新线程释放空间 */
while((pool->cur_queue_size == pool->max_queue_size) &&
(!(pool->shutdown || pool->queue_closed))) {
if((rtn = pthread_cond_wait(&(pool->queue_not_full),
&(pool->queue_lock)) ) != 0) {
lprintf(log,FATAL,"pthread cond wait failure\n");
return -1;
}
}
if(pool->shutdown || pool->queue_closed) {
if((rtn = pthread_mutex_unlock(&pool->queue_lock)) != 0) {
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
return -1;
}
/* 分配工作线程结构体 */
if((workp = (tpool_work_t *)malloc(sizeof(tpool_work_t))) == NULL) {
lprintf(log,FATAL,"unable to create work struct\n");
return -1;
}
workp->handler_routine = routine;
workp->arg = arg;
workp->next = NULL;
if(pool->cur_queue_size == 0) {
pool->queue_tail = pool->queue_head = workp;
if((rtn = pthread_cond_broadcast(&(pool->queue_not_empty))) != 0){
lprintf(log,FATAL,"pthread broadcast error\n");
return -1;
}
}
else {
pool->queue_tail->next = workp;
pool->queue_tail = workp;
}
pool->cur_queue_size++;
/* 释放对任务链表的独占 */
if((rtn = pthread_mutex_unlock(&pool->queue_lock)) != 0) {
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
return 0;
}
int tpool_destroy(tpool_t *pool, int finish)
{
int i, rtn;
tpool_work_t *cur; /*当前工作线程*/
lprintf(log, INFO, "destroy pool begin!\n");
/* 释放对任务链表的独占 */
if((rtn = pthread_mutex_lock(&(pool->queue_lock))) != 0) {
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
/* 第一步,设置线程退出标记 */
lprintf(log, INFO, "destroy pool begin 1!\n");
if(pool->queue_closed || pool->shutdown) {
if((rtn = pthread_mutex_unlock(&(pool->queue_lock))) != 0) {
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
return 0;
}
/* 第二步,禁止新任务加入任务链表 */
lprintf(log, INFO, "destroy pool begin 2!\n");
pool->queue_closed = 1;
if(finish) {
while(pool->cur_queue_size != 0) {
if((rtn = pthread_cond_wait(&(pool->queue_empty),&(pool->queue_lock))) != 0) {
lprintf(log,FATAL,"pthread_cond_wait %d\n",rtn);
return -1;
}
}
}
/* 第三步,设置线程池销毁标记 */
lprintf(log, INFO, "destroy pool begin 3!\n");
pool->shutdown = 1;
if((rtn = pthread_mutex_unlock(&(pool->queue_lock))) != 0) {
lprintf(log,FATAL,"pthread mutex unlock failure\n");
return -1;
}
/* 第四步,等待所有已建立的线程退出 */
lprintf(log, INFO, "destroy pool begin 4!\n");
if((rtn = pthread_cond_broadcast(&(pool->queue_not_empty))) != 0) {
lprintf(log,FATAL,"pthread_cond_boradcast %d\n",rtn);
return -1;
}
if((rtn = pthread_cond_broadcast(&(pool->queue_not_full))) != 0) {
lprintf(log,FATAL,"pthread_cond_boradcast %d\n",rtn);
return -1;
}
for(i = 0; i < pool->num_threads; i++) {
if((rtn = pthread_join(pool->threads[i],NULL)) != 0)
{
lprintf(log,FATAL,"pthread_join %d\n",rtn);
return -1;
}
}
/* 第五步,释放线程池所占的内存空间 */
free(pool->threads);
while(pool->queue_head != NULL) {
cur = pool->queue_head->next;
pool->queue_head = pool->queue_head->next;
free(cur);
}
free(pool);
lprintf(log, INFO, "destroy pool end!\n");
return 0;
}
void *tpool_thread(void *tpool)
{
tpool_work_t *my_work;
tpool_t *pool = (struct tpool *)tpool;
for(;;){/* 线程内循环 */
pthread_mutex_lock(&(pool->queue_lock));
/* 如果任务列表为0,并且线程池没有关闭,则一直等待,直到任务到来为止 */
while((pool->cur_queue_size == 0) && (!pool->shutdown)) {
pthread_cond_wait(&(pool->queue_not_empty), &(pool->queue_lock));
}
/* 线程池是否已经关闭,如果线程池关闭则线程自己主动关闭 */
if(pool->shutdown){
pthread_mutex_unlock(&(pool->queue_lock));
pthread_exit(NULL); /*线程退出状态为空,主线程不捕获各副线程状态*/
}
my_work = pool->queue_head;
pool->cur_queue_size--;
/*将任务链表头部去掉,改任务正在处理中*/
if(pool->cur_queue_size == 0)
pool->queue_head = pool->queue_tail = NULL;
else
pool->queue_head = my_work->next;
/* 任务链表还没有满 */
if((!pool->do_not_block_when_full) &&\
(pool->cur_queue_size == (pool->max_queue_size - 1))) {
pthread_cond_broadcast(&(pool->queue_not_full));
}
/*任务链表为空*/
if(pool->cur_queue_size == 0) {
pthread_cond_signal(&(pool->queue_empty));
}
pthread_mutex_unlock(&(pool->queue_lock));
/*启动线程业务处理逻辑*/
(*(my_work->handler_routine))(my_work->arg);
free(my_work); /*这里释放的是链表头部的线程结构体*/
}
return(NULL);
}
/* -------------------------------------------------------------------------
* log.h 记录函数定义
* -------------------------------------------------------------------------
*/
#ifndef __LOG_H
#define __LOG_H
#include <stdio.h>
#include <semaphore.h>
/*记录的最大长度*/
#define LOGLINE_MAX 1024
/*记录的等级*/
#define DEBUG 1
#define INFO 2
#define WARN 3
#define ERROR 4
#define FATAL 5
/*记录的类型*/
#define LOG_TRUNC 1<<0
#define LOG_NODATE 1<<1
#define LOG_NOLF 1<<2
#define LOG_NOLVL 1<<3
#define LOG_DEBUG 1<<4
#define LOG_STDERR 1<<5
#define LOG_NOTID 1<<6
typedef struct {
int fd;
sem_t sem;
int flags;
} log_t;
/*
* 功能描述: 记录打印函数,将记录打印至记录文件logfile。
* 参数: log_t - log_open()函数的返回值
* level - 可以是: DEBUG, INFO, WARN, ERROR, FATAL
* fmt - 记录的内容,格式同printf()函数
* 返回值: 成功返回0,失败返回-1
*/
int lprintf( log_t *log, unsigned int level, char *fmt, ... );
/*
* 功能描述: 初始化记录文件the logfile
*参数: fname - 记录文件logfile的文件名
* flags - 记录格式的选项
* LOG_TRUNC - 截断打开的记录文件
* LOG_NODATE - 忽略记录中的每一行
* LOG_NOLF - 自动为每条记录新开一行.
* LOG_NOLVL - 不记录消息的等级
* LOG_STDERR - 将消息同时送到STDERR
*返回值:成功返回log_t(>0),失败返回NULL
*/
log_t *log_open( char *fname, int flags );
/*
* 功能描述:关闭记录文件
* 参数: * log - 记录文件的指针
*/
void log_close( log_t *log );
#endif
/* -------------------------------------------------------------------------
* log.c – 记录函数实现
* -------------------------------------------------------------------------
*/
#include <stdio.h>
#include <unistd.h>
#include <semaphore.h>
#include <stdarg.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "log.h"
int lprintf( log_t *log, unsigned int level, char *fmt, ... ) {
int fd;
int rc;
va_list ap;
time_t now;
char date[50];
static char line[LOGLINE_MAX];
static char threadnum[10];
int cnt;
static char *levels[6] = { "[(bad)] ", "[debug] ", "[info ] ", "[warn ] ", "[error] ", "[fatal] " };
if(!log) return -1;
if( !(log->flags&LOG_DEBUG) && level == DEBUG ) return 0;
fd=log->fd;
/*日期*/
if( !(log->flags&LOG_NODATE) ) {
now=time(NULL);
strcpy(date,ctime(&now));
date[strlen(date)-6]=' ';
date[strlen(date)-5]='\0';
}
/*线程号*/
if( !(log->flags&LOG_NOTID) ) {
sprintf(threadnum, "(TID:%lu) ", pthread_self());
}
cnt = snprintf(line, sizeof(line), "%s%s%s",
log->flags&LOG_NODATE ? "" : date,
log->flags&LOG_NOLVL ? "" :
(level > FATAL ? levels[0] : levels[level]),
log->flags&LOG_NOTID ? "" : threadnum);
va_start(ap, fmt);
vsnprintf(line+cnt, sizeof(line)-cnt, fmt, ap); /*如果输入的日志过长会自动截取*/
va_end(ap);
line[sizeof(line)-1] = '\0';
if( !(log->flags&LOG_NOLF) ) {
/*chomp(line);*/
/*strcpy(line+strlen(line), "\n");*/
}
sem_wait(&log->sem); /*用信号实现同步*/
rc = write(fd, line, strlen(line));
sem_post(&log->sem);
if( !rc ) errno = 0;
return rc;
}
log_t *log_open( char *fname, int flags ) {
log_t *log = malloc(sizeof(log_t));
if(!log) {
fprintf(stderr, "log_open: Unable to malloc()");
goto log_open_a;
}
log->flags=flags;
if( !strcmp(fname,"-") ) {
log->fd = 2;
} else {
log->fd = open(fname, O_WRONLY|O_CREAT|O_NOCTTY |
(flags&LOG_TRUNC ? O_TRUNC : O_APPEND) , 0666);
}
if( log->fd == -1 ) {
fprintf(stderr, "log_open: Opening logfile %s: %s", fname, strerror(errno));
goto log_open_b;
}
if( sem_init(&log->sem, 0, 1) == -1 ) {
fprintf(stderr, "log_open: Could not initialize log semaphore.");
goto log_open_c;
}
return log;
log_open_c:
close(log->fd);
log_open_b:
free(log);
log_open_a:
return NULL;
}
void log_close( log_t *log ) {
sem_wait(&log->sem);
sem_destroy(&log->sem);
close(log->fd);
free(log);
return;
}
/* -------------------------------------------------------------------------
* testpool.c – 线程池测试程序
* -------------------------------------------------------------------------
*/
#include <pthread.h>
#include "log.h"
#include "tpool.h"
log_t *log; /*进程全局日志文件句柄*
/*任务*/
void thread(void *arg)
{
char * ptr=(char *)arg;
sleep(1);
printf("hello world! %s\n",ptr);
}
int main(int argc, char *argv[])
{
tpool_t *pool; /*线程池指针*/
/* 开启记录文件 */
log=log_open("test.log", 0);
/* 创建一个有100个工作线程,最大200个任务队列的线程池 */
pool=tpool_init(100,200,1);
int i;
/* 开启记录文件 */
* 添加100个任务*/
for (i = 0; i<100;i++) /*这里往线程池里一次性添加100个线程,参数都一样。在实际情况下,参数是可变的*/
tpool_add_work(pool,thread,"test!");
sleep(10);
/*终止线程池*/
tpool_destroy(pool,1);
/* 关闭记录文件 */
log_close(log);
pthread_exit(NULL);
}
* tpool.h – 线程池定义
* -------------------------------------------------------------------------
*/
#ifndef _TPOOL_H_
#define _TPOOL_H_
#include <stdio.h>
#include <pthread.h>
/*工作线程链表*/
typedef struct tpool_work{
void (*handler_routine)(); /*任务函数指针*/
void *arg; /*任务函数参数*/
struct tpool_work *next; /*下一个任务链表*/
} tpool_work_t;
/*线程池结构体*/
typedef struct tpool{
int num_threads; /*最大线程数*/
int max_queue_size; /*最大任务链表数*/
int do_not_block_when_full; /*当链表满时是否阻塞*/
pthread_t *threads; /*线程指针*/
int cur_queue_size;
tpool_work_t *queue_head; /*链表头*/
tpool_work_t *queue_tail; /*链表尾*/
pthread_mutex_t queue_lock; /*链表互斥量*/
pthread_cond_t queue_not_full; /*链表条件量-未满*/
pthread_cond_t queue_not_empty; /*链表条件量-非空*/
pthread_cond_t queue_empty; /*链表条件量-空*/
int queue_closed;
int shutdown;
} tpool_t;
/* 初始化连接池 */
extern tpool_t *tpool_init(int num_worker_threads,\
int max_queue_size, int do_not_block_when_full);
/* 添加一个工作线程 */
extern int tpool_add_work(tpool_t *pool, void (*routine)(), void *arg);
/* 清除线程池*/
extern int tpool_destroy(tpool_t *pool, int finish);
#endif /* _TPOOL_H_ */
/* -------------------------------------------------------------------------
* tpool.c – 线程池的实现
* -------------------------------------------------------------------------
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include "tpool.h"
#include "log.h"
/* 工作线程 */
void *tpool_thread(void *tpool);
/***************线程池初始化*****************************/
tpool_t *tpool_init(int num_worker_threads,\ /*线程池线程个数*/
int max_queue_size, \ /*最大任务数*/
int do_not_block_when_full) /*是否阻塞任务满的时候*/
{
int i, rtn;
tpool_t *pool;
lprintf(log, INFO, "init pool begin ...\n");
/* 创建线程池结构体 */
if((pool = (struct tpool *)malloc(sizeof(struct tpool))) == NULL) {
lprintf(log, FATAL, "Unable to malloc() thread pool!\n");
return NULL;
}
/* 设置线程池架构体成员 */
pool->num_threads = num_worker_threads; /*工作线程个数*/
pool->max_queue_size = max_queue_size; /*任务链表最大长度*/
pool->do_not_block_when_full = do_not_block_when_full; /*任务链表满时是否等待*/
/* 生成线程池缓存 */
if((pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*num_worker_threads)) == NULL) {
lprintf(log, FATAL,"Unable to malloc() thread info array\n");/*虽然malloc()并不是返回数组指针,但仍然可以这样用,表示数组。calloc返回数组指针*/
return NULL;
}
/* 初始化任务链表 */
pool->cur_queue_size = 0;
pool->queue_head = NULL;
pool->queue_tail = NULL;
pool->queue_closed = 0;
pool->shutdown = 0;
/* 初始化互斥变量,条件变量 用于线程之间的同步 */
if((rtn = pthread_mutex_init(&(pool->queue_lock),NULL)) != 0) {
lprintf(log,FATAL,"pthread_mutex_init %s",strerror(rtn));
return NULL;
}
if((rtn = pthread_cond_init(&(pool->queue_not_empty),NULL)) != 0) {
lprintf(log,FATAL,"pthread_cond_init %s",strerror(rtn));
return NULL;
}
if((rtn = pthread_cond_init(&(pool->queue_not_full),NULL)) != 0) {
lprintf(log,FATAL,"pthread_cond_init %s",strerror(rtn));
return NULL;
}
if((rtn = pthread_cond_init(&(pool->queue_empty),NULL)) != 0) {
lprintf(log,FATAL,"pthread_cond_init %s",strerror(rtn));
return NULL;
}
/* 创建所有的线程 */ /*这里预创建的线程并不能运行,需要客户调度一个存在的函数才能工作。这里创建完线程以后,就放在那睡眠,直到唤醒*/
for(i = 0; i != num_worker_threads; i++) {
if( (rtn=pthread_create(&(pool->threads[i]),NULL,tpool_thread,(void*)pool)) != 0) { /*注意这里的pthread_create(),第一个参数是函数名,这里用malloc产生的指针(数组)来表示,第二个是函数实体tpool_thread,现在的tpool_thread执行部分是空的,就是说没有执行函数体*/
lprintf(log,FATAL,"pthread_create %s\n",strerror(rtn));
return NULL;
}
lprintf(log, INFO, "init pthread %d!\n",i);
}
lprintf(log, INFO, "init pool end!\n");
return pool;
}
/*下面这个函数负责调度。把一个真实的函数调度到线程池,然后排队或者运行*/
int tpool_add_work(tpool_t *pool, \ /*线程池指针*/
void (*routine)(void *),\ /*工作线程函数指针*/
void *arg) /*工作线程函数参数*/
{
int rtn;
tpool_work_t *workp; /*当前工作线程*/
if((rtn = pthread_mutex_lock(&pool->queue_lock)) != 0){
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
/* 采取独占的形式访问任务链表 */
if((pool->cur_queue_size == pool->max_queue_size) && \
(pool->do_not_block_when_full)) {
if((rtn = pthread_mutex_unlock(&pool->queue_lock)) != 0){
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
return -1;
}
/* 等待任务链表为新线程释放空间 */
while((pool->cur_queue_size == pool->max_queue_size) &&
(!(pool->shutdown || pool->queue_closed))) {
if((rtn = pthread_cond_wait(&(pool->queue_not_full),
&(pool->queue_lock)) ) != 0) {
lprintf(log,FATAL,"pthread cond wait failure\n");
return -1;
}
}
if(pool->shutdown || pool->queue_closed) {
if((rtn = pthread_mutex_unlock(&pool->queue_lock)) != 0) {
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
return -1;
}
/* 分配工作线程结构体 */
if((workp = (tpool_work_t *)malloc(sizeof(tpool_work_t))) == NULL) {
lprintf(log,FATAL,"unable to create work struct\n");
return -1;
}
workp->handler_routine = routine;
workp->arg = arg;
workp->next = NULL;
if(pool->cur_queue_size == 0) {
pool->queue_tail = pool->queue_head = workp;
if((rtn = pthread_cond_broadcast(&(pool->queue_not_empty))) != 0){
lprintf(log,FATAL,"pthread broadcast error\n");
return -1;
}
}
else {
pool->queue_tail->next = workp;
pool->queue_tail = workp;
}
pool->cur_queue_size++;
/* 释放对任务链表的独占 */
if((rtn = pthread_mutex_unlock(&pool->queue_lock)) != 0) {
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
return 0;
}
int tpool_destroy(tpool_t *pool, int finish)
{
int i, rtn;
tpool_work_t *cur; /*当前工作线程*/
lprintf(log, INFO, "destroy pool begin!\n");
/* 释放对任务链表的独占 */
if((rtn = pthread_mutex_lock(&(pool->queue_lock))) != 0) {
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
/* 第一步,设置线程退出标记 */
lprintf(log, INFO, "destroy pool begin 1!\n");
if(pool->queue_closed || pool->shutdown) {
if((rtn = pthread_mutex_unlock(&(pool->queue_lock))) != 0) {
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
return 0;
}
/* 第二步,禁止新任务加入任务链表 */
lprintf(log, INFO, "destroy pool begin 2!\n");
pool->queue_closed = 1;
if(finish) {
while(pool->cur_queue_size != 0) {
if((rtn = pthread_cond_wait(&(pool->queue_empty),&(pool->queue_lock))) != 0) {
lprintf(log,FATAL,"pthread_cond_wait %d\n",rtn);
return -1;
}
}
}
/* 第三步,设置线程池销毁标记 */
lprintf(log, INFO, "destroy pool begin 3!\n");
pool->shutdown = 1;
if((rtn = pthread_mutex_unlock(&(pool->queue_lock))) != 0) {
lprintf(log,FATAL,"pthread mutex unlock failure\n");
return -1;
}
/* 第四步,等待所有已建立的线程退出 */
lprintf(log, INFO, "destroy pool begin 4!\n");
if((rtn = pthread_cond_broadcast(&(pool->queue_not_empty))) != 0) {
lprintf(log,FATAL,"pthread_cond_boradcast %d\n",rtn);
return -1;
}
if((rtn = pthread_cond_broadcast(&(pool->queue_not_full))) != 0) {
lprintf(log,FATAL,"pthread_cond_boradcast %d\n",rtn);
return -1;
}
for(i = 0; i < pool->num_threads; i++) {
if((rtn = pthread_join(pool->threads[i],NULL)) != 0)
{
lprintf(log,FATAL,"pthread_join %d\n",rtn);
return -1;
}
}
/* 第五步,释放线程池所占的内存空间 */
free(pool->threads);
while(pool->queue_head != NULL) {
cur = pool->queue_head->next;
pool->queue_head = pool->queue_head->next;
free(cur);
}
free(pool);
lprintf(log, INFO, "destroy pool end!\n");
return 0;
}
void *tpool_thread(void *tpool)
{
tpool_work_t *my_work;
tpool_t *pool = (struct tpool *)tpool;
for(;;){/* 线程内循环 */
pthread_mutex_lock(&(pool->queue_lock));
/* 如果任务列表为0,并且线程池没有关闭,则一直等待,直到任务到来为止 */
while((pool->cur_queue_size == 0) && (!pool->shutdown)) {
pthread_cond_wait(&(pool->queue_not_empty), &(pool->queue_lock));
}
/* 线程池是否已经关闭,如果线程池关闭则线程自己主动关闭 */
if(pool->shutdown){
pthread_mutex_unlock(&(pool->queue_lock));
pthread_exit(NULL); /*线程退出状态为空,主线程不捕获各副线程状态*/
}
my_work = pool->queue_head;
pool->cur_queue_size--;
/*将任务链表头部去掉,改任务正在处理中*/
if(pool->cur_queue_size == 0)
pool->queue_head = pool->queue_tail = NULL;
else
pool->queue_head = my_work->next;
/* 任务链表还没有满 */
if((!pool->do_not_block_when_full) &&\
(pool->cur_queue_size == (pool->max_queue_size - 1))) {
pthread_cond_broadcast(&(pool->queue_not_full));
}
/*任务链表为空*/
if(pool->cur_queue_size == 0) {
pthread_cond_signal(&(pool->queue_empty));
}
pthread_mutex_unlock(&(pool->queue_lock));
/*启动线程业务处理逻辑*/
(*(my_work->handler_routine))(my_work->arg);
free(my_work); /*这里释放的是链表头部的线程结构体*/
}
return(NULL);
}
/* -------------------------------------------------------------------------
* log.h 记录函数定义
* -------------------------------------------------------------------------
*/
#ifndef __LOG_H
#define __LOG_H
#include <stdio.h>
#include <semaphore.h>
/*记录的最大长度*/
#define LOGLINE_MAX 1024
/*记录的等级*/
#define DEBUG 1
#define INFO 2
#define WARN 3
#define ERROR 4
#define FATAL 5
/*记录的类型*/
#define LOG_TRUNC 1<<0
#define LOG_NODATE 1<<1
#define LOG_NOLF 1<<2
#define LOG_NOLVL 1<<3
#define LOG_DEBUG 1<<4
#define LOG_STDERR 1<<5
#define LOG_NOTID 1<<6
typedef struct {
int fd;
sem_t sem;
int flags;
} log_t;
/*
* 功能描述: 记录打印函数,将记录打印至记录文件logfile。
* 参数: log_t - log_open()函数的返回值
* level - 可以是: DEBUG, INFO, WARN, ERROR, FATAL
* fmt - 记录的内容,格式同printf()函数
* 返回值: 成功返回0,失败返回-1
*/
int lprintf( log_t *log, unsigned int level, char *fmt, ... );
/*
* 功能描述: 初始化记录文件the logfile
*参数: fname - 记录文件logfile的文件名
* flags - 记录格式的选项
* LOG_TRUNC - 截断打开的记录文件
* LOG_NODATE - 忽略记录中的每一行
* LOG_NOLF - 自动为每条记录新开一行.
* LOG_NOLVL - 不记录消息的等级
* LOG_STDERR - 将消息同时送到STDERR
*返回值:成功返回log_t(>0),失败返回NULL
*/
log_t *log_open( char *fname, int flags );
/*
* 功能描述:关闭记录文件
* 参数: * log - 记录文件的指针
*/
void log_close( log_t *log );
#endif
/* -------------------------------------------------------------------------
* log.c – 记录函数实现
* -------------------------------------------------------------------------
*/
#include <stdio.h>
#include <unistd.h>
#include <semaphore.h>
#include <stdarg.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "log.h"
int lprintf( log_t *log, unsigned int level, char *fmt, ... ) {
int fd;
int rc;
va_list ap;
time_t now;
char date[50];
static char line[LOGLINE_MAX];
static char threadnum[10];
int cnt;
static char *levels[6] = { "[(bad)] ", "[debug] ", "[info ] ", "[warn ] ", "[error] ", "[fatal] " };
if(!log) return -1;
if( !(log->flags&LOG_DEBUG) && level == DEBUG ) return 0;
fd=log->fd;
/*日期*/
if( !(log->flags&LOG_NODATE) ) {
now=time(NULL);
strcpy(date,ctime(&now));
date[strlen(date)-6]=' ';
date[strlen(date)-5]='\0';
}
/*线程号*/
if( !(log->flags&LOG_NOTID) ) {
sprintf(threadnum, "(TID:%lu) ", pthread_self());
}
cnt = snprintf(line, sizeof(line), "%s%s%s",
log->flags&LOG_NODATE ? "" : date,
log->flags&LOG_NOLVL ? "" :
(level > FATAL ? levels[0] : levels[level]),
log->flags&LOG_NOTID ? "" : threadnum);
va_start(ap, fmt);
vsnprintf(line+cnt, sizeof(line)-cnt, fmt, ap); /*如果输入的日志过长会自动截取*/
va_end(ap);
line[sizeof(line)-1] = '\0';
if( !(log->flags&LOG_NOLF) ) {
/*chomp(line);*/
/*strcpy(line+strlen(line), "\n");*/
}
sem_wait(&log->sem); /*用信号实现同步*/
rc = write(fd, line, strlen(line));
sem_post(&log->sem);
if( !rc ) errno = 0;
return rc;
}
log_t *log_open( char *fname, int flags ) {
log_t *log = malloc(sizeof(log_t));
if(!log) {
fprintf(stderr, "log_open: Unable to malloc()");
goto log_open_a;
}
log->flags=flags;
if( !strcmp(fname,"-") ) {
log->fd = 2;
} else {
log->fd = open(fname, O_WRONLY|O_CREAT|O_NOCTTY |
(flags&LOG_TRUNC ? O_TRUNC : O_APPEND) , 0666);
}
if( log->fd == -1 ) {
fprintf(stderr, "log_open: Opening logfile %s: %s", fname, strerror(errno));
goto log_open_b;
}
if( sem_init(&log->sem, 0, 1) == -1 ) {
fprintf(stderr, "log_open: Could not initialize log semaphore.");
goto log_open_c;
}
return log;
log_open_c:
close(log->fd);
log_open_b:
free(log);
log_open_a:
return NULL;
}
void log_close( log_t *log ) {
sem_wait(&log->sem);
sem_destroy(&log->sem);
close(log->fd);
free(log);
return;
}
/* -------------------------------------------------------------------------
* testpool.c – 线程池测试程序
* -------------------------------------------------------------------------
*/
#include <pthread.h>
#include "log.h"
#include "tpool.h"
log_t *log; /*进程全局日志文件句柄*
/*任务*/
void thread(void *arg)
{
char * ptr=(char *)arg;
sleep(1);
printf("hello world! %s\n",ptr);
}
int main(int argc, char *argv[])
{
tpool_t *pool; /*线程池指针*/
/* 开启记录文件 */
log=log_open("test.log", 0);
/* 创建一个有100个工作线程,最大200个任务队列的线程池 */
pool=tpool_init(100,200,1);
int i;
/* 开启记录文件 */
* 添加100个任务*/
for (i = 0; i<100;i++) /*这里往线程池里一次性添加100个线程,参数都一样。在实际情况下,参数是可变的*/
tpool_add_work(pool,thread,"test!");
sleep(10);
/*终止线程池*/
tpool_destroy(pool,1);
/* 关闭记录文件 */
log_close(log);
pthread_exit(NULL);
}
发表评论
-
内存屏障
2010-02-26 11:03 1508处理器的乱序和并发执行 目前的高级处理器,为了提高内部逻辑元 ... -
函数调用堆栈分析
2010-02-26 10:53 1383理解调用栈最重要的两 ... -
mtrace检测内存泄露
2010-02-25 09:50 1091[url] http://math.acadiau.ca/AC ... -
c语言编程之字符串操作
2010-02-25 09:41 8631. //在s串中查找与s1相匹配的字符串,找到后用 ... -
linux C 链接库 so制作及调用[转]
2010-02-24 16:26 2579文章分类:C++编程 [文章作者:陈毓端 若转载请标注原文链 ... -
mtrace的使用
2010-02-24 16:02 1313对于内存溢出之类的麻烦可能大家在编写指针比较多的复杂的程序的时 ... -
单片机的C语言中位操作用法(转
2010-02-24 14:27 2213单片机的C语言中位操作用法 作者:郭天祥 在对单处机进 ... -
Linux下的itoa函数
2010-02-21 17:55 1766上篇文章说到linux需要it ... -
va_list、va_start、va_arg、va_end的原理与使用
2010-02-05 10:34 29031. 概述 由于在C语言中没有函数重载,解 ... -
快速排序(quickSort)
2010-02-04 10:50 8681. #include <stdio.h> ... -
C问题---itoa函数
2010-02-04 10:36 1048------------------------------ ... -
itoa函数及atoi函数
2010-02-04 10:35 1313C语言提供了几个标准库函数,可以将任意类型(整型、长整型、浮点 ... -
结构体零长度数组的作用
2010-02-04 10:21 1375在一些 C 语言编写的代码中,有时可以看到如下定义的结构: ... -
优化C代码常用的几招
2010-02-04 10:14 776性能优化方面永远注意8 ... -
我经常去的网站
2010-02-03 17:53 1623MFC相关网站 www.codeproject.com ht ... -
可重入函数与不可重入函数
2010-02-03 16:35 932原文地址:http://blog.chin ... -
哈夫曼编码
2010-02-03 16:26 1317本文描述在网上能够找到的最简单,最快速的哈夫曼编码。本方法不使 ... -
优化变成了忧患:String.split引发的“内存泄露”
2010-02-01 17:39 1117一直赞叹Sun对待技术的 ... -
锁无关的(Lock-Free)数据结构——在避免死锁的同时确保线程
2010-01-26 14:47 907http://hi.baidu.com/%5F%E2%64%5 ... -
使用 GNU profiler 来提高代码运行速度
2010-01-26 13:46 784进应用程序的性能是一 ...
相关推荐
在深入探讨Linux C编程及其相关知识点之前,首先要理解C语言本身的重要性。C语言是一种强大的、通用的编程语言,被广泛用于系统级编程,因为它提供了对硬件的直接访问,并且具有高效的执行性能。在Linux环境下,...
本文将深入探讨线程池的实现原理及其在Linux环境下的应用。 线程池的核心组成部分包括工作线程、任务队列、线程同步机制(如互斥量和条件变量)以及线程管理接口。在给定的描述中提到,这个线程池实现了增加线程、...
- **进程池和线程池**: 详细解释了进程池和线程池的概念、工作原理及其优势,特别是在减少资源消耗、提高响应速度方面的应用。 - **高性能I/O框架库Libevent**: Libevent是一个高性能的事件驱动库,用于处理大量并发...
4. **跨平台编译脚本**:文件`build_for_android.sh`, `build_for_mac.sh`, `build_for_ios.sh`, 和 `build_for_linux.sh`表明该框架旨在支持多种操作系统,包括Android、macOS、iOS和Linux。这些脚本通常用于配置和...
- `ps`命令:通过`ps -efH`可以列出所有进程及其线程,`-H`选项表示按线程显示。 - `top`命令:使用`top -H`可以看到每个进程的线程信息,包括CPU使用率、内存使用等。 - `htop`命令:增强版的`top`,图形化界面...
- **Linux发行版**:Ubuntu、CentOS、Red Hat等不同版本的特点及其适用场景。 - **安装与配置**:如何安装Linux系统及基本的环境配置。 #### 二、Linux基础命令 - **文件与目录管理**:ls、cd、mkdir等常用命令的...
- 测试与调试技巧:介绍如何有效测试和调试程序,提高软件质量。 **3. 进程管理** - **进程基本概念**:深入讲解进程的概念、生命周期及其状态转换。 - **进程控制**:学习创建、终止进程以及父进程与子进程之间的...
4. **条件变量的应用**:探讨条件变量的作用及其在同步控制中的应用。 5. **线程同步与通信**:深入理解线程间如何进行同步和通信,以确保数据的一致性和完整性。 6. **线程安全性的保障**:提供避免死锁和竞态条件...
- **功能增强**:支持微秒级别的时间戳、线程池、子查询优化、组提交和进度报告等功能。 - **体积小、速度快**:设计轻量级,运行效率高。 - **开源、免费**:开放源代码,无额外费用,安全可靠。 **MariaDB在Linux...
3.3.2线程池. . . . . . . . . . . . . . . . . . . .. . . . . . . . . . . . . 63 3.3.3推荐模式. . . . . . . . . . . . . . . . . . . .. . . . . . . . . . . . 64 3.4进程间通信只用TCP . . . . . . . . . . ....
#### 三、多线程问题及其解决方法 - **竞态条件(Race Conditions):** - 定义:当两个或多个线程访问同一资源且其操作顺序影响最终结果时发生的问题。 - 解决方法: - 使用互斥锁保护共享资源。 - 确保临界区...
以下是常用的Linux命令及其简要说明: 1. **`rm –rf 文件夹名称`**:强制删除指定的文件夹及其所有子目录。 2. **`rm –f text.log`**:强制删除指定的文件。 3. **`ls`**:列出当前目录下的所有文件及目录。 4. *...
10. **并发模型**:了解线程池、事件驱动、协程等并发模型及其在多线程测试中的应用也是重要的学习点。 综上所述,这个“test_CPU.rar_源码”压缩包包含了一个多线程CPU性能测试工具的源代码,涵盖了多线程编程、...
- **补丁更新**:Oracle经常发布补丁以修复已知问题,保持WebLogic及其依赖的JDK最新,有助于问题解决。 4. **性能优化** - **JVM调优**:调整JVM堆大小(如`-Xms`和`-Xmx`参数),设置新生代和老年代比例,启用...
以下是对 Openfire 服务端在 Linux 上的实现及其相关开发的详细说明: 1. **Linux 服务器上的 Openfire 安装** - 首先,确保你的 Linux 服务器满足 Openfire 的系统需求,通常需要Java运行环境(JRE)和Java开发...
【系统软件需求清单及其技术参数】的文档详细列出了对软件和数据库的严格技术要求,主要涉及以下几个关键知识点: 1. **J2EE兼容性**:软件必须符合J2EE 1.2、1.3和1.4规范,并通过SUN的认证,确保其在多版本的Java...
### WebLogic宕机问题及其解决策略 #### 一、引言 在现代企业级应用部署中,Oracle WebLogic Server作为一款高性能的企业级Java应用服务器,因其稳定性和强大的功能集受到广泛青睐。然而,在实际生产环境中,...