`

Kafka启动的流程

 
阅读更多

调用KafkaServer的startup方法启动kafka

 /**
   * Start up API for bringing up a single instance of the Kafka server.
   * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
   */
  def startup() {
    try {
      info("starting")
      brokerState.newState(Starting)
      isShuttingDown = new AtomicBoolean(false)
      shutdownLatch = new CountDownLatch(1)

      /* start scheduler */
      kafkaScheduler.startup()
    
      /* setup zookeeper */
      zkClient = initZk()

      /* start log manager */
      logManager = createLogManager(zkClient, brokerState)
      logManager.startup()
      //socket server接收请求,和处理请求
      socketServer = new SocketServer(config.brokerId,
                                      config.hostName,
                                      config.port,
                                      config.numNetworkThreads,
                                      config.queuedMaxRequests,
                                      config.socketSendBufferBytes,
                                      config.socketReceiveBufferBytes,
                                      config.socketRequestMaxBytes,
                                      config.maxConnectionsPerIp,
                                      config.connectionsMaxIdleMs,
                                      config.maxConnectionsPerIpOverrides)
      socketServer.startup()
      //replicat manager,负责写多个副本
      replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)

      /* start offset manager */
      offsetManager = createOffsetManager()
      //kafka controller,相当于master,用于协调管理,kafka的master和broker合在一起,不用单独部署
      kafkaController = new KafkaController(config, zkClient, brokerState)
    
      /* start processing requests */
      apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController)
     //请求处理线程池 
     requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
      brokerState.newState(RunningAsBroker)
   
      Mx4jLoader.maybeLoad()

      replicaManager.startup()

      kafkaController.startup()
      
      //用于监控配置信息的变更
      topicConfigManager = new TopicConfigManager(zkClient, logManager)
      topicConfigManager.startup()
    
      /* tell everyone we are alive */
      kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
      kafkaHealthcheck.startup()

    
      registerStats()
      startupComplete.set(true)
      info("started")
    }
    catch {
      case e: Throwable =>
        fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
        shutdown()
        throw e
    }
  }

 

分享到:
评论

相关推荐

    在linux中搭建kafka集群

    10. 一旦Zookeeper集群成功启动,你就可以继续配置和启动Kafka服务器了,这个过程涉及到创建broker配置,设置broker.id,配置Zookeeper连接,以及启动`kafka-server-start.sh`脚本。 搭建Kafka集群是一个涉及多个...

    Kafka管理工具Kafka Tool

    **Kafka Tool:高效管理...总的来说,Kafka Tool是Kafka管理员和开发者的得力助手,它简化了与Kafka交互的过程,提高了工作效率,是管理复杂Kafka集群不可或缺的工具之一。无论是日常运维还是问题排查,都能从中受益。

    flume-kafka流程

    ### Flume-Kafka集成流程详解 #### 一、Flume与Kafka简介 - **Flume**:Flume是一款高可靠、高性能的日志采集、聚合和传输系统,支持在日志系统中定制各类数据发送方无缝地接入。 - **Kafka**:Kafka是一个分布式...

    kafka-0.8.1.1总结1

    1. **Kafka启动过程** 包括初始化ZooKeeper连接、加载配置、创建Socket服务器等步骤。 2. **日志初始化和清理过程** 日志分区在启动时创建,定期进行清理以保持大小和时间限制。 3. **选举Controller过程** 当...

    kafka可视化工具--kafkatool

    安装完成后,通过命令行界面输入“kafkatool”即可启动工具。 **最佳实践与注意事项**: - 在生产环境中使用Kafkatool时,确保了解操作的影响,避免误操作导致数据丢失。 - 定期更新Kafkatool至最新版本,以获取新...

    kafka集群部署步骤

    - 上述命令同样会在后台启动Kafka服务,并将日志输出到指定的日志文件中。 #### 四、测试Kafka集群 - **创建Topic:** - 使用`kafka-topics.sh`脚本创建名为`test`的Topic: ```bash bin/kafka-topics.sh --...

    OGG_KAFKA问题及解决方法

    KAFKA和OracleGoldenGate安装过程中出现的问题及其解决方法

    sh代码-Shell 脚本方式启动Kafka服务

    总的来说,使用Shell脚本启动Kafka服务是运维工作中常见的实践,它简化了操作流程,提高了效率。通过理解`main.sh`中的逻辑,你可以根据自己的需求定制脚本,适应不同的环境和需求。记住,编写和维护这类脚本时,...

    StormStorm集成Kafka 从Kafka中读取数据

    5. 启动拓扑:提交拓扑到Storm集群,开始从Kafka读取和处理数据。 在处理数据时,Storm会维护一个内部offset(偏移量)来跟踪在Kafka中的位置,保证数据不丢失。`KafkaSpout`会自动处理容错和幂等性,确保在出现...

    kafka一键启停脚本

    5. **监控与反馈**:在启动和停止过程中,脚本可能会监测日志文件,检查Kafka服务是否成功启动或停止,并将结果输出给用户。 6. **故障处理**:如果在启动或停止过程中遇到问题,脚本可能包含错误处理逻辑,如重试...

    kafka资源下载kafka_2.11-2.0.0.tgz

    - 启动Kafka Broker:`$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties`。 - 创建主题并发布消息进行测试,验证安装是否成功。 #### 版本特性 Kafka 2.0.0版本主要引入了以下特性...

    介绍kafka及kafka集群安装

    通过本文档的学习,我们不仅深入了解了 Kafka 的架构原理和使用方法,还掌握了 Kafka 集群的安装部署过程。此外,我们还学习了 Kafka 生产者和消费者的 Java API 使用方法,以及 JMS 规范的相关概念。这些知识对于...

    kafka报文模拟工具

    然而,在开发和测试过程中,有时我们需要模拟Kafka报文的发送,以便验证消费者的处理逻辑或者测试系统的性能。这时,一款名为"Kafka报文模拟工具"的软件就显得尤为重要。 该工具的主要功能是模拟Kafka生产者的行为...

    Kafka简介及使用PHP处理Kafka消息

    1. 启动 zookeeper 和 kafka 2. 创建由 2 个 partition 组成的、名为 testtopic 的 topic 3. 使用 PHP 生产、消费 Kafka 消息 Kafka 是一种高吞吐的分布式消息系统,能够替代传统的消息队列用于解耦合数据处理,...

    kafka-manager-2.0.0.2.zip

    这大大简化了安装过程,使得非开发人员也能轻松管理自己的Kafka集群。这个压缩包文件包含的文件列表可能包括启动脚本、配置文件、依赖库等,具体如下: - `bin`: 存放启动和停止Kafka Manager的脚本。 - `conf`: ...

    Kafka Tool 2.0.7(linux系统)

    这就是我们用来启动 Kafka Tool 的命令。在终端中切换到解压后的目录,并执行: ``` bash kafkatool.sh ``` 4. **确认安装**: 脚本运行时,可能会出现一些提示,比如询问是否同意许可协议。输入 `yes` 并按回车...

    kafka 可运行实例

    它简化了从Kafka主题读取数据、处理数据以及将结果写回新主题的过程。 6. **Kafka Connect**:这是一个用于集成Kafka与其他系统的工具,可以方便地导入和导出数据,比如从数据库到Kafka,或者从Kafka到数据仓库。 ...

    免安装配置的kafka环境

    总的来说,这个免安装的Kafka环境极大地简化了Windows用户的部署流程,让开发者可以快速地投入到Kafka的学习和应用中。只需解压、启动脚本,就可以开始处理实时数据流,对于学习、测试或快速原型开发非常友好。但...

    kafka_2.12-2.4.1.zip

    《Kafka 2.4.1在Windows与Linux环境下的部署与使用》 ...理解Kafka的核心概念和操作流程,对于构建实时数据处理系统至关重要。在实践中不断探索,你将能够充分利用Kafka的强大功能,实现高效的数据流转与处理。

Global site tag (gtag.js) - Google Analytics