`

kafka TopicConfigManager类

阅读更多

topicconfigManager类

主要流程为

1.监控config/change节点,那个topic的config变化了

2.从zk上的topic的config目录,获取最新config信息

3.更新logmanager里指定topic的tplog(每个topic每个partition对应一个log)配置

  /**
   * 注册config change的listener
   * Begin watching for config changes
   */
  def startup() {
    ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.TopicConfigChangesPath)
    //监听/config/changes的子节点,ConfigChangeListener
    zkClient.subscribeChildChanges(ZkUtils.TopicConfigChangesPath, ConfigChangeListener)
    //启动服务,检查是否有topic的config需要更新,使用跟ConfigChangeListener相同的方法processConfigChanges
    processAllConfigChanges()
  }

主要方法processConfigChanges

 

/**
   * change config topic需要
   * 1.设置zk上的topic config;
   * 2.在zk上添加一个notification,标志哪个topic的config被改变
   * Process the given list of config changes
   */
  private def processConfigChanges(notifications: Seq[String]) {
    if (notifications.size > 0) {
      info("Processing config change notification(s)...")
      val now = time.milliseconds
      val logs = logManager.logsByTopicPartition.toBuffer
      //group by topic,Buffer[Log]                         buffer._2 := Log
      val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2))
      for (notification <- notifications) {
        val changeId = changeNumber(notification)
        if (changeId > lastExecutedChange) {//changeid是比现在新的
          val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification
          val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, changeZnode)
          if(jsonOpt.isDefined) {
            val json = jsonOpt.get
            val topic = json.substring(1, json.length - 1) // hacky way to dequote
            if (logsByTopic.contains(topic)) {
              /* combine the default properties with the overrides in zk to create the new LogConfig */
              val props = new Properties(logManager.defaultConfig.toProps)
              props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic))//获得最新topic config和default prop的合并值
              val logConfig = LogConfig.fromProps(props)
              for (log <- logsByTopic(topic))//获得当前logmanager对象中所有这个topic的log对象
                log.config = logConfig
              info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props))
              purgeObsoleteNotifications(now, notifications)
            }
          }
          lastExecutedChange = changeId
        }
      }
    }
  }

 

 

 

0
1
分享到:
评论

相关推荐

    java_kafka交互工具类.rar

    本资源"java_kafka交互工具类.rar"提供了一套Java集成Kafka的工具类,帮助开发者便捷地进行单机或集群环境下的Kafka操作。以下将详细介绍其中可能包含的关键知识点: 1. **Kafka基本概念** - **主题(Topic)**:...

    SpringBoot整合Kafka配置类方式

    SpringBoot整合Kafka配置类方式

    kafkatool 连接kafka工具

    **Kafka Tool 连接 Kafka 工具详解** 在大数据处理和实时流处理领域,Apache Kafka 是一个不可或缺的组件,它作为一个分布式的消息中间件,提供高效、可扩展且可靠的发布订阅服务。为了方便管理和操作 Kafka 集群,...

    Kafka管理工具Kafka Tool

    **Kafka Tool:高效管理Apache Kafka集群的利器** Apache Kafka是一个分布式的流处理平台,广泛应用于大数据实时处理、日志聚合、消息系统等多个领域。在Kafka的实际操作中,管理和监控集群是至关重要的任务,而...

    已编译的kafka manager

    Kafka Manager是一款强大的开源工具,专门用于管理Apache Kafka集群。这款工具由Yahoo!开发,旨在提供一个用户友好的界面,使Kafka集群的管理和监控变得更加简单。在本压缩包中,你得到了预编译的Kafka Manager版本...

    springboot 基于spring-kafka动态创建kafka消费者

    3. **创建消费者配置类**:创建一个配置类,使用`@Configuration`和`@EnableKafka`注解启用Kafka消费者。这里可以定义消费者的属性,如key和value的序列化方式。 ```java @Configuration @EnableKafka public class...

    kafka可视化工具--kafkatool

    **Kafka工具详解——Kafkatool** Kafka作为一个分布式流处理平台,广泛应用于大数据实时处理和消息传递。然而,管理Kafka集群和操作其组件(如topics、partitions、offsets等)可能会变得复杂,这时就需要一些可视...

    kafka-java-demo 基于java的kafka生产消费者示例

    在Java中,我们可以创建一个Producer实例,配置相关的生产者属性(如bootstrap servers、key-value序列化类等),然后使用`send()`方法将消息发送到指定的主题。 【Kafka Consumer】 Kafka消费者则用于订阅和消费...

    kafka2种工具 kafkatool-64bit.exe kafka-eagle-bin-1.4.6.tar.gz

    在IT行业中,Kafka是一种广泛使用的分布式流处理平台,它由Apache软件基金会开发,主要用于构建实时数据管道和流应用。本文将围绕标题和描述中提到的两种Kafka工具——kafkatool-64bit.exe和kafka-eagle-bin-1.4.6....

    StormStorm集成Kafka 从Kafka中读取数据

    本文将深入探讨如何实现Storm与Kafka的集成,重点在于如何从Kafka中读取数据。 **一、整合说明** Apache Storm是一个开源的分布式实时计算系统,它能够持续处理无限的数据流,确保每个事件都得到精确一次(Exactly...

    kafka安装包-2.13-3.6.2

    **Kafka介绍** Apache Kafka是一款高性能、分布式的消息中间件,由LinkedIn开发并捐献给Apache软件基金会。它最初设计的目标是构建一个实时的数据管道,能够高效地处理大量的数据流,同时支持发布订阅和队列模型,...

    5、kafka监控工具Kafka-Eagle介绍及使用

    Apache Kafka 是一个分布式流处理平台,常用于构建实时的数据管道和应用。Kafka 提供了高吞吐量、低延迟的消息传递能力,是大数据领域中重要的消息队列(MQ)解决方案。Kafka-Eagle 是针对 Kafka 集群设计的一款高效...

    Kafka详细课程讲义

    **Kafka详细课程讲义** 本课程主要涵盖了Apache Kafka的核心概念、安装配置、架构解析、API使用以及监控与面试知识点,旨在帮助学习者全面理解并掌握这一强大的分布式流处理平台。 **第 1 章 Kafka 概述** Apache...

    kafka-python

    2. **创建消费者**:使用`KafkaConsumer`类,可以选择group_id(消费组),设置auto_offset_reset(自动偏移重置策略)等参数。消费者可以通过`assign()`方法手动指定要消费的分区,或通过`subscribe()`方法订阅主题...

    Kafka Tool linux版本,适用于kafka0.11及以上

    **Kafka Tool for Linux: 管理与使用Apache Kafka集群的高效工具** Apache Kafka是一款分布式流处理平台,常用于构建实时数据管道和流应用。Kafka Tool是针对Kafka集群进行管理和操作的一款图形用户界面(GUI)工具...

    Kafka the Definitive Guide 2nd Edition

    Kafka the Definitive Guide Kafka 是一个分布式流媒体平台,用于构建实时数据处理和流媒体处理系统。下面是 Kafka 的一些重要知识点: 1. Kafka 概述 Kafka 是一个基于发布/订阅模式的消息队列系统,由 LinkedIn...

    Kafka技术内幕:图文详解Kafka源码设计与实现+书签.pdf+源码

    《Kafka技术内幕:图文详解Kafka源码设计与实现》是一本深入解析Apache Kafka的专著,旨在帮助读者理解Kafka的核心设计理念、内部机制以及源码实现。这本书结合图文并茂的方式,使得复杂的概念变得更为易懂。同时,...

    Kafka尚硅谷.rar

    **Kafka概述** Kafka是由LinkedIn开发并贡献给Apache软件基金会的一个开源消息系统,它是一个高性能、可扩展的分布式消息中间件。Kafka最初设计的目标是处理网站活动流数据,但随着时间的发展,它已被广泛应用于...

    kafka连接工具客户端.rar

    2. **Kafka可视化工具**:压缩包中的“kafka工具”可能包含了像Kafka Manager、Confluent Control Center或者Kafka Tool之类的可视化工具。例如,Kafka Manager提供了一个web界面,可以清晰地展示Kafka集群的状态,...

    kafka封装 订阅和发布类

    在这个“kafka封装 订阅和发布类”的主题中,我们将深入探讨如何对Kafka的订阅和发布功能进行简单的封装,以便更方便地处理数据。 首先,我们需要理解Kafka的核心概念:生产者(Producer)和消费者(Consumer)。...

Global site tag (gtag.js) - Google Analytics