- 浏览: 105865 次
最新评论
IPC Mutex And Conditional Variable
互斥锁与条件变量
1、函数列表
<!--[if !supportLists]-->Ø <!--[endif]-->int pthread_create(pthread_t *restrict thread,
const pthread_attr_t *restrict attr,
void *(*start_routine)(void*), void *restrict arg);
线程创建函数,输入为属性attr(如果attr为NULL,则采用默认的属性),线程的执行函数为start_routine,执行函数的参数为arg;输出为当线程创建成功后,其id将保存在thread中。
线程创建成功,返回0;否则返回出错码。
<!--[if !supportLists]-->Ø <!--[endif]-->int pthread_join(pthread_t thread, void **value_ptr);
此函数的调用线程将阻塞,直到目标thread线程结束。其中thread为pthread_create创建线程ID,value_ptr一般为NULL。
成功返回0,否则返回出错码。
<!--[if !supportLists]-->Ø <!--[endif]-->int pthread_getconcurrency(void);
int pthread_setconcurrency(int new_level);
<!--[if !supportLists]-->Ø <!--[endif]-->int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
这两个函数将激活为条件变量cond所阻塞的线程。
pthread_cond_broadcast将一次性激活所有为cond所阻塞的线程。
pthread_cond_signal将激活至少一个为cond阻塞的线程,如果没有为cond阻塞的线程,调用将不发生任何影响,返回值也没区分;如果有多个为cond阻塞的线程,多次调用pthread_cond_signal激活阻塞的线程取决于阻塞线程调用wait函数的顺序。
返回值,成功时为0,否则EVINVAL将返回标示cond没有初始化。
这两个函数的实现原理是发送特定的信号,从而激活阻塞函数。
<!--[if !supportLists]-->Ø <!--[endif]-->int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);
int pthread_cond_timedwait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex,
const struct timespec *restrict abstime);
条件变量都是与互斥关联的,因为决定什么时候wait和什么时候signal的变量(不是cond,而是具体应用中的在两个线程间共享的用户自定义的任何类型变量)在使用的时候必须同步保护起来,而mutex变量则起这样的作用。
给出使用代码格式:
pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER; pthread_cond_t cond=PTHREAD_COND_INITIALIZER; 条件为真 pthread_cond_signal(&cond); //那么给条件变量mutex发送信号 pthread_mutex_unlock(&mutex);
pthread_mutex_lock(&mutex); while(条件为假) pthread_cond_wait(&cond, &mutex); //那么,阻塞休眠此线程 pthread_mutex_unlock(&mutex); |
可以看出一共涉及三个名词:条件变量、条件和互斥。
条件是用于判断何时发送信号和休眠线程的用户自定义的变量;
互斥mutex是用于同步保护条件(因为条件是两个线程之间维护的变量,在两个线程中都有使用);
条件变量cond用于signal和wait函数中。
Wait函数原子地执行以下两个动作:给互斥锁mutex解锁(对照格式,调用此函数之前已经给mutex上了锁,因为要给其它线程使用);然后把调用线程投入休眠,直到另外某个线程就本条件变量调用signal函数。
Wait函数返回(signal函数导致或者是TIMEOUT)执行的两个操作:重新给互斥锁mutex上锁(因为马上要操作条件);总是再次测试相应条件成立与否(因为可能发生虚假的唤醒)。
Abstime是绝对时间,而不是相对时间。使用绝对是件的好处是:如果函数过早返回了(也许是因为捕获了某个信号),那么同一函数无需改变其参数中abstime参数就可以再次被调用。
<!--[if !supportLists]-->Ø <!--[endif]-->int pthread_mutex_destroy(pthread_mutex_t *mutex);
int pthread_mutex_init(pthread_mutex_t *restrict mutex,
const pthread_mutexattr_t *restrict attr);
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
成功返回0,失败返回错误码。
<!--[if !supportLists]-->Ø <!--[endif]-->int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrict attr);
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
条件变量的两种初始化方式,静态和动态的。
成功返回0,失败返回错误码。
<!--[if !supportLists]-->Ø <!--[endif]-->int pthread_mutexattr_destroy(pthread_mutexattr_t *attr);
int pthread_mutexattr_init(pthread_mutexattr_t *attr);
互斥属性的初始化和销毁。
<!--[if !supportLists]-->Ø <!--[endif]-->int pthread_condattr_destroy(pthread_condattr_t *attr);
int pthread_condattr_init(pthread_condattr_t *attr);
条件变量属性的初始化和销毁
<!--[if !supportLists]-->Ø <!--[endif]-->int pthread_mutexattr_getpshared(const pthread_mutexattr_t *
restrict attr, int *restrict pshared);
int pthread_mutexattr_setpshared(pthread_mutexattr_t *attr,
int pshared);
当初始化互斥后,通过此函数获得和修改互斥属性。
<!--[if !supportLists]-->Ø <!--[endif]-->int pthread_condattr_getpshared(const pthread_condattr_t *restrict attr,
int *restrict pshared);
int pthread_condattr_setpshared(pthread_condattr_t *attr,
int pshared);
当初始化条件变量以后,通过此函数获得和修改条件变量的属性。
2、实例解析
<!--[if !supportLists]-->Ø <!--[endif]-->simplethread
线程使用的简单实例。
//simplethread.c #include <stdio.h> #include <pthread.h> void thread(void* arg) { int i; printf("arg==%d",*((int *)arg)); for(i=0;i<3;i++) printf("This is a pthread./n"); sleep(10); } int main(void) { pthread_t id; int i,ret; int a=9; ret=pthread_create(&id,NULL,(void *) thread,&a); if(ret!=0){ printf ("Create pthread error!/n"); exit (1); } for(i=0;i<3;i++) printf("This is the main process./n"); pthread_join(id,NULL); printf("========"); return (0); } #gcc simplethread.c –lpthread –o simplethread |
主线程调用pthread_create创建线程,成功返回0;
主线程调用pthread_join阻塞等待子线程结束。
<!--[if !supportLists]-->Ø <!--[endif]-->basicmutex
使用同步锁,在生产者之间同步,保证任一时刻只有一个线程操作全局变量。
//basicmutex.c #include <pthread.h> #include <errno.h> #include <stdio.h> #include <stdlib.h> #define MAXLINE 4096 /* max text line length */ #define MAXNITEMS 1000000 #define MAXNTHREADS 100
int nitems; //数组的总长度 pthread_mutex_t mutex= PTHREAD_MUTEX_INITIALIZER; int buff[MAXNITEMS]; //线程操作的全局公共变量 //生产者初始化数组,buff[0]=0,buff[1]=1,依次初始化 //消费者只是检查buff[i]是否等于i int nput=0; //下一个待初始化数组元素下标,例如已经初始化buff[3]=3,那么nput=4 int nval=0; //下一个待初始化数组元素值,例如已经初始化buff[4]=4,那么nval=5 void *produce(void *), *consume(void *); //生产者初始化数组buff[nitems],总长度为nitems //消费者检查由消费者已经初始化的数组buff[i]是否等于i int main(int argc, char **argv) { int i, nthreads, count[MAXNTHREADS]; //count的作用是各个生产者线程初始化buff元素的个数 pthread_t tid_produce[MAXNTHREADS], tid_consume; nitems = 100000; nthreads = 3; //数组的总长度为100000;创建的生产者线程数为3 // Set_concurrency(nthreads); for (i = 0; i < nthreads; i++) { count[i] = 0; pthread_create(&tid_produce[i], NULL, produce, &count[i]); } //创建3个生产者线程 for (i = 0; i < nthreads; i++) { pthread_join(tid_produce[i], NULL); printf("count[%d] = %d/n", i, count[i]); } //阻塞等待3个生产者线程结束 pthread_create(&tid_consume, NULL, consume, NULL); //创建消费者线程 pthread_join(tid_consume, NULL); //阻塞等待消费者线程结束 exit(0); } void * produce(void *arg)//一共有3个线程在执行此函数,每个线程的传入的arg不同,记录初始化元素个数 { for ( ; ; ) { pthread_mutex_lock(&mutex); //对全局公共数组buff进行操作,必须加锁 if (nput >= nitems) { pthread_mutex_unlock(&mutex); return(NULL); /* array is full, we're done */ } //下一个待初始化的数组元素的下标nput大于了数组的总长度nitems,说明数组已经初始化完成 buff[nput] = nval; nput++; nval++; //具体的初始化过程 pthread_mutex_unlock(&mutex); *((int *) arg) += 1; //记录此生产者线程初始化元素的个数 } }
void * consume(void *arg) { int i; for (i = 0; i < nitems; i++) { if (buff[i] != i) printf("buff[%d] = %d/n", i, buff[i]); //检查元素初始化是否正确,不正确则输出 } return(NULL); } |
对于全局变量buff,nitems,nput和nval的操作,用mutex进行了同步,保证任何时刻只有一个线程在执行临界区的代码。
临界区代码为:
buff[nput] = nval; nput++; nval++; |
一共有三个线程有机会执行此段代码,假设没有采用mutex来保护临界区,则可能出现一下情况:如果A线程执行完nput++以后(假设此时nput为11,nval为10),还没执行nval++之前,发生线程调度,线程B开始执行,并执行完了临界区代码(此时nput为12,nval为11),这样最终buff[11]=10,后面的数据均为发生错位。
<!--[if !supportLists]-->Ø <!--[endif]-->basiccond
使用条件变量,在生产者和消费者之间同步,除了保证任一时刻只有一个线程操作全局变量,还保证消费者只能消费生产者已经生产的数据。
Basicmutex生产者线程结束以后,才创建消费者线程,不存在生产者和消费者线程之间的同步问题,也就是不存在消费者检查未初始化数组元素的情况。
Basiccond将同时创建生产者和消费者线程,这样就必须采用条件变量来控制消费者只能检查已经初始化的数组元素。
//basiccond.c #include <pthread.h> #include <errno.h> #include <stdio.h> #include <stdlib.h> #define MAXNITEMS 1000000 #define MAXNTHREADS 100 int nitems; /* read-only by producer and consumer */ int buff[MAXNITEMS]; pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER; pthread_cond_t cond=PTHREAD_COND_INITIALIZER; int nput=0; int nval=0; int nready=0; //生产者线程已经初始化了的数组元素个数 void *produce(void *), *consume(void *);
int main(int argc, char **argv) { int i, nthreads, count[MAXNTHREADS]; pthread_t tid_produce[MAXNTHREADS], tid_consume; nitems = 1000; nthreads = 3; // Set_concurrency(nthreads + 1); for (i = 0; i < nthreads; i++) { count[i] = 0; pthread_create(&tid_produce[i], NULL, produce, &count[i]); } pthread_create(&tid_consume, NULL, consume, NULL); for (i = 0; i < nthreads; i++) { pthread_join(tid_produce[i], NULL); printf("count[%d] = %d/n", i, count[i]); } pthread_join(tid_consume, NULL); exit(0); } void * produce(void *arg) { for ( ; ; ) { pthread_mutex_lock(&mutex); if (nput >= nitems) { pthread_mutex_unlock(&mutex); return(NULL); /* array is full, we're done */ } buff[nput] = nval; nput++; nval++; nready++; pthread_cond_signal(&cond); //现在生产者已经生产了数组元素,所以可以通知消费者线程进行检查了 pthread_mutex_unlock(&mutex); *((int *) arg) += 1; } } void * consume(void *arg) { int i; for (i = 0; i < nitems; i++) { //如果生产者只初始化buff数组到900,则对超过900的元素进行检查则是不合理的 while (nready == 0) pthread_cond_wait(&cond, &mutex); // nready为0,说明消费者线程已经检查完了所有已经初始化的元素,那么,进入休眠 nready--; pthread_mutex_unlock(&mutex); if (buff[i] != i) printf("buff[%d] = %d/n", i, buff[i]); } return(NULL); } //gcc basiccond.c –lpthread –o basiccond |
条件变量实现消费者和生产者同步的原理:在生产者和消费者之间维护一个nready变量,生产者每初始化一个数组元素,则将nready加1,消费者没检查玩一个数组元素,则将nready减1.当nready为0时,说明消费者已经检查完了所有已经初始化的数组元素,此时调用pthread_cond_wait使消费者线程进入休眠。当生产者再次初始化数组元素时,调用pthread_cond_signal使消费者线程恢复。
关于条件变量同步:一个线程什么时候进入休眠,另一个线程什么时候发送条件变量来激活休眠的线程。而这取决于两个线程维护的一个变量,而操作这个变量时,必须用mutex来保护。
如果不采用条件变量,实现消费者和生产者同步的原理:将消费者检查的数组元素下标(i)与生产者已经生产的数组元素的下标(nput-1)进行比较,从而可以判断i元素是否已经生产。在此基础之上,如果生产了,则进行检查,否则则不进行检查,但必须继续等待生产者进行生产(因为消费者线程必须检查完buff{nitems}整个数组),而这个等待过程,则是耗费cpu的无限循环。
void * produce(void *arg) { for ( ; ; ) { pthread_mutex_lock(&mutex); if (nput >= nitems) { pthread_mutex_unlock(&mutex); return(NULL); /* array is full, we're done */ } buff[nput] = nval; nput++; nval++; pthread_mutex_unlock(&mutex); *((int *) arg) += 1; } }
void * consume(void *arg) { int i; for (i = 0; i < nitems; i++) { for(;;) { if(i<nput)//如果buff[i]已经为生产者所初始化,则可以进行消费 { if (buff[i] != i) printf("buff[%d] = %d/n", i, buff[i]); //检查元素初始化是否正确,不正确则输出 } else //如果buff[i]没有生产,则结束这一次的无限循环 continue; } return(NULL); }
|
<!--[if !supportLists]-->Ø <!--[endif]-->condwaittime
3、小结
<!--[if !supportLists]-->Ø <!--[endif]-->互斥锁和条件变量出自Posix线程标准,它们总是可用来同步一个进程内的各个线程的。如果一个互斥锁或条件变来那个存放在多个进程间共享的某个内存中,那么Posix还允许它用于这些进程间的同步。
<!--[if !supportLists]-->Ø <!--[endif]-->互斥锁用于保护临界区,以保证任何时刻只有一个线程在执行其中的代码(假设互斥锁由多个线程共享),或者任何时刻只有一个进程在执行其中的代码(假设互斥锁由多个进程共享)。
<!--[if !supportLists]-->Ø <!--[endif]-->互斥锁用于上锁,条件变量用于等待。互斥锁可以通过轮转(spinning)或轮询(polling)来实现等待,但耗费cpu。
<!--[if !supportLists]-->Ø <!--[endif]-->条件变量总是有一个互斥锁与之关联,因为决定等待和信号发送的变量必须用互斥锁保护起来。
相关推荐
Implement semaphore with mutex and conditional variable. Reason being, POSIX semaphore on Android are not used or well tested.
C++11中的各种mutex, lock对象,实际上都是对posix的mutex,condition的封装。不过里面也有很多细节值得学习。 std::mutex 先来看下std::mutex: 包增了一个pthread_mutex_t __m_,很简单,每个函数该干嘛...
- `cmpxchg`(Compare and Exchange)是一种特殊的CPU指令,它可以比较两个值,并在条件满足的情况下交换它们。 - 在Mutex的加锁过程中,`cmpxchg`指令被用来实现原子性的加锁操作。 2. **cmpxchg指令的执行流程*...
To be noticed, wai() will unlock mutex and will continue when the relevant mutex be unlocked. Signal() will not release the mutex which means it will go ahead without waiting for anything.
4. **条件变量(Conditional Variable)**: 条件变量是配合互斥量使用的另一种同步机制,它允许进程等待某个特定条件满足后再继续执行。进程可以"锁住"条件变量并释放互斥锁,然后被阻塞,直到其他进程唤醒它。...
在编程领域,特别是涉及到多线程或多进程编程时,互斥体(Mutex)是一个至关重要的概念。互斥体主要用于同步多个线程或进程对共享资源的访问,确保同一时间只有一个线程或进程能够访问该资源,从而避免数据冲突和竞...
《Mutex在多线程编程中的应用》 Mutex,即互斥锁,是多线程编程中一种重要的同步机制。在Windows编程中,Mutex被广泛用于控制对共享资源的访问,确保同一时间只有一个线程能够访问特定的代码段或数据。在“mutexs ...
在Linux系统中,互斥锁(Mutex)是一种用于多线程同步的重要机制,它确保了在任何时刻只有一个线程能够访问特定的临界区。Mutex锁的使用是防止数据竞争和保证线程安全的关键手段。这个"mutex锁demo代码.rar"压缩包...
在Laravel框架中,"laravel-mutex"是一个用于实现锁机制的库,它帮助开发者在并发环境中确保数据的一致性和完整性。这个库通常用于处理那些需要互斥操作的场景,比如防止多个进程同时执行同一段代码,或者保护共享...
当一个线程获得了`Mutex`的所有权后,其他尝试获取该`Mutex`的线程将会被阻塞,直到当前持有者释放`Mutex`。这样就保证了在任何时刻只有一个线程能够执行特定的代码段,确保了数据的一致性。 在C#中,`Mutex`类位于...
在UCOS-II版本中,互斥信号量(Mutex)是实现任务间同步和资源保护的重要机制。本文件“create-mutex.zip”包含了关于在UCOS-II中创建和使用互斥信号量的源码及相关知识点。 1. **互斥信号量(Mutex)的概念**: ...
标题与描述:“Latch_Lock_And_Mutex_Contention_Troubleshooting” 知识点详述: ### 1. Latch机制概述 Latch是Oracle数据库中用于管理内存结构并发访问的一种低级机制,主要用于保护短暂访问的内存结构,如缓存...
### Linux上的互斥线程Mutex详解 在多线程编程中,互斥量(Mutex)是一种常见的同步机制,用于防止多个线程同时访问共享资源,从而避免数据竞争和不一致状态的发生。本文将深入探讨Linux环境下C++互斥线程Mutex的...
当一个线程获取了Mutex后,其他试图获取该Mutex的线程将被阻塞,直到拥有Mutex的线程释放它。这样可以确保任何时候只有一个线程能访问受保护的代码或资源,从而避免并发问题。 在C++或.NET等支持线程编程的环境中,...
在多线程编程中,互斥锁(Mutex)是一种重要的同步机制,用于保证共享资源在同一时刻只能被一个线程访问。本项目提供了在Linux和Windows系统下通用的互斥锁Mutex封装,实现了跨平台的兼容性,使得开发者无需关心底层...
- 锁的竞争和等待机制:`mutex_lock_interruptible()`、`mutex_lock_killable()`以及`mutex_lock_nested()`。 - 锁的公平性和死锁预防策略。 通过对`mutex.c`的深入理解,我们可以更好地掌握Linux内核中的同步和...
Mutex,中文通常称为互斥锁,是Windows API中一种重要的多线程同步机制。在MetaTrader 5(MT5)程序库中,Mutex被用来确保在同一时间只有一个进程或线程可以访问特定的资源,从而避免数据竞争和不一致性的发生。在...
Mutex,中文名为互斥锁,是多线程编程中一种重要的同步机制,用于在多个线程之间实现资源的独占访问。在这个特定的例子中,Mutex被用来防止同一个程序的实例在同一时间多次运行,确保程序的单一实例化。下面将详细...
通过一个标志位来处理的方式虽然可以实现效果,但是还不够安全,极有可能有多条线程同时操作一个全局变量,导致资源争夺问题,为了保证安全,可以在此基础上加上对应的锁来处理同步问题,比如加上互斥锁 mutex,就...
在C++中,`mutex`(互斥量)是实现线程同步的一种基本工具,它允许只让一个线程访问共享资源,从而避免了竞态条件的发生。在VS2008中,我们可以利用C++标准模板库(STL)中的`std::mutex`类来实现这一功能。 首先,...