`
angeloce
  • 浏览: 71774 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

reactor解剖术(1)

阅读更多

        

>>>from twisted.internet import reactor
>>>reactor
<twisted.internet.selectreactor.SelectReactor object at 0x01C5BFD0>

reactor本来是一个模块,怎么变成对象了?

 

查看 reactor.py, 看到就一个模块方法selectreactor.install()

 

查看install方法:

 

def install():
    """Configure the twisted mainloop to be run using the select() reactor.
    """
    reactor = SelectReactor()
    from twisted.internet.main import installReactor
    installReactor(reactor)

 

这里生成了一个SelectReactor的对象,似乎就是我们要找的reactor.

 

再查看main.py

 

def installReactor(reactor):
    # this stuff should be common to all reactors.
    import twisted.internet
    import sys
    assert not sys.modules.has_key('twisted.internet.reactor'), \
           "reactor already installed"
    twisted.internet.reactor = reactor
    sys.modules['twisted.internet.reactor'] = reactor

 

 

哦, reactor已经被偷梁换柱了.

 

回到selectreactor.py, 看看 SelectReactor 类是个什么东西.

 

SelectReactor 继承父类posixbase.PosixReactorBase, 本身增加了一些方法, 似乎看不出什么.那我们就去posixbase.py看看他爸爸是干什么的.

 

PosixReactorBase继承两个父类_SignalReactorMixin 和 ReactorBase.  先不管其他,  寻根溯源,这两个父类都来自于internet.base模块.

 

好吧, 找到这里算是到头了. ReactorBase 作为 "Reactor" 的基类, 提供了reactor大部分及其重要的方法, 另一些重要的方法由_SignalReactorMixin来扩展. 下面做下详细的分析.

 

# 一般来说, 建立一个服务器基本遵循以下几个步骤
# 以建立一个最基本的TCP服务器为例


#1
reactor.listenTCP(PORT, Factory())

#2
reactor.run()

 

对于#1比较好理解, 在posixbase.PosixReactorBase中

def listenTCP(self, port, factory, backlog=50, interface=''):
        """@see: twisted.internet.interfaces.IReactorTCP.listenTCP
        """
        p = tcp.Port(port, factory, backlog, interface, self)
        p.startListening()
        return p

# 其中包括socket的建立,绑定等等一系列手续.
# 详细内容以后再表.

 

 

对于#2, 在base.SignalReactorMixin中

def run(self, installSignalHandlers=True):
        self.startRunning(installSignalHandlers=installSignalHandlers)
        self.mainLoop()


def mainLoop(self):
        while self._started:
            try:
                while self._started:
                    # Advance simulation time in delayed event
                    # processors.
                    self.runUntilCurrent()
                    t2 = self.timeout()
                    t = self.running and t2
                    self.doIteration(t)
            except:
                log.msg("Unexpected error in main loop.")
                log.err()
            else:
                log.msg('Main loop terminated.')

  

 

reactor一直孜孜不倦地执行两个方法:self.runUntilCurrent和 self.doIteration. 看看这两个函数都是干什么的:

 

# 在ReactorBase中, runUntilCurrent方法主要做了两件事,
# 把self.threadCallQueue和self.pendingTimedCalls 里的对象执行一遍
def runUntilCurrent(self):
        if self.threadCallQueue:
            # Keep track of how many calls we actually make, as we're
            # making them, in case another call is added to the queue
            # while we're in this loop.
            count = 0
            total = len(self.threadCallQueue)
            for (f, a, kw) in self.threadCallQueue:
                try:
                    f(*a, **kw)
                except:
                    log.err()
                count += 1
                if count == total:
                    break
            del self.threadCallQueue[:count]
            if self.threadCallQueue:
                if self.waker:
                    self.waker.wakeUp()

        # insert new delayed calls now
        self._insertNewDelayedCalls()

        now = self.seconds()
        while self._pendingTimedCalls and (self._pendingTimedCalls[0].time <= now):
            call = heappop(self._pendingTimedCalls)
            if call.cancelled:
                self._cancellations-=1
                continue

            if call.delayed_time > 0:
                call.activate_delay()
                heappush(self._pendingTimedCalls, call)
                continue

            try:
                call.called = 1
                call.func(*call.args, **call.kw)
            except:
                log.deferr()
                if hasattr(call, "creator"):
                    e = "\n"
                    e += " C: previous exception occurred in " + \
                         "a DelayedCall created here:\n"
                    e += " C:"
                    e += "".join(call.creator).rstrip().replace("\n","\n C:")
                    e += "\n"
                    log.msg(e)


        if (self._cancellations > 50 and
             self._cancellations > len(self._pendingTimedCalls) >> 1):
            self._cancellations = 0
            self._pendingTimedCalls = [x for x in self._pendingTimedCalls
                                       if not x.cancelled]
            heapify(self._pendingTimedCalls)

        if self._justStopped:
            self._justStopped = False
            self.fireSystemEvent("shutdown")

 

 

# 回到SelectReactor中,查看 doSelect(doIteration)方法
# _select既是select.select函数
# self._reads和self._writes内存储的应该都是类文件操作符,比如socket..
# 再看下self._doReadOrWrite方法,会发现所有的reader/writer都执行自身
# 的 doRead/doWrite方法.

def doSelect(self, timeout):
        """
        Run one iteration of the I/O monitor loop.

        This will run all selectables who had input or output readiness
        waiting for them.
        """
        while 1:
            try:
                r, w, ignored = _select(self._reads.keys(),
                                        self._writes.keys(),
                                        [], timeout)
                break
            except ValueError, ve:
                # Possibly a file descriptor has gone negative?
                log.err()
                self._preenDescriptors()
            except TypeError, te:
                # Something *totally* invalid (object w/o fileno, non-integral
                # result) was passed
                log.err()
                self._preenDescriptors()
            except (select.error, IOError), se:
                # select(2) encountered an error
                if se.args[0] in (0, 2):
                    # windows does this if it got an empty list
                    if (not self._reads) and (not self._writes):
                        return
                    else:
                        raise
                elif se.args[0] == EINTR:
                    return
                elif se.args[0] == EBADF:
                    self._preenDescriptors()
                else:
                    # OK, I really don't know what's going on.  Blow up.
                    raise
        _drdw = self._doReadOrWrite
        _logrun = log.callWithLogger
        for selectables, method, fdset in ((r, "doRead", self._reads),
                                           (w,"doWrite", self._writes)):
            for selectable in selectables:
                # if this was disconnected in another thread, kill it.
                # ^^^^ --- what the !@#*?  serious!  -exarkun
                if selectable not in fdset:
                    continue
                # This for pausing input when we're not ready for more.
                _logrun(selectable, _drdw, selectable, method, dict)

 

好吧,从上面基本可以看出, reactor在run循环里做了两件事, 执行线程队列和延迟对象队列,操作类文件对象符.

 

对于线程队列和延迟对象队列, 还比较好理解.

 

对于类文件对象的队列, reactor 是什么时候把它们加进的呢?

 

写道
# 插播 ReactorBase.callLater方法
# 执行callLater后reactor把DelayedCall对象存放在_newTimedCalls队列中
# 在执行ReactorBase.runUntilCurrent时,
# reactor执行了_insertNewDelayedCalls 方法
# 把_newTimedCalls内的数据存入_pendingTimedCalls队列中
def callLater(self, _seconds, _f, *args, **kw):
tple = DelayedCall(self.seconds() + _seconds, _f, args, kw,
self._cancelCallLater,
self._moveCallLaterSooner,
seconds=self.seconds)
self._newTimedCalls.append(tple)
return tple


# 同样对于thread
# callFromThread方法也是把thread存入到threadCallQueue中
# 直到在runUntilCurrent中执行

def callFromThread(self, f, *args, **kw):
self.threadCallQueue.append((f, args, kw))

 

 

分享到:
评论

相关推荐

    .NET Reactor 4.9 破解版

    1) 解压压缩文件. 2) 运行安装程序:dotnet_reactor_setup_4_9_0_0.exe(也可在官网下载安装,下载地址:http://www.eziriz.com/downloads/dotnet_reactor_setup_4_9_0_0.exe). 3) 使用Crack文件夹下的破解版替换...

    reactor-core-3.4.14-API文档-中文版.zip

    赠送jar包:reactor-core-3.4.14.jar; 赠送原API文档:reactor-core-3.4.14-javadoc.jar; 赠送源代码:reactor-core-3.4.14-sources.jar; 赠送Maven依赖信息文件:reactor-core-3.4.14.pom; 包含翻译后的API文档...

    reactor-core-3.4.10-API文档-中文版.zip

    赠送jar包:reactor-core-3.4.10.jar; 赠送原API文档:reactor-core-3.4.10-javadoc.jar; 赠送源代码:reactor-core-3.4.10-sources.jar; 赠送Maven依赖信息文件:reactor-core-3.4.10.pom; 包含翻译后的API文档...

    Reactor 3中文帮助文档

    Flux代表的是一个包含0到N个元素的异步序列,而Mono则代表一个异步的结果,这个结果可能是0个或者1个元素。文档描述了如何创建和订阅Flux或Mono,以及如何编程式地创建序列。同时,也介绍了调度器(Schedulers)和...

    dotNET Reactor 使用步骤图解

    dotNET Reactor 使用步骤图解 dotNET Reactor 是一个功能强大的.NET 保护工具,它可以帮助开发者保护他们的.NET 程序集免受反编译和逆向工程的攻击。下面是 dotNET Reactor 的使用步骤图解: 选择文件 dotNET ...

    reactor-netty-http-1.0.11-API文档-中文版.zip

    赠送jar包:reactor-netty-http-1.0.11.jar; 赠送原API文档:reactor-netty-http-1.0.11-javadoc.jar; 赠送源代码:reactor-netty-http-1.0.11-sources.jar; 赠送Maven依赖信息文件:reactor-netty-...

    NET+Reactor_1279带破解文件

    NET+Reactor_1279带破解文件NET+Reactor_1279带破解文件NET+Reactor_1279带破解文件NET+Reactor_1279带破解文件

    reactor-netty-core-1.0.15-API文档-中文版.zip

    赠送jar包:reactor-netty-core-1.0.15.jar; 赠送原API文档:reactor-netty-core-1.0.15-javadoc.jar; 赠送源代码:reactor-netty-core-1.0.15-sources.jar; 赠送Maven依赖信息文件:reactor-netty-core-1.0.15....

    NET Reactor 5.0破解版

    .NET Reactor是一款功能强大的代码保护以及许可授权管理系统软件,主要用于开发人员保护其.NET软件程序,.NET Reactor支持所有支持.NET编译的程序开发语言。控件中国网是.NET Reactor在中国正式的授权销售商,提供...

    reactor-extra-3.4.6-API文档-中文版.zip

    赠送jar包:reactor-extra-3.4.6.jar; 赠送原API文档:reactor-extra-3.4.6-javadoc.jar; 赠送源代码:reactor-extra-3.4.6-sources.jar; 赠送Maven依赖信息文件:reactor-extra-3.4.6.pom; 包含翻译后的API文档...

    reactor-extra-3.4.5-API文档-中文版.zip

    赠送jar包:reactor-extra-3.4.5.jar; 赠送原API文档:reactor-extra-3.4.5-javadoc.jar; 赠送源代码:reactor-extra-3.4.5-sources.jar; 赠送Maven依赖信息文件:reactor-extra-3.4.5.pom; 包含翻译后的API文档...

    Reactor教程Mono和Flux例子

    Mono代表0个或1个值的发布者,它可以用于处理那些不一定会发生或者只产生一个结果的异步操作,例如查询数据库中的单一记录。Mono是不可变的,一旦创建,就不能改变其状态。Mono的常见操作包括`just`(创建一个包含...

    java基于NIO实现Reactor模型源码.zip

    java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现...

    reactor指南中文版

    Reactor是一个基于Java虚拟机(JVM)之上的异步应用基础库,其主要目的是为Java、Groovy以及其他运行在JVM上的语言提供一套构建基于事件和数据驱动应用的抽象。Reactor的核心设计是利用非阻塞方式高效地传递消息,这...

    Reactor Pattern (一)

    1. **Dispatcher(分发器)**:也称为Reactor,它是事件的中心调度者。当I/O事件发生时,Dispatcher负责将事件分发给对应的Handler进行处理。 2. **Handler(处理器)**:处理器负责处理特定类型的I/O事件。每个...

    spring-reactor jars

    **Spring Reactor 深度解析** Spring Reactor 是一个强大的反应式编程库,它是 Spring Framework 的一部分,特别是在 Spring 5.0 及以后的版本中扮演着核心角色。Spring Reactor 是基于 Project Reactor,这是一个...

    tpd_reactor_proactor.pdf

    "Reactor 和 Proactor 模式在网络编程中的应用" Reactor 和 Proactor 模式是两种常见的事件处理模式,在网络编程中广泛应用于设计高效、可靠的并发和网络应用程序。在本文中,我们将详细介绍 Reactor 和 Proactor ...

    dotnet_reactor_6_5_0_0.rar

    1. **代码混淆**:这是.NET Reactor的重要特性之一,它通过重命名类、方法和变量,使得反编译后的代码变得难以理解和跟踪。这极大地降低了代码被分析和复制的可能性。 2. **资源加密**:此工具可以对应用程序中的...

Global site tag (gtag.js) - Google Analytics