`
iyuan
  • 浏览: 471869 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

zeroMQ初体验-30.发布/订阅模式进阶-自裁的蜗牛订阅者

    博客分类:
  • MQ
阅读更多
在初次介绍发布/订阅模式的时候,就已经抖出了这个包袱:如果订阅者的消费速度慢,则会造成发布者端队列堆积,怎么办?本篇即是针对可能出现的"蜗牛"般的订阅者而生。

通常的做法:
在发布端用靠谱的队列承接来不及被消费的信息,这样会增大发布端的压力。
在订阅端用靠谱的队列承接来不及消费的信息,压力转嫁给各订阅端。
与前面相似,在队列中设置阈值,溢出则不收录。
压迫订阅端,当发布端发现订阅端过慢,给予惩罚性质的断开连接。

虽然上述4种方案都还算经典,不过总有欠妥之处。最好的方法莫过于让订阅者明白自个儿能力不足,作出主动性措施,比如:暂时性断开,优化了再来~

至于如何让订阅端检测是否消费能力不足,只需在订阅端设置一个有阈值的队列缓存数据,发布端给所有数据打上标号,如果订阅端读到“断号”,即可认定有数据超出阈值被舍弃了,那么,嘿嘿,主动整改吧。

虽然说中断订阅连接不是太理想,不过,总比不可知,不可控的一味流转数据要可靠的多,只少,都还掌控在自己手中~

//
//  Suicidal Snail
//
#include "czmq.h"

//  ---------------------------------------------------------------------
//  This is our subscriber
//  It connects to the publisher and subscribes to everything. It
//  sleeps for a short time between messages to simulate doing too
//  much work. If a message is more than 1 second late, it croaks.

#define MAX_ALLOWED_DELAY   1000    //  msecs

static void
subscriber (void *args, zctx_t *ctx, void *pipe)
{
    //  Subscribe to everything
    void *subscriber = zsocket_new (ctx, ZMQ_SUB);
    zsocket_connect (subscriber, "tcp://localhost:5556");

    //  Get and process messages
    while (1) {
        char *string = zstr_recv (subscriber);
        int64_t clock;
        int terms = sscanf (string, "%" PRId64, &clock);
        assert (terms == 1);
        free (string);

        //  Suicide snail logic
        if (zclock_time () - clock > MAX_ALLOWED_DELAY) {
            fprintf (stderr, "E: subscriber cannot keep up, aborting\n");
            break;
        }
        //  Work for 1 msec plus some random additional time
        zclock_sleep (1 + randof (2));
    }
    zstr_send (pipe, "gone and died");
}

//  ---------------------------------------------------------------------
//  This is our server task
//  It publishes a time-stamped message to its pub socket every 1ms.

static void
publisher (void *args, zctx_t *ctx, void *pipe)
{
    //  Prepare publisher
    void *publisher = zsocket_new (ctx, ZMQ_PUB);
    zsocket_bind (publisher, "tcp://*:5556");

    while (1) {
        //  Send current clock (msecs) to subscribers
        char string [20];
        sprintf (string, "%" PRId64, zclock_time ());
        zstr_send (publisher, string);
        char *signal = zstr_recv_nowait (pipe);
        if (signal) {
            free (signal);
            break;
        }
        zclock_sleep (1);            //  1msec wait
    }
}

//  This main thread simply starts a client, and a server, and then
//  waits for the client to signal it's died.
//
int main (void)
{
    zctx_t *ctx = zctx_new ();
    void *pubpipe = zthread_fork (ctx, publisher, NULL);
    void *subpipe = zthread_fork (ctx, subscriber, NULL);
    free (zstr_recv (subpipe));
    zstr_send (pubpipe, "break");
    zclock_sleep (100);
    zctx_destroy (&ctx);
    return 0;
}

注意:
这里并未用到数据编号,而是打上了时间戳来判定数据是否延迟到不可接受的地步了,如何自我判定,方法万千,择优即可。

(未完待续)
0
0
分享到:
评论

相关推荐

    zeromq-2.1.7.tar.gz

    ZeroMQ支持多种协议,如PUB/SUB(发布/订阅)、REQ/REP(请求/响应)和PUSH/PULL(推送/拉取),这些模式提供了灵活的消息传递模型。此外,它还提供了一些高级特性,比如负载均衡、故障恢复和多路复用。 在实际使用...

    zeromq-4.0.3.tar.gz.zip

    6. **分布式**:ZeroMQ 支持多对多的发布/订阅模式,使得构建分布式系统更加容易。 zeromq-4.0.3.tar.gz 文件本身是一个 tar 归档,通常用于在 Unix-like 系统上打包和存储文件。归档后,文件被压缩为 .gz 格式,这...

    zeromq-4.3.4.tar.gz

    这个“zeromq-4.3.4.tar.gz”文件是0MQ库的4.3.4稳定版本,发布于2021年1月17日。下面我们将深入探讨0MQ的核心特性、主要功能以及如何使用这一版本。 1. **0MQ简介** - 0MQ不是一个传统的消息队列系统,而是一种在...

    zeromq-2.1.9.tar.gz

    这些套接字支持多种消息模式,包括请求/响应、发布/订阅、推送/拉取以及对等模式,使得开发者可以轻松地处理点对点、一对多和多对多的通信场景。 zeromq的安装过程一般包括解压、配置、编译和安装四个步骤: 1. **...

    zeromq-3.2.5.tar.gz、jzmq.tar.gz、Python-2.6.6.tar.bz2、storm-0.8.0.zip下载

    这里提供的四个压缩包文件,Python-2.6.6.tar.bz2、zeromq-3.2.5.tar.gz、jzmq.tar.gz以及storm-0.8.0.zip,都是与Storm搭建和运行相关的资源。 首先,我们来详细了解一下每个文件的作用: 1. **Python-2.6.6.tar....

    zeromq-4.1.8.tar.gz

    zeromq的核心特性包括点对点连接、发布/订阅模式、请求/响应模式以及推拉模式,这些模式为各种通信场景提供了基础。4.1.8版本可能包含了一些错误修复、性能提升或者新功能的添加,具体更新内容可以在其官方 change...

    zeromq-4.2.3.tar.gz

    1. **套接字(Sockets)**:在ZeroMQ中,套接字不仅仅是传统网络编程中的概念,它们提供了多种模式,如PUB(发布者)、SUB(订阅者)、REQ(请求者)、REP(响应者)、DEALER(经销商)和ROUTER(路由器),这些模式...

    zeromq-4.1.3.tar.gz

    1. **消息队列模型**:ZeroMQ 提供了多种消息模式,如 Pub/Sub(发布/订阅)、Req/Rep(请求/响应)、PUSH/PULL(推/拉)和 XPUB/XSUB(扩展发布/订阅)。这些模式覆盖了各种通信场景,让开发者能够灵活选择最适合其...

    zeromq-4.0.5-4.el7.x86_64.rpm

    官方离线安装包,测试可用。使用rpm -ivh [rpm完整包名] 进行安装

    zeromq-4.3.4.zip

    3. **模式丰富**:支持多种消息模式,如发布/订阅、请求/应答、推送/拉取和一对多等,这些模式可以灵活地组合以适应不同的应用场景。 4. **高可用性**:具有消息持久化和负载均衡功能,确保在故障情况下仍能保持...

    zeromq-4.2.0.tar.gz源码包

    zeromq的设计基于发布/订阅(Pub/Sub)、请求/应答(Req/Rep)和推送/拉取(Push/Pull)等经典的消息模式。这些模式覆盖了各种常见的分布式系统通信场景,如服务间调用、事件驱动架构、工作队列等。通过这些模式,...

    zeromq-4.2.0.tar.zip

    ZeroMQ的核心概念是Socket(套接字),它提供了类似于网络编程中的套接字接口,但增加了许多高级功能,比如消息队列、负载均衡、高可用性和订阅/发布模式等。这些特性使得ZeroMQ成为构建分布式系统、微服务架构和...

    zeromq-3.2.4.tar.gz

    - 负载均衡:发布/订阅模式下,消息可以广播给所有订阅者,而推送/拉取模式则可以实现负载均衡。 总的来说,zeromq是一个功能强大、性能优异的开源消息中间件,对于需要高效、可靠通信的分布式系统,如storm这样的...

    zeromq-4.1.0-rc1.zip

    这个“zeromq-4.1.0-rc1.zip”压缩包包含了ZeroMQ 4.1.0版本的源代码,这是一个预发布版本(Release Candidate),意味着它是正式版发布前的最后一个测试版本。 ZeroMQ的核心概念是提供一种抽象的网络通信层,允许...

    zeromq-4.3.2.tar.gz

    首先,zeromq支持多种协议,包括TCP、IPC(进程间通信)、PUB/SUB(发布/订阅)、REQ/REP(请求/响应)、PAIR(对等)和DEALER/ROUTER(经销商/路由器)模式。这些模式为不同场景提供了灵活的选择,比如PUB/SUB模式...

    zeromq-4.1.2.tar.gz

    ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。

    zeromq-3.2.5.tar.gz

    ZeroMQ是一个网络通讯库,其主要用来为分布式应用程序开发提供进程间通信(此处的进程既可以是同一台机器上的两个进程也可以是不同机器上的两个进程)。ZeroMQ的特点在于灵活的通信手段和丰富的连接模型,并且它可以...

    zeromq-4.1.4.tar.gz

    ZeroMQ是一个网络通讯库,其主要用来为分布式应用程序开发提供进程间通信(此处的进程既可以是同一台机器上的两个进程也可以是不同机器上的两个进程)。ZeroMQ的特点在于灵活的通信手段和丰富的连接模型,并且它可以...

    zeromq-3.12.5.zip

    在Storm中,零MQ作为数据传输的基础,它支持发布/订阅、请求/响应以及推拉等多种消息模式,使得Storm能够有效地处理实时数据流并确保数据的可靠传输。版本3.12.5是该库的一个稳定版本,可能包含了一些性能优化和bug...

    zeromq-4.0.1.tar.gz

    2. **模式丰富**:支持多种消息模式,如发布/订阅(Publish/Subscribe)、请求/响应(Request/Reply)、推拉(Push/Pull)以及管道对(Pair)。这些模式可以灵活地适应不同的应用场景。 3. **高并发**:ZeroMQ利用I...

Global site tag (gtag.js) - Google Analytics