- 浏览: 474416 次
- 性别:
- 来自: 上海
-
最新评论
-
kc_hxd_jp:
博主问个问题,这个篇幅下的python代码无法达到应有的作用, ...
zeroMQ初体验-14.命名机制 进阶 -
kobe1029:
Map<String, Object> args ...
rabbitmq 队列长度预设的曲线方案 -
Sasoritattoo:
LZ,这都13年了,抽空把这篇文章的下文给表完了吧,这一口气喘 ...
nginx + gridfs + mongodb 大事记(残) -
3GQQ2012:
引用前文已经说过,XREP其实用以平衡负载,所以这里由它对请求 ...
zeroMQ初体验-15.应答模式进阶(一)-数据的封装 -
iyuan:
ustclz 写道图片怎么显示不了了。。我这看是可以显示的。不 ...
zeroMQ初体验-1.简介及C/S模式
文章列表
恩,这应该算是比较实用的部分了。
模式图:
import zmq
import threading
import time
from random import choice
class ClientTask(threading.Thread):
"""ClientTask"""
def init(self):
...
神马awk,head,sed都是浮云,直接磁盘操作啊。。
:|dd of=fname seek=1 bs=$(($(stat -c%s fname)-$(tail -2 fname|wc -c)))
有木有!!
从经典到超越经典。
首先,先回顾下经典:
然后,扩展:
然后,变异:
import threading
import time
import zmq
NBR_CLIENTS = 10
NBR_WORKERS = 3
def worker_thread(worker_url, context, i):
""" Worker using R ...
XREP-REQ模式:
典型的"老妈模式",只有当她真的要听你说时,她才能听的进去。所以首先,得要REQ告诉你“她准备好了,你可以讲了”,然后,你才能倾吐...
一般来说与XREQ一样,一个REQ只能连接一个XREP(除非你想做容错,不过,不建议那样)。
实例模型:
import time
import random
from threading import Thread
import zmq
import zhelpers
NBR_WORKERS = 10
def worker_thread(context):
worker ...
在上一节中已经提到XREP主要工作是包装数据,打上标记以便方便的传递数据。那么,换个角度来看,这不就是路由么!其实在优雅的扩展中有介绍过。在这里针对XREP模式做深入的探索。
首先,得要理一下其中几种类型的差别(相似的名字真是坑爹啊):
REQ,官网称之为"老妈类型",因为它负责主动提出请求,并且要求得到答复(严格同步的)
REP,"老爸类型",负责应答请求,(从不主动,也是严格同步的)
XREQ,"分销类型",负责对进出的数据排序,均匀的分发给接入的REP或者XREP
XREP,"路由类型",将信息转发至任何与他 ...
整整一大章全部讲的应答模式的进阶,应该很重要吧(简直是一定的)。
上一节讲到了发布/订阅模式 关于封装的话题,在应答模式中也是如此,不过这个动作已经被底层(zeromq)接管,对应用透明。而其中普通模式与X模式又有区别,例如:req连接Xrep:
说明:
第三部分是实际发送的数据
第二部分是REQ向XREP发送请求时底层附加的
第一部分是XREP自身地址
注意:
前文已经说过,XREP其实用以平衡负载,所以这里由它对请求数据做了封装操作,如果通过多个XREP,数据结构就会变成这个样子:
同时,如果没有启用命名机制,XREP会自动赋予临时名字:
不然,就是这样了:
这里给出一个验证代码: ...
前文曾提到过命名机制,事实上它是一把双刃剑。在能够持有数据等待重新连接的时候,也增加了持有数据方的负担(危险),特别是在"发布/订阅"模式下,可谓牵一发而动全身。
这里先给出一组示例,在代码的运行过程中,通过重启消费者来观察发布者的进程状态。
发布端:
import zmq
import time
context = zmq.Context()
sync = context.socket(zmq.PULL)
sync.bind("tcp://*:5564")
publisher = context.socket(zmq.PUB ...
前面章节有介绍过当传输大数据时,建议分拆成多个小数据逐个发送,以防单条数据过大引发内存溢出等问题。同样的,这也适用于 发布订阅模式,这里用到了一个新名词:信封。
这种封装的数据结构看起来是这样的:
由于k ...
可能绝大多数接触zeromq的人都会对其去中心的自由感到满意,同时却又对数据传输的可靠性产生怀疑甚至沮丧(如果恰巧你也知道"兔子"的话)。
在这里,或许可以为此作出一些弥补,增强诸位使用它的信心。
zeromq之所以传输的速度无以伦比,它的"zero copy"功不可没,在这种机制下,减少了数据的二次缓存和挪动,并且减少了通讯间的应答式回应。不过在快速的同时,也降低了数据传递的可靠性。而打开copy机制,则在牺牲一定速度的代价下提升了其稳定性。
除了zero-copy机制外,zeromq还提供了一种命名机制,用以建立所谓的"Durable ...
上一篇讲到了线程间的协作,通过zeroMQ的pair模式可以很优雅的实现。而在各节点间(进程级),则适用度不高(虽然也能用)。这里给出了两个理由:
1.节点间是可以调节的,而线程间不是(线程是稳定的),pair模式是非自动连接的.
2.线程数是固定的,可预估的。而节点则是变动、不可预估的。
由此得出结论:pair适用于稳定、可控的环境。
所以,有了本章节。不知诸位还记得前面所讲的发布/订阅模式,在那里曾说过这种模式是不太稳定的(主要是指初始阶段),容易在连接未建立前就发布、废弃部分数据。在这里,通过节点间的协作来解决那个难题。
模型图:
发布端:
import zmq
SUBS ...
"或许,ZeroMQ是最好的多线程运行环境!"官网如是说。
其实它想要支持的是那种类似erlang信号模式。传统多线程总会伴随各种"锁"出现各种稀奇古怪的问题。而zeroMQ的多线程致力于"去锁化",简单来说,一 ...
前面所谈到的网络拓扑结构都是这样的:
而在实际的应用中,绝大多数会出现这样的结构要求:
zeroMQ中自然也提供了这样的需求案例:
1.发布/订阅 代理模式:
import zmq
context = zmq.Context()
frontend = context.socket(zmq.SUB)
frontend.connect("tcp://192.168.55.210:5556")
backend = context.socket(zmq.PUB)
backend.bind("tcp://10.1.1.0:8100" ...
写过"永不停歇"的代码的兄弟应该都或多或少遇到或考虑到内存溢出之类的问题,那么,在ZeroMQ的应用中,又如何处理如是情况?
文中给出了类C这种需要自行管理内存的解决方案(虽然python的GC很强大,不过,关注下总没有坏处):
这里运用到了这个工具:valgrind
为了避免zeromq中的一些warning的干扰,首先需要重新build下zermq
$ cd zeromq
$ export CPPFLAGS=-DZMQ_MAKE_VALGRIND_HAPPY
$ ./configure
$ make clean; make
$ sudo make install
...
关掉一个进程有很多种方式,而在ZeroMQ中则推崇通过使用信号通知,可控的卸载、关闭进程。在这里,要援引之前的"分而治之"例子(具体可以见这里)。
例图:
显然,信号发送是由能够掌握整个进度的"水槽"(下游)来控制,在原有基础上做少许变更即可。
Worker(数据处理):
import sys
import time
import zmq
context = zmq.Context()
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localh ...
之前已经讲过,zeroMQ是可以多对多的,但需要成对匹配才行,即多个发布端都是同一种模式,而这里要涉及到的是,多个发布端模式不统一的情况。
文中先给出了一个比较"脏"的处理方式:
import zmq
import time
context = zmq.Context()
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")
subscriber = context.socket(zmq.SUB)
subscriber.conn ...