`
z01_ejdazhi
  • 浏览: 1391 次
最近访客 更多访客>>
文章分类
社区版块
存档分类
最新评论

那些年我们在spark SQL上踩过的坑

阅读更多

做了一年延云YDB的开发,这一年在使用spark上真心踩了不少坑,总结一下,希望对大家有所帮助。

spark 内存泄露

1.高并发情况下的内存泄露的具体表现

很遗憾,Spark的设计架构并不是为了高并发请求而设计的,我们尝试在网络条件不好的集群下,进行100并发的查询,在压测3天后发现了内存泄露。

a)在进行大量小SQL的压测过程中发现,有大量的activejob在spark ui上一直处于pending状态,且永远不结束,如下图所示

 

b)并且发现driver内存爆满

 

c)用内存分析分析工具分析了下

 

2.高并发下AsynchronousListenerBus引起的WEB UI的内存泄露

短时间内 SPARK 提交大量的SQL ,而且SQL里面存在大量的 union与join的情形,会创建大量的event对象,使得这里的 event数量超过10000个event ,

一旦超过10000个event就开始丢弃 event,而这个event是用来回收 资源的,丢弃了 资源就无法回收了
。 针对UI页面的这个问题,我们将这个队列长度的限制给取消了。

 

 

 

 

 

 

3.AsynchronousListenerBus本身引起的内存泄露

抓包发现


 

 

 

这些event是通过post方法传递的,并写入到队列里

 

 

但是也是由一个单线程进行postToAll的

 

 

但是在高并发情况下,单线程的postToAll的速度没有post的速度快,会导致队列堆积的event越来越多,如果是持续性的高并发的SQL查询,这里就会导致内存泄露

 

接下来我们在分析下postToAll的方法里面,那个路径是最慢的,导致事件处理最慢的逻辑是那个?

 

 

 

 


可能您都不敢相信,通过jstack抓取分析,程序大部分时间都阻塞在记录日志上

 

可以通过禁用这个地方的log来提升event的速度

 

log4j.logger.org.apache.spark.scheduler=ERROR

 


 

 

 

4.高并发下的Cleaner的内存泄露

       说道这里,Cleaner的设计应该算是spark最糟糕的设计。spark的ContextCleaner是用于回收与清理已经完成了的 广播boradcast,shuffle数据的。但是高并发下,我们发现这个地方积累的数据会越来越多,最终导致driver内存跑满而挂掉。

l我们先看下,是如何触发内存回收的

 

      没错,就是通过System.gc() 回收的内存,如果我们在jvm里配置了禁止执行System.gc,这个逻辑就等于废掉(而且有很多jvm的优化参数一般都推荐配置禁止system.gc 参数)

lclean过程

这是一个单线程的逻辑,而且每次清理都要协同很多机器一同清理,清理速度相对来说比较慢,但是SQL并发很大的时候,产生速度超过了清理速度,整个driver就会发生内存泄露。而且brocadcast如果占用内存太多,也会使用非常多的本地磁盘小文件,我们在测试中发现,高持续性并发的情况下本地磁盘用于存储blockmanager的目录占据了我们60%的存储空间。

 

 

我们再来分析下 clean里面,那个逻辑最慢

 

真正的瓶颈在于blockManagerMaster里面的removeBroadcast,因为这部分逻辑是需要跨越多台机器的。

 

针对这种问题,

l我们在SQL层加了一个SQLWAITING逻辑,判断了堆积长度,如果堆积长度超过了我们的设定值,我们这里将阻塞新的SQL的执行。堆积长度可以通过更改conf目录下的ya100_env_default.sh中的ydb.sql.waiting.queue.size的值来设置。

 

l建议集群的带宽要大一些,万兆网络肯定会比千兆网络的清理速度快很多。

l给集群休息的机会,不要一直持续性的高并发,让集群有间断的机会。

l增大spark的线程池,可以调节conf下的spark-defaults.conf的如下值来改善。

 

 

 

5.线程池与threadlocal引起的内存泄露

       发现spark,Hive,lucene都非常钟爱使用threadlocal来管理临时的session对象,期待SQL执行完毕后这些对象能够自动释放,但是与此同时spark又使用了线程池,线程池里的线程一直不结束,这些资源一直就不释放,时间久了内存就堆积起来了。

针对这个问题,延云修改了spark关键线程池的实现,更改为每1个小时,强制更换线程池为新的线程池,旧的线程数能够自动释放。

 

6.文件泄露

      您会发现,随着请求的session变多,spark会在hdfs和本地磁盘创建海量的磁盘目录,最终会因为本地磁盘与hdfs上的目录过多,而导致文件系统和整个文件系统瘫痪。在YDB里面我们针对这种情况也做了处理。

 

7.deleteONExit内存泄露

 

 

 

 

 

为什么会有这些对象在里面,我们看下源码

 

 

 

 

 

 

 

 

8.JDO内存泄露

多达10万多个JDOPersistenceManager

 


 

 

 


 


 

 

 

 

 

 

 

 

9.listerner内存泄露

通过debug工具监控发现,spark的listerner随着时间的积累,通知(post)速度运来越慢

发现所有代码都卡在了onpostevent上

 

 

 

 

 

jstack的结果如下


 

 

研究下了调用逻辑如下,发现是循环调用listerners,而且listerner都是空执行才会产生上面的jstack截图

 

 

通过内存发现有30多万个linterner在里面

 

 

发现都是大多数都是同一个listener,我们核对下该处源码

 

 

最终定位问题

确系是这个地方的BUG ,每次创建JDBC连接的时候 ,spark就会增加一个listener, 时间久了,listener就会积累越来越多  针对这个问题 我简单的修改了一行代码,开始进入下一轮的压测

 

 

 

 

二十二、spark源码调优

      测试发现,即使只有1条记录,使用 spark进行一次SQL查询也会耗时1秒,对很多即席查询来说1秒的等待,对用户体验非常不友好。针对这个问题,我们在spark与hive的细节代码上进行了局部调优,调优后,响应时间由原先的1秒缩减到现在的200~300毫秒。

      

以下是我们改动过的地方

1.SessionState 的创建目录 占用较多的时间

 

 

另外使用Hadoop namenode HA的同学会注意到,如果第一个namenode是standby状态,这个地方会更慢,就不止一秒,所以除了改动源码外,如果使用namenode ha的同学一定要注意,将active状态的node一定要放在前面。

2.HiveConf的初始化过程占用太多时间

频繁的hiveConf初始化,需要读取core-default.xml,hdfs-default.xml,yarn-default.xml

,mapreduce-default.xml,hive-default.xml等多个xml文件,而这些xml文件都是内嵌在jar包内的。

第一,解压这些jar包需要耗费较多的时间,第二每次都对这些xml文件解析也耗费时间。

 

 

 

 

 

 

 

 

 

 

 

 

3.广播broadcast传递的hadoop configuration序列化很耗时

lconfiguration的序列化,采用了压缩的方式进行序列化,有全局锁的问题

lconfiguration每次序列化,传递了太多了没用的配置项了,1000多个配置项,占用60多Kb。我们剔除了不是必须传输的配置项后,缩减到44个配置项,2kb的大小。

 

 

 

 

 

 

4.对spark广播数据broadcast的Cleaner的改进

 

由于SPARK-3015 的BUG,spark的cleaner 目前为单线程回收模式。

大家留意spark源码注释

 

 

 

其中的单线程瓶颈点在于广播数据的cleaner,由于要跨越很多机器,需要通过akka进行网络交互。

如果回收并发特别大,SPARK-3015 的bug报告会出现网络拥堵,导致大量的 timeout出现。

为什么回收量特变大呢? 其实是因为cleaner 本质是通过system.gc(),定期执行的,默认积累30分钟或者进行了gc后才触发cleaner,这样就会导致瞬间,大量的akka并发执行,集中释放,网络不瞬间瘫痪才不怪呢。

但是单线程回收意味着回收速度
恒定,如果查询并发很大,回收速度跟不上cleaner的速度,会导致cleaner积累很多,会导致进程OOM(YDB做了修改,会限制前台查询的并发)。

不论是OOM还是限制并发都不是我们希望看到的,所以针对高并发情况下,这种单线程的回收速度是满足不了高并发的需求的。


对于官方的这样的做法,我们表示并不是一个完美的cleaner方案。并发回收一定要支持,只要解决akka的timeout问题即可。
所以这个问题要仔细分析一下,akka为什么会timeout,是因为cleaner占据了太多的资源,那么我们是否可以控制下cleaner的并发呢?比如说使用4个并发,而不是默认将全部的并发线程都给占满呢?这样及解决了cleaner的回收速度,也解决了akka的问题不是更好么?

针对这个问题,我们最终还是选择了修改spark的ContextCleaner对象,将广播数据的回收 改成多线程的方式,但现在了线程的并发数量,从而解决了该问题。

分享到:
评论

相关推荐

    Atlas Spark SQL血缘分析,Hive Hook

    在本主题中,我们将深入探讨如何使用Apache Atlas进行Spark SQL的血缘分析,并结合Hive Hook来实现这一功能。 首先,Apache Atlas 提供的血缘分析功能可以帮助用户理解数据的来源和去向,这对于数据治理和数据质量...

    Learning Spark SQL epub

    Learning Spark SQL 英文epub 本资源转载自网络,如有侵权,请联系上传者或csdn删除 本资源转载自网络,如有侵权,请联系上传者或csdn删除

    spark sql解析-源码分析

    Spark SQL结合了DataFrame API和传统的SQL接口,使得开发人员可以灵活地在结构化和半结构化数据上进行高性能计算。在这个源码分析中,我们将深入探讨Spark SQL的工作原理、主要组件以及其解析过程。 1. **DataFrame...

    Learning Spark SQL - Aurobindo Sarkar

    《Learning Spark SQL - Aurobindo Sarkar》这本书是针对Apache Spark SQL的深入学习指南,由Aurobindo Sarkar撰写。Spark SQL是Apache Spark框架的一部分,它允许开发者使用SQL或者DataFrame API处理大规模数据。...

    Spark SQL操作JSON字段的小技巧

    Spark SQL是一款强大的大数据处理工具,它提供了对JSON数据的内置支持,使得在处理JSON格式的数据时更加便捷。本文将详细介绍Spark SQL操作JSON字段的几个关键函数:get_json_object、from_json 和 to_json,以及...

    Spark SQL常见4种数据源详解

    Spark SQL的DataFrame接口支持多种数据源的操作。一个DataFrame可以进行RDDs方式的操作,也可以被注册为临时表。把DataFrame注册为临时表之后,就可以对该DataFrame执行SQL查询。 Spark SQL的默认数据源为Parquet...

    实训指导书_使用Spark SQL进行法律服务网站数据分析.zip

    在开始分析之前,我们需要将法律服务网站的数据导入到Spark SQL中。这通常涉及到数据清洗、格式转换等工作,确保数据符合DataFrame的结构。数据可能来源于不同的源,如CSV、JSON或数据库,Spark SQL提供了丰富的数据...

    2015 Spark技术峰会-Spark SQL结构化数据分析-连城

    Databrciks工程师,Spark Committer,Spark SQL主要开发者之一的连城详细解读了“Spark SQL结构化数据分析”。他介绍了Spark1.3版本中的很多新特性。重点介绍了DataFrame。其从SchemaRDD演变而来,提供了更加高层...

    Spark SQL 2.3.0:深入浅出

    它是Spark用于SQL和数据帧API的一个组件,能够在Spark程序中查询结构化数据。该模块集成在Spark中,使得用户能够以声明式的方式使用SQL来处理数据,同时也可以采用RDD的DataFrame API来处理数据。 Spark SQL的核心...

    Spark Sql中时间字段少8个小时问题解决

    综上所述,在处理Spark SQL中时间字段少8个小时的问题时,通过设置正确的时区配置,可以有效地解决该问题。此外,对于时间数据的处理,始终要注意时区的转换和处理,以确保数据的准确性和一致性。

    Spark SQL操作大全.zip

    Spark SQL是Apache Spark项目的一个核心组件,它提供了处理结构化数据的强大功能,使得在大数据分析领域中,Spark SQL成为了一种不可或缺的工具。本资料主要涵盖了Spark SQL的基础概念、核心特性、操作方法以及实战...

    Spark SQL 实验

    实验介绍部分告诉我们,实验的目的是通过实验的方式来深入了解和掌握Spark SQL的使用。用户需要将名为"ml-1m.zip"的数据集文件拷贝到/tmp目录下,并进行解压。数据集"ml-1m"是电影评分数据集,包含用户信息、评分等...

    基于antlr4 解析器,支持spark sql, tidb sql, flink sql, Sparkflink运行命令解析器

    在本项目中,ANTLR4被用来创建一个解析器,这个解析器支持多种SQL方言,包括Spark SQL、TiDB SQL以及Flink SQL,同时还支持Spark和Flink的运行命令解析。 Spark SQL是Apache Spark的一个组件,主要负责处理结构化的...

    Spark SQL 表达式计算

    表达式计算在Spark SQL中随处可见,本演讲将简介表达式、UDF、UDAF、UDTF的概念,主要的API,以及如何扩展Spark SQL函数库。本演讲还将提及Catalyst在计划阶段和Project Tungsten在执行层做的优化,以及未来性能提升...

    Spark_SQL大数据实例开发教程.pdf by Spark_SQL大数据实例开发教程.pdf (z-lib.org)1

    本书旨在帮助企业级开发人员深入理解和掌握Spark SQL,它在Spark生态系统中扮演着至关重要的角色,是处理大规模数据的核心组件。书中通过实际案例,详细介绍了Spark SQL的各种特性和操作方法。 首先,书中的前言...

    Spark SQL 教学讲解PPT

    参考Spark官网以及一些文献,制作的Spark SQL教学幻灯片,适合进行Spark入门介绍与教学!所有的Spark教学系列都在我的资源内!

    《Spark SQL编程指南(v1.1.0)

    综上所述,Spark SQL通过其强大的DataFrame和Dataset API、SQL支持、高效的Catalyst优化器以及与Hive的深度集成,为大数据分析提供了一个高效且灵活的平台。无论是对于SQL开发者还是Scala/Java/Python开发者,Spark ...

    Spark SQL源码概览.zip

    Spark SQL的执行计划由Executor节点执行,它们并行运行在Spark集群上,实现了数据的分布式处理。Stage是执行计划的基本单元,每个Stage由一个或多个Task组成,Tasks在Executor之间分配工作。 7. **代码生成**: ...

Global site tag (gtag.js) - Google Analytics