After Spark Streaming has started, I wanted to change the computation rules, but the system reports that this is not supported. The exception is as follows:
Exception in thread "Thread-14" java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported
at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:223)
at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:65)
at org.apache.spark.streaming.dstream.FlatMappedDStream.<init>(FlatMappedDStream.scala:29)
at org.apache.spark.streaming.dstream.DStream$$anonfun$flatMap$1.apply(DStream.scala:554)
My version is Spark 2.0. Our business requirement, however, is to compute reports according to user-defined report rules. After some analysis, the report computation rules can be wrapped into objects that are updated in the driver, so the requirement can be met in a data-driven way.
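A minimal sketch of what "wrapping the computation rules into objects" means here (ReportRule and CurrentRules are hypothetical names used only for illustration; the actual test code later in this post keeps the rules in a plain ArrayBuffer):

// Hypothetical shape of a user-defined report rule held on the driver.
// A driver-side thread refreshes CurrentRules.rules, and the DStream lineage
// is then rebuilt from the latest rules.
case class ReportRule(name: String, splitTags: Seq[String])

object CurrentRules {
  @volatile var rules: Seq[ReportRule] = Seq.empty // refreshed by a driver-side thread
}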
However, once a Spark Streaming context has started, the functions and objects captured in its closures can no longer be modified, so the source code had to be patched by hand. After the modifications, the rebuilt package passed testing.
The modifications are as follows:
**************************************************************************************************************
org.apache.spark.streaming.DStreamGraph

// yunzhi.lyz val -> var
private var outputStreams = new ArrayBuffer[DStream[_]]()

def start(time: Time) {
  this.synchronized {
    // yunzhi.lyz delete
    // require(zeroTime == null, "DStream graph computation already started")
    zeroTime = time
    startTime = time
    outputStreams.foreach(_.initialize(zeroTime))
    outputStreams.foreach(_.remember(rememberDuration))
    outputStreams.foreach(_.validateAtStart)
    inputStreams.par.foreach(_.start())
  }
}

// yunzhi.lyz + def
def clearOutputStreams() = this.synchronized {
  outputStreams = new ArrayBuffer[DStream[_]]()
}
**************************************************************************************************************
org.apache.spark.streaming.StreamingContext
// yunzhi.lyz +def
def getGraph = graph
def getScheduler() = scheduler
**************************************************************************************************************
org.apache.spark.streaming.dstream.DStream

// yunzhi.lyz + def
def getGraph() = graph

private[streaming] def initialize(time: Time) {
  // yunzhi.lyz delete
  // if (zeroTime != null && zeroTime != time) {
  //   throw new SparkException("ZeroTime is already initialized to " + zeroTime
  //     + ", cannot initialize it again to " + time)
  // }
  zeroTime = time
  ......

private def validateAtInit(): Unit = {
  ssc.getState() match {
    case StreamingContextState.INITIALIZED =>
      // good to go
    // yunzhi.lyz --
    case StreamingContextState.ACTIVE =>
      // throw new IllegalStateException(
      //   "Adding new inputs, transformations, and output operations after " +
      //   "starting a context is not supported")
    case StreamingContextState.STOPPED =>
      throw new IllegalStateException(
        "Adding new inputs, transformations, and output operations after " +
        "stopping a context is not supported")
  }
}
/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    // yunzhi.lyz --
    // if (isTimeValid(time)) {
    val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
      // Disable checks for existing output directories in jobs launched by the streaming
      // scheduler, since we may need to write output to an existing directory during checkpoint
      // recovery; see SPARK-4835 for more details. We need to have this call here because
      // compute() might cause Spark jobs to be launched.
      PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
        compute(time)
      }
    }

    rddOption.foreach { case newRDD =>
      // Register the generated RDD for caching and checkpointing
      if (storageLevel != StorageLevel.NONE) {
        newRDD.persist(storageLevel)
        logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
      }
      if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
        newRDD.checkpoint()
        logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
      }
      generatedRDDs.put(time, newRDD)
    }

    rddOption
    // yunzhi.lyz --
    // } else {
    //   None
    // }
  }
}
private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) => {
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    }
    // yunzhi.lyz --
    // case None => None
  }
}
private[streaming] def clearMetadata(time: Time) {
  val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true)
  // yunzhi.lyz change
  val oldRDDs = generatedRDDs
    .filter(_._1 != null)
    .filter(_._2 != null)
    .filter(_._1 <= (time - rememberDuration))
  // val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
  logDebug("Clearing references to old RDDs: [
**************************************************************************************************************
org.apache.spark.streaming.receiver.ReceiverSupervisorImpl

/** Report error to the receiver tracker */
def reportError(message: String, error: Throwable) {
  // yunzhi.lyz --
  // val errorString = Option(error).map(Throwables.getStackTraceAsString).getOrElse("")
  // trackerEndpoint.send(ReportError(streamId, message, errorString))
  logWarning("Reported error " + message + " - " + error)
}

override protected def onReceiverStop(message: String, error: Option[Throwable]) {
  logInfo("Deregistering receiver " + streamId)
  // yunzhi.lyz --
  // val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("")
  trackerEndpoint.askWithRetry[Boolean](DeregisterReceiver(streamId, message, ""))
  logInfo("Stopped receiver " + streamId)
}
**************************************************************************************************************
org.apache.spark.streaming.scheduler.JobGenerator

// yunzhi.lyz ++
private var nextTime = -1L
def getTimer() = timer

// yunzhi.lyz -- private
def restart() {
  // If manual clock is being used for testing, then
  // either set the manual clock to the last checkpointed time,
  // or if the property is defined set it to that time
  if (clock.isInstanceOf[ManualClock]) {
    val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
    val jumpTime = ssc.sc.conf.getLong("spark.streaming.manualClock.jump", 0)
    clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
  }
**************************************************************************************************************
org.apache.spark.streaming.scheduler.JobScheduler
// yunzhi.lyz +
def getjobGenerator() = jobGenerator
**************************************************************************************************************
org.apache.spark.streaming.util.RecurringTimer

// yunzhi.lyz change val -> var
private var thread = new Thread("RecurringTimer - " + name) {
  setDaemon(true)
  override def run() { loop }
}

// yunzhi.lyz ++ def
def getReStartTime(t: Int): Long = {
  (math.floor(clock.getTimeMillis().toDouble / period) + 1).toLong * period
}

def getNextTime(): Long = { nextTime }

// yunzhi.lyz ++ def
def resume(startTime: Long): Long = synchronized {
  nextTime = startTime
  thread.resume()
  logInfo("ReStarted timer for " + name + " at time " + nextTime)
  nextTime
}

def stop(interruptTimer: Boolean): Long = synchronized {
  if (!stopped) {
    stopped = true
    if (interruptTimer) {
      thread.interrupt()
    }
    // yunzhi.lyz
    // thread.join()
    thread.suspend()
    logInfo("Stopped timer for " + name + " after time " + prevTime)
  }
  // prevTime
  nextTime
}
After the modifications, recompile and repackage.
Test code (the driver-side function that swaps the computation rules at runtime):
def exchangeworkbatch(tp: String, st: String) {
  if (tp.equals("+")) workbatch += (st)
  if (tp.equals("-")) workbatch -= (st) // "-" removes a rule
  cnt = cnt + 1
  val cntn = cnt
  val wbArray = workbatch.toArray
  workbatch.foreach(x => println("*************" + x))

  TTObject.ssc.getGraph.clearOutputStreams

  wordCounts5 = TTObject.lines.flatMap(x => flatmapchange(wbArray, x))
    .map(x => ("wordCounts4** " + cntn + " **" + x, 1))
    .reduceByKey(_ + _)
  wordCounts5.print

  wordCounts3 = TTObject.lines.flatMap(x => flatmapchange(wbArray, x))
    .map(x => ("wordCounts3** " + cntn + " **" + x, 1))
    .reduceByKey(_ + _)
  wordCounts3.print

  TTObject.ssc.getGraph.start(new Time(TTObject.ssc.getScheduler.getjobGenerator.getTimer.getNextTime))
}
Complete test code:
package scalasrc
import org.apache.spark.streaming.ttstream.TTUtils
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
import scala.collection._

object TTWordCountFreshRule {
  val splitTag = new mutable.ArrayBuffer[String]

  def main(args: Array[String]) {
    val tagName = ""
    val subId = ""
    val accessKey = ""
    val sparkConf = new SparkConf().setAppName("TTWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    // ssc.checkpoint("checkpoint")
    val numStreams = 3
    var offSet = 1478448000l
    if (offSet == 0) offSet = System.currentTimeMillis() / 1000
    val ttStreams = (1 to numStreams).map { i =>
      TTUtils.createStream(ssc, tagName, subId, accessKey, offSet, i - 1).map(_._2)
    }
    val lines = ssc.union(ttStreams).filter(x => x != null)

    splitTag += "dd"
    val splitTagarray = splitTag.toArray
    val wordCounts5 = lines
      .flatMap(x => splitString(x, splitTagarray))
      .map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts5.count().print
    wordCounts5.print

    runJObThread
    ssc.start()
    ssc.awaitTermination()

    def runJObThread() {
      new Thread() {
        override def run {
          while (true) {
            try {
              println("runJObThread begin.....")
              Thread.sleep(20000)
              splitTag += "cc" // re-submit with the extra rule
              var cnt = 0
              splitTag.foreach(x => { println(s" cnt: $cnt ,value : $x "); cnt += 1 })

              ssc.getGraph.clearOutputStreams
              val splitTagarray = splitTag.toArray
              val wordCounts5 = lines.flatMap(x => splitString(x, splitTagarray)).map(x => (x, 1)).reduceByKey(_ + _)
              wordCounts5.count().print
              wordCounts5.print
              ssc.getGraph.start(new Time(ssc.getScheduler.getjobGenerator.getTimer.getNextTime))
            } catch {
              case ex: Exception => println(ex.getMessage)
            }
          }
        }
      }.start
    }
  }

  def splitString(st: String, splitTagarray: Array[String]) = {
    var splitArray = new mutable.ArrayBuffer[String]
    splitTagarray.foreach(x => splitArray ++= st.split(x))
    splitArray.toArray
  }
}
With this in place, customized reports no longer require stopping the streaming job: a driver-side thread simply reads the rules from storage and, driven by that data, the computation rules can be updated dynamically.
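A minimal sketch of such a driver-side refresh thread, reusing ssc, lines, and splitString from the test code above; RuleStore.loadTags() is a made-up placeholder for whatever storage (a database, ZooKeeper, an HDFS file, ...) actually holds the user-defined rules:

// Sketch only: swap the output DStreams whenever the stored rules change.
new Thread() {
  override def run() {
    while (true) {
      try {
        Thread.sleep(20000)
        val tags: Array[String] = RuleStore.loadTags() // assumed API that reads the latest rules
        ssc.getGraph.clearOutputStreams                // drop the old output DStreams (patched API)
        val wordCounts = lines.flatMap(x => splitString(x, tags))
          .map(x => (x, 1))
          .reduceByKey(_ + _)
        wordCounts.print()
        // restart the graph at the timer's next tick, as in the test code above
        ssc.getGraph.start(new Time(ssc.getScheduler.getjobGenerator.getTimer.getNextTime))
      } catch {
        case ex: Exception => println(ex.getMessage)
      }
    }
  }
}.start()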
Problem: with these changes, checkpointing and stateful computations (e.g. updateStateByKey) run into trouble, presumably because clearing and rebuilding the output DStreams breaks the lineage that checkpoint recovery and state continuity depend on.
************************************************************************************
Thinking about it now, there was no need for such painful changes: just reference an object inside the worker code, start a thread in that object, and have the thread dynamically refresh the configuration from the database. The only issue this may cause is that the workers fetch the metadata at slightly different times, so accuracy can suffer; but a newly created report goes through a debugging period anyway, so this does not hurt the business.
Although Spark's map and reduce operators have no setup hook, code running on a worker can call static methods of a Scala object, and that object can start a thread. The functions used in map, reduce, and other operators can be functions defined on an object; the first time one of the object's functions is called on a worker, all statements in the object body outside its methods are executed.
package streaming

import scala.collection._
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext, _}

object workerObjectGetMeta {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    val lines = ssc.socketTextStream("192.168.0.140", 9999, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(x => CalUtils.splitS(x))
    val wordCounts = words.map(x => (x, 1))
    wordCounts.reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()
  }
}

object CalUtils {
  val splitArray = new mutable.ArrayBuffer[String]
  splitArray += "aa"

  // The new Thread() {}.start() statement sits in the object body, so the thread
  // starts automatically the first time the object is referenced on a worker.
  new Thread() {
    override def run {
      while (true) {
        try {
          println("runJObThread begin.....")
          Thread.sleep(5000)
          splitArray += "cc" // re-submit with the extra rule
          var cnt = 0
          splitArray.foreach(x => { println(s" cnt: $cnt ,value : $x "); cnt += 1 })
        } catch {
          case ex: Exception => println(ex.getMessage)
        }
      }
    }
  }.start()

  def splitS(s: String): Array[String] = {
    val rArray = new mutable.ArrayBuffer[String]
    splitArray.foreach(x => { rArray ++= s.split(x) })
    println(s"splitArray: ${splitArray.mkString(",")} ")
    println(s"aArray: ${rArray.mkString(",")} ")
    rArray.toArray
  }
}