`
sunbin
  • 浏览: 354562 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

spark 统计黑名单之外的数据次数

 
阅读更多

此处代码还有三个功能

1、spark流式文件处理

2、全局统计

3、广播变量

以下代码可运行

 

package com.sunbin.stream;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;
/**
 * 模拟统计最近20秒内 读取的单词的个数
 * 
 * 
 * @author root
 *
 */
public class WindowOnStreaming {
	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WindowOnStreaming");
		JavaStreamingContext jsc = new JavaStreamingContext(conf,Durations.seconds(5));
		jsc.checkpoint("checkpoint");
		JavaReceiverInputDStream<String> lines = jsc.socketTextStream("localhost", 9999);
		
		List<Tuple2<String, Boolean>> blacklist = new ArrayList<Tuple2<String, Boolean>>();
		blacklist.add(new Tuple2<String, Boolean>("zhangsan2", true));
		blacklist.add(new Tuple2<String, Boolean>("lisi2", true));
		/**
		 * 黑名单放入广播变量
		 */
		final Broadcast<List<Tuple2<String, Boolean>>> blacklistRDD = jsc.sparkContext().broadcast(blacklist);
		
		/**
		 * 广播变量在transform中处理
		 */
		JavaDStream<String> filterlines = lines.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
			public JavaRDD<String> call(JavaRDD<String> str) throws Exception {
				JavaRDD<String> strretu = str.filter(new Function<String, Boolean>() {
					public Boolean call(String arg0) throws Exception {
						return !blacklistRDD.getValue().contains(new Tuple2<String, Boolean>(arg0, true));
					}
				});
				return strretu;
			}
		});
		
		JavaPairDStream<String, Integer> words = filterlines.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {

			/**
			 * 
			 */
			private static final long serialVersionUID = 1L;

			public Iterable<Tuple2<String, Integer>> call(String lines)throws Exception {
				ArrayList<Tuple2<String, Integer>> tuplelist = new ArrayList<Tuple2<String,Integer>>(); 
				String[] split = lines.split(",");
				for(String word : split){
					tuplelist.add(new Tuple2<String, Integer>(word, 1));
				}
				System.out.println("------读取了一次-----");
				return tuplelist;
			}
		});
		
		/**
		 * 全局统计,使用reduceByKeyAndWindow,此次必须配合checkpoint使用
		 */
		JavaPairDStream<String, Integer> reduceByKeyAndWindow = words.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
			
			/**
			 * 
			 */
			private static final long serialVersionUID = 1L;

			public Integer call(Integer v1, Integer v2) throws Exception {
				return v1+v2;
			}
		}, new Function2<Integer, Integer, Integer>() {

			/**
			 * 
			 */
			private static final long serialVersionUID = 1L;

			public Integer call(Integer v1, Integer v2) throws Exception {
				return v1-v2;
			}
			/**
			 *  每10秒计算最30秒的值
			 */
		}, Durations.seconds(20), Durations.seconds(10));
		/**
		 * 为了方便测试:
		 * 窗口宽度:20秒
		 * 窗口滑动:10秒
		 * 我们输入数据的时候:
		 * 第一次5秒输入:a,b,c
		 * 第二次5秒输入:d,e,f
		 * 第三次5秒不输入
		 * 第四次5秒不输入
		 * 
		 * 然后看文件,观察在第30秒的时候是不是文件中的数据重新都没有
		 * 
		 * 下面我们将结果写入文件,方便查看
		 */
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
		String format = sdf.format(new Date());
		reduceByKeyAndWindow.print();
//		reduceByKeyAndWindow.dstream().saveAsTextFiles("savedata/prefix"+format+"--", "txt");
		
		jsc.start();
		jsc.awaitTermination();
		jsc.close();
	}
}

 pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.sunbin</groupId>
  <artifactId>stream</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>stream</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
 
 
	<repositories>
		<repository>
			<!-- Maven 自带的中央仓库使用的Id为central 如果其他的仓库声明也是用该Id就会覆盖中央仓库的配置 -->
			<id>cdh</id>
			<name>cdh</name>
			<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
			<layout>default</layout>
			<releases>
				<enabled>true</enabled>
			</releases>
			<snapshots>
				<enabled>false</enabled>
			</snapshots>
		</repository>
	</repositories>
		
	<dependencies>
	
	    <dependency>
	      <groupId>junit</groupId>
	      <artifactId>junit</artifactId>
	      <version>3.8.1</version>
	      <scope>test</scope>
	    </dependency>
		<dependency>
		    <groupId>org.apache.kafka</groupId>
		    <artifactId>kafka_2.10</artifactId>
		    <version>0.9.0.0</version>
		</dependency>
		
		<dependency>
		    <groupId>com.twitter</groupId>
		    <artifactId>parquet-hadoop</artifactId>
		    <version>1.5.0-cdh5.8.3</version>
		</dependency>
		
		<dependency>
			<groupId>mysql</groupId>
    		<artifactId>mysql-connector-java</artifactId>
    		<version>5.1.8</version>
		</dependency>
		
		<dependency>
			<groupId>org.apache.spark</groupId>
		    <artifactId>spark-assembly_2.10</artifactId>
		    <version>1.6.0-cdh5.8.3</version>
		    <scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-streaming_2.10</artifactId>
			<version>1.6.0-cdh5.8.3</version>
			<scope>provided</scope>
		</dependency>
	</dependencies>
	
	<build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>sparksql.dataframe.CreateDataFrameFromHive</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>  
</project>

 

分享到:
评论

相关推荐

    Spark实现黑名单实时过滤-内含源码以及设计说明书(可以自己运行复现).zip

    接着,通过Spark Streaming读取实时数据流,与黑名单数据集进行匹配。匹配过程中,可以采用Join操作或自定义函数来实现。 **5. 实现步骤** 1) **设置环境**:确保已经安装了Spark、Scala和Hadoop等相关依赖。 2) **...

    大数据技术实践——Spark词频统计

    【Spark技术实践——词频统计】在大数据领域,Spark作为一种高效的数据处理框架,以其快速、通用和可扩展性而受到广泛关注。本实践旨在基于已经搭建的Hadoop平台,利用Spark组件进行文本词频统计,以此深入理解Scala...

    Spark学习---统计文件单词出现次数

    统计本地文件中单词出现次数 二.操作流程 1.读取外部文件创建JavaRDD; 2.通过flatMap转化操作切分字符串,获取单词新JavaRDD; 3.通过mapToPair,以key为单词,value统一为1的键值JavaPairRDD; 4.通过reduceByKey...

    Spark统计电影评分数据:movies.dat,retings.dat,users.dat

    在IT领域,尤其是在大数据分析和处理中,Apache Spark是一个广泛使用的分布式计算框架。这个场景涉及到对电影评分数据的统计分析,我们主要会关注三个文件:`movies.dat`, `ratings.dat`, 和 `users.dat`,这些文件...

    基于Java+spark的离线统计移动端数据分析源码.zip

    基于Java+spark的离线统计移动端数据分析源码.zip基于Java+spark的离线统计移动端数据分析源码.zip基于Java+spark的离线统计移动端数据分析源码.zip基于Java+spark的离线统计移动端数据分析源码.zip基于Java+spark的...

    人工智能-spark-基于Spark对全国历史气象数据的分析

    基于Spark对全国历史气象数据的分析 用PySpark处理数据 将所有文件读为一个****RDD rdd = sc.wholeTextFiles("file:///" + os.getcwd() + "/china/") **数据清洗 ** 去除字母, -9999等无效数据 进行计算、排序等...

    [毕业设计]基于Spark网易云音乐数据分析 .zip

    【标题】:“基于Spark的网易云音乐数据分析”项目是一个毕业设计,主要利用Apache Spark进行大规模音乐数据的处理和分析。这个项目提供了完整的源代码,确保能够运行,为学习和研究大数据处理提供了一个实用的实例...

    spark-集群与大数据处理

    Spark是Apache软件基金会旗下的开源大数据处理框架,由加州大学伯克利分校的AMP实验室开发,是基于内存计算的大数据并行处理系统。它提供了高层次的APIs,比如Java、Scala、Python、R等,用于数据挖掘、机器学习、...

    Spark 案例测试数据

    Spark 案例测试数据Spark 案例测试数据Spark 案例测试数据Spark 案例测试数据Spark 案例测试数据Spark 案例测试数据Spark 案例测试数据Spark 案例测试数据Spark 案例测试数据Spark 案例测试数据Spark 案例测试数据...

    通过Python实现基于Spark-Streaming的黑名单实时过滤系统的设计与实现.zip

    该系统利用Spark的微批处理能力,通过TCP套接字接收实时数据流,并根据预定义的黑名单对数据进行过滤。在设计中,黑名单以广播变量的形式在集群中分发,以减少数据传输并提高处理效率。 系统的核心在于使用`...

    源码地java spark淘宝大数据分析可视化系统(源码+数据+报告)址.zip

    《基于Java Spark的淘宝大数据分析可视化系统》 在当今数据驱动的时代,大数据分析与可视化已经成为企业决策的关键工具。本项目“源码地java spark淘宝大数据分析可视化系统”提供了一个全面的解决方案,它结合了...

    基于Spark Streaming的实时数据处理系统设计与实现.pdf

    本文提出了一种基于Spark Streaming技术的实时数据处理系统,旨在解决实时数据处理问题,提高数据处理的实时性。 二、Spark Streaming技术与实时数据处理系统设计 Spark Streaming是Apache Spark用于实时数据流处理...

    基于Spark的电影数据集分析

    该项目是大三下学期的课程设计,使用的数据集来自知名数据网站 Kaggle 的 tmdb-movie-metadata 电影数据集,以Python为编程语言,使用大数据框架Spark对数据进行了预处理,然后分别从多个方面对数据进行了分类和分析...

    实训指导书_使用Spark SQL进行法律服务网站数据分析.zip

    《Spark SQL在法律服务网站数据分析中的应用》 Spark SQL是Apache Spark的重要组件,它将SQL查询语言与大数据处理相结合,使得非程序员也能轻松地对大规模数据进行分析。本实训指导书将带你深入理解如何利用Spark ...

    Spark SQL常见4种数据源详解

    Spark SQL的DataFrame接口支持多种数据源的操作。一个DataFrame可以进行RDDs方式的操作,也可以被注册为临时表。把DataFrame注册为临时表之后,就可以对该DataFrame执行SQL查询。 Spark SQL的默认数据源为Parquet...

    spark或mr引擎插入的数据,hive表查询数据为0

    ### Spark或MR引擎插入的数据,Hive表查询数据为0的问题解析 #### 问题背景与现象 在大数据处理场景中,经常会遇到使用不同执行引擎(如Spark、MapReduce (MR) 或 Tez)进行数据处理的情况。其中一种常见的问题是...

    项目实战——Spark将Hive表的数据写入ElasticSearch(Java版本)

    在本项目实战中,我们将探讨如何使用Java编程语言,结合Spark和Hive,将Hive中的数据高效地导入到ElasticSearch(ES)中,并利用ES的别名机制实现数据更新的平滑过渡。以下是对这个流程的详细解析: 1. **Hive数据...

    基于Spark的咖啡连锁店数据处理分析系统开题报告.docx

    基于Spark的咖啡连锁店数据处理分析系统开题报告 本报告旨在设计一个基于Spark的咖啡连锁店数据处理分析系统,以帮助咖啡连锁店提高销量和利润。本系统将使用Spark作为计算框架,以Hadoop平台作为数据存储,以HDFS...

    Scala和Spark大数据分析函数式编程、数据流和机器学习

    Scala和Spark是大数据分析领域中的两个重要工具,它们在处理大规模数据时表现出强大的性能和灵活性。Scala是一种静态类型的函数式编程语言,而Spark是一个分布式计算框架,尤其适合于大数据处理和分析。本教程将深入...

Global site tag (gtag.js) - Google Analytics