`

spark

 
阅读更多
1.简单关于气温topk小例子。
package jspark;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

/**
* @author liyu
*
*/
public class HT {

public static void main(String[] args) {
// TODO Auto-generated method stub
if (args.length < 1) {
System.err.println("Usage: HT <file>");
System.exit(1);
}
for (String string : args) {
System.out.println("============="+string);
}
SparkConf sparkConf = new SparkConf().setAppName("ht1");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);

JavaRDD<String> lines = ctx.textFile(args[0], 2);
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String s) {
return Arrays.asList(s);
}
});
System.out.println("----------------------------------------------");
JavaPairRDD<String, Integer> km = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
int  airTemperature = 0;
String year = s.substring(15,19);
if(s.charAt(87) == '+') {
airTemperature = Integer.parseInt(s.substring(88, 92));
} else {
airTemperature = Integer.parseInt(s.substring(87, 92));
}  
return new Tuple2<String, Integer>(year, airTemperature);
}
});

//km.saveAsTextFile(args[1]);
/*List<Tuple2<String, Integer>> output = km.collect();
for (Tuple2<?,?> tuple : output) {
System.out.println(tuple._1() + ": " + tuple._2());
}*/
System.out.println("reduce--------------------");
/*JavaRDD<Tuple2<Integer, String>>  jdd =  km.map(new Function<Tuple2<String,Integer>, Tuple2<Integer,String>>() {
public Tuple2<Integer,String> call(Tuple2<String, Integer> tuple) throws Exception {
// TODO Auto-generated method stub
return new Tuple2<Integer, String>(tuple._2, tuple._1);
}
});
jdd.sortBy(new Function<Tuple2<Integer,String>, Tuple2<Integer,String>>() {
@Override
public Tuple2<Integer,String> call(Tuple2<Integer, String> key) throws Exception {
// TODO Auto-generated method stub

return null;
}


} ,true, 3);*/

JavaPairRDD<String, Integer> jrd = km.groupByKey().mapValues(new Function<Iterable<Integer>, Integer>() {

@Override
public Integer call(Iterable<Integer> v) throws Exception {
// TODO Auto-generated method stub
Integer max = 0;
for(Integer x:v){
max = x>max?x:max;
}
return max;
}
});
jrd.sortByKey().saveAsTextFile(args[1]);
List<Tuple2<String, Integer>> output1 = jrd.sortByKey(true).collect();
for (Tuple2<?,?> tuple : output1) {
System.out.println(tuple._1() + ": " + tuple._2());
}
//采用reduceByKey
/*JavaPairRDD<String, Integer> counts = km.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer k1, Integer k2) throws Exception {
// TODO Auto-generated method stub
return k1+k2;
}

});
counts.saveAsTextFile(args[1]);
List<Tuple2<String, Integer>> output1 = counts.collect();
for (Tuple2<?,?> tuple : output1) {
System.out.println(tuple._1() + ": " + tuple._2());
}*/
ctx.stop();



}
}
下面是处理气温的数据。
./bin/spark-submit --master spark://192.168.1.26:7077 --class  jspark.HT  --name ht1  --executor-memory 400M --driver-memory 512M  --jars /opt/spark-1.3.0-bin-hadoop2.4/lib/spark-assembly-1.3.0-hadoop2.4.0.jar   /opt/a/jwc.jar  "/opt/a/190*" "/opt/spark-1.3.0-bin-hadoop2.4/test/dddd14"
此例子是处理k=1最大值(最小值和最大值的处理方式一样,定义一个变量min = x>min?min:x即可。
2若要求top2或者topk,将返回值的值变成数组,里面的实现可以是各种排序
JavaPairRDD<String, Integer[]> jrd = km.groupByKey().mapValues(new Function<Iterable<Integer>, Integer[]>() {
int arr[] = new int[2];
@Override
public Integer[] call(Iterable<Integer> v) throws Exception {
// TODO Auto-generated method stub
//冒泡排序
//快速排序
                                           //最小堆处理
return null;
}
});
分享到:
评论

相关推荐

    大数据Spark纯净版安装包,用于快速集成Hive on Spark

    Spark纯净版安装包是一个用于安装Apache Spark的软件包,该软件包提供了Spark的基本功能和组件,但不包含任何额外的扩展或依赖项。纯净版安装包旨在提供一个轻量级、简单易用的Spark安装选项,适用于用户希望快速...

    spark安装包+spark实验安装软件

    Spark是Apache基金会下的一个开源大数据处理框架,以其高效、易用和可扩展性著称。Spark的核心设计理念是基于内存计算,极大地提升了数据处理速度。在本压缩包中,"spark-3.4.0-bin-without-hadoop"是Spark的一个预...

    springboot与spark整合开发, 练习spark api

    在现代大数据处理领域,Spark和Spring Boot的整合已经成为一种常见的技术组合。Spark作为一个快速、通用且可扩展的大数据处理框架,而Spring Boot则为构建微服务提供了简洁、高效的解决方案。本篇文章将深入探讨如何...

    SparkDemo.rar

    《Spark技术深度解析:从SparkCount到SparkSQL与SparkStreaming》 Spark,作为大数据处理领域的重要框架,以其高效、易用的特点受到了广泛的关注。在"SparkDemo.rar"这个压缩包中,包含了三个关键领域的示例:Spark...

    spark相关jar包

    Spark是Apache软件基金会下的一个开源大数据处理框架,以其高效、灵活和可扩展的特性而闻名。Spark的核心设计是基于内存计算,它极大地提高了数据处理的速度,使得数据科学家和开发人员能够在处理大规模数据集时获得...

    Spark官方中文文档

    Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点...

    spark笔记整理文档

    《Spark技术深度解析》 Spark,作为大数据处理领域的重要框架,以其高效、易用和弹性伸缩等特性,被广泛应用于大规模数据处理、实时计算、机器学习和图形处理等多个场景。本篇笔记将深入探讨Spark的核心概念、架构...

    spark-2.3.3.tgz

    Spark是Apache软件基金会下的一个开源大数据处理框架,以其高效、灵活和易用的特性而闻名。Spark 2.3.3是该框架的一个稳定版本,提供了丰富的数据处理功能,包括批处理、交互式查询(Spark SQL)、实时流处理(Spark...

    spark2 安装教程

    ### Spark2.0安装教程与Spark1.3共存配置详解 #### 一、引言 随着大数据技术的发展,Apache Spark 已成为处理大规模数据集的重要工具之一。然而,在实际应用过程中,不同的项目可能需要使用不同版本的 Spark 来...

    Spark和TiDB (Spark on TiDB)

    SparkTI (Spark on TiDB)是TiDB基于Apache Spark的独立于原生系统的计算引擎。它将Spark和TiDB深度集成,在原有MySQL Workload之外借助Spark支持了更多样的用户场景和API。这个项目在SparkSQL和Catalyst引擎之外实现...

    spark_jar包

    Spark_JAR包是Apache Spark项目的核心组件之一,它包含了运行Spark应用程序所必需的类库和依赖。Spark作为一个快速、通用且可扩展的数据处理框架,它为大数据处理提供了丰富的API,支持Scala、Java、Python和R等多种...

    spark的Ubuntu下的安装包

    spark-3.5.0-bin-hadoop3.tgz 是Apache Spark的一个特定版本,针对Hadoop 3.x版本进行了优化和构建。Apache Spark是一个强大的分布式计算系统,用于大数据处理和分析。它提供了高效的数据处理能力,支持多种编程语言...

    spark3.0入门到精通

    ├─Spark-day01 │ 01-[了解]-Spark发展历史和特点介绍.mp4 │ 03-[掌握]-Spark环境搭建-Standalone集群模式.mp4 │ 06-[理解]-Spark环境搭建-On-Yarn-两种模式.mp4 │ 07-[掌握]-Spark环境搭建-On-Yarn-两种...

    Spark 入门实战系列

    Spark 入门实战系列,适合初学者,文档包括十部分内容,质量很好,为了感谢文档作者,也为了帮助更多的人入门,传播作者的心血,特此友情转贴: 1.Spark及其生态圈简介.pdf 2.Spark编译与部署(上)--基础环境搭建....

    适配CDH6.3.2的Spark3.2.2

    Spark3.2.2是Apache Spark的一个重要版本,它提供了许多新特性和性能优化,而适配CDH6.3.2则意味着该版本的Spark已经经过了与Cloudera Data Hub (CDH) 6.3.2的兼容性验证。CDH是一个流行的Hadoop发行版,包含了...

    spark-2.4.0源码

    Spark是Apache软件基金会下的一个开源大数据处理框架,其2.4.0版本是对早期版本的扩展和优化,提供了更高效、稳定和丰富的功能。Spark的核心设计理念是快速、通用和可扩展的数据处理,它通过内存计算大幅度提升了...

    spark全套视频教程

    《Spark全套视频教程》是一份全面讲解Apache Spark的教育资源,旨在帮助学习者深入理解和掌握这一强大的大数据处理框架。Spark以其高效、易用和多模态处理能力,在大数据领域备受推崇,广泛应用于数据处理、机器学习...

    Flink和Spark比较

    Flink 和 Spark 比较 Flink 和 Spark 都是大数据处理领域中的重要框架,本文将对它们进行比较,分别介绍它们的核心实现、计算模型、硬件需求、数据源集成、性能对比等方面。 核心实现 Apache Spark 是基于 Scala ...

    Spark 编程指南简体中文版.pdf

    Spark 编程指南简体中文版 本资源为 Spark 编程指南简体中文版,涵盖了 Spark 的基本概念、数据处理、流处理、图形处理和 SQL 等方面的内容。以下是该资源中的知识点总结: Spark 基础 * Spark Shell:交互式 ...

    大数据Spark企业级实战

    《大数据Spark企业级实战》详细解析了企业级Spark开发所需的几乎所有技术内容,涵盖Spark的架构设计、Spark的集群搭建、Spark内核的解析、Spark SQL、MLLib、GraphX、Spark Streaming、Tachyon、SparkR、Spark多语言...

Global site tag (gtag.js) - Google Analytics