`

ZooKeeper的锁、同步和队列分析

阅读更多

集群管理结构图

 


清单 3. Leader Election 关键代码

                               
 void findLeader() throws InterruptedException { 
        byte[] leader = null; 
        try { 
            leader = zk.getData(root + "/leader", true, null); 
        } catch (Exception e) { 
            logger.error(e); 
        } 
        if (leader != null) { 
            following(); 
        } else { 
            String newLeader = null; 
            try { 
                byte[] localhost = InetAddress.getLocalHost().getAddress(); 
                newLeader = zk.create(root + "/leader", localhost, 
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); 
            } catch (Exception e) { 
                logger.error(e); 
            } 
            if (newLeader != null) { 
                leading(); 
            } else { 
                mutex.wait(); 
            } 
        } 
    } 

 

共享锁(Locks)

共享锁在同一个进程中很容易实现,但是在跨进程或者在不同 Server 之间就不好实现了。Zookeeper 却很容易实现这个功能,实现方式也是需要获得锁的 Server 创建一个 EPHEMERAL_SEQUENTIAL 目录节点,然后调用 getChildren方法获取当前的目录节点列表中最小的目录节点是不是就是自己创建的目录节点,如果正是自己创建的,那么它就获得了这个锁,如果不是那么它就调用 exists(String path, boolean watch) 方法并监控 Zookeeper 上目录节点列表的变化,一直到自己创建的节点是列表中最小编号的目录节点,从而获得锁,释放锁很简单,只要删除前面它自己所创建的目录节点就行了。

  Zookeeper 实现 Locks 的流程图

同步锁的实现代码如下,完整的代码请看源代码:


清单 4. 同步锁的关键代码

                               
 void getLock() throws KeeperException, InterruptedException{ 
        List<String> list = zk.getChildren(root, false); 
        String[] nodes = list.toArray(new String[list.size()]); 
        Arrays.sort(nodes); 
        if(myZnode.equals(root+"/"+nodes[0])){ 
            doAction(); 
        } 
        else{ 
            waitForLock(nodes[0]); 
        } 
    } 
    void waitForLock(String lower) throws InterruptedException, KeeperException {
        Stat stat = zk.exists(root + "/" + lower,true); 
        if(stat != null){ 
            mutex.wait(); 
        } 
        else{ 
            getLock(); 
        } 
    } 

 

队列管理

Zookeeper 可以处理两种类型的队列:

  1. 当一个队列的成员都聚齐时,这个队列才可用,否则一直等待所有成员到达,这种是同步队列。
  2. 队列按照 FIFO 方式进行入队和出队操作,例如实现生产者和消费者模型。

同步队列用 Zookeeper 实现的实现思路如下:

创建一个父目录 /synchronizing,每个成员都监控标志(Set Watch)位目录 /synchronizing/start 是否存在,然后每个成员都加入这个队列,加入队列的方式就是创建 /synchronizing/member_i 的临时目录节点,然后每个成员获取 / synchronizing 目录的所有目录节点,也就是 member_i。判断 i 的值是否已经是成员的个数,如果小于成员个数等待 /synchronizing/start 的出现,如果已经相等就创建 /synchronizing/start。

用下面的流程图更容易理解:

同步队列流程图

同步队列的关键代码如下,完整的代码请看附件:


清单 5. 同步队列

                               
 void addQueue() throws KeeperException, InterruptedException{ 
        zk.exists(root + "/start",true); 
        zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE, 
        CreateMode.EPHEMERAL_SEQUENTIAL); 
        synchronized (mutex) { 
            List<String> list = zk.getChildren(root, false); 
            if (list.size() < size) { 
                mutex.wait(); 
            } else { 
                zk.create(root + "/start", new byte[0], Ids.OPEN_ACL_UNSAFE,
                 CreateMode.PERSISTENT); 
            } 
        } 
 } 

 

当队列没满是进入 wait(),然后会一直等待 Watch 的通知,Watch 的代码如下:

 public void process(WatchedEvent event) { 
        if(event.getPath().equals(root + "/start") &&
         event.getType() == Event.EventType.NodeCreated){ 
            System.out.println("得到通知"); 
            super.process(event); 
            doAction(); 
        } 
    } 

 

FIFO 队列用 Zookeeper 实现思路如下:

实现的思路也非常简单,就是在特定的目录下创建 SEQUENTIAL 类型的子目录 /queue_i,这样就能保证所有成员加入队列时都是有编号的,出队列时通过 getChildren( ) 方法可以返回当前所有的队列中的元素,然后消费其中最小的一个,这样就能保证 FIFO

下面是生产者和消费者这种队列形式的示例代码,完整的代码请看附件:


清单 6. 生产者代码

                               
 boolean produce(int i) throws KeeperException, InterruptedException{ 
        ByteBuffer b = ByteBuffer.allocate(4); 
        byte[] value; 
        b.putInt(i); 
        value = b.array(); 
        zk.create(root + "/element", value, ZooDefs.Ids.OPEN_ACL_UNSAFE, 
                    CreateMode.PERSISTENT_SEQUENTIAL); 
        return true; 
    } 



清单 7. 消费者代码

                               
 int consume() throws KeeperException, InterruptedException{ 
        int retvalue = -1; 
        Stat stat = null; 
        while (true) { 
            synchronized (mutex) { 
                List<String> list = zk.getChildren(root, true); 
                if (list.size() == 0) { 
                    mutex.wait(); 
                } else { 
                    Integer min = new Integer(list.get(0).substring(7)); 
                    for(String s : list){ 
                        Integer tempValue = new Integer(s.substring(7)); 
                        if(tempValue < min) min = tempValue; 
                    } 
                    byte[] b = zk.getData(root + "/element" + min,false, stat); 
                    zk.delete(root + "/element" + min, 0); 
                    ByteBuffer buffer = ByteBuffer.wrap(b); 
                    retvalue = buffer.getInt(); 
                    return retvalue; 
                } 
            } 
        } 
 } 

 

总结

Zookeeper 作为 Hadoop 项目中的一个子项目,是 Hadoop 集群管理的一个必不可少的模块,它主要用来控制集群中的数据,如它管理 Hadoop 集群中的 NameNode,还有 Hbase 中 Master Election、Server 之间状态同步等。

分享到:
评论

相关推荐

    JAVA面试题(Zookeeper、消息队列、分布式等最新的也有)

    Zookeeper提供了诸如命名服务、配置管理、集群同步、分布式锁和领导者选举等功能。在Java面试中,理解Zookeeper的核心概念如会话、节点(ZNode)、数据模型(树形结构)以及其提供的原子操作(如创建、删除、读取和...

    zookeeper 经典应用设计 示例代码

    zookeeper 经典应用设计 锁、同步和队列分析

    C#基于zookeeper分布式锁的实现源码

    在实际项目中,为了提高性能和减少ZooKeeper的压力,通常会使用一个中间缓存层,比如内存中的队列,来暂存和排序请求。这样,客户端不需要频繁地与ZooKeeper交互,而只需在锁的状态改变时进行通信。 在提供的源码中...

    Zookeeper搭建和原理学习

    1. 分布式锁:Zookeeper 可以实现分布式锁,以确保分布式系统的同步和一致性。 2. 高效性:Zookeeper 的性能非常高效,可以支撑高并发和大规模的分布式系统。 3. 高可用性:Zookeeper 使用 Paxos 算法来确保分布式...

    Zookeeper-3.4.10和Zookeeper-3.5.7.zip

    Zookeeper提供了一种中心化的服务,用于命名、配置管理、分布式同步和组服务。这两个版本,Zookeeper-3.4.10和Zookeeper-3.5.7,代表了该系统的不同稳定性和功能增强阶段。 Zookeeper-3.4.10是3.4.x系列的一个稳定...

    apache-zookeeper-3.5.10-bin 环境搭配

    apache-zookeeper-3.5.10-bin 环境搭配 ...ZooKeeper代码版本中,提供了分布式独享锁、选举、队列的接口,代码在$zookeeper_home\src\recipes。其中分布锁和队列有Java和C两个版本,选举只有Java版本。

    zookeeper3.8和Dubbo安装包

    6. **集群同步**: 支持分布式锁和队列,实现多个服务间的同步。 **安装Zookeeper 3.8** 1. **下载**: 首先,你需要从Apache官网或镜像站点下载Zookeeper 3.8的安装包。 2. **解压**: 解压缩下载的文件,通常命名为...

    zookeeper-3.4.11.tar.gz

    ZooKeeper是一个分布式的,开放源码的分布式应用程序协调...ZooKeeper代码版本中,提供了分布式独享锁、选举、队列的接口,代码在zookeeper-3.4.3\src\recipes。其中分布锁和队列有Java和C两个版本,选举只有Java版本。

    Zookeeper篇.pdf

    1.9 zk 的分布式锁 2.0 zk 队列管理 2.1 zk 数据复制 2.2 zk 的工作原理 2.3 zk 是如何保证事物的顺序一致性 2.4 zk 集群下 server 工作状态 2.5 zk 是如何选举 Leader 的? 2.6 zk 同步流程 2.7 分布式通知和协调 ...

    zookeeper-3.4.12版本

    1. **配置管理**:集中存储和管理分布式系统的配置信息,确保所有节点同步更新。 2. **命名服务**:为分布式组件提供全局唯一ID,如分布式服务注册与发现。 3. **分布式锁**:实现分布式环境下的互斥锁,确保同一...

    zookeeper-3.3.6.rar

    ZooKeeper是一个分布式的,开放源码的分布式应用程序协调...ZooKeeper代码版本中,提供了分布式独享锁、选举、队列的接口,代码在zookeeper-3.4.3\src\recipes。其中分布锁和队列有Java和C两个版本,选举只有Java版本。

    zookeeper 3.8.4

    - **分布式锁**:提供读写锁等同步机制,确保多线程或分布式环境下的数据一致性。 - **队列服务**:实现先进先出(FIFO)的队列,适用于任务调度或消息传递。 ### 3. ZooKeeper 集群架构 Zookeeper 集群由多个...

    zookeeper详解

    - **队列服务(Queuing Services)**:Zookeeper还可以用来构建分布式队列服务,例如实现公平队列和非公平队列。 #### 总结 通过对Zookeeper的深入了解,我们可以看出其在分布式系统中的重要作用。无论是从基础安装...

    深入分析zookeeper实现原理

    ZooKeeper 是一个开源的分布式协调服务,主要用于维护配置信息、提供命名服务、实现分布式同步等。它的设计目标是简化分布式应用的开发过程。 #### 数据模型(znode) - **文件系统:** ZooKeeper 使用类似于文件...

    zookeeper服务和客户端工具

    4. **分布式同步**:Zookeeper提供了原子的读写操作,可以用来实现分布式锁、队列等同步机制,确保在分布式环境中操作的顺序性和一致性。 5. **领导选举**:在Zookeeper集群中,通过选举算法,可以自动选择一个主...

    Zookeeper

    Zookeeper的主要功能包括命名服务、配置管理、集群管理、分布式锁、分布式队列等。 ### 1. 基本概念 - **Znode(节点)**:Zookeeper中的数据单元,类似于文件系统的文件或目录。每个Znode都可以存储数据,并拥有...

    zookeeper-3.4.5-cdh5.15.2.tar.gz

    5. **分发及锁服务**:Zookeeper支持分布式同步和队列,可以实现分布式锁和分布式队列的功能。 Zookeeper的架构由三部分组成:服务器(Server)、客户端(Client)和数据存储(ZNode)。每个Zookeeper服务器都维护...

    zookeeper-3.4.14.rar

    ZooKeeper,一个在分布式系统中广泛使用的开源组件,是Apache Hadoop项目的一部分,主要用于实现分布式环境中的命名服务、配置管理、集群同步以及分布式锁等功能。本文将深入探讨ZooKeeper 3.4.14版本的关键特性和...

    apache-zookeeper-3.6.3.zip

    - **分布式同步**:实现分布式锁、分布式队列等功能,保证数据的一致性。 - **组服务**:实现服务发现,监控服务的健康状态。 在日常运维中,了解Zookeeper的监控工具,如`zkCli.sh`客户端,用于与Zookeeper服务器...

    zookeeper客户端连接工具: zktools

    - **队列和消息队列**:构建简单的FIFO队列,用于协调不同组件的处理顺序。 - **领导者选举**:在分布式系统中确定一个主节点,负责协调其他节点的工作。 zktools作为ZooKeeper的客户端工具,对于开发者和运维人员...

Global site tag (gtag.js) - Google Analytics