`
m635674608
  • 浏览: 5032391 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

Spark DataFrame小试牛刀

 
阅读更多

三月中旬,Spark发布了最新的1.3.0版本,其中最重要的变化,便是DataFrame这个API的推出。DataFrame让Spark具备了处理大规模结构化数据的能力,在比原有的RDD转化方式易用的前提下,计算性能更还快了两倍。这一个小小的API,隐含着Spark希望大一统「大数据江湖」的野心和决心。DataFrame像是一条联结所有主流数据源并自动转化为可并行处理格式的水渠,通过它Spark能取悦大数据生态链上的所有玩家,无论是善用R的数据科学家,惯用SQL的商业分析师,还是在意效率和实时性的统计工程师。

以一个常见的场景 -- 日志解析为例,有时我们需要用到一些额外的结构化数据(比如做IP和地址的映射),通常这样的数据会存在MySQL,而访问的方式有两种:一是每个worker远程去检索数据库,弊端是耗费额外的网络I/O资源;二是使用JdbcRDD的API转化为RDD格式,然后编写繁复的函数去实现检索,显然要写更多的代码。而现在,Spark提供了一种新的选择,一行代码就能实现从MySQL到DataFrame的转化,并且支持SQL查询。

实例

首先我们在本地放置了一个JSON文件,文件内容如下:

 {"name":"Michael"}
 {"name":"Andy", "age":30}
 {"name":"Justin", "age":19}

然后我们进入spark-shell,控制台的提示说明Spark为我们创建了一个叫sqlContext的上下文,注意,它是DataFrame的起点。
接下来我们希望把本地的JSON文件转化为DataFrame

scala> val df = sqlContext.jsonFile("/path/to/your/jsonfile")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

从控制台的提示可以得知,我们成功创建了一个DataFrame的对象,包含agename两个字段。
DataFrame自带的玩法就多了:

// 输出表结构
df.printSchema()

// 选择所有年龄大于21岁的人,只保留name字段
df.filter(df("age") > 21).select("name").show()

// 选择name,并把age字段自增
df.select("name", df("age") + 1).show()

// 按年龄分组计数
df.groupBy("age").count().show()

// 左联表(注意是3个等号!)
df.join(df2, df("name") === df2("name"), "left").show()

此外,我们也可以把DataFrame对象转化为一个虚拟的表,然后用SQL语句查询,比如下面的命令就等同于df.groupBy("age").count().show()

df.registerTempTable("people")
sqlContext.sql("select age, count(*) from people group by age").show()

当然,Python有同样丰富的API(由于最终都是转化为JVM bytecode执行,Python和Scala的效率是一样的),而且Python还提供了类Pandas的操作语法。关于Python的API,可以参考Spark新年福音:一个用于大规模数据科学的API——DataFrame

MySQL

除了JSON之外,DataFrame现在已经能支持MySQL、Hive、HDFS、PostgreSQL等外部数据源,而对关系数据库的读取,是通过jdbc实现的。

对于不同的关系数据库,必须在SPARK_CLASSPATH变量中加入对应connector的jar包,比如希望连接MySQL的话应该这么启动spark-shell

SPARK_CLASSPATH=mysql-connector-java-x.x.x-bin.jar spark-shell

下面要将一个MySQL表转化为DataFrame对象:

val jdbcDF = sqlContext.load("jdbc", Map("url" -> "jdbc:mysql://localhost:3306/your_database?user=your_user&password=your_password", "dbtable" -> "your_table"))

然后十八般武艺又可以派上用场了。

Hive

Spark提供了一个HiveContext的上下文,其实是SQLContext的一个子类,但从作用上来说,sqlContext也支持Hive数据源。只要在部署Spark的时候加入Hive选项,并把已有的hive-site.xml文件挪到$SPARK_HOME/conf路径下,我们就可以直接用Spark查询包含已有元数据的Hive表了:

sqlContext.sql("select count(*) from hive_people").show()

结语

Spark的目标在于成为一个跨环境、跨语言、跨工具的大数据处理和分析平台。DataFrame的推出很好诠释了这一目标,从初步的使用来看确实很容易上手。随着性能和稳定性的持续优化,我相信某一天所有玩数据的人,都可以使用Spark作为惟一的平台入口。

 

 

https://segmentfault.com/a/1190000002614456

分享到:
评论

相关推荐

    spark dataframe 将一列展开,把该列所有值都变成新列的方法

    ### Spark DataFrame将一列展开,把该列所有值都变成新列的方法 在处理大数据时,Apache Spark 是一个非常强大的工具。特别是在数据处理与分析领域,Spark 的 DataFrame API 提供了丰富的功能来帮助用户高效地操作...

    Spark DataFrame详解.zip

    Spark DataFrame是Apache Spark中的核心数据结构,它是基于RDD(弹性分布式数据集)的进一步抽象,提供了更加高级的数据处理能力。DataFrame在Spark SQL模块下,它结合了SQL查询的便利性和RDD的灵活性,使得数据处理...

    roohom#Code-Cookbook#[Spark]将Spark DataFrame中的数值取出1

    [Spark]将Spark DataFrame中的数值取出有时候经过Spark SQL计算得到的结果往往就一行,而且需要将该结果取出,作为字符串参与别的代码块的

    SparkSQ操作DataFrame,合并DataFrame

    例子中定义了多个List数据集合,包括用户信息,订单信息,用户订单信息,将List对象生成DataFrame,使用SparkSQL查询将多个DataFrame合成一个DataFrame,使用Scala语言编写。

    Spark DataFrame

    Apache Spark DataFrame是大数据处理领域的一项重要技术,它在分布式数据集(RDD)的基础上,提供了一个更加高效和易于使用的数据处理模型。Spark DataFrame不仅继承了Spark的强大计算能力,还融入了关系型处理的...

    pandas和spark dataframe互相转换实例详解

    在大数据处理领域,`pandas` 和 `Spark DataFrame` 是两个重要的工具。`pandas` 是 Python 中用于数据处理和分析的库,而 `Spark DataFrame` 是 Apache Spark 的核心组件,提供了一种分布式数据处理能力。本文将详细...

    Spark dataframe使用详解

    Spark DataFrame 使用详解 Spark DataFrame 是一种基于 RDD 的分布式数据集,它提供了详细的结构信息,能够清楚地知道该数据集中包含哪些列、每列的名称和类型。相比于 RDD,DataFrame 的优点在于能够直接获得数据...

    Spark DataFrame 演示Demo

    简单一个示例,演示Spark中DataFrame的创建与操作

    Spark学习笔记(三):Spark DataFrame

    Spark DataFrame 是Spark SQL的核心组件,它是Spark处理结构化数据的主要数据结构。DataFrame相较于RDD(弹性分布式数据集)具有显著的优势,因为它提供了更强的类型安全和更丰富的优化。DataFrame的引入使得Spark更...

    datacompy:熊猫与Spark DataFrame的人类比较

    然后扩展以将该功能传递给Spark Dataframes。 快速安装 pip install datacompy 熊猫细节 DataComPy将尝试在连接列列表或索引上连接两个数据框。 如果两个数据框具有基于联接值的重复项,则匹配过程将按其余字段排序...

    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql

    而DataFrame和Dataset是Spark SQL中更高级的数据抽象,提供了更多的优化和易于使用的特点。Dataframe可以看作是具有模式信息的分布式数据集。Dataset是类型安全的DataFrame。 在本文中,将详细介绍Spark的RDD API,...

    spark连接rabbitmq java代码 消费者consumer 插入mysql

    spark连接rabbitmq java代码 消费者consumer。写入mysql

    spark-scala-examples:该项目以Scala语言提供了Apache Spark SQL,RDD,DataFrame和Dataset示例

    目录(Scala中的Spark示例)Spark RDD示例火花蓄能器介绍将Spark RDD转换为DataFrame | 数据集 Spark SQL教程Spark创建带有示例的DataFrame Spark DataFrame withColumn 重命名Spark DataFrame上的列的方法Spark –...

    2015 Spark技术峰会-Spark SQL结构化数据分析-连城

    Databrciks工程师,Spark Committer,Spark SQL...Spark DataFrame vs.RDD,有些类似于动态语言和静态语言的区别,在很多场景下,DataFrame优势比较明显。1.3版中,Spark进一步完善了外部数据源API,并可智能进行优化。

    Spark.sql数据库部分的内容

    Spark SQL是Spark的一个重要组件,专门用于处理结构化数据,它结合了SQL查询和DataFrame API,使得开发人员可以方便地进行数据查询和分析。在这个主题中,我们将深入探讨Spark SQL的核心概念、功能以及使用方法。 ...

    JAVA spark创建DataFrame的方法

    在Spark大数据处理框架中,DataFrame是一种高效且灵活的数据抽象,它提供了一种表格形式的数据集表示,支持SQL查询和其他高级数据分析。在Java中操作Spark DataFrame,我们需要了解几个关键概念和步骤,这通常包括...

    sparkOptics:Spark DataFrame的光学元件

    litval df : DataFrame = ???import org . hablapps . sparkOptics . _df.select( Lens ( " field.subfield " )(df.schema).set(lit( 13 )) : _ * ) 要立即尝试,请单击活页夹图标,为交互式笔记本午餐。正在安装...

    spark-corenlp:用于Core Nlp SimpleApi注释器的Spark DataFrame包装器方法

    用于CoreNlp SimpleApi注释器的Spark DataFrame包装器方法。 这些方法已通过Spark 2.3.1和Standford版本3.9.1进行了测试。 要导入方法,请import static com.ziad.spark.nlp.functions.* 。 tokenize :使用适合所...

Global site tag (gtag.js) - Google Analytics