目录
Spark开发指南
简介
总的来说,每一个Spark应用程序,都是由一个驱动程序组成,它运行用户的main函数,并且在一个集群上执行各种各样的并行操作。Spark提 供的主要的抽象(概念)是一个弹性分布式数据集,它是一个元素集合,划分到集群的不同节点上,可以被并行操作。RDDs的创建可以从Hadoop文件系统 (或者任何支持Hadoop的文件系统)上的一个文件开始,或者通过转换这个驱动程序中已存在的Scala集合而来。用户也可以使Spark持久化一个 RDD到内存中,使其能在并行操作中被有效的重用。最后,RDDs能自动从节点故障中恢复。
Spark中的第二个抽象(概念)是共享变量,他可以在并行操作中使用。默认情况下,Spark通过不同节点上的一系列任务来并行运行一个函数。他 将每一个函数中用的到变量的拷贝传递到每一个任务中。有时候,一个变量需要在不同的任务之间,或者任务和驱动程序之间共享。Spark支持两种类型的共享 变量:广播变量,可以再所有节点的内存中缓存一个值,累加器,一个只能做加法的变量,例如计数器和求和。
本指南通过每一种Spark支持的语言来展示Spark的每个特性。It is easiest to follow along with if you launch Spark’s interactive shell – either bin/spark-shell for the Scala shell or bin/pyspark for the Python one.
接入Spark
Java
Spark1.0.2工作在Java6或者java6以后之上。如果你在使用Java8,Spark支持lamdba表达式来简化函数编写,否则,你可以使用org.apache.spark.api.java.function 包下的类。
用Java编写Spark应用,你需要添加Spark的依赖,Spark可以通过Maven Central使用:
groupId=org.apache.spark
artifactId=spark-core_2.10
version=1.0.2
另外,如果你想访问一个HDFS集群,你需要根据你的HDFS版本添加一个hadoop-client依赖。一些常用的HDFS版本标签显示在页面。
groupId=org.apache.hadoop
artifactId=hadoop-client
version=
最后,你需要在你的程序中导入一些Spark类,通过添加如下几行:
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.SparkConf
初始化Spark
Java
Spark程序需要做的第一件事就是创建一个JavaSparkContext对象 ,它将告诉Spark怎样访问一个集群。创建一个SparkContext,你首先必须创建SparkConf对象,它包含关于你的应用程序的信息。
SparkConf conf=new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc=new JavaSparkContext(conf);
appName参数是你的应用程序的名字,将会在集群的UI上显示。master是Spark、Mesos、或者YARN 集群URL,或者一个专用的字符串”local“使其在本地模式下运行。在实践中,当运行在一个集群上,你将不会想要把master硬编码到程序中,而是 通过使用spark-submit运行程序并且接受master。但是,在本地测试或者单元测试中,你可以传递”local“在进程内运行Spark。
弹性分布式数据集
Spark反复围绕的一个概念是弹性分布式数据集。它是一个有容错机制的元素集合,并且可以被并行操作。有两种创建RDDs的方法。并行化你的驱动 程序中已存在的集合,或者引用一个外部存储系统的数据集,例如一个共享文件系统,HDFS、HBase、或者任何可以提供一个Hadoop InputFormat的数据源。
并行集合
并行集合通过调用JavaSparkContext的parallelize方法,在你的驱动程序中已存在的Collection上创建。集合的元素将会拷贝组成一个可以被并行操作的分布式数据集。例如,下面是如何创建一个包含数字1到5的并行集合:
List data=Arrays.asList(1,2,3,4,5);
JavaRDD distData=sc.parallelize(data);
一旦创建,分布式数据集(distData)就可以并行操作。例如,我们可以调用 distData.reduce((a,b)->a+b)来将列表中的元素相加。我们稍后将会在分布式数据集的操作中描述。
注意:在这个指南中,我们经常使用简洁的Java8 lamdba语法来定义java functions,但是在老的Java版本中,你可以实现org.apache.spark.api.java.function包中的接口。我们将会 在下面详细描述passing functions to Spark。
并行集合的另一个重要的参数是数据集被切分成切片(slices)的数量。Spark将会为集群中的每一个slice运行一个task。通常情况 下,你要为集群中的每个CPU 2-4个slice。通常,Spark会尝试根据你的集群自动设置slice的数量。然而,你可以手动的设置它,把它作为第二个参数传递给 parallelize(例如:sc.parallelize(data,10)).
外部数据集
Spark可以通过任何Hadoop支持的存储源创建分布式数据集。包括你的本地文件系 统,HDFS,Cassandra,HBase,Amazon S3等等。Spark支持text files(文本文件),SequenceFiles(序列化文件),和任何其他的Hadoop InputFormat(输入格式)。
Text file 可以通过使用SparkContext的textFile方式创建。这个方法接受一个文件的URI(或者机器上的一个本地路径,或者hdfs://,s3n:// 等URI)并且把这个文件读成一个行的集合。下面是一个调用的例子:
JavaRDD distFile=sc.textFile(“data.txt”);
一旦创建,distFile可以被进行数据集操作。例如:我们可以通过使用map和reduce将所有数据行的长度相加.例如:distFile.map(s->s.length()).reduce((a,b)->(a+b)).
Spark读文件时的一些注意事项:
- 如果使用本地文件系统上的路径,
- Spark的所有基于文件的输入方法,包括textFile,支持运行目录,压缩文件盒通配符。例如,你可以食用textFile(“/my/directory/“),textFile(“/my/directory/.txt”),和textFile(“/my/directory/.gz”)
- textFile方法也可以接受一个可选的第二参数来控制这个文件的slice数目。默认情况下,Spark为每一个文件创建一个 slice(HDFS中block默认为64MB)。但是你可以通过传递一个较大的值来指定一个跟高的slice值。注意你的slice数不能小于 block数。
除了文本文件,Spark的Java API 也支持集中其他数据格式。
- JavaSparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with textFile, which would return one record per line in each file.
- 对于序列化文件(SequenceFiles),使用SparkContext的sequenceFile[K,V],K和V是文件中key和value的类型。他们必须是Hadoop的Writeable接口的子类,像IntWriteable和Text。
- 对于其他的Hadoop输入格式,你可以使用JavaSparkContext.hadoopRDD方法。它可以接受任意(类型)的 JobConf和输入格式类,key类和value类。按照像Hadoop Job一样,来设置输入源就可以了。你也可以为InputFormats使用JavaSparkContext.newHadoopRDD,基 于”new“MapReduce API(org.apache.hadoop.mapreduce).
- JavaRDD.saveAsObjectFile 和JavaContext.objectFile支持以一种由Java对象序列化组成的简单的格式保存RDD。虽然这不是有效地专门的格式向Avro,但是它提供了一个简单的方式存储RDD。
RDD操作
RDDs支持两种类型的操作:转换(transformations),它从一个现有的数据集创建一个新的数据集。动作(actions),它在数 据集上运行计算后,返回一个值给驱动程序。例如:map就是一个转换,它将数据集的每一个元素传递给一个函数,并且返回一个新的RDD表示结果。另一方 面,reduce是一个动作,他通过一些行数将一些RDD的所有元素聚合起来,并把最终的结果返回给驱动程序(不过还有一个并行的 reduceByKey,它返回一个分布式数据集)。
Spark中的所有转换都是惰性的,也就是说,他们不会立即计算出结果。相反,他们只是记住应用到这些基础数据集(例如file)上的转换。只有当 发生一个需要返回一个结果给驱动程序的动作时,这些转换才真正执行。这样的设计使得Spark运行更加高效——例如,我们可以实现,通过map创建一个数 据集,并在reduce中使用,最终只返回reduce的结果给驱动程序,而不是整个大的新数据集。
默认情况下,每一个转换过的RDD都会在你在它上面运行一个action时重新计算。然而,你也可以使用persist方法(或者cache)持久 化一个RDD到内存中。在这种情况下,Spark将会在集群中,保存相关元素,下次你访问这个RDD时,它将能够更快速访问,。在磁盘上持久化数据集,或 者在集群间复制数据集也是支持的。
基本操作
为了说明RDD基础,考虑下面的简单的程序:
JavaDDD lines=sc.textFile(“data.txtt”);
JavaRDD lineLengths=lines.map(s->s.length());
int totalLength=lineLengths.reduce((a,b)->a+b);
第一行通过一个外部文件定义了一个基本的RDD。这个数据集未被加载到内存,也未在上面执行动作。lines仅仅是这个文件的一个指针。第二行定义 了lineLengths作为map转换的结果。此外,lineLengths因为惰性没有立即计算。最后,我们运行reduce,他是一个 action。这时候,Spark将这个计算拆分成不同的task,并使其运行在独立的机器上,并且每台机器运行它自己的map部分和本地的 reducation,仅仅返回他的结果给驱动程序。
如果我们想在以后重复使用lineLengths,我们可以添加:
lineLengths.persist();
在reduce之前,这将导致lineLengths在第一次被计算之后被保存在内存中。
传递Functions到Spark
Spark的API,在很大程度上依赖于传递函数使其驱动程序在集群上运行。在Java中,函数有实现了org.apache.spark.api.java.function包中接口的类表示。有两种创建这样的函数的方式:
- 在你自己的类中实现Function接口,可以是匿名内部类,后者命名类,并且你要传递他的一个实例到Spark
- 在Java8中,使用lamdba表达式来简洁的定义一种实现
为了简洁起见,本指南中的大多数使用lamdba语法,它易于使用,所有的APIs in long-form,例如,我们可以编写上面的代码如下:
1
2
3
4
5
6
7
|
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(newFunction<String, Integer>() {
publicInteger call(String s) {returns.length(); }
});
inttotalLength = lineLengths.reduce(newFunction2<Integer, Integer, Integer>() {
publicInteger call(Integer a, Integer b) {returna + b; }
});
|
或者,如果编写内联函数显得很笨拙:
1
2
3
4
5
6
7
8
9
10
|
classGetLengthimplementsFunction<String, Integer> {
publicInteger call(String s) {returns.length(); }
}
classSumimplementsFunction2<Integer, Integer, Integer> {
publicInteger call(Integer a, Integer b) {returna + b; }
}
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(newGetLength());
inttotalLength = lineLengths.reduce(newSum());
|
Note that anonymous inner classes in Java can also access variables in the enclosing scope as long as they are marked final. Spark will ship copies of these variables to each worker node as it does for other languages
Wroking with Key-Value Pairs使用键/值对工作
虽然大多数Spark操作工作在包含各种类型的对象的RDDs之上,一些特殊的操作仅仅能够使用包含key-value对的RDDs。最常见的操作之一是分布式”shuffle“操作,例如通过key分组或者聚合元素。
在Java中,key-value对使用scala标准包下的scala Tuple2类表示。你可以简单的调用new Tuple2(a,b)去创建一个tuuple,并且通过tuple.1()和tuple.2()访问它的字段。
key-value对的RDDs通过JavaPairRDD表示。你可以通过JavaRDDs构建JavaPairRDDs,使用指定的map操作 版本,像mapToPair和flatMapToPair。JavaPair将不仅拥有标准RDD函数,并且有特殊的key-value函数。
例如,下面的代码在key-value对上使用reduceByKey操作来计算在一个文件中每行文本出现的次数和。
JavaRDD lines=sc.textFile(“data.txt”);
JavaPairRDD pairs=lines.mapToPair(s->new Tuple2(s,1))
JavaPairRDD counts=pairs.reduceByKey((a,b)->a+b);
我们也可以使用counts.sortByKey(),例如,按照字母顺序排序这个键值对。并且最后调用counts.collect()作为一个对象数组返回给驱动程序。
注意:当使用自定义的对象作为key-value对操作的key时,你必须确保自定义equals()方法伴随着一个匹配的hashCode()方法。有关详情,参考 Object.hashCode() 文档大纲中列出的规定。
转换
下面的表格列出了Spark支持的常见的转换。更多信息可以参考RDD API 文档和pair RDD 函数文档。
动作
下面的表格列出了Spark支持的常见的动作。更多信息可以参考RDD API 文档和pair RDD 函数文档。
RDD持久化
Spark最重要的一个功能是在不同的操作间,持久化(或者缓存)一个数据集到内存中。当你持久化一个RDD时,每一个节点都把它计算的分片结果保 存在内存中,并且在对此数据集(或者衍生出的数据集)进行其他动作时重用。这将使后续的动作变得更快(通过快109倍以上)。缓存是(Spark)迭代算 法和快速交互使用的关键工具。
你可以使用persist()和cache()方法来标记一个将要持久化的RDD。第一次他被一个动作进行计算,他将会保留在这个节点的内存中。Spark的缓存有容错性-如果RDD的任何一个分区丢失了,他会通过使用最初创建的它转换操作,自动重新计算。
此外,每一个持久化RDD可以使用不同的存储级别存储。允许你,例如,持久化数据集到磁盘,持久化数据集到内存作为序列化的Java对象(节省空 间),跨节点复制,或者 store it off-heap in Tachyon。这些级别通过传递一个StorageLevel对象(Scala,Java,Python)到persist()来设置。cache() 方法是使用默认存储级别的快捷方法,即StorageLevel.MEMORY_ONLY(存储反序列化对象到内存),完整的存储级别设置为:
Spark也会在shuffle操作(例如,reduceByKey)中自动的持久化一些中间数据。甚至当用户未调用persist方法。这样做是 为了阻止在进行shuffle操作时由于一个节点故障而重新计算整个输入。我们依然推荐用户在作为结果的RDD上调用persist如果想打算重用它。
存储级别的选择
移除数据
Spark自动监视每一个节点上的缓存使用,并且使用LRU方式删除老的数据分区。如果你想手工的删除yige RDD而不是等他自动从缓存中清除,使用RDD.unpersist()方法。
相关推荐
### Spark开发指南知识点详解 #### 一、Spark简介与特性 **Spark** 是一个高度优化且功能丰富的集群计算框架,其核心优势在于基于内存的数据处理能力,这使得它相较于传统的 MapReduce 框架有着显著的性能提升。...
Spark程序在CDH6.3.2环境下开发并运行
安全认证接口 CQL开发指南 HBase开发指南 HDFS开发指南 Hive开发指南 Kafka开发指南 MapReduce开发指南 Oozie开发指南 Redis开发指南 Solr开发指南 Spark开发指南 Storm开发指南 YARN开发指南
7. **MLlib**:Spark的机器学习库MLlib提供了多种机器学习算法,包括分类、回归、聚类、协同过滤等,同时还有模型选择和评估工具,方便开发人员构建和优化机器学习模型。 8. **Spark Shuffle**:Shuffle是Spark中...
### Spark开发及本地环境搭建指南 #### 构建本机上的Spark开发环境 在构建Spark开发环境时,首先需要确保你的计算机上安装了必要的软件,包括但不限于Linux操作系统、Java Development Kit (JDK)、Scala、Maven...
Spark API和开发指南的知识点涵盖了如何使用Spark API进行视频平台的功能对接,包括API的基本使用条件、远程通信的协议和约定、接口的详细使用说明以及上传视频的具体操作流程。以下是对这些知识点的详细阐述: 1. ...
### Spark开发及本地环境搭建指南 #### 一、构建本机开发环境 为了高效地进行Spark的开发工作,首先需要构建一个稳定且高效的本地开发环境。以下是一些关键步骤: 1. **操作系统的选择**:推荐使用Linux操作系统...
- Spark性能优化指南将优化方案分为四个主要部分:开发调优、资源调优、数据倾斜调优和shuffle调优。 3. **开发调优**: - 开发调优关注在Spark作业开发过程中应遵循的一些基本原则。 - 开发调优的关键点包括RDD...
对于开发人员来说,Scala和Java API是Spark的主要编程接口。文档会涵盖如何创建和操作RDD,使用DataFrame和Dataset进行更高级的数据处理,以及如何使用Spark SQL进行SQL查询。DataFrame和Dataset提供了更丰富的类型...
本压缩包包含四本电子书,分别是《Spark SQL入门与实践指南》、《Hadoop权威指南(中文第3版)》、《Hadoop源代码分析(完整版)》以及《Spark快速大数据分析》,旨在帮助读者深入理解这两个平台的核心技术和应用。...
总的来说,Spark编程指南涵盖了Spark的核心组件和使用方法,包括RDD的创建和操作、Spark Streaming的使用,以及Spark SQL和GraphX的应用,为开发者提供了全面的Spark开发入门指导。通过学习和实践这些内容,可以深入...
**Spark SQL编程指南** Spark SQL是Apache Spark的一个重要模块,专为处理结构化数据而设计。它是Apache Spark的原生SQL接口,允许开发者使用SQL或DataFrame/Dataset API进行数据分析。在Spark SQL中,数据可以被...
Spark是一个开源的大数据处理框架,由加州大学伯克利分校的AMP实验室开发。它诞生于2009年,是基于内存计算的分布式数据处理平台,提供了一个快速、通用的计算引擎。Spark支持多种编程语言,如Java、Scala、Python和...
《Spark SQL大数据实例开发教程》是一本专注于Spark SQL学习的指南,由王家林和祝茂农等人编著。本书旨在帮助企业级开发人员深入理解和掌握Spark SQL,它在Spark生态系统中扮演着至关重要的角色,是处理大规模数据的...
Spark性能优化是一个重要的大数据处理话题。在处理大规模数据集时,性能优化直接关系到任务的执行效率,尤其在资源有限的情况下。本文将着重于介绍Spark中的高级性能优化技巧,特别是针对数据倾斜和shuffle过程的...
本文档提供了使用Kafka和Spark Streaming进行实时数据处理的详细开发指南,涵盖了Kafka集群搭建、Spark Streaming配置、Kafka和Spark Streaming的集成、主题创建和消息发送、查看主题状态等内容,旨在帮助开发者快速...
Spark粒子开发包是一款专为游戏开发和视觉效果设计的高性能粒子系统。Spark Particle Engine是一个强大的工具,用于创建复杂的、交互式的粒子效果,如火、烟雾、水、光束等,广泛应用于游戏、电影和电视制作。这个...