好的, 上篇把 kafka.kafka 干的事情解析了一遍, 什么都看不出来, 是的, 什么都看不出来他干了什么。那么这章来电干货。
在kafka中, 主要资源的协调,开始运行时在
class KafkaServer(val config: KafkaConfig) extends Logging
这个类中进行的。
在初始化这个类的时候,他做了一件事情。
val scheduler = new KafkaScheduler(1, "kafka-logcleaner-", false)
咱们来看看 KafkaScheduler的实现
private val executor = new ScheduledThreadPoolExecutor(numThreads, new ThreadFactory() { def newThread(runnable: Runnable): Thread = { val t = new Thread(runnable, baseThreadName + threadId.getAndIncrement) t.setDaemon(isDaemon) t } }) executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false) executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
看到结果了吧, 其实就是 ScheduledThreadPoolExecutor, kafka 初始化了一个单线程的 ScheduledThreadPoolExecutor 而且名字叫做 “kafka-logcleaner-”
初始化完成了, 咱们看看 startup 方法里有些什么猫腻。
isShuttingDown = new AtomicBoolean(false) shutdownLatch = new CountDownLatch(1) var needRecovery = true val cleanShutDownFile = new File(new File(config.logDir), CLEAN_SHUTDOWN_FILE) if (cleanShutDownFile.exists) { needRecovery = false cleanShutDownFile.delete }
首先是 shutdown 的判断吧之类的操作, 很巧妙,使用了一个文件来表示运行状态。
logManager = new LogManager(config, scheduler, SystemTime, 1000L * 60 * 60 * config.logRollHours, 1000L * 60 * config.logCleanupIntervalMinutes, 1000L * 60 * 60 * config.logRetentionHours, needRecovery)
他new 了一个叫做, logManager 的东西,是的,了解kafka 的人都知道,kafka 是全磁盘操作,message全放磁盘上,此类用于磁盘io的操作。相当关键,咱们看一下。
for(dir <- subDirs) { if(!dir.isDirectory()) { warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?") } else { info("Loading log '" + dir.getName() + "'") val topic = Utils.getTopicPartition(dir.getName)._1 val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs) val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize) val log = new Log(dir, time, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, needRecovery) val topicPartion = Utils.getTopicPartition(dir.getName) logs.putIfNotExists(topicPartion._1, new Pool[Int, Log]()) val parts = logs.get(topicPartion._1) parts.put(topicPartion._2, log) } }
首先, 他获取了topic们, 获取一些系统属性,把topic 放到 名叫 logs 的一个 pool 中, new Log 的作用是 加载 目录topic 中的log 信息到内存中。
在log 对象之中,用LogSegment 抽象了 log 的分段,因为 topic 是有 partition 的。
/* The actual segments of the log */ private[log] val segments: SegmentList[LogSegment] = loadSegments()
这个 segments 搜有的加在一起就是一个完整的 topic。
然后是按照 logsegment 的start 排个序,做个验证,完事。
接着, 把 各个topic 信息放到内存中之后,开始用
if(scheduler != null) { info("starting log cleaner every " + logCleanupIntervalMs + " ms") scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs) }
来定时 按照 config.logCleanupIntervalMinutes 配置的分钟做一些事情。做什么事情呢, 清空一下旧的log,按照两种标准清空, 一个是超过一定时间的log,还有一个是超过大小的log。
下面到了跟zk交互的阶段
if(config.enableZookeeper) { kafkaZookeeper = new KafkaZooKeeper(config, this) kafkaZookeeper.startup zkActor = new Actor { def act() { loop { receive { case topic: String => try { kafkaZookeeper.registerTopicInZk(topic) } catch { case e => error(e) // log it and let it go } case StopActor => info("zkActor stopped") exit } } } } zkActor.start }
跟zk交互的过程包括创建以下path:
* /topics/[topic]/[node_id-partition_num]
* /brokers/[0...N] --> host:port
订阅事件
总结一下,磁盘部分在 broker 初始化的时候,加载topic 信息到内存, 定期清理以下log, 跟zk做一些注册,订阅事件。
下回咱们看下,初始化的时候, 是网络连接用的什么神奇的东西。
相关推荐
**Kafka Tool 连接 Kafka 工具详解** 在大数据处理和实时流处理领域,Apache Kafka 是一个不可或缺的组件,它作为一个分布式的消息中间件,提供高效、可扩展且可靠的发布订阅服务。为了方便管理和操作 Kafka 集群,...
**Kafka Tool:高效管理Apache Kafka集群的利器** Apache Kafka是一个分布式的流处理平台,广泛应用于大数据实时处理、日志聚合、消息系统等多个领域。在Kafka的实际操作中,管理和监控集群是至关重要的任务,而...
**Kafka工具详解——Kafkatool** Kafka作为一个分布式流处理平台,广泛应用于大数据实时处理和消息传递。然而,管理Kafka集群和操作其组件(如topics、partitions、offsets等)可能会变得复杂,这时就需要一些可视...
在Spring Boot应用中,我们可以利用Spring Kafka框架来与Apache Kafka进行集成,实现高效的消息传递。本文将详细探讨如何在Spring Boot项目中基于Spring Kafka动态创建Kafka消费者。 首先,了解Kafka基本概念:...
本文将深入探讨如何实现Storm与Kafka的集成,重点在于如何从Kafka中读取数据。 **一、整合说明** Apache Storm是一个开源的分布式实时计算系统,它能够持续处理无限的数据流,确保每个事件都得到精确一次(Exactly...
在IT行业中,Kafka是一种广泛使用的分布式流处理平台,它由Apache软件基金会开发,主要用于构建实时数据管道和流应用。本文将围绕标题和描述中提到的两种Kafka工具——kafkatool-64bit.exe和kafka-eagle-bin-1.4.6....
Apache Kafka 是一个分布式流处理平台,常用于构建实时的数据管道和应用。Kafka 提供了高吞吐量、低延迟的消息传递能力,是大数据领域中重要的消息队列(MQ)解决方案。Kafka-Eagle 是针对 Kafka 集群设计的一款高效...
【Kafka基础知识】 Kafka是由Apache开发的分布式流处理平台,它主要被设计用来处理实时数据流。在大数据处理领域,Kafka常被用于构建实时数据管道和流应用,能够高效地处理大量的实时数据。 【Java与Kafka的结合】...
**Kafka详细课程讲义** 本课程主要涵盖了Apache Kafka的核心概念、安装配置、架构解析、API使用以及监控与面试知识点,旨在帮助学习者全面理解并掌握这一强大的分布式流处理平台。 **第 1 章 Kafka 概述** Apache...
**Kafka Tool for Linux: 管理与使用Apache Kafka集群的高效工具** Apache Kafka是一款分布式流处理平台,常用于构建实时数据管道和流应用。Kafka Tool是针对Kafka集群进行管理和操作的一款图形用户界面(GUI)工具...
**Kafka概述** Kafka是由LinkedIn开发并贡献给Apache软件基金会的一个开源消息系统,它是一个高性能、可扩展的分布式消息中间件。Kafka最初设计的目标是处理网站活动流数据,但随着时间的发展,它已被广泛应用于...
《Kafka技术内幕:图文详解Kafka源码设计与实现》是一本深入解析Apache Kafka的专著,旨在帮助读者理解Kafka的核心设计理念、内部机制以及源码实现。这本书结合图文并茂的方式,使得复杂的概念变得更为易懂。同时,...
Kafka自LinkedIn开源以来就以高性能、高吞吐量、分布式的特性著称,本书以0.10版本的源码为基础,深入分析了Kafka的设计与实现,包括生产者和消费者的消息处理流程,新旧消费者不同的设计方式,存储层的实现,协调者...
**Kafka介绍** Apache Kafka是一款高性能、分布式的消息中间件,由LinkedIn开发并捐献给Apache软件基金会。它最初设计的目标是构建一个实时的数据管道,能够高效地处理大量的数据流,同时支持发布订阅和队列模型,...
**Kafka 2.5.1 知识点详解** Kafka 是一个分布式流处理平台,由 Apache 软件基金会开发,广泛应用于大数据实时处理、日志收集、消息系统等多个领域。`kafka_2.12-2.5.1` 是 Kafka 的一个特定版本,针对 Scala 2.12 ...
在IT行业中,网络通信和大数据处理是两个至关重要的领域,Netty和Kafka分别是这两个领域的佼佼者。Netty是一个高性能、异步事件驱动的网络应用程序框架,常用于开发高并发、低延迟的网络应用,如TCP服务器。而Kafka...
Kafka Tool 2.0.4是一款专为Kafka设计的强大的客户端工具,尤其适用于Mac操作系统。它提供了一种直观且可视化的界面,让用户能够轻松地连接到Kafka服务并进行各种操作,包括但不限于管理Topic、监控集群状态以及进行...
### 关于Kafka资源下载kafka_2.11-2.0.0.tgz的知识点 #### Kafka简介 Apache Kafka是一种开源的消息队列服务,它最初由LinkedIn开发,并于2011年成为Apache软件基金会的一个顶级项目。Kafka因其高性能、可扩展性和...
Kafka the Definitive Guide Kafka 是一个分布式流媒体平台,用于构建实时数据处理和流媒体处理系统。下面是 Kafka 的一些重要知识点: 1. Kafka 概述 Kafka 是一个基于发布/订阅模式的消息队列系统,由 LinkedIn...
**Kafka Tool for Mac: 管理与使用Apache Kafka集群的理想选择** Kafka Tool是一款专为Apache Kafka设计的强大管理工具,尤其适用于Mac用户。它提供了直观的图形用户界面(GUI),使得对Kafka集群的操作变得简单易...