`

基于spark1.3.1的spark-sql实战-01

阅读更多

 

sqlContext总的一个过程如下图所示:
  1. SQL语句经过SqlParse解析成UnresolvedLogicalPlan;
  2. 使用analyzer结合数据数据字典(catalog)进行绑定,生成resolvedLogicalPlan;
  3. 使用optimizer对resolvedLogicalPlan进行优化,生成optimizedLogicalPlan;
  4. 使用SparkPlan将LogicalPlan转换成PhysicalPlan;
  5. 使用prepareForExecution()将PhysicalPlan转换成可执行物理计划;
  6. 使用execute()执行可执行物理计划;
  7. 生成SchemaRDD。

 

 

 

  1. SQL语句经过HiveQl.parseSql解析成Unresolved LogicalPlan,在这个解析过程中对hiveql语句使用getAst()获取AST树,然后再进行解析;
  2. 使用analyzer结合数据hive源数据Metastore(新的catalog)进行绑定,生成resolved LogicalPlan;
  3. 使用optimizer对resolved LogicalPlan进行优化,生成optimized LogicalPlan,优化前使用了ExtractPythonUdfs(catalog.PreInsertionCasts(catalog.CreateTables(analyzed)))进行预处理;
  4. 使用hivePlanner将LogicalPlan转换成PhysicalPlan;
  5. 使用prepareForExecution()将PhysicalPlan转换成可执行物理计划;
  6. 使用execute()执行可执行物理计划;
  7. 执行后,使用map(_.copy)将结果导入SchemaRDD。

 

 

spark sql 三个核心部分:

1. 可以加载各种结构化数据源(e.g., JSON, Hive, and Parquet).
2.  可以让你通过SQL ,spark 内部程序或者外部工具,通过标准的数据库连接(JDBC/ODBC)连接spark,比如一个商业智能的工具Tableau

3.当你通过使用spark程序,spark sql 提供丰富又智能的SQL或者 regular Python/Java/Scala code,包括 join RDDS ,SQL tables ,使用SQL自定义用户函数

 

DataFrames

 

A DataFrame isa distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.

SQLContext:

除了SQLContext之外 ,还有HiveContext 来创建,HiveContext包含是SQLContext的,功能比SQLContext更强大,可以操作HiveQL还可以定义UDF,在spark1.3.1以后版本更推荐使用HiveContext,但是需要依赖Hive jar包

 

Creating DataFrames

拥有SQLContext就可以创建DataFrames from an existing RDD, from a Hive table, or from data sources.
由于Hadoop使用了 lzo压缩方式,所以也需要在spark指定Hadoop Lzo的jar包,否则会报错”Compression codec com.hadoop.compression.lzo.LzoCodec not found.“
在spark_home/conf/spark_env.sh目录增加如下配置:
export SPARK_LIBRARY_PATH=$SPARK_LIBRARY_PATH:/home/wangyue/opt/hadoop/hadoop-2.3.0-cdh5.1.0/lib/native/Linux-amd64-64/*:/home/wangyue/opt/hadoop/hadoop-2.3.0-cdh5.1.0/share/hadoop/common/hadoop-lzo-0.4.15-cdh5.1.0.jar
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/wangyue/opt/hadoop/hadoop-2.3.0-cdh5.1.0/share/hadoop/common/hadoop-lzo-0.4.15-cdh5.1.0.jar
重启spark集群后:
已经可以在classpath中看到lzo jar
加载Json文件:

通过show方法查询dataframe数据

 

 

DataFrame Operations

 

// Select everybody, but increment the age by 1

// Select people older than 21

// Count people by age

 

Running SQL Queries Programmatically

The sql function on a SQLContext enables applications to run SQL queries programmatically and returns the result as a DataFrame.

 

 

val sqlContext = ...  // An existing SQLContext
val df = sqlContext.sql("SELECT * FROM table")

目前1.3.1版本后 可以通过SQLContext 运行 SQL程序,然后返回DataFrame格式的结果

 

目前有两种方式将RDD 转成DataFrame

 

1. Inferring the Schema Using Reflection

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("file:///home/wangyue/opt/spark/spark-1.3.1-bin-hadoop2.3/examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

 

 

 

2. Programmatically Specifying the Schema

When case classes cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a DataFrame can be created programmatically with three steps.

  1. Create an RDD of Rows from the original RDD;
  2. Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.
  3. Apply the schema to the RDD of Rows via createDataFrame method provided by SQLContext.

For example:

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create an RDD
val people = sc.textFile("file:///home/wangyue/opt/spark/spark-1.3.1-bin-hadoop2.3/examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Import Row.
import org.apache.spark.sql.Row;

// Import Spark SQL data types
import org.apache.spark.sql.types.{StructType,StructField,StringType};

// Generate the schema based on the string of schema
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// Apply the schema to the RDD.
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

// Register the DataFrames as a table.
peopleDataFrame.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val results = sqlContext.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
results.map(t => "Name: " + t(0)).collect().foreach(println)

 

 

Data Sources

1. Generic Load/Save Functions

yal df = sqlContext.load("people.parquet")
df.select("name", "age").save("namesAndAges.parquet")

 

 

2. Manually Specifying Options

val df = sqlContext.load("people.json", "json")
df.select("name", "age").save("namesAndAges.parquet", "parquet")

Save Modes

 

Saving to Persistent Tables

在HiveContext 下,DataFrame 会使用saveAsTable命令会将数据等信息保存到HiveMetastore中,这样即使重启启动spark sql还能活取到HiveMetastore中的数据

 

在SQLContext下,DataFrame 会使用saveAsTable命令会将数据等信息保存到managed table中,但这些数据通过metastore控制,当表执行drop会删除metastore中数据

 

Parquet Files

1. Loading Data Programmatically

// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

val people = sc.textFile("file:///home/wangyue/opt/spark/spark-1.3.1-bin-hadoop2.3/examples/src/main/resources/people.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt)).toDF()
 ... // An RDD of case class objects, from the previous example.

// The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet.
people.saveAsParquetFile("people.parquet")

// Read in the parquet file created above.  Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a DataFrame.
val parquetFile = sqlContext.parquetFile("people.parquet")

//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

val people = sc.textFile("file:///home/wangyue/opt/spark/spark-1.3.1-bin-hadoop2.3/examples/src/main/resources/people.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt)).toDF()

// The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet.
people.saveAsParquetFile("people.parquet")

// Read in the parquet file created above.  Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a DataFrame.
val parquetFile = sqlContext.parquetFile("people.parquet")

//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

 

 

Schema merging

/ sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Create a simple DataFrame, stored into a partition directory
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.saveAsParquetFile("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.saveAsParquetFile("data/test_table/key=2")

// Read the partitioned table
val df3 = sqlContext.parquetFile("data/test_table")
df3.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partiioning column appeared in the partition directory paths.
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key : int (nullable = true)

Configuration

JSON Datasets

 

Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame. This conversion can be done using one of two methods in a SQLContext:

  • jsonFile - loads data from a directory of JSON files where each line of the files is a JSON object.
  • jsonRDD - loads data from an existing RDD where each element of the RDD is a string containing a JSON object.

Note that the file that is offered as jsonFile is not a typical JSON file. Each line must contain a separate, self-contained valid JSON object. As a consequence, a regular multi-line JSON file will most often fail.

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files.
val path = "
file:///home/wangyue/opt/spark/spark-1.3.1-bin-hadoop2.3/
examples/src/main/resources/people.json"// Create a DataFrame from the file(s) pointed to by pathval people = sqlContext.jsonFile(path)// The inferred schema can be visualized using the printSchema() method.people.printSchema()// root// |-- age: integer (nullable = true)// |-- name: string (nullable = true)// Register this DataFrame as a table.people.registerTempTable("people")// SQL statements can be run by using the sql methods provided by sqlContext.val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")// Alternatively, a DataFrame can be created for a JSON dataset represented by// an RDD[String] storing one JSON object per string.val anotherPeopleRDD = sc.parallelize( """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)val anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)

全部查询:

scala> val anotherPeopleSql = sqlContext.sql("select name,address.city  from anotherPeople")

scala> anotherPeopleSql.map(t => "Name: " + t(0)+ " city:"+t(1)).collect().foreach(println)

 

尊重原创,未经允许不得转载:http://blog.csdn.net/stark_summer/article/details/45825177

1
0
分享到:
评论

相关推荐

    spark-1.3.1-bin-hadoop2.6.tgz

    Spark-1.3.1-bin-hadoop2.6.tgz是一个针对Linux和Windows系统的安装包,包含了Apache Spark 1.3.1版本以及与Hadoop 2.6兼容的依赖。这个压缩包为用户提供了在本地或集群环境中搭建Spark计算平台的基础。 1. **Spark...

    spark-1.3.1-bin-2.2.0_gong.tgz

    本资源“spark-1.3.1-bin-2.2.0_gong.tgz”是一个针对Hadoop 2.4版本优化的Spark 1.3.1发行版的压缩包。该版本在Spark的早期版本中引入了多项改进和新特性,旨在提升数据处理效率和用户友好性。 首先,Spark 1.3.1...

    spark-1.3.1

    3. **Spark SQL**:Spark 1.3.1引入了Spark SQL,它允许开发者使用SQL或者DataFrame API来处理结构化数据。DataFrame是Spark SQL中的一层抽象,比RDD更易用且效率更高,因为它自动处理了数据类型和schema。 4. **...

    hadoop 2.6.0 及Spark1.3.1平台搭建20150505-优化版

    本指南将详细介绍如何在基于 Hadoop 2.6.0 的环境中搭建 Spark 1.3.1 平台,以实现高效的数据处理。 首先,我们需要进行虚拟化环境的准备。这里推荐使用 VMware Workstation 11 作为虚拟化软件,它为开发和测试提供...

    spark1.3.1源码下载

    3. **Spark Streaming**:基于微批处理的实时流处理框架,可以处理连续的数据流并进行实时分析。 4. **MLlib**:Spark的机器学习库,包含了各种算法如分类、回归、聚类、协同过滤等,以及模型评估和调优工具。 5. **...

    spark_api_1.3.1

    在 Spark 1.3.1 中,Spark SQL 支持了 Hive Metastore,允许用户通过 JDBC 或者 Thrift 服务器访问 Hive 表,使得现有的 Hive 应用可以轻松地迁移到 Spark 上。同时,DataFrame API 的引入,为用户提供了更高级别的...

    spark1.3源码

    首先,Spark 1.3.0的核心特性之一是加强了SQL支持,引入了DataFrame API。DataFrame可以看作是表格形式的数据集合,提供了类似于SQL的查询接口,使得非Java或Scala背景的开发人员也能轻松使用Spark进行数据处理。...

    SPARK相关资料

    Spark官方API是开发人员编写Spark程序的基础,它包括了RDD(弹性分布式数据集)、DataFrame、Dataset等核心组件,以及Spark SQL、Spark Streaming、MLlib(机器学习库)和GraphX(图处理库)等高级功能的详细接口和...

    Sparkml实战

    ### Spark MLlib实战 #### 1.1 聚类实例 ##### 1.1.1 算法说明 **聚类(Cluster Analysis)** 是一种重要的无监督学习方法,其核心任务是将一组对象划分为若干个簇(cluster),使得同一个簇内的对象尽可能相似,而...

    Spark相关知识点精选

    - **1.3.1 数据科学任务**: Spark因其强大的数据处理能力而广泛应用于数据科学领域,如大数据分析、机器学习模型训练等。 - **1.3.2 数据处理应用**: 在企业级应用中,Spark常被用于处理大量结构化或非结构化数据,...

    TPC-DS测试包及tpc测试标准说明

    `tpcds_1.3.1.pdf`是TPC-DS的规范文档,详细定义了测试的场景、数据模型、查询集合以及评估准则。在这个版本中,你可以找到关于数据仓库设计、数据生成、查询执行和性能度量等方面的详细信息。数据模型通常包括多个...

    国赛二等奖-全国大学生大数据技能竞赛指导手册

    Hive是一个基于Hadoop的数据仓库工具,它可以将结构化的数据文件映射成表并提供类SQL查询功能。这部分内容涵盖了MySQL Server的安装、Hive的配置及常见问题解决方法。 ##### 3.7 安装Spark Apache Spark是一种快速...

    智慧银行大数据可视化展示平台建设和应用总体解决方案.docx

    - **1.3.1 数据分析综合服务平台**:负责数据的深入分析,为决策提供科学依据。 - **1.3.2 量收系统**:用于记录和管理客户的交易量和收益信息,是分析客户行为的重要数据来源。 - **1.3.3 金融大数据平台**:汇集...

    自我介绍13.docx大数据项目+项目介绍+面试辅导

    Sqoop 1.4.6用于数据导入导出,MySQL 5.6.24作为数据库,Azkaban 2.5.0进行工作流调度,Zookeeper 3.4.10用于分布式协调,Hbase 1.3.1作为NoSQL数据库,Phoenix 4.14.1提供Hbase的SQL查询,Canal 1.1.2用于数据同步...

Global site tag (gtag.js) - Google Analytics