0 现象描述:
ct_teach_coursewares_content从mysql抽取数据到hive仓库时,抽取时间过长,以前是全量抽取,现在计划修改成增量抽取,
其中,ct_teach_coursewares_content 和 ct_teach_coursewares 通过 coursewares_id 授课课件ID 关联, ct_teach_coursewares表中 update_time是增加索引了的
1 修改流程:
1 ct_teach_coursewares(老师授课表) 这边一直是全量抽取的,这边抽取速度也挺快,几分钟就结束,逻辑不动
2 在hive中将抽取过来的 ct_teach_coursewares_content(课件内容表) 按照 select coursewares_id from ct_teach_coursewares where update_time>='${com_present_date_Y_m_d}' and update_time<'${com_after_date_Y_m_d}' 得到增量的 id 写到本地磁盘
3.0 id_list=`cat ${MODE_PATH}/${ct_teach_coursewares_column}/*` 将本地磁盘的id列出来,其中 id格式需要拼接成 '11','22','33'这种方式--- 带单引号是因为coursewares_id是varchar类型
3.1 ct_teach_coursewares_content 在抽取的时候根据2的id去mysql中抽取,query="select coursewares_id,belong_type,content,school_id from ct_teach_coursewares_content where coursewares_id in (${id_list})"
2 一些主要代码:
2.1 步骤2中,ct_teach_coursewares表抽取id并拼接:
insert overwrite local directory '${MODE_PATH}/${hive_table_column}'
ROW FORMAT DELIMITED
fields terminated by '\001'
STORED AS TEXTFILE
select
concat_ws(',',collect_set(concat('\'',coursewares_id,'\'')))
from stg_ct_teach_coursewares where day='${com_present_date_Y_m_d}' and update_time >= '${com_present_date_Y_m_d}' AND update_time < '${com_after_date_Y_m_d}'
-----> 因为查询只查询一个 coursewares_id 因此在使用 collect_set 列传行的时候 没有使用 group by
collect_set里参数是字符串,因此 coursewares_id使用了单引号来拼接, 最后用concat_ws将多个字符串中间用,间隔 最终构建成 '11','22','33'的效果
2.2 总调用sh写法:
#通过起始时间和天数获取需要处理的数据的具体时间
comAddDay $com_start_date $c
hive_table=stg_ct_teach_coursewares_content
funDropTmpTable
#0 获取待统计的授课课件ID
ct_teach_coursewares_column="coursewares_id"
exportIdFromHiveTable "${ct_teach_coursewares_column}"
echo "==============================================="
#查询文件
echo "id所在的,目录: ${MODE_PATH}/${ct_teach_coursewares_column}/ "
id_list=`cat ${MODE_PATH}/${ct_teach_coursewares_column}/*`
#echo "${id_list}"
if [ "${id_list}" == "" ];then
echo "没有待抽取的评价ID数据,退出。"
exit 0
else
echo "${id_list}"
### 1 抽取数据
query="select coursewares_id,belong_type,content,school_id from ct_teach_coursewares_content where coursewares_id in (${id_list})"
query_count="SELECT COUNT(1) FROM ct_teach_coursewares_content where coursewares_id in (${id_list})"
tableIsSharding="true"
comGetSqoopHdfs
funDirToTmpTable
tabledir="$output_dir"
echo $tabledir
stgCheck $tabledir
funTmpToStg
funDropTmpTable
fi
相关推荐
增量备份时,脚本主要根据表的自增主键来查询新增数据,然后把新增的数据按照hive表的格式导出成文本,最后调用hive命令直接把文本导入hive内部。支持压缩表+多分区+字段增减。环境脚本内部会调用hive命令,所以必须...
先说说思路:1是直接从mysql输出至hive中,另外一个是先将数据搞到hdfs中,再load一下,当然这只是textfile存储格式而言,若是ORC等其他列式存储的话建议先搞一个textfile格式的中间表,再insert into table select ...
- **增量导入**:当数据表包含时间列时,可通过 `where '$do_date' = 'date_format(create_time, '%Y-%m-%d')'` 实现基于特定日期的数据增量导入。 - **更新及变更数据处理**:类似于增量导入,但可能需要根据实际...
代码演示,如何编写基本的Airflow以实现从Mysql到Hive的增量导入。 #问题陈述:-MySQL具有名为'employee_profile'的表,该表具有雇员信息,包括名字,姓氏和SSN。 脚本应检查表中是否有新记录和修改过的记录,并...
31.Sqoop中的增量导入与Sqoop job 32.Sqoop将MySQL数据导入Hive表中 33.Sqoop的导出及脚本中使用的方式 34.案例分析-动态分区的实现 35.案例分析-源表的分区加载创建 36.案例分析-指标分析使用Sqoop导出 第4章:...
- **集成Sqoop**: 使用`--hive-import`选项,可以直接将导入的数据创建为Hive表,方便后续使用HQL进行数据分析。 5. **使用注意事项** - **元数据同步**: 当从数据库导入数据到Hadoop时,Sqoop并不自动处理数据库...
07_Sqoop中的增量导入与Sqoop job 08_Sqoop将MySQL数据导入Hive表中 09_Sqoop的导出及脚本中使用的方式 10_案例分析-动态分区的实现 11_案例分析-源表的分区加载创建 12_案例分析-指标分析使用Sqoop导出
标题“load_data_incr_sqoop.zip”提示我们这是一个与数据加载相关的压缩文件,特别是使用了Sqoop工具进行从SQL Server到Hive的增量导入。描述进一步指出,这个过程是逐步进行的,旨在有效地处理数据库的新数据。...
Flume可以将MySQL的数据写入HDFS,以便进行后续的批处理分析或者作为其他Hadoop服务(如Hive、Pig等)的数据源。 4. **Kafka**: Kafka是一个高吞吐量的分布式消息系统,通常用作实时数据管道,将数据从一个位置传输...
这个流程确保了从Hive到MySQL的数据一致性,同时也提供了增量同步的能力,即只同步自上次同步以来发生变化的数据。通过sqoop工具,可以高效地完成大量数据的导入导出工作,减少了不必要的网络传输和计算资源的消耗。...
- 数据导入:Sqoop 可以从常见的关系型数据库如 MySQL、Oracle、PostgreSQL 等中将结构化数据高效地导入到 HDFS 或 Hive 中,支持批量和增量导入。 - 数据导出:同样,Sqoop 也能将 HDFS 或 Hive 中的数据导出回 ...
- 支持增量导入/导出。 - **应用场景**: - 数据迁移。 - 数据同步。 #### 十一、案例分析 - **示例**: - Web日志分析。 - 用户行为分析。 - 推荐系统构建。 - **实践要点**: - 数据清洗和预处理。 - 算法...
- **增量导入**:支持增量方式导入数据,避免全量导入带来的开销。 - **并行处理**:通过多线程并行处理数据,提高导入导出速度。 - **数据类型转换**:自动处理不同数据类型之间的转换。 ##### 8. Apache Spark:...
7. 故障恢复和增量导入:学习如何处理数据迁移过程中的错误,以及如何进行增量数据导入。 在这个“Hive,Sqoop相关实验室”中,你将有机会实践这些概念,并通过Shell脚本自动化这些流程。这将帮助你更好地理解和...
5. 增量导入:通过 `--incremental` 和 `--check-column` 参数实现只导入新数据,以节省资源。 6. 错误处理和调试:利用 Sqoop 的日志功能跟踪错误,以便调试和优化。 总的来说,Sqoop 是大数据领域的重要工具,...
- **增量导入**:仅导入自上次导入以来更改的数据,节省资源。 - **导出更新和删除**:支持将 MapReduce 作业的结果作为更新或删除操作回写到数据库。 - **合并功能**:在导入过程中合并已存在的数据,避免覆盖...
- **DataWorks数据集成**:DataWorks提供了一个图形化的数据同步任务定义界面,用户可以通过向导或脚本模式配置数据来源和流向,进行数据字段映射、数据过滤和加载控制,支持初始化和增量同步。 2. **实时数据上云...
4. **编写作业脚本**:描述数据源和目标之间的数据转换规则,以及如何进行增量同步。 5. **执行任务**:启动DataX任务,执行数据同步。 6. **监控与调试**:通过日志查看任务执行情况,如果出现问题,可以依据错误...