`
m635674608
  • 浏览: 5069303 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

分布式一致性协议Raft原理与实例

 
阅读更多

分布式一致性协议Raft原理与实例


1.Raft协议

1.1 Raft简介

Raft是由Stanford提出的一种更易理解的一致性算法,意在取代目前广为使用的Paxos算法。目前,在各种主流语言中都有了一些开源实现,比如本文中将使用的基于JGroups的Raft协议实现。关于Raft的原理,强烈推荐动画版Raft讲解

1.2 Raft原理

在Raft中,每个结点会处于下面三种状态中的一种:

  • follower:所有结点都以follower的状态开始。如果没收到leader消息则会变成candidate状态
  • candidate:会向其他结点“拉选票”,如果得到大部分的票则成为leader。这个过程就叫做Leader选举(Leader Election)
  • leader:所有对系统的修改都会先经过leader。每个修改都会写一条日志(log entry)。leader收到修改请求后的过程如下,这个过程叫做日志复制(Log Replication):
    1. 复制日志到所有follower结点(replicate entry)
    2. 大部分结点响应时才提交日志
    3. 通知所有follower结点日志已提交
    4. 所有follower也提交日志
    5. 现在整个系统处于一致的状态

1.2.1 Leader Election

当follower在选举超时时间(election timeout)内未收到leader的心跳消息(append entries),则变成candidate状态。为了避免选举冲突,这个超时时间是一个150~300ms之间的随机数

成为candidate的结点发起新的选举期(election term)去“拉选票”:

  1. 重置自己的计时器
  2. 投自己一票
  3. 发送 Request Vote消息

如果接收结点在新term内没有投过票那它就会投给此candidate,并重置它自己的选举超时时间。candidate拉到大部分选票就会成为leader,并定时发送心跳——Append Entries消息,去重置各个follower的计时器。当前Term会继续直到某个follower接收不到心跳并成为candidate。

如果不巧两个结点同时成为candidate都去“拉票”怎么办?这时会发生Splite Vote情况。两个结点可能都拉到了同样多的选票,难分胜负,选举失败,本term没有leader。之后又有计时器超时的follower会变成candidate,将term加一并开始新一轮的投票。

1.2.2 Log Replication

当发生改变时,leader会复制日志给follower结点,这也是通过Append Entries心跳消息完成的。前面已经列举了Log Replication的过程,这里就不重复了。

Raft能够正确地处理网络分区(“脑裂”)问题。假设A~E五个结点,B是leader。如果发生“脑裂”,A、B成为一个子分区,C、D、E成 为一个子分区。此时C、D、E会发生选举,选出C作为新term的leader。这样我们在两个子分区内就有了不同term的两个leader。这时如果 有客户端写A时,因为B无法复制日志到大部分follower所以日志处于uncommitted未提交状态。而同时另一个客户端对C的写操作却能够正确 完成,因为C是新的leader,它只知道D和E。

当网络通信恢复,B能够发送心跳给C、D、E了,却发现“改朝换代”了,因为C的term值更大,所以B自动降格为follower。然后A和B都回滚未提交的日志,并从新leader那里复制最新的日志。但这样是不是就会丢失更新?


2.JGroups-raft介绍

2.1 JGroups中的Raft

JGroups是Java里比较流行的网络通信框架,近期顺应潮流,它也推出了Raft基于JGroups的实现。简单试用了一下,还比较容易上 手,底层Raft的内部机制都被API屏蔽掉了。下面就通过一个分布式计数器的实例来学习一下Raft协议在JGroups中的实际用法。

Maven依赖如下:

    <dependency>
        <groupId>org.jgroups</groupId>
        <artifactId>jgroups-raft</artifactId>
        <version>0.2</version>
    </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

其实JGroups-raft的Jar包中已经自带了一个Counter的Demo,但仔细看了一下,有的地方写的有些麻烦,不太容易把握住Raft这根主线。所以这里就参照官方的例子,进行了简写,突出Raft协议的基本使用方法。JGroups-raft目前资料不多,InfoQ上的这篇文章很不错,还有官方文档

2.2 核心API

使用JGroups-raft时,我们一般会实现两个接口:RAFT.RoleChange和StateMachine

  • 实现RAFT.RoleChange接口的方法能通知我们当前哪个结点是leader
  • 实现StateMachine执行要实现一致性的操作

典型单点服务实现方式就是:

JChannel ch = null;
RaftHandle handle = new RaftHandle(ch, this);
handle.addRoleListener(role -> {
    if(role == Role.Leader)
        // start singleton services
    else
        // stop singleton services
});
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

2.3 默认配置

jgroups-raft.jar中已经带了一个raft.xml配置文件,作为实例程序我们可以直接使用它。

简要解释一下最核心的几个配置项,参照GitHub上的文档

  • UDP:IP多播配置
  • raft.NO_DUPES:是否检测新加入结点的ID与老结点有重复
  • raft.ELECTION:选举超时时间的随机化范围
  • raft.RAFT所有Raft集群的成员必须在这里声明,也可以在运行时通过addServer/removeServer动态修改
  • raft.REDIRECT:是否转发请求给leader
  • raft.CLIENT:在哪个IP和端口上接收客户端请求
<!--
  Default stack using IP multicasting. It is similar to the "udp"
  stack in stacks.xml, but doesn't use streaming state transfer and flushing
  author: Bela Ban
-->

<config xmlns="urn:org:jgroups"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups.xsd">
    <UDP
         mcast_addr="228.5.5.5"
         mcast_port="${jgroups.udp.mcast_port:45588}"
         ... />
    ...
    <raft.NO_DUPES/>
    <raft.ELECTION election_min_interval="100" election_max_interval="500"/>
    <raft.RAFT members="A,B,C" raft_id="${raft_id:undefined}"/>
    <raft.REDIRECT/>
    <raft.CLIENT bind_addr="0.0.0.0" />
</config>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

3.JGroups-raft实例

实例很简单,只有JGroupsRaftTest和CounterService两个类组成。JGroupsRaftTest是测试启动类,而CounterService就是利用Raft协议实现的分布式计数服务类。

3.1 JGroupsRaftTest

JGroupsRaftTest的职责主要有三个:

  • 创建Raft协议的JChannel
  • 创建CounterService
  • 循环读取用户输入

目前简单实现了几种操作包括:初始化计数器、加一、减一、读取计数器、查看Raft日志、做Raft快照(用于压缩日志文件)等。其中对计数器的操作,因为要与其他Raft成员进行分布式通信,所以当前集群必须要多于一个结点时才能进行操作。如果要支持单结点时的操作,需要做特殊处理

import org.jgroups.JChannel;
import org.jgroups.protocols.raft.RAFT;
import org.jgroups.util.Util;

/**
 * Test jgroups raft algorithm implementation.
 */
public class JGroupsRaftTest {

    private static final String CLUSTER_NAME = "ctr-cluster";
    private static final String COUNTER_NAME = "counter";
    private static final String RAFT_XML = "raft.xml";

    public static void main(String[] args) throws Exception {
        JChannel ch = new JChannel(RAFT_XML).name(args[0]);
        CounterService counter = new CounterService(ch);

        try {
            doConnect(ch, CLUSTER_NAME);
            doLoop(ch, counter);
        } finally {
            Util.close(ch);
        }
    }

    private static void doConnect(JChannel ch, String clusterName) throws Exception {
        ch.connect(clusterName);
    }

    private static void doLoop(JChannel ch, CounterService counter) {
        boolean looping = true;
        while (looping) {
            int key = Util.keyPress("\n[0] Create [1] Increment [2] Decrement [3] Dump log [4] Snapshot [x] Exit\n" +
                    "first-applied=" + ((RAFT) ch.getProtocolStack().findProtocol(RAFT.class)).log().firstApplied() +
                    ", last-applied=" + counter.lastApplied() +
                    ", commit-index=" + counter.commitIndex() +
                    ", log size=" + Util.printBytes(counter.logSize()) + ": ");

            if ((key == '0' || key == '1' || key == '2') && !counter.isLeaderExist()) {
                System.out.println("Cannot perform cause there is no leader by now");
                continue;
            }

            long val;
            switch (key) {
                case '0':
                    counter.getOrCreateCounter(COUNTER_NAME, 1L);
                    break;
                case '1':
                    val = counter.incrementAndGet(COUNTER_NAME);
                    System.out.printf("%s: %s\n", COUNTER_NAME, val);
                    break;
                case '2':
                    val = counter.decrementAndGet(COUNTER_NAME);
                    System.out.printf("%s: %s\n", COUNTER_NAME, val);
                    break;
                case '3':
                    counter.dumpLog();
                    break;
                case '4':
                    counter.snapshot();
                    break;
                case 'x':
                    looping = false;
                    break;
                case '\n':
                    System.out.println(COUNTER_NAME + ": " + counter.get(COUNTER_NAME) + "\n");
                    break;
            }
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73

3.2 CounterService

CounterService是我们的核心类,利用Raft实现了分布式的计数器操作,它的API主要由四部分组成:

  • Raft Local API:操作本地Raft的状态,像日志大小、做快照等
  • Raft API:实现Raft的监听器和状态机的方法
    • roleChanged:本地Raft的角色发生变化
    • apply:分布式通信消息
    • readContentFrom/writeContentTo:读写快照
  • Counter API:计数器的分布式API
  • Counter Native API:计数器的本地API。直接使用的话相当于脏读
import org.jgroups.Channel;
import org.jgroups.protocols.raft.RAFT;
import org.jgroups.protocols.raft.Role;
import org.jgroups.protocols.raft.StateMachine;
import org.jgroups.raft.RaftHandle;
import org.jgroups.util.AsciiString;
import org.jgroups.util.Bits;
import org.jgroups.util.ByteArrayDataInputStream;
import org.jgroups.util.ByteArrayDataOutputStream;
import org.jgroups.util.Util;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/**
 * Distribute counter service based on Raft consensus algorithm.
 */
class CounterService implements StateMachine, RAFT.RoleChange {

    private RaftHandle raft;

    private final Map<String, Long> counters;

    private enum Command {
        CREATE, INCREMENT_AND_GET, DECREMENT_AND_GET, GET, SET
    }

    public CounterService(Channel ch) {
        this.raft = new RaftHandle(ch, this);
        this.counters = new HashMap<>();

        raft.raftId(ch.getName())
            .addRoleListener(this);
    }

    // ===========================================
    //              Raft Status API
    // ===========================================

    public int lastApplied() {
        return raft.lastApplied();
    }

    public int commitIndex() {
        return raft.commitIndex();
    }

    public int logSize() {
        return raft.logSize();
    }

    public void dumpLog() {
        System.out.println("\nindex (term): command\n---------------------");
        raft.logEntries((entry, index) -> {
            StringBuilder log = new StringBuilder()
                    .append(index)
                    .append(" (").append(entry.term()).append("): ");

            if (entry.command() == null ) {
                System.out.println(log.append("<marker record>"));
                return;
            } else if (entry.internal()) {
                System.out.println(log.append("<internal command>"));
                return;
            }

            ByteArrayDataInputStream in = new ByteArrayDataInputStream(
                    entry.command(), entry.offset(), entry.length()
            );
            try {
                Command cmd = Command.values()[in.readByte()];
                String name = Bits.readAsciiString(in).toString();
                switch (cmd) {
                    case CREATE:
                        log.append(cmd)
                            .append("(").append(name).append(", ")
                            .append(Bits.readLong(in))
                            .append(")");
                        break;
                    case GET:
                    case INCREMENT_AND_GET:
                    case DECREMENT_AND_GET:
                        log.append(cmd)
                            .append("(").append(name).append(")");
                        break;
                    default:
                        throw new IllegalArgumentException("Command " + cmd + "is unknown");
                }
                System.out.println(log);
            }
            catch (IOException e) {
                throw new IllegalStateException("Error when dump log", e);
            }
        });
        System.out.println();
    }

    public void snapshot() {
        try {
            raft.snapshot();
        } catch (Exception e) {
            throw new IllegalStateException("Error when snapshot", e);
        }
    }

    public boolean isLeaderExist() {
        return raft.leader() != null;
    }

    // ===========================================
    //              Raft API
    // ===========================================

    @Override
    public void roleChanged(Role role) {
        System.out.println("roleChanged to: " + role);
    }

    @Override
    public byte[] apply(byte[] data, int offset, int length) throws Exception {
        ByteArrayDataInputStream in = new ByteArrayDataInputStream(data, offset, length);
        Command cmd = Command.values()[in.readByte()];
        String name = Bits.readAsciiString(in).toString();
        System.out.println("[" + new SimpleDateFormat("HH:mm:ss.SSS").format(new Date())
                + "] Apply: cmd=[" + cmd + "]");

        long v1, retVal;
        switch (cmd) {
            case CREATE:
                v1 = Bits.readLong(in);
                retVal = create0(name, v1);
                return Util.objectToByteBuffer(retVal);
            case GET:
                retVal = get0(name);
                return Util.objectToByteBuffer(retVal);
            case INCREMENT_AND_GET:
                retVal = add0(name, 1L);
                return Util.objectToByteBuffer(retVal);
            case DECREMENT_AND_GET:
                retVal = add0(name, -1L);
                return Util.objectToByteBuffer(retVal);
            default:
                throw new IllegalArgumentException("Command " + cmd + "is unknown");
        }
    }

    @Override
    public void readContentFrom(DataInput in) throws Exception {
        int size = in.readInt();
        System.out.println("ReadContentFrom: size=[" + size + "]");
        for (int i = 0; i < size; i++) {
            AsciiString name = Bits.readAsciiString(in);
            Long value = Bits.readLong(in);
            counters.put(name.toString(), value);
        }
    }

    @Override
    public void writeContentTo(DataOutput out) throws Exception {
        synchronized (counters) {
            int size = counters.size();
            System.out.println("WriteContentFrom: size=[" + size + "]");
            out.writeInt(size);
            for (Map.Entry<String, Long> entry : counters.entrySet()) {
                AsciiString name = new AsciiString(entry.getKey());
                Long value = entry.getValue();
                Bits.writeAsciiString(name, out);
                Bits.writeLong(value, out);
            }
        }
    }

    // ===========================================
    //              Counter API
    // ===========================================

    public void getOrCreateCounter(String name, long initVal) {
        Object retVal = invoke(Command.CREATE, name, false, initVal);
        counters.put(name, (Long) retVal);
    }

    public long incrementAndGet(String name) {
        return (long) invoke(Command.INCREMENT_AND_GET, name, false);
    }

    public long decrementAndGet(String name) {
        return (long) invoke(Command.DECREMENT_AND_GET, name, false);
    }

    public long get(String name) {
        return (long) invoke(Command.GET, name, false);
    }

    private Object invoke(Command cmd, String name, boolean ignoreRetVal, long... values) {
        ByteArrayDataOutputStream out = new ByteArrayDataOutputStream(256);
        try {
            out.writeByte(cmd.ordinal());
            Bits.writeAsciiString(new AsciiString(name), out);
            for (long val : values) {
                Bits.writeLong(val, out);
            }

            byte[] rsp = raft.set(out.buffer(), 0, out.position());
            return ignoreRetVal ? null : Util.objectFromByteBuffer(rsp);
        }
        catch (IOException ex) {
            throw new RuntimeException("Serialization failure (cmd="
                    + cmd + ", name=" + name + ")", ex);
        }
        catch (Exception ex) {
            throw new RuntimeException("Raft set failure (cmd="
                    + cmd + ", name=" + name + ")", ex);
        }
    }

    // ===========================================
    //              Counter Native API
    // ===========================================

    public synchronized Long create0(String name, long initVal) {
        counters.putIfAbsent(name, initVal);
        return counters.get(name);
    }

    public synchronized Long get0(String name) {
        return counters.getOrDefault(name, 0L);
    }

    public synchronized Long add0(String name, long delta) {
        Long oldVal = counters.getOrDefault(name, 0L);
        return counters.put(name, oldVal + delta);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238

3.3 运行测试

我们分别以A、B、C为参数,启动三个JGroupsRaftTest服务。这样会自动在C:\Users\cdai\AppData\Local\Temp下生成A.log、B.log、C.log三个日志文件夹。

cdai@vm /cygdrive/c/Users/cdai/AppData/Local/Temp
$ tree A.log/ B.log/ C.log/
A.log/
|-- 000005.sst
|-- 000006.log
|-- CURRENT
|-- LOCK
|-- LOG
|-- LOG.old
`-- MANIFEST-000004
B.log/
|-- 000003.log
|-- CURRENT
|-- LOCK
|-- LOG
`-- MANIFEST-000002
C.log/
|-- 000003.log
|-- CURRENT
|-- LOCK
|-- LOG
`-- MANIFEST-000002

