`
hugh.wangp
  • 浏览: 293444 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

HIVE UDF/UDAF/UDTF的Map Reduce代码框架模板

    博客分类:
  • HIVE
阅读更多

 

自己写代码时候的利用到的模板

UDF步骤:
1.必须继承org.apache.hadoop.hive.ql.exec.UDF
2.必须实现evaluate函数,evaluate函数支持重载

package com.alibaba.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDF

public class helloword extends UDF{
     public String evaluate(){
          return "hello world!";
     }

     public String evaluate(String str){
          return "hello world: " + str;
     }
}
 


UDAF步骤:
1.必须继承
     org.apache.hadoop.hive.ql.exec.UDAF(函数类继承)
     org.apache.hadoop.hive.ql.exec.UDAFEvaluator(内部类Evaluator实现UDAFEvaluator接口)
2.Evaluator需要实现 init、iterate、terminatePartial、merge、terminate这几个函数
     init():类似于构造函数,用于UDAF的初始化
     iterate():接收传入的参数,并进行内部的轮转。其返回类型为boolean
     terminatePartial():无参数,其为iterate函数轮转结束后,返回乱转数据,iterate和terminatePartial类似于hadoop的Combiner(iterate--mapper;terminatePartial--reducer)
     merge():接收terminatePartial的返回结果,进行数据merge操作,其返回类型为boolean
     terminate():返回最终的聚集函数结果

package com.alibaba.hive;

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

public class myAVG extends UDAF{

     public static class avgScore{
          private long pSum;
          private double pCount;
     }
     
     public static class AvgEvaluator extends UDAFEvaluator{
          avgScore score;
          
          public AvgEvaluator(){
               score = new avgScore();
               init();
          }
          
          /*
          *init函数类似于构造函数,用于UDAF的初始化
          */
          public void init(){
               score.pSum = 0;
               score.pCount = 0;
          }
          
          /*
          *iterate接收传入的参数,并进行内部的轮转。其返回类型为boolean
          *类似Combiner中的mapper
          */
          public boolean iterate(Double in){
               if(in != null){
                    score.pSum += in;
                    score.pCount ++;
               }
               return true;
          }
          
          /*
          *terminatePartial无参数,其为iterate函数轮转结束后,返回轮转数据
          *类似Combiner中的reducer
          */
          public avgScore terminatePartial(){
               return score.pCount == 0 ? null : score;
          }
          
          /*
          *merge接收terminatePartial的返回结果,进行数据merge操作,其返回类型为boolean
          */
          public boolean merge(avgScore in){
               if(in != null){
                    score.pSum += in.pSum;
                    score.pCount += in.pCount;
               }
               return true;
          }
          
          /*
          *terminate返回最终的聚集函数结果
          */
          public Double terminate(){
               return score.pCount == 0 ? null : Double.valueof(score.pSum/score.pCount);
          }
     }
}
 

UDTF步骤:
1.必须继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
2.实现initialize, process, close三个方法
3.UDTF首先会
     a.调用initialize方法,此方法返回UDTF的返回行的信息(返回个数,类型)
     b.初始化完成后,会调用process方法,对传入的参数进行处理,可以通过forword()方法把结果返回
     c.最后close()方法调用,对需要清理的方法进行清理

public class GenericUDTFExplode extends GenericUDTF {

  private ListObjectInspector listOI = null;

  @Override
  public void close() throws HiveException {
  }

  @Override
  public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
    if (args.length != 1) {
      throw new UDFArgumentException("explode() takes only one argument");
    }

    if (args[0].getCategory() != ObjectInspector.Category.LIST) {
      throw new UDFArgumentException("explode() takes an array as a parameter");
    }
    listOI = (ListObjectInspector) args[0];

    ArrayList<String> fieldNames = new ArrayList<String>();
    ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
    fieldNames.add("col");
    fieldOIs.add(listOI.getListElementObjectInspector());
    return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,
        fieldOIs);
  }

  private final Object[] forwardObj = new Object[1];

  @Override
  public void process(Object[] o) throws HiveException {
    List<?> list = listOI.getList(o[0]);
    if(list == null) {
      return;
    }
    for (Object r : list) {
      forwardObj[0] = r;
      forward(forwardObj);
    }
  }

  @Override
  public String toString() {
    return "explode";
  }
}
 


 

0
1
分享到:
评论

相关推荐

    Hive - A Warehousing Solution Over a Map-Reduce.pdf

    ### Hive:一种基于Map-Reduce的数据仓库解决方案 #### 一、引言 随着大数据时代的到来,数据集的规模正以前所未有的速度增长。这些数据不仅来自传统的商业领域,也涵盖了社交媒体、物联网等多个新兴领域。面对...

    Hive大数据技术原理与实践.pptx

    * 可扩充 UDF/UDAF/UDTF Hive 的缺点包括: * 延迟较高,性能有提升空间 * 不支持事务类操作 Hive 提供数据提取、转换、加载功能,并可用类似于 SQL 的语法,对 HDFS 海量数据库中的数据进行查询统计等操作。Hive...

    基于Facebook的Hive开发

    - **可插拔的UDF/UDAF/UDTF**:支持用户定义函数,包括聚合函数和表生成函数,以适应特定业务需求。 - **复杂对象类型支持**:可以处理JSON、XML等非结构化数据。 - **列式存储支持**:通过列式存储格式提高查询性能...

    Hive教程.pdf

    - **可扩展性强**: 可以通过自定义函数(UDF/UDAF/UDTF)来扩展Hive的功能。 - **支持多种数据格式**: 支持多种存储格式,如TextFile、SequenceFile、ORC等。 - **高容错性**: 基于Hadoop的分布式文件系统(HDFS...

    hive_have_null_id.tar.gz

    10. **Hive的存储过程(UDF、UDAF、UDTF)**: 用户自定义函数(UDF)、用户自定义聚合函数(UDAF)和用户自定义表生成函数(UDTF)允许用户扩展Hive的功能,处理特定的业务逻辑。 以上是基于“hive”标签的一些基础...

    Hive语法详解.docx (排版清晰,覆盖全面,含目录)

    #### 四、Hive自定义函数(UDF, UDAF, UDTF) - **UDF:User-Defined Function**:用户自定义函数,用于扩展Hive的功能。 - **UDAF:User-Defined Aggregation Function**:用户自定义聚合函数,用于实现特定的聚合...

    hive简介共5页.pdf.zip

    1. **Hive概述**:Hive是一个数据仓库工具,可将结构化的数据文件映射为一张数据库表,并提供SQL-like接口进行查询,同时也支持用户自定义函数(UDF)、自定义聚合函数(UDAF)和用户自定义分区函数(UDTF)。...

    用户推荐Slope One算法与mapreduce&hive实现

    在这个文件中,开发者可能定义了自定义的Hive UDF(用户定义函数)或者UDAF(用户定义聚合函数),以便在Hive SQL中直接调用Slope One的预测逻辑。这样,我们就可以在分布式环境中高效地处理大规模的推荐任务。 ...

    史上最全的大数据面试题,大数据开发者必看.pdf

    - Hive优化包括:使用`sort by`代替`order by`(局部排序)、静态分区、减少job和task数量(例如使用JOIN操作)、解决数据倾斜问题(启用`hive.groupby.skewindata`)、合并小文件、以及使用UDF和UDAF。 4. Hbase...

    史上最全的大数据面试题,大数据开发者必看.docx

    - **Hive优化**:包括排序优化(如使用sort by而非order by)、分区优化、减少job和task数量、解决数据倾斜问题、小文件合并以及使用UDF和UDAF等。 3. **HBase存储与优化**: - **RowKey设计**:RowKey是HBase中...

    Hadoop学习时间轴

    - **代码定制化**:针对特定的需求或问题,可以考虑对Hadoop和Hive的源代码进行定制化的修改和扩展。 综上所述,Hadoop的学习不仅涉及到理论知识的理解和掌握,还需要通过大量的实践来加深理解和提高技能水平。同时...

    史上最全的大数据面试题-大数据开发者必看.docx

    - **使用 UDF 或 UDAF 函数**:利用自定义函数提高查询灵活性和效率。 ### 3\. HBase 的 RowKey 设计与优化 #### 3.1 RowKey 设计原则 - **字典排序**:RowKey 应按照字典顺序排序。 - **长度优化**:RowKey 越短...

    Hadoop权威指南(中文版)2015上传.rar

    map阶段和reduce阶段 横向扩展 合并函数 运行一个分布式的MapReduce作业 Hadoop的Streaming Ruby版本 Python版本 Hadoop Pipes 编译运行 第3章 Hadoop分布式文件系统 HDFS的设计 HDFS的概念 数据块 namenode和...

    Hadoop权威指南 第二版(中文版)

     map阶段和reduce阶段  横向扩展  合并函数  运行一个分布式的MapReduce作业  Hadoop的Streaming  Ruby版本  Python版本  Hadoop Pipes  编译运行 第3章 Hadoop分布式文件系统  HDFS的设计  HDFS的概念 ...

    大数据课程体系.docx

    - **用户自定义函数(UDF和UDAF)的开发与演示**:指导如何开发UDF和UDAF,并提供示例代码。 - **Hive优化**:分享Hive性能优化的策略和技术。 #### 六、数据迁移工具Sqoop - **Sqoop简介和配置**:概述Sqoop的作用...

    weblog-KPI:flume采集日志,MapReduce清洗日志,HiveETL

    Java API提供了丰富的类和接口,使得开发者可以轻松实现自定义的mapper和reducer函数,以及Hive的UDF(User Defined Function)和UDAF(User Defined Aggregation Function)。此外,HiveQL允许使用类似SQL的语法...

    大数据课程体系

    - **用户自定义函数(UDF和UDAF)的开发与演示**:开发自定义函数以满足特定需求。 - **Hive优化**:采用各种策略提高Hive查询效率。 #### 五、数据迁移工具Sqoop - **Sqoop简介和配置**:了解Sqoop的功能及其配置...

    hadoop_the_definitive_guide_3nd_edition

    Map and Reduce 20 Java MapReduce 22 Scaling Out 30 Data Flow 31 Combiner Functions 34 Running a Distributed MapReduce Job 37 Hadoop Streaming 37 Ruby 37 Python 40 iii www.it-ebooks.info Hadoop Pipes ...

Global site tag (gtag.js) - Google Analytics