`
bit1129
  • 浏览: 1072386 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark八十八】Spark Streaming累加器操作(updateStateByKey)

 
阅读更多

在实时计算的实际应用中,有时除了需要关心一个时间间隔内的数据,有时还可能会对整个实时计算的所有时间间隔内产生的相关数据进行统计。

比如: 对Nginx的access.log实时监控请求404时,有时除了需要统计某个时间间隔内出现的次数,有时还需要统计一整天出现了多少次404,也就是说404监控横跨多个时间间隔。

 

Spark Streaming的解决方案是累加器,工作原理是,定义一个类似全局的可更新的变量,每个时间窗口内得到的统计值都累加到上个时间窗口得到的值,这样这个累加值就是横跨多个时间间隔

 

package spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming._

/**
 * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every
 * second starting with initial value of word count.
 * Usage: StatefulNetworkWordCount <hostname> <port>
 * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
 * data.
 *
 * To run this on your local machine, you need to first run a Netcat server
 * `$ nc -lk 9999`
 * and then run the example
 * `$ bin/run-example
 * org.apache.spark.examples.streaming.StatefulNetworkWordCount localhost 9999`
 */
object StatefulNetworkWordCount {
  def main(args: Array[String]) {

    ///函数常量定义,返回类型是Some(Int),表示的含义是最新状态
    ///函数的功能是将当前时间间隔内产生的Key的value集合,加到上一个状态中,得到最新状态
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.sum
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }

    ///入参是三元组遍历器,三个元组分别表示Key、当前时间间隔内产生的对应于Key的Value集合、上一个时间点的状态
    ///newUpdateFunc的返回值要求是iterator[(String,Int)]类型的
    val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
      ///对每个Key调用updateFunc函数(入参是当前时间间隔内产生的对应于Key的Value集合、上一个时间点的状态)得到最新状态
      ///然后将最新状态映射为Key和最新状态
      iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
    }

    val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount").setMaster("local[3]")
    // Create the context with a 5 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    ssc.checkpoint(".")
    // Initial RDD input to updateStateByKey
    val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))
    // Create a ReceiverInputDStream on target ip:port and count the
    // words in input stream of \n delimited test (eg. generated by 'nc')
    val lines = ssc.socketTextStream("192.168.26.140", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))
    
    // Update the cumulative count using updateStateByKey
    // This will give a Dstream made of state (which is the cumulative count of the words)
    //注意updateStateByKey的四个参数,第一个参数是状态更新函数
    val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc,
      new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)
    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

 

上面的核心操作时DStream的updateStateByKey函数操作,它接受四个参数,

 

2. DStream.updateStateByKey剖析

方法说明:

 /**
   * Return a new "state" DStream where the state for each key is updated by applying
   * the given function on the previous state of the key and the new values of each key.
   * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
   * @param updateFunc State update function. Note, that this function may generate a different
   *                   tuple with a different key than the input key. Therefore keys may be removed
   *                   or added in this way. It is up to the developer to decide whether to
   *                   remember the  partitioner despite the key being changed.
   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
   *                    DStream
   * @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs.
   * @param initialRDD initial state value of each key.
   * @tparam S State type
   */
  def updateStateByKey[S: ClassTag](
      updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
      partitioner: Partitioner, 
      rememberPartitioner: Boolean,
      initialRDD: RDD[(K, S)]
    ): DStream[(K, S)] = {
     new StateDStream(self, ssc.sc.clean(updateFunc), partitioner,
       rememberPartitioner, Some(initialRDD))
  }
  • initialRDD是(K,S)类型的RDD,它表示一组Key的初始状态,每个(K,S)表示一个Key以及它对应的State状态。 K表示updateStateByKey的Key的类型,比如String,而S表示Key对应的状态(State)类型,在上例中,是Int
  • rememberPartitioner: 表示是否在接下来的Spark Streaming执行过程中产生的RDD使用相同的分区算法
  • partitioner: 分区算法,上例中使用的Hash分区算法,分区数为ssc.sparkContext.defaultParallelism
  • updateFunc是函数常量,类型为(Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],表示状态更新函数

(Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)]如何解读?

入参: 三元组迭代器,三元组中K表示Key,Seq[V]表示一个时间间隔中产生的Key对应的Value集合(Seq类型,需要对这个集合定义累加函数逻辑进行累加),Option[S]表示上个时间间隔的累加值(表示这个Key上个时间点的状态)

出参:二元组迭代器,二元组中K表示Key,S表示当前时间点执行结束后,得到的累加值(即最新状态)

 

 

 

 

 

 

 

 

 

分享到:
评论
1 楼 bo_hai 2016-07-19  
看了你的文章,updateStateByKey 这个方式的使用搞明白啦。谢谢!

相关推荐

    SparkStreaming原理介绍

    - **使用 Accumulators**:在需要全局计数器的场景下,使用累加器可以减少通信开销。 - **合理使用 Checkpointing**:频繁的检查点可能会引入额外的 I/O 开销,应根据实际需求调整检查点的频率。 综上所述,Spark ...

    Spark Streaming解析

    累加器和广播变量是 Spark Streaming 中用于优化和共享数据的工具。累加器用于收集分布式计算中的统计数据,而广播变量则用于缓存只读的大对象。 ##### 4.7 DataFrame and SQL Operations Spark Streaming 还支持...

    基于scala语言的spark操作,包含连接操作mysql,连接hdfs.zip

    累加器是 Spark 中的一种共享变量,常用于实现计数或求和等全局聚合操作。 **9. Spark SQL** Spark SQL 提供了 SQL 查询支持,可以通过 `spark.sql()` 函数执行 SQL 语句,或者将 DataFrame 转换为 `...

    scala开发spark代码

    `sparkstreamingsource`和`sparkstreaming`两个文件可能包含了创建输入源、定义转换操作(例如,过滤、聚合)和输出操作的示例,用于处理连续的数据流。 4. **Spark RDD(弹性分布式数据集)**: 虽然RDD在Spark 2.x...

    SparkStreaming

    SparkContext是Spark程序的主要入口点,它负责连接集群,创建RDD、累加器和广播变量,是整个程序的基础。SqlContext则是用于结构化数据处理的入口,支持DataFrame操作和SQL查询。而StreamingContext是SparkStreaming...

    Spark 2.0.2 Spark 2.2 中文文档 本资源为网页,不是PDF

    Accumulators (累加器) 部署应用到集群中 使用 Java / Scala 运行 spark Jobs 单元测试 Spark 1.0 版本前的应用程序迁移 下一步 Spark Streaming Spark Streaming 概述 一个简单的示例 基本概念 依赖 ...

    spark大数据案例

    同时,案例涵盖了状态操作(如滑动窗口、累加器)和复杂事件时间处理,这些都是实时数据处理的关键。此外,Spark Streaming与Spark Core和Spark SQL的集成,使得流处理可以无缝地与批处理和交互式查询结合。 四、...

    spark商业实战三部曲

    2.2.3 累加器API 17 2.3 Spark 2.2 SQL. 19 2.3.1 Spark SQL. 20 2.3.2 DataFrame和DatasetAPI 20 2.3.3 Timed Window.. 21 2.4 Spark 2.2 Streaming. 21 2.4.1 StructuredStreaming. 21 2.4.2 增量输出模式....

    大数据-spark

    此外,Spark还支持多种共享变量,如广播变量和累加器,这些共享变量能够帮助开发者更好地管理和利用集群资源。 #### 二、引入Spark与环境配置 在编写Spark应用程序之前,首先需要配置好开发环境。Spark支持多种...

    Spark分布式内存计算框架视频教程

    8.广播变量和累加器 9.Spark 内核调度 10.Spark 并行度 第三章、SparkSQL 模块 1.快速入门:词频统计 2.SparkSQL 概述 3.DataFrame 4.RDD与DataFrame转换 5.数据分析SQL和DSL 6.案例:电影评分数据分析 7.DataSet 8....

    Apache Spark 2.0.2 中文文档

    - **Accumulators**(累加器):一种只增不减的变量,主要用于在集群中收集统计信息。 - **部署应用到集群中**: - **Java/Scala 运行 sparkJobs**:介绍如何使用 Java 和 Scala API 来编写和运行 Spark 应用程序...

    spark全案例

    2. 累加器:允许多个任务更新同一个变量,但只能用于累加操作。 3. Partitioner:控制数据分区,优化数据分布和计算效率。 七、容错机制 1. lineage:Spark通过记录数据转换过程来实现容错,即使部分数据丢失,也能...

    spark-programming-guide(Spark 编程指南)-高清文字版

    - **共享变量**:说明在Spark中如何使用广播变量和累加器来优化性能和管理状态。 #### 三、Spark Streaming - **一个快速的例子**:给出一个简单的Spark Streaming应用示例,以便读者快速理解其工作原理。 - **基本...

    Spark开发指导文档

    2. 广播变量和累加器:减少数据在网络中的传输,提高性能。 3. 数据分区:根据业务需求调整数据分区策略,优化数据分布和计算效率。 4. 复用SparkContext:避免频繁创建和销毁SparkContext,提升应用程序启动速度。 ...

    spark官方文档中文版

    Apache Spark 是一个流行的开源大数据处理框架,以其高效、易用...它还会深入讲解 Spark 的高级特性,如广播变量、累加器、持久化策略等。对于希望深入理解和应用 Spark 的开发者来说,这份文档是不可或缺的参考资料。

    图解Spark核心技术与案例实战

    9. **Spark性能优化**:这包括配置参数调整、减少Shuffle操作、利用广播变量和累加器、以及利用Spark的内存管理策略来提升计算速度。 10. **案例实战**:书中会包含多个实战案例,如日志分析、推荐系统、流数据处理...

    工信部spark初级考试参考题目

    Task 的关系、Executor 的作用、软件安装命令、应用提交命令、Spark 的 API、配置文件、日志输出控制、有向无环图、计算逻辑、Spark 应用程序配置、Spark 集群节点配置、RDD 算子类型、广播变量、累加器、全局共享...

    spark 2.0.1 JavaAPI

    - **累加器**(Accumulators):只允许添加操作的共享变量,可用于实现计数或求和等。 - **持久化**:通过.cache()或.persist()方法,可以将RDD存储在内存或磁盘上,提高重用效率。 - **错误恢复**:Spark的容错机制...

Global site tag (gtag.js) - Google Analytics