`

数据平台之sqoop实践

 
阅读更多

sqoop目前用的比较多的是数据仓库的建立,在很多数情况下,同步一个表,需要些sqoop脚本,来同步表,有的时候执行成功与否,数据量多少,数据源表是否更新,都不清楚,而且脚本文件不便于管理。

本次介绍的数据平台具有专业的管理功能。

下面是表的设计

DROP TABLE IF EXISTS task_info; CREATE TABLE task_info(task_id INT,database_id INT,db_table VARCHAR(100), hive_table VARCHAR(100), db_columns VARCHAR(5000), where_express VARCHAR(200)
, is_hive_import VARCHAR(2), hive_partition_key VARCHAR(10), mappers INT, split_by VARCHAR(10) , is_use VARCHAR(2), priority INT, cron VARCHAR(100), alter_date longtext, meta_data longtext, PRIMARY KEY (task_id));

DROP TABLE IF EXISTS db_info; CREATE TABLE db_info(id INT, uname VARCHAR(100) , password VARCHAR(500), url VARCHAR(200),PRIMARY KEY (id));

DROP TABLE IF EXISTS task_log; CREATE TABLE task_log(name VARCHAR(100) , begin_time VARCHAR(100), end_time VARCHAR(100), result VARCHAR(100), data_num VARCHAR(50), log longtext);

 

将sqoop需要的相关参数可配置化,包括数据源表也实行配置化。当然如果要改变就执行执行一个sql,让调度程序注意到就行了。这是一个定时执行的。

接下来是核心的具体代码

public class SecureEncrypter implements Encrypter{
	
	private int seed = 0;
	
	public SecureEncrypter(int seed){
		this.seed=seed;	
	}	

	@Override
	public String encode(String input) {
		StringBuilder builder =new  StringBuilder();
		for(int i=0;i<input.length();i++){
			int k =(int)input.charAt(i);
			//int code = new Integer(k^flag);
			builder.append("u" + new Integer(k^seed).toString());
		}
		return builder.substring(1);
	}

	@Override
	public String decode(String input) {
		String[] arr=input.split("u");
		StringBuilder builder= new StringBuilder();
		for(String str:arr){
			int t=Integer.valueOf(str);
			t = t ^ seed;
			builder.append((char)t);
		}		
		return builder.toString();
	}

	
	
}

 

上面是密码的加密解密,有些密码不能轻易透漏给他人必须加密处理,然后解密作为sqoop的密码参数

 

public class HDFSExtract implements Extract {

	/**
	 * @param args
	 * 该类负责管理HDFS的数据抽取工作
	 */
	public static Logger log = Logger.getLogger(HDFSExtract.class);
	private Connection con = null;
	private Statement st = null;
	private ResultSet rs = null;
	
	private int	task_id;
	private String	db_user             = null;
	private String	db_password         = null;
        
	private String	db_table            = null;
	private String	db_columns          = null;
	private String	url                 = null;
	private String	where               = null;
        
	private String	is_hive_import      = null;
	private String	hive_table          = null;
	private String	hive_partition_key  = null;
	private String split_by             = null;
	private String count_date           = null;
	
	private String	mappers= null;
	
	public void extract(TaskInfo tInfo)
	{
		// 1 读取配置表内容 yhd_extract_to_hdfs
		//count_date 默认为yesterday
		count_date = ManaUtils.getData(-1);
		try {
			if(tInfo.getAlter_date() != null && !"".equals(tInfo.getAlter_date())) {
				count_date = tInfo.getAlter_date();
			}
			String sql = "select d.uname,d.password,d.url from db_info d where d.id = " ;
			sql += tInfo.getDatabase_id();
			log.info("sql: "+sql);
			con = DBConnection.getInstance();
			st = con.createStatement();
			rs = st.executeQuery(sql);
			Runtime run = null;
			if(rs.next())
			{
				ETL_vo vo = new ETL_vo();
				setFromResultSet(rs, tInfo);
				vo.setTableName(hive_table);
					run = Runtime.getRuntime();
					try {
						run.exec("hadoop fs -rmr "+db_table.trim());
						run.exec("hadoop fs -rmr "+db_table.trim().toUpperCase());
						run.exec("hadoop fs -rmr "+db_table.trim().toLowerCase());
						log.info("Begin excuete task "+task_id+" "+hive_table+" ......"+ManaUtils.sdf.format(new Date())) ;
						vo.setBeginTime(ManaUtils.sdf.format(new Date()));
						vo.setResult("SUCCESS");
						System.out.println(toString(createOptions()));
						Process p = run.exec(createOptions());
						ManaUtils.exeOption(p, tInfo, st, null);
					} catch (Exception e) {
						e.printStackTrace();
					}
			}
			log.info("Finish Extracting ! "+ManaUtils.sdf.format(new Date())) ;
			
		} catch (Exception e) {
			log.info("Error: extract fail !");
			e.printStackTrace();
			return ;
		}
		finally{
//			try {
//				rs.close();
//				st.close();
//				con.close();
//			} catch (SQLException e) {
//				e.printStackTrace();
//			}
			
		}
	}
	
	public void setFromResultSet(ResultSet rs, TaskInfo tInfo) throws SQLException
	{
		task_id             = tInfo.getTask_id();
		db_user             = rs.getString("uname").trim();
		db_password         = rs.getString("password").trim();
		url                 = rs.getString("url");
		                    
		db_table            = tInfo.getDb_table();
		db_columns          = tInfo.getDb_columns();
		where               = tInfo.getWhere_express();
		                    
		is_hive_import      = tInfo.getIs_hive_import();
		hive_table          = tInfo.getHive_table();
		hive_partition_key  = tInfo.getHive_partition_key();
		mappers              = tInfo.getMappers().toString(); 
		split_by             = tInfo.getSplit_by(); 
	}
	
	public String toString(String[] args) {
		StringBuffer stringBuffer = new StringBuffer();
//		System.out.println("---shell---");
		for(String a : args) {
//			System.out.println(a);
			stringBuffer.append(a);
			stringBuffer.append(" ");
		}
//		System.out.println("---shell---");
		return stringBuffer.toString();
	}
	
	public String[] createOptions() throws Exception
	{
		List<String> optSb = new ArrayList<String>();
		optSb.add("sqoop");
		optSb.add("import");
		optSb.add("-D");
		optSb.add("mapred.job.queue.name=pms");
		optSb.add("--connect");
		optSb.add(url);
		optSb.add("--username");
		optSb.add(db_user);
		optSb.add("--password");
		optSb.add(EncrypterFactory.getEncryperInstance(1).decode(db_password));
		if (mappers != null && ! "".equals(mappers)) 
		{  
			optSb.add("-m");
			optSb.add(mappers);
		}
		if (split_by != null && !"".equals(split_by)) {
			optSb.add("--split-by");
			optSb.add(split_by.toUpperCase());
		}
		
		optSb.add("--null-string");
		optSb.add("''");
		
		optSb.add("--table");
		optSb.add(db_table);
		//以下是数据表非必填项,需要判断
		if(db_columns != null && ! "".equals(db_columns)){
			optSb.add("--columns");
			optSb.add("\""+db_columns.toUpperCase()+"\"");
		}
		if (where != null && ! "".equals(where)) {
			optSb.add("--where");
			where = where.replaceAll("\\$TXDATE", "'"+count_date+"'") ;
			optSb.add("\""+where.trim()+"\"");
		}
		
		if (is_hive_import != null && ! "".equals(is_hive_import)
				&& "Y".equalsIgnoreCase(is_hive_import))
		{
			optSb.add("--hive-overwrite");  // 统一为覆盖模式,要求hive 表必须存在
			optSb.add("--hive-import");
			optSb.add("--hive-drop-import-delims");
			
			if (hive_table == null || "".equals(hive_table)) {
				log.info("Error: hive_table must be set 当--hive-import时 !");
			}else {
				optSb.add("--hive-table");
				optSb.add(hive_table.trim());
			}
			if (hive_partition_key != null && !"".equals(hive_partition_key)) {
				optSb.add("--hive-partition-key");
				optSb.add(hive_partition_key.trim());
				optSb.add("--hive-partition-value");
				optSb.add("\""+count_date.trim()+"\"");
			}
		}
		optSb.add("--null-string");
		optSb.add("'\\\\N'");
		optSb.add("--null-non-string");
		optSb.add("'\\\\N'");
		return optSb.toArray(new String[0]);
	}
	
}

 

在很多情况下,需要每天增量的同步,条件里需要加上日期相关

分享到:
评论

相关推荐

    《Hadoop系统搭建及项目实践》课件10Hadoop 与RDMS数据迁移工具Sqoop.pdf

    《Hadoop系统搭建及项目实践》课件10Hadoop 与RDMS数据迁移工具Sqoop.pdf《Hadoop系统搭建及项目实践》课件10Hadoop 与RDMS数据迁移工具Sqoop.pdf《Hadoop系统搭建及项目实践》课件10Hadoop 与RDMS数据迁移工具Sqoop...

    sqoop导入数据到hive中,数据不一致

    在大数据处理领域,Sqoop和Hive是两个重要的工具,分别用于数据迁移和数据仓库管理。当使用Sqoop将数据导入Hive时,有时可能会遇到数据不一致的问题,这可能是由于多种原因引起的。本文将深入探讨这个问题,并提供...

    使用sqoop抽取mysql数据

    通过对上述知识点的理解和实践,你将能够有效地利用 Sqoop 工具进行 MySQL 数据库到 Hadoop 集群的数据迁移,同时通过观察并发、数据量等因素对性能的影响,可以优化 Sqoop 的配置,提升数据处理的效率。

    大数据实践-sqoop数据导入导出.doc

    【大数据实践-Sqoop数据导入导出】 Sqoop是一个用于在关系型数据库和Hadoop之间进行数据迁移的工具。在大数据处理中,Sqoop扮演着关键角色,它使得传统数据库中的数据可以方便地导入到Hadoop的HDFS(Hadoop ...

    sqoop2 java API从oracle导数据到HDFS开发总结

    本文档旨在帮助读者理解如何使用sqoop2的Java API将数据从Oracle数据库迁移至HDFS(Hadoop Distributed File System),同时分享了作者在实践中遇到的一些问题及解决方案。为确保能够顺利地运行示例代码,建议先按照...

    大数据系列2020-数据迁移工具资料汇总(sqoop、kettle、datax).zip

    大数据领域的数据迁移是至关重要的一个环节,它涉及到数据的整合、分析和业务连续性。本资料汇总主要聚焦于三大常用的数据迁移工具:...通过深入学习和实践,你可以更好地驾驭这些工具,提升数据迁移的效率和质量。

    hadoop平台下的数据导入导出工具sqoop

    ### Sqoop在Hadoop平台下的应用与实践 #### 一、Sqoop简介 Sqoop是一款开源工具,主要用于在Hadoop和关系型数据库之间高效地传输数据。它支持多种关系型数据库,如MySQL、Oracle等,并且能够利用MapReduce作业来...

    Sqoop与HBase间数据高效迁移实践与注意事项

    阅读建议:读者应该先熟悉Hadoop、HBase及Sqoop的基本概念,然后根据自身项目需求选取文中对应章节深入学习和尝试实践,特别注意操作前后进行必要的验证测试,以确保数据的准确无误。此外,由于技术不断更新换代,...

    sqoop-1.4.4-cdh5.0.6.tar

    Sqoop 是 Apache 开源项目中一个用于在关系数据库与 Hadoop 之间进行数据导入导出的工具。在大数据处理场景中,Sqoop 提供了一种高效、方便的方式将结构化数据从传统数据库如 MySQL、Oracle 等迁移到 Hadoop 的 HDFS...

    大数据处理技术中Sqoop与HBase的数据交互详解

    使用场景及目标:旨在帮助技术人员熟练掌握利用 Sqoop 实现大规模数据在异构存储系统间迁移的最佳实践,从而提高数据处理效率。通过具体实例的操作,使用户能够在生产环境中安全稳定地应用这些技能。 其他说明:文档...

    sqoop-1.4.7.zip

    Sqoop是Apache Hadoop生态中的一个工具,...此外,保持 Sqoop 和相关依赖库的更新也是避免类似问题的重要实践。在开发和管理大数据项目时,理解和处理这类问题的能力至关重要,因为它直接影响到数据的流动和分析效率。

    Sqoop学习文档(1){Sqoop基本概念、Sqoop的安装配置}.docx

    1. **SQL-to-Hadoop**: Sqoop 的核心功能是实现 SQL 数据库与 Hadoop 平台之间的数据交换。 2. **桥梁角色**: Sqoop 作为一个中间件,连接了传统的 RDBMS 系统与 Hadoop 生态系统,使得数据在两者间可以便捷地迁移...

    Apache Sqoop Cookbook

    《Apache Sqoop Cookbook》这本书是针对Apache Sqoop这一开源数据导入导出工具的实用指南,旨在帮助用户深入了解和熟练运用Sqoop在Hadoop生态系统中的数据迁移功能。Sqoop作为一个高效、方便的工具,使得从传统的...

    sqoop-1.4.6-cdh5.14.0

    1. 数据导入:Sqoop 可以高效地将结构化的数据从传统的 RDBMS 导入到 Hadoop 的 HDFS 中,支持批处理和增量导入,使得大数据分析能够利用到更多的历史数据。 2. 数据导出:同样,Sqoop 也允许用户将 HDFS 或 HBase ...

    Sqoop企业级大数据迁移方案全方位实战视频教程

    从零开始讲解大数据业务及数据采集和迁移需求,以案例驱动的方式讲解基于Sqoop构建高性能的分布式数据迁移和同步平台。 课程亮点 1,知识体系完备,从小白到大神各阶段读者均能学有所获。 2,生动形象,化繁为简,...

    sqoop-1.4.5

    Sqoop是Apache Hadoop生态中的一个工具,专门用于在关系型数据库(如MySQL、Oracle等)和Hadoop的HDFS之间高效地导入导出数据。Sqoop-1.4.5是该工具的一个版本,它与Hadoop 2.x系列兼容,特别是与Hadoop 2.6版本配合...

    sqoop2-1.99.7 documentation 英文文档

    提供关于如何确保 Sqoop 数据传输安全性的建议和最佳实践。 #### 八、许可 Sqoop 采用 Apache Software License v2 许可协议发布。 以上内容概括了 Sqoop2-1.99.7 文档的主要部分,包括安装指南、工具、用户和...

    apache-atlas-2.1.0-sqoop-hook.tar.gz--基于cdh6.3.1编译完成

    5. **数据治理**:通过定义数据治理规则和策略,Apache Atlas可以帮助企业确保数据质量、一致性,并促进数据治理的最佳实践。 然后,我们来看看基于CDH 6.3.1的Apache Atlas 2.1.0与Sqoop Hook的结合: 1. **兼容...

    【推荐】最强大数据学习与最佳实践资料合集(基础+架构+数仓+治理+案例)(100份).zip

    快手数据中台建设:大数据服务化之路 快手离线数据全链路分级保障平台化建设 快手万亿级实时 OLAP 平台的建设与实践 ClickHouse在快手的大规模应用与架构改进 美团点评酒旅数据仓库建设实践 美团酒旅数据治理实践 ...

    AS深圳2017《airbnb数据平台实践》

    标题《AS深圳2017《airbnb数据平台实践》》和描述《AS深圳2017《airbnb数据平台实践》,airbnb数据平台解析》共同表明这是一篇关于Airbnb公司内部数据平台实施经验的分享。 从提供的内容片段中,我们可以梳理出以下...

Global site tag (gtag.js) - Google Analytics