1.概述
目前,Kafka 官网最新版[0.10.1.1],已默认将消费的 offset 迁入到了 Kafka 一个名为 __consumer_offsets 的Topic中。其实,早在 0.8.2.2 版本,已支持存入消费的 offset 到Topic中,只是那时候默认是将消费的 offset 存放在 Zookeeper 集群中。那现在,官方默认将消费的offset存储在 Kafka 的Topic中,同时,也保留了存储在 Zookeeper 的接口,通过 offsets.storage 属性来进行设置。
2.内容
其实,官方这样推荐,也是有其道理的。之前版本,Kafka其实存在一个比较大的隐患,就是利用 Zookeeper 来存储记录每个消费者/组的消费进度。虽然,在使用过程当中,JVM帮助我们完成了自一些优化,但是消费者需要频繁的去与 Zookeeper 进行交互,而利用ZKClient的API操作Zookeeper频繁的Write其本身就是一个比较低效的Action,对于后期水平扩展也是一个比较头疼的问题。如果期间 Zookeeper 集群发生变化,那 Kafka 集群的吞吐量也跟着受影响。
在此之后,官方其实很早就提出了迁移到 Kafka 的概念,只是,之前是一直默认存储在 Zookeeper集群中,需要手动的设置,如果,对 Kafka 的使用不是很熟悉的话,一般我们就接受了默认的存储(即:存在 ZK 中)。在新版 Kafka 以及之后的版本,Kafka 消费的offset都会默认存放在 Kafka 集群中的一个叫 __consumer_offsets 的topic中。
当然,其实她实现的原理也让我们很熟悉,利用 Kafka 自身的 Topic,以消费的Group,Topic,以及Partition做为组合 Key。所有的消费offset都提交写入到上述的Topic中。因为这部分消息是非常重要,以至于是不能容忍丢数据的,所以消息的 acking 级别设置为了 -1,生产者等到所有的 ISR 都收到消息后才会得到 ack(数据安全性极好,当然,其速度会有所影响)。所以 Kafka 又在内存中维护了一个关于 Group,Topic 和 Partition 的三元组来维护最新的 offset 信息,消费者获取最新的offset的时候会直接从内存中获取。
3.实现
那我们如何实现获取这部分消费的 offset,我们可以在内存中定义一个Map集合,来维护消费中所捕捉到 offset,如下所示:
protected static Map<GroupTopicPartition, OffsetAndMetadata> offsetMap = new ConcurrentHashMap<>();
然后,我们通过一个监听线程来更新内存中的Map,代码如下所示:
private static synchronized void startOffsetListener(ConsumerConnector consumerConnector) { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(consumerOffsetTopic, new Integer(1)); KafkaStream<byte[], byte[]> offsetMsgStream = consumerConnector.createMessageStreams(topicCountMap).get(consumerOffsetTopic).get(0); ConsumerIterator<byte[], byte[]> it = offsetMsgStream.iterator(); while (true) { MessageAndMetadata<byte[], byte[]> offsetMsg = it.next(); if (ByteBuffer.wrap(offsetMsg.key()).getShort() < 2) { try { GroupTopicPartition commitKey = readMessageKey(ByteBuffer.wrap(offsetMsg.key())); if (offsetMsg.message() == null) { continue; } OffsetAndMetadata commitValue = readMessageValue(ByteBuffer.wrap(offsetMsg.message())); offsetMap.put(commitKey, commitValue); } catch (Exception e) { e.printStackTrace(); } } } }
在拿到这部分更新后的offset数据,我们可以通过 RPC 将这部分数据共享出去,让客户端获取这部分数据并可视化。RPC 接口如下所示:
namespace java org.smartloli.kafka.eagle.ipc service KafkaOffsetServer{ string query(1:string group,2:string topic,3:i32 partition), string getOffset(), string sql(1:string sql), string getConsumer(), string getActiverConsumer() }
这里,如果我们不想写接口来操作 offset,可以通过 SQL 来操作消费的 offset 数组,使用方式如下所示:
- 引入依赖JAR
<dependency> <groupId>org.smartloli</groupId> <artifactId>jsql-client</artifactId> <version>1.0.0</version> </dependency>
- 使用接口
JSqlUtils.query(tabSchema, tableName, dataSets, sql);
tabSchema:表结构;tableName:表名;dataSets:数据集;sql:操作的SQL语句。
4.预览
消费者预览如下图所示:
正在消费的关系图如下所示:
消费详细 offset 如下所示:
消费和生产的速率图,如下所示:
5.总结
这里,说明一下,当 offset 存入到 Kafka 的topic中后,消费线程ID信息并没有记录,不过,我们通过阅读Kafka消费线程ID的组成规则后,可以手动生成,其消费线程ID由:Group+ConsumerLocalAddress+Timespan+UUID(8bit)+PartitionId,由于消费者在其他节点,我们暂时无法确定ConsumerLocalAddress。最后,欢迎大家使用 Kafka 集群监控 ——[ Kafka Eagle ],[ 操作手册 ]。
转载:http://www.cnblogs.com/smartloli/p/6266453.html
相关推荐
这个工具使得管理员能够更方便地管理和维护Kafka集群,特别是对于offset值的修改,这是一个非常重要的特性,因为offset是Kafka消费者跟踪消息位置的关键。 首先,了解Kafka的基本概念。Kafka是一个分布式流处理平台...
包含Mac 和 windows版本, 可以连接kafka,非常方便的查看topic、consumer、consumer-group 等信息。 1、首先在Properties页签下填写好 zookeeper 地址和端口 2、再从 Advanced页签下填写 broker地址和端口
有个别同学反馈上一次传的有部分资源没有完全本地化,所以今天特地重新编译了一版重新上传,再此也和之前下载的同学说声抱歉。希望对大家有用。
kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset ...
《Kafka Tool Offset Explorer 2.2:洞察Kafka数据流的神器》 在大数据处理领域,Apache Kafka作为一款高效、可扩展的消息中间件,扮演着至关重要的角色。而Kafka Tool Offset Explorer 2.2则是一款专为Kafka设计的...
而"offset kafka监控工具"则是针对Kafka集群进行管理和监控的重要辅助工具,它允许用户查看和管理Kafka主题中的消费偏移量,这对于理解和调试生产者和消费者的同步状态至关重要。 "offset explore"是这类工具的典型...
Offset Explorer(原名Kafka Tool)是一款用于管理和使用Apache Kafka集群的图形用户界面(GUI)应用程序。它为用户提供了直观的UI界面,方便快速查看Kafka集群中的对象以及集群主题中存储的消息。 - 最新版本的...
kafka-0.9.0-重置offset-ResetOff.java;
《Kafka-Manager 1.3.3.22:解决Unknown offset schema version 3异常详解》 在大数据处理领域,Apache Kafka作为一个高效、可扩展的实时数据流平台,广泛应用于消息传递和数据集成。然而,在实际操作中,用户可能...
offsetExplore2 实际上是 Kafka 的一个工具,用于管理和监控 Apache Kafka 中的偏移量(offset)。在 Kafka 中,偏移量是用来标识消费者在一个特定分区中的位置的标识符,它可以用来记录消费者消费消息的进度。 ...
KafkaOffsetMonitor 最新版本 0.4.6 , google js 替换已替换 内外可以访问页面
共有3个安装包: kafka:kafka_2.12-2.8.0.tgz zookeeper:apache-zookeeper-3.7.0-bin.tar.gz kafka可视化工具:offsetexplorer_64bit.exe
### Apache Flink 如何管理 Kafka 消费者 Offsets #### 一、Flink与Kafka结合实现Checkpointing 在探讨Flink如何管理和利用Kafka消费者Offsets的过程中,首先要理解Flink与Kafka如何共同实现检查点(Checkpointing...
Spring Boot 中如何实现 Kafka 指定 Offset 消费 在 Spring Boot 中实现 Kafka 指定 Offset 消费是非常重要的,特别是在生产环境中,需要重新消费某个 Offset 的数据时。下面我们将详细介绍如何在 Spring Boot 中...
Kafka Tools 3.0 支持 JAAS config 配置 支持最新版本Kafka 3.7
Spring Boot 集群管理工具 KafkaAdminClient 使用方法解析 KafkaAdminClient 是 Spring Boot 集群管理工具中的一部分,主要用于管理和检视 Kafka 集群中的Topic、Broker、ACL 等对象。下面将详细介绍 Kafka...
kettle7.1版本整合kafka,kafka插件包含生产者、消费者。直接在kettle安装目录plugins下创建steps目录,并解压下载文件到kettle/plugins/steps目录。具体可查看我博文。
- **消费者组重置**: `offset-reset` 命令可以帮助你将消费者组的偏移量重置到特定位置,如最早的记录或最新的记录。 **5. 数据操作** - **生产消息**: `produce` 命令可以向指定主题发送消息,这对于测试和调试...
4. **查看与管理Offsets**:Kafkatool允许用户查看每个partition的最小和最大offset,以及consumer group的当前offset。用户还可以手动设置或重置offset,这对于调试和测试是非常有用的。 5. **创建与删除Topic**:...
kafka-connect-storage-cloud是一套旨在用于在Kafka和公共云存储(例如Amazon S3)之间复制数据。 下面列出了当前可用的连接器: 适用于Amazon Simple Storage Service(S3)的Kafka Connect接收器连接器 可以在...