`
windyrails
  • 浏览: 3268 次
  • 性别: Icon_minigender_1
  • 来自: 长沙
文章分类
社区版块
存档分类
最新评论

scala 开发spark程序

 
阅读更多

Spark内核是由Scala语言开发的,因此使用Scala语言开发Spark应用程序是自然而然的事情。如果你对Scala语言还不太熟悉,可以阅读网络教程A Scala Tutorial for Java Programmers或者相关Scala书籍进行学习。

 

本文将介绍3个Scala Spark编程实例,分别是WordCount、TopK和SparkJoin,分别代表了Spark的三种典型应用。

1. WordCount编程实例

WordCount是一个最简单的分布式应用实例,主要功能是统计输入目录中所有单词出现的总次数,编写步骤如下:

步骤1:创建一个SparkContext对象,该对象有四个参数:Spark master位置、应用程序名称,Spark安装目录和jar存放位置,对于Spark On YARN而言,最重要的是前两个参数,第一个参数指定为“yarn-standalone”,第二个参数是自定义的字符串,举例如下:

1
2
val sc = new SparkContext(args(0), "WordCount",
    System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR")))

步骤2:读取输入数据。我们要从HDFS上读取文本数据,可以使用SparkContext中的textFile函数将输入文件转换为一个RDD,该函数采用的是Hadoop中的TextInputFormat解析输入数据,举例如下:

1
val textFile = sc.textFile(args(1))

当然,Spark允许你采用任何Hadoop InputFormat,比如二进制输入格式SequenceFileInputFormat,此时你可以使用SparkContext中的hadoopRDD函数,举例如下:

1
2
val inputFormatClass = classOf[SequenceFileInputFormat[Text,Text]]
var hadoopRdd = sc.hadoopRDD(conf, inputFormatClass, classOf[Text], classOf[Text])

或者直接创建一个HadoopRDD对象:

1
2
var hadoopRdd = new HadoopRDD(sc, conf,
     classOf[SequenceFileInputFormat[Text,Text, classOf[Text], classOf[Text])

步骤3:通过RDD转换算子操作和转换RDD,对于WordCount而言,首先需要从输入数据中每行字符串中解析出单词,然后将相同单词放到一个桶中,最后统计每个桶中每个单词出现的频率,举例如下:

1
2
3
    val result = hadoopRdd.flatMap{
        case(key, value)  => value.toString().split("\\s+");
}.map(word => (word, 1)). reduceByKey (_ + _)

其中,flatMap函数可以将一条记录转换成多条记录(一对多关系),map函数将一条记录转换为另一条记录(一对一关系),reduceByKey函数将key相同的数据划分到一个桶中,并以key为单位分组进行计算,这些函数的具体含义可参考:Spark Transformation

步骤4:将产生的RDD数据集保存到HDFS上。可以使用SparkContext中的saveAsTextFile哈数将数据集保存到HDFS目录下,默认采用Hadoop提供的TextOutputFormat,每条记录以“(key,value)”的形式打印输出,你也可以采用saveAsSequenceFile函数将数据保存为SequenceFile格式等,举例如下:

1
result.saveAsSequenceFile(args(2))

当然,一般我们写Spark程序时,需要包含以下两个头文件:

1
2
import org.apache.spark._
import SparkContext._

WordCount完整程序已在“Apache Spark学习:利用Eclipse构建Spark集成开发环境”一文中进行了介绍,在次不赘述。

需要注意的是,指定输入输出文件时,需要指定hdfs的URI,比如输入目录是hdfs://hadoop-test/tmp/input,输出目录是hdfs://hadoop-test/tmp/output,其中,“hdfs://hadoop-test”是由Hadoop配置文件core-site.xml中参数fs.default.name指定的,具体替换成你的配置即可。

2. TopK编程实例

TopK程序的任务是对一堆文本进行词频统计,并返回出现频率最高的K个词。如果采用MapReduce实现,则需要编写两个作业:WordCount和TopK,而使用Spark则只需一个作业,其中WordCount部分已由前面实现了,接下来顺着前面的实现,找到Top K个词。注意,本文的实现并不是最优的,有很大改进空间。

步骤1:首先需要对所有词按照词频排序,如下:

1
2
3
val sorted = result.map {
  case(key, value) => (value, key); //exchange key and value
}.sortByKey(true, 1)

步骤2:返回前K个:

1
val topK = sorted.top(args(3).toInt)

步骤3:将K各词打印出来:

1
topK.foreach(println)

注意,对于应用程序标准输出的内容,YARN将保存到Container的stdout日志中。在YARN中,每个Container存在三个日志文件,分别是stdout、stderr和syslog,前两个保存的是标准输出产生的内容,第三个保存的是log4j打印的日志,通常只有第三个日志中有内容。

本程序完整代码、编译好的jar包和运行脚本可以从这里下载。下载之后,按照“Apache Spark学习:利用Eclipse构建Spark集成开发环境”一文操作流程运行即可。

3. SparkJoin编程实例

在推荐领域有一个著名的开放测试集是movielens给的,下载链接是:http://grouplens.org/datasets/movielens/,该测试集包含三个文件,分别是ratings.dat、sers.dat、movies.dat,具体介绍可阅读:README.txt,本节给出的SparkJoin实例则通过连接ratings.dat和movies.dat两个文件得到平均得分超过4.0的电影列表,采用的数据集是:ml-1m。程序代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import org.apache.spark._
import SparkContext._
object SparkJoin {
  def main(args: Array[String]) {
    if (args.length != 4 ){
      println("usage is org.test.WordCount <master> <rating> <movie> <output>")
      return
    }
    val sc = new SparkContext(args(0), "WordCount",
    System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR")))
 
    // Read rating from HDFS file
    val textFile = sc.textFile(args(1))
 
    //extract (movieid, rating)
    val rating = textFile.map(line => {
        val fileds = line.split("::")
        (fileds(1).toInt, fileds(2).toDouble)
       })
 
    val movieScores = rating
       .groupByKey()
       .map(data => {
         val avg = data._2.sum / data._2.size
         (data._1, avg)
       })
 
     // Read movie from HDFS file
     val movies = sc.textFile(args(2))
     val movieskey = movies.map(line => {
       val fileds = line.split("::")
        (fileds(0).toInt, fileds(1))
     }).keyBy(tup => tup._1)
 
     // by join, we get <movie, averageRating, movieName>
     val result = movieScores
       .keyBy(tup => tup._1)
       .join(movieskey)
       .filter(f => f._2._1._2 > 4.0)
       .map(f => (f._1, f._2._1._2, f._2._2._2))
 
    result.saveAsTextFile(args(3))
  }
}

你可以从这里下载代码、编译好的jar包和运行脚本。

这个程序直接使用Spark编写有些麻烦,可以直接在Shark上编写HQL实现,Shark是基于Spark的类似Hive的交互式查询引擎,具体可参考:Shark

4. 总结

Spark 程序设计对Scala语言的要求不高,正如Hadoop程序设计对Java语言要求不高一样,只要掌握了最基本的语法就能编写程序,且常见的语法和表达方式是很少的。通常,刚开始仿照官方实例编写程序,包括Scala、Java和Python三种语言实例。

分享到:
评论

相关推荐

    scala开发spark代码

    总结来说,这个压缩包提供了Scala开发Spark应用程序的实践代码,涵盖了Spark Core的基本操作、Spark SQL的结构化查询、Spark Streaming的实时处理,以及Spark RDD的使用。通过深入研究这些代码,你可以掌握Spark的...

    上手提示:使用新版IDEA+Maven+Scala编写Spark程序

    ### 使用新版IDEA+Maven+Scala编写Spark程序的关键步骤 #### 一、理解IDEA在2017版中的变化 2017版的IntelliJ IDEA(简称IDEA)相较于之前版本,在界面设计与操作流程上进行了较大的改进与优化。对于初次接触这个...

    scala与spark基础

    Scala是一种多范式编程语言,以其强大的函数式编程特性而受到欢迎,尤其在大数据处理领域,它作为Apache Spark的主要编程语言,使得开发人员能够构建高性能、可扩展的数据处理应用程序。 Scala融合了面向对象和函数...

    sparkscala开发依赖包

    5. 编写和运行代码:使用Scala编写Spark程序,然后通过ECLIPSE的运行配置来启动Spark集群并提交作业。 总之,Spark Scala开发依赖包对于在ECLIPSE中进行大数据处理项目至关重要。正确配置这些依赖将使开发者能够在...

    spring boot + scala + spark http驱动spark计算

    标题中的“spring boot + scala + spark http驱动spark计算”揭示了一个使用现代技术栈构建的数据处理系统。这个系统基于Spring Boot框架来提供HTTP服务,利用Scala作为编程语言,并借助Apache Spark进行大数据计算...

    scala与spark文档合集

    7. **Spark Shell**:交互式的命令行界面,方便用户直接在集群上测试和开发Spark应用程序。 这些文档合集包含了Scala的基础教程、进阶内容和语言规范,以及Spark的开发基础和快速数据处理的实践指南,对于学习和...

    Scala Spark Bindings.pdf

    总体来说,Scala Spark Bindings文档不仅提供了对Mahout在Scala和Spark环境下操作的指导,也展示了如何利用Scala和Spark强大的计算和编程能力来处理复杂的线性代数问题,这对于在大数据环境下进行快速开发和数据处理...

    scala and spark for big data analytics

    Spark的高层次API使得开发复杂的数据分析应用变得简单。 - 复杂数据处理:它们提供了丰富的数据处理功能,能够处理包括结构化、半结构化和非结构化数据在内的多种类型数据。 - 社区支持:Scala和Spark拥有活跃的社区...

    java开发spark程序

    本篇文章将深入探讨如何使用Java开发Spark程序,并基于提供的"sparkJava"压缩包文件中的示例代码进行解析。 首先,我们需要理解Spark的核心概念。Spark主要由四个组件构成:Spark Core、Spark SQL、Spark Streaming...

    最新整理的大数据scala和spark视频教程

    同时,你会深入到Spark的世界,学习如何创建Spark应用程序,理解RDD(弹性分布式数据集)、DataFrame和DataSet的概念,以及如何在Spark SQL中进行查询操作。此外,你还将接触到Spark Streaming的基本用法,了解如何...

    Spark和Scala合集

    Spark和Scala是大数据处理领域中的重要工具,它们的结合为数据科学家和开发人员提供了高效、灵活的解决方案。本文将深入探讨这两个技术,并结合其他相关概念,如Python、Hadoop以及机器学习,帮助读者构建全面的...

    Linux_64bit_scala+spark.zip

    安装完成后,可以通过命令行界面启动Spark,进行数据处理或开发应用程序。例如,通过运行`bin/spark-shell`进入Scala的REPL(Read-Eval-Print Loop),或者用`bin/pyspark`启动Python交互式环境。 在大数据分析和...

    基于scala语言的spark操作,包含连接操作mysql,连接hdfs.zip

    在 Spark 中,Scala API 提供了与 Spark 库紧密集成的接口,使得开发者能够编写高性能的大数据处理程序。 **2. Apache Spark 概述** Spark 是一个用于大规模数据处理的开源集群计算框架,提供了内存计算以加速数据...

    java+hadopp+scala+spark配置win10版

    6. **运行和测试**:配置完成后,你可以通过编写简单的Java或Scala程序,利用Hadoop和Spark处理本地文件或模拟集群进行测试。例如,使用WordCount示例来验证配置是否正确。 这个压缩包中的Word文档可能包含了配置...

    Big Data Frameworks Scala and Spark Tutorial

    ### Big Data Frameworks: Scala...Scala的简洁语法和丰富的功能集合使其成为编写Spark应用程序的理想选择。通过学习这些工具和技术,开发者能够更有效地处理和分析大规模数据集,从而在数据科学和大数据领域取得成功。

    idea开发spark程序的环境搭建.docx

    idea开发spark程序的环境搭建 在本篇文章中,我们将详细介绍如何使用 IDEA 开发 Spark 程序的环境搭建。我们将使用 IDEA、Maven 和 Scala 三者结合,来实现 Spark 程序的开发环境搭建。 首先,需要安装 IDEA。这里...

    scala&spark;-v4.0

    对于初学者来说,安装Scala开发环境是一个重要的步骤。下面介绍一种简单的安装方式: 1. **下载与安装ScalaShell**:ScalaShell是一个交互式的开发工具,可以帮助开发者快速测试Scala代码。可以从官网下载适合...

    Scala and Spark for Big Data Analytics (True PDF)

    Scala的这些特性让它成为开发Spark应用程序的首选语言。 2. **Apache Spark核心概念**:Spark是一个分布式数据处理框架,它的核心是一个高度抽象的API,允许开发者在内存中处理数据,以达到比传统大数据处理框架更...

Global site tag (gtag.js) - Google Analytics