/** *分布式锁工厂类 */ public class RedisLockUtil { private static final Logger logger = Logger.getLogger(RedisLockUtil.class); private static Object schemeLock = new Object(); private static Map<String,RedisLockUtil> instances = new ConcurrentHashMap(); public static RedisLockUtil getInstance(String schema){ RedisLockUtil u = instances.get(schema); if(u==null){ synchronized(schemeLock){ u = instances.get(schema); if(u == null){ LockObserver lo = new LockObserver(schema); u = new RedisLockUtil(schema,lo); instances.put(schema, u); } } } return u; } private Object mutexLock = new Object(); private Map<String,Object> mutexLockMap = new ConcurrentHashMap(); private Map<String,RedisReentrantLock> cache = new ConcurrentHashMap<String,RedisReentrantLock>(); private DelayQueue<RedisReentrantLock> dq = new DelayQueue<RedisReentrantLock>(); private AbstractLockObserver lo; public RedisLockUtil(String schema, AbstractLockObserver lo){ Thread th = new Thread(lo); th.setDaemon(false); th.setName("Lock Observer:" schema); th.start(); clearUselessLocks(schema); this.lo = lo; } public void clearUselessLocks(String schema){ Thread th = new Thread(new Runnable(){ @Override public void run() { while(!SystemExitListener.isOver()){ try { RedisReentrantLock t = dq.take(); if(t.clear()){ String key = t.getKey(); synchronized(getMutex(key)){ cache.remove(key); } } t.resetCleartime(); } catch (InterruptedException e) { } } } }); th.setDaemon(true); th.setName("Lock cleaner:" schema); th.start(); } private Object getMutex(String key){ Object mx = mutexLockMap.get(key); if(mx == null){ synchronized(mutexLock){ mx = mutexLockMap.get(key); if(mx==null){ mx = new Object(); mutexLockMap.put(key,mx); } } } return mx; } private RedisReentrantLock getLock(String key,boolean addref){ RedisReentrantLock lock = cache.get(key); if(lock == null){ synchronized(getMutex(key)){ lock = cache.get(key); if(lock == null){ lock = new RedisReentrantLock(key,lo); cache.put(key, lock); } } } if(addref){ if(!lock.incRef()){ synchronized(getMutex(key)){ lock = cache.get(key); if(!lock.incRef()){ lock = new RedisReentrantLock(key,lo); cache.put(key, lock); } } } } return lock; } public void reset(){ for(String s : cache.keySet()){ getLock(s,false).unlock(); } } /** * 尝试加锁 * 如果当前线程已经拥有该锁的话,直接返回,表示不用再次加锁,此时不应该再调用unlock进行解锁 * * @param key * @return * @throws Exception * @throws InterruptedException * @throws KeeperException */ public LockStat lock(String key) { return lock(key,-1); } public LockStat lock(String key,int timeout) { RedisReentrantLock ll = getLock(key,true); ll.incRef(); try{ if(ll.isOwner(false)){ ll.descrRef(); return LockStat.NONEED; } if(ll.lock(timeout)){ return LockStat.SUCCESS; }else{ ll.descrRef(); if(ll.setCleartime()){ dq.put(ll); } return null; } }catch(LockNotExistsException e){ ll.descrRef(); return lock(key,timeout); }catch(RuntimeException e){ ll.descrRef(); throw e; } } public void unlock(String key,LockStat stat) { unlock(key,stat,false); } public void unlock(String key,LockStat stat,boolean keepalive){ if(stat == null) return; if(LockStat.SUCCESS.equals(stat)){ RedisReentrantLock lock = getLock(key,false); boolean candestroy = lock.unlock(); if(candestroy && !keepalive){ if(lock.setCleartime()){ dq.put(lock); } } } } public static enum LockStat{ NONEED, SUCCESS } }
/** *分布式锁本地代理类 */ public class RedisReentrantLock implements Delayed{ private static final Logger logger = Logger.getLogger(RedisReentrantLock.class); private ReentrantLock reentrantLock = new ReentrantLock(); private RedisLock redisLock; private long timeout = 3*60; private CountDownLatch lockcount = new CountDownLatch(1); private String key; private AbstractLockObserver observer; private int ref = 0; private Object refLock = new Object(); private boolean destroyed = false; private long cleartime = -1; public RedisReentrantLock(String key,AbstractLockObserver observer) { this.key = key; this.observer = observer; initWriteLock(); } public boolean isDestroyed() { return destroyed; } private synchronized void initWriteLock(){ redisLock = new RedisLock(key,new LockListener(){ @Override public void lockAcquired() { lockcount.countDown(); } @Override public long getExpire() { return 0; } @Override public void lockError() { /*synchronized(mutex){ mutex.notify(); }*/ lockcount.countDown(); } },observer); } public boolean incRef(){ synchronized(refLock){ if(destroyed) return false; ref ; } return true; } public void descrRef(){ synchronized(refLock){ ref --; } } public boolean clear() { if(destroyed) return true; synchronized(refLock){ if(ref > 0){ return false; } destroyed = true; redisLock.clear(); redisLock = null; return true; } } public boolean lock(long timeout) throws LockNotExistsException{ if(timeout <= 0) timeout = this.timeout; //incRef(); reentrantLock.lock();//多线程竞争时,先拿到第一层锁 if(redisLock == null){ reentrantLock.unlock(); //descrRef(); throw new LockNotExistsException(); } try{ lockcount = new CountDownLatch(1); boolean res = redisLock.trylock(timeout); if(!res){ lockcount.await(timeout, TimeUnit.SECONDS); //mutex.wait(timeout*1000); if(!redisLock.doExpire()){ reentrantLock.unlock(); return false; } } return true; }catch(InterruptedException e){ reentrantLock.unlock(); return false; } } public boolean lock() throws LockNotExistsException { return lock(timeout); } public boolean unlock(){ if(!isOwner(true)) { try{ throw new RuntimeException("big ================================================ error.key:" key); }catch(Exception e){ logger.error("err:" e,e); } return false; } try{ redisLock.unlock(); reentrantLock.unlock();//多线程竞争时,释放最外层锁 }catch(RuntimeException e){ reentrantLock.unlock();//多线程竞争时,释放最外层锁 throw e; }finally{ descrRef(); } return canDestroy(); } public boolean canDestroy(){ synchronized(refLock){ return ref <= 0; } } <a href="http://www.nbso.ca/">nbso online casino reviews</a> public String getKey() { return key; } public void setKey(String key) { this.key = key; } public boolean isOwner(boolean check) { synchronized(refLock){ if(redisLock == null) { logger.error("reidsLock is null:key=" key); return false; } boolean a = reentrantLock.isHeldByCurrentThread(); boolean b = redisLock.isOwner(); if(check){ if(!a || !b){ logger.error(key ";a:" a ";b:" b); } } return a && b; } } public boolean setCleartime() { synchronized(this){ if(cleartime>0) return false; this.cleartime = System.currentTimeMillis() 10*1000; return true; } } public void resetCleartime(){ synchronized(this){ this.cleartime = -1; } } @Override public int compareTo(Delayed object) { if(object instanceof RedisReentrantLock){ RedisReentrantLock t = (RedisReentrantLock)object; long l = this.cleartime - t.cleartime; if(l > 0) return 1 ; //比当前的小则返回1,比当前的大则返回-1,否则为0 else if(l < 0 ) return -1; else return 0; } return 0; } @Override public long getDelay(TimeUnit unit) { long d = unit.convert(cleartime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); return d; } }
/** *使用Redis实现的分布式锁 *基本工作原理如下: *1. 使用setnx(key,时间戮 超时),如果设置成功,则直接拿到锁 *2. 如果设置不成功,获取key的值v1(它的到期时间戮),跟当前时间对比,看是否已经超时 *3. 如果超时(说明拿到锁的结点已经挂掉),v2=getset(key,时间戮 超时 1),判断v2是否等于v1,如果相等,加锁成功,否则加锁失败,等过段时间再重试(200MS) */ public class RedisLock implements LockListener{ private String key; private boolean owner = false; private AbstractLockObserver observer = null; private LockListener lockListener = null; private boolean waiting = false; private long expire;//锁超时时间,以秒为单位 private boolean expired = false; public RedisLock(String key, LockListener lockListener, AbstractLockObserver observer) { this.key = key; this.lockListener = lockListener; this.observer = observer; } public boolean trylock(long expire) { synchronized(this){ if(owner){ return true; } this.expire = expire; this.expired = false; if(!waiting){ owner = observer.tryLock(key,expire); if(!owner){ waiting = true; observer.addLockListener(key, this); } } return owner; } } public boolean isOwner() { return owner; } public void unlock() { synchronized(this){ observer.unLock(key); owner = false; } } public void clear() { synchronized(this){ if(waiting) { observer.removeLockListener(key); waiting = false; } } } public boolean doExpire(){ synchronized(this){ if(owner) return true; if(expired) return false; expired = true; clear(); } return false; } @Override public void lockAcquired() { synchronized(this){ if(expired){ unlock(); return; } owner = true; waiting = false; } lockListener.lockAcquired(); } @Override public long getExpire() { return this.expire; } @Override public void lockError() { synchronized(this){ owner = false; waiting = false; lockListener.lockError(); } } }
public class LockObserver extends AbstractLockObserver implements Runnable{ private CacheRedisClient client; private Object mutex = new Object(); private Map<String,LockListener> lockMap = new ConcurrentHashMap(); private boolean stoped = false; private long interval = 500; private boolean terminated = false; private CountDownLatch doneSignal = new CountDownLatch(1); public LockObserver(String schema){ client = new CacheRedisClient(schema); SystemExitListener.addTerminateListener(new ExitHandler(){ public void run() { stoped = true; try { doneSignal.await(); } catch (InterruptedException e) { } } }); } public void addLockListener(String key,LockListener listener){ if(terminated){ listener.lockError(); return; } synchronized(mutex){ lockMap.put(key, listener); } } public void removeLockListener(String key){ synchronized(mutex){ lockMap.remove(key); } } @Override public void run() { while(!terminated){ long p1 = System.currentTimeMillis(); Map<String,LockListener> clone = new HashMap(); synchronized(mutex){ clone.putAll(lockMap); } Set<String> keyset = clone.keySet(); if(keyset.size() > 0){ ConnectionFactory.setSingleConnectionPerThread(keyset.size()); for(String key : keyset){ LockListener ll = clone.get(key); try{ if(tryLock(key,ll.getExpire())) { ll.lockAcquired(); removeLockListener(key); } }catch(Exception e){ ll.lockError(); removeLockListener(key); } } ConnectionFactory.releaseThreadConnection(); }else{ if(stoped){ terminated = true; doneSignal.countDown(); return; } } try { long p2 = System.currentTimeMillis(); long cost = p2 - p1; if(cost <= interval){ Thread.sleep(interval - cost); }else{ Thread.sleep(interval*2); } } catch (InterruptedException e) { } } } /** * 超时时间单位为s!!! * @param key * @param expire * @return */ public boolean tryLock(final String key,final long expireInSecond){ if(terminated) return false; final long tt = System.currentTimeMillis(); final long expire = expireInSecond * 1000; final Long ne = tt expire; List<Object> mm = client.multi(key, new MultiBlock(){ @Override public void execute() { transaction.setnxObject(key, ne); transaction.get(SafeEncoder.encode(key)); } }); Long res = (Long)mm.get(0); if(new Long(1).equals(res)) { return true; }else{ byte[] bb = (byte[])mm.get(1); Long ex = client.deserialize(bb); if(ex == null || tt > ex){ Long old = client.getSet(key, new Long(ne 1)); if(old == null || (ex == null&&old==null) || (ex!=null&&ex.equals(old))){ return true; } } } return false; } public void unLock(String key){ client.del(key); } } 使用本方案实现的分布式锁,可以完美地解决锁重入问题;通过引入超时也避免了死锁问题;性能方面,笔者自测试结果如下:500线程 tps = 35000 [root@DB1 benchtest-util]# target/benchtest/bin/TestFastRedis /data/config/util/config_0_11.properties lock 500 500000 线程总时间:6553466;平均:13.106932 实际总时间:13609; 平均:0.027218
TPS达到35000,比方案1强了整整一个数量级;
相关推荐
本教程将深入探讨如何在SpringBoot应用中实现基于Redis的分布式锁。 首先,Redis之所以常被用作分布式锁的实现,是因为其具有以下优点: 1. **高可用性**:Redis支持主从复制,可以确保在单点故障时仍有服务可用。...
Java基于Redis实现分布式锁代码实例 分布式锁的必要性 在多线程环境中,资源竞争是一个常见的问题。例如,在一个简单的用户操作中,一个线程修改用户状态,首先在内存中读取用户状态,然后在内存中进行修改,然后...
### 基于Redis分布式锁实现“秒杀” #### 一、引言 在现代互联网应用中,“秒杀”作为一种常见的促销手段,被广泛应用于电商领域。为了保证系统的稳定性和公平性,在高并发环境下实现秒杀功能时,合理地利用分布式...
以下是一个简单的Java示例,展示了如何使用Jedis客户端库来实现Redis分布式锁。 ```java public class RedisLock { private JedisPool jedisPool; public RedisLock(JedisPool jedisPool) { this.jedisPool = ...
本文将深入探讨如何使用C++结合Redis实现分布式锁,并详细讲解Redis API在C++中的应用,以及如何处理与Boost库的集成。 首先,Redis是一个高性能的键值存储数据库,广泛用于缓存、消息队列、分布式锁等场景。分布式...
Redis分布式锁实现Redisson 15问 Redis分布式锁是指在分布式系统中,多个服务实例之间对同一个资源加锁的机制,以保证数据的一致性和安全性。Redisson是一个基于Redis的分布式锁实现,它提供了一个高效、可靠的加锁...
基于Redis的分布式锁的实现方案 本文讨论了分布式锁的实现方案,主要基于Redis实现分布式锁,以解决分布式系统中资源访问的同步问题。在分布式系统中,需要协调各个系统或主机之间的资源访问,以避免彼此干扰和保证...
redlock-py, 在 python 中,Redis分布式锁 redlock - python 中的分布式锁这个 python 库实现了基于redis的分布式锁管理器算法( ) 。要创建锁定管理器:dlm = Redlock([{"host":"localhost","port":
Redis中的分布式锁实现通常基于`SETNX`命令或`SET`命令的`nx`与`ex`组合。`SETNX`命令用于设置键值,但如果键已经存在,则不执行任何操作,这可以确保锁的互斥性。`SET key value EX timeout NX`则同时设置了超时...
Redisson是基于Redis的Java客户端,它提供了丰富的数据结构和服务,包括分布式锁、信号量、队列、计数器等,极大地扩展了Redis在分布式系统中的应用能力。本篇文章将详细探讨如何使用Redisson实现Redis分布式事务锁...
在实现基于Redis的分布式锁时,通常会用到两个命令:NX(Not eXists)和EX(过期时间)。NX命令确保只有在键不存在时才能被设置,这样可以保证锁的互斥性。EX命令则是用来设置键的过期时间,保证锁可以在一段时间后...
本压缩包“zk:redis分布式锁.zip”提供了基于Zookeeper(zk)和Redis两种分布式锁实现的示例和相关资料。 首先,我们来看Zookeeper(zk)的分布式锁。Zookeeper是Apache的一个开源项目,提供了一个高可用的、高性能...
Redis 分布式锁的实现方式通常基于 SETNX (Set if Not Exist) 和 EXPIRE 命令。SETNX 命令可以确保键不存在时才设置,而 EXPIRE 命令用于为锁设置一个超时时间,防止死锁的发生。例如,客户端获取锁时,会尝试设置一...
总的来说,Redis分布式锁通过Jedis或Redisson提供了可靠、高效的并发控制方案。分段锁作为一种优化策略,可以进一步提升大规模并发环境下的系统性能。在实际项目中,应根据业务需求和性能指标选择合适的锁实现,以...
在本项目“redis分布式锁源代码最新版.rar”中,我们可以预见到修复了上个版本的一些细节问题,这将使得锁的实现更加健壮和安全。 分布式锁的基本概念是,当多个客户端试图同时访问共享资源时,只有一个客户端能够...
在SpringBoot应用中集成Redis分布式锁,可以创建一个`RedisLock`工具类,包含加锁和解锁的方法。加锁操作需要确保原子性,即同时设置键和过期时间,这在SpringBoot 2.x版本以上可以通过`opsForValue().setIfAbsent()...
Java Redis分布式锁的正确实现方式详解 Java Redis分布式锁是指使用Redis实现的分布式锁机制,旨在解决分布式系统中的并发问题。分布式锁有三种实现方式:数据库乐观锁、基于Redis的分布式锁和基于ZooKeeper的...
通过学习和实践这个“redis分布式锁使用实例”,你可以深入理解如何在实际项目中使用 Redis 和 Redisson 来实现高效且可靠的分布式锁,这对于处理分布式系统中的并发控制至关重要。同时,这也为你提供了设计和实现...
Redisson是一个基于Redis的Java客户端,提供了丰富的数据结构和分布式服务,其中包括分布式锁的实现。下面我们将详细介绍如何使用Redisson创建和管理分布式锁。 #### 1. 添加Redisson依赖 在项目中引入Redisson的...
【Redis分布式锁实现详解】 在分布式系统中,数据一致性是一个至关重要的问题,尤其是在高并发、大规模的场景下。为了在保证系统可用性的同时处理数据一致性,开发者常常采用各种技术手段,如分布式事务和分布式锁...