`
bit1129
  • 浏览: 1069784 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark七十六】Spark计算结果存到MySQL

 
阅读更多
package spark.examples.db

import java.sql.{PreparedStatement, Connection, DriverManager}

import com.mysql.jdbc.Driver
import org.apache.spark.{SparkContext, SparkConf}

object SparkMySQLIntegration {

  case class Person(name: String, age: Int)

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SparkRDDCount").setMaster("local");
    val sc = new SparkContext(conf);
    val data = sc.parallelize(List(("Tom", 31), ("Jack", 22), ("Mary", 25)))
    def func(iter: Iterator[(String, Int)]): Unit = {
//      Class.forName("com.mysql.jdbc.Driver ")
      var conn:Connection = null
      val d :Driver = null
      var pstmt:PreparedStatement = null
      try {
        val url="jdbc:mysql://localhost:3306/person";
        val user="root";
        val password=""
        //在forPartition函数内打开连接,这样连接将在worker上打开
        conn = DriverManager.getConnection(url, user, password)
        while (iter.hasNext) {
          val item = iter.next()
          println(item._1 + "," + item._2)
          val sql = "insert into TBL_PERSON(name, age) values (?, ?)";
          pstmt = conn.prepareStatement(sql);
          pstmt.setString(1, item._1)
          pstmt.setInt(2, item._2)
          pstmt.executeUpdate();
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        if (pstmt != null) {
          pstmt.close()
        }
        if (conn != null) {
          conn.close()
        }
      }
    }
    data.foreachPartition(func);
  }

}

 

这个代码遇到了两个坑,

1. 按照Java程序员使用JDBC的习惯,首先通过Class.forName("com.mysql.jdbc.Driver ")注册MySQL的JDBC驱动,但是在Scala中却不需要这么做,这么做还出错,包ClassNotFoundExeception(但是com.mysql.jdbc.Driver明明在classpath上)

所以代码中添加了注释

2. 在本地运行这个代码时,反反复复报错说sql语句的(?,?)附近有语法错误,反反复复的看也没看出来哪里有错,后来发现原来是pstmt.executeUpdate();写成了pstmt.executeUpdate(sql);如此严重的编译错,Intellij Idea竟然编译不报错!!!

 

 

Spark RDD存入MySQL等存储系统最佳实践

将Spark的RDD写入数据存储系统,不管是关系型数据库如MySQL,还是NoSQL,如MongoDB,HBase,都面临着比较大的存储压力,因为每个RDD的每个partition的数据量可能非常大,因为必须节省有限的存储服务器连接,如下是一些最佳实践:

 

  • You can write your own custom writer and call a transform on your RDD to write each element to a database of your choice, but there's a lot of ways to write something that looks like it would work, but does not work well in a distributed environment. Here are some things to watch out for:
  • A common naive mistake is to open a connection on the Spark driver program, and then try to use that connection on the Spark workers. The connection should be opened on the Spark worker, such as by calling forEachPartition and opening the connection inside that function.
  • Use partitioning to control the parallelism for writing to your data storage. Your data storage may not support too many concurrent connections.
  • Use batching for writing out multiple objects at a time if batching is optimal for your data storage.
  • Make sure your write mechanism is resilient to failures.
  • Writing out a very large dataset can take a long time, which increases the chance something can go wrong - a network failure, etc.
  • Consider utilizing a static pool of database connections on your Spark workers.
  • If you are writing to a sharded data storage, partition your RDD to match your sharding strategy. That way each of your Spark workers only connects to one database shard, rather than each Spark worker connecting to every database shard.
  • Be cautious when writing out so much data, and make sure you understand the distributed nature of Spark!

 

**上面提到了batch操作,batch应该是一个节省连接资源非常有效的手段,将多个更新或者插入操作组成一个batch,使用一个连接将数据传送到存储系统引擎,关注下MySQL和MongoDB的batch操作**

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

分享到:
评论

相关推荐

    spark读取hbase数据,并使用spark sql保存到mysql

    本示例将详细介绍如何使用 Spark 从 HBase 中读取数据,并通过 Spark SQL 将其存储到 MySQL 数据库中。 首先,让我们了解 Spark 与 HBase 的交互。Spark 提供了 `spark-hbase-connector` 库,允许我们方便地连接到 ...

    简单的spark 读写hive以及mysql

    Spark 支持通过 HiveContext(现在称为 HiveSession)连接到 Hive,这样可以使用 SQL 查询 Hive 表并将其结果转换为 Spark DataFrame。在读取 Hive 数据时,我们可以通过创建一个 HiveContext 对象,然后使用 `sql()...

    基于Django2.2+MySQL+spark的在线电影推荐系统源码+说明+数据库(MySQL部分支持在线计算,spark支持离线计算).zip

    基于Django2.2+MySQL+spark的在线电影推荐系统源码+说明+数据库(MySQL部分支持在线计算,spark支持离线计算).zip 基于Django2.2+MySQL+spark的在线电影推荐系统源码+说明+数据库(MySQL部分支持在线计算,spark...

    spring+mybatis+spark+mysql

    综合来看,这个项目涵盖了从数据存储(MySQL)、服务端开发(Spring)、数据访问(MyBatis)到大数据处理(Spark)的完整流程,对于想深入理解并实践大数据处理与微服务架构的开发者来说,是一个非常有价值的参考...

    spark连接mysql核心代码 java实现方式

    spark连接mysql核心代码 java实现方式======================================================================

    改进版基于Spark2.2使用SparkSQL和MySql数据库实现的诗歌浏览和自动集句工程源码

    在本项目中,我们主要探讨的是如何利用Apache Spark 2.2版本的计算能力,结合SparkSQL和MySQL数据库,来实现一个高效的诗歌浏览和自动集句系统。这个系统的实现源码包含在一个名为"ScalaPoet2"的压缩包中,这暗示了...

    spark安装包+spark实验安装软件

    - **解压并配置环境变量**: 解压缩后,将Spark的安装路径添加到系统环境变量`SPARK_HOME`中。 - **配置JDK**: Spark运行需要Java环境,确保已安装JDK并设置好`JAVA_HOME`环境变量。 - **选择运行模式**: Spark可以...

    Spark和TiDB (Spark on TiDB)

    它将Spark和TiDB深度集成,在原有MySQL Workload之外借助Spark支持了更多样的用户场景和API。这个项目在SparkSQL和Catalyst引擎之外实现了一套扩展的,为TiDB定制的SQL前端(Parser,Planner和优化器):它了解TiDB...

    clickhouse-mysql-spark.zip

    这一过程可能涉及到数据抽取(ETL,Extract-Transform-Load)流程,通过Spark作为中间层,将MySQL的实时或定期更新的数据高效地导入到ClickHouse中。这种方式既能利用MySQL的事务处理能力,又能发挥ClickHouse的分析...

    统计ip地址的所属省份的spark程序,并将处理的结果数据存储到mysql数据库中所用的资料

    本项目涉及到了使用Spark处理IP地址并统计它们的省份归属,然后将结果存入MySQL数据库,这是一个典型的批处理流程,涵盖了数据处理、地理位置解析以及数据持久化等重要环节。 首先,我们需要理解`ip.txt`文件。这个...

    spark考试练习题含答案.rar

    六、Spark的其他高级特性 1. **Spark MLlib**:提供机器学习库,包含多种算法,如分类、回归、聚类等。 2. **Spark GraphX**:用于处理图形数据,支持图算法。 3. **Spark Structured Streaming**:新一代的流...

    spark生态系统的学习

    Spark的计算过程主要包括从数据源读取数据,进行数据分区,计算中间结果,并将最终结果写回数据源。 Spark开发相关的知识点包括: 1. Spark核心架构:Spark Core是Spark生态系统的核心组件,负责处理数据的读取和...

    图书大数spark java echarts图书大数据爬虫 mysql 包调试文档web 报告据.rar

    Spark作为一款强大的分布式计算框架,其高效的数据处理能力在大数据领域广泛应用。它支持多种编程语言,其中Java接口是常用的一种。Java以其稳定性和跨平台性深受开发者的喜爱,结合Spark,可以构建大规模的数据处理...

    基于Sqoop+Hive+Spark+MySQL+AirFlow+Grafana的工业大数据离线数仓项目

    总结起来,这个工业大数据离线数仓项目通过 Sqoop 实现数据的高效导入,Hive 构建数据仓库并进行数据处理,Spark 提供快速计算能力,MySQL 存储关键业务数据,AirFlow 管理任务流程,而 Grafana 负责数据可视化,...

    spark jdbc 读取并发优化

    - 导入正确的数据库驱动:比如对于MySQL数据库,需要导入mysql-connector-java驱动,并在spark-env.sh中设置SPARK_CLASSPATH,同时在任务提交时加入该驱动的路径,确保Spark能正确加载JDBC驱动。 - 合理配置分区参数...

    基于spark的电影点评系统

    而Spark SQL则可以方便地将结构化数据集成到Spark中,使我们能够方便地对电影评论进行查询和分析。例如,我们可以使用Spark SQL来统计最受欢迎的电影、用户评分分布等。 在用户行为分析部分,项目可能采用了Spark ...

    人工智能-hadoop-基于hdfs spark的视频非结构化数据计算

    现有的大数据平台Hadoop、Spark等都在处理文本数据方面具有很好的支持,并且效率也经过了各种优化,所以在利用分布式框架来处理日志类数据,工作...sql:数据库相关操作,将算法的结果插入到数据库中(mysql或mongdb)

    基于Django2.2+MySQL+spark实现在线电影推荐系统其中MySQL部分支持在线计算,spark支持离线计算源码+文档+全部资料+优秀项目.zip

    基于Django2.2+MySQL+spark实现在线电影推荐系统其中MySQL部分支持在线计算,spark支持离线计算源码+文档+全部资料+优秀项目.zip 【备注】 1、该项目是个人高分项目源码,已获导师指导认可通过,答辩评审分达到95分 ...

Global site tag (gtag.js) - Google Analytics