`
1028826685
  • 浏览: 939062 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类

基于zookeeper的分布式锁

 
阅读更多

一。为何使用分布式锁?
当应用服务器数量超过1台,对相同数据的访问可能造成访问冲突(特别是写冲突)。单纯使用关系数据库比如MYSQL的应用可以借助于事务来实现锁,也可以使用版本号等实现乐观锁,最大的缺陷就是可用性降低(性能差)。对于GLEASY这种满足大规模并发访问请求的应用来说,使用数据库事务来实现数据库就有些捉襟见肘了。另外对于一些不依赖数据库的应用,比如分布式文件系统,为了保证同一文件在大量读写操作情况下的正确性,必须引入分布式锁来约束对同一文件的并发操作。

二。对分布式锁的要求
1.高性能(分布式锁不能成为系统的性能瓶颈)
2.避免死锁(拿到锁的结点挂掉不会导致其它结点永远无法继续)
3.支持锁重入

三。方案1,基于zookeeper的分布式锁

 

 

/**
* DistributedLockUtil.java
* 分布式锁工厂类,所有分布式请求都由该工厂类负责
**/
public class DistributedLockUtil {
	private static Object schemeLock = new Object();
	private static Object mutexLock = new Object();
	private static Map<String,Object> mutexLockMap = new ConcurrentHashMap();	
	private String schema;
	private Map<String,DistributedReentrantLock> cache = new ConcurrentHashMap<String,DistributedReentrantLock>();
	
	private static Map<String,DistributedLockUtil> instances = new ConcurrentHashMap();
	public static DistributedLockUtil getInstance(String schema){
		DistributedLockUtil u = instances.get(schema);
		if(u==null){
			synchronized(schemeLock){
				u = instances.get(schema);
				if(u == null){
					u = new DistributedLockUtil(schema);
					instances.put(schema, u);
				}
			}
		}
		return u;
	}
	
	private DistributedLockUtil(String schema){
		this.schema = schema;
	}
	
	private Object getMutex(String key){
		Object mx = mutexLockMap.get(key);
		if(mx == null){
			synchronized(mutexLock){
				mx = mutexLockMap.get(key);
				if(mx==null){
					mx = new Object();
					mutexLockMap.put(key,mx);
				}
			}
		}
		return mx;
	}
	
	private DistributedReentrantLock getLock(String key){
		DistributedReentrantLock lock = cache.get(key);
		if(lock == null){
			synchronized(getMutex(key)){
				lock = cache.get(key);
				if(lock == null){
					lock = new DistributedReentrantLock(key,schema);
					cache.put(key, lock);
				}
			}
		}
		return lock;
	}
	
	public void reset(){
		for(String s : cache.keySet()){
			getLock(s).unlock();
		}
	}
	
	/**
	 * 尝试加锁
	 * 如果当前线程已经拥有该锁的话,直接返回false,表示不用再次加锁,此时不应该再调用unlock进行解锁
	 * 
	 * @param key
	 * @return
	 * @throws InterruptedException
	 * @throws KeeperException
	 */
	public LockStat lock(String key) throws InterruptedException, KeeperException{
		if(getLock(key).isOwner()){
			return LockStat.NONEED;
		}
		getLock(key).lock();
		return LockStat.SUCCESS;
	}
	
	public void clearLock(String key) throws InterruptedException, KeeperException{
		synchronized(getMutex(key)){
			DistributedReentrantLock l = cache.get(key);
			l.clear();
			cache.remove(key);
		}
	}	
	
	public void unlock(String key,LockStat stat) throws InterruptedException, KeeperException{
		unlock(key,stat,false);
	}

	public void unlock(String key,LockStat stat,boolean keepalive) throws InterruptedException, KeeperException{
		if(stat == null) return;
		if(LockStat.SUCCESS.equals(stat)){
			DistributedReentrantLock lock = getLock(key);
			boolean hasWaiter = lock.unlock();
			if(!hasWaiter && !keepalive){
				synchronized(getMutex(key)){
					lock.clear();
					cache.remove(key);
				}
			}
		}
	}
	
	public static enum LockStat{
		NONEED,
		SUCCESS
	}
}

/**
*DistributedReentrantLock.java
*本地线程之间锁争用,先使用虚拟机内部锁机制,减少结点间通信开销
*/
public class DistributedReentrantLock {
	private static final Logger logger = Logger.getLogger(DistributedReentrantLock.class);
 private ReentrantLock reentrantLock = new ReentrantLock();

 private WriteLock writeLock;
 private long timeout = 3*60*1000;
 
 private final Object mutex = new Object();
 private String dir;
 private String schema;
 
 private final ExitListener exitListener = new ExitListener(){
		@Override
		public void execute() {
			initWriteLock();
		}
	};
	
	private synchronized void initWriteLock(){
		logger.debug("初始化writeLock");
		writeLock = new WriteLock(dir,new LockListener(){

			@Override
			public void lockAcquired() {
				synchronized(mutex){
					mutex.notify();
				}
			}
			@Override
			public void lockReleased() {
			}
 		
 	},schema);
		
		if(writeLock != null && writeLock.zk != null){
			writeLock.zk.addExitListener(exitListener);
		}
		
		synchronized(mutex){
			mutex.notify();
		}
	}
	
 public DistributedReentrantLock(String dir,String schema) {	
 	this.dir = dir;
 	this.schema = schema;
 	initWriteLock();
 }

 public void lock(long timeout) throws InterruptedException, KeeperException {
 reentrantLock.lock();//多线程竞争时,先拿到第一层锁
 try{
 	boolean res = writeLock.trylock();
 	if(!res){
	 	synchronized(mutex){
					mutex.wait(timeout);
				}
	 	if(writeLock == null || !writeLock.isOwner()){
	 		throw new InterruptedException("锁超时");
	 	}
 	}
 }catch(InterruptedException e){
 	reentrantLock.unlock();
 	throw e;
 }catch(KeeperException e){
 	reentrantLock.unlock();
 	throw e;
 }
 }
 
 public void lock() throws InterruptedException, KeeperException {
 	lock(timeout);
 }

 public void destroy() throws KeeperException {
 	writeLock.unlock();
 }
 

 public boolean unlock(){
 	if(!isOwner()) return false;
 try{
 	writeLock.unlock();
 	reentrantLock.unlock();//多线程竞争时,释放最外层锁
 }catch(RuntimeException e){
 	reentrantLock.unlock();//多线程竞争时,释放最外层锁
 	throw e;
 }
 
 return reentrantLock.hasQueuedThreads();
 }



 public boolean isOwner() {
 return reentrantLock.isHeldByCurrentThread() && writeLock.isOwner();
 }

	public void clear() {
		writeLock.clear();
	}

}

/**
*WriteLock.java
*基于zk的锁实现
*一个最简单的场景如下:
*1.结点A请求加锁,在特定路径下注册自己(会话自增结点),得到一个ID号1
*2.结点B请求加锁,在特定路径下注册自己(会话自增结点),得到一个ID号2
*3.结点A获取所有结点ID,判断出来自己是最小结点号,于是获得锁
*4.结点B获取所有结点ID,判断出来自己不是最小结点,于是监听小于自己的最大结点(结点A)变更事件
*5.结点A拿到锁,处理业务,处理完,释放锁(删除自己)
*6.结点B收到结点A变更事件,判断出来自己已经是最小结点号,于是获得锁。
*/
public class WriteLock extends ZkPrimative {
 private static final Logger LOG = Logger.getLogger(WriteLock.class);

 private final String dir;
 private String id;
 private LockNode idName;
 private String ownerId;
 private String lastChildId;
 private byte[] data = {0x12, 0x34};
 private LockListener callback;
 
 public WriteLock(String dir,String schema) {
 super(schema,true);
 this.dir = dir;
 }
 
 public WriteLock(String dir,LockListener callback,String schema) {
 	this(dir,schema);
 <a href="http://www.nbso.ca/">nbso online casino reviews</a>  this.callback = callback;
 }

 public LockListener getLockListener() {
 return this.callback;
 }
 
 public void setLockListener(LockListener callback) {
 this.callback = callback;
 }

 public synchronized void unlock() throws RuntimeException {
 	if(zk == null || zk.isClosed()){
 		return;
 	}
 if (id != null) {
 try {
 	 zk.delete(id, -1); 
 } catch (InterruptedException e) {
 LOG.warn("Caught: " e, e);
 //set that we have been interrupted.
 Thread.currentThread().interrupt();
 } catch (KeeperException.NoNodeException e) {
 // do nothing
 } catch (KeeperException e) {
 LOG.warn("Caught: " e, e);
 throw (RuntimeException) new RuntimeException(e.getMessage()).
 initCause(e);
 }finally {
 if (callback != null) {
 callback.lockReleased();
 }
 id = null;
 }
 }
 }
 
 private class LockWatcher implements Watcher {
 public void process(WatchedEvent event) {
 LOG.debug("Watcher fired on path: " event.getPath() " state: " 
 event.getState() " type " event.getType());
 try {
 trylock();
 } catch (Exception e) {
 LOG.warn("Failed to acquire lock: " e, e);
 }
 }
 }
 
 private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir) 
 throws KeeperException, InterruptedException {
 List<String> names = zookeeper.getChildren(dir, false);
 for (String name : names) {
 if (name.startsWith(prefix)) {
 id = dir "/" name;
 if (LOG.isDebugEnabled()) {
 LOG.debug("Found id created last time: " id);
 }
 break;
 }
 }
 if (id == null) {
 id = zookeeper.create(dir "/" prefix, data, 
 acl, EPHEMERAL_SEQUENTIAL);

 if (LOG.isDebugEnabled()) {
 LOG.debug("Created id: " id);
 }
 }

 }

	public void clear() {
		if(zk == null || zk.isClosed()){
 		return;
 	}
		try {
			zk.delete(dir, -1);
		} catch (Exception e) {
			 LOG.error("clear error: " e,e);
		} 
	}
	
 public synchronized boolean trylock() throws KeeperException, InterruptedException {
 	if(zk == null){
 		LOG.info("zk 是空");
 		return false;
 	}
 if (zk.isClosed()) {
 	LOG.info("zk 已经关闭");
 return false;
 }
 ensurePathExists(dir);
 
 LOG.debug("id:" id);
 do {
 if (id == null) {
 long sessionId = zk.getSessionId();
 String prefix = "x-" sessionId "-";
 idName = new LockNode(id);
 LOG.debug("idName:" idName);
 }
 if (id != null) {
 List<String> names = zk.getChildren(dir, false);
 if (names.isEmpty()) {
 LOG.warn("No children in: " dir " when we've just " 
 "created one! Lets recreate it...");
 id = null;
 } else {
 SortedSet<LockNode> sortedNames = new TreeSet<LockNode>();
 for (String name : names) {
 sortedNames.add(new LockNode(dir "/" name));
 }
 ownerId = sortedNames.first().getName();
 LOG.debug("all:" sortedNames);
 SortedSet<LockNode> lessThanMe = sortedNames.headSet(idName);
 LOG.debug("less than me:" lessThanMe);
 if (!lessThanMe.isEmpty()) {
 	LockNode lastChildName = lessThanMe.last();
 lastChildId = lastChildName.getName();
 if (LOG.isDebugEnabled()) {
 LOG.debug("watching less than me node: " lastChildId);
 }
 Stat stat = zk.exists(lastChildId, new LockWatcher());
 if (stat != null) {
 return Boolean.FALSE;
 } else {
 LOG.warn("Could not find the" 
 		" stats for less than me: " lastChildName.getName());
 }
 } else {
 if (isOwner()) {
 if (callback != null) {
 callback.lockAcquired();
 }
 return Boolean.TRUE;
 }
 }
 }
 }
 }
 while (id == null);
 return Boolean.FALSE;
 }

 public String getDir() {
 return dir;
 }

 public boolean isOwner() {
 return id != null && ownerId != null && id.equals(ownerId);
 }

 public String getId() {
 return this.id;
 }
}


使用本方案实现的分布式锁,可以很好地解决锁重入的问题,而且使用会话结点来避免死锁;性能方面,根据笔者自测结果,加锁解锁各一次算是一个操作,本方案实现的分布式锁,TPS大概为2000-3000,性能比较一般;
分享到:
评论

相关推荐

    C#基于zookeeper分布式锁的实现源码

    总之,C#中基于ZooKeeper的分布式锁实现涉及对ZooKeeper的操作,包括创建临时顺序节点、监听节点变化以及正确释放锁。这样的实现方式保证了在分布式环境下的并发控制和数据一致性,同时具备良好的扩展性和容错性。...

    基于zookeeper的分布式锁简单实现

    实现基于Zookeeper的分布式锁,主要涉及以下核心概念: 1. **节点创建**:在Zookeeper中,每个锁可以表示为一个临时节点。当客户端需要获取锁时,会在特定路径下创建一个临时节点。由于Zookeeper的顺序性,创建的...

    基于zookeeper的分布式锁实现demo

    **Zookeeper分布式锁的关键特性包括:** 1. **顺序一致性:** Zookeeper中的节点被创建顺序是全局唯一的,这有助于实现锁的唯一性。 2. **原子性:** 创建和删除节点的操作在Zookeeper中都是原子性的,这保证了...

    基于zookeeper实现分布式锁

    zooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是集群的管理者。提供了文件系统和通知机制。...在开发项目的过程中,很多大型项目都是分布式部署的,那么我们现在使用zookeeper实现一个分布式锁。

    zookeeper做分布式锁

    基于一致性模型,ZooKeeper确保了数据的一致性和有序性,使其成为构建分布式锁的理想选择。 **2. 分布式锁原理** 分布式锁的目的是在多个节点之间实现对共享资源的互斥访问。在ZooKeeper中,通常通过创建临时顺序...

    从Paxos到Zookeeper分布式一致性原理与实践PDF

    《从Paxos到Zookeeper分布式一致性原理与实践》是一本深入探讨分布式系统一致性问题的著作,其中重点讲解了Paxos算法与Zookeeper在实际应用中的理论与实践。Paxos是分布式计算领域中著名的共识算法,为解决分布式...

    zookeeper分布式锁实例源码

    ZooKeeper 提供的分布式锁通常基于其原子的创建、删除节点和监视事件机制来实现。 1. **不可重入锁(Non-Recursive Lock)**: 不可重入锁不允许同一个线程再次获取已持有的锁。在ZooKeeper中,实现不可重入锁通常...

    从PAXOS到ZOOKEEPER分布式一致性原理与实践

    2. 分布式锁:利用ZNODE的创建、删除操作,实现跨节点的互斥访问控制。 3. 配置管理:集中式存储和分发系统的配置信息,保证所有节点访问的配置是一致的。 4. 命名服务:为分布式组件提供全局唯一的ID或名称,简化...

    ZooKeeper分布式过程协同技术详解_new.pdf

    《ZooKeeper分布式过程协同技术详解》是一本深入解析ZooKeeper核心技术的书籍,适合所有对分布式系统和ZooKeeper有研究兴趣的读者。ZooKeeper是Apache软件基金会的一个开源项目,它为分布式应用程序提供了一个高效、...

    从Paxos到Zookeeper分布式一致性原理与实践 + ZooKeeper-分布式过程协同技术详解 pdf

    ZooKeeper基于Paxos等一致性算法,实现了简单易用的接口,使得开发者无需关心底层复杂的共识机制,就能轻松实现诸如命名服务、配置管理、集群同步、分布式锁等功能。ZooKeeper的数据模型是一个层次化的命名空间,...

    从Paxos到Zookeeper分布式一致性原理与实践包括源码

    Zookeeper基于Paxos和其他一致性算法的实现,为分布式应用程序提供了命名服务、配置管理、分布式锁、群组服务等功能。Zookeeper通过ZNode(类似于文件系统的节点)来存储和操作数据,并采用观察者模式来实时监控数据...

    一文彻底理解ZooKeeper分布式锁的实现原理

    《彻底理解ZooKeeper分布式锁实现原理》 ZooKeeper,简称zk,作为一个高可用的分布式协调服务,常被用于构建分布式系统中的各种组件,如分布式锁。在本篇文章中,我们将深入探讨如何利用Curator这个流行的开源框架...

    zk分布式锁1

    ZooKeeper分布式锁的实现基于ZooKeeper的节点机制。ZooKeeper的节点可以分为四种类型:持久节点、临时节点、顺序节点和ephemeral节点。其中,临时节点和ephemeral节点可以被用来实现分布式锁。 临时节点实现分布式...

    springboot zookeeper 分布式锁

    以下是一个简单的基于Zookeeper的分布式锁实现示例: ```java @Service public class ZookeeperDistributedLock { @Autowired private CuratorFramework curatorFramework; private String lockPath = "/...

    浅谈Java(SpringBoot)基于zookeeper的分布式锁实现

    Java(SpringBoot)基于zookeeper的分布式锁实现 本文主要介绍了Java(SpringBoot)基于zookeeper的分布式锁实现,通过示例代码详细介绍了分布式锁的实现过程,对大家的学习或者工作具有一定的参考学习价值。 分布式锁...

    吊打面试官之基于zookeeper实现分布式锁源码

    总结,基于ZooKeeper实现的分布式锁方案,充分利用了ZooKeeper的特性,提供了高可用、高性能的锁服务。在SpringCloud框架下,可以轻松地整合ZooKeeper,简化分布式系统的开发。不过,在实际应用中,还需要根据业务...

    redis和zookeeper实现分布式锁的区别

    **Zookeeper分布式锁** Zookeeper是一个高可用的分布式协调服务,它维护了一致的命名空间和分布式事件通知。Zookeeper中的分布式锁通常通过创建临时节点(ephemeral nodes)来实现。当客户端创建一个临时节点并持有...

    ZookeeperNet实现分布式锁

    通过深入理解Zookeeper的工作原理以及ZookeeperNet库的使用,开发者可以有效地在C#环境中实现高可用的分布式锁,保障多节点之间的协同工作和数据一致性。在实际项目中,分布式锁可以广泛应用于数据库操作、并发任务...

    使用zookeeper实现分布式共享锁

    Zookeeper的分布式锁主要基于两种数据结构:临时节点(ephemeral nodes)和顺序节点(sequential nodes)。 首先,临时节点是当客户端断开连接时会被自动删除的节点。这种特性使得Zookeeper可以检测到节点的存活...

Global site tag (gtag.js) - Google Analytics