`
qindongliang1922
  • 浏览: 2180910 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
7265517b-f87e-3137-b62c-5c6e30e26109
证道Lucene4
浏览量:117400
097be4a0-491e-39c0-89ff-3456fadf8262
证道Hadoop
浏览量:125816
41c37529-f6d8-32e4-8563-3b42b2712a50
证道shell编程
浏览量:59781
43832365-bc15-3f5d-b3cd-c9161722a70c
ELK修真
浏览量:71224
社区版块
存档分类
最新评论

在scala中使用spark sql解决特定需求(2)

阅读更多

接着上篇文章,本篇来看下如何在scala中完成使用spark sql将不同日期的数据导入不同的es索引里面。


首下看下用到的依赖包有哪些:


elasticsearch-spark-20_2.11   5.3.2
elasticsearch                 2.3.4
spark-sql_2.11                2.1.0
spark-hive_2.11               2.1.0
spark-core_2.11               2.1.0
hadoop-client                 2.7.3
scala-library                 2.11.8



下面看相关的代码,代码可直接在跑在win上的idea中,使用的是local模式,数据是模拟造的:



import org.apache.spark.sql.types.{DataTypes, StructField}
import org.apache.spark.sql.{Row, SparkSession}//导入Row对象

/**
  * spark sql to es 本地测试例子
  */
object SparkGroupES {


  def main(args: Array[String]): Unit = {

    //构建spark session
    val spark = SparkSession
      .builder().master("local[1]")
      .appName("Spark SQL basic example")
      .config("es.nodes","192.168.10.125").config("es.port","9200")
      .getOrCreate()

    //导入es-spark的包
    import org.elasticsearch.spark.sql._
    import spark.implicits._


    //使用Seq造数据,四列数据
    val df = spark.sparkContext.parallelize(Seq(
      (0,"p1",30.9,"2017-03-04"),
      (0,"u",22.1,"2017-03-05"),
      (1,"r",19.6,"2017-03-04"),
      (2,"cat40",20.7,"2017-03-05"),
      (3,"cat187",27.9,"2017-03-04"),
      (4,"cat183",11.3,"2017-03-06"),
      (5,"cat8",35.6,"2017-03-08"))

     ).toDF("id", "name", "price","dt")//转化df的四列数据s
    //创建表明为pro
    df.createTempView("pro")

    import spark.sql //导入sql函数

    //按照id分组,统计每组数量,统计每组里面最小的价格,然后收集每组里面的数据
    val ds=sql("select dt, count(*) as c ,collect_list(struct(id,name, price)) as res  from pro   group by dt ")
    //需要多次查询的数据,可以缓存起来
    ds.cache()
    //获取查询的结果,遍历获取结果集
    ds.select("dt","c","res").collect().foreach(line=>{
      val dt=line.getAs[String]("dt") //获取日期
      val count=line.getAs[Long]("c")//获取数量
      val value=line.getAs[Seq[Row]]("res")//获取每组内的数据集合,注意是一个Row实体
      println("日期:"+dt+" 销售数量: "+count)

      //创建一个schema针对struct结构
      val schema = DataTypes
        .createStructType( Array[StructField](
          DataTypes.createStructField("id", DataTypes.IntegerType, false), //不允许为null
          DataTypes.createStructField("name", DataTypes.StringType, true),
          DataTypes.createStructField("price", DataTypes.DoubleType, true)
        ))
        //将value转化成rdd
        val rdd=spark.sparkContext.makeRDD(value)
        //将rdd注册成DataFrame
        val df =spark.createDataFrame(rdd,schema)
        //保存每一个分组的数据到es索引里面
        EsSparkSQL.saveToEs(df,"spark"+dt+"/spark",Map("es.mapping.id" -> "id"))
//      value.foreach(row=>{//遍历组内数据集合,然后打印
//        println(row.getAs[String]("name")+" "+row.getAs[Double]("price"))
//      })

    })
    println("索引成功")
    spark.stop()
  }

}








分析下,代码执行过程:


(1)首先创建了一个SparkSession对象,注意这是新版本的写法,然后加入了es相关配置


(2)导入了隐式转化的es相关的包


(3)通过Seq+Tuple创建了一个DataFrame对象,并注册成一个表


(4)导入spark sql后,执行了一个sql分组查询


(5)获取每一组的数据



(6)处理组内的Struct结构


(7)将组内的Seq[Row]转换为rdd,最终转化为df


(8)执行导入es的方法,按天插入不同的索引里面


(9)结束


需要注意的是必须在执行collect方法后,才能在循环内使用sparkContext,否则会报错的,在服务端是不能使用sparkContext的,只有在Driver端才可以。





0
0
分享到:
评论

相关推荐

    scala与spark基础

    Scala与Spark是大数据处理领域中的重要工具,它们的结合使用为高效的数据分析提供了可能。Scala是一种多范式编程语言,以其强大的函数式编程特性而受到欢迎,尤其在大数据处理领域,它作为Apache Spark的主要编程...

    本地使用scala操作spark示例.doc

    在Scala中使用Spark进行数据处理时,首先需要创建一个`SparkSession`实例。这一步至关重要,因为所有的Spark应用都是基于`SparkSession`进行构建的。下面我们将详细介绍如何在本地环境中配置并初始化`SparkSession`...

    spring boot + scala + spark http驱动spark计算

    标题中的“spring boot + scala + spark http驱动spark计算”揭示了一个使用现代技术栈构建的数据处理系统。这个系统基于Spring Boot框架来提供HTTP服务,利用Scala作为编程语言,并借助Apache Spark进行大数据计算...

    《Spark SQL编程指南(v1.1.0)

    Spark SQL允许用户注册自定义函数(UDF),可以是Scala、Java或Python函数,使得在SQL查询中使用自定义逻辑变得简单。 **10. 性能调优** Spark SQL提供了一系列性能调优工具,包括调整并行度、缓存策略以及查询计划...

    Spark SQL上海摩拜共享单车数据分析源码

    在这个"Spark SQL上海摩拜共享单车数据分析源码"项目中,开发者利用Spark SQL对上海摩拜共享单车的数据进行了深入分析。项目采用Java语言编写,并借助Maven构建工具来管理依赖项,确保了项目的可重复性和稳定性。 ...

    spark-hive-2.11和spark-sql-以及spark-hadoop包另付下载地址

    在标题"spark-hive-2.11和spark-sql-以及spark-hadoop包另付下载地址"中,我们关注的是Spark与Hive的特定版本(2.11)的集成,以及Spark SQL和Spark对Hadoop的支持。这里的2.11可能指的是Scala的版本,因为Spark是用...

    Spark SQL and DataFrames-java - Spark 1.6.2

    由于文档中包含了对特定版本的描述,它也提到了一些仅在Scala中有效的功能,比如隐式转换的隔离和移除,以及在org.apache.spark.sql中type aliases的移除。这表明从Spark 1.6.2版本开始,开发团队在加强Java和Scala ...

    spark-sql数据.rar

    此外,还可以使用`udf`(用户自定义函数)扩展Spark SQL的功能,以处理特定业务需求。 6. 性能优化: Spark SQL通过DAG执行计划和内存管理实现高效的大数据处理。可以使用分区、缓存和调整执行配置来优化性能。...

    Spark高手之路-Spark SQL编程动手实战

    在这一章节中,你将深入学习Spark SQL的语法和用法,包括创建DataFrame、执行SQL查询、数据转换及聚合操作等。 Spark SQL的DataFrame API允许开发者以类似SQL的方式操作数据,而DataFrame则是一种分布式的、列式...

    Spark SQL简易教程

    由于Spark SQL支持多种语言,因此每种语言都有其特定的DataFrame抽象定义,例如在Scala和Java中是Dataset[T],在Python中是DataFrame,在R中是DataFrame。在后续版本中,为了方便开发者,Spark将DataFrame和Dataset...

    大数据编程Cause of death-使用spark scala编程完成的实验源码

    这个实验旨在帮助开发者掌握Spark和Scala在大数据分析中的实际应用,同时理解如何解决特定问题,如" Cause of death"的数据探索和分析。通过实践,你可以提升在大数据处理、数据清洗、分析和可视化的技能,为未来的...

    Spark3.0安装包

    在开发Spark应用时,可以使用Scala、Java、Python或R语言,Spark提供了丰富的API来方便开发。 总的来说,Spark3.0的安装包提供了一个完整的解决方案,用于大数据的快速处理和分析,尤其适用于需要高性能计算和实时...

    Intro to DataFrames and Spark SQL

    从2014年4月发布的Spark 1.0版本开始,Spark SQL就成为了Spark核心分布包的一部分,并在2015年从Alpha阶段毕业。它不仅仅能够运行标准的SQL查询,还能够运行HiveQL查询,同时提供了对现有Hive部署的替代或补充。 ...

    spark-doc(scala).zip

    在Spark API中,Scala版本通常与特定的Spark版本相匹配,这里是针对scala2.11版本的,这意味着这些文档适用于使用Scala 2.11.x编译的Spark项目。Scala 2.11引入了许多改进和优化,包括更好的类型推断、更强大的反射...

    Spark SQL PDF

    Spark SQL 是 Apache Spark 的一个模块,它允许开发者使用 SQL 语言或者DataFrame API处理数据,同时兼容Hive查询语言(HQL)。Spark SQL 提供了一种统一的方式来操作数据,无论是来自传统的数据库、Hadoop 文件系统...

    intellij的scala工具bin文件

    "intellij的scala工具bin文件"是指IntelliJ IDEA中用于支持Scala编程语言的特定二进制工具集合。 Scala是一种强大的静态类型编程语言,它融合了面向对象和函数式编程的特性,被广泛应用于大数据处理领域,特别是...

    大数据编程Cause of death-使用spark scala编程完成的实验源码+数据集.7z

    在这个实验中,我们将使用Spark SQL加载和查询CSV或Parquet格式的数据集,这些数据可能包含各种国家的死亡率、年龄分布以及死因等信息。 接下来,我们探讨如何使用Scala进行编程。Scala是一种静态类型的编程语言,...

    spark-3.1.3-bin-hadoop3.2.tgz

    在这个特定的压缩包"spark-3.1.3-bin-hadoop3.2.tgz"中,我们得到了Spark的3.1.3版本,它已经预编译为与Hadoop 3.2兼容。这个版本的Spark不仅提供了源码,还包含了预编译的二进制文件,使得在Linux环境下快速部署和...

    scala-2.11.12.tgz

    Scala-2.11.12.tgz是Scala编程语言的一个特定版本,主要针对Apache Spark进行开发。Spark是大数据处理领域的一款流行框架,它利用Scala作为主要的开发语言,提供了高效的分布式计算能力。 Scala 2.11.x系列是该语言...

    spark-2.3.0-bin-hadoop2-without-hive

    然后,我们可以在 Spark 任务中通过编程接口(如 Scala、Python 或 Java API)访问 Hive 表,或者使用 Spark SQL 语句进行查询,只不过这些操作不会直接依赖于 Hive 的执行引擎。 总结一下,"spark-2.3.0-bin-...

Global site tag (gtag.js) - Google Analytics