`

kafka broker宕机&leader选举

阅读更多

 

broker change是由BrokerChangeListener监听类,监听/brokers/ids下得brokerid

BrokerChangeListener的handleChildChange

将新的死的broker交由controller管理

class BrokerChangeListener() extends IZkChildListener with Logging {
    this.logIdent = "[BrokerChangeListener on Controller " + controller.config.brokerId + "]: "
    def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
      info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.mkString(",")))
      inLock(controllerContext.controllerLock) {
        if (hasStarted.get) {
          ControllerStats.leaderElectionTimer.time {
            try {
              val curBrokerIds = currentBrokerList.map(_.toInt).toSet
              val newBrokerIds = curBrokerIds -- controllerContext.liveOrShuttingDownBrokerIds
              val newBrokerInfo = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _))
              val newBrokers = newBrokerInfo.filter(_.isDefined).map(_.get)
              val deadBrokerIds = controllerContext.liveOrShuttingDownBrokerIds -- curBrokerIds
              controllerContext.liveBrokers = curBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
              info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
                .format(newBrokerIds.mkString(","), deadBrokerIds.mkString(","), controllerContext.liveBrokerIds.mkString(",")))
              newBrokers.foreach(controllerContext.controllerChannelManager.addBroker(_))
              deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker(_))
              //将变化的broker,交由controller管理
              if(newBrokerIds.size > 0)
                controller.onBrokerStartup(newBrokerIds.toSeq)
              if(deadBrokerIds.size > 0)
                controller.onBrokerFailure(deadBrokerIds.toSeq)
            } catch {
              case e: Throwable => error("Error while handling broker changes", e)
            }
          }
        }
      }
    }

 

kafkacontroller的onBrokerFailure

def onBrokerFailure(deadBrokers: Seq[Int]) {
    info("Broker failure callback for %s".format(deadBrokers.mkString(",")))
    val deadBrokersThatWereShuttingDown =
      deadBrokers.filter(id => controllerContext.shuttingDownBrokerIds.remove(id))
    info("Removed %s from list of shutting down brokers.".format(deadBrokersThatWereShuttingDown))
    val deadBrokersSet = deadBrokers.toSet
    // trigger OfflinePartition state for all partitions whose current leader is one amongst the dead brokers
    // 先处理死掉的broker上的leader的tp
    val partitionsWithoutLeader = controllerContext.partitionLeadershipInfo.filter(partitionAndLeader =>
      deadBrokersSet.contains(partitionAndLeader._2.leaderAndIsr.leader) &&
        !deleteTopicManager.isTopicQueuedUpForDeletion(partitionAndLeader._1.topic)).keySet
    //下线tp,没有meta信息更新,不发送
    partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
    // trigger OnlinePartition state changes for offline or new partitions
    //上线选举leader,并上线,更新各个broker meta信息
    partitionStateMachine.triggerOnlinePartitionStateChange()
    // filter out the replicas that belong to topics that are being deleted
    var allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokersSet)
    val activeReplicasOnDeadBrokers = allReplicasOnDeadBrokers.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
    // handle dead replicas 处理死掉的replica(所有)
    replicaStateMachine.handleStateChanges(activeReplicasOnDeadBrokers, OfflineReplica)
    // check if topic deletion state for the dead replicas needs to be updated
    val replicasForTopicsToBeDeleted = allReplicasOnDeadBrokers.filter(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
    if(replicasForTopicsToBeDeleted.size > 0) {
      // it is required to mark the respective replicas in TopicDeletionFailed state since the replica cannot be
      // deleted when the broker is down. This will prevent the replica from being in TopicDeletionStarted state indefinitely
      // since topic deletion cannot be retried until at least one replica is in TopicDeletionStarted state
      deleteTopicManager.failReplicaDeletion(replicasForTopicsToBeDeleted)
    }
  }

   

  PartitionStateMachine的triggerOnlinePartitionStateChange,调用方法是将新的leader上线

  调用electLeaderForPartition

     val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr)
        val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topic, partition,
          leaderAndIsr, controller.epoch, currentLeaderAndIsr.zkVersion)
        newLeaderAndIsr = leaderAndIsr
        newLeaderAndIsr.zkVersion = newVersion
        
        // update the leader cache
      controllerContext.partitionLeadershipInfo.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch)
      stateChangeLogger.trace("Controller %d epoch %d elected leader %d for Offline partition %s"
                                .format(controllerId, controller.epoch, newLeaderAndIsr.leader, topicAndPartition))
      val replicas = controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition))
      // store new leader and isr info in cache 更新broker元数据信息 updateMetadataRequestMap
      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition,
        newLeaderIsrAndControllerEpoch, replicas)

   partitionStateMachine的leaderselect变量OfflinePartitionLeaderSelector

选举broker上的tp的新leader

          case false =>
            //选择在线broker中得replica的第一个作为leader
            val liveReplicasInIsr = liveAssignedReplicas.filter(r => liveBrokersInIsr.contains(r))
            val newLeader = liveReplicasInIsr.head
            debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader."
                  .format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(",")))
            new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
        }
        info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))
        (newLeaderAndIsr, liveAssignedReplicas)

 

 

 

 

1
1
分享到:
评论

相关推荐

    Linux搭建Kafka开发环境

    在Kafka的设计思想方面,文章阐述了Kafka Broker的Leader选举机制。在Zookeeper集群的管理下,所有Kafka Broker节点会尝试在Zookeeper上注册临时节点以成为Leader。如果Leader宕机,会重新选举新的Leader。同时,...

    Kafka 分区(详细).doc

    1. 少部分副本宕机:当 leader 宕机了,会从 follower 选择一个作为 leader。当宕机的重新恢复时,会把之前 commit 的数据清空,重新从 leader 里 pull 数据。 2. 全部副本宕机: * 等待 ISR 中的一个恢复后,并选...

    kafka流程原理图.pdf

    副本的作用是为分区数据提供备份,确保在Broker宕机或其他异常情况下数据不会丢失。 生产者(Producer)是向Kafka集群写入数据的应用程序。生产者按照partition key将数据发送到对应的partition,如果key相同,则...

    Kafka 实战演练 3

    这种设计保证了即使某个Broker宕机,其他Broker仍能接替其角色,继续提供服务。 复制策略主要有两种:异步复制和同步复制。在异步复制中,Leader只需将消息写入本地日志,无需等待所有Followers确认即可返回成功...

    Kafka 实战演练 5

    - 当一个分区的领导者宕机或出现故障,ZooKeeper会检测到这个变化并创建一个新的临时节点。 - 其他副本节点尝试创建这个节点的子节点,第一个成功创建的副本成为新的领导者。 - 新的领导者会更新其ISR,并通知其他...

    kafka 与zookeeper打包.rar

    当领导者宕机时,Zookeeper会协助进行新的领导者选举。 3. **消费组管理**:Kafka的消费者是以消费组的形式工作,每个消息只能被组内的一个消费者消费。Zookeeper负责维护消费组成员信息和每个消费者所订阅的主题...

    kafka.zip 压缩包

    3. **高可用性**: 如果 leader 宕机,一个 follower 将被选举为新的 leader,确保服务的连续性。 4. **高吞吐量**: Kafka 通过批量发送和批量读取以及零拷贝技术实现高吞吐量。 5. **延迟与时效性**: Kafka 设计时...

    kafka 学习笔记 good

    - **实现机制**:Kafka的分区和副本机制确保即使某些Broker宕机,数据依然可访问。 ##### 6. 顺序保证 - **定义**:确保消息按照发送的顺序进行处理。 - **实现机制**:Kafka保证在一个分区内的消息是有顺序的,这...

    02-VIP-kafka设计原理详解1

    - 如果当前Controller宕机,Zookeeper上的临时节点会消失,其他broker会监听这个节点的变化,一旦发现节点消失,它们将再次竞争创建临时节点,确保新的Controller产生。 2. **Controller的职责** - **监听broker...

    2024年java面试题-Kafka面试题

    - **Leader选举**:当Leader宕机时,Kafka会从ISR中选择一个数据最新的Follower作为新的Leader。 - **ISR管理**:Leader维护一个与自身基本同步的Follower副本列表(ISR)。当Follower长时间未与Leader同步或落后...

    kafka原理文档

    这种设计提高了系统的可用性,因为即使主副本宕机,一个从副本可以接管并成为新的领导者,保证服务不间断。 高可用性副本机制在Kafka 0.8版本之后被引入,以解决早期版本中单个或多个broker故障导致的服务中断问题...

    Kafka知识汇总 18道1

    当 Leader 宕机时,ISR 中的副本会根据其数据的最新状态被选为新的 Leader。 Kafka 还引入了水印备份机制,包括 LEO(Last End Offset)和 HW(High Watermark)。LEO 表示副本最后一条消息的位移,而 HW 是 Leader...

    Kafka面试专题及答案.pdf

    如果消费者组内的某个消费者宕机,其负责的消息会自动分配给其他在线消费者。 10.Kafka 的高可用性和容错性: Kafka 集群中的每个topic都有多个副本(replicas),其中一个被选为leader,负责接收生产者的数据和...

    消息队列kafka介绍.pdf

    2. request.required.acks=1(默认):生产者在leader副本成功接收到消息后继续发送下一批消息,但如果leader宕机,则会丢失数据。 3. request.required.acks=-1:生产者等待ISR(In-Sync Replicas)中所有副本确认...

    kafka+zookeper

    如果领导者宕机,ZooKeeper会负责新的领导者选举,保证服务的连续性。 3. **消费者组管理**:Kafka的消费者以组的形式工作,组内的成员共享订阅的主题。ZooKeeper记录消费者组的成员信息和每个消费者所消费的分区,...

    zookeeper-3.4.14.zip

    当某个分区的领导者(Leader)宕机时,Zookeeper会负责组织选举新的领导者,确保服务的连续性。选举过程通过Zookeeper的Watch机制实时监控节点状态变化,并快速响应。 3. **配置管理**:Kafka的配置信息,如主题的...

    kafka学习非常详细的经典教程

    如果Leader宕机,从副本中会选举新的Leader。副本的引入提供了容错能力,确保了服务的高可用性。 5. **分布式特性** Kafka的分布式特性体现在多个层面:生产者、消费者和Broker之间的分布,以及分区的副本分布。...

    ZooKeeper 是一个针对大型分布式系统的可靠协调系统

    1. **数据持久化**:ZooKeeper 可以配置为持久化存储,即使部分 Server 宕机,也能从磁盘恢复数据。 2. **监控与日志**:通过监控 ZooKeeper 的日志和指标,可以了解系统运行状况,及时发现并解决问题。 3. **安全...

Global site tag (gtag.js) - Google Analytics