概述
spark sql 是一个结构化执行的数据模块,它并不像基本的spark rdd操作,spark sql可以提供更多的基于数据操作的接口,而且有更多的优化操作,这里提供了几种方式去个spark sql 交互,DataFrames API 和 Datasets API。
SQL
DataFrame
dataframe 是一个结构化的分布式的类似数据库表的一种存储结构,但是它对比与关系型数据库有更加丰富的操作和更优的性能,dataframe 的数据来源可以是structured data files, tables in Hive, external databases, existing RDDs等。
Datasets
datasets是在spark 1.6 中提供的更新,他为spark rdd提供更多丰富的接口。
开始:SqlContext
创建一个sqlcontext
除了使用sqlContext 外,你还可以使用hiveContext,他是sqlContext 的一个超类,它不仅包含了sqlContext 的操作外,还提供的对hive 的操作,使用hiveContext 不需要去构建hive环境,仅仅需要依赖jar即可。
可以使用setconf设置spark.sql.dialect 来指定要执行的语句类型,例如sqlContext 默认就是 sql ,而hiveContext默认就是hiveql,推荐大家使用hiveql。
创建一个dataframe
操作dataframe
使用sql 语句执行一个查询
创建一个Datasets
dataset和rdd类似,但是它没有使用java 的序列化 以及kyro压缩机制,dataset使用了一种特殊的编码器来序列数据从而可以让数据可以执行或传输,编码器动态的将对象编码成字节流,允许spark在不反序列化对象的情况下执行很多类似rdd的操作。
spark sql 和 rdd之间的操作
spark sql 有两种方法可以将rdd转换成dataFrames;
第一种方式是通过反射来推断出rdd包含的对象包含的具体类型。如果你已经知道你所定义的dataframes的schema类型,推荐使用这种方法;
第二种方式是通过一个接口,这个接口允许你构建一个schema,这种方式适用于在运行期才能获取到schema以及类型;
第一种方式
case class事先定义好table 的schema,然后通过反射机制将读取进来的数据映射为对象的属性,case class可以包含复杂类型,例如sequence或者是array,而后rdd隐式转换为dataframe,并注册成为一个table;
第二种方式
Data Sources
spark sql 支持操作各种数据源,一个dataframe能够像操作RDD类似的功能来北操作哦,且能够被注册成为一张临时表,临时表允许用户去运行sql来查询数据。
load/save 函数
手动指定数据源
直接在文件上运行sql
Save Modes
执行save mode 可以指定 在执行save操作后,如果原数据存在,将执行什么操作
保存为持久表
当使用hiveContext时候,dataFrames可以通过saveAsTable命令将数据保存为永久存在的表,这个表就称之为 manager table,它不会随着spark的重启而消失,一旦保存后,就可以通过SqlContext 来操作相对应的表
Parquet Files
分区
分区表在hive系统中是非常常用的一种优化手段,在一个分区表中,数据会根据分区条件被存储在不同的目录下,目前parquet也支持分区,例如我们存储yoghurt信息到一个分区表,通过两列来分区,分别是 gender、country
通过SQLContext.read.parquet 或者 SQLContext.read.load 来访问路径path/to/table,便可以访问到数据,schema中会自动将分区列 gender和country加载进来 例如
可以打看到数据类型被自动自动的推导了出来,目前spark支持两种数据类型:数字类型、字符串类型;但有时用户并不希望自动推导类型,那么我们可以通过设置park.sql.sources.partitionColumnTypeInference.enabled=false ,这样所有的类型都是以字符串格式进行处理了;
在spark 1.6中,如果通过SQLContext.read.parquet 或者 SQLContext.read.load 访问路径 path/to/table/gender=male,那么gender就不会存在于schema。
Schema Merging
parquet支持schema的合并,例如表test 有两个分区 key=1和key=2,在key=1 中有两个schema :A、B 在 key=2中有两个schema:B、C,对两个分区进行merge,则会产生三个schema :A、B、C
注意:因为merge是一个比较耗费性能的操作,所以从1.5开始默认不支持
当向Hive metastore中读写Parquet表时,Spark SQL将使用Spark SQL自带的Parquet SerDe(SerDe:Serialize/Deserilize的简称,目的是用于序列化和反序列化),而不是用Hive的SerDe,Spark SQL自带的SerDe拥有更好的性能。这个优化的配置参数为spark.sql.hive.convertMetastoreParquet,默认值为开启。
Hive/Parquet Schema反射(Hive/Parquet Schema Reconciliation)
从表Schema处理的角度对比Hive和Parquet,有两个区别:
Hive区分大小写,Parquet不区分大小写
hive允许所有的列为空,而Parquet不允许所有的列全为空
由于这两个区别,当将Hive metastore Parquet表转换为Spark SQL Parquet表时,需要将Hive metastore schema和Parquet schema进行一致化。一致化规则如下:
这两个schema中的同名字段必须具有相同的数据类型。一致化后的字段必须为Parquet的字段类型。这个规则同时也解决了空值的问题。
一致化后的schema只包含Hive metastore中出现的字段。
忽略只出现在Parquet schema中的字段
只在Hive metastore schema中出现的字段设为nullable字段,并加到一致化后的schema中
元数据刷新(Metadata Refreshing)
Spark SQL缓存了Parquet元数据以达到良好的性能。当Hive metastore Parquet表转换为enabled时,表修改后缓存的元数据并不能刷新。所以,当表被Hive或其它工具修改时,则必须手动刷新元数据,以保证元数据的一致性。示例如下:
Configuration
JSON数据集
Spark SQL能自动解析JSON数据集的Schema,读取JSON数据集为DataFrame格式。读取JSON数据集方法为SQLContext.read().json()。该方法将String格式的RDD或JSON文件转换为DataFrame。
需要注意的是,这里的JSON文件不是常规的JSON格式。JSON文件每一行必须包含一个独立的、自满足有效的JSON对象。如果用多行描述一个JSON对象,会导致读取出错。读取JSON数据集示例如下:
Spark SQL支持对Hive的读写操作。需要注意的是,Hive所依赖的包,没有包含在Spark assembly包中。增加Hive时,需要在Spark的build中添加 -Phive 和 -Phivethriftserver配置。这两个配置将build一个新的assembly包,这个assembly包含了Hive的依赖包。注意,必须上这个心的assembly包到所有的worker节点上。因为worker节点在访问Hive中数据时,会调用Hive的 serialization and deserialization libraries(SerDes),此时将用到Hive的依赖包。
Hive的配置文件为conf/目录下的hive-site.xml文件。在YARN上执行查询命令之前,lib_managed/jars目录下的datanucleus包和conf/目录下的hive-site.xml必须可以被driverhe和所有的executors所访问。确保被访问,最方便的方式就是在spark-submit命令中通过--jars选项和--file选项指定。
操作Hive时,必须创建一个HiveContext对象,HiveContext继承了SQLContext,并增加了对MetaStore和HiveQL的支持。除了sql方法,HiveContext还提供了一个hql方法,hql方法可以执行HiveQL语法的查询语句。
JDBC To Other Databases
Spark SQL支持使用JDBC访问其他数据库。当时用JDBC访问其它数据库时,最好使用JdbcRDD。使用JdbcRDD时,Spark SQL操作返回的DataFrame会很方便,也会很方便的添加其他数据源数据。JDBC数据源因为不需要用户提供ClassTag,所以很适合使用Java或Python进行操作。
使用JDBC访问数据源,需要在spark classpath添加JDBC driver配置。
故障排除(Troubleshooting)
在客户端session和所有的executors上,JDBC driver必须对启动类加载器(primordial class loader)设置为visible。因为当创建一个connection时,Java的DriverManager类会执行安全验证,安全验证将忽略所有对启动类加载器为非visible的driver。一个很方便的解决方法是,修改所有worker节点上的compute_classpath.sh脚本,将driver JARs添加至脚本。
有些数据库(例:H2)将所有的名字转换为大写,所以在这些数据库中,Spark SQL也需要将名字全部大写。
spark sql 是一个结构化执行的数据模块,它并不像基本的spark rdd操作,spark sql可以提供更多的基于数据操作的接口,而且有更多的优化操作,这里提供了几种方式去个spark sql 交互,DataFrames API 和 Datasets API。
SQL
DataFrame
dataframe 是一个结构化的分布式的类似数据库表的一种存储结构,但是它对比与关系型数据库有更加丰富的操作和更优的性能,dataframe 的数据来源可以是structured data files, tables in Hive, external databases, existing RDDs等。
Datasets
datasets是在spark 1.6 中提供的更新,他为spark rdd提供更多丰富的接口。
开始:SqlContext
创建一个sqlcontext
val sc: SparkContext // An existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // this is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits._
除了使用sqlContext 外,你还可以使用hiveContext,他是sqlContext 的一个超类,它不仅包含了sqlContext 的操作外,还提供的对hive 的操作,使用hiveContext 不需要去构建hive环境,仅仅需要依赖jar即可。
可以使用setconf设置spark.sql.dialect 来指定要执行的语句类型,例如sqlContext 默认就是 sql ,而hiveContext默认就是hiveql,推荐大家使用hiveql。
创建一个dataframe
val sc: SparkContext // An existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) val df = sqlContext.read.json("examples/src/main/resources/people.json") // Displays the content of the DataFrame to stdout df.show()
操作dataframe
val sc: SparkContext // An existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // Create the DataFrame val df = sqlContext.read.json("examples/src/main/resources/people.json") // Show the content of the DataFrame df.show() // age name // null Michael // 30 Andy // 19 Justin // Print the schema in a tree format df.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Select only the "name" column df.select("name").show() // name // Michael // Andy // Justin // Select everybody, but increment the age by 1 df.select(df("name"), df("age") + 1).show() // name (age + 1) // Michael null // Andy 31 // Justin 20 // Select people older than 21 df.filter(df("age") > 21).show() // age name // 30 Andy // Count people by age df.groupBy("age").count().show() // age count // null 1 // 19 1 // 30 1
使用sql 语句执行一个查询
val sqlContext = ... // An existing SQLContext val df = sqlContext.sql("SELECT * FROM table")
创建一个Datasets
dataset和rdd类似,但是它没有使用java 的序列化 以及kyro压缩机制,dataset使用了一种特殊的编码器来序列数据从而可以让数据可以执行或传输,编码器动态的将对象编码成字节流,允许spark在不反序列化对象的情况下执行很多类似rdd的操作。
// Encoders for most common types are automatically provided by importing sqlContext.implicits._ val ds = Seq(1, 2, 3).toDS() ds.map(_ + 1).collect() // Returns: Array(2, 3, 4) // Encoders are also created for case classes. case class Person(name: String, age: Long) val ds = Seq(Person("Andy", 32)).toDS() // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name. val path = "examples/src/main/resources/people.json" val people = sqlContext.read.json(path).as[Person]
spark sql 和 rdd之间的操作
spark sql 有两种方法可以将rdd转换成dataFrames;
第一种方式是通过反射来推断出rdd包含的对象包含的具体类型。如果你已经知道你所定义的dataframes的schema类型,推荐使用这种方法;
第二种方式是通过一个接口,这个接口允许你构建一个schema,这种方式适用于在运行期才能获取到schema以及类型;
第一种方式
case class事先定义好table 的schema,然后通过反射机制将读取进来的数据映射为对象的属性,case class可以包含复杂类型,例如sequence或者是array,而后rdd隐式转换为dataframe,并注册成为一个table;
// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // this is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits._ // Define the schema using a case class. // Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit, // you can use custom classes that implement the Product interface. case class Person(name: String, age: Int) // Create an RDD of Person objects and register it as a table. val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF() people.registerTempTable("people") // SQL statements can be run by using the sql methods provided by sqlContext. val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19") // The results of SQL queries are DataFrames and support all the normal RDD operations. // The columns of a row in the result can be accessed by field index: teenagers.map(t => "Name: " + t(0)).collect().foreach(println) // or by field name: teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println) // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T] teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println) // Map("name" -> "Justin", "age" -> 19)
第二种方式
// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // Create an RDD val people = sc.textFile("examples/src/main/resources/people.txt") // The schema is encoded in a string val schemaString = "name age" // Import Row. import org.apache.spark.sql.Row; // Import Spark SQL data types import org.apache.spark.sql.types.{StructType,StructField,StringType}; // Generate the schema based on the string of schema val schema = StructType( schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true))) // Convert records of the RDD (people) to Rows. val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim)) // Apply the schema to the RDD. val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema) // Register the DataFrames as a table. peopleDataFrame.registerTempTable("people") // SQL statements can be run by using the sql methods provided by sqlContext. val results = sqlContext.sql("SELECT name FROM people") // The results of SQL queries are DataFrames and support all the normal RDD operations. // The columns of a row in the result can be accessed by field index or by field name. results.map(t => "Name: " + t(0)).collect().foreach(println)
Data Sources
spark sql 支持操作各种数据源,一个dataframe能够像操作RDD类似的功能来北操作哦,且能够被注册成为一张临时表,临时表允许用户去运行sql来查询数据。
load/save 函数
val df = sqlContext.read.load("examples/src/main/resources/users.parquet") df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
手动指定数据源
val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json") df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
直接在文件上运行sql
val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
Save Modes
执行save mode 可以指定 在执行save操作后,如果原数据存在,将执行什么操作
Scala/Java | Any Language | Meaning |
SaveMode.ErrorIfExists (default) | "error" | When saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown. |
SaveMode.Append | "append" | When saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data. |
SaveMode.Overwrite | "overwrite" | Overwrite mode means that when saving a DataFrame to a data source, if data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame. |
SaveMode.Ignore | "ignore" | Ignore mode means that when saving a DataFrame to a data source, if data already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data. This is similar to a CREATE TABLE IF NOT EXISTS in SQL. |
保存为持久表
当使用hiveContext时候,dataFrames可以通过saveAsTable命令将数据保存为永久存在的表,这个表就称之为 manager table,它不会随着spark的重启而消失,一旦保存后,就可以通过SqlContext 来操作相对应的表
Parquet Files
// sqlContext from the previous example is used in this example. // This is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits._ val people: RDD[Person] = ... // An RDD of case class objects, from the previous example. // The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet. people.write.parquet("people.parquet") // Read in the parquet file created above. Parquet files are self-describing so the schema is preserved. // The result of loading a Parquet file is also a DataFrame. val parquetFile = sqlContext.read.parquet("people.parquet") //Parquet files can also be registered as tables and then used in SQL statements. parquetFile.registerTempTable("parquetFile") val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
分区
分区表在hive系统中是非常常用的一种优化手段,在一个分区表中,数据会根据分区条件被存储在不同的目录下,目前parquet也支持分区,例如我们存储yoghurt信息到一个分区表,通过两列来分区,分别是 gender、country
path └── to └── table ├── gender=male │ ├── ... │ │ │ ├── country=US │ │ └── data.parquet │ ├── country=CN │ │ └── data.parquet │ └── ... └── gender=female ├── ... │ ├── country=US │ └── data.parquet ├── country=CN │ └── data.parquet └── ...
通过SQLContext.read.parquet 或者 SQLContext.read.load 来访问路径path/to/table,便可以访问到数据,schema中会自动将分区列 gender和country加载进来 例如
root |-- name: string (nullable = true) |-- age: long (nullable = true) |-- gender: string (nullable = true) |-- country: string (nullable = true)
可以打看到数据类型被自动自动的推导了出来,目前spark支持两种数据类型:数字类型、字符串类型;但有时用户并不希望自动推导类型,那么我们可以通过设置park.sql.sources.partitionColumnTypeInference.enabled=false ,这样所有的类型都是以字符串格式进行处理了;
在spark 1.6中,如果通过SQLContext.read.parquet 或者 SQLContext.read.load 访问路径 path/to/table/gender=male,那么gender就不会存在于schema。
Schema Merging
parquet支持schema的合并,例如表test 有两个分区 key=1和key=2,在key=1 中有两个schema :A、B 在 key=2中有两个schema:B、C,对两个分区进行merge,则会产生三个schema :A、B、C
注意:因为merge是一个比较耗费性能的操作,所以从1.5开始默认不支持
/ sqlContext from the previous example is used in this example. // This is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits._ // Create a simple DataFrame, stored into a partition directory val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double") df1.write.parquet("data/test_table/key=1") // Create another DataFrame in a new partition directory, // adding a new column and dropping an existing column val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple") df2.write.parquet("data/test_table/key=2") // Read the partitioned table val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table") df3.printSchema() // The final schema consists of all 3 columns in the Parquet files together // with the partitioning column appeared in the partition directory paths. // root // |-- single: int (nullable = true) // |-- double: int (nullable = true) // |-- triple: int (nullable = true) // |-- key : int (nullable = true)
当向Hive metastore中读写Parquet表时,Spark SQL将使用Spark SQL自带的Parquet SerDe(SerDe:Serialize/Deserilize的简称,目的是用于序列化和反序列化),而不是用Hive的SerDe,Spark SQL自带的SerDe拥有更好的性能。这个优化的配置参数为spark.sql.hive.convertMetastoreParquet,默认值为开启。
Hive/Parquet Schema反射(Hive/Parquet Schema Reconciliation)
从表Schema处理的角度对比Hive和Parquet,有两个区别:
Hive区分大小写,Parquet不区分大小写
hive允许所有的列为空,而Parquet不允许所有的列全为空
由于这两个区别,当将Hive metastore Parquet表转换为Spark SQL Parquet表时,需要将Hive metastore schema和Parquet schema进行一致化。一致化规则如下:
这两个schema中的同名字段必须具有相同的数据类型。一致化后的字段必须为Parquet的字段类型。这个规则同时也解决了空值的问题。
一致化后的schema只包含Hive metastore中出现的字段。
忽略只出现在Parquet schema中的字段
只在Hive metastore schema中出现的字段设为nullable字段,并加到一致化后的schema中
元数据刷新(Metadata Refreshing)
Spark SQL缓存了Parquet元数据以达到良好的性能。当Hive metastore Parquet表转换为enabled时,表修改后缓存的元数据并不能刷新。所以,当表被Hive或其它工具修改时,则必须手动刷新元数据,以保证元数据的一致性。示例如下:
sqlContext.refreshTable("my_table")
Configuration
JSON数据集
Spark SQL能自动解析JSON数据集的Schema,读取JSON数据集为DataFrame格式。读取JSON数据集方法为SQLContext.read().json()。该方法将String格式的RDD或JSON文件转换为DataFrame。
需要注意的是,这里的JSON文件不是常规的JSON格式。JSON文件每一行必须包含一个独立的、自满足有效的JSON对象。如果用多行描述一个JSON对象,会导致读取出错。读取JSON数据集示例如下:
// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // A JSON dataset is pointed to by path. // The path can be either a single text file or a directory storing text files. val path = "examples/src/main/resources/people.json" val people = sqlContext.read.json(path) // The inferred schema can be visualized using the printSchema() method. people.printSchema() // root // |-- age: integer (nullable = true) // |-- name: string (nullable = true) // Register this DataFrame as a table. people.registerTempTable("people") // SQL statements can be run by using the sql methods provided by sqlContext. val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") // Alternatively, a DataFrame can be created for a JSON dataset represented by // an RDD[String] storing one JSON object per string. val anotherPeopleRDD = sc.parallelize( """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil) val anotherPeople = sqlContext.read.json(anotherPeopleRDD)
Spark SQL支持对Hive的读写操作。需要注意的是,Hive所依赖的包,没有包含在Spark assembly包中。增加Hive时,需要在Spark的build中添加 -Phive 和 -Phivethriftserver配置。这两个配置将build一个新的assembly包,这个assembly包含了Hive的依赖包。注意,必须上这个心的assembly包到所有的worker节点上。因为worker节点在访问Hive中数据时,会调用Hive的 serialization and deserialization libraries(SerDes),此时将用到Hive的依赖包。
Hive的配置文件为conf/目录下的hive-site.xml文件。在YARN上执行查询命令之前,lib_managed/jars目录下的datanucleus包和conf/目录下的hive-site.xml必须可以被driverhe和所有的executors所访问。确保被访问,最方便的方式就是在spark-submit命令中通过--jars选项和--file选项指定。
操作Hive时,必须创建一个HiveContext对象,HiveContext继承了SQLContext,并增加了对MetaStore和HiveQL的支持。除了sql方法,HiveContext还提供了一个hql方法,hql方法可以执行HiveQL语法的查询语句。
// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // Queries are expressed in HiveQL sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
JDBC To Other Databases
Spark SQL支持使用JDBC访问其他数据库。当时用JDBC访问其它数据库时,最好使用JdbcRDD。使用JdbcRDD时,Spark SQL操作返回的DataFrame会很方便,也会很方便的添加其他数据源数据。JDBC数据源因为不需要用户提供ClassTag,所以很适合使用Java或Python进行操作。
使用JDBC访问数据源,需要在spark classpath添加JDBC driver配置。
val jdbcDF = sqlContext.read.format("jdbc").options( Map("url" -> "jdbc:postgresql:dbserver", "dbtable" -> "schema.tablename")).load()
故障排除(Troubleshooting)
在客户端session和所有的executors上,JDBC driver必须对启动类加载器(primordial class loader)设置为visible。因为当创建一个connection时,Java的DriverManager类会执行安全验证,安全验证将忽略所有对启动类加载器为非visible的driver。一个很方便的解决方法是,修改所有worker节点上的compute_classpath.sh脚本,将driver JARs添加至脚本。
有些数据库(例:H2)将所有的名字转换为大写,所以在这些数据库中,Spark SQL也需要将名字全部大写。
发表评论
-
Sort-based Shuffle的设计与实现
2016-03-15 08:49 812原文 http://www.cnblogs.com/hsea ... -
spark 中GC的调优
2016-03-14 11:02 1352注:本文转自:http://www.csdn.net/arti ... -
spark Tungsten-将硬件性能彻底压榨
2016-03-08 11:06 1033Tungsten项目将是Spark自诞生以来内核级别的最大改动 ... -
关于Spark的Broadcast解析
2016-02-20 08:37 4521本文重点关注 数据块切分方法以及P2P下载数据方法 Broad ... -
spark的几个重要概念
2015-12-04 14:09 0本节主要记录以下几个概念 一:RDD的五大特点 二:RDD 窄 ... -
spark部署安装调试
2015-12-02 11:28 739本节记录spark下载-->编译-->安装--&g ... -
spark基本概念
2015-11-12 10:45 790记录一下课堂笔记: ...
相关推荐
标题“Spark SQL and DataFrames-java - Spark 1.6.2”和描述中的知识点涉及了Apache Spark 1.6.2版本的SQL和DataFrames模块。在这一部分中,我们将详细讨论Spark SQL、DataFrames和Datasets的概念,以及如何在Java...
总结一下,"Intro to DataFrames and Spark SQL"的培训将涵盖如何使用DataFrame API进行数据处理,如何通过Spark SQL进行SQL查询,以及如何优化查询性能。通过学习这个课程,开发者将能够更好地利用Spark处理大规模...
Spark SQL是Apache Spark的一个模块,用于处理结构化数据。它提供了SQL查询语言的接口,可以访问多种数据源。从2014年4月发布的Spark 1.0版本开始,Spark SQL就成为了Spark核心分布包的一部分,并在2015年从Alpha...
result = spark.sql("SELECT species, COUNT(*) AS count FROM iris_table GROUP BY species") result.show() ``` 此外,我们也可以使用 DataFrame API 来完成相同的操作: ```python from pyspark.sql.functions ...
Know how to use Spark SQL and DataFrames using Scala and Python Get an introduction to Spark programming using R Perform Spark data processing, charting, and plotting using Python Get acquainted with ...
Know how to use Spark SQL and DataFrames using Scala and Python Get an introduction to Spark programming using R Perform Spark data processing, charting, and plotting using Python Get acquainted with ...
This book discusses various components of Spark such as Spark Core, DataFrames, Datasets and SQL, Spark Streaming, Spark MLib, and R on Spark with the help of practical code snippets for each topic....
Get a gentle overview of big data and Spark Learn about DataFrames, SQL, and Datasets-Spark's core APIs-through worked examples Dive into Spark's low-level APIs, RDDs, and execution of SQL and ...
从 Spark SQL 1.0~1.2 升级到 1.3 兼容 Apache Hive 参考 数据类型 NaN 语义 Structured Streaming MLlib(机器学习) 机器学习库(MLlib)指南 ML Pipelines(ML管道) Extracting, transforming and ...
Spark is a fast and ...rich set of higher-level tools including Spark SQL for SQL and DataFrames, MLlib for machine learning, GraphX for graph processing, and Spark Streaming for stream processing.
Second, we especially wanted to explore the higher-level “structured” APIs that were finalized in Apache Spark 2.0—namely DataFrames, Datasets, Spark SQL, and Structured Streaming—which older ...
You will also learn about the updates on the APIs and how DataFrames and Datasets affect SQL, machine learning, graph processing, and streaming. You will learn to use Spark as a big data operating ...
《Developer Training for Apache Spark and Hadoop: Hands-On Exercise》是一份深入实践的教程,旨在帮助开发者熟悉Apache Spark和Hadoop两大大数据处理框架。本教程通过一系列的手动操作练习,覆盖了从基础环境...
- 交互式数据分析:Spark SQL和DataFrames为数据分析师提供了查询和分析数据的能力,且具有良好的交互性。 - 机器学习:MLlib提供了多种机器学习算法,可以在大规模数据集上训练模型,进行分类、回归、聚类等任务。 ...
Further on, you will be introduced to working with RDDs, DataFrames and Datasets to operate on schema aware data, and real-time streaming with various sources such as Twitter Stream and Apache Kafka....
spark入门联系wordcount等...rich set of higher-level tools including Spark SQL for SQL and DataFrames, MLlib for machine learning, GraphX for graph processing, and Spark Streaming for stream processing.