`
pjwqq
  • 浏览: 81552 次
社区版块
存档分类
最新评论

pyzmq的4种模式(PUSH/PULL)笔记

阅读更多

   这个是 PUSH/PULL 模式,又叫做pipeline管道模式,取其一去不回头之意,一推一拉,数据滚滚向前。

   这种socket封装的原本用意呢,是把数据交给一组worker端干活,PUSH会把任务均匀的(这个好像是zmq的招牌)的分配给下游的worker们,保证大家都有活干,图:

Producer:产生代处理数据,并将数据push出去

consumer(worker):pull到来自producer的数据,处理后,push出去

resultcolltor:pull到consumer的数据处理结果

Producer:

import time
import zmq

def producer():
    context = zmq.Context()
    zmq_socket = context.socket(zmq.PUSH)
    zmq_socket.bind("tcp://127.0.0.1:5557")
    # 开始Producer之前必须先启动resultcollector和consumer
    for num in xrange(2000):
        work_message = { 'num' : num }
        zmq_socket.send_json(work_message)

producer()

 consumer:

import time
import zmq
import random

def consumer():
    consumer_id = random.randrange(1,10005)
    print "I am consumer #%s" % (consumer_id)
    context = zmq.Context()
    # recieve work
    consumer_receiver = context.socket(zmq.PULL)
    consumer_receiver.connect("tcp://127.0.0.1:5557")
    # send work
    consumer_sender = context.socket(zmq.PUSH)
    consumer_sender.connect("tcp://127.0.0.1:5558")
    
    while True:
        work = consumer_receiver.recv_json()
        data = work['num']
        result = { 'consumer' : consumer_id, 'num' : data}
        if data%2 == 0: 
            consumer_sender.send_json(result)

consumer()

 resultcollector:

import time
import zmq
import pprint

def result_collector():
    context = zmq.Context()
    results_receiver = context.socket(zmq.PULL)
    results_receiver.bind("tcp://127.0.0.1:5558")
    collecter_data = {}
    for x in xrange(1000):
        result = results_receiver.recv_json()
        if collecter_data.has_key(result['consumer']):
            collecter_data[result['consumer']] = collecter_data[result['consumer']] + 1
        else:
            collecter_data[result['consumer']] = 1
        if x == 999:
            pprint.pprint(collecter_data)

result_collector()

run起来:

python resultcollector.py
python consumer.py
python consumer.py
python producer.py

Producer的2000个数交给consumer处理(就是排除奇数),然后汇集到resultcollector,看下结果:

{   3362: 233,
    9312: 767
}

 consumer:3362 处理 233,consumer:9312 处理767,结果还行,呵呵

 注意一点就是,下游要先于上游开启,否则数据就丢了。感觉zmq对socket的这几种封装,有些场景还是挺适用,可以自由组装。有机会尝试一下。

 

5
2
分享到:
评论

相关推荐

    支持中文路径push/pull的adb

    标题中的"支持中文路径push/pull的adb"正是针对这一问题的解决方案。 "支持中文路径的push和pull"意味着这个ADB版本已经经过修改,能够正确处理包含中文字符的文件路径。在Windows环境下,由于文件系统编码的问题,...

    能在linux(epoll)运行基于C# .net standard2.0 写的socket框架,可使用于dotnet core程序集

    服务端所在socket.core.Server命名空间下,分别为三种模式 push/pull/pack 客户端所在socket.core.Client命名空间下,分别为三种模式 push/pull/pack 主要流程与对应的方法和事件介绍. 注:connectId(int)代表着...

    供应链库存的RFID使能的Push/Pull控制策略优化 (2015年)

    基于RFID技术借鉴单工厂多阶段生产存储系统的Push/Pull策略,以供应链分销网络三级库存系统为例设计了8种RFID使能的Push/Pull混合控制策略。为得到各策略的更优性能,采用基于仿真的粒子群优化方法对各策略参数进行了...

    adb push /adb pull工具

    【解决】当设备已root使用adb命令, 提示'adbd cannot run as root in production builds',adb push /adb pull 到系统目录 解决方案 解压密码:hkxgame.com

    node-zeromq-pushpull-demo:使用`PUSH`PULL`套接字(`0MQ`)和Node.js集群技术演示zeromq(0MQ)

    最后一个项目是一个使用PUSH / PULL套接字 ( 0MQ ) 和 Node.js 集群技术的程序。 任务 启动一个由 3 个工人组成的池并在他们之间分配 30 个工作(整个程序可能不到 100 行代码) 主进程应该: 创建一个 PUSH 套接...

    Axure RP 8.0.0.3274 Beta Update

    - Fixed push/pull for some Custom Shapes - Fixed moving some Custom Shapes - Fixed push/pull in Groups with hidden Widgets - Fixed push/pull for some flip animations - Fixed using resize animation ...

    应用笔记LAT1319+Push-Pull模式在全桥峰值电流控制中的应用

    ### 应用笔记LAT1319+Push-Pull模式在全桥峰值电流控制中的应用 #### 一、概述 本应用笔记旨在介绍如何利用Push-Pull模式解决全桥变换器在峰值电流控制中遇到的问题。具体而言,针对客户的需求——在全桥变换器的...

    为3KVA UPS 设计一个PUSH-PULL变压器

    PUSH-PULL变压器是一种双端驱动的电路结构,适用于高功率应用,能提供良好的功率输出和低失真。 首先,设计条件包括输入电压来源于8节12V电池,电压范围在80-110V之间。输出电压设定为±380V,而输出功率需求为2100...

    push-pull变换器缓冲电路设计

    详细介绍了推挽变换器(push-pull)开关元件mosfet漏极尖峰电压的产生原理、测量方法、缓冲电路设计步骤、元件的选型计算方法等。

    zeroMQ/jzmq java例子

    本篇文章将深入探讨ZeroMQ/jzmq在Java中的应用,通过分析提供的"req/rep"、"pub/sub"、"push/pull"模式及代理和多数据源的示例,来帮助初学者快速理解其工作原理。 1. **req/rep模式**: 在req/rep(请求/响应)...

    ADB支持中文pull/push

    然而,随着ADB的不断更新和完善,现在已经能够很好地支持中文`pull`和`push`操作,解决了中文路径和文件名的兼容性问题。 `pull`命令是ADB用来从Android设备上下载文件到电脑的,而`push`则是将电脑上的文件上传到...

    Push-Pull Transformer

    级联式推挽变换器,通过推挽变换器输入并联输出串联的方式实现低电压大电流输入,大电压小电流输出

    adb push pull中文支持解决方案

    然而,当涉及到处理包含中文字符的文件名时,`adb push`和`adb pull`命令可能会出现乱码问题,导致文件无法正常传输。这个问题主要源于ADB默认使用ASCII编码,而中文字符通常需要UTF-8或者其他多字节编码来表示。 ...

    crystal report push模式

    在 Crystal Reports 中,有两种主要的数据访问模式:Pull 模式和 Push 模式。本篇将详细讲解 Crystal Report 的 Push 模式及其相关知识点。 Push 模式,也称为数据推送模式,是一种由应用程序控制数据流的方式。在 ...

    水晶报表pull和push方法实现源代码

    在水晶报表中,"Pull"和"Push"方法是两种不同的数据获取策略,它们各自有着不同的特点和应用场景。 1. **Pull方法**: Pull方法也被称为“拉”模式,它是由水晶报表自身从数据源中检索数据。在这种模式下,水晶...

    Docker注册表的前端和授权工具Portus.zip

    它允许对你所有的镜像进行细颗粒度控制,你可以决定哪个用户和团队可 push/pull 镜像。轻松管理用户。 在 Portus 映射你的公司,可以定义任意数量的 team,并从 team 添加和移除用户。Team 有三种类型的用户:...

    pull与Push相关

    在实际应用中,很多软件往往结合使用Pull和Push两种技术。例如,电子邮件应用通常采用Pull策略来检查新邮件,但也可以通过Push通知用户有未读邮件。这样既可以节省资源,又能够提供及时的通知。 总的来说,Pull与...

    论文:实例对比push 和pull

    ### 论文知识点总结 ...综上所述,本论文通过对Pull与Push两种AJAX数据更新机制的深入研究,为开发者提供了有价值的参考依据,有助于他们根据实际需求选择最适合的技术方案,从而提升应用的整体性能和用户体验。

    Go-pushover-Go封装PushoverAPI

    《Go语言封装Pushover API详解》 在Go语言的开发过程中,我们经常需要与各种第三方服务进行交互,其中Pushover API是一个广泛使用的即时通知服务,它允许开发者将消息推送到用户的手机或桌面设备上。本篇文章将深入...

    Push or Pull ?.md

    Push or Pull ?.md

Global site tag (gtag.js) - Google Analytics