在2014年7月1日的Spark Summit上,Databricks宣布终止对Shark的开发,将重点放到Spark SQL上。Databricks表示,Spark SQL将涵盖Shark的所有特性,用户可以从Shark 0.9进行无缝的升级。现在Databricks推广的Shark相关项目一共有两个,分别是Spark SQL和新的Hive on Spark(HIVE-7292)。如下图所示:

Spark SQL运行以SQL的方式来操作数据,类似Hive和Pig。其核心组件为一种新类型的RDD——JavaSchemaRDD,一个JavaSchemaRDD就好比传统关系型数据库中的一张表。JavaSchemaRDD可以从已有的RDD创建,还可以从Parquet文件、JSON数据集、HIVE、普通数据文件中创建。但现阶段(1.0.2版本)的Spark SQL还是alpha版,日后的API难免会发生变化,所以是否要使用该功能,现阶段还值得商榷。
程序示例
Bean,必须要有get方法,底层采用反射来获取各属性。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
publicstaticclassPersonimplementsSerializable{
privateStringname;
privateintage;
publicStringgetName(){
returnname;
}
publicintgetAge(){
returnage;
}
publicPerson(Stringname,intage){
this.name=name;
this.age=age;
}
}
|
Spark SQL示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
publicstaticvoidmain(String[]args){
SparkConf sparkConf=newSparkConf()
.setAppName("JavaSparkSQL")
.setMaster("local[2]");
JavaSparkContext ctx=newJavaSparkContext(sparkConf);
JavaSQLContext sqlCtx=newJavaSQLContext(ctx);
JavaRDD<Person>people=ctx.textFile("/home/yurnom/people.txt")//文档内容见下文
.map(line->{
String[]parts=line.split(",");
returnnewPerson(parts[0],Integer.parseInt(parts[1].trim()));//创建一个bean
});
JavaSchemaRDD schemaPeople=sqlCtx.applySchema(people,Person.class);
schemaPeople.registerAsTable("people");//注册为一张table
JavaSchemaRDD teenagers=sqlCtx.sql(//执行sql语句,属性名同bean的属性名
"SELECT name FROM people WHERE age >= 13 AND age <= 19");
List<String>teenagerNames=teenagers
.map(row->"Name: "+row.getString(0)).collect();
for(Strings:teenagerNames){
System.out.println(s);
}
}
|
运行结果
people.txt文件内容
|
Michael,29
Andy,30
Justin,19
|
使用Parquet Files
Parquet文件允许将schema信息和数据信息固化在磁盘上,以供下一次的读取。
|
//存为Parquet文件
schemaPeople.saveAsParquetFile("people.parquet");
//从Parquet文件中创建JavaSchemaRDD
JavaSchemaRDD parquetFile=sqlCtx.parquetFile("people.parquet");
//注册为一张table
parquetFile.registerAsTable("parquetFile");
JavaSchemaRDD teenagers2=sqlCtx.sql("SELECT * FROM parquetFile WHERE age >= 25");
for(Rowr:teenagers2.collect()){
System.out.println(r.get(0));
System.out.println(r.get(1));
}
|
运行结果
可以看到输出属性的顺序和Bean中的不一样,此处猜测可能采用的字典序,但未经过测试证实。
JSON数据集
Spark SQL还可以采用JSON格式的文件作为输入源。people.json文件内容如下:
|
{"name":"Michael","age":29}
{"name":"Andy","age":30}
{"name":"Justin","age":19}
|
将上方程序示例中代码行8-14行替换为下方代码即可:
|
JavaSchemaRDD schemaPeople=sqlCtx.jsonFile("/home/yurnom/people.json");
|
运行结果与上文相同。此外还可以用如下方式加载JSON数据:
|
List<String>jsonData=Arrays.asList(
"{\"name\":\"Yurnom\",\"age\":26}");
JavaRDD<String>anotherPeopleRDD=sc.parallelize(jsonData);
JavaSchemaRDD anotherPeople=sqlContext.jsonRDD(anotherPeopleRDD);
|
连接Hive
Spark SQL运行使用SQL语句来读写Hive的数据,但由于Hive的依赖包过多,默认情况下要连接Hive需要自行添加相关的依赖包。可以使用以下命令来生成一个含有Hive依赖的Jar,而此Jar必须分发到Spark集群中的每一台机器上去。
|
SPARK_HIVE=truesbt/sbt assembly/assembly
|
最后将Hive的配置文件拷贝至conf文件夹下即可。官方Hive使用示例:
|
// sc is an existing JavaSparkContext.
JavaHiveContext hiveContext=neworg.apache.spark.sql.hive.api.java.HiveContext(sc);
hiveContext.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
hiveContext.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");
// Queries are expressed in HiveQL.
Row[]results=hiveContext.hql("FROM src SELECT key, value").collect();
|
总结
Spark SQL将原本就已经封装的很好的Spark原语的使用再简化了一次,使得懂SQL语句的运维人员都可以通过Spark SQL来进行大数据分析。目前来说Spark SQL还处于alpha版本,对于开发人员的意义不大,静观后续的变化。
http://blog.selfup.cn/657.html
分享到:
相关推荐
小结 利用Spark SQL对传统数据源进行分析可以提高我们的工作效率和数据处理能力。通过上面的步骤,我们可以将传统数据源中的数据导入到Spark SQL中,并对其进行处理、分析和展示。Spark SQL的强大功能使得我们可以...
Spark的一些笔记,包含Spark SQL的使用和一些函数的使用
**8.3 小结** - 对综合应用进行了总结。 #### 九、SparkSQL的调优 这一节提供了关于如何优化SparkSQL性能的一些建议。 **9.1 并行性** - 讨论了如何调整并行度以提高查询性能。 **9.2 高效的数据格式** - ...
4. **小结**: - 通过上述步骤,我们学会了如何使用Spark Shell加载一个简单的文本文件。这对于后续的数据处理任务至关重要,比如清洗数据、转换数据格式以及执行复杂的分析操作等。 - 此外,通过交互式地使用Spark...
实验还可能涉及性能调优,如减少shuffle操作、利用Spark SQL的Catalyst优化器、调整内存管理策略等,以提升整体处理速度。 ### 结果分析与结论 实验结果分析包括对比不同操作的时间复杂度、资源消耗,以及优化措施...
#### 实验小结 通过本次实验,不仅掌握了如何使用Spark进行数据处理和分析,还学会了如何利用K-means算法进行聚类分析,并且通过地图可视化的方式呈现结果。这对于理解城市交通状况、出租车分布规律等方面具有重要...
Java 和 Scala 实现 Spark RDD 转换成 DataFrame 的两种方法小结 在本文中,我们将讨论如何使用 Java 和 Scala 将 Spark RDD 转换成 DataFrame,並且介绍两种实现方法。 准备数据源 在项目下新建一个 student.txt...
### 小结 通过以上内容的学习,您将能够熟练地使用 Apache Spark 进行大数据处理。无论是数据探索、SQL 查询、流处理还是机器学习,Spark 都能提供强大的支持。同时,加入 Spark 的开发者社区,可以进一步提升技能...
#### 五、本章小结 通过本章节的学习,我们可以了解到Spark MLlib提供了强大的机器学习工具,特别是`spark.ml`库,它不仅提供了丰富的API来支持机器学习的各种需求,还通过引入ML Pipeline的概念极大地简化了机器...
Hive并不存储数据,而是依赖于HDFS进行数据存储,并利用MapReduce、Tez或Spark作为计算引擎执行SQL语句转化的分布式计算任务。它是一个读多写少的系统,主要用于静态数据分析,不支持频繁的数据修改和删除。 1.1 ...
Parcel是CDH的一种分发机制,用于在集群中安装和管理软件组件,如Hadoop、Spark和Phoenix等。Phoenix是一个基于HBase的SQL查询接口,它允许用户通过SQL语句与NoSQL数据库进行交互,简化了对大数据存储的访问。 描述...
### 小结 通过本篇文章,我们了解到R语言在数据库交互和统计绘图两方面都拥有强大的功能。对于数据库交互而言,RODBC提供了一种简单有效的方法来读取和写入数据;而对于统计绘图,R提供了丰富的绘图函数和参数,...
### 大数据技术之Hive全方位解析...**小结**:通过以上的介绍,我们不仅了解了Hive的基本概念、架构和优缺点,还深入探讨了如何安装和配置Hive。对于想要入门大数据分析领域的开发者来说,掌握Hive是非常重要的第一步。
- Hive是基于Hadoop的数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供SQL查询功能。 - 实验中选择了Hive 2.1.1版本进行安装,软件包为`apache-hive-2.1.1-bin.tar.gz`。 - Hive需要依赖JDK和...
#### 小结 通过上述步骤,您可以成功地在Centos 7环境中部署和配置HUE,以便高效地管理和分析Hadoop集群中的数据。HUE不仅简化了Hadoop集群的使用,还提供了丰富的图形界面,让非技术背景的用户也能轻松地进行数据...