最近有个需求,需要把一个用户的应用使用情况组装成一个GSON字符串,通过UDAF实现了这一功能。具体来说:一张表如下:
meid | app | usecnt | usetime |
meid1 | com.yulong.x | 1 | 2 |
meid1 | com.baidu.x | 2 | 5 |
meid2 | com.tencent.x | 3 | 8 |
最终要把同一个用户的应用使用情况做成json串,比如结果中的一条数据如下:
{"AppUsageStat": [ { "apName": "cn.kuwo.player", "frequency": 9, "duration": 312237 }, { "apName": "com.android.gallery3d", "frequency": 3, "duration": 70737 } ] }
具体代码如下:
package com.yulong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.Text;
public class MakeCountAndTimeGsonResolver extends AbstractGenericUDAFResolver{
static final Log LOG = LogFactory.getLog( MakeCountAndTimeGsonResolver.class.getName() );
@Override
public GenericUDAFEvaluator getEvaluator(TypeInfo[] params) throws SemanticException {
if( params.length != 3 ){
throw new UDFArgumentTypeException(params.length-1,"Exactly 3 args required,supplied " + params.length );
}
for( int i = 0 ; i<3 ; i ++ ){
if( params[i].getCategory() != ObjectInspector.Category.PRIMITIVE ){
throw new UDFArgumentTypeException(i,"Only primitive type arguments are accepted but "
+ params[i].getTypeName() + " is passed.");
}
}
return new MakeCountAndTimeGsonEvaluator();
}
public static class MakeCountAndTimeGsonEvaluator extends GenericUDAFEvaluator{
private Text gsonRs;
//3个原始数据行中传入的参数对应ObjectInspector
private PrimitiveObjectInspector inputOI1;
private PrimitiveObjectInspector inputOI2;
private PrimitiveObjectInspector inputOI3;
//combiner或reducer输入的部分结果对应的ObjectInspector,在Merge方法中用到
private PrimitiveObjectInspector outputOI;
//存放结果的类,实现AggregationBuffer接口
public static class GsonAgg implements AggregationBuffer{
String rs;
boolean empty;
}
void resetAgg(GsonAgg result){
result.empty = true;
result.rs = "\"AppUsageStat\":[]";
}
//这个方法返回了UDAF的返回类型,这里确定了MakeCountAndTimeGsonEvaluator自定义函数的返回类型是String类型
@Override
public ObjectInspector init(Mode m, ObjectInspector[] parameters)
throws HiveException {
assert ( parameters.length == 3 ) ;
super.init(m, parameters);
gsonRs = new Text( "{\"AppUsageStat:\"[]}" );
//每个阶段都会执行init,不同阶段对应的parameters是不一样的,在map阶段parameters代表的是sql语句中每个udaf对应参数的ObjectInspector,而在combiner或者reducer中parameters代表部分聚合结果对应的ObjectInspector。所以要区分对待。从iterate和merge的参数类型(一个数组类型,一个是object)就能看出来。因此在iterate和merge中分别使用inputOI1/2/3和outputOI 提取对应数据
if( m == Mode.PARTIAL1 || m == Mode.COMPLETE ){
inputOI1 = ( PrimitiveObjectInspector ) parameters[0];
inputOI2 = ( PrimitiveObjectInspector ) parameters[1];
inputOI3 = ( PrimitiveObjectInspector ) parameters[2];
}else if( m == Mode.PARTIAL2 || m == Mode.FINAL ){
outputOI = ( PrimitiveObjectInspector ) parameters[0];
}
return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
}
//创建新的聚合计算的需要的内存,用来存储mapper,combiner,reducer运算过程中的结果
@Override
public AggregationBuffer getNewAggregationBuffer() throws HiveException {
GsonAgg result = new GsonAgg();
reset(result);
return result;
}
//map阶段调用,只要把保存当前的对象agg,再加上输入的参数,就可以了。
@Override
public void iterate(AggregationBuffer result, Object[] parts)
throws HiveException {
if( parts != null ){
GsonAgg tmpResult = (GsonAgg)result;
String pkg = PrimitiveObjectInspectorUtils.getString( parts[0] ,inputOI1 );
Long count = PrimitiveObjectInspectorUtils.getLong( parts[1] , inputOI2 );
Long time = PrimitiveObjectInspectorUtils.getLong( parts[2] , inputOI3 );
String partialGson = "{\"apName\":\""+pkg+"\",\"frequency\":"+count+",\"duration\":"+time+"}";
int len = tmpResult.rs.length();
if( tmpResult.empty ){
tmpResult.empty = false;
tmpResult.rs = tmpResult.rs.substring(0,len-1 )+partialGson+"]";
}else{
tmpResult.rs = tmpResult.rs.substring(0,len-1 )+","+partialGson+"]";
}
}
}
//combiner合并map返回的结果,还有reducer合并mapper或combiner返回的结果。
@Override
public void merge(AggregationBuffer result, Object partial)
throws HiveException {
String partialGson = PrimitiveObjectInspectorUtils.getString( partial , outputOI );
String partialUsage = partialGson.substring(16 , partialGson.length()-1 );
GsonAgg tmpResult = (GsonAgg)result;
int len = tmpResult.rs.length();
if( tmpResult.empty ){
tmpResult.empty = false;
tmpResult.rs = tmpResult.rs.substring(0,len-2 )+partialUsage+"]";
}else{
tmpResult.rs = tmpResult.rs.substring(0,len-2 )+","+partialUsage+"]";
}
}
@Override
public void reset(AggregationBuffer arg0) throws HiveException {
GsonAgg result = (GsonAgg)arg0;
result.empty = true;
result.rs = "\"AppUsageStat\":[]";
}
//reducer返回结果,或者是只有mapper,没有reducer时,在mapper端返回结果。
@Override
public Object terminate(AggregationBuffer result) throws HiveException {
if( result != null ){
gsonRs.set( ((GsonAgg)result).rs );
return gsonRs;
}
return null;
}
//mapper结束要返回的结果,还有combiner结束返回的结果
@Override
public Object terminatePartial(AggregationBuffer result)
throws HiveException {
terminate(result);
return null;
}
}
}
相关推荐
在“hive udaf 实现按位取与或”的场景中,我们主要探讨如何使用UDAF来实现数据的按位逻辑运算,如按位与(AND)和按位或(OR)。 一、Hive UDAF基本概念 UDAF是一种特殊的用户自定义函数,它负责处理一组输入值并...
A custom UDAF to group oncatenates all arguments from different rows into a single string.
根据给定的信息,本文将详细解释如何在SQL中实现截取用特定字符分割的字符串中的第n个子字符串。此需求通常应用于数据处理与分析场景中,尤其在处理半结构化或非结构化的文本数据时非常有用。 ### 核心知识点解析 ...
使用 JSON-Serde,我们可以在创建 Hive 表时指定 JSON 数据格式,并将 JSON 字符串解析成列。 ```sql CREATE TABLE my_table (col1 string, col2 int) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' ...
Hive提供了一些内建的JSON函数,如get_json_object和json_tuple,用于解析和提取JSON字符串中的数据。然而,如果需要更复杂的JSON操作,如解析嵌套的JSON对象或处理JSON数组,自定义的JSON处理函数就显得尤为重要。`...
06.hive中的json解析函数--json-tuple.mp4
在Hive中,JSON数据的处理并不直接支持,因此需要SerDe来转换JSON字符串与Hive的内部表示之间。 **SerDe(Serializer/Deserializer)** 是Hive中用于处理表数据的接口。当Hive读取或写入数据时,SerDe负责将数据...
- **背景**:Hive不支持直接使用`HAVING`关键字,但可以通过嵌套子查询并在外层查询中使用`WHERE`条件来实现类似的功能。 - **示例**:如果想实现如下标准SQL的`HAVING`查询: ```sql SELECT gender, COUNT(*) as...
在这个例子中,`data`字段将被视为JSON字符串,由JSON-Serde解析成内部的Hive结构。`serialization.format`参数通常用来指定JSON对象的键值对分隔符,这里设置为1表示使用冒号(:)作为分隔符。 集成JSON-Serde后,...
hive-json-schema最新源代码hive-json-schema最新源代码hive-json-schema最新源代码hive-json-schema最新源代码hive-json-schema最新源代码hive-json-schema最新源代码hive-json-schema最新源代码hive-json-schema...
5. **使用UDF**:注册成功后,你就可以在Impala SQL查询中使用`substr_udf()`函数了,它会根据你的实现正确处理中文字符串。 需要注意的是,创建UDF时应确保函数的性能尽可能高效,因为Impala中的UDF会在执行时影响...
ascii码 与 字符串 相互转化 ascii码 与 字符串 相互转化 ascii码 与 字符串 相互转化
Hive嵌套JSON Arrray UDF 此UDF接收“ JSON字符串”和JSON数组的路径,并收集此路径指定的所有元素(也处理嵌套的JSON数组)。 例子: 假设此JSON在某些表的行中: {" request " : {" user " : " Mario " ," ...
标题:按某字段合并字符串之一(简单合并) 描述:将如下形式的数据按id字段合并value字段。...1、sql2000中只能用自定义的函数解决 create table tb(id int, value varchar(10)) insert into tb values(1,
get_json_object 函数可以从 JSON 字符串中提取指定的字段,例如: ``` select id, get_json_object(json, '$.name') name, get_json_object(json, '$.age') age, get_json_object(json, '$.sex') sex from...
hive解析json时所需jar包。具体使用: add jar ../../../target/json-serde-1.3-jar-with-dependencies.jar; CREATE TABLE json_nested_test ( country string, languages array, religions map,array<int>>) ...
在PHP编程语言中,处理字符串是一项常见的任务,其中包括查找一个字符串在另一个字符串中的位置。`strstr()`函数就是这样一个工具,它能帮助我们实现这个功能。本文将深入探讨`strstr()`函数的工作原理、语法以及...
在IT领域,数据库是存储和管理数据的核心工具,而SQL Server是...了解了这些基本概念后,你可以根据具体需求选择合适的驱动,并编写连接字符串,实现与SQL Server的顺畅通信。希望本篇内容能为你的开发工作提供帮助。
判断字符串是否包含emoji表情
“hive-json-serde-0.2.jar”是一个针对Hive的JSON SerDe实现,它的主要功能是将JSON文档转换为Hive可以理解的表格结构,同时也能将Hive的表格数据转换回JSON格式。这个库使得Hive能够直接操作JSON格式的数据,无需...