0 directories, 17 files
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

3.3.1 分布式一致性

首先A创建计数器,B“加一”,C“减一”。可以看到尽管我们是分别在A、B、C上执行这三个操作,但三个结点都先后(leader提交日志后通知follower)通过apply()方法收到消息,并在本地的计数器Map上同步执行操作,保证了数据的一致性。最后停掉A服务,可以看到B通过roleChanged()得到消息,提升为新的Leader,并与C一同继续提供服务。

A的控制台输出:

-------------------------------------------------------------------
GMS: address=A, cluster=ctr-cluster, physical address=2001:0:9d38:6abd:cbb:1f78:3f57:50f6:50100
-------------------------------------------------------------------

[0] Create [1] Increment [2] Decrement [3] Dump log [4] Snapshot [x] Exit
first-applied=0, last-applied=0, commit-index=0, log size=0b: 
roleChanged to: Candidate
roleChanged to: Leader
0
[14:16:00.744] Apply: cmd=[CREATE]

[0] Create [1] Increment [2] Decrement [3] Dump log [4] Snapshot [x] Exit
first-applied=0, last-applied=1, commit-index=1, log size=1b: 
[14:16:07.002] Apply: cmd=[INCREMENT_AND_GET]
[14:16:14.264] Apply: cmd=[DECREMENT_AND_GET]
3

