`
1028826685
  • 浏览: 938752 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类

两种分布式锁实现方案一

 
阅读更多

一。为何使用分布式锁?
当应用服务器数量超过1台,对相同数据的访问可能造成访问冲突(特别是写冲突)。单纯使用关系数据库比如MYSQL的应用可以借助于事务来实现锁,也可以使用版本号等实现乐观锁,最大的缺陷就是可用性降低(性能差)。对于GLEASY这种满足大规模并发访问请求的应用来说,使用数据库事务来实现数据库就有些捉襟见肘了。另外对于一些不依赖数据库的应用,比如分布式文件系统,为了保证同一文件在大量读写操作情况下的正确性,必须引入分布式锁来约束对同一文件的并发操作。

二。对分布式锁的要求
1.高性能(分布式锁不能成为系统的性能瓶颈)
2.避免死锁(拿到锁的结点挂掉不会导致其它结点永远无法继续)
3.支持锁重入

三。方案1,基于zookeeper的分布式锁

001 /**
002 * DistributedLockUtil.java
003 * 分布式锁工厂类,所有分布式请求都由该工厂类负责
004 **/
005 public class DistributedLockUtil {
006     private static Object schemeLock = new Object();
007     private static Object mutexLock = new Object();
008     private static Map<String,Object> mutexLockMap = new ConcurrentHashMap();
009     private String schema;
010     private Map<String,DistributedReentrantLock> cache = new ConcurrentHashMap<String,DistributedReentrantLock>();
011      
012     private static Map<String,DistributedLockUtil> instances = new ConcurrentHashMap();
013     public static DistributedLockUtil getInstance(String schema){
014         DistributedLockUtil u = instances.get(schema);
015         if(u==null){
016             synchronized(schemeLock){
017                 u = instances.get(schema);
018                 if(u == null){
019                     u = new DistributedLockUtil(schema);
020                     instances.put(schema, u);
021                 }
022             }
023         }
024         return u;
025     }
026      
027     private DistributedLockUtil(String schema){
028         this.schema = schema;
029     }
030      
031     private Object getMutex(String key){
032         Object mx = mutexLockMap.get(key);
033         if(mx == null){
034             synchronized(mutexLock){
035                 mx = mutexLockMap.get(key);
036                 if(mx==null){
037                     mx = new Object();
038                     mutexLockMap.put(key,mx);
039                 }
040             }
041         }
042         return mx;
043     }
044      
045     private DistributedReentrantLock getLock(String key){
046         DistributedReentrantLock lock = cache.get(key);
047         if(lock == null){
048             synchronized(getMutex(key)){
049                 lock = cache.get(key);
050                 if(lock == null){
051                     lock = new DistributedReentrantLock(key,schema);
052                     cache.put(key, lock);
053                 }
054             }
055         }
056         return lock;
057     }
058      
059     public void reset(){
060         for(String s : cache.keySet()){
061             getLock(s).unlock();
062         }
063     }
064      
065     /**
066      * 尝试加锁
067      * 如果当前线程已经拥有该锁的话,直接返回false,表示不用再次加锁,此时不应该再调用unlock进行解锁
068      *
069      * @param key
070      * @return
071      * @throws InterruptedException
072      * @throws KeeperException
073      */
074     public LockStat lock(String key) throws InterruptedException, KeeperException{
075         if(getLock(key).isOwner()){
076             return LockStat.NONEED;
077         }
078         getLock(key).lock();
079         return LockStat.SUCCESS;
080     }
081      
082     public void clearLock(String key) throws InterruptedException, KeeperException{
083         synchronized(getMutex(key)){
084             DistributedReentrantLock l = cache.get(key);
085             l.clear();
086             cache.remove(key);
087         }
088     }  
089      
090     public void unlock(String key,LockStat stat) throws InterruptedException, KeeperException{
091         unlock(key,stat,false);
092     }
093  
094     public void unlock(String key,LockStat stat,boolean keepalive) throws InterruptedException, KeeperException{
095         if(stat == nullreturn;
096         if(LockStat.SUCCESS.equals(stat)){
097             DistributedReentrantLock lock = getLock(key);
098             boolean hasWaiter = lock.unlock();
099             if(!hasWaiter && !keepalive){
100                 synchronized(getMutex(key)){
101                     lock.clear();
102                     cache.remove(key);
103                 }
104             }
105         }
106     }
107      
108     public static enum LockStat{
109         NONEED,
110         SUCCESS
111     }
112 }

 

001 /**
002 *DistributedReentrantLock.java
003 *本地线程之间锁争用,先使用虚拟机内部锁机制,减少结点间通信开销
004 */
005 public class DistributedReentrantLock {
006     private static final Logger logger = Logger.getLogger(DistributedReentrantLock.class);
007  private ReentrantLock reentrantLock = new ReentrantLock();
008  
009  private WriteLock writeLock;
010  private long timeout = 3*60*1000;
011   
012  private final Object mutex = new Object();
013  private String dir;
014  private String schema;
015   
016  private final ExitListener exitListener = new ExitListener(){
017         @Override
018         public void execute() {
019             initWriteLock();
020         }
021     };
022      
023     private synchronized void initWriteLock(){
024         logger.debug("初始化writeLock");
025         writeLock = new WriteLock(dir,new LockListener(){
026  
027             @Override
028             public void lockAcquired() {
029                 synchronized(mutex){
030                     mutex.notify();
031                 }
032             }
033             @Override
034             public void lockReleased() {
035             }
036          
037     },schema);
038          
039         if(writeLock != null && writeLock.zk != null){
040             writeLock.zk.addExitListener(exitListener);
041         }
042          
043         synchronized(mutex){
044             mutex.notify();
045         }
046     }
047      
048  public DistributedReentrantLock(String dir,String schema) {   
049     this.dir = dir;
050     this.schema = schema;
051     initWriteLock();
052  }
053  
054  public void lock(long timeout) throws InterruptedException, KeeperException {
055  reentrantLock.lock();//多线程竞争时,先拿到第一层锁
056  try{
057     boolean res = writeLock.trylock();
058     if(!res){
059         synchronized(mutex){
060                     mutex.wait(timeout);
061                 }
062         if(writeLock == null || !writeLock.isOwner()){
063             throw new InterruptedException("锁超时");
064         }
065     }
066  }catch(InterruptedException e){
067     reentrantLock.unlock();
068     throw e;
069  }catch(KeeperException e){
070     reentrantLock.unlock();
071     throw e;
072  }
073  }
074   
075  public void lock() throws InterruptedException, KeeperException {
076     lock(timeout);
077  }
078  
079  public void destroy() throws KeeperException {
080     writeLock.unlock();
081  }
082   
083  
084  public boolean unlock(){
085     if(!isOwner()) return false;
086  try{
087     writeLock.unlock();
088     reentrantLock.unlock();//多线程竞争时,释放最外层锁
089  }catch(RuntimeException e){
090     reentrantLock.unlock();//多线程竞争时,释放最外层锁
091     throw e;
092  }
093   
094  return reentrantLock.hasQueuedThreads();
095  }
096  
097  
098  
099  public boolean isOwner() {
100  return reentrantLock.isHeldByCurrentThread() && writeLock.isOwner();
101  }
102  
103     public void clear() {
104         writeLock.clear();
105     }
106  
107 }

 

001 /**
002 *WriteLock.java
003 *基于zk的锁实现
004 *一个最简单的场景如下:
005 *1.结点A请求加锁,在特定路径下注册自己(会话自增结点),得到一个ID号1
006 *2.结点B请求加锁,在特定路径下注册自己(会话自增结点),得到一个ID号2
007 *3.结点A获取所有结点ID,判断出来自己是最小结点号,于是获得锁
008 *4.结点B获取所有结点ID,判断出来自己不是最小结点,于是监听小于自己的最大结点(结点A)变更事件
009 *5.结点A拿到锁,处理业务,处理完,释放锁(删除自己)
010 *6.结点B收到结点A变更事件,判断出来自己已经是最小结点号,于是获得锁。
011 */
012 public class WriteLock extends ZkPrimative {
013  private static final Logger LOG = Logger.getLogger(WriteLock.class);
014  
015  private final String dir;
016  private String id;
017  private LockNode idName;
018  private String ownerId;
019  private String lastChildId;
020  private byte[] data = {0x120x34};
021  private LockListener callback;
022   
023  public WriteLock(String dir,String schema) {
024  super(schema,true);
025  this.dir = dir;
026  }
027   
028  public WriteLock(String dir,LockListener callback,String schema) {
029     this(dir,schema);
030  <a href="http://www.nbso.ca/">nbso online casino reviews</a>  this.callback = callback;
031  }
032  
033  public LockListener getLockListener() {
034  return this.callback;
035  }
036   
037  public void setLockListener(LockListener callback) {
038  this.callback = callback;
039  }
040  
041  public synchronized void unlock() throws RuntimeException {
042     if(zk == null || zk.isClosed()){
043         return;
044     }
045  if (id != null) {
046  try {
047      zk.delete(id, -1);
048  catch (InterruptedException e) {
049  LOG.warn("Caught: " e, e);
050  //set that we have been interrupted.
051  Thread.currentThread().interrupt();
052  catch (KeeperException.NoNodeException e) {
053  // do nothing
054  catch (KeeperException e) {
055  LOG.warn("Caught: " e, e);
056  throw (RuntimeException) new RuntimeException(e.getMessage()).
057  initCause(e);
058  }finally {
059  if (callback != null) {
060  callback.lockReleased();
061  }
062  id = null;
063  }
064  }
065  }
066   
067  private class LockWatcher implements Watcher {
068  public void process(WatchedEvent event) {
069  LOG.debug("Watcher fired on path: " event.getPath() " state: "
070  event.getState() " type " event.getType());
071  try {
072  trylock();
073  catch (Exception e) {
074  LOG.warn("Failed to acquire lock: " e, e);
075  }
076  }
077  }
078   
079  private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir)
080  throws KeeperException, InterruptedException {
081  List<String> names = zookeeper.getChildren(dir, false);
082  for (String name : names) {
083  if (name.startsWith(prefix)) {
084  id = dir "/" name;
085  if (LOG.isDebugEnabled()) {
086  LOG.debug("Found id created last time: " id);
087  }
088  break;
089  }
090  }
091  if (id == null) {
092  id = zookeeper.create(dir "/" prefix, data,
093  acl, EPHEMERAL_SEQUENTIAL);
094  
095  if (LOG.isDebugEnabled()) {
096  LOG.debug("Created id: " id);
097  }
098  }
099  
100  }
101  
102     public void clear() {
103         if(zk == null || zk.isClosed()){
104         return;
105     }
106         try {
107             zk.delete(dir, -1);
108         catch (Exception e) {
109              LOG.error("clear error: " e,e);
110         }
111     }
112      
113  public synchronized boolean trylock() throws KeeperException, InterruptedException {
114     if(zk == null){
115         LOG.info("zk 是空");
116         return false;
117     }
118  if (zk.isClosed()) {
119     LOG.info("zk 已经关闭");
120  return false;
121  }
122  ensurePathExists(dir);
123   
124  LOG.debug("id:" id);
125  do {
126  if (id == null) {
127  long sessionId = zk.getSessionId();
128  String prefix = "x-" sessionId "-";
129  idName = new LockNode(id);
130  LOG.debug("idName:" idName);
131  }
132  if (id != null) {
133  List<String> names = zk.getChildren(dir, false);
134  if (names.isEmpty()) {
135  LOG.warn("No children in: " dir " when we've just "
136  "created one! Lets recreate it...");
137  id = null;
138  else {
139  SortedSet<LockNode> sortedNames = new TreeSet<LockNode>();
140  for (String name : names) {
141  sortedNames.add(new LockNode(dir "/" name));
142  }
143  ownerId = sortedNames.first().getName();
144  LOG.debug("all:" sortedNames);
145  SortedSet<LockNode> lessThanMe = sortedNames.headSet(idName);
146  LOG.debug("less than me:" lessThanMe);
147  if (!lessThanMe.isEmpty()) {
148     LockNode lastChildName = lessThanMe.last();
149  lastChildId = lastChildName.getName();
150  if (LOG.isDebugEnabled()) {
151  LOG.debug("watching less than me node: " lastChildId);
152  }
153  Stat stat = zk.exists(lastChildId, new LockWatcher());
154  if (stat != null) {
155  return Boolean.FALSE;
156  else {
157  LOG.warn("Could not find the"
158         " stats for less than me: " lastChildName.getName());
159  }
160  else {
161  if (isOwner()) {
162  if (callback != null) {
163  callback.lockAcquired();
164  }
165  return Boolean.TRUE;
166  }
167  }
168  }
169  }
170  }
171  while (id == null);
172  return Boolean.FALSE;
173  }
174  
175  public String getDir() {
176  return dir;
177  }
178  
179  public boolean isOwner() {
180  return id != null && ownerId != null && id.equals(ownerId);
181  }
182  
183  public String getId() {
184  return this.id;
185  }
186 }

 

使用本方案实现的分布式锁,可以很好地解决锁重入的问题,而且使用会话结点来避免死锁;性能方面,根据笔者自测结果,加锁解锁各一次算是一个操作,本方案实现的分布式锁,TPS大概为2000-3000,性能比较一般;

分享到:
评论

相关推荐

    42_分布式锁是啥?对比下redis和zk两种分布式锁的优劣?.zip

    Redis和Zookeeper作为两种常用的分布式锁实现方式,各有优缺点。在实际应用中,应根据业务需求和系统特性来选择适合的方案。同时,使用分布式锁时,必须考虑锁的可扩展性、公平性、安全性和容错性,确保在分布式环境...

    java分布式锁实现代码

    在分布式系统中,为了保证数据的一致性和安全性,分布式锁是一种常见的解决方案。本文将深入探讨如何使用Redisson和...提供的代码示例可以作为学习和参考的起点,帮助开发者更好地理解和应用这两种分布式锁的实现。

    redis分布式锁实现类

    redis分布式锁的工具类,采用的是Lua代码的方式,保证了Java执行方法的原子性。

    分布式锁,基于Netty长连接实现,自定义协议,高性能锁

    分布式锁是一种在分布式系统中实现同步访问同一资源的机制,特别是在多节点环境下,它能确保在任何时刻只有一个节点能够对共享资源进行操作。基于Netty实现的分布式锁,利用了Netty作为高效的异步事件驱动网络应用...

    基于 Redis 的分布式锁

    分布式锁是分布式系统中用于同步资源访问的一种机制,它能够保证在分布式部署的应用系统中,同一时刻只允许一个客户端对共享...在实际应用中,需要根据具体业务场景和需求,权衡利弊,选择最适合的分布式锁实现方案。

    分布式锁技术实现原理.docx

    当前使用较多的分布式锁方案主要基于 Redis 和 ZooKeeper 提供的功能特性加以封装来实现的。 Redis 实现的锁服务: 加锁流程: * SET resource_name my_random_value NX PX max-lock-time 解锁流程: * if ...

    003 redis分布式锁 jedis分布式锁 Redisson分布式锁 分段锁

    Redisson的分布式锁实现更加完善,支持可重入锁、公平锁、读写锁,还具有锁自动续期功能,避免了因网络延迟导致的锁丢失。使用Redisson创建分布式锁只需几行代码,通过`RLock`接口的`lock()`和`unlock()`方法即可...

    zookeeper分布式锁

    分布式锁是一种在分布式系统中实现同步的技术,它允许多个节点在同一时间访问共享资源。在大型分布式环境中,确保数据的一致性和正确性至关重要,这就是分布式锁发挥作用的地方。Zookeeper,一个由Apache开发的...

    超卖和分布式锁解决方案.docx

    Redlock算法是一种更为高级的分布式锁解决方案,它可以解决锁提前释放的问题。Redlock算法通过在多个Redis实例上尝试获取锁,并根据多数投票原则来决定是否真正获得锁,从而提高了锁的可靠性。此外,Redlock还提供了...

    分布式锁演进过程.doc

    分布式锁是分布式系统中一个非常重要的概念,通过不断的技术演进和完善,我们已经可以从最初的简单实现逐步过渡到更加成熟稳定的解决方案。无论是Redis还是ZooKeeper,它们都在各自的领域内发挥了重要作用,为企业级...

    lock4j高性能分布式锁 v2.2.5.zip

    lock4j作为一款高性能的分布式锁实现,自v2.2.5版本以来,凭借其优秀的性能和稳定性,受到了广大开发者的青睐。本文将深入探讨lock4j的核心特性和应用场景,以期帮助读者理解并掌握这款分布式锁的使用。 一、lock4j...

    多线程入门,分布式锁,等相关资料

    Java中常见的分布式锁实现有基于Zookeeper、Redis以及数据库的解决方案。例如,使用Redis的`setnx`命令可以创建一个唯一键来实现锁,或者通过Zookeeper的临时节点来模拟锁的逻辑。 在使用多线程和分布式锁时,需要...

    分布式锁的三种实现方式.pdf

    综上所述,根据实际应用场景的需求选择合适的分布式锁实现方案是非常重要的。例如,在对性能要求极高的场景下,可以选择基于缓存的实现;而在需要更高可靠性和一致性保障的场景下,则可以选择基于Zookeeper的实现。

    【批量下载】分布式锁等.zip

    在实际应用中,根据业务需求和系统架构,选择合适的分布式锁实现方案至关重要。例如,对于读多写少的场景,可以使用读写锁来提高读取性能;对于强一致性要求,可能需要使用更复杂的分布式锁策略,如两阶段锁或乐观锁...

    分布式环境下的解决方案-分布式锁.docx

    1. 存储空间:分布式锁需要一个所有参与节点都能访问的共享存储空间。常见的实现包括数据库(如行锁、乐观锁)、缓存系统(如Redis、Tair、Memcached、MongoDB)以及专门的分布式协调服务(如Zookeeper)。这些存储...

    什么是分布式锁与信号量以及学习分布式锁与信号量的意义是什么

    掌握分布式锁和信号量不仅有助于解决现有业务中的技术难题,还能为未来可能出现的新需求提供灵活的解决方案。 ### 总结 总而言之,分布式锁与信号量是实现分布式系统中资源共享与访问控制不可或缺的技术手段。通过...

    redis和zookeeper实现分布式锁的区别

    Redis和Zookeeper是两种常用的分布式协调服务,它们都可以用来实现分布式锁,但具体实现方式和特性有所不同。 **Redis分布式锁** Redis作为内存数据库,具有高性能、低延迟的特点,常被用作实现分布式锁。Redis...

    redis分布式锁使用实例.rar

    Redis 分布式锁是解决分布式系统中多个服务并发访问共享资源问题的一种常见方案。在分布式环境中,传统的单机锁无法满足需求,因为它们不能确保跨多个服务器或进程的同步。Redis,作为一个高性能的键值存储系统,...

    zookeeper分布式锁实例源码

    在实践中,ZooKeeper的分布式锁解决方案需要考虑的方面还包括异常处理(如网络分区)、锁的超时机制、公平性策略(确保等待时间最长的线程优先获得锁)以及锁的粒度(细粒度锁可减少锁竞争,但会增加系统复杂性)。...

    使用分布式锁出现的问题及解决方案.doc

    基于Redis的分布式锁实现通常简单易用,但并非无懈可击。一个基本的实现方式包括以下两个关键点: 1. 加锁与解锁必须使用相同的标识,通常通过生成唯一的ID来实现。 2. 设置锁的过期时间,以防止资源被永久锁定。 ...

Global site tag (gtag.js) - Google Analytics