`
m635674608
  • 浏览: 5031386 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

spark-streaming-kafka包源码分析

 
阅读更多

转载请注明原创地址 http://www.cnblogs.com/dongxiao-yang/p/5443789.html   

 

    最近由于使用sparkstreaming的同学需要对接到部门内部的的kafka集群,由于官方的spark-streaming-kafka包和现有公司的kafka集群权限系统无法对接,需要研究下spark-streaming-kafka包原有代码以便改造,本文研究的代码版本为spark在github的tag的v1.6.1版本。

     官方给出的JavaKafkaWordCount以及KafkaWordCount代码里产生kafka-streaming消费流数据的调用代码分别如下

1
2
3
4
5
JavaPairReceiverInputDStream<String, String> messages =
           KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
 
 
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

  

可以看到无论是java还是scala调用的都是KafkaUtils内重载实现的createStream方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
object KafkaUtils {
  /**
   * Create an input stream that pulls messages from Kafka Brokers.
   * @param ssc       StreamingContext object
   * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..)
   * @param groupId   The group id for this consumer
   * @param topics    Map of (topic_name -> numPartitions) to consume. Each partition is consumed
   *                  in its own thread
   * @param storageLevel  Storage level to use for storing the received objects
   *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
   * @return DStream of (Kafka message key, Kafka message value)
   */
  def createStream(
      ssc: StreamingContext,
      zkQuorum: String,
      groupId: String,
      topics: Map[String, Int],
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[(String, String)] = {
    val kafkaParams = Map[String, String](
      "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
      "zookeeper.connection.timeout.ms" -> "10000")
    createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics, storageLevel)
  }
 
  /**
   * Create an input stream that pulls messages from Kafka Brokers.
   * @param ssc         StreamingContext object
   * @param kafkaParams Map of kafka configuration parameters,
   *                    see http://kafka.apache.org/08/configuration.html
   * @param topics      Map of (topic_name -> numPartitions) to consume. Each partition is consumed
   *                    in its own thread.
   * @param storageLevel Storage level to use for storing the received objects
   * @tparam K type of Kafka message key
   * @tparam V type of Kafka message value
   * @tparam U type of Kafka message key decoder
   * @tparam T type of Kafka message value decoder
   * @return DStream of (Kafka message key, Kafka message value)
   */
  def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Map[String, Int],
      storageLevel: StorageLevel
    ): ReceiverInputDStream[(K, V)] = {
    val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf)
    new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
  }
 
  /**
   * Create an input stream that pulls messages from Kafka Brokers.
   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
   * @param jssc      JavaStreamingContext object
   * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..)
   * @param groupId   The group id for this consumer
   * @param topics    Map of (topic_name -> numPartitions) to consume. Each partition is consumed
   *                  in its own thread
   * @return DStream of (Kafka message key, Kafka message value)
   */
  def createStream(
      jssc: JavaStreamingContext,
      zkQuorum: String,
      groupId: String,
      topics: JMap[String, JInt]
    ): JavaPairReceiverInputDStream[String, String] = {
    createStream(jssc.ssc, zkQuorum, groupId, Map(topics.asScala.mapValues(_.intValue()).toSeq: _*))
  }
 
  /**
   * Create an input stream that pulls messages from Kafka Brokers.
   * @param jssc      JavaStreamingContext object
   * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..).
   * @param groupId   The group id for this consumer.
   * @param topics    Map of (topic_name -> numPartitions) to consume. Each partition is consumed
   *                  in its own thread.
   * @param storageLevel RDD storage level.
   * @return DStream of (Kafka message key, Kafka message value)
   */
  def createStream(
      jssc: JavaStreamingContext,
      zkQuorum: String,
      groupId: String,
      topics: JMap[String, JInt],
      storageLevel: StorageLevel
    ): JavaPairReceiverInputDStream[String, String] = {
    createStream(jssc.ssc, zkQuorum, groupId, Map(topics.asScala.mapValues(_.intValue()).toSeq: _*),
      storageLevel)
  }
 
  /**
   * Create an input stream that pulls messages from Kafka Brokers.
   * @param jssc      JavaStreamingContext object
   * @param keyTypeClass Key type of DStream
   * @param valueTypeClass value type of Dstream
   * @param keyDecoderClass Type of kafka key decoder
   * @param valueDecoderClass Type of kafka value decoder
   * @param kafkaParams Map of kafka configuration parameters,
   *                    see http://kafka.apache.org/08/configuration.html
   * @param topics  Map of (topic_name -> numPartitions) to consume. Each partition is consumed
   *                in its own thread
   * @param storageLevel RDD storage level.
   * @tparam K type of Kafka message key
   * @tparam V type of Kafka message value
   * @tparam U type of Kafka message key decoder
   * @tparam T type of Kafka message value decoder
   * @return DStream of (Kafka message key, Kafka message value)
   */
  def createStream[K, V, U <: Decoder[_], T <: Decoder[_]](
      jssc: JavaStreamingContext,
      keyTypeClass: Class[K],
      valueTypeClass: Class[V],
      keyDecoderClass: Class[U],
      valueDecoderClass: Class[T],
      kafkaParams: JMap[String, String],
      topics: JMap[String, JInt],
      storageLevel: StorageLevel
    ): JavaPairReceiverInputDStream[K, V] = {
    implicit val keyCmt: ClassTag[K] = ClassTag(keyTypeClass)
    implicit val valueCmt: ClassTag[V] = ClassTag(valueTypeClass)
 
    implicit val keyCmd: ClassTag[U] = ClassTag(keyDecoderClass)
    implicit val valueCmd: ClassTag[T] = ClassTag(valueDecoderClass)
 
    createStream[K, V, U, T](
      jssc.ssc,
      kafkaParams.asScala.toMap,
      Map(topics.asScala.mapValues(_.intValue()).toSeq: _*),
      storageLevel)
  }

 

其中java相关的第三个和第四个createStream调用了第一个createStream,而第一个createStream最后调用的是第二个createStream,所以所有的rdd数据流都是从下面这句代码产生的:

new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)

查看KafkaInputDStream类定义,发现获取receiver有两种类型:KafkaReceiver和ReliableKafkaReceiver。

1
2
3
4
5
6
7
def getReceiver(): Receiver[(K, V)] = {
  if (!useReliableReceiver) {
    new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
  else {
    new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
  }
}

其中,KafkaReceiver实现比较简单,调用的是kafka的high level api产生数据流,产生的每个线程的数据流都被放到一个线程池由单独的线程来消费

val topicMessageStreams = consumerConnector.createMessageStreams(
  topics, keyDecoder, valueDecoder)

 ReliableKafkaReceiver是结合了spark的预写日志(Write Ahead Logs)功能,开启这个功能需要设置sparkconf属性 spark.streaming.receiver.writeAheadLog.enable为真(默认值是假)

这个receiver会把收到的kafka数据首先存储到日志上,然后才会向kafka提交offset,这样保证了在driver程序出现问题的时候不会丢失kafka数据。

 

 

参考文章 Spark Streaming容错的改进和零数据丢失

分享到:
评论

相关推荐

    spark-2.4.7-bin-hadoop2.6.tgz

    5. **Spark Streaming**:Spark Streaming提供了微批处理的方式处理实时流数据,它能够处理来自各种数据源(如Kafka、Flume等)的连续数据流。 6. **MLlib**:Spark的机器学习库MLlib包含了多种机器学习算法,如...

    基于spark streaming+kafka的实时日志处理分析系统源码(分控制台版本和Web UI可视化版本).zip

    基于spark streaming+flume+kafka+hbase的实时日志处理分析系统源码(分控制台版本和Web UI可视化版本).zip 基于spark streaming+flume+kafka+hbase的实时日志处理分析系统源码(分控制台版本和Web UI可视化版本).zip ...

    spark-2.2.1.tar.gz 源码

    Spark的核心组件包括:Spark Core、Spark SQL、Spark Streaming、MLlib(机器学习库)和GraphX(图处理)。Spark Core是基础,提供分布式任务调度、内存管理、错误恢复和网络通信等功能。Spark SQL支持SQL查询和...

    基于spark streaming+flume+kafka+hbase的实时日志处理分析系统源码.zip

    基于spark streaming+flume+kafka+hbase的实时日志处理分析系统源码(分控制台版本和Web UI可视化版本).zip基于spark streaming+flume+kafka+hbase的实时日志处理分析系统源码(分控制台版本和Web UI可视化版本).zip...

    基于flume ,kafka , spark-streaming 统计分析大数据实战项目+源代码+文档说明

    该资源内项目源码是个人的毕设,代码都测试ok,都是运行成功后才上传资源,答辩评审平均分达到96分,放心下载使用! &lt;项目介绍&gt; 1、该资源内项目代码都经过测试运行成功,功能ok的情况下才上传的,请放心下载使用! ...

    基于spark streaming+kafka+hbase的日志统计分析系统源码+项目说明.zip

    【资源说明】 1、该资源包括项目的全部源码,下载可以直接使用! 2、本项目适合作为计算机、数学、电子信息等专业的课程设计、期末大作业和毕设...基于spark streaming+kafka+hbase的日志统计分析系统源码+项目说明.zip

    spark Streaming和structed streaming分析

    Apache Spark Streaming是Apache Spark用于处理实时流数据的一...Coolplay Spark是一个专注于此类内容的社区和资源集合,提供了大量关于Spark Streaming和Structured Streaming的源码解析、类库、代码和技术交流资源。

    基于spark streaming+flume+kafka+hbase的实时日志处理分析系统+源代码+文档说明

    1、资源内容:基于spark streaming+flume+kafka+hbase的实时日志处理分析系统(分为控制台版本和基于springboot、Echarts等的Web UI可视化版本) 2、代码特点:内含运行结果,不会运行可私信,参数化编程、参数可方便...

    spark2.2.0源码------

    4. **Spark Streaming增强**:Spark Streaming在这一版本中增加了对Kafka Direct Stream的支持,允许用户更高效地从Kafka读取数据,减少了数据处理的延迟。此外,容错机制也得到了改善,提高了系统的健壮性。 5. **...

    spark-2.2.0.tgz源码

    Spark Streaming处理实时数据流,支持多种数据源如Kafka和TCP套接字。MLlib则提供了广泛的机器学习算法,而GraphX则用于图计算。 2. **RDD(弹性分布式数据集)**:Spark 2.2.0中的关键概念是RDD,它是Spark的基础...

    基于spark streaming和kafka,hbase的日志统计分析系统.zip

    《基于Spark Streaming、Kafka与HBase的日志统计分析系统》 Spark Streaming是Apache Spark框架的一个模块,专门用于处理实时数据流。它提供了一个高级抽象,使得开发人员能够轻松地处理持续的数据流,同时保持...

    基于Spark Streaming + Kafka + Flume 实现的日志收集处理系统.zip

    Spark Streaming作为消费者,连接到Kafka主题,实时读取并处理这些日志数据,进行清洗、过滤、统计分析等操作,最后将处理结果存储到如HDFS或数据库中,供后续分析或展示。 项目源码“sparktime-master”包含了完整...

    基于spark streaming和kafka,hbase的日志统计分析系统+源代码+文档说明

    1、资源内容:基于spark streaming和kafka,hbase的日志统计分析系统+源代码+文档说明 2、代码特点:内含运行结果,不会运行可私信,参数化编程、参数可方便更改、代码编程思路清晰、注释明细,都经过测试运行成功,...

    Spark Streaming Real-time big-data processing

    在Spark Streaming中,开发者可以使用多种工具集成实时数据源,如Kafka、Flume、Twitter等。这些工具可以方便地将实时数据引入到Spark Streaming的工作流程中。此外,Spark还提供丰富的库来处理各种数据转换和分析...

    spark源码结构,源码说明

    - `spark-sql-kafka-0-10`:与Kafka集成,用于实时数据摄取。 3. **Spark UI和日志**: Spark的用户界面(UI)是基于Java Swing构建的,提供监控作业状态和性能的可视化界面。`/ui`目录下的源码包含了UI组件的...

    spark-parent_2.11:spark2.2中文源码包-源码包

    Spark Streaming 提供了微批处理模型来处理实时数据流,它可以接收来自多种数据源(如 Kafka、Flume、TCP 套接字等)的数据,并以时间窗口进行处理。源码中包含了 DStream(离散化流)的实现,以及如何将 DStream ...

    spark源码分析系列

    在本知识点中,我们将探讨Spark Streaming中的Direct Approach模式,并结合源码分析,理解如何处理和预防在使用Spark Streaming消费Kafka数据时出现的一些常见问题。 **Spark Streaming Direct Approach核心机制** ...

    spark1.6.0-src.rar

    源码分析: 1. **Spark Core**:Spark的核心组件,负责任务调度、内存管理、故障恢复和与存储系统的交互。在源码中,你可以看到DAGScheduler如何将任务分解为Stage,以及TaskScheduler如何将这些Stage转化为具体的...

    spark大数据商业实战三部曲源码及资料.zip

    源码分析有助于深入理解Spark的内部工作原理,例如: - DAGScheduler如何将作业拆分成Stage,Stage再拆分成Task。 - TaskScheduler如何将Task分配到Executor上执行。 - Shuffle过程是如何实现的,包括...

    基于SparkStreaming的实时音乐推荐系统源码.zip

    在本项目中,"基于SparkStreaming的实时音乐推荐系统源码.zip",主要涉及的是如何利用Apache Spark Streaming这一强大的实时处理框架,构建一个能够实时分析用户行为并进行个性化音乐推荐的系统。Spark Streaming是...

Global site tag (gtag.js) - Google Analytics