论坛首页 编程语言技术论坛

使用ACE_Proactor模式,在使用ACE_Asynch_Write_Stream.write发送数据的问题

浏览 3609 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
作者 正文
   发表时间:2011-09-25  
C++
1. 使用ACE_Proactor模式,客户端实现代码如下:


void TClientHandler::open(ACE_HANDLE handle, ACE_Message_Block &msgBlock)
{  
    this->handle(handle);
    int size = 0;  
    int buf_len = sizeof(int);  
    ACE_OS::getsockopt(handle, SOL_SOCKET, SO_RCVBUF, (char*)(&size), &buf_len);
    iLog.info("Receive size is %d", size);
    size = size * 10;  
    ACE_OS::setsockopt(handle, SOL_SOCKET, SO_RCVBUF, (const char*)(&size), sizeof(int));

    size = 0;
    ACE_OS::getsockopt(handle, SOL_SOCKET, SO_SNDBUF, (char*)(&size), &buf_len);
    iLog.info("Send size is %d", size);
    size = size * 10;
    ACE_OS::setsockopt(handle, SOL_SOCKET, SO_SNDBUF, (const char*)(&size), sizeof(int));

    if (this->m_reader.open (*this) != 0)
    {
        delete this;
        return;
    }  
    if (this->m_writer.open (*this) != 0)
    {
        delete this;
        return;
    }

    ACE_Message_Block *mb = new ACE_Message_Block(m_msgBuffer, MSG_BUFFER_LEN);
    if (this->m_reader.read (*mb, mb->space ()) != 0)
    {
        delete this;
        return;
    }

    //  向tmc发送注册命令
    TMessage *tmsg = new (sizeof(TMessage))TMessage;
    tmsg->msgHeader.version = TVERSION_BASE;
    tmsg->msgHeader.cmdCode = CMD_LOGIN;
    tmsg->msgHeader.sendProcType = getLocalProcType();
    tmsg->msgHeader.sendProcHandle =  getLocalProcHandle();
    this->write_message(tmsg, sizeof(TMessage));

    return;
}

