- 浏览: 200473 次
- 性别:
- 来自: 广州
文章分类
最新评论
-
永立s:
这篇博客帮我解决了一个问题,十分感谢.
HBase表增加snappy压缩 -
BlackWing:
日志是job运行日志,看你怎么配置了,一般就在hadoop安装 ...
解决Exception from container-launch: ExitCodeException exitCode=1的另类错误 -
heymaomao:
heymaomao 写道有两个问题,想请教下楼主 第一是日志楼 ...
解决Exception from container-launch: ExitCodeException exitCode=1的另类错误 -
heymaomao:
有两个问题,想请教下楼主 第一是日志楼主到底看的是哪个日志文件 ...
解决Exception from container-launch: ExitCodeException exitCode=1的另类错误 -
atomduan:
本地的Unix 进程创建失败,检查下服务器内存是否够用,是不是 ...
解决Exception from container-launch: ExitCodeException exitCode=1的另类错误
引用
转载请注明出处,文章链接:http://blackwing.iteye.com/blog/1788647
之前通过修改TableInputFormatBase类实现了客户端分拆每个HRegion,从而实现一个region可以同时被多个map同时读取,原文:
http://blackwing.iteye.com/admin/blogs/1763961
但以上方法是把数据取回客户端进行,速度慢,现在改用coprocessor的endpoint方式,直接在server端计算好InputSplit后返回给客户端。
Hbase的coprocessor详解请参考:
https://blogs.apache.org/hbase/entry/coprocessor_introduction
coprocessor的开发还是很直观、简单的。
1.继承SplitRowProtocol
public interface SplitRowProtocol extends CoprocessorProtocol { public List<InputSplit> getSplitRow(byte [] splitStart, byte [] splitEnd, byte [] tableName,String regionLocation, int mappersPerSplit) throws IOException; }
把自己需要的函数、参数定义好。
2.实现刚才继承的接口、继承BaseEndpointCoprocessor
public class SplitRowEndPoint extends BaseEndpointCoprocessor implements SplitRowProtocol { @Override public List<InputSplit> getSplitRow(byte[] splitStart, byte[] splitEnd, byte[] tableName,String regionLocation,int mappersPerSplit) throws IOException { RegionCoprocessorEnvironment environment = (RegionCoprocessorEnvironment) getEnvironment(); List<InputSplit> splits = new ArrayList<InputSplit>(); HRegion region = environment.getRegion(); byte[] splitRow = region.checkSplit(); if (null != splitRow) return splits; try { HTableInterface table = environment.getTable(tableName); Scan scan = new Scan(); scan.setFilter(new FirstKeyOnlyFilter()); scan.setStartRow(splitStart); scan.setStopRow(splitEnd); scan.setBatch(300); /*String regionLocation = table.getRegionLocation(splitStart,true) .getHostname();*/ InternalScanner scanner = region.getScanner(scan); List<String> rows = new ArrayList<String>(); try { List<KeyValue> curVals = new ArrayList<KeyValue>(); boolean hasMore = false; do { curVals.clear(); hasMore = scanner.next(curVals); KeyValue kv = curVals.get(0); rows.add(Bytes.toString(curVals.get(0).getRow())); } while (hasMore); } finally { scanner.close(); } int splitSize = rows.size() / mappersPerSplit; for (int j = 0; j < mappersPerSplit; j++) { TableSplit tablesplit = null; if (j == mappersPerSplit - 1) tablesplit = new TableSplit(table.getTableName(), rows.get( j * splitSize).getBytes(), rows .get(rows.size() - 1).getBytes(), regionLocation); else tablesplit = new TableSplit(table.getTableName(), rows.get( j * splitSize).getBytes(), rows.get( j * splitSize + splitSize - 1).getBytes(), regionLocation); splits.add(tablesplit); } } catch (IOException e) { e.printStackTrace(); } return splits; } }
3.为需要使用到该coprocessor的表加载coprocessor
加载coprocessor有3种方式
1)一种是通过配置文件,在hbase-site.xml中配置:
<property> <name>hbase.coprocessor.region.classes</name> <value>com.blackwing.util.hbase.coprocessor.SplitRowEndPoint</value> </property>
这种方法缺点是,需要重启hbase。
2)通过hbase shell设置coprocessor
主要通过alter和table_att实现coprocessor的设置,之前需要disable表才能进行操作:
alter 'user_video_pref_t2',METHOD=>'table_att','coprocessor'=>'hdfs://myhadoop:8020/user/hadoop/coprocessor.jar|com.blackwing.util.hbase.coprocessor.SplitRowEndPoint|1073741823'
跟着enable表,再describe这个表,就能看到已经为该表添加了coprocessor。
3)java动态加载
动态加载,是通过java程序,实现某表的coprocessor设置,优点当然是无需重启hbase。
HBaseAdmin admin; String[] truncatedTableInfo; HTableDescriptor desc; truncatedTableInfo = conf.getStrings("user_video_pref"); conf.addResource(propertyFileName); try { admin = new HBaseAdmin(conf); desc = new HTableDescriptor(truncatedTableInfo[0]); desc.setValue("VERSIONS", "1"); HColumnDescriptor coldef = new HColumnDescriptor(truncatedTableInfo[1]); desc.addFamily(coldef); int priority = 0; if(conf.get("coprocessor.pref.priority").equals("USER")) priority = Coprocessor.PRIORITY_USER; else priority = Coprocessor.PRIORITY_SYSTEM; //2013-2-2 增加coprocessor desc.setValue("COPROCESSOR$1", conf.get("coprocessor.pref.path")+"|" +conf.get("coprocessor.pref.class")+ "|"+priority); try { if(admin.isTableAvailable(truncatedTableInfo[0])) { //清表 admin.disableTable(truncatedTableInfo[0]); admin.deleteTable(truncatedTableInfo[0]); if(admin.isTableAvailable(truncatedTableInfo[0])) LOG.info("truncate table : user_video_pref fail !"); //建表 admin.createTable(desc); } if(admin.isTableAvailable(truncatedTableInfo[0])) LOG.info("create table : user_video_pref done !"); } catch (IOException e) { e.printStackTrace(); } } catch (MasterNotRunningException e) { e.printStackTrace(); } catch (ZooKeeperConnectionException e) { e.printStackTrace(); }
以上3种方法只是把coprocessor增加到某表,但因为hbase不会检查路径上的jar是否存在,类是否正确,所以要最终确认coprocessor是否真正添加成功,需要:
1)在hbase shell下,输入status 'detailed',看看对应表的属性中是否有:coprocessors=[SplitRowEndPoint]
或者:
2)在hbase的60010 web界面中,找到刚增加了coprocessor的表,点击进去其region server,查看该表的“Metrics”列,是否有:coprocessors=[SplitRowEndPoint]
如果有该属性,说明coprocessor已经添加成功,这样就能进行客户端的远程调用了。
客户端调用coprocessor也有两种方式,如下:
public <T extends CoprocessorProtocol> T coprocessorProxy(Class<T> protocol, Row row); public <T extends CoprocessorProtocol, R> void coprocessorExec( Class<T> protocol, List<? extends Row> rows, BatchCall<T,R> callable, BatchCallback<R> callback); public <T extends CoprocessorProtocol, R> voidcoprocessorExec( Class<T> protocol, RowRange range, BatchCall<T,R> callable, BatchCallback<R> callback);
一是使用coprocessorProxy方法,二是使用voidcoprocessorExec方法。二者的区别是
就是Exec方法是并行的,效率更高。
具体调用代码如下:
public static void main(String[] args) { Configuration conf = HBaseConfiguration.create(); conf.addResource("FilePath.xml"); String tableName="user_video_pref_t2"; try { HTable table = new HTable(conf,tableName.getBytes()); Pair<byte[][], byte[][]> keys = table.getStartEndKeys(); for (int i = 0; i < keys.getFirst().length; i++) { String regionLocation = table.getRegionLocation(keys.getFirst()[i], true).getHostname(); Batch.Call call = Batch.forMethod(SplitRowProtocol.class, "getSplitRow", f.getBytes(), e.getBytes(), tableName.getBytes(),regionLocation,1); Map<byte[], List<InputSplit>> results = table .coprocessorExec(SplitRowProtocol.class, f.getBytes(), e.getBytes(), call); // 2013-2-4 取得返回的所有InputSplit for (List<InputSplit> list : results.values()) { System.out.println("total input splits : " + list.size()); } } } catch (Throwable e) { e.printStackTrace(); } }
coprocessor的另外一种模式,oberser模式,类似于传统数据库的触发器,针对某个动作进行响应,例如preGet方法,就是在客户端get操作前触发执行,具体略过。
评论
2 楼
BlackWing
2013-09-04
你好,我的也是看到具体某个类:
1)在hbase shell下,输入status 'detailed',看看对应表的属性中是否有:coprocessors=[SplitRowEndPoint]
1)在hbase shell下,输入status 'detailed',看看对应表的属性中是否有:coprocessors=[SplitRowEndPoint]
jxauwxj 写道
大神你好,请教一个问题。
我用
alter 'user_video_pref_t2',METHOD=>'table_att','coprocessor'=>'hdfs://myhadoop:8020/user/hadoop/coprocessor.jar|com.blackwing.util.hbase.coprocessor.SplitRowEndPoint|1073741823'
这种方式加的,怎么在表的
“Metrics”列,只有:coprocessors=AggregateImplementation] 呢?
请指教,谢谢。
我用
alter 'user_video_pref_t2',METHOD=>'table_att','coprocessor'=>'hdfs://myhadoop:8020/user/hadoop/coprocessor.jar|com.blackwing.util.hbase.coprocessor.SplitRowEndPoint|1073741823'
这种方式加的,怎么在表的
“Metrics”列,只有:coprocessors=AggregateImplementation] 呢?
请指教,谢谢。
1 楼
jxauwxj
2013-08-15
大神你好,请教一个问题。
我用
alter 'user_video_pref_t2',METHOD=>'table_att','coprocessor'=>'hdfs://myhadoop:8020/user/hadoop/coprocessor.jar|com.blackwing.util.hbase.coprocessor.SplitRowEndPoint|1073741823'
这种方式加的,怎么在表的
“Metrics”列,只有:coprocessors=AggregateImplementation] 呢?
请指教,谢谢。
我用
alter 'user_video_pref_t2',METHOD=>'table_att','coprocessor'=>'hdfs://myhadoop:8020/user/hadoop/coprocessor.jar|com.blackwing.util.hbase.coprocessor.SplitRowEndPoint|1073741823'
这种方式加的,怎么在表的
“Metrics”列,只有:coprocessors=AggregateImplementation] 呢?
请指教,谢谢。
发表评论
-
解决直接读HFile时因表数据写入而导致文件目录变化问题
2015-03-02 18:22 1540转载请标明出处:http://blackwing.iteye. ... -
LoadIncrementalHFiles是copy而不是move的疑惑
2013-12-19 10:57 4157转载请标明出处:http://blackwing.iteye. ... -
Hadoop生成HFile直接入库HBase心得
2013-12-18 16:15 5281转载请标明出处:http://blackwing.iteye. ... -
Hadoop的Text类getBytes字节数据put到HBase后有多余字符串问题
2013-11-21 15:53 2147转载请标明出处:http://blackwing.iteye. ... -
编译YCSB 解决Not a host:port pair问题
2013-09-18 17:25 2001转载请标明出处:http://blackwing.iteye. ... -
HBase使用SNAPPY压缩遇到compression test fail问题解决
2013-09-18 10:51 10950转载请标明出处:http://blackwing.iteye. ... -
HBase表增加snappy压缩
2013-09-13 17:54 4396转载请标明来源:http://blackwing.iteye. ... -
ROOT不在线的另外一种原因及解决办法
2013-07-29 14:28 1515转载请声明出处:http://blackwing.iteye. ... -
enable和disable表时出现表未disable/enable异常处理
2013-07-29 11:42 4994转载请标明出处:http://blackwing.iteye. ... -
MultithreadedMapper多线程读取数据
2013-04-27 15:51 0由于当前业务需求读取HBase表时,会存在数据倾斜,大部分数据 ... -
HBase的start key和end key疑惑
2013-02-05 15:57 4946转载请标明来源:http://blackwing.iteye. ... -
分拆TableSplit 让多个mapper同时读取
2013-01-06 18:13 2621默认情况下,一个region是一个tableSplit,对应一 ... -
GET查询HBase无结果时 Result的size也不为空
2012-11-28 11:15 2082用Get查询hbase某个row时,就算该row不存在,但还是 ... -
导出导入HBase数据库
2012-06-01 16:44 1948系统上已经安装来Hadoop,并且hbase通过hadoop存 ... -
删除Hbase的META中多余表项
2012-05-24 18:22 2475由于操作HBase比较粗犷,有时直接在hadoop中删除hba ... -
Windows下访问VM中HBase
2012-04-27 17:38 3019资源所限,只能先在本机上模拟hadoop集群。(见文章:htt ...
相关推荐
HBase Coprocessor 是什么? HBase Coprocessor 是一种基于 HBase 的 coprocessor 机制,可以实现对 HBase 的扩展和自定义。它提供了一个灵活的方式来实现数据的处理和分析,例如二级索引、聚合计算、数据排序等。 ...
### HBase Coprocessor 优化与实验 #### HBase及Coprocessor概述 HBase是一种非关系型、面向列的分布式数据库系统,它基于Hadoop之上构建,旨在为大规模数据提供高可靠、高性能的支持。HBase的核心优势在于其能够...
### HBase Coprocessor 的实现与应用 #### 一、Coprocessor简介 HBase Coprocessor 是一种灵活且强大的机制,它允许用户在 HBase 上执行自定义逻辑,从而扩展 HBase 的功能。Coprocessor 的灵感源自 BigTable 的协...
讲师:陈杨——快手大数据高级研发工程师 ...内容概要:(1)讲解hbase coprocessor的原理以及使用场景,(2) coprocessor整个流程实战,包括开发,加载,运行以及管理(3)结合1,2分析coprocessor在rsgroup中的具体使用
通过阅读“HBaseCoprocessor的实现与应用.pdf”,你可以更深入地了解如何利用Coprocessor来定制HBase的行为,提升系统效率,以及在实际项目中遇到的问题和解决方案。这份资料对于理解和掌握HBase的高级特性具有很高...
此外,由于Coprocessor运行在HBase的Region Server上,因此它可以充分利用计算本地性,进行高效的数据聚合和计算。 Coprocessor 使用场景: 1. Observer:Observer是运行在Region Server上的,它监听并响应HBase的...
在HBase中,Coprocessor机制是一个强大的特性,它允许用户在HBase服务器端自定义扩展功能,如数据过滤、统计计算、访问控制等。这个压缩包“hbase_coprocessor_hbase_coprocessorjava_源码”显然包含了用Java API...
2. **配置Coprocessor**:在HBase的表或列族配置中,添加Coprocessor的类路径和优先级。这样,每当有数据写入时,Coprocessor就会被触发执行。 3. **连接Elasticsearch**:在Coprocessor中,使用Elasticsearch的...
【HBase Coprocessor简介】 HBase Coprocessor机制源于Google Bigtable的协处理器概念,旨在增强HBase的功能,提供更高效的数据处理能力。Coprocessor允许在HBase的RegionServer上运行用户自定义的代码,实现了数据...
1、region 拆分机制 ...当region大小大于某个阈值(hbase.hregion.max.filesize=10G)之后就会触发切分,一个region等分为2个region。 但是在生产线上这种切分策略却有相当大的弊端:切分策略对于大表和小表没有
hbase-solr-coprocessor 测试代码,目的是借助solr实现hbase二级索引,以使hbase支持高效的多条件查询。主要通过hbase的coprocessor的Observer实现,通过coprocessor在记录插入hbase时向solr中创建索引。 项目核心为...
### HBase Coprocessor:深度解析与应用案例 #### 概览 HBase Coprocessor 是 HBase 的一个核心特性,允许用户在 RegionServer 上执行自定义代码,从而实现数据处理逻辑靠近数据存储的位置。这一特性极大地提高了...
* `hbase.coprocessor.region.classes`:指定RegionServer 的授权控制器的类名。 * `hbase.coprocessor.regionserver.classes`:指定RegionServer 的授权控制器的类名。 例如,在 hbase-site.xml 文件中增加以下...
4. Coprocessor:在Region服务器端实现业务逻辑,减少网络传输。 六、HBase监控与故障恢复 1. 监控指标:包括内存使用、磁盘I/O、网络流量等,通过JMX和Hadoop Metrics2提供。 2. 故障处理:Master节点和Region...
【HBASERegion数量增多问题描述及解决方案】 在HBase分布式数据库中,Region是表数据的基本存储单元,它将表的数据按照ROWKEY的范围进行分割。随着数据的增长,一个Region会分裂成两个,以此来确保数据的均衡分布。...
增量式的Apriori算法,有点像分布式的Apriori,因为我们可以把已挖掘的事务集和新增的事务集看作两个互相独立的数据集,挖掘新增的事务集,获取所有新增频繁集,然后与已有的频繁集做并集,对于两边都同时频繁的项集...
Hbase本身只有一级索引rowkey,现在通过Hbase coprocessor协处理器把Hbase的数据索引存储到Elasticsearch,从而建立二级索引;ppt中讲述了一些注意事项,挺有用的,希望能有所帮忙!
<name>hbase.coprocessor.region.classes <value>org.apache.hadoop.hbase.security.token.TokenProvider,org.apache.hadoop <name>hbase.coprocessor.master.classes <value>org.apache.hadoop.hbase....
另外,HBaseCoprocessor的实现与应用也是重要的知识点,Coprocessor提供了在服务器端扩展HBase功能的能力。 运维指南部分讲解了HBase2.0的新特性,比如In-MemoryCompaction,这是一个内存压缩技术,有助于提升数据...
当一个Region的数据量增长到预设阈值时,HBase会自动进行Region分裂,确保单个Region的大小保持在可控范围内,从而保持读写性能。 5. **RegionServer**: RegionServer是HBase的主要工作节点,负责存储和处理...