`
侯上校
  • 浏览: 223489 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

SparkStreaming定时器生成Job

 
阅读更多
/**
 * An interface to represent clocks, so that they can be mocked out in unit tests.
 */
private[spark] trait Clock {
  def getTimeMillis(): Long
  def waitTillTime(targetTime: Long): Long
}

/**
 * A clock backed by the actual time from the OS as reported by the `System` API.
 */
private[spark] class SystemClock extends Clock {

  val minPollTime = 25L

  /**
   * @return the same time (milliseconds since the epoch)
   *         as is reported by `System.currentTimeMillis()`
   */
  def getTimeMillis(): Long = System.currentTimeMillis()

  /**
   * @param targetTime block until the current time is at least this value
   * @return current system time when wait has completed
   */
  def waitTillTime(targetTime: Long): Long = {
    var currentTime = 0L
    currentTime = System.currentTimeMillis()

    var waitTime = targetTime - currentTime
    if (waitTime <= 0) {
      return currentTime
    }

    val pollTime = math.max(waitTime / 10.0, minPollTime).toLong

    while (true) {
      currentTime = System.currentTimeMillis()
      waitTime = targetTime - currentTime
      if (waitTime <= 0) {
        return currentTime
      }
      val sleepTime = math.min(waitTime, pollTime)
      Thread.sleep(sleepTime)
    }
    -1
  }
}


private[streaming]
class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
  extends Logging {

  private val thread = new Thread("RecurringTimer - " + name) {
//    setDaemon(true)
    override def run() { loop }
  }

  @volatile private var prevTime = -1L
  @volatile private var nextTime = -1L
  @volatile private var stopped = false

  /**
   * Get the time when this timer will fire if it is started right now.
   * The time will be a multiple of this timer's period and more than
   * current system time.
   */
  def getStartTime(): Long = {
    (math.floor(clock.getTimeMillis().toDouble / period) + 1).toLong * period
  }

  /**
   * Get the time when the timer will fire if it is restarted right now.
   * This time depends on when the timer was started the first time, and was stopped
   * for whatever reason. The time must be a multiple of this timer's period and
   * more than current time.
   */
  def getRestartTime(originalStartTime: Long): Long = {
    val gap = clock.getTimeMillis() - originalStartTime
    (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
  }

  /**
   * Start at the given start time.
   */
  def start(startTime: Long): Long = synchronized {
    nextTime = startTime
    thread.start()
    logInfo("Started timer for " + name + " at time " + nextTime)
    nextTime
  }

  /**
   * Start at the earliest time it can start based on the period.
   */
  def start(): Long = {
    start(getStartTime())
  }

  /**
   * Stop the timer, and return the last time the callback was made.
   *
   * @param interruptTimer True will interrupt the callback if it is in progress (not guaranteed to
   *                       give correct time in this case). False guarantees that there will be at
   *                       least one callback after `stop` has been called.
   */
  def stop(interruptTimer: Boolean): Long = synchronized {
    if (!stopped) {
      stopped = true
      if (interruptTimer) {
        thread.interrupt()
      }
      thread.join()
      logInfo("Stopped timer for " + name + " after time " + prevTime)
    }
    prevTime
  }

  private def triggerActionForNextInterval(): Unit = {
    clock.waitTillTime(nextTime)
    callback(nextTime)
    prevTime = nextTime
    nextTime += period
    logInfo("Callback for " + name + " called at time " + prevTime)
  }

  /**
   * Repeatedly call the callback every interval.
   */
  private def loop() {
    try {
      while (!stopped) {
        triggerActionForNextInterval()
      }
      triggerActionForNextInterval()
    } catch {
      case e: InterruptedException =>
    }
  }
}

private[streaming]
object RecurringTimer extends Logging {

  def main(args: Array[String]) {
    var lastRecurTime = 0L
    val period = 2000

    def onRecur(time: Long) {
      val currentTime = System.currentTimeMillis()
      logInfo("" + currentTime + ": " + (currentTime - lastRecurTime))
      lastRecurTime = currentTime
    }
    val timer = new  RecurringTimer(new SystemClock(), period, onRecur, "Test")
    timer.start()
//    Thread.sleep(30 * 1000)
//    timer.stop(true)
  }
}

 

分享到:
评论

相关推荐

    Spark Streaming 与 Kafka 集成原理.pdf

    Spark Streaming 与 Kafka 集成原理 Spark Streaming 与 Kafka 集成原理是指 Spark Streaming 框架与 Kafka 消息队列系统的集成,旨在实现高效、实时的数据处理和分析。该集成原理主要包括两种方案:Receiver-based...

    单片机定时器代码生成器

    这个东东能很方便的生成单片机定时器模块的代码,省去了大量的计算,很方便

    job定时器操作存储过程

    在IT领域,特别是数据库管理与自动化任务调度方面,Oracle的JOB定时器机制提供了一种高效且灵活的方式来执行定期任务,如备份、数据清理或报告生成等。本文将深入解析如何利用Oracle JOB定时器来操作存储过程,包括...

    spwm.rar_SPWM 产生_SPWM产生_spwm 定时器_单片机 spwm_定时器生成SPWM

    通常,我们会选择两个定时器:一个作为主定时器,负责生成基本的PWM时钟;另一个作为从定时器,根据正弦表或查表法计算出当前脉冲的宽度,并触发中断来改变输出状态。 #### 主定时器 主定时器设置为连续计数模式,...

    Generating PWM and PFM by Using FlexIO_通过IO口和定时器生成任意个数的PWM_

    本教程将详细讲解如何使用FlexIO(Flexible Input/Output)接口和定时器在单片机平台上生成任意个数的PWM和PFM信号。 1. PWM与PFM概述: - PWM是一种通过调整脉冲宽度来改变信号平均电压的技术。其工作原理是通过...

    c51定时器C程序自动生成软件

    C51定时器C程序自动生成软件是一款专为C51编程设计的工具,它能够帮助程序员快速生成针对8051系列微控制器的定时器0和定时器1操作的C语言代码,同时包含了中断处理的相关功能。8051微控制器是广泛应用于嵌入式系统的...

    555定时器生成丢失脉冲探测、压控振荡、PWM等波形的电路自动生成软件.zip

    本文档的主要内容详细介绍的是555定时器电路设计软件V1.2免费下载。 555定时器电路图设计软件,一个小巧的电路设计工具,它列出了555电路可实现的十几种应用电路单元,如丢失脉冲探测器、长延时定时器、压控振荡...

    SparkStreaming_HBase:将从Kafka收集过来的数据保存到HBase中

    SparkStreaming_HBase将从Kafka收集过来的数据保存到HBase中数据来源:日志生成器。 编写一个python工程,用于产生行为日志,每运行一次,产生所设定的数量数据,使用Linux的定时器,每隔60s执行一次,行为日志保存...

    Cube生成定时器2触发双ADC同步采集并用DMA传输

    用cube生成一个用定时器触发ADC1,ADC2同步采集的程序,单片机选择的是STM32L476RGT6,用定时器2进行ADC采集触发,更改定时器2的定时周期便可以更改ADC的采样周期,ADC1和ADC2使用同步规则模式,并用DMA进行数据的...

    Quartz定时器,表达式自动生成工具

    总的来说,"Quartz定时器,表达式自动生成工具"是Quartz定时器的有益补充,它降低了学习和使用Quartz的门槛,使得开发者可以更专注于业务逻辑,而不是繁琐的定时表达式设置。通过下载提供的"Quartz定时表达式自动...

    dsp定时器/计数器原理及举例

    dsp定时器/计数器原理是指dsp(Digital Signal Processor,数字信号处理器)中的一种计时机制,它可以生成各种时钟信号,实现dsp系统的时钟控制。dsp定时器/计数器原理主要包括定时器结构、时钟发生器、PLL(Phase-...

    51定时器计算工具

    6. **代码生成**:软件能够自动生成51单片机汇编语言或C语言的定时器初始化代码,方便直接应用到项目中,节省编程时间。 7. **用户友好界面**:51定时器计算工具通常具备直观的图形用户界面,使得用户无需深入理解...

    STM32 定时器级联

    在编程实现过程中,应使用STM32CubeMX进行初始化配置,生成对应的HAL或LL库代码。然后在应用程序中编写相应的中断服务程序或轮询函数,处理定时器的事件和状态。 总结来说,STM32定时器级联是一种强大的功能,能够...

    辅助软件 定时器计算器 辅助软件 定时器计算器

    辅助软件 定时器计算器辅助软件 定时器计算器辅助软件 定时器计算器辅助软件 定时器计算器辅助软件 定时器计算器辅助软件 定时器计算器辅助软件 定时器计算器辅助软件 定时器计算器辅助软件 定时器计算器辅助软件 ...

    定时器0和定时器1 ARM PROTEUS

    在通信协议中,它们可以用来生成精确的时序信号。 总结来说,ARM处理器的定时器0和定时器1是极其灵活的硬件资源,通过合理的配置和编程,可以满足各种复杂的定时需求。在PROTEUS仿真环境中,开发者可以直观地看到...

    【STM32】HAL库-使用一个定时器使能另一个定时器-主定时器触发模式demo

    如果先初始化从模式定时器3,定时器3工作在触发模式下,TRGI信号来源为定时器2的TRGO,而定时器2还未初始化,故TRGO信号来源默认是UG位,故在定时器2初始化过程中会置位UG位,从而触发定时器3的从模式触发模式,导致...

    DSP28335定时器程序

    【标题】"DSP28335定时器程序"涉及的是TI公司生产的TMS320F28335数字信号处理器(DSP)中的定时器功能,特别是定时器0的编程应用。这款处理器广泛应用于实时控制、信号处理以及嵌入式系统等领域,其内置的定时器是...

    通过STM32F103单片机高级定时器TIM1,同时生成频率、占空比。可调的4路PWM.

    总的来说,STM32F103的高级定时器TIM1结合库函数,可以轻松实现4路PWM信号的生成,并且频率和占空比均可灵活调整。这种功能对于许多应用,如电机速度控制、LED亮度调节、电源电压控制等,都是非常实用的。通过深入...

    通过STM32F103单片机高级定时器TIM8,同时生成频率、占空比。可调的4路PWM.

    在本项目中,我们将聚焦于利用其高级定时器TIM8来同时生成四路可调频率和占空比的PWM(脉宽调制)信号。 高级定时器TIM8是STM32F103单片机中的一个重要组件,它提供了一种灵活的方法来产生精确的时间间隔,特别适合...

    MFC定时器的使用 MFC定时器的使用

    在Windows编程环境中,MFC(Microsoft Foundation Classes)库提供了一种方便的方式来实现应用程序中的定时功能,这就是MFC定时器。MFC定时器主要用于在特定时间间隔后触发一个事件,这在许多应用场景中都非常有用,...

Global site tag (gtag.js) - Google Analytics