/** * DistributedLockUtil.java * 分布式锁工厂类,所有分布式请求都由该工厂类负责 **/ public class DistributedLockUtil { private static Object schemeLock = new Object(); private static Object mutexLock = new Object(); private static Map<String,Object> mutexLockMap = new ConcurrentHashMap(); private String schema; private Map<String,DistributedReentrantLock> cache = new ConcurrentHashMap<String,DistributedReentrantLock>(); private static Map<String,DistributedLockUtil> instances = new ConcurrentHashMap(); public static DistributedLockUtil getInstance(String schema){ DistributedLockUtil u = instances.get(schema); if(u==null){ synchronized(schemeLock){ u = instances.get(schema); if(u == null){ u = new DistributedLockUtil(schema); instances.put(schema, u); } } } return u; } private DistributedLockUtil(String schema){ this.schema = schema; } private Object getMutex(String key){ Object mx = mutexLockMap.get(key); if(mx == null){ synchronized(mutexLock){ mx = mutexLockMap.get(key); if(mx==null){ mx = new Object(); mutexLockMap.put(key,mx); } } } return mx; } private DistributedReentrantLock getLock(String key){ DistributedReentrantLock lock = cache.get(key); if(lock == null){ synchronized(getMutex(key)){ lock = cache.get(key); if(lock == null){ lock = new DistributedReentrantLock(key,schema); cache.put(key, lock); } } } return lock; } public void reset(){ for(String s : cache.keySet()){ getLock(s).unlock(); } } /** * 尝试加锁 * 如果当前线程已经拥有该锁的话,直接返回false,表示不用再次加锁,此时不应该再调用unlock进行解锁 * * @param key * @return * @throws InterruptedException * @throws KeeperException */ public LockStat lock(String key) throws InterruptedException, KeeperException{ if(getLock(key).isOwner()){ return LockStat.NONEED; } getLock(key).lock(); return LockStat.SUCCESS; } public void clearLock(String key) throws InterruptedException, KeeperException{ synchronized(getMutex(key)){ DistributedReentrantLock l = cache.get(key); l.clear(); cache.remove(key); } } public void unlock(String key,LockStat stat) throws InterruptedException, KeeperException{ unlock(key,stat,false); } public void unlock(String key,LockStat stat,boolean keepalive) throws InterruptedException, KeeperException{ if(stat == null) return; if(LockStat.SUCCESS.equals(stat)){ DistributedReentrantLock lock = getLock(key); boolean hasWaiter = lock.unlock(); if(!hasWaiter && !keepalive){ synchronized(getMutex(key)){ lock.clear(); cache.remove(key); } } } } public static enum LockStat{ NONEED, SUCCESS } }
/** *DistributedReentrantLock.java *本地线程之间锁争用,先使用虚拟机内部锁机制,减少结点间通信开销 */ public class DistributedReentrantLock { private static final Logger logger = Logger.getLogger(DistributedReentrantLock.class); private ReentrantLock reentrantLock = new ReentrantLock(); private WriteLock writeLock; private long timeout = 3*60*1000; private final Object mutex = new Object(); private String dir; private String schema; private final ExitListener exitListener = new ExitListener(){ @Override public void execute() { initWriteLock(); } }; private synchronized void initWriteLock(){ logger.debug("初始化writeLock"); writeLock = new WriteLock(dir,new LockListener(){ @Override public void lockAcquired() { synchronized(mutex){ mutex.notify(); } } @Override public void lockReleased() { } },schema); if(writeLock != null && writeLock.zk != null){ writeLock.zk.addExitListener(exitListener); } synchronized(mutex){ mutex.notify(); } } public DistributedReentrantLock(String dir,String schema) { this.dir = dir; this.schema = schema; initWriteLock(); } public void lock(long timeout) throws InterruptedException, KeeperException { reentrantLock.lock();//多线程竞争时,先拿到第一层锁 try{ boolean res = writeLock.trylock(); if(!res){ synchronized(mutex){ mutex.wait(timeout); } if(writeLock == null || !writeLock.isOwner()){ throw new InterruptedException("锁超时"); } } }catch(InterruptedException e){ reentrantLock.unlock(); throw e; }catch(KeeperException e){ reentrantLock.unlock(); throw e; } } public void lock() throws InterruptedException, KeeperException { lock(timeout); } public void destroy() throws KeeperException { writeLock.unlock(); } public boolean unlock(){ if(!isOwner()) return false; try{ writeLock.unlock(); reentrantLock.unlock();//多线程竞争时,释放最外层锁 }catch(RuntimeException e){ reentrantLock.unlock();//多线程竞争时,释放最外层锁 throw e; } return reentrantLock.hasQueuedThreads(); } public boolean isOwner() { return reentrantLock.isHeldByCurrentThread() && writeLock.isOwner(); } public void clear() { writeLock.clear(); } }
public class WriteLock extends ZkPrimative {
private static final Logger LOG = Logger.getLogger(WriteLock.class);
private final String dir;
private String id;
private LockNode idName;
private String ownerId;
private String lastChildId;
private byte[] data = {0x12, 0x34};
private LockListener callback;
public WriteLock(String dir,String schema) {
this.dir = dir;
public WriteLock(String dir,LockListener callback,String schema) {
<a href="http://www.nbso.ca/">nbso online casino reviews</a> this.callback = callback;
public LockListener getLockListener() {
return this.callback;
public void setLockListener(LockListener callback) {
this.callback = callback;
public synchronized void unlock() throws RuntimeException {
if(zk == null || zk.isClosed()){
if (id != null) {
try {
zk.delete(id, -1);
} catch (InterruptedException e) {
LOG.warn("Caught: " e, e);
//set that we have been interrupted.
} catch (KeeperException.NoNodeException e) {
// do nothing
} catch (KeeperException e) {
LOG.warn("Caught: " e, e);
throw (RuntimeException) new RuntimeException(e.getMessage()).
}finally {
if (callback != null) {
id = null;
private class LockWatcher implements Watcher {
public void process(WatchedEvent event) {
LOG.debug("Watcher fired on path: " event.getPath() " state: "
event.getState() " type " event.getType());
try {
} catch (Exception e) {
LOG.warn("Failed to acquire lock: " e, e);
private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir)
throws KeeperException, InterruptedException {
List<String> names = zookeeper.getChildren(dir, false);
for (String name : names) {
if (name.startsWith(prefix)) {
id = dir "/" name;
if (LOG.isDebugEnabled()) {
LOG.debug("Found id created last time: " id);
if (id == null) {
id = zookeeper.create(dir "/" prefix, data,
if (LOG.isDebugEnabled()) {
LOG.debug("Created id: " id);
public void clear() {
if(zk == null || zk.isClosed()){
try {
zk.delete(dir, -1);
} catch (Exception e) {
LOG.error("clear error: " e,e);
public synchronized boolean trylock() throws KeeperException, InterruptedException {
if(zk == null){
LOG.info("zk 是空");
return false;
if (zk.isClosed()) {
LOG.info("zk 已经关闭");
return false;
LOG.debug("id:" id);
do {
if (id == null) {
long sessionId = zk.getSessionId();
String prefix = "x-" sessionId "-";
idName = new LockNode(id);
LOG.debug("idName:" idName);
if (id != null) {
List<String> names = zk.getChildren(dir, false);
if (names.isEmpty()) {
LOG.warn("No children in: " dir " when we've just "
"created one! Lets recreate it...");
id = null;
} else {
SortedSet<LockNode> sortedNames = new TreeSet<LockNode>();
for (String name : names) {
sortedNames.add(new LockNode(dir "/" name));
ownerId = sortedNames.first().getName();
LOG.debug("all:" sortedNames);
SortedSet<LockNode> lessThanMe = sortedNames.headSet(idName);
LOG.debug("less than me:" lessThanMe);
if (!lessThanMe.isEmpty()) {
LockNode lastChildName = lessThanMe.last();
lastChildId = lastChildName.getName();
if (LOG.isDebugEnabled()) {
LOG.debug("watching less than me node: " lastChildId);
Stat stat = zk.exists(lastChildId, new LockWatcher());
if (stat != null) {
return Boolean.FALSE;
} else {
LOG.warn("Could not find the"
" stats for less than me: " lastChildName.getName());
} else {
if (isOwner()) {
if (callback != null) {
return Boolean.TRUE;
while (id == null);
return Boolean.FALSE;
public String getDir() {
return dir;
public boolean isOwner() {
return id != null && ownerId != null && id.equals(ownerId);
public String getId() {
return this.id;
实现基于Zookeeper的分布式锁,主要涉及以下核心概念: 1. **节点创建**:在Zookeeper中,每个锁可以表示为一个临时节点。当客户端需要获取锁时,会在特定路径下创建一个临时节点。由于Zookeeper的顺序性,创建的...
**Zookeeper分布式锁的关键特性包括:** 1. **顺序一致性:** Zookeeper中的节点被创建顺序是全局唯一的,这有助于实现锁的唯一性。 2. **原子性:** 创建和删除节点的操作在Zookeeper中都是原子性的,这保证了...
基于一致性模型,ZooKeeper确保了数据的一致性和有序性,使其成为构建分布式锁的理想选择。 **2. 分布式锁原理** 分布式锁的目的是在多个节点之间实现对共享资源的互斥访问。在ZooKeeper中,通常通过创建临时顺序...
ZooKeeper 提供的分布式锁通常基于其原子的创建、删除节点和监视事件机制来实现。 1. **不可重入锁(Non-Recursive Lock)**: 不可重入锁不允许同一个线程再次获取已持有的锁。在ZooKeeper中,实现不可重入锁通常...
2. 分布式锁:利用ZNODE的创建、删除操作,实现跨节点的互斥访问控制。 3. 配置管理:集中式存储和分发系统的配置信息,保证所有节点访问的配置是一致的。 4. 命名服务:为分布式组件提供全局唯一的ID或名称,简化...
《彻底理解ZooKeeper分布式锁实现原理》 ZooKeeper,简称zk,作为一个高可用的分布式协调服务,常被用于构建分布式系统中的各种组件,如分布式锁。在本篇文章中,我们将深入探讨如何利用Curator这个流行的开源框架...
ZooKeeper分布式锁的实现基于ZooKeeper的节点机制。ZooKeeper的节点可以分为四种类型:持久节点、临时节点、顺序节点和ephemeral节点。其中,临时节点和ephemeral节点可以被用来实现分布式锁。 临时节点实现分布式...
以下是一个简单的基于Zookeeper的分布式锁实现示例: ```java @Service public class ZookeeperDistributedLock { @Autowired private CuratorFramework curatorFramework; private String lockPath = "/...
Java(SpringBoot)基于zookeeper的分布式锁实现 本文主要介绍了Java(SpringBoot)基于zookeeper的分布式锁实现,通过示例代码详细介绍了分布式锁的实现过程,对大家的学习或者工作具有一定的参考学习价值。 分布式锁...
**Zookeeper分布式锁** Zookeeper是一个高可用的分布式协调服务,它维护了一致的命名空间和分布式事件通知。Zookeeper中的分布式锁通常通过创建临时节点(ephemeral nodes)来实现。当客户端创建一个临时节点并持有...
Zookeeper的分布式锁主要基于两种数据结构:临时节点(ephemeral nodes)和顺序节点(sequential nodes)。 首先,临时节点是当客户端断开连接时会被自动删除的节点。这种特性使得Zookeeper可以检测到节点的存活...