`
student_lp
  • 浏览: 436553 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论
阅读更多

1.   Spark Streaming

提到spark streaming,我们就必须了解一下BDASBerkeley Data Analytics Stack),这个伯克利大学提出的关于数据分析的软件栈。从它的视角来看,目前的大数据处理可以分为如下三个类型:

  • 复杂的批量数据处理(batch data processing),通常的时间跨度在数十分钟到数小时之间;
  • 基于历史数据的交互式查询(interactive query),通常的时间跨度在数十秒到数分钟之间;
  • 基于实时数据流的数据处理(streaming data processing),通常的时间跨度在数百毫秒到数秒之间;

目前已经有很多相对成熟的开源软件来处理以上三种情况,我们可以利用:MapReduce来进行批量数据处理;可以用Impala进行交互式查询;对于流式数据处理,我们可以采用storm。对于大多数互联网公司来说,一般都会同时遇到以上三种情况,那么在使用的过程中这些公司可能会遇到如下的不便:

  • 三种情况的输入输出数据无法无缝共享,需要进行格式相互转换;
  • 每个开源软件都需要一个开发和维护团队,提高了成本;
  • 在同一个集群中对各个系统协调资源分配比较困难;

BDAS就是以Spark为基础的一套软件栈,利用基于内存的通用计算模型将以上三种情景一网打尽,同时支持BatchInteractiveStreaming的处理,且兼容支持HDFSS3等分布式文件系统,可以部署在YARNMesos等流行的集群资源管理器之上。BDAS的构架如下图所示,其中Spark可以替代MapReduce进行批处理,利用其基于内存的特点,特别擅长迭代式和交互式数据处理;Shark处理大规模数据的SQL查询,兼容HiveHQL。本文要重点介绍的Spark Streaming,在整个BDAS中进行大规模流式处理。


1.1.  Spark Streaming架构

1.1.1.计算流程

Spark Streaming是将流式计算分解成一系列短小的批处理作业。这里的批处理引擎是Spark,也就是把Spark Streaming的输入数据按照batch size(如1秒)分成一段一段的数据(Discretized Stream),每一段数据都转换成Spark中的RDDResilient Distributed Dataset),然后将Spark Streaming中对DStreamTransformation操作变为针对Spark中对RDDTransformation操作,将RDD经过操作变成中间结果保存在内存中。整个流式计算根据业务的需求可以对中间的结果进行叠加,或者存储到外部设备。下图显示了Spark Streaming的整个流程。


1.1.2.容错性

对于流式计算来说,容错性至关重要。首先我们要明确一下SparkRDD的容错机制。每一个RDD都是一个不可变的分布式可重算的数据集,其记录着确定性的操作继承关系(lineage),所以只要输入数据是可容错的,那么任意一个RDD的分区(Partition)出错或不可用,都是可以利用原始输入数据通过转换操作而重新算出的。


对于Spark Streaming来说,其RDD的传承关系如上图所示,图中的每一个椭圆形表示一个RDD,椭圆形中的每个圆形代表一个RDD中的一个Partition,图中的每一列的多个RDD表示一个DStream(图中有三个DStream),而每一行最后一个RDD则表示每一个Batch Size所产生的中间结果RDD。我们可以看到图中的每一个RDD都是通过lineage相连接的,由于Spark Streaming输入数据可以来自于磁盘,例如HDFS(多份拷贝)或是来自于网络的数据流(Spark Streaming会将网络输入数据的每一个数据流拷贝两份到其他的机器)都能保证容错性。所以RDD中任意的Partition出错,都可以并行地在其他机器上将缺失的Partition计算出来。这个容错恢复方式比连续计算模型(如Storm)的效率更高。 

1.1.3.实时性

对于实时性的讨论,会牵涉到流式处理框架的应用场景。Spark Streaming将流式计算分解成多个Spark Job,对于每一段数据的处理都会经过Spark DAG图分解,以及Spark的任务集的调度过程。对于目前版本的Spark Streaming而言,其最小的Batch Size的选取在0.5~2秒钟之间(Storm目前最小的延迟是100ms左右),所以Spark Streaming能够满足除对实时性要求非常高(如高频实时交易)之外的所有流式准实时计算场景。

1.1.4.扩展性与吞吐量

Spark目前在EC2上已能够线性扩展到100个节点(每个节点4Core),可以以数秒的延迟处理6GB/s的数据量(60M records/s),其吞吐量也比流行的Storm25倍,下图是Berkeley利用WordCountGrep两个用例所做的测试,在Grep这个测试中,Spark Streaming中的每个节点的吞吐量是670k records/s,而Storm115k records/s


1.2. Spark Streaming的编程模型 

Spark Streaming的编程和Spark的编程如出一辙,对于编程的理解也非常类似。对于Spark来说,编程就是对于RDD的操作;而对于Spark Streaming来说,就是对DStream的操作。下面将通过一个大家熟悉的WordCount的例子来说明Spark Streaming中的输入操作、转换操作和输出操作。 

1.2.1.Spark Streaming初始化

在开始进行DStream操作之前,需要对Spark Streaming进行初始化生成StreamingContext。参数中比较重要的是第一个和第三个,第一个参数是指定Spark Streaming运行的集群地址,而第三个参数是指定Spark Streaming运行时的batch窗口大小。在这个例子中就是将1秒钟的输入数据进行一次Spark Job处理。

val ssc = new StreamingContext(“Spark://…”, “WordCount”, Seconds(1), [Homes], [Jars]) 

1.2.2.  Spark Streaming的输入操作

目前Spark Streaming已支持了丰富的输入接口,大致分为两类:一类是磁盘输入,如以batch size作为时间间隔监控HDFS文件系统的某个目录,将目录中内容的变化作为Spark Streaming的输入;另一类就是网络流的方式,目前支持KafkaFlumeTwitterTCP socket。在WordCount例子中,假定通过网络socket作为输入流,监听某个特定的端口,最后得出输入DStreamlines)。

val lines = ssc.socketTextStream(“localhost”,8888)

1.2.3.Spark Streaming的转换操作

Spark RDD的操作极为类似,Spark Streaming也就是通过转换操作将一个或多个DStream转换成新的DStream。常用的操作包括mapfilterflatmapjoin,以及需要进行shuffle操作的groupByKey/reduceByKey等。在WordCount例子中,我们首先需要将DStream(lines)切分成单词,然后将相同单词的数量进行叠加, 最终得到的wordCounts就是每一个batch size的(单词,数量)中间结果。 

val words = lines.flatMap(_.split(“ ”))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

另外,Spark Streaming有特定的窗口操作,窗口操作涉及两个参数:一个是滑动窗口的宽度(Window Duration);另一个是窗口滑动的频率(Slide Duration),这两个参数必须是batch size的倍数。例如以过去5秒钟为一个输入窗口,每1秒统计一下WordCount,那么我们会将过去5秒钟的每一秒钟的WordCount都进行统计,然后进行叠加,得出这个窗口中的单词统计。 

val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, Seconds(5s)seconds(1))

但上面这种方式还不够高效。如果我们以增量的方式来计算就更加高效,例如,计算t+4秒这个时刻过去5秒窗口的WordCount,那么我们可以将t+3时刻过去5秒的统计量加上[t+3t+4]的统计量,在减去[t-2t-1]的统计量(如图5所示),这种方法可以复用中间三秒的统计量,提高统计的效率。 

val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(5s)seconds(1))


1.2.4.Spark Streaming的输出操作

对于输出操作,Spark提供了将数据打印到屏幕及输入到文件中。在WordCount中我们将DStream wordCounts输入到HDFS文件中。

wordCounts = saveAsHadoopFiles(“WordCount”)

1.2.5.Spark Streaming启动

经过上述的操作,Spark Streaming还没有进行工作,我们还需要调用Start操作,Spark Streaming才开始监听相应的端口,然后收取数据,并进行统计。

ssc.start()

1.3. Spark Streaming案例分析 

在互联网应用中,网站流量统计作为一种常用的应用模式,需要在不同粒度上对不同数据进行统计,既有实时性的需求,又需要涉及到聚合、去重、连接等较为复杂的统计需求。传统上,若是使用Hadoop MapReduce框架,虽然可以容易地实现较为复杂的统计需求,但实时性却无法得到保证;反之若是采用Storm这样的流式框架,实时性虽可以得到保证,但需求的实现复杂度也大大提高了。Spark Streaming在两者之间找到了一个平衡点,能够以准实时的方式容易地实现较为复杂的统计需求。

1.3.1.  KafkaSpark Streaming统计框架

下面介绍一下使用KafkaSpark Streaming搭建实时流量统计框架。 

  • 数据暂存:Kafka作为分布式消息队列,既有非常优秀的吞吐量,又有较高的可靠性和扩展性,在这里采用Kafka作为日志传递中间件来接收日志,抓取客户端发送的流量日志,同时接受Spark Streaming的请求,将流量日志按序发送给Spark Streaming集群。
  • 数据处理:将Spark Streaming集群与Kafka集群对接,Spark StreamingKafka集群中获取流量日志并进行处理。Spark Streaming会实时地从Kafka集群中获取数据并将其存储在内部的可用内存空间中。当每一个batch窗口到来时,便对这些数据进行处理。 
  • 结果存储:为了便于前端展示和页面请求,处理得到的结果将写入到数据库中。

1.3.2.Kafka+Spark Streaming架构优点

相比于传统的处理框架,Kafka+Spark Streaming的架构有以下几个优点。

  • Spark框架的高效和低延迟保证了Spark Streaming操作的准实时性。
  • 利用Spark框架提供的丰富API和高灵活性,可以精简地写出较为复杂的算法。 
  • 编程模型的高度一致使得上手Spark Streaming相当容易,同时也可以保证业务逻辑在实时处理和批处理上的复用。 

在基于Kafka+Spark Streaming的流量统计应用运行过程中,有时会遇到内存不足、GC阻塞等各种问题。下面介绍一下如何对Spark Streaming应用程序进行调优来减少甚至避免这些问题的影响。 

1.3.3.性能调优

    优化运行时间

  • 增加并行度。确保使用整个集群的资源,而不是把任务集中在几个特定的节点上。对于包含shuffle的操作,增加其并行度以确保更为充分地使用集群资源。
  • 减少数据序列化、反序列化的负担。Spark Streaming默认将接收到的数据序列化后存储以减少内存的使用。但序列化和反序列化需要更多的CPU时间,因此更加高效的序列化方式(Kryo)和自定义的序列化接口可以更高效地使用CPU 
  • 设置合理的batch窗口。在Spark Streaming中,Job之间有可能存在着依赖关系,后面的Job必须确保前面的Job执行结束后才能提交。若前面的Job执行时间超出了设置的batch窗口,那么后面的Job就无法按时提交,这样就会进一步拖延接下来的Job,造成后续Job的阻塞。因此,设置一个合理的batch窗口确保Job能够在这个batch窗口中结束是必须的。 
  • 减少任务提交和分发所带来的负担。通常情况下Akka框架能够高效地确保任务及时分发,但当batch窗口非常小(500ms)时,提交和分发任务的延迟就变得不可接受了。使用Standalone模式和Coarse-grained Mesos模式通常会比使用Fine-Grained Mesos模式有更小的延迟。 

    优化内存使用

  • 控制batch sizeSpark Streaming会把batch窗口内接收到的所有数据存放在Spark内部的可用内存区域中,因此必须确保当前节点Spark的可用内存至少能够容纳这个batch窗口内所有的数据,否则必须增加新的资源以提高集群的处理能力。
  • 及时清理不再使用的数据。上面说到Spark Streaming会将接收到的数据全部存储于内部的可用内存区域中,因此对于处理过的不再需要的数据应及时清理以确保Spark Streaming有富余的可用内存空间。通过设置合理的spark.cleaner.ttl时长来及时清理超时的无用数据。 
  • 观察及适当调整GC策略。GC会影响Job的正常运行,延长Job的执行时间,引起一系列不可预料的问题。观察GC的运行情况,采取不同的GC策略以进一步减小内存回收对Job运行的影响。 

1.4.  总结 

Spark Streaming提供了一套高效、可容错的准实时大规模流式处理框架,它能和批处理及即时查询放在同一个软件栈中,降低学习成本。如果你学会了Spark编程,那么也就学会了Spark Streaming编程,如果理解了Spark的调度和存储,那么Spark Streaming也类似。对开源软件感兴趣的读者,我们可以一起贡献社区。目前Spark已在Apache孵化器中。按照目前的发展趋势,Spark Streaming一定将会得到更大范围的使用。 

1.5.编程实例

1.5.1.增量分析

package com.cyou.spark

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._
import org.apache.spark.util.IntParam
import org.apache.spark.storage.StorageLevel

object FlumeCount {

  def main(args: Array[String]): Unit = {
    if(args.length < 2){
      System.out.println("Plase input <hostname> and <port>!")
      System.exit(1)
    }
    val Array(host,port) = args
    val batchInterval = Seconds(5)
    val sparkConf = new SparkConf().setMaster("spark://20.hadoop.cyou.com:7077").setAppName("FlumeEventCount")
    val ssc = new StreamingContext(sparkConf, batchInterval)
    val stream = FlumeUtils.createStream(ssc, host, port.toInt, StorageLevel.MEMORY_ONLY_SER_2)
    stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
    
    ssc.start()
    ssc.awaitTermination()
  }

}

1.5.2.窗口累计分析

package com.cyou.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.Minutes

object WindowCount {
  
	def main(args: Array[String]): Unit = {
	    if(args.length < 2){
	      System.out.println("Plase input <hostname> and <port>!")
	      System.exit(1)
	    }
	    val Array(host,port) = args
	    val batchInterval = Seconds(1)
	    val sparkConf = new SparkConf().setMaster("spark://20.hadoop.cyou.com:7077").setAppName("WindowEventCount")
	    val ssc = new StreamingContext(sparkConf, batchInterval)
	    ssc.checkpoint("hdfs://54.hadoop.cyou.com:8020//logs/spark/windows")
	    val stream = FlumeUtils.createStream(ssc, host, port.toInt, StorageLevel.MEMORY_ONLY_SER_2)
	    stream.countByWindow(Minutes(60),Seconds(5)).map(cnt => "Received " + cnt + " flume events." ).print()
	    
	    ssc.start()
	    ssc.awaitTermination()
  }
}
  • 大小: 116.7 KB
  • 大小: 183.1 KB
  • 大小: 132.4 KB
  • 大小: 87.2 KB
  • 大小: 100.8 KB
分享到:
评论

相关推荐

    spark之sparkStreaming 理解

    ### Spark Streaming概述 #### 一、Spark Streaming定义与特点 **Spark Streaming** 是Apache Spark生态中的一个重要组件,它主要用于处理实时数据流。该模块构建在基础Spark API之上,旨在实现可扩展、高吞吐量...

    SparkStreaming原理介绍

    ### Spark Streaming 原理详解 #### 1. Spark Streaming 简介 ##### 1.1 概述 Spark Streaming 是 Apache Spark 生态系统中的一个重要组成部分,它为实时流数据处理提供了一套完整的解决方案。相比于传统的批处理...

    基于spark streaming和kafka,hbase的日志统计分析系统.zip

    《基于Spark Streaming、Kafka与HBase的日志统计分析系统详解》 在现代大数据处理领域,实时数据流处理已经成为不可或缺的一部分。本项目“基于Spark Streaming和Kafka,Hbase的日志统计分析系统”是一个综合性的...

    spark-mqtt-sample:使用Spark Streaming的简单MQTT客户端

    **Spark Streaming MQTT客户端详解** 在当今的大数据处理领域,Apache Spark以其高效、实时的数据处理能力备受青睐。Spark Streaming是Spark框架的一个组件,专门用于处理连续的数据流,它提供了丰富的API来构建...

    Spark Streaming

    #### 三、Spark Streaming 实现方案详解 **1. 数据源** Spark Streaming 支持多种数据源,包括但不限于 Kafka Streams、Flume Streams、File Streams 和 Network Streams。在这个案例中,使用的是自定义的数据源 ...

    基于ELK Stack 和 Spark Streaming 的日志处理平台.docx

    【基于ELK Stack 和 Spark Streaming的日志处理平台】 在大数据时代,日志处理变得至关重要,因为海量数据的生成需要高效、可靠且具有实时性的解决方案。ELK Stack(Elasticsearch, Logstash, Kibana)和Spark ...

    基于Spark streaming+Kafka+RedisHBase的GBDT+LR推荐排序模型.zip

    《基于Spark Streaming+Kafka+RedisHBase的GBDT+LR推荐排序模型详解》 在当前大数据时代,人工智能(AI)技术已经深入到各个领域,其中推荐系统作为一项重要的应用,已经成为电商、社交网络等平台提升用户体验的...

    SparkStreamingKafka:Spark Streaming日志到kafka

    **Spark Streaming与Kafka集成详解** Spark Streaming是Apache Spark项目的一部分,它提供了一个高级抽象来处理实时数据流。Kafka是一种流行的分布式消息系统,用于在应用程序之间高效地传输大量数据。将Spark ...

    Spark-Streaming编程指南.docx

    ### Spark Streaming 编程指南详解 #### 概览 Spark Streaming是Apache Spark生态系统中的一个重要组成部分,主要用于处理实时数据流。其核心能力在于能够提供高吞吐量与容错机制,适用于大规模实时数据处理场景。...

    spark详解 PDF 下载

    通过《Spark详解》这本书,读者将能够了解Spark的安装与配置,学习如何使用Java API创建Spark程序,掌握Spark SQL和Spark Streaming的基本用法,以及如何在实际项目中应用Spark进行大规模数据处理和分析。...

    基于spark的电影点评系统

    **基于Spark的电影点评系统详解** 本项目是一个大三下学期的课程设计,核心是利用Apache Spark构建一个电影点评系统,旨在对用户的行为数据进行分析,以便为用户提供个性化的电影推荐。Spark作为大数据处理框架,以...

    Spark从入门到精通

    本课程主要讲解的内容包括:Scala编程、Hadoop与Spark集群搭建、Spark核心编程、Spark内核源码深度剖析、Spark性能调优、Spark SQL、Spark Streaming。 本课程的最大特色包括: 1、代码驱动讲解Spark的各个技术点...

    Spark源码剖析

    2. **Spark 核心组件**:Spark 包含多个组件,如 Spark Core、Spark SQL、Spark Streaming、MLlib 和 GraphX。Spark Core 提供基本的调度、内存管理和故障恢复功能;Spark SQL 提供了 SQL 查询接口,支持与多种数据...

    Hadoop+Spark生态详解.zip

    这份“Hadoop+Spark生态详解.zip”压缩包文件提供了关于这两个生态系统的详细介绍,以及相关的实战应用。 Hadoop是Apache基金会开发的一个开源框架,主要用于处理和存储大量数据。其核心组件包括HDFS(Hadoop ...

    基于spark-streaming的实时数仓.zip

    《基于Spark-Streaming的实时数仓构建详解》 在当今大数据时代,实时数据处理与分析已经成为企业业务发展的关键。Spark作为一个高效、通用的大数据处理框架,因其强大的性能和易用性,深受业界青睐。其中,Spark-...

    spark原理与调优详解

    spark原理与调优详解 Spark 是一种基于内存的分布式计算框架,旨在高效地处理大规模数据。下面是 Spark 的原理和调优详解。 Spark 背景和安装 Spark 的产生背景是为了解决传统 MapReduce 框架的不足之处,如计算...

    Spark技术内幕

    此外,源码详解部分会深入到Spark的内部机制,比如任务调度算法、内存管理策略和数据序列化等。 Spark的源码阅读对于深入理解其工作原理至关重要。读者可以从中了解到Spark如何实现高效的分布式计算,以及如何通过...

    spark全套学习资料.zip

    04_尚硅谷大数据技术之SparkStreaming.docx详细阐述了DStream的概念、窗口操作、状态管理和容错机制,以及与其他流处理框架的对比。 四、Spark内核详解 Spark的高性能主要得益于其内存计算模型。05_尚硅谷大数据...

Global site tag (gtag.js) - Google Analytics