refer to:http://www.cnblogs.com/ggjucheng/archive/2013/02/01/2888051.html
在用Hive进行ETL的时候,对于一些复杂的数据处理逻辑,往往不能用简单的HQL来解决,这个时候就需要使用UDAF了。
对于底层的内容还没有细看,先从应用的角度来说一下吧。
使用UDAF需要实现接口GenericUDAFResolver2,或者继承抽象类AbstractGenericUDAFResolver。
UDAF主要分为2个部分,第一个部分是对传入参数进行校验,数据类型的校验。然后根据传入的数据类型不同调用具体的处理逻辑。
比如说,自己写了一个SUM,SUM对于Long类型和Double类型进行求和,没有问题。
但是,如果传入的参数是一个Array呢?这个时候,就需要在Evaluator方法里面,对参数进行校验了。
public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException { if (parameters.length != 1) { throw new UDFArgumentTypeException(parameters.length - 1, "Exactly one argument is expected."); } if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) { throw new UDFArgumentTypeException(0, "Only primitive type arguments are accepted but " + parameters[0].getTypeName() + " is passed."); } switch (((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory()) { case BYTE: case SHORT: case INT: case LONG: case FLOAT: case DOUBLE: case STRING: case TIMESTAMP: return new GenericUDAFAverageEvaluator(); case BOOLEAN: default: throw new UDFArgumentTypeException(0, "Only numeric or string type arguments are accepted but " + parameters[0].getTypeName() + " is passed."); } }
这个方法只支持Primitive类型,也就是INT,String,Double,Float这些。
UDAF使用一个ObjectInspector来抽象化每一行数据的读取。
上面使用的Primitive类型的数据,所以使用PrimitiveObjectInspector来读取传入的参数。
UDAF会根据不同的计算模型,产生不同的阶段。
如:SUM()聚合函数,接受一个原始类型的整型数值,然后创建一个整型的PARTIAL数据,
返回一个固定的整型结果。
如:median() 中位数
可以接受原始整型输入,然后会产生一个中间的整数PARTIAL数据(排序),
然后再返回一个固定的整型结果。
注意:
聚合操作会在reduce的环境下执行,然后由一个Java进程的内存大小限制这个操作。 因此像排序大结构体的数据,可能会产生对内存不足的异常。 一般情况下可以增加内存来解决这个问题。 <property> <name>mapred.child.java.opts</name> <value>-Xmx200m</value> </property>
在处理逻辑之前,介绍一下UDAF的Mode。
UDAF的Mode,也就是执行阶段。无论怎样的UDAF,最终都会变成MapReduce Job。
Mode是一UDAF的使用类型,主要有4种形势:
因为MapReduce可能是,Map->Reduce也可能是,Map->Reduce->Reduce
public static enum Mode { /** * PARTIAL1: 这个是mapreduce的map阶段:从原始数据到部分数据聚合 * 将会调用iterate()和terminatePartial() */ PARTIAL1, /** * PARTIAL2: 这个是mapreduce的map端的Combiner阶段,负责在map端合并map的数据::从部分数据聚合到部分数据聚合: * 将会调用merge() 和 terminatePartial() */ PARTIAL2, /** * FINAL: mapreduce的reduce阶段:从部分数据的聚合到完全聚合 * 将会调用merge()和terminate() */ FINAL, /** * COMPLETE: 如果出现了这个阶段,表示mapreduce只有map,没有reduce,所以map端就直接出结果了:从原始数据直接到完全聚合 * 将会调用 iterate()和terminate() */ COMPLETE };
有的UDAF函数会可以像UDF函数那样使用,有的必须在聚合函数环境下使用,如group by,over(partition by )
而在使用UDAF进行计算的时候,会启用一个init方法。这个init的方法会在买个阶段前面都启动一次。第一次启动的时候,参数指的是读入每一行记录的参数。第二次启动的时候,传入的参数只有1个,指的是中间结果的参数。这里需要特别注意。
@Override public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException { super.init(m, parameters); //init input if (m == Mode.PARTIAL1 || m == Mode.COMPLETE){ //必须得有 LOG.info(" Mode:"+m.toString()+" result has init"); inputOI = (PrimitiveObjectInspector) parameters[0]; inputOI2 = (PrimitiveObjectInspector) parameters[1]; // result = new DoubleWritable(0); } //init output if (m == Mode.PARTIAL2 || m == Mode.FINAL) { outputOI = (PrimitiveObjectInspector) parameters[0]; result = new DoubleWritable(0); return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector; }else{ result = new DoubleWritable(0); return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector; } }
所以我们使用枚举方法,根据init启动阶段的不同,接入不同的参数。
实现UDAF的时候,实际就是一个Reducer
对于计算过程中的中间结果,会有一个Buffer对象来进行缓冲。
Buffer对象相当于Reducer里面记录结果集的一个内存对象。
这里面可以大大的发挥想象,作出你想要的各种数据类型。
另外,在UDAF输出的时候,也可以输出Struct,Array类型的数据。
这一部分等到用到再进行研究吧。
最后是完整的UDAF代码。实现一个有条件的SUM,传入2个参数,当第二个参数>1 的时候进行SUM。
package com.test.udaf; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.util.StringUtils; public class GenericUdafMemberLevel2 extends AbstractGenericUDAFResolver { private static final Log LOG = LogFactory .getLog(GenericUdafMemberLevel2.class.getName()); @Override public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException { return new GenericUdafMeberLevelEvaluator(); } public static class GenericUdafMeberLevelEvaluator extends GenericUDAFEvaluator { private PrimitiveObjectInspector inputOI; private PrimitiveObjectInspector inputOI2; private PrimitiveObjectInspector outputOI; private DoubleWritable result; @Override public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException { super.init(m, parameters); //init input if (m == Mode.PARTIAL1 || m == Mode.COMPLETE){ //必须得有 LOG.info(" Mode:"+m.toString()+" result has init"); inputOI = (PrimitiveObjectInspector) parameters[0]; inputOI2 = (PrimitiveObjectInspector) parameters[1]; // result = new DoubleWritable(0); } //init output if (m == Mode.PARTIAL2 || m == Mode.FINAL) { outputOI = (PrimitiveObjectInspector) parameters[0]; result = new DoubleWritable(0); return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector; }else{ result = new DoubleWritable(0); return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector; } } /** class for storing count value. */ static class SumAgg implements AggregationBuffer { boolean empty; double value; } @Override //创建新的聚合计算的需要的内存,用来存储mapper,combiner,reducer运算过程中的相加总和。 //使用buffer对象前,先进行内存的清空——reset public AggregationBuffer getNewAggregationBuffer() throws HiveException { SumAgg buffer = new SumAgg(); reset(buffer); return buffer; } @Override //重置为0 //mapreduce支持mapper和reducer的重用,所以为了兼容,也需要做内存的重用。 public void reset(AggregationBuffer agg) throws HiveException { ((SumAgg) agg).value = 0.0; ((SumAgg) agg).empty = true; } private boolean warned = false; //迭代 //只要把保存当前和的对象agg,再加上输入的参数,就可以了。 @Override public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException { // parameters == null means the input table/split is empty if (parameters == null) { return; } try { double flag = PrimitiveObjectInspectorUtils.getDouble(parameters[1], inputOI2); if(flag > 1.0) //参数条件 merge(agg, parameters[0]); //这里将迭代数据放入combiner进行合并 } catch (NumberFormatException e) { if (!warned) { warned = true; LOG.warn(getClass().getSimpleName() + " " + StringUtils.stringifyException(e)); } } } @Override //这里的操作就是具体的聚合操作。 public void merge(AggregationBuffer agg, Object partial) { if (partial != null) { // 通过ObejctInspector取每一个字段的数据 if (inputOI != null) { double p = PrimitiveObjectInspectorUtils.getDouble(partial, inputOI); LOG.info("add up 1:" + p); ((SumAgg) agg).value += p; } else { double p = PrimitiveObjectInspectorUtils.getDouble(partial, outputOI); LOG.info("add up 2:" + p); ((SumAgg) agg).value += p; } } } @Override public Object terminatePartial(AggregationBuffer agg) { return terminate(agg); } @Override public Object terminate(AggregationBuffer agg){ SumAgg myagg = (SumAgg) agg; result.set(myagg.value); return result; } } }
在使用Hive的UDAF,需要使用ADD JAR语句,将UDAF方程上传到Hadoop Distributed Cache,让每一个DataNode都能共享到这个jar包。
然后才进行调用
hive> add jar /home/daxingyu930/test_sum.jar; hive> drop temporary function sum_test; hive> create temporary function sum_test as 'com.test.udaf.GenericUdafMemberLevel'; hive> create temporary function sum_test as 'com.test.udaf.GenericUdafMemberLevel2'; hive> select sum_test(height,2.0) from student_height;
附录:关于UDAF流程介绍
init 当实例化UDAF evaluator的时候执行。
getNewAggregationBuffer 返回一个对象用来保存临时的聚合结果集。
iterate 将一条新的数据处理放到聚合内存块中(aggregation buffer)
terminateParital 返回现有的聚合好的一个持久化的路径,相当于数据对象。这些数据可以通过Hive的数据类型可来访问,这个数据对象可以被Java理解,如Integer,String,或者是Array,Map这种。
相当于第二次MapReduce的map阶段。
merge 将partital数据(分区汇总的数据),于terminateParital数据融合在一起
terminate 返回一个最终的数据聚合结果,是一个结果,或者是一个结果集。
在init阶段,hive会自动检测最终生成的object inspector。
并获取使用聚合函数所处的mode。
iterate和 terminalPartial 都是在map阶段
而terminate和merge 都是在reduce阶段。
merge则用来聚合结果集
注意,无论使用UDF和UDAF,尽可能少地使用new关键字,可以使用静态类。
这样可以减少JVM的GC操作,提高效率。
相关推荐
- Hive UDFs 分为三类:UDF(单行函数),UDAF(聚合函数)和 UDTF(多行函数)。`hive-udf-collections` 主要关注 UDF。 - UDF 允许用户扩展 Hive 的功能,解决内置函数无法满足的特定需求。 - UDFs 必须用 Java...
Hive 基本概念 Hive 应用场景。 Hive 与hadoop的关系。 Hive 与传统数据库对比。 Hive 的数据存储机制。 Hive 基本操作 ...Hive 中的DDL操作。...Hive UDF/UDAF开发实例。 Hive 执行过程分析及优化策略
总之,《Hive编程指南-2013.12》详细阐述了如何使用Hive进行大数据分析,无论是初学者还是经验丰富的开发人员,都能从中获取宝贵的知识和实践经验。通过学习这份指南,读者可以掌握Hive的基本操作,并了解如何利用其...
《设计开发 Hive 编程指南 完整版》是一份详尽的教程,旨在帮助开发者深入理解和高效使用 Apache Hive 进行大数据处理。Hive 是一个基于 Hadoop 的数据仓库工具,可将结构化的数据文件映射为一张数据库表,并提供 ...
- **可扩展性**:Hive支持自定义函数,如用户定义函数(UDF)、用户定义聚合函数(UDAF)和用户定义表生成函数(UDTF),这些函数允许用户在MapReduce作业中执行特定的计算任务。 - **存储格式灵活性**:Hive支持多种存储...
6. **Hive的高级特性**:讨论UDF(用户自定义函数)、UDAF(用户自定义聚合函数)和UDTF(用户自定义表生成函数)的开发和使用,以及Hive与Spark、Tez等执行引擎的集成。 7. **Hive的分布式计算模型**:解释Hive...
5. **高级特性**:可能会涉及UDF(用户定义函数)、UDAF(用户定义聚合函数)和UDTF(用户定义表生成函数)的开发和使用,这些允许用户自定义函数来扩展Hive的功能。 6. **性能优化**:如何通过分区、桶、缓存等...
在大数据处理领域,Apache Hive 是一个非常重要的工具,它提供了SQL-like 的查询语言,使得...记得查阅官方文档(https://cwiki.apache.org/confluence/display/Hive/HivePlugins),以获取最新的函数信息和开发指南。
《Spark SQL大数据实例开发教程》是一本专注于Spark SQL学习的指南,由王家林和祝茂农等人编著。本书旨在帮助企业级开发人员深入理解和掌握Spark SQL,它在Spark生态系统中扮演着至关重要的角色,是处理大规模数据的...
1.8.2 编写UDAF 第13章 HBase 2.1 HBasics 2.1.1 背景 2.2 概念 2.2.1 数据模型的“旋风之旅” 2.2.2 实现 2.3 安装 2.3.1 测试驱动 2.4 客户机 2.4.1 Java 2.4.2 Avro,REST,以及Thrift ...
- **用户自定义函数(UDF和UDAF)的开发与演示**:指导如何开发UDF和UDAF,并提供示例代码。 - **Hive优化**:分享Hive性能优化的策略和技术。 #### 六、数据迁移工具Sqoop - **Sqoop简介和配置**:概述Sqoop的作用...
描述中提到的“HADOOP用户开发手册”表明这是一份面向开发者的指南,帮助他们理解和使用Hadoop平台,特别是基于Hadoop的Hive开发架构,暗示了内容可能涵盖了Hive的查询语言、数据处理和分析等。标签提到了“单片机...
1.8.2 编写UDAF 第13章 HBase 2.1 HBasics 2.1.1 背景 2.2 概念 2.2.1 数据模型的"旋风之旅" 2.2.2 实现 2.3 安装 2.3.1 测试驱动 2.4 客户机 2.4.1 Java 2.4.2 Avro,REST,以及Thrift 2.5 示例 2.5.1 模式 2.5.2 ...