`
distantlight1
  • 浏览: 44264 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

spark源码梳理(1)-Action提交1

阅读更多

1.概述

Action算子是触发Spark计算的入口,属于Spark核心逻辑。本文梳理Action触发、计算、返回的整个数据流

 

本文涉及代码主体位于org.apache.spark.scheduler/rdd/executor几个模块。核心类:RDD、SparkContext、DAGScheduler、TaskSchedulerImpl、CoarseGrainedSchedulerBackEnd、CoarseGrainedExecutorBackEnd、TaskSetManager、Executor、ResultTask、ShuffleMapTask

 

BTW:吐槽一下Iteye的编辑器实在太不给力了,插入图片好麻烦,从mac版word复制还带格式乱码。。

 

2.整体调用逻辑图

 实线框内为方法名,连接线上为传递的消息实体

红线:提交计算主体调用流 蓝线:返回数据主体调用流 浅蓝线:事件总线消息分发 绿线:Stage提交回调回路

黄底:重要逻辑实现调用栈 绿底:其他模块相关调用 空心箭头:异步调用/远程通信

 

3.源码详解

Step 1-RDD

// Utils.getIteratorSize _是传入的计算函数,这里是计算每个分区的size,runJob返回Array[Long]。不同Action就是传入不同的func,因此调度逻辑(DagScheduler/TaskScheduler不需要关心具体操作内容)。RDD.foreach()就是直接传入自定义Action

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

 

Step 2-SparkContext

// 对于每个partition执行getIteratorSize。RDD的物理内容上就是一个Seq,Spark中通过迭代子的形式传递RDD内容的引用,逻辑上可以把Iterator[T]就看做RDD本身。func则传递Action操作信息。最终计算落地到RDD.compute(),后文会说到

def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {

  runJob(rdd, func, 0 until rdd.partitions.length)

}

 

// cleanedFunc是一个闭包的函数,用到asm来解析class。大致是去掉对闭包无影响的父类、子类、transient属性等,确认闭包可序列化。后续文章再深入分析这个方法。最终getIteratorSize函数传到runJob里

def runJob[T, U: ClassTag]( rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int]): Array[U] = {

  val cleanedFunc = clean(func)

  runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)

}

 

def runJob[T, U: ClassTag]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], resultHandler: (Int, U) => Unit): Unit = {

  val callSite = getCallSite// callSite是通过StackTrace获取调用端的代码行号,用来记日志

  val cleanedFunc = clean(func)

  logInfo("Starting job: " + callSite.shortForm)

  if (conf.getBoolean("spark.logLineage", false)) {// 日志,建议打开

    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)

  }

  // 交给DasScheduler执行,action信息通过cleanedFunc参数传递

  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)

  progressBar.foreach(_.finishAll())// 更新进度,也就是控制台看到的进度日志

  rdd.doCheckpoint()// 记录checkpoint

}

 

Step 3-DAGScheduler

// 这步就是异步调用submitJob

def runJob[T, U]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], callSite: CallSite, resultHandler: (Int, U) => Unit, properties: Properties): Unit = {

val start = System.nanoTime

val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)

waiter.awaitResult() match {// 阻塞等待计算结果

  case JobSucceeded =>

    logInfo("Job %d finished: %s, took %f s".format (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))

  case JobFailed(exception: Exception) =>

    logInfo("Job %d failed: %s, took %f s".format (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))

    // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.

    val callerStackTrace = Thread.currentThread().getStackTrace.tail

    exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)

    throw exception

  }

}

 

def submitJob[T, U](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], callSite: CallSite, resultHandler: (Int, U) => Unit, properties: Properties): JobWaiter[U] = {

  // 校验所有partition都存在

  val maxPartitions = rdd.partitions.length

  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>

    throw new IllegalArgumentException(

      "Attempting to access a non-existent partition: " + p + ". " +"Total number of partitions: " + maxPartitions)

  }

 

  val jobId = nextJobId.getAndIncrement()// 生成一个jobId

  if (partitions.size == 0) {// 没有partition,啥也不用做

    return new JobWaiter[U](this, jobId, 0, resultHandler)

  }

 

  assert(partitions.size > 0)

  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]

  // JobWaiter是类似Future的异步任务封装

  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)

      // 封装一个提交消息,JobSubmitted实现了DAGSchedulerEvent,EventLoop是一个消息队列,实现类是DAGSchedulerEventProcessLoop

  eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties)))

  waiter

}

 

DAGSchedulerEventLoop

private[spark] abstract class EventLoop[E](name: String) extends Logging {

  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  private val eventThread = new Thread(name) {

    override def run(): Unit = {

        while (!stopped.get) {

          val event = eventQueue.take()

            onReceive(event)

……

}

 

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {

  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>

    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

    ……//其他消息处理

 

// 回到dagScheduler,这里消息队列只是做了一次平峰。listener是JobWaiter回调对象

private[scheduler] def handleJobSubmitted(jobId: Int, finalRDD: RDD[_],func: (TaskContext, Iterator[_]) => _,partitions: Array[Int], callSite: CallSite, listener: JobListener, properties: Properties) {

  var finalStage: ResultStage = null

  try {

// 根据dependency关系划分Stage。Stage是可以并发执行的Task集合,Stage边界为RDD宽依赖

// 实现是一个广度优先遍历,具体代码参见《内幕》4.2.3节,此处略去

    finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)

  } catch {

      logWarning("Creating new stage failed due to exception - job: " + jobId, e)

      listener.jobFailed(e)

      return

  }

 

  // 生成一个ActiveJob准备执行,这里func信息封装到了Stage对象里,整个DAG的func信息由Stage.parentStage()关联到RDD起来

  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties) 

  clearCacheLocs()// 清本地缓存

  logInfo("Got job %s (%s) with %d output partitions".format(job.jobId, callSite.shortForm, partitions.length))

  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")

  logInfo("Parents of final stage: " + finalStage.parents)

  logInfo("Missing parents: " + getMissingParentStages(finalStage))

 

  // 注册Job

  val jobSubmissionTime = clock.getTimeMillis()

  jobIdToActiveJob(jobId) = job

  activeJobs += job

  finalStage.setActiveJob(job)

  val stageIds = jobIdToStageIds(jobId).toArray

  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))

 

  // 提交一个SparkListenerJobStart事件(SparkListenerEvent),listenerBus实现是AsynchronousListenerBus,也是一个异步消息队列

  listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))

  

  submitStage(finalStage)  // 提交Stage

  submitWaitingStages()  // Stage完成后尝试提交等待的Stage,submitWaitingStage其实是一个回调方法,会在很多地方调用,形成回调回路

}

 

// 递归提交stage

private def submitStage(stage: Stage) {

  val jobId = activeJobForStage(stage)

  if (jobId.isDefined) {

    logDebug("submitStage(" + stage + ")")

    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) { // 运行中/等待中/已失败的Stage不用重复触发

      val missing = getMissingParentStages(stage).sortBy(_.id)// 如果存在未完成的Parent Stage,就先提交ParentStage,本Stage等待

      logDebug("missing: " + missing)

      if (missing.isEmpty) {

        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")

        submitMissingTasks(stage, jobId.get)

      } else {

        for (parent <- missing) {

          submitStage(parent)

        }

        waitingStages += stage

      }

    }

  } else {

    abortStage(stage, "No active job for stage " + stage.id, None)// 如果Job不存在也就不用计算Stage了

  }

}

 

Step 4-DAGScheduler

/** 提交当前Stage未计算的Task,此时Parent Stage的结果已经available*/

private def submitMissingTasks(stage: Stage, jobId: Int) {

  logDebug("submitMissingTasks(" + stage + ")")

  stage.pendingPartitions.clear()  // 注册PendingTask

 

  /* 获取未计算的分区,分两种实现:1.ShuffleMapStage:通过查找outputLocs(id).isEmpty, outputLocs存放MapStatus对象(ShuffleMapTask计算结果的元数据)

  2.ResultStage:通过!job.finished(partitionId)判断 */

  val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

 

  // 仅当当前stage没有部分提交的时候,重置累加器(如果部分提交了,重置会覆盖之前的结果

  if (stage.internalAccumulators.isEmpty || stage.numPartitions == partitionsToCompute.size) {

    stage.resetInternalAccumulators()

  }

 

  // 注册状态

  val properties = jobIdToActiveJob(jobId).properties

  runningStages += stage

 

  // 注册Stage各分区重试(Attempt)状态,OutputCommitCoordinator在Driver和Executor端都有,记录Stage运行状态及HDFS相关权限

  // SparkListenerStageSubmitted should be posted before testing whether tasks are serializable. If tasks are not serializable, a SparkListenerStageCompleted event will be posted, which should always come after a corresponding SparkListenerStageSubmitted event.

  stage match {

    case s: ShuffleMapStage =>

      outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)

    case s: ResultStage =>

      outputCommitCoordinator.stageStart(

        stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)

  }

 

// 为每个mission partition分配候选的Executor地址,这里包含本地优先策略(locality),TaskLocation包含Executor的host和executorId。这个Map的value null表示没有优选的executor(不是说不能执行)

 val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {

    stage match {

      case s: ShuffleMapStage => partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap

      case s: ResultStage =>

        val job = s.activeJob.get

        partitionsToCompute.map { id =>

          val p = s.partitions(id)

          (id, getPreferredLocs(stage.rdd, p))

        }.toMap

    }

  } catch {

    case NonFatal(e) =>

      // 记录一次(已失败的)计算Attempt。StageInfo是一个封装Stage状态的pojo

      stage.makeNewStageAttempt(partitionsToCompute.size)

      listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

      abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))

      runningStages -= stage

      return

  }

 

  // 记录一次计算Attempt

  stage.makeNewStageAttempt(partitionsToCompute.size,  taskIdToLocations.values.toSeq)

  // 异步注册StageSubmit消息监听,落地到SparkListener这个trait的实现类

  // ListenerBus是异步消息总线,用于Driver端整体的消息分发。消息被所有SparkListener实现类订阅(此处略去一些不重要的实现):

  // SparkFirehoseListener:用户自定义事件监听,这个类只暴露了一个onEvent抽象方法,可统一处理各类型事件

  // JavaSparkListener:用户自定义监听

  // SQLListener:Spark-SQL使用

  // ExecutorAllocationListener:根据负载动态挂载/卸载Executor,注意Executor是Slave节点上的独立进程

  // JobProcessListener:Job进度监听,用于维护各Stage运行状态

  // StorgeListener:存储状态监听,包括在WebUI呈现

  listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

 

  // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.

  // 向所有Executor广播Task内容(RDD序列化后的二进制码)。每个Task都获取一份独立的(序列化的)RDD副本,这使Task之间有更好的隔离性。因此每个Task修改闭包中的引用都是独立的(所以也没有并发问题)This is necessary in Hadoop where the JobConf/Configuration object is not thread-safe.

  var taskBinary: Broadcast[Array[Byte]] = null

  try {

    // ShuffleMapTask, 广播rdd与shuffleDep依赖关系

    // ResultTask, 广播rdd与func(func就是Action操作,包括foreach)

    val taskBinaryBytes: Array[Byte] = stage match {

      case stage: ShuffleMapStage =>

      // closureSerializer默认为JavaSerializer,就是java.io.Serializer默认序列化方式。序列化异常的时候看到的一长串writeExternal堆栈就是这个类调用

        JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))

      case stage: ResultStage =>

        JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))

    }

    taskBinary = sc.broadcast(taskBinaryBytes)// 广播

  } catch {

    // 序列化失败报的错,如果任务无法序列化则直接放弃

    case e: NotSerializableException =>

      abortStage(stage, "Task not serializable: " + e.toString, Some(e))

      runningStages -= stage

      return

    case NonFatal(e) =>

      abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}", Some(e))

      runningStages -= stage

      return

  }

 

  // 创建Task实例。Task信息传递到Executor有多个渠道,DAG信息通过广播,依赖Jar通过文件下载,Action本身(字节码)通过Akka 

  val tasks: Seq[Task[_]] = try {

    stage match {

      case stage: ShuffleMapStage =>

        partitionsToCompute.map { id =>

          // 附带loc和partition信息

          val locs = taskIdToLocations(id)

          val part = stage.rdd.partitions(id)

          new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs, stage.internalAccumulators)

        }

      case stage: ResultStage =>

        val job = stage.activeJob.get

        partitionsToCompute.map { id =>

          val p: Int = stage.partitions(id)

          val part = stage.rdd.partitions(p)

          val locs = taskIdToLocations(id)

          new ResultTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs, id, stage.internalAccumulators)

        }

    }

  } catch {

    case NonFatal(e) =>

      abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))

      runningStages -= stage

      return

  }

 

  // 提交Task

  if (tasks.size > 0) {

    logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")

    stage.pendingPartitions ++= tasks.map(_.partitionId)

    logDebug("New pending partitions: " + stage.pendingPartitions)

    // 封装Task到TaskSet并提交,TaskSet基本就是Task的集合。控制移交到TaskScheduler

    taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))

    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())

  } else {

    // 没有待执行Task,标志Stage结束

    markStageAsFinished(stage, None)

    val debugString = stage match {

      case stage: ShuffleMapStage =>

        s"Stage ${stage} is actually done; " + s"(available: ${stage.isAvailable}," + s"available outputs: ${stage.numAvailableOutputs}," + s"partitions: ${stage.numPartitions})"

      case stage : ResultStage =>

        s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"

    }

    logDebug(debugString)

  }

}

 

 

private def getPreferredLocsInternal(rdd: RDD[_],partition: Int, visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {

  // If the partition has already been visited, no need to re-visit.

  // This avoids exponential path exploration.  SPARK-695

  if (!visited.add((rdd, partition))) {

    // Nil has already been returned for previously visited partitions.

    return Nil

  }

  // 先拿缓存

  val cached = getCacheLocs(rdd)(partition)

  if (cached.nonEmpty) {

    return cached

  }

  // 如果有预设的preference,则根据预设返回。这是递归的终点,所有preference都是从input-rdd沿窄依赖传递的

  val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList

  if (rddPrefs.nonEmpty) {

    return rddPrefs.map(TaskLocation(_))

  }

  // 递归寻找各个窄依赖

  rdd.dependencies.foreach {

    case n: NarrowDependency[_] =>

      for (inPart <- n.getParents(partition)) {

        val locs = getPreferredLocsInternal(n.rdd, inPart, visited)

        if (locs != Nil) {

          return locs

        }

      }

    case _ =>

  }

  Nil

}

 

以上完成DagScheduler提交逻辑,后续逻辑由TaskScheduler继续实现,未完待续

  • 大小: 251.6 KB
分享到:
评论

相关推荐

    spark2.2.0源码------

    1. **DataFrame/Dataset API强化**:Spark 2.2.0进一步提升了DataFrame和Dataset API的性能和易用性。DataFrame API提供了SQL-like查询接口,而Dataset API则支持类型安全和强类型编程。在这一版本中,API更加成熟,...

    spark源码:spark-master.zip

    spark源码:spark-master.zip。方便不能登录GitHub的小伙伴下载。如果实在需要留言,可以私下给。

    spark--bin-hadoop3-without-hive.tgz

    本压缩包“spark--bin-hadoop3-without-hive.tgz”提供了Spark二进制版本,针对Hadoop 3.1.3进行了编译和打包,这意味着它已经与Hadoop 3.x兼容,但不包含Hive组件。在CentOS 8操作系统上,这个版本的Spark已经被...

    spark2.1.0-bin-hadoop2.7

    1. 下载:首先,你需要下载Spark 2.1.0与Hadoop 2.7兼容的二进制包,即`spark-2.1.0-bin-hadoop2.7.tgz`。 2. 解压:在Linux服务器上,使用`tar -zxvf spark-2.1.0-bin-hadoop2.7.tgz`命令解压文件。 3. 配置环境...

    spark-2.4.7-bin-hadoop2.6.tgz

    此外,可以通过`spark-submit`脚本提交应用程序到Spark集群执行,或直接在Spark Shell中交互式探索数据。 总结来说,Spark 2.4.7是大数据处理领域的重要工具,它的高性能、易用性和丰富的功能使其在数据科学和工程...

    spark-3.1.3-bin-without-hadoop.tgz

    1. 解压压缩包:使用tar命令解压文件,例如`tar -xvf spark-3.1.3-bin-without-hadoop.tgz`。 2. 配置环境变量:在`~/.bashrc`或`~/.bash_profile`中设置SPARK_HOME,并将Spark的bin目录添加到PATH。 3. 如果在...

    编译的spark-hive_2.11-2.3.0和 spark-hive-thriftserver_2.11-2.3.0.jar

    spark-hive_2.11-2.3.0 spark-hive-thriftserver_2.11-2.3.0.jar log4j-2.15.0.jar slf4j-api-1.7.7.jar slf4j-log4j12-1.7.25.jar curator-client-2.4.0.jar curator-framework-2.4.0.jar curator-recipes-2.4.0....

    spark-1.6.0-bin-hadoop2.6.tgz

    开发者可以根据需求选择合适的语言编写应用程序,然后使用`spark-submit`脚本来提交任务到集群。 **6. 性能调优** Spark性能优化主要包括内存管理、任务调度和数据本地性等方面。可以通过调整`spark.executor....

    北风网spark课程源码spark-study-scala.rar

    北风网spark课程源码spark-study-scala.rar,

    spark-3.1.2.tgz & spark-3.1.2-bin-hadoop2.7.tgz.rar

    Spark-3.1.2.tgz和Spark-3.1.2-bin-hadoop2.7.tgz是两个不同格式的Spark发行版,分别以tar.gz和rar压缩格式提供。 1. Spark核心概念: - RDD(弹性分布式数据集):Spark的基础数据结构,是不可变、分区的数据集合...

    spark-3.1.3-bin-hadoop3.2.tgz

    在这个特定的压缩包"spark-3.1.3-bin-hadoop3.2.tgz"中,我们得到了Spark的3.1.3版本,它已经预编译为与Hadoop 3.2兼容。这个版本的Spark不仅提供了源码,还包含了预编译的二进制文件,使得在Linux环境下快速部署和...

    spark-2.1.0-bin-without-hadoop版本的压缩包,直接下载到本地解压后即可使用

    在Ubuntu里安装spark,spark-2.1.0-bin-without-hadoop该版本直接下载到本地后解压即可使用。 Apache Spark 是一种用于大数据工作负载的分布式开源处理系统。它使用内存中缓存和优化的查询执行方式,可针对任何规模...

    spark-3.2.4-bin-hadoop3.2-scala2.13 安装包

    对于应用程序开发,可以使用Scala、Java、Python或R编写代码,然后通过`spark-submit`脚本提交作业到集群。 6. **性能优化**: Spark提供了一系列性能优化手段,如Tungsten内存管理、Code Generation、Shuffle优化等...

    spark-3.0.0-bin-hadoop3.2

    在本场景中,我们讨论的是Spark的3.0.0版本,与Hadoop3.2相结合的二进制发行版——"spark-3.0.0-bin-hadoop3.2"。这个压缩包是为了在Windows操作系统下运行Spark而设计的,因此标签明确指出它是适用于Windows平台的...

    spark-2.4.7-bin-hadoop2.7.tar.gz

    该文件为2.4.7版本的spark包(spark-2.4.7-bin-hadoop2.7.tar.gz)

    spark-assembly-1.5.2-hadoop2.6.0.jar

    《Spark编程核心组件:spark-assembly-1.5.2-hadoop2.6.0.jar详解》 在大数据处理领域,Spark以其高效、易用和灵活性脱颖而出,成为了许多开发者的首选框架。Spark-assembly-1.5.2-hadoop2.6.0.jar是Spark中的一个...

    spark-3.2.1-bin-hadoop2.7.tgz

    这个名为"spark-3.2.1-bin-hadoop2.7.tgz"的压缩包是Spark的一个特定版本,即3.2.1,与Hadoop 2.7版本兼容。在Linux环境下,这样的打包方式方便用户下载、安装和运行Spark。 Spark的核心设计理念是快速数据处理,...

    spark-3.2.2-bin-3.0.0-cdh6.3.2

    内容概要:由于cdh6.3.2的spark版本为2.4.0,并且spark-sql被阉割,现基于cdh6.3.2,scala2.12.0,java1.8,maven3.6.3,,对spark-3.2.2源码进行编译 应用:该资源可用于cdh6.3.2集群配置spark客户端,用于spark-sql

    spark-3.2.0-bin-hadoop3.2.tgz

    这个压缩包"spark-3.2.0-bin-hadoop3.2.tgz"包含了Spark 3.2.0版本的二进制文件,以及针对Hadoop 3.2的兼容构建。 Spark的核心组件包括:Spark Core、Spark SQL、Spark Streaming、MLlib(机器学习库)和GraphX(图...

    spark-2.2.2-bin-hadoop2.7.tgz

    1. `bin`:存放可执行脚本,如`spark-submit`用于提交Spark应用,`spark-shell`提供交互式Shell环境。 2. `conf`:配置文件夹,存放默认配置模板,如`spark-defaults.conf`,用户可以根据需求自定义配置。 3. `jars`...

Global site tag (gtag.js) - Google Analytics