`

Zookeeper分布式锁的原理

 
阅读更多

上一篇写了zk分布式锁的使用,现在我们来看一下Curator是怎么实现分布式锁的。

简单的来说:

(1)各个线程在当前path下生成顺序节点;

(2)序号为0的节点成功拿到锁;

(3)没有拿到锁的节点会增加一个对上一个节点的Watch,并阻塞;

(4)当第一个节点删除时,下一个节点被唤醒,重新去拿锁。(或者阻塞一定时间后删除自身节点,返回获取锁失败)

 

 

首先来看一下获取锁的代码:

 

lock.acquire();//会一直阻塞到获得锁成功

或者

boolean locked= lock.acquire(2000, TimeUnit.MILLISECONDS);

1.点进去可以看到,调用的方法是InterProcessMutex.internalLock()

 

 

private boolean internalLock(long time, TimeUnit unit) throws Exception
    {
        Thread currentThread = Thread.currentThread();
        //先看当前线程是否已经获得过锁
        LockData lockData = threadData.get(currentThread);
        if ( lockData != null )
        {
            //有则直接重入
            lockData.lockCount.incrementAndGet(); 
            return true;
        }

        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null )
        {
            LockData newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }

        return false;
    }

 2.再看一下internals.attemptLock(time, unit, getLockNodeBytes())

 

 

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
    {
        final long      startMillis = System.currentTimeMillis();
        final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
        final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
        int             retryCount = 0;

        String          ourPath = null;
        boolean         hasTheLock = false;
        boolean         isDone = false;
        while ( !isDone )
        {
            isDone = true;

            try
            {
                //关键看这两句
                //先成当前的path;
                ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
                //再判断是否获取到了锁
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            }
            catch ( KeeperException.NoNodeException e )
            {
                // gets thrown by StandardLockInternalsDriver when it can't find the lock node
                // this can happen when the session expires, etc. So, if the retry allows, just try it all again
                if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
                {
                    isDone = false;
                }
                else
                {
                    throw e;
                }
            }
        }

        if ( hasTheLock )
        {
            return ourPath;
        }

        return null;
    }

 3.再看 driver.createsTheLock(client, path, localLockNodeBytes)

 

 @Override
    public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
    {
        String ourPath;
        if ( lockNodeBytes != null )
        {
            ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
        }
        else
        {
            ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
        }
        return ourPath;
    }

 可以看到,只是在path下面生成临时的顺序节点。

 

 

4.判断是否获得到了锁:hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);

 

private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
    {
        boolean     haveTheLock = false;
        boolean     doDelete = false;
        try
        {
            if ( revocable.get() != null )
            {
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }

            while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
            {
                //获取到排序之后的子节点
                List<String>        children = getSortedChildren();
                //截取出当前顺序节点的名称
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
                //生成判断的结果
                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if ( predicateResults.getsTheLock() )
                {
                    //成功获得了锁
                    haveTheLock = true;
                }
                else
                {
                    //获取失败时,Watch前一个节点
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                    synchronized(this)
                    {
                        try 
                        {
                            // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                            //注册Watch
                            client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                            if ( millisToWait != null )
                            {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if ( millisToWait <= 0 )
                                {
                                    doDelete = true;    // timed out - delete our node
                                    break;
                                }
                                //阻塞,等待watch事件触发后的通知
                                wait(millisToWait);
                            }
                            else
                            {
                                wait();
                            }
                        }
                        catch ( KeeperException.NoNodeException e ) 
                        {
                            // it has been deleted (i.e. lock released). Try to acquire again
                        }
                    }
                }
            }
        }
        catch ( Exception e )
        {
            doDelete = true;
            throw e;
        }
        finally
        {
            if ( doDelete )
            {
                deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }

 5. PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);

  

 @Override
    public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
    {
        int             ourIndex = children.indexOf(sequenceNodeName);
        validateOurIndex(sequenceNodeName, ourIndex);

        boolean         getsTheLock = ourIndex < maxLeases;//从前面可以看到maxLeases的值是1,所以只有当前节点是第一个顺序节点时,才能成功获得锁,其他都会失败。
        String          pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases); //如果获取锁失败了,要继续等待,那必须要对前面的节点进行watch,

        return new PredicateResults(pathToWatch, getsTheLock);
    }

 6.被watch的节点发生改变时,会唤醒阻塞的线程去重新竞争锁;

 private final Watcher watcher = new Watcher()
    {
        @Override
        public void process(WatchedEvent event)
        {
            notifyFromWatcher();
        }
    };

  private synchronized void notifyFromWatcher()
    {
        notifyAll();
    }

 

 

 

 

 

分享到:
评论

相关推荐

    从Paxos到Zookeeper分布式一致性原理与实践PDF

    《从Paxos到Zookeeper分布式一致性原理与实践》是一本深入探讨分布式系统一致性问题的著作,其中重点讲解了Paxos算法与Zookeeper在实际应用中的理论与实践。Paxos是分布式计算领域中著名的共识算法,为解决分布式...

    从PAXOS到ZOOKEEPER分布式一致性原理与实践

    《从PAXOS到ZOOKEEPER分布式一致性原理与实践》这本书深入探讨了分布式系统中的一个重要议题——一致性。一致性是确保分布式系统中各个节点数据同步的关键,它在高可用、高性能的分布式服务中扮演着核心角色。PAXOS...

    zookeeper分布式锁实现和客户端简单实现

    **Zookeeper的分布式锁实现原理** 1. **节点创建与监视**: Zookeeper允许客户端创建临时节点,这些节点会在客户端断开连接时自动删除。分布式锁的实现通常会为每个请求创建一个临时顺序节点,按照创建的顺序形成一...

    从Paxos到Zookeeper分布式一致性原理与实践包括源码

    Zookeeper基于Paxos和其他一致性算法的实现,为分布式应用程序提供了命名服务、配置管理、分布式锁、群组服务等功能。Zookeeper通过ZNode(类似于文件系统的节点)来存储和操作数据,并采用观察者模式来实时监控数据...

    从Paxos到Zookeeper分布式一致性原理与实践 + ZooKeeper-分布式过程协同技术详解 pdf

    《从Paxos到Zookeeper分布式一致性原理与实践》与《ZooKeeper-分布式过程协同技术详解》这两本书深入探讨了分布式系统中的一个重要概念——一致性,以及如何通过ZooKeeper这一工具来实现高效的分布式协同。...

    zookeeper做分布式锁

    分布式锁原理** 分布式锁的目的是在多个节点之间实现对共享资源的互斥访问。在ZooKeeper中,通常通过创建临时顺序节点(ephemeral sequential nodes)来实现这一目标。每个客户端在获取锁时都会创建这样的节点,...

    一文彻底理解ZooKeeper分布式锁的实现原理

    《彻底理解ZooKeeper分布式锁实现原理》 ZooKeeper,简称zk,作为一个高可用的分布式协调服务,常被用于构建分布式系统中的各种组件,如分布式锁。在本篇文章中,我们将深入探讨如何利用Curator这个流行的开源框架...

    zookeeper分布式锁

    Zookeeper分布式锁的工作原理: 1. **会话和临时节点**:Zookeeper支持两种类型的节点,持久节点和临时节点。临时节点在客户端会话失效(例如,客户端崩溃或网络断开)时会被自动删除,这为实现分布式锁提供了一个...

    从PAXOS到ZOOKEEPER分布式一致性原理与实践 源码

    《从PAXOS到ZOOKEEPER分布式一致性原理与实践》是一本深入探讨分布式系统一致性问题的书籍,其中包含了源码分析,可以帮助读者更深入地理解相关技术。这本书籍主要围绕PAXOS协议和ZOOKEEPER分布式协调服务展开,旨在...

    ZooKeeper分布式过程协同技术详解_new.pdf

    总的来说,《ZooKeeper分布式过程协同技术详解》是一本全面覆盖ZooKeeper理论和实践的著作,它不仅适合初学者了解ZooKeeper的基本原理,也对有一定经验的开发者提供了深入的技术细节和实战指导。通过阅读这本书,...

    《Paxos到Zookeeper——分布式一致性原理与实践》高清完整版

    5. 实战应用:通过具体案例展示如何在实际项目中运用Zookeeper解决分布式协调问题,例如配置管理、服务发现、分布式锁等。 6. 高可用与故障恢复:探讨Zookeeper的容错机制,如何保证服务的高可用性和数据的一致性。 ...

    从PAXOS到ZOOKEEPER分布式一致性原理与实践完整版

    《从PAXOS到ZOOKEEPER分布式一致性原理与实践》是一本深入探讨分布式系统中一致性问题的重要书籍。在分布式计算领域,一致性是确保多个节点间数据同步的关键特性,它对于构建可靠、高可用的系统至关重要。这本书主要...

    PAXOS到ZOOKEEPER分布式一致性原理与实践带目录详细版

    《PAXOS到ZOOKEEPER分布式一致性原理与实践》是一部深入探讨分布式一致性问题的著作,其中涵盖了从经典的PAXOS算法到广泛应用的ZooKeeper系统的关键知识点。这本书旨在帮助读者理解并掌握如何在分布式环境中实现强...

    分布式锁原理讲解视频资料

    本视频资料深入浅出地讲解了分布式锁的原理、实现方式以及其在实际应用中的场景。 首先,我们来了解分布式锁的基本原理。分布式锁的目的是解决在分布式环境下,多节点并发访问同一资源时可能出现的数据不一致问题。...

    《从PAXOS到ZOOKEEPER分布式一致性原理与实践》高清版(带书签)

    《从PAXOS到ZOOKEEPER分布式一致性原理与实践》是一本深入探讨分布式系统核心概念的书籍,尤其关注在分布式环境中如何实现数据的一致性。PAXOS算法是分布式计算领域的一个里程碑,它为解决分布式系统中的共识问题...

    从Paxos到Zookeeper 分布式一致性原理与实践

    4. 分布式锁:利用Zookeeper可以实现分布式锁,通过创建临时节点,监听节点变化,确保在任何时刻只有一个客户端获得锁。 5. 集群管理:Zookeeper可以作为集群中的仲裁者,用于选举主节点,监控节点状态,实现服务...

    ZookeeperNet实现分布式锁

    通过深入理解Zookeeper的工作原理以及ZookeeperNet库的使用,开发者可以有效地在C#环境中实现高可用的分布式锁,保障多节点之间的协同工作和数据一致性。在实际项目中,分布式锁可以广泛应用于数据库操作、并发任务...

Global site tag (gtag.js) - Google Analytics