`
aigo
  • 浏览: 2644655 次
  • 性别: Icon_minigender_1
  • 来自: 宜昌
社区版块
存档分类
最新评论

多个生产者多个消费者:boost::lockfree::queue和std::mutex的性能对比(benchmark)

阅读更多

原文作者:@玄冬Wong

key word:Non-blocking、Blocking、multi-productor、multi-customer、benchmark、performance

 

/************************************************************************/
/* 测试多个生产者多个消费者线程环境下,boost::lockfree::queue和std::mutex的性能  */
/************************************************************************/

#define _ENABLE_ATOMIC_ALIGNMENT_FIX

#include <windows.h>
#include <iostream>
#include <time.h>  
#include <thread>
#include <list> 

#include <boost/lockfree/spsc_queue.hpp>  
#include <boost/lockfree/queue.hpp>  
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/circular_buffer.hpp>

#include <mutex>


#define LOOP_COUNT 5000000
#define QUEUE_CAPACITY 65534
#define THREAD_COUNT 4

std::mutex m;
std::condition_variable cv;

boost::circular_buffer<int> cb(QUEUE_CAPACITY);

boost::lockfree::queue<int, boost::lockfree::capacity<QUEUE_CAPACITY>>* mul_queue;


std::atomic<unsigned int> push_count = 0;
std::atomic<unsigned int> pop_count = 0;

void blocking_productor()
{
	while (1)
	{
		std::unique_lock<std::mutex> lk(m);
		cv.wait(lk, [] {return cb.size() < QUEUE_CAPACITY; });
		cb.push_back(push_count);
		cv.notify_one();

		if (++push_count >= LOOP_COUNT * THREAD_COUNT)
		{
			break;
		}
	}
}

void blocking_customer()
{
	while (1)
	{
		std::unique_lock<std::mutex> lk(m);
		cv.wait(lk, [] {return cb.size() > 0; });

		cb.pop_front();
		cv.notify_one();

		if (++pop_count >= LOOP_COUNT * THREAD_COUNT)
		{
			break;
		}
	}
}

void nonblocking_productor()
{
	while (1)
	{
		if (push_count/*.load(std::memory_order_acquire)*/ >= LOOP_COUNT * THREAD_COUNT)
		{
			break;
		}

		if (mul_queue->push(push_count))
		{
			++push_count;
			//push_count.fetch_add(1, std::memory_order_relaxed);
		}
		else
		{
			Sleep(1);
		}
	}
}

void nonblocking_customer()
{
	int val;
	while (1)
	{
		if (pop_count/*.load(std::memory_order_acquire)*/ >= LOOP_COUNT * THREAD_COUNT)
		{
			break;
		}

		if (mul_queue->pop(val))
		{
			++pop_count;
			//pop_count.fetch_add(1, std::memory_order_relaxed);
		}
		else
		{
			Sleep(1); 
		}
	}
}


void test_blocking()
{
	std::thread** carray = new std::thread*[THREAD_COUNT];
	std::thread** parray = new std::thread*[THREAD_COUNT];

	clock_t start = clock();

	for (int i = 0; i < THREAD_COUNT; i++)
	{
		carray[i] = new std::thread((&blocking_customer));
		parray[i] = new std::thread((&blocking_productor));
	}

	for (int i = 0; i < THREAD_COUNT; i++)
	{
		carray[i]->join();
		parray[i]->join();
	}

	clock_t end = clock();
	printf("[test_blocking]\nTHREAD_COUNT:%d\nQUEUE_CAPACITY:%d\ncost:%dms\n", THREAD_COUNT,  QUEUE_CAPACITY, end - start);
	printf("push_count:%d pop_count:%d\n", push_count, pop_count);

	for (int i = 0; i < THREAD_COUNT; i++)
	{
		delete carray[i];
		delete parray[i];
	}
	delete[] carray;
	delete[] parray;
}

void test_nonblocking()
{
	std::thread** carray = new std::thread*[THREAD_COUNT];
	std::thread** parray = new std::thread*[THREAD_COUNT];

	clock_t start = clock();

	for (int i = 0; i < THREAD_COUNT; i++)
	{
		carray[i] = new std::thread((&nonblocking_customer));
		parray[i] = new std::thread((&nonblocking_productor));
	}

	for (int i = 0; i < THREAD_COUNT; i++)
	{
		carray[i]->join();
		parray[i]->join();
	}

	clock_t end = clock();
	printf("[test_nonblocking]\nTHREAD_COUNT:%d\nQUEUE_CAPACITY:%d\ncost:%dms\n", THREAD_COUNT, QUEUE_CAPACITY, end - start);
	printf("push_count:%d pop_count:%d\n", push_count, pop_count);

	for (int i = 0; i < THREAD_COUNT; i++)
	{
		delete carray[i];
		delete parray[i];
	}
	delete[] carray;
	delete[] parray;
}

int main(char* args, int size)
{
	mul_queue = new boost::lockfree::queue<int, boost::lockfree::capacity<QUEUE_CAPACITY>>;
	std::cout << mul_queue->is_lock_free() << std::endl;

	//为了排除测试程序的无关因素,测试时只开启一个:blocking或者nonblocking。
	//test_blocking();
	test_nonblocking();
}

 

输出结果:

生产者和消费者分别4个线程

[test_blocking]

THREAD_COUNT:4

QUEUE_CAPACITY:65534

cost:3598ms

push_count:20000003 pop_count:20000003

 

[test_nonblocking]

THREAD_COUNT:4

QUEUE_CAPACITY:65534

cost:6350ms

push_count:20000003 pop_count:20000003

 

 

生产者和消费者分别8个线程

[test_blocking]

THREAD_COUNT:8

QUEUE_CAPACITY:65534

cost:3620ms

push_count:20000007 pop_count:20000007

 

[test_nonblocking]

THREAD_COUNT:8

QUEUE_CAPACITY:65534

cost:6466ms

push_count:20000007 pop_count:20000004

 

 

生产者和消费者分别16个线程

[test_blocking]

THREAD_COUNT:16

QUEUE_CAPACITY:65534

cost:3590ms

push_count:20000015 pop_count:20000015

 

[test_nonblocking]

THREAD_COUNT:16

QUEUE_CAPACITY:65534

cost:6136ms

push_count:20000007 pop_count:20000007

 

结论:多读多写的多线程环境下,std::mutex的速度比boost::lockfree::queue的性能高一倍。queue的容量限制不得超过65535,具体原因没仔细看,可能是boost为了实现lock-free而采用了算法只能支持这么大的容量。

 PS:上面的不同线程数量下,累加结果是一样的,是因为每个线程的循环次数做了均分,总循环次数为20000000

 

 测试环境:

windows 10 pro x64

VS2015企业版 update2,release x64

CPU:i7二代移动版

 

分享到:
评论

相关推荐

    C++ 用多方程解决生产者与消费者的问题

    在计算机科学中,生产者-消费者问题是多线程编程中一个经典的同步问题。它涉及到两个主要角色:生产者和消费者。生产者负责生成数据,而消费者则负责消费这些数据。在C++中,我们通常使用互斥量(mutexes)、条件...

    ProducerConsumer(生产者消费者问题的单线程模拟)

    生产者消费者问题是多线程编程中的经典问题,它主要探讨如何在多个线程间共享资源并协调它们的工作流程。在这个问题中,生产者线程负责生成数据,而消费者线程负责消耗这些数据。在单线程模拟中,我们通常会用到线程...

    C++ 多线程通信方式简介并结合生产者-消费者模式代码实现

    生产者-消费者模式是一种典型的多线程同步问题,一个或多个生产者线程生成数据,一个或多个消费者线程消耗这些数据。在此模式中,通常使用队列作为缓冲区,同时利用互斥量和条件变量来同步生产者和消费者的动作。 ...

    线程实现生产者消费者问题

    生产者消费者问题是多线程编程中的一个经典模型,用于演示如何在并发环境中通过共享资源进行协作。在这个模型中,生产者线程负责生成数据,而消费者线程则负责消费这些数据。问题的关键在于如何保证生产者不会在无处...

    生产者消费者同步问题

    生产者消费者问题是多线程编程中的一个经典案例,主要探讨如何在并发环境下高效且安全地共享有限资源。在这个问题中,我们通常有两类线程:生产者线程负责生成数据,而消费者线程则负责消费这些数据。为了解决线程间...

    生产者消费者问题c++实现

    生产者消费者问题是多线程编程中的一个经典案例,它展示了如何通过线程间的协作来解决资源的同步和异步操作。在C++中,我们可以利用标准库中的互斥量(mutex)、条件变量(condition_variable)等工具来实现这个问题...

    生产者和消费者算法 动态库

    生产者和消费者问题是一个经典的多线程同步问题,源自计算机科学中的并发控制理论。这个问题描述的是两个或多个线程(进程)如何有效地共享有限的资源,例如一个缓冲区。在这个场景中,生产者负责生成数据并放入缓冲...

    生产者-消费者多线程处理

    在实际应用中,生产者-消费者模型可以被扩展到多个生产者和多个消费者,且缓冲区大小可以动态调整。此外,为了防止死锁和饥饿现象,还需要考虑适当的同步策略,如公平性和优先级反转的处理。 总之,"生产者-消费者...

    c++11 实现的阻塞队列

    C++11 中的阻塞队列是指在多线程环境下,实现生产者消费者模式的队列。阻塞队列的实现需要解决两个问题:线程安全和阻塞机制。在 C++11 中,我们可以使用 std::mutex、std::condition_variable 和 std::queue 等标准...

    一个生产者与消费者的例子.

    生产者与消费者问题是一个经典的多线程同步问题,在计算机科学和软件工程中有着广泛的应用。这个问题的核心在于如何在多个线程之间有效地共享有限的资源,确保数据的一致性和避免竞争条件。在这个例子中,我们将深入...

    lockfree queue

    无锁队列(Lockfree Queue)是一种在多线程编程中用于实现高效并发的数据结构,它的设计目标是提高系统的并行性能,同时避免了锁的使用,从而减少了因线程竞争导致的性能瓶颈。在标题中提到的"lockfree queue"就是指...

    C++ Queue(带上限的)

    如果对性能有较高要求,特别是在频繁插入和删除操作时,`std::vector`的连续内存存储可能会带来一定的优势。 在编程竞赛或小题中,这样的数据结构经常出现,因为它们能够很好地解决一些限制资源的问题,如缓存管理...

    生产者 消费者 模式 c++

    生产者消费者模式是一种多线程或并发编程中的经典设计模式,它主要用于解决系统资源的高效利用和同步问题。在C++中实现生产者消费者模式,我们可以利用C++11及更高版本提供的线程库()、互斥量(&lt;mutex&gt;)、条件...

    C++实现生产与消费者模型

    在计算机科学中,生产者-消费者模型是一种经典的并发编程问题,用于解决多个线程之间如何高效、安全地共享资源的问题。在这个模型中,"生产者"线程负责生成数据,而"消费者"线程则负责处理这些数据。C++作为一门支持...

    Producer_Consumer_synchronization.rar_synchronization_生产者消费者_生产者

    在操作系统设计中,"生产者-消费者"问题是一个经典的多线程同步问题,它涉及到如何在并发环境中有效地共享资源。该问题由计算机科学家Edsger Dijkstra提出,旨在解决两个或多个线程间的协作,一个线程(生产者)生成...

    c++消息队列queue.rar

    此外,除了`std::queue`外,还有其他实现消息队列的方法,如使用`std::deque`配合自定义的锁机制,或者使用Boost库中的`boost::lockfree::queue`,它提供了无锁的数据结构,适用于高并发场景。 "queue"这个文件名...

    生产者消费者演示程序

    生产者消费者问题是一个经典的多线程同步问题,来源于操作系统理论,用于模拟两个或多个相互依赖的进程或线程之间的协作。在这个场景下,“生产者”是生成数据的实体,而“消费者”则负责处理这些数据。这个问题的...

    C++多线程封装成类使用示例

    std::unique_lock&lt;std::mutex&gt; lock(queue_mutex); condition.wait(lock, [this] { return stop || !tasks.empty(); }); // 等待任务或者线程池停止 if (stop && tasks.empty()) { return; } task = std::move...

    操作系统的生产消费问题

    操作系统中的生产者-消费者问题是多线程编程中的一个经典示例,主要用来演示同步和互斥的概念。在并发环境中,生产者线程负责生成数据,而消费者线程负责处理这些数据。为了保证数据的正确性,我们需要防止生产者过...

    lock-free circular array queue.

    1. `array_lock_free_queue_impl.h` 和 `array_lock_free_queue_single_producer_impl.h`:这是无锁队列的具体实现,分别对应多生产者和单生产者的场景。 2. `array_lock_free_queue.h` 和 `array_lock_free_queue_...

Global site tag (gtag.js) - Google Analytics