使用spark streaming 需要搭建Kafka、zookeeper,搭建的方法网上有很多,再此不再多讲:
文章中的代码参考:https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice2/
代码如下:
import org.apache.spark.streaming.dstream.ReceiverInputDStream import org.apache.spark.{HashPartitioner, SparkConf} import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Duration, Seconds, StreamingContext} object WebPagePopularityValueCalculator { private val checkpointDir = "popularity-data-checkpoint" //kafka message consumer group ID private val msgConsumerGroup = "user-behavior-topic-message-consumer-group" def main(args:Array[String]){ //定义zkserver和端口号及多少秒收集一次数据 val Array(zkServers,processingInterval) = Array("172.4.23.99:2181,172.4.23.99:2182,172.4.23.99:2183","2") val conf = new SparkConf().setAppName("WebPagePopularityValueCalculator") val ssc = new StreamingContext(conf,Seconds(processingInterval.toInt)) //using updateStateByKey asks for enabling checkpoint ssc.checkpoint(checkpointDir) //msgConsumerGroup:kafka的客户端可以是一个组,一条信息只能由组里的一台机器消费 val kafkaStream:ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc,zkServers,msgConsumerGroup,Map("user-behavior-topic" -> 3)) //返回的数据集是k,v形型,k:topic,v:写入kafka的数据集 val msgDataRDD = kafkaStream.map(_._2) //for debug use only //println("Coming data in this interval...") //msgDataRDD.print() //计算权重 val popularityData = msgDataRDD.map{msgLine => { val dataArr:Array[String] = msgLine.split("[|]") val pageID = dataArr(0) val popValue:Double = dataArr(1).toFloat * 0.8 + dataArr(2).toFloat * 0.8 + dataArr(3).toFloat * 1 (pageID,popValue) }} //sum the previous popularity value and current value //iterator有三个参数,第一个参数为key,第二个参数:上一次的数据集,第三个参数:本次的数据 //处理逻辑是:将上一次的数据集求和,再加上本次的数据,返回结果之和 //再返回(key,sumedValue) val updatePopularityValue = (iterator: Iterator[(String, Seq[Double], Option[Double])]) => { iterator.flatMap(t => { val newValue:Double = t._2.sum val stateValue:Double = t._3.getOrElse(0) Some(newValue + stateValue) }.map(sumedValue => (t._1, sumedValue))) } val initialRDD = ssc.sparkContext.parallelize(List(("page1",0.00))) val stateDstream = popularityData.updateStateByKey[Double](updatePopularityValue,new HashPartitioner(ssc.sparkContext.defaultParallelism),true,initialRDD) //set the checkpoint interval to avoid too frequently data checkpoint which may //may significantly reduce operation throughput stateDstream.checkpoint(Duration(8 * processingInterval.toInt * 1000)) //after calculation, we need to sort the result and only show the top 10 hot pages stateDstream.foreachRDD(rdd => { val sortedData = rdd.map{case (k,v) => (v,k)}.sortByKey(false) val topKData = sortedData.take(10).map{case (v,k) => (k,v)} topKData.foreach(x => { println(x) }) }) ssc.start() ssc.awaitTermination() } }
上述代码的核心是:updateStateByKey。看一下源码:
/** * Return a new "state" DStream where the state for each key is updated by applying * the given function on the previous state of the key and the new values of each key. * org.apache.spark.Partitioner is used to control the partitioning of each RDD. * @param updateFunc State update function. Note, that this function may generate a different * tuple with a different key than the input key. Therefore keys may be removed * or added in this way. It is up to the developer to decide whether to * remember the partitioner despite the key being changed. * @param partitioner Partitioner for controlling the partitioning of each RDD in the new * DStream * @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs. * @tparam S State type */ def updateStateByKey[S: ClassTag]( updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], partitioner: Partitioner, rememberPartitioner: Boolean ): DStream[(K, S)] = ssc.withScope { new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, None) }
- initialRDD是(K,S)类型的RDD,它表示一组Key的初始状态,每个(K,S)表示一个Key以及它对应的State状态。 K表示updateStateByKey的Key的类型,比如String,而S表示Key对应的状态(State)类型,在上例中,是Int
- rememberPartitioner: 表示是否在接下来的Spark Streaming执行过程中产生的RDD使用相同的分区算法
- partitioner: 分区算法,上例中使用的Hash分区算法,分区数为ssc.sparkContext.defaultParallelism
- updateFunc是函数常量,类型为(Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],表示状态更新函数
(Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)]如何解读?
入参: 三元组迭代器,三元组中K表示Key,Seq[V]表示一个时间间隔中产生的Key对应的Value集合(Seq类型,需要对这个集合定义累加函数逻辑进行累加),Option[S]表示上个时间间隔的累加值(表示这个Key上个时间点的状态)
出参:二元组迭代器,二元组中K表示Key,S表示当前时间点执行结束后,得到的累加值(即最新状态)
总结:(1)环境的搭建;(2)理解Kafka的基本源理;(3)明白updateStateByKey的使用;
参考:
http://bit1129.iteye.com/blog/2198682
https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice2/
相关推荐
spark-streaming-flume_2.11-2.1.0.jar
一个完善的Spark Streaming二次封装开源框架,包含:实时流任务调度、...基于Spark Streaming的大数据实时流计算平台和框架(包括:调度平台,开发框架,开发demo),并且是基于运行在yarn模式运行的spark streaming
spark-streaming-kafka-0-8_2.11-2.4.0.jar
KafkaUtils所依赖的jar包,导入文件中KafkaUtils报错,需要导入spark-streaming-kafka_2.10-1.6.0.jar包
Spark零基础思维导图(内含spark-core ,spark-streaming,spark-sql),总结的很全面。 Spark零基础思维导图(内含spark-core ,spark-streaming,spark-sql)。 Spark零基础思维导图(内含spark-core ,spark-streaming,...
Spark Streaming 是基于 Apache Spark 的实时流处理框架,它扩展了 Spark Core 的批处理功能,使其能够处理持续流入的数据流。Spark Streaming 提供了微批处理的概念,将实时数据流分割成一系列小的批处理作业,从而...
通过Spark-Streaming,我们可以利用机器学习算法(如基于统计的方法、聚类、深度学习等)对日志数据进行实时分析,识别出与正常行为模式偏离的事件,从而发现潜在的问题或攻击。 总结来说,这个系统构建了一个完整...
spark-streaming_2.11-2.1.3-SNAPSHOT.jar
spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar
spark-streaming-kafka-0-10_2.11-2.4.0-cdh6.1.1.jar
本科毕业设计项目,基于spark streaming+flume+kafka+hbase的实时日志处理分析系统 基于spark streaming+flume+kafka+hbase的实时日志处理分析系统 本科毕业设计项目,基于spark streaming+flume+kafka+hbase的...
Structured Streaming基于Spark SQL引擎,可以轻松地与Spark SQL集成,并且可以利用Spark SQL的强大功能,如SQL查询、流处理聚合等。 Structured Streaming提供了Source和Sink的概念,它支持多种数据源作为输入(如...
spark streaming 链接kafka必用包,欢迎大家下载与使用
spark-streaming-flume-sink_2.11-2.0.0.jar的jar包。
基于Scala2.11的spark-streaming 2.2.0 jar包,用于java开发
使用spark集成flume,由于flume默认只支持pull消息的方式,不过它可以自定义消息拉取方式,现要使用poll方式,可以使用spark-streaming-flume-sink_2.11-2.1.0.jar包下的org.apache.spark.streaming.flume.sink....
Spark-Streaming基于微批处理概念,将实时数据流分解为一系列小批次,然后利用Spark Core的并行计算能力处理这些小批次。这种设计使得Spark-Streaming能够利用已有的批处理优化,并具备高吞吐量和低延迟的特性。 2...
spark streaming kafka
- **RDD容错机制**: Spark Streaming基于RDD的Lineage机制,当部分分区丢失时,可以通过计算Lineage信息重新计算得到丢失的数据。 - **数据源容错**: 对于来自外部文件系统(如HDFS)的数据源,可以通过重新读取数据...
本文提出了一种基于Spark Streaming技术的实时数据处理系统,旨在解决实时数据处理问题,提高数据处理的实时性。 二、Spark Streaming技术与实时数据处理系统设计 Spark Streaming是Apache Spark用于实时数据流处理...