`
jackyhongvip
  • 浏览: 159705 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Cassandra 源码解析 4: GMS 集群管理 .

 
阅读更多

集群管理要做哪些事情:

  • 节点的添加。通知大家,I join the group. 引起部分hash空间的重新分布,需要做数据传输(bootstrap);什么时候,新的节点开始响应request?所有group memeber视图一致时。部分节点更新了member视图,部分节点没有更新,如果这时读写数据会有什么结果?
  • 节点的删除(宕机)。原则上数据会有N个备份,一台宕机,则会要找寻下一台存放备份
  • 节点重启. 不能因为重启而导致rebalancing of the partition
  • 节点之间的heartbeat:检测节点的状态
  • 节点之间数据的一致性:拥有相同数据备份的节点,怎样保证数据的最终一致性
  • 节点视图的一致性:怎样保证节点拥有相同的member视图,比如member join 或者 leave,怎样用最少的网络代价来通知到所有节点.

原理

首先要解决的是membership的维护,没有准确的membership视图,其他一起都是扯淡

假如我们着手解决这个问题,简单的方法是

  • 每个node起来时,发送一个multicast组播或者一个UDP广播. 已有node接收并更新自己的member视图。
  • 已有node中address最小的一个(master node)发送member list给新的节点
  • 每个节点对所有其他节点维持heartbeat,如果有节点死亡,则从list移走

如果网络数目有几百个,每个节点都需要维持几百个heartbeat连接,定时发送几百个消息。每次heartbeat的消息数为o(n^2),网络开销巨大。

Gossip Protocols

cassandra中使用的gossip protocol

A gossip-based protocol propagates membership changes and maintains an eventually consistent view of membership. Each node contacts a peer chosen at random every second and two nodes efficiently reconcile their persisted membership change histories.

illinois大学的一名学生在Adavnced Operation Systems中做了一个PPT

淘宝网的开发人员若海的介绍: Gossip简介

源头是一篇引用非常高的论文:Epidemic algorithms for replicated database maintenance

论文PPT介绍,同样来自illinois大学:Epidemics

理论支撑:Epidemic model

Single infected site eventually infects entire population of susceptible sites
In database replication, infected site is the one with the latest update, susceptible sites are those needing the update

Gossip的一个要点就是,每个node只需要定时和集群中某个node(每次随机)同步一次member视图,就能保证集群中所有节点的member视图一致. 按照Epidemic(伊波拉)理论,有一个节点被感染(新节点同该节点交互同步),最终所有的节点都会被感染.

Anti-Entropy

在谈论Gossip时,另外一个重要名词就是Anti-Entropy,在dbthink翻译的一篇cassandra文章中,anti-entropy和gossip分别讨论(cassandra的gossipanti-entropy实现如此)。事实上论文介绍中,anti-entropy是gossip的一种实现形式.
名词解析: 在信息论中,熵是衡量信息量多少的量化指标,或者说是对某个随机变量的不确定性的衡量,变量的不确定性越大,熵也就越大,把它搞清楚所需要的信息量也就越大(百度百科). 我理解的逆熵就是将不确定性变为确定性的过程。
every site regularly choose another site at random and by exchaning database contetns with it resolves any differences between the two
本文中谈及anti-entropy,将其对应为编程中一个的方法(过程)。其伪代码如下
forsome (site s in Sites) //注意这里是some不是all,不用和所有的节点都进行同步,这是gossip核心所在
resolveDifferences(localDB, s);
根据resolveDifferences的不同,anti-entropy对应如下三种形式

pull

if (localItem.timeStamp < i.timeStamp)
localItem.value = i.value; //pull,将更新从其他节点拉倒本地 - 更新本地

push

if (localItem.timeStamp > i.timeStamp)
i.value = localItem.value; //push
,将本地更新推送到其他节点-更新远端

pull-push

if (localItem.timeStamp < i.timeStamp)
localItem.value = i.value; //pull
else if (localItem.timeStamp > i.timeStamp)
i.value = localItem.value; //push

表面上看,pull和push最终都能将更新从一个节点传播至所有节点,但二者从概率分析上传播速度有所不同,pull比push收敛更快(即更新更快的达到所有节点,详细分析见Epidemic algorithms论文)。假如是log(n)回合,则一个更新传播出去所需要的通信次数是log(n) * n,每个回合每个节点都通信一次。

Cassandra 实现

EndPointState

节点的状态信息EndPointState,每个(已知)节点一个EndPointState,保存在Gossiper.endPointStateMap_中

 

  1. EndPointState
  2. - updateTimestamp
  3. - lisAlive
  4. - isAGossiper
  5. - hasToken
  6. - HeartBeatState
  7. - generation
  8. - version
  9. - ApplicatonState
  10. - version MOVE_STATE
  11. NORMA,Token(Serial) //initServer
  12. BOOT,Token(Serial)//startBootstrap
  13. NORMAL,Token(Serial) //finishBootStrapping
  14. LEAVING,Token //startLeaving
  15. LEFT,left,Token //leaveRing
  16. LEFT,remove,Token //removeToken
  17. - version LOAD-INFORMATION
  18. diskUsage

 

很显然每个字段都有其用处,细节繁杂,不作深究,这里仅仅说明其中的几个字段

generation: 系统启动时,赋为当前时间(in seconds)(StorageService.initServer -> Gossiper.start)

version: 每次应用状态变化时,增1;每次heartbeat消息时,增1

ApplicationState 目前包含两个,一个是系统当下是处在normal还是boot当前(如果系统处在boot当中,不响应读写请求,可以参看TokenMetadata对所有Token的维护和使用;其他的left, leaving状态一般不会使用,比如强制某个节点退出ring时调用),另外一个是系统当下的负载信息(磁盘占用大小),load-balancer负载平衡用(另作分析)。同样,每个State有一个version。

 

generation + version(max) 构成同一节点两个状态的排序依据,version是heartbeat,MOVE_STATE和LOAD-INFORMATION中较大的一个version 。这两个字段可生成一个GossipDigest对象。在节点之间状态同步时,并不是将所有状态信息全部发送给对方比较,而是将每个EndPointState变为GossipDigest,节省传输数据量(?)。

member status syns 1 :(A -> B GossipDigestSynMessage)

如前介绍的Anti-Entropy,随机挑选一个(或者的)节点,将自身所有的GossipDigest(每个已知节点对应一个GossipDigest)发给对方。另外还根据一定的概率,随机挑选一个unreachable的endpoint,向其发送同步信息(如果响应,则live again)。最后,也随机挑选一个种子节点同步一下。GossipDigest被封装在GossipDigestSynMessage

 

  1. //Gossiper.GossipTimerTask
  2. Message message = makeGossipDigestSynMessage(gDigests);
  3. /* Gossip to some random live member */
  4. boolean gossipedToSeed = doGossipToLiveMember(message);
  5. doGossipToUnreachableMember(message);
  6. if (!gossipedToSeed || liveEndpoints_.size() < seeds_.size())
  7. doGossipToSeed(message);
  8. doStatusCheck();

 

member status syns 2 : (B -> A GossipDigestAckMessage)

对方节点收到sync消息后,会在GossipDigestSynVerbHandler中处理。resolveDifferences,根据GossipDigest找出哪些是要push(本地比remote新),哪些是要pull(remote比本地新),将push的EndPointState和pull的GossipDigest通过GossipDigestAckMessage消息回发给发起端.这里有一个trick是引入heartbeat的version(why)

If the max remote version is greater then we request the remote endpoint send us all the data for this endpoint with version greater than the max version number we have locally for this endpoint. If the max remote version is lesser, then we send all the data we have locally for this endpoint with version greater than the max remote version.

假如本地包含一个endpoint的version为(1,2,10), 10为heartbeat的version,在不停的增加. 另外一个remote point包含此endpoint的version为(1,2,20)。这时要求remote endpoint 发送version > 10的State,remote endpoint仅仅发送HeartbeatState,因为仅有heartbeat的version > 10。这样本地将endpoint的version更新为(1,2,20)

 

  1. //Gossiper.examineGossiper
  2. synchronized void examineGossiper(List<GossipDigest> gDigestList, List<GossipDigest> deltaGossipDigestList, Map<InetAddress, EndPointState> deltaEpStateMap)
  3. {
  4. for ( GossipDigest gDigest : gDigestList )
  5. {
  6. int remoteGeneration = gDigest.getGeneration();
  7. int maxRemoteVersion = gDigest.getMaxVersion();
  8. /* Get state associated with the end point in digest */
  9. EndPointState epStatePtr = endPointStateMap_.get(gDigest.getEndPoint());
  10. /*
  11. Here we need to fire a GossipDigestAckMessage. If we have some data associated with this endpoint locally
  12. then we follow the "if" path of the logic. If we have absolutely nothing for this endpoint we need to
  13. request all the data for this endpoint.
  14. */
  15. if ( epStatePtr != null )
  16. {
  17. int localGeneration = epStatePtr.getHeartBeatState().getGeneration();
  18. /* get the max version of all keys in the state associated with this endpoint */
  19. int maxLocalVersion = getMaxEndPointStateVersion(epStatePtr);
  20. if ( remoteGeneration == localGeneration && maxRemoteVersion == maxLocalVersion )
  21. continue;
  22. if ( remoteGeneration > localGeneration )
  23. {
  24. /* we request everything from the gossiper */
  25. requestAll(gDigest, deltaGossipDigestList, remoteGeneration);
  26. }
  27. if ( remoteGeneration < localGeneration )
  28. {
  29. /* send all data with generation = localgeneration and version > 0 */
  30. sendAll(gDigest, deltaEpStateMap, 0);
  31. }
  32. if ( remoteGeneration == localGeneration )
  33. {
  34. /*
  35. If the max remote version is greater then we request the remote endpoint send us all the data
  36. for this endpoint with version greater than the max version number we have locally for this
  37. endpoint.
  38. If the max remote version is lesser, then we send all the data we have locally for this endpoint
  39. with version greater than the max remote version.
  40. */
  41. if ( maxRemoteVersion > maxLocalVersion )
  42. {
  43. deltaGossipDigestList.add( new GossipDigest(gDigest.getEndPoint(), remoteGeneration, maxLocalVersion) );
  44. }
  45. if ( maxRemoteVersion < maxLocalVersion )
  46. {
  47. /* send all data with generation = localgeneration and version > maxRemoteVersion */
  48. sendAll(gDigest, deltaEpStateMap, maxRemoteVersion);
  49. }
  50. }
  51. }
  52. else
  53. {
  54. /* We are here since we have no data for this endpoint locally so request everything. */
  55. requestAll(gDigest, deltaGossipDigestList, remoteGeneration);
  56. }
  57. }
  58. }

 

member status syns 3 : (A -> B GossipDigestAck2Message)

发起方收到ack消息后,会在GossipDigestAckVerbHandler中处理。将push过来的状态更新到本地applyStateLocally,同时将要pull的状态封装在GossipDigestAck2Message中发给对方

member status syns 4

对方收到ack2消息后,会在GossipDigestAck2VerbHandler中处理,将pull过来的状态更新到本地applyStateLocally.

applyStateLocally将HeartBeatState和ApplicationState更新至本地,同时通知IEndPointStateChangeSubscriber.onChange(StorageService和StorageLoadBalancer,前者更新node的token,或者更新node的load)

 

  1. //Gossiper.applyApplicationStateLocally
  2. markAlive(ep, localEpStatePtr); //live
  3. applyHeartBeatStateLocally(ep, localEpStatePtr, remoteState);/* apply ApplicationState */
  4. applyApplicationStateLocally(ep, localEpStatePtr, remoteState);
  5. handleNewJoin(ep, remoteState);

 

FailureDetector

其基本原理是如果now - last_heart_time 远远大于(根据代码中的公式,为18倍左右)以往两次heartbeat之间的时间间隔的平均值,则宣布该节点dead,通知IFailureDetectionEventListener(Gossiper.convict)

 

  1. //FailureDetector.ArrivalWindow
  2. synchronized void add(double value)
  3. {
  4. double interArrivalTime;
  5. if ( tLast_ > 0L )
  6. {
  7. interArrivalTime = (value - tLast_);
  8. }
  9. else
  10. {
  11. interArrivalTime = Gossiper.intervalInMillis_ / 2;
  12. }
  13. tLast_ = value;
  14. arrivalIntervals_.add(interArrivalTime);
  15. }
  16. double p(double t)
  17. {
  18. double mean = mean();
  19. double exponent = (-1)*(t)/mean;
  20. return 1 - ( 1 - Math.pow(Math.E, exponent) );
  21. }
  22. double phi(long tnow)
  23. {
  24. int size = arrivalIntervals_.size();
  25. double log = 0d;
  26. if ( size > 0 )
  27. {
  28. double t = tnow - tLast_;
  29. double probability = p(t);
  30. log = (-1) * Math.log10( probability );
  31. }
  32. return log;
  33. }
  34. //FailureDetector.interpret
  35. if ( phi > phiConvictThreshold_ )
  36. {
  37. for ( IFailureDetectionEventListener listener : fdEvntListeners_ )
  38. {
  39. listener.convict(ep);
  40. }
  41. }

 

节点的启动/restart/宕机,member视图更新过程

启动时,和种子节点sync,种子节点将member视图push给新节点,同时将新节点的信息pull到本地。在下次gossip时候,种子节点和新节点都有可能将新节点的信息广播出去.

restart...

宕机...

分享到:
评论

相关推荐

    数据源管理 分布式NoSQL系统,Cassandra集群管理.docx

    编辑`/opt/cassandra3.11/conf/cassandra.yaml`文件,完成以下配置: - **集群名称**:`cluster_name: 'CasCluster'` - **数据目录**:`data_file_directories: - /data/cassandra/data` - **日志目录**:`commit...

    cassandra源码分析

    1. Gossip模块:查看`org.apache.cassandra.gms`包下的类,如`Gossiper`,了解节点间状态交换的实现。 2. 数据存储:深入`org.apache.cassandra.db`包,研究`Memtable`、`SSTableWriter`和`ColumnFamilyStore`等类...

    分布式存储系统:Cassandra:Cassandra的集群管理与运维实践.docx

    分布式存储系统:Cassandra:Cassandra的集群管理与运维实践.docx

    cassandra集群配置

    展开 Cassandra 发行包,并进入 conf 目录,修改 cassandra.yaml 文件,该文件为主要配置文件。在 cassandra.yaml 文件中,需要设置数据文件目录、提交日志目录、保存缓存目录等。 * 数据文件目录(data_file_...

    Cassandra(apache-cassandra-3.11.11-bin.tar.gz)

    Cassandra(apache-cassandra-3.11.11-bin.tar.gz)是一套开源分布式NoSQL数据库系统。它最初由Facebook开发,用于储存收件箱等简单格式数据,集GoogleBigTable的数据模型与Amazon Dynamo的完全分布式的架构于一身...

    sclo-cassandra3-antlr3-C-3.4-3.5.2.14.el7.x86_64.rpm

    官方离线安装包,亲测可用

    Cassandra集群应用.docx

    ### Cassandra集群应用知识点详解 #### 一、Cassandra简介 ##### 1、基础描述 Cassandra是一款开源的分布式NoSQL数据库系统,最初由Facebook开发,主要用于处理简单的数据格式,如邮箱信息等。随着时间的发展,因...

    分布式存储系统:Cassandra:Cassandra的高级特性:二级索引与轻量级事务.docx

    分布式存储系统:Cassandra:Cassandra的高级特性:二级索引与轻量级事务.docx

    数据库查询语言:Cassandra Query Language.zip

    史上最全编程语言全套教程,共99门编程语言,包括: 函数式编程语言 壳编程语言 常见编程语言 并行编程语言 数据分析编程语言 数据库查询语言 系统编程语言 脚本编程语言 逻辑编程语言 面向对象编程语言 ...

    SpringBoot-Kafka-Cassandra-master.zip

    这个Spring Boot应用程序集成了Kafka和Cassandra。 a) 您可以通过post请求将json数据传递给API,将数据插入Cassandra。它处理这些消息并插入到Cassandra DB中。 Spring Boot版本:1.4.2 JDK版本:1.8 Cassandra/...

    apache-cassandra-0.6.1-bin.tar.gz

    apache-cassandra-0.6.1-bin.tar.gzapache-cassandra-0.6.1-bin.tar.gzapache-cassandra-0.6.1-bin.tar.gzapache-cassandra-0.6.1-bin.tar.gzapache-cassandra-0.6.1-bin.tar.gzapache-cassandra-0.6.1-bin.tar.gz

    cassandra-3.11.3下载

    在3.11.3版本中,Cassandra增强了数据复制的策略,能够更好地管理数据在集群中的分布,确保数据一致性的同时提高读写性能。 其次,Cassandra支持列式存储,这与传统的行式存储有所不同。列族数据库在处理大量稀疏...

    Python库 | aws-cdk.aws-cassandra-1.104.0.tar.gz

    资源分类:Python库 所属语言:Python 资源全名:aws-cdk.aws-cassandra-1.104.0.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059

    apache-cassandra-3.11.14-bin.tar.gz

    为了部署和使用这个压缩包,你需要解压后配置 `conf/cassandra.yaml` 文件,设置数据中心、节点IP、端口等信息,然后启动 `bin/cassandra` 命令。你可以通过 `cqlsh` 工具来交互式地操作数据库,或者使用各种客户端...

    藏经阁-Cassandra总体介绍.pdf

    Cassandra是一款开源的、分布式的NoSQL数据库管理系统,由Facebook开发,后来捐赠给Apache基金会。Cassandra的设计目标是处理大量数据,提供高可用性和高扩展性,同时也具有良好的性能。 Cassandra的特点: 1. ...

    netty-all-4.1.29.Final-sources.jar 最新版netty源码

    它活跃和成长于用户社区,像大型公司 Facebook 和 Instagram 以及流行 开源项目如 Infinispan, HornetQ, Vert.x, Apache Cassandra 和 Elasticsearch 等,都利用其强大的对于网络抽象的核心代码。 Netty is a NIO ...

    SpringBoot 源码解析——源码模块功能分析.docx

    SpringBoot 源码解析——源码模块功能分析 SpringBoot 源码解析是指对 SpringBoot 框架的源码进行深入分析和研究,以了解框架的内部机制和工作原理。源码模块是 SpringBoot 框架的核心组成部分,它们之间的交互和...

    apache-cassandra-2.2.14-bin.tar.gz

    然后,你需要根据你的环境修改 `conf/cassandra.yaml` 配置文件。这可能包括设置数据存储路径、定义种子节点、设置网络端口等。 4. **初始化**:在首次启动前,你需要创建键空间(keyspace)和表(table),这可以...

Global site tag (gtag.js) - Google Analytics