`

Spark SQL运行 过程 抄的别人的,记录 学习

 
阅读更多
抄的别人的,觉得写的特别好


val FILESOURCE_TABLE_RELATION_CACHE_SIZE =
    buildStaticConf("spark.sql.filesourceTableRelationCacheSize")


org.apache.spark.sql.catalyst.catalog.SessionCatalog#tableRelationCache

  private val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
    val cacheSize = conf.tableRelationCacheSize
    CacheBuilder.newBuilder().maximumSize(cacheSize).build[QualifiedTableName, LogicalPlan]()
  }

这里记录的是  QualifiedTableName 和 LogicalPlan 的对应关系。

每个表 和这个表的关系



这里是处理了刷新表  ,这里有好几个 Map 都存了什么
tableRelationCache
tempTables
globalTempViewManager



def refreshTable(name: TableIdentifier): Unit = synchronized {
    val dbName = formatDatabaseName(name.database.getOrElse(currentDb))
    val tableName = formatTableName(name.table)

    // Go through temporary tables and invalidate them.
    // If the database is defined, this may be a global temporary view.
    // If the database is not defined, there is a good chance this is a temp table.
    if (name.database.isEmpty) {
      tempTables.get(tableName).foreach(_.refresh())
    } else if (dbName == globalTempViewManager.database) {
      globalTempViewManager.get(tableName).foreach(_.refresh())
    }

    // Also invalidate the table relation cache.
    val qualifiedTableName = QualifiedTableName(dbName, tableName)
    tableRelationCache.invalidate(qualifiedTableName)
  }








https://blog.csdn.net/junerli/article/details/78607708

meta 这一块
org.apache.spark.sql.catalyst.catalog.SessionCatalog

org.apache.spark.sql.hive.HiveSessionCatalog

private[sql] class HiveSessionCatalog(
    externalCatalog: HiveExternalCatalog,
    globalTempViewManager: GlobalTempViewManager,
    val metastoreCatalog: HiveMetastoreCatalog,
    functionRegistry: FunctionRegistry,
    conf: SQLConf,
    hadoopConf: Configuration,
    parser: ParserInterface,
    functionResourceLoader: FunctionResourceLoader)
  extends SessionCatalog(
      externalCatalog,
      globalTempViewManager,
      functionRegistry,
      conf,
      hadoopConf,
      parser,
      functionResourceLoader) {








Spark SQL运行流程

    将SQL语句通过词法和语法解析生成未绑定的逻辑执行计划(Unresolved LogicalPlan),包含Unresolved Relation、Unresolved Function和Unresolved Attribute,然后在后续步骤中使用不同的Rule应用到该逻辑计划上
Analyzer使用Analysis Rules,配合元数据(如SessionCatalog 或是 Hive Metastore等)完善未绑定的逻辑计划的属性而转换成绑定的逻辑计划。具体流程是县实例化一个Simple Analyzer,然后遍历预定义好的Batch,通过父类Rule Executor的执行方法运行Batch里的Rules,每个Rule会对未绑定的逻辑计划进行处理,有些可以通过一次解析处理,有些需要多次迭代,迭代直到达到FixedPoint次数或前后两次的树结构没变化才停止操作。
Optimizer使用Optimization Rules,将绑定的逻辑计划进行合并、列裁剪和过滤器下推等优化工作后生成优化的逻辑计划。
Planner使用Planning Strategies,对优化的逻辑计划进行转换(Transform)生成可以执行的物理计划。根据过去的性能统计数据,选择最佳的物理执行计划CostModel,最后生成可以执行的物理执行计划树,得到SparkPlan。
在最终真正执行物理执行计划之前,还要进行preparations规则处理,最后调用SparkPlan的execute执行计算RDD。
三、SparkContext运行原理分析
    前面我们队Spark SQL运行架构进行分析,知道从输入SQL语句到生成Dataset分为5个步骤,但实际运行过程中在输入SQL语句之前,Spark还有加载SessionCatalog步骤。
3.1 使用SessionCatalog保存元数据
    在解析SQL语句之前需要初始化SQLContext,它定义了Spark SQL执行的上下文,并把元数据保存在SessionCatalog中,这些元数据包括表名称、表字段名称和字段类型等。
    SessionCatalog中保存的是表名和逻辑执行计划对应的哈希列表,这些数据将在解析未绑定的逻辑计划上使用。(SessionCatalog中的表名对应的逻辑执行计划是什么?是这个Dataset对应的逻辑执行计划)
3.2 使用Antlr生成未绑定的逻辑计划
    Spark 2.0版本起使用Antlr进行词法和语法解析。使用Antlr生成未绑定的逻辑计划分为两个阶段:第一阶段的过程为词法分析(Lexical Analysis),负责将符号(Token)分组成符号类(Token class or Token type),第二阶段就是真正的Parser,默认Antlr会构建出一颗分析树(Parser Tree)或者叫语法树(Syntax Tree)。
    SQLContext类中定义了SQL的解析方法parseSql。具体的SQL解析在AbastrctSqlParser抽象类中的parse方法进行,解析完毕后生成语法树,语法树会根据系统初始化的AstBuilder解析生成表达式、逻辑计划或表标识对象。
    在AbstractSqlParse的parse方法中,先实例化词法解析器SqlBaseLexer和语法解析器SqlBaseParser,然后尝试用Antlr较快的解析模式SLL,如果解析失败,则会再尝试使用普通解析模型LL,解析完毕后返回解析结果。
3.3 使用Analyzer绑定逻辑执行计划
    该阶段Analyzer使用Analysis Rules,结合SessionCatalog元数据,对未绑定的逻辑计划进行解析,生成了已绑定的逻辑计划。在该处理过程中,先实例化一个Analyzer,在Analyzer中定义了FiexedPoint和Seq[Batch]两个变量,其中FixedPoint为迭代次数的上线,而Seq[Batch]为所定义需要执行批处理的序列,每个批处理由一系列Rule和策略所组成,策略一般分为Once和FixedPoint(可理解为迭代次数)
3.4 使用Optimizer优化逻辑计划
    Optimizer的实现和处理方式同Analyzer类似,在该类中定义一系列Rule并同样继承于RuleExecutor。利用这些Rule对逻辑计划和Expression进行迭代处理,从而达到对树的节点进行合并和优化。其中主要的优化策略总结起来是合并、列裁剪和过滤器下推等几大类。
    Optimizer的优化策略不仅对已绑定的逻辑计划进行优化,而且对逻辑计划中的Expression也进行了优化,其原理就是遍历树,然后应用优化Rule,但是注意一点,对逻辑计划处理时先序遍历,而对Expression的处理是后续遍历(根据树节点类型来判断是逻辑计划还是Expression吗?)
3.5 使用SparkPlanner生成可执行物理计划
    SparkPlanner使用Planning Strategies对优化的逻辑计划进行转换(Transform),生成可以执行的物理计划。在QueryExecution类代码中,调用SparkPlanner.plan方法对优化的逻辑计划进行处理,而SparkPlanner并未定义plan方法,实际是调用SparkPlanner的祖父类QueyrPlanner的plan方法,然后会返回一个Iterator[Physicalplan]。
    SparkPlanner继承于SparkStrategies,而SparkStrategies继承了QueryPlanner。其中SparkStrategies包含了一系列特定的Strategies,这些Strategies是继承自QueryPlanner中定义的GenericStrategy。在SparkPlanner通过改写祖父类QueryPlanner中strategies策略变量,在该变量中定义了转换成物理计划所执行的策略。
3.6 使用QueryExecution执行物理计划
    在该步骤中先调用SparkPlanner.preparations对物理计划进行准备工作,规范返回数据行的格式等,然后调用SparkPlan.execute执行物理计划,从数据库中查询数据并生成RDD。
    SparkPlan的preparations其实是一个RuleExecutor[SparkPlan],它会调用RuleExecutor的execut方法对前面生成的物理计划应用Rule进行匹配,最终生成一个SparkPlan。
    SparkPlan继承于QueryPlan,SparkPlan中定义了SQL语句执行的execute方法,执行完execute方法返回的是一个RDD,之后可以运行Spark作业对该RDD进行操作。
分享到:
评论

相关推荐

    实训指导书_使用Spark SQL进行法律服务网站数据分析.zip

    Spark SQL支持加密和访问控制,确保数据在传输和处理过程中不被非法获取。同时,应遵循数据最小化原则,只处理必要的数据,并对敏感信息进行脱敏处理。 总结,本实训指导书将引导你通过Spark SQL进行法律服务网站的...

    mastering-spark-sql

    通过学习《Mastering Spark SQL》,读者可以掌握如何利用Spark SQL进行大数据处理,提高数据分析的效率和质量。对于Java开发者来说,这本书将帮助他们更好地理解和应用Spark SQL在Java项目中的实践。

    Spark.sql数据库部分的内容

    1. **兼容性**:Spark SQL支持通过Hive的元数据、SQL语法和Hive SerDes与Hive集成,使得在Spark上可以无缝地运行Hive的工作负载。 2. **DataFrame API**:DataFrame API提供了强类型和静态类型的API,支持Scala、...

    日志分析Spark SQL 的世界

    日志数据是系统运行过程中产生的记录信息,包含了丰富的业务行为和系统状态,如用户活动、系统错误、性能指标等。通过有效的日志分析,我们可以发现问题、优化系统性能、提升用户体验,甚至挖掘出有价值的商业洞察。...

    spark运行原理解析

    ### Spark运行原理解析 #### 一、Spark简介与核心价值 Spark是一个开源的大数据处理框架,它提供了统一的数据处理接口,能够支持多种类型的数据处理任务,如批处理、流处理、交互式查询以及机器学习等。Spark的...

    spark sql 代码实现

    这个示例展示了如何使用 Spark SQL 处理和分析数据,从读取文件到执行 SQL 查询,再到打印结果,整个过程都是在 Spark 的分布式环境中进行的。在 Spark SQL 中,数据被表示为 DataFrame 或 Dataset,这些数据结构...

    spark笔记整理文档

    与Hadoop MapReduce相比,Spark通过内存计算显著提升了迭代算法的执行效率,同时支持多种数据处理模型,包括批处理、交互式查询(Spark SQL)、流处理(Spark Streaming)和机器学习(MLlib)。 2. Spark核心组件:...

    spark相关学习资料.zip

    4. **Spark SQL**:Spark SQL是Spark用于结构化数据处理的模块,它可以与Hive兼容,支持SQL查询,并且可以与DataFrame和Dataset API集成,提供更高效的查询性能。 5. **Spark Streaming**:Spark Streaming处理实时...

    spark_学习demo

    - **RDD(弹性分布式数据集)**:Spark的核心数据抽象,是不可变、分区的记录集合,可以在集群中进行并行操作。 - **DAG(有向无环图)**:Spark的任务调度基于DAG,它表示了任务间的依赖关系。 - **Job、Stage、...

    Spark入门(完整版)

    这份PDF教程涵盖了Spark的生态系统、部署与安装、编程模型、运行框架,以及相关的高级特性,如流处理、SQL支持、机器学习库MLlib和图计算库GraphX,还有Tachyon存储系统。 一、Spark生态圈 Spark生态圈包括一系列...

    spark大数据案例

    在这些案例中,你会看到如何在Hadoop上部署和运行Spark作业,以及如何读写HDFS上的数据。同时,Spark也支持Hadoop MapReduce和YARN资源管理系统,这使得Spark能充分利用Hadoop生态的资源。 总结,本压缩包中的Spark...

    Spark-2.4.5官网下载源码包

    Spark-2.4.5是该框架的一个稳定版本,提供了丰富的数据处理功能,包括批处理、交互式查询(通过Spark SQL)、实时流处理(通过Spark Streaming)以及机器学习(通过MLlib库)和图计算(通过GraphX)。这个版本的源码...

    spark简介及使用

    Spark 通过记录 RDD 的血缘关系(lineage)来实现容错。如果某个 RDD 的部分分区丢失,Spark 可以通过重新计算丢失的分区来恢复,这个过程依赖于 RDD 的转换历史。这种机制使得 Spark 在处理大规模数据时既高效又...

    Spark编程指南中文版

    它支持批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)和机器学习(MLlib)等多种应用场景。 2. **Spark架构**:Spark的核心组件包括Driver Program、Executor、Cluster Manager和Storage System...

    Spark开发指导文档

    5. 集群部署:可以将Spark应用提交到YARN、Mesos或独立Spark集群运行。 五、优化技巧 1. 内存管理:合理设置executor内存、driver内存,避免溢出,使用Tachyon或Alluxio作为缓存系统提升速度。 2. 广播变量和累加器...

    spark-sql-performance:Spark SQL的一组性能测试

    这些代码可能会使用Spark SQL API来创建DataFrame,执行SQL查询,并记录执行时间,以便进行性能评估。 2. **测试用例**:一系列精心设计的测试场景,可能包括各种SQL操作,例如简单的SELECT,复杂的JOIN,多层嵌套...

    sparkdemo_202108.7z

    RDD是Spark的基础数据结构,它是不可变的、分区的记录集合,支持并行操作。DataFrame是Spark SQL引入的数据结构,它是基于RDD但提供了更高级别的抽象,适用于结构化数据处理。Dataset是DataFrame的类型安全版本,...

    spark-2.4.7-bin-hadoop2.6.tgz

    6. **MLlib**:Spark的机器学习库MLlib包含了多种机器学习算法,如分类、回归、聚类、协同过滤,以及模型评估和调优工具,为数据科学家提供了便捷的机器学习平台。 7. **GraphX**:Spark的图处理库,用于构建和分析...

    spark-2.2.0源码

    2. **Spark SQL**:Spark SQL是Spark处理结构化数据的模块,它将SQL查询与DataFrame API融合,使得开发者可以方便地在SQL和DataFrame之间切换。在源码中,你可以看到如何解析SQL语句,执行查询计划,并将其转化为...

Global site tag (gtag.js) - Google Analytics