`
greemranqq
  • 浏览: 977102 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

hive udf 唯一bigInt 生成器

阅读更多

一、背景

        mysql数据由于自增的bigint 主键,会插入更快,因为能持续往文件末尾插入嘛,因此需要这个东西。

        然后呢,服务端有专门生产id的接口,但是数据中心批量插入,肯定会拉暴他们,不让我们一起玩,只能自己玩。

 

二、方案

        1.redis 获取数据段,程序内部自增。

        问题:要用外部redis麻烦,而且要持久化

 

        2.python 服务注册的,这个自己网上搜索。

        问题:要机器,要服务端和客户端,而且python 我们版本不够高。。。尴尬,不想随便升级整个集群

 

 

        3.twitter 的 snowflake 算法

         参考: https://www.jianshu.com/p/54a87a7c3622

 

         参数:workId  datacenterId 唯一

  

         问题:

          1.两个ID 我都没法唯一。MAP 阶段可能是在同一个机器上,同时执行,参数不好搞

          

          解决方案:

          1.我们场景是只需要同一个任务的主键不重复就行。因此workId 我选取map 的ID,毕竟同一个任务,每个MAP的ID 肯定不同。 datacenterId   可以先默认0.

 

三.上代码:

      

public class MagicSnowFlake {

    //其实时间戳   2017-01-01 00:00:00
    private final static long twepoch = 1483200000000l;

 // 改到16 65535,认为MAP的最大数量限制

    private final static long ipIdMax = 65535;

    // 默认1位,我们小,没那么多数据中心,意思一下
    private final static long dataCenterIdBits = 1L;

  // 9+1 (10 )

    private final static long mapIdBits = 9L;

    private final static long dataCenterIdMax = ~ (-1L << dataCenterIdBits);

    //序列在id中占的位数 12bit
    private final static long seqBits = 12L;

    //序列最大值 4095 即2的12次方减一。
    private final static long seqMax = ~(-1L << seqBits);

    // 64位的数字:首位0  随后41位表示时间戳 MAP_ID 最后12位序列号
    private final static long dataCenterIdLeftShift = seqBits;
    private final static long mapIdLeftShift = seqBits + dataCenterIdBits;
    private final static long timeLeftShift = seqBits  + dataCenterIdBits + mapIdLeftShift;

    //IP标识(0~255)
    private long ipId;

    // 数据中心ID(0~3)
    private long dataCenterId;

    // 毫秒内序列(0~4095)
    private long seq = 0L;

    // 上次生成ID的时间截
    private long lastTime = -1L;

    public MagicSnowFlake(long ipId, long dataCenterId) {
        if(ipId < 0 || ipId > ipIdMax) {
            System.out.println(" ---------- ipId不在正常范围内(0~"+ipIdMax +") " + ipId);
            System.exit(0);
        }

        if(dataCenterId < 0 || dataCenterId > dataCenterIdMax) {
            System.out.println(" ---------- dataCenterId不在正常范围内(0~"+dataCenterIdMax +") " + dataCenterId);
            System.exit(0);
        }

        this.ipId = ipId;
        this.dataCenterId = dataCenterId;
    }

    public synchronized long nextId() {
        long nowTime = System.currentTimeMillis();

        if(nowTime < lastTime) {
            System.out.println(" ---------- 当前时间前于上次操作时间,当前时间有误: " + nowTime);
            System.exit(0);
        }

        if(nowTime == lastTime) {
            seq = (seq + 1) & seqMax;
            if(seq == 0) {
                nowTime = getNextTimeStamp();
            }
        } else {
            seq = 0L;
        }

        lastTime = nowTime;


        return ((nowTime - twepoch) << timeLeftShift)
                | (ipId << mapIdLeftShift)
                | (dataCenterId << dataCenterIdLeftShift)
                | seq;
    }

    private long getNextTimeStamp() {
        long nowTime;
        do {
            nowTime = System.currentTimeMillis();
        } while(nowTime <= lastTime);
        return nowTime;
    }

    public static void main(String[] args) {
        System.out.println(Long.MAX_VALUE);
        MagicSnowFlake msf = new MagicSnowFlake(1, 1);
        msf.nextId();
        System.out.println(~ (-1L << 15));
    }
}

 

       

 

    UDF 部分

   

import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

/**
 * @author <a href="mailto:huoguo@2dfire.com">火锅</a>
 * @time 18/3/8
 */
@UDFType(deterministic = false, stateful = true)
public class LongIdUDF extends GenericUDF {
    private static final char SEPARATOR = '_';
    private static final String ATTEMPT = "attempt";
    private long mapTaskId = 0l;
    private int increment = 0;

    private MagicSnowFlake snowFlake;



    @Override
    public void configure(MapredContext context) {
        increment = context.getJobConf().getNumMapTasks();
        if(increment == 0) {
            throw new IllegalArgumentException("mapred.map.tasks is zero");
        }

        mapTaskId = getInitId(context.getJobConf().get("mapred.task.id"),increment);
        if(mapTaskId == 0l) {
            throw new IllegalArgumentException("mapred.task.id");
        }
    }

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments)
            throws UDFArgumentException {
        return PrimitiveObjectInspectorFactory.javaLongObjectInspector;
    }

    @Override
    public Long evaluate(DeferredObject[] arguments) throws HiveException {
        if(snowFlake == null){
            int dataCenterId = Integer.parseInt(arguments[0].get().toString());
            snowFlake = new MagicSnowFlake(getMapTaskId(),dataCenterId);
        }
        return snowFlake.nextId();
    }

    @Override
    public String getDisplayString(String[] children) {
        return "getLongId(0)";
    }


    private synchronized long getMapTaskId() {
        return mapTaskId;
    }

    //attempt_1478926768563_0537_m_000004_0 // return 0+1
    private long getInitId (String taskAttemptIDstr,int numTasks)
            throws IllegalArgumentException {
        try {
            String[] parts = taskAttemptIDstr.split(Character.toString(SEPARATOR));
            if(parts.length == 6) {
                if(parts[0].equals(ATTEMPT)) {
                    if(!parts[3].equals("m") && !parts[3].equals("r")) {
                        throw new Exception();
                    }
                    long result = Long.parseLong(parts[4]);
                    if(result >= numTasks) { //if taskid >= numtasks
                        throw new Exception("TaskAttemptId string : " + taskAttemptIDstr  + "  parse ID [" + result + "] >= numTasks[" + numTasks + "] ..");
                    }
                    return result + 1;
                }
            }
        } catch (Exception e) {}
        throw new IllegalArgumentException("TaskAttemptId string : " + taskAttemptIDstr + " is  not properly formed");
    }


    public static void main(String[] args) {
        String s = "attempt_1478926768563_0537_m_000004_4";
        System.out.println(new LongIdUDF().getInitId(s,5));
    }

}

 

 

   小结:

          1.copy的代码自己改造的,忘记位置了,总的来说是出自twitter 嘛。

          2.用了30亿的表进行测试,没重复

          3.有问题 请及时提出,

 

0
0
分享到:
评论

相关推荐

    hive UDF需要jar包

    在Hive中,UDF(User Defined Functions)是用户自定义函数,允许开发人员扩展Hive的内置功能,以满足特定的数据处理需求。Hive UDF的实现通常涉及到编写Java代码,并将其打包成JAR(Java Archive)文件,然后在Hive...

    HIve UDF 说明书

    在Hive UDF的使用说明书中,包含了关于Hive的内置操作符、内置函数、聚合函数、表生成函数、以及自定义UDF的创建等内容。这些内容是学习和使用Hive UDF不可或缺的知识点。 内置操作符部分涵盖了数学函数和操作符、...

    Hive UDF开发

    根据功能不同,Hive UDF主要分为三类:基本UDF、通用UDF (GenericUDF) 和表生成UDF (UDTF)。 #### 三、Hive UDF开发步骤 ##### 3.1 创建Java程序 首先需要编写一个Java类,继承自`org.apache.hadoop.hive.ql.exec....

    大数据 java hive udf函数的示例代码(手机号码脱敏)

    "大数据 Java Hive UDF 函数示例代码(手机号码脱敏)" 大数据 Java Hive UDF 函数示例代码(手机号码脱敏)是指使用 Java 语言开发的用户定义函数(User Defined Function,UDF),该函数可以在 Hive 中使用,实现...

    Hive表生成工具,Hive表生成工具Hive表生成工具

    Hive表生成工具,Hive表生成工具Hive表生成工具

    hive的udf功能

    大数据的hive资源的详细代码设计以及分享,望博友相互交流

    base64加密解密的hive udf函数

    在大数据处理领域,Hive作为一个基于Hadoop的数据仓库工具,被广泛用于数据查询和分析。在某些场景下,我们可能需要对数据进行加密或者解密操作,以保护敏感信息或实现特定的数据处理需求。Base64是一种常见的编码...

    dataiku hive udf

    在大数据处理领域,Hive是一个非常重要的组件,它提供了一个基于Hadoop的数据仓库工具,能够将结构化的数据文件映射为一张数据库表,并提供SQL查询功能。Hive UDF(User Defined Functions)则是Hive中扩展其功能的...

    Hive的Udf函数进行数据脱敏

    在大数据处理领域,Apache Hive 是一个非常重要的工具,它提供了SQL-like接口来处理存储在分布式存储系统(如HDFS)中的大规模数据集。Hive 的 User Defined Functions (UDFs) 是用户自定义函数,允许开发者扩展Hive...

    hive自定义UDF编写函数.docx

    Hive 自定义 UDF 编写函数 本文主要讲解了 Hive 中自定义 UDF 函数的编写方法,包括创建 UDF 类、实现自定义函数逻辑、编译和打包 UDF jar 包、上传至 Hive 服务器并注册自定义函数。 一、创建 UDF 类 为了实现...

    hive-udf:NexR Hive UDF

    NexR Hive UDF 关于 NexR Hive UDF是Hive用户定义功能的集合。 执照 快速开始 $ git clone https://github.com/nexr/hive-udf.git $ cd hive-udf $ mvn clean package or $ ant -Dhive.install.dir=../hive/build/...

    Java_facebook Hive udf.zip

    Hive是Apache的一个数据仓库工具,它可以将结构化的数据文件映射为一张数据库表,并提供SQL查询功能,适合大规模数据集的批处理。而User Defined Functions (UDF)则是Hive中的一个重要特性,它允许用户自定义函数来...

    hive-udf(两地址间距离计算+省市区位置解析(Java代码))

    在大数据处理领域,Hive作为一个基于Hadoop的数据仓库工具,被广泛用于结构化数据的查询、分析和存储。为了满足特定的业务需求,Hive提供了用户定义函数(UDF)的功能,允许用户自定义处理数据的逻辑。在这个“hive-...

    HiveUDF:Apache Hive UDF(用户定义函数)

    Hive UDF UDF 聚合 UDF Finds MIN, MAX and SUM from array of Struct Objects based on a field. 排序 UDF Returns sorted array of Struct objects for an array of Struct Objects based on a field. 日期 ...

    javasql笔试题-spark-hive-udf:展示如何在ApacheSpark中使用HiveUDF的示例项目

    Hive UDF 项目 介绍 该项目只是一个示例,包含多个 (UDF),用于 Apache Spark。 它旨在演示如何在 Scala 或 Java 中构建 Hive UDF 并在 . 为什么要使用 Hive UDF? Hive UDF 的一个特别好的用途是与 Python 和 ...

    各种情况手机号清洗udf函数(hive impala)

    在大数据处理领域,手机号码的清洗是一项至关重要的任务,尤其在使用Hive和Impala这样的大数据分析工具时。本文将详细讲解如何通过自定义函数(UDF)来处理各种格式的手机号码,确保数据的质量和一致性。我们将涵盖...

    hiveUDF-1.0-SNAPSHOT.jar

    hiveUDF-1.0-SNAPSHOT.jar

    hive的UDF的编写.docx

    hive是大数据处理的重要工具之一,其提供了强大的数据处理能力。然而,在实际应用中,我们可能需要根据业务需求编写自定义的UDF(User Defined Function),以满足特定的数据处理需求。下面,我们将详细介绍如何使用...

    java6string源码-jet-hive-udf:有用的hiveudf函数,包含日期计算,ip,useragent解析函数,加密解密等

    java6 string源码 ...将会在target目录下生成[A=jet-hive-udf-${version}-shaded.jar, B=jet-hive-udf-${version}.jar]文件.其中A是包括所有依赖包的jar, B是最小编译jar文件 你也可以直接在发布页下载打

    spark-hive-udf:Spark Hive UDF示例

    Spark Hive UDF示例 建立项目 mvn clean package 将spark-hive-udf-1.0.0-SNAPSHOT.jar复制到边缘节点临时目录 spark-hive-udf]# cp target/spark-hive-udf-1.0.0-SNAPSHOT.jar /tmp 通过提供罐子来启动火花壳 spark...

Global site tag (gtag.js) - Google Analytics