spark 操作的几个步骤
1 数据关联 textFile 和 parallelize
2 转换操作(JavaRDD和JavaPairRDD他们可以通过mapToPair and flatMapToPair转换)
3 action操作,获取数据结果
一、wordcount的例子
//单词统计 public static void wordCount(JavaSparkContext ctx ){ String filePath = "e://log1.log"; JavaRDD<String> lines = ctx.textFile(filePath, 1); JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String s) { return Arrays.asList(SPACE.split(s)); } }); JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); } }); JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer i1, Integer i2) { return i1 + i2; } }); List<Tuple2<String, Integer>> output = counts.collect(); for (Tuple2<?, ?> tuple : output) { System.out.println(tuple._1() + " : " + tuple._2()); } }
二、 各种Transformations 和action测试准备数据
public static void testMap(JavaSparkContext ctx) { List<Integer> data = Arrays.asList(1, 2, 3, 4, 5,1,2,3); JavaRDD<Integer> distData = ctx.parallelize(data); List<Integer> data2 = Arrays.asList(1, 2, 3, 4, 5,6); JavaRDD<Integer> distData2 = ctx.parallelize(data2); List<String> data3 = Arrays.asList("wang zhan,xiao ming,li xin,wang qiang,e,f".split(",")); JavaRDD<String> data3RDD = ctx.parallelize(data3); JavaRDD<Integer> lineLengths ; JavaRDD<String> returnStr; JavaPairRDD<String, Integer> returnStr2; JavaPairRDD<String, Integer> returnStr3; // lineLengths = map( distData); // lineLengths = filter(distData ); // lineLengths = sample(distData ); // lineLengths = union(distData, distData2) ; // lineLengths = intersection(distData, distData2) ; // lineLengths = distinct(distData ) ; // returnStr = flatMap(data3RDD); //数据扁平打散 // returnStr3 = mapToPair(returnStr);// 数据变成键值对的形式 // reduceByKey(returnStr3); //对key进行分组计算 // List<Integer> list = lineLengths.collect(); // WordCount.print(list); // List<String> list2 = returnStr.collect() ; // WordCount.print(list2); // testPersist( data3RDD ); // List<Tuple2<String, Integer>> listTuple = returnStr2.collect(); // printTuple( listTuple ); // returnStr3 = reduceByKey(returnStr2 ); // List<Tuple2<String, Integer>> listTuple2 = returnStr3.collect(); // printTuple( listTuple2 ); //reduceByKey(returnStr2 ); //action //reduce( returnStr2); // count(returnStr2 ); }
三、groupByKey
//对key进行分组处理,但如果需要统计求和则最好不要这样处理 private static JavaPairRDD<String, Integer> groupByKey(JavaPairRDD<String, Integer> returnStr3) { JavaPairRDD<String, Iterable<Integer>> rdd = returnStr3.groupByKey(); return null; }
四、数据去重
//数据去重 private static JavaRDD<Integer> distinct(JavaRDD<Integer> distData) { JavaRDD<Integer> rdd3 = distData.distinct( ); print( rdd3); return null; }
五、交集数据
//获取rdd数据的交集 数据 private static JavaRDD<Integer> intersection(JavaRDD<Integer> distData, JavaRDD<Integer> distData2) { JavaRDD<Integer> rdd3 = distData.intersection(distData2 ); print(rdd3 ); return null; }
六数据持久化
/** * 持久化数据 * @param data3RDD */ private static void testPersist(JavaRDD<String> data3RDD) { System.out.println( "持久化数据到目录。。。"); data3RDD.persist(StorageLevel.MEMORY_ONLY()); // data3RDD.checkpoint(); // data3RDD.isCheckpointed() ; }
七、count统计
//计算统计 元素两两传入到reduce中然后计算统计 private static void count(JavaPairRDD<String, Integer> returnStr2) { System.out.println("元素总数:"+returnStr2.count() ); //获取元素总数 System.out.println("元素总数:"+returnStr2.first() );//获取第一个元素 System.out.println("元素总数:"+returnStr2.take(2) );//获取RDD的前2个元素 System.out.println("元素countByKey总数:"+returnStr2.countByKey( ) ); //根据key进行统计数量 //returnStr2.saveAsTextFile("E://test.txt") ;//数据保存到文件中 returnStr2.foreach(new VoidFunction<Tuple2<String,Integer>>() { @Override public void call(Tuple2<String, Integer> t) throws Exception { System.out.println( t ); } }); }
八、reduce操作
//计算统计 元素两两传入到reduce中然后计算统计 private static void reduce(JavaPairRDD<String, Integer> returnStr2) { Tuple2<String, Integer> t = returnStr2.reduce( new Function2<Tuple2<String,Integer>, Tuple2<String,Integer>, Tuple2<String,Integer>>() { @Override public Tuple2<String, Integer> call( Tuple2<String, Integer> v1, Tuple2<String, Integer> v2) throws Exception { System.out.println( v1 +" "+v2 ); return new Tuple2<String, Integer>(v1._1+v2._1,v1._2+v2._2() ); } } ); System.out.println("reduce结果 :"+t._1 +" "+t._2); }
九、reduceBykey使用
//将原来的RDD每一个行数据 变成一个数组,然后所有的数组数据存到一个总得RDD数组中 public static JavaPairRDD<String, Integer> reduceByKey(JavaPairRDD<String, Integer> rdd ) { JavaPairRDD<String, Integer> counts = rdd.reduceByKey( new Function2<Integer, Integer, Integer>() {//泛型分别是 :两个计算参数 ,最后是返回值 @Override public Integer call(Integer i1, Integer i2) {//每次把key相同的数据,与上一次执行的结果,依次传进来计算 System.out.println(i1+" == "+i2); return i1 + i2; } }); printPair(counts ); return counts; }
十 flatMap使用
//将原来的RDD每一个行数据 变成一个数组,然后所有的数组数据存到一个总得RDD数组中 public static JavaRDD<String> flatMap(JavaRDD<String> rdd ) { JavaRDD<String> lineLengths = rdd.flatMap( //返回值是输出的类型 new FlatMapFunction<String, String>() { //第一个参数是输入,第二个参数是输出 @Override public Iterable<String> call(String str) throws Exception { return Arrays.asList(SPACE.split(str)); } }) ; printStr(lineLengths ); return lineLengths; }
十一、mapToPair使用
// 将普通的RDD转换为 map数据的RDD方便计算处理 , a a->1 public static JavaPairRDD<String, Integer> mapToPair(JavaRDD<String> rdd ){ JavaPairRDD<String, Integer> ones = rdd.mapToPair( new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String t) throws Exception { return new Tuple2<String, Integer>( t, 1); } }) ; return ones ; }
十二、 合并Rdd
// 合并rdd public static JavaRDD<Integer> union(JavaRDD<Integer> rdd , JavaRDD<Integer> rdd2) { JavaRDD<Integer> lineLengths = rdd.union(rdd2) ; print(lineLengths); return lineLengths; }
十三、抽样
// Return a sampled subset of this RDD. 返回一个RDD子集抽样 public static JavaRDD<Integer> sample(JavaRDD<Integer> rdd) { JavaRDD<Integer> lineLengths = rdd.sample(false , 0.4 ) ; print(lineLengths); return lineLengths; }
十四、map使用
// map 源中的每一个元素都进行一个函数操作,生成一个新的RDD ,即每个元素进行一次转换 public static JavaRDD<Integer> map(JavaRDD<Integer> rdd) { JavaRDD<Integer> lineLengths = rdd.map(new Function<Integer, Integer>() { @Override public Integer call(Integer v1) throws Exception { return v1+1; } }); print( lineLengths); return lineLengths; }
十五、打印数据
/** * 如果rdd中数据过多,则调用take获取一部分数据打印 * 1 直接rdd.foreach 打印rdd的数据,数据打印在各个executor中 * 2 调用rdd.collect.foreach 打印数据,数据打印在driver上 * @param lineLengths */ public static void print(JavaRDD<Integer> lineLengths ){ // lineLengths.foreach( new VoidFunction<Integer>() { // @Override // public void call(Integer t) throws Exception { // } // }); System.out.println("开始打印"); lineLengths.collect().forEach( new Consumer<Integer>(){ @Override public void accept(Integer t) { System.out.println( t); } }); System.out.println("结束打印"); } public static void printStr(JavaRDD<String> lineLengths ){ System.out.println("开始打印"); lineLengths.collect().forEach( new Consumer<String>(){ @Override public void accept(String t) { System.out.println( t); } }); System.out.println("结束打印"); } /** * 打印tuble * @param lineLengths */ public static void printPair( JavaPairRDD<String, Integer> rdd ){ System.out.println("开始打印"); rdd.collect().forEach( new Consumer<Tuple2<String,Integer>>(){ @Override public void accept(Tuple2<String, Integer> t) { System.out.println( t._1() +" "+t._2() ); } }); System.out.println("结束打印"); } public static void print(List list) { if (list == null || list.size() == 0) { return; } for (int i = 0; i < list.size(); i++) { System.out.println(list.get(i)); } } /** * 打印map的RDD * @param listTuple */ public static void printTuple(List<Tuple2<String, Integer>> listTuple) { if (listTuple == null || listTuple.size() == 0) { return; } for (int i = 0; i < listTuple.size(); i++) { System.out.println(listTuple.get(i)); } }
十六、过滤
//对每一个元素进行过滤,然后返回 ,false 的数据会被过滤掉 public static JavaRDD<Integer> filter(JavaRDD<Integer> rdd) { JavaRDD<Integer> lineLengths = rdd.filter(new Function<Integer, Boolean>() { @Override public Boolean call(Integer v1) throws Exception { if (v1 != 1) { return true; } return false; } }); print( lineLengths); return lineLengths; }
相关推荐
在本项目中,我们重点关注的是Spark的算子应用,这些算子是Spark编程模型的基础,允许开发者进行数据的转换和操作。以下是Spark的一些关键算子及其功能: 1. **map()**: 这个算子用于将每个输入元素应用一个函数,...
《Spark编程基础》是厦门大学计算机科学系林子雨教授编写的一份教材讲义,主要针对2018年1月的课程版本。这本教材深入浅出地介绍了大数据处理框架Spark的核心概念、设计原理以及实际应用。通过涵盖多个章节的内容,...
在本资料包中,我们将围绕"Spark基础知识"进行探讨,包括Spark的架构、RDD概念、Spark SQL以及Spark与Scala、Hadoop的关联。 首先,Spark的架构基于“计算向数据移动”的理念,它由Driver、Executor和Cluster ...
2. **RDD**:RDD是Spark中最基础的数据抽象,它是只读的、分区的、可容错的数据集合。通过RDD,Spark实现了对大规模数据的并行处理。 3. **Spark SQL**:Spark SQL是Spark用于结构化数据处理的模块,它将SQL查询与...
在大数据分析领域,Hadoop和Spark是两个至关重要的工具,它们共同构成了高效处理海量数据的基础架构。本实例“HadoopSparkExampler”旨在通过实际操作,帮助用户理解如何结合使用这两个技术进行机器学习任务。 ...
《Spark编程基础及项目实践》课程的课后习题涵盖了Spark的核心概念和关键特性,旨在帮助学生深入理解和应用Spark框架。以下是对习题内容的详细解释: 1. Spark的运行架构: - Driver Program:是应用程序的主要...
《Spark编程基础及项目实践》课程的课后答案涵盖了Spark的核心概念、主要功能以及在实际项目中的应用。这个压缩包包含的资源旨在帮助学生深入理解并掌握Spark编程的关键知识点。 一、Spark概述 Spark是大数据处理...
这个"spark基础知识.zip"压缩包包含了多个PDF文档,覆盖了Spark的各个方面,从基础入门到高级实践,适合对Spark感兴趣或正在学习Spark的人员。 1. **Spark大数据架构概述** (1 Spark大数据架构概述.pdf) - Spark的...
2.Spark编译与部署(上)--基础环境搭建.pdf 2.Spark编译与部署(下)--Spark编译安装.pdf 2.Spark编译与部署(中)--Hadoop编译安装.pdf 3.Spark编程模型(上)--概念及SparkShell实战.pdf 3.Spark编程模型(下)--...
Spark Core是Apache Spark的核心组件,它是Spark所有其他模块的基础,包括Spark SQL、Spark Streaming和MLlib等。这个压缩包文件“spark-core”很可能包含了与Spark Core相关的代码示例或者文档,帮助用户理解和学习...
- 不懂运行,下载完可以私聊问...3、如果基础还行,也可在此代码基础上进行修改,以实现其他功能,也可用于毕设、课设、作业等。 下载后请首先打开README.md文件(如有),仅供学习参考, 切勿用于商业用途。 --------
这个文档可能涵盖了Spark MLlib库中的基础到进阶的机器学习算法实现。MLlib是Apache Spark的核心组件之一,提供了丰富的机器学习算法,包括分类、回归、聚类、协同过滤等,同时也支持模型评估和调优。 首先,我们来...
本篇研究文献着重探讨了Spark的基础实践,并对其内部工作原理、应用案例进行了深入分析。 首先,Spark的出现解决了传统大数据处理方法中存在的效率和实时性问题。它是一个基于内存计算的并行计算框架,这使其在处理...
2.Spark编译与部署(上)--基础环境搭建.pdf 2.Spark编译与部署(下)--Spark编译安装.pdf 2.Spark编译与部署(中)--Hadoop编译安装.pdf 3.Spark编程模型(上)--概念及SparkShell实战.pdf 3.Spark编程模型(下)--...
通过实例讲解如何使用Spark MLlib构建机器学习模型,包括分类、回归、聚类等任务。 ### 第5章 企业项目实战:使用Spark机器学习库构建电影推荐引擎 5-1 spark机器学习之推荐引擎1 基于Spark的推荐系统设计,包括...
1.1.1 Spark基础入门; 1.1.2 Spark集群搭建; 2.1 Spark编程实例; 2.2 简易电影受众系统; 3 Spark计算引擎剖析; 4、Spark应用经验与程序调优; 5.1 SparkSQL与DataFrame; 5.2 SparkSQL程序设计基础; 6.1Spark+...
1. Spark Core:基础架构,提供任务调度、内存管理、故障恢复等核心功能。 2. Spark SQL:支持SQL查询和DataFrame/Dataset API,方便数据处理。 3. Spark Streaming:基于微批处理的实时流处理。 4. MLlib:机器学习...
它的这些特点使得Scala成为了开发Apache Spark的基础语言。 首先,Scala是一门函数式编程语言。函数式编程理念在Scala中得到了体现,它鼓励将程序视为一系列函数的组合。在函数式编程中,函数是一等公民,它们可以...
2.Spark编译与部署(上)--基础环境搭建.pdf 2.Spark编译与部署(下)--Spark编译安装.pdf 2.Spark编译与部署(中)--Hadoop编译安装.pdf 3.Spark编程模型(上)--概念及SparkShell实战.pdf 3.Spark编程模型(下)--...
2.Spark编译与部署(上)--基础环境搭建.pdf 2.Spark编译与部署(下)--Spark编译安装.pdf 2.Spark编译与部署(中)--Hadoop编译安装.pdf 3.Spark编程模型(上)--概念及SparkShell实战.pdf 3.Spark编程模型(下)--...