`
qiemengdao
  • 浏览: 277034 次
  • 性别: Icon_minigender_1
  • 来自: 武汉
社区版块
存档分类
最新评论

Cassandra启动过程详解【原创】

阅读更多

Cassandra启动过程详解

这里的分析从CassandraDaemon.java文件开始。

一、配置文件storage-config.xml的读取和log4j的配置文件log4j.property的设置。

配置文件的读取和解析都是在 org.apache.cassandra.config.DatabaseDescriptor 类中完成的,这个类的作用非常简单,就是读取配置文件中各个配置项所定义的值,经过简单的验证,符合条件就将其值赋给 DatabaseDescriptor 的私有静态常量。值得注意的是关于 Keyspace 的解析,按照 ColumnFamily 的配置信息构建成 org.apache.cassandra.config.CFMetaData 对象,最后把这些所有 ColumnFamily 放入 Keyspace 的 HashMap 对象 org.apache.cassandra.config.KSMetaData 中,每个 Keyspace 就是一个 Table。这些信息都是作为基本的元信息,可以通过 DatabaseDescriptor 类直接获取。

二、Keyspace的初始化。


这里主要调用Table.open(tableName)方法创建每个Table的实例。创建 Table 的实例将完成:1)获取该 Table 的元信息 TableMatedate。2)创建改 Table 下每个 ColumnFamily 的存储操作对象 ColumnFamilyStore。3)启动定时程序,检查该 ColumnFamily 的 Memtable 设置的 MemtableFlushAfterMinutes 是否已经过期,过期立即写到磁盘。详细过程可参见我前面关于该方法的详细代码跟踪分析。

一个 Keyspace 对应一个 Table,一个 Table 持有多个 ColumnFamilyStore,而一个 ColumnFamily 对应一个 ColumnFamilyStore。Table 并没有直接持有 ColumnFamily 的引用而是持有 ColumnFamilyStore,这是因为 ColumnFamilyStore 类中不仅定义了对 ColumnFamily 的各种操作而且它还持有 ColumnFamily 在各种状态下数据对象的引用,所以持有了 ColumnFamilyStore 就可以操作任何与 ColumnFamily 相关的数据了。

三、Commitlog日志文件的恢复。

这里调用CmmitLog.recover()方法主要完成这几个操作,发现是否有没有被写到磁盘的数据,恢复这个数据,构建新的日志文件。CommitLog 日志文件的恢复策略是,在头文件中发现没有被序列化的最新的ColumnFamily Id,然后取出这个这个被序列化 RowMutation 对象的起始地址,反序列化成为 RowMutation 对象,后面的操作和新添一条数据的流程是一样的,如果这个 RowMutation 对象中的数据被成功写到磁盘中,那么会在 CommitLog 去掉已经被持久化的 ColumnFamily Id。

四、检查数据文件是否需要压缩

调用CompactionManager.instance.checkAllColumnFamilies()检查CF对应的数据文件是否需要压缩。将相似大小的SStable放到一个bucket中,然后调用submitMinorIfNeeded(cfs)。

五、启动存储服务

这是启动过程最重要的一步,需要启动很多服务。具体步骤有:

5.1)创建StorageMetadata

调用方法SystemTable.initMetadata()创建StorageMetadata。元数据只创建一次,如果元数据已经存在,则直接返回。StorageMetadata 将包含三个关键信息:本节点的 Token、当前 generation 以及 ClusterName。这三个信息被存在 StorageService 类的 属性metadata中(metadata是StorageMetadata类型的对象),以便后面随时调用。

Cassandra 判断如果是第一次启动,Cassandra 将会创建三列分别存储这些信息并将它们存在在系统表的 LocationInfo ColumnFamily 中,key 是“L”。这里的 Token 判断用户是否指定,如果指定了则使用用户指定的,否则随机生成一个 Token,但是这个 Token 有可能在后面被修改;generation=System.currentTimeMillis() / 1000,ClusterName为读取配置文件得到的值。

如果不是第一次启动将会更新这三个值:读取数据文件中的Token信息,generation信息以及ClusterName信息后设置Token值和ClusterName的值,更新generation的值为max(当前时间秒数,old_generation+1)。这里有点要注意的是,如果在后续的过程中更改了配置文件中ClusterName的名字,这会跟数据文件中存储的信息不一致,最终会导致Cassandra无法启动。

5.2)创建所有目录

调用方法DatabaseDescriptor.createAllDirectories()创建所有的目录。包括数据文件目录data/以及日志文件目录commitlog/。同时还为keyspaces创建了数据文件目录的子目录data/system和data/keyspace,…(keyspace为用户定义的keyspace)。当然这个方法早在Table.open()已经调用过了,在这里再次调用可能是为了某些测试需要。

5.3)启动GCInspector.instance.start 服务

主要是统计统计当前系统中资源的使用情况(主要就是内存使用和回收情况),将这个信息记录到日志文件中,这个可以作为系统的监控日志使用。

5.4)启动消息监听服务

这个消息监听服务就是监听整个集群中其它节点发送到本节点的所有消息,Cassandra 会根据每个消息的类型,做出相应的反应。消息监听代码如下:

public void listen(InetAddress localEp) throws IOException {

ServerSocketChannel serverChannel = ServerSocketChannel.open();

final ServerSocket ss = serverChannel.socket();

ss.setReuseAddress(true);

ss.bind(new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort()));

socketThread = new SocketThread(ss, “ACCEPT-” + localEp);

socketThread.start();

}



这里用到了nio里面的异步IO与连网的部分。监听端口默认为7000。创建一个线程SocketThread用于监听消息。

每接收到一个消息,就创建一个新的线程new IncomingTcpConnection(socket).start()进行消息响应;该线程run方法中主要是对消息进行魔数的验证,以及读取消息头部和消息体等内容,然后将消息内容反序列化的任务MessageDeserializationTask 递交到相应的消息反序列化线程池messageDeserializerExecutor_中。

MessageDeserializationTask反序列化消息内容后调用MessagingService.receive()处理消息。

receive()方法中创建MessageDeliveryTask任务对象,根据消息类型得到相应的stage的线程池对象,如果没有对应的线程池,则使用messageDeserializerExecutor_。

stage线程池执行MessageDeliveryTask任务,该任务主要是根据消息中的Verb,调用相应的VerbHandler.doVerb()方法来完成消息的处理。比如GossipDigestAckVerbHandler.doVerb()用来处理Gossip阶段的ACK消息。

5.5)启动StorageLoadBalancer.instance.startBroadcasting 服务

调用 方法 loadTimer_.schedule(new LoadDisseminator(), 2 * Gossiper.intervalInMillis_, BROADCAST_INTERVAL),定时得到节点负载信息,2个Gossiper心跳后开始,间隔时间为60s。该任务得到节点数据总量(包括所有Data文件、FIlter文件以及Index文件),并将其更新到ApplicationState中,然后就可以通过这个 state 来和其它节点交换信息。这个 load 信息在数据的存储和新节点加入的时候,会有参考价值。

5.6)启动Gossiper服务

在启动Gossiper服务之前,将 StorageService 注册为观察者,一旦节点的某些状态发生变化,而这些状态是 StorageService 感兴趣的,StorageService 的 onChange 方法就会触发。Gossiper 服务就是一个定时程序,它会创建一个EndPointState对象。EndPointState对象持有HeartBeatState 的引用和ApplicationState的一个引用集Map<String, ApplicationState> applicationState_ = new Hashtable<String, ApplicationState>()。对于每个Application对象,EndPointState只保存一个最新的值,所以新值会覆盖旧值。

HeartBeatState 对象记录了当前心跳的 generation 和 version,这个 generation 和前面的 StorageMetadata 存储的 generation 是一致的,在节点每次启动的时候更新;而version 是从 0 开始的,每次更新加1;每个节点有一个HeartBeatState对象与之关联。

ApplicationState的一个引用集Map<String, ApplicationState> applicationState_则是记录一些状态信息,比如前面 startBroadcasting()过程中记录节点负载情况。ApplicationState对象包含state值和version值。比如表示节点负载的状态信息可能表示形式为(5.2, 45),意思就是在version为45的时候负载为5.2;相似地,节点启动的状态信息可能表示形式为(bLpassF3XD8Kyks, 56),前面的值表示启动的token,后面的56是version值。是需要注意的是创建ApplicationState对象时,version值加1。

还有一个结构需要注意,就是Gossiper中的Map<InetAddress, EndPointState> endPointStateMap__,它保存了它监听到的所有节点的EndPointState信息,包括它自己的。

Gossiper这个定时程序每隔一秒钟随机向定义的节点发送一个消息,而这个消息是保持集群中节点状态一致的唯一途径(具体过程后面详述)。

5.7)判断启动模式。

启动模式跟配置文件中的AutoBootstrap这一项相关。那这个配置项与 Token 和负载又有何关联性?其实表面上看起来这个配置项是当这个节点启动时是否自动加入集群。但是,当你设置成 False 时它是不是就不加入集群呢?显然不是,这还要看你有没有配置 seeds,如果你配置了其它 seed,那么它仍然会去加入集群。

那么到底有何区别,通过分析其启动代码发现,这个配置项不仅跟 seed 配置项有关而且和 Cassandra 是否是第一次启动也有关。Cassandra 的启动规则大慨如下:

1)当 AutoBootstrap 设为 FALSE,

第一次启动时 Cassandra 会设置系统表中 key为Bootstrap,CF为STATUS_CF的Column为B的值为TRUE,以表示这是由系统自动设置的,其实这个只是作为一个标志来判断你以后的启动情况。 标记启动方式,主要是防止用户再修改 AutoBootstrap 的启动模式。
调用TokenMetadata.updateToken()更新token。
加入ApplicationState对象到EPS的map<key, AS>中。key为MOVE,state为NORMAL:Token 。
设置模式为“Normal”。

 

2)当 AutoBootstrap 设为 TRUE,第一次启动,Cassandra 会判断当前节点配置在seeds 中,Cassandra 的启动情况和 1 是一样的。


3)当 AutoBootstrap 设为 TRUE,第一次启动,并且没有配置为 seed,Cassandra 将会有一个漫长的启动过程,当然这个时间的长短和你的当前的集群的数据量有很大的关系。

设置模式为“Joining: getting load information”。
等待90s(BROADCAST_INTERVAL+RING_DELAY)为了节点获得所有其他节点的负载信息。
如果tokenMetadata已经包含了本节点ip,则抛出异常。
设置模式为“Joining: getting bootstrap token”。
如果在配置文件中指定了InitialToken,则返回这个InitialToken。否则调用getBalancedToken(metadata, load)。
getBalancedToken()方法首先调用getBootstrapSource()方法得到负载最大的节点的ip地址,(如果没有任何节点的负载信息,则抛出运行时异常;这个排序过程是先以落入该节点token范围内的正处于bootstrap的节点数目排序,数目越多优先级越低。如果落入其中的bootstrap节点数目相同,再以负载大小排序)。然后向这个节点发送消息,获取其一半 key 范围所对应的 Token,这个 Token 是前半部分值(如果key的数目<3,则返回一个随机Token值)。

public static Token getBalancedToken(TokenMetadata metadata, Map<InetAddress, Double> load) {

InetAddress maxEndpoint = getBootstrapSource(metadata, load);

Token<?> t = getBootstrapTokenFrom(maxEndpoint);

return t;

}



调用startBootstrap()方法。

privatevoid startBootstrap(Token token) throws IOException {
isBootstrapMode = true; //设置isBootstrapMode。

SystemTable.updateToken(token); //更新本节点的Token值。

//加入状态信息。(MOVE, appstate(BOOT:Token))

Gossiper.instance.addLocalApplicationState(MOVE_STATE, new ApplicationState(STATE_BOOTSTRAPPING + Delimiter

partitioner_.getTokenFactory().toString(token)));

//设置模式为等待range重新分布

setMode(“Joining: sleeping “ + RING_DELAY + ” for pending range setup”, true);

try {

Thread.sleep(RING_DELAY);

} catch (InterruptedException e) {

thrownew AssertionError(e);

}

//设置模式为正在启动

setMode(“Bootstrapping”, true);

//开始启动

new BootStrapper(FBUtilities.getLocalAddress(), token, tokenMetadata_).startBootstrap(); // handles

}



调用 BootStrapper.startBootstrap()方法完成启动,发送STREAM_REQUEST消息,请求数据。

publicvoid startBootstrap() throws IOException {
for (String table : DatabaseDescriptor.getNonSystemTables()) {

Multimap<Range, InetAddress> rangesWithSourceTarget = getRangesWithSources(table);

/* Send messages to respective folks to stream data over to me*/

for (Map.Entry<InetAddress, Collection<Range>> entry : getWorkMap(rangesWithSourceTarget).asMap().entrySet()) {

InetAddress source = entry.getKey();

StorageService.instance.addBootstrapSource(source, table);

StreamIn.requestRanges(source, table, entry.getValue());

}

}

}


  1. 首先调用getRangesWithSources(table)得到该节点负责的ranges所对应的节点集合。
  2. 对于上一步得到的节点集合,移除掉不存活的节点,然后将活着的节点加入到Multimap<InetAddress, Range> sources 集合中。通过循环对每一个活着的节点,将其加入到bootstrapSet中(即作为bootstrap source),然后调用StreamIn.requestRanges()请求该节点对应范围内的数据。
  3. requestRanges(ip, tableName, ranges)方法构建流请求消息StreamRequestMessage,然后调用MessagingService.instance.sendOneWay(message, source)发送消息。
  4. 流请求消息的序列化格式为

     5. 流数据需要经历STREAM_REQUEST, STREAM_INITIATE, STREAM_INITIATE_DONE, STREAM_COMPLETE, STREAM_FINISHED..等阶段。在最后会调用 finishBootstrapping()方法,其中设置启动标志,并在setToken()中设置系统表中token值,并调用updateToken()更新token环。最后加入状态信息<MOVE,NORMAL:Token>,并设置模式为Normal。

 

privatevoid finishBootstrapping() {
isBootstrapMode = false;

SystemTable.setBootstrapped(true);

setToken(getLocalToken());

Gossiper.instance.addLocalApplicationState(MOVE_STATE,

new ApplicationState(STATE_NORMAL + Delimiter + partitioner_.getTokenFactory().toString(getLocalToken())));

setMode(“Normal”, false);

}



参考资料:[url]
cassandra详解 http://www.ibm.com/developerworks/cn/opensource/os-cn-cassandraxu1/
[/url]

 

  • 大小: 10.1 KB
分享到:
评论
1 楼 zhoujianyong2011 2012-04-17  
192.168.5.6     datacenter1 rack1       Up     Normal  64.49 MB        33.33%  0                                         
192.168.5.11    datacenter1 rack1       Up     Normal  64.62 MB        33.33%  56713727820156410577229101238628035242    
192.168.5.7     datacenter1 rack1       Up     Normal  64.57 MB        33.33%  113427455640312821154458202477256070484
三台机器的状态已经这样了,求指点!
Space used (total): 67485766
Space used (live): 67485766
我用cfstats命令看cassandra状态成这样了,是不是代表没有空间了?
如何才能扩大空间呢?

相关推荐

    2019云栖大会Cassandra一致性详解-201909.pdf

    【Cassandra一致性详解】 在2019云栖大会上,郭泽晖(索月)对Cassandra的一致性进行了深入的解析。Cassandra是一个分布式NoSQL数据库系统,它遵循CAP定理,即在分布式系统中,无法同时保证一致性(Consistency)、...

    Cassandra 分布式数据库详解

    种子节点(seeds)配置会影响这一过程,种子节点用于引导新节点加入集群。 再来看看 `Keyspaces`,这是 Cassandra 中的一个关键概念,类似于传统数据库的表空间。Keyspace 是数据的逻辑分区,可以包含多个 Column...

    Cassandra技术详解 操作与测试报告

    ### Cassandra技术详解与操作测试报告 #### 一、Cassandra概述 Cassandra是一个开源的分布式数据库管理系统,由Facebook开发,并随后捐赠给Apache基金会。它的设计灵感来源于Amazon的Dynamo和Google的Bigtable,...

    Cassandra关键技术详解[整理].pdf

    Cassandra 关键技术详解 Cassandra 是一种 NoSQL 数据库,属于键值存储系统,广泛应用于社交网络、工业大数据等领域。下面是 Cassandra 的关键技术详解。 1. NoSQL 运动与 Cassandra 系统 NoSQL 运动始于 1998 年...

    cassandra详解

    Cassandra详解 Cassandra是一款高度可扩展、最终一致性的分布式结构化键值存储系统,由Facebook的Avinash Lakshman(Dynamo的贡献者)和Prashant Malik共同创建。最初用于支持Facebook的收件箱搜索功能,后于2008年...

    分布式数据库Cassandra 一致性详解.zip

    1.CAP定理理与Cassandra 1.1 Cassandra优势 2.Cassandra ⼀一致性实现 2.1 CAS 2.2 Quorum读写 2.3 不不⼀一致产⽣生原因 2.4 Hinted handoff 2.5 Read repair 2.6 Manual repair 3.Cassandra应⽤用场景 ...

    Cassandra详解(ppt)

    **Cassandra详解** Cassandra是一款分布式NoSQL数据库系统,由Facebook于2008年设计,后成为Apache软件基金会的顶级项目。它被设计用于处理大规模数据,具有高可用性、可扩展性和线性可扩展性的特点。在本PPT中,...

    docker-cassandra, 在 Docker的快速启动中,Cassandra.zip

    docker-cassandra, 在 Docker的快速启动中,Cassandra Docker 中的Cassandra这个库提供了在 Docker 中运行Cassandra所需的一切,并为快速的容器启动而调整。为什么?虽然天真的Cassandra图像大约需要 30秒的时间,...

    cassandra集群配置

    Cassandra 集群配置详解 本文将通过实例介绍 Cassandra 的简单配置,包括基本配置、集群配置、启动 Cassandra 和集群管理等方面的内容。 一、基本配置 在 Cassandra 中,需要准备 3 台或以上的计算机,并且每台...

    Windows下的Cassandra 安装图文教程

    本教程涵盖了 Cassandra 的基本介绍、安装步骤、配置过程等方面的内容。 Cassandra 简介 ----------- Cassandra 是一个混合型的非关系数据库,类似于 Google 的 BigTable。其主要功能比 Dynomite(分布式的 Key-...

    详解第二部分

    #### 二、Cassandra启动过程详解 Cassandra的启动过程主要分为以下几个阶段: 1. **配置文件解析**:启动时,首先读取并解析`storage-config.xml`配置文件,这一工作由`org.apache.cassandra.config....

    Cassandra在Windows上安装及使用方法

    启动过程中,Cassandra会显示一系列的日志信息,包括初始化、堆大小、加载配置文件等信息。 ### 高级配置与注意事项 - **性能优化**:根据服务器硬件配置调整Cassandra配置文件中的内存分配、磁盘I/O等参数,以...

    cassandra权威指南(中文)

    - **配置参数详解**:包括Cassandra的配置文件详解,如`cassandra.yaml`中的关键参数设置方法。 - **性能调优**:提供性能调优的最佳实践,包括硬件选型、JVM参数调整、系统监控等方面的内容。 #### 五、Cassandra...

    Cassandra1.2

    **Cassandra 1.2 知识点详解** Cassandra 是一个分布式、高度可扩展的NoSQL数据库系统,由Facebook最初开发,后成为Apache软件基金会的顶级项目。Cassandra 1.2 版本是在其早期版本的基础上进行了一系列优化和改进...

    java NoSql Cassandra hector

    Java NoSQL Cassandra Hector详解 在当今大数据时代,非关系型数据库(NoSQL)因其灵活性、高可扩展性和高性能,越来越受到开发者的青睐。Cassandra,作为NoSQL数据库家族中的重要一员,尤其在大规模分布式存储系统...

    Cassandra应用和改进

    接下来,360团队针对Cassandra应用过程中遇到的问题进行了梳理,并分享了改进的重心主要集中在数据可靠性、运维的便捷性以及成本的考量上。 在数据可靠性方面,360指出目前Cassandra在数据副本数量不足时无法保证...

    cassandra安装使用教程

    1、cassandra的安装、维护使用 2、java操作cassandra实例 3、cql使用详解

    Cassandra查询分析器

    **Cassandra 查询分析器详解** Cassandra 查询分析器是 Apache Cassandra 数据库系统中的核心组件之一,它在数据查询过程中起着至关重要的作用。Cassandra 是一个分布式、高性能、可扩展的NoSQL数据库,广泛用于...

Global site tag (gtag.js) - Google Analytics