`

Jafka学习之消息分区

    博客分类:
  • mq
阅读更多

     我们首先先来看下分区的定义:



 

   在接口上可以进行下面的操作: 

  • 初始化操作
  • 返回一个topic的所有partition
  • 跟进brokerId返回broker
  • 返回一个cluster中的所有的broker
  • updateInfo方法是用来更新zk集群里面的数据结构
  • close是做一些相关的资源关闭操作
Config类型的比较好理解,其实就是解析用户传递的相关配置文件,对brokerPartiions和allBrokers进行初始化。
比较难理解的是ZK方式,也就是集群模式下的相关配置,我们来仔细看下相关实现:
我们先来看下getZKTopicPartitionInfo方法的实现:
再看之前我们先来看下这个如何获得某个topic的partition数量,如果我们了解下面的存储结构,我们就大概了解了:

代理话题的注册

/brokers/topics/[topic]/[0...N] --> nPartions (ephemeral node)
 
每个代理会都要注册在某话题之下,注册后它会维护并保存该话题的分区总数。
上面的[topic]就是某个话题,0...N就是brokerId,这个路径里面存储的数据就是partition的数量,它是动态变化的,我们再来看代码,就非常明了。
private Map<String, SortedSet<Partition>> getZKTopicPartitionInfo() {
        // 创建一个结果集合,里面存放topic和partition之间的对应关系
        final Map<String, SortedSet<Partition>> brokerPartitionsPerTopic = new HashMap<String, SortedSet<Partition>>();
        // 保证路径 /brokers/topics 路径在zk上存在
        ZkUtils.makeSurePersistentPathExists( zkClient, ZkUtils.BrokerTopicsPath );
        // 获取 /brokers/topics 路径下的所有child,因为路径是/brokers/topics/[topic]
        List<String> topics = ZkUtils.getChildrenParentMayNotExist( zkClient, ZkUtils.BrokerTopicsPath );
        for (String topic : topics) {
            // find the number of broker partitions registered for this topic
            String brokerTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic;
            // 获取到所有的partition,路径如下:/brokers/topics/[topic]/[0...N]
            List<String> brokerList = ZkUtils.getChildrenParentMayNotExist( zkClient, brokerTopicPath);
            //
            final SortedSet<Partition> sortedBrokerPartitions = new TreeSet<Partition>();
            for (String bid : brokerList) {
                final String numPath = brokerTopicPath + "/" + bid;
                final Integer numPartition = Integer.valueOf(ZkUtils.readData(zkClient , numPath));
                final int ibid = Integer.parseInt(bid);
                for (int i = 0; i < numPartition.intValue(); i++) {
                    sortedBrokerPartitions.add( new Partition(ibid, i));
                }
            }
            logger.debug("Broker ids and # of partitions on each for topic: " + topic + " = " + sortedBrokerPartitions);
            brokerPartitionsPerTopic.put(topic, sortedBrokerPartitions);
        }
        return brokerPartitionsPerTopic;
    }
      我们再来看获取所有broker的代码:
首先我们还是来看下broker的路径存储格式:
/brokers/ids/[0...N] --> host:port (ephemeral node)
 
如果再来看代码的话,就非常明白了:
    
private Map<Integer, Broker> getZKBrokerInfo() {
        Map<Integer, Broker> brokers = new HashMap<Integer, Broker>();
        List<String> allBrokersIds = ZkUtils.getChildrenParentMayNotExist( zkClient, ZkUtils.BrokerIdsPath );
        if (allBrokersIds != null) {
            logger.info("read all brokers count: " + allBrokersIds.size());
            for (String brokerId : allBrokersIds) {
                String brokerInfo = ZkUtils.readData(zkClient, ZkUtils. BrokerIdsPath + "/" + brokerId);
                Broker createBroker = Broker.createBroker(Integer.valueOf(brokerId), brokerInfo);
                brokers.put(Integer.valueOf(brokerId), createBroker);
                logger.info("Loading Broker " + createBroker);
            }
        }
        return brokers;
    }
 关于获取路径信息的相关就不进行多描述了,下来我们重点看下BrokerTopicListener这个类的实现:

下面讲解基于zookeeper的负载均衡的工作原理。它主要到下面的几个事件进行监听。

  • 加入了新的代理
  • 有一个代理下线了
  • 注册了新的话题
  • 代理注册了已有话题。
 
下面我们来仔细看看zk的watch函数:
class BrokerTopicsListener implements IZkChildListener {

        private Map<String, SortedSet<Partition>> originalBrokerTopicsParitions ;

        private Map<Integer, Broker> originBrokerIds;

        public BrokerTopicsListener(Map<String, SortedSet<Partition>> originalBrokerTopicsParitions,
                Map<Integer, Broker> originBrokerIds) {
            super();
            this.originalBrokerTopicsParitions = new LinkedHashMap<String, SortedSet<Partition>>(
                    originalBrokerTopicsParitions);
            this.originBrokerIds = new LinkedHashMap<Integer, Broker>(originBrokerIds);
            logger.debug("[BrokerTopicsListener] Creating broker topics listener to watch the following paths - \n" + "/broker/topics, /broker/topics/<topic>, /broker/<ids>");
            logger.debug("[BrokerTopicsListener] Initialized this broker topics listener with initial mapping of broker id to " + "partition id per topic with " + originalBrokerTopicsParitions);
        }

        public void handleChildChange(final String parentPath, List<String> currentChilds) throws Exception {
            final List<String> curChilds = currentChilds != null ? currentChilds : new ArrayList<String>();
            synchronized (zkWatcherLock ) {
                // 如果parentPath等于/brokers/topics
                if (ZkUtils.BrokerTopicsPath .equals(parentPath)) {
                    // 将之前的topic删除
                    Iterator<String> updatedTopics = curChilds.iterator();
                    while (updatedTopics.hasNext()) {
                        String t = updatedTopics.next();
                        if (originalBrokerTopicsParitions .containsKey(t)) {
                            updatedTopics.remove();
                        }
                    }
                    // 对于新创建的topic和相应的brokers绑定
                    for (String addedTopic : curChilds) {
                        String path = ZkUtils.BrokerTopicsPath + "/" + addedTopic;
                        List<String> brokerList = ZkUtils.getChildrenParentMayNotExist( zkClient, path);
                        processNewBrokerInExistingTopic(addedTopic, brokerList);
                        zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath + "/" + addedTopic,
                                brokerTopicsListener);
                    }
                } else if (ZkUtils.BrokerIdsPath .equals(parentPath)) {
                    // 处理broker新增的情况
                    processBrokerChange(parentPath, curChilds);
                } else {
                    //check path: /brokers/topics/<topicname>
                    String[] ps = parentPath.split("/" );
                    if (ps.length == 4 && "topics".equals(ps[2])) {
                        logger.debug("[BrokerTopicsListener] List of brokers changed at " + parentPath + "\t Currently registered " + " list of brokers -> " + curChilds + " for topic -> " + ps[3]);
                        // broker的状态发生变化
                        processNewBrokerInExistingTopic(ps[3], curChilds);
                    }
                }
                //
                //update the data structures tracking older state values
                resetState();
            }
        }


        private void processBrokerChange(String parentPath, List<String> curChilds) {
            final Map<Integer, Broker> oldBrokerIdMap = new HashMap<Integer, Broker>(originBrokerIds );
            for (int i = curChilds.size() - 1; i >= 0; i--) {
                Integer brokerId = Integer.valueOf(curChilds.get(i));
                if (oldBrokerIdMap.remove(brokerId) != null) {//old topic
                    curChilds.remove(i); //remove old topics and left new topics
                }
            }
            //now curChilds are all new brokers
            //oldBrokerIdMap are all dead brokers
            for (String newBroker : curChilds) {
                final String brokerInfo = ZkUtils.readData(zkClient, ZkUtils. BrokerIdsPath + "/" + newBroker);
                String[] brokerHostPort = brokerInfo.split(":" );//format creatorId:host:port
                final Integer newBrokerId = Integer.valueOf(newBroker);
                final Broker broker = new Broker(newBrokerId.intValue(),//
                        brokerHostPort[1], //
                        brokerHostPort[1], //
                        Integer.parseInt(brokerHostPort[2]));
                allBrokers.put(newBrokerId, broker);
                callback.producerCbk(broker.id , broker.host, broker.port );
            }
            //
            //remove all dead broker and remove all broker-partition from topic list
            for (Map.Entry<Integer, Broker> deadBroker : oldBrokerIdMap.entrySet()) {
                //remove dead broker
                allBrokers.remove(deadBroker.getKey());

                //remove dead broker-partition from topic
                for (Map.Entry<String, SortedSet<Partition>> topicParition : topicBrokerPartitions.entrySet()) {
                    Iterator<Partition> partitions = topicParition.getValue().iterator();
                    while (partitions.hasNext()) {
                        Partition p = partitions.next();
                        if (deadBroker.getKey().intValue() == p.brokerId ) {
                            partitions.remove();
                        }
                    }
                }
            }
        }

        private void processNewBrokerInExistingTopic(String topic, List<String> brokerList) {

            SortedSet<Partition> updatedBrokerParts = getBrokerPartitions(zkClient, topic, brokerList);
            SortedSet<Partition> oldBrokerParts = topicBrokerPartitions.get(topic);
            SortedSet<Partition> mergedBrokerParts = new TreeSet<Partition>();
            if (oldBrokerParts != null) {
                mergedBrokerParts.addAll(oldBrokerParts);
            }
            //override old parts or add new parts
            mergedBrokerParts.addAll(updatedBrokerParts);
            //
            // keep only brokers that are alive
            Iterator<Partition> iter = mergedBrokerParts.iterator();
            while (iter.hasNext()) {
                if (!allBrokers.containsKey(iter.next().brokerId )) {
                    iter.remove();
                }
            }
            //            mergedBrokerParts = Sets.filter(mergedBrokerParts, new Predicate<Partition>() {
            //
            //                public boolean apply(Partition input) {
            //                    return allBrokers.containsKey(input.brokerId);
            //                }
            //            });
            topicBrokerPartitions.put(topic, mergedBrokerParts);
            logger.debug("[BrokerTopicsListener] List of broker partitions for topic: " + topic + " are " + mergedBrokerParts);
        }

        private void resetState() {
            logger.debug("[BrokerTopicsListener] Before reseting broker topic partitions state " + this.originalBrokerTopicsParitions );
            this.originalBrokerTopicsParitions = new HashMap<String, SortedSet<Partition>>(topicBrokerPartitions );
            logger.debug("[BrokerTopicsListener] After reseting broker topic partitions state " + originalBrokerTopicsParitions );
            //
            logger.debug("[BrokerTopicsListener] Before reseting broker id map state " + originBrokerIds);
            this.originBrokerIds = new HashMap<Integer, Broker>(allBrokers );
            logger.debug("[BrokerTopicsListener] After reseting broker id map state " + originBrokerIds);
        }
    }

    class ZKSessionExpirationListener implements IZkStateListener {

        public void handleNewSession() throws Exception {
            /**
             * When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has
             * reestablished a connection for us.
             */
            logger.info("ZK expired; release old list of broker partitions for topics ");
            topicBrokerPartitions = getZKTopicPartitionInfo();
            allBrokers = getZKBrokerInfo();
            brokerTopicsListener.resetState();

            // register listener for change of brokers for each topic to keep topicsBrokerPartitions updated
            // NOTE: this is probably not required here. Since when we read from getZKTopicPartitionInfo() above,
            // it automatically recreates the watchers there itself
            for (String topic : topicBrokerPartitions.keySet()) {
                zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath + "/" + topic, brokerTopicsListener);
            }
            // there is no need to re-register other listeners as they are listening on the child changes of
            // permanent nodes
        }

        public void handleStateChanged(KeeperState state) throws Exception {
        }
    }

    /**
     * Generate a mapping from broker id to (brokerId, numPartitions) for the list of brokers
     * specified
     *
     * @param topic the topic to which the brokers have registered
     * @param brokerList the list of brokers for which the partitions info is to be generated
     * @return a sequence of (brokerId, numPartitions) for brokers in brokerList
     */
    private static SortedSet<Partition> getBrokerPartitions(ZkClient zkClient, String topic, List<?> brokerList) {
        final String brokerTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic;
        final SortedSet<Partition> brokerParts = new TreeSet<Partition>();
        for (Object brokerId : brokerList) {
            final Integer bid = Integer.valueOf(brokerId.toString());
            final Integer numPartition = Integer.valueOf(ZkUtils.readData(zkClient, brokerTopicPath + "/" + bid));
            for (int i = 0; i < numPartition.intValue(); i++) {
                brokerParts.add( new Partition(bid, i));
            }
        }
        return brokerParts;
    }
 
  • 大小: 41.3 KB
分享到:
评论

相关推荐

    全面学习分区表及分区索引

    通过学习《三思笔记》中的全面学习分区表及分区索引,你可以深入了解这些概念,掌握如何在实际项目中应用它们,以实现高效的数据管理与查询优化。这份资料将详细讲解分区表的设计原则、索引的创建与管理,以及在不同...

    深入学习分区表及分区索引(详解oracle分区).docx

    Oracle分区技术是数据库管理系统中用于优化大规模数据存储和查询性能的一种高级特性。它允许将大表和索引分成较小、更易管理和操作的部分,每个部分称为一个分区。分区的主要目标是提高查询性能、简化管理任务并增强...

    全面学习分区表及分区索引一

    全面学习分区表及分区索引一全面学习分区表及分区索引一

    [三思笔记]全面学习分区表及分区索引.pdf

    分区表和分区索引是数据库管理系统中的重要概念,它们对于大型数据存储的高效管理和查询具有至关重要的作用。本文将深入探讨这两个主题,帮助你更好地理解它们的原理、用途以及实际应用。 1. 分区表(Partitioning...

    oracle分区表之hash分区表的使用及扩展

    Oracle分区表中的Hash分区是一种基于哈希算法的分区策略,适用于处理无法清晰定义分区范围的大型数据表。这种分区方式通过计算分区键的哈希值来决定数据存储在哪个分区,以此达到数据分散和负载均衡的目的。Hash分区...

    全面学习分区表及分区索引二

    全面学习分区表及分区索引二全面学习分区表及分区索引二

    深入学习分区表及分区索引(详解oracle分区.docx

    Oracle数据库的分区技术是一种高效管理和优化大数据量表的方法,它将大表划分为较小、更易管理的部分,称为分区。这种技术尤其适用于处理超过2GB的大表,这在32位操作系统中是一个重要的考虑因素,因为这样的大文件...

    FDisk分区模拟学习程序

    FDisk分区模拟学习程序是一款专为用户提供分区操作模拟练习的工具。在计算机系统中,硬盘分区是一项重要的管理任务,它允许我们将一个物理硬盘划分为多个逻辑存储区域,每个区域都可以独立分配驱动器字母,安装操作...

    MS SQL Server分区表、分区索引详解

    ### MS SQL Server 分区表、分区索引详解 #### 一、分区表简介 使用分区表的主要目的是为了改善大型表及具有多种访问模式的表的可伸缩性和可管理性。...同时,正确创建分区函数也是实现这一目标的关键步骤之一。

    常用分区软件介绍(分区软件)

    ### 常用分区软件详解:PQ与DM #### PartitionMagic(PQ):分区管理的魔法 **PartitionMagic**,简称PQ,是一款功能强大的磁盘分区管理软件,适用于Windows操作系统。它允许用户在不破坏现有数据的情况下,轻松...

    深入学习oracle分区表

    ### 深入学习Oracle分区表 在Oracle数据库管理中,分区是一种重要的技术手段,能够显著提升大型数据集的查询性能。特别是在Oracle 10g版本中,支持了三种主要的分区表创建方式:范围分区(Range)、哈希分区(Hash...

    动态分区存储管理

    ### 动态分区存储管理知识点解析 #### 一、动态分区存储管理概述 **动态分区存储管理**是一种在操作系统中管理...通过以上知识点的学习和实践,不仅可以加深对动态分区存储管理的理解,还能提高解决实际问题的能力。

    主分区、扩展分区、逻辑分区

    主分区、扩展分区和逻辑分区是硬盘分区中的三个主要概念,它们共同构成了硬盘的逻辑结构,使得用户能够根据需要创建多个独立的工作空间。下面将详细阐述这三个概念。 首先,我们来看主引导扇区(Master Boot Record...

    [三思笔记]全面学习分区表及分区索引

    本文将深入探讨分区表(Partitioned Tables)和分区索引(Partitioned Indexes),旨在为数据库管理员和开发人员提供全面的学习资料。我们将围绕以下核心议题展开讨论: 1. **何时使用不同类型的分区** - Range...

    Android 增加一个分区配置指南 V1.00_rk系统新增分区_android_

    在Android系统中,分区是管理操作系统和数据存储的关键部分。每个分区都有特定的功能,例如系统分区存储核心OS组件,用户分区则保存用户数据和应用程序。本文档“Android 增加一个分区配置指南 V1.00_rk系统新增分区...

Global site tag (gtag.js) - Google Analytics