- 浏览: 505004 次
- 性别:
- 来自: 北京
文章分类
最新评论
-
gaoke:
"我觉得这是java动态生成代码的方式得到的,因为使 ...
InvocationHandler中invoke()方法的调用问题 -
lyandyhk:
可以,反正对于我这个水平来说刚刚好,正好全部看懂,满分
InvocationHandler中invoke()方法的调用问题 -
593844923:
Subject subject=(Subject) Proxy ...
InvocationHandler中invoke()方法的调用问题 -
hl174:
写的不错 源码确实有点长 第一次大致看还有些没看怎么明白
InvocationHandler中invoke()方法的调用问题 -
draem0507:
129应该表示为00000000 10000001,转成byt ...
Java的补码表示
遇到一个Hive需求:有A、B、C三列,按A列进行聚合,求出C列聚合后的最小值和最大值各自对应的B列值。这个需求用hql和内建函数也可完成,但是比较繁琐,会解析成几个MR进行执行,如果自定义UDAF便可只利用一个MR完成任务。
所用Hive为0.13.1版本。UDAF有两种,第一种是比较简单的形式,利用抽象类UDAF和UDAFEvaluator,暂不做讨论。主要说一下第二种形式,利用接口GenericUDAFResolver2(或者抽象类AbstractGenericUDAFResolver)和抽象类GenericUDAFEvaluator。
这里用AbstractGenericUDAFResolver做说明。
可以看到,该抽象类有两个方法,其中一个已经被弃用,所以只需要实现参数类型为TypeInfo的getEvaluator方法即可。
该方法其实相当于一个工厂,TypeInfo表示在使用时传入该UDAF的参数的类型。该方法主要做的工作有:
返回的对象类型为GenericUDAFEvaluator,这是一个抽象类:
说明上述方法的之前,需要提一个GenericUDAFEvaluator的内部枚举类Mode
可以看到,UDAF将任务分成了几种类型,PARTIAL1相当于MR程序的map阶段,负责迭代处理记录并返回该阶段的中间结果。PARTIAL2相当于Combiner,对map阶段的结果进行一次聚合。FINAL是reduce阶段,进行整体聚合以及返回最终结果。COMPLETE有点特殊,是一个没有reduce阶段的map过程,所以在进行记录迭代之后,直接返回最终结果。
再来看GenericUDAFEvaluator中的各方法
初始化方法,在Mode的每一个阶段启动时会执行init方法。该方法有两个参数,第一个参数是Mode,可以根据此参数判断当前执行的是哪个阶段,进行该阶段相应的初始化工作。ObjectInspector是一个抽象的类型描述,例如:当参数类型是原生类型时,可以转化为PrimitiveObjectInspector,除此之外还有StructObjectInspector等等。ObjectInspector只是描述类型,并不存储实际数据。后面的具体例子中会有一些使用说明。
ObjectInspector[]的长度不是固定的,要看当前是处于哪个阶段。如果是PARTIAL1,那么与使用时传入该UDAF的参数个数一致;如果是FINAL阶段,长度就是1了,因为map阶段返回的结果只有一个对象。
AggregationBuffer是一个标识接口,没有任何需要实现的方法。实现该接口的类被用于暂存中间结果。reset是为了重置AggregationBuffer,但是在实际应用场景中没有发现单独调用该方法进行重置,有可能是聚合key的数据量还不够大,在后面会再说一下这个问题。
iterate方法存在于MR的M阶段,用于处理每一条输入记录。Object[]作为输入传入UFAF,AggregationBuffer作为中间缓存暂存结果。需要注意的是,每次调用iterate传入的AggregationBuffer并不一定是同一个对象。Hive调用UDAF的时候会用一个Map来管理AggregationBuffer,Map的key即为需要聚合的key。就通过实际运行过程来看,在每一次iterate调用之前,会根据聚合key从Map中查找对应的AggregationBuffer,若能找到则直接返回AggregationBuffer对象,找不到则调用getNewAggregationBuffer方法新建并插入Map中并返回结果。
terminatePartial方法在iterate处理完所有输入后调用,用于返回初步的聚合结果。
merge方法存在于MR的R阶段(也同样存在于Combine阶段),用于最后的聚合。Object类型的partial参数与terminatePartial返回值一致,AggregationBuffer参数与上述一致。
terminate方法在merge方法执行完毕之后调用,用于进行最后的处理,并返回最后结果。
像上面提到的Mode一样,这些方法并不一定都会被调用,与Hive解析成的MR程序类型有关。例如解析后的MR程序只有M阶段,则只会调用iterate和terminate。实际使用过程中,由于聚合key数据量有限,内存可以承载,所以没有发现reset单独调用的情况。每次遇到一个不同的key,则新建一个AggregationBuffer,没有看源码,不知道当聚合key很大的时候,是否会调用reset进行对象重用。
所用Hive为0.13.1版本。UDAF有两种,第一种是比较简单的形式,利用抽象类UDAF和UDAFEvaluator,暂不做讨论。主要说一下第二种形式,利用接口GenericUDAFResolver2(或者抽象类AbstractGenericUDAFResolver)和抽象类GenericUDAFEvaluator。
这里用AbstractGenericUDAFResolver做说明。
public abstract class AbstractGenericUDAFResolver implements GenericUDAFResolver2 { @SuppressWarnings("deprecation") @Override public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException { if (info.isAllColumns()) { throw new SemanticException( "The specified syntax for UDAF invocation is invalid."); } return getEvaluator(info.getParameters()); } @Override public GenericUDAFEvaluator getEvaluator(TypeInfo[] info) throws SemanticException { throw new SemanticException( "This UDAF does not support the deprecated getEvaluator() method."); } }
可以看到,该抽象类有两个方法,其中一个已经被弃用,所以只需要实现参数类型为TypeInfo的getEvaluator方法即可。
该方法其实相当于一个工厂,TypeInfo表示在使用时传入该UDAF的参数的类型。该方法主要做的工作有:
- 检查参数长度和类型
- 根据参数返回对应的实际处理对象
返回的对象类型为GenericUDAFEvaluator,这是一个抽象类:
public abstract class GenericUDAFEvaluator implements Closeable { ...... public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException { // This function should be overriden in every sub class // And the sub class should call super.init(m, parameters) to get mode set. mode = m; return null; } public abstract AggregationBuffer getNewAggregationBuffer() throws HiveException; public abstract void reset(AggregationBuffer agg) throws HiveException; public abstract void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException; public abstract Object terminatePartial(AggregationBuffer agg) throws HiveException; public abstract void merge(AggregationBuffer agg, Object partial) throws HiveException; public abstract Object terminate(AggregationBuffer agg) throws HiveException; ...... }
说明上述方法的之前,需要提一个GenericUDAFEvaluator的内部枚举类Mode
public static enum Mode { /** * 相当于map阶段,调用iterate()和terminatePartial() */ PARTIAL1, /** * 相当于combiner阶段,调用merge()和terminatePartial() */ PARTIAL2, /** * 相当于reduce阶段调用merge()和terminate() */ FINAL, /** * COMPLETE: 相当于没有reduce阶段map,调用iterate()和terminate() */ COMPLETE };
可以看到,UDAF将任务分成了几种类型,PARTIAL1相当于MR程序的map阶段,负责迭代处理记录并返回该阶段的中间结果。PARTIAL2相当于Combiner,对map阶段的结果进行一次聚合。FINAL是reduce阶段,进行整体聚合以及返回最终结果。COMPLETE有点特殊,是一个没有reduce阶段的map过程,所以在进行记录迭代之后,直接返回最终结果。
再来看GenericUDAFEvaluator中的各方法
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {...}
初始化方法,在Mode的每一个阶段启动时会执行init方法。该方法有两个参数,第一个参数是Mode,可以根据此参数判断当前执行的是哪个阶段,进行该阶段相应的初始化工作。ObjectInspector是一个抽象的类型描述,例如:当参数类型是原生类型时,可以转化为PrimitiveObjectInspector,除此之外还有StructObjectInspector等等。ObjectInspector只是描述类型,并不存储实际数据。后面的具体例子中会有一些使用说明。
ObjectInspector[]的长度不是固定的,要看当前是处于哪个阶段。如果是PARTIAL1,那么与使用时传入该UDAF的参数个数一致;如果是FINAL阶段,长度就是1了,因为map阶段返回的结果只有一个对象。
public abstract AggregationBuffer getNewAggregationBuffer() throws HiveException; public abstract void reset(AggregationBuffer agg) throws HiveException;
AggregationBuffer是一个标识接口,没有任何需要实现的方法。实现该接口的类被用于暂存中间结果。reset是为了重置AggregationBuffer,但是在实际应用场景中没有发现单独调用该方法进行重置,有可能是聚合key的数据量还不够大,在后面会再说一下这个问题。
public abstract void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException; public abstract Object terminatePartial(AggregationBuffer agg) throws HiveException; public abstract void merge(AggregationBuffer agg, Object partial) throws HiveException; public abstract Object terminate(AggregationBuffer agg) throws HiveException; ...... }
iterate方法存在于MR的M阶段,用于处理每一条输入记录。Object[]作为输入传入UFAF,AggregationBuffer作为中间缓存暂存结果。需要注意的是,每次调用iterate传入的AggregationBuffer并不一定是同一个对象。Hive调用UDAF的时候会用一个Map来管理AggregationBuffer,Map的key即为需要聚合的key。就通过实际运行过程来看,在每一次iterate调用之前,会根据聚合key从Map中查找对应的AggregationBuffer,若能找到则直接返回AggregationBuffer对象,找不到则调用getNewAggregationBuffer方法新建并插入Map中并返回结果。
terminatePartial方法在iterate处理完所有输入后调用,用于返回初步的聚合结果。
merge方法存在于MR的R阶段(也同样存在于Combine阶段),用于最后的聚合。Object类型的partial参数与terminatePartial返回值一致,AggregationBuffer参数与上述一致。
terminate方法在merge方法执行完毕之后调用,用于进行最后的处理,并返回最后结果。
像上面提到的Mode一样,这些方法并不一定都会被调用,与Hive解析成的MR程序类型有关。例如解析后的MR程序只有M阶段,则只会调用iterate和terminate。实际使用过程中,由于聚合key数据量有限,内存可以承载,所以没有发现reset单独调用的情况。每次遇到一个不同的key,则新建一个AggregationBuffer,没有看源码,不知道当聚合key很大的时候,是否会调用reset进行对象重用。
发表评论
-
编译libhdfs
2014-02-25 15:32 3773Mysql Applier是Mysql向hdf ... -
Hadoop本地读
2014-01-06 15:59 0http://blog.sina.com.cn/s/blog_ ... -
RCFile浅析
2013-07-11 16:52 8349RCFile是Facebook制定的一种数据格式 ... -
mapreduce输出文件的重命名
2012-06-27 19:35 2126之前看过,但用到的时候又忘记了……做下备注,以 ... -
hadoop中迭代器的对象重用问题
2012-05-07 18:58 3448在用reduce时出现一个问题,在这上面耗费了 ... -
Java序列化机制对同一对象的处理
2012-04-20 18:47 0以下例子参考http://developer. ... -
Java的补码表示
2012-04-20 17:19 3962最近发现一个比较低级的问题,就是java中的数 ... -
同步锁的失败可能
2012-04-20 10:29 1093以下例子参考http://developer.51cto.co ... -
使用hadoop序列化机制时的一点小问题
2012-02-25 17:07 1496其实在现在接触到的数据处理中还没怎么碰到到需要 ... -
Configuration简析
2011-08-09 10:30 1411以下分析基于Hadoop-0.19.2 ... -
SpringExt 扩展原理
2011-07-25 11:27 9031这篇文章是基于webx框架官方文档整理的。具体 ... -
优化多线程执行效率
2011-05-22 21:38 1708摘抄自http://zhmocean.itey ... -
ExecutorService与Executors例子的简单剖析
2011-05-11 00:13 6593对于多线程有了一点了解之后,那么来看看java ... -
java深度历险
2011-04-17 20:57 1290InfoQ上面有几篇文章不错,适合已经对jav ... -
Java引用类型
2011-04-05 19:00 5479Java有四种引 ... -
double类型运算精度丢失
2011-04-04 15:45 5621前段时间看了一点python入门,写了几个运算 ... -
Java变量初始化
2011-03-29 15:01 1476先看一个例子: public class Te ... -
Java实现循环移位
2011-03-25 20:56 5950做MD5算法时遇到了循环移位,在网上找了写资料 ... -
Java简记
2011-03-18 20:17 1811这里随手做一些记录,以便有空时整理。 ... -
Java的栈和堆
2011-03-12 21:18 1366学的越多, ...
相关推荐
Hive支持三种类型的自定义函数:用户定义的函数(UDF)、用户定义的聚合函数(UDAF)和用户定义的表生成函数(UDTF)。本篇文章主要介绍UDF的实现方法。 ##### 2.1 UDF的作用 - **扩展性**:允许开发人员根据具体...
Hive支持用户自定义函数,包括UDF(单行函数)、UDAF(聚合函数)和UDTF(多行函数),扩展了Hive的功能,满足特定的业务需求。 9. **Hive与Hadoop的关系**: Hive依赖于Hadoop的MapReduce或Tez等计算框架来处理...
#### 四、Hive自定义函数(UDF, UDAF, UDTF) - **UDF:User-Defined Function**:用户自定义函数,用于扩展Hive的功能。 - **UDAF:User-Defined Aggregation Function**:用户自定义聚合函数,用于实现特定的聚合...
### Hive SQL语法详解 #### 一、Hive简介与特性 Hive是一个建立在Hadoop之上的数据仓库工具,主要用于对存储在Hadoop文件系统(HDFS)中的数据进行数据分析和处理。它提供了类似SQL的查询语言——Hive SQL,使得...
【Hive面试知识点详解】 Hive是大数据领域中一个重要的数据仓库工具,它设计的目标是为大规模数据集提供数据查询和分析能力。由于Hive提供了类似SQL的查询语言(HQL),使得非Java背景的分析师也能轻松进行大数据...
### Apache Hive面试题详解 #### 一、Hive Metastore的三种模式 Hive Metastore服务主要用于存储Hive的元数据信息,包括表结构、分区信息等。Hive支持三种Metastore的存储模式: 1. **内嵌Derby方式**: - **...
#### Hive查询语言详解 HiveQL支持丰富的SQL特性,包括但不限于: - **分组查询(Group by)** - **等值连接(Equi-joins)** - **半连接(Semi-Join)** - **映射连接(Map join/bucket map join/sort merge map ...
本实验主要围绕大数据处理的核心组件,包括Flume、MapReduce (MR)、Hive、Sqoop以及自定义UDF(用户定义函数)的应用,旨在构建一个完整的大数据处理流程,从数据采集、清洗、分析到结果导入数据库。以下是各个步骤...
- **用户自定义函数(UDF和UDAF)的开发与演示**:指导如何开发UDF和UDAF,并提供示例代码。 - **Hive优化**:分享Hive性能优化的策略和技术。 #### 六、数据迁移工具Sqoop - **Sqoop简介和配置**:概述Sqoop的作用...
- **Hive实现**:利用Hive SQL实现用户画像相关的数据处理逻辑,包括UDAF函数、分区处理等高级特性。 ##### 11. CMD与日志生成模块 - **日志处理**:实现命令行工具,支持日志数据的生成与管理。 ##### 12. 作业...
- **用户自定义函数(UDF和UDAF)的开发与演示**:开发自定义函数以满足特定需求。 - **Hive优化**:采用各种策略提高Hive查询效率。 #### 五、数据迁移工具Sqoop - **Sqoop简介和配置**:了解Sqoop的功能及其配置...