`

Kafka 启动过程

 
阅读更多

1, 每个broker启动的时候都会去注册一个临时节点 /controller, 那个broker先注册这个节点,那个就是所有broker的leader,并将自己的信息写入到这个临时节点里面。如下:

[zk: 10.3.63.204:2181,10.3.63.205:2181(CONNECTED) 3] get /controller
{"version":1,"brokerid":0,"timestamp":"1407310302044"}
cZxid = 0x700000592
ctime = Wed Aug 06 15:32:01 CST 2014
mZxid = 0x700000592
mtime = Wed Aug 06 15:32:01 CST 2014
pZxid = 0x700000592
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x147aa389edd0001
dataLength = 54
numChildren = 0

每个broker都会起动kafkaController这个进程, 但只有一个是leader,controller主要是负责删除一些多余的

topic或者其他选举某个topic的pation的leader使用。

 

2,当关闭的时候,回调用KafkaServer的shutdown方法, 里面会先尝试关闭controller,具体调用代码如下:

CoreUtils.swallow(controlledShutdown())。
代码的逻辑是从zookeeper的controller读出leader的id,并从broker/ids/id读出broker的信息, 然后发送一个

ControlledShutdownRequest的请求到它上面,直到读到成功返回后才说明shutdownSuccessed

 

3,  具体处理这个请求的逻辑世在KafkaApis中来处理的, 具体的代码如下:

  def handleControlledShutdownRequest(request: RequestChannel.Request) {
    // ensureTopicExists is only for client facing requests
    // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
    // stop serving data to clients for the topic being deleted
    val controlledShutdownRequest = request.requestObj.asInstanceOf[ControlledShutdownRequest]
    val partitionsRemaining = controller.shutdownBroker(controlledShutdownRequest.brokerId)
    val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId,
      ErrorMapping.NoError, partitionsRemaining)
    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(controlledShutdownResponse)))
  }

 

 

里面可以看到,发送到主的leader上面,调用KafkaController的 def shutdownBroker(id: Int),工作的具体内容是循环topic的partittion, 然后判断当前的分区是否是主的, // If the broker leads the topic partition, transition the leader and update isr. Updates zk and // notifies all affected brokers

如果不是的,

// Stop the replica first. The state change below initiates ZK changes which should take some time

// before which the stop replica request should be completed (in most cases)

 

对应的问题是, 如果关闭controller的时间足够长的话,会导致timeout,然后会重新发送关闭的请求。因为锁的缘故,回导致再次的请求也会超时。这样会导致controller的非正常关闭, 重新启动时会有会滚的操作。 虽然这种情况下不会影响到具体的使用。

 

https://issues.apache.org/jira/browse/KAFKA-1342

 

 

 

分析启动过程:

1, 设置状态为Starting

2, kafkaScheduler.startup - 主要是后台需要定时执行的一些任务

3, initZk - 初始化和zookeeper的链接

4, logManager.startup - 这个主要是通过上面的scheduler来定时循环执行三个任务:kafka-log-retention kafka-log-flusher kafka-recovery-point-checkpoint,如果配置了清理的话,还会起动

5, socketServer.startup,是个NIO的服务, 线程模型如下

1 Acceptor thread that handles new connections

N Processor threads that each have their own selector and read requests from sockets

M Handler threads that handle requests and produce responses back to the processor threads for writing.

6, replicaManager.startup - 主要是通过调度器定时执行 maybeShrinkIsr方法的线程

7, createOffsetManager - 通过调度器启动定时执行 compact 方法的线程

8, kafkaController.startup - 注册zk session失效事件,竞争leader。如果是leader的话,则会回调

kafkaController的 onControllerFailover 方法。

9, consumerCoordinator.startup - Kafka coordinator handles consumer group and consumer offset management 主要是处理消费组和消费者偏移量的问题

10, start processing requests requestHandlerPool-KafkaApis 主要是通过KafkaRequestHandlerPool

来启动处理请求的线程,每个线程实际最后调用的还是KafkaApis

11, 设置状态 runningAsBroker

12, topicConfigManager.startup - 主要监听 /config/changes,然后 Process the given list of config changes

13, tell everyone we are alive - KafkaHealthcheck.startup,主要是和zk保持心跳连接

14, register broker metrics - 主要是一些统计信息

 

 

Broker的状态

broker 有以下几种状态

case object NotRunning extends BrokerStates { val state: Byte = 0 }
case object Starting extends BrokerStates { val state: Byte = 1 }
case object RecoveringFromUncleanShutdown extends BrokerStates { val state: Byte = 2 }
case object RunningAsBroker extends BrokerStates { val state: Byte = 3 }
case object RunningAsController extends BrokerStates { val state: Byte = 4 }
case object PendingControlledShutdown extends BrokerStates { val state: Byte = 6 }
case object BrokerShuttingDown extends BrokerStates { val state: Byte = 7 }

 

状态之间的流转图如下:

 

/**
 * Broker states are the possible state that a kafka broker can be in.
 * A broker should be only in one state at a time.
 * The expected state transition with the following defined states is:
 *
 *                +-----------+
 *                |Not Running|
 *                +-----+-----+
 *                      |
 *                      v
 *                +-----+-----+
 *                |Starting   +--+
 *                +-----+-----+  | +----+------------+
 *                      |        +>+RecoveringFrom   |
 *                      v          |UncleanShutdown  |
 * +----------+     +-----+-----+  +-------+---------+
 * |RunningAs |     |RunningAs  |            |
 * |Controller+<--->+Broker     +<-----------+
 * +----------+     +-----+-----+
 *        |              |
 *        |              v
 *        |       +-----+------------+
 *        |-----> |PendingControlled |
 *                |Shutdown          |
 *                +-----+------------+
 *                      |
 *                      v
 *               +-----+----------+
 *               |BrokerShutting  |
 *               |Down            |
 *               +-----+----------+
 *                     |
 *                     v
 *               +-----+-----+
 *               |Not Running|
 *               +-----------+
 *
 * Custom states is also allowed for cases where there are custom kafka states for different scenarios.
 */

 

 

 

 

 

分享到:
评论

相关推荐

    kafka-0.8.1.1总结1

    1. **Kafka启动过程** 包括初始化ZooKeeper连接、加载配置、创建Socket服务器等步骤。 2. **日志初始化和清理过程** 日志分区在启动时创建,定期进行清理以保持大小和时间限制。 3. **选举Controller过程** 当...

    kafka--summary:kafka学习总结,源码剖析

    kafka-summarykafka学习总结,源码剖析目录一、基础篇开篇说明概念说明配置说明znode分类kafka协议分类Kafka线程日志存储格式kakfa架构设计二、流程篇1、kafka启动过程2、日志初始化和清理过程3、选举controller过程...

    在linux中搭建kafka集群

    10. 一旦Zookeeper集群成功启动,你就可以继续配置和启动Kafka服务器了,这个过程涉及到创建broker配置,设置broker.id,配置Zookeeper连接,以及启动`kafka-server-start.sh`脚本。 搭建Kafka集群是一个涉及多个...

    Kafka管理工具Kafka Tool

    **Kafka Tool:高效管理...总的来说,Kafka Tool是Kafka管理员和开发者的得力助手,它简化了与Kafka交互的过程,提高了工作效率,是管理复杂Kafka集群不可或缺的工具之一。无论是日常运维还是问题排查,都能从中受益。

    kafka可视化工具--kafkatool

    安装完成后,通过命令行界面输入“kafkatool”即可启动工具。 **最佳实践与注意事项**: - 在生产环境中使用Kafkatool时,确保了解操作的影响,避免误操作导致数据丢失。 - 定期更新Kafkatool至最新版本,以获取新...

    OGG_KAFKA问题及解决方法

    KAFKA和OracleGoldenGate安装过程中出现的问题及其解决方法

    StormStorm集成Kafka 从Kafka中读取数据

    5. 启动拓扑:提交拓扑到Storm集群,开始从Kafka读取和处理数据。 在处理数据时,Storm会维护一个内部offset(偏移量)来跟踪在Kafka中的位置,保证数据不丢失。`KafkaSpout`会自动处理容错和幂等性,确保在出现...

    介绍kafka及kafka集群安装

    通过本文档的学习,我们不仅深入了解了 Kafka 的架构原理和使用方法,还掌握了 Kafka 集群的安装部署过程。此外,我们还学习了 Kafka 生产者和消费者的 Java API 使用方法,以及 JMS 规范的相关概念。这些知识对于...

    kafka一键启停脚本

    5. **监控与反馈**:在启动和停止过程中,脚本可能会监测日志文件,检查Kafka服务是否成功启动或停止,并将结果输出给用户。 6. **故障处理**:如果在启动或停止过程中遇到问题,脚本可能包含错误处理逻辑,如重试...

    kafka-manager-2.0.0.2.zip

    这大大简化了安装过程,使得非开发人员也能轻松管理自己的Kafka集群。这个压缩包文件包含的文件列表可能包括启动脚本、配置文件、依赖库等,具体如下: - `bin`: 存放启动和停止Kafka Manager的脚本。 - `conf`: ...

    代码:kafka数据接入到mysql中

    - 启动Kafka Connect服务,它会读取配置文件,并开始将Kafka中的数据写入MySQL。 5. **监控和管理** - 通过Kafka Connect REST API或Confluent Control Center(如果使用了Confluent Platform)来监控数据迁移...

    kafka 可运行实例

    它简化了从Kafka主题读取数据、处理数据以及将结果写回新主题的过程。 6. **Kafka Connect**:这是一个用于集成Kafka与其他系统的工具,可以方便地导入和导出数据,比如从数据库到Kafka,或者从Kafka到数据仓库。 ...

    Kafka Tool 2.0.7(linux系统)

    这就是我们用来启动 Kafka Tool 的命令。在终端中切换到解压后的目录,并执行: ``` bash kafkatool.sh ``` 4. **确认安装**: 脚本运行时,可能会出现一些提示,比如询问是否同意许可协议。输入 `yes` 并按回车...

    kafka搭建.zip

    在本压缩包“kafka搭建.zip”中,我们聚焦于Apache Kafka的集群搭建过程,它是一个分布式流处理平台,常用于大数据实时处理、消息传递和日志聚合等场景。Kafka集群的构建离不开Zookeeper,这是一个分布式协调服务,...

    kafka报文模拟工具

    然而,在开发和测试过程中,有时我们需要模拟Kafka报文的发送,以便验证消费者的处理逻辑或者测试系统的性能。这时,一款名为"Kafka报文模拟工具"的软件就显得尤为重要。 该工具的主要功能是模拟Kafka生产者的行为...

    Kafka简介及使用PHP处理Kafka消息

    1. 启动 zookeeper 和 kafka 2. 创建由 2 个 partition 组成的、名为 testtopic 的 topic 3. 使用 PHP 生产、消费 Kafka 消息 Kafka 是一种高吞吐的分布式消息系统,能够替代传统的消息队列用于解耦合数据处理,...

    spring-kafka 整合官方文档

    通过这些指南,开发者可以快速启动并运行一个基于Spring和Kafka的应用程序。 参考部分详细介绍了如何使用Spring for Apache Kafka。具体包括配置主题、发送消息、KafkaTemplate的使用、事务处理、回复式Kafka模板的...

    可视化kafka测试工具

    本文将详细介绍一款可视化Kafka测试工具,该工具能够简化Kafka消息的生产和消费过程,并提供直观的界面来帮助我们理解Kafka的工作原理。 **工具介绍** 这款可视化Kafka测试工具的主要功能是模拟发送Topic消息到...

    kafkatool kafka可视化工具

    本文将详细介绍 "Kafka Tool" 这款64位的 Kafka 可视化工具,帮助用户更直观地理解其功能、安装过程以及如何利用它提升 Kafka 的操作效率。 **一、Kafka Tool 功能概述** 1. **集群管理**:Kafka Tool 提供了集群...

    kafka_springboot_kafka_

    Spring Boot简化了在Java应用中配置和使用Kafka的过程。以下是整合Spring Boot和Kafka的关键步骤和知识点: 1. **添加依赖**:首先,你需要在`pom.xml`中引入Spring Boot的Kafka starter依赖。这将包含所有必要的库...

Global site tag (gtag.js) - Google Analytics