`

Spark DataFrame处理数据倾斜问题

阅读更多
由于爬虫抓取等原因,会导致单一ID的日志条数过多。在spark中,同一ID的日志会被shuffle到单一的节点上进行处理,导致系统运行缓慢!
因为这些用户的访问本来就是无效的,所以可以直接过滤掉这部分用户。
话不多说,scala的DataFrame版输出和代码如下(参考链接见代码注释):
引用
spark version: 1.6.1
Original DataFrame (with fake users):
+---------+------+
|       id| movie|
+---------+------+
|       u1|WhoAmI|
|       u2|Zoppia|
|       u2|  Lost|
|FakeUserA|Zoppia|
|FakeUserA|  Lost|
|FakeUserA|Zoppia|
|FakeUserA|  Lost|
|FakeUserA|Zoppia|
|FakeUserA|  Lost|
|FakeUserB|  Lost|
|FakeUserB|  Lost|
|FakeUserB|  Lost|
|FakeUserB|  Lost|
+---------+------+

Fake Users with count (threshold=2):
+---------+-----+
|       id|count|
+---------+-----+
|FakeUserA|    6|
|FakeUserB|    4|
+---------+-----+

Fake Users:
Set(FakeUserA, FakeUserB)

Valid users after filter:
+---+------+
| id| movie|
+---+------+
| u1|WhoAmI|
| u2|Zoppia|
| u2|  Lost|
+---+------+




import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._

/**
  * Created by colinliang on 2017/8/14.
  */
case class IDMovie(id: String, movie: String)
object BroadcastTest {
  def main(args: Array[String]): Unit = {
    Logger.getRootLogger().setLevel(Level.FATAL) //http://stackoverflow.com/questions/27781187/how-to-stop-messages-displaying-on-spark-console
    val conf = new SparkConf().setAppName("word count").setMaster("local[1]")
    val sc = new SparkContext(conf)
    println("spark version: " + sc.version)
    sc.setLogLevel("WARN") //http://stackoverflow.com/questions/27781187/how-to-stop-messages-displaying-on-spark-console
    val spark = new SQLContext(sc)



    val idvids = List(
      IDMovie("u1", "WhoAmI")
      , IDMovie("u2", "Zoppia")
      , IDMovie("u2", "Lost")
      , IDMovie("FakeUserA", "Zoppia")
      , IDMovie("FakeUserA", "Lost")
      , IDMovie("FakeUserA", "Zoppia")
      , IDMovie("FakeUserA", "Lost")
      , IDMovie("FakeUserA", "Zoppia")
      , IDMovie("FakeUserA", "Lost")
      , IDMovie("FakeUserB", "Lost")
      , IDMovie("FakeUserB", "Lost")
      , IDMovie("FakeUserB", "Lost")
      , IDMovie("FakeUserB", "Lost")
      );


    val df = spark
      .createDataFrame(idvids)
      .repartition(col("id"))

    println("Original DataFrame (with fake users): ")
    df.show()

//    val df_fakeUsers_with_count=df.sample(false,0.1).groupBy(col("id")).count().filter(col("count")>2).limit(10000)//实际中可以根据需要仅采样一部分数据
    val df_fakeUsers_with_count=df.groupBy(col("id")).count().filter(col("count")>2)
    /**DataFrame 中的groupby 为aggregation形式的,不涉及shuffle,速度很快。参见:https://forums.databricks.com/questions/956/how-do-i-group-my-dataset-by-a-key-or-combination.html
      更多聚合函数参见:https://spark.apache.org/docs/1.6.1/api/scala/index.html#org.apache.spark.sql.functions$
      此外,还可以通过agg()函数对groupBy后的数据的多列进行聚合
      */
    println("Fake Users with count (threshold=2):")
    df_fakeUsers_with_count.show()


    val set_fakeUsers=df_fakeUsers_with_count.select("id").collect().map(_(0)).toList.map(_.toString).toArray[String].toSet
    println("Fake Users:")
    println(set_fakeUsers)
    val set_fakeUsers_broadcast=sc.broadcast(set_fakeUsers)
    /** broadcast教程:https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-broadcast.html
      * 官方文档: http://spark.apache.org/docs/latest/rdd-programming-guide.html#broadcast-variables
      */

    val udf_isValidUser = udf((id: String) => !set_fakeUsers_broadcast.value.contains(id)) //直接用set_highCountUsers.contains(id) 也行,但效率低,因为反序列化的次数可能比较多,参见http://spark.apache.org/docs/latest/rdd-programming-guide.html#broadcast-variables
    val df_filtered=df.filter(udf_isValidUser(col("id")) ) //过滤掉这部分用户
    /** 如果是要保留部分用户,而不是过滤掉这部分用户,且用户量很小,无需定义UDF:
      * https://stackoverflow.com/questions/39234360/filter-spark-scala-dataframe-if-column-is-present-in-set
      * val validValues = Set("A", "B", "C")
      * data.filter($"myColumn".isin(validValues.toSeq: _*))
      */
    /** 如果是要保留部分用户,且用户量比较大,可以用broadcast 的DataFrame:
      * https://stackoverflow.com/questions/33824933/spark-dataframe-filtering-retain-element-belonging-to-a-list
      * import org.apache.spark.sql.functions.broadcast
      * initialDataFrame.join(broadcast(usersToKeep), $"userID" === $"userID_")
      */
    println("\nValid users after filter:")
    df_filtered.show()
  }
}
分享到:
评论

相关推荐

    Spark大数据处理 技术 应用与性能优化 完整版 pdf

    3. 数据倾斜:识别并处理数据分布不均问题,通过分区策略优化数据分布。 4. 缓存策略:利用RDD持久化,将常用数据缓存到内存,减少重复计算。 5. SQL优化:使用DataFrame或Dataset API,避免冗余转换,使用合适的...

    Spark.sql数据库部分的内容

    9. **倾斜键处理**:Spark SQL提供了处理数据倾斜的策略,如采样重分布和广播JOIN,以解决大规模数据集处理中的性能瓶颈。 10. **Spark SQL与Spark Streaming集成**:Spark SQL可以与Spark Streaming结合,对实时流...

    Spark快速数据处理 PDF电子书下载

    -并行度和分区操作:合理控制并行度,避免数据倾斜。 6. Spark的生态系统 Spark生态包含众多项目,除了核心的Spark之外,还包括了用于实时数据处理的Spark Streaming、提供高级SQL和数据框操作的Spark SQL、用于...

    Spark大数据处理技术 应用与性能优化 高清带目录 .pdf

    - Spark性能调优策略:包括内存管理、持久化级别选择、广播变量使用、并行度设置、数据倾斜处理等。 - Spark容错机制:如RDD的不变性和分区概念,使得在节点失败时可以通过重新计算恢复数据。 此外,还可能涉及到...

    Spark大数据处理技术PDF 高清带目录完整版 夏俊鸾黄洁程浩等

    9. **Spark性能优化**:提供Spark性能调优的实践指导,包括配置参数调整、数据倾斜处理、内存管理策略等。 10. **Spark实际应用案例**:通过实际项目或案例分析,展示Spark在不同领域的应用,如推荐系统、日志分析...

    spark高级分析数据源码

    这可能涉及到配置调整、数据倾斜处理、宽依赖优化、Shuffle减少等多个方面。 10. **Spark与大数据生态集成**:Spark可以与Hadoop、Cassandra、HBase等大数据组件集成,实现数据的读取、写入和处理,了解这些集成...

    spark 快速大数据分析

    4. 数据倾斜:处理数据分布不均的问题,避免“热点”分区。 七、Spark部署模式 Spark可以运行在本地模式、独立集群模式、YARN、Mesos或Kubernetes等资源管理系统上。选择合适的部署模式取决于具体需求,如资源管理...

    Spark大数据处理:技术、应用与性能优化(全).7z

    4. 数据倾斜:识别和处理数据分布不均的问题,避免单个task过载。 5. 缓存策略:智能缓存热点数据,减少重复计算,提升整体性能。 6. 代码优化:避免Shuffle操作,减少网络传输;使用宽依赖替代窄依赖,提高执行效率...

    Spark大数据处理:技术、应用与性能优化 (大数据技术丛书).pdf

    此外,可能还会涉及数据缓存策略、任务并行度、数据倾斜问题的解决等。 8. **案例研究**:书中可能会包含多个实际应用案例,展示如何在不同领域,如金融、社交媒体、电商等,利用Spark进行大数据分析和处理。 9. *...

    Spark大数据处理_基础学习glj.zip

    6. 性能优化:为了最大化Spark的性能,需要关注数据倾斜、宽依赖优化、缓存策略、网络传输效率等因素。例如,通过分区调整避免数据不平衡,利用广播变量减少网络传输,合理设置executor数量和内存大小等。 7. 总结...

    《Spark 快速大数据分析》学习笔记.zip

    11. **性能调优**:包括配置参数调整(如executor内存、并行度设置)、数据倾斜处理、宽依赖优化等,以提高Spark应用的性能。 12. **案例研究**:笔记可能包含使用Spark处理实际问题的案例,如日志分析、推荐系统...

    spark学习资料

    读者可能会从中了解到如何配置和优化Spark集群,以及如何处理数据倾斜等问题,这些对于在企业环境中部署和运行Spark至关重要。 《深入理解Spark 核心思想与源码分析》则可能针对希望深入了解Spark内部机制的读者。...

    Spark面试攻略:全面准备与技巧指南.docx

    * 诊断和确认数据倾斜 * 增加 shuffle 分区数 * 使用随机前缀和扩展键 * 广播小表 * 分桶(Bucketing) * 自定义分区器 Spark 的 DataFrame API 与 RDD 相比的优点和局限性 DataFrame 和 RDD 是 Spark 中两种不同...

    Spark大数据分析平台架构.pptx

    此外,业务层面的问题包括平台无法提供足够的指导来优化Executor配置和内存参数,以及发现并解决数据倾斜、HDFS提交阻塞等问题。 为了解决这些问题,平台需要建立自动化分析和故障诊断系统,比如“华佗”系统。它...

    光环大数据培训spark体系学习文档

    3. 数据倾斜:识别和处理数据不均匀分布问题,如使用自定义分区器。 4. SQL性能调优:优化查询计划,使用广播变量和Join优化等方法。 七、实战应用 通过案例分析,了解如何在实际项目中运用Spark解决大数据问题,如...

    深入理解SPARK2018

    7. **性能优化**:Spark的性能优化是开发者关注的重点,可能涉及到调度策略、并行度设置、数据倾斜处理、网络通信优化等多个方面。 8. **Spark与Hadoop的集成**:Spark可以在Hadoop生态系统中无缝运行,资料可能...

    Spark技术内幕深入解析Spark内核架构设计与实现原理

    12. **Spark性能调优**:涵盖参数调整、数据倾斜处理、任务并行度优化、网络传输优化等方面,以最大化系统性能。 13. **Spark的故障恢复与容错机制**:包括检查点和事件时间窗口,以及如何在失败后恢复作业。 通过...

    spark程序

    1. **数据分区策略**:合理设置数据分区可以帮助优化计算性能,避免数据倾斜问题。 2. **内存管理**:理解Spark的存储层次和缓存策略,可以有效利用内存资源,减少磁盘I/O。 3. **错误恢复**:Spark提供容错机制,如...

    sparksqlCmd_Spark!_spark_

    在大数据处理中,数据倾斜可能造成部分节点处理的数据量过大,影响整体性能。解决这个问题的方法包括使用哈希分桶、自定义分区函数,或者使用Stable Hash Join来平衡负载。 以上就是关于SparkSQL命令提交接口的基本...

    Spark Adaptive Execution

    自适应执行引擎突破了这一限制,能够在运行时根据数据的分布和执行情况优化查询执行,例如自动选择合适的分区数、处理数据倾斜问题以及优化连接(Joins)操作。 分区是Spark中的一个重要概念,Spark SQL通过分区...

Global site tag (gtag.js) - Google Analytics