`

hive-脚本增量导入数据

    博客分类:
  • hive
 
阅读更多

 

 

 

0 现象描述:
 ct_teach_coursewares_content从mysql抽取数据到hive仓库时,抽取时间过长,以前是全量抽取,现在计划修改成增量抽取,
其中,ct_teach_coursewares_content 和 ct_teach_coursewares 通过 coursewares_id 授课课件ID 关联, ct_teach_coursewares表中 update_time是增加索引了的

 

1 修改流程:
1 ct_teach_coursewares(老师授课表)   这边一直是全量抽取的,这边抽取速度也挺快,几分钟就结束,逻辑不动
2 在hive中将抽取过来的 ct_teach_coursewares_content(课件内容表) 按照 select coursewares_id from ct_teach_coursewares where update_time>='${com_present_date_Y_m_d}' and update_time<'${com_after_date_Y_m_d}' 得到增量的 id 写到本地磁盘
3.0 id_list=`cat ${MODE_PATH}/${ct_teach_coursewares_column}/*`  将本地磁盘的id列出来,其中 id格式需要拼接成  '11','22','33'这种方式--- 带单引号是因为coursewares_id是varchar类型
3.1 ct_teach_coursewares_content 在抽取的时候根据2的id去mysql中抽取,query="select coursewares_id,belong_type,content,school_id from ct_teach_coursewares_content where coursewares_id in (${id_list})"

 

2 一些主要代码:
2.1 步骤2中,ct_teach_coursewares表抽取id并拼接:
insert overwrite local directory '${MODE_PATH}/${hive_table_column}'
ROW FORMAT DELIMITED
fields terminated by '\001'
STORED AS TEXTFILE
select
concat_ws(',',collect_set(concat('\'',coursewares_id,'\'')))
from  stg_ct_teach_coursewares where day='${com_present_date_Y_m_d}' and  update_time >= '${com_present_date_Y_m_d}' AND update_time < '${com_after_date_Y_m_d}'

 

----->  因为查询只查询一个 coursewares_id 因此在使用 collect_set 列传行的时候 没有使用 group by

         collect_set里参数是字符串,因此 coursewares_id使用了单引号来拼接, 最后用concat_ws将多个字符串中间用,间隔 最终构建成  '11','22','33'的效果

 

 

2.2 总调用sh写法:
 #通过起始时间和天数获取需要处理的数据的具体时间
        comAddDay $com_start_date $c
        hive_table=stg_ct_teach_coursewares_content
        funDropTmpTable
  
  #0 获取待统计的授课课件ID
  ct_teach_coursewares_column="coursewares_id"
  exportIdFromHiveTable "${ct_teach_coursewares_column}"
  echo "==============================================="
  #查询文件
  echo "id所在的,目录: ${MODE_PATH}/${ct_teach_coursewares_column}/ "
  id_list=`cat ${MODE_PATH}/${ct_teach_coursewares_column}/*`
  #echo "${id_list}"
        if [ "${id_list}" == "" ];then
            echo "没有待抽取的评价ID数据,退出。"
            exit 0
        else
            echo "${id_list}"
  

  ### 1 抽取数据 
     query="select coursewares_id,belong_type,content,school_id from ct_teach_coursewares_content where coursewares_id in (${id_list})"
           query_count="SELECT COUNT(1) FROM ct_teach_coursewares_content where coursewares_id in (${id_list})"
     tableIsSharding="true"
           comGetSqoopHdfs
          
           funDirToTmpTable
     tabledir="$output_dir"
           echo $tabledir
           stgCheck $tabledir
           funTmpToStg
           funDropTmpTable
  fi 

 

 

 

 

 

 

分享到:
评论

相关推荐

    Increment_Backup_To_Hive:一个增量备份关系数据库(MySQL, PostgreSQL, SQL Server, SQLite, Oracle等)到hive的php脚本工具

    增量备份时,脚本主要根据表的自增主键来查询新增数据,然后把新增的数据按照hive表的格式导出成文本,最后调用hive命令直接把文本导入hive内部。支持压缩表+多分区+字段增减。环境脚本内部会调用hive命令,所以必须...

    使用kettle将mysql中的数据导入到hive中

    先说说思路:1是直接从mysql输出至hive中,另外一个是先将数据搞到hdfs中,再load一下,当然这只是textfile存储格式而言,若是ORC等其他列式存储的话建议先搞一个textfile格式的中间表,再insert into table select ...

    sqoop1.x 导入数据

    - **增量导入**:当数据表包含时间列时,可通过 `where '$do_date' = 'date_format(create_time, '%Y-%m-%d')'` 实现基于特定日期的数据增量导入。 - **更新及变更数据处理**:类似于增量导入,但可能需要根据实际...

    airflow_demo:使用Sqoop从Mysql向Hive导入增量数据的Airflow脚本

    代码演示,如何编写基本的Airflow以实现从Mysql到Hive的增量导入。 #问题陈述:-MySQL具有名为'employee_profile'的表,该表具有雇员信息,包括名字,姓氏和SSN。 脚本应检查表中是否有新记录和修改过的记录,并...

    【63课时完整版】大数据实践HIVE详解及实战

    31.Sqoop中的增量导入与Sqoop job 32.Sqoop将MySQL数据导入Hive表中 33.Sqoop的导出及脚本中使用的方式 34.案例分析-动态分区的实现 35.案例分析-源表的分区加载创建 36.案例分析-指标分析使用Sqoop导出 第4章:...

    sqoop-jar.zip

    - **集成Sqoop**: 使用`--hive-import`选项,可以直接将导入的数据创建为Hive表,方便后续使用HQL进行数据分析。 5. **使用注意事项** - **元数据同步**: 当从数据库导入数据到Hadoop时,Sqoop并不自动处理数据库...

    实战Sqoop数据导入及大数据用户行为案例分析

    07_Sqoop中的增量导入与Sqoop job 08_Sqoop将MySQL数据导入Hive表中 09_Sqoop的导出及脚本中使用的方式 10_案例分析-动态分区的实现 11_案例分析-源表的分区加载创建 12_案例分析-指标分析使用Sqoop导出

    load_data_incr_sqoop.zip

    标题“load_data_incr_sqoop.zip”提示我们这是一个与数据加载相关的压缩文件,特别是使用了Sqoop工具进行从SQL Server到Hive的增量导入。描述进一步指出,这个过程是逐步进行的,旨在有效地处理数据库的新数据。...

    利用Flume将MySQL表数据准实时抽取到HDFS、MySQL、Kafka

    Flume可以将MySQL的数据写入HDFS,以便进行后续的批处理分析或者作为其他Hadoop服务(如Hive、Pig等)的数据源。 4. **Kafka**: Kafka是一个高吞吐量的分布式消息系统,通常用作实时数据管道,将数据从一个位置传输...

    EasyDataInsight项目Hive 数据同步到Mysql的实现方法1

    这个流程确保了从Hive到MySQL的数据一致性,同时也提供了增量同步的能力,即只同步自上次同步以来发生变化的数据。通过sqoop工具,可以高效地完成大量数据的导入导出工作,减少了不必要的网络传输和计算资源的消耗。...

    Apache Hadoop---Sqoop.docx

    - 数据导入:Sqoop 可以从常见的关系型数据库如 MySQL、Oracle、PostgreSQL 等中将结构化数据高效地导入到 HDFS 或 Hive 中,支持批量和增量导入。 - 数据导出:同样,Sqoop 也能将 HDFS 或 Hive 中的数据导出回 ...

    hadoop权威指南

    - 支持增量导入/导出。 - **应用场景**: - 数据迁移。 - 数据同步。 #### 十一、案例分析 - **示例**: - Web日志分析。 - 用户行为分析。 - 推荐系统构建。 - **实践要点**: - 数据清洗和预处理。 - 算法...

    Hadoop Real-World Solutions Cookbook - Second Edition

    - **增量导入**:支持增量方式导入数据,避免全量导入带来的开销。 - **并行处理**:通过多线程并行处理数据,提高导入导出速度。 - **数据类型转换**:自动处理不同数据类型之间的转换。 ##### 8. Apache Spark:...

    hive_labs:Hive,Sqoop相关实验室

    7. 故障恢复和增量导入:学习如何处理数据迁移过程中的错误,以及如何进行增量数据导入。 在这个“Hive,Sqoop相关实验室”中,你将有机会实践这些概念,并通过Shell脚本自动化这些流程。这将帮助你更好地理解和...

    sqoop-1.4.5.bin__hadoop-2.6.0.zip

    5. 增量导入:通过 `--incremental` 和 `--check-column` 参数实现只导入新数据,以节省资源。 6. 错误处理和调试:利用 Sqoop 的日志功能跟踪错误,以便调试和优化。 总的来说,Sqoop 是大数据领域的重要工具,...

    5.sqoop安装与配置.pdf

    - **增量导入**:仅导入自上次导入以来更改的数据,节省资源。 - **导出更新和删除**:支持将 MapReduce 作业的结果作为更新或删除操作回写到数据库。 - **合并功能**:在导入过程中合并已存在的数据,避免覆盖...

    MaxCompute数据开发实战—数据进入MaxCompute的N种方式.pdf

    - **DataWorks数据集成**:DataWorks提供了一个图形化的数据同步任务定义界面,用户可以通过向导或脚本模式配置数据来源和流向,进行数据字段映射、数据过滤和加载控制,支持初始化和增量同步。 2. **实时数据上云...

    datax插件verticawriter

    4. **编写作业脚本**:描述数据源和目标之间的数据转换规则,以及如何进行增量同步。 5. **执行任务**:启动DataX任务,执行数据同步。 6. **监控与调试**:通过日志查看任务执行情况,如果出现问题,可以依据错误...

Global site tag (gtag.js) - Google Analytics