- 浏览: 599574 次
- 性别:
- 来自: 厦门
文章分类
- 全部博客 (669)
- oracle (36)
- java (98)
- spring (48)
- UML (2)
- hibernate (10)
- tomcat (7)
- 高性能 (11)
- mysql (25)
- sql (19)
- web (42)
- 数据库设计 (4)
- Nio (6)
- Netty (8)
- Excel (3)
- File (4)
- AOP (1)
- Jetty (1)
- Log4J (4)
- 链表 (1)
- Spring Junit4 (3)
- Autowired Resource (0)
- Jackson (1)
- Javascript (58)
- Spring Cache (2)
- Spring - CXF (2)
- Spring Inject (2)
- 汉字拼音 (3)
- 代理模式 (3)
- Spring事务 (4)
- ActiveMQ (6)
- XML (3)
- Cglib (2)
- Activiti (15)
- 附件问题 (1)
- javaMail (1)
- Thread (19)
- 算法 (6)
- 正则表达式 (3)
- 国际化 (2)
- Json (3)
- EJB (3)
- Struts2 (1)
- Maven (7)
- Mybatis (7)
- Redis (8)
- DWR (1)
- Lucene (2)
- Linux (73)
- 杂谈 (2)
- CSS (13)
- Linux服务篇 (3)
- Kettle (9)
- android (81)
- protocol (2)
- EasyUI (6)
- nginx (2)
- zookeeper (6)
- Hadoop (41)
- cache (7)
- shiro (3)
- HBase (12)
- Hive (8)
- Spark (15)
- Scala (16)
- YARN (3)
- Kafka (5)
- Sqoop (2)
- Pig (3)
- Vue (6)
- sprint boot (19)
- dubbo (2)
- mongodb (2)
最新评论
我们知道,在Spark中创建RDD的创建方式大概可以分为三种:
(1)、从集合中创建RDD;
(2)、从外部存储创建RDD;
(3)、从其他RDD创建。
而从集合中创建RDD,Spark主要提供了两中函数:parallelize和makeRDD。我们可以先看看这两个函数的声明:
我们可以从上面看出makeRDD有两种实现,而且第一个makeRDD函数接收的参数和parallelize完全一致。其实第一种makeRDD函数实现是依赖了parallelize函数的实现,来看看Spark中是怎么实现这个makeRDD函数的:
我们可以看出,这个makeRDD函数完全和parallelize函数一致。但是我们得看看第二种makeRDD函数函数实现了,它接收的参数类型是Seq[(T, Seq[String])],Spark文档的说明是
原来,这个函数还为数据提供了位置信息,来看看我们怎么使用:
我们可以看到,makeRDD函数有两种实现,第一种实现其实完全和parallelize一致;而第二种实现可以为数据提供位置信息,而除此之外的实现和parallelize函数也是一致的,如下:
都是返回ParallelCollectionRDD,而且这个makeRDD的实现不可以自己指定分区的数量,而是固定为seq参数的size大小。
(1)、从集合中创建RDD;
(2)、从外部存储创建RDD;
(3)、从其他RDD创建。
而从集合中创建RDD,Spark主要提供了两中函数:parallelize和makeRDD。我们可以先看看这两个函数的声明:
def parallelize[T:ClassTag]( seq:Seq[T], numSlices:Int =defaultParallelism):RDD[T] def makeRDD[T:ClassTag]( seq:Seq[T], numSlices:Int =defaultParallelism):RDD[T] def makeRDD[T:ClassTag](seq:Seq[(T, Seq[String])]):RDD[T]
我们可以从上面看出makeRDD有两种实现,而且第一个makeRDD函数接收的参数和parallelize完全一致。其实第一种makeRDD函数实现是依赖了parallelize函数的实现,来看看Spark中是怎么实现这个makeRDD函数的:
def makeRDD[T:ClassTag]( seq:Seq[T], numSlices:Int = defaultParallelism):RDD[T] =withScope { parallelize(seq, numSlices) }
我们可以看出,这个makeRDD函数完全和parallelize函数一致。但是我们得看看第二种makeRDD函数函数实现了,它接收的参数类型是Seq[(T, Seq[String])],Spark文档的说明是
Distribute a local Scala collection to form an RDD, with one or more location preferences (hostnames of Spark nodes) for each object. Create a new partition for each collection item.
原来,这个函数还为数据提供了位置信息,来看看我们怎么使用:
scala>val iteblog1= sc.parallelize(List(1,2,3)) iteblog1:org.apache.spark.rdd.RDD[Int] =ParallelCollectionRDD[10] at parallelize at <console>:21 scala>val iteblog2= sc.makeRDD(List(1,2,3)) iteblog2:org.apache.spark.rdd.RDD[Int] =ParallelCollectionRDD[11] at makeRDD at <console>:21 scala>val seq =List((1, List("iteblog.com","sparkhost1.com","sparkhost2.com")), | (2, List("iteblog.com","sparkhost2.com"))) seq:List[(Int, List[String])] =List((1,List(iteblog.com, sparkhost1.com, sparkhost2.com)), (2,List(iteblog.com, sparkhost2.com))) scala>val iteblog3= sc.makeRDD(seq) iteblog3:org.apache.spark.rdd.RDD[Int] =ParallelCollectionRDD[12] at makeRDD at <console>:23 scala> iteblog3.preferredLocations(iteblog3.partitions(1)) res26:Seq[String] =List(iteblog.com, sparkhost2.com) scala> iteblog3.preferredLocations(iteblog3.partitions(0)) res27:Seq[String] =List(iteblog.com, sparkhost1.com, sparkhost2.com) scala> iteblog1.preferredLocations(iteblog1.partitions(0)) res28:Seq[String] =List()
我们可以看到,makeRDD函数有两种实现,第一种实现其实完全和parallelize一致;而第二种实现可以为数据提供位置信息,而除此之外的实现和parallelize函数也是一致的,如下:
def parallelize[T:ClassTag]( seq:Seq[T], numSlices:Int =defaultParallelism):RDD[T] =withScope { assertNotStopped() newParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]()) } def makeRDD[T:ClassTag](seq:Seq[(T, Seq[String])]):RDD[T] =withScope { assertNotStopped() valindexToPrefs =seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap newParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs) }
都是返回ParallelCollectionRDD,而且这个makeRDD的实现不可以自己指定分区的数量,而是固定为seq参数的size大小。
发表评论
文章已被作者锁定,不允许评论。
-
Spark 会把数据都载入到内存吗
2017-06-01 10:14 819前言 很多初学者其实对Spark的编程模式还是RDD这个概念理 ... -
Spark Driver和Executor资源调度学习
2017-05-31 16:14 979一、引子 在Worker Actor中,每次LaunchE ... -
Spark 实现TopN的问题(groupBy)
2017-05-31 14:11 1404t2.txt ab 11 ab 23 ab 13 a ... -
Spark block和partition的区别
2017-05-31 13:48 978hdfs中的block是分布式存储的最小单元,类似于盛放文件的 ... -
Spark 什么是DAG(有向无环图)(窄依赖和宽依赖)
2017-05-26 16:46 2155在Spark里每一个操作生成一个RDD,RDD之间连一条边,最 ... -
Spark 为什么比Hadoop快
2017-05-25 16:12 1337Spark SQL比Hadoop Hive快, ... -
Spark 集群的搭建(1.6.3)
2017-05-24 10:41 8参考内容:http://www.cnblogs.com/one ... -
Spark shuffle实现详细探究学习
2017-04-28 15:08 573Background 在MapReduce框架中,shuffl ... -
Spark collect和take函数学习(RDD-->Array)
2017-04-27 15:44 2109将RDD转成Scala数组,并返回。 函数原型 def ... -
Spark MLlib平台的协同过滤算法---电影推荐系统学习
2017-04-27 15:33 627import org.apache.log4j.{Level, ... -
Spark Streaming实时计算学习
2017-04-27 10:31 943随着大数据的发展,人们对大数据的处理要求也越来越高,原有的批处 ... -
Spark 集群的搭建学习(1.6.3)
2017-04-25 14:30 788Spark是一个快速、通用的计算集群框架,它的内核使用Scal ... -
Spark SQL简单示例学习
2017-04-25 14:17 763Spark SQL 作为Apache Spark大数据框架的一 ... -
Spark RDD基于内存的集群计算容错抽象(核心概念)
2017-04-11 20:09 675摘要 本文提出了分布 ... -
Spark 入门知识学习
2017-04-08 11:46 406什么是Spark Apache Spark是 ...
相关推荐
### Spark RDD详解 #### Spark计算模型与RDD概念 在探讨Spark的弹性分布式数据集(RDD)之前,我们首先需要理解Spark的基本计算模型。Spark是一种基于内存的分布式计算框架,其核心设计思想在于通过缓存中间结果来...
val sparkRDD = inputRDD.filter(line => line.contains("Spark")) println("There are " + sparkRDD.count() + " contains Spark lines") println("Here are 3 examples:") sparkRDD.take(3).foreach(println) ``` ...
### Spark Transformation和Action算子详解 #### 一、Transformation **Transformation** 在 Spark 中是指对 RDD(弹性分布式数据集)进行的各种转换操作。这些操作并不会立即执行,而是延迟执行,直到遇到 Action...
1. **创建RDD**:通过`sc.makeRDD`或`sc.parallelize`创建RDD。例如: ```scala val rdd = sc.makeRDD(Array(1, 2, 3)) val rddParallelized = sc.parallelize(Array(1, 2, 3)) ``` 2. **查看RDD的分区**:可以...
val rdd = sc.parallelize(Array(("a", 1), ("a", 2), ("b", 3))) val result = rdd.combineByKey( (x: Int) => (x, 1), (acc: (Int, Int), x: Int) => (acc._1 + x, acc._2 + 1), (acc1: (Int, Int), acc2: (Int...
特别是对于初学者来说,掌握了如何使用Scala和Java创建SparkContext和RDD对象,以及如何在Eclipse环境下进行Spark程序的开发和调试是非常重要的。此外,还介绍了如何使用Maven进行项目管理和编译,以及如何手动编译...
- 示例代码:`val rdd1 = sc.parallelize(1 to 9, 3) val rdd2 = rdd1.map(x => x*2)` 以上内容概述了 Spark MLlib 的基本概念、主要功能及其在实际项目中的应用。通过学习这些内容,可以帮助读者更好地理解和掌握...
Spark Python K-nn 简单但具有存储效率的函数,用于计算K个最近的邻居。需要安装Numpy,scikit-learn。...sc.parallelize([ (1, np.array([0,0,1,1])), (2, np.array([0,1,1,1])), (3, np.array([0,0,1,1])), (4, np.ar
Spark中的aggregate函数和aggregateByKey函数是两个重要的聚合操作,它们可以对RDD中的元素进行聚合操作,生成新的RDD或值。本文将详细介绍aggregate函数和aggregateByKey函数的原理、使用方法和实例代码。 一、...
如,`rdd = sc.parallelize(array)`将创建一个包含数组元素的RDD。 3. **RDD操作:Transformations和Actions** - **Filter**:过滤操作,如 `linesWithSpark = lines.filter(lambda line: "Spark" in line)`,...
GraphX是Apache Spark的一个扩展库,专门为图数据处理和并行计算设计。它提供了一种高效的方式来存储、操作大规模图结构,并支持各种复杂的图算法。GraphX的核心概念是`Graph`,它由顶点集合(Vertices)和边集合...
在Spark中,可以通过自定义比较器或者映射函数来实现这一过程。 #### 4. 合并结果 最后一步是对各个分区的结果进行合并。由于我们已经在每个分区内部实现了排序,因此只需简单地将各个分区的结果连接起来即可得到...