- 浏览: 667761 次
- 性别:
- 来自: 北京
文章分类
最新评论
-
sztime:
可以在文本框上绑定事件来禁用回车键, 我就是这样做的.在IE中 ...
form 回车自动提交问题 -
damoqiongqiu:
非常好的文章,很透彻不过有一句话小僧腆着脸补充一下:“1111 ...
为什么要用补码来做存储 -
wuyizhong:
原来如此啊。
form 回车自动提交问题 -
luliangy:
谢楼主~!
用C语言扩展Python的功能 -
kwong:
很有用,谢谢
火狐和IE 对css 样式解释的差异
Stackless Python 并发式编程介绍
作者: Grant Olson 翻译: Lych 校对: gashero
邮箱: olsongt@verizon.net 邮箱: lych77@gmail.com 邮箱: harry.python@gmail.com
时间: 2006年7月7日 时间: 2007年9月23日
原文地址
目录
1 介绍
1.1 为什么要用stackless
1.1.1 现实世界就是并发的
1.1.2 并发可能是(仅仅可能是)下一个重要的编程范式
1.2 安装Stackless
2 Stackless起步
2.1 微进程(tasklet)
2.2 调度器(scheduler)
2.3 通道(channel)
2.4 总结
3 协程(coroutine)
3.1 子例程的问题
3.1.1 堆栈
3.1.2 那为什么要使用堆栈?
3.2 走进协程
3.3 总结
4 轻量级线程
4.1 Hackysack的模拟
4.2 游戏的传统线程版本
4.3 Stackless
4.4 总结
5 数据流
5.1 工厂
5.2 “普通”版本
5.2.1 分析
5.3 走进数据流
5.4 代码的stackless版本
5.4.1 分析
5.4.1.1 休眠功能
5.4.1.2 类
5.5 那我们获得了什么?
5.6 “推”数据
5.6.1 半加器
6 角色
6.1 杀手机器人!
6.1.1 角色基类
6.1.2 消息的格式
6.1.3 “世界”类
6.1.4 一个简单的机器人
6.1.5 蹊径:pyGame
6.1.6 第一轮代码
6.2 又一蹊径: 机理的模拟
6.2.1 角色属性
6.2.2 碰撞检测
6.2.3 恒定的时间
6.2.4 伤害值、生命值和死亡
6.2.5 第二轮代码
6.3 回到角色: 让我们变得疯狂
6.3.1 爆炸
6.3.2 埋雷机器人
6.3.3 建造台
6.3.4 最终的模拟
6.4 总结
7 完整代码列表
7.1 pingpong.py - 递归的ping pong示例
7.2 pingpong_stackless.py - stackless的ping pong示例
7.3 hackysackthreaded.py - 基于操作系统线程的hackysack示例
7.4 hackysackstackless.py - stackless的hackysack示例
7.5 assemblyline.py - “普通”的生产线示例
7.6 assemblyline-stackless.py - stackless的生产线示例
7.7 digitalCircuit.py - stackless数字电路
7.8 actors.py - 第一个角色示例
7.9 actors2.py - 第二个角色示例
7.10 actors3.py - 第三个角色示例
1 介绍
1.1 为什么要用stackless
摘自stackless网站 http://www.stackless.com/:
Stackless Python 是Python编程语言的一个增强版本,它使程序员从基于线程的编程方式中获得好处,
并避免传统线程所带来的性能与复杂度问题。Stackless为Python带来的微线程扩展,是一种低开销、轻量
级的便利工具,如果使用得当,可以获益如下:
+ 改进程序结构
+ 增进代码可读性
+ 提高编程人员生产力
以上是Stackless Python很简明的释义,但其对我们意义何在?——就在于Stackless提供的并发建模工具,比目前其它大多数传统编程语言所提供的,都更加易用:不仅是Python自身,也包括Java、C++,以及其它。尽管还有其他一些语言提供并发特性,可它们要么是主要用于学术研究的(如 Mozart/Oz),要么是罕为使用、或用于特殊目的的专业语言(如Erlang)。而使用stackless,你将会在Python本身的所有优势之上,在一个(但愿)你已经很熟悉的环境中,再获得并发的特性。
这自然引出了个问题:为什么要并发?
1.1.1 现实世界就是并发的
现实世界就是“并发”的,它是由一群事物(或“角色”)所组成,而这些事物以一种对彼此所知有限的、松散耦合的方式相互作用。传说中面向对象编程有一个好处,就是对象能够对现实的世界进行模拟。这在一定程度上是正确的,面向对象编程很好地模拟了对象个体,但对于这些对象个体之间的交互,却无法以一种理想的方式来表现。例如,如下代码实例,有什么问题?
def familyTacoNight():
husband.eat(dinner)
wife.eat(dinner)
son.eat(dinner)
daughter.eat(dinner)
第一印象,没问题。但是,上例中存在一个微妙的安排:所有事件是次序发生的,即:直到丈夫吃完饭,妻子才开始吃;儿子则一直等到母亲吃完才吃;而女儿则是最后一个。在现实世界中,哪怕是丈夫还堵车在路上,妻子、儿子和女儿仍然可以该吃就吃,而要在上例中的话,他们只能饿死了——甚至更糟:永远没有人会知道这件事,因为他们永远不会有机会抛出一个异常来通知这个世界!
1.1.2 并发可能是(仅仅可能是)下一个重要的编程范式
我个人相信,并发将是软件世界里的下一个重要范式。随着程序变得更加复杂和耗费资源,我们已经不能指望摩尔定律来每年给我们提供更快的CPU了,当前,日用个人计算机的性能提升来自于多核与多CPU机。一旦单个CPU的性能达到极限,软件开发者们将不得不转向分布式模型,靠多台计算机的互相作用来建立强大的应用(想想GooglePlex)。为了取得多核机和分布式编程的优势,并发将很快成为做事情的方式的事实标准。
1.2 安装Stackless
安装Stackless的细节可以在其网站上找到。现在Linux用户可以通过Subversion取得源代码并编译;而对于Windows用户,则有一个.zip文件供使用,需要将其解压到现有的Python安装目录中。接下来,本教程假设Stackless Python已经安装好了,可以工作,并且假设你对Python语言本身有基本的了解。
2 Stackless起步
本章简要介绍了Stackless的基本概念,后面章节将基于这些基础,来展示更加实用的功能。
2.1 微进程(tasklet)
微进程是stackless的基本构成单元,你可以通过提供任一个Python可调用对象(通常为函数或类的方法)来建立它,这将建立一个微进程并将其添加到调度器。这是一个快速演示:
Python 2.4.3 Stackless 3.1b3 060504 (#69, May 3 2006, 19:20:41) [MSC v.1310 32
bit (Intel)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> import stackless
>>>
>>> def print_x(x):
... print x
...
>>> stackless.tasklet(print_x)('one')
<stackless.tasklet object at 0x00A45870>
>>> stackless.tasklet(print_x)('two')
<stackless.tasklet object at 0x00A45A30>
>>> stackless.tasklet(print_x)('three')
<stackless.tasklet object at 0x00A45AB0>
>>>
>>> stackless.run()
one
two
three
>>>
注意,微进程将排起队来,并不运行,直到调用 stackless.run() 。
2.2 调度器(scheduler)
调度器控制各个微进程运行的顺序。如果刚刚建立了一组微进程,它们将按照建立的顺序来执行。在实用中,一般会建立一组可以再次被调度的微进程,好让每个都有轮次机会。一个快速演示:
Python 2.4.3 Stackless 3.1b3 060504 (#69, May 3 2006, 19:20:41) [MSC v.1310 32
bit (Intel)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> import stackless
>>>
>>> def print_three_times(x):
... print "1:", x
... stackless.schedule()
... print "2:", x
... stackless.schedule()
... print "3:", x
... stackless.schedule()
...
>>>
>>> stackless.tasklet(print_three_times)('first')
<stackless.tasklet object at 0x00A45870>
>>> stackless.tasklet(print_three_times)('second')
<stackless.tasklet object at 0x00A45A30>
>>> stackless.tasklet(print_three_times)('third')
<stackless.tasklet object at 0x00A45AB0>
>>>
>>> stackless.run()
1: first
1: second
1: third
2: first
2: second
2: third
3: first
3: second
3: third
>>>
注意:当调用 stackless.schedule() 的时候,当前活动微进程将暂停执行,并将自身重新插入到调度器队列的末尾,好让下一个微进程被执行。一旦在它前面的所有其他微进程都运行过了,它将从上次停止的地方继续开始运行。这个过程会持续,直到所有的活动微进程都完成了运行过程。这就是使用stackless达到合作式多任务的方式。
2.3 通道(channel)
通道使得微进程之间的信息传递成为可能。它做到了两件事:
能够在微进程之间交换信息。
能够控制运行的流程。
又一个快速演示:
C:\>c:\python24\python
Python 2.4.3 Stackless 3.1b3 060504 (#69, May 3 2006, 19:20:41) [MSC v.1310 32
bit (Intel)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> import stackless
>>>
>>> channel = stackless.channel()
>>>
>>> def receiving_tasklet():
... print "Recieving tasklet started"
... print channel.receive()
... print "Receiving tasklet finished"
...
>>> def sending_tasklet():
... print "Sending tasklet started"
... channel.send("send from sending_tasklet")
... print "sending tasklet finished"
...
>>> def another_tasklet():
... print "Just another tasklet in the scheduler"
...
>>> stackless.tasklet(receiving_tasklet)()
<stackless.tasklet object at 0x00A45B30>
>>> stackless.tasklet(sending_tasklet)()
<stackless.tasklet object at 0x00A45B70>
>>> stackless.tasklet(another_tasklet)()
<stackless.tasklet object at 0x00A45BF0>
>>>
>>> stackless.run()
Recieving tasklet started
Sending tasklet started
send from sending_tasklet
Receiving tasklet finished
Just another tasklet in the scheduler
sending tasklet finished
>>>
>>>
接收的微进程调用 channel.receive() 的时候,便阻塞住,这意味着该微进程暂停执行,直到有信息从这个通道送过来。除了往这个通道发送信息以外,没有其他任何方式可以让这个微进程恢复运行。
若有其他微进程向这个通道发送了信息,则不管当前的调度到了哪里,这个接收的微进程都立即恢复执行;而发送信息的微进程则被转移到调度列表的末尾,就像调用了 stackless.schedule() 一样。
同样注意,发送信息的时候,若当时没有微进程正在这个通道上接收,也会使当前微进程阻塞:
>>>
>>> stackless.tasklet(sending_tasklet)()
<stackless.tasklet object at 0x00A45B70>
>>> stackless.tasklet(another_tasklet)()
<stackless.tasklet object at 0x00A45BF0>
>>>
>>> stackless.run()
Sending tasklet started
Just another tasklet in the scheduler
>>>
>>> stackless.tasklet(another_tasklet)()
<stackless.tasklet object at 0x00A45B30>
>>> stackless.run()
Just another tasklet in the scheduler
>>>
>>> #最后,加入接收的微进程
...
>>> stackless.tasklet(receiving_tasklet)()
<stackless.tasklet object at 0x00A45BF0>
>>>
>>> stackless.run()
Recieving tasklet started
send from sending_tasklet
Receiving tasklet finished
sending tasklet finished
>>>
发送信息的微进程,只有在成功地将数据发送到了另一个微进程之后,才会重新被插入到调度器中。
2.4 总结
以上涵盖了stackless的大部分功能。似乎不多是吧?——我们只使用了少许对象,和大约四五个函数调用,来进行操作。但是,使用这种简单的API作为基本构建单元,我们可以开始做一些真正有趣的事情。
3 协程(coroutine)
3.1 子例程的问题
大多数传统编程语言具有子例程的概念。一个子例程被另一个例程(可能还是其它某个例程的子例程)所调用,或返回一个结果,或不返回结果。从定义上说,一个子例程是从属于其调用者的。
见下例:
def ping():
print "PING"
pong()
def pong():
print "PONG"
ping()
ping()
有经验的编程者会看到这个程序的问题所在:它导致了堆栈溢出。如果运行这个程序,它将显示一大堆讨厌的跟踪信息,来指出堆栈空间已经耗尽。
3.1.1 堆栈
我仔细考虑了,自己对C语言堆栈的细节究竟了解多少,最终还是决定完全不去讲它。似乎,其他人对其所尝试的描述,以及图表,只有本身已经理解了的人才能看得懂。我将试着给出一个最简单的说明,而对其有更多兴趣的读者可以从网上查找更多信息。
每当一个子例程被调用,都有一个“栈帧”被建立,这是用来保存变量,以及其他子例程局部信息的区域。于是,当你调用 ping() ,则有一个栈帧被建立,来保存这次调用相关的信息。简言之,这个帧记载着 ping 被调用了。当再调用 pong() ,则又建立了一个栈帧,记载着 pong 也被调用了。这些栈帧是串联在一起的,每个子例程调用都是其中的一环。就这样,堆栈中显示: ping 被调用所以 pong 接下来被调用。显然,当 pong() 再调用 ping() ,则使堆栈再扩展。下面是个直观的表示:
帧 堆栈
1 ping 被调用
2 ping 被调用,所以 pong 被调用
3 ping 被调用,所以 pong 被调用,所以 ping 被调用
4 ping 被调用,所以 pong 被调用,所以 ping 被调用,所以 pong 被调用
5 ping 被调用,所以 pong 被调用,所以 ping 被调用,所以 pong 被调用,所以 ping 被调用
6 ping 被调用,所以 pong 被调用,所以 ping 被调用,所以 pong 被调用,所以 ping 被调用……
现在假设,这个页面的宽度就表示系统为堆栈所分配的全部内存空间,当其顶到页面的边缘的时候,将会发生溢出,系统内存耗尽,即术语“堆栈溢出”。
3.1.2 那为什么要使用堆栈?
上例是有意设计的,用来体现堆栈的问题所在。在大多数情况下,当每个子例程返回的时候,其栈帧将被清除掉,就是说堆栈将会自行实现清理过程。这一般来说是件好事,在C语言中,堆栈就是一个不需要编程者来手动进行内存管理的区域。很幸运,Python程序员也不需要直接来担心内存管理与堆栈。但是由于 Python解释器本身也是用C实现的,那些实现者们可是需要担心这个的。使用堆栈是会使事情方便,除非我们开始调用那种从不返回的函数,如上例中的,那时候,堆栈的表现就开始和程序员别扭起来,并耗尽可用的内存。
3.2 走进协程
此时,将堆栈弄溢出是有点愚蠢的。 ping() 和 pong() 本不是真正意义的子例程,因为其中哪个也不从属于另一个,它们是“协程”,处于同等的地位,并可以彼此间进行无缝通信。
帧 堆栈
1 ping 被调用
2 pong 被调用
3 ping 被调用
4 pong 被调用
5 ping 被调用
6 pong 被调用
在stackless中,我们使用通道来建立协程。还记得吗,通道所带来的两个好处中的一个,就是能够控制微进程之间运行的流程。使用通道,我们可以在 ping 和 pong 这两个协程之间自由来回,要多少次就多少次,都不会堆栈溢出:
#
# pingpong_stackless.py
#
import stackless
ping_channel = stackless.channel()
pong_channel = stackless.channel()
def ping():
while ping_channel.receive(): #在此阻塞
print "PING"
pong_channel.send("from ping")
def pong():
while pong_channel.receive():
print "PONG"
ping_channel.send("from pong")
stackless.tasklet(ping)()
stackless.tasklet(pong)()
# 我们需要发送一个消息来初始化这个游戏的状态
# 否则,两个微进程都会阻塞
stackless.tasklet(ping_channel.send)('startup')
stackless.run()
你可以运行这个程序要多久有多久,它都不会崩溃,且如果你检查其内存使用量(使用Windows的任务管理器或Linux的top命令),将会发现使用量是恒定的。这个程序的协程版本,不管运行一分钟还是一天,使用的内存都是一样的。而如果你检查原先那个递归版本的内存用量,则会发现其迅速增长,直到崩溃。
3.3 总结
是否还记得,先前我提到过,那个代码的递归版本,有经验的程序员会一眼看出毛病。但老实说,这里面并没有什么“计算机科学”方面的原因在阻碍它的正常工作,有些让人坚信的东西,其实只是个与实现细节有关的小问题——只因为大多数传统编程语言都使用堆栈。某种意义上说,有经验的程序员都是被洗了脑,从而相信这是个可以接受的问题。而stackless,则真正察觉了这个问题,并除掉了它。
4 轻量级线程
与当今的操作系统中内建的、和标准Python代码中所支持的普通线程相比,“微线程”要更为轻量级,正如其名称所暗示。它比传统线程占用更少的内存,并且微线程之间的切换,要比传统线程之间的切换更加节省资源。
为了准确说明微线程的效率究竟比传统线程高多少,我们用两者来写同一个程序。
4.1 Hackysack的模拟
Hackysack是一种游戏,就是一伙脏乎乎的小子围成一个圈,来回踢一个装满了豆粒的沙包,目标是不让这个沙包落地,当传球给别人的时候,可以耍各种把戏。踢沙包只可以用脚。
在我们的简易模拟中,我们假设一旦游戏开始,圈里人数就是恒定的,并且每个人都是如此厉害,以至于如果允许的话,这个游戏可以永远停不下来。
4.2 游戏的传统线程版本
import thread
import random
import sys
import Queue
class hackysacker:
counter = 0
def __init__(self,name,circle):
self.name = name
self.circle = circle
circle.append(self)
self.messageQueue = Queue.Queue()
thread.start_new_thread(self.messageLoop,())
def incrementCounter(self):
hackysacker.counter += 1
if hackysacker.counter >= turns:
while self.circle:
hs = self.circle.pop()
if hs is not self:
hs.messageQueue.put('exit')
sys.exit()
def messageLoop(self):
while 1:
message = self.messageQueue.get()
if message == "exit":
debugPrint("%s is going home" % self.name)
sys.exit()
debugPrint("%s got hackeysack from %s" % (self.name, message.name))
kickTo = self.circle[random.randint(0,len(self.circle)-1)]
debugPrint("%s kicking hackeysack to %s" % (self.name, kickTo.name))
self.incrementCounter()
kickTo.messageQueue.put(self)
def debugPrint(x):
if debug:
print x
debug=1
hackysackers=5
turns = 5
def runit(hs=10,ts=10,dbg=1):
global hackysackers,turns,debug
hackysackers = hs
turns = ts
debug = dbg
hackysacker.counter= 0
circle = []
one = hackysacker('1',circle)
for i in range(hackysackers):
hackysacker(`i`,circle)
one.messageQueue.put(one)
try:
while circle:
pass
except:
#有时我们在清理过程中会遇到诡异的错误。
pass
if __name__ == "__main__":
runit(dbg=1)
一个“玩者”类的初始化用到了其名字,和一个指向包含了所有玩者的全局列表 circle 的引用,还有一个继承自Python标准库中的Queue类的消息队列。
Queue 这个类的作用,与stackless的通道类似。它包含 put() 和 get() 方法,在一个空的Queue上调用 put() 会阻塞,直到另一个线程调用 put() 将数据送入Queue中为止。Queue这个类被设计为能与操作系统级的线程高效合作。
__init__ 方法接下来使用Python标准库中的thread模块新建一个线程,并在新线程中开始了一个消息循环。此消息循环是个无限循环,不停地处理队列中的消息。如果其收到一个特殊的消息 'exit' ,则结束这个线程。
如果收到了另一个消息——指定其收到了沙包,玩者则从圈中随机选取一个其他玩者,通过向其发送一条消息来指定,将沙包再踢给它。
由类成员变量 hackysacker.counter 进行计数,当沙包被踢够了指定的次数时,将会向圈中的所有玩者都发送一条特殊的 'exit' 消息。
注意,当全局变量debug为非零的时候,还有个函数debugPrint可以输出信息。我们可以使这游戏输出到标准输出,但当计时的时候,这会影响精确度。
我们来运行这个程序,并检查其是否正常工作:
C:\Documents and Settings\grant\Desktop\why_stackless\code>c:\python24\python.exe
hackysackthreaded.py
1 got hackeysack from 1
1 kicking hackeysack to 4
4 got hackeysack from 1
4 kicking hackeysack to 0
0 got hackeysack from 4
0 kicking hackeysack to 1
1 got hackeysack from 0
1 kicking hackeysack to 3
3 got hackeysack from 1
3 kicking hackeysack to 3
4 is going home
2 is going home
1 is going home
0 is going home
1 is going home
C:\Documents and Settings\grant\Desktop\why_stackless\code>
如我们所见,所有玩者到了一起,并很快地进行了一场游戏。现在,我们对若干次实验运行过程进行计时。Python标准库中有一个 timeit.py 程序,可以用作此目的。那么,我们也同时关掉调试输出:
C:\Documents and Settings\grant\Desktop\why_stackless\code>c:\python24\python.ex
e c:\Python24\lib\timeit.py -s "import hackysackthreaded" hackysackthreaded.ru
nit(10,1000,0)
10 loops, best of 3: 183 msec per loop
在我的机器上,十个玩者共进行1000次传球,共使用了183毫秒。我们来增加玩者的数量:
C:\Documents and Settings\grant\Desktop\why_stackless\code>c:\python24\python.ex
e c:\Python24\lib\timeit.py -s "import hackeysackthreaded" hackeysackthreaded.ru
nit(100,1000,0)
10 loops, best of 3: 231 msec per loop
C:\Documents and Settings\grant\Desktop\why_stackless\code>c:\python24\python.ex
e c:\Python24\lib\timeit.py -s "import hackysackthreaded" hackysackthreaded.ru
nit(1000,1000,0)
10 loops, best of 3: 681 msec per loop
C:\Documents and Settings\grant\Desktop\why_stackless\code>c:\python24\python.ex
e c:\Python24\lib\timeit.py -s "import hackysackthreaded" hackysackthreaded.ru
nit(10000,1000,0)
Traceback (most recent call last):
File "c:\Python24\lib\timeit.py", line 255, in main
x = t.timeit(number)
File "c:\Python24\lib\timeit.py", line 161, in timeit
timing = self.inner(it, self.timer)
File "<timeit-src>", line 6, in inner
File ".\hackeysackthreaded.py", line 58, in runit
hackysacker(`i`,circle)
File ".\hackeysackthreaded.py", line 14, in __init__
thread.start_new_thread(self.messageLoop,())
error: can't start new thread
在我的3GHz、1G内存的机器上,当尝试10,000个线程的时候出现了错误。就不想拿出这详细的输出内容来扰人了,只是通过若干实验与出错过程得出,在我机器上,此程序从1100个线程左右开始出错。另请注意,1000个线程时候所耗用的时间,是10个线程时候的大约三倍。
4.3 Stackless
import stackless
import random
import sys
class hackysacker:
counter = 0
def __init__(self,name,circle):
self.name = name
self.circle = circle
circle.append(self)
self.channel = stackless.channel()
stackless.tasklet(self.messageLoop)()
def incrementCounter(self):
hackysacker.counter += 1
if hackysacker.counter >= turns:
while self.circle:
self.circle.pop().channel.send('exit')
def messageLoop(self):
while 1:
message = self.channel.receive()
if message == 'exit':
return
debugPrint("%s got hackeysack from %s" % (self.name, message.name))
kickTo = self.circle[random.randint(0,len(self.circle)-1)]
while kickTo is self:
kickTo = self.circle[random.randint(0,len(self.circle)-1)]
debugPrint("%s kicking hackeysack to %s" % (self.name, kickTo.name))
self.incrementCounter()
kickTo.channel.send(self)
def debugPrint(x):
if debug:print x
debug = 5
hackysackers = 5
turns = 1
def runit(hs=5,ts=5,dbg=1):
global hackysackers,turns,debug
hackysackers = hs
turns = ts
debug = dbg
hackysacker.counter = 0
circle = []
one = hackysacker('1',circle)
for i in range(hackysackers):
hackysacker(`i`,circle)
one.channel.send(one)
try:
stackless.run()
except TaskletExit:
pass
if __name__ == "__main__":
runit()
以上代码实质上与线程版本是等价的,主要区别仅在于我们使用微进程来代替线程,并且使用通道代替Queue来进行切换。让我们运行它,并检查输出:
C:\Documents and Settings\grant\Desktop\why_stackless\code>c:\Python24\python.ex
e hackysackstackless.py
1 got hackeysack from 1
1 kicking hackeysack to 1
1 got hackeysack from 1
1 kicking hackeysack to 4
4 got hackeysack from 1
4 kicking hackeysack to 1
1 got hackeysack from 4
1 kicking hackeysack to 4
4 got hackeysack from 1
4 kicking hackeysack to 0
工作情况确如预期。现在来计时:
C:\Documents and Settings\grant\Desktop\why_stackless\code>c:\Python24\python.ex
e c:\Python24\lib\timeit.py -s"import hackysackstackless" hackysackstackless.r
unit(10,1000,0)
100 loops, best of 3: 19.7 msec per loop
其仅用了19.7毫秒,速度几乎是线程版本的10倍。现在我们同样开始增加微线程的数量:
C:\Documents and Settings\grant\Desktop\why_stackless\code>c:\Python24\python.ex
e c:\Python24\lib\timeit.py -s"import hackysackstackless" hackysackstackless.r
unit(100,1000,0)
100 loops, best of 3: 19.7 msec per loop
C:\Documents and Settings\grant\Desktop\why_stackless\code>c:\Python24\python.ex
e c:\Python24\lib\timeit.py -s"import hackysackstackless" hackysackstackless.r
unit(1000,1000,0)
10 loops, best of 3: 26.9 msec per loop
C:\Documents and Settings\grant\Desktop\why_stackless\code>c:\Python24\python.ex
e c:\Python24\lib\timeit.py -s"import hackysackstackless" hackysackstackless.r
unit(10000,1000,0)
10 loops, best of 3: 109 msec per loop
C:\Documents and Settings\grant\Desktop\why_stackless\code>c:\Python24\python.ex
e c:\Python24\lib\timeit.py -s"import hackysackstackless" hackysackstackless.r
unit(100000,1000,0)
10 loops, best of 3: 1.07 sec per loop
甚至直到10,000个线程的时候,那时线程版本早已不能运行了,而这个仍然可以比线程版本在10个线程的时候运行的还快。
这里我在尽量保持代码的简洁,因此你可以相信我的话:计时时间的增长仅仅在于初始化游戏圈子的部分,而真正进行游戏的时间则是一直不变的,不管使用10个微线程,还是10,000个。这归因于通道的工作方式:当它们收到消息的时候,是立即进行阻塞和恢复操作的。另一方面,各个操作系统线程则是轮番检查自己的队列里是否有了东西,这意味着,跑着越多的线程,性能就变得越差。
4.4 总结
但愿我已经成功地演示了,微线程的运行至少比操作系统线程快一个数量级,并具备远高于后者的可伸缩性。关于操作系统线程的一般常识是:(1)尽量不要使用它,(2)如果非用不可,就能少用一点就少用一点。而stackless的微线程则使我们从这些限制中解放出来。
5 数据流
5.1 工厂
假设,我们要写程序来模拟一个生产玩具娃娃的工厂,具有如下的需求:
一个仓库,装有用来塑造的塑料球。
一个仓库,装有用来连接部件的铆钉。
一台注塑机,可以在6秒内,用0.2磅塑料球来制造一双手臂。
一台注塑机,可以在5秒内,用0.2磅塑料球来制造一双腿。
一台注塑机,可以在4秒内,用0.1磅塑料球来制造一个头部。
一台注塑机,可以在10秒内,用0.5磅塑料球来制造一个躯干。
一个装配台,可以在2秒内,将一个现成的躯干和一双现成的腿,用一个铆钉装配在一起。
一个装配台,可以在2秒内,将上面的半成品和一双现成的手臂,用一个铆钉装配在一起。
一个装配台,可以在3秒内,将上面的半成品和一个现成的头部,用一个铆钉装配在一起。
每台设备都一直不停地工作下去。
5.2 “普通”版本
如果不用stackless而用“普通”的方法来写这个,将会是很痛苦的事情。当我们经历了这个“普通”版示例之后,会再用stackless来做一个,并比较两者的代码。如果你认为这个例子太不自然,并且有时间的话,可以稍为休息后,根据上面的需求,自己来做一个工厂的实现,再来将你写出的代码和 stackless版本做个比较。
代码如下:
class storeroom:
def __init__(self,name,product,unit,count):
self.product = product
self.unit = unit
self.count = count
self.name = name
def get(self,count):
if count > self.count:
raise RuntimeError("Not enough %s" % self.product)
else:
self.count -= count
return count
def put(self,count):
self.count += count
def run(self):
pass
rivetStoreroom = storeroom("rivetStoreroom","rivets","#",1000)
plasticStoreroom = storeroom("plastic Storeroom","plastic pellets","lb",100)
class injectionMolder:
def __init__(self,name,partName,plasticSource,plasticPerPart,timeToMold):
self.partName = partName
self.plasticSource = plasticSource
self.plasticPerPart = plasticPerPart
self.timeToMold = timeToMold
self.items = 0
self.plastic = 0
self.time = -1
self.name = name
def get(self,items):
if items > self.items:
return 0
else:
self.items -= items
return items
def run(self):
if self.time == 0:
self.items += 1
print "%s finished making part" % self.name
self.time -= 1
elif self.time < 0:
print "%s starts making new part %s" % (self.name,self.partName)
if self.plastic < self.plasticPerPart:
print "%s getting more plastic"
self.plastic += self.plasticSource.get(self.plasticPerPart * 10)
self.time = self.timeToMold
else:
print "%s molding for %s more seconds" % (self.partName, self.time)
self.time -= 1
armMolder = injectionMolder("arm Molder", "arms",plasticStoreroom,0.2,6)
legMolder = injectionMolder("leg Molder", "leg",plasticStoreroom,0.2,5)
headMolder = injectionMolder("head Molder","head",plasticStoreroom,0.1,4)
torsoMolder = injectionMolder("torso Molder","torso",plasticStoreroom,0.5,10)
class assembler:
def __init__(self,name,partAsource,partBsource,rivetSource,timeToAssemble):
self.partAsource = partAsource
self.partBsource = partBsource
self.rivetSource = rivetSource
self.timeToAssemble = timeToAssemble
self.itemA = 0
self.itemB = 0
self.items = 0
self.rivets = 0
self.time = -1
self.name = name
def get(self,items):
if items > self.items:
return 0
else:
self.items -= items
return items
def run(self):
if self.time == 0:
self.items += 1
print "%s finished assembling part" % self.name
self.time -= 1
elif self.time < 0:
print "%s starts assembling new part" % self.name
if self.itemA < 1:
print "%s Getting item A" % self.name
self.itemA += self.partAsource.get(1)
if self.itemA < 1:
print "%s waiting for item A" % self.name
elif self.itemB < 1:
print "%s Getting item B" % self.name
self.itemB += self.partBsource.get(1)
if self.itemB < 1:
print "%s waiting for item B" % self.name
print "%s starting to assemble" % self.name
self.time = self.timeToAssemble
else:
print "%s assembling for %s more seconds" % (self.name, self.time)
self.time -= 1
legAssembler = assembler("leg Assembler",torsoMolder,legMolder,rivetStoreroom,2)
armAssembler = assembler("arm Assembler", armMolder,legAssembler,rivetStoreroom,2)
torsoAssembler = assembler("torso Assembler", headMolder,armAssembler,
rivetStoreroom,3)
components = [rivetStoreroom, plasticStoreroom, armMolder,
legMolder, headMolder, torsoMolder,
legAssembler, armAssembler, torsoAssembler]
def run():
while 1:
for component in components:
component.run()
raw_input("Press <ENTER> to continue...")
print "\n\n\n"
if __name__ == "__main__":
run()
5.2.1 分析
我们从一个代表仓库的类开始,它的初始化需要一个其所储存的产品的名称、一个衡量单位(如磅,或部件数目)和一个初始存量作为参数。还有一个 run 方法什么也不做,其用途将会在稍后了解。基于这个类,我们建立了两个仓库示例。
接下来是一个注塑机类,它的初始化需要其产品的名称、一个作为塑料来源的仓库、制造一个部件所需要的原料量,和制造一个部件所需的时间作为参数。有一个 get() 方法,在其内部已有完成的产品时,可将其取出,并调整内部记录。对于这个类, run() 方法是确实做了些事情的:
在计时器大于0期间,塑造过程持续进行,并递减计时器。
当塑造剩余时间达到0,则一个产品被建立,并把计时器设为-1。
当计时器为-1时,注塑机检测是否还有足够的塑料来塑造下一个产品,如果有,则取来原料,并开始塑造。
用这个类,我们建立了四个注塑机实例。
再接下来是一个装配台类,它的初始化需要其产品的名字、部件1的来源、部件2的来源、一个铆钉的仓库,以及装配这些部件所需的时间作为参数。也有一个 get() 方法,在其内部已有完成的产品时,可将其取出,并调整内部记录。而这个类的 run() 方法是这样的:
若计时器大于0,则已经具备原材料的装配台继续其装配过程。
如果计时器等于0,则一个产品被完成,内部记录随之被调整。
如果计时器小于0,则装配台试图取得新的各个部件,并再次开始装配。若其中某个部件还没有来得及塑造出来,则必须等待。
为了装配腿、手臂和头部,各有一个装配台实例被建立。
注:
你会注意到,仓库、注塑机和装配台类有很多相似之处。如果我是在写一个真正生产系统,则很可能先建立一个基类,并使用继承。但在这里,我觉得做出这种类层次关系的话只会使代码变得繁杂,所以有意保持了其简单。
由以上三个类所建立的所有实例,都被装进一个称为 components 的“设备”数组中。然后,我们建立一个事件循环,重复地调用每个设备的 run() 方法。
5.3 走进数据流
如果你熟悉 Unix 系统,那么不管你知不知道数据流技术,恐怕你都已经在使用它了。看下面的 shell 命令:
cat README | more
为了公平,也举出 Windows 中对应的:
type readme.txt | more
尽管,在 Windows 的世界中,数据流技术并不像在 Unix 世界中那么普遍深入。
顺便对还不熟悉 more 工具的读者:这个程序从一个外部来源接收输入,显示一页的内容后暂停,直到用户按下任意键,再显示下一页。这个“|”操作符获取一个程序的输出,并用管道将其传送到另一个命令的输入。这样,不管 cat 还是 type ,都是将文档内容传送到标准输出,而 more 则接收这些输出。
这样,more 程序仅仅是坐在那里,等着来自另一个程序的数据来流向自己。只要流进的数据足够一定量,就在屏幕上显示一页并暂停;而用户击键时,more 则让后面的数据再流入,并开始再一次等待数据量足够,再显示,再暂停。这便是术语“数据流”。
使用通道,再使用stackless本身的轮转调度器,我们就可以使用数据流技术来写这个工厂的模拟。
5.4 代码的stackless版本
import stackless
#
# “休眠” 辅助函数
#
sleepingTasklets = []
sleepingTicks = 0
def Sleep(secondsToWait):
channel = stackless.channel()
endTime = sleepingTicks + secondsToWait
sleepingTasklets.append((endTime, channel))
sleepingTasklets.sort()
# 阻塞,直到收到一个唤醒通知。
channel.receive()
def ManageSleepingTasklets():
global sleepingTicks
while 1:
if len(sleepingTasklets):
endTime = sleepingTasklets[0][0]
while endTime <= sleepingTicks:
channel = sleepingTasklets[0][1]
del sleepingTasklets[0]
# 我们需要发送一些东西,但发什么无所谓,
# 因为其内容是没用的。
channel.send(None)
endTime = sleepingTasklets[0][0] # 检查下一个
sleepingTicks += 1
print "1 second passed"
stackless.schedule()
stackless.tasklet(ManageSleepingTasklets)()
#
# 工厂的实现
#
class storeroom:
def __init__(self,name,product,unit,count):
self.product = product
self.unit = unit
self.count = count
self.name = name
def get(self,count):
while count > self.count: #重新调度,直到有了足够的原料
print "%s doesn't have enough %s to deliver yet" % (self.name,
self.product)
stackless.schedule()
self.count -= count
return count
return count
def put(self,count):
self.count += count
def run(self):
pass
rivetStoreroom = storeroom("rivetStoreroom","rivets","#",1000)
plasticStoreroom = storeroom("plastic Storeroom","plastic pellets","lb",100)
class injectionMolder:
def __init__(self,name,partName,plasticSource,plasticPerPart,timeToMold):
self.partName = partName
self.plasticSource = plasticSource
self.plasticPerPart = plasticPerPart
self.timeToMold = timeToMold
self.plastic = 0
self.items = 0
self.name = name
stackless.tasklet(self.run)()
def get(self,items):
while items > self.items: #重新调度,直到有了足够的产品
print "%s doesn't have enough %s to deliver yet" % (self.name,
self.partName)
stackless.schedule()
self.items -= items
return items
def run(self):
while 1:
print "%s starts making new part %s" % (self.name,self.partName)
if self.plastic < self.plasticPerPart:
print "%s getting more plastic"
self.plastic += self.plasticSource.get(self.plasticPerPart * 10)
self.plastic -= self.plasticPerPart
Sleep(self.timeToMold)
print "%s done molding after %s seconds" % (self.partName,
self.timeToMold)
self.items += 1
print "%s finished making part" % self.name
stackless.schedule()
armMolder = injectionMolder("arm Molder", "arms",plasticStoreroom,0.2,5)
legMolder = injectionMolder("leg Molder", "leg",plasticStoreroom,0.2,5)
headMolder = injectionMolder("head Molder","head",plasticStoreroom,0.1,5)
torsoMolder = injectionMolder("torso Molder","torso",plasticStoreroom,0.5,10)
class assembler:
def __init__(self,name,partAsource,partBsource,rivetSource,timeToAssemble):
self.partAsource = partAsource
self.partBsource = partBsource
self.rivetSource = rivetSource
self.timeToAssemble = timeToAssemble
self.itemA = 0
self.itemB = 0
self.items = 0
self.rivets = 0
self.name = name
stackless.tasklet(self.run)()
def get(self,items):
while items > self.items: #重新调度,直到有了足够的产品
print "Don't have a %s to deliver yet" % (self.name)
stackless.schedule()
self.items -= items
return items
def run(self):
while 1:
print "%s starts assembling new part" % self.name
self.itemA += self.partAsource.get(1)
self.itemB += self.partBsource.get(1)
print "%s starting to assemble" % self.name
Sleep(self.timeToAssemble)
print "%s done assembling after %s" % (self.name, self.timeToAssemble)
self.items += 1
print "%s finished assembling part" % self.name
stackless.schedule()
legAssembler = assembler("leg Assembler",torsoMolder,legMolder,rivetStoreroom,2)
armAssembler = assembler("arm Assembler", armMolder,legAssembler,rivetStoreroom,2)
torsoAssembler = assembler("torso Assembler", headMolder,armAssembler,
rivetStoreroom,3)
def pause():
while 1:
raw_input("Press <ENTER> to continue...")
print "\n\n\n"
stackless.schedule()
stackless.tasklet(pause)()
def run():
stackless.run()
if __name__ == "__main__":
run()
5.4.1 分析
5.4.1.1 休眠功能
首先我们建立了一些辅助函数,好让我们的类可以进行“休眠”。一个微进程调用 Sleep() ,则先建立一个通道,再计算出将被唤醒的时间,并将这个时间信息添加到全局数组 sleepingTasklets 中。之后,将调用 channel.receive() ,这将使该微进程暂停运行,直到被再次唤醒。
接着我们建立另一个函数,来管理所有休眠的微进程。它检查全局数组 sleepingTasklets ,找出所有需要立即被唤醒的成员,并通过其通道来将其唤醒。这个函数也被添加到了微进程调度器中。
5.4.1.2 类
这些类与“普通”版本中的类相似,但也有一些显著不同:首先,在实例化的时候,他们的 run() 方法创建了微进程,这样我们不再需要手工建立一个设备数组,和一个外部的 run() 函数来处理事件循环,stackless本身就隐式地做了这些工作。其次的不同是,微进程可以通过休眠来等待一个产品被产出,而不用通过计数器来计时。第三个不同,则是对 get() 的调用变得更自然了,如果某种原材料没有准备好,则这个微进程简单地重新进入调度循环,直到有了原材料。
5.5 那我们获得了什么?
OK,两个版本的程序都能运行,并得到同样的结果,那这里究竟有什么大不了的事情?——让我们查看一下普通版本的工厂的 run 方法:
def run(self):
if self.time == 0:
self.items += 1
print "%s finished assembling part" % self.name
self.time -= 1
elif self.time < 0:
print "%s starts assembling new part" % self.name
if self.itemA < 1:
print "%s Getting item A" % self.name
self.itemA += self.partAsource.get(1)
if self.itemA < 1:
print "%s waiting for item A" % self.name
elif self.itemB < 1:
print "%s Getting item B" % self.name
self.itemB += self.partBsource.get(1)
if self.itemB < 1:
print "%s waiting for item B" % self.name
print "%s starting to assemble" % self.name
self.time = self.timeToAssemble
else:
print "%s assembling for %s more seconds" % (self.name, self.time)
self.time -= 1
再看 stackless 的版本:
def run(self):
while 1:
print "%s starts assembling new part" % self.name
self.itemA += self.partAsource.get(1)
self.itemB += self.partBsource.get(1)
print "%s starting to assemble" % self.name
Sleep(self.timeToAssemble)
print "%s done assembling after %s" % (self.name, self.timeToAssemble)
self.items += 1
print "%s finished assembling part" % self.name
stackless.schedule()
Stackless 的版本比普通的版本更加简单、清晰和直观,它不需要将事件循环的基础结构包装进 run 方法中,这个结构已经和 run() 方法解除了耦合。run() 方法仅仅描述了自己要做什么,而不需要关心具体究竟怎么做的。这就使软件开发者能集中精力于工厂的运作,而不是事件循环以及程序本身的运作。
5.6 “推”数据
注:
本节的完整程序保存为 digitalCircuit.py ,在本文的末尾,和代码.zip文件中和都有。
在工厂的例子中,我们是在“拉”数据:每个部分都去请求其所需要的部件,并一直等待那些部件到来。我们也可以来“推”数据,这样,系统中的每个部分都将自身的变化向下传播到另一个部分。“拉”的方式,称为“懒惰数据流”,而“推”的方式则称为“急切数据流”。
为了演示“推”的方式,我们来建立一个数字电路的模拟器。这个模拟器由各种元件组成,元件具有0或1的状态,并可以各种方式互相连接起来。这里我们使用面向对象的方法,并定义一个 EventHandler 基类来实现其大部分功能:
class EventHandler:
def __init__(self,*outputs):
if outputs==None:
self.outputs=[]
else:
self.outputs=list(outputs)
self.channel = stackless.channel()
stackless.tasklet(self.listen)()
def listen(self):
while 1:
val = self.channel.receive()
self.processMessage(val)
for output in self.outputs:
self.notify(output)
def processMessage(self,val):
pass
def notify(self,output):
pass
def registerOutput(self,output):
self.outputs.append(output)
def __call__(self,val):
self.channel.send(val)
EventHandler 类的核心功能,是做以下三件事:
通过 listen 方法,持续地监听一个通道上传来的消息。
之后,通过 processMessage 方法,处理所有收到的消息。
最后,通过 notify 方法,将处理结果通知到所有注册的输出端。
还有两个附加的辅助方法:
registerOutput 用来在实例建立之后,再注册额外的输出端。
__call__ 被重载,作为一种便利,使我们可以以这种格式来发送消息:
event(1)
从而无需这样:
event.channel.send(1)
使用 EventHandler 类作为基本构建单元,我们可以开始实现这个数字电路模拟器,由一个开关开始。下面描述的是一个可由用户控制的开关,可以向其发送0或1的值:
class Switch(EventHandler):
def __init__(self,initialState=0,*outputs):
EventHandler.__init__(self,*outputs)
self.state = initialState
def processMessage(self,val):
debugPrint("Setting input to %s" % val)
self.state = val
def notify(self,output):
output((self,self.state))
初始化之后,这个开关就保存着其初始的状态,而 processMessage 则被重载,用来将收到的消息保存起来,成为新的当前状态。其 notify 方法则被重载为发送一个元组,其中含有指向实例自身的引用,还有其状态。我们不久后会看到,我们需要顺便发送这个自身的引用,这样,那些具有多个输入端的元件则可以判别,消息来自于哪个来源。
注:
若你正在随着我们的进度来调试代码,则别忘了我们还在使用 debugPrint() 函数来提供诊断信息,它最初是在“轻量级线程”这一节中定义的。
接下来我们要建立的,是“指示器”类,这个类的实例的作用仅仅是将其当前状态输出。我想我们可以认为,其相当于真正的数字电路中的发光二极管:
class Reporter(EventHandler):
def __init__(self,msg="%(sender)s send message %(value)s"):
EventHandler.__init__(self)
self.msg = msg
def processMessage(self,msg):
sender,value=msg
print self.msg % {'sender':sender,'value':value}
其初始化接受一个可选的格式字符串,来指定之后输出的样式。代码的其他部分意义自明。
现在我们有了一个足够好的框架,来测试这些最初的功能:
C:\Documents and Settings\grant\Desktop\why_stackless\code>c:\Python24\python.ex
e
Python 2.4.3 Stackless 3.1b3 060516 (#69, May 3 2006, 11:46:11) [MSC v.1310 32
bit (Intel)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> import stackless
>>> from digitalCircuit import *
>>>
>>> reporter = Reporter()
>>> switch = Switch(0,reporter) #创建一个开关,并连接到一个指示器做输出。
>>>
>>> switch(1)
<digitalCircuit.Switch instance at 0x00A46828> send message 1
>>>
>>> switch(0)
<digitalCircuit.Switch instance at 0x00A46828> send message 0
>>>
>>> switch(1)
<digitalCircuit.Switch instance at 0x00A46828> send message 1
>>>
与先前设计的工厂不同,对开关的操作会使结果立即被推至其输出端,并显示出来。
现在我们来建立一些数字逻辑部件,首先是反相器,它接受一个输入,并将其逻辑相反的值推出,就是说输入0会输出1,输入1会输出0:
class Inverter(EventHandler):
def __init__(self,input,*outputs):
EventHandler.__init__(self,*outputs)
self.input = input
input.registerOutput(self)
self.state = 0
def processMessage(self,msg):
sender,value = msg
debugPrint("Inverter received %s from %s" % (value,msg))
if value:
self.state = 0
else:
self.state = 1
反相器的初始化参数为一个输入端,即另外某个 EventHandler ,将它保存下来,并将自身注册为它的一个输出端。而 processMessage() 方法,则将自身的状态设为收到的消息的逻辑相反值。与 Switch 类类似,反相器类的 notify 事件也发送一个由其自身和其状态所组成的元组。
我们可以修改上面的例子,在开关和指示器之间串联入一个反相器。如有兴趣,尽可一试,但这个过程我认为已经没有必要列出了。
接下来是一个与门,这是我们遇到的第一个有多个输入端的类。它有两个输入端,如果都被置为1,则送出消息1,否则送出消息0:
class AndGate(EventHandler):
def __init__(self,inputA,inputB,*outputs):
EventHandler.__init__(self,*outputs)
self.inputA = inputA
self.inputAstate = inputA.state
inputA.registerOutput(self)
self.inputB = inputB
self.inputBstate = inputB.state
inputB.registerOutput(self)
self.state = 0
def processMessage(self,msg):
sender, value = msg
debugPrint("AndGate received %s from %s" % (value,sender))
if sender is self.inputA:
self.inputAstate = value
elif sender is self.inputB:
self.inputBstate = value
else:
raise RuntimeError("Didn't expect message from %s" % sender)
if self.inputAstate and self.inputBstate:
self.state = 1
else:
self.state = 0
debugPrint("AndGate's new state => %s" % self.state)
def notify(self,output):
output((self,self.state))
在与门的 processMessage 方法中,我们需要判定,是哪个输入端送来了消息,并据此设置状态。这就是为什么别的部件送来的消息中需要含有其自身的引用。
最后我们做出或门。它和与门类似,只是,它只要有任一个输入端为1的时候就送出消息1,只有两个输入端都为0的时候才送出0:
class OrGate(EventHandler):
def __init__(self,inputA,inputB,*outputs):
EventHandler.__init__(self,*outputs)
self.inputA = inputA
self.inputAstate = inputA.state
inputA.registerOutput(self)
self.inputB = inputB
self.inputBstate = inputB.state
inputB.registerOutput(self)
self.state = 0
def processMessage(self,msg):
sender, value = msg
debugPrint("OrGate received %s from %s" % (value,sender))
if sender is self.inputA:
self.inputAstate = value
elif sender is self.inputB:
self.inputBstate = value
else:
raise RuntimeError("Didn't expect message from %s" % sender)
if self.inputAstate or self.inputBstate:
self.state = 1
else:
self.state = 0
debugPrint("OrGate's new state => %s" % self.state)
def notify(self,output):
output((self,self.state))
5.6.1 半加器
作为结束,我们将使用我们已经建立的所有部件,来构建一个半加器。半加器实现两个比特的加法。我们将一些部件连接了起来,然后来“拨动”开关。开关的动作改变了其状态,并且把其变化,以数据流的方式,通过系统传播了下去:
if __name__ == "__main__":
# 半加器
inputA = Switch()
inputB = Switch()
result = Reporter("Result = %(value)s")
carry = Reporter("Carry = %(value)s")
andGateA = AndGate(inputA,inputB,carry)
orGate = OrGate(inputA,inputB)
inverter = Inverter(andGateA)
andGateB = AndGate(orGate,inverter,result)
inputA(1)
inputB(1)
inputB(0)
inputA(0)
6 角色
在角色的模型里面,一切都是角色(废话!)。角色就是一个对象(一般意义上的对象,而不必是面向对象中的意义),它可以:
从其他角色接收消息。
对收到的消息中适合于自己的,进行处理。
向其它角色发送消息。
创建新的角色。
一个角色对其它的角色并不具有直接的访问渠道,所有的交流都通过消息传递来完成。这就提供了丰富的模型,来模拟现实世界中的对象——它们是彼此松散耦合的,并对彼此的内部所知有限。
如果我们要建立一个模拟过程的话,就来模拟一下……
6.1 杀手机器人!
注:
本节的完整程序保存为 actors.py ,在本文的末尾,和代码.zip文件中和都有。
6.1.1 角色基类
在这个例子中,我们将配置出一个小小的世界,在其中有一些使用角色模型来移动和战斗的机器人。作为开始,我们来定义所有角色的基类:
class actor:
def __init__(self):
self.channel = stackless.channel()
self.processMessageMethod = self.defaultMessageAction
stackless.tasklet(self.processMessage)()
def processMessage(self):
while 1:
self.processMessageMethod(self.channel.receive())
def defaultMessageAction(self,args):
print args
默认情况下,角色建立一个通道来接收消息,指定一个方法来处理这些消息,并启动一个循环来将接收的消息分派给处理方法。默认的处理过程只是把收到的消息显示出来。这些,已经是我们实现角色模型所需要的全部。
6.1.2 消息的格式
所有发送的消息都遵从一个格式:先是发送者的通道,接着一个字符串为消息的名称,再接下来是可选的参数。例如:
(self.channel, "JOIN", (1,1) )
(self.channel, "COLLISION")
等等……
注意,我们只将发送者的通道随消息送出,而不是整个发送者对象。在角色模型中,角色间的所有交流都必须通过消息传递来体现,如果将 self 都发送出去的话,则使得对方可以很容易地用非正常手段对发送者的内部未知信息进行访问。
事实上你会注意到,当我们将本节的大部分角色实例化的时候,甚至不需要将其赋值给能被别的角色访问到的变量。我们仅仅创建它们,并让它们独自漂浮在那里,对周围环境只有有限的了解。
6.1.3 “世界”类
“世界”角色,扮演着其他所有角色相互作用的中央枢纽。其他角色发送 JOIN 消息给世界角色,后者则跟踪它们。周期性地,世界角色发出 WORLD_STATE 消息,其中包括关于所有可见的角色的信息,来供它们内部处理:
class world(actor):
def __init__(self):
actor.__init__(self)
self.registeredActors = {}
stackless.tasklet(self.sendStateToActors)()
def testForCollision(self,x,y):
if x < 0 or x > 496:
return 1
elif y < 0 or y > 496:
return 1
else:
return 0
def sendStateToActors(self):
while 1:
for actor in self.registeredActors.keys():
actorInfo = self.registeredActors[actor]
if self.registeredActors[actor][1] != (-1,-1):
VectorX,VectorY = (math.sin(math.radians(actorInfo[2])) * actorInfo[3],
math.cos(math.radians(actorInfo[2])) * actorInfo[3])
x,y = actorInfo[1]
x += VectorX
y -= VectorY
if self.testForCollision(x,y):
actor.send((self.channel,"COLLISION"))
else:
self.registeredActors[actor] = tuple([actorInfo[0],
(x,y),
actorInfo[2],
actorInfo[3]])
worldState = [self.channel, "WORLD_STATE"]
for actor in self.registeredActors.keys():
if self.registeredActors[actor][1] != (-1,-1):
worldState.append( (actor, self.registeredActors[actor]))
message = tuple(worldState)
for actor in self.registeredActors.keys():
actor.send(message)
stackless.schedule()
def defaultMessageAction(self,args):
sentFrom, msg, msgArgs = args[0],args[1],args[2:]
if msg == "JOIN":
print 'ADDING ' , msgArgs
self.registeredActors[sentFrom] = msgArgs
elif msg == "UPDATE_VECTOR":
self.registeredActors[sentFrom] = tuple([self.registeredActors[sentFrom][0],
self.registeredActors[sentFrom][1],
msgArgs[0],msgArgs[1]])
else:
print '!!!! WORLD GOT UNKNOWN MESSAGE ' , args
World = world().channel
除了处理消息的微进程外,“世界”角色还建立了另一个独立的微进程,来执行 sendStateToActors() 方法。这个方法里有个循环,用于构建关于世界状态的信息,并发送给所有的角色。这是其它角色唯一可以指望接收到的消息。若有必要,它们可以回应这个消息,即将某种 UPDATE 消息发回给世界。
作为 sendStateToActors() 方法的一部分,“世界”角色需要更新其内部的、对可移动的角色的位置的记录。它使用可移动角色的角度和速度来建立一个矢量,确保更新后的位置不会撞到世界的四面边界,并存下其新的位置。
defaultMessageAction() 方法处理以下已知信息,并忽略其他的:
JOIN
将一个角色添加到世界中的已知角色列表,其参数包括新角色的位置、角度和速度。位置-1, -1表示这个角色对其它角色不可见,比如后面将要详述的显示屏角色。
UPDATE_VECTOR
为发送这个消息的角色设置新的角度和速度。
最后,一个“世界”角色被实例化,其通道被保存进全局变量 World 中,使其它角色可以发送它们最初的 JOIN 消息。
6.1.4 一个简单的机器人
我们将以一个简单的机器人开始,它以恒定的速度移动,每接到一个 WORLD_STATE 消息的时候,都顺时针旋转1度作为响应。当发生与世界边界碰撞的 COLLISION 事件时,它将旋转73度再尝试前进。所有其他消息将被忽略。
class basicRobot(actor):
def __init__(self,location=(0,0),angle=135,velocity=1,world=World):
actor.__init__(self)
self.location = location
self.angle = angle
self.velocity = velocity
self.world = world
joinMsg =(self.channel,"JOIN",self.__class__.__name__,
self.location,self.angle,self.velocity)
self.world.send(joinMsg)
def defaultMessageAction(self,args):
sentFrom, msg, msgArgs = args[0],args[1],args[2:]
if msg == "WORLD_STATE":
self.location = (self.location[0] + 1, self.location[1] + 1)
self.angle += 1
if self.angle >= 360:
self.angle -= 360
updateMsg = (self.channel, "UPDATE_VECTOR",
self.angle,self.velocity)
self.world.send(updateMsg)
elif msg == "COLLISION":
self.angle += 73
if self.angle >= 360:
self.angle -= 360
else:
print "UNKNOWN MESSAGE", args
basicRobot(angle=135,velocity=5)
basicRobot((464,0),angle=225,velocity=10)
stackless.run()
注意,机器人的构造方法发出了一个 JOIN 消息到“世界”对象,来注册自己。除此之外,代码应该还算易懂的。
6.1.5 蹊径:pyGame
至此,在示例程序中,我们都是使用调试输出语句来显示事情进行的过程。我试图以这种方式来保持代码的简单易懂,但有些时候,输出语句却变得不再直观,而是越发迷惑。在“数据流”一节中我们已经用得很勉强了,而在本节中,情况已经变得复杂到无法再尝试用打印输出来表示了。
注:
要运行本节的代码,需要安装 pyGame 的当前版本,可以从这里取得:http://www.pygame.org/
我决定使用 pyGame 来创建一个简单的可视化引擎。尽管对于 pyGame 内容的叙述已经超出了本教程的范围,但其操作本身还是相对简明的。当显示屏角色收到一个 WORLD_STATE 消息,就将相应的角色放置上去,并更新显示。很幸运,我们可以将所有的 pyGame 代码隔离在一个角色之内,因此代码的其它部分可以保持不被“污染”,依然可以被理解,哪怕不了解也不关心 pyGame 怎么进行的显示渲染:
class display(actor):
def __init__(self,world=World):
actor.__init__(self)
self.world = World
self.icons = {}
pygame.init()
window = pygame.display.set_mode((496,496))
pygame.display.set_caption("Actor Demo")
joinMsg = (self.channel,"JOIN",self.__class__.__name__, (-1,-1))
self.world.send(joinMsg)
def defaultMessageAction(self,args):
sentFrom, msg, msgArgs = args[0],args[1],args[2:]
if msg == "WORLD_STATE":
self.updateDisplay(msgArgs)
else:
print "UNKNOWN MESSAGE", args
def getIcon(self, iconName):
if self.icons.has_key(iconName):
return self.icons[iconName]
else:
iconFile = os.path.join("data","%s.bmp" % iconName)
surface = pygame.image.load(iconFile)
surface.set_colorkey((0xf3,0x0a,0x0a))
self.icons[iconName] = surface
return surface
def updateDisplay(self,actors):
for event in pygame.event.get():
if event.type == pygame.QUIT: sys.exit()
screen = pygame.display.get_surface()
background = pygame.Surface(screen.get_size())
background = background.convert()
background.fill((200, 200, 200))
screen.blit(background, (0,0))
for item in actors:
screen.blit(pygame.transfor
作者: Grant Olson 翻译: Lych 校对: gashero
邮箱: olsongt@verizon.net 邮箱: lych77@gmail.com 邮箱: harry.python@gmail.com
时间: 2006年7月7日 时间: 2007年9月23日
原文地址
目录
1 介绍
1.1 为什么要用stackless
1.1.1 现实世界就是并发的
1.1.2 并发可能是(仅仅可能是)下一个重要的编程范式
1.2 安装Stackless
2 Stackless起步
2.1 微进程(tasklet)
2.2 调度器(scheduler)
2.3 通道(channel)
2.4 总结
3 协程(coroutine)
3.1 子例程的问题
3.1.1 堆栈
3.1.2 那为什么要使用堆栈?
3.2 走进协程
3.3 总结
4 轻量级线程
4.1 Hackysack的模拟
4.2 游戏的传统线程版本
4.3 Stackless
4.4 总结
5 数据流
5.1 工厂
5.2 “普通”版本
5.2.1 分析
5.3 走进数据流
5.4 代码的stackless版本
5.4.1 分析
5.4.1.1 休眠功能
5.4.1.2 类
5.5 那我们获得了什么?
5.6 “推”数据
5.6.1 半加器
6 角色
6.1 杀手机器人!
6.1.1 角色基类
6.1.2 消息的格式
6.1.3 “世界”类
6.1.4 一个简单的机器人
6.1.5 蹊径:pyGame
6.1.6 第一轮代码
6.2 又一蹊径: 机理的模拟
6.2.1 角色属性
6.2.2 碰撞检测
6.2.3 恒定的时间
6.2.4 伤害值、生命值和死亡
6.2.5 第二轮代码
6.3 回到角色: 让我们变得疯狂
6.3.1 爆炸
6.3.2 埋雷机器人
6.3.3 建造台
6.3.4 最终的模拟
6.4 总结
7 完整代码列表
7.1 pingpong.py - 递归的ping pong示例
7.2 pingpong_stackless.py - stackless的ping pong示例
7.3 hackysackthreaded.py - 基于操作系统线程的hackysack示例
7.4 hackysackstackless.py - stackless的hackysack示例
7.5 assemblyline.py - “普通”的生产线示例
7.6 assemblyline-stackless.py - stackless的生产线示例
7.7 digitalCircuit.py - stackless数字电路
7.8 actors.py - 第一个角色示例
7.9 actors2.py - 第二个角色示例
7.10 actors3.py - 第三个角色示例
1 介绍
1.1 为什么要用stackless
摘自stackless网站 http://www.stackless.com/:
Stackless Python 是Python编程语言的一个增强版本,它使程序员从基于线程的编程方式中获得好处,
并避免传统线程所带来的性能与复杂度问题。Stackless为Python带来的微线程扩展,是一种低开销、轻量
级的便利工具,如果使用得当,可以获益如下:
+ 改进程序结构
+ 增进代码可读性
+ 提高编程人员生产力
以上是Stackless Python很简明的释义,但其对我们意义何在?——就在于Stackless提供的并发建模工具,比目前其它大多数传统编程语言所提供的,都更加易用:不仅是Python自身,也包括Java、C++,以及其它。尽管还有其他一些语言提供并发特性,可它们要么是主要用于学术研究的(如 Mozart/Oz),要么是罕为使用、或用于特殊目的的专业语言(如Erlang)。而使用stackless,你将会在Python本身的所有优势之上,在一个(但愿)你已经很熟悉的环境中,再获得并发的特性。
这自然引出了个问题:为什么要并发?
1.1.1 现实世界就是并发的
现实世界就是“并发”的,它是由一群事物(或“角色”)所组成,而这些事物以一种对彼此所知有限的、松散耦合的方式相互作用。传说中面向对象编程有一个好处,就是对象能够对现实的世界进行模拟。这在一定程度上是正确的,面向对象编程很好地模拟了对象个体,但对于这些对象个体之间的交互,却无法以一种理想的方式来表现。例如,如下代码实例,有什么问题?
def familyTacoNight():
husband.eat(dinner)
wife.eat(dinner)
son.eat(dinner)
daughter.eat(dinner)
第一印象,没问题。但是,上例中存在一个微妙的安排:所有事件是次序发生的,即:直到丈夫吃完饭,妻子才开始吃;儿子则一直等到母亲吃完才吃;而女儿则是最后一个。在现实世界中,哪怕是丈夫还堵车在路上,妻子、儿子和女儿仍然可以该吃就吃,而要在上例中的话,他们只能饿死了——甚至更糟:永远没有人会知道这件事,因为他们永远不会有机会抛出一个异常来通知这个世界!
1.1.2 并发可能是(仅仅可能是)下一个重要的编程范式
我个人相信,并发将是软件世界里的下一个重要范式。随着程序变得更加复杂和耗费资源,我们已经不能指望摩尔定律来每年给我们提供更快的CPU了,当前,日用个人计算机的性能提升来自于多核与多CPU机。一旦单个CPU的性能达到极限,软件开发者们将不得不转向分布式模型,靠多台计算机的互相作用来建立强大的应用(想想GooglePlex)。为了取得多核机和分布式编程的优势,并发将很快成为做事情的方式的事实标准。
1.2 安装Stackless
安装Stackless的细节可以在其网站上找到。现在Linux用户可以通过Subversion取得源代码并编译;而对于Windows用户,则有一个.zip文件供使用,需要将其解压到现有的Python安装目录中。接下来,本教程假设Stackless Python已经安装好了,可以工作,并且假设你对Python语言本身有基本的了解。
2 Stackless起步
本章简要介绍了Stackless的基本概念,后面章节将基于这些基础,来展示更加实用的功能。
2.1 微进程(tasklet)
微进程是stackless的基本构成单元,你可以通过提供任一个Python可调用对象(通常为函数或类的方法)来建立它,这将建立一个微进程并将其添加到调度器。这是一个快速演示:
Python 2.4.3 Stackless 3.1b3 060504 (#69, May 3 2006, 19:20:41) [MSC v.1310 32
bit (Intel)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> import stackless
>>>
>>> def print_x(x):
... print x
...
>>> stackless.tasklet(print_x)('one')
<stackless.tasklet object at 0x00A45870>
>>> stackless.tasklet(print_x)('two')
<stackless.tasklet object at 0x00A45A30>
>>> stackless.tasklet(print_x)('three')
<stackless.tasklet object at 0x00A45AB0>
>>>
>>> stackless.run()
one
two
three
>>>
注意,微进程将排起队来,并不运行,直到调用 stackless.run() 。
2.2 调度器(scheduler)
调度器控制各个微进程运行的顺序。如果刚刚建立了一组微进程,它们将按照建立的顺序来执行。在实用中,一般会建立一组可以再次被调度的微进程,好让每个都有轮次机会。一个快速演示:
Python 2.4.3 Stackless 3.1b3 060504 (#69, May 3 2006, 19:20:41) [MSC v.1310 32
bit (Intel)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> import stackless
>>>
>>> def print_three_times(x):
... print "1:", x
... stackless.schedule()
... print "2:", x
... stackless.schedule()
... print "3:", x
... stackless.schedule()
...
>>>
>>> stackless.tasklet(print_three_times)('first')
<stackless.tasklet object at 0x00A45870>
>>> stackless.tasklet(print_three_times)('second')
<stackless.tasklet object at 0x00A45A30>
>>> stackless.tasklet(print_three_times)('third')
<stackless.tasklet object at 0x00A45AB0>
>>>
>>> stackless.run()
1: first
1: second
1: third
2: first
2: second
2: third
3: first
3: second
3: third
>>>
注意:当调用 stackless.schedule() 的时候,当前活动微进程将暂停执行,并将自身重新插入到调度器队列的末尾,好让下一个微进程被执行。一旦在它前面的所有其他微进程都运行过了,它将从上次停止的地方继续开始运行。这个过程会持续,直到所有的活动微进程都完成了运行过程。这就是使用stackless达到合作式多任务的方式。
2.3 通道(channel)
通道使得微进程之间的信息传递成为可能。它做到了两件事:
能够在微进程之间交换信息。
能够控制运行的流程。
又一个快速演示:
C:\>c:\python24\python
Python 2.4.3 Stackless 3.1b3 060504 (#69, May 3 2006, 19:20:41) [MSC v.1310 32
bit (Intel)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> import stackless
>>>
>>> channel = stackless.channel()
>>>
>>> def receiving_tasklet():
... print "Recieving tasklet started"
... print channel.receive()
... print "Receiving tasklet finished"
...
>>> def sending_tasklet():
... print "Sending tasklet started"
... channel.send("send from sending_tasklet")
... print "sending tasklet finished"
...
>>> def another_tasklet():
... print "Just another tasklet in the scheduler"
...
>>> stackless.tasklet(receiving_tasklet)()
<stackless.tasklet object at 0x00A45B30>
>>> stackless.tasklet(sending_tasklet)()
<stackless.tasklet object at 0x00A45B70>
>>> stackless.tasklet(another_tasklet)()
<stackless.tasklet object at 0x00A45BF0>
>>>
>>> stackless.run()
Recieving tasklet started
Sending tasklet started
send from sending_tasklet
Receiving tasklet finished
Just another tasklet in the scheduler
sending tasklet finished
>>>
>>>
接收的微进程调用 channel.receive() 的时候,便阻塞住,这意味着该微进程暂停执行,直到有信息从这个通道送过来。除了往这个通道发送信息以外,没有其他任何方式可以让这个微进程恢复运行。
若有其他微进程向这个通道发送了信息,则不管当前的调度到了哪里,这个接收的微进程都立即恢复执行;而发送信息的微进程则被转移到调度列表的末尾,就像调用了 stackless.schedule() 一样。
同样注意,发送信息的时候,若当时没有微进程正在这个通道上接收,也会使当前微进程阻塞:
>>>
>>> stackless.tasklet(sending_tasklet)()
<stackless.tasklet object at 0x00A45B70>
>>> stackless.tasklet(another_tasklet)()
<stackless.tasklet object at 0x00A45BF0>
>>>
>>> stackless.run()
Sending tasklet started
Just another tasklet in the scheduler
>>>
>>> stackless.tasklet(another_tasklet)()
<stackless.tasklet object at 0x00A45B30>
>>> stackless.run()
Just another tasklet in the scheduler
>>>
>>> #最后,加入接收的微进程
...
>>> stackless.tasklet(receiving_tasklet)()
<stackless.tasklet object at 0x00A45BF0>
>>>
>>> stackless.run()
Recieving tasklet started
send from sending_tasklet
Receiving tasklet finished
sending tasklet finished
>>>
发送信息的微进程,只有在成功地将数据发送到了另一个微进程之后,才会重新被插入到调度器中。
2.4 总结
以上涵盖了stackless的大部分功能。似乎不多是吧?——我们只使用了少许对象,和大约四五个函数调用,来进行操作。但是,使用这种简单的API作为基本构建单元,我们可以开始做一些真正有趣的事情。
3 协程(coroutine)
3.1 子例程的问题
大多数传统编程语言具有子例程的概念。一个子例程被另一个例程(可能还是其它某个例程的子例程)所调用,或返回一个结果,或不返回结果。从定义上说,一个子例程是从属于其调用者的。
见下例:
def ping():
print "PING"
pong()
def pong():
print "PONG"
ping()
ping()
有经验的编程者会看到这个程序的问题所在:它导致了堆栈溢出。如果运行这个程序,它将显示一大堆讨厌的跟踪信息,来指出堆栈空间已经耗尽。
3.1.1 堆栈
我仔细考虑了,自己对C语言堆栈的细节究竟了解多少,最终还是决定完全不去讲它。似乎,其他人对其所尝试的描述,以及图表,只有本身已经理解了的人才能看得懂。我将试着给出一个最简单的说明,而对其有更多兴趣的读者可以从网上查找更多信息。
每当一个子例程被调用,都有一个“栈帧”被建立,这是用来保存变量,以及其他子例程局部信息的区域。于是,当你调用 ping() ,则有一个栈帧被建立,来保存这次调用相关的信息。简言之,这个帧记载着 ping 被调用了。当再调用 pong() ,则又建立了一个栈帧,记载着 pong 也被调用了。这些栈帧是串联在一起的,每个子例程调用都是其中的一环。就这样,堆栈中显示: ping 被调用所以 pong 接下来被调用。显然,当 pong() 再调用 ping() ,则使堆栈再扩展。下面是个直观的表示:
帧 堆栈
1 ping 被调用
2 ping 被调用,所以 pong 被调用
3 ping 被调用,所以 pong 被调用,所以 ping 被调用
4 ping 被调用,所以 pong 被调用,所以 ping 被调用,所以 pong 被调用
5 ping 被调用,所以 pong 被调用,所以 ping 被调用,所以 pong 被调用,所以 ping 被调用
6 ping 被调用,所以 pong 被调用,所以 ping 被调用,所以 pong 被调用,所以 ping 被调用……
现在假设,这个页面的宽度就表示系统为堆栈所分配的全部内存空间,当其顶到页面的边缘的时候,将会发生溢出,系统内存耗尽,即术语“堆栈溢出”。
3.1.2 那为什么要使用堆栈?
上例是有意设计的,用来体现堆栈的问题所在。在大多数情况下,当每个子例程返回的时候,其栈帧将被清除掉,就是说堆栈将会自行实现清理过程。这一般来说是件好事,在C语言中,堆栈就是一个不需要编程者来手动进行内存管理的区域。很幸运,Python程序员也不需要直接来担心内存管理与堆栈。但是由于 Python解释器本身也是用C实现的,那些实现者们可是需要担心这个的。使用堆栈是会使事情方便,除非我们开始调用那种从不返回的函数,如上例中的,那时候,堆栈的表现就开始和程序员别扭起来,并耗尽可用的内存。
3.2 走进协程
此时,将堆栈弄溢出是有点愚蠢的。 ping() 和 pong() 本不是真正意义的子例程,因为其中哪个也不从属于另一个,它们是“协程”,处于同等的地位,并可以彼此间进行无缝通信。
帧 堆栈
1 ping 被调用
2 pong 被调用
3 ping 被调用
4 pong 被调用
5 ping 被调用
6 pong 被调用
在stackless中,我们使用通道来建立协程。还记得吗,通道所带来的两个好处中的一个,就是能够控制微进程之间运行的流程。使用通道,我们可以在 ping 和 pong 这两个协程之间自由来回,要多少次就多少次,都不会堆栈溢出:
#
# pingpong_stackless.py
#
import stackless
ping_channel = stackless.channel()
pong_channel = stackless.channel()
def ping():
while ping_channel.receive(): #在此阻塞
print "PING"
pong_channel.send("from ping")
def pong():
while pong_channel.receive():
print "PONG"
ping_channel.send("from pong")
stackless.tasklet(ping)()
stackless.tasklet(pong)()
# 我们需要发送一个消息来初始化这个游戏的状态
# 否则,两个微进程都会阻塞
stackless.tasklet(ping_channel.send)('startup')
stackless.run()
你可以运行这个程序要多久有多久,它都不会崩溃,且如果你检查其内存使用量(使用Windows的任务管理器或Linux的top命令),将会发现使用量是恒定的。这个程序的协程版本,不管运行一分钟还是一天,使用的内存都是一样的。而如果你检查原先那个递归版本的内存用量,则会发现其迅速增长,直到崩溃。
3.3 总结
是否还记得,先前我提到过,那个代码的递归版本,有经验的程序员会一眼看出毛病。但老实说,这里面并没有什么“计算机科学”方面的原因在阻碍它的正常工作,有些让人坚信的东西,其实只是个与实现细节有关的小问题——只因为大多数传统编程语言都使用堆栈。某种意义上说,有经验的程序员都是被洗了脑,从而相信这是个可以接受的问题。而stackless,则真正察觉了这个问题,并除掉了它。
4 轻量级线程
与当今的操作系统中内建的、和标准Python代码中所支持的普通线程相比,“微线程”要更为轻量级,正如其名称所暗示。它比传统线程占用更少的内存,并且微线程之间的切换,要比传统线程之间的切换更加节省资源。
为了准确说明微线程的效率究竟比传统线程高多少,我们用两者来写同一个程序。
4.1 Hackysack的模拟
Hackysack是一种游戏,就是一伙脏乎乎的小子围成一个圈,来回踢一个装满了豆粒的沙包,目标是不让这个沙包落地,当传球给别人的时候,可以耍各种把戏。踢沙包只可以用脚。
在我们的简易模拟中,我们假设一旦游戏开始,圈里人数就是恒定的,并且每个人都是如此厉害,以至于如果允许的话,这个游戏可以永远停不下来。
4.2 游戏的传统线程版本
import thread
import random
import sys
import Queue
class hackysacker:
counter = 0
def __init__(self,name,circle):
self.name = name
self.circle = circle
circle.append(self)
self.messageQueue = Queue.Queue()
thread.start_new_thread(self.messageLoop,())
def incrementCounter(self):
hackysacker.counter += 1
if hackysacker.counter >= turns:
while self.circle:
hs = self.circle.pop()
if hs is not self:
hs.messageQueue.put('exit')
sys.exit()
def messageLoop(self):
while 1:
message = self.messageQueue.get()
if message == "exit":
debugPrint("%s is going home" % self.name)
sys.exit()
debugPrint("%s got hackeysack from %s" % (self.name, message.name))
kickTo = self.circle[random.randint(0,len(self.circle)-1)]
debugPrint("%s kicking hackeysack to %s" % (self.name, kickTo.name))
self.incrementCounter()
kickTo.messageQueue.put(self)
def debugPrint(x):
if debug:
print x
debug=1
hackysackers=5
turns = 5
def runit(hs=10,ts=10,dbg=1):
global hackysackers,turns,debug
hackysackers = hs
turns = ts
debug = dbg
hackysacker.counter= 0
circle = []
one = hackysacker('1',circle)
for i in range(hackysackers):
hackysacker(`i`,circle)
one.messageQueue.put(one)
try:
while circle:
pass
except:
#有时我们在清理过程中会遇到诡异的错误。
pass
if __name__ == "__main__":
runit(dbg=1)
一个“玩者”类的初始化用到了其名字,和一个指向包含了所有玩者的全局列表 circle 的引用,还有一个继承自Python标准库中的Queue类的消息队列。
Queue 这个类的作用,与stackless的通道类似。它包含 put() 和 get() 方法,在一个空的Queue上调用 put() 会阻塞,直到另一个线程调用 put() 将数据送入Queue中为止。Queue这个类被设计为能与操作系统级的线程高效合作。
__init__ 方法接下来使用Python标准库中的thread模块新建一个线程,并在新线程中开始了一个消息循环。此消息循环是个无限循环,不停地处理队列中的消息。如果其收到一个特殊的消息 'exit' ,则结束这个线程。
如果收到了另一个消息——指定其收到了沙包,玩者则从圈中随机选取一个其他玩者,通过向其发送一条消息来指定,将沙包再踢给它。
由类成员变量 hackysacker.counter 进行计数,当沙包被踢够了指定的次数时,将会向圈中的所有玩者都发送一条特殊的 'exit' 消息。
注意,当全局变量debug为非零的时候,还有个函数debugPrint可以输出信息。我们可以使这游戏输出到标准输出,但当计时的时候,这会影响精确度。
我们来运行这个程序,并检查其是否正常工作:
C:\Documents and Settings\grant\Desktop\why_stackless\code>c:\python24\python.exe
hackysackthreaded.py
1 got hackeysack from 1
1 kicking hackeysack to 4
4 got hackeysack from 1
4 kicking hackeysack to 0
0 got hackeysack from 4
0 kicking hackeysack to 1
1 got hackeysack from 0
1 kicking hackeysack to 3
3 got hackeysack from 1
3 kicking hackeysack to 3
4 is going home
2 is going home
1 is going home
0 is going home
1 is going home
C:\Documents and Settings\grant\Desktop\why_stackless\code>
如我们所见,所有玩者到了一起,并很快地进行了一场游戏。现在,我们对若干次实验运行过程进行计时。Python标准库中有一个 timeit.py 程序,可以用作此目的。那么,我们也同时关掉调试输出:
C:\Documents and Settings\grant\Desktop\why_stackless\code>c:\python24\python.ex
e c:\Python24\lib\timeit.py -s "import hackysackthreaded" hackysackthreaded.ru
nit(10,1000,0)
10 loops, best of 3: 183 msec per loop
在我的机器上,十个玩者共进行1000次传球,共使用了183毫秒。我们来增加玩者的数量:
C:\Documents and Settings\grant\Desktop\why_stackless\code>c:\python24\python.ex
e c:\Python24\lib\timeit.py -s "import hackeysackthreaded" hackeysackthreaded.ru
nit(100,1000,0)
10 loops, best of 3: 231 msec per loop
C:\Documents and Settings\grant\Desktop\why_stackless\code>c:\python24\python.ex
e c:\Python24\lib\timeit.py -s "import hackysackthreaded" hackysackthreaded.ru
nit(1000,1000,0)
10 loops, best of 3: 681 msec per loop
C:\Documents and Settings\grant\Desktop\why_stackless\code>c:\python24\python.ex
e c:\Python24\lib\timeit.py -s "import hackysackthreaded" hackysackthreaded.ru
nit(10000,1000,0)
Traceback (most recent call last):
File "c:\Python24\lib\timeit.py", line 255, in main
x = t.timeit(number)
File "c:\Python24\lib\timeit.py", line 161, in timeit
timing = self.inner(it, self.timer)
File "<timeit-src>", line 6, in inner
File ".\hackeysackthreaded.py", line 58, in runit
hackysacker(`i`,circle)
File ".\hackeysackthreaded.py", line 14, in __init__
thread.start_new_thread(self.messageLoop,())
error: can't start new thread
在我的3GHz、1G内存的机器上,当尝试10,000个线程的时候出现了错误。就不想拿出这详细的输出内容来扰人了,只是通过若干实验与出错过程得出,在我机器上,此程序从1100个线程左右开始出错。另请注意,1000个线程时候所耗用的时间,是10个线程时候的大约三倍。
4.3 Stackless
import stackless
import random
import sys
class hackysacker:
counter = 0
def __init__(self,name,circle):
self.name = name
self.circle = circle
circle.append(self)
self.channel = stackless.channel()
stackless.tasklet(self.messageLoop)()
def incrementCounter(self):
hackysacker.counter += 1
if hackysacker.counter >= turns:
while self.circle:
self.circle.pop().channel.send('exit')
def messageLoop(self):
while 1:
message = self.channel.receive()
if message == 'exit':
return
debugPrint("%s got hackeysack from %s" % (self.name, message.name))
kickTo = self.circle[random.randint(0,len(self.circle)-1)]
while kickTo is self:
kickTo = self.circle[random.randint(0,len(self.circle)-1)]
debugPrint("%s kicking hackeysack to %s" % (self.name, kickTo.name))
self.incrementCounter()
kickTo.channel.send(self)
def debugPrint(x):
if debug:print x
debug = 5
hackysackers = 5
turns = 1
def runit(hs=5,ts=5,dbg=1):
global hackysackers,turns,debug
hackysackers = hs
turns = ts
debug = dbg
hackysacker.counter = 0
circle = []
one = hackysacker('1',circle)
for i in range(hackysackers):
hackysacker(`i`,circle)
one.channel.send(one)
try:
stackless.run()
except TaskletExit:
pass
if __name__ == "__main__":
runit()
以上代码实质上与线程版本是等价的,主要区别仅在于我们使用微进程来代替线程,并且使用通道代替Queue来进行切换。让我们运行它,并检查输出:
C:\Documents and Settings\grant\Desktop\why_stackless\code>c:\Python24\python.ex
e hackysackstackless.py
1 got hackeysack from 1
1 kicking hackeysack to 1
1 got hackeysack from 1
1 kicking hackeysack to 4
4 got hackeysack from 1
4 kicking hackeysack to 1
1 got hackeysack from 4
1 kicking hackeysack to 4
4 got hackeysack from 1
4 kicking hackeysack to 0
工作情况确如预期。现在来计时:
C:\Documents and Settings\grant\Desktop\why_stackless\code>c:\Python24\python.ex
e c:\Python24\lib\timeit.py -s"import hackysackstackless" hackysackstackless.r
unit(10,1000,0)
100 loops, best of 3: 19.7 msec per loop
其仅用了19.7毫秒,速度几乎是线程版本的10倍。现在我们同样开始增加微线程的数量:
C:\Documents and Settings\grant\Desktop\why_stackless\code>c:\Python24\python.ex
e c:\Python24\lib\timeit.py -s"import hackysackstackless" hackysackstackless.r
unit(100,1000,0)
100 loops, best of 3: 19.7 msec per loop
C:\Documents and Settings\grant\Desktop\why_stackless\code>c:\Python24\python.ex
e c:\Python24\lib\timeit.py -s"import hackysackstackless" hackysackstackless.r
unit(1000,1000,0)
10 loops, best of 3: 26.9 msec per loop
C:\Documents and Settings\grant\Desktop\why_stackless\code>c:\Python24\python.ex
e c:\Python24\lib\timeit.py -s"import hackysackstackless" hackysackstackless.r
unit(10000,1000,0)
10 loops, best of 3: 109 msec per loop
C:\Documents and Settings\grant\Desktop\why_stackless\code>c:\Python24\python.ex
e c:\Python24\lib\timeit.py -s"import hackysackstackless" hackysackstackless.r
unit(100000,1000,0)
10 loops, best of 3: 1.07 sec per loop
甚至直到10,000个线程的时候,那时线程版本早已不能运行了,而这个仍然可以比线程版本在10个线程的时候运行的还快。
这里我在尽量保持代码的简洁,因此你可以相信我的话:计时时间的增长仅仅在于初始化游戏圈子的部分,而真正进行游戏的时间则是一直不变的,不管使用10个微线程,还是10,000个。这归因于通道的工作方式:当它们收到消息的时候,是立即进行阻塞和恢复操作的。另一方面,各个操作系统线程则是轮番检查自己的队列里是否有了东西,这意味着,跑着越多的线程,性能就变得越差。
4.4 总结
但愿我已经成功地演示了,微线程的运行至少比操作系统线程快一个数量级,并具备远高于后者的可伸缩性。关于操作系统线程的一般常识是:(1)尽量不要使用它,(2)如果非用不可,就能少用一点就少用一点。而stackless的微线程则使我们从这些限制中解放出来。
5 数据流
5.1 工厂
假设,我们要写程序来模拟一个生产玩具娃娃的工厂,具有如下的需求:
一个仓库,装有用来塑造的塑料球。
一个仓库,装有用来连接部件的铆钉。
一台注塑机,可以在6秒内,用0.2磅塑料球来制造一双手臂。
一台注塑机,可以在5秒内,用0.2磅塑料球来制造一双腿。
一台注塑机,可以在4秒内,用0.1磅塑料球来制造一个头部。
一台注塑机,可以在10秒内,用0.5磅塑料球来制造一个躯干。
一个装配台,可以在2秒内,将一个现成的躯干和一双现成的腿,用一个铆钉装配在一起。
一个装配台,可以在2秒内,将上面的半成品和一双现成的手臂,用一个铆钉装配在一起。
一个装配台,可以在3秒内,将上面的半成品和一个现成的头部,用一个铆钉装配在一起。
每台设备都一直不停地工作下去。
5.2 “普通”版本
如果不用stackless而用“普通”的方法来写这个,将会是很痛苦的事情。当我们经历了这个“普通”版示例之后,会再用stackless来做一个,并比较两者的代码。如果你认为这个例子太不自然,并且有时间的话,可以稍为休息后,根据上面的需求,自己来做一个工厂的实现,再来将你写出的代码和 stackless版本做个比较。
代码如下:
class storeroom:
def __init__(self,name,product,unit,count):
self.product = product
self.unit = unit
self.count = count
self.name = name
def get(self,count):
if count > self.count:
raise RuntimeError("Not enough %s" % self.product)
else:
self.count -= count
return count
def put(self,count):
self.count += count
def run(self):
pass
rivetStoreroom = storeroom("rivetStoreroom","rivets","#",1000)
plasticStoreroom = storeroom("plastic Storeroom","plastic pellets","lb",100)
class injectionMolder:
def __init__(self,name,partName,plasticSource,plasticPerPart,timeToMold):
self.partName = partName
self.plasticSource = plasticSource
self.plasticPerPart = plasticPerPart
self.timeToMold = timeToMold
self.items = 0
self.plastic = 0
self.time = -1
self.name = name
def get(self,items):
if items > self.items:
return 0
else:
self.items -= items
return items
def run(self):
if self.time == 0:
self.items += 1
print "%s finished making part" % self.name
self.time -= 1
elif self.time < 0:
print "%s starts making new part %s" % (self.name,self.partName)
if self.plastic < self.plasticPerPart:
print "%s getting more plastic"
self.plastic += self.plasticSource.get(self.plasticPerPart * 10)
self.time = self.timeToMold
else:
print "%s molding for %s more seconds" % (self.partName, self.time)
self.time -= 1
armMolder = injectionMolder("arm Molder", "arms",plasticStoreroom,0.2,6)
legMolder = injectionMolder("leg Molder", "leg",plasticStoreroom,0.2,5)
headMolder = injectionMolder("head Molder","head",plasticStoreroom,0.1,4)
torsoMolder = injectionMolder("torso Molder","torso",plasticStoreroom,0.5,10)
class assembler:
def __init__(self,name,partAsource,partBsource,rivetSource,timeToAssemble):
self.partAsource = partAsource
self.partBsource = partBsource
self.rivetSource = rivetSource
self.timeToAssemble = timeToAssemble
self.itemA = 0
self.itemB = 0
self.items = 0
self.rivets = 0
self.time = -1
self.name = name
def get(self,items):
if items > self.items:
return 0
else:
self.items -= items
return items
def run(self):
if self.time == 0:
self.items += 1
print "%s finished assembling part" % self.name
self.time -= 1
elif self.time < 0:
print "%s starts assembling new part" % self.name
if self.itemA < 1:
print "%s Getting item A" % self.name
self.itemA += self.partAsource.get(1)
if self.itemA < 1:
print "%s waiting for item A" % self.name
elif self.itemB < 1:
print "%s Getting item B" % self.name
self.itemB += self.partBsource.get(1)
if self.itemB < 1:
print "%s waiting for item B" % self.name
print "%s starting to assemble" % self.name
self.time = self.timeToAssemble
else:
print "%s assembling for %s more seconds" % (self.name, self.time)
self.time -= 1
legAssembler = assembler("leg Assembler",torsoMolder,legMolder,rivetStoreroom,2)
armAssembler = assembler("arm Assembler", armMolder,legAssembler,rivetStoreroom,2)
torsoAssembler = assembler("torso Assembler", headMolder,armAssembler,
rivetStoreroom,3)
components = [rivetStoreroom, plasticStoreroom, armMolder,
legMolder, headMolder, torsoMolder,
legAssembler, armAssembler, torsoAssembler]
def run():
while 1:
for component in components:
component.run()
raw_input("Press <ENTER> to continue...")
print "\n\n\n"
if __name__ == "__main__":
run()
5.2.1 分析
我们从一个代表仓库的类开始,它的初始化需要一个其所储存的产品的名称、一个衡量单位(如磅,或部件数目)和一个初始存量作为参数。还有一个 run 方法什么也不做,其用途将会在稍后了解。基于这个类,我们建立了两个仓库示例。
接下来是一个注塑机类,它的初始化需要其产品的名称、一个作为塑料来源的仓库、制造一个部件所需要的原料量,和制造一个部件所需的时间作为参数。有一个 get() 方法,在其内部已有完成的产品时,可将其取出,并调整内部记录。对于这个类, run() 方法是确实做了些事情的:
在计时器大于0期间,塑造过程持续进行,并递减计时器。
当塑造剩余时间达到0,则一个产品被建立,并把计时器设为-1。
当计时器为-1时,注塑机检测是否还有足够的塑料来塑造下一个产品,如果有,则取来原料,并开始塑造。
用这个类,我们建立了四个注塑机实例。
再接下来是一个装配台类,它的初始化需要其产品的名字、部件1的来源、部件2的来源、一个铆钉的仓库,以及装配这些部件所需的时间作为参数。也有一个 get() 方法,在其内部已有完成的产品时,可将其取出,并调整内部记录。而这个类的 run() 方法是这样的:
若计时器大于0,则已经具备原材料的装配台继续其装配过程。
如果计时器等于0,则一个产品被完成,内部记录随之被调整。
如果计时器小于0,则装配台试图取得新的各个部件,并再次开始装配。若其中某个部件还没有来得及塑造出来,则必须等待。
为了装配腿、手臂和头部,各有一个装配台实例被建立。
注:
你会注意到,仓库、注塑机和装配台类有很多相似之处。如果我是在写一个真正生产系统,则很可能先建立一个基类,并使用继承。但在这里,我觉得做出这种类层次关系的话只会使代码变得繁杂,所以有意保持了其简单。
由以上三个类所建立的所有实例,都被装进一个称为 components 的“设备”数组中。然后,我们建立一个事件循环,重复地调用每个设备的 run() 方法。
5.3 走进数据流
如果你熟悉 Unix 系统,那么不管你知不知道数据流技术,恐怕你都已经在使用它了。看下面的 shell 命令:
cat README | more
为了公平,也举出 Windows 中对应的:
type readme.txt | more
尽管,在 Windows 的世界中,数据流技术并不像在 Unix 世界中那么普遍深入。
顺便对还不熟悉 more 工具的读者:这个程序从一个外部来源接收输入,显示一页的内容后暂停,直到用户按下任意键,再显示下一页。这个“|”操作符获取一个程序的输出,并用管道将其传送到另一个命令的输入。这样,不管 cat 还是 type ,都是将文档内容传送到标准输出,而 more 则接收这些输出。
这样,more 程序仅仅是坐在那里,等着来自另一个程序的数据来流向自己。只要流进的数据足够一定量,就在屏幕上显示一页并暂停;而用户击键时,more 则让后面的数据再流入,并开始再一次等待数据量足够,再显示,再暂停。这便是术语“数据流”。
使用通道,再使用stackless本身的轮转调度器,我们就可以使用数据流技术来写这个工厂的模拟。
5.4 代码的stackless版本
import stackless
#
# “休眠” 辅助函数
#
sleepingTasklets = []
sleepingTicks = 0
def Sleep(secondsToWait):
channel = stackless.channel()
endTime = sleepingTicks + secondsToWait
sleepingTasklets.append((endTime, channel))
sleepingTasklets.sort()
# 阻塞,直到收到一个唤醒通知。
channel.receive()
def ManageSleepingTasklets():
global sleepingTicks
while 1:
if len(sleepingTasklets):
endTime = sleepingTasklets[0][0]
while endTime <= sleepingTicks:
channel = sleepingTasklets[0][1]
del sleepingTasklets[0]
# 我们需要发送一些东西,但发什么无所谓,
# 因为其内容是没用的。
channel.send(None)
endTime = sleepingTasklets[0][0] # 检查下一个
sleepingTicks += 1
print "1 second passed"
stackless.schedule()
stackless.tasklet(ManageSleepingTasklets)()
#
# 工厂的实现
#
class storeroom:
def __init__(self,name,product,unit,count):
self.product = product
self.unit = unit
self.count = count
self.name = name
def get(self,count):
while count > self.count: #重新调度,直到有了足够的原料
print "%s doesn't have enough %s to deliver yet" % (self.name,
self.product)
stackless.schedule()
self.count -= count
return count
return count
def put(self,count):
self.count += count
def run(self):
pass
rivetStoreroom = storeroom("rivetStoreroom","rivets","#",1000)
plasticStoreroom = storeroom("plastic Storeroom","plastic pellets","lb",100)
class injectionMolder:
def __init__(self,name,partName,plasticSource,plasticPerPart,timeToMold):
self.partName = partName
self.plasticSource = plasticSource
self.plasticPerPart = plasticPerPart
self.timeToMold = timeToMold
self.plastic = 0
self.items = 0
self.name = name
stackless.tasklet(self.run)()
def get(self,items):
while items > self.items: #重新调度,直到有了足够的产品
print "%s doesn't have enough %s to deliver yet" % (self.name,
self.partName)
stackless.schedule()
self.items -= items
return items
def run(self):
while 1:
print "%s starts making new part %s" % (self.name,self.partName)
if self.plastic < self.plasticPerPart:
print "%s getting more plastic"
self.plastic += self.plasticSource.get(self.plasticPerPart * 10)
self.plastic -= self.plasticPerPart
Sleep(self.timeToMold)
print "%s done molding after %s seconds" % (self.partName,
self.timeToMold)
self.items += 1
print "%s finished making part" % self.name
stackless.schedule()
armMolder = injectionMolder("arm Molder", "arms",plasticStoreroom,0.2,5)
legMolder = injectionMolder("leg Molder", "leg",plasticStoreroom,0.2,5)
headMolder = injectionMolder("head Molder","head",plasticStoreroom,0.1,5)
torsoMolder = injectionMolder("torso Molder","torso",plasticStoreroom,0.5,10)
class assembler:
def __init__(self,name,partAsource,partBsource,rivetSource,timeToAssemble):
self.partAsource = partAsource
self.partBsource = partBsource
self.rivetSource = rivetSource
self.timeToAssemble = timeToAssemble
self.itemA = 0
self.itemB = 0
self.items = 0
self.rivets = 0
self.name = name
stackless.tasklet(self.run)()
def get(self,items):
while items > self.items: #重新调度,直到有了足够的产品
print "Don't have a %s to deliver yet" % (self.name)
stackless.schedule()
self.items -= items
return items
def run(self):
while 1:
print "%s starts assembling new part" % self.name
self.itemA += self.partAsource.get(1)
self.itemB += self.partBsource.get(1)
print "%s starting to assemble" % self.name
Sleep(self.timeToAssemble)
print "%s done assembling after %s" % (self.name, self.timeToAssemble)
self.items += 1
print "%s finished assembling part" % self.name
stackless.schedule()
legAssembler = assembler("leg Assembler",torsoMolder,legMolder,rivetStoreroom,2)
armAssembler = assembler("arm Assembler", armMolder,legAssembler,rivetStoreroom,2)
torsoAssembler = assembler("torso Assembler", headMolder,armAssembler,
rivetStoreroom,3)
def pause():
while 1:
raw_input("Press <ENTER> to continue...")
print "\n\n\n"
stackless.schedule()
stackless.tasklet(pause)()
def run():
stackless.run()
if __name__ == "__main__":
run()
5.4.1 分析
5.4.1.1 休眠功能
首先我们建立了一些辅助函数,好让我们的类可以进行“休眠”。一个微进程调用 Sleep() ,则先建立一个通道,再计算出将被唤醒的时间,并将这个时间信息添加到全局数组 sleepingTasklets 中。之后,将调用 channel.receive() ,这将使该微进程暂停运行,直到被再次唤醒。
接着我们建立另一个函数,来管理所有休眠的微进程。它检查全局数组 sleepingTasklets ,找出所有需要立即被唤醒的成员,并通过其通道来将其唤醒。这个函数也被添加到了微进程调度器中。
5.4.1.2 类
这些类与“普通”版本中的类相似,但也有一些显著不同:首先,在实例化的时候,他们的 run() 方法创建了微进程,这样我们不再需要手工建立一个设备数组,和一个外部的 run() 函数来处理事件循环,stackless本身就隐式地做了这些工作。其次的不同是,微进程可以通过休眠来等待一个产品被产出,而不用通过计数器来计时。第三个不同,则是对 get() 的调用变得更自然了,如果某种原材料没有准备好,则这个微进程简单地重新进入调度循环,直到有了原材料。
5.5 那我们获得了什么?
OK,两个版本的程序都能运行,并得到同样的结果,那这里究竟有什么大不了的事情?——让我们查看一下普通版本的工厂的 run 方法:
def run(self):
if self.time == 0:
self.items += 1
print "%s finished assembling part" % self.name
self.time -= 1
elif self.time < 0:
print "%s starts assembling new part" % self.name
if self.itemA < 1:
print "%s Getting item A" % self.name
self.itemA += self.partAsource.get(1)
if self.itemA < 1:
print "%s waiting for item A" % self.name
elif self.itemB < 1:
print "%s Getting item B" % self.name
self.itemB += self.partBsource.get(1)
if self.itemB < 1:
print "%s waiting for item B" % self.name
print "%s starting to assemble" % self.name
self.time = self.timeToAssemble
else:
print "%s assembling for %s more seconds" % (self.name, self.time)
self.time -= 1
再看 stackless 的版本:
def run(self):
while 1:
print "%s starts assembling new part" % self.name
self.itemA += self.partAsource.get(1)
self.itemB += self.partBsource.get(1)
print "%s starting to assemble" % self.name
Sleep(self.timeToAssemble)
print "%s done assembling after %s" % (self.name, self.timeToAssemble)
self.items += 1
print "%s finished assembling part" % self.name
stackless.schedule()
Stackless 的版本比普通的版本更加简单、清晰和直观,它不需要将事件循环的基础结构包装进 run 方法中,这个结构已经和 run() 方法解除了耦合。run() 方法仅仅描述了自己要做什么,而不需要关心具体究竟怎么做的。这就使软件开发者能集中精力于工厂的运作,而不是事件循环以及程序本身的运作。
5.6 “推”数据
注:
本节的完整程序保存为 digitalCircuit.py ,在本文的末尾,和代码.zip文件中和都有。
在工厂的例子中,我们是在“拉”数据:每个部分都去请求其所需要的部件,并一直等待那些部件到来。我们也可以来“推”数据,这样,系统中的每个部分都将自身的变化向下传播到另一个部分。“拉”的方式,称为“懒惰数据流”,而“推”的方式则称为“急切数据流”。
为了演示“推”的方式,我们来建立一个数字电路的模拟器。这个模拟器由各种元件组成,元件具有0或1的状态,并可以各种方式互相连接起来。这里我们使用面向对象的方法,并定义一个 EventHandler 基类来实现其大部分功能:
class EventHandler:
def __init__(self,*outputs):
if outputs==None:
self.outputs=[]
else:
self.outputs=list(outputs)
self.channel = stackless.channel()
stackless.tasklet(self.listen)()
def listen(self):
while 1:
val = self.channel.receive()
self.processMessage(val)
for output in self.outputs:
self.notify(output)
def processMessage(self,val):
pass
def notify(self,output):
pass
def registerOutput(self,output):
self.outputs.append(output)
def __call__(self,val):
self.channel.send(val)
EventHandler 类的核心功能,是做以下三件事:
通过 listen 方法,持续地监听一个通道上传来的消息。
之后,通过 processMessage 方法,处理所有收到的消息。
最后,通过 notify 方法,将处理结果通知到所有注册的输出端。
还有两个附加的辅助方法:
registerOutput 用来在实例建立之后,再注册额外的输出端。
__call__ 被重载,作为一种便利,使我们可以以这种格式来发送消息:
event(1)
从而无需这样:
event.channel.send(1)
使用 EventHandler 类作为基本构建单元,我们可以开始实现这个数字电路模拟器,由一个开关开始。下面描述的是一个可由用户控制的开关,可以向其发送0或1的值:
class Switch(EventHandler):
def __init__(self,initialState=0,*outputs):
EventHandler.__init__(self,*outputs)
self.state = initialState
def processMessage(self,val):
debugPrint("Setting input to %s" % val)
self.state = val
def notify(self,output):
output((self,self.state))
初始化之后,这个开关就保存着其初始的状态,而 processMessage 则被重载,用来将收到的消息保存起来,成为新的当前状态。其 notify 方法则被重载为发送一个元组,其中含有指向实例自身的引用,还有其状态。我们不久后会看到,我们需要顺便发送这个自身的引用,这样,那些具有多个输入端的元件则可以判别,消息来自于哪个来源。
注:
若你正在随着我们的进度来调试代码,则别忘了我们还在使用 debugPrint() 函数来提供诊断信息,它最初是在“轻量级线程”这一节中定义的。
接下来我们要建立的,是“指示器”类,这个类的实例的作用仅仅是将其当前状态输出。我想我们可以认为,其相当于真正的数字电路中的发光二极管:
class Reporter(EventHandler):
def __init__(self,msg="%(sender)s send message %(value)s"):
EventHandler.__init__(self)
self.msg = msg
def processMessage(self,msg):
sender,value=msg
print self.msg % {'sender':sender,'value':value}
其初始化接受一个可选的格式字符串,来指定之后输出的样式。代码的其他部分意义自明。
现在我们有了一个足够好的框架,来测试这些最初的功能:
C:\Documents and Settings\grant\Desktop\why_stackless\code>c:\Python24\python.ex
e
Python 2.4.3 Stackless 3.1b3 060516 (#69, May 3 2006, 11:46:11) [MSC v.1310 32
bit (Intel)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> import stackless
>>> from digitalCircuit import *
>>>
>>> reporter = Reporter()
>>> switch = Switch(0,reporter) #创建一个开关,并连接到一个指示器做输出。
>>>
>>> switch(1)
<digitalCircuit.Switch instance at 0x00A46828> send message 1
>>>
>>> switch(0)
<digitalCircuit.Switch instance at 0x00A46828> send message 0
>>>
>>> switch(1)
<digitalCircuit.Switch instance at 0x00A46828> send message 1
>>>
与先前设计的工厂不同,对开关的操作会使结果立即被推至其输出端,并显示出来。
现在我们来建立一些数字逻辑部件,首先是反相器,它接受一个输入,并将其逻辑相反的值推出,就是说输入0会输出1,输入1会输出0:
class Inverter(EventHandler):
def __init__(self,input,*outputs):
EventHandler.__init__(self,*outputs)
self.input = input
input.registerOutput(self)
self.state = 0
def processMessage(self,msg):
sender,value = msg
debugPrint("Inverter received %s from %s" % (value,msg))
if value:
self.state = 0
else:
self.state = 1
反相器的初始化参数为一个输入端,即另外某个 EventHandler ,将它保存下来,并将自身注册为它的一个输出端。而 processMessage() 方法,则将自身的状态设为收到的消息的逻辑相反值。与 Switch 类类似,反相器类的 notify 事件也发送一个由其自身和其状态所组成的元组。
我们可以修改上面的例子,在开关和指示器之间串联入一个反相器。如有兴趣,尽可一试,但这个过程我认为已经没有必要列出了。
接下来是一个与门,这是我们遇到的第一个有多个输入端的类。它有两个输入端,如果都被置为1,则送出消息1,否则送出消息0:
class AndGate(EventHandler):
def __init__(self,inputA,inputB,*outputs):
EventHandler.__init__(self,*outputs)
self.inputA = inputA
self.inputAstate = inputA.state
inputA.registerOutput(self)
self.inputB = inputB
self.inputBstate = inputB.state
inputB.registerOutput(self)
self.state = 0
def processMessage(self,msg):
sender, value = msg
debugPrint("AndGate received %s from %s" % (value,sender))
if sender is self.inputA:
self.inputAstate = value
elif sender is self.inputB:
self.inputBstate = value
else:
raise RuntimeError("Didn't expect message from %s" % sender)
if self.inputAstate and self.inputBstate:
self.state = 1
else:
self.state = 0
debugPrint("AndGate's new state => %s" % self.state)
def notify(self,output):
output((self,self.state))
在与门的 processMessage 方法中,我们需要判定,是哪个输入端送来了消息,并据此设置状态。这就是为什么别的部件送来的消息中需要含有其自身的引用。
最后我们做出或门。它和与门类似,只是,它只要有任一个输入端为1的时候就送出消息1,只有两个输入端都为0的时候才送出0:
class OrGate(EventHandler):
def __init__(self,inputA,inputB,*outputs):
EventHandler.__init__(self,*outputs)
self.inputA = inputA
self.inputAstate = inputA.state
inputA.registerOutput(self)
self.inputB = inputB
self.inputBstate = inputB.state
inputB.registerOutput(self)
self.state = 0
def processMessage(self,msg):
sender, value = msg
debugPrint("OrGate received %s from %s" % (value,sender))
if sender is self.inputA:
self.inputAstate = value
elif sender is self.inputB:
self.inputBstate = value
else:
raise RuntimeError("Didn't expect message from %s" % sender)
if self.inputAstate or self.inputBstate:
self.state = 1
else:
self.state = 0
debugPrint("OrGate's new state => %s" % self.state)
def notify(self,output):
output((self,self.state))
5.6.1 半加器
作为结束,我们将使用我们已经建立的所有部件,来构建一个半加器。半加器实现两个比特的加法。我们将一些部件连接了起来,然后来“拨动”开关。开关的动作改变了其状态,并且把其变化,以数据流的方式,通过系统传播了下去:
if __name__ == "__main__":
# 半加器
inputA = Switch()
inputB = Switch()
result = Reporter("Result = %(value)s")
carry = Reporter("Carry = %(value)s")
andGateA = AndGate(inputA,inputB,carry)
orGate = OrGate(inputA,inputB)
inverter = Inverter(andGateA)
andGateB = AndGate(orGate,inverter,result)
inputA(1)
inputB(1)
inputB(0)
inputA(0)
6 角色
在角色的模型里面,一切都是角色(废话!)。角色就是一个对象(一般意义上的对象,而不必是面向对象中的意义),它可以:
从其他角色接收消息。
对收到的消息中适合于自己的,进行处理。
向其它角色发送消息。
创建新的角色。
一个角色对其它的角色并不具有直接的访问渠道,所有的交流都通过消息传递来完成。这就提供了丰富的模型,来模拟现实世界中的对象——它们是彼此松散耦合的,并对彼此的内部所知有限。
如果我们要建立一个模拟过程的话,就来模拟一下……
6.1 杀手机器人!
注:
本节的完整程序保存为 actors.py ,在本文的末尾,和代码.zip文件中和都有。
6.1.1 角色基类
在这个例子中,我们将配置出一个小小的世界,在其中有一些使用角色模型来移动和战斗的机器人。作为开始,我们来定义所有角色的基类:
class actor:
def __init__(self):
self.channel = stackless.channel()
self.processMessageMethod = self.defaultMessageAction
stackless.tasklet(self.processMessage)()
def processMessage(self):
while 1:
self.processMessageMethod(self.channel.receive())
def defaultMessageAction(self,args):
print args
默认情况下,角色建立一个通道来接收消息,指定一个方法来处理这些消息,并启动一个循环来将接收的消息分派给处理方法。默认的处理过程只是把收到的消息显示出来。这些,已经是我们实现角色模型所需要的全部。
6.1.2 消息的格式
所有发送的消息都遵从一个格式:先是发送者的通道,接着一个字符串为消息的名称,再接下来是可选的参数。例如:
(self.channel, "JOIN", (1,1) )
(self.channel, "COLLISION")
等等……
注意,我们只将发送者的通道随消息送出,而不是整个发送者对象。在角色模型中,角色间的所有交流都必须通过消息传递来体现,如果将 self 都发送出去的话,则使得对方可以很容易地用非正常手段对发送者的内部未知信息进行访问。
事实上你会注意到,当我们将本节的大部分角色实例化的时候,甚至不需要将其赋值给能被别的角色访问到的变量。我们仅仅创建它们,并让它们独自漂浮在那里,对周围环境只有有限的了解。
6.1.3 “世界”类
“世界”角色,扮演着其他所有角色相互作用的中央枢纽。其他角色发送 JOIN 消息给世界角色,后者则跟踪它们。周期性地,世界角色发出 WORLD_STATE 消息,其中包括关于所有可见的角色的信息,来供它们内部处理:
class world(actor):
def __init__(self):
actor.__init__(self)
self.registeredActors = {}
stackless.tasklet(self.sendStateToActors)()
def testForCollision(self,x,y):
if x < 0 or x > 496:
return 1
elif y < 0 or y > 496:
return 1
else:
return 0
def sendStateToActors(self):
while 1:
for actor in self.registeredActors.keys():
actorInfo = self.registeredActors[actor]
if self.registeredActors[actor][1] != (-1,-1):
VectorX,VectorY = (math.sin(math.radians(actorInfo[2])) * actorInfo[3],
math.cos(math.radians(actorInfo[2])) * actorInfo[3])
x,y = actorInfo[1]
x += VectorX
y -= VectorY
if self.testForCollision(x,y):
actor.send((self.channel,"COLLISION"))
else:
self.registeredActors[actor] = tuple([actorInfo[0],
(x,y),
actorInfo[2],
actorInfo[3]])
worldState = [self.channel, "WORLD_STATE"]
for actor in self.registeredActors.keys():
if self.registeredActors[actor][1] != (-1,-1):
worldState.append( (actor, self.registeredActors[actor]))
message = tuple(worldState)
for actor in self.registeredActors.keys():
actor.send(message)
stackless.schedule()
def defaultMessageAction(self,args):
sentFrom, msg, msgArgs = args[0],args[1],args[2:]
if msg == "JOIN":
print 'ADDING ' , msgArgs
self.registeredActors[sentFrom] = msgArgs
elif msg == "UPDATE_VECTOR":
self.registeredActors[sentFrom] = tuple([self.registeredActors[sentFrom][0],
self.registeredActors[sentFrom][1],
msgArgs[0],msgArgs[1]])
else:
print '!!!! WORLD GOT UNKNOWN MESSAGE ' , args
World = world().channel
除了处理消息的微进程外,“世界”角色还建立了另一个独立的微进程,来执行 sendStateToActors() 方法。这个方法里有个循环,用于构建关于世界状态的信息,并发送给所有的角色。这是其它角色唯一可以指望接收到的消息。若有必要,它们可以回应这个消息,即将某种 UPDATE 消息发回给世界。
作为 sendStateToActors() 方法的一部分,“世界”角色需要更新其内部的、对可移动的角色的位置的记录。它使用可移动角色的角度和速度来建立一个矢量,确保更新后的位置不会撞到世界的四面边界,并存下其新的位置。
defaultMessageAction() 方法处理以下已知信息,并忽略其他的:
JOIN
将一个角色添加到世界中的已知角色列表,其参数包括新角色的位置、角度和速度。位置-1, -1表示这个角色对其它角色不可见,比如后面将要详述的显示屏角色。
UPDATE_VECTOR
为发送这个消息的角色设置新的角度和速度。
最后,一个“世界”角色被实例化,其通道被保存进全局变量 World 中,使其它角色可以发送它们最初的 JOIN 消息。
6.1.4 一个简单的机器人
我们将以一个简单的机器人开始,它以恒定的速度移动,每接到一个 WORLD_STATE 消息的时候,都顺时针旋转1度作为响应。当发生与世界边界碰撞的 COLLISION 事件时,它将旋转73度再尝试前进。所有其他消息将被忽略。
class basicRobot(actor):
def __init__(self,location=(0,0),angle=135,velocity=1,world=World):
actor.__init__(self)
self.location = location
self.angle = angle
self.velocity = velocity
self.world = world
joinMsg =(self.channel,"JOIN",self.__class__.__name__,
self.location,self.angle,self.velocity)
self.world.send(joinMsg)
def defaultMessageAction(self,args):
sentFrom, msg, msgArgs = args[0],args[1],args[2:]
if msg == "WORLD_STATE":
self.location = (self.location[0] + 1, self.location[1] + 1)
self.angle += 1
if self.angle >= 360:
self.angle -= 360
updateMsg = (self.channel, "UPDATE_VECTOR",
self.angle,self.velocity)
self.world.send(updateMsg)
elif msg == "COLLISION":
self.angle += 73
if self.angle >= 360:
self.angle -= 360
else:
print "UNKNOWN MESSAGE", args
basicRobot(angle=135,velocity=5)
basicRobot((464,0),angle=225,velocity=10)
stackless.run()
注意,机器人的构造方法发出了一个 JOIN 消息到“世界”对象,来注册自己。除此之外,代码应该还算易懂的。
6.1.5 蹊径:pyGame
至此,在示例程序中,我们都是使用调试输出语句来显示事情进行的过程。我试图以这种方式来保持代码的简单易懂,但有些时候,输出语句却变得不再直观,而是越发迷惑。在“数据流”一节中我们已经用得很勉强了,而在本节中,情况已经变得复杂到无法再尝试用打印输出来表示了。
注:
要运行本节的代码,需要安装 pyGame 的当前版本,可以从这里取得:http://www.pygame.org/
我决定使用 pyGame 来创建一个简单的可视化引擎。尽管对于 pyGame 内容的叙述已经超出了本教程的范围,但其操作本身还是相对简明的。当显示屏角色收到一个 WORLD_STATE 消息,就将相应的角色放置上去,并更新显示。很幸运,我们可以将所有的 pyGame 代码隔离在一个角色之内,因此代码的其它部分可以保持不被“污染”,依然可以被理解,哪怕不了解也不关心 pyGame 怎么进行的显示渲染:
class display(actor):
def __init__(self,world=World):
actor.__init__(self)
self.world = World
self.icons = {}
pygame.init()
window = pygame.display.set_mode((496,496))
pygame.display.set_caption("Actor Demo")
joinMsg = (self.channel,"JOIN",self.__class__.__name__, (-1,-1))
self.world.send(joinMsg)
def defaultMessageAction(self,args):
sentFrom, msg, msgArgs = args[0],args[1],args[2:]
if msg == "WORLD_STATE":
self.updateDisplay(msgArgs)
else:
print "UNKNOWN MESSAGE", args
def getIcon(self, iconName):
if self.icons.has_key(iconName):
return self.icons[iconName]
else:
iconFile = os.path.join("data","%s.bmp" % iconName)
surface = pygame.image.load(iconFile)
surface.set_colorkey((0xf3,0x0a,0x0a))
self.icons[iconName] = surface
return surface
def updateDisplay(self,actors):
for event in pygame.event.get():
if event.type == pygame.QUIT: sys.exit()
screen = pygame.display.get_surface()
background = pygame.Surface(screen.get_size())
background = background.convert()
background.fill((200, 200, 200))
screen.blit(background, (0,0))
for item in actors:
screen.blit(pygame.transfor
发表评论
-
apns批量使用失败
2012-04-17 19:22 2481突然某一天app 调用 apns 用户莫名其妙收不到, ... -
pylons 中 wsgiapp 和 wsgicontroller 的关系
2011-09-05 02:43 685pylons 看了好久了,喜欢的他精简封装,就想它自己的 ... -
[转] Python 面试题集合
2011-07-27 01:48 1522Python 面试题集合 发布时间:2011-07- ... -
nginx 下 用fastcgi 模式使用 webpy
2011-07-24 00:23 868nginx ---------------------- ... -
【外刊IT评论】Python 程序员的进化
2011-02-11 00:07 1074不久前,在互联网上出现了一篇有趣的文章,讲的是对于同一个问题, ... -
paramiko 的ssh登录
2011-01-04 17:43 1143import paramiko socks=('192. ... -
getattribute getattr
2010-09-11 21:53 798class D(object): def ... -
python 打包【转】
2010-09-02 00:33 1644python的第三方模块越来 ... -
python 3D相关
2009-02-24 23:24 1241Python是什么>? 它不是蟒蛇,而是一种早期产生于L ... -
蛋蛋的python把老子吓死了
2009-02-13 16:23 1257今天无意中敲 python的老窝, 敲到 www.python ... -
我的第一个python应用
2009-02-12 01:47 4983我的一个python应用做完了!哈哈,自己自学python时间 ... -
网上有人实现的飞信的协议
2009-02-09 14:01 814http://cocobear.info/blog/2008/ ... -
python加cookie访问
2009-02-08 23:28 2613import httplib, urllib def t ... -
Python 中的元类编程
2009-01-05 01:56 719最近看python 遇到python的元类编程。 查到这篇文 ... -
[转]常用的python模块
2009-01-02 01:51 1765adodb:我们领导推荐的数据库连接组件 bsddb3:Be ... -
[转]制作Python的安装模块
2009-01-02 01:44 2904Python模块的安装方法: 1. 单文件模块 ... -
python 中文编码的处理
2008-12-31 19:10 4397在win下写点python的代码,对utf-8 老是处理不过来 ... -
用C语言扩展Python的功能
2008-12-21 11:08 1316Pyton和C分别有着各自的 ... -
pyinstaller 来建立linux下的python独立执行文件
2008-12-09 10:21 2482以下内容假定已安装好Python 2.4/2.5 一、下载并 ... -
python 微线程
2008-11-26 18:29 21059.4 微线程—Stackless Pyt ...
相关推荐
我在网上找的《Stackless Python 并发式编程介绍》做成了CHM帮助文档。拿出来共享下: Stackless Python 是Python编程语言的一个增强版本,它使程序员从基于线程的编程方式中获得好处, 并避免传统线程所带来的性能...
Stackless Python 是一种扩展了标准Python解释器的版本,它主要关注并发编程和微线程的实现。在标准Python中,每个线程都有自己的堆栈,这在处理大量并发任务时可能会导致资源浪费和效率低下。Stackless Python通过...
### Stackless Python:并发编程的新范式 #### 1.1 为什么选择Stackless Python? 在探讨Stackless Python之前,我们需要理解并发编程的重要性及其在现代软件开发中的地位。随着计算机硬件的发展,多核处理器已经...
Stackless Python是一种优化过的Python实现,它增强了标准Python解释器的能力,特别是在并发编程和任务切换方面。这个版本的Python去掉了传统的调用栈限制,从而允许更高效的微线程(也称为轻量级线程或协作线程)...
Stackless Python是一种特殊的Python实现,它扩展了标准Python解释器的功能,主要专注于并行处理、微线程(microthreads)和高效的任务切换。这个PPT很可能是为了在公司内部推广Stackless Python,介绍其特点、优势...
Stackless Python 是 Python 的一个分支,它在标准 Python 解释器的基础上添加了一些额外的功能,特别是对并发编程的支持。这个简单的演示将向我们展示 Stackless Python 如何帮助我们更有效地抓取网页,同时利用其...
Stackless Python是一种特殊的Python实现,它扩展了标准Python解释器的功能,特别适合处理并发和微线程。在本文中,我们将深入探讨如何使用Stackless Python来构建一个聊天室服务器,以此来理解其在多任务环境中的...
### Stackless Python并发式编程介绍 #### 1.1 为什么要使用Stackless Python? Stackless Python是Python的一个增强版本,它引入了一种低开销、轻量级的微线程扩展,这种扩展允许开发者利用线程式的编程模式,...
PSP-StacklessPython 适用于 Sony PSP 的 Stackless Python 2.5 端口 这是 Python 编程语言到 Sony PSP 控制台的端口。 当前版本是 2.5.2,它是 CPython 和 Stackless Python 的最新移植版本。 从 Stackless 主页...
Stackless Python 是Python编程语言的一个增强版本,它使程序员从基于线程的编程方式中获得好处,并避免传统线程所带来的性能与复杂度问题。Stackless为 Python带来的微线程扩展,是一种低开销、轻量级的便利工具,...
Stackless Python是一种Python的变体,它强调微线程(microthreads)的使用,提供更高效的并发执行。Stackless Python通过减少每个线程的堆栈大小来实现这一点,从而允许更多的线程同时运行。这在处理大量并发任务时...
goless, 在 Stackless python 之上,像构建语义一样 goless使用英镑的goless库,你可以在 python 中编写语言风格的并发程序。 goless 为频道,选择和goroutine提供功能。 英镑goless允许你使用优美的go并发编程模型...
Stackless Python 是Python编程语言的一个增强版本,它使程序员从基于线程的编程方式中获得好处,并避免传统线程所带来的性能与复杂度问题。Stackless为 Python带来的微线程扩展,是一种低开销、轻量级的便利工具,...
python本身的限制,导致了今天诟病,stackless-python为python的升级版,帮助解决多线程问题,有这方面需求的用了才会说好
我刚才发了一个,下来不能看。对不起大家了。重发。 下载后不能看的解决方法是:右击文件--属性--解除锁定。就可以正常显示了。
Stackless Python 是Python编程语言的一个增强版本,它使程序员从基于线程的编程方式中获得好处,并避免传统线程所带来的性能与复杂度问题。Stackless为 Python带来的微线程扩展,是一种低开销、轻量级的便利工具,...
Stackless Python是一种增强版的Python解释器,它在标准Python的基础上引入了"无栈"的概念,从而提高了程序的并发性能和内存效率。这个压缩包包含的是Stacklesslib 1.2.2版本,适用于Python 2.7环境。安装这个库后,...
Python 的解释器实现有多种,包括 CPython、Pypy、Stackless Python、Jython、IronPython 等。每种解释器都有其特点和优点。例如,Pypy 使用 JIT 技术来提高运行速度,而 Stackless Python 则支持协程实现。 此外...