`
dulinanaaa
  • 浏览: 14992 次
文章分类
社区版块
存档分类
最新评论

Spark关于Java的RDD操作

 
阅读更多

在之前的文章中Spark的demo对RDD的简单操作,通过学习《Spark快速大数据分析》,记录一下对RDD的详细操作

1.转化操作

map
我们可以使用map()来做各种各样的事情:可以把我们的URL集合中的每个URL对应的
主机名提取出来,也可以简单到只对各个数字求平方值。map()的返回值类型不需要和输
入类型一样。这样如果有一个字符串RDD,并且我们的map()函数是用来把字符串解析
并返回一个Double值的,那么此时我们的输入RDD类型就是RDD[String],而输出类型
RDD[Double]
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
JavaRDD<Integer> result = rdd.map(new Function<Integer, Integer>() {
public Integer call(Integer x) { return x*x; }
});
System.out.println(StringUtils.join(result.collect(), ","));

flatMap
有时候,我们希望对每个输入元素生成多个输出元素。实现该功能的操作叫作flatMap()
map()类似,我们提供给flatMap()的函数被分别应用到了输入RDD的每个元素上。不
过返回的不是一个元素,而是一个返回值序列的迭代器。输出的RDD倒不是由迭代器组
成的。我们得到的是一个包含各个迭代器可访问的所有元素的RDD
JavaRDD<String> lines = sc.parallelize(Arrays.asList("hello world", "hi"));
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
public Iterable<String> call(String line) {
return Arrays.asList(line.split(" "));
}
});
words.first(); // 返回"hello"

distinct()
我们的RDD中最常缺失的集合属性是元素的唯一性,因为常常有重复的元素。如果只
要唯一的元素,我们可以使用RDD.distinct()转化操作来生成一个只包含不同元素的新
RDD。不过需要注意,distinct()操作的开销很大,因为它需要将所有数据通过网络进行
混洗(shuffle),以确保每个元素都只有一份。第4章会详细介绍数据混洗,以及如何避免
数据混洗。

在例子中还有一些其他的操作:

filter

JavaRDD<String> errorLines = lines.filter (new Function<String, Boolean>() {
    @Override
    public Boolean call(String v1) throws Exception {
        return v1.contains( "error");
    }
});

foreach

badLines.foreach(new VoidFunction<String>() {

    @Override
    public void call(String t) throws Exception {
        System. out.println(t);
    }
});



2.集合的操作(也属于转化操作)



3.行动操作
reduce()
你很有可能会用到基本 RDD 上最常见的行动操作reduce()。它接收一个函数作为参数,这个
函数要操作两个 RDD 的元素类型的数据并返回一个同样类型的新元素。一个简单的例子就
是函数 +,可以用它来对我们的RDD 进行累加。使用 reduce(),可以很方便地计算出RDD
中所有元素的总和、元素的个数,以及其他类型的聚合操作
Integer sum = rdd.reduce(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer x, Integer y) { return x + y; }
});
例:
JavaRDD<Integer> lines = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));

Integer aaa = lines.reduce(new Function2<Integer, Integer, Integer>() {

    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2;
    }
});

System.out.println(aaa);//结果为55

fold()
fold() reduce() 类似,接收一个与 reduce() 接收的函数签名相同的函数,再加上一个
“初始值”来作为每个分区第一次调用时的结果。你所提供的初始值应当是你提供的操作
的单位元素;也就是说,使用你的函数对这个初始值进行多次计算不会改变结果(例如 +
对应的0 * 对应的1 ,或拼接操作对应的空列表)。
JavaRDD<Integer> lines = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));

Integer aaa = lines.fold(100, new Function2<Integer, Integer, Integer>() {

    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2;
    }
});

System.out.println(aaa);//结果为255

aggregate()
public class AvgCount implements Serializable{
      public AvgCount( int total, int num) {
             this. total = total;
             this. num = num;
      }
      public int total;
      public int num;
      public double avg() {
             return total / ( double) num;
      }
}

JavaRDD<Integer> lines = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
AvgCount initial = new AvgCount(0, 0);

Function2<AvgCount, Integer, AvgCount> addAndCount = new Function2<AvgCount, Integer, AvgCount>() {
    public AvgCount call(AvgCount a, Integer x) {
        a.total += x;
        a.num += 1;
        return a;
    }
};
Function2<AvgCount, AvgCount, AvgCount> combine = new Function2<AvgCount, AvgCount, AvgCount>() {
    public AvgCount call(AvgCount a, AvgCount b) {
        a.total += b.total;
        a.num += b.num;
        return a;
    }
};
AvgCount result = lines.aggregate(initial, addAndCount, combine);
System.out.println(result.avg());




4.持久化
persist()
unpersist()




分享到:
评论

相关推荐

    大数据spark学习之rdd概述

    在 Spark 中,对数据的所有操作不外乎创建 RDD、转化已有RDD 以及调用 RDD 操作进行求值。每个 RDD 都被分为多个分区,这些分区运行在集群中的不同节点上。RDD 可以包含 Python、Java、Scala 中任意类型的对象, ...

    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql

    在大数据处理框架Apache Spark中,RDD(弹性分布式数据集)是基础的数据处理抽象,它提供了容错、分布式数据操作的能力。而DataFrame和Dataset是Spark SQL中更高级的数据抽象,提供了更多的优化和易于使用的特点。...

    spark考试(练习题)编程题笔记!

    RDD操作可以分为两大类:转换操作和行动操作。 1. 转换操作:返回一个新的RDD,所有转换函数都是Lazy,不会立即执行,需要行动函数触发。 2. 行动操作:返回值不是RDD(无返回值或返回其他),所有行动函数立即执行...

    Spark学习--RDD编码

    惰性求值:RDD的转化操作是惰性求值的,即在被调用行动操作之前Spark不会开始计算,相反,Spark会在内部记录下索要求执行的操作的相关信息。例如,当我们调用jsc.textFile()时,数据并没有读取进来,而是在必要时才...

    25个经典Spark算子的JAVA实现

    JavaRDD&lt;Integer&gt; flatNumbersRDD = nestedNumbersRDD.flatMap(new FlatMapFunction, Integer&gt;() { private static final long serialVersionUID = 1L; @Override public Iterator&lt;Integer&gt; call(List&lt;Integer&gt; ...

    spark 2.0.1 JavaAPI

    本资源是关于Spark 2.0.1 Java API的详细指南,通常以CHM(Microsoft HTML Help)格式呈现,这种格式便于用户搜索和查阅所需的具体API。 Spark的核心组件包括: 1. **Spark Core**:这是Spark的基础,提供了分布式...

    spark2.1.0 JAVA API

    JavaRDD&lt;String&gt; rdd = sc.textFile("hdfs://path/to/file"); ``` 3. **转换(Transformations)**: RDD上的算子,如`map()`, `filter()`, `reduceByKey()`, `groupByKey()`等,它们不立即执行,而是创建一个新的...

    java开发spark程序

    本篇文章将深入探讨如何使用Java开发Spark程序,并基于提供的"sparkJava"压缩包文件中的示例代码进行解析。 首先,我们需要理解Spark的核心概念。Spark主要由四个组件构成:Spark Core、Spark SQL、Spark Streaming...

    Spark Programming Guide - Spark 2.0(Java)

    与RDD紧密相关的是Spark中的并行操作,包括转换(transformations)和行动(actions)。转换是惰性的,不会立即执行,只有当它们被行动触发时才会执行。文档对各种转换操作和行动操作进行了说明,包括对键值对(key-...

    spark RDD 论文

    - **Spark 的实现**:Spark 中的 RDD 是通过 Scala 语言实现的,并提供 Java 和 Python 等语言的 API 接口。此外,Spark 还提供了对 SQL 查询的支持以及机器学习库 MLlib。 - **用户案例与基准测试**:Spark 项目...

    Spark RDD以及其特性.rar_RDD_Spark!_parallelbwz_spark_特性

    9. **交互式编程**:Spark提供了Scala、Java、Python和R等多种API,支持交互式Shell,方便开发者进行快速原型验证和调试。 在描述中提到的"Spark RDD以及其特性的流程图",可能是对RDD创建、转换、持久化和执行流程...

    大数据实验报告Windows环境下安装Spark及RDD编程和Spark编程实现wordcount.doc

    在Spark中,RDD操作主要有两种类型:转换(Transformation)和行动(Action)。转换操作如map、mapPartitionsWithIndex和filter等,它们创建新的RDD而不立即执行任何计算。行动操作如collect、count和save等,触发...

    Spark机器学习视频第4课.SparkRDD原理剖析

    课时3:Spark RDD操作 课时4:SparkRDD原理剖析 课时5:Spark2sql从mysql中导入 课时6:Spark1.6.2sql与mysql数据交互 课时7:SparkSQL java操作mysql数据 课时8:Spark统计用户的收藏转换率 课时9:Spark梳理...

    Spark 1.0.0 API (java)

    - **JavaRDD**:Spark的Java接口,提供了对RDD的基本操作。通过`SparkContext`的`parallelize()`方法创建。 ```java JavaRDD&lt;String&gt; rdd = sc.parallelize(Arrays.asList("line1", "line2")); ``` ### 3. RDD操作...

    藏经阁-EXTENDING SPARK WITH JAVA AGEN.pdf

    EXTENDING SPARK WITH JAVA AGENTS EXTENDING SPARK WITH JAVA AGENTS是指使用Java代理扩展Spark的功能,以提高性能和优化大数据应用程序的性能。该技术主要涉及到Spark缓存和Java代理的集成,旨在解决Spark缓存...

    spark2.1.0.chm(spark java API)

    Java API提供了创建和操作RDD的方法。 2. DataFrame:基于DataFrame的API提供了更高级别的抽象,它是一个表或关系的概念,支持SQL查询和DataFrame API。 3. Dataset:是DataFrame的类型安全版本,结合了RDD的性能和...

    WordCount_Spark!_spark_wordcount_java_

    在这个程序中,我们将深入理解Spark的核心概念,如RDD(弹性分布式数据集)以及并行计算的基本操作。 **1. Spark核心概念** - **RDD(Resilient Distributed Datasets)**:RDD是Spark中的基本数据抽象,它是不可...

    spark rdd 实战 ,基本语法

    Spark 支持多种编程语言,包括 Scala、Python、Java 等。 Spark 的运行模式 Spark 支持多种运行模式,包括: * Local 模式:用于测试和开发。 * Standalone 模式:独立集群模式。 * Spark on Yarn 模式:Spark ...

    fantj2016#java-reader#3. Spark-初识RDD1

    (1)一组分片(Partition),即数据集的基本组成单位 (2)一个计算每个分区的函数 (3)RDD之间的依赖关系 (4)一个Partitioner,即RD

    spark计数demo

    import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function....

Global site tag (gtag.js) - Google Analytics