- 浏览: 471853 次
- 性别:
- 来自: 上海
文章分类
最新评论
-
kc_hxd_jp:
博主问个问题,这个篇幅下的python代码无法达到应有的作用, ...
zeroMQ初体验-14.命名机制 进阶 -
kobe1029:
Map<String, Object> args ...
rabbitmq 队列长度预设的曲线方案 -
Sasoritattoo:
LZ,这都13年了,抽空把这篇文章的下文给表完了吧,这一口气喘 ...
nginx + gridfs + mongodb 大事记(残) -
3GQQ2012:
引用前文已经说过,XREP其实用以平衡负载,所以这里由它对请求 ...
zeroMQ初体验-15.应答模式进阶(一)-数据的封装 -
iyuan:
ustclz 写道图片怎么显示不了了。。我这看是可以显示的。不 ...
zeroMQ初体验-1.简介及C/S模式
临时缓存
现实中,比如DNS服务器,可以算是一个典型案例。临时存储一个节点,如果节点小时,那么该存储也会随之灰飞,所以,"过期"也是一个靠谱的需求。
由于有新的需求提出,我们的key-value库也得做些相应的改动:
服务器:
可靠性
任何东东,一旦脱离的 可靠性,那么终将只是个玩具,虽然它也会带来无尽的麻烦。
之前的"主从模式"似乎不错,再稍微变动下,把“从”也当作客户端之一,接受、存储所有的更新,是不是有点靠谱的感觉。
主从的状态机制模型图:
机制为:客户端发现主服务器不返回了,那么就向从服务器请求,从服务器确认主服务器的确不行了,于是就上马充当服务器。主回归后,从再变为一个客户端。
客户端:
客户端api:
(未完待续)
现实中,比如DNS服务器,可以算是一个典型案例。临时存储一个节点,如果节点小时,那么该存储也会随之灰飞,所以,"过期"也是一个靠谱的需求。
由于有新的需求提出,我们的key-value库也得做些相应的改动:
/* ===================================================================== kvmsg - key-value message class for example applications --------------------------------------------------------------------- Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com> Copyright other contributors as noted in the AUTHORS file. This file is part of the ZeroMQ Guide: http://zguide.zeromq.org This is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this program. If not, see <http://www.gnu.org/licenses/>. ===================================================================== */ #include "kvmsg.h" #include <uuid/uuid.h> #include "zlist.h" // Keys are short strings #define KVMSG_KEY_MAX 255 // Message is formatted on wire as 4 frames: // frame 0: key (0MQ string) // frame 1: sequence (8 bytes, network order) // frame 2: uuid (blob, 16 bytes) // frame 3: properties (0MQ string) // frame 4: body (blob) #define FRAME_KEY 0 #define FRAME_SEQ 1 #define FRAME_UUID 2 #define FRAME_PROPS 3 #define FRAME_BODY 4 #define KVMSG_FRAMES 5 // Structure of our class struct _kvmsg { // Presence indicators for each frame int present [KVMSG_FRAMES]; // Corresponding 0MQ message frames, if any zmq_msg_t frame [KVMSG_FRAMES]; // Key, copied into safe C string char key [KVMSG_KEY_MAX + 1]; // List of properties, as name=value strings zlist_t *props; size_t props_size; }; // Serialize list of properties to a message frame static void s_encode_props (kvmsg_t *self) { zmq_msg_t *msg = &self->frame [FRAME_PROPS]; if (self->present [FRAME_PROPS]) zmq_msg_close (msg); zmq_msg_init_size (msg, self->props_size); char *prop = zlist_first (self->props); char *dest = (char *) zmq_msg_data (msg); while (prop) { strcpy (dest, prop); dest += strlen (prop); *dest++ = '\n'; prop = zlist_next (self->props); } self->present [FRAME_PROPS] = 1; } // Rebuild properties list from message frame static void s_decode_props (kvmsg_t *self) { zmq_msg_t *msg = &self->frame [FRAME_PROPS]; self->props_size = 0; while (zlist_size (self->props)) free (zlist_pop (self->props)); size_t remainder = zmq_msg_size (msg); char *prop = (char *) zmq_msg_data (msg); char *eoln = memchr (prop, '\n', remainder); while (eoln) { *eoln = 0; zlist_append (self->props, strdup (prop)); self->props_size += strlen (prop) + 1; remainder -= strlen (prop) + 1; prop = eoln + 1; eoln = memchr (prop, '\n', remainder); } } // --------------------------------------------------------------------- // Constructor, sets sequence as provided kvmsg_t * kvmsg_new (int64_t sequence) { kvmsg_t *self; self = (kvmsg_t *) zmalloc (sizeof (kvmsg_t)); self->props = zlist_new (); kvmsg_set_sequence (self, sequence); return self; } // --------------------------------------------------------------------- // Destructor // Free shim, compatible with zhash_free_fn void kvmsg_free (void *ptr) { if (ptr) { kvmsg_t *self = (kvmsg_t *) ptr; // Destroy message frames if any int frame_nbr; for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) if (self->present [frame_nbr]) zmq_msg_close (&self->frame [frame_nbr]); // Destroy property list while (zlist_size (self->props)) free (zlist_pop (self->props)); zlist_destroy (&self->props); // Free object itself free (self); } } void kvmsg_destroy (kvmsg_t **self_p) { assert (self_p); if (*self_p) { kvmsg_free (*self_p); *self_p = NULL; } } // --------------------------------------------------------------------- // Create duplicate of kvmsg kvmsg_t * kvmsg_dup (kvmsg_t *self) { kvmsg_t *kvmsg = kvmsg_new (0); int frame_nbr; for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) { if (self->present [frame_nbr]) { zmq_msg_t *src = &self->frame [frame_nbr]; zmq_msg_t *dst = &kvmsg->frame [frame_nbr]; zmq_msg_init_size (dst, zmq_msg_size (src)); memcpy (zmq_msg_data (dst), zmq_msg_data (src), zmq_msg_size (src)); kvmsg->present [frame_nbr] = 1; } } kvmsg->props = zlist_copy (self->props); return kvmsg; } // --------------------------------------------------------------------- // Reads key-value message from socket, returns new kvmsg instance. kvmsg_t * kvmsg_recv (void *socket) { assert (socket); kvmsg_t *self = kvmsg_new (0); // Read all frames off the wire, reject if bogus int frame_nbr; for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) { if (self->present [frame_nbr]) zmq_msg_close (&self->frame [frame_nbr]); zmq_msg_init (&self->frame [frame_nbr]); self->present [frame_nbr] = 1; if (zmq_recvmsg (socket, &self->frame [frame_nbr], 0) == -1) { kvmsg_destroy (&self); break; } // Verify multipart framing int rcvmore = (frame_nbr < KVMSG_FRAMES - 1)? 1: 0; if (zsockopt_rcvmore (socket) != rcvmore) { kvmsg_destroy (&self); break; } } if (self) s_decode_props (self); return self; } // --------------------------------------------------------------------- // Send key-value message to socket; any empty frames are sent as such. void kvmsg_send (kvmsg_t *self, void *socket) { assert (self); assert (socket); s_encode_props (self); int frame_nbr; for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) { zmq_msg_t copy; zmq_msg_init (©); if (self->present [frame_nbr]) zmq_msg_copy (©, &self->frame [frame_nbr]); zmq_sendmsg (socket, ©, (frame_nbr < KVMSG_FRAMES - 1)? ZMQ_SNDMORE: 0); zmq_msg_close (©); } } // --------------------------------------------------------------------- // Return key from last read message, if any, else NULL char * kvmsg_key (kvmsg_t *self) { assert (self); if (self->present [FRAME_KEY]) { if (!*self->key) { size_t size = zmq_msg_size (&self->frame [FRAME_KEY]); if (size > KVMSG_KEY_MAX) size = KVMSG_KEY_MAX; memcpy (self->key, zmq_msg_data (&self->frame [FRAME_KEY]), size); self->key [size] = 0; } return self->key; } else return NULL; } // --------------------------------------------------------------------- // Return sequence nbr from last read message, if any int64_t kvmsg_sequence (kvmsg_t *self) { assert (self); if (self->present [FRAME_SEQ]) { assert (zmq_msg_size (&self->frame [FRAME_SEQ]) == 8); byte *source = zmq_msg_data (&self->frame [FRAME_SEQ]); int64_t sequence = ((int64_t) (source [0]) << 56) + ((int64_t) (source [1]) << 48) + ((int64_t) (source [2]) << 40) + ((int64_t) (source [3]) << 32) + ((int64_t) (source [4]) << 24) + ((int64_t) (source [5]) << 16) + ((int64_t) (source [6]) << 8) + (int64_t) (source [7]); return sequence; } else return 0; } // --------------------------------------------------------------------- // Return UUID from last read message, if any, else NULL byte * kvmsg_uuid (kvmsg_t *self) { assert (self); if (self->present [FRAME_UUID] && zmq_msg_size (&self->frame [FRAME_UUID]) == sizeof (uuid_t)) return (byte *) zmq_msg_data (&self->frame [FRAME_UUID]); else return NULL; } // --------------------------------------------------------------------- // Return body from last read message, if any, else NULL byte * kvmsg_body (kvmsg_t *self) { assert (self); if (self->present [FRAME_BODY]) return (byte *) zmq_msg_data (&self->frame [FRAME_BODY]); else return NULL; } // --------------------------------------------------------------------- // Return body size from last read message, if any, else zero size_t kvmsg_size (kvmsg_t *self) { assert (self); if (self->present [FRAME_BODY]) return zmq_msg_size (&self->frame [FRAME_BODY]); else return 0; } // --------------------------------------------------------------------- // Set message key as provided void kvmsg_set_key (kvmsg_t *self, char *key) { assert (self); zmq_msg_t *msg = &self->frame [FRAME_KEY]; if (self->present [FRAME_KEY]) zmq_msg_close (msg); zmq_msg_init_size (msg, strlen (key)); memcpy (zmq_msg_data (msg), key, strlen (key)); self->present [FRAME_KEY] = 1; } // --------------------------------------------------------------------- // Set message sequence number void kvmsg_set_sequence (kvmsg_t *self, int64_t sequence) { assert (self); zmq_msg_t *msg = &self->frame [FRAME_SEQ]; if (self->present [FRAME_SEQ]) zmq_msg_close (msg); zmq_msg_init_size (msg, 8); byte *source = zmq_msg_data (msg); source [0] = (byte) ((sequence >> 56) & 255); source [1] = (byte) ((sequence >> 48) & 255); source [2] = (byte) ((sequence >> 40) & 255); source [3] = (byte) ((sequence >> 32) & 255); source [4] = (byte) ((sequence >> 24) & 255); source [5] = (byte) ((sequence >> 16) & 255); source [6] = (byte) ((sequence >> 8) & 255); source [7] = (byte) ((sequence) & 255); self->present [FRAME_SEQ] = 1; } // --------------------------------------------------------------------- // Set message UUID to generated value void kvmsg_set_uuid (kvmsg_t *self) { assert (self); zmq_msg_t *msg = &self->frame [FRAME_UUID]; uuid_t uuid; uuid_generate (uuid); if (self->present [FRAME_UUID]) zmq_msg_close (msg); zmq_msg_init_size (msg, sizeof (uuid)); memcpy (zmq_msg_data (msg), uuid, sizeof (uuid)); self->present [FRAME_UUID] = 1; } // --------------------------------------------------------------------- // Set message body void kvmsg_set_body (kvmsg_t *self, byte *body, size_t size) { assert (self); zmq_msg_t *msg = &self->frame [FRAME_BODY]; if (self->present [FRAME_BODY]) zmq_msg_close (msg); self->present [FRAME_BODY] = 1; zmq_msg_init_size (msg, size); memcpy (zmq_msg_data (msg), body, size); } // --------------------------------------------------------------------- // Set message key using printf format void kvmsg_fmt_key (kvmsg_t *self, char *format, …) { char value [KVMSG_KEY_MAX + 1]; va_list args; assert (self); va_start (args, format); vsnprintf (value, KVMSG_KEY_MAX, format, args); va_end (args); kvmsg_set_key (self, value); } // --------------------------------------------------------------------- // Set message body using printf format void kvmsg_fmt_body (kvmsg_t *self, char *format, …) { char value [255 + 1]; va_list args; assert (self); va_start (args, format); vsnprintf (value, 255, format, args); va_end (args); kvmsg_set_body (self, (byte *) value, strlen (value)); } // --------------------------------------------------------------------- // Get message property, if set, else "" char * kvmsg_get_prop (kvmsg_t *self, char *name) { assert (strchr (name, '=') == NULL); char *prop = zlist_first (self->props); size_t namelen = strlen (name); while (prop) { if (strlen (prop) > namelen && memcmp (prop, name, namelen) == 0 && prop [namelen] == '=') return prop + namelen + 1; prop = zlist_next (self->props); } return ""; } // --------------------------------------------------------------------- // Set message property // Names cannot contain '='. Max length of value is 255 chars. void kvmsg_set_prop (kvmsg_t *self, char *name, char *format, …) { assert (strchr (name, '=') == NULL); char value [255 + 1]; va_list args; assert (self); va_start (args, format); vsnprintf (value, 255, format, args); va_end (args); // Allocate name=value string char *prop = malloc (strlen (name) + strlen (value) + 2); // Remove existing property if any sprintf (prop, "%s=", name); char *existing = zlist_first (self->props); while (existing) { if (memcmp (prop, existing, strlen (prop)) == 0) { self->props_size -= strlen (existing) + 1; zlist_remove (self->props, existing); free (existing); break; } existing = zlist_next (self->props); } // Add new name=value property string strcat (prop, value); zlist_append (self->props, prop); self->props_size += strlen (prop) + 1; } // --------------------------------------------------------------------- // Store entire kvmsg into hash map, if key/value are set. // Nullifies kvmsg reference, and destroys automatically when no longer // needed. If value is empty, deletes any previous value from store. void kvmsg_store (kvmsg_t **self_p, zhash_t *hash) { assert (self_p); if (*self_p) { kvmsg_t *self = *self_p; assert (self); if (kvmsg_size (self)) { if (self->present [FRAME_KEY] && self->present [FRAME_BODY]) { zhash_update (hash, kvmsg_key (self), self); zhash_freefn (hash, kvmsg_key (self), kvmsg_free); } } else zhash_delete (hash, kvmsg_key (self)); *self_p = NULL; } } // --------------------------------------------------------------------- // Dump message to stderr, for debugging and tracing void kvmsg_dump (kvmsg_t *self) { if (self) { if (!self) { fprintf (stderr, "NULL"); return; } size_t size = kvmsg_size (self); byte *body = kvmsg_body (self); fprintf (stderr, "[seq:%" PRId64 "]", kvmsg_sequence (self)); fprintf (stderr, "[key:%s]", kvmsg_key (self)); fprintf (stderr, "[size:%zd] ", size); if (zlist_size (self->props)) { fprintf (stderr, "["); char *prop = zlist_first (self->props); while (prop) { fprintf (stderr, "%s;", prop); prop = zlist_next (self->props); } fprintf (stderr, "]"); } int char_nbr; for (char_nbr = 0; char_nbr < size; char_nbr++) fprintf (stderr, "%02X", body [char_nbr]); fprintf (stderr, "\n"); } else fprintf (stderr, "NULL message\n"); } // --------------------------------------------------------------------- // Runs self test of class int kvmsg_test (int verbose) { kvmsg_t *kvmsg; printf (" * kvmsg: "); // Prepare our context and sockets zctx_t *ctx = zctx_new (); void *output = zsocket_new (ctx, ZMQ_DEALER); int rc = zmq_bind (output, "ipc://kvmsg_selftest.ipc"); assert (rc == 0); void *input = zsocket_new (ctx, ZMQ_DEALER); rc = zmq_connect (input, "ipc://kvmsg_selftest.ipc"); assert (rc == 0); zhash_t *kvmap = zhash_new (); // Test send and receive of simple message kvmsg = kvmsg_new (1); kvmsg_set_key (kvmsg, "key"); kvmsg_set_uuid (kvmsg); kvmsg_set_body (kvmsg, (byte *) "body", 4); if (verbose) kvmsg_dump (kvmsg); kvmsg_send (kvmsg, output); kvmsg_store (&kvmsg, kvmap); kvmsg = kvmsg_recv (input); if (verbose) kvmsg_dump (kvmsg); assert (streq (kvmsg_key (kvmsg), "key")); kvmsg_store (&kvmsg, kvmap); // Test send and receive of message with properties kvmsg = kvmsg_new (2); kvmsg_set_prop (kvmsg, "prop1", "value1"); kvmsg_set_prop (kvmsg, "prop2", "value1"); kvmsg_set_prop (kvmsg, "prop2", "value2"); kvmsg_set_key (kvmsg, "key"); kvmsg_set_uuid (kvmsg); kvmsg_set_body (kvmsg, (byte *) "body", 4); assert (streq (kvmsg_get_prop (kvmsg, "prop2"), "value2")); if (verbose) kvmsg_dump (kvmsg); kvmsg_send (kvmsg, output); kvmsg_destroy (&kvmsg); kvmsg = kvmsg_recv (input); if (verbose) kvmsg_dump (kvmsg); assert (streq (kvmsg_key (kvmsg), "key")); assert (streq (kvmsg_get_prop (kvmsg, "prop2"), "value2")); kvmsg_destroy (&kvmsg); // Shutdown and destroy all objects zhash_destroy (&kvmap); zctx_destroy (&ctx); printf ("OK\n"); return 0; }
服务器:
// // Clone server Model Five // // Lets us build this source without creating a library #include "kvmsg.c" // zloop reactor handlers static int s_snapshots (zloop_t *loop, void *socket, void *args); static int s_collector (zloop_t *loop, void *socket, void *args); static int s_flush_ttl (zloop_t *loop, void *socket, void *args); // Our server is defined by these properties typedef struct { zctx_t *ctx; // Context wrapper zhash_t *kvmap; // Key-value store zloop_t *loop; // zloop reactor int port; // Main port we're working on int64_t sequence; // How many updates we're at void *snapshot; // Handle snapshot requests void *publisher; // Publish updates to clients void *collector; // Collect updates from clients } clonesrv_t; int main (void) { clonesrv_t *self = (clonesrv_t *) zmalloc (sizeof (clonesrv_t)); self->port = 5556; self->ctx = zctx_new (); self->kvmap = zhash_new (); self->loop = zloop_new (); zloop_set_verbose (self->loop, FALSE); // Set up our clone server sockets self->snapshot = zsocket_new (self->ctx, ZMQ_ROUTER); self->publisher = zsocket_new (self->ctx, ZMQ_PUB); self->collector = zsocket_new (self->ctx, ZMQ_PULL); zsocket_bind (self->snapshot, "tcp://*:%d", self->port); zsocket_bind (self->publisher, "tcp://*:%d", self->port + 1); zsocket_bind (self->collector, "tcp://*:%d", self->port + 2); // Register our handlers with reactor zloop_reader (self->loop, self->snapshot, s_snapshots, self); zloop_reader (self->loop, self->collector, s_collector, self); zloop_timer (self->loop, 1000, 0, s_flush_ttl, self); // Run reactor until process interrupted zloop_start (self->loop); zloop_destroy (&self->loop); zhash_destroy (&self->kvmap); zctx_destroy (&self->ctx); free (self); return 0; } // --------------------------------------------------------------------- // Send snapshots to clients who ask for them static int s_send_single (char *key, void *data, void *args); // Routing information for a key-value snapshot typedef struct { void *socket; // ROUTER socket to send to zframe_t *identity; // Identity of peer who requested state char *subtree; // Client subtree specification } kvroute_t; static int s_snapshots (zloop_t *loop, void *snapshot, void *args) { clonesrv_t *self = (clonesrv_t *) args; zframe_t *identity = zframe_recv (snapshot); if (identity) { // Request is in second frame of message char *request = zstr_recv (snapshot); char *subtree = NULL; if (streq (request, "ICANHAZ?")) { free (request); subtree = zstr_recv (snapshot); } else printf ("E: bad request, aborting\n"); if (subtree) { // Send state socket to client kvroute_t routing = { snapshot, identity, subtree }; zhash_foreach (self->kvmap, s_send_single, &routing); // Now send END message with sequence number zclock_log ("I: sending shapshot=%d", (int) self->sequence); zframe_send (&identity, snapshot, ZFRAME_MORE); kvmsg_t *kvmsg = kvmsg_new (self->sequence); kvmsg_set_key (kvmsg, "KTHXBAI"); kvmsg_set_body (kvmsg, (byte *) subtree, 0); kvmsg_send (kvmsg, snapshot); kvmsg_destroy (&kvmsg); free (subtree); } } return 0; } // Send one state snapshot key-value pair to a socket // Hash item data is our kvmsg object, ready to send static int s_send_single (char *key, void *data, void *args) { kvroute_t *kvroute = (kvroute_t *) args; kvmsg_t *kvmsg = (kvmsg_t *) data; if (strlen (kvroute->subtree) <= strlen (kvmsg_key (kvmsg)) && memcmp (kvroute->subtree, kvmsg_key (kvmsg), strlen (kvroute->subtree)) == 0) { // Send identity of recipient first zframe_send (&kvroute->identity, kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE); kvmsg_send (kvmsg, kvroute->socket); } return 0; } // --------------------------------------------------------------------- // Collect updates from clients static int s_collector (zloop_t *loop, void *collector, void *args) { clonesrv_t *self = (clonesrv_t *) args; kvmsg_t *kvmsg = kvmsg_recv (collector); if (kvmsg) { kvmsg_set_sequence (kvmsg, ++self->sequence); kvmsg_send (kvmsg, self->publisher); int ttl = atoi (kvmsg_get_prop (kvmsg, "ttl")); if (ttl) kvmsg_set_prop (kvmsg, "ttl", "%" PRId64, zclock_time () + ttl * 1000); kvmsg_store (&kvmsg, self->kvmap); zclock_log ("I: publishing update=%d", (int) self->sequence); } return 0; } // --------------------------------------------------------------------- // Purge ephemeral values that have expired static int s_flush_single (char *key, void *data, void *args); static int s_flush_ttl (zloop_t *loop, void *unused, void *args) { clonesrv_t *self = (clonesrv_t *) args; zhash_foreach (self->kvmap, s_flush_single, args); return 0; } // If key-value pair has expired, delete it and publish the // fact to listening clients. static int s_flush_single (char *key, void *data, void *args) { clonesrv_t *self = (clonesrv_t *) args; kvmsg_t *kvmsg = (kvmsg_t *) data; int64_t ttl; sscanf (kvmsg_get_prop (kvmsg, "ttl"), "%" PRId64, &ttl); if (ttl && zclock_time () >= ttl) { kvmsg_set_sequence (kvmsg, ++self->sequence); kvmsg_set_body (kvmsg, (byte *) "", 0); kvmsg_send (kvmsg, self->publisher); kvmsg_store (&kvmsg, self->kvmap); zclock_log ("I: publishing delete=%d", (int) self->sequence); } return 0; }
可靠性
任何东东,一旦脱离的 可靠性,那么终将只是个玩具,虽然它也会带来无尽的麻烦。
之前的"主从模式"似乎不错,再稍微变动下,把“从”也当作客户端之一,接受、存储所有的更新,是不是有点靠谱的感觉。
主从的状态机制模型图:
机制为:客户端发现主服务器不返回了,那么就向从服务器请求,从服务器确认主服务器的确不行了,于是就上马充当服务器。主回归后,从再变为一个客户端。
客户端:
// // Clone client Model Six // // Lets us build this source without creating a library #include "clone.c" #define SUBTREE "/client/" int main (void) { // Create distributed hash instance clone_t *clone = clone_new (); // Specify configuration clone_subtree (clone, SUBTREE); clone_connect (clone, "tcp://localhost", "5556"); clone_connect (clone, "tcp://localhost", "5566"); // Set random tuples into the distributed hash while (!zctx_interrupted) { // Set random value, check it was stored char key [255]; char value [10]; sprintf (key, "%s%d", SUBTREE, randof (10000)); sprintf (value, "%d", randof (1000000)); clone_set (clone, key, value, randof (30)); sleep (1); } clone_destroy (&clone); return 0; }
客户端api:
/* ===================================================================== clone - client-side Clone Pattern class --------------------------------------------------------------------- Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com> Copyright other contributors as noted in the AUTHORS file. This file is part of the ZeroMQ Guide: http://zguide.zeromq.org This is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this program. If not, see <http://www.gnu.org/licenses/>. ===================================================================== */ #include "clone.h" // If no server replies within this time, abandon request #define GLOBAL_TIMEOUT 4000 // msecs // Server considered dead if silent for this long #define SERVER_TTL 5000 // msecs // Number of servers we will talk to #define SERVER_MAX 2 // ===================================================================== // Synchronous part, works in our application thread // --------------------------------------------------------------------- // Structure of our class struct _clone_t { zctx_t *ctx; // Our context wrapper void *pipe; // Pipe through to clone agent }; // This is the thread that handles our real clone class static void clone_agent (void *args, zctx_t *ctx, void *pipe); // --------------------------------------------------------------------- // Constructor clone_t * clone_new (void) { clone_t *self; self = (clone_t *) zmalloc (sizeof (clone_t)); self->ctx = zctx_new (); self->pipe = zthread_fork (self->ctx, clone_agent, NULL); return self; } // --------------------------------------------------------------------- // Destructor void clone_destroy (clone_t **self_p) { assert (self_p); if (*self_p) { clone_t *self = *self_p; zctx_destroy (&self->ctx); free (self); *self_p = NULL; } } // --------------------------------------------------------------------- // Specify subtree for snapshot and updates, do before connect // Sends [SUBTREE][subtree] to the agent void clone_subtree (clone_t *self, char *subtree) { assert (self); zmsg_t *msg = zmsg_new (); zmsg_addstr (msg, "SUBTREE"); zmsg_addstr (msg, subtree); zmsg_send (&msg, self->pipe); } // --------------------------------------------------------------------- // Connect to new server endpoint // Sends [CONNECT][endpoint][service] to the agent void clone_connect (clone_t *self, char *address, char *service) { assert (self); zmsg_t *msg = zmsg_new (); zmsg_addstr (msg, "CONNECT"); zmsg_addstr (msg, address); zmsg_addstr (msg, service); zmsg_send (&msg, self->pipe); } // --------------------------------------------------------------------- // Set new value in distributed hash table // Sends [SET][key][value][ttl] to the agent void clone_set (clone_t *self, char *key, char *value, int ttl) { char ttlstr [10]; sprintf (ttlstr, "%d", ttl); assert (self); zmsg_t *msg = zmsg_new (); zmsg_addstr (msg, "SET"); zmsg_addstr (msg, key); zmsg_addstr (msg, value); zmsg_addstr (msg, ttlstr); zmsg_send (&msg, self->pipe); } // --------------------------------------------------------------------- // Lookup value in distributed hash table // Sends [GET][key] to the agent and waits for a value response // If there is no clone available, will eventually return NULL. char * clone_get (clone_t *self, char *key) { assert (self); assert (key); zmsg_t *msg = zmsg_new (); zmsg_addstr (msg, "GET"); zmsg_addstr (msg, key); zmsg_send (&msg, self->pipe); zmsg_t *reply = zmsg_recv (self->pipe); if (reply) { char *value = zmsg_popstr (reply); zmsg_destroy (&reply); return value; } return NULL; } // ===================================================================== // Asynchronous part, works in the background // --------------------------------------------------------------------- // Simple class for one server we talk to typedef struct { char *address; // Server address int port; // Server port void *snapshot; // Snapshot socket void *subscriber; // Incoming updates uint64_t expiry; // When server expires uint requests; // How many snapshot requests made? } server_t; static server_t * server_new (zctx_t *ctx, char *address, int port, char *subtree) { server_t *self = (server_t *) zmalloc (sizeof (server_t)); zclock_log ("I: adding server %s:%d…", address, port); self->address = strdup (address); self->port = port; self->snapshot = zsocket_new (ctx, ZMQ_DEALER); zsocket_connect (self->snapshot, "%s:%d", address, port); self->subscriber = zsocket_new (ctx, ZMQ_SUB); zsocket_connect (self->subscriber, "%s:%d", address, port + 1); zsockopt_set_subscribe (self->subscriber, subtree); return self; } static void server_destroy (server_t **self_p) { assert (self_p); if (*self_p) { server_t *self = *self_p; free (self->address); free (self); *self_p = NULL; } } // --------------------------------------------------------------------- // Our agent class // States we can be in #define STATE_INITIAL 0 // Before asking server for state #define STATE_SYNCING 1 // Getting state from server #define STATE_ACTIVE 2 // Getting new updates from server typedef struct { zctx_t *ctx; // Context wrapper void *pipe; // Pipe back to application zhash_t *kvmap; // Actual key/value table char *subtree; // Subtree specification, if any server_t *server [SERVER_MAX]; uint nbr_servers; // 0 to SERVER_MAX uint state; // Current state uint cur_server; // If active, server 0 or 1 int64_t sequence; // Last kvmsg processed void *publisher; // Outgoing updates } agent_t; static agent_t * agent_new (zctx_t *ctx, void *pipe) { agent_t *self = (agent_t *) zmalloc (sizeof (agent_t)); self->ctx = ctx; self->pipe = pipe; self->kvmap = zhash_new (); self->subtree = strdup (""); self->state = STATE_INITIAL; self->publisher = zsocket_new (self->ctx, ZMQ_PUB); return self; } static void agent_destroy (agent_t **self_p) { assert (self_p); if (*self_p) { agent_t *self = *self_p; int server_nbr; for (server_nbr = 0; server_nbr < self->nbr_servers; server_nbr++) server_destroy (&self->server [server_nbr]); zhash_destroy (&self->kvmap); free (self->subtree); free (self); *self_p = NULL; } } // Returns -1 if thread was interrupted static int agent_control_message (agent_t *self) { zmsg_t *msg = zmsg_recv (self->pipe); char *command = zmsg_popstr (msg); if (command == NULL) return -1; if (streq (command, "SUBTREE")) { free (self->subtree); self->subtree = zmsg_popstr (msg); } else if (streq (command, "CONNECT")) { char *address = zmsg_popstr (msg); char *service = zmsg_popstr (msg); if (self->nbr_servers < SERVER_MAX) { self->server [self->nbr_servers++] = server_new ( self->ctx, address, atoi (service), self->subtree); // We broadcast updates to all known servers zsocket_connect (self->publisher, "%s:%d", address, atoi (service) + 2); } else zclock_log ("E: too many servers (max. %d)", SERVER_MAX); free (address); free (service); } else if (streq (command, "SET")) { char *key = zmsg_popstr (msg); char *value = zmsg_popstr (msg); char *ttl = zmsg_popstr (msg); zhash_update (self->kvmap, key, (byte *) value); zhash_freefn (self->kvmap, key, free); // Send key-value pair on to server kvmsg_t *kvmsg = kvmsg_new (0); kvmsg_set_key (kvmsg, key); kvmsg_set_uuid (kvmsg); kvmsg_fmt_body (kvmsg, "%s", value); kvmsg_set_prop (kvmsg, "ttl", ttl); kvmsg_send (kvmsg, self->publisher); kvmsg_destroy (&kvmsg); puts (key); free (ttl); free (key); // Value is owned by hash table } else if (streq (command, "GET")) { char *key = zmsg_popstr (msg); char *value = zhash_lookup (self->kvmap, key); if (value) zstr_send (self->pipe, value); else zstr_send (self->pipe, ""); free (key); free (value); } free (command); zmsg_destroy (&msg); return 0; } // --------------------------------------------------------------------- // Asynchronous agent manages server pool and handles request/reply // dialog when the application asks for it. static void clone_agent (void *args, zctx_t *ctx, void *pipe) { agent_t *self = agent_new (ctx, pipe); while (TRUE) { zmq_pollitem_t poll_set [] = { { pipe, 0, ZMQ_POLLIN, 0 }, { 0, 0, ZMQ_POLLIN, 0 } }; int poll_timer = -1; int poll_size = 2; server_t *server = self->server [self->cur_server]; switch (self->state) { case STATE_INITIAL: // In this state we ask the server for a snapshot, // if we have a server to talk to… if (self->nbr_servers > 0) { zclock_log ("I: waiting for server at %s:%d…", server->address, server->port); if (server->requests < 2) { zstr_sendm (server->snapshot, "ICANHAZ?"); zstr_send (server->snapshot, self->subtree); server->requests++; } server->expiry = zclock_time () + SERVER_TTL; self->state = STATE_SYNCING; poll_set [1].socket = server->snapshot; } else poll_size = 1; break; case STATE_SYNCING: // In this state we read from snapshot and we expect // the server to respond, else we fail over. poll_set [1].socket = server->snapshot; break; case STATE_ACTIVE: // In this state we read from subscriber and we expect // the server to give hugz, else we fail over. poll_set [1].socket = server->subscriber; break; } if (server) { poll_timer = (server->expiry - zclock_time ()) * ZMQ_POLL_MSEC; if (poll_timer < 0) poll_timer = 0; } // ------------------------------------------------------------ // Poll loop int rc = zmq_poll (poll_set, poll_size, poll_timer); if (rc == -1) break; // Context has been shut down if (poll_set [0].revents & ZMQ_POLLIN) { if (agent_control_message (self)) break; // Interrupted } else if (poll_set [1].revents & ZMQ_POLLIN) { kvmsg_t *kvmsg = kvmsg_recv (poll_set [1].socket); if (!kvmsg) break; // Interrupted // Anything from server resets its expiry time server->expiry = zclock_time () + SERVER_TTL; if (self->state == STATE_SYNCING) { // Store in snapshot until we're finished server->requests = 0; if (streq (kvmsg_key (kvmsg), "KTHXBAI")) { self->sequence = kvmsg_sequence (kvmsg); self->state = STATE_ACTIVE; zclock_log ("I: received from %s:%d snapshot=%d", server->address, server->port, (int) self->sequence); kvmsg_destroy (&kvmsg); } else kvmsg_store (&kvmsg, self->kvmap); } else if (self->state == STATE_ACTIVE) { // Discard out-of-sequence updates, incl. hugz if (kvmsg_sequence (kvmsg) > self->sequence) { self->sequence = kvmsg_sequence (kvmsg); kvmsg_store (&kvmsg, self->kvmap); zclock_log ("I: received from %s:%d update=%d", server->address, server->port, (int) self->sequence); } else kvmsg_destroy (&kvmsg); } } else { // Server has died, failover to next zclock_log ("I: server at %s:%d didn't give hugz", server->address, server->port); self->cur_server = (self->cur_server + 1) % self->nbr_servers; self->state = STATE_INITIAL; } } agent_destroy (&self); }
(未完待续)
发表评论
-
IM选型(初)
2016-08-23 19:12 1643主要参考文章: https://r ... -
关于python和rabbitmq的那点事儿
2011-10-19 14:15 7963rabbitmq是一个消息中间件,在之前的zmq介绍中有略带提 ... -
zeroMQ初体验-34.发布/订阅模式进阶-克隆模式-下,结言
2011-05-26 16:09 4197服务器: // // Clone server Mod ... -
zeroMQ初体验-32.发布/订阅模式进阶-克隆模式-上
2011-05-26 15:04 3657在发布/订阅模式中,特别是现实应用中,总会因为这样那样的问题导 ... -
zeroMQ初体验-31.发布/订阅模式进阶-黑盒的高速订阅者
2011-05-25 16:55 2756作为发布/订阅模式的一个常用场景,大数据量的组播是有必要的。虽 ... -
zeroMQ初体验-30.发布/订阅模式进阶-自裁的蜗牛订阅者
2011-05-25 16:24 4548在初次介绍发布/订阅模式的时候,就已经抖出了这个包袱:如果订阅 ... -
zeroMQ初体验-29.可靠性-自由模式
2011-05-24 17:02 5407好吧,本以为这可能是一个更靠谱的模式,谁知(其实是我一厢情愿了 ... -
zeroMQ初体验-28.可靠性-主从模式
2011-05-23 14:47 5540虽然"硬盘模式" ... -
zeroMQ初体验-27.可靠性-硬盘模式
2011-05-23 13:44 3794在之前的种种模式中, ... -
zeroMQ初体验-26.可靠性-管家模式
2011-05-12 19:05 5653上一节末尾有说到协议,zeromq自然做了充沛的封装,&quo ... -
zeroMQ初体验-26.可靠性-管家模式
2011-05-12 19:03 1上一节末尾有说到协议,zeromq自然做了充沛的封装,&quo ... -
zeroMQ初体验-25.可靠性-偏执的海盗模式
2011-05-05 19:05 3591虽然说“简单的海盗模 ... -
zeroMQ初体验-24.可靠性-简单的海盗模式
2011-05-05 16:41 3216相较于“懒惰的”做了 ... -
zeroMQ初体验-23.可靠性-懒惰的海盗模式
2011-05-05 16:15 5066相较于通常的阻塞模式,这里只是做了一点简单的动作来加强系统的可 ... -
zeroMQ初体验-22.可靠性-总览
2011-04-26 19:25 5938在开篇就从曾对zeromq的可靠性做过质疑,不过,作为一个雄心 ... -
rabbitmq 队列长度预设的曲线方案
2011-04-21 14:36 3400zeromq中倒是直接支持这个功能的。 类似于设定队列长度或 ... -
zeroMQ初体验-21.应答模式进阶(七)-云计算
2011-04-18 19:14 3536这里给出了一个最近很火的"云计算"案例。 ... -
zeroMQ初体验-20.应答模式进阶(六)-多对多路由模式
2011-04-18 17:22 3879某些时候,为了冗余的需要,可能会有这样的需求: impo ... -
zeroMQ初体验-19.应答模式进阶(五)-异步式应答
2011-04-15 15:23 4846恩,这应该算是比较实 ... -
zeroMQ初体验-18.应答模式进阶(四)-定制路由3
2011-04-02 15:39 5187从经典到超越经典。 首 ...
相关推荐
ZeroMQ支持多种协议,如PUB/SUB(发布/订阅)、REQ/REP(请求/响应)和PUSH/PULL(推送/拉取),这些模式提供了灵活的消息传递模型。此外,它还提供了一些高级特性,比如负载均衡、故障恢复和多路复用。 在实际使用...
6. **分布式**:ZeroMQ 支持多对多的发布/订阅模式,使得构建分布式系统更加容易。 zeromq-4.0.3.tar.gz 文件本身是一个 tar 归档,通常用于在 Unix-like 系统上打包和存储文件。归档后,文件被压缩为 .gz 格式,这...
这些套接字支持多种消息模式,包括请求/响应、发布/订阅、推送/拉取以及对等模式,使得开发者可以轻松地处理点对点、一对多和多对多的通信场景。 zeromq的安装过程一般包括解压、配置、编译和安装四个步骤: 1. **...
这个“zeromq-4.3.4.tar.gz”文件是0MQ库的4.3.4稳定版本,发布于2021年1月17日。下面我们将深入探讨0MQ的核心特性、主要功能以及如何使用这一版本。 1. **0MQ简介** - 0MQ不是一个传统的消息队列系统,而是一种在...
zeromq的核心特性包括点对点连接、发布/订阅模式、请求/响应模式以及推拉模式,这些模式为各种通信场景提供了基础。4.1.8版本可能包含了一些错误修复、性能提升或者新功能的添加,具体更新内容可以在其官方 change...
这里提供的四个压缩包文件,Python-2.6.6.tar.bz2、zeromq-3.2.5.tar.gz、jzmq.tar.gz以及storm-0.8.0.zip,都是与Storm搭建和运行相关的资源。 首先,我们来详细了解一下每个文件的作用: 1. **Python-2.6.6.tar....
1. **套接字(Sockets)**:在ZeroMQ中,套接字不仅仅是传统网络编程中的概念,它们提供了多种模式,如PUB(发布者)、SUB(订阅者)、REQ(请求者)、REP(响应者)、DEALER(经销商)和ROUTER(路由器),这些模式...
1. **消息队列模型**:ZeroMQ 提供了多种消息模式,如 Pub/Sub(发布/订阅)、Req/Rep(请求/响应)、PUSH/PULL(推/拉)和 XPUB/XSUB(扩展发布/订阅)。这些模式覆盖了各种通信场景,让开发者能够灵活选择最适合其...
官方离线安装包,测试可用。使用rpm -ivh [rpm完整包名] 进行安装
3. **模式丰富**:支持多种消息模式,如发布/订阅、请求/应答、推送/拉取和一对多等,这些模式可以灵活地组合以适应不同的应用场景。 4. **高可用性**:具有消息持久化和负载均衡功能,确保在故障情况下仍能保持...
zeromq的设计基于发布/订阅(Pub/Sub)、请求/应答(Req/Rep)和推送/拉取(Push/Pull)等经典的消息模式。这些模式覆盖了各种常见的分布式系统通信场景,如服务间调用、事件驱动架构、工作队列等。通过这些模式,...
ZeroMQ的核心概念是Socket(套接字),它提供了类似于网络编程中的套接字接口,但增加了许多高级功能,比如消息队列、负载均衡、高可用性和订阅/发布模式等。这些特性使得ZeroMQ成为构建分布式系统、微服务架构和...
在“zeromq-3.2.4.tar.gz”这个压缩包中,包含了zeromq的3.2.4版本源代码,用于在不同的系统上构建和安装zeromq。 zeromq的核心概念是基于发布/订阅(Publish/Subscribe)、请求/响应(Request/Reply)和推送/拉取...
首先,zeromq支持多种协议,包括TCP、IPC(进程间通信)、PUB/SUB(发布/订阅)、REQ/REP(请求/响应)、PAIR(对等)和DEALER/ROUTER(经销商/路由器)模式。这些模式为不同场景提供了灵活的选择,比如PUB/SUB模式...
这个“zeromq-4.1.0-rc1.zip”压缩包包含了ZeroMQ 4.1.0版本的源代码,这是一个预发布版本(Release Candidate),意味着它是正式版发布前的最后一个测试版本。 ZeroMQ的核心概念是提供一种抽象的网络通信层,允许...
在Storm中,零MQ作为数据传输的基础,它支持发布/订阅、请求/响应以及推拉等多种消息模式,使得Storm能够有效地处理实时数据流并确保数据的可靠传输。版本3.12.5是该库的一个稳定版本,可能包含了一些性能优化和bug...
ZeroMQ是一个网络通讯库,其主要用来为分布式应用程序开发提供进程间通信(此处的进程既可以是同一台机器上的两个进程也可以是不同机器上的两个进程)。ZeroMQ的特点在于灵活的通信手段和丰富的连接模型,并且它可以...
ZeroMQ是一个网络通讯库,其主要用来为分布式应用程序开发提供进程间通信(此处的进程既可以是同一台机器上的两个进程也可以是不同机器上的两个进程)。ZeroMQ的特点在于灵活的通信手段和丰富的连接模型,并且它可以...
ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。
2. **模式丰富**:支持多种消息模式,如发布/订阅(Publish/Subscribe)、请求/响应(Request/Reply)、推拉(Push/Pull)以及管道对(Pair)。这些模式可以灵活地适应不同的应用场景。 3. **高并发**:ZeroMQ利用I...