- 浏览: 73892 次
前面一篇文章介绍了SparkStreaming是如何不停的循环submitJob的, 连接:
http://humingminghz.iteye.com/admin/blogs/2308711
既然已经知道了Spark Streaming如何循环处理, 那么我们就要看一下处理过程中是怎么获取到Dstream的, 用简单的socketTextStream 来做一个列子, 入口:
在socketTextStream直接调用socketTextStream, 再到SocketInputDStream
可以看到实际上是创建了一个SocketInputDStream对象, 而SocketInputDStream是继承自ReceiverInputDStream的:
先看一下这个getReceiver, 创建了一个receiver: SocketReceiver,SocketReceiver里面主要看他的receive方法:
可以看到这个receiver主要就是连接socket获取传输过来的数据然后存起来到block里面
那么SocketTextStreaming又是怎么从block里面把数据拿出来组成RDD的呢,那么就要看 SocketInputDStream继承的父类ReceiverInputDStream了, 点进去看有个compute方法, 我们知道这个方法是会在action里面调用, 那么compute方法里面做了什么呢, 我们先看一下代码怎么写的:
看到如果时间是正常的, 首先会拿到一个receiverTracker (在ssc.start方法里面tracker已经启动)然后从receivertracker里面拿到特定时间的blocks, 然后根据获取到的blockinfos去创建BlockRDD (createBlockRDD(validTime, blockInfos))
createBlockRDD方法:
这个方法返回的是一个RDD, 里面先会从blockinfos里面拿出所有blockid:
val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
再看是不是所有block都有在WAL里面
val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
如果是, 那么就创建返回WriteAheadLogBackedBlockRDD (继承自RDD)
如果不是, 则确认blockmanager里面有这个ID, 并组成validBlockids:
val validBlockIds = blockIds.filter { id =>
ssc.sparkContext.env.blockManager.master.contains(id)
}
然后根据validBlockids创建一个blockRDD:
new BlockRDD[T](ssc.sc, validBlockIds)
BlockRDD里面的compute方法是:
可以看到他从blockManager里面根据ID去拿对应的block并组成iterator返回。 到此为止一个blockRDD就生成了, 大家知道Dstream实际上就是一堆RDD的组合, 那么这个就是里面RDD是怎么来的的介绍。
接下来还会看一下blockmanager和receiverTRacker是怎么把数据存入到block里面来管理的。
http://humingminghz.iteye.com/admin/blogs/2308711
既然已经知道了Spark Streaming如何循环处理, 那么我们就要看一下处理过程中是怎么获取到Dstream的, 用简单的socketTextStream 来做一个列子, 入口:
val lines = scc.socketTextStream(args(1), args(2).toInt, StorageLevel.MEMORY_AND_DISK)
在socketTextStream直接调用socketTextStream, 再到SocketInputDStream
def socketTextStream( hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): ReceiverInputDStream[String] = withNamedScope("socket text stream") { socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel) } def socketStream[T: ClassTag]( hostname: String, port: Int, converter: (InputStream) => Iterator[T], storageLevel: StorageLevel ): ReceiverInputDStream[T] = { new SocketInputDStream[T](this, hostname, port, converter, storageLevel) }
可以看到实际上是创建了一个SocketInputDStream对象, 而SocketInputDStream是继承自ReceiverInputDStream的:
private[streaming] class SocketInputDStream[T: ClassTag]( ssc_ : StreamingContext, host: String, port: Int, bytesToObjects: InputStream => Iterator[T], storageLevel: StorageLevel ) extends ReceiverInputDStream[T](ssc_) { def getReceiver(): Receiver[T] = { new SocketReceiver(host, port, bytesToObjects, storageLevel) } }
先看一下这个getReceiver, 创建了一个receiver: SocketReceiver,SocketReceiver里面主要看他的receive方法:
/** Create a socket connection and receive data until receiver is stopped */ def receive() { var socket: Socket = null try { logInfo("Connecting to " + host + ":" + port) socket = new Socket(host, port) logInfo("Connected to " + host + ":" + port) val iterator = bytesToObjects(socket.getInputStream()) while(!isStopped && iterator.hasNext) { store(iterator.next) } if (!isStopped()) { restart("Socket data stream had no more data") } else { logInfo("Stopped receiving") } } catch { case e: java.net.ConnectException => restart("Error connecting to " + host + ":" + port, e) case NonFatal(e) => logWarning("Error receiving data", e) restart("Error receiving data", e) } finally { if (socket != null) { socket.close() logInfo("Closed socket to " + host + ":" + port) } } }
可以看到这个receiver主要就是连接socket获取传输过来的数据然后存起来到block里面
那么SocketTextStreaming又是怎么从block里面把数据拿出来组成RDD的呢,那么就要看 SocketInputDStream继承的父类ReceiverInputDStream了, 点进去看有个compute方法, 我们知道这个方法是会在action里面调用, 那么compute方法里面做了什么呢, 我们先看一下代码怎么写的:
override def compute(validTime: Time): Option[RDD[T]] = { val blockRDD = { if (validTime < graph.startTime) { // If this is called for any time before the start time of the context, // then this returns an empty RDD. This may happen when recovering from a // driver failure without any write ahead log to recover pre-failure data. new BlockRDD[T](ssc.sc, Array.empty) } else { // Otherwise, ask the tracker for all the blocks that have been allocated to this stream // for this batch val receiverTracker = ssc.scheduler.receiverTracker val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty) // Register the input blocks information into InputInfoTracker val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum) ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo) // Create the BlockRDD createBlockRDD(validTime, blockInfos) } } Some(blockRDD) }
看到如果时间是正常的, 首先会拿到一个receiverTracker (在ssc.start方法里面tracker已经启动)然后从receivertracker里面拿到特定时间的blocks, 然后根据获取到的blockinfos去创建BlockRDD (createBlockRDD(validTime, blockInfos))
createBlockRDD方法:
private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = { if (blockInfos.nonEmpty) { val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray // Are WAL record handles present with all the blocks val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty } if (areWALRecordHandlesPresent) { // If all the blocks have WAL record handle, then create a WALBackedBlockRDD val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray new WriteAheadLogBackedBlockRDD[T]( ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid) } else { // Else, create a BlockRDD. However, if there are some blocks with WAL info but not // others then that is unexpected and log a warning accordingly. if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) { if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) { logError("Some blocks do not have Write Ahead Log information; " + "this is unexpected and data may not be recoverable after driver failures") } else { logWarning("Some blocks have Write Ahead Log information; this is unexpected") } } val validBlockIds = blockIds.filter { id => ssc.sparkContext.env.blockManager.master.contains(id) } if (validBlockIds.size != blockIds.size) { logWarning("Some blocks could not be recovered as they were not found in memory. " + "To prevent such data loss, enabled Write Ahead Log (see programming guide " + "for more details.") } new BlockRDD[T](ssc.sc, validBlockIds) } } else { // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD // according to the configuration if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) { new WriteAheadLogBackedBlockRDD[T]( ssc.sparkContext, Array.empty, Array.empty, Array.empty) } else { new BlockRDD[T](ssc.sc, Array.empty) } } }
这个方法返回的是一个RDD, 里面先会从blockinfos里面拿出所有blockid:
val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
再看是不是所有block都有在WAL里面
val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
如果是, 那么就创建返回WriteAheadLogBackedBlockRDD (继承自RDD)
如果不是, 则确认blockmanager里面有这个ID, 并组成validBlockids:
val validBlockIds = blockIds.filter { id =>
ssc.sparkContext.env.blockManager.master.contains(id)
}
然后根据validBlockids创建一个blockRDD:
new BlockRDD[T](ssc.sc, validBlockIds)
BlockRDD里面的compute方法是:
override def compute(split: Partition, context: TaskContext): Iterator[T] = { assertValid() val blockManager = SparkEnv.get.blockManager val blockId = split.asInstanceOf[BlockRDDPartition].blockId blockManager.get(blockId) match { case Some(block) => block.data.asInstanceOf[Iterator[T]] case None => throw new Exception("Could not compute split, block " + blockId + " not found") } }
可以看到他从blockManager里面根据ID去拿对应的block并组成iterator返回。 到此为止一个blockRDD就生成了, 大家知道Dstream实际上就是一堆RDD的组合, 那么这个就是里面RDD是怎么来的的介绍。
接下来还会看一下blockmanager和receiverTRacker是怎么把数据存入到block里面来管理的。
发表评论
-
kafka + flume + hdfs + zookeeper + spark 测试环境搭建
2017-07-20 11:28 1105最近由于项目需要, 搭建了一个类似线上环境的处理流数据的环境 ... -
源码跟踪executor如何写数据到blockmanager, 以及如何从blockmanager读数据
2016-08-10 19:41 1415之前看了Job怎么submit 以 ... -
Spark中Blockmanager相关代码解析
2016-08-04 19:47 1844前一段时间看了如何划分stage以及如何提交Job, 最后把结 ... -
Spark在submitStage后如何通过clustermanager调度执行task到Driver接收计算结果的代码解析
2016-08-01 14:08 1473前文: http://humingminghz.iteye.c ... -
Spark中saveAsTextFile至stage划分和job提交的源代码分析
2016-07-29 14:20 3360之前看了Spark Streaming和Spark SQL, ... -
SparkSQL DF.agg 执行过程解析
2016-07-19 10:21 4120在上一篇文章前, 我一直没看懂为什么下面的代码就能得到max或 ... -
SparkSQL SQL语句解析过程源代码浅析
2016-07-15 19:34 6642前两天一直在忙本职工 ... -
SparkSQL SQL语句解析过程浅析
2016-07-15 19:06 0前两天一直在忙本职工 ... -
SparkStreaming从启动Receiver到收取数据生成RDD的代码浅析
2016-07-08 17:54 2233前面一片文章介绍了SocketTextStream 是如何从b ... -
SparkSQL 使用SQLContext读取csv文件 分析数据 (含部分数据)
2016-07-06 11:24 10151前两天开始研究SparkSQL, 其主要分为HiveConte ... -
SparkStreaming是如何完成不停的循环处理的代码浅析
2016-07-02 12:26 4654一直很好奇Sparkstreaming的ssc.start是怎 ... -
SparkStreaming 对Window的reduce的方法解析
2016-06-30 11:57 4728在sparkstreaming中对窗口 ... -
Sparkstreaming reduceByKeyAndWindow(_+_, _-_, Duration, Duration) 的源码/原理解析
2016-06-29 19:50 8791最近在玩spark streaming, 感觉到了他的强大。 ... -
关于Eclipse开发环境下 Spark+Kafka 获取topic的时候连接出错
2016-06-28 17:20 7404林林总总玩了Spark快一个月了, 打算试一下kafka的消息 ...
相关推荐
### Apache Spark源码走读之四:DStream实时流数据处理 #### 一、系统概述与流数据特性 本文档探讨了Apache Spark Streaming的核心概念之一——**DStream**(Discretized Stream)及其如何实现对实时流数据的有效...
SparkStreaming之Dstream入门 Spark Streaming是Apache Spark中的一个组件,用于处理流式数据。它可以从多种数据源中接收数据,如Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等,并使用Spark的高度抽象原语如...
### Apache Spark源码走读之五:DStream处理的容错性分析 #### 环境搭建与背景 为了深入理解Apache Spark Streaming中DStream处理的容错机制,本文将从一个简单的Spark Streaming示例出发,逐步分析Spark如何确保...
DStream是SparkStreaming的核心抽象,它代表了连续不断的数据块序列。DStream可以由多种方式创建,包括从基本的数据源(如Kafka、Flume、HDFS等)或通过转换现有DStream。本文主要介绍了两种创建DStream的方法:通过...
Spark Streaming提供了一种名为DStream(Discretized Stream)的高级抽象,其将实时数据流建模为一系列小批量数据。它在处理时,将这些小批量数据转换为RDD(Resilient Distributed Dataset),这是Spark核心API的一...
在本项目中,"基于SparkStreaming的实时音乐推荐系统源码.zip",主要涉及的是如何利用Apache Spark Streaming这一强大的实时处理框架,构建一个能够实时分析用户行为并进行个性化音乐推荐的系统。Spark Streaming是...
在Spark Streaming中,数据流被划分为一系列的小批量数据,称为Discretized Stream (DStream)。DStream是Spark Streaming的核心抽象,它代表了一种连续不断的数据流。 在本篇文章中,我们将重点讨论两个关键的概念...
Spark Streaming 是一种构建在 Spark 上的实时计算框架,用来处理大规模流式数据。它将从数据源(如 Kafka、Flume、Twitter、ZeroMQ、HDFS 和 TCP 套接字)获得的连续数据流,离散化成一批一批地数据进行处理。每一...
标题中的“Flume push数据到SparkStreaming”是指在大数据处理领域中,使用Apache Flume将实时数据流推送到Apache Spark Streaming进行进一步的实时分析和处理的过程。这两个组件都是Apache Hadoop生态系统的重要...
DStream是Spark Streaming对连续数据流的概念抽象,它是由一系列连续的RDD(弹性分布式数据集)组成的。每个RDD表示在一个时间窗口内的数据块。通过这种方式,Spark Streaming能够利用RDD的并行计算能力来高效处理...
Spark Streaming的核心概念是DStream(Discretized Stream),它代表了一个连续的数据流,可以由输入数据源创建,也可以通过其他DStream的转换得到。DStream由一系列RDD(Resilient Distributed Dataset,弹性分布式...
标题中提到的“Spark Streaming Spark流数据”指的是Apache Spark的一个子项目,即Spark Streaming,用于进行实时数据流处理。Apache Spark是一个强大的分布式数据处理框架,提供了包括批处理、流处理、机器学习、图...
- **定义**:Spark Streaming提供了名为DStream(Discretized Stream)的高级抽象,它代表了一个连续的数据流。 - **创建方式**:DStream可以通过来自Kafka、Flume、Kinesis等数据源的输入流创建,也可以通过对现有...
除了对数据流的处理,Spark Streaming还支持从Spark的批处理结果中获取数据,实现双向的数据流处理。这意味着Spark Streaming不仅仅是单向实时数据流的处理,还可以是双向数据处理,将实时计算和批量计算有机结合...
Spark Streaming是Apache Spark的重要组成部分,它提供了一种高吞吐量、可容错的实时数据处理方式。Spark Streaming的核心是一个执行模型,这个执行模型基于微批处理(micro-batch processing)的概念,允许将实时数据...
- DStreams:Spark Streaming中的核心抽象,代表连续的数据流,由一系列时间窗口内的RDDs组成。 - Windowing:用于定义时间间隔,决定数据如何分组和处理。例如,可以按每分钟、每5分钟的数据进行处理。 - ...
`SparkStreaming.zip` 文件可能包含了一个示例项目,演示了如何创建一个 Spark Streaming 应用来从 Kafka 消费数据。代码通常会包含以下步骤: 1. 创建 SparkConf 对象,配置 Spark 应用的属性,如应用程序名称、...
Spark Streaming则提供了基于微批处理的实时计算框架,能够高效地处理持续的数据流;而HBase作为分布式列式数据库,适合存储海量结构化半结构化数据。这三者之间的集成,使得实时数据流能够被快速处理并持久化存储。...
Spark Streaming将数据流分割成一系列小批量的数据块进行处理,这种机制使得Spark Streaming既能够处理实时数据流,又能利用Spark的核心API进行复杂的数据处理。 #### 2. Kafka简介 Apache Kafka是一种分布式的发布...
- **DStream(Discretized Stream)**:Spark Streaming 的核心抽象,表示连续的数据流,是由一系列时间间隔内的 RDD(Resilient Distributed Datasets)组成的。 - **窗口操作**:Spark Streaming 提供了时间窗口...