package cn.com.sparkdemo.myspark;
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
public class OnlineWordCount {
public static void main(String[] args) {
/**
* 第一步:配置sparkConf 1.至少2条线程,以为spark streaming
* 应用程序在运行的时候至少有一条线程用于不断接受数据,并且至少有一个
* 线程用户处理接受的数据(否则的话,没有线程用户处理数据,随着时间的推移,内存和磁盘都是不堪重负)
* 2、对于集群而言,每一个Executor一般肯定不止一个Thread,那么对于处理spark Streaming
* 的程序而言,每个Executor一般分配多少个Core比较合适?core最佳个数一般是奇数:5,7
*/
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName(
"online count");
conf.set("spark.testing.memory", "2147480000");
/**
* 第二步:创建SparkStreamingContext: 这是个SparkStreaming应用程序所有功能的起点和程序调度的核心
* SparkStreamingContext构建基于SparkConf参数,可以基于持久化的SparkStreamingContext内容
* 恢复过来(典型的应用场景是Driver崩溃后重新启动,由于Spark Streaming具有连续7*24)不间断运行特征
* 所有需要在Driver重新启动后继续上一次的状态,此时的状态恢复需要基于曾记的checkPoint) 2、在一个spark
* streaming 应用程序中可以创建若干个SparkStreamContext对象,使用下一个SparkStreamingContext
* 之前需要把前面正在运行的sparkStreamContext对象关闭
* 。因此我们获得一个重大的启发,sparkStreaming框架也只是spark core
* 上的一个应用程序而已,至不过sparkStreaming 框架想运行的话需要spark 工程师写业务逻辑处理代码
*/
JavaStreamingContext jsc = new JavaStreamingContext(conf,
Durations.seconds(6));
/*
* 第三步:创建Spark Streaming输入数据来源input Stream:
* 1,数据输入来源可以基于File、HDFS、Flume、Kafka、Socket等 2,
* 在这里我们指定数据来源于网络Socket端口,Spark Streaming连接上该端口并在运行的时候一直监听该端口
* 的数据(当然该端口服务首先必须存在),并且在后续会根据业务需要不断的有数据产生(当然对于Spark Streaming
* 应用程序的运行而言,有无数据其处理流程都是一样的);
* 3,如果经常在每间隔5秒钟没有数据的话不断的启动空的Job其实是会造成调度资源的浪费,因为并没有数据需要发生计算,所以
* 实例的企业级生成环境的代码在具体提交Job前会判断是否有数据,如果没有的话就不再提交Job;
*/
JavaReceiverInputDStream<String> lines = jsc.socketTextStream(
"127.0.0.1", 9999);
/**
* 第四步: 像对于RDD 编程一样基于DStream进行编程。 DStream 是RDD产生的模版,在spark stream具体发生计算前
* 其实质是 把每一个batch 的操作翻译成对RDD的操作!!
* 对于初始的DStream进行Transformation级别的处理,例如map、filter等告诫函数的编程来进行具体的数据计算:
*
*/
JavaPairDStream<String, Integer> pairs = lines.flatMap(
new FlatMapFunction<String, String>() {
/**
* 第4.1步:讲每一行的字符串拆分成单个的单词
*/
private static final long serialVersionUID = 1L;
public Iterable<String> call(String strs) throws Exception {
return Arrays.asList(strs.split(" "));
}
/**
* 第4.2步:在单词拆分的基础上对每个单词实例计数为1,也就是word => (word, 1)
*/
}).mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String str) throws Exception {
return new Tuple2<String, Integer>(str, 1);
}
});
JavaPairDStream<String, Integer> words = pairs
.reduceByKey(new Function2<Integer, Integer, Integer>() {
/**
*
*/
private static final long serialVersionUID = 1L;
public Integer call(Integer i1, Integer i2)
throws Exception {
return i1 + i2;
}
});
/*
* 此处的print并不会直接出发Job的执行,因为现在的一切都是在Spark Streaming框架的控制之下的,对于Spark
* Streaming 而言具体是否触发真正的Job运行是基于设置的Duration时间间隔的
*
* 诸位一定要注意的是Spark Streaming应用程序要想执行具体的Job,对Dtream就必须有output Stream操作,
* output
* Stream有很多类型的函数触发,类print、saveAsTextFile、saveAsHadoopFiles等,最为重要的一个
* 方法是foraeachRDD,因为Spark
* Streaming处理的结果一般都会放在Redis、DB、DashBoard等上面,foreachRDD
* 主要就是用用来完成这些功能的,而且可以随意的自定义具体数据到底放在哪里!!!
*/
words.print();
jsc.start();
jsc.awaitTermination();
jsc.stop();
}
}
控制台输出:
17/02/18 19:50:00 INFO JobScheduler: Finished job streaming job 1487418600000 ms.0 from job set of time 1487418600000 ms
17/02/18 19:50:00 INFO JobScheduler: Total delay: 0.017 s for time 1487418600000 ms (execution: 0.016 s)
17/02/18 19:50:00 INFO ShuffledRDD: Removing RDD 228 from persistence list
17/02/18 19:50:00 INFO BlockManager: Removing RDD 228
17/02/18 19:50:00 INFO MapPartitionsRDD: Removing RDD 227 from persistence list
17/02/18 19:50:00 INFO BlockManager: Removing RDD 227
17/02/18 19:50:00 INFO MapPartitionsRDD: Removing RDD 226 from persistence list
17/02/18 19:50:00 INFO BlockManager: Removing RDD 226
17/02/18 19:50:00 INFO BlockRDD: Removing RDD 225 from persistence list
17/02/18 19:50:00 INFO BlockManager: Removing RDD 225
17/02/18 19:50:00 INFO SocketInputDStream: Removing blocks of RDD BlockRDD[225] at socketTextStream at OnlineWordCount.java:49 of time 1487418600000 ms
17/02/18 19:50:00 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer(1487418588000 ms)
17/02/18 19:50:00 INFO InputInfoTracker: remove old batch metadata: 1487418588000 ms
相关推荐
- **编程模型及Spark Shell实战:**介绍了Spark的基本编程模型,如RDD(弹性分布式数据集)的概念,并通过具体的示例演示如何使用Spark Shell进行数据处理。 - **IDEA搭建及实战:**针对使用IntelliJ IDEA进行...
该资源是一个关于Spark的入门到精通的Java版本课程,共包括278章,涵盖了大数据Java版本的示例代码。课程主要涉及Spark的基础知识、Scala编程语言、函数式编程、面向对象编程、模式匹配、类型参数、隐式转换与隐式...
Spark的编程模型非常友好,它提供了Python、Java、Scala和R等多语言API,使得不同背景的开发者都能轻松上手。其中,PySpark是Python接口,适合数据科学家和熟悉Python的开发人员;Spark SQL则允许用户通过SQL或者...
《Spark 入门之 Scala 语言解释及示例讲解》则为初学者提供了Scala语言的基础知识,包括基本语法、类型系统、函数式编程特性等,并通过实例演示如何将这些知识应用到Spark开发中。 总的来说,这些资料构成了一个...
二、Spark Java API入门 1. 创建SparkConf配置对象:Spark应用的初始化从创建SparkConf开始,设置应用名、Master地址等。 2. 获取SparkContext:基于配置对象创建SparkContext,它是Spark与集群交互的主要接口。 3. ...
Spark-book压缩包可能包含了更深入的Spark教程或者示例代码,涵盖更多Spark的功能和使用场景,如数据加载、转换、聚合、过滤等操作,以及如何使用Spark SQL进行结构化数据处理,甚至可能涉及Spark Streaming的实时...
然后编辑spark-env.sh文件,配置JAVA_HOME、SCALA_HOME、SPARK_MASTER_IP、SPARK_LOCAL_IP、SPARK_WORKER_MEMORY以及HADOOP_CONF_DIR等参数。 4. **启动与停止Spark**:启动Spark集群使用start-all.sh脚本,停止使用...
4. **Spark Streaming**:学习如何使用Spark Streaming处理实时数据流。 5. **MLlib**:了解Spark MLlib中的机器学习算法及其应用场景。 6. **GraphX**:掌握使用GraphX进行图形处理的方法。 7. **部署与集群管理**...
本教程将基于Java语言,介绍如何使用Spark进行简单的词频统计(WordCount)操作,这也是Spark入门的经典示例。 首先,我们需要了解Spark的基本架构。Spark的核心概念是弹性分布式数据集(Resilient Distributed ...
使用 Java / Scala 运行 spark Jobs 单元测试 Spark 1.0 版本前的应用程序迁移 下一步 Spark Streaming Spark Streaming 概述 一个简单的示例 基本概念 依赖 初始化 StreamingContext Discretized ...
此外,本书还介绍如何将函数传递给Spark,并分别针对Python、Scala和Java语言给出了示例。 处理键值对(Key-Value Pairs)的章节揭示了在Spark中处理分布式的键值对数据的重要性。在这一章节中,作者阐述了如何创建...
### Apache Spark 入门知识点详解 #### 一、Apache Spark 概述 - **Apache Spark** 是一个开源的大数据处理框架,它提供了高速的数据处理能力,适用于大规模数据集的应用场景。 - Spark 支持多种计算模型,包括...
总的来说,《Spark快速大数据分析》这本书将引导读者深入理解Spark的原理和实践,通过丰富的示例和实践经验,帮助读者快速掌握大数据分析的核心技能,从而在实际工作中更有效地利用Spark处理大规模数据。
接下来,入门程序示例是理解Spark核心概念的关键。Spark主要通过Scala、Java、Python和R语言接口提供API,这些API允许用户创建DataFrame、RDD(弹性分布式数据集)并执行计算。例如,你可以创建一个简单的Spark程序...
- **WordCount**:Spark的经典入门示例,统计文本文件中的单词出现次数,展示了RDD(弹性分布式数据集)的基本操作。 - **JavaSparkSQLExample**:利用Spark SQL执行SQL查询,读取和写入数据到Hive表或Parquet文件...
- **快速例子**:通过一个快速示例介绍了如何使用Spark Streaming处理实时数据流。 - **基本概念**:讲解了Spark Streaming的核心概念,如DStreams(离散化流)和窗口操作。 - **初始化StreamingContext**:介绍了...
下面是一些简单的入门示例: **5.1 Hadoop 伪分布式环境配置示例** ```xml <!-- Hadoop 伪分布式环境配置示例 --> <!-- 编辑 /etc/hadoop/core-site.xml --> <name>fs.defaultFS <value>hdfs://localhost:...