`
qindongliang1922
  • 浏览: 2189087 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
7265517b-f87e-3137-b62c-5c6e30e26109
证道Lucene4
浏览量:117678
097be4a0-491e-39c0-89ff-3456fadf8262
证道Hadoop
浏览量:126079
41c37529-f6d8-32e4-8563-3b42b2712a50
证道shell编程
浏览量:60034
43832365-bc15-3f5d-b3cd-c9161722a70c
ELK修真
浏览量:71405
社区版块
存档分类
最新评论

使用Spark SQL的临时表解决一个小问题

阅读更多


最近在使用spark处理一个业务场景时,遇到一个小问题,我在scala代码里,使用spark sql访问hive的表,然后根据一批id把需要的数据过滤出来,本来是非常简单的需求直接使用下面的伪SQL即可:
````
select * from table where  id in (id1,id2,id3,id4,idn)
````



但现在遇到的问题是id条件比较多,大概有几万个,这样量级的in是肯定会出错的,看网上文章hive的in查询超过3000个就报错了。

如何解决?

主要有两种解决方法:


(一)分批执行,就是把几万个id,按3000一组查询一次,最后把所有的查询结果在汇合起来。


(二)使用join,把几万个id创建成一张hive表,然后两表关联,可以一次性把结果给获取到。




这里倾向于第二种解决办法,比较灵活和方便扩展,尽量不要把数据集分散,一旦分散意味着客户端需要做更多的工作来合并结果集,比如随便一个sum或者dinstict,如果是第一种则需要在最终的结果集再次sum或者distinct。


下面看看如何使用第二种解决:


由于我们id列表是动态的,每个任务的id列表都有可能变换,所以要满足第二种方法,就得把他们变成一张临时表存储在内存中,当spark任务停止时,就自动销毁,因为他们不需要持久化到硬盘上。

在spark中使用临时表是非常简单的,我们只需要把id列表的数据放入rdd中,然后再把rdd注册成一个张表,就可以和hive库里面已有的表做各种join操作了,一个demo代码如下:
````
import org.apache.spark.sql.SparkSession


object SparkSQLJoinDemo {

  def main(args: Array[String]): Unit = {

    val spark=SparkSession//
      .builder()
      .appName(" spark sql demo ")
      .enableHiveSupport().getOrCreate()

    import spark.implicits._
    import spark.sql

    sql(" use  hivedb ")//指定hive的db库

    val ids="1,2,3,4,5"//模拟的id列表

    val data=ids.split(",").toSeq//转化成Seq结构

    val school_table=spark.sparkContext.makeRDD(data).toDF("id")//指定列名

    school_table.createOrReplaceTempView("temp_table")//在spark的内存里面创建一张临时表

    //这里假设hive_table是存在hive里面的一张表
    val xr=sql("select * from hive_table where hive_id=id")

    println("数据量:"+xr.count()) //打印数据量

    spark.close()//close

    
  }

}

````
上面代码里的ids,就是我们需要转化成内存表的数据,然后需要转成Seq,并生成RDD,再通过RDD转成DataFrame,注意如果要使用DF,需要导入 import spark.implicits._包下面的函数,这样就能隐式的直接转成DF,在转成DF的同时,我们给数据指定了列名叫id,这里如果有多列,后面可以继续逗号分隔,添加多个列名,最终我们给它注册成了内存临时表,然后在下面的语句中就可以直接使用hive里面存在的表与内存表进行join,最终我们打印一下成功join后数量,可以验证下程序是否正常运行。

有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。 技术债不能欠,健康债更不能欠, 求道之路,与君同行。

0
0
分享到:
评论

相关推荐

    Spark SQL常见4种数据源详解

    一个DataFrame可以进行RDDs方式的操作,也可以被注册为临时表。把DataFrame注册为临时表之后,就可以对该DataFrame执行SQL查询。 Spark SQL的默认数据源为Parquet格式。数据源为Parquet文件时,Spark SQL可以方便的...

    Spark SQL操作大全.zip

    Spark SQL是Apache Spark项目的一个核心组件,它提供了处理结构化数据的强大功能,使得在大数据分析领域中,Spark SQL成为了一种不可或缺的工具。本资料主要涵盖了Spark SQL的基础概念、核心特性、操作方法以及实战...

    《Spark SQL编程指南(v1.1.0)

    用户可以通过创建临时视图或者将DataFrame注册为一个表,然后使用SQL查询它们。这使得对SQL熟悉的开发人员能够快速上手Spark SQL。 **3. Catalyst优化器** Catalyst是Spark SQL中的查询优化器,它负责将SQL查询转化...

    Spark.sql数据库部分的内容

    Spark SQL是Spark的一个重要组件,专门用于处理结构化数据,它结合了SQL查询和DataFrame API,使得开发人员可以方便地进行数据查询和分析。在这个主题中,我们将深入探讨Spark SQL的核心概念、功能以及使用方法。 ...

    基于spark sql引擎的即席查询服务.zip

    通过创建一个临时视图或数据表,用户可以使用标准SQL语句执行查询,这对于数据分析师和业务人员来说非常友好。 为了构建即席查询服务,我们需要考虑以下几点: 1. **性能优化**:Spark SQL支持Code Generation和...

    Python3实战Spark大数据分析及调度-第8章 Spark SQL.zip

    1. **Spark SQL基础**:Spark SQL是Apache Spark的一个模块,它允许开发人员以SQL或DataFrame API的方式对结构化数据进行处理。DataFrame API提供了类似于SQL的接口,但适用于分布式计算环境,使得Python程序员能够...

    spark-sql入门

    此外,Spark SQL支持创建视图,这使得我们可以为复杂查询定义临时或永久的逻辑表。视图可以在后续的查询中作为表来使用,简化代码并提高可读性。 Spark SQL还具备交互式查询的能力,这得益于它的Shark和Hive的兼容...

    Spark SQL最佳实践

    **Spark SQL最佳实践** Spark SQL是Apache Spark框架的一部分,它提供了与传统SQL接口进行数据处理的能力,使得数据科学...通过深入理解和实践这些知识点,可以更好地在Java项目中利用Spark SQL来解决复杂的数据问题。

    spark sql 代码实现

    Spark SQL 是 Apache Spark 的一个模块,它允许开发者使用 SQL 查询数据或者通过 DataFrame 和 Dataset API 进行编程。在 Spark 1.3 版本中,Spark SQL 已经成为了一个核心组件,它提供了与传统 SQL 引擎类似的接口...

    Learning Spark SQL_source_code - Aurobindo Sarkar

    Spark SQL是Apache Spark的一个核心模块,它允许开发者使用SQL或者DataFrame API来处理数据。在2.2版本中,Spark SQL引入了重要的改进,如DataFrame/Dataset API的优化,使得数据处理更加高效且类型安全。DataFrame ...

    Spark SQL.xmind.zip

    Spark SQL是Spark的一个模块,它将SQL查询语言与DataFrame API结合,使得开发者可以使用熟悉的SQL语法进行大数据处理,同时享受Spark的高性能。 Spark SQL支持多种数据源,如HDFS、Cassandra、Hive等,可以读取和...

    Spark SQL 重点知识总结.docx

    - **SQL 风格**:需将 DataFrame 注册为临时表或全局临时表,然后通过 `sparkSession.sql()` 方法执行 SQL 查询。 2. **DataSet 查询**: - 定义一个 Case Class,然后将数据转换为 Dataset。 **四、DataFrame、...

    spark SQL应用解析

    - **DataFrame**:在Spark SQL中,DataFrame是一个表或关系的概念,它具有列名和列类型,可以看作是schemaRDD的升级版,提供了一种更高级别的抽象,使得数据处理更高效且易于理解。 - **DataSet**:DataFrame的类型...

    Intro to DataFrames and Spark SQL (training)

    Spark SQL还支持DataFrame和Hive表之间的互操作性,可以无缝地读写Hive metastore中的数据,这对于已经使用Hive的组织来说是一个巨大的优势。 在实际项目中,Spark SQL通常用于复杂的数据处理流水线,它能够处理多...

    Spark SQL 基础

    SparkSession是操作Spark SQL的入口,可以用来创建DataFrame,执行SQL查询,注册临时表,缓存表以及读取Parquet文件等。 接下来,创建DataFrame的方法有两种,一种是基于RDD推断Schema,另一种是使用已知的Schema。...

    spark sql介绍

    在Spark SQL 1.2版本中,一个重要的更新是引入了外部数据源API(External Data Source API)。这一API允许开发者定义新的输入源,极大地增强了Spark SQL处理不同格式数据的能力。以下是一些关键点: - **新特性**:...

    spark - 小实践(2)-- 模拟数据

    DataFrame是Spark SQL的一个核心概念,它提供了一种结构化的数据处理方式,可以看作是SQL表的分布式集合。在Scala或Python中,我们可以使用`spark.read.json()`函数来读取JSON文件: ```scala // Scala val df = ...

    成功编译后的 spark-2.1.0-bin-2.6.0-cdh5.7.0

    DataFrame API支持SQL查询,通过创建DataFrame并注册为临时表,用户可以直接使用SQL语句进行数据分析,这对于熟悉SQL的开发人员来说非常友好。 Spark SQL还与Hive兼容,可以读取和写入Hive表,这意味着你可以利用...

    Spark编程基础(Python版).rar

    7. **Spark SQL**:学习如何注册DataFrame为临时表,然后通过SQL语句进行查询,以及使用DataFrame API执行复杂的SQL操作。 8. **Spark Streaming**:如果包内涉及,会介绍如何使用PySpark进行实时数据流处理,创建...

Global site tag (gtag.js) - Google Analytics