`
1028826685
  • 浏览: 936893 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类

基于redis的分布式锁

 
阅读更多
/**
*分布式锁工厂类
*/
public class RedisLockUtil {
	private static final Logger logger = Logger.getLogger(RedisLockUtil.class);
	private static Object schemeLock = new Object();
	private static Map<String,RedisLockUtil> instances = new ConcurrentHashMap();
	public static RedisLockUtil getInstance(String schema){
		RedisLockUtil u = instances.get(schema);
		if(u==null){
			synchronized(schemeLock){
				u = instances.get(schema);
				if(u == null){
					LockObserver lo = new LockObserver(schema);		
					u = new RedisLockUtil(schema,lo);
					instances.put(schema, u);
				}
			}
		}
		return u;
	}

	private Object mutexLock = new Object();
	private Map<String,Object> mutexLockMap = new ConcurrentHashMap();
	private Map<String,RedisReentrantLock> cache = new ConcurrentHashMap<String,RedisReentrantLock>();
	private DelayQueue<RedisReentrantLock> dq = new DelayQueue<RedisReentrantLock>();
	private AbstractLockObserver lo;
	public RedisLockUtil(String schema, AbstractLockObserver lo){	
		Thread th = new Thread(lo);
		th.setDaemon(false);
		th.setName("Lock Observer:" schema);
		th.start();
		clearUselessLocks(schema);
		this.lo = lo;
	}

	public void clearUselessLocks(String schema){
		Thread th = new Thread(new Runnable(){
			@Override
			public void run() {
				while(!SystemExitListener.isOver()){
					try {
						RedisReentrantLock t = dq.take();						
						if(t.clear()){
							String key = t.getKey();
							synchronized(getMutex(key)){
								cache.remove(key);
							}
						}
						t.resetCleartime();
					} catch (InterruptedException e) {
					}
				}
			}

		});
		th.setDaemon(true);
		th.setName("Lock cleaner:" schema);
		th.start();
	}

	private Object getMutex(String key){
		Object mx = mutexLockMap.get(key);
		if(mx == null){
			synchronized(mutexLock){
				mx = mutexLockMap.get(key);
				if(mx==null){
					mx = new Object();
					mutexLockMap.put(key,mx);
				}
			}
		}
		return mx;
	}

	private RedisReentrantLock getLock(String key,boolean addref){
		RedisReentrantLock lock = cache.get(key);
		if(lock == null){
			synchronized(getMutex(key)){
				lock = cache.get(key);
				if(lock == null){
					lock = new RedisReentrantLock(key,lo);
					cache.put(key, lock);
				}
			}
		}
		if(addref){
			if(!lock.incRef()){
				synchronized(getMutex(key)){
					lock = cache.get(key);
					if(!lock.incRef()){
						lock = new RedisReentrantLock(key,lo);
						cache.put(key, lock);
					}
				}
			}
		}
		return lock;
	}

	public void reset(){
		for(String s : cache.keySet()){
			getLock(s,false).unlock();
		}
	}

	/**
	 * 尝试加锁
	 * 如果当前线程已经拥有该锁的话,直接返回,表示不用再次加锁,此时不应该再调用unlock进行解锁
	 * 
	 * @param key
	 * @return
	 * @throws Exception 
	 * @throws InterruptedException
	 * @throws KeeperException
	 */
	public LockStat lock(String key) {
		return lock(key,-1);
	}

	public LockStat lock(String key,int timeout) {
		RedisReentrantLock ll = getLock(key,true);
		ll.incRef();
		try{
			if(ll.isOwner(false)){
				ll.descrRef();
				return LockStat.NONEED;
			}
			if(ll.lock(timeout)){
				return LockStat.SUCCESS;
			}else{
				ll.descrRef();
				if(ll.setCleartime()){
					dq.put(ll);
				}
				return null;
			}
		}catch(LockNotExistsException e){
			ll.descrRef();
			return lock(key,timeout);
		}catch(RuntimeException e){
			ll.descrRef();
			throw e;
		}
	}

	public void unlock(String key,LockStat stat) {
		unlock(key,stat,false);
	}

	public void unlock(String key,LockStat stat,boolean keepalive){
		if(stat == null) return;
		if(LockStat.SUCCESS.equals(stat)){
			RedisReentrantLock lock = getLock(key,false);
			boolean candestroy = lock.unlock();
			if(candestroy && !keepalive){
				if(lock.setCleartime()){
					dq.put(lock);
				}
			}
		}
	}

	public static enum LockStat{
		NONEED,
		SUCCESS
	}
}

/**
*分布式锁本地代理类
*/
public class RedisReentrantLock implements Delayed{
	private static final Logger logger = Logger.getLogger(RedisReentrantLock.class);
 private ReentrantLock reentrantLock = new ReentrantLock();

 private RedisLock redisLock;
 private long timeout = 3*60;
 private CountDownLatch lockcount = new CountDownLatch(1);

 private String key;
 private AbstractLockObserver observer;

 private int ref = 0;
 private Object refLock = new Object();
 private boolean destroyed = false;

 private long cleartime = -1;

 public RedisReentrantLock(String key,AbstractLockObserver observer) {	
 	this.key = key;
 	this.observer = observer;
 	initWriteLock();
 }

	public boolean isDestroyed() {
		return destroyed;
	}

	private synchronized void initWriteLock(){
		redisLock = new RedisLock(key,new LockListener(){
			@Override
			public void lockAcquired() {
				lockcount.countDown();
			}
			@Override
			public long getExpire() {
				return 0;
			}

			@Override
			public void lockError() {
				/*synchronized(mutex){
					mutex.notify();
				}*/
				lockcount.countDown();
			}
 	},observer);
	}

	public boolean incRef(){
		synchronized(refLock){
			if(destroyed) return false;
 		ref ;
 	}
		return true;
	}

	public void descrRef(){
		synchronized(refLock){
 		ref --;
 	}
	}

	public boolean clear() {
		if(destroyed) return true;
		synchronized(refLock){
 		if(ref > 0){
 			return false;
 		}
 		destroyed = true;
 		redisLock.clear();
 		redisLock = null;
 		return true;
 	}
	}

 public boolean lock(long timeout) throws LockNotExistsException{
 	if(timeout <= 0) timeout = this.timeout;
 	//incRef();
 reentrantLock.lock();//多线程竞争时,先拿到第一层锁
 if(redisLock == null){
 	reentrantLock.unlock();
 	//descrRef();
 	throw new LockNotExistsException();
 }
 try{
 		lockcount = new CountDownLatch(1);
	 	boolean res = redisLock.trylock(timeout);
	 	if(!res){	 
	 		lockcount.await(timeout, TimeUnit.SECONDS);
					//mutex.wait(timeout*1000);
	 		if(!redisLock.doExpire()){
	 			reentrantLock.unlock();
						return false;
					}
	 	}
 	return true;
 }catch(InterruptedException e){
 	reentrantLock.unlock();
 	return false;
 }
 }

 public boolean lock() throws LockNotExistsException {
 	return lock(timeout);
 }

 public boolean unlock(){ 	 
 	if(!isOwner(true)) {
 		try{
 			throw new RuntimeException("big ================================================ error.key:" key);
 		}catch(Exception e){
 			logger.error("err:" e,e);
 		}
 		return false;
 	}
 try{
 	redisLock.unlock();
 	reentrantLock.unlock();//多线程竞争时,释放最外层锁
 }catch(RuntimeException e){
 	reentrantLock.unlock();//多线程竞争时,释放最外层锁
 	throw e;
 }finally{
 	descrRef();
 }
 return canDestroy();
 }

 public boolean canDestroy(){
 	synchronized(refLock){
 		return ref <= 0;
 	}
 }

 <a href="http://www.nbso.ca/">nbso online casino reviews</a>  public String getKey() {
		return key;
	}

	public void setKey(String key) {
		this.key = key;
	}

	public boolean isOwner(boolean check) {
 	synchronized(refLock){
 		if(redisLock == null) {
 			logger.error("reidsLock is null:key=" key);
 			return false;
 		}
 		boolean a = reentrantLock.isHeldByCurrentThread();
 		boolean b = redisLock.isOwner();
 		if(check){
 			if(!a || !b){
 				logger.error(key ";a:" a ";b:" b);
 			}
 		}
 		return a && b;
 	}
 }

	public boolean setCleartime() {
		synchronized(this){
			if(cleartime>0) return false;
			this.cleartime = System.currentTimeMillis() 10*1000; 
			return true;
		}
	}

	public void resetCleartime(){
		synchronized(this){
			this.cleartime = -1;
		}
	}

	@Override
	public int compareTo(Delayed object) {
		if(object instanceof RedisReentrantLock){
			RedisReentrantLock t = (RedisReentrantLock)object;
	 long l = this.cleartime - t.cleartime;

			if(l > 0) return 1 ; //比当前的小则返回1,比当前的大则返回-1,否则为0
			else if(l < 0 ) return -1;
			else return 0;
		}
		return 0;
	}

	@Override
	public long getDelay(TimeUnit unit) {
		 long d = unit.convert(cleartime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
	 return d;
	}

}

/**
*使用Redis实现的分布式锁
*基本工作原理如下:
*1. 使用setnx(key,时间戮 超时),如果设置成功,则直接拿到锁
*2. 如果设置不成功,获取key的值v1(它的到期时间戮),跟当前时间对比,看是否已经超时
*3. 如果超时(说明拿到锁的结点已经挂掉),v2=getset(key,时间戮 超时 1),判断v2是否等于v1,如果相等,加锁成功,否则加锁失败,等过段时间再重试(200MS)
*/
public class RedisLock implements LockListener{
	private String key;
	private boolean owner = false;
	private AbstractLockObserver observer = null;
	private LockListener lockListener = null;
	private boolean waiting = false;
	private long expire;//锁超时时间,以秒为单位
	private boolean expired = false;

	public RedisLock(String key, LockListener lockListener, AbstractLockObserver observer) {
		this.key = key;
		this.lockListener = lockListener;
		this.observer = observer;
	}

	public boolean trylock(long expire) {
		synchronized(this){
			if(owner){
				return true;
			}
			this.expire = expire;
			this.expired = false;
			if(!waiting){
				owner = observer.tryLock(key,expire);
				if(!owner){
					waiting = true;
					observer.addLockListener(key, this);
				}
			}
			return owner;
		}
	}

	public boolean isOwner() {
		return owner;
	}

	public void unlock() {
		synchronized(this){
			observer.unLock(key);
			owner = false;
		}
	}

	public void clear() {
		synchronized(this){
			if(waiting) {
				observer.removeLockListener(key);
				waiting = false;
			}
		}
	}

	public boolean doExpire(){
		synchronized(this){
			if(owner) return true;
			if(expired) return false;
			expired = true;
			clear();
		}
		return false;
	}

	@Override
	public void lockAcquired() {
		synchronized(this){
			if(expired){
				unlock();
				return;
			}
			owner = true;
			waiting = false;
		}
		lockListener.lockAcquired();
	}

	@Override
	public long getExpire() {
		return this.expire;
	}

	@Override
	public void lockError() {
		synchronized(this){
			owner = false;
			waiting = false;
			lockListener.lockError();
		}
	}

}

 

 

public class LockObserver extends AbstractLockObserver implements Runnable{
	private CacheRedisClient client;
	private Object mutex = new Object();
	private Map<String,LockListener> lockMap = new ConcurrentHashMap();
	private boolean stoped = false;
	private long interval = 500;
	private boolean terminated = false;
	private CountDownLatch doneSignal = new CountDownLatch(1);
	
	public LockObserver(String schema){
		client = new CacheRedisClient(schema);
		
		SystemExitListener.addTerminateListener(new ExitHandler(){
			public void run() {
				stoped = true;
				try {
					doneSignal.await();
				} catch (InterruptedException e) {
				}
			}
		});
	}
	
	
	public void addLockListener(String key,LockListener listener){
		if(terminated){
			listener.lockError();
			return;
		}
		synchronized(mutex){
			lockMap.put(key, listener);
		}
	}
	
	public void removeLockListener(String key){
		synchronized(mutex){
			lockMap.remove(key);
		}
	}
	
	@Override
	public void run() {
		while(!terminated){
			long p1 = System.currentTimeMillis();
			Map<String,LockListener> clone = new HashMap();
			synchronized(mutex){
				clone.putAll(lockMap);
			}			
			Set<String> keyset = clone.keySet();
			if(keyset.size() > 0){
				ConnectionFactory.setSingleConnectionPerThread(keyset.size());
				for(String key : keyset){
					LockListener ll = clone.get(key);
					try{
					 if(tryLock(key,ll.getExpire())) {
					 	ll.lockAcquired();
					 	removeLockListener(key);
					 }
					}catch(Exception e){
						ll.lockError();
						removeLockListener(key);
					}
				}
				ConnectionFactory.releaseThreadConnection();
			}else{
				if(stoped){
					terminated = true;
					doneSignal.countDown();
					return;
				}
			}
			try {
				long p2 = System.currentTimeMillis();
				long cost = p2 - p1;
				if(cost <= interval){
					Thread.sleep(interval - cost);
				}else{
					Thread.sleep(interval*2);
				}
			} catch (InterruptedException e) {
			}
		}
		
	}
	
	
	/**
	 * 超时时间单位为s!!!
	 * @param key
	 * @param expire
	 * @return
	 */
	public boolean tryLock(final String key,final long expireInSecond){
		if(terminated) return false;
		final long tt = System.currentTimeMillis();
		final long expire = expireInSecond * 1000;
		final Long ne = tt expire;
		List<Object> mm = client.multi(key, new MultiBlock(){
			@Override
			public void execute() {
				transaction.setnxObject(key, ne);
				transaction.get(SafeEncoder.encode(key));
			}
		});
		Long res = (Long)mm.get(0);
	 if(new Long(1).equals(res)) {
	 	return true;
	 }else{
	 	byte[] bb = (byte[])mm.get(1);
			Long ex = client.deserialize(bb);
	 	if(ex == null || tt > ex){
	 		Long old = client.getSet(key, new Long(ne 1));
	 		if(old == null || (ex == null&&old==null) || (ex!=null&&ex.equals(old))){
	 			return true;
	 		}
	 	}
	 }
	 return false;
	}

	public void unLock(String key){
		client.del(key);
	}
}

使用本方案实现的分布式锁,可以完美地解决锁重入问题;通过引入超时也避免了死锁问题;性能方面,笔者自测试结果如下:
500线程 tps = 35000 [root@DB1 benchtest-util]# target/benchtest/bin/TestFastRedis /data/config/util/config_0_11.properties lock 500 500000 线程总时间:6553466;平均:13.106932 实际总时间:13609; 平均:0.027218
TPS达到35000,比方案1强了整整一个数量级;
分享到:
评论

相关推荐

    springboot基于redis分布式锁

    本教程将深入探讨如何在SpringBoot应用中实现基于Redis的分布式锁。 首先,Redis之所以常被用作分布式锁的实现,是因为其具有以下优点: 1. **高可用性**:Redis支持主从复制,可以确保在单点故障时仍有服务可用。...

    Java基于redis实现分布式锁代码实例

    Java基于Redis实现分布式锁代码实例 分布式锁的必要性 在多线程环境中,资源竞争是一个常见的问题。例如,在一个简单的用户操作中,一个线程修改用户状态,首先在内存中读取用户状态,然后在内存中进行修改,然后...

    基于redis分布式锁实现“秒杀”

    ### 基于Redis分布式锁实现“秒杀” #### 一、引言 在现代互联网应用中,“秒杀”作为一种常见的促销手段,被广泛应用于电商领域。为了保证系统的稳定性和公平性,在高并发环境下实现秒杀功能时,合理地利用分布式...

    基于Redis方式实现分布式锁

    以下是一个简单的Java示例,展示了如何使用Jedis客户端库来实现Redis分布式锁。 ```java public class RedisLock { private JedisPool jedisPool; public RedisLock(JedisPool jedisPool) { this.jedisPool = ...

    C++基于redis的分布式锁redisAPI

    本文将深入探讨如何使用C++结合Redis实现分布式锁,并详细讲解Redis API在C++中的应用,以及如何处理与Boost库的集成。 首先,Redis是一个高性能的键值存储数据库,广泛用于缓存、消息队列、分布式锁等场景。分布式...

    Redis分布式锁实现Redisson 15问.doc

    Redis分布式锁实现Redisson 15问 Redis分布式锁是指在分布式系统中,多个服务实例之间对同一个资源加锁的机制,以保证数据的一致性和安全性。Redisson是一个基于Redis的分布式锁实现,它提供了一个高效、可靠的加锁...

    基于Redis的分布式锁的实现方案.pdf

    基于Redis的分布式锁的实现方案 本文讨论了分布式锁的实现方案,主要基于Redis实现分布式锁,以解决分布式系统中资源访问的同步问题。在分布式系统中,需要协调各个系统或主机之间的资源访问,以避免彼此干扰和保证...

    redlock-py, 在 python 中,Redis分布式锁.zip

    redlock-py, 在 python 中,Redis分布式锁 redlock - python 中的分布式锁这个 python 库实现了基于redis的分布式锁管理器算法( ) 。要创建锁定管理器:dlm = Redlock([{"host":"localhost","port":

    redis实现分布式锁,自旋式加锁,lua原子性解锁

    Redis中的分布式锁实现通常基于`SETNX`命令或`SET`命令的`nx`与`ex`组合。`SETNX`命令用于设置键值,但如果键已经存在,则不执行任何操作,这可以确保锁的互斥性。`SET key value EX timeout NX`则同时设置了超时...

    记录redisson实现redis分布式事务锁

    Redisson是基于Redis的Java客户端,它提供了丰富的数据结构和服务,包括分布式锁、信号量、队列、计数器等,极大地扩展了Redis在分布式系统中的应用能力。本篇文章将详细探讨如何使用Redisson实现Redis分布式事务锁...

    基于 Redis 的分布式锁

    在实现基于Redis的分布式锁时,通常会用到两个命令:NX(Not eXists)和EX(过期时间)。NX命令确保只有在键不存在时才能被设置,这样可以保证锁的互斥性。EX命令则是用来设置键的过期时间,保证锁可以在一段时间后...

    zk:redis分布式锁.zip

    本压缩包“zk:redis分布式锁.zip”提供了基于Zookeeper(zk)和Redis两种分布式锁实现的示例和相关资料。 首先,我们来看Zookeeper(zk)的分布式锁。Zookeeper是Apache的一个开源项目,提供了一个高可用的、高性能...

    redis分布式锁.zip

    Redis 分布式锁的实现方式通常基于 SETNX (Set if Not Exist) 和 EXPIRE 命令。SETNX 命令可以确保键不存在时才设置,而 EXPIRE 命令用于为锁设置一个超时时间,防止死锁的发生。例如,客户端获取锁时,会尝试设置一...

    003 redis分布式锁 jedis分布式锁 Redisson分布式锁 分段锁

    总的来说,Redis分布式锁通过Jedis或Redisson提供了可靠、高效的并发控制方案。分段锁作为一种优化策略,可以进一步提升大规模并发环境下的系统性能。在实际项目中,应根据业务需求和性能指标选择合适的锁实现,以...

    redis分布式锁源代码最新版.rar

    在本项目“redis分布式锁源代码最新版.rar”中,我们可以预见到修复了上个版本的一些细节问题,这将使得锁的实现更加健壮和安全。 分布式锁的基本概念是,当多个客户端试图同时访问共享资源时,只有一个客户端能够...

    SpringBoot 使用 Redis 分布式锁解决并发问题.docx

    在SpringBoot应用中集成Redis分布式锁,可以创建一个`RedisLock`工具类,包含加锁和解锁的方法。加锁操作需要确保原子性,即同时设置键和过期时间,这在SpringBoot 2.x版本以上可以通过`opsForValue().setIfAbsent()...

    Java Redis分布式锁的正确实现方式详解

    Java Redis分布式锁的正确实现方式详解 Java Redis分布式锁是指使用Redis实现的分布式锁机制,旨在解决分布式系统中的并发问题。分布式锁有三种实现方式:数据库乐观锁、基于Redis的分布式锁和基于ZooKeeper的...

    redis分布式锁使用实例.rar

    通过学习和实践这个“redis分布式锁使用实例”,你可以深入理解如何在实际项目中使用 Redis 和 Redisson 来实现高效且可靠的分布式锁,这对于处理分布式系统中的并发控制至关重要。同时,这也为你提供了设计和实现...

    Redis 分布式锁使用

    Redisson是一个基于Redis的Java客户端,提供了丰富的数据结构和分布式服务,其中包括分布式锁的实现。下面我们将详细介绍如何使用Redisson创建和管理分布式锁。 #### 1. 添加Redisson依赖 在项目中引入Redisson的...

    基于Redis分布式锁的实现代码

    【Redis分布式锁实现详解】 在分布式系统中,数据一致性是一个至关重要的问题,尤其是在高并发、大规模的场景下。为了在保证系统可用性的同时处理数据一致性,开发者常常采用各种技术手段,如分布式事务和分布式锁...

Global site tag (gtag.js) - Google Analytics