`
m635674608
  • 浏览: 5028637 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

Spark Streaming实践和优化

 
阅读更多

在流式计算领域,Spark Streaming和Storm时下应用最广泛的两个计算引擎。其中,Spark Streaming是Spark生态系统中的重要组成部分,在实现上复用Spark计算引擎。如图1所示,Spark Streaming支持的数据源有很多,如Kafka、Flume、TCP等。Spark Streaming的内部数据表示形式为DStream(Discretized Stream,离散的数据流),其接口设计与RDD非常相似,这使得它对Spark用户非常友好。Spark Streaming的核心思想是把流式处理转化为“微批处理”,即以时间为单位切分数据流,每个切片内的数据对应一个RDD,进而可以采用Spark引擎进行快速计算。由于Spark Streaming采用了微批处理方式,因此严格来说只是一个近实时的处理系统,而不是真正的流式处理系统。

图片描述

 

图1 Spark Streaming数据流

 

Storm是这个领域另一个著名的开源流式计算引擎,这是一个真正的流式处理系统,它每次从数据源读一条数据,然后单独处理。相比于Spark Streaming,Storm有更快速的响应时间(小于一秒),更适合低延迟的应用场景,比如信用卡欺诈系统,广告系统等。但是对比Storm,Spark Streaming的优势是吞吐量大,响应时间也可以接受(秒级),并且兼容Spark系统中的其他工具库,如MLlib和GraphX。从而,对于时间不敏感且流量很大的系统,Spark Streaming是更优的选择。

Spark Streaming在Hulu应用

Hulu是美国的专业在线视频网站,每天会有大量用户在线观看视频,进而产生大量用户观看的行为数据。这些数据通过收集系统进入Hulu的大数据平台,存储并做进一步处理。在大数据平台之上,各个团队会根据需要设计相应的算法对数据进行分析和挖掘以便产生商业价值:推荐团队从这些数据里挖掘出用户感兴趣的内容并做精准推荐,广告团队根据用户的历史行为推送最合适的广告,数据团队从数据的各个维度进行分析从而为公司的策略制定提供可靠依据。

Hulu大数据平台的实现依循Lambda架构。Lambda架构是一个通用的大数据处理框架,包含离线的批处理层、在线的加速层和服务层三部分,具体如图2所示。服务层一般使用HTTP服务或自定制的客户端对外提供数据访问,离线的批处理层一般使用批处理计算框架Spark和MapReduce进行数据分析,在线的加速层一般使用流式实时计算框架Spark Streaming和Storm进行数据分析。

图片描述

 

图2 lambda架构原理图

 

图片描述

图3 Hulu数据收集流程

 

对于实时计算部分,Hulu内部使用了Kafka、Codis和Spark Streaming。下面按照数据流的过程,介绍我们的项目。

收集数据

从服务器日志中收集数据,主要包括两个部分:

  • 来自网页、手机App、机顶盒等设备用户产生的视频观看、广告点击等行为,这些行为数据记录在各自的Nginx服务的日志中。
  • 使用Flume将用户行为数据同时导入HDFS和Kafka,其中HDFS中的数据用于离线分析,而Kafka中数据则用于流式实时分析。

存储标签数据

Hulu使用Hbase存储用户标签数据,包括基本信息如性别、年龄、是否付费,以及其他模型推测出来的偏好属性。这些属性需要作为计算模型的输入,同时HBase随机读取的速度比较慢,需要将数据同步到缓存服务器中以加快数据读取速度。Redis是一个应用广泛的开源缓存服务器,但其本身是个单机系统,不能很好地支持大量数据的缓存。为解决Redis扩展性差的问题,豌豆荚开源了Codis,一个分布式Redis解决方案。Hulu将Codis打成Docker镜像,并实现一键式构建缓存系统,附带自动监控和修复功能。为了更精细的监控,Hulu构建了多个Codis缓存,分别是:

  • codis-profile,同步HBase中的用户属性;
  • codis-action,缓存来自Kafka的用户行为;
  • codis-result,记录计算结果。

实时处理数据

在一切准备就绪,启动Spark Streaming程序:

  • Spark Streaming启动Kafka Receiver,持续地从Kafka服务器拉取数据;
  • 每隔两秒,Kafka的数据被整理成一个RDD,交给Spark引擎处理;
  • 对一条用户行为,Spark会从codis-action缓存中拿到该用户的行为记录,然后把新的行为追加进去;
  • Spark从codis-action和codis-profile中获得该用户的所有相关属性,然后执行广告和推荐的计算模型,最后把结果写入codis-result,进而供服务层实时读取这些结果。

Spark Streaming优化经验

实践中,业务逻辑首先保证完成,使得在Kafka输入数据量较小的情况下系统稳定运行,且输入输出满足项目需求。然后开始调优,修改Spark Streaming的参数,比如Executor的数量,Core的数量,Receiver的流量等。最后发现仅调参数无法完全满足本项目的业务场景,所以有更进一步的优化方案,总结如下。

Executor初始化

很多机器学习的模型在第一次运行时,需要执行初始化方法,还会连接外部的数据库,常常需要5-10分钟,这会成为潜在的不稳定因素。在Spark Streaming应用中,当Receiver完成初始化,它就开始源源不断地接收数据,并且由Driver定期调度任务消耗这些数据。如果刚启动时Executor需要几分钟做准备,会导致第一个作业一直没有完成,这段时间内 Driver不会调度新的作业。这时候在Kafka Receiver端会有数据积压,随着积压的数据量越来越大,大部分数据会撑过新生代进入老年代,进而给Java GC带来严重的压力,容易引发应用程序崩溃。

本项目的解决方案是,修改Spark内核,在每个Executor接收任务之前先执行一个用户自定义的初始化函数,初始化函数中可以执行一些独立的用户逻辑。示例代码如下:

 

[plain] view plain copy
 
  1. // sc:是SparkContext, setupEnvironment是Hulu扩展的API  
  2. sc.setupEnvironment(() => {  
  3.   application.initialize() // 用户应用程序初始化,需执行几分钟  
  4. })  

代码1

该方案需要更改Spark的任务调度器,首先将每个Executor设置为未初始化状态。此时,调度器只会给未初始化状态的Executor分配初始化任务(执行前面提到的初始化函数)。等初始化任务完毕,调度器更新Executor的状态为已初始化,这样的Executor才可以分配正常的计算任务。

异步处理Task中的业务逻辑

本项目中,模型的输入参数均来自Codis,甚至模型内部也可能访问外部存储,直接导致模型计算时长不稳定,很多时间消耗在网络等待上。

为提高系统吞吐量,增大并行度是常用的优化方案,但在本项目的场景中并不适用。Spark作业的调度策略是,等待上一个作业的所有Task执行完毕,然后调度下一个作业。如果单个Task的运行时间不稳定,易发生个别Task拖慢整个作业的情况,以至于资源利用率不高;甚至并行度越大问题越严重。一种常用解决Task不稳定的方案是增大Spark Streaming的micro batch的时间间隔,该方案会使整个实时系统的延迟变长,并不推荐。

因此这里通过异步处理Task中的业务逻辑来解决。如下文的代码所示,同步方案中,Task内执行业务逻辑,处理时间不定;异步方案中,Task把业务逻辑嵌入线程,交给线程池执行,Task立刻结束, Executor向Driver报告执行完毕,异步处理的时间非常短,在100ms以内。另外,当线程池中积压的线程数量太大时(代码中qsize>100的情况),会暂时使用同步处理,配合反压机制(见下文的参数spark.streaming.backpressure.enabled),可以保证不会因为数据积压过多而导致系统崩溃。经实验验证,该方案大大提高了系统的吞吐量。

 

[plain] view plain copy
 
  1. // 同步处理  
  2. // 函数 runBusinessLogic是 Task 中的业务逻辑,执行时间不定  
  3. rdd.foreachPartition(partition => runBusinessLogic (partition))  
  4.   
  5. // 异步处理,threadPool是线程池  
  6. rdd.foreachPartition(partition => {  
  7.   val qsize = threadPool.getQueue.size // 线程池中积压的线程数  
  8.   if (qsize > 100) {  
  9.     runBusinessLogic(partition) // 暂时同步处理  
  10.   }  
  11.   threadPool.execute(new Runnable {  
  12.     override def run() = runBusinessLogic(partition)  
  13.   })  
  14. })  

代码2

异步化Task也存在缺点:如果Executor发生异常,存放在线程池中的业务逻辑无法重新计算,会导致部分数据丢失。经实验验证,仅当Executor异常崩溃时有数据丢失,且不常见,在本项目的场景中可以接受。

Kafka Receiver的稳定性

本项目使用了Spark Streaming中的Kafka Receiver,本质上调用Kafka官方的客户端ZookeeperConsumerConnector。其策略是每个客户端在Zookeeper的固定路径下把自己注册为临时节点,于是所有客户端都知道其他客户端的存在,然后自动协调和分配Kafka的数据资源。该策略存在一个弊端,当一个客户端与Zookeeper的连接状态发生改变(断开或者连上),所有的客户端都会通过Zookeeper协调, 重新分配Kafka的数据资源;在此期间所有客户端都断开与Kafka的连接,系统接收不到Kafka的数据,直到重新分配成功。如果网络质量不佳,并且Receiver的个数较多,这种策略会造成数据输入不稳定,很多Spark Streaming用户遇到这样的问题。在我们的系统中,该策略并没有产生明显的负面影响。值得注意的是,Kafka 客户端与Zookeeper有个默认的参数zookeeper.session.timeout.ms=6000,表示客户端与Zookeeper连接的session有效时间为6秒,我们的客户端多次出现因为Full GC超过6秒而与Zookeeper断开连接,之后再次连接上,期间所有客户端都受到影响,系统表现不稳定。所以项目中设置参数zookeeper.session.timeout.ms=30000。

YARN资源抢占问题

在Hulu内部,Spark Streaming这样的长时服务与MapRedue、Spark、Hive等批处理应用共享YARN集群资源。在共享环境中,经常因一个批处理应用占用大量网络资源或者CPU资源导致Spark Streaming服务不稳定(尽管我们采用了CGroup进行资源隔离,但效果不佳)。更严重的问题是,如果个别Container崩溃Driver需要向YARN申请新的Container,或者如果整个应用崩溃需要重启,Spark Streaming不能保证很快申请到足够的资源,也就无法保证线上服务的质量。为解决该问题,Hulu使用label-based scheduling的调度策略,从YARN集群中隔离出若干节点专门运行Spark Streaming和其他长时服务,避免与批处理程序竞争资源。

完善监控信息

监控反映系统运行的性能状态,也是一切优化的基础。Hulu使用Graphite和Grafana作为第三方监控系统,本项目把系统中关键的性能参数(如计算时长和次数)发送给Graphite服务器,就能够在Grafana网页上看到直观的统计图。
图4是统计Kafka中日志的剩余数量,一条线对应于一个partition的历史余量,大部分情况下余量接近零,符合预期。图中09:55左右日志余量开始出现很明显的尖峰,之后又迅速逼近零。事后经过多种数据核对,证实Kafka的数据一直稳定,而当时Spark Streaming执行作业突然变慢,反压机制生效,于是Kafka Receiver减小读取日志的速率,造成Kafka数据积压;一段时间之后Spark Streaming又恢复正常,快速消耗了Kafka中的数据余量。

直观的监控系统能有效地暴露问题,进而理解和强化系统。在实践中,主要的监控指标有:

  • Kafka的剩余数据量
  • Spark的作业运行时间和调度时间
  • 每个Task的计算时间
  • Codis的访问次数、时间、命中率

另外,有脚本定期分析这些统计数据,出现异常则发邮件报警。比如图4中 Kafka 的日志余量过大时,会有连续的报警邮件。我们的经验是,监控越细致,之后的优化工作越轻松。同时,优秀的监控也需要对系统深刻的理解。

图片描述

图4 Graphite监控信息,展示了Kafka中日志的剩余数量,一条线对应于一个partition的历史余量

参数优化

下表列出本项目中比较关键的几个参数: 
图片描述

总结

Spark Streaming的产品上线运行一年多,期间进行了多次Spark版本升级,从最早期的0.8版本到最近的 1.5.x版本。总体上Spark Streaming是一款优秀的实时计算框架,可以在线上使用 。但仍然存在一些不足,包括:Spark同时使用堆内和堆外的内存,缺乏一些有效的监控信息,遇到OOM时分析和调试比较困难;缺少Executor初始化接口;Spark采用函数式编程方式,抽象层次高,好处是使用方便,坏处是理解和优化困难;新版本的Spark有一些异常,如Shuffle过程中Block丢失、内存溢出。  

原文链接:http://geek.csdn.NET/news/detail/78416?utm_source=tuicool&utm_medium=referral

 

http://blog.csdn.net/guohecang/article/details/51583214

分享到:
评论

相关推荐

    06Spark Streaming原理和实践

    ### Spark Streaming原理与实践 #### 一、为什么需要流处理? 传统的批处理框架如MapReduce在处理实时数据流时存在一些局限性,主要是因为每次处理一批数据就需要启动一个新任务,而任务的启动过程(包括输入切分...

    SparkStreaming预研报告

    Spark Streaming预研报告覆盖了Apache Spark Streaming的主要方面,包括其简介、架构、编程模型以及性能调优。...对于希望利用Spark Streaming进行实时数据处理的用户而言,这份文档提供了宝贵的理论和实践指导。

    spark streaming相关15篇论文,包含几篇硕士论文,包含几篇期刊论,有的结合自然语言处理

    Spark Streaming 是 Apache Spark 的一个模块,专为实时数据流处理设计。它允许开发人员使用类似于...通过阅读这些论文,读者不仅可以学习到Spark Streaming的基本原理,还能了解到相关领域的前沿技术和实践经验。

    Spark Streaming实时流处理项目实战.rar.rar

    Spark Streaming是中国大数据技术领域中广泛使用...通过这个Spark Streaming实时流处理项目实战,你将有机会实践以上知识,并进一步理解如何在实际场景中部署和调整Spark Streaming,提升实时数据处理的效能和稳定性。

    Spark Streaming Programming Guide 笔记

    在理解和应用 Spark Streaming 时,正确配置核心数和接收器的比例、合理使用缓存机制以及适时启用 Checkpoint 都是非常重要的步骤。这些最佳实践有助于构建高效且可靠的实时数据处理系统。通过遵循这些指导原则,...

    Spark Streaming Systems - Tyler Akidau

    总的来说,《Spark Streaming Systems》是理解Spark Streaming原理和实践的宝贵资源,无论你是初学者还是有经验的开发者,都能从中获得对实时数据处理深入的理解,并学会如何利用Spark Streaming构建高性能的流处理...

    Spark Streaming在滴滴的大规模实践_.zip

    在滴滴的实践中,Spark Streaming被用来识别和处理复杂的事件模式,如异常检测、用户行为分析等,帮助优化服务质量和安全性。 8. **性能优化** 通过对算法优化、内存管理以及硬件配置的调整,滴滴成功地提升了...

    Spark-streaming 在京东的项目实践.zip

    这种设计使得Spark-Streaming能够利用已有的批处理优化,并具备高吞吐量和低延迟的特性。 2. **项目架构设计** 在京东的项目中,通常采用Kafka作为数据源,因为Kafka具有高可用性和强大的消息持久化能力。Spark-...

    基于Spark Streaming将图片以流的方式写入HDFS分布式文件系统.zip

    8. **实战经验**:项目提供了一个实际环境来测试和优化Spark Streaming与HDFS的集成,对于学习者来说,是提升实际操作能力的好机会。 以上就是基于给定标题和描述可以提炼出的主要知识点,每个点都值得深入学习和...

    Spark-streaming 在京东的项目实践

    由于给定的文件信息中【部分内容】的文本出现了重复,并且具体的技术细节和项目实施...随着技术的不断进步和业务的持续发展,京东也会不断优化其基于Spark Streaming的实时数据处理架构,以适应未来大数据时代的挑战。

    CCTC 2016 腾讯林立伟:Spark Streaming在腾讯广点通的应用

    林立伟深入探讨了腾讯广点通(Tencent SocialAds)如何应用Apache Spark Streaming框架进行实时数据处理,以及在不同版本的Spark中的实践和改进。 知识点分析: 1. Apache Spark Streaming概述: - Spark ...

    spark streaming 大型电商 项目实战

    根据提供的文件信息,我们可以推断出,该文档是关于“Spark Streaming在大型...通过该案例的学习,可以了解到从项目部署到实际运营的完整过程,并能够根据项目中遇到的问题及其解决方案,对自身项目进行优化和调整。

    Pro Spark Streaming

    在描述中提到了“最佳实践”,这可能包括了对Spark Streaming架构的深入理解、高效使用DStream API进行流式数据操作、对实时数据流的错误处理和恢复机制、性能优化策略、以及如何与Spark的其他组件(如Spark SQL、...

    基于spark streaming和kafka,hbase的日志统计分析系统.zip

    本项目名为“基于Spark Streaming和Kafka,HBase的日志统计分析系统”,旨在利用这三个核心组件构建一个实时、高效的数据处理管道。以下将详细介绍这三个组件以及它们在系统中的作用。 **Spark Streaming** Apache ...

    Spark 实践-基于 Spark Streaming 的实时日志分析系统+源代码+文档说明

    1、资源内容:Spark 实践——基于 Spark Streaming 的实时日志分析系统+源代码+文档说明 2、代码特点:内含运行结果,不会运行可私信,参数化编程、参数可方便更改、代码编程思路清晰、注释明细,都经过测试运行成功...

    计算机课程毕设:基于spark streaming和kafka,hbase的日志统计分析系统.zip

    这个毕业设计项目涵盖了实时大数据处理的关键技术,对于理解和掌握大数据处理流程、Spark流处理以及分布式存储有着重要的实践价值。通过实际操作,学生可以深入理解这些工具的使用,并提升解决实际问题的能力。

    计算机课程毕设:基于Spark Streaming+ALS的餐饮智能推荐系统.zip

    6. 性能优化:通过调整Spark Streaming的窗口大小、批处理间隔等参数,以及ALS的迭代次数、正则化参数等,平衡推荐精度和系统性能。 本项目不仅适用于毕业设计和课程设计,也是提升个人技能、理解大数据实时处理和...

    大数据技术原理及应用课实验7 :Spark初级编程实践

    优化和改进方面,可以考虑使用更高效的Join策略,如Broadcast Join来处理大型数据集,或者使用DataFrames和Datasets API来利用其编译时检查和优化。另外,还可以研究Spark的动态资源调度,以适应数据量的变化和集群...

    毕业设计:基于Spark streaming的系统日志分析系统.zip

    总结来说,这个基于Spark Streaming的系统日志分析系统涵盖了大数据实时处理、日志分析、数据库管理和系统设计等多个重要领域,对于计算机科学专业的学生来说,是一个很好的实践平台,能帮助他们深入理解相关技术并...

    基于flume+kafka_spark streaming+hbase的流式处理系统设计与实现.zip

    在实际应用中,可以根据业务需求调整各个组件的配置,如Flume的采集策略、Kafka的分区策略、Spark Streaming的窗口大小以及HBase的表结构设计等,以优化系统的性能和效率。同时,由于该项目已经经过测试并可直接运行...

Global site tag (gtag.js) - Google Analytics