
Spark Programming Guide

 

 

RDDs: Concepts, Creation, and Operations

There are two ways to create RDDs (Resilient Distributed Datasets):

By parallelizing an existing collection in your driver program:

 

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

Or by referencing a dataset in an external storage system, such as HDFS, HBase, or any data source offering a Hadoop InputFormat.

 

 

Text file RDDs can be created using SparkContext's textFile method. This method takes a URI for the file (either a local path on the machine, or a hdfs://, s3n://, etc. URI) and reads it as a collection of lines. Here is an example invocation:

scala> val distFile = sc.textFile("data.txt")
distFile: RDD[String] = MappedRDD@1d4cee08

 

Some notes on reading files with Spark:

  • If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.

  • All of Spark's file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").

  • The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 64MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.
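
As a small, hedged sketch of that second argument, reusing the hypothetical data.txt and the existing SparkContext sc from the examples above:

// Request at least 8 partitions for the file; Spark may create more,
// but never fewer partitions than the file has blocks.
val distFile8 = sc.textFile("data.txt", 8)
println(distFile8.partitions.length)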

RDDs support two types of operations: transformations and actions.

Transformations create a new dataset from an existing one.

Actions run a computation on the dataset and return a value to the driver program.

All transformations are lazy: they do not compute their results right away. The transformations are only computed when an action requires a result to be returned to the driver program.

By default, each transformed RDD may be recomputed every time you run an action on it. However, you can call the persist (or cache) method to keep the computed elements around, so that future queries on them are much faster.
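
A minimal sketch of this behavior, assuming an existing SparkContext sc and the same hypothetical data.txt (the variable names are illustrative):

val lines = sc.textFile("data.txt")                    // transformation: nothing is read yet
val lineLengths = lines.map(s => s.length)             // transformation: still lazy
lineLengths.persist()                                  // keep the computed lengths around after the first action
val totalLength = lineLengths.reduce((a, b) => a + b)  // action: this is where the work actually happens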

Transformations

The following table lists some of the common transformations supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R) and pair RDD functions doc (Scala, Java) for details.

Transformation Meaning
map(func) Return a new distributed dataset formed by passing each element of the source through a function func.
filter(func) Return a new dataset formed by selecting those elements of the source on which func returns true.
flatMap(func) Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
mapPartitions(func) Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
mapPartitionsWithIndex(func) Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.
sample(withReplacement, fraction, seed) Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.
union(otherDataset) Return a new dataset that contains the union of the elements in the source dataset and the argument.
intersection(otherDataset) Return a new RDD that contains the intersection of elements in the source dataset and the argument.
distinct([numTasks]) Return a new dataset that contains the distinct elements of the source dataset.
groupByKey([numTasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. 
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance. 
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.
reduceByKey(func, [numTasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
sortByKey([ascending], [numTasks]) When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
join(otherDataset, [numTasks]) When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
cogroup(otherDataset, [numTasks]) When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.
cartesian(otherDataset) When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
pipe(command, [envVars]) Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings.
coalesce(numPartitions) Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
repartition(numPartitions) Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
repartitionAndSortWithinPartitions(partitioner) Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.
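
To make a few of the rows above concrete, here is a short, hedged sketch that chains several of these transformations (sc is an existing SparkContext; the data is made up):

val words = sc.parallelize(Seq("spark is fast", "spark is simple"))
  .flatMap(line => line.split(" "))            // flatMap: each line becomes several words
val pairs = words.map(word => (word, 1))       // map: word -> (word, 1)
val counts = pairs.reduceByKey(_ + _)          // reduceByKey: sum the counts for each word
val longWords = words.filter(_.length > 4)     // filter: keep only words longer than 4 characters

All of these are lazy; nothing is computed until an action is called on counts or longWords.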

Actions

The following table lists some of the common actions supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R) and pair RDD functions doc (Scala, Java) for details.

Action Meaning
reduce(func) Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
collect() Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
count() Return the number of elements in the dataset.
first() Return the first element of the dataset (similar to take(1)).
take(n) Return an array with the first n elements of the dataset.
takeSample(withReplacement, num, [seed]) Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
takeOrdered(n, [ordering]) Return the first n elements of the RDD using either their natural order or a custom comparator.
saveAsTextFile(path) Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
saveAsSequenceFile(path) (Java and Scala) Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
saveAsObjectFile(path) (Java and Scala) Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().
countByKey() Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
foreach(func) Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems. 
Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.
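
And a corresponding sketch exercising a few of the actions above (again assuming an existing SparkContext sc; the output path is illustrative):

val nums = sc.parallelize(1 to 10)
val sum = nums.reduce(_ + _)           // reduce: 55
val n = nums.count()                   // count: 10
val firstThree = nums.take(3)          // take: Array(1, 2, 3)
nums.saveAsTextFile("/tmp/nums-out")   // writes one text file per partition under this directory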

 

Spark Local Mode vs. Cluster Mode

In local mode, Spark runs inside a single JVM, so the RDDs and variables all live in the same memory space on the same node.

In cluster mode, when a job is run, the RDD operations are broken into tasks, and each task is executed by an executor.


 

 

Shuffle operations

The process of gathering data from all partitions and combining it to compute a result for each key is called a shuffle.

After a shuffle, the set of elements in each partition is deterministic, and so is the ordering of the partitions themselves, but the elements within a partition are not ordered. If you need the shuffled data to be ordered, you can use one of the following:

 

  • mapPartitions to sort each partition using, for example, .sorted
  • repartitionAndSortWithinPartitions to efficiently sort partitions while simultaneously repartitioning
  • sortBy to make a globally ordered RDD

Operations that can cause a shuffle include repartition operations such as repartition and coalesce, 'ByKey operations (except for counting) such as groupByKey and reduceByKey, and join operations such as cogroup and join.
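
As a hedged sketch of both points: the reduceByKey below triggers a shuffle, and sortBy (one of the options listed above) is then used to obtain a globally ordered result (the data and names are illustrative):

val pairs = sc.parallelize(Seq(("b", 2), ("a", 1), ("c", 3)))
val summed = pairs.reduceByKey(_ + _)   // 'ByKey operation: shuffles the values for each key into one partition
val ordered = summed.sortBy(_._1)       // sortBy: produces a globally ordered RDD (itself a shuffle)
ordered.collect()                       // Array((a,1), (b,2), (c,3))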

 

Performance Impact

 

RDD Persistence

 You can mark an RDD to be persisted using the persist() or cache() methods on it. 

 In addition, each persisted RDD can be stored using a different storage level, allowing you, for example, to persist the dataset on disk, persist it in memory but as serialized Java objects (to save space), or replicate it across nodes.

 

import org.apache.spark.storage.StorageLevel   // needed for the StorageLevel constants
textRDD.persist(StorageLevel.MEMORY_AND_DISK)  // textRDD is an RDD defined elsewhere

 

 The full set of storage levels is:

Storage Level Meaning
MEMORY_ONLY Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.
MEMORY_AND_DISK Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.
MEMORY_ONLY_SER Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
MEMORY_AND_DISK_SER Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.
DISK_ONLY Store the RDD partitions only on disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. Same as the levels above, but replicate each partition on two cluster nodes.
OFF_HEAP (experimental) Store RDD in serialized format in Tachyon. Compared to MEMORY_ONLY_SER, OFF_HEAP reduces garbage collection overhead and allows executors to be smaller and to share a pool of memory, making it attractive in environments with large heaps or multiple concurrent applications. Furthermore, as the RDDs reside in Tachyon, the crash of an executor does not lead to losing the in-memory cache. In this mode, the memory in Tachyon is discardable. Thus, Tachyon does not attempt to reconstruct a block that it evicts from memory. If you plan to use Tachyon as the off heap store, Spark is compatible with Tachyon out-of-the-box. Please refer to this page for the suggested version pairings.

Note: In Python, stored objects will always be serialized with the Pickle library, so it does not matter whether you choose a serialized level.

Spark also automatically persists some intermediate data in shuffle operations (e.g. reduceByKey), even without users calling persist. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call persist on the resulting RDD if they plan to reuse it.

Which Storage Level to Choose?

Removing Data

Spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion. If you would like to manually remove an RDD instead of waiting for it to fall out of the cache, use the RDD.unpersist() method.
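
For example, assuming the textRDD persisted earlier in this section:

textRDD.unpersist()   // drop the cached partitions of this RDD from memory and disk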

 

Shared Variables

Spark provides two kinds of shared variables: broadcast variables and accumulators.

Broadcast Variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
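
A hedged sketch of how the broadcast value is typically consumed: tasks read broadcastVar.value instead of capturing the underlying array directly in their closures (the RDD below is illustrative):

val indexed = sc.parallelize(Seq(0, 1, 2)).map(i => broadcastVar.value(i))  // each task reads the cached broadcast value
indexed.collect()  // expected: Array(1, 2, 3)
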
 Accumulators are variables that are only “added” to through an associative operation and can therefore be efficiently supported in parallel. 
scala> val accum = sc.accumulator(0, "My Accumulator")
accum: spark.Accumulator[Int] = 0

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Int = 10

 

Deploying to a Cluster

Launching Spark jobs from Java / Scala

Unit Testing

Migrating from pre-1.0 Versions of Spark

Where to Go from Here

 

Official documentation:

Spark Programming Guide;

 
