`

Spark parallelize函数和makeRDD函数的区别(Array-->RDD)

 
阅读更多
我们知道,在Spark中创建RDD的创建方式大概可以分为三种:
(1)、从集合中创建RDD;
(2)、从外部存储创建RDD;
(3)、从其他RDD创建。

而从集合中创建RDD,Spark主要提供了两中函数:parallelize和makeRDD。我们可以先看看这两个函数的声明:
def parallelize[T:ClassTag](  
      seq:Seq[T],  
      numSlices:Int =defaultParallelism):RDD[T]  
   
def makeRDD[T:ClassTag](  
      seq:Seq[T],  
      numSlices:Int =defaultParallelism):RDD[T]  
   
def makeRDD[T:ClassTag](seq:Seq[(T, Seq[String])]):RDD[T]  


我们可以从上面看出makeRDD有两种实现,而且第一个makeRDD函数接收的参数和parallelize完全一致。其实第一种makeRDD函数实现是依赖了parallelize函数的实现,来看看Spark中是怎么实现这个makeRDD函数的:

def makeRDD[T:ClassTag](  
    seq:Seq[T],  
    numSlices:Int = defaultParallelism):RDD[T] =withScope {  
  parallelize(seq, numSlices)  
}  


我们可以看出,这个makeRDD函数完全和parallelize函数一致。但是我们得看看第二种makeRDD函数函数实现了,它接收的参数类型是Seq[(T, Seq[String])],Spark文档的说明是

Distribute a local Scala collection to form an RDD, with one or more location preferences (hostnames of Spark nodes) for each object. Create a new partition for each collection item.


原来,这个函数还为数据提供了位置信息,来看看我们怎么使用:

scala>val iteblog1= sc.parallelize(List(1,2,3))  
iteblog1:org.apache.spark.rdd.RDD[Int] =ParallelCollectionRDD[10] at parallelize at <console>:21  
   
scala>val iteblog2= sc.makeRDD(List(1,2,3))  
iteblog2:org.apache.spark.rdd.RDD[Int] =ParallelCollectionRDD[11] at makeRDD at <console>:21  
   
scala>val seq =List((1, List("iteblog.com","sparkhost1.com","sparkhost2.com")),  
     | (2, List("iteblog.com","sparkhost2.com")))  
seq:List[(Int, List[String])] =List((1,List(iteblog.com, sparkhost1.com, sparkhost2.com)),  
 (2,List(iteblog.com, sparkhost2.com)))  
   
scala>val iteblog3= sc.makeRDD(seq)  
iteblog3:org.apache.spark.rdd.RDD[Int] =ParallelCollectionRDD[12] at makeRDD at <console>:23  
   
scala> iteblog3.preferredLocations(iteblog3.partitions(1))  
res26:Seq[String] =List(iteblog.com, sparkhost2.com)  
   
scala> iteblog3.preferredLocations(iteblog3.partitions(0))  
res27:Seq[String] =List(iteblog.com, sparkhost1.com, sparkhost2.com)  
   
scala> iteblog1.preferredLocations(iteblog1.partitions(0))  
res28:Seq[String] =List()  


我们可以看到,makeRDD函数有两种实现,第一种实现其实完全和parallelize一致;而第二种实现可以为数据提供位置信息,而除此之外的实现和parallelize函数也是一致的,如下:

def parallelize[T:ClassTag](  
    seq:Seq[T],  
    numSlices:Int =defaultParallelism):RDD[T] =withScope {  
  assertNotStopped()  
  newParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())  
}  
   
def makeRDD[T:ClassTag](seq:Seq[(T, Seq[String])]):RDD[T] =withScope {  
  assertNotStopped()  
  valindexToPrefs =seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap  
  newParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs)  
}  


都是返回ParallelCollectionRDD,而且这个makeRDD的实现不可以自己指定分区的数量,而是固定为seq参数的size大小。

分享到:
评论
发表评论

文章已被作者锁定,不允许评论。

相关推荐

    sparkrdd的讲解

    ### Spark RDD详解 #### Spark计算模型与RDD概念 在探讨Spark的弹性分布式数据集(RDD)之前,我们首先需要理解Spark的基本计算模型。Spark是一种基于内存的分布式计算框架,其核心设计思想在于通过缓存中间结果来...

    Spark RDD 资料

    val rdd = sc.parallelize(array) ``` ##### 2.3 其他创建方式 除了上述两种常见方式外,还可以通过读取数据库等方式创建 RDD,甚至通过其他 RDD 转换而来。例如,从 MySQL 数据库中读取数据,可以先将数据导出为...

    Spark RDD Instrocution

    val sparkRDD = inputRDD.filter(line =&gt; line.contains("Spark")) println("There are " + sparkRDD.count() + " contains Spark lines") println("Here are 3 examples:") sparkRDD.take(3).foreach(println) ``` ...

    Spark-Transformation和Action算子.md

    ### Spark Transformation和Action算子详解 #### 一、Transformation **Transformation** 在 Spark 中是指对 RDD(弹性分布式数据集)进行的各种转换操作。这些操作并不会立即执行,而是延迟执行,直到遇到 Action...

    spark演示文档

    1. **创建RDD**:通过`sc.makeRDD`或`sc.parallelize`创建RDD。例如: ```scala val rdd = sc.makeRDD(Array(1, 2, 3)) val rddParallelized = sc.parallelize(Array(1, 2, 3)) ``` 2. **查看RDD的分区**:可以...

    Spark API Master

    val rdd = sc.parallelize(Array(("a", 1), ("a", 2), ("b", 3))) val result = rdd.combineByKey( (x: Int) =&gt; (x, 1), (acc: (Int, Int), x: Int) =&gt; (acc._1 + x, acc._2 + 1), (acc1: (Int, Int), acc2: (Int...

    Scala-hadoop-spark-新教程含金量最高的大数据教程

    特别是对于初学者来说,掌握了如何使用Scala和Java创建SparkContext和RDD对象,以及如何在Eclipse环境下进行Spark程序的开发和调试是非常重要的。此外,还介绍了如何使用Maven进行项目管理和编译,以及如何手动编译...

    Spark MLlib机器学习01.pdf

    - 示例代码:`val rdd1 = sc.parallelize(1 to 9, 3) val rdd2 = rdd1.map(x =&gt; x*2)` 以上内容概述了 Spark MLlib 的基本概念、主要功能及其在实际项目中的应用。通过学习这些内容,可以帮助读者更好地理解和掌握...

    spark-python-knn:Apache Spark中用于计算K-NN的函数

    Spark Python K-nn 简单但具有存储效率的函数,用于计算K个最近的邻居。需要安装Numpy,scikit-learn。...sc.parallelize([ (1, np.array([0,0,1,1])), (2, np.array([0,1,1,1])), (3, np.array([0,0,1,1])), (4, np.ar

    Spark操作之aggregate、aggregateByKey详解

    Spark中的aggregate函数和aggregateByKey函数是两个重要的聚合操作,它们可以对RDD中的元素进行聚合操作,生成新的RDD或值。本文将详细介绍aggregate函数和aggregateByKey函数的原理、使用方法和实例代码。 一、...

    python基础编程例子之PySpark.doc

    如,`rdd = sc.parallelize(array)`将创建一个包含数组元素的RDD。 3. **RDD操作:Transformations和Actions** - **Filter**:过滤操作,如 `linesWithSpark = lines.filter(lambda line: "Spark" in line)`,...

    graphX 基本介绍

    GraphX是Apache Spark的一个扩展库,专门为图数据处理和并行计算设计。它提供了一种高效的方式来存储、操作大规模图结构,并支持各种复杂的图算法。GraphX的核心概念是`Graph`,它由顶点集合(Vertices)和边集合...

    secondSort

    在Spark中,可以通过自定义比较器或者映射函数来实现这一过程。 #### 4. 合并结果 最后一步是对各个分区的结果进行合并。由于我们已经在每个分区内部实现了排序,因此只需简单地将各个分区的结果连接起来即可得到...

Global site tag (gtag.js) - Google Analytics