>>>from twisted.internet import reactor
>>>reactor
<twisted.internet.selectreactor.SelectReactor object at 0x01C5BFD0>
reactor本来是一个模块,怎么变成对象了?
查看 reactor.py, 看到就一个模块方法selectreactor.install()
查看install方法:
def install():
"""Configure the twisted mainloop to be run using the select() reactor.
"""
reactor = SelectReactor()
from twisted.internet.main import installReactor
installReactor(reactor)
这里生成了一个SelectReactor的对象,似乎就是我们要找的reactor.
再查看main.py
def installReactor(reactor):
# this stuff should be common to all reactors.
import twisted.internet
import sys
assert not sys.modules.has_key('twisted.internet.reactor'), \
"reactor already installed"
twisted.internet.reactor = reactor
sys.modules['twisted.internet.reactor'] = reactor
哦, reactor已经被偷梁换柱了.
回到selectreactor.py, 看看 SelectReactor 类是个什么东西.
SelectReactor 继承父类posixbase.PosixReactorBase, 本身增加了一些方法, 似乎看不出什么.那我们就去posixbase.py看看他爸爸是干什么的.
PosixReactorBase继承两个父类_SignalReactorMixin 和 ReactorBase. 先不管其他, 寻根溯源,这两个父类都来自于internet.base模块.
好吧, 找到这里算是到头了. ReactorBase 作为 "Reactor" 的基类, 提供了reactor大部分及其重要的方法, 另一些重要的方法由_SignalReactorMixin来扩展. 下面做下详细的分析.
# 一般来说, 建立一个服务器基本遵循以下几个步骤
# 以建立一个最基本的TCP服务器为例
#1
reactor.listenTCP(PORT, Factory())
#2
reactor.run()
对于#1比较好理解, 在posixbase.PosixReactorBase中
def listenTCP(self, port, factory, backlog=50, interface=''):
"""@see: twisted.internet.interfaces.IReactorTCP.listenTCP
"""
p = tcp.Port(port, factory, backlog, interface, self)
p.startListening()
return p
# 其中包括socket的建立,绑定等等一系列手续.
# 详细内容以后再表.
对于#2, 在base.SignalReactorMixin中
def run(self, installSignalHandlers=True):
self.startRunning(installSignalHandlers=installSignalHandlers)
self.mainLoop()
def mainLoop(self):
while self._started:
try:
while self._started:
# Advance simulation time in delayed event
# processors.
self.runUntilCurrent()
t2 = self.timeout()
t = self.running and t2
self.doIteration(t)
except:
log.msg("Unexpected error in main loop.")
log.err()
else:
log.msg('Main loop terminated.')
reactor一直孜孜不倦地执行两个方法:self.runUntilCurrent和 self.doIteration. 看看这两个函数都是干什么的:
# 在ReactorBase中, runUntilCurrent方法主要做了两件事,
# 把self.threadCallQueue和self.pendingTimedCalls 里的对象执行一遍
def runUntilCurrent(self):
if self.threadCallQueue:
# Keep track of how many calls we actually make, as we're
# making them, in case another call is added to the queue
# while we're in this loop.
count = 0
total = len(self.threadCallQueue)
for (f, a, kw) in self.threadCallQueue:
try:
f(*a, **kw)
except:
log.err()
count += 1
if count == total:
break
del self.threadCallQueue[:count]
if self.threadCallQueue:
if self.waker:
self.waker.wakeUp()
# insert new delayed calls now
self._insertNewDelayedCalls()
now = self.seconds()
while self._pendingTimedCalls and (self._pendingTimedCalls[0].time <= now):
call = heappop(self._pendingTimedCalls)
if call.cancelled:
self._cancellations-=1
continue
if call.delayed_time > 0:
call.activate_delay()
heappush(self._pendingTimedCalls, call)
continue
try:
call.called = 1
call.func(*call.args, **call.kw)
except:
log.deferr()
if hasattr(call, "creator"):
e = "\n"
e += " C: previous exception occurred in " + \
"a DelayedCall created here:\n"
e += " C:"
e += "".join(call.creator).rstrip().replace("\n","\n C:")
e += "\n"
log.msg(e)
if (self._cancellations > 50 and
self._cancellations > len(self._pendingTimedCalls) >> 1):
self._cancellations = 0
self._pendingTimedCalls = [x for x in self._pendingTimedCalls
if not x.cancelled]
heapify(self._pendingTimedCalls)
if self._justStopped:
self._justStopped = False
self.fireSystemEvent("shutdown")
# 回到SelectReactor中,查看 doSelect(doIteration)方法
# _select既是select.select函数
# self._reads和self._writes内存储的应该都是类文件操作符,比如socket..
# 再看下self._doReadOrWrite方法,会发现所有的reader/writer都执行自身
# 的 doRead/doWrite方法.
def doSelect(self, timeout):
"""
Run one iteration of the I/O monitor loop.
This will run all selectables who had input or output readiness
waiting for them.
"""
while 1:
try:
r, w, ignored = _select(self._reads.keys(),
self._writes.keys(),
[], timeout)
break
except ValueError, ve:
# Possibly a file descriptor has gone negative?
log.err()
self._preenDescriptors()
except TypeError, te:
# Something *totally* invalid (object w/o fileno, non-integral
# result) was passed
log.err()
self._preenDescriptors()
except (select.error, IOError), se:
# select(2) encountered an error
if se.args[0] in (0, 2):
# windows does this if it got an empty list
if (not self._reads) and (not self._writes):
return
else:
raise
elif se.args[0] == EINTR:
return
elif se.args[0] == EBADF:
self._preenDescriptors()
else:
# OK, I really don't know what's going on. Blow up.
raise
_drdw = self._doReadOrWrite
_logrun = log.callWithLogger
for selectables, method, fdset in ((r, "doRead", self._reads),
(w,"doWrite", self._writes)):
for selectable in selectables:
# if this was disconnected in another thread, kill it.
# ^^^^ --- what the !@#*? serious! -exarkun
if selectable not in fdset:
continue
# This for pausing input when we're not ready for more.
_logrun(selectable, _drdw, selectable, method, dict)
好吧,从上面基本可以看出, reactor在run循环里做了两件事, 执行线程队列和延迟对象队列,操作类文件对象符.
对于线程队列和延迟对象队列, 还比较好理解.
对于类文件对象的队列, reactor 是什么时候把它们加进的呢?
写道
# 插播 ReactorBase.callLater方法
# 执行callLater后reactor把DelayedCall对象存放在_newTimedCalls队列中
# 在执行ReactorBase.runUntilCurrent时,
# reactor执行了_insertNewDelayedCalls 方法
# 把_newTimedCalls内的数据存入_pendingTimedCalls队列中
def callLater(self, _seconds, _f, *args, **kw):
tple = DelayedCall(self.seconds() + _seconds, _f, args, kw,
self._cancelCallLater,
self._moveCallLaterSooner,
seconds=self.seconds)
self._newTimedCalls.append(tple)
return tple
# 同样对于thread
# callFromThread方法也是把thread存入到threadCallQueue中
# 直到在runUntilCurrent中执行
def callFromThread(self, f, *args, **kw):
self.threadCallQueue.append((f, args, kw))
分享到:
相关推荐
1) 解压压缩文件. 2) 运行安装程序:dotnet_reactor_setup_4_9_0_0.exe(也可在官网下载安装,下载地址:http://www.eziriz.com/downloads/dotnet_reactor_setup_4_9_0_0.exe). 3) 使用Crack文件夹下的破解版替换...
赠送jar包:reactor-core-3.4.14.jar; 赠送原API文档:reactor-core-3.4.14-javadoc.jar; 赠送源代码:reactor-core-3.4.14-sources.jar; 赠送Maven依赖信息文件:reactor-core-3.4.14.pom; 包含翻译后的API文档...
赠送jar包:reactor-core-3.4.10.jar; 赠送原API文档:reactor-core-3.4.10-javadoc.jar; 赠送源代码:reactor-core-3.4.10-sources.jar; 赠送Maven依赖信息文件:reactor-core-3.4.10.pom; 包含翻译后的API文档...
Flux代表的是一个包含0到N个元素的异步序列,而Mono则代表一个异步的结果,这个结果可能是0个或者1个元素。文档描述了如何创建和订阅Flux或Mono,以及如何编程式地创建序列。同时,也介绍了调度器(Schedulers)和...
dotNET Reactor 使用步骤图解 dotNET Reactor 是一个功能强大的.NET 保护工具,它可以帮助开发者保护他们的.NET 程序集免受反编译和逆向工程的攻击。下面是 dotNET Reactor 的使用步骤图解: 选择文件 dotNET ...
赠送jar包:reactor-netty-http-1.0.11.jar; 赠送原API文档:reactor-netty-http-1.0.11-javadoc.jar; 赠送源代码:reactor-netty-http-1.0.11-sources.jar; 赠送Maven依赖信息文件:reactor-netty-...
NET+Reactor_1279带破解文件NET+Reactor_1279带破解文件NET+Reactor_1279带破解文件NET+Reactor_1279带破解文件
赠送jar包:reactor-netty-core-1.0.15.jar; 赠送原API文档:reactor-netty-core-1.0.15-javadoc.jar; 赠送源代码:reactor-netty-core-1.0.15-sources.jar; 赠送Maven依赖信息文件:reactor-netty-core-1.0.15....
.NET Reactor是一款功能强大的代码保护以及许可授权管理系统软件,主要用于开发人员保护其.NET软件程序,.NET Reactor支持所有支持.NET编译的程序开发语言。控件中国网是.NET Reactor在中国正式的授权销售商,提供...
赠送jar包:reactor-extra-3.4.6.jar; 赠送原API文档:reactor-extra-3.4.6-javadoc.jar; 赠送源代码:reactor-extra-3.4.6-sources.jar; 赠送Maven依赖信息文件:reactor-extra-3.4.6.pom; 包含翻译后的API文档...
赠送jar包:reactor-extra-3.4.5.jar; 赠送原API文档:reactor-extra-3.4.5-javadoc.jar; 赠送源代码:reactor-extra-3.4.5-sources.jar; 赠送Maven依赖信息文件:reactor-extra-3.4.5.pom; 包含翻译后的API文档...
Mono代表0个或1个值的发布者,它可以用于处理那些不一定会发生或者只产生一个结果的异步操作,例如查询数据库中的单一记录。Mono是不可变的,一旦创建,就不能改变其状态。Mono的常见操作包括`just`(创建一个包含...
java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现...
Reactor是一个基于Java虚拟机(JVM)之上的异步应用基础库,其主要目的是为Java、Groovy以及其他运行在JVM上的语言提供一套构建基于事件和数据驱动应用的抽象。Reactor的核心设计是利用非阻塞方式高效地传递消息,这...
1. **Dispatcher(分发器)**:也称为Reactor,它是事件的中心调度者。当I/O事件发生时,Dispatcher负责将事件分发给对应的Handler进行处理。 2. **Handler(处理器)**:处理器负责处理特定类型的I/O事件。每个...
**Spring Reactor 深度解析** Spring Reactor 是一个强大的反应式编程库,它是 Spring Framework 的一部分,特别是在 Spring 5.0 及以后的版本中扮演着核心角色。Spring Reactor 是基于 Project Reactor,这是一个...
"Reactor 和 Proactor 模式在网络编程中的应用" Reactor 和 Proactor 模式是两种常见的事件处理模式,在网络编程中广泛应用于设计高效、可靠的并发和网络应用程序。在本文中,我们将详细介绍 Reactor 和 Proactor ...
1. **代码混淆**:这是.NET Reactor的重要特性之一,它通过重命名类、方法和变量,使得反编译后的代码变得难以理解和跟踪。这极大地降低了代码被分析和复制的可能性。 2. **资源加密**:此工具可以对应用程序中的...