sqoop目前用的比较多的是数据仓库的建立,在很多数情况下,同步一个表,需要些sqoop脚本,来同步表,有的时候执行成功与否,数据量多少,数据源表是否更新,都不清楚,而且脚本文件不便于管理。
本次介绍的数据平台具有专业的管理功能。
下面是表的设计
DROP TABLE IF EXISTS task_info; CREATE TABLE task_info(task_id INT,database_id INT,db_table VARCHAR(100), hive_table VARCHAR(100), db_columns VARCHAR(5000), where_express VARCHAR(200) , is_hive_import VARCHAR(2), hive_partition_key VARCHAR(10), mappers INT, split_by VARCHAR(10) , is_use VARCHAR(2), priority INT, cron VARCHAR(100), alter_date longtext, meta_data longtext, PRIMARY KEY (task_id)); DROP TABLE IF EXISTS db_info; CREATE TABLE db_info(id INT, uname VARCHAR(100) , password VARCHAR(500), url VARCHAR(200),PRIMARY KEY (id)); DROP TABLE IF EXISTS task_log; CREATE TABLE task_log(name VARCHAR(100) , begin_time VARCHAR(100), end_time VARCHAR(100), result VARCHAR(100), data_num VARCHAR(50), log longtext);
将sqoop需要的相关参数可配置化,包括数据源表也实行配置化。当然如果要改变就执行执行一个sql,让调度程序注意到就行了。这是一个定时执行的。
接下来是核心的具体代码
public class SecureEncrypter implements Encrypter{ private int seed = 0; public SecureEncrypter(int seed){ this.seed=seed; } @Override public String encode(String input) { StringBuilder builder =new StringBuilder(); for(int i=0;i<input.length();i++){ int k =(int)input.charAt(i); //int code = new Integer(k^flag); builder.append("u" + new Integer(k^seed).toString()); } return builder.substring(1); } @Override public String decode(String input) { String[] arr=input.split("u"); StringBuilder builder= new StringBuilder(); for(String str:arr){ int t=Integer.valueOf(str); t = t ^ seed; builder.append((char)t); } return builder.toString(); } }
上面是密码的加密解密,有些密码不能轻易透漏给他人必须加密处理,然后解密作为sqoop的密码参数
public class HDFSExtract implements Extract { /** * @param args * 该类负责管理HDFS的数据抽取工作 */ public static Logger log = Logger.getLogger(HDFSExtract.class); private Connection con = null; private Statement st = null; private ResultSet rs = null; private int task_id; private String db_user = null; private String db_password = null; private String db_table = null; private String db_columns = null; private String url = null; private String where = null; private String is_hive_import = null; private String hive_table = null; private String hive_partition_key = null; private String split_by = null; private String count_date = null; private String mappers= null; public void extract(TaskInfo tInfo) { // 1 读取配置表内容 yhd_extract_to_hdfs //count_date 默认为yesterday count_date = ManaUtils.getData(-1); try { if(tInfo.getAlter_date() != null && !"".equals(tInfo.getAlter_date())) { count_date = tInfo.getAlter_date(); } String sql = "select d.uname,d.password,d.url from db_info d where d.id = " ; sql += tInfo.getDatabase_id(); log.info("sql: "+sql); con = DBConnection.getInstance(); st = con.createStatement(); rs = st.executeQuery(sql); Runtime run = null; if(rs.next()) { ETL_vo vo = new ETL_vo(); setFromResultSet(rs, tInfo); vo.setTableName(hive_table); run = Runtime.getRuntime(); try { run.exec("hadoop fs -rmr "+db_table.trim()); run.exec("hadoop fs -rmr "+db_table.trim().toUpperCase()); run.exec("hadoop fs -rmr "+db_table.trim().toLowerCase()); log.info("Begin excuete task "+task_id+" "+hive_table+" ......"+ManaUtils.sdf.format(new Date())) ; vo.setBeginTime(ManaUtils.sdf.format(new Date())); vo.setResult("SUCCESS"); System.out.println(toString(createOptions())); Process p = run.exec(createOptions()); ManaUtils.exeOption(p, tInfo, st, null); } catch (Exception e) { e.printStackTrace(); } } log.info("Finish Extracting ! "+ManaUtils.sdf.format(new Date())) ; } catch (Exception e) { log.info("Error: extract fail !"); e.printStackTrace(); return ; } finally{ // try { // rs.close(); // st.close(); // con.close(); // } catch (SQLException e) { // e.printStackTrace(); // } } } public void setFromResultSet(ResultSet rs, TaskInfo tInfo) throws SQLException { task_id = tInfo.getTask_id(); db_user = rs.getString("uname").trim(); db_password = rs.getString("password").trim(); url = rs.getString("url"); db_table = tInfo.getDb_table(); db_columns = tInfo.getDb_columns(); where = tInfo.getWhere_express(); is_hive_import = tInfo.getIs_hive_import(); hive_table = tInfo.getHive_table(); hive_partition_key = tInfo.getHive_partition_key(); mappers = tInfo.getMappers().toString(); split_by = tInfo.getSplit_by(); } public String toString(String[] args) { StringBuffer stringBuffer = new StringBuffer(); // System.out.println("---shell---"); for(String a : args) { // System.out.println(a); stringBuffer.append(a); stringBuffer.append(" "); } // System.out.println("---shell---"); return stringBuffer.toString(); } public String[] createOptions() throws Exception { List<String> optSb = new ArrayList<String>(); optSb.add("sqoop"); optSb.add("import"); optSb.add("-D"); optSb.add("mapred.job.queue.name=pms"); optSb.add("--connect"); optSb.add(url); optSb.add("--username"); optSb.add(db_user); optSb.add("--password"); optSb.add(EncrypterFactory.getEncryperInstance(1).decode(db_password)); if (mappers != null && ! "".equals(mappers)) { optSb.add("-m"); optSb.add(mappers); } if (split_by != null && !"".equals(split_by)) { optSb.add("--split-by"); optSb.add(split_by.toUpperCase()); } optSb.add("--null-string"); optSb.add("''"); optSb.add("--table"); optSb.add(db_table); //以下是数据表非必填项,需要判断 if(db_columns != null && ! "".equals(db_columns)){ optSb.add("--columns"); optSb.add("\""+db_columns.toUpperCase()+"\""); } if (where != null && ! "".equals(where)) { optSb.add("--where"); where = where.replaceAll("\\$TXDATE", "'"+count_date+"'") ; optSb.add("\""+where.trim()+"\""); } if (is_hive_import != null && ! "".equals(is_hive_import) && "Y".equalsIgnoreCase(is_hive_import)) { optSb.add("--hive-overwrite"); // 统一为覆盖模式,要求hive 表必须存在 optSb.add("--hive-import"); optSb.add("--hive-drop-import-delims"); if (hive_table == null || "".equals(hive_table)) { log.info("Error: hive_table must be set 当--hive-import时 !"); }else { optSb.add("--hive-table"); optSb.add(hive_table.trim()); } if (hive_partition_key != null && !"".equals(hive_partition_key)) { optSb.add("--hive-partition-key"); optSb.add(hive_partition_key.trim()); optSb.add("--hive-partition-value"); optSb.add("\""+count_date.trim()+"\""); } } optSb.add("--null-string"); optSb.add("'\\\\N'"); optSb.add("--null-non-string"); optSb.add("'\\\\N'"); return optSb.toArray(new String[0]); } }
在很多情况下,需要每天增量的同步,条件里需要加上日期相关
相关推荐
《Hadoop系统搭建及项目实践》课件10Hadoop 与RDMS数据迁移工具Sqoop.pdf《Hadoop系统搭建及项目实践》课件10Hadoop 与RDMS数据迁移工具Sqoop.pdf《Hadoop系统搭建及项目实践》课件10Hadoop 与RDMS数据迁移工具Sqoop...
在大数据处理领域,Sqoop和Hive是两个重要的工具,分别用于数据迁移和数据仓库管理。当使用Sqoop将数据导入Hive时,有时可能会遇到数据不一致的问题,这可能是由于多种原因引起的。本文将深入探讨这个问题,并提供...
通过对上述知识点的理解和实践,你将能够有效地利用 Sqoop 工具进行 MySQL 数据库到 Hadoop 集群的数据迁移,同时通过观察并发、数据量等因素对性能的影响,可以优化 Sqoop 的配置,提升数据处理的效率。
【大数据实践-Sqoop数据导入导出】 Sqoop是一个用于在关系型数据库和Hadoop之间进行数据迁移的工具。在大数据处理中,Sqoop扮演着关键角色,它使得传统数据库中的数据可以方便地导入到Hadoop的HDFS(Hadoop ...
本文档旨在帮助读者理解如何使用sqoop2的Java API将数据从Oracle数据库迁移至HDFS(Hadoop Distributed File System),同时分享了作者在实践中遇到的一些问题及解决方案。为确保能够顺利地运行示例代码,建议先按照...
大数据领域的数据迁移是至关重要的一个环节,它涉及到数据的整合、分析和业务连续性。本资料汇总主要聚焦于三大常用的数据迁移工具:...通过深入学习和实践,你可以更好地驾驭这些工具,提升数据迁移的效率和质量。
### Sqoop在Hadoop平台下的应用与实践 #### 一、Sqoop简介 Sqoop是一款开源工具,主要用于在Hadoop和关系型数据库之间高效地传输数据。它支持多种关系型数据库,如MySQL、Oracle等,并且能够利用MapReduce作业来...
阅读建议:读者应该先熟悉Hadoop、HBase及Sqoop的基本概念,然后根据自身项目需求选取文中对应章节深入学习和尝试实践,特别注意操作前后进行必要的验证测试,以确保数据的准确无误。此外,由于技术不断更新换代,...
Sqoop 是 Apache 开源项目中一个用于在关系数据库与 Hadoop 之间进行数据导入导出的工具。在大数据处理场景中,Sqoop 提供了一种高效、方便的方式将结构化数据从传统数据库如 MySQL、Oracle 等迁移到 Hadoop 的 HDFS...
使用场景及目标:旨在帮助技术人员熟练掌握利用 Sqoop 实现大规模数据在异构存储系统间迁移的最佳实践,从而提高数据处理效率。通过具体实例的操作,使用户能够在生产环境中安全稳定地应用这些技能。 其他说明:文档...
Sqoop是Apache Hadoop生态中的一个工具,...此外,保持 Sqoop 和相关依赖库的更新也是避免类似问题的重要实践。在开发和管理大数据项目时,理解和处理这类问题的能力至关重要,因为它直接影响到数据的流动和分析效率。
1. **SQL-to-Hadoop**: Sqoop 的核心功能是实现 SQL 数据库与 Hadoop 平台之间的数据交换。 2. **桥梁角色**: Sqoop 作为一个中间件,连接了传统的 RDBMS 系统与 Hadoop 生态系统,使得数据在两者间可以便捷地迁移...
《Apache Sqoop Cookbook》这本书是针对Apache Sqoop这一开源数据导入导出工具的实用指南,旨在帮助用户深入了解和熟练运用Sqoop在Hadoop生态系统中的数据迁移功能。Sqoop作为一个高效、方便的工具,使得从传统的...
1. 数据导入:Sqoop 可以高效地将结构化的数据从传统的 RDBMS 导入到 Hadoop 的 HDFS 中,支持批处理和增量导入,使得大数据分析能够利用到更多的历史数据。 2. 数据导出:同样,Sqoop 也允许用户将 HDFS 或 HBase ...
从零开始讲解大数据业务及数据采集和迁移需求,以案例驱动的方式讲解基于Sqoop构建高性能的分布式数据迁移和同步平台。 课程亮点 1,知识体系完备,从小白到大神各阶段读者均能学有所获。 2,生动形象,化繁为简,...
Sqoop是Apache Hadoop生态中的一个工具,专门用于在关系型数据库(如MySQL、Oracle等)和Hadoop的HDFS之间高效地导入导出数据。Sqoop-1.4.5是该工具的一个版本,它与Hadoop 2.x系列兼容,特别是与Hadoop 2.6版本配合...
提供关于如何确保 Sqoop 数据传输安全性的建议和最佳实践。 #### 八、许可 Sqoop 采用 Apache Software License v2 许可协议发布。 以上内容概括了 Sqoop2-1.99.7 文档的主要部分,包括安装指南、工具、用户和...
5. **数据治理**:通过定义数据治理规则和策略,Apache Atlas可以帮助企业确保数据质量、一致性,并促进数据治理的最佳实践。 然后,我们来看看基于CDH 6.3.1的Apache Atlas 2.1.0与Sqoop Hook的结合: 1. **兼容...
快手数据中台建设:大数据服务化之路 快手离线数据全链路分级保障平台化建设 快手万亿级实时 OLAP 平台的建设与实践 ClickHouse在快手的大规模应用与架构改进 美团点评酒旅数据仓库建设实践 美团酒旅数据治理实践 ...
标题《AS深圳2017《airbnb数据平台实践》》和描述《AS深圳2017《airbnb数据平台实践》,airbnb数据平台解析》共同表明这是一篇关于Airbnb公司内部数据平台实施经验的分享。 从提供的内容片段中,我们可以梳理出以下...