1, 每个broker启动的时候都会去注册一个临时节点 /controller, 那个broker先注册这个节点,那个就是所有broker的leader,并将自己的信息写入到这个临时节点里面。如下:
[zk: 10.3.63.204:2181,10.3.63.205:2181(CONNECTED) 3] get /controller
{"version":1,"brokerid":0,"timestamp":"1407310302044"}
cZxid = 0x700000592
ctime = Wed Aug 06 15:32:01 CST 2014
mZxid = 0x700000592
mtime = Wed Aug 06 15:32:01 CST 2014
pZxid = 0x700000592
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x147aa389edd0001
dataLength = 54
numChildren = 0
每个broker都会起动kafkaController这个进程, 但只有一个是leader,controller主要是负责删除一些多余的
topic或者其他选举某个topic的pation的leader使用。
2,当关闭的时候,回调用KafkaServer的shutdown方法, 里面会先尝试关闭controller,具体调用代码如下:
CoreUtils.swallow(controlledShutdown())。
代码的逻辑是从zookeeper的controller读出leader的id,并从broker/ids/id读出broker的信息, 然后发送一个
ControlledShutdownRequest的请求到它上面,直到读到成功返回后才说明shutdownSuccessed
3, 具体处理这个请求的逻辑世在KafkaApis中来处理的, 具体的代码如下:
def handleControlledShutdownRequest(request: RequestChannel.Request) { // ensureTopicExists is only for client facing requests // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they // stop serving data to clients for the topic being deleted val controlledShutdownRequest = request.requestObj.asInstanceOf[ControlledShutdownRequest] val partitionsRemaining = controller.shutdownBroker(controlledShutdownRequest.brokerId) val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId, ErrorMapping.NoError, partitionsRemaining) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(controlledShutdownResponse))) }
里面可以看到,发送到主的leader上面,调用KafkaController的 def shutdownBroker(id: Int),工作的具体内容是循环topic的partittion, 然后判断当前的分区是否是主的, // If the broker leads the topic partition, transition the leader and update isr. Updates zk and // notifies all affected brokers
如果不是的,
// Stop the replica first. The state change below initiates ZK changes which should take some time
// before which the stop replica request should be completed (in most cases)
对应的问题是, 如果关闭controller的时间足够长的话,会导致timeout,然后会重新发送关闭的请求。因为锁的缘故,回导致再次的请求也会超时。这样会导致controller的非正常关闭, 重新启动时会有会滚的操作。 虽然这种情况下不会影响到具体的使用。
https://issues.apache.org/jira/browse/KAFKA-1342
分析启动过程:
1, 设置状态为Starting
2, kafkaScheduler.startup - 主要是后台需要定时执行的一些任务
3, initZk - 初始化和zookeeper的链接
4, logManager.startup - 这个主要是通过上面的scheduler来定时循环执行三个任务:kafka-log-retention kafka-log-flusher kafka-recovery-point-checkpoint,如果配置了清理的话,还会起动
5, socketServer.startup,是个NIO的服务, 线程模型如下
1 Acceptor thread that handles new connections
N Processor threads that each have their own selector and read requests from sockets
M Handler threads that handle requests and produce responses back to the processor threads for writing.
6, replicaManager.startup - 主要是通过调度器定时执行 maybeShrinkIsr方法的线程
7, createOffsetManager - 通过调度器启动定时执行 compact 方法的线程
8, kafkaController.startup - 注册zk session失效事件,竞争leader。如果是leader的话,则会回调
kafkaController的 onControllerFailover 方法。
9, consumerCoordinator.startup - Kafka coordinator handles consumer group and consumer offset management 主要是处理消费组和消费者偏移量的问题
10, start processing requests requestHandlerPool-KafkaApis 主要是通过KafkaRequestHandlerPool
来启动处理请求的线程,每个线程实际最后调用的还是KafkaApis
11, 设置状态 runningAsBroker
12, topicConfigManager.startup - 主要监听 /config/changes,然后 Process the given list of config changes
13, tell everyone we are alive - KafkaHealthcheck.startup,主要是和zk保持心跳连接
14, register broker metrics - 主要是一些统计信息
Broker的状态
broker 有以下几种状态
case object NotRunning extends BrokerStates { val state: Byte = 0 } case object Starting extends BrokerStates { val state: Byte = 1 } case object RecoveringFromUncleanShutdown extends BrokerStates { val state: Byte = 2 } case object RunningAsBroker extends BrokerStates { val state: Byte = 3 } case object RunningAsController extends BrokerStates { val state: Byte = 4 } case object PendingControlledShutdown extends BrokerStates { val state: Byte = 6 } case object BrokerShuttingDown extends BrokerStates { val state: Byte = 7 }
状态之间的流转图如下:
/**
* Broker states are the possible state that a kafka broker can be in.
* A broker should be only in one state at a time.
* The expected state transition with the following defined states is:
*
* +-----------+
* |Not Running|
* +-----+-----+
* |
* v
* +-----+-----+
* |Starting +--+
* +-----+-----+ | +----+------------+
* | +>+RecoveringFrom |
* v |UncleanShutdown |
* +----------+ +-----+-----+ +-------+---------+
* |RunningAs | |RunningAs | |
* |Controller+<--->+Broker +<-----------+
* +----------+ +-----+-----+
* | |
* | v
* | +-----+------------+
* |-----> |PendingControlled |
* |Shutdown |
* +-----+------------+
* |
* v
* +-----+----------+
* |BrokerShutting |
* |Down |
* +-----+----------+
* |
* v
* +-----+-----+
* |Not Running|
* +-----------+
*
* Custom states is also allowed for cases where there are custom kafka states for different scenarios.
*/
相关推荐
1. **Kafka启动过程** 包括初始化ZooKeeper连接、加载配置、创建Socket服务器等步骤。 2. **日志初始化和清理过程** 日志分区在启动时创建,定期进行清理以保持大小和时间限制。 3. **选举Controller过程** 当...
kafka-summarykafka学习总结,源码剖析目录一、基础篇开篇说明概念说明配置说明znode分类kafka协议分类Kafka线程日志存储格式kakfa架构设计二、流程篇1、kafka启动过程2、日志初始化和清理过程3、选举controller过程...
10. 一旦Zookeeper集群成功启动,你就可以继续配置和启动Kafka服务器了,这个过程涉及到创建broker配置,设置broker.id,配置Zookeeper连接,以及启动`kafka-server-start.sh`脚本。 搭建Kafka集群是一个涉及多个...
**Kafka Tool:高效管理...总的来说,Kafka Tool是Kafka管理员和开发者的得力助手,它简化了与Kafka交互的过程,提高了工作效率,是管理复杂Kafka集群不可或缺的工具之一。无论是日常运维还是问题排查,都能从中受益。
安装完成后,通过命令行界面输入“kafkatool”即可启动工具。 **最佳实践与注意事项**: - 在生产环境中使用Kafkatool时,确保了解操作的影响,避免误操作导致数据丢失。 - 定期更新Kafkatool至最新版本,以获取新...
KAFKA和OracleGoldenGate安装过程中出现的问题及其解决方法
5. 启动拓扑:提交拓扑到Storm集群,开始从Kafka读取和处理数据。 在处理数据时,Storm会维护一个内部offset(偏移量)来跟踪在Kafka中的位置,保证数据不丢失。`KafkaSpout`会自动处理容错和幂等性,确保在出现...
通过本文档的学习,我们不仅深入了解了 Kafka 的架构原理和使用方法,还掌握了 Kafka 集群的安装部署过程。此外,我们还学习了 Kafka 生产者和消费者的 Java API 使用方法,以及 JMS 规范的相关概念。这些知识对于...
5. **监控与反馈**:在启动和停止过程中,脚本可能会监测日志文件,检查Kafka服务是否成功启动或停止,并将结果输出给用户。 6. **故障处理**:如果在启动或停止过程中遇到问题,脚本可能包含错误处理逻辑,如重试...
这大大简化了安装过程,使得非开发人员也能轻松管理自己的Kafka集群。这个压缩包文件包含的文件列表可能包括启动脚本、配置文件、依赖库等,具体如下: - `bin`: 存放启动和停止Kafka Manager的脚本。 - `conf`: ...
它简化了从Kafka主题读取数据、处理数据以及将结果写回新主题的过程。 6. **Kafka Connect**:这是一个用于集成Kafka与其他系统的工具,可以方便地导入和导出数据,比如从数据库到Kafka,或者从Kafka到数据仓库。 ...
这就是我们用来启动 Kafka Tool 的命令。在终端中切换到解压后的目录,并执行: ``` bash kafkatool.sh ``` 4. **确认安装**: 脚本运行时,可能会出现一些提示,比如询问是否同意许可协议。输入 `yes` 并按回车...
在本压缩包“kafka搭建.zip”中,我们聚焦于Apache Kafka的集群搭建过程,它是一个分布式流处理平台,常用于大数据实时处理、消息传递和日志聚合等场景。Kafka集群的构建离不开Zookeeper,这是一个分布式协调服务,...
- 启动Kafka Connect服务,它会读取配置文件,并开始将Kafka中的数据写入MySQL。 5. **监控和管理** - 通过Kafka Connect REST API或Confluent Control Center(如果使用了Confluent Platform)来监控数据迁移...
1. 启动 zookeeper 和 kafka 2. 创建由 2 个 partition 组成的、名为 testtopic 的 topic 3. 使用 PHP 生产、消费 Kafka 消息 Kafka 是一种高吞吐的分布式消息系统,能够替代传统的消息队列用于解耦合数据处理,...
通过这些指南,开发者可以快速启动并运行一个基于Spring和Kafka的应用程序。 参考部分详细介绍了如何使用Spring for Apache Kafka。具体包括配置主题、发送消息、KafkaTemplate的使用、事务处理、回复式Kafka模板的...
本文将详细介绍一款可视化Kafka测试工具,该工具能够简化Kafka消息的生产和消费过程,并提供直观的界面来帮助我们理解Kafka的工作原理。 **工具介绍** 这款可视化Kafka测试工具的主要功能是模拟发送Topic消息到...
本文将详细介绍 "Kafka Tool" 这款64位的 Kafka 可视化工具,帮助用户更直观地理解其功能、安装过程以及如何利用它提升 Kafka 的操作效率。 **一、Kafka Tool 功能概述** 1. **集群管理**:Kafka Tool 提供了集群...
在`install.rar`文件中,可能包含了Kafka的安装包,按照以下步骤安装和启动Kafka: 1. 解压安装包。 2. 修改`config/server.properties`配置文件,设置`broker.id`和`zookeeper.connect`。 3. 启动Zookeeper服务。 ...
Spring Boot简化了在Java应用中配置和使用Kafka的过程。以下是整合Spring Boot和Kafka的关键步骤和知识点: 1. **添加依赖**:首先,你需要在`pom.xml`中引入Spring Boot的Kafka starter依赖。这将包含所有必要的库...