应用场景,在很多情况下我们只希望复杂的逻辑来过滤数据,得到的数据可能只有1M,但是数据源可能会达到1T,譬如需要知道对iphone比较感兴趣的用户有哪些。
需要过滤里面的字段品牌和相应的权重,
如果全部将数据读入mapreduce意味着较多的IO开销。
下面附上本人的代码
JobTask jobTask = new JobTask(null, new Path("/user/pms/xq/full_user_profile1/" + i)) .setInputFormat(TableInputFormat.class) .setMapper(BrandTopMapper.class) .setMapperKey(Text.class).setMapperValue(NullWritable.class) .setReducer(null) .setOutputFormat(TextOutputFormat.class) .setJobOptionsSetter(new JobOptionsSetter() { @Override public void setOptions(Job job) throws Exception { Configuration conf = job.getConfiguration(); conf.set(DataImpConstants.MAPRED_REDUCE_TASKS, getOption(MAPRED_REDUCE_TASKS)); conf.set("mapred.job.queue.name", "pms"); Scan scan = new Scan(); scan.setCaching(1000); scan.setCacheBlocks(false); scan.setStartRow(pair.getFirst()); scan.setStopRow(pair.getSecond()); scan.setId("com.yhd.db.hbase.job.coprocessors.BrandFilter"); HbaseUtils.createRegionScan(job, scan); conf.set(TableInputFormat.INPUT_TABLE, getOption(TABLE_NAME)); conf.set(FAMILY_NAME, getOption(FAMILY_NAME)); } });
scan.setStartRow(pair.getFirst());
scan.setStopRow(pair.getSecond());
这两行是分批扫描整张表。
其中com.yhd.db.hbase.job.coprocessors.BrandFilter 是传到hbase region服务器,让服务器决定用哪一个过滤器
public class HGetBase extends BaseRegionObserver { ObserverFilter observerFilter; @Override public boolean postScannerNext( ObserverContext<RegionCoprocessorEnvironment> e, InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException { if(observerFilter != null) { for(int i=results.size() - 1; i >=0; i--) { if(!observerFilter.execute(results.get(i))) { results.remove(i); } } } return super.postScannerNext(e, s, results, limit, hasMore); } @Override public RegionScanner postScannerOpen( ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, RegionScanner s) throws IOException { if(scan.getId() != null) { try { observerFilter = (ObserverFilter) Class.forName(scan.getId()).newInstance(); } catch (Exception e1) { e1.printStackTrace(); } } return super.postScannerOpen(e, scan, s); } }
这个就设置为coprocessor的类
譬如
disable 'top_user_profile'
alter 'top_user_profile', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
alter 'top_user_profile', METHOD => 'table_att', 'coprocessor'=>'hdfs:///user/pms/xq/protest13.jar|com.yhd.db.hbase.job.coprocessors.HGetBase|1000'
enable 'top_user_profile'
根据前面的参数com.yhd.db.hbase.job.coprocessors.BrandFilter,会执行下面这个过滤类
public interface ObserverFilter { public boolean execute(Result result); }
扩展性的接口
public class BrandFilter implements ObserverFilter { @Override public boolean execute(Result result) { for(Entry<byte[], byte[]> r : result.getFamilyMap("cat".getBytes()).entrySet()) { UserCateProfile uc = JSON.parseObject(new String(r.getValue()), UserCateProfile.class); List<UserAttriProfile> list = uc.getUserAttriProfiles(); for(UserAttriProfile ua : list) { if(ua.getAttributeType() == 0) { //brand权重和名字的判断 907647 苹果 Set<AttributeItem> items = ua.getItems(); for(AttributeItem ai : items) { //中兴 936432 苹果929029 if(ai.getId() == 929029 && ai.getAv() > 0.3) { return true; } } } } } return false; } }
region server会执行相应的过滤代码,大大的减小了IO开销,缩短了执行时间。
注意的问题就是,如果要更新jar包,可能存在不支持覆盖的,jar被从hdfs上load过去在本地缓存了在临时文件了,region server还是在用临时文件,没有做覆盖操作。需要更改jar的名字或路径,才能让新的包生效,
缺点是反复更改会造成磁盘空间不足。
源代码
- CoprocessorClassLoader
缓存了jar的class loader信息,
private static final ConcurrentMap<Path, CoprocessorClassLoader> classLoadersCache = new MapMaker().concurrencyLevel(3).weakValues().makeMap();
相关推荐
《HBase企业应用开发实战》是一本深度剖析HBase在实际业务场景中应用的专业书籍,旨在帮助读者理解和掌握HBase的核心功能、设计理念以及在大规模数据处理中的应用策略。HBase,作为Apache的一个分布式、高性能、基于...
为了在实际应用中使用这些Coprocessor,我们需要在HBase的配置文件(如`hbase-site.xml`)中指定Coprocessor的路径和优先级,然后重启HBase服务。在Java代码中,可以通过`HTable`或`Admin`接口来调用Coprocessor的...
《HBase企业应用开发实战》是由马延辉和孟鑫合著的一本深入解析HBase在企业级应用中的实践指南。这本书旨在帮助读者理解和掌握如何有效地利用HBase解决大数据存储和处理的问题。以下是对该书内容的详细概述: HBase...
- **利用 HBase Coprocessor 触发器**:应用只与 HBase 交互,Coprocessor 处理数据写入和查询加速。这种方式简化了应用接口,但开发和维护 Coprocessor 需要深度技术积累。 综合考虑,选择合适的方案需要根据具体...
另外,HBaseCoprocessor的实现与应用也是重要的知识点,Coprocessor提供了在服务器端扩展HBase功能的能力。 运维指南部分讲解了HBase2.0的新特性,比如In-MemoryCompaction,这是一个内存压缩技术,有助于提升数据...
阿里云HBase作为一个大规模分布式数据库,广泛应用于大数据存储和处理场景。然而,由于HBase原生API的复杂性,开发者在使用过程中面临诸多挑战,例如学习成本高、代码量大以及对业务场景的定制化需求。为了改善这一...
《HBase在淘宝的应用与...淘宝在HBase的应用与优化实践,不仅解决了大规模数据处理的挑战,也为其他企业提供了宝贵的经验和参考。通过不断的技术创新和优化,淘宝成功构建了一个高可用、高性能的大数据存储与处理平台。
HBase在小米的实践代表了在大规模实际应用中,如何解决分布式存储和大数据处理的挑战。 2. AsyncHBaseClient的使用 文件提到了为什么要使用异步HBase客户端(AsyncHBaseClient),这表明在分布式系统中,处理方式...
同时,还有关于HBaseCoprocessor的实现与应用的介绍,该组件允许在服务器端以分布式方式执行用户自定义代码,进而扩展HBase的功能。在平台篇中,则介绍了HBase平台实践和应用,包括平台建设方面的内容。 运维指南则...
根据提供的文件信息,本内容涉及HBase实践中的MOB(Medium Object)使用指南。HBase是一种开源的非关系型分布式数据库(NoSQL),它建立在Hadoop文件系统之上,适用于存储海量稀疏数据。MOB是HBase 2.0+版本引入的...
3,结合工作实践及分析应用,培养解决实际问题的能力。 4,企业级方案设计,完全匹配工作场景。 适用人群 1、对大数据感兴趣的在校生及应届毕业生。 2、对目前职业有进一步提升要求,希望从事大数据行业高薪工作的...
在阿里巴巴的实践中,他们可能已经建立了一套完整的数据流水线,从数据采集、清洗、转换到加载到HBase,再到数据分析和应用。 数据管道的实践往往涉及到以下几个关键环节: 1. 数据接入:阿里巴巴可能使用Flume或...
在分布式大数据存储领域,HBase是一个非常重要的列式数据库,尤其在处理海量实时数据时表现卓越。本主题将深入探讨如何使用Java客户端API与...不断学习和实践,你将能够充分利用HBase的潜力,解决复杂的大数据挑战。
HBase,作为一款分布式列式存储系统,广泛应用于大数据领域,尤其在实时查询场景下..."hbase-transactional-tableindexed-master"项目可能是对此类技术的一个具体实践,对于理解和掌握HBase的高级特性具有重要价值。
8. HBase在企业中的应用场景:HBase被广泛应用于电信、互联网和金融等多个行业,这些行业通常具有高并发、大数据量的业务场景。 由于技术原因,OCR扫描文字可能会有误,但通过语境理解,可以将识别错误或遗漏的部分...
在HBase的生态建设方面,文中提到了构建HBase平台的实践和应用,强调了HBase平台化后的新特性,如HBase2.0&阿里云HBase的解读,进一步说明了HBase在国内生态的成熟度和实用性。 本文档是HBase技术社区成员的共同...
《HBase设计模式》这本书是深入理解HBase数据库架构与应用的重要参考资料,它涵盖了HBase在实际场景中的最佳实践和常见问题解决方案。HBase作为Apache Hadoop生态系统中的一个分布式、高性能、版本化、列族式的NoSQL...
7. **Coprocessor机制**:HBase引入了Coprocessor框架,允许用户在Region Server上编写自定义逻辑,如实现复杂的过滤、索引和计算,提高数据处理效率。 8. **HBase Shell**:HBase提供了一个命令行工具Shell,用于...