`
iyuan
  • 浏览: 471855 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

zeroMQ初体验-26.可靠性-管家模式

    博客分类:
  • MQ
阅读更多
上一节末尾有说到协议,zeromq自然做了充沛的封装,"管家模式"便由此而来。


是不是有点像简化版的"偏执模式"?这里的“broker”需要做到"承上启下"。因为这是"协议"的具体实现,自然,这里以api形式给出各个角色的相应实现。

为客户端提供的api:
local setmetatable = setmetatable

local mdp = require"mdp"

local zmq = require"zmq"
local zpoller = require"zmq.poller"
local zmsg = require"zmsg"
require"zhelpers"

local s_version_assert = s_version_assert

local obj_mt = {}
obj_mt.__index = obj_mt

function obj_mt:set_timeout(timeout)
    self.timeout = timeout
end

function obj_mt:set_retries(retries)
    self.retries = retries
end

function obj_mt:destroy()
    if self.client then self.client:close() end
    self.context:term()
end

local function s_mdcli_connect_to_broker(self)
    -- close old socket.
    if self.client then
        self.poller:remove(self.client)
        self.client:close()
    end
    self.client = assert(self.context:socket(zmq.REQ))
    assert(self.client:setopt(zmq.LINGER, 0))
    assert(self.client:connect(self.broker))
    if self.verbose then
        s_console("I: connecting to broker at %s…", self.broker)
    end
    -- add socket to poller
    self.poller:add(self.client, zmq.POLLIN, function()
        self.got_reply = true
    end)
end

--
-- Send request to broker and get reply by hook or crook
-- Returns the reply message or nil if there was no reply.
--
function obj_mt:send(service, request)
    -- Prefix request with protocol frames
    -- Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
    -- Frame 2: Service name (printable string)
    request:push(service)
    request:push(mdp.MDPC_CLIENT)
    if self.verbose then
        s_console("I: send request to '%s' service:", service)
        request:dump()
    end

    local retries = self.retries
    while (retries > 0) do
        local msg = request:dup()
        msg:send(self.client)
        self.got_reply = false

        while true do
            local cnt = assert(self.poller:poll(self.timeout * 1000))
            if cnt ~= 0 and self.got_reply then
                local msg = zmsg.recv(self.client)
                if self.verbose then
                    s_console("I: received reply:")
                    msg:dump()
                end
                assert(msg:parts() >= 3)

                local header = msg:pop()
                assert(header == mdp.MDPC_CLIENT)
                local reply_service = msg:pop()
                assert(reply_service == service)
                return msg
            else
                retries = retries - 1
                if (retries > 0) then
                    if self.verbose then
                        s_console("W: no reply, reconnecting…")
                    end
                    -- Reconnect
                    s_mdcli_connect_to_broker(self)
                    break -- outer loop will resend request.
                else
                    if self.verbose then
                        s_console("W: permanent error, abandoning request")
                    end
                    return nil -- Giving up
                end
            end
        end
    end
end

module(…)

function new(broker, verbose)
    s_version_assert (2, 1);
    local self = setmetatable({
        context = zmq.init(1),
        poller = zpoller.new(1),
        broker = broker,
        verbose = verbose,
        timeout = 2500, -- msecs
        retries = 3,    -- before we abandon
    }, obj_mt)

    s_mdcli_connect_to_broker(self)
    return self
end

setmetatable(_M, { __call = function(self, …) return new(…) end })

客户端调用:
require"mdcliapi"
require"zmsg"
require"zhelpers"

local verbose = (arg[1] == "-v")
local session = mdcliapi.new("tcp://localhost:5555", verbose)

local count=1
repeat
    local request = zmsg.new("Hello world")
    local reply = session:send("echo", request)
    if not reply then
        break    --  Interrupt or failure
    end
    count = count + 1
until (count == 100000)
printf("%d requests/replies processed\n", count)
session:destroy()

服务端api:
local HEARTBEAT_LIVENESS = 3  -- 3-5 is reasonable

local setmetatable = setmetatable

local mdp = require"mdp"

local zmq = require"zmq"
local zpoller = require"zmq.poller"
local zmsg = require"zmsg"
require"zhelpers"

local s_version_assert = s_version_assert

local obj_mt = {}
obj_mt.__index = obj_mt

function obj_mt:set_heartbeat(heartbeat)
    self.heartbeat = heartbeat
end

function obj_mt:set_reconnect(reconnect)
    self.reconnect = reconnect
end

function obj_mt:destroy()
    if self.worker then self.worker:close() end
    self.context:term()
end

-- Send message to broker
-- If no msg is provided, create one internally
local function s_mdwrk_send_to_broker(self, command, option, msg)
    msg = msg or zmsg.new()

    -- Stack protocol envelope to start of message
    if option then
        msg:push(option)
    end
    msg:push(command)
    msg:push(mdp.MDPW_WORKER)
    msg:push("")

    if self.verbose then
        s_console("I: sending %s to broker", mdp.mdps_commands[command])
        msg:dump()
    end
    msg:send(self.worker)
end

local function s_mdwrk_connect_to_broker(self)
    -- close old socket.
    if self.worker then
        self.poller:remove(self.worker)
        self.worker:close()
    end
    self.worker = assert(self.context:socket(zmq.XREQ))
    assert(self.worker:setopt(zmq.LINGER, 0))
    assert(self.worker:connect(self.broker))
    if self.verbose then
        s_console("I: connecting to broker at %s…", self.broker)
    end
    -- Register service with broker
    s_mdwrk_send_to_broker(self, mdp.MDPW_READY, self.service)
    -- If liveness hits zero, queue is considered disconnected
    self.liveness = HEARTBEAT_LIVENESS
    self.heartbeat_at = s_clock() + self.heartbeat
    -- add socket to poller
    self.poller:add(self.worker, zmq.POLLIN, function()
        self.got_msg = true
    end)
end

--
-- Send reply, if any, to broker and wait for next request.
--
function obj_mt:recv(reply)
    -- Format and send the reply if we are provided one
    if reply then
        assert(self.reply_to)
        reply:wrap(self.reply_to, "")
        self.reply_to = nil
        s_mdwrk_send_to_broker(self, mdp.MDPW_REPLY, nil, reply)
    end
    self.expect_reply = true

    self.got_msg = false
    while true do
        local cnt = assert(self.poller:poll(self.heartbeat * 1000))
        if cnt ~= 0 and self.got_msg then
            self.got_msg = false
            local msg = zmsg.recv(self.worker)
            if self.verbose then
                s_console("I: received message from broker:")
                msg:dump()
            end
            self.liveness = HEARTBEAT_LIVENESS
            -- Don't try to handle errors, just assert noisily
            assert(msg:parts() >= 3)

            local empty = msg:pop()
            assert(empty == "")

            local header = msg:pop()
            assert(header == mdp.MDPW_WORKER)

            local command = msg:pop()
            if command == mdp.MDPW_REQUEST then
                -- We should pop and save as many addresses as there are
                -- up to a null part, but for now, just save one…
                self.reply_to = msg:unwrap()
                return msg -- We have a request to process
            elseif command == mdp.MDPW_HEARTBEAT then
                -- Do nothing for heartbeats
            elseif command == mdp.MDPW_DISCONNECT then
                -- dis-connect and re-connect to broker.
                s_mdwrk_connect_to_broker(self)
            else
                s_console("E: invalid input message (%d)", command:byte(1,1))
                msg:dump()
            end
        else
            self.liveness = self.liveness - 1
            if (self.liveness == 0) then
                if self.verbose then
                    s_console("W: disconnected from broker - retrying…")
                end
                -- sleep then Reconnect
                s_sleep(self.reconnect)
                s_mdwrk_connect_to_broker(self)
            end

            -- Send HEARTBEAT if it's time
            if (s_clock() > self.heartbeat_at) then
                s_mdwrk_send_to_broker(self, mdp.MDPW_HEARTBEAT)
                self.heartbeat_at = s_clock() + self.heartbeat
            end
        end
    end
end

module(…)

function new(broker, service, verbose)
    s_version_assert(2, 1);
    local self = setmetatable({
        context = zmq.init(1),
        poller = zpoller.new(1),
        broker = broker,
        service = service,
        verbose = verbose,
        heartbeat = 2500, -- msecs
        reconnect = 2500, -- msecs
    }, obj_mt)

    s_mdwrk_connect_to_broker(self)
    return self
end

setmetatable(_M, { __call = function(self, …) return new(…) end })

服务端调用:
require"mdwrkapi"
require"zmsg"

local verbose = (arg[1] == "-v")
local session = mdwrkapi.new("tcp://localhost:5555", "echo", verbose)

local reply
while true do
    local request = session:recv(reply)
    if not request then
        break              --  Worker was interrupted
    end
    reply = request        --  Echo is complex… :-)
end
session:destroy()


注意:
这里的api全部都是单线程的,不会做心跳,并且不会做错误报告(这里可以根据具体需要修正)。确定连接通路就任务分配的是“管家”:
require"zmq"
require"zmq.poller"
require"zmsg"
require"zhelpers"
require"mdp"

local tremove = table.remove

--  We'd normally pull these from config data

local HEARTBEAT_LIVENESS   = 3       --  3-5 is reasonable
local HEARTBEAT_INTERVAL   = 2500    --  msecs
local HEARTBEAT_EXPIRY     = HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS

--  ---------------------------------------------------------------------
--  Constructor for broker object

--  ---------------------------------------------------------------------
--  Broker object's metatable.
local broker_mt = {}
broker_mt.__index = broker_mt

function broker_new(verbose)
    local context = zmq.init(1)
    --  Initialize broker state
    return setmetatable({
        context = context,
        socket = context:socket(zmq.XREP),
        verbose = verbose,
        services = {},
        workers = {},
        waiting = {},
        heartbeat_at = s_clock() + HEARTBEAT_INTERVAL,
    }, broker_mt)
end

--  ---------------------------------------------------------------------
--  Service object
local service_mt = {}
service_mt.__index = service_mt

--  Worker object
local worker_mt = {}
worker_mt.__index = worker_mt

-- helper list remove function
local function zlist_remove(list, item)
    for n=#list,1,-1 do
        if list[n] == item then
            tremove(list, n)
        end
    end
end

--  ---------------------------------------------------------------------
--  Destructor for broker object

function broker_mt:destroy()
    self.socket:close()
    self.context:term()
    for name, service in pairs(self.services) do
        service:destroy()
    end
    for id, worker in pairs(self.workers) do
        worker:destroy()
    end
end

--  ---------------------------------------------------------------------
--  Bind broker to endpoint, can call this multiple times
--  We use a single socket for both clients and workers.

function broker_mt:bind(endpoint)
    self.socket:bind(endpoint)
    s_console("I: MDP broker/0.1.1 is active at %s", endpoint)
end

--  ---------------------------------------------------------------------
--  Delete any idle workers that haven't pinged us in a while. Workers
--  are oldest to most recent, so we stop at the first alive worker.

function broker_mt:purge_workers()
    local waiting = self.waiting
    for n=1,#waiting do
        local worker = waiting[n]
        if (not worker:expired()) then
            return             --  Worker is alive, we're done here
        end
        if (self.verbose) then
            s_console("I: deleting expired worker: %s", worker.identity)
        end

        self:worker_delete(worker, false)
    end
end

--  ---------------------------------------------------------------------
--  Locate or create new service entry

function broker_mt:service_require(name)
    assert (name)
    local service = self.services[name]
    if not service then
        service = setmetatable({
            name = name,
            requests = {},
            waiting = {},
            workers = 0,
        }, service_mt)
        self.services[name] = service
        if (self.verbose) then
            s_console("I: received message:")
        end
    end
    return service
end

--  ---------------------------------------------------------------------
--  Destroy service object, called when service is removed from
--  broker.services.

function service_mt:destroy()
end

--  ---------------------------------------------------------------------
--  Dispatch requests to waiting workers as possible

function broker_mt:service_dispatch(service, msg)
    assert (service)
    local requests = service.requests
    if (msg) then               --  Queue message if any
        requests[#requests + 1] = msg
    end

    self:purge_workers()
    local waiting = service.waiting
    while (#waiting > 0 and #requests > 0) do
        local worker = tremove(waiting, 1) -- pop worker from service's waiting queue.
        zlist_remove(self.waiting, worker) -- also remove worker from broker's waiting queue.
        local msg = tremove(requests, 1) -- pop request from service's request queue.
        self:worker_send(worker, mdp.MDPW_REQUEST, nil, msg)
    end
end

--  ---------------------------------------------------------------------
--  Handle internal service according to 8/MMI specification

function broker_mt:service_internal(service_name, msg)
    if (service_name == "mmi.service") then
        local name = msg:body()
        local service = self.services[name]
        if (service and service.workers) then
            msg:body_set("200")
        else
            msg:body_set("404")
        end
    else
        msg:body_set("501")
    end

    --  Remove & save client return envelope and insert the
    --  protocol header and service name, then rewrap envelope.
    local client = msg:unwrap()
    msg:wrap(mdp.MDPC_CLIENT, service_name)
    msg:wrap(client, "")

    msg:send(self.socket)
end

--  ---------------------------------------------------------------------
--  Creates worker if necessary

function broker_mt:worker_require(identity)
    assert (identity)

    --  self.workers is keyed off worker identity
    local worker = self.workers[identity]
    if (not worker) then
        worker = setmetatable({
            identity = identity,
            expiry = 0,
        }, worker_mt)
        self.workers[identity] = worker
        if (self.verbose) then
            s_console("I: registering new worker: %s", identity)
        end
    end
    return worker
end

--  ---------------------------------------------------------------------
--  Deletes worker from all data structures, and destroys worker

function broker_mt:worker_delete(worker, disconnect)
    assert (worker)
    if (disconnect) then
        self:worker_send(worker, mdp.MDPW_DISCONNECT)
    end
    local service = worker.service
    if (service) then
        zlist_remove (service.waiting, worker)
        service.workers = service.workers - 1
    end
    zlist_remove (self.waiting, worker)
    self.workers[worker.identity] = nil
    worker:destroy()
end

--  ---------------------------------------------------------------------
--  Destroy worker object, called when worker is removed from
--  broker.workers.

function worker_mt:destroy(argument)
end

--  ---------------------------------------------------------------------
--  Process message sent to us by a worker

function broker_mt:worker_process(sender, msg)
    assert (msg:parts() >= 1)     --  At least, command

    local command = msg:pop()
    local worker_ready = (self.workers[sender] ~= nil)
    local worker = self:worker_require(sender)

    if (command == mdp.MDPW_READY) then
        if (worker_ready) then          --  Not first command in session then
            self:worker_delete(worker, true)
        elseif (sender:sub(1,4) == "mmi.") then  --  Reserved service name
            self:worker_delete(worker, true)
        else
            --  Attach worker to service and mark as idle
            local service_name = msg:pop()
            local service = self:service_require(service_name)
            worker.service = service
            service.workers = service.workers + 1
            self:worker_waiting(worker)
        end
    elseif (command == mdp.MDPW_REPLY) then
        if (worker_ready) then
            --  Remove & save client return envelope and insert the
            --  protocol header and service name, then rewrap envelope.
            local client = msg:unwrap()
            msg:wrap(mdp.MDPC_CLIENT, worker.service.name)
            msg:wrap(client, "")

            msg:send(self.socket)
            self:worker_waiting(worker)
        else
            self:worker_delete(worker, true)
        end
    elseif (command == mdp.MDPW_HEARTBEAT) then
        if (worker_ready) then
            worker.expiry = s_clock() + HEARTBEAT_EXPIRY
        else
            self:worker_delete(worker, true)
        end
    elseif (command == mdp.MDPW_DISCONNECT) then
        self:worker_delete(worker, false)
    else
        s_console("E: invalid input message (%d)", command:byte(1,1))
        msg:dump()
    end
end

--  ---------------------------------------------------------------------
--  Send message to worker
--  If pointer to message is provided, sends & destroys that message

function broker_mt:worker_send(worker, command, option, msg)
    msg = msg and msg:dup() or zmsg.new()

    --  Stack protocol envelope to start of message
    if (option) then                 --  Optional frame after command
        msg:push(option)
    end
    msg:push(command)
    msg:push(mdp.MDPW_WORKER)
    --  Stack routing envelope to start of message
    msg:wrap(worker.identity, "")

    if (self.verbose) then
        s_console("I: sending %s to worker", mdp.mdps_commands[command])
        msg:dump()
    end
    msg:send(self.socket)
end

--  ---------------------------------------------------------------------
--  This worker is now waiting for work

function broker_mt:worker_waiting(worker)
    --  Queue to broker and service waiting lists
    self.waiting[#self.waiting + 1] = worker
    worker.service.waiting[#worker.service.waiting + 1] = worker
    worker.expiry = s_clock() + HEARTBEAT_EXPIRY
    self:service_dispatch(worker.service, nil)
end

--  ---------------------------------------------------------------------
--  Return 1 if worker has expired and must be deleted

function worker_mt:expired()
    return (self.expiry < s_clock())
end
--  ---------------------------------------------------------------------
--  Process a request coming from a client

function broker_mt:client_process(sender, msg)
    assert (msg:parts() >= 2)     --  Service name + body

    local service_name = msg:pop()
    local service = self:service_require(service_name)
    --  Set reply return address to client sender
    msg:wrap(sender, "")
    if (service_name:sub(1,4) == "mmi.") then
        self:service_internal(service_name, msg)
    else
        self:service_dispatch(service, msg)
    end
end

--  ---------------------------------------------------------------------
--  Main broker work happens here

local verbose = (arg[1] == "-v")

s_version_assert (2, 1)
s_catch_signals ()
local self = broker_new(verbose)
self:bind("tcp://*:5555")

local poller = zmq.poller.new(1)

--  Process next input message, if any
poller:add(self.socket, zmq.POLLIN, function()
    local msg = zmsg.recv(self.socket)
    if (self.verbose) then
        s_console("I: received message:")
        msg:dump()
    end
    local sender = msg:pop()
    local empty  = msg:pop()
    local header = msg:pop()

    if (header == mdp.MDPC_CLIENT) then
        self:client_process(sender, msg)
    elseif (header == mdp.MDPW_WORKER) then
        self:worker_process(sender, msg)
    else
        s_console("E: invalid message:")
        msg:dump()
    end
end)

--  Get and process messages forever or until interrupted
while (not s_interrupted) do
    local cnt = assert(poller:poll(HEARTBEAT_INTERVAL * 1000))
    --  Disconnect and delete any expired workers
    --  Send heartbeats to idle workers if needed
    if (s_clock() > self.heartbeat_at) then
        self:purge_workers()
        local waiting = self.waiting
        for n=1,#waiting do
            local worker = waiting[n]
            self:worker_send(worker, mdp.MDPW_HEARTBEAT)
        end
        self.heartbeat_at = s_clock() + HEARTBEAT_INTERVAL
    end
end
if (s_interrupted) then
    printf("W: interrupt received, shutting down…\n")
end
self:destroy()


这里的“管家”基本上做了所有他能做的事:心跳,代理发送信息,合理利用多服务资源。
或许,效能上还有些问题,那么试试"异步"?
require"zmq"
require"zmq.threads"
require"zmsg"

local common_code = [[
    require"zmq"
    require"zmsg"
    require"zhelpers"
]]

local client_task = common_code .. [[
    local context = zmq.init(1)
    local client = context:socket(zmq.XREQ)
    client:setopt(zmq.IDENTITY, "C", 1)
    client:connect("tcp://localhost:5555")

    printf("Setting up test…\n")
    s_sleep(100)

    local requests
    local start

    printf("Synchronous round-trip test…\n")
    requests = 10000
    start = s_clock()
    for n=1,requests do
        local msg = zmsg.new("HELLO")
        msg:send(client)
        msg = zmsg.recv(client)
    end
    printf(" %d calls/second\n",
        (1000 * requests) / (s_clock() - start))

    printf("Asynchronous round-trip test…\n")
    requests = 100000
    start = s_clock()
    for n=1,requests do
        local msg = zmsg.new("HELLO")
        msg:send(client)
    end
    for n=1,requests do
        local msg = zmsg.recv(client)
    end
    printf(" %d calls/second\n",
        (1000 * requests) / (s_clock() - start))

    client:close()
    context:term()
]]

local worker_task = common_code .. [[
    local context = zmq.init(1)
    local worker = context:socket(zmq.XREQ)
    worker:setopt(zmq.IDENTITY, "W", 1)
    worker:connect("tcp://localhost:5556")

    while true do
        local msg = zmsg.recv(worker)
        msg:send(worker)
    end
    worker:close()
    context:term()
]]

local broker_task = common_code .. [[
    --  Prepare our context and sockets
    local context = zmq.init(1)
    local frontend = context:socket(zmq.XREP)
    local backend  = context:socket(zmq.XREP)
    frontend:bind("tcp://*:5555")
    backend:bind("tcp://*:5556")

    require"zmq.poller"
    local poller = zmq.poller(2)
    poller:add(frontend, zmq.POLLIN, function()
        local msg = zmsg.recv(frontend)
        --msg[1] = "W"
        msg:pop()
        msg:push("W")
        msg:send(backend)
    end)
    poller:add(backend, zmq.POLLIN, function()
        local msg = zmsg.recv(backend)
        --msg[1] = "C"
        msg:pop()
        msg:push("C")
        msg:send(frontend)
    end)
    poller:start()
    frontend:close()
    backend:close()
    context:term()
]]

s_version_assert(2, 1)

local client = zmq.threads.runstring(nil, client_task)
assert(client:start())
local worker = zmq.threads.runstring(nil, worker_task)
assert(worker:start(true))
local broker = zmq.threads.runstring(nil, broker_task)
assert(broker:start(true))

assert(client:join())


如此这般,效能倒是大大降低了(官网说法是降了近20倍),分析了下原因,由于异步需要管理各条任务,不断轮询之类的原因,反倒降低了性能,那么I/O的异步呢?

异步的客户端api:
local setmetatable = setmetatable

local mdp = require"mdp"

local zmq = require"zmq"
local zpoller = require"zmq.poller"
local zmsg = require"zmsg"
require"zhelpers"

local s_version_assert = s_version_assert

local obj_mt = {}
obj_mt.__index = obj_mt

function obj_mt:set_timeout(timeout)
    self.timeout = timeout
end

function obj_mt:destroy()
    if self.client then self.client:close() end
    self.context:term()
end

local function s_mdcli_connect_to_broker(self)
    -- close old socket.
    if self.client then
        self.poller:remove(self.client)
        self.client:close()
    end
    self.client = assert(self.context:socket(zmq.XREQ))
    assert(self.client:setopt(zmq.LINGER, 0))
    assert(self.client:connect(self.broker))
    if self.verbose then
        s_console("I: connecting to broker at %s…", self.broker)
    end
    -- add socket to poller
    self.poller:add(self.client, zmq.POLLIN, function()
        self.got_reply = true
    end)
end

--
-- Send request to broker and get reply by hook or crook
--
function obj_mt:send(service, request)
    -- Prefix request with protocol frames
    -- Frame 0: empty (REQ emulation)
    -- Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
    -- Frame 2: Service name (printable string)
    request:push(service)
    request:push(mdp.MDPC_CLIENT)
    request:push("")
    if self.verbose then
        s_console("I: send request to '%s' service:", service)
        request:dump()
    end
    request:send(self.client)
    return 0
end

--  Returns the reply message or NULL if there was no reply. Does not
--  attempt to recover from a broker failure, this is not possible
--  without storing all unanswered requests and resending them all…
function obj_mt:recv()
    self.got_reply = false

    local cnt = assert(self.poller:poll(self.timeout * 1000))
    if cnt ~= 0 and self.got_reply then
        local msg = zmsg.recv(self.client)
        if self.verbose then
            s_console("I: received reply:")
            msg:dump()
        end
        assert(msg:parts() >= 3)

        local empty = msg:pop()
        assert(empty == "")

        local header = msg:pop()
        assert(header == mdp.MDPC_CLIENT)

        return msg
    end
    if self.verbose then
        s_console("W: permanent error, abandoning request")
    end
    return nil -- Giving up
end

module(…)

function new(broker, verbose)
    s_version_assert (2, 1);
    local self = setmetatable({
        context = zmq.init(1),
        poller = zpoller.new(1),
        broker = broker,
        verbose = verbose,
        timeout = 2500, -- msecs
    }, obj_mt)

    s_mdcli_connect_to_broker(self)
    return self
end

setmetatable(_M, { __call = function(self, …) return new(…) end })


异步的客户端:
require"mdcliapi2"
require"zmsg"
require"zhelpers"

local verbose = (arg[1] == "-v")
local session = mdcliapi2.new("tcp://localhost:5555", verbose)

local count=100000
for n=1,count do
    local request = zmsg.new("Hello world")
    session:send("echo", request)
end
for n=1,count do
    local reply = session:recv()
    if not reply then
        break   --  Interrupted by Ctrl-C
    end
end
printf("%d replies received\n", count)
session:destroy()


当当当当:
$ time mdclient
同步的:
real    0m14.088s
user    0m1.310s
sys     0m2.670s
异步的:
real    0m8.730s
user    0m0.920s
sys     0m1.550s
10个服务端的异步:
real    0m3.863s
user    0m0.730s
sys     0m0.470s

经过测试,4核的话起8个服务端就算饱和了。不过,就效率而言,应该是足够了。
值得注意到是,"异步管家模式"并非全能。由于他没有做管家的连接重试,所以一旦“管家”崩溃了,那自然一切都say goodbye了。

为了服务更靠谱,或许还需要一个叫做"发现服务"的系统,来确认到底有哪些服务可用。
require"mdcliapi"
require"zmsg"
require"zhelpers"

local verbose = (arg[1] == "-v")
local session = mdcliapi.new("tcp://localhost:5555", verbose)

--  This is the service we want to look up
local request = zmsg.new("echo")

--  This is the service we send our request to
local reply = session:send("mmi.service", request)

if (reply) then
    printf ("Lookup echo service: %s\n", reply:body())
else
    printf ("E: no response from broker, make sure it's running\n")
end

session:destroy()



注:以上皆为lua代码

(未完待续)
0
1
分享到:
评论

相关推荐

    zeromq-2.1.7.tar.gz

    总的来说,zeromq-2.1.7提供了在Linux环境下进行高效、可靠的消息传递功能,是构建大规模分布式系统的一个重要工具。虽然这是一个较旧的版本,但在某些特定场景下,旧版本可能更稳定,更适合于已知的工作负载和环境...

    zeromq-3.2.5.tar.gz、jzmq.tar.gz、Python-2.6.6.tar.bz2、storm-0.8.0.zip下载

    这里提供的四个压缩包文件,Python-2.6.6.tar.bz2、zeromq-3.2.5.tar.gz、jzmq.tar.gz以及storm-0.8.0.zip,都是与Storm搭建和运行相关的资源。 首先,我们来详细了解一下每个文件的作用: 1. **Python-2.6.6.tar....

    zeromq-4.0.3.tar.gz.zip

    zeromq-4.0.3.tar.gz 是一个包含了 ZeroMQ 4.0.3 版本源代码的压缩文件。ZeroMQ,也被称为“零消息队列”或“0MQ”,是一个开源的消息中间件,它提供了一种高效、灵活且可扩展的方式来处理分布式系统中的数据通信。...

    zeromq-4.2.3.tar.gz

    zeromq-4.2.3.tar.gz 是ZeroMQ 4.2.3版本的源代码包,这个稳定版本确保了良好的兼容性和可靠性。 首先,让我们深入了解ZeroMQ的核心概念和功能: 1. **套接字(Sockets)**:在ZeroMQ中,套接字不仅仅是传统网络...

    zeromq-4.1.8.tar.gz

    标题中的"zeromq-4.1.8.tar.gz"指的是ZeroMQ的4.1.8版本的源代码包,通常以tar.gz格式压缩,这是一种在Linux和类Unix系统中常见的归档和压缩方式。 zeromq的核心特性包括点对点连接、发布/订阅模式、请求/响应模式...

    zeromq-2.1.9.tar.gz

    `zeromq-2.1.9.tar.gz` 是zeromq的一个特定版本,即2.1.9版,通常以源码形式提供,需要通过编译来安装。 首先,让我们深入了解zeromq的核心概念。zeromq设计了一个灵活的套接字模型,它允许开发者构建复杂的网络...

    zeromq-4.0.5-4.el7.x86_64.rpm

    官方离线安装包,测试可用。使用rpm -ivh [rpm完整包名] 进行安装

    zeromq-4.1.3.tar.gz

    zeromq-4.1.3 是一个针对 ZeroMQ 的特定版本的开发工具包,ZeroMQ 是一个开源的消息中间件库,它为分布式计算提供了一种高性能、轻量级的通信框架。ZeroMQ 提供了多种编程语言的绑定,使得开发者能够方便地在不同的...

    zeromq-4.3.4.tar.gz

    0MQ(也称为 ZeroMQ 或 ØMQ)是一个开源的消息中间件,它提供了一种轻量级、高性能的异步消息...通过使用“zeromq-4.3.4.tar.gz”,你可以享受到这个版本带来的稳定性和优化,从而更高效地实现跨进程、跨网络的通信。

    zeromq-4.3.4.zip

    - **错误修复**:修复了前一版本中已知的bug,增强了系统的稳定性和可靠性。 - **新功能**:可能添加了新的API或功能,以满足更广泛的需求。 - **安全性增强**:可能加强了安全措施,比如加密传输或身份验证机制的...

    zeromq-4.2.0.tar.gz源码包

    在zeromq-4.2.0源码包中,你可以找到以下主要组成部分: 1. **源代码**:包含了zeromq的核心库和各种语言的绑定。核心库通常用C++编写,提供了跨平台的API,而绑定则允许开发者使用Python、Java、C#等其他语言与...

    zeromq-4.2.0.tar.zip

    标题中的"zeromq-4.2.0.tar.zip"是指ZeroMQ库的4.2.0版本,它被封装在一个ZIP压缩包中,而内部包含的文件是tar归档格式。ZeroMQ是一个开源的消息中间件,它提供了一个高级的消息队列模型,允许应用程序之间进行高效...

    zeromq-3.2.5.tar.gz

    ZeroMQ是一个网络通讯库,其主要用来为分布式应用程序开发提供进程间通信(此处的进程既可以是同一台机器上的两个进程也可以是不同机器上的两个进程)。ZeroMQ的特点在于灵活的通信手段和丰富的连接模型,并且它可以...

    zeromq-4.1.4.tar.gz

    ZeroMQ是一个网络通讯库,其主要用来为分布式应用程序开发提供进程间通信(此处的进程既可以是同一台机器上的两个进程也可以是不同机器上的两个进程)。ZeroMQ的特点在于灵活的通信手段和丰富的连接模型,并且它可以...

    zeromq-4.1.2.tar.gz

    ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。

    zeromq-4.3.2.tar.gz

    总的来说,zeromq-4.3.2是一个强大且可靠的工具,它简化了网络编程,提高了应用的性能和效率。对于需要进行高效、灵活消息传递的开发者来说,它是一个理想的选择。通过合理利用其特性,开发者可以构建出高并发、可...

    zeromq-4.1.0-rc1.zip

    这个“zeromq-4.1.0-rc1.zip”压缩包包含了ZeroMQ 4.1.0版本的源代码,这是一个预发布版本(Release Candidate),意味着它是正式版发布前的最后一个测试版本。 ZeroMQ的核心概念是提供一种抽象的网络通信层,允许...

    zeromq-3.12.5.zip

    zeromq-3.12.5.tar.gz, libzmq-3.1.2.tar.gz 在Linux环境中,构建和部署分布式计算系统时,Storm是一个常用的选择,它是一个开源的流处理框架,用于实时数据处理。这个压缩包"zeromq-3.12.5.zip"包含了与Storm集群...

    zeromq-4.0.1.tar.gz

    这个“zeromq-4.0.1.tar.gz”文件是ZeroMQ的4.0.1版本源代码包,适用于那些需要在网络通信、并发处理或构建微服务架构的开发者。由于从官方网站下载可能速度较慢,此压缩包提供了方便的下载渠道。 ZeroMQ的核心特性...

    zeromq-3.2.4.tar.gz

    - 可靠性:支持消息确认和重试机制,确保消息的可靠传输。 - 负载均衡:发布/订阅模式下,消息可以广播给所有订阅者,而推送/拉取模式则可以实现负载均衡。 总的来说,zeromq是一个功能强大、性能优异的开源消息...

Global site tag (gtag.js) - Google Analytics