注:这些 API 内部是通过轻量级原子锁实现的 lock-free,并不是真正意义上的无锁。在我看到的资料中,似乎只有 Linux kernel 中的 fifo(kfifo)实现了真正意义上的无锁,但它仅适用于单个生产者、单个消费者的环境。
boost::lockfree::spsc_queue<int, boost::lockfree::capacity<2>> q; printf("boost::lockfree:queue is lock free:%s", q.is_lock_free() ? "true" : "false"); //true //push的返回值:1,push成功;0,push失败。 size_t s1 = q.push(9); //1 size_t s2 = q.push(9); //1 size_t s3 = q.push(9); //0 boost::lockfree::queue<int, boost::lockfree::fixed_sized<true>, boost::lockfree::capacity<2>> q2; size_t s2_1 = q2.push(9); //1 size_t s2_2 = q2.push(9); //1 size_t s2_3 = q2.push(9); //0 boost::lockfree::queue<int, boost::lockfree::fixed_sized<false>, boost::lockfree::capacity<2>> q3; size_t s3_1 = q3.push(9); //1 size_t s3_2 = q3.push(9); //1 size_t s3_3 = q3.push(9); //0 size_t s3_4 = q3.push(9); //0
C++11 std::atomic方案
A High Performance Lock Free Ring Queue
#ifndef INCLUDED_UTILS_LFRINGQUEUE
#define INCLUDED_UTILS_LFRINGQUEUE

// Kept from the original listing.
// NOTE(review): this looks like the MSVC <atomic> alignment opt-in; confirm it
// is still required by the toolchains this header targets.
#define _ENABLE_ATOMIC_ALIGNMENT_FIX

#pragma once

#include <vector>
#include <mutex>
#include <thread>
#include <atomic>
#include <chrono>
#include <cstring>
#include <iostream>
#include <memory>
#include <stdexcept>

// Bounded ring queue of non-owning pointers, synchronised with atomics only.
//
// The queue never owns the objects it stores: callers keep every enqueued
// object alive until it has been dequeued. A null slot means "empty", which is
// why null pointers cannot be enqueued.
//
// Template parameters:
//   _TyData   element type; the queue stores _TyData*.
//   _uiCount  default number of slots used by the constructor.
//
// Known limitation: a slot is claimed by advancing an index first and is
// written (or cleared) only afterwards. While a thread sits between those two
// steps, dequeue() can report "empty" although an enqueue is in flight, and
// enqueue() can report "full" although a dequeue is in flight.
template < typename _TyData, long _uiCount = 100000 >
class lfringqueue
{
public:
    // Creates a queue with uiCount slots, all empty.
    // Throws std::invalid_argument when uiCount is not positive.
    lfringqueue( long uiCount = _uiCount )
        : m_lHeadIterator( 0 ),
          m_lTailIterator( 0 ),
          m_queue( allocateSlots( uiCount ) ),
          m_uiCount( uiCount )
    {
    }

    // Tries to append pdata.
    //   pdata      pointer to store; null is rejected.
    //   uiRetries  how many times a full head slot is tolerated before giving
    //              up; 0 means "do not even try".
    // Returns true when the pointer was stored, false when pdata is null or
    // the queue stayed full for uiRetries checks.
    bool enqueue( _TyData *pdata, unsigned int uiRetries = 1000 )
    {
        if ( nullptr == pdata )
        {
            // Null is the "empty slot" marker, so it cannot be a payload.
            return false;
        }

        unsigned int uiCurrRetries = 0;
        while ( uiCurrRetries < uiRetries )
        {
            long lHeadIterator = m_lHeadIterator.load();
            if ( nullptr != m_queue[lHeadIterator].load() )
            {
                // The head slot is still occupied: the queue is full. Spin
                // again in case a consumer frees the slot.
                ++uiCurrRetries;
                continue;
            }

            long lNextHead = lHeadIterator + 1;
            if ( lNextHead >= m_uiCount )
                lNextHead = 0;

            // A failed CAS only means another producer claimed this slot
            // first; that does not count as a "queue full" retry.
            if ( m_lHeadIterator.compare_exchange_strong( lHeadIterator, lNextHead ) )
            {
                // On success lHeadIterator still holds the claimed index.
                m_queue[lHeadIterator].store( pdata );
                // The item flag is raised only when the event flag was
                // already set before this enqueue.
                if ( m_lEventSet.test_and_set() )
                {
                    m_bHasItem.test_and_set();
                }
                return true;
            }
        }
        return false;
    }

