- 浏览: 471883 次
- 性别:
- 来自: 上海
文章分类
最新评论
-
kc_hxd_jp:
博主问个问题,这个篇幅下的python代码无法达到应有的作用, ...
zeroMQ初体验-14.命名机制 进阶 -
kobe1029:
Map<String, Object> args ...
rabbitmq 队列长度预设的曲线方案 -
Sasoritattoo:
LZ,这都13年了,抽空把这篇文章的下文给表完了吧,这一口气喘 ...
nginx + gridfs + mongodb 大事记(残) -
3GQQ2012:
引用前文已经说过,XREP其实用以平衡负载,所以这里由它对请求 ...
zeroMQ初体验-15.应答模式进阶(一)-数据的封装 -
iyuan:
ustclz 写道图片怎么显示不了了。。我这看是可以显示的。不 ...
zeroMQ初体验-1.简介及C/S模式
上一节末尾有说到协议,zeromq自然做了充沛的封装,"管家模式"便由此而来。
是不是有点像简化版的"偏执模式"?这里的“broker”需要做到"承上启下"。因为这是"协议"的具体实现,自然,这里以api形式给出各个角色的相应实现。
为客户端提供的api:
客户端调用:
服务端api:
服务端调用:
注意:
这里的api全部都是单线程的,不会做心跳,并且不会做错误报告(这里可以根据具体需要修正)。确定连接通路就任务分配的是“管家”:
这里的“管家”基本上做了所有他能做的事:心跳,代理发送信息,合理利用多服务资源。
或许,效能上还有些问题,那么试试"异步"?
如此这般,效能倒是大大降低了(官网说法是降了近20倍),分析了下原因,由于异步需要管理各条任务,不断轮询之类的原因,反倒降低了性能,那么I/O的异步呢?
异步的客户端api:
异步的客户端:
当当当当:
$ time mdclient
同步的:
real 0m14.088s
user 0m1.310s
sys 0m2.670s
异步的:
real 0m8.730s
user 0m0.920s
sys 0m1.550s
10个服务端的异步:
real 0m3.863s
user 0m0.730s
sys 0m0.470s
经过测试,4核的话起8个服务端就算饱和了。不过,就效率而言,应该是足够了。
值得注意到是,"异步管家模式"并非全能。由于他没有做管家的连接重试,所以一旦“管家”崩溃了,那自然一切都say goodbye了。
为了服务更靠谱,或许还需要一个叫做"发现服务"的系统,来确认到底有哪些服务可用。
注:以上皆为lua代码
(未完待续)
是不是有点像简化版的"偏执模式"?这里的“broker”需要做到"承上启下"。因为这是"协议"的具体实现,自然,这里以api形式给出各个角色的相应实现。
为客户端提供的api:
local setmetatable = setmetatable local mdp = require"mdp" local zmq = require"zmq" local zpoller = require"zmq.poller" local zmsg = require"zmsg" require"zhelpers" local s_version_assert = s_version_assert local obj_mt = {} obj_mt.__index = obj_mt function obj_mt:set_timeout(timeout) self.timeout = timeout end function obj_mt:set_retries(retries) self.retries = retries end function obj_mt:destroy() if self.client then self.client:close() end self.context:term() end local function s_mdcli_connect_to_broker(self) -- close old socket. if self.client then self.poller:remove(self.client) self.client:close() end self.client = assert(self.context:socket(zmq.REQ)) assert(self.client:setopt(zmq.LINGER, 0)) assert(self.client:connect(self.broker)) if self.verbose then s_console("I: connecting to broker at %s…", self.broker) end -- add socket to poller self.poller:add(self.client, zmq.POLLIN, function() self.got_reply = true end) end -- -- Send request to broker and get reply by hook or crook -- Returns the reply message or nil if there was no reply. -- function obj_mt:send(service, request) -- Prefix request with protocol frames -- Frame 1: "MDPCxy" (six bytes, MDP/Client x.y) -- Frame 2: Service name (printable string) request:push(service) request:push(mdp.MDPC_CLIENT) if self.verbose then s_console("I: send request to '%s' service:", service) request:dump() end local retries = self.retries while (retries > 0) do local msg = request:dup() msg:send(self.client) self.got_reply = false while true do local cnt = assert(self.poller:poll(self.timeout * 1000)) if cnt ~= 0 and self.got_reply then local msg = zmsg.recv(self.client) if self.verbose then s_console("I: received reply:") msg:dump() end assert(msg:parts() >= 3) local header = msg:pop() assert(header == mdp.MDPC_CLIENT) local reply_service = msg:pop() assert(reply_service == service) return msg else retries = retries - 1 if (retries > 0) then if self.verbose then s_console("W: no reply, reconnecting…") end -- Reconnect s_mdcli_connect_to_broker(self) break -- outer loop will resend request. else if self.verbose then s_console("W: permanent error, abandoning request") end return nil -- Giving up end end end end end module(…) function new(broker, verbose) s_version_assert (2, 1); local self = setmetatable({ context = zmq.init(1), poller = zpoller.new(1), broker = broker, verbose = verbose, timeout = 2500, -- msecs retries = 3, -- before we abandon }, obj_mt) s_mdcli_connect_to_broker(self) return self end setmetatable(_M, { __call = function(self, …) return new(…) end })
客户端调用:
require"mdcliapi" require"zmsg" require"zhelpers" local verbose = (arg[1] == "-v") local session = mdcliapi.new("tcp://localhost:5555", verbose) local count=1 repeat local request = zmsg.new("Hello world") local reply = session:send("echo", request) if not reply then break -- Interrupt or failure end count = count + 1 until (count == 100000) printf("%d requests/replies processed\n", count) session:destroy()
服务端api:
local HEARTBEAT_LIVENESS = 3 -- 3-5 is reasonable local setmetatable = setmetatable local mdp = require"mdp" local zmq = require"zmq" local zpoller = require"zmq.poller" local zmsg = require"zmsg" require"zhelpers" local s_version_assert = s_version_assert local obj_mt = {} obj_mt.__index = obj_mt function obj_mt:set_heartbeat(heartbeat) self.heartbeat = heartbeat end function obj_mt:set_reconnect(reconnect) self.reconnect = reconnect end function obj_mt:destroy() if self.worker then self.worker:close() end self.context:term() end -- Send message to broker -- If no msg is provided, create one internally local function s_mdwrk_send_to_broker(self, command, option, msg) msg = msg or zmsg.new() -- Stack protocol envelope to start of message if option then msg:push(option) end msg:push(command) msg:push(mdp.MDPW_WORKER) msg:push("") if self.verbose then s_console("I: sending %s to broker", mdp.mdps_commands[command]) msg:dump() end msg:send(self.worker) end local function s_mdwrk_connect_to_broker(self) -- close old socket. if self.worker then self.poller:remove(self.worker) self.worker:close() end self.worker = assert(self.context:socket(zmq.XREQ)) assert(self.worker:setopt(zmq.LINGER, 0)) assert(self.worker:connect(self.broker)) if self.verbose then s_console("I: connecting to broker at %s…", self.broker) end -- Register service with broker s_mdwrk_send_to_broker(self, mdp.MDPW_READY, self.service) -- If liveness hits zero, queue is considered disconnected self.liveness = HEARTBEAT_LIVENESS self.heartbeat_at = s_clock() + self.heartbeat -- add socket to poller self.poller:add(self.worker, zmq.POLLIN, function() self.got_msg = true end) end -- -- Send reply, if any, to broker and wait for next request. -- function obj_mt:recv(reply) -- Format and send the reply if we are provided one if reply then assert(self.reply_to) reply:wrap(self.reply_to, "") self.reply_to = nil s_mdwrk_send_to_broker(self, mdp.MDPW_REPLY, nil, reply) end self.expect_reply = true self.got_msg = false while true do local cnt = assert(self.poller:poll(self.heartbeat * 1000)) if cnt ~= 0 and self.got_msg then self.got_msg = false local msg = zmsg.recv(self.worker) if self.verbose then s_console("I: received message from broker:") msg:dump() end self.liveness = HEARTBEAT_LIVENESS -- Don't try to handle errors, just assert noisily assert(msg:parts() >= 3) local empty = msg:pop() assert(empty == "") local header = msg:pop() assert(header == mdp.MDPW_WORKER) local command = msg:pop() if command == mdp.MDPW_REQUEST then -- We should pop and save as many addresses as there are -- up to a null part, but for now, just save one… self.reply_to = msg:unwrap() return msg -- We have a request to process elseif command == mdp.MDPW_HEARTBEAT then -- Do nothing for heartbeats elseif command == mdp.MDPW_DISCONNECT then -- dis-connect and re-connect to broker. s_mdwrk_connect_to_broker(self) else s_console("E: invalid input message (%d)", command:byte(1,1)) msg:dump() end else self.liveness = self.liveness - 1 if (self.liveness == 0) then if self.verbose then s_console("W: disconnected from broker - retrying…") end -- sleep then Reconnect s_sleep(self.reconnect) s_mdwrk_connect_to_broker(self) end -- Send HEARTBEAT if it's time if (s_clock() > self.heartbeat_at) then s_mdwrk_send_to_broker(self, mdp.MDPW_HEARTBEAT) self.heartbeat_at = s_clock() + self.heartbeat end end end end module(…) function new(broker, service, verbose) s_version_assert(2, 1); local self = setmetatable({ context = zmq.init(1), poller = zpoller.new(1), broker = broker, service = service, verbose = verbose, heartbeat = 2500, -- msecs reconnect = 2500, -- msecs }, obj_mt) s_mdwrk_connect_to_broker(self) return self end setmetatable(_M, { __call = function(self, …) return new(…) end })
服务端调用:
require"mdwrkapi" require"zmsg" local verbose = (arg[1] == "-v") local session = mdwrkapi.new("tcp://localhost:5555", "echo", verbose) local reply while true do local request = session:recv(reply) if not request then break -- Worker was interrupted end reply = request -- Echo is complex… :-) end session:destroy()
注意:
这里的api全部都是单线程的,不会做心跳,并且不会做错误报告(这里可以根据具体需要修正)。确定连接通路就任务分配的是“管家”:
require"zmq" require"zmq.poller" require"zmsg" require"zhelpers" require"mdp" local tremove = table.remove -- We'd normally pull these from config data local HEARTBEAT_LIVENESS = 3 -- 3-5 is reasonable local HEARTBEAT_INTERVAL = 2500 -- msecs local HEARTBEAT_EXPIRY = HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS -- --------------------------------------------------------------------- -- Constructor for broker object -- --------------------------------------------------------------------- -- Broker object's metatable. local broker_mt = {} broker_mt.__index = broker_mt function broker_new(verbose) local context = zmq.init(1) -- Initialize broker state return setmetatable({ context = context, socket = context:socket(zmq.XREP), verbose = verbose, services = {}, workers = {}, waiting = {}, heartbeat_at = s_clock() + HEARTBEAT_INTERVAL, }, broker_mt) end -- --------------------------------------------------------------------- -- Service object local service_mt = {} service_mt.__index = service_mt -- Worker object local worker_mt = {} worker_mt.__index = worker_mt -- helper list remove function local function zlist_remove(list, item) for n=#list,1,-1 do if list[n] == item then tremove(list, n) end end end -- --------------------------------------------------------------------- -- Destructor for broker object function broker_mt:destroy() self.socket:close() self.context:term() for name, service in pairs(self.services) do service:destroy() end for id, worker in pairs(self.workers) do worker:destroy() end end -- --------------------------------------------------------------------- -- Bind broker to endpoint, can call this multiple times -- We use a single socket for both clients and workers. function broker_mt:bind(endpoint) self.socket:bind(endpoint) s_console("I: MDP broker/0.1.1 is active at %s", endpoint) end -- --------------------------------------------------------------------- -- Delete any idle workers that haven't pinged us in a while. Workers -- are oldest to most recent, so we stop at the first alive worker. function broker_mt:purge_workers() local waiting = self.waiting for n=1,#waiting do local worker = waiting[n] if (not worker:expired()) then return -- Worker is alive, we're done here end if (self.verbose) then s_console("I: deleting expired worker: %s", worker.identity) end self:worker_delete(worker, false) end end -- --------------------------------------------------------------------- -- Locate or create new service entry function broker_mt:service_require(name) assert (name) local service = self.services[name] if not service then service = setmetatable({ name = name, requests = {}, waiting = {}, workers = 0, }, service_mt) self.services[name] = service if (self.verbose) then s_console("I: received message:") end end return service end -- --------------------------------------------------------------------- -- Destroy service object, called when service is removed from -- broker.services. function service_mt:destroy() end -- --------------------------------------------------------------------- -- Dispatch requests to waiting workers as possible function broker_mt:service_dispatch(service, msg) assert (service) local requests = service.requests if (msg) then -- Queue message if any requests[#requests + 1] = msg end self:purge_workers() local waiting = service.waiting while (#waiting > 0 and #requests > 0) do local worker = tremove(waiting, 1) -- pop worker from service's waiting queue. zlist_remove(self.waiting, worker) -- also remove worker from broker's waiting queue. local msg = tremove(requests, 1) -- pop request from service's request queue. self:worker_send(worker, mdp.MDPW_REQUEST, nil, msg) end end -- --------------------------------------------------------------------- -- Handle internal service according to 8/MMI specification function broker_mt:service_internal(service_name, msg) if (service_name == "mmi.service") then local name = msg:body() local service = self.services[name] if (service and service.workers) then msg:body_set("200") else msg:body_set("404") end else msg:body_set("501") end -- Remove & save client return envelope and insert the -- protocol header and service name, then rewrap envelope. local client = msg:unwrap() msg:wrap(mdp.MDPC_CLIENT, service_name) msg:wrap(client, "") msg:send(self.socket) end -- --------------------------------------------------------------------- -- Creates worker if necessary function broker_mt:worker_require(identity) assert (identity) -- self.workers is keyed off worker identity local worker = self.workers[identity] if (not worker) then worker = setmetatable({ identity = identity, expiry = 0, }, worker_mt) self.workers[identity] = worker if (self.verbose) then s_console("I: registering new worker: %s", identity) end end return worker end -- --------------------------------------------------------------------- -- Deletes worker from all data structures, and destroys worker function broker_mt:worker_delete(worker, disconnect) assert (worker) if (disconnect) then self:worker_send(worker, mdp.MDPW_DISCONNECT) end local service = worker.service if (service) then zlist_remove (service.waiting, worker) service.workers = service.workers - 1 end zlist_remove (self.waiting, worker) self.workers[worker.identity] = nil worker:destroy() end -- --------------------------------------------------------------------- -- Destroy worker object, called when worker is removed from -- broker.workers. function worker_mt:destroy(argument) end -- --------------------------------------------------------------------- -- Process message sent to us by a worker function broker_mt:worker_process(sender, msg) assert (msg:parts() >= 1) -- At least, command local command = msg:pop() local worker_ready = (self.workers[sender] ~= nil) local worker = self:worker_require(sender) if (command == mdp.MDPW_READY) then if (worker_ready) then -- Not first command in session then self:worker_delete(worker, true) elseif (sender:sub(1,4) == "mmi.") then -- Reserved service name self:worker_delete(worker, true) else -- Attach worker to service and mark as idle local service_name = msg:pop() local service = self:service_require(service_name) worker.service = service service.workers = service.workers + 1 self:worker_waiting(worker) end elseif (command == mdp.MDPW_REPLY) then if (worker_ready) then -- Remove & save client return envelope and insert the -- protocol header and service name, then rewrap envelope. local client = msg:unwrap() msg:wrap(mdp.MDPC_CLIENT, worker.service.name) msg:wrap(client, "") msg:send(self.socket) self:worker_waiting(worker) else self:worker_delete(worker, true) end elseif (command == mdp.MDPW_HEARTBEAT) then if (worker_ready) then worker.expiry = s_clock() + HEARTBEAT_EXPIRY else self:worker_delete(worker, true) end elseif (command == mdp.MDPW_DISCONNECT) then self:worker_delete(worker, false) else s_console("E: invalid input message (%d)", command:byte(1,1)) msg:dump() end end -- --------------------------------------------------------------------- -- Send message to worker -- If pointer to message is provided, sends & destroys that message function broker_mt:worker_send(worker, command, option, msg) msg = msg and msg:dup() or zmsg.new() -- Stack protocol envelope to start of message if (option) then -- Optional frame after command msg:push(option) end msg:push(command) msg:push(mdp.MDPW_WORKER) -- Stack routing envelope to start of message msg:wrap(worker.identity, "") if (self.verbose) then s_console("I: sending %s to worker", mdp.mdps_commands[command]) msg:dump() end msg:send(self.socket) end -- --------------------------------------------------------------------- -- This worker is now waiting for work function broker_mt:worker_waiting(worker) -- Queue to broker and service waiting lists self.waiting[#self.waiting + 1] = worker worker.service.waiting[#worker.service.waiting + 1] = worker worker.expiry = s_clock() + HEARTBEAT_EXPIRY self:service_dispatch(worker.service, nil) end -- --------------------------------------------------------------------- -- Return 1 if worker has expired and must be deleted function worker_mt:expired() return (self.expiry < s_clock()) end -- --------------------------------------------------------------------- -- Process a request coming from a client function broker_mt:client_process(sender, msg) assert (msg:parts() >= 2) -- Service name + body local service_name = msg:pop() local service = self:service_require(service_name) -- Set reply return address to client sender msg:wrap(sender, "") if (service_name:sub(1,4) == "mmi.") then self:service_internal(service_name, msg) else self:service_dispatch(service, msg) end end -- --------------------------------------------------------------------- -- Main broker work happens here local verbose = (arg[1] == "-v") s_version_assert (2, 1) s_catch_signals () local self = broker_new(verbose) self:bind("tcp://*:5555") local poller = zmq.poller.new(1) -- Process next input message, if any poller:add(self.socket, zmq.POLLIN, function() local msg = zmsg.recv(self.socket) if (self.verbose) then s_console("I: received message:") msg:dump() end local sender = msg:pop() local empty = msg:pop() local header = msg:pop() if (header == mdp.MDPC_CLIENT) then self:client_process(sender, msg) elseif (header == mdp.MDPW_WORKER) then self:worker_process(sender, msg) else s_console("E: invalid message:") msg:dump() end end) -- Get and process messages forever or until interrupted while (not s_interrupted) do local cnt = assert(poller:poll(HEARTBEAT_INTERVAL * 1000)) -- Disconnect and delete any expired workers -- Send heartbeats to idle workers if needed if (s_clock() > self.heartbeat_at) then self:purge_workers() local waiting = self.waiting for n=1,#waiting do local worker = waiting[n] self:worker_send(worker, mdp.MDPW_HEARTBEAT) end self.heartbeat_at = s_clock() + HEARTBEAT_INTERVAL end end if (s_interrupted) then printf("W: interrupt received, shutting down…\n") end self:destroy()
这里的“管家”基本上做了所有他能做的事:心跳,代理发送信息,合理利用多服务资源。
或许,效能上还有些问题,那么试试"异步"?
require"zmq" require"zmq.threads" require"zmsg" local common_code = [[ require"zmq" require"zmsg" require"zhelpers" ]] local client_task = common_code .. [[ local context = zmq.init(1) local client = context:socket(zmq.XREQ) client:setopt(zmq.IDENTITY, "C", 1) client:connect("tcp://localhost:5555") printf("Setting up test…\n") s_sleep(100) local requests local start printf("Synchronous round-trip test…\n") requests = 10000 start = s_clock() for n=1,requests do local msg = zmsg.new("HELLO") msg:send(client) msg = zmsg.recv(client) end printf(" %d calls/second\n", (1000 * requests) / (s_clock() - start)) printf("Asynchronous round-trip test…\n") requests = 100000 start = s_clock() for n=1,requests do local msg = zmsg.new("HELLO") msg:send(client) end for n=1,requests do local msg = zmsg.recv(client) end printf(" %d calls/second\n", (1000 * requests) / (s_clock() - start)) client:close() context:term() ]] local worker_task = common_code .. [[ local context = zmq.init(1) local worker = context:socket(zmq.XREQ) worker:setopt(zmq.IDENTITY, "W", 1) worker:connect("tcp://localhost:5556") while true do local msg = zmsg.recv(worker) msg:send(worker) end worker:close() context:term() ]] local broker_task = common_code .. [[ -- Prepare our context and sockets local context = zmq.init(1) local frontend = context:socket(zmq.XREP) local backend = context:socket(zmq.XREP) frontend:bind("tcp://*:5555") backend:bind("tcp://*:5556") require"zmq.poller" local poller = zmq.poller(2) poller:add(frontend, zmq.POLLIN, function() local msg = zmsg.recv(frontend) --msg[1] = "W" msg:pop() msg:push("W") msg:send(backend) end) poller:add(backend, zmq.POLLIN, function() local msg = zmsg.recv(backend) --msg[1] = "C" msg:pop() msg:push("C") msg:send(frontend) end) poller:start() frontend:close() backend:close() context:term() ]] s_version_assert(2, 1) local client = zmq.threads.runstring(nil, client_task) assert(client:start()) local worker = zmq.threads.runstring(nil, worker_task) assert(worker:start(true)) local broker = zmq.threads.runstring(nil, broker_task) assert(broker:start(true)) assert(client:join())
如此这般,效能倒是大大降低了(官网说法是降了近20倍),分析了下原因,由于异步需要管理各条任务,不断轮询之类的原因,反倒降低了性能,那么I/O的异步呢?
异步的客户端api:
local setmetatable = setmetatable local mdp = require"mdp" local zmq = require"zmq" local zpoller = require"zmq.poller" local zmsg = require"zmsg" require"zhelpers" local s_version_assert = s_version_assert local obj_mt = {} obj_mt.__index = obj_mt function obj_mt:set_timeout(timeout) self.timeout = timeout end function obj_mt:destroy() if self.client then self.client:close() end self.context:term() end local function s_mdcli_connect_to_broker(self) -- close old socket. if self.client then self.poller:remove(self.client) self.client:close() end self.client = assert(self.context:socket(zmq.XREQ)) assert(self.client:setopt(zmq.LINGER, 0)) assert(self.client:connect(self.broker)) if self.verbose then s_console("I: connecting to broker at %s…", self.broker) end -- add socket to poller self.poller:add(self.client, zmq.POLLIN, function() self.got_reply = true end) end -- -- Send request to broker and get reply by hook or crook -- function obj_mt:send(service, request) -- Prefix request with protocol frames -- Frame 0: empty (REQ emulation) -- Frame 1: "MDPCxy" (six bytes, MDP/Client x.y) -- Frame 2: Service name (printable string) request:push(service) request:push(mdp.MDPC_CLIENT) request:push("") if self.verbose then s_console("I: send request to '%s' service:", service) request:dump() end request:send(self.client) return 0 end -- Returns the reply message or NULL if there was no reply. Does not -- attempt to recover from a broker failure, this is not possible -- without storing all unanswered requests and resending them all… function obj_mt:recv() self.got_reply = false local cnt = assert(self.poller:poll(self.timeout * 1000)) if cnt ~= 0 and self.got_reply then local msg = zmsg.recv(self.client) if self.verbose then s_console("I: received reply:") msg:dump() end assert(msg:parts() >= 3) local empty = msg:pop() assert(empty == "") local header = msg:pop() assert(header == mdp.MDPC_CLIENT) return msg end if self.verbose then s_console("W: permanent error, abandoning request") end return nil -- Giving up end module(…) function new(broker, verbose) s_version_assert (2, 1); local self = setmetatable({ context = zmq.init(1), poller = zpoller.new(1), broker = broker, verbose = verbose, timeout = 2500, -- msecs }, obj_mt) s_mdcli_connect_to_broker(self) return self end setmetatable(_M, { __call = function(self, …) return new(…) end })
异步的客户端:
require"mdcliapi2" require"zmsg" require"zhelpers" local verbose = (arg[1] == "-v") local session = mdcliapi2.new("tcp://localhost:5555", verbose) local count=100000 for n=1,count do local request = zmsg.new("Hello world") session:send("echo", request) end for n=1,count do local reply = session:recv() if not reply then break -- Interrupted by Ctrl-C end end printf("%d replies received\n", count) session:destroy()
当当当当:
$ time mdclient
同步的:
real 0m14.088s
user 0m1.310s
sys 0m2.670s
异步的:
real 0m8.730s
user 0m0.920s
sys 0m1.550s
10个服务端的异步:
real 0m3.863s
user 0m0.730s
sys 0m0.470s
经过测试,4核的话起8个服务端就算饱和了。不过,就效率而言,应该是足够了。
值得注意到是,"异步管家模式"并非全能。由于他没有做管家的连接重试,所以一旦“管家”崩溃了,那自然一切都say goodbye了。
为了服务更靠谱,或许还需要一个叫做"发现服务"的系统,来确认到底有哪些服务可用。
require"mdcliapi" require"zmsg" require"zhelpers" local verbose = (arg[1] == "-v") local session = mdcliapi.new("tcp://localhost:5555", verbose) -- This is the service we want to look up local request = zmsg.new("echo") -- This is the service we send our request to local reply = session:send("mmi.service", request) if (reply) then printf ("Lookup echo service: %s\n", reply:body()) else printf ("E: no response from broker, make sure it's running\n") end session:destroy()
注:以上皆为lua代码
(未完待续)
发表评论
-
IM选型(初)
2016-08-23 19:12 1643主要参考文章: https://r ... -
关于python和rabbitmq的那点事儿
2011-10-19 14:15 7963rabbitmq是一个消息中间件,在之前的zmq介绍中有略带提 ... -
zeroMQ初体验-34.发布/订阅模式进阶-克隆模式-下,结言
2011-05-26 16:09 4198服务器: // // Clone server Mod ... -
zeroMQ初体验-33.发布/订阅模式进阶-克隆模式-中
2011-05-26 15:37 2930临时缓存 现实中,比如 ... -
zeroMQ初体验-32.发布/订阅模式进阶-克隆模式-上
2011-05-26 15:04 3659在发布/订阅模式中,特别是现实应用中,总会因为这样那样的问题导 ... -
zeroMQ初体验-31.发布/订阅模式进阶-黑盒的高速订阅者
2011-05-25 16:55 2756作为发布/订阅模式的一个常用场景,大数据量的组播是有必要的。虽 ... -
zeroMQ初体验-30.发布/订阅模式进阶-自裁的蜗牛订阅者
2011-05-25 16:24 4549在初次介绍发布/订阅模式的时候,就已经抖出了这个包袱:如果订阅 ... -
zeroMQ初体验-29.可靠性-自由模式
2011-05-24 17:02 5407好吧,本以为这可能是一个更靠谱的模式,谁知(其实是我一厢情愿了 ... -
zeroMQ初体验-28.可靠性-主从模式
2011-05-23 14:47 5540虽然"硬盘模式" ... -
zeroMQ初体验-27.可靠性-硬盘模式
2011-05-23 13:44 3795在之前的种种模式中, ... -
zeroMQ初体验-26.可靠性-管家模式
2011-05-12 19:03 1上一节末尾有说到协议,zeromq自然做了充沛的封装,&quo ... -
zeroMQ初体验-25.可靠性-偏执的海盗模式
2011-05-05 19:05 3594虽然说“简单的海盗模 ... -
zeroMQ初体验-24.可靠性-简单的海盗模式
2011-05-05 16:41 3216相较于“懒惰的”做了 ... -
zeroMQ初体验-23.可靠性-懒惰的海盗模式
2011-05-05 16:15 5067相较于通常的阻塞模式,这里只是做了一点简单的动作来加强系统的可 ... -
zeroMQ初体验-22.可靠性-总览
2011-04-26 19:25 5938在开篇就从曾对zeromq的可靠性做过质疑,不过,作为一个雄心 ... -
rabbitmq 队列长度预设的曲线方案
2011-04-21 14:36 3401zeromq中倒是直接支持这个功能的。 类似于设定队列长度或 ... -
zeroMQ初体验-21.应答模式进阶(七)-云计算
2011-04-18 19:14 3536这里给出了一个最近很火的"云计算"案例。 ... -
zeroMQ初体验-20.应答模式进阶(六)-多对多路由模式
2011-04-18 17:22 3881某些时候,为了冗余的需要,可能会有这样的需求: impo ... -
zeroMQ初体验-19.应答模式进阶(五)-异步式应答
2011-04-15 15:23 4847恩,这应该算是比较实 ... -
zeroMQ初体验-18.应答模式进阶(四)-定制路由3
2011-04-02 15:39 5187从经典到超越经典。 首 ...
相关推荐
总的来说,zeromq-2.1.7提供了在Linux环境下进行高效、可靠的消息传递功能,是构建大规模分布式系统的一个重要工具。虽然这是一个较旧的版本,但在某些特定场景下,旧版本可能更稳定,更适合于已知的工作负载和环境...
这里提供的四个压缩包文件,Python-2.6.6.tar.bz2、zeromq-3.2.5.tar.gz、jzmq.tar.gz以及storm-0.8.0.zip,都是与Storm搭建和运行相关的资源。 首先,我们来详细了解一下每个文件的作用: 1. **Python-2.6.6.tar....
zeromq-4.0.3.tar.gz 是一个包含了 ZeroMQ 4.0.3 版本源代码的压缩文件。ZeroMQ,也被称为“零消息队列”或“0MQ”,是一个开源的消息中间件,它提供了一种高效、灵活且可扩展的方式来处理分布式系统中的数据通信。...
zeromq-4.2.3.tar.gz 是ZeroMQ 4.2.3版本的源代码包,这个稳定版本确保了良好的兼容性和可靠性。 首先,让我们深入了解ZeroMQ的核心概念和功能: 1. **套接字(Sockets)**:在ZeroMQ中,套接字不仅仅是传统网络...
标题中的"zeromq-4.1.8.tar.gz"指的是ZeroMQ的4.1.8版本的源代码包,通常以tar.gz格式压缩,这是一种在Linux和类Unix系统中常见的归档和压缩方式。 zeromq的核心特性包括点对点连接、发布/订阅模式、请求/响应模式...
`zeromq-2.1.9.tar.gz` 是zeromq的一个特定版本,即2.1.9版,通常以源码形式提供,需要通过编译来安装。 首先,让我们深入了解zeromq的核心概念。zeromq设计了一个灵活的套接字模型,它允许开发者构建复杂的网络...
官方离线安装包,测试可用。使用rpm -ivh [rpm完整包名] 进行安装
zeromq-4.1.3 是一个针对 ZeroMQ 的特定版本的开发工具包,ZeroMQ 是一个开源的消息中间件库,它为分布式计算提供了一种高性能、轻量级的通信框架。ZeroMQ 提供了多种编程语言的绑定,使得开发者能够方便地在不同的...
0MQ(也称为 ZeroMQ 或 ØMQ)是一个开源的消息中间件,它提供了一种轻量级、高性能的异步消息...通过使用“zeromq-4.3.4.tar.gz”,你可以享受到这个版本带来的稳定性和优化,从而更高效地实现跨进程、跨网络的通信。
- **错误修复**:修复了前一版本中已知的bug,增强了系统的稳定性和可靠性。 - **新功能**:可能添加了新的API或功能,以满足更广泛的需求。 - **安全性增强**:可能加强了安全措施,比如加密传输或身份验证机制的...
在zeromq-4.2.0源码包中,你可以找到以下主要组成部分: 1. **源代码**:包含了zeromq的核心库和各种语言的绑定。核心库通常用C++编写,提供了跨平台的API,而绑定则允许开发者使用Python、Java、C#等其他语言与...
标题中的"zeromq-4.2.0.tar.zip"是指ZeroMQ库的4.2.0版本,它被封装在一个ZIP压缩包中,而内部包含的文件是tar归档格式。ZeroMQ是一个开源的消息中间件,它提供了一个高级的消息队列模型,允许应用程序之间进行高效...
ZeroMQ是一个网络通讯库,其主要用来为分布式应用程序开发提供进程间通信(此处的进程既可以是同一台机器上的两个进程也可以是不同机器上的两个进程)。ZeroMQ的特点在于灵活的通信手段和丰富的连接模型,并且它可以...
ZeroMQ是一个网络通讯库,其主要用来为分布式应用程序开发提供进程间通信(此处的进程既可以是同一台机器上的两个进程也可以是不同机器上的两个进程)。ZeroMQ的特点在于灵活的通信手段和丰富的连接模型,并且它可以...
ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。
总的来说,zeromq-4.3.2是一个强大且可靠的工具,它简化了网络编程,提高了应用的性能和效率。对于需要进行高效、灵活消息传递的开发者来说,它是一个理想的选择。通过合理利用其特性,开发者可以构建出高并发、可...
这个“zeromq-4.1.0-rc1.zip”压缩包包含了ZeroMQ 4.1.0版本的源代码,这是一个预发布版本(Release Candidate),意味着它是正式版发布前的最后一个测试版本。 ZeroMQ的核心概念是提供一种抽象的网络通信层,允许...
zeromq-3.12.5.tar.gz, libzmq-3.1.2.tar.gz 在Linux环境中,构建和部署分布式计算系统时,Storm是一个常用的选择,它是一个开源的流处理框架,用于实时数据处理。这个压缩包"zeromq-3.12.5.zip"包含了与Storm集群...
这个“zeromq-4.0.1.tar.gz”文件是ZeroMQ的4.0.1版本源代码包,适用于那些需要在网络通信、并发处理或构建微服务架构的开发者。由于从官方网站下载可能速度较慢,此压缩包提供了方便的下载渠道。 ZeroMQ的核心特性...
- 可靠性:支持消息确认和重试机制,确保消息的可靠传输。 - 负载均衡:发布/订阅模式下,消息可以广播给所有订阅者,而推送/拉取模式则可以实现负载均衡。 总的来说,zeromq是一个功能强大、性能优异的开源消息...