`
lzy.je
  • 浏览: 151026 次
  • 性别: Icon_minigender_1
  • 来自: 沈阳
社区版块
存档分类
最新评论

超轻量 pthread 集结点实现

阅读更多

          我需要的 pthread 线程集结点功能,使用同一集结点的线程将通过 rend_wait 函数等待,当集结点到达指定数量的线程后同时激发继续执行。使用 pthread 的 mutex 和 cond 超轻量实现。下面 rend.h 是集结点实现,rendezvous.c 是测试应用。

 

/*
 * rend.h
 *
 *  Created on: 2009-11-14
 *      Author: liuzy (lzy.dev@gmail.com)
 */

#ifndef REND_H_
#define REND_H_

#include <pthread.h>
#include <assert.h>

struct rend_t {
	volatile int count;
	pthread_mutex_t count_lock;
	pthread_cond_t ready;
};

#define DECLARE_REND(name, count) \
	struct rend_t name = {(count), PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER}

int rend_init(struct rend_t* prend, int count) {
	int ret = 0;

	assert(prend);

	prend->count = count;

	if ((ret = pthread_mutex_init(&prend->count_lock, NULL)))
		return ret;

	if ((ret = pthread_cond_init(&prend->ready, NULL)))
		return ret;

	return EXIT_SUCCESS;
}

int rend_wait(struct rend_t* prend) {
	int ret = 0;

	assert(prend);

	if ((ret = pthread_mutex_lock(&prend->count_lock)))
		return ret;

	/* check count value is ready to weak up block code */
	if (prend->count == 1) {
		if ((ret = pthread_cond_broadcast(&prend->ready)))
			return ret;

		if ((ret = pthread_mutex_unlock(&prend->count_lock)))
			return ret;
	} else {
		prend->count--;

		ret = pthread_cond_wait(&prend->ready, &prend->count_lock);
		prend->count++;

		if (ret) {
			pthread_mutex_unlock(&prend->count_lock);
			return ret;
		}

		if ((ret = pthread_mutex_unlock(&prend->count_lock)))
			return ret;
	}

	return EXIT_SUCCESS;
}

int rend_free(struct rend_t* prend) {
	int ret = 0;

	assert(prend);

	prend->count = 0;

	if ((ret = pthread_mutex_destroy(&prend->count_lock)))
		return ret;

	if ((ret = pthread_cond_destroy(&prend->ready)))
		return ret;

	return EXIT_SUCCESS;
}

#endif /* REND_H_ */

 

rend 使用更简单:

 

  1. 定义/初始化 rend_t 集结点对象。DECLARE_REND 宏用于静态定义,rend_init 函数可以对动态创建的集结点结构初始化;
  2. pthread 线程通过调用 rend_wait 函数 P/V 集结状态。集结关系的线程要 P/V 在同一个 rend_t 集结对象上;
  3. 释放集结对象,rend_free 函数。

以上函数都是成功返回 0,出错返回 errno 值(非 0)。

 

 

/*
 ==============================
 Name        : rendezvous.c
 Author      : liuzy (lzy.dev@gmail.com)
 Version     : 0.1
==============================
 */

#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>		/* va_list */
#include <unistd.h>
#include <string.h>
#include <errno.h>		/* errno */
#include <syslog.h>		/* for syslog(2) and level */
#include <pthread.h>

#include "rend.h"

static int daemon_proc = 0;	/* for syslog in err_doit */

#define	MAXLINE 4096		/* max text line length */

void err_doit(int errnoflag, int level, const char* fmt, va_list ap) {

	char buf[MAXLINE + 1] = { 0 };
	int errno_save = errno, n = 0;

#ifdef HAVE_VSNPRINTF
	vsnprintf(buf, MAXLINE, fmt, ap);
#else
	vsprintf(buf, fmt, ap);
#endif	/* HAVE_VSNPRINTF */

	n = strlen(buf);
	if (errnoflag)
		snprintf(buf + n, MAXLINE - n, ": %s", strerror(errno_save));
	strcat(buf, "\n");

	if (daemon_proc) {
		syslog(level, "%s", buf);
	} else {
		fflush(stdout);
		fputs(buf, stderr);
		fflush(stderr);
	}

	return;
}

void err_msg(const char* fmt, ...) {
	va_list ap;

	va_start(ap, fmt);
	err_doit(0, LOG_INFO, fmt, ap);
	va_end(ap);

	return;
}

void err_sys(const char* fmt, ...) {
	va_list ap;

	va_start(ap, fmt);
	err_doit(1, LOG_ERR, fmt, ap);
	va_end(ap);

	exit(EXIT_FAILURE);
}

#define THREAD_COUNT 100	/* rendezvous test thread workers */

struct worker_arg {
	int worker_id;
	struct rend_t* prend;
};

static void* pthread_worker(void* arg) {
	struct worker_arg* parg = (struct worker_arg*) arg;

	err_msg("worker #%d running.", (int) parg->worker_id);

	srand(parg->worker_id * 2);
	sleep(rand() % 5);

	rend_wait(parg->prend);	/* workers rendezvous */

	err_msg("worker #%d exiting.", (int) parg->worker_id);

	return EXIT_SUCCESS;
}

int main(void) {
	int idx = 0;
	void* exitcode = NULL;

	pthread_t thds[THREAD_COUNT];
	struct worker_arg arg[THREAD_COUNT];
	DECLARE_REND(rend, THREAD_COUNT);

	err_msg("workers creating.");

	for (idx = 0; idx < THREAD_COUNT; idx++) {
		arg[idx].prend = &rend;
		arg[idx].worker_id = idx;

		if (pthread_create(thds + idx, NULL, pthread_worker, (void*) &arg[idx]))
			err_sys("worker #%d create error.", idx);
	}

	puts("workers exiting.");

	for (idx = 0; idx < THREAD_COUNT; idx++)
		if (pthread_join(thds[idx], &exitcode) || (exitcode != EXIT_SUCCESS))
			err_msg("worker #%d exit error.", idx);

	err_msg("all done. exit 0.");

	rend_free(&rend);

	return EXIT_SUCCESS;
}

 

          看了下 semaphore os syscall 及其 infrastructure,也许以后还需要进程间(非 pthread)集结时用得上。kernel 实现的超强啊,呵呵~

 

// 2009.11.17 14:34 添加 ////

 

快速用户空间互斥锁(Futex)
快速用户空间互斥锁(fast userspace mutex,Futex)是快速的用户空间的锁,是对传统的System V同步方式的一种替代,传统同步方式如:信号量、文件锁和消息队列,在每次锁访问时需要进行系统调用。而futex仅在有竞争的操作时才用系统调用访问内核,这样,在竞争出现较少的情况下,可以大幅度地减少工作负载
futex在非竞争情况下可从用户空间获取和释放,不需要进入内核。与信号量类似,它有一个可以原子增减的计数器,进程可以等待计数器值变为正数。用户进程通过系统调用对资源的竞争作一个公断。
futex 是一个用户空间的整数值,被多个线程或进程共享。Futex的系统调用对该整数值时进行操作,仲裁竞争的访问。 glibc中的NPTL库封装了futex 系统调用,对futex接口进行了抽象。用户通过NPTL库像传统编程一样地使用线程同步API函数,而不会感觉到futex的存在。
futex 的实现机制是:如果当前进程访问临界区时,该临界区正被另一个进程使用,当前进程将锁用一个值标识,表示“有一个等待者正挂起”,并且调用 sys_futex(FUTEX_WAIT)等待其他进程释放它。内核在内部创建futex队列,以便以后与唤醒者匹配等待者。当临界区拥有者线程释放了 futex,它通过变量值发出通知表示还有多个等待者在挂起,并调用系统调用sys_futex(FUTEX_WAKE)唤醒它们。一旦所有等待者已获取资源并释放锁时,futex回到非竞争状态,并没有内核状态与它相关。
robust futex是为了解决futex锁崩溃而对futex进行了增强。例如:当一个进程在持有pthread_mutex_t锁正与其他进程发生竞争时,进程因某种意外原因而提前退出,如:进程发生段错误,或者被用户用shell命令kill -9-ed”强行退出,此时,需要有一种机制告诉等待者“锁的最一个持有者已经非正常地退出”。“
为了解决此类问题,NPTL创建了robust mutex用户空间API pthread_mutex_lock(),如果锁的拥有者进程提前退出,pthread_mutex_lock()返回一个错误值,新的拥有者进程可以决定是否可以安全恢复被锁保护的数据。

 

有几点不还不理解:

 

  1. “futex 如果说是一个用户空间的整数值,那怎么被多个进程共享?Futex 系统调用在 kernel 态怎么操作该值并仲裁竞争?这是那种直接映射到 userspace 的 kernel 地址么。这个需要程序间通过 mmap 在共享段中访问,与 futex 没什么关系。
  2. 这个“robust futex”机制指的应该就是 SVRx 传统 sem IPC 里的 SEM_UNDO flag 吧?

一篇不错的文章,引发对 glibc nptl 实现源码的探索:

关于信号量与线程互斥锁的区别与实现

 

分享到:
评论

相关推荐

    [并发并行]_[线程同步]_[pthread_once 实现单例模式分析]

    `pthread_once` 是一个在 POSIX 标准中定义的函数,用于实现线程安全的初始化。在这个场景下,我们将深入探讨如何使用 `pthread_once` 在 C/C++ 中实现单例模式,并结合 `Win32` 平台的特性进行讨论。 首先,单例...

    东南大学 操作系统实验2 用pthread实现矩阵相乘

    本实验的主题是“用pthread实现矩阵相乘”,这是并行计算领域的一个经典问题,旨在利用多线程技术提高计算效率。pthread是POSIX线程库,广泛应用于Linux和其他符合POSIX标准的操作系统中。 矩阵相乘是计算密集型...

    pthread, window pthread

    标题"pthread, window pthread"可能指的是pthread库在Windows平台上的实现。尽管pthread是为POSIX系统设计的,但为了使跨平台开发更加便捷,存在一些库,如PThread4Win或Winpthreads,它们为Windows提供了与pthread...

    pthread多线程求pi,linux多线程pthread,C,C++

    在Linux系统中,`pthread`库是C和C++编程语言实现多线程的标准接口。本文将深入探讨标题和描述中提到的“pthread多线程求pi”这一主题,以及并行计算在解决复杂计算问题中的应用。 首先,我们要理解π(pi)的计算...

    posix pthread windows 实现

    在Windows环境中,实现POSIX Pthread是为了在Windows平台上提供与UNIX/Linux相似的多线程编程接口,使得跨平台开发变得更加方便。Pthreads-win32是SourceWare项目的一部分,它为Windows提供了对POSIX线程库(pthread...

    并行pthread求π算法

    在这里,我们主要探讨pthread库在实现并行计算求π中的应用。 pthread是POSIX线程库的缩写,它是跨平台的API,用于创建和管理线程。在C语言编程环境中,pthread库为我们提供了创建、同步、通信等线程操作的功能。在...

    pthread.zip_C++_pthread windows

    在C++编程中,多线程技术是实现并发执行任务的重要手段,而pthread库作为跨平台的多线程API,被广泛应用于Linux和Windows系统中。本文将深入探讨pthread库的基础知识、其在C++中的使用方法以及如何在两个主要操作...

    生产者-消费者问题的Pthread实现

    对于条件变量,使用`pthread_cond_init()`初始化,`pthread_cond_wait()`让线程等待,`pthread_cond_signal()`或`pthread_cond_broadcast()`唤醒等待的线程。 - 结束线程:使用`pthread_join()`等待线程结束,或`...

    使用矩阵转置和pthread实现矩阵相乘

    使用c++实现了基于pthread的并行矩阵乘法,同时为了提高程序运行效率,首先将矩阵进行转置

    Windows可使用的pthread库

    2. **线程同步**:pthread库提供了多种同步机制,包括互斥锁(`pthread_mutex_t`)、条件变量(`pthread_cond_t`)、信号量(`pthread_semaphore_t`)和读写锁(`pthread_rwlock_t`)。这些同步原语有助于防止数据...

    vs的pthread包

    POSIX线程库,通常简称为pthread,是跨平台的一种多线程API,主要在Unix-like系统中使用,但也可以通过一些方式在Windows上实现。在本话题中,我们将探讨如何在VS中使用pthread包,以及如何正确配置项目以确保其正常...

    pthread相关的头文件与 库

    如果在编译时选择动态链接`pthread`库,那么程序运行时必须有`pthread.dll`存在,否则会报找不到入口点的错误。 在Windows环境下,由于原生并不完全支持POSIX标准,所以通常需要第三方库如pthreads-w32来提供这些...

    Linux课程设计:多线程停车场,使用锁和信号量实现,引入pthread-win32库后,可以在windows下运行+源代码+文

    Linux课程设计:多线程停车场,使用锁和信号量实现,引入pthread-win32库后,可以在windows下运行+源代码+文 - 小白不懂运行,下载完可以私聊问,可远程教学 该资源内项目源码是个人的课程设计,代码都测试ok,都是运行...

    pthread文件pthread.lib

    pthread.lib文件下载,本人用于crf++安装时使用。。 pthread.lib文件下载,本人用于crf++安装时使用。。 pthread.lib文件下载,本人用于crf++安装时使用。。 pthread.lib文件下载,本人用于crf++安装时使用。。

    pthread_testcancel pthread_kill pthread_cancel

    pthread_testcancel pthread_kill pthread_cancel 的使用例子

    pthread_实验报告1

    本实验报告主要探讨了多线程的概念及其在并行计算中的应用,使用Pthread库作为实现工具。Pthread是POSIX线程库,它为C和C++编程提供了多线程支持,使得在单个进程中可以并发地执行多个线程,从而提高程序的执行效率...

    VC++6.0安装pthread库过程图解

    然而,由于pthread库在跨平台线程管理方面的广泛适用性,有时我们需要在VC++6.0中引入这个库来实现跨平台的代码兼容。本篇将详细介绍如何在VC++6.0中安装和使用pthread库。 首先,我们需要获取pthread库。pthread库...

    windows 下的pthread 库

    在Windows下使用`pthread`库,开发者需要注意一些移植问题,比如信号量和读写锁在Windows API中实现方式不同,可能需要额外的适配工作。此外,`pthread`库在Windows上可能没有像在Linux那样的优化,性能上可能有所...

    pthread多线程c++动库下载

    pthread是POSIX线程库,它为C++编程提供了跨平台的多线程支持。在Windows环境下,由于标准C++库并不直接...同时,了解pthread的线程生命周期管理、线程局部存储(TLS)以及取消点等特性也是编写高效多线程代码的基础。

    glibc pthread source code

    而实现这一功能的关键,便是glibc中的pthread库。glibc,全称GNU C Library,是Linux系统下广泛使用的C语言运行库,它为程序员提供了丰富的API接口,其中就包括了对多线程支持的pthread接口。本文将深入探讨glibc中...

Global site tag (gtag.js) - Google Analytics