`
aigo
  • 浏览: 2687705 次
  • 性别: Icon_minigender_1
  • 来自: 宜昌
社区版块
存档分类
最新评论

单个生产者单个消费者无锁队列c++实现

阅读更多

keyword: lockfree、lock-free、queue、buffer、无锁

相关文章:

无锁环形缓冲(Wait-free ring buffer) 

http://aigo.iteye.com/blog/1913518

 

方案:用atomic实现

boost提供了一种标准的所谓Wait-free ring buffer,官方文档上叫buffer,但是其实现其实还是queue,因为push和pop的时候没有size参数,就是说其内部是具体的元素对象,不是类似字节流那种闭环的buffer。

另外,他其实还是用到了轻量级的线程同步atomic,但比lock的开销少至少一个数量级。
下面例子是boost官方文档上的,这里只是提供了思路,没有boost也行,std::atomic也能实现

Wait-free ring buffer

http://www.boost.org/doc/libs/1_60_0/doc/html/atomic/usage_examples.html#boost_atomic.usage_examples.example_ringbuffer.implementation

实现:

 

#include <boost/atomic.hpp>

template<typename T, size_t Size>
class ringbuffer {
public:
  ringbuffer() : head_(0), tail_(0) {}

  bool push(const T & value)
  {
    size_t head = head_.load(boost::memory_order_relaxed);
    size_t next_head = next(head);
    if (next_head == tail_.load(boost::memory_order_acquire))
      return false;
    ring_[head] = value;
    head_.store(next_head, boost::memory_order_release);
    return true;
  }
  bool pop(T & value)
  {
    size_t tail = tail_.load(boost::memory_order_relaxed);
    if (tail == head_.load(boost::memory_order_acquire))
      return false;
    value = ring_[tail];
    tail_.store(next(tail), boost::memory_order_release);
    return true;
  }
private:
  size_t next(size_t current)
  {
    return (current + 1) % Size;
  }
  T ring_[Size];
  boost::atomic<size_t> head_, tail_;
};

  调用:

 

ringbuffer<int, 32> r;

// try to insert an element
if (r.push(42)) { /* succeeded */ }
else { /* buffer full */ }

// try to retrieve an element
int value;
if (r.pop(value)) { /* succeeded */ }
else { /* buffer empty */ }

 

如果不需要无锁,只需要一个队列,还可以使用boost线程的API:circular_buffer

http://www.boost.org/doc/libs/1_60_0/doc/html/circular_buffer.html

 

 

 

误解:用volatile实现lock-free

之前网上不少关于用volatile来实现无锁的例子,实际上是错误的
volatile无法实现线程安全,即使是一读一写。volatile设计的目的就不是为线程安全考虑的,在特殊的硬件和编译器上volatile才有数据同步的作用

著名C++专家Herb Sutters对无锁问题的相关论述:

原文:http://www.drdobbs.com/parallel/volatile-vs-volatile/212701484

Volatile is orthogonal to what you use to implement atomics. In C++ it tells the compiler that certain it is not safe to perform optimizations with that variable. Herb Sutters lays it out: 

To safely write lock-free code that communicates between threads without using locks, prefer to use ordered atomic variables: Java/.NET volatile, C++0x atomic, and C-compatible atomic_T.

 

To safely communicate with special hardware or other memory that has unusual semantics, use unoptimizable variables: ISO C/C++ volatile. Remember that reads and writes of these variables are not necessarily atomic, however.

 

 Finally, to express a variable that both has unusual semantics and has any or all of the atomicity and/or ordering guarantees needed for lock-free coding, only the ISO C++0x draft Standard provides a direct way to spell it: volatile atomic.

 

 

下面这个例子是用volatile实现无锁的例子,这个仅限于特定硬件!如果要实现无锁,还不如参考linux kernel中的fifo,它连volatile都没用。

CSDN上某博主的文章:http://blog.csdn.net/chenjiayi_yun/article/details/8945841 

限制一个线程读,一个线程写,不加锁的队列,使用单链表实现,测试环境:centos 5.9 

#include <iostream>
#include <pthread.h>


template<class QElmType>
struct qnode
{
    struct qnode *next;
    QElmType data;
};
template<class QElmType>
class queue
{
public:
        queue() {init();}
        ~queue() {destroy();}

        bool init()
        {
                m_front = m_rear = new qnode<QElmType>;
                if (!m_front)
                        return false;
                m_front->next = 0;
                return true;
        }
        void destroy()
        {
                while (m_front)
                {
                        m_rear = m_front->next;
                        delete m_front;
                        m_front = m_rear;
                }
        }
        bool push(QElmType e)
        {
                struct qnode<QElmType> *p = new qnode<QElmType>;
                if (!p)
                        return false;
                p->next = 0;
                m_rear->next = p;
                m_rear->data = e;
                m_rear = p;
                return true;
        }
        bool pop(QElmType *e)
        {
                if (m_front == m_rear)
                        return false;


                struct qnode<QElmType> *p = m_front;
                *e = p->data;
                m_front = p->next;
                delete p;
                return true;
        }
private:
  struct qnode<QElmType> * volatile m_front, * volatile m_rear;
};


queue<int> g_q;


void *thread1(void * l)
{
        int i = 0;
        while (1)
        {
                g_q.push(i);
                i++;
                usleep(::rand()%1000);
        }
        return 0;
}
void *thread2(void * l)
{
        int i;
        while (1)
        {
                if (g_q.pop(&i))
                        std::cout << i << std::endl;
                //else
                        //std::cout << "can not pop" << std::endl;
                usleep(::rand()%1000);
        }
        return 0;
}


int main(int argc, char* argv[])
{
        pthread_t t1,t2;
        pthread_create(&t1, 0, thread1, 0);
        pthread_create(&t2, 0, thread2, 0);
        char ch;
        while (1)
        {
                std::cin >> ch;
                if (ch == 'q')
                        break;
        }
       return 0;
}

 

代码比较简单,只实现2个线程间的无锁。

这个无锁队列主要是使用两个volatile 的指针来判断是否还有任务( volatile m_front,  volatile m_rear)。

只能实现两个线程间的无锁队列,一个是工作者线程一个是提供任务线程,因为不能让两个或以上的线程来修改指针m_front 和指针m_rear,否者会出现问题。

这个无锁队列的实现是基于m_front指针的修改是由一个线程来完成的,m_rear的修改是由另一个线程来完成的,不会出现同时两个线程修改同一个指针

 

 

分享到:
评论

相关推荐

    Linux内核中的无锁队列 - kfifo

    1. **只支持一个读者和一个写者并发操作**:这意味着在多线程环境下,`kfifo`只能被一个生产者线程和一个消费者线程使用,超过这个限制则需要引入额外的同步机制。 2. **无阻塞的读写操作**:当队列满时,写操作会...

    readerwriterqueue:用于C ++的快速单生产者,单消费者无锁队列

    适用于C ++的单一生产者,单一消费者的无锁队列这个小型存储库具有我自己的C ++无锁队列(我从头开始设计)的实现。 它仅支持两线程用例(一个消费,一个生产)。 线程无法切换角色,尽管您可以根据需要在单个线程中...

    无锁化编程

    - **实例**:2011年,Kogan和Petrank在以色列理工大学提出了一个多生产者-多消费者的并发访问wait-free队列,虽然性能不如lock-free队列,但是能够控制每个操作的最坏响应时间。 #### Lock-free (非锁定) - **定义...

    C++消息队列

    这个测试程序可能包括生产者线程向队列中添加消息,消费者线程从队列中取出并处理消息的场景,以及各种边界条件和错误处理的测试用例。 在实际项目中,C++消息队列的用途广泛,例如,它可以用于进程间的通信,使...

    生产者消费者_操作系统课程设计

    7. **课程设计实践**:对于操作系统课程设计,学生通常会用编程语言如C++或Java实现生产者消费者模型,通过模拟生产过程和消费过程,演示如何使用上述同步机制。设计中可能会包括错误处理、超时机制、以及性能分析等...

    bcb 多线程示例 MutilThread(生产者与消费者)

    在本示例"bcb 多线程示例 MutilThread(生产者与消费者)"中,我们将探讨 Borland C++ Builder (bcb) 平台上的线程应用,特别是生产者-消费者问题的解决方案。 生产者-消费者问题是多线程编程中的一个经典案例,它...

    消息队列,消息队列的使用场景,C,C++源码.zip

    5. **C/C++实现消息队列** - 在C和C++中,可以使用标准库如POSIX Message Queues (mq_overview(7)) 或自定义实现来创建消息队列。POSIX消息队列提供了跨进程通信的能力,支持消息的有序发送和接收。 - 对于分布式...

    consume_and_productor.rar_consume_生产者 消费者

    - **多级缓冲**:当单个缓冲区无法满足需求时,可以使用多个缓冲区,形成更复杂的生产者-消费者模型,提高系统效率。 - **优先级队列**:生产者可以将不同优先级的数据放入不同的队列,消费者根据优先级顺序消费。...

    queue_atomic:使用C ++ 11原子的多生产者多消费者队列模板

    queue_atomic 使用C ++ 11原子的多生产者多消费者队列模板。... 在单个生产者单个消费者的情况下,queue_atomic是完全无锁的queue_atomic可以在多生产者多消费者模式下使用,但是当有竞争时它将旋转调用s

    kafka demo ,两种线程消费方式

    Kafka是一个发布/订阅模型的消息队列,它包含生产者(Producer)、消费者(Consumer)和主题(Topic)。生产者负责发布消息到主题,而消费者则订阅这些主题并消费消息。消费者通过消费者组(Consumer Group)进行...

    开源消息队列lockedqueue.h、msgqueue、rte-ring

    它采用环形缓冲区的设计,允许生产者和消费者同时操作而无需同步,提高了性能。在高并发场景下,rte_ring的表现优于传统的消息队列,因为它避免了锁竞争带来的开销。 在实时互动软件/插件中,这三种消息队列可能...

    课程“ Linux Kernel Internals”的互补并发程序-C/C++开发

    课程“ Linux Kernel Internals”项目的补充程序项目清单tpool:一个轻量级的线程池。 tinync:使用co互补程序...spmc:并发的单个生产者/多个消费者队列。 map-reduce:使用MapReduce进行单词计数。 许可证以上项目是

    课堂笔记_c++应用1

    这是一个多线程同步的经典问题,涉及两个线程——生产者和消费者,它们共享一个固定大小的缓冲区。生产者生成数据并放入缓冲区,而消费者则从缓冲区取出数据消费。为保证正确性,需要使用同步机制,如互斥锁和条件...

    安全使用队列

    在多线程环境中,队列可以作为线程间的通信机制,例如生产者-消费者模型,其中一个线程添加元素到队列(生产),而另一个线程从队列中取出元素(消费)。 "Queue.cpp"很可能是实现线程安全队列的源代码文件,它可能...

    18种设计模式c++版

    设计模式是软件工程中经过长期实践总结出的通用解决方案,它们是解决特定问题、实现特定目标的模板。在C++编程中,设计模式的应用能够提高代码的...结合C++代码和UML图,学习者可以更直观地掌握每种模式的实现方式。

    面试题--北京亚信C++.doc

    - **循环生产/消费**:生产者不断生产数据放入队列,消费者不断从队列中取出数据进行处理。 #### 6. 多线程中最注意什么 - **解析**:多线程编程需要注意的关键点包括但不限于: - **线程安全**:确保共享资源的...

    Message Queue

    在“MessageQueue_Five_version”这个项目中,我们可以推测作者可能使用了其中的一种或多种实现,通过编程接口来创建消息队列,然后在生产者和消费者之间进行消息传递。 接下来,我们讨论Google的语音识别API。...

    简介kafka简介

    5. **消费者组(Consumer Groups)**:消费者组是消费者实例的集合,用于实现负载均衡和容错。每个分区只能被一个消费者组内的一个消费者消费,这样可以保证消息的唯一消费。 6. ** broker**:Kafka集群由多个...

    pstat:pstat-并行统计工具

    pstat利用生产者-消费者并发模式。 如果安装了英特尔库,则pstat可以利用其无锁数据结构。 否则,它将使用更简单的可锁定标准容器,但会降低性能。 特征 快速燃烧 只能使用标准C ++库进行构建-无需外部依赖项 充分...

    apache-activemq-5.15.3-bin.zip

    消息中间件通常使用消息队列模型,其中生产者发送消息到队列,消费者从队列中接收消息,两者并不直接交互。 **2. Apache ActiveMQ特性** - **高可用性**:ActiveMQ支持多种集群配置,如网络节点间的镜像,确保即使...

Global site tag (gtag.js) - Google Analytics