- 浏览: 471856 次
- 性别:
- 来自: 上海
文章分类
最新评论
-
kc_hxd_jp:
博主问个问题,这个篇幅下的python代码无法达到应有的作用, ...
zeroMQ初体验-14.命名机制 进阶 -
kobe1029:
Map<String, Object> args ...
rabbitmq 队列长度预设的曲线方案 -
Sasoritattoo:
LZ,这都13年了,抽空把这篇文章的下文给表完了吧,这一口气喘 ...
nginx + gridfs + mongodb 大事记(残) -
3GQQ2012:
引用前文已经说过,XREP其实用以平衡负载,所以这里由它对请求 ...
zeroMQ初体验-15.应答模式进阶(一)-数据的封装 -
iyuan:
ustclz 写道图片怎么显示不了了。。我这看是可以显示的。不 ...
zeroMQ初体验-1.简介及C/S模式
在之前的种种模式中,虽然各擅胜场,可是有一个根本性的问题:一旦数据在某个环节丢失了,那么它就真的丢失了(虽然会有种种重试机制)。而"硬盘模式"即是为了应对这种情况而出现的。
结构图:
是不是非常眼熟?只是对“管家模式”做了些许扩展,就又使得可靠性上升了一个台阶。
总的来说,本模式主要做了三件事:
1.获取请求,给出唯一标识
2.获取答复,按照唯一标识回复
3.回复成功,关闭数据
客户端:
中间件(主体):
注意:官网似乎并不推荐额外的模式存储(比如数据库、kevy-value什么的,原因与效率有关),你可以试试~
结构图:
是不是非常眼熟?只是对“管家模式”做了些许扩展,就又使得可靠性上升了一个台阶。
总的来说,本模式主要做了三件事:
1.获取请求,给出唯一标识
2.获取答复,按照唯一标识回复
3.回复成功,关闭数据
客户端:
// // Titanic client example // Implements client side of http://rfc.zeromq.org/spec:9 // Lets us build this source without creating a library #include "mdcliapi.c" // Calls a TSP service // Returns reponse if successful (status code 200 OK), else NULL // static zmsg_t * s_service_call (mdcli_t *session, char *service, zmsg_t **request_p) { zmsg_t *reply = mdcli_send (session, service, request_p); if (reply) { zframe_t *status = zmsg_pop (reply); if (zframe_streq (status, "200")) { zframe_destroy (&status); return reply; } else if (zframe_streq (status, "400")) { printf ("E: client fatal error, aborting\n"); exit (EXIT_FAILURE); } else if (zframe_streq (status, "500")) { printf ("E: server fatal error, aborting\n"); exit (EXIT_FAILURE); } } else exit (EXIT_SUCCESS); // Interrupted or failed zmsg_destroy (&reply); return NULL; // Didn't succeed, don't care why not } int main (int argc, char *argv []) { int verbose = (argc > 1 && streq (argv [1], "-v")); mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose); // 1. Send 'echo' request to Titanic zmsg_t *request = zmsg_new (); zmsg_addstr (request, "echo"); zmsg_addstr (request, "Hello world"); zmsg_t *reply = s_service_call ( session, "titanic.request", &request); zframe_t *uuid = NULL; if (reply) { uuid = zmsg_pop (reply); zmsg_destroy (&reply); zframe_print (uuid, "I: request UUID "); } // 2. Wait until we get a reply while (!zctx_interrupted) { zclock_sleep (100); request = zmsg_new (); zmsg_add (request, zframe_dup (uuid)); zmsg_t *reply = s_service_call ( session, "titanic.reply", &request); if (reply) { char *reply_string = zframe_strdup (zmsg_last (reply)); printf ("Reply: %s\n", reply_string); free (reply_string); zmsg_destroy (&reply); // 3. Close request request = zmsg_new (); zmsg_add (request, zframe_dup (uuid)); reply = s_service_call (session, "titanic.close", &request); zmsg_destroy (&reply); break; } else { printf ("I: no reply yet, trying again…\n"); zclock_sleep (5000); // Try again in 5 seconds } } zframe_destroy (&uuid); mdcli_destroy (&session); return 0; }
中间件(主体):
// // Titanic service // // Implements server side of http://rfc.zeromq.org/spec:9 // Lets us build this source without creating a library #include "mdwrkapi.c" #include "mdcliapi.c" #include "zfile.h" #include <uuid/uuid.h> // Return a new UUID as a printable character string // Caller must free returned string when finished with it static char * s_generate_uuid (void) { char hex_char [] = "0123456789ABCDEF"; char *uuidstr = zmalloc (sizeof (uuid_t) * 2 + 1); uuid_t uuid; uuid_generate (uuid); int byte_nbr; for (byte_nbr = 0; byte_nbr < sizeof (uuid_t); byte_nbr++) { uuidstr [byte_nbr * 2 + 0] = hex_char [uuid [byte_nbr] >> 4]; uuidstr [byte_nbr * 2 + 1] = hex_char [uuid [byte_nbr] & 15]; } return uuidstr; } // Returns freshly allocated request filename for given UUID #define TITANIC_DIR ".titanic" static char * s_request_filename (char *uuid) { char *filename = malloc (256); snprintf (filename, 256, TITANIC_DIR "/%s.req", uuid); return filename; } // Returns freshly allocated reply filename for given UUID static char * s_reply_filename (char *uuid) { char *filename = malloc (256); snprintf (filename, 256, TITANIC_DIR "/%s.rep", uuid); return filename; } // --------------------------------------------------------------------- // Titanic request service static void titanic_request (void *args, zctx_t *ctx, void *pipe) { mdwrk_t *worker = mdwrk_new ( "tcp://localhost:5555", "titanic.request", 0); zmsg_t *reply = NULL; while (TRUE) { // Send reply if it's not null // And then get next request from broker zmsg_t *request = mdwrk_recv (worker, &reply); if (!request) break; // Interrupted, exit // Ensure message directory exists file_mkdir (TITANIC_DIR); // Generate UUID and save message to disk char *uuid = s_generate_uuid (); char *filename = s_request_filename (uuid); FILE *file = fopen (filename, "w"); assert (file); zmsg_save (request, file); fclose (file); free (filename); zmsg_destroy (&request); // Send UUID through to message queue reply = zmsg_new (); zmsg_addstr (reply, uuid); zmsg_send (&reply, pipe); // Now send UUID back to client // Done by the mdwrk_recv() at the top of the loop reply = zmsg_new (); zmsg_addstr (reply, "200"); zmsg_addstr (reply, uuid); free (uuid); } mdwrk_destroy (&worker); } // --------------------------------------------------------------------- // Titanic reply service static void * titanic_reply (void *context) { mdwrk_t *worker = mdwrk_new ( "tcp://localhost:5555", "titanic.reply", 0); zmsg_t *reply = NULL; while (TRUE) { zmsg_t *request = mdwrk_recv (worker, &reply); if (!request) break; // Interrupted, exit char *uuid = zmsg_popstr (request); char *req_filename = s_request_filename (uuid); char *rep_filename = s_reply_filename (uuid); if (file_exists (rep_filename)) { FILE *file = fopen (rep_filename, "r"); assert (file); reply = zmsg_load (file); zmsg_pushstr (reply, "200"); fclose (file); } else { reply = zmsg_new (); if (file_exists (req_filename)) zmsg_pushstr (reply, "300"); //Pending else zmsg_pushstr (reply, "400"); //Unknown } zmsg_destroy (&request); free (uuid); free (req_filename); free (rep_filename); } mdwrk_destroy (&worker); return 0; } // --------------------------------------------------------------------- // Titanic close service static void * titanic_close (void *context) { mdwrk_t *worker = mdwrk_new ( "tcp://localhost:5555", "titanic.close", 0); zmsg_t *reply = NULL; while (TRUE) { zmsg_t *request = mdwrk_recv (worker, &reply); if (!request) break; // Interrupted, exit char *uuid = zmsg_popstr (request); char *req_filename = s_request_filename (uuid); char *rep_filename = s_reply_filename (uuid); file_delete (req_filename); file_delete (rep_filename); free (uuid); free (req_filename); free (rep_filename); zmsg_destroy (&request); reply = zmsg_new (); zmsg_addstr (reply, "200"); } mdwrk_destroy (&worker); return 0; } // Attempt to process a single request, return 1 if successful static int s_service_success (mdcli_t *client, char *uuid) { // Load request message, service will be first frame char *filename = s_request_filename (uuid); FILE *file = fopen (filename, "r"); free (filename); // If the client already closed request, treat as successful if (!file) return 1; zmsg_t *request = zmsg_load (file); fclose (file); zframe_t *service = zmsg_pop (request); char *service_name = zframe_strdup (service); // Use MMI protocol to check if service is available zmsg_t *mmi_request = zmsg_new (); zmsg_add (mmi_request, service); zmsg_t *mmi_reply = mdcli_send (client, "mmi.service", &mmi_request); int service_ok = (mmi_reply && zframe_streq (zmsg_first (mmi_reply), "200")); zmsg_destroy (&mmi_reply); if (service_ok) { zmsg_t *reply = mdcli_send (client, service_name, &request); if (reply) { filename = s_reply_filename (uuid); FILE *file = fopen (filename, "w"); assert (file); zmsg_save (reply, file); fclose (file); free (filename); return 1; } zmsg_destroy (&reply); } else zmsg_destroy (&request); free (service_name); return 0; } int main (int argc, char *argv []) { int verbose = (argc > 1 && streq (argv [1], "-v")); zctx_t *ctx = zctx_new (); // Create MDP client session with short timeout mdcli_t *client = mdcli_new ("tcp://localhost:5555", verbose); mdcli_set_timeout (client, 1000); // 1 sec mdcli_set_retries (client, 1); // only 1 retry void *request_pipe = zthread_fork (ctx, titanic_request, NULL); zthread_new (ctx, titanic_reply, NULL); zthread_new (ctx, titanic_close, NULL); // Main dispatcher loop while (TRUE) { // We'll dispatch once per second, if there's no activity zmq_pollitem_t items [] = { { request_pipe, 0, ZMQ_POLLIN, 0 } }; int rc = zmq_poll (items, 1, 1000 * ZMQ_POLL_MSEC); if (rc == -1) break; // Interrupted if (items [0].revents & ZMQ_POLLIN) { // Ensure message directory exists file_mkdir (TITANIC_DIR); // Append UUID to queue, prefixed with '-' for pending zmsg_t *msg = zmsg_recv (request_pipe); if (!msg) break; // Interrupted FILE *file = fopen (TITANIC_DIR "/queue", "a"); char *uuid = zmsg_popstr (msg); fprintf (file, "-%s\n", uuid); fclose (file); free (uuid); zmsg_destroy (&msg); } // Brute-force dispatcher // char entry [] = "?…….:…….:…….:…….:"; FILE *file = fopen (TITANIC_DIR "/queue", "r+"); while (file && fread (entry, 33, 1, file) == 1) { // UUID is prefixed with '-' if still waiting if (entry [0] == '-') { if (verbose) printf ("I: processing request %s\n", entry + 1); if (s_service_success (client, entry + 1)) { // Mark queue entry as processed fseek (file, -33, SEEK_CUR); fwrite ("+", 1, 1, file); fseek (file, 32, SEEK_CUR); } } // Skip end of line, LF or CRLF if (fgetc (file) == '\r') fgetc (file); if (zctx_interrupted) break; } if (file) fclose (file); } mdcli_destroy (&client); return 0; }
注意:官网似乎并不推荐额外的模式存储(比如数据库、kevy-value什么的,原因与效率有关),你可以试试~
发表评论
-
IM选型(初)
2016-08-23 19:12 1643主要参考文章: https://r ... -
关于python和rabbitmq的那点事儿
2011-10-19 14:15 7963rabbitmq是一个消息中间件,在之前的zmq介绍中有略带提 ... -
zeroMQ初体验-34.发布/订阅模式进阶-克隆模式-下,结言
2011-05-26 16:09 4197服务器: // // Clone server Mod ... -
zeroMQ初体验-33.发布/订阅模式进阶-克隆模式-中
2011-05-26 15:37 2930临时缓存 现实中,比如 ... -
zeroMQ初体验-32.发布/订阅模式进阶-克隆模式-上
2011-05-26 15:04 3657在发布/订阅模式中,特别是现实应用中,总会因为这样那样的问题导 ... -
zeroMQ初体验-31.发布/订阅模式进阶-黑盒的高速订阅者
2011-05-25 16:55 2756作为发布/订阅模式的一个常用场景,大数据量的组播是有必要的。虽 ... -
zeroMQ初体验-30.发布/订阅模式进阶-自裁的蜗牛订阅者
2011-05-25 16:24 4548在初次介绍发布/订阅模式的时候,就已经抖出了这个包袱:如果订阅 ... -
zeroMQ初体验-29.可靠性-自由模式
2011-05-24 17:02 5407好吧,本以为这可能是一个更靠谱的模式,谁知(其实是我一厢情愿了 ... -
zeroMQ初体验-28.可靠性-主从模式
2011-05-23 14:47 5540虽然"硬盘模式" ... -
zeroMQ初体验-26.可靠性-管家模式
2011-05-12 19:05 5654上一节末尾有说到协议,zeromq自然做了充沛的封装,&quo ... -
zeroMQ初体验-26.可靠性-管家模式
2011-05-12 19:03 1上一节末尾有说到协议,zeromq自然做了充沛的封装,&quo ... -
zeroMQ初体验-25.可靠性-偏执的海盗模式
2011-05-05 19:05 3591虽然说“简单的海盗模 ... -
zeroMQ初体验-24.可靠性-简单的海盗模式
2011-05-05 16:41 3216相较于“懒惰的”做了 ... -
zeroMQ初体验-23.可靠性-懒惰的海盗模式
2011-05-05 16:15 5066相较于通常的阻塞模式,这里只是做了一点简单的动作来加强系统的可 ... -
zeroMQ初体验-22.可靠性-总览
2011-04-26 19:25 5938在开篇就从曾对zeromq的可靠性做过质疑,不过,作为一个雄心 ... -
rabbitmq 队列长度预设的曲线方案
2011-04-21 14:36 3400zeromq中倒是直接支持这个功能的。 类似于设定队列长度或 ... -
zeroMQ初体验-21.应答模式进阶(七)-云计算
2011-04-18 19:14 3536这里给出了一个最近很火的"云计算"案例。 ... -
zeroMQ初体验-20.应答模式进阶(六)-多对多路由模式
2011-04-18 17:22 3879某些时候,为了冗余的需要,可能会有这样的需求: impo ... -
zeroMQ初体验-19.应答模式进阶(五)-异步式应答
2011-04-15 15:23 4846恩,这应该算是比较实 ... -
zeroMQ初体验-18.应答模式进阶(四)-定制路由3
2011-04-02 15:39 5187从经典到超越经典。 首 ...
相关推荐
总的来说,zeromq-2.1.7提供了在Linux环境下进行高效、可靠的消息传递功能,是构建大规模分布式系统的一个重要工具。虽然这是一个较旧的版本,但在某些特定场景下,旧版本可能更稳定,更适合于已知的工作负载和环境...
这里提供的四个压缩包文件,Python-2.6.6.tar.bz2、zeromq-3.2.5.tar.gz、jzmq.tar.gz以及storm-0.8.0.zip,都是与Storm搭建和运行相关的资源。 首先,我们来详细了解一下每个文件的作用: 1. **Python-2.6.6.tar....
zeromq-4.0.3.tar.gz 是一个包含了 ZeroMQ 4.0.3 版本源代码的压缩文件。ZeroMQ,也被称为“零消息队列”或“0MQ”,是一个开源的消息中间件,它提供了一种高效、灵活且可扩展的方式来处理分布式系统中的数据通信。...
zeromq-4.2.3.tar.gz 是ZeroMQ 4.2.3版本的源代码包,这个稳定版本确保了良好的兼容性和可靠性。 首先,让我们深入了解ZeroMQ的核心概念和功能: 1. **套接字(Sockets)**:在ZeroMQ中,套接字不仅仅是传统网络...
标题中的"zeromq-4.1.8.tar.gz"指的是ZeroMQ的4.1.8版本的源代码包,通常以tar.gz格式压缩,这是一种在Linux和类Unix系统中常见的归档和压缩方式。 zeromq的核心特性包括点对点连接、发布/订阅模式、请求/响应模式...
`zeromq-2.1.9.tar.gz` 是zeromq的一个特定版本,即2.1.9版,通常以源码形式提供,需要通过编译来安装。 首先,让我们深入了解zeromq的核心概念。zeromq设计了一个灵活的套接字模型,它允许开发者构建复杂的网络...
官方离线安装包,测试可用。使用rpm -ivh [rpm完整包名] 进行安装
zeromq-4.1.3 是一个针对 ZeroMQ 的特定版本的开发工具包,ZeroMQ 是一个开源的消息中间件库,它为分布式计算提供了一种高性能、轻量级的通信框架。ZeroMQ 提供了多种编程语言的绑定,使得开发者能够方便地在不同的...
0MQ(也称为 ZeroMQ 或 ØMQ)是一个开源的消息中间件,它提供了一种轻量级、高性能的异步消息...通过使用“zeromq-4.3.4.tar.gz”,你可以享受到这个版本带来的稳定性和优化,从而更高效地实现跨进程、跨网络的通信。
- **错误修复**:修复了前一版本中已知的bug,增强了系统的稳定性和可靠性。 - **新功能**:可能添加了新的API或功能,以满足更广泛的需求。 - **安全性增强**:可能加强了安全措施,比如加密传输或身份验证机制的...
在zeromq-4.2.0源码包中,你可以找到以下主要组成部分: 1. **源代码**:包含了zeromq的核心库和各种语言的绑定。核心库通常用C++编写,提供了跨平台的API,而绑定则允许开发者使用Python、Java、C#等其他语言与...
标题中的"zeromq-4.2.0.tar.zip"是指ZeroMQ库的4.2.0版本,它被封装在一个ZIP压缩包中,而内部包含的文件是tar归档格式。ZeroMQ是一个开源的消息中间件,它提供了一个高级的消息队列模型,允许应用程序之间进行高效...
ZeroMQ是一个网络通讯库,其主要用来为分布式应用程序开发提供进程间通信(此处的进程既可以是同一台机器上的两个进程也可以是不同机器上的两个进程)。ZeroMQ的特点在于灵活的通信手段和丰富的连接模型,并且它可以...
ZeroMQ是一个网络通讯库,其主要用来为分布式应用程序开发提供进程间通信(此处的进程既可以是同一台机器上的两个进程也可以是不同机器上的两个进程)。ZeroMQ的特点在于灵活的通信手段和丰富的连接模型,并且它可以...
ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。
总的来说,zeromq-4.3.2是一个强大且可靠的工具,它简化了网络编程,提高了应用的性能和效率。对于需要进行高效、灵活消息传递的开发者来说,它是一个理想的选择。通过合理利用其特性,开发者可以构建出高并发、可...
这个“zeromq-4.1.0-rc1.zip”压缩包包含了ZeroMQ 4.1.0版本的源代码,这是一个预发布版本(Release Candidate),意味着它是正式版发布前的最后一个测试版本。 ZeroMQ的核心概念是提供一种抽象的网络通信层,允许...
zeromq-3.12.5.tar.gz, libzmq-3.1.2.tar.gz 在Linux环境中,构建和部署分布式计算系统时,Storm是一个常用的选择,它是一个开源的流处理框架,用于实时数据处理。这个压缩包"zeromq-3.12.5.zip"包含了与Storm集群...
这个“zeromq-4.0.1.tar.gz”文件是ZeroMQ的4.0.1版本源代码包,适用于那些需要在网络通信、并发处理或构建微服务架构的开发者。由于从官方网站下载可能速度较慢,此压缩包提供了方便的下载渠道。 ZeroMQ的核心特性...
- 可靠性:支持消息确认和重试机制,确保消息的可靠传输。 - 负载均衡:发布/订阅模式下,消息可以广播给所有订阅者,而推送/拉取模式则可以实现负载均衡。 总的来说,zeromq是一个功能强大、性能优异的开源消息...