三月中旬,Spark发布了最新的1.3.0版本,其中最重要的变化,便是DataFrame
这个API的推出。DataFrame
让Spark具备了处理大规模结构化数据的能力,在比原有的RDD转化方式易用的前提下,计算性能更还快了两倍。这一个小小的API,隐含着Spark希望大一统「大数据江湖」的野心和决心。DataFrame
像是一条联结所有主流数据源并自动转化为可并行处理格式的水渠,通过它Spark能取悦大数据生态链上的所有玩家,无论是善用R的数据科学家,惯用SQL的商业分析师,还是在意效率和实时性的统计工程师。
以一个常见的场景 -- 日志解析为例,有时我们需要用到一些额外的结构化数据(比如做IP和地址的映射),通常这样的数据会存在MySQL,而访问的方式有两种:一是每个worker远程去检索数据库,弊端是耗费额外的网络I/O资源;二是使用JdbcRDD
的API转化为RDD格式,然后编写繁复的函数去实现检索,显然要写更多的代码。而现在,Spark提供了一种新的选择,一行代码就能实现从MySQL到DataFrame
的转化,并且支持SQL查询。
实例
首先我们在本地放置了一个JSON文件,文件内容如下:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
然后我们进入spark-shell
,控制台的提示说明Spark为我们创建了一个叫sqlContext
的上下文,注意,它是DataFrame
的起点。
接下来我们希望把本地的JSON文件转化为DataFrame
:
scala> val df = sqlContext.jsonFile("/path/to/your/jsonfile")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
从控制台的提示可以得知,我们成功创建了一个DataFrame
的对象,包含age
和name
两个字段。
而DataFrame
自带的玩法就多了:
// 输出表结构
df.printSchema()
// 选择所有年龄大于21岁的人,只保留name字段
df.filter(df("age") > 21).select("name").show()
// 选择name,并把age字段自增
df.select("name", df("age") + 1).show()
// 按年龄分组计数
df.groupBy("age").count().show()
// 左联表(注意是3个等号!)
df.join(df2, df("name") === df2("name"), "left").show()
此外,我们也可以把DataFrame
对象转化为一个虚拟的表,然后用SQL语句查询,比如下面的命令就等同于df.groupBy("age").count().show()
:
df.registerTempTable("people")
sqlContext.sql("select age, count(*) from people group by age").show()
当然,Python有同样丰富的API(由于最终都是转化为JVM bytecode
执行,Python和Scala的效率是一样的),而且Python还提供了类Pandas
的操作语法。关于Python的API,可以参考Spark新年福音:一个用于大规模数据科学的API——DataFrame。
MySQL
除了JSON之外,DataFrame
现在已经能支持MySQL、Hive、HDFS、PostgreSQL等外部数据源,而对关系数据库的读取,是通过jdbc
实现的。
对于不同的关系数据库,必须在SPARK_CLASSPATH
变量中加入对应connector的jar包,比如希望连接MySQL
的话应该这么启动spark-shell
:
SPARK_CLASSPATH=mysql-connector-java-x.x.x-bin.jar spark-shell
下面要将一个MySQL表转化为DataFrame
对象:
val jdbcDF = sqlContext.load("jdbc", Map("url" -> "jdbc:mysql://localhost:3306/your_database?user=your_user&password=your_password", "dbtable" -> "your_table"))
然后十八般武艺又可以派上用场了。
Hive
Spark提供了一个HiveContext
的上下文,其实是SQLContext
的一个子类,但从作用上来说,sqlContext
也支持Hive数据源。只要在部署Spark的时候加入Hive选项,并把已有的hive-site.xml
文件挪到$SPARK_HOME/conf
路径下,我们就可以直接用Spark查询包含已有元数据的Hive表了:
sqlContext.sql("select count(*) from hive_people").show()
结语
Spark的目标在于成为一个跨环境、跨语言、跨工具的大数据处理和分析平台。DataFrame
的推出很好诠释了这一目标,从初步的使用来看确实很容易上手。随着性能和稳定性的持续优化,我相信某一天所有玩数据的人,都可以使用Spark作为惟一的平台入口。
https://segmentfault.com/a/1190000002614456
相关推荐
### Spark DataFrame将一列展开,把该列所有值都变成新列的方法 在处理大数据时,Apache Spark 是一个非常强大的工具。特别是在数据处理与分析领域,Spark 的 DataFrame API 提供了丰富的功能来帮助用户高效地操作...
Spark DataFrame是Apache Spark中的核心数据结构,它是基于RDD(弹性分布式数据集)的进一步抽象,提供了更加高级的数据处理能力。DataFrame在Spark SQL模块下,它结合了SQL查询的便利性和RDD的灵活性,使得数据处理...
[Spark]将Spark DataFrame中的数值取出有时候经过Spark SQL计算得到的结果往往就一行,而且需要将该结果取出,作为字符串参与别的代码块的
例子中定义了多个List数据集合,包括用户信息,订单信息,用户订单信息,将List对象生成DataFrame,使用SparkSQL查询将多个DataFrame合成一个DataFrame,使用Scala语言编写。
Apache Spark DataFrame是大数据处理领域的一项重要技术,它在分布式数据集(RDD)的基础上,提供了一个更加高效和易于使用的数据处理模型。Spark DataFrame不仅继承了Spark的强大计算能力,还融入了关系型处理的...
在大数据处理领域,`pandas` 和 `Spark DataFrame` 是两个重要的工具。`pandas` 是 Python 中用于数据处理和分析的库,而 `Spark DataFrame` 是 Apache Spark 的核心组件,提供了一种分布式数据处理能力。本文将详细...
Spark DataFrame 使用详解 Spark DataFrame 是一种基于 RDD 的分布式数据集,它提供了详细的结构信息,能够清楚地知道该数据集中包含哪些列、每列的名称和类型。相比于 RDD,DataFrame 的优点在于能够直接获得数据...
简单一个示例,演示Spark中DataFrame的创建与操作
Spark DataFrame 是Spark SQL的核心组件,它是Spark处理结构化数据的主要数据结构。DataFrame相较于RDD(弹性分布式数据集)具有显著的优势,因为它提供了更强的类型安全和更丰富的优化。DataFrame的引入使得Spark更...
然后扩展以将该功能传递给Spark Dataframes。 快速安装 pip install datacompy 熊猫细节 DataComPy将尝试在连接列列表或索引上连接两个数据框。 如果两个数据框具有基于联接值的重复项,则匹配过程将按其余字段排序...
而DataFrame和Dataset是Spark SQL中更高级的数据抽象,提供了更多的优化和易于使用的特点。Dataframe可以看作是具有模式信息的分布式数据集。Dataset是类型安全的DataFrame。 在本文中,将详细介绍Spark的RDD API,...
spark连接rabbitmq java代码 消费者consumer。写入mysql
目录(Scala中的Spark示例)Spark RDD示例火花蓄能器介绍将Spark RDD转换为DataFrame | 数据集 Spark SQL教程Spark创建带有示例的DataFrame Spark DataFrame withColumn 重命名Spark DataFrame上的列的方法Spark –...
Databrciks工程师,Spark Committer,Spark SQL...Spark DataFrame vs.RDD,有些类似于动态语言和静态语言的区别,在很多场景下,DataFrame优势比较明显。1.3版中,Spark进一步完善了外部数据源API,并可智能进行优化。
Spark SQL是Spark的一个重要组件,专门用于处理结构化数据,它结合了SQL查询和DataFrame API,使得开发人员可以方便地进行数据查询和分析。在这个主题中,我们将深入探讨Spark SQL的核心概念、功能以及使用方法。 ...
在Spark大数据处理框架中,DataFrame是一种高效且灵活的数据抽象,它提供了一种表格形式的数据集表示,支持SQL查询和其他高级数据分析。在Java中操作Spark DataFrame,我们需要了解几个关键概念和步骤,这通常包括...
litval df : DataFrame = ???import org . hablapps . sparkOptics . _df.select( Lens ( " field.subfield " )(df.schema).set(lit( 13 )) : _ * ) 要立即尝试,请单击活页夹图标,为交互式笔记本午餐。正在安装...
用于CoreNlp SimpleApi注释器的Spark DataFrame包装器方法。 这些方法已通过Spark 2.3.1和Standford版本3.9.1进行了测试。 要导入方法,请import static com.ziad.spark.nlp.functions.* 。 tokenize :使用适合所...