1. About Spark SQL and Parquet
Parquet files embed their schema information, i.e. a Parquet file is self-describing. Spark SQL therefore needs no user-supplied schema when reading Parquet: it derives the SQL schema directly from the schema stored inside the file.
The idAndName.parquet file used in this article carries a hive_schema with two columns, id and name.
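For example, one way to see the inferred schema is to load the file and call printSchema. This is only a sketch: it assumes the SQLContext set up in the program of section 3, and the expected output in the comments is inferred from the file's metadata rather than copied from an actual run.

    // Minimal sketch: Spark SQL picks up the schema stored inside the Parquet file itself.
    val df = sqlContext.load("E:/open-sources/Hello2/src/spark/examples/sql/idAndName.parquet")
    df.printSchema()
    // Expected output:
    // root
    //  |-- id: integer (nullable = true)
    //  |-- name: string (nullable = true)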
2. About the idAndName.parquet file
idAndName.parquet comes from the article http://bit1129.iteye.com/blog/2202396; it is a file that Hive wrote to HDFS. As the query output in the log below shows, it holds five rows: (1, MSN), (10, QQ), (100, Gtalk), (1000, Skype) and (null, null).
3. Source code for processing the Parquet file with Spark SQL
As the source code shows, Spark SQL 1.3 can load the Parquet file directly into a DataFrame, and it also still supports the pre-1.3 approach of registering the data as a temporary table and querying it with SQL.
package spark.examples.sql

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}

object SparkSQLParquet {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("SparkSQLParquet").setMaster("local")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    val dir = "E:/open-sources/Hello2/src/spark/examples/sql"

    // Load the Parquet file directly into a DataFrame.
    // Spark SQL works out on its own that the file consists of two columns, id and name.
    val df = sqlContext.load(dir + "/idAndName.parquet")
    df.select("id", "name").collect().foreach(row => println(row(0) + "," + row(1)))

    // Write the name column out as a new Parquet file.
    df.select("name").save(dir + "/name." + System.currentTimeMillis() + ".parquet")

    // Read in the Parquet file. Parquet files are self-describing, so the schema is preserved.
    // The result of loading a Parquet file is also a DataFrame.
    val parquetFile = sqlContext.parquetFile(dir + "/idAndName.parquet")

    // Parquet files can also be registered as tables and then used in SQL statements.
    parquetFile.registerTempTable("parquetFile")
    val teenagers = sqlContext.sql("SELECT * FROM parquetFile WHERE id > 10")
    teenagers.map(t => "id: " + t(0) + ", " + "name:" + t(1)).collect().foreach(println)
  }
}
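As a side note, the load and save calls above rely on Parquet being Spark SQL's default data source (the spark.sql.sources.default setting). The same flow can also name the data source explicitly; the following is only a sketch of that variant under Spark 1.3, with placeholder paths rather than the author's directories:

    // Hypothetical sketch: same read/write as above, but with the "parquet" source named explicitly.
    val explicitDf = sqlContext.load("/tmp/idAndName.parquet", "parquet")
    explicitDf.select("name").save("/tmp/nameOnly.parquet", "parquet")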
4. About writing Parquet files with Spark SQL
The name.<timestamp>.parquet file written by the save call above contains only a single column, name; a quick way to check this is sketched below.
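One hypothetical way to verify this is to load the written file back and list its columns. The file name below is the one produced in the logged run (name.1429009393304.parquet); it will differ on every run, since it embeds System.currentTimeMillis().

    // Sketch: read the just-written Parquet file back and confirm it has a single column.
    val nameDf = sqlContext.load("E:/open-sources/Hello2/src/spark/examples/sql/name.1429009393304.parquet")
    println(nameDf.columns.mkString(","))   // expected output: name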
5. Log output from running the code above
C:\jdk1.7.0_51\bin\java -Didea.launcher.port=7533 "-Didea.launcher.bin.path=E:\softwareInstalled\JetBrains\IntelliJ IDEA 13.1.3\bin" -Dfile.encoding=UTF-8 -classpath "C:\jdk1.7.0_51\jre\lib\charsets.jar;C:\jdk1.7.0_51\jre\lib\deploy.jar;C:\jdk1.7.0_51\jre\lib\javaws.jar;C:\jdk1.7.0_51\jre\lib\jce.jar;C:\jdk1.7.0_51\jre\lib\jfr.jar;C:\jdk1.7.0_51\jre\lib\jfxrt.jar;C:\jdk1.7.0_51\jre\lib\jsse.jar;C:\jdk1.7.0_51\jre\lib\management-agent.jar;C:\jdk1.7.0_51\jre\lib\plugin.jar;C:\jdk1.7.0_51\jre\lib\resources.jar;C:\jdk1.7.0_51\jre\lib\rt.jar;C:\jdk1.7.0_51\jre\lib\ext\access-bridge-32.jar;C:\jdk1.7.0_51\jre\lib\ext\dnsns.jar;C:\jdk1.7.0_51\jre\lib\ext\jaccess.jar;C:\jdk1.7.0_51\jre\lib\ext\localedata.jar;C:\jdk1.7.0_51\jre\lib\ext\sunec.jar;C:\jdk1.7.0_51\jre\lib\ext\sunjce_provider.jar;C:\jdk1.7.0_51\jre\lib\ext\sunmscapi.jar;C:\jdk1.7.0_51\jre\lib\ext\sunpkcs11.jar;C:\jdk1.7.0_51\jre\lib\ext\zipfs.jar;E:\open-sources\Hello2\out\production\SparkAndScalaExamples;E:\devsoftware\scala-2.10.4\lib\scala-library.jar;E:\devsoftware\scala-2.10.4\lib\scala-swing.jar;E:\devsoftware\scala-2.10.4\lib\scala-actors.jar;E:\open-sources\spark-1.3.0-bin-hadoop2.4\spark-1.3.0-bin-hadoop2.4\lib\spark-assembly-1.3.0-hadoop2.4.0.jar;E:\devsoftware\spark-1.2.0-bin-hadoop2.4\spark-1.2.0-bin-hadoop2.4\dependencies\spark-streaming-flume_2.11-1.2.0.jar;E:\devsoftware\apache-flume-1.5.2-bin(1)\apache-flume-1.5.2-bin\lib\flume-ng-sdk-1.5.2.jar;E:\devsoftware\apache-flume-1.5.2-bin(1)\apache-flume-1.5.2-bin\lib\flume-ng-core-1.5.2.jar;C:\Users\hadoop\Desktop\mysql-connector-java-5.1.34.jar;C:\Users\hadoop\Desktop\mongo-spark-master\mongo-spark-master\lib\mongo-hadoop-core_2.2.0-1.2.0.jar;E:\devsoftware\mongo-java-driver-2.9.3.jar;E:\devsoftware\spark-1.2.0-bin-hadoop2.4\spark-1.2.0-bin-hadoop2.4\lib\spark-examples-1.2.0-hadoop2.4.0.jar;E:\softwareInstalled\JetBrains\IntelliJ IDEA 13.1.3\lib\idea_rt.jar" com.intellij.rt.execution.application.AppMain spark.examples.sql.SparkSQLParquet Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/E:/open-sources/spark-1.3.0-bin-hadoop2.4/spark-1.3.0-bin-hadoop2.4/lib/spark-assembly-1.3.0-hadoop2.4.0.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/E:/devsoftware/spark-1.2.0-bin-hadoop2.4/spark-1.2.0-bin-hadoop2.4/lib/spark-examples-1.2.0-hadoop2.4.0.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 15/04/14 19:02:41 INFO SparkContext: Running Spark version 1.3.0 15/04/14 19:02:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/04/14 19:02:42 INFO SecurityManager: Changing view acls to: hadoop 15/04/14 19:02:42 INFO SecurityManager: Changing modify acls to: hadoop 15/04/14 19:02:43 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop) 15/04/14 19:02:44 INFO Slf4jLogger: Slf4jLogger started 15/04/14 19:02:44 INFO Remoting: Starting remoting 15/04/14 19:02:44 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@TP-A383-D.tom.com:60722] 15/04/14 19:02:44 INFO Utils: Successfully started service 'sparkDriver' on port 60722. 
15/04/14 19:02:44 INFO SparkEnv: Registering MapOutputTracker 15/04/14 19:02:44 INFO SparkEnv: Registering BlockManagerMaster 15/04/14 19:02:44 INFO DiskBlockManager: Created local directory at C:\Users\hadoop\AppData\Local\Temp\spark-c52355be-0749-4859-be6d-d5d1e19c32df\blockmgr-e955e7e7-0cb0-4fce-8340-a2fbe58272d4 15/04/14 19:02:44 INFO MemoryStore: MemoryStore started with capacity 133.6 MB 15/04/14 19:02:44 INFO HttpFileServer: HTTP File server directory is C:\Users\hadoop\AppData\Local\Temp\spark-60338787-0109-43c3-a4c4-80cf2c127280\httpd-4e3e2224-c3b4-43fd-9f24-47c4899c8e3e 15/04/14 19:02:44 INFO HttpServer: Starting HTTP Server 15/04/14 19:02:45 INFO Server: jetty-8.y.z-SNAPSHOT 15/04/14 19:02:45 INFO AbstractConnector: Started SocketConnector@0.0.0.0:60723 15/04/14 19:02:45 INFO Utils: Successfully started service 'HTTP file server' on port 60723. 15/04/14 19:02:45 INFO SparkEnv: Registering OutputCommitCoordinator 15/04/14 19:02:45 INFO Server: jetty-8.y.z-SNAPSHOT 15/04/14 19:02:45 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040 15/04/14 19:02:45 INFO Utils: Successfully started service 'SparkUI' on port 4040. 15/04/14 19:02:45 INFO SparkUI: Started SparkUI at http://TP-A383-D.tom.com:4040 15/04/14 19:02:45 INFO Executor: Starting executor ID <driver> on host localhost 15/04/14 19:02:45 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@TP-A383-D.tom.com:60722/user/HeartbeatReceiver 15/04/14 19:02:45 INFO NettyBlockTransferService: Server created on 60742 15/04/14 19:02:45 INFO BlockManagerMaster: Trying to register BlockManager 15/04/14 19:02:45 INFO BlockManagerMasterActor: Registering block manager localhost:60742 with 133.6 MB RAM, BlockManagerId(<driver>, localhost, 60742) 15/04/14 19:02:45 INFO BlockManagerMaster: Registered BlockManager SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. 15/04/14 19:03:07 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes 15/04/14 19:03:07 INFO MemoryStore: ensureFreeSpace(210772) called with curMem=0, maxMem=140142182 15/04/14 19:03:07 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 205.8 KB, free 133.4 MB) 15/04/14 19:03:08 INFO MemoryStore: ensureFreeSpace(32081) called with curMem=210772, maxMem=140142182 15/04/14 19:03:08 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 31.3 KB, free 133.4 MB) 15/04/14 19:03:08 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:60742 (size: 31.3 KB, free: 133.6 MB) 15/04/14 19:03:08 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0 15/04/14 19:03:08 INFO SparkContext: Created broadcast 0 from NewHadoopRDD at newParquet.scala:447 15/04/14 19:03:10 INFO deprecation: mapred.max.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.maxsize 15/04/14 19:03:10 INFO deprecation: mapred.min.split.size is deprecated. 
Instead, use mapreduce.input.fileinputformat.split.minsize 15/04/14 19:03:10 INFO ParquetRelation2$$anon$1$$anon$2: Using Task Side Metadata Split Strategy 15/04/14 19:03:10 INFO SparkContext: Starting job: collect at SparkPlan.scala:83 15/04/14 19:03:11 INFO DAGScheduler: Got job 0 (collect at SparkPlan.scala:83) with 1 output partitions (allowLocal=false) 15/04/14 19:03:11 INFO DAGScheduler: Final stage: Stage 0(collect at SparkPlan.scala:83) 15/04/14 19:03:11 INFO DAGScheduler: Parents of final stage: List() 15/04/14 19:03:11 INFO DAGScheduler: Missing parents: List() 15/04/14 19:03:11 INFO DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[2] at map at SparkPlan.scala:83), which has no missing parents 15/04/14 19:03:11 INFO MemoryStore: ensureFreeSpace(3576) called with curMem=242853, maxMem=140142182 15/04/14 19:03:11 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.5 KB, free 133.4 MB) 15/04/14 19:03:11 INFO MemoryStore: ensureFreeSpace(2534) called with curMem=246429, maxMem=140142182 15/04/14 19:03:11 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.5 KB, free 133.4 MB) 15/04/14 19:03:11 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:60742 (size: 2.5 KB, free: 133.6 MB) 15/04/14 19:03:11 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0 15/04/14 19:03:11 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:839 15/04/14 19:03:11 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MapPartitionsRDD[2] at map at SparkPlan.scala:83) 15/04/14 19:03:11 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks 15/04/14 19:03:11 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1526 bytes) 15/04/14 19:03:12 INFO Executor: Running task 0.0 in stage 0.0 (TID 0) 15/04/14 19:03:12 INFO ParquetRelation2$$anon$1: Input split: ParquetInputSplit{part: file:/E:/open-sources/Hello2/src/spark/examples/sql/idAndName.parquet start: 0 end: 325 length: 325 hosts: [] requestedSchema: message root { optional int32 id; optional binary name (UTF8); } readSupportMetadata: {org.apache.spark.sql.parquet.row.metadata={"type":"struct","fields":[{"name":"id","type":"integer","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}}]}, org.apache.spark.sql.parquet.row.requested_schema={"type":"struct","fields":[{"name":"id","type":"integer","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}}]}}} 15/04/14 19:03:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 
3276 bytes result sent to driver 15/04/14 19:03:12 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 942 ms on localhost (1/1) 15/04/14 19:03:12 INFO DAGScheduler: Stage 0 (collect at SparkPlan.scala:83) finished in 1.065 s 15/04/14 19:03:12 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 15/04/14 19:03:13 INFO DAGScheduler: Job 0 finished: collect at SparkPlan.scala:83, took 2.529248 s 1,MSN 10,QQ 100,Gtalk 1000,Skype null,null 15/04/14 19:03:13 INFO MemoryStore: ensureFreeSpace(210652) called with curMem=248963, maxMem=140142182 15/04/14 19:03:13 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 205.7 KB, free 133.2 MB) 15/04/14 19:03:13 INFO MemoryStore: ensureFreeSpace(32059) called with curMem=459615, maxMem=140142182 15/04/14 19:03:13 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 31.3 KB, free 133.2 MB) 15/04/14 19:03:13 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:60742 (size: 31.3 KB, free: 133.6 MB) 15/04/14 19:03:13 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0 15/04/14 19:03:13 INFO SparkContext: Created broadcast 2 from NewHadoopRDD at newParquet.scala:447 15/04/14 19:03:16 INFO ParquetRelation2$$anon$1$$anon$2: Using Task Side Metadata Split Strategy 15/04/14 19:03:16 INFO SparkContext: Starting job: runJob at newParquet.scala:648 15/04/14 19:03:16 INFO DAGScheduler: Got job 1 (runJob at newParquet.scala:648) with 1 output partitions (allowLocal=false) 15/04/14 19:03:16 INFO DAGScheduler: Final stage: Stage 1(runJob at newParquet.scala:648) 15/04/14 19:03:16 INFO DAGScheduler: Parents of final stage: List() 15/04/14 19:03:16 INFO DAGScheduler: Missing parents: List() 15/04/14 19:03:16 INFO DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[4] at map at newParquet.scala:542), which has no missing parents 15/04/14 19:03:16 INFO MemoryStore: ensureFreeSpace(58000) called with curMem=491674, maxMem=140142182 15/04/14 19:03:16 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 56.6 KB, free 133.1 MB) 15/04/14 19:03:16 INFO MemoryStore: ensureFreeSpace(34439) called with curMem=549674, maxMem=140142182 15/04/14 19:03:16 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 33.6 KB, free 133.1 MB) 15/04/14 19:03:16 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:60742 (size: 33.6 KB, free: 133.6 MB) 15/04/14 19:03:16 INFO BlockManagerMaster: Updated info of block broadcast_3_piece0 15/04/14 19:03:16 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:839 15/04/14 19:03:16 INFO DAGScheduler: Submitting 1 missing tasks from Stage 1 (MapPartitionsRDD[4] at map at newParquet.scala:542) 15/04/14 19:03:16 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks 15/04/14 19:03:16 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, PROCESS_LOCAL, 1523 bytes) 15/04/14 19:03:16 INFO Executor: Running task 0.0 in stage 1.0 (TID 1) 15/04/14 19:03:16 INFO ParquetRelation2$$anon$1: Input split: ParquetInputSplit{part: file:/E:/open-sources/Hello2/src/spark/examples/sql/idAndName.parquet start: 0 end: 325 length: 325 hosts: [] requestedSchema: message root { optional binary name (UTF8); } readSupportMetadata: {org.apache.spark.sql.parquet.row.metadata={"type":"struct","fields":[{"name":"id","type":"integer","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}}]}, 
org.apache.spark.sql.parquet.row.requested_schema={"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}}]}}} 15/04/14 19:03:16 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl 15/04/14 19:03:16 INFO InternalParquetRecordReader: RecordReader initialized will read a total of 5 records. 15/04/14 19:03:16 INFO CodecConfig: Compression: GZIP 15/04/14 19:03:16 INFO ParquetOutputFormat: Parquet block size to 134217728 15/04/14 19:03:16 INFO ParquetOutputFormat: Parquet page size to 1048576 15/04/14 19:03:16 INFO ParquetOutputFormat: Parquet dictionary page size to 1048576 15/04/14 19:03:16 INFO ParquetOutputFormat: Dictionary is on 15/04/14 19:03:16 INFO ParquetOutputFormat: Validation is off 15/04/14 19:03:16 INFO ParquetOutputFormat: Writer version is: PARQUET_1_0 15/04/14 19:03:17 INFO CodecPool: Got brand-new compressor [.gz] 15/04/14 19:03:17 INFO BlockManager: Removing broadcast 1 15/04/14 19:03:17 INFO BlockManager: Removing block broadcast_1_piece0 15/04/14 19:03:17 INFO MemoryStore: Block broadcast_1_piece0 of size 2534 dropped from memory (free 139560603) 15/04/14 19:03:17 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:60742 in memory (size: 2.5 KB, free: 133.6 MB) 15/04/14 19:03:17 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0 15/04/14 19:03:17 INFO BlockManager: Removing block broadcast_1 15/04/14 19:03:17 INFO MemoryStore: Block broadcast_1 of size 3576 dropped from memory (free 139564179) 15/04/14 19:03:17 INFO ContextCleaner: Cleaned broadcast 1 15/04/14 19:03:17 INFO BlockManager: Removing broadcast 0 15/04/14 19:03:17 INFO BlockManager: Removing block broadcast_0 15/04/14 19:03:17 INFO MemoryStore: Block broadcast_0 of size 210772 dropped from memory (free 139774951) 15/04/14 19:03:17 INFO BlockManager: Removing block broadcast_0_piece0 15/04/14 19:03:17 INFO MemoryStore: Block broadcast_0_piece0 of size 32081 dropped from memory (free 139807032) 15/04/14 19:03:17 INFO BlockManagerInfo: Removed broadcast_0_piece0 on localhost:60742 in memory (size: 31.3 KB, free: 133.6 MB) 15/04/14 19:03:17 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0 15/04/14 19:03:17 INFO ContextCleaner: Cleaned broadcast 0 15/04/14 19:03:17 INFO InternalParquetRecordReader: at row 0. reading next block 15/04/14 19:03:17 INFO InternalParquetRecordReader: block read in memory in 0 ms. row count = 5 15/04/14 19:03:17 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 29,150,465 15/04/14 19:03:17 INFO ColumnChunkPageWriteStore: written 96B for [name] BINARY: 5 values, 44B raw, 58B comp, 1 pages, encodings: [RLE, PLAIN, BIT_PACKED] 15/04/14 19:03:17 INFO FileOutputCommitter: Saved output of task 'attempt_201504141903_0005_r_000000_0' to file:/E:/open-sources/Hello2/src/spark/examples/sql/name.1429009393304.parquet/_temporary/0/task_201504141903_0005_r_000000 15/04/14 19:03:17 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 
1792 bytes result sent to driver 15/04/14 19:03:17 INFO DAGScheduler: Stage 1 (runJob at newParquet.scala:648) finished in 0.971 s 15/04/14 19:03:17 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 971 ms on localhost (1/1) 15/04/14 19:03:17 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 15/04/14 19:03:17 INFO DAGScheduler: Job 1 finished: runJob at newParquet.scala:648, took 0.998114 s 15/04/14 19:03:17 INFO ParquetFileReader: Initiating action with parallelism: 5 15/04/14 19:03:21 INFO MemoryStore: ensureFreeSpace(212659) called with curMem=335150, maxMem=140142182 15/04/14 19:03:21 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 207.7 KB, free 133.1 MB) 15/04/14 19:03:21 INFO MemoryStore: ensureFreeSpace(32088) called with curMem=547809, maxMem=140142182 15/04/14 19:03:21 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 31.3 KB, free 133.1 MB) 15/04/14 19:03:21 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on localhost:60742 (size: 31.3 KB, free: 133.6 MB) 15/04/14 19:03:21 INFO BlockManagerMaster: Updated info of block broadcast_4_piece0 15/04/14 19:03:21 INFO SparkContext: Created broadcast 4 from NewHadoopRDD at newParquet.scala:447 15/04/14 19:03:21 INFO ParquetRelation2$$anon$1$$anon$2: Using Task Side Metadata Split Strategy 15/04/14 19:03:21 INFO SparkContext: Starting job: collect at SparkSQLParquet.scala:26 15/04/14 19:03:21 INFO DAGScheduler: Got job 2 (collect at SparkSQLParquet.scala:26) with 1 output partitions (allowLocal=false) 15/04/14 19:03:21 INFO DAGScheduler: Final stage: Stage 2(collect at SparkSQLParquet.scala:26) 15/04/14 19:03:21 INFO DAGScheduler: Parents of final stage: List() 15/04/14 19:03:21 INFO DAGScheduler: Missing parents: List() 15/04/14 19:03:21 INFO DAGScheduler: Submitting Stage 2 (MapPartitionsRDD[10] at map at DataFrame.scala:776), which has no missing parents 15/04/14 19:03:21 INFO MemoryStore: ensureFreeSpace(4904) called with curMem=579897, maxMem=140142182 15/04/14 19:03:21 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 4.8 KB, free 133.1 MB) 15/04/14 19:03:21 INFO MemoryStore: ensureFreeSpace(3349) called with curMem=584801, maxMem=140142182 15/04/14 19:03:21 INFO MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 3.3 KB, free 133.1 MB) 15/04/14 19:03:21 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on localhost:60742 (size: 3.3 KB, free: 133.6 MB) 15/04/14 19:03:21 INFO BlockManagerMaster: Updated info of block broadcast_5_piece0 15/04/14 19:03:21 INFO SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:839 15/04/14 19:03:21 INFO DAGScheduler: Submitting 1 missing tasks from Stage 2 (MapPartitionsRDD[10] at map at DataFrame.scala:776) 15/04/14 19:03:21 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks 15/04/14 19:03:21 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, localhost, PROCESS_LOCAL, 1526 bytes) 15/04/14 19:03:21 INFO Executor: Running task 0.0 in stage 2.0 (TID 2) 15/04/14 19:03:21 INFO ParquetRelation2$$anon$1: Input split: ParquetInputSplit{part: file:/E:/open-sources/Hello2/src/spark/examples/sql/idAndName.parquet start: 0 end: 325 length: 325 hosts: [] requestedSchema: message root { optional int32 id; optional binary name (UTF8); } readSupportMetadata: 
{org.apache.spark.sql.parquet.row.metadata={"type":"struct","fields":[{"name":"id","type":"integer","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}}]}, org.apache.spark.sql.parquet.row.requested_schema={"type":"struct","fields":[{"name":"id","type":"integer","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}}]}}} 15/04/14 19:03:21 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl 15/04/14 19:03:21 INFO InternalParquetRecordReader: RecordReader initialized will read a total of 5 records. 15/04/14 19:03:21 INFO InternalParquetRecordReader: at row 0. reading next block 15/04/14 19:03:21 INFO InternalParquetRecordReader: block read in memory in 1 ms. row count = 5 15/04/14 19:03:21 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 1840 bytes result sent to driver id: 100, name:Gtalk id: 1000, name:Skype 15/04/14 19:03:21 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 39 ms on localhost (1/1) 15/04/14 19:03:21 INFO DAGScheduler: Stage 2 (collect at SparkSQLParquet.scala:26) finished in 0.039 s 15/04/14 19:03:21 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 15/04/14 19:03:21 INFO DAGScheduler: Job 2 finished: collect at SparkSQLParquet.scala:26, took 0.061630 s Process finished with exit code 0