This analysis is based on standalone mode.
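Everything analyzed below is triggered simply by constructing a SparkContext in a driver program. As a reference point, here is a minimal sketch of such a driver; the master URL spark://master-host:7077 and the app name are placeholders, not values from this post:

import org.apache.spark.{SparkConf, SparkContext}

object StandaloneDriverExample {
  def main(args: Array[String]): Unit = {
    // Point the driver at a standalone master; the URL below is a placeholder.
    val conf = new SparkConf()
      .setAppName("register-app-demo")
      .setMaster("spark://master-host:7077")

    // Constructing the SparkContext kicks off everything analyzed below:
    // createTaskScheduler -> SparkDeploySchedulerBackend -> AppClient -> RegisterApplication
    val sc = new SparkContext(conf)

    sc.parallelize(1 to 10).map(_ * 2).count()

    sc.stop()
  }
}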
Here we focus on the two most important pieces of initialization. The first is the creation and initialization of the TaskScheduler:
// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
Notice that a SchedulerBackend is also initialized here. Let's follow the createTaskScheduler method; for a standalone master URL it takes the SPARK_REGEX branch:
case SPARK_REGEX(sparkUrl) =>
  val scheduler = new TaskSchedulerImpl(sc)
  val masterUrls = sparkUrl.split(",").map("spark://" + _)
  val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
  scheduler.initialize(backend)
  (backend, scheduler)
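The branch above is chosen by matching the master URL string against a regex. The following self-contained sketch illustrates the same dispatch idea; the regexes and the describe helper are written for illustration and are not copied from SparkContext:

object MasterUrlDemo {
  // Regexes in the spirit of SparkContext's master-URL matching (illustrative only).
  val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
  val SPARK_REGEX = """spark://(.*)""".r

  def describe(master: String): String = master match {
    case "local"               => "local mode, 1 thread"
    case LOCAL_N_REGEX(n)      => s"local mode, $n threads"
    case SPARK_REGEX(sparkUrl) =>
      // A comma-separated list supports multiple masters for HA, e.g. host1:7077,host2:7077
      val masterUrls = sparkUrl.split(",").map("spark://" + _)
      s"standalone mode, masters = ${masterUrls.mkString(", ")}"
    case other                 => s"unrecognized master URL: $other"
  }

  def main(args: Array[String]): Unit = {
    println(describe("local[*]"))
    println(describe("spark://host1:7077,host2:7077"))
  }
}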
createTaskScheduler first builds a TaskSchedulerImpl, which holds two important fields:
// Listener object to pass upcalls into
var dagScheduler: DAGScheduler = null
var backend: SchedulerBackend = null
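Both fields start out as null and are wired in afterwards: backend is set by scheduler.initialize(backend) in the branch above, and dagScheduler is set when the DAGScheduler is later constructed. A toy sketch of this create-then-initialize wiring, using made-up class names rather than Spark's:

// Illustrative only: mimics the create-then-initialize wiring, not Spark's actual classes.
class ToyBackend(val scheduler: ToyScheduler) {
  def start(): Unit = println("backend started, will register the application")
}

class ToyScheduler {
  var backend: ToyBackend = null           // filled in by initialize(), like TaskSchedulerImpl.backend

  def initialize(b: ToyBackend): Unit = { backend = b }
  def start(): Unit = backend.start()      // starting the scheduler delegates to the backend
}

object WiringDemo {
  def main(args: Array[String]): Unit = {
    val scheduler = new ToyScheduler
    val backend = new ToyBackend(scheduler)  // the backend knows the scheduler...
    scheduler.initialize(backend)            // ...and the scheduler is then given the backend
    scheduler.start()                        // only now does registration happen
  }
}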
Next comes the creation of SparkDeploySchedulerBackend. Its key method is start(), shown below, because it is what gets invoked later when the TaskScheduler starts:
override def start() {
  super.start()

  // The endpoint for executors to talk to us
  val driverUrl = rpcEnv.uriOf(SparkEnv.driverActorSystemName,
    RpcAddress(sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port").toInt),
    CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
  val args = Seq(
    "--driver-url", driverUrl,
    "--executor-id", "{{EXECUTOR_ID}}",
    "--hostname", "{{HOSTNAME}}",
    "--cores", "{{CORES}}",
    "--app-id", "{{APP_ID}}",
    "--worker-url", "{{WORKER_URL}}")
  val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
    .map(Utils.splitCommandString).getOrElse(Seq.empty)
  val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
    .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
  val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
    .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)

  // When testing, expose the parent class path to the child. This is processed by
  // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
  // when the assembly is built with the "*-provided" profiles enabled.
  val testingClassPath =
    if (sys.props.contains("spark.testing")) {
      sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
    } else {
      Nil
    }

  // Start executors with a few necessary configs for registering with the scheduler
  val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
  val javaOpts = sparkJavaOpts ++ extraJavaOpts
  val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
    args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
  val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
  val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
  val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
    command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
  client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
  client.start()
  waitForRegistration()
}
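Most of the values packed into the Command and ApplicationDescription above come straight out of SparkConf. A hedged sketch of the usual knobs involved in standalone mode; the concrete values and the master URL are placeholders:

import org.apache.spark.SparkConf

object ExecutorLaunchConfDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("register-app-demo")
      .setMaster("spark://master-host:7077")                    // placeholder master URL
      .set("spark.executor.memory", "2g")                       // typically becomes sc.executorMemory in appDesc
      .set("spark.cores.max", "8")                              // typically becomes maxCores in appDesc
      .set("spark.executor.cores", "2")                         // becomes coresPerExecutor in appDesc
      .set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")   // folded into javaOpts for the executor command

    // The driver host/port end up in driverUrl so executors can call back to the driver.
    println(conf.getOption("spark.executor.cores"))
  }
}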
Inside SparkDeploySchedulerBackend.start, the first call is to the parent class CoarseGrainedSchedulerBackend.start:
override def start() {
  val properties = new ArrayBuffer[(String, String)]
  for ((key, value) <- scheduler.sc.conf.getAll) {
    if (key.startsWith("spark.")) {
      properties += ((key, value))
    }
  }

  // TODO (prashant) send conf instead of properties
  driverEndpoint = rpcEnv.setupEndpoint(
    CoarseGrainedSchedulerBackend.ENDPOINT_NAME, new DriverEndpoint(rpcEnv, properties))
}
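Note the filter: only configuration keys that start with "spark." are collected here, so that they can later be handed to executors. A minimal stand-alone illustration of that filtering step (not Spark's code):

import scala.collection.mutable.ArrayBuffer

object SparkPropsFilterDemo {
  def main(args: Array[String]): Unit = {
    val allConf = Seq(
      "spark.executor.memory" -> "2g",
      "spark.app.name"        -> "demo",
      "java.io.tmpdir"        -> "/tmp"   // not a spark.* key, so it is dropped
    )

    val properties = new ArrayBuffer[(String, String)]
    for ((key, value) <- allConf) {
      if (key.startsWith("spark.")) {
        properties += ((key, value))
      }
    }
    println(properties.mkString(", "))    // only the two spark.* entries survive
  }
}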
That parent start method mainly creates driverEndpoint, a reference through which messages can be sent to the driver. Back in SparkDeploySchedulerBackend.start, the key lines are:
client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
client.start()
Go straight to AppClient's start method:
def start() {
  // Just launch an rpcEndpoint; it will call back into the listener.
  endpoint = rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv))
}
It simply registers an endpoint. Let's look at that endpoint, ClientEndpoint:
private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint
  with Logging {

  private var master: Option[RpcEndpointRef] = None
  // To avoid calling listener.disconnected() multiple times
  private var alreadyDisconnected = false
  @volatile private var alreadyDead = false // To avoid calling listener.dead() multiple times
  @volatile private var registerMasterFutures: Array[JFuture[_]] = null
  @volatile private var registrationRetryTimer: JScheduledFuture[_] = null

  // A thread pool for registering with masters. Because registering with a master is a blocking
  // action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same
  // time so that we can register with all masters.
  private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool(
    "appclient-register-master-threadpool",
    masterRpcAddresses.length // Make sure we can register with all masters at the same time
  )

  // A scheduled executor for scheduling the registration actions
  private val registrationRetryThread =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread")

  override def onStart(): Unit = {
    try {
      registerWithMaster(1)
    } catch {
      case e: Exception =>
        logWarning("Failed to connect to master", e)
        markDisconnected()
        stop()
    }
  }

  /**
   * Register with all masters asynchronously and returns an array `Future`s for cancellation.
   */
  private def tryRegisterAllMasters(): Array[JFuture[_]] = {
    for (masterAddress <- masterRpcAddresses) yield {
      registerMasterThreadPool.submit(new Runnable {
        override def run(): Unit = try {
          if (registered) {
            return
          }
          logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
          val masterRef =
            rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
          masterRef.send(RegisterApplication(appDescription, self))
        } catch {
          case ie: InterruptedException => // Cancelled
          case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
        }
      })
    }
  }

  /**
   * Register with all masters asynchronously. It will call `registerWithMaster` every
   * REGISTRATION_TIMEOUT_SECONDS seconds until exceeding REGISTRATION_RETRIES times.
   * Once we connect to a master successfully, all scheduling work and Futures will be cancelled.
   *
   * nthRetry means this is the nth attempt to register with master.
   */
  private def registerWithMaster(nthRetry: Int) {
    registerMasterFutures = tryRegisterAllMasters()
    registrationRetryTimer = registrationRetryThread.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = {
        Utils.tryOrExit {
          if (registered) {
            registerMasterFutures.foreach(_.cancel(true))
            registerMasterThreadPool.shutdownNow()
          } else if (nthRetry >= REGISTRATION_RETRIES) {
            markDead("All masters are unresponsive! Giving up.")
          } else {
            registerMasterFutures.foreach(_.cancel(true))
            registerWithMaster(nthRetry + 1)
          }
        }
      }
    }, REGISTRATION_TIMEOUT_SECONDS, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)
  }

  /**
   * Send a message to the current master. If we have not yet registered successfully with any
   * master, the message will be dropped.
   */
  private def sendToMaster(message: Any): Unit = {
    master match {
      case Some(masterRef) => masterRef.send(message)
      case None => logWarning(s"Drop $message because has not yet connected to master")
    }
  }

  private def isPossibleMaster(remoteAddress: RpcAddress): Boolean = {
    masterRpcAddresses.contains(remoteAddress)
  }

  override def receive: PartialFunction[Any, Unit] = {
    case RegisteredApplication(appId_, masterRef) =>
      // FIXME How to handle the following cases?
      // 1. A master receives multiple registrations and sends back multiple
      // RegisteredApplications due to an unstable network.
      // 2. Receive multiple RegisteredApplication from different masters because the master is
      // changing.
      appId = appId_
      registered = true
      master = Some(masterRef)
      listener.connected(appId)

    case ApplicationRemoved(message) =>
      markDead("Master removed our application: %s".format(message))
      stop()

    case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
      val fullId = appId + "/" + id
      logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort,
        cores))
      // FIXME if changing master and `ExecutorAdded` happen at the same time (the order is not
      // guaranteed), `ExecutorStateChanged` may be sent to a dead master.
      sendToMaster(ExecutorStateChanged(appId, id, ExecutorState.RUNNING, None, None))
      listener.executorAdded(fullId, workerId, hostPort, cores, memory)

    case ExecutorUpdated(id, state, message, exitStatus) =>
      val fullId = appId + "/" + id
      val messageText = message.map(s => " (" + s + ")").getOrElse("")
      logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
      if (ExecutorState.isFinished(state)) {
        listener.executorRemoved(fullId, message.getOrElse(""), exitStatus)
      }

    case MasterChanged(masterRef, masterWebUiUrl) =>
      logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
      master = Some(masterRef)
      alreadyDisconnected = false
      masterRef.send(MasterChangeAcknowledged(appId))
  }

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case StopAppClient =>
      markDead("Application has been stopped.")
      sendToMaster(UnregisterApplication(appId))
      context.reply(true)
      stop()

    case r: RequestExecutors =>
      master match {
        case Some(m) => context.reply(m.askWithRetry[Boolean](r))
        case None =>
          logWarning("Attempted to request executors before registering with Master.")
          context.reply(false)
      }

    case k: KillExecutors =>
      master match {
        case Some(m) => context.reply(m.askWithRetry[Boolean](k))
        case None =>
          logWarning("Attempted to kill executors before registering with Master.")
          context.reply(false)
      }
  }

  override def onDisconnected(address: RpcAddress): Unit = {
    if (master.exists(_.address == address)) {
      logWarning(s"Connection to $address failed; waiting for master to reconnect...")
      markDisconnected()
    }
  }

  override def onNetworkError(cause: Throwable, address: RpcAddress): Unit = {
    if (isPossibleMaster(address)) {
      logWarning(s"Could not connect to $address: $cause")
    }
  }

  /**
   * Notify the listener that we disconnected, if we hadn't already done so before.
   */
  def markDisconnected() {
    if (!alreadyDisconnected) {
      listener.disconnected()
      alreadyDisconnected = true
    }
  }

  def markDead(reason: String) {
    if (!alreadyDead) {
      listener.dead(reason)
      alreadyDead = true
    }
  }

  override def onStop(): Unit = {
    if (registrationRetryTimer != null) {
      registrationRetryTimer.cancel(true)
    }
    registrationRetryThread.shutdownNow()
    registerMasterFutures.foreach(_.cancel(true))
    registerMasterThreadPool.shutdownNow()
  }
}
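One detail worth calling out in ClientEndpoint is that the current master is kept as an Option, and sendToMaster silently drops messages until registration has succeeded. A tiny sketch of that guard with toy types:

object SendToMasterDemo {
  final case class MasterRef(name: String) {
    def send(msg: Any): Unit = println(s"$name <- $msg")
  }

  var master: Option[MasterRef] = None

  def sendToMaster(message: Any): Unit = master match {
    case Some(ref) => ref.send(message)
    case None      => println(s"Drop $message because not yet connected to a master")
  }

  def main(args: Array[String]): Unit = {
    sendToMaster("ExecutorStateChanged(...)")   // dropped: not registered yet
    master = Some(MasterRef("master-1"))        // set when RegisteredApplication arrives
    sendToMaster("ExecutorStateChanged(...)")   // delivered
  }
}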
Because ClientEndpoint extends the ThreadSafeRpcEndpoint trait, its callbacks are invoked in order (onStart, then receive, and so on), much like the Akka lifecycle covered in the previous post:
private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint

/**
 * An end point for the RPC that defines what functions to trigger given a message.
 *
 * It is guaranteed that `onStart`, `receive` and `onStop` will be called in sequence.
 *
 * The life-cycle of an endpoint is:
 *
 * constructor -> onStart -> receive* -> onStop
 *
 * Note: `receive` can be called concurrently. If you want `receive` to be thread-safe, please use
 * [[ThreadSafeRpcEndpoint]]
 *
 * If any error is thrown from one of [[RpcEndpoint]] methods except `onError`, `onError` will be
 * invoked with the cause. If `onError` throws an error, [[RpcEnv]] will ignore it.
 */
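To make the lifecycle concrete, here is a toy endpoint written against a stand-in trait with the same callbacks; this is illustrative code only, not Spark's RpcEndpoint API:

// Stand-in for the RpcEndpoint lifecycle: constructor -> onStart -> receive* -> onStop.
trait ToyEndpoint {
  def onStart(): Unit = {}
  def receive: PartialFunction[Any, Unit]
  def onStop(): Unit = {}
}

class ToyClientEndpoint extends ToyEndpoint {
  override def onStart(): Unit = println("onStart: kick off registration with the master")

  override def receive: PartialFunction[Any, Unit] = {
    case ("RegisteredApplication", appId: String) => println(s"registered, appId = $appId")
    case other                                    => println(s"unexpected message: $other")
  }

  override def onStop(): Unit = println("onStop: cancel retry timers and thread pools")
}

object LifecycleDemo {
  def main(args: Array[String]): Unit = {
    val ep = new ToyClientEndpoint                               // constructor
    ep.onStart()                                                 // then onStart
    ep.receive(("RegisteredApplication", "app-20170101-0001"))   // then receive*
    ep.onStop()                                                  // finally onStop
  }
}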
Following that lifecycle, the first callback invoked is onStart:
override def onStart(): Unit = {
  try {
    registerWithMaster(1)
  } catch {
    case e: Exception =>
      logWarning("Failed to connect to master", e)
      markDisconnected()
      stop()
  }
}
Go straight into registerWithMaster, which registers with the master the application description we assembled earlier; it delegates to tryRegisterAllMasters:
/**
 * Register with all masters asynchronously and returns an array `Future`s for cancellation.
 */
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  for (masterAddress <- masterRpcAddresses) yield {
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = try {
        if (registered) {
          return
        }
        logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
        val masterRef =
          rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
        masterRef.send(RegisterApplication(appDescription, self))
      } catch {
        case ie: InterruptedException => // Cancelled
        case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
      }
    })
  }
}
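The pattern here is a fan-out: one blocking registration attempt per master is submitted to a thread pool, a flag flips once any attempt succeeds, and the remaining attempts get cancelled. A self-contained sketch of that idea using plain java.util.concurrent; the master addresses are hypothetical and no real RPC is performed:

import java.util.concurrent.{Executors, Future => JFuture}
import java.util.concurrent.atomic.AtomicBoolean

object RegisterFanOutDemo {
  def main(args: Array[String]): Unit = {
    val masters = Seq("master-1:7077", "master-2:7077")   // placeholder addresses
    val registered = new AtomicBoolean(false)
    // One thread per master, like the appclient-register-master-threadpool.
    val pool = Executors.newFixedThreadPool(masters.size)

    val futures: Seq[JFuture[_]] = masters.map { address =>
      pool.submit(new Runnable {
        override def run(): Unit = {
          if (registered.get()) return        // another attempt already won the race
          println(s"Connecting to master $address ...")
          // A real client would send RegisterApplication here and wait for the reply.
          registered.set(true)
        }
      })
    }

    Thread.sleep(200)                          // stand-in for the scheduled retry check
    if (registered.get()) {
      futures.foreach(_.cancel(true))          // cancel any attempts still in flight
    }
    pool.shutdownNow()                         // shut the pool down, as onStop() does
  }
}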
Each registration attempt obtains the master's endpoint reference via setupEndpointRef and then calls send on it; underneath, both are built on Akka. To see this, open the AkkaRpcEnv class and look at its setupEndpoint method below, which shows how an endpoint is backed by an Akka actor and exposed as an AkkaRpcEndpointRef:
override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
  @volatile var endpointRef: AkkaRpcEndpointRef = null
  // Use lazy because the Actor needs to use `endpointRef`.
  // So `actorRef` should be created after assigning `endpointRef`.
  lazy val actorRef = actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging {

    assert(endpointRef != null)

    override def preStart(): Unit = {
      // Listen for remote client network events
      context.system.eventStream.subscribe(self, classOf[AssociationEvent])
      safelyCall(endpoint) {
        endpoint.onStart()
      }
    }

    override def receiveWithLogging: Receive = {
      case AssociatedEvent(_, remoteAddress, _) =>
        safelyCall(endpoint) {
          endpoint.onConnected(akkaAddressToRpcAddress(remoteAddress))
        }

      case DisassociatedEvent(_, remoteAddress, _) =>
        safelyCall(endpoint) {
          endpoint.onDisconnected(akkaAddressToRpcAddress(remoteAddress))
        }

      case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound, _) =>
        safelyCall(endpoint) {
          endpoint.onNetworkError(cause, akkaAddressToRpcAddress(remoteAddress))
        }

      case e: AssociationEvent =>
        // TODO ignore?

      case m: AkkaMessage =>
        logDebug(s"Received RPC message: $m")
        safelyCall(endpoint) {
          processMessage(endpoint, m, sender)
        }

      case AkkaFailure(e) =>
        safelyCall(endpoint) {
          throw e
        }

      case message: Any => {
        logWarning(s"Unknown message: $message")
      }
    }

    override def postStop(): Unit = {
      unregisterEndpoint(endpoint.self)
      safelyCall(endpoint) {
        endpoint.onStop()
      }
    }

  }), name = name)
  endpointRef = new AkkaRpcEndpointRef(defaultAddress, actorRef, conf, initInConstructor = false)
  registerEndpoint(endpoint, endpointRef)
  // Now actorRef can be created safely
  endpointRef.init()
  endpointRef
}
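A small detail worth noticing in setupEndpoint is the lazy val: the actor's body needs endpointRef, so the ref must be assigned before the actor is actually created, and the lazy val plus the explicit endpointRef.init() call enforce that ordering. A stripped-down sketch of the same trick with toy names and no Akka involved:

object LazyOrderingDemo {
  class Worker(getRef: () => Ref) {                    // like the Actor, it needs the ref when it runs
    def handle(msg: String): Unit = println(s"${getRef().name} got: $msg")
  }

  class Ref(val name: String) {
    private var worker: Worker = null
    def init(w: Worker): Unit = { worker = w }         // like endpointRef.init(): wire up once ready
    def send(msg: String): Unit = worker.handle(msg)
  }

  def main(args: Array[String]): Unit = {
    var ref: Ref = null
    // lazy: the Worker must not be constructed before `ref` is assigned,
    // mirroring `lazy val actorRef` in setupEndpoint.
    lazy val worker = new Worker(() => ref)
    ref = new Ref("demo-ref")
    ref.init(worker)                                   // forcing the lazy val here is now safe
    ref.send("hello")
  }
}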
Note that setupEndpoint ultimately returns an AkkaRpcEndpointRef, and that class overrides the send method:
override def send(message: Any): Unit = {
  actorRef ! AkkaMessage(message, false)
}
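The ! here is Akka's fire-and-forget tell operator. For comparison, a minimal classic-actor example of the same send pattern; it assumes an Akka 2.4+ classic dependency on the classpath, and the actor and message names are made up:

import akka.actor.{Actor, ActorSystem, Props}

// Minimal illustration of fire-and-forget delivery, the same mechanism AkkaRpcEndpointRef.send
// relies on: the caller does not wait for a reply.
class EchoActor extends Actor {
  def receive: Receive = {
    case msg => println(s"received: $msg")
  }
}

object TellDemo {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("tell-demo")
    val echo = system.actorOf(Props[EchoActor], name = "echo")

    echo ! "RegisterApplication(...)"   // fire-and-forget, like masterRef.send(...)

    Thread.sleep(500)                   // give the actor time to print before shutting down
    system.terminate()
  }
}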
So send bottoms out in plain Akka message passing. Now let's go back to where we started, in SparkContext:
// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()
Only when _taskScheduler.start() is called here does the backend actually start, and only then does the driver go and register the application with the master.
That wraps up how the driver registers an application with the master. The next post will pick up from here: after registration, how the master schedules resources and allocates executors.