`
dacoolbaby
  • 浏览: 1268421 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

Hive-UDAF开发指南

    博客分类:
  • Hive
阅读更多

refer to:http://www.cnblogs.com/ggjucheng/archive/2013/02/01/2888051.html

 

在用Hive进行ETL的时候,对于一些复杂的数据处理逻辑,往往不能用简单的HQL来解决,这个时候就需要使用UDAF了。

 

对于底层的内容还没有细看,先从应用的角度来说一下吧。

使用UDAF需要实现接口GenericUDAFResolver2,或者继承抽象类AbstractGenericUDAFResolver。

 

UDAF主要分为2个部分,第一个部分是对传入参数进行校验,数据类型的校验。然后根据传入的数据类型不同调用具体的处理逻辑。

比如说,自己写了一个SUM,SUM对于Long类型和Double类型进行求和,没有问题。

但是,如果传入的参数是一个Array呢?这个时候,就需要在Evaluator方法里面,对参数进行校验了。

public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
      throws SemanticException {
    if (parameters.length != 1) {
      throw new UDFArgumentTypeException(parameters.length - 1,
          "Exactly one argument is expected.");
    }

    if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
      throw new UDFArgumentTypeException(0,
          "Only primitive type arguments are accepted but "
          + parameters[0].getTypeName() + " is passed.");
    }
    switch (((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory()) {
    case BYTE:
    case SHORT:
    case INT:
    case LONG:
    case FLOAT:
    case DOUBLE:
    case STRING:
    case TIMESTAMP:
      return new GenericUDAFAverageEvaluator();
    case BOOLEAN:
    default:
      throw new UDFArgumentTypeException(0,
          "Only numeric or string type arguments are accepted but "
          + parameters[0].getTypeName() + " is passed.");
    }
  }

 

这个方法只支持Primitive类型,也就是INT,String,Double,Float这些。

 

UDAF使用一个ObjectInspector来抽象化每一行数据的读取。

上面使用的Primitive类型的数据,所以使用PrimitiveObjectInspector来读取传入的参数。

 

UDAF会根据不同的计算模型,产生不同的阶段。

如:SUM()聚合函数,接受一个原始类型的整型数值,然后创建一个整型的PARTIAL数据,
返回一个固定的整型结果。

如:median() 中位数
可以接受原始整型输入,然后会产生一个中间的整数PARTIAL数据(排序),
然后再返回一个固定的整型结果。

 

注意:

聚合操作会在reduce的环境下执行,然后由一个Java进程的内存大小限制这个操作。
因此像排序大结构体的数据,可能会产生对内存不足的异常。
一般情况下可以增加内存来解决这个问题。
<property>
<name>mapred.child.java.opts</name>
<value>-Xmx200m</value>
</property>

  

在处理逻辑之前,介绍一下UDAF的Mode。

UDAF的Mode,也就是执行阶段。无论怎样的UDAF,最终都会变成MapReduce Job。

Mode是一UDAF的使用类型,主要有4种形势:

因为MapReduce可能是,Map->Reduce也可能是,Map->Reduce->Reduce

public static enum Mode {
    /**
     * PARTIAL1: 这个是mapreduce的map阶段:从原始数据到部分数据聚合
     * 将会调用iterate()和terminatePartial()
     */
    PARTIAL1,
        /**
     * PARTIAL2: 这个是mapreduce的map端的Combiner阶段,负责在map端合并map的数据::从部分数据聚合到部分数据聚合:
     * 将会调用merge() 和 terminatePartial() 
     */
    PARTIAL2,
        /**
     * FINAL: mapreduce的reduce阶段:从部分数据的聚合到完全聚合 
     * 将会调用merge()和terminate()
     */
    FINAL,
        /**
     * COMPLETE: 如果出现了这个阶段,表示mapreduce只有map,没有reduce,所以map端就直接出结果了:从原始数据直接到完全聚合
      * 将会调用 iterate()和terminate()
     */
    COMPLETE
};

 

 

有的UDAF函数会可以像UDF函数那样使用,有的必须在聚合函数环境下使用,如group by,over(partition by )

而在使用UDAF进行计算的时候,会启用一个init方法。这个init的方法会在买个阶段前面都启动一次。第一次启动的时候,参数指的是读入每一行记录的参数。第二次启动的时候,传入的参数只有1个,指的是中间结果的参数。这里需要特别注意。

@Override
		public ObjectInspector init(Mode m, ObjectInspector[] parameters)
				throws HiveException {
			super.init(m, parameters);
			
			//init input
			if (m == Mode.PARTIAL1 || m == Mode.COMPLETE){ //必须得有
				LOG.info(" Mode:"+m.toString()+" result has init");
				inputOI = (PrimitiveObjectInspector) parameters[0];
				inputOI2 = (PrimitiveObjectInspector) parameters[1];
//				result = new DoubleWritable(0);
			}
			//init output
			if (m == Mode.PARTIAL2 || m == Mode.FINAL) {
				outputOI = (PrimitiveObjectInspector) parameters[0];
				result = new DoubleWritable(0);
				return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
			}else{
				result = new DoubleWritable(0);
				return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
			} 
			
		}

 所以我们使用枚举方法,根据init启动阶段的不同,接入不同的参数。

 

实现UDAF的时候,实际就是一个Reducer

 

对于计算过程中的中间结果,会有一个Buffer对象来进行缓冲。

Buffer对象相当于Reducer里面记录结果集的一个内存对象。

 

这里面可以大大的发挥想象,作出你想要的各种数据类型。

另外,在UDAF输出的时候,也可以输出Struct,Array类型的数据。

这一部分等到用到再进行研究吧。

 

最后是完整的UDAF代码。实现一个有条件的SUM,传入2个参数,当第二个参数>1 的时候进行SUM。

 

package com.test.udaf;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.util.StringUtils;

public class GenericUdafMemberLevel2 extends AbstractGenericUDAFResolver {
	private static final Log LOG = LogFactory
			.getLog(GenericUdafMemberLevel2.class.getName());
	
	@Override
	  public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
	    throws SemanticException {
	    
	    return new GenericUdafMeberLevelEvaluator();
	  }
	
	public static class GenericUdafMeberLevelEvaluator extends GenericUDAFEvaluator {
		private PrimitiveObjectInspector inputOI;
		private PrimitiveObjectInspector inputOI2;
		private PrimitiveObjectInspector outputOI;
		private DoubleWritable result;

		@Override
		public ObjectInspector init(Mode m, ObjectInspector[] parameters)
				throws HiveException {
			super.init(m, parameters);
			
			//init input
			if (m == Mode.PARTIAL1 || m == Mode.COMPLETE){ //必须得有
				LOG.info(" Mode:"+m.toString()+" result has init");
				inputOI = (PrimitiveObjectInspector) parameters[0];
				inputOI2 = (PrimitiveObjectInspector) parameters[1];
//				result = new DoubleWritable(0);
			}
			//init output
			if (m == Mode.PARTIAL2 || m == Mode.FINAL) {
				outputOI = (PrimitiveObjectInspector) parameters[0];
				result = new DoubleWritable(0);
				return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
			}else{
				result = new DoubleWritable(0);
				return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
			} 
			
		}

		/** class for storing count value. */
		static class SumAgg implements AggregationBuffer {
			boolean empty;
			double value;
		}

		@Override
		//创建新的聚合计算的需要的内存,用来存储mapper,combiner,reducer运算过程中的相加总和。
		//使用buffer对象前,先进行内存的清空——reset
		public AggregationBuffer getNewAggregationBuffer() throws HiveException {
			SumAgg buffer = new SumAgg();
			reset(buffer);
			return buffer;
		}

		@Override
		//重置为0
		//mapreduce支持mapper和reducer的重用,所以为了兼容,也需要做内存的重用。
		public void reset(AggregationBuffer agg) throws HiveException {
			((SumAgg) agg).value = 0.0;
			((SumAgg) agg).empty = true;
		}

		private boolean warned = false;
		//迭代
	    //只要把保存当前和的对象agg,再加上输入的参数,就可以了。
		@Override
		public void iterate(AggregationBuffer agg, Object[] parameters)
				throws HiveException {
			// parameters == null means the input table/split is empty
			if (parameters == null) {
				return;
			}
			try {
				double flag = PrimitiveObjectInspectorUtils.getDouble(parameters[1], inputOI2);
				if(flag > 1.0)   //参数条件
					merge(agg, parameters[0]);   //这里将迭代数据放入combiner进行合并
		      } catch (NumberFormatException e) {
		        if (!warned) {
		          warned = true;
		          LOG.warn(getClass().getSimpleName() + " "
		              + StringUtils.stringifyException(e));
		        }
		      }

		}

		@Override
		//这里的操作就是具体的聚合操作。
		public void merge(AggregationBuffer agg, Object partial) {
			if (partial != null) {
				// 通过ObejctInspector取每一个字段的数据
				if (inputOI != null) {
					double p = PrimitiveObjectInspectorUtils.getDouble(partial,
							inputOI);
					LOG.info("add up 1:" + p);
					((SumAgg) agg).value += p;
				} else {
					double p = PrimitiveObjectInspectorUtils.getDouble(partial,
							outputOI);
					LOG.info("add up 2:" + p);
					((SumAgg) agg).value += p;
				}
			}
		}


		@Override
		public Object terminatePartial(AggregationBuffer agg) {
				return terminate(agg);
		}
		
		@Override
		public Object terminate(AggregationBuffer agg){
			SumAgg myagg = (SumAgg) agg;
			result.set(myagg.value);
			return result;
		}
	}
}

 

 在使用Hive的UDAF,需要使用ADD JAR语句,将UDAF方程上传到Hadoop Distributed Cache,让每一个DataNode都能共享到这个jar包。

然后才进行调用

hive> add jar /home/daxingyu930/test_sum.jar;
hive> drop temporary function sum_test;
hive> create temporary function sum_test as 'com.test.udaf.GenericUdafMemberLevel';

hive> create temporary function sum_test as 'com.test.udaf.GenericUdafMemberLevel2';

hive> select sum_test(height,2.0) from student_height;

 

附录:关于UDAF流程介绍 

init  当实例化UDAF evaluator的时候执行。
getNewAggregationBuffer  返回一个对象用来保存临时的聚合结果集。
iterate  将一条新的数据处理放到聚合内存块中(aggregation buffer)
terminateParital  返回现有的聚合好的一个持久化的路径,相当于数据对象。这些数据可以通过Hive的数据类型可来访问,这个数据对象可以被Java理解,如Integer,String,或者是Array,Map这种。
相当于第二次MapReduce的map阶段。
merge   将partital数据(分区汇总的数据),于terminateParital数据融合在一起
terminate 返回一个最终的数据聚合结果,是一个结果,或者是一个结果集。

在init阶段,hive会自动检测最终生成的object inspector。
并获取使用聚合函数所处的mode。

iterate和 terminalPartial 都是在map阶段
而terminate和merge 都是在reduce阶段。

merge则用来聚合结果集

注意,无论使用UDF和UDAF,尽可能少地使用new关键字,可以使用静态类。
这样可以减少JVM的GC操作,提高效率。

 

  

 

 

 

 

0
0
分享到:
评论
1 楼 秦时明月黑 2013-08-09  
ding

相关推荐

    hive-udf-collections:Hive 用户定义函数 (UDF) 集合

    - Hive UDFs 分为三类:UDF(单行函数),UDAF(聚合函数)和 UDTF(多行函数)。`hive-udf-collections` 主要关注 UDF。 - UDF 允许用户扩展 Hive 的功能,解决内置函数无法满足的特定需求。 - UDFs 必须用 Java...

    《Hive的开发指南》

    Hive 基本概念 Hive 应用场景。 Hive 与hadoop的关系。 Hive 与传统数据库对比。 Hive 的数据存储机制。 Hive 基本操作 ...Hive 中的DDL操作。...Hive UDF/UDAF开发实例。 Hive 执行过程分析及优化策略

    Hive编程指南-2013.12.pdf

    总之,《Hive编程指南-2013.12》详细阐述了如何使用Hive进行大数据分析,无论是初学者还是经验丰富的开发人员,都能从中获取宝贵的知识和实践经验。通过学习这份指南,读者可以掌握Hive的基本操作,并了解如何利用其...

    设计开发 Hive 编程指南 完整版

    《设计开发 Hive 编程指南 完整版》是一份详尽的教程,旨在帮助开发者深入理解和高效使用 Apache Hive 进行大数据处理。Hive 是一个基于 Hadoop 的数据仓库工具,可将结构化的数据文件映射为一张数据库表,并提供 ...

    Hive编程指南 PDF 中文高清版

    - **可扩展性**:Hive支持自定义函数,如用户定义函数(UDF)、用户定义聚合函数(UDAF)和用户定义表生成函数(UDTF),这些函数允许用户在MapReduce作业中执行特定的计算任务。 - **存储格式灵活性**:Hive支持多种存储...

    Hive编程指南

    6. **Hive的高级特性**:讨论UDF(用户自定义函数)、UDAF(用户自定义聚合函数)和UDTF(用户自定义表生成函数)的开发和使用,以及Hive与Spark、Tez等执行引擎的集成。 7. **Hive的分布式计算模型**:解释Hive...

    hive函数大全.7z

    5. **高级特性**:可能会涉及UDF(用户定义函数)、UDAF(用户定义聚合函数)和UDTF(用户定义表生成函数)的开发和使用,这些允许用户自定义函数来扩展Hive的功能。 6. **性能优化**:如何通过分区、桶、缓存等...

    快速学习-Hive函数

    在大数据处理领域,Apache Hive 是一个非常重要的工具,它提供了SQL-like 的查询语言,使得...记得查阅官方文档(https://cwiki.apache.org/confluence/display/Hive/HivePlugins),以获取最新的函数信息和开发指南。

    Spark_SQL大数据实例开发教程.pdf by Spark_SQL大数据实例开发教程.pdf (z-lib.org)1

    《Spark SQL大数据实例开发教程》是一本专注于Spark SQL学习的指南,由王家林和祝茂农等人编著。本书旨在帮助企业级开发人员深入理解和掌握Spark SQL,它在Spark生态系统中扮演着至关重要的角色,是处理大规模数据的...

    Hadoop权威指南 第二版(中文版)

     1.8.2 编写UDAF 第13章 HBase  2.1 HBasics  2.1.1 背景  2.2 概念  2.2.1 数据模型的“旋风之旅”  2.2.2 实现  2.3 安装  2.3.1 测试驱动  2.4 客户机  2.4.1 Java  2.4.2 Avro,REST,以及Thrift  ...

    大数据课程体系.docx

    - **用户自定义函数(UDF和UDAF)的开发与演示**:指导如何开发UDF和UDAF,并提供示例代码。 - **Hive优化**:分享Hive性能优化的策略和技术。 #### 六、数据迁移工具Sqoop - **Sqoop简介和配置**:概述Sqoop的作用...

    Hadoopahive-install.zip_单片机开发_Java_

    描述中提到的“HADOOP用户开发手册”表明这是一份面向开发者的指南,帮助他们理解和使用Hadoop平台,特别是基于Hadoop的Hive开发架构,暗示了内容可能涵盖了Hive的查询语言、数据处理和分析等。标签提到了“单片机...

    Hadoop权威指南(中文版)2015上传.rar

    1.8.2 编写UDAF 第13章 HBase 2.1 HBasics 2.1.1 背景 2.2 概念 2.2.1 数据模型的"旋风之旅" 2.2.2 实现 2.3 安装 2.3.1 测试驱动 2.4 客户机 2.4.1 Java 2.4.2 Avro,REST,以及Thrift 2.5 示例 2.5.1 模式 2.5.2 ...

Global site tag (gtag.js) - Google Analytics