`
1028826685
  • 浏览: 942523 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类

Kafka Offset Storage

 
阅读更多

1.概述

  目前,Kafka 官网最新版[0.10.1.1],已默认将消费的 offset 迁入到了 Kafka 一个名为 __consumer_offsets 的Topic中。其实,早在 0.8.2.2 版本,已支持存入消费的 offset 到Topic中,只是那时候默认是将消费的 offset 存放在 Zookeeper 集群中。那现在,官方默认将消费的offset存储在 Kafka 的Topic中,同时,也保留了存储在 Zookeeper 的接口,通过 offsets.storage 属性来进行设置。

2.内容

  其实,官方这样推荐,也是有其道理的。之前版本,Kafka其实存在一个比较大的隐患,就是利用 Zookeeper 来存储记录每个消费者/组的消费进度。虽然,在使用过程当中,JVM帮助我们完成了自一些优化,但是消费者需要频繁的去与 Zookeeper 进行交互,而利用ZKClient的API操作Zookeeper频繁的Write其本身就是一个比较低效的Action,对于后期水平扩展也是一个比较头疼的问题。如果期间 Zookeeper 集群发生变化,那 Kafka 集群的吞吐量也跟着受影响。

  在此之后,官方其实很早就提出了迁移到 Kafka 的概念,只是,之前是一直默认存储在 Zookeeper集群中,需要手动的设置,如果,对 Kafka 的使用不是很熟悉的话,一般我们就接受了默认的存储(即:存在 ZK 中)。在新版 Kafka 以及之后的版本,Kafka 消费的offset都会默认存放在 Kafka 集群中的一个叫 __consumer_offsets 的topic中。

  当然,其实她实现的原理也让我们很熟悉,利用 Kafka 自身的 Topic,以消费的Group,Topic,以及Partition做为组合 Key。所有的消费offset都提交写入到上述的Topic中。因为这部分消息是非常重要,以至于是不能容忍丢数据的,所以消息的 acking 级别设置为了 -1,生产者等到所有的 ISR 都收到消息后才会得到 ack(数据安全性极好,当然,其速度会有所影响)。所以 Kafka 又在内存中维护了一个关于 Group,Topic 和 Partition 的三元组来维护最新的 offset 信息,消费者获取最新的offset的时候会直接从内存中获取。

3.实现

  那我们如何实现获取这部分消费的 offset,我们可以在内存中定义一个Map集合,来维护消费中所捕捉到 offset,如下所示:

protected static Map<GroupTopicPartition, OffsetAndMetadata> offsetMap = new ConcurrentHashMap<>();

  然后,我们通过一个监听线程来更新内存中的Map,代码如下所示:

复制代码
private static synchronized void startOffsetListener(ConsumerConnector consumerConnector) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(consumerOffsetTopic, new Integer(1));
        KafkaStream<byte[], byte[]> offsetMsgStream = consumerConnector.createMessageStreams(topicCountMap).get(consumerOffsetTopic).get(0);

        ConsumerIterator<byte[], byte[]> it = offsetMsgStream.iterator();
        while (true) {
            MessageAndMetadata<byte[], byte[]> offsetMsg = it.next();
            if (ByteBuffer.wrap(offsetMsg.key()).getShort() < 2) {
                try {
                    GroupTopicPartition commitKey = readMessageKey(ByteBuffer.wrap(offsetMsg.key()));
                    if (offsetMsg.message() == null) {
                        continue;
                    }
                    OffsetAndMetadata commitValue = readMessageValue(ByteBuffer.wrap(offsetMsg.message()));
                    offsetMap.put(commitKey, commitValue);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
复制代码

  在拿到这部分更新后的offset数据,我们可以通过 RPC 将这部分数据共享出去,让客户端获取这部分数据并可视化。RPC 接口如下所示:

复制代码
namespace java org.smartloli.kafka.eagle.ipc

service KafkaOffsetServer{
    string query(1:string group,2:string topic,3:i32 partition),
    string getOffset(),
    string sql(1:string sql),
    string getConsumer(),
    string getActiverConsumer()
}
复制代码

  这里,如果我们不想写接口来操作 offset,可以通过 SQL 来操作消费的 offset 数组,使用方式如下所示:

  • 引入依赖JAR
<dependency>
    <groupId>org.smartloli</groupId>
    <artifactId>jsql-client</artifactId>
    <version>1.0.0</version>
</dependency>
  • 使用接口
JSqlUtils.query(tabSchema, tableName, dataSets, sql);

  tabSchema:表结构;tableName:表名;dataSets:数据集;sql:操作的SQL语句。

4.预览

  消费者预览如下图所示:

  正在消费的关系图如下所示:

  消费详细 offset 如下所示:

  消费和生产的速率图,如下所示:

5.总结

  这里,说明一下,当 offset 存入到 Kafka 的topic中后,消费线程ID信息并没有记录,不过,我们通过阅读Kafka消费线程ID的组成规则后,可以手动生成,其消费线程ID由:Group+ConsumerLocalAddress+Timespan+UUID(8bit)+PartitionId,由于消费者在其他节点,我们暂时无法确定ConsumerLocalAddress。最后,欢迎大家使用 Kafka 集群监控 ——[ Kafka Eagle ],[ 操作手册 ]。

 

转载:http://www.cnblogs.com/smartloli/p/6266453.html

分享到:
评论

相关推荐

    kafka-manager1.3.0.8 可修改offset值

    这个工具使得管理员能够更方便地管理和维护Kafka集群,特别是对于offset值的修改,这是一个非常重要的特性,因为offset是Kafka消费者跟踪消息位置的关键。 首先,了解Kafka的基本概念。Kafka是一个分布式流处理平台...

    Mac和Windows版本Kafka可视化工具kafkatool Offset Explorer

    包含Mac 和 windows版本, 可以连接kafka,非常方便的查看topic、consumer、consumer-group 等信息。 1、首先在Properties页签下填写好 zookeeper 地址和端口 2、再从 Advanced页签下填写 broker地址和端口

    Kafka Offset Monitor 资源本地化编译版

    有个别同学反馈上一次传的有部分资源没有完全本地化,所以今天特地重新编译了一版重新上传,再此也和之前下载的同学说声抱歉。希望对大家有用。

    kafka客户端offset

    kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset kafka客户端offset ...

    kafka tool offset explorer 2.2

    《Kafka Tool Offset Explorer 2.2:洞察Kafka数据流的神器》 在大数据处理领域,Apache Kafka作为一款高效、可扩展的消息中间件,扮演着至关重要的角色。而Kafka Tool Offset Explorer 2.2则是一款专为Kafka设计的...

    offset kafka监控工具 免费

    而"offset kafka监控工具"则是针对Kafka集群进行管理和监控的重要辅助工具,它允许用户查看和管理Kafka主题中的消费偏移量,这对于理解和调试生产者和消费者的同步状态至关重要。 "offset explore"是这类工具的典型...

    Offset Explorer(原名Kafka Tool)3.1 版本-Windows64位版本

    Offset Explorer(原名Kafka Tool)是一款用于管理和使用Apache Kafka集群的图形用户界面(GUI)应用程序。它为用户提供了直观的UI界面,方便快速查看Kafka集群中的对象以及集群主题中存储的消息。 - 最新版本的...

    kafka-0.9.0-重置offset-ResetOff.java

    kafka-0.9.0-重置offset-ResetOff.java;

    kafka-manager最新编译版1.3.3.22,解决了异常Unknown offset schema version 3

    《Kafka-Manager 1.3.3.22:解决Unknown offset schema version 3异常详解》 在大数据处理领域,Apache Kafka作为一个高效、可扩展的实时数据流平台,广泛应用于消息传递和数据集成。然而,在实际操作中,用户可能...

    Kafka可视化工具offsetExplore2

    offsetExplore2 实际上是 Kafka 的一个工具,用于管理和监控 Apache Kafka 中的偏移量(offset)。在 Kafka 中,偏移量是用来标识消费者在一个特定分区中的位置的标识符,它可以用来记录消费者消费消息的进度。 ...

    kafka-offset-monitor 0.4.6 内外可以访问页面

    KafkaOffsetMonitor 最新版本 0.4.6 , google js 替换已替换 内外可以访问页面

    kafka、zookeeper、offset Explorer安装包

    共有3个安装包: kafka:kafka_2.12-2.8.0.tgz zookeeper:apache-zookeeper-3.7.0-bin.tar.gz kafka可视化工具:offsetexplorer_64bit.exe

    Apache Flink如何管理Kafka消费者offsets

    ### Apache Flink 如何管理 Kafka 消费者 Offsets #### 一、Flink与Kafka结合实现Checkpointing 在探讨Flink如何管理和利用Kafka消费者Offsets的过程中,首先要理解Flink与Kafka如何共同实现检查点(Checkpointing...

    springboot中如何实现kafa指定offset消费

    Spring Boot 中如何实现 Kafka 指定 Offset 消费 在 Spring Boot 中实现 Kafka 指定 Offset 消费是非常重要的,特别是在生产环境中,需要重新消费某个 Offset 的数据时。下面我们将详细介绍如何在 Spring Boot 中...

    Kafka Tools 3.0,现改名 offsetexplorer

    Kafka Tools 3.0 支持 JAAS config 配置 支持最新版本Kafka 3.7

    Spring Boot集群管理工具KafkaAdminClient使用方法解析

    Spring Boot 集群管理工具 KafkaAdminClient 使用方法解析 KafkaAdminClient 是 Spring Boot 集群管理工具中的一部分,主要用于管理和检视 Kafka 集群中的Topic、Broker、ACL 等对象。下面将详细介绍 Kafka...

    kettle整合kafka生产者消费者插件

    kettle7.1版本整合kafka,kafka插件包含生产者、消费者。直接在kettle安装目录plugins下创建steps目录,并解压下载文件到kettle/plugins/steps目录。具体可查看我博文。

    kafkatool 连接kafka工具

    - **消费者组重置**: `offset-reset` 命令可以帮助你将消费者组的偏移量重置到特定位置,如最早的记录或最新的记录。 **5. 数据操作** - **生产消息**: `produce` 命令可以向指定主题发送消息,这对于测试和调试...

    kafka可视化工具--kafkatool

    4. **查看与管理Offsets**:Kafkatool允许用户查看每个partition的最小和最大offset,以及consumer group的当前offset。用户还可以手动设置或重置offset,这对于调试和测试是非常有用的。 5. **创建与删除Topic**:...

    kafka-connect-storage-cloud:用于云存储的Kafka Connect连接器套件套件(当前包括Amazon S3)

    kafka-connect-storage-cloud是一套旨在用于在Kafka和公共云存储(例如Amazon S3)之间复制数据。 下面列出了当前可用的连接器: 适用于Amazon Simple Storage Service(S3)的Kafka Connect接收器连接器 可以在...

Global site tag (gtag.js) - Google Analytics