`
zhangxiong0301
  • 浏览: 361139 次
社区版块
存档分类
最新评论

用HIVE中的UDAF实现JSON字符串组装

    博客分类:
  • HIVE
阅读更多

 

      最近有个需求,需要把一个用户的应用使用情况组装成一个GSON字符串,通过UDAF实现了这一功能。具体来说:一张表如下:

meid app usecnt usetime
meid1 com.yulong.x 1 2
meid1 com.baidu.x 2 5
meid2 com.tencent.x 3 8

最终要把同一个用户的应用使用情况做成json串,比如结果中的一条数据如下:

 

{"AppUsageStat": [
        {
            "apName": "cn.kuwo.player",
            "frequency": 9,
            "duration": 312237
        },
        {
            "apName": "com.android.gallery3d",
            "frequency": 3,
            "duration": 70737
        }
    ]
}

 

 

具体代码如下:

package com.yulong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.Text;



public class MakeCountAndTimeGsonResolver extends AbstractGenericUDAFResolver{
	
	static final Log LOG = LogFactory.getLog( MakeCountAndTimeGsonResolver.class.getName() );

	@Override
	public GenericUDAFEvaluator getEvaluator(TypeInfo[] params) throws SemanticException {
		
		if( params.length != 3 ){
			throw new UDFArgumentTypeException(params.length-1,"Exactly 3 args required,supplied " + params.length );
		}
		
		for( int i = 0 ; i<3 ; i ++  ){
			if( params[i].getCategory() != ObjectInspector.Category.PRIMITIVE ){
				throw new UDFArgumentTypeException(i,"Only primitive type arguments are accepted but "
						+ params[i].getTypeName() + " is passed.");
			}
		}
		return new MakeCountAndTimeGsonEvaluator();
	}


	
	public static class MakeCountAndTimeGsonEvaluator extends GenericUDAFEvaluator{

		private Text gsonRs;
		//3个原始数据行中传入的参数对应ObjectInspector
		private PrimitiveObjectInspector inputOI1;
		private PrimitiveObjectInspector inputOI2;
		private PrimitiveObjectInspector inputOI3;
		//combiner或reducer输入的部分结果对应的ObjectInspector,在Merge方法中用到
		private PrimitiveObjectInspector outputOI;
		
		//存放结果的类,实现AggregationBuffer接口
		public static class GsonAgg implements AggregationBuffer{
			String rs;
			boolean empty;
		}
		
		void resetAgg(GsonAgg result){
			result.empty = true;
			result.rs = "\"AppUsageStat\":[]";
		}
		
		//这个方法返回了UDAF的返回类型,这里确定了MakeCountAndTimeGsonEvaluator自定义函数的返回类型是String类型
		@Override
		public ObjectInspector init(Mode m, ObjectInspector[] parameters)
				throws HiveException {
			
			assert ( parameters.length == 3 ) ;
			super.init(m, parameters);
			gsonRs = new Text( "{\"AppUsageStat:\"[]}" );



//每个阶段都会执行init,不同阶段对应的parameters是不一样的,在map阶段parameters代表的是sql语句中每个udaf对应参数的ObjectInspector,而在combiner或者reducer中parameters代表部分聚合结果对应的ObjectInspector。所以要区分对待。从iterate和merge的参数类型(一个数组类型,一个是object)就能看出来。因此在iterate和merge中分别使用inputOI1/2/3和outputOI 提取对应数据



			if( m == Mode.PARTIAL1 || m == Mode.COMPLETE ){
				inputOI1 = ( PrimitiveObjectInspector ) parameters[0];
				inputOI2 = ( PrimitiveObjectInspector ) parameters[1];
				inputOI3 = ( PrimitiveObjectInspector ) parameters[2];
			}else if( m == Mode.PARTIAL2 || m == Mode.FINAL ){
				outputOI = ( PrimitiveObjectInspector ) parameters[0];
			}
			
			return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
		}

		//创建新的聚合计算的需要的内存,用来存储mapper,combiner,reducer运算过程中的结果
		@Override
		public AggregationBuffer getNewAggregationBuffer() throws HiveException {
			
			GsonAgg result = new GsonAgg();
			reset(result);
			return result;
		}

		//map阶段调用,只要把保存当前的对象agg,再加上输入的参数,就可以了。
		@Override
		public void iterate(AggregationBuffer result, Object[] parts)
				throws HiveException {
			
			if( parts != null ){
				GsonAgg tmpResult = (GsonAgg)result;
				String pkg = PrimitiveObjectInspectorUtils.getString( parts[0] ,inputOI1 );
				Long count = PrimitiveObjectInspectorUtils.getLong( parts[1] ,  inputOI2 );
				Long time = PrimitiveObjectInspectorUtils.getLong( parts[2] ,  inputOI3 );
				String partialGson = "{\"apName\":\""+pkg+"\",\"frequency\":"+count+",\"duration\":"+time+"}";
				int len = tmpResult.rs.length();
				if( tmpResult.empty ){
					tmpResult.empty = false;
					tmpResult.rs = tmpResult.rs.substring(0,len-1 )+partialGson+"]";
				}else{
					tmpResult.rs = tmpResult.rs.substring(0,len-1 )+","+partialGson+"]";
				}
			}
		}

		//combiner合并map返回的结果,还有reducer合并mapper或combiner返回的结果。
		@Override
		public void merge(AggregationBuffer result, Object partial)
				throws HiveException {
			
			String partialGson = PrimitiveObjectInspectorUtils.getString( partial ,  outputOI ); 
			String partialUsage = partialGson.substring(16 ,  partialGson.length()-1 );
			
			GsonAgg tmpResult = (GsonAgg)result;
			int len = tmpResult.rs.length();
			if( tmpResult.empty ){
				tmpResult.empty = false;
				tmpResult.rs = tmpResult.rs.substring(0,len-2 )+partialUsage+"]";
			}else{
				tmpResult.rs = tmpResult.rs.substring(0,len-2 )+","+partialUsage+"]";
			}
		}

		
		
		@Override
		public void reset(AggregationBuffer arg0) throws HiveException {
			
			GsonAgg  result = (GsonAgg)arg0;
			result.empty = true;
			result.rs = "\"AppUsageStat\":[]";
			
		}

		//reducer返回结果,或者是只有mapper,没有reducer时,在mapper端返回结果。
		@Override
		public Object terminate(AggregationBuffer result) throws HiveException {
			
			if( result != null ){
				gsonRs.set( ((GsonAgg)result).rs );
				return gsonRs;
			}
			return null;
		}

		//mapper结束要返回的结果,还有combiner结束返回的结果
		@Override
		public Object terminatePartial(AggregationBuffer result)
				throws HiveException {
			terminate(result);
			return null;
		}
		
	}
	
	
}

 

 

分享到:
评论

相关推荐

    hive udaf 实现按位取与或

    在“hive udaf 实现按位取与或”的场景中,我们主要探讨如何使用UDAF来实现数据的按位逻辑运算,如按位与(AND)和按位或(OR)。 一、Hive UDAF基本概念 UDAF是一种特殊的用户自定义函数,它负责处理一组输入值并...

    Hive UDAF示例

    A custom UDAF to group oncatenates all arguments from different rows into a single string.

    截取用,分割的字符串中的第n个字符串 SQL

    根据给定的信息,本文将详细解释如何在SQL中实现截取用特定字符分割的字符串中的第n个子字符串。此需求通常应用于数据处理与分析场景中,尤其在处理半结构化或非结构化的文本数据时非常有用。 ### 核心知识点解析 ...

    hive,json格式传送,加载数据

    使用 JSON-Serde,我们可以在创建 Hive 表时指定 JSON 数据格式,并将 JSON 字符串解析成列。 ```sql CREATE TABLE my_table (col1 string, col2 int) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' ...

    一些有用的自定义配置单元udf函数、特殊数组、json、数学、字符串函数。___下载.zip

    Hive提供了一些内建的JSON函数,如get_json_object和json_tuple,用于解析和提取JSON字符串中的数据。然而,如果需要更复杂的JSON操作,如解析嵌套的JSON对象或处理JSON数组,自定义的JSON处理函数就显得尤为重要。`...

    06.hive中的json解析函数--json-tuple.mp4

    06.hive中的json解析函数--json-tuple.mp4

    Hive-JSON-Serde-develop

    在Hive中,JSON数据的处理并不直接支持,因此需要SerDe来转换JSON字符串与Hive的内部表示之间。 **SerDe(Serializer/Deserializer)** 是Hive中用于处理表数据的接口。当Hive读取或写入数据时,SerDe负责将数据...

    部分普通sql查询在hive中的实现方式

    - **背景**:Hive不支持直接使用`HAVING`关键字,但可以通过嵌套子查询并在外层查询中使用`WHERE`条件来实现类似的功能。 - **示例**:如果想实现如下标准SQL的`HAVING`查询: ```sql SELECT gender, COUNT(*) as...

    Hive-JSON-Serde-1.3.8.zip

    在这个例子中,`data`字段将被视为JSON字符串,由JSON-Serde解析成内部的Hive结构。`serialization.format`参数通常用来指定JSON对象的键值对分隔符,这里设置为1表示使用冒号(:)作为分隔符。 集成JSON-Serde后,...

    hive-json-schema最新源代码

    hive-json-schema最新源代码hive-json-schema最新源代码hive-json-schema最新源代码hive-json-schema最新源代码hive-json-schema最新源代码hive-json-schema最新源代码hive-json-schema最新源代码hive-json-schema...

    impala中substr()截取中文字符串乱码的问题

    5. **使用UDF**:注册成功后,你就可以在Impala SQL查询中使用`substr_udf()`函数了,它会根据你的实现正确处理中文字符串。 需要注意的是,创建UDF时应确保函数的性能尽可能高效,因为Impala中的UDF会在执行时影响...

    ascii码 与 字符串 相互转化

    ascii码 与 字符串 相互转化 ascii码 与 字符串 相互转化 ascii码 与 字符串 相互转化

    Hive-JSON-Array-UDF:一个UDF从嵌套的JSON数组中检索元素,并作为HiveQL数组返回

    Hive嵌套JSON Arrray UDF 此UDF接收“ JSON字符串”和JSON数组的路径,并收集此路径指定的所有元素(也处理嵌套的JSON数组)。 例子: 假设此JSON在某些表的行中: {" request " : {" user " : " Mario " ," ...

    分组字符合并SQL语句 按某字段合并字符串之一(简单合并)

    标题:按某字段合并字符串之一(简单合并) 描述:将如下形式的数据按id字段合并value字段。...1、sql2000中只能用自定义的函数解决 create table tb(id int, value varchar(10)) insert into tb values(1,

    hive支持json格式的数据.docx

    get_json_object 函数可以从 JSON 字符串中提取指定的字段,例如: ``` select id, get_json_object(json, '$.name') name, get_json_object(json, '$.age') age, get_json_object(json, '$.sex') sex from...

    hive解析json格式数据所需jar包

    hive解析json时所需jar包。具体使用: add jar ../../../target/json-serde-1.3-jar-with-dependencies.jar; CREATE TABLE json_nested_test ( country string, languages array, religions map,array&lt;int&gt;&gt;) ...

    php判断字符串在另一个字符串位置的方法

    在PHP编程语言中,处理字符串是一项常见的任务,其中包括查找一个字符串在另一个字符串中的位置。`strstr()`函数就是这样一个工具,它能帮助我们实现这个功能。本文将深入探讨`strstr()`函数的工作原理、语法以及...

    SQL Server数据库驱动及连接字符串

    在IT领域,数据库是存储和管理数据的核心工具,而SQL Server是...了解了这些基本概念后,你可以根据具体需求选择合适的驱动,并编写连接字符串,实现与SQL Server的顺畅通信。希望本篇内容能为你的开发工作提供帮助。

    判断字符串中是否包含emoji表情

    判断字符串是否包含emoji表情

    hive-json-serde-0.2.jar

    “hive-json-serde-0.2.jar”是一个针对Hive的JSON SerDe实现,它的主要功能是将JSON文档转换为Hive可以理解的表格结构,同时也能将Hive的表格数据转换回JSON格式。这个库使得Hive能够直接操作JSON格式的数据,无需...

Global site tag (gtag.js) - Google Analytics