I have just recovered from an illness, so I should finish the remaining part of this work now...
Yes, as you can see, the Scan/Get operation in HBase is rather tricky, because kvs are multi-dimensional and involve both the storefiles and the memstore.
----Part 1:Abstract
First, have a glance at the query flow below:
There are two main loops in a Scan operation:
1. Iterate row keys: controls how the kvs of each row are filtered and limits the number of rows returned to the client.
2. Iterate the qualifiers of the same row: determines which qualifiers match and how many versions of the same qualifier are returned.
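The two loops can be pictured with a toy in-memory table. This is only a sketch under stated assumptions: the class `TwoLoopScan`, its `scan` and `sample` methods, and the nested `TreeMap` standing in for a store are all hypothetical and are not HBase code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical toy model: row -> qualifier -> versions (newest first).
class TwoLoopScan {
    static List<String> scan(NavigableMap<String, NavigableMap<String, List<String>>> table,
                             Set<String> wantedQualifiers, int maxVersions, int maxRows) {
        List<String> out = new ArrayList<>();
        int rows = 0;
        // Loop 1: iterate row keys, bounded by the row limit.
        for (Map.Entry<String, NavigableMap<String, List<String>>> row : table.entrySet()) {
            if (rows == maxRows) {
                break;
            }
            boolean matched = false;
            // Loop 2: iterate the qualifiers of this row.
            for (Map.Entry<String, List<String>> col : row.getValue().entrySet()) {
                if (!wantedQualifiers.contains(col.getKey())) {
                    continue; // qualifier not requested
                }
                List<String> versions = col.getValue();
                int n = Math.min(maxVersions, versions.size());
                for (int v = 0; v < n; v++) { // keep only the newest n versions
                    out.add(row.getKey() + "/" + col.getKey() + "=" + versions.get(v));
                    matched = true;
                }
            }
            if (matched) {
                rows++; // only rows that produced results count against the limit
            }
        }
        return out;
    }

    // Small sample table used by main.
    static NavigableMap<String, NavigableMap<String, List<String>>> sample() {
        NavigableMap<String, NavigableMap<String, List<String>>> t = new TreeMap<>();
        NavigableMap<String, List<String>> r1 = new TreeMap<>();
        r1.put("q1", List.of("v3", "v2", "v1"));
        r1.put("q2", List.of("x"));
        NavigableMap<String, List<String>> r2 = new TreeMap<>();
        r2.put("q1", List.of("y"));
        NavigableMap<String, List<String>> r3 = new TreeMap<>();
        r3.put("q1", List.of("z"));
        t.put("r1", r1);
        t.put("r2", r2);
        t.put("r3", r3);
        return t;
    }

    public static void main(String[] args) {
        // Two versions of q1, at most two rows.
        System.out.println(scan(sample(), Set.of("q1"), 2, 2));
    }
}
```

With the sample table, asking for two versions of `q1` and at most two rows yields the two newest versions from `r1` plus the single version from `r2`; `r3` is never visited.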
Second, here is the Heap Tree Search Model of HBase.
There are three levels of scanners in HBase:
1. Region scanner: combines all kvs from the store scanners.
2. Store scanner: integrates all underlying storefile scanners and one memstore scanner.
3. Storefile/memstore scanner: the leaf scanners in HBase; these are the data sources.
Every child lifts its minimum kv reference up to its parent. The parent holds a heap called KeyValueHeap, which implements a "heap merge": it chooses the scanner that has the smallest kv as the current scanner. The heap also has a built-in "PriorityQueue" to obtain that smallest scanner.
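The "heap merge" idea can be sketched with plain integers in place of kvs. This is a minimal illustration, assuming hypothetical names (`HeapMergeSketch`, `LeafScanner`, `merge`); it uses `java.util.PriorityQueue` the way the text describes, but it is not the KeyValueHeap source.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

class HeapMergeSketch {
    // Hypothetical leaf scanner over an already sorted source, with peek/next.
    static final class LeafScanner {
        private final Iterator<Integer> it;
        private Integer head;

        LeafScanner(List<Integer> sorted) {
            this.it = sorted.iterator();
            advance();
        }

        Integer peek() {
            return head;
        }

        Integer next() {
            Integer result = head;
            advance();
            return result;
        }

        private void advance() {
            head = it.hasNext() ? it.next() : null;
        }
    }

    // Merge several sorted sources by always reading from the scanner with the smallest head.
    static List<Integer> merge(List<List<Integer>> sources) {
        PriorityQueue<LeafScanner> heap =
            new PriorityQueue<>((a, b) -> Integer.compare(a.peek(), b.peek()));
        for (List<Integer> source : sources) {
            LeafScanner scanner = new LeafScanner(source);
            if (scanner.peek() != null) {
                heap.add(scanner); // exhausted scanners never enter the heap
            }
        }
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            LeafScanner current = heap.poll(); // scanner with the smallest head
            out.add(current.next());
            if (current.peek() != null) {
                heap.add(current); // put back so it is re-ordered by its new head
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(merge(List.of(List.of(1, 4, 7), List.of(2, 5), List.of(3, 6, 8))));
    }
}
```

Putting the scanner back into the queue after each read is what re-sorts it by its new head; a scanner with nothing left is simply dropped.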
----Part 2:some utilities classes
class | usage
RegionScannerImpl | implementation of the region scanner
ScanQueryMatcher | decides whether a qualifier is filtered out, kept, or whether to seek to the next row/col; also checks the column count, the number of versions...
ColumnTracker | tracks the expected qualifiers and versions; used together with ScanQueryMatcher
HFileReaderV2 | how to read a kv from an hfile or a data block
MemStore | the write cache of a store
----Part 3:Implementations
The loop below corresponds to the first loop above:
    public Result[] next(final long scannerId, int nbRows) throws IOException {
      ...
      // 2 -- FIRST LOOP: row keys
      for (int i = 0; i < nbRows                                   // limited by the row count the client asked for
          && currentScanResultSize < maxScannerResultSize; i++) {  // limited by the returned byte size
        requestCount.incrementAndGet();
        // Collect values to be returned here
        boolean moreRows = s.next(values, SchemaMetrics.METRIC_NEXTSIZE); // goes into RegionScannerImpl
        if (!values.isEmpty()) {
          for (KeyValue kv : values) {
            currentScanResultSize += kv.heapSize();
          }
          results.add(new Result(values));
        }
        if (!moreRows) {
          break;
        }
        values.clear();
      }
      ....
    }
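The two limits in that loop header can be tried in isolation. A minimal sketch, assuming a hypothetical `RowBatchLimit.nextBatch` helper in which each row is a string and its length stands in for `kv.heapSize()`:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class RowBatchLimit {
    // Collect rows until either the row-count limit or the size limit is reached.
    static List<String> nextBatch(Iterator<String> rows, int nbRows, long maxResultSize) {
        List<String> batch = new ArrayList<>();
        long size = 0;
        // Same shape as the loop header above: both limits are tested before each fetch.
        for (int i = 0; i < nbRows && size < maxResultSize; i++) {
            if (!rows.hasNext()) {
                break; // no more rows in the source
            }
            String row = rows.next();
            size += row.length(); // stand-in for summing kv.heapSize()
            batch.add(row);
        }
        return batch;
    }

    public static void main(String[] args) {
        System.out.println(nextBatch(List.of("aaaa", "bbbb", "cccc").iterator(), 10, 5));
    }
}
```

Because the size check happens before a row is fetched, the last row added can push the total past the size limit; the limit stops the next fetch rather than trimming the current one.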
second loop:
    /**
     * Based on the scan offset recorded for this scanner id, fetch up to limit (actually the batch size)
     * fields/cols of a row. This method wraps the scan operations on the underlying sources (memstore, hfiles).
     * @return true if more rows exist (only used by a real Scan to stop the remaining iterations)
     */
    private boolean nextInternal(int limit, String metric) throws IOException {
      RpcCallContext rpcCall = HBaseServer.getCurrentCall();
      while (true) { // outer loop: skip unmatched rows
        if (rpcCall != null) {
          // If a user specifies a too-restrictive or too-slow scanner, the
          // client might time out and disconnect while the server side
          // is still processing the request. We should abort aggressively
          // in that case.
          rpcCall.throwExceptionIfCallerDisconnected();
        }
        // NOTE: this compares the row key only, not the composite key (which the heap uses to switch scanners),
        // so the second loop still has to compare qualifiers
        byte [] currentRow = peekRow();
        if (isStopRow(currentRow)) { // NOTE: when a stop row is provided, this avoids iterating the remaining rows
          if (filter != null && filter.hasFilterRow()) {
            filter.filterRow(results);
          }
          if (filter != null && filter.filterRow()) { // filterRow: true to exclude the row
            results.clear();
          }
          return false;
        } else if (filterRowKey(currentRow)) { // NOTE: skip a needless row, e.g. PrefixFilter when currentRow is less than the prefix
          nextRow(currentRow);
        } else {
          byte [] nextRow;
          do { // 1. how is the expected row located? 2. how are all kvs of this row retrieved? see StoreScanner#next(...) @A
            this.storeHeap.next(results, limit - results.size(), metric);
            if (limit > 0 && results.size() == limit) {
              if (this.filter != null && filter.hasFilterRow()) {
                throw new IncompatibleFilterException( // the client checks this too, see Scan#setBatch()
                  "Filter with filterRow(List<KeyValue>) incompatible with scan with limit!");
              }
              return true; // we are expecting more yes, but also limited to how many we can return.
            }
            // this loop exists because once one store scanner has been iterated completely, we go on to the next one
          } while (Bytes.equals(currentRow, nextRow = peekRow())); // note: compares the row key only; retrieves the fields (cols) of the same row key

          final boolean stopRow = isStopRow(nextRow); // true for a Get; may be false for a real Scan

          // now that we have an entire row, lets process with a filters:
          // first filter with the filterRow(List)
          if (filter != null && filter.hasFilterRow()) {
            filter.filterRow(results); // exclude or adjust the results before returning to the client, to improve perf?
          }

          if (results.isEmpty() || filterRow()) {
            // this seems like a redundant step - we already consumed the row
            // there're no left overs.
            // the reasons for calling this method are:
            // 1. reset the filters.
            // 2. provide a hook to fast forward the row (used by subclasses)
            nextRow(currentRow);

            // This row was totally filtered out, if this is NOT the last row,
            // we should continue on. (So go to the next round and seek the expected row.)
            if (!stopRow) continue; // not a Get (i.e. a Scan): iterate to the next row
          }
          return !stopRow; // if this is the stop row (no more rows), return false
        } // else
      } // while
    }
KeyValueHeap#next(xxx)
    /**
     * Gets the next row of keys from the top-most scanner. (Invoked in a loop for the same row key.)
     * <p>
     * This method takes care of updating the heap.
     * <p>
     * This can ONLY be called when you are using Scanners that implement
     * InternalScanner as well as KeyValueScanner (a {@link StoreScanner}).
     * @param result output result list
     * @param limit limit on row count to get
     * @param metric the metric name
     * @return true if there are more keys, false if all scanners are done
     */
    public boolean next(List<KeyValue> result, int limit, String metric) throws IOException {
      if (this.current == null) {
        return false;
      }
      InternalScanner currentAsInternal = (InternalScanner)this.current; // a StoreScanner when invoked from the region level
      boolean mayContainMoreRows = currentAsInternal.next(result, limit, metric);
      KeyValue pee = this.current.peek(); // does this scanner have more data? if not, it is closed below
      /*
       * By definition, any InternalScanner must return false only when it has no
       * further rows to be fetched. So, we can close a scanner if it returns
       * false. All existing implementations seem to be fine with this. It is much
       * more efficient to close scanners which are not needed than keep them in
       * the heap. This is also required for certain optimizations.
       * (This description may refer to older versions, since the code below already closes the scanner.)
       */
      if (pee == null || !mayContainMoreRows) { // no more data
        this.current.close();
      } else {
        this.heap.add(this.current); // put it back into the priority queue, so that the StoreScanner with the smallest kv can be chosen
      }
      this.current = pollRealKV(); // then acquire the scanner with the smallest kv again
      return (this.current != null);
    }
KeyValueHeap#next() --- switches to the min scanner on each invocation
    /**
     * Returns the next kv and updates the current scanner if needed.
     * When this heap belongs to a StoreScanner, this method *may* switch the current scanner
     * to the one holding the smallest kv among multiple memstore/storefile scanners.
     * Example, same row with updates to the same column (newer timestamps sort first):
     * -------------------------------------------
     *  order | ts | ms1  ||  order | ts | ms2
     * -------------------------------------------
     *    1   | 5  | q1   ||    1   | 4  | q1
     *    2   | 3  | q1   ||    2   | 2  | q1
     * After ts=5 is returned from ms1, the next kv of ms1 (ts=3) sorts after the head of ms2 (ts=4),
     * so the scanners are switched and ms2 becomes the current scanner.
     */
    public KeyValue next() throws IOException {
      if (this.current == null) { // may be null if closed, or after pollRealKV() below
        return null;
      }
      KeyValue kvReturn = this.current.next(); // @A retrieve the kv of the current scanner; its position is unchanged if the scanner was put back into the heap
      KeyValue kvNext = this.current.peek();   // prepare: also probe the next kv for the next invocation of this method
      if (kvNext == null) {
        this.current.close();
        this.current = pollRealKV();
      } else {
        KeyValueScanner topScanner = this.heap.peek();
        if (topScanner == null || // in fact this check is needless here
            this.comparator.compare(kvNext, topScanner.peek()) >= 0) { // NOTE multiple memstores/storefiles: switch to the scanner with the smallest (newest) kv
          this.heap.add(this.current); // put back into the heap. NOTE the next invocation of next() will still continue from the correct kv, see @A
          this.current = pollRealKV(); // then acquire the scanner with the smallest (newest) kv
        }
      }
      return kvReturn;
    }
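The switching between sources follows from the kv sort order: row ascending, then qualifier ascending, then timestamp descending, so newer versions come first. Below is a minimal sketch of that ordering and of a two-way merge. It assumes hypothetical names (`KvOrderSketch`, `Kv`, `merge`, `demo`), leaves out the column family and key type that the real comparator also considers, and has both sources hold versions of the same qualifier.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;

class KvOrderSketch {
    // Hypothetical, simplified cell: no column family, no key type.
    static final class Kv {
        final String row;
        final String qualifier;
        final long ts;
        final String source;

        Kv(String row, String qualifier, long ts, String source) {
            this.row = row;
            this.qualifier = qualifier;
            this.ts = ts;
            this.source = source;
        }
    }

    // Row ascending, qualifier ascending, newest timestamp first.
    static final Comparator<Kv> ORDER = (a, b) -> {
        int c = a.row.compareTo(b.row);
        if (c != 0) {
            return c;
        }
        c = a.qualifier.compareTo(b.qualifier);
        if (c != 0) {
            return c;
        }
        return Long.compare(b.ts, a.ts); // reversed: larger ts sorts first
    };

    // Merge two sorted sources by always reading from the one with the smaller head.
    static List<String> merge(Deque<Kv> ms1, Deque<Kv> ms2) {
        List<String> out = new ArrayList<>();
        while (!ms1.isEmpty() || !ms2.isEmpty()) {
            Deque<Kv> current;
            if (ms2.isEmpty()) {
                current = ms1;
            } else if (ms1.isEmpty()) {
                current = ms2;
            } else {
                current = ORDER.compare(ms1.peek(), ms2.peek()) <= 0 ? ms1 : ms2;
            }
            Kv kv = current.poll();
            out.add(kv.source + ":" + kv.qualifier + "@" + kv.ts);
        }
        return out;
    }

    // Two sources holding interleaved versions of the same column.
    static List<String> demo() {
        Deque<Kv> ms1 = new ArrayDeque<>(List.of(
            new Kv("r1", "q1", 5, "ms1"), new Kv("r1", "q1", 3, "ms1")));
        Deque<Kv> ms2 = new ArrayDeque<>(List.of(
            new Kv("r1", "q1", 4, "ms2"), new Kv("r1", "q1", 2, "ms2")));
        return merge(ms1, ms2);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [ms1:q1@5, ms2:q1@4, ms1:q1@3, ms2:q1@2]
    }
}
```

The output alternates between the two sources, which is the "switch" that the put-back-and-poll step in the heap produces.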
StoreScanner#next()
    /**
     * The second loop, used for retrieval.
     * Most of the methods here are synchronized, but note that this class is one instance per Get,
     * so I think this will not decrease the perf.
     * Get the next row of values from this Store. (How is it guaranteed that all kvs belong to the same row? See @A below.)
     * @param outResult
     * @param limit -1 means all kvs are returned
     * @return true if there are more rows, false if scanner is done
     */
    @Override
    public synchronized boolean next(List<KeyValue> outResult, int limit, String metric) throws IOException {
      // -- some preconditions, checked every time this method is invoked for the same/a different row key --
      if (checkReseek()) {
        return true;
      }

      // if the heap was left null, then the scanners had previously run out anyways, close and
      // return.
      if (this.heap == null) {
        close();
        return false;
      }

      KeyValue peeked = this.heap.peek(); // actually served by this.current
      if (peeked == null) {
        close();
        return false;
      }

      // only call setRow if the row changes; avoids confusing the query matcher
      // if scanning intra-row (that is, the same row key) @A
      // NOTE: this keeps all resulting kvs belonging to this row
      if ((matcher.row == null) || !peeked.matchingRow(matcher.row)) {
        matcher.setRow(peeked.getRow());
      }

      KeyValue kv;
      KeyValue prevKV = null;

      // Only do a sanity-check if store and comparator are available.
      KeyValue.KVComparator comparator =
          store != null ? store.getComparator() : null;

      long cumulativeMetric = 0;
      int count = 0;
      try {
        // NOTE SECOND LOOP: qualifiers
        LOOP: while((kv = this.heap.peek()) != null) { // iterate through the kvs of the current row
          // Check that the heap gives us KVs in an increasing order.
          assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0 :
            "Key " + prevKV + " followed by a " + "smaller key " + kv + " in cf " + store;
          prevKV = kv;
          ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
          switch(qcode) {
            case INCLUDE: // the following three cases all return the kv to the client
            case INCLUDE_AND_SEEK_NEXT_ROW:
            case INCLUDE_AND_SEEK_NEXT_COL:

              Filter f = matcher.getFilter();
              outResult.add(f == null ? kv : f.transform(kv)); // may do a simplified conversion of the kv, e.g. KeyOnlyFilter keeps the key only
              count++;

              if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
                if (!matcher.moreRowsMayExistAfter(kv)) { // a simple, effective way to return directly without looping to check again
                  return false;
                }
                reseek(matcher.getKeyForNextRow(kv)); // construct a fake kv to quickly locate the position of the next row
              } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
                reseek(matcher.getKeyForNextColumn(kv)); // doing it here is more efficient than in the outer loop
              } else { // case INCLUDE
                this.heap.next(); // the returned kv is not lost: it was already read via peek() before this next()
              }

              cumulativeMetric += kv.getLength();
              if (limit > 0 && (count == limit)) { // reached the limit, return
                break LOOP; // exits the while loop (a plain break would only leave the switch)
              }
              continue; // for the include cases

            case DONE:
              return true;

            case DONE_SCAN: // this scan is complete
              close();
              return false;

            case SEEK_NEXT_ROW: // similar to the corresponding include case
              // This is just a relatively simple end of scan fix, to short-cut end
              // us if there is an endKey in the scan.
              if (!matcher.moreRowsMayExistAfter(kv)) { // a quick, effective way to terminate this get/scan operation
                return false;
              }

              reseek(matcher.getKeyForNextRow(kv));
              break;

            case SEEK_NEXT_COL: // similar to the corresponding include case
              reseek(matcher.getKeyForNextColumn(kv));
              break;

            case SKIP: // the return value depends on count (number of kvs collected); ignore this kv and move to the next one
              this.heap.next();
              break;

            case SEEK_NEXT_USING_HINT:
              KeyValue nextKV = matcher.getNextKeyHint(kv);
              if (nextKV != null) {
                reseek(nextKV);
              } else {
                heap.next();
              }
              break;

            default:
              throw new RuntimeException("UNEXPECTED");
          } // switch
        } // while
      } finally {
        if (cumulativeMetric > 0 && metric != null) {
          RegionMetricsStorage.incrNumericMetric(this.metricNamePrefix + metric,
              cumulativeMetric);
        }
      }

      if (count > 0) {
        return true;
      }

      // No more keys
      close();
      return false;
    }
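How a matcher's return codes drive the qualifier loop can be shown with a much smaller model. This is a sketch under assumptions, not ScanQueryMatcher itself: the `MatchCodeSketch` class, its three-value `Code` enum, the `Matcher` class and the `{qualifier, value}` string pairs are all hypothetical, and only version counting and qualifier selection are modelled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;

class MatchCodeSketch {
    enum Code { INCLUDE, SKIP, SEEK_NEXT_COL }

    // Hypothetical matcher: keeps wanted qualifiers, at most maxVersions versions of each.
    static final class Matcher {
        private final Set<String> wanted;
        private final int maxVersions;
        private String currentQualifier;
        private int seen;

        Matcher(Set<String> wanted, int maxVersions) {
            this.wanted = wanted;
            this.maxVersions = maxVersions;
        }

        Code match(String qualifier) {
            if (!wanted.contains(qualifier)) {
                return Code.SKIP;
            }
            if (!qualifier.equals(currentQualifier)) {
                currentQualifier = qualifier; // new column: restart the version count
                seen = 0;
            }
            if (seen >= maxVersions) {
                return Code.SEEK_NEXT_COL; // enough versions, jump to the next column
            }
            seen++;
            return Code.INCLUDE;
        }
    }

    // cells: {qualifier, value} pairs of one row, sorted by qualifier, newest version first.
    static List<String> scanRow(List<String[]> cells, Set<String> wanted, int maxVersions) {
        Matcher matcher = new Matcher(wanted, maxVersions);
        List<String> out = new ArrayList<>();
        int i = 0;
        while (i < cells.size()) {
            String[] cell = cells.get(i);
            switch (matcher.match(cell[0])) {
                case INCLUDE:
                    out.add(cell[0] + "=" + cell[1]);
                    i++;
                    break;
                case SKIP:
                    i++;
                    break;
                case SEEK_NEXT_COL:
                    // stand-in for reseek(): jump over the remaining versions of this qualifier
                    String q = cell[0];
                    while (i < cells.size() && cells.get(i)[0].equals(q)) {
                        i++;
                    }
                    break;
            }
        }
        return out;
    }

    static List<String[]> sampleRow() {
        return Arrays.asList(
            new String[] {"a", "3"}, new String[] {"a", "2"}, new String[] {"a", "1"},
            new String[] {"b", "9"},
            new String[] {"c", "7"}, new String[] {"c", "6"});
    }

    public static void main(String[] args) {
        System.out.println(scanRow(sampleRow(), Set.of("a", "c"), 2)); // prints [a=3, a=2, c=7, c=6]
    }
}
```

The seek code lets the loop jump over versions it would otherwise have to skip one by one, which is the same reason the real loop calls `reseek(...)` instead of repeating `heap.next()`.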
----Part 4: FAQs
ref: