Spark SQL was already introduced back at Spark Summit 2013, though the focus at the time was mostly on the Catalyst query optimization framework. After a year of development, at this year's Spark Summit 2014 Databricks announced that it was abandoning Shark in favor of Spark SQL, on the grounds that Shark inherited too much from Hive and its optimization had hit a bottleneck.
Today I checked out the latest Spark code and gave it a quick test:
1. Building Spark SQL
-bash-3.2$ git config --global http.sslVerify false
-bash-3.2$ git clone https://github.com/apache/spark.git
Cloning into 'spark'...
remote: Reusing existing pack: 107821, done.
remote: Counting objects: 103, done.
remote: Compressing objects: 100% (72/72), done.
remote: Total 107924 (delta 20), reused 64 (delta 16)
Receiving objects: 100% (107924/107924), 69.06 MiB | 3.39 MiB/s, done.
Resolving deltas: 100% (50174/50174), done.
You still need to build it first with sbt/sbt assembly (for how to build a matching version, see the earlier post "Spark Compilation and Cluster Setup").
Running sbt/sbt hive/console will also trigger a build.
The latest Spark SQL ships with a console where you can run interactive queries directly; it also comes with several examples.
2. Running Spark SQL
The project ships with a test harness. By checking the log and running find . -name TestHive*, I found it at:
/app/hadoop/shengli/spark/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala. If you're interested, open it up, build it, and step through it yourself.
First, start the console:
sbt/sbt hive/console
[info] Starting scala interpreter...
[info]
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.dsl._
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.parquet.ParquetTestData
Welcome to Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_20).
Type in expressions to have them evaluated.
Type :help for more information.
scala>
Let's check which methods the current runtime exposes (press Tab at the scala> prompt):
scala>
<init> DslAttribute DslExpression DslString DslSymbol
ParquetTestData SqlCmd analyzer autoConvertJoinSize binaryToLiteral
booleanToLiteral byteToLiteral cacheTable cacheTables catalog
classOf clear clone configure contains
createParquetFile createSchemaRDD createTable decimalToLiteral describedTable
doubleToLiteral emptyResult eq equals executePlan
executeSql execution finalize floatToLiteral get
getAll getClass getHiveFile getOption hashCode
hiveDevHome hiveFilesTemp hiveHome hivePlanner hiveQTestUtilTables
hiveconf hiveql hql inRepoTests inferSchema
intToLiteral isCached joinBroadcastTables jsonFile jsonRDD
loadTestTable logger logicalPlanToSparkQuery longToLiteral metastorePath
ne notify notifyAll numShufflePartitions optimizer
originalUdfs outputBuffer parquetFile parseSql parser
planner prepareForExecution registerRDDAsTable registerTestTable reset
runHive runSqlHive sessionState set shortToLiteral
sparkContext sql stringToLiteral symbolToUnresolvedAttribute synchronized
table testTables timestampToLiteral toDebugString toString
uncacheTable wait warehousePath
Notice that the test harness exposes a testTables member; since these members are all lazy, nothing is loaded at first.
Check which tables the test harness will load:
scala> testTables 14/07/02 18:45:59 INFO spark.SecurityManager: Changing view acls to: hadoop 14/07/02 18:45:59 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop) 14/07/02 18:46:00 INFO slf4j.Slf4jLogger: Slf4jLogger started 14/07/02 18:46:00 INFO Remoting: Starting remoting 14/07/02 18:46:00 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@web02.dw:42984] 14/07/02 18:46:00 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@web02.dw:42984] 14/07/02 18:46:00 INFO spark.SparkEnv: Registering MapOutputTracker 14/07/02 18:46:00 INFO spark.SparkEnv: Registering BlockManagerMaster 14/07/02 18:46:00 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20140702184600-9e16 14/07/02 18:46:00 INFO network.ConnectionManager: Bound socket to port 48348 with id = ConnectionManagerId(web02.dw,48348) 14/07/02 18:46:00 INFO storage.MemoryStore: MemoryStore started with capacity 1097.0 MB 14/07/02 18:46:00 INFO storage.BlockManagerMaster: Trying to register BlockManager 14/07/02 18:46:00 INFO storage.BlockManagerInfo: Registering block manager web02.dw:48348 with 1097.0 MB RAM 14/07/02 18:46:00 INFO storage.BlockManagerMaster: Registered BlockManager 14/07/02 18:46:00 INFO spark.HttpServer: Starting HTTP Server 14/07/02 18:46:01 INFO server.Server: jetty-8.1.14.v20131031 14/07/02 18:46:01 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:36260 14/07/02 18:46:01 INFO broadcast.HttpBroadcast: Broadcast server started at http://10.1.8.207:36260 14/07/02 18:46:01 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-ca40f66c-edc3-484f-b317-d3f512aab244 14/07/02 18:46:01 INFO spark.HttpServer: Starting HTTP Server 14/07/02 18:46:01 INFO server.Server: jetty-8.1.14.v20131031 14/07/02 18:46:01 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:57821 14/07/02 18:46:01 INFO server.Server: jetty-8.1.14.v20131031 14/07/02 18:46:02 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040 14/07/02 18:46:02 INFO ui.SparkUI: Started SparkUI at http://web02.dw:4040 metastore path is /tmp/sparkHiveMetastore8060064816530828092 warehousePath path is /tmp/sparkHiveWarehouse5366068035857129261 hiveHome path is Some(/home/hadoop/Java/lib/hive-0.6.0) hiveDevHome path is None res0: scala.collection.mutable.HashMap[String,org.apache.spark.sql.hive.test.TestHive.TestTable] = Map(sales -> TestTable(sales,WrappedArray(<function0>, <function0>)), src -> TestTable(src,WrappedArray(<function0>, <function0>)), src1 -> TestTable(src1,WrappedArray(<function0>, <function0>)), serdeins -> TestTable(serdeins,WrappedArray(<function0>, <function0>)), src_thrift -> TestTable(src_thrift,WrappedArray(<function0>)), srcpart -> TestTable(srcpart,WrappedArray(<function0>)), episodes -> TestTable(episodes,WrappedArray(<function0>, <function0>)), srcpart1 -> TestTable(srcpart1,WrappedArray(<function0>)))
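Since these test tables are lazy, you can also force one to load before querying it. A minimal sketch, assuming we are still inside the sbt/sbt hive/console session where TestHive._ is imported (loadTestTable is one of the members listed above, and the table name must be a key of testTables):

// Hedged sketch: explicitly load the "sales" test table instead of waiting for
// the first query to reference it; this registers it in the embedded Derby
// metastore and copies its data file into the temporary warehouse directory.
loadTestTable("sales")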
Testing a select statement
1. First declare a SQL query.
2. At this point the test harness uses the Hive metastore and creates a Derby database.
3. It creates the test table referenced by the query (here, sales) and loads its data.
4. It parses the select * from sales statement.
5. It generates a SchemaRDD and produces a query plan.
6. Only when an action is run on the querySales RDD does the SQL actually get executed.
The detailed output of the run is below (the logs give a rough picture of the execution steps):
scala> val querySales = sql("select * from sales") 14/07/02 18:51:19 INFO test.TestHive$: Loading test table sales 14/07/02 18:51:19 INFO parse.ParseDriver: Parsing command: CREATE TABLE IF NOT EXISTS sales (key STRING, value INT) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe' WITH SERDEPROPERTIES ("input.regex" = "([^ ]*) ([^ ]*)") 14/07/02 18:51:19 INFO parse.ParseDriver: Parse Completed 14/07/02 18:51:19 INFO analysis.Analyzer: Max iterations (2) reached for batch MultiInstanceRelations 14/07/02 18:51:19 INFO analysis.Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences 14/07/02 18:51:19 INFO sql.SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange 14/07/02 18:51:19 INFO sql.SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions 14/07/02 18:51:19 INFO ql.Driver: <PERFLOG method=Driver.run> 14/07/02 18:51:19 INFO ql.Driver: <PERFLOG method=TimeToSubmit> 14/07/02 18:51:19 INFO ql.Driver: <PERFLOG method=compile> 14/07/02 18:51:19 INFO ql.Driver: <PERFLOG method=parse> 14/07/02 18:51:19 INFO parse.ParseDriver: Parsing command: CREATE TABLE IF NOT EXISTS sales (key STRING, value INT) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe' WITH SERDEPROPERTIES ("input.regex" = "([^ ]*) ([^ ]*)") 14/07/02 18:51:19 INFO parse.ParseDriver: Parse Completed 14/07/02 18:51:19 INFO ql.Driver: </PERFLOG method=parse start=1404298279883 end=1404298279885 duration=2> 14/07/02 18:51:19 INFO ql.Driver: <PERFLOG method=semanticAnalyze> 14/07/02 18:51:19 INFO parse.SemanticAnalyzer: Starting Semantic Analysis 14/07/02 18:51:19 INFO parse.SemanticAnalyzer: Creating table sales position=27 14/07/02 18:51:20 INFO metastore.HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore 14/07/02 18:51:20 INFO metastore.ObjectStore: ObjectStore, initialize called 14/07/02 18:51:20 INFO DataNucleus.Persistence: Property datanucleus.cache.level2 unknown - will be ignored 14/07/02 18:51:21 WARN bonecp.BoneCPConfig: Max Connections < 1. Setting to 20 14/07/02 18:51:25 INFO metastore.ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order" 14/07/02 18:51:25 INFO metastore.ObjectStore: Initialized ObjectStore 14/07/02 18:51:26 WARN bonecp.BoneCPConfig: Max Connections < 1. Setting to 20 14/07/02 18:51:26 WARN metastore.ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 0.12.0 14/07/02 18:51:27 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=sales 14/07/02 18:51:27 INFO HiveMetaStore.audit: ugi=hadoop ip=unknown-ip-addr cmd=get_table : db=default tbl=sales 14/07/02 18:51:27 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table. 14/07/02 18:51:27 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table. 
14/07/02 18:51:28 INFO ql.Driver: Semantic Analysis Completed 14/07/02 18:51:28 INFO ql.Driver: </PERFLOG method=semanticAnalyze start=1404298279885 end=1404298288331 duration=8446> 14/07/02 18:51:28 INFO ql.Driver: Returning Hive schema: Schema(fieldSchemas:null, properties:null) 14/07/02 18:51:28 INFO ql.Driver: </PERFLOG method=compile start=1404298279840 end=1404298288340 duration=8500> 14/07/02 18:51:28 INFO ql.Driver: <PERFLOG method=Driver.execute> 14/07/02 18:51:28 INFO ql.Driver: Starting command: CREATE TABLE IF NOT EXISTS sales (key STRING, value INT) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe' WITH SERDEPROPERTIES ("input.regex" = "([^ ]*) ([^ ]*)") 14/07/02 18:51:28 INFO ql.Driver: </PERFLOG method=TimeToSubmit start=1404298279840 end=1404298288351 duration=8511> 14/07/02 18:51:28 INFO ql.Driver: <PERFLOG method=runTasks> 14/07/02 18:51:28 INFO ql.Driver: <PERFLOG method=task.DDL.Stage-0> 14/07/02 18:51:28 INFO metastore.HiveMetaStore: 0: create_table: Table(tableName:sales, dbName:default, owner:hadoop, createTime:1404298288, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:key, type:string, comment:null), FieldSchema(name:value, type:int, comment:null)], location:null, inputFormat:org.apache.hadoop.mapred.TextInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.RegexSerDe, parameters:{serialization.format=1, input.regex=([^ ]*) ([^ ]*)}), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{}), storedAsSubDirectories:false), partitionKeys:[], parameters:{}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE, privileges:PrincipalPrivilegeSet(userPrivileges:null, groupPrivileges:null, rolePrivileges:null)) 14/07/02 18:51:28 INFO HiveMetaStore.audit: ugi=hadoop ip=unknown-ip-addr cmd=create_table: Table(tableName:sales, dbName:default, owner:hadoop, createTime:1404298288, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:key, type:string, comment:null), FieldSchema(name:value, type:int, comment:null)], location:null, inputFormat:org.apache.hadoop.mapred.TextInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.RegexSerDe, parameters:{serialization.format=1, input.regex=([^ ]*) ([^ ]*)}), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{}), storedAsSubDirectories:false), partitionKeys:[], parameters:{}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE, privileges:PrincipalPrivilegeSet(userPrivileges:null, groupPrivileges:null, rolePrivileges:null)) 14/07/02 18:51:28 INFO ql.Driver: </PERFLOG method=task.DDL.Stage-0 start=1404298288351 end=1404298288589 duration=238> 14/07/02 18:51:28 INFO ql.Driver: </PERFLOG method=runTasks start=1404298288351 end=1404298288589 duration=238> 14/07/02 18:51:28 INFO ql.Driver: </PERFLOG method=Driver.execute start=1404298288340 end=1404298288589 duration=249> 14/07/02 18:51:28 INFO ql.Driver: OK 14/07/02 18:51:28 INFO ql.Driver: <PERFLOG method=releaseLocks> 14/07/02 18:51:28 INFO ql.Driver: </PERFLOG method=releaseLocks start=1404298288590 end=1404298288590 duration=0> 14/07/02 18:51:28 INFO 
ql.Driver: </PERFLOG method=Driver.run start=1404298279839 end=1404298288590 duration=8751> 14/07/02 18:51:28 INFO ql.Driver: <PERFLOG method=releaseLocks> 14/07/02 18:51:28 INFO ql.Driver: </PERFLOG method=releaseLocks start=1404298288590 end=1404298288590 duration=0> 14/07/02 18:51:28 INFO parse.ParseDriver: Parsing command: LOAD DATA LOCAL INPATH 'sql/hive/src/test/resources/data/files/sales.txt' INTO TABLE sales 14/07/02 18:51:28 INFO parse.ParseDriver: Parse Completed 14/07/02 18:51:28 INFO analysis.Analyzer: Max iterations (2) reached for batch MultiInstanceRelations 14/07/02 18:51:28 INFO analysis.Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences 14/07/02 18:51:28 INFO sql.SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange 14/07/02 18:51:28 INFO sql.SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions 14/07/02 18:51:28 INFO ql.Driver: <PERFLOG method=Driver.run> 14/07/02 18:51:28 INFO ql.Driver: <PERFLOG method=TimeToSubmit> 14/07/02 18:51:28 INFO ql.Driver: <PERFLOG method=compile> 14/07/02 18:51:28 INFO ql.Driver: <PERFLOG method=parse> 14/07/02 18:51:28 INFO parse.ParseDriver: Parsing command: LOAD DATA LOCAL INPATH 'sql/hive/src/test/resources/data/files/sales.txt' INTO TABLE sales 14/07/02 18:51:28 INFO parse.ParseDriver: Parse Completed 14/07/02 18:51:28 INFO ql.Driver: </PERFLOG method=parse start=1404298288629 end=1404298288629 duration=0> 14/07/02 18:51:28 INFO ql.Driver: <PERFLOG method=semanticAnalyze> 14/07/02 18:51:28 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=sales 14/07/02 18:51:28 INFO HiveMetaStore.audit: ugi=hadoop ip=unknown-ip-addr cmd=get_table : db=default tbl=sales 14/07/02 18:51:28 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable 14/07/02 18:51:28 INFO ql.Driver: Semantic Analysis Completed 14/07/02 18:51:28 INFO ql.Driver: </PERFLOG method=semanticAnalyze start=1404298288630 end=1404298288942 duration=312> 14/07/02 18:51:28 INFO ql.Driver: Returning Hive schema: Schema(fieldSchemas:null, properties:null) 14/07/02 18:51:28 INFO ql.Driver: </PERFLOG method=compile start=1404298288628 end=1404298288943 duration=315> 14/07/02 18:51:28 INFO ql.Driver: <PERFLOG method=Driver.execute> 14/07/02 18:51:28 INFO ql.Driver: Starting command: LOAD DATA LOCAL INPATH 'sql/hive/src/test/resources/data/files/sales.txt' INTO TABLE sales 14/07/02 18:51:28 INFO ql.Driver: </PERFLOG method=TimeToSubmit start=1404298288628 end=1404298288943 duration=315> 14/07/02 18:51:28 INFO ql.Driver: <PERFLOG method=runTasks> 14/07/02 18:51:28 INFO ql.Driver: <PERFLOG method=task.COPY.Stage-0> 14/07/02 18:51:28 INFO exec.Task: Copying data from file:/app/hadoop/spark/sql/hive/src/test/resources/data/files/sales.txt to file:/tmp/hive-hadoop/hive_2014-07-02_18-51-28_629_2309366591646930035-1/-ext-10000 14/07/02 18:51:28 INFO exec.Task: Copying file: file:/app/hadoop/spark/sql/hive/src/test/resources/data/files/sales.txt 14/07/02 18:51:29 INFO ql.Driver: </PERFLOG method=task.COPY.Stage-0 start=1404298288943 end=1404298289037 duration=94> 14/07/02 18:51:29 INFO ql.Driver: <PERFLOG method=task.MOVE.Stage-1> 14/07/02 18:51:29 INFO exec.Task: Loading data to table default.sales from file:/tmp/hive-hadoop/hive_2014-07-02_18-51-28_629_2309366591646930035-1/-ext-10000 14/07/02 18:51:29 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=sales 14/07/02 18:51:29 INFO HiveMetaStore.audit: ugi=hadoop ip=unknown-ip-addr cmd=get_table : db=default tbl=sales 14/07/02 18:51:29 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=sales 14/07/02 18:51:29 INFO HiveMetaStore.audit: ugi=hadoop ip=unknown-ip-addr cmd=get_table : db=default tbl=sales 14/07/02 18:51:29 INFO metastore.HiveMetaStore: 0: alter_table: db=default tbl=sales newtbl=sales 14/07/02 18:51:29 INFO HiveMetaStore.audit: ugi=hadoop ip=unknown-ip-addr cmd=alter_table: db=default tbl=sales newtbl=sales 14/07/02 18:51:29 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=sales 14/07/02 18:51:29 INFO HiveMetaStore.audit: ugi=hadoop ip=unknown-ip-addr cmd=get_table : db=default tbl=sales 14/07/02 18:51:29 INFO ql.Driver: </PERFLOG method=task.MOVE.Stage-1 start=1404298289037 end=1404298289196 duration=159> 14/07/02 18:51:29 INFO ql.Driver: <PERFLOG method=task.STATS.Stage-2> 14/07/02 18:51:29 INFO exec.StatsTask: Executing stats task 14/07/02 18:51:29 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=sales 14/07/02 18:51:29 INFO HiveMetaStore.audit: ugi=hadoop ip=unknown-ip-addr cmd=get_table : db=default tbl=sales 14/07/02 18:51:29 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=sales 14/07/02 18:51:29 INFO HiveMetaStore.audit: ugi=hadoop ip=unknown-ip-addr cmd=get_table : db=default tbl=sales 14/07/02 18:51:29 INFO metastore.HiveMetaStore: 0: alter_table: db=default tbl=sales newtbl=sales 14/07/02 18:51:29 INFO HiveMetaStore.audit: ugi=hadoop ip=unknown-ip-addr cmd=alter_table: db=default tbl=sales newtbl=sales 14/07/02 18:51:29 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=sales 14/07/02 18:51:29 INFO HiveMetaStore.audit: ugi=hadoop ip=unknown-ip-addr cmd=get_table : db=default tbl=sales 14/07/02 18:51:29 INFO exec.Task: Table default.sales stats: [num_partitions: 0, num_files: 1, num_rows: 0, 
total_size: 13, raw_data_size: 0] 14/07/02 18:51:29 INFO ql.Driver: </PERFLOG method=task.STATS.Stage-2 start=1404298289196 end=1404298289282 duration=86> 14/07/02 18:51:29 INFO ql.Driver: </PERFLOG method=runTasks start=1404298288943 end=1404298289282 duration=339> 14/07/02 18:51:29 INFO ql.Driver: </PERFLOG method=Driver.execute start=1404298288943 end=1404298289282 duration=339> 14/07/02 18:51:29 INFO ql.Driver: OK 14/07/02 18:51:29 INFO ql.Driver: <PERFLOG method=releaseLocks> 14/07/02 18:51:29 INFO ql.Driver: </PERFLOG method=releaseLocks start=1404298289282 end=1404298289282 duration=0> 14/07/02 18:51:29 INFO ql.Driver: </PERFLOG method=Driver.run start=1404298288628 end=1404298289282 duration=654> 14/07/02 18:51:29 INFO ql.Driver: <PERFLOG method=releaseLocks> 14/07/02 18:51:29 INFO ql.Driver: </PERFLOG method=releaseLocks start=1404298289282 end=1404298289282 duration=0> 14/07/02 18:51:29 INFO analysis.Analyzer: Max iterations (2) reached for batch MultiInstanceRelations 14/07/02 18:51:29 INFO analysis.Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences 14/07/02 18:51:29 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=sales 14/07/02 18:51:29 INFO HiveMetaStore.audit: ugi=hadoop ip=unknown-ip-addr cmd=get_table : db=default tbl=sales 14/07/02 18:51:29 INFO storage.MemoryStore: ensureFreeSpace(355913) called with curMem=0, maxMem=1150314086 14/07/02 18:51:29 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 347.6 KB, free 1096.7 MB) 14/07/02 18:51:29 INFO sql.SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange 14/07/02 18:51:29 INFO sql.SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions
querySales: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at SchemaRDD.scala:100
== Query Plan ==
HiveTableScan [key#2,value#3], (MetastoreRelation default, sales, None), None
Execute the Spark SQL query:
scala> querySales.collect()
14/07/02 18:57:32 WARN snappy.LoadSnappy: Snappy native library is available
14/07/02 18:57:32 WARN snappy.LoadSnappy: Snappy native library not loaded
14/07/02 18:57:32 INFO mapred.FileInputFormat: Total input paths to process : 1
14/07/02 18:57:32 INFO spark.SparkContext: Starting job: collect at SparkPlan.scala:52
14/07/02 18:57:32 INFO scheduler.DAGScheduler: Got job 0 (collect at SparkPlan.scala:52) with 3 output partitions (allowLocal=false)
14/07/02 18:57:32 INFO scheduler.DAGScheduler: Final stage: Stage 0(collect at SparkPlan.scala:52)
14/07/02 18:57:32 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/07/02 18:57:32 INFO scheduler.DAGScheduler: Missing parents: List()
14/07/02 18:57:32 INFO scheduler.DAGScheduler: Submitting Stage 0 (MappedRDD[5] at map at SparkPlan.scala:52), which has no missing parents
14/07/02 18:57:32 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from Stage 0 (MappedRDD[5] at map at SparkPlan.scala:52)
14/07/02 18:57:32 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 3 tasks
14/07/02 18:57:32 INFO scheduler.TaskSetManager: Re-computing pending task lists.
14/07/02 18:57:32 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)
14/07/02 18:57:32 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as 3606 bytes in 20 ms
14/07/02 18:57:32 INFO executor.Executor: Running task ID 0
14/07/02 18:57:32 INFO storage.BlockManager: Found block broadcast_0 locally
14/07/02 18:57:32 INFO rdd.HadoopRDD: Input split: file:/tmp/sparkHiveWarehouse5366068035857129261/sales/sales.txt:0+6
14/07/02 18:57:32 INFO executor.Executor: Serialized size of result for 0 is 1947
14/07/02 18:57:32 INFO executor.Executor: Sending result for 0 directly to driver
14/07/02 18:57:32 INFO executor.Executor: Finished task ID 0
14/07/02 18:57:32 INFO scheduler.TaskSetManager: Starting task 0.0:1 as TID 1 on executor localhost: localhost (PROCESS_LOCAL)
14/07/02 18:57:32 INFO scheduler.TaskSetManager: Serialized task 0.0:1 as 3606 bytes in 0 ms
14/07/02 18:57:32 INFO executor.Executor: Running task ID 1
14/07/02 18:57:32 INFO storage.BlockManager: Found block broadcast_0 locally
14/07/02 18:57:32 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
14/07/02 18:57:32 INFO scheduler.TaskSetManager: Finished TID 0 in 243 ms on localhost (progress: 1/3)
14/07/02 18:57:32 INFO rdd.HadoopRDD: Input split: file:/tmp/sparkHiveWarehouse5366068035857129261/sales/sales.txt:6+6
14/07/02 18:57:32 INFO executor.Executor: Serialized size of result for 1 is 1948
14/07/02 18:57:32 INFO executor.Executor: Sending result for 1 directly to driver
14/07/02 18:57:32 INFO executor.Executor: Finished task ID 1
14/07/02 18:57:32 INFO scheduler.TaskSetManager: Starting task 0.0:2 as TID 2 on executor localhost: localhost (PROCESS_LOCAL)
14/07/02 18:57:32 INFO scheduler.TaskSetManager: Serialized task 0.0:2 as 3606 bytes in 1 ms
14/07/02 18:57:32 INFO executor.Executor: Running task ID 2
14/07/02 18:57:32 INFO scheduler.TaskSetManager: Finished TID 1 in 36 ms on localhost (progress: 2/3)
14/07/02 18:57:32 INFO scheduler.DAGScheduler: Completed ResultTask(0, 1)
14/07/02 18:57:32 INFO storage.BlockManager: Found block broadcast_0 locally
14/07/02 18:57:32 INFO rdd.HadoopRDD: Input split: file:/tmp/sparkHiveWarehouse5366068035857129261/sales/sales.txt:12+1
14/07/02 18:57:32 INFO executor.Executor: Serialized size of result for 2 is 1721
14/07/02 18:57:32 INFO executor.Executor: Sending result for 2 directly to driver
14/07/02 18:57:32 INFO executor.Executor: Finished task ID 2
14/07/02 18:57:32 INFO scheduler.TaskSetManager: Finished TID 2 in 96 ms on localhost (progress: 3/3)
14/07/02 18:57:32 INFO scheduler.DAGScheduler: Completed ResultTask(0, 2)
14/07/02 18:57:32 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/07/02 18:57:32 INFO scheduler.DAGScheduler: Stage 0 (collect at SparkPlan.scala:52) finished in 0.366 s
14/07/02 18:57:32 INFO spark.SparkContext: Job finished: collect at SparkPlan.scala:52, took 0.454512333 s
res1: Array[org.apache.spark.sql.Row] = Array([Joe,2], [Hank,2])
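Stripped of the log noise, the whole interaction comes down to two steps; a minimal sketch of the same flow inside the hive/console session:

// Lazy step: parses the SQL and builds the SchemaRDD with its query plan
// (a HiveTableScan over the sales table); no data is read yet.
val querySales = sql("select * from sales")
// Action step: triggers the actual Spark job, scanning the table and
// returning the rows to the driver.
querySales.collect().foreach(println)   // prints [Joe,2] and [Hank,2]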
Query plan optimization:
scala> val query = sql("SELECT * FROM (SELECT * FROM src) a")
query: org.apache.spark.sql.SchemaRDD = SchemaRDD[6] at RDD at SchemaRDD.scala:100 == Query Plan ==
HiveTableScan [key#6,value#7], (MetastoreRelation default, src, None), None
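Notice that the nested subquery has been collapsed: the plan is a single HiveTableScan over src, with the redundant inner projection eliminated. To watch what Catalyst does at each phase you can inspect the SchemaRDD's queryExecution member (it appears in the method list below); a hedged sketch, where the individual plan fields are an assumption based on the Spark SQL code of this period:

// Hedged sketch: dump Catalyst's intermediate plans for the query above.
// The field names (logical, analyzed, optimizedPlan, executedPlan) are an
// assumption; adjust to whatever queryExecution exposes in your snapshot.
println(query.queryExecution.logical)        // plan as parsed
println(query.queryExecution.analyzed)       // after attribute resolution
println(query.queryExecution.optimizedPlan)  // after optimization (subquery collapsed)
println(query.queryExecution.executedPlan)   // physical plan: HiveTableScan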
3. Spark SQL's LINQ-style API
In the Spark family of frameworks everything is built around the RDD, and SchemaRDD offers a LINQ-like API with operators such as take, where, and so on:
scala> query. ++ aggregate as asInstanceOf baseLogicalPlan baseSchemaRDD cache cartesian checkpoint coalesce collect compute context count countApprox countApproxDistinct countByValue countByValueApprox dependencies distinct filter filterWith first flatMap flatMapWith fold foreach foreachPartition foreachWith generate getCheckpointFile getPartitions getStorageLevel glom groupBy id insertInto intersection isCheckpointed isInstanceOf iterator join keyBy limit map mapPartitions mapPartitionsWithContext mapPartitionsWithIndex mapPartitionsWithSplit mapWith max min name name_= orderBy partitioner partitions persist pipe preferredLocations printSchema queryExecution randomSplit reduce registerAsTable repartition sample saveAsObjectFile saveAsParquetFile saveAsTable saveAsTextFile schemaString select setName sortBy sparkContext sqlContext subtract take takeOrdered takeSample toArray toDebugString toJavaRDD toJavaSchemaRDD toLocalIterator toSchemaRDD toString top union unionAll unpersist where zip zipPartitions zipWithIndex zipWithUniqueId
Note the single quote in front of key: this is Catalyst's symbol-based query DSL, which I'll cover in detail in a later post:
scala> query.where('key === 100).collect()
14/07/02 19:07:55 INFO analysis.Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
14/07/02 19:07:55 INFO analysis.Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
14/07/02 19:07:55 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=src
14/07/02 19:07:55 INFO HiveMetaStore.audit: ugi=hadoop ip=unknown-ip-addr cmd=get_table : db=default tbl=src
14/07/02 19:07:55 INFO storage.MemoryStore: ensureFreeSpace(358003) called with curMem=713876, maxMem=1150314086
14/07/02 19:07:55 INFO storage.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 349.6 KB, free 1096.0 MB)
14/07/02 19:07:55 INFO sql.SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange
14/07/02 19:07:55 INFO sql.SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions
14/07/02 19:07:55 INFO mapred.FileInputFormat: Total input paths to process : 1
14/07/02 19:07:55 INFO spark.SparkContext: Starting job: collect at SparkPlan.scala:52
14/07/02 19:07:55 INFO scheduler.DAGScheduler: Got job 2 (collect at SparkPlan.scala:52) with 2 output partitions (allowLocal=false)
14/07/02 19:07:55 INFO scheduler.DAGScheduler: Final stage: Stage 2(collect at SparkPlan.scala:52)
14/07/02 19:07:55 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/07/02 19:07:55 INFO scheduler.DAGScheduler: Missing parents: List()
14/07/02 19:07:55 INFO scheduler.DAGScheduler: Submitting Stage 2 (MappedRDD[20] at map at SparkPlan.scala:52), which has no missing parents
14/07/02 19:07:55 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 2 (MappedRDD[20] at map at SparkPlan.scala:52)
14/07/02 19:07:55 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 2 tasks
14/07/02 19:07:55 INFO scheduler.TaskSetManager: Starting task 2.0:0 as TID 5 on executor localhost: localhost (PROCESS_LOCAL)
14/07/02 19:07:55 INFO scheduler.TaskSetManager: Serialized task 2.0:0 as 3854 bytes in 0 ms
14/07/02 19:07:55 INFO executor.Executor: Running task ID 5
14/07/02 19:07:55 INFO storage.BlockManager: Found block broadcast_3 locally
14/07/02 19:07:55 INFO rdd.HadoopRDD: Input split: file:/tmp/sparkHiveWarehouse5366068035857129261/src/kv1.txt:0+2906
14/07/02 19:07:55 INFO executor.Executor: Serialized size of result for 5 is 1951
14/07/02 19:07:55 INFO executor.Executor: Sending result for 5 directly to driver
14/07/02 19:07:55 INFO scheduler.TaskSetManager: Starting task 2.0:1 as TID 6 on executor localhost: localhost (PROCESS_LOCAL)
14/07/02 19:07:55 INFO scheduler.TaskSetManager: Serialized task 2.0:1 as 3854 bytes in 0 ms
14/07/02 19:07:55 INFO executor.Executor: Finished task ID 5
14/07/02 19:07:55 INFO executor.Executor: Running task ID 6
14/07/02 19:07:55 INFO scheduler.TaskSetManager: Finished TID 5 in 44 ms on localhost (progress: 1/2)
14/07/02 19:07:55 INFO scheduler.DAGScheduler: Completed ResultTask(2, 0)
14/07/02 19:07:55 INFO storage.BlockManager: Found block broadcast_3 locally
14/07/02 19:07:55 INFO rdd.HadoopRDD: Input split: file:/tmp/sparkHiveWarehouse5366068035857129261/src/kv1.txt:2906+2906
14/07/02 19:07:55 INFO executor.Executor: Serialized size of result for 6 is 1951
14/07/02 19:07:55 INFO executor.Executor: Sending result for 6 directly to driver
14/07/02 19:07:55 INFO executor.Executor: Finished task ID 6
14/07/02 19:07:55 INFO scheduler.TaskSetManager: Finished TID 6 in 19 ms on localhost (progress: 2/2)
14/07/02 19:07:55 INFO scheduler.DAGScheduler: Completed ResultTask(2, 1)
14/07/02 19:07:55 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
14/07/02 19:07:55 INFO scheduler.DAGScheduler: Stage 2 (collect at SparkPlan.scala:52) finished in 0.062 s
14/07/02 19:07:55 INFO spark.SparkContext: Job finished: collect at SparkPlan.scala:52, took 0.06947625 s
res6: Array[org.apache.spark.sql.Row] = Array([100,val_100], [100,val_100])
The query returns the two rows whose key is 100.
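The DSL operators compose like ordinary RDD transformations. A small hedged sketch chaining a few of the methods visible in the completion list above (the column names refer to the src test table):

// Hedged sketch: combine Catalyst DSL operators on the SchemaRDD.
// 'key and 'value are Scala symbols that resolve to the columns of src.
query.where('key === 100)
     .select('value)            // keep only the value column
     .collect()
     .foreach(println)          // prints [val_100] twice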
4. Summary
Spark SQL provides the Catalyst query optimization framework: SQL is parsed into a logical plan, the plan is optimized, and the result is finally turned into RDD operations. Multiple frameworks, one API: simple and consistent.
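As a final illustration of that single-API point, the result of a query is itself a SchemaRDD and can be registered and queried again; a minimal hedged sketch using registerAsTable from the method list above ("sales_again" is just an illustrative table name):

// Hedged sketch: round-trip a query result back through Catalyst.
// registerAsTable appears among the SchemaRDD methods shown earlier;
// "sales_again" is a hypothetical name for the temporary table.
querySales.registerAsTable("sales_again")
val hanks = sql("SELECT key, value FROM sales_again WHERE key = 'Hank'")
hanks.collect().foreach(println)   // expected: [Hank,2]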
That's it for this post; deeper dives on these topics will follow.
http://www.tuicool.com/articles/2Efi22