package cn.analysys.stream.state
import java.nio.ByteBuffer
import cn.analysys.meta.MetaMapInfo
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import cn.analysys.third.hllc.HLLCounter
import cn.analysys.utils.{BridgeUtils, ParseUtils}
import org.apache.spark.streaming.State
import org.apache.spark.streaming.dstream.DStream
object StateProcess {
val M = 10
val TOPN = 20
val MINUTECOMBINE = 300000
val TIMEOUTMINUTE = 1440
val OUTPUT = 100
val funcLongAdd = (key: String, tempId: Option[Long], state: State[Long]) => {
val sum = state.getOption.getOrElse(0L) + tempId.getOrElse(1l)
val hllc: HLLCounter = null
val output = (key, (sum, hllc))
state.update(sum)
output
}
def isNewKey(key: String, hllc: HLLCounter): Boolean = {
var isNew = false
val begin = hllc.getCountEstimate
hllc.add(key)
val end = hllc.getCountEstimate
if (end > begin) isNew = true
isNew
}
val appKeyDeviceIdessionIDDistinct = (key: Int, loadPage: Option[LoadPageClassType], state: State[HLLCounter]) => {
val hllc = state.getOption().getOrElse(new HLLCounter(M))
var page: LoadPageClassType = null
val begin = hllc.getCountEstimate
hllc.add(key)
val end = hllc.getCountEstimate
if (end > begin) page = loadPage.getOrElse(null)
val output = page
state.update(hllc)
output
}
// 为了减少后面的处理数据,把出现过的基数设置为Null。
val funcHllcAdd = (key: String, tempId: Option[String], state: State[HLLCounter]) => {
val hllc = state.getOption().getOrElse(new HLLCounter(M))
val begin = hllc.getCountEstimate
hllc.add(tempId.getOrElse(""))
val end = hllc.getCountEstimate
val output = (if (end > begin) key else "", (hllc.getCountEstimate, hllc))
state.update(hllc)
output
}
//所有 KV 返回的都是 一个 String ,一个 Long 或者 hllc
//约定:对于输出的Key ,根据Key 的MetaMapInfo.SEG 区分如果是两段,就存K:V 结构,如果是三段,就存 K:K:V结构
//KVType : MetaMapInfo.KEYTYPEKV ,MetaMapInfo.KEYTYPEKKV
//考虑 redis 的压力。 1、两段式阶段的会根据 KEY 或者最大值。写入存储
// 2,三段式,会根据 第三个字段排序,获取前20.并且把所有的维度的值得总和,记录一下。写入库里面
def longDstreamSortAndOutPut(resultDstreamOrg: DStream[(String, (Long, HLLCounter))], KVType: String,
isHLLC: Boolean, sorted: Boolean = false) = {
//debug
// if(isHLLC)
// resultDstream.foreachRDD(rRdd => rRdd.take(5).foreach(x => println(s" key : ${x._1} hllc value ${x._2._2.getCountEstimate } ")))
// else resultDstream.foreachRDD(rRdd => rRdd.take(5).foreach(x => println(s" key : ${x._1} long value ${x._2._1}")))
val resultDstream = resultDstreamOrg.filter(result => result._2._1>OUTPUT )
resultDstream.foreachRDD(resultRdd => {
val comparedTopRdd = resultRdd.reduceByKey((a, b) => if (a._1 > b._1) a else b) // mapwithstate 过来的记录和原记录是一样多的。
if (MetaMapInfo.KEYTYPEKV.equals(KVType)) {
// 两段式 KEY 直接写
comparedTopRdd.foreachPartition(partition => {
BridgeUtils.initRedis(MetaMapInfo.URL, MetaMapInfo.TABLENAME, MetaMapInfo.USERNAME, MetaMapInfo.PASSWORD)
if (isHLLC) partition.foreach(KV => BridgeUtils.putRedis(KV._1, getByteFromHllc(KV._2._2)))
else partition.foreach(KV => BridgeUtils.putRedis(KV._1, KV._2._1))
})
} else {
//三段样式 要排序
val dimentionRdd = comparedTopRdd.filter(
kv => kv._1.contains(MetaMapInfo.DIMSEG) && kv._1.split(MetaMapInfo.DIMSEG).size == 2)
.map(kv => {
val dims = kv._1.split(MetaMapInfo.DIMSEG)
(dims(0), (dims(1), kv._2))
})
// top 20 需要排序的
if (sorted) {
dimentionRdd.groupByKey().map(keyValueCollect => {
val key = keyValueCollect._1
val valueCollection = keyValueCollect._2
val sortedKeyValue = valueCollection.toArray.sortWith(_._2._1 > _._2._1)
(key, if (sortedKeyValue.size > TOPN) sortedKeyValue.take(TOPN) else sortedKeyValue)
}).foreachPartition(partition => {
BridgeUtils.initRedis(MetaMapInfo.URL, MetaMapInfo.TABLENAME, MetaMapInfo.USERNAME, MetaMapInfo.PASSWORD)
partition.foreach(kvCollection => {
val key = kvCollection._1
// val sumValue = kvCollection._2
val dimValue = kvCollection._2
// BridgeUtils.putRedis(key, MetaMapInfo.ALLDIMENSIONCOUNT, sumValue)
if (isHLLC) dimValue.foreach(kv => if(kv._2._2!=null) BridgeUtils.putRedis(key, kv._1, getByteFromHllc(kv._2._2)))
else dimValue.foreach(kv => BridgeUtils.putRedis(key, kv._1, kv._2._1))
})
})
}
else {
dimentionRdd.foreachPartition(partition => {
BridgeUtils.initRedis(MetaMapInfo.URL, MetaMapInfo.TABLENAME, MetaMapInfo.USERNAME, MetaMapInfo.PASSWORD)
partition.foreach(kvCollection => {
val key = kvCollection._1
val dimValue = kvCollection._2
if (isHLLC&&dimValue._2._2!=null) BridgeUtils.putRedis(key, dimValue._1, getByteFromHllc(dimValue._2._2))
else BridgeUtils.putRedis(key, dimValue._1, dimValue._2._1)
}
)
})
}
}
})
}
def getByteFromHllc(hllc: HLLCounter): Array[Byte] = {
val out1 = ByteBuffer.allocate(hllc.maxLength())
hllc.writeRegisters(out1)
out1.array()
}
def getHllcFromByte(bytes: Array[Byte], compressMode: Int): HLLCounter = {
val hllCounter = new HLLCounter(compressMode)
hllCounter.readRegisters(ByteBuffer.wrap(bytes))
hllCounter
}
def createContext(brokers: String, topics: String, batchseconds: Int, checkpointDirectory: String
, DataType: String, calDate: String, offset: String): StreamingContext = {
println(s" createContext \n " +
s" calbrokersDate : ${brokers} \n " +
s" topics : ${topics} \n " +
s" batchseconds : ${batchseconds} \n " +
s" checkpointDirectory : ${checkpointDirectory} \n " +
s" DataType : ${DataType} \n " +
s" calDate : ${calDate} \n " +
s" offset : ${offset} ")
val sparkConf = new SparkConf().setAppName("ArkStreamApp")
.set("spark.streaming.backpressure.enabled","true")
.set("spark.streaming.kafka.maxRatePerPartition","5")
val ssc = new StreamingContext(sparkConf, Seconds(batchseconds))
ssc.checkpoint(checkpointDirectory)
val topicsSet = topics.split(",").toSet
var kafkaParams = Map[String, String](
"metadata.broker.list" -> brokers
, "auto.offset.reset" -> "largest"
)
if ("smallest".equals(offset)) kafkaParams = kafkaParams.updated("auto.offset.reset", "smallest")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
var offsetRanges = Array.empty[OffsetRange]
messages.transform(r => {
offsetRanges = r.asInstanceOf[HasOffsetRanges].offsetRanges
for (o <- offsetRanges) {
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
r
}).count().print()
// val dataInfoStream = messages.flatMap(
// record => ParseUtils.parseArray(record._2, DataType)
// ).filter(record => record.dataType.equals(MetaMapInfo.PINFO)) //只处理 pinfo数据
// .map(record => {
// var mapData = record.dataMap
// mapData = ParseUtils.eventTimeProcess(mapData) //时间处理
// mapData = ParseUtils.appIDProcess(mapData) // appID 处理
// mapData = ParseUtils.sourceProcess(mapData) //来源类型处理
// mapData
// }).filter(dataMap =>
// (dataMap.contains(MetaMapInfo.H5MAP("ApplicationKey"))
// && dataMap(MetaMapInfo.H5MAP("ApplicationKey")) != null
// && !(dataMap(MetaMapInfo.H5MAP("ApplicationKey")).isEmpty)
// && dataMap("APPID") != null
// && !dataMap("APPID").isEmpty
// && dataMap("APPID") != "-1"
// && dataMap("APPID").equals("100024")
// && (!(dataMap("eventTime").isEmpty))
// && dataMap("eventTime").contains(calDate)))
//dataInfoStream.foreachRDD( r => r.take(100).foreach( record => println(s" dataInfoStream2 ${record("eventTime")} ")))
// dataInfoStream.count().print
// business type:
// 一、计算业务 UV 类, COUNT(DISTINCT TEMPID)
//val UV = "UV"
//val REGION_PROVINCE = "PROVINCE"
//val REGION_CITY = "CITY"
//val DEVICE_TYPE = "DEVICE-TYPE"
//DstreamUVProcess.uvDStreamProcess(dataInfoStream)
//DstreamUVProcess.uvDStreamProcessDim(dataInfoStream)
// 二、这个就是一个统计量
//val PAGE_COUNT = "PAGE-COUNT"
//DstreamPagePvProcess.pagePVDStreamProcess(dataInfoStream)
// 三、 着陆页 相关统计
//val SOURCE_TYPE = "SOURCE-TYPE"
//val SOURCE_AD = "SOURCE-AD"
//val SOURCE_KEYWORD = "SOURCE-KEYWORD"
//val LOAD_PAGE_INCOUNT = "LOAD-PAGE-INCOUNT"
//DstreamLoadPageProcess.loadPageDStreamProcess(dataInfoStream)
ssc
}
}
分享到:
相关推荐
有个别同学反馈上一次传的有部分资源没有完全本地化,所以今天特地重新编译了一版重新上传,再此也和之前下载的同学说声抱歉。希望对大家有用。
这个工具使得管理员能够更方便地管理和维护Kafka集群,特别是对于offset值的修改,这是一个非常重要的特性,因为offset是Kafka消费者跟踪消息位置的关键。 首先,了解Kafka的基本概念。Kafka是一个分布式流处理平台...
包含Mac 和 windows版本, 可以连接kafka,非常方便的查看topic、consumer、consumer-group 等信息。 1、首先在Properties页签下填写好 zookeeper 地址和端口 2、再从 Advanced页签下填写 broker地址和端口
Offset Explorer(原名Kafka Tool)是一款用于管理和使用Apache Kafka集群的图形用户界面(GUI)应用程序。它为用户提供了直观的UI界面,方便快速查看Kafka集群中的对象以及集群主题中存储的消息。 - 最新版本的...
《Kafka Tool Offset Explorer 2.2:洞察Kafka数据流的神器》 在大数据处理领域,Apache Kafka作为一款高效、可扩展的消息中间件,扮演着至关重要的角色。而Kafka Tool Offset Explorer 2.2则是一款专为Kafka设计的...
kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset ...
而"offset kafka监控工具"则是针对Kafka集群进行管理和监控的重要辅助工具,它允许用户查看和管理Kafka主题中的消费偏移量,这对于理解和调试生产者和消费者的同步状态至关重要。 "offset explore"是这类工具的典型...
kafkatool 国内可能下载不了,搞32 64位的都压缩在一起了Kafka Tool 2.0.7( 32\64) To download the Kafka UI Tool for your operating system, use the links below. All versions of Kafka Tool come with a ...
【Kafka监控工具KafkaOffsetMonitor详解】 KafkaOffsetMonitor是一款强大的开源监控工具,专为Apache Kafka设计,用于实时监控和分析Kafka集群中的消费者偏移量。它可以帮助管理员跟踪消费者的消费进度,确保数据的...
kettle7.1版本整合kafka,kafka插件包含生产者、消费者。直接在kettle安装目录plugins下创建steps目录,并解压下载文件到kettle/plugins/steps目录。具体可查看我博文。
### Apache Flink 如何管理 Kafka 消费者 Offsets #### 一、Flink与Kafka结合实现Checkpointing 在探讨Flink如何管理和利用Kafka消费者Offsets的过程中,首先要理解Flink与Kafka如何共同实现检查点(Checkpointing...
在Linux环境中运行Kafka Offset Monitor,你需要下载`KafkaOffsetMonitor-assembly-0.2.0.jar`文件,这是一个包含所有依赖的可执行JAR包。运行监控工具的命令通常如下: ```bash java -jar KafkaOffsetMonitor-...
kafka-0.9.0-重置offset-ResetOff.java;
《Kafka Offset Monitor 深度解析》 在大数据处理领域,Apache Kafka 是一个不可或缺的实时数据流平台,它提供高吞吐量、低延迟的消息传递能力。在 Kafka 的使用过程中,监控消费者组的消费进度至关重要,这有助于...
Spring Boot 中如何实现 Kafka 指定 Offset 消费 在 Spring Boot 中实现 Kafka 指定 Offset 消费是非常重要的,特别是在生产环境中,需要重新消费某个 Offset 的数据时。下面我们将详细介绍如何在 Spring Boot 中...
4. **Kafka Offset管理**:Offset是消费者在分区中位置的标识,记录了消费者已经读取到哪条消息。消费者可以保存自己的offset到Kafka的__consumer_offsets主题,以便恢复时从上次的位置继续消费。 5. **Kafka ...
《Kafka-Manager 1.3.3.22:解决Unknown offset schema version 3异常详解》 在大数据处理领域,Apache Kafka作为一个高效、可扩展的实时数据流平台,广泛应用于消息传递和数据集成。然而,在实际操作中,用户可能...
**Kafka Offset Monitor 0.4.6:监控与管理Kafka消费进度的利器** Kafka Offset Monitor是一款用于监控Apache Kafka消费者组消费进度的工具,它可以帮助管理员和开发者实时了解Kafka集群中各个消费者组的offset状态...
Created function tryParseOffsetMessage to attempt to parse a kafka offset message retrieved from the internal committed offset topic: Handles messages of other types and questionable correctness. ...
7. **Kafka Offset管理** 消费者可以通过`$consumer->offsetStore()`方法保存当前消费的位置,以便在程序重启后从上次离开的地方继续消费。 8. **监控与调试** 可以通过设置`debug`配置选项来输出详细的调试...