`

kakfa offset

 
阅读更多
package cn.analysys.stream.state

import java.nio.ByteBuffer

import cn.analysys.meta.MetaMapInfo
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import cn.analysys.third.hllc.HLLCounter
import cn.analysys.utils.{BridgeUtils, ParseUtils}
import org.apache.spark.streaming.State
import org.apache.spark.streaming.dstream.DStream


object StateProcess {
  val M = 10
  val TOPN = 20
  val MINUTECOMBINE = 300000
  val TIMEOUTMINUTE = 1440
  val OUTPUT = 100
  val funcLongAdd = (key: String, tempId: Option[Long], state: State[Long]) => {
    val sum = state.getOption.getOrElse(0L) + tempId.getOrElse(1l)
    val hllc: HLLCounter = null
    val output = (key, (sum, hllc))
    state.update(sum)
    output
  }

  def isNewKey(key: String, hllc: HLLCounter): Boolean = {
    var isNew = false
    val begin = hllc.getCountEstimate
    hllc.add(key)
    val end = hllc.getCountEstimate
    if (end > begin) isNew = true
    isNew
  }


  val appKeyDeviceIdessionIDDistinct = (key: Int, loadPage: Option[LoadPageClassType], state: State[HLLCounter]) => {
    val hllc = state.getOption().getOrElse(new HLLCounter(M))
    var page: LoadPageClassType = null
    val begin = hllc.getCountEstimate
    hllc.add(key)
    val end = hllc.getCountEstimate
    if (end > begin) page = loadPage.getOrElse(null)
    val output = page
    state.update(hllc)
    output
  }

  // 为了减少后面的处理数据,把出现过的基数设置为Null。
  val funcHllcAdd = (key: String, tempId: Option[String], state: State[HLLCounter]) => {
    val hllc = state.getOption().getOrElse(new HLLCounter(M))
    val begin = hllc.getCountEstimate
    hllc.add(tempId.getOrElse(""))
    val end = hllc.getCountEstimate
    val output = (if (end > begin) key else "", (hllc.getCountEstimate, hllc))
    state.update(hllc)
    output
  }

  //所有 KV 返回的都是 一个 String ,一个 Long 或者 hllc
  //约定:对于输出的Key ,根据Key 的MetaMapInfo.SEG 区分如果是两段,就存K:V 结构,如果是三段,就存 K:K:V结构
  //KVType : MetaMapInfo.KEYTYPEKV ,MetaMapInfo.KEYTYPEKKV
  //考虑 redis 的压力。 1、两段式阶段的会根据 KEY 或者最大值。写入存储
  //                  2,三段式,会根据 第三个字段排序,获取前20.并且把所有的维度的值得总和,记录一下。写入库里面
  def longDstreamSortAndOutPut(resultDstreamOrg: DStream[(String, (Long, HLLCounter))], KVType: String,
                               isHLLC: Boolean, sorted: Boolean = false) = {
    //debug
    //    if(isHLLC)
    //      resultDstream.foreachRDD(rRdd => rRdd.take(5).foreach(x => println(s" key : ${x._1}   hllc value ${x._2._2.getCountEstimate } ")))
    //    else resultDstream.foreachRDD(rRdd => rRdd.take(5).foreach(x => println(s" key : ${x._1}  long   value ${x._2._1}")))
    val resultDstream =  resultDstreamOrg.filter(result =>  result._2._1>OUTPUT  )
    resultDstream.foreachRDD(resultRdd => {
      val comparedTopRdd = resultRdd.reduceByKey((a, b) => if (a._1 > b._1) a else b) // mapwithstate 过来的记录和原记录是一样多的。
      if (MetaMapInfo.KEYTYPEKV.equals(KVType)) {
        // 两段式 KEY 直接写
        comparedTopRdd.foreachPartition(partition => {
          BridgeUtils.initRedis(MetaMapInfo.URL, MetaMapInfo.TABLENAME, MetaMapInfo.USERNAME, MetaMapInfo.PASSWORD)
          if (isHLLC) partition.foreach(KV =>  BridgeUtils.putRedis(KV._1, getByteFromHllc(KV._2._2)))
          else partition.foreach(KV =>  BridgeUtils.putRedis(KV._1, KV._2._1))
        })
      } else {
        //三段样式 要排序
        val dimentionRdd = comparedTopRdd.filter(
          kv => kv._1.contains(MetaMapInfo.DIMSEG) && kv._1.split(MetaMapInfo.DIMSEG).size == 2)
          .map(kv => {
            val dims = kv._1.split(MetaMapInfo.DIMSEG)
            (dims(0), (dims(1), kv._2))
          })
        // top 20 需要排序的
        if (sorted) {
          dimentionRdd.groupByKey().map(keyValueCollect => {
            val key = keyValueCollect._1
            val valueCollection = keyValueCollect._2
            val sortedKeyValue = valueCollection.toArray.sortWith(_._2._1 > _._2._1)
            (key, if (sortedKeyValue.size > TOPN) sortedKeyValue.take(TOPN) else sortedKeyValue)
          }).foreachPartition(partition => {
            BridgeUtils.initRedis(MetaMapInfo.URL, MetaMapInfo.TABLENAME, MetaMapInfo.USERNAME, MetaMapInfo.PASSWORD)
            partition.foreach(kvCollection => {
              val key = kvCollection._1
              // val sumValue = kvCollection._2
              val dimValue = kvCollection._2
              // BridgeUtils.putRedis(key, MetaMapInfo.ALLDIMENSIONCOUNT, sumValue)
              if (isHLLC) dimValue.foreach(kv =>  if(kv._2._2!=null) BridgeUtils.putRedis(key, kv._1, getByteFromHllc(kv._2._2)))
              else dimValue.foreach(kv => BridgeUtils.putRedis(key, kv._1, kv._2._1))
            })
          })
        }
        else {
          dimentionRdd.foreachPartition(partition => {
            BridgeUtils.initRedis(MetaMapInfo.URL, MetaMapInfo.TABLENAME, MetaMapInfo.USERNAME, MetaMapInfo.PASSWORD)
            partition.foreach(kvCollection => {
              val key = kvCollection._1
              val dimValue = kvCollection._2
              if (isHLLC&&dimValue._2._2!=null)  BridgeUtils.putRedis(key, dimValue._1, getByteFromHllc(dimValue._2._2))
              else  BridgeUtils.putRedis(key, dimValue._1, dimValue._2._1)
            }
            )
          })
        }
      }
    })
  }


  def getByteFromHllc(hllc: HLLCounter): Array[Byte] = {
    val out1 = ByteBuffer.allocate(hllc.maxLength())
    hllc.writeRegisters(out1)
    out1.array()
  }

  def getHllcFromByte(bytes: Array[Byte], compressMode: Int): HLLCounter = {
    val hllCounter = new HLLCounter(compressMode)
    hllCounter.readRegisters(ByteBuffer.wrap(bytes))
    hllCounter
  }


  def createContext(brokers: String, topics: String, batchseconds: Int, checkpointDirectory: String
                    , DataType: String, calDate: String, offset: String): StreamingContext = {

    println(s" createContext \n  " +
      s" calbrokersDate : ${brokers} \n  " +
      s" topics : ${topics} \n  " +
      s" batchseconds : ${batchseconds}  \n " +
      s" checkpointDirectory : ${checkpointDirectory}  \n " +
      s" DataType : ${DataType}  \n " +
      s" calDate : ${calDate}  \n " +
      s" offset : ${offset}   ")

    val sparkConf = new SparkConf().setAppName("ArkStreamApp")
      .set("spark.streaming.backpressure.enabled","true")
      .set("spark.streaming.kafka.maxRatePerPartition","5")

    val ssc = new StreamingContext(sparkConf, Seconds(batchseconds))
    ssc.checkpoint(checkpointDirectory)
    val topicsSet = topics.split(",").toSet
    var kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers
      , "auto.offset.reset" -> "largest"
    )
    if ("smallest".equals(offset)) kafkaParams = kafkaParams.updated("auto.offset.reset", "smallest")


    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    var offsetRanges = Array.empty[OffsetRange]

    messages.transform(r => {
      offsetRanges = r.asInstanceOf[HasOffsetRanges].offsetRanges
      for (o <- offsetRanges) {
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
      }
      r
    }).count().print()


//    val dataInfoStream = messages.flatMap(
//      record => ParseUtils.parseArray(record._2, DataType)
//    ).filter(record => record.dataType.equals(MetaMapInfo.PINFO)) //只处理 pinfo数据
//      .map(record => {
//      var mapData = record.dataMap
//      mapData = ParseUtils.eventTimeProcess(mapData) //时间处理
//      mapData = ParseUtils.appIDProcess(mapData) // appID 处理
//      mapData = ParseUtils.sourceProcess(mapData) //来源类型处理
//      mapData
//    }).filter(dataMap =>
//      (dataMap.contains(MetaMapInfo.H5MAP("ApplicationKey"))
//        && dataMap(MetaMapInfo.H5MAP("ApplicationKey")) != null
//        && !(dataMap(MetaMapInfo.H5MAP("ApplicationKey")).isEmpty)
//        && dataMap("APPID") != null
//        && !dataMap("APPID").isEmpty
//        && dataMap("APPID") != "-1"
//        && dataMap("APPID").equals("100024")
//        && (!(dataMap("eventTime").isEmpty))
//        && dataMap("eventTime").contains(calDate)))



    //dataInfoStream.foreachRDD( r => r.take(100).foreach( record =>  println(s" dataInfoStream2 ${record("eventTime")} ")))
//   dataInfoStream.count().print
    // business type:
    // 一、计算业务 UV 类, COUNT(DISTINCT TEMPID)
    //val UV = "UV"
    //val REGION_PROVINCE = "PROVINCE"
    //val REGION_CITY = "CITY"
    //val DEVICE_TYPE = "DEVICE-TYPE"
    //DstreamUVProcess.uvDStreamProcess(dataInfoStream)
    //DstreamUVProcess.uvDStreamProcessDim(dataInfoStream)
    // 二、这个就是一个统计量
    //val PAGE_COUNT = "PAGE-COUNT"
    //DstreamPagePvProcess.pagePVDStreamProcess(dataInfoStream)
    // 三、 着陆页 相关统计
    //val SOURCE_TYPE = "SOURCE-TYPE"
    //val SOURCE_AD = "SOURCE-AD"
    //val SOURCE_KEYWORD = "SOURCE-KEYWORD"
    //val LOAD_PAGE_INCOUNT = "LOAD-PAGE-INCOUNT"
    //DstreamLoadPageProcess.loadPageDStreamProcess(dataInfoStream)
    ssc
  }

}
分享到:
评论

相关推荐

    Kafka Offset Monitor 资源本地化编译版

    有个别同学反馈上一次传的有部分资源没有完全本地化,所以今天特地重新编译了一版重新上传,再此也和之前下载的同学说声抱歉。希望对大家有用。

    kafka-manager1.3.0.8 可修改offset值

    这个工具使得管理员能够更方便地管理和维护Kafka集群,特别是对于offset值的修改,这是一个非常重要的特性,因为offset是Kafka消费者跟踪消息位置的关键。 首先,了解Kafka的基本概念。Kafka是一个分布式流处理平台...

    Mac和Windows版本Kafka可视化工具kafkatool Offset Explorer

    包含Mac 和 windows版本, 可以连接kafka,非常方便的查看topic、consumer、consumer-group 等信息。 1、首先在Properties页签下填写好 zookeeper 地址和端口 2、再从 Advanced页签下填写 broker地址和端口

    Offset Explorer(原名Kafka Tool)3.1 版本-Windows64位版本

    Offset Explorer(原名Kafka Tool)是一款用于管理和使用Apache Kafka集群的图形用户界面(GUI)应用程序。它为用户提供了直观的UI界面,方便快速查看Kafka集群中的对象以及集群主题中存储的消息。 - 最新版本的...

    kafka tool offset explorer 2.2

    《Kafka Tool Offset Explorer 2.2:洞察Kafka数据流的神器》 在大数据处理领域,Apache Kafka作为一款高效、可扩展的消息中间件,扮演着至关重要的角色。而Kafka Tool Offset Explorer 2.2则是一款专为Kafka设计的...

    kafka客户端offset

    kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset ...

    offset kafka监控工具 免费

    而"offset kafka监控工具"则是针对Kafka集群进行管理和监控的重要辅助工具,它允许用户查看和管理Kafka主题中的消费偏移量,这对于理解和调试生产者和消费者的同步状态至关重要。 "offset explore"是这类工具的典型...

    Kafka Tool 2.0.7(windows 32\64).7z

    kafkatool 国内可能下载不了,搞32 64位的都压缩在一起了Kafka Tool 2.0.7( 32\64) To download the Kafka UI Tool for your operating system, use the links below. All versions of Kafka Tool come with a ...

    kafka监控工具KafkaOffsetMnitor angularjs和css

    【Kafka监控工具KafkaOffsetMonitor详解】 KafkaOffsetMonitor是一款强大的开源监控工具,专为Apache Kafka设计,用于实时监控和分析Kafka集群中的消费者偏移量。它可以帮助管理员跟踪消费者的消费进度,确保数据的...

    kettle整合kafka生产者消费者插件

    kettle7.1版本整合kafka,kafka插件包含生产者、消费者。直接在kettle安装目录plugins下创建steps目录,并解压下载文件到kettle/plugins/steps目录。具体可查看我博文。

    Apache Flink如何管理Kafka消费者offsets

    ### Apache Flink 如何管理 Kafka 消费者 Offsets #### 一、Flink与Kafka结合实现Checkpointing 在探讨Flink如何管理和利用Kafka消费者Offsets的过程中,首先要理解Flink与Kafka如何共同实现检查点(Checkpointing...

    kafka消息监控(linux运行_window查看)

    在Linux环境中运行Kafka Offset Monitor,你需要下载`KafkaOffsetMonitor-assembly-0.2.0.jar`文件,这是一个包含所有依赖的可执行JAR包。运行监控工具的命令通常如下: ```bash java -jar KafkaOffsetMonitor-...

    kafka-0.9.0-重置offset-ResetOff.java

    kafka-0.9.0-重置offset-ResetOff.java;

    kafkaoffsetmonitor

    《Kafka Offset Monitor 深度解析》 在大数据处理领域,Apache Kafka 是一个不可或缺的实时数据流平台,它提供高吞吐量、低延迟的消息传递能力。在 Kafka 的使用过程中,监控消费者组的消费进度至关重要,这有助于...

    springboot中如何实现kafa指定offset消费

    Spring Boot 中如何实现 Kafka 指定 Offset 消费 在 Spring Boot 中实现 Kafka 指定 Offset 消费是非常重要的,特别是在生产环境中,需要重新消费某个 Offset 的数据时。下面我们将详细介绍如何在 Spring Boot 中...

    apache kafka查询手册

    4. **Kafka Offset管理**:Offset是消费者在分区中位置的标识,记录了消费者已经读取到哪条消息。消费者可以保存自己的offset到Kafka的__consumer_offsets主题,以便恢复时从上次的位置继续消费。 5. **Kafka ...

    kafka-manager最新编译版1.3.3.22,解决了异常Unknown offset schema version 3

    《Kafka-Manager 1.3.3.22:解决Unknown offset schema version 3异常详解》 在大数据处理领域,Apache Kafka作为一个高效、可扩展的实时数据流平台,广泛应用于消息传递和数据集成。然而,在实际操作中,用户可能...

    KafkaOffsetMonitor0.4.6

    **Kafka Offset Monitor 0.4.6:监控与管理Kafka消费进度的利器** Kafka Offset Monitor是一款用于监控Apache Kafka消费者组消费进度的工具,它可以帮助管理员和开发者实时了解Kafka集群中各个消费者组的offset状态...

    KafkaOffsetMonitor监控工具2017年1月发布的版本

    Created function tryParseOffsetMessage to attempt to parse a kafka offset message retrieved from the internal committed offset topic: Handles messages of other types and questionable correctness. ...

    php操作kafka的扩展包php-rdkafka

    7. **Kafka Offset管理** 消费者可以通过`$consumer-&gt;offsetStore()`方法保存当前消费的位置,以便在程序重启后从上次离开的地方继续消费。 8. **监控与调试** 可以通过设置`debug`配置选项来输出详细的调试...

Global site tag (gtag.js) - Google Analytics