`
yugouai
  • 浏览: 498466 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

hive udaf开发入门和运行过程详解

 
阅读更多

介绍

hive的用户自定义聚合函数(UDAF)是一个很好的功能,集成了先进的数据处理。hive有两种UDAF:简单和通用。顾名思义,简单的 UDAF,写的相当简单的,但因为使用Java反射导致性能损失,而且有些特性不能使用,如可变长度参数列表。通用UDAF可以使用​​所有功能,但是 UDAF就写的比较复杂,不直观。

 

UDAF是需要在hive的sql语句和group by联合使用,hive的group by对于每个分组,只能返回一条记录,这点和mysql不一样,切记。

 

UDAF开发概览

开发通用UDAF有两个步骤,第一个是编写resolver类,第二个是编写evaluator类。resolver负责类型检查,操作符重载。evaluator真正实现UDAF的逻辑。通常来说,顶层UDAF类继承org.apache.hadoop.hive.ql.udf.GenericUDAFResolver2,里面编写嵌套类evaluator 实现UDAF的逻辑。

 

 本文以Hive的内置UDAF sum函数的源代码作为示例讲解。

 

实现 resolver

resolver通常继承org.apache.hadoop.hive.ql.udf.GenericUDAFResolver2,但是我们更建议继承AbstractGenericUDAFResolver,隔离将来hive接口的变化。

 

GenericUDAFResolver和GenericUDAFResolver2接口的区别是,后面的允许evaluator实现可以访问更多的信息,例如DISTINCT限定符,通配符FUNCTION(*)。

 

public class GenericUDAFSum extends AbstractGenericUDAFResolver {

  static final Log LOG = LogFactory.getLog(GenericUDAFSum.class.getName());

  @Override
  public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
    throws SemanticException {
    // Type-checking goes here!
    return new GenericUDAFSumLong(); 
  } 

  public static class GenericUDAFSumLong extends GenericUDAFEvaluator {
    // UDAF logic goes here!
  } 
}

 这个就是UDAF的代码骨架,第一行创建LOG对象,用来写入警告和错误到hive的log。GenericUDAFResolver只需要重写一个方法:getEvaluator,它根据SQL传入的参数类型,返回正确的evaluator。这里最主要是实现操作符的重载。

 

getEvaluator的完整代码如下:

public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
    throws SemanticException {
    if (parameters.length != 1) {
      throw new UDFArgumentTypeException(parameters.length - 1,
          "Exactly one argument is expected.");
    }

    if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
      throw new UDFArgumentTypeException(0,
          "Only primitive type arguments are accepted but "
          + parameters[0].getTypeName() + " is passed.");
    }
    switch (((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory()) {
    case BYTE:
    case SHORT:
    case INT:
    case LONG:
    case TIMESTAMP:
      return new GenericUDAFSumLong();
    case FLOAT:
    case DOUBLE:
    case STRING:
      return new GenericUDAFSumDouble();
    case BOOLEAN:
    default:
      throw new UDFArgumentTypeException(0,
          "Only numeric or string type arguments are accepted but "
          + parameters[0].getTypeName() + " is passed.");
    }

 这里做了类型检查,如果不是原生类型(即符合类型,array,map此类),则抛出异常,还实现了操作符重载,对于整数类型,使用 GenericUDAFSumLong实现UDAF的逻辑,对于浮点类型,使用GenericUDAFSumDouble实现UDAF的逻辑。

 

 

实现evaluator

所有evaluators必须继承抽象类org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator。子类必须实现它的一些抽象方法,实现UDAF的逻辑。

 

GenericUDAFEvaluator有一个嵌套类Mode,这个类很重要,它表示了udaf在mapreduce的各个阶段,理解Mode的含义,就可以理解了hive的UDAF的运行流程。

 

public static enum Mode {
    /**
     * PARTIAL1: 这个是mapreduce的map阶段:从原始数据到部分数据聚合
     * 将会调用iterate()和terminatePartial()
     */
    PARTIAL1,
        /**
     * PARTIAL2: 这个是mapreduce的map端的Combiner阶段,负责在map端合并map的数据::从部分数据聚合到部分数据聚合:
     * 将会调用merge() 和 terminatePartial() 
     */
    PARTIAL2,
        /**
     * FINAL: mapreduce的reduce阶段:从部分数据的聚合到完全聚合 
     * 将会调用merge()和terminate()
     */
    FINAL,
        /**
     * COMPLETE: 如果出现了这个阶段,表示mapreduce只有map,没有reduce,所以map端就直接出结果了:从原始数据直接到完全聚合
      * 将会调用 iterate()和terminate()
     */
    COMPLETE
  };

 

一般情况下,完整的UDAF逻辑是一个mapreduce过程,如果有mapper和reducer,就会经历 PARTIAL1(mapper),FINAL(reducer),如果还有combiner,那就会经历 PARTIAL1(mapper),PARTIAL2(combiner),FINAL(reducer)。

 

而有一些情况下的mapreduce,只有mapper,而没有reducer,所以就会只有COMPLETE阶段,这个阶段直接输入原始数据,出结果。

 

下面以GenericUDAFSumLong的evaluator实现讲解

public static class GenericUDAFSumLong extends GenericUDAFEvaluator {

private PrimitiveObjectInspector inputOI;
    private LongWritable result;

   //这个方法返回了UDAF的返回类型,这里确定了sum自定义函数的返回类型是Long类型
    @Override
    public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
      assert (parameters.length == 1);
      super.init(m, parameters);
      result = new LongWritable(0);
      inputOI = (PrimitiveObjectInspector) parameters[0];
      return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
    }

    /** 存储sum的值的类 */
    static class SumLongAgg implements AggregationBuffer {
      boolean empty;
      long sum;
    }

    //创建新的聚合计算的需要的内存,用来存储mapper,combiner,reducer运算过程中的相加总和。

    @Override
    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
      SumLongAgg result = new SumLongAgg();
      reset(result);
      return result;
    }
    
    //mapreduce支持mapper和reducer的重用,所以为了兼容,也需要做内存的重用。

    @Override
    public void reset(AggregationBuffer agg) throws HiveException {
      SumLongAgg myagg = (SumLongAgg) agg;
      myagg.empty = true;
      myagg.sum = 0;
    }

    private boolean warned = false;
  
    //map阶段调用,只要把保存当前和的对象agg,再加上输入的参数,就可以了。
    @Override
    public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
      assert (parameters.length == 1);
      try {
        merge(agg, parameters[0]);
      } catch (NumberFormatException e) {
        if (!warned) {
          warned = true;
          LOG.warn(getClass().getSimpleName() + " "
              + StringUtils.stringifyException(e));
        }
      }
    }
   //mapper结束要返回的结果,还有combiner结束返回的结果
    @Override
    public Object terminatePartial(AggregationBuffer agg) throws HiveException {
      return terminate(agg);
    }
    
    //combiner合并map返回的结果,还有reducer合并mapper或combiner返回的结果。
    @Override
    public void merge(AggregationBuffer agg, Object partial) throws HiveException {
      if (partial != null) {
        SumLongAgg myagg = (SumLongAgg) agg;
        myagg.sum += PrimitiveObjectInspectorUtils.getLong(partial, inputOI);
        myagg.empty = false;
      }
    }
     
    //reducer返回结果,或者是只有mapper,没有reducer时,在mapper端返回结果。
    @Override
    public Object terminate(AggregationBuffer agg) throws HiveException {
      SumLongAgg myagg = (SumLongAgg) agg;
      if (myagg.empty) {
        return null;
      }
      result.set(myagg.sum);
      return result;
    }

  }

 

除了GenericUDAFSumLong,还有重载的GenericUDAFSumDouble,以上代码都在hive的源码:org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum。

 

修改方法注册

修改 ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java文件,加入编写的UDAF类,并注册名字,或直接导入jar包,声明函数。

FunctionRegistry类包含了hive的所有内置自定义函数。想要更好学习hive的UDAF,建议多看看里面的UDAF。

 

总结

本文的目的是为初学者入门学习udaf,所以介绍了udaf的概览,尤其是udaf的运行过程,这对初学者是比较大的槛。

 

考虑入门,本文简单介绍了sum的UDAF实现,但是如果想要更好理解UDAF的运行过程,建议再看看avg UDAF:org.apache.hadoop.hive.ql.udf.generic.GenericUDAFAverage。avg UDAF对hive的运行流程要控制的更加精细,并判断当前运行的Mode做一定的逻辑处理。

分享到:
评论

相关推荐

    hive udaf 实现按位取与或

    在“hive udaf 实现按位取与或”的场景中,我们主要探讨如何使用UDAF来实现数据的按位逻辑运算,如按位与(AND)和按位或(OR)。 一、Hive UDAF基本概念 UDAF是一种特殊的用户自定义函数,它负责处理一组输入值并...

    Hive UDAF示例

    A custom UDAF to group oncatenates all arguments from different rows into a single string.

    hive入门级详解

    Hive 入门级详解 Hive 是一个基于 Hadoop 的数据仓库系统,它提供了一个类似于关系型数据库的查询语言 HQL,并且可以将查询转换为 MapReduce 任务来执行。Hive 的存储结构主要包括三个层面:数据存储层、计算资源层...

    Hive SQL 编译过程详解

    理解Hive SQL的编译过程对于解决Hive的问题、优化SQL查询和定制功能至关重要。通过对MapReduce实现SQL操作原理的深入理解,我们可以更好地掌握Hive的工作机制,从而提高数据分析的效率和准确性。在日常工作中,这样...

    HIVE从入门到精通.pdf

    ### HIVE从入门到精通知识点概述 #### 一、Hive简介 - **背景与需求**:随着商业智能领域数据量的急剧增加,传统的数据仓库解决方案成本高昂,难以满足需求。Hadoop作为一种流行且开源的MapReduce实现,在Yahoo、...

    HIVE安装及详解

    "HIVE安装及详解" HIVE是一种基于Hadoop的数据仓库工具,主要用于处理和分析大规模数据。下面是关于HIVE的安装及详解。 HIVE基本概念 HIVE是什么?HIVE是一种数据仓库工具,主要用于处理和分析大规模数据。它将...

    Hive入门与实战 PDF

    Hive入门与实战 PDF

    Hive 入门级编程全案例详解

    Hive 入门级编程实例详解。涵盖了各类基础函数使用要点以及 Java 编写 Hive 函数等。

    hive:个人配置单元 UDAF

    个人 Hive UDAF 有一堆 Hive UDAF(用户定义的聚合函数)不在标准 Hive 分布中,因为它们可能会导致大型数据集的 OOM。 要使用它们,您需要加载 jar 文件,然后为每个要使用的函数创建一个临时函数: ADD JAR target...

    Hive开发规范及要点

    在Hive开发中,遵循一定的规范和要点非常重要,以下是Hive开发规范及要点: 一、基本操作 1. 模糊搜索表:使用`show tables like '*name*';`语句可以搜索包含某个关键字的表名。 2. 查看表结构信息:使用`desc ...

    hive常用的开发规范

    这些规范旨在提高 Hive 开发的标准化、可维护性和系统安全性。遵循这些规范可以确保数据的有序管理,同时降低因为不规范操作导致的问题。在实际工作中,还需要根据具体项目需求和团队协作模式进行适当的调整和优化。

    Hive入门与实战

    Hive入门与实战

    Hive从入门到精通资源.zip

    本资源包“Hive从入门到精通资源.zip”包含了学习Hive所需的基本资料和依赖库,旨在帮助初学者快速掌握Hive的使用,并逐步晋升为高级用户。 1. **Hive简介** Hive是Facebook开源的一款基于Hadoop的数据仓库工具,...

    《Hive的开发指南》

    Hive 基本概念 Hive 应用场景。 Hive 与hadoop的关系。 Hive 与传统数据库对比。 Hive 的数据存储机制。 Hive 基本操作 ...Hive 中的DDL操作。...在Hive 中如何实现高效的JOIN查询。...Hive 执行过程分析及优化策略

    hive编程入门课程

    ### Hive编程入门课程知识点详解 #### 一、Hive概览 Hive是建立在Hadoop之上的数据仓库基础设施,由Facebook赞助开发。它通过提供SQL-like查询语言(HiveQL),使用户能够轻松地对存储在Hadoop分布式文件系统(HDFS...

    hive开发中常遇到的坑

    ### hive开发中常遇到的坑 ...在Hive开发过程中,正确理解和处理这些问题对于保证数据处理的准确性和效率至关重要。通过对以上几个方面的学习,可以帮助开发者更好地应对日常开发中可能遇到的各种挑战。

    最强HiveSQL开发指南.pdf

    《最强HiveSQL开发指南》是一本专注于Hive性能调优和实战操作的教程,旨在帮助读者深入理解和熟练运用Hive进行大数据处理。Hive作为Apache Hadoop生态系统中的一个组件,主要用于处理和分析大规模分布式存储的数据。...

    hive hadoop 开发手册

    总的来说,这个压缩包提供的资料对于学习和开发Hadoop及Hive项目非常有价值。通过阅读和理解这些文档,开发者能够掌握如何在Hadoop平台上构建高效的数据仓库系统,利用Hive进行大数据分析,以及遵循HQL的最佳实践来...

Global site tag (gtag.js) - Google Analytics