前两天开始研究SparkSQL, 其主要分为HiveContext以及SQLContext
目前打算先学习SQLContent, 因为Hive环境还没搭好,
一步一步来 先把spark的原理弄明白后再去研究hadoop的组件。
这篇文章主要是讲如何使用SQLContext去读取csv文件, 然后根据表头注册表, 进行数据分析
要通过SQLContext去操作csv文件, 那么我们需要用到spark-csv_xxx.jar
sbt地址为:
// https://mvnrepository.com/artifact/com.databricks/spark-csv_2.10
libraryDependencies += "com.databricks" % "spark-csv_2.10" % "1.4.0"
maven地址:
<!-- https://mvnrepository.com/artifact/com.databricks/spark-csv_2.10 -->
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-csv_2.10</artifactId>
<version>1.4.0</version>
</dependency>
引入jar包后, 我们创建一个scala object, 在里面创建sparkconfig, sparkcontext, 以及SQLContext
val sparkconfig = new SparkConf().setAppName("SQLTest").setMaster("local[2]")
val sc = new SparkContext(sparkconfig)
val sqlContext = new SQLContext(sc)
在天池里面参加一个项目后拿到了一个一份数据, 这个方法里面主要就是分析这份数据。 数据主要结构为: (2.csv)
re_monitor_id re_index value release_time exceed_flag
主要是一个排污检测数据, 每一个小时检测一次, 有很多个站点, 记录不一样项目的监测值, 如果这个项目超标了, 那么 exceed_flag就变成t否则是f
re_monitor_id是检测点id
re_index 是检测项目
value是检测值
release_time是检测时间
还有一个csv文件 存储所有的检测项目, 我们只用到一个表头 (3.csv)
index
检测项目
首先第一步, 把两个文件中有用的表头注册一下, 注册到临时表:
val companyEmission = sqlContext.csvFile("C:/cache/2.csv").select("re_monitor_id", "re_index", "re_monitor_info_id", "value", "release_time", "exceed_flag").toDF
val typeEmission = sqlContext.csvFile("C:/cache/3.csv").select("monitor_id", "index").toDF
companyEmission.registerTempTable("cp");
typeEmission.registerTempTable("type")
然后把检测类型取出来作为一个数组 (由于项目不多, 不用担心OOM)
var monitorType = sqlContext.sql("select distinct(index) from type").toDF
val typeArray = monitorType.collect
其次再将检测数据拿过来, 存成(re_index, re_monitor_id_release_time) 的(K,V)格式 --- 只取exceed_flag为t的row
val overData = sqlContext.sql("select re_monitor_id, re_index, release_time from cp where exceed_flag =" + '"' + "t" + '"').map(f => (f.get(1).toString, f.get(0).toString + "_" + f.get(2)))
遍历检测类型, 目的是获取每个检测类型根据每个小时来看, 一个月内在同一小时内超标最多的检测点
typeArray.foreach(x => {
var typeD = x.get(0).toString
val sepData = overData.filter(y => y._1.contains(typeD)).map(z =>
{
// println(z._2.split(" ").apply(1))
val date = sp.parse(z._2.split("_").apply(1).toString().substring(0, 19))
val hour = date.getHours()
(hour, z._2.split("_").apply(0).toString())
}).groupByKey
var sortedData = sepData.sortBy(_._2.size, false)
if (sortedData.count > 0) {
println("****************************************************")
println("result for " + typeD + "is")
println("Hour: " + sortedData.first._1 + "\r\n" + "Times: " + sortedData.first._2.size)
println("****************************************************")
}
})
这里只取出了各种类型的污染, 哪个小时超排最多。 其实我们可以根据数据, 获取到哪些点对哪些污染数据超标最多, 然后还可以知道是不是每次在晚上特定时间断排放的特别多。 这样分析出来的数据 就比较有用了. 数据请看附件
分享到:
相关推荐
SparkSQL的DataFrame提供了丰富的数据处理能力,结合CSV文件的加载和处理,使得数据分析变得更加简单和高效。通过对`BeijingPM20100101_20151231_noheader.csv`文件的处理,我们可以深入理解DataFrame的创建和操作,...
- 从结构化的文件(如CSV、JSON等)中读取数据创建。 #### 四、总结 SparkSQL通过其强大的功能和灵活性,成为了处理结构化数据的重要工具。通过DataFrame和DataSet这样的高级抽象,开发者可以更简单高效地进行...
- **统一的数据访问方式**:可以使用相同的API处理不同来源的数据,如Hive表、JSON、CSV等。 #### 二、DataFrames详解 **2.1 DataFrame的概念** DataFrame是一种分布式的、带有模式的不可变表,它是RDD的一种更...
Spark SQL是Apache Spark框架的一部分,专门用于处理结构化和半结构化数据的查询和分析。它结合了SQL查询和DataFrame API,使得数据处理更加高效且易于理解。在本压缩包中,你将学习到如何使用Spark SQL进行基本操作...
1. CSV数据读取与写入:演示如何从CSV文件读取数据,并将结果写回到新的CSV文件。 2. SQL查询:展示如何使用SQL查询DataFrame,并返回结果。 3. 转换操作:包括选择特定列、过滤数据、分组聚合等操作。 4. Join操作...
例如,读取HDFS上的JSON或CSV文件到DataFrame只需调用`read.json()`或`read.csv()`方法,并传递文件路径。 从RDD转换为DataFrame有两种常见方式:手动转换和样例类转换。手动转换通常涉及创建一个Schema并使用`toDF...
1. 从结构化数据文件(如 CSV、JSON、Parquet 等)加载。 2. 从 Hive 中的表直接读取。 3. 从外部数据库(如 MySQL、PostgreSQL 等)导入。 4. 将已有的 RDD 转换成 DataFrame。 RDD 转换成 DataFrame 是通过 Spark...
Spark SQL是Apache Spark的一部分,它提供了一种交互式的方式来处理数据,使得数据分析人员可以像使用SQL一样操作分布式数据。在Spark SQL中,DataFrame是主要的数据结构,它支持各种数据源的读取和写入操作。本篇...
- `.read`用于读取各种数据源,如CSV、JSON、Parquet等,转化为DataFrame。 Spark SQL 的这些特性使其成为大数据处理和分析的强大工具,无论是在数据工程、数据科学还是机器学习项目中,都能发挥重要作用。它简化...