`
zhangxiong0301
  • 浏览: 360710 次
社区版块
存档分类
最新评论

hive udaf开发入门和运行过程详解(2)

    博客分类:
  • HIVE
阅读更多

在用Hive进行ETL的时候,对于一些复杂的数据处理逻辑,往往不能用简单的HQL来解决,这个时候就需要使用UDAF了。

 

对于底层的内容还没有细看,先从应用的角度来说一下吧。

使用UDAF需要实现接口GenericUDAFResolver2,或者继承抽象类AbstractGenericUDAFResolver。

 

UDAF主要分为2个部分,第一个部分是对传入参数进行校验,数据类型的校验。然后根据传入的数据类型不同调用具体的处理逻辑。

比如说,自己写了一个SUM,SUM对于Long类型和Double类型进行求和,没有问题。

但是,如果传入的参数是一个Array呢?这个时候,就需要在Evaluator方法里面,对参数进行校验了。

Java代码  收藏代码
  1. public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)  
  2.       throws SemanticException {  
  3.     if (parameters.length != 1) {  
  4.       throw new UDFArgumentTypeException(parameters.length - 1,  
  5.           "Exactly one argument is expected.");  
  6.     }  
  7.   
  8.     if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {  
  9.       throw new UDFArgumentTypeException(0,  
  10.           "Only primitive type arguments are accepted but "  
  11.           + parameters[0].getTypeName() + " is passed.");  
  12.     }  
  13.     switch (((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory()) {  
  14.     case BYTE:  
  15.     case SHORT:  
  16.     case INT:  
  17.     case LONG:  
  18.     case FLOAT:  
  19.     case DOUBLE:  
  20.     case STRING:  
  21.     case TIMESTAMP:  
  22.       return new GenericUDAFAverageEvaluator();  
  23.     case BOOLEAN:  
  24.     default:  
  25.       throw new UDFArgumentTypeException(0,  
  26.           "Only numeric or string type arguments are accepted but "  
  27.           + parameters[0].getTypeName() + " is passed.");  
  28.     }  
  29.   }  

 

这个方法只支持Primitive类型,也就是INT,String,Double,Float这些。

 

UDAF使用一个ObjectInspector来抽象化每一行数据的读取。

上面使用的Primitive类型的数据,所以使用PrimitiveObjectInspector来读取传入的参数。

 

UDAF会根据不同的计算模型,产生不同的阶段。

如:SUM()聚合函数,接受一个原始类型的整型数值,然后创建一个整型的PARTIAL数据,
返回一个固定的整型结果。

如:median() 中位数
可以接受原始整型输入,然后会产生一个中间的整数PARTIAL数据(排序),
然后再返回一个固定的整型结果。

 

注意:

Java代码  收藏代码
  1. 聚合操作会在reduce的环境下执行,然后由一个Java进程的内存大小限制这个操作。  
  2. 因此像排序大结构体的数据,可能会产生对内存不足的异常。  
  3. 一般情况下可以增加内存来解决这个问题。  
  4. <property>  
  5. <name>mapred.child.java.opts</name>  
  6. <value>-Xmx200m</value>  
  7. </property>  

  

在处理逻辑之前,介绍一下UDAF的Mode。

UDAF的Mode,也就是执行阶段。无论怎样的UDAF,最终都会变成MapReduce Job。

Mode是一UDAF的使用类型,主要有4种形势:

因为MapReduce可能是,Map->Reduce也可能是,Map->Reduce->Reduce

Java代码  收藏代码
  1. public static enum Mode {  
  2.     /** 
  3.      * PARTIAL1: 这个是mapreduce的map阶段:从原始数据到部分数据聚合 
  4.      * 将会调用iterate()和terminatePartial() 
  5.      */  
  6.     PARTIAL1,  
  7.         /** 
  8.      * PARTIAL2: 这个是mapreduce的map端的Combiner阶段,负责在map端合并map的数据::从部分数据聚合到部分数据聚合: 
  9.      * 将会调用merge() 和 terminatePartial()  
  10.      */  
  11.     PARTIAL2,  
  12.         /** 
  13.      * FINAL: mapreduce的reduce阶段:从部分数据的聚合到完全聚合  
  14.      * 将会调用merge()和terminate() 
  15.      */  
  16.     FINAL,  
  17.         /** 
  18.      * COMPLETE: 如果出现了这个阶段,表示mapreduce只有map,没有reduce,所以map端就直接出结果了:从原始数据直接到完全聚合 
  19.       * 将会调用 iterate()和terminate() 
  20.      */  
  21.     COMPLETE  
  22. };  

 

 

有的UDAF函数会可以像UDF函数那样使用,有的必须在聚合函数环境下使用,如group by,over(partition by )

而在使用UDAF进行计算的时候,会启用一个init方法。这个init的方法会在买个阶段前面都启动一次。第一次启动的时候,参数指的是读入每一行记录的参数。第二次启动的时候,传入的参数只有1个,指的是中间结果的参数。这里需要特别注意。

Java代码  收藏代码
  1. @Override  
  2.         public ObjectInspector init(Mode m, ObjectInspector[] parameters)  
  3.                 throws HiveException {  
  4.             super.init(m, parameters);  
  5.               
  6.             //init input  
  7.             if (m == Mode.PARTIAL1 || m == Mode.COMPLETE){ //必须得有  
  8.                 LOG.info(" Mode:"+m.toString()+" result has init");  
  9.                 inputOI = (PrimitiveObjectInspector) parameters[0];  
  10.                 inputOI2 = (PrimitiveObjectInspector) parameters[1];  
  11. //              result = new DoubleWritable(0);  
  12.             }  
  13.             //init output  
  14.             if (m == Mode.PARTIAL2 || m == Mode.FINAL) {  
  15.                 outputOI = (PrimitiveObjectInspector) parameters[0];  
  16.                 result = new DoubleWritable(0);  
  17.                 return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;  
  18.             }else{  
  19.                 result = new DoubleWritable(0);  
  20.                 return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;  
  21.             }   
  22.               
  23.         }  

 所以我们使用枚举方法,根据init启动阶段的不同,接入不同的参数。

 

实现UDAF的时候,实际就是一个Reducer

 

对于计算过程中的中间结果,会有一个Buffer对象来进行缓冲。

Buffer对象相当于Reducer里面记录结果集的一个内存对象。

 

这里面可以大大的发挥想象,作出你想要的各种数据类型。

另外,在UDAF输出的时候,也可以输出Struct,Array类型的数据。

这一部分等到用到再进行研究吧。

 

最后是完整的UDAF代码。实现一个有条件的SUM,传入2个参数,当第二个参数>1 的时候进行SUM。

 

Java代码  收藏代码
  1. package com.test.udaf;  
  2.   
  3. import org.apache.commons.logging.Log;  
  4. import org.apache.commons.logging.LogFactory;  
  5. import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;  
  6. import org.apache.hadoop.hive.ql.metadata.HiveException;  
  7. import org.apache.hadoop.hive.ql.parse.SemanticException;  
  8. import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;  
  9. import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;  
  10. import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;  
  11. import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;  
  12. import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;  
  13. import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;  
  14. import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;  
  15. import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;  
  16. import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;  
  17. import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;  
  18. import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;  
  19. import org.apache.hadoop.hive.serde2.io.DoubleWritable;  
  20. import org.apache.hadoop.util.StringUtils;  
  21.   
  22. public class GenericUdafMemberLevel2 extends AbstractGenericUDAFResolver {  
  23.     private static final Log LOG = LogFactory  
  24.             .getLog(GenericUdafMemberLevel2.class.getName());  
  25.       
  26.     @Override  
  27.       public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)  
  28.         throws SemanticException {  
  29.           
  30.         return new GenericUdafMeberLevelEvaluator();  
  31.       }  
  32.       
  33.     public static class GenericUdafMeberLevelEvaluator extends GenericUDAFEvaluator {  
  34.         private PrimitiveObjectInspector inputOI;  
  35.         private PrimitiveObjectInspector inputOI2;  
  36.         private PrimitiveObjectInspector outputOI;  
  37.         private DoubleWritable result;  
  38.   
  39.         @Override  
  40.         public ObjectInspector init(Mode m, ObjectInspector[] parameters)  
  41.                 throws HiveException {  
  42.             super.init(m, parameters);  
  43.               
  44.             //init input  
  45.             if (m == Mode.PARTIAL1 || m == Mode.COMPLETE){ //必须得有  
  46.                 LOG.info(" Mode:"+m.toString()+" result has init");  
  47.                 inputOI = (PrimitiveObjectInspector) parameters[0];  
  48.                 inputOI2 = (PrimitiveObjectInspector) parameters[1];  
  49. //              result = new DoubleWritable(0);  
  50.             }  
  51.             //init output  
  52.             if (m == Mode.PARTIAL2 || m == Mode.FINAL) {  
  53.                 outputOI = (PrimitiveObjectInspector) parameters[0];  
  54.                 result = new DoubleWritable(0);  
  55.                 return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;  
  56.             }else{  
  57.                 result = new DoubleWritable(0);  
  58.                 return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;  
  59.             }   
  60.               
  61.         }  
  62.   
  63.         /** class for storing count value. */  
  64.         static class SumAgg implements AggregationBuffer {  
  65.             boolean empty;  
  66.             double value;  
  67.         }  
  68.   
  69.         @Override  
  70.         //创建新的聚合计算的需要的内存,用来存储mapper,combiner,reducer运算过程中的相加总和。  
  71.         //使用buffer对象前,先进行内存的清空——reset  
  72.         public AggregationBuffer getNewAggregationBuffer() throws HiveException {  
  73.             SumAgg buffer = new SumAgg();  
  74.             reset(buffer);  
  75.             return buffer;  
  76.         }  
  77.   
  78.         @Override  
  79.         //重置为0  
  80.         //mapreduce支持mapper和reducer的重用,所以为了兼容,也需要做内存的重用。  
  81.         public void reset(AggregationBuffer agg) throws HiveException {  
  82.             ((SumAgg) agg).value = 0.0;  
  83.             ((SumAgg) agg).empty = true;  
  84.         }  
  85.   
  86.         private boolean warned = false;  
  87.         //迭代  
  88.         //只要把保存当前和的对象agg,再加上输入的参数,就可以了。  
  89.         @Override  
  90.         public void iterate(AggregationBuffer agg, Object[] parameters)  
  91.                 throws HiveException {  
  92.             // parameters == null means the input table/split is empty  
  93.             if (parameters == null) {  
  94.                 return;  
  95.             }  
  96.             try {  
  97.                 double flag = PrimitiveObjectInspectorUtils.getDouble(parameters[1], inputOI2);  
  98.                 if(flag > 1.0)   //参数条件  
  99.                     merge(agg, parameters[0]);   //这里将迭代数据放入combiner进行合并  
  100.               } catch (NumberFormatException e) {  
  101.                 if (!warned) {  
  102.                   warned = true;  
  103.                   LOG.warn(getClass().getSimpleName() + " "  
  104.                       + StringUtils.stringifyException(e));  
  105.                 }  
  106.               }  
  107.   
  108.         }  
  109.   
  110.         @Override  
  111.         //这里的操作就是具体的聚合操作。  
  112.         public void merge(AggregationBuffer agg, Object partial) {  
  113.             if (partial != null) {  
  114.                 // 通过ObejctInspector取每一个字段的数据  
  115.                 if (inputOI != null) {  
  116.                     double p = PrimitiveObjectInspectorUtils.getDouble(partial,  
  117.                             inputOI);  
  118.                     LOG.info("add up 1:" + p);  
  119.                     ((SumAgg) agg).value += p;  
  120.                 } else {  
  121.                     double p = PrimitiveObjectInspectorUtils.getDouble(partial,  
  122.                             outputOI);  
  123.                     LOG.info("add up 2:" + p);  
  124.                     ((SumAgg) agg).value += p;  
  125.                 }  
  126.             }  
  127.         }  
  128.   
  129.   
  130.         @Override  
  131.         public Object terminatePartial(AggregationBuffer agg) {  
  132.                 return terminate(agg);  
  133.         }  
  134.           
  135.         @Override  
  136.         public Object terminate(AggregationBuffer agg){  
  137.             SumAgg myagg = (SumAgg) agg;  
  138.             result.set(myagg.value);  
  139.             return result;  
  140.         }  
  141.     }  
  142. }  

 

 在使用Hive的UDAF,需要使用ADD JAR语句,将UDAF方程上传到Hadoop Distributed Cache,让每一个DataNode都能共享到这个jar包。

然后才进行调用

Shell代码  收藏代码
  1. hive> add jar /home/daxingyu930/test_sum.jar;  
  2. hive> drop temporary function sum_test;  
  3. hive> create temporary function sum_test as 'com.test.udaf.GenericUdafMemberLevel';  
  4.   
  5. hive> create temporary function sum_test as 'com.test.udaf.GenericUdafMemberLevel2';  
  6.   
  7. hive> select sum_test(height,2.0) from student_height;  

 

附录:关于UDAF流程介绍 

init  当实例化UDAF evaluator的时候执行。
getNewAggregationBuffer  返回一个对象用来保存临时的聚合结果集。
iterate  将一条新的数据处理放到聚合内存块中(aggregation buffer)
terminateParital  返回现有的聚合好的一个持久化的路径,相当于数据对象。这些数据可以通过Hive的数据类型可来访问,这个数据对象可以被Java理解,如Integer,String,或者是Array,Map这种。
相当于第二次MapReduce的map阶段。
merge   将partital数据(分区汇总的数据),于terminateParital数据融合在一起
terminate 返回一个最终的数据聚合结果,是一个结果,或者是一个结果集。

在init阶段,hive会自动检测最终生成的object inspector。
并获取使用聚合函数所处的mode。

iterate和 terminalPartial 都是在map阶段
而terminate和merge 都是在reduce阶段。

merge则用来聚合结果集

注意,无论使用UDF和UDAF,尽可能少地使用new关键字,可以使用静态类。
这样可以减少JVM的GC操作,提高效率。

 

分享到:
评论

相关推荐

    hive udaf 实现按位取与或

    1. 定义UDAF类:创建一个Java类继承Hive提供的抽象类`GenericUDAFResolver2`,并实现`init()`、`iterate()`、`terminatePartial()`、`merge()`和`terminate()`等方法。 2. 初始化(init()):初始化UDAF的状态,例如...

    Hive UDAF示例

    A custom UDAF to group oncatenates all arguments from different rows into a single string.

    hive入门级详解

    Hive 入门级详解 Hive 是一个基于 Hadoop 的数据仓库系统,它提供了一个类似于关系型数据库的查询语言 HQL,并且可以将查询转换为 MapReduce 任务来执行。Hive 的存储结构主要包括三个层面:数据存储层、计算资源层...

    Hive SQL 编译过程详解

    理解Hive SQL的编译过程对于解决Hive的问题、优化SQL查询和定制功能至关重要。通过对MapReduce实现SQL操作原理的深入理解,我们可以更好地掌握Hive的工作机制,从而提高数据分析的效率和准确性。在日常工作中,这样...

    HIVE从入门到精通.pdf

    ### HIVE从入门到精通知识点概述 #### 一、Hive简介 - **背景与需求**:随着商业智能领域数据量的急剧增加,传统的数据仓库解决方案成本高昂,难以满足需求。Hadoop作为一种流行且开源的MapReduce实现,在Yahoo、...

    Hive入门与实战 PDF

    Hive入门与实战 PDF

    HIVE安装及详解

    "HIVE安装及详解" HIVE是一种基于Hadoop的数据仓库工具,主要用于处理和分析大规模数据。下面是关于HIVE的安装及详解。 HIVE基本概念 HIVE是什么?HIVE是一种数据仓库工具,主要用于处理和分析大规模数据。它将...

    Hive 入门级编程全案例详解

    Hive 入门级编程实例详解。涵盖了各类基础函数使用要点以及 Java 编写 Hive 函数等。

    hive:个人配置单元 UDAF

    个人 Hive UDAF 有一堆 Hive UDAF(用户定义的聚合函数)不在标准 Hive 分布中,因为它们可能会导致大型数据集的 OOM。 要使用它们,您需要加载 jar 文件,然后为每个要使用的函数创建一个临时函数: ADD JAR target...

    Hive开发规范及要点

    在Hive开发中,遵循一定的规范和要点非常重要,以下是Hive开发规范及要点: 一、基本操作 1. 模糊搜索表:使用`show tables like '*name*';`语句可以搜索包含某个关键字的表名。 2. 查看表结构信息:使用`desc ...

    hive常用的开发规范

    这些规范旨在提高 Hive 开发的标准化、可维护性和系统安全性。遵循这些规范可以确保数据的有序管理,同时降低因为不规范操作导致的问题。在实际工作中,还需要根据具体项目需求和团队协作模式进行适当的调整和优化。

    Hive入门与实战

    Hive入门与实战

    hive编程入门课程

    ### Hive编程入门课程知识点详解 #### 一、Hive概览 Hive是建立在Hadoop之上的数据仓库基础设施,由Facebook赞助开发。它通过提供SQL-like查询语言(HiveQL),使用户能够轻松地对存储在Hadoop分布式文件系统(HDFS...

    Hive从入门到精通资源.zip

    本资源包“Hive从入门到精通资源.zip”包含了学习Hive所需的基本资料和依赖库,旨在帮助初学者快速掌握Hive的使用,并逐步晋升为高级用户。 1. **Hive简介** Hive是Facebook开源的一款基于Hadoop的数据仓库工具,...

    hive hadoop 开发手册

    总的来说,这个压缩包提供的资料对于学习和开发Hadoop及Hive项目非常有价值。通过阅读和理解这些文档,开发者能够掌握如何在Hadoop平台上构建高效的数据仓库系统,利用Hive进行大数据分析,以及遵循HQL的最佳实践来...

    hive开发中常遇到的坑

    ### hive开发中常遇到的坑 ...在Hive开发过程中,正确理解和处理这些问题对于保证数据处理的准确性和效率至关重要。通过对以上几个方面的学习,可以帮助开发者更好地应对日常开发中可能遇到的各种挑战。

    Hive UDF开发

    ### Hive UDF开发详解 #### 一、引言 在大数据处理领域,Apache Hive作为一款广泛使用的数据仓库工具,能够高效地对存储在Hadoop文件系统中的数据进行查询与管理。然而,对于某些特定的数据处理需求,Hive内置的...

    最强HiveSQL开发指南.pdf

    《最强HiveSQL开发指南》是一本专注于Hive性能调优和实战操作的教程,旨在帮助读者深入理解和熟练运用Hive进行大数据处理。Hive作为Apache Hadoop生态系统中的一个组件,主要用于处理和分析大规模分布式存储的数据。...

Global site tag (gtag.js) - Google Analytics