package spark.examples.db import java.sql.{PreparedStatement, Connection, DriverManager} import com.mysql.jdbc.Driver import org.apache.spark.{SparkContext, SparkConf} object SparkMySQLIntegration { case class Person(name: String, age: Int) def main(args: Array[String]) { val conf = new SparkConf().setAppName("SparkRDDCount").setMaster("local"); val sc = new SparkContext(conf); val data = sc.parallelize(List(("Tom", 31), ("Jack", 22), ("Mary", 25))) def func(iter: Iterator[(String, Int)]): Unit = { // Class.forName("com.mysql.jdbc.Driver ") var conn:Connection = null val d :Driver = null var pstmt:PreparedStatement = null try { val url="jdbc:mysql://localhost:3306/person"; val user="root"; val password="" //在forPartition函数内打开连接,这样连接将在worker上打开 conn = DriverManager.getConnection(url, user, password) while (iter.hasNext) { val item = iter.next() println(item._1 + "," + item._2) val sql = "insert into TBL_PERSON(name, age) values (?, ?)"; pstmt = conn.prepareStatement(sql); pstmt.setString(1, item._1) pstmt.setInt(2, item._2) pstmt.executeUpdate(); } } catch { case e: Exception => e.printStackTrace() } finally { if (pstmt != null) { pstmt.close() } if (conn != null) { conn.close() } } } data.foreachPartition(func); } }
这个代码遇到了两个坑,
1. 按照Java程序员使用JDBC的习惯,首先通过Class.forName("com.mysql.jdbc.Driver ")注册MySQL的JDBC驱动,但是在Scala中却不需要这么做,这么做还出错,包ClassNotFoundExeception(但是com.mysql.jdbc.Driver明明在classpath上)
所以代码中添加了注释
2. 在本地运行这个代码时,反反复复报错说sql语句的(?,?)附近有语法错误,反反复复的看也没看出来哪里有错,后来发现原来是pstmt.executeUpdate();写成了pstmt.executeUpdate(sql);如此严重的编译错,Intellij Idea竟然编译不报错!!!
Spark RDD存入MySQL等存储系统最佳实践
将Spark的RDD写入数据存储系统,不管是关系型数据库如MySQL,还是NoSQL,如MongoDB,HBase,都面临着比较大的存储压力,因为每个RDD的每个partition的数据量可能非常大,因为必须节省有限的存储服务器连接,如下是一些最佳实践:
- You can write your own custom writer and call a transform on your RDD to write each element to a database of your choice, but there's a lot of ways to write something that looks like it would work, but does not work well in a distributed environment. Here are some things to watch out for:
- A common naive mistake is to open a connection on the Spark driver program, and then try to use that connection on the Spark workers. The connection should be opened on the Spark worker, such as by calling forEachPartition and opening the connection inside that function.
- Use partitioning to control the parallelism for writing to your data storage. Your data storage may not support too many concurrent connections.
- Use batching for writing out multiple objects at a time if batching is optimal for your data storage.
- Make sure your write mechanism is resilient to failures.
- Writing out a very large dataset can take a long time, which increases the chance something can go wrong - a network failure, etc.
- Consider utilizing a static pool of database connections on your Spark workers.
- If you are writing to a sharded data storage, partition your RDD to match your sharding strategy. That way each of your Spark workers only connects to one database shard, rather than each Spark worker connecting to every database shard.
- Be cautious when writing out so much data, and make sure you understand the distributed nature of Spark!
**上面提到了batch操作,batch应该是一个节省连接资源非常有效的手段,将多个更新或者插入操作组成一个batch,使用一个连接将数据传送到存储系统引擎,关注下MySQL和MongoDB的batch操作**
相关推荐
本示例将详细介绍如何使用 Spark 从 HBase 中读取数据,并通过 Spark SQL 将其存储到 MySQL 数据库中。 首先,让我们了解 Spark 与 HBase 的交互。Spark 提供了 `spark-hbase-connector` 库,允许我们方便地连接到 ...
Spark 支持通过 HiveContext(现在称为 HiveSession)连接到 Hive,这样可以使用 SQL 查询 Hive 表并将其结果转换为 Spark DataFrame。在读取 Hive 数据时,我们可以通过创建一个 HiveContext 对象,然后使用 `sql()...
基于Django2.2+MySQL+spark的在线电影推荐系统源码+说明+数据库(MySQL部分支持在线计算,spark支持离线计算).zip 基于Django2.2+MySQL+spark的在线电影推荐系统源码+说明+数据库(MySQL部分支持在线计算,spark...
综合来看,这个项目涵盖了从数据存储(MySQL)、服务端开发(Spring)、数据访问(MyBatis)到大数据处理(Spark)的完整流程,对于想深入理解并实践大数据处理与微服务架构的开发者来说,是一个非常有价值的参考...
spark连接mysql核心代码 java实现方式======================================================================
在本项目中,我们主要探讨的是如何利用Apache Spark 2.2版本的计算能力,结合SparkSQL和MySQL数据库,来实现一个高效的诗歌浏览和自动集句系统。这个系统的实现源码包含在一个名为"ScalaPoet2"的压缩包中,这暗示了...
- **解压并配置环境变量**: 解压缩后,将Spark的安装路径添加到系统环境变量`SPARK_HOME`中。 - **配置JDK**: Spark运行需要Java环境,确保已安装JDK并设置好`JAVA_HOME`环境变量。 - **选择运行模式**: Spark可以...
这一过程可能涉及到数据抽取(ETL,Extract-Transform-Load)流程,通过Spark作为中间层,将MySQL的实时或定期更新的数据高效地导入到ClickHouse中。这种方式既能利用MySQL的事务处理能力,又能发挥ClickHouse的分析...
它将Spark和TiDB深度集成,在原有MySQL Workload之外借助Spark支持了更多样的用户场景和API。这个项目在SparkSQL和Catalyst引擎之外实现了一套扩展的,为TiDB定制的SQL前端(Parser,Planner和优化器):它了解TiDB...
本项目涉及到了使用Spark处理IP地址并统计它们的省份归属,然后将结果存入MySQL数据库,这是一个典型的批处理流程,涵盖了数据处理、地理位置解析以及数据持久化等重要环节。 首先,我们需要理解`ip.txt`文件。这个...
Spark的计算过程主要包括从数据源读取数据,进行数据分区,计算中间结果,并将最终结果写回数据源。 Spark开发相关的知识点包括: 1. Spark核心架构:Spark Core是Spark生态系统的核心组件,负责处理数据的读取和...
六、Spark的其他高级特性 1. **Spark MLlib**:提供机器学习库,包含多种算法,如分类、回归、聚类等。 2. **Spark GraphX**:用于处理图形数据,支持图算法。 3. **Spark Structured Streaming**:新一代的流...
Spark作为一款强大的分布式计算框架,其高效的数据处理能力在大数据领域广泛应用。它支持多种编程语言,其中Java接口是常用的一种。Java以其稳定性和跨平台性深受开发者的喜爱,结合Spark,可以构建大规模的数据处理...
总结起来,这个工业大数据离线数仓项目通过 Sqoop 实现数据的高效导入,Hive 构建数据仓库并进行数据处理,Spark 提供快速计算能力,MySQL 存储关键业务数据,AirFlow 管理任务流程,而 Grafana 负责数据可视化,...
- 导入正确的数据库驱动:比如对于MySQL数据库,需要导入mysql-connector-java驱动,并在spark-env.sh中设置SPARK_CLASSPATH,同时在任务提交时加入该驱动的路径,确保Spark能正确加载JDBC驱动。 - 合理配置分区参数...
而Spark SQL则可以方便地将结构化数据集成到Spark中,使我们能够方便地对电影评论进行查询和分析。例如,我们可以使用Spark SQL来统计最受欢迎的电影、用户评分分布等。 在用户行为分析部分,项目可能采用了Spark ...
现有的大数据平台Hadoop、Spark等都在处理文本数据方面具有很好的支持,并且效率也经过了各种优化,所以在利用分布式框架来处理日志类数据,工作...sql:数据库相关操作,将算法的结果插入到数据库中(mysql或mongdb)
本教程将探讨如何使用 Scala 语言来操作 Spark,并介绍如何与 MySQL 数据库和 HDFS(Hadoop 分布式文件系统)进行交互。以下是相关知识点的详细说明: **1. Scala 语言基础** Scala 是一种多范式编程语言,融合了...