curator版本:
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.5.0</version> </dependency>
代码实现
private boolean internalLock(long time, TimeUnit unit) throws Exception { /* Note on concurrency: a given lockData instance can be only acted on by a single thread so locking isn't necessary */ Thread currentThread = Thread.currentThread(); LockData lockData = threadData.get(currentThread); if ( lockData != null )//当前线程已经获得锁(重入),对线程数据的锁+1 { // re-entering lockData.lockCount.incrementAndGet(); return true; } //尝试获取锁,获取锁成功返回zk路径 String lockPath = internals.attemptLock(time, unit, getLockNodeBytes()); if ( lockPath != null ) { LockData newLockData = new LockData(currentThread, lockPath); threadData.put(currentThread, newLockData); return true; } return false; }
代码调用
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception { final long startMillis = System.currentTimeMillis(); final Long millisToWait = (unit != null) ? unit.toMillis(time) : null; final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes; int retryCount = 0; String ourPath = null; boolean hasTheLock = false; boolean isDone = false; while ( !isDone ) { isDone = true; try { //创建节点,模式:EPHEMERAL_SEQUENTIAL 直译:瞬态的-有序的,不会持久化,client与zk服务断掉链接的时候,此节点会自动删除,这样在服务停机时再重启时,不会出现锁无法释放 if ( localLockNodeBytes != null ) { ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, localLockNodeBytes); } else { ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path); } //循环尝试获取锁 hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath); } catch ( KeeperException.NoNodeException e ) { // gets thrown by StandardLockInternalsDriver when it can't find the lock node // this can happen when the session expires, etc. So, if the retry allows, just try it all again if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ) { isDone = false; } else { throw e; } } } if ( hasTheLock ) { return ourPath; } return null; }
代码调用
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception { boolean haveTheLock = false; boolean doDelete = false; try { if ( revocable.get() != null ) { client.getData().usingWatcher(revocableWatcher).forPath(ourPath); } while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) { List<String> children = getSortedChildren(); String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash //验证当前线程持有的node是不是路径下最靠前的node,如果是,获取到锁,否则没有获取到 PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases); if ( predicateResults.getsTheLock() ) { haveTheLock = true; } else { //获取到路径下排在当前节点前面的节点,watch此节点,防止大规模的watch String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch(); synchronized(this) { try { // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak client.getData().usingWatcher(watcher).forPath(previousSequencePath); if ( millisToWait != null ) { millisToWait -= (System.currentTimeMillis() - startMillis); startMillis = System.currentTimeMillis(); if ( millisToWait <= 0 ) { doDelete = true; // timed out - delete our node break; } //获取不到,等待指定时间,等待watcher叫醒 wait(millisToWait); } else { //获取不到锁,等待watcher叫醒 wait(); } } catch ( KeeperException.NoNodeException e ) { // it has been deleted (i.e. lock released). Try to acquire again } } } } } catch ( Exception e ) { doDelete = true; throw e; } finally { if ( doDelete ) { deleteOurPath(ourPath); } } return haveTheLock; } @Override public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception { int ourIndex = children.indexOf(sequenceNodeName); validateOurIndex(sequenceNodeName, ourIndex); //判断是否是第一个节点 boolean getsTheLock = ourIndex < maxLeases; String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases); return new PredicateResults(pathToWatch, getsTheLock); }
zookeeper四种模式的节点:
/** * The znode will not be automatically deleted upon client's disconnect. */ PERSISTENT (0, false, false), /** * The znode will not be automatically deleted upon client's disconnect, * and its name will be appended with a monotonically increasing number. */ PERSISTENT_SEQUENTIAL (2, false, true), /** * The znode will be deleted upon the client's disconnect. */ EPHEMERAL (1, true, false), /** * The znode will be deleted upon the client's disconnect, and its name * will be appended with a monotonically increasing number. */ EPHEMERAL_SEQUENTIAL (3, true, true);
curator分布式锁实现创建的是EPHEMERAL_SEQUENTIAL,使用时不用担心服务停掉,锁无法释放的问题
相关推荐
1. **顺序一致性:** Zookeeper中的节点被创建顺序是全局唯一的,这有助于实现锁的唯一性。 2. **原子性:** 创建和删除节点的操作在Zookeeper中都是原子性的,这保证了分布式锁操作的原子性。 3. **排他性:** 一个...
在实际应用中,我们可以通过以下代码示例来体验如何使用Curator实现ZooKeeper分布式锁: ```java import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.locks....
《彻底理解ZooKeeper分布式锁实现原理》 ZooKeeper,简称zk,作为一个高可用的分布式协调服务,常被用于构建分布式系统中的各种组件,如分布式锁。在本篇文章中,我们将深入探讨如何利用Curator这个流行的开源框架...
在实际项目中,可以使用Java的ZooKeeper客户端库(如Curator)来简化分布式锁的实现。这些库提供了高级API,帮助开发者更方便地处理ZooKeeper的操作,例如创建、删除节点,以及设置节点监视器等。 总之,ZooKeeper...
总结来说,SpringBoot集成Zookeeper实现分布式锁,通过Zookeeper的临时顺序节点特性,能有效地解决分布式环境下的并发控制问题,避免羊群效应,提高系统的稳定性和效率。在实际应用中,开发者可以根据具体需求调整和...
在分布式锁的实现中,CuratorFramework可以用于创建、连接zookeeper的客户端。 分布式锁的实现 在示例代码中,首先创建了一个CuratorFramework的客户端,用于连接zookeeper。然后,通过CuratorFramework的API创建...
本示例将详细介绍如何利用 Curator 在 ZooKeeper 上进行数据操作以及实现分布式锁。 一、ZooKeeper 与 Curator 的基本概念 1. ZooKeeper:ZooKeeper 是一款分布式协调服务,它为分布式应用提供一致性服务,如命名...
这种特性使得Zookeeper可以检测到节点的存活状态,进而实现锁的自动释放。其次,顺序节点在创建时会自动附加一个递增的序列号,这有助于我们解决分布式环境中的公平性问题。 实现分布式共享锁的基本步骤如下: 1. ...
分布式锁Curator是Java开发中的一个关键工具,用于在分布式环境中实现锁的机制。Curator是Apache ZooKeeper的一个客户端库,提供了丰富的高级API和工具,简化了ZooKeeper的使用,同时也包括了分布式锁的实现。...
3. 使用Curator的InterProcessMutex实现锁: ```java InterProcessMutex lock = new InterProcessMutex(client, "/path/to/lock"); try (LockHandle handle = lock.acquire()) { // 执行关键操作 } catch ...
Curator的亮点在于其丰富的客户端工具集,包括如ExponentialBackoffRetry重试策略、LedgerAllocator数据存储管理、Recipe库(如分布式锁、队列、命名空间等)等。这个版本的Curator确保了与Zookeeper 3.4.6的兼容性...
本书的第二部分还介绍了C语言客户端的使用以及Curator框架,它是对ZooKeeper API的一个高级封装库,简化了客户端的开发工作。 在第三部分中,“第9章ZooKeeper内部原理”和“第10章运行ZooKeeper”深入探讨了...
本实践教程将指导你如何利用Zookeeper实现分布式锁,以便在分布式环境中保证资源访问的互斥性。 **1. Zookeeper概述** Zookeeper是一个分布式协调服务,它为分布式应用提供了简单而强大的命名服务、配置管理、集群...
ZooKeeper经典应用场景 ZooKeeper是一个高可用的... ZooKeeper分布式锁的实现可以基于临时znode和Curator客户端中的各种官方实现的分布式锁,而ZooKeeper服务注册中心可以提供高可用性、强一致性和实时性等特性。
添加 Lock 方法,可以使用 ZooKeeper 分布式锁来实现锁的获取和释放操作。 ZooKeeper 分布式锁的优点 ZooKeeper 分布式锁的优点是可以自动释放锁,避免了死锁的出现,同时也可以解决分布式系统中的锁问题,使得...
分布式锁是一种在分布式系统中实现锁机制的技术,用于在多节点之间协调访问共享资源,确保在高并发环境下数据的一致性和完整性。本压缩包“zk:redis分布式锁.zip”提供了基于Zookeeper(zk)和Redis两种分布式锁实现...
Zookeeper分布式系统开发实战 Zookeeper是一款开源的分布式协调服务,最初由雅虎研究院开发,后来 BecameApache软件基金会的开源项目。Zookeeper的主要作用是提供一种高效、可靠、可扩展的分布式系统协调机制,用于...
1. Zookeeper分布式锁: Zookeeper是一个分布式协调服务,提供了丰富的数据一致性操作,如创建临时节点实现分布式锁。在Java中,通常使用ZKClient或Curator库来操作Zookeeper。Zookeeper的分布式锁特点是强一致性和...
使用Curator可以极大地简化与Zookeeper交互的代码,例如实现分布式锁只需要几行代码。此外,Curator还提供了各种场景的“Recipe”,如分布式锁服务、选举机制和分布式计数器等,这些预定义的解决方案使开发者能更...