原文作者:@玄冬Wong
key word:Non-blocking、Blocking、multi-productor、multi-customer、benchmark、performance
/************************************************************************/ /* 测试多个生产者多个消费者线程环境下,boost::lockfree::queue和std::mutex的性能 */ /************************************************************************/ #define _ENABLE_ATOMIC_ALIGNMENT_FIX #include <windows.h> #include <iostream> #include <time.h> #include <thread> #include <list> #include <boost/lockfree/spsc_queue.hpp> #include <boost/lockfree/queue.hpp> #include <boost/thread/mutex.hpp> #include <boost/thread/condition_variable.hpp> #include <boost/circular_buffer.hpp> #include <mutex> #define LOOP_COUNT 5000000 #define QUEUE_CAPACITY 65534 #define THREAD_COUNT 4 std::mutex m; std::condition_variable cv; boost::circular_buffer<int> cb(QUEUE_CAPACITY); boost::lockfree::queue<int, boost::lockfree::capacity<QUEUE_CAPACITY>>* mul_queue; std::atomic<unsigned int> push_count = 0; std::atomic<unsigned int> pop_count = 0; void blocking_productor() { while (1) { std::unique_lock<std::mutex> lk(m); cv.wait(lk, [] {return cb.size() < QUEUE_CAPACITY; }); cb.push_back(push_count); cv.notify_one(); if (++push_count >= LOOP_COUNT * THREAD_COUNT) { break; } } } void blocking_customer() { while (1) { std::unique_lock<std::mutex> lk(m); cv.wait(lk, [] {return cb.size() > 0; }); cb.pop_front(); cv.notify_one(); if (++pop_count >= LOOP_COUNT * THREAD_COUNT) { break; } } } void nonblocking_productor() { while (1) { if (push_count/*.load(std::memory_order_acquire)*/ >= LOOP_COUNT * THREAD_COUNT) { break; } if (mul_queue->push(push_count)) { ++push_count; //push_count.fetch_add(1, std::memory_order_relaxed); } else { Sleep(1); } } } void nonblocking_customer() { int val; while (1) { if (pop_count/*.load(std::memory_order_acquire)*/ >= LOOP_COUNT * THREAD_COUNT) { break; } if (mul_queue->pop(val)) { ++pop_count; //pop_count.fetch_add(1, std::memory_order_relaxed); } else { Sleep(1); } } } void test_blocking() { std::thread** carray = new std::thread*[THREAD_COUNT]; std::thread** parray = new std::thread*[THREAD_COUNT]; clock_t start = clock(); for (int i = 0; i < THREAD_COUNT; i++) { carray[i] = new std::thread((&blocking_customer)); parray[i] = new std::thread((&blocking_productor)); } for (int i = 0; i < THREAD_COUNT; i++) { carray[i]->join(); parray[i]->join(); } clock_t end = clock(); printf("[test_blocking]\nTHREAD_COUNT:%d\nQUEUE_CAPACITY:%d\ncost:%dms\n", THREAD_COUNT, QUEUE_CAPACITY, end - start); printf("push_count:%d pop_count:%d\n", push_count, pop_count); for (int i = 0; i < THREAD_COUNT; i++) { delete carray[i]; delete parray[i]; } delete[] carray; delete[] parray; } void test_nonblocking() { std::thread** carray = new std::thread*[THREAD_COUNT]; std::thread** parray = new std::thread*[THREAD_COUNT]; clock_t start = clock(); for (int i = 0; i < THREAD_COUNT; i++) { carray[i] = new std::thread((&nonblocking_customer)); parray[i] = new std::thread((&nonblocking_productor)); } for (int i = 0; i < THREAD_COUNT; i++) { carray[i]->join(); parray[i]->join(); } clock_t end = clock(); printf("[test_nonblocking]\nTHREAD_COUNT:%d\nQUEUE_CAPACITY:%d\ncost:%dms\n", THREAD_COUNT, QUEUE_CAPACITY, end - start); printf("push_count:%d pop_count:%d\n", push_count, pop_count); for (int i = 0; i < THREAD_COUNT; i++) { delete carray[i]; delete parray[i]; } delete[] carray; delete[] parray; } int main(char* args, int size) { mul_queue = new boost::lockfree::queue<int, boost::lockfree::capacity<QUEUE_CAPACITY>>; std::cout << mul_queue->is_lock_free() << std::endl; //为了排除测试程序的无关因素,测试时只开启一个:blocking或者nonblocking。 //test_blocking(); test_nonblocking(); }
输出结果:
生产者和消费者分别4个线程
[test_blocking]
THREAD_COUNT:4
QUEUE_CAPACITY:65534
cost:3598ms
push_count:20000003 pop_count:20000003
[test_nonblocking]
THREAD_COUNT:4
QUEUE_CAPACITY:65534
cost:6350ms
push_count:20000003 pop_count:20000003
生产者和消费者分别8个线程
[test_blocking]
THREAD_COUNT:8
QUEUE_CAPACITY:65534
cost:3620ms
push_count:20000007 pop_count:20000007
[test_nonblocking]
THREAD_COUNT:8
QUEUE_CAPACITY:65534
cost:6466ms
push_count:20000007 pop_count:20000004
生产者和消费者分别16个线程
[test_blocking]
THREAD_COUNT:16
QUEUE_CAPACITY:65534
cost:3590ms
push_count:20000015 pop_count:20000015
[test_nonblocking]
THREAD_COUNT:16
QUEUE_CAPACITY:65534
cost:6136ms
push_count:20000007 pop_count:20000007
结论:多读多写的多线程环境下,std::mutex的速度比boost::lockfree::queue的性能高一倍。queue的容量限制不得超过65535,具体原因没仔细看,可能是boost为了实现lock-free而采用了算法只能支持这么大的容量。
PS:上面的不同线程数量下,累加结果是一样的,是因为每个线程的循环次数做了均分,总循环次数为20000000
测试环境:
windows 10 pro x64
VS2015企业版 update2,release x64
CPU:i7二代移动版
相关推荐
在计算机科学中,生产者-消费者问题是多线程编程中一个经典的同步问题。它涉及到两个主要角色:生产者和消费者。生产者负责生成数据,而消费者则负责消费这些数据。在C++中,我们通常使用互斥量(mutexes)、条件...
生产者消费者问题是多线程编程中的经典问题,它主要探讨如何在多个线程间共享资源并协调它们的工作流程。在这个问题中,生产者线程负责生成数据,而消费者线程负责消耗这些数据。在单线程模拟中,我们通常会用到线程...
生产者-消费者模式是一种典型的多线程同步问题,一个或多个生产者线程生成数据,一个或多个消费者线程消耗这些数据。在此模式中,通常使用队列作为缓冲区,同时利用互斥量和条件变量来同步生产者和消费者的动作。 ...
生产者消费者问题是多线程编程中的一个经典模型,用于演示如何在并发环境中通过共享资源进行协作。在这个模型中,生产者线程负责生成数据,而消费者线程则负责消费这些数据。问题的关键在于如何保证生产者不会在无处...
生产者消费者问题是多线程编程中的一个经典案例,主要探讨如何在并发环境下高效且安全地共享有限资源。在这个问题中,我们通常有两类线程:生产者线程负责生成数据,而消费者线程则负责消费这些数据。为了解决线程间...
生产者消费者问题是多线程编程中的一个经典案例,它展示了如何通过线程间的协作来解决资源的同步和异步操作。在C++中,我们可以利用标准库中的互斥量(mutex)、条件变量(condition_variable)等工具来实现这个问题...
生产者和消费者问题是一个经典的多线程同步问题,源自计算机科学中的并发控制理论。这个问题描述的是两个或多个线程(进程)如何有效地共享有限的资源,例如一个缓冲区。在这个场景中,生产者负责生成数据并放入缓冲...
在实际应用中,生产者-消费者模型可以被扩展到多个生产者和多个消费者,且缓冲区大小可以动态调整。此外,为了防止死锁和饥饿现象,还需要考虑适当的同步策略,如公平性和优先级反转的处理。 总之,"生产者-消费者...
C++11 中的阻塞队列是指在多线程环境下,实现生产者消费者模式的队列。阻塞队列的实现需要解决两个问题:线程安全和阻塞机制。在 C++11 中,我们可以使用 std::mutex、std::condition_variable 和 std::queue 等标准...
生产者与消费者问题是一个经典的多线程同步问题,在计算机科学和软件工程中有着广泛的应用。这个问题的核心在于如何在多个线程之间有效地共享有限的资源,确保数据的一致性和避免竞争条件。在这个例子中,我们将深入...
无锁队列(Lockfree Queue)是一种在多线程编程中用于实现高效并发的数据结构,它的设计目标是提高系统的并行性能,同时避免了锁的使用,从而减少了因线程竞争导致的性能瓶颈。在标题中提到的"lockfree queue"就是指...
如果对性能有较高要求,特别是在频繁插入和删除操作时,`std::vector`的连续内存存储可能会带来一定的优势。 在编程竞赛或小题中,这样的数据结构经常出现,因为它们能够很好地解决一些限制资源的问题,如缓存管理...
生产者消费者模式是一种多线程或并发编程中的经典设计模式,它主要用于解决系统资源的高效利用和同步问题。在C++中实现生产者消费者模式,我们可以利用C++11及更高版本提供的线程库()、互斥量(<mutex>)、条件...
在计算机科学中,生产者-消费者模型是一种经典的并发编程问题,用于解决多个线程之间如何高效、安全地共享资源的问题。在这个模型中,"生产者"线程负责生成数据,而"消费者"线程则负责处理这些数据。C++作为一门支持...
在操作系统设计中,"生产者-消费者"问题是一个经典的多线程同步问题,它涉及到如何在并发环境中有效地共享资源。该问题由计算机科学家Edsger Dijkstra提出,旨在解决两个或多个线程间的协作,一个线程(生产者)生成...
此外,除了`std::queue`外,还有其他实现消息队列的方法,如使用`std::deque`配合自定义的锁机制,或者使用Boost库中的`boost::lockfree::queue`,它提供了无锁的数据结构,适用于高并发场景。 "queue"这个文件名...
生产者消费者问题是一个经典的多线程同步问题,来源于操作系统理论,用于模拟两个或多个相互依赖的进程或线程之间的协作。在这个场景下,“生产者”是生成数据的实体,而“消费者”则负责处理这些数据。这个问题的...
std::unique_lock<std::mutex> lock(queue_mutex); condition.wait(lock, [this] { return stop || !tasks.empty(); }); // 等待任务或者线程池停止 if (stop && tasks.empty()) { return; } task = std::move...
操作系统中的生产者-消费者问题是多线程编程中的一个经典示例,主要用来演示同步和互斥的概念。在并发环境中,生产者线程负责生成数据,而消费者线程负责处理这些数据。为了保证数据的正确性,我们需要防止生产者过...
1. `array_lock_free_queue_impl.h` 和 `array_lock_free_queue_single_producer_impl.h`:这是无锁队列的具体实现,分别对应多生产者和单生产者的场景。 2. `array_lock_free_queue.h` 和 `array_lock_free_queue_...