`
sipgreen
  • 浏览: 26863 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
最近访客 更多访客>>
社区版块
存档分类
最新评论

BlockingQueue C++实现

    博客分类:
  • C++
 
阅读更多

 

// BlockingQueue.h: interface for the CBlockingQueue class.
//
//////////////////////////////////////////////////////////////////////

#if !defined(AFX_BLOCKINGQUEUE_H__E6C614E8_4A5D_4D18_A38D_845018DA75B6__INCLUDED_)
#define AFX_BLOCKINGQUEUE_H__E6C614E8_4A5D_4D18_A38D_845018DA75B6__INCLUDED_

#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000

#include <windows.h>
#include <LIST>
#include <vector>
#include <STRING>
using namespace std;

class CBlockingQueue
{
public:
	CBlockingQueue(int size);
	virtual ~CBlockingQueue();
private:
	CBlockingQueue();
	HANDLE m_MssageNullEvent;
	HANDLE m_synSignal;
	int m_size;
	list<string> m_msgs;
public:
		string Dequeue();
		void Enqueue(string msg);
};

#endif // !defined(AFX_BLOCKINGQUEUE_H__E6C614E8_4A5D_4D18_A38D_845018DA75B6__INCLUDED_)

 

 

 

 

// BlockingQueue.cpp: implementation of the CBlockingQueue class.
//
//////////////////////////////////////////////////////////////////////

#include "stdafx.h"
#include "BlockingQueue.h"
#include <afxcom_.h>

//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////

CBlockingQueue::CBlockingQueue()
{

}

CBlockingQueue::CBlockingQueue(int ssize)
{
	m_size=ssize;
	m_synSignal=CreateSemaphore(NULL,1,1,"m_synSignal");
	m_MssageNullEvent=CreateEvent(NULL,TRUE,TRUE,"m_MssageNullEvent");
}

CBlockingQueue::~CBlockingQueue()
{
	
}
string CBlockingQueue::Dequeue()
{
	string msg="";
	WaitForSingleObject(m_synSignal,INFINITE);
	while(0==m_msgs.size())
	{
		bool breset=ResetEvent(m_MssageNullEvent);
		ReleaseSemaphore(m_synSignal,1,NULL);
		DWORD nret=WaitForSingleObject(m_MssageNullEvent,INFINITE);
		//ASSERT(WAIT_TIMEOUT!=);	
	}
	string temp=*m_msgs.begin();
	m_msgs.pop_front();
	SetEvent(m_MssageNullEvent);
	ReleaseSemaphore(m_synSignal,1,NULL);
	
	return temp;
}

void CBlockingQueue::Enqueue(string msg)
{
	WaitForSingleObject(m_synSignal,INFINITE);
	int a=m_msgs.size();
	while(m_size==m_msgs.size())
	{
		bool bret=	ResetEvent(m_MssageNullEvent);
		ReleaseSemaphore(m_synSignal,1,NULL);
		DWORD nret=	WaitForSingleObject(m_MssageNullEvent,INFINITE);	
	}
	m_msgs.push_back(msg);
	ReleaseSemaphore(m_synSignal,1,NULL);
	if (1==m_msgs.size())
		SetEvent(m_MssageNullEvent);
}

 

 以上不具有普遍性,更合理的设计如下:

 

将  同步内核对象抽象出来 成为 CMonitor

 

// Monitor.h: interface for the CMonitor class.
//
//////////////////////////////////////////////////////////////////////

#if !defined(AFX_MONITOR_H__26A4800D_6F3C_41BF_97AC_1D20860517AC__INCLUDED_)
#define AFX_MONITOR_H__26A4800D_6F3C_41BF_97AC_1D20860517AC__INCLUDED_

#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000

#include <windows.h>

class CMonitor 
{

	

public:
	CMonitor();
	virtual ~CMonitor();


private:
	HANDLE m_h[2];
	DWORD m_lastThreadId;
public:
	void wait(DWORD timeout);
	bool pulse();
	void Enter();
	void Exit();
	


};

#endif // !defined(AFX_MONITOR_H__26A4800D_6F3C_41BF_97AC_1D20860517AC__INCLUDED_)

 

// Monitor.cpp: implementation of the CMonitor class.
//
//////////////////////////////////////////////////////////////////////

#include "stdafx.h"
#include "Monitor.h"
#include <exception>

//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////

CMonitor::CMonitor()
{
	m_h[0]=CreateEvent(NULL,TRUE,TRUE,"h1");
	m_h[1]=CreateEvent(NULL,TRUE,TRUE,"h2");
	m_lastThreadId=0;
}

CMonitor::~CMonitor()
{

}

void CMonitor::Enter()
{
	m_lastThreadId=::GetCurrentThreadId();	
	WaitForSingleObject(m_h[0],INFINITE);//ResetEvent();
}

void CMonitor::Exit()
{
	m_lastThreadId=NULL;
	SetEvent(m_h[0]);
	SetEvent(m_h[1]);
}

bool CMonitor::pulse()
{
	if (!(m_lastThreadId!=NULL || m_lastThreadId!=::GetCurrentThreadId()))
	{
		throw exception("The wait could only be excuted by the monitor owner!");
	}
	SetEvent(m_h[1]);

	return true;
}

void CMonitor::wait( DWORD timeout )
{
	if (!(m_lastThreadId!=NULL || m_lastThreadId!=::GetCurrentThreadId()))
	{
		throw exception("The wait could only be excuted by the monitor owner!");
	}
	m_lastThreadId=NULL;
	ResetEvent(m_h[1]);
	WaitForSingleObject(m_h[1],timeout);
	SetEvent(m_h[0]);
}


 

 

那么新的 BlockingQueue代码如下:

 

// BlockingQueue.h: interface for the CBlockingQueue class.
//
//////////////////////////////////////////////////////////////////////

#if !defined(AFX_BLOCKINGQUEUE_H__E6C614E8_4A5D_4D18_A38D_845018DA75B6__INCLUDED_)
#define AFX_BLOCKINGQUEUE_H__E6C614E8_4A5D_4D18_A38D_845018DA75B6__INCLUDED_

#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000

#include <windows.h>
#include <LIST>
#include <vector>
#include <STRING>
#include "Monitor.h"
using namespace std;

class CBlockingQueue
{
public:
	CBlockingQueue(int size);
	virtual ~CBlockingQueue();
private:
	CBlockingQueue();
	int m_size;
	list<string> m_msgs;

	CMonitor m_monitor;
public:
		string Dequeue();
		void Enqueue(string msg);
};

#endif // !defined(AFX_BLOCKINGQUEUE_H__E6C614E8_4A5D_4D18_A38D_845018DA75B6__INCLUDED_)

 

 

// BlockingQueue.cpp: implementation of the CBlockingQueue class.
//
//////////////////////////////////////////////////////////////////////

#include "stdafx.h"
#include "BlockingQueue.h"
#include <afxcom_.h>

//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////

CBlockingQueue::CBlockingQueue()
{

}

CBlockingQueue::CBlockingQueue(int ssize)
{
	m_size=ssize;
}

CBlockingQueue::~CBlockingQueue()
{
	
}
string CBlockingQueue::Dequeue()
{
	string msg="";

	m_monitor.Enter();
	{
		while(0==m_msgs.size())
		{
			m_monitor.wait(INFINITE);
		}
		msg=*m_msgs.begin();
		m_msgs.pop_front();
		if ((m_size-1)==m_msgs.size())
			m_monitor.pulse();
	}
	m_monitor.Exit();

	return msg;
}

void CBlockingQueue::Enqueue(string msg)
{
	m_monitor.Enter();
	{
		int a=m_msgs.size();
		while(m_size==m_msgs.size())
		{
			m_monitor.wait(INFINITE);
		}
		m_msgs.push_back(msg);
		if (1==m_msgs.size())
			m_monitor.pulse();
	}
	m_monitor.Exit();
}

 

1
8
分享到:
评论

相关推荐

    C++写的跨平台BlockingQueue

    《C++实现的跨平台BlockingQueue详解》 在软件开发中,线程间的通信和同步是必不可少的部分。Java中的`BlockingQueue`是一个高效且常用的并发工具类,它提供了线程安全的数据结构,允许一个线程放入元素,而另一个...

    高效的实现队列

    它可能包含了上述某一种或多种队列的实现,例如用C++、Java或其他编程语言实现的简单队列、阻塞队列、并发队列等。具体的实现细节需要查看源代码才能得知。 在实际应用中,队列常被用于任务调度、消息传递、网络...

    支持多线程和泛型的阻塞队列

    在C++环境中,`BlockingQueue.h`可能是一个自定义实现的阻塞队列头文件。它可能会包含如下内容: 1. **模板类定义**:定义一个模板类`BlockingQueue&lt;T&gt;`,其中`T`代表队列中元素的类型。 2. **数据成员**:使用`std...

    Linux C++ 使用condition实现阻塞队列的方法

    在这个例子中,我们看到如何在Linux环境下使用C++和POSIX线程库(pthread)中的条件变量(condition variables)来实现阻塞队列。 首先,我们需要包含必要的头文件,并定义一个名为`BlockingQueue`的类。这个类包含了...

    线程的几种控制方式以及线程间的几种通信方式

    Java的`BlockingQueue`接口和Python的`queue`模块提供了现成的实现。 4. **管程(Monitor)**:Java中的`synchronized`关键字和`wait()`, `notify()`, `notifyAll()`方法其实就是一个简单的管程实现,它提供了线程...

    多线程队列

    在Java中,`java.util.concurrent`包下的`BlockingQueue`接口及其实现类如`LinkedBlockingQueue`是很好的选择。 在实际应用中,多线程队列和COM组件的结合能提高系统的并发性能,降低资源消耗,同时简化并发编程的...

    广工操作系统实现报告汇总-(银行家算法、动态内存分配、生产者消费者、进程调度)Java实现

    Java的内存管理主要依赖于垃圾收集机制,但在操作系统层面,我们可能需要实现类似C++的`malloc`和`free`函数,或者理解Java的堆和栈内存结构,以及如何有效地分配和回收内存。 3. **生产者消费者问题**:这是一个...

    实现多线程编程.rar

    本资源"实现多线程编程.rar"显然是针对Java、C语言、C++、C以及JSP开发者的一份珍贵内部资料,它涵盖了多线程编程的核心概念和实践技巧。 1. **多线程定义**:多线程是指一个程序中可以同时执行多个线程(或称为轻...

    一个生产者与消费者的例子.

    生产者与消费者问题是一个经典的多线程同步问题,在计算机科学和软件工程中有着广泛的应用。...在实际应用中,可以根据具体需求选择不同的实现方式,例如Java的`BlockingQueue`或C++的`std::condition_variable`。

    生产者-消费者多线程处理

    在Java中,可以使用`BlockingQueue`接口来实现生产者-消费者模式,它已经内置了线程安全的队列操作。生产者可以使用`offer()`方法添加元素,消费者则用`take()`方法取出元素,这两个方法会自动处理等待和唤醒操作。 ...

    生产者消费者演示程序

    而在C++中,可以使用`std::condition_variable`和`std::mutex`配合自定义的队列来实现。 具体实现时,生产者线程会执行以下步骤: 1. 生产数据。 2. 获取队列的写锁(互斥锁)。 3. 检查队列是否已满。如果已满,...

    生产者消费者多线程代码

    在Java或C++等编程语言中,我们可以利用多线程来实现“生产者消费者”模式,这是一个经典的并发问题。这个模式涉及到两个主要角色:生产者(Producer)和消费者(Consumer),它们共享一个有限大小的缓冲区作为临界...

    多线程编程的入门指导.zip

    在Java中,可以使用`BlockingQueue`实现线程间的通信。 五、死锁、活锁与饥饿 1. 死锁:两个或更多线程相互等待对方释放资源,导致无法继续执行。避免死锁的关键在于遵循资源的有序分配和避免循环等待。 2. 活锁...

    ppt 源码多线程 讲义

    Java中的BlockingQueue,C++中的`std::queue`配合`condition_variable`,Python的`Queue`模块都是实现这一模式的例子。 4. **线程安全**:讲解如何编写线程安全的代码,避免数据不一致和资源竞争。这可能涉及到无锁...

    计算机后端-Java-Java高并发从入门到面试教程-.C组件拓展.zip

    3. **并发容器**:Java并发集合框架包括`ConcurrentHashMap`、`CopyOnWriteArrayList`、`BlockingQueue`等,它们设计时考虑了并发安全,能有效避免并发问题。 4. **Java内存模型**:JMM(Java Memory Model)规定了...

    演示线程消息发送,只是一个简单的实例

    在Java中,可以利用`java.util.concurrent`包下的`BlockingQueue`;在C++中,可以使用Boost库的`message_queue`。 7. **示例代码分析**:压缩包中的文件"演示线程消息发送"很可能包含了具体的操作示例,可能包括...

    多线程编程之一 介绍+例程

    3. **线程通信**:例如,使用Java的`BlockingQueue`进行线程间的数据传递,或者Python的`queue`模块,实现线程间的协作。 4. **线程优先级**:在某些系统中,线程有优先级概念,高优先级线程会先于低优先级线程执行...

    多线程的批量线程同步解决方案

    这个测试可能涵盖各种同步策略,如使用`synchronized`关键字保护共享数据,或者使用`BlockingQueue`来实现线程间的通信和同步。 总结,多线程批量线程同步解决方案涵盖了多种技术,从基本的互斥量到复杂的线程池,...

    线程间互斥2

    8. **线程间的通信(Thread Communication)**:Java 提供了多种方式实现线程间的通信,如 wait(), notify() 和 notifyAll(),以及 BlockingQueue 等高级接口,它们使得线程能够协调执行,避免不必要的等待。...

Global site tag (gtag.js) - Google Analytics