`
iyuan
  • 浏览: 469425 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

zeroMQ初体验-25.可靠性-偏执的海盗模式

    博客分类:
  • MQ
阅读更多
虽然说“简单的海盗模式”已经非常靠谱了,不过瑕疵还是有不少的。比如说,中间件队列并不监控后端的worker死活,至少会有一次丢包来确定那个worker已经不在了(虽然问题不大,但终究不爽)。而在“偏执的”模式中,有对“简单”模式做了一些扩展:


Queue:
require"zmq"
require"zmq.poller"
require"zmsg"

local MAX_WORKERS          = 100
local HEARTBEAT_LIVENESS   = 3       --  3-5 is reasonable
local HEARTBEAT_INTERVAL   = 1000    --  msecs

local tremove = table.remove

--  Insert worker at end of queue, reset expiry
--  Worker must not already be in queue
local function s_worker_append(queue, identity)
    if queue[identity] then
        printf ("E: duplicate worker identity %s", identity)
    else
        assert (#queue < MAX_WORKERS)
        queue[identity] = s_clock() + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS
        queue[#queue + 1] = identity
    end
end
--  Remove worker from queue, if present
local function s_worker_delete(queue, identity)
    for i=1,#queue do
        if queue == identity then
            tremove(queue, i)
            break
        end
    end
    queue[identity] = nil
end
--  Reset worker expiry, worker must be present
local function s_worker_refresh(queue, identity)
    if queue[identity] then
        queue[identity] = s_clock() + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS
    else
        printf("E: worker %s not ready\n", identity)
    end
end
--  Pop next available worker off queue, return identity
local function s_worker_dequeue(queue)
    assert (#queue > 0)
    local identity = tremove(queue, 1)
    queue[identity] = nil
    return identity
end
--  Look for & kill expired workers
local function s_queue_purge(queue)
    local curr_clock = s_clock()
    --  Work backwards from end to simplify removal
    for i=#queue,1,-1 do
        local id = queue
        if (curr_clock > queue[id]) then
            tremove(queue, i)
            queue[id] = nil
        end
    end
end
s_version_assert (2, 1)

--  Prepare our context and sockets
local context = zmq.init(1)
local frontend = context:socket(zmq.XREP)
local backend  = context:socket(zmq.XREP)
frontend:bind("tcp://*:5555");    --  For clients
backend:bind("tcp://*:5556");    --  For workers

--  Queue of available workers
local queue = {}
local is_accepting = false

--  Send out heartbeats at regular intervals
local heartbeat_at = s_clock() + HEARTBEAT_INTERVAL

local poller = zmq.poller(2)

local function frontend_cb()
    --  Now get next client request, route to next worker
    local msg = zmsg.recv(frontend)
    local identity = s_worker_dequeue (queue)
    msg:push(identity)
    msg:send(backend)

    if (#queue == 0) then
        -- stop accepting work from clients, when no workers are available.
        poller:remove(frontend)
        is_accepting = false
    end
end

--  Handle worker activity on backend
poller:add(backend, zmq.POLLIN, function()
    local msg = zmsg.recv(backend)
    local identity = msg:unwrap()

    --  Return reply to client if it's not a control message
    if (msg:parts() == 1) then
        if (msg:address() == "READY") then
            s_worker_delete(queue, identity)
            s_worker_append(queue, identity)
        elseif (msg:address() == "HEARTBEAT") then
            s_worker_refresh(queue, identity)
        else
            printf("E: invalid message from %s\n", identity)
            msg:dump()
        end
    else
        -- reply for client.
        msg:send(frontend)
        s_worker_append(queue, identity)
    end

    -- start accepting client requests, if we are not already doing so.
    if not is_accepting and #queue > 0 then
        is_accepting = true
        poller:add(frontend, zmq.POLLIN, frontend_cb)
    end
end)

-- start poller's event loop
while true do
    local cnt = assert(poller:poll(HEARTBEAT_INTERVAL * 1000))
    --  Send heartbeats to idle workers if it's time
    if (s_clock() > heartbeat_at) then
        for i=1,#queue do
            local msg = zmsg.new("HEARTBEAT")
            msg:wrap(queue, nil)
            msg:send(backend)
        end
        heartbeat_at = s_clock() + HEARTBEAT_INTERVAL
    end
    s_queue_purge(queue)
end

--  We never exit the main loop
--  But pretend to do the right shutdown anyhow
while (#queue > [[span style="color:#666666"]]0) [[span style="color:#008000"]]do
    s_worker_dequeue(queue)
[[span style="color:#008000"]]end

frontend:close()
backend:close()

worker:
require"zmq"
require"zmq.poller"
require"zmsg"

local HEARTBEAT_LIVENESS   = 3       --  3-5 is reasonable
local HEARTBEAT_INTERVAL   = 1000    --  msecs
local INTERVAL_INIT        = 1000    --  Initial reconnect
local INTERVAL_MAX        = 32000    --  After exponential backoff

--  Helper function that returns a new configured socket
--  connected to the Hello World server
--
local identity

local function s_worker_socket (context)
    local worker = context:socket(zmq.XREQ)

    --  Set random identity to make tracing easier
    identity = string.format("%04X-%04X", randof (0x10000), randof (0x10000))
    worker:setopt(zmq.IDENTITY, identity)
    worker:connect("tcp://localhost:5556")

    --  Configure socket to not wait at close time
    worker:setopt(zmq.LINGER, 0)

    --  Tell queue we're ready for work
    printf("I: (%s) worker ready\n", identity)
    worker:send("READY")

    return worker
end

s_version_assert (2, 1)
math.randomseed(os.time())

local context = zmq.init(1)
local worker = s_worker_socket (context)

--  If liveness hits zero, queue is considered disconnected
local liveness = HEARTBEAT_LIVENESS
local interval = INTERVAL_INIT

--  Send out heartbeats at regular intervals
local heartbeat_at = s_clock () + HEARTBEAT_INTERVAL

local poller = zmq.poller(1)

local is_running = true

local cycles = 0
local function worker_cb()
    --  Get message
    --  - 3-part envelope + content -> request
    --  - 1-part "HEARTBEAT" -> heartbeat
    local msg = zmsg.recv (worker)

    if (msg:parts() == 3) then
        --  Simulate various problems, after a few cycles
        cycles = cycles + 1
        if (cycles > 3 and randof (5) == 0) then
            printf ("I: (%s) simulating a crash\n", identity)
            is_running = false
            return
        elseif (cycles > 3 and randof (5) == 0) then
            printf ("I: (%s) simulating CPU overload\n",
                identity)
            s_sleep (5000)
        end
        printf ("I: (%s) normal reply - %s\n",
            identity, msg:body())
        msg:send(worker)
        liveness = HEARTBEAT_LIVENESS
        s_sleep(1000);           --  Do some heavy work
    elseif (msg:parts() == 1 and msg:body() == "HEARTBEAT") then
        liveness = HEARTBEAT_LIVENESS
    else
        printf ("E: (%s) invalid message\n", identity)
        msg:dump()
    end
    interval = INTERVAL_INIT
end
poller:add(worker, zmq.POLLIN, worker_cb)

while is_running do
    local cnt = assert(poller:poll(HEARTBEAT_INTERVAL * 1000))

    if (cnt == 0) then
        liveness = liveness - 1
        if (liveness == 0) then
            printf ("W: (%s) heartbeat failure, can't reach queue\n",
                identity)
            printf ("W: (%s) reconnecting in %d msec…\n",
                identity, interval)
            s_sleep (interval)
    
            if (interval < INTERVAL_MAX) then
                interval = interval * 2
            end
            poller:remove(worker)
            worker:close()
            worker = s_worker_socket (context)
            poller:add(worker, zmq.POLLIN, worker_cb)
            liveness = HEARTBEAT_LIVENESS
        end
    end
    --  Send heartbeat to queue if it's time
    if (s_clock () > heartbeat_at) then
        heartbeat_at = s_clock () + HEARTBEAT_INTERVAL
        printf("I: (%s) worker heartbeat\n", identity)
        worker:send("HEARTBEAT")
    end
end
worker:close()
context:term()

注意:这里的是lua代码

其实从模式图中已经可以看出,系统中多了“心跳”环节,来确认链路的可用性。

关于心跳模块,着实比较棘手,也算是代码中的重头了。关于做“心跳”的策略,关键是要把握好时间间隔,以避免过载或者失效。通常的,也不建议在持久化的连接上加入心跳机制。

这里应当注意到,“偏执”模式与“简单”模式并不兼容--因为心跳机制。
为了避免混乱。 rfc.zeromq.org这儿有一些协议的声明,帮助你至少不需要去看现有的代码来确定是否兼容新的东东~

(未完待续)
分享到:
评论

相关推荐

    zeromq-2.1.7.tar.gz

    总的来说,zeromq-2.1.7提供了在Linux环境下进行高效、可靠的消息传递功能,是构建大规模分布式系统的一个重要工具。虽然这是一个较旧的版本,但在某些特定场景下,旧版本可能更稳定,更适合于已知的工作负载和环境...

    zeromq-3.2.5.tar.gz、jzmq.tar.gz、Python-2.6.6.tar.bz2、storm-0.8.0.zip下载

    这里提供的四个压缩包文件,Python-2.6.6.tar.bz2、zeromq-3.2.5.tar.gz、jzmq.tar.gz以及storm-0.8.0.zip,都是与Storm搭建和运行相关的资源。 首先,我们来详细了解一下每个文件的作用: 1. **Python-2.6.6.tar....

    zeromq-4.0.3.tar.gz.zip

    zeromq-4.0.3.tar.gz 是一个包含了 ZeroMQ 4.0.3 版本源代码的压缩文件。ZeroMQ,也被称为“零消息队列”或“0MQ”,是一个开源的消息中间件,它提供了一种高效、灵活且可扩展的方式来处理分布式系统中的数据通信。...

    zeromq-4.2.3.tar.gz

    zeromq-4.2.3.tar.gz 是ZeroMQ 4.2.3版本的源代码包,这个稳定版本确保了良好的兼容性和可靠性。 首先,让我们深入了解ZeroMQ的核心概念和功能: 1. **套接字(Sockets)**:在ZeroMQ中,套接字不仅仅是传统网络...

    zeromq-4.1.8.tar.gz

    标题中的"zeromq-4.1.8.tar.gz"指的是ZeroMQ的4.1.8版本的源代码包,通常以tar.gz格式压缩,这是一种在Linux和类Unix系统中常见的归档和压缩方式。 zeromq的核心特性包括点对点连接、发布/订阅模式、请求/响应模式...

    zeromq-2.1.9.tar.gz

    `zeromq-2.1.9.tar.gz` 是zeromq的一个特定版本,即2.1.9版,通常以源码形式提供,需要通过编译来安装。 首先,让我们深入了解zeromq的核心概念。zeromq设计了一个灵活的套接字模型,它允许开发者构建复杂的网络...

    zeromq-4.3.4.zip

    - **错误修复**:修复了前一版本中已知的bug,增强了系统的稳定性和可靠性。 - **新功能**:可能添加了新的API或功能,以满足更广泛的需求。 - **安全性增强**:可能加强了安全措施,比如加密传输或身份验证机制的...

    zeromq-4.0.5-4.el7.x86_64.rpm

    官方离线安装包,测试可用。使用rpm -ivh [rpm完整包名] 进行安装

    zeromq-4.1.3.tar.gz

    zeromq-4.1.3 是一个针对 ZeroMQ 的特定版本的开发工具包,ZeroMQ 是一个开源的消息中间件库,它为分布式计算提供了一种高性能、轻量级的通信框架。ZeroMQ 提供了多种编程语言的绑定,使得开发者能够方便地在不同的...

    zeromq-4.3.4.tar.gz

    0MQ(也称为 ZeroMQ 或 ØMQ)是一个开源的消息中间件,它提供了一种轻量级、高性能的异步消息...通过使用“zeromq-4.3.4.tar.gz”,你可以享受到这个版本带来的稳定性和优化,从而更高效地实现跨进程、跨网络的通信。

    zeromq-4.2.0.tar.gz源码包

    在zeromq-4.2.0源码包中,你可以找到以下主要组成部分: 1. **源代码**:包含了zeromq的核心库和各种语言的绑定。核心库通常用C++编写,提供了跨平台的API,而绑定则允许开发者使用Python、Java、C#等其他语言与...

    zeromq-4.2.0.tar.zip

    标题中的"zeromq-4.2.0.tar.zip"是指ZeroMQ库的4.2.0版本,它被封装在一个ZIP压缩包中,而内部包含的文件是tar归档格式。ZeroMQ是一个开源的消息中间件,它提供了一个高级的消息队列模型,允许应用程序之间进行高效...

    zeromq-3.2.5.tar.gz

    ZeroMQ是一个网络通讯库,其主要用来为分布式应用程序开发提供进程间通信(此处的进程既可以是同一台机器上的两个进程也可以是不同机器上的两个进程)。ZeroMQ的特点在于灵活的通信手段和丰富的连接模型,并且它可以...

    zeromq-4.1.4.tar.gz

    ZeroMQ是一个网络通讯库,其主要用来为分布式应用程序开发提供进程间通信(此处的进程既可以是同一台机器上的两个进程也可以是不同机器上的两个进程)。ZeroMQ的特点在于灵活的通信手段和丰富的连接模型,并且它可以...

    zeromq-4.1.2.tar.gz

    ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。

    zeromq-4.3.2.tar.gz

    总的来说,zeromq-4.3.2是一个强大且可靠的工具,它简化了网络编程,提高了应用的性能和效率。对于需要进行高效、灵活消息传递的开发者来说,它是一个理想的选择。通过合理利用其特性,开发者可以构建出高并发、可...

    zeromq-4.1.0-rc1.zip

    这个“zeromq-4.1.0-rc1.zip”压缩包包含了ZeroMQ 4.1.0版本的源代码,这是一个预发布版本(Release Candidate),意味着它是正式版发布前的最后一个测试版本。 ZeroMQ的核心概念是提供一种抽象的网络通信层,允许...

    zeromq-3.12.5.zip

    zeromq-3.12.5.tar.gz, libzmq-3.1.2.tar.gz 在Linux环境中,构建和部署分布式计算系统时,Storm是一个常用的选择,它是一个开源的流处理框架,用于实时数据处理。这个压缩包"zeromq-3.12.5.zip"包含了与Storm集群...

    zeromq-4.0.1.tar.gz

    这个“zeromq-4.0.1.tar.gz”文件是ZeroMQ的4.0.1版本源代码包,适用于那些需要在网络通信、并发处理或构建微服务架构的开发者。由于从官方网站下载可能速度较慢,此压缩包提供了方便的下载渠道。 ZeroMQ的核心特性...

    zeromq-3.2.4.tar.gz

    - 可靠性:支持消息确认和重试机制,确保消息的可靠传输。 - 负载均衡:发布/订阅模式下,消息可以广播给所有订阅者,而推送/拉取模式则可以实现负载均衡。 总的来说,zeromq是一个功能强大、性能优异的开源消息...

Global site tag (gtag.js) - Google Analytics