`
angeloce
  • 浏览: 71762 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

reactor解剖术(2)

阅读更多

 

观看上章的代码, reactor似乎没有主动加入过 reader/writer, reactor如何操作socket的呢?

重新想象reactor在run之前还做过什么?

对了, 连接/建立连接!

 

就如reactor.listenTCP

 

def listenTCP(self, port, factory, backlog=50, interface=''):
        p = tcp.Port(port, factory, backlog, interface, self)
        p.startListening()
        return p

 

看看tcp.Port的设计

 

tcp.Port继承于base.BasePort 和 tcp._SocketCloser,

而base.BasePort 继承于abstract.FileDescriptor, 一个抽象的文件操作符类

tcp.Port实例化时没有做太多动作, 我们聚焦在方法 startListening 上

 

# tcp.Port.startListening 生成并绑定了一个socket
# 也没有做什么过多的动作, 直接看看最下面的startReading

def startListening(self):
        try:
            skt = self.createInternetSocket()
            skt.bind((self.interface, self.port))
        except socket.error, le:
            raise CannotListenError, (self.interface, self.port, le)

        # Make sure that if we listened on port 0, we update that to
        # reflect what the OS actually assigned us.
        self._realPortNumber = skt.getsockname()[1]

        log.msg("%s starting on %s" % (self.factory.__class__, self._realPortNumber))

        # The order of the next 6 lines is kind of bizarre.  If no one
        # can explain it, perhaps we should re-arrange them.
        self.factory.doStart()
        skt.listen(self.backlog)
        self.connected = True
        self.socket = skt
        self.fileno = self.socket.fileno
        self.numberAccepts = 100

        self.startReading()

 

# 一直找到abstract.FileDescriptor.startReading
# 执行了reactor.addReader

def startReading(self):
        """Start waiting for read availability.
        """
        self.reactor.addReader(self)

# selectreactor.SelectReactor.addReader指明了
# 一个tcp.Port对象被作为reader加入到了reactor的reads队列中
def addReader(self, reader):
        """
        Add a FileDescriptor for notification of data available to read.
        """
        self._reads[reader] = 1

 

原来在这里, 在reactor.listenTCP时候就被加入到了reader队列中.

赶紧回头看看, 在 selectreactor.SelectReactor.doSelect中,如果一个类文件操作符状态改变了,会执行其doRead/doWriter方法.那去看看作为reader的tcp.Port的doRead方法.

 

 

# tcp.Port的socket接受了一个连接,
# 并执行了self.factory.buildProtocol方法生成一个portocol
# 通过self.transport生成了一个tcp.Server对象

def doRead(self):
        try:
            if platformType == "posix":
                numAccepts = self.numberAccepts
            else:
                # win32 event loop breaks if we do more than one accept()
                # in an iteration of the event loop.
                numAccepts = 1
            for i in range(numAccepts):
                # we need this so we can deal with a factory's buildProtocol
                # calling our loseConnection
                if self.disconnecting:
                    return
                try:
                    skt, addr = self.socket.accept()
                except socket.error, e:
                    if e.args[0] in (EWOULDBLOCK, EAGAIN):
                        self.numberAccepts = i
                        break
                    elif e.args[0] == EPERM:
                        # Netfilter on Linux may have rejected the
                        # connection, but we get told to try to accept()
                        # anyway.
                        continue
                    elif e.args[0] in (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABORTED):
                        log.msg("Could not accept new connection (%s)" % (
                            errorcode[e.args[0]],))
                        break
                    raise

                protocol = self.factory.buildProtocol(self._buildAddr(addr))
                if protocol is None:
                    skt.close()
                    continue
                s = self.sessionno
                self.sessionno = s+1
                transport = self.transport(skt, protocol, addr, self, s, self.reactor)
                transport = self._preMakeConnection(transport)
                protocol.makeConnection(transport)
            else:
                self.numberAccepts = self.numberAccepts+20
        except:
            log.deferr()

 

 

虽然还有点迷糊, 不过知道了protocol对象产生于此处.那这个产生的transport实例具体作用是什么呢?

先看下 protocol.makeConnection

 

# protocol.BaseProtocol
def makeConnection(self, transport):
        self.connected = 1
        self.transport = transport
        self.connectionMade()

  

 

看到了一个熟悉的方法connectionMade!

protocol的三个事件方法 connectionMade, dataReceived, connectionLost是protocol最重要的三个方法了.

其一出现了, 剩下的两个是在何处被触发的呢?

 

先不急, 先看看transport 是怎么回事:

 

tcp.Server 来自于 父类 tcp.Connection. 而Connection继承于abstract.FileDescriptor,又是一个类文件符.

tcp.Server实例时还是做了点小动作的

 

# tcp.Server
def __init__(self, sock, protocol, client, server, sessionno, reactor):
        Connection.__init__(self, sock, protocol, reactor)
        self.server = server
        self.client = client
        self.sessionno = sessionno
        self.hostname = client[0]
        self.logstr = "%s,%s,%s" % (self.protocol.__class__.__name__,
                                    sessionno,
                                    self.hostname)
        self.repstr = "<%s #%s on %s>" % (self.protocol.__class__.__name__,
                                          self.sessionno,
                                          self.server._realPortNumber)
        self.startReading()
        self.connected = 1

 

self.startReading从 abstract.FileDescriptor上知晓是把 该实例作为reader加入到reactor队列中的.

那我们就看看tcp.Server的doRead方法

 

# tcp.Connection
def doRead(self):
        """Calls self.protocol.dataReceived with all available data.

        This reads up to self.bufferSize bytes of data from its socket, then
        calls self.dataReceived(data) to process it.  If the connection is not
        lost through an error in the physical recv(), this function will return
        the result of the dataReceived call.
        """
        try:
            data = self.socket.recv(self.bufferSize)
        except socket.error, se:
            if se.args[0] == EWOULDBLOCK:
                return
            else:
                return main.CONNECTION_LOST
        if not data:
            return main.CONNECTION_DONE
        return self.protocol.dataReceived(data)

 

眼前一亮, dataReceived方法!

分享到:
评论
1 楼 yukaizhao 2013-09-10  
请教个问题,下面例子中的client调用了reactor.run()之后线程就被block在这一句了,这是设计使然。

# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
 
 
"""
An example client. Run simpleserv.py first before running this.
"""
 
from twisted.internet import reactor, protocol
 
 
# a client protocol
 
class EchoClient(protocol.Protocol):
    """Once connected, send a message, then print the result."""
     
    def connectionMade(self):
        self.transport.write("hello, world!")
     
    def dataReceived(self, data):
        "As soon as any data is received, write it back."
        print "Server said:", data
        self.transport.loseConnection()
     
    def connectionLost(self, reason):
        print "connection lost"
 
class EchoFactory(protocol.ClientFactory):
    protocol = EchoClient
 
    def clientConnectionFailed(self, connector, reason):
        print "Connection failed - goodbye!"
        reactor.stop()
     
    def clientConnectionLost(self, connector, reason):
        print "Connection lost - goodbye!"
        reactor.stop()
 
 
# this connects the protocol to a server runing on port 8000
def main():
    f = EchoFactory()
    reactor.connectTCP("localhost", 8000, f)
    reactor.run()
 
# this only runs if the module was *not* imported
if __name__ == '__main__':
    main()

我有两个疑问:

1. 在这种编程模型下,我如何再次连接"localhost", 8000并发起另一个请求呢?
2. 这个client连接到服务器之后发送了一条消息,假如我还想发送第二条消息,该如何发呢?

谢谢!

相关推荐

    .NET Reactor 4.9 破解版

    2) 运行安装程序:dotnet_reactor_setup_4_9_0_0.exe(也可在官网下载安装,下载地址:http://www.eziriz.com/downloads/dotnet_reactor_setup_4_9_0_0.exe). 3) 使用Crack文件夹下的破解版替换安装目录下的"dotNET_...

    reactor-core-3.4.14-API文档-中文版.zip

    赠送jar包:reactor-core-3.4.14.jar; 赠送原API文档:reactor-core-3.4.14-javadoc.jar; 赠送源代码:reactor-core-3.4.14-sources.jar; 赠送Maven依赖信息文件:reactor-core-3.4.14.pom; 包含翻译后的API文档...

    reactor-core-3.4.10-API文档-中文版.zip

    赠送jar包:reactor-core-3.4.10.jar; 赠送原API文档:reactor-core-3.4.10-javadoc.jar; 赠送源代码:reactor-core-3.4.10-sources.jar; 赠送Maven依赖信息文件:reactor-core-3.4.10.pom; 包含翻译后的API文档...

    dotNET Reactor 使用步骤图解

    dotNET Reactor 使用步骤图解 dotNET Reactor 是一个功能强大的.NET 保护工具,它可以帮助开发者保护他们的.NET 程序集免受反编译和逆向工程的攻击。下面是 dotNET Reactor 的使用步骤图解: 选择文件 dotNET ...

    Reactor 3中文帮助文档

    在React框架中,Reactor是其核心库之一,专门用于构建响应式应用程序。本篇文章将详细解读Reactor 3的中文帮助文档,帮助用户理解非阻塞响应式框架的使用方法及其原理,尤其是Spring WebFlux底层实现的相关知识。 ...

    reactor-extra-3.4.5-API文档-中文版.zip

    赠送jar包:reactor-extra-3.4.5.jar; 赠送原API文档:reactor-extra-3.4.5-javadoc.jar; 赠送源代码:reactor-extra-3.4.5-sources.jar; 赠送Maven依赖信息文件:reactor-extra-3.4.5.pom; 包含翻译后的API文档...

    reactor-netty-http-1.0.11-API文档-中文版.zip

    赠送jar包:reactor-netty-http-1.0.11.jar; 赠送原API文档:reactor-netty-http-1.0.11-javadoc.jar; 赠送源代码:reactor-netty-http-1.0.11-sources.jar; 赠送Maven依赖信息文件:reactor-netty-...

    NET+Reactor_1279带破解文件

    NET+Reactor_1279带破解文件NET+Reactor_1279带破解文件NET+Reactor_1279带破解文件NET+Reactor_1279带破解文件

    reactor-netty-core-1.0.15-API文档-中文版.zip

    赠送jar包:reactor-netty-core-1.0.15.jar; 赠送原API文档:reactor-netty-core-1.0.15-javadoc.jar; 赠送源代码:reactor-netty-core-1.0.15-sources.jar; 赠送Maven依赖信息文件:reactor-netty-core-1.0.15....

    NET Reactor 5.0破解版

    .NET Reactor是一款功能强大的代码保护以及许可授权管理系统软件,主要用于开发人员保护其.NET软件程序,.NET Reactor支持所有支持.NET编译的程序开发语言。控件中国网是.NET Reactor在中国正式的授权销售商,提供...

    reactor-extra-3.4.6-API文档-中文版.zip

    赠送jar包:reactor-extra-3.4.6.jar; 赠送原API文档:reactor-extra-3.4.6-javadoc.jar; 赠送源代码:reactor-extra-3.4.6-sources.jar; 赠送Maven依赖信息文件:reactor-extra-3.4.6.pom; 包含翻译后的API文档...

    Reactor教程Mono和Flux例子

    Reactor是Spring框架的一部分,它是一个响应式编程库,用于构建非阻塞、高并发、事件驱动的应用程序。在Java生态系统中,Reactor是实现 Reactive Streams 规范的一个关键工具,它提供了Mono和Flux两种核心类型来处理...

    java基于NIO实现Reactor模型源码.zip

    java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现...

    reactor指南中文版

    Reactor是一个基于Java虚拟机(JVM)之上的异步应用基础库,其主要目的是为Java、Groovy以及其他运行在JVM上的语言提供一套构建基于事件和数据驱动应用的抽象。Reactor的核心设计是利用非阻塞方式高效地传递消息,这...

    Reactor Pattern (一)

    **Reactor模式(一)** Reactor模式是一种事件驱动的设计模式,它主要用于处理并发I/O操作,通过将I/O事件的处理与事件处理程序解耦,实现高效的异步处理。在高并发环境下,Reactor模式可以显著提升系统性能,因为...

    tpd_reactor_proactor.pdf

    "Reactor 和 Proactor 模式在网络编程中的应用" Reactor 和 Proactor 模式是两种常见的事件处理模式,在网络编程中广泛应用于设计高效、可靠的并发和网络应用程序。在本文中,我们将详细介绍 Reactor 和 Proactor ...

    dotnet_reactor_6_5_0_0.rar

    2. **资源加密**:此工具可以对应用程序中的资源(如图片、音频、XML文件等)进行加密,确保它们在未经授权的情况下无法访问。 3. **阻止调试**:.NET Reactor 6.5 可以检测并阻止调试器的附加,防止恶意用户通过...

    Reactor 3 参考文档

    Reactor 3 是其最新版本,它遵循了 Reactive Streams 规范,与Java 8及更高版本紧密集成,并在Spring Boot 2.x中被广泛采用。 ### 1. 反应式编程基础 反应式编程是一种编程范式,强调数据流和变化传播。在Reactor中...

Global site tag (gtag.js) - Google Analytics