logManager类:管理kafka数据log的类,包括数据clean,flush等操作
Log类:每个tplog的对象
logSegment:每个tplog目录下的文件对象
filemessageSet:每个log file的管道类
base offset:在topic中的绝对offset值
offsetindex:每个log index的管道map类,存储相对offset值和文件position
按照partition分区topic,分发到各个机子上
partition上有多个log文件,每个log文件一个索引文件
log文件是实际的数据,索引文件是log文件里数据的相对偏移量和在log文件里的position,偏移量offset是一段数据生成一个offset,避免offset文件过大
1.初始化:
val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint" val LockFile = ".lock" val InitialTaskDelayMs = 30*1000 private val logCreationOrDeletionLock = new Object private val logs = new Pool[TopicAndPartition, Log]()//所有log的对象,一个topicpartition 一个log对象 //获得log文件,并获得文件channel锁 createAndValidateLogDirs(logDirs) private val dirLocks = lockLogDirs(logDirs) private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap //遍历所有的log,生成Log对象,并且执行log clean(checkposition) loadLogs()
主要方法loadLogs:
if (cleanShutdownFile.exists) {//表示上次关闭kafka时,已经clean完,这次不需要clean debug( "Found clean shutdown file. " + "Skipping recovery for all logs in data directory: " + dir.getAbsolutePath) } else { // log recovery itself is being performed by `Log` class during initialization brokerState.newState(RecoveringFromUncleanShutdown) } //获得log下recover文件 val recoveryPoints = this.recoveryPointCheckpoints(dir).read val jobsForDir = for { dirContent <- Option(dir.listFiles).toList logDir <- dirContent if logDir.isDirectory } yield { Utils.runnable { debug("Loading log '" + logDir.getName + "'") //从文件目录上获得topic和partition val topicPartition = Log.parseTopicPartitionName(logDir.getName) //从map中获得topic的自定义config,如果 val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L) val current = new Log(logDir, config, logRecoveryPoint, scheduler, time) val previous = this.logs.put(topicPartition, current) //判断是否有重复的topic+partition if (previous != null) { throw new IllegalArgumentException( "Duplicate log directories found: %s, %s!".format( current.dir.getAbsolutePath, previous.dir.getAbsolutePath)) } } } //对每个logDir执行 上边的runnable,生成Log对象添加到log pool中 jobs(cleanShutdownFile) = jobsForDir.map(pool.submit).toSeq
其中new Log方法,为初始化log file和index
主方法:loadSegments
1.处理swap文件,log则重新加载(rename),index则删除
2.加载log和index,恢复不存在的index
private def loadSegments() { // create the log directory if it doesn't exist dir.mkdirs() // first do a pass through the files in the log directory and remove any temporary files // and complete any interrupted swap operations for(file <- dir.listFiles if file.isFile) { if(!file.canRead) throw new IOException("Could not read file " + file) val filename = file.getName if(filename.endsWith(DeletedFileSuffix) || filename.endsWith(CleanedFileSuffix)) { // if the file ends in .deleted or .cleaned, delete it file.delete() } else if(filename.endsWith(SwapFileSuffix)) {//文件用于swap时候,恢复log // we crashed in the middle of a swap operation, to recover: // if a log, swap it in and delete the .index file // if an index just delete it, it will be rebuilt //如果是index则删除,如果是log则重新加载(重命名),并删除已经存在的index val baseName = new File(Utils.replaceSuffix(file.getPath, SwapFileSuffix, "")) if(baseName.getPath.endsWith(IndexFileSuffix)) { file.delete() } else if(baseName.getPath.endsWith(LogFileSuffix)){ // delete the index val index = new File(Utils.replaceSuffix(baseName.getPath, LogFileSuffix, IndexFileSuffix)) index.delete() // complete the swap operation val renamed = file.renameTo(baseName) if(renamed) info("Found log file %s from interrupted swap operation, repairing.".format(file.getPath)) else throw new KafkaException("Failed to rename file %s.".format(file.getPath)) } } } // now do a second pass and load all the .log and .index files for(file <- dir.listFiles if file.isFile) { val filename = file.getName if(filename.endsWith(IndexFileSuffix)) { // if it is an index file, make sure it has a corresponding .log file 查看index log是否对应的 log,如果没有则删除 val logFile = new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix)) if(!logFile.exists) { warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath)) file.delete() } } else if(filename.endsWith(LogFileSuffix)) { // if its a log file, load the corresponding log segment // 文件名是start offset val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong val hasIndex = Log.indexFilename(dir, start).exists //建立tplog中 每个日志文件对象 logsegment,包含filemessage,offsetindex,baseoffset值 val segment = new LogSegment(dir = dir, startOffset = start, indexIntervalBytes = config.indexInterval, maxIndexSize = config.maxIndexSize, rollJitterMs = config.randomSegmentJitter, time = time) if(!hasIndex) { error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath)) //重建index文件和内存索引,文件和内存索引是用的channel map机制 segment.recover(config.maxMessageSize) } segments.put(start, segment) } } if(logSegments.size == 0) { // no existing segments, create a new mutable segment beginning at offset 0 segments.put(0L, new LogSegment(dir = dir, startOffset = 0, indexIntervalBytes = config.indexInterval, maxIndexSize = config.maxIndexSize, rollJitterMs = config.randomSegmentJitter, time = time)) } else { recoverLog() // reset the index size of the currently active log segment to allow more entries activeSegment.index.resize(config.maxIndexSize) } // sanity check the index file of every segment to ensure we don't proceed with a corrupt segment for (s <- logSegments) s.index.sanityCheck() }
-----------------------------初始化完毕---------------------------------
startup方法中三个功能:
1.cleanupLogs
2.flushDirtyLogs
3.checkpointRecoveryPointOffsets
1.cleanupLogs
两个方法一个是超时(超时是modify时间),一个是大小(大小是最老的小于diff)
private def cleanupExpiredSegments(log: Log): Int = { val startMs = time.milliseconds //参数为log manager开始时间-tplog的修改时间 和 配置retention时间 比较,超过则需要删除,返回true //删除的是最后一次修改时间超过retention time的 log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs) }
/** * 删除规则,是tplog超过阈值,从最老的开始找,找到file的大小小于diff的时候删除 * 如果当前log file大小大于diff,则停止(原则是等最后一个文件可删除) * Runs through the log removing segments until the size of the log * is at least logRetentionSize bytes in size */ private def cleanupSegmentsToMaintainSize(log: Log): Int = { if(log.config.retentionSize < 0 || log.size < log.config.retentionSize) return 0//当配置小于0,或log大小小于配置 var diff = log.size - log.config.retentionSize def shouldDelete(segment: LogSegment) = { if(diff - segment.size >= 0) {//如果需要删除的大小 大于或等于 logfile,则返回true diff -= segment.size true } else { false } } log.deleteOldSegments(shouldDelete) }
参数:
清理日志,距离上次修改时间大于config时间,则删除
val logCleanupIntervalMs = props.getLongInRange("log.retention.check.interval.ms", 5*60*1000, (1, Long.MaxValue))
log clean参数,达到log大小上限,log的position
val logRetentionBytes = props.getLong("log.retention.bytes", -1)
def deleteOldSegments(predicate: LogSegment => Boolean): Int = { // find any segments that match the user-supplied predicate UNLESS it is the final segment // and it is empty (since we would just end up re-creating it val lastSegment = activeSegment //超时,并且包含segment,则删除,获得删除list segment val deletable = logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastSegment.baseOffset || s.size > 0)) val numToDelete = deletable.size if(numToDelete > 0) { lock synchronized { // we must always have at least one segment, so if we are going to delete all the segments, create a new one first if(segments.size == numToDelete) roll() // remove the segments for lookups deletable.foreach(deleteSegment(_))//从segment集合中移除,修改文件名称为delete结尾,并异步删除 } } numToDelete }
2.flushDirtyLogs
flush的message条数和时间间隔 /* the maximum time in ms that a message in any topic is kept in memory before flushed to disk */ val logFlushIntervalMs = props.getLong("log.flush.interval.ms", logFlushSchedulerIntervalMs) /** * Flush any log which has exceeded its flush interval and has unwritten messages. */ private def flushDirtyLogs() = { debug("Checking for dirty logs to flush...") for ((topicAndPartition, log) <- logs) { try { val timeSinceLastFlush = time.milliseconds - log.lastFlushTime debug("Checking if flush is needed on " + topicAndPartition.topic + " flush interval " + log.config.flushMs + " last flushed " + log.lastFlushTime + " time since last flush: " + timeSinceLastFlush) if(timeSinceLastFlush >= log.config.flushMs) log.flush } catch { case e: Throwable => error("Error flushing topic " + topicAndPartition.topic, e) } } } @threadsafe def flush() { LogFlushStats.logFlushTimer.time { log.flush() index.flush() } }
3.checkpointRecoveryPointOffsets
checkpointRecoveryPointOffsets,标记logdir上的恢复点,避免启动时,需要恢复所有log,生成index
是按照logdir遍历,logdir中包含多个tplog
/** * Make a checkpoint for all logs in provided directory. */ private def checkpointLogsInDir(dir: File): Unit = { //获得当前dir的所有tplog,value:Map【TopicAndPartition, Log】 val recoveryPoints = this.logsByDir.get(dir.toString) if (recoveryPoints.isDefined) { //mapValues重新生成map的value,write参数(topicAndPartition:recoverPoint); //write将tplog的offset写入recover文件的tmp文件中,删除旧文件,rename为recover文件 _是Log对象(value) this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint)) } }
logmanager里实现log compact功能
if(cleanerConfig.enableCleaner) cleaner.startup()//log compact
相关推荐
Kafka 存储机制详解 Kafka 是一个分布式、分区的、多副本的、多订阅者、基于 zookeeper 协调的分布式日志系统,也可以当做 MQ 系统。它常见用于 web/nginx 日志、访问日志、消息服务等等。下面将从 Kafka 文件存储...
### Flume采集数据到Kafka,然后从Kafka取数据存储到HDFS的方法思路和完整步骤 #### 一、概述 随着大数据技术的发展,高效的数据采集、处理与存储变得尤为重要。本文将详细介绍如何利用Flume采集日志数据,并将其...
Kafka接收Flume数据并存储至HDFS Kafka是Apache软件基金会下的一个开源流处理平台,由LinkedIn开发,现已捐赠给Apache软件基金会。Kafka提供高吞吐量、持久性、可扩展性和容错性等特点,使其成为大数据处理的首选...
**Kafka Tool 连接 Kafka 工具详解** 在大数据处理和实时流处理领域,Apache Kafka 是一个不可或缺的组件,它作为一个分布式的消息中间件,提供高效、可扩展且可靠的发布订阅服务。为了方便管理和操作 Kafka 集群,...
标题"使用netty实现TCP长链接消息写入kafka以及kafka批量消费数据"揭示了这个项目的核心内容:通过Netty接收TCP长连接的数据,并将这些数据存储到Kafka中,同时利用Kafka的批量消费功能对数据进行处理。下面我们将...
本资源"java_kafka交互工具类.rar"提供了一套Java集成Kafka的工具类,帮助开发者便捷地进行单机或集群环境下的Kafka操作。以下将详细介绍其中可能包含的关键知识点: 1. **Kafka基本概念** - **主题(Topic)**:...
**大数据采集技术与Kafka的消息存储机制** 大数据采集技术在当今的信息时代中扮演着至关重要的角色,它涉及从各种来源收集、处理和分析大量数据。Kafka作为一个高效、可扩展的分布式流处理平台,尤其在大数据采集和...
**Kafka Tool:高效管理Apache Kafka集群的利器** Apache Kafka是一个分布式的流处理平台,广泛应用于大数据实时处理、日志聚合、消息系统等多个领域。在Kafka的实际操作中,管理和监控集群是至关重要的任务,而...
* 分区(Partition):Kafka 中的分区机制,用于分布式存储和处理消息。 * 生产者(Producer):Kafka 中的生产者角色,用于发送消息。 * 消费者(Consumer):Kafka 中的消费者角色,用于消费消息。 * Broker:...
该软件包包括Kafka的核心组件,如Kafka生产者和消费者API,Kafka协调器,Kafka存储层等。它还包括一些有用的工具和库,如Kafka Connect和Kafka Streams等,可以帮助用户更轻松地使用Kafka,并提供更丰富的功能。 ...
7. **安全支持**:如果Kafka集群启用了SASL/SSL或Kerberos等安全机制,Kafka Tool也能很好地与之兼容,确保管理操作的安全性。 8. **命令行集成**:虽然Kafka Tool提供了一个直观的UI,但它也支持通过命令行执行...
2. **报警机制**:当监控的指标超过预设阈值时,Kafka-Eagle可以触发报警,帮助及时发现并解决问题。 3. **可视化管理**:通过图表和仪表板展示数据,使集群状态一目了然,方便管理和决策。 4. **操作便捷**:提供...
在Kafka中,数据是以主题(Topic)的形式存储的。主题可以被分成多个分区(Partition),每个分区可以有多个副本(Replica)以实现数据冗余和高可用性。 【多线程与Kafka】 在"Kafka-java-demo"中,你可能会看到...
《Kafka技术内幕:图文详解Kafka源码设计与实现》是一本深入解析Apache Kafka的专著,旨在帮助读者理解Kafka的核心设计理念、内部机制以及源码实现。这本书结合图文并茂的方式,使得复杂的概念变得更为易懂。同时,...
3. **创建消费者配置类**:创建一个配置类,使用`@Configuration`和`@EnableKafka`注解启用Kafka消费者。这里可以定义消费者的属性,如key和value的序列化方式。 ```java @Configuration @EnableKafka public class...
1. **Broker**: Kafka集群中的服务器节点,负责存储和处理消息。 2. **Topic**: 消息的分类,可以理解为数据库中的表。 3. **Partition**: 为了提高并行处理能力,每个主题被分为多个分区,每个分区在不同的broker上...
4. **报警机制**:当监控指标超过预设阈值时,Kafka-Eagle 可以触发报警,及时通知管理员处理问题。 5. **开源免费**:Kafka-Eagle 是一款开源软件,社区活跃,不断更新和改进,适应不断变化的监控需求。 在部署 ...
**Kafka工具详解——Kafkatool** Kafka作为一个分布式流处理平台,广泛应用于大数据实时处理和消息传递。然而,管理Kafka集群和操作其组件(如topics、partitions、offsets等)可能会变得复杂,这时就需要一些可视...
3. **容错性**:通过副本机制,Kafka能够自动处理节点故障,确保服务的连续性和数据的一致性。 4. **实时处理**:Kafka支持实时数据处理,消息一旦被发布就会立即可供消费,无需等待批处理。 5. **可伸缩性**:...