`

Sparkstreaming是如何获取数据组成Dstream的源码浅析

阅读更多
前面一篇文章介绍了SparkStreaming是如何不停的循环submitJob的, 连接:
http://humingminghz.iteye.com/admin/blogs/2308711

既然已经知道了Spark Streaming如何循环处理, 那么我们就要看一下处理过程中是怎么获取到Dstream的, 用简单的socketTextStream 来做一个列子, 入口:

val lines = scc.socketTextStream(args(1), args(2).toInt, StorageLevel.MEMORY_AND_DISK)


在socketTextStream直接调用socketTextStream, 再到SocketInputDStream

def socketTextStream(
      hostname: String,
      port: Int,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
    socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
  }

  def socketStream[T: ClassTag](
      hostname: String,
      port: Int,
      converter: (InputStream) => Iterator[T],
      storageLevel: StorageLevel
    ): ReceiverInputDStream[T] = {
    new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
  }




可以看到实际上是创建了一个SocketInputDStream对象, 而SocketInputDStream是继承自ReceiverInputDStream的:

private[streaming]
class SocketInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {

  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}



先看一下这个getReceiver, 创建了一个receiver: SocketReceiver,SocketReceiver里面主要看他的receive方法:
 /** Create a socket connection and receive data until receiver is stopped */
  def receive() {
    var socket: Socket = null
    try {
      logInfo("Connecting to " + host + ":" + port)
      socket = new Socket(host, port)
      logInfo("Connected to " + host + ":" + port)
      val iterator = bytesToObjects(socket.getInputStream())
      while(!isStopped && iterator.hasNext) {
        store(iterator.next)
      }
      if (!isStopped()) {
        restart("Socket data stream had no more data")
      } else {
        logInfo("Stopped receiving")
      }
    } catch {
      case e: java.net.ConnectException =>
        restart("Error connecting to " + host + ":" + port, e)
      case NonFatal(e) =>
        logWarning("Error receiving data", e)
        restart("Error receiving data", e)
    } finally {
      if (socket != null) {
        socket.close()
        logInfo("Closed socket to " + host + ":" + port)
      }
    }
  }


可以看到这个receiver主要就是连接socket获取传输过来的数据然后存起来到block里面

那么SocketTextStreaming又是怎么从block里面把数据拿出来组成RDD的呢,那么就要看 SocketInputDStream继承的父类ReceiverInputDStream了, 点进去看有个compute方法, 我们知道这个方法是会在action里面调用, 那么compute方法里面做了什么呢, 我们先看一下代码怎么写的:

override def compute(validTime: Time): Option[RDD[T]] = {
    val blockRDD = {

      if (validTime < graph.startTime) {
        // If this is called for any time before the start time of the context,
        // then this returns an empty RDD. This may happen when recovering from a
        // driver failure without any write ahead log to recover pre-failure data.
        new BlockRDD[T](ssc.sc, Array.empty)
      } else {
        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
        // for this batch
        val receiverTracker = ssc.scheduler.receiverTracker
        val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

        // Register the input blocks information into InputInfoTracker
        val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

        // Create the BlockRDD
        createBlockRDD(validTime, blockInfos)
      }
    }
    Some(blockRDD)
  }

看到如果时间是正常的, 首先会拿到一个receiverTracker (在ssc.start方法里面tracker已经启动)然后从receivertracker里面拿到特定时间的blocks, 然后根据获取到的blockinfos去创建BlockRDD (createBlockRDD(validTime, blockInfos))

createBlockRDD方法:

 private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {

    if (blockInfos.nonEmpty) {
      val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray

      // Are WAL record handles present with all the blocks
      val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }

      if (areWALRecordHandlesPresent) {
        // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
        val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
        val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
        new WriteAheadLogBackedBlockRDD[T](
          ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
      } else {
        // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
        // others then that is unexpected and log a warning accordingly.
        if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
          if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
            logError("Some blocks do not have Write Ahead Log information; " +
              "this is unexpected and data may not be recoverable after driver failures")
          } else {
            logWarning("Some blocks have Write Ahead Log information; this is unexpected")
          }
        }
        val validBlockIds = blockIds.filter { id =>
          ssc.sparkContext.env.blockManager.master.contains(id)
        }
        if (validBlockIds.size != blockIds.size) {
          logWarning("Some blocks could not be recovered as they were not found in memory. " +
            "To prevent such data loss, enabled Write Ahead Log (see programming guide " +
            "for more details.")
        }
        new BlockRDD[T](ssc.sc, validBlockIds)
      }
    } else {
      // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
      // according to the configuration
      if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
        new WriteAheadLogBackedBlockRDD[T](
          ssc.sparkContext, Array.empty, Array.empty, Array.empty)
      } else {
        new BlockRDD[T](ssc.sc, Array.empty)
      }
    }
  }


这个方法返回的是一个RDD, 里面先会从blockinfos里面拿出所有blockid:
val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray

再看是不是所有block都有在WAL里面
val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }

如果是, 那么就创建返回WriteAheadLogBackedBlockRDD (继承自RDD)

如果不是, 则确认blockmanager里面有这个ID, 并组成validBlockids:
val validBlockIds = blockIds.filter { id =>
          ssc.sparkContext.env.blockManager.master.contains(id)
        }

然后根据validBlockids创建一个blockRDD:
new BlockRDD[T](ssc.sc, validBlockIds)

BlockRDD里面的compute方法是:

 override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    assertValid()
    val blockManager = SparkEnv.get.blockManager
    val blockId = split.asInstanceOf[BlockRDDPartition].blockId
    blockManager.get(blockId) match {
      case Some(block) => block.data.asInstanceOf[Iterator[T]]
      case None =>
        throw new Exception("Could not compute split, block " + blockId + " not found")
    }
  }


可以看到他从blockManager里面根据ID去拿对应的block并组成iterator返回。 到此为止一个blockRDD就生成了, 大家知道Dstream实际上就是一堆RDD的组合, 那么这个就是里面RDD是怎么来的的介绍。

接下来还会看一下blockmanager和receiverTRacker是怎么把数据存入到block里面来管理的。






分享到:
评论

相关推荐

    Apache Spark源码走读之4 -- DStream实时流数据处理

    ### Apache Spark源码走读之四:DStream实时流数据处理 #### 一、系统概述与流数据特性 本文档探讨了Apache Spark Streaming的核心概念之一——**DStream**(Discretized Stream)及其如何实现对实时流数据的有效...

    【SparkStreaming篇01】SparkStreaming之Dstream入门1

    SparkStreaming之Dstream入门 Spark Streaming是Apache Spark中的一个组件,用于处理流式数据。它可以从多种数据源中接收数据,如Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等,并使用Spark的高度抽象原语如...

    Apache Spark源码走读之5 -- DStream处理的容错性分析

    ### Apache Spark源码走读之五:DStream处理的容错性分析 #### 环境搭建与背景 为了深入理解Apache Spark Streaming中DStream处理的容错机制,本文将从一个简单的Spark Streaming示例出发,逐步分析Spark如何确保...

    【SparkStreaming篇02】SparkStreaming之Dstream创建1

    DStream是SparkStreaming的核心抽象,它代表了连续不断的数据块序列。DStream可以由多种方式创建,包括从基本的数据源(如Kafka、Flume、HDFS等)或通过转换现有DStream。本文主要介绍了两种创建DStream的方法:通过...

    spark Streaming和structed streaming分析

    Spark Streaming提供了一种名为DStream(Discretized Stream)的高级抽象,其将实时数据流建模为一系列小批量数据。它在处理时,将这些小批量数据转换为RDD(Resilient Distributed Dataset),这是Spark核心API的一...

    基于SparkStreaming的实时音乐推荐系统源码.zip

    在本项目中,"基于SparkStreaming的实时音乐推荐系统源码.zip",主要涉及的是如何利用Apache Spark Streaming这一强大的实时处理框架,构建一个能够实时分析用户行为并进行个性化音乐推荐的系统。Spark Streaming是...

    【SparkStreaming篇03】SparkStreaming之Dstream转换和输出1

    在Spark Streaming中,数据流被划分为一系列的小批量数据,称为Discretized Stream (DStream)。DStream是Spark Streaming的核心抽象,它代表了一种连续不断的数据流。 在本篇文章中,我们将重点讨论两个关键的概念...

    SparkStreaming入门案例

    Spark Streaming 是一种构建在 Spark 上的实时计算框架,用来处理大规模流式数据。它将从数据源(如 Kafka、Flume、Twitter、ZeroMQ、HDFS 和 TCP 套接字)获得的连续数据流,离散化成一批一批地数据进行处理。每一...

    Flume push数据到SparkStreaming

    标题中的“Flume push数据到SparkStreaming”是指在大数据处理领域中,使用Apache Flume将实时数据流推送到Apache Spark Streaming进行进一步的实时分析和处理的过程。这两个组件都是Apache Hadoop生态系统的重要...

    Spark Streaming Real-time big-data processing

    DStream是Spark Streaming对连续数据流的概念抽象,它是由一系列连续的RDD(弹性分布式数据集)组成的。每个RDD表示在一个时间窗口内的数据块。通过这种方式,Spark Streaming能够利用RDD的并行计算能力来高效处理...

    spark Streaming和storm的对比

    Spark Streaming的核心概念是DStream(Discretized Stream),它代表了一个连续的数据流,可以由输入数据源创建,也可以通过其他DStream的转换得到。DStream由一系列RDD(Resilient Distributed Dataset,弹性分布式...

    扶剑-Spark Streaming Spark流数据

    标题中提到的“Spark Streaming Spark流数据”指的是Apache Spark的一个子项目,即Spark Streaming,用于进行实时数据流处理。Apache Spark是一个强大的分布式数据处理框架,提供了包括批处理、流处理、机器学习、图...

    spark之sparkStreaming 理解

    - **定义**:Spark Streaming提供了名为DStream(Discretized Stream)的高级抽象,它代表了一个连续的数据流。 - **创建方式**:DStream可以通过来自Kafka、Flume、Kinesis等数据源的输入流创建,也可以通过对现有...

    spark streaming

    除了对数据流的处理,Spark Streaming还支持从Spark的批处理结果中获取数据,实现双向的数据流处理。这意味着Spark Streaming不仅仅是单向实时数据流的处理,还可以是双向数据处理,将实时计算和批量计算有机结合...

    深入理解SparkStreaming执行模型

    Spark Streaming是Apache Spark的重要组成部分,它提供了一种高吞吐量、可容错的实时数据处理方式。Spark Streaming的核心是一个执行模型,这个执行模型基于微批处理(micro-batch processing)的概念,允许将实时数据...

    Spark Streaming实时流处理项目实战.rar.rar

    - DStreams:Spark Streaming中的核心抽象,代表连续的数据流,由一系列时间窗口内的RDDs组成。 - Windowing:用于定义时间间隔,决定数据如何分组和处理。例如,可以按每分钟、每5分钟的数据进行处理。 - ...

    Spark Streaming 示例

    `SparkStreaming.zip` 文件可能包含了一个示例项目,演示了如何创建一个 Spark Streaming 应用来从 Kafka 消费数据。代码通常会包含以下步骤: 1. 创建 SparkConf 对象,配置 Spark 应用的属性,如应用程序名称、...

    Kafka集成Spark Streaming并写入数据到HBase

    Spark Streaming则提供了基于微批处理的实时计算框架,能够高效地处理持续的数据流;而HBase作为分布式列式数据库,适合存储海量结构化半结构化数据。这三者之间的集成,使得实时数据流能够被快速处理并持久化存储。...

    SparkStreaming和kafka的整合.pdf

    Spark Streaming将数据流分割成一系列小批量的数据块进行处理,这种机制使得Spark Streaming既能够处理实时数据流,又能利用Spark的核心API进行复杂的数据处理。 #### 2. Kafka简介 Apache Kafka是一种分布式的发布...

    Spark Streaming 流式日志过滤的实验资源

    - **DStream(Discretized Stream)**:Spark Streaming 的核心抽象,表示连续的数据流,是由一系列时间间隔内的 RDD(Resilient Distributed Datasets)组成的。 - **窗口操作**:Spark Streaming 提供了时间窗口...

Global site tag (gtag.js) - Google Analytics