The main flow of Get:
1. Assemble the scanners
2. Call the scanners' next() method to fetch records
3. Return the results
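For context, a minimal client-side call that drives this server path might look like the following sketch (0.94-era API; the table name "t1", family "cf" and qualifier "q" are placeholders):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class GetExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "t1");
    try {
      Get get = new Get(Bytes.toBytes("row1"));
      // Request an explicit column; this is what later routes the query
      // through the ExplicitColumnTracker discussed below
      get.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"));
      // Server side this becomes HRegion.get -> RegionScanner.next
      Result result = table.get(get);
      byte[] value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("q"));
      System.out.println(value == null ? "null" : Bytes.toString(value));
    } finally {
      table.close();
    }
  }
}
```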
The scanner entry point is RegionScanner, which scans a single region. Its implementation RegionScannerImpl holds a KeyValueHeap, and that KeyValueHeap wraps several StoreScanners. Each StoreScanner corresponds to one column family, and in turn owns one MemStoreScanner plus several StoreFileScanners: the MemStoreScanner scans the memstore, while each StoreFileScanner reads one storefile. The structure is sketched below.
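As a tree (a rough stand-in for the class diagram, using the 0.94 class names):

RegionScannerImpl (scans one region)
└─ KeyValueHeap storeHeap (merges the per-family scanners)
   └─ StoreScanner, one per column family
      └─ KeyValueHeap heap (merges the scanners inside one store)
         ├─ MemStoreScanner (scans the memstore)
         └─ StoreFileScanner, one per storefile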
The 0.94 implementation follows.
The Get entry point in HRegion:
```java
private List<KeyValue> get(Get get, boolean withCoprocessor) throws IOException {
  long now = EnvironmentEdgeManager.currentTimeMillis();
  List<KeyValue> results = new ArrayList<KeyValue>();
  .....
  // Convert to a Scan whose startRow and stopRow are the same row
  Scan scan = new Scan(get);
  RegionScanner scanner = null;
  try {
    // Build the scanner tree described above; the seeks happen here,
    // so the scanner is ready for next() when this returns
    scanner = getScanner(scan);
    // Fetch the data
    scanner.next(results);
  } finally {
    if (scanner != null)
      scanner.close();
  }
  ......
  return results;
}
```
RegionScannerImpl construction:
```java
RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
  this.maxResultSize = scan.getMaxResultSize();
  this.filter = scan.getFilter();
  this.batch = scan.getBatch();
  if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
    this.stopRow = null;
  } else {
    this.stopRow = scan.getStopRow();
  }
  // If we are doing a get, we want to be [startRow,endRow] normally
  // it is [startRow,endRow) and if startRow=endRow we get nothing.
  // For a get-style scan this is -1
  this.isScan = scan.isGetScan() ? -1 : 0;

  // synchronize on scannerReadPoints so that nobody calculates
  // getSmallestReadPoint, before scannerReadPoints is updated.
  // Dirty reads are supported; by default only committed data is readable
  IsolationLevel isolationLevel = scan.getIsolationLevel();
  synchronized(scannerReadPoints) {
    if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) {
      // This scan can read even uncommitted transactions
      this.readPt = Long.MAX_VALUE;
      MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
    } else {
      this.readPt = MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
    }
    scannerReadPoints.put(this, this.readPt);
  }
  .....
  // Build a StoreScanner for every store the scan touches
  for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
    Store store = stores.get(entry.getKey());
    StoreScanner scanner = store.getScanner(scan, entry.getValue());
    scanners.add(scanner);
  }
  // Heap over the per-store scanners
  this.storeHeap = new KeyValueHeap(scanners, comparator);
}
```
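A side note on the read-point logic above: the client opts into dirty reads per operation. A minimal sketch, assuming the 0.94 Scan.setIsolationLevel API:

```java
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Scan;

public class DirtyReadScan {
  public static Scan dirtyReadScan(byte[] row) {
    // READ_UNCOMMITTED makes RegionScannerImpl take readPt = Long.MAX_VALUE
    // instead of the MVCC read point, so uncommitted edits become visible
    Scan scan = new Scan(row);
    scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
    return scan;
  }
}
```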
StoreScanner construction; columns is the set of column qualifiers to scan:
```java
StoreScanner(Store store, Scan scan, final NavigableSet<byte[]> columns) throws IOException {
  this(store, scan.getCacheBlocks(), scan, columns, store.scanInfo.getTtl(),
      store.scanInfo.getMinVersions());
  initializeMetricNames();
  if (columns != null && scan.isRaw()) {
    throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
  }
  // The core query matcher: during next() it judges whether the current
  // KeyValue qualifies, and decides whether to skip this KV, skip this
  // column, or jump straight to the next row
  matcher = new ScanQueryMatcher(scan, store.scanInfo, columns,
      ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
      oldestUnexpiredTS);

  // Pass columns to try to filter out unnecessary StoreFiles.
  // Builds the MemStoreScanner and the StoreFileScanners
  List<KeyValueScanner> scanners = getScannersNoCompaction();

  Store.openScannerOps.incrementAndGet();
  Store.openedScannerNum.addAndGet(scanners.size());

  // Seek all scanners to the start of the Row (or if the exact matching row
  // key does not exist, then to the start of the next matching Row).
  // Always check bloom filter to optimize the top row seek for delete
  // family marker.
  // Perform the seek
  if (explicitColumnQuery && lazySeekEnabledGlobally) {
    for (KeyValueScanner scanner : scanners) {
      scanner.requestSeek(matcher.getStartKey(), false, true);
    }
  } else {
    for (KeyValueScanner scanner : scanners) {
      scanner.seek(matcher.getStartKey());
    }
  }

  // Combine all seeked scanners with a heap
  // All scanners are combined into one KeyValueHeap, ordered by the first
  // seeked KeyValue; the effect is scanning in column family order
  heap = new KeyValueHeap(scanners, store.comparator);

  this.store.addChangedReaderObserver(this);
}
```
The Store collecting all of its scanners:
```java
protected List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet,
    boolean isCompaction, ScanQueryMatcher matcher) throws IOException {
  List<StoreFile> storeFiles;
  List<KeyValueScanner> memStoreScanners;
  this.lock.readLock().lock();
  try {
    storeFiles = this.getStorefiles();
    // MemStoreScanner
    memStoreScanners = this.memstore.getScanners();
  } finally {
    this.lock.readLock().unlock();
  }

  // First the store file scanners

  // TODO this used to get the store files in descending order,
  // but now we get them in ascending order, which I think is
  // actually more correct, since memstore get put at the end.
  // The StoreFileScanner list; HDFS file streams are opened here
  List<StoreFileScanner> sfScanners = StoreFileScanner
    .getScannersForStoreFiles(storeFiles, cacheBlocks, isGet, isCompaction, matcher);
  List<KeyValueScanner> scanners =
    new ArrayList<KeyValueScanner>(sfScanners.size() + 1);
  scanners.addAll(sfScanners);
  // Then the memstore scanners
  scanners.addAll(memStoreScanners);
  return scanners;
}
```
The KeyValueHeap structure:
```java
public KeyValueHeap(List<? extends KeyValueScanner> scanners,
    KVComparator comparator) throws IOException {
  // Scanner comparator: orders scanners by their peeked first KV;
  // the scanner holding the smaller KV is scanned first
  this.comparator = new KVScannerComparator(comparator);
  if (!scanners.isEmpty()) {
    // Scanner queue; one store may contribute several scanners
    this.heap = new PriorityQueue<KeyValueScanner>(scanners.size(),
        this.comparator);
    for (KeyValueScanner scanner : scanners) {
      // The scanners were seeked earlier, so peek() returns a KV directly;
      // if the seek found something, add the scanner to the queue
      if (scanner.peek() != null) {
        this.heap.add(scanner);
      } else {
        scanner.close();
      }
    }
    // Take the first scanner; scanners are ordered by their peeked KV,
    // smallest first. When peeked KVs tie, the MemStore (largest sequenceId)
    // wins, followed by the StoreFiles from newest to oldest, i.e. in
    // descending sequenceId order
    this.current = pollRealKV();
  }
}
```
A look at KVScannerComparator: it compares the peeked KVs first and falls back to sequenceId on a tie:
```java
public int compare(KeyValueScanner left, KeyValueScanner right) {
  // Compare the peeked KeyValues directly
  int comparison = compare(left.peek(), right.peek());
  if (comparison != 0) {
    return comparison;
  } else {
    // The KeyValues compare as equal (rare); break the tie by sequenceId.
    // Note that the MemStoreScanner carries the largest id
    // Since both the keys are exactly the same, we break the tie in favor
    // of the key which came latest.
    long leftSequenceID = left.getSequenceID();
    long rightSequenceID = right.getSequenceID();
    if (leftSequenceID > rightSequenceID) {
      return -1;
    } else if (leftSequenceID < rightSequenceID) {
      return 1;
    } else {
      return 0;
    }
  }
}
```
That completes scanner construction. RegionScannerImpl now calls next() to fetch data. Note the source comment "Grab the next row's worth of values": it fetches a single row, because a Get only ever touches one row.
```java
private boolean nextInternal(int limit) throws IOException {
  RpcCallContext rpcCall = HBaseServer.getCurrentCall();
  while (true) {
    // Has the client already dropped the connection?
    if (rpcCall != null) {
      // If a user specifies a too-restrictive or too-slow scanner, the
      // client might time out and disconnect while the server side
      // is still processing the request. We should abort aggressively
      // in that case.
      rpcCall.throwExceptionIfCallerDisconnected();
    }
    // The row the heap is currently seeked to
    byte [] currentRow = peekRow();
    // Stop-row check: currentRow is null or >= stopRow; this is what
    // implements the exclusive ')' end of the row interval
    if (isStopRow(currentRow)) {
      if (filter != null && filter.hasFilterRow()) {
        filter.filterRow(results);
      }
      if (filter != null && filter.filterRow()) {
        results.clear();
      }
      return false;
    }
    // Row-level filtering
    else if (filterRowKey(currentRow)) {
      nextRow(currentRow);
    } else {
      byte [] nextRow;
      // Inner loop: pull KVs off the heap until the limit is reached or we
      // cross into another row -- only one row's data is fetched here
      do {
        // Fetch a batch of KeyValues from the heap
        this.storeHeap.next(results, limit - results.size());
        // Limit reached (no limit by default: limit is -1)
        if (limit > 0 && results.size() == limit) {
          if (this.filter != null && filter.hasFilterRow()) {
            throw new IncompatibleFilterException(
              "Filter with filterRow(List<KeyValue>) incompatible with scan with limit!");
          }
          return true; // we are expecting more yes, but also limited to how many we can return.
        }
      } while (Bytes.equals(currentRow, nextRow = peekRow()));

      final boolean stopRow = isStopRow(nextRow);

      // now that we have an entire row, lets process with a filters:

      // first filter with the filterRow(List)
      if (filter != null && filter.hasFilterRow()) {
        filter.filterRow(results);
      }
      ......
      return !stopRow;
    }
  }
}
```
Fetching from RegionScannerImpl's KeyValueHeap. Every scanner in this heap is a StoreScanner, ordered by the first KeyValue after the seek, i.e. in ascending column family order:
```java
public boolean next(List<KeyValue> result, int limit) throws IOException {
  if (this.current == null) {
    return false;
  }
  InternalScanner currentAsInternal = (InternalScanner)this.current;
  // Fetch from the first StoreScanner
  boolean mayContainMoreRows = currentAsInternal.next(result, limit);
  // Its peeked value after the fetch
  KeyValue pee = this.current.peek();
  /*
   * By definition, any InternalScanner must return false only when it has no
   * further rows to be fetched. So, we can close a scanner if it returns
   * false. All existing implementations seem to be fine with this. It is much
   * more efficient to close scanners which are not needed than keep them in
   * the heap. This is also required for certain optimizations.
   */
  // This scanner is exhausted; close it
  if (pee == null || !mayContainMoreRows) {
    this.current.close();
  }
  // Not exhausted yet; put it back into the heap
  else {
    this.heap.add(this.current);
  }
  // Move on to the next scanner
  this.current = pollRealKV();
  return (this.current != null);
}
```
Fetching from the StoreScanner:
```java
public synchronized boolean next(List<KeyValue> outResult, int limit) throws IOException {
  ......
  // only call setRow if the row changes; avoids confusing the query matcher
  // if scanning intra-row
  // The current row
  if ((matcher.row == null) || !peeked.matchingRow(matcher.row)) {
    matcher.setRow(peeked.getRow());
  }
  KeyValue kv;
  KeyValue prevKV = null;
  List<KeyValue> results = new ArrayList<KeyValue>();

  // Only do a sanity-check if store and comparator are available.
  KeyValue.KVComparator comparator =
      store != null ? store.getComparator() : null;

  // Pull from the heap until the limit is reached, the scan ends, or the
  // matcher decides further scanning is pointless, e.g. a column already
  // has all the versions it needs
  LOOP: while((kv = this.heap.peek()) != null) {
    // Check that the heap gives us KVs in an increasing order.
    if (prevKV != null && comparator != null
        && comparator.compare(prevKV, kv) > 0) {
      throw new IOException("Key " + prevKV + " followed by a " +
        "smaller key " + kv + " in cf " + store);
    }
    prevKV = kv;
    // The matcher decides whether to keep scanning or stop
    ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
    switch(qcode) {
      // The current KeyValue qualifies; keep going
      case INCLUDE:
      case INCLUDE_AND_SEEK_NEXT_ROW:
      case INCLUDE_AND_SEEK_NEXT_COL:
        // Add it to the results
        Filter f = matcher.getFilter();
        results.add(f == null ? kv : f.transform(kv));

        // A row switch is needed; check whether another row is wanted at
        // all. For a get this returns right away, since one row is enough
        if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
          if (!matcher.moreRowsMayExistAfter(kv)) {
            outResult.addAll(results);
            return false;
          }
          reseek(matcher.getKeyForNextRow(kv));
        }
        // The previous column is satisfied; move to the next column
        else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
          reseek(matcher.getKeyForNextColumn(kv));
        }
        // Stay on this column; fetch the next version
        else {
          this.heap.next();
        }

        RegionMetricsStorage.incrNumericMetric(metricNameGetSize, kv.getLength());
        // Limit reached; return immediately
        if (limit > 0 && (results.size() == limit)) {
          break LOOP;
        }
        continue;

      case DONE:
        // copy jazz
        outResult.addAll(results);
        return true;

      case DONE_SCAN:
        close();
        // copy jazz
        outResult.addAll(results);
        return false;
      ......
    }
  }
  if (!results.isEmpty()) {
    // copy jazz
    outResult.addAll(results);
    return true;
  }
  // No more keys
  close();
  return false;
}
```
The match process:
```java
public MatchCode match(KeyValue kv) throws IOException {
  .....
  // Compare against the start row
  int ret = this.rowComparator.compareRows(row, 0, row.length,
      bytes, offset, rowLength);
  // The current row is past the start row: scanning of the start row is done
  if (ret <= -1) {
    return MatchCode.DONE;
  }
  // The current row is before the start row: seek forward until the row
  // of interest
  else if (ret >= 1) {
    // could optimize this, if necessary?
    // Could also be called SEEK_TO_CURRENT_ROW, but this
    // should be rare/never happens.
    return MatchCode.SEEK_NEXT_ROW;
  }

  // The row matches

  // optimize case.
  if (this.stickyNextRow)
    return MatchCode.SEEK_NEXT_ROW;

  // All columns are done; move to the next row
  if (this.columns.done()) {
    stickyNextRow = true;
    return MatchCode.SEEK_NEXT_ROW;
  }

  //Passing rowLength
  offset += rowLength;

  //Skipping family
  byte familyLength = bytes [offset];
  offset += familyLength + 1;

  int qualLength = keyLength + KeyValue.ROW_OFFSET -
    (offset - initialOffset) - KeyValue.TIMESTAMP_TYPE_SIZE;

  long timestamp = kv.getTimestamp();
  // check for early out based on timestamp alone
  // If this KeyValue's timestamp is already useless, the whole column can
  // be skipped, since later versions only carry smaller timestamps; let the
  // column checker decide whether to move to the next column or next row
  if (columns.isDone(timestamp)) {
    return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
  }
  .......
  // Match against the time range
  int timestampComparison = tr.compare(timestamp);
  // Too new: skip this KeyValue
  if (timestampComparison >= 1) {
    return MatchCode.SKIP;
  }
  // Too old: this column needs no further work; let the column checker
  // decide whether to move to the next column or the next row
  else if (timestampComparison <= -1) {
    return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
  }
  ....
  // Check whether this column has all its data; internally a ColumnCount
  // tracks how many versions have matched
  MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength,
      timestamp, type, kv.getMemstoreTS() > maxReadPointToTrackVersions);
  /*
   * According to current implementation, colChecker can only be
   * SEEK_NEXT_COL, SEEK_NEXT_ROW, SKIP or INCLUDE. Therefore, always return
   * the MatchCode. If it is SEEK_NEXT_ROW, also set stickyNextRow.
   */
  if (colChecker == MatchCode.SEEK_NEXT_ROW) {
    stickyNextRow = true;
  }
  return colChecker;
}
```
Taking the ExplicitColumnTracker used by a Get with explicit columns as an example, here is checkColumn. The tracker keeps a column list plus an index pointer to the column currently being processed; columns are handled in order, each finished column is removed from the list, and once the list is exhausted the row is considered fully processed:
```java
public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset,
    int length, long timestamp, byte type, boolean ignoreCount) {
  // delete markers should never be passed to an
  // *Explicit*ColumnTracker
  assert !KeyValue.isDelete(type);
  do {
    // No more columns left, we are done with this query
    // All columns processed: move to the next row
    if(this.columns.size() == 0) {
      return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row
    }

    // No more columns to match against, done with storefile
    if(this.column == null) {
      return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row
    }

    // Compare specific column to current column
    // Match the tracked column against the KeyValue's column name
    int ret = Bytes.compareTo(column.getBuffer(), column.getOffset(),
        column.getLength(), bytes, offset, length);

    // Column Matches. If it is not a duplicate key, increment the version count
    // and include.
    if(ret == 0) {
      if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE;

      //If column matches, check if it is a duplicate timestamp
      // Same timestamp as before: skip
      if (sameAsPreviousTS(timestamp)) {
        //If duplicate, skip this Key
        return ScanQueryMatcher.MatchCode.SKIP;
      }
      // Bump the version count
      int count = this.column.increment();
      // Enough versions, or the timestamp is too old: this column is done
      if(count >= maxVersions ||
          (count >= minVersions && isExpired(timestamp))) {
        // Done with versions for this column
        // Note: because we are done with this column, and are removing
        // it from columns, we don't do a ++this.index. The index stays
        // the same but the columns have shifted within the array such
        // that index now points to the next column we are interested in.
        // Remove it first
        this.columns.remove(this.index);

        resetTS();

        // After the removal, if size equals index, every requested column
        // has been served
        if (this.columns.size() == this.index) {
          // We have served all the requested columns.
          this.column = null;
          return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW;
        }
        // Prepare to process the next column
        else {
          // We are done with current column; advance to next column
          // of interest.
          this.column = this.columns.get(this.index);
          return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL;
        }
      } else {
        setTS(timestamp);
      }
      // Not enough versions yet; keep scanning
      return ScanQueryMatcher.MatchCode.INCLUDE;
    }

    // The KeyValue's column does not match the tracked column
    resetTS();

    // The KeyValue's column sorts before the wanted column: seek ahead to it
    if (ret > 0) {
      // The current KV is smaller than the column the ExplicitColumnTracker
      // is interested in, so seek to that column of interest.
      return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL;
    }

    // The current KV is bigger than the column the ExplicitColumnTracker
    // is interested in. That means there is no more data for the column
    // of interest. Advance the ExplicitColumnTracker state to next
    // column of interest, and check again.
    // The KeyValue's column sorts after the wanted column: that column has
    // no data at all, so advance to the next column of interest and loop
    if (ret <= -1) {
      if (++this.index >= this.columns.size()) {
        // No more to match, do not include, done with this row.
        return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row
      }
      // This is the recursive case.
      this.column = this.columns.get(this.index);
    }
  } while(true);
}
```
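To make the version counting concrete, here is a toy, stripped-down re-enactment of that loop (a hypothetical class; it ignores timestamps, deletes and minVersions):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ToyColumnTracker {
  enum Code { INCLUDE, INCLUDE_AND_SEEK_NEXT_COL, INCLUDE_AND_SEEK_NEXT_ROW,
              SEEK_NEXT_COL, SEEK_NEXT_ROW }

  private final List<String> columns; // requested columns, in sorted order
  private final int maxVersions;
  private int index = 0;              // points at the column of interest
  private int count = 0;              // versions already served for it

  ToyColumnTracker(List<String> requested, int maxVersions) {
    this.columns = new ArrayList<String>(requested);
    this.maxVersions = maxVersions;
  }

  Code checkColumn(String kvColumn) {
    if (columns.isEmpty() || index >= columns.size()) {
      return Code.SEEK_NEXT_ROW;                 // nothing left to match
    }
    int ret = columns.get(index).compareTo(kvColumn);
    if (ret == 0) {                              // column of interest matched
      if (++count < maxVersions) {
        return Code.INCLUDE;                     // still collecting versions
      }
      columns.remove(index);                     // column satisfied; drop it,
      count = 0;                                 // index now points at the next one
      return columns.size() == index ? Code.INCLUDE_AND_SEEK_NEXT_ROW
                                     : Code.INCLUDE_AND_SEEK_NEXT_COL;
    }
    if (ret > 0) {
      return Code.SEEK_NEXT_COL;                 // KV sorts before wanted column
    }
    index++;                                     // wanted column has no data;
    count = 0;                                   // re-check against the next one
    return checkColumn(kvColumn);
  }

  public static void main(String[] args) {
    ToyColumnTracker t = new ToyColumnTracker(Arrays.asList("a", "c"), 2);
    // KVs arrive in sorted column order within the row
    System.out.println(t.checkColumn("a")); // INCLUDE (1st version of a)
    System.out.println(t.checkColumn("a")); // INCLUDE_AND_SEEK_NEXT_COL (a done)
    System.out.println(t.checkColumn("b")); // SEEK_NEXT_COL (b not requested)
    System.out.println(t.checkColumn("c")); // INCLUDE
    System.out.println(t.checkColumn("c")); // INCLUDE_AND_SEEK_NEXT_ROW (all done)
  }
}
```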
KeyValueHeap iteration guarantees KeyValues come out in sorted order; it may switch back and forth between scanners:
```java
public KeyValue next() throws IOException {
  if(this.current == null) {
    return null;
  }
  // The value to return
  KeyValue kvReturn = this.current.next();
  // The current scanner's next KeyValue
  KeyValue kvNext = this.current.peek();
  // This scanner is exhausted; switch to another one
  if (kvNext == null) {
    this.current.close();
    this.current = pollRealKV();
  }
  // Otherwise compare its next KV with the other scanners' peeked values;
  // if it is larger, switch scanners so that KVs keep coming out in
  // ascending order
  else {
    KeyValueScanner topScanner = this.heap.peek();
    if (topScanner == null ||
        this.comparator.compare(kvNext, topScanner.peek()) >= 0) {
      this.heap.add(this.current);
      this.current = pollRealKV();
    }
  }
  return kvReturn;
}
```
Finally, MemStoreScanner's next(), which alternates between the kvset and the snapshot:
```java
public synchronized KeyValue next() {
  if (theNext == null) {
    return null;
  }
  // The old value
  final KeyValue ret = theNext;

  // Advance one of the iterators
  // Iterate within the kvset
  if (theNext == kvsetNextRow) {
    kvsetNextRow = getNext(kvsetIt);
  }
  // Iterate within the snapshot
  else {
    snapshotNextRow = getNext(snapshotIt);
  }

  // Calculate the next value
  // Take the smaller of the two
  theNext = getLowest(kvsetNextRow, snapshotNextRow);

  //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint();
  //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " +
  //    getLowest() + " threadpoint=" + readpoint);
  return ret;
}
```
That is the whole Get path. Its main steps:
1. Scanner assembly.
2. During iteration, the scanners must emit KeyValues in globally sorted order; the core is the PriorityQueue plus KVScannerComparator (see the sketch after this list).
3. ScanQueryMatcher decides whether the current KeyValue qualifies and what the next move is: skip the KV, skip to the next column, or skip to the next row.
4. The column tracker (the ColumnChecker) decides whether the current column is finished, and likewise whether to jump to the next column or the next row.
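Point 2 is ordinary k-way merging. A self-contained sketch of the mechanism behind KeyValueHeap, with toy stand-ins rather than the real HBase classes:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class HeapMergeDemo {
  // Stand-in for KeyValueScanner: peek() shows the next key, next() consumes it
  static class ToyScanner {
    private final Iterator<String> it;
    private String head;
    final long sequenceId; // the memstore would carry the largest id

    ToyScanner(List<String> sortedKeys, long sequenceId) {
      this.it = sortedKeys.iterator();
      this.head = it.hasNext() ? it.next() : null;
      this.sequenceId = sequenceId;
    }
    String peek() { return head; }
    String next() {
      String ret = head;
      head = it.hasNext() ? it.next() : null;
      return ret;
    }
  }

  public static void main(String[] args) {
    // Order scanners by their peeked key; on a tie the larger sequenceId
    // wins, just like KVScannerComparator
    PriorityQueue<ToyScanner> heap = new PriorityQueue<ToyScanner>(3,
      new Comparator<ToyScanner>() {
        public int compare(ToyScanner l, ToyScanner r) {
          int c = l.peek().compareTo(r.peek());
          if (c != 0) return c;
          return Long.compare(r.sequenceId, l.sequenceId); // newer first
        }
      });
    heap.add(new ToyScanner(Arrays.asList("a", "d"), 1L)); // old storefile
    heap.add(new ToyScanner(Arrays.asList("b", "d"), 2L)); // newer storefile
    heap.add(new ToyScanner(Arrays.asList("c", "e"), 9L)); // "memstore"

    List<String> merged = new ArrayList<String>();
    while (!heap.isEmpty()) {
      ToyScanner top = heap.poll();          // scanner with the smallest head
      merged.add(top.next());
      if (top.peek() != null) heap.add(top); // re-insert if not exhausted
    }
    System.out.println(merged); // [a, b, c, d, d, e]
  }
}
```

Re-inserting a scanner after each next() is what lets the merge switch back and forth between scanners, exactly as described for KeyValueHeap.next() above.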