概述
本文介绍Spark SQL增加的Columnar模块代码实现。
首先介绍Columnar内的代码结构和实现,然后介绍在SqlContext里的使用方式。
Columnar
InMemoryColumnarTableScan
实现
InMemoryColumnarTableScan类是SparkPlan LeafNode的实现,即是一个物理执行计划。

private[sql] case class InMemoryColumnarTableScan(attributes: Seq[Attribute], child: SparkPlan)
extends LeafNode {
传入的child是一个SparkPlan(确认了的物理执行计划)和一个属性序列。
行转列并cache的过程如下:
lazy val cachedColumnBuffers = {
val output = child.output
// 遍历每个RDD的partiti on
val cached = child.execute().mapPartitions { iterator =>
// 把属性Seq转换成为ColumnBuilder数组
val columnBuilders = output.map { attribute =>
// 都是基本ColumnBuilder,默认ByteBuffer大小
ColumnBuilder(ColumnType(attribute.dataType).typeId, 0, attribute.name)
}.toArray
var row: Row = null
// RDD每个Partition的Rows,每个Row的所有field信息存到ColumnBuilder里
while (iterator.hasNext) {
row = iterator.next()
var i = 0
while (i < row.length) {
columnBuilders(i).appendFrom(row, i)
i += 1
}
}
Iterator.single(columnBuilders.map(_.build()))
}.cache()
cached.setName(child.toString)
// Force the materialization of the cached RDD.
cached.count()
cached
}
ColumnType类用于表示Column的类型,他的typeId变量用来区分数据类型,生成对应的ColumnBuilder(typeId, initialSize=0, columnName)。ColumnBuilder的生成如下:
def apply(typeId: Int, initialSize: Int = 0, columnName: String = ""): ColumnBuilder = {
val builder = (typeId match {
case INT.typeId => new IntColumnBuilder
case LONG.typeId => new LongColumnBuilder
case FLOAT.typeId => new FloatColumnBuilder
case DOUBLE.typeId => new DoubleColumnBuilder
case BOOLEAN.typeId => new BooleanColumnBuilder
case BYTE.typeId => new ByteColumnBuilder
case SHORT.typeId => new ShortColumnBuilder
case STRING.typeId => new StringColumnBuilder
case BINARY.typeId => new BinaryColumnBuilder
case GENERIC.typeId => new GenericColumnBuilder
}).asInstanceOf[ColumnBuilder]
builder.initialize(initialSize, columnName)
builder
}
他的继承结构如下,主要有三大体系:

这里涉及到的是Basic这个体系,继承结构如下:

BasicColumnBuilder里,initialSize = 0,指使用ByteBuffer的默认大小,即10*1024*104。然后在initialize()方法,会初始化ByteBuffer。
接下来,针对RDD每个partition,
var row: Row = null
while (iterator.hasNext) {
row = iterator.next()
var i = 0
while (i < row.length) {
columnBuilders(i).appendFrom(row, i)
i += 1
}
}
进行了appendFrom操作:
override def appendFrom(row: Row, ordinal: Int) {
val field = columnType.getField(row, ordinal)
buffer = ensureFreeSpace(buffer, columnType.actualSize(field))
columnType.append(field, buffer)
}
用于把一个Row的每一个field,都存到一个ColumnBuilder里。在这里指BasicColumnBuilder这个类,维护了一个自己的ByteBuffer,把row里的各个field信息都存在了buffer里。
最后ColumnBuilders里的每个ColumnBuilder进行build(),即BasicColumnBuilder.build()方法,进行了一次ByteBuffer的rewind()方法。
这个方法的结果是一个RDD集合。由于在结束前调用了.count()方法,所以RDD的计算是被执行了的,返回的是新的RDD。
在Spark SQL里,外部调用cachedColumnBuffers方法只有在uncache table的时候,进行了unpersisit()操作。
下面看execute()方法:
override def execute() = {
cachedColumnBuffers.mapPartitions { iterator =>
// 在RDD partition里,iterator.next()返回的是一个ByteBuffer
// 也就是说,cachedColumnBuffers返回的结果RDD,类型是ByteBuffer
val columnBuffers = iterator.next()
assert(!iterator.hasNext)
new Iterator[Row] {
// 访问每一个ByteBuffer里的列信息
val columnAccessors = columnBuffers.map(ColumnAccessor(_))
val nextRow = new GenericMutableRow(columnAccessors.length)
override def next() = {
var i = 0
// 把column里的信息再转到Row里
while (i < nextRow.length) {
columnAccessors(i).extractTo(nextRow, i)
i += 1
}
nextRow
}
override def hasNext = columnAccessors.head.hasNext
}
}
}
使用
在SqlContext里选择cache table的时候,会使用该类。
其实在cache的时候,首先去catalog里寻找这个table的信息和table的执行计划,然后会进行执行(执行到物理执行计划生成),然后把这个table再放回catalog里维护起来,这个时候的执行计划已经是最终要执行的物理执行计划了。但是此时Columner模块相关的转换等操作都是没有触发的。
真正的触发还是在execute()的时候,同其他SparkPlan的execute()方法触发场景是一样的。
ColumnBuilder 与 ColumnAccessor
一个包装Row的每个field成Column;一个访问column,然后可以转回Row
关于压缩
private[sql] abstract class NativeColumnBuilder[T <: NativeType](
override val columnStats: NativeColumnStats[T],
override val columnType: NativeColumnType[T])
extends BasicColumnBuilder[T, T#JvmType](columnStats, columnType)
with NullableColumnBuilder
with AllCompressionSchemes
with CompressibleColumnBuilder[T]
private[sql] class BooleanColumnBuilder extends NativeColumnBuilder(new BooleanColumnStats, BOOLEAN)
private[sql] class IntColumnBuilder extends NativeColumnBuilder(new IntColumnStats, INT)
从继承结构看,压缩的builder和Accessor都以trait的方式继承了ColumnBuilder,而子类比如IntColumnBuilder,不但继承了BaseColumnBuilder,同时也具备压缩处理能力。
具体压缩处理可以参考CompressibleColumnBuilder类里的实现。
是否压缩会做一次判断,压缩比在0.8以下才执行压缩。
在build()的时候实施压缩,并且按照以下结构存在bytebuffer内。
* .--------------------------- Column type ID (4 bytes)
* | .----------------------- Null count N (4 bytes)
* | | .------------------- Null positions (4 x N bytes, empty if null count is zero)
* | | | .------------- Compression scheme ID (4 bytes)
* | | | | .--------- Compressed non-null elements
* V V V V V
* +---+---+-----+---+---------+
* | | | ... | | ... ... |
* +---+---+-----+---+---------+
* \-----------/ \-----------/
* header body
CompressionScheme子类是不同的压缩实现
都是scala实现的,未借助第三方库。不同的实现,指定了支持的column data类型。在build()的时候,会比较每种压缩,选择压缩率最小的(若仍大于0.8就不压缩了)。
这里的估算能力,在子类实现里,好像是由gatherCompressibilityStats方法实现的。
SqlContext
分析SqlContext内目前cache和uncache table的实现细节与Columnar的关系。
Cache Table
/** Caches the specified table in-memory. */
def cacheTable(tableName: String): Unit = {
// 得到的是一个logicalPlan
val currentTable = catalog.lookupRelation(None, tableName)
// 物理执行计划生成之后交给InMemoryColumnarTableScan
val asInMemoryRelation =
InMemoryColumnarTableScan(currentTable.output, executePlan(currentTable).executedPlan)
// SparkLogicalPlan接受的Plan必须是已经确定plan好的SparkPlan
catalog.registerTable(None, tableName, SparkLogicalPlan(asInMemoryRelation))
}
从上面那段代码可以看到,cache之前,需要先把本次cache的table的物理执行计划生成出来。上述的currentTable其实是一个logicalPlan,来自catalog的lookupRelation。
最后注册表的时候,涉及到的SparkLogicalPlan类是LogicalPlan的实现类(但是本身其实是一个SparkPlan),它接受的是SparkPlan,并且是已经确定Plan好了的逻辑执行计划,目前接受两类:ExistingRdd和InMemoryColumnarTableScan。
在cache这个过程里,InMemoryColumnarTableScan并没有执行,但是生成了以InMemoryColumnarTableScan为物理执行计划的SparkLogicalPlan,并存成table的plan。
Uncache Table
在这一步,除了删除catalog里的table信息之外,还调用了InMemoryColumnarTableScan的cacheColumnBuffers方法,得到RDD集合,并进行了unpersist()操作。cacheColumnBuffers方法具体见Columner内,主要做了把RDD每个partition里的ROW的每个Field存到了ColumnBuilder内。
全文完 :)
分享到:
相关推荐
本资料“Spark SQL源码概览.zip”包含了一份详细的Spark SQL源码分析,主要面向Java开发人员,旨在帮助他们深入理解Spark SQL的内部工作机制。以下是基于该主题的详细知识点: 1. **Spark SQL架构**: Spark SQL的...
Spark源码分析主要涉及以下几个关键模块: 1. DAGScheduler:负责构建DAG,并将其拆分为任务阶段。 2. TaskScheduler:调度和分配任务到工作节点。 3. Executor:执行实际任务,管理内存和线程。 4. ShuffleManager...
Spark SQL结合了Spark的高性能计算能力与SQL的易用性,使得开发者能够以SQL或者DataFrame API进行复杂的数据分析。下面我们将详细解析Spark SQL的关键知识点。 1. **DataFrame API**: Spark SQL的核心是DataFrame...
Spark 在财务系统中的应用主要讲述了 Spark SQL 在财务系统中的应用,特别是 Spark SQL 的前身 Shark,以及 Spark SQL 的发展和特性。文章首先介绍了 Spark SQL 的能力,如何重构财务系统的供应商结算系统,然后...
9. **Python和R支持**:Spark 2.0增强了对Python和R语言的支持,Pyspark和SparkR库提供了丰富的API,使得数据科学家可以更方便地利用Spark进行大数据分析。 10. **集成与API**:Spark 2.0加强了与其他开源项目的...
Spark SQL是Spark框架中用于处理结构化数据的模块,它提供了一个高效、容错的处理方式,并且原生支持Hive,可以运行现有的HiveSQL语句。Spark SQL的主要优势在于其性能,对于很多数据处理任务,Spark SQL比Hive快10...
因此,Spark社区提出了Spark SQL项目,汲取了Shark的优点,如内存列存储(In-Memory Columnar Storage)和Hive兼容性等,重新开发了Spark SQL代码。Spark SQL提供了高效的数据处理能力和灵活的查询能力。 四、Spark...
随着Spark社区的发展壮大和技术的进步,Spark团队决定推出一个全新的SQL处理框架——SparkSQL,以取代Shark。SparkSQL不仅继承了Shark的优点,如内存列存储(In-Memory Columnar Storage)和Hive兼容性,还克服了Shark...
接下来是Structured Streaming,这是一种在Spark SQL引擎上运行的流处理技术,其特点是快速、可扩展、容错能力强,提供丰富、统一、高级的API。Structured Streaming能有效地处理复杂的数据和工作负载。由于其丰富的...
- **ORC (Optimized Row Columnar)**:另一种高效的列式存储格式,专为 Hadoop 生态系统设计。 #### 向量化优化 **向量化**是指将多个数据元素一起处理的技术,通常通过硬件加速器如 SIMD (Single Instruction ...
Chapter 10, Columnstore Indexes, revises the columnar storage and then explores the huge improvements for columnstore indexes in SQLServer 2016: updateable nonclustered columnstore indexes, column...
齐奥·科伦纳尔 即将搬到zio org
Apache Parquet和Apache Arrow是开源社区的成果,它们已经被广泛采用并集成到各种数据分析工具和大数据处理框架中,如Hadoop、Spark和Impala等。 总结来说,《藏经阁-The Columnar Era_ Leveraging P.pdf》强调了列...
藏经阁-The Columnar Era_ Leveraging Parquet, Arrow and Kudu for ...本文介绍了基于 Parquet、Arrow 和 Kudu 的高性能分析技术,讨论了 Columnar representation 的优点和缺点,并展示了 Parquet 的存储格式和优点。
SparkSQL是Apache Spark框架中的一个核心模块,专为处理结构化数据而设计。它在Spark 3.0版本中提供了高效、灵活的数据处理能力,旨在融合SQL查询与Spark的分布式计算优势。SparkSQL的主要目标是简化大数据处理,...
时序分析在 Spark 中的应用 时序分析(Time Series Analytics)是指对具有时间维度的数据进行分析和处理,以便挖掘出隐藏的模式和规律。在 Apache Spark 中,有一个开源的时序分析库 Spark-Timeseries,用于高性能...
1. 数据分布策略:Teradata使用一种称为“全列存储”(Columnar Storage)的方法,将数据分散在多个节点上,每个节点负责一部分数据。这种分布策略可以极大地提高数据处理速度,尤其是在执行聚合操作时。 2. 并行...
- Columnar and Grouped Reports - Bi Dashboards and Elements - Synchronizing Groups, Charts, and Sparklines - Chart and Gauge Reports - Creating a Personal Report Card - Creating a Multi-Series Multi-Y...
* 语法树分析:将 Excel 公式解析成语法树,然后将其转换为 Spark 程序。 知识点五: Excel as a DSL * Excel 作为领域特定语言(DSL):使用 Excel 电子表格作为一种领域特定语言,以便快速原型设计数据管道。 * ...