- 浏览: 470059 次
- 性别:
- 来自: 上海
文章分类
最新评论
-
kc_hxd_jp:
博主问个问题,这个篇幅下的python代码无法达到应有的作用, ...
zeroMQ初体验-14.命名机制 进阶 -
kobe1029:
Map<String, Object> args ...
rabbitmq 队列长度预设的曲线方案 -
Sasoritattoo:
LZ,这都13年了,抽空把这篇文章的下文给表完了吧,这一口气喘 ...
nginx + gridfs + mongodb 大事记(残) -
3GQQ2012:
引用前文已经说过,XREP其实用以平衡负载,所以这里由它对请求 ...
zeroMQ初体验-15.应答模式进阶(一)-数据的封装 -
iyuan:
ustclz 写道图片怎么显示不了了。。我这看是可以显示的。不 ...
zeroMQ初体验-1.简介及C/S模式
从经典到超越经典。
首先,先回顾下经典:
然后,扩展:
然后,变异:
client发出的数据结构:
路由处理成:
再转给worker成:
工人处理的数据:
由worker到client是一个逆序过程,不过因为两边都是REQ类型,所以其实是一致的。
[补]:
通常,上层的api会帮我们做一些事,免去了逐步封装数据的麻烦,比如在python中,最终代码会是这个样子:
(未完待续)
首先,先回顾下经典:
然后,扩展:
然后,变异:
import threading import time import zmq NBR_CLIENTS = 10 NBR_WORKERS = 3 def worker_thread(worker_url, context, i): """ Worker using REQ socket to do LRU routing """ socket = context.socket(zmq.REQ) identity = "Worker-%d" % (i) socket.setsockopt(zmq.IDENTITY, identity) #set worker identity socket.connect(worker_url) # Tell the borker we are ready for work socket.send("READY") try: while True: # python binding seems to eat empty frames address = socket.recv() request = socket.recv() print("%s: %s\n" %(identity, request)) socket.send(address, zmq.SNDMORE) socket.send("", zmq.SNDMORE) socket.send("OK") except zmq.ZMQError, zerr: # context terminated so quit silently if zerr.strerror == 'Context was terminated': return else: raise zerr def client_thread(client_url, context, i): """ Basic request-reply client using REQ socket """ socket = context.socket(zmq.REQ) identity = "Client-%d" % (i) socket.setsockopt(zmq.IDENTITY, identity) #Set client identity. Makes tracing easier socket.connect(client_url) # Send request, get reply socket.send("HELLO") reply = socket.recv() print("%s: %s\n" % (identity, reply)) return def main(): """ main method """ url_worker = "inproc://workers" url_client = "inproc://clients" client_nbr = NBR_CLIENTS # Prepare our context and sockets context = zmq.Context(1) frontend = context.socket(zmq.XREP) frontend.bind(url_client) backend = context.socket(zmq.XREP) backend.bind(url_worker) # create workers and clients threads for i in range(NBR_WORKERS): thread = threading.Thread(target=worker_thread, args=(url_worker, context, i, )) thread.start() for i in range(NBR_CLIENTS): thread_c = threading.Thread(target=client_thread, args=(url_client, context, i, )) thread_c.start() # Logic of LRU loop # - Poll backend always, frontend only if 1+ worker ready # - If worker replies, queue worker as ready and forward reply # to client if necessary # - If client requests, pop next worker and send request to it # Queue of available workers available_workers = 0 workers_list = [] # init poller poller = zmq.Poller() # Always poll for worker activity on backend poller.register(backend, zmq.POLLIN) # Poll front-end only if we have available workers poller.register(frontend, zmq.POLLIN) while True: socks = dict(poller.poll()) # Handle worker activity on backend if (backend in socks and socks[backend] == zmq.POLLIN): # Queue worker address for LRU routing worker_addr = backend.recv() assert available_workers < NBR_WORKERS # add worker back to the list of workers available_workers += 1 workers_list.append(worker_addr) # Second frame is empty empty = backend.recv() assert empty == "" # Third frame is READY or else a client reply address client_addr = backend.recv() # If client reply, send rest back to frontend if client_addr != "READY": # Following frame is empty empty = backend.recv() assert empty == "" reply = backend.recv() frontend.send(client_addr, zmq.SNDMORE) frontend.send("", zmq.SNDMORE) frontend.send(reply) client_nbr -= 1 if client_nbr == 0: break # Exit after N messages # poll on frontend only if workers are available if available_workers > 0: if (frontend in socks and socks[frontend] == zmq.POLLIN): # Now get next client request, route to LRU worker # Client request is [address][empty][request] client_addr = frontend.recv() empty = frontend.recv() assert empty == "" request = frontend.recv() # Dequeue and drop the next worker address available_workers -= 1 worker_id = workers_list.pop() backend.send(worker_id, zmq.SNDMORE) backend.send("", zmq.SNDMORE) backend.send(client_addr, zmq.SNDMORE) backend.send(request) #out of infinite loop: do some housekeeping time.sleep (1) frontend.close() backend.close() context.term() if name == "main": main()
client发出的数据结构:
路由处理成:
再转给worker成:
工人处理的数据:
由worker到client是一个逆序过程,不过因为两边都是REQ类型,所以其实是一致的。
[补]:
通常,上层的api会帮我们做一些事,免去了逐步封装数据的麻烦,比如在python中,最终代码会是这个样子:
import threading import time import zmq NBR_CLIENTS = 10 NBR_WORKERS = 3 def worker_thread(worker_url, context, i): """ Worker using REQ socket to do LRU routing """ socket = context.socket(zmq.REQ) identity = "Worker-%d" % (i) socket.setsockopt(zmq.IDENTITY, identity) #set worker identity socket.connect(worker_url) # Tell the borker we are ready for work socket.send("READY") try: while True: [address, request] = socket.recv_multipart() print("%s: %s\n" %(identity, request)) socket.send_multipart([address, "", "OK"]) except zmq.ZMQError, zerr: # context terminated so quit silently if zerr.strerror == 'Context was terminated': return else: raise zerr def client_thread(client_url, context, i): """ Basic request-reply client using REQ socket """ socket = context.socket(zmq.REQ) identity = "Client-%d" % (i) socket.setsockopt(zmq.IDENTITY, identity) #Set client identity. Makes tracing easier socket.connect(client_url) # Send request, get reply socket.send("HELLO") reply = socket.recv() print("%s: %s\n" % (identity, reply)) return def main(): """ main method """ url_worker = "inproc://workers" url_client = "inproc://clients" client_nbr = NBR_CLIENTS # Prepare our context and sockets context = zmq.Context(1) frontend = context.socket(zmq.XREP) frontend.bind(url_client) backend = context.socket(zmq.XREP) backend.bind(url_worker) # create workers and clients threads for i in range(NBR_WORKERS): thread = threading.Thread(target=worker_thread, args=(url_worker, context, i, )) thread.start() for i in range(NBR_CLIENTS): thread_c = threading.Thread(target=client_thread, args=(url_client, context, i, )) thread_c.start() # Logic of LRU loop # - Poll backend always, frontend only if 1+ worker ready # - If worker replies, queue worker as ready and forward reply # to client if necessary # - If client requests, pop next worker and send request to it # Queue of available workers available_workers = 0 workers_list = [] # init poller poller = zmq.Poller() # Always poll for worker activity on backend poller.register(backend, zmq.POLLIN) # Poll front-end only if we have available workers poller.register(frontend, zmq.POLLIN) while True: socks = dict(poller.poll()) # Handle worker activity on backend if (backend in socks and socks[backend] == zmq.POLLIN): # Queue worker address for LRU routing message = backend.recv_multipart() assert available_workers < NBR_WORKERS worker_addr = message[0] # add worker back to the list of workers available_workers += 1 workers_list.append(worker_addr) # Second frame is empty empty = message[1] assert empty == "" # Third frame is READY or else a client reply address client_addr = message[2] # If client reply, send rest back to frontend if client_addr != "READY": # Following frame is empty empty = message[3] assert empty == "" reply = message[4] frontend.send_multipart([client_addr, "", reply]) client_nbr -= 1 if client_nbr == 0: break # Exit after N messages # poll on frontend only if workers are available if available_workers > 0: if (frontend in socks and socks[frontend] == zmq.POLLIN): # Now get next client request, route to LRU worker # Client request is [address][empty][request] [client_addr, empty, request ] = frontend.recv_multipart() assert empty == "" # Dequeue and drop the next worker address available_workers -= 1 worker_id = workers_list.pop() backend.send_multipart([worker_id, "", client_addr, request]) #out of infinite loop: do some housekeeping time.sleep (1) frontend.close() backend.close() context.term() if name == "main": main()
(未完待续)
发表评论
-
IM选型(初)
2016-08-23 19:12 1636主要参考文章: https://r ... -
关于python和rabbitmq的那点事儿
2011-10-19 14:15 7960rabbitmq是一个消息中间件,在之前的zmq介绍中有略带提 ... -
zeroMQ初体验-34.发布/订阅模式进阶-克隆模式-下,结言
2011-05-26 16:09 4177服务器: // // Clone server Mod ... -
zeroMQ初体验-33.发布/订阅模式进阶-克隆模式-中
2011-05-26 15:37 2924临时缓存 现实中,比如 ... -
zeroMQ初体验-32.发布/订阅模式进阶-克隆模式-上
2011-05-26 15:04 3652在发布/订阅模式中,特别是现实应用中,总会因为这样那样的问题导 ... -
zeroMQ初体验-31.发布/订阅模式进阶-黑盒的高速订阅者
2011-05-25 16:55 2754作为发布/订阅模式的一个常用场景,大数据量的组播是有必要的。虽 ... -
zeroMQ初体验-30.发布/订阅模式进阶-自裁的蜗牛订阅者
2011-05-25 16:24 4546在初次介绍发布/订阅模式的时候,就已经抖出了这个包袱:如果订阅 ... -
zeroMQ初体验-29.可靠性-自由模式
2011-05-24 17:02 5399好吧,本以为这可能是一个更靠谱的模式,谁知(其实是我一厢情愿了 ... -
zeroMQ初体验-28.可靠性-主从模式
2011-05-23 14:47 5528虽然"硬盘模式" ... -
zeroMQ初体验-27.可靠性-硬盘模式
2011-05-23 13:44 3792在之前的种种模式中, ... -
zeroMQ初体验-26.可靠性-管家模式
2011-05-12 19:05 5649上一节末尾有说到协议,zeromq自然做了充沛的封装,&quo ... -
zeroMQ初体验-26.可靠性-管家模式
2011-05-12 19:03 1上一节末尾有说到协议,zeromq自然做了充沛的封装,&quo ... -
zeroMQ初体验-25.可靠性-偏执的海盗模式
2011-05-05 19:05 3574虽然说“简单的海盗模 ... -
zeroMQ初体验-24.可靠性-简单的海盗模式
2011-05-05 16:41 3211相较于“懒惰的”做了 ... -
zeroMQ初体验-23.可靠性-懒惰的海盗模式
2011-05-05 16:15 5062相较于通常的阻塞模式,这里只是做了一点简单的动作来加强系统的可 ... -
zeroMQ初体验-22.可靠性-总览
2011-04-26 19:25 5932在开篇就从曾对zeromq的可靠性做过质疑,不过,作为一个雄心 ... -
rabbitmq 队列长度预设的曲线方案
2011-04-21 14:36 3380zeromq中倒是直接支持这个功能的。 类似于设定队列长度或 ... -
zeroMQ初体验-21.应答模式进阶(七)-云计算
2011-04-18 19:14 3531这里给出了一个最近很火的"云计算"案例。 ... -
zeroMQ初体验-20.应答模式进阶(六)-多对多路由模式
2011-04-18 17:22 3874某些时候,为了冗余的需要,可能会有这样的需求: impo ... -
zeroMQ初体验-19.应答模式进阶(五)-异步式应答
2011-04-15 15:23 4839恩,这应该算是比较实 ...
相关推荐
zeromq-2.1.7 是 ZeroMQ 的一个较早版本,ZeroMQ 是一个开源的消息中间件,它提供了一种高效、灵活的框架来构建分布式应用。在Linux环境中,ZeroMQ通过消息队列机制实现了进程间的通信,使得数据可以在不同程序之间...
这里提供的四个压缩包文件,Python-2.6.6.tar.bz2、zeromq-3.2.5.tar.gz、jzmq.tar.gz以及storm-0.8.0.zip,都是与Storm搭建和运行相关的资源。 首先,我们来详细了解一下每个文件的作用: 1. **Python-2.6.6.tar....
zeromq-4.0.3.tar.gz 是一个包含了 ZeroMQ 4.0.3 版本源代码的压缩文件。ZeroMQ,也被称为“零消息队列”或“0MQ”,是一个开源的消息中间件,它提供了一种高效、灵活且可扩展的方式来处理分布式系统中的数据通信。...
3. 进入解压后的目录:`cd zeromq-4.1.8` 4. 使用autotools构建系统配置:`./configure` 5. 编译源代码:`make` 6. 安装到系统目录:`sudo make install` 使用zeromq: ZeroMQ提供了多种语言的绑定,如C++、Python...
zeromq-4.2.3.tar.gz 是ZeroMQ 4.2.3版本的源代码包,这个稳定版本确保了良好的兼容性和可靠性。 首先,让我们深入了解ZeroMQ的核心概念和功能: 1. **套接字(Sockets)**:在ZeroMQ中,套接字不仅仅是传统网络...
`zeromq-2.1.9.tar.gz` 是zeromq的一个特定版本,即2.1.9版,通常以源码形式提供,需要通过编译来安装。 首先,让我们深入了解zeromq的核心概念。zeromq设计了一个灵活的套接字模型,它允许开发者构建复杂的网络...
这个“zeromq-4.3.4.tar.gz”文件是0MQ库的4.3.4稳定版本,发布于2021年1月17日。下面我们将深入探讨0MQ的核心特性、主要功能以及如何使用这一版本。 1. **0MQ简介** - 0MQ不是一个传统的消息队列系统,而是一种在...
zeromq-4.1.3 是一个针对 ZeroMQ 的特定版本的开发工具包,ZeroMQ 是一个开源的消息中间件库,它为分布式计算提供了一种高性能、轻量级的通信框架。ZeroMQ 提供了多种编程语言的绑定,使得开发者能够方便地在不同的...
官方离线安装包,测试可用。使用rpm -ivh [rpm完整包名] 进行安装
3. **模式丰富**:支持多种消息模式,如发布/订阅、请求/应答、推送/拉取和一对多等,这些模式可以灵活地组合以适应不同的应用场景。 4. **高可用性**:具有消息持久化和负载均衡功能,确保在故障情况下仍能保持...
zeromq的设计基于发布/订阅(Pub/Sub)、请求/应答(Req/Rep)和推送/拉取(Push/Pull)等经典的消息模式。这些模式覆盖了各种常见的分布式系统通信场景,如服务间调用、事件驱动架构、工作队列等。通过这些模式,...
标题中的"zeromq-4.2.0.tar.zip"是指ZeroMQ库的4.2.0版本,它被封装在一个ZIP压缩包中,而内部包含的文件是tar归档格式。ZeroMQ是一个开源的消息中间件,它提供了一个高级的消息队列模型,允许应用程序之间进行高效...
ZeroMQ是一个网络通讯库,其主要用来为分布式应用程序开发提供进程间通信(此处的进程既可以是同一台机器上的两个进程也可以是不同机器上的两个进程)。ZeroMQ的特点在于灵活的通信手段和丰富的连接模型,并且它可以...
ZeroMQ是一个网络通讯库,其主要用来为分布式应用程序开发提供进程间通信(此处的进程既可以是同一台机器上的两个进程也可以是不同机器上的两个进程)。ZeroMQ的特点在于灵活的通信手段和丰富的连接模型,并且它可以...
ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。
在Linux环境下,zeromq-4.3.2.tar.gz可以通过编译源代码进行安装。通常,这涉及到解压、配置、编译和安装几个步骤。用户需要确保系统上已经安装了必要的构建工具,如GCC、make和必要的依赖库。安装过程可能如下: `...
综上所述,“zeromq-4.1.0-rc1.zip”压缩包中的源代码提供了ZeroMQ的一个关键版本,开发者可以深入了解其内部机制,根据项目需求进行定制或扩展。对于想要构建分布式系统,特别是对性能和灵活性有高要求的项目来说,...
zeromq-3.12.5.tar.gz, libzmq-3.1.2.tar.gz 在Linux环境中,构建和部署分布式计算系统时,Storm是一个常用的选择,它是一个开源的流处理框架,用于实时数据处理。这个压缩包"zeromq-3.12.5.zip"包含了与Storm集群...
这个“zeromq-4.0.1.tar.gz”文件是ZeroMQ的4.0.1版本源代码包,适用于那些需要在网络通信、并发处理或构建微服务架构的开发者。由于从官方网站下载可能速度较慢,此压缩包提供了方便的下载渠道。 ZeroMQ的核心特性...
VS2015 在Widows 10 上编译的 ZeroMQ 4.3.2,JZMQ 3.1 CZMQ 4.2,可以在 JDK 1.8 下运行。DLL 都是 64位,包含了编译及运行相关信息。分享一下,也给自己留个备份