`
aigo
  • 浏览: 2568924 次
  • 性别: Icon_minigender_1
  • 来自: 宜昌
社区版块
存档分类
最新评论

使用无锁环形缓冲(Wait-free ring buffer)提升IO效率

阅读更多

相关文章:

无锁队列

http://aigo.iteye.com/blog/2288131

 

摘自:http://www.oschina.net/code/snippet_54334_12505

代码源于 http://www.ibm.com/developerworks/cn/linux/l-cn-lockfree/ 的实现. 
注意: 构造时参数 buf_size 必须是2的N次方.

 

#ifndef min
#define min(a,b) (((a) < (b)) ? (a) : (b))
#endif

#ifndef max
#define max(a,b) (((a) > (b)) ? (a) : (b))
#endif

// 无锁缓冲队列.
class circular_buffer
{
public:
   circular_buffer(int buf_size)
      : m_buffer_size(buf_size)
      , m_circle_buffer(NULL)
      , m_write_p(0)
      , m_read_p(0)
   {
      m_circle_buffer = new char[m_buffer_size];
   }

   ~circular_buffer()
   {
      if (m_circle_buffer)
         delete[] m_circle_buffer;
      m_circle_buffer = NULL;
   }

   void clear()
   {
      m_write_p = 0;
      m_read_p = 0;
   }

   unsigned int available()
   {
      return m_buffer_size - (m_write_p - m_read_p);
   }

   unsigned int used()
   {
      return m_write_p - m_read_p;
   }

   unsigned int put_data(char* buffer, unsigned int len)
   {
      unsigned int l;
      len = _min(len, m_buffer_size - m_write_p + m_read_p);
      /* first put the data starting from fifo->in to buffer end */
      l = _min(len, m_buffer_size - (m_write_p & (m_buffer_size - 1)));
      memcpy(m_circle_buffer + (m_write_p & (m_buffer_size - 1)), buffer, l);
      /* then put the rest (if any) at the beginning of the buffer */
      memcpy(m_circle_buffer, buffer + l, len - l);
      m_write_p += len;
      return len;
   }

   unsigned int get_data(char* buffer, unsigned int len)
   {
      unsigned int l; 
      len = _min(len, m_write_p - m_read_p); 
      /* first get the data from fifo->out until the end of the buffer */ 
      l = _min(len, m_buffer_size - (m_read_p & (m_buffer_size - 1))); 
      memcpy(buffer, m_circle_buffer + (m_read_p & (m_buffer_size - 1)), l); 
      /* then get the rest (if any) from the beginning of the buffer */ 
      memcpy(buffer + l, m_circle_buffer, len - l); 
      m_read_p += len; 
      return len; 
   }

protected:
   inline unsigned int _max(unsigned int a, unsigned int b)
   {
      return max(a, b);
   }

   inline unsigned int _min(unsigned int a, unsigned int b)
   {
      return min(a, b);
   }

private:
   int m_buffer_size;
   char* m_circle_buffer;
   unsigned int m_write_p;
   unsigned int m_read_p;
};

 

 

摘自:http://blog.csdn.net/xocoder/article/details/7880769

 

最近在重构之前写的网络底层时,从各个方面认真考虑了每一个细节实现。其中,在提交I/O(WSASend/WSARecv)和I/O完成(GetQueuedCompletionStatus)时,难免出现一个缓冲区需要两个线程公用的问题。

 

假设主线程不断发送该消息,这些消息被堆叠在一个缓冲区里,定时使用WSASend提交发送I/O请求,在GetQueuedCompletionStatus返回后,才能按照已发送的字节数去删掉该缓冲区里相应字节数的数据。不明白?好吧我说的简单一些。

WSASend调用后,你传递的参数只是说明:我希望发送这么多数据。但请求提交后,你的要求未必能够被全部满足,也就是说也许你想发送1024字节的东西,但也许GetQueuedCompletionStatus返回,操作完成后,本次只成功发送了1000个字节,也就是说剩余的24字节的数据,你还需要再调用WSASend,直到都发送成功为止。所以在这种情况下,一定要等待GetQueuedCompletionStatus返回,才知道究竟发送成功了多少,也才能从之前的发送缓冲里删掉数据。否则,如果你在提交WSASend时就把数据删掉了,而GetQueuedCompletionStatus返回后却告诉你只发了1000字节,那就杯具了-----那24字节的数据永远地离开了我们。

 

而在这种情况下,我们发送消息,也就是向这个缓冲区后面堆放要发送的数据,是主线程中执行的,而GetQueuedCompletionStatus完成后,从缓冲区内弹出数据,确是IOCP的工作线程做的。当然,最简单的办法就是------加个锁呗。但是,在I/O频繁的情况下,可以想象会出现多少线程争用的情况。于是,就有了本文要说的东西:环形缓冲。

 

环形缓冲的原理并不难理解,只适用于一个线程写,一个线程读的情况。环形缓冲的原理我就不再赘述,可以自行搜索。

 

废话不多说。下面给出我在这次优化中写的一个环形缓冲类,该环形缓冲完美地在IOCP中工作了起来,实实在在地解决了线程争用引发地效率低下。

2012.9.1 0:57 重贴代码
修改了一个可能出现的误置Full标志的BUG,之前的代码中,是先增加写指针,再判断是否等于读指针,等于则置Full标志,但若在该判断之前,读线程将数据读空,此时写线程继续工作,进行该判断时,就会发现写指针 = 读指针(但是是由于读空造成的),于是错误地将状态置为Full。

 

2014.8.4 2:08 重贴代码

重写代码,解决掉xiaolizi提出的可能会引发数据被覆盖的BUG,详细请见回复:(十分感谢您提出这个问题)
XRingBuffer.h 

#pragma once

#include "XBaseDefine.h"

const BYTE XRING_BUFFER_READ_POS_AND_WRITE_POS_SIZE = 2;

class XRingBuffer
{
public:
	XRingBuffer(const DWORD size);
	~XRingBuffer();

	bool pushData(const void* data, const DWORD size);

	bool copyData(void* dest, const DWORD destSize, const DWORD copySize);
	bool popData(void* dest, const DWORD destSize, const DWORD popSize);
	bool popData(const DWORD popSize);
	const DWORD getUsedSize() const;
	const DWORD getFreeSize() const;
private:
	bool copyDataWithAddReadPosOption(void* dest, const DWORD destSize, const DWORD popSize, bool addReadPos);
private:
	char* _buffer;
	const DWORD _size;
	volatile DWORD _write_pos;
	volatile DWORD _read_pos;
};

 XRingBuffer.cpp

#include "XRingBuffer.h"
#include "XDebug.h"

XRingBuffer::XRingBuffer(const DWORD size) : 
_size(size + XRING_BUFFER_READ_POS_AND_WRITE_POS_SIZE),
_write_pos(1),
_read_pos(0),
_buffer(NULL)
{
	_buffer = new char[_size];
}

XRingBuffer::~XRingBuffer()
{
	delete [] _buffer;
}

bool XRingBuffer::pushData(const void* data, const DWORD size)
{
	const DWORD freeSize = getFreeSize();
	if(freeSize < size)
	{
		return false;
	}
	const DWORD readPos =_read_pos;
	const DWORD writePos = _write_pos;
	if(writePos > readPos)
	{
		const DWORD lenFromWritePosToBufferEnd = _size - writePos;
		if(size <= lenFromWritePosToBufferEnd)
		{
			memcpy(_buffer + _write_pos, data, size);
			_write_pos += size;
			if(_write_pos == _size)
			{
				_write_pos = 0;
			}
			else if(_write_pos > _size)
			{
				assert_fail("wirtepos cannot bigger than size");
				return false;
			}
			return true;
		}
		else
		{	// 先拷贝前一部分到缓冲区尾部
			memcpy(_buffer + _write_pos, data, lenFromWritePosToBufferEnd);
			const DWORD secondPartLen = size - lenFromWritePosToBufferEnd;
			// 拷贝后一部分到缓冲区前部
			memcpy(_buffer, ((char*)data) + lenFromWritePosToBufferEnd, secondPartLen);
			_write_pos = secondPartLen;
			return true;
		}
	}
	else if(writePos < readPos)
	{
		memcpy(_buffer + writePos, data, size);
		_write_pos += size;
		return true;
	}
	else
	{
		assert_fail("write pos equal read pos, it's an error");
		return false;
	}
}


bool XRingBuffer::copyData(void* dest, const DWORD destSize, const DWORD copySize)
{
	return copyDataWithAddReadPosOption(dest, destSize, copySize, false);
}

bool XRingBuffer::popData(void* dest, const DWORD destSize, const DWORD popSize)
{
	return copyDataWithAddReadPosOption(dest, destSize, popSize, true);
}

bool XRingBuffer::popData(const DWORD popSize)
{
	return copyDataWithAddReadPosOption(NULL, 0, popSize, true);
}

const DWORD XRingBuffer::getUsedSize() const
{
	const DWORD writePos = _write_pos;
	const DWORD readPos = _read_pos;
	if(writePos > readPos)
	{
		return writePos - readPos - 1;
	}
	else if(writePos < readPos)
	{
		return (_size - readPos - 1) + _write_pos;
	}
	else
	{
		assert_fail("write pos equal read pos, it's an error");
		return 0;
	}
}

const DWORD XRingBuffer::getFreeSize() const
{
	const DWORD usedSize = getUsedSize();
	return _size - (usedSize + XRING_BUFFER_READ_POS_AND_WRITE_POS_SIZE);
}

bool XRingBuffer::copyDataWithAddReadPosOption(void* dest, const DWORD destSize, const DWORD copySize, bool addReadPos)
{
	const DWORD usedSize = getUsedSize();
	if(usedSize < copySize)
	{
		assert_fail("data is not enought to copy");
		return false;
	}
	if(dest != NULL)
	{
		if(destSize < copySize)
		{
			assert_fail("dest buffer size is smaller than copy size");
			return false;
		}	
	}
	const DWORD writePos = _write_pos;
	const DWORD readPos = _read_pos;
	if(writePos > readPos)
	{
		if(dest != NULL)
		{
			memcpy(dest, _buffer + readPos + 1, copySize);
		}
		if(addReadPos)
		{
			_read_pos += copySize;
		}
		return true;
	}
	else if(writePos < readPos)
	{
		const DWORD lenFromReadPosToBufferEnd = _size - readPos - 1;
		if(copySize <= lenFromReadPosToBufferEnd)
		{
			if(dest != NULL)
			{
				memcpy(dest, _buffer + readPos + 1, copySize);
			}
			if(addReadPos)
			{
				_read_pos += copySize;
				assert(_read_pos < _size);
			}
			return true;
		}
		else
		{
			const DWORD secondPartLen = copySize - lenFromReadPosToBufferEnd;
			if(dest != NULL)
			{
				memcpy(dest, _buffer + readPos + 1, lenFromReadPosToBufferEnd);
				memcpy(((char*)dest) + lenFromReadPosToBufferEnd, _buffer, secondPartLen);
			}
			if(addReadPos)
			{
				_read_pos = secondPartLen - 1;
			}
			return true;
		}
	}
	else
	{
		assert_fail("write pos equal read pos, it's an error");
		return false;
	}
}

 

 

 

分享到:
评论

相关推荐

    无锁 环形缓冲区

    `RingBuffer.cpp`和`RingBuffer.h`文件很可能是该无锁环形缓冲区实现的源代码。`RingBuffer.h`可能包含了类定义,定义了环形缓冲区的结构、成员变量以及相关的操作接口,如`push()`(写入)、`pop()`(读取)、`is_...

    Lock-free Queue and Ring Buffer

    无锁队列与环形缓冲区(Lock-free Queue and Ring Buffer)是计算机科学中的关键概念,尤其是在并发编程和多线程环境下。它们被设计用于在高并发场景下提高数据结构的性能,避免了传统锁机制所带来的性能瓶颈。下面...

    xm-ring-buffer.tar.gz_RINGBUFFER_XMRING_buffer_circlebuffer_wayr

    环形缓冲区(Ring Buffer),又...通过研究和使用`xm-ring-buffer`,开发者能够轻松地在自己的项目中集成高效的环形缓冲区,从而优化数据传输和处理的效率。这在实时系统、网络通信、音频视频流处理等领域尤为有用。

    lock-free-wait-free-circularfifo.zip_Free!_可等待fifo_环形FIFO

    - Lock-free和wait-free环形FIFO广泛应用于网络协议栈、多线程计算、实时系统等领域。然而,实现这样的数据结构充满挑战,需要对并发编程有深入理解,避免无谓的复杂性和潜在的死锁风险。 7. **总结**: "lock-...

    c++实现的无锁环形队列

    1. c++实现的无锁环形队列,注释详细,讲解了环形队列的实现原理和操作技巧 2. 在linux服务器下,可以自己编译,运行,也可以修改参数后做测试 3. 编译的命令如下:g++ -std=c++11 -o test main.cpp ring_buffer.cpp...

    ringbuf-可直接访问内部数据的无锁SPSC FIFO环形缓冲区-Rust开发

    概述RingBuffer是最初的结构ringbuf无锁单生产者单消费者(SPSC)FIFO环形缓冲区,可直接访问内部数据。 概述RingBuffer是代表环形缓冲区本身的初始结构。 环形缓冲区可以分为生产者和消费者两对。 生产者和消费者...

    Rt-thead studio软件下使用ringbuffer

    - RingBuffer,也称为环形缓冲区,是一种特殊的缓冲区设计,其特点是当缓冲区满时,新的数据会覆盖旧的数据,而不是导致溢出。这种设计在内存资源有限的嵌入式系统中尤为常见。 - 在RTOS中,RingBuffer常用于任务...

    ringbuffer.zip

    环形缓冲区(Ring Buffer)是一种在嵌入式系统、通信和数据处理等领域常见的数据结构,主要用于存储和传输数据。它的设计灵感来源于计算机内存管理中的页表,通过循环使用一段固定大小的内存来实现高效的数据存取。...

    ringbuffer - 原创-高效率管理

    RingBuffer,也称为循环缓冲区或环形缓冲区,是一种高效的数据结构,广泛应用于各种实时系统和通信协议中。在IT行业中,理解并熟练运用RingBuffer对于开发涉及数据流处理和内存管理的程序至关重要。 RingBuffer的...

    LwRB - Lightweight ring buffer

    轻量级环形缓冲器(Lightweight Ring Buffer,简称 LwRB)是解决这一问题的一种有效工具。由Tilen MAJERLE开发并维护,其版本号为v3.1.0,LwRB旨在提供一种轻量、高效且线程安全的方式来存储和读取数据。 环形缓冲...

    环形缓冲区实现类(RingBuffer)

    环形缓冲区实现类(RingBuffer)

    ring buffer实现原理

    在通信程序中,经常使用环形缓冲区作为数据结构来存放通信中发送和接收的数据。环形缓冲区是一个先进先出的循环缓冲区,可以向通信程序提供对缓冲区的互斥访问。 1、环形缓冲区的实现原理 环形缓冲区通常有一个读...

    ringbuf:无锁环形缓冲区(MPSC)

    无锁环形缓冲区(Lock-Free Ring Buffer)更进一步,通过避免锁的使用来提高并发性能,特别是对于多生产者单消费者(Multiple Producer Single Consumer, MPSC)场景。在C语言环境中,实现这样的数据结构需要对内存...

    环形缓冲区(RingBuffer)源码

    环形缓冲区(RingBuffer),又称为循环缓冲区,是一种高效的数据处理机制,常用于多线程通信、数据采集系统以及内存管理等多个领域。它的设计理念是利用固定大小的内存空间,通过指针的循环移动来实现数据的存取,...

    ring-buffer-design.rar_V2 _buffer_ring_buffer

    `ring-buffer-design.txt`可能是一个文档,详细解释了环形缓冲区的设计原理、实现细节以及在Linux v2.13.6中的具体用法。通过分析这份文档,我们可以更深入地理解无锁环形缓冲区在实际系统中的应用和优势。 总结...

    RingBuffer_环形缓冲区_

    例如,Java中的`java.util.concurrent.LinkedBlockingDeque`可以看作是一个环形缓冲区的实现,C++的Boost库提供了`boost::lockfree::ring_buffer`。在实际项目中,根据需求选择合适的库或自定义实现,以优化数据处理...

    一个封装好的C++环形缓冲区

    环形缓冲区(Circular Buffer),又称循环队列,是一种数据结构,通常用于实现高效的缓存机制。它在一个固定大小的数组中进行操作,当缓冲区满时,新的元素会覆盖最旧的元素,而不是像传统缓冲区那样等待空间释放。...

    语音识别动态分配内存空间--环形buffer

    环形buffer通过其设计巧妙地解决了这个问题:由于写入和读取的位置可以独立计算,且始终指向buffer的两端,因此可以避免同步冲突,实现无锁操作,提高系统性能。 在具体实现中,环形buffer通常包括以下关键组件: 1...

    这是 FPGA 中使用的循环缓冲控制器 Verilog Hardware Circular Buffer Controller

    根据wiki:循环缓冲区、循环队列、循环缓冲区或环形缓冲区是一种使用单个固定大小缓冲区的数据结构,就好像它是端到端连接的一样。这种结构很容易缓冲数据流。 68747470733a2f2f75706c6f61642e77696b696d656469612...

    RingBuffer.rar_RINGBUFFER_环形缓冲_环形缓冲区

    环形缓冲区,环形缓冲区在数据采集等实时系统中广泛使用。

Global site tag (gtag.js) - Google Analytics