在之前的文章中Spark的demo对RDD的简单操作,通过学习《Spark快速大数据分析》,记录一下对RDD的详细操作
1.转化操作
map
我们可以使用map()来做各种各样的事情:可以把我们的URL集合中的每个URL对应的
主机名提取出来,也可以简单到只对各个数字求平方值。map()的返回值类型不需要和输
入类型一样。这样如果有一个字符串RDD,并且我们的map()函数是用来把字符串解析
并返回一个Double值的,那么此时我们的输入RDD类型就是RDD[String],而输出类型
是RDD[Double]。
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
JavaRDD<Integer> result = rdd.map(new Function<Integer, Integer>() {
public Integer call(Integer x) { return x*x; }
});
System.out.println(StringUtils.join(result.collect(), ","));
flatMap
有时候,我们希望对每个输入元素生成多个输出元素。实现该功能的操作叫作flatMap()。
和map()类似,我们提供给flatMap()的函数被分别应用到了输入RDD的每个元素上。不
过返回的不是一个元素,而是一个返回值序列的迭代器。输出的RDD倒不是由迭代器组
成的。我们得到的是一个包含各个迭代器可访问的所有元素的RDD。
JavaRDD<String> lines = sc.parallelize(Arrays.asList("hello world", "hi"));
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
public Iterable<String> call(String line) {
return Arrays.asList(line.split(" "));
}
});
words.first(); // 返回"hello"
distinct()
我们的RDD中最常缺失的集合属性是元素的唯一性,因为常常有重复的元素。如果只
要唯一的元素,我们可以使用RDD.distinct()转化操作来生成一个只包含不同元素的新
RDD。不过需要注意,distinct()操作的开销很大,因为它需要将所有数据通过网络进行
混洗(shuffle),以确保每个元素都只有一份。第4章会详细介绍数据混洗,以及如何避免
数据混洗。
在例子中还有一些其他的操作:
filter
JavaRDD<String> errorLines = lines.filter (new Function<String, Boolean>() {
@Override
public Boolean call(String v1) throws Exception {
return v1.contains( "error");
}
});
foreach
badLines.foreach(new VoidFunction<String>() {
@Override
public void call(String t) throws Exception {
System. out.println(t);
}
});
2.集合的操作(也属于转化操作)
3.行动操作
reduce()
你很有可能会用到基本
RDD
上最常见的行动操作reduce()。它接收一个函数作为参数,这个
函数要操作两个
RDD
的元素类型的数据并返回一个同样类型的新元素。一个简单的例子就
是函数
+,可以用它来对我们的RDD
进行累加。使用
reduce(),可以很方便地计算出RDD
中所有元素的总和、元素的个数,以及其他类型的聚合操作
Integer sum = rdd.reduce(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer x, Integer y) { return x + y; }
});
例:
JavaRDD<Integer> lines = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
Integer aaa = lines.reduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
System.out.println(aaa);//结果为55
fold()
fold()
和reduce()
类似,接收一个与
reduce()
接收的函数签名相同的函数,再加上一个
“初始值”来作为每个分区第一次调用时的结果。你所提供的初始值应当是你提供的操作
的单位元素;也就是说,使用你的函数对这个初始值进行多次计算不会改变结果(例如
+
对应的0,
*
对应的1
,或拼接操作对应的空列表)。
JavaRDD<Integer> lines = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
Integer aaa = lines.fold(100, new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
System.out.println(aaa);//结果为255
aggregate()public class AvgCount implements Serializable{
public AvgCount( int total, int num) {
this. total = total;
this. num = num;
}
public int total;
public int num;
public double avg() {
return total / ( double) num;
}
}
JavaRDD<Integer> lines = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
AvgCount initial = new AvgCount(0, 0);
Function2<AvgCount, Integer, AvgCount> addAndCount = new Function2<AvgCount, Integer, AvgCount>() {
public AvgCount call(AvgCount a, Integer x) {
a.total += x;
a.num += 1;
return a;
}
};
Function2<AvgCount, AvgCount, AvgCount> combine = new Function2<AvgCount, AvgCount, AvgCount>() {
public AvgCount call(AvgCount a, AvgCount b) {
a.total += b.total;
a.num += b.num;
return a;
}
};
AvgCount result = lines.aggregate(initial, addAndCount, combine);
System.out.println(result.avg());
4.持久化
persist()
unpersist()
分享到:
相关推荐
在 Spark 中,对数据的所有操作不外乎创建 RDD、转化已有RDD 以及调用 RDD 操作进行求值。每个 RDD 都被分为多个分区,这些分区运行在集群中的不同节点上。RDD 可以包含 Python、Java、Scala 中任意类型的对象, ...
在大数据处理框架Apache Spark中,RDD(弹性分布式数据集)是基础的数据处理抽象,它提供了容错、分布式数据操作的能力。而DataFrame和Dataset是Spark SQL中更高级的数据抽象,提供了更多的优化和易于使用的特点。...
RDD操作可以分为两大类:转换操作和行动操作。 1. 转换操作:返回一个新的RDD,所有转换函数都是Lazy,不会立即执行,需要行动函数触发。 2. 行动操作:返回值不是RDD(无返回值或返回其他),所有行动函数立即执行...
惰性求值:RDD的转化操作是惰性求值的,即在被调用行动操作之前Spark不会开始计算,相反,Spark会在内部记录下索要求执行的操作的相关信息。例如,当我们调用jsc.textFile()时,数据并没有读取进来,而是在必要时才...
JavaRDD<Integer> flatNumbersRDD = nestedNumbersRDD.flatMap(new FlatMapFunction, Integer>() { private static final long serialVersionUID = 1L; @Override public Iterator<Integer> call(List<Integer> ...
本资源是关于Spark 2.0.1 Java API的详细指南,通常以CHM(Microsoft HTML Help)格式呈现,这种格式便于用户搜索和查阅所需的具体API。 Spark的核心组件包括: 1. **Spark Core**:这是Spark的基础,提供了分布式...
JavaRDD<String> rdd = sc.textFile("hdfs://path/to/file"); ``` 3. **转换(Transformations)**: RDD上的算子,如`map()`, `filter()`, `reduceByKey()`, `groupByKey()`等,它们不立即执行,而是创建一个新的...
本篇文章将深入探讨如何使用Java开发Spark程序,并基于提供的"sparkJava"压缩包文件中的示例代码进行解析。 首先,我们需要理解Spark的核心概念。Spark主要由四个组件构成:Spark Core、Spark SQL、Spark Streaming...
与RDD紧密相关的是Spark中的并行操作,包括转换(transformations)和行动(actions)。转换是惰性的,不会立即执行,只有当它们被行动触发时才会执行。文档对各种转换操作和行动操作进行了说明,包括对键值对(key-...
- **Spark 的实现**:Spark 中的 RDD 是通过 Scala 语言实现的,并提供 Java 和 Python 等语言的 API 接口。此外,Spark 还提供了对 SQL 查询的支持以及机器学习库 MLlib。 - **用户案例与基准测试**:Spark 项目...
9. **交互式编程**:Spark提供了Scala、Java、Python和R等多种API,支持交互式Shell,方便开发者进行快速原型验证和调试。 在描述中提到的"Spark RDD以及其特性的流程图",可能是对RDD创建、转换、持久化和执行流程...
在Spark中,RDD操作主要有两种类型:转换(Transformation)和行动(Action)。转换操作如map、mapPartitionsWithIndex和filter等,它们创建新的RDD而不立即执行任何计算。行动操作如collect、count和save等,触发...
课时3:Spark RDD操作 课时4:SparkRDD原理剖析 课时5:Spark2sql从mysql中导入 课时6:Spark1.6.2sql与mysql数据交互 课时7:SparkSQL java操作mysql数据 课时8:Spark统计用户的收藏转换率 课时9:Spark梳理...
- **JavaRDD**:Spark的Java接口,提供了对RDD的基本操作。通过`SparkContext`的`parallelize()`方法创建。 ```java JavaRDD<String> rdd = sc.parallelize(Arrays.asList("line1", "line2")); ``` ### 3. RDD操作...
EXTENDING SPARK WITH JAVA AGENTS EXTENDING SPARK WITH JAVA AGENTS是指使用Java代理扩展Spark的功能,以提高性能和优化大数据应用程序的性能。该技术主要涉及到Spark缓存和Java代理的集成,旨在解决Spark缓存...
Java API提供了创建和操作RDD的方法。 2. DataFrame:基于DataFrame的API提供了更高级别的抽象,它是一个表或关系的概念,支持SQL查询和DataFrame API。 3. Dataset:是DataFrame的类型安全版本,结合了RDD的性能和...
在这个程序中,我们将深入理解Spark的核心概念,如RDD(弹性分布式数据集)以及并行计算的基本操作。 **1. Spark核心概念** - **RDD(Resilient Distributed Datasets)**:RDD是Spark中的基本数据抽象,它是不可...
Spark 支持多种编程语言,包括 Scala、Python、Java 等。 Spark 的运行模式 Spark 支持多种运行模式,包括: * Local 模式:用于测试和开发。 * Standalone 模式:独立集群模式。 * Spark on Yarn 模式:Spark ...
(1)一组分片(Partition),即数据集的基本组成单位 (2)一个计算每个分区的函数 (3)RDD之间的依赖关系 (4)一个Partitioner,即RD
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function....