`
zhao_rock
  • 浏览: 191860 次
  • 性别: Icon_minigender_1
  • 来自: 大连
社区版块
存档分类
最新评论

Spark将RDD转换成DataFrame的两种方式

阅读更多
介绍一下Spark将RDD转换成DataFrame的两种方式。
1.通过是使用case class的方式,不过在scala 2.10中最大支持22个字段的case class,这点需要注意
2.是通过spark内部的StructType方式,将普通的RDD转换成DataFrame
装换成DataFrame后,就可以使用SparkSQL来进行数据筛选过滤等操作

下面直接代码说话

package spark_rdd

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

object SparkRDDtoDF {
  
  //StructType and convert RDD to DataFrame
  def rddToDF(sparkSession : SparkSession):DataFrame = {
    //设置schema结构
    val schema = StructType(
      Seq(
        StructField("name",StringType,true)          
        ,StructField("age",IntegerType,true)
      )
    )
    val rowRDD = sparkSession.sparkContext
      .textFile("file:/E:/scala_workspace/z_spark_study/people.txt",2)
      .map( x => x.split(",")).map( x => Row(x(0),x(1).trim().toInt))  
    sparkSession.createDataFrame(rowRDD,schema)
  }
  
  //use case class Person
  case class Person(name:String,age:Int)
  def rddToDFCase(sparkSession : SparkSession):DataFrame = {
    //导入隐饰操作,否则RDD无法调用toDF方法
    import sparkSession.implicits._
    val peopleRDD = sparkSession.sparkContext
      .textFile("file:/E:/scala_workspace/z_spark_study/people.txt",2)
      .map( x => x.split(",")).map( x => Person(x(0),x(1).trim().toInt)).toDF()
    peopleRDD
  }
  
  def main(agrs : Array[String]):Unit = {
      val conf = new SparkConf().setMaster("local[2]")
      conf.set("spark.sql.warehouse.dir","file:/E:/scala_workspace/z_spark_study/")
      conf.set("spark.sql.shuffle.partitions","20")
      val sparkSession = SparkSession.builder().appName("RDD to DataFrame")
            .config(conf).getOrCreate()
       //通过代码的方式,设置Spark log4j的级别
      sparkSession.sparkContext.setLogLevel("WARN")
      import sparkSession.implicits._
      //use case class convert RDD to DataFrame
      //val peopleDF = rddToDFCase(sparkSession)
      
      //use StructType  convert RDD to DataFrame
      val peopleDF = rddToDF(sparkSession)
      peopleDF.show()
      peopleDF.select($"name",$"age").filter($"age">20).show()
      
  }
  
}
0
0
分享到:
评论

相关推荐

    Java和scala实现 Spark RDD转换成DataFrame的两种方法小结

    在本文中,我们将讨论如何使用 Java 和 Scala 将 Spark RDD 转换成 DataFrame,並且介绍两种实现方法。 准备数据源 在项目下新建一个 student.txt 文件,内容如下: 1,zhangsan,20 2,lisi,21 3,wanger,19 4,...

    spark: RDD与DataFrame之间的相互转换方法

    在大数据处理框架Spark中,RDD(弹性分布式数据集)和DataFrame是两种常用的抽象数据类型,它们分别对应于不同的操作接口和优化技术。RDD是最基础的分布式数据集合,提供了一种容错的并行处理数据的方式。而...

    Spark DataFrame详解.zip

    使用`sqlContext.createDataFrame()`方法可以将RDD转换为DataFrame,而`DataFrame.select()`和`DataFrame.filter()`等方法则允许我们对已有DataFrame进行操作,构造新的DataFrame。 3. Action操作: Action是触发...

    spark rdd转dataframe 写入mysql的实例讲解

    RDD到DataFrame的转换主要有两种方式: 1. **隐式转换**:这是最常用的方式,通过导入`SQLContext`的隐式转换,可以直接将RDD转换为DataFrame。在这个例子中,首先定义了一个case类`memberbase`来表示数据结构,...

    RDD、DataFrame和DataSet三者之间的关系

    - DataFrame可以从RDD转换而来,如 `rdd.toDF()`,并且可以转换回RDD,如 `df.rdd`。 - Dataset可以从DataFrame转换得到,如 `df.toDS()`,或者通过编码器以特定类型创建,如 `df.as[CaseClass]`,同时,Dataset也...

    pandas和spark dataframe互相转换实例详解

    本文将详细介绍如何在 `pandas` 和 `Spark DataFrame` 之间进行数据转换,以便在不同场景下灵活使用这两种工具。 首先,我们来创建一个 `Spark DataFrame` 从现有的 `pandas DataFrame`。`SparkSession` 是 Spark 2...

    【SparkSql篇01】SparkSql之DataFrame和DataSet1

    从RDD转换为DataFrame有两种常见方式:手动转换和样例类转换。手动转换通常涉及创建一个Schema并使用`toDF()`方法。样例类转换更为推荐,因为它提供了类型安全和编译时检查。首先,定义一个样例类,然后使用...

    Spark编程基础:Spark SQL单元测验与答案.docx

    * RDD 转换为 DataFrame 包含两种典型方法:利用反射机制推断 RDD 模式和使用编程方式定义 RDD 模式。 * 使用编程方式定义 RDD 模式时,主要包括三个步骤:制作“表头”、制作“表中的记录”和把“表头”和“表中的...

    Spark学习笔记(三):Spark DataFrame

    创建DataFrame有两种主要方式:一是通过反射机制推断RDD的模式,二是通过编程方式定义RDD模式。 1. 反射机制推断模式: 当RDD的数据结构已知时,可以使用这种方法。首先,定义一个Row类,然后将RDD转换为Row类型的...

    SparkSQL入门级教程

    将 RDD 转换为 DataFrame,主要有以下两种方式: 1. **使用 `toDF` 方法**:如果你的 RDD 是由元组或者case class 构成的,可以使用 `toDF` 方法直接转换。例如,如果你有一个由元组组成的 RDD,可以通过导入 `sqc....

    mySparkSqlDoc

    Spark SQL可以构建在RDD之上,通过DataFrame API将RDD转换为DataFrame,然后利用SQL进行复杂查询。这种方式既能利用RDD的灵活性,又能享受SQL的便利。同时,DataFrame API也提供了toRdd()方法,将DataFrame转换回RDD...

    Spark大数据技术与应用教学大纲.pdf

    - PySpark的安装与配置:通过pip或离线包两种方式安装PySpark。 - PyCharm集成:设置PyCharm以支持Spark应用开发,包括基本配置和应用编写。 4. Spark RDD: - RDD基础知识:定义、特点和创建方法。 - ...

    Spark 2.0入门+项目实战

    RDD支持两种操作类型:转换(Transformation)和动作(Action)。转换操作是对数据集进行某种操作并返回一个新的数据集,而动作则会触发实际的计算过程并将结果返回到驱动程序中。 #### 2.2 DataFrame API DataFrame ...

    spark_code_basic

    RDD提供了两种操作类型:转换(Transformation)和动作(Action)。转换创建新的RDD,而动作则触发计算并可能返回结果到驱动程序。 2. **Spark架构** Spark采用了主-从架构,由Driver Program、SparkContext、...

    spark技术讲解

    2. **DAG执行计划**:Spark将一系列RDD转换和动作组合成一个有向无环图(DAG),这个图代表了任务的执行顺序。DAG调度器将任务分解为Stage,Stage内部的任务尽可能在同一台机器上并行执行,减少了数据在网络中的传输...

    Spark从入门到精通

    6、大量全网唯一的知识点:基于排序的wordcount,Spark二次排序,Spark分组取topn,DataFrame与RDD的两种转换方式,Spark SQL的内置函数、开窗函数、UDF、UDAF,Spark Streaming的Kafka Direct API、...

    Spark技术内幕 深入解析Spark内核架构设计与实现原理

    用户编写的应用程序被分解为一系列的RDD转换和动作,这些转换和动作构成一个DAG,Spark的JobScheduler会将这个DAG拆分成多个Stage。Stage是基于数据依赖关系划分的,每个Stage内部的任务可以并行执行,提高了整体...

    spark全案例

    3. RDD(弹性分布式数据集)操作:RDD是Spark的基本数据抽象,包括转换(transformations)和动作(actions)两种操作。 三、数据读写 1. 读取数据:可以使用Java API从多种数据源(如HDFS、Cassandra、HBase等)...

    深入理解Spark 核心思想与源码分析

    2. **运算模型:转换与行动**:Spark支持两种操作类型——转换(Transformation)和行动(Action)。转换创建新的RDD,但不立即执行;行动触发计算,并可能返回结果或写入外部存储。 3. **内存计算**:Spark的一大...

Global site tag (gtag.js) - Google Analytics