`

Spark基础实例

 
阅读更多

 

spark 操作的几个步骤

1  数据关联  textFile 和 parallelize

2 转换操作(JavaRDD和JavaPairRDD他们可以通过mapToPair and flatMapToPair转换) 

3  action操作,获取数据结果 

 

一、wordcount的例子

 

	//单词统计 
	public static void wordCount(JavaSparkContext ctx  ){
		String filePath = "e://log1.log";
		JavaRDD<String> lines = ctx.textFile(filePath, 1);

		JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
			@Override
			public Iterable<String> call(String s) {
				return Arrays.asList(SPACE.split(s));
			}
		});

		JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
			@Override
			public Tuple2<String, Integer> call(String s) {
				return new Tuple2<String, Integer>(s, 1);
			}
		});

		JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
			@Override
			public Integer call(Integer i1, Integer i2) {
				return i1 + i2;
			}
		});

		
		List<Tuple2<String, Integer>> output = counts.collect();
		for (Tuple2<?, ?> tuple : output) {
			System.out.println(tuple._1() + " : " + tuple._2());
		}
	}

二、 各种Transformations 和action测试准备数据

 

  

	public static void testMap(JavaSparkContext ctx) {

		List<Integer> data = Arrays.asList(1, 2, 3, 4, 5,1,2,3);
		JavaRDD<Integer> distData = ctx.parallelize(data);
		
		List<Integer> data2 = Arrays.asList(1, 2, 3, 4, 5,6);
		JavaRDD<Integer> distData2 = ctx.parallelize(data2);
		
		List<String> data3 = Arrays.asList("wang zhan,xiao ming,li xin,wang qiang,e,f".split(","));
		JavaRDD<String> data3RDD = ctx.parallelize(data3);
		
		JavaRDD<Integer> lineLengths ;
		JavaRDD<String> returnStr;
		JavaPairRDD<String, Integer> returnStr2;
		JavaPairRDD<String, Integer> returnStr3;
		
//		lineLengths = map( distData);
//		lineLengths = filter(distData );
//		lineLengths = sample(distData );
//		lineLengths = union(distData, distData2) ;
//		lineLengths = intersection(distData, distData2) ;
//		lineLengths = distinct(distData ) ;
		
//		returnStr = flatMap(data3RDD); //数据扁平打散 
//		returnStr3 = mapToPair(returnStr);// 数据变成键值对的形式 
//		reduceByKey(returnStr3); //对key进行分组计算 
		
		
//		List<Integer> list = lineLengths.collect();
//		WordCount.print(list);
		
//		List<String> list2 = returnStr.collect() ; 
//		WordCount.print(list2);
 
//		testPersist( data3RDD );
		
//		List<Tuple2<String, Integer>> listTuple = returnStr2.collect();
//		printTuple( listTuple );
		
//		returnStr3 = reduceByKey(returnStr2  );
//		List<Tuple2<String, Integer>> listTuple2 = returnStr3.collect();
//		printTuple( listTuple2 );
		//reduceByKey(returnStr2  );
		
		//action 
		//reduce( returnStr2);
		// count(returnStr2 );
	}

 

 

三、groupByKey 

	//对key进行分组处理,但如果需要统计求和则最好不要这样处理 
	private static JavaPairRDD<String, Integer> groupByKey(JavaPairRDD<String, Integer> returnStr3) {
		JavaPairRDD<String, Iterable<Integer>>  rdd = returnStr3.groupByKey();
		
		return null;
	}

 四、数据去重

  

	//数据去重 
	private static JavaRDD<Integer> distinct(JavaRDD<Integer> distData) {
		JavaRDD<Integer> rdd3 = distData.distinct( );
		print( rdd3);
		return null;
	}

 

五、交集数据

 

	//获取rdd数据的交集 数据
	private static JavaRDD<Integer> intersection(JavaRDD<Integer> distData,
			JavaRDD<Integer> distData2) {
		JavaRDD<Integer> rdd3 = distData.intersection(distData2 ); 
		print(rdd3 ); 
		
		return null;
	}

 

六数据持久化

 

	/**
	 * 持久化数据 
	 * @param data3RDD
	 */
	private static void testPersist(JavaRDD<String> data3RDD) {
		System.out.println( "持久化数据到目录。。。");
		data3RDD.persist(StorageLevel.MEMORY_ONLY());
//		data3RDD.checkpoint();  
//		data3RDD.isCheckpointed() ;
	}

七、count统计

 

	//计算统计  元素两两传入到reduce中然后计算统计
	private static void count(JavaPairRDD<String, Integer> returnStr2) {
		System.out.println("元素总数:"+returnStr2.count()  ); //获取元素总数
		System.out.println("元素总数:"+returnStr2.first() );//获取第一个元素
		System.out.println("元素总数:"+returnStr2.take(2) );//获取RDD的前2个元素
		System.out.println("元素countByKey总数:"+returnStr2.countByKey( )  ); //根据key进行统计数量 
		//returnStr2.saveAsTextFile("E://test.txt") ;//数据保存到文件中  	
		returnStr2.foreach(new VoidFunction<Tuple2<String,Integer>>() {
			@Override
			public void call(Tuple2<String, Integer> t) throws Exception {
				 System.out.println( t );
			}
		});
		
	}

  

八、reduce操作

  

	//计算统计  元素两两传入到reduce中然后计算统计
	private static void reduce(JavaPairRDD<String, Integer> returnStr2) {
		
		Tuple2<String, Integer> t = returnStr2.reduce(
				new Function2<Tuple2<String,Integer>, Tuple2<String,Integer>,  Tuple2<String,Integer>>() {
					@Override
					public Tuple2<String, Integer> call( Tuple2<String, Integer> v1,  Tuple2<String, Integer> v2) throws Exception {
						 System.out.println( v1 +"  "+v2 );
						
						return new Tuple2<String, Integer>(v1._1+v2._1,v1._2+v2._2()  );
					}
					
			
		} );
		System.out.println("reduce结果 :"+t._1 +"  "+t._2);
	}

 

九、reduceBykey使用

  

	//将原来的RDD每一个行数据 变成一个数组,然后所有的数组数据存到一个总得RDD数组中
	public static JavaPairRDD<String, Integer> reduceByKey(JavaPairRDD<String, Integer> rdd ) {
		
		JavaPairRDD<String, Integer> counts = rdd.reduceByKey(
				new Function2<Integer, Integer, Integer>() {//泛型分别是 :两个计算参数  ,最后是返回值 
			
			@Override
			public Integer call(Integer i1, Integer i2) {//每次把key相同的数据,与上一次执行的结果,依次传进来计算
				System.out.println(i1+" == "+i2);
				return i1 + i2;
			}
		});
		printPair(counts );
		return counts;
	}

 

十 flatMap使用

  

	//将原来的RDD每一个行数据 变成一个数组,然后所有的数组数据存到一个总得RDD数组中
	public static JavaRDD<String> flatMap(JavaRDD<String> rdd  ) {
		JavaRDD<String> lineLengths = rdd.flatMap( //返回值是输出的类型
				new FlatMapFunction<String, String>() { //第一个参数是输入,第二个参数是输出
			@Override
			public Iterable<String> call(String str) throws Exception {
				 
				return Arrays.asList(SPACE.split(str));
			}
		}) ; 
		
		printStr(lineLengths );
		return lineLengths;
	}

 十一、mapToPair使用

   

	// 将普通的RDD转换为 map数据的RDD方便计算处理  , a  a->1
	public static  JavaPairRDD<String, Integer>   mapToPair(JavaRDD<String> rdd ){
		JavaPairRDD<String, Integer> ones = rdd.mapToPair( new PairFunction<String, String, Integer>() {
			@Override
			public Tuple2<String, Integer> call(String t) throws Exception {
				return  new Tuple2<String, Integer>( t, 1);
			}
		}) ; 
		return ones ; 
	}

 十二、 合并Rdd

  

	// 合并rdd 
	public static JavaRDD<Integer> union(JavaRDD<Integer> rdd , JavaRDD<Integer> rdd2) {
		JavaRDD<Integer> lineLengths = rdd.union(rdd2) ; 
		print(lineLengths);
		return lineLengths;
	}

 十三、抽样

   

	// Return a sampled subset of this RDD. 返回一个RDD子集抽样 
	public static JavaRDD<Integer> sample(JavaRDD<Integer> rdd) {
		JavaRDD<Integer> lineLengths = rdd.sample(false , 0.4 ) ; 
		print(lineLengths);
		return lineLengths;
	}

 十四、map使用

  

	// map 源中的每一个元素都进行一个函数操作,生成一个新的RDD ,即每个元素进行一次转换
	public static JavaRDD<Integer> map(JavaRDD<Integer> rdd) {
		JavaRDD<Integer> lineLengths = rdd.map(new Function<Integer, Integer>() {
			@Override
			public Integer call(Integer v1) throws Exception {
				
				return v1+1;
			}
		});
		
		print( lineLengths);
		return lineLengths;
	}

 十五、打印数据 

   

	/**
	 * 如果rdd中数据过多,则调用take获取一部分数据打印
	 * 1 直接rdd.foreach 打印rdd的数据,数据打印在各个executor中
	 * 2 调用rdd.collect.foreach 打印数据,数据打印在driver上 
	 * @param lineLengths
	 */
	public static void print(JavaRDD<Integer> lineLengths ){
//		lineLengths.foreach( new VoidFunction<Integer>() {
//			@Override
//			public void call(Integer t) throws Exception {
//			}
//		});
		System.out.println("开始打印");
		lineLengths.collect().forEach( new Consumer<Integer>(){
			@Override
			public void accept(Integer t) {
				System.out.println( t); 
			}
		});
		System.out.println("结束打印");
	}
	
	public static void printStr(JavaRDD<String> lineLengths ){
		System.out.println("开始打印");
		lineLengths.collect().forEach( new Consumer<String>(){
			@Override
			public void accept(String t) {
				System.out.println( t); 
			}
		});
		System.out.println("结束打印");
	}
	/**
	 * 打印tuble
	 * @param lineLengths
	 */
	public static void printPair( JavaPairRDD<String, Integer> rdd ){
 
		System.out.println("开始打印");
		rdd.collect().forEach( new Consumer<Tuple2<String,Integer>>(){
			@Override
			public void accept(Tuple2<String, Integer> t) {
				System.out.println( t._1() +"  "+t._2() );
			}
			
		});
		System.out.println("结束打印");
	}
	public static void print(List  list) {
		if (list == null || list.size() == 0) {
			return;
		}
		for (int i = 0; i < list.size(); i++) {
			System.out.println(list.get(i));
		}
	}
	
	/**
	 * 打印map的RDD
	 * @param listTuple
	 */
	public static void printTuple(List<Tuple2<String, Integer>> listTuple) {
		
		if (listTuple == null || listTuple.size() == 0) {
			return;
		}

		for (int i = 0; i < listTuple.size(); i++) {
			System.out.println(listTuple.get(i));
		}
	}

	

 十六、过滤

  

	//对每一个元素进行过滤,然后返回 ,false 的数据会被过滤掉
	public static JavaRDD<Integer> filter(JavaRDD<Integer> rdd) {
		JavaRDD<Integer> lineLengths = rdd.filter(new Function<Integer, Boolean>() {
			@Override
			public Boolean call(Integer v1) throws Exception {
				if (v1 != 1) {
					return true;
				}
				return false;
			}
		});
		
		print( lineLengths);
		return lineLengths;
	}

 

 

 

 

 

 

 

 

 

 

 

 

 

分享到:
评论

相关推荐

    Spark算子实例maven版

    在本项目中,我们重点关注的是Spark的算子应用,这些算子是Spark编程模型的基础,允许开发者进行数据的转换和操作。以下是Spark的一些关键算子及其功能: 1. **map()**: 这个算子用于将每个输入元素应用一个函数,...

    《Spark 编程基础》 教材讲义 厦门大学 林子雨

    《Spark编程基础》是厦门大学计算机科学系林子雨教授编写的一份教材讲义,主要针对2018年1月的课程版本。这本教材深入浅出地介绍了大数据处理框架Spark的核心概念、设计原理以及实际应用。通过涵盖多个章节的内容,...

    spark基础知识

    在本资料包中,我们将围绕"Spark基础知识"进行探讨,包括Spark的架构、RDD概念、Spark SQL以及Spark与Scala、Hadoop的关联。 首先,Spark的架构基于“计算向数据移动”的理念,它由Driver、Executor和Cluster ...

    Spark大数据实例开发教程-书签-完整版

    2. **RDD**:RDD是Spark中最基础的数据抽象,它是只读的、分区的、可容错的数据集合。通过RDD,Spark实现了对大规模数据的并行处理。 3. **Spark SQL**:Spark SQL是Spark用于结构化数据处理的模块,它将SQL查询与...

    hadoop+spark机器学习实例

    在大数据分析领域,Hadoop和Spark是两个至关重要的工具,它们共同构成了高效处理海量数据的基础架构。本实例“HadoopSparkExampler”旨在通过实际操作,帮助用户理解如何结合使用这两个技术进行机器学习任务。 ...

    《Spark编程基础及项目实践》课后习题及答案3.pdf

    《Spark编程基础及项目实践》课程的课后习题涵盖了Spark的核心概念和关键特性,旨在帮助学生深入理解和应用Spark框架。以下是对习题内容的详细解释: 1. Spark的运行架构: - Driver Program:是应用程序的主要...

    《Spark编程基础及项目实践》课后答案.zip

    《Spark编程基础及项目实践》课程的课后答案涵盖了Spark的核心概念、主要功能以及在实际项目中的应用。这个压缩包包含的资源旨在帮助学生深入理解并掌握Spark编程的关键知识点。 一、Spark概述 Spark是大数据处理...

    spark基础知识.zip

    这个"spark基础知识.zip"压缩包包含了多个PDF文档,覆盖了Spark的各个方面,从基础入门到高级实践,适合对Spark感兴趣或正在学习Spark的人员。 1. **Spark大数据架构概述** (1 Spark大数据架构概述.pdf) - Spark的...

    9.SparkGraphX介绍及实例.pdf

    2.Spark编译与部署(上)--基础环境搭建.pdf 2.Spark编译与部署(下)--Spark编译安装.pdf 2.Spark编译与部署(中)--Hadoop编译安装.pdf 3.Spark编程模型(上)--概念及SparkShell实战.pdf 3.Spark编程模型(下)--...

    sparkcore相关实例

    Spark Core是Apache Spark的核心组件,它是Spark所有其他模块的基础,包括Spark SQL、Spark Streaming和MLlib等。这个压缩包文件“spark-core”很可能包含了与Spark Core相关的代码示例或者文档,帮助用户理解和学习...

    基于Scala实现的Spark算法实例+源代码+文档说明

    - 不懂运行,下载完可以私聊问...3、如果基础还行,也可在此代码基础上进行修改,以实现其他功能,也可用于毕设、课设、作业等。 下载后请首先打开README.md文件(如有),仅供学习参考, 切勿用于商业用途。 --------

    spark机器学习简单实例文档

    这个文档可能涵盖了Spark MLlib库中的基础到进阶的机器学习算法实现。MLlib是Apache Spark的核心组件之一,提供了丰富的机器学习算法,包括分类、回归、聚类、协同过滤等,同时也支持模型评估和调优。 首先,我们来...

    大数据处理平台Spark基础实践研究.pdf

    本篇研究文献着重探讨了Spark的基础实践,并对其内部工作原理、应用案例进行了深入分析。 首先,Spark的出现解决了传统大数据处理方法中存在的效率和实时性问题。它是一个基于内存计算的并行计算框架,这使其在处理...

    Spark 入门实战系列

    2.Spark编译与部署(上)--基础环境搭建.pdf 2.Spark编译与部署(下)--Spark编译安装.pdf 2.Spark编译与部署(中)--Hadoop编译安装.pdf 3.Spark编程模型(上)--概念及SparkShell实战.pdf 3.Spark编程模型(下)--...

    Python大数据之Spark编程基础与提升视频课程课件PPT模板.pptx

    通过实例讲解如何使用Spark MLlib构建机器学习模型,包括分类、回归、聚类等任务。 ### 第5章 企业项目实战:使用Spark机器学习库构建电影推荐引擎 5-1 spark机器学习之推荐引擎1 基于Spark的推荐系统设计,包括...

    spark详细教程课件

    1.1.1 Spark基础入门; 1.1.2 Spark集群搭建; 2.1 Spark编程实例; 2.2 简易电影受众系统; 3 Spark计算引擎剖析; 4、Spark应用经验与程序调优; 5.1 SparkSQL与DataFrame; 5.2 SparkSQL程序设计基础; 6.1Spark+...

    Spark大数据商业实战三部曲_内核解密_商业案例_性能调优 实例源码

    1. Spark Core:基础架构,提供任务调度、内存管理、故障恢复等核心功能。 2. Spark SQL:支持SQL查询和DataFrame/Dataset API,方便数据处理。 3. Spark Streaming:基于微批处理的实时流处理。 4. MLlib:机器学习...

    spark开发基础之Scala快餐.pdf

    它的这些特点使得Scala成为了开发Apache Spark的基础语言。 首先,Scala是一门函数式编程语言。函数式编程理念在Scala中得到了体现,它鼓励将程序视为一系列函数的组合。在函数式编程中,函数是一等公民,它们可以...

    8.SparkMLlib(下)--SparkMLlib实战.pdf

    2.Spark编译与部署(上)--基础环境搭建.pdf 2.Spark编译与部署(下)--Spark编译安装.pdf 2.Spark编译与部署(中)--Hadoop编译安装.pdf 3.Spark编程模型(上)--概念及SparkShell实战.pdf 3.Spark编程模型(下)--...

    8.SparkMLlib(上)--机器学习及SparkMLlib简介.pdf

    2.Spark编译与部署(上)--基础环境搭建.pdf 2.Spark编译与部署(下)--Spark编译安装.pdf 2.Spark编译与部署(中)--Hadoop编译安装.pdf 3.Spark编程模型(上)--概念及SparkShell实战.pdf 3.Spark编程模型(下)--...

Global site tag (gtag.js) - Google Analytics