Hbase的Region Compact算法属于一种多路归并的外排算法。这种算法的特点是,待排序文件本身是有序的,同时打开这些文件,顺序遍历并对比它们的首条数据,最后合并输出为一个文件,多个文件遍历时的首条数据用内存堆进行内排。
Hbase在实现该算法的过程中重要的是下面这五个类。
1.org.apache.hadoop.hbase.regionserver.Store
2.org.apache.hadoop.hbase.regionserver.StoreScanner
3.org.apache.hadoop.hbase.regionserver.StoreFileScanner
4.org.apache.hadoop.hbase.regionserver.KeyValueHeap
5.org.apache.hadoop.hbase.regionserver.ScanQueryMatcher
这五个类的关系是
1.Store类调用StoreScanner的next方法,并循环输出kv到合并文件;
2.StoreScanner的作用是负责创建并持有多个输入文件的StoreFileScanner,内部遍历这些StoreFileScanner并通过KeyValueHeap来排序这些输入文件的首条记录;
3.StoreFileScanner的作用是遍历单个输入文件,管理并提供单个输入文件的首条记录;
4.KeyValueHeap的作用就是通过堆来排序每个输入文件的首条记录。
5.ScanQueryMatcher的作用是当输入文件的首条记录来的时候,根据一定的策略判断这条记录到底是该输出还是该跳过。
源代码如下:
1.Store.compact(final List<StoreFile> filesToCompact,
final boolean majorCompaction, final long maxId)
throws IOException {
这个方法重点是对StoreScanner的方法的调用,因此比较简单,关键的是下面有注释的几句。
// calculate maximum key count after compaction (for blooms)
int maxKeyCount = 0;
for (StoreFile file : filesToCompact) {
StoreFile.Reader r = file.getReader();
if (r != null) {
// NOTE: getFilterEntries could cause under-sized blooms if the user
// switches bloom type (e.g. from ROW to ROWCOL)
long keyCount = (r.getBloomFilterType() == family.getBloomFilterType())
? r.getFilterEntries() : r.getEntries();
maxKeyCount += keyCount;
if (LOG.isDebugEnabled()) {
LOG.debug("Compacting " + file +
", keycount=" + keyCount +
", bloomtype=" + r.getBloomFilterType().toString() +
", size=" + StringUtils.humanReadableInt(r.length()) );
}
}
}
// For each file, obtain a scanner:
List<StoreFileScanner> scanners = StoreFileScanner
.getScannersForStoreFiles(filesToCompact, false, false);//将filesToCompact里的每个输入文件,都为它们创建一个StoreFileScanner对象,用于之后遍历和管理每个输入文件的首条记录。
// Make the instantiation lazy in case compaction produces no product; i.e.
// where all source cells are expired or deleted.
StoreFile.Writer writer = null;
try {
InternalScanner scanner = null;
try {
Scan scan = new Scan();
scan.setMaxVersions(family.getMaxVersions());
/* include deletes, unless we are doing a major compaction */
scanner = new StoreScanner(this, scan, scanners, !majorCompaction);//用刚才创建好的scanners为成员变量再创建StoreScanner,StoreScanner才是真正排序这些文件的对象。
int bytesWritten = 0;
// since scanner.next() can return 'false' but still be delivering data,
// we have to use a do/while loop.
ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
while (scanner.next(kvs)) {//StoreScanner将文件的记录在内部排序好后,通过next方法循环输出
if (writer == null && !kvs.isEmpty()) {
writer = createWriterInTmp(maxKeyCount,
this.compactionCompression);
}
if (writer != null) {
// output to writer:
for (KeyValue kv : kvs) {
writer.append(kv);//直接将这些记录循环输出到合并文件就行了。
// check periodically to see if a system stop is requested
if (Store.closeCheckInterval > 0) {
bytesWritten += kv.getLength();
if (bytesWritten > Store.closeCheckInterval) {
bytesWritten = 0;
if (!this.region.areWritesEnabled()) {
writer.close();
fs.delete(writer.getPath(), false);
throw new InterruptedIOException(
"Aborting compaction of store " + this +
" in region " + this.region +
" because user requested stop.");
}
}
}
}
}
kvs.clear();
}
} finally {
if (scanner != null) {
scanner.close();
}
}
} finally {
if (writer != null) {
writer.appendMetadata(maxId, majorCompaction);
writer.close();
}
}
return writer;
}
2。StoreScanner.StoreScanner(Store store, Scan scan, List<? extends KeyValueScanner> scanners,
boolean retainDeletesInOutput)
throws IOException {
这个是StoreScanner的构造函数,主要做了三方面工作,见下面注释
this.store = store;
this.cacheBlocks = false;
this.isGet = false;
matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
null, store.ttl, store.comparator.getRawComparator(),
store.versionsToReturn(scan.getMaxVersions()), retainDeletesInOutput);//创建ScanQueryMatcher对象,用于以后判断记录是该过滤还是输出
// Seek all scanners to the initial key
for(KeyValueScanner scanner : scanners) {
scanner.seek(matcher.getStartKey());//每个输入文件都初始化好首条记录
}
// Combine all seeked scanners with a heap
heap = new KeyValueHeap(scanners, store.comparator);//创建堆对象,用于之后每个输入文件首条记录的排序
}
3.StoreScanner.next(List<KeyValue> outResult, int limit) throws IOException {
//DebugPrint.println("SS.next");
这个方法封装了处理heap取出的记录值的逻辑,根据matcher对该值的判断来决定这个值是输出还是跳过,主要是下面三个方法,见注释
checkReseek();
// if the heap was left null, then the scanners had previously run out anyways, close and
// return.
if (this.heap == null) {
close();
return false;
}
KeyValue peeked = this.heap.peek();
if (peeked == null) {
close();
return false;
}
// only call setRow if the row changes; avoids confusing the query matcher
// if scanning intra-row
if ((matcher.row == null) || !peeked.matchingRow(matcher.row)) {
matcher.setRow(peeked.getRow());
}
KeyValue kv;
List<KeyValue> results = new ArrayList<KeyValue>();
LOOP: while((kv = this.heap.peek()) != null) {//拿出heap里排好序的值
// kv is no longer immutable due to KeyOnlyFilter! use copy for safety
KeyValue copyKv = new KeyValue(kv.getBuffer(), kv.getOffset(), kv.getLength());
ScanQueryMatcher.MatchCode qcode = matcher.match(copyKv);//判断该值该如何处理,输出或跳过
//DebugPrint.println("SS peek kv = " + kv + " with qcode = " + qcode);
switch(qcode) {//根据判断具体的去处理该值
case INCLUDE:
results.add(copyKv);
this.heap.next();
if (limit > 0 && (results.size() == limit)) {
break LOOP;
}
continue;
case DONE:
// copy jazz
outResult.addAll(results);
return true;
case DONE_SCAN:
close();
// copy jazz
outResult.addAll(results);
return false;
case SEEK_NEXT_ROW:
// This is just a relatively simple end of scan fix, to short-cut end us if there is a
// endKey in the scan.
if (!matcher.moreRowsMayExistAfter(kv)) {
outResult.addAll(results);
return false;
}
reseek(matcher.getKeyForNextRow(kv));
break;
case SEEK_NEXT_COL:
reseek(matcher.getKeyForNextColumn(kv));
break;
case SKIP:
this.heap.next();
break;
case SEEK_NEXT_USING_HINT:
KeyValue nextKV = matcher.getNextKeyHint(kv);
if (nextKV != null) {
reseek(nextKV);
} else {
heap.next();
}
break;
default:
throw new RuntimeException("UNEXPECTED");
}
}
if (!results.isEmpty()) {
// copy jazz
outResult.addAll(results);
return true;
}
// No more keys
close();
return false;
}
4.KeyValueHeap.KeyValueHeap(List<? extends KeyValueScanner> scanners,
KVComparator comparator) {
排序堆的构造方法,原来就是一个PriorityQueue,因为要比较的对象是kv记录,所以将kv的comparator作为参数来生成PriorityQueue的comparator
this.comparator = new KVScannerComparator(comparator);
if (!scanners.isEmpty()) {
this.heap = new PriorityQueue<KeyValueScanner>(scanners.size(),
this.comparator);//堆内部就是一个PriorityQueue
for (KeyValueScanner scanner : scanners) {
if (scanner.peek() != null) {
this.heap.add(scanner);//把要排的文件scanner装入堆,文件自动按首条记录通过comparator排好序
} else {
scanner.close();
}
}
this.current = heap.poll();//堆排序后的首个文件scanner
}
}
5.KeyValueHeap.next() throws IOException {
堆里面最重要的方法其实就是next,不过看这个方法的主要功能不是为了算出nextKeyValue,而主要是为了算出nextScanner,然后需在外部再次调用peek方法来取得nextKeyValue,不是很简练。
if(this.current == null) {
return null;
}
KeyValue kvReturn = this.current.next();
KeyValue kvNext = this.current.peek();
if (kvNext == null) {//如果currentScanner已经读完
this.current.close();
this.current = this.heap.poll();//直接从余下scanner中取出新的nextScanner
} else {//如果currentScanner还有值,把余下的scanner中首值最小的scanner取出来比一比,谁小谁就是新的nextScanner
KeyValueScanner topScanner = this.heap.peek();
if (topScanner == null ||
this.comparator.compare(kvNext, topScanner.peek()) >= 0) {
this.heap.add(this.current);
this.current = this.heap.poll();
}
}
return kvReturn;
}
总结:Hbase的Region Compact算法是在一个RegionServer上执行的,在这个执行的过程中,可能有两个瓶颈,
第一,如果待排序的文件数目过多,有导致内存堆溢出的风险,虽然在内存很大的今天,这种情形应该较难出现,但是如果一个regionserver管理了很多region,而这些region又恰好同时都在执行compact的话这就很难说了;
第二,文件的合并工作是在一台RegionServer上执行的,为何不考虑用mapred的形式通过整个集群来分担这种文件合并的工作,最后regionserver只需把合并好的文件装入region就行。
分享到:
相关推荐
* 数据分析:HBase可以用于数据分析,满足数据挖掘和机器学习的需求 * 实时数据处理:HBase可以用于实时数据处理,满足流数据处理的需求 HBase表设计 HBase表设计是指设计HBase表的结构和schema,以满足具体的业务...
HBase支持多种压缩算法,如Snappy、LZO和GZIP。Snappy压缩率较低但解压速度快,适合实时查询场景;LZO无损且注重解压速度;GZIP则提供更高的压缩率但牺牲了速度。使用`CompressionTest`工具检查系统是否已安装所需...
但预先创建多个Region可避免所有数据集中在一个Region,实现数据的负载均衡,提高写入效率。 其次, Compact与Split机制是HBase内部数据管理的核心。数据更新首先写入WAL日志和内存的MemStore,当MemStore达到阈值...
当MemStore达到一定大小时,数据会被Flush到StoreFile,并在必要时进行Compact和Split操作。HLog作为WAL,保证数据在写入HBase前先写入日志,以防数据丢失。 HFile是HBase的存储格式,有V1、V2和V3三个版本,V2引入...
5. `hbase-major-compact-htable.hbase`:HBase的重大合并(Major Compaction)是将多个HFile合并成一个,以减少数据文件的数量并优化空间使用。这个脚本可能就是执行这个操作的命令。 6. `README.md`:这是一个...
版本管理:HBase中的数据更新本质上是不断追加新的版本,通过compact操作来做版本间的文件合并。 3. Region的split和集群管理: Region的split是指将一个region分成多个region的过程。集群管理是通过ZooKeeper+...
28. HBase Region切分:根据Region大小和负载自动切分,保持负载均衡。 29. 读写流程:读取通过RowKey定位,写入先写WAL再写MemStore,定期Flush到HFile,最后Compact。 30. RowKey设计:应尽量唯一且有序,利于...