`
bit1129
  • 浏览: 1067926 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark八十六】Spark Streaming之DStream vs. InputDStream

 
阅读更多

 

1. DStream的类说明文档:

 

/**
 * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
 * sequence of RDDs (of the same type) representing a continuous stream of data (see
 * org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs).
 * DStreams can either be created from live data (such as, data from TCP sockets, Kafka, Flume,
 * etc.) using a [[org.apache.spark.streaming.StreamingContext]] or it can be generated by
 * transforming existing DStreams using operations such as `map`,
 * `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each DStream
 * periodically generates a RDD, either from live data or by transforming the RDD generated by a
 * parent DStream.
 *
 * This class contains the basic operations available on all DStreams, such as `map`, `filter` and
 * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains
 * operations available only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and
 * `join`. These operations are automatically available on any DStream of pairs
 * (e.g., DStream[(Int, Int)] through implicit conversions.
 *
 * DStreams internally is characterized by a few basic properties:
 *  - A list of other DStreams that the DStream depends on
 *  - A time interval at which the DStream generates an RDD
 *  - A function that is used to generate an RDD after each time interval
 */

 

从上面可以看出来,

1. DStream每隔一段时间仅产生一个RDD

2. DStream如同RDD,也有依赖关系,比如通过转换算子,把一个DStream转换成另外一个DStream

3. DStream需要有一个方法,在指定的时间间隔到达后,用于创建RDD。这个方法同RDD计算它包含的数据行一样,是compute方法

  /** Method that generates a RDD for the given time */
  def compute (validTime: Time): Option[RDD[T]]

 

2. DStream的实现类ReceiverInputDStream实现的compute方法

 

 /**
   * Generates RDDs with blocks received by the receiver of this stream. */
 ///根据在一个时间范围内获取到的数据创建RDD(这些接收到的数据由BlockManager管理)
  override def compute(validTime: Time): Option[RDD[T]] = {
    val blockRDD = {
      ////如果时间不在要计算的范围内,则返回空RDD(具体是BlockRDD)
      if (validTime < graph.startTime) {
        // If this is called for any time before the start time of the context,
        // then this returns an empty RDD. This may happen when recovering from a
        // driver failure without any write ahead log to recover pre-failure data.
        new BlockRDD[T](ssc.sc, Array.empty)
      } else {
        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
        // for this batch
        //根据validTime获取Block信息,validTime和Block的对应关系放在一个Map中,因此,validTime是Block的索引,
        //因此可以把validTime看成是一个块创建的时间点(此后,一个时间间隔的数据都写入这个块中)
        //id is an unique identifier for the receiver input stream.
        val blockInfos =
          ssc.scheduler.receiverTracker.getBlocksOfBatch(validTime).get(id).getOrElse(Seq.empty)
        //对blockInfos中的每个元素(ReceivedBlockInfo)调用blockStoreResult中
        val blockStoreResults = blockInfos.map { _.blockStoreResult }
        val blockIds = blockStoreResults.map { _.blockId.asInstanceOf[BlockId] }.toArray

        // Check whether all the results are of the same type
        val resultTypes = blockStoreResults.map { _.getClass }.distinct
        if (resultTypes.size > 1) {
          logWarning("Multiple result types in block information, WAL information will be ignored.")
        }

        // If all the results are of type WriteAheadLogBasedStoreResult, then create
        // WriteAheadLogBackedBlockRDD else create simple BlockRDD.
        if (resultTypes.size == 1 && resultTypes.head == classOf[WriteAheadLogBasedStoreResult]) {
          val logSegments = blockStoreResults.map {
            _.asInstanceOf[WriteAheadLogBasedStoreResult].segment
          }.toArray
          // Since storeInBlockManager = false, the storage level does not matter.
          new WriteAheadLogBackedBlockRDD[T](ssc.sparkContext,
            blockIds, logSegments, storeInBlockManager = false, StorageLevel.MEMORY_ONLY_SER)
        } else {
          ///构造BlockRDD的参数是blockIds,也就是这个RDD包含的数据存放在BlockManager中,可以由blockId读取指定的块
          new BlockRDD[T](ssc.sc, blockIds)
        }
      }
    }
    Some(blockRDD)
  }

 

3. InputDStream的类说明文档

/**
 * This is the abstract base class for all input streams. This class provides methods
 * start() and stop() which is called by Spark Streaming system to start and stop receiving data.
 * Input streams that can generate RDDs from new data by running a service/thread only on
 * the driver node (that is, without running a receiver on worker nodes), can be
 * implemented by directly inheriting this InputDStream. For example,
 * FileInputDStream, a subclass of InputDStream, monitors a HDFS directory from the driver for
 * new files and generates RDDs with the new files. For implementing input streams
 * that requires running a receiver on the worker nodes, use
 * [[org.apache.spark.streaming.dstream.ReceiverInputDStream]] as the parent class.
 *
 * @param ssc_ Streaming context that will execute this input stream
 */

 

InputDStream定义了接收和停止接收数据的方法(start和stop),InputDStream的直接实现者不需要定义专门的Receiver,它在Driver上即可完成生成RDD的任务。

比如FileInputDStream即是这样一个InputDStream,它监听本地或者HDFS上的新文件,然后生成RDD。

问题:如此说来,FileInputDStream是一个没有实现分布式的数据接收器?

 

4. ReceiverInputDStream

 

/**
 * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
 * that has to start a receiver on worker nodes to receive external data.
 * Specific implementations of ReceiverInputDStream must
 * define `the getReceiver()` function that gets the receiver object of type
 * [[org.apache.spark.streaming.receiver.Receiver]] that will be sent
 * to the workers to receive data.
 * @param ssc_ Streaming context that will execute this input stream
 * @tparam T Class type of the object of this stream
 */

 

 ReceiverInputDStream的实现者需要实现getReceiver来回去数据接收器,这些Reciver将运行在所有的Worker节点上。

 

5. 总结

从上面的分析中,DStream是InputDStream的父类,而InputDStream是ReceiverInputDStream的父类。

DStream相对于InputDStream,更像是个RDD,即只是一个包含接收数据的一系列的集合,即DStream是一个数据集。

InputDStream在此基础之上需要实现启动接收和停止接收的逻辑,也就是,InputDStream更像是自来水管的水龙头开关,而DStream是流出来的自来水。

ReceiverInputStream将接收数据的逻辑进行分布式接收

 

 

 

 

 

 

 

 

分享到:
评论

相关推荐

    【SparkStreaming篇01】SparkStreaming之Dstream入门1

    SparkStreaming之Dstream入门 Spark Streaming是Apache Spark中的一个组件,用于处理流式数据。它可以从多种数据源中接收数据,如Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等,并使用Spark的高度抽象原语如...

    【SparkStreaming篇02】SparkStreaming之Dstream创建1

    在本篇中,我们将深入探讨如何使用SparkStreaming创建DStream(Discretized Stream),并实现一个简单的WordCount应用。 DStream是SparkStreaming的核心抽象,它代表了连续不断的数据块序列。DStream可以由多种方式...

    【SparkStreaming篇03】SparkStreaming之Dstream转换和输出1

    在Spark Streaming中,数据流被划分为一系列的小批量数据,称为Discretized Stream (DStream)。DStream是Spark Streaming的核心抽象,它代表了一种连续不断的数据流。 在本篇文章中,我们将重点讨论两个关键的概念...

    51DStream笔记.docx

    * SparkStreaming 应用层面调优:对 Spark Streaming 应用程序进行优化,例如使用缓存、优化数据处理逻辑等。 七、DStream 的转换算子 DStream 提供了多种转换算子,用于实现流式数据的转换操作。常见的转换算子...

    spark Streaming和structed streaming分析

    Spark Streaming提供了一种名为DStream(Discretized Stream)的高级抽象,其将实时数据流建模为一系列小批量数据。它在处理时,将这些小批量数据转换为RDD(Resilient Distributed Dataset),这是Spark核心API的一...

    SparkStreaming和kafka的整合.pdf

    import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange} import org.apache.spark.streaming.{Duration, ...

    Apress.Pro.Spark.Streaming.The.Zen.of.Real-Time.Analytics.Using.Apache.Spark

    Spark Streaming作为其组件之一,专门用于处理实时数据流,为实时分析提供了强大支持。 Spark Streaming的核心概念是微批处理(micro-batching),它将实时数据流分割成小批次的数据块,然后使用Spark的核心API进行...

    藏经阁-From Spark Streaming to Structured Streaming.pdf

    《藏经阁-From Spark Streaming to Structured Streaming》是一份关于大数据处理技术的文档,主要对比分析了Apache Spark Streaming和Google Dataflow,以及引出了新兴的Structured Streaming技术。以下是这份文档...

    Apress.Pro.Spark.Streaming.The.Zen.of.Real-Time.A

    《Apress.Pro.Spark.Streaming.The.Zen.of.Real-Time.A》这本书主要聚焦于Apache Spark Streaming这一实时数据处理框架,深入探讨了如何利用Spark Streaming构建高效、可靠的实时数据处理系统。Spark Streaming是...

    Apache Spark源码走读之5 -- DStream处理的容错性分析

    ### Apache Spark源码走读之五:DStream处理的容错性分析 #### 环境搭建与背景 为了深入理解Apache Spark Streaming中DStream处理的容错机制,本文将从一个简单的Spark Streaming示例出发,逐步分析Spark如何确保...

    基于spark-streaming的实时数仓.zip

    Spark-Streaming是基于Spark核心计算框架的微批处理系统,它可以将实时数据流分解为小批量的数据块(DStream),然后对这些数据块进行并行处理,从而实现流式计算。这种模式既保留了批处理的效率,又满足了实时处理...

    spark-streaming课堂讲义.docx

    SparkStreaming 的基本工作原理是将实时数据流分割成多个时间窗口(micro-batches),然后使用 Spark Core 的并行和分布式处理能力对每个批次进行计算。这种方式既保留了 Spark 的内存计算优势,也实现了对实时数据...

    Apache Spark源码走读之4 -- DStream实时流数据处理

    本文档探讨了Apache Spark Streaming的核心概念之一——**DStream**(Discretized Stream)及其如何实现对实时流数据的有效处理。首先,我们需要理解流数据的基本特性,与静态文件不同的是: 1. **数据的持续变化性...

    SparkStreaming入门案例

    Spark Streaming 入门案例 Spark Streaming 是一种构建在 Spark 上的实时计算框架,用来处理大规模流式数据。它将从数据源(如 Kafka、Flume、Twitter、ZeroMQ、HDFS 和 TCP 套接字)获得的连续数据流,离散化成一...

    24:Spark2.3.x Streaming实时计算.zip

    Spark Streaming构建在核心的Spark引擎之上,它将实时数据流分割成微小的批处理任务(DStreams),这些任务可以被快速调度和处理。这种方式允许Spark Streaming利用Spark的并行处理能力,实现高效且容错的流处理。 ...

    Spark Streaming实时流处理项目实战.rar.rar

    Spark Streaming是中国大数据技术领域中广泛使用的实时数据处理框架,它基于Apache Spark的核心设计,提供了对持续数据流的微批处理能力。本项目实战旨在帮助读者深入理解和应用Spark Streaming,通过实际操作来掌握...

    Spark-Streaming编程指南.docx

    Spark Streaming 的强大之处在于它可以无缝集成 Spark Core、Spark SQL、MLlib 和 GraphX,允许用户在实时流处理中应用机器学习和图计算算法,实现复杂的数据分析任务。同时,通过 DStream 的窗口操作,可以实现时间...

    spark之sparkStreaming 理解

    Spark Streaming内部处理机制的核心在于将实时数据流拆分为一系列微小的批次(通常是几秒至几十秒的间隔),然后利用Spark Engine对这些微批次数据进行处理,最终产生处理后的结果数据。这种机制使得Spark Streaming...

    Spark Streaming 示例

    `SparkStreaming.zip` 文件可能包含了一个示例项目,演示了如何创建一个 Spark Streaming 应用来从 Kafka 消费数据。代码通常会包含以下步骤: 1. 创建 SparkConf 对象,配置 Spark 应用的属性,如应用程序名称、...

    2-3-Spark+Streaming.pdf

    这与传统Spark程序一次性处理一批历史数据的方式不同,Spark Streaming可以持续不断地从数据源接收数据,并将其分割成一系列小的批处理块,这被称为离散流(DStream)。 在文档中提到的“Spark Streaming2017”可能...

Global site tag (gtag.js) - Google Analytics