版权声明:原创作品,允许转载,转载时请务必以超链接形式标明文章 原始出处 、作者信息和本声明,否则将追究法律责任。
1. 更强的控制自由度
2. 语义一致性
object DirectKafkaWordCount{
def main(args:Array[String]){
if(args.length <2){
System.err.println(s"""
|Usage:DirectKafkaWordCount<brokers><topics>
|<brokers> is a list of one or more Kafka brokers
|<topics> is a list of one or more kafka topics to consume from
|
""".stripMargin)
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
val Array(brokers, topics)= args
// Create context with 2 second batch interval
val sparkConf =newSparkConf().setAppName("DirectKafkaWordCount")
val ssc =newStreamingContext(sparkConf,Seconds(2))
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams =Map[String,String]("metadata.broker.list"-> brokers)
val messages =KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](
ssc, kafkaParams, topicsSet)
// Get the lines, split them into words, count the words and print
val lines = messages.map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x =>(x,1L)).reduceByKey(_ + _)
wordCounts.print()
// Start the computation
ssc.start()
ssc.awaitTermination()
}
}
/**
* A batch-oriented interface for consuming from Kafka.
* Starting and ending offsets are specified in advance,
* so that you can control exactly-once semantics.
* @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
* configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
* @param messageHandler function for translating each message into the desired type
*/
private[kafka]
classKafkaRDD[
K:ClassTag,
V:ClassTag,
U <:Decoder[_]:ClassTag,
T <:Decoder[_]:ClassTag,
R:ClassTag]private[spark](
sc:SparkContext,
kafkaParams:Map[String,String],
val offsetRanges:Array[OffsetRange], //该RDD的数据偏移量
leaders:Map[TopicAndPartition,(String,Int)],
messageHandler:MessageAndMetadata[K, V]=> R
)extends RDD[R](sc,Nil) with Logging with HasOffsetRanges
trait HasOffsetRanges{
def offsetRanges:Array[OffsetRange]
}
inal classOffsetRangeprivate(
val topic:String,
val partition:Int,
val fromOffset:Long,
val untilOffset:Long)extendsSerializable
override def getPartitions:Array[Partition]={
offsetRanges.zipWithIndex.map {case(o, i)=>
val (host, port)= leaders(TopicAndPartition(o.topic, o.partition))
newKafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
}.toArray
}
private[kafka]
classKafkaRDDPartition(
val index:Int,
val topic:String,
val partition:Int,
val fromOffset:Long,
val untilOffset:Long,
val host:String,
val port:Int
)extendsPartition{
/** Number of messages this partition refers to */
def count():Long= untilOffset - fromOffset
}
override def compute(thePart:Partition, context:TaskContext):Iterator[R]={
val part = thePart.asInstanceOf[KafkaRDDPartition]
assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
if(part.fromOffset == part.untilOffset){
log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset "+
s"skipping ${part.topic} ${part.partition}")
Iterator.empty
}else{
newKafkaRDDIterator(part, context)
}
}
privateclassKafkaRDDIterator(
part:KafkaRDDPartition,
context:TaskContext)extendsNextIterator[R]{
context.addTaskCompletionListener{ context => closeIfNeeded()}
log.info(s"Computing topic ${part.topic}, partition ${part.partition} "+
s"offsets ${part.fromOffset} -> ${part.untilOffset}")
val kc =newKafkaCluster(kafkaParams)
val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
.newInstance(kc.config.props)
.asInstanceOf[Decoder[K]]
val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
.newInstance(kc.config.props)
.asInstanceOf[Decoder[V]]
val consumer = connectLeader
var requestOffset = part.fromOffset
var iter:Iterator[MessageAndOffset]=null
val messages =KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](
ssc, kafkaParams, topicsSet)
def createDirectStream[
K:ClassTag,
V:ClassTag,
KD <:Decoder[K]:ClassTag,
VD <:Decoder[V]:ClassTag](
ssc:StreamingContext,
kafkaParams:Map[String,String],
topics:Set[String]
):InputDStream[(K, V)]={
val messageHandler =(mmd:MessageAndMetadata[K, V])=>(mmd.key, mmd.message)
val kc =newKafkaCluster(kafkaParams)
val fromOffsets = getFromOffsets(kc, kafkaParams, topics)
newDirectKafkaInputDStream[K, V, KD, VD,(K, V)](
ssc, kafkaParams, fromOffsets, messageHandler)
}
override def compute(validTime:Time):Option[KafkaRDD[K, V, U, T, R]]={
//计算最近的数据终止偏移量
val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
val rdd =KafkaRDD[K, V, U, T, R](
context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)
// Report the record number and metadata of this batch interval to InputInfoTracker.
val offsetRanges = currentOffsets.map {case(tp, fo)=>
val uo = untilOffsets(tp)
OffsetRange(tp.topic, tp.partition, fo, uo.offset)
}
val description = offsetRanges.filter { offsetRange =>
// Don't display empty ranges.
offsetRange.fromOffset != offsetRange.untilOffset
}.map { offsetRange =>
s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t"+
s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
}.mkString("\n")
// Copy offsetRanges to immutable.List to prevent from being modified by the user
val metadata =Map(
"offsets"-> offsetRanges.toList,
StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
val inputInfo =StreamInputInfo(id, rdd.count, metadata)
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
Some(rdd)
}
总结:
而且KafkaRDDPartition只能属于一个topic,不能让partition跨多个topic,直接消费一个kafkatopic,topic不断进来、数据不断偏移,Offset代表kafka数据偏移量指针。
数据不断流进kafka,batchDuration假如每十秒都会从配置的topic中消费数据,每次会消费一部分直到消费完,下一个batchDuration会再流进来的数据,又可以从头开始读或上一个数据的基础上读取数据。
思考直接抓取kafka数据和receiver读取数据:
好处一:
直接抓取fakfa数据的好处,没有缓存,不会出现内存溢出等之类的问题。但是如果kafka Receiver的方式读取会存在缓存的问题,需要设置读取的频率和block interval等信息。
好处二:
采用receiver方式的话receiver默认情况需要和worker的executor绑定,不方便做分布式,当然可以配置成分布式,采用direct方式默认情况下数据会存在多个worker上的executor。Kafkardd数据默认都是分布在多个executor上的,天然数据是分布式的存在多个executor,而receiver就不方便计算。
好处三:
数据消费的问题,在实际操作的时候采用receiver的方式有个弊端,消费数据来不及处理即操作数据有deLay多才时,Spark Streaming程序有可能奔溃。但如果是direct方式访问kafka数据不会存在此类情况。因为diect方式直接读取kafka数据,如果delay就不进行下一个batchDuration读取。
好处四:
完全的语义一致性,不会重复消费数据,而且保证数据一定被消费,跟kafka进行交互,只有数据真正执行成功之后才会记录下来。
生产环境下强烈建议采用direct方式读取kafka数据。
相关推荐
Spark-2.3.1源码解读。 Spark Core源码阅读 Spark Context 阅读要点 Spark的缓存,变量,shuffle数据等清理及机制 Spark-submit关于参数及部署模式的部分解析 GroupByKey VS ReduceByKey OrderedRDDFunctions...
### Spark Core 源码解读 Spark Core源码的解读涉及到理解以下几个关键部分: 1. **架构组件**:在Spark的主从架构中,包括Driver和Executor两种角色。Driver负责作业的分解和任务调度,而Executor则执行Driver...
Apache Spark Streaming是Apache Spark用于处理实时流数据的一...Coolplay Spark是一个专注于此类内容的社区和资源集合,提供了大量关于Spark Streaming和Structured Streaming的源码解析、类库、代码和技术交流资源。
一个完善的Spark Streaming二次封装开源框架,包含:实时流任务调度、kafka偏移量管理,web后台管理,web api启动、停止spark streaming,宕机告警、自动重启等等功能支持,用户只需要关心业务代码,无需关注繁琐的...
c)源码解析SparkStreaming数据清理的工作无论是在实际开发中,还是自己动手实践中都是会面临的,Spark Streaming中BatchDurations中会不断的产生RDD,这样会不断的有内存对象生成,其中包含元数据和数据本身。由此...
kafka+Spark Streaming开发文档 本文档主要讲解了使用Kafka和Spark Streaming进行实时数据处理的开发文档,涵盖了Kafka集群的搭建、Spark Streaming的配置和开发等内容。 一、Kafka集群搭建 首先,需要安装Kafka...
Scala代码积累之spark streaming kafka 数据存入到hive源码实例,Scala代码积累之spark streaming kafka 数据存入到hive源码实例。
Spark Streaming预研报告覆盖了Apache Spark Streaming的主要方面,包括其简介、架构、编程模型以及性能调优。以下是基于文档提供内容的详细知识点: 1. Spark Streaming简介与渊源 Spark Streaming是Spark生态中...
sparkStreaming消费数据不丢失,sparkStreaming消费数据不丢失
### Spark Streaming概述 #### 一、Spark Streaming定义与特点 **Spark Streaming** 是Apache Spark生态中的一个重要组件,它主要用于处理实时数据流。该模块构建在基础Spark API之上,旨在实现可扩展、高吞吐量...
在大数据处理领域,Flume 和 Spark Streaming 是两个重要的工具,它们分别用于数据收集与实时流处理。...这种架构充分利用了 Flume 的稳定性与 Spark Streaming 的高性能,是大数据实时处理领域常见的解决方案之一。
Spark Streaming 入门案例 Spark Streaming 是一种构建在 Spark 上的实时计算框架,用来处理大规模流式数据。它将从数据源(如 Kafka、Flume、Twitter、ZeroMQ、HDFS 和 TCP 套接字)获得的连续数据流,离散化成一...
本资源集合包含了15篇与Spark Streaming相关的学术论文,其中涵盖了几篇硕士论文和期刊文章,这些论文深入探讨了Spark Streaming在实际应用中的各种场景和挑战。 首先,Spark Streaming 提供了一个可扩展且容错的...
Spark Streaming的源码展示了其内部实现机制,包括如何接收数据、如何创建和操作DStream、以及如何调度和执行微批处理任务。深入理解源码可以帮助开发者优化性能,解决故障,并实现更复杂的实时流处理逻辑。 **工具...
(1)利用SparkStreaming从文件目录读入日志信息,日志内容包含: ”日志级别、函数名、日志内容“ 三个字段,字段之间以空格拆分。请看数据源的文件。 (2)对读入都日志信息流进行指定筛选出日志级别为error或warn...
Spark Streaming 是Apache Spark中的一个重要组件,专门设计用来处理实时数据流的计算框架。...对于从事大数据处理的工程师和数据科学家来说,了解并运用Spark Streaming处理实时数据流是必备技能之一。
Spark Streaming是Apache Spark的重要组成部分,它提供了一种高吞吐量、可容错的实时数据处理方式。Spark Streaming的核心是一个执行模型,这个执行模型基于微批处理(micro-batch processing)的概念,允许将实时数据...
Spark Streaming 是 Apache Spark 的一个模块,它允许开发者处理实时数据流。这个强大的工具提供了一种弹性、容错性好且易于编程的模型,用于构建实时流处理应用。在这个"Spark Streaming 示例"中,我们将深入探讨...
流处理系统如Apache Spark Streaming和Apache Storm,都致力于提供高吞吐量、低延迟的数据处理能力。尽管它们的目的是类似的,但各自的设计哲学、运行模型、容错机制等方面存在着显著差异。以下将详细介绍Spark ...
随着大数据的快速发展,业务场景越来越复杂,离线式的批处理框架 MapReduce 已经不能满足业务,大量的场景需要实时的数据处理结果来进行分析,决策。例如实时的用户推荐,在 618 这样的刺激环境下普通历史数据的推荐...