论坛首页 入门技术论坛

zeroMQ初体验-14.命名机制 进阶

浏览 2757 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (11) :: 隐藏帖 (0)
作者 正文
   发表时间:2011-03-31   最后修改:2011-03-31
前文曾提到过命名机制,事实上它是一把双刃剑。在能够持有数据等待重新连接的时候,也增加了持有数据方的负担(危险),特别是在"发布/订阅"模式下,可谓牵一发而动全身。

这里先给出一组示例,在代码的运行过程中,通过重启消费者来观察发布者的进程状态。

发布端:
import zmq
import time

context = zmq.Context()

sync = context.socket(zmq.PULL)
sync.bind("tcp://*:5564")

publisher = context.socket(zmq.PUB)
publisher.bind("tcp://*:5565")

sync_request = sync.recv()

for n in xrange(10):
    msg = "Update %d" % n
    publisher.send(msg)
    time.sleep(1)

publisher.send("END")
time.sleep(1)  # Give 0MQ/2.0.x time to flush output


订阅端:
import zmq
import time

context = zmq.Context()

subscriber = context.socket(zmq.SUB)
subscriber.setsockopt(zmq.IDENTITY, "Hello")
subscriber.setsockopt(zmq.SUBSCRIBE, "")
subscriber.connect("tcp://localhost:5565")

sync = context.socket(zmq.PUSH)
sync.connect("tcp://localhost:5564")
sync.send("")

while True:
    data = subscriber.recv()
    print data
    if data == "END":
        break


订阅端得到的信息:
$ durasub
Update 0
Update 1
Update 2
^C
$ durasub
Update 3
Update 4
Update 5
Update 6
Update 7
^C
$ durasub
Update 8
Update 9
END


数据被发布者存储了,而发布者的内存占用也节节升高(很危险啊)。所以是否使用命名策略是需要谨慎选择的。为了以防万一,zeromq也提供了"高水位"机制,即当发送端持有数据达到一定数量就不再存储后面的数据,很好的控制了风险。这个机制也适当解决了这里的慢消费问题。

使用了 高水位 后的测试结果:
$ durasub
Update 0
Update 1
^C
$ durasub
Update 2
Update 3
Update 7
Update 8
Update 9
END


"高水位"封堵了内存崩溃的可能性,却是以数据丢失为代价的,zeromq也为此配对提供了"swap"功能,将内存中的数据转存入硬盘,实现了"既不耗内存又不丢数据"。

实现代码:
import zmq
import time

context = zmq.Context()

# Subscriber tells us when it's ready here
sync = context.socket(zmq.PULL)
sync.bind("tcp://*:5564")

# We send updates via this socket
publisher = context.socket(zmq.PUB)
publisher.bind("tcp://*:5565")

# Prevent publisher overflow from slow subscribers
publisher.setsockopt(zmq.HWM, 1)

# Specify the swap space in bytes, this covers all subscribers
publisher.setsockopt(zmq.SWAP, 25000000)

# Wait for synchronization request
sync_request = sync.recv()

# Now broadcast exactly 10 updates with pause
for n in xrange(10):
    msg = "Update %d" % n
    publisher.send(msg)
    time.sleep(1)

publisher.send("END")
time.sleep(1)  # Give 0MQ/2.0.x time to flush output


注意点:
高水位与交换区的设定,是需要根据实际运用状态来确定的,高水位设的过小,会影响到速度。
如果是数据存储端崩溃了,那么,所有数据将彻底消失。
关于高水位的特别说明:
除了PUB型会在达到高水位丢弃后续数据外,其他类型的都会以阻塞的形式来应对后续数据。
线程间的通信,高水位是通信双方共同设置的总和,如果有一方没有设置,则高水位规则不会起到作用。

(未完待续)
   发表时间:2011-03-31  
请问下,有没有相关的performance测试的资料或demo、经验。。。
0 请登录后投票
   发表时间:2011-03-31  
kimmking 写道
请问下,有没有相关的performance测试的资料或demo、经验。。。


这是一个伪翻译的系列教程帖,文中的例子都是实际可用的demo,全文目的也致力于分享一些使用经验和方法。至于性能方面,据传10000qps轻轻松松,不过,根据自己的实际环境自测比较靠谱些。限于个人水平,该系列进度不会太快,你可以参看官方原文:http://zguide.zeromq.org/page:all
0 请登录后投票
论坛首页 入门技术版

跳转论坛:
Global site tag (gtag.js) - Google Analytics