`
iyuan
  • 浏览: 473583 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

zeroMQ初体验-3.分而治之模式(push/pull)

    博客分类:
  • MQ
阅读更多
push/pull模式:



模型描述:
1.上游(任务发布)
2.工人(中间,具体工作)
3.下游(信号采集或者工作结果收集)

上游代码:
import zmq
import random
import time

context = zmq.Context()

# Socket to send messages on
sender = context.socket(zmq.PUSH)
sender.bind("tcp://*:5557")

print "Press Enter when the workers are ready: "
_ = raw_input()
print "Sending tasks to workers..."

# The first message is "0" and signals start of batch
sender.send('0')

# Initialize random number generator
random.seed()

# Send 100 tasks
total_msec = 0
for task_nbr in range(100):
    # Random workload from 1 to 100 msecs
    workload = random.randint(1, 100)
    total_msec += workload
    sender.send(str(workload))
print "Total expected cost: %s msec" % total_msec


工作代码:
import sys
import time
import zmq

context = zmq.Context()

# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

# Socket to send messages to
sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")

# Process tasks forever
while True:
    s = receiver.recv()

    # Simple progress indicator for the viewer
    sys.stdout.write('.')
    sys.stdout.flush()

    # Do the work
    time.sleep(int(s)*0.001)

    # Send results to sink
    sender.send('')


下游代码:
import sys
import time
import zmq

context = zmq.Context()

# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.bind("tcp://*:5558")

# Wait for start of batch
s = receiver.recv()

# Start our clock now
tstart = time.time()

# Process 100 confirmations
total_msec = 0
for task_nbr in range(100):
    s = receiver.recv()
    if task_nbr % 10 == 0:
        sys.stdout.write(':')
    else:
        sys.stdout.write('.')

# Calculate and report duration of batch
tend = time.time()
print "Total elapsed time: %d msec" % ((tend-tstart)*1000)



注意点:
这种模式与pub/sub模式一样都是单向的,区别有两点:
1,该模式下在没有消费者的情况下,发布者的信息是不会消耗的(由发布者进程维护)
2,多个消费者消费的是同一列信息,假设A得到了一条信息,则B将不再得到

这种模式主要针对在消费者能力不够的情况下,提供的多消费者并行消费解决方案(也算是之前的pub/sub模式的那个"堵塞问题"的一个解决策略吧)

由上面的模型图可以看出,这是一个N:N的模式,在1:N的情况下,各消费者并不是平均消费的,而在N:1的情况下,则有所不同,如下图:



这种模式主要关注点在于,可以扩展中间worker,来到达并发的目的。

(未完待续)
0
0
分享到:
评论

相关推荐

    zeromq-2.1.7.tar.gz

    ZeroMQ支持多种协议,如PUB/SUB(发布/订阅)、REQ/REP(请求/响应)和PUSH/PULL(推送/拉取),这些模式提供了灵活的消息传递模型。此外,它还提供了一些高级特性,比如负载均衡、故障恢复和多路复用。 在实际使用...

    zeromq-3.2.5.tar.gz、jzmq.tar.gz、Python-2.6.6.tar.bz2、storm-0.8.0.zip下载

    3. 解压缩zeromq-3.2.5.tar.gz,编译并安装ZeroMQ,确保系统安装了必要的编译工具和依赖库。 4. 解压缩jzmq.tar.gz,编译并安装jZMQ,这需要JDK已经安装并且Java环境已配置好。 5. 解压缩storm-0.8.0.zip,根据提供...

    zeromq-4.0.3.tar.gz.zip

    zeromq-4.0.3.tar.gz 是一个包含了 ZeroMQ 4.0.3 版本源代码的压缩文件。ZeroMQ,也被称为“零消息队列”或“0MQ”,是一个开源的消息中间件,它提供了一种高效、灵活且可扩展的方式来处理分布式系统中的数据通信。...

    zeromq-4.1.8.tar.gz

    3. 进入解压后的目录:`cd zeromq-4.1.8` 4. 使用autotools构建系统配置:`./configure` 5. 编译源代码:`make` 6. 安装到系统目录:`sudo make install` 使用zeromq: ZeroMQ提供了多种语言的绑定,如C++、Python...

    zeromq-4.1.3.tar.gz

    1. **消息队列模型**:ZeroMQ 提供了多种消息模式,如 Pub/Sub(发布/订阅)、Req/Rep(请求/响应)、PUSH/PULL(推/拉)和 XPUB/XSUB(扩展发布/订阅)。这些模式覆盖了各种通信场景,让开发者能够灵活选择最适合其...

    zeromq-4.3.4.tar.gz

    - **推送/拉取(PUSH/PULL)**:用于单向数据流,一个或多个推送者向一个或多个拉取者发送消息。 - **管道(PAIR)**:为两个端点提供高速、低延迟的直接连接。 4. **0MQ的使用** - 在安装“zeromq-4.3.4.tar.gz”后...

    zeromq-4.2.3.tar.gz

    zeromq-4.2.3.tar.gz 是ZeroMQ 4.2.3版本的源代码包,这个稳定版本确保了良好的兼容性和可靠性。 首先,让我们深入了解ZeroMQ的核心概念和功能: 1. **套接字(Sockets)**:在ZeroMQ中,套接字不仅仅是传统网络...

    zeromq-2.1.9.tar.gz

    `zeromq-2.1.9.tar.gz` 是zeromq的一个特定版本,即2.1.9版,通常以源码形式提供,需要通过编译来安装。 首先,让我们深入了解zeromq的核心概念。zeromq设计了一个灵活的套接字模型,它允许开发者构建复杂的网络...

    zeromq-4.0.5-4.el7.x86_64.rpm

    官方离线安装包,测试可用。使用rpm -ivh [rpm完整包名] 进行安装

    zeromq-4.3.4.zip

    标题中的"zeromq-4.3.4.zip"指的是这个库的4.3.4版本的源代码压缩包。这个稳定版在2021年1月17日发布,意味着它是经过多轮测试和优化后的成熟版本,适合用于生产环境。 0MQ的核心特性包括: 1. **轻量级**:0MQ不...

    zeromq-4.2.0.tar.gz源码包

    zeromq的设计基于发布/订阅(Pub/Sub)、请求/应答(Req/Rep)和推送/拉取(Push/Pull)等经典的消息模式。这些模式覆盖了各种常见的分布式系统通信场景,如服务间调用、事件驱动架构、工作队列等。通过这些模式,...

    zeromq-4.1.0-rc1.zip

    它提供了多种消息模式,如发布/订阅(Publish/Subscribe)、请求/响应(Request/Reply)和推拉(Push/Pull)模式,这些模式可以轻松地构建复杂、可扩展的分布式系统。 1. **发布/订阅模式**: 在这种模式下,发布...

    zeromq-4.2.0.tar.zip

    标题中的"zeromq-4.2.0.tar.zip"是指ZeroMQ库的4.2.0版本,它被封装在一个ZIP压缩包中,而内部包含的文件是tar归档格式。ZeroMQ是一个开源的消息中间件,它提供了一个高级的消息队列模型,允许应用程序之间进行高效...

    zeromq-3.2.4.tar.gz

    zeromq的核心概念是基于发布/订阅(Publish/Subscribe)、请求/响应(Request/Reply)和推送/拉取(Push/Pull)模式的消息队列模型。这些模式提供了异步通信的基础,使得应用程序可以实现解耦和并行处理。例如,在...

    zeromq-4.1.2.tar.gz

    ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。

    zeromq-3.2.5.tar.gz

    ZeroMQ是一个网络通讯库,其主要用来为分布式应用程序开发提供进程间通信(此处的进程既可以是同一台机器上的两个进程也可以是不同机器上的两个进程)。ZeroMQ的特点在于灵活的通信手段和丰富的连接模型,并且它可以...

    zeromq-4.1.4.tar.gz

    ZeroMQ是一个网络通讯库,其主要用来为分布式应用程序开发提供进程间通信(此处的进程既可以是同一台机器上的两个进程也可以是不同机器上的两个进程)。ZeroMQ的特点在于灵活的通信手段和丰富的连接模型,并且它可以...

    zeromq-4.0.1.tar.gz

    2. **模式丰富**:支持多种消息模式,如发布/订阅(Publish/Subscribe)、请求/响应(Request/Reply)、推拉(Push/Pull)以及管道对(Pair)。这些模式可以灵活地适应不同的应用场景。 3. **高并发**:ZeroMQ利用I...

    zeromq-4.3.2.tar.gz

    在Linux环境下,zeromq-4.3.2.tar.gz可以通过编译源代码进行安装。通常,这涉及到解压、配置、编译和安装几个步骤。用户需要确保系统上已经安装了必要的构建工具,如GCC、make和必要的依赖库。安装过程可能如下: `...

    zeromq-3.12.5.zip

    zeromq-3.12.5.tar.gz, libzmq-3.1.2.tar.gz 在Linux环境中,构建和部署分布式计算系统时,Storm是一个常用的选择,它是一个开源的流处理框架,用于实时数据处理。这个压缩包"zeromq-3.12.5.zip"包含了与Storm集群...

Global site tag (gtag.js) - Google Analytics