index (term): command
---------------------
1 (29): CREATE(counter, 1)
2 (29): INCREMENT_AND_GET(counter)
3 (29): DECREMENT_AND_GET(counter)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

B的控制台输出:

-------------------------------------------------------------------
GMS: address=B, cluster=ctr-cluster, physical address=2001:0:9d38:6abd:cbb:1f78:3f57:50f6:50101
-------------------------------------------------------------------

[0] Create [1] Increment [2] Decrement [3] Dump log [4] Snapshot [x] Exit
first-applied=0, last-applied=0, commit-index=0, log size=0b: 
[14:16:01.300] Apply: cmd=[CREATE]
1
counter: 2

[0] Create [1] Increment [2] Decrement [3] Dump log [4] Snapshot [x] Exit
first-applied=0, last-applied=2, commit-index=1, log size=2b: 
[14:16:07.299] Apply: cmd=[INCREMENT_AND_GET]
[14:16:14.304] Apply: cmd=[DECREMENT_AND_GET]
roleChanged to: Candidate
roleChanged to: Leader
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

C的控制台输出:

-------------------------------------------------------------------
GMS: address=C, cluster=ctr-cluster, physical address=2001:0:9d38:6abd:cbb:1f78:3f57:50f6:55800
-------------------------------------------------------------------

