`
flychao88
  • 浏览: 753146 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

跟我学Kafka源码之LogManager分析

 
阅读更多

一、kafka文件系统存储机制

 

1、partition存储结构说明

假如kafka集群中只有一个broker,数据文件目录为message-folder,例如创建一个topic名称为:testKJ1,  partitions=4

存储路径和目录规则为:

xxx/message-folder

                              |--testKJ1-0

                              |--testKJ1-1

                              |--testKJ1-2

                              |--testKJ1-3

 

2、partition文件存储方式

 

 partiton文件存储结构分析,每个partition会有一个巨大文件消息数据被平均分配到多个文件大小相等的文件中。即相当于一个大文件被切成很多相等大小的文件段segment file。

 

 二、LogManager源码分析

在LogManager中会管理broker中的所有topic的partition,每个partition会产生一个log日志文件,也就是将partition信息持续久到磁盘上。

 

@threadsafe
class LogManager(val logDirs: Array[File],
                 val topicConfigs: Map[String, LogConfig],
                 val defaultConfig: LogConfig,
                 val cleanerConfig: CleanerConfig,
                 ioThreads: Int,
                 val flushCheckMs: Long,
                 val flushCheckpointMs: Long,
                 val retentionCheckMs: Long,
                 scheduler: Scheduler,
                 val brokerState: BrokerState,
                 private val time: Time) extends Logging {
  val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
  val LockFile = ".lock"
  val InitialTaskDelayMs = 30*1000
  private val logCreationOrDeletionLock = new Object
  private val logs = new Pool[TopicAndPartition, Log]()
  //创建有效的log目录
  createAndValidateLogDirs(logDirs)
  //创建的时候加锁保证一致性
  private val dirLocks = lockLogDirs(logDirs)
  设置每个文件的offset,以保证文件的恢复检查
  private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
  //恢复并且载入所给定目录的log对象,如果有子目录,子目录中的文件就是Segment。
  loadLogs()

  // public, so we can access this from kafka.admin.DeleteTopicTest
  val cleaner: LogCleaner =
    if(cleanerConfig.enableCleaner)
      new LogCleaner(cleanerConfig, logDirs, logs, time = time)
    else
      null
}
  正如注释中说的一样,在创建LogManager对象的时候,将初始下上面代码,比较重要的是loadLogs文件。

 

 

  private def loadLogs(): Unit = {
    info("Loading logs.")

    val threadPools = mutable.ArrayBuffer.empty[ExecutorService]
    val jobs = mutable.Map.empty[File, Seq[Future[_]]]

    for (dir <- this.logDirs) {  //将每个子目录load成log,子目录中的文件就是segment文
      val pool = Executors.newFixedThreadPool(ioThreads)
      threadPools.append(pool)

      val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)

      if (cleanShutdownFile.exists) {
        debug(
          "Found clean shutdown file. " +
          "Skipping recovery for all logs in data directory: " +
          dir.getAbsolutePath)
      } else {
        // log recovery itself is being performed by `Log` class during initialization
        brokerState.newState(RecoveringFromUncleanShutdown)
      }

      val recoveryPoints = this.recoveryPointCheckpoints(dir).read

      val jobsForDir = for {
        dirContent <- Option(dir.listFiles).toList
        logDir <- dirContent if logDir.isDirectory
      } yield {
        Utils.runnable {
          debug("Loading log '" + logDir.getName + "'")

          val topicPartition = Log.parseTopicPartitionName(logDir.getName)  //从目录名可以解析出topic和partition名
          val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
          val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)

          val current = new Log(logDir, config, logRecoveryPoint, scheduler, time)
          val previous = this.logs.put(topicPartition, current)

          if (previous != null) {
            throw new IllegalArgumentException(
              "Duplicate log directories found: %s, %s!".format(
              current.dir.getAbsolutePath, previous.dir.getAbsolutePath))
          }
        }
      }

      jobs(cleanShutdownFile) = jobsForDir.map(pool.submit).toSeq
    }


    try {
      for ((cleanShutdownFile, dirJobs) <- jobs) {
        dirJobs.foreach(_.get)
        cleanShutdownFile.delete()
      }
    } catch {
      case e: ExecutionException => {
        error("There was an error in one of the threads during logs loading: " + e.getCause)
        throw e.getCause
      }
    } finally {
      threadPools.foreach(_.shutdown())
    }

    info("Logs loading complete.")
  }
 遍历logdirs,每一个dir将会产生一个线程池,并将每一个线程池添加到一个动态数据组(ArrayBuffer)中。

 

val jobsForDir = for {
        dirContent <- Option(dir.listFiles).toList
        logDir <- dirContent if logDir.isDirectory
      } yield {
        Utils.runnable {
          debug("Loading log '" + logDir.getName + "'")

          val topicPartition = Log.parseTopicPartitionName(logDir.getName)  //从目录名可以解析出topic和partition名
          val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
          val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)

          val current = new Log(logDir, config, logRecoveryPoint, scheduler, time)
          val previous = this.logs.put(topicPartition, current)

          if (previous != null) {
            throw new IllegalArgumentException(
              "Duplicate log directories found: %s, %s!".format(
              current.dir.getAbsolutePath, previous.dir.getAbsolutePath))
          }
        }
      }
 遍历listFiles,并且判断logDir目录中是文件还是目录,在遍历的过程中将执行yield,这个语法主要是边遍历边执行一些操作,启动线程异步执行如下步骤:
1、解析topic和partition。

2、获取LogConfig配置信息。
LogConfig构造内容如下:
LogConfig(val segmentSize: Int = Defaults.SegmentSize,                      
                                //默认Segment的大小
                     val segmentMs: Long = Defaults.SegmentMs,              
                     //多长时间会产生一个新的Segment
                     val segmentJitterMs: Long = Defaults.SegmentJitterMs,   
                     val flushInterval: Long = Defaults.FlushInterval,      
                     //将日志刷新到磁盘的间隔时间
                     val flushMs: Long = Defaults.FlushMs,                  
                     //在刷新日志到磁盘之前,日志可以有脏数据的时间
                     val retentionSize: Long = Defaults.RetentionSize,      
                     //日志可以使用的近似总字节数
                     val retentionMs: Long = Defaults.RetentionMs,          
                     //最后一个segment所保留的最大长时间
                     val maxMessageSize: Int = Defaults.MaxMessageSize,     
                     //message消息文件的最大容量
                     val maxIndexSize: Int = Defaults.MaxIndexSize,         
                     //index索引文件的最大容量
                     val indexInterval: Int = Defaults.IndexInterval,       
                     //索引项之间的字节数
                     val fileDeleteDelayMs: Long = Defaults.FileDeleteDelayMs,  
                     //在从文件系统中删除文件的等待时间
                     val deleteRetentionMs: Long = Defaults.DeleteRetentionMs,  
                     //在日志中保留删除标记的时间。只适用于被压缩的日志。
                     val minCleanableRatio: Double = Defaults.MinCleanableDirtyRatio,
                     val compact: Boolean = Defaults.Compact,
                     val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable,
                     val minInSyncReplicas: Int = Defaults.MinInSyncReplicas)   
                     //如果指定的数低于要求的同步副步数,我们将停止接受-1(全部)的请求确认。
 这个构造方法主要是设置日志文件的一些参数。
3、获取日志恢复读取点
主要是判断offset位置

4、生成Log文件对象
这个对象非常重要,其中最为重要的是logSegments方法我们下面重点分析:
private def loadSegments() {
    // create the log directory if it doesn't exist
    dir.mkdirs()
    
    // 通过检查目录文件,删除临时文件
    // and complete any interrupted swap operations
    for(file <- dir.listFiles if file.isFile) {
      if(!file.canRead)
        throw new IOException("Could not read file " + file)
      val filename = file.getName
      if(filename.endsWith(DeletedFileSuffix) || filename.endsWith(CleanedFileSuffix)) {
        // if the file ends in .deleted or .cleaned, delete it
        file.delete()
      } else if(filename.endsWith(SwapFileSuffix)) {
        // we crashed in the middle of a swap operation, to recover:
        // if a log, swap it in and delete the .index file
        // if an index just delete it, it will be rebuilt
        /* 我们撞击一个中间的交换操作,恢复:
           如果一个日志,交换它并且删除索引文件
           如果一个索引文件刚刚删除它,将进行重建 */
        val baseName = new File(Utils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
        if(baseName.getPath.endsWith(IndexFileSuffix)) {
          file.delete()
        } else if(baseName.getPath.endsWith(LogFileSuffix)){
          // delete the index
          val index = new File(Utils.replaceSuffix(baseName.getPath, LogFileSuffix, IndexFileSuffix))
          index.delete()
          // complete the swap operation
          val renamed = file.renameTo(baseName)
          if(renamed)
            info("Found log file %s from interrupted swap operation, repairing.".format(file.getPath))
          else
            throw new KafkaException("Failed to rename file %s.".format(file.getPath))
        }
      }
    }

    // 二次加载索引文件和日志文件
    for(file <- dir.listFiles if file.isFile) {
      val filename = file.getName
      //清理无效的索引文件
      if(filename.endsWith(IndexFileSuffix)) { 
        // if it is an index file, make sure it has a corresponding .log file
        val logFile = new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix))
        if(!logFile.exists) {
          warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
          file.delete()
        }
      } else if(filename.endsWith(LogFileSuffix)) {
        // 加载partiton的所有segment文件
        val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
        val hasIndex = Log.indexFilename(dir, start).exists
        val segment = new LogSegment(dir = dir, 
                                     startOffset = start,
                                     indexIntervalBytes = config.indexInterval, 
                                     maxIndexSize = config.maxIndexSize,
                                     rollJitterMs = config.randomSegmentJitter,
                                     time = time)
        if(!hasIndex) { //如果没有找到index文件,则重新rebuild index文件。
          error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
          segment.recover(config.maxMessageSize)
        }
        segments.put(start, segment)
      }
    }

    if(logSegments.size == 0) {
      // no existing segments, create a new mutable segment beginning at offset 0
      segments.put(0L, new LogSegment(dir = dir,
                                     startOffset = 0,
                                     indexIntervalBytes = config.indexInterval, 
                                     maxIndexSize = config.maxIndexSize,
                                     rollJitterMs = config.randomSegmentJitter,
                                     time = time))
    } else {
      recoverLog()
      // reset the index size of the currently active log segment to allow more entries
      activeSegment.index.resize(config.maxIndexSize)
    }

    // sanity check the index file of every segment to ensure we don't proceed with a corrupt segment
    for (s <- logSegments)
      s.index.sanityCheck()
  } 
Segment是个逻辑概念,为了防止log文件过大, 将log分成许多的LogSegments
Segment又分为两部分,MessageSet文件和Index文件,分别命名为[base_offset].log和[base_offset].index
base_offset就是该Segment的起始offset,比前一个segment里面的offset都要大。
partion文件存储中segement file组成结构:
A、segment file组成:由2大部分组成,分别为segment data file和segment index file,此2个文件一一对应,成对出现.
B、segment index file索引文件组成结构如下:00000000000000000000.index       文件名称,文件串大小最大支持2^64bit
C、每次记录相应log文件记录的相对条数和物理偏移位置位置,共8bytes
D、4byte 当前segment file offset - last seg file offset记录条数 
E、4byte对应segment file物理偏移地址 position
 说明:
4 byte CRC32:使用crc32算法计算除CRC32这4byte外的buffer。
1 byte “magic":表示数据文件协议版本号
1 byte “attributes":表示标识独立版本,标识压缩类型,编码类型。
key data:可选,可以存储判断或表示这个消息块的元数据信息。
payload data:消息体,该消息体可能会存储多条消息记录,内部是按照序号有序存储的。
在同一个topic下有多个分区,每个分区下面会划分为多个segment文件,只有一个当前文件在写,其他文件只读。
当写满一个文件也就是达到配置的默认值或者达到时间上限则分隔切换文件,新建一个当前文件用来写,老的当前文件切换为只读。文件的命名以起始偏移量来命名。
 
假设testKJ1这个topic下的0-0分区可能有以下这些文件:
 • 00000000000000000000.index
 • 00000000000000000000.log 
 • 00000000000000500000.index 
 • 00000000000000500000.log 
 • 000000000000001000000.index 
 • 000000000000001000000.log
 • 000000000000001500000.index
 • 000000000000001500000.log  

5、分隔segment规则
  def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = {
    val appendInfo = analyzeAndValidateMessageSet(messages)
    
    // if we have any valid messages, append them to the log
    if(appendInfo.shallowCount == 0)
      return appendInfo
      
    // trim any invalid bytes or partial messages before appending it to the on-disk log
    var validMessages = trimInvalidBytes(messages, appendInfo)

    try {
      // they are valid, insert them in the log
      lock synchronized {
        appendInfo.firstOffset = nextOffsetMetadata.messageOffset

        if(assignOffsets) {
          // assign offsets to the message set
          val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
          try {
            validMessages = validMessages.assignOffsets(offset, appendInfo.codec)
          } catch {
            case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
          }
          appendInfo.lastOffset = offset.get - 1
        } else {
          // we are taking the offsets we are given
          if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
            throw new IllegalArgumentException("Out of order offsets found in " + messages)
        }

        // re-validate message sizes since after re-compression some may exceed the limit
        for(messageAndOffset <- validMessages.shallowIterator) {
          if(MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) {
            // we record the original message set size instead of trimmed size
            // to be consistent with pre-compression bytesRejectedRate recording
            BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
            BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
            throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
              .format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))
          }
        }

        // check messages set size may be exceed config.segmentSize
        if(validMessages.sizeInBytes > config.segmentSize) {
          throw new MessageSetSizeTooLargeException("Message set size is %d bytes which exceeds the maximum configured segment size of %d."
            .format(validMessages.sizeInBytes, config.segmentSize))
        }


        // maybe roll the log if this segment is full
        val segment = maybeRoll(validMessages.sizeInBytes)

        // now append to the log
        segment.append(appendInfo.firstOffset, validMessages)

        // increment the log end offset
        updateLogEndOffset(appendInfo.lastOffset + 1)

        trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
                .format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validMessages))

        if(unflushedMessages >= config.flushInterval)
          flush()

        appendInfo
      }
    } catch {
      case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
    }
  }
 其中val segment = maybeRoll(validMessages.sizeInBytes)这个方法用来判断是否需要分隔segment,代码如下:
private def maybeRoll(messagesSize: Int): LogSegment = {
    val segment = activeSegment
    if (segment.size > config.segmentSize - messagesSize ||
        segment.size > 0 && time.milliseconds - segment.created > config.segmentMs - segment.rollJitterMs ||
        segment.index.isFull) {
      debug("Rolling new log segment in %s (log_size = %d/%d, index_size = %d/%d, age_ms = %d/%d)."
            .format(name,
                    segment.size,
                    config.segmentSize,
                    segment.index.entries,
                    segment.index.maxEntries,
                    time.milliseconds - segment.created,
                    config.segmentMs - segment.rollJitterMs))
      roll()
    } else {
      segment
    }
  }
 公式如下:
 A、segment.size > config.segmentSize - messagesSize  
    segment的字节数>配置的默认大小-消息字节数大小

B、time.milliseconds - segment.created > config.segmentMs - segment.rollJitterMs
   这几个参数在前面有过说明,当前时间-segment创建时间 > 多长时间产生新的segment-最大随机抖动时间

C、segment.index是否已满,这个已满其实就是大于等于maxIndexSize这个参数的值,这个参数在前面已经有过说明
 
  • 大小: 39.6 KB
  • 大小: 79.1 KB
分享到:
评论

相关推荐

    Kafka技术内幕:图文详解Kafka源码设计与实现+书签.pdf+源码

    7. **连接器(Connectors)和流处理(Kafka Streams)**:Kafka Connect允许用户方便地集成其他系统,如数据库,而Kafka Streams则提供了一种在Kafka之上进行流处理的API,使得实时分析和复杂事件处理变得简单。...

    kafka源码分析

    kafka源码分析, Introduction kafka-Intro kafka-Unix kafka-Producer kafka-Producer-Scala kafka-SocketServer kafka-LogAppend kafka-ISR kafka-Consumer-init-Scala

    基于Kafka的管理系统源码.zip

    基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 ...

    Kafka技术内幕:图文详解Kafka源码设计与实现 PD

    Kafka技术内幕:图文详解Kafka源码设计与实现 PDF 下载 Kafka技术内幕:图文详解Kafka源码设计与实现 PDF 下载

    消息队列kafka源码详细讲解分析

    《Kafka消息队列源码深度解析》 Kafka,由LinkedIn开源并被Apache基金会接纳,是一款高吞吐量的分布式消息系统,广泛应用于大数据实时处理、日志收集、流计算等多个领域。本文将深入剖析Kafka的核心概念,以及其...

    apache kafka源码剖析高清part2

    apache kafka源码剖析高清 带索引书签目录_徐郡明(著) 电子工业出版社,分为part1和part2,一起下载解压

    Kafka源码解析与实战

    #### 三、Kafka源码分析 ##### 3.1 生产者发送流程 1. **消息序列化**:生产者在发送消息之前,首先需要将消息对象序列化为字节数组。 2. **消息分发**:根据用户指定的分区策略(如轮询、按key哈希等),将消息...

    kafka源码解析新手版本(修正版)

    ### Kafka源码解析新手版本(修正版)知识点详解 #### 一、Kafka诞生背景及其在LinkedIn的应用 **1.1 Apache Kafka项目简介** - **诞生背景:** Apache Kafka最初由LinkedIn开发,随后于2011年初开源,并在2012年...

    Kafka源码解析及实战

    《Kafka源码解析及实战》是一本专为深度学习Apache Kafka的读者设计的教材,旨在帮助读者深入了解Kafka的工作原理及其内部机制。通过源码级别的解析,读者可以更好地掌握Kafka在分布式消息系统中的核心功能和设计...

    Kafka源码解析与实战.zip

    Kafka源码解析与实战.zip

    apache kafka源码剖析高清part1

    apache kafka源码剖析高清 带索引书签目录_徐郡明(著) 电子工业出版社

    kafka需要的源码包

    **Kafka 源码分析概述** Kafka 是一个分布式流处理平台,由 LinkedIn 开发并贡献给 Apache 软件基金会。它被设计为高吞吐量、低延迟的消息传递系统,支持实时数据流处理。Kafka 主要用于构建实时数据管道和流应用,...

    Kafka源码解析与实战-高清-完整目录-2018年1月

    标题中提到的“Kafka源码解析与实战”指向了关于Apache Kafka这个开源流处理平台的深入分析和应用实践。Apache Kafka是一个分布式流处理平台,最初是由LinkedIn公司开发,并于2011年开源。它主要用于构建实时数据...

    《Apache Kafka源码剖析》.part5.rar

    Apache Kafka源码剖析 PDF较大,分5份上传!一起解压即可。

    Kafka技术内幕-图文详解Kafka源码设计与实现

    Kafka自LinkedIn开源以来就以高性能、高吞吐量、分布式的特性著称,本书以0.10版本的源码为基础,深入分析了Kafka的设计与实现,包括生产者和消费者的消息处理流程,新旧消费者不同的设计方式,存储层的实现,协调者...

    apache kafka技术内幕 和 apacke kafka源码分析2本

    apache kafka技术内幕 和 apacke kafka源码分析2本PDF 电子书 网盘下载

    kafka 技术内幕 图文详解Kafka源码设计与实现

    以下是对Kafka源码设计与实现的一些关键知识点的详细说明: 1. **Kafka架构**:Kafka是一个分布式的消息中间件,主要由Producers(生产者)、Brokers(代理服务器)、Consumers(消费者)和Topics(主题)组成。...

    Kafka技术内幕:图文详解Kafka源码设计与实现.郑奇煌(2017.11).pdf

    这本书详细介绍了Kafka的核心概念、工作原理以及源码分析,旨在帮助读者理解并掌握这个分布式流处理平台。 Kafka是一个高吞吐量、可扩展的开源消息系统,最初由LinkedIn开发,后来成为Apache软件基金会的顶级项目。...

    Kafka源码解析与实战.pdf,超清无水印,有书签导航,精品

    《Kafka源码解析与实战》是一本深入探讨Apache Kafka技术的专业书籍,旨在帮助读者理解Kafka的核心原理、源码实现及其在大数据处理中的实际应用。这本书以清晰无水印的PDF格式呈现,包含了详尽的书签导航,便于读者...

    kafka-clients源码.zip

    本文将通过分析kafka-clients-2.1.0.jar和kafka-clients-2.1.0-sources.jar这两个源码文件,深入探讨其核心知识点。 一、消费者API 1. **消费者组管理**:Kafka消费者通过消费者组来实现负载均衡和故障恢复。消费...

Global site tag (gtag.js) - Google Analytics