`
bit1129
  • 浏览: 1069851 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【spark八十七】给定Driver Program, 如何判断哪些代码在Driver运行,哪些代码在Worker上执行

 
阅读更多

Driver Program是用户编写的提交给Spark集群执行的application,它包含两部分

  • 作为驱动: Driver与Master、Worker协作完成application进程的启动、DAG划分、计算任务封装、计算任务分发到各个计算节点(Worker)、计算资源的分配等。
  • 计算逻辑本身,当计算任务在Worker执行时,执行计算逻辑完成application的计算任务

接下来的问题是,给定一个driver programming,哪些作为"驱动代码"在Driver进程中执行,哪些"任务逻辑代码”被包装到任务中,然后分发到计算节点进行计算?

 

1. 基本Spark driver application

package spark.examples.databricks.reference.apps.loganalysis

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._


object LogAnalyzer {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Log Analyzer in Scala").setMaster("local[3]")
    val sc = new SparkContext(sparkConf)


    //logFile path is provided by the program args



    val logFile = if (args != null && args.length == 1) args(0) else "E:\\softwareInstalled\\Apache2.2\\logs\\access.log"

    //transform each line of the logFile into ApacheAccessLog object,
    //RDD[T],T is of type ApacheAccessLog
    //Because it will be used more than more, so cache it.
    ////从数据源中读取文本文件内容,每一行转换为ApacheAccessLog,然后进行cache
    val accessLogs = sc.textFile(logFile).map(ApacheAccessLog.parseLogLine).cache()

    // Calculate statistics based on the content size.
    //Retrieve the contentSize column and cache it
    val contentSizes = accessLogs.map(log => log.contentSize).cache()
    ///reduce是个action,count是个action
    println("Content Size Avg: %s, Min: %s, Max: %s".format(
      contentSizes.reduce(_ + _) / contentSizes.count,
      contentSizes.min,
      contentSizes.max))

    // Compute Response Code to Count.
    //Take first 100 responseCode, no sort here
    //take操作是个action
    val responseCodeToCount = accessLogs
      .map(log => (log.responseCode, 1))
      .reduceByKey(_ + _)
      .take(100)
    println( s"""Response code counts: ${responseCodeToCount.mkString("[", ",", "]")}""")

    // Any IPAddress that has accessed the server more than 10 times.
    //take操作是个action
    val ipAddresses = accessLogs
      .map(log => (log.ipAddress, 1))
      .reduceByKey(_ + _)
      .filter(_._2 > 10) //Get the ipAddress that accessed the server 10+ times
      .map(_._1) //Map to the IP Address column
      .take(100)
    println( s"""IPAddresses > 10 times: ${ipAddresses.mkString("[", ",", "]")}""")

    // Top Endpoints.
    //top操作是个action
    val topEndpoints = accessLogs
      .map(log => (log.endpoint, 1))
      .reduceByKey(_ + _)
      .top(10)(OrderingUtils.SecondValueOrdering)
    println( s"""Top Endpoints: ${topEndpoints.mkString("[", ",", "]")}""")

    sc.stop()
  }
}

 

Spark application首先在Driver上开始运行main函数,执行过程中。 计算逻辑的开始是从读取数据源(比如HDFS中的文件)创建RDD开始,RDD分为transform和action两种操作,transform使用的是懒执行,而action操作将会触发Job的提交。

Job的提交以为着DAG划分、计算任务封装、计算任务分发到各个计算节点(Worker)、计算资源的分配等,这是真正的将任务分发的开始。因此,从代码中首先要识别出哪些action。

另外一个问题, 任务执行过程中执行的逻辑代码如何识别?

Application在Driver上执行,遇到RDD的action动作后,开始提交作业,当作业执行完成后,后面的作业陆续提交,也就是说,虽然一个Spark Application可以有多个Job(每个action对应一个Job),这些Job是顺序执行的,Job(X)执行完成才会执行Job(X+1)。

 

遇到Job执行,比如上面的contentSize.reduce(_+_),那么所谓的计算逻辑就是函数 _+_,这是个求和操作。具体的任务逻辑执行时,还是从action回溯RDD,中间经过转换得到最终的这个Task所属的Partition的数据,然后执行reduce(_+_)

 

2. Spark Stream程序

 

package spark.examples.streaming

import java.sql.{PreparedStatement, Connection, DriverManager}
import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._


//No need to call Class.forName("com.mysql.jdbc.Driver") to register Driver?


object SparkStreamingForPartition {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("NetCatWordCount")
    conf.setMaster("local[3]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val dstream = ssc.socketTextStream("192.168.26.140", 9999)
    //foreachRDD是DStream的动作函数,会触发Job执行,然后对一个时间间隔内创建的RDD进行处理。如果RDD执行RDD的动作函数,是否继续触发Job执行?
    dstream.foreachRDD(rdd => {
      //embedded function
      def func(records: Iterator[String]) {
        var conn: Connection = null
        var stmt: PreparedStatement = null
        try {
          val url = "jdbc:mysql://192.168.26.140:3306/person";
          val user = "root";
          val password = ""
          conn = DriverManager.getConnection(url, user, password)
          records.flatMap(_.split(" ")).foreach(word => {
            val sql = "insert into TBL_WORDS(word) values (?)";
            stmt = conn.prepareStatement(sql);
            stmt.setString(1, word)
            stmt.executeUpdate();
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          if (stmt != null) {
            stmt.close()
          }
          if (conn != null) {
            conn.close()
          }
        }
      }
      ///对RDD进行重新分区,以改变处理的并行度
      val repartitionedRDD = rdd.repartition(3)
      ///对每个分区调用func函数,func函数的参数就是一个分区对应的数据的遍历器(Iterator)
      repartitionedRDD.foreachPartition(func)
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

 

DStream的foreachRDD是个Output Operation,类似于RDD的action,因此,高阶函数foreachRDD的函数参数,将在worker上执行。这里的func是定义为foreachRDD参数函数的内部函数,因此会发送到Worker上执行,如果func定义在最外层,比如作为main函数的直接内部函数,是否可以顺利的从Driver序列化到Worker上呢?我认为是可以的。函数在Scala中只是一个普通的对象,没有状态,序列化反序列化时需要创建MySQL的Connection。

 

3. foreachRDD中的rdd继续执行action算子

 

package spark.examples.streaming

import java.sql.{PreparedStatement, Connection, DriverManager}
import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

object SparkStreamingForPartition2 {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("NetCatWordCount")
    conf.setMaster("local[3]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val dstream = ssc.socketTextStream("192.168.26.140", 9999)
    dstream.foreachRDD(rdd => {
      //对于空RDD调用flatMap报错,这里进行判断
      if (!rdd.isEmpty()) {
        ///RDD的Record个数
        val recordCount = rdd.count()
        ///RDD中的单词数
        val wordCount = rdd.flatMap(_.split(" ")).map(word => 1).reduce(_ + _)
        println("recordCount: =" + recordCount + "," + "wordCount:=" + wordCount)
      } else {
        println("Empty RDD, No Data")
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

 

 需要注意的是,对于没有任何元素的RDD(RDD.isEmpty为true),那么不能执行转换算子如flatMap等,这在Spark Streaming中读取空的RDD是很常见的情况?但是这个空检查是在RDD的reduce函数中执行的

 

  def reduce(f: (T, T) => T): T = {
    val cleanF = sc.clean(f)
    val reducePartition: Iterator[T] => Option[T] = iter => {
      if (iter.hasNext) {
        Some(iter.reduceLeft(cleanF))
      } else {
        None
      }
    }
    var jobResult: Option[T] = None
    val mergeResult = (index: Int, taskResult: Option[T]) => {
      if (taskResult.isDefined) {
        jobResult = jobResult match {
          case Some(value) => Some(f(value, taskResult.get))
          case None => taskResult
        }
      }
    }
    sc.runJob(this, reducePartition, mergeResult)
    // Get the final result out of our Option, or throw an exception if the RDD was empty
    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
  }

 

如下操作对于空RDD是可以的,返回0

  rdd.flatMap(_.split(" ")).map(_ => 1).count()

 

 

 

分享到:
评论

相关推荐

    Apache Spark源码走读之2 -- Job的提交与运行

    在SparkShell中,用户提交的代码会首先被解析为Driver Program,然后利用SparkContext实例来创建和执行各个任务。 Spark作业的提交与运行过程大致可以分为以下几个步骤: 1. 创建SparkContext实例:用户在Spark...

    【Spark内核篇02】Spark模式运行机制1

    1. 在Client模式下,SparkSubmit直接启动Driver线程执行用户代码,而不需要通过AM。 2. Driver创建ScheduleBackend与YarnClientSchedulerBackend交互,向RM申请ExecutorLauncher。 3. ExecutorLauncher启动...

    在WebWorker中运行一个模块

    这个“在Web Worker中运行一个模块”的主题,主要是关于如何利用JavaScript的特性将模块化代码适配到Web Worker的上下文中。 首先,我们要理解JavaScript的模块系统。在ES6中,引入了`import`和`export`关键字,...

    spark1.2.1常用模式部署运行

    单机模式下,Spark 只使用单台机器的资源,而伪分布式模式下,Spark 可以模拟分布式环境,但所有的计算都是在单台机器上完成的。 Starting a Cluster 在 standalone 模式下,需要手动启动 master 和 slave 节点。...

    Go-使用v8worker2在Golang上构建的ReasonML运行时

    在本文中,我们将深入探讨如何在Golang环境中利用v8worker2库构建一个ReasonML运行时。ReasonML是一种静态类型的函数式编程语言,它在JavaScript语法的基础上进行了优化,旨在提供更好的类型安全性和编译时检查。Go...

    Spark应用程序WebUI详解.docx

    - **Driver Program**:Driver Program 是 Spark 应用程序的控制中心,负责调度任务、处理数据等操作。 - **Stages**:Stages 是 Spark 执行计划的基本单元,由一系列连续的 Tasks 组成。 #### 六、总结 Spark ...

    spark overview

    如果在集群模式下运行,Driver程序可以运行在Master节点上,或者根据集群管理器的调度,运行在任何Worker节点上,比如在YARN集群中的 WorkerNode2。 在本地模式下,用户可以使用本地的IDE(例如Eclipse)运行Driver...

    Spark核心技术原理透视一Spark运行原理.pdf

    1. Application:Spark应用程序,指的是用户编写的Spark应用程序,包含了Driver功能代码和分布在集群中多个节点上的Executor代码。 2. Driver:驱动程序,Spark中的Driver即运行上述Application的Main()函数,并且...

    Spark运行架构和解析

    综上所述,Spark的运行架构设计精妙,通过高效的调度机制和灵活的Task类型,能够在不同的集群环境中实现高性能的数据处理。理解这些核心概念和技术细节对于有效利用Spark进行大规模数据处理至关重要。

    Spark简介以及其生态圈

    在术语定义中,Application指的是用户编写的Spark应用程序,它由运行main()函数的Driver和分布在集群中多个节点上运行的Executor组成。Driver创建SparkContext以准备运行环境,SparkContext负责与ClusterManager通信...

    Apache Spark常见面试题

    总结来说,在Spark中,Driver端主要负责协调工作、管理资源和处理高级逻辑,而Worker端则专注于执行具体的计算任务。 #### 二、Spark的部署方式详解 Apache Spark支持多种部署方式,包括local模式、standalone模式...

    spark_code_basic

    Driver Program负责创建SparkContext,SparkContext与集群管理器交互,启动Executor,Executor在Worker Nodes上运行任务。 3. **Spark编程模型** Spark提供了Scala、Java、Python和R等多种语言API,其中Scala API...

    Spark-内核源码解析.docx

    在 Spark 中,Application 指的是用户编写的 Spark 应用程序/代码,包含了 Driver 功能代码和分布在集群中多个节点上运行的 Executor 代码。一个 Spark 应用程序由一个或多个作业 JOB 组成,Driver 负责和 Cluster...

    cloudera-spark 官方文档

    - **Driver Program**:驱动程序是Spark应用的主要部分,它负责创建`SparkContext`(或`SparkSession`),定义转换和操作,以及触发执行。 - **Executor Processes**:执行器进程是由Worker Node启动的,用于执行...

    Patrick Wendell:Administering Spark

    在核心组件方面,Spark应用程序包括驱动程序(Driver program)和执行器(Executors)。驱动程序是一个Java程序,它通过创建SparkContext来启动应用。而执行器则是执行任务和存储数据的工作进程。在集群管理器...

    Spark原理及源码剖析1

    3. Driver:执行Spark应用的main方法,实际执行代码。Driver的工作包括申请资源、注册Executor、解析作业、生成Stage并调度Task,监控Task执行,最后释放资源并注销应用程序。 4. Executor:是JVM进程,执行具体的...

    spark部署和基础代码的编写

    - **兼容性**:Spark能够与多种数据源集成,如HDFS、Cassandra、HBase等,并且可以运行在多个集群管理器之上,如YARN、Mesos和Kubernetes。 3. **Spark集群安装** - **安装**: - **机器部署**:为了搭建Spark...

    Spark 学习之路,包含 Spark Core,Spark SQL,Spark Streaming,Spark mllib 学

    Spark 学习之路,包含 Spark Core,Spark SQL,Spark Streaming,Spark mllib 学习笔记 * [spark core学习笔记及代码 * [spark sql学习笔记及代码 * [spark streaming学习笔记及代码 Spark 消息通信 ### Spark ...

    计算引擎-spark (1)1

    Driver Program是用户代码运行的地方,它创建SparkContext来启动应用,而Executor是在集群中运行任务的进程,它们负责缓存数据和执行任务。 5. **DAGScheduler和TaskScheduler** DAGScheduler将用户提交的作业...

    spark-集群与大数据处理

    Spark可以在Hadoop的YARN集群管理器上运行,或者独立运行在支持HDFS的集群上。这样的互操作性让Spark可以利用Hadoop的分布式存储以及集群资源管理能力,同时通过Spark的高效执行引擎提升了整体性能。 在版本迭代的...

Global site tag (gtag.js) - Google Analytics