- 浏览: 160848 次
- 性别:
- 来自: 北京
文章分类
最新评论
-
西巴拉古呀那:
Linux多线程并发服务器编程(线程池,FTP服务器)网盘地址 ...
多线程服务器的适用场合 -
somefuture:
第四题怎麼用位图法。写了半天没出来,用了下面的,速度很快pri ...
位图法应用 -
寂寞秋江:
系统,全面,条理清晰,由浅入深,简直是集大成之作。 特别是最后 ...
单个进程最大线程数 -
darkjune:
有点意思, 不错
单个进程最大线程数 -
kidding87:
谢谢啦,收藏着
google使用一点敲门分享
集群管理要做哪些事情:
- 节点的添加。通知大家,I join the group. 引起部分hash空间的重新分布,需要做数据传输(bootstrap);什么时候,新的节点开始响应request?所有group memeber视图一致时。部分节点更新了member视图,部分节点没有更新,如果这时读写数据会有什么结果?
- 节点的删除(宕机)。原则上数据会有N个备份,一台宕机,则会要找寻下一台存放备份
- 节点重启. 不能因为重启而导致rebalancing of the partition
- 节点之间的heartbeat:检测节点的状态
- 节点之间数据的一致性:拥有相同数据备份的节点,怎样保证数据的最终一致性
- 节点视图的一致性:怎样保证节点拥有相同的member视图,比如member join 或者 leave,怎样用最少的网络代价来通知到所有节点.
原理
首先要解决的是membership的维护,没有准确的membership视图,其他一起都是扯淡
假如我们着手解决这个问题,简单的方法是
- 每个node起来时,发送一个multicast组播或者一个UDP广播. 已有node接收并更新自己的member视图。
- 已有node中address最小的一个(master node)发送member list给新的节点
- 每个节点对所有其他节点维持heartbeat,如果有节点死亡,则从list移走
如果网络数目有几百个,每个节点都需要维持几百个heartbeat连接,定时发送几百个消息。每次heartbeat的消息数为o(n^2),网络开销巨大。
Gossip Protocols
cassandra中使用的gossip protocol
A gossip-based protocol propagates membership changes and maintains an eventually consistent view of membership. Each node contacts a peer chosen at random every second and two nodes efficiently reconcile their persisted membership change histories.
illinois大学的一名学生在Adavnced Operation Systems中做了一个PPT
淘宝网的开发人员若海的介绍: Gossip简介
源头是一篇引用非常高的论文:Epidemic algorithms for replicated database maintenance
论文PPT介绍,同样来自illinois大学:Epidemics
理论支撑:Epidemic model
Single infected site eventually infects entire population of susceptible sites
In database replication, infected site is the one with the latest update, susceptible sites are those needing the update
Gossip的一个要点就是,每个node只需要定时和集群中某个node(每次随机)同步一次member视图,就能保证集群中所有节点的member视图一致. 按照Epidemic(伊波拉)理论,有一个节点被感染(新节点同该节点交互同步),最终所有的节点都会被感染.
Anti-Entropy
every site regularly choose another site at random and by exchaning database contetns with it resolves any differences between the two
resolveDifferences(localDB, s);
pull
if (localItem.timeStamp < i.timeStamp)
localItem.value = i.value; //pull,将更新从其他节点拉倒本地 - 更新本地
push
if (localItem.timeStamp > i.timeStamp)
i.value = localItem.value; //push,将本地更新推送到其他节点-更新远端
pull-push
if (localItem.timeStamp < i.timeStamp)
localItem.value = i.value; //pull
else if (localItem.timeStamp > i.timeStamp)
i.value = localItem.value; //push
表面上看,pull和push最终都能将更新从一个节点传播至所有节点,但二者从概率分析上传播速度有所不同,pull比push收敛更快(即更新更快的达到所有节点,详细分析见Epidemic algorithms论文)。假如是log(n)回合,则一个更新传播出去所需要的通信次数是log(n) * n,每个回合每个节点都通信一次。
Cassandra 实现
EndPointState
节点的状态信息EndPointState,每个(已知)节点一个EndPointState,保存在Gossiper.endPointStateMap_中
- EndPointState
- - updateTimestamp
- - lisAlive
- - isAGossiper
- - hasToken
- - HeartBeatState
- - generation
- - version
- - ApplicatonState
- - version MOVE_STATE
- NORMA,Token(Serial) //initServer
- BOOT,Token(Serial)//startBootstrap
- NORMAL,Token(Serial) //finishBootStrapping
- LEAVING,Token //startLeaving
- LEFT,left,Token //leaveRing
- LEFT,remove,Token //removeToken
- - version LOAD-INFORMATION
- diskUsage
很显然每个字段都有其用处,细节繁杂,不作深究,这里仅仅说明其中的几个字段
generation: 系统启动时,赋为当前时间(in seconds)(StorageService.initServer -> Gossiper.start)
version: 每次应用状态变化时,增1;每次heartbeat消息时,增1
ApplicationState 目前包含两个,一个是系统当下是处在normal还是boot当前(如果系统处在boot当中,不响应读写请求,可以参看TokenMetadata对所有Token的维护和使用;其他的left, leaving状态一般不会使用,比如强制某个节点退出ring时调用),另外一个是系统当下的负载信息(磁盘占用大小),load-balancer负载平衡用(另作分析)。同样,每个State有一个version。
generation + version(max) 构成同一节点两个状态的排序依据,version是heartbeat,MOVE_STATE和LOAD-INFORMATION中较大的一个version 。这两个字段可生成一个GossipDigest对象。在节点之间状态同步时,并不是将所有状态信息全部发送给对方比较,而是将每个EndPointState变为GossipDigest,节省传输数据量(?)。
member status syns 1 :(A -> B GossipDigestSynMessage)
如前介绍的Anti-Entropy,随机挑选一个(或者的)节点,将自身所有的GossipDigest(每个已知节点对应一个GossipDigest)发给对方。另外还根据一定的概率,随机挑选一个unreachable的endpoint,向其发送同步信息(如果响应,则live again)。最后,也随机挑选一个种子节点同步一下。GossipDigest被封装在GossipDigestSynMessage
- //Gossiper.GossipTimerTask
- Message message = makeGossipDigestSynMessage(gDigests);
- /* Gossip to some random live member */
- boolean gossipedToSeed = doGossipToLiveMember(message);
- doGossipToUnreachableMember(message);
- if (!gossipedToSeed || liveEndpoints_.size() < seeds_.size())
- doGossipToSeed(message);
- doStatusCheck();
member status syns 2 : (B -> A GossipDigestAckMessage)
对方节点收到sync消息后,会在GossipDigestSynVerbHandler中处理。resolveDifferences,根据GossipDigest找出哪些是要push(本地比remote新),哪些是要pull(remote比本地新),将push的EndPointState和pull的GossipDigest通过GossipDigestAckMessage消息回发给发起端.这里有一个trick是引入heartbeat的version(why)
If the max remote version is greater then we request the remote endpoint send us all the data for this endpoint with version greater than the max version number we have locally for this endpoint. If the max remote version is lesser, then we send all the data we have locally for this endpoint with version greater than the max remote version.
假如本地包含一个endpoint的version为(1,2,10), 10为heartbeat的version,在不停的增加. 另外一个remote point包含此endpoint的version为(1,2,20)。这时要求remote endpoint 发送version > 10的State,remote endpoint仅仅发送HeartbeatState,因为仅有heartbeat的version > 10。这样本地将endpoint的version更新为(1,2,20)
- //Gossiper.examineGossiper
- synchronized void examineGossiper(List<GossipDigest> gDigestList, List<GossipDigest> deltaGossipDigestList, Map<InetAddress, EndPointState> deltaEpStateMap)
- {
- for ( GossipDigest gDigest : gDigestList )
- {
- int remoteGeneration = gDigest.getGeneration();
- int maxRemoteVersion = gDigest.getMaxVersion();
- /* Get state associated with the end point in digest */
- EndPointState epStatePtr = endPointStateMap_.get(gDigest.getEndPoint());
- /*
- Here we need to fire a GossipDigestAckMessage. If we have some data associated with this endpoint locally
- then we follow the "if" path of the logic. If we have absolutely nothing for this endpoint we need to
- request all the data for this endpoint.
- */
- if ( epStatePtr != null )
- {
- int localGeneration = epStatePtr.getHeartBeatState().getGeneration();
- /* get the max version of all keys in the state associated with this endpoint */
- int maxLocalVersion = getMaxEndPointStateVersion(epStatePtr);
- if ( remoteGeneration == localGeneration && maxRemoteVersion == maxLocalVersion )
- continue;
- if ( remoteGeneration > localGeneration )
- {
- /* we request everything from the gossiper */
- requestAll(gDigest, deltaGossipDigestList, remoteGeneration);
- }
- if ( remoteGeneration < localGeneration )
- {
- /* send all data with generation = localgeneration and version > 0 */
- sendAll(gDigest, deltaEpStateMap, 0);
- }
- if ( remoteGeneration == localGeneration )
- {
- /*
- If the max remote version is greater then we request the remote endpoint send us all the data
- for this endpoint with version greater than the max version number we have locally for this
- endpoint.
- If the max remote version is lesser, then we send all the data we have locally for this endpoint
- with version greater than the max remote version.
- */
- if ( maxRemoteVersion > maxLocalVersion )
- {
- deltaGossipDigestList.add( new GossipDigest(gDigest.getEndPoint(), remoteGeneration, maxLocalVersion) );
- }
- if ( maxRemoteVersion < maxLocalVersion )
- {
- /* send all data with generation = localgeneration and version > maxRemoteVersion */
- sendAll(gDigest, deltaEpStateMap, maxRemoteVersion);
- }
- }
- }
- else
- {
- /* We are here since we have no data for this endpoint locally so request everything. */
- requestAll(gDigest, deltaGossipDigestList, remoteGeneration);
- }
- }
- }
member status syns 3 : (A -> B GossipDigestAck2Message)
发起方收到ack消息后,会在GossipDigestAckVerbHandler中处理。将push过来的状态更新到本地applyStateLocally,同时将要pull的状态封装在GossipDigestAck2Message中发给对方
member status syns 4
对方收到ack2消息后,会在GossipDigestAck2VerbHandler中处理,将pull过来的状态更新到本地applyStateLocally.
applyStateLocally将HeartBeatState和ApplicationState更新至本地,同时通知IEndPointStateChangeSubscriber.onChange(StorageService和StorageLoadBalancer,前者更新node的token,或者更新node的load)
- //Gossiper.applyApplicationStateLocally
- markAlive(ep, localEpStatePtr); //live
- applyHeartBeatStateLocally(ep, localEpStatePtr, remoteState);/* apply ApplicationState */
- applyApplicationStateLocally(ep, localEpStatePtr, remoteState);
- handleNewJoin(ep, remoteState);
FailureDetector
其基本原理是如果now - last_heart_time 远远大于(根据代码中的公式,为18倍左右)以往两次heartbeat之间的时间间隔的平均值,则宣布该节点dead,通知IFailureDetectionEventListener(Gossiper.convict)
- //FailureDetector.ArrivalWindow
- synchronized void add(double value)
- {
- double interArrivalTime;
- if ( tLast_ > 0L )
- {
- interArrivalTime = (value - tLast_);
- }
- else
- {
- interArrivalTime = Gossiper.intervalInMillis_ / 2;
- }
- tLast_ = value;
- arrivalIntervals_.add(interArrivalTime);
- }
- double p(double t)
- {
- double mean = mean();
- double exponent = (-1)*(t)/mean;
- return 1 - ( 1 - Math.pow(Math.E, exponent) );
- }
- double phi(long tnow)
- {
- int size = arrivalIntervals_.size();
- double log = 0d;
- if ( size > 0 )
- {
- double t = tnow - tLast_;
- double probability = p(t);
- log = (-1) * Math.log10( probability );
- }
- return log;
- }
- //FailureDetector.interpret
- if ( phi > phiConvictThreshold_ )
- {
- for ( IFailureDetectionEventListener listener : fdEvntListeners_ )
- {
- listener.convict(ep);
- }
- }
节点的启动/restart/宕机,member视图更新过程
启动时,和种子节点sync,种子节点将member视图push给新节点,同时将新节点的信息pull到本地。在下次gossip时候,种子节点和新节点都有可能将新节点的信息广播出去.
restart...
宕机...
发表评论
-
java io
2014-03-12 09:35 664Java 的 I/O 类库的基本架构 I/O 问题是任何编 ... -
布隆过滤器
2014-03-11 11:14 813布隆过滤器 (Bloom Filter) ... -
cassandra 读过程
2012-06-06 16:00 0Cassandra内部机制 : 读操作 Cass ... -
cassandra 写过程
2012-06-06 15:47 0Cassandra内核介绍--写操作 Cassa ... -
Merkle Tree
2014-03-11 11:14 1010A Merkle tree is a has ... -
LVS中的负载均衡技术浅析
2012-10-08 21:47 1103通过NAT实现虚拟服务器 ... -
HDFS的数据通信机制
2012-03-05 14:58 0简单而言,HDFS分为了三 ... -
ZooKeeper典型使用场景一览
2012-02-22 17:28 974ZooKeeper是一个高可用 ... -
zookeeper代码解析
2012-02-22 17:25 1906摘自:http://rdc.taobao.com/team/j ... -
分布式一致性Paxos算法学习笔记(二):算法详解
2012-02-22 17:17 1176声明:Paxos算法学习笔 ... -
分布式一致性Paxos算法学习笔记(一):paxos大杂烩
2012-02-22 17:16 1134声明:Paxos算法学习笔记系列摘自:http://www ... -
分布式架构模型
2012-02-17 16:31 1626摘自 : http://www.kafka01 ...
相关推荐
编辑`/opt/cassandra3.11/conf/cassandra.yaml`文件,完成以下配置: - **集群名称**:`cluster_name: 'CasCluster'` - **数据目录**:`data_file_directories: - /data/cassandra/data` - **日志目录**:`commit...
1. Gossip模块:查看`org.apache.cassandra.gms`包下的类,如`Gossiper`,了解节点间状态交换的实现。 2. 数据存储:深入`org.apache.cassandra.db`包,研究`Memtable`、`SSTableWriter`和`ColumnFamilyStore`等类...
Redis 集群管理工具 Relumin(Redis 集群管理员)Relumin 是什么?Relumin 是一个 REdis cLUster adMIN 工具。主要特点是...可视化 Redis 集群状态操作集群添加节点、重新分片、删除节点、复制、故障转移...等以图表...
展开 Cassandra 发行包,并进入 conf 目录,修改 cassandra.yaml 文件,该文件为主要配置文件。在 cassandra.yaml 文件中,需要设置数据文件目录、提交日志目录、保存缓存目录等。 * 数据文件目录(data_file_...
Cassandra(apache-cassandra-3.11.11-bin.tar.gz)是一套开源分布式NoSQL数据库系统。它最初由Facebook开发,用于储存收件箱等简单格式数据,集GoogleBigTable的数据模型与Amazon Dynamo的完全分布式的架构于一身...
官方离线安装包,亲测可用
### Cassandra集群应用知识点详解 #### 一、Cassandra简介 ##### 1、基础描述 Cassandra是一款开源的分布式NoSQL数据库系统,最初由Facebook开发,主要用于处理简单的数据格式,如邮箱信息等。随着时间的发展,因...
Cassandra的应用场景包括社交媒体、物流管理、实时分析、物联网等领域。 Twitter和Facebook等知名企业已经在使用Cassandra。 Cassandra的主要功能包括: 1.分布式基于column的结构化高伸展性 2.模式灵活性 3.真正...
史上最全编程语言全套教程,共99门编程语言,包括: 函数式编程语言 壳编程语言 常见编程语言 并行编程语言 数据分析编程语言 数据库查询语言 系统编程语言 脚本编程语言 逻辑编程语言 面向对象编程语言 ...
这个Spring Boot应用程序集成了Kafka和Cassandra。 a) 您可以通过post请求将json数据传递给API,将数据插入Cassandra。它处理这些消息并插入到Cassandra DB中。 Spring Boot版本:1.4.2 JDK版本:1.8 Cassandra/...
apache-cassandra-0.6.1-bin.tar.gzapache-cassandra-0.6.1-bin.tar.gzapache-cassandra-0.6.1-bin.tar.gzapache-cassandra-0.6.1-bin.tar.gzapache-cassandra-0.6.1-bin.tar.gzapache-cassandra-0.6.1-bin.tar.gz
在3.11.3版本中,Cassandra增强了数据复制的策略,能够更好地管理数据在集群中的分布,确保数据一致性的同时提高读写性能。 其次,Cassandra支持列式存储,这与传统的行式存储有所不同。列族数据库在处理大量稀疏...
资源分类:Python库 所属语言:Python 资源全名:aws-cdk.aws-cassandra-1.104.0.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059
为了部署和使用这个压缩包,你需要解压后配置 `conf/cassandra.yaml` 文件,设置数据中心、节点IP、端口等信息,然后启动 `bin/cassandra` 命令。你可以通过 `cqlsh` 工具来交互式地操作数据库,或者使用各种客户端...
Cassandra是一款开源的、分布式的NoSQL数据库管理系统,由Facebook开发,后来捐赠给Apache基金会。Cassandra的设计目标是处理大量数据,提供高可用性和高扩展性,同时也具有良好的性能。 Cassandra的特点: 1. ...
SpringBoot 源码解析——源码模块功能分析 SpringBoot 源码解析是指对 SpringBoot 框架的源码进行深入分析和研究,以了解框架的内部机制和工作原理。源码模块是 SpringBoot 框架的核心组成部分,它们之间的交互和...
然后,你需要根据你的环境修改 `conf/cassandra.yaml` 配置文件。这可能包括设置数据存储路径、定义种子节点、设置网络端口等。 4. **初始化**:在首次启动前,你需要创建键空间(keyspace)和表(table),这可以...
1.CAP定理理与Cassandra 1.1 Cassandra优势 2.Cassandra ⼀一致性实现 2.1 CAS 2.2 Quorum读写 2.3 不不⼀一致产⽣生原因 2.4 Hinted handoff 2.5 Read repair 2.6 Manual repair 3.Cassandra应⽤用场景 ...
4. **分区与复制**:Cassandra使用一致性哈希进行数据分区,将数据分布在集群的不同节点上。每个节点负责一部分的键空间,并且数据会被复制到多个节点以提高容错性。 5. **Gossip协议**:Cassandra使用Gossip协议来...