`

最快线程间数据交换算法,有效避免锁竞争 -- TwoQueues

    博客分类:
  • C++
阅读更多
本人CSDN博客地址:http://blog.csdn.net/hzdiy/article/details/8694642
处理多线程数据共享问题注意的几个要点:
1、锁竞争:尽量减少锁竞争的时间和次数。
2、内存:尽量是使用已分配内存,减少内存分配和释放的次数。尽量是用连续内存,减少共享占用的内存量。


多线程数据交换简单方案A:
定义一个list,再所有操作list的地方进行加锁和解锁。
简单模拟代码:

class CSimpleQueue  
{  
public:  
    CSimpleQueue()  
    {  
        InitializeCriticalSection(&m_crit);  
    }  
    ~CSimpleQueue()  
    {  
        DeleteCriticalSection(&m_crit);  
    }  
  
    // 添加数据  
    void AddData(DATA pData)  
    {  
        EnterCriticalSection(&m_crit);  
        m_listDATA.push_back(pData);  
        LeaveCriticalSection(&m_crit);  
    }  
    // 获取数据  
    bool GetData(DATA data)  
    {  
        EnterCriticalSection(&m_crit);  
        if (m_listDATA.size() > 0)  
        {  
            data = m_listDATA.front();  
            m_listDATA.pop_front();  
            LeaveCriticalSection(&m_crit);  
  
            return true;  
        }  
        else  
        {  
            LeaveCriticalSection(&m_crit);  
            return false;  
        }  
    }  
  
private:  
    list<DATA> m_listDATA;  
    CRITICAL_SECTION m_crit;  
};  


多线程数据交换优化方案B -- TwoQueues:
文章最后有TwoQueues的详细实现源代码。

TwoQueues的实现分析:
锁竞争:TwoQueues使用双内存块交换的方式减少锁竞争,数据写入时会进行加锁和解锁操作,但读取数据时,只是当当前读取队列的数据读取完毕时,进行两个队列交换时才进行加锁和解锁操作。可以说,数据的读取是不进行加锁的。这样,最大限度的降低了锁竞争问题。
内存:TwoQueues采用预先分配内存块的方式,初始化TwoQueues时就已经将存放数据的内存分配好了。之后数据存放于已经分配的内存块上。无需再次分配内存,不会再次进行内存分配。
TwoQueues的进一步优化,TwoQueues在存放数据时,完全可以模仿【数据长度+数据】的方式存放数据。但是这种方式会增加数据存在性检测的效率。TwoQueues则采用了一种链表的方式进行存放数据。
链表结构:

typedef struct tagDataHead  
    {  
        tagDataHead()  
        {  
            pHead = NULL;  
            nLen = 0;  
            pNext = NULL;  
        }  
        void ReSet()  
        {  
            pHead = NULL;  
            nLen = 0;  
            pNext = NULL;  
        }  
        char* pHead;  
        unsigned int nLen;  
        tagDataHead* pNext;  
    }DATAHEAD;  

此链表存放了数据头指针和数据长度。当TwoQueues中存在数据时,可以直接通过链表节点拿数据。因为链表每次创建时也是需要进行申请内存,而内存申请是一个比较耗效率的事情,TwoQueues再此做了一个小小的处理,当链表头不存在时,会进行ReAlloc调用,一性申请MALLOC_SIZE个链表头结构。并且链表从链表上解下时并不会释放内存而是放入一个pFreeDataHead链表上,当需要申请链表头结构时,会先从pFreeDataHead链表上取链表结构。此处理减少了内存分配和释放的次数,提高了多线程数据共享的效率。
以上的各种优化处理,使TwoQueues的效率得到了极大的提升。
之前做过的内存共享的测试。A线程不停的写入数据,B线程不停的读取数据。
debug版本 release版本
TwoQueues 80-100万次/秒 170-180万次/秒
CSimpleQueues 20万次/秒 5万次/秒

双队列的使用:
创建对象:
CTwoQueues m_cTwoQueues;
// 初始化双队列大小,因为是双队列,所以内存占用量是两份。
m_cTwoQueues.Init(0x0FFFFFFF);

写入方式:
m_cTwoQueues.PushData(sz, strlen(sz)+1);

读取方式:
const void* pData = NULL;

unsigned int nLen = 0;
if (m_cTwoQueues.PrepareData(pData, nLen))
{
// 处理数据  ...
// 确认(丢弃)此数据
m_cTwoQueues.ConfimData();
}

// 下面是TwoQueues的所有源代码  源代码下载
#pragma once  
#include <assert.h>  
  
namespace clwCore  
{  
#define MALLOC_SIZE 128  
  
    typedef struct tagDataHead  
    {  
        tagDataHead()  
        {  
            pHead = NULL;  
            nLen = 0;  
            pNext = NULL;  
        }  
        void ReSet()  
        {  
            pHead = NULL;  
            nLen = 0;  
            pNext = NULL;  
        }  
        char* pHead;  
        unsigned int nLen;  
        tagDataHead* pNext;  
    }DATAHEAD;  
  
    typedef struct tagDataMem  
    {  
        tagDataMem(unsigned int nSize)  
        {  
            if (0 == nSize)  
            {  
                assert(false);  
                return ;  
            }  
  
            nMaxSize = nSize;  
            pDataMem = (char*)malloc(nSize*sizeof(char));  
            nDataPos = 0;  
  
            if (NULL == pDataMem)  
            {  
                // CTwoQueues申请malloc内存失败。  
                assert(false);  
            }  
  
            pListDataHead = NULL;  
            pCurDataHead = NULL;  
            pFreeDataHead = NULL;  
        }  
        ~tagDataMem()  
        {  
            // 释放节点  
            ReSet();  
            while(NULL != pFreeDataHead)  
            {  
                DATAHEAD* pTempDataHead = pFreeDataHead;  
                pFreeDataHead = pFreeDataHead->pNext;  
  
                delete pTempDataHead;  
                pTempDataHead = NULL;  
            }  
  
            free(pDataMem);  
            pDataMem = NULL;  
            nDataPos = 0;  
        }  
        bool ReAlloc()  
        {  
            for (unsigned short i=0; i<MALLOC_SIZE; ++i)  
            {  
                DATAHEAD* pTempDataHead = new DATAHEAD;  
                //pTempDataHead->ReSet();    //构造时已经初始化  
  
                if (NULL == pTempDataHead)  
                {  
                    // 申请DATAHEAD内存失败。  
                    assert(false);  
                    return false;  
                }  
  
                pTempDataHead->pNext = pFreeDataHead;  
                pFreeDataHead = pTempDataHead;  
            }  
  
            return true;  
        }  
        DATAHEAD* GetDataHead()  
        {  
            if (NULL != pFreeDataHead)  
            {  
                DATAHEAD* pTempDataHead = pFreeDataHead;  
                pFreeDataHead = pFreeDataHead->pNext;  
  
                return pTempDataHead;  
            }  
  
            if (ReAlloc())  
            {  
                if (NULL != pFreeDataHead)  
                {  
                    DATAHEAD* pTempDataHead = pFreeDataHead;  
                    pFreeDataHead = pFreeDataHead->pNext;  
  
                    return pTempDataHead;  
                }  
            }  
  
            // ASSERT("GetDataHead返回NULL。");  
            assert(false);  
            return NULL;  
        }  
        unsigned int GetFreeLen()   //空闲内存长度  
        {  
            return nMaxSize-nDataPos;  
        }  
        bool PushData(void* pData, unsigned int nLen)  
        {  
            if (nDataPos+nLen >= nMaxSize)  
            {  
                return false;  
            }  
  
            DATAHEAD* pTempDataHead = GetDataHead();  
  
            if (NULL == pTempDataHead)  
            {  
                return false;  
            }  
  
            // 构造数据头结构  
            pTempDataHead->pHead = (pDataMem+nDataPos);  
            pTempDataHead->nLen = nLen;  
            pTempDataHead->pNext = NULL;  
  
            // 拷贝数据  
            memcpy(pDataMem+nDataPos, pData, nLen);  
            nDataPos += nLen;  
  
            if (NULL == pListDataHead)  
            {  
                pListDataHead = pTempDataHead;  
                pCurDataHead = pTempDataHead;  
                return true;  
            }  
            else  
            {  
                pCurDataHead->pNext = pTempDataHead;  
                pCurDataHead = pCurDataHead->pNext;  
                return true;  
            }  
        }  
  
        bool IsEmpty()  //判断是否有数据  
        {  
            return (NULL==pListDataHead)?true:false;  
        }  
  
        bool PrepareData(const void*& pData, unsigned int& nLen)    //准备一条数据  
        {  
            if (NULL != pListDataHead)  
            {  
                pData = pListDataHead->pHead;  
                nLen = pListDataHead->nLen;  
                return true;  
            }  
            else  
            {  
                return false;  
            }  
        }  
        void ConfimData()   //删除一条数据  
        {  
            if (NULL == pListDataHead)  
            {  
                return ;  
            }  
  
            DATAHEAD* pTempDataHead = pListDataHead;  
            pListDataHead = pListDataHead->pNext;  
  
            pTempDataHead->ReSet();  
            pTempDataHead->pNext = pFreeDataHead;  
            pFreeDataHead = pTempDataHead;  
        }  
        void ReSet()    //重置内存存储对象  
        {  
            while(NULL != pListDataHead)  
            {  
                DATAHEAD* pTempDataHead = pListDataHead;  
                pListDataHead = pListDataHead->pNext;  
  
                pTempDataHead->ReSet();  
                pTempDataHead->pNext = pFreeDataHead;  
                pFreeDataHead = pTempDataHead;  
            }  
  
            nDataPos = 0;  
            pCurDataHead = NULL;  
        }  
  
        char* pDataMem;         //数据内存区域  
        unsigned int nDataPos;  //数据存储位置  
        unsigned int nMaxSize;  //最大存储区域大小  
  
        DATAHEAD* pListDataHead;  
        DATAHEAD* pCurDataHead;  
        DATAHEAD* pFreeDataHead;    //空闲头结构队列  
    }DATAMEM;  
  
    class CTwoQueues  
    {  
    public:  
        CTwoQueues(void)  
        {  
            InitializeCriticalSection(&m_crit);  
            m_pDataMemPush = NULL;  
            m_pDataMemPop = NULL;  
        }  
        ~CTwoQueues(void)  
        {  
            if (NULL != m_pDataMemPush)  
            {  
                delete m_pDataMemPush;  
                m_pDataMemPush = NULL;  
            }  
  
            if (NULL != m_pDataMemPop)  
            {  
                delete m_pDataMemPop;  
                m_pDataMemPop = NULL;  
            }  
            DeleteCriticalSection(&m_crit);  
        }  
  
    public:  
        void Init(unsigned int nSize)  
        {  
            if (0 == nSize)  
            {  
                // 初始化CTwoQueues对象失败。  
                assert(false);  
                return ;  
            }  
  
            m_pDataMemPush = new DATAMEM(nSize);  
            m_pDataMemPop = new DATAMEM(nSize);  
        }  
  
        bool PushData(void* pData, unsigned int nLen)  
        {  
            //unsigned int nFreeLen = m_pDataMemPush->GetFreeLen();  
  
            bool bResult = false;  
  
            EnterCriticalSection(&m_crit);  
            bResult = m_pDataMemPush->PushData(pData, nLen);  
            LeaveCriticalSection(&m_crit);  
  
            return bResult;  
        }  
        bool PrepareData(const void*& pData, unsigned int& nLen)  
        {  
            bool bCanRead = true;  
            if (m_pDataMemPop->IsEmpty())  
            {  
                // 队列没有数据了  
                EnterCriticalSection(&m_crit);  
                if (m_pDataMemPush->IsEmpty())  
                {  
                    // Push队列为空  
                    LeaveCriticalSection(&m_crit);  
                    bCanRead = false;  
                }  
                else  
                {  
                    m_pDataMemPop->ReSet();  //充值读取队列  
                    DATAMEM* pTempDataMem = m_pDataMemPush;  
                    m_pDataMemPush = m_pDataMemPop;  
                    m_pDataMemPop = pTempDataMem;  
                    LeaveCriticalSection(&m_crit);  
                    bCanRead = true;  
                }  
            }  
  
            if (bCanRead)  
            {  
                return m_pDataMemPop->PrepareData(pData, nLen);  
            }  
            else  
            {  
                return false;  
            }  
        }  
  
        void ConfimData()  
        {  
            m_pDataMemPop->ConfimData();  
        }  
  
    private:  
        DATAMEM* m_pDataMemPush;  
        DATAMEM* m_pDataMemPop;  
        CRITICAL_SECTION m_crit;  
  
    };  
}  
分享到:
评论

相关推荐

    用Java多线程实现数据结构算法动态演示.pdf

    ### Java多线程实现数据结构算法动态演示知识点 #### 1. 多线程简介 - **程序、进程和线程的区别**: - **程序**是静态的代码序列,它是执行过程的蓝图,本身不执行操作。 - **进程**是程序的动态执行过程,涉及...

    vc++中的线程锁(线程锁保持线程同步)

    在VC++编程环境中,线程同步是一个至关重要的概念,特别是在多线程程序设计中,以确保并发执行的线程能够安全地访问共享资源,避免数据竞争和其他潜在的问题。本篇文章将详细探讨线程锁在VC++中的应用,以及如何通过...

    人工智能-项目实践-多线程-Paxos算法的多线程实现.zip

    - **线程同步**:通过synchronized关键字、Lock接口(如ReentrantLock)等机制防止数据竞争,保证同一时刻只有一个线程访问共享资源。 - **线程通信**:使用wait()、notify()和notifyAll()方法,或者条件变量...

    一种基于类的Java多线程程序数据竞争静态检测算法.pdf

    首先,算法通过构建函数调用图,分析各个线程间的交互,找出可能的数据竞争源。接着,运用静态切片技术缩小程序的分析范围,减少误报的可能性。静态切片是程序分析中的一种技术,它能提取出影响特定变量值的所有语句...

    Disruptor 一种可替代有界队列完成并发线程间数据交换高性能解决方案.docx

    Disruptor 是一个高性能的并发编程框架,由 LMAX 公司开发,旨在解决线程间数据交换的效率问题。它的设计目标是替代传统的有界队列,提供更高的吞吐量和更低的延迟。Disruptor 的核心在于其创新的数据交换机制,包括...

    MFC 多线程之间通过消息传递数据

    本主题将深入探讨如何在MFC的多线程环境中通过消息传递来交换数据。 首先,我们要了解MFC对多线程的支持。MFC提供了一个CWinThread类,它是所有线程的基础。创建新线程时,通常会从CWinThread派生一个新的类,并...

    多线程死锁,活锁,竞争锁问题总结

    ### 多线程死锁、活锁与竞争锁问题总结 #### 一、多线程基础知识简介 在探讨多线程编程中常见的问题之前,我们...通过合理的锁设计、资源管理策略以及适当的并发控制机制,可以有效避免这些多线程编程中常见的陷阱。

    CVI 03.多线程数据保护(线程锁)

    - **使用线程局部存储**:对于每个线程都需要有自己的独立数据副本的情况,使用线程局部存储可以避免线程锁。 在实际应用中,线程锁广泛应用于多线程环境中的共享资源管理,如数据库连接池、缓存系统、并发数据结构...

    线程间通信方式3:消息传递方式

    消息传递可以作为避免数据竞争的一种手段,因为每个线程通过消息队列顺序地接收和处理消息,而不是直接访问共享数据。 - 但需要注意的是,如果消息涉及到共享资源,仍需使用互斥量或临界区来保护这些资源。 6. **...

    改进的有效边表算法--计算机图形学

    在计算机图形学中,有效边表(Active Edge Table, AET)算法是一种广泛用于区域填充的技术,它在二维图形处理中扮演着重要角色。本文将深入探讨改进的有效边表算法及其在VC++6.0环境下的实现。首先,我们要理解什么...

    操作系统线程同步算法

    操作系统中的线程同步是多线程编程中一个关键的概念,它确保了多个线程在访问共享资源时的正确性,防止数据竞争和其他并发问题。在Windows操作系统中,提供了多种线程同步机制,如临界区、事件、信号量以及互斥量等...

    线程间通信的定义及全局变量的方法

    通过上述同步机制,我们可以有效地解决线程间通信时的数据竞争问题,保证线程对全局变量的操作是原子性的,从而确保数据的一致性和程序的正确性。总的来说,线程间通信是多线程编程中的一个重要方面,而同步和互斥...

    多线程导入excel 数据

    - **线程池管理**:使用`ExecutorService`可以更有效地管理线程,避免频繁创建和销毁线程带来的开销。通过设定最大线程数、核心线程数、存活时间等参数,调整线程池的行为。 3. **线程同步锁**: - **同步锁的...

    多线程编程中的数据竞争检测.pptx

    ### 多线程编程中的数据竞争检测 #### 线程安全概念与数据竞争定义 在多线程编程中,**线程安全**是指代码在并发执行时能够正确处理资源共享,确保不会出现数据竞争或其他不可预测的行为。实现线程安全的方法通常...

    C语言实现的FP-growth算法

    FP-growth算法是一种高效的数据挖掘方法,用于找出数据库中频繁出现的项集。在这个场景中,我们关注的是C语言实现的FP-growth算法。C语言以其高效性和灵活性,成为实现这种算法的理想选择,尤其是在处理大数据量时。...

    可并行递归算法的递归多线程实现

    未来的研究方向可能包括进一步优化线程调度策略,减少线程间的通信开销,以及探索更多适合多线程化的递归算法。通过持续的技术创新和优化,我们有理由相信,多线程递归算法将在未来的高性能计算领域发挥更加关键的...

    创建线程及线程间通信

    线程通信是线程协作完成任务的关键,它允许线程之间交换数据或同步操作。一种常见的通信机制是“生产者-消费者”模型,其中生产者线程生成数据,而消费者线程消费这些数据。在没有适当的同步机制下,可能会出现数据...

    JVM性能优化:线程锁优化.docx

    在进行JVM线程锁优化时,关键在于理解并发原理,选择适合的同步机制,并避免不必要的锁竞争,以提高程序的并发性能和整体效率。此外,使用现代的并发工具和设计模式,如`ConcurrentHashMap`和`CompletableFuture`,...

Global site tag (gtag.js) - Google Analytics