`
yugouai
  • 浏览: 497533 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

再谈GenericUDAF(以collect_set源码分析)

 
阅读更多

        最近对Generic UDAF思索了一下,感觉最关键的是理解UDAF执行的每一步过程的输入与输出,其实步骤根据说明来编写相关代码就基本没问题,但是需要注意的是,数据类型需要统一,建议使用 Hadoop 数据类型,即分布式对象。实践中证实使用writable系列的类型比java系列的类型简单. 不要尝试同时使用二种系列的类型, 中间容易出现ClassCastException.

        
        编写步骤:

        0)在resolver对输入数据(类型、个数)加以判断

        1)首先分析数据从原始数据到最后输出所需的步骤

        2)init方法根据每个步骤的数据的输入不同,加上相关的判断与输出类型

        3)init方法注意每一步的输出类型

        4)定义静态类实现AggregationBuffer聚合流接口,在此定义临时存放集合的变量,该变量是临时存储聚合。

        5)reset方法需要手工调用,在getNewAggergationBuffer方法中声明实现AggregationBuffer的静态类变量,并调用reset方法

        6)iterate方法将原始数据转为临时聚合流数据,注意将原始数据赋值到AggregationBuffer聚合流变量

        7)terminatePartial方法,将返回部分聚集结果,一个封装了聚集计算当前状态的对象

        8)merge方法,将terminatePartial方法生成的部分聚集与另一部分聚合值合并

        9)terminate方法,将返回最后聚集的结果集

 

        HIVE内置Generic UDAF(collect_set)源码分析

/**
 * GenericUDAFCollectSet
 */
@Description(name = "collect_set", value = "_FUNC_(x) - Returns a set of objects with duplicate elements eliminated")
public class GenericUDAFCollectSet extends AbstractGenericUDAFResolver {

  static final Log LOG = LogFactory.getLog(GenericUDAFCollectSet.class.getName());
  
  public GenericUDAFCollectSet() {
  }

  @Override
  public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
      throws SemanticException {
    //判别参数个数
    if (parameters.length != 1) {
      throw new UDFArgumentTypeException(parameters.length - 1,
          "Exactly one argument is expected.");
    }
    //判别是否是基本类型,可以重写成支持复合类型
    if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
      throw new UDFArgumentTypeException(0,
          "Only primitive type arguments are accepted but "
          + parameters[0].getTypeName() + " was passed as parameter 1.");
    }
    //指定调用的Evaluator,用来接收消息和指定UDAF如何调用
    return new GenericUDAFMkSetEvaluator();
  }

  public static class GenericUDAFMkSetEvaluator extends GenericUDAFEvaluator {
    
    // For PARTIAL1 and COMPLETE: ObjectInspectors for original data
    private PrimitiveObjectInspector inputOI;
    // For PARTIAL2 and FINAL: ObjectInspectors for partial aggregations (list
    // of objs)
    private StandardListObjectInspector loi;
    
    private StandardListObjectInspector internalMergeOI;
    
    public ObjectInspector init(Mode m, ObjectInspector[] parameters)
        throws HiveException {
      super.init(m, parameters);
      // init output object inspectors
      // The output of a partial aggregation is a list
      /**
      * collect_set函数每个阶段分析
      * 1.PARTIAL1阶段,原始数据到部分聚合,在collect_set中,则是将原始数据放入set中,所以,
      * 输入数据类型是PrimitiveObjectInspector,输出类型是StandardListObjectInspector
      * 2.在其他情况,有两种情形:(1)两个set之间的数据合并,也就是不满足if条件情况下
      *(2)直接从原始数据到set,这种情况的出现是为了兼容从原始数据直接到set,也就是说map后
      * 直接到输出,没有reduce过程,也就是COMPLETE阶段
      */
      if (m == Mode.PARTIAL1) {
        inputOI = (PrimitiveObjectInspector) parameters[0];
        return ObjectInspectorFactory
            .getStandardListObjectInspector((PrimitiveObjectInspector) ObjectInspectorUtils
                .getStandardObjectInspector(inputOI));
      } else {
        //COMPLETE 阶段
        if (!(parameters[0] instanceof StandardListObjectInspector)) {
          //no map aggregation.
          inputOI = (PrimitiveObjectInspector)  ObjectInspectorUtils
          .getStandardObjectInspector(parameters[0]);
          return (StandardListObjectInspector) ObjectInspectorFactory
              .getStandardListObjectInspector(inputOI);
        } else { //PARTIAL2,FINAL阶段,两个阶段都是list与list合并,调用一致
          internalMergeOI = (StandardListObjectInspector) parameters[0];
          inputOI = (PrimitiveObjectInspector) internalMergeOI.getListElementObjectInspector();
          loi = (StandardListObjectInspector) ObjectInspectorUtils.getStandardObjectInspector(internalMergeOI);          
          return loi;
        }
      }
    }
    
    static class MkArrayAggregationBuffer implements AggregationBuffer {
      Set<Object> container;
    }
    
    @Override
    public void reset(AggregationBuffer agg) throws HiveException {
      ((MkArrayAggregationBuffer) agg).container = new HashSet<Object>();
    }
    
    @Override
    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
      MkArrayAggregationBuffer ret = new MkArrayAggregationBuffer();
      reset(ret);
      return ret;
    }

    //mapside,将原始值转换添加到集合中
    @Override
    public void iterate(AggregationBuffer agg, Object[] parameters)
        throws HiveException {
      assert (parameters.length == 1);
      Object p = parameters[0];

      if (p != null) {
        MkArrayAggregationBuffer myagg = (MkArrayAggregationBuffer) agg;
        putIntoSet(p, myagg);
      }
    }

    //mapside,临时聚集
    @Override
    public Object terminatePartial(AggregationBuffer agg) throws HiveException {
      MkArrayAggregationBuffer myagg = (MkArrayAggregationBuffer) agg;
      ArrayList<Object> ret = new ArrayList<Object>(myagg.container.size());
      ret.addAll(myagg.container);
      return ret;
    }
    //terminatePartial的临时聚集跟另一个聚集合并
    @Override
    public void merge(AggregationBuffer agg, Object partial)
        throws HiveException {
      MkArrayAggregationBuffer myagg = (MkArrayAggregationBuffer) agg;
      ArrayList<Object> partialResult = (ArrayList<Object>) internalMergeOI.getList(partial);
      for(Object i : partialResult) {
        putIntoSet(i, myagg);
      }
    }
    
    //合并最终结果到结果集返回
    @Override
    public Object terminate(AggregationBuffer agg) throws HiveException {
      MkArrayAggregationBuffer myagg = (MkArrayAggregationBuffer) agg;
      ArrayList<Object> ret = new ArrayList<Object>(myagg.container.size());
      ret.addAll(myagg.container);
      return ret;
    }
    
    private void putIntoSet(Object p, MkArrayAggregationBuffer myagg) {
      if (myagg.container.contains(p))
        return;
      Object pCopy = ObjectInspectorUtils.copyToStandardObject(p,
          this.inputOI);
      myagg.container.add(pCopy);
    }
  }
  
}

 

分享到:
评论

相关推荐

    TXTcollector_collect_zip_源码

    总结来说,"TXTcollector_collect_zip_源码"涉及到的主要知识点有:TXT文件处理、文本收集策略、ZIP压缩技术、源码理解和分析,以及软件部署与文档阅读。这些是编程和软件工程领域中基础且重要的技能,对提升文件...

    date_collect_.rar_ date_collect__数据采集_数据采集系统_采集_频域分析

    "date_collect__数据采集_数据采集系统_采集_频域分析"的标题暗示了这个项目不仅关注数据的获取,还涉及到对这些数据的深入分析,特别是频域分析。 频域分析是一种信号处理方法,用于研究信号在频率域内的特性。在...

    collect_py.rar_Python 采集_collect_py.rar_python web_python 网站_采集

    标题中的"collect_py.rar_Python 采集_collect_py.rar_python web_python 网站_采集"表明这是一个关于Python网络数据采集的项目,其中包含了名为"collect_py"的源代码。描述进一步证实了这一点,它提到这是一个使用...

    Python库 | django_collect_offline-0.2.39-py3-none-any.whl

    **Python库 django_collect_offline-0.2.39-py3-none-any.whl** `django_collect_offline` 是一个针对Python的Web开发框架Django的扩展库,它旨在优化和改进Django项目的静态文件处理流程,特别是对于离线场景。在...

    Collect Data_无源码.7z

    无源码的"Collect Data_无源码.7z"可能是一个数据收集项目的成果,其中包含了未公开的实现细节,但我们可以从一般的角度来讨论数据收集的一般流程和技术。 数据收集主要分为以下几类: 1. 网络爬虫:网络爬虫是...

    PyPI 官网下载 | os_collect_config-5.0.0.0b3-py2-none-any.whl

    对于.os_collect_config这样的库,其可能包含的API接口、使用示例、错误处理和性能优化等方面,都需要通过阅读官方文档或库源码来深入了解。在实际应用中,根据具体需求,可以将这个库集成到自动化脚本、系统管理...

    face_collect_0510.rar

    本文将详细探讨如何通过摄像头操作收集人脸数据,并基于"face_collect_0510.rar"这个压缩包,了解其在人脸数据采集中的应用。 首先,我们需要理解“face_collect_0510.rar”这一文件的含义。这是一个用于人脸数据...

    PyPI 官网下载 | django_collect_offline-0.2.34-py3-none-any.whl

    **PyPI 官网下载 | django_collect_offline-0.2.34-py3-none-any.whl** PyPI(Python Package Index)是Python社区官方的软件包仓库,提供了大量预编译的Python库,方便开发者下载和安装。在本案例中,我们关注的是...

    市场数据收集,_Collect_market_data_for_quant_analytics_market_data_

    市场数据收集,_Collect_market_data_for_quant_analytics_market_data_collector

    Python库 | pytest_collect_formatter-0.3.0-py2.py3-none-any.whl

    `pytest_collect_formatter` 是一个基于Python的测试框架`pytest`的扩展库,版本为0.3.0。这个库主要用于格式化和展示`pytest`在收集测试时的信息,提供了更友好的输出,使得开发者在执行测试时能更清晰地了解测试...

    mm_facetoface_collect_qrcode_1731229982888.png

    mm_facetoface_collect_qrcode_1731229982888.png

    mm_facetoface_collect_qrcode_1711075679703.png

    mm_facetoface_collect_qrcode_1711075679703.png

    biz_collect_pack.sql

    biz_collect_pack.sql

    JavaScript应用实例-Alipay_collect_energy_Optimize.js

    JavaScript应用实例-Alipay_collect_energy_Optimize.js

    PyPI 官网下载 | os_collect_config-13.0.1-py3-none-any.whl

    《PyPI官网下载:深入解析os_collect_config-13.0.1-py3-none-any.whl》 PyPI(Python Package Index)是Python开发者最常用的一个资源库,它提供了大量的Python软件包供用户下载和使用。在我们讨论的这个案例中,...

    data_collect_SEND.rar_信息采集

    "data_collect_SEND.rar_信息采集"这个压缩包文件显然是一个专门用于数据采集发送的程序,下面我们将深入探讨相关的知识点。 1. **数据采集**:数据采集是通过各种手段从不同源收集数据的过程,包括网络爬虫、API...

    leetcode答案-git_collect_sub:git_collect_sub

    leetcode ...git_collect_sub 个人收集的有用项目,使用git submodule方式,引用已有项目。 项目环境 submodule的使用 参考: 增加submodule git submodule add https://SubModule.git 首次拉取 git clone ...

    sweep_collect_扫频_RTL-SDRmatlab_

    标题中的“sweep_collect_扫频_RTL-SDRmatlab_”暗示了这是一个使用RTL-SDR(Realtek低成本软件定义无线电)设备,并通过MATLAB编程实现的扫频(频率扫描)应用。RTL-SDR是一种廉价的硬件,允许用户接收并处理射频...

    collect_data_CSI数据提取_csi信号处理_csi_grabbedo7y_

    本项目名为“collect_data_CSI数据提取_csi信号处理_csi_grabbedo7y_”,其核心在于从无线环境中获取和处理CSI数据,采用滤波技术和Principal Component Analysis (PCA) 进行信号分析。 首先,我们需要理解什么是...

    sweep_collect_通信建模实验_天线频谱收集_

    本次实验的主题是“sweep_collect_通信建模实验_天线频谱收集”,旨在利用MATLAB编程语言来模拟天线对空间中无线信号的频谱收集过程。这涉及到多个知识点,包括无线通信基础、天线理论、频谱分析以及MATLAB编程。 ...

Global site tag (gtag.js) - Google Analytics