以如下代码为例(SocketInputDStream):
Spark Streaming从Socket读取数据的代码是在SocketReceiver的receive方法中,撇开异常情况不谈(Receiver有重连机制,restart方法,默认情况下在Receiver挂了之后,间隔两秒钟重新建立Socket连接),读取到的数据通过调用store(textRead)方法进行存储。数据的流转需要关注如下几个问题:
1. 数据存储到什么位置了
2. 数据存储的结构如何?
3. 数据什么时候被读取
4. 读取到的数据(batch interval)如何转换为RDD
1. SocketReceiver#receive
/** Create a socket connection and receive data until receiver is stopped */ def receive() { var socket: Socket = null try { logInfo("Connecting to " + host + ":" + port) socket = new Socket(host, port) logInfo("Connected to " + host + ":" + port) val iterator = bytesToObjects(socket.getInputStream()) while(!isStopped && iterator.hasNext) { store(iterator.next) } logInfo("Stopped receiving") restart("Retrying connecting to " + host + ":" + port) } catch { case e: java.net.ConnectException => restart("Error connecting to " + host + ":" + port, e) case t: Throwable => restart("Error receiving data", t) } finally { if (socket != null) { socket.close() logInfo("Closed socket to " + host + ":" + port) } } }
2. SocketReceiver#receive=>SocketReceiver#store
/** * Store a single item of received data to Spark's memory. * These single items will be aggregated together into data blocks before * being pushed into Spark's memory. */ def store(dataItem: T) { executor.pushSingle(dataItem) }
数据存储作为Executor功能之一,store方法调用了executor中的pushSingle操作,此时的Single可以理解为一次数据读取,而dataItem就是一次读取的数据对象
3. SocketReceiver#store=>executor.pushSingle(ReceiverSupervisorImpl.pushSingle)
/** Push a single record of received data into block generator. */ def pushSingle(data: Any) { blockGenerator.addData(data) }
数据放入到了blockGenerator数据结构中了,blockGenerator,类型为BlockGenerator,顾名思义是一个block生成器,所谓的block生成器,是指Spark Streaming每隔一段时间(默认200毫秒, private val blockInterval = conf.getLong("spark.streaming.blockInterval", 200))将接收到的数据合并成一个block,然后将这个block写入到BlockManager,继续沿着个思路分析
4. executor.pushSingle=>BlockGenerator.addData
/** * Push a single data item into the buffer. All received data items * will be periodically pushed into BlockManager. */ def addData (data: Any): Unit = synchronized { waitToPush() ///通过阻塞控制Push的速度 currentBuffer += data,将数据追加到currentBuffer中 }
当数据写入到currentBuffer中之后,似乎线索已经断了。事实上是BlockGenerator内部开启的两个线程(BlockIntervalTimer和BlockPushingThread)在背后继续处理currentBuffer
BlockIntervalTimer默认每200毫秒执行一次updateCurrentBufferer,该函数的功能是将类型为ArrayBuffer的currentBuffer合并成一个小的Block
private val blockInterval = conf.getLong("spark.streaming.blockInterval", 200) private val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer, "BlockGenerator")
BlockPushingThread是通过循环调用keepPushingBlocks将BlockIntervalTimer创建的各个Block写入到BlockManager中,
private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
上面说到的两个线程的同步是通过ArrayBlockQueue实现的
private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10) private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
5. BlockGenerator#updateCurrentBuffer
updateCurrentBuffer由BlockIntervalTimer线程执行
/** Change the buffer to which single records are added to. */ private def updateCurrentBuffer(time: Long): Unit = synchronized { try { val newBlockBuffer = currentBuffer currentBuffer = new ArrayBuffer[Any] //这两句对currentBuffere这样的操作,是否有线程安全问题?没有,因为currentBuffer已经标注为@volatile类型的变量 if (newBlockBuffer.size > 0) { val blockId = StreamBlockId(receiverId, time - blockInterval) //构造StreamBlockId val newBlock = new Block(blockId, newBlockBuffer) //创建出一个Block listener.onGenerateBlock(blockId) //通知谁?空实现,listener是作为BlockGenerator的构造函数传入的,这是一个所有通知时间的空实现 blocksForPushing.put(newBlock) //添加到阻塞队列中,等待BlockPushingThread读取 logDebug("Last element in " + blockId + " is " + newBlockBuffer.last) } } catch { case ie: InterruptedException => logInfo("Block updating timer thread was interrupted") case e: Exception => reportError("Error in block updating thread", e) } }
6. BlockGenerator#keepPushingBlocks
keepPushingBlocks由BlockPushingThread执行
/** Keep pushing blocks to the BlockManager. */ private def keepPushingBlocks() { logInfo("Started block pushing thread") try { while(!stopped) { //poll是阻塞队列的非阻塞方法,但是如果队列中没有元素,则等待100ms,poll是取一个元素操作 Option(blocksForPushing.poll(100, TimeUnit.MILLISECONDS)) match { case Some(block) => pushBlock(block) case None => } } // Push out the blocks that are still left logInfo("Pushing out the last " + blocksForPushing.size() + " blocks") while (!blocksForPushing.isEmpty) { logDebug("Getting block ") val block = blocksForPushing.take() pushBlock(block) logInfo("Blocks left to push " + blocksForPushing.size()) } logInfo("Stopped block pushing thread") } catch { case ie: InterruptedException => logInfo("Block pushing thread was interrupted") case e: Exception => reportError("Error in block pushing thread", e) } }
7. BlockGenerator#pushBlock
这个方法是针对一个Block进行push,而不是一次从队列中把所有的Block取出来,一次进行push。
private def pushBlock(block: Block) { listener.onPushBlock(block.id, block.buffer) logInfo("Pushed block " + block.id) }
8. BlockGeneratorListener#onPushBlock
pushBlock是通过Observer模式,通知listener,这个liestener是BlockGenerator的构造函数传入的(其实是作为内部类,在构造时创建的实例)
/** Divides received data records into data blocks for pushing in BlockManager. */ private val blockGenerator = new BlockGenerator(new BlockGeneratorListener { def onAddData(data: Any, metadata: Any): Unit = { } def onGenerateBlock(blockId: StreamBlockId): Unit = { } def onError(message: String, throwable: Throwable) { reportError(message, throwable) } def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) { pushArrayBuffer(arrayBuffer, None, Some(blockId)) } }, streamId, env.conf)
9. BlockGenerator#pushArrayBuffer
/** Store an ArrayBuffer of received data as a data block into Spark's memory. */ def pushArrayBuffer( arrayBuffer: ArrayBuffer[_], metadataOption: Option[Any], blockIdOption: Option[StreamBlockId] ) { pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption) }
10. BlockGenerator#pushAndReportBlock
/** Store block and report it to driver */ def pushAndReportBlock( receivedBlock: ReceivedBlock, metadataOption: Option[Any], blockIdOption: Option[StreamBlockId] ) { val blockId = blockIdOption.getOrElse(nextBlockId) val numRecords = receivedBlock match { case ArrayBufferBlock(arrayBuffer) => arrayBuffer.size case _ => -1 } val time = System.currentTimeMillis val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock) logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms") val blockInfo = ReceivedBlockInfo(streamId, numRecords, blockStoreResult) val future = trackerActor.ask(AddBlock(blockInfo))(askTimeout) Await.result(future, askTimeout) logDebug(s"Reported block $blockId") }
pushAndReportBlock做了两件事,一是Store Block,而是想Tracker汇报有Block加入
10.1 receivedBlockHandler.storeBlock(BlockManagerBasedBlockHandler#storeBlock)
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = { val putResult: Seq[(BlockId, BlockStatus)] = block match { case ArrayBufferBlock(arrayBuffer) => blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, tellMaster = true) case IteratorBlock(iterator) => blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true) case ByteBufferBlock(byteBuffer) => blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true) case o => throw new SparkException( s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}") } if (!putResult.map { _._1 }.contains(blockId)) { throw new SparkException( s"Could not store $blockId to block manager with storage level $storageLevel") } BlockManagerBasedStoreResult(blockId) }
其中,blockManager是BlockManager类型的变量,定义于org.apache.spark.storage包中,实现向BlockManager写入数据,具体调用putIterator,putBytes,这是Spark存储子系统的内容,此处不赘述,重要的是,在此处写入进了BlockManager
10.2 ReceiverTracker#AddBlock
通过下面两个语句,将写入到BlockManager的信息汇报给TrackActor,这是一个进程间的同步调用(ask语法)
val blockInfo = ReceivedBlockInfo(streamId, numRecords, blockStoreResult) val future = trackerActor.ask(AddBlock(blockInfo))(askTimeout) Await.result(future, askTimeout)
trackerActor对应的实体是ReceiverTracker,AddBlock消息将触发ReceiverTracker.addBlock,进而调用ReceivedBlockTracker.addBlock
/** Add new blocks for the given stream */ private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = { receivedBlockTracker.addBlock(receivedBlockInfo) }
11. ReceivedBlockTracker.addBlock
/** Add received block. This event will get written to the write ahead log (if enabled). */ def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized { try { writeToLog(BlockAdditionEvent(receivedBlockInfo))//写WAL getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo //getReceivedBlockQueue从Map<streamId,streamReceivedBlockQueue>中获取相应的streamReceivedBlockQueue logDebug(s"Stream ${receivedBlockInfo.streamId} received " + s"block ${receivedBlockInfo.blockStoreResult.blockId}") true } catch { case e: Exception => logError(s"Error adding block $receivedBlockInfo", e) false } }
相关推荐
Apache Spark Streaming是Apache Spark用于处理实时流数据的一个组件。它允许用户使用Spark的高度抽象概念处理实时数据流,并且可以轻松地与存储解决方案、批处理数据和机器学习算法集成。Spark Streaming提供了一种...
sparkStreaming消费数据不丢失,sparkStreaming消费数据不丢失
《Spark源码分析》这本书是针对那些希望深入了解大数据处理框架Spark以及与其紧密相关的Hadoop技术的专业人士所编写的。Spark作为一个快速、通用且可扩展的数据处理引擎,已经在大数据领域占据了重要地位,而深入...
(1)利用SparkStreaming从文件目录读入日志信息,日志内容包含: ”日志级别、函数名、日志内容“ 三个字段,字段之间以空格拆分。请看数据源的文件。 (2)对读入都日志信息流进行指定筛选出日志级别为error或warn...
在配置Spark Streaming时,需要将Spark版本设置为1.3.0,并且需要配置Spark Streaming的参数,包括batch interval、window duration等。 三、Kafka和Spark Streaming集成 在将Kafka和Spark Streaming集成时,需要...
Spark零基础思维导图(内含spark-core ,spark-streaming,spark-sql),总结的很全面。 Spark零基础思维导图(内含spark-core ,spark-streaming,spark-sql)。 Spark零基础思维导图(内含spark-core ,spark-streaming,...
Spark Streaming 是Apache Spark中的一个重要组件,专门设计用来处理实时数据流的计算框架。作为Spark核心API的一个扩展,它延续了Spark的易用性和高效性,能够将实时数据流处理与批量数据处理无缝集成在一起。利用...
Scala代码积累之spark streaming kafka 数据存入到hive源码实例,Scala代码积累之spark streaming kafka 数据存入到hive源码实例。
8. **Spark Streaming**:Spark Streaming构建在微批处理之上,通过将流数据划分为小批次处理,实现了低延迟的实时流处理。 9. **MLlib与Spark ML**:Spark提供了机器学习库MLlib,以及基于DataFrame的ML,支持各种...
整体而言,该预研报告为技术人员提供了关于Spark Streaming的全面了解,从基础概念到深入案例分析,再到性能调优和与其他技术的对比,是大数据流处理领域的重要参考文献。对于希望利用Spark Streaming进行实时数据...
- **处理单元**:Storm处理的是单个事件,而Spark Streaming处理的是某一时间窗口内的事件流。因此,Storm能够实现几乎即时的处理延迟(亚秒级),而Spark Streaming则通常有几秒钟的延迟。 - **语言支持**:Storm...
在大数据处理领域,Spark作为一款高效、通用的计算框架,被广泛应用在数据分析、机器学习等多个场景。本项目涉及的核心知识点包括Spark Core、Spark SQL和Spark Streaming,同时结合了Scala和Java编程语言,以及...
《Spark高级分析数据源码》是一本专注于Spark高级分析技术的书籍,其核心内容通过源码解析来深入理解Spark在大数据处理中的工作机制。这个压缩包包含的"aas-master"文件夹,很可能是书籍实例代码的仓库,对于学习...
在本知识点中,我们将探讨Spark Streaming中的Direct Approach模式,并结合源码分析,理解如何处理和预防在使用Spark Streaming消费Kafka数据时出现的一些常见问题。 **Spark Streaming Direct Approach核心机制** ...
本文提出了一种基于Spark Streaming技术的实时数据处理系统,旨在解决实时数据处理问题,提高数据处理的实时性。 二、Spark Streaming技术与实时数据处理系统设计 Spark Streaming是Apache Spark用于实时数据流处理...
Spark 源码剖析涉及的内容广泛,包括核心组件、数据处理模型、内存管理、调度系统等多个方面。在这个主题下,我们将深入探讨以下几个关键知识点: 1. **Spark 架构**:Spark 的核心架构基于 Resilient Distributed ...
本压缩包中的 jar 包是为了解决 Flume 与 Spark Streaming 的集成问题,确保数据能够从 Flume 无缝流转到 Spark Streaming 进行实时分析。 Flume 是 Apache Hadoop 生态系统中的一个分布式、可靠且可用的服务,它...
Spark Streaming 是一种构建在 Spark 上的实时计算框架,用来处理大规模流式数据。它将从数据源(如 Kafka、Flume、Twitter、ZeroMQ、HDFS 和 TCP 套接字)获得的连续数据流,离散化成一批一批地数据进行处理。每一...
Spark 学习之路,包含 Spark Core,Spark SQL,Spark Streaming,Spark mllib 学习笔记 * [spark core学习笔记及代码 * [spark sql学习笔记及代码 * [spark streaming学习笔记及代码 Spark 消息通信 ### Spark ...
DStream由一系列RDD(Resilient Distributed Dataset,弹性分布式数据集)组成,每个RDD包含了一个时间间隔内的数据。数据在DStream中的处理方式与Spark中的RDD处理方式相似,可以应用map、reduce、join等操作。 ...