
2. hbase CRUD--Read(Scan) operations (server side)

 

 

  Having just recovered from an illness, I should now finish the remaining part of the work...

  Yes, as you can see, the Scan/Get operation in HBase is somewhat trickier, since kvs are multi-dimensional (row, column, version) and come from both storefiles and the memstore.

 

----Part 1: Abstract

  First, have a glance at the query flow below:




 
There are two main loops in a Scan operation:

1. Iterate rowkeys - controls how the kvs of each row are filtered and limits the number of rows returned to the client.

2. Iterate the qualifiers of the same row - determines which qualifiers are matched and how many versions of the same qualifier are kept.
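The two loops can be sketched in plain Java as below. This is not HBase code: `Cell`, `TwoLoopScan`, `maxRows` and `maxVersions` are hypothetical names, and the input is assumed to be already sorted by (row asc, qualifier asc, timestamp desc), the order in which the scanners deliver kvs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified kv: row key, qualifier, timestamp.
record Cell(String row, String qualifier, long ts) {}

final class TwoLoopScan {
    /**
     * Outer loop walks rows and stops after maxRows rows;
     * inner loop walks the cells of one row and keeps at most
     * maxVersions versions per qualifier.
     */
    static List<List<Cell>> scan(List<Cell> sorted, int maxRows, int maxVersions) {
        List<List<Cell>> rows = new ArrayList<>();
        int i = 0;
        // FIRST LOOP: rows
        while (i < sorted.size() && rows.size() < maxRows) {
            String row = sorted.get(i).row();
            List<Cell> rowCells = new ArrayList<>();
            String prevQualifier = null;
            int versions = 0;
            // SECOND LOOP: qualifiers (and their versions) of the same row
            while (i < sorted.size() && sorted.get(i).row().equals(row)) {
                Cell c = sorted.get(i++);
                if (!c.qualifier().equals(prevQualifier)) {
                    prevQualifier = c.qualifier();   // new column: restart version count
                    versions = 0;
                }
                if (++versions <= maxVersions) {
                    rowCells.add(c);                 // older versions beyond the limit are skipped
                }
            }
            if (!rowCells.isEmpty()) {
                rows.add(rowCells);
            }
        }
        return rows;
    }
}
```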

 

 Second, here is the heap-tree search model of HBase:



There are three levels of scanners in HBase:

1. region scanner - combines all kvs from the store scanners

2. store scanner - integrates all underlying storefile scanners and one memstore scanner

3. storefile/memstore scanner - the leaf scanners in HBase; these are the data sources

 

  Every child lifts its minimum kv up to its parent. Each parent (region scanner and store scanner) contains a heap called KeyValueHeap, which implements a "heap merge": it chooses the scanner holding the smallest kv as the current scanner. The heap uses a built-in PriorityQueue to find that minimum scanner.
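A minimal sketch of the heap merge, assuming integer keys stand in for kvs and a list iterator stands in for a storefile/memstore scanner. `PeekingScanner` and `MiniKeyValueHeap` are hypothetical names; the switching rule mirrors the one in KeyValueHeap#next() shown in Part 3, but this is not the HBase class.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical leaf scanner: a sorted iterator that can peek at its current key.
final class PeekingScanner {
    private final Iterator<Integer> it;
    private Integer head;

    PeekingScanner(List<Integer> sortedKeys) {
        it = sortedKeys.iterator();
        head = it.hasNext() ? it.next() : null;
    }

    Integer peek() { return head; }

    Integer next() {
        Integer r = head;
        head = it.hasNext() ? it.next() : null;
        return r;
    }
}

final class MiniKeyValueHeap {
    // Scanners ordered by their current (peeked) key; a scanner is never
    // advanced while it sits inside the queue, so the ordering stays valid.
    private final PriorityQueue<PeekingScanner> heap =
            new PriorityQueue<>((a, b) -> a.peek().compareTo(b.peek()));
    private PeekingScanner current;

    MiniKeyValueHeap(List<PeekingScanner> scanners) {
        for (PeekingScanner s : scanners) {
            if (s.peek() != null) heap.add(s);
        }
        current = heap.poll();               // scanner holding the smallest key
    }

    Integer next() {
        if (current == null) return null;    // all scanners exhausted
        Integer result = current.next();
        Integer nextKey = current.peek();
        if (nextKey == null) {
            current = heap.poll();           // exhausted: drop it, take the new minimum
        } else {
            PeekingScanner top = heap.peek();
            if (top != null && nextKey.compareTo(top.peek()) >= 0) {
                heap.add(current);           // another scanner now holds the smaller key
                current = heap.poll();
            }
        }
        return result;
    }
}
```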

 

----Part 2: some utility classes

class | usage
RegionScannerImpl | implementation of the region scanner
ScanQueryMatcher | decides whether a qualifier is filtered out, kept, or whether to seek to the next row/column; also checks the number of columns, the number of versions...
ColumnTracker | tracks the expected qualifiers and versions; used together with ScanQueryMatcher
HFileReaderV2 | reads a kv from an HFile or a data block
MemStore | the write cache of a store
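To make the ScanQueryMatcher/ColumnTracker roles more concrete, here is a simplified version-counting tracker. It only mimics the idea: `MiniColumnTracker` is hypothetical, the match codes are a subset of those used in StoreScanner#next() in Part 3, and the real matcher also handles timestamps, delete markers and filters.

```java
import java.util.Set;

// Subset of the match codes used by StoreScanner#next().
enum MatchCode { INCLUDE, INCLUDE_AND_SEEK_NEXT_COL, SEEK_NEXT_COL }

final class MiniColumnTracker {
    private final Set<String> wanted;   // requested qualifiers; empty = all (assumption of this sketch)
    private final int maxVersions;
    private String currentQualifier;
    private int seenVersions;

    MiniColumnTracker(Set<String> wanted, int maxVersions) {
        this.wanted = wanted;
        this.maxVersions = maxVersions;
    }

    /** Call when the row changes, similar in spirit to matcher.setRow(...). */
    void reset() {
        currentQualifier = null;
        seenVersions = 0;
    }

    /** kvs of one row must arrive sorted by qualifier, newest version first. */
    MatchCode match(String qualifier) {
        if (!wanted.isEmpty() && !wanted.contains(qualifier)) {
            return MatchCode.SEEK_NEXT_COL;                       // column not requested
        }
        if (!qualifier.equals(currentQualifier)) {                // first version of a new column
            currentQualifier = qualifier;
            seenVersions = 0;
        }
        seenVersions++;
        if (seenVersions < maxVersions) return MatchCode.INCLUDE;
        if (seenVersions == maxVersions) return MatchCode.INCLUDE_AND_SEEK_NEXT_COL; // last wanted version
        return MatchCode.SEEK_NEXT_COL;                           // older versions are not needed
    }
}
```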

 

 

----Part 3: Implementations

 The loop below corresponds to the first loop above:

public Result[] next(final long scannerId, int nbRows) throws IOException {
  ...
  //2 -- FIRST LOOP:rowKeys
      for (int i = 0; i < nbRows	//limit by client size
          && currentScanResultSize < maxScannerResultSize; i++) {	//limit by return byte size
        requestCount.incrementAndGet();
        // Collect values to be returned here
        boolean moreRows = s.next(values, SchemaMetrics.METRIC_NEXTSIZE); //-go into RegionScannerImpl
        if (!values.isEmpty()) {
          for (KeyValue kv : values) {
            currentScanResultSize += kv.heapSize();
          }
          results.add(new Result(values));
        }
        if (!moreRows) {
          break;
        }
        values.clear();
      }
  ....
}
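The two exit conditions of this loop can be isolated in a small sketch, assuming each row's heapSize() is simply given as a number. Because the byte limit is checked before a row is fetched, the last row of a batch may push the total past maxScannerResultSize. `FirstLoopSketch` and `nextBatch` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class FirstLoopSketch {
    /**
     * rowSizes: assumed heap size in bytes of each remaining row.
     * Returns the sizes of the rows one next(scannerId, nbRows) call would return.
     */
    static List<Long> nextBatch(List<Long> rowSizes, int nbRows, long maxScannerResultSize) {
        List<Long> taken = new ArrayList<>();
        long currentScanResultSize = 0;
        // limit by client row count AND by returned byte size (checked before each row)
        for (int i = 0; i < nbRows && currentScanResultSize < maxScannerResultSize; i++) {
            if (i >= rowSizes.size()) {
                break;                            // moreRows == false
            }
            long size = rowSizes.get(i);
            taken.add(size);
            currentScanResultSize += size;        // may now exceed the byte limit
        }
        return taken;
    }
}
```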

 

 

Second loop:

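/**Based on the scan offset recorded for the scannerId, fetch `limit` (actually the batch size) fields/cols of a row; this method wraps the scan operations on the underlying stores (memstore, hfiles)
     * @return true if more rows exist (only used by a real Scan to stop the remaining iterations)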
     */
    private boolean nextInternal(int limit, String metric) throws IOException {
      RpcCallContext rpcCall = HBaseServer.getCurrentCall();
      while (true) {	//outer loop:skip unmatched rows
        if (rpcCall != null) {
          // If a user specifies a too-restrictive or too-slow scanner, the
          // client might time out and disconnect while the server side
          // is still processing the request. We should abort aggressively
          // in that case.
          rpcCall.throwExceptionIfCallerDisconnected();
        }
        //--NOTE this compares the rowKey only, instead of the composite key (which the heap uses to switch scanners)!
        byte [] currentRow = peekRow(); //-so the second loop still needs to compare qualifiers
        if (isStopRow(currentRow)) { //--NOTE when a stop row is provided, this avoids iterating the remaining rows
          if (filter != null && filter.hasFilterRow()) {
            filter.filterRow(results);
          }
          if (filter != null && filter.filterRow()) { //filterRow:true to exclude row
            results.clear();
          }

          return false;
        } else if (filterRowKey(currentRow)) { //-NOTE skip unneeded rows, e.g. PrefixFilter when currentRow is less than the prefix
          nextRow(currentRow);
        } else {
          byte [] nextRow;
          do {//-1.how to locate the expected row,2 how to retrieve all kvs about this row? see StoreScanner#next(...) @A
            this.storeHeap.next(results, limit - results.size(), metric); 
            if (limit > 0 && results.size() == limit) {
              if (this.filter != null && filter.hasFilterRow()) {
                throw new IncompatibleFilterException(//client processed also,@see Scan#setBatch()
                  "Filter with filterRow(List<KeyValue>) incompatible with scan with limit!");
              }
              return true; // we are expecting more yes, but also limited to how many we can return.
            } //-this loop is for: if one store scanner is exhausted, go on to the next one
          } while (Bytes.equals(currentRow, nextRow = peekRow())); //-note:use row key only to compare;retrieve same rowkey's fields(cols)

          final boolean stopRow = isStopRow(nextRow); //-true for a Get; may be false for a real Scan

          // now that we have an entire row, lets process with a filters:

          // first filter with the filterRow(List)
          if (filter != null && filter.hasFilterRow()) {
            filter.filterRow(results); //--exclude or adjust the results before returning them to the client (to improve perf?)
          }

          if (results.isEmpty() || filterRow()) {
            // this seems like a redundant step - we already consumed the row
            // there're no left overs.
            // the reasons for calling this method are:
            // 1. reset the filters.
            // 2. provide a hook to fast forward the row (used by subclasses)
            nextRow(currentRow);

            // This row was totally filtered out, if this is NOT the last row,
            // we should continue on.-so continue with the next round, seeking the expected row

            if (!stopRow) continue; //-not a Get (i.e. a Scan): iterate the next row
          }
          return !stopRow; //-if this is a stoprow(no more rows),return false 
        }//else
      }//while
    }
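The row-level control flow of nextInternal() (peekRow, isStopRow, filterRowKey, then consuming every kv of the same row) can be sketched over a sorted list of (row, qualifier) pairs. Assumptions of this sketch: rows are plain ASCII strings compared with String.compareTo instead of Bytes.compareTo, the stop row is exclusive, and a prefix check plays the role of PrefixFilter's filterRowKey(). `NextInternalSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class NextInternalSketch {
    /** sortedCells: {row, qualifier} pairs sorted by row, then qualifier. */
    static Map<String, List<String>> scan(List<String[]> sortedCells, String prefix, String stopRow) {
        Map<String, List<String>> out = new LinkedHashMap<>();
        int i = 0;
        while (i < sortedCells.size()) {
            String currentRow = sortedCells.get(i)[0];               // like peekRow()
            if (stopRow != null && currentRow.compareTo(stopRow) >= 0) {
                break;                                               // like isStopRow(): no more rows
            }
            boolean filteredOut = !currentRow.startsWith(prefix);    // like filterRowKey()
            List<String> results = new ArrayList<>();
            // like the do/while on peekRow(): consume every kv of the current row
            while (i < sortedCells.size() && sortedCells.get(i)[0].equals(currentRow)) {
                if (!filteredOut) {
                    results.add(sortedCells.get(i)[1]);
                }
                i++;
            }
            if (!results.isEmpty()) {
                out.put(currentRow, results);                        // row survived, return it
            }
        }
        return out;
    }
}
```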

KeyValueHeap#next(List, int, String)

/**
   * Gets the next row of keys from the top-most scanner.--invoked in a loop for the same rowKey
   * <p>
   * This method takes care of updating the heap.
   * <p>
   * This can ONLY be called when you are using Scanners that implement
   * InternalScanner as well as KeyValueScanner (a {@link StoreScanner}).
   * @param result output result list
   * @param limit limit on row count to get
   * @param metric the metric name
   * @return true if there are more keys, false if all scanners are done
   */
  public boolean next(List<KeyValue> result, int limit, String metric) throws IOException {
    if (this.current == null) {
      return false;
    }
    InternalScanner currentAsInternal = (InternalScanner)this.current; //-StoreScanner if invoked from region level;
    boolean mayContainMoreRows = currentAsInternal.next(result, limit, metric);
    KeyValue pee = this.current.peek(); //-does this scanner have more data? if not, destroy it via close() below
    /*
     * By definition, any InternalScanner must return false only when it has no
     * further rows to be fetched. So, we can close a scanner if it returns
     * false. All existing implementations seem to be fine with this. It is much
     * more efficient to close scanners which are not needed than keep them in
     * the heap. This is also required for certain optimizations.--this description may refer to older versions; the code below already closes such scanners
     */
    if (pee == null || !mayContainMoreRows) { //-no more data
      this.current.close();
    } else {
      this.heap.add(this.current); //-put it back into the priority queue, so that the heap can switch to the StoreScanner holding the smallest kv
    }
    this.current = pollRealKV(); //---then acquire the scanner with the smallest kv again
    return (this.current != null);
  }

KeyValueHeap#next() --- switches to the min scanner on each invocation

/** --return the next kv and update the current scanner if needed
   * --if this is the store-level heap, this method *may* switch the current smallest kv among multiple memstores/storefiles.
   * example, the same row with updates to one column (q1):
   * -------------------------------------
   * order | ts | ms1   || order | ts | ms2
   * -------------------------------------
   * 1     | 5  | q1    || 1     | 4  | q1
   * 2     | 3  | q1    || 2     | 2  | q1   --after q1/5 is returned from ms1, the MemStoreScanners/StoreFileScanners are switched by ts: newer versions sort first and 3 < 4, so ms2 becomes the current scanner
   */
  public KeyValue next()  throws IOException {
    if(this.current == null) { //may be null if closed, or in some cases below via pollRealKV()
      return null;
    }
    KeyValue kvReturn = this.current.next(); //-@A retrieve the kv of the current scanner; its position is unchanged if it was put back into the heap
    KeyValue kvNext = this.current.peek();   //-prepare: also peek at this scanner's next kv for the next invocation of this method
    if (kvNext == null) {
      this.current.close();
      this.current = pollRealKV(); 
    } else {
      KeyValueScanner topScanner = this.heap.peek();
      if (topScanner == null || //-in fact is needless here
          this.comparator.compare(kvNext, topScanner.peek()) >= 0) { //-NOTE multi-memstores/storefiles case:switch the smallest(latest) scanners by kv
        this.heap.add(this.current); //put back to heap .NOTE the next time to invoke next() will seek to next kv correctly,see @A
        this.current = pollRealKV(); //-then acquire the kv-least(newest) scanner
      }
    }
    return kvReturn;
  }
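The switching in the comment example follows from the kv sort order: within one column, newer timestamps sort first. A sketch with a simplified comparator (row, then qualifier, then timestamp descending; the real KVComparator also compares family and type) shows the order in which a heap merge would emit kvs from two scanners. `Kv` and `KvOrder` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical simplified kv; source names the scanner (ms1/ms2) it came from.
record Kv(String row, String qualifier, long ts, String source) {}

final class KvOrder {
    // Simplified kv order: row asc, qualifier asc, timestamp desc (newest first).
    static final Comparator<Kv> ORDER = Comparator.comparing(Kv::row)
            .thenComparing(Kv::qualifier)
            .thenComparing(Comparator.comparingLong(Kv::ts).reversed());

    /** Returns "source:ts" in the order a heap merge of the two scanners would emit kvs. */
    static List<String> mergeOrder(List<Kv> ms1, List<Kv> ms2) {
        List<Kv> all = new ArrayList<>(ms1);
        all.addAll(ms2);
        all.sort(ORDER);
        List<String> out = new ArrayList<>();
        for (Kv kv : all) {
            out.add(kv.source() + ":" + kv.ts());
        }
        return out;
    }
}
```

With both scanners holding q1 the output alternates ms1, ms2, ms1, ms2, i.e. the heap switches after every kv; if ms2 held q2 instead, both q1 versions from ms1 would be returned first.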

  

 

StoreScanner#next()

/**--second loop for retrieve 
   * --most methods here are synchronized, but note that there is one instance of this class per Get/Scan, so I think this will not decrease the perf
   * Get the next row of values from this Store.-  how to guarantee all kvs are belong same row?see below @A
   * @param outResult
   * @param limit -- -1 (any value <= 0) means all kvs are returned
   * @return true if there are more rows, false if scanner is done
   */
  @Override
  public synchronized boolean next(List<KeyValue> outResult, int limit,
      String metric) throws IOException {
	  //--some preconditions every time invoke this method for the same/different row key--
    if (checkReseek()) {
      return true;
    }

    // if the heap was left null, then the scanners had previously run out anyways, close and
    // return.
    if (this.heap == null) {
      close();
      return false;
    }

    KeyValue peeked = this.heap.peek(); //-actually use this.current to do 
    if (peeked == null) {
      close();
      return false;
    }

    // only call setRow if the row changes; avoids confusing the query matcher--NOTE keeps all resulting kvs in this row
    // if scanning intra-row, that is, the same rowKey @A
    if ((matcher.row == null) || !peeked.matchingRow(matcher.row)) {
      matcher.setRow(peeked.getRow());
    }

    KeyValue kv;
    KeyValue prevKV = null;

    // Only do a sanity-check if store and comparator are available.
    KeyValue.KVComparator comparator =
        store != null ? store.getComparator() : null;

    long cumulativeMetric = 0;
    int count = 0;
    try {  //--NOTE SECOND LOOP: qualifiers
      LOOP: while((kv = this.heap.peek()) != null) { //-iterate throughout current row's kvs
        // Check that the heap gives us KVs in an increasing order.
        assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0 :
          "Key " + prevKV + " followed by a " + "smaller key " + kv + " in cf " + store;
        prevKV = kv;
        ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
        switch(qcode) {
          case INCLUDE: //-in all of the following cases the kv is returned to the client
          case INCLUDE_AND_SEEK_NEXT_ROW:
          case INCLUDE_AND_SEEK_NEXT_COL:

            Filter f = matcher.getFilter();
            outResult.add(f == null ? kv : f.transform(kv)); //-maybe do a simplified conversion of kv,eg. KeyOnlyFilter use key only
            count++;

            if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
              if (!matcher.moreRowsMayExistAfter(kv)) { //--a simple, efficient way to return directly without looping to check again
                return false;
              }
              reseek(matcher.getKeyForNextRow(kv)); //-construct a fake kv to quickly locate to the next row position,
            } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
              reseek(matcher.getKeyForNextColumn(kv)); //---doing it here is more efficient than in the outer loop
            } else { //-case INCLUDE
              this.heap.next(); //--the returned kv is not lost, because we already read it via peek() before calling next()
            }

            cumulativeMetric += kv.getLength();
            if (limit > 0 && (count == limit)) { //-reach the limit,return
              break LOOP; //-same as break
            }
            continue; //-for include 

          case DONE:
            return true;

          case DONE_SCAN: //-this scan is complete
            close();

            return false;

          case SEEK_NEXT_ROW: //-similar to the corresponding INCLUDE case
            // This is just a relatively simple end of scan fix, to short-cut end
            // us if there is an endKey in the scan.
            if (!matcher.moreRowsMayExistAfter(kv)) { //--a quick, efficient way to terminate this get/scan operation
              return false;
            }

            reseek(matcher.getKeyForNextRow(kv));
            break;

          case SEEK_NEXT_COL: //-similar to the corresponding INCLUDE case
            reseek(matcher.getKeyForNextColumn(kv));
            break;

          case SKIP: //-e.g. rejected by the version count check; ignore this kv and advance to the next one
            this.heap.next(); 
            break;

          case SEEK_NEXT_USING_HINT:
            KeyValue nextKV = matcher.getNextKeyHint(kv);
            if (nextKV != null) {
              reseek(nextKV);
            } else {
              heap.next();
            }
            break;

          default:
            throw new RuntimeException("UNEXPECTED");
        }//switch
      }//while
    } finally {
      if (cumulativeMetric > 0 && metric != null) {
        RegionMetricsStorage.incrNumericMetric(this.metricNamePrefix + metric,
            cumulativeMetric);
      }
    }

    if (count > 0) {
      return true;
    }

    // No more keys
    close();
    return false;
  }
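The reseek(...) calls work by building a fake key that sorts just past the current column or row and then seeking to the first real kv at or after it. Below is a sketch that uses a TreeSet in place of the scanners, under a simplified ordering (row asc, qualifier asc, timestamp desc). `Key`, `ReseekSketch` and the `"\0"` trick for the next row are assumptions of this sketch, not how HBase itself constructs its fake keys.

```java
import java.util.Comparator;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical simplified key: row, qualifier, timestamp.
record Key(String row, String qualifier, long ts) {}

final class ReseekSketch {
    // row asc, qualifier asc, timestamp desc (newest version first)
    static final Comparator<Key> ORDER = Comparator.comparing(Key::row)
            .thenComparing(Key::qualifier)
            .thenComparing(Comparator.comparingLong(Key::ts).reversed());

    static NavigableSet<Key> storeOf(Key... keys) {
        NavigableSet<Key> store = new TreeSet<>(ORDER);
        for (Key k : keys) {
            store.add(k);
        }
        return store;
    }

    /** Fake key sorting after every version of kv's column: smallest possible ts. */
    static Key keyForNextColumn(Key kv) {
        return new Key(kv.row(), kv.qualifier(), Long.MIN_VALUE);
    }

    /** Fake key sorting after the whole row: row + "\0" is the smallest string greater than row. */
    static Key keyForNextRow(Key kv) {
        return new Key(kv.row() + "\0", "", Long.MAX_VALUE);
    }

    /** Seek to the first real key at or after the fake key (null = no more data). */
    static Key reseek(NavigableSet<Key> store, Key fakeKey) {
        return store.ceiling(fakeKey);
    }
}
```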

 

 

 

----Part 4: FAQs

 

 

 

 


 
