`
iyuan
  • 浏览: 473428 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

zeroMQ初体验-2.发布订阅模式(pub/sub)

    博客分类:
  • MQ
阅读更多
pub/sub模式:



发布端(pub)
import itertools
import sys  
import time 
            
import zmq  
            
def main(): 
    if len (sys.argv) != 2:
        print 'usage: publisher <bind-to>'
        sys.exit (1)
    
    bind_to = sys.argv[1]
    
    all_topics = ['sports.general','sports.football','sports.basketball',
                  'stocks.general','stocks.GOOG','stocks.AAPL',
                  'weather']
    
    ctx = zmq.Context()
    s = ctx.socket(zmq.PUB)
    s.bind(bind_to)

    print "Starting broadcast on topics:"
    print "   %s" % all_topics
    print "Hit Ctrl-C to stop broadcasting."
    print "Waiting so subscriber sockets can connect..."
    print
    time.sleep(1.0)
    
    msg_counter = itertools.count()
    try:
        for topic in itertools.cycle(all_topics):
            msg_body = str(msg_counter.next())
            print '   Topic: %s, msg:%s' % (topic, msg_body)
            #s.send_multipart([topic, msg_body])
            s.send_pyobj([topic, msg_body])
            # short wait so we don't hog the cpu
            time.sleep(0.1)
    except KeyboardInterrupt:
        pass

    print "Waiting for message queues to flush..."
    time.sleep(0.5)
    s.close()
    print "Done."

if __name__ == "__main__":
    main()


订阅端(sub):
import sys
import time
import zmq

def main():
    if len (sys.argv) < 2:
        print 'usage: subscriber <connect_to> [topic topic ...]'
        sys.exit (1)

    connect_to = sys.argv[1]
    topics = sys.argv[2:]

    ctx = zmq.Context()
    s = ctx.socket(zmq.SUB)
    s.connect(connect_to)

    # manage subscriptions
    if not topics:
        print "Receiving messages on ALL topics..."
        s.setsockopt(zmq.SUBSCRIBE,'')
    else:
        print "Receiving messages on topics: %s ..." % topics
        for t in topics:
            s.setsockopt(zmq.SUBSCRIBE,t)
    print
    try:
        while True:
            #topic, msg = s.recv_multipart()
            topic, msg = s.recv_pyobj()
            print '   Topic: %s, msg:%s' % (topic, msg)
    except KeyboardInterrupt:
        pass
    print "Done."

if __name__ == "__main__":
    main()


注意:
这里的发布与订阅角色是绝对的,即发布者无法使用recv,订阅者不能使用send,并且订阅者需要设置订阅条件"setsockopt"。
按照官网的说法,在这种模式下很可能发布者刚启动时发布的数据出现丢失,原因是用zmq发送速度太快,在订阅者尚未与发布者建立联系时,已经开始了数据发布(内部局域网没这么夸张的)。官网给了两个解决方案;1,发布者sleep一会再发送数据(这个被标注成愚蠢的);2,(还没有看到那,在后续中发现的话会更新这里)。
官网还提供了一种可能出现的问题:当订阅者消费慢于发布,此时就会出现数据的堆积,而且还是在发布端的堆积(有朋友指出是堆积在消费端,或许是新版本改进,需要读者的尝试和反馈,thx!),显然,这是不可以被接受的。至于解决方案,或许后面的"分而治之"就是吧。

(未完待续)
2
0
分享到:
评论
6 楼 frankwangzy1103 2012-07-04  
按照官网的说法,在这种模式下很可能发布者刚启动时发布的数据出现丢失,原因是用zmq发送速度太快,在订阅者尚未与发布者建立联系时,已经开始了数据发布(内部局域网没这么夸张的)。官网给了两个解决方案;1,发布者sleep一会再发送数据(这个被标注成愚蠢的);2,(还没有看到那,在后续中发现的话会更新这里)。
---------------------------------------
这个我还真遇到了,局域网,多个sub的时候,如果pub不sleep,会全部被取到一台机器上去,之前折腾了半天了,后来sleep了一下好了~~
5 楼 frankwangzy1103 2012-07-04  
按照官网的说法,在这种模式下很可能发布者刚启动时发布的数据出现丢失,原因是用zmq发送速度太快,在订阅者尚未与发布者建立联系时,已经开始了数据发布(内部局域网没这么夸张的)。官网给了两个解决方案;1,发布者sleep一会再发送数据(这个被标注成愚蠢的);2,(还没有看到那,在后续中发现的话会更新这里)。
----------------------------------
4 楼 frankwangzy1103 2012-07-04  
按照官网的说法,在这种模式下很可能发布者刚启动时发布的数据出现丢失,原因是用zmq发送速度太快,在订阅者尚未与发布者建立联系时,已经开始了数据发布(内部局域网没这么夸张的)。官网给了两个解决方案;1,发布者sleep一会再发送数据(这个被标注成愚蠢的);2,(还没有看到那,在后续中发现的话会更新这里)。
3 楼 iyuan 2011-12-07  
guozhiwei 写道
guozhiwei 写道
"当订阅者消费慢于发布,此时就会出现数据的堆积,而且还是在发布端的堆积,"

是在发布端堆积吗?

我觉得应该是在订阅端堆积的.




我刚才试验了 是在订阅端堆积的..


现在zmq好像版本很高了,文档已经陈旧了。谢谢你的反馈,相关内容已经做出修正~
2 楼 guozhiwei 2011-12-06  
guozhiwei 写道
"当订阅者消费慢于发布,此时就会出现数据的堆积,而且还是在发布端的堆积,"

是在发布端堆积吗?

我觉得应该是在订阅端堆积的.




我刚才试验了 是在订阅端堆积的..
1 楼 guozhiwei 2011-12-06  
"当订阅者消费慢于发布,此时就会出现数据的堆积,而且还是在发布端的堆积,"

是在发布端堆积吗?

我觉得应该是在订阅端堆积的.

相关推荐

    zeromq-2.1.7.tar.gz

    ZeroMQ支持多种协议,如PUB/SUB(发布/订阅)、REQ/REP(请求/响应)和PUSH/PULL(推送/拉取),这些模式提供了灵活的消息传递模型。此外,它还提供了一些高级特性,比如负载均衡、故障恢复和多路复用。 在实际使用...

    zeromq-4.0.3.tar.gz.zip

    6. **分布式**:ZeroMQ 支持多对多的发布/订阅模式,使得构建分布式系统更加容易。 zeromq-4.0.3.tar.gz 文件本身是一个 tar 归档,通常用于在 Unix-like 系统上打包和存储文件。归档后,文件被压缩为 .gz 格式,这...

    zeromq-4.2.3.tar.gz

    1. **套接字(Sockets)**:在ZeroMQ中,套接字不仅仅是传统网络编程中的概念,它们提供了多种模式,如PUB(发布者)、SUB(订阅者)、REQ(请求者)、REP(响应者)、DEALER(经销商)和ROUTER(路由器),这些模式...

    zeromq-4.3.4.tar.gz

    - **发布/订阅(PUB/SUB)**:允许一个或多个发布者向多个订阅者广播消息。 - **请求/应答(REQ/REP)**:提供了一对一的请求-响应通信模式。 - **请求/应答(XREQ/XREP)**:扩展的请求-响应模式,支持多路复用和错误...

    zeromq-4.1.3.tar.gz

    1. **消息队列模型**:ZeroMQ 提供了多种消息模式,如 Pub/Sub(发布/订阅)、Req/Rep(请求/响应)、PUSH/PULL(推/拉)和 XPUB/XSUB(扩展发布/订阅)。这些模式覆盖了各种通信场景,让开发者能够灵活选择最适合其...

    zeromq-4.2.0.tar.gz源码包

    zeromq的设计基于发布/订阅(Pub/Sub)、请求/应答(Req/Rep)和推送/拉取(Push/Pull)等经典的消息模式。这些模式覆盖了各种常见的分布式系统通信场景,如服务间调用、事件驱动架构、工作队列等。通过这些模式,...

    zeromq-4.2.0.tar.zip

    ZeroMQ的核心概念是Socket(套接字),它提供了类似于网络编程中的套接字接口,但增加了许多高级功能,比如消息队列、负载均衡、高可用性和订阅/发布模式等。这些特性使得ZeroMQ成为构建分布式系统、微服务架构和...

    zeromq的pub-sub订阅模式的jave实现

    zeromq是一个强大的开源消息库,它提供了多种消息模式,其中最常见的是发布/订阅(pub-sub)模式。在这个模式中,发布者(publisher)发送消息到一个主题,而订阅者(subscriber)则根据自己的兴趣选择接收特定主题...

    zeromq-4.3.2.tar.gz

    首先,zeromq支持多种协议,包括TCP、IPC(进程间通信)、PUB/SUB(发布/订阅)、REQ/REP(请求/响应)、PAIR(对等)和DEALER/ROUTER(经销商/路由器)模式。这些模式为不同场景提供了灵活的选择,比如PUB/SUB模式...

    zeromq4-1-master.zip

    7. **消息模式**:ZMQ支持多种通信模式,包括请求/响应(REQ/REP)、发布/订阅(PUB/SUB)、推播/拉取(PUSH/PULL)、对等(PAIR)等,这些模式可以灵活组合以满足各种应用场景。 “zeromq4-1-master”源代码库中...

    zeromq-4.0.3.tar

    3. **模式丰富**:支持多种通信模式,如PUB/SUB(发布/订阅)、REQ/REP(请求/响应)、PUSH/PULL(推送/拉取)等,适应不同应用场景。 4. **跨平台**:ZeroMQ可以在多种操作系统上运行,包括Linux、Windows、macOS等...

    ZeroMQ4.3.4

    2. **灵活性**:支持多种消息模式,如发布/订阅(Publish/Subscribe),请求/响应(Request/Reply),推拉(Push/Pull),对等(Peer-to-Peer)等,适应不同应用场景。 3. **轻量级**:不需要中心服务器,节点间可以直接通信...

    http---zguide_zeromq_org-py-all.rar_python zeromq_python zmq_zer

    在ZeroMQ的PUB/SUB模式中,发布者发送消息到一个主题,而订阅者可以订阅特定的主题来接收消息。这种模式适用于一对多的广播场景。而在REQ/REP模式中,请求者发送一个请求,应答者则返回一个响应,这是典型的客户端-...

    zeromq-4.1.6.tar.gz

    1. **消息队列模型**:ZeroMQ遵循了发布/订阅(Publish/Subscribe)、请求/响应(Request/Reply)和推拉(Push/Pull)等消息传递模式。这些模式允许开发者设计灵活的通信架构,适应各种复杂的网络拓扑。 2. **高...

    zeromq-4.2.5.tar.gz

    3. **灵活的通信模式:** 支持多种通信模式,如请求-响应(Request-Reply)、发布-订阅(Publish-Subscribe)、推送-拉取(Push-Pull)等,适应不同场景的需求。 4. **协议独立:** ZeroMQ 提供了一套自己的简单...

    zeromq-3.2.5.zip

    2. **发布/订阅(Pub/Sub)**:在这个模式中,`ZMQ_PUB`插座用于发布消息,而`ZMQ_SUB`插座用于订阅这些消息。订阅者可以通过设置过滤器来选择接收感兴趣的主题。 3. **请求/响应(Req/Rep)**:zeromq的请求/响应...

    zeromq-4.3.2.zip

    4. **模式丰富**:ZeroMQ提供了四种主要的通信模式:PUB/SUB(发布/订阅)、REQ/REP(请求/响应)、DEALER/ROUTER(经销商/路由器)和PAIR(对等)模式,适应不同应用场景。 5. **可伸缩性**:ZeroMQ通过负载均衡和...

    ZeroMQ初体验.rar_ZeroMQ初体验_zeromq

    2. **创建插座**:在程序中创建ZeroMQ插座,指定其类型,如PUB(发布者)、SUB(订阅者)、REQ(请求者)或REP(应答者)等。 3. **绑定与连接**:发布者绑定到一个地址,订阅者或请求者连接到该地址。这定义了消息...

    golang实现zeromq的各种通讯模式

    使用的zeromq版本为:"github.com/pebbe/zmq4" 含有req-rep pub-sub push-pull router-dealer req-router dealer-rep loadbalance)(负载均衡)的实现

    zeroMQ初体验

    - **XPUB/XSUB(扩展发布/订阅)**:带有过滤元数据的PUB/SUB模式。 在&lt;http://zguide.zeromq.org/page:all&gt;这个页面,你可以找到更多关于这些模式的详细示例和说明,进一步了解和学习ZeroMQ如何实现高效的分布式...

Global site tag (gtag.js) - Google Analytics