最近在研究kafka,本着先理清框架脉络,再看细节实现的想法,先抱着文档一阵猛看,本来以为Coordinator和Controller的流程基本一样,选举一个Coordinator为主来接收Consumer的分配。哪知后来看了下源码,坑爹呢,选举去哪了:
KafkaServer.scala
/* start kafka coordinator */ consumerCoordinator = GroupCoordinator.create(config, zkUtils, replicaManager) consumerCoordinator.startup()
GroupCoordinator.scala
/** * Startup logic executed at the same time when the server starts up. */ def startup() { info("Starting up.") heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", brokerId) joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", brokerId) isActive.set(true) info("Startup complete.") }
服务端启动时Coordinator只启动了两个线程,一个处理心跳检测,一个处理Consumer加入,百思不得其解,然后给Guozhang Wang(Kafka开发人员之一)发了封邮件请教,才理清了来龙去脉,因此记录一下相关代码流程。
Coordinator是kafka负责consumer负载均衡,也就是你所订阅的Topic的Partition由哪个consumer消费的分配事项。具体介绍请参考以下篇文章:
http://www.infoq.com/cn/articles/kafka-analysis-part-4
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
上图参考:http://blog.daich.org/2016/02/15/kafka-consumer-0.9/
如本文开头代码所示,随着Kafka服务端的启动,Coordinator也随之启动,但是并没有Coordinator leader的选举过程,因为对于服务端来说,每一个服务端都有一个Coordinator,它们不区分leader/follower而同时工作,各自管理一部分Consumer group。这样一来,Coordinator的负载均衡也就涉及到了两个方面,一方面是Coordinator自已,哪个Coordinator负责哪个group(上图第3步实现),一方面是Consumer,哪个Partition分配给同一group的中哪个consumer(上图中第5步实现)。
具体来说,Coordinator方面,由Consumer根据之前获得的Topic的Metadata信息,向服务端发起GroupCoordinatorRequest请求,服务端收到此请求后在KafkaApi.scala中进行处理:
def handleGroupCoordinatorRequest(request: RequestChannel.Request) {
... ...
val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId)
// get metadata (and create the topic if necessary)
val offsetsTopicMetadata = getTopicMetadata(Set(GroupCoordinator.GroupMetadataTopicName), request.securityProtocol).head
val coordinatorEndpoint = offsetsTopicMetadata.partitionsMetadata.find(_.partitionId == partition).flatMap {
partitionMetadata => partitionMetadata.leader
}
val responseBody = coordinatorEndpoint match {
case None =>
new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode())
case Some(endpoint) =>
new GroupCoordinatorResponse(Errors.NONE.code, new Node(endpoint.id, endpoint.host, endpoint.port))
}
... ...
}
}
关键点在coordinator.partitionFor(groupCoordinatorRequest.groupId),这个方法最终调用GroupMetadataManager.scala中的:
相关推荐
- **Exactly Once (恰好一次)**: Kafka 并未严格实现此语义。理论上,这意味着消息仅发送一次且被正确处理一次。然而,Kafka 认为此种策略在大多数应用场景中没有必要,因为额外的复杂性和性能开销往往不值得。 ###...
赠送jar包:flink-connector-kafka-0.9_2.11-1.10.0.jar; 赠送原API文档:flink-connector-kafka-0.9_2.11-1.10.0-javadoc.jar; 赠送源代码:flink-connector-kafka-0.9_2.11-1.10.0-sources.jar; 赠送Maven依赖...
在Kafka 0.9中,对主题分区的管理也有所改进,包括动态调整分区数量、更好的故障转移和负载均衡。 10. **监控和管理工具** Kafka 0.9版本提供了更好的管理和监控工具,如Kafka Manager,可以帮助管理员监控集群...
kafka, 负载均衡,恢复 Kafka 消费,支持 Zookeeper KafkaKafka,工具和示例应用程序构建在 sarama插件包之上。库收费:基于 web sphere 支持负载平衡和偏移持久性的分布式 Kafka 使用者,支持负载平衡和偏移持久性...
Kafka 0.9 API的样例程序该项目提供了一个简单但现实的卡夫卡生产者和消费者的例子。 这些程序以一种样式和一个比例尺编写,使您可以对其进行调整以使它们接近生产样式。 带有0.9.0的新Kafka API缺少大量示例,这很...
- **分区分配**:消费者组内的消费者会自动分配不同的分区,确保每个分区被一个且仅有一个消费者消费,实现负载均衡。 - **容错性**:如果某个消费者宕机,其负责的分区会被其他消费者接管,确保消息不会丢失。 - ...
在vivo公司,Kafka的负载均衡是其大规模集群运维中的关键环节。文档主要介绍了Kafka负载均衡的实践,特别是借助Cruise Control这一自动化运维工具来优化集群管理。Cruise Control不仅包含服务的上下线管理,还涉及...
总结,Kafka 2.10-0.9.0.1 的源码分析涵盖了 Mirror Maker 工具的实现,多机房消息异步处理的架构设计,以及 Kafka 内部组件的工作原理。深入研究这些源码将帮助我们更好地理解和优化 Kafka 系统,为大数据实时处理...
《Apache Kafka 0.9.0.1:构建实时数据管道》 Apache Kafka 是一个分布式流处理平台,被广泛用于构建实时数据管道和流应用程序。这个名为 "kafka_2.11-0.9.0.1" 的压缩包包含了 Kafka 的一个特定版本——0.9.0.1,...
通过深入研究Kafka 0.9.0.0的源代码,我们可以更深入地理解其设计理念和实现细节,这对于开发、优化和维护Kafka集群具有极大的指导意义。无论是开发者、运维人员还是数据工程师,都能从中受益匪浅。
在 0.9 版本中,引入了消费者组(Consumer Group)的概念,使得多个消费者可以协作消费一个主题,实现了负载均衡和容错能力。 3. **主题与分区(Topics and Partitions)**:主题是逻辑上的分类,而分区是物理上的...
Kafka 0.9引入了消费者小组的概念,允许动态平衡分区的分配。 三、Kafka 配置 1. **Kafka 服务器配置 (Broker Configs)**:Kafka服务器的配置涉及到诸如broker.id、num.partitions、replication.factor等参数,...
Kafka是一个分布式流处理平台,最初由LinkedIn公司开发,后来成为Apache软件基金会的一个顶级项目。Kafka主要用于构建实时数据管道和流处理应用程序。它能够高效地处理高吞吐量的数据流,并且具有很好的可扩展性、...
赠送jar包:kafka-clients-0.9.0.0.jar; 赠送原API文档:kafka-clients-0.9.0.0-javadoc.jar; 赠送源代码:kafka-clients-0.9.0.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-0.9.0.0.pom; 包含翻译后...
Kafka是一个分布式流处理平台,最初由LinkedIn公司开发,后来成为Apache软件基金会的一个顶级项目。Kafka主要用于构建实时数据管道和流处理应用程序。它能够高效地处理高吞吐量的数据流,并且具有很好的可扩展性、...
标题中的"kafka_2.11-0.9.0.0.tgz"是指Apache Kafka的一个特定版本,即0.9.0.0版本,它适用于Scala 2.11构建。Kafka是一个分布式流处理平台,常被用作一个高效的消息队列系统,能够处理大量的实时数据。这个tgz文件...
这种设计实现了负载均衡和容错。 5. **offset管理**:消费者维护自己的消费位置(offset),用于追踪消费进度。消费者可以提交offset到Kafka,以便在故障恢复时继续从上次的位置消费。 6. **高吞吐量**:Kafka设计...
kafka-clients-0.9.0.0.jar