`
qiemengdao
  • 浏览: 274913 次
  • 性别: Icon_minigender_1
  • 来自: 武汉
社区版块
存档分类
最新评论

Cassandra中失效检测原理详解

阅读更多

 

Cassandra中失效检测原理

一、传统失效检测及其不足

传统失效检测方法

在分布式系统中经常使用心跳(Heartbeat)来检测Server的健康状况,但从理论上来说,心跳无法真正检测对方是否crash,主要困难在于无 法真正区别对方是宕机还是“慢”。传统的检测方法是设定一个超时时间T,只要在T之内没有接收到对方的心跳包便认为对方宕机,方法简单粗暴,但使用广泛。

传统错误检测存在的缺陷

如上所述,在传统方式下,目标主机会每间隔t秒发起心跳,而接收方采用超时时间T(t<T)来判断目标是否宕机,接收方首先要非常清楚目标的心跳规 律(周期为t的间隔)才能正确设定一个超时时间T,而T的选择依赖当前网络状况、目标主机的处理能力等很多不确定因素,因此在实际中往往会通过测试或估计 的方式为T赋一个上限值。上限值设置过大,会导致判断“迟缓”,但会增大判断的正确性;过小,会提高判断效率,但会增加误判的可能性。但下面几种场景不能 使用传统检测方法:

1. Gossip通信

但在实际应用中,比如基于Gossip通信应用中,因为随机通信,两个Server之间并不存在有规律的心跳,因此很难找到一个适合的超时时间T,除非把T设置的非常大,但这样检测过程就会“迟缓”的无法忍受。

2. 网络负载动态变化

还有一种情况是,随着网路负载的加大,Server心跳的接收时间可能会大于上限值T;但当网络压力减少时,心跳接收时间又会小于T,如果用一成不变的T来反映心跳状况,则会造成判断”迟缓“或误判。

3. 心跳检测与结果的分离

并不是每个应用都只需要知道一个目标主机宕机与否的结果(true/false),即有很多应用需要自己解释心跳结果从而采取不同的处理动 作。比如,如果目标主机3s内没有心跳,应用A解读为宕机并重试;而应用B则解读为目标”不活跃“,需要把任务委派到其他Server。也就是说,目标主机是否“宕机”应该由业务逻辑决定的,而不是简单的通过一个超时时间T决定,这就需要把心跳检测过程与对结果的解释相分离,从而为应用提供更好的灵活性。

Gossiper中采用的 Φ 失效检测方法

由失效检测的经典论文The Phi accrual failure detector http://vsedach.googlepages.com/HDY04.pdf)中的证明,分布式环境中,对主机的心跳统计,根据以往心跳间隔的经验值,可以由下面的方法判断主机是否宕机。

1. 给定一个阀值 Φ

2. 在一定时间内,记录各个心跳间隔时间

3. 对心跳的间隔值求指数分布(Exponential distribution)概率:

P = E ^ (-1 * (now - lastTimeStamp) / mean) E是对数2.71828...mean为此前的间隔时间平均值)

其表示,自上次统计以来,心跳到达时间将超过 now - lastTimeStamp 的概率

4. 计算 φ = - log10 P

5. φ > Φ 时,就可以认为主机已经宕机了。

当然这可能会存在误判,误判的可能性如下:

Φ = 1, 1%

Φ = 2, 0.1%

Φ = 3, 0.01%

......

由此可见,当Φ = 8时,误判率已经很小了。cassandra中默认采用Φ = 8

下面有一个关于Phi失效检测算法的java实现。Cassandra中实现与此类似。

/**

java demo for phi failure detector

*/

import java.util.ArrayDeque;

import java.util.Iterator;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

 

public class PhiAccrualFailureDetector {

private static final int sampleWindowSize = 1000;

private static int phiSuspectThreshold = 8;

private SamplingWindow simpleingWindow = new SamplingWindow(sampleWindowSize);

 

public PhiAccrualFailureDetector() {

}

 

public void addSample() {

simpleingWindow.add(System.currentTimeMillis());

}

 

public void addSample(double sample) {

simpleingWindow.add(sample);

}

 

public void interpret() {

double phi = simpleingWindow.phi(System.currentTimeMillis());

System.out.println("PHI = " + phi);

if (phi > phiSuspectThreshold) {

System.out.println("We are assuming the moniored machine is down!");

} else {

System.out.println("We are assuming the moniored machine is still running!");

}

}

 

/**

* @param args

* the command line arguments

*/

public static void main(String[] args) {

PhiAccrualFailureDetector pafd = new PhiAccrualFailureDetector();

// first try with phi < phiSuspectThreshold

for (int i = 0; i < 10; i++) {

pafd.addSample();

try {

Thread.sleep(10L);

} catch (InterruptedException ex) {

// no op

}

}

try {

Thread.sleep(500L);

} catch (InterruptedException ex) {

// no op

}

System.out.println(pafd.simpleingWindow.toString());

pafd.interpret();

// second try result phi > phiSuspectThreshold

for (int i = 0; i < 10; i++) {

pafd.addSample();

try {

Thread.sleep(10L);

} catch (InterruptedException ex) {

// no op

}

}

try {

Thread.sleep(1500L);

} catch (InterruptedException ex) {

// no op

}

System.out.println(pafd.simpleingWindow.toString());

pafd.interpret();

}

 

static class SamplingWindow {

private final Lock lock = new ReentrantLock();

private double lastTimeStamp = 0L;

private StatisticDeque arrivalIntervals;

 

SamplingWindow(int size) {

arrivalIntervals = new StatisticDeque(size);

}

 

void add(double value) {

lock.lock();

try {

double interval;

if (lastTimeStamp > 0L) {

interval = (value - lastTimeStamp);

} else {

interval = 1000 / 2;

}

lastTimeStamp = value;

arrivalIntervals.add(interval);

} finally {

lock.unlock();

}

}

 

double sum() {

lock.lock();

try {

return arrivalIntervals.sum();

} finally {

lock.unlock();

}

}

 

double sumOfDeviations() {

lock.lock();

try {

return arrivalIntervals.sumOfDeviations();

} finally {

lock.unlock();

}

}

 

double mean() {

lock.lock();

try {

return arrivalIntervals.mean();

} finally {

lock.unlock();

}

}

 

double variance() {

lock.lock();

try {

return arrivalIntervals.variance();

} finally {

lock.unlock();

}

}

 

double stdev() {

lock.lock();

try {

return arrivalIntervals.stdev();

} finally {

lock.unlock();

}

}

 

void clear() {

lock.lock();

try {

arrivalIntervals.clear();

} finally {

lock.unlock();

}

}

 

/**

*

* p = E ^ (-1 * (tnow - lastTimeStamp) / mean)

*/

double p(double t) {

double mean = mean();

double exponent = (-1) * (t) / mean;

return Math.pow(Math.E, exponent);

}

 

double phi(long tnow) {

int size = arrivalIntervals.size();

double log = 0d;

if (size > 0) {

double t = tnow - lastTimeStamp;

double probability = p(t);

log = (-1) * Math.log10(probability);

}

return log;

}

 

@Override

public String toString() {

StringBuilder s = new StringBuilder();

for (Iterator<Double> it = arrivalIntervals.iterator(); it.hasNext();) {

s.append(it.next()).append(" ");

}

return s.toString();

}

}

 

static class StatisticDeque implements Iterable<Double> {

private final int size;

protected final ArrayDeque<Double> queue;

 

public StatisticDeque(int size) {

this.size = size;

queue = new ArrayDeque<Double>(size);

}

 

public Iterator<Double> iterator() {

return queue.iterator();

}

 

public int size() {

return queue.size();

}

 

public void clear() {

queue.clear();

}

 

public void add(double o) {

if (size == queue.size()) {

queue.remove();

}

queue.add(o);

}

 

public double sum() {

double sum = 0D;

for (Double interval : this) {

sum += interval;

}

return sum;

}

 

public double sumOfDeviations() {

double sumOfDeviations = 0D;

double mean = mean();

for (Double interval : this) {

double d = interval - mean;

sumOfDeviations += d * d;

}

return sumOfDeviations;

}

 

public double mean() {

return sum() / size();

}

 

public double variance() {

return sumOfDeviations() / size();

}

 

public double stdev() {

return Math.sqrt(variance());

}

}

}

 

参考资料:《分布式系统实现》

分享到:
评论

相关推荐

    cassandra详解

    Cassandra详解 Cassandra是一款高度可扩展、最终一致性的分布式结构化键值存储系统,由Facebook的Avinash Lakshman(Dynamo的贡献者)和Prashant Malik共同创建。最初用于支持Facebook的收件箱搜索功能,后于2008年...

    Cassandra技术详解 操作与测试报告

    - **Keyspace**: Cassandra中的逻辑容器,用于组织Column Families。Keyspace中的数据可以分布在多个物理节点上,并且支持数据复制策略。 #### 四、Cassandra的关键功能 1. **分布式写入操作** - Cassandra能够...

    Cassandra DataStax原理及安装

    总而言之,Cassandra DataStax原理及安装的知识点涵盖了Cassandra的开源和DataStax的商业版DSE的介绍,它们的安装方法,以及如何使用OpsCenter、DevCenter等工具来管理和使用Cassandra集群。文档通过详细的步骤和...

    cassandra权威指南(中文)

    - **数据修复**:阐述Cassandra中数据修复的原理和操作步骤,以保证数据的完整性。 #### 六、Cassandra的查询语言CQL - **CQL简介**:介绍Cassandra查询语言(CQL)的基本语法,以及它与SQL的区别。 - **数据操作*...

    分布式数据库Cassandra 一致性详解.zip

    1.CAP定理理与Cassandra 1.1 Cassandra优势 2.Cassandra ⼀一致性实现 2.1 CAS 2.2 Quorum读写 2.3 不不⼀一致产⽣生原因 2.4 Hinted handoff 2.5 Read repair 2.6 Manual repair 3.Cassandra应⽤用场景 ...

    Cassandra 分布式数据库详解

    在深入了解 Cassandra 之前,理解其配置文件 `storage-config.xml` 中的参数至关重要,因为这些参数直接影响到系统的性能和稳定性。 首先,我们来看 `ClusterName`,这是一个集群的标识,代表整个 Cassandra 集群的...

    2019云栖大会Cassandra一致性详解-201909.pdf

    【Cassandra一致性详解】 在2019云栖大会上,郭泽晖(索月)对Cassandra的一致性进行了深入的解析。Cassandra是一个分布式NoSQL数据库系统,它遵循CAP定理,即在分布式系统中,无法同时保证一致性(Consistency)、...

    Cassandra关键技术详解[整理].pdf

    Cassandra 关键技术详解 Cassandra 是一种 NoSQL 数据库,属于键值存储系统,广泛应用于社交网络、工业大数据等领域。下面是 Cassandra 的关键技术详解。 1. NoSQL 运动与 Cassandra 系统 NoSQL 运动始于 1998 年...

    cassandra-operator,apache-cassandra的kubernetes算子.zip

    Cassandra-Operator是针对Apache Cassandra在Kubernetes集群中部署和管理的一个开源项目。它使得在Kubernetes环境中运行和扩展Cassandra数据库变得更加简单和自动化。在这个压缩包“cassandra-operator,apache-...

    Cassandra详解(ppt)

    **Cassandra详解** Cassandra是一款分布式NoSQL数据库系统,由Facebook于2008年设计,后成为Apache软件基金会的顶级项目。它被设计用于处理大规模数据,具有高可用性、可扩展性和线性可扩展性的特点。在本PPT中,...

    Cassandra1.2

    **Cassandra 1.2 知识点详解** Cassandra 是一个分布式、高度可扩展的NoSQL数据库系统,由Facebook最初开发,后成为Apache软件基金会的顶级项目。Cassandra 1.2 版本是在其早期版本的基础上进行了一系列优化和改进...

    Cassandra权威指南【中文版】

    Cassandra,作为NoSQL数据库家族中的重要成员,因其高可用性、可扩展性和出色的性能,在大数据处理领域得到了广泛应用。本书旨在帮助读者理解Cassandra的核心概念、架构设计以及实际操作技巧。 在Cassandra的世界里...

    Cassandra个人学习笔记总结

    新的机制降低了临时存储Hint data的节点负载,同时消除了失效检测中的时间窗口问题,从而提高了数据写入的成功率和效率。这使得全节点修复不再是处理写冲突的必要步骤,只有在网络异常或数据损坏的情况下才需执行,...

    Cassandra在饿了么的应用

    描述中重复多次提及"Cassandra",这表明主题将专注于Cassandra数据库,并探讨它的基本原理和在饿了么的实际应用。标签"Cassandra"强调了文章的重点是围绕这一特定的数据库技术。 从给出的部分内容来看,文章将覆盖...

    cassandra-3.11.3下载

    再者,Cassandra使用Gossip协议进行节点间的通信,这种协议允许节点间快速传播状态信息,从而实现故障检测和负载均衡。在3.11.3中,Gossip协议的性能和稳定性得到了提升,能够更快地发现并应对节点故障,保证服务的...

    Cassandra查询分析器

    **Cassandra 查询分析器详解** Cassandra 查询分析器是 Apache Cassandra 数据库系统中的核心组件之一,它在数据查询过程中起着至关重要的作用。Cassandra 是一个分布式、高性能、可扩展的NoSQL数据库,广泛用于...

    Cassandra实战.pdf

    ### 关键知识点一:Cassandra架构原理 Cassandra采用了一种独特的环状拓扑结构,所有节点都处于平等地位,形成一个环形网络。这种设计确保了数据的高可用性和故障恢复能力,即使部分节点失效,数据仍然可以从其他...

    java NoSql Cassandra hector

    3. 定义Keyspace和ColumnFamily:Cassandra中的数据存储在Keyspace(类似数据库)和ColumnFamily(类似表)中。Hector提供了相应的API来定义它们。 ```java KeyspaceDef keyspaceDef = HFactory....

    Learning_Apache_Cassandra

    6. 静态列的使用,这在Cassandra中是一种特殊的列,可以像预定义的联结那样工作。文档可能还包括了如何定义和使用静态列、静态列与数据交互的方式、静态插入操作,以及静态列在何时使用为宜。 7. 超越键值查找,...

    DevCenter cassandra客户端

    用户可以通过图形化界面定义键空间(keyspaces)、列族(column families,Cassandra中的表)以及它们的字段,支持主键和索引的配置。 2. **CQL编辑器**:DevCenter内置了一个Cassandra查询语言(CQL)的编辑器,...

Global site tag (gtag.js) - Google Analytics