`

spark streaming 如何在 start 之后,修改闭包对象 。计算更新

阅读更多

      spark streaming 在 start 之后 ,我想改变计算规则,系统报告不能修改. 异常如下

 

Exception in thread "Thread-14" java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported

at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:223)

at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:65)

at org.apache.spark.streaming.dstream.FlatMappedDStream.<init>(FlatMappedDStream.scala:29)

 

at org.apache.spark.streaming.dstream.DStream$$anonfun$flatMap$1.apply(DStream.scala:554)

 

 

 

 

我的版本是  2.0. 但是我们的业务需求,就是要根据  用户定义的报表规则来统计报表。 经过分析, 报表的计算规则 ,可以包装成 对象, 来在 driver 中更新,用数据驱动的方式,来完成这一需求。 

       但是  spark streaming 一段  start 之后,闭包里面的 函数  和对象都是不能再修改的。 所有需要自己动手修源码。 修改后  打包 测试通过。

 

     修改如下:

spark stream  change list

 

**************************************************************************************************************

org.apache.spark.streaming.DStreamGraph

 

 

 //yunzhi.lyz  val -> var

  private var outputStreams = new ArrayBuffer[DStream[_]]()

  

  

  

  

  def start(time: Time) {

    this.synchronized {

      // yunzhi.lyz  delete

      //require(zeroTime == null, "DStream graph computation already started")

      zeroTime = time

      startTime = time

      outputStreams.foreach(_.initialize(zeroTime))

      outputStreams.foreach(_.remember(rememberDuration))

      outputStreams.foreach(_.validateAtStart)

      inputStreams.par.foreach(_.start())

    }

  }

  

  

  

  

  

    // yunzhi.lyz + def

  def clearOutputStreams()= this.synchronized{

    

     outputStreams = new ArrayBuffer[DStream[_]]()

  }

  

  

  

  **************************************************************************************************************

  org.apache.spark.streaming.StreamingContext

  

  

  

    // yunzhi.lyz +def 

  def getGraph = graph

  def getScheduler() = scheduler

  

  

  **************************************************************************************************************

  org.apache.spark.streaming.dstream.DStream

  

    // yunzhi.lyz + def 

  def getGraph() = graph

  

  

  

    private[streaming] def initialize(time: Time) {

   // yunzhi.lyz delete 

 //    if (zeroTime != null && zeroTime != time) {

//      throw new SparkException("ZeroTime is already initialized to " + zeroTime

//        + ", cannot initialize it again to " + time)

//    }

    zeroTime = time

    ......

    

    

    

    

    private def validateAtInit(): Unit = {

    

    

    

    ssc.getState() match {

      case StreamingContextState.INITIALIZED =>

        // good to go

          // yunzhil.lyz --

      case StreamingContextState.ACTIVE =>

//        throw new IllegalStateException(

//          "Adding new inputs, transformations, and output operations after " +

//            "starting a context is not supported")

      case StreamingContextState.STOPPED =>

        throw new IllegalStateException(

          "Adding new inputs, transformations, and output operations after " +

            "stopping a context is not supported")

    }

    

    

  }

  

  

  

    /**

   * Get the RDD corresponding to the given time; either retrieve it from cache

   * or compute-and-cache it.

   */

  private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {

    // If RDD was already generated, then retrieve it from HashMap,

    // or else compute the RDD

    generatedRDDs.get(time).orElse {

      // Compute the RDD if time is valid (e.g. correct time in a sliding window)

      // of RDD generation, else generate nothing.

      

      // yunzhi.lyz --

  //    if (isTimeValid(time)) {

 

        val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {

          // Disable checks for existing output directories in jobs launched by the streaming

          // scheduler, since we may need to write output to an existing directory during checkpoint

          // recovery; see SPARK-4835 for more details. We need to have this call here because

          // compute() might cause Spark jobs to be launched.

          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {

            compute(time)

          }

        }

 

        rddOption.foreach { case newRDD =>

          // Register the generated RDD for caching and checkpointing

          if (storageLevel != StorageLevel.NONE) {

            newRDD.persist(storageLevel)

            logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")

          }

          if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {

            newRDD.checkpoint()

            logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")

          }

          generatedRDDs.put(time, newRDD)

        }

        rddOption

        // yunzhi.lyz --

//      } else {

//        None

//      }

    }

  }

  

  

  

  

    private[streaming] def generateJob(time: Time): Option[Job] = {

    getOrCompute(time) match {

      case Some(rdd) => {

        val jobFunc = () => {

          val emptyFunc = { (iterator: Iterator[T]) => {} }

          context.sparkContext.runJob(rdd, emptyFunc)

        }

        Some(new Job(time, jobFunc))

      }

      // yunzhi.lyz --

     // case None => None

    }

  }

  

 



private[streaming] def clearMetadata(time: Time) {

    val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true)

    // yunzhi.lyz change 

    val oldRDDs = generatedRDDs

    .filter(_._1 != null)

    .filter(_._2 != null)

    .filter(_._1 <= (time - rememberDuration))

    //val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))

    

    logDebug("Clearing references to old RDDs: [


 

   **************************************************************************************************************

 org.apache.spark.streaming.receiver.ReceiverSupervisorImpl

 

   /** Report error to the receiver tracker */

  def reportError(message: String, error: Throwable) {

  // yunzhi.lyz --

   //  val errorString = Option(error).map(Throwables.getStackTraceAsString).getOrElse("")

  //  trackerEndpoint.send(ReportError(streamId, message, errorString))

    logWarning("Reported error " + message + " - " + error)

  }

  

  

  

  

    override protected def onReceiverStop(message: String, error: Option[Throwable]) {

    logInfo("Deregistering receiver " + streamId)

    // yunzhi.lyz --

     //val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("")

    trackerEndpoint.askWithRetry[Boolean](DeregisterReceiver(streamId, message, ""))

    logInfo("Stopped receiver " + streamId)

  }

  

   **************************************************************************************************************

 

 org.apache.spark.streaming.scheduler.JobGenerator

 

   // yunzhi.lyz ++

  private var nextTime = -1L

  def getTimer() = timer

  

  

  

  

  

  

    // yunzhi.lyz  -- private

  def restart() {

    // If manual clock is being used for testing, then

    // either set the manual clock to the last checkpointed time,

    // or if the property is defined set it to that time

    if (clock.isInstanceOf[ManualClock]) {

      val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds

      val jumpTime = ssc.sc.conf.getLong("spark.streaming.manualClock.jump", 0)

      clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)

    }

    

**************************************************************************************************************

     

     org.apache.spark.streaming.scheduler.JobScheduler

      // yunzhi.lyz +

  def getjobGenerator() = jobGenerator

  

  

  

**************************************************************************************************************

  org.apache.spark.streaming.util.RecurringTimer

    

    

 //yunzhi.lyz change  val -> var

  private var thread = new Thread("RecurringTimer - " + name) {

    setDaemon(true)

    override def run() { loop }

  }

 

 

 

 

   

 // yunzhi.lyz  ++ def 

  

  def getReStartTime(t:Int): Long = {

    (math.floor(clock.getTimeMillis().toDouble / period) + 1).toLong * period

  }

 

  def getNextTime() :Long= {nextTime}

  

  

  

  

  

  

  // yunzhi.lyz ++ def

  def resume(startTime:Long):Long = synchronized{

    

    nextTime = startTime

    thread.resume()

   logInfo("ReStarted timer for " + name + " at time " + nextTime)

    nextTime

   

    

  }

  

  

  

    def stop(interruptTimer: Boolean): Long = synchronized {

    if (!stopped) {

      stopped = true

      if (interruptTimer) {

        thread.interrupt()

      }

      // yunzhi.lyz

      //thread.join()

      thread.suspend()

      logInfo("Stopped timer for " + name + " after time " + prevTime)

    }

    //prevTime

    nextTime

  }

  

 

 

 

 

    修改之后 ,重新编译 打包。

 

    测试代码

       

 

 def exchangeworkbatch(tp: String, st: String) {

  if (tp.equals("+")) workbatch += (st)
  if (tp.equals("-")) workbatch += (st)
}

cnt = cnt + 1
val cntn = cnt
val wbArray = workbatch.toArray
workbatch.foreach(x => println("*************" + x))



TTObject.ssc.getGraph.clearOutputStreams

wordCounts5 = TTObject.lines.flatMap(x => flatmapchange(wbArray, x))
  .map(x => ("wordCounts4** " + cntn + " **" + x, 1))
  .reduceByKey(_ + _)
wordCounts5.print


wordCounts3 = TTObject.lines.flatMap(x => flatmapchange(wbArray, x))
  .map(x => ("wordCounts3** " + cntn + " **" + x, 1))
  .reduceByKey(_ + _)
wordCounts3.print
TTObject.ssc.getGraph.start(new Time(TTObject.ssc.getScheduler.getjobGenerator.getTimer.getNextTime))

  

 

 

 

测试代码:

 

 package scalasrc

import org.apache.spark.streaming.ttstream.TTUtils
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
import scala.collection._

object TTWordCountFreshRule {
  val splitTag = new mutable.ArrayBuffer[String]
  def main(args: Array[String]) {
    val tagName = ""
    val subId = ""
    val accessKey = ""
    val sparkConf = new SparkConf().setAppName("TTWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    //ssc.checkpoint("checkpoint")
    val numStreams = 3
    var offSet = 1478448000l
    if (offSet == 0) offSet = System.currentTimeMillis() / 1000
    val ttStreams = (1 to numStreams).map { i => TTUtils.createStream(ssc, tagName, subId, accessKey, offSet, i - 1).map(_._2) }
    val lines = ssc.union(ttStreams).filter(x => x != null)
    splitTag += "dd"
    val splitTagarray = splitTag.toArray
    val wordCounts5 = lines
      .flatMap(x => splitString(x, splitTagarray))
      .map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts5.count().print
    wordCounts5.print
    runJObThread
    ssc.start()
    ssc.awaitTermination()

    def runJObThread() {
      new Thread() {
        override def run {
          while (true) {
            try {
              println("runJObThread begin.....")
              Thread.sleep(20000)
              splitTag += "cc"
              // 重复提交
              var cnt = 0
              splitTag.foreach(x => {
                println(s" cnt: $cnt  ,value : $x ");
                cnt += 1
              })
              ssc.getGraph.clearOutputStreams
              val splitTagarray = splitTag.toArray
              val wordCounts5 = lines.flatMap(x => splitString(x, splitTagarray)).map(x => (x, 1)).reduceByKey(_ + _)
              wordCounts5.count().print
              wordCounts5.print
              ssc.getGraph.start(new Time(ssc.getScheduler.getjobGenerator.getTimer.getNextTime))
            } catch {
              case ex: Exception => println(ex.getMessage)
            }
          }
        }
      }.start
    }
  }


  def splitString(st: String, splitTagarray: Array[String]) = {
    var splitArray = new mutable.ArrayBuffer[String]
    splitTagarray.foreach(x => splitArray ++= st.split(x))
    splitArray.toArray
  }

}


 

如果有定制化报表,不停止 streaming 作业, 只需要在  Thread 里面 去 读存储,数据驱动,就可以完成计算规则动态化的需求。

 

问题,发现这么改 checkpoint ,有状态的计算 有问题。 

************************************************************************************

现在想想,不用改的那么蛋疼,在 worker 里面 引用个 object , 里面起个线程,去动态在库中刷配置就好了。这个唯一可能导致的问题, 各个 worker 获取 元数据时间不一致, 导致 准确性存在问题。 不过如果是 刚建立的报表  也有一段调试期的,不影响业务。

 

虽然 spark 的 map ,reduce 的算子里面 没有 setup ,但是 work 上可以 调  object 静态方法,  object 可以起进程。 map ,reduce 等算子中的函数,可以是object 中的函数, 在work 上 第一次 调用 object 的函数时候。会执行 除方法以外的所有语句。   

 

package streaming

import scala.collection._
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext, _}


object workerObjectGetMeta {
  def main(args: Array[String])
  {
    val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount")
    val ssc  = new StreamingContext(sparkConf, Seconds(10))
    val lines = ssc.socketTextStream("192.168.0.140", 9999, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(x => CalUtils.splitS(x))
    val wordCounts = words.map(x => (x, 1))
    wordCounts.reduceByKey(_+_).print()
    ssc.start()
    ssc.awaitTermination()
  }
}

object CalUtils{
  val splitArray = new mutable.ArrayBuffer[String]
  splitArray += "aa"
// new Thread() {}.start()  作为语句, 线程 在object 第一次被调用的时候自动起来
new Thread() {
    override def run {
      while (true) {
        try {
          println("runJObThread begin.....")
          Thread.sleep(5000)
          splitArray += "cc"
// 重复提交
var cnt = 0
splitArray.foreach(x => {
            println(s" cnt: $cnt  ,value : $x ");
cnt += 1
})
        }
      }
    }
  }.start()

  def splitS(s:String):Array[String] = {
    val rArray = new mutable.ArrayBuffer[String]
    splitArray.foreach( x => {
      rArray ++= s.split(x)
    })
    println(s"splitArray: ${splitArray.mkString(",")}  ")
    println(s"aArray: ${rArray.mkString(",")}  ")
    rArray.toArray
  }
}

 

 

分享到:
评论

相关推荐

    基于Spark Streaming的大数据实时流计算平台和框架,并且是基于运行在yarn模式运行的spark streaming

    一个完善的Spark Streaming二次封装开源框架,包含:实时流任务调度、...基于Spark Streaming的大数据实时流计算平台和框架(包括:调度平台,开发框架,开发demo),并且是基于运行在yarn模式运行的spark streaming

    spark Streaming和structed streaming分析

    Apache Spark Streaming是Apache Spark用于处理实时流数据的一个组件。它允许用户使用Spark的高度抽象概念处理实时数据流,并且可以轻松地与存储解决方案、批处理数据和机器学习算法集成。Spark Streaming提供了一种...

    SparkStreaming预研报告

    Spark Streaming预研报告覆盖了Apache Spark Streaming的主要方面,包括其简介、架构、编程模型以及性能调优。以下是基于文档提供内容的详细知识点: 1. Spark Streaming简介与渊源 Spark Streaming是Spark生态中...

    kafka+spark streaming开发文档

    在将Kafka和Spark Streaming集成时,需要将Kafka的主题创建在Spark Streaming中,并且需要配置Spark Streaming的输入流,以便从Kafka中消费数据。 四、主题创建和消息发送 在Kafka中,需要创建主题,并且需要使用...

    Spark Streaming 示例

    通过结合 Spark Streaming 和 Kafka,你可以构建一个强大的实时数据处理系统,从 Kafka 消费数据,进行复杂的计算,并将结果输出到其他系统。这个示例将帮助你理解这两个工具如何协同工作,为实时数据分析和流处理...

    Flume对接Spark Streaming的相关jar包

    在大数据处理领域,Flume 和 Spark Streaming 是两个重要的工具,它们分别用于数据收集与实时流处理。本压缩包中的 jar 包是为了解决 Flume 与 Spark Streaming 的集成问题,确保数据能够从 Flume 无缝流转到 Spark ...

    sparkStreaming消费数据不丢失

    sparkStreaming消费数据不丢失,sparkStreaming消费数据不丢失

    SparkStreaming入门案例

    Spark Streaming 是一种构建在 Spark 上的实时计算框架,用来处理大规模流式数据。它将从数据源(如 Kafka、Flume、Twitter、ZeroMQ、HDFS 和 TCP 套接字)获得的连续数据流,离散化成一批一批地数据进行处理。每一...

    spark之sparkStreaming 理解

    - **血统追踪**:记录RDD的更新序列,使得Spark能够在发生故障时重新计算丢失的数据。 综上所述,Spark Streaming不仅具备强大的实时数据处理能力,还能够通过与其他Spark模块的集成,提供更为丰富的数据处理功能。...

    深入理解SparkStreaming执行模型

    Spark Streaming是Apache Spark的重要组成部分,它提供了一种高吞吐量、可容错的实时数据处理方式。Spark Streaming的核心是一个执行模型,这个执行模型基于微批处理(micro-batch processing)的概念,允许将实时数据...

    spark streaming

    Spark Streaming 是Apache Spark中的一个重要组件,专门设计用来处理实时数据流的计算框架。作为Spark核心API的一个扩展,它延续了Spark的易用性和高效性,能够将实时数据流处理与批量数据处理无缝集成在一起。利用...

    spark Streaming和storm的对比

    Spark Streaming利用了Spark强大的批处理能力,并将流式计算转换成一系列的批处理作业。这些批处理作业是连续进行的,从而实现了近似实时的流处理效果。Spark Streaming的核心概念是DStream(Discretized Stream),...

    SparkStreaming流式日志过滤与分析

    (1)利用SparkStreaming从文件目录读入日志信息,日志内容包含: ”日志级别、函数名、日志内容“ 三个字段,字段之间以空格拆分。请看数据源的文件。 (2)对读入都日志信息流进行指定筛选出日志级别为error或warn...

    基于 Flume+ Kafka+ Spark Streaming 实现实时监控输出日志的报警系统的 Spark Streaming 程序代码

    基于 Flume+ Kafka+ Spark Streaming 实现实时监控输出日志的报警系统的 Spark Streaming 程序代码。 基于 Flume+Kafka+Spark Streaming 实现实时监控输出日志的报警系统的 Spark Streaming 程序代码,博客链接: ...

    Hadoop原理与技术Spark Streaming操作实验

    1.理解Spark Streaming的工作流程。 2.理解Spark Streaming的工作原理。 3.学会使用Spark Streaming处理流式数据。 二、实验环境 Windows 10 VMware Workstation Pro虚拟机 Hadoop环境 Jdk1.8 三、实验内容 (一)...

    Spark Streaming Real-time big-data processing

    Spark Streaming能够以微批处理的方式处理数据流,将实时数据流分解成小的时间窗口(称为DStream,Discretized Stream),然后应用Spark的计算模型进行处理。 **DStream:离散化数据流** DStream是Spark Streaming...

    SparkStreaming和kafka的整合.pdf

    根据提供的文件信息,本文将详细解析“Spark Streaming与Kafka的整合”这一主题,并结合代码片段探讨其在实际场景中的应用。尽管标签中提到“数学建模”,但从标题和描述来看,这部分内容与数学建模无关,因此我们将...

    spark streaming相关15篇论文,包含几篇硕士论文,包含几篇期刊论,有的结合自然语言处理

    本资源集合包含了15篇与Spark Streaming相关的学术论文,其中涵盖了几篇硕士论文和期刊文章,这些论文深入探讨了Spark Streaming在实际应用中的各种场景和挑战。 首先,Spark Streaming 提供了一个可扩展且容错的...

    Spark Streaming实时流处理项目实战.rar.rar

    在Spark Streaming中,数据流被划分为一系列的小批量(称为DStreams,即Discretized Streams),这些小批量数据在Spark的弹性分布式数据集(RDDs)上进行处理。RDDs是Spark的基础抽象,它们是不可变的、分区的数据...

    Spark Streaming

    Spark Streaming是Apache Spark的一个扩展,用于处理实时数据流。它允许用户以分布式方式处理实时数据流,并将其与批处理和交互式查询相结合。Spark Streaming支持从多种数据源接收数据流,如Kafka、Flume、Kinesis...

Global site tag (gtag.js) - Google Analytics