Major compaction时的scan操作
发起major compaction时,通过CompactSplitThread.CompactionRunner.run开始执行
-->region.compact(compaction, store)-->store.compact(compaction)-->
CompactionContext.compact,发起compact操作
CompactionContext的实例通过HStore中的storeEngine.createCompaction()生成,
默认值为DefaultStoreEngine,通过hbase.hstore.engine.class配置。
默认的CompactionContext实例为DefaultCompactionContext。
而DefaultCompactionContext.compact方法最终调用DefaultStoreEngine.compactor来执行
compactor的实现通过hbase.hstore.defaultengine.compactor.class配置,默认实现为DefaultCompactor
调用DefaultCompactor.compact(request);
1.根据要进行compact的storefile文件,生成对应的StoreFileScanner集合列表。
在生成StoreFileScanner实例时,每一个scanner中的ScanQueryMatcher为null
2.创建StoreScanner实例,设置ScanType为ScanType.COMPACT_DROP_DELETES。
privateStoreScanner(Storestore, ScanInfo scanInfo, Scan scan,
List<? extendsKeyValueScanner> scanners, ScanTypescanType, longsmallestReadPoint,
longearliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
this(store, false, scan, null, scanInfo.getTtl(),
scanInfo.getMinVersions());
if (dropDeletesFromRow == null) {
执行这里,传入的columns为null
matcher = newScanQueryMatcher(scan, scanInfo, null, scanType,
smallestReadPoint, earliestPutTs, oldestUnexpiredTS);
} else {
matcher = newScanQueryMatcher(scan, scanInfo, null, smallestReadPoint,
earliestPutTs, oldestUnexpiredTS, dropDeletesFromRow, dropDeletesToRow);
}
ScanqueryMatcher的构造方法:
传入的columns为null
publicScanQueryMatcher(Scan scan, ScanInfo scanInfo,
NavigableSet<byte[]> columns, ScanTypescanType,
longreadPointToUse, longearliestPutTs, longoldestUnexpiredTS) {
tr中mintime=0,maxtime=long.maxvalue
this.tr = scan.getTimeRange();
this.rowComparator = scanInfo.getComparator();
此deletes属性中的kv delete信息为到一个新的row时,都会重新进行清空。
this.deletes = newScanDeleteTracker();
this.stopRow = scan.getStopRow();
this.startKey = KeyValue.createFirstDeleteFamilyOnRow(scan.getStartRow(),
scanInfo.getFamily());
得到filter实例
this.filter = scan.getFilter();
this.earliestPutTs = earliestPutTs;
this.maxReadPointToTrackVersions = readPointToUse;
this.timeToPurgeDeletes = scanInfo.getTimeToPurgeDeletes();
此处为的值为false
/* how to deal with deletes */
this.isUserScan = scanType == ScanType.USER_SCAN;
此处的值为false,scanInfo.getKeepDeletedCells()的值默认false,
可通过table的columnfmaily中配置KEEP_DELETED_CELLS属性
scan.isRaw()可通过在scan中setAttribute的_raw_属性,默认为false
// keep deleted cells: if compaction or raw scan
this.keepDeletedCells = (scanInfo.getKeepDeletedCells() && !isUserScan) || scan.isRaw();
此处的值为false,此时是major的compact,不保留delete的数据
scan.isRaw()可通过在scan中setAttribute的_raw_属性,默认为false
// retain deletes: if minor compaction or raw scan
this.retainDeletesInOutput = scanType == ScanType.COMPACT_RETAIN_DELETES || scan.isRaw();
此时的值为false
// seePastDeleteMarker: user initiated scans
this.seePastDeleteMarkers = scanInfo.getKeepDeletedCells() && isUserScan;
得到查询的最大版本数,此时的scan.maxversion与scanInfo.maxversion的值是相同的值
intmaxVersions =
scan.isRaw() ? scan.getMaxVersions() : Math.min(scan.getMaxVersions(),
scanInfo.getMaxVersions());
生成columns属性的值为ScanWildcardColumnTracker实例,设置hasNullColumn的值为true
// Single branch to deal with two types of reads (columns vs all in family)
if (columns == null || columns.size() == 0) {
// there is always a null column in the wildcard column query.
hasNullColumn = true;
columns属性中的index表示当前比对到的column的下标值,每比较一行时,此值会重新清空
// use a specialized scan for wildcard column tracker.
this.columns = newScanWildcardColumnTracker(
scanInfo.getMinVersions(), maxVersions, oldestUnexpiredTS);
} else {
这个部分在compact时是不会执行的
// whether there is null column in the explicit column query
hasNullColumn = (columns.first().length == 0);
// We can share the ExplicitColumnTracker, diff is we reset
// between rows, not between storefiles.
this.columns = newExplicitColumnTracker(columns,
scanInfo.getMinVersions(), maxVersions, oldestUnexpiredTS);
}
}
ScanQueryMatcher.match过滤kv是否包含的方法分析
在通过StoreScanner.next(kvlist,limit)得到下一行的kv集合时
调用ScanQueryMatcher.match来过滤数据的方法分析
其中match方法返回的值具体作用可参见StoreScanner中的如下方法:
public boolean next(List<Cell> outResult, int limit).....
public MatchCode match(KeyValue kv) throws IOException {
调用filter的filterAllRemaining方法,如果此方法返回true表示此次scan结束
if (filter != null && filter.filterAllRemaining()) {
returnMatchCode.DONE_SCAN;
}
得到kv的值
byte [] bytes = kv.getBuffer();
KV在bytes中的开始位置
intoffset = kv.getOffset();
得到key的长度
keyvalue的组成:
4 |
4 |
2 |
~ |
1 |
~ |
~ |
8 |
1 |
~ |
kenlen |
vlen |
rowlen |
row |
cflen |
cf |
column |
timestamp |
kvtype |
value |
intkeyLength = Bytes.toInt(bytes, offset, Bytes.SIZEOF_INT);
得到rowkey的长度记录的开始位置(不包含keylen与vlen)
offset += KeyValue.ROW_OFFSET;
rowkey的长度记录的开始位置
intinitialOffset = offset;
得到rowkey的长度
shortrowLength = Bytes.toShort(bytes, offset, Bytes.SIZEOF_SHORT);
得到rowkey的开始位置
offset += Bytes.SIZEOF_SHORT;
比较当前传入的kv的rowkey部分是否与当前行中第一个kv的rowkey部分相同。也就是是否是同一行的数据
intret = this.rowComparator.compareRows(row, this.rowOffset, this.rowLength,
bytes, offset, rowLength);
如果当前传入的kv中的rowkey大于当前行的kv的rowkey部分,表示现在传入的kv是下一行,
结束当前next操作,(不是结束scan,是结束当次的next,表示这个next的一行数据的所有kv都查找完了)
if (ret <= -1) {
returnMatchCode.DONE;
否则表示当前传入的kv是上一行的数据,需要把当前的scanner向下移动一行
} elseif (ret >= 1) {
// could optimize this, if necessary?
// Could also be called SEEK_TO_CURRENT_ROW, but this
// should be rare/never happens.
returnMatchCode.SEEK_NEXT_ROW;
}
优化配置,是否需要不执行下面流程,直接把当前的scanner向下移动一行
stickyNextRow的值为true的条件:
1.ColumnTracker.done返回为true,
2.ColumnTracker.checkColumn返回为SEEK_NEXT_ROW.
3.filter.filterKeyValue(kv);返回结果为NEXT_ROW。
4.ColumnTracker.checkVersions返回为INCLUDE_AND_SEEK_NEXT_ROW。
ColumnTracker的实现在scan的columns为null或者是compact scan时为ScanWildcardColumnTracker。
否则为ExplicitColumnTracker。
// optimize case.
if (this.stickyNextRow)
returnMatchCode.SEEK_NEXT_ROW;
在ScanWildcardColumnTracker实例中返回值为false,
在ExplicitColumnTracker实例中返回值是当前的kv是否大于或等于查找的column列表的总和
if (this.columns.done()) {
stickyNextRow = true;
returnMatchCode.SEEK_NEXT_ROW;
}
得到familylen的记录位置
//Passing rowLength
offset += rowLength;
得到family的长度
//Skipping family
bytefamilyLength = bytes [offset];
把位置移动到family的名称记录的位置
offset += familyLength + 1;
得到column的长度
intqualLength = keyLength -
(offset - initialOffset) - KeyValue.TIMESTAMP_TYPE_SIZE;
得到kv中timestamp的值
longtimestamp = Bytes.toLong(bytes, initialOffset + keyLength – KeyValue.TIMESTAMP_TYPE_SIZE);
检查timestamp是否在指定的范围内,主要检查ttl是否过期
// check for early out based on timestamp alone
if (columns.isDone(timestamp)) {
如果发现kv的ttl过期,在ScanWildcardColumnTracker实例中直接返回SEEK_NEXT_COL。这个在compact中是默认
在ExplicitColumnTracker实例中检查是否有下一个column如果有返回SEEK_NEXT_COL。否则返回SEEK_NEXT_ROW。
returncolumns.getNextRowOrNextColumn(bytes, offset, qualLength);
}
/*
* The delete logic is pretty complicated now.
* This is corroborated by the following:
* 1. The store might be instructed to keep deleted rows around.
* 2. A scan can optionally see past a delete marker now.
* 3. If deleted rows are kept, we have to find out when we can
* remove the delete markers.
* 4. Family delete markers are always first (regardless of their TS)
* 5. Delete markers should not be counted as version
* 6. Delete markers affect puts of the *same* TS
* 7. Delete marker need to be version counted together with puts
* they affect
*/
得到kv的类型。
bytetype = bytes[initialOffset + keyLength – 1];
如果kv是删除的kv
if (kv.isDelete()) {
在默认情况下,此keepDeletedCells值为false,这里的if检查会进去
if (!keepDeletedCells) {
// first ignore delete markers if the scanner can do so, and the
// range does not include the marker
//
// during flushes and compactions also ignore delete markers newer
// than the readpoint of any open scanner, this prevents deleted
// rows that could still be seen by a scanner from being collected
此时的值为true,scan中的tr默认为alltime=true
booleanincludeDeleteMarker = seePastDeleteMarkers ?
tr.withinTimeRange(timestamp) :
tr.withinOrAfterTimeRange(timestamp);
把删除的kv添加到DeleteTracker中。compact时的实现为ScanDeleteTracker。
if (includeDeleteMarker
&& kv.getMvccVersion() <= maxReadPointToTrackVersions) {
this.deletes.add(bytes, offset, qualLength, timestamp, type);
}
// Can't early out now, because DelFam come before any other keys
}
如果非minor compact时,
或者在compact的scan时,同时当前时间减去kv的timestamp还不到hbase.hstore.time.to.purge.deletes配置的时间,
默认的配置值为0,
或者kv的mvcc值大于现在最大的mvcc值时。此if会进行。目前在做major compact的scan,不进去
if (retainDeletesInOutput
|| (!isUserScan && (EnvironmentEdgeManager.currentTimeMillis() - timestamp) <= timeToPurgeDeletes)
|| kv.getMvccVersion() > maxReadPointToTrackVersions) {
// always include or it is not time yet to check whether it is OK
// to purge deltes or not
if (!isUserScan) {
// if this is not a user scan (compaction), we can filter this deletemarker right here
// otherwise (i.e. a "raw" scan) we fall through to normal version and timerange checking
returnMatchCode.INCLUDE;
}
以下的检查通常情况不会进入
} elseif (keepDeletedCells) {
if (timestamp < earliestPutTs) {
// keeping delete rows, but there are no puts older than
// this delete in the store files.
returncolumns.getNextRowOrNextColumn(bytes, offset, qualLength);
}
// else: fall through and do version counting on the
// delete markers
如果kv是已经delete的kv,添加到DeleteTracker后,直接返回SKIP.
} else {
returnMatchCode.SKIP;
}
// note the following next else if...
// delete marker are not subject to other delete markers
} elseif (!this.deletes.isEmpty()) {
如果不是删除的KV时,检查删除的kv中是否包含此kv的版本。
a.如果KV是DeleteFamily。同时当前的KV的TIMESTAMP的值小于删除的KV的TIMESTAMP的值,返回FAMILY_DELETED。
b.如果KV是DeleteFamilyVersion已经删除掉的版本(删除时指定了timestamp)。返回FAMILY_VERSION_DELETED。
c.如果KV的是DeleteColumn,同时deleteTracker中包含的kv中部分qualifier的值
与传入的kv中部分qualifier的值相同。同时delete中包含的kv是DeleteColumn返回COLUMN_DELETED。
否则deleteTracker中包含的kv中部分qualifier的值与传入的kv中部分qualifier的值相同。
同时传入的kv中的timestamp的值是delete中的timestamp值,表示删除指定的版本,返回VERSION_DELETED。
d.否则表示没有删除的情况,返回NOT_DELETED。
DeleteResultdeleteResult = deletes.isDeleted(bytes, offset, qualLength,
timestamp);
switch (deleteResult) {
caseFAMILY_DELETED:
caseCOLUMN_DELETED:
returncolumns.getNextRowOrNextColumn(bytes, offset, qualLength);
caseVERSION_DELETED:
caseFAMILY_VERSION_DELETED:
returnMatchCode.SKIP;
caseNOT_DELETED:
break;
default:
thrownewRuntimeException("UNEXPECTED");
}
}
检查当前传入的kv的timestamp是否在包含的时间范围内,默认的scan是所有时间都包含
inttimestampComparison = tr.compare(timestamp);
如果当前kv的时间超过了最大的时间,返回SKIP。
if (timestampComparison >= 1) {
returnMatchCode.SKIP;
} elseif (timestampComparison <= -1) {
如果当前kv的时间小于了最小的时间,返回SEEK_NEXT_COL或者SEEK_NEXT_ROW。
returncolumns.getNextRowOrNextColumn(bytes, offset, qualLength);
}
如果时间在正常的范围内,columns.checkColumn如果是compact时的scan 此方法返回INCLUDE。
其它情况请参见ExplicitColumnTracker的实现。
// STEP 1: Check if the column is part of the requested columns
MatchCodecolChecker = columns.checkColumn(bytes, offset, qualLength, type);
此处的IF检查会进入
if (colChecker == MatchCode.INCLUDE) {
执行filter操作,并根据filter的响应返回相关的值,此处不在说明,比较容易看明白。
ReturnCodefilterResponse = ReturnCode.SKIP;
// STEP 2: Yes, the column is part of the requested columns. Check if filter is present
if (filter != null) {
// STEP 3: Filter the key value and return if it filters out
filterResponse = filter.filterKeyValue(kv);
switch (filterResponse) {
caseSKIP:
returnMatchCode.SKIP;
caseNEXT_COL:
returncolumns.getNextRowOrNextColumn(bytes, offset, qualLength);
caseNEXT_ROW:
stickyNextRow = true;
returnMatchCode.SEEK_NEXT_ROW;
caseSEEK_NEXT_USING_HINT:
returnMatchCode.SEEK_NEXT_USING_HINT;
default:
//It means it is either include or include and seek next
break;
}
}
/*
* STEP 4: Reaching this step means the column is part of the requested columns and either
* the filter is null or the filter has returned INCLUDE or INCLUDE_AND_NEXT_COL response.
* Now check the number of versions needed. This method call returns SKIP, INCLUDE,
* INCLUDE_AND_SEEK_NEXT_ROW, INCLUDE_AND_SEEK_NEXT_COL.
*
* FilterResponse ColumnChecker Desired behavior
* INCLUDE SKIP row has already been included, SKIP.
* INCLUDE INCLUDE INCLUDE
* INCLUDE INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL
* INCLUDE INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW
* INCLUDE_AND_SEEK_NEXT_COL SKIP row has already been included, SKIP.
* INCLUDE_AND_SEEK_NEXT_COL INCLUDE INCLUDE_AND_SEEK_NEXT_COL
* INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL
* INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW
*
* In all the above scenarios, we return the column checker return value except for
* FilterResponse (INCLUDE_AND_SEEK_NEXT_COL) and ColumnChecker(INCLUDE)
*/
colChecker =
columns.checkVersions(bytes, offset, qualLength, timestamp, type,
kv.getMvccVersion() > maxReadPointToTrackVersions);
//Optimize with stickyNextRow
stickyNextRow = colChecker == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW ? true : stickyNextRow;
return (filterResponse == ReturnCode.INCLUDE_AND_NEXT_COL &&
colChecker == MatchCode.INCLUDE) ? MatchCode.INCLUDE_AND_SEEK_NEXT_COL
: colChecker;
}
stickyNextRow = (colChecker == MatchCode.SEEK_NEXT_ROW) ? true
: stickyNextRow;
returncolChecker;
}
major与minor的compact写入新storefile时的区别
如果是major的compact的写入,会在close掉writer时,
在meta中写入major==true的值MAJOR_COMPACTION_KEY=true。
此值主要用来控制做minor的compact时是否选择这个storefile文件。
if (writer != null) {
writer.appendMetadata(fd.maxSeqId, request.isMajor());
writer.close();
newFiles.add(writer.getPath());
}
相关推荐
当 L0 层文件数量达到 kL0_CompactionTrigger(默认为 4)时,会触发 major compaction,将这些文件合并到 L1 层。此外,还有两个阈值用于控制写入速度: 1. **kL0_SlowdownWritesTrigger**(默认 8):当 L0 层...
(2)根据 get_need_compaction_tables.sh 中 的阈值 version_count_max=500 筛选出需要手工进行compaction的tablets (3)根据生成的结果执行 compaction; 【二 .脚本介绍】 (1)get_need_compaction_tables.sh ...
这些数据在Major Compaction期间才会被真正删除,这样的设计简化了更新和删除操作,但增加了读取时的复杂性。 Scan查询是HBase中一种重要的数据获取方式,它以行为基础,逐行扫描。首先,Scan会获取第一行的所有...
因此,调整至合理的数据块大小,比如3倍于系统默认值,可以有效减少数据碎片化,提升数据读取速度,尤其是在执行major compaction操作时,能够确保系统的稳定性,避免因频繁的flush hfile操作导致的扫描性能下降。...
HBase In-Memory Compaction是HBase存储系统中的一种高性能的存储机制,它基于Log-Structured-Merge(LSM)树设计,通过将数据存储在内存中,减少磁盘I/O操作,提高写入吞吐量和读取延迟性能。 Accordion算法是...
2. Compaction:是RocksDB维护数据结构和优化存储空间的关键过程,分为Minor Compaction和Major Compaction。 - Minor Compaction:主要作用是将内存中的memtable数据写入到SSTable文件,防止memtable过度增长影响...
- 当StoreFile数量超过该值时,新数据将先进行Split或Compaction,以避免MemStore因等待Flush而导致写入操作被阻塞。 ##### 6. `hbase.regionserver.global.memstore.upperLimit` 和 `hbase.regionserver.global....
当客户端向`RegionServer`发起写操作时,如`HTable.put(Put)`请求,其流程如下: 1. **写入WAL**:首先将数据写入`Write-Ahead Log` (`WAL`),这是一个标准的Hadoop SequenceFile。WAL用于记录尚未被持久化到磁盘的...
然后在hbase shell中更改TSDB的't'列族的配置 disable 'tsdb' alter 'tsdb', {NAME => 't', CONFIGURATION => {'hbase.hstore.defaultengine.compactor.class' => ' com.twilio.compaction.TSDCompactor'}} ...
将HBase作为研究对象,分析其存储架构,针对HBase存储机制进行深入研究
HBase的In-Memory Compaction技术是HBase在处理数据写入操作时采用的一种优化算法,它对数据进行内存中的整理和压缩,以此来改善系统性能。In-Memory Compaction的设计目标是在保证数据可靠性和持久性存储的同时,...
标题 "biology-soil-compaction_Biological_" 暗示了我们即将探讨的主题是关于土壤压实与生物学的关系。在这个领域,我们将深入理解土壤生物活动如何受到土壤压实的影响,以及这种影响如何反过来对生态系统产生深远...
6. **Put、Get和Scan操作**:HBase提供了对数据的插入(Put)、查询(Get)和扫描(Scan)操作。在`org.apache.hadoop.hbase.client`包下,可以看到这些操作的实现类,如`Put`、`Get`和`Scan`。 7. **WAL(Write ...
- HBase 有两种类型的 Compaction:Minor Compaction 和 Major Compaction。 - Minor Compaction:合并多个较小的 HFile 文件。 - Major Compaction:定期执行,将所有的 HFile 文件合并为更大的文件。 3. **...
此外,研究还展望了液体存在时散体材料的振动摩擦机制,为液体与土颗粒相互作用提供了新的研究视角。 综上所述,振动压实技术的深入研究不仅应关注振动对土体压实效果的影响,还应深入探讨土体内部的振动摩擦机理,...
**249.** 在支持虚拟存储器的操作系统中,当进程从就绪态被选中执行CPU时,系统选择进程的依据是: - **答案:** D(半饥饿状态) - **解析:** 半饥饿状态指的是进程虽然处于可运行状态但实际未能得到足够的资源或...
查询HBase数据主要通过Get和Scan操作。Get操作根据行键获取特定行的所有数据,而Scan则用于扫描表的一部分或全部,可以设置过滤器以精确控制返回结果。例如,`get 'myTable', 'rowKey'` 获取“rowKey”行的所有数据...
模拟系统应能展示这些现象,并可能提供减少碎片的方法,如紧凑(Compaction)操作。 3. **内存释放**:当进程结束或释放内存时,模拟系统需要处理如何将这部分内存重新纳入空闲分区列表,以及如何选择合适的合并...
特别地,HBase在处理更新和删除时,不是立即修改原有数据,而是通过时间戳创建新版本或标记为删除,真正的删除在Major Compaction时执行。这就导致了读取操作需要对多版本和已标记删除的数据进行过滤,增加了读取的...