`
bit1129
  • 浏览: 1067881 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark八十四】Spark Streaming中DStream和RDD之间的关系

 
阅读更多

问题:在一个时间间隔中,Spark Streaming接收到的数据会生成几个RDD?

 

测试发现,在一个batchInterval中,会产生一个RDD,但是这个结论只是看到的现象。

如果在给定的batchInterval中,数据量非常大,Spark Streaming会产生多少个RDD,目前还不确定,只能通过看源代码才能确定了。

 

答案很确定,一个batchInterval产生且仅仅产生一个RDD。如果在这个时间间隔内,那么DStreaming仍然会对应1个RDD,不过这个RDD没有元素,即调用RDD.isEmpty为true。

当一个新的时间窗口(batchInterval)开始时,此时产生一个空的block,此后在这个窗口内接受到的数据都会累加到这个block上,当这个时间窗口结束时,停止累加,这个block对应的数据就是

这个时间窗口对应的RDD包含的数据

 

如下代码在控制台每隔五秒中输出一次,五秒钟就是Spark Stream进行处理数据接受的时间间隔

 

package spark.examples.streaming

import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

object SparkStreamingForPartition {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("NetCatWordCount")
    conf.setMaster("local[3]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val dstream = ssc.socketTextStream("192.168.26.140", 9999)
    val count = new AtomicInteger(0)
    dstream.foreachRDD(rdd => {
      //      rdd.foreach(record => println("This is the content: " + record + "," + rdd.hashCode()))
      //      count.incrementAndGet()
      println(count.incrementAndGet() + "," + rdd.hashCode() + ", " + System.currentTimeMillis() + ", logFlag")
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

 

下图截取自Spark官方文档:

 

 

 其中有句话是,

 

 Each RDD in a DStream contains data from a certain interval

这里的certain interval就应该指的是生成的RDD包含的数据接收时间,上例的五秒钟。

 

 

这就导致需要思考一个问题,如果时间间隔比较大或者收取数据的速度非常快,那么必然导致在这个时间间隔里面,获取到的数据量非常大,那么会不会对内存和GC产生很大的压力?

 

Memory Tuning

Tuning the memory usage and GC behavior of Spark applications have been discussed in great detail in the Tuning Guide. It is strongly recommended that you read that. In this section, we discuss a few tuning parameters specifically in the context of Spark Streaming applications.

The amount of cluster memory required by a Spark Streaming application depends heavily on the type of transformations used. For example, if you want to use a window operation on last 10 minutes of data, then your cluster should have sufficient memory to hold 10 minutes of worth of data in memory. Or if you want to use updateStateByKey with a large number of keys, then the necessary memory will be high. On the contrary, if you want to do a simple map-filter-store operation, then necessary memory will be low.

In general, since the data received through receivers are stored with StorageLevel.MEMORY_AND_DISK_SER_2, the data that does not fit in memory will spill over to the disk. This may reduce the performance of the streaming application, and hence it is advised to provide sufficient memory as required by your streaming application. Its best to try and see the memory usage on a small scale and estimate accordingly.

 

上面说到,默认情况下,接收到的数据尽量保存到内存,但是由于接受到的数据的持久化级别是内存+磁盘,因此在内存不够的情况下,数据会持久化到磁盘,但是当数据持久化到磁盘时,会带来严重的性能降级,因此对于Spark Streaming应用来说,应该分配足够大的内存,尤其是对于window操作以及有多个stateful key/value情形下的updateStateByKey操作

 

Another aspect of memory tuning is garbage collection. For a streaming application that require low latency, it is undesirable to have large pauses caused by JVM Garbage Collection.

There are a few parameters that can help you tune the memory usage and GC overheads.

  • Persistence Level of DStreams: As mentioned earlier in the Data Serialization section, the input data and RDDs are by default persisted as serialized bytes. This reduces both, the memory usage and GC overheads, compared to deserialized persistence. Enabling Kryo serialization further reduces serialized sizes and memory usage. Further reduction in memory usage can be achieved with compression (see the Spark configuration spark.rdd.compress), at the cost of CPU time.

  • Clearing old data: By default, all input data and persisted RDDs generated by DStream transformations are automatically cleared. Spark Streaming decides when to clear the data based on the transformations that are used. For example, if you are using window operation of 10 minutes, then Spark Streaming will keep around last 10 minutes of data, and actively throw away older data. Data can be retained for longer duration (e.g. interactively querying older data) by setting streamingContext.remember.

  • CMS Garbage Collector: Use of the concurrent mark-and-sweep GC is strongly recommended for keeping GC-related pauses consistently low. Even though concurrent GC is known to reduce the overall processing throughput of the system, its use is still recommended to achieve more consistent batch processing times. Make sure you set the CMS GC on both the driver (using --driver-java-options in spark-submit) and the executors (using Spark configuration spark.executor.extraJavaOptions).

  • Other tips: To further reduce GC overheads, here are some more tips to try.

    • Use Tachyon for off-heap storage of persisted RDDs. See more detail in the Spark Programming Guide.
    • Use more executors with smaller heap sizes. This will reduce the GC pressure within each JVM heap.

很可惜上面没有提到,一个时间间隔的数据是不是都放在一个RDD中(应该是的),只是说尽量放到内存(因为存储级别是 StorageLevel.MEMORY_AND_DISK_SER_2,所以内存不够后会spill到磁盘上,但是会带来较大的性能degrade)

 

 

 

 

 

 

 

 

  • 大小: 116.6 KB
分享到:
评论

相关推荐

    【SparkStreaming篇01】SparkStreaming之Dstream入门1

    SparkStreaming之Dstream入门 Spark Streaming是Apache Spark中的一个组件,用于处理流式数据。它可以从多种数据源中接收数据,如Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等,并使用Spark的高度抽象原语如...

    【SparkStreaming篇02】SparkStreaming之Dstream创建1

    在本篇中,我们将深入探讨如何使用SparkStreaming创建DStream(Discretized Stream),并实现一个简单的WordCount应用。 DStream是SparkStreaming的核心抽象,它代表了连续不断的数据块序列。DStream可以由多种方式...

    【SparkStreaming篇03】SparkStreaming之Dstream转换和输出1

    DStream的转换操作类似于Spark中的RDD转换,它们可以分为两类:无状态转换和有状态转换。无状态转换不会保存任何历史信息,每次只处理当前批次的数据,如`map`、`filter`、`reduceByKey`等。在无状态转换中,`reduce...

    spark Streaming和structed streaming分析

    Spark Streaming通过DStream来处理连续的数据流,同时DStream可以通过各种源创建,包括TCP套接字、Kafka、Flume和HDFS上的文件。 JobScheduler和JobSet是Spark Streaming中的关键组件,负责调度执行实时任务。Job...

    spark Streaming和storm的对比

    Spark Streaming的容错机制依赖于RDD的不变性和血统依赖关系。由于每个DStream被拆分成一系列的RDD,因此可以进行精确的故障恢复。如果某个处理节点失败,系统可以利用父RDD重新计算出丢失的RDD。此外,Spark还提供...

    SparkStreaming入门案例

    RDD 操作将由 Spark 核心来调度执行,DStream 屏蔽了这些细节。 本文将通过三个例子来演示 Spark Streaming 的使用:监控指定目录并处理该目录下的新文件、监控指定目录并处理该目录下的同一格式的新旧文件、窗口...

    spark之sparkStreaming 理解

    Spark Streaming内部处理机制的核心在于将实时数据流拆分为一系列微小的批次(通常是几秒至几十秒的间隔),然后利用Spark Engine对这些微批次数据进行处理,最终产生处理后的结果数据。这种机制使得Spark Streaming...

    spark core、spark sql以及spark streaming 的Scala、java项目混合框架搭建以及大数据案例

    项目中可能会展示如何创建DataFrame,执行SQL查询,以及DataFrame和RDD之间的转换。 3. **Spark Streaming**:Spark Streaming是Spark处理实时数据流的组件,它将数据流分解为小批次,然后使用Spark Core进行快速...

    2-3-Spark+Streaming.pdf

    最后,关于SparkRDD和SparkStreamingDStream的提及,说明了在Spark Streaming应用中,数据被组织在DStream这个抽象概念中。DStream是多个RDD的序列,每一个RDD代表一个时间片的数据。DStream的设计允许用户采用几乎...

    spark streaming

    Spark Streaming 是Apache Spark中的一个重要组件,专门设计用来处理实时数据流的计算框架。作为Spark核心API的一个扩展,它延续了Spark的易用性和高效性,能够将实时数据流处理与批量数据处理无缝集成在一起。利用...

    深入理解SparkStreaming执行模型

    综上所述,Spark Streaming通过其独特的微批处理执行模型,将实时数据流处理问题转换成了Spark Core中熟悉的RDD处理问题,同时结合了内存计算的优势和与其他大数据组件的良好集成性,为实时数据处理提供了强大的工具...

    Apache Spark源码走读之4 -- DStream实时流数据处理

    在Spark Streaming中,**DStream**是核心数据结构之一,它是对RDD的进一步封装,用于表示随时间变化的数据序列。具体来说,DStream实现了以下功能: 1. **数据的持久化**:通过将从网络等来源接收的数据暂存起来,...

    spark Streaming原理和实战

    **DStream (Discretized Stream)** 是Spark Streaming中的基本抽象单元,它代表了一系列连续的小批量数据(即RDD)。DStream模型将连续的数据流转换为一系列离散的时间片段数据处理,这种设计方式极大地简化了流处理...

    sparkstreaming

    DStream 是 Spark Streaming 中的核心抽象,它表示了一系列按时间顺序排列的 RDD(Resilient Distributed Datasets)。每个 DStream 都是一个连续的 RDD 序列,其中每个 RDD 包含了一段时间内收集的数据。 **...

    Spark Streaming实时流处理项目实战.rar.rar

    - DStream Graph:表示所有DStream和它们之间的transformations的关系。 - Executor:运行任务的地方,负责执行RDD的计算。 - Driver Program:创建和调度DStream Graph,通常运行在SparkContext实例上。 3. **...

    SparkStreaming和kafka的整合.pdf

    尽管标签中提到“数学建模”,但从标题和描述来看,这部分内容与数学建模无关,因此我们将重点放在Spark Streaming与Kafka的集成上。 ### Spark Streaming与Kafka集成概述 #### 1. Spark Streaming简介 Spark ...

    spark sparkStreaming sparkMaven

    通过在pom.xml文件中声明Spark、SparkStreaming和其他依赖库,Maven会自动下载并管理这些依赖,使得开发环境配置更为简便。此外,Maven还支持构建、测试和部署Spark应用,简化了整个项目的生命周期管理。 为了开发...

    spark-streaming课堂讲义.docx

    SparkStreaming 的基本工作原理是将实时数据流分割成多个时间窗口(micro-batches),然后使用 Spark Core 的并行和分布式处理能力对每个批次进行计算。这种方式既保留了 Spark 的内存计算优势,也实现了对实时数据...

    06Spark Streaming原理和实践

    Spark Streaming支持两种主要的持久化机制:检查点(Checkpoint)和内存中的状态更新。这两种机制都可以用来实现容错功能,确保在发生故障时可以从最近的状态恢复处理流程。 ##### 5. 优化 为了提高性能,Spark ...

    Spark Streaming Real-time big-data processing

    Spark Streaming的源码展示了其内部实现机制,包括如何接收数据、如何创建和操作DStream、以及如何调度和执行微批处理任务。深入理解源码可以帮助开发者优化性能,解决故障,并实现更复杂的实时流处理逻辑。 **工具...

Global site tag (gtag.js) - Google Analytics