
Redis Replication


Redis Replication: a first try

I have not translated the following text, for fear of translating it poorly. Note: please pay attention to the parts in red :)

 

Redis replication is master-slave replication that is very simple to use and configure; it allows slave Redis servers to be exact copies of master servers. The following are some very important facts about Redis replication:

  • A master can have multiple slaves.

  • Slaves are able to accept connections from other slaves. Aside from connecting a number of slaves to the same master, slaves can also be connected to other slaves in a graph-like structure.

  • Redis replication is non-blocking on the master side: the master continues to serve queries while one or more slaves perform the first synchronization. On the slave side, however, replication is blocking: while the slave is performing the first synchronization it can't reply to queries.

  • Replication can be used both for scalability, in order to have multiple slaves for read-only queries (for example, heavy SORT operations can be offloaded to slaves), and simply for data redundancy.

  • It is possible to use replication to avoid the saving process on the master side: configure your master's redis.conf to avoid saving (comment out all the "save" directives), then connect a slave configured to save from time to time.

How Redis replication works

If you set up a slave, it sends a SYNC command upon connection, regardless of whether it is connecting for the first time or reconnecting.

The master then starts background saving, and collects all new commands received that will modify the dataset. When the background saving is complete, the master transfers the database file to the slave, which saves it on disk, and then loads it into memory. The master will then send to the slave all accumulated commands, and all new commands received from clients that will modify the dataset. This is done as a stream of commands in the same format as the Redis protocol itself.
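
To make the wire format of that command stream concrete, here is a minimal C sketch of how a write command could be serialized into the multibulk form. The function name `encode_multibulk` and its buffer-based signature are assumptions made for illustration; they are not part of Redis.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical helper (not a Redis function): writes argv as a multibulk
 * request, "*<argc>\r\n" followed by "$<len>\r\n<arg>\r\n" per argument.
 * Returns the number of bytes written (excluding the trailing NUL),
 * or -1 if the output buffer is too small. */
int encode_multibulk(char *out, size_t outlen, int argc, const char **argv) {
    size_t pos = 0;
    int n = snprintf(out, outlen, "*%d\r\n", argc);
    if (n < 0 || (size_t)n >= outlen) return -1;
    pos += (size_t)n;
    for (int j = 0; j < argc; j++) {
        n = snprintf(out + pos, outlen - pos, "$%lu\r\n%s\r\n",
                     (unsigned long)strlen(argv[j]), argv[j]);
        if (n < 0 || (size_t)n >= outlen - pos) return -1;
        pos += (size_t)n;
    }
    return (int)pos;
}
```

Called with the three arguments `ZREM`, a key, and a member, this produces the same `*3`, `$4`, `ZREM`, ... layout that appears in the telnet session further down. The sketch relies on `strlen`, so it only handles C strings; the real protocol is binary safe because every argument carries an explicit length.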

You can try it yourself via telnet. Connect to the Redis port while the server is doing some work and issue the SYNC command. You'll see a bulk transfer and then every command received by the master will be re-issued in the telnet session.

Slaves are able to automatically reconnect when the master <-> slave link goes down for some reason. If the master receives multiple concurrent slave synchronization requests, it performs a single background save in order to serve all of them.

Configuration

Configuring replication is trivial: just add the following line to the slave configuration file:

slaveof 192.168.1.1 6379


 

The following session is a first try of SYNC

 

localhost:~ liuzheng$ telnet 0.0.0.0 6379
Trying 0.0.0.0...
Connected to 0.0.0.0.
Escape character is '^]'.

/* Send the SYNC command */
SYNC   
/* Data received from the master */
$14634
REDIS0001#Aa1{'a': 1}#
1309916463.73)User:id:4e12e9be2590c41201000001:follower4e12e9be2590c412010000001309862334.6199999)User:id:4e12e9692591309861638.01)User:id:4e12ee172590c41315000002:followee4e12ee172590c41315000000
                                                                               1309863448.3)User:id:4e12e6452590c41128000001:follower4e12e6452590c411280000001309861445.8900001)User:id:4e12e9e12590c41208000000:followee4e12e9e12590c41201309862369.53)User:id:4e12e6c62590c41147000000:followee4e12e6c62590c41147000001
                                                                               1309861574.8)User:id:4e13bd2f2590c40141309916465.73)User:id:4e12ebe82590c4128e000000:followee4e12ebe82590c4128e0000001309862888.33999994e12ebe82590c4128e0000011309862888.3399999)User:id:4e12e97f2590c411f3000001:follower4e12e97f2590c411f30000001309862271.3299999)User:id:4e1309862512.55)User:id:4e12e8e82590c411cf000000:followee4e12e8e82590c411cf0000011309862120.8299999)User:id:4e12ec3e2590c412ab000000:followee4e12ec3e2590c412ab0000001309862975.42000014e12ec3e2590c412ab0000011309862974.4200001)User:id:4e12e5722590c410f0000001:follower4e12e5722590c410f00000001309861234.5599999)User:id:4e12e6362590c41125000000:followee4e12e6362590c411250000011309861430.6700001)User:id:4e12e9f42590c4120f000000:followee4e12e9f42590c4120f0000011309862388.6500001)User:id:4e12ebe82590c4128e000000:follower4e12ebe82590c4128e0000001309862888.33999994e12ebe82590c4128e000001131309862447.01)User:id:4e12ec3e2590c412ab000000:follower4e12ec3e2590c412ab0000001309862975.42000014e12ec3e2590c412ab0000011309862975.4200001)User:id:4e12ecb12590c412c2000000:followee4e12ecb12590c412c20000011309863089.61999994e12ecb12590c412c20000001309863090.6199999)User:id:4e12ea322590c41224000000:followee4e12ea322590c412240000011309862450.8399999)User:id:4e12e5b62590c41101000000:followee4e12e5b62590c411010000011309861302.0899999)User:id:4e13bd2f2590c4014b000002:f1309916464.73)User:id:4e12ea8c2590c4123b000000:followee4e12ea8c2590c4123b0000011309862540.8299999)User:id:4e12e9ea2591309862378.02)User:id:4e12ecb12590c412c2000000:follower4e12ecb12590c412c20000011309863090.61999994e12ecb12590c412c2001309862673.29)User:id:4e12ebe82590c4128e000001:followee4e12ebe82590c4128e0000001309862888.3399999)User:id:4e12e9692591309862249.53)User:id:4e12ea8c2590c4123b000000:follower4e12ea8c2590c4123b0000011309862540.8299999)User:id:4e12ec3e2590c412ab000001:followee4e12ec3e2590c412ab0000001309862975.4200001)User:id:4e12e9e12590c41208000001:follower4e12e9e12591309862369.
53)User:id:4e12e6c62590c41147000001:follower4e12e6c62590c41147000000
                                                                               1309861574.8)User:id:4e12ed382590c412e1309862673.29)User:id:4e12ebe82590c4128e000001:follower4e12ebe82590c4128e0000001309862888.3399999)User:id:4e12e8e82590c411cf000001:follower4e12e8e82590c411cf0000001309862120.8299999)User:id:4e12ec3e2590c412ab000001:follower4e12ec3e2590c412ab0000001309862974.4200001)User:id:4e12ecb12590c412c2000001:followee4e12ecb12590c412c20000001309863090.6199999)U1309862452.23)User:id:4e12ea352590c41226000000:followee4e12ea352590c412260000011309862453.6800001)User:id:4e12ea8c2590c4123b000001:followee4e12ea8c2590c4123b0000001309862540.8299999)User:id:4e12e6362590c41125000001:follower4e12e6362591309863109.98)User:id:4e12e9f42590c4120f000001:follower4e12e9f42590c4120f0000001309862388.6400001)User:id:4e12eb512590c41269000000:followee4e12eb512590c412690000011309862737.39000014e12eb512590c412690000001309862737.3900001)User:id:4e12e6592590c4112d000000:followee4e12e6592590c4112d0000011309861465.9200001)User:id:4e12ecb12590c412c2000001:follower4e12ecb12590c412c20000001309863089.6199999)User:id:4e12ea322590c41224000001:follower4e12ea322590c412240000001309862450.8399999)User:id:4e12e5b62590c41101000001:follower4e12e5b62590c411010000001309861302.0899999)User:id:4e12eb112590c41251309862378.02)User:id:4e12ea8c2590c4123b000001:follower4e12ea8c2590c4123b0000001309862540.8299999)User:id:4e12eacf2590c4124c000000:followee4e12eacf2590c4124c0000011309862607.1700001)User:id:4e12ecc52590c412c7000000:follower4e12ecc52591309863111.98)User:id:4e12eb512590c41269000000:follower4e12eb512590c412690000011309862737.39000014e12eb512590c41269001309862418.55)User:id:4e12eacf2590c4124c000000:follower4e12eacf2590c4124c0000011309862607.1700001)User:id:4e12ecc52591309863224.27)User:id:4e12e7dd2590c4118b000000:followee4e12e7dd2590c4118b0000011309861853.6099999)User:id:4e12eb512590c41269000001:followee4e12eb512590c412690000001309862737.3900001)User:id:4e12e7942590c4117a000000:followee4e12e7942591309862452.
23)User:id:4e12ea352590c41226000001:follower4e12ea352590c412260000001309862453.6800001)User:id:4e12e5db2590c41108000000:followee4e12e5db2590c411080000011309861339.3599999)User:id:4e12eacf2590c4124c000001:followee4e12eacf2591309861900.22)User:id:4e12eb512590c41269000001:follower4e12eb512590c412690000001309862737.3900001)User:id:4e12e9ef2591309862383.53)User:id:4e12e6592590c4112d000001:follower4e12e6592590c4112d0000001309861465.9200001)User:id:4e12ea4f2591309862479.25)User:id:4e12eacf2590c4124c000001:follower4e12eacf2590c4124c0000001309862607.1700001)User:id:4e12e59a2590c410fa000000:followee4e12e59a2590c410fa0000011309861274.8800001)User:id:4e12ea2b2590c41220000000:followee4e12ea2b2590c41220000001
1309863501.73)User:id:4e12e7dd2590c4118b000001:follower4e12e7dd2590c4118b0000001309861853.6099999)User:id:4e12eb642590c4126e000000:followee4e12eb642590c4126e0000011309862756.40000014e12eb642590c4126e0000001309862756.4000001)User:id:4e12e7942590c4117a000001:follower4e12e7942590c4117a0000001309861780.9000001)User:id:4e12e6122590c4111b000000:followee4e1309863500.72)User:id:4e12e7172590c41159000000:followee4e12e7172590c411590000011309861655.8099999)User:id:4e12e5f32590c41112000000:followee4e12e5f32590c411120000011309861363.8599999)User:id:4e12e5db2590c41108000001:follower4e12e5db2591309861900.22)User:id:4e12eb642590c4126e000000:follower4e12eb642590c4126e0000011309862756.40000014e12eb642590c4126e001309861405.71)User:id:4e12e8532590c411ab000000:followee4e12e8532590c411ab0000011309861971.3199999)User:id:4e12e59a2590c410fa000001:follower4e12e59a2590c410fa0000001309861274.8800001)User:id:4e12ea2b2590c41220000001:follower4e12ea2b2590c41220000000
1309862105.02)User:id:4e12eae82590c41253000000:followee4e12eae82590c412530000011309862632.1900001)User:id:4e12eb642590c4126e000001:followee4e12eb642590c4126e0000001309862756.4000001)User:id:4e12e94d2590c411e4000000:followee4e12e94d2591309916465.73)User:id:4e12eae82590c41253000000:follower4e12eae82590c412530000011309862632.1900001)User:id:4e12eb642590c4126e000001:follower4e12eb642590c4126e0000001309862756.4000001)User:id:4e12ec112590c41298000000:followee4e12ec112591309862929.97)User:id:4e12e6122590c4111b000001:follower4e12e6122590c4111b0000001309861394.5799999)User:id:4e12e5842591309861252.28)User:id:4e12e7172590c41159000001:follower4e12e7172590c411590000001309861655.8099999)User:id:4e13bd2f2591309916464.73)User:id:4e12e5f32590c41112000001:follower4e12e5f32590c411120000001309861363.8599999)User:id:4e12e9be2590c41201000000:followee4e12e9be2590c412010000011309862334.6199999)User:id:4e12ee172590c41315000000:follower4e12ee172591309863447.294e12ee172590c41315000002
1309861638.01)User:id:4e12eae82590c41253000001:followee4e12eae82590c412530000001309862632.1900001)User:id:4e12e6452590c41128000000:followee4e12e6452590c411280000011309861445.8900001)User:id:4e12e7592590c4116d000001:follower4e12e7592591309861721.01)User:id:4e12e97f2590c411f3000000:followee4e12e97f2590c411f30000011309862271.3299999)User:id:4e12e61d2590c4111e000001:follower4e12e61d2590c4111e000000
                                              1309861405.7)User:id:4e13bd2f2590c4014b000000:followee4e13bd2f2590c40141309862512.55)User:id:4e12e8532590c411ab000001:follower4e12e8532590c411ab0000001309861971.3199999)User:id:4e13bd2f2591309862947.52)User:id:4e12e5722590c410f0000000:followee4e12e5722590c410f00000011309861234.5599999)User:id:4e12eae82590c41253000001:follower4e12eae82590c412530000001309862632.1900001)User:id:4e12ee172590c41315000001:followee4e12ee172591309862929.97#User:id:4e12ec112590c41298000001:followee4e12ec112590c41298000000
              %User:id:4e27a133578786077a000001:post4e30ca58421aa902ad000001
13118203764e30ce0b421aa903c3000001
1311821323%User:id:4e1ab7c42590c41917000000:post4e2fc506421aa9134b000001
13117534784e2fc504421aa9134b000000
1311753476user.send_post##H@N{"post_id": "4e2fc506421aa9134b01", "user###1ab7c42590c41917@%00"}##F@N{"post_id": "4e2fc504421aa9134b0`", "user###1ab7c42590c41917`%0"}CommonPost:id:1:common_posts4e2fecc8421aa91974000000
13117636564e2fed23421aa91974000001
13117637474e2fed23421aa91974000002
1311763747broadcast.feed'{"feed_id": "4e2fed23421aa91974000002"}'{"feed_id": "4e2fed23421aa91974000001"}'{"feed_id": "4e2fecc8421aa91974000000"})User:id:4e1ab7c42590c41917000000:follower4e23cdcb5787860e0b000000
1311759734%User:id:4e27a133578786077a000001:feed4e30ca58421aa902ad000001
13118203764e30ce0b421aa903c3000001
1311821323)User:id:4e23cdcb5787860e0b000000:followee4e1ab7c42590c41917000000
/* Data transfer complete; the connection is now in its keep-alive phase */
 1311759734ING PING
PING
PING
PING
PING
PING
PING
PING
PING
PING
select 12
*3
$4
/* A delete command is issued; the slave sees it immediately, because the master is non-blocking */
 ZREM
$41
User:id:4e1ab7c42590c41917000000:follower
$24
4e23cdcb5787860e0b000000
*3
$4
ZREM
$41
User:id:4e23cdcb5787860e0b000000:followee
$24
4e1ab7c42590c41917000000
*3
$5
LPUSH
$13
user.unfollow
$82
{"follower_id": "4e23cdcb5787860e0b000000", "user_id": "4e1ab7c42590c41917000000"}
*1
$5
MULTI
*4
$4 
/* An add command is issued */
ZADD
$41
User:id:4e23cdcb5787860e0b000000:followee
$10
1312339310
$24
4e1ab7c42590c41917000000
*1
$4
EXEC
*1
$5
MULTI
*4
$4
ZADD
$41
User:id:4e1ab7c42590c41917000000:follower
$10
1312339310
$24
4e23cdcb5787860e0b000000
*1
$4
EXEC
*3
$5
LPUSH
$11
user.follow
$98
{"follower_id": "4e23cdcb5787860e0b000000", "notice": true, "user_id": "4e1ab7c42590c41917000000"}
PING
^X^C^X^C^C^C^X^C^C^C^X^X^C^Cdd^C^C^X

 

 

  Replication in detail

 

 

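int processCommand(redisClient *c) {
    struct redisCommand *cmd;

    /* Look up the command; the sanity checks that follow the lookup in the
     * full source are not shown in this excerpt. */
    cmd = lookupCommand(c->argv[0]->ptr);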

    /* Exec the command */
    if (c->flags & REDIS_MULTI &&
        cmd->proc != execCommand && cmd->proc != discardCommand &&
        cmd->proc != multiCommand && cmd->proc != watchCommand)
    {
        queueMultiCommand(c,cmd);
        addReply(c,shared.queued);
    } else {
        if (server.vm_enabled && server.vm_max_threads > 0 &&
            blockClientOnSwappedKeys(c,cmd)) return REDIS_ERR;
        /* call() is the entry point that actually executes the command;
         * we dive in from here. */
        call(c,cmd);
    }
    return REDIS_OK;
}

 

 

/* Call() is the core of Redis execution of a command */
void call(redisClient *c, struct redisCommand *cmd) {
    long long dirty;

    dirty = server.dirty;
    cmd->proc(c);
    dirty = server.dirty-dirty;

    if (server.appendonly && dirty)
        feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
    if ((dirty || cmd->flags & REDIS_CMD_FORCE_REPLICATION) &&
        listLength(server.slaves))
        /* If the command modified the dataset (e.g. a SET or DEL),
         * propagate it to the slaves. */
        replicationFeedSlaves(server.slaves,c->db->id,c->argv,c->argc);
    if (listLength(server.monitors))
        replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc);
    server.stat_numcommands++;
}
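
The dirty-counter check in `call()` can be illustrated in isolation. The sketch below uses a toy server with hypothetical names (`toy_server`, `toy_call`, `cmd_set`, `cmd_get`), none of which exist in Redis; it only shows the pattern of snapshotting the counter, running the command, and propagating when the counter has moved.

```c
#include <stdio.h>

/* Toy stand-ins for illustration only; these are not Redis types. */
struct toy_server {
    long long dirty;   /* bumped by every command that changes data */
    int value;         /* the whole "dataset" */
    int propagated;    /* how many commands were fed to slaves */
};

typedef void toy_proc(struct toy_server *s, int arg);

void cmd_set(struct toy_server *s, int arg) {
    s->value = arg;
    s->dirty++;        /* a write marks the dataset as modified */
}

void cmd_get(struct toy_server *s, int arg) {
    (void)arg;
    printf("%d\n", s->value);   /* a read leaves dirty untouched */
}

/* Same shape as call(): diff the counter around the command. */
void toy_call(struct toy_server *s, toy_proc *proc, int arg) {
    long long dirty = s->dirty;
    proc(s, arg);
    dirty = s->dirty - dirty;
    if (dirty) s->propagated++; /* stands in for replicationFeedSlaves() */
}
```

Running `toy_call` with `cmd_set` and then `cmd_get` leaves `propagated` at 1: only the write is forwarded. This is why read-only commands never show up in the replication stream, unless a command carries a force-replication flag as in the real code above.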
 

 

void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
    listNode *ln;
    listIter li;
    int outc = 0, j;
    robj **outv;
    /* We need 1+(ARGS*3) objects since commands are using the new protocol
     * and we one 1 object for the first "*<count>\r\n" multibulk count, then
     * for every additional object we have "$<count>\r\n" + object + "\r\n". */
    robj *static_outv[REDIS_STATIC_ARGS*3+1];
    robj *lenobj;

    if (argc <= REDIS_STATIC_ARGS) {
        outv = static_outv;
    } else {
        outv = zmalloc(sizeof(robj*)*(argc*3+1));
    }

    lenobj = createObject(REDIS_STRING,
            sdscatprintf(sdsempty(), "*%d\r\n", argc));
    lenobj->refcount = 0;
    outv[outc++] = lenobj;
    for (j = 0; j < argc; j++) {
        lenobj = createObject(REDIS_STRING,
            sdscatprintf(sdsempty(),"$%lu\r\n",
                (unsigned long) stringObjectLen(argv[j])));
        lenobj->refcount = 0;
        outv[outc++] = lenobj;
        outv[outc++] = argv[j];
        outv[outc++] = shared.crlf;
    }

    /* Increment all the refcounts at start and decrement at end in order to
     * be sure to free objects if there is no slave in a replication state
     * able to be feed with commands */
    for (j = 0; j < outc; j++) incrRefCount(outv[j]);
    listRewind(slaves,&li);
    while((ln = listNext(&li))) {
        redisClient *slave = ln->value;

        /* Don't feed slaves that are still waiting for BGSAVE to start */
        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;

        /* Feed all the other slaves, MONITORs and so on */
        if (slave->slaveseldb != dictid) {
            robj *selectcmd;

            switch(dictid) {
            case 0: selectcmd = shared.select0; break;
            case 1: selectcmd = shared.select1; break;
            case 2: selectcmd = shared.select2; break;
            case 3: selectcmd = shared.select3; break;
            case 4: selectcmd = shared.select4; break;
            case 5: selectcmd = shared.select5; break;
            case 6: selectcmd = shared.select6; break;
            case 7: selectcmd = shared.select7; break;
            case 8: selectcmd = shared.select8; break;
            case 9: selectcmd = shared.select9; break;
            default:
                selectcmd = createObject(REDIS_STRING,
                    sdscatprintf(sdsempty(),"select %d\r\n",dictid));
                selectcmd->refcount = 0;
                break;
            }
            addReply(slave,selectcmd);
            slave->slaveseldb = dictid;
        }
        /* Queue the encoded command on the slave's client output buffer */
        for (j = 0; j < outc; j++) addReply(slave,outv[j]);
    }
    for (j = 0; j < outc; j++) decrRefCount(outv[j]);
    if (outv != static_outv) zfree(outv);
}
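
The SELECT bookkeeping in `replicationFeedSlaves()` explains the `select 12` line in the telnet session: a SELECT is emitted only when the database of the incoming command differs from the one last sent to that slave. A minimal sketch of that idea, with hypothetical names (`toy_slave`, `toy_feed`) and an assumed initial value of -1 meaning "no database selected yet":

```c
/* Toy stand-in for a slave connection; not a Redis type. */
struct toy_slave {
    int seldb;          /* database last selected on this link (-1: none) */
    int selects_sent;   /* number of SELECT commands emitted */
    int commands_sent;  /* number of replicated commands emitted */
};

/* Emit a SELECT only when the target database changes, then the command. */
void toy_feed(struct toy_slave *s, int dictid) {
    if (s->seldb != dictid) {
        s->selects_sent++;      /* stands in for addReply(slave, selectcmd) */
        s->seldb = dictid;
    }
    s->commands_sent++;         /* stands in for queuing the command itself */
}
```

Feeding commands for databases 12, 12, and 0 in that order yields two SELECTs and three commands, so consecutive writes to the same database cost no extra traffic.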
 

 

/* -----------------------------------------------------------------------------
 * Higher level functions to queue data on the client output buffer.
 * The following functions are the ones that commands implementations will call.
 * -------------------------------------------------------------------------- */

void addReply(redisClient *c, robj *obj) {
    if (_installWriteEvent(c) != REDIS_OK) return;
    redisAssert(!server.vm_enabled || obj->storage == REDIS_VM_MEMORY);

    /* This is an important place where we can avoid copy-on-write
     * when there is a saving child running, avoiding touching the
     * refcount field of the object if it's not needed.
     *
     * If the encoding is RAW and there is room in the static buffer
     * we'll be able to send the object to the client without
     * messing with its page. */
    if (obj->encoding == REDIS_ENCODING_RAW) {
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
            _addReplyObjectToList(c,obj);
    } else {
        /* FIXME: convert the long into string and use _addReplyToBuffer()
         * instead of calling getDecodedObject. As this place in the
         * code is too performance critical. */
        obj = getDecodedObject(obj);
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
            _addReplyObjectToList(c,obj);
        decrRefCount(obj);
    }
}

/* -----------------------------------------------------------------------------
 * Low level functions to add more data to output buffers.
 * -------------------------------------------------------------------------- */

int _addReplyToBuffer(redisClient *c, char *s, size_t len) {
    size_t available = sizeof(c->buf)-c->bufpos;

    if (c->flags & REDIS_CLOSE_AFTER_REPLY) return REDIS_OK;

    /* If there already are entries in the reply list, we cannot
     * add anything more to the static buffer. */
    if (listLength(c->reply) > 0) return REDIS_ERR;

    /* Check that the buffer has enough space available for this string. */
    if (len > available) return REDIS_ERR;

    /* Append the data to the client's static buffer */
    memcpy(c->buf+c->bufpos,s,len);
    c->bufpos+=len;
    return REDIS_OK;
}
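
The two-tier output scheme used by `addReply()` and `_addReplyToBuffer()` (a fixed buffer first, a reply list as fallback) can be sketched in a few lines. The names `toy_client`, `toy_add_to_buffer`, and `toy_add_reply`, and the 16-byte buffer size, are assumptions for illustration; the overflow list is reduced to a counter.

```c
#include <stddef.h>
#include <string.h>

#define TOY_BUF_SIZE 16
#define TOY_OK 0
#define TOY_ERR -1

/* Toy stand-in for a client; not a Redis type. */
struct toy_client {
    char buf[TOY_BUF_SIZE];
    size_t bufpos;   /* bytes used in the static buffer */
    int queued;      /* replies on the overflow list (stand-in for c->reply) */
};

int toy_add_to_buffer(struct toy_client *c, const char *s, size_t len) {
    size_t available = sizeof(c->buf) - c->bufpos;
    /* Once anything is on the list, later data must follow it there,
     * otherwise replies would be sent out of order. */
    if (c->queued > 0) return TOY_ERR;
    if (len > available) return TOY_ERR;
    memcpy(c->buf + c->bufpos, s, len);
    c->bufpos += len;
    return TOY_OK;
}

void toy_add_reply(struct toy_client *c, const char *s, size_t len) {
    if (toy_add_to_buffer(c, s, len) != TOY_OK) c->queued++;
}
```

With a 16-byte buffer, a 10-byte reply lands in the buffer, a second 10-byte reply overflows to the list, and a following 2-byte reply also goes to the list even though it would fit, which preserves ordering.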


void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = privdata;
    int nwritten = 0, totwritten = 0, objlen;
    robj *o;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    while(c->bufpos > 0 || listLength(c->reply)) {
        if (c->bufpos > 0) {
            if (c->flags & REDIS_MASTER) {
                /* Don't reply to a master */
                nwritten = c->bufpos - c->sentlen;
            } else {
                nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
                if (nwritten <= 0) break;
            }
            c->sentlen += nwritten;
            totwritten += nwritten;

            /* If the buffer was sent, set bufpos to zero to continue with
             * the remainder of the reply. */
            if (c->sentlen == c->bufpos) {
                c->bufpos = 0;
                c->sentlen = 0;
            }
        } else {
            o = listNodeValue(listFirst(c->reply));
            objlen = sdslen(o->ptr);

            if (objlen == 0) {
                listDelNode(c->reply,listFirst(c->reply));
                continue;
            }

            if (c->flags & REDIS_MASTER) {
                /* Don't reply to a master */
                nwritten = objlen - c->sentlen;
            } else {
                /* Write the unsent part of the object to the client socket */
                nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen);
                if (nwritten <= 0) break;
            }
            c->sentlen += nwritten;
            totwritten += nwritten;

            /* If we fully sent the object on head go to the next one */
            if (c->sentlen == objlen) {
                listDelNode(c->reply,listFirst(c->reply));
                c->sentlen = 0;
            }
        }
        /* Note that we avoid sending more than REDIS_MAX_WRITE_PER_EVENT
         * bytes. In a single-threaded server it's a good idea to serve
         * other clients as well, even if a very large request comes from
         * a super fast link that is always able to accept data (in a real
         * world scenario, think about 'KEYS *' against the loopback
         * interface) */
        if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
    }
    if (nwritten == -1) {
        if (errno == EAGAIN) {
            nwritten = 0;
        } else {
            redisLog(REDIS_VERBOSE,
                "Error writing to client: %s", strerror(errno));
            freeClient(c);
            return;
        }
    }
    if (totwritten > 0) c->lastinteraction = time(NULL);
    if (listLength(c->reply) == 0) {
        c->sentlen = 0;
        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);

        /* Close connection after entire reply has been sent. */
        if (c->flags & REDIS_CLOSE_AFTER_REPLY) freeClient(c);
    }
}
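
The `sentlen` bookkeeping in `sendReplyToClient()` exists because a single `write()` may accept fewer bytes than requested. The sketch below simulates that with a fake writer that takes at most `cap` bytes per call. All names (`toy_out`, `toy_write`, `toy_drain`) are hypothetical, and the in-memory sink stands in for the socket.

```c
#include <stddef.h>
#include <string.h>

/* Toy stand-in for the static-buffer part of a client; not a Redis type. */
struct toy_out {
    const char *buf;
    size_t bufpos;    /* bytes waiting in the buffer */
    size_t sentlen;   /* bytes of the buffer already written */
};

/* Fake write(): copies at most cap bytes into sink, like a short write. */
size_t toy_write(char *sink, size_t *sinklen,
                 const char *src, size_t len, size_t cap) {
    size_t n = len < cap ? len : cap;
    memcpy(sink + *sinklen, src, n);
    *sinklen += n;
    return n;
}

/* Drains the buffer, resuming from sentlen after every short write.
 * Returns the number of write calls needed, or -1 if cap is 0. */
int toy_drain(struct toy_out *o, char *sink, size_t *sinklen, size_t cap) {
    int calls = 0;
    if (cap == 0) return -1;
    while (o->bufpos > 0) {
        size_t n = toy_write(sink, sinklen, o->buf + o->sentlen,
                             o->bufpos - o->sentlen, cap);
        o->sentlen += n;
        calls++;
        if (o->sentlen == o->bufpos) {   /* fully sent: reset both cursors */
            o->bufpos = 0;
            o->sentlen = 0;
        }
    }
    return calls;
}
```

Draining the 6-byte string `PING\r\n` with a 4-byte cap takes two calls. The real function differs in that it returns to the event loop when the socket would block, instead of looping until the buffer is empty.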

/* Set the event loop to listen for write events on the client's socket.
 * Typically gets called every time a reply is built. */
int _installWriteEvent(redisClient *c) {
    if (c->fd <= 0) return REDIS_ERR;
    if (c->bufpos == 0 && listLength(c->reply) == 0 &&
        (c->replstate == REDIS_REPL_NONE ||
         c->replstate == REDIS_REPL_ONLINE) &&
        aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
        sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
    return REDIS_OK;
}


int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= AE_SETSIZE) return AE_ERR;
    aeFileEvent *fe = &eventLoop->events[fd];

    /* epoll or kqueue */
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc; /* sendReplyToClient */
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

/* -----------------------------------------------SLAVE-----------------------------------------------*/

void initServer() {
    aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
}

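int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    int loops = server.cronloops;

    /* Replication cron function -- used to reconnect to master and
     * to detect transfer failures. serverCron() runs every 100 ms (see the
     * return value below), so this fires about once per second. */
    if (!(loops % 10)) replicationCron();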

    server.cronloops++;
    return 100;
}
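
The timing follows from two numbers in `serverCron()`: it returns 100, which asks the event loop to call it again in 100 milliseconds, and it runs `replicationCron()` only when the loop counter is a multiple of 10. A small sketch of that arithmetic, using hypothetical helper names (`count_runs`, `period_ms`):

```c
/* Hypothetical simulation, not Redis code: counts how often a task gated
 * by (loops % every) fires across a number of cron ticks. */
int count_runs(int ticks, int every) {
    int runs = 0;
    for (int loops = 0; loops < ticks; loops++) {
        if (!(loops % every)) runs++;
    }
    return runs;
}

/* Milliseconds between two runs when each tick lasts tick_ms. */
int period_ms(int tick_ms, int every) {
    return tick_ms * every;
}
```

With 100 ms ticks and a gate of 10, the task runs once per 1000 ms, so 50 ticks (five seconds) trigger it five times. A slave therefore notices that it should connect to its master within roughly a second.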


 /* slave run replicationCron */
void replicationCron(void) {
    /* Bulk transfer I/O timeout? */
    if (server.masterhost && server.replstate == REDIS_REPL_TRANSFER &&
        (time(NULL)-server.repl_transfer_lastio) > REDIS_REPL_TIMEOUT)
    {
        redisLog(REDIS_WARNING,"Timeout receiving bulk data from MASTER...");
        replicationAbortSyncTransfer();
    }

    /* Timed out master when we are an already connected slave? */
    if (server.masterhost && server.replstate == REDIS_REPL_CONNECTED &&
        (time(NULL)-server.master->lastinteraction) > REDIS_REPL_TIMEOUT)
    {
        redisLog(REDIS_WARNING,"MASTER time out: no data nor PING received...");
        freeClient(server.master);
    }

    /* Check if we should connect to a MASTER */
    if (server.replstate == REDIS_REPL_CONNECT) {
        redisLog(REDIS_NOTICE,"Connecting to MASTER...");
        if (syncWithMaster() == REDIS_OK) {
            redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started: SYNC sent");
        }
    }
    
    /* If we have attached slaves, PING them from time to time.
     * So slaves can implement an explicit timeout to masters, and will
     * be able to detect a link disconnection even if the TCP connection
     * will not actually go down. */
    if (!(server.cronloops % (REDIS_REPL_PING_SLAVE_PERIOD*10))) {
        listIter li;
        listNode *ln;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            redisClient *slave = ln->value;

            /* Don't ping slaves that are in the middle of a bulk transfer
             * with the master for first synchronization. */
            if (slave->replstate == REDIS_REPL_SEND_BULK) continue;
            if (slave->replstate == REDIS_REPL_ONLINE) {
                /* If the slave is online send a normal ping */
                addReplySds(slave,sdsnew("PING\r\n"));
            } else {
                /* Otherwise we are in the pre-synchronization stage.
                 * Just a newline will do the work of refreshing the
                 * connection last interaction time, and at the same time
                 * we'll be sure that being a single char there are no
                 * short-write problems. */
                if (write(slave->fd, "\n", 1) == -1) {
                    /* Don't worry, it's just a ping. */
                }
            }
        }
    }
}



int syncWithMaster(void) {
    char buf[1024], tmpfile[256], authcmd[1024];
    int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
    int dfd, maxtries = 5;

    if (fd == -1) {
        redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
            strerror(errno));
        return REDIS_ERR;
    }

    /* AUTH with the master if required. */
    if(server.masterauth) {
    	snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
    	if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
            close(fd);
            redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
                strerror(errno));
            return REDIS_ERR;
    	}
        /* Read the AUTH result.  */
        if (syncReadLine(fd,buf,1024,3600) == -1) {
            close(fd);
            redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
                strerror(errno));
            return REDIS_ERR;
        }
        if (buf[0] != '+') {
            close(fd);
            redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
            return REDIS_ERR;
        }
    }

    /* Issue the SYNC command */
    if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
        close(fd);
        redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
            strerror(errno));
        return REDIS_ERR;
    }

    /* Prepare a suitable temp file for bulk transfer */
    while(maxtries--) {
        snprintf(tmpfile,256,
            "temp-%d.%ld.rdb",(int)time(NULL),(long int)getpid());
        dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
        if (dfd != -1) break;
        sleep(1);
    }
    if (dfd == -1) {
        close(fd);
        redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
        return REDIS_ERR;
    }

    /* Setup the non blocking download of the bulk file. */
    if (aeCreateFileEvent(server.el, fd, AE_READABLE, readSyncBulkPayload, NULL)
            == AE_ERR)
    {
        close(fd);
        redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
        return REDIS_ERR;
    }
    server.replstate = REDIS_REPL_TRANSFER;
    server.repl_transfer_left = -1;
    server.repl_transfer_s = fd;
    server.repl_transfer_fd = dfd;
    server.repl_transfer_lastio = time(NULL);
    server.repl_transfer_tmpfile = zstrdup(tmpfile);
    return REDIS_OK;
}



/* Asynchronously read the SYNC payload we receive from a master */
void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
    char buf[4096];
    ssize_t nread, readlen;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(privdata);
    REDIS_NOTUSED(mask);

    /* If repl_transfer_left == -1 we still have to read the bulk length
     * from the master reply. */
    if (server.repl_transfer_left == -1) {
        if (syncReadLine(fd,buf,1024,3600) == -1) {
            redisLog(REDIS_WARNING,
                "I/O error reading bulk count from MASTER: %s",
                strerror(errno));
            replicationAbortSyncTransfer();
            return;
        }
        if (buf[0] == '-') {
            redisLog(REDIS_WARNING,
                "MASTER aborted replication with an error: %s",
                buf+1);
            replicationAbortSyncTransfer();
            return;
        } else if (buf[0] == '\0') {
            /* At this stage just a newline works as a PING in order to take
             * the connection live. So we refresh our last interaction
             * timestamp. */
            server.repl_transfer_lastio = time(NULL);
            return;
        } else if (buf[0] != '$') {
            redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
            replicationAbortSyncTransfer();
            return;
        }
        server.repl_transfer_left = strtol(buf+1,NULL,10);
        redisLog(REDIS_NOTICE,
            "MASTER <-> SLAVE sync: receiving %ld bytes from master",
            server.repl_transfer_left);
        return;
    }

    /* Read bulk data */
    readlen = (server.repl_transfer_left < (signed)sizeof(buf)) ?
        server.repl_transfer_left : (signed)sizeof(buf);
    nread = read(fd,buf,readlen);
    if (nread <= 0) {
        redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
            (nread == -1) ? strerror(errno) : "connection lost");
        replicationAbortSyncTransfer();
        return;
    }
    server.repl_transfer_lastio = time(NULL);
    if (write(server.repl_transfer_fd,buf,nread) != nread) {
        redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
        replicationAbortSyncTransfer();
        return;
    }
    server.repl_transfer_left -= nread;
    /* Check if the transfer is now complete */
    if (server.repl_transfer_left == 0) {
        if (rename(server.repl_transfer_tmpfile,server.dbfilename) == -1) {
            redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
            replicationAbortSyncTransfer();
            return;
        }
        redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory");
        emptyDb();
        /* Before loading the DB into memory we need to delete the readable
         * handler, otherwise it will get called recursively since
         * rdbLoad() will call the event loop to process events from time to
         * time for non blocking loading. */
        aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
        if (rdbLoad(server.dbfilename) != REDIS_OK) {
            redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
            replicationAbortSyncTransfer();
            return;
        }
        /* Final setup of the connected slave <- master link */
        zfree(server.repl_transfer_tmpfile);
        close(server.repl_transfer_fd);
        server.master = createClient(server.repl_transfer_s);
        server.master->flags |= REDIS_MASTER;
        server.master->authenticated = 1;
        server.replstate = REDIS_REPL_CONNECTED;
        redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Finished with success");
        /* Rewrite the AOF file now that the dataset changed. */
        if (server.appendonly) rewriteAppendOnlyFileBackground();
    }
}
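
The first branch of `readSyncBulkPayload()` classifies the header line sent by the master, and the second reads the payload in buffer-sized chunks. The sketch below isolates both decisions. The enum and function names are assumptions for illustration, and the header line is assumed to have had its trailing CRLF removed already.

```c
#include <stdlib.h>

/* Hypothetical classification of the first line a master sends after SYNC. */
enum bulk_hdr { HDR_LEN, HDR_KEEPALIVE, HDR_ERROR, HDR_BAD };

/* Mirrors the checks above: '-' is an error, an empty line is a keep-alive
 * newline, '$' introduces the payload length, anything else is rejected. */
enum bulk_hdr classify_bulk_header(const char *line, long *len) {
    if (line[0] == '-') return HDR_ERROR;
    if (line[0] == '\0') return HDR_KEEPALIVE;
    if (line[0] != '$') return HDR_BAD;
    *len = strtol(line + 1, NULL, 10);
    return HDR_LEN;
}

/* Size of the next read: never more than the buffer, never more than
 * what is left of the payload. */
long next_read_len(long left, long bufsize) {
    return left < bufsize ? left : bufsize;
}
```

For the `$14634` header seen in the telnet session and the 4096-byte buffer used above, the payload arrives as three full reads followed by one read of 2346 bytes, assuming every `read()` returns as much as was requested.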
