本章,我们将进入到Kafka的核心类中进行代码走读,深入分析他的存储交互和消息分发原理。
首先给大家展示一张服务端交互图,因为比较复杂我就没有再画,转发别人的一张图以供参考:
大家看完这个图以后相信有了一个整体认识,那么下面我们就重点从整体到细节的逐步分解。
一、KafkaServerStartable
在Kafka的main入口中startup KafkaServerStartable, 而KafkaServerStartable这是对KafkaServer的封装
我们使用命令行./kafka-server-start.sh -daemon ../config/server.properties 进行启动的时候,也是调用的这个类。
class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging { private val server = new KafkaServer(serverConfig) def startup() { try { server.startup() AppInfo.registerInfo() } catch { case e: Throwable => fatal("Fatal error during KafkaServerStartable startup. Prepare to shutdown", e) // KafkaServer already calls shutdown() internally, so this is purely for logging & the exit code System.exit(1) } } def shutdown() { try { server.shutdown() } catch { case e: Throwable => fatal("Fatal error during KafkaServerStable shutdown. Prepare to halt", e) System.exit(1) } } /** * Allow setting broker state from the startable. * This is needed when a custom kafka server startable want to emit new states that it introduces. */ def setServerState(newState: Byte) { server.brokerState.newState(newState) } def awaitShutdown() = server.awaitShutdown }
二、Kafka Server类
在KafkaServerStartable中启动了KafkaServer,这代表一个kafka broker, 是kafka的核心,默认的情况下一个KafkaServer就是一个broker。
只需要看看里面startup了哪些modules, 就知道broker做了哪些工作, 后面一个个具体分析。
this.logIdent = "[Kafka Server " + config.brokerId + "], " private var isShuttingDown = new AtomicBoolean(false) private var shutdownLatch = new CountDownLatch(1) private var startupComplete = new AtomicBoolean(false) val brokerState: BrokerState = new BrokerState val correlationId: AtomicInteger = new AtomicInteger(0) var socketServer: SocketServer = null var requestHandlerPool: KafkaRequestHandlerPool = null var logManager: LogManager = null var offsetManager: OffsetManager = null var kafkaHealthcheck: KafkaHealthcheck = null var topicConfigManager: TopicConfigManager = null var replicaManager: ReplicaManager = null var apis: KafkaApis = null var kafkaController: KafkaController = null val kafkaScheduler = new KafkaScheduler(config.backgroundThreads) var zkClient: ZkClient = null
这个些类的初始化和启动代码如下,整个KafkaServer其实就是在初始和启动任务。
def startup() { try { info("starting") brokerState.newState(Starting) isShuttingDown = new AtomicBoolean(false) shutdownLatch = new CountDownLatch(1) /* start scheduler */ kafkaScheduler.startup() /* setup zookeeper */ zkClient = initZk() /* start log manager */ logManager = createLogManager(zkClient, brokerState) logManager.startup() socketServer = new SocketServer(config.brokerId, config.hostName, config.port, config.numNetworkThreads, config.queuedMaxRequests, config.socketSendBufferBytes, config.socketReceiveBufferBytes, config.socketRequestMaxBytes, config.maxConnectionsPerIp, config.connectionsMaxIdleMs, config.maxConnectionsPerIpOverrides) socketServer.startup() replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown) /* start offset manager */ offsetManager = createOffsetManager() kafkaController = new KafkaController(config, zkClient, brokerState) /* start processing requests */ apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController) requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) brokerState.newState(RunningAsBroker) Mx4jLoader.maybeLoad() replicaManager.startup() kafkaController.startup() topicConfigManager = new TopicConfigManager(zkClient, logManager) topicConfigManager.startup() /* tell everyone we are alive */ kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient) kafkaHealthcheck.startup() registerStats() startupComplete.set(true) info("started") }
说明:
1、KafkaScheduler用于在后台执行一些任务,用ScheduledThreadPoolExecutor实现
2、由于Kafka是基于zookeeper进行配置管理的, 所以需要创建zkclient和zookeeper集群通信
3、LogManager是kafka文件存储系统管理,负责处理和存储所有Kafka的topic的partiton数据。
4、replicaManager是partition的备份分区管理。
5、broker server是socket server,所有和broker的交互都是通过往socket端口发送request来实现的。
6、offsetManager是定期清除过期的offset数据,即compact操作,以及consumer相关的一些offset操作。
7、kafkaController是为了处理replica,会用一个broker作为master,即controller,用于协调replica的一致性
8、Kafka apis调用handler的事件类型,进行相关的事件的处理。
请求的事件类型共有ProduceKey, FetchKey, OffsetsKey, MetadataKey, LeaderAndIsrKey, StopReplicaKey。包括我们之前说的producer和consumer与服务端的通信,很多就是通过这些key来获取的。UpdateMetadataKey, ControlledShutdownKey, OffsetCommitKey, OffsetFetchKey
9、TopicConfigManager用于处理topic config的change。
- ./bin/kafka-topics.sh –zookeeper 127.0.0.1:2181 –alter –partitions 20 –topic testKJ1
比如你可以这样设置,那么这些topic config如何生效的?
topic-level config默认是被存储在,
/brokers/topics/<topic_name>/config 但是topic很多的情况下,为了避免创建太多的watcher,
所以单独创建一个目录
/brokers/config_changes
来触发配置的变化
所以上面的命令除了,把配置写入topic/config,还有增加一个通知,告诉watcher哪个topic的config发生了变化
/brokers/config_changes/config_change_13321
10、KafkaHealthcheck这个只是心跳检查机制。
接下来的博客将分别对这9个启动项进行逐一介绍。
相关推荐
《Kafka技术内幕:图文详解Kafka源码设计与实现》是一本深入解析Apache Kafka的专著,旨在帮助读者理解Kafka的核心设计理念、内部机制以及源码实现。这本书结合图文并茂的方式,使得复杂的概念变得更为易懂。同时,...
基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 ...
Kafka技术内幕:图文详解Kafka源码设计与实现 PDF 下载 Kafka技术内幕:图文详解Kafka源码设计与实现 PDF 下载
### Kafka源码解析与实战 #### 一、Kafka简介 Kafka是由Apache软件基金会开发的一款开源流处理平台,主要用于构建实时数据管道以及基于流的数据处理应用。它以分布式的方式运行,具有高吞吐量、低延迟的特点,适用...
apache kafka源码剖析高清 带索引书签目录_徐郡明(著) 电子工业出版社,分为part1和part2,一起下载解压
### Kafka源码解析新手版本(修正版)知识点详解 #### 一、Kafka诞生背景及其在LinkedIn的应用 **1.1 Apache Kafka项目简介** - **诞生背景:** Apache Kafka最初由LinkedIn开发,随后于2011年初开源,并在2012年...
**Kafka 源码分析概述** Kafka 是一个分布式流处理平台,由 LinkedIn 开发并贡献给 Apache 软件基金会。它被设计为高吞吐量、低延迟的消息传递系统,支持实时数据流处理。Kafka 主要用于构建实时数据管道和流应用,...
标题中提到的“Kafka源码解析与实战”指向了关于Apache Kafka这个开源流处理平台的深入分析和应用实践。Apache Kafka是一个分布式流处理平台,最初是由LinkedIn公司开发,并于2011年开源。它主要用于构建实时数据...
《Kafka源码解析及实战》是一本专为深度学习Apache Kafka的读者设计的教材,旨在帮助读者深入了解Kafka的工作原理及其内部机制。通过源码级别的解析,读者可以更好地掌握Kafka在分布式消息系统中的核心功能和设计...
kafka源码分析, Introduction kafka-Intro kafka-Unix kafka-Producer kafka-Producer-Scala kafka-SocketServer kafka-LogAppend kafka-ISR kafka-Consumer-init-Scala
Kafka源码解析与实战.zip
apache kafka源码剖析高清 带索引书签目录_徐郡明(著) 电子工业出版社
以下是对Kafka源码设计与实现的一些关键知识点的详细说明: 1. **Kafka架构**:Kafka是一个分布式的消息中间件,主要由Producers(生产者)、Brokers(代理服务器)、Consumers(消费者)和Topics(主题)组成。...
Apache Kafka源码剖析 PDF较大,分5份上传!一起解压即可。
《深入理解Kafka-clients源码》 Kafka-clients是Apache Kafka的重要组成部分,它提供了与Kafka集群交互的API,使得开发者能够构建基于Kafka的应用程序。在2.*版本中,Kafka-clients进行了多方面的优化和改进,提升...
《Kafka技术内幕:图文详解Kafka源码设计与实现》是郑奇煌在2017年11月出版的一本深入解析Apache Kafka的技术专著。这本书详细介绍了Kafka的核心概念、工作原理以及源码分析,旨在帮助读者理解并掌握这个分布式流...
Kafka自LinkedIn开源以来就以高性能、高吞吐量、分布式的特性著称,本书以0.10版本的源码为基础,深入分析了Kafka的设计与实现,包括生产者和消费者的消息处理流程,新旧消费者不同的设计方式,存储层的实现,协调者...
Apache Kafka源码剖析 PDF较大,分6份上传!一起解压即可。