`
bit1129
  • 浏览: 1077318 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark】Spark二:Spark RDD初步

 
阅读更多

什么是RDD

Spark是围绕着RDD(Resilient Distributed Dataset,弹性分布式数据集)建立起来的,也就是说,RDD是Spark框架的核心基石。RDD是一个可容错的数据集,这个数据集合中的数据是可以并行处理的。

 

RDD的特点:

  • A list of partitions 一系列的分片,比如说64M一片;类似于Hadoop中的split;
  • A function for computing each split   在每个分片上都有一个函数去迭代/执行/计算它
  • A list of dependencies on other RDDs  一系列的依赖:RDD a转换为RDD b,RDD b转换为RDD c,那么RDD c就依赖于RDD b,RDDb就依赖于RDDa
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)  对于key-value的RDD可指定一个partitioner,告诉它如何分片;常用的有hash,range
  • Optionally, a list of preferred location(s) to compute each split on (e.g. block locations for an HDFS file) 要运行的计算/执行最好在哪(几)个机器上运行。数据本地性。

前三个特点对应于Lineage,后两个对应于Optimized execution

对于如上的5个特点,对应于RDD中的5个方法
getPartitions the set of partitions in this RDD
compute compute a given partition
getDependencies return how this RDD depends on parent RDDs
partitioner specify how they are partitioned
getPreferredLocations specify placement preferences
 
 
 
 
 
  HadoopRDD Filtered RDD JoinedRDD
partitions HDFS上的block 与父RDD一致 一个partition一个任务
dependencies 与父RDD 一对一 依赖shuffle的每个父RDD
compute 读取每个block的信息 计算父RDD的每个分区并过滤 读取shuffle数据
partitioner HDFS block所在位置 HashPartitioner
preferredLocations 无(与父RDD一致)
 

参考:http://www.cnblogs.com/luogankun/p/3801035.html

 

Spark Shell

Spark Shell通过初始化一个SparkContext,然后通过Scala语言的脚本特性,可以以脚本的方式来学习Spark提供的API,通过这一点也可以看出Scala确实是有比Java方便简洁的特性。如下是Spark Shell的支持的命令参数



 

RDD(弹性分布式数据集,Resilent Distributed Dataset)

Spark是围绕着RDD(Resilient Distributed Dataset,弹性分布式数据集)建立起来的,也就是说,RDD是Spark框架的核心基石。RDD是一个可容错的数据集,这个数据集合中的数据是可以并行处理的。有两种方式可以创建RDD:

1. 基于开发者提供的数据集来创建RDD

2. 基于引用外部存储系统中的数据集来创建RDD,比如共享文件系统,HDFS,HBase或者Hadoop InputFormat提供的任意数据源都可以用来创建RDD

 

并行数据集(程序数据)

Spark可以通过SparkContext的parallelize方法实现对Scala程序提供的数据集创建RDD。 程序中的元素通过数据拷贝的形式创建一个RDD)。比如,下面的例子创建了一个RDD,它包含了1到5的5个数字,例如下面的Scala代码:

 

 1. 创建ParallelCollectionRDD



 
2. 执行ParallelCollectionRDD的collect方法

 

 

 

3. 执行RDD的count方法

 



 
 

4. 执行RDD的saveAsTextFile("ParallelCollectionRDD")

 

rdd.saveAsTextFile("ParallelCollectionRDD");

 

执行结果是在HDFS的当前用户目录(/user/hadoop)下创建了一个ParallelCollectionRDD目录,并且有个part-00000作为存放文本的文件

 

[hadoop@hadoop bin]$ hdfs dfs -cat /user/hadoop/ParallelCollectionRDD/part-00000
1
2
3
4
5

 

 

 

外部数据集

1.执行如下操作,则Spark创建了一个RDD(sparkData),而这个RDD的数据来源是HDFS(hdfs://hadoop.master:9000/user/hadoop/sparkdata.txt),也就是说,Spark默认是从当前用户(hadoop)的/user/hadoop下寻找sparkdata.txt文件。

 

val sparkData = sc.textFile("sparkdata.txt");

 

注:sparkdata.txt的内容如下:

 

1 2
3 4
5 6
7 8
9 10

2. sparkData的类型

 创建的RDD sparkData的详细信息是如下,可见sparkData是一个MappedRDD类型

 

sparkData: org.apache.spark.rdd.RDD[String] = sparkdata.txt MappedRDD[4] at textFile at <console>:12

 

RDD基本操作

 

1.统计sparkData RDD中有多少行

 

sparkData.count()

 

 结果显示如下,得到的结果5,表示sparkData.txt有五行

 

 

2. 统计sparkData RDD中所有行的总长度

 

 

2.1 Map操作

 

var lineLengths = sparkData.map(line=>line.length)

 

执行的结果是: lineLengths是五个MappedDD

 

lineLengths: org.apache.spark.rdd.RDD[Int] = MappedRDD[5] at map at <console>:14

 

2.2 Reduce操作

 

var total = lineLengthMap.reduce((a,b)=>a + b);

 

执行的结果是:total是一个Int类型的数据。事实上,观察sparkdata.txt的数据,确实是所有行的总长度是16

 

total: Int = 16

 

3. Key/Pair RDD

 

3.1 执行如下操作

 

lines = sc.textFile("sparkdata.txt")

 

结果:

 

 

3.2 执行如下操作

 

val pairs = lines.map(s => (s, 1))

 

结果:

 

 

 

 

3.3 执行如下操作

 

val counts = pairs.reduceByKey((a, b) => a + b)

 

结果:



 

 

3.4 执行如下操作:

 

counts.collect()

 

 结果:
 

 

RDD Transform基本方法

 

 

Transformation Meaning
map(func) Return a new distributed dataset formed by passing each element of the source through a function func.
filter(func) Return a new dataset formed by selecting those elements of the source on which func returns true.
flatMap(func) Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
mapPartitions(func) Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
mapPartitionsWithIndex(func) Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.
sample(withReplacement, fraction, seed) Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.
union(otherDataset) Return a new dataset that contains the union of the elements in the source dataset and the argument.
intersection(otherDataset) Return a new RDD that contains the intersection of elements in the source dataset and the argument.
distinct([numTasks])) Return a new dataset that contains the distinct elements of the source dataset.
groupByKey([numTasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or combineByKey will yield much better performance.
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.
reduceByKey(func, [numTasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
sortByKey([ascending], [numTasks]) When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
join(otherDataset, [numTasks]) When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
cogroup(otherDataset, [numTasks]) When called on datasets of type (K, V) and (K, W), returns a dataset of (K, Iterable<V>, Iterable<W>) tuples. This operation is also called groupWith.
cartesian(otherDataset) When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
pipe(command, [envVars]) Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings.
coalesce(numPartitions) Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
repartition(numPartitions) Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
repartitionAndSortWithinPartitions(partitioner) Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.
 

RDD Action基本方法

  Action Meaning
reduce(func) Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
collect() Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
count() Return the number of elements in the dataset.
first() Return the first element of the dataset (similar to take(1)).
take(n) Return an array with the first n elements of the dataset. Note that this is currently not executed in parallel. Instead, the driver program computes all the elements.
takeSample(withReplacement, num, [seed]) Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
takeOrdered(n, [ordering]) Return the first n elements of the RDD using either their natural order or a custom comparator.
saveAsTextFile(path) Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
saveAsSequenceFile(path)
(Java and Scala)
Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that either implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
saveAsObjectFile(path)
(Java and Scala)
Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().
countByKey() Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
foreach(func) Run a function func on each element of the dataset. This is usually done for side effects such as updating an accumulator variable (see below) or interacting with external storage systems.
 
 
 
  • 大小: 98.4 KB
  • 大小: 18.5 KB
  • 大小: 129.4 KB
  • 大小: 41.1 KB
  • 大小: 7 KB
  • 大小: 10.3 KB
  • 大小: 174.4 KB
  • 大小: 32.1 KB
  • 大小: 9.5 KB
  • 大小: 110.5 KB
  • 大小: 111.3 KB
分享到:
评论

相关推荐

    sparkRDD函数大全

    spark rdd函数大全。spark rdd操作为core操作,虽然后续版本主要以dataset来操作,但是rdd操作也是不可忽略的一部分。

    spark RDD操作详解

    #### 二、RDD的创建 RDD可以通过多种方式创建: - **并行化集合**:可以直接从驱动程序上的集合创建RDD。 - **外部数据源**:从文件系统、数据库或其他外部数据源加载数据来创建RDD。 #### 三、RDD操作类型 RDD...

    Spark 编程指南简体中文版.pdf

    * Spark 初始化:如何初始化 Spark,包括 SparkContext 和 RDD 的介绍 * RDDs:Resilient Distributed Datasets,Spark 的核心数据结构 * Transformations 和 Actions:RDD 的两种基本操作 * Spark 的持久化和共享...

    spark安装包+spark实验安装软件

    Spark通过Stage划分任务,每个Stage由多个Task组成,任务之间通过RDD的血缘关系(lineage)进行容错。 **5. Spark SQL的使用** Spark SQL可以通过DataFrame API进行数据处理,它允许开发者使用SQL或DataFrame API...

    Spark The Definitive Guide-201712

    1. **Spark架构**:Spark的核心设计理念是弹性分布式数据集(Resilient Distributed Datasets, RDD),它是一个不可变的数据集合,可以在集群中进行并行操作。Spark 2.x引入了DataFrame和Dataset API,进一步提高了...

    spark RDD 论文 中文版

    ### Spark RDD论文中文版知识点详述 #### 一、引言 Spark RDD(Resilient Distributed Datasets)作为Apache Spark的核心组件之一,在大数据处理领域扮演着至关重要的角色。本论文旨在探讨Spark RDD的设计理念及其在...

    Apache Spark:Spark项目实战:实时推荐系统.docx

    - 安装MLlib库:Spark的机器学习库,包含多种算法和工具,通常随Spark二进制包安装。 - 安装Kafka:分布式流处理平台,用于实时数据流处理,可从源码编译或使用包管理器安装。 #### 实时推荐系统实战开发 在完成...

    spark RDD 论文

    #### 二、RDD 的核心概念 - **定义**:RDD 是一种容错的分布式内存抽象,它允许开发者在大规模集群上执行内存中的计算任务,同时确保系统的容错能力。 - **特点**: - **容错性**:通过记录数据集的转换操作,可以...

    大数据实验报告Windows环境下安装Spark及RDD编程和Spark编程实现wordcount.doc

    知识点二:RDD 编程 * map(func) 算子将原数据的每个元素传给函数 func 进行格式化,返回一个新的分布式数据集。 * mapPartitionsWithIndex(func) 算子将处理的数据以分区为单位发送到计算节点进行处理,这里的处理...

    Spark思维导图之Spark RDD.png

    Spark思维导图之Spark RDD.png

    2023-Spark-实验十三:Spark RDD 求员工工资总额

    求员工工资总额样例数据

    Apache Spark:Spark项目实战:大数据分析案例.docx

    Spark SQL支持结构化数据处理,允许用户将SQL查询与RDD操作无缝结合。Spark Streaming提供了流式数据处理的能力,用于处理实时数据流。MLlib是一个机器学习算法库,提供了易于使用的机器学习功能。GraphX则提供了...

    spark-rdd-APi

    标题:“Spark RDD API”说明了本文档将专注于Apache Spark中弹性分布式数据集(RDD)的API。RDD是Spark的核心概念,它是一个容错的、并行的数据结构,使得用户能够处理大数据集。本文档将基于Scala语言中的RDD实现...

    Apache Spark:Spark项目实战:机器学习模型部署.docx

    它的核心是弹性分布式数据集(RDD),通过它可以实现容错性的并行操作。Spark拥有易用的高级API,如Spark SQL用于处理结构化数据,MLlib用于机器学习,GraphX用于图形处理,以及流处理能力。Spark的机器学习库MLlib...

    Spark RDD以及其特性.rar_RDD_Spark!_parallelbwz_spark_特性

    Spark的弹性分布式数据集(Resilient Distributed Datasets, 简称RDD)是其核心的数据抽象,也是Spark实现高效并行计算的关键所在。在理解RDD及其特性之前,我们需要先了解Spark的基本工作原理。Spark是一种基于内存...

    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql

    在大数据处理框架Apache Spark中,RDD(弹性分布式数据集)是基础的数据处理抽象,它提供了容错、分布式数据操作的能力。而DataFrame和Dataset是Spark SQL中更高级的数据抽象,提供了更多的优化和易于使用的特点。...

    Spark大数据处理 技术、应用与性能优化_PDF电子书下载 带书签目录 高清完整版 simple.pdf

    2. **Spark架构**:Spark的核心设计理念是弹性分布式数据集(Resilient Distributed Datasets, RDD),它是一个不可变、分区的数据集合,支持并行操作。此外,Spark的调度器(包括DAGScheduler和TaskScheduler)负责...

    spark rdd 论文翻译_中文_spark老汤

    二是它们可以通过一系列的转换操作(transformations)来生成,这些转换操作会记录下RDD的生成历史,这被称为lineage或血统信息。这种设计使得在节点故障时,RDD能够通过重新执行依赖的转换操作来恢复,从而实现容错...

    spark: RDD与DataFrame之间的相互转换方法

    在大数据处理框架Spark中,RDD(弹性分布式数据集)和DataFrame是两种常用的抽象数据类型,它们分别对应于不同的操作接口和优化技术。RDD是最基础的分布式数据集合,提供了一种容错的并行处理数据的方式。而...

    Spark统计电影评分数据:movies.dat,retings.dat,users.dat

    在IT领域,尤其是在大数据分析和处理中,Apache Spark是一个广泛使用的分布式计算框架。这个场景涉及到对电影评分数据的统计分析,我们主要会关注三个文件:`movies.dat`, `ratings.dat`, 和 `users.dat`,这些文件...

Global site tag (gtag.js) - Google Analytics