`
安铁辉
  • 浏览: 244852 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

hive UDAF之cube

 
阅读更多
之前有想实现在hadoop上面自动cube,并计算同比的自动化解决方法。过考虑用UDAF去做但是一直没有去实现,最近项目中需要根据配置自动聚合生成数据,目标结果表格式固定,正好满足自动cube的场景,所以就搞了个demo,还在测试中

package com.taobao.voc.hive.udf;
/**
  * description  :对传入的多个维度的所有组合所对应的度量进行汇总
  * @param     :dim1,dim2 [... ,dim10] , '度量1,度量2,...度量N '
  * @return    : 返回一个数组,数组的每一个成员即一条返回记录,顺序与输入参数位置一一对应
  * @comment   :目前的多个度量需要拼成一个字符串,用逗号做分隔符,且是整数
  * @author    : 松坡
      * @version  : 1.0
*/
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

public class SumCube extends UDAF {
 
	public static class Evaluator implements UDAFEvaluator {
		private boolean mEmpty;
		private double mSum;
		private Long mcount;
		private ArrayList<String> dim_list = new ArrayList<String>();
		private String split_str = "_";
		private String sub_split_str = ",";
		private Map<String, String> hm_out = new HashMap<String, String>();
		private Map<String, String> hm_ret = new HashMap<String, String>(); 
		private String[] dim_array;
		private static int dim_num = 0;
		private ArrayList<String> ret_al=new ArrayList<String>();
		
		public Evaluator() {
			super();
			init();
		}

		public void init() {
			mSum = 0;
			mcount = new Long(0);
			mEmpty = true;

		}

		public static String getAddStr(String src_old, String src_new) {
			String ret_str = "";
			if (src_old == null || src_old.equals("")) {
				ret_str = src_new;
			} else {
				String[] l_old = src_old.split(",");
				String[] l_new = src_new.split(",");

				for (int i = 0; i < src_new.split(",").length; i++) {
					ret_str = ret_str
							+ (Long.parseLong(l_old[i]) + Long
									.parseLong(l_new[i])) + ",";
				}
				ret_str = ret_str.substring(0, ret_str.length() - 1);
			}
			return ret_str;
		}

		public boolean iterate(String... args) {
			String in_num = "";

			if (args.length > 0) {
				in_num = args[args.length - 1];//最后一位是需要聚集的参数
				dim_array = new String[args.length - 1];
				dim_num = args.length - 1;
				//将字段保存到数组中
				for (int a = 0; a < args.length - 1; a++) {
					dim_array[a] = args[a];
				}
				// dim_array = dim.split(sub_split_str);
			}

			 //拼接纬度
			if (mcount == 0) {
				StringBuffer sb_tmp = new StringBuffer();

				for (int i = 0; i < dim_array.length; i++) {
					sb_tmp.append(i).append(sub_split_str);
				}
				String dim_src = sb_tmp.toString();
				  
				dim_list = getDim(dim_src.substring(0, dim_src.length() - 1));  
			}

			 
			for (int i = 0; i < dim_list.size(); i++) {
				StringBuffer sb_tmp_1 = new StringBuffer();
				String dim_cube = "";
				int num1 = 0;

				if (dim_list.get(i).contains("ALL")) {
					sb_tmp_1.append("ALL").append(split_str);
				} else {
					sb_tmp_1.append(dim_list.get(i)).append(split_str);

					for (int j = 0; j < dim_list.get(i).length(); j++) {
						num1 = Integer.parseInt(dim_list.get(i).substring(j,
								j + 1));
						sb_tmp_1.append(dim_array[num1]).append(sub_split_str);

					}
				}
				dim_cube = sb_tmp_1.toString().substring(0,
						sb_tmp_1.toString().length() - 1);

				 
				if (hm_out.containsKey(dim_cube)) {
					hm_out.put(dim_cube,
							getAddStr(hm_out.get(dim_cube), in_num));
				} else {
					hm_out.put(dim_cube, in_num);
				}
			}

			mcount++;

			return true;
		}

		public Map<String, String> terminatePartial() { 
			Map<String, String> hm_ext = new HashMap<String,String>();
			for (Map.Entry<String, String> entry : hm_out.entrySet()) {
				String key = entry.getKey().toString();
				String val = entry.getValue().toString(); 
				String v=getSrcDim(key,dim_num); 
				hm_ext.put(v, val);  
			} 
			return hm_ext;
		}

		public boolean merge(Map<String, String> hm_merge) {
			for (Map.Entry<String, String> entry : hm_merge.entrySet()) {
				String key = entry.getKey().toString();
				String value = entry.getValue().toString();
				if (hm_ret.containsKey(key)) {
					hm_ret.put(key, getAddStr(hm_ret.get(key), value));
				} else {
					hm_ret.put(key, value);
				}
			}

			 
			return true;
		}

		public ArrayList<String> terminate() {   
			for (Map.Entry<String, String> entry : hm_ret.entrySet()) {
				String key = entry.getKey().toString();
				String val = entry.getValue().toString();  
				ret_al.add(key+val);
			} 
 
			return  ret_al;
		}

		 
		public ArrayList<String> getDim(String dim_src) {
			String src_in = dim_src;

			String[] src_in_array = src_in.split(",");
			ArrayList<String> src_out_array = new ArrayList<String>();
			String slipt_str = ",";

			int j = 0;
			int flag = 0;
			int flag2 = 0;
			String tmp_new = "";
			String[] last_item_arry = null;
			StringBuffer tmp_sb = new StringBuffer();

			for (int i = 0; i < src_in_array.length; i++) { 
				tmp_sb = new StringBuffer();
				j = i;
				if (i == 0) {
					while (j < src_in_array.length) {
						tmp_sb.append(src_in_array[j]).append(slipt_str);
						j++;
						continue;
					}
				} else {
					for (int k = 0; k < last_item_arry.length; k++) {  
						for (int l = k; l < src_in_array.length; l++) {  
							if (last_item_arry[k].contains(src_in_array[l])) {
								continue;
							} else {

								for (int f = 0; f < tmp_sb.toString().split(
										slipt_str).length; f++) { 
									tmp_new = last_item_arry[k]
											.concat(src_in_array[l]);
									flag = 0;
									for (int d = 0; d < tmp_new.length(); d++) {
										if (tmp_sb.toString().split(slipt_str)[f]
												.contains(tmp_new.substring(d,
														d + 1))) {
											flag++;
											flag2 = 1;
										}
									}
									if (flag == tmp_new.length()) {
										flag2 = flag;
										break;
									}
								}

								if (flag <= i && flag2 < tmp_new.length()) {
									tmp_sb.append(last_item_arry[k])
											.append(src_in_array[l])
											.append(slipt_str);
								} else {
									flag2 = 1;
								}
							}
						}
					}
				}
				src_out_array.add(tmp_sb.toString());
				last_item_arry = tmp_sb.toString().split(slipt_str);
			}
			 
			ArrayList<String> out_array = new ArrayList<String>();
			String tmp_str = "";
			for (int e = 0; e < src_out_array.size(); e++) {
				tmp_str = src_out_array.get(e).toString();
				for (int w = 0; w < tmp_str.split(slipt_str).length; w++) {
					out_array.add(tmp_str.split(slipt_str)[w].toString());
				}
			}
			out_array.add("ALL");
			return out_array;
		}
		
		
		public static String getSrcDim(String arg, int num) {
			String ret = "";
			String tmp1 = "";
			String[] tmp2 = new String[1];  
			String[] tmp3= new String[num];  
			
			for(int r1=0;r1<num;r1++){
				tmp3[r1]="all";
			}
			
			if ((!arg.contains("ALL")) ) {
				tmp1 = arg.split("_")[0];
	            tmp2= arg.split("_")[1].split(",");
	            int tmp_f=0;
	            
	            for (int r2 = 0; r2 < tmp1.length(); r2++) {  
						tmp_f=(int)tmp1.charAt(r2)-48;  
					tmp3[tmp_f] = tmp2[r2];
				}
	            
	             
			}
			for(int r3=0;r3<num;r3++){
				ret=ret+tmp3[r3]+",";
			}
			return ret;

		}

	}
 
}

分享到:
评论

相关推荐

    hive udaf 实现按位取与或

    在“hive udaf 实现按位取与或”的场景中,我们主要探讨如何使用UDAF来实现数据的按位逻辑运算,如按位与(AND)和按位或(OR)。 一、Hive UDAF基本概念 UDAF是一种特殊的用户自定义函数,它负责处理一组输入值并...

    Hive UDAF示例

    A custom UDAF to group oncatenates all arguments from different rows into a single string.

    hive:个人配置单元 UDAF

    个人 Hive UDAF 有一堆 Hive UDAF(用户定义的聚合函数)不在标准 Hive 分布中,因为它们可能会导致大型数据集的 OOM。 要使用它们,您需要加载 jar 文件,然后为每个要使用的函数创建一个临时函数: ADD JAR target...

    hive案例之---------微博数据分析及答案.zip

    在这个“hive案例之---------微博数据分析及答案”的项目中,我们将会探索如何利用Hive进行大规模的微博数据挖掘与分析。 首先,项目说明文档.docx可能会详细阐述了该项目的目标、背景、数据来源以及预期的结果。...

    Hive开窗函数测试-cube,rollup

    本篇将重点探讨Hive中的窗口函数以及如何利用它们进行cube和rollup操作。这些功能极大地增强了数据分析的能力,使得我们可以对数据进行更复杂的聚合。 窗口函数在SQL中是一种非常强大的工具,它允许我们在一个数据...

    apache-hive-2.3.9-bin.tar大数据HIVE.zip

    6. **存储过程(UDF,UDAF,UDTF)**:Hive支持用户自定义函数(UDF),用户定义聚合函数(UDAF)和用户定义表生成函数(UDTF),允许扩展Hive的功能。 7. **连接Hadoop生态系统**:Hive与Hadoop生态系统的其他组件...

    hive客户端安装_hive客户端安装_hive_

    在大数据处理领域,Hive是一个非常重要的工具,它提供了一个基于Hadoop的数据仓库基础设施,用于数据查询、分析和管理大规模数据集。本教程将详细讲解如何在Linux环境下安装Hive客户端,以便进行数据操作和分析。 ...

    HIVE安装及详解

    HIVE与传统数据库有很多不同之处: * 数据存储:HIVE使用Hadoop分布式文件系统(HDFS)存储数据,而传统数据库使用关系数据库管理系统(RDBMS) * 数据处理:HIVE使用MapReduce处理数据,而传统数据库使用SQL查询 *...

    java操作Hive源码之HiveJDBC实例(mysql数据库)

    在Java编程环境中,与Hive进行交互通常涉及使用Hive JDBC驱动程序,这是一种允许Java应用程序通过JDBC(Java Database Connectivity)接口与Hadoop上的Hive数据仓库进行通信的方式。本实例将聚焦于如何使用Java和...

    尚硅谷大数据之Hive视频

    2. **可扩展性**:Hive允许用户自定义函数(UDF)、聚合函数(UDAF)和表函数(UDTF),这使得Hive具有高度的灵活性。 3. **批处理能力**:通过将SQL语句转换成MapReduce任务,Hive非常适合用于处理大规模数据集的...

    HIVE相关的jar包

    开发者通常需要这些jar包来创建自定义的Hive UDF(用户自定义函数)或UDAF(用户自定义聚合函数)以扩展Hive的功能。这些jar包也用于在MapReduce、Tez或Spark等计算框架上运行Hive查询。 在Hive的不同版本之间,jar...

    hive数仓、hive SQL 、 hive自定义函数 、hive参数深入浅出

    - UDAF(用户定义的聚合函数):用于处理一组输入值并返回单个值,如自定义平均值、众数等。 - UDTF(用户定义的表生成函数):将一行数据转换为多行,常用于数据拆分。 4. Hive参数调优: - 内存参数:如...

    Hive_JDBC.zip_hive java_hive jdbc_hive jdbc pom_java hive_maven连

    理解Hive JDBC的工作原理以及如何在Java项目中正确配置和使用它是大数据开发中的关键技能之一。通过这个实例,你现在已经具备了使用Hive JDBC的基础知识,可以进一步探索更高级的Hive和Java集成技术。

    Hive实战之视频网站的测试数据

    在本实践案例中,我们将深入探讨如何利用Hive这一大数据处理工具来处理视频网站的测试数据。Hive是一个基于Hadoop的数据仓库系统,它允许使用SQL语法进行数据查询和分析,非常适合大规模分布式数据集的处理。这个...

    Hive驱动1.1.0.zip

    首先,Hive驱动是连接Hive服务器并与之通信的关键组件。它实现了Hive的客户端接口,允许Java应用程序,如IDE(集成开发环境)或数据库管理工具,与Hive服务器进行交互。在DataGrip这样的专业数据库IDE中,Hive驱动是...

    hive UDF需要jar包

    这些函数可以是单行输入单行输出的UDF,多行输入单行输出的UDF(UDAF,User Defined Aggregation Function),或者多行输入多行输出的UDTF(User Defined Table Generating Function)。 2. **Java编程**: Hive ...

    Ambari下Hive3.0升级到Hive4.0

    在大数据领域,Apache Ambari 是一个用于 Hadoop 集群管理和监控的开源工具,而 Hive 是一个基于 Hadoop 的数据仓库系统,用于处理和分析大规模数据集。本话题聚焦于如何在 Ambari 环境下将 Hive 3.0 升级到 Hive ...

    Apache Hive Functions Cheat Sheet

    Apache Hive是建立在Hadoop之上的数据仓库工具,它为处理大数据提供了SQL查询功能。Hive Functions Cheat Sheet为我们提供了一系列Hive中内置函数的快速参考,并详细介绍了如何创建和使用这些函数。 首先,Hive提供...

    自定义hive函数

    Hive 的灵活性之一在于支持用户自定义函数(UDF),包括用户定义的单行函数(UDF)、用户定义的多行函数(UDAF)和用户定义的表函数(UDTF)。这些自定义函数允许开发者扩展Hive的功能,以满足特定的业务需求。 ...

    Hive3.1.2编译源码

    使用hive3.1.2和spark3.0.0配置hive on spark的时候,发现官方下载的hive3.1.2和spark3.0.0不兼容,hive3.1.2对应的版本是spark2.3.0,而spark3.0.0对应的hadoop版本是hadoop2.6或hadoop2.7。 所以,如果想要使用高...

Global site tag (gtag.js) - Google Analytics