转自:http://www.ibm.com/developerworks/cn/aix/library/au-multithreaded_structures2/#list15
本文是本系列的最后一篇,讨论两个主题:关于实现基于互斥锁的并发链表的设计方法和设计不使用互斥锁的并发数据结构。对于后一个主题,我选择实现一个并发堆栈并解释设计这种数据结构涉及的一些问题。用 C++
设计独立于平台的不使用互斥锁的数据结构目前还不可行,所以我选用 GCC version 4.3.4 作为编译器并在代码中使用 GCC 特有的 __sync_*
函数。如果您是 WIndows® C++
开发人员,可以考虑使用 Interlocked* 系列函数实现相似的功能。
清单 1 给出最基本的并发单向链表接口。是不是缺少什么东西?
template <typename T> class SList { private: typedef struct Node { T data; Node *next; Node(T& data) : value(data), next(NULL) { } } Node; pthread_mutex_t _lock; Node *head, *tail; public: void push_back(T& value); void insert_after(T& previous, T& value); // insert data after previous void remove(const T& value); bool find(const T& value); // return true on success SList( ); ~SList( ); }; |
清单 2 给出 push_back
方法定义。
void SList<T>::push_back(T& data) { pthread_mutex_lock(&_lock); if (head == NULL) { head = new Node(data); tail = head; } else { tail->next = new Node(data); tail = tail->next; } pthread_mutex_unlock(&_lock); } |
现在,假设一个线程试图通过调用 push_back
把 n 个整数连续地添加到这个链表中。这个接口本身要求获取并释放互斥锁 n次,即使在第一次获取锁之前已经知道要插入的所有数据。更好的做法是定义另一个方法,它接收一系列整数,只获取并释放互斥锁一次。清单 3 给出方法定义。
void SList<T>::push_back(T* data, int count) // or use C++ iterators { Node *begin = new Node(data[0]); Node *temp = begin; for (int i=1; i<count; ++i) { temp->next = new Node(data[i]); temp = temp->next; } pthread_mutex_lock(&_lock); if (head == NULL) { head = begin; tail = head; } else { tail->next = begin; tail = temp; } pthread_mutex_unlock(&_lock); } |
现在,我们来优化链表中的搜索元素 — 即 find
方法。下面是几种可能出现的情况:
- 当一些线程正在迭代链表时,出现插入或删除请求。
- 当一些线程正在迭代链表时,出现迭代请求。
- 当一些线程正在插入或删除数据时,出现迭代请求。
显然,应该能够同时处理多个迭代请求。如果系统中的插入/删除操作很少,主要活动是搜索,那么基于单一锁的方法性能会很差。在这种情况下,应该考虑使用读写锁,即 pthread_rwlock_t
。在本文的示例中,将在 SList
中使用 pthread_rwlock_t
而不是 pthread_mutex_t
。这么做就允许多个线程同时搜索链表。插入和删除操作仍然会锁住整个链表,这是合适的。清单 4给出使用 pthread_rwlock_t
的链表实现。
template <typename T> class SList { private: typedef struct Node { // … same as before } Node; pthread_rwlock_t _rwlock; // Not pthread_mutex_t any more! Node *head, *tail; public: // … other API remain as-is SList( ) : head(NULL), tail(NULL) { pthread_rwlock_init(&_rwlock, NULL); } ~SList( ) { pthread_rwlock_destroy(&_rwlock); // … now cleanup nodes } }; |
清单 5 给出链表搜索代码。
bool SList<T>::find(const T& value) { pthread_rwlock_rdlock (&_rwlock); Node* temp = head; while (temp) { if (temp->value == data) { status = true; break; } temp = temp->next; } pthread_rwlock_unlock(&_rwlock); return status; } |
清单 6 给出使用读写锁的 push_back
方法。
void SList<T>::push_back(T& data) { pthread_setschedprio(pthread_self( ), SCHED_FIFO); pthread_rwlock_wrlock(&_rwlock); // … All the code here is same as Listing 2 pthread_rwlock_unlock(&_rwlock); } |
我们来分析一下。这里使用两个锁定函数调用(pthread_rwlock_rdlock
和 pthread_rwlock_wrlock
)管理同步,使用pthread_setschedprio
调用设置写线程的优先级。如果没有写线程在这个锁上阻塞(换句话说,没有插入/删除请求),那么多个请求链表搜索的读线程可以同时操作,因为在这种情况下一个读线程不会阻塞另一个读线程。如果有写线程等待这个锁,当然不允许新的读线程获得锁,写线程等待,到现有的读线程完成操作时写线程开始操作。如果不按这种方式使用pthread_setschedprio
设置写线程的优先级,根据读写锁的性质,很容易看出写线程可能会饿死。
下面是对于这种方式要记住的几点:
- 如果超过了最大读锁数量(由实现定义),
pthread_rwlock_rdlock
可能会失败。 - 如果有 n 个并发的读锁,一定要调用
pthread_rwlock_unlock
n 次。
应该了解的最后一个方法是 insert_after
。同样,预期的使用模式要求调整数据结构的设计。如果一个应用程序使用前面提供的链表,它执行的插入和搜索操作数量差不多相同,但是删除操作很少,那么在插入期间锁住整个链表是不合适的。在这种情况下,最好允许在链表中的分离点(disjoint point)上执行并发插入,同样使用基于读写锁的方式。下面是构造链表的方法:
- 在两个级别上执行锁定(见 清单 7):链表有一个读写锁,各个节点包含一个互斥锁。如果想节省空间,可以考虑共享互斥锁 — 可以维护节点与互斥锁的映射。
- 在插入期间,写线程在链表上建立读锁,然后继续处理。在插入数据之前,锁住要在其后添加新数据的节点,插入之后释放此节点,然后释放读写锁。
- 删除操作在链表上建立写锁。不需要获得与节点相关的锁。
- 与前面一样,可以并发地执行搜索。
template <typename T> class SList { private: typedef struct Node { pthread_mutex_lock lock; T data; Node *next; Node(T& data) : value(data), next(NULL) { pthread_mutex_init(&lock, NULL); } ~Node( ) { pthread_mutex_destroy(&lock); } } Node; pthread_rwlock_t _rwlock; // 2 level locking Node *head, *tail; public: // … all external API remain as-is } }; |
清单 8 给出在链表中插入数据的代码。
void SList<T>:: insert_after(T& previous, T& value) { pthread_rwlock_rdlock (&_rwlock); Node* temp = head; while (temp) { if (temp->value == previous) { break; } temp = temp->next; } Node* newNode = new Node(value); pthread_mutex_lock(&temp->lock); newNode->next = temp->next; temp->next = newNode; pthread_mutex_unlock(&temp->lock); pthread_rwlock_unlock(&_rwlock); return status; } |
到目前为止,都是在数据结构中使用一个或多个互斥锁管理同步。但是,这种方法并非没有问题。请考虑以下情况:
- 等待互斥锁会消耗宝贵的时间 — 有时候是很多时间。这种延迟会损害系统的可伸缩性。
- 低优先级的线程可以获得互斥锁,因此阻碍需要同一互斥锁的高优先级线程。这个问题称为优先级倒置(priority inversion ) (更多信息见 参考资料 中的链接)。
- 可能因为分配的时间片结束,持有互斥锁的线程被取消调度。这对于等待同一互斥锁的其他线程有不利影响,因为等待时间现在会更长。这个问题称为锁护送(lock convoying)(更多信息见 参考资料 中的链接)。
互斥锁的问题还不只这些。最近,出现了一些不使用互斥锁的解决方案。话虽如此,尽管使用互斥锁需要谨慎,但是如果希望提高性能,肯定应该研究互斥锁。
在研究不使用互斥锁的解决方案之前,先讨论一下从 80486 开始在所有 Intel® 处理器上都支持的 CMPXCHG 汇编指令。清单 9从概念角度说明这个指令的作用。
int compare_and_swap ( int *memory_location, int expected_value, int new_value) { int old_value = *memory_location; if (old_value == expected_value) *memory_location = new_value; return old_value; } |
这里发生的操作是:指令检查一个内存位置是否包含预期的值;如果是这样,就把新的值复制到这个位置。清单 10 提供汇编语言伪代码。
CMPXCHG OP1, OP2 if ({AL or AX or EAX} = OP1) zero = 1 ;Set the zero flag in the flag register OP1 = OP2 else zero := 0 ;Clear the zero flag in the flag register {AL or AX or EAX}= OP1 |
CPU 根据操作数的宽度(8、16 或 32)选择 AL、AX 或 EAX 寄存器。如果 AL/AX/EAX 寄存器的内容与操作数 1 的内容匹配,就把操作数 2 的内容复制到操作数 1;否则,用操作数 2 的值更新 AL/AX/EAX 寄存器。Intel Pentium® 64 位处理器有相似的指令 CMPXCHG8B,它支持 64 位的比较并交换。注意,CMPXCHG 指令是原子性的,这意味着在这个指令结束之前没有可见的中间状态。它要么完整地执行,要么根本没有开始。在其他平台上有等效的指令,例如 Motorola MC68030 处理器的 compare and swap (CAS) 指令有相似的语义。
我们为什么对 CMPXCHG 感兴趣?这意味着要使用汇编语言编写代码吗?
需要了解 CMPXCHG 和 CMPXCHG8B 等相关指令是因为它们构成了无锁解决方案的核心。但是,不必使用汇编语言编写代码。GCC (GNU Compiler Collection,4.1 和更高版本)提供几个原子性的内置函数(见 参考资料),可以使用它们为 x86 和 x86-64 平台实现 CAS 操作。实现这一支持不需要包含头文件。在本文中,我们要在无锁数据结构的实现中使用 GCC 内置函数。看一下这些内置函数:
bool __sync_bool_compare_and_swap (type *ptr, type oldval, type newval, ...) type __sync_val_compare_and_swap (type *ptr, type oldval, type newval, ...) |
__sync_bool_compare_and_swap
内置函数比较 oldval
和 *ptr
。如果它们匹配,就把 newval
复制到 *ptr
。如果 oldval
和*ptr
匹配,返回值是 True,否则是 False。__sync_val_compare_and_swap
内置函数的行为是相似的,只是它总是返回旧值。清单 11 提供一个使用示例。
#include <iostream> using namespace std; int main() { bool lock(false); bool old_value = __sync_val_compare_and_swap( &lock, false, true); cout >> lock >> endl; // prints 0x1 cout >> old_value >> endl; // prints 0x0 } |
既然了解了 CAS,现在就来设计一个并发堆栈。这个堆栈没有锁;这种无锁的并发数据结构也称为非阻塞数据结构。清单 12给出代码接口。
template <typename T> class Stack { typedef struct Node { T data; Node* next; Node(const T& d) : data(d), next(0) { } } Node; Node *top; public: Stack( ) : top(0) { } void push(const T& data); T pop( ) throw (…); }; |
清单 13 给出压入操作的定义。
void Stack<T>::push(const T& data) { Node *n = new Node(data); while (1) { n->next = top; if (__sync_bool_compare_and_swap(&top, n->next, n)) { // CAS break; } } } |
压入(Push)操作做了什么?从单一线程的角度来看,创建了一个新节点,它的 next 指针指向堆栈的顶部。接下来,调用 CAS 内置函数,把新的节点复制到 top 位置。
从多个线程的角度来看,完全可能有两个或更多线程同时试图把数据压入堆栈。假设线程 A 试图把 20 压入堆栈,线程 B 试图压入 30,而线程 A 先获得了时间片。但是,在 n->next = top
指令结束之后,调度程序暂停了线程 A。现在,线程 B 获得了时间片(它很幸运),它能够完成 CAS,把 30 压入堆栈后结束。接下来,线程 A 恢复执行,显然对于这个线程 *top
和 n->next
不匹配,因为线程 B 修改了 top 位置的内容。因此,代码回到循环的开头,指向正确的 top 指针(线程 B 修改后的),调用 CAS,把 20 压入堆栈后结束。整个过程没有使用任何锁。
清单 14 给出从堆栈弹出数据的代码。
T Stack<T>::pop( ) { if (top == NULL) throw std::string(“Cannot pop from empty stack”); while (1) { Node* next = top->next; if (__sync_bool_compare_and_swap(&top, top, next)) { // CAS return top->data; } } } |
用与 push
相似的代码定义弹出操作语义。堆栈的顶存储在 result
中,使用 CAS 把 top 位置更新为 top->next
并返回适当的数据。如果恰在执行 CAS 之前线程失去执行权,那么在线程恢复执行之后,CAS 会失败,继续循环,直到有有效的数据可用为止。
不幸的是,这种堆栈弹出实现有问题 — 包括明显的问题和不太明显的问题。明显的问题是 NULL 检查必须放在 while
循环中。如果线程 P 和线程 Q 都试图从只剩一个元素的堆栈弹出数据,而线程 P 恰在执行 CAS 之前失去执行权,那么当它重新获得执行权时,堆栈中已经没有可弹出的数据了。因为 top
是 NULL,访问 &top
肯定会导致崩溃 — 这显然是可以避免的 bug。这个问题也突显了并发数据结构的基本设计原则之一:决不要假设任何代码会连续执行。
清单 15 给出解决了此问题的代码。
T Stack<T>::pop( ) { while (1) { if (top == NULL) throw std::string(“Cannot pop from empty stack”); Node* next = top->next; if (top && __sync_bool_compare_and_swap(&top, top, next)) { // CAS return top->data; } } } |
下一个问题比较复杂,但是如果您了解内存管理程序的工作方式(更多信息见 参考资料 中的链接),应该不太难理解。清单 16 展示这个问题。
T* ptr1 = new T(8, 18); T* old = ptr1; // .. do stuff with ptr1 delete ptr1; T* ptr2 = new T(0, 1); // We can't guarantee that the operating system will not recycle memory // Custom memory managers recycle memory often if (old1 == ptr2) { … } |
在此代码中,无法保证 old
和 ptr2
有不同的值。根据操作系统和定制的应用程序内存管理系统的具体情况,完全可能回收利用已删除的内存 — 也就是说,删除的内存放在应用程序专用的池中,可在需要时重用,而不返回给系统。这显然会改进性能,因为不需要通过系统调用请求更多内存。尽管在一般情况下这是有利的,但是对于非阻塞堆栈不好。现在我们来看看这是为什么。
假设有两个线程 — A 和 B。A 调用 pop
并恰在执行 CAS 之前失去执行权。然后 B 调用 pop
,然后压入数据,数据的一部分来自从前面的弹出操作回收的内存。清单 17 给出伪代码。
Thread A tries to pop Stack Contents: 5 10 14 9 100 2 result = pointer to node containing 5 Thread A now de-scheduled Thread B gains control Stack Contents: 5 10 14 9 100 2 Thread B pops 5 Thread B pushes 8 16 24 of which 8 was from the same memory that earlier stored 5 Stack Contents: 8 16 24 10 14 9 100 2 Thread A gains control At this time, result is still a valid pointer and *result = 8 But next points to 10, skipping 16 and 24!!! |
纠正方法相当简单:不存储下一个节点。清单 18 给出代码。
T Stack<T>::pop( ) { while (1) { Node* result = top; if (result == NULL) throw std::string(“Cannot pop from empty stack”); if (top && __sync_bool_compare_and_swap(&top, result, result->next)) { // CAS return top->data; } } } |
这样,即使线程 B 在线程 A 试图弹出数据的同时修改了堆栈顶,也可以确保不会跳过堆栈中的元素。
本系列讨论了如何设计支持并发访问的数据结构。您已经看到,设计可以基于互斥锁,也可以是无锁的。无论采用哪种方式,要考虑的问题不仅仅是这些数据结构的基本功能 — 具体来说,必须一直记住线程会争夺执行权,要考虑线程重新执行时如何恢复操作。目前,解决方案(尤其是无锁解决方案)与平台/编译器紧密相关。请研究用于实现线程和锁的 Boost 库,阅读 John Valois 关于无锁链表的文章(见 参考资料 中的链接)。C++0x
标准提供了 std::thread
类,但是目前大多数编译器对它的支持很有限,甚至不支持它。
相关推荐
这篇文章介绍了一种基于无锁数据结构的FIFO(先进先出)队列算法,该算法被设计为一种高效的核间通信机制。 无锁数据结构的关键在于,它通过原子操作或者特定的算法设计,确保数据结构在并发读写时仍然保持正确性和...
设计无锁数据结构时,需要考虑避免这两种情况。 7. **性能分析与优化**:无锁数据结构通常比锁基数据结构更复杂,但可能在高并发下提供更好的性能。性能分析包括了竞争率、缓存一致性开销、CPU分支预测等。优化策略...
- **无锁数据结构**:除了队列之外,还有很多其他的数据结构也可以采用无锁编程的方式来实现,如堆栈、哈希表等。这些数据结构的设计需要考虑如何在无锁环境下保持数据的一致性和完整性,同时也需要考虑其实现的...
无锁数据结构的优势在于可以实现更高的并发性和更好的可伸缩性,但设计起来也更为复杂,因为需要处理更多的竞态条件和活锁问题。 总结来说,设计不使用互斥锁的并发数据结构可以显著提高并行计算的性能,但需要对...
无锁数据结构和算法通过避免锁的使用,直接在内存中进行原子操作,减少了系统开销,提升了并发执行的效率。Amino的实现基于这些概念,提供了无锁队列、栈、哈希表等并发容器,使得开发者能够在不牺牲性能的前提下,...
- 使用**无锁数据结构**或**非阻塞算法**,例如比较并交换(CAS)操作。 - 实施**分层锁定**或**乐观并发控制**,以更细粒度地管理并发访问。 - 避免使用传统的互斥锁,因为它们可能导致死锁和优先级反转问题。 ##...
在`lockfree-lib`这个库中,我们可以推测它提供了一些无锁数据结构的实现,比如无锁队列、无锁栈或者无锁哈希表。这些数据结构在多线程环境下可以提供高效的并发访问,而不会出现锁竞争导致的性能下降。无锁队列通常...
无锁队列是一种高效、线程安全的数据结构,它在多线程环境下广泛...在《Java并发编程实战》等书籍中,也可以找到更多关于无锁数据结构的深入讨论。在实践中,可以尝试将无锁队列应用到自己的项目中,以优化并发性能。
这一发展改变了传统数据结构设计的环境,因为现有的数据结构大多仍是为单核CPU设计,遵循顺序型原则。 2. 共享内存架构下的并发问题:在多核时代,多核处理器通过共享内存的方式来进行数据通信和同步。这一架构下,...
在多任务环境下的数据结构设计和分析中,我们面临的核心挑战是如何高效地管理和操作数据,以便在并发或并行环境中支持多个任务同时访问和修改数据。数据结构的选择和设计直接影响到程序的性能、可扩展性和资源利用率...
无锁队列是一种高效、线程安全的数据结构,尤其在多线程环境下,它通过避免锁的使用来提高并发性能。C++11引入了原子操作...这个实现不仅有助于理解无锁数据结构的设计,还能为你的C++11多线程编程提供实战经验。
- 无锁数据结构可以避免使用锁带来的性能问题,如死锁、饥饿问题以及锁的开销等。 - 无锁设计能够提高并行算法的可扩展性和性能,对于GPU这种拥有海量线程的硬件而言尤为重要。 综上所述,该文档详细描述了一种...
这些数据结构设计的目标是允许多个处理器或线程同时访问和修改数据,而不会引入不必要的冲突或降低性能。常见的分布式数据结构包括分布式哈希表(DHT)、分布式队列、分布式锁等,它们通过巧妙的分布式算法实现高效...
例如,使用无锁数据结构可以减少锁的开销,提高并发性能;使用条件变量(Condition)可以实现更精细的线程同步策略,避免不必要的等待。 在学习资源方面,"大学生 C/C++/JAVA/Python数据结构学习笔记和资料大全...
在 JAVA 中,有多种方式可以实现无锁并行,例如,使用原子变量、CAS 操作、Lock-free 数据结构等。这些方法可以避免线程之间的竞态和数据不一致,从而保障线程安全。 锁机制和无锁并行都是多线程编程中保障线程安全...
3. **并发控制**:在并行程序中,由于多个任务可能同时访问同一资源,因此需要掌握并发控制技术,如锁、信号量、条件变量和无锁数据结构,以防止数据不一致和死锁问题。 4. **通信与同步**:在共享内存和分布式内存...
首先,无锁队列是一种在多核处理器架构下优化数据结构访问和避免锁竞争的技术,它在并行计算和多线程环境中尤其有用。无锁队列可以有效提升数据处理的速度,特别是在网络包处理、任务调度等需要高并发和低延迟处理的...
10. **并发数据结构**:特殊设计的数据结构,如无锁数据结构、读写锁数据结构,可以在并发环境中提供更高的性能和安全性。 通过这本书和配书光盘中的实例,读者将能深入理解这些概念,并学会在实际项目中应用它们,...