`
kavy
  • 浏览: 890647 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

hcatalog简介和使用

 
阅读更多

Hcatalog是apache开源的对于表和底层数据管理统一服务平台,目前最新release版本是0.5,不过需要hive 0.10支持,由于我们hive集群版本是0.9.0,所以只能降级使用hcatalog 0.4,由于hcatalog中所有的底层数据信息都是保存在hive metastore里,所以hive版本升级后schema变动或者api变动会对hacatalog产生影响,因此在hive 0.11中已经集成了了hcatalog,以后也会成为hive的一部分,而不是独立的项目。

 

HCatalog底层依赖于Hive Metastore,执行过程中会创建一个HiveMetaStoreClient,通过这个instance提供的api来获取表结构数据,如果是local metastore mode的话,会直接返回一个HiveMetaStore.HMSHandler,如果是remote mode的话(hive.metastore.local设置为false),会依据hive.metastore.uris(比如thrift://10.1.8.42:9083, thrift://10.1.8.51:9083)中设定的一串uri逐一顺序建立连接。只要有一个链接建立就可以了,同时为了避免所有client都和第一个uri建立连接,导致负载过大,我加了点小trick,对这串uris随机shuffle来做load balance

 

由于我们的集群开启了kerberos security,需要获取DelegationToken,但是local mode是不支持的,所以只用能remote mode

HiveMetaStoreClient.Java

 

[java] view plain copy
 
 print?
  1. public String getDelegationToken(String owner, String renewerKerberosPrincipalName) throws  
  2.     MetaException, TException {  
  3.   if (localMetaStore) {  
  4.     throw new UnsupportedOperationException("getDelegationToken() can be " +  
  5.         "called only in thrift (non local) mode");  
  6.   }  
  7.   return client.get_delegation_token(owner, renewerKerberosPrincipalName);  
  8. }  

 

HCatInputFormat和HCatOutputFormat提供一些mapreduce api来读取表和写入表

HCatInputFormat API:

 

[java] view plain copy
 
 print?
  1. public static void setInput(Job job,  
  2.     InputJobInfo inputJobInfo) throws IOException;  

 

先实例化一个InputJobInfo对象,该对象包含三个参数dbname,tablename,filter,然后传给setInput函数,来读取相应的数据

 

 

[java] view plain copy
 
 print?
  1. public static HCatSchema getTableSchema(JobContext context)   
  2.     throws IOException;  

在运行时(比如mapper阶段的setup函数中),可以传进去JobContext,调用静态getTableSchema来获取先前setInput时设置的table schema信息

 

 

HCatOutputFormat API:

 

[java] view plain copy
 
 print?
  1. public static void setOutput(Job job, OutputJobInfo outputJobInfo) throws IOException;  

OutPutJobInfo接受三个参数databaseName, tableName, partitionValues,其中第三个参数类型是Map<String, String>,partition key放在map key里,partition value放在对应map key的value中,该参数可传入null或空map,如果指定的partition存在的话,会抛org.apache.hcatalog.common.HCatException : 2002 : Partition already present with given partition key values

比如要要写入指定的partition(dt='2013-06-13',country='china' ),可以这样写

 

[java] view plain copy
 
 print?
  1. Map<String, String> partitionValues = new HashMap<String, String>();  
  2. partitionValues.put("dt""2013-06-13");  
  3. partitionValues.put("country""china");  
  4. HCatTableInfo info = HCatTableInfo.getOutputTableInfo(dbName, tblName, partitionValues);  
  5. HCatOutputFormat.setOutput(job, info);  

 

[java] view plain copy
 
 print?
  1. public static HCatSchema getTableSchema(JobContext context) throws IOException;  

获取之前HCatOutputFormat.setOutput指定的table schema信息

 

 

[java] view plain copy
 
 print?
  1. public static void setSchema(final Job job, final HCatSchema schema) throws IOException;  

设置最终写入数据的schema信息,若不调用这个方法,则默认会使用table schema信息

 

 

下面提供一个完整mapreduce例子计算一天每个guid访问页面次数,map阶段从表中读取guid字段,reduce阶段统计该guid对应pageview的总数,然后写回另外一张带有guid和count字段的表中

 

[java] view plain copy
 
 print?
  1. import java.io.IOException;  
  2. import java.util.Iterator;  
  3.   
  4. import org.apache.hadoop.conf.Configuration;  
  5. import org.apache.hadoop.conf.Configured;  
  6. import org.apache.hadoop.io.IntWritable;  
  7. import org.apache.hadoop.io.Text;  
  8. import org.apache.hadoop.io.WritableComparable;  
  9. import org.apache.hadoop.mapreduce.Job;  
  10. import org.apache.hadoop.mapreduce.Mapper;  
  11. import org.apache.hadoop.mapreduce.Reducer;  
  12. import org.apache.hadoop.util.Tool;  
  13. import org.apache.hadoop.util.ToolRunner;  
  14. import org.apache.hcatalog.data.DefaultHCatRecord;  
  15. import org.apache.hcatalog.data.HCatRecord;  
  16. import org.apache.hcatalog.data.schema.HCatSchema;  
  17. import org.apache.hcatalog.mapreduce.HCatInputFormat;  
  18. import org.apache.hcatalog.mapreduce.HCatOutputFormat;  
  19. import org.apache.hcatalog.mapreduce.InputJobInfo;  
  20. import org.apache.hcatalog.mapreduce.OutputJobInfo;  
  21.   
  22. public class GroupByGuid extends Configured implements Tool {  
  23.   
  24.     @SuppressWarnings("rawtypes")  
  25.     public static class Map extends  
  26.             Mapper<WritableComparable, HCatRecord, Text, IntWritable> {  
  27.         HCatSchema schema;  
  28.         Text guid;  
  29.         IntWritable one;  
  30.   
  31.         @Override  
  32.         protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)  
  33.                 throws IOException, InterruptedException {  
  34.             guid = new Text();  
  35.             one = new IntWritable(1);  
  36.             schema = HCatInputFormat.getTableSchema(context);  
  37.         }  
  38.   
  39.         @Override  
  40.         protected void map(WritableComparable key, HCatRecord value,  
  41.                 Context context) throws IOException, InterruptedException {  
  42.             guid.set(value.getString("guid", schema));  
  43.             context.write(guid, one);  
  44.         }  
  45.     }  
  46.   
  47.     @SuppressWarnings("rawtypes")  
  48.     public static class Reduce extends  
  49.             Reducer<Text, IntWritable, WritableComparable, HCatRecord> {  
  50.         HCatSchema schema;  
  51.   
  52.         @Override  
  53.         protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context)  
  54.                 throws IOException, InterruptedException {  
  55.             schema = HCatOutputFormat.getTableSchema(context);  
  56.         }  
  57.   
  58.         @Override  
  59.         protected void reduce(Text key, Iterable<IntWritable> values,  
  60.                 Context context) throws IOException, InterruptedException {  
  61.             int sum = 0;  
  62.             Iterator<IntWritable> iter = values.iterator();  
  63.             while (iter.hasNext()) {  
  64.                 sum++;  
  65.                 iter.next();  
  66.             }  
  67.             HCatRecord record = new DefaultHCatRecord(2);  
  68.             record.setString("guid", schema, key.toString());  
  69.             record.setInteger("count", schema, sum);  
  70.             context.write(null, record);  
  71.         }  
  72.     }  
  73.   
  74.     @Override  
  75.     public int run(String[] args) throws Exception {  
  76.         Configuration conf = getConf();  
  77.   
  78.         String dbname = args[0];  
  79.         String inputTable = args[1];  
  80.         String filter = args[2];  
  81.         String outputTable = args[3];  
  82.         int reduceNum = Integer.parseInt(args[4]);  
  83.   
  84.         Job job = new Job(conf,  
  85.                 "GroupByGuid, Calculating every guid's pageview");  
  86.         HCatInputFormat.setInput(job,  
  87.                 InputJobInfo.create(dbname, inputTable, filter));  
  88.   
  89.         job.setJarByClass(GroupByGuid.class);  
  90.         job.setInputFormatClass(HCatInputFormat.class);  
  91.         job.setMapperClass(Map.class);  
  92.         job.setReducerClass(Reduce.class);  
  93.         job.setMapOutputKeyClass(Text.class);  
  94.         job.setMapOutputValueClass(IntWritable.class);  
  95.         job.setOutputKeyClass(WritableComparable.class);  
  96.         job.setOutputValueClass(DefaultHCatRecord.class);  
  97.         job.setNumReduceTasks(reduceNum);  
  98.   
  99.         HCatOutputFormat.setOutput(job,  
  100.                 OutputJobInfo.create(dbname, outputTable, null));  
  101.         HCatSchema s = HCatOutputFormat.getTableSchema(job);  
  102.         HCatOutputFormat.setSchema(job, s);  
  103.   
  104.         job.setOutputFormatClass(HCatOutputFormat.class);  
  105.   
  106.         return (job.waitForCompletion(true) ? 0 : 1);  
  107.     }  
  108.   
  109.     public static void main(String[] args) throws Exception {  
  110.         int exitCode = ToolRunner.run(new GroupByGuid(), args);  
  111.         System.exit(exitCode);  
  112.     }  
  113. }  

 

其实hcatalog还支持动态分区dynamic partition,我们可以在OutJobInfo中指定部分partition keyvalue pair,在运行时候根据传进来的值设置HCatRecord对应的其他partition keyvalue pair,这样就能在一个job中同时写多个partition了

 

本文链接http://blog.csdn.net/lalaguozhe/article/details/9083905,转载请注明

 

分享到:
评论

相关推荐

    Hadoop各个组件大概介绍

    Hcatalog架构主要由Metastore和Data Storage组成,Metastore负责管理数据仓库的元数据,而Data Storage负责存储和检索数据。Hcatalog的特点是高可扩展性和高性能,使得其广泛应用于大数据处理领域。 12. Sqoop...

    sentry 权限简介

    Sentry最初的设计目标是为了与Hive/Hcatalog、Apache Solr和Cloudera Impala等组件集成,并计划未来扩展到HDFS和HBase等更多Hadoop组件。 #### 二、Sentry的作用及背景 ##### A. Sentry为Hadoop带来的便利 - **...

    大数据工程实验室申报书.pdf

    - **发展与扩展**:自2006年以来,Hadoop家族迅速壮大,产生了许多顶级项目,如YARN、Hcatalog、Oozie和Cassandra,以应对不断发展的大数据需求。 3. **R语言与Hadoop的结合**: - **互补性**:Hadoop擅长全量...

    Pro Apache Hadoop 2nd Edition 2014

    6. **工具与框架集成** - 强调了Hadoop生态系统中各种工具和框架的重要性,如Pig、HCatalog、HBase等,并提供了详细的使用指南。 7. **云部署与管理** - 指导读者如何在云环境中部署和管理Hadoop集群,充分利用...

    大数据运维

    这包括但不限于 HDFS、MapReduce、Hive、Pig、HBase、Zookeeper、Sqoop 和 HCatalog 等。Ambari 的目标是使 Hadoop 及其相关的大数据软件更易于使用。 ##### 2. Ambari取得的成绩 - **简化集群供应**:通过逐步的...

    大数据系列-Hive入门与实战.pptx

    大数据系列-Hive入门与实战 Hive 是什么? ---------------- ...同时,Hive 的安装、开发和使用都需要了解 Hive 的基本概念和语法,需要掌握 Hive 的设计特征、体系结构、用户接口、应用场景等知识。

    Ambari搭建HadoopAmbari搭建Hadoop_.docx

    例如,如果计划安装和管理HDP 2.3.4或更高版本,则必须使用Ambari 2.2.0或更高版本。 - Ambari并不需要安装Hue或Solr等额外组件,但这些组件可以在安装过程中作为选项添加。 ##### 1.2 系统最低需求 - **操作系统...

    ambari 大数据组件部署手册

    #### 一、Ambari简介 Apache Ambari是一款基于Web的工具,旨在简化Apache Hadoop集群的部署、管理与监控过程。Ambari支持广泛的Hadoop生态组件,如HDFS、MapReduce、Hive、Pig、HBase、ZooKeeper、Sqoop、Hcatalog等...

    hadoop集群自动化安装手册

    #### 一、Ambari简介与功能 ##### 1.1 Ambari概述 - **来源**: Apache Ambari项目源自Apache官方,其官方网站为http://ambari.apache.org/。 - **目标**: 通过开发一系列工具简化Hadoop集群的配置、监控及管理过程。...

    ambari admin guide

    - **HCatalog:** 提供了对Hadoop数据的表格视图,简化了对Hadoop数据的管理。 - **Pig:** 一种高级数据分析工具,用于编写简单脚本来处理大型数据集。 - **Hive:** 一种数据仓库基础设施,提供了SQL-like查询语言...

Global site tag (gtag.js) - Google Analytics