`
bit1129
  • 浏览: 1075679 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【spark八十七】给定Driver Program, 如何判断哪些代码在Driver运行,哪些代码在Worker上执行

 
阅读更多

Driver Program是用户编写的提交给Spark集群执行的application,它包含两部分

  • 作为驱动: Driver与Master、Worker协作完成application进程的启动、DAG划分、计算任务封装、计算任务分发到各个计算节点(Worker)、计算资源的分配等。
  • 计算逻辑本身,当计算任务在Worker执行时,执行计算逻辑完成application的计算任务

接下来的问题是,给定一个driver programming,哪些作为"驱动代码"在Driver进程中执行,哪些"任务逻辑代码”被包装到任务中,然后分发到计算节点进行计算?

 

1. 基本Spark driver application

package spark.examples.databricks.reference.apps.loganalysis

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._


object LogAnalyzer {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Log Analyzer in Scala").setMaster("local[3]")
    val sc = new SparkContext(sparkConf)


    //logFile path is provided by the program args



    val logFile = if (args != null && args.length == 1) args(0) else "E:\\softwareInstalled\\Apache2.2\\logs\\access.log"

    //transform each line of the logFile into ApacheAccessLog object,
    //RDD[T],T is of type ApacheAccessLog
    //Because it will be used more than more, so cache it.
    ////从数据源中读取文本文件内容,每一行转换为ApacheAccessLog,然后进行cache
    val accessLogs = sc.textFile(logFile).map(ApacheAccessLog.parseLogLine).cache()

    // Calculate statistics based on the content size.
    //Retrieve the contentSize column and cache it
    val contentSizes = accessLogs.map(log => log.contentSize).cache()
    ///reduce是个action,count是个action
    println("Content Size Avg: %s, Min: %s, Max: %s".format(
      contentSizes.reduce(_ + _) / contentSizes.count,
      contentSizes.min,
      contentSizes.max))

    // Compute Response Code to Count.
    //Take first 100 responseCode, no sort here
    //take操作是个action
    val responseCodeToCount = accessLogs
      .map(log => (log.responseCode, 1))
      .reduceByKey(_ + _)
      .take(100)
    println( s"""Response code counts: ${responseCodeToCount.mkString("[", ",", "]")}""")

    // Any IPAddress that has accessed the server more than 10 times.
    //take操作是个action
    val ipAddresses = accessLogs
      .map(log => (log.ipAddress, 1))
      .reduceByKey(_ + _)
      .filter(_._2 > 10) //Get the ipAddress that accessed the server 10+ times
      .map(_._1) //Map to the IP Address column
      .take(100)
    println( s"""IPAddresses > 10 times: ${ipAddresses.mkString("[", ",", "]")}""")

    // Top Endpoints.
    //top操作是个action
    val topEndpoints = accessLogs
      .map(log => (log.endpoint, 1))
      .reduceByKey(_ + _)
      .top(10)(OrderingUtils.SecondValueOrdering)
    println( s"""Top Endpoints: ${topEndpoints.mkString("[", ",", "]")}""")

    sc.stop()
  }
}

 

Spark application首先在Driver上开始运行main函数,执行过程中。 计算逻辑的开始是从读取数据源(比如HDFS中的文件)创建RDD开始,RDD分为transform和action两种操作,transform使用的是懒执行,而action操作将会触发Job的提交。

Job的提交以为着DAG划分、计算任务封装、计算任务分发到各个计算节点(Worker)、计算资源的分配等,这是真正的将任务分发的开始。因此,从代码中首先要识别出哪些action。

另外一个问题, 任务执行过程中执行的逻辑代码如何识别?

Application在Driver上执行,遇到RDD的action动作后,开始提交作业,当作业执行完成后,后面的作业陆续提交,也就是说,虽然一个Spark Application可以有多个Job(每个action对应一个Job),这些Job是顺序执行的,Job(X)执行完成才会执行Job(X+1)。

 

遇到Job执行,比如上面的contentSize.reduce(_+_),那么所谓的计算逻辑就是函数 _+_,这是个求和操作。具体的任务逻辑执行时,还是从action回溯RDD,中间经过转换得到最终的这个Task所属的Partition的数据,然后执行reduce(_+_)

 

2. Spark Stream程序

 

package spark.examples.streaming

import java.sql.{PreparedStatement, Connection, DriverManager}
import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._


//No need to call Class.forName("com.mysql.jdbc.Driver") to register Driver?


object SparkStreamingForPartition {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("NetCatWordCount")
    conf.setMaster("local[3]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val dstream = ssc.socketTextStream("192.168.26.140", 9999)
    //foreachRDD是DStream的动作函数,会触发Job执行,然后对一个时间间隔内创建的RDD进行处理。如果RDD执行RDD的动作函数,是否继续触发Job执行?
    dstream.foreachRDD(rdd => {
      //embedded function
      def func(records: Iterator[String]) {
        var conn: Connection = null
        var stmt: PreparedStatement = null
        try {
          val url = "jdbc:mysql://192.168.26.140:3306/person";
          val user = "root";
          val password = ""
          conn = DriverManager.getConnection(url, user, password)
          records.flatMap(_.split(" ")).foreach(word => {
            val sql = "insert into TBL_WORDS(word) values (?)";
            stmt = conn.prepareStatement(sql);
            stmt.setString(1, word)
            stmt.executeUpdate();
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          if (stmt != null) {
            stmt.close()
          }
          if (conn != null) {
            conn.close()
          }
        }
      }
      ///对RDD进行重新分区,以改变处理的并行度
      val repartitionedRDD = rdd.repartition(3)
      ///对每个分区调用func函数,func函数的参数就是一个分区对应的数据的遍历器(Iterator)
      repartitionedRDD.foreachPartition(func)
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

 

DStream的foreachRDD是个Output Operation,类似于RDD的action,因此,高阶函数foreachRDD的函数参数,将在worker上执行。这里的func是定义为foreachRDD参数函数的内部函数,因此会发送到Worker上执行,如果func定义在最外层,比如作为main函数的直接内部函数,是否可以顺利的从Driver序列化到Worker上呢?我认为是可以的。函数在Scala中只是一个普通的对象,没有状态,序列化反序列化时需要创建MySQL的Connection。

 

3. foreachRDD中的rdd继续执行action算子

 

package spark.examples.streaming

import java.sql.{PreparedStatement, Connection, DriverManager}
import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

object SparkStreamingForPartition2 {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("NetCatWordCount")
    conf.setMaster("local[3]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val dstream = ssc.socketTextStream("192.168.26.140", 9999)
    dstream.foreachRDD(rdd => {
      //对于空RDD调用flatMap报错,这里进行判断
      if (!rdd.isEmpty()) {
        ///RDD的Record个数
        val recordCount = rdd.count()
        ///RDD中的单词数
        val wordCount = rdd.flatMap(_.split(" ")).map(word => 1).reduce(_ + _)
        println("recordCount: =" + recordCount + "," + "wordCount:=" + wordCount)
      } else {
        println("Empty RDD, No Data")
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

 

 需要注意的是,对于没有任何元素的RDD(RDD.isEmpty为true),那么不能执行转换算子如flatMap等,这在Spark Streaming中读取空的RDD是很常见的情况?但是这个空检查是在RDD的reduce函数中执行的

 

  def reduce(f: (T, T) => T): T = {
    val cleanF = sc.clean(f)
    val reducePartition: Iterator[T] => Option[T] = iter => {
      if (iter.hasNext) {
        Some(iter.reduceLeft(cleanF))
      } else {
        None
      }
    }
    var jobResult: Option[T] = None
    val mergeResult = (index: Int, taskResult: Option[T]) => {
      if (taskResult.isDefined) {
        jobResult = jobResult match {
          case Some(value) => Some(f(value, taskResult.get))
          case None => taskResult
        }
      }
    }
    sc.runJob(this, reducePartition, mergeResult)
    // Get the final result out of our Option, or throw an exception if the RDD was empty
    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
  }

 

如下操作对于空RDD是可以的,返回0

  rdd.flatMap(_.split(" ")).map(_ => 1).count()

 

 

 

分享到:
评论

相关推荐

    Spark快速数据处理 PDF电子书下载

    Spark的基本架构包括驱动程序(Driver Program)、集群管理器(Cluster Manager)和工作节点(Worker Node)。驱动程序负责运行应用的main()函数,并创建SparkContext对象。集群管理器负责资源分配,如Standalone、...

    Spark学习笔记

    - **任务调度**:Driver Program负责任务的调度,将任务分配给合适的Worker节点。 - **计算结果管理**:如果任务产生的结果过大,则不回收结果,以避免内存溢出等问题。 #### Spark代码流程 - **初始化**:通过创建...

    光环国际spark大数据&机器学习PPT

    其处理模式包括TRANSFORMATION(延迟执行)和ACTIONS,它通过DRIVER PROGRAM和SPARK CONTEXT进行工作,并通过Cluster Manager协调本地线程或Worker的Spark Executor进行数据处理。 机器学习库Spark MLlib是Spark的一...

    计算机二级公共基础知识模 拟试题及答案详解.pdf

    计算机二级公共基础知识模 拟试题及答案详解.pdf

    电子工程领域的语音发射机电路设计与实现

    内容概要:本文档详细介绍了语音发射机的设计与实现,涵盖了从硬件电路到具体元件的选择和连接方式。文档提供了详细的电路图,包括电源管理、信号处理、音频输入输出接口以及射频模块等关键部分。此外,还展示了各个引脚的功能定义及其与其他组件的连接关系,确保了系统的稳定性和高效性能。通过这份文档,读者可以全面了解语音发射机的工作原理和技术细节。 适合人群:对电子工程感兴趣的初学者、从事嵌入式系统开发的技术人员以及需要深入了解语音发射机制的专业人士。 使用场景及目标:适用于希望构建自己的语音发射设备的研究人员或爱好者,帮助他们掌握相关技术和实际操作技能。同时,也为教学机构提供了一个很好的案例研究材料。 其他说明:文档不仅限于理论讲解,还包括具体的实施步骤,使读者能够动手实践并验证所学知识。

    易语言注册机源码详解:单线程架构下的接码、滑块验证与IP代理实现

    内容概要:本文详细介绍了用易语言编写的单线程全功能注册机源码,涵盖了接码平台对接、滑块验证处理、IP代理管理以及料子导入等多个核心功能。文章首先展示了主框架的初始化配置和事件驱动逻辑,随后深入探讨了接码平台(如打码兔)的API调用及其返回数据的处理方法。对于滑块验证部分,作者分享了如何利用易语言的绘图功能模拟真实用户的操作轨迹,并提高了验证通过率。IP代理模块则实现了智能切换策略,确保代理的有效性和稳定性。此外,料子导入功能支持多种格式的数据解析和去重校验,防止脏数据污染。最后,文章提到了状态机设计用于控制注册流程的状态持久化。 适合人群:有一定编程基础,尤其是熟悉易语言的开发者和技术爱好者。 使用场景及目标:适用于希望深入了解易语言注册机开发的技术细节,掌握接码、滑块验证、IP代理等关键技术的应用场景。目标是帮助读者理解并优化现有注册机的功能,提高其稳定性和效率。 其他说明:文中提到的部分技术和实现方式可能存在一定的风险,请谨慎使用。同时,建议读者在合法合规的前提下进行相关开发和测试。

    计算机绘图实用教程 第三章.pdf

    计算机绘图实用教程 第三章.pdf

    计算机辅助设计—AutoCAD 2018中文版基础教程 各章CAD图纸及相关说明汇总.pdf

    计算机辅助设计—AutoCAD 2018中文版基础教程 各章CAD图纸及相关说明汇总.pdf

    计算机类电子书集合PDF

    C++相关书籍,计算机相关书籍,linux相关及http等计算机学习、面试书籍。

    计算机二级mysql数据库程序设计练习题(一).pdf

    计算机二级mysql数据库程序设计练习题(一).pdf

    计算机发展史.pdf

    计算机发展史.pdf

    计算机二级课件.pdf

    计算机二级课件.pdf

    计算机概论第三讲:计算机组成.pdf

    计算机概论第三讲:计算机组成.pdf

    端侧算力网络白皮书:6G时代终端算力资源高效利用与应用场景解析

    内容概要:本文档由中国移动通信集团终端有限公司、北京邮电大学、中国信息通信研究院和中国通信学会共同发布,旨在探讨端侧算力网络(TCAN)的概念、架构、关键技术及其应用场景。文中详细分析了终端的发展现状、基本特征和发展趋势,阐述了端侧算力网络的定义、体系架构、功能架构及其主要特征。端侧算力网络通过整合海量泛在异构终端的算力资源,实现分布式多级端侧算力资源的高效利用,提升网络整体资源利用率和服务质量。关键技术涵盖层次化端算力感知图模型、资源虚拟化、数据压缩、多粒度多层次算力调度、现场级AI推理和算力定价机制。此外,还探讨了端侧算力网络在智能家居、智能医疗、车联网、智慧教育和智慧农业等领域的潜在应用场景。 适合人群:从事通信网络、物联网、边缘计算等领域研究和开发的专业人士,以及对6G网络和端侧算力网络感兴趣的学者和从业者。 使用场景及目标:适用于希望深入了解端侧算力网络技术原理、架构设计和应用场景的读者。目标是帮助读者掌握端侧算力网络的核心技术,理解其在不同行业的应用潜力,推动端侧算力网络技术的商业化和产业化。 其他说明:本文档不仅提供了端侧算力网络的技术细节,还对其隐私与安全进行了深入探讨

    学习java的心得体会.docx

    学习java的心得体会.docx

    计算机二级考试(南开100题齐全).pdf

    计算机二级考试(南开100题齐全).pdf

    计算机二级C语言考试通关宝典:全面解析核心知识点与解题技巧

    内容概要:本文详细介绍了计算机二级C语言考试的内容和备考方法。首先概述了计算机二级考试的意义及其在计算机技能认证中的重要性,重点讲解了C语言的基础语法,包括程序结构、数据类型、运算符和表达式等。接着深入探讨了进阶知识,如函数、数组、指针、结构体和共用体的应用。最后分享了针对选择题、填空题和编程题的具体解题技巧,强调了复习方法和实战演练的重要性。 适合人群:准备参加计算机二级C语言考试的学生和技术爱好者。 使用场景及目标:①帮助考生系统地掌握C语言的核心知识点;②提供有效的解题策略,提高应试能力;③指导考生制定合理的复习计划,增强实战经验。 其他说明:本文不仅涵盖了理论知识,还提供了大量实例代码和详细的解释,有助于读者更好地理解和应用所学内容。此外,文中提到的解题技巧和复习建议对实际编程也有很大帮助。

    论文格式及要求.doc

    论文格式及要求.doc

    三菱FX3U与台达变频器RS485通信程序设置及应用实例

    内容概要:本文详细介绍了如何使用三菱FX3U PLC及其485BD通信板与四台台达VFD-M系列变频器进行通信的设置与应用。主要内容涵盖硬件连接注意事项、通信参数配置、RS指令的应用、CRC校验算法的实现以及频率给定和状态读取的具体方法。文中提供了多个实用的编程示例,展示了如何通过梯形图和结构化文本编写通信程序,并讨论了常见的调试技巧和优化建议。此外,还提到了系统的扩展性和稳定性措施,如增加温度传感器通信功能和应对电磁干扰的方法。 适合人群:从事工业自动化领域的工程师和技术人员,尤其是那些熟悉三菱PLC和台达变频器的使用者。 使用场景及目标:适用于需要实现多台变频器联动控制的工业应用场景,旨在提高生产效率和系统可靠性。通过学习本文,读者可以掌握如何构建稳定的RS485通信网络,确保变频器之间的高效协同工作。 其他说明:本文不仅提供了详细的理论指导,还包括了许多来自实际项目的经验教训,帮助读者避免常见错误并提升编程技能。

    计算机服务规范.pdf

    计算机服务规范.pdf

Global site tag (gtag.js) - Google Analytics