`
bo_hai
  • 浏览: 565799 次
  • 性别: Icon_minigender_1
  • 来自: 武汉
社区版块
存档分类
最新评论

spark基于Streaming的累加器(updateStateByKey)

阅读更多

使用spark streaming 需要搭建Kafka、zookeeper,搭建的方法网上有很多,再此不再多讲:

文章中的代码参考:https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice2/

代码如下:

import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}

object WebPagePopularityValueCalculator {
  private val checkpointDir = "popularity-data-checkpoint"
  //kafka message consumer group ID
  private val msgConsumerGroup = "user-behavior-topic-message-consumer-group"

  
  def main(args:Array[String]){
    //定义zkserver和端口号及多少秒收集一次数据
    val Array(zkServers,processingInterval) = Array("172.4.23.99:2181,172.4.23.99:2182,172.4.23.99:2183","2")
    val conf = new SparkConf().setAppName("WebPagePopularityValueCalculator")
    val ssc = new StreamingContext(conf,Seconds(processingInterval.toInt))
    //using updateStateByKey asks for enabling checkpoint
    ssc.checkpoint(checkpointDir)
    //msgConsumerGroup:kafka的客户端可以是一个组,一条信息只能由组里的一台机器消费
    val kafkaStream:ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc,zkServers,msgConsumerGroup,Map("user-behavior-topic" -> 3))
    //返回的数据集是k,v形型,k:topic,v:写入kafka的数据集
    val msgDataRDD = kafkaStream.map(_._2)
    //for debug use only
    //println("Coming data in this interval...")
    //msgDataRDD.print()
    //计算权重
    val popularityData = msgDataRDD.map{msgLine => {
      val dataArr:Array[String] = msgLine.split("[|]")
      val pageID = dataArr(0)
      val popValue:Double = dataArr(1).toFloat * 0.8 + dataArr(2).toFloat * 0.8 + dataArr(3).toFloat * 1
      (pageID,popValue)
    }}

    //sum the previous popularity value and current value
    //iterator有三个参数,第一个参数为key,第二个参数:上一次的数据集,第三个参数:本次的数据
    //处理逻辑是:将上一次的数据集求和,再加上本次的数据,返回结果之和
    //再返回(key,sumedValue)
    val updatePopularityValue = (iterator: Iterator[(String, Seq[Double], Option[Double])]) => {
      iterator.flatMap(t => {
        val newValue:Double = t._2.sum
        val stateValue:Double = t._3.getOrElse(0)
        Some(newValue + stateValue)
      }.map(sumedValue => (t._1, sumedValue)))
    }

    val initialRDD = ssc.sparkContext.parallelize(List(("page1",0.00)))
    val stateDstream = popularityData.updateStateByKey[Double](updatePopularityValue,new HashPartitioner(ssc.sparkContext.defaultParallelism),true,initialRDD)
    //set the checkpoint interval to avoid too frequently data checkpoint which may
    //may significantly reduce operation throughput
    stateDstream.checkpoint(Duration(8 * processingInterval.toInt * 1000))
    //after calculation, we need to sort the result and only show the top 10 hot pages
    stateDstream.foreachRDD(rdd => {
      val sortedData = rdd.map{case (k,v) => (v,k)}.sortByKey(false)
      val topKData = sortedData.take(10).map{case (v,k) => (k,v)}
      topKData.foreach(x => {
        println(x)
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

 上述代码的核心是:updateStateByKey。看一下源码:

  /**
   * Return a new "state" DStream where the state for each key is updated by applying
   * the given function on the previous state of the key and the new values of each key.
   * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
   * @param updateFunc State update function. Note, that this function may generate a different
   *                   tuple with a different key than the input key. Therefore keys may be removed
   *                   or added in this way. It is up to the developer to decide whether to
   *                   remember the partitioner despite the key being changed.
   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
   *                    DStream
   * @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs.
   * @tparam S State type
   */
  def updateStateByKey[S: ClassTag](
      updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
      partitioner: Partitioner,
      rememberPartitioner: Boolean
    ): DStream[(K, S)] = ssc.withScope {
     new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, None)
  }

 

  • initialRDD是(K,S)类型的RDD,它表示一组Key的初始状态,每个(K,S)表示一个Key以及它对应的State状态。 K表示updateStateByKey的Key的类型,比如String,而S表示Key对应的状态(State)类型,在上例中,是Int
  • rememberPartitioner: 表示是否在接下来的Spark Streaming执行过程中产生的RDD使用相同的分区算法
  • partitioner: 分区算法,上例中使用的Hash分区算法,分区数为ssc.sparkContext.defaultParallelism
  • updateFunc是函数常量,类型为(Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],表示状态更新函数


    (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)]如何解读?

    入参: 三元组迭代器,三元组中K表示Key,Seq[V]表示一个时间间隔中产生的Key对应的Value集合(Seq类型,需要对这个集合定义累加函数逻辑进行累加),Option[S]表示上个时间间隔的累加值(表示这个Key上个时间点的状态)

    出参:二元组迭代器,二元组中K表示Key,S表示当前时间点执行结束后,得到的累加值(即最新状态)

    总结:(1)环境的搭建;(2)理解Kafka的基本源理;(3)明白updateStateByKey的使用;

    参考:
    http://bit1129.iteye.com/blog/2198682
    https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice2/

0
2
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics