`
gaojingsong
  • 浏览: 1182749 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
文章分类
社区版块
存档分类
最新评论

【JAVA之spark streaming入门示例】

阅读更多

package cn.com.sparkdemo.myspark;

 

import java.util.Arrays;

 

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.function.FlatMapFunction;

import org.apache.spark.api.java.function.Function2;

import org.apache.spark.api.java.function.PairFunction;

import org.apache.spark.streaming.Durations;

import org.apache.spark.streaming.api.java.JavaPairDStream;

import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;

import org.apache.spark.streaming.api.java.JavaStreamingContext;

 

import scala.Tuple2;

 

public class OnlineWordCount {

 

public static void main(String[] args) {

/**

* 第一步:配置sparkConf 1.至少2条线程,以为spark streaming

* 应用程序在运行的时候至少有一条线程用于不断接受数据,并且至少有一个

* 线程用户处理接受的数据(否则的话,没有线程用户处理数据,随着时间的推移,内存和磁盘都是不堪重负)

* 2、对于集群而言,每一个Executor一般肯定不止一个Thread,那么对于处理spark Streaming

* 的程序而言,每个Executor一般分配多少个Core比较合适?core最佳个数一般是奇数:5,7

*/

SparkConf conf = new SparkConf().setMaster("local[2]").setAppName(

"online count");

conf.set("spark.testing.memory", "2147480000");

/**

* 第二步:创建SparkStreamingContext: 这是个SparkStreaming应用程序所有功能的起点和程序调度的核心

* SparkStreamingContext构建基于SparkConf参数,可以基于持久化的SparkStreamingContext内容

* 恢复过来(典型的应用场景是Driver崩溃后重新启动,由于Spark Streaming具有连续7*24)不间断运行特征

* 所有需要在Driver重新启动后继续上一次的状态,此时的状态恢复需要基于曾记的checkPoint) 2、在一个spark

* streaming 应用程序中可以创建若干个SparkStreamContext对象,使用下一个SparkStreamingContext

* 之前需要把前面正在运行的sparkStreamContext对象关闭

* 。因此我们获得一个重大的启发,sparkStreaming框架也只是spark core

* 上的一个应用程序而已,至不过sparkStreaming 框架想运行的话需要spark 工程师写业务逻辑处理代码

*/

JavaStreamingContext jsc = new JavaStreamingContext(conf,

Durations.seconds(6));

/*

* 第三步:创建Spark Streaming输入数据来源input Stream:

* 1,数据输入来源可以基于File、HDFS、Flume、Kafka、Socket等 2,

* 在这里我们指定数据来源于网络Socket端口,Spark Streaming连接上该端口并在运行的时候一直监听该端口

* 的数据(当然该端口服务首先必须存在),并且在后续会根据业务需要不断的有数据产生(当然对于Spark Streaming

* 应用程序的运行而言,有无数据其处理流程都是一样的);

* 3,如果经常在每间隔5秒钟没有数据的话不断的启动空的Job其实是会造成调度资源的浪费,因为并没有数据需要发生计算,所以

* 实例的企业级生成环境的代码在具体提交Job前会判断是否有数据,如果没有的话就不再提交Job;

*/

JavaReceiverInputDStream<String> lines = jsc.socketTextStream(

"127.0.0.1", 9999);

/**

* 第四步: 像对于RDD 编程一样基于DStream进行编程。 DStream 是RDD产生的模版,在spark stream具体发生计算前

* 其实质是 把每一个batch 的操作翻译成对RDD的操作!!

* 对于初始的DStream进行Transformation级别的处理,例如map、filter等告诫函数的编程来进行具体的数据计算:

*/

 

JavaPairDStream<String, Integer> pairs = lines.flatMap(

new FlatMapFunction<String, String>() {

 

/**

* 第4.1步:讲每一行的字符串拆分成单个的单词

*/

private static final long serialVersionUID = 1L;

 

public Iterable<String> call(String strs) throws Exception {

 

return Arrays.asList(strs.split(" "));

}

 

/**

* 第4.2步:在单词拆分的基础上对每个单词实例计数为1,也就是word => (word, 1)

*/

}).mapToPair(new PairFunction<String, String, Integer>() {

 

public Tuple2<String, Integer> call(String str) throws Exception {

 

return new Tuple2<String, Integer>(str, 1);

}

});

 

JavaPairDStream<String, Integer> words = pairs

.reduceByKey(new Function2<Integer, Integer, Integer>() {

 

/**

*/

private static final long serialVersionUID = 1L;

 

public Integer call(Integer i1, Integer i2)

throws Exception {

 

return i1 + i2;

}

 

});

 

/*

* 此处的print并不会直接出发Job的执行,因为现在的一切都是在Spark Streaming框架的控制之下的,对于Spark

* Streaming 而言具体是否触发真正的Job运行是基于设置的Duration时间间隔的

* 诸位一定要注意的是Spark Streaming应用程序要想执行具体的Job,对Dtream就必须有output Stream操作,

* output

* Stream有很多类型的函数触发,类print、saveAsTextFile、saveAsHadoopFiles等,最为重要的一个

* 方法是foraeachRDD,因为Spark

* Streaming处理的结果一般都会放在Redis、DB、DashBoard等上面,foreachRDD

* 主要就是用用来完成这些功能的,而且可以随意的自定义具体数据到底放在哪里!!!

*/

words.print();

jsc.start();

 

jsc.awaitTermination();

jsc.stop();

 

}

}



 

控制台输出:

17/02/18 19:50:00 INFO JobScheduler: Finished job streaming job 1487418600000 ms.0 from job set of time 1487418600000 ms

17/02/18 19:50:00 INFO JobScheduler: Total delay: 0.017 s for time 1487418600000 ms (execution: 0.016 s)

17/02/18 19:50:00 INFO ShuffledRDD: Removing RDD 228 from persistence list

17/02/18 19:50:00 INFO BlockManager: Removing RDD 228

17/02/18 19:50:00 INFO MapPartitionsRDD: Removing RDD 227 from persistence list

17/02/18 19:50:00 INFO BlockManager: Removing RDD 227

17/02/18 19:50:00 INFO MapPartitionsRDD: Removing RDD 226 from persistence list

17/02/18 19:50:00 INFO BlockManager: Removing RDD 226

17/02/18 19:50:00 INFO BlockRDD: Removing RDD 225 from persistence list

17/02/18 19:50:00 INFO BlockManager: Removing RDD 225

17/02/18 19:50:00 INFO SocketInputDStream: Removing blocks of RDD BlockRDD[225] at socketTextStream at OnlineWordCount.java:49 of time 1487418600000 ms

17/02/18 19:50:00 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer(1487418588000 ms)

17/02/18 19:50:00 INFO InputInfoTracker: remove old batch metadata: 1487418588000 ms



 

  • 大小: 83.4 KB
  • 大小: 66.3 KB
0
0
分享到:
评论

相关推荐

    spark入门实战

    - **编程模型及Spark Shell实战:**介绍了Spark的基本编程模型,如RDD(弹性分布式数据集)的概念,并通过具体的示例演示如何使用Spark Shell进行数据处理。 - **IDEA搭建及实战:**针对使用IntelliJ IDEA进行...

    Spark从入门到精通Java版

    该资源是一个关于Spark的入门到精通的Java版本课程,共包括278章,涵盖了大数据Java版本的示例代码。课程主要涉及Spark的基础知识、Scala编程语言、函数式编程、面向对象编程、模式匹配、类型参数、隐式转换与隐式...

    spark入门相关文档,适用于初学者

    Spark的编程模型非常友好,它提供了Python、Java、Scala和R等多语言API,使得不同背景的开发者都能轻松上手。其中,PySpark是Python接口,适合数据科学家和熟悉Python的开发人员;Spark SQL则允许用户通过SQL或者...

    Spark实战开发

    《Spark 入门之 Scala 语言解释及示例讲解》则为初学者提供了Scala语言的基础知识,包括基本语法、类型系统、函数式编程特性等,并通过实例演示如何将这些知识应用到Spark开发中。 总的来说,这些资料构成了一个...

    spark全案例

    二、Spark Java API入门 1. 创建SparkConf配置对象:Spark应用的初始化从创建SparkConf开始,设置应用名、Master地址等。 2. 获取SparkContext:基于配置对象创建SparkContext,它是Spark与集群交互的主要接口。 3. ...

    spark-windows helloword入门例子

    Spark-book压缩包可能包含了更深入的Spark教程或者示例代码,涵盖更多Spark的功能和使用场景,如数据加载、转换、聚合、过滤等操作,以及如何使用Spark SQL进行结构化数据处理,甚至可能涉及Spark Streaming的实时...

    实时计算框架:Spark集群搭建与入门案例.docx

    然后编辑spark-env.sh文件,配置JAVA_HOME、SCALA_HOME、SPARK_MASTER_IP、SPARK_LOCAL_IP、SPARK_WORKER_MEMORY以及HADOOP_CONF_DIR等参数。 4. **启动与停止Spark**:启动Spark集群使用start-all.sh脚本,停止使用...

    spark入门教程

    4. **Spark Streaming**:学习如何使用Spark Streaming处理实时数据流。 5. **MLlib**:了解Spark MLlib中的机器学习算法及其应用场景。 6. **GraphX**:掌握使用GraphX进行图形处理的方法。 7. **部署与集群管理**...

    spark计数demo

    本教程将基于Java语言,介绍如何使用Spark进行简单的词频统计(WordCount)操作,这也是Spark入门的经典示例。 首先,我们需要了解Spark的基本架构。Spark的核心概念是弹性分布式数据集(Resilient Distributed ...

    Spark 2.0.2 Spark 2.2 中文文档 本资源为网页,不是PDF

    使用 Java / Scala 运行 spark Jobs 单元测试 Spark 1.0 版本前的应用程序迁移 下一步 Spark Streaming Spark Streaming 概述 一个简单的示例 基本概念 依赖 初始化 StreamingContext Discretized ...

    一个关于spark介绍的PDF文件

    ### Apache Spark 入门知识点详解 #### 一、Apache Spark 概述 - **Apache Spark** 是一个开源的大数据处理框架,它提供了高速的数据处理能力,适用于大规模数据集的应用场景。 - Spark 支持多种计算模型,包括...

    spark快速大数据分析

    总的来说,《Spark快速大数据分析》这本书将引导读者深入理解Spark的原理和实践,通过丰富的示例和实践经验,帮助读者快速掌握大数据分析的核心技能,从而在实际工作中更有效地利用Spark处理大规模数据。

    spark的详细资料

    接下来,入门程序示例是理解Spark核心概念的关键。Spark主要通过Scala、Java、Python和R语言接口提供API,这些API允许用户创建DataFrame、RDD(弹性分布式数据集)并执行计算。例如,你可以创建一个简单的Spark程序...

    SparkPlayground:本项目包含几个spark示例,部分是网上的经典例子,部分是我自己写的例子

    - **WordCount**:Spark的经典入门示例,统计文本文件中的单词出现次数,展示了RDD(弹性分布式数据集)的基本操作。 - **JavaSparkSQLExample**:利用Spark SQL执行SQL查询,读取和写入数据到Hive表或Parquet文件...

    Spark编程指南简体中文版

    - **快速例子**:通过一个快速示例介绍了如何使用Spark Streaming处理实时数据流。 - **基本概念**:讲解了Spark Streaming的核心概念,如DStreams(离散化流)和窗口操作。 - **初始化StreamingContext**:介绍了...

    Hadoop 和 Spark 流行的大数据处理框架.docx

    下面是一些简单的入门示例: **5.1 Hadoop 伪分布式环境配置示例** ```xml &lt;!-- Hadoop 伪分布式环境配置示例 --&gt; &lt;!-- 编辑 /etc/hadoop/core-site.xml --&gt; &lt;name&gt;fs.defaultFS &lt;value&gt;hdfs://localhost:...

Global site tag (gtag.js) - Google Analytics