`
m635674608
  • 浏览: 5028742 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

Spark RDD使用详解2--RDD创建方式

 
阅读更多

RDD创建方式

1)从Hadoop文件系统(如HDFS、Hive、HBase)输入创建。
2)从父RDD转换得到新RDD。
3)通过parallelize或makeRDD将单机数据创建为分布式RDD。
4)基于DB(Mysql)、NoSQL(HBase)、S3(SC3)、数据流创建。

从集合创建RDD

  • parallelize

def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T]

从一个Seq集合创建RDD。

参数1:Seq集合,必须。

参数2:分区数,默认为该Application分配到的资源的CPU核数

 

 

[java] view plain copy
 
  1. scala> var rdd = sc.parallelize(1 to 10)  
  2. rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at :21  
  3.    
  4. scala> rdd.collect  
  5. res3: Array[Int] = Array(12345678910)  
  6.    
  7. scala> rdd.partitions.size  
  8. res4: Int = 15  
  9.    
  10. //设置RDD为3个分区  
  11. scala> var rdd2 = sc.parallelize(1 to 10,3)  
  12. rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at :21  
  13.    
  14. scala> rdd2.collect  
  15. res5: Array[Int] = Array(12345678910)  
  16.    
  17. scala> rdd2.partitions.size  
  18. res6: Int = 3  

 

  • makeRDD

def makeRDD[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T]

这种用法和parallelize完全相同

def makeRDD[T](seq: Seq[(T, Seq[String])])(implicit arg0: ClassTag[T]): RDD[T]

该用法可以指定每一个分区的preferredLocations。

 

[java] view plain copy
 
  1. scala> var collect = Seq((1 to 10,Seq("slave007.lxw1234.com","slave002.lxw1234.com")),  
  2. (11 to 15,Seq("slave013.lxw1234.com","slave015.lxw1234.com")))  
  3. collect: Seq[(scala.collection.immutable.Range.Inclusive, Seq[String])] = List((Range(12345678910),  
  4. List(slave007.lxw1234.com, slave002.lxw1234.com)), (Range(1112131415),List(slave013.lxw1234.com, slave015.lxw1234.com)))  
  5.    
  6. scala> var rdd = sc.makeRDD(collect)  
  7. rdd: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[6] at makeRDD at :23  
  8.    
  9. scala> rdd.partitions.size  
  10. res33: Int = 2  
  11.    
  12. scala> rdd.preferredLocations(rdd.partitions(0))  
  13. res34: Seq[String] = List(slave007.lxw1234.com, slave002.lxw1234.com)  
  14.    
  15. scala> rdd.preferredLocations(rdd.partitions(1))  
  16. res35: Seq[String] = List(slave013.lxw1234.com, slave015.lxw1234.com)  
指定分区的优先位置,对后续的调度优化有帮助。

 

 

从外部存储创建RDD

  • textFile

 

//从hdfs文件创建.

[java] view plain copy
 
  1. //从hdfs文件创建  
  2. scala> var rdd = sc.textFile("hdfs:///tmp/lxw1234/1.txt")  
  3. rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[26] at textFile at :21  
  4.    
  5. scala> rdd.count  
  6. res48: Long = 4  
  7.    
  8. //从本地文件创建  
  9. scala> var rdd = sc.textFile("file:///etc/hadoop/conf/core-site.xml")  
  10. rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[28] at textFile at :21  
  11.    
  12. scala> rdd.count  
  13. res49: Long = 97    

注意这里的本地文件路径需要在Driver和Executor端存在。

  • 从其他HDFS文件格式创建

hadoopFile

sequenceFile

objectFile

newAPIHadoopFile

  • 从Hadoop接口API创建

hadoopRDD

newAPIHadoopRDD

比如:从Hbase创建RDD

[java] view plain copy
 
  1. scala> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}  
  2. import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}  
  3.    
  4. scala> import org.apache.hadoop.hbase.mapreduce.TableInputFormat  
  5. import org.apache.hadoop.hbase.mapreduce.TableInputFormat  
  6.    
  7. scala> import org.apache.hadoop.hbase.client.HBaseAdmin  
  8. import org.apache.hadoop.hbase.client.HBaseAdmin  
  9.    
  10. scala> val conf = HBaseConfiguration.create()  
  11. scala> conf.set(TableInputFormat.INPUT_TABLE,"lxw1234")  
  12. scala> var hbaseRDD = sc.newAPIHadoopRDD(  
  13. conf,classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])  
  14.    
  15. scala> hbaseRDD.count  
  16. res52: Long = 1  

 

http://blog.csdn.net/guohecang/article/details/51742557

 
分享到:
评论

相关推荐

    spark rdd 操作详解

    spark rdd相关操作详解;包括全部的操作说明和举例;

    spark-assembly-1.5.2-hadoop2.6.0.jar

    《Spark编程核心组件:spark-assembly-1.5.2-hadoop2.6.0.jar详解》 在大数据处理领域,Spark以其高效、易用和灵活性脱颖而出,成为了许多开发者的首选框架。Spark-assembly-1.5.2-hadoop2.6.0.jar是Spark中的一个...

    Spark1.4.1 RDD算子详解

    结合代码详细描述RDD算子的执行流程,并配上执行流程图

    sparkrdd的讲解

    ### Spark RDD详解 #### Spark计算模型与RDD概念 在探讨Spark的弹性分布式数据集(RDD)之前,我们首先需要理解Spark的基本计算模型。Spark是一种基于内存的分布式计算框架,其核心设计思想在于通过缓存中间结果来...

    RDD编程初级实践-答案-厦门大学

    2. **熟悉使用RDD编程解决实际问题的方法**:通过具体的案例练习,使学生能够将理论知识应用到实践中。 #### 二、实验平台 - **操作系统**:Ubuntu 16.04 - **Spark版本**:2.1.0 Ubuntu 16.04是一个长期支持(LTS)...

    RDD编程初级实践-答案-实验报告-纠正版

    4. **熟悉Spark的RDD基本操作及键值对操作**:通过实践了解和掌握Resilient Distributed Datasets (RDD) 的常用操作及其在键值对数据处理中的应用。 5. **使用RDD编程解决实际问题的方法**:通过具体的案例练习,...

    spark实验5 rdd编程2.doc

    本实验旨在通过具体的数据处理任务,深入理解Apache Spark中Resilient Distributed Datasets (RDD) 的使用方法及其在解决实际问题中的作用。实验选取了一所大学计算机系的成绩数据作为分析对象,通过对这些数据的...

    大数据--Apache Spark编程详解

    - **SparkContext**:创建SparkContext对象是启动Spark程序的第一步,该对象用于配置集群访问方式,并创建RDD。 - **Master Parameter**:Master参数指定了SparkContext连接的集群类型和大小,常见的有`local`、`...

    Spark SQL编程初级实践-答案-实验报告-纠正版

    - **创建RDD**:使用Scala编写代码,通过`spark.sparkContext.textFile()`方法读取文本文件,创建RDD。 - **转换为DataFrame**:使用`.map()`和`.toDF()`方法将RDD转换为DataFrame。 - **打印DataFrame**:使用`....

    spark Core RDD持久化详解

    尽管创建 Spark 是为了支持分布式数据集上的迭代作业,但是实际上它是对 Hadoop 的补充,可以在 Hadoop 文件系统中并行运行。通过名为 Mesos 的第三方集群框架可以支持此行为。Spark 由加州大学伯克利分校 AMP 实验...

    Apache Spark源码走读之3 -- Task运行期之函数调用关系分析

    **步骤2**:创建TaskScheduler实例,根据Spark的运行模式选择对应的SchedulerBackend。同时启动TaskScheduler,这是整个初始化过程中非常关键的一步。 ```scala private[spark] var taskScheduler = SparkContext....

    Spark dataframe使用详解

    Spark DataFrame 使用详解 Spark DataFrame 是一种基于 RDD 的分布式数据集,它提供了详细的结构信息,能够清楚地知道该数据集中包含哪些列、每列的名称和类型。相比于 RDD,DataFrame 的优点在于能够直接获得数据...

    Spark源码系列(二)RDD详解

    上一章讲了Spark提交作业的过程,这一章我们要讲RDD。简单的讲,RDD就是Spark的input,知道input是啥吧,就是输入的数据。RDD的全名是ResilientDistributedDataset,意思是容错的分布式数据集,每一个RDD都会有5个...

    advanced-spark-training.pdf

    ### 高级Spark培训知识点详解 #### 一、概述 本次文档主要针对已经熟悉Spark基本操作(如word count)的用户进行深入讲解,旨在帮助读者更深入地理解Spark的工作原理及如何优化Spark应用性能。主要内容包括:正式...

    spark机器学习K-Means算法详解.zip

    2. **Spark MLlib中的KMeans API**: - `KMeans`类提供了训练模型的方法,包括设置`k`(聚类数量)、`maxIterations`(最大迭代次数)、`tol`(容忍误差)等参数。 - 使用`fit()`方法训练模型,输入为`RDD[Vector]...

    Scala-hadoop-spark-新教程含金量最高的大数据教程

    - **创建方式**: - 从本地文件系统、HDFS等数据源读取:`sc.textFile("path/to/file")` - 从已有的集合转换:`val data = Array(1, 2, 3); val rdd = sc.parallelize(data)` **3. 使用Java创建SparkContext和RDD...

    RDD使用基础

    ### RDD使用基础详解 #### 一、RDD简介与特点 **RDD**(Resilient Distributed Dataset)是Apache Spark的核心概念之一,它代表一种只读的、可分区的分布式数据集。RDD具有高度的容错性,并能够透明地利用内存来...

Global site tag (gtag.js) - Google Analytics