`
bit1129
  • 浏览: 1069532 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark105】Spark SQL动态代码生成一

 
阅读更多

 

2015-09-02 14:46:27,681-[TS] DEBUG Executor task launch worker-0 org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection - code for input[0, StringType],input[1, StringType],input[2, StringType],input[3, StringType],input[4, StringType]:

 

日志中的如下信息是如何产生的,这是列及其类型么?

input[0, StringType],input[1, StringType],input[2, StringType],input[3, StringType],input[4, StringType]

 

 

代码:

 

 

/**
 * A projection that returns UnsafeRow.
 */
abstract class UnsafeProjection extends Projection {
  //将IntervalRow转换为UnsafeRow
  override def apply(row: InternalRow): UnsafeRow
}
 

 

2. UnsafeRow的pointTo方法
  /**
   * Update this UnsafeRow to point to different backing data.
   *
   * @param baseObject the base object
   * @param baseOffset the offset within the base object
   * @param numFields the number of fields in this row
   * @param sizeInBytes the size of this row's backing data, in bytes
   */
  public void pointTo(Object baseObject, long baseOffset, int numFields, int sizeInBytes) {
    assert numFields >= 0 : "numFields (" + numFields + ") should >= 0";
    this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
    this.baseObject = baseObject;
    this.baseOffset = baseOffset;
    this.numFields = numFields;
    this.sizeInBytes = sizeInBytes;
  }
 
 
 

 

package org.apache.spark.sql.test;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.types.UTF8String;

///在Spark的代码中中没有UnsafeProjection的子类,UnsafeProjection的子类是动态生成的
//UnsafeProjection是一个抽象类,子类需要实现apply方法
class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
  
  ///涉及的expressions
  private org.apache.spark.sql.catalyst.expressions.Expression[] expressions;
  
  //apply方法返回的结果
  private UnsafeRow convertedStruct10;
  private byte[] buffer11;
  private int cursor12;
  
  
  public SpecificUnsafeProjection(org.apache.spark.sql.catalyst.expressions.Expression[] expressions) {
    this.expressions = expressions;
    this.convertedStruct10 = new UnsafeRow();
    //buffer11是字节数组,长度为48
    this.buffer11 = new byte[48];
    this.cursor12 = 0;
  }
  
  // Scala.Function1 need this
//  public Object apply(Object row) {
//    return apply((InternalRow) row);
//  }
  
  public UnsafeRow apply(InternalRow i) {
    
    //cursor12首先复制为48,这个更buffer11的长度一样
    cursor12 = 48;

    //这步操作是对convertedStruct10(UnsafeRow)的一些属性更新更新
    //第二个参数是baseOffset,值是
    //第三个参数(值5)是Row中的列数
    //第四个参数cursor12表示这个Row的sizeInBytes(the size of this row's backing data, in bytes)
    convertedStruct10.pointTo(buffer11, Platform.BYTE_ARRAY_OFFSET, 5, cursor12);
    
    /* input[0, StringType] */
      
   //第一列是否为null
    boolean isNull0 = i.isNullAt(0); 

    //不为null,则通过调用getUTF8String获取其UTF8String数据
    UTF8String primitive1 = isNull0 ? null : (i.getUTF8String(0));
    
    //获取第一列的字节数(如果是null,则为0),加到cursor12上。
    int numBytes14 = cursor12 + (isNull0 ? 0 : org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.getSize(primitive1));
    //如果第一列不为null 
   if (buffer11.length < numBytes14) {
      // This will not happen frequently, because the buffer is re-used.
      //扩容
      byte[] tmpBuffer13 = new byte[numBytes14 * 2];
      //将buffer11的数据复制到tmpBuffer13,然后将tmpBuffer13复制给buffer11,此时buffer11完成了扩容的工作
      Platform.copyMemory(buffer11, Platform.BYTE_ARRAY_OFFSET,
        tmpBuffer13, Platform.BYTE_ARRAY_OFFSET, buffer11.length);
      buffer11 = tmpBuffer13;
    }
    //更新值
    convertedStruct10.pointTo(buffer11, Platform.BYTE_ARRAY_OFFSET, 5, numBytes14);
    
    
    if (isNull0) {
      convertedStruct10.setNullAt(0);
    } else {
      cursor12 += org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.write(convertedStruct10, 0, cursor12, primitive1);
    }
    
    
    /* input[1, StringType] */
    
    boolean isNull2 = i.isNullAt(1);
    UTF8String primitive3 = isNull2 ? null : (i.getUTF8String(1));
    
    
    int numBytes15 = cursor12 + (isNull2 ? 0 : org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.getSize(primitive3));
    if (buffer11.length < numBytes15) {
      // This will not happen frequently, because the buffer is re-used.
      byte[] tmpBuffer13 = new byte[numBytes15 * 2];
      Platform.copyMemory(buffer11, Platform.BYTE_ARRAY_OFFSET,
        tmpBuffer13, Platform.BYTE_ARRAY_OFFSET, buffer11.length);
      buffer11 = tmpBuffer13;
    }
    convertedStruct10.pointTo(buffer11, Platform.BYTE_ARRAY_OFFSET, 5, numBytes15);
    
    
    if (isNull2) {
      convertedStruct10.setNullAt(1);
    } else {
      cursor12 += org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.write(convertedStruct10, 1, cursor12, primitive3);
    }
    
    
    /* input[2, StringType] */
    
    boolean isNull4 = i.isNullAt(2);
    UTF8String primitive5 = isNull4 ? null : (i.getUTF8String(2));
    
    
    int numBytes16 = cursor12 + (isNull4 ? 0 : org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.getSize(primitive5));
    if (buffer11.length < numBytes16) {
      // This will not happen frequently, because the buffer is re-used.
      byte[] tmpBuffer13 = new byte[numBytes16 * 2];
      Platform.copyMemory(buffer11, Platform.BYTE_ARRAY_OFFSET,
        tmpBuffer13, Platform.BYTE_ARRAY_OFFSET, buffer11.length);
      buffer11 = tmpBuffer13;
    }
    convertedStruct10.pointTo(buffer11, Platform.BYTE_ARRAY_OFFSET, 5, numBytes16);
    
    
    if (isNull4) {
      convertedStruct10.setNullAt(2);
    } else {
      cursor12 += org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.write(convertedStruct10, 2, cursor12, primitive5);
    }
    
    
    /* input[3, StringType] */
    
    boolean isNull6 = i.isNullAt(3);
    UTF8String primitive7 = isNull6 ? null : (i.getUTF8String(3));
    
    
    int numBytes17 = cursor12 + (isNull6 ? 0 : org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.getSize(primitive7));
    if (buffer11.length < numBytes17) {
      // This will not happen frequently, because the buffer is re-used.
      byte[] tmpBuffer13 = new byte[numBytes17 * 2];
      Platform.copyMemory(buffer11, Platform.BYTE_ARRAY_OFFSET,
        tmpBuffer13, Platform.BYTE_ARRAY_OFFSET, buffer11.length);
      buffer11 = tmpBuffer13;
    }
    convertedStruct10.pointTo(buffer11, Platform.BYTE_ARRAY_OFFSET, 5, numBytes17);
    
    
    if (isNull6) {
      convertedStruct10.setNullAt(3);
    } else {
      cursor12 += org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.write(convertedStruct10, 3, cursor12, primitive7);
    }
    
    
    /* input[4, StringType] */
    
    boolean isNull8 = i.isNullAt(4);
    UTF8String primitive9 = isNull8 ? null : (i.getUTF8String(4));
    
    
    int numBytes18 = cursor12 + (isNull8 ? 0 : org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.getSize(primitive9));
    if (buffer11.length < numBytes18) {
      // This will not happen frequently, because the buffer is re-used.
      byte[] tmpBuffer13 = new byte[numBytes18 * 2];
      Platform.copyMemory(buffer11, Platform.BYTE_ARRAY_OFFSET,
        tmpBuffer13, Platform.BYTE_ARRAY_OFFSET, buffer11.length);
      buffer11 = tmpBuffer13;
    }
    convertedStruct10.pointTo(buffer11, Platform.BYTE_ARRAY_OFFSET, 5, numBytes18);
    
    
    if (isNull8) {
      convertedStruct10.setNullAt(4);
    } else {
      cursor12 += org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.write(convertedStruct10, 4, cursor12, primitive9);
    }
    
    
    
    return convertedStruct10;
  }
}

 

分享到:
评论
1 楼 tivan 2016-01-28  
你好,
这个代码生成主要在,那个地方使用。

相关推荐

    基于antlr4 解析器,支持spark sql, tidb sql, flink sql, Sparkflink运行命令解析器

    在本项目中,ANTLR4被用来创建一个解析器,这个解析器支持多种SQL方言,包括Spark SQL、TiDB SQL以及Flink SQL,同时还支持Spark和Flink的运行命令解析。 Spark SQL是Apache Spark的一个组件,主要负责处理结构化的...

    剥离的Parser模块,用于查看Spark SQL语法解析SQL后生成的语法树

    在"ANTLR4-SqlBase-master"这个压缩包中,包含了使用ANTLR4工具生成的SqlBase解析器的相关代码,SqlBase是Apache Calcite项目的一个子项目,提供了SQL标准的语法定义。Spark SQL的解析层就是基于SqlBase的语法规则来...

    spark sql解析-源码分析

    1. **DataFrame API**:DataFrame是Spark SQL的核心概念,它是一种分布式数据集合,提供了类似SQL的API用于数据操作。DataFrame是基于RDD(弹性分布式数据集)的概念,但增加了类型安全和元数据,使得数据处理更加...

    Spark DataFrame

    此外,Spark SQL作为一个新模块,引入了关系型处理和Spark的功能编程API的紧密集成。相比于之前的一些系统,Spark SQL提供了更加紧密的关系型与过程型处理之间的集成,通过声明式DataFrame API,可以和过程式的Spark...

    Learning Spark SQL - Aurobindo Sarkar

    1. Spark SQL基础:介绍Spark SQL的基本概念和环境设置。 2. DataFrame和Dataset API:讲解DataFrame和Dataset的创建、操作及转换。 3. SQL支持:解释如何使用SQL语句进行查询,包括JOIN、GROUP BY、ORDER BY等操作...

    mastering-spark-sql.pdf

    代码生成则是一种JIT编译技术,可以将Spark SQL生成的中间表示转换为高效执行的Java字节码。 文档还提到了Catalyst优化器,它是Spark SQL的查询优化引擎。Catalyst优化器使用一种领域特定语言(DSL),可以根据各种...

    Spark SQL 实验

    Spark SQL是Apache Spark的一个模块,它提供了对结构化数据的查询和处理能力。它允许用户使用SQL查询语言对分布式数据集进行查询和分析。Spark SQL不仅支持SQL标准,还支持 HiveQL,同时兼容Hive的表和UDF(用户定义...

    Spark SQL操作大全.zip

    - **代码生成**:Spark SQL会将DataFrame操作转化为高效的执行计划,利用代码生成技术减少运行时的反射开销。 - **广播变量**:对于小表,可以使用广播join来减少网络传输,提高性能。 - **分区裁剪**:根据查询...

    Spark SQL 2.3.0:深入浅出

    提交后,Spark SQL会生成一个未完全解析的逻辑执行计划,接着利用内部的schema信息生成一个完整的逻辑执行计划。然后,该逻辑执行计划会被优化,最终生成一个优化后的逻辑执行计划。这个过程的核心是由Catalyst完成...

    Spark.sql数据库部分的内容

    6. **性能优化**:Spark SQL采用了 Catalyst 编译器进行查询优化,包括代码生成、列式存储、过滤推导等,以提高查询性能。 7. **跨语言支持**:Spark SQL允许不同语言之间的交互,例如,Python用户可以创建...

    Spark SQL源码概览.zip

    Spark SQL是Apache Spark项目的一部分,它提供了一个用于处理结构化数据的强大框架,允许开发者使用SQL或者DataFrame/Dataset API来查询数据。本资料“Spark SQL源码概览.zip”包含了一份详细的Spark SQL源码分析,...

    Spark编程基础:Spark SQL单元测验与答案.pdf

    Spark SQL 是 Spark 生态系统中的一个基于 SQL 的查询引擎,提供了高效、灵活的数据处理能力。本文档将从 Spark SQL 的架构、功能、应用场景等方面对知识点进行总结。 一、Spark SQL 架构 Spark SQL 的架构基于 ...

    Spark SQL上海摩拜共享单车数据分析源码

    7. 执行优化:Spark SQL通过 Catalyst 编译器进行查询优化,包括列式存储、代码生成、动态分区裁剪等,以提高查询性能。 这个项目提供了完整的源码,对于学习如何使用Spark SQL进行大数据分析的开发者来说,这是一...

    Spark-Sql源码解析

    SparkPlanner 是 Spark-Sql 源码解析中的一个关键组件,负责生成执行计划。SparkPlanner 负责将物理计划转换为可执行的计划,以便在 Spark 集群中执行。 QueryExecution 是 Spark-Sql 源码解析中的一个关键组件,...

    mastering-spark-sql

    5. ** Catalyst优化器**:Spark SQL使用Catalyst作为其查询优化器,它可以自动进行各种查询优化,如代码生成、列裁剪、谓词下推等,提高查询性能。 6. **DataFrame的编程接口**:Spark SQL提供了Scala、Java、...

    spark框架SQL部分解析过程

    逻辑计划生成是 Spark SQL 的核心组件之一。在逻辑计划生成阶段,Spark SQL 会将抽象语法树转换为逻辑计划,然后对逻辑计划进行优化。逻辑计划优化是 Spark SQL 的关键步骤之一,Spark SQL 会根据查询的特点和数据...

    spark源代码部署及编译生成

    - **SQL**:Spark SQL提供了一种用于处理结构化数据的API,它集成了Hive查询语言。 - **Streaming**:Spark Streaming提供了基于微批处理的实时数据流处理能力。 - **MLlib**:Spark的机器学习库,包含各种算法和...

    27:Spark2.3.x SQL大数据项目离线分析.rar

    Catalyst优化器负责转换和优化查询计划,Tungsten则通过自动生成Java字节码来提高执行效率,Code Generation能够将SQL查询转换为高效执行的本地代码。 三、离线分析流程 1. 数据加载:离线分析通常涉及从HDFS、Hive...

    Spark编程基础:Spark SQL单元测验与答案.docx

    * Spark SQL 执行计划生成和优化都由 Catalyst(函数式关系查询优化框架)负责。 二、DataFrame 操作 * DataFrame 是一种以RDD为基础的分布式数据集,提供了详细的结构信息。 * DataFrame 支持多种数据源,包括 ...

Global site tag (gtag.js) - Google Analytics