package com.csm.data.udf.hive;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.Text;
/**
* 根据
* copr_idAcont_idA
* copr_idAcont_idB
* copr_idAcont_idC
*
* 得到
* copr_idAcont_idB
*
* @author hadoop_szty
*
*/
public class UDAFGetCont extends AbstractGenericUDAFResolver {
/**
* 验证参数是否正确
*/
@Override
public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
//只允许一个参数传入
if (parameters.length != 1) {
throw new UDFArgumentTypeException(parameters.length - 1, "Exactly one argument is expected.");
}
//这里只能传入原始的数据类型,其他的列表,数组不能处理
if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
throw new UDFArgumentTypeException(0, "Only primitive type arguments are accepted.");
}
return new Evaluator();
}
public static class Evaluator extends GenericUDAFEvaluator {
//最终结果变量
private Text resContId;
private PrimitiveObjectInspector inputOI;
public Evaluator() {
super();
}
@Override
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
// TODO Auto-generated method stub
super.init(m, parameters);
resContId = new Text();
inputOI = (PrimitiveObjectInspector) parameters[0];
return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
}
static class ContAgg implements AggregationBuffer {
boolean empty;
String resCont;
}
// 返回存储临时聚合结果的AggregationBuffer对象。
@Override
public AggregationBuffer getNewAggregationBuffer() throws HiveException {
ContAgg result = new ContAgg();
reset(result);
return result;
}
// 重置聚合结果对象,以支持mapper和reducer的重用。
@Override
public void reset(AggregationBuffer agg) throws HiveException {
ContAgg myagg = (ContAgg) agg;
myagg.empty = true;
myagg.resCont = "";
}
// 迭代处理原始数据parameters并保存到agg中。
@Override
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
// TODO Auto-generated method stub
merge(agg, parameters[0]);
}
// 以持久化的方式返回agg表示的部分聚合结果,
// 这里的持久化意味着返回值只能Java基础类型、数组、基础类型包装器、Hadoop的Writables、Lists和Maps。
@Override
public Object terminatePartial(AggregationBuffer agg) throws HiveException {
return terminate(agg);
}
// 合并由partial表示的部分聚合结果到agg中。
@Override
public void merge(AggregationBuffer agg, Object partial) throws HiveException {
if (partial != null) {
ContAgg myagg = (ContAgg) agg;
String contIdStr = PrimitiveObjectInspectorUtils.getString(partial, inputOI);
if (!contIdStr.equals("")) {
if (contIdStr.length() == 18) {
String flag = contIdStr.toString().substring(8, 10);
if (flag.equals("02") || flag.equals("03")) {
myagg.resCont = contIdStr;
myagg.empty = false;
}
} else if (contIdStr.length() == 22 && myagg.resCont.length() != 18) {
myagg.resCont = contIdStr;
myagg.empty = false;
}
}
}
}
// 返回最终结果。
@Override
public Object terminate(AggregationBuffer agg) throws HiveException {
ContAgg myagg = (ContAgg) agg;
if (myagg.empty) {
return null;
}
resContId.set(myagg.resCont);
return resContId;
}
}
}
相关推荐
本文将通过一个具体的Java与Hive结合的实例,深入讲解如何利用Java API进行Hive的操作,以加深对Hive语句的理解。 首先,要使用Java与Hive交互,我们需要引入Hive的JDBC驱动。在项目中,可以通过Maven或Gradle添加...
6. **存储过程(UDF,UDAF,UDTF)**:Hive支持用户自定义函数(UDF),用户定义聚合函数(UDAF)和用户定义表生成函数(UDTF),允许扩展Hive的功能。 7. **连接Hadoop生态系统**:Hive与Hadoop生态系统的其他组件...
首先,Hive提供了多种函数类型,包括用户定义函数(UDF)、用户定义聚合函数(UDAF)、用户定义表生成函数(UDTF)和宏。用户定义函数(UDF)是一个接受一个或多个行中的列作为参数,并返回一个值或对象的函数,例如...
Hive 基本概念 Hive 应用场景。 Hive 与hadoop的关系。 Hive 与传统数据库对比。 Hive 的数据存储机制。 Hive 基本操作 ...Hive 中的DDL操作。...Hive UDF/UDAF开发实例。 Hive 执行过程分析及优化策略
1. **Hive Server**:这是Hive服务的运行实例,用于接收客户端的查询请求并执行它们。它提供了两种交互模式:HiveServer(旧版,基于Thrift)和HiveServer2(新版,更安全和高性能)。 2. **Hive Metastore**:存储...
5. **Hive进阶功能**:包括分区、桶、视图、联接、子查询、UDF(用户自定义函数)、UDAF(用户自定义聚合函数)和UDTF(用户自定义表生成函数)等高级特性。 6. **Hive性能优化**:如何通过优化HQL、选择合适的存储...
《设计开发 Hive 编程指南 完整版》是一份详尽的教程...这份完整的编程指南将详细解释以上知识点,并通过实例演示如何使用 Hive 进行数据处理和分析,对于希望掌握 Hive 技术的开发者来说,是一份非常宝贵的参考资料。
2. **易于扩展**:Hive支持用户自定义函数(UDF)、用户自定义聚合函数(UDAF)和用户自定义表函数(UDTF),使得功能扩展变得简单。 3. **高容错性**:即使某些节点出现故障,Hive仍然能够继续执行SQL查询任务,...
- **Hive在实际项目中的应用案例**:通过具体的实例展示Hive如何解决实际问题。 通过阅读《实用Hive》,读者不仅可以了解到Hive的基本概念和技术细节,还可以学习到如何有效地使用Hive解决实际的大数据处理问题。...
《Spark SQL大数据实例开发教程》是一本专注于Spark SQL学习的指南,由王家林和祝茂农等人编著。本书旨在帮助企业级开发人员深入理解和掌握Spark SQL,它在Spark生态系统中扮演着至关重要的角色,是处理大规模数据的...
- Hive UDFs 分为三类:UDF(单行函数),UDAF(聚合函数)和 UDTF(多行函数)。`hive-udf-collections` 主要关注 UDF。 - UDF 允许用户扩展 Hive 的功能,解决内置函数无法满足的特定需求。 - UDFs 必须用 Java...
总的来说,《Hive编程指南》这本书会深入讲解这些概念,通过实例演示如何使用Hive进行大数据分析,同时也会涵盖Hive在实际项目中的最佳实践和常见问题解决方案。对于希望掌握Hadoop生态系统中数据分析工具的读者来说...
Hadoop精讲第二部分的培训内容涵盖了Hive的深入介绍、其应用实例以及与之相关的各种技术细节。Hive作为Hadoop生态中非常重要的组件,它是一个建立在Hadoop之上的数据仓库工具,允许用户使用类SQL语言HQL来查询存储在...
#### 2.2 Hive 优化实例 - **排序优化**:使用 `sort by` 替代 `order by`,以提高效率。 - **静态分区**:使用静态分区可以避免运行时动态分区带来的额外开销。 - **减少 Job 和 Task 数量**:通过表连接等操作来...
1.8.2 编写UDAF 第13章 HBase 2.1 HBasics 2.1.1 背景 2.2 概念 2.2.1 数据模型的"旋风之旅" 2.2.2 实现 2.3 安装 2.3.1 测试驱动 2.4 客户机 2.4.1 Java 2.4.2 Avro,REST,以及Thrift 2.5 示例 2.5.1 模式 2.5.2 ...
1.8.2 编写UDAF 第13章 HBase 2.1 HBasics 2.1.1 背景 2.2 概念 2.2.1 数据模型的“旋风之旅” 2.2.2 实现 2.3 安装 2.3.1 测试驱动 2.4 客户机 2.4.1 Java 2.4.2 Avro,REST,以及Thrift ...
标题“udf.zip_UDF案例_udf_udf模板”暗示了这个压缩包包含了一系列关于UDF的实例和模板,适合初学者学习和参考。这可能包括各种类型的UDF编写方式,以及如何在不同的场景下应用它们。 描述中的"udf 各种编写模板,...
Hive提供了数据定义、查询、内置函数以及自定义函数(UDF、UDAF、UDTF),方便大数据分析。 实时计算部分,Storm实时流计算框架的介绍和Spark Streaming的编程实例,帮助学生理解实时数据处理的原理和架构。Scala...
- **用户自定义函数(UDF和UDAF)的开发与演示**:指导如何开发UDF和UDAF,并提供示例代码。 - **Hive优化**:分享Hive性能优化的策略和技术。 #### 六、数据迁移工具Sqoop - **Sqoop简介和配置**:概述Sqoop的作用...
- **用户自定义函数(UDF和UDAF)的开发与演示**:开发自定义函数以满足特定需求。 - **Hive优化**:采用各种策略提高Hive查询效率。 #### 五、数据迁移工具Sqoop - **Sqoop简介和配置**:了解Sqoop的功能及其配置...