`

kakfa offset

 
阅读更多
package cn.analysys.stream.state

import java.nio.ByteBuffer

import cn.analysys.meta.MetaMapInfo
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import cn.analysys.third.hllc.HLLCounter
import cn.analysys.utils.{BridgeUtils, ParseUtils}
import org.apache.spark.streaming.State
import org.apache.spark.streaming.dstream.DStream


object StateProcess {
  val M = 10
  val TOPN = 20
  val MINUTECOMBINE = 300000
  val TIMEOUTMINUTE = 1440
  val OUTPUT = 100
  val funcLongAdd = (key: String, tempId: Option[Long], state: State[Long]) => {
    val sum = state.getOption.getOrElse(0L) + tempId.getOrElse(1l)
    val hllc: HLLCounter = null
    val output = (key, (sum, hllc))
    state.update(sum)
    output
  }

  def isNewKey(key: String, hllc: HLLCounter): Boolean = {
    var isNew = false
    val begin = hllc.getCountEstimate
    hllc.add(key)
    val end = hllc.getCountEstimate
    if (end > begin) isNew = true
    isNew
  }


  val appKeyDeviceIdessionIDDistinct = (key: Int, loadPage: Option[LoadPageClassType], state: State[HLLCounter]) => {
    val hllc = state.getOption().getOrElse(new HLLCounter(M))
    var page: LoadPageClassType = null
    val begin = hllc.getCountEstimate
    hllc.add(key)
    val end = hllc.getCountEstimate
    if (end > begin) page = loadPage.getOrElse(null)
    val output = page
    state.update(hllc)
    output
  }

  // 为了减少后面的处理数据,把出现过的基数设置为Null。
  val funcHllcAdd = (key: String, tempId: Option[String], state: State[HLLCounter]) => {
    val hllc = state.getOption().getOrElse(new HLLCounter(M))
    val begin = hllc.getCountEstimate
    hllc.add(tempId.getOrElse(""))
    val end = hllc.getCountEstimate
    val output = (if (end > begin) key else "", (hllc.getCountEstimate, hllc))
    state.update(hllc)
    output
  }

  //所有 KV 返回的都是 一个 String ,一个 Long 或者 hllc
  //约定:对于输出的Key ,根据Key 的MetaMapInfo.SEG 区分如果是两段,就存K:V 结构,如果是三段,就存 K:K:V结构
  //KVType : MetaMapInfo.KEYTYPEKV ,MetaMapInfo.KEYTYPEKKV
  //考虑 redis 的压力。 1、两段式阶段的会根据 KEY 或者最大值。写入存储
  //                  2,三段式,会根据 第三个字段排序,获取前20.并且把所有的维度的值得总和,记录一下。写入库里面
  def longDstreamSortAndOutPut(resultDstreamOrg: DStream[(String, (Long, HLLCounter))], KVType: String,
                               isHLLC: Boolean, sorted: Boolean = false) = {
    //debug
    //    if(isHLLC)
    //      resultDstream.foreachRDD(rRdd => rRdd.take(5).foreach(x => println(s" key : ${x._1}   hllc value ${x._2._2.getCountEstimate } ")))
    //    else resultDstream.foreachRDD(rRdd => rRdd.take(5).foreach(x => println(s" key : ${x._1}  long   value ${x._2._1}")))
    val resultDstream =  resultDstreamOrg.filter(result =>  result._2._1>OUTPUT  )
    resultDstream.foreachRDD(resultRdd => {
      val comparedTopRdd = resultRdd.reduceByKey((a, b) => if (a._1 > b._1) a else b) // mapwithstate 过来的记录和原记录是一样多的。
      if (MetaMapInfo.KEYTYPEKV.equals(KVType)) {
        // 两段式 KEY 直接写
        comparedTopRdd.foreachPartition(partition => {
          BridgeUtils.initRedis(MetaMapInfo.URL, MetaMapInfo.TABLENAME, MetaMapInfo.USERNAME, MetaMapInfo.PASSWORD)
          if (isHLLC) partition.foreach(KV =>  BridgeUtils.putRedis(KV._1, getByteFromHllc(KV._2._2)))
          else partition.foreach(KV =>  BridgeUtils.putRedis(KV._1, KV._2._1))
        })
      } else {
        //三段样式 要排序
        val dimentionRdd = comparedTopRdd.filter(
          kv => kv._1.contains(MetaMapInfo.DIMSEG) && kv._1.split(MetaMapInfo.DIMSEG).size == 2)
          .map(kv => {
            val dims = kv._1.split(MetaMapInfo.DIMSEG)
            (dims(0), (dims(1), kv._2))
          })
        // top 20 需要排序的
        if (sorted) {
          dimentionRdd.groupByKey().map(keyValueCollect => {
            val key = keyValueCollect._1
            val valueCollection = keyValueCollect._2
            val sortedKeyValue = valueCollection.toArray.sortWith(_._2._1 > _._2._1)
            (key, if (sortedKeyValue.size > TOPN) sortedKeyValue.take(TOPN) else sortedKeyValue)
          }).foreachPartition(partition => {
            BridgeUtils.initRedis(MetaMapInfo.URL, MetaMapInfo.TABLENAME, MetaMapInfo.USERNAME, MetaMapInfo.PASSWORD)
            partition.foreach(kvCollection => {
              val key = kvCollection._1
              // val sumValue = kvCollection._2
              val dimValue = kvCollection._2
              // BridgeUtils.putRedis(key, MetaMapInfo.ALLDIMENSIONCOUNT, sumValue)
              if (isHLLC) dimValue.foreach(kv =>  if(kv._2._2!=null) BridgeUtils.putRedis(key, kv._1, getByteFromHllc(kv._2._2)))
              else dimValue.foreach(kv => BridgeUtils.putRedis(key, kv._1, kv._2._1))
            })
          })
        }
        else {
          dimentionRdd.foreachPartition(partition => {
            BridgeUtils.initRedis(MetaMapInfo.URL, MetaMapInfo.TABLENAME, MetaMapInfo.USERNAME, MetaMapInfo.PASSWORD)
            partition.foreach(kvCollection => {
              val key = kvCollection._1
              val dimValue = kvCollection._2
              if (isHLLC&&dimValue._2._2!=null)  BridgeUtils.putRedis(key, dimValue._1, getByteFromHllc(dimValue._2._2))
              else  BridgeUtils.putRedis(key, dimValue._1, dimValue._2._1)
            }
            )
          })
        }
      }
    })
  }


  def getByteFromHllc(hllc: HLLCounter): Array[Byte] = {
    val out1 = ByteBuffer.allocate(hllc.maxLength())
    hllc.writeRegisters(out1)
    out1.array()
  }

  def getHllcFromByte(bytes: Array[Byte], compressMode: Int): HLLCounter = {
    val hllCounter = new HLLCounter(compressMode)
    hllCounter.readRegisters(ByteBuffer.wrap(bytes))
    hllCounter
  }


  def createContext(brokers: String, topics: String, batchseconds: Int, checkpointDirectory: String
                    , DataType: String, calDate: String, offset: String): StreamingContext = {

    println(s" createContext \n  " +
      s" calbrokersDate : ${brokers} \n  " +
      s" topics : ${topics} \n  " +
      s" batchseconds : ${batchseconds}  \n " +
      s" checkpointDirectory : ${checkpointDirectory}  \n " +
      s" DataType : ${DataType}  \n " +
      s" calDate : ${calDate}  \n " +
      s" offset : ${offset}   ")

    val sparkConf = new SparkConf().setAppName("ArkStreamApp")
      .set("spark.streaming.backpressure.enabled","true")
      .set("spark.streaming.kafka.maxRatePerPartition","5")

    val ssc = new StreamingContext(sparkConf, Seconds(batchseconds))
    ssc.checkpoint(checkpointDirectory)
    val topicsSet = topics.split(",").toSet
    var kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers
      , "auto.offset.reset" -> "largest"
    )
    if ("smallest".equals(offset)) kafkaParams = kafkaParams.updated("auto.offset.reset", "smallest")


    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    var offsetRanges = Array.empty[OffsetRange]

    messages.transform(r => {
      offsetRanges = r.asInstanceOf[HasOffsetRanges].offsetRanges
      for (o <- offsetRanges) {
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
      }
      r
    }).count().print()


//    val dataInfoStream = messages.flatMap(
//      record => ParseUtils.parseArray(record._2, DataType)
//    ).filter(record => record.dataType.equals(MetaMapInfo.PINFO)) //只处理 pinfo数据
//      .map(record => {
//      var mapData = record.dataMap
//      mapData = ParseUtils.eventTimeProcess(mapData) //时间处理
//      mapData = ParseUtils.appIDProcess(mapData) // appID 处理
//      mapData = ParseUtils.sourceProcess(mapData) //来源类型处理
//      mapData
//    }).filter(dataMap =>
//      (dataMap.contains(MetaMapInfo.H5MAP("ApplicationKey"))
//        && dataMap(MetaMapInfo.H5MAP("ApplicationKey")) != null
//        && !(dataMap(MetaMapInfo.H5MAP("ApplicationKey")).isEmpty)
//        && dataMap("APPID") != null
//        && !dataMap("APPID").isEmpty
//        && dataMap("APPID") != "-1"
//        && dataMap("APPID").equals("100024")
//        && (!(dataMap("eventTime").isEmpty))
//        && dataMap("eventTime").contains(calDate)))



    //dataInfoStream.foreachRDD( r => r.take(100).foreach( record =>  println(s" dataInfoStream2 ${record("eventTime")} ")))
//   dataInfoStream.count().print
    // business type:
    // 一、计算业务 UV 类, COUNT(DISTINCT TEMPID)
    //val UV = "UV"
    //val REGION_PROVINCE = "PROVINCE"
    //val REGION_CITY = "CITY"
    //val DEVICE_TYPE = "DEVICE-TYPE"
    //DstreamUVProcess.uvDStreamProcess(dataInfoStream)
    //DstreamUVProcess.uvDStreamProcessDim(dataInfoStream)
    // 二、这个就是一个统计量
    //val PAGE_COUNT = "PAGE-COUNT"
    //DstreamPagePvProcess.pagePVDStreamProcess(dataInfoStream)
    // 三、 着陆页 相关统计
    //val SOURCE_TYPE = "SOURCE-TYPE"
    //val SOURCE_AD = "SOURCE-AD"
    //val SOURCE_KEYWORD = "SOURCE-KEYWORD"
    //val LOAD_PAGE_INCOUNT = "LOAD-PAGE-INCOUNT"
    //DstreamLoadPageProcess.loadPageDStreamProcess(dataInfoStream)
    ssc
  }

}
分享到:
评论

相关推荐

    Kafka Offset Monitor 资源本地化编译版

    有个别同学反馈上一次传的有部分资源没有完全本地化,所以今天特地重新编译了一版重新上传,再此也和之前下载的同学说声抱歉。希望对大家有用。

    kafka-manager1.3.0.8 可修改offset值

    这个工具使得管理员能够更方便地管理和维护Kafka集群,特别是对于offset值的修改,这是一个非常重要的特性,因为offset是Kafka消费者跟踪消息位置的关键。 首先,了解Kafka的基本概念。Kafka是一个分布式流处理平台...

    Mac和Windows版本Kafka可视化工具kafkatool Offset Explorer

    包含Mac 和 windows版本, 可以连接kafka,非常方便的查看topic、consumer、consumer-group 等信息。 1、首先在Properties页签下填写好 zookeeper 地址和端口 2、再从 Advanced页签下填写 broker地址和端口

    Offset Explorer(原名Kafka Tool)3.1 版本-Windows64位版本

    Offset Explorer(原名Kafka Tool)是一款用于管理和使用Apache Kafka集群的图形用户界面(GUI)应用程序。它为用户提供了直观的UI界面,方便快速查看Kafka集群中的对象以及集群主题中存储的消息。 - 最新版本的...

    kafka tool offset explorer 2.2

    《Kafka Tool Offset Explorer 2.2:洞察Kafka数据流的神器》 在大数据处理领域,Apache Kafka作为一款高效、可扩展的消息中间件,扮演着至关重要的角色。而Kafka Tool Offset Explorer 2.2则是一款专为Kafka设计的...

    kafka客户端offset

    kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset ...

    kafka监控工具KafkaOffsetMnitor angularjs和css

    【Kafka监控工具KafkaOffsetMonitor详解】 KafkaOffsetMonitor是一款强大的开源监控工具,专为Apache Kafka设计,用于实时监控和分析Kafka集群中的消费者偏移量。它可以帮助管理员跟踪消费者的消费进度,确保数据的...

    offset kafka监控工具 免费

    而"offset kafka监控工具"则是针对Kafka集群进行管理和监控的重要辅助工具,它允许用户查看和管理Kafka主题中的消费偏移量,这对于理解和调试生产者和消费者的同步状态至关重要。 "offset explore"是这类工具的典型...

    kettle整合kafka生产者消费者插件

    kettle7.1版本整合kafka,kafka插件包含生产者、消费者。直接在kettle安装目录plugins下创建steps目录,并解压下载文件到kettle/plugins/steps目录。具体可查看我博文。

    Apache Flink如何管理Kafka消费者offsets

    ### Apache Flink 如何管理 Kafka 消费者 Offsets #### 一、Flink与Kafka结合实现Checkpointing 在探讨Flink如何管理和利用Kafka消费者Offsets的过程中,首先要理解Flink与Kafka如何共同实现检查点(Checkpointing...

    kafka消息监控(linux运行_window查看)

    在Linux环境中运行Kafka Offset Monitor,你需要下载`KafkaOffsetMonitor-assembly-0.2.0.jar`文件,这是一个包含所有依赖的可执行JAR包。运行监控工具的命令通常如下: ```bash java -jar KafkaOffsetMonitor-...

    kafka-0.9.0-重置offset-ResetOff.java

    kafka-0.9.0-重置offset-ResetOff.java;

    kafkaoffsetmonitor

    《Kafka Offset Monitor 深度解析》 在大数据处理领域,Apache Kafka 是一个不可或缺的实时数据流平台,它提供高吞吐量、低延迟的消息传递能力。在 Kafka 的使用过程中,监控消费者组的消费进度至关重要,这有助于...

    springboot中如何实现kafa指定offset消费

    Spring Boot 中如何实现 Kafka 指定 Offset 消费 在 Spring Boot 中实现 Kafka 指定 Offset 消费是非常重要的,特别是在生产环境中,需要重新消费某个 Offset 的数据时。下面我们将详细介绍如何在 Spring Boot 中...

    apache kafka查询手册

    4. **Kafka Offset管理**:Offset是消费者在分区中位置的标识,记录了消费者已经读取到哪条消息。消费者可以保存自己的offset到Kafka的__consumer_offsets主题,以便恢复时从上次的位置继续消费。 5. **Kafka ...

    kafka-manager最新编译版1.3.3.22,解决了异常Unknown offset schema version 3

    《Kafka-Manager 1.3.3.22:解决Unknown offset schema version 3异常详解》 在大数据处理领域,Apache Kafka作为一个高效、可扩展的实时数据流平台,广泛应用于消息传递和数据集成。然而,在实际操作中,用户可能...

    KafkaOffsetMonitor0.4.6

    **Kafka Offset Monitor 0.4.6:监控与管理Kafka消费进度的利器** Kafka Offset Monitor是一款用于监控Apache Kafka消费者组消费进度的工具,它可以帮助管理员和开发者实时了解Kafka集群中各个消费者组的offset状态...

    KafkaOffsetMonitor监控工具2017年1月发布的版本

    Created function tryParseOffsetMessage to attempt to parse a kafka offset message retrieved from the internal committed offset topic: Handles messages of other types and questionable correctness. ...

    php操作kafka的扩展包php-rdkafka

    7. **Kafka Offset管理** 消费者可以通过`$consumer-&gt;offsetStore()`方法保存当前消费的位置,以便在程序重启后从上次离开的地方继续消费。 8. **监控与调试** 可以通过设置`debug`配置选项来输出详细的调试...

    KafkaOffsetMonitor-assembly-0.2.0 以及kafka基础文档

    KafkaOffsetMonitor是Kafka的一个实用工具,用于可视化展示消费者的offset状态,帮助管理员监控消费者的消费情况。它提供以下功能: 1. **实时监控** 显示每个消费者组在每个主题分区上的最新offset,以及消息的...

Global site tag (gtag.js) - Google Analytics