`
kavy
  • 浏览: 890552 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

spark– 如何定义DataFrame的分区?

 
阅读更多

在Spark> = 1.6中,可以使用按列分区查询和缓存。参见:SPARK-11410SPARK-4849使用重分区方法:

 

val df = sc.parallelize(Seq(("A",1),("B",2),("A",3),("C",1))).toDF("k","v")

val partitioned = df.repartition($"k")
partitioned.explain

// scala> df.repartition($"k").explain(true)// == Parsed Logical Plan ==// 'RepartitionByExpression ['k], None// +- Project [_1#5 AS k#7,_2#6 AS v#8]//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27// // == Analyzed Logical Plan ==// k: string, v: int// RepartitionByExpression [k#7], None// +- Project [_1#5 AS k#7,_2#6 AS v#8]//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27// // == Optimized Logical Plan ==// RepartitionByExpression [k#7], None// +- Project [_1#5 AS k#7,_2#6 AS v#8]//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27// // == Physical Plan ==// TungstenExchange hashpartitioning(k#7,200), None// +- Project [_1#5 AS k#7,_2#6 AS v#8]//    +- Scan PhysicalRDD[_1#5,_2#6]

与RDDs不同,Spark Dataset(包括Dataset [Row] a.k.a DataFrame)现在不能使用自定义分区器。你通常可以通过创建一个人工分区列来解决这个问题,但它不会给你相同的灵活性。

Spark< 1.6.0: 您可以做的一件事是在创建DataFrame之前预分区输入数据

import org.apache.spark.sql.types._
import org.apache.spark.sql.Rowimport org.apache.spark.HashPartitioner

val schema =StructType(Seq(StructField("x",StringType,false),StructField("y",LongType,false),StructField("z",DoubleType,false)))

val rdd = sc.parallelize(Seq(Row("foo",1L,0.5),Row("bar",0L,0.0),Row("??",-1L,2.0),Row("foo",-1L,0.0),Row("??",3L,0.6),Row("bar",-3L,0.99)))

val partitioner =newHashPartitioner(5) 

val partitioned = rdd.map(r =>(r.getString(0), r)).partitionBy(partitioner).values

val df = sqlContext.createDataFrame(partitioned, schema)

由于从RDD创建DataFrame只需要一个简单的映射阶段现有的分区布局应该保留*:

 

assert(df.rdd.partitions == partitioned.partitions)

以同样的方式,您可以重新分区现有的DataFrame:

 

sqlContext.createDataFrame(
  df.rdd.map(r =>(r.getInt(1), r)).partitionBy(partitioner).values,
  df.schema
)

所以看起来这不是不可能的。问题仍然存在,如果它是有意义的。我会说,大多数时候它不:

>重新分区是一个昂贵的过程。在典型情况下,大多数数据必须序列化,混排和反序列化。另一方面,可以从预分割数据中受益的操作数量相对较少,并且如果内部API未被设计为利用该属性,则进一步受限。

>在某些情况下联接,但它需要内部支持,
>窗口函数调用与匹配分区器。同上,限于一个窗口定义。它已经在内部进行了分区,因此预分区可能是多余的,
>使用GROUP BY的简单聚合 – 可以减少临时缓冲区**的内存占用,但总体成本要高得多。或多或少相当于groupByKey.mapValues(_。reduce)(当前行为)vs reduceByKey(预分区)。不太可能在实践中有用。
>使用SqlContext.cacheTable进行数据压缩。由于它看起来像是使用运行长度编码,应用OrderedRDDFunctions.repartitionAndSortWithinPartitions可以提高压缩率。

>性能高度依赖于密钥的分布。如果它是倾斜的,它将导致次优资源利用。在最坏的情况下,根本不可能完成这项工作。
>使用高级声明性API的一个重点是将自己与低级实现细节隔离开来。正如@dwysakowicz@RomiKuntsman已经提到的,优化是Catalyst Optimizer的工作。它是一个非常复杂的野兽,我真的怀疑你可以轻松地改进,没有深入到它的内部。

使用JDBC源分区:

JDBC数据源支持predicates argument.它可以如下使用:

 

sqlContext.read.jdbc(url, table,Array("foo = 1","foo = 3"), props)

它为每个谓词创建一个JDBC分区。请记住,如果使用单个谓词创建的集合不是不相交的,则会在结果表中看到重复的集合。

DataFrameWriter中的partitionBy方法:

Spark DataFrameWriter提供了partitionBy方法,可用于在写入时“分区”数据。它使用提供的列集分隔写入数据

 

val df =Seq(("foo",1.0),("bar",2.0),("foo",1.5),("bar",2.6)).toDF("k","v")

df.write.partitionBy("k").json("/tmp/foo.json")

这使得基于键的查询读取上的谓词下推:

 

val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k"==="bar")

但它不等同于DataFrame.repartition。特别是聚合:

 

val cnts = df1.groupBy($"k").sum()

仍将需要TunnstenExchange:

 

cnts.explain

// == Physical Plan ==// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])// +- TungstenExchange hashpartitioning(k#90,200), None//    +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])//       +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json

*分区布局我的意思是只有一个数据分布。分区RDD不再是分区器。
**假设没有早期预测。如果聚合仅覆盖列的小子集,则可能没有任何增益。

分享到:
评论

相关推荐

    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql

    而DataFrame和Dataset是Spark SQL中更高级的数据抽象,提供了更多的优化和易于使用的特点。Dataframe可以看作是具有模式信息的分布式数据集。Dataset是类型安全的DataFrame。 在本文中,将详细介绍Spark的RDD API,...

    Spark SQL常见4种数据源详解

    Spark SQL的DataFrame接口支持多种数据源的操作。一个DataFrame可以进行RDDs方式的操作,也可以被注册为临时表。把DataFrame注册为临时表之后,就可以对该DataFrame执行SQL查询。 Spark SQL的默认数据源为Parquet...

    Spark创建RDD、DataFrame各种情况的默认分区数

    熟悉Spark的分区对于Spark性能调优很重要,本文总结Spark通过各种函数创建RDD、DataFrame时默认的分区数,其中主要和sc.defaultParallelism、sc.defaultMinPartitions以及HDFS文件的Block数量有关,还有很坑的某些...

    spark: RDD与DataFrame之间的相互转换方法

    在大数据处理框架Spark中,RDD(弹性分布式数据集)和DataFrame是两种常用的抽象数据类型,它们分别对应于不同的操作接口和优化技术。RDD是最基础的分布式数据集合,提供了一种容错的并行处理数据的方式。而...

    spark rdd转dataframe 写入mysql的实例讲解

    这里需要注意,`Spark.sql.*`系列的配置参数可以在创建`SparkConf`时设置,例如`spark.sql.shuffle.partitions`用于设置默认的分区数,`spark.sql.inMemoryColumnarStorage.compressed`用来决定DataFrame在内存中...

    Spark.sql数据库部分的内容

    8. **动态分区裁剪**:在读取Hive表时,Spark SQL可以自动识别查询中的分区条件,只读取相关的分区,从而减少数据的I/O。 9. **倾斜键处理**:Spark SQL提供了处理数据倾斜的策略,如采样重分布和广播JOIN,以解决...

    JAVA spark创建DataFrame的方法

    总的来说,Java操作Spark创建DataFrame涉及创建SparkSession、加载数据、转换数据、定义Schema以及保存结果。这个过程虽然相对Python或Scala来说代码较多,但提供了面向对象编程的灵活性,适合大型企业级应用。了解...

    pandas和spark dataframe互相转换实例详解

    在大数据处理领域,`pandas` 和 `Spark DataFrame` 是两个重要的工具。`pandas` 是 Python 中用于数据处理和分析的库,而 `Spark DataFrame` 是 Apache Spark 的核心组件,提供了一种分布式数据处理能力。本文将详细...

    Apache Spark的面试题.zip

    ** RDD是Spark的基础数据抽象,是弹性分布式数据集,是不可变、分区的记录集合,支持并行操作。 2. **Spark的架构** - **Spark的工作模式有哪几种?** 本地模式、集群模式(如Standalone、YARN、Mesos)和...

    ApacheSpark设计与实现.pdf+ApacheSpark源码剖析.pdf+Spark原著中文版.pdf

    2. **RDD概念**:RDD的定义,其不可变性和分区特性,以及转换和行动操作的原理。 3. **Spark架构**:Driver、Executor、Cluster Manager的角色,以及如何通过SparkContext启动计算任务。 4. **内存管理**:Spark如何...

    spark安装包+spark实验安装软件

    Spark SQL可以通过DataFrame API进行数据处理,它允许开发者使用SQL或DataFrame API进行数据查询。DataFrame可以看作是跨语言的表,支持多种数据源,如Hive、Parquet、JSON等。 **6. Spark Streaming的应用** ...

    spark笔记.zip

    3. Spark SQL与DataFrame/Dataset:Spark SQL引入DataFrame和Dataset,提供更接近于传统SQL的接口,使得数据处理更加方便。DataFrame是基于Schema的RDD,而Dataset则结合了RDD的性能和强类型的优势。 4. Spark ...

    spark-timeSeries.rar_scala 时间序列_spark ARIMA_spark arima_spark 滑

    在Spark中,可以自定义窗口函数实现滑动平均,例如使用`window`函数和`avg`函数,定义一个窗口大小,对每个窗口内的数据进行平均。 六、预测与评估 预测完成后,我们需要评估模型的性能。这通常通过比较预测值与...

    Spark技术内幕 深入解析Spark内核架构设计与实现原理 高清 完整书签

    5. **Spark SQL与DataFrame/Dataset**:探讨Spark SQL如何提供SQL接口进行结构化数据处理,介绍DataFrame和Dataset的概念,它们如何作为统一的数据抽象层,简化编程模型。 6. **Spark Streaming**:讲解Spark ...

    46488_Spark大数据技术与应用_习题数据和答案.rar

    2. **第3章**:Spark SQL与DataFrame - DataFrame的引入,对比RDD的优势 - Spark SQL的基础概念,如何创建DataFrame - DataFrame的转换和行动操作 - Hive与Spark SQL的集成,使用Hive metastore - Spark SQL的...

    Spark开发指导文档

    2. 创建Spark应用:使用Scala编写Spark程序,创建SparkContext并定义数据处理逻辑。 3. 数据读写:使用Spark的DataFrame或RDD API读取HDFS、Hive、Cassandra等数据源,或者通过Spark SQL执行SQL查询。 4. 并行计算:...

    Spark各种demo学习

    3. DataFrame:基于Spark SQL的DataFrame,提供了更高级别的数据操作接口,适用于结构化数据处理。 4. Dataset:DataFrame的类型安全版本,支持强类型和编译时检查。 5. Spark Streaming:用于实时流处理,通过微...

    spark大数据分析核心概念技术及实践

    - **Spark SQL的DDL**:创建表、视图,定义分区等。 - **DataFrame/Dataset的交互式查询**:通过SparkSession,可以进行交互式SQL查询。 5. **Spark Streaming** - **DStream(Discretized Stream)**:连续的...

    大数据时代下统计分析的新利器——SparkR

    - **RDD API的支持**:除了DataFrame外,SparkR还支持传统的RDD API,提供了一系列常用的数据处理操作,如数据缓存控制、数据保存、数据转换、数据分组与聚合、RDD间join操作、排序操作以及重分区操作等。...

Global site tag (gtag.js) - Google Analytics