`
flychao88
  • 浏览: 753199 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

跟我学Kafka源码之Broker Server

 
阅读更多

本章,我们将进入到Kafka的核心类中进行代码走读,深入分析他的存储交互和消息分发原理。

首先给大家展示一张服务端交互图,因为比较复杂我就没有再画,转发别人的一张图以供参考:

 大家看完这个图以后相信有了一个整体认识,那么下面我们就重点从整体到细节的逐步分解。

 

一、KafkaServerStartable

在Kafka的main入口中startup KafkaServerStartable, 而KafkaServerStartable这是对KafkaServer的封装

我们使用命令行./kafka-server-start.sh -daemon ../config/server.properties 进行启动的时候,也是调用的这个类。

 

class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
  private val server = new KafkaServer(serverConfig)

  def startup() {
    try {
      server.startup()
      AppInfo.registerInfo()
    }
    catch {
      case e: Throwable =>
        fatal("Fatal error during KafkaServerStartable startup. Prepare to shutdown", e)
        // KafkaServer already calls shutdown() internally, so this is purely for logging & the exit code
        System.exit(1)
    }
  }

  def shutdown() {
    try {
      server.shutdown()
    }
    catch {
      case e: Throwable =>
        fatal("Fatal error during KafkaServerStable shutdown. Prepare to halt", e)
        System.exit(1)
    }
  }

  /**
   * Allow setting broker state from the startable.
   * This is needed when a custom kafka server startable want to emit new states that it introduces.
   */
  def setServerState(newState: Byte) {
    server.brokerState.newState(newState)
  }

  def awaitShutdown() = 
    server.awaitShutdown

}

 

二、Kafka Server类

在KafkaServerStartable中启动了KafkaServer,这代表一个kafka broker, 是kafka的核心,默认的情况下一个KafkaServer就是一个broker。

只需要看看里面startup了哪些modules, 就知道broker做了哪些工作, 后面一个个具体分析。

 

  this.logIdent = "[Kafka Server " + config.brokerId + "], "
  private var isShuttingDown = new AtomicBoolean(false)
  private var shutdownLatch = new CountDownLatch(1)
  private var startupComplete = new AtomicBoolean(false)
  val brokerState: BrokerState = new BrokerState
  val correlationId: AtomicInteger = new AtomicInteger(0)
  var socketServer: SocketServer = null
  var requestHandlerPool: KafkaRequestHandlerPool = null
  var logManager: LogManager = null
  var offsetManager: OffsetManager = null
  var kafkaHealthcheck: KafkaHealthcheck = null
  var topicConfigManager: TopicConfigManager = null
  var replicaManager: ReplicaManager = null
  var apis: KafkaApis = null
  var kafkaController: KafkaController = null
  val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
  var zkClient: ZkClient = null

 

   这个些类的初始化和启动代码如下,整个KafkaServer其实就是在初始和启动任务。

def startup() {
    try {
      info("starting")
      brokerState.newState(Starting)
      isShuttingDown = new AtomicBoolean(false)
      shutdownLatch = new CountDownLatch(1)

      /* start scheduler */
      kafkaScheduler.startup()
    
      /* setup zookeeper */
      zkClient = initZk()

      /* start log manager */
      logManager = createLogManager(zkClient, brokerState)
      logManager.startup()

      socketServer = new SocketServer(config.brokerId,
                                      config.hostName,
                                      config.port,
                                      config.numNetworkThreads,
                                      config.queuedMaxRequests,
                                      config.socketSendBufferBytes,
                                      config.socketReceiveBufferBytes,
                                      config.socketRequestMaxBytes,
                                      config.maxConnectionsPerIp,
                                      config.connectionsMaxIdleMs,
                                      config.maxConnectionsPerIpOverrides)
      socketServer.startup()

      replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)

      /* start offset manager */
      offsetManager = createOffsetManager()

      kafkaController = new KafkaController(config, zkClient, brokerState)
    
      /* start processing requests */
      apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController)
      requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
      brokerState.newState(RunningAsBroker)
   
      Mx4jLoader.maybeLoad()

      replicaManager.startup()

      kafkaController.startup()
    
      topicConfigManager = new TopicConfigManager(zkClient, logManager)
      topicConfigManager.startup()
    
      /* tell everyone we are alive */
      kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
      kafkaHealthcheck.startup()

    
      registerStats()
      startupComplete.set(true)
      info("started")
    }

说明:

 

1、KafkaScheduler用于在后台执行一些任务,用ScheduledThreadPoolExecutor实现

2、由于Kafka是基于zookeeper进行配置管理的, 所以需要创建zkclient和zookeeper集群通信

3、LogManager是kafka文件存储系统管理,负责处理和存储所有Kafka的topic的partiton数据。

4、replicaManager是partition的备份分区管理。

5、broker server是socket server,所有和broker的交互都是通过往socket端口发送request来实现的。

6、offsetManager是定期清除过期的offset数据,即compact操作,以及consumer相关的一些offset操作。

7、kafkaController是为了处理replica,会用一个broker作为master,即controller,用于协调replica的一致性

8、Kafka apis调用handler的事件类型,进行相关的事件的处理。

请求的事件类型共有ProduceKey, FetchKey, OffsetsKey, MetadataKey, LeaderAndIsrKey, StopReplicaKey。包括我们之前说的producer和consumer与服务端的通信,很多就是通过这些key来获取的。
UpdateMetadataKey, ControlledShutdownKey, OffsetCommitKey, OffsetFetchKey

9、TopicConfigManager用于处理topic config的change。

  1. ./bin/kafka-topics.sh –zookeeper 127.0.0.1:2181 –alter –partitions 20 –topic testKJ1  

比如你可以这样设置,那么这些topic config如何生效的?

topic-level config默认是被存储在,

/brokers/topics/<topic_name>/config
但是topic很多的情况下,为了避免创建太多的watcher,

所以单独创建一个目录

/brokers/config_changes

来触发配置的变化
所以上面的命令除了,把配置写入topic/config,还有增加一个通知,告诉watcher哪个topic的config发生了变化

/brokers/config_changes/config_change_13321

10、KafkaHealthcheck这个只是心跳检查机制。

 

接下来的博客将分别对这9个启动项进行逐一介绍。

 

分享到:
评论

相关推荐

    Kafka技术内幕:图文详解Kafka源码设计与实现+书签.pdf+源码

    《Kafka技术内幕:图文详解Kafka源码设计与实现》是一本深入解析Apache Kafka的专著,旨在帮助读者理解Kafka的核心设计理念、内部机制以及源码实现。这本书结合图文并茂的方式,使得复杂的概念变得更为易懂。同时,...

    基于Kafka的管理系统源码.zip

    基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 ...

    Kafka技术内幕:图文详解Kafka源码设计与实现 PD

    Kafka技术内幕:图文详解Kafka源码设计与实现 PDF 下载 Kafka技术内幕:图文详解Kafka源码设计与实现 PDF 下载

    kafka需要的源码包

    **Kafka 源码分析概述** Kafka 是一个分布式流处理平台,由 LinkedIn 开发并贡献给 Apache 软件基金会。它被设计为高吞吐量、低延迟的消息传递系统,支持实时数据流处理。Kafka 主要用于构建实时数据管道和流应用,...

    Kafka源码解析与实战

    ### Kafka源码解析与实战 #### 一、Kafka简介 Kafka是由Apache软件基金会开发的一款开源流处理平台,主要用于构建实时数据管道以及基于流的数据处理应用。它以分布式的方式运行,具有高吞吐量、低延迟的特点,适用...

    apache kafka源码剖析高清part2

    apache kafka源码剖析高清 带索引书签目录_徐郡明(著) 电子工业出版社,分为part1和part2,一起下载解压

    kafka源码解析新手版本(修正版)

    ### Kafka源码解析新手版本(修正版)知识点详解 #### 一、Kafka诞生背景及其在LinkedIn的应用 **1.1 Apache Kafka项目简介** - **诞生背景:** Apache Kafka最初由LinkedIn开发,随后于2011年初开源,并在2012年...

    Kafka源码解析与实战-高清-完整目录-2018年1月

    标题中提到的“Kafka源码解析与实战”指向了关于Apache Kafka这个开源流处理平台的深入分析和应用实践。Apache Kafka是一个分布式流处理平台,最初是由LinkedIn公司开发,并于2011年开源。它主要用于构建实时数据...

    Kafka源码解析及实战

    《Kafka源码解析及实战》是一本专为深度学习Apache Kafka的读者设计的教材,旨在帮助读者深入了解Kafka的工作原理及其内部机制。通过源码级别的解析,读者可以更好地掌握Kafka在分布式消息系统中的核心功能和设计...

    kafka源码分析

    kafka源码分析, Introduction kafka-Intro kafka-Unix kafka-Producer kafka-Producer-Scala kafka-SocketServer kafka-LogAppend kafka-ISR kafka-Consumer-init-Scala

    Kafka源码解析与实战.zip

    Kafka源码解析与实战.zip

    apache kafka源码剖析高清part1

    apache kafka源码剖析高清 带索引书签目录_徐郡明(著) 电子工业出版社

    kafka 技术内幕 图文详解Kafka源码设计与实现

    以下是对Kafka源码设计与实现的一些关键知识点的详细说明: 1. **Kafka架构**:Kafka是一个分布式的消息中间件,主要由Producers(生产者)、Brokers(代理服务器)、Consumers(消费者)和Topics(主题)组成。...

    《Apache Kafka源码剖析》.part5.rar

    Apache Kafka源码剖析 PDF较大,分5份上传!一起解压即可。

    kafka-clients源码.zip

    《深入理解Kafka-clients源码》 Kafka-clients是Apache Kafka的重要组成部分,它提供了与Kafka集群交互的API,使得开发者能够构建基于Kafka的应用程序。在2.*版本中,Kafka-clients进行了多方面的优化和改进,提升...

    Kafka技术内幕:图文详解Kafka源码设计与实现.郑奇煌(2017.11).pdf

    《Kafka技术内幕:图文详解Kafka源码设计与实现》是郑奇煌在2017年11月出版的一本深入解析Apache Kafka的技术专著。这本书详细介绍了Kafka的核心概念、工作原理以及源码分析,旨在帮助读者理解并掌握这个分布式流...

    Kafka技术内幕-图文详解Kafka源码设计与实现

    Kafka自LinkedIn开源以来就以高性能、高吞吐量、分布式的特性著称,本书以0.10版本的源码为基础,深入分析了Kafka的设计与实现,包括生产者和消费者的消息处理流程,新旧消费者不同的设计方式,存储层的实现,协调者...

    《Apache Kafka源码剖析》.part2.rar

    Apache Kafka源码剖析 PDF较大,分6份上传!一起解压即可。

Global site tag (gtag.js) - Google Analytics