- 浏览: 471881 次
- 性别:
- 来自: 上海
文章分类
最新评论
-
kc_hxd_jp:
博主问个问题,这个篇幅下的python代码无法达到应有的作用, ...
zeroMQ初体验-14.命名机制 进阶 -
kobe1029:
Map<String, Object> args ...
rabbitmq 队列长度预设的曲线方案 -
Sasoritattoo:
LZ,这都13年了,抽空把这篇文章的下文给表完了吧,这一口气喘 ...
nginx + gridfs + mongodb 大事记(残) -
3GQQ2012:
引用前文已经说过,XREP其实用以平衡负载,所以这里由它对请求 ...
zeroMQ初体验-15.应答模式进阶(一)-数据的封装 -
iyuan:
ustclz 写道图片怎么显示不了了。。我这看是可以显示的。不 ...
zeroMQ初体验-1.简介及C/S模式
虽然说“简单的海盗模式”已经非常靠谱了,不过瑕疵还是有不少的。比如说,中间件队列并不监控后端的worker死活,至少会有一次丢包来确定那个worker已经不在了(虽然问题不大,但终究不爽)。而在“偏执的”模式中,有对“简单”模式做了一些扩展:
Queue:
worker:
注意:这里的是lua代码
其实从模式图中已经可以看出,系统中多了“心跳”环节,来确认链路的可用性。
关于心跳模块,着实比较棘手,也算是代码中的重头了。关于做“心跳”的策略,关键是要把握好时间间隔,以避免过载或者失效。通常的,也不建议在持久化的连接上加入心跳机制。
这里应当注意到,“偏执”模式与“简单”模式并不兼容--因为心跳机制。
为了避免混乱。 rfc.zeromq.org这儿有一些协议的声明,帮助你至少不需要去看现有的代码来确定是否兼容新的东东~
(未完待续)
Queue:
require"zmq" require"zmq.poller" require"zmsg" local MAX_WORKERS = 100 local HEARTBEAT_LIVENESS = 3 -- 3-5 is reasonable local HEARTBEAT_INTERVAL = 1000 -- msecs local tremove = table.remove -- Insert worker at end of queue, reset expiry -- Worker must not already be in queue local function s_worker_append(queue, identity) if queue[identity] then printf ("E: duplicate worker identity %s", identity) else assert (#queue < MAX_WORKERS) queue[identity] = s_clock() + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS queue[#queue + 1] = identity end end -- Remove worker from queue, if present local function s_worker_delete(queue, identity) for i=1,#queue do if queue == identity then tremove(queue, i) break end end queue[identity] = nil end -- Reset worker expiry, worker must be present local function s_worker_refresh(queue, identity) if queue[identity] then queue[identity] = s_clock() + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS else printf("E: worker %s not ready\n", identity) end end -- Pop next available worker off queue, return identity local function s_worker_dequeue(queue) assert (#queue > 0) local identity = tremove(queue, 1) queue[identity] = nil return identity end -- Look for & kill expired workers local function s_queue_purge(queue) local curr_clock = s_clock() -- Work backwards from end to simplify removal for i=#queue,1,-1 do local id = queue if (curr_clock > queue[id]) then tremove(queue, i) queue[id] = nil end end end s_version_assert (2, 1) -- Prepare our context and sockets local context = zmq.init(1) local frontend = context:socket(zmq.XREP) local backend = context:socket(zmq.XREP) frontend:bind("tcp://*:5555"); -- For clients backend:bind("tcp://*:5556"); -- For workers -- Queue of available workers local queue = {} local is_accepting = false -- Send out heartbeats at regular intervals local heartbeat_at = s_clock() + HEARTBEAT_INTERVAL local poller = zmq.poller(2) local function frontend_cb() -- Now get next client request, route to next worker local msg = zmsg.recv(frontend) local identity = s_worker_dequeue (queue) msg:push(identity) msg:send(backend) if (#queue == 0) then -- stop accepting work from clients, when no workers are available. poller:remove(frontend) is_accepting = false end end -- Handle worker activity on backend poller:add(backend, zmq.POLLIN, function() local msg = zmsg.recv(backend) local identity = msg:unwrap() -- Return reply to client if it's not a control message if (msg:parts() == 1) then if (msg:address() == "READY") then s_worker_delete(queue, identity) s_worker_append(queue, identity) elseif (msg:address() == "HEARTBEAT") then s_worker_refresh(queue, identity) else printf("E: invalid message from %s\n", identity) msg:dump() end else -- reply for client. msg:send(frontend) s_worker_append(queue, identity) end -- start accepting client requests, if we are not already doing so. if not is_accepting and #queue > 0 then is_accepting = true poller:add(frontend, zmq.POLLIN, frontend_cb) end end) -- start poller's event loop while true do local cnt = assert(poller:poll(HEARTBEAT_INTERVAL * 1000)) -- Send heartbeats to idle workers if it's time if (s_clock() > heartbeat_at) then for i=1,#queue do local msg = zmsg.new("HEARTBEAT") msg:wrap(queue, nil) msg:send(backend) end heartbeat_at = s_clock() + HEARTBEAT_INTERVAL end s_queue_purge(queue) end -- We never exit the main loop -- But pretend to do the right shutdown anyhow while (#queue > [[span style="color:#666666"]]0) [[span style="color:#008000"]]do s_worker_dequeue(queue) [[span style="color:#008000"]]end frontend:close() backend:close()
worker:
require"zmq" require"zmq.poller" require"zmsg" local HEARTBEAT_LIVENESS = 3 -- 3-5 is reasonable local HEARTBEAT_INTERVAL = 1000 -- msecs local INTERVAL_INIT = 1000 -- Initial reconnect local INTERVAL_MAX = 32000 -- After exponential backoff -- Helper function that returns a new configured socket -- connected to the Hello World server -- local identity local function s_worker_socket (context) local worker = context:socket(zmq.XREQ) -- Set random identity to make tracing easier identity = string.format("%04X-%04X", randof (0x10000), randof (0x10000)) worker:setopt(zmq.IDENTITY, identity) worker:connect("tcp://localhost:5556") -- Configure socket to not wait at close time worker:setopt(zmq.LINGER, 0) -- Tell queue we're ready for work printf("I: (%s) worker ready\n", identity) worker:send("READY") return worker end s_version_assert (2, 1) math.randomseed(os.time()) local context = zmq.init(1) local worker = s_worker_socket (context) -- If liveness hits zero, queue is considered disconnected local liveness = HEARTBEAT_LIVENESS local interval = INTERVAL_INIT -- Send out heartbeats at regular intervals local heartbeat_at = s_clock () + HEARTBEAT_INTERVAL local poller = zmq.poller(1) local is_running = true local cycles = 0 local function worker_cb() -- Get message -- - 3-part envelope + content -> request -- - 1-part "HEARTBEAT" -> heartbeat local msg = zmsg.recv (worker) if (msg:parts() == 3) then -- Simulate various problems, after a few cycles cycles = cycles + 1 if (cycles > 3 and randof (5) == 0) then printf ("I: (%s) simulating a crash\n", identity) is_running = false return elseif (cycles > 3 and randof (5) == 0) then printf ("I: (%s) simulating CPU overload\n", identity) s_sleep (5000) end printf ("I: (%s) normal reply - %s\n", identity, msg:body()) msg:send(worker) liveness = HEARTBEAT_LIVENESS s_sleep(1000); -- Do some heavy work elseif (msg:parts() == 1 and msg:body() == "HEARTBEAT") then liveness = HEARTBEAT_LIVENESS else printf ("E: (%s) invalid message\n", identity) msg:dump() end interval = INTERVAL_INIT end poller:add(worker, zmq.POLLIN, worker_cb) while is_running do local cnt = assert(poller:poll(HEARTBEAT_INTERVAL * 1000)) if (cnt == 0) then liveness = liveness - 1 if (liveness == 0) then printf ("W: (%s) heartbeat failure, can't reach queue\n", identity) printf ("W: (%s) reconnecting in %d msec…\n", identity, interval) s_sleep (interval) if (interval < INTERVAL_MAX) then interval = interval * 2 end poller:remove(worker) worker:close() worker = s_worker_socket (context) poller:add(worker, zmq.POLLIN, worker_cb) liveness = HEARTBEAT_LIVENESS end end -- Send heartbeat to queue if it's time if (s_clock () > heartbeat_at) then heartbeat_at = s_clock () + HEARTBEAT_INTERVAL printf("I: (%s) worker heartbeat\n", identity) worker:send("HEARTBEAT") end end worker:close() context:term()
注意:这里的是lua代码
其实从模式图中已经可以看出,系统中多了“心跳”环节,来确认链路的可用性。
关于心跳模块,着实比较棘手,也算是代码中的重头了。关于做“心跳”的策略,关键是要把握好时间间隔,以避免过载或者失效。通常的,也不建议在持久化的连接上加入心跳机制。
这里应当注意到,“偏执”模式与“简单”模式并不兼容--因为心跳机制。
为了避免混乱。 rfc.zeromq.org这儿有一些协议的声明,帮助你至少不需要去看现有的代码来确定是否兼容新的东东~
(未完待续)
发表评论
-
IM选型(初)
2016-08-23 19:12 1643主要参考文章: https://r ... -
关于python和rabbitmq的那点事儿
2011-10-19 14:15 7963rabbitmq是一个消息中间件,在之前的zmq介绍中有略带提 ... -
zeroMQ初体验-34.发布/订阅模式进阶-克隆模式-下,结言
2011-05-26 16:09 4198服务器: // // Clone server Mod ... -
zeroMQ初体验-33.发布/订阅模式进阶-克隆模式-中
2011-05-26 15:37 2930临时缓存 现实中,比如 ... -
zeroMQ初体验-32.发布/订阅模式进阶-克隆模式-上
2011-05-26 15:04 3659在发布/订阅模式中,特别是现实应用中,总会因为这样那样的问题导 ... -
zeroMQ初体验-31.发布/订阅模式进阶-黑盒的高速订阅者
2011-05-25 16:55 2756作为发布/订阅模式的一个常用场景,大数据量的组播是有必要的。虽 ... -
zeroMQ初体验-30.发布/订阅模式进阶-自裁的蜗牛订阅者
2011-05-25 16:24 4549在初次介绍发布/订阅模式的时候,就已经抖出了这个包袱:如果订阅 ... -
zeroMQ初体验-29.可靠性-自由模式
2011-05-24 17:02 5407好吧,本以为这可能是一个更靠谱的模式,谁知(其实是我一厢情愿了 ... -
zeroMQ初体验-28.可靠性-主从模式
2011-05-23 14:47 5540虽然"硬盘模式" ... -
zeroMQ初体验-27.可靠性-硬盘模式
2011-05-23 13:44 3795在之前的种种模式中, ... -
zeroMQ初体验-26.可靠性-管家模式
2011-05-12 19:05 5654上一节末尾有说到协议,zeromq自然做了充沛的封装,&quo ... -
zeroMQ初体验-26.可靠性-管家模式
2011-05-12 19:03 1上一节末尾有说到协议,zeromq自然做了充沛的封装,&quo ... -
zeroMQ初体验-24.可靠性-简单的海盗模式
2011-05-05 16:41 3216相较于“懒惰的”做了 ... -
zeroMQ初体验-23.可靠性-懒惰的海盗模式
2011-05-05 16:15 5067相较于通常的阻塞模式,这里只是做了一点简单的动作来加强系统的可 ... -
zeroMQ初体验-22.可靠性-总览
2011-04-26 19:25 5938在开篇就从曾对zeromq的可靠性做过质疑,不过,作为一个雄心 ... -
rabbitmq 队列长度预设的曲线方案
2011-04-21 14:36 3401zeromq中倒是直接支持这个功能的。 类似于设定队列长度或 ... -
zeroMQ初体验-21.应答模式进阶(七)-云计算
2011-04-18 19:14 3536这里给出了一个最近很火的"云计算"案例。 ... -
zeroMQ初体验-20.应答模式进阶(六)-多对多路由模式
2011-04-18 17:22 3881某些时候,为了冗余的需要,可能会有这样的需求: impo ... -
zeroMQ初体验-19.应答模式进阶(五)-异步式应答
2011-04-15 15:23 4847恩,这应该算是比较实 ... -
zeroMQ初体验-18.应答模式进阶(四)-定制路由3
2011-04-02 15:39 5187从经典到超越经典。 首 ...
相关推荐
总的来说,zeromq-2.1.7提供了在Linux环境下进行高效、可靠的消息传递功能,是构建大规模分布式系统的一个重要工具。虽然这是一个较旧的版本,但在某些特定场景下,旧版本可能更稳定,更适合于已知的工作负载和环境...
这里提供的四个压缩包文件,Python-2.6.6.tar.bz2、zeromq-3.2.5.tar.gz、jzmq.tar.gz以及storm-0.8.0.zip,都是与Storm搭建和运行相关的资源。 首先,我们来详细了解一下每个文件的作用: 1. **Python-2.6.6.tar....
zeromq-4.0.3.tar.gz 是一个包含了 ZeroMQ 4.0.3 版本源代码的压缩文件。ZeroMQ,也被称为“零消息队列”或“0MQ”,是一个开源的消息中间件,它提供了一种高效、灵活且可扩展的方式来处理分布式系统中的数据通信。...
zeromq-4.2.3.tar.gz 是ZeroMQ 4.2.3版本的源代码包,这个稳定版本确保了良好的兼容性和可靠性。 首先,让我们深入了解ZeroMQ的核心概念和功能: 1. **套接字(Sockets)**:在ZeroMQ中,套接字不仅仅是传统网络...
标题中的"zeromq-4.1.8.tar.gz"指的是ZeroMQ的4.1.8版本的源代码包,通常以tar.gz格式压缩,这是一种在Linux和类Unix系统中常见的归档和压缩方式。 zeromq的核心特性包括点对点连接、发布/订阅模式、请求/响应模式...
`zeromq-2.1.9.tar.gz` 是zeromq的一个特定版本,即2.1.9版,通常以源码形式提供,需要通过编译来安装。 首先,让我们深入了解zeromq的核心概念。zeromq设计了一个灵活的套接字模型,它允许开发者构建复杂的网络...
官方离线安装包,测试可用。使用rpm -ivh [rpm完整包名] 进行安装
zeromq-4.1.3 是一个针对 ZeroMQ 的特定版本的开发工具包,ZeroMQ 是一个开源的消息中间件库,它为分布式计算提供了一种高性能、轻量级的通信框架。ZeroMQ 提供了多种编程语言的绑定,使得开发者能够方便地在不同的...
0MQ(也称为 ZeroMQ 或 ØMQ)是一个开源的消息中间件,它提供了一种轻量级、高性能的异步消息...通过使用“zeromq-4.3.4.tar.gz”,你可以享受到这个版本带来的稳定性和优化,从而更高效地实现跨进程、跨网络的通信。
- **错误修复**:修复了前一版本中已知的bug,增强了系统的稳定性和可靠性。 - **新功能**:可能添加了新的API或功能,以满足更广泛的需求。 - **安全性增强**:可能加强了安全措施,比如加密传输或身份验证机制的...
在zeromq-4.2.0源码包中,你可以找到以下主要组成部分: 1. **源代码**:包含了zeromq的核心库和各种语言的绑定。核心库通常用C++编写,提供了跨平台的API,而绑定则允许开发者使用Python、Java、C#等其他语言与...
标题中的"zeromq-4.2.0.tar.zip"是指ZeroMQ库的4.2.0版本,它被封装在一个ZIP压缩包中,而内部包含的文件是tar归档格式。ZeroMQ是一个开源的消息中间件,它提供了一个高级的消息队列模型,允许应用程序之间进行高效...
ZeroMQ是一个网络通讯库,其主要用来为分布式应用程序开发提供进程间通信(此处的进程既可以是同一台机器上的两个进程也可以是不同机器上的两个进程)。ZeroMQ的特点在于灵活的通信手段和丰富的连接模型,并且它可以...
ZeroMQ是一个网络通讯库,其主要用来为分布式应用程序开发提供进程间通信(此处的进程既可以是同一台机器上的两个进程也可以是不同机器上的两个进程)。ZeroMQ的特点在于灵活的通信手段和丰富的连接模型,并且它可以...
ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。
总的来说,zeromq-4.3.2是一个强大且可靠的工具,它简化了网络编程,提高了应用的性能和效率。对于需要进行高效、灵活消息传递的开发者来说,它是一个理想的选择。通过合理利用其特性,开发者可以构建出高并发、可...
这个“zeromq-4.1.0-rc1.zip”压缩包包含了ZeroMQ 4.1.0版本的源代码,这是一个预发布版本(Release Candidate),意味着它是正式版发布前的最后一个测试版本。 ZeroMQ的核心概念是提供一种抽象的网络通信层,允许...
zeromq-3.12.5.tar.gz, libzmq-3.1.2.tar.gz 在Linux环境中,构建和部署分布式计算系统时,Storm是一个常用的选择,它是一个开源的流处理框架,用于实时数据处理。这个压缩包"zeromq-3.12.5.zip"包含了与Storm集群...
这个“zeromq-4.0.1.tar.gz”文件是ZeroMQ的4.0.1版本源代码包,适用于那些需要在网络通信、并发处理或构建微服务架构的开发者。由于从官方网站下载可能速度较慢,此压缩包提供了方便的下载渠道。 ZeroMQ的核心特性...
- 可靠性:支持消息确认和重试机制,确保消息的可靠传输。 - 负载均衡:发布/订阅模式下,消息可以广播给所有订阅者,而推送/拉取模式则可以实现负载均衡。 总的来说,zeromq是一个功能强大、性能优异的开源消息...