在Spark> = 1.6中,可以使用按列分区查询和缓存。参见:SPARK-11410和SPARK-4849使用重分区方法:
val df = sc.parallelize(Seq(("A",1),("B",2),("A",3),("C",1))).toDF("k","v")
val partitioned = df.repartition($"k")
partitioned.explain
// scala> df.repartition($"k").explain(true)// == Parsed Logical Plan ==// 'RepartitionByExpression ['k], None// +- Project [_1#5 AS k#7,_2#6 AS v#8]// +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27// // == Analyzed Logical Plan ==// k: string, v: int// RepartitionByExpression [k#7], None// +- Project [_1#5 AS k#7,_2#6 AS v#8]// +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27// // == Optimized Logical Plan ==// RepartitionByExpression [k#7], None// +- Project [_1#5 AS k#7,_2#6 AS v#8]// +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27// // == Physical Plan ==// TungstenExchange hashpartitioning(k#7,200), None// +- Project [_1#5 AS k#7,_2#6 AS v#8]// +- Scan PhysicalRDD[_1#5,_2#6]
与RDDs不同,Spark Dataset(包括Dataset [Row] a.k.a DataFrame)现在不能使用自定义分区器。你通常可以通过创建一个人工分区列来解决这个问题,但它不会给你相同的灵活性。
Spark< 1.6.0: 您可以做的一件事是在创建DataFrame之前预分区输入数据
import org.apache.spark.sql.types._
import org.apache.spark.sql.Rowimport org.apache.spark.HashPartitioner
val schema =StructType(Seq(StructField("x",StringType,false),StructField("y",LongType,false),StructField("z",DoubleType,false)))
val rdd = sc.parallelize(Seq(Row("foo",1L,0.5),Row("bar",0L,0.0),Row("??",-1L,2.0),Row("foo",-1L,0.0),Row("??",3L,0.6),Row("bar",-3L,0.99)))
val partitioner =newHashPartitioner(5)
val partitioned = rdd.map(r =>(r.getString(0), r)).partitionBy(partitioner).values
val df = sqlContext.createDataFrame(partitioned, schema)
由于从RDD创建DataFrame只需要一个简单的映射阶段现有的分区布局应该保留*:
assert(df.rdd.partitions == partitioned.partitions)
以同样的方式,您可以重新分区现有的DataFrame:
sqlContext.createDataFrame(
df.rdd.map(r =>(r.getInt(1), r)).partitionBy(partitioner).values,
df.schema
)
所以看起来这不是不可能的。问题仍然存在,如果它是有意义的。我会说,大多数时候它不:
>重新分区是一个昂贵的过程。在典型情况下,大多数数据必须序列化,混排和反序列化。另一方面,可以从预分割数据中受益的操作数量相对较少,并且如果内部API未被设计为利用该属性,则进一步受限。
>在某些情况下联接,但它需要内部支持,
>窗口函数调用与匹配分区器。同上,限于一个窗口定义。它已经在内部进行了分区,因此预分区可能是多余的,
>使用GROUP BY的简单聚合 – 可以减少临时缓冲区**的内存占用,但总体成本要高得多。或多或少相当于groupByKey.mapValues(_。reduce)(当前行为)vs reduceByKey(预分区)。不太可能在实践中有用。
>使用SqlContext.cacheTable进行数据压缩。由于它看起来像是使用运行长度编码,应用OrderedRDDFunctions.repartitionAndSortWithinPartitions可以提高压缩率。
>性能高度依赖于密钥的分布。如果它是倾斜的,它将导致次优资源利用。在最坏的情况下,根本不可能完成这项工作。
>使用高级声明性API的一个重点是将自己与低级实现细节隔离开来。正如@dwysakowicz和@RomiKuntsman已经提到的,优化是Catalyst Optimizer的工作。它是一个非常复杂的野兽,我真的怀疑你可以轻松地改进,没有深入到它的内部。
使用JDBC源分区:
JDBC数据源支持predicates
argument.它可以如下使用:
sqlContext.read.jdbc(url, table,Array("foo = 1","foo = 3"), props)
它为每个谓词创建一个JDBC分区。请记住,如果使用单个谓词创建的集合不是不相交的,则会在结果表中看到重复的集合。
DataFrameWriter中的partitionBy方法:
Spark DataFrameWriter提供了partitionBy方法,可用于在写入时“分区”数据。它使用提供的列集分隔写入数据
val df =Seq(("foo",1.0),("bar",2.0),("foo",1.5),("bar",2.6)).toDF("k","v")
df.write.partitionBy("k").json("/tmp/foo.json")
这使得基于键的查询读取上的谓词下推:
val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k"==="bar")
但它不等同于DataFrame.repartition。特别是聚合:
val cnts = df1.groupBy($"k").sum()
仍将需要TunnstenExchange:
cnts.explain
// == Physical Plan ==// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])// +- TungstenExchange hashpartitioning(k#90,200), None// +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])// +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json
*分区布局我的意思是只有一个数据分布。分区RDD不再是分区器。
**假设没有早期预测。如果聚合仅覆盖列的小子集,则可能没有任何增益。
相关推荐
而DataFrame和Dataset是Spark SQL中更高级的数据抽象,提供了更多的优化和易于使用的特点。Dataframe可以看作是具有模式信息的分布式数据集。Dataset是类型安全的DataFrame。 在本文中,将详细介绍Spark的RDD API,...
Spark SQL的DataFrame接口支持多种数据源的操作。一个DataFrame可以进行RDDs方式的操作,也可以被注册为临时表。把DataFrame注册为临时表之后,就可以对该DataFrame执行SQL查询。 Spark SQL的默认数据源为Parquet...
熟悉Spark的分区对于Spark性能调优很重要,本文总结Spark通过各种函数创建RDD、DataFrame时默认的分区数,其中主要和sc.defaultParallelism、sc.defaultMinPartitions以及HDFS文件的Block数量有关,还有很坑的某些...
在大数据处理框架Spark中,RDD(弹性分布式数据集)和DataFrame是两种常用的抽象数据类型,它们分别对应于不同的操作接口和优化技术。RDD是最基础的分布式数据集合,提供了一种容错的并行处理数据的方式。而...
这里需要注意,`Spark.sql.*`系列的配置参数可以在创建`SparkConf`时设置,例如`spark.sql.shuffle.partitions`用于设置默认的分区数,`spark.sql.inMemoryColumnarStorage.compressed`用来决定DataFrame在内存中...
8. **动态分区裁剪**:在读取Hive表时,Spark SQL可以自动识别查询中的分区条件,只读取相关的分区,从而减少数据的I/O。 9. **倾斜键处理**:Spark SQL提供了处理数据倾斜的策略,如采样重分布和广播JOIN,以解决...
总的来说,Java操作Spark创建DataFrame涉及创建SparkSession、加载数据、转换数据、定义Schema以及保存结果。这个过程虽然相对Python或Scala来说代码较多,但提供了面向对象编程的灵活性,适合大型企业级应用。了解...
在大数据处理领域,`pandas` 和 `Spark DataFrame` 是两个重要的工具。`pandas` 是 Python 中用于数据处理和分析的库,而 `Spark DataFrame` 是 Apache Spark 的核心组件,提供了一种分布式数据处理能力。本文将详细...
** RDD是Spark的基础数据抽象,是弹性分布式数据集,是不可变、分区的记录集合,支持并行操作。 2. **Spark的架构** - **Spark的工作模式有哪几种?** 本地模式、集群模式(如Standalone、YARN、Mesos)和...
2. **RDD概念**:RDD的定义,其不可变性和分区特性,以及转换和行动操作的原理。 3. **Spark架构**:Driver、Executor、Cluster Manager的角色,以及如何通过SparkContext启动计算任务。 4. **内存管理**:Spark如何...
Spark SQL可以通过DataFrame API进行数据处理,它允许开发者使用SQL或DataFrame API进行数据查询。DataFrame可以看作是跨语言的表,支持多种数据源,如Hive、Parquet、JSON等。 **6. Spark Streaming的应用** ...
3. Spark SQL与DataFrame/Dataset:Spark SQL引入DataFrame和Dataset,提供更接近于传统SQL的接口,使得数据处理更加方便。DataFrame是基于Schema的RDD,而Dataset则结合了RDD的性能和强类型的优势。 4. Spark ...
在Spark中,可以自定义窗口函数实现滑动平均,例如使用`window`函数和`avg`函数,定义一个窗口大小,对每个窗口内的数据进行平均。 六、预测与评估 预测完成后,我们需要评估模型的性能。这通常通过比较预测值与...
5. **Spark SQL与DataFrame/Dataset**:探讨Spark SQL如何提供SQL接口进行结构化数据处理,介绍DataFrame和Dataset的概念,它们如何作为统一的数据抽象层,简化编程模型。 6. **Spark Streaming**:讲解Spark ...
2. **第3章**:Spark SQL与DataFrame - DataFrame的引入,对比RDD的优势 - Spark SQL的基础概念,如何创建DataFrame - DataFrame的转换和行动操作 - Hive与Spark SQL的集成,使用Hive metastore - Spark SQL的...
2. 创建Spark应用:使用Scala编写Spark程序,创建SparkContext并定义数据处理逻辑。 3. 数据读写:使用Spark的DataFrame或RDD API读取HDFS、Hive、Cassandra等数据源,或者通过Spark SQL执行SQL查询。 4. 并行计算:...
3. DataFrame:基于Spark SQL的DataFrame,提供了更高级别的数据操作接口,适用于结构化数据处理。 4. Dataset:DataFrame的类型安全版本,支持强类型和编译时检查。 5. Spark Streaming:用于实时流处理,通过微...
- **Spark SQL的DDL**:创建表、视图,定义分区等。 - **DataFrame/Dataset的交互式查询**:通过SparkSession,可以进行交互式SQL查询。 5. **Spark Streaming** - **DStream(Discretized Stream)**:连续的...
- **RDD API的支持**:除了DataFrame外,SparkR还支持传统的RDD API,提供了一系列常用的数据处理操作,如数据缓存控制、数据保存、数据转换、数据分组与聚合、RDD间join操作、排序操作以及重分区操作等。...