//异步读完成后会调用此函数  
void TClientHandler::handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
{  
    ACE_Message_Block &mbb = result.message_block();  
    if (!result.success () || result.bytes_transferred () == 0)
    {
        // 知道remoteaddress是哪个?
        mbb.release();
        delete this;
        return;
    }
    
    ACE_Message_Block *mb = NULL;
    do 
    {
        // 用这种方法尽量保证可以clone到数据
        mb = mbb.clone();
    } while (NULL == mb);

    mbb.release();

    // 根据消息类型,将消息放入到不同的队列里面
    TMessage *tmsg = NULL;
    tmsg = (TMessage *)mb->base();

    // 每个接收的消息都记录一下日志
    //writeLog(tmsg);
    if (MT_RESPONSE == tmsg->msgHeader.msgType)
    {
        // 如果是响应消息就放入到响应队列里
        TResponseQueue::instance()->enqueue(mb);
        //iLog.info("Put message to response queue, retCode=%d", retCode);
    }
    else
    {
        // 走到这里都是请求消息或者通知消息,还有异步请求与异步响应
        if (CMD_LOGIN == tmsg->msgHeader.cmdCode)
        {
            // 如果是命令码为CMD_LOGIN的通知消息,先处理一下,再放入消息队列
            iLog.info("Login notice, procIds=%s length=%d", tmsg->msgContent, tmsg->msgHeader.length);
            TClientHandlerMgr::instance()->addAll(this, tmsg->msgContent);
            // 这个消息不需要放入队列,在这里release掉
            mb->release();

            // 发异步消息取相应TMC上所有连接的进程
            TMessage *mymsg = new (sizeof(TMessage))TMessage;
            mymsg->msgHeader.version = TVERSION_BASE;
            mymsg->msgHeader.msgType = MT_ASYN_REQUEST;  // 发送异步请求消息
            mymsg->msgHeader.cmdCode = CMD_GETPROCINTMC;
            mymsg->msgHeader.sendProcType = getLocalProcType();
            mymsg->msgHeader.sendProcHandle =  getLocalProcHandle();
            this->write_message(mymsg, sizeof(TMessage));

        }
        else if (CMD_GETPROCINTMC == tmsg->msgHeader.cmdCode)
        {
            iLog.info("Process in tmc, procIds=%s length=%d", tmsg->msgContent, tmsg->msgHeader.length);
            TClientHandlerMgr::instance()->addAll(this, tmsg->msgContent);

            // 这个消息不需要放入队列,在这里release掉
            mb->release();
        }
        else
        {
            // 如果是同步请求消息,异步请求,通知消息和,异步请求,异步响应,就放入TikyTaskMgr队列
            TikyTaskMgr *taskMgr = TikyAppMgr::instance()->getTaskMgr(DEFAULT_TASKMGRID);
            if (NULL != taskMgr)
            {
                taskMgr->putq(mb);
                //iLog.info("Put messaget to taskmgr, retCode=%d", retCode);
            }
            else
            {
                iLog.error("TaskMgr(%d) is not found...", DEFAULT_TASKMGRID);
                
                // 回送一个响应消息,没有找到TaskMgr
                TMessage *rspMsg = new (sizeof(TMessage))TMessage;
                rspMsg->msgHeader.msgType = MT_RESPONSE;
                rspMsg->msgHeader.sendProcType = tmsg->msgHeader.recvProcType;
                rspMsg->msgHeader.sendProcHandle = tmsg->msgHeader.recvProcHandle;
                rspMsg->msgHeader.recvProcType = tmsg->msgHeader.sendProcType;
                rspMsg->msgHeader.recvProcHandle = tmsg->msgHeader.sendProcHandle;
                rspMsg->msgHeader.sn = tmsg->msgHeader.sn;
                // 没有TaskMgr,就回送一个错误信息
                rspMsg->msgHeader.retCode = ERR_TCOM_NOTFOUNDTASKMGR;
                this->write_message(rspMsg, sizeof(TMessage));
                mb->release();
            }
        }
    }

    ACE_Message_Block *nmb = new ACE_Message_Block(m_msgBuffer, MSG_BUFFER_LEN);
    if (this->m_reader.read(*nmb, nmb->space()) != 0)
    {
        nmb->release();
        delete this;
        return;
    }
}

//异步写完成后会调用此函数
void TClientHandler::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
{
    iLog.info("Handle result is %d", result.success());
    ACE_Message_Block &mb = result.message_block();
    iLog.info("Write result=%ld, trans=%d, write=%d, len=%d", result.error(), result.bytes_transferred(),
        result.bytes_to_write(), mb.length());
    TMessage *tmsg = (TMessage *)mb.base();
    // 记录一下发送消息日志
    writeLog(tmsg);
    delete tmsg;
    mb.release();
    return;
}

int TClientHandler::write_message(TMessage *tmsg, unsigned short msgLength)
{
    if (NULL == tmsg)
    {
        iLog.error("[TClientHandler::write_message] tmsg is NULL.");
        return -1;
    }

    ACE_Message_Block *mbb = new ACE_Message_Block((const char *)tmsg, msgLength);
    // 要设置一下wr_ptr的指针,要不mbb->length()返回的会是0
    mbb->wr_ptr(msgLength);
    int retCode = this->m_writer.write(*mbb, mbb->length());
    if (retCode != 0)
    {
        iLog.error("Send msg failure, lasterror=%d, length=%d, retCode=%d", ACE_OS::last_error(), mbb->length(), retCode);
        delete tmsg;
        mbb->release();
        delete this;
    }
    return retCode;
}


写一个循环不间断调用write_message发送数据,会看到发送的数据,会用一个TCP报文发出去,不知道有没有人遇到相同的问题?
   发表时间:2011-09-28  
找到原因了,TCP是基于流的,所以tcp在发送数据时不是马上发送,而是稍微等待一点时间才发送,UDP是基于消息的,因此使用TCP通讯,需要应用自己来解决边界问题!
0 请登录后投票
论坛首页 编程语言技术版

跳转论坛:
Global site tag (gtag.js) - Google Analytics