此处代码还有三个功能
1、spark流式文件处理
2、全局统计
3、广播变量
以下代码可运行
package com.sunbin.stream; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import scala.Tuple2; /** * 模拟统计最近20秒内 读取的单词的个数 * * * @author root * */ public class WindowOnStreaming { public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WindowOnStreaming"); JavaStreamingContext jsc = new JavaStreamingContext(conf,Durations.seconds(5)); jsc.checkpoint("checkpoint"); JavaReceiverInputDStream<String> lines = jsc.socketTextStream("localhost", 9999); List<Tuple2<String, Boolean>> blacklist = new ArrayList<Tuple2<String, Boolean>>(); blacklist.add(new Tuple2<String, Boolean>("zhangsan2", true)); blacklist.add(new Tuple2<String, Boolean>("lisi2", true)); /** * 黑名单放入广播变量 */ final Broadcast<List<Tuple2<String, Boolean>>> blacklistRDD = jsc.sparkContext().broadcast(blacklist); /** * 广播变量在transform中处理 */ JavaDStream<String> filterlines = lines.transform(new Function<JavaRDD<String>, JavaRDD<String>>() { public JavaRDD<String> call(JavaRDD<String> str) throws Exception { JavaRDD<String> strretu = str.filter(new Function<String, Boolean>() { public Boolean call(String arg0) throws Exception { return !blacklistRDD.getValue().contains(new Tuple2<String, Boolean>(arg0, true)); } }); return strretu; } }); JavaPairDStream<String, Integer> words = filterlines.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() { /** * */ private static final long serialVersionUID = 1L; public Iterable<Tuple2<String, Integer>> call(String lines)throws Exception { ArrayList<Tuple2<String, Integer>> tuplelist = new ArrayList<Tuple2<String,Integer>>(); String[] split = lines.split(","); for(String word : split){ tuplelist.add(new Tuple2<String, Integer>(word, 1)); } System.out.println("------读取了一次-----"); return tuplelist; } }); /** * 全局统计,使用reduceByKeyAndWindow,此次必须配合checkpoint使用 */ JavaPairDStream<String, Integer> reduceByKeyAndWindow = words.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() { /** * */ private static final long serialVersionUID = 1L; public Integer call(Integer v1, Integer v2) throws Exception { return v1+v2; } }, new Function2<Integer, Integer, Integer>() { /** * */ private static final long serialVersionUID = 1L; public Integer call(Integer v1, Integer v2) throws Exception { return v1-v2; } /** * 每10秒计算最30秒的值 */ }, Durations.seconds(20), Durations.seconds(10)); /** * 为了方便测试: * 窗口宽度:20秒 * 窗口滑动:10秒 * 我们输入数据的时候: * 第一次5秒输入:a,b,c * 第二次5秒输入:d,e,f * 第三次5秒不输入 * 第四次5秒不输入 * * 然后看文件,观察在第30秒的时候是不是文件中的数据重新都没有 * * 下面我们将结果写入文件,方便查看 */ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss"); String format = sdf.format(new Date()); reduceByKeyAndWindow.print(); // reduceByKeyAndWindow.dstream().saveAsTextFiles("savedata/prefix"+format+"--", "txt"); jsc.start(); jsc.awaitTermination(); jsc.close(); } }
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.sunbin</groupId> <artifactId>stream</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>stream</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <repositories> <repository> <!-- Maven 自带的中央仓库使用的Id为central 如果其他的仓库声明也是用该Id就会覆盖中央仓库的配置 --> <id>cdh</id> <name>cdh</name> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> <layout>default</layout> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>false</enabled> </snapshots> </repository> </repositories> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.9.0.0</version> </dependency> <dependency> <groupId>com.twitter</groupId> <artifactId>parquet-hadoop</artifactId> <version>1.5.0-cdh5.8.3</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.8</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-assembly_2.10</artifactId> <version>1.6.0-cdh5.8.3</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> <version>1.6.0-cdh5.8.3</version> <scope>provided</scope> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass>sparksql.dataframe.CreateDataFrameFromHive</mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
相关推荐
接着,通过Spark Streaming读取实时数据流,与黑名单数据集进行匹配。匹配过程中,可以采用Join操作或自定义函数来实现。 **5. 实现步骤** 1) **设置环境**:确保已经安装了Spark、Scala和Hadoop等相关依赖。 2) **...
【Spark技术实践——词频统计】在大数据领域,Spark作为一种高效的数据处理框架,以其快速、通用和可扩展性而受到广泛关注。本实践旨在基于已经搭建的Hadoop平台,利用Spark组件进行文本词频统计,以此深入理解Scala...
统计本地文件中单词出现次数 二.操作流程 1.读取外部文件创建JavaRDD; 2.通过flatMap转化操作切分字符串,获取单词新JavaRDD; 3.通过mapToPair,以key为单词,value统一为1的键值JavaPairRDD; 4.通过reduceByKey...
在IT领域,尤其是在大数据分析和处理中,Apache Spark是一个广泛使用的分布式计算框架。这个场景涉及到对电影评分数据的统计分析,我们主要会关注三个文件:`movies.dat`, `ratings.dat`, 和 `users.dat`,这些文件...
基于Java+spark的离线统计移动端数据分析源码.zip基于Java+spark的离线统计移动端数据分析源码.zip基于Java+spark的离线统计移动端数据分析源码.zip基于Java+spark的离线统计移动端数据分析源码.zip基于Java+spark的...
基于Spark对全国历史气象数据的分析 用PySpark处理数据 将所有文件读为一个****RDD rdd = sc.wholeTextFiles("file:///" + os.getcwd() + "/china/") **数据清洗 ** 去除字母, -9999等无效数据 进行计算、排序等...
【标题】:“基于Spark的网易云音乐数据分析”项目是一个毕业设计,主要利用Apache Spark进行大规模音乐数据的处理和分析。这个项目提供了完整的源代码,确保能够运行,为学习和研究大数据处理提供了一个实用的实例...
Spark是Apache软件基金会旗下的开源大数据处理框架,由加州大学伯克利分校的AMP实验室开发,是基于内存计算的大数据并行处理系统。它提供了高层次的APIs,比如Java、Scala、Python、R等,用于数据挖掘、机器学习、...
Spark 案例测试数据Spark 案例测试数据Spark 案例测试数据Spark 案例测试数据Spark 案例测试数据Spark 案例测试数据Spark 案例测试数据Spark 案例测试数据Spark 案例测试数据Spark 案例测试数据Spark 案例测试数据...
该系统利用Spark的微批处理能力,通过TCP套接字接收实时数据流,并根据预定义的黑名单对数据进行过滤。在设计中,黑名单以广播变量的形式在集群中分发,以减少数据传输并提高处理效率。 系统的核心在于使用`...
《基于Java Spark的淘宝大数据分析可视化系统》 在当今数据驱动的时代,大数据分析与可视化已经成为企业决策的关键工具。本项目“源码地java spark淘宝大数据分析可视化系统”提供了一个全面的解决方案,它结合了...
本文提出了一种基于Spark Streaming技术的实时数据处理系统,旨在解决实时数据处理问题,提高数据处理的实时性。 二、Spark Streaming技术与实时数据处理系统设计 Spark Streaming是Apache Spark用于实时数据流处理...
该项目是大三下学期的课程设计,使用的数据集来自知名数据网站 Kaggle 的 tmdb-movie-metadata 电影数据集,以Python为编程语言,使用大数据框架Spark对数据进行了预处理,然后分别从多个方面对数据进行了分类和分析...
《Spark SQL在法律服务网站数据分析中的应用》 Spark SQL是Apache Spark的重要组件,它将SQL查询语言与大数据处理相结合,使得非程序员也能轻松地对大规模数据进行分析。本实训指导书将带你深入理解如何利用Spark ...
Spark SQL的DataFrame接口支持多种数据源的操作。一个DataFrame可以进行RDDs方式的操作,也可以被注册为临时表。把DataFrame注册为临时表之后,就可以对该DataFrame执行SQL查询。 Spark SQL的默认数据源为Parquet...
### Spark或MR引擎插入的数据,Hive表查询数据为0的问题解析 #### 问题背景与现象 在大数据处理场景中,经常会遇到使用不同执行引擎(如Spark、MapReduce (MR) 或 Tez)进行数据处理的情况。其中一种常见的问题是...
在本项目实战中,我们将探讨如何使用Java编程语言,结合Spark和Hive,将Hive中的数据高效地导入到ElasticSearch(ES)中,并利用ES的别名机制实现数据更新的平滑过渡。以下是对这个流程的详细解析: 1. **Hive数据...
基于Spark的咖啡连锁店数据处理分析系统开题报告 本报告旨在设计一个基于Spark的咖啡连锁店数据处理分析系统,以帮助咖啡连锁店提高销量和利润。本系统将使用Spark作为计算框架,以Hadoop平台作为数据存储,以HDFS...
Scala和Spark是大数据分析领域中的两个重要工具,它们在处理大规模数据时表现出强大的性能和灵活性。Scala是一种静态类型的函数式编程语言,而Spark是一个分布式计算框架,尤其适合于大数据处理和分析。本教程将深入...