`

Curator zookeeper 分布式锁实现

阅读更多

curator版本:

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.5.0</version>
        </dependency>

 

 

代码实现

 

   private boolean internalLock(long time, TimeUnit unit) throws Exception
    {
        /*
           Note on concurrency: a given lockData instance
           can be only acted on by a single thread so locking isn't necessary
        */

        Thread          currentThread = Thread.currentThread();

        LockData        lockData = threadData.get(currentThread);
        if ( lockData != null )//当前线程已经获得锁(重入),对线程数据的锁+1
        {
            // re-entering
            lockData.lockCount.incrementAndGet();
            return true;
        }
        //尝试获取锁,获取锁成功返回zk路径
        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null )
        {
            LockData        newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }

        return false;
    }


 

代码调用

 

    String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
    {
        final long      startMillis = System.currentTimeMillis();
        final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
        final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
        int             retryCount = 0;

        String          ourPath = null;
        boolean         hasTheLock = false;
        boolean         isDone = false;
        while ( !isDone )
        {
            isDone = true;

            try
            {
                //创建节点,模式:EPHEMERAL_SEQUENTIAL 直译:瞬态的-有序的,不会持久化,client与zk服务断掉链接的时候,此节点会自动删除,这样在服务停机时再重启时,不会出现锁无法释放
                if ( localLockNodeBytes != null )
                {
                    ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, localLockNodeBytes);
                }
                else
                {
                    ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
                }
                //循环尝试获取锁
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            }
            catch ( KeeperException.NoNodeException e )
            {
                // gets thrown by StandardLockInternalsDriver when it can't find the lock node
                // this can happen when the session expires, etc. So, if the retry allows, just try it all again
                if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
                {
                    isDone = false;
                }
                else
                {
                    throw e;
                }
            }
        }

        if ( hasTheLock )
        {
            return ourPath;
        }

        return null;
    }

  代码调用

 

 

    private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
    {
        boolean     haveTheLock = false;
        boolean     doDelete = false;
        try
        {
            if ( revocable.get() != null )
            {
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }

            while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
            {
                List<String>        children = getSortedChildren();
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash

                //验证当前线程持有的node是不是路径下最靠前的node,如果是,获取到锁,否则没有获取到
                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if ( predicateResults.getsTheLock() )
                {
                    haveTheLock = true;
                }
                else
                {
                    //获取到路径下排在当前节点前面的节点,watch此节点,防止大规模的watch
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                    synchronized(this)
                    {
                        try 
                        {
                            // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                            client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                            if ( millisToWait != null )
                            {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if ( millisToWait <= 0 )
                                {
                                    doDelete = true;    // timed out - delete our node
                                    break;
                                }
                                //获取不到,等待指定时间,等待watcher叫醒
                                wait(millisToWait);
                            }
                            else
                            {
                                //获取不到锁,等待watcher叫醒
                                wait();
                            }
                        }
                        catch ( KeeperException.NoNodeException e ) 
                        {
                            // it has been deleted (i.e. lock released). Try to acquire again
                        }
                    }
                }
            }
        }
        catch ( Exception e )
        {
            doDelete = true;
            throw e;
        }
        finally
        {
            if ( doDelete )
            {
                deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }


    @Override
    public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
    {
        int             ourIndex = children.indexOf(sequenceNodeName);
        validateOurIndex(sequenceNodeName, ourIndex);
        //判断是否是第一个节点
        boolean         getsTheLock = ourIndex < maxLeases;
        String          pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);

        return new PredicateResults(pathToWatch, getsTheLock);
    }

 

 zookeeper四种模式的节点:

 

    /**
     * The znode will not be automatically deleted upon client's disconnect.
     */
    PERSISTENT (0, false, false),
    /**
    * The znode will not be automatically deleted upon client's disconnect,
    * and its name will be appended with a monotonically increasing number.
    */
    PERSISTENT_SEQUENTIAL (2, false, true),
    /**
     * The znode will be deleted upon the client's disconnect.
     */
    EPHEMERAL (1, true, false),
    /**
     * The znode will be deleted upon the client's disconnect, and its name
     * will be appended with a monotonically increasing number.
     */
    EPHEMERAL_SEQUENTIAL (3, true, true);

 curator分布式锁实现创建的是EPHEMERAL_SEQUENTIAL,使用时不用担心服务停掉,锁无法释放的问题

 

 

 

分享到:
评论

相关推荐

    基于zookeeper的分布式锁实现demo

    1. **顺序一致性:** Zookeeper中的节点被创建顺序是全局唯一的,这有助于实现锁的唯一性。 2. **原子性:** 创建和删除节点的操作在Zookeeper中都是原子性的,这保证了分布式锁操作的原子性。 3. **排他性:** 一个...

    zk使用curator实现分布式锁

    在实际应用中,我们可以通过以下代码示例来体验如何使用Curator实现ZooKeeper分布式锁: ```java import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.locks....

    一文彻底理解ZooKeeper分布式锁的实现原理

    《彻底理解ZooKeeper分布式锁实现原理》 ZooKeeper,简称zk,作为一个高可用的分布式协调服务,常被用于构建分布式系统中的各种组件,如分布式锁。在本篇文章中,我们将深入探讨如何利用Curator这个流行的开源框架...

    zookeeper做分布式锁

    在实际项目中,可以使用Java的ZooKeeper客户端库(如Curator)来简化分布式锁的实现。这些库提供了高级API,帮助开发者更方便地处理ZooKeeper的操作,例如创建、删除节点,以及设置节点监视器等。 总之,ZooKeeper...

    springboot zookeeper 分布式锁

    总结来说,SpringBoot集成Zookeeper实现分布式锁,通过Zookeeper的临时顺序节点特性,能有效地解决分布式环境下的并发控制问题,避免羊群效应,提高系统的稳定性和效率。在实际应用中,开发者可以根据具体需求调整和...

    浅谈Java(SpringBoot)基于zookeeper的分布式锁实现

    在分布式锁的实现中,CuratorFramework可以用于创建、连接zookeeper的客户端。 分布式锁的实现 在示例代码中,首先创建了一个CuratorFramework的客户端,用于连接zookeeper。然后,通过CuratorFramework的API创建...

    zookeeper 使用 Curator 示例监听、分布式锁

    本示例将详细介绍如何利用 Curator 在 ZooKeeper 上进行数据操作以及实现分布式锁。 一、ZooKeeper 与 Curator 的基本概念 1. ZooKeeper:ZooKeeper 是一款分布式协调服务,它为分布式应用提供一致性服务,如命名...

    使用zookeeper实现分布式共享锁

    这种特性使得Zookeeper可以检测到节点的存活状态,进而实现锁的自动释放。其次,顺序节点在创建时会自动附加一个递增的序列号,这有助于我们解决分布式环境中的公平性问题。 实现分布式共享锁的基本步骤如下: 1. ...

    分布式锁curator

    分布式锁Curator是Java开发中的一个关键工具,用于在分布式环境中实现锁的机制。Curator是Apache ZooKeeper的一个客户端库,提供了丰富的高级API和工具,简化了ZooKeeper的使用,同时也包括了分布式锁的实现。...

    java分布式锁实现代码

    3. 使用Curator的InterProcessMutex实现锁: ```java InterProcessMutex lock = new InterProcessMutex(client, "/path/to/lock"); try (LockHandle handle = lock.acquire()) { // 执行关键操作 } catch ...

    curator zookeeper 3.4.6 2.9.1

    Curator的亮点在于其丰富的客户端工具集,包括如ExponentialBackoffRetry重试策略、LedgerAllocator数据存储管理、Recipe库(如分布式锁、队列、命名空间等)等。这个版本的Curator确保了与Zookeeper 3.4.6的兼容性...

    Zookeeper 分布式过程.pdf

    本书的第二部分还介绍了C语言客户端的使用以及Curator框架,它是对ZooKeeper API的一个高级封装库,简化了客户端的开发工作。 在第三部分中,“第9章ZooKeeper内部原理”和“第10章运行ZooKeeper”深入探讨了...

    基于Zookeeper实现分布式锁实践教程

    本实践教程将指导你如何利用Zookeeper实现分布式锁,以便在分布式环境中保证资源访问的互斥性。 **1. Zookeeper概述** Zookeeper是一个分布式协调服务,它为分布式应用提供了简单而强大的命名服务、配置管理、集群...

    zookeeper经典应用场景

    ZooKeeper经典应用场景 ZooKeeper是一个高可用的... ZooKeeper分布式锁的实现可以基于临时znode和Curator客户端中的各种官方实现的分布式锁,而ZooKeeper服务注册中心可以提供高可用性、强一致性和实时性等特性。

    ZooKeeper 实现分布式锁的方法示例

    添加 Lock 方法,可以使用 ZooKeeper 分布式锁来实现锁的获取和释放操作。 ZooKeeper 分布式锁的优点 ZooKeeper 分布式锁的优点是可以自动释放锁,避免了死锁的出现,同时也可以解决分布式系统中的锁问题,使得...

    zk:redis分布式锁.zip

    分布式锁是一种在分布式系统中实现锁机制的技术,用于在多节点之间协调访问共享资源,确保在高并发环境下数据的一致性和完整性。本压缩包“zk:redis分布式锁.zip”提供了基于Zookeeper(zk)和Redis两种分布式锁实现...

    Zookeeper分布式系统开发实战[借鉴].pdf

    Zookeeper分布式系统开发实战 Zookeeper是一款开源的分布式协调服务,最初由雅虎研究院开发,后来 BecameApache软件基金会的开源项目。Zookeeper的主要作用是提供一种高效、可靠、可扩展的分布式系统协调机制,用于...

    四个Java常见分布式锁的选型和性能对比.rar

    1. Zookeeper分布式锁: Zookeeper是一个分布式协调服务,提供了丰富的数据一致性操作,如创建临时节点实现分布式锁。在Java中,通常使用ZKClient或Curator库来操作Zookeeper。Zookeeper的分布式锁特点是强一致性和...

    4、zookeeper的java三种客户端介绍-Curator(crud、事务操作、监听、分布式计数器、分布式锁)

    使用Curator可以极大地简化与Zookeeper交互的代码,例如实现分布式锁只需要几行代码。此外,Curator还提供了各种场景的“Recipe”,如分布式锁服务、选举机制和分布式计数器等,这些预定义的解决方案使开发者能更...

Global site tag (gtag.js) - Google Analytics