`
bit1129
  • 浏览: 1067856 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark100】Spark Streaming Checkpoint的一个坑

 
阅读更多

Spark Streaming UI这块是本篇额外的内容,与主题无关,只是把它记录下来

Spark Streaming UI上一组统计数字的含义

 

Streaming

  • Started at: 1433563238275(Spark Streaming开始运行的时间)
  • Time since start: 3 minutes 51 seconds(Spark Streaming已经运行了多长时间)
  • Network receivers: 2(Receiver个数)
  • Batch interval: 1 second(每个Batch的时间间隔,即接收多长时间的数据就生成一个Batch,或者说是RDD)
  • Processed batches: 231 (已经处理的Batch个数,不管Batch中是否有数据,都会计算在内,)
  • Waiting batches: 0 (等待处理的Batch数据,如果这个值很大,表明Spark的处理速度较数据接收的速度慢,需要增加计算能力或者降低接收速度)
  • Received records: 66 (已经接收到的数据,每读取一次,读取到的所有数据称为一个record)
  • Processed records: 66 (已经处理的record)

 

(Processed batches + Waiting batches) * Batch Interval = Time Since Start

 

 

Spark Streaming Checkpoint的一个坑

 

 源代码:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}


object SparkStreamingCheckpointEnabledTest {
  def main(args: Array[String]) {

    val checkpointDirectory = "file:///d:/data/chk_streaming"
    def funcToCreateSSC(): StreamingContext = {
      val conf = new SparkConf().setAppName("NetCatWordCount")
      conf.setMaster("local[3]")
      val ssc = new StreamingContext(conf, Seconds(1))
      ssc.checkpoint(checkpointDirectory)
      ssc
    }
    val ssc = StreamingContext.getOrCreate(checkpointDirectory, funcToCreateSSC)
    val numStreams = 2
    val streams = (1 to numStreams).map(i => ssc.socketTextStream("localhost", 9999))
    val lines = ssc.union(streams)
    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

 

 以上代码是错误的,因为停掉Driver后再次重启,将无法启动,解决办法是将streams的操作放到funcToCreateSSC函数里,ssc返回前

 

object SparkStreamingCheckpointEnabledTest {
  def process(streams: Seq[DStream[String]], ssc: StreamingContext) {
    val lines = ssc.union(streams)
    lines.print
  }

  def main(args: Array[String]) {
    val checkpointDirectory = "file:///d:/data/chk_streaming"
    def funcToCreateSSC(): StreamingContext = {
      val conf = new SparkConf().setAppName("NetCatWordCount")
      conf.setMaster("local[3]")
      val ssc = new StreamingContext(conf, Seconds(1))
      ssc.checkpoint(checkpointDirectory)
      val numStreams = 2
      val streams = (1 to numStreams).map(i => ssc.socketTextStream("localhost", 9999))
      process(streams, ssc)
      ssc
    }
    val ssc = StreamingContext.getOrCreate(checkpointDirectory, funcToCreateSSC)
    ssc.start()
    ssc.awaitTermination()
  }
}

 

 

 

 

  • 大小: 54 KB
分享到:
评论

相关推荐

    Spark Streaming Programming Guide 笔记

    **Spark Streaming** 是 Apache Spark 的一个模块,它能够处理实时的数据流。与传统的批量处理不同,Spark Streaming 接收实时输入数据流,并将其划分为一系列小批次(即离散化流或 DStream),然后通过 Spark 引擎...

    Spark的checkpoint源码讲解

    Checkpoint 的应用Checkpoint 的应用非常广泛,例如在 SparkStreaming 任务中使用Checkpoint,可以快速恢复应用程序的状态。在 SparkCore 中,Checkpoint 可以用于关键点进行Checkpoint,便于故障恢复。Checkpoint ...

    06Spark Streaming原理和实践

    Spark Streaming是Apache Spark的一个扩展库,用于处理实时数据流。它通过将输入数据流分解成一系列短小的批次数据来进行处理。这些短小的批次数据称为微批次(Micro-batches),每个微批次的处理都类似于Spark中的...

    spark Streaming原理和实战

    **Spark Streaming** 是Apache Spark生态中的一个重要组件,它主要用于处理实时数据流。相比于传统的批处理技术,Spark Streaming提供了对实时数据流的支持,使得开发者能够在大数据环境中实现高效的数据处理任务。 ...

    Spark从入门到精通

    3、覆盖Spark所有功能点(Spark RDD、Spark SQL、Spark Streaming,初级功能到高级特性,一个不少); 4、Scala全程案例实战讲解(近百个趣味性案例); 5、Spark案例实战的代码,几乎都提供了Java和Scala两个版本和...

    SparkStreaming与Stom比较

    1. **非纯实时处理需求**:如果实时性不是最重要的考量因素,而是希望有一个更全面的数据处理平台,那么Spark Streaming是不错的选择。 2. **综合数据处理需求**:Spark Streaming非常适合那些不仅需要实时计算,还...

    SparkStreaming之滑动窗口的实现.zip_Spark!_spark stream 窗口_spark streamin

    Spark Streaming是Apache Spark的一个模块,专门用于处理实时数据流。在大数据处理领域,实时流处理是一种重要的技术,它能够及时地分析和响应不断到来的数据。Spark Streaming构建在Spark Core之上,利用Spark的...

    sparkStream-kafka.rar

    1. **Spark Streaming**:Spark Streaming是基于微批处理(Micro-batching)的概念,它将实时数据流分割成小的时间窗口(例如,每5秒或10秒一个批次),然后对每个批次的数据执行Spark的批处理操作。这种方式既保留...

    Apress.Pro.Spark.Streaming.The.Zen.of.Real-Time.Analytics.Using.Apache.Spark

    Apache Spark作为一个快速、通用且可扩展的大数据处理框架,因其高效性能和易用性在大数据领域备受推崇。Spark Streaming作为其组件之一,专门用于处理实时数据流,为实时分析提供了强大支持。 Spark Streaming的...

    基于Scala的Spark_Core、Spark_SQL和Spark_Streaming设计源码

    本项目基于Scala开发,包含148个文件,包括Scala源代码、CRC校验文件、TXT文本文件、以及多个checkpoint和ck文件。系统实现了基于Scala的Spark_Core、Spark_SQL和Spark_Streaming功能,界面友好,功能完善,适合用于...

    Spark-2.3.1源码解读

    Spark-2.3.1源码解读。...Spark Streaming源码阅读 动态发现新增分区 Dstream join 操作和 RDD join 操作的区别 PIDController源码赏析及 back pressure 实现思路 Streaming Context重点摘要 checkpoint 必知必会

    Spark checkPoint Demo

    import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.ReceiverInputDStream object UpadateDemo { def main(args: Array[String]): Unit = { val conf =...

    Spark分布式内存计算框架视频教程

    9.SparkStreaming Checkpoint 10.消费Kafka偏移量管理 第六章、StructuredStreaming模块 1.StructuredStreaming 概述(核心设计和编程模型) 2.入门案例:WordCount 3.输入源InputSources 4.Streaming Query 设置 5....

    Spark一个高效的分布式计算系统

    Spark是一个由UC Berkeley AMP实验室开发并开源的分布式计算框架,其设计目标是提供高效、通用的并行计算能力,尤其适合大数据处理中的迭代计算任务。Spark借鉴了Hadoop MapReduce的思想,但在性能和灵活性上进行了...

    大数据平台spark组件说明书

    Spark Streaming是Spark的一个模块,用于处理实时数据流。它通过微批处理的方式模拟实时流处理,将数据流分解为小批次,然后用Spark的批处理引擎处理。这种方式保持了Spark的高吞吐量和低延迟特性,同时提供了复杂...

    基于scala语言的spark操作,包含连接操作mysql,连接hdfs.zip

    Spark 提供了一个交互式的 Spark Shell,允许开发者使用 Scala 语言直接在命令行中测试和运行 Spark 代码。这对于快速原型开发和数据探索非常有用。 **4. Spark Scala API** Spark Scala API 提供了一系列类和方法...

    基于Apache Spark的Scala大数据处理设计源码

    本源码为基于Apache Spark的Scala大数据处理设计,共包含191个文件,其中crc文件56个,class文件39个,scala文件20个,bk文件10个,xml文件8个,txt文件2个,properties文件2个,idea/$PRODUCT_WORKSPACE_FILE$文件1...

    藏经阁-Streaming datasets for Personalization.pdf

    Spark Streaming支持所有基本的RDD转换和操作,包括时间窗口、Checkpoint支持等。 Spark Streaming的性能调整是非常重要的,需要根据实际情况选择合适的微批间隔、集群内存、并行度和CPU数量等参数。 四、个人化...

    Spark面试2000题

    面试中,性能优化也是一个常见的话题,如了解如何调整executor的数量、内存大小、shuffle行为,以及如何利用宽依赖和窄依赖优化任务执行效率。此外,理解Spark的持久化机制,如cached和checkpoint,以及错误处理和...

    实时指标计算引擎-Spark-Part_1_杨鑫_2019-12-19.pptx

    **Spark Streaming**是Spark的一个核心组件,专门用于处理实时数据流。它将输入数据流分解成一系列的小批量数据片段,然后使用Spark的核心API来处理这些数据片段。这种方式使得Spark Streaming既能处理实时数据流,...

Global site tag (gtag.js) - Google Analytics