[0] Create [1] Increment [2] Decrement [3] Dump log [4] Snapshot [x] Exit
first-applied=0, last-applied=0, commit-index=0, log size=0b: 
[14:16:01.300] Apply: cmd=[CREATE]
[14:16:07.299] Apply: cmd=[INCREMENT_AND_GET]
2
counter: 3

[0] Create [1] Increment [2] Decrement [3] Dump log [4] Snapshot [x] Exit
first-applied=0, last-applied=3, commit-index=2, log size=3b: 
[14:16:14.304] Apply: cmd=[DECREMENT_AND_GET]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

3.3.2 服务恢复

在只有B和C的集群中,我们执行了一次“加一”。当我们重新启动A服务时,它会自动执行这条日志,保持与B和C的一致。从日志的index能够看出,69是一个Term,也就是A为Leader时的“任期”,而70也就是B为Leader时。

A的控制台输出:

-------------------------------------------------------------------
GMS: address=A, cluster=ctr-cluster, physical address=2001:0:9d38:6abd:cbb:1f78:3f57:50f6:53237
-------------------------------------------------------------------

[0] Create [1] Increment [2] Decrement [3] Dump log [4] Snapshot [x] Exit
first-applied=0, last-applied=3, commit-index=3, log size=3b: 
[14:18:45.275] Apply: cmd=[INCREMENT_AND_GET]
[14:18:45.277] Apply: cmd=[GET]
3

