`
qindongliang1922
  • 浏览: 2184543 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
7265517b-f87e-3137-b62c-5c6e30e26109
证道Lucene4
浏览量:117546
097be4a0-491e-39c0-89ff-3456fadf8262
证道Hadoop
浏览量:125937
41c37529-f6d8-32e4-8563-3b42b2712a50
证道shell编程
浏览量:59934
43832365-bc15-3f5d-b3cd-c9161722a70c
ELK修真
浏览量:71308
社区版块
存档分类
最新评论

Spark Streaming优雅的关闭策略优化

阅读更多


前面文章介绍了不少有关Spark Streaming的offset的管理以及如何优雅的关闭Spark Streaming的流程序。

到目前为止还有几个问题:


(1)有关spark streaming集成kafka时,如果kafka新增分区, 那么spark streaming程序能不能动态识别到而不用重启?


(2)如果需要重启,那么在自己管理offset时,如何才能识别到新增的分区?

(3)spark streaming优雅关闭的策略还有那些?




首先第一个问题,如果kafka要新增分区,对于正在运行的实时流程序能否动态识别到?

经过测试,是不能识别的,我推测使用createDirectStream创建流对象一旦创建就是不可变的,也就是说创建实例那一刻的分区数量,会一直使用直到流程序结束,就算中间kafka的分区数量扩展了,流程序也是不能识别到的。所以在扩展kafka分区前,一定要先把流程序给停掉,然后扩展完成后需要再次重启流程序。




然后看第二个问题,如果是我们自己管理offset时,一定要考虑到kafka扩展分区的情况,每次启动程序前都得检测下目前保存的偏移量里面的kafka的分区个数是否小于kafka实际元数据里面实际的分区个数,正常没扩展分区的情况下两个值应该是相等的,如果值不一致,就说明是kafka分区得到扩展了,所以我们的程序需要能够兼容这种情况。


核心代码如下:
        //这个topic在zk里面最新的分区数量
        val  lastest_partitions= ZkUtils.getPartitionsForTopics(zkClient,Seq(topic)).get(topic).get
        var offsets = offsetsRangesStr.split(",")//按逗号split成数组
          .map(s => s.split(":"))//按冒号拆分每个分区和偏移量
          .map { case Array(partitionStr, offsetStr) => (TopicAndPartition(topic, partitionStr.toInt) -> offsetStr.toLong) }//加工成最终的格式
          .toMap//返回一个Map

        //说明有分区扩展了
        if(offsets.size<lastest_partitions.size){
          //得到旧的所有分区序号
          val old_partitions=offsets.keys.map(p=>p.partition).toArray
          //通过做差集得出来多的分区数量数组
          val add_partitions=lastest_partitions.diff(old_partitions)
          if(add_partitions.size>0){
            log.warn("发现kafka新增分区:"+add_partitions.mkString(","))
            add_partitions.foreach(partitionId=>{
              offsets += (TopicAndPartition(topic,partitionId)->0)
              log.warn("新增分区id:"+partitionId+"添加完毕....")
            })

          }

        }else{
          log.warn("没有发现新增的kafka分区:"+lastest_partitions.mkString(","))
        }




上面的代码在每次启动程序时,都会检查当前我们自己管理的offset的分区数量与zk元数据里面实际的分区数量,如果不一致就会把新增的分区id给加到TopicAndPartition里面并放入到Map对象里面,这样在启动前就会传入到createDirectStream对象中,就能兼容新增的分区了。




最后一个问题,前面的文章谈到过我们可以有两种方式来更加优雅的停止流程序,分别是通过http暴露服务,和通过HDFS做消息中转来定时扫描mark文件是否存在来触发关闭服务。

下面我们先来看下通过http暴露服务的核心代码:
  /****
    * 负责启动守护的jetty服务
    * @param port 对外暴露的端口号
    * @param ssc Stream上下文
    */
  def daemonHttpServer(port:Int,ssc: StreamingContext)={
    val server=new Server(port)
    val context = new ContextHandler();
    context.setContextPath( "/close" );
    context.setHandler( new CloseStreamHandler(ssc) )
    server.setHandler(context)
    server.start()
  }

  /*** 负责接受http请求来优雅的关闭流
    * @param ssc  Stream上下文
    */
  class CloseStreamHandler(ssc:StreamingContext) extends AbstractHandler {
    override def handle(s: String, baseRequest: Request, req: HttpServletRequest, response: HttpServletResponse): Unit ={
      log.warn("开始关闭......")
      ssc.stop(true,true)//优雅的关闭
      response.setContentType("text/html; charset=utf-8");
      response.setStatus(HttpServletResponse.SC_OK);
      val out = response.getWriter();
      out.println("close success");
      baseRequest.setHandled(true);
      log.warn("关闭成功.....")
    }
  }
然后在来看下另一种方式扫描HDFS文件的方式:
  /***
    * 通过一个消息文件来定时触发是否需要关闭流程序
    * @param ssc StreamingContext
    */
  def stopByMarkFile(ssc:StreamingContext):Unit= {
    val intervalMills = 10 * 1000 // 每隔10秒扫描一次消息是否存在
    var isStop = false
    val hdfs_file_path = "/spark/streaming/stop" //判断消息文件是否存在,如果存在就
    while (!isStop) {
      isStop = ssc.awaitTerminationOrTimeout(intervalMills)
      if (!isStop && isExistsMarkFile(hdfs_file_path)) {
        log.warn("2秒后开始关闭sparstreaming程序.....")
        Thread.sleep(2000)
        ssc.stop(true, true)
      }

    }
  }

    /***
      * 判断是否存在mark file
      * @param hdfs_file_path  mark文件的路径
      * @return
      */
    def isExistsMarkFile(hdfs_file_path:String):Boolean={
      val conf = new Configuration()
      val path=new Path(hdfs_file_path)
      val fs =path.getFileSystem(conf);
      fs.exists(path)
    }



上面是两种方式的核心代码,最后提下触发停止流程序:

第一种需要在启动服务的机器上,执行下面封装的脚本:


## tx.log是提交spark任务后的输出log重定向的log 
## &> tx.log &  

 #!/bin/bash
    driver=`cat tx.log | grep ApplicationMaster | grep -Po '\d+.\d+.\d+.\d+'`

    echo $driver

    curl http://$driver:port/close/

  echo "stop finish"



第二种方式,找到一个拥有HDFS客户端机器,向HDFS上写入指定的文件:

#生成文件后,10秒后程序就会自动停止
hadoop fs -touch /spark/streaming/stop

#下次启动前,需要清空这个文件,否则程序启动后就会停止
hadoop fs -rm -r /spark/streaming/stop





所有代码,已经同步更新到我的github上,有兴趣的朋友可以参考这个链接:
https://github.com/qindongliang/streaming-offset-to-zk

有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。 技术债不能欠,健康债更不能欠, 求道之路,与君同行。



0
0
分享到:
评论

相关推荐

    基于Spark Streaming的大数据实时流计算平台和框架,并且是基于运行在yarn模式运行的spark streaming

    一个完善的Spark Streaming二次封装开源框架,包含:实时流任务调度、kafka偏移量管理,web后台管理,web api启动、停止spark streaming,宕机告警、自动重启等等功能支持,用户只需要关心业务代码,无需关注繁琐的...

    spark Streaming和structed streaming分析

    Apache Spark Streaming是Apache Spark用于处理实时流数据的一个组件。它允许用户使用Spark的高度抽象概念处理实时数据流,并且可以轻松地与存储解决方案、批处理数据和机器学习算法集成。Spark Streaming提供了一种...

    SparkStreaming预研报告

    Spark Streaming预研报告覆盖了Apache Spark Streaming的主要方面,包括其简介、架构、编程模型以及性能调优。以下是基于文档提供内容的详细知识点: 1. Spark Streaming简介与渊源 Spark Streaming是Spark生态中...

    kafka+spark streaming开发文档

    kafka+Spark Streaming开发文档 本文档主要讲解了使用Kafka和Spark Streaming进行实时数据处理的开发文档,涵盖了Kafka集群的搭建、Spark Streaming的配置和开发等内容。 一、Kafka集群搭建 首先,需要安装Kafka...

    sparkStreaming消费数据不丢失

    sparkStreaming消费数据不丢失,sparkStreaming消费数据不丢失

    深入理解SparkStreaming执行模型

    常见的性能瓶颈包括straggler(处理较慢的节点)和bottleneck(系统瓶颈),对于这些情况,Spark Streaming提供了一些优化手段,如通过预计算模型(pre-computed model)来减少重复计算,并且可以通过并行化操作和...

    Flume对接Spark Streaming的相关jar包

    在大数据处理领域,Flume 和 Spark Streaming 是两个重要的工具,它们分别用于数据收集与实时流处理。本压缩包中的 jar 包是为了解决 Flume 与 Spark Streaming 的集成问题,确保数据能够从 Flume 无缝流转到 Spark ...

    spark streaming相关15篇论文,包含几篇硕士论文,包含几篇期刊论,有的结合自然语言处理

    这些论文可能会详细讨论如何配置和优化Spark Streaming以处理大数据量的实时输入。 在自然语言处理(NLP)方面,论文可能涉及如何利用Spark Streaming与Apache Spark的MLlib库结合,进行大规模文本分析。例如,它们...

    SparkStreaming入门案例

    Spark Streaming 入门案例 Spark Streaming 是一种构建在 Spark 上的实时计算框架,用来处理大规模流式数据。它将从数据源(如 Kafka、Flume、Twitter、ZeroMQ、HDFS 和 TCP 套接字)获得的连续数据流,离散化成一...

    SparkStreaming流式日志过滤与分析

    (1)利用SparkStreaming从文件目录读入日志信息,日志内容包含: ”日志级别、函数名、日志内容“ 三个字段,字段之间以空格拆分。请看数据源的文件。 (2)对读入都日志信息流进行指定筛选出日志级别为error或warn...

    spark Streaming和storm的对比

    流处理系统如Apache Spark Streaming和Apache Storm,都致力于提供高吞吐量、低延迟的数据处理能力。尽管它们的目的是类似的,但各自的设计哲学、运行模型、容错机制等方面存在着显著差异。以下将详细介绍Spark ...

    spark之sparkStreaming 理解

    ### Spark Streaming概述 #### 一、Spark Streaming定义与特点 **Spark Streaming** 是Apache Spark生态中的一个重要组件,它主要用于处理实时数据流。该模块构建在基础Spark API之上,旨在实现可扩展、高吞吐量...

    Spark Streaming 示例

    Spark Streaming 是 Apache Spark 的一个模块,它允许开发者处理实时数据流。这个强大的工具提供了一种弹性、容错性好且易于编程的模型,用于构建实时流处理应用。在这个"Spark Streaming 示例"中,我们将深入探讨...

    Spark Streaming Real-time big-data processing

    **Spark Streaming:实时大数据处理** Spark Streaming是Apache Spark框架的一部分,专为实时数据处理而设计。...结合其容错性和性能优化,Spark Streaming在各种实时场景中都展现出强大的应用潜力。

    spark streaming

    Spark Streaming 是Apache Spark中的一个重要组件,专门设计用来处理实时数据流的计算框架。作为Spark核心API的一个扩展,它延续了Spark的易用性和高效性,能够将实时数据流处理与批量数据处理无缝集成在一起。利用...

    Hadoop原理与技术Spark Streaming操作实验

    1.理解Spark Streaming的工作流程。 2.理解Spark Streaming的工作原理。 3.学会使用Spark Streaming处理流式数据。 二、实验环境 Windows 10 VMware Workstation Pro虚拟机 Hadoop环境 Jdk1.8 三、实验内容 (一)...

    Spark Streaming实时流处理项目实战.rar.rar

    Spark Streaming是中国大数据技术领域中广泛使用的实时数据处理框架,它基于Apache Spark的核心设计,提供了对持续数据流的微批处理能力。本项目实战旨在帮助读者深入理解和应用Spark Streaming,通过实际操作来掌握...

    SparkStreaming和kafka的整合.pdf

    根据提供的文件信息,本文将详细解析“Spark Streaming与Kafka的整合”这一主题,并结合代码片段探讨其在实际场景中的应用。尽管标签中提到“数学建模”,但从标题和描述来看,这部分内容与数学建模无关,因此我们将...

    flume整合 SparkStreaming.rar

    1.Spark Streaming整合Flume需要的安装包. 2. Spark Streaming拉取Flume数据的flume配置文件.conf 3. Flume向Spark Streaming推数据的flume配置文件.conf

    sparkstreaming

    ### Spark Streaming 基础概念及应用 #### Spark Streaming 概述 Spark Streaming 是 Apache Spark 生态系统中的一个重要模块,它提供了对实时流数据进行高效处理的能力。与传统的批处理不同,Spark Streaming ...

Global site tag (gtag.js) - Google Analytics