`
sogo6
  • 浏览: 113988 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

使用ACE_Proactor模式,在使用ACE_Asynch_Write_Stream.write发送数据的问题

C++ 
阅读更多
1. 使用ACE_Proactor模式,客户端实现代码如下:


void TClientHandler::open(ACE_HANDLE handle, ACE_Message_Block &msgBlock)
{  
    this->handle(handle);
    int size = 0;  
    int buf_len = sizeof(int);  
    ACE_OS::getsockopt(handle, SOL_SOCKET, SO_RCVBUF, (char*)(&size), &buf_len);
    iLog.info("Receive size is %d", size);
    size = size * 10;  
    ACE_OS::setsockopt(handle, SOL_SOCKET, SO_RCVBUF, (const char*)(&size), sizeof(int));

    size = 0;
    ACE_OS::getsockopt(handle, SOL_SOCKET, SO_SNDBUF, (char*)(&size), &buf_len);
    iLog.info("Send size is %d", size);
    size = size * 10;
    ACE_OS::setsockopt(handle, SOL_SOCKET, SO_SNDBUF, (const char*)(&size), sizeof(int));

    if (this->m_reader.open (*this) != 0)
    {
        delete this;
        return;
    }  
    if (this->m_writer.open (*this) != 0)
    {
        delete this;
        return;
    }

    ACE_Message_Block *mb = new ACE_Message_Block(m_msgBuffer, MSG_BUFFER_LEN);
    if (this->m_reader.read (*mb, mb->space ()) != 0)
    {
        delete this;
        return;
    }

    //  向tmc发送注册命令
    TMessage *tmsg = new (sizeof(TMessage))TMessage;
    tmsg->msgHeader.version = TVERSION_BASE;
    tmsg->msgHeader.cmdCode = CMD_LOGIN;
    tmsg->msgHeader.sendProcType = getLocalProcType();
    tmsg->msgHeader.sendProcHandle =  getLocalProcHandle();
    this->write_message(tmsg, sizeof(TMessage));

    return;
}

//异步读完成后会调用此函数  
void TClientHandler::handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
{  
    ACE_Message_Block &mbb = result.message_block();  
    if (!result.success () || result.bytes_transferred () == 0)
    {
        // 知道remoteaddress是哪个?
        mbb.release();
        delete this;
        return;
    }
    
    ACE_Message_Block *mb = NULL;
    do 
    {
        // 用这种方法尽量保证可以clone到数据
        mb = mbb.clone();
    } while (NULL == mb);

    mbb.release();

    // 根据消息类型,将消息放入到不同的队列里面
    TMessage *tmsg = NULL;
    tmsg = (TMessage *)mb->base();

    // 每个接收的消息都记录一下日志
    //writeLog(tmsg);
    if (MT_RESPONSE == tmsg->msgHeader.msgType)
    {
        // 如果是响应消息就放入到响应队列里
        TResponseQueue::instance()->enqueue(mb);
        //iLog.info("Put message to response queue, retCode=%d", retCode);
    }
    else
    {
        // 走到这里都是请求消息或者通知消息,还有异步请求与异步响应
        if (CMD_LOGIN == tmsg->msgHeader.cmdCode)
        {
            // 如果是命令码为CMD_LOGIN的通知消息,先处理一下,再放入消息队列
            iLog.info("Login notice, procIds=%s length=%d", tmsg->msgContent, tmsg->msgHeader.length);
            TClientHandlerMgr::instance()->addAll(this, tmsg->msgContent);
            // 这个消息不需要放入队列,在这里release掉
            mb->release();

            // 发异步消息取相应TMC上所有连接的进程
            TMessage *mymsg = new (sizeof(TMessage))TMessage;
            mymsg->msgHeader.version = TVERSION_BASE;
            mymsg->msgHeader.msgType = MT_ASYN_REQUEST;  // 发送异步请求消息
            mymsg->msgHeader.cmdCode = CMD_GETPROCINTMC;
            mymsg->msgHeader.sendProcType = getLocalProcType();
            mymsg->msgHeader.sendProcHandle =  getLocalProcHandle();
            this->write_message(mymsg, sizeof(TMessage));

        }
        else if (CMD_GETPROCINTMC == tmsg->msgHeader.cmdCode)
        {
            iLog.info("Process in tmc, procIds=%s length=%d", tmsg->msgContent, tmsg->msgHeader.length);
            TClientHandlerMgr::instance()->addAll(this, tmsg->msgContent);

            // 这个消息不需要放入队列,在这里release掉
            mb->release();
        }
        else
        {
            // 如果是同步请求消息,异步请求,通知消息和,异步请求,异步响应,就放入TikyTaskMgr队列
            TikyTaskMgr *taskMgr = TikyAppMgr::instance()->getTaskMgr(DEFAULT_TASKMGRID);
            if (NULL != taskMgr)
            {
                taskMgr->putq(mb);
                //iLog.info("Put messaget to taskmgr, retCode=%d", retCode);
            }
            else
            {
                iLog.error("TaskMgr(%d) is not found...", DEFAULT_TASKMGRID);
                
                // 回送一个响应消息,没有找到TaskMgr
                TMessage *rspMsg = new (sizeof(TMessage))TMessage;
                rspMsg->msgHeader.msgType = MT_RESPONSE;
                rspMsg->msgHeader.sendProcType = tmsg->msgHeader.recvProcType;
                rspMsg->msgHeader.sendProcHandle = tmsg->msgHeader.recvProcHandle;
                rspMsg->msgHeader.recvProcType = tmsg->msgHeader.sendProcType;
                rspMsg->msgHeader.recvProcHandle = tmsg->msgHeader.sendProcHandle;
                rspMsg->msgHeader.sn = tmsg->msgHeader.sn;
                // 没有TaskMgr,就回送一个错误信息
                rspMsg->msgHeader.retCode = ERR_TCOM_NOTFOUNDTASKMGR;
                this->write_message(rspMsg, sizeof(TMessage));
                mb->release();
            }
        }
    }

    ACE_Message_Block *nmb = new ACE_Message_Block(m_msgBuffer, MSG_BUFFER_LEN);
    if (this->m_reader.read(*nmb, nmb->space()) != 0)
    {
        nmb->release();
        delete this;
        return;
    }
}

//异步写完成后会调用此函数
void TClientHandler::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
{
    iLog.info("Handle result is %d", result.success());
    ACE_Message_Block &mb = result.message_block();
    iLog.info("Write result=%ld, trans=%d, write=%d, len=%d", result.error(), result.bytes_transferred(),
        result.bytes_to_write(), mb.length());
    TMessage *tmsg = (TMessage *)mb.base();
    // 记录一下发送消息日志
    writeLog(tmsg);
    delete tmsg;
    mb.release();
    return;
}

int TClientHandler::write_message(TMessage *tmsg, unsigned short msgLength)
{
    if (NULL == tmsg)
    {
        iLog.error("[TClientHandler::write_message] tmsg is NULL.");
        return -1;
    }

    ACE_Message_Block *mbb = new ACE_Message_Block((const char *)tmsg, msgLength);
    // 要设置一下wr_ptr的指针,要不mbb->length()返回的会是0
    mbb->wr_ptr(msgLength);
    int retCode = this->m_writer.write(*mbb, mbb->length());
    if (retCode != 0)
    {
        iLog.error("Send msg failure, lasterror=%d, length=%d, retCode=%d", ACE_OS::last_error(), mbb->length(), retCode);
        delete tmsg;
        mbb->release();
        delete this;
    }
    return retCode;
}


写一个循环不间断调用write_message发送数据,会看到发送的数据,会用一个TCP报文发出去,不知道有没有人遇到相同的问题?
分享到:
评论

相关推荐

    ACE_Proactor网络通信示例代码

    这个压缩包包含的是一组使用ACE_Proactor实现TCP通信的示例代码,我们可以从这些文件中学习到如何在C++中运用ACE库来处理网络通信。 首先,`Tcp.cpp`可能是主程序入口,它可能负责初始化ACE_Proactor,并创建必要的...

    ACE_Proactor TCP协议通信示例代码

    2. **ACE_Proactor模式**:ACE_Proactor是基于事件的非阻塞I/O模型,它使用了异步事件驱动的编程模型。在该模型中,I/O操作是启动后立即返回的,然后由Proactor在后台处理,当操作完成时,Proactor通过回调函数通知...

    tpd_reactor_proactor.pdf

    "Reactor 和 Proactor 模式在网络编程中的应用" Reactor 和 Proactor 模式是两种常见的事件处理模式,在网络编程中广泛应用于设计高效、可靠的并发和网络应用程序。在本文中,我们将详细介绍 Reactor 和 Proactor ...

    ACE.zip_ACE_ACE p_ACE source code

    标题中的"ACE.zip_ACE_ACE p_ACE source code"表明这是一个包含ACE库源代码的压缩文件。"ACE p"可能是指"ACE Proactor",它是ACE库的一部分,提供了异步事件处理的能力。"ACE source code"则直接指明了压缩包内的...

    ACE.zip_ACE 网络_ace_static.dsw_开源_开源软件_金融

    标题中的“ACE.zip_ACE 网络”表明这个压缩包包含的是ACE库的相关文件,而“ace_static.dsw”则是指向一个开发工作空间文件,通常用于Visual Studio环境下编译ACE静态库。 "开源"标签揭示了ACE项目遵循开放源代码的...

    ACE.rar_ACE 网络 编程_visual c

    3. 数据传输:通过ACE_SOCK_Stream读写数据,实现客户端和服务器间的通信。 通过深入阅读和实践"ACE.chm"文档,你将能够掌握ACE的基本用法,理解其设计哲学,从而在实际项目中灵活运用,提升网络编程的能力。

    ACE继承层次图

    - **ACE_Asynch_Write_Stream**:用于异步写入流数据。 #### 五、小结 通过分析ACE继承层次图,我们可以看到ACE框架是如何通过类的继承和组合来构建一个灵活且强大的网络编程工具包的。这些类之间紧密相连,共同...

    对ACE的Proactor通讯模式的全面封装

    Proactor模式是ACE库中提供的一种异步事件处理模型,它允许程序在非阻塞方式下处理I/O操作,提高了系统的并发处理能力。本封装主要目的是使开发者能够在不直接接触ACE底层细节的情况下,利用Proactor模式实现高效的...

    ACE入门详细例子

    Reactor模式是ACE实现异步I/O的关键,它允许应用程序在非阻塞模式下运行,提高了系统效率。 2. **ACE_SOCK**:这是ACE提供的用于TCP和UDP通信的基础类。ACE_SOCK封装了操作系统原生的套接字API,提供了更高级别的...

    C++ Network Programming, Volume 2: Systematic Reuse with ACE and Frameworks

    - 文档中也提到了ACE框架中其他一些组件和类的概述,如ACE_Asynch_Read_Write、ACE_Asynch_Connector等,这些都是ACE框架中用于高效地实现网络通信的工具。 对于期望深入学习和掌握C++网络编程的开发者来说,本书...

    ACE-5.6.zip ACE5.6官网源码

    8. **ACE_Time_Value**:ACE为时间值提供了专门的数据结构,支持时间间隔的计算和比较,方便在定时任务和超时处理中使用。 9. **ACE_SVC_Registry**:服务注册表允许应用程序动态地查找和使用服务,增强了系统的...

    sdf.rar_ACE

    4. **事件处理**:深入讲解ACE的事件驱动编程模型,如Reactor和Proactor模式,以及它们在处理异步事件时的优势。 5. **分布式系统**:介绍ACE如何支持分布式系统,包括命名服务、远程过程调用(RPC)和分布式对象...

    ACE在服务端开发中的应用

    在服务端开发中,ACE 的一个重要应用是使用 ACE_Proactor 模型来处理异步 I/O 操作。与传统的反应器模式不同,ACE_Proactor 遵循前摄式(Proactor)设计模式,它首先启动 I/O 操作,然后等待事件的完成,之后回调...

    ACE开发指南

    - **Acceptor-Connector框架**:在Proactor模式下实现客户端和服务端的连接管理。 - **ACE_Service_Handler**:处理连接请求的服务端处理器。 - **ACE_Asynch_Acceptor**:用于监听并异步接受新的连接请求。 - **...

    ace技术内幕示例代码

    在"ace技术内幕"这本书中,作者深入剖析了ACE的设计理念、核心组件以及使用方法。通过示例代码,读者可以更好地理解和应用这些理论知识。Linux平台的选择是因为它在服务器端开发和嵌入式系统中具有广泛的应用,同时...

    ACE开发指南(初级).

    本指南将重点介绍ACE框架的基础知识、Reactor模型、Proactor模型以及Task框架,并通过示例来展示如何使用这些模型来构建简单的服务器和客户端程序。 ##### 2.3 获取ACE ACE可以从其官方网站下载最新稳定版本。在本...

    ACE入门 编译 配置 中文版

    例如,ACE_Reactor和ACE_Proactor模式用于事件多路分解和分派;ACE_Acceptor-Connector用于支持对象的网络连接;ACE_Task用于定义任务的执行;ACE_Pipes和Filters组件用于创建可插拔和可重用的通信管道。ACE还提供了...

    Windows下采用IOCP实现的ACE的Proactor框架剖析

    **Windows下的IOCP(I/O完成端口)与ACE Proactor框架** IOCP(I/O完成端口,Input/Output Completion Port)是Windows操作系统提供的一种高效、可扩展的异步I/O模型。它允许应用程序处理大量的并发I/O操作,特别...

    ACE-5.4.1.zip

    2. **ACE_SOCK_Stream**: 基于ACE_SOCK,这个类增加了面向流的接口,支持TCP的全双工、顺序化数据传输。它提供了read和write方法,用于读写数据,同时也支持更高级别的IO操作,如异步IO和缓冲IO。 3. **ACE_Reactor...

Global site tag (gtag.js) - Google Analytics