index (term): command
---------------------
1 (69): CREATE(counter, 1)
2 (69): INCREMENT_AND_GET(counter)
3 (69): DECREMENT_AND_GET(counter)
4 (70): INCREMENT_AND_GET(counter)
5 (70): GET(counter)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
 
http://blog.csdn.net/dc_726/article/details/48832405
分享到:
评论

相关推荐

    行业分类-设备装置-基于分布式一致性协议实现的数据读写方法及装置.zip

    "基于分布式一致性协议实现的数据读写方法及装置.pdf"这个文件很可能详细阐述了如何将这些理论应用于具体的设备装置设计中,可能包括协议的具体实现步骤、性能优化策略以及在不同场景下的应用实例。为了深入理解这个...

    raftjava:分布式一致性算法raft的java实现

    《分布式一致性算法Raft在Java中的实现》 在分布式系统领域,一致性算法是核心问题之一,它确保了网络中各个节点的数据同步与一致性。Raft算法作为近年来提出的简单且易于理解的一致性算法,已经在业界得到了广泛...

    分布式系统原理与泛型课件

    此外,分布式系统还需要解决一致性、复制、调度、容错等问题,例如Paxos、Raft等一致性算法,以及CAP理论和BASE原则。 分布式计算模型如MapReduce和Spark则为大数据处理提供了高效的方法。MapReduce将大规模数据...

    分布式实例

    通常采用如Paxos、Raft等一致性算法来保证多个节点间的数据同步。 3. 故障检测与恢复:系统会持续监控各个实例的状态,一旦发现故障,可以自动将请求重定向至其他健康实例,同时可能启动新的实例以替换故障节点。 ...

    分布式计算讲义(MSRA)

    3. 分布式一致性:讲解分布式一致性模型,如强一致性、弱一致性、最终一致性等,以及实现这些一致性的算法,如Paxos、Raft等共识算法。 4. 分布式协调与调度:涉及Zookeeper、Chubby等分布式协调服务,以及任务调度...

    电子科技大学分布式系统2020年期末试卷回忆版

    3. **分布式一致性算法**:如Paxos、Raft和Gossip协议等,它们用于保证分布式环境中的数据一致性。 4. **负载均衡**:如何有效地分配任务以优化系统性能,避免热点问题。 5. **容错机制**:比如备份、故障恢复、心跳...

    分布式算法导论(原书第2版)

    4. **Raft算法**:作为Paxos的一种简化实现,Raft算法更容易理解和实现,同样可以解决分布式一致性问题。它通过选举领导者来管理分布式日志,确保了集群中所有节点的状态同步。 5. **分布式锁**:在分布式系统中,...

    分布式数据库弹性容灾策略.pptx

    - **一致性协议**:如Paxos或Raft等分布式一致性协议,保证数据复制的可靠性和有序性。 3. **故障检测与恢复:** - **心跳机制与健康检查**:通过心跳机制和健康检查实时监控节点状态。 - **自动故障转移**:当...

    算法设计与分析——分布式算法

    2. 分布式一致性:研究如何在分布式环境中保证数据的一致性,如Raft、Paxos等共识算法。这些算法在分布式数据库、分布式锁服务等场景中至关重要。 3. 分布式计算模型:了解各种分布式计算模型,如MapReduce模型,...

    分布式系统课件 南京大学

    4. **分布式算法**:如CAP定理(Consistency、Availability、Partition Tolerance)和BASE理论(Basically Available、Soft state、Eventually consistent),以及分布式一致性算法如Paxos、Raft等,这些都是构建...

    Raft是一种共识算法,java 程序例子

    Raft是一种易于理解且可证明安全的分布式一致性算法,它由Ousterhout等人在2013年提出,作为Paxos算法的替代方案。本篇文章将深入探讨Raft算法的核心原理,并通过一个Java程序实例来展示其实际应用。 首先,让我们...

    分布式java应用完整版

    5. 分布式一致性与事务:探讨CAP理论、Paxos算法、Raft协议,以及2PC、TCC等分布式事务解决方案。 6. 分布式缓存:讲述Redis、Memcached等缓存系统在分布式系统中的应用。 7. 分布式消息队列:讲解消息中间件如...

    分布式操作系统的课件

    3. 分布式一致性:探讨分布式一致性模型,如Paxos、Raft和Gossip协议,理解它们在确保数据一致性中的作用。 4. 分布式资源管理:讲解如何在分布式环境中管理计算和存储资源,包括调度策略、负载均衡和资源分配。 5...

    分布式系统设计 分布式系统设计

    5. **分布式一致性算法**:包括Paxos、Raft、ZAB等,它们用于保证分布式环境中节点间的数据一致性。例如,Paxos在解决多数派问题上非常有效,而Raft则更易于理解和实现。 6. **负载均衡**:通过将工作负载分配到多...

    深入浅出分布式技术原理 教程 下载 下载4.zip

    6. 分布式一致性算法:例如Paxos、Raft和Gossip协议,它们确保在分布式环境中数据的一致性和可靠性。这些算法对于分布式系统中的状态同步和决策制定至关重要。 7. 分布式服务治理:包括服务注册、发现、路由、熔断...

    吉林大学精品课件——分布式计算系统

    5. **分布式算法**:如选举算法(如Paxos、Raft)、分布式一致性算法(如Gossip协议、Chubby锁服务)等,都是分布式计算中的核心算法。 6. **容错与恢复**:通过备份、冗余和故障检测机制,分布式系统能应对硬件...

    [分布式算法导论(原书第2版)].(荷)Gerard Tel_分布式算法计算机网络_

    2. 分布式一致性:这是分布式算法中的核心问题,包括Paxos、Raft等一致性算法,它们保证了多个节点间数据的一致性和可靠性。 3. 分布式共识:如何在存在网络故障的情况下达成共识,例如选举主节点或决定全局状态,...

    分布式架构面试题汇总.zip

    4. **分布式一致性模型**:如Paxos、Raft等算法,用于在分布式环境中确保数据的一致性。Paxos较为复杂,而Raft相对简单,更适合实际应用。 5. **负载均衡**:在分布式系统中,通过负载均衡技术可以将工作负载分布到...

    分布式系统概念与设计 原书第5版

    2. 分布式计算模型:探索CAP理论,理解一致性模型(如强一致性和最终一致性),并研究Paxos、Raft等分布式一致性算法。 3. 分布式数据存储:讨论分布式数据库、键值存储、列族数据库和文档数据库等不同类型的存储...

Global site tag (gtag.js) - Google Analytics