- 浏览: 74313 次
前面一片文章介绍了SocketTextStream 是如何从blockmanager里面获取block组成blockRDD的, 地址:
http://humingminghz.iteye.com/admin/blogs/2310003
那么接下来关注一下block是怎么存到blockmanager里面的。
还是从receiverTracker.start() 入手, 会执行ReceiverTracker的start方法:
首先注册了一个endpoint:ReceiverTrackerEndpoint, 里面的实体类是ReceiverTrackerEndpoint
然后执行launchReceivers
首先从receiverInputStreams里面对每个inputstream遍历, 这个inputstream其实就是我们上一篇文章里面说的SocketInputDStream, 执行getReceiver 其实是执行SocketInputDStream类里面的new SocketReceiver方法, 这个方法里面有一个receive方法, 会建立socket连接不停获取数据, receive方法会在onStart里面被调用。
接下来就出发endpoint (ReceiverTrackerEndpoint类)里面的StartAllReceivers(receivers):
这里面主要是拿了有preferredLocation (executor)的receiver, 然后调用 startReceiver 方法去启动receiver:
可以看到里面有一句:
val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
他把receivers和startReceiverFunc作为一个Job submit到spark里面去执行, 而且在onComplete里面重复调用。
在startReceiverFunc里面注册生成了一个ReceiverSupervisorImpl, 然后执行他的start()方法, 在start里面做了两个事:
1.执行ReceiverSupervisorImpl的onstart
2.执行ReceiverSupervisor的startReceiver
在onstart里面做了:
启动了BlockGenerator, 从名字上面来看, sparkstreaming就是通过这个类来生成block的, 看看他的start方法:
里面也做了两个事:
1.blockIntervalTimer.start()
2.blockPushingThread.start()
对1, blockIntervalTimer 是这样定义的:
这个类我们在之前的文章看到过, 会重复的根据blockIntervalMs时间不停重复执行updateCurrentBuffer
updateCurrentBuffer:
简单的来说这个方法就是把已经收集到的数据生成一个block 然后放到blocksForPushing里面, 在初始化currentBuffer。 好了到这里已经知道block是怎么生成的了。
但是从前面的文章我们知道实际数据是从blockManager里面获取的, 那么block是怎么存到blockmanager里面的呢, 我们要看blockPushingThread.start方法了:
其实只是一个thread, 所以在start方法里面会跑 run, 这个run里面执行了keepPushingBlocks:
可以看到里面是只要有block生成, 就从blocksForPushing里面把block拿出来做一个pushBlock(block)的动作,
pushBlock(block)最终是调用listener的onPushBlock:
看一下listener的定义:
看到onPushBlock是做了 pushArrayBuffer(arrayBuffer, None, Some(blockId))
然后再看 pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)
里面调用了receivedBlockHandler.storeBlock(blockId, receivedBlock)
我们看一下receivedBlockHandler的定义:
结果就是要么返回一个WriteAheadLogBasedBlockHandler,要么BlockManagerBasedBlockHandler, 我们拿一个作为列子, 看一下BlockManagerBasedBlockHandler:
可以看到里面有一个storeBlock方法, 就是前面调用的:
这里面就很简单了, 就是把blockbuffer作为Iterator放到blockManager 供SockettextStream去拿
整个过程基本上就是这样
http://humingminghz.iteye.com/admin/blogs/2310003
那么接下来关注一下block是怎么存到blockmanager里面的。
还是从receiverTracker.start() 入手, 会执行ReceiverTracker的start方法:
def start(): Unit = synchronized { if (isTrackerStarted) { throw new SparkException("ReceiverTracker already started") } if (!receiverInputStreams.isEmpty) { endpoint = ssc.env.rpcEnv.setupEndpoint( "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv)) if (!skipReceiverLaunch) launchReceivers() logInfo("ReceiverTracker started") trackerState = Started } }
首先注册了一个endpoint:ReceiverTrackerEndpoint, 里面的实体类是ReceiverTrackerEndpoint
然后执行launchReceivers
private def launchReceivers(): Unit = { val receivers = receiverInputStreams.map(nis => { val rcvr = nis.getReceiver() rcvr.setReceiverId(nis.id) rcvr }) runDummySparkJob() logInfo("Starting " + receivers.length + " receivers") endpoint.send(StartAllReceivers(receivers)) }
首先从receiverInputStreams里面对每个inputstream遍历, 这个inputstream其实就是我们上一篇文章里面说的SocketInputDStream, 执行getReceiver 其实是执行SocketInputDStream类里面的new SocketReceiver方法, 这个方法里面有一个receive方法, 会建立socket连接不停获取数据, receive方法会在onStart里面被调用。
接下来就出发endpoint (ReceiverTrackerEndpoint类)里面的StartAllReceivers(receivers):
case StartAllReceivers(receivers) => val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors) for (receiver <- receivers) { val executors = scheduledLocations(receiver.streamId) updateReceiverScheduledExecutors(receiver.streamId, executors) receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation startReceiver(receiver, executors) }
这里面主要是拿了有preferredLocation (executor)的receiver, 然后调用 startReceiver 方法去启动receiver:
private def startReceiver( receiver: Receiver[_], scheduledLocations: Seq[TaskLocation]): Unit = { def shouldStartReceiver: Boolean = { // It's okay to start when trackerState is Initialized or Started !(isTrackerStopping || isTrackerStopped) } val receiverId = receiver.streamId if (!shouldStartReceiver) { onReceiverJobFinish(receiverId) return } val checkpointDirOption = Option(ssc.checkpointDir) val serializableHadoopConf = new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration) // Function to start the receiver on the worker node val startReceiverFunc: Iterator[Receiver[_]] => Unit = (iterator: Iterator[Receiver[_]]) => { if (!iterator.hasNext) { throw new SparkException( "Could not start receiver as object not found.") } if (TaskContext.get().attemptNumber() == 0) { val receiver = iterator.next() assert(iterator.hasNext == false) val supervisor = new ReceiverSupervisorImpl( receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption) supervisor.start() supervisor.awaitTermination() } else { // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it. } } // Create the RDD using the scheduledLocations to run the receiver in a Spark job val receiverRDD: RDD[Receiver[_]] = if (scheduledLocations.isEmpty) { ssc.sc.makeRDD(Seq(receiver), 1) } else { val preferredLocations = scheduledLocations.map(_.toString).distinct ssc.sc.makeRDD(Seq(receiver -> preferredLocations)) } receiverRDD.setName(s"Receiver $receiverId") ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId") ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite())) val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit]( receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ()) // We will keep restarting the receiver job until ReceiverTracker is stopped future.onComplete { case Success(_) => if (!shouldStartReceiver) { onReceiverJobFinish(receiverId) } else { logInfo(s"Restarting Receiver $receiverId") self.send(RestartReceiver(receiver)) } case Failure(e) => if (!shouldStartReceiver) { onReceiverJobFinish(receiverId) } else { logError("Receiver has been stopped. Try to restart it.", e) logInfo(s"Restarting Receiver $receiverId") self.send(RestartReceiver(receiver)) } }(submitJobThreadPool) logInfo(s"Receiver ${receiver.streamId} started") }
可以看到里面有一句:
val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
他把receivers和startReceiverFunc作为一个Job submit到spark里面去执行, 而且在onComplete里面重复调用。
在startReceiverFunc里面注册生成了一个ReceiverSupervisorImpl, 然后执行他的start()方法, 在start里面做了两个事:
def start() { onStart() startReceiver() }
1.执行ReceiverSupervisorImpl的onstart
2.执行ReceiverSupervisor的startReceiver
在onstart里面做了:
override protected def onStart() { registeredBlockGenerators.foreach { _.start() } }
启动了BlockGenerator, 从名字上面来看, sparkstreaming就是通过这个类来生成block的, 看看他的start方法:
def start(): Unit = synchronized { if (state == Initialized) { state = Active blockIntervalTimer.start() blockPushingThread.start() logInfo("Started BlockGenerator") } else { throw new SparkException( s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]") } }
里面也做了两个事:
1.blockIntervalTimer.start()
2.blockPushingThread.start()
对1, blockIntervalTimer 是这样定义的:
private val blockIntervalTimer = new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
这个类我们在之前的文章看到过, 会重复的根据blockIntervalMs时间不停重复执行updateCurrentBuffer
updateCurrentBuffer:
private def updateCurrentBuffer(time: Long): Unit = { try { var newBlock: Block = null synchronized { if (currentBuffer.nonEmpty) { val newBlockBuffer = currentBuffer currentBuffer = new ArrayBuffer[Any] val blockId = StreamBlockId(receiverId, time - blockIntervalMs) listener.onGenerateBlock(blockId) newBlock = new Block(blockId, newBlockBuffer) } } if (newBlock != null) { blocksForPushing.put(newBlock) // put is blocking when queue is full } } catch { case ie: InterruptedException => logInfo("Block updating timer thread was interrupted") case e: Exception => reportError("Error in block updating thread", e) } }
简单的来说这个方法就是把已经收集到的数据生成一个block 然后放到blocksForPushing里面, 在初始化currentBuffer。 好了到这里已经知道block是怎么生成的了。
但是从前面的文章我们知道实际数据是从blockManager里面获取的, 那么block是怎么存到blockmanager里面的呢, 我们要看blockPushingThread.start方法了:
private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
其实只是一个thread, 所以在start方法里面会跑 run, 这个run里面执行了keepPushingBlocks:
private def keepPushingBlocks() { logInfo("Started block pushing thread") def areBlocksBeingGenerated: Boolean = synchronized { state != StoppedGeneratingBlocks } try { // While blocks are being generated, keep polling for to-be-pushed blocks and push them. while (areBlocksBeingGenerated) { Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match { case Some(block) => pushBlock(block) case None => } } // At this point, state is StoppedGeneratingBlock. So drain the queue of to-be-pushed blocks. logInfo("Pushing out the last " + blocksForPushing.size() + " blocks") while (!blocksForPushing.isEmpty) { val block = blocksForPushing.take() logDebug(s"Pushing block $block") pushBlock(block) logInfo("Blocks left to push " + blocksForPushing.size()) } logInfo("Stopped block pushing thread") } catch { case ie: InterruptedException => logInfo("Block pushing thread was interrupted") case e: Exception => reportError("Error in block pushing thread", e) } }
可以看到里面是只要有block生成, 就从blocksForPushing里面把block拿出来做一个pushBlock(block)的动作,
pushBlock(block)最终是调用listener的onPushBlock:
private def pushBlock(block: Block) { listener.onPushBlock(block.id, block.buffer) logInfo("Pushed block " + block.id) }
看一下listener的定义:
private val defaultBlockGeneratorListener = new BlockGeneratorListener { def onAddData(data: Any, metadata: Any): Unit = { } def onGenerateBlock(blockId: StreamBlockId): Unit = { } def onError(message: String, throwable: Throwable) { reportError(message, throwable) } def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) { pushArrayBuffer(arrayBuffer, None, Some(blockId)) } }
看到onPushBlock是做了 pushArrayBuffer(arrayBuffer, None, Some(blockId))
def pushArrayBuffer( arrayBuffer: ArrayBuffer[_], metadataOption: Option[Any], blockIdOption: Option[StreamBlockId] ) { pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption) }
然后再看 pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)
def pushAndReportBlock( receivedBlock: ReceivedBlock, metadataOption: Option[Any], blockIdOption: Option[StreamBlockId] ) { val blockId = blockIdOption.getOrElse(nextBlockId) val time = System.currentTimeMillis val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock) logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms") val numRecords = blockStoreResult.numRecords val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult) trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo)) logDebug(s"Reported block $blockId") }
里面调用了receivedBlockHandler.storeBlock(blockId, receivedBlock)
我们看一下receivedBlockHandler的定义:
private val receivedBlockHandler: ReceivedBlockHandler = { if (WriteAheadLogUtils.enableReceiverLog(env.conf)) { if (checkpointDirOption.isEmpty) { throw new SparkException( "Cannot enable receiver write-ahead log without checkpoint directory set. " + "Please use streamingContext.checkpoint() to set the checkpoint directory. " + "See documentation for more details.") } new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId, receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get) } else { new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel) } }
结果就是要么返回一个WriteAheadLogBasedBlockHandler,要么BlockManagerBasedBlockHandler, 我们拿一个作为列子, 看一下BlockManagerBasedBlockHandler:
可以看到里面有一个storeBlock方法, 就是前面调用的:
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = { var numRecords = None: Option[Long] val putResult: Seq[(BlockId, BlockStatus)] = block match { case ArrayBufferBlock(arrayBuffer) => numRecords = Some(arrayBuffer.size.toLong) blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, tellMaster = true) case IteratorBlock(iterator) => val countIterator = new CountingIterator(iterator) val putResult = blockManager.putIterator(blockId, countIterator, storageLevel, tellMaster = true) numRecords = countIterator.count putResult case ByteBufferBlock(byteBuffer) => blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true) case o => throw new SparkException( s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}") } if (!putResult.map { _._1 }.contains(blockId)) { throw new SparkException( s"Could not store $blockId to block manager with storage level $storageLevel") } BlockManagerBasedStoreResult(blockId, numRecords) }
这里面就很简单了, 就是把blockbuffer作为Iterator放到blockManager 供SockettextStream去拿
整个过程基本上就是这样
发表评论
-
kafka + flume + hdfs + zookeeper + spark 测试环境搭建
2017-07-20 11:28 1115最近由于项目需要, 搭建了一个类似线上环境的处理流数据的环境 ... -
源码跟踪executor如何写数据到blockmanager, 以及如何从blockmanager读数据
2016-08-10 19:41 1438之前看了Job怎么submit 以 ... -
Spark中Blockmanager相关代码解析
2016-08-04 19:47 1852前一段时间看了如何划分stage以及如何提交Job, 最后把结 ... -
Spark在submitStage后如何通过clustermanager调度执行task到Driver接收计算结果的代码解析
2016-08-01 14:08 1486前文: http://humingminghz.iteye.c ... -
Spark中saveAsTextFile至stage划分和job提交的源代码分析
2016-07-29 14:20 3381之前看了Spark Streaming和Spark SQL, ... -
SparkSQL DF.agg 执行过程解析
2016-07-19 10:21 4133在上一篇文章前, 我一直没看懂为什么下面的代码就能得到max或 ... -
SparkSQL SQL语句解析过程源代码浅析
2016-07-15 19:34 6671前两天一直在忙本职工 ... -
SparkSQL SQL语句解析过程浅析
2016-07-15 19:06 0前两天一直在忙本职工 ... -
Sparkstreaming是如何获取数据组成Dstream的源码浅析
2016-07-08 11:23 1488前面一篇文章介绍了SparkStreaming是如何不停的循环 ... -
SparkSQL 使用SQLContext读取csv文件 分析数据 (含部分数据)
2016-07-06 11:24 10173前两天开始研究SparkSQL, 其主要分为HiveConte ... -
SparkStreaming是如何完成不停的循环处理的代码浅析
2016-07-02 12:26 4668一直很好奇Sparkstreaming的ssc.start是怎 ... -
SparkStreaming 对Window的reduce的方法解析
2016-06-30 11:57 4739在sparkstreaming中对窗口 ... -
Sparkstreaming reduceByKeyAndWindow(_+_, _-_, Duration, Duration) 的源码/原理解析
2016-06-29 19:50 8807最近在玩spark streaming, 感觉到了他的强大。 ... -
关于Eclipse开发环境下 Spark+Kafka 获取topic的时候连接出错
2016-06-28 17:20 7417林林总总玩了Spark快一个月了, 打算试一下kafka的消息 ...
相关推荐
Receiver在Spark Streaming中是一个重要的组件,它负责从数据源接收数据。例如,使用Kafka作为输入源时,Receiver将从Kafka主题中读取数据并将其存储在Executor内存中。 在容错方面,Spark Streaming支持两种容错...
标题中的“Flume push数据到SparkStreaming”是指在大数据处理领域中,使用Apache Flume将实时数据流推送到Apache Spark Streaming进行进一步的实时分析和处理的过程。这两个组件都是Apache Hadoop生态系统的重要...
Spark Streaming是中国大数据技术领域中广泛使用的实时数据处理框架,它基于Apache Spark的核心设计,提供了对持续数据流的微批处理能力。本项目实战旨在帮助读者深入理解和应用Spark Streaming,通过实际操作来掌握...
SparkStreaming之Dstream入门 Spark Streaming是Apache Spark中的一个组件,用于处理流式数据。它可以从多种数据源中接收数据,如Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等,并使用Spark的高度抽象原语如...
接收的数据可以通过Socket或其他协议读取,例如,监听特定端口上的数据流,并将这些数据转化为RDD,再注入到SparkStreaming的计算流程中。 WordCount的实现过程中,我们首先通过`map`操作将输入的每行文本转换为...
Receiver-based Approach 是 Spark Streaming 早期的数据接收方式,该方式下 Spark Streaming 会启动一个 ReceiverSupervisor,负责启动 KafkaReceiver 并接收数据。数据接收流程如下: 1. ReceiverSupervisor 启动...
* Spark Streaming:Spark Streaming是一个基于Spark Core的流处理系统,主要用于处理大规模的数据流。Spark Streaming的架构主要包括Driver、Executor、Receiver三个组件。Spark Streaming具有良好的可扩展性和高...
DStream(Discretized Stream)是SparkStreaming中的核心抽象,表示连续的数据流,由一系列的RDD组成,每个RDD对应一个批次的数据。对DStream的操作会转化为对底层RDD的操作,从而实现数据流的转换和计算。 总的来...
Flume的Source可以配置为读取日志文件或监听数据库binlog,然后将数据发送到Spark Streaming的接收器(Receiver)。Spark Streaming作为Flume的Sink,接收数据并进行实时处理。这种集成简化了数据流的处理流程,提高...
在这种模式下,Spark Streaming 会启动一个 Receiver,通常是 KafkaReceiver,该 Receiver 在后台线程中从 Kafka 主题订阅并接收消息。Receiver 将接收到的数据存储到 Spark 内部的数据结构中,如 BlockGenerator。...
Spark Streaming从各种输入源中读取数据,并把数据分组为小的批次。新的批次按均匀的时间间隔创建出来。在每个时间区间开始的时候,一个新的批次就创建出来,在该区间内收到的数据都会被添加到这个批次中。在时间...
Learn the right cutting-edge skills and knowledge to leverage Spark Streaming to implement a wide array of real-time, streaming applications. Pro Spark Streaming walks you through end-to-end real-time...
Spark Streaming是Apache Spark项目的一部分,它提供了一个高级抽象来处理实时数据流。在Spark 2.3.x版本中,这个框架进一步增强了其处理大规模、低延迟流数据的能力。本资料包着重介绍了Spark Streaming的核心概念...
总结来说,将MetaQ中的数据传递到Spark涉及的主要步骤包括:配置MetaQ与Kafka的连接,使用Spark Streaming从Kafka消费数据,使用Spark的DataFrame/Dataset API进行数据处理,以及选择合适的方式输出处理结果。...
这个示例代码是用 Scala 编写的,用于演示如何使用 Spark Streaming 消费来自 Kafka 的数据,并将这些数据存储到 HBase 数据库中。Kafka 是一个分布式流处理平台,而 HBase 是一个基于 Hadoop 的非关系型数据库,...
Spark Streaming 是一个基于 Spark Core 之上的实时计算框架,可以从很多数据源消费数据并对数据进行处理。在 Spark Streaming 中,有一个最基本的抽象叫 DStream(代理),本质上就是一系列连续的 RDD,DStream ...
当Spark on Yarn启动后,Receiver组件负责接收实时数据,将其转化为数据块,并通过SparkAppMaster调度到Executor上执行。每个数据块都会触发一个Spark Job,Job进一步被分解为Task,由Scheduler分配到空闲Executor...
Spark Streaming的Direct Approach提供了一种从Kafka中直接消费数据的机制。在这种模式下,Spark Streaming直接与Kafka的底层API交互,而不是使用高级API(如Receiver)。这样可以减少数据在接收过程中的延迟,并...
摘要:眼下大数据领域最热门的词汇之一便是流计算了,而其中最耀眼的无疑是来自...如图一所示,Driver持久化的元数据包括:Block元数据(图1中的绿色箭头):Receiver从网络上接收到的数据,组装成Block后产生的Block元
`spark-streaming-kafka-0-10_2.12-3.0.0.jar`文件即为这一功能的实现,它包含了一系列API和类,使得Spark能够从Kafka消费数据,并将结果写回Kafka或者其他的持久化存储。 2. **Kafka Clients** `kafka-clients-...