- 浏览: 125255 次
- 性别:
- 来自: 杭州
文章分类
最新评论
抄的别人的,觉得写的特别好
val FILESOURCE_TABLE_RELATION_CACHE_SIZE =
buildStaticConf("spark.sql.filesourceTableRelationCacheSize")
org.apache.spark.sql.catalyst.catalog.SessionCatalog#tableRelationCache
private val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
val cacheSize = conf.tableRelationCacheSize
CacheBuilder.newBuilder().maximumSize(cacheSize).build[QualifiedTableName, LogicalPlan]()
}
这里记录的是 QualifiedTableName 和 LogicalPlan 的对应关系。
每个表 和这个表的关系
这里是处理了刷新表 ,这里有好几个 Map 都存了什么
tableRelationCache
tempTables
globalTempViewManager
def refreshTable(name: TableIdentifier): Unit = synchronized {
val dbName = formatDatabaseName(name.database.getOrElse(currentDb))
val tableName = formatTableName(name.table)
// Go through temporary tables and invalidate them.
// If the database is defined, this may be a global temporary view.
// If the database is not defined, there is a good chance this is a temp table.
if (name.database.isEmpty) {
tempTables.get(tableName).foreach(_.refresh())
} else if (dbName == globalTempViewManager.database) {
globalTempViewManager.get(tableName).foreach(_.refresh())
}
// Also invalidate the table relation cache.
val qualifiedTableName = QualifiedTableName(dbName, tableName)
tableRelationCache.invalidate(qualifiedTableName)
}
https://blog.csdn.net/junerli/article/details/78607708
meta 这一块
org.apache.spark.sql.catalyst.catalog.SessionCatalog
org.apache.spark.sql.hive.HiveSessionCatalog
private[sql] class HiveSessionCatalog(
externalCatalog: HiveExternalCatalog,
globalTempViewManager: GlobalTempViewManager,
val metastoreCatalog: HiveMetastoreCatalog,
functionRegistry: FunctionRegistry,
conf: SQLConf,
hadoopConf: Configuration,
parser: ParserInterface,
functionResourceLoader: FunctionResourceLoader)
extends SessionCatalog(
externalCatalog,
globalTempViewManager,
functionRegistry,
conf,
hadoopConf,
parser,
functionResourceLoader) {
Spark SQL运行流程
将SQL语句通过词法和语法解析生成未绑定的逻辑执行计划(Unresolved LogicalPlan),包含Unresolved Relation、Unresolved Function和Unresolved Attribute,然后在后续步骤中使用不同的Rule应用到该逻辑计划上
Analyzer使用Analysis Rules,配合元数据(如SessionCatalog 或是 Hive Metastore等)完善未绑定的逻辑计划的属性而转换成绑定的逻辑计划。具体流程是县实例化一个Simple Analyzer,然后遍历预定义好的Batch,通过父类Rule Executor的执行方法运行Batch里的Rules,每个Rule会对未绑定的逻辑计划进行处理,有些可以通过一次解析处理,有些需要多次迭代,迭代直到达到FixedPoint次数或前后两次的树结构没变化才停止操作。
Optimizer使用Optimization Rules,将绑定的逻辑计划进行合并、列裁剪和过滤器下推等优化工作后生成优化的逻辑计划。
Planner使用Planning Strategies,对优化的逻辑计划进行转换(Transform)生成可以执行的物理计划。根据过去的性能统计数据,选择最佳的物理执行计划CostModel,最后生成可以执行的物理执行计划树,得到SparkPlan。
在最终真正执行物理执行计划之前,还要进行preparations规则处理,最后调用SparkPlan的execute执行计算RDD。
三、SparkContext运行原理分析
前面我们队Spark SQL运行架构进行分析,知道从输入SQL语句到生成Dataset分为5个步骤,但实际运行过程中在输入SQL语句之前,Spark还有加载SessionCatalog步骤。
3.1 使用SessionCatalog保存元数据
在解析SQL语句之前需要初始化SQLContext,它定义了Spark SQL执行的上下文,并把元数据保存在SessionCatalog中,这些元数据包括表名称、表字段名称和字段类型等。
SessionCatalog中保存的是表名和逻辑执行计划对应的哈希列表,这些数据将在解析未绑定的逻辑计划上使用。(SessionCatalog中的表名对应的逻辑执行计划是什么?是这个Dataset对应的逻辑执行计划)
3.2 使用Antlr生成未绑定的逻辑计划
Spark 2.0版本起使用Antlr进行词法和语法解析。使用Antlr生成未绑定的逻辑计划分为两个阶段:第一阶段的过程为词法分析(Lexical Analysis),负责将符号(Token)分组成符号类(Token class or Token type),第二阶段就是真正的Parser,默认Antlr会构建出一颗分析树(Parser Tree)或者叫语法树(Syntax Tree)。
SQLContext类中定义了SQL的解析方法parseSql。具体的SQL解析在AbastrctSqlParser抽象类中的parse方法进行,解析完毕后生成语法树,语法树会根据系统初始化的AstBuilder解析生成表达式、逻辑计划或表标识对象。
在AbstractSqlParse的parse方法中,先实例化词法解析器SqlBaseLexer和语法解析器SqlBaseParser,然后尝试用Antlr较快的解析模式SLL,如果解析失败,则会再尝试使用普通解析模型LL,解析完毕后返回解析结果。
3.3 使用Analyzer绑定逻辑执行计划
该阶段Analyzer使用Analysis Rules,结合SessionCatalog元数据,对未绑定的逻辑计划进行解析,生成了已绑定的逻辑计划。在该处理过程中,先实例化一个Analyzer,在Analyzer中定义了FiexedPoint和Seq[Batch]两个变量,其中FixedPoint为迭代次数的上线,而Seq[Batch]为所定义需要执行批处理的序列,每个批处理由一系列Rule和策略所组成,策略一般分为Once和FixedPoint(可理解为迭代次数)
3.4 使用Optimizer优化逻辑计划
Optimizer的实现和处理方式同Analyzer类似,在该类中定义一系列Rule并同样继承于RuleExecutor。利用这些Rule对逻辑计划和Expression进行迭代处理,从而达到对树的节点进行合并和优化。其中主要的优化策略总结起来是合并、列裁剪和过滤器下推等几大类。
Optimizer的优化策略不仅对已绑定的逻辑计划进行优化,而且对逻辑计划中的Expression也进行了优化,其原理就是遍历树,然后应用优化Rule,但是注意一点,对逻辑计划处理时先序遍历,而对Expression的处理是后续遍历(根据树节点类型来判断是逻辑计划还是Expression吗?)
3.5 使用SparkPlanner生成可执行物理计划
SparkPlanner使用Planning Strategies对优化的逻辑计划进行转换(Transform),生成可以执行的物理计划。在QueryExecution类代码中,调用SparkPlanner.plan方法对优化的逻辑计划进行处理,而SparkPlanner并未定义plan方法,实际是调用SparkPlanner的祖父类QueyrPlanner的plan方法,然后会返回一个Iterator[Physicalplan]。
SparkPlanner继承于SparkStrategies,而SparkStrategies继承了QueryPlanner。其中SparkStrategies包含了一系列特定的Strategies,这些Strategies是继承自QueryPlanner中定义的GenericStrategy。在SparkPlanner通过改写祖父类QueryPlanner中strategies策略变量,在该变量中定义了转换成物理计划所执行的策略。
3.6 使用QueryExecution执行物理计划
在该步骤中先调用SparkPlanner.preparations对物理计划进行准备工作,规范返回数据行的格式等,然后调用SparkPlan.execute执行物理计划,从数据库中查询数据并生成RDD。
SparkPlan的preparations其实是一个RuleExecutor[SparkPlan],它会调用RuleExecutor的execut方法对前面生成的物理计划应用Rule进行匹配,最终生成一个SparkPlan。
SparkPlan继承于QueryPlan,SparkPlan中定义了SQL语句执行的execute方法,执行完execute方法返回的是一个RDD,之后可以运行Spark作业对该RDD进行操作。
val FILESOURCE_TABLE_RELATION_CACHE_SIZE =
buildStaticConf("spark.sql.filesourceTableRelationCacheSize")
org.apache.spark.sql.catalyst.catalog.SessionCatalog#tableRelationCache
private val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
val cacheSize = conf.tableRelationCacheSize
CacheBuilder.newBuilder().maximumSize(cacheSize).build[QualifiedTableName, LogicalPlan]()
}
这里记录的是 QualifiedTableName 和 LogicalPlan 的对应关系。
每个表 和这个表的关系
这里是处理了刷新表 ,这里有好几个 Map 都存了什么
tableRelationCache
tempTables
globalTempViewManager
def refreshTable(name: TableIdentifier): Unit = synchronized {
val dbName = formatDatabaseName(name.database.getOrElse(currentDb))
val tableName = formatTableName(name.table)
// Go through temporary tables and invalidate them.
// If the database is defined, this may be a global temporary view.
// If the database is not defined, there is a good chance this is a temp table.
if (name.database.isEmpty) {
tempTables.get(tableName).foreach(_.refresh())
} else if (dbName == globalTempViewManager.database) {
globalTempViewManager.get(tableName).foreach(_.refresh())
}
// Also invalidate the table relation cache.
val qualifiedTableName = QualifiedTableName(dbName, tableName)
tableRelationCache.invalidate(qualifiedTableName)
}
https://blog.csdn.net/junerli/article/details/78607708
meta 这一块
org.apache.spark.sql.catalyst.catalog.SessionCatalog
org.apache.spark.sql.hive.HiveSessionCatalog
private[sql] class HiveSessionCatalog(
externalCatalog: HiveExternalCatalog,
globalTempViewManager: GlobalTempViewManager,
val metastoreCatalog: HiveMetastoreCatalog,
functionRegistry: FunctionRegistry,
conf: SQLConf,
hadoopConf: Configuration,
parser: ParserInterface,
functionResourceLoader: FunctionResourceLoader)
extends SessionCatalog(
externalCatalog,
globalTempViewManager,
functionRegistry,
conf,
hadoopConf,
parser,
functionResourceLoader) {
Spark SQL运行流程
将SQL语句通过词法和语法解析生成未绑定的逻辑执行计划(Unresolved LogicalPlan),包含Unresolved Relation、Unresolved Function和Unresolved Attribute,然后在后续步骤中使用不同的Rule应用到该逻辑计划上
Analyzer使用Analysis Rules,配合元数据(如SessionCatalog 或是 Hive Metastore等)完善未绑定的逻辑计划的属性而转换成绑定的逻辑计划。具体流程是县实例化一个Simple Analyzer,然后遍历预定义好的Batch,通过父类Rule Executor的执行方法运行Batch里的Rules,每个Rule会对未绑定的逻辑计划进行处理,有些可以通过一次解析处理,有些需要多次迭代,迭代直到达到FixedPoint次数或前后两次的树结构没变化才停止操作。
Optimizer使用Optimization Rules,将绑定的逻辑计划进行合并、列裁剪和过滤器下推等优化工作后生成优化的逻辑计划。
Planner使用Planning Strategies,对优化的逻辑计划进行转换(Transform)生成可以执行的物理计划。根据过去的性能统计数据,选择最佳的物理执行计划CostModel,最后生成可以执行的物理执行计划树,得到SparkPlan。
在最终真正执行物理执行计划之前,还要进行preparations规则处理,最后调用SparkPlan的execute执行计算RDD。
三、SparkContext运行原理分析
前面我们队Spark SQL运行架构进行分析,知道从输入SQL语句到生成Dataset分为5个步骤,但实际运行过程中在输入SQL语句之前,Spark还有加载SessionCatalog步骤。
3.1 使用SessionCatalog保存元数据
在解析SQL语句之前需要初始化SQLContext,它定义了Spark SQL执行的上下文,并把元数据保存在SessionCatalog中,这些元数据包括表名称、表字段名称和字段类型等。
SessionCatalog中保存的是表名和逻辑执行计划对应的哈希列表,这些数据将在解析未绑定的逻辑计划上使用。(SessionCatalog中的表名对应的逻辑执行计划是什么?是这个Dataset对应的逻辑执行计划)
3.2 使用Antlr生成未绑定的逻辑计划
Spark 2.0版本起使用Antlr进行词法和语法解析。使用Antlr生成未绑定的逻辑计划分为两个阶段:第一阶段的过程为词法分析(Lexical Analysis),负责将符号(Token)分组成符号类(Token class or Token type),第二阶段就是真正的Parser,默认Antlr会构建出一颗分析树(Parser Tree)或者叫语法树(Syntax Tree)。
SQLContext类中定义了SQL的解析方法parseSql。具体的SQL解析在AbastrctSqlParser抽象类中的parse方法进行,解析完毕后生成语法树,语法树会根据系统初始化的AstBuilder解析生成表达式、逻辑计划或表标识对象。
在AbstractSqlParse的parse方法中,先实例化词法解析器SqlBaseLexer和语法解析器SqlBaseParser,然后尝试用Antlr较快的解析模式SLL,如果解析失败,则会再尝试使用普通解析模型LL,解析完毕后返回解析结果。
3.3 使用Analyzer绑定逻辑执行计划
该阶段Analyzer使用Analysis Rules,结合SessionCatalog元数据,对未绑定的逻辑计划进行解析,生成了已绑定的逻辑计划。在该处理过程中,先实例化一个Analyzer,在Analyzer中定义了FiexedPoint和Seq[Batch]两个变量,其中FixedPoint为迭代次数的上线,而Seq[Batch]为所定义需要执行批处理的序列,每个批处理由一系列Rule和策略所组成,策略一般分为Once和FixedPoint(可理解为迭代次数)
3.4 使用Optimizer优化逻辑计划
Optimizer的实现和处理方式同Analyzer类似,在该类中定义一系列Rule并同样继承于RuleExecutor。利用这些Rule对逻辑计划和Expression进行迭代处理,从而达到对树的节点进行合并和优化。其中主要的优化策略总结起来是合并、列裁剪和过滤器下推等几大类。
Optimizer的优化策略不仅对已绑定的逻辑计划进行优化,而且对逻辑计划中的Expression也进行了优化,其原理就是遍历树,然后应用优化Rule,但是注意一点,对逻辑计划处理时先序遍历,而对Expression的处理是后续遍历(根据树节点类型来判断是逻辑计划还是Expression吗?)
3.5 使用SparkPlanner生成可执行物理计划
SparkPlanner使用Planning Strategies对优化的逻辑计划进行转换(Transform),生成可以执行的物理计划。在QueryExecution类代码中,调用SparkPlanner.plan方法对优化的逻辑计划进行处理,而SparkPlanner并未定义plan方法,实际是调用SparkPlanner的祖父类QueyrPlanner的plan方法,然后会返回一个Iterator[Physicalplan]。
SparkPlanner继承于SparkStrategies,而SparkStrategies继承了QueryPlanner。其中SparkStrategies包含了一系列特定的Strategies,这些Strategies是继承自QueryPlanner中定义的GenericStrategy。在SparkPlanner通过改写祖父类QueryPlanner中strategies策略变量,在该变量中定义了转换成物理计划所执行的策略。
3.6 使用QueryExecution执行物理计划
在该步骤中先调用SparkPlanner.preparations对物理计划进行准备工作,规范返回数据行的格式等,然后调用SparkPlan.execute执行物理计划,从数据库中查询数据并生成RDD。
SparkPlan的preparations其实是一个RuleExecutor[SparkPlan],它会调用RuleExecutor的execut方法对前面生成的物理计划应用Rule进行匹配,最终生成一个SparkPlan。
SparkPlan继承于QueryPlan,SparkPlan中定义了SQL语句执行的execute方法,执行完execute方法返回的是一个RDD,之后可以运行Spark作业对该RDD进行操作。
发表评论
-
thriftserver log4j.properties 生效
2018-04-09 11:46 451/home/isuhadoop/spark2/sbin/sta ... -
udaf 返回的 子属性
2018-03-20 13:22 444udaf 返回的 子属性 spark.sql(" ... -
spark datasource
2018-03-16 16:36 668DataFrameWriter format val c ... -
如何 map 端 Join。
2018-03-04 19:31 626Hive 中 修改表的 rawDataSize = 1 1 ... -
spark thrift server 修改
2018-03-04 12:58 586org.apache.spark.sql.hive.thrif ... -
hive hbase thriftserver run
2018-03-03 15:13 415正确方法 : 0\ 拷贝对应目录到 spark2 jars ... -
scala package
2018-01-25 09:48 533#scala 打包 mvn clean scala:com ... -
SPARK SERVER
2018-01-23 22:15 553sbin/start-thriftserver.sh --dr ... -
driver class
2018-01-21 22:11 525sbin/start-thriftserver.sh -- ... -
spark thrift server 调试
2017-10-20 15:50 867spark-hive-thriftserver 本地调试 ... -
spark SQL conf
2017-10-18 14:36 624org.apache.spark.sql.internal.S ... -
java 死锁 ,内存问题 分析
2017-10-17 10:50 349jstack -l pid /opt/soft/jdk/ ... -
thriftServer proxy
2017-10-16 14:21 943sudo yum install haproxy 257 ... -
hive spark conf
2017-09-26 17:44 1299CREATE TABLE org_userbehavior_a ... -
get day
2017-09-19 08:41 570def timeDayNow() = { var ... -
thriftserver
2017-09-14 19:47 473export SPARK_CONF_DIR=/home/yun ... -
thriftserver dynamicallocation
2017-09-08 14:41 590./sbin/start-thriftserver.sh -- ... -
test code2
2017-09-03 13:45 492package org.test.udf import co ... -
test code
2017-08-24 17:52 290def taskcal(data:Array[(String, ... -
struct streaming SQL udf udaf
2017-08-22 11:50 680spark aggregator class H ...
相关推荐
Spark SQL支持加密和访问控制,确保数据在传输和处理过程中不被非法获取。同时,应遵循数据最小化原则,只处理必要的数据,并对敏感信息进行脱敏处理。 总结,本实训指导书将引导你通过Spark SQL进行法律服务网站的...
通过学习《Mastering Spark SQL》,读者可以掌握如何利用Spark SQL进行大数据处理,提高数据分析的效率和质量。对于Java开发者来说,这本书将帮助他们更好地理解和应用Spark SQL在Java项目中的实践。
1. **兼容性**:Spark SQL支持通过Hive的元数据、SQL语法和Hive SerDes与Hive集成,使得在Spark上可以无缝地运行Hive的工作负载。 2. **DataFrame API**:DataFrame API提供了强类型和静态类型的API,支持Scala、...
日志数据是系统运行过程中产生的记录信息,包含了丰富的业务行为和系统状态,如用户活动、系统错误、性能指标等。通过有效的日志分析,我们可以发现问题、优化系统性能、提升用户体验,甚至挖掘出有价值的商业洞察。...
### Spark运行原理解析 #### 一、Spark简介与核心价值 Spark是一个开源的大数据处理框架,它提供了统一的数据处理接口,能够支持多种类型的数据处理任务,如批处理、流处理、交互式查询以及机器学习等。Spark的...
这个示例展示了如何使用 Spark SQL 处理和分析数据,从读取文件到执行 SQL 查询,再到打印结果,整个过程都是在 Spark 的分布式环境中进行的。在 Spark SQL 中,数据被表示为 DataFrame 或 Dataset,这些数据结构...
与Hadoop MapReduce相比,Spark通过内存计算显著提升了迭代算法的执行效率,同时支持多种数据处理模型,包括批处理、交互式查询(Spark SQL)、流处理(Spark Streaming)和机器学习(MLlib)。 2. Spark核心组件:...
4. **Spark SQL**:Spark SQL是Spark用于结构化数据处理的模块,它可以与Hive兼容,支持SQL查询,并且可以与DataFrame和Dataset API集成,提供更高效的查询性能。 5. **Spark Streaming**:Spark Streaming处理实时...
- **RDD(弹性分布式数据集)**:Spark的核心数据抽象,是不可变、分区的记录集合,可以在集群中进行并行操作。 - **DAG(有向无环图)**:Spark的任务调度基于DAG,它表示了任务间的依赖关系。 - **Job、Stage、...
这份PDF教程涵盖了Spark的生态系统、部署与安装、编程模型、运行框架,以及相关的高级特性,如流处理、SQL支持、机器学习库MLlib和图计算库GraphX,还有Tachyon存储系统。 一、Spark生态圈 Spark生态圈包括一系列...
在这些案例中,你会看到如何在Hadoop上部署和运行Spark作业,以及如何读写HDFS上的数据。同时,Spark也支持Hadoop MapReduce和YARN资源管理系统,这使得Spark能充分利用Hadoop生态的资源。 总结,本压缩包中的Spark...
Spark-2.4.5是该框架的一个稳定版本,提供了丰富的数据处理功能,包括批处理、交互式查询(通过Spark SQL)、实时流处理(通过Spark Streaming)以及机器学习(通过MLlib库)和图计算(通过GraphX)。这个版本的源码...
Spark 通过记录 RDD 的血缘关系(lineage)来实现容错。如果某个 RDD 的部分分区丢失,Spark 可以通过重新计算丢失的分区来恢复,这个过程依赖于 RDD 的转换历史。这种机制使得 Spark 在处理大规模数据时既高效又...
它支持批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)和机器学习(MLlib)等多种应用场景。 2. **Spark架构**:Spark的核心组件包括Driver Program、Executor、Cluster Manager和Storage System...
5. 集群部署:可以将Spark应用提交到YARN、Mesos或独立Spark集群运行。 五、优化技巧 1. 内存管理:合理设置executor内存、driver内存,避免溢出,使用Tachyon或Alluxio作为缓存系统提升速度。 2. 广播变量和累加器...
这些代码可能会使用Spark SQL API来创建DataFrame,执行SQL查询,并记录执行时间,以便进行性能评估。 2. **测试用例**:一系列精心设计的测试场景,可能包括各种SQL操作,例如简单的SELECT,复杂的JOIN,多层嵌套...
RDD是Spark的基础数据结构,它是不可变的、分区的记录集合,支持并行操作。DataFrame是Spark SQL引入的数据结构,它是基于RDD但提供了更高级别的抽象,适用于结构化数据处理。Dataset是DataFrame的类型安全版本,...
6. **MLlib**:Spark的机器学习库MLlib包含了多种机器学习算法,如分类、回归、聚类、协同过滤,以及模型评估和调优工具,为数据科学家提供了便捷的机器学习平台。 7. **GraphX**:Spark的图处理库,用于构建和分析...
2. **Spark SQL**:Spark SQL是Spark处理结构化数据的模块,它将SQL查询与DataFrame API融合,使得开发者可以方便地在SQL和DataFrame之间切换。在源码中,你可以看到如何解析SQL语句,执行查询计划,并将其转化为...