`
airu
  • 浏览: 271528 次
  • 性别: Icon_minigender_1
  • 来自: 云南
社区版块
存档分类
最新评论

线程队列

 
阅读更多
队列很常见,但是如果我们考虑在多线程环境中,那么可能就要注意同步互斥了。
这里使用读写锁,可以在读的时候不需要锁住整个队列。但是添加到队列就必须用互斥的锁了。
这里就介绍一下读写锁。pthread_rwlock_t lock定义一个读写锁lock,同样注意要初始化。
用 pthread_rwlock_init(&lock) 来初始化 lock。

下面来看一个列子,使用双向链表实现队列。可以从头添加或者从尾添加。
th_queue.h
#ifndef TH_QUEUE_H
#define TH_QUEUE_H
#endif

struct task{
 struct task *prev;
 struct task *next;
 pthread_t pid;
 void *(*handle)(void *);
 void *arg;
 int task_id;
};

struct th_queue{
        struct task *tail;
        struct task *head;
        pthread_rwlock_t q_lock;
        unsigned int count;
};

/*
 * insert into the head of the queue
 */
void ins_th_queue(struct th_queue*, struct task*);

/*
 * remove the task from the queue
 * return: the task which comes from the queue.if the task no
 *         int the queue ,return NULL
 */
struct task* rm_f_th_queue(struct th_queue*, struct task*);

/*
 * find a task from the queue with the task_id
 * return: if we find the task, NULL if we don't find it
 */
struct task* q_f_th_queue(struct th_queue*, int task_id);

/*
 * initialize the queue
 */
int init_th_queue(struct th_queue*);

/*
 * add the task to the tail of the queue
 */
void app_th_queue(struct th_queue*, struct task*);


头文件定义了一个task,是放入队列的任务。我们可以执行它,现在暂时不考虑太多。
接下来是实现 th_queue.c

#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>
#include<unistd.h>
#include<sys/types.h>
#include<assert.h>
#include"th_queue.h"

int init_th_queue(struct th_queue * queue)
{
        int err;

        assert(queue!=NULL);

        queue->head = NULL;
        queue->tail = NULL;
        err = pthread_rwlock_init(&queue->q_lock,NULL);
        if(err!= 0)
                return err;
        queue->count = 0;
        return 0;
}


void app_th_queue(struct th_queue* queue, struct task *task)
{
        assert(task != NULL);
        assert(queue != NULL);

        if(q_f_th_queue(queue,task->task_id)!=NULL){
                printf("You have appended!\n");
                return;
        }
        pthread_rwlock_wrlock(&queue->q_lock);
        task->next = NULL;
        task->prev = queue->tail;
        if(queue->tail != NULL){
                queue->tail->next = task;
	}else{
                queue->head = task;
        }
        queue->tail = task;
        queue->count++;
        pthread_rwlock_unlock(&queue->q_lock);

}

void ins_th_queue(struct th_queue* queue, struct task *task)
{
        assert(queue != NULL);
        assert(task != NULL);
        if(q_f_th_queue(queue,task->task_id)!=NULL){
                printf("You have inserted!\n");
                return;
        }    
        pthread_rwlock_wrlock(&queue->q_lock);  
        task->next = queue->head;
        task->prev = NULL;
        if(queue->head != NULL)
                queue->head->prev = task;
        else
                queue->tail = task;
        queue->head = task;
        queue->count++;
        pthread_rwlock_unlock(&queue->q_lock);
}

struct task* rm_f_th_queue(struct th_queue* queue, struct task *task)
{
        assert(queue != NULL);
        assert(task != NULL);
        struct task *iter;
        int flag = 0;
        pthread_rwlock_rdlock(&queue->q_lock);
        for(iter = queue->head;iter != NULL; iter = iter->next){
                if(iter == task){
                        flag = 1;
                        break;
                }
        }
        pthread_rwlock_unlock(&queue->q_lock);
        if(!flag){
                 printf("task %d not in the queue!\n",task->task_id);
                 return task;
        }

        pthread_rwlock_wrlock(&queue->q_lock);
        if(queue->head == task){
                queue->head = task->next;
                if(queue->tail == task)
                        queue->tail = NULL;
        }else if(queue->tail == task){
                queue->tail = task->prev;
 		if(queue->head == task)
                        queue->head = NULL;
        }else{
                task->prev->next = task->next;
                task->next->prev = task->prev;
        }
        queue->count--;
        pthread_rwlock_unlock(&queue->q_lock);
        return task;

}

struct task* q_f_th_queue(struct th_queue* queue, int id)
{
        assert(queue != NULL);

        struct task *iter;
        if(pthread_rwlock_rdlock(&queue->q_lock)!=0)
                return NULL;
        for(iter = queue->head; iter!=NULL;iter = iter->next){
                if(iter->task_id == id){
                        pthread_rwlock_unlock(&queue->q_lock);
                        return (iter);
                }
        }
        pthread_rwlock_unlock(&queue->q_lock);
        return NULL;
}



怎么样很简单吧。接下来,我们要测试一下我们的queue是否是线程同步的呢?
写个测试文件吧。test_th_queue.c
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<sys/types.h>
#include<pthread.h>
#include"th_queue.h"

#define MAX 100
//define the operation type
enum ops{
	APP,
	INS,
	RM
};
// we use it to simplify the test
struct op_queue{
	struct task *task;
	enum ops op;
};
//global queue
struct th_queue *gl_queue;

//the real function to do the test
void *th_process_task(void* arg)
{
	struct op_queue *oq = (struct op_queue *)arg;
	struct task *t = oq->task;
	switch(oq->op){
	case APP: app_th_queue(gl_queue,t);
		  printf("add task %d to tail, queue count=%d\n",t->task_id,gl_queue->count);
		  break;
	case INS: ins_th_queue(gl_queue,t);
		  printf("add task %d to head, queue count=%d\n",t->task_id,gl_queue->count);
		  break;
	case RM:  t = rm_f_th_queue(gl_queue,t);
		  printf("rm task %d ,queue count=%d\n",t->task_id,gl_queue->count);
		  break;
	default: app_th_queue(gl_queue,t);
		 printf("add task %d to tail, queue count=%d\n",t->task_id,gl_queue->count);
	}
	return ((void *)0);
}
int main(int argc, char **argv)
{
	pthread_t pid[MAX];
	struct task taskset[MAX];
	struct op_queue q_op[MAX];
	int i,d,err;
	d = 0;
	gl_queue = (struct th_queue*)malloc(sizeof(struct th_queue));
	init_th_queue(gl_queue);
        // just for the test
	for(i = 0; i < MAX; i++){
		taskset[i].next = NULL;
		taskset[i].prev = NULL;
		taskset[i].handle = NULL;
		taskset[i].arg = NULL;
		taskset[i].task_id = i;
		taskset[i].status = 0;
		taskset[i].pri = 0;
		taskset[i].pos = 0;
	}	
	for(i = 0; i < MAX; i++){
		q_op[i].task = &taskset[i];	
		if( i < (MAX/2)){
			q_op[i].op = APP;
		}else if( i>= (MAX/2) && (i <= (MAX/2 + MAX/4))){
			q_op[i].op = INS;
		}else{
                       //actually we can omit following row
			q_op[i].op = RM;
		}
	}	
   //we will create 100 threads to operate the queue
	for(i = 0; i < MAX ; i++){
		if(i<= (MAX/2 + MAX/4)){
			err = pthread_create(&pid[i],NULL,th_process_task,&q_op[i]);
			if(err != 0){
				fprintf(stderr,"can't create thread ");
				exit(1);
			}	
			if(pthread_join(pid[i],NULL)!= 0){
				fprintf(stderr,"can't join thread");
				exit(1);
			}
		}else{
			q_op[d].op=RM;
			
			err = pthread_create(&pid[i],NULL,th_process_task,&q_op[d]);
			if(err != 0){
				fprintf(stderr,"can't create thread ");
				exit(1);
			}	
			if(pthread_join(pid[i],NULL)!= 0){
				fprintf(stderr,"can't join thread");
				exit(1);
			}
			d++;

		}	
	}
	//please free the queue ,if we don't use it again
        //but, we will exit completely, so i did not write free here
}


编译,链接:
$gcc -g -c th_queue.c
$gcc -g -c test_th_queue.c
$gcc -g -lpthread -o test_th_queue th_queue.o test_th_queue.o


来看看结果吧。

add task 0 to tail, queue count=1
add task 1 to tail, queue count=2
add task 2 to tail, queue count=3
add task 3 to tail, queue count=4
add task 4 to tail, queue count=5
add task 5 to tail, queue count=6
add task 6 to tail, queue count=7
add task 7 to tail, queue count=8
add task 8 to tail, queue count=9
add task 9 to tail, queue count=10
add task 10 to tail, queue count=11
add task 11 to tail, queue count=12
add task 12 to tail, queue count=13
add task 13 to tail, queue count=14
add task 14 to tail, queue count=15
add task 15 to tail, queue count=16
add task 16 to tail, queue count=17
add task 17 to tail, queue count=18
add task 18 to tail, queue count=19
add task 19 to tail, queue count=20
add task 20 to tail, queue count=21
add task 21 to tail, queue count=22
add task 22 to tail, queue count=23
add task 23 to tail, queue count=24
add task 24 to tail, queue count=25
add task 25 to tail, queue count=26
add task 26 to tail, queue count=27
add task 27 to tail, queue count=28
add task 28 to tail, queue count=29
add task 29 to tail, queue count=30
add task 30 to tail, queue count=31
add task 31 to tail, queue count=32
add task 32 to tail, queue count=33
add task 33 to tail, queue count=34
add task 34 to tail, queue count=35
add task 35 to tail, queue count=36
add task 36 to tail, queue count=37
add task 37 to tail, queue count=38
add task 38 to tail, queue count=39
add task 39 to tail, queue count=40
add task 40 to tail, queue count=41
add task 41 to tail, queue count=42
add task 42 to tail, queue count=43
add task 43 to tail, queue count=44
add task 44 to tail, queue count=45
add task 45 to tail, queue count=46
add task 46 to tail, queue count=47
add task 47 to tail, queue count=48
add task 48 to tail, queue count=49
add task 49 to tail, queue count=50
add task 50 to head, queue count=51
add task 51 to head, queue count=52
add task 52 to head, queue count=53
add task 53 to head, queue count=54
add task 54 to head, queue count=55
add task 55 to head, queue count=56
add task 56 to head, queue count=57
add task 57 to head, queue count=58
add task 58 to head, queue count=59
add task 59 to head, queue count=60
add task 60 to head, queue count=61
add task 61 to head, queue count=62
add task 62 to head, queue count=63
add task 63 to head, queue count=64
add task 64 to head, queue count=65
add task 65 to head, queue count=66
add task 66 to head, queue count=67
add task 67 to head, queue count=68
add task 68 to head, queue count=69
add task 69 to head, queue count=70
add task 70 to head, queue count=71
add task 71 to head, queue count=72
add task 72 to head, queue count=73
add task 73 to head, queue count=74
add task 74 to head, queue count=75
add task 75 to head, queue count=76
rm task 0 ,queue count=75
rm task 1 ,queue count=74
rm task 2 ,queue count=73
rm task 3 ,queue count=72
rm task 4 ,queue count=71
rm task 5 ,queue count=70
rm task 6 ,queue count=69
rm task 7 ,queue count=68
rm task 8 ,queue count=67
rm task 9 ,queue count=66
rm task 10 ,queue count=65
rm task 11 ,queue count=64
rm task 12 ,queue count=63
rm task 13 ,queue count=62
rm task 14 ,queue count=61
rm task 15 ,queue count=60
rm task 16 ,queue count=59
rm task 17 ,queue count=58
rm task 18 ,queue count=57
rm task 19 ,queue count=56
rm task 20 ,queue count=55
rm task 21 ,queue count=54
rm task 22 ,queue count=53
rm task 23 ,queue count=52

简单吧。。。
分享到:
评论

相关推荐

    MFC的多线程队列程序源代码

    本示例中的"多线程队列程序源代码"是一个很好的实践案例,用于理解多线程和队列的原理及其在实际编程中的应用。 首先,我们要明白多线程的概念。在单线程程序中,任务是顺序执行的,而在多线程程序中,可以同时执行...

    多线程队列源代码

    这里我们讨论的"多线程队列源代码"是一个实现了多线程和队列数据结构的程序,它包含了一个多线程队列的实现以及对应的测试程序。 首先,让我们理解一下多线程的概念。在计算机编程中,多线程是指一个程序中可以同时...

    多线程 队列利用

    结合标题“多线程队列利用”,我们可以推断文章可能主要讨论如何在多线程环境中有效利用队列来提升程序性能。这可能包括以下内容: 1. **线程安全的队列实现**:在多线程环境中,为了保证数据的一致性和完整性,...

    多线程队列

    本文将深入探讨如何在C语言中实现多线程队列及其相关的知识点。 首先,多线程是指在一个进程中创建多个执行线程,每个线程可以并发执行不同的任务。这样可以充分利用多核处理器的计算能力,提高程序的运行效率。在...

    java 多线程 队列工厂

    在Java编程中,多线程和队列是两个非常重要的概念,它们对于构建高效、可扩展的并发应用程序至关重要。队列工厂则是实现多线程间通信和任务调度的一种设计模式,它提供了一种抽象和统一的方式来创建和管理队列实例。...

    Fig15_11-12.rar_C#线程队列_c# 多线程 队列

    在C#编程中,线程队列是一种管理并发执行任务的有效机制,特别是在处理大量异步操作时。本文将深入探讨“C#线程队列”及其在“多线程”环境中的应用,以及如何利用队列来实现“读者写者”问题的解决方案。 一、C#...

    spring 多线程队列执行

    在Spring框架中,多线程队列执行是一个重要的性能优化策略,它可以帮助应用程序更高效地处理并发任务,尤其是在高负载和大数据量的场景下。本文将深入探讨Spring如何实现多线程队列以及其相关的核心概念和技术。 1....

    tcp sock client 与server Linux线程队列

    本主题将深入探讨“TCP Sock Client与Server”以及Linux环境下的线程队列技术。 首先,TCP是一种面向连接的、可靠的、基于字节流的传输层通信协议。它确保了数据在网络中的有序、无损传输。TCP通过三次握手建立连接...

    易语言-鱼刺多线程队列压入文本数据设置存活时效

    本文将深入探讨"易语言-鱼刺多线程队列压入文本数据设置存活时效"这一主题。 首先,我们要理解什么是多线程。多线程是指在一个程序中同时运行多个线程,每个线程可以独立执行不同的任务。在易语言中,通过"鱼刺"...

    Android_Handler的线程队列Demo

    这个"Android_Handler的线程队列Demo"旨在演示如何有效地利用Handler来处理线程间的通信,特别是主线程(UI线程)与工作线程之间的交互。下面我们将深入探讨相关知识点。 首先,了解Handler的基本概念。Handler是...

    基于线程队列动态规划法的GPU性能优化.pdf

    然而,线程调度和管理是优化GPU性能的关键,特别是对于那些执行时间较长的线程,传统的线程队列管理策略可能无法有效地响应,导致资源浪费和性能下降。 针对这一问题,本文提出的线程队列动态规划法(TQDP)是一种...

    c# Thread 带超时的线程管理,支持线程队列

    c# 带超时的线程管理 Thread.cs ,支持线程队列,自动启动判断是否结束线程,线程队列批量启动与执行超时控制 Threadx.cs

    淘宝茅台抢购最新优化版本,淘宝茅台秒杀,优化了茅台抢购线程队列.zip

    标题中的“淘宝茅台抢购最新优化版本,淘宝茅台秒杀,优化了茅台抢购线程队列”揭示了这是一个针对淘宝平台茅台商品抢购的软件或脚本的更新版本,重点在于对线程队列进行了优化,以提高抢购成功率。在IT行业中,这种...

    安卓稳定多线程文件下载,加入线程队列

    安卓稳定多线程文件下载,加入线程队列 使用简单

    易语言简单的多线程消息队列

    易语言简单的多线程消息队列。@Patek。

    基于C#实现的轻量级多线程队列图文详解

    "基于C#实现的轻量级多线程队列图文详解" 本文主要介绍了基于C#实现的轻量级多线程队列的相关知识点,包括了多线程队列的实现原理、线程池的使用、任务优先级的设置、多线程模式的选择等方面的内容。 1. 多线程...

    C++多线程队列

    构造一个队列,并完成入队列和出队列的函数,要求该队列支持多线程 (即一个线程做入队列操作而另一个线程做出队列操作而且两个线程必须同时运行)

    LINUX 下C编写UDP 文件传输 运用多线程 队列 信号量

    在Linux环境下,C语言编程实现UDP文件传输是一个挑战性的任务,尤其当涉及到多线程、队列和信号量等并发控制机制时。本项目旨在教你如何有效地构建这样的系统,提高程序的效率和并发性。 首先,UDP(User Datagram ...

Global site tag (gtag.js) - Google Analytics