`
kane_xie
  • 浏览: 144571 次
社区版块
存档分类
最新评论

kafka实时监控

阅读更多

在kafka的开发和维护中,我们经常需要了解kafka topic以及连接在其上的consumer的实时信息,比如logsize,offset,owner等。为此kafka提供了ConsumerOffsetChecker,它的用法很简单

 

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group <group>

 

输出结果类似于

 

Group Topic Pid Offset logSize Lag Owner
Group1 a.topic 0 2 2 0 none
Group1 a.topic 1 0 0 0 none
Group1 a.topic 2 2 2 0 none

 

我们也可以通过kafka web console一类的工具直观地获取kafka信息,但如果我们要构建自己的监控系统,需要抓取这些信息的话,有两种办法:一种是运行ConsumerOffsetChecker然后解析输出的字符串;另一种就是通过SimpleConsumer和Zookeeper实时抓取信息(换句话说就是把ConsumerOffsetChecker翻译一下:)),以下介绍第二种方法的思路。

 

首先我们看kafka信息在zookeeper的存储结构

 

1,/brokers/topics/[topic]/partitions/[partitionId]/state
2,/brokers/ids/[0...N]
3,/consumers/[groupId]/ids/[consumerId]
4,/consumers/[groupId]/owners/[topic]/[partitionId]
5,/consumers/[groupId]/offsets/[topic]/[partitionId]

 

对于指定的topic和groupid,通过(1)可以拿到所有的partition信息(Pid),然后通过(5)可以拿到offset,通过(4)可以拿到owner。就差logsize还没法拿到,事实上logsize在zookeeper中并没有记录,它必须通过kafka consumer的low level的api取得。

 

 

 

private Long getLogSize(String topicName, String partitionId, long offsetRequestTime) {
	SimpleConsumer consumer = new SimpleConsumer(server, port, 10000, 1024000, "ConsumerOffsetChecker");
	Long logSize = null;
	TopicAndPartition topicAndPartition = new TopicAndPartition(topicName, Integer.parseInt(partitionId));
	Map<TopicAndPartition, PartitionOffsetRequestInfo> map = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
	map.put(topicAndPartition, new PartitionOffsetRequestInfo(offsetRequestTime, 1));
	OffsetRequest request = new OffsetRequest(map, kafka.api.OffsetRequest.CurrentVersion(), kafka.api.OffsetRequest.DefaultClientId());
	OffsetResponse response = consumer.getOffsetsBefore(request);
	long[] aa = response.offsets(topicName, Integer.parseInt(partitionId));
	if (aa.length != 0) {
		logSize = aa[0];
	}
	return logSize;
}

 

第二行,创建simple consumer

第四行,创建topic和partition对象

第六行,创建offset request,第一个参数是记录写入的时间,如果是kafka.api.OffsetRequest.EarliestTime(),则代表当前最早的一条记录,也就是当前最小offset;如果是kafka.api.OffsetRequest.LatestTime(),则代表最新的一条记录,也就是当前最大offset。第二个参数是获取offset的个数。

 

由max offset和current offset,我们可以获得当前还有多少消息没有被消费(lag),由(lag/(maxoffset-minoffset)),我们可以算出当前还没有被消费的消息占的百分比,如果这个百分比接近100%,那么接下来很可能会导致offset out of range exception而丢失数据。

2
1
分享到:
评论

相关推荐

    5、kafka监控工具Kafka-Eagle介绍及使用

    Apache Kafka 是一个分布式流处理平台,常用于构建实时的数据管道和应用。Kafka 提供了高吞吐量、低延迟的消息传递能力,是大数据领域中重要的消息队列(MQ)解决方案。Kafka-Eagle 是针对 Kafka 集群设计的一款高效...

    kafka-manager监控管理kafka工具

    3. **消费者监控**:它提供了实时的消费者组监控,包括消费进度、滞后和延迟情况,这对于优化消费者性能和排查问题至关重要。 4. **配置调整**:在不重启服务的情况下,Kafka-Manager允许你动态调整Kafka和...

    storm-kafka实时趋势分析

    3. **监控与日志**:集成监控工具(如Grafana、Prometheus)和日志系统(如Logstash、ELK Stack),实时监控系统性能和错误,及时发现问题并进行优化。 总的来说,"storm-kafka实时趋势分析"是一种强大的实时数据...

    kafka监控工具

    本文将详细探讨“kafka监控工具”,特别是通过Windows客户端连接Zookeeper来获取Kafka运行状态的方法。 Zookeeper是Apache Hadoop的一个子项目,它是一个分布式的、开放源码的协调服务,为分布式应用提供一致性服务...

    Kafka监控工具.zip

    在大数据处理领域,Apache Kafka是一款广泛应用的分布式流处理平台,它能够高效地处理大量实时数据。Kafka的核心功能包括发布订阅消息系统、数据管道以及数据存储。为了更好地管理和监控Kafka集群,有两个重要的工具...

    Windows环境下kafka监控工具之kafkaOffsetMonitor的部署

    资源介绍了kafka的监控工具-kafkaOffsetMonitor在windodws环境下的部署,以kafkaOffsetMonitor_0.2.1版本举例说明了该工具在windows环境下如何部署及对kafka参数进行监控。

    kafka消息监控(linux运行_window查看)

    这个工具通常用于实时查看消息的消费进度、监控延迟以及诊断可能的问题。 在Linux环境中运行Kafka Offset Monitor,你需要下载`KafkaOffsetMonitor-assembly-0.2.0.jar`文件,这是一个包含所有依赖的可执行JAR包。...

    kafka监控工具KafkaOffsetMnitor angularjs和css

    【Kafka监控工具KafkaOffsetMonitor详解】 KafkaOffsetMonitor是一款强大的开源监控工具,专为Apache Kafka设计,用于实时监控和分析Kafka集群中的消费者偏移量。它可以帮助管理员跟踪消费者的消费进度,确保数据的...

    zabbix监控之kafka模板_zbx_kafka_templates

    Zabbix提供了名为"zbx_kafka_templates"的模板,使得我们可以轻松集成Kafka监控。这个模板包含了各种关键性能指标,如Brokers的CPU利用率、内存使用情况、磁盘I/O、网络流量,以及主题(Topic)级别的消息生产和消费...

    zabbix监控之kafka模板

    zabbix监控之kafka模板

    实测可用kafka监控工具

    标题中的“实测可用kafka监控工具”指的是一个经过实际测试的工具,这个工具能够帮助管理员有效监控Kafka集群的状态,确保系统的稳定运行。这样的工具通常包含以下几个关键功能: 1. **基本信息仪表盘**:这是一个...

    offset kafka监控工具 免费

    总的来说,这款offset kafka监控工具通过提供实时的offset信息和便捷的操作方式,帮助用户更好地管理和维护Kafka集群。无论是为了监控生产环境的健康状况,还是为了开发和测试目的,都能发挥重要的作用。同时,其...

    linux系统的kafka监控

    监控Kafka的主要目的是实时了解系统的健康状况、性能瓶颈和可能的问题,包括但不限于主题分布、消费进度、生产与消费速率、磁盘使用情况、网络I/O等。及时的监控可以帮助我们提前发现并解决问题,避免生产环境中的...

    kafka监控工具kafka-eagle

    **Kafka Eagle** 是一款专为Apache Kafka设计的开源监控和管理工具,旨在提供更为直观、高效的监控解决方案。...尽管2018年的版本可能无法满足最新的需求,但它仍然是了解Kafka监控工具的一个良好起点。

    kafka监控安装包

    Kafka监控是确保分布式消息系统高效、稳定运行的关键环节,Kafka-Eagle是一款专为Kafka设计的可视化管理和监控工具。这款工具提供了丰富的界面展示,包括消费者、主题、 broker、集群的状态信息,以及性能指标等,...

    redhat/centos/linux系统上zookeeper和kafka进程监控脚本

    如下所示,手动指定zookeeper和kafka的bin目录、配置文件以及需要添加定时任务的周期几个参数后,执行当前脚本,脚本会自动添加定时任务并开始监控zookeeper及kafka进程,如果进程不存在则重启并放入后台,存在则...

    Kafka的监控平台jar包

    Kafka的监控平台jar包

    Kafka 监控

    **Kafka监控详解** 在大数据实时处理领域,Apache Kafka是一个不可或缺的组件,它作为一个高吞吐量、分布式的发布订阅消息系统,广泛应用于日志收集、流式数据处理和实时分析等场景。为了确保Kafka集群的稳定运行,...

    kafka集群部署、监控

    zookeeper集群部署,kafka集群部署,kafka介绍,topic创建、删除、kafka监控

    基于 Flume+ Kafka+ Spark Streaming 实现实时监控输出日志的报警系统的 Spark Streaming 程序代码

    基于 Flume+ Kafka+ Spark Streaming 实现实时监控输出日志的报警系统的 Spark Streaming 程序代码。 基于 Flume+Kafka+Spark Streaming 实现实时监控输出日志的报警系统的 Spark Streaming 程序代码,博客链接: ...

Global site tag (gtag.js) - Google Analytics