- 初始化操作
- 返回一个topic的所有partition
- 跟进brokerId返回broker
- 返回一个cluster中的所有的broker
- updateInfo方法是用来更新zk集群里面的数据结构
- close是做一些相关的资源关闭操作
/brokers/topics/[topic]/[0...N] --> nPartions (ephemeral node)
private Map<String, SortedSet<Partition>> getZKTopicPartitionInfo() { // 创建一个结果集合,里面存放topic和partition之间的对应关系 final Map<String, SortedSet<Partition>> brokerPartitionsPerTopic = new HashMap<String, SortedSet<Partition>>(); // 保证路径 /brokers/topics 路径在zk上存在 ZkUtils.makeSurePersistentPathExists( zkClient, ZkUtils.BrokerTopicsPath ); // 获取 /brokers/topics 路径下的所有child,因为路径是/brokers/topics/[topic] List<String> topics = ZkUtils.getChildrenParentMayNotExist( zkClient, ZkUtils.BrokerTopicsPath ); for (String topic : topics) { // find the number of broker partitions registered for this topic String brokerTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic; // 获取到所有的partition,路径如下:/brokers/topics/[topic]/[0...N] List<String> brokerList = ZkUtils.getChildrenParentMayNotExist( zkClient, brokerTopicPath); // final SortedSet<Partition> sortedBrokerPartitions = new TreeSet<Partition>(); for (String bid : brokerList) { final String numPath = brokerTopicPath + "/" + bid; final Integer numPartition = Integer.valueOf(ZkUtils.readData(zkClient , numPath)); final int ibid = Integer.parseInt(bid); for (int i = 0; i < numPartition.intValue(); i++) { sortedBrokerPartitions.add( new Partition(ibid, i)); } } logger.debug("Broker ids and # of partitions on each for topic: " + topic + " = " + sortedBrokerPartitions); brokerPartitionsPerTopic.put(topic, sortedBrokerPartitions); } return brokerPartitionsPerTopic; }我们再来看获取所有broker的代码:
/brokers/ids/[0...N] --> host:port (ephemeral node)
private Map<Integer, Broker> getZKBrokerInfo() { Map<Integer, Broker> brokers = new HashMap<Integer, Broker>(); List<String> allBrokersIds = ZkUtils.getChildrenParentMayNotExist( zkClient, ZkUtils.BrokerIdsPath ); if (allBrokersIds != null) { logger.info("read all brokers count: " + allBrokersIds.size()); for (String brokerId : allBrokersIds) { String brokerInfo = ZkUtils.readData(zkClient, ZkUtils. BrokerIdsPath + "/" + brokerId); Broker createBroker = Broker.createBroker(Integer.valueOf(brokerId), brokerInfo); brokers.put(Integer.valueOf(brokerId), createBroker); logger.info("Loading Broker " + createBroker); } } return brokers; }关于获取路径信息的相关就不进行多描述了,下来我们重点看下BrokerTopicListener这个类的实现:
- 加入了新的代理
- 有一个代理下线了
- 注册了新的话题
- 代理注册了已有话题。
class BrokerTopicsListener implements IZkChildListener { private Map<String, SortedSet<Partition>> originalBrokerTopicsParitions ; private Map<Integer, Broker> originBrokerIds; public BrokerTopicsListener(Map<String, SortedSet<Partition>> originalBrokerTopicsParitions, Map<Integer, Broker> originBrokerIds) { super(); this.originalBrokerTopicsParitions = new LinkedHashMap<String, SortedSet<Partition>>( originalBrokerTopicsParitions); this.originBrokerIds = new LinkedHashMap<Integer, Broker>(originBrokerIds); logger.debug("[BrokerTopicsListener] Creating broker topics listener to watch the following paths - \n" + "/broker/topics, /broker/topics/<topic>, /broker/<ids>"); logger.debug("[BrokerTopicsListener] Initialized this broker topics listener with initial mapping of broker id to " + "partition id per topic with " + originalBrokerTopicsParitions); } public void handleChildChange(final String parentPath, List<String> currentChilds) throws Exception { final List<String> curChilds = currentChilds != null ? currentChilds : new ArrayList<String>(); synchronized (zkWatcherLock ) { // 如果parentPath等于/brokers/topics if (ZkUtils.BrokerTopicsPath .equals(parentPath)) { // 将之前的topic删除 Iterator<String> updatedTopics = curChilds.iterator(); while (updatedTopics.hasNext()) { String t = updatedTopics.next(); if (originalBrokerTopicsParitions .containsKey(t)) { updatedTopics.remove(); } } // 对于新创建的topic和相应的brokers绑定 for (String addedTopic : curChilds) { String path = ZkUtils.BrokerTopicsPath + "/" + addedTopic; List<String> brokerList = ZkUtils.getChildrenParentMayNotExist( zkClient, path); processNewBrokerInExistingTopic(addedTopic, brokerList); zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath + "/" + addedTopic, brokerTopicsListener); } } else if (ZkUtils.BrokerIdsPath .equals(parentPath)) { // 处理broker新增的情况 processBrokerChange(parentPath, curChilds); } else { //check path: /brokers/topics/<topicname> String[] ps = parentPath.split("/" ); if (ps.length == 4 && "topics".equals(ps[2])) { logger.debug("[BrokerTopicsListener] List of brokers changed at " + parentPath + "\t Currently registered " + " list of brokers -> " + curChilds + " for topic -> " + ps[3]); // broker的状态发生变化 processNewBrokerInExistingTopic(ps[3], curChilds); } } // //update the data structures tracking older state values resetState(); } } private void processBrokerChange(String parentPath, List<String> curChilds) { final Map<Integer, Broker> oldBrokerIdMap = new HashMap<Integer, Broker>(originBrokerIds ); for (int i = curChilds.size() - 1; i >= 0; i--) { Integer brokerId = Integer.valueOf(curChilds.get(i)); if (oldBrokerIdMap.remove(brokerId) != null) {//old topic curChilds.remove(i); //remove old topics and left new topics } } //now curChilds are all new brokers //oldBrokerIdMap are all dead brokers for (String newBroker : curChilds) { final String brokerInfo = ZkUtils.readData(zkClient, ZkUtils. BrokerIdsPath + "/" + newBroker); String[] brokerHostPort = brokerInfo.split(":" );//format creatorId:host:port final Integer newBrokerId = Integer.valueOf(newBroker); final Broker broker = new Broker(newBrokerId.intValue(),// brokerHostPort[1], // brokerHostPort[1], // Integer.parseInt(brokerHostPort[2])); allBrokers.put(newBrokerId, broker); callback.producerCbk(broker.id , broker.host, broker.port ); } // //remove all dead broker and remove all broker-partition from topic list for (Map.Entry<Integer, Broker> deadBroker : oldBrokerIdMap.entrySet()) { //remove dead broker allBrokers.remove(deadBroker.getKey()); //remove dead broker-partition from topic for (Map.Entry<String, SortedSet<Partition>> topicParition : topicBrokerPartitions.entrySet()) { Iterator<Partition> partitions = topicParition.getValue().iterator(); while (partitions.hasNext()) { Partition p = partitions.next(); if (deadBroker.getKey().intValue() == p.brokerId ) { partitions.remove(); } } } } } private void processNewBrokerInExistingTopic(String topic, List<String> brokerList) { SortedSet<Partition> updatedBrokerParts = getBrokerPartitions(zkClient, topic, brokerList); SortedSet<Partition> oldBrokerParts = topicBrokerPartitions.get(topic); SortedSet<Partition> mergedBrokerParts = new TreeSet<Partition>(); if (oldBrokerParts != null) { mergedBrokerParts.addAll(oldBrokerParts); } //override old parts or add new parts mergedBrokerParts.addAll(updatedBrokerParts); // // keep only brokers that are alive Iterator<Partition> iter = mergedBrokerParts.iterator(); while (iter.hasNext()) { if (!allBrokers.containsKey(iter.next().brokerId )) { iter.remove(); } } // mergedBrokerParts = Sets.filter(mergedBrokerParts, new Predicate<Partition>() { // // public boolean apply(Partition input) { // return allBrokers.containsKey(input.brokerId); // } // }); topicBrokerPartitions.put(topic, mergedBrokerParts); logger.debug("[BrokerTopicsListener] List of broker partitions for topic: " + topic + " are " + mergedBrokerParts); } private void resetState() { logger.debug("[BrokerTopicsListener] Before reseting broker topic partitions state " + this.originalBrokerTopicsParitions ); this.originalBrokerTopicsParitions = new HashMap<String, SortedSet<Partition>>(topicBrokerPartitions ); logger.debug("[BrokerTopicsListener] After reseting broker topic partitions state " + originalBrokerTopicsParitions ); // logger.debug("[BrokerTopicsListener] Before reseting broker id map state " + originBrokerIds); this.originBrokerIds = new HashMap<Integer, Broker>(allBrokers ); logger.debug("[BrokerTopicsListener] After reseting broker id map state " + originBrokerIds); } } class ZKSessionExpirationListener implements IZkStateListener { public void handleNewSession() throws Exception { /** * When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has * reestablished a connection for us. */ logger.info("ZK expired; release old list of broker partitions for topics "); topicBrokerPartitions = getZKTopicPartitionInfo(); allBrokers = getZKBrokerInfo(); brokerTopicsListener.resetState(); // register listener for change of brokers for each topic to keep topicsBrokerPartitions updated // NOTE: this is probably not required here. Since when we read from getZKTopicPartitionInfo() above, // it automatically recreates the watchers there itself for (String topic : topicBrokerPartitions.keySet()) { zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath + "/" + topic, brokerTopicsListener); } // there is no need to re-register other listeners as they are listening on the child changes of // permanent nodes } public void handleStateChanged(KeeperState state) throws Exception { } } /** * Generate a mapping from broker id to (brokerId, numPartitions) for the list of brokers * specified * * @param topic the topic to which the brokers have registered * @param brokerList the list of brokers for which the partitions info is to be generated * @return a sequence of (brokerId, numPartitions) for brokers in brokerList */ private static SortedSet<Partition> getBrokerPartitions(ZkClient zkClient, String topic, List<?> brokerList) { final String brokerTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic; final SortedSet<Partition> brokerParts = new TreeSet<Partition>(); for (Object brokerId : brokerList) { final Integer bid = Integer.valueOf(brokerId.toString()); final Integer numPartition = Integer.valueOf(ZkUtils.readData(zkClient, brokerTopicPath + "/" + bid)); for (int i = 0; i < numPartition.intValue(); i++) { brokerParts.add( new Partition(bid, i)); } } return brokerParts; }
分区表和分区索引是数据库管理系统中的重要概念,它们对于大型数据存储的高效管理和查询具有至关重要的作用。本文将深入探讨这两个主题,帮助你更好地理解它们的原理、用途以及实际应用。 1. 分区表(Partitioning...
- **定义**:组合分区是指在一个分区策略之上再叠加另一种分区策略。例如,可以先按照年份进行Range分区,然后再对每个月的数据进行Hash分区。 - **适用场景**:当数据结构复杂,单一的分区策略难以满足需求时。 - *...
### MS SQL Server 分区表、分区索引详解 #### 一、分区表简介 使用分区表的主要目的是为了改善大型表及具有多种访问模式的表的可伸缩性和可管理性。...同时,正确创建分区函数也是实现这一目标的关键步骤之一。
### 常用分区软件详解:PQ与DM #### PartitionMagic(PQ):分区管理的魔法 **PartitionMagic**,简称PQ,是一款功能强大的磁盘分区管理软件,适用于Windows操作系统。它允许用户在不破坏现有数据的情况下,轻松...
### 深入学习Oracle分区表 在Oracle数据库管理中,分区是一种重要的技术手段,能够显著提升大型数据集的查询性能。特别是在Oracle 10g版本中,支持了三种主要的分区表创建方式:范围分区(Range)、哈希分区(Hash...
### 动态分区存储管理知识点解析 #### 一、动态分区存储管理概述 **动态分区存储管理**是一种在操作系统中管理...通过以上知识点的学习和实践,不仅可以加深对动态分区存储管理的理解,还能提高解决实际问题的能力。
主分区、扩展分区和逻辑分区是硬盘分区中的三个主要概念,它们共同构成了硬盘的逻辑结构,使得用户能够根据需要创建多个独立的工作空间。下面将详细阐述这三个概念。 首先,我们来看主引导扇区(Master Boot Record...
ORACLE 分区表、分区索引、索引分区实例讲解 以下是对 ORACLE 分区表、分区索引、索引分区的详细知识点: 什么是分区表 在 Oracle 数据库中,分区表是一种提高应用系统性能和方便数据管理的方法。它将大型表或...
在Android系统中,分区是管理操作系统和数据存储的关键部分。每个分区都有特定的功能,例如系统分区存储核心OS组件,用户分区则保存用户数据和应用程序。本文档“Android 增加一个分区配置指南 V1.00_rk系统新增分区...