`
weitao1026
  • 浏览: 1048448 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

Intellj IDEA +SBT + Scala + Spark Sql读取HDFS数据

阅读更多

前提Spark集群已经搭建完毕,如果不知道怎么搭建,请参考这个链接:
http://qindongliang.iteye.com/blog/2224797

注意提交作业,需要使用sbt打包成一个jar,然后在主任务里面添加jar包的路径远程提交即可,无须到远程集群上执行测试,本次测试使用的是Spark的Standalone方式

sbt依赖如下:


Java代码 复制代码 收藏代码
  1. name := "spark-hello"  
  2.   
  3. version := "1.0"  
  4.   
  5. scalaVersion := "2.11.7"  
  6. //使用公司的私服  
  7. resolvers += "Local Maven Repository" at "http://dev.bizbook-inc.com:8083/nexus/content/groups/public/"  
  8. //使用内部仓储  
  9. externalResolvers := Resolver.withDefaultResolvers(resolvers.value, mavenCentral = false)  
  10. //Hadoop的依赖  
  11. libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.7.1"  
  12. //Spark的依赖  
  13. libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.4.1"  
  14. //Spark SQL 依赖  
  15. libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "1.4.1"  
  16. //java servlet 依赖  
  17. libraryDependencies += "javax.servlet" % "javax.servlet-api" % "3.0.1"  
  18.       
name := "spark-hello"

version := "1.0"

scalaVersion := "2.11.7"
//使用公司的私服
resolvers += "Local Maven Repository" at "http://dev.bizbook-inc.com:8083/nexus/content/groups/public/"
//使用内部仓储
externalResolvers := Resolver.withDefaultResolvers(resolvers.value, mavenCentral = false)
//Hadoop的依赖
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.7.1"
//Spark的依赖
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.4.1"
//Spark SQL 依赖
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "1.4.1"
//java servlet 依赖
libraryDependencies += "javax.servlet" % "javax.servlet-api" % "3.0.1"
    


demo1:使用Scala读取HDFS的数据:

Java代码 复制代码 收藏代码
  1. /** * 
  2.    * Spark读取来自HDFS的数据 
  3.    */  
  4. ef readDataFromHDFS(): Unit ={  
  5.    //以standalone方式运行,提交到远程的spark集群上面  
  6.    val conf = new SparkConf().setMaster("spark://h1:7077").setAppName("load hdfs data")  
  7.    conf.setJars(Seq(jarPaths));  
  8.    //得到一个Sprak上下文  
  9.    val sc = new SparkContext(conf)  
  10.    val textFile=sc.textFile("hdfs://h1:8020/user/webmaster/crawldb/etl_monitor/part-m-00000")  
  11.    //获取第一条数据  
  12.    //val data=textFile.first()  
  13.   // println(data)  
  14.    //遍历打印  
  15.      /** 
  16.       * collect() 方法 游标方式迭代收集每行数据 
  17.       * take(5)   取前topN条数据 
  18.       * foreach() 迭代打印 
  19.       * stop()    关闭链接 
  20.       */  
  21.   textFile.collect().take(5).foreach( line => println(line) )  
  22.    //关闭资源  
  23.    sc.stop()  
 /** *
    * Spark读取来自HDFS的数据
    */
def readDataFromHDFS(): Unit ={
    //以standalone方式运行,提交到远程的spark集群上面
    val conf = new SparkConf().setMaster("spark://h1:7077").setAppName("load hdfs data")
    conf.setJars(Seq(jarPaths));
    //得到一个Sprak上下文
    val sc = new SparkContext(conf)
    val textFile=sc.textFile("hdfs://h1:8020/user/webmaster/crawldb/etl_monitor/part-m-00000")
    //获取第一条数据
    //val data=textFile.first()
   // println(data)
    //遍历打印
      /**
       * collect() 方法 游标方式迭代收集每行数据
       * take(5)   取前topN条数据
       * foreach() 迭代打印
       * stop()    关闭链接
       */
   textFile.collect().take(5).foreach( line => println(line) )
    //关闭资源
    sc.stop()
}


demo2:使用Scala 在客户端造数据,测试Spark Sql:

Java代码 复制代码 收藏代码
  1. def mappingLocalSQL1() {  
  2.    val conf = new SparkConf().setMaster("spark://h1:7077").setAppName("hdfs data count")  
  3.    conf.setJars(Seq(jarPaths));  
  4.    val sc = new SparkContext(conf)  
  5.    val sqlContext=new SQLContext(sc);  
  6.    //导入隐式sql的schema转换  
  7.    import sqlContext.implicits._  
  8.    val df = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i"))).toDF()  
  9.    df.registerTempTable("records")  
  10.    println("Result of SELECT *:")  
  11.    sqlContext.sql("SELECT * FROM records").collect().foreach(println)  
  12.    //聚合查询  
  13.    val count = sqlContext.sql("SELECT COUNT(*) FROM records").collect().head.getLong(0)  
  14.    println(s"COUNT(*): $count")  
  15.    sc.stop()  
  16.  }  
 def mappingLocalSQL1() {
    val conf = new SparkConf().setMaster("spark://h1:7077").setAppName("hdfs data count")
    conf.setJars(Seq(jarPaths));
    val sc = new SparkContext(conf)
    val sqlContext=new SQLContext(sc);
    //导入隐式sql的schema转换
    import sqlContext.implicits._
    val df = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i"))).toDF()
    df.registerTempTable("records")
    println("Result of SELECT *:")
    sqlContext.sql("SELECT * FROM records").collect().foreach(println)
    //聚合查询
    val count = sqlContext.sql("SELECT COUNT(*) FROM records").collect().head.getLong(0)
    println(s"COUNT(*): $count")
    sc.stop()
  }




Spark SQL 映射实体类的方式读取HDFS方式和字段,注意在Scala的Objcet最上面有个case 类定义,一定要放在
这里,不然会出问题:





demo2:使用Scala 远程读取HDFS文件,并映射成Spark表,以Spark Sql方式,读取top10:

Java代码 复制代码 收藏代码
  1.  val jarPaths="target/scala-2.11/spark-hello_2.11-1.0.jar"  
  2.   /**Spark SQL映射的到实体类的方式**/  
  3.   def mapSQL2(): Unit ={  
  4.     //使用一个类,参数都是可选类型,如果没有值,就默认为NULL  
  5.     //SparkConf指定master和任务名  
  6.     val conf = new SparkConf().setMaster("spark://h1:7077").setAppName("spark sql query hdfs file")  
  7.     //设置上传需要jar包  
  8.     conf.setJars(Seq(jarPaths));  
  9.     //获取Spark上下文  
  10.     val sc = new SparkContext(conf)  
  11.     //得到SQL上下文  
  12.     val sqlContext=new SQLContext(sc);  
  13.     //必须导入此行代码,才能隐式转换成表格  
  14.     import sqlContext.implicits._  
  15.     //读取一个hdfs上的文件,并根据某个分隔符split成数组  
  16.     //然后根据长度映射成对应字段值,并处理数组越界问题  
  17.     val model=sc.textFile("hdfs://h1:8020/user/webmaster/crawldb/etl_monitor/part-m-00000").map(_.split("\1"))  
  18.       .map( p =>  ( if (p.length==4) Model(Some(p(0)), Some(p(1)), Some(p(2)), Some(p(3).toLong))  
  19.     else if (p.length==3) Model(Some(p(0)), Some(p(1)), Some(p(2)),None)  
  20.     else if (p.length==2) Model(Some(p(0)), Some(p(1)),None,None)  
  21.     else   Model( Some(p(0)),None,None,None )  
  22.       )).toDF()//转换成DF  
  23.     //注册临时表  
  24.     model.registerTempTable("monitor")  
  25.     //执行sql查询  
  26.     val it = sqlContext.sql("SELECT rowkey,title,dtime FROM monitor  limit 10 ")  
  27. //    val it = sqlContext.sql("SELECT rowkey,title,dtime FROM monitor WHERE title IS  NULL AND dtime IS NOT NULL      ")  
  28.       println("开始")  
  29.       it.collect().take(8).foreach(line => println(line))  
  30.       println("结束")  
  31.     sc.stop();  
  32.   }  
 val jarPaths="target/scala-2.11/spark-hello_2.11-1.0.jar"
  /**Spark SQL映射的到实体类的方式**/
  def mapSQL2(): Unit ={
    //使用一个类,参数都是可选类型,如果没有值,就默认为NULL
    //SparkConf指定master和任务名
    val conf = new SparkConf().setMaster("spark://h1:7077").setAppName("spark sql query hdfs file")
    //设置上传需要jar包
    conf.setJars(Seq(jarPaths));
    //获取Spark上下文
    val sc = new SparkContext(conf)
    //得到SQL上下文
    val sqlContext=new SQLContext(sc);
    //必须导入此行代码,才能隐式转换成表格
    import sqlContext.implicits._
    //读取一个hdfs上的文件,并根据某个分隔符split成数组
    //然后根据长度映射成对应字段值,并处理数组越界问题
    val model=sc.textFile("hdfs://h1:8020/user/webmaster/crawldb/etl_monitor/part-m-00000").map(_.split("\1"))
      .map( p =>  ( if (p.length==4) Model(Some(p(0)), Some(p(1)), Some(p(2)), Some(p(3).toLong))
    else if (p.length==3) Model(Some(p(0)), Some(p(1)), Some(p(2)),None)
    else if (p.length==2) Model(Some(p(0)), Some(p(1)),None,None)
    else   Model( Some(p(0)),None,None,None )
      )).toDF()//转换成DF
    //注册临时表
    model.registerTempTable("monitor")
    //执行sql查询
    val it = sqlContext.sql("SELECT rowkey,title,dtime FROM monitor  limit 10 ")
//    val it = sqlContext.sql("SELECT rowkey,title,dtime FROM monitor WHERE title IS  NULL AND dtime IS NOT NULL      ")
      println("开始")
      it.collect().take(8).foreach(line => println(line))
      println("结束")
    sc.stop();
  }


在IDEA的控制台,可以输出如下结果:

 

分享到:
评论

相关推荐

    sparksql打包运行demo

    使用IDE如IntelliJ IDEA或Eclipse创建一个新的Scala项目。在`build.sbt`(如果你使用SBT)或`pom.xml`(如果你使用Maven)文件中,添加SparkSQL及其依赖。例如,在SBT项目中,你可能会看到如下配置: ```scala ...

    Spark开发指南

    在安装和配置方面,介绍了JDK、Scala的安装配置,以及如何使用sbt、maven创建项目,并指导如何在Eclipse和IntelliJ IDEA中开发Spark应用。这些工具和方法能够帮助开发者搭建起一个稳定高效的Spark开发环境。 Spark ...

    Windows下idea运行spark程序相关的hadoop2.7.3插件工具 hadoop.dll winutils.exe

    为了在IntelliJ IDEA(简称IDEA)中顺利运行Spark程序,我们需要确保正确地配置了环境变量。首先,需要将解压后的Hadoop目录添加到系统的PATH环境变量中,以便Java可以找到`hadoop.dll`和`winutils.exe`。然后,需要...

    flink-1.14.4 安装包 scala 2.12

    4. **编译和运行**: 使用 SBT 或 IntelliJ IDEA 等 IDE 编译项目,然后通过 Flink 的命令行客户端提交作业到集群: ``` bin/flink run -m localhost:8081 path/to/your/compiled.jar ``` **Flink 的核心概念** ...

    gis geospark 环境搭建全家桶

    你可以通过Spark的命令行接口或者集成开发环境(如IntelliJ IDEA或Eclipse)提交你的应用到Spark集群。 在实际应用中,可能还需要安装其他工具,如PostGIS(用于存储和查询地理空间数据的数据库扩展)以及Jupyter ...

    SparkLocalTest:基于IDEA的Win10本地开发测试Spark IDE工程

    在Spark的开发环境中,IDEA(IntelliJ IDEA)是一个常用且强大的集成开发环境,尤其适合Scala编程。本文将深入探讨如何在Windows 10操作系统上利用IDEA进行Spark的本地开发与测试,以及"SparkLocalTest"项目的核心...

    spark 集群环境开发部署(hadoop,yarn,zookeeper,alluxio,idea开发环境)

    通过以上步骤,您可以成功搭建一套包括Spark Standalone/YARN模式、Hadoop、Zookeeper、Alluxio以及IDEA Scala开发环境在内的完整集群环境。这不仅有助于理解分布式系统的架构原理,还能够满足实际开发需求。在整个...

    winutils.exe+hadoop.dll

    4. **修改Spark配置**:在IDEA中,打开你的Spark项目,找到`pom.xml`(如果是Maven项目)或`build.sbt`(如果是Scala sbt项目),确保有适当的Hadoop依赖。在Spark的配置中,添加或修改`spark.master`属性,根据你的...

    big-data-ex

    6. **Spark开发环境**:通常使用IntelliJ IDEA或ScalaIDE等集成开发环境,配合Spark的构建工具SBT,可以方便地创建、编译和运行Spark项目。 7. **大数据生态**:除了Spark,大数据生态系统还包括Hadoop(分布式文件...

    SpatialSparkIndexer

    6. **开发环境**:为了运行这个项目,你需要安装 Scala 和 Spark,并配置好 Hadoop(如果需要存储和读取 HDFS 上的数据)。此外,确保你的开发环境中包含了所有必要的依赖,这些依赖通常通过 sbt(Scala 的构建工具...

Global site tag (gtag.js) - Google Analytics