`
m635674608
  • 浏览: 5053514 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

Spark Streaming:性能调优

 
阅读更多
数据接收并行度调优(一)
通过网络接收数据时(比如Kafka、Flume),会将数据反序列化,并存储在Spark的内存中。如果数据接收称为系统的瓶颈,那么可以考虑并行化数据接收。每一个输入DStream都会在某个Worker的Executor上启动一个Receiver,该Receiver接收一个数据流。因此可以通过创建多个输入DStream,并且配置它们接收数据源不同的分区数据,达到接收多个数据流的效果。比如说,一个接收两个Kafka Topic的输入DStream,可以被拆分为两个输入DStream,每个分别接收一个topic的数据。这样就会创建两个Receiver,从而并行地接收数据,进而提升吞吐量。多个DStream可以使用union算子进行聚合,从而形成一个DStream。然后后续的transformation算子操作都针对该一个聚合后的DStream即可。
 
int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String, String>>(numStreams);
for (int i = 0; i < numStreams; i++) {
  kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();
 
 
 
数据接收并行度调优(二)
数据接收并行度调优,除了创建更多输入DStream和Receiver以外,还可以考虑调节block interval。通过参数,spark.streaming.blockInterval,可以设置block interval,默认是200ms。对于大多数Receiver来说,在将接收到的数据保存到Spark的BlockManager之前,都会将数据切分为一个一个的block。而每个batch中的block数量,则决定了该batch对应的RDD的partition的数量,以及针对该RDD执行transformation操作时,创建的task的数量。每个batch对应的task数量是大约估计的,即batch interval / block interval。
 
例如说,batch interval为2s,block interval为200ms,会创建10个task。如果你认为每个batch的task数量太少,即低于每台机器的cpu core数量,那么就说明batch的task数量是不够的,因为所有的cpu资源无法完全被利用起来。要为batch增加block的数量,那么就减小block interval。然而,推荐的block interval最小值是50ms,如果低于这个数值,那么大量task的启动时间,可能会变成一个性能开销点。
 
 
 
数据接收并行度调优(三)
除了上述说的两个提升数据接收并行度的方式,还有一种方法,就是显式地对输入数据流进行重分区。使用inputStream.repartition(<number of partitions>)即可。这样就可以将接收到的batch,分布到指定数量的机器上,然后再进行进一步的操作。
 
 
 
任务启动调优
如果每秒钟启动的task过于多,比如每秒钟启动50个,那么发送这些task去Worker节点上的Executor的性能开销,会比较大,而且此时基本就很难达到毫秒级的延迟了。使用下述操作可以减少这方面的性能开销:
 
1、Task序列化:使用Kryo序列化机制来序列化task,可以减小task的大小,从而减少发送这些task到各个Worker节点上的Executor的时间。
2、执行模式:Standalone模式下运行Spark,可以达到更少的task启动时间
 
上述方式,也许可以将每个batch的处理时间减少100毫秒。从而从秒级降到毫秒级。
 
 
 
数据处理并行度调优
如果在计算的任何stage中使用的并行task的数量没有足够多,那么集群资源是无法被充分利用的。举例来说,对于分布式的reduce操作,比如reduceByKey和reduceByKeyAndWindow,默认的并行task的数量是由spark.default.parallelism参数决定的。你可以在reduceByKey等操作中,传入第二个参数,手动指定该操作的并行度,也可以调节全局的spark.default.parallelism参数。
 
 
 
数据序列化调优(一)
数据序列化造成的系统开销可以由序列化格式的优化来减小。在流式计算的场景下,有两种类型的数据需要序列化。
 
1、输入数据:默认情况下,接收到的输入数据,是存储在Executor的内存中的,使用的持久化级别是StorageLevel.MEMORY_AND_DISK_SER_2。这意味着,数据被序列化为字节从而减小GC开销,并且会复制以进行executor失败的容错。因此,数据首先会存储在内存中,然后在内存不足时会溢写到磁盘上,从而为流式计算来保存所有需要的数据。这里的序列化有明显的性能开销——Receiver必须反序列化从网络接收到的数据,然后再使用Spark的序列化格式序列化数据。
 
2、流式计算操作生成的持久化RDD:流式计算操作生成的持久化RDD,可能会持久化到内存中。例如,窗口操作默认就会将数据持久化在内存中,因为这些数据后面可能会在多个窗口中被使用,并被处理多次。然而,不像Spark Core的默认持久化级别,StorageLevel.MEMORY_ONLY,流式计算操作生成的RDD的默认持久化级别是StorageLevel.MEMORY_ONLY_SER ,默认就会减小GC开销。
 
 
 
 
数据序列化调优(二)
在上述的场景中,使用Kryo序列化类库可以减小CPU和内存的性能开销。使用Kryo时,一定要考虑注册自定义的类,并且禁用对应引用的tracking(spark.kryo.referenceTracking)。
 
在一些特殊的场景中,比如需要为流式应用保持的数据总量并不是很多,也许可以将数据以非序列化的方式进行持久化,从而减少序列化和反序列化的CPU开销,而且又不会有太昂贵的GC开销。举例来说,如果你数秒的batch interval,并且没有使用window操作,那么你可以考虑通过显式地设置持久化级别,来禁止持久化时对数据进行序列化。这样就可以减少用于序列化和反序列化的CPU性能开销,并且不用承担太多的GC开销。
 
 
batch interval调优(最重要)
如果想让一个运行在集群上的Spark Streaming应用程序可以稳定,它就必须尽可能快地处理接收到的数据。换句话说,batch应该在生成之后,就尽可能快地处理掉。对于一个应用来说,这个是不是一个问题,可以通过观察Spark UI上的batch处理时间来定。batch处理时间必须小于batch interval时间。
 
基于流式计算的本质,batch interval对于,在固定集群资源条件下,应用能保持的数据接收速率,会有巨大的影响。例如,在WordCount例子中,对于一个特定的数据接收速率,应用业务可以保证每2秒打印一次单词计数,而不是每500ms。因此batch interval需要被设置得,让预期的数据接收速率可以在生产环境中保持住。
 
为你的应用计算正确的batch大小的比较好的方法,是在一个很保守的batch interval,比如5~10s,以很慢的数据接收速率进行测试。要检查应用是否跟得上这个数据速率,可以检查每个batch的处理时间的延迟,如果处理时间与batch interval基本吻合,那么应用就是稳定的。否则,如果batch调度的延迟持续增长,那么就意味应用无法跟得上这个速率,也就是不稳定的。因此你要想有一个稳定的配置,可以尝试提升数据处理的速度,或者增加batch interval。记住,由于临时性的数据增长导致的暂时的延迟增长,可以合理的,只要延迟情况可以在短时间内恢复即可。
 
 
 
内存调优(一)
优化Spark应用的内存使用和GC行为,在Spark Core的调优中,已经讲过了。这里讲一下与Spark Streaming应用相关的调优参数。
 
Spark Streaming应用需要的集群内存资源,是由使用的transformation操作类型决定的。举例来说,如果想要使用一个窗口长度为10分钟的window操作,那么集群就必须有足够的内存来保存10分钟内的数据。如果想要使用updateStateByKey来维护许多key的state,那么你的内存资源就必须足够大。反过来说,如果想要做一个简单的map-filter-store操作,那么需要使用的内存就很少。
 
通常来说,通过Receiver接收到的数据,会使用StorageLevel.MEMORY_AND_DISK_SER_2持久化级别来进行存储,因此无法保存在内存中的数据会溢写到磁盘上。而溢写到磁盘上,是会降低应用的性能的。因此,通常是建议为应用提供它需要的足够的内存资源。建议在一个小规模的场景下测试内存的使用量,并进行评估。
 
 
内存调优(二)
内存调优的另外一个方面是垃圾回收。对于流式应用来说,如果要获得低延迟,肯定不想要有因为JVM垃圾回收导致的长时间延迟。有很多参数可以帮助降低内存使用和GC开销:
 
1、DStream的持久化:正如在“数据序列化调优”一节中提到的,输入数据和某些操作生产的中间RDD,默认持久化时都会序列化为字节。与非序列化的方式相比,这会降低内存和GC开销。使用Kryo序列化机制可以进一步减少内存使用和GC开销。进一步降低内存使用率,可以对数据进行压缩,由spark.rdd.compress参数控制(默认false)。
 
2、清理旧数据:默认情况下,所有输入数据和通过DStream transformation操作生成的持久化RDD,会自动被清理。Spark Streaming会决定何时清理这些数据,取决于transformation操作类型。例如,你在使用窗口长度为10分钟内的window操作,Spark会保持10分钟以内的数据,时间过了以后就会清理旧数据。但是在某些特殊场景下,比如Spark SQL和Spark Streaming整合使用时,在异步开启的线程中,使用Spark SQL针对batch RDD进行执行查询。那么就需要让Spark保存更长时间的数据,直到Spark SQL查询结束。可以使用streamingContext.remember()方法来实现。
 
3、CMS垃圾回收器:使用并行的mark-sweep垃圾回收机制,被推荐使用,用来保持GC低开销。虽然并行的GC会降低吞吐量,但是还是建议使用它,来减少batch的处理时间(降低处理过程中的gc开销)。如果要使用,那么要在driver端和executor端都开启。在spark-submit中使用--driver-java-options设置;使用spark.executor.extraJavaOptions参数设置。-XX:+UseConcMarkSweepGC。

http://blog.csdn.net/kwu_ganymede/article/details/50577920

分享到:
评论

相关推荐

    Spark 编程指南简体中文版.pdf

    * 性能调优:Spark SQL 的性能调优技巧 * SQL 接口:Spark SQL 的 SQL 接口,包括 Language-Integrated 的相关查询 GraphX * GraphX:Spark 的图形处理模块 * 属性图:GraphX 的基本数据结构 * 图操作符:GraphX ...

    Spark大数据商业实战三部曲_内核解密_商业案例_性能调优 实例源码

    每个案例都详细展示了如何在实际业务场景中运用Spark,以及如何针对特定问题进行性能调优。 通过学习这套《Spark大数据商业实战三部曲》,开发者可以提升在大数据处理、实时分析和机器学习等方面的能力,为商业决策...

    SparkStreaming预研报告

    Spark Streaming预研报告覆盖了Apache Spark Streaming的主要方面,包括其简介、架构、编程模型以及性能调优。以下是基于文档提供内容的详细知识点: 1. Spark Streaming简介与渊源 Spark Streaming是Spark生态中...

    Spark内核剖析+调优全套教程 附课件、代码、资料

    Spark内核深度剖析 Spark调优 SparkSQL精讲 SparkStreaming精讲 Spark2新特性

    Spark内核机制解析及性能调优

    本资料《Spark内核机制解析及性能调优》深入探讨了Spark的核心原理以及如何对其进行优化,以提升大数据处理的效率。 首先,Spark的核心组件包括Driver、Executor、RDD(弹性分布式数据集)和DAG Scheduler。Driver...

    Spark: The Definitive Guide: Big Data Processing Made Simple 英文.pdf版

    书中还将探讨Spark的性能优化策略,如Tungsten内存管理和Code Generation,以及如何在集群环境中部署和管理Spark应用程序。此外,还会有实践案例来展示如何在实际业务场景中运用Spark解决复杂问题。 对于希望深入...

    大数据各类性能调优

    - Spark Streaming的性能调优主要关注点在于: - 调整Batch Duration。 - 优化Checkpoint配置。 - 使用Window函数等。 ##### 12.9.4 Spark CBO调优 - Spark CBO的调优主要涉及: - 优化统计信息收集。 - 合理...

    Spark Streaming Systems - Tyler Akidau

    8. 性能优化:除了理论知识,书里还讲解了如何调整参数、优化执行计划,以及如何针对特定工作负载进行性能调优。 总的来说,《Spark Streaming Systems》是理解Spark Streaming原理和实践的宝贵资源,无论你是初学...

    Spark从入门到精通

    本课程主要讲解的内容包括:Scala编程、Hadoop与Spark集群搭建、Spark核心编程、Spark内核源码深度剖析、Spark性能调优、Spark SQL、Spark Streaming。 本课程的最大特色包括: 1、代码驱动讲解Spark的各个技术点...

    SparkStreaming原理介绍

    ##### 2.3 容错、持久化和性能调优 **2.3.1 容错** Spark Streaming 通过以下几种方式实现了容错: 1. **Checkpointing**:定期将 DStream 的状态信息保存到可靠的存储系统中,以便在发生故障时恢复。 2. **RDD ...

    基于Spark Streaming + Kafka + Flume 实现的日志收集处理系统.zip

    《Spark Streaming + Kafka + Flume 日志收集处理系统的构建与应用》 在大数据处理领域,实时数据流处理已经成为不可或缺的一部分。Spark Streaming、Kafka和Flume作为三个关键组件,共同构建了一个高效、可靠且可...

    大数据技术分享 Spark技术讲座 深入探索具有高级性能调优的SQL Spark 共45页.pdf

    ### 大数据技术分享:深入探索Spark SQL与高级性能调优 #### 一、引言 随着大数据时代的到来,高效处理海量数据成为企业面临的重要挑战之一。Apache Spark作为一款开源的大规模数据处理框架,因其出色的性能表现而...

    深入理解spark:核心思想与源码分析 高清版本

    9. 性能优化:Spark的性能优化是学习的重点之一,包括配置调优、内存管理、shuffle操作优化等,这些技巧能够显著提升Spark应用程序的运行效率。 10. 实战案例:作为优秀的教材,该书很可能包含丰富的实战案例,通过...

    深入理解Spark:核心思想及源码分析.pdf

    Spark的性能优化是重要一环,包括配置调优、内存管理优化、数据序列化、减少shuffle操作等。理解这些优化策略对于实际项目中提升Spark性能至关重要。 8. **案例研究**: 可能会包含一些实际应用案例,如推荐系统...

    Spark大数据处理:技术、应用与性能优化(全)

    本资源“Spark大数据处理:技术、应用与性能优化(全)”全面覆盖了Spark的核心技术、实际应用场景以及性能调优策略,旨在帮助读者深入理解并熟练运用Spark。 1. Spark核心技术: - RDD(弹性分布式数据集):Spark...

    spark入门实战

    - **深入了解运行计划及调优:**详细解析了Spark SQL的查询优化策略,包括如何制定有效的查询计划来提高性能。 - **Spark实战应用:**通过具体案例展示了如何利用Spark SQL进行数据分析。 - **2.5 Spark ...

    spark调优.rar

    1. **资源调优**:Spark运行在分布式环境中,资源管理是性能优化的基础。这包括合理分配Executor的数量、内存和CPU核心。Executor是Spark执行任务的工作单元,它们负责执行任务并缓存数据。增加Executor数量可以减少...

    Spark The Definitive Guide-201712

    7. **Spark性能调优**:书中会讲解如何优化Spark应用程序,包括配置参数、内存管理、数据序列化和缓存策略等。 8. **Spark部署**:Spark可以运行在多种模式下,如本地模式、standalone模式、Mesos、YARN或...

    基于Spark Streaming将图片以流的方式写入HDFS分布式文件系统.zip

    6. 监控与性能优化:通过监控系统性能指标,如延迟、吞吐量等,对系统进行调优,确保实时处理性能。 四、系统优势 该系统利用Spark Streaming的实时处理能力和HDFS的分布式存储能力,可以高效地处理大量图片数据,...

Global site tag (gtag.js) - Google Analytics