`

ACE线程安全与同步(Thread Safety and Synchronization)

    博客分类:
  • ACE
阅读更多

保护原语
为了确保一致性,多线程程序必须在共享数据周围使用保护原语。
Protection Primitives in ACE

Primitive

Description

ACE_Mutex

Wrapper class around the mutual-exclusion mechanism and used to provide a simple, efficient mechanism to serialize access to a shared resource. Similar in functionality to a binary sempahore. Can be used for mutual exclusion among both threads and processes.

ACE_Thread_Mutex

Can be used in place of ACE_Mutex and is specific for synchronization of threads.

ACE_Recursive_Thread_Mutex

Mutex that is recursive, that is, can be acquired multiple times by the same thread. Note that to work correctly, it must be released as many times as it was acquired.

ACE_RW_Mutex

Wrapper class that encapsulates readers/writer locks. These locks are acquired differently for reading and writing, thus enabling multiple readers to read while no one is writing. These locks are nonrecursive.

ACE_RW_Thread_Mutex

Can be used in place of ACE_RW_Mutex and is specific for synchronization of threads.

ACE_Token

Tokens are the richest and heaviest locking primitive available in ACE. The locks are recursive and allow for read and write locking. Also, strict FIFO ordering of acquisition is enforced; all threads that call acquire() enter a FIFO queue and acquire the lock in order.

ACE_Atomic_Op

Template wrapper that allows for safe arithmetic operations on the specified type.

ACE_Guard

Template-based guard class that takes the lock type as a template parameter.

ACE_Read_Guard

Guard that uses acquire_read() on a read/write guard during construction.

ACE_Write_Guard

Guard that uses acquire_write() on a read/write guard during construction.

递归互斥体(Recursive Mutexes)
如果某个线程两次获取同一个互斥体,线程会阻塞锁死自己(大多数情况)。
你可以使用递归互斥体来(ACE_Recursive_Thread_Mutex)避免这种情况,它允许同一个线程多次获取互斥体,而不会阻塞自己。
Readers/Writer 锁(Readers/Writer Locks)
Readers/Writer 锁(ACE_RW_Mutex)允许多线程同时持有一个锁进行读取,但只有一个线程能够持有这个锁进行写入。
只在读取竞争大大超过写入竞争时使用它,因为Readers/Writer 锁(ACE_RW_Mutex)都比互斥体慢。
原子运算包装(
Atomic Operation Wrapper)
如果机器内存是强序(strongly ordered)的,对一个处理器上的内存所做的修改就会立刻被其它处理器看到,就不需要对全局变量进行同步。
否则,你需要使用同步。
用一个
classic producer/consumer problem using busy waiting演示:
#include <ace/Atomic_Op_T.h>
#include <ace/Thread_Mutex.h>
#include <ace/Log_Msg.h>
#include <ace/Task.h>

typedef ACE_Atomic_Op<ACE_Thread_Mutex, unsigned int> SafeUInt;
typedef ACE_Atomic_Op<ACE_Thread_Mutex, int> SafeInt;

static const int Q_SIZE = 100;            //缓冲区大小
static const int MAX_PROD = 1000;        //最大生产数

class Consumer : public ACE_Task_Base
{
private:
	int* buf_;
	SafeUInt& in_;
	SafeUInt& out_;
public:
	Consumer(int *buf, SafeUInt &in, SafeUInt& out)
		: buf_(buf), in_(in), out_(out)
	{ }

	int svc(void)
	{
		while (1)
		{
			int item;

			// Busy wait.
			do
			{ }
			while (in_.value() - out_.value() == 0);

			item = buf_[out_.value() % Q_SIZE];
			out_++;

			ACE_DEBUG((LM_DEBUG, 
				ACE_TEXT("Consumed %d\n"),
				item));

			if (check_termination(item))
				break;
		}

		return 0;
	}

	int check_termination(int item)
	{
		return (item == MAX_PROD);
	}
};

class Producer : public ACE_Task_Base
{
private:
	int* buf_;
	SafeUInt& in_;
	SafeUInt& out_;
public:
	Producer(int* buf, SafeUInt& in, SafeUInt& out)
		: buf_(buf), in_(in), out_(out)
	{ }

	int svc(void)
	{
		SafeInt itemNo = 0;
		while (1)
		{
			// Busy wait.
			do
			{ }
			while(in_.value() - out_.value() == Q_SIZE);

			itemNo++;
			buf_[in_.value() % Q_SIZE] = itemNo.value();
			in_++;

			ACE_DEBUG((LM_DEBUG, 
				ACE_TEXT("Produced %d \n"),
				itemNo.value()));

			if (check_termination(itemNo.value()))
				break;
		}

		return 0;
	}

	int check_termination(int item)
	{
		return (item == MAX_PROD);
	}
};

int ACE_TMAIN (int, ACE_TCHAR *[])
{
	int shared_buf[Q_SIZE];
	SafeUInt in = 0;
	SafeUInt out = 0;

	Producer producer(shared_buf, in, out);
	Consumer consumer(shared_buf, in, out);

	producer.activate();
	consumer.activate();
	producer.wait();
	consumer.wait();

	return 0;
}
 
令牌管理(Token Management)
是一个框架解决方案,有死锁检测特性。
线程同步(Thread Synchronization)
同步是这样一个过程,通过它你可以控制多个线程的执行次序,从而完成某项任务。
ACE Synchronization Primitives

Primitive

Description

ACE_Condition

A condition variable; allows signaling other threads to indicate event occurrence

ACE_Semaphore

A counting semaphore; can be used as a signaling mechanism and also for synchronization purposes

ACE_Barrier

Blocks all threads of execution until they all reach the barrier line, after which all threads continue

ACE_Event

A simple synchronization object that is used to signal events to other threads

使用信号量(Semaphores)
信号量是非负整型计数,用于协调对多个资源的访问。
获取信号量,计数就会减小;
释放信号量,计数就会增大;

计数到达0----不再有资源----试图获取该信号量的线程就会阻塞,直到信号量计数变得大于0位置。
把信号量计数初始化为某个非负值,表示你拥有的资源数量。
与互斥体的区别:
互斥体假定获取它的线程也将是释放它的线程;
信号量相反,一个线程获取由另一个线程释放。
////操作系统课程上的例题(生产者和消费者PV操作):
#include <ace/Log_Msg.h>
#include <ace/Task.h>
#include <ace/Semaphore.h>
#include <ace/Message_Block.h>

class Consumer : public ACE_Task<ACE_MT_SYNCH>
{
private:
	ACE_Semaphore& psema_;
	ACE_Semaphore& csema_;
	int exit_condition_;

public:
	enum { N_THREADS = 5 };

	Consumer(ACE_Semaphore& psema, ACE_Semaphore& csema)
		: psema_(psema), csema_(csema), exit_condition_(0)
	{ }

	int svc(void)
	{
		while(!is_closed())
			consume_item();
		return 0;
	}

	void consume_item()
	{
		csema_.acquire();
		if (!is_closed())
		{
			ACE_Message_Block *mb;
			this->getq(mb);
			if (mb->msg_type() == ACE_Message_Block::MB_HANGUP)
			{
				//挂断消息
				shutdown();
				mb->release();
				return;
			}
			else
			{
				ACE_DEBUG((LM_DEBUG,
					ACE_TEXT("(%t) Consumed %d\n"),
					*((int*)mb->rd_ptr())));
				mb->release();
			}
			//psema_值增一,允许生产者继续执行
			psema_.release();
		}
	}

	void shutdown(void)
	{
		exit_condition_ = 1;
		//唤醒消息队列上的所有线程
		this->msg_queue()->deactivate();
		//释放在信号量上等待的所有线程
		csema_.release(N_THREADS);
	}

	int is_closed(void)
	{
		return exit_condition_;
	}
};
class Producer : public ACE_Task_Base
{
public:
	enum { MAX_PROD = 128 };

	Producer(ACE_Semaphore& psema, ACE_Semaphore& csema,
		Consumer &consumer)
		: psema_(psema), csema_(csema), consumer_(consumer)
	{ }

	int svc(void)
	{
		for (int i = 0; i <= MAX_PROD; i++)
			produce_item(i);
		hang_up();
		return 0;
	}

	void produce_item(int item)
	{
		psema_.acquire();
		ACE_Message_Block *mb
			= new ACE_Message_Block(sizeof(int),
			ACE_Message_Block::MB_DATA);
		ACE_OS::memcpy(mb->wr_ptr(), &item, sizeof item);
		mb->wr_ptr(sizeof(int));
		this->consumer_.putq(mb);

		ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%t) Produced %d\n"), item));
		//生产出一个新元素,csema_值增一
		csema_.release();
	}

	void hang_up()
	{
		psema_.acquire();
		ACE_Message_Block *mb =
			new ACE_Message_Block(0, ACE_Message_Block::MB_HANGUP);
		this->consumer_.putq(mb);
		csema_.release();
	}

private:
	ACE_Semaphore& psema_;
	ACE_Semaphore& csema_;
	Consumer& consumer_;
};
int ACE_TMAIN(int, ACE_TCHAR *[])
{
	ACE_Semaphore psem(5);        //队列中能容纳的最大数目为5
	ACE_Semaphore csem(0);        //在csem的release()之前,消费者完全不能进行消费

	Consumer consumer(psem, csem);
	Producer producer(psem, csem, consumer);

	producer.activate();
	//多个消费者线程
	consumer.activate(THR_NEW_LWP | THR_JOINABLE,
		Consumer::N_THREADS);

	producer.wait();
	consumer.wait();

	return 0;
}
 
使用栅栏(Barriers)
每个线程在到达某种周知的状态时调用栅栏的wait(),阻塞起来,等待其它所有参与线程调用wait()表明它们也到达了该状态。
一旦所有线程都到达栅栏,它们就会被解除阻塞,并一起继续执行。
使用ACE_Barrier,当然你得传入同步线程的数目。
///让所有线程的启动和关闭时间同步
#include <ace/OS.h>
#include <ace/Log_Msg.h>
#include <ace/Task.h>
#include <ace/Barrier.h>

class HA_CommandHandler : public ACE_Task<ACE_MT_SYNCH>
{
private:
	ACE_Barrier& startup_barrier_;
	ACE_Barrier& shutdown_barrier_;
public:
	enum { N_THREADS = 5 };

	HA_CommandHandler(ACE_Barrier& startup_barrier, ACE_Barrier& shutdown_barrier)
		: startup_barrier_(startup_barrier),
		shutdown_barrier_(shutdown_barrier)
	{ }

	//随机初始化时间
	void initialize_handler(void)
	{
		ACE_OS::sleep(ACE_OS::rand() % 10);
	}

	//随机执行时间
	int handle_command_requests(void)
	{
		ACE_OS::sleep(ACE_OS::rand() % 10);
		return -1;
	}

	int svc(void)
	{
		initialize_handler();
		startup_barrier_.wait();
		ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%t: %D) Started\n")));

		while(handle_command_requests() > 0)
			;

		shutdown_barrier_.wait();
		ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%t: %D) Ended\n")));

		return 0;
	}

};
int ACE_TMAIN(int, ACE_TCHAR *[])
{
	ACE_Barrier startup_barrier(HA_CommandHandler::N_THREADS);
	ACE_Barrier shutdown_barrier(HA_CommandHandler::N_THREADS);

	HA_CommandHandler handler(startup_barrier, shutdown_barrier);
	handler.activate(THR_NEW_LWP | THR_JOINABLE,
		HA_CommandHandler::N_THREADS);
	handler.wait();
	return 0;
}
 
线程专有存储(Thread-Specific Storage)
当你创建线程时,你所创建的全部东西是:一个线程栈、一个信号掩码和一个任务控制块。
线程专有存储(
thread-specific storage ,TSS):能够存储专有于某个线程的状态信息。
使用
ACE_TSS模板类,你只需把你想要存储在TSS中的数据作为模板参数传递给它,然后用operator->()方法在你需要时对数据进行访问。operator->()会在首次调用时在TSS中创建并存储数据,ACE_TSS的析构器会确保TSS数据被适当移除和销毁。
class ClientContext
{
public:
	void *get_attribute(const char *name);
	void set_attribute(const char *name, void *value);

private:
	Map attributeMap_;
};
class HA_CommandHandler : public ACE_Task<ACE_MT_SYNCH>
{
private:
	ACE_TSS<ClientContext> tss_ctx_;
public:
	virtual int svc(void)
	{
		//...
		this->tss_ctx_->get_attribute("attribute1");

		return 0;
	}

};
 

分享到:
评论

相关推荐

    ThreadSynchronization.zip

    本文将围绕“C#多线程”中的一个重要概念——线程同步,结合“ThreadSynchronization.zip”压缩包中的示例代码进行深入探讨。 线程同步是指在多线程环境中控制不同线程对共享资源的访问,以避免数据不一致或竞争...

    Synchronization 线程与同步part2.pdf

    这是关于并列与分布式的文档第二部分,Synchronization 线程与同步,本开发文档适合对于并列与分布式感兴趣的有一定计算机基础初学的朋友,一个快捷的学习文档,学习完这两章内容朋友,可以自己尝试开发使用系统多...

    Synchronization 线程与同步part1.pdf

    本文档是关于并行计算与分布式计算的入门级教程,重点讲解了同步、线程以及进程等概念,并通过具体例子说明了如何将程序并行化处理。该文档分为两个部分,本文是第一部分,主要涵盖了并行程序设计的基础知识。 1. ...

    IOS线程管理,线程同步

    6. **队列同步(Queue Synchronization)**:通过在同一线程队列上提交任务实现同步,例如,主线程上的任务会按顺序执行,无需额外的同步措施。 了解了这些基础知识后,开发者可以根据实际需求选择合适的线程管理...

    同步.rar_matlab 同步_synchronization_同步MATLAB_同步控制_控制同步

    在标题“同步.rar_matlab 同步_synchronization_同步MATLAB_同步控制_控制同步”中,我们可以理解这是一个关于使用MATLAB进行同步控制的项目或教程。"synchronization"是英文的同步,而"同步MATLAB"和"同步控制"则指...

    Thread Synchronization in User Mode

    线程同步是多线程环境下确保线程间安全协作的技术。在用户模式下进行线程同步,意味着所有的同步操作都是由应用程序自己负责,而不是由操作系统内核来处理。这通常通过使用特定的同步原语实现,如互斥量、信号量、...

    线程同步测试

    在这个"线程同步测试"项目中,我们主要关注的是如何使用事件和信号量来实现线程之间的协调与同步。 首先,我们来理解一下“多线程”。在单个进程中,多线程允许同时执行多个不同的任务,这些任务在不同的线程中运行...

    linux-itss:linux线程间同步(Inter-Thread Synchronization)方式汇总

    linux线程间同步方式(inter-thread-synchronization)汇总 包含的同步方式 互斥量 条件变量 读写锁 自旋锁 barrier(这个不知道怎么翻译) #扩展 我的另一个仓库有关于linux进程间通信的方式汇总,,供参考,欢迎讨论...

    OFDM同步技术matlab程序_ofdm定时同步_ofdm同步_OFDM同步_cazac_synchronization_

    2017_Martin_Low-complexity timing synchronization for OFDM based on CAZAC and Golay sequencesmatlab定时同步,OFDM

    ARM同步原语synchronization primitives

    随着多核处理器的普及与高性能计算需求的增长,确保多线程程序正确同步变得至关重要。ARM体系结构作为一种广泛应用的嵌入式系统架构,在同步机制方面提供了丰富的支持。本文档将详细介绍ARMv6中的同步原语,并探讨在...

    Thread Synchronization

    ### 线程同步(Thread Synchronization) #### 一、引言 线程同步是多线程编程中的一个重要概念,主要用于解决多个线程并发访问共享资源时可能出现的数据不一致问题。在Java等支持多线程的语言中,理解并正确实现...

    O-RAN Fronthaul Working Group Control, User and Synchronization Plane Specificat

    《O-RAN Fronthaul Working Group Control, User and Synchronization Plane Specification》是O-RAN联盟发布的技术规范,主要关注O-RAN(开放无线接入网)的前传接口的控制、用户和同步平面的设计与实现。...

    Locks, Deadlocks, and Synchronization.doc

    锁、死锁与同步机制:深入理解与应用 在计算机科学领域,锁、死锁以及...通过深入理解锁、死锁与同步机制,开发者可以构建出更加健壮和高效的多线程应用程序,尤其是在高并发环境下,这些基础知识的应用显得尤为关键。

    msk.zip_MSK 定时同步_MSK 定时同步_MSK同步_MSK锁相环_Synchronization MSK

    标题中的“msk.zip_MSK 定时同步_MSK 定时同步_MSK同步_MSK锁相环_Synchronization MSK”表明这是一个关于MSK(Minimum Shift Keying,最小移频键控)信号处理的项目,其中包含了定时同步和锁相环(Phase-Locked ...

    并发、多线程、同步异步概念.docx

    ### 并发、多线程、同步异步概念解析...综上所述,理解和掌握并发、多线程、同步与异步的概念对于开发高效、可靠的软件系统至关重要。在实际应用中,需要根据具体的场景和技术限制,灵活选择合适的并发模型和同步机制。

    线程下载文件与主窗体同步下载进度演示

    2. **窗体同步(Form Synchronization)**:在多线程环境中,线程间的数据交互需要确保数据安全,避免竞态条件。本项目中,通过消息机制或者线程同步对象(如TEvent、TMutex等)实现了线程间的同步,确保了下载进度...

    Thread synchronization in Linux and Windows systems

    Windows系统中的线程同步涉及的函数和机制与Linux的pthreads有很大不同。例如,Windows使用CRITICAL_SECTION结构来实现临界区同步,这种方式是一种相对快速的锁定机制,用于确保同一时间只有一个线程可以执行临界...

    Android 线程 多线程 Multi-thread

    #### 二、顺序执行与同步 **1. Java线程机制** Java线程机制是抢占式的,这意味着调度机制会周期性地中断线程,并将其上下文切换到另一个线程,这样每个线程都会被分配一个合理的执行时间来驱动其任务。这种机制...

Global site tag (gtag.js) - Google Analytics