`
zsxxsz
  • 浏览: 446504 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

acl_cpp 非阻塞模块的IPC通信机制

 
阅读更多

      在 acl_cpp 的非阻塞框架的设计中,充分利用了操作系统平台的高并发机制,同时简化了异步编程的过程。但是,并不是所有的操作都是非阻塞的,现实的程序应用中存在着大量的阻塞式行为,acl_cpp 的非阻塞框架中设计了一种通过 ipc 模式使阻塞式函数与 acl_cpp 的非阻塞过程相结合的机制。即是说,在 acl_cpp 的主线程是非阻塞的,而把阻塞过程放在单独的一个线程中运行,当阻塞线程运算完毕,会以 ipc 方式通知主线程运行结果,这样就达到了阻塞与非阻塞相结合的模式。下图展示了 ipc 类的继承关系:

ipc 图

 

      以上图中有两个类与 ipc 相关:ipc_server 及 ipc_client,其中 ipc_server 其实用于与监听异步流相关,而ipc_client 与客户端连接流相关。在 ipc_server 类中有一个 open 函数用来打开本地监听地址,另有三个虚函数便于子类进行相关操作:

      1)on_accept:当监听流获得一个客户端连接时回调此函数,将获得的客户端流传递给子类对象;

      2)on_open:当用户调用 ipc_server::open 函数,且监听某一服务地址成功时通过该函数将实际的监听地址传递给子类对象(因为在调用 open(addr) 时,addr 一般仅指定 IP 地址,同时将 端口赋 0 以便于操作系统自动分配本地端口号,所以通过 on_open 便可以将实际的 端口传递给子类对象);

      3)on_close:当监听流关闭时调用此函数通知子类对象。

 

      相对于 ipc_server 类,则 ipc_client 的接口就显得比较多,主要的函数如下:

      1)open:共有四个 open 重载函数用连接监听流服务地址,其中两个是同步流方式,两个是异步流方式,一般同步建立的流用在阻塞式线程中,异步建立的流用在非阻塞线程中;

      2)send_message:一般用来向非阻塞主线程发送消息;

      3)on_message:异步接收到阻塞线程发来的消息的回调函数;

      4)append_message:添加异步流想要接收的消息号。

      ipc_client 类比较特殊,其充当着双重身份:1)作为客户端连接流连接监听流的服务地址,一般用在阻塞式线程中;2)作为监听流接收到来自于客户端流的连接请求而创建的与之对应的服务端异步流,一般用在非阻塞线程中。

 

      下面以一个具体的实例来说明如果使用 ipc_server 及 ipc_client 两个类:

 

 

#include "lib_acl.h"
#include <iostream>
#include "aio_handle.hpp"
#include "ipc_server.hpp"
#include "ipc_client.hpp"

using namespace acl;

#define MSG_REQ		1
#define MSG_RES		2
#define MSG_STOP		3

// 消息客户端连接类定义
class test_client1 : public ipc_client
{
public:
	test_client1()
	{

	}

	~test_client1()
	{

	}

	// 连接消息服务端地址成功后的回调函数
	virtual void on_open()
	{
		// 添加消息回调对象,接收消息服务器的
		// 此类消息
		this->append_message(MSG_RES);

		// 向消息服务器发送请求消息
		this->send_message(MSG_REQ, NULL, 0);

		// 异步等待来自于消息服务器的消息
		wait();
	}

	// 流关闭时的回调函数
	virtual void on_close()
	{
		delete this;
	}

	// 接收到消息服务器的消息时的回调函数,其中的消息号由
	// append_message 进行注册
	virtual void on_message(int nMsg, void*, int)
	{
		std::cout << "test_client1 on message:" << nMsg << std::endl;

		// 向消息服务器发送消息,通知消息服务器停止
		this->send_message(MSG_STOP, NULL, 0);

		// 删除在 on_open 中注册的消息号
		this->delete_message(MSG_RES);

		// 本异步消息过程停止运行
		this->get_handle().stop();

		// 关闭本异步流对象
		this->close();
	}
protected:
private:
};

// 消息客户端处理过程

static bool client_main(aio_handle* handle, const char* addr)
{
	// 创建消息连接
	ipc_client* ipc = new test_client1();
	
	// 连接消息服务器
	if (ipc->open(handle, addr, 0) == false)
	{
		std::cout << "open " << addr << " error!" << std::endl;
		delete ipc;
		return (false);
	}

	return (true);
}

// 子线程的入口函数
static void* thread_callback(void *ctx)
{
	const char* addr = (const char*) ctx;
	aio_handle handle;

	if (client_main(&handle, addr) == false)
	{
		handle.check();
		return (NULL);
	}

	// 子线程的异步消息循环
	while (true)
	{
		if (handle.check() == false)
			break;
	}

	// 最后清理一些可能未关闭的流连接
	handle.check();

	return (NULL);
}

// 消息服务器接收到的客户端连接流类定义
class test_client2 : public ipc_client
{
public:
	test_client2()
	{

	}

	~test_client2()
	{

	}

	virtual void on_close()
	{
		delete this;
	}

	// 接收到消息客户端发来消息的回调函数
	virtual void on_message(int nMsg, void*, int)
	{
		std::cout << "test_client2 on message:" << nMsg << std::endl;

		// 如果收到消息客户端要求退出的消息,则主线程了退出
		if (nMsg == MSG_STOP)
		{
			this->close();

			// 通知主线程的非阻塞引擎关闭
			this->get_handle().stop();
		}
		else
			// 回应客户端消息
			this->send_message(MSG_RES, NULL, 0);
	}
protected:
private:
};

// 主线程的消息服务器 ipc 服务监听类定义
class test_server : public ipc_server
{
public:
	test_server()
	{

	}

	~test_server()
	{

	}

	// 消息服务器接收到消息客户端连接时的回调函数	
	void on_accept(aio_socket_stream* client)
	{
		// 创建 ipc 连接对象
		ipc_client* ipc = new test_client2();

		// 打开异步IPC过程
		ipc->open(client);

		// 添加消息回调对象
		ipc->append_message(MSG_REQ);
		ipc->append_message(MSG_STOP);
		ipc->wait();
	}
protected:
private:
};

static void usage(const char* procname)
{
	printf("usage: %s -h[help] -t[use thread]\n", procname);
}

int main(int argc, char* argv[])
{
	int   ch;
	bool  use_thread = false;

	while ((ch = getopt(argc, argv, "ht")) > 0)
	{
		switch (ch)
		{
		case 'h':
			usage(argv[0]);
			return (0);
		case 't':
			use_thread = true;
			break;
		default:
			break;
		}
	}

	acl_init();

	aio_handle handle;

	ipc_server* server = new test_server();

	// 使消息服务器监听 127.0.0.1 的地址
	if (server->open(&handle, "127.0.0.1:0") == false)
	{
		delete server;
		std::cout << "open server error!" << std::endl;
		getchar();
		return (1);
	}

	char  addr[256];
#ifdef WIN32
	_snprintf(addr, sizeof(addr), "%s", server->get_addr());
#else
	snprintf(addr, sizeof(addr), "%s", server->get_addr());
#endif

	if (use_thread)
	{
		// 使消息客户端在子线程中单独运行
		acl_pthread_t tid;
		acl_pthread_create(&tid, NULL, thread_callback, addr);
	}

	// 因为消息客户端也是非阻塞过程,所以也可以与消息服务器
	// 在同一线程中运行
	else
		client_main(&handle, addr);

	// 主线程的消息循环过程
	while (true)
	{
		if (handle.check() == false)
		{
			std::cout << "stop now!" << std::endl;
			break;
		}
	}

	delete server;
	handle.check();  // 清理一些可能未关闭的异步流对象

	std::cout << "server stopped!" << std::endl;
	getchar();
	return (0);
}

 

 

      以上例子相对简单,其展示的消息服务器与消息客户端均是非阻塞过程,其实将上面的异步消息客户端稍微一改便可以改成同步消息客户端了,修改部分如下:

 

 

class test_client3 : public ipc_client
{
public:
	test_client3()
	{

	}

	~test_client3()
	{

	}

	virtual void on_open()
	{
		// 添加消息回调对象
		this->append_message(MSG_RES);

		// 向消息服务器发送请求消息
		this->send_message(MSG_REQ, NULL, 0);

		// 同步等待消息
		wait();
	}

	virtual void on_close()
	{
		delete this;
	}

	virtual void on_message(int nMsg, void*, int)
	{
		std::cout << "test_client3 on message:" << nMsg << std::endl;
		this->send_message(MSG_STOP, NULL, 0);
		this->delete_message(MSG_RES);
		this->close();
	}
protected:
private:
};

// 子线程处理过程

static bool client_main(const char* addr)
{
	// 创建消息客户端对象
	ipc_client* ipc = new test_client3();
	
	// 同步方式连接消息服务器
	if (ipc->open(addr, 0) == false)
	{
		std::cout << "open " << addr << " error!" << std::endl;
		delete ipc;  // 当消息客户端未成功创建时需要在此处删除对象
		return (false);
	}

	return (true);
}

static void* thread_callback(void *ctx)
{
	const char* addr = (const char*) ctx;

	if (client_main(addr) == false)
		return (NULL);
	return (NULL);
}

 

   对比 test_client1 与 test_client_3 两个消息客户端,可以发现二者区别并不太大,关键在于调用 open 时是采用了异步还是同步连接消息服务器,其决定了消息客户端是异步的还是同步的。

 

      示例代码:samples/aio_ipc

 

      个人微博:http://weibo.com/zsxxsz

      acl_cpp 下载:http://sourceforge.net/projects/acl/

      原文地址:http://zsxxsz.iteye.com/blog/1495832

      更多文章:http://zsxxsz.iteye.com/

      QQ 群:242722074

分享到:
评论

相关推荐

    acl_cpp:用于win32 / linux,服务器框架,HttpServlet的功能强大的c ++库-开源

    acl_cpp(已包含在acl项目中:https://sourceforge.net/projects/acl/,请从acl项目url下载)是acl的c ++包装库,并且acl_cpp比...使用acl_cpp,您将获得更强大的acl功能,并且可以快速开发,进行模块编程,祝您好运!

    基于acl库封装c\c++ 的redis-client客户端源码

    redis-acl\lib_acl_cpp\samples\redis路径下,把lib_acl_vc2010d.lib、lib_acl_cpp_vc2010d.lib放到 \redis-acl\lib_acl_cpp\samples\redis\redisclient路径下: 依赖的库 需要下载开源 源码编出来 : svn://svn....

    acl_cpp:一个强大的 c++ 库,用于 win32/linux、服务器框架、HttpServlet-开源

    acl_cpp(已包含在acl项目:https://sourceforge.net/projects/acl/,请从acl项目url下载)是acl的c++封装库,acl_cpp比acl有很多有用的功能...有了acl_cpp,你会得到更强大的acl功能,快速开发,模块编程,祝你好运!

    acl-master.zip_ACL库_c++ acl_git acl-master

    acl_cpp 是基于 acl 库的 C++ 库,包括 MIIME 解析、Handlersocket 客户端库、数据库连接池(支持mysql/sqlite)、WEB 编程、数据库编程、阻塞/非阻塞数据流等内容。

    acl.2.1.2.8.src.2012.7.2.zip_ACL_ACL_http_http json_协议

    同步模式适用于简单请求-响应的交互,而异步模式则适用于需要处理大量并发请求的情况,通过非阻塞I/O和事件通知机制提高性能。 3. **常用数据结构**:为了方便数据处理,ACL提供了多种高效的数据结构,如链表、队列...

    CISCO_ACL__思科_访问控制列表

    CISCO_ACL__思科_访问控制列表

    acl.zip_ACL_权限控制

    在IT行业中,访问控制列表(Access Control List,简称ACL)是一种常见的权限管理机制,用于定义哪些用户或系统实体可以访问特定资源。在这个“acl.zip_ACL_权限控制”的压缩包中,我们有四个PHP文件:p2.php、acl_1...

    acl.rar_ACL_Agent_jade_jade agent_jade java

    在这个“ACL_Agent_jade_jade_agent_jade_java”主题中,我们将探讨如何在JADE上构建和配置agent,以及它们如何使用ACL进行通信。 首先,了解JADE的基本结构是至关重要的。JADE由几个核心组件组成,包括:Agent容器...

    Python库 | acl_iitbbs-0.1-py3-none-any.whl

    Python库的设计通常遵循模块化原则,因此`acl_iitbbs`可能包含多个子模块,每个子模块都有特定的功能,如登录认证、帖子检索、分类浏览、用户管理等。为了深入了解这个库,我们需要查看其文档或源代码,了解它的API...

    Acl_pong-master.zip_it

    首先,"Acl_pong-master"这个名字暗示了这是一个与访问控制列表(Access Control List,简称ACL)相关的项目,可能是为了实现特定的游戏权限管理或者安全机制。然而,"pong"通常指的是经典的乒乓游戏,这种游戏在...

    acl_mysql_util

    acl_mysql_util是一个针对MySQL数据库和ACL(Access Control List,访问控制列表)的实用工具库,它为开发者提供了方便的接口,以便在应用中更高效、更安全地进行数据操作。这个库是基于MySQL的原生接口进行封装,...

    posix_acl.rar_V2

    通常,这样的代码会包含如`acl_get_file()`, `acl_set_file()`, `acl_init()`, `acl_delete_def_file()`等函数,用于获取、设置、初始化和删除默认ACL。此外,还可能有处理错误的函数,以及与系统调用交互的接口。 ...

    acl C++跨平台库.rar

    lib_acl_cpp 库的 db 模块依赖于 mysql 客户端库、sqlite 库,stream 流模块依赖于 polarssl 库(该库源码附在 acl/resource 目录下),另外,在 UNIX/LINUX 平台下还需要压缩库 --- libz 库(一般 LINUX 会自带该...

    acl.rar_ACL_control

    在IT行业中,访问控制列表(Access Control List,简称ACL)是一种重要的安全机制,它用于管理对资源的访问权限。QEMU,全称Quick Emulator,是一个功能强大的开源虚拟化软件,能够模拟各种CPU架构和硬件环境。在...

    acl-3.0.19

    整个 acl 项目主要包含三个函数库:lib_acl(纯C开发的基础库,主要包含网络通信及服务器编程框架以及其它丰富的功能)、lib_protocol(包含 HTTP/PING/SMTP 通信协议的C语言实现)、lib_acl_cpp(基于 lib_acl 及 lib_...

    acl_QC实用工具.ppt

    acl_QC实用工具.ppt

    posix_acl.rar_aclfun

    2. `acl_get_entry()`: 此函数用于获取ACL中的特定条目,可能用于读取已有的权限设置。 3. `acl_set_entry()`: 这个函数用来修改ACL中的条目,比如更新特定用户的权限。 4. `acl_add_entry()`: 添加新的ACL条目,...

    luyouqi.rar_ACL_路由器

    本压缩包文件"luyouqi.rar_ACL_路由器"提供了关于标准ACL配置的详细资料,旨在帮助用户理解和实施路由器的ACL设置。 首先,我们来看"标准ACL"的概念。标准ACL是基于源IP地址进行过滤的规则集合,允许或拒绝特定网络...

    ACL_MAIN

    综上所述,"ACL_MAIN"涉及的核心知识点包括JavaScript编程、访问控制列表的原理和实现、Web应用的中间件机制以及可能的数据结构和算法。具体的实现细节将依赖于`acl_main.js`的源码内容,包括如何定义用户、角色、...

Global site tag (gtag.js) - Google Analytics