`
kenby
  • 浏览: 725795 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Linux C线程池实现

阅读更多

三个文件

 

1 tpool.h

typedef struct tpool_work {
    void               (*routine)(void *);
    void                *arg;
    struct tpool_work   *next;
} tpool_work_t;

typedef struct tpool {
    /* pool characteristics */
    int                 num_threads;
    int                 max_queue_size;
    /* pool state */
    pthread_t           *tpid;
    tpool_work_t        *queue;
    int                 front, rear;
    /* 剩下的任务可以做完, 但不能再加新的任务 */
    int                 queue_closed;   
    /* 剩下的任务都不做了, 直接关闭 */
    int                 shutdown;       
    /* pool synchronization */
    pthread_mutex_t     queue_lock;
    pthread_cond_t      queue_has_task;
    pthread_cond_t      queue_has_space;
    pthread_cond_t      queue_empty;
} *tpool_t;

void tpool_init(tpool_t *tpoolp,int num_threads, int max_queue_size);

int tpool_add_work(tpool_t tpool,void(*routine)(void *), void *arg);

int tpool_destroy(tpool_t tpool,int finish);

 2 tpool.c

#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <string.h>
#include <pthread.h>
#include "tpool.h"

#define DEBUG

#if defined(DEBUG)
#define debug(...) do { \
    flockfile(stdout); \
    printf("###%p.%s: ", (void *)pthread_self(), __func__); \
    printf(__VA_ARGS__); \
    putchar('\n'); \
    fflush(stdout); \
    funlockfile(stdout); \
} while (0)
#else
#define debug(...)
#endif

void *tpool_thread(void *);

void tpool_init(tpool_t *tpoolp, int num_worker_threads, int max_queue_size)
{
    int i;
    tpool_t pool;

    pool = (tpool_t)malloc(sizeof(struct tpool));
    if (pool == NULL) {
        perror("malloc");
        exit(0);
    }

    pool->num_threads = 0;
    pool->max_queue_size = max_queue_size + 1;
    pool->num_threads = num_worker_threads;
    pool->tpid = NULL;
    pool->front = 0;
    pool->rear = 0;
    pool->queue_closed = 0;
    pool->shutdown = 0;

    if (pthread_mutex_init(&pool->queue_lock, NULL) == -1) {
        perror("pthread_mutex_init");
        free(pool);
        exit(0);
    }
    if (pthread_cond_init(&pool->queue_has_space, NULL) == -1) {
        perror("pthread_mutex_init");
        free(pool);
        exit(0);
    }
    if (pthread_cond_init(&pool->queue_has_task, NULL) == -1) {
        perror("pthread_mutex_init");
        free(pool);
        exit(0);
    }
    if (pthread_cond_init(&pool->queue_empty, NULL) == -1) {
        perror("pthread_mutex_init");
        free(pool);
        exit(0);
    }

    if ((pool->queue = malloc(sizeof(struct tpool_work) * 
                    pool->max_queue_size)) == NULL) {
        perror("malloc");
        free(pool);
        exit(0);
    }

    if ((pool->tpid = malloc(sizeof(pthread_t) * num_worker_threads)) == NULL) {
        perror("malloc");
        free(pool);
        free(pool->queue);
        exit(0);
    }

    for (i = 0; i < num_worker_threads; i++) {
        if (pthread_create(&pool->tpid[i], NULL, tpool_thread, 
                    (void *)pool) != 0) {
            perror("pthread_create");
            exit(0);
        }
    }

    *tpoolp = pool;
}


int empty(tpool_t pool)
{
    return  pool->front == pool->rear;
}

int full(tpool_t pool)
{
    return ((pool->rear + 1) % pool->max_queue_size == pool->front);
}

int size(tpool_t pool)
{
    return (pool->rear + pool->max_queue_size -
                pool->front) % pool->max_queue_size;
}

int tpool_add_work(tpool_t tpool, void(*routine)(void *), void *arg)
{
    tpool_work_t *temp;

    pthread_mutex_lock(&tpool->queue_lock);

    while (full(tpool) && !tpool->shutdown && !tpool->queue_closed) {
        pthread_cond_wait(&tpool->queue_has_space, &tpool->queue_lock);
    }

    if (tpool->shutdown || tpool->queue_closed) {
        pthread_mutex_unlock(&tpool->queue_lock);
        return -1;
    }

    int is_empty = empty(tpool);

    temp = tpool->queue + tpool->rear;
    temp->routine = routine;
    temp->arg = arg;
    tpool->rear = (tpool->rear + 1) % tpool->max_queue_size;

    if (is_empty) {
        debug("signal has task");
        pthread_cond_broadcast(&tpool->queue_has_task);
    }

    pthread_mutex_unlock(&tpool->queue_lock);    

    return 0;
}

void *tpool_thread(void *arg)
{
    tpool_t pool = (tpool_t)(arg);
    tpool_work_t *work;

    for (;;) {
        pthread_mutex_lock(&pool->queue_lock);

        while (empty(pool) && !pool->shutdown) {
            debug("I'm sleep");
            pthread_cond_wait(&pool->queue_has_task, &pool->queue_lock);
        }
        debug("I'm awake");

        if (pool->shutdown == 1) {
            debug("exit");
            pthread_mutex_unlock(&pool->queue_lock);
            pthread_exit(NULL);
        }

        int is_full = full(pool);
        work = pool->queue + pool->front;
        pool->front = (pool->front + 1) % pool->max_queue_size;

        if (is_full) {
            pthread_cond_broadcast(&pool->queue_has_space);
        }

        if (empty(pool)) {
            pthread_cond_signal(&pool->queue_empty);
        }

        pthread_mutex_unlock(&pool->queue_lock);   

        (*(work->routine))(work->arg);
    }
}

int tpool_destroy(tpool_t tpool, int finish)
{
    int     i;

    pthread_mutex_lock(&tpool->queue_lock);

    tpool->queue_closed = 1;

    if (finish == 1) {
        debug("wait all work done");
        while (!empty(tpool)) {
            pthread_cond_wait(&tpool->queue_empty, &tpool->queue_lock);
        }
    }
    tpool->shutdown = 1;

    pthread_mutex_unlock(&tpool->queue_lock);

    pthread_cond_broadcast(&tpool->queue_has_task);

    debug("wait worker thread exit");
    for (i = 0; i < tpool->num_threads; i++) {
        pthread_join(tpool->tpid[i], NULL);
    }

    debug("free thread pool");
    free(tpool->tpid);
    free(tpool->queue);
    free(tpool);
}

 

3 tpooltest.c

#include <stdio.h>
#include <pthread.h>
#include "tpool.h"

char *str[]={"string 0", "string 1", "string 2", 
                "string 3", "string 4", "string 5"};

void job(void * jobstr)
{
    long i, x;

    for (i = 0; i < 100000000; i++)  {
        x = x +i;
    }
    printf("%s\n", (char *)jobstr);
}

int main(void)
{
    int i;  
    tpool_t test_pool;

    tpool_init(&test_pool, 8, 20);

    for ( i = 0; i < 5; i++) {
        tpool_add_work(test_pool, job, str[i]);
    }

    tpool_destroy(test_pool, 1);

    return 0;
}
 
分享到:
评论

相关推荐

    Linux C语言 线程池 状态机 并发处理Demo

    本项目提供了一个关于“Linux C语言线程池、状态机和并发处理”的Demo,非常适合初学者理解和实践多线程技术。下面将详细阐述这些知识点。 首先,线程池是一种线程管理机制,它预先创建了一组线程,待有任务需要...

    linux线程池创建c实现

    Linux 线程池创建 C 实现 线程池是一种常用的并发编程技术,它可以提高应用程序的性能和响应速度。在 Linux 系统中,使用 C 语言创建线程池可以实现高效的并发处理。 什么时候需要创建线程池呢?简单的说,如果一...

    linux线程池,c语言实现

    以上就是关于“Linux线程池,C语言实现”的主要技术要点。实际的代码实现会涉及到这些概念的具体应用和细节处理。在开发过程中,要充分考虑线程池的扩展性、可靠性和效率,以满足不同应用场景的需求。

    Linux下C线程池实现

    在Linux下用C写的一个简易线程池。系统是RedHat 9,gcc版本"gcc version 4.1.2 20071124 (Red Hat 4.1.2-42)"。文件夹里的源码是按工程组织好的,在文件夹下的test目录下面有一个小的测试程序和Makefile,编译后即可...

    linux c 线程池连接mysql

    在Linux系统下,C语言是实现底层编程和高性能服务器应用的常见选择。本文将深入探讨如何利用C语言、Epoll事件模型、线程池以及数据库连接池来构建一个高并发的MySQL连接服务。 1. **C语言与MySQL连接**: C语言...

    linuxC语言线程池实现CP命令

    linux系统下C语言 利用线程池技术实现CP命令 压缩包包含:源代码+开发说明PPT 线程池头文件: //任务 struct task { void *(*task)(void *arg); void *arg; struct task *next; }; //线程池 typedef struct ...

    C语言实现简单线程池.zip

    总的来说,这个项目提供了一个基础的C语言线程池实现,展示了如何利用POSIX线程在Linux环境下进行多线程编程。理解和实现这样的线程池不仅可以提高程序的效率,还能帮助开发者深入理解操作系统级的并发编程。

    linux线程池的C语言实现

    因此,针对C语言环境下的线程池实现变得尤为重要。 ##### 4.1 关键数据结构 - `tp_work_desc`: 描述了线程执行任务时所需的一些信息,具体内容取决于具体应用场景,需由用户自定义。 - `tp_work`: 定义了线程将要...

    Linux C系统编程:使用线程池实现cp命令

    总结起来,Linux C系统编程中使用线程池实现类似`cp`命令的功能,是一个涉及多线程编程、任务调度和同步控制的综合实践。通过这样的实现,我们可以提高文件复制操作的并发性和效率,同时降低系统资源的消耗。在深入...

    非常好用的C语言线程池,自己测试通过

    这个“非常好用的C语言线程池”是一个纯C代码实现的线程池库,适合在C语言项目中使用,提高多任务处理的效率和资源利用率。 线程池的工作原理: 1. **初始化**:线程池在启动时会预先创建一定数量的线程,这些线程...

    linux下C语言实现线程池

    下面是一个简单的线程池实现步骤: 1. **初始化线程池**:设置线程池大小,创建相应数量的工作线程,并初始化任务队列。 2. **创建任务**:定义任务结构体,包含任务函数和参数。 3. **提交任务**:将任务放入任务...

    C语言 Linux Nginx 线程池

    这些元素共同构成了一个高级话题,即如何在Linux环境中使用C语言来实现Nginx服务器的线程池功能。线程池是一种优化资源分配和管理的策略,它预先创建一组线程,当有任务需要执行时,从线程池中取出一个线程来处理...

    linux c 线程池

    在这个“linux c 线程池”资源中,我们可以预见到它提供了C语言实现的线程池模型,用于在Linux操作系统下处理客户端和服务端之间的交互。 1. **线程池概念**: 线程池是多个线程的集合,这些线程由一个中央调度器...

    linux C线程池开放源码封装

    在IT行业中,线程池是一种...总的来说,Linux C线程池开放源码封装是利用C语言和pthread库实现高效并发执行任务的一种方法。通过理解和应用这些源码,开发者可以构建自己的线程池系统,以适应各种复杂的并发应用场景。

    线程池实现,通过C语言实现

    在Linux环境下,通过C语言实现线程池是一种常见的编程实践。 在给定的资源中,"200行C代码实现简单线程池.doc"可能包含了详细的设计和实现步骤,以及如何在实际项目中应用线程池的文档。而"threadpool.c"则是实际的...

    Linux下线程池的C语言实现

    在深入探讨Linux下线程池的C语言实现之前,我们首先需要理解线程池的基本概念以及它在系统设计中的重要性。线程池是一种管理线程的机制,它预先创建一组固定数量的线程,等待任务的到来,从而避免了频繁创建和销毁...

    Linux C线程池简单实现实例

    本实例提供了一个简单的Linux C线程池实现,可用于学习和实验。通过对线程池基本概念的理解,我们可以更高效地管理多线程程序中的线程生命周期,从而提高程序的整体性能。此外,通过深入分析源代码,还可以进一步...

    linux线程池c源码

    本文将深入解析一个Linux下的线程池实现,该实现使用C语言编写,并遵循GNU通用公共许可证版本2(或之后版本)。线程池是一种软件设计模式,它可以提高程序执行效率,通过复用已创建的线程来减少创建和销毁线程的开销...

    linux c线程池

    linux pthreadpool实现和线程池的用处 简单易懂 互斥和信号量使用

    LINUXC线程池.pdf

    通过对这些内容的分析,可以掌握Linux下C语言编程中实现线程池的基本方法,这包括了对互斥锁、条件变量、线程的创建和销毁的运用。这使得程序员可以更加高效地编写多线程程序,避免了因过度创建和销毁线程所导致的...

Global site tag (gtag.js) - Google Analytics