Hcatalog是apache开源的对于表和底层数据管理统一服务平台,目前最新release版本是0.5,不过需要hive 0.10支持,由于我们hive集群版本是0.9.0,所以只能降级使用hcatalog 0.4,由于hcatalog中所有的底层数据信息都是保存在hive metastore里,所以hive版本升级后schema变动或者api变动会对hacatalog产生影响,因此在hive 0.11中已经集成了了hcatalog,以后也会成为hive的一部分,而不是独立的项目。
HCatalog底层依赖于Hive Metastore,执行过程中会创建一个HiveMetaStoreClient,通过这个instance提供的api来获取表结构数据,如果是local metastore mode的话,会直接返回一个HiveMetaStore.HMSHandler,如果是remote mode的话(hive.metastore.local设置为false),会依据hive.metastore.uris(比如thrift://10.1.8.42:9083, thrift://10.1.8.51:9083)中设定的一串uri逐一顺序建立连接。只要有一个链接建立就可以了,同时为了避免所有client都和第一个uri建立连接,导致负载过大,我加了点小trick,对这串uris随机shuffle来做load balance
由于我们的集群开启了kerberos security,需要获取DelegationToken,但是local mode是不支持的,所以只用能remote mode
HiveMetaStoreClient.Java
- public String getDelegationToken(String owner, String renewerKerberosPrincipalName) throws
- MetaException, TException {
- if (localMetaStore) {
- throw new UnsupportedOperationException("getDelegationToken() can be " +
- "called only in thrift (non local) mode");
- }
- return client.get_delegation_token(owner, renewerKerberosPrincipalName);
- }
HCatInputFormat和HCatOutputFormat提供一些mapreduce api来读取表和写入表
HCatInputFormat API:
- public static void setInput(Job job,
- InputJobInfo inputJobInfo) throws IOException;
先实例化一个InputJobInfo对象,该对象包含三个参数dbname,tablename,filter,然后传给setInput函数,来读取相应的数据
- public static HCatSchema getTableSchema(JobContext context)
- throws IOException;
在运行时(比如mapper阶段的setup函数中),可以传进去JobContext,调用静态getTableSchema来获取先前setInput时设置的table schema信息
HCatOutputFormat API:
- public static void setOutput(Job job, OutputJobInfo outputJobInfo) throws IOException;
OutPutJobInfo接受三个参数databaseName, tableName, partitionValues,其中第三个参数类型是Map<String, String>,partition key放在map key里,partition value放在对应map key的value中,该参数可传入null或空map,如果指定的partition存在的话,会抛org.apache.hcatalog.common.HCatException : 2002 : Partition already present with given partition key values
比如要要写入指定的partition(dt='2013-06-13',country='china' ),可以这样写
- Map<String, String> partitionValues = new HashMap<String, String>();
- partitionValues.put("dt", "2013-06-13");
- partitionValues.put("country", "china");
- HCatTableInfo info = HCatTableInfo.getOutputTableInfo(dbName, tblName, partitionValues);
- HCatOutputFormat.setOutput(job, info);
- public static HCatSchema getTableSchema(JobContext context) throws IOException;
获取之前HCatOutputFormat.setOutput指定的table schema信息
- public static void setSchema(final Job job, final HCatSchema schema) throws IOException;
设置最终写入数据的schema信息,若不调用这个方法,则默认会使用table schema信息
下面提供一个完整mapreduce例子计算一天每个guid访问页面次数,map阶段从表中读取guid字段,reduce阶段统计该guid对应pageview的总数,然后写回另外一张带有guid和count字段的表中
- import java.io.IOException;
- import java.util.Iterator;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.WritableComparable;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
- import org.apache.hcatalog.data.DefaultHCatRecord;
- import org.apache.hcatalog.data.HCatRecord;
- import org.apache.hcatalog.data.schema.HCatSchema;
- import org.apache.hcatalog.mapreduce.HCatInputFormat;
- import org.apache.hcatalog.mapreduce.HCatOutputFormat;
- import org.apache.hcatalog.mapreduce.InputJobInfo;
- import org.apache.hcatalog.mapreduce.OutputJobInfo;
- public class GroupByGuid extends Configured implements Tool {
- @SuppressWarnings("rawtypes")
- public static class Map extends
- Mapper<WritableComparable, HCatRecord, Text, IntWritable> {
- HCatSchema schema;
- Text guid;
- IntWritable one;
- @Override
- protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
- throws IOException, InterruptedException {
- guid = new Text();
- one = new IntWritable(1);
- schema = HCatInputFormat.getTableSchema(context);
- }
- @Override
- protected void map(WritableComparable key, HCatRecord value,
- Context context) throws IOException, InterruptedException {
- guid.set(value.getString("guid", schema));
- context.write(guid, one);
- }
- }
- @SuppressWarnings("rawtypes")
- public static class Reduce extends
- Reducer<Text, IntWritable, WritableComparable, HCatRecord> {
- HCatSchema schema;
- @Override
- protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context)
- throws IOException, InterruptedException {
- schema = HCatOutputFormat.getTableSchema(context);
- }
- @Override
- protected void reduce(Text key, Iterable<IntWritable> values,
- Context context) throws IOException, InterruptedException {
- int sum = 0;
- Iterator<IntWritable> iter = values.iterator();
- while (iter.hasNext()) {
- sum++;
- iter.next();
- }
- HCatRecord record = new DefaultHCatRecord(2);
- record.setString("guid", schema, key.toString());
- record.setInteger("count", schema, sum);
- context.write(null, record);
- }
- }
- @Override
- public int run(String[] args) throws Exception {
- Configuration conf = getConf();
- String dbname = args[0];
- String inputTable = args[1];
- String filter = args[2];
- String outputTable = args[3];
- int reduceNum = Integer.parseInt(args[4]);
- Job job = new Job(conf,
- "GroupByGuid, Calculating every guid's pageview");
- HCatInputFormat.setInput(job,
- InputJobInfo.create(dbname, inputTable, filter));
- job.setJarByClass(GroupByGuid.class);
- job.setInputFormatClass(HCatInputFormat.class);
- job.setMapperClass(Map.class);
- job.setReducerClass(Reduce.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(IntWritable.class);
- job.setOutputKeyClass(WritableComparable.class);
- job.setOutputValueClass(DefaultHCatRecord.class);
- job.setNumReduceTasks(reduceNum);
- HCatOutputFormat.setOutput(job,
- OutputJobInfo.create(dbname, outputTable, null));
- HCatSchema s = HCatOutputFormat.getTableSchema(job);
- HCatOutputFormat.setSchema(job, s);
- job.setOutputFormatClass(HCatOutputFormat.class);
- return (job.waitForCompletion(true) ? 0 : 1);
- }
- public static void main(String[] args) throws Exception {
- int exitCode = ToolRunner.run(new GroupByGuid(), args);
- System.exit(exitCode);
- }
- }
其实hcatalog还支持动态分区dynamic partition,我们可以在OutJobInfo中指定部分partition keyvalue pair,在运行时候根据传进来的值设置HCatRecord对应的其他partition keyvalue pair,这样就能在一个job中同时写多个partition了
本文链接http://blog.csdn.net/lalaguozhe/article/details/9083905,转载请注明
相关推荐
Hcatalog架构主要由Metastore和Data Storage组成,Metastore负责管理数据仓库的元数据,而Data Storage负责存储和检索数据。Hcatalog的特点是高可扩展性和高性能,使得其广泛应用于大数据处理领域。 12. Sqoop...
Sentry最初的设计目标是为了与Hive/Hcatalog、Apache Solr和Cloudera Impala等组件集成,并计划未来扩展到HDFS和HBase等更多Hadoop组件。 #### 二、Sentry的作用及背景 ##### A. Sentry为Hadoop带来的便利 - **...
- **发展与扩展**:自2006年以来,Hadoop家族迅速壮大,产生了许多顶级项目,如YARN、Hcatalog、Oozie和Cassandra,以应对不断发展的大数据需求。 3. **R语言与Hadoop的结合**: - **互补性**:Hadoop擅长全量...
6. **工具与框架集成** - 强调了Hadoop生态系统中各种工具和框架的重要性,如Pig、HCatalog、HBase等,并提供了详细的使用指南。 7. **云部署与管理** - 指导读者如何在云环境中部署和管理Hadoop集群,充分利用...
这包括但不限于 HDFS、MapReduce、Hive、Pig、HBase、Zookeeper、Sqoop 和 HCatalog 等。Ambari 的目标是使 Hadoop 及其相关的大数据软件更易于使用。 ##### 2. Ambari取得的成绩 - **简化集群供应**:通过逐步的...
大数据系列-Hive入门与实战 Hive 是什么? ---------------- ...同时,Hive 的安装、开发和使用都需要了解 Hive 的基本概念和语法,需要掌握 Hive 的设计特征、体系结构、用户接口、应用场景等知识。
例如,如果计划安装和管理HDP 2.3.4或更高版本,则必须使用Ambari 2.2.0或更高版本。 - Ambari并不需要安装Hue或Solr等额外组件,但这些组件可以在安装过程中作为选项添加。 ##### 1.2 系统最低需求 - **操作系统...
#### 一、Ambari简介 Apache Ambari是一款基于Web的工具,旨在简化Apache Hadoop集群的部署、管理与监控过程。Ambari支持广泛的Hadoop生态组件,如HDFS、MapReduce、Hive、Pig、HBase、ZooKeeper、Sqoop、Hcatalog等...
#### 一、Ambari简介与功能 ##### 1.1 Ambari概述 - **来源**: Apache Ambari项目源自Apache官方,其官方网站为http://ambari.apache.org/。 - **目标**: 通过开发一系列工具简化Hadoop集群的配置、监控及管理过程。...
- **HCatalog:** 提供了对Hadoop数据的表格视图,简化了对Hadoop数据的管理。 - **Pig:** 一种高级数据分析工具,用于编写简单脚本来处理大型数据集。 - **Hive:** 一种数据仓库基础设施,提供了SQL-like查询语言...