浏览 2757 次
锁定老帖子 主题:zeroMQ初体验-14.命名机制 进阶
精华帖 (0) :: 良好帖 (0) :: 新手帖 (11) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2011-03-31
最后修改:2011-03-31
前文曾提到过命名机制,事实上它是一把双刃剑。在能够持有数据等待重新连接的时候,也增加了持有数据方的负担(危险),特别是在"发布/订阅"模式下,可谓牵一发而动全身。
这里先给出一组示例,在代码的运行过程中,通过重启消费者来观察发布者的进程状态。 发布端: import zmq import time context = zmq.Context() sync = context.socket(zmq.PULL) sync.bind("tcp://*:5564") publisher = context.socket(zmq.PUB) publisher.bind("tcp://*:5565") sync_request = sync.recv() for n in xrange(10): msg = "Update %d" % n publisher.send(msg) time.sleep(1) publisher.send("END") time.sleep(1) # Give 0MQ/2.0.x time to flush output 订阅端: import zmq import time context = zmq.Context() subscriber = context.socket(zmq.SUB) subscriber.setsockopt(zmq.IDENTITY, "Hello") subscriber.setsockopt(zmq.SUBSCRIBE, "") subscriber.connect("tcp://localhost:5565") sync = context.socket(zmq.PUSH) sync.connect("tcp://localhost:5564") sync.send("") while True: data = subscriber.recv() print data if data == "END": break 订阅端得到的信息: $ durasub Update 0 Update 1 Update 2 ^C $ durasub Update 3 Update 4 Update 5 Update 6 Update 7 ^C $ durasub Update 8 Update 9 END 数据被发布者存储了,而发布者的内存占用也节节升高(很危险啊)。所以是否使用命名策略是需要谨慎选择的。为了以防万一,zeromq也提供了"高水位"机制,即当发送端持有数据达到一定数量就不再存储后面的数据,很好的控制了风险。这个机制也适当解决了这里的慢消费问题。 使用了 高水位 后的测试结果: $ durasub Update 0 Update 1 ^C $ durasub Update 2 Update 3 Update 7 Update 8 Update 9 END "高水位"封堵了内存崩溃的可能性,却是以数据丢失为代价的,zeromq也为此配对提供了"swap"功能,将内存中的数据转存入硬盘,实现了"既不耗内存又不丢数据"。 实现代码: import zmq import time context = zmq.Context() # Subscriber tells us when it's ready here sync = context.socket(zmq.PULL) sync.bind("tcp://*:5564") # We send updates via this socket publisher = context.socket(zmq.PUB) publisher.bind("tcp://*:5565") # Prevent publisher overflow from slow subscribers publisher.setsockopt(zmq.HWM, 1) # Specify the swap space in bytes, this covers all subscribers publisher.setsockopt(zmq.SWAP, 25000000) # Wait for synchronization request sync_request = sync.recv() # Now broadcast exactly 10 updates with pause for n in xrange(10): msg = "Update %d" % n publisher.send(msg) time.sleep(1) publisher.send("END") time.sleep(1) # Give 0MQ/2.0.x time to flush output 注意点: 高水位与交换区的设定,是需要根据实际运用状态来确定的,高水位设的过小,会影响到速度。 如果是数据存储端崩溃了,那么,所有数据将彻底消失。 关于高水位的特别说明: 除了PUB型会在达到高水位丢弃后续数据外,其他类型的都会以阻塞的形式来应对后续数据。 线程间的通信,高水位是通信双方共同设置的总和,如果有一方没有设置,则高水位规则不会起到作用。 (未完待续) 声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |
发表时间:2011-03-31
请问下,有没有相关的performance测试的资料或demo、经验。。。
|
|
返回顶楼 | |
发表时间:2011-03-31
kimmking 写道 请问下,有没有相关的performance测试的资料或demo、经验。。。 这是一个伪翻译的系列教程帖,文中的例子都是实际可用的demo,全文目的也致力于分享一些使用经验和方法。至于性能方面,据传10000qps轻轻松松,不过,根据自己的实际环境自测比较靠谱些。限于个人水平,该系列进度不会太快,你可以参看官方原文:http://zguide.zeromq.org/page:all |
|
返回顶楼 | |