`

Spark Mysql to hdfs

 
阅读更多

import org.apache.hadoop.fs.Path

import org.apache.spark.sql.SQLContext

import org.apache.spark.{SparkConf, SparkContext}

 

 

object MysqlToHdfs {

   def main(args: Array[String]) {

 

     val conf = new SparkConf().setMaster("spark://ip:7077").setAppName("test")

     val sc = new SparkContext(conf)

     val sqlContext = new SQLContext(sc)

 

     val df1 = sqlContext.read.format("jdbc").options(Map("url" ->"jdbc:mysql://192.0.0.1:3306/db1","driver"->"com.mysql.jdbc.Driver","dbtable"->"(select field1  from table1) as t1","user"->"xxx","password"->"xxx")).load()

     val df2 = sqlContext.read.format("jdbc").options(Map("url" ->"jdbc:mysql://192.0.0.1:3306/db2","driver"->"com.mysql.jdbc.Driver","dbtable"->"(select b from t2 ) as t2","user"->"xxx","password"->"xxx")).load()

 

     df1.registerTempTable("aa")

     df2.registerTempTable("bb")

     val resDF = sqlContext.sql("SELECT \n t1 join t2 ")

 

     val hadoopConf = sc.hadoopConfiguration

     val hdfs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)

     val path = new Path(args(0))

     if(hdfs.exists(path)){

       hdfs.delete(path,true)

     }

 

     resDF.rdd.map(r =>{

       r.get(0)+"\001"+r.get(1)+"\001"+r.get(2)+"\001"+r.get(3)+"\001"+r.get(4)+"\001"+r.get(5)+"\001"+r.get(6)+"\001"+r.get(7)+"\001"+r.get(8)+"\001"+r.get(9)+"\001"+r.get(10)

     }).repartition(3).saveAsTextFile(args(0))

     //resDF.rdd.repartition(3).saveAsTextFile(args(0))

   }

 

    val jdbc = "url";

    classOf[com.mysql.jdbc.Driver]

    val conn = DriverManager.getConnection(jdbc)

    //conn.prepareStatement("delete from t1").execute()

    conn.prepareStatement("sql").execute()

    val prep = conn.prepareStatement("INSERT INTO t1 " +

      "VALUES (?,?,?,?,?,?,?,?,?,?,?)"

    )

 

    val arr = targetDataResult.map(row=>{

      var r1 = row(0)

      val line = r1+"\001"+r2+"\001"+r3+"\001"+r4+"\001"+r5+"\001"+r6+"\001"+r7+"\001"+r8+"\001"+r9+"\001"+r10+"\001"+r11

      line

    }).collect()

 

    for( i <- arr){

      val arr1 =i.split("\001")

      prep.setString(1, arr1(0))

      prep.executeUpdate

    }

 

 }

 

分享到:
评论

相关推荐

    基于scala语言的spark操作,包含连接操作mysql,连接hdfs.zip

    本教程将探讨如何使用 Scala 语言来操作 Spark,并介绍如何与 MySQL 数据库和 HDFS(Hadoop 分布式文件系统)进行交互。以下是相关知识点的详细说明: **1. Scala 语言基础** Scala 是一种多范式编程语言,融合了...

    利用Flume将MySQL表数据准实时抽取到HDFS、MySQL、Kafka

    标题中的“利用Flume将MySQL表数据准实时抽取到HDFS、MySQL、Kafka”是一项数据集成任务,涉及Apache Flume、MySQL数据库、Hadoop Distributed File System (HDFS) 和Apache Kafka这四个关键技术。Flume是Apache的一...

    基于scala语言的spark操作,包含连接操作mysql,连接hdfs+源代码+文档说明

    - 不懂运行,下载完可以私聊问,可远程教学 该资源内项目源码是个人的毕设,代码都测试ok,都是运行成功后才上传资源,答辩评审平均分达到96分,放心下载使用! &lt;项目介绍&gt; 1、该资源内项目代码都经过测试运行成功,...

    Mysql到hdfs增量同步实验手册.pdf

    - 编写JSON配置文件`mysql_to_hdfs.json`,配置DataX的reader和writer。reader部分设置MySQL连接信息和基于时间戳的查询SQL,用于筛选出需要同步的增量数据。writer部分设置HDFS的存储路径、文件类型、列信息以及...

    hadoop搭建 zookeeper_hbase_hive_sqoop_mysql_spark_hdfs.doc

    在构建一个完整的Hadoop生态系统时,我们需要搭建多个组件,包括Zookeeper、HBase、Hive、MySQL、Kafka以及Spark,并且它们都要运行在HDFS之上。下面将详细介绍这些组件的安装与配置过程。 1. **Zookeeper**: ...

    基于Sqoop+Hive+Spark+MySQL+AirFlow+Grafana的工业大数据离线数仓项目

    在大数据项目中,AirFlow能确保数据从收集到分析的整个流程自动化、有条不紊地运行,例如设置定时任务从MySQL抽取数据,用Sqoop导入HDFS,然后启动Hive和Spark作业进行分析。 最后,Grafana是一个强大的可视化工具...

    sparksql连接mysql,hive

    这里未提供具体安装步骤,但需要注意的是,为了使SparkSQL能够连接到MySQL,还需要添加MySQL JDBC驱动到Spark的类路径中。 6. **SparkSQL连接MySQL和Hive**: - 要使用SparkSQL连接MySQL,需要在Spark应用中引入...

    spark安装包+spark实验安装软件

    Spark可以与Hadoop生态系统集成,但本压缩包不包含Hadoop依赖,这意味着你需要自己提供Hadoop配置,包括HDFS的配置信息,以便Spark能读写Hadoop的数据。 **4. Spark的运行原理** Spark基于弹性分布式数据集...

    人工智能-hadoop-基于hdfs spark的视频非结构化数据计算

    本项目是基于HDFS来存储视频数据,利用Spark来对其进行机器视觉算法分析。我希望能够将其他非结构化数据处理也加入其中,有兴趣的希望加入我。 本项目分为四个模块: algorithm:算法部分,将c++版本的opencv算法...

    本项目为大数据基础镜像组件,其中包括Hadoop、Spark、Hive、Tez、Hue、Flink、MySQL等

    Spark支持批处理、交互式查询(通过Spark SQL)、实时流处理(通过Spark Streaming)和机器学习(通过MLlib库)等多种数据处理模式。 3. **Hive**:Hive是基于Hadoop的数据仓库工具,可将结构化的数据文件映射为...

    win10下搭建Hadoop环境(jdk+mysql+hadoop+scala+hive+spark) 3.docx

    在Windows 10环境下搭建Hadoop生态系统,包括JDK、MySQL、Hadoop、Scala、Hive和Spark等组件,是一项繁琐但重要的任务,这将为你提供一个基础的大数据处理平台。下面将详细介绍每个组件的安装与配置过程。 **1. JDK...

    spark考试练习题含答案.rar

    1. **DataFrame API**:提供了SQL-like接口进行数据处理,支持多种数据源,如HDFS、Cassandra、Hive等。 2. **DataFrame Join操作**:用于将两个或多个DataFrame合并,支持不同类型的join(inner join, outer join,...

    基于spark的电影点评系统

    1. 数据读取模块:负责从数据源(如HDFS、数据库或本地文件)加载用户评价数据。 2. 数据预处理模块:清洗数据,处理缺失值,进行特征工程。 3. 分析与建模模块:进行用户行为分析,构建推荐模型。 4. 结果展示模块...

    Spark与Mysql的交互

     Spark在对目标数据进行计算后,RDD格式的数据一般都会存在HDFS,Hive,HBase中,另一方面,对于非RDD格式的数据,可能会存放在像Mysql中这种传统的RDMS中.  但是写入过程中经常出现各种各样的问题, ...

    Spark-Streaming:Spark Streaming实时解析flume和kafka传来的josn数据写入mysql

    Spark Streaming实时解析flume和kafka传来的josn数据写入mysql 注意,以下文件不提供 配置c3p0-config.xml链接,链接数据库 配置log4j.properties、my.properties 另,还需将您的spark和hadoop安装文件下的core-site...

    docker 部署spark集群配置文件

    描述: 这个资源包含了一个基本的Spark集群配置,包括Hadoop、Hive、MySQL和Spark的配置文件。 文件清单: Dockerfile build.sh build_network.sh -yarn-site.xml -stop_containers.sh -start-hadoop.sh -start_...

    Hadoop+Spark+Hive+HBase+Oozie+Kafka+Flume+Flink+ES+Redash等详细安装部署

    在集群中部署Redash,需要配置数据源连接,如Hive、MySQL或Elasticsearch。 综上所述,搭建这样一个大数据集群需要深入了解每个组件的特性和配置要求,同时还需要具备一定的网络和系统管理知识。过程中可能遇到的...

    基于Spark的零售交易数据分析

    该项目是大三下学期的课程设计,选取了共541909条数据,以Python为编程语言,使用大数据框架Spark对数据进行了预处理,然后分别从多个方面对数据进行了分类和分析,并对分析结果进行可视化。里面包含我的课程设计...

    清华大学精品大数据实战课程(Hadoop、Hbase、Hive、Spark)PPT课件含习题(29页) 第6章 Spark SQL.pptx

    配置MySQL元数据库需要创建数据库用户,下载并配置MySQL驱动,最后修改Spark和Hive的相关配置文件。 在实际使用中,Spark SQL可以通过以下方式交互: - **Spark SQL CLI**:用户可以直接启动Spark SQL命令行界面...

    基于Flume+kafka+spark大型电商网站日志分析系统(离线+实时).zip

    - **Spark SQL**:用于离线分析,可以方便地执行SQL查询,对接多种数据源,包括HDFS、Hive、Cassandra等,帮助我们从海量日志中提取关键信息,例如用户购买行为、热门商品等。 - **Spark Streaming**:处理实时...

Global site tag (gtag.js) - Google Analytics