`
m635674608
  • 浏览: 5060429 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

Spark Streaming原理简析

 
阅读更多

执行流程

数据的接收

StreamingContext实例化的时候,需要传入一个SparkContext,然后指定要连接的spark matser url,即连接一个spark engine,用于获得executor

实例化之后,首先,要指定一个接收数据的方式,如

val lines = ssc.socketTextStream("localhost"9999)

  • 1

    这样从socket接收文本数据。这个步骤返回的是一个ReceiverInputDStream的实现,内含Receiver,可接收数据并转化为RDD放内存里。

    ReceiverInputDStream有一个需要子类实现的方法

    def getReceiver(): Receiver[T]

  • 1

    子类实现这个方法,worker节点调用后能得到Receiver,使得数据接收的工作能分布到worker上。

    如果是local跑,由于Receiver接收数据在本地,所以在启动streaming application的时候,要注意分配的core数目要大于Receiver数目,才能腾出cpu做计算任务的调度。

    Receiver需要子类实现

    def onStart()def onStop()

  • 1
  • 2

    来定义一个数据接收器的初始化、接收到数据后如何存、如何在结束的时候释放资源。

    Receiver提供了一系列store()接口,如store(ByteBuffer)store(Iterator)等等。这些store接口是实现好了的,会由worker节点上初始化的ReceiverSupervisor来完成这些存储功能。ReceiverSupervisor还会对Receiver做监控,如监控是否启动了、是否停止了、是否要重启、汇报error等等。

    ReceiverSupervisor的存储接口的实现,借助的是BlockManager,数据会以RDD的形式被存放,根据StorageLevel选择不同存放策略。默认是序列化后存内存,放不下的话写磁盘(executor)。被计算出来的RDD中间结果,默认存放策略是序列化后只存内存。

    ReceiverSupervisor在做putBlock操作的时候,会首先借助BlockManager存好数据,然后往ReceiverTracker发送一个AddBlock的消息。ReceiverTracker内部的ReceivedBlockTracker用于维护一个receiver接收到的所有block信息,即BlockInfo,所以AddBlock会把信息存放在ReceivedBlockTracker里。未来需要计算的时候,ReceiverTracker根据streamId,从ReceivedBlockTracker取出对应的block列表。

    RateLimiter帮助控制Receiver速度,spark.streaming.receiver.maxRate参数。

    数据源方面,普通的数据源为file, socket, akka, RDDs。高级数据源为Twitter, Kafka, Flume等。开发者也可以自己定制数据源。

    任务调度

    JobSchedulercontext里初始化。当context start的时候,触发schedulerstart

    schedulerstart触发了ReceiverTrackerJobGeneratorstart。这两个类是任务调度的重点。前者在worker上启动Receiver接收数据,并且暴露接口能够根据streamId获得对应的一批Block地址。后者基于数据和时间来生成任务描述。

    JobScheduler内含一个线程池,用于调度任务执行。spark.streaming.concurrentJobs可以控制job并发度,默认是1,即它只能一个一个提job

    job来自JobGenerator生成的JobSetJobGenerator根据时间,生成job并且执行cp

    JobGenerator的生成job逻辑:

    调用ReceiverTrackerallocateBlocksToBatch方法,为本批数据分配好block,即准备好数据

    间接调用DStreamgenerateJob(time)方法,制造可执行的RDD

    DStream切分RDD和生成可执行的RDD,即getOrCompute(time)

    如果这个时间点的RDD已经生成好了,那么从内存hashmap里拿出来,否则下一步

    如果时间是批次间隔的整数倍,则下一步,否则这个时间点不切

    调用DStream的子类的compute方法,得到RDD。可能是一个RDD,也可以是个RDD列表

    对每个RDD,调用persist方法,制定默认的存储策略。如果时间点合适,同时调用RDDcheckpoint方法,制定好cp策略

    得到这些RDD后,调用SparkContext.runJob(rdd, emptyFunction)。把这整个变成一个function,生成Job类。未来会在executor上触发其runJob

    JobGenerator成功生成job后,调用JobScheduler.submitJobSet(JobSet)JobScheduler会使用线程池提交JobSet中的所有job。该方法调用结束后,JobGenerator发送一个DoCheckpoint的消息,注意这里的cpdriver端元数据的cp,而不是RDD本身的cp。如果time合适,会触发cp操作,内部的CheckpointWriter类会完成write(streamingContext, time)

    JobScheduler提交job的线程里,触发了jobrun()方法,同时,job跑完后,JobScheduler处理JobCompleted(job)。如果job跑成功了,调用JobSethandleJobCompletion(Job),做些计时和数数工作,如果整个JobSet完成了,调用JobGeneratoronBatchCompletion(time)方法,JobGenerator接着会做clearMetadata的工作,然后JobScheduler打印输出;如果job跑失败了,JobScheduler汇报error,最后会在context里抛异常。

    更多说明

    特殊操作

  • transform:可以与外部RDD交互,比如做维表的join
  • updateStateByKey:生成StateDStream,比如做增量计算。WordCount例子
  • 每一批都需要与增量RDD进行一次cogroup之后,然后执行update function。两个RDDcogroup过程有些开销:RDD[K, V]RDD[K, U]合成RDD[K, List[V], List[U]]List[U]一般size1,理解为oldvalue,即RDD[K, batchValueList, Option[oldValue]]。然后update function处理完,变成RDD[K, newValue]
  • 批与批之间严格有序,即增量合并操作,是有序的,批之间没发并发
  • 增量RDD的分区数可以开大,即这步增量的计算可以调大并发
  • windowbatch sizewindow length, sliding interval三个参数组成的滑窗操作。把多个批次的RDD合并成一个UnionRDD进行计算。
  • foreachRDD: 这个操作是一个输出操作,比较特殊。


    /**

    * Apply a function to each RDD in this DStream. This is an output operator, so

    * 'this' DStream will be registered as an output stream and therefore materialized.

    */


    def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) { new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register()

    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

    DStream.foreachRDD()操作使开发者可以直接控制RDD的计算逻辑,而不是通过DStream映射过去。所以借助这个方法,可以实现MLlib, Spark SQLStreaming的集合,如:结合Spark SQLDataFrameWordcount

    Cache

    如果是window操作,默认接收的数据都persist在内存里。

    如果是flume, kafka源头,默认接收的数据replicate成两份存起来。

    Checkpoint

    state有关的流计算,计算出来的结果RDD,会被cpHDFS上,原文如下:

    Data checkpointing - Saving of the generated RDDs to reliable storage. This is necessary in some stateful transformations that combine data across multiple batches. In such transformations, the generated RDDs depends on RDDs of previous batches, which causes the length of the dependency chain to keep increasing with time. To avoid such unbounded increase in recovery time (proportional to dependency chain), intermediate RDDs of stateful transformations are periodically checkpointed to reliable storage (e.g. HDFS) to cut off the dependency chains.

    cp的时间间隔也可以设定,可以多批做一次cp

    cp的操作是同步的。

    简单的不带state操作的流任务,可以不开启cp

    driver端的metadata也有cp策略。driver cp的时候是将整个StreamingContext对象写到了可靠存储里。

http://www.superwu.cn/?p=1926

分享到:
评论

相关推荐

    06Spark Streaming原理和实践

    ### Spark Streaming原理与实践 #### 一、为什么需要流处理? 传统的批处理框架如MapReduce在处理实时数据流时存在一些局限性,主要是因为每次处理一批数据就需要启动一个新任务,而任务的启动过程(包括输入切分...

    7.SparkStreaming(上)--SparkStreaming原理介绍.pdf

    7.SparkStreaming(上)--SparkStreaming原理介绍.pdf 7.SparkStreaming(下)--SparkStreaming实战.pdf 8.SparkMLlib(上)--机器学习及SparkMLlib简介.pdf 8.SparkMLlib(下)--SparkMLlib实战.pdf 9.SparkGraphX...

    基于Spark Streaming的大数据实时流计算平台和框架,并且是基于运行在yarn模式运行的spark streaming

    一个完善的Spark Streaming二次封装开源框架,包含:实时流任务调度、kafka偏移量管理,web后台管理,web api启动、停止spark streaming,宕机告警、自动重启等等功能支持,用户只需要关心业务代码,无需关注繁琐的...

    spark Streaming原理和实战

    ### Spark Streaming原理与实战 #### 一、Spark Streaming概述 **Spark Streaming** 是Apache Spark生态中的一个重要组件,它主要用于处理实时数据流。相比于传统的批处理技术,Spark Streaming提供了对实时数据流...

    SparkStreaming原理介绍

    ### Spark Streaming 原理详解 #### 1. Spark Streaming 简介 ##### 1.1 概述 Spark Streaming 是 Apache Spark 生态系统中的一个重要组成部分,它为实时流数据处理提供了一套完整的解决方案。相比于传统的批处理...

    kafka+spark streaming开发文档

    kafka+Spark Streaming开发文档 本文档主要讲解了使用Kafka和Spark Streaming进行实时数据处理的开发文档,涵盖了Kafka集群的搭建、Spark Streaming的配置和开发等内容。 一、Kafka集群搭建 首先,需要安装Kafka...

    spark Streaming和structed streaming分析

    Apache Spark Streaming是Apache Spark用于处理实时流数据的一个组件。它允许用户使用Spark的高度抽象概念处理实时数据流,并且可以轻松地与存储解决方案、批处理数据和机器学习算法集成。Spark Streaming提供了一种...

    SparkStreaming预研报告

    Spark Streaming预研报告覆盖了Apache Spark Streaming的主要方面,包括其简介、架构、编程模型以及性能调优。以下是基于文档提供内容的详细知识点: 1. Spark Streaming简介与渊源 Spark Streaming是Spark生态中...

    Hadoop原理与技术Spark Streaming操作实验

    2.理解Spark Streaming的工作原理。 3.学会使用Spark Streaming处理流式数据。 二、实验环境 Windows 10 VMware Workstation Pro虚拟机 Hadoop环境 Jdk1.8 三、实验内容 (一)Spark Streaming处理套接字流 1:编写...

    sparkStreaming消费数据不丢失

    sparkStreaming消费数据不丢失,sparkStreaming消费数据不丢失

    Flume对接Spark Streaming的相关jar包

    在大数据处理领域,Flume 和 Spark Streaming 是两个重要的工具,它们分别用于数据收集与实时流处理。本压缩包中的 jar 包是为了解决 Flume 与 Spark Streaming 的集成问题,确保数据能够从 Flume 无缝流转到 Spark ...

    SparkStreaming流式日志过滤与分析

    (1)利用SparkStreaming从文件目录读入日志信息,日志内容包含: ”日志级别、函数名、日志内容“ 三个字段,字段之间以空格拆分。请看数据源的文件。 (2)对读入都日志信息流进行指定筛选出日志级别为error或warn...

    spark之sparkStreaming 理解

    ### Spark Streaming工作原理 #### 四、工作流程 1. **实时数据接收**:Spark Streaming接收实时输入数据流。 2. **数据分批**:将数据流分割成一系列微小的批次数据。 3. **数据处理**:利用Spark Engine处理这些...

    SparkStreaming入门案例

    Spark Streaming 入门案例 Spark Streaming 是一种构建在 Spark 上的实时计算框架,用来处理大规模流式数据。它将从数据源(如 Kafka、Flume、Twitter、ZeroMQ、HDFS 和 TCP 套接字)获得的连续数据流,离散化成一...

    spark streaming相关15篇论文,包含几篇硕士论文,包含几篇期刊论,有的结合自然语言处理

    Spark Streaming 是 Apache Spark 的一个模块,专为实时数据流处理设计。它允许开发人员使用类似于...通过阅读这些论文,读者不仅可以学习到Spark Streaming的基本原理,还能了解到相关领域的前沿技术和实践经验。

    spark streaming

    首先,需要了解Spark Streaming的工作原理。Spark Streaming使用微批处理(micro-batching)模型来处理实时数据流。这种方式将流数据拆分成一系列小批次(batch),每个批次都会被当作一个小的批处理任务来处理。每...

    深入理解SparkStreaming执行模型

    Spark Streaming是Apache Spark的重要组成部分,它提供了一种高吞吐量、可容错的实时数据处理方式。Spark Streaming的核心是一个执行模型,这个执行模型基于微批处理(micro-batch processing)的概念,允许将实时数据...

    spark Streaming和storm的对比

    以下将详细介绍Spark Streaming和Storm的核心原理,并进行对比分析。 首先来了解Spark Streaming,它是由Spark提供的一种实时数据处理框架。Spark Streaming利用了Spark强大的批处理能力,并将流式计算转换成一系列...

    Spark Streaming 示例

    Spark Streaming 是 Apache Spark 的一个模块,它允许开发者处理实时数据流。这个强大的工具提供了一种弹性、容错性好且易于编程的模型,用于构建实时流处理应用。在这个"Spark Streaming 示例"中,我们将深入探讨...

    SparkStreaming和kafka的整合.pdf

    通过以上介绍,我们了解到Spark Streaming与Kafka结合使用的基本原理及其实现方式。这种方式能够高效地处理大规模的实时数据流,并支持复杂的业务逻辑处理。在实际应用中,还可以结合其他技术如Hadoop HDFS或Elastic...

Global site tag (gtag.js) - Google Analytics