一、背景
mysql数据由于自增的bigint 主键,会插入更快,因为能持续往文件末尾插入嘛,因此需要这个东西。
然后呢,服务端有专门生产id的接口,但是数据中心批量插入,肯定会拉暴他们,不让我们一起玩,只能自己玩。
二、方案
1.redis 获取数据段,程序内部自增。
问题:要用外部redis麻烦,而且要持久化
2.python 服务注册的,这个自己网上搜索。
问题:要机器,要服务端和客户端,而且python 我们版本不够高。。。尴尬,不想随便升级整个集群
3.twitter 的 snowflake 算法
参考: https://www.jianshu.com/p/54a87a7c3622
参数:workId datacenterId 唯一
问题:
1.两个ID 我都没法唯一。MAP 阶段可能是在同一个机器上,同时执行,参数不好搞
解决方案:
1.我们场景是只需要同一个任务的主键不重复就行。因此workId 我选取map 的ID,毕竟同一个任务,每个MAP的ID 肯定不同。 datacenterId 可以先默认0.
三.上代码:
public class MagicSnowFlake { //其实时间戳 2017-01-01 00:00:00 private final static long twepoch = 1483200000000l;
// 改到16位 65535,认为MAP的最大数量限制
private final static long ipIdMax = 65535; // 默认1位,我们小,没那么多数据中心,意思一下 private final static long dataCenterIdBits = 1L;
// 9+1 (10 )
private final static long mapIdBits = 9L;
private final static long dataCenterIdMax = ~ (-1L << dataCenterIdBits); //序列在id中占的位数 12bit private final static long seqBits = 12L; //序列最大值 4095 即2的12次方减一。 private final static long seqMax = ~(-1L << seqBits); // 64位的数字:首位0 随后41位表示时间戳 MAP_ID 最后12位序列号 private final static long dataCenterIdLeftShift = seqBits; private final static long mapIdLeftShift = seqBits + dataCenterIdBits; private final static long timeLeftShift = seqBits + dataCenterIdBits + mapIdLeftShift; //IP标识(0~255) private long ipId; // 数据中心ID(0~3) private long dataCenterId; // 毫秒内序列(0~4095) private long seq = 0L; // 上次生成ID的时间截 private long lastTime = -1L; public MagicSnowFlake(long ipId, long dataCenterId) { if(ipId < 0 || ipId > ipIdMax) { System.out.println(" ---------- ipId不在正常范围内(0~"+ipIdMax +") " + ipId); System.exit(0); } if(dataCenterId < 0 || dataCenterId > dataCenterIdMax) { System.out.println(" ---------- dataCenterId不在正常范围内(0~"+dataCenterIdMax +") " + dataCenterId); System.exit(0); } this.ipId = ipId; this.dataCenterId = dataCenterId; } public synchronized long nextId() { long nowTime = System.currentTimeMillis(); if(nowTime < lastTime) { System.out.println(" ---------- 当前时间前于上次操作时间,当前时间有误: " + nowTime); System.exit(0); } if(nowTime == lastTime) { seq = (seq + 1) & seqMax; if(seq == 0) { nowTime = getNextTimeStamp(); } } else { seq = 0L; } lastTime = nowTime; return ((nowTime - twepoch) << timeLeftShift) | (ipId << mapIdLeftShift) | (dataCenterId << dataCenterIdLeftShift) | seq; } private long getNextTimeStamp() { long nowTime; do { nowTime = System.currentTimeMillis(); } while(nowTime <= lastTime); return nowTime; } public static void main(String[] args) { System.out.println(Long.MAX_VALUE); MagicSnowFlake msf = new MagicSnowFlake(1, 1); msf.nextId(); System.out.println(~ (-1L << 15)); } }
UDF 部分
import org.apache.hadoop.hive.ql.exec.MapredContext; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.udf.UDFType; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; /** * @author <a href="mailto:huoguo@2dfire.com">火锅</a> * @time 18/3/8 */ @UDFType(deterministic = false, stateful = true) public class LongIdUDF extends GenericUDF { private static final char SEPARATOR = '_'; private static final String ATTEMPT = "attempt"; private long mapTaskId = 0l; private int increment = 0; private MagicSnowFlake snowFlake; @Override public void configure(MapredContext context) { increment = context.getJobConf().getNumMapTasks(); if(increment == 0) { throw new IllegalArgumentException("mapred.map.tasks is zero"); } mapTaskId = getInitId(context.getJobConf().get("mapred.task.id"),increment); if(mapTaskId == 0l) { throw new IllegalArgumentException("mapred.task.id"); } } @Override public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException { return PrimitiveObjectInspectorFactory.javaLongObjectInspector; } @Override public Long evaluate(DeferredObject[] arguments) throws HiveException { if(snowFlake == null){ int dataCenterId = Integer.parseInt(arguments[0].get().toString()); snowFlake = new MagicSnowFlake(getMapTaskId(),dataCenterId); } return snowFlake.nextId(); } @Override public String getDisplayString(String[] children) { return "getLongId(0)"; } private synchronized long getMapTaskId() { return mapTaskId; } //attempt_1478926768563_0537_m_000004_0 // return 0+1 private long getInitId (String taskAttemptIDstr,int numTasks) throws IllegalArgumentException { try { String[] parts = taskAttemptIDstr.split(Character.toString(SEPARATOR)); if(parts.length == 6) { if(parts[0].equals(ATTEMPT)) { if(!parts[3].equals("m") && !parts[3].equals("r")) { throw new Exception(); } long result = Long.parseLong(parts[4]); if(result >= numTasks) { //if taskid >= numtasks throw new Exception("TaskAttemptId string : " + taskAttemptIDstr + " parse ID [" + result + "] >= numTasks[" + numTasks + "] .."); } return result + 1; } } } catch (Exception e) {} throw new IllegalArgumentException("TaskAttemptId string : " + taskAttemptIDstr + " is not properly formed"); } public static void main(String[] args) { String s = "attempt_1478926768563_0537_m_000004_4"; System.out.println(new LongIdUDF().getInitId(s,5)); } }
小结:
1.copy的代码自己改造的,忘记位置了,总的来说是出自twitter 嘛。
2.用了30亿的表进行测试,没重复
3.有问题 请及时提出,
相关推荐
在Hive中,UDF(User Defined Functions)是用户自定义函数,允许开发人员扩展Hive的内置功能,以满足特定的数据处理需求。Hive UDF的实现通常涉及到编写Java代码,并将其打包成JAR(Java Archive)文件,然后在Hive...
在Hive UDF的使用说明书中,包含了关于Hive的内置操作符、内置函数、聚合函数、表生成函数、以及自定义UDF的创建等内容。这些内容是学习和使用Hive UDF不可或缺的知识点。 内置操作符部分涵盖了数学函数和操作符、...
根据功能不同,Hive UDF主要分为三类:基本UDF、通用UDF (GenericUDF) 和表生成UDF (UDTF)。 #### 三、Hive UDF开发步骤 ##### 3.1 创建Java程序 首先需要编写一个Java类,继承自`org.apache.hadoop.hive.ql.exec....
"大数据 Java Hive UDF 函数示例代码(手机号码脱敏)" 大数据 Java Hive UDF 函数示例代码(手机号码脱敏)是指使用 Java 语言开发的用户定义函数(User Defined Function,UDF),该函数可以在 Hive 中使用,实现...
Hive表生成工具,Hive表生成工具Hive表生成工具
大数据的hive资源的详细代码设计以及分享,望博友相互交流
在大数据处理领域,Hive作为一个基于Hadoop的数据仓库工具,被广泛用于数据查询和分析。在某些场景下,我们可能需要对数据进行加密或者解密操作,以保护敏感信息或实现特定的数据处理需求。Base64是一种常见的编码...
在大数据处理领域,Hive是一个非常重要的组件,它提供了一个基于Hadoop的数据仓库工具,能够将结构化的数据文件映射为一张数据库表,并提供SQL查询功能。Hive UDF(User Defined Functions)则是Hive中扩展其功能的...
在大数据处理领域,Apache Hive 是一个非常重要的工具,它提供了SQL-like接口来处理存储在分布式存储系统(如HDFS)中的大规模数据集。Hive 的 User Defined Functions (UDFs) 是用户自定义函数,允许开发者扩展Hive...
Hive 自定义 UDF 编写函数 本文主要讲解了 Hive 中自定义 UDF 函数的编写方法,包括创建 UDF 类、实现自定义函数逻辑、编译和打包 UDF jar 包、上传至 Hive 服务器并注册自定义函数。 一、创建 UDF 类 为了实现...
NexR Hive UDF 关于 NexR Hive UDF是Hive用户定义功能的集合。 执照 快速开始 $ git clone https://github.com/nexr/hive-udf.git $ cd hive-udf $ mvn clean package or $ ant -Dhive.install.dir=../hive/build/...
Hive是Apache的一个数据仓库工具,它可以将结构化的数据文件映射为一张数据库表,并提供SQL查询功能,适合大规模数据集的批处理。而User Defined Functions (UDF)则是Hive中的一个重要特性,它允许用户自定义函数来...
在大数据处理领域,Hive作为一个基于Hadoop的数据仓库工具,被广泛用于结构化数据的查询、分析和存储。为了满足特定的业务需求,Hive提供了用户定义函数(UDF)的功能,允许用户自定义处理数据的逻辑。在这个“hive-...
Hive UDF UDF 聚合 UDF Finds MIN, MAX and SUM from array of Struct Objects based on a field. 排序 UDF Returns sorted array of Struct objects for an array of Struct Objects based on a field. 日期 ...
Hive UDF 项目 介绍 该项目只是一个示例,包含多个 (UDF),用于 Apache Spark。 它旨在演示如何在 Scala 或 Java 中构建 Hive UDF 并在 . 为什么要使用 Hive UDF? Hive UDF 的一个特别好的用途是与 Python 和 ...
在大数据处理领域,手机号码的清洗是一项至关重要的任务,尤其在使用Hive和Impala这样的大数据分析工具时。本文将详细讲解如何通过自定义函数(UDF)来处理各种格式的手机号码,确保数据的质量和一致性。我们将涵盖...
hiveUDF-1.0-SNAPSHOT.jar
hive是大数据处理的重要工具之一,其提供了强大的数据处理能力。然而,在实际应用中,我们可能需要根据业务需求编写自定义的UDF(User Defined Function),以满足特定的数据处理需求。下面,我们将详细介绍如何使用...
java6 string源码 ...将会在target目录下生成[A=jet-hive-udf-${version}-shaded.jar, B=jet-hive-udf-${version}.jar]文件.其中A是包括所有依赖包的jar, B是最小编译jar文件 你也可以直接在发布页下载打
Spark Hive UDF示例 建立项目 mvn clean package 将spark-hive-udf-1.0.0-SNAPSHOT.jar复制到边缘节点临时目录 spark-hive-udf]# cp target/spark-hive-udf-1.0.0-SNAPSHOT.jar /tmp 通过提供罐子来启动火花壳 spark...