
A Walkthrough of the Spark Streaming Code Path from Starting Receivers to Receiving Data and Generating RDDs

The previous article covered how SocketTextStream fetches blocks from the BlockManager and assembles them into a BlockRDD:
http://humingminghz.iteye.com/admin/blogs/2310003

Next, let's look at how those blocks get stored into the BlockManager in the first place.

Again we start from receiverTracker.start(), which invokes ReceiverTracker's start method:
def start(): Unit = synchronized {
    if (isTrackerStarted) {
      throw new SparkException("ReceiverTracker already started")
    }

    if (!receiverInputStreams.isEmpty) {
      endpoint = ssc.env.rpcEnv.setupEndpoint(
        "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
      if (!skipReceiverLaunch) launchReceivers()
      logInfo("ReceiverTracker started")
      trackerState = Started
    }
  }



First it registers an RPC endpoint named "ReceiverTracker", backed by a ReceiverTrackerEndpoint instance.

Then it calls launchReceivers:


private def launchReceivers(): Unit = {
    val receivers = receiverInputStreams.map(nis => {
      val rcvr = nis.getReceiver()
      rcvr.setReceiverId(nis.id)
      rcvr
    })

    runDummySparkJob()

    logInfo("Starting " + receivers.length + " receivers")
    endpoint.send(StartAllReceivers(receivers))
  }


launchReceivers iterates over receiverInputStreams. Each of these input streams is the SocketInputDStream from the previous article, and calling getReceiver() on it creates a new SocketReceiver. SocketReceiver has a receive method that opens a socket connection and keeps reading data; receive is invoked from the receiver's onStart. A simplified, hedged sketch of such a receiver follows.
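As a rough, hedged sketch (not the exact SocketReceiver source), a socket-based receiver looks roughly like this: onStart spins up a daemon thread that runs receive(), which connects to the socket and calls store() for every line it reads:

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// A hedged, simplified sketch of a socket-based receiver (not the exact
// SocketReceiver source): read lines from a socket and hand each one to store().
class SimpleSocketReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  override def onStart(): Unit = {
    // run receive() off the main thread so onStart returns quickly
    new Thread("Simple Socket Receiver") {
      setDaemon(true)
      override def run(): Unit = receive()
    }.start()
  }

  override def onStop(): Unit = { /* the receive loop checks isStopped() */ }

  private def receive(): Unit = {
    var socket: Socket = null
    try {
      socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped() && line != null) {
        store(line)               // hands the record to the ReceiverSupervisor
        line = reader.readLine()
      }
      restart(s"Retrying connecting to $host:$port")
    } catch {
      case e: java.io.IOException => restart(s"Error connecting to $host:$port", e)
    } finally {
      if (socket != null) socket.close()
    }
  }
}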

This triggers the StartAllReceivers(receivers) case in the endpoint (ReceiverTrackerEndpoint):
 case StartAllReceivers(receivers) =>
        val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
        for (receiver <- receivers) {
          val executors = scheduledLocations(receiver.streamId)
          updateReceiverScheduledExecutors(receiver.streamId, executors)
          receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
          startReceiver(receiver, executors)
        }



Here the scheduling policy assigns candidate executors to each receiver (recording its preferredLocation, if any), and then startReceiver is called for each one; a short sketch of what preferredLocation means at the Receiver API level comes next, followed by startReceiver itself.
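As a hedged illustration (not part of the ReceiverTracker code above), a custom receiver can pin itself to an executor host by overriding preferredLocation; the host name below is a made-up placeholder:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// A hedged sketch: overriding preferredLocation is what ends up in
// receiverPreferredLocations above ("worker-node-1" is a placeholder).
class PinnedReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  override def preferredLocation: Option[String] = Some("worker-node-1")
  override def onStart(): Unit = { /* start a thread that calls store(...) */ }
  override def onStop(): Unit = { }
}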


    private def startReceiver(
        receiver: Receiver[_],
        scheduledLocations: Seq[TaskLocation]): Unit = {
      def shouldStartReceiver: Boolean = {
        // It's okay to start when trackerState is Initialized or Started
        !(isTrackerStopping || isTrackerStopped)
      }

      val receiverId = receiver.streamId
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
        return
      }

      val checkpointDirOption = Option(ssc.checkpointDir)
      val serializableHadoopConf =
        new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

      // Function to start the receiver on the worker node
      val startReceiverFunc: Iterator[Receiver[_]] => Unit =
        (iterator: Iterator[Receiver[_]]) => {
          if (!iterator.hasNext) {
            throw new SparkException(
              "Could not start receiver as object not found.")
          }
          if (TaskContext.get().attemptNumber() == 0) {
            val receiver = iterator.next()
            assert(iterator.hasNext == false)
            val supervisor = new ReceiverSupervisorImpl(
              receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
            supervisor.start()
            supervisor.awaitTermination()
          } else {
            // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
          }
        }

      // Create the RDD using the scheduledLocations to run the receiver in a Spark job
      val receiverRDD: RDD[Receiver[_]] =
        if (scheduledLocations.isEmpty) {
          ssc.sc.makeRDD(Seq(receiver), 1)
        } else {
          val preferredLocations = scheduledLocations.map(_.toString).distinct
          ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
        }
      receiverRDD.setName(s"Receiver $receiverId")
      ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
      ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

      val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
        receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
      // We will keep restarting the receiver job until ReceiverTracker is stopped
      future.onComplete {
        case Success(_) =>
          if (!shouldStartReceiver) {
            onReceiverJobFinish(receiverId)
          } else {
            logInfo(s"Restarting Receiver $receiverId")
            self.send(RestartReceiver(receiver))
          }
        case Failure(e) =>
          if (!shouldStartReceiver) {
            onReceiverJobFinish(receiverId)
          } else {
            logError("Receiver has been stopped. Try to restart it.", e)
            logInfo(s"Restarting Receiver $receiverId")
            self.send(RestartReceiver(receiver))
          }
      }(submitJobThreadPool)
      logInfo(s"Receiver ${receiver.streamId} started")
    }



Note this line in particular:
val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
        receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())

It submits the receiver RDD together with startReceiverFunc as a Spark job, and the onComplete callback keeps resubmitting the receiver (via RestartReceiver) as long as the tracker has not been stopped.
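To make the mechanics concrete, here is a minimal, hedged sketch of how SparkContext.submitJob behaves in general (a toy job, not the Streaming source): the given function runs on the chosen partitions on executors and the returned future completes when the job does, which mirrors how startReceiverFunc is shipped as a long-running task on one executor:

import org.apache.spark.{SparkConf, SparkContext}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

object SubmitJobSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("submit-job-sketch").setMaster("local[2]"))
    val rdd = sc.makeRDD(Seq("a", "b", "c"), numSlices = 2)

    // Run a function on partition 0 only, just like the receiver job's Seq(0)
    val future = sc.submitJob[String, Int, Unit](
      rdd,
      (it: Iterator[String]) => it.size,    // work done on the executor
      Seq(0),                               // only partition 0
      (partitionId, size) => println(s"partition $partitionId had $size records"),
      ()                                    // overall result, unused here
    )

    future.onComplete {
      case Success(_) => println("job finished; this is where a restart could be triggered")
      case Failure(e) => println(s"job failed: $e")
    }

    Thread.sleep(2000)  // give the async callback time to fire in this toy example
    sc.stop()
  }
}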

Inside startReceiverFunc a ReceiverSupervisorImpl is created and its start() method is called, which does two things:
def start() {
    onStart()
    startReceiver()
  }



1. Run ReceiverSupervisorImpl's onStart
2. Run ReceiverSupervisor's startReceiver, which in turn calls the receiver's own onStart (for SocketInputDStream that is the SocketReceiver.onStart mentioned earlier)

In onStart the supervisor does:

 override protected def onStart() {
    registeredBlockGenerators.foreach { _.start() }
  }


This starts the BlockGenerator; as the name suggests, this is the class Spark Streaming uses to generate blocks. Here is its start method:
  def start(): Unit = synchronized {
    if (state == Initialized) {
      state = Active
      blockIntervalTimer.start()
      blockPushingThread.start()
      logInfo("Started BlockGenerator")
    } else {
      throw new SparkException(
        s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
    }
  }


It also does two things:
1. blockIntervalTimer.start()
2. blockPushingThread.start()

For (1), blockIntervalTimer is defined as:
private val blockIntervalTimer =
    new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")



We saw RecurringTimer in an earlier article: it keeps invoking updateCurrentBuffer every blockIntervalMs milliseconds. A rough sketch of that behaviour follows, and then updateCurrentBuffer itself.
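As a hedged, simplified sketch (not Spark's actual RecurringTimer), the timer is essentially a daemon thread that invokes the callback once per period:

// A hedged, simplified sketch of a recurring timer (not Spark's RecurringTimer):
// a daemon thread invokes `callback` with the current time every `periodMs` ms.
class SimpleRecurringTimer(periodMs: Long, callback: Long => Unit, name: String) {
  @volatile private var stopped = false

  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      while (!stopped) {
        val start = System.currentTimeMillis()
        callback(start)                                        // e.g. updateCurrentBuffer
        val sleepMs = periodMs - (System.currentTimeMillis() - start)
        if (sleepMs > 0) Thread.sleep(sleepMs)
      }
    }
  }

  def start(): Unit = thread.start()
  def stop(): Unit = { stopped = true }   // the loop exits after the current period
}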

updateCurrentBuffer:
private def updateCurrentBuffer(time: Long): Unit = {
    try {
      var newBlock: Block = null
      synchronized {
        if (currentBuffer.nonEmpty) {
          val newBlockBuffer = currentBuffer
          currentBuffer = new ArrayBuffer[Any]
          val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
          listener.onGenerateBlock(blockId)
          newBlock = new Block(blockId, newBlockBuffer)
        }
      }

      if (newBlock != null) {
        blocksForPushing.put(newBlock)  // put is blocking when queue is full
      }
    } catch {
      case ie: InterruptedException =>
        logInfo("Block updating timer thread was interrupted")
      case e: Exception =>
        reportError("Error in block updating thread", e)
    }
  }


In short, this method wraps whatever has accumulated in currentBuffer into a Block, puts it into the blocksForPushing queue, and then resets currentBuffer. The records get into currentBuffer in the first place when the receiver calls store(), which the supervisor forwards to the BlockGenerator's addData; a simplified sketch of that path is below. So now we know how blocks are generated.
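Here is a hedged, simplified sketch (not the real BlockGenerator class) of that data path: records arrive via addData, which is driven by the receiver's store(), and the block-interval timer periodically swaps the buffer out as one block:

import scala.collection.mutable.ArrayBuffer

// A hedged, simplified sketch (not the real BlockGenerator): store(record) on the
// receiver ends up as addData(record) here, and the block-interval timer calls
// closeCurrentBlock() to turn the accumulated buffer into one block.
class MiniBlockGenerator {
  private var currentBuffer = new ArrayBuffer[Any]

  // called on the receiver's data path, i.e. once per store(record)
  def addData(record: Any): Unit = synchronized {
    currentBuffer += record
  }

  // called by the block-interval timer: swap the buffer and return its contents
  def closeCurrentBlock(): Seq[Any] = synchronized {
    val block = currentBuffer
    currentBuffer = new ArrayBuffer[Any]
    block
  }
}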

But from the previous article we know the actual data is fetched from the BlockManager, so how do blocks get stored there? For that we need to look at blockPushingThread.start:
private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }

It is just a thread, so start() runs run(), which calls keepPushingBlocks:
private def keepPushingBlocks() {
    logInfo("Started block pushing thread")

    def areBlocksBeingGenerated: Boolean = synchronized {
      state != StoppedGeneratingBlocks
    }

    try {
      // While blocks are being generated, keep polling for to-be-pushed blocks and push them.
      while (areBlocksBeingGenerated) {
        Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
          case Some(block) => pushBlock(block)
          case None =>
        }
      }

      // At this point, state is StoppedGeneratingBlock. So drain the queue of to-be-pushed blocks.
      logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
      while (!blocksForPushing.isEmpty) {
        val block = blocksForPushing.take()
        logDebug(s"Pushing block $block")
        pushBlock(block)
        logInfo("Blocks left to push " + blocksForPushing.size())
      }
      logInfo("Stopped block pushing thread")
    } catch {
      case ie: InterruptedException =>
        logInfo("Block pushing thread was interrupted")
      case e: Exception =>
        reportError("Error in block pushing thread", e)
    }
  }



As long as blocks are being generated, it keeps polling blocksForPushing and calls pushBlock(block) on every block it takes out.

pushBlock ultimately calls the listener's onPushBlock:
  private def pushBlock(block: Block) {
    listener.onPushBlock(block.id, block.buffer)
    logInfo("Pushed block " + block.id)
  }


Here is the listener's definition:
 private val defaultBlockGeneratorListener = new BlockGeneratorListener {
    def onAddData(data: Any, metadata: Any): Unit = { }

    def onGenerateBlock(blockId: StreamBlockId): Unit = { }

    def onError(message: String, throwable: Throwable) {
      reportError(message, throwable)
    }

    def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
      pushArrayBuffer(arrayBuffer, None, Some(blockId))
    }
  }



onPushBlock simply calls pushArrayBuffer(arrayBuffer, None, Some(blockId)):

  def pushArrayBuffer(
      arrayBuffer: ArrayBuffer[_],
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]
    ) {
    pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)
  }


Then look at pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption):

  def pushAndReportBlock(
      receivedBlock: ReceivedBlock,
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]
    ) {
    val blockId = blockIdOption.getOrElse(nextBlockId)
    val time = System.currentTimeMillis
    val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
    logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
    val numRecords = blockStoreResult.numRecords
    val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
    trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
    logDebug(s"Reported block $blockId")
  }


It calls receivedBlockHandler.storeBlock(blockId, receivedBlock) to store the block, and then reports it to the driver's ReceiverTracker via AddBlock so the tracker knows which blocks are available for the next batch.

Here is receivedBlockHandler's definition:
 private val receivedBlockHandler: ReceivedBlockHandler = {
    if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
      if (checkpointDirOption.isEmpty) {
        throw new SparkException(
          "Cannot enable receiver write-ahead log without checkpoint directory set. " +
            "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
            "See documentation for more details.")
      }
      new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
        receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
    } else {
      new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
    }
  }


So the handler is either a WriteAheadLogBasedBlockHandler (when the receiver write-ahead log is enabled and a checkpoint directory is set) or a BlockManagerBasedBlockHandler. A hedged configuration sketch of what flips that switch is below; after it we take BlockManagerBasedBlockHandler as the example.
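A minimal, hedged configuration sketch of that switch (the checkpoint path is a placeholder):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// A minimal, hedged configuration sketch: with the receiver write-ahead log
// enabled and a checkpoint directory set, receivers use
// WriteAheadLogBasedBlockHandler; otherwise BlockManagerBasedBlockHandler.
object WalHandlerSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("wal-handler-sketch")
      .setMaster("local[2]")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("/tmp/streaming-checkpoint")  // placeholder checkpoint directory
    // ... define input streams, then ssc.start() as usual
  }
}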



Inside BlockManagerBasedBlockHandler there is the storeBlock method that was called above:
 def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {

    var numRecords = None: Option[Long]

    val putResult: Seq[(BlockId, BlockStatus)] = block match {
      case ArrayBufferBlock(arrayBuffer) =>
        numRecords = Some(arrayBuffer.size.toLong)
        blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel,
          tellMaster = true)
      case IteratorBlock(iterator) =>
        val countIterator = new CountingIterator(iterator)
        val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,
          tellMaster = true)
        numRecords = countIterator.count
        putResult
      case ByteBufferBlock(byteBuffer) =>
        blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
      case o =>
        throw new SparkException(
          s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
    }
    if (!putResult.map { _._1 }.contains(blockId)) {
      throw new SparkException(
        s"Could not store $blockId to block manager with storage level $storageLevel")
    }
    BlockManagerBasedStoreResult(blockId, numRecords)
  }




This part is straightforward: the block's buffer is put into the BlockManager as an iterator (putIterator), from where SocketTextStream (via the BlockRDD described in the previous article) can later fetch it.

That is essentially the whole flow from receiver startup to blocks landing in the BlockManager. To tie it together, a minimal end-to-end usage sketch follows.
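A hedged usage sketch (host and port are placeholders; feed data locally with, for example, `nc -lk 9999`):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// A minimal, hedged end-to-end sketch tying the pieces together: socketTextStream
// sets up the SocketInputDStream/SocketReceiver described above, the receiver's
// BlockGenerator stores blocks into the BlockManager, and each batch wraps the
// reported blocks into a BlockRDD that foreachRDD sees.
object SocketStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("socket-stream-sketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    val lines = ssc.socketTextStream("localhost", 9999)
    lines.foreachRDD { rdd =>
      // each rdd here is the BlockRDD built from the blocks stored by the receiver
      println(s"batch with ${rdd.count()} records")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}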