`
aigo
  • 浏览: 2698236 次
  • 性别: Icon_minigender_1
  • 来自: 宜昌
社区版块
存档分类
最新评论

[C++]boost提供的几种lock-free方案以及std::atomic实现无锁队列

阅读更多

boost方案

boost提供了三种无锁方案

boost::lockfree::queue

支持多个生产者和多个消费者线程的无锁队列

boost::lockfree::stack

支持多个生产者和多个消费者线程的无锁栈。

boost::lockfree::spsc_queue

支持单个生产者单个消费者线程的无锁队列。相比boost::lockfree::queue,其效率更高。

 

注:这些API内部是通过轻量级原子锁实现的lock-free,不是真正意义的无锁。我看到的资料中,貌似只有linux kernel中fifo实现了真正意义上的无锁,但是仅用于与单个消费者单个生产者的环境。

 

boost官方文档:

http://www.boost.org/doc/libs/1_60_0/doc/html/lockfree.html

 

queue容量和自增长的问题

可以设置初始容量,添加新元素时如果容量不够,则总容量可能自动增长:queue在当前操作系统上如果支持lock-free,则不会自动增长,如果不支持lock-free,才会自动增长。不同的操作系统其内存分配机制不同,这样会导致在某些操作系统上的queue不支持lockfree

boost::lockfree::spsc_queue<int, boost::lockfree::capacity<2>> q;
printf("boost::lockfree:queue is lock free:%s", q.is_lock_free() ? "true" : "false");	//true

//push的返回值:1,push成功;0,push失败。
size_t s1 = q.push(9);	//1
size_t s2 = q.push(9);	//1
size_t s3 = q.push(9);	//0

boost::lockfree::queue<int, boost::lockfree::fixed_sized<true>, boost::lockfree::capacity<2>> q2;
size_t s2_1 = q2.push(9);	//1
size_t s2_2 = q2.push(9);	//1
size_t s2_3 = q2.push(9);	//0

boost::lockfree::queue<int, boost::lockfree::fixed_sized<false>, boost::lockfree::capacity<2>> q3;
size_t s3_1 = q3.push(9);	//1
size_t s3_2 = q3.push(9);	//1
size_t s3_3 = q3.push(9);	//0
size_t s3_4 = q3.push(9);	//0
 

 

如果不需要考虑多线程或者自己实现同步,还有一种方案:boost::circular_buffer

http://www.boost.org/doc/libs/1_60_0/doc/html/circular_buffer.html

 

 

C++11 std::atomic方案

网上有人借用std::atomic实现的一套无锁队列,其内部实现参考了boost::lockfree::queue的设计思路,可以适用于多个消费者多个生产者线程。

A High Performance Lock Free Ring Queue

http://www.codeproject.com/Tips/754393/A-High-Performance-Lock-Free-Ring-Queue

下面代码我在原文基础上做了修改:最新的编译器已不支持std::atomic_flag在构造函数中初始化

 lfringqueue.h

#ifndef INCLUDED_UTILS_LFRINGQUEUE
#define INCLUDED_UTILS_LFRINGQUEUE

#define _ENABLE_ATOMIC_ALIGNMENT_FIX
#define ATOMIC_FLAG_INIT 0


#pragma once


#include <vector>
#include <mutex>
#include <thread>
#include <atomic>
#include <chrono>
#include <cstring>
#include <iostream>

// Lock free ring queue 

template < typename _TyData, long _uiCount = 100000 >
class lfringqueue
{
public:
    lfringqueue( long uiCount = _uiCount ) : m_lTailIterator(0), m_lHeadIterator(0), m_uiCount( uiCount )
    {
        m_queue = new _TyData*[m_uiCount];
        memset( m_queue, 0, sizeof(_TyData*) * m_uiCount );
    }

    ~lfringqueue()
    {
        if ( m_queue )
            delete [] m_queue;
    }

 
    bool enqueue( _TyData *pdata, unsigned int uiRetries = 1000 )
    {
        if ( NULL == pdata )
        {
            // Null enqueues are not allowed
            return false;
        }

        unsigned int uiCurrRetries = 0;
        while ( uiCurrRetries < uiRetries )
        {
            // Release fence in order to prevent memory reordering 
            // of any read or write with following write
            std::atomic_thread_fence(std::memory_order_release);
            
            long lHeadIterator = m_lHeadIterator;

            if ( NULL == m_queue[lHeadIterator] )
            {
                long lHeadIteratorOrig = lHeadIterator;

                ++lHeadIterator;
                if ( lHeadIterator >= m_uiCount )
                        lHeadIterator = 0;

                // Don't worry if this CAS fails.  It only means some thread else has
                // already inserted an item and set it.
                if ( std::atomic_compare_exchange_strong( &m_lHeadIterator, &lHeadIteratorOrig, lHeadIterator ) )             
                {
                    // void* are always atomic (you wont set a partial pointer).
                    m_queue[lHeadIteratorOrig] = pdata;
                  
                    if ( m_lEventSet.test_and_set( ))
                    {
                        m_bHasItem.test_and_set();
                    }
                    return true;
                }
            }
            else
            {
                // The queue is full.  Spin a few times to check to see if an item is popped off.
                ++uiCurrRetries;
            }
        }
        return false;
    }

    bool dequeue( _TyData **ppdata )
    {
        if ( !ppdata )
        {
            // Null dequeues are not allowed!
            return false;
        }

        bool bDone = false;
        bool bCheckQueue = true;

        while ( !bDone )
        {
            // Acquire fence in order to prevent memory reordering 
            // of any read or write with following read
            std::atomic_thread_fence(std::memory_order_acquire);
            //MemoryBarrier();
            long lTailIterator = m_lTailIterator;
            _TyData *pdata = m_queue[lTailIterator];
            //volatile _TyData *pdata = m_queue[lTailIterator];            
            if ( NULL != pdata )
            {
                bCheckQueue = true;
                long lTailIteratorOrig = lTailIterator;

                ++lTailIterator;
                if ( lTailIterator >= m_uiCount )
                        lTailIterator = 0;

                //if ( lTailIteratorOrig == atomic_cas( (volatile long*)&m_lTailIterator, lTailIterator, lTailIteratorOrig ))
                if ( std::atomic_compare_exchange_strong( &m_lTailIterator, &lTailIteratorOrig, lTailIterator ))
                {
                        // Sets of sizeof(void*) are always atomic (you wont set a partial pointer).
                        m_queue[lTailIteratorOrig] = NULL;

                        // Gets of sizeof(void*) are always atomic (you wont get a partial pointer).
                        *ppdata = (_TyData*)pdata;

                        return true;
                }
            }
            else
            {
                bDone = true;
                m_lEventSet.clear();
            }
        }
        *ppdata = NULL;
        return false;
    }
	
	
    long countguess() const
    {
        long lCount = trycount();

        if ( 0 != lCount )
                return lCount;

        // If the queue is full then the item right before the tail item will be valid.  If it
        // is empty then the item should be set to NULL.
        long lLastInsert = m_lTailIterator - 1;
        if ( lLastInsert < 0 )
                lLastInsert = m_uiCount - 1;

        _TyData *pdata = m_queue[lLastInsert];
        if ( pdata != NULL ) 
                return m_uiCount;

        return 0;
    }

    long getmaxsize() const
    {
        return m_uiCount;
    }

    bool HasItem()
    {
        return m_bHasItem.test_and_set();
    }

    void SetItemFlagBack()
    {
        m_bHasItem.clear();
    }
	
private:
    long trycount() const
    {
        long lHeadIterator = m_lHeadIterator;
        long lTailIterator = m_lTailIterator;

        if ( lTailIterator > lHeadIterator )
                return m_uiCount - lTailIterator + lHeadIterator;

        // This has a bug where it returns 0 if the queue is full.
        return lHeadIterator - lTailIterator;
    }
	
private:    
    std::atomic<long> m_lHeadIterator;  // enqueue index
    std::atomic<long> m_lTailIterator;  // dequeue index
    _TyData **m_queue;                  // array of pointers to the data
    long m_uiCount;                     // size of the array
    std::atomic_flag m_lEventSet = ATOMIC_FLAG_INIT;       // a flag to use whether we should change the item flag
    std::atomic_flag m_bHasItem = ATOMIC_FLAG_INIT;        // a flag to indicate whether there is an item enqueued
};

#endif //INCLUDED_UTILS_LFRINGQUEUE

 

 

测试:

/* 
 * File:   main.cpp
 * Author: Peng
 *
 * Created on February 22, 2014, 9:55 PM
 */

#include <cstdlib>
#include "lfringqueue.h"
#include <mutex>
#include <stdio.h>
#include <string>
#include <set>
#include <random>
#include <chrono>
#include <iostream>
#include <ctime>
#include <atomic>
#include <sstream>

#include <boost/thread/thread.hpp>
#include <boost/lockfree/queue.hpp>
#include <iostream>

#include <boost/atomic.hpp>

const long NUM_DATA = 10;
const int NUM_ENQUEUE_THREAD = 1;
const int NUM_DEQUEUE_THREAD = 1;
const long NUM_ITEM = 1000000;
        
using namespace std;
class Data
{
public:   
    Data( int i = 0 ) : m_iData(i)
    {
        stringstream ss;
        ss << i;
        m_szDataString = ss.str();
        //sprintf( m_szDataString, "%l-d", i);    
    }
        
    bool operator< ( const Data & aData) const
    {
        if ( m_iData < aData.m_iData)
            return true;
        else
            return false;
    }
    
    int& GetData()
    {
        return m_iData;
    }
private:   
    int m_iData;
    string m_szDataString;
    //char m_szDataString[MAX_DATA_SIZE];
};

Data DataArray[NUM_DATA];

constexpr long size = 0.5 * NUM_DATA;
lfringqueue < Data, 1000> LockFreeQueue; 
boost::lockfree::queue<Data*> BoostQueue(1000);

// Since there is a chance that the searched number cannot be found, so the function should return boolean
bool BinarySearchNumberInSortedArray( Data datas[], int iStart, int iEnd, int SearchedNum, int &iFound )
{
    if ( iEnd - iStart <= 1 )
    {
        if ( datas[iStart].GetData() == SearchedNum )
        {
            iFound = iStart;
            return true;
        }
        else if ( datas[iEnd].GetData() == SearchedNum )
        {
            iFound = iEnd;
            return true;
        }
        else
            return false;
    }
    
    int mid = 0.5 * ( iStart + iEnd );
    
    if ( datas[mid].GetData() == SearchedNum )
    {
        iFound = mid;
        return true;
    }
    
    if ( datas[mid].GetData() > SearchedNum )
    {
        if ( mid - 1 >= 0)
            return BinarySearchNumberInSortedArray ( datas, iStart, mid - 1, SearchedNum, iFound);
        else
            return false;
    }
    else
    {
        if ( mid + 1 <= iEnd )
            return BinarySearchNumberInSortedArray ( datas, mid + 1, iEnd, SearchedNum, iFound);
        else
            return false;
    }
}
bool GenerateRandomNumber_FindPointerToTheNumber_EnQueue()
{
    std::uniform_int_distribution<int> dis(1, NUM_DATA);
    default_random_engine engine{};
       
    for ( long i = 0; i < NUM_ITEM; i++ )
    {
        int x = dis ( engine );
        
        int iFoundIndex;
        if ( BinarySearchNumberInSortedArray(DataArray, 0, NUM_DATA - 1, x, iFoundIndex ) )
        {
            Data* pData = &DataArray[iFoundIndex];
            LockFreeQueue.enqueue( pData );
            //BoostQueue.push( pData );
        }
    }
}
bool Dequeue()
{
    Data *pData;

    for ( long i = 0; i < NUM_ITEM; i ++)
    {
        while (  LockFreeQueue.dequeue( &pData ) );       
        //while (  BoostQueue.pop( pData ) ) ;      
    }    
}

int main(int argc, char** argv) 
{
    for ( int i = 1; i < NUM_DATA + 1; i++ )
    {
        Data data(i);
        DataArray[i-1] = data;
    }
     
    std::thread PublishThread[NUM_ENQUEUE_THREAD]; 
    std::thread ConsumerThread[NUM_DEQUEUE_THREAD];
    std::chrono::duration<double> elapsed_seconds;
  
    for ( int i = 0; i < NUM_ENQUEUE_THREAD;  i++ )
    {
        PublishThread[i] = std::thread( GenerateRandomNumber_FindPointerToTheNumber_EnQueue ); 
    }
    
    auto start = std::chrono::high_resolution_clock::now();
    for ( int i = 0; i < NUM_DEQUEUE_THREAD; i++ )
    {
        ConsumerThread[i] = std::thread{ Dequeue};
    }
    
    for ( int i = 0; i < NUM_DEQUEUE_THREAD; i++ )
    {
        ConsumerThread[i].join();
    }   

    auto end = std::chrono::high_resolution_clock::now();
    elapsed_seconds = end - start;
    std::cout << "Enqueue and Dequeue 1 million item in:" << elapsed_seconds.count() << std::endl;
   

    for ( int i = 0; i < NUM_ENQUEUE_THREAD; i++ )
    {
        PublishThread[i].join();
    }
             
    return 0;
}

 

分享到:
评论

相关推荐

    一个c++11实现的无锁队列.zip

    在这个“一个c++11实现的无锁队列”中,我们可以探讨以下几个关键知识点: 1. **无锁编程**:无锁编程是一种并发编程技术,它避免了线程间的竞争条件和死锁问题,通过原子操作保证数据的一致性。在无锁队列中,数据...

    lock-free circular array queue.

    2. `array_lock_free_queue.h` 和 `array_lock_free_queue_single_producer.h`:这是无锁队列的接口定义,为用户提供了使用无锁队列的API。 3. `g_blocking_queue.h` 和 `g_blocking_queue_impl.h`:可能是阻塞队列...

    c++11无锁队列的一种简单实现.pptx

    此外,还有一个特殊的`std::atomic_flag`,它用于实现无锁(lock-free)自旋锁,通过`test_and_set()`和`clear()`函数完成。 内存模型在C++11中扮演着重要的角色。现代处理器的乱序执行可能会导致数据可见性问题,...

    atomic_queue:C ++无锁队列

    atomic_queue 基于带有循环缓冲区的C ++ 14多生产者多消费者无锁队列。 这些队列遵循的主要设计原理是极简主义:原子操作的最基本要求,固定大小的缓冲区,值语义。 这些品质也有局限性: 最大队列大小必须在编译...

    线程安全的std :: map和无锁映射的速度

    2. **无锁编程**:无锁编程(Lock-free Programming)是一种优化多线程并发性能的技术,它避免了线程间的互斥锁定,通过原子操作(如std::atomic)来更新数据,从而减少锁竞争,提高系统效率。无锁映射(Lock-free ...

    Lock-Free Data Structures

    ### 锁自由数据结构(Lock-Free Data Structures) 在多线程编程领域,锁自由数据结构是一种重要的技术,它改变了传统上依赖锁来实现线程安全的方式。本文将深入探讨锁自由数据结构的基本概念、原理以及它们如何在...

    c++无锁队列的完整资源

    在C++中,无锁队列(Lock-Free Queue)通常使用原子操作(Atomic Operations)来保证在没有互斥锁的情况下也能正确地进行元素的入队和出队操作。这种技术在高并发场景下能避免线程竞争和死锁问题,从而提高程序的...

    基于cas的无锁队列实现

    无锁队列是一种在多线程环境下实现高效并发操作的数据结构。它利用了硬件原子操作,如Compare and Swap (CAS)指令,来避免在更新数据时出现锁竞争,从而提高系统的并行性能。在这个场景中,我们将深入探讨如何在C++...

    hashtable C++无锁(std_atomic)&U32非0&不可扩大.rar

    无锁哈希表(Lock-free Hash Table)是一种高级并发数据结构,它利用原子操作(std::atomic)来实现线程安全的数据访问,避免了在多线程环境下使用锁导致的性能瓶颈。在这个“hashtable C++无锁(std_atomic)&U32非0&...

    lock-free:无锁数据结构

    总结来说,"lock-free"项目提供了一个用D语言实现的无锁数据结构库,包括SRSW队列和一个尚在完善阶段的双向链表。这些数据结构利用了原子操作来实现线程间的无锁通信,从而在多核环境下实现高效并发。项目的源代码...

    C++经典面试题 C++

    C++是一种强大的、通用的编程语言,被广泛应用于系统软件、应用软件、游戏开发、设备驱动程序等。在面试中,C++相关的知识点通常涵盖了语言基础、标准库、内存管理、模板、异常处理、STL(Standard Template Library...

    安全栈表实现,C++11实现,使用atomic特性

    `LockFreeStack.h`可能包含了一个无锁栈(Lock-Free Stack)的实现,这是一种更高级的并发数据结构,它完全避免了锁的使用,通过原子操作实现更高效率的并发访问。`targetver.h`和`stdafx.h`是Visual Studio项目中的...

    lock-free-wait-free-circularfifo.zip_Free!_circularfifo

    在lock-free-wait-free-circularfifo.zip文件中,包含了实现无锁等待自由循环FIFO队列的相关代码和示例。通过对这些代码的分析和学习,开发者可以深入理解如何在实际项目中应用这些高级并发技术,提高软件的并发性能...

    C++ concurrency cheatsheet

    - std::atomic:提供了对原子类型的操作,用于实现无锁编程和高度优化的并发控制。 5. 线程局部存储 线程局部存储(Thread Local Storage,TLS)是C++提供的一种机制,允许每个线程都有自己的数据变量副本。 - std...

    awesome-lockfree:关于免等待和无锁编程的资源集合

    无锁编程(Lock-Free Programming)和免等待编程(Wait-Free Programming)是计算机科学中的高级并发编程技术,尤其在多核处理器系统中,它们能够提供高效、高性能的解决方案。本资源集合"awesome-lockfree"专注于这...

    single-producer, single-consumer lock-free queue

    无锁队列(lock-free queue)通过原子操作(atomic operations)来实现,避免了锁的使用,提升了系统的并行性。 无锁队列的核心是原子操作,如CAS(Compare-and-Swap)或FAS(Fetch-and-Store),这些操作是硬件...

    readerwriterqueue无锁生产者消费者测试源码

    1. **std::atomic**:C++11引入的`std::atomic`库提供了一组原子类型和操作,用于实现无锁编程。在Reader-Writer Queue中,可能用到`std::atomic&lt;bool&gt;`、`std::atomic&lt;int&gt;`等来实现状态的原子更新。 2. **CAS操作...

Global site tag (gtag.js) - Google Analytics