`
bo_hai
  • 浏览: 564637 次
  • 性别: Icon_minigender_1
  • 来自: 武汉
社区版块
存档分类
最新评论

基于spark的DataFrame实战

阅读更多

Spark 中的另一核心功能是DataFrame,方便处理结构化数据。实例中还是以上一篇博客中的数据为基础。

我们要求以下数据:

1、查看338用户的评分记录;

2、将结果保存成csv格式;

3、评论电影最多的用户id;

4、被用户评论最多的电影id、title;

5、评论电影年龄最小者、最大者;

6、25至30岁的用户最喜欢的电影;

7、最受用户喜爱的电影;

代码如下:

 

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}


/**
  * 更多内容请参考:http://www.iteblog.com/archives/1566#DataFrame-4
  *
  */
object MoviesDataStatistics {

  case class Ratings(userId: Int, movieId: Int, rating: Double)

  case class Movies(id: Int, movieTitle: String, releaseDate: String)

  case class Users(id: Int, age: Int, gender: String)

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("MoviesDataStatistics")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val ratingsDF: DataFrame = sc.textFile("/data/ratings.data").map(x => x.split("::")).map(line => Ratings(line(0).toInt, line(1).toInt, line(2).toDouble)).toDF()
    ratingsDF.registerTempTable("ratings")
    //查看338评分记录条数
    println("sql for 338 rateing info is : ")
    sqlContext.sql("select * from ratings where userId = 338").show()
    println("dataframe 338 rateing info is : ")
    ratingsDF.filter(ratingsDF("userId").equalTo(338)).show()

    val userDataDF = sc.textFile("/data/user.data").map(x => x.split("[|]")).map(line => Users(line(0).toInt, line(1).toInt, line(2))).toDF()
    userDataDF.registerTempTable("users")
    sqlContext.sql("select * from users where id = 338").show()
    userDataDF.filter(userDataDF("id").equalTo(338)).show()

    val movieDF = sc.textFile("/data/movies.data").map(x => x.split("::")).map(line => Movies(line(0).toInt, line(1), line(2))).toDF()
    movieDF.registerTempTable("movies")
    movieDF.collect()
    sqlContext.sql("select * from movies where id = 1").show()
    movieDF.filter(movieDF("id").equalTo(1)).show()

    sqlContext.sql("select r.userId,m.movieTitle,r.rating from movies m inner join ratings r on m.id = r.movieId and r.userId = 338 order by r.rating desc ").show()
    val resultDF = movieDF.join(ratingsDF.filter(ratingsDF("userId").equalTo(338)), movieDF("id").equalTo(ratingsDF("movieId")))
      .sort(ratingsDF("rating").desc).select("userId", "movieTitle", "rating")

    resultDF.collect().foreach(println)
    import org.apache.spark.sql.functions._
    //将结果保存至csv格式
    //val saveOptions = Map("header" -> "true", "path" -> "/data/rat_movie.csv")
    //resultDF.write.format("com.databricks.spark.csv").mode(SaveMode.Overwrite).options(saveOptions).save()
    // 评论电影最多的用户id
    sqlContext.sql("select userId,count(*) as count from ratings group by userId order by count desc ").show(1)
    val userIdCountDF = ratingsDF.groupBy("userId").count()
    userIdCountDF.join(userIdCountDF.agg(max("count").alias("max_count")), $"count".equalTo($"max_count")).select("userId").show(1)

    // 被用户评论最多的电影id、title
    val movieIDGroupDF = ratingsDF.groupBy("movieId").count()
    val movieCountDF = movieIDGroupDF.join(movieIDGroupDF.agg(max("count").alias("max_count"))).filter($"count".equalTo($"max_count"))
    //星球大战是被用户评论最多的电影
    movieCountDF.join(movieDF).filter($"movieId".equalTo($"id")).select("movieId", "movieTitle", "releaseDate").show()

    // 评论电影年龄最小者、最大者
    // 年龄最大的73岁,最小的7岁
    ratingsDF.join(userDataDF, ratingsDF("userId").equalTo(userDataDF("id")))
      .agg(min($"age").alias("min_age"), max($"age").alias("max_age"))
      .join(userDataDF, $"age".isin($"min_age", $"max_age"))
      .select("id", "age", "gender").show(2)
    // https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/GroupedData.html
    // 25至30岁的用户欢迎的电影
    userDataDF.filter($"age".between(25, 30)).join(ratingsDF, $"id".equalTo($"userId"))
      .select("userId", "movieId", "rating").join(movieDF, $"rating".equalTo(5)).select("movieId", "movieTitle").show(10)
    // 最受用户喜爱的电影
    ratingsDF.groupBy("movieId").agg(avg("rating").alias("avg_rate"))
      .sort($"avg_rate".desc).limit(10)
      .join(movieDF, $"movieId".equalTo($"id"))
      .select("movieTitle").show(false)
  }
}

 总结:

 

1、创建DF时需要引入import sqlContext.implicits._

2、使用DF函数时,需要import org.apache.spark.sql.functions._

3、DF的函数功能非常强大,基本的函数功能一定要掌握;

4、个人认为DF的功能比Sql的功能强大

参考:

https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice3/

https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/GroupedData.html

0
1
分享到:
评论

相关推荐

    spark商业实战三部曲

    1.2.1 通过DataFrame实战电影点评系统案例... 7 1.2.2 通过DataSet实战电影点评系统案例... 1.3 Spark 2.2源码阅读环境搭建及源码阅读体验... 11 第2章 Spark2.2技术及原理... 14 2.1 Spark 2.2综述... 14 ...

    基于spark的电影点评系统

    **基于Spark的电影点评系统详解** 本项目是一个大三下学期的课程设计,核心是利用Apache Spark构建一个电影点评系统,旨在对用户的行为数据进行分析,以便为用户提供个性化的电影推荐。Spark作为大数据处理框架,以...

    SparkSql和DataFrame实战.docx

    - **优化的查询执行**:基于DataFrame的查询可以被Spark SQL自动优化,以提高查询效率。 - **广泛的兼容性**:Spark支持多种数据源,包括JSON、Avro、Parquet、SequenceFile等多种格式的外部文件,Hive、关系数据库...

    大数据期末课设~基于spark的气象数据处理与分析

    例如,使用Spark的DataFrame API读取JSON数据,处理缺失值和异常值,然后通过groupByKey或groupBy函数按城市进行分组,计算每个城市的气温和降水量平均值。此外,可能还会运用窗口函数来计算时间序列上的滑动平均值...

    基于SPARK的大数据实战(在线电影推荐)

    在本实践项目“基于SPARK的大数据实战(在线电影推荐)”中,我们将深入探讨如何利用Apache Spark这一高效的大数据处理框架,结合机器学习算法,为在线用户提供个性化的电影推荐服务。这个项目不仅展示了大数据技术...

    基于Spark2.x机器学习十大案例全方位剖析视频教程

    基于Spark2.x机器学习十大案例全方位剖析视频教程 十大案例全方位剖析: 案例1、基于Kaggle的StumbleUpon数据集构建分类系统 案例2、基于BikeSharing数据集构建回归模型 案例3、基于NewsCorpora数据集文本处理新闻...

    Spark商业实战三部曲源码

    例如,你可能会看到如何使用Spark SQL进行复杂的数据分析,如何利用DataFrame和Dataset API进行高效的数据操作,或者如何构建Spark Streaming应用来处理实时数据流。 在实践中,你将学习到如何设置Spark环境,配置...

    基于Java实现Spark2x新闻网大数据实时分析可视化系统项目【100012794】

    该项目名为“基于Java实现Spark2x新闻网大数据实时分析可视化系统”,是一个综合性的大数据处理与可视化解决方案,主要利用Java和Spark2x技术栈进行构建。在实际的企业环境中,此类项目对于实时处理海量新闻数据,...

    大数据Spark企业级实战

    2. **Spark架构设计**:Spark基于Master-Worker模式,由一个Driver程序控制多个Executor进程进行计算。Spark的内存计算模型使其在处理大量数据时速度远超Hadoop MapReduce,因为它减少了磁盘I/O,更多地利用了内存...

    Spark入门实战相关文档

    Spark的架构基于Master-Worker模式,由一个主节点(Spark Master)管理和调度多个工作节点(Worker Nodes)。工作节点上运行的任务被称为Executor,它们负责执行计算任务并存储中间结果在内存中。Spark作业(Job)被...

    Spark大数据商业实战三部曲_内核解密_商业案例_性能调优 实例源码

    2. Spark SQL:支持SQL查询和DataFrame/Dataset API,方便数据处理。 3. Spark Streaming:基于微批处理的实时流处理。 4. MLlib:机器学习库,包含多种算法和实用工具。 5. GraphX:图计算框架,适用于社交网络分析...

    Spark实战高手之路-第6章Spark SQL编程动手实战(1)

    - **第四阶段:掌握基于Spark的核心框架应用** - 包括Spark SQL、MLlib(机器学习库)、GraphX(图计算库)、Spark Streaming(流处理库)等。 - 这些工具和技术可以帮助解决不同类型的数据处理需求。 - **第五...

    超市spark数据处理和数据分析项目实战Dataframe风格

    spark数据处理和数据分析项目实战Dataframe风格里面包括数据和代码,启动idea就可以练习

    Spark实战开发

    《Spark实战高手之路-从零开始》文档可能涵盖了从基础概念到进阶技巧的全程指导,包括Spark项目的创建、Spark Shell的使用、DataFrame/Dataset的创建与操作、Spark SQL的查询语法、Spark Streaming的基本用法,以及...

    基于spark的商品推荐系统.zip

    《基于Spark的商品推荐系统》 在当今大数据时代,利用人工智能技术进行商品推荐已经成为电商行业的常态。Spark作为一款高效的大数据处理框架,凭借其强大的并行计算能力,被广泛应用于推荐系统的构建。本压缩包...

    基于Spark的家庭收支统计和分析管理系统.zip

    《基于Spark的家庭收支统计与分析管理系统详解》 在大数据处理领域,Apache Spark以其高效、易用的特点成为众多开发者和研究者的首选工具。本项目“基于Spark的家庭收支统计和分析管理系统”是一个利用Spark技术...

    基于Spark+Scala+MongoDB的大数据实战,商品推荐系统设计与实现.zip

    本项目“基于Spark+Scala+MongoDB的大数据实战,商品推荐系统设计与实现”则深入展示了如何利用这些技术进行实际的业务应用。Scala作为Spark的主要编程语言,提供了简洁且高效的API,而MongoDB作为NoSQL数据库,对于...

    Hadoop+Spark生态系统操作与实战指南.epub

    第2部分(第8~11章)讲解Spark的原生态组件,包括SparkCore、SparkSQL、SparkStreaming、DataFrame,以及介绍Scala、SparkAPI、SparkSQL、SparkStreaming、DataFrame原理和CDH版本环境下实战操作,其中Flume和Kafka...

    项目实战——Spark将Hive表的数据写入ElasticSearch(Java版本)

    在本项目实战中,我们将探讨如何使用Java编程语言,结合Spark和Hive,将Hive中的数据高效地导入到ElasticSearch(ES)中,并利用ES的别名机制实现数据更新的平滑过渡。以下是对这个流程的详细解析: 1. **Hive数据...

    计算机课程毕设:基于Spark2.2的新闻网大数据实时分析系统设计与实现.zip

    该项目是针对计算机科学与技术领域的学生,特别是那些在学习数据处理和分布式计算方面的人群,设计的一个基于Spark 2.2的新闻网大数据实时分析系统。这个系统旨在展示如何利用Apache Spark的强大功能,处理大规模...

Global site tag (gtag.js) - Google Analytics