`
blxu
  • 浏览: 5453 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

实现Cassandra数据自动失效功能

    博客分类:
  • Java
阅读更多

原文链接: http://xulingbo.net

一、 Cassandra 数据失效机制

现在使用 Cassandra 的地方很多,由于 Cassandra 的写的性能很好,所以有一部分使用 Cassandra 做为类似于日志功能来使用,所
以一个需求就提出来了,那就是希望 Cassandra 能提供一个自动失效功能,希望 Cassandra 能保留一定天数后,能自动删除数据。

这种需求的确很常见,但是遗憾的是Cassandra 目前仍然不能满足这个需求,虽然Cassandra 已经提供了实现这个功能的基础,下面详细看一下Cassandra 是怎么删除数据的:在我写的另一篇文章中也介绍了Cassandra 删除数据的规则《Cassandra 分布式数据库详解》系列文档。

Cassandra 判断数据是否有效有三个地方,分别是下面代码片段:

第一段代码

long maxChange = column.mostRecentLiveChangeAt();

return (!column.isMarkedForDelete() || column.getLocalDeletionTime() >
gcBefore || maxChange > column.getMarkedForDeleteAt()) // (1)

&& (!container.isMarkedForDelete() || maxChange >
container.getMarkedForDeleteAt()); //(2)

这段代码判断这个列是否应该被关联,关联有两个条件

(1) 列没有被删除或者删除的时间在有效期以内或者删除的时间在最后修改数据的数据之前

(2) 列所在的容器没有被删除或者列的修改时间在容器删除时间之后

第二段代码

for (byte[] cname : cf.getColumnNames())

{

IColumn c = cf.getColumnsMap().get(cname);

long minTimestamp = Math.max(c.getMarkedForDeleteAt(),
cf.getMarkedForDeleteAt());

for (IColumn subColumn : c.getSubColumns())

{

if (subColumn.timestamp() <= minTimestamp|| (subColumn.isMarkedForDelete() && subColumn.getLocalDeletionTime()<= gcBefore))

{

((SuperColumn)c).remove(subColumn.name());

}

}

if (c.getSubColumns().isEmpty() && c.getLocalDeletionTime() <=
gcBefore)

{

cf.remove(c.name());

}

}

for (byte[] cname : cf.getColumnNames())

{

IColumn c = cf.getColumnsMap().get(cname);

if ((c.isMarkedForDelete() && c.getLocalDeletionTime() <= gcBefore)

|| c.timestamp() <= cf.getMarkedForDeleteAt())

{

cf.remove(cname);

}

}

if (cf.getColumnCount() == 0 && cf.getLocalDeletionTime() <=
gcBefore)

{

return null;

}

第二段代码是判断数据是否应该被删除,也有两个或条件

(1) 列已经被删除了并且数据已经过了时效期

(2) 数据的修改时间在容器删除时间之前

当所有列都删除并且容器已失效,这个 key 就会被删除, key 的删除是在 SSTable 合并的时候完成的

第三段代码是在客户端中

for (IColumn column : columns)

{

if (column.isMarkedForDelete()){

continue;

}

Column thrift_column = new Column(column.name(), column.value(),
column.timestamp());

thriftColumns.add(createColumnOrSuperColumn_Column(thrift_column));

}

这段代码清楚的说明只要列被删除,客户端将取不到数据

关于 gcBefore 是在配置文件中设置的 864000 ,很多人以为这个时间就是数据的失效时间,以为在这个时间段内数据可以被使用,从上面的三段代码来看, Cassandra 在设计数据失效机制,在实际应用中几乎没有利用价值,也就是数据失效对使用者来说没有任何好处

二、 改造 Cassandra 实现真正的自动失效功能

从上分析 Cassandra 判断数据是否失效主要根据三个标识

(1) Column 是否被删除标识: isMarkedForDelete

(2) Column getLocalDeletionTime() ,这个时间就是个失效时间 GCGraceSeconds 比较,判断该 Column 是否已经失效,这个和前面的条件是并集

(3) Column 和所在 ColumnFamily 的删除时间 markedForDeleteAt 比较如果小于这个时间,改 Column 就会被删除

所以我们要实现自动失效数据,不用通过调用 remove 接口,就能实现。也就是当数据在写到数据库之前,就标识这个数据在 GCGraceSeconds 时间内有效,一旦超过这个时间,数据被被自动删除,而且是物理删除。

需要改造的地方如下:

A. 改造 org.apache.cassandra.thrift. ColumnPath 类,增加一个

public boolean is_delete;

属性 , 当我们在调用 insert 接口是标识这个 Column 数据是自动失效的。如下所示:

ColumnPath col = new
ColumnPath(columnFamily,superColumn,column.getBytes(“UTF-8″),
true );

client.insert(keyspace,key,col,value.getBytes(),System.currentTimeMillis(),
ConsistencyLevel.ONE);

B. 改造 org.apache.cassandra.thrift. Column

也增加同样增加 is_delete 属性,当我们调用 batch_insert batch_mutate 接口是同样可以设置 Column 时支持自动失效的。

A B 是客户端接口需要修改的地方,下面是服务器端要修改的地方:

C. org.apache.cassandra.db.filter. QueryPath

也增加同样增加 is_delete 属性,用来保存客户端传过来的 is_delete 值。并增加一个结构体:

public QueryPath(String columnFamilyName, byte[] superColumnName, byte[] columnName,boolean is_delete) {
this.columnFamilyName = columnFamilyName;
this.superColumnName = superColumnName;
this.columnName = columnName;
this.is_delete = is_delete;

}

insert batch_insert batch_mutate 三个接口创建 QueryPath 对象的地方改成上面这个结构体创建对象

D. org.apache.cassandra.db. ColumnFamily

修改 addColumn 方法,将 false 改为 path.is_delete

public void addColumn(QueryPath path, byte[] value, long timestamp)

{
addColumn(path, value, timestamp,
path.is_delete );
//addColumn(path, value, timestamp,
false );

}

E. org.apache.cassandra.db. Column

修改 getLocalDeletionTime 方法,直接去 timestamp 时间

public int getLocalDeletionTime()

{
assert isMarkedForDelete;
//return ByteBuffer.wrap(value).getInt();

return (int)(timestamp/1000);

}

同时修改 comparePriority 方法,改变 Column 替换规则

public long comparePriority(Column o)

{
if(o.timestamp == -1){
return -1;

}
if(this.timestamp == -1){
return 1;

}

if(isMarkedForDelete)

{
// tombstone always wins ties.
return timestamp < o.timestamp ? -1 : 1;

}
return timestamp – o.timestamp;

}

F.
org.apache.cassandra.db. RowMutation

修改 delete 方法

public void delete(QueryPath path, long timestamp)

{
assert path.columnFamilyName != null;
String cfName = path.columnFamilyName;

int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
ColumnFamily columnFamily = modifications_.get(cfName);

if (columnFamily == null)
columnFamily = ColumnFamily.create(table_, cfName);

if (path.superColumnName ==
null && path.columnName == null)

{
columnFamily.delete(localDeleteTime, timestamp);

}

else
if (path.columnName == null)

{
SuperColumn sc = new SuperColumn(path.superColumnName,
DatabaseDescriptor.getSubComparator(table_, cfName));
sc.markForDeleteAt(localDeleteTime, timestamp);
columnFamily.addColumn(sc);

}

else

{

ByteBuffer bytes = ByteBuffer.allocate(4);
bytes.putInt(localDeleteTime);
long deleteTime = -1;
//columnFamily.addColumn(path, bytes.array(), timestamp, true);
columnFamily.addColumn(path,
bytes.array(), deleteTime, true);

}

modifications_.put(cfName, columnFamily);

}

调用删除接口时将删除时间设为 -1 ,这个和前面的修改的 comparePriority 方法相适应。

G. 修改 CassandraServer 类的 thriftifyColumns thriftifySubColumns

却掉 isMarkedForDelete 检查

public List<ColumnOrSuperColumn>
thriftifyColumns(Collection<IColumn> columns, boolean reverseOrder)

{
ArrayList<ColumnOrSuperColumn> thriftColumns = new
ArrayList<ColumnOrSuperColumn>(columns.size());

for
(IColumn column : columns)

{
/*if (column.isMarkedForDelete())
{

continue;
}*/


Column thrift_column = new Column(column.name(), column.value(),
column.timestamp());
thriftColumns.add(createColumnOrSuperColumn_Column(thrift_column));

}

//we have to do the reversing here, since internally we pass results around in
ColumnFamily

//
objects, which always sort their columns in the “natural” order

//TODO this is inconvenient for direct users of StorageProxy

if (reverseOrder)
Collections.reverse(thriftColumns);
return thriftColumns;

}

数据有效时间可以通过 GCGraceSeconds 配置项来设置,这样超过 gcBefore 时间,客户端就会取不到数据,并且 Cassandra 在执行 SSTable 合并的时候会执行物理删除,如果想立即删除数据可以调用 remove 接口,数据将会立即被删除,但是在有效时间内被删除的数据客户端仍然能够取到数据。这样就真正实现了数据自动失效和删除的功能。

分享到:
评论

相关推荐

    Cassandra实战

    6. 容错:Cassandra能够自动处理节点故障,自动将数据重新分布到其他节点上,保证了数据的不丢失。 7. 一致性可调:Cassandra支持最终一致性模型,允许系统在出现网络分区或节点故障时,通过调整一致性级别来保证...

    Mastering Apache Cassandra

    - **Gossip 协议**:Cassandra 使用 Gossip 协议进行节点间通信,自动发现集群中的新节点和失效节点。 - **虚拟节点**:通过虚拟节点技术,可以实现更均匀的数据分布和更高效的负载均衡。 #### 四、关键编程模式 -...

    Cassandra

    数据自动在新旧节点间重新分布,保持负载均衡。 ### 6. 容错机制 通过Gossip协议,Cassandra节点间定期交换状态信息,检测并处理失效节点。当某个节点故障时,其上的数据会由其他副本节点接管,保证服务连续性。 ...

    cassandra 0.6.3下载

    在"0.6.3"这个版本中,Cassandra 已经展现出了强大的功能和稳定性,尤其对于那些需要处理PB级别数据的企业而言,这是一个值得信赖的解决方案。 一、Cassandra 的核心特性 1. 分布式架构:Cassandra 基于谷歌的...

    apache_cassandra

    8. 容错性:Cassandra 设计为容忍节点故障,通过数据复制和故障检测机制,当某个节点失效时,系统能够自动恢复服务,保证业务连续性。 9. SQL-like 查询语言:Cassandra 提供CQL(Cassandra Query Language),使得...

    行业分类-设备装置-一种分布式数据平台下数据存储的方法和装置.zip

    10. 框架与工具:分布式数据平台通常依赖于特定的框架和工具,如Hadoop、Spark、Cassandra等,它们提供了用于数据存储、处理和分析的高级接口和功能。 综上所述,"一种分布式数据平台下数据存储的方法和装置"涵盖了...

    亿级用户的分布式数据存储解决方案_2.docx

    综上所述,针对亿级用户的分布式数据存储解决方案需要综合考虑多种因素,包括但不限于数据复制机制的选择、数据分片的设计以及分布式部署的具体实现方式。通过对MySQL复制、数据分片原理及其实现方案的深入理解与...

    五大分布式搜索方案选型.doc

    此外,Solandra的索引分布对后期维护不透明,且在节点故障时,如果超过一定数量的节点失效,搜索功能会受到影响。 SolrCloud是Solr的分布式版本,采用Zookeeper进行节点间通信和管理,支持实时搜索(即将在Lucene...

    nosql数据库的应用探讨

    此外,Redis还支持数据持久化功能,可以通过定期快照或追加日志的方式将数据保存到磁盘。 **3.2 MongoDB** MongoDB是一个面向文档的NoSQL数据库,它使用BSON(Binary JSON)格式存储数据。MongoDB支持动态模式,...

    cpp-Redis相关文章每日精选

    7. **键空间通知**:Redis可以发送键空间通知,当指定的键发生变化时,订阅者可以接收到这些通知,这有助于实现缓存失效等机制。 8. **限流与计数**:Redis提供了限流器和计数器功能,如`INCR`、`DECR`以及`/...

    NoSQL数据笔谈

    - **Cassandra**:由Facebook开发,现由Apache软件基金会维护,是一个高度可扩展、高性能的分布式NoSQL数据库。 - **MongoDB**:一种面向文档的NoSQL数据库,支持动态查询和灵活的数据模型。 - **Redis**:一个开源...

    大数据产品能力评测赋能企业大数据能力建设 ( PDFDrive ).pdf

    例如,对于Hadoop平台,评测涵盖了NameNode主节点失效恢复、自动化部署、ODBC兼容性、数据导入认证、租户管理等工作流创建和集群动态扩展的能力。对于MPP数据库,评测则关注功能、运维、安全、扩展性、可用性和兼容...

    ES和HADOOP使用问题和需求

    - **其他存储方式**:可以通过外部数据库(如MySQL、MongoDB、Cassandra、HBase等)来存储部分数据。这种方式可以降低ES的I/O压力,同时保持查询性能。 - **读写影响**:采用不同存储方式时,需要考虑数据的一致性和...

    互联网环境下分布式事务处理系统现状与趋势.pdf

    摩尔定律的失效意味着单个处理器性能提升的速度减缓,而数据的增长速度却在不断加快。因此,分布式事务处理系统成为了解决这一问题的关键。 分布式事务处理系统的发展背景主要源于两方面的需求:一是性能需求,二是...

    开源项目-uber-ringpop-go.zip

    在实际应用中,Ringpop-go 可以与各种数据库和服务相结合,如Cassandra、Memcached等,实现分布式缓存、数据存储等场景。它还支持故障检测和自我修复功能,当检测到某个节点失效时,会自动将该节点的职责转移到其他...

    (word完整版)NoSQL数据库笔谈.doc

    1. 云计算架构中,NoSQL数据库常用于构建弹性、可扩展的服务,避免单点故障,实现数据和服务的高可用性。 2. 反模式包括单点失败、同步调用、不具备回滚能力、不记录日志等,这些都会影响系统的稳定性和可靠性。 ...

    GFS论文中英文版.rar

    当某台服务器失效时,系统可以自动从其他副本中恢复数据。此外,主服务器也能够检测到块服务器的失效,并重新分配其上的块。 2. **高性能读写**:GFS优化了读写操作,特别是顺序读写。由于大部分谷歌应用倾向于进行...

    卡桑德拉测试

    9. **分布式事务**:虽然Cassandra不支持ACID(原子性、一致性、隔离性、持久性)事务,但可以通过特定的设计模式如微批次和最终一致性实现类似的功能。测试这些模式的正确性和可靠性。 10. **扩展性**:随着数据...

Global site tag (gtag.js) - Google Analytics