Hive's row_number function, similar to Oracle's ROW_NUMBER function, assigns a row number inside the Reduce phase of the Hive MapReduce job. It is typically used together with DISTRIBUTE BY and SORT BY, since it assumes the incoming rows are already grouped and sorted by the partition columns.
The implementation is as follows:
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.LongWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Description(name = "row_number",
    value = "_FUNC_(a, [...]) - Assumes that incoming data is SORTed and DISTRIBUTEd according to the given columns, "
        + "and then returns the row number for each row within the partition.")
public class GenericUDFPartitionRowNumber extends GenericUDF {

  private Logger logger = LoggerFactory.getLogger(GenericUDFPartitionRowNumber.class);
  private LongWritable rowIndex = new LongWritable(0);
  private Object[] partitionColumnValues;
  private ObjectInspector[] objectInspectors;
  // Holds +1 (for compare() > 0), 0 for unknown, -1 (for compare() < 0).
  private int[] sortDirections;

  /**
   * Takes the output of compare() and collapses it to +1, 0 or -1.
   */
  protected static int collapseToIndicator(int val) {
    if (val > 0) {
      return 1;
    } else if (val == 0) {
      return 0;
    } else {
      return -1;
    }
  }

  /**
   * Wraps Object.equals, but allows one or both arguments to be null.
   * Note that nullSafeEquals(null, null) == true.
   */
  protected static boolean nullSafeEquals(Object o1, Object o2) {
    if (o1 == null && o2 == null) {
      return true;
    } else if (o1 == null || o2 == null) {
      return false;
    } else {
      return o1.equals(o2);
    }
  }

  @Override
  public Object evaluate(DeferredObject[] arguments) throws HiveException {
    assert (arguments.length == partitionColumnValues.length);
    for (int i = 0; i < arguments.length; i++) {
      if (partitionColumnValues[i] == null) {
        partitionColumnValues[i] =
            ObjectInspectorUtils.copyToStandardObject(arguments[i].get(), objectInspectors[i]);
      } else if (!nullSafeEquals(arguments[i].get(), partitionColumnValues[i])) {
        // Check sort directions. We know the elements aren't equal.
        int newDirection = collapseToIndicator(ObjectInspectorUtils.compare(
            arguments[i].get(), objectInspectors[i], partitionColumnValues[i], objectInspectors[i]));
        if (sortDirections[i] == 0) {
          // We don't already know what the sort direction should be.
          sortDirections[i] = newDirection;
        } else if (sortDirections[i] != newDirection) {
          throw new HiveException("Data in column: " + i
              + " does not appear to be consistently sorted, so partitionedRowNumber cannot be used.");
        }
        // Reset the remaining column values (the previous ones haven't changed).
        for (int j = i; j < arguments.length; j++) {
          partitionColumnValues[j] =
              ObjectInspectorUtils.copyToStandardObject(arguments[j].get(), objectInspectors[j]);
        }
        rowIndex.set(1);
        return rowIndex;
      }
    }
    // Partition columns are identical. Increment and continue.
    rowIndex.set(rowIndex.get() + 1);
    return rowIndex;
  }

  @Override
  public String getDisplayString(String[] children) {
    return "partitionedRowNumber(" + StringUtils.join(children, ", ") + ")";
  }

  @Override
  public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
    logger.info("run times");
    if (arguments.length == 0) {
      throw new UDFArgumentLengthException("The function partitionedRowNumber expects at least 1 argument.");
    }
    partitionColumnValues = new Object[arguments.length];
    for (ObjectInspector oi : arguments) {
      if (ObjectInspectorUtils.isConstantObjectInspector(oi)) {
        throw new UDFArgumentException("No constant arguments should be passed to partitionedRowNumber.");
      }
    }
    objectInspectors = arguments;
    sortDirections = new int[arguments.length];
    return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
  }
}
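For reference, a minimal sketch of how a UDF like this would be registered and called from Hive is shown below. The jar path, the temporary function name p_row_number, and the table and column names (orders, user_id, pay_time) are illustrative assumptions, not part of the original code; only the DISTRIBUTE BY / SORT BY pattern is what the UDF itself requires.

-- Register the UDF (jar path and function name are assumptions for illustration).
ADD JAR /path/to/partition-row-number.jar;
CREATE TEMPORARY FUNCTION p_row_number AS 'GenericUDFPartitionRowNumber';

-- The UDF relies on each reducer seeing rows grouped and sorted by the partition
-- columns, so the DISTRIBUTE BY / SORT BY columns must match its arguments.
SELECT user_id, pay_time, p_row_number(user_id) AS rn
FROM (
  SELECT user_id, pay_time
  FROM orders
  DISTRIBUTE BY user_id
  SORT BY user_id, pay_time
) t;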
Hive 0.11.0 ships a built-in row_number function. Its source code looks like this:
package org.apache.hadoop.hive.ql.udf.generic;

import java.util.ArrayList;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.exec.WindowFunctionDescription;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.IntWritable;

@WindowFunctionDescription(
  description = @Description(
    name = "row_number",
    value = "_FUNC_() - The ROW_NUMBER function assigns a unique number (sequentially, starting from 1, "
        + "as defined by ORDER BY) to each row within the partition."
  ),
  supportsWindow = false,
  pivotResult = true
)
public class GenericUDAFRowNumber extends AbstractGenericUDAFResolver {

  static final Log LOG = LogFactory.getLog(GenericUDAFRowNumber.class.getName());

  @Override
  public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
    if (parameters.length != 0) {
      throw new UDFArgumentTypeException(parameters.length - 1, "No argument is expected.");
    }
    return new GenericUDAFRowNumberEvaluator();
  }

  static class RowNumberBuffer implements AggregationBuffer {

    ArrayList<IntWritable> rowNums;
    int nextRow;

    void init() {
      rowNums = new ArrayList<IntWritable>();
    }

    RowNumberBuffer() {
      init();
      nextRow = 1;
    }

    void incr() {
      rowNums.add(new IntWritable(nextRow++));
    }
  }

  public static class GenericUDAFRowNumberEvaluator extends GenericUDAFEvaluator {

    @Override
    public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
      super.init(m, parameters);
      if (m != Mode.COMPLETE) {
        throw new HiveException("Only COMPLETE mode supported for row_number function");
      }
      return ObjectInspectorFactory.getStandardListObjectInspector(
          PrimitiveObjectInspectorFactory.writableIntObjectInspector);
    }

    @Override
    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
      return new RowNumberBuffer();
    }

    @Override
    public void reset(AggregationBuffer agg) throws HiveException {
      ((RowNumberBuffer) agg).init();
    }

    @Override
    public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
      ((RowNumberBuffer) agg).incr();
    }

    @Override
    public Object terminatePartial(AggregationBuffer agg) throws HiveException {
      throw new HiveException("terminatePartial not supported");
    }

    @Override
    public void merge(AggregationBuffer agg, Object partial) throws HiveException {
      throw new HiveException("merge not supported");
    }

    @Override
    public Object terminate(AggregationBuffer agg) throws HiveException {
      return ((RowNumberBuffer) agg).rowNums;
    }
  }
}
The built-in row_number function must be used together with the windowing (OVER) clause, for example:
select s, sum(f) over (partition by i), row_number() over () from over10k where s = 'tom allen' or s = 'bob steinbeck';
Window functions are a new feature introduced in version 0.11.0.
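As a further illustrative sketch, a common pattern with the built-in function is numbering rows within each partition in a defined order and then filtering on that number, e.g. to take the top 3 per group. The table emp_salary and its columns dept, emp, salary are assumed names for illustration only:

SELECT dept, emp, salary
FROM (
  SELECT dept, emp, salary,
         row_number() OVER (PARTITION BY dept ORDER BY salary DESC) AS rn
  FROM emp_salary
) t
WHERE rn <= 3;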