boost方案
boost提供了三种无锁方案
boost::lockfree::queue:
支持多个生产者和多个消费者线程的无锁队列。
boost::lockfree::stack:
支持多个生产者和多个消费者线程的无锁栈。
boost::lockfree::spsc_queue:
仅支持单个生产者和单个消费者线程的无锁队列。相比boost::lockfree::queue,其效率更高。
注:这些API内部是通过轻量级原子锁实现的lock-free,不是真正意义的无锁。我看到的资料中,貌似只有linux kernel中fifo实现了真正意义上的无锁,但是仅用于与单个消费者单个生产者的环境。
boost官方文档:
http://www.boost.org/doc/libs/1_60_0/doc/html/lockfree.html
queue容量和自增长的问题
可以设置初始容量,添加新元素时如果容量不够,则总容量可能自动增长:queue在当前操作系统上如果支持lock-free,则不会自动增长,如果不支持lock-free,才会自动增长。不同的操作系统其内存分配机制不同,这样会导致在某些操作系统上的queue不支持lockfree
boost::lockfree::spsc_queue<int, boost::lockfree::capacity<2>> q; printf("boost::lockfree:queue is lock free:%s", q.is_lock_free() ? "true" : "false"); //true //push的返回值:1,push成功;0,push失败。 size_t s1 = q.push(9); //1 size_t s2 = q.push(9); //1 size_t s3 = q.push(9); //0 boost::lockfree::queue<int, boost::lockfree::fixed_sized<true>, boost::lockfree::capacity<2>> q2; size_t s2_1 = q2.push(9); //1 size_t s2_2 = q2.push(9); //1 size_t s2_3 = q2.push(9); //0 boost::lockfree::queue<int, boost::lockfree::fixed_sized<false>, boost::lockfree::capacity<2>> q3; size_t s3_1 = q3.push(9); //1 size_t s3_2 = q3.push(9); //1 size_t s3_3 = q3.push(9); //0 size_t s3_4 = q3.push(9); //0
如果不需要考虑多线程或者自己实现同步,还有一种方案:boost::circular_buffer
http://www.boost.org/doc/libs/1_60_0/doc/html/circular_buffer.html
C++11 std::atomic方案
网上有人借用std::atomic实现的一套无锁队列,其内部实现参考了boost::lockfree::queue的设计思路,可以适用于多个消费者多个生产者线程。
A High Performance Lock Free Ring Queue
http://www.codeproject.com/Tips/754393/A-High-Performance-Lock-Free-Ring-Queue
下面代码我在原文基础上做了修改:最新的编译器已不支持std::atomic_flag在构造函数中初始化。
lfringqueue.h
#ifndef INCLUDED_UTILS_LFRINGQUEUE #define INCLUDED_UTILS_LFRINGQUEUE #define _ENABLE_ATOMIC_ALIGNMENT_FIX #define ATOMIC_FLAG_INIT 0 #pragma once #include <vector> #include <mutex> #include <thread> #include <atomic> #include <chrono> #include <cstring> #include <iostream> // Lock free ring queue template < typename _TyData, long _uiCount = 100000 > class lfringqueue { public: lfringqueue( long uiCount = _uiCount ) : m_lTailIterator(0), m_lHeadIterator(0), m_uiCount( uiCount ) { m_queue = new _TyData*[m_uiCount]; memset( m_queue, 0, sizeof(_TyData*) * m_uiCount ); } ~lfringqueue() { if ( m_queue ) delete [] m_queue; } bool enqueue( _TyData *pdata, unsigned int uiRetries = 1000 ) { if ( NULL == pdata ) { // Null enqueues are not allowed return false; } unsigned int uiCurrRetries = 0; while ( uiCurrRetries < uiRetries ) { // Release fence in order to prevent memory reordering // of any read or write with following write std::atomic_thread_fence(std::memory_order_release); long lHeadIterator = m_lHeadIterator; if ( NULL == m_queue[lHeadIterator] ) { long lHeadIteratorOrig = lHeadIterator; ++lHeadIterator; if ( lHeadIterator >= m_uiCount ) lHeadIterator = 0; // Don't worry if this CAS fails. It only means some thread else has // already inserted an item and set it. if ( std::atomic_compare_exchange_strong( &m_lHeadIterator, &lHeadIteratorOrig, lHeadIterator ) ) { // void* are always atomic (you wont set a partial pointer). m_queue[lHeadIteratorOrig] = pdata; if ( m_lEventSet.test_and_set( )) { m_bHasItem.test_and_set(); } return true; } } else { // The queue is full. Spin a few times to check to see if an item is popped off. ++uiCurrRetries; } } return false; } bool dequeue( _TyData **ppdata ) { if ( !ppdata ) { // Null dequeues are not allowed! return false; } bool bDone = false; bool bCheckQueue = true; while ( !bDone ) { // Acquire fence in order to prevent memory reordering // of any read or write with following read std::atomic_thread_fence(std::memory_order_acquire); //MemoryBarrier(); long lTailIterator = m_lTailIterator; _TyData *pdata = m_queue[lTailIterator]; //volatile _TyData *pdata = m_queue[lTailIterator]; if ( NULL != pdata ) { bCheckQueue = true; long lTailIteratorOrig = lTailIterator; ++lTailIterator; if ( lTailIterator >= m_uiCount ) lTailIterator = 0; //if ( lTailIteratorOrig == atomic_cas( (volatile long*)&m_lTailIterator, lTailIterator, lTailIteratorOrig )) if ( std::atomic_compare_exchange_strong( &m_lTailIterator, &lTailIteratorOrig, lTailIterator )) { // Sets of sizeof(void*) are always atomic (you wont set a partial pointer). m_queue[lTailIteratorOrig] = NULL; // Gets of sizeof(void*) are always atomic (you wont get a partial pointer). *ppdata = (_TyData*)pdata; return true; } } else { bDone = true; m_lEventSet.clear(); } } *ppdata = NULL; return false; } long countguess() const { long lCount = trycount(); if ( 0 != lCount ) return lCount; // If the queue is full then the item right before the tail item will be valid. If it // is empty then the item should be set to NULL. long lLastInsert = m_lTailIterator - 1; if ( lLastInsert < 0 ) lLastInsert = m_uiCount - 1; _TyData *pdata = m_queue[lLastInsert]; if ( pdata != NULL ) return m_uiCount; return 0; } long getmaxsize() const { return m_uiCount; } bool HasItem() { return m_bHasItem.test_and_set(); } void SetItemFlagBack() { m_bHasItem.clear(); } private: long trycount() const { long lHeadIterator = m_lHeadIterator; long lTailIterator = m_lTailIterator; if ( lTailIterator > lHeadIterator ) return m_uiCount - lTailIterator + lHeadIterator; // This has a bug where it returns 0 if the queue is full. return lHeadIterator - lTailIterator; } private: std::atomic<long> m_lHeadIterator; // enqueue index std::atomic<long> m_lTailIterator; // dequeue index _TyData **m_queue; // array of pointers to the data long m_uiCount; // size of the array std::atomic_flag m_lEventSet = ATOMIC_FLAG_INIT; // a flag to use whether we should change the item flag std::atomic_flag m_bHasItem = ATOMIC_FLAG_INIT; // a flag to indicate whether there is an item enqueued }; #endif //INCLUDED_UTILS_LFRINGQUEUE
测试:
/* * File: main.cpp * Author: Peng * * Created on February 22, 2014, 9:55 PM */ #include <cstdlib> #include "lfringqueue.h" #include <mutex> #include <stdio.h> #include <string> #include <set> #include <random> #include <chrono> #include <iostream> #include <ctime> #include <atomic> #include <sstream> #include <boost/thread/thread.hpp> #include <boost/lockfree/queue.hpp> #include <iostream> #include <boost/atomic.hpp> const long NUM_DATA = 10; const int NUM_ENQUEUE_THREAD = 1; const int NUM_DEQUEUE_THREAD = 1; const long NUM_ITEM = 1000000; using namespace std; class Data { public: Data( int i = 0 ) : m_iData(i) { stringstream ss; ss << i; m_szDataString = ss.str(); //sprintf( m_szDataString, "%l-d", i); } bool operator< ( const Data & aData) const { if ( m_iData < aData.m_iData) return true; else return false; } int& GetData() { return m_iData; } private: int m_iData; string m_szDataString; //char m_szDataString[MAX_DATA_SIZE]; }; Data DataArray[NUM_DATA]; constexpr long size = 0.5 * NUM_DATA; lfringqueue < Data, 1000> LockFreeQueue; boost::lockfree::queue<Data*> BoostQueue(1000); // Since there is a chance that the searched number cannot be found, so the function should return boolean bool BinarySearchNumberInSortedArray( Data datas[], int iStart, int iEnd, int SearchedNum, int &iFound ) { if ( iEnd - iStart <= 1 ) { if ( datas[iStart].GetData() == SearchedNum ) { iFound = iStart; return true; } else if ( datas[iEnd].GetData() == SearchedNum ) { iFound = iEnd; return true; } else return false; } int mid = 0.5 * ( iStart + iEnd ); if ( datas[mid].GetData() == SearchedNum ) { iFound = mid; return true; } if ( datas[mid].GetData() > SearchedNum ) { if ( mid - 1 >= 0) return BinarySearchNumberInSortedArray ( datas, iStart, mid - 1, SearchedNum, iFound); else return false; } else { if ( mid + 1 <= iEnd ) return BinarySearchNumberInSortedArray ( datas, mid + 1, iEnd, SearchedNum, iFound); else return false; } } bool GenerateRandomNumber_FindPointerToTheNumber_EnQueue() { std::uniform_int_distribution<int> dis(1, NUM_DATA); default_random_engine engine{}; for ( long i = 0; i < NUM_ITEM; i++ ) { int x = dis ( engine ); int iFoundIndex; if ( BinarySearchNumberInSortedArray(DataArray, 0, NUM_DATA - 1, x, iFoundIndex ) ) { Data* pData = &DataArray[iFoundIndex]; LockFreeQueue.enqueue( pData ); //BoostQueue.push( pData ); } } } bool Dequeue() { Data *pData; for ( long i = 0; i < NUM_ITEM; i ++) { while ( LockFreeQueue.dequeue( &pData ) ); //while ( BoostQueue.pop( pData ) ) ; } } int main(int argc, char** argv) { for ( int i = 1; i < NUM_DATA + 1; i++ ) { Data data(i); DataArray[i-1] = data; } std::thread PublishThread[NUM_ENQUEUE_THREAD]; std::thread ConsumerThread[NUM_DEQUEUE_THREAD]; std::chrono::duration<double> elapsed_seconds; for ( int i = 0; i < NUM_ENQUEUE_THREAD; i++ ) { PublishThread[i] = std::thread( GenerateRandomNumber_FindPointerToTheNumber_EnQueue ); } auto start = std::chrono::high_resolution_clock::now(); for ( int i = 0; i < NUM_DEQUEUE_THREAD; i++ ) { ConsumerThread[i] = std::thread{ Dequeue}; } for ( int i = 0; i < NUM_DEQUEUE_THREAD; i++ ) { ConsumerThread[i].join(); } auto end = std::chrono::high_resolution_clock::now(); elapsed_seconds = end - start; std::cout << "Enqueue and Dequeue 1 million item in:" << elapsed_seconds.count() << std::endl; for ( int i = 0; i < NUM_ENQUEUE_THREAD; i++ ) { PublishThread[i].join(); } return 0; }
相关推荐
在C++中,无锁数据结构的实现通常需要对C++11标准库中的原子操作类(std::atomic)有深入理解。concurrentqueue是一种常见的无锁队列实现,它通常用于多线程环境中的消息传递或任务调度。在这个场景中,concurrent...
C++标准库中的`std::queue`或第三方库如Boost的`boost::lockfree::queue`可以用于实现无锁数据传输。 3. **原子操作**:对于简单的数据更新,C++提供了一组原子操作(如`std::atomic`),它们在多线程环境下保证了...
7. **免锁消息队列**:为了在多线程间安全地传递消息,免锁(lock-free)数据结构如无锁队列能提高并发性能。无锁算法需要利用原子操作(atomic operations),如C++的`std::atomic`库。 8. **免锁数据缓冲区**:免...
基于springboot大学生就业信息管理系统源码数据库文档.zip
基于java的驾校收支管理可视化平台的开题报告
时间序列 原木 间隔5秒钟 20241120
毕业设计&课设_基于 Vue 的电影在线预订与管理系统:后台 Java(SSM)代码,为毕业设计项目.zip
基于springboot课件通中小学教学课件共享平台源码数据库文档.zip
基于java的网上购物商城的开题报告
Delphi人脸检测与识别Demo1fdef-main.zip
基于java的咖啡在线销售系统的开题报告
基于java的自助医疗服务系统的开题报告.docx
内容概要:本文档全面介绍了Visual Basic(VB)编程语言的基础知识和高级应用。首先概述了VB的基本特性和开发环境,随后详细讲述了VB的数据类型、变量、运算符、控制结构、数组、过程与函数、变量作用域等内容。接着介绍了窗体设计、控件使用、菜单与工具栏的设计,文件操作、数据库访问等关键知识点。最后讨论了VB的学习方法、发展历史及其在桌面应用、Web应用、数据库应用、游戏开发和自动化脚本编写等领域的广泛应用前景。 适合人群:初学者和中级程序员,尤其是希望快速掌握Windows桌面应用开发的人群。 使用场景及目标:①掌握VB的基础语法和开发环境;②学会使用VB创建复杂的用户界面和功能完整的应用程序;③理解数据库操作、文件管理和网络编程等高级主题。 其他说明:Visual Basic是一种简单易学且功能强大的编程语言,尤其适合用于开发Windows桌面应用。文中不仅覆盖了基础知识,还包括了大量的实用案例和技术细节,帮助读者快速提升编程技能。
基于java的疫情期间高校防控系统开题报告.docx
基于springboot+vue社区老年人帮扶系统源码数据库文档.zip
基于java的超市商品管理系统的开题报告.docx
基于SpringBoot房屋买卖平台源码数据库文档.zip
xdu限通院23微处理器系统与应用大作业(两只老虎),适应于汇编语言keil软件,
<项目介绍> - 新闻类网站系统,基于SSM(Spring、Spring MVC、MyBatis)+MySQL开发,高分成品毕业设计,附带往届论文 - 不懂运行,下载完可以私聊问,可远程教学 1、该资源内项目代码都经过测试运行成功,功能ok的情况下才上传的,请放心下载使用! 2、本项目适合计算机相关专业(如计科、人工智能、通信工程、自动化、电子信息等)的在校学生、老师或者企业员工下载学习,也适合小白学习进阶,当然也可作为毕设项目、课程设计、作业、项目初期立项演示等。 3、如果基础还行,也可在此代码基础上进行修改,以实现其他功能,也可用于毕设、课设、作业等。 下载后请首先打开README.md文件(如有),仅供学习参考, 切勿用于商业用途。 --------
基于java的学生网上请假系统的开题报告.docx