`
bit1129
  • 浏览: 1069726 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark八十五】Spark Streaming分析结果落地到MySQL

 
阅读更多

几点总结:

1. DStream.foreachRDD是一个Output Operation,类似于RDD的action,会触发Job的提交。DStream.foreachRDD是数据落地很常用的方法

2. 获取MySQL Connection的操作应该放在foreachRDD的参数(是一个RDD[T]=>Unit的函数类型),这样,当foreachRDD方法在每个Worker上执行时,连接是在Worker上创建。如果Connection的获取放到dstream.foreachRDD之前,那么

Connection的获取动作将发生在Driver端,然后通过序列化的方式发送到各个Worker(Connection的序列化通常是无法正确序列化的)

3. Connection的获取在foreachRDD的参数中获取,同时还要在遍历RDD之前获取(调用RDD的foreach方法前获取),如果遍历中获取,那么RDD中的每个record都要打开关闭连接,这对于数据库连接资源将是极大的考验

4. 业务逻辑处理定义在func中,它是在foreachRDD的方法参数体中定义的,如果把func的定义放到外面,即Driver中,貌似也是可以的,Spark会对计算方法通过Broadcast进行广播到各个计算节点。

 

 

package spark.examples.streaming

import java.sql.{PreparedStatement, Connection, DriverManager}
import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._


//No need to call Class.forName("com.mysql.jdbc.Driver") to register Driver?

object SparkStreamingForPartition {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("NetCatWordCount")
    conf.setMaster("local[3]")
    val ssc = new StreamingContext(conf, Seconds(5))
    //This dstream object represents the stream of data that will be received from the data
    //server. Each record in this DStream is a line of text
    //The DStream is a collection of RDD, which makes the method foreachRDD reasonable
    val dstream = ssc.socketTextStream("192.168.26.140", 9999)
    dstream.foreachRDD(rdd => {
      //embedded function
      def func(records: Iterator[String]) {
        var conn: Connection = null
        var stmt: PreparedStatement = null
        try {
          val url = "jdbc:mysql://192.168.26.140:3306/person";
          val user = "root";
          val password = ""
          conn = DriverManager.getConnection(url, user, password)
          records.flatMap(_.split(" ")).foreach(word => {
            val sql = "insert into TBL_WORDS(word) values (?)";
            stmt = conn.prepareStatement(sql);
            stmt.setString(1, word)
            stmt.executeUpdate();
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          if (stmt != null) {
            stmt.close()
          }
          if (conn != null) {
            conn.close()
          }
        }
      }

      val repartitionedRDD = rdd.repartition(3)
      repartitionedRDD.foreachPartition(func)
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

 

分享到:
评论

相关推荐

    SparkStreaming流式日志过滤与分析

    (1)利用SparkStreaming从文件目录读入日志信息,日志内容包含: ”日志级别、函数名、日志内容“ 三个字段,字段之间以空格拆分。请看数据源的文件。 (2)对读入都日志信息流进行指定筛选出日志级别为error或warn...

    基于spark的外卖大数据平台分析系统.zip

    本文将深入探讨如何利用Spark构建一个基于外卖数据的分析系统,以及在这个过程中涉及到的关键技术和应用。 首先,我们需要理解Spark的核心特性。Spark提供了一种内存计算模型,它将数据存储在内存中,从而极大地...

    Spark Streaming 流式日志过滤的实验资源

    - **数据库连接**:Spark Streaming 可以直接与多种数据库系统集成,如 HBase、Cassandra 或 MySQL,将处理结果实时写入。 - **数据清洗**:在写入数据库前,可能需要对数据进行清洗,去除无用信息,确保存储的...

    基于spark的电影点评系统

    而Spark SQL则可以方便地将结构化数据集成到Spark中,使我们能够方便地对电影评论进行查询和分析。例如,我们可以使用Spark SQL来统计最受欢迎的电影、用户评分分布等。 在用户行为分析部分,项目可能采用了Spark ...

    Spark-Streaming+Kafka+mysql实战示例

    通过该示例,您将了解到如何使用Spark Streaming和Kafka处理实时数据流,以及如何将处理后的数据保存到MySQL数据库中。示例涵盖了从环境搭建到代码实现的全过程,帮助您快速上手实时数据处理的开发。提供了一个完整...

    基于Spark的零售交易数据分析

    该项目是大三下学期的课程设计,选取了共541909条数据,以Python为编程语言,使用大数据框架Spark对数据进行了预处理,然后分别从多个方面对数据进行了分类和分析,并对分析结果进行可视化。里面包含我的课程设计...

    Spark-Streaming:Spark Streaming实时解析flume和kafka传来的josn数据写入mysql

    Spark Streaming实时解析flume和kafka传来的josn数据写入mysql 注意,以下文件不提供 配置c3p0-config.xml链接,链接数据库 配置log4j.properties、my.properties 另,还需将您的spark和hadoop安装文件下的core-site...

    基于Flume+kafka+spark大型电商网站日志分析系统(离线+实时).zip

    对于离线分析,Spark SQL 或 DataFrames 可以用于结构化数据的处理,而 Spark Streaming 则用于处理实时数据流,它可以以微批处理的方式高效地处理数据流,提供近实时的分析结果。 - **Spark SQL**:用于离线分析...

    毕业设计:基于spark的外卖大数据平台分析系统.zip

    《基于Spark的外卖大数据平台分析系统》是一款针对计算机科学与技术专业的毕业设计项目,它主要展示了如何利用大数据处理工具Spark来构建一个对外卖业务数据进行深度分析的系统。该系统涵盖了数据库管理、数据处理和...

    spark考试练习题含答案.rar

    五、Spark性能优化 1. **Caching与Persistence**:通过缓存中间结果,减少重复计算,提高性能。 2. **Shuffle操作优化**:合理设置分区策略,减少网络传输和磁盘I/O。 3. **Executor配置**:调整executor的数量、...

    spark安装包+spark实验安装软件

    - **解压并配置环境变量**: 解压缩后,将Spark的安装路径添加到系统环境变量`SPARK_HOME`中。 - **配置JDK**: Spark运行需要Java环境,确保已安装JDK并设置好`JAVA_HOME`环境变量。 - **选择运行模式**: Spark可以...

    计算机课程毕设:基于spark及用户行为标签的日志大数据分析系统.zip

    在本项目中,Spark将被用来处理和分析日志数据,包括数据的读取、转换、清洗以及计算,这将涉及到Spark的SQL、Streaming和MLlib库,以进行数据查询、实时流处理和机器学习分析。 2. **用户行为标签**:用户行为标签...

    毕业设计:基于Spark streaming的系统日志分析系统.zip

    《基于Spark Streaming的系统日志分析系统》是一个典型的计算机科学毕业设计项目,主要涉及大数据处理、实时流计算以及系统监控等多个重要知识点。本项目利用Apache Spark Streaming技术对系统日志进行实时分析,以...

    基于Spark的电商用户分析系统-开题报告.pdf

    一是使用CDH大数据集群研究与搭建,二是使用Flume监控制定日志文件以及使用Kafka将数据转移到HDFS中,三是使用SparkStreaming实时处理平台,四是使用行为分析的查询系统平台研究。 知识点3:系统架构和技术架构 ...

    clickhouse-mysql-spark.zip

    这一过程可能涉及到数据抽取(ETL,Extract-Transform-Load)流程,通过Spark作为中间层,将MySQL的实时或定期更新的数据高效地导入到ClickHouse中。这种方式既能利用MySQL的事务处理能力,又能发挥ClickHouse的分析...

    【SparkStreaming】之图书评分数据实时分析系统

    【SparkStreaming】之图书评分数据实时分析系统是一个综合性的大数据处理和实时分析项目,它结合了多个技术组件,包括Hadoop、Kafka、SparkStreaming、SpringBoot和Mysql,以及Echarts用于数据可视化。这个系统的...

    毕业设计基于Spark网易云音乐数据分析.zip

    标题 "毕业设计基于Spark网易云音乐数据分析.zip" 暗示了一个使用Apache Spark进行的大规模数据处理项目,其目标是对网易云音乐的数据进行分析。这个毕业设计可能涉及到多个技术领域,包括大数据处理、数据可视化...

    基于Spark的高校数据分析系统

    基于Spark的高校数据分析系统 。同时实现了Spark-core(被注释了);Spark-ML,Spark-streaming。 spark-streaming虽然过时很久了,但是对于我学习来说还是够了。 streaming存在很多的弊端,但是主要思想还是处理流式...

    spark生态系统的学习

    5. Spark Streaming:Spark Streaming是Spark生态系统中的流处理引擎,提供了对流数据的处理和分析功能。 Spark的优点包括: 1. 高性能:Spark可以快速处理大规模数据,提供了高性能的计算能力。 2. 高可扩展性:...

Global site tag (gtag.js) - Google Analytics