`
zsxxsz
  • 浏览: 446501 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

ipc_service 类:阻塞与非阻塞混合编程

 
阅读更多

      非阻塞编程主要解决了网络通讯中高并发的问题,采用非阻塞方式,服务器不必为每个连接启动单独的进程或线程,从而大大地减少了系统资源的浪费;但是现实网络应用中,阻塞应用又是不可避免的,如我们对数据库编程时使用的数据操作的客户端库本身就是阻塞的。因此,单纯的非阻塞模式或阻塞模式均不能很好地胜任互联应用,如果能够将一些必要的阻塞过程融合进非阻塞过程中将会是一个现实的需求。本文主要介绍了如何保证在使用 acl_cpp 的非阻塞框架的同时,可以把阻塞的过程与非阻塞过程进行整合。关于非阻塞编程,可以参考 acl_cpp开发--非阻塞网络编程 中的章节。

      本文讨论的内容是建立在 acl_cpp 非阻塞模块的IPC通信机制 内容的基础之上,有两个基础类:ipc_service 类用来粘合阻塞与非阻塞过程,ipc_request 做为阻塞调用过程的基础类,提供了需要子线程中进行阻塞式过程的虚方法。

      ipc_service 类的继承关系图如下:

 

 

      可以看出,ipc_service 是从 ipc_server 类继承而来,在 ipc_service 中定义了受保护的成员方法: request(ipc_request*) ,它将被子类对象调用向子线程或线程池发送任务请求;同时在 ipc_service 中还专门针对基于 win32 消息窗口定义了虚方法:win32_proc,该虚方法被子类继承后用于主窗口过程接收来自于子线程的结果处理消息;另外,在该图中还专门封装了三个实际应用的子类 http_service、db_service、dns_service。在 ipc_service 的构造函数可以指定任务线程池中最大线程数,同时还可以指定是否采用基于 win32 窗口消息的 IPC 通信机制。

 

      ipc_request 类的继承关系图如下:

 

      ipc_request 的类对象在独立的子线程中运行,运行结果通过 ipc_client 通道给非阻塞的主线程发送消息(对于win2的窗口过程,可以发送窗口消息,此时 ipc_service 的类对象将会通过 win32_proc 虚方法接收消息)。另外,ipc_request 的两个重载方法: run(ipc_client*) 和 run(HWND) 都是在子线程中运行的。

      程序执行的流程为:

      1)在主线程中创建 ipc_service 对象实例,并监听某一端口:ipc_service::open(aio_handle*, const char* addr="127.0.0.1:0") ---> 主线程进行异步事件循环过程:while (true) { aio_handle::check(); }

      2)在主线程中创建 ipc_request 对象实例,并创建需要在子线程中处理的阻塞式任务 ---> 在主线程 ipc_service 的对象中调用 ipc_service 的异步请求方法 request(ipc_request*) 将阻塞请求任务 ipc_request 传递给子线程

      3)在子线程中调用 ipc_request::run 阻塞过程处理用户的请求

      4)在子线程中调用 ipc_client::send_message 方法将结果数据以 ipc 消息的方式通知主线程

      5)在主线程中接收到来自于子线程的结果处理消息后,将结果传递给用户过程

 

      以下以 dns_service 为例,详细讲解 ipc_service 与 ipc_request 在主线程和子线程中的行为过程:

      1)主线程中 dns_service 的主要代码如下:

 

 

	dns_service::dns_service(int nthread /* = 1 */, bool win32_gui /* = false */)
		: ipc_service(nthread, win32_gui)
	{

	}

	dns_service::~dns_service()
	{

	}

	// 主线程的 dns_service 对象接收到子线程消息的 ipc 连接请求时的回调函数	
	void dns_service::on_accept(aio_socket_stream* client)
	{
		// 创建 ipc_client 消息通道,同时用异步连接流作为通讯对象
		ipc_client* ipc = NEW dns_ipc(this);
		ipc->open(client);

		// 添加消息回调对象
		ipc->append_message(IPC_RES);
		// 异步等待来自于子线程的消息
		ipc->wait();
	}

#ifdef WIN32

	// 当接收 WIN32 窗口消息时的回调过程
	void dns_service::win32_proc(HWND hWnd, UINT nMsg, WPARAM wParam, LPARAM lParam)
	{
		if (nMsg != IPC_RES + WM_USER)
		{
			logger_error("invalid nMsg(%d)", nMsg);
			return;
		}
		else if (lParam == 0)
		{
			logger_error("lParam invalid");
			return;
		}

		DNS_IPC_DATA* dat = (DNS_IPC_DATA*) lParam;
		dns_res* res = dat->res;

		on_result(*res);
		delete res;

		// 在采用 WIN32 消息时该对象空间是动态分配的,所以需要释放
		acl_myfree(dat);
	}

#endif

	// 用户调用此函数查询某个域名,需要查询的域名及域名查询结果
	// 均在 dns_result_callback 对象中记录
	void dns_service::lookup(dns_result_callback* callback)
	{
		// 如果发现有针对相同域名的查询过程,只需要将本查询过程
		// 下相同域名的查询过程合并,加入原来的查询过程的列表中,
		// 减少针对同一域名的查询次数
		std::list<dns_result_callback*>::iterator it;
		const char* domain = callback->get_domain().c_str();

		for (it= callbacks_.begin(); it != callbacks_.end(); it++)
		{
			if ((*it)->get_domain() == domain)
			{
				callbacks_.push_back(callback);
				return;
			}
		}

		callbacks_.push_back(callback);

		ipc_request* req = NEW dns_request(domain);

		// 调用基类 ipc_service 请求过程
		request(req);
	}

	// 查询结果的回调函数
	void dns_service::on_result(const dns_res& res)
	{
		std::list<dns_result_callback*>::iterator it, next;

		it= callbacks_.begin();
		for (; it != callbacks_.end();)
		{
			next = it;
			next++;
			if ((*it)->get_domain() == res.domain_.c_str())
			{
				// 通知请求对象的解析结果
				(*it)->on_result((*it)->get_domain(), res);
				(*it)->destroy(); // 调用请求对象的销毁过程
				callbacks_.erase(it);
				it = next;
			}
			else
				it++;
		}
	}

 

 

      2)在 dns_service::on_accept 函数中创建 ipc_client 通道的子类定义如下:

 

	class dns_ipc : public ipc_client
	{
	public:
		dns_ipc(dns_service* server)
			: server_(server)
		{

		}

		~dns_ipc()
		{

		}

		// 在主线程接收到来自于子线程的消息时的回调函数
		// 因为在 dns_service::on_accept 中将该类对象
		// 与相应的消息号进行了绑定
		virtual void on_message(int nMsg acl_unused,
			void* data, int dlen acl_unused)
		{
			if (nMsg != IPC_RES)
			{
				logger_error("invalid nMsg(%d)", nMsg);
				this->close();
				return;
			}

			// 转换子线程传来的数据内容
			DNS_IPC_DATA* dat = (DNS_IPC_DATA*) data;
			dns_res* res = dat->res;

			// 调用主线程中的结果接收过程
			server_->on_result(*res);
			delete res;
		}
	protected:
		virtual void on_close()
		{
			// 因为该类对象在 dns_service::on_accept 中是动态创建的,
			// 所需要当 ipc_client 通道关闭时需要自行释放所占内存
			delete this;
		}
	private:
		dns_service* server_;
	};

 

 

      3)dns_request 类定义如下(该类对象是在主线程中创建的,但其中的阻塞方法:run 却是在子线程中运行的):

 

	// 由子线程传递给主线程的数据类型
	struct DNS_IPC_DATA
	{
		dns_res* res;
	};


	// 由子线程传递给主线程的消息号
	typedef enum
	{
		IPC_RES
	};

	///////////////////////////////////////////////////////////////////////

	class dns_request : public ipc_request
	{
	public:
		dns_request(const char* domain)
			: domain_(domain)
		{

		}

		~dns_request()
		{

		}
	
		// 当 dns_request 类对象在 dns_service::lookup 方法中通过 
		// request(ipc_request) 过程传递给子线程后,子线程便会连接
		// 主线程中 dns_service 监听的消息服务器地址,连接成功后调用
		// dns_request::run 虚方法,同时将 ipc_client 通道传入

		virtual void run(ipc_client* ipc)
		{
			// 阻塞式查询域名
			ACL_DNS_DB* db = acl_gethostbyname(domain_.c_str(), NULL);

			// 将查询结果放在自定义的结构中
			data_.res = NEW dns_res(domain_.c_str());

			if (db != NULL)
			{
				ACL_ITER iter;
				acl_foreach(iter, db)
				{
					ACL_HOSTNAME* hn = (ACL_HOSTNAME*) iter.data;
					data_.res->ips_.push_back(hn->ip);
				}

				acl_netdb_free(db);
			}

			// 向主线程发送结果
			ipc->send_message(IPC_RES, &data_, sizeof(data_));

			// 销毁本类对象,因为其是动态分配的
			delete this;
		}

#ifdef WIN32

		// 基类虚接口,使子线程可以在执行完任务后向主线程发送 WIN32 窗口消息

		virtual void run(HWND hWnd)
		{
			ACL_DNS_DB* db = acl_gethostbyname(domain_.c_str(), NULL);
			DNS_IPC_DATA* data = (DNS_IPC_DATA*)
				acl_mymalloc(sizeof(DNS_IPC_DATA));
			data->res = NEW dns_res(domain_.c_str());

			if (db != NULL)
			{
				ACL_ITER iter;
				acl_foreach(iter, db)
				{
					ACL_HOSTNAME* hn = (ACL_HOSTNAME*) iter.data;
					data->res->ips_.push_back(hn->ip);
				}

				acl_netdb_free(db);
			}

			// 向主线程发送结果
			::PostMessage(hWnd, IPC_RES + WM_USER, 0, (LPARAM) data);

			// 销毁本类对象,因为其是动态分配的
			delete this;
		}
#endif
	private:
		string  domain_;
		DNS_IPC_DATA data_;
	};

 

 

      通过以上三个类的实现便完成了主线程的非阻塞过程与子线程的阻塞过程的结合。下面是一个具体的使用如上 dns_service 类的例子:

 

 

#ifdef WIN32
#include "acl/lib_acl.h"
#else
#include "lib_acl.h"
#include <getopt.h>
#endif
#include <iostream>
#include "aio_handle.hpp"
#include "dns_service.hpp"

using namespace acl;

static void usage(const char* procname)
{
	printf("usage: %s -h[help] -t[use thread]\n", procname);
}

static int __ncount = 0;

class dns_result : public dns_result_callback
{
public:
	dns_result(const char* domain)
		: dns_result_callback(domain)
	{

	}

	~dns_result()
	{

	}

	virtual void destroy()
	{
		delete this;
		__ncount--;
	}

	virtual void on_result(const char* domain,  const dns_res& res)
	{
		std::cout << "result: domain: " << domain;
		if (res.ips_.size() == 0)
		{
			std::cout << ": null" << std::endl;
			return;
		}

		std::cout << std::endl;

		std::list<string>::const_iterator cit = res.ips_.begin();
		for (; cit != res.ips_.end(); cit++)
			std::cout << "\t" << (*cit).c_str();
		std::cout << std::endl;
	}
};

int main(int argc, char* argv[])
{
	int   ch, nthreads = 2;

	while ((ch = getopt(argc, argv, "ht:")) > 0)
	{
		switch (ch)
		{
		case 'h':
			usage(argv[0]);
			return (0);
		case 't':
			nthreads = atoi(optarg);
			break;
		default:
			break;
		}
	}

	acl_init();

	aio_handle handle;

	const char* domain = "www.baidu.com";
	dns_service* server = new dns_service(nthreads);

	// 使消息服务器监听 127.0.0.1 的地址
	if (server->open(&handle) == false)
	{
		delete server;
		std::cout << "open server error!" << std::endl;
		getchar();
		return (1);
	}

	// 创建查询结果接收对象,并进行查询

	dns_result* result = new dns_result(domain);
	server->lookup(result);
	__ncount++;

	result = new dns_result(domain);
	server->lookup(result);
	__ncount++;

	result = new dns_result(domain);
	server->lookup(result);
	__ncount++;

	domain = "www.sina.com";
	result = new dns_result(domain);
	server->lookup(result);
	__ncount++;

	domain = "www.51iker.com";
	result = new dns_result(domain);
	server->lookup(result);
	__ncount++;

	domain = "www.hexun.com";
	result = new dns_result(domain);
	server->lookup(result);
	__ncount++;

	domain = "www.com.dummy";
	result = new dns_result(domain);
	server->lookup(result);
	__ncount++;

	result = new dns_result(domain);
	server->lookup(result);
	__ncount++;

	result = new dns_result(domain);
	server->lookup(result);
	__ncount++;

	// 异步消息循环过程
	while (true)
	{
		if (handle.check() == false)
		{
			std::cout << "stop now!" << std::endl;
			break;
		}
		if (__ncount == 0)
			break;
	}

	// 销毁 dns_service 动态对象
	delete server;
	// 做最后的清理工作,以释放延迟释放的连接对象
	handle.check();

	std::cout << "server stopped!" << std::endl;
	getchar();
	return (0);
}

 

      个人微博:http://weibo.com/zsxxsz

      示例代码:samples/aio_dns

      acl_cpp 下载:http://sourceforge.net/projects/acl/

      原文地址:http://zsxxsz.iteye.com/blog/1482208

      更多文章:http://zsxxsz.iteye.com/

      QQ 群:242722074

分享到:
评论

相关推荐

    Tuya_IPC_SDK_接口说明文档1

    * 修改接口:tuya_ipc_upgrade_progress_report、tuya_ipc_ss_donwload_pre、tuya_ipc_ss_download_set_status、tuya_ipc_ai_detect_storage_pause、tuya_ipc_ai_detect_storage_resume、tuya_ipc_notify_with_event...

    Nodejs cluster报错Error [ERR_IPC_CHANNEL_CLOSED]: channel closed

    2019-12-16 14:33:02,pid-170096, Error [ERR_IPC_CHANNEL_CLOSED]: channel closed at ChildProcess.target.send (internal/child_process.js:578:16) at Worker.send (internal/cluster/worker.js:54:28) 经...

    IPC_E1_CN_STD_5.5.55_190128_海康威视升级软件_55_

    标题中的"IPC_E1_CN_STD_5.5.55_190128_海康威视升级软件_55_"表明这是一款海康威视IPC(网络摄像机)的E1型号的固件升级包,版本号为5.5.55,发布日期为2019年1月28日。"海康威视"是中国知名的安防监控设备制造商,...

    「移动安全」IPC_You_Outside_the_Sandbox:One_bug_to_Rule_the_Chrome_

    「移动安全」IPC_You_Outside_the_Sandbox:One_bug_to_Rule_the_Chrome_Broker - 数据治理 防火墙 安全人才 APT 安全意识 应急响应

    信息安全_数据安全_IPC_You_Outside_the_Sandbox:One_.pdf

    AppCache的进程间通信(IPC)接口被描述为攻击者可能利用的另一种途径,因为它允许与浏览器进程进行交互。 此外,文档还暗示了使用AI和云数据库作为自动化情报处理和安全解决方案的一部分,可能涉及实时威胁检测、...

    ipc.zip_IPC_IPC CREATE

    本文将详细解析标题“ipc.zip_IPC_IPC CREATE”中涉及的三个主要函数:`msq_create_ipc`、`msq_send_ipc`和`msq_receive_ipc`,它们与消息队列这一特定的IPC形式密切相关。 首先,我们要理解消息队列是一种存储有限...

    IPC_JEDEC JP002:2006

    IPC_JEDEC JP002:2006 Current Tin Whiskers Theory and Mitigation Practices Guideline(目前的锡须理论和缓解措施指南) - 完整英文电子.pdf

    ipc.rar_IPC_IPC Linux_ipc 进程通信_linux ipc_linux 多进程

    在Linux操作系统中,IPC(Inter-Process Communication,进程间通信)是多个进程间进行数据交换和协调工作的重要机制。这个名为“ipc.rar”的压缩包包含了关于IPC在Linux环境下的实践案例,主要使用C语言实现。下面...

    IPC.rar_IPC_java i_java ipc_java共享内存_共享内存

    标题“IPC.rar_IPC_java i_java ipc_java共享内存_共享内存”以及描述“IPC共享内存,文件映射编程,实现原理详解”都指向了一个核心主题:Java中的进程间通信(IPC)以及如何利用共享内存进行数据交换。在这个话题中...

    IPC.rar_ipc 多核_ti dsp_多核_IPC_多核IPC通信_核间IPC

    标题中的“IPC.rar_ipc 多核_ti dsp_多核_IPC_多核IPC通信_核间IPC”揭示了本文将探讨的主题,主要集中在基于TI DSP(数字信号处理器)的多核系统中,如何实现进程间通信(IPC, Inter-Process Communication)以及...

    ipc.rar_ ipc_IPC_linux ipc_pipes_进程间通信ipc

    在Linux操作系统中,进程间通信(IPC,Inter-Process Communication)是多个进程之间进行数据交换的重要机制。本文将深入探讨几种常见的Linux IPC机制,包括管道(Pipes)、命名管道(Named Pipes, FIFOs)、信号...

    IPC.rar_IPC_ipc ccs_多核通信_多核间通信_核间通信

    中断允许处理器暂停当前的任务,处理突发事件,然后返回到原来的任务,这种非阻塞的方式提高了系统的实时性和效率。中断处理通常包括中断请求、中断向量查找、中断处理程序执行以及中断恢复四个步骤。在多核环境下,...

    ipc.rar_IPC_linux msgrcv_msgrcv_msgsnd_发送消息 linux

    本压缩包文件“ipc.rar”包含了与Linux IPC相关的示例代码,特别是使用消息队列进行通信的方法。消息队列允许进程将数据结构(如字符串或结构体)作为消息发送,并由其他进程接收,提供了异步通信的能力。 标题...

    iPC_D3.1:WP3 D3.1中执行的分析代码

    在iPC_D3.1框架中对基因表达数据集进行反卷积 Jane Merlevede和Andrei Zinovyev `r format(Sys.time(),'%​​m /%d /%y')` bookdown :: html_document2 托克 toc_depth number_sections df_print 真的...

    IPC009_3.5.1_0217.zip

    【标题】"IPC009_3.5.1_0217.zip" 提供的是小米智能摄像机云台版的固件更新或者数据包,这个命名格式通常表示产品型号(IPC009)结合版本号(3.5.1)以及发布日期(0217,可能是2021年2月17日)。在IT领域,这种命名...

    Android编程IPC_培训视频.7z

    在Android系统中,IPC(Inter-Process Communication)是不同应用程序之间进行通信的一种技术,它允许一个应用进程与另一个运行在不同内存空间的应用进程交互。在Android的架构中,由于安全性和资源隔离的原因,每个...

    IPC1A__2S__201318666:练习 3

    标题 "IPC1A__2S__201318666: 练习 3" 提到的可能是某个编程课程的作业或项目,它属于 "编程与计算导论 1" 的 "A" 部分,该课程可能在2014年第二学期进行,由 Celeste Marilú Duarte Amaya 教授,学生编号为 ...

    android_system_service_example:Android IPC系统服务示例

    # In AOSP topdir## sourcesource build/envsetup.sh## lunchlunch XXXX# to source codecd android_system_service_example# Compilemm使用Android SDK和NDK 系统服务绑定程序API是用于自定义ROM的黑客技术,因此...

Global site tag (gtag.js) - Google Analytics