`
Mysun
  • 浏览: 273135 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

利用Filter进行HBase查询

阅读更多
在HBase中,我们可以利用其Scan接口对数据进行扫描,具体方式如下,
Scan scanConfig = new Scan();
ResultScanner scanner = table.getScanner(scanConfig);
for (Result result : scanner) {//ResultScanner实现了Iterator接口
	//do something here
}

在扫描过程中,我们想要哪些数据返回哪些数据不返回是用过Scan这个类提供的各种方法来控制的,下面列出了一些主要的方法,
/**
* 返回指定列族下的指定qualifier中的值
*/
public Scan addColumn(byte[] family, byte[] qualifier)
/**
* 设置扫描的起始行,starRow是存储时候用的rowKey
*/
public Scan setStartRow(byte[] startRow)
/**
* 设置扫描的结束行,stopRow是存储时候用的rowKey
*/
public Scan setStopRow(byte[] stopRow)
/**
* 设置返回结果的时间戳
*/
public Scan setTimeStamp(long timestamp)
/**
* 设置返回结果的时间戳返回
*/
public Scan setTimeRange(long minStamp, long maxStamp)
/**
* 设置过滤器,这是非常灵活的扫描机制
*/
public Scan setFilter(Filter filter)

其中最为有用的是setFilter方法,这个方法允许我们为扫描过程设置一个过滤器,从而过滤掉那些不符合要求的记录。Filter机制非常灵活,基本可以满足我们所有的查询需求。HBase的类库中已经预先定义了很多的Filter,通过使用这些预定义的Filter,我们可以非常灵活地组装自己的查询需求。但是依赖于Hbase对Filter的实现机制,还是会存在一些限制,这个下面马上就会说到。
HBase是如何实现Filter的
HBase提供了一个Java实现的客户端,用来与服务器进行通讯。这个客户端的通讯层使用了RPC来进行封装,封装所有RPC操作的是org.apache.hadoop.hbase.ipc.HBaseRPC,而这个类会将真正的通讯委托给org.apache.hadoop.hbase.ipc.HBaseClient类来执行。如果跟踪开头给出的代码中的第二行代码的执行,会发现其执行流程是这样的,

上图中的ScannerCallable.openScanner()方法如下,
protected long openScanner() throws IOException {
    return this.server.openScanner(this.location.getRegionInfo().getRegionName(),
      this.scan);
  }

其中server属性的类型是HRegionInterface,从调用栈可以看到这个接口被动态代理掉了,最终的调用会委托给HBaseRPC执行,也就是我们看到的调用栈最顶端的那行代码。检查HRegionInterface接口提供的方法,会发现其主要是用来与HBase的RegionServer进行交互的(HBase的RegionServer就是真正存放数据的服务器)。在HBaseRPC接受openScanner这个方法调用之后,它会委托HBaseClient去与各个RegionServer进行通信,告诉它们有个客户端 正在发起openScanner方法调用,同时会把openScanner方法的参数序列化之后传给各个RegionServer。这个方法调用成功之后(HBase客户端收到了所有RegionServer的正确相应),openScanner方法就返回了。
如果本文开头代码所示,第三行之后的代码就开始从扫描器中拿结果了。这个时候HBase客户端会逐个与RegionServer进行通信,告诉它们开始扫描吧。由于在openScanner的时候,已经把参数传递给了各个RegionServer,各个RegionServer就可以根据参数来执行扫描了,然后将扫描结果返回给HBase客户端,然后客户端程序就可以拿到结果进行处理了。
HBase中的Filter有何限制

从ScannerCallable.openScanner的参数可以看到,HBase客户端其实是把客户端程序中创建的Scan对象当作参数传递给通讯层的,也就是说这个Scan参数会被序列化给各个RegionServer,当然也就包括设置在其中的Filter。
继续跟踪代码会发现,对参数执行序列化操作的代码是放在HBaseClient的一个叫做Connection的内部类中的sendParam方法中的,其代码如下,
protected void sendParam(Call call) {
      if (shouldCloseConnection.get()) {
        return;
      }

      DataOutputBuffer d=null;
      try {
        //noinspection SynchronizeOnNonFinalField
        synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
          if (LOG.isDebugEnabled())
            LOG.debug(getName() + " sending #" + call.id);

          //for serializing the
          //data to be written
          d = new DataOutputBuffer();
          d.writeInt(0xdeadbeef); // placeholder for data length
          d.writeInt(call.id);
          call.param.write(d);
          byte[] data = d.getData();
          int dataLength = d.getLength();
          // fill in the placeholder
          Bytes.putInt(data, 0, dataLength - 4);
          out.write(data, 0, dataLength);
          out.flush();
        }
      } catch(IOException e) {
        markClosed(e);
      } finally {
        //the buffer is just an in-memory buffer, but it is still polite to
        // close early
        IOUtils.closeStream(d);
      }
    }

我们会发现这个方法会调用Call对象中的param属性的write方法,目的是让param属性自己将自己转换成字节,然后放入DataOutputBuffer里面。在ScannerCallable.openScanner()方法中,这个param就是Scan对象,通过查看Scan对象的源代码,我们发现这个对象实现了HBase的Writable接口,因此确实有一个write方法,这个方法的代码如下,
public void write(final DataOutput out)
  throws IOException {
    out.writeByte(SCAN_VERSION);
    Bytes.writeByteArray(out, this.startRow);
    Bytes.writeByteArray(out, this.stopRow);
    out.writeInt(this.maxVersions);
    out.writeInt(this.batch);
    out.writeInt(this.caching);
    out.writeBoolean(this.cacheBlocks);
    if(this.filter == null) {
      out.writeBoolean(false);
    } else {
      out.writeBoolean(true);
      Bytes.writeByteArray(out, Bytes.toBytes(filter.getClass().getName()));
      filter.write(out);
    }
    tr.write(out);
    out.writeInt(familyMap.size());
    for(Map.Entry<byte [], NavigableSet<byte []>> entry : familyMap.entrySet()) {
      Bytes.writeByteArray(out, entry.getKey());
      NavigableSet<byte []> columnSet = entry.getValue();
      if(columnSet != null){
        out.writeInt(columnSet.size());
        for(byte [] qualifier : columnSet) {
          Bytes.writeByteArray(out, qualifier);
        }
      } else {
        out.writeInt(0);
      }
    }
  }


这个方法会把Scan对象中的所有在RegionServer执行扫描过程中需要用到的参数全部写入DataOutput中,当然也包括Filter。从这个代码中,我们很容易地知道HBase中的Filter也是实现了Writable接口的。而且在写入Filter对象本身的byte之前,还会往输出流中写入这个Filter的类名,这一步是必须的,否则RegionServer将无法知道它们需要使用哪一个Filter。正式由于这一点,给用户自定义Filter带来了限制。由于真正的扫描数据的过程是在RegionServer上发生的,HBase就采用这种序列化的方式将扫描数据需要用到的类和属性告知RegionServer。对于用户自定义的Filter,由于在RegionServer上是找不到相关类的,因此在执行的过程中会抛出异常。也许有人会说,把自定义的Filter放到各个RegionServer上去不就可以了吗?这样做确实可以,但是维护成本会比较大,而且自定义Filter有时候也不是那么好些的,因为会设置到序列化自身。因此我们还是应该使用HBase已经提供的Filter,如果能够合理有效地组合预定义的各个Filter,是足够用来实现我们的查询需求的。
  • 大小: 211.4 KB
分享到:
评论
3 楼 wubo2qml 2013-11-22  
问下如何进行列中数字大小的比较。我看了几个过滤器,最接近的是
SingleColumnValueFilter  但是这个里面的过滤器是按照字典顺序进行比较的!
2 楼 blackproof 2013-08-12  
rowkey是A_B
我知道要的A集合,也知道要的B范围
不用自定义,怎么Filter呢
1 楼 bin_1715575332 2013-05-06  
文章不错,尤其了后半部分讲解一些原理。

相关推荐

    hbase java api 访问 查询、分页

    在实际应用中,可能会涉及到更复杂的查询场景,如使用Filter进行数据筛选,或者利用HBase的Region Split特性优化存储和查询性能。对于过滤器,可以创建各种类型的Filter对象,如`PrefixFilter`、`RowFilter`等,并将...

    HBase基本操作.pdf

    HBase通过Filter来支持复杂的查询操作。例如,使用`scan '表名', {FILTER =&gt; '过滤器'} `可以对表数据进行过滤。过滤器可以是多个,用以组合不同条件。 #### 10. 时间戳和数据版本操作 HBase存储数据时会自动为每条...

    HBase使用的jar包

    为了在Hadoop项目中使用HBase进行明细查询,我们需要依赖特定的jar包。这些jar包包含了HBase运行所需的所有类库和函数,使得开发者可以在应用程序中调用HBase的相关API来操作数据。 首先,我们要理解HBase的架构。...

    hbase-2.4.17-bin 安装包

    4. Bloom Filter:使用Bloom Filter可以减少不必要的磁盘I/O,提高查询性能。 5. Indexing:虽然HBase本身不支持索引,但可以通过第三方库如 phoenix 或 hbase-indexer 实现索引功能。 六、监控与维护 1. 使用...

    HBase官方文档中文版-HBase手册中文版

    3. Hive集成:通过Hive的HBase存储过程进行数据查询。 4. Flume、Kafka集成:用于日志收集和实时流处理。 这份“HBase官方文档中文版”详细阐述了HBase的核心概念、架构、操作以及最佳实践,对于HBase的学习者和...

    hbase一些查询

    在探讨HBase查询技巧及其应用时,我们聚焦于利用各种Filter进行精确且高效的查询操作。HBase,作为一款分布式、版本化的列存储数据库,专为海量数据设计,其查询性能和灵活性很大程度上依赖于合理使用Filter。以下是...

    hbase2.5.6最新版本下载

    3. 索引与查询:虽然HBase不是全索引数据库,但可以通过Secondary Index和Filter实现复杂查询。 4. 复制机制:支持多种复制策略,如同步复制、异步复制等,保证数据冗余和容错。 四、HBase 2.5.6版本的改进 在...

    HBase实战实例

    2. Bloom Filter:利用Bloom Filter减少不必要的磁盘I/O,提高查询效率。 3. Compaction策略:适时进行Compaction,平衡存储空间和读取速度。 六、挑战与解决方案 1. 数据一致性:在高并发场景下,如何保证数据的...

    HBase性能优化方法总结

    在大数据处理领域,HBase作为一个分布式列式存储系统,因其高效的数据存储和查询能力而备受青睐。然而,随着数据量的增长,如何进行有效的性能优化变得至关重要。本文将深入探讨HBase性能优化的各种策略,旨在帮助你...

    HBase权威指南中文版

    可以利用布隆过滤器(Bloom Filter)减少无效的磁盘I/O,以及Scan操作来批量获取数据。 7. **HBase与Hadoop集成**:HBase构建在Hadoop的HDFS之上,利用HDFS的分布式存储特性。MapReduce可以用于对HBase进行批量操作...

    HBase企业应用开发实战-高清

    此外,本书还会详细阐述HBase的API使用,包括Java API和命令行工具,以及如何通过HBase Shell进行数据操作。通过实例,读者可以学习如何创建表、插入和查询数据,以及执行复杂的扫描操作,这对于实际开发中的数据...

    hbase0.94java源代码

    HBase是Apache Hadoop生态系统中的一个分布式、高性能、版本化、列族式数据库,它主要设计用于处理海量数据...同时,了解源代码有助于开发者更好地利用HBase特性,优化数据访问性能,以及在遇到问题时进行排查和解决。

    hbase 学习 hbase原理 hbase资料

    HBase的数据模型基于BigTable的设计,以行和列来进行数据组织,每个表被分为多个行,行由行键(Row Key)标识,而每一行又包含多个列族(Column Family),列族下有多个列(Qualifier)。 1. **HBase的架构** - **...

    hbase-hadoop+database系统入门书籍

    - **高级功能**:HBase还提供了更多的高级功能,如过滤器(Filter)、批量操作(Batch)、观察者(Observer)等。 ### 面向的目标受众 该教程主要面向以下几类人群: - **希望从事大数据分析领域的专业人士**:包括软件...

    HBase:权威指南(中文版)

    HBase支持多种查询方式,如Get、Scan和Filter,这些工具可以帮助开发者实现复杂的数据检索逻辑。同时,理解如何根据业务需求调整HBase配置,以及如何处理数据热点问题,是提高系统性能的关键。 总之,《HBase:权威...

    hbase-0.92.1.tar.gz

    HBase是NoSQL数据库的一种实现,它不支持SQL查询语言,而是提供了API和命令行工具来进行数据操作。 在HBase 0.92.1中,有以下几个关键知识点: 1. **Region Server**:这是HBase的主要工作单元,负责存储和处理表...

    Hbase行键设计(rowkey)实现多条件查询

    实现多条件查询时,可以使用HBase的自定义比较器(Comparator)和过滤器(Filter)。例如,创建一个相等比较器用于实现精确匹配查询,另一个范围比较器用于执行范围查询。这两个比较器结合过滤器,可以在一次Scan...

    hbase-0.98.6-cdh5.3.6.zip

    8. **过滤器(Filter)**:HBase支持多种过滤器,允许用户在查询时指定条件,提高查询效率。 9. ** Coprocessor机制**:HBase 0.98.6引入了Coprocessor框架,允许用户自定义插件在Region服务器端执行,实现数据的...

Global site tag (gtag.js) - Google Analytics