`
m635674608
  • 浏览: 5027721 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

Spark2.1.0入门:DStream转换操作

 
阅读更多

DStream转换操作包括无状态转换和有状态转换。
无状态转换:每个批次的处理不依赖于之前批次的数据。
有状态转换:当前批次的处理需要使用之前批次的数据或者中间结果。有状态转换包括基于滑动窗口的转换和追踪状态变化的转换(updateStateByKey)。

DStream无状态转换操作

下面给出一些无状态转换操作的含义:
* map(func) :对源DStream的每个元素,采用func函数进行转换,得到一个新的DStream;
* flatMap(func): 与map相似,但是每个输入项可用被映射为0个或者多个输出项;
* filter(func): 返回一个新的DStream,仅包含源DStream中满足函数func的项;
* repartition(numPartitions): 通过创建更多或者更少的分区改变DStream的并行程度;
* union(otherStream): 返回一个新的DStream,包含源DStream和其他DStream的元素;
* count():统计源DStream中每个RDD的元素数量;
* reduce(func):利用函数func聚集源DStream中每个RDD的元素,返回一个包含单元素RDDs的新DStream;
* countByValue():应用于元素类型为K的DStream上,返回一个(K,V)键值对类型的新DStream,每个键的值是在原DStream的每个RDD中的出现次数;
* reduceByKey(func, [numTasks]):当在一个由(K,V)键值对组成的DStream上执行该操作时,返回一个新的由(K,V)键值对组成的DStream,每一个key的值均由给定的recuce函数(func)聚集起来;
* join(otherStream, [numTasks]):当应用于两个DStream(一个包含(K,V)键值对,一个包含(K,W)键值对),返回一个包含(K, (V, W))键值对的新DStream;
* cogroup(otherStream, [numTasks]):当应用于两个DStream(一个包含(K,V)键值对,一个包含(K,W)键值对),返回一个包含(K, Seq[V], Seq[W])的元组;
* transform(func):通过对源DStream的每个RDD应用RDD-to-RDD函数,创建一个新的DStream。支持在新的DStream中做任何RDD操作。

无状态转换操作实例:我们之前“套接字流”部分介绍的词频统计,就是采用无状态转换,每次统计,都是只统计当前批次到达的单词的词频,和之前批次无关,不会进行累计。

DStream有状态转换操作

对于DStream有状态转换操作而言,当前批次的处理需要使用之前批次的数据或者中间结果。有状态转换包括基于滑动窗口的转换和追踪状态变化(updateStateByKey)的转换。

滑动窗口转换操作

滑动窗口转换操作的计算过程如下图所示,我们可以事先设定一个滑动窗口的长度(也就是窗口的持续时间),并且设定滑动窗口的时间间隔(每隔多长时间执行一次计算),然后,就可以让窗口按照指定时间间隔在源DStream上滑动,每次窗口停放的位置上,都会有一部分DStream被框入窗口内,形成一个小段的DStream,这时,就可以启动对这个小段DStream的计算。

sparkstreaming滑动窗口
图 滑动窗口的计算过程

下面给给出一些窗口转换操作的含义:
* window(windowLength, slideInterval) 基于源DStream产生的窗口化的批数据,计算得到一个新的DStream;
* countByWindow(windowLength, slideInterval) 返回流中元素的一个滑动窗口数;
* reduceByWindow(func, windowLength, slideInterval) 返回一个单元素流。利用函数func聚集滑动时间间隔的流的元素创建这个单元素流。函数func必须满足结合律,从而可以支持并行计算;
* reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) 应用到一个(K,V)键值对组成的DStream上时,会返回一个由(K,V)键值对组成的新的DStream。每一个key的值均由给定的reduce函数(func函数)进行聚合计算。注意:在默认情况下,这个算子利用了Spark默认的并发任务数去分组。可以通过numTasks参数的设置来指定不同的任务数;
* reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) 更加高效的reduceByKeyAndWindow,每个窗口的reduce值,是基于先前窗口的reduce值进行增量计算得到的;它会对进入滑动窗口的新数据进行reduce操作,并对离开窗口的老数据进行“逆向reduce”操作。但是,只能用于“可逆reduce函数”,即那些reduce函数都有一个对应的“逆向reduce函数”(以InvFunc参数传入);
* countByValueAndWindow(windowLength, slideInterval, [numTasks]) 当应用到一个(K,V)键值对组成的DStream上,返回一个由(K,V)键值对组成的新的DStream。每个key的值都是它们在滑动窗口中出现的频率。

窗口转换操作实例:

在上一节的“Apache Kafka作为DStream数据源”内容中,在我们已经使用了窗口转换操作,也就是,在KafkaWordCount.scala代码中,你可以找到下面这一行:

val wordCounts = pair.reduceByKeyAndWindow(_ + _,_ - _,Minutes(2),Seconds(10),2)

这行代码中就是一个窗口转换操作reduceByKeyAndWindow,其中,Minutes(2)是滑动窗口长度,Seconds(10)是滑动窗口时间间隔(每隔多长时间滑动一次窗口)。reduceByKeyAndWindow中就使用了加法和减法这两个reduce函数,加法和减法这两种reduce函数都是“可逆的reduce函数”,也就是说,当滑动窗口到达一个新的位置时,原来之前被窗口框住的部分数据离开了窗口,又有新的数据被窗口框住,但是,这时计算窗口内单词的词频时,不需要对当前窗口内的所有单词全部重新执行统计,而是只要把窗口内新增进来的元素,增量加入到统计结果中,把离开窗口的元素从统计结果中减去,这样,就大大提高了统计的效率。尤其对于窗口长度较大时,这种“逆函数”带来的效率的提高是很明显的。

updateStateByKey操作

当我们需要在跨批次之间维护状态时,就必须使用updateStateByKey操作。
下面我们就给出一个具体实例。我们还是以前面在“套接字流”部分讲过的NetworkWordCount为例子来介绍,在之前的套接字流的介绍中,我们统计单词词频采用的是无状态转换操作,也就是说,每个批次的单词发送给NetworkWordCount程序处理时,NetworkWordCount只对本批次内的单词进行词频统计,不会考虑之前到达的批次的单词,所以,不同批次的单词词频都是独立统计的。
对于有状态转换操作而言,本批次的词频统计,会在之前批次的词频统计结果的基础上进行不断累加,所以,最终统计得到的词频,是所有批次的单词的总的词频统计结果。
下面,我们来改造一下在套接字流介绍过的NetworkWordCount程序。
请登录Linux系统,打开一个终端,然后,执行下面命令:

  1. cd /usr/local/spark/mycode/streaming //这个streaming目录是之前已经创建好的
  2. mkdir stateful
  3. cd stateful
  4. mkdir -p src/main/scala
  5. cd src/main/scala
  6. vim NetworkWordCountStateful.scala
Shell 命令

上面使用vim编辑器新建了一个NetworkWordCountStateful.scala代码文件,请在里面输入以下代码:

package org.apache.spark.examples.streaming
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel

object NetworkWordCountStateful {
  def main(args: Array[String]) {
    //定义状态更新函数
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }
        StreamingExamples.setStreamingLogLevels()  //设置log4j日志级别
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")
    val sc = new StreamingContext(conf, Seconds(5))
    sc.checkpoint("file:///usr/local/spark/mycode/streaming/stateful/")    //设置检查点,检查点具有容错机制
    val lines = sc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    stateDstream.print()
    sc.start()
    sc.awaitTermination()
  }
}

保存该文件退出vim编辑器。
这里要对这段代码中新增的updataStateByKey稍微解释一下。Spark Streaming的updateStateByKey可以把DStream中的数据按key做reduce操作,然后对各个批次的数据进行累加。注意,wordDstream.updateStateByKeyInt每次传递给updateFunc函数两个参数,其中,第一个参数是某个key(即某个单词)的当前批次的一系列值的列表(Seq[Int]形式),updateFunc函数中 val currentCount = values.foldLeft(0)(_ + _)的作用(请参考之前章节“fold操作”的介绍),就是计算这个被传递进来的与某个key对应的当前批次的所有值的总和,也就是当前批次某个单词的出现次数,保存在变量currentCount中。传递给updateFunc函数的第二个参数是某个key的历史状态信息,也就是某个单词历史批次的词频汇总结果。实际上,某个单词的历史词频应该是一个Int类型,这里为什么要采用Option[Int]呢?
Option[Int]是类型 Int的容器(请参考之前章节“模式匹配”了解Option类的使用方法),更确切地说,你可以把它看作是某种集合,这个特殊的集合要么只包含一个元素(即单词的历史词频),要么就什么元素都没有(这个单词历史上没有出现过,所以没有历史词频信息)。之所以采用 Option[Int]保存历史词频信息,这是因为,历史词频可能不存在,很多时候,在值不存在时,需要进行回退,或者提供一个默认值,Scala 为Option类型提供了getOrElse方法,以应对这种情况。 state.getOrElse(0)的含义是,如果该单词没有历史词频统计汇总结果,那么,就取值为0,如果有历史词频统计结果,就取历史结果,然后赋值给变量previousCount。最后,当前值和历史值进行求和,并包装在Some中返回。

然后,再次使用vim编辑器新建一个StreamingExamples.scala文件,用于设置log4j日志级别,代码如下:

package org.apache.spark.examples.streaming
import org.apache.spark.Logging
import org.apache.log4j.{Level, Logger}
/** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {
  /** Set reasonable logging levels for streaming if the user has not configured log4j. */
  def setStreamingLogLevels() {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      // We first log something to initialize Spark's default logging, then we override the
      // logging level.
      logInfo("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}

退出vim编辑器。下面要对代码进行sbt打包编译。这里需要一个simple.sbt文件,使用vim编辑器创建一个:

  1. cd /usr/local/spark/mycode/streaming/stateful
  2. vim simple.sbt
Shell 命令

在simple.sbt中输入以下内容:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.1.0"

创建好simple.sbt文件后,退出vim编辑器。然后,执行下面命令:

  1. cd /usr/local/spark/mycode/streaming/stateful
  2. find .
Shell 命令

屏幕上返回的信息,应该是类似下面的文件结构:

.
./src
./src/main
./src/main/scala
./src/main/scala/NetworkWordCountStateful.scala
./src/main/scala/StreamingExamples.scala
./simple.sbt

然后,就可以执行sbt打包编译了,命令如下:

  1. cd /usr/local/spark/mycode/streaming/stateful
  2. /usr/local/sbt/sbt package
Shell 命令

(备注:根据笔者实际测试,在NetworkWordCountStateful.scala代码中使用 StreamingExamples.setStreamingLogLevels() ,还不能保证屏幕上输出我们预期的结果,同时还需要把/usr/local/spark/conf/log4j.properties设置为log4j.rootCategory=WARN, console,这样才可以让屏幕上显示我们预期的结果)
下面可以再设置一下log4j格式,请在终端内输入如下命令:

  1. cd /usr/local/spark/conf
  2. vim log4j.properties #如果不存在,就从log4j.properties.template拷贝一份得到log4j.properties
Shell 命令

打开后,要把其中的rootCategory设置为如下:

log4j.rootCategory=WARN, console

现在就可以输入以下命令启动这个程序:

  1. cd /usr/local/spark/mycode/streaming/stateful
  2. /usr/local/spark/bin/spark-submit --class "org.apache.spark.examples.streaming.NetworkWordCountStateful" /usr/local/spark/mycode/streaming/stateful/target/scala-2.11/simple-project_2.11-1.0.jar
Shell 命令

执行上面命令后,就进入了监听状态(我们把运行这个监听程序的窗口称为监听窗口),这时,你就可以像刚才一样,新打开一个窗口作为nc窗口,启动nc程序:

  1. nc -lk 9999
  2. //在这个窗口中手动输入一些单词
  3. hadoop
  4. spark
  5. hadoop
  6. spark
  7. hadoop
  8. spark
Shell 命令

然后,你切换到刚才的监听窗口,会发现,已经输出了词频统计信息:

-------------------------------------------
Time: 1479890485000 ms
-------------------------------------------
(spark,1)
(hadoop,1)

-------------------------------------------
Time: 1479890490000 ms
-------------------------------------------
(spark,2)
(hadoop,3)

从词频统计信息可以看出,词频统计是不断累加的,也就是有状态的转换。到此,实验顺利结束!

 

http://dblab.xmu.edu.cn/blog/1390-2/

分享到:
评论

相关推荐

    spark2.1.0-bin-hadoop2.7

    4. 流处理改进:Spark Streaming引入了新的DStream操作,如join和window,增强了实时流处理的能力。 二、Hadoop 2.7集成 Hadoop 2.7是一个稳定且广泛使用的分布式存储和计算框架,其YARN资源管理系统为Spark提供了...

    spark-2.1.0-bin-without-hadoop.tgz.7z

    《Spark 2.1.0 无Hadoop版本详解及应用》 Spark作为一个高效、通用的大数据处理框架,因其强大的并行计算能力、易用性和高效性,在大数据领域备受推崇。Spark 2.1.0是其发展过程中的一个重要版本,它在前一版本的...

    Spark 2.1.0 API(java).chm

    最新版spark2.1.0 java api。如果觉得好,就给个评论,谢谢!!

    spark2.1.0.chm(spark java API)

    《Spark 2.1.0 Java API 深度解析》 Spark,作为一个分布式计算框架,因其高效、灵活和易用的特性,在大数据处理领域深受青睐。Spark 2.1.0版本对Java API进行了全面优化,使得Java开发者能够更加便捷地利用Spark...

    spark-2.1.0-bin-without-hadoop版本的压缩包,直接下载到本地解压后即可使用

    在Ubuntu里安装spark,spark-2.1.0-bin-without-hadoop该版本直接下载到本地后解压即可使用。 Apache Spark 是一种用于大数据工作负载的分布式开源处理系统。它使用内存中缓存和优化的查询执行方式,可针对任何规模...

    spark2.1.0 JAVA API

    Spark 2.1.0是Apache Spark的一个重要版本,它为大数据处理提供了高效、易用的计算框架。在Java API方面,Spark提供了丰富的类库,使得开发者能够利用Java语言便捷地构建分布式数据处理应用。本篇文章将深入探讨...

    2022最新版:CONTENTBERG BLOG V2.1.0主题:内容营销博客.rar

    【CONTENTBERG BLOG V2.1.0】是一款专为内容营销设计的WordPress主题,旨在帮助用户构建专业且引人入胜的博客平台。这款主题的最新版本于2022年发布,包含了多项改进和增强功能,以提升用户体验和优化内容呈现。 一...

    spark-2.1.0-bin-without-hadoop.tgz

    5. **运行Spark**:Spark可以通过命令行接口(shell)或提交应用程序进行操作。Spark Shell提供了交互式的Python(pyspark)和Scala环境,便于快速测试和调试代码。应用程序可以通过`spark-submit`脚本提交到集群。 ...

    编译过的Hadoop2.6.0-cdh5.7.0的spark2.1.0安装包

    此压缩包"Spark2.1.0-bin-2.6.0-cdh5.7.0"是针对Hadoop2.6.0-cdh5.7.0版本编译优化后的Spark2.1.0版本,适用于Cloudera Distribution Including Apache Hadoop (CDH) 5.7.0环境。 1. **Spark核心组件**:Spark的...

    spark-2.1.0-bin-2.6.0-cdh5.7.0.tgz

    Spark 2.1.0是Apache Spark的一个重要版本,它在Hadoop CDH5.7.0环境下进行了编译和打包,为用户提供了在CDH(Cloudera Distribution Including Apache Hadoop)平台上的Spark功能。这个版本的Spark是大数据处理领域...

    apache-carbondata-1.6.1-bin-spark2.1.0-hadoop2.7.2.jar

    apache-carbondata-1.6.1-bin-spark2.1.0-hadoop2.7.2.jar

    spark-2.1.0 api文档

    Spark-2.1.0版本是Spark发展中的一个重要里程碑,它在前一版本的基础上进行了许多优化和改进,提供了更加丰富的API和更强的性能。 在Spark API文档中,主要包含以下几个核心组件和接口: 1. **Spark Core**:这是...

    基于Docker构建的Hadoop开发测试环境,包含Hadoop,Hive,HBase,Spark

    Spark: 2.1.0 Hive: 2.1.1 HBase: 1.2.2 Zookeeper: 3.4.8 基于docker-compose管理镜像和容器,并进行集群的编排 所有软件的二进制包均通过网络下载。其中包含自行编译的Hadoop和Protobuf二进制包,保存在Github上,...

    spark2 安装教程

    本文将详细介绍如何在现有 Spark1.3 的基础上安装并配置 Spark2.1.0,实现两个版本的共存,以适应新老项目的需求。 #### 二、环境准备 在开始安装之前,请确保以下条件已满足: 1. **Java 环境**:Spark 需要 Java...

    Neat Converter Setup 2.1.0 万能文本格式转换工具 word pdf txt等.exe

    Neat Converter Setup 2.1.0 万能文本格式转换工具 word pdf txt等.exe

    apache-carbondata-1.4.0-bin-spark2.1.0-hadoop2.6.0-cdh5.11.1.jar

    carbondata-1.4,spark-2.1,hadoop-2.6.0-cdh5.11.1源码编译

    《大数据分析与内存计算》课程期末考核代码.rar

    Spark: 2.1.0 Eclipse: 3.8 (注意:Eclipse 3.8是一个较旧的版本,考虑使用更新版本) ECharts: 3.4.0 1.3 使用的核心技术 Spark大数据分析框架 MLlib机器学习库 MySQL数据库管理系统 Hadoop生态系统(HDFS, YARN等)...

    《大数据分析与内存计算》课程期末考核素材.rar

    Spark: 2.1.0 Eclipse: 3.8 (注意:Eclipse 3.8是一个较旧的版本,考虑使用更新版本) ECharts: 3.4.0 1.3 使用的核心技术 Spark大数据分析框架 MLlib机器学习库 MySQL数据库管理系统 Hadoop生态系统(HDFS, YARN等)...

    编译 telepresence_2.1.0 :(OpenOffice_4.1.1_x86_zh-CN.exe.002)

    编译 telepresence_2.1.0 第三方依赖库 : OpenOffice_4.0.0_SDK , Apache_OpenOffice_4.1.1_Win_x86_install_zh-CN.exe(两个*.rdb文件)

Global site tag (gtag.js) - Google Analytics