    // Tries to remove the oldest pointer.
    //   ppdata  receives the dequeued pointer, or null when nothing was
    //           dequeued; must not be null itself.
    // Returns true when an item was removed, false when ppdata is null or the
    // tail slot was empty.
    bool dequeue( _TyData **ppdata )
    {
        if ( !ppdata )
        {
            // Null dequeues are not allowed.
            return false;
        }

        for ( ;; )
        {
            long lTailIterator = m_lTailIterator.load();
            _TyData *pdata = m_queue[lTailIterator].load();
            if ( nullptr == pdata )
            {
                // Nothing published at the tail: report empty and reset the
                // event flag.
                m_lEventSet.clear();
                break;
            }

            long lNextTail = lTailIterator + 1;
            if ( lNextTail >= m_uiCount )
                lNextTail = 0;

            // A failed CAS means another consumer took this item; re-read the
            // tail and try again.
            if ( m_lTailIterator.compare_exchange_strong( lTailIterator, lNextTail ) )
            {
                // Hand the slot back to the producers.
                m_queue[lTailIterator].store( nullptr );
                *ppdata = pdata;
                return true;
            }
        }

        *ppdata = nullptr;
        return false;
    }

    // Best-effort element count; the indices are read without a common
    // snapshot, so the value can be stale under concurrent use.
    long countguess() const
    {
        const long lCount = trycount();
        if ( 0 != lCount )
            return lCount;

        // head == tail is ambiguous. If the queue is full, the slot right
        // before the tail is occupied; if it is empty, that slot is null.
        long lLastInsert = m_lTailIterator.load() - 1;
        if ( lLastInsert < 0 )
            lLastInsert = m_uiCount - 1;

        return ( nullptr != m_queue[lLastInsert].load() ) ? m_uiCount : 0;
    }

    // Number of slots in the ring.
    long getmaxsize() const
    {
        return m_uiCount;
    }

    // Sets the item flag and returns the value it had before the call.
    bool HasItem()
    {
        return m_bHasItem.test_and_set();
    }

    // Clears the item flag.
    void SetItemFlagBack()
    {
        m_bHasItem.clear();
    }

private:
    typedef std::atomic<_TyData *> slot_type;

    // Validates the requested capacity and allocates the slot array.
    static std::unique_ptr<slot_type[]> allocateSlots( long uiCount )
    {
        if ( uiCount <= 0 )
            throw std::invalid_argument( "lfringqueue: capacity must be positive" );

        // The trailing () value-initialises the array, so every slot starts
        // out null.
        return std::unique_ptr<slot_type[]>( new slot_type[static_cast<std::size_t>( uiCount )]() );
    }

    // Distance from tail to head; yields 0 both for an empty and for a full
    // queue, which countguess() disambiguates.
    long trycount() const
    {
        const long lHeadIterator = m_lHeadIterator.load();
        const long lTailIterator = m_lTailIterator.load();

        if ( lTailIterator > lHeadIterator )
            return m_uiCount - lTailIterator + lHeadIterator;

        return lHeadIterator - lTailIterator;
    }

private:
    std::atomic<long> m_lHeadIterator;      // enqueue index
    std::atomic<long> m_lTailIterator;      // dequeue index
    std::unique_ptr<slot_type[]> m_queue;   // ring of non-owning pointers; null = empty slot
    long m_uiCount;                         // number of slots in the ring
    std::atomic_flag m_lEventSet = ATOMIC_FLAG_INIT;    // decides whether an enqueue raises the item flag
    std::atomic_flag m_bHasItem = ATOMIC_FLAG_INIT;     // set once an item has been enqueued while the event flag was set
};

