`
功夫小当家
  • 浏览: 186466 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

spark - RDD转成DataFrame

阅读更多

1.RDD转成DataFrame的两种方式:

package df

import org.apache.spark.sql.{Row, SparkSession}

object RDD2DataFrame {

  case class Person(name: String, score: Int)

  def main(args: Array[String]): Unit = {
    //enableHiveSupport()开启支持hive
    val spark = SparkSession.builder().appName("DF_DEMO").master("local").getOrCreate()

    //---------------1.基于反射的方式(必须事先知道schema,通过case class定义schema,通过反射拿到case class中的字段和类型 ,spark1.6版本case class只支持22个字段,高版本不限制字段个数)---------------------
    /**
      * 1.创建case class
      * 2.创建rdd => rdd [case class] => .toDF().
      */
    //导入隐式转换,才能调用toDF()方法
    import spark.implicits._

    /**
      * 测试数据内容如下:
      * a,100
      * b,90
      * c,80
      */
    val df = spark.sparkContext.textFile("file:///F:\\test\\2.txt").map(x => x.split(",")).map(x => Person(x(0), x(1).toInt)).toDF()
    df.show()

    /**
      * 输出结果:
      * +--------+
      * |   value|
      * +--------+
      * |name : a|
      * |name : b|
      * |name : c|
      * +--------+
      */
    df.map(x => "name : " + x.getAs[String]("name")).show()

    //---------------2. 基于编程的方式指定 ---------------------
    /**
      * 1.创建schemaString =>  StructField => StructType
      * 2.创建rdd => Rdd[Row]
      * 3.spark.createDataFrame(rowRDD, StructType)
      */
    //导入隐式转换(否则StringType找不到)
    import org.apache.spark.sql.types._

    val schemaString = "name score"
    val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)

    val rowRDD = spark.sparkContext.textFile("file:///F:\\test\\2.txt").map(x => x.split(",")).map(x => Row(x(0), x(1)))
    val df2 = spark.createDataFrame(rowRDD, schema)
    df2.show()
    spark.stop()
  }
}

 

0
0
分享到:
评论

相关推荐

    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql

    在本文中,将详细介绍Spark的RDD API,以及如何在RDD、DataFrame和Dataset之间相互转换,以及如何使用Spark SQL进行操作。 首先来看RDD API的聚合操作aggregate。aggregate函数是RDD API中非常重要的一个动作操作,...

    spark-3.1.2.tgz & spark-3.1.2-bin-hadoop2.7.tgz.rar

    - RDD(弹性分布式数据集):Spark的基础数据结构,是不可变、分区的数据集合,可以在集群中并行操作。 - DataFrame:Spark SQL引入的数据模型,它是基于表和列的抽象,提供了更高级别的抽象和优化。 - Dataset:...

    spark: RDD与DataFrame之间的相互转换方法

    在大数据处理框架Spark中,RDD(弹性分布式数据集)和DataFrame是两种常用的抽象数据类型,它们分别对应于不同的操作接口和优化技术。RDD是最基础的分布式数据集合,提供了一种容错的并行处理数据的方式。而...

    Java和scala实现 Spark RDD转换成DataFrame的两种方法小结

    Java 和 Scala 实现 Spark RDD 转换成 DataFrame 的两种方法小结 在本文中,我们将讨论如何使用 Java 和 Scala 将 Spark RDD 转换成 DataFrame,並且介绍两种实现方法。 准备数据源 在项目下新建一个 student.txt...

    spark-assembly-1.5.2-hadoop2.6.0.jar

    - Spark SQL:提供了对结构化数据的处理能力,可以将SQL查询直接转换为DataFrame操作,方便数据分析。 - Spark Streaming:处理实时数据流,通过微批处理实现高吞吐量和低延迟的数据处理。 - MLlib:Spark的机器...

    spark-2.4.7-bin-hadoop2.6.tgz

    4. **Spark SQL**:Spark SQL是Spark的一个模块,用于处理结构化数据,它集成了SQL查询与DataFrame API,提供了一种统一的方式来处理结构化和半结构化数据。 5. **Spark Streaming**:Spark Streaming提供了微...

    spark-1.6.0-bin-hadoop2.6.tgz

    - **Spark SQL**:Spark的SQL模块,支持使用SQL查询DataFrame,同时兼容Hive的元数据和SQL语法。 - **Spark Streaming**:用于实时流数据处理,通过将数据流划分为微批次进行处理。 - **MLlib**:机器学习库,包含...

    spark-3.1.3-bin-without-hadoop.tgz

    Spark Streaming则构建在RDD之上,通过微批处理实现对实时数据流的处理,支持复杂的窗口操作和状态管理。这对于实时监控、在线分析等应用场景非常有用。 MLlib是Spark的机器学习库,包含了多种算法如分类、回归、...

    spark-3.1.2-bin-hadoop3.2.tgz

    2. **Resilient Distributed Datasets (RDD)**:RDD是Spark的基本数据结构,它是不可变、分区的元素集合,能够在集群中的节点上分布式存储。 3. **弹性**:Spark提供了容错机制,当工作节点失败时,可以自动恢复丢失...

    spark-3.2.4-bin-hadoop3.2-scala2.13 安装包

    8. **生态系统**: Spark的生态非常丰富,包括DataFrame/Dataset API、Spark SQL、Spark Streaming、GraphX(图计算)、MLlib(机器学习库)和SparkR(R语言接口)。这些组件可以组合使用,满足各种大数据处理需求。 ...

    spark-2.2.2-bin-hadoop2.7.tgz

    在Spark 2.2.2中,除了基本的RDD接口,还引入了DataFrame和Dataset,它们提供了一种更高级的、类型安全的数据处理方式,使得开发人员能够更方便地进行SQL查询和复杂的数据分析。 Spark 2.2.2支持多种数据源,包括...

    spark-3.1.3-bin-hadoop3.2.tgz

    使用Spark时,你可以编写Python、Scala或Java代码来创建DataFrame、RDD,并利用Spark的并行计算能力处理大数据。 总之,Apache Spark 3.1.3 是一个强大且功能丰富的大数据处理工具,尤其适用于需要高性能、实时处理...

    spark-1.6.0-bin-hadoop2.4.tgz

    此外,Spark的弹性分布式数据集(Resilient Distributed Datasets, RDD)是其核心概念,它提供了一种在内存中处理数据的方式,大大提升了计算效率。Spark 1.6.0还引入了DataFrame,进一步优化了数据处理性能,简化了...

    spark-2.4.0-bin-hadoop2.6.tgz

    1. **Spark核心概念**:Spark的核心组件是弹性分布式数据集(Resilient Distributed Datasets, RDD),这是一种容错的、不可变的数据集合,可以在集群中的多个节点上并行操作。此外,Spark还提供了DataFrame和...

    spark-2.2.0-bin-hadoop2.6.tgz

    而Dataset API是DataFrame的类型安全版本,它结合了RDD的高性能和DataFrame的易用性。 3. **机器学习库MLlib**:Spark的机器学习库在2.2.0中进一步加强,新增和改进了多种算法,如支持向量机(SVM)、随机森林...

    spark-assembly-1.5.2-hadoop2.6.0jar包

    在实际应用中,开发人员可以利用Spark的API编写Scala代码,创建DataFrame或RDD(弹性分布式数据集),并利用Spark的并行计算能力对数据进行处理。例如,可以使用Spark SQL执行复杂查询,或者使用Spark Streaming实现...

    spark-2.3.0-bin-hadoop2.7版本.zip

    1. **弹性分布式数据集(Resilient Distributed Datasets, RDD)**:RDD是Spark的基本数据结构,它是不可变的、分区的、容错的。用户可以通过并行操作对RDD进行计算,且RDD之间的转换都是延迟执行的,直到需要结果时...

    spark-2.1.1-bin-hadoop2.7.tgz.7z

    2. **Spark SQL**:用于处理结构化数据,它支持SQL查询并通过DataFrame API提供了与多种数据源的交互。 3. **Spark Streaming**:提供了对实时数据流处理的支持,可以处理来自各种源(如Kafka、Flume或TCP套接字)的...

    spark-scala-examples:该项目以Scala语言提供了Apache Spark SQL,RDD,DataFrame和Dataset示例

    目录(Scala中的Spark示例)Spark RDD示例火花蓄能器介绍将Spark RDD转换为DataFrame | 数据集 Spark SQL教程Spark创建带有示例的DataFrame Spark DataFrame withColumn 重命名Spark DataFrame上的列的方法Spark –...

    spark-2.0.0-bin-hadoop2.7.tgz.zip

    Spark的核心在于它的弹性分布式数据集(Resilient Distributed Datasets,简称RDD),这是一种可以在集群中并行操作的数据结构。RDD具有容错性,一旦创建,就能在节点故障时自动恢复。Spark 2.0.0在此基础上引入了...

Global site tag (gtag.js) - Google Analytics