最近项目需求需要完善Sqoop的更多功能点,其中一项是将Hbase的数据导出到hdfs或hive,重点是Hbase出来的数据需要支持条件过滤。类似于Sql中的什么 > ,< ,=,主要是针对数字类型的数据过滤 等。
研究了关于Hbase的过滤只能通过Filter来进行,其中符合我们条件的Filter有一个:
SingleColumnValueFilter
这个Filter支持根据字段值进行过滤。
但是Filter 的 Comparator 没有一个支持数字类型比较器,BinaryComparator,BitComparator这些比较器没法实现我们的需求,使用他们过滤出来的数据不准确。于是目前想到的有两种方案
1.Scan出数据以后自己通过条件过滤每一行数据满不满足条件。(不雅观)
2.自定义满足条件的Comparator 。
最终选择自定义Comparator这种方案。
在网上搜索了一下,并且看了HBase现有的Comparator的源码,自定义Comparator需要做以下这些事:
1.定义protobuf文件
protobuf文件定义可以参考hbase源码的hbase-protocol模块下面的protobuf文件夹下面的Comparator.proto文件。我是直接拷贝过来然后修改修改。
至于为什么需要定义proto文件,是因为hbase所有的rpc数据交互都是通过protobuf来完成的。
下面是我定义的proto文件:
---------------------------------------------------------------------------------------------------------------
// This file contains protocol buffers that are used for filters
option java_package = "com.star.hbase.defined.comparator";
option java_outer_classname = "ComparatorProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
// This file contains protocol buffers that are used for comparators (e.g. in filters)
message NumberComparator{
required bytes value = 1;
required string fieldType = 2;
}
---------------------------------------------------------------------------------------------------------------
通过以下命令生产java类:前提是protoc.exe必须在当前目录
protoc --java_out=D:\proto NumberComparator.proto
具体protobuf的用法及其他我就不说了,网上搜一下即可。
我定义了一个NumberComparator的vo类,它下面有两个字段 ,第一个是需要进行过滤的值,第二个是需要将hbase的指定列转成对应的类型进行比较 ,比如 int double等 只支持数字类型。
//
2.创建比较器java类并且该类继承ByteArrayComparable
具体代码如下:
-------------------------------------------------------------------------------------------------------------------
package com.star.hbase.defined.comparator;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Created with Intellij IDEA
* User: star
* Date: 2015-02-09
* Time: 17:10
* function:
* To change this template use File | Settings | File Templates.
*/
public class NumberComparator extends ByteArrayComparable {
private String fieldType;
private byte [] data;
/**
* Constructor
* @param value value
*/
public NumberComparator(byte[] value,String fieldType) {
super(value);
this.fieldType = fieldType;
this.data = value;
}
@Override //重写该方法
public byte[] toByteArray() {
ComparatorProtos.NumberComparator.Builder builder =
ComparatorProtos.NumberComparator.newBuilder();
builder.setValue(ByteString.copyFrom(this.data));
builder.setFieldType(this.fieldType);
return builder.build().toByteArray();
}
//定义该方法,用于对象反序列化操作
public static NumberComparator parseFrom(final byte [] bytes) throws DeserializationException {
ComparatorProtos.NumberComparator proto = null;
try {
proto = ComparatorProtos.NumberComparator.parseFrom(bytes);
} catch (InvalidProtocolBufferException e) {
throw new DeserializationException(e);
}
return new NumberComparator(proto.getValue().toByteArray(),proto.getFieldType());
}
@Override //重写比较方法 里面就可以按照自己的意愿来实现自己的比较器
public int compareTo(byte[] bytes, int offset, int length) {
if(fieldType.equalsIgnoreCase("int") || fieldType.equalsIgnoreCase("integer")) {
Integer paramValue = byteConvertObj(Integer.class,data);
Integer currentValue = byteConvertObj(Integer.class,Bytes.copy(bytes, offset, length));
return paramValue.compareTo(currentValue);
}else if(fieldType.equalsIgnoreCase("long") || fieldType.equalsIgnoreCase("bigint")){
Long paramsValue = byteConvertObj(Long.class,data);
Long currentValue = byteConvertObj(Long.class,Bytes.copy(bytes, offset, length));
return paramsValue.compareTo(currentValue);
}else if(fieldType.equalsIgnoreCase("float")){
Float paramsValue = byteConvertObj(Float.class,data);
Float currentValue = byteConvertObj(Float.class,Bytes.copy(bytes, offset, length));
return paramsValue.compareTo(currentValue);
}else if(fieldType.equalsIgnoreCase("double")){
Double paramsValue = byteConvertObj(Double.class,data);
Double currentValue = byteConvertObj(Double.class,Bytes.copy(bytes, offset, length));
return paramsValue.compareTo(currentValue);
}else if(fieldType.equalsIgnoreCase("short") || fieldType.equalsIgnoreCase("SMALLINT")){
Short paramsValue = byteConvertObj(Short.class,data);
Short currentValue = byteConvertObj(Short.class,Bytes.copy(bytes, offset, length));
return paramsValue.compareTo(currentValue);
}
return 1;
}
private <T> T byteConvertObj(Class<T> clazz,byte [] data){
String clazzName = clazz.getSimpleName();
if(clazzName.equalsIgnoreCase("Integer")){
Integer paramValue ;
try {
paramValue = Bytes.toInt(data);
} catch (IllegalArgumentException e) {
paramValue = Integer.valueOf(Bytes.toString(data));
}
return (T)paramValue;
}else if(clazzName.equalsIgnoreCase("Long")){
Long paramValue ;
try {
paramValue = Bytes.toLong(data);
} catch (IllegalArgumentException e) {
paramValue = Long.valueOf(Bytes.toString(data));
}
return (T)paramValue;
}else if(clazzName.equalsIgnoreCase("Float")){
Float paramValue ;
try {
paramValue = Bytes.toFloat(data);
} catch (IllegalArgumentException e) {
paramValue = Float.valueOf(Bytes.toString(data));
}
return (T)paramValue;
}else if(clazzName.equalsIgnoreCase("Double")){
Double paramValue;
try {
paramValue = Bytes.toDouble(data);
} catch (IllegalArgumentException e) {
paramValue = Double.valueOf(Bytes.toString(data));
}
return (T)paramValue;
}else if(clazzName.equalsIgnoreCase("Short")){
Short paramValue;
try {
paramValue = Bytes.toShort(data);
} catch (IllegalArgumentException e) {
paramValue = Short.valueOf(Bytes.toString(data));
}
return (T)paramValue;
}
return null;
}
}
---------------------------------------------------------------------------------------------------------------------------------
至此 比较器定义完成,接着需要将该protobuf生产的java类和我们定义的Comparator类打成jar包,然后放到Hbase目录下面的lib目录里面,这样才真正执行的时候才能找到该类。放进去后需要重启以下hbase集群。
最后我们写一个测试来看下我们的自定义比较器是否生效:局部代码:
-------------------------------------------------------------------------------------------------------------
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes("info"));
FilterList filterList = new FilterList();
NumberComparator comparator = new NumberComparator(Bytes.toBytes(1500),"int"); //自定义的比较器传入我们自己定义的两个参数
SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("info"),Bytes.toBytes("id"),
CompareFilter.CompareOp.GREATER,comparator);
filterList.addFilter(filter);
scan.setFilter(filterList);
-------------------------------------------------------------------------------------------------------------
最后成功实现该功能。
实现期间遇到的一个异常我在这里列出来 尤为深刻:
这个异常时因为自定义的Comparator里面出现了异常,然后数据传输遇到问题,数据一直传输不过去出现的问题。可能解释的不是特别好,希望大家可以完善,毕竟自身的对hbase还不是特别熟悉。
--------------------------------------------------------------------------------------------------------------------------
java.lang.RuntimeException: org.apache.hadoop.hbase.DoNotRetryIOException: Failed after retry of OutOfOrderScannerNextException: was there a rpc timeout?
at org.apache.hadoop.hbase.client.AbstractClientScanner$1.hasNext(AbstractClientScanner.java:94)
at org.springframework.data.hadoop.hbase.RowMapperResultsExtractor.extractData(RowMapperResultsExtractor.java:46)
at org.springframework.data.hadoop.hbase.RowMapperResultsExtractor.extractData(RowMapperResultsExtractor.java:30)
at org.springframework.data.hadoop.hbase.HbaseTemplate$1.doInTable(HbaseTemplate.java:131)
at org.springframework.data.hadoop.hbase.HbaseTemplate.execute(HbaseTemplate.java:58)
at org.springframework.data.hadoop.hbase.HbaseTemplate.find(HbaseTemplate.java:126)
at org.springframework.data.hadoop.hbase.HbaseTemplate.find(HbaseTemplate.java:155)
at com.csx.hbase.HBaseServiceImpl.scan(HBaseServiceImpl.java:116)
at com.csx.hbase.TestHbaseServiceImpl.test(TestHbaseServiceImpl.java:24)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:74)
at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:82)
at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:72)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:240)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:180)
at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:74)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:211)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:67)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Caused by: org.apache.hadoop.hbase.DoNotRetryIOException: Failed after retry of OutOfOrderScannerNextException: was there a rpc timeout?
at org.apache.hadoop.hbase.client.ClientScanner.next(ClientScanner.java:402)
at org.apache.hadoop.hbase.client.AbstractClientScanner$1.hasNext(AbstractClientScanner.java:91)
... 39 more
Caused by: org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException: org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException: Expected nextCallSeq: 1 But the nextCallSeq got from client: 0; request=scanner_id: 14 number_of_rows: 100 close_scanner: false next_call_seq: 0
at org.apache.hadoop.hbase.regionserver.HRegionServer.scan(HRegionServer.java:3098)
at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:29497)
at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2012)
at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:98)
at org.apache.hadoop.hbase.ipc.SimpleRpcScheduler.consumerLoop(SimpleRpcScheduler.java:168)
at org.apache.hadoop.hbase.ipc.SimpleRpcScheduler.access$000(SimpleRpcScheduler.java:39)
at org.apache.hadoop.hbase.ipc.SimpleRpcScheduler$1.run(SimpleRpcScheduler.java:111)
at java.lang.Thread.run(Thread.java:745)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:95)
at org.apache.hadoop.hbase.protobuf.ProtobufUtil.getRemoteException(ProtobufUtil.java:285)
at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:204)
at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:59)
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:114)
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:90)
at org.apache.hadoop.hbase.client.ClientScanner.next(ClientScanner.java:354)
... 40 more
Caused by: org.apache.hadoop.hbase.ipc.RemoteWithExtrasException(org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException): org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException: Expected nextCallSeq: 1 But the nextCallSeq got from client: 0; request=scanner_id: 14 number_of_rows: 100 close_scanner: false next_call_seq: 0
at org.apache.hadoop.hbase.regionserver.HRegionServer.scan(HRegionServer.java:3098)
at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:29497)
at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2012)
at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:98)
at org.apache.hadoop.hbase.ipc.SimpleRpcScheduler.consumerLoop(SimpleRpcScheduler.java:168)
at org.apache.hadoop.hbase.ipc.SimpleRpcScheduler.access$000(SimpleRpcScheduler.java:39)
at org.apache.hadoop.hbase.ipc.SimpleRpcScheduler$1.run(SimpleRpcScheduler.java:111)
at java.lang.Thread.run(Thread.java:745)
at org.apache.hadoop.hbase.ipc.RpcClient.call(RpcClient.java:1453)
at org.apache.hadoop.hbase.ipc.RpcClient.callBlockingMethod(RpcClient.java:1657)
at org.apache.hadoop.hbase.ipc.RpcClient$BlockingRpcChannelImplementation.callBlockingMethod(RpcClient.java:1715)
at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.scan(ClientProtos.java:29900)
at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:174)
... 44 more
相关推荐
hbase 自带的Comparator只能进行字符串的比较,不能进行数值比较,通过自定义代码实现该功能。 具体使用请参考 http://blog.csdn.net/mtj66/article/details/52574739
由于HBase默认的字典序排序可能无法满足所有业务需求,因此,我们可以自定义比较器(Comparator)来改变行键的排序规则。本文将深入探讨如何在HBase中创建一个自定义的数值型比较器,并将其整合到项目中。 首先,...
标题提到的"HBASE-comparator.zip"文件可能包含了一些关于如何在HBase中实现自定义比较器的示例或教程。HBase的默认行为是按照字节顺序(字典序)对字符串类型的数据进行比较,这可能导致与我们直觉上对于数值大小...
hbasesink 自定义序列化类 ,可实现自定义rowkey及去除字段两边索引,具体请看下代码。 hbasesink 自定义序列化类 ,可实现自定义rowkey及去除字段两边索引,具体请看下代码。
通过`Table`的`getScanner(Scan scan)`方法创建一个扫描器,`Scan`对象可以设置扫描范围(行键)、过滤器等。然后使用`ResultScanner.next()`或`ResultScanner.iterator()`遍历结果。 5. **批处理操作** 使用`...
HBase RowKey 设计与协处理器运用 HBase 是一个基于 HDFS 的分布式、面向列的 NoSQL 数据库,具有高性能、可靠性和扩展性等特点。本文将详细介绍 HBase 的 RowKey 设计和协处理器运用。 HBase 的介绍 HBase 是一...
也算是Scan系列的其中一篇吧,后面对于Scan还会有一篇结合HDFS分析HBase数据读取在HDFS层面是怎么一个流程,敬请期待。HBase中Scan从大的层面来看主要有三种常见用法:ScanAPI、TableScanMR以及SnapshotScanMR
本文将深入探讨HBase的比较过滤器RowFilter的使用源码,帮助你理解如何在实际项目中应用这一关键功能。 RowFilter是HBase提供的过滤器之一,它允许我们根据行键(row key)来过滤表中的数据。在Java API中,我们...
《Spring Boot Starter HBase:构建高效HBase操作的利器》 在Java开发中,Spring Boot以其简洁、高效的...对于那些希望在Spring Boot项目中集成HBase的开发者来说,这个自定义启动器无疑是一个值得尝试的优秀选择。
HBase在不同版本(1.x, 2.x, 3.0)中针对不同类型的硬件(以IO为例,HDD/SATA-SSD/PCIe-SSD/Cloud)和场景(single/batch, get/scan)做了(即将做)各种不同的优化,这些优化都有哪些?如何针对自己的生产业务和...
在Spring Boot框架中,`spring-boot-starter-hbase`是一个非常实用的启动器,它简化了与Apache HBase数据库的集成。HBase是基于Google Bigtable设计的一个分布式、高性能、版本化的NoSQL数据库,适用于大数据处理。...
hbase从HBase表中导入数据到Hbase表中将fruit表中的一部分数据,通过MR迁入到fruit_mr表中从HDFS中导入数据到Hbase表中根据HDFS中的数据导入到fruit_hdfs表中
- **Map-Reduce Framework**: 包含了GC时间和CPU时间等,GC时间(2844ms)是Java垃圾回收器执行的时间,CPU时间(0ms)可能是因为测试未记录这部分数据。 2. **随机写入(Random Write)**: - **FILE: Number of...
然后,我们可以将这个过滤器添加到Scan对象中,Scan对象是用于定义HBase查询条件的: ```java Scan scan = new Scan(); scan.setFilter(pageFilter); ``` 接下来,使用HBase的Table对象执行扫描操作: ```java ...
hbase(main):020:0> scan 't1' ROW COLUMN+CELL row1 column=cl1:age, timestamp=1649309921756, value=24 column=cl1:name, timestamp=1649309921756, value=zhangsan column=cl2:sex, timestamp=...
在IT行业中,尤其是在大数据处理领域,HBase是一个广泛使用的分布式、高性能、列式存储的NoSQL数据库。HBase是建立在Hadoop文件系统(HDFS)之上,为处理大规模数据提供了一个高效的数据存储解决方案。而Spring Data...
【标题】"tools4j-config-provider-hbase-filter-0.12.2.zip" 提供的是一个名为 tools4j-config-provider-hbase-filter 的开源项目,版本号为 0.12.2。这个项目通常涉及到配置管理以及针对 HBase 数据库的过滤功能。...
同时,`Comparator`接口允许自定义列值的比较规则。 6. **安全性与权限**:Java-HBase开发包也支持HBase的安全特性,包括认证、授权和审计,这使得开发者可以控制对HBase数据的访问。 7. **异步API**:除了传统的...
另外,hbase-common-2.2.4.jar包含了HBase的公共库,如元数据管理、行键设计、过滤器实现等,是HBase操作的基础。而hadoop-mapreduce-client-common-3.1.3.jar则是Hadoop MapReduce的通用客户端库,提供了与...
除了这些基本过滤器外,HBase还支持复合过滤器,如FilterList,它可以组合多个过滤器并按顺序或逻辑运算符(如AND、OR)应用它们。这使得我们能够构建更复杂的查询逻辑。 使用过滤器的一个关键步骤是理解`Filter`...