上一篇写了zk分布式锁的使用,现在我们来看一下Curator是怎么实现分布式锁的。
简单的来说:
(1)各个线程在当前path下生成顺序节点;
(2)序号为0的节点成功拿到锁;
(3)没有拿到锁的节点会增加一个对上一个节点的Watch,并阻塞;
(4)当第一个节点删除时,下一个节点被唤醒,重新去拿锁。(或者阻塞一定时间后删除自身节点,返回获取锁失败)
首先来看一下获取锁的代码:
lock.acquire();//会一直阻塞到获得锁成功 或者 boolean locked= lock.acquire(2000, TimeUnit.MILLISECONDS);
1.点进去可以看到,调用的方法是InterProcessMutex.internalLock()
private boolean internalLock(long time, TimeUnit unit) throws Exception { Thread currentThread = Thread.currentThread(); //先看当前线程是否已经获得过锁 LockData lockData = threadData.get(currentThread); if ( lockData != null ) { //有则直接重入 lockData.lockCount.incrementAndGet(); return true; } String lockPath = internals.attemptLock(time, unit, getLockNodeBytes()); if ( lockPath != null ) { LockData newLockData = new LockData(currentThread, lockPath); threadData.put(currentThread, newLockData); return true; } return false; }
2.再看一下internals.attemptLock(time, unit, getLockNodeBytes())
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception { final long startMillis = System.currentTimeMillis(); final Long millisToWait = (unit != null) ? unit.toMillis(time) : null; final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes; int retryCount = 0; String ourPath = null; boolean hasTheLock = false; boolean isDone = false; while ( !isDone ) { isDone = true; try { //关键看这两句 //先成当前的path; ourPath = driver.createsTheLock(client, path, localLockNodeBytes); //再判断是否获取到了锁 hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath); } catch ( KeeperException.NoNodeException e ) { // gets thrown by StandardLockInternalsDriver when it can't find the lock node // this can happen when the session expires, etc. So, if the retry allows, just try it all again if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ) { isDone = false; } else { throw e; } } } if ( hasTheLock ) { return ourPath; } return null; }
3.再看 driver.createsTheLock(client, path, localLockNodeBytes)
@Override public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception { String ourPath; if ( lockNodeBytes != null ) { ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes); } else { ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path); } return ourPath; }
可以看到,只是在path下面生成临时的顺序节点。
4.判断是否获得到了锁:hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception { boolean haveTheLock = false; boolean doDelete = false; try { if ( revocable.get() != null ) { client.getData().usingWatcher(revocableWatcher).forPath(ourPath); } while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) { //获取到排序之后的子节点 List<String> children = getSortedChildren(); //截取出当前顺序节点的名称 String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash //生成判断的结果 PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases); if ( predicateResults.getsTheLock() ) { //成功获得了锁 haveTheLock = true; } else { //获取失败时,Watch前一个节点 String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch(); synchronized(this) { try { // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak //注册Watch client.getData().usingWatcher(watcher).forPath(previousSequencePath); if ( millisToWait != null ) { millisToWait -= (System.currentTimeMillis() - startMillis); startMillis = System.currentTimeMillis(); if ( millisToWait <= 0 ) { doDelete = true; // timed out - delete our node break; } //阻塞,等待watch事件触发后的通知 wait(millisToWait); } else { wait(); } } catch ( KeeperException.NoNodeException e ) { // it has been deleted (i.e. lock released). Try to acquire again } } } } } catch ( Exception e ) { doDelete = true; throw e; } finally { if ( doDelete ) { deleteOurPath(ourPath); } } return haveTheLock; }
5. PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
@Override public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception { int ourIndex = children.indexOf(sequenceNodeName); validateOurIndex(sequenceNodeName, ourIndex); boolean getsTheLock = ourIndex < maxLeases;//从前面可以看到maxLeases的值是1,所以只有当前节点是第一个顺序节点时,才能成功获得锁,其他都会失败。 String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases); //如果获取锁失败了,要继续等待,那必须要对前面的节点进行watch, return new PredicateResults(pathToWatch, getsTheLock); }
6.被watch的节点发生改变时,会唤醒阻塞的线程去重新竞争锁;
private final Watcher watcher = new Watcher() { @Override public void process(WatchedEvent event) { notifyFromWatcher(); } }; private synchronized void notifyFromWatcher() { notifyAll(); }
相关推荐
《从Paxos到Zookeeper分布式一致性原理与实践》是一本深入探讨分布式系统一致性问题的著作,其中重点讲解了Paxos算法与Zookeeper在实际应用中的理论与实践。Paxos是分布式计算领域中著名的共识算法,为解决分布式...
**Zookeeper的分布式锁实现原理** 1. **节点创建与监视**: Zookeeper允许客户端创建临时节点,这些节点会在客户端断开连接时自动删除。分布式锁的实现通常会为每个请求创建一个临时顺序节点,按照创建的顺序形成一...
分布式锁原理** 分布式锁的目的是在多个节点之间实现对共享资源的互斥访问。在ZooKeeper中,通常通过创建临时顺序节点(ephemeral sequential nodes)来实现这一目标。每个客户端在获取锁时都会创建这样的节点,...
《彻底理解ZooKeeper分布式锁实现原理》 ZooKeeper,简称zk,作为一个高可用的分布式协调服务,常被用于构建分布式系统中的各种组件,如分布式锁。在本篇文章中,我们将深入探讨如何利用Curator这个流行的开源框架...
Zookeeper分布式锁的工作原理: 1. **会话和临时节点**:Zookeeper支持两种类型的节点,持久节点和临时节点。临时节点在客户端会话失效(例如,客户端崩溃或网络断开)时会被自动删除,这为实现分布式锁提供了一个...
总的来说,《ZooKeeper分布式过程协同技术详解》是一本全面覆盖ZooKeeper理论和实践的著作,它不仅适合初学者了解ZooKeeper的基本原理,也对有一定经验的开发者提供了深入的技术细节和实战指导。通过阅读这本书,...
5. 实战应用:通过具体案例展示如何在实际项目中运用Zookeeper解决分布式协调问题,例如配置管理、服务发现、分布式锁等。 6. 高可用与故障恢复:探讨Zookeeper的容错机制,如何保证服务的高可用性和数据的一致性。 ...
《PAXOS到ZOOKEEPER分布式一致性原理与实践》是一部深入探讨分布式一致性问题的著作,其中涵盖了从经典的PAXOS算法到广泛应用的ZooKeeper系统的关键知识点。这本书旨在帮助读者理解并掌握如何在分布式环境中实现强...
本视频资料深入浅出地讲解了分布式锁的原理、实现方式以及其在实际应用中的场景。 首先,我们来了解分布式锁的基本原理。分布式锁的目的是解决在分布式环境下,多节点并发访问同一资源时可能出现的数据不一致问题。...
4. 分布式锁:利用Zookeeper可以实现分布式锁,通过创建临时节点,监听节点变化,确保在任何时刻只有一个客户端获得锁。 5. 集群管理:Zookeeper可以作为集群中的仲裁者,用于选举主节点,监控节点状态,实现服务...
通过深入理解Zookeeper的工作原理以及ZookeeperNet库的使用,开发者可以有效地在C#环境中实现高可用的分布式锁,保障多节点之间的协同工作和数据一致性。在实际项目中,分布式锁可以广泛应用于数据库操作、并发任务...
《从Paxos到Zookeeper:分布式一致性原理与实践》这本书深入探讨了分布式系统中的一致性问题,作者倪超以其丰富的经验,为我们揭示了如何在大规模分布式环境中实现可靠的数据同步和协调。这本书的核心主题围绕着...
《从PAXOS到ZOOKEEPER分布式一致性原理与实践.pdf》这份文档,很可能会深入解析PAXOS算法的原理,包括其基本模型、消息交换过程以及各种变种,比如Multi-PAXOS。同时,它会详细介绍ZOOKEEPER的架构设计、API使用以及...
**一、Zookeeper的分布式锁原理** 1. **节点创建与删除**:在Zookeeper中,锁的实现通常依赖于临时节点(ephemeral nodes)。客户端创建一个临时节点表示获取锁,当客户端断开连接时,Zookeeper会自动删除该节点,...
- **分布式锁**:通过特定的ZNode结构,实现共享锁服务。 - **队列服务**:FIFO(先进先出)的队列可以通过ZNode的顺序创建实现。 3. **ZooKeeper的架构** - **客户端-服务器模型**:每个客户端连接到一个或多个...
Zookeeper,是由Apache开发的开源分布式协调服务,它基于Paxos等一致性算法实现,并提供了更高级别的API,使得开发者能够更容易地实现分布式锁、配置管理、命名服务等功能。Zookeeper的主要特性包括: 1. **原子性*...
4. **zkLockTest**:这个文件很可能是实现Zookeeper分布式锁的测试代码,可能包含客户端的连接、锁的获取和释放、异常处理等逻辑。通过对这个测试代码的分析和运行,我们可以深入理解Zookeeper分布式锁的工作机制。 ...
在核心功能部分,书中会详细讲解ZooKeeper的原子操作(如创建、删除、设置数据等)以及如何利用这些操作构建常见的分布式服务,例如分布式锁、队列、主选举等。这些章节将通过实例分析,使读者能够直观地理解...
### 分布式锁原理介绍 #### 一、分布式锁概览 **分布式锁**是一种用于在分布式系统中控制多个节点对共享资源进行访问的技术。它主要用于解决多节点间并发访问同一资源时产生的竞争问题,确保资源的一致性和完整性...
分布式锁的原理** 分布式锁的核心是确保在同一时间只有一个客户端能持有锁。在ZooKeeper中,这通常通过创建临时节点来实现。当一个客户端请求锁时,它会在特定的ZNode下创建一个临时节点。如果有多个客户端同时请求...