#endif //INCLUDED_UTILS_LFRINGQUEUE
/* * File: main.cpp * Author: Peng * * Created on February 22, 2014, 9:55 PM */ #include <cstdlib> #include "lfringqueue.h" #include <mutex> #include <stdio.h> #include <string> #include <set> #include <random> #include <chrono> #include <iostream> #include <ctime> #include <atomic> #include <sstream> #include <boost/thread/thread.hpp> #include <boost/lockfree/queue.hpp> #include <iostream> #include <boost/atomic.hpp> const long NUM_DATA = 10; const int NUM_ENQUEUE_THREAD = 1; const int NUM_DEQUEUE_THREAD = 1; const long NUM_ITEM = 1000000; using namespace std; class Data { public: Data( int i = 0 ) : m_iData(i) { stringstream ss; ss << i; m_szDataString = ss.str(); //sprintf( m_szDataString, "%l-d", i); } bool operator< ( const Data & aData) const { if ( m_iData < aData.m_iData) return true; else return false; } int& GetData() { return m_iData; } private: int m_iData; string m_szDataString; //char m_szDataString[MAX_DATA_SIZE]; }; Data DataArray[NUM_DATA]; constexpr long size = 0.5 * NUM_DATA; lfringqueue < Data, 1000> LockFreeQueue; boost::lockfree::queue<Data*> BoostQueue(1000); // Since there is a chance that the searched number cannot be found, so the function should return boolean bool BinarySearchNumberInSortedArray( Data datas[], int iStart, int iEnd, int SearchedNum, int &iFound ) { if ( iEnd - iStart <= 1 ) { if ( datas[iStart].GetData() == SearchedNum ) { iFound = iStart; return true; } else if ( datas[iEnd].GetData() == SearchedNum ) { iFound = iEnd; return true; } else return false; } int mid = 0.5 * ( iStart + iEnd ); if ( datas[mid].GetData() == SearchedNum ) { iFound = mid; return true; } if ( datas[mid].GetData() > SearchedNum ) { if ( mid - 1 >= 0) return BinarySearchNumberInSortedArray ( datas, iStart, mid - 1, SearchedNum, iFound); else return false; } else { if ( mid + 1 <= iEnd ) return BinarySearchNumberInSortedArray ( datas, mid + 1, iEnd, SearchedNum, iFound); else return false; } } bool GenerateRandomNumber_FindPointerToTheNumber_EnQueue() 
{ std::uniform_int_distribution<int> dis(1, NUM_DATA); default_random_engine engine{}; for ( long i = 0; i < NUM_ITEM; i++ ) { int x = dis ( engine ); int iFoundIndex; if ( BinarySearchNumberInSortedArray(DataArray, 0, NUM_DATA - 1, x, iFoundIndex ) ) { Data* pData = &DataArray[iFoundIndex]; LockFreeQueue.enqueue( pData ); //BoostQueue.push( pData ); } } } bool Dequeue() { Data *pData; for ( long i = 0; i < NUM_ITEM; i ++) { while ( LockFreeQueue.dequeue( &pData ) ); //while ( BoostQueue.pop( pData ) ) ; } } int main(int argc, char** argv) { for ( int i = 1; i < NUM_DATA + 1; i++ ) { Data data(i); DataArray[i-1] = data; } std::thread PublishThread[NUM_ENQUEUE_THREAD]; std::thread ConsumerThread[NUM_DEQUEUE_THREAD]; std::chrono::duration<double> elapsed_seconds; for ( int i = 0; i < NUM_ENQUEUE_THREAD; i++ ) { PublishThread[i] = std::thread( GenerateRandomNumber_FindPointerToTheNumber_EnQueue ); } auto start = std::chrono::high_resolution_clock::now(); for ( int i = 0; i < NUM_DEQUEUE_THREAD; i++ ) { ConsumerThread[i] = std::thread{ Dequeue}; } for ( int i = 0; i < NUM_DEQUEUE_THREAD; i++ ) { ConsumerThread[i].join(); } auto end = std::chrono::high_resolution_clock::now(); elapsed_seconds = end - start; std::cout << "Enqueue and Dequeue 1 million item in:" << elapsed_seconds.count() << std::endl; for ( int i = 0; i < NUM_ENQUEUE_THREAD; i++ ) { PublishThread[i].join(); } return 0; }
