`

基于Redis实现分布式锁-Redisson使用及源码分析

 
阅读更多

【转】

 

在分布式场景下,有很多种情况都需要实现最终一致性。在设计远程上下文的领域事件的时候,为了保证最终一致性,在通过领域事件进行通讯的方式中,可 以共享存储(领域模型和消息的持久化数据源),或者做全局XA事务(两阶段提交,数据源可分开),也可以借助消息中间件(消费者处理需要能幂等)。通过 Observer模式来发布领域事件可以提供很好的高并发性能,并且事件存储也能追溯更小粒度的事件数据,使各个应用系统拥有更好的自治性。
本文 主要探讨另外一种实现分布式最终一致性的解决方案——采用分布式锁。基于分布式锁的解决方案,比如zookeeper,redis都是相较于持久化(如利 用InnoDB行锁,或事务,或version乐观锁)方案提供了高可用性,并且支持丰富化的使用场景。 本文通过Java版本的redis分布式锁开源框架——Redisson来解析一下实现分布式锁的思路。

分布式锁的使用场景

如果是不跨限界上下文的情况,跟本地领域服务相关的数据一致性,尽量还是用事务来保证。但也有些无法用事务或者乐观锁来处理的情况,这些情况大多是对于一个共享型的数据源,有并发写操作的场景,但又不是对于单一领域的操作。
举 个例子,还是用租书来比喻,A和B两个人都来租书,在查看图书的时候,发现自己想要看的书《大设计》库存仅剩一本。书店系统中,书作为一种商品,是在商品 系统中,以Item表示出租商品的领域模型,同时每一笔交易都会产生一个订单,Order是在订单系统(交易限界上下文)中的领域模型。这里假设先不考虑 跨系统通信的问题(感兴趣的可以参考下领域服务、领域事件), 也暂时不考虑支付环节,但是我们需要保证A,B两个人不会都对于《大设计》产生订单就可以,也就是其中一个人是可以成功下单,另外一个人只要提示库存已没 即可。此时,书的库存就是一种共享的分布式资源,下订单,减库存就是一个需要保证一致性的写操作。但又因为两个操作不能在同一个本地事务,或者说,不共享 持久化的数据源的情况,这时候就可以考虑用分布式锁来实现。本例子中,就需要对于共享资源——书的库存进行加锁,至于锁的key可以结合领域模型的唯一标 识,如itemId,以及操作类型(如操作类型是RENT的)设计一个待加锁的资源标识。当然,这里还有一个并发性能的问题,如果是个库存很多的秒杀类型 的业务,那么就不能单纯在itemId 加类型加锁,还需要设计排队队列以及合理的调度算法,防止超卖等等,那些就是题外话了。本文只是将这个场景作为一个切入点,具体怎么设计锁,什么场景用还 要结合业务。

需要解决的问题

分布式的思路和线程同步锁ReentrantLock的思路是一样的。我们也要考虑如以下几个问题:

  • 死锁的情况。复杂的网络环境下,当加锁成功,后续操作正在处理时,获得锁的节点忽然宕机,无法释放锁的情况。如A在Node1 节点申请到了锁资源,但是Node1宕机,锁一直无法释放,订单没有生成,但是其他用户将无法申请到锁资源。
  • 锁的性能效率。分布式锁不能成为性能瓶颈或者单点故障不能导致业务异常。
  • 如果关键业务,可能需要重入场景,是否设计成可重入锁。这个可以参考下在多线程的情况下,比如ReentrantLock就是一种可重入锁,其内部又提供了公平锁和非公平锁两种实现和应用,本文不继续探讨。带着以上问题,和场景,沿着下文,来一一找到解决方案。

基于Redis实现

Redis 命令

在Redisson介绍前,回顾下Redis的命令,以及不通过任何开源框架,可以基于redis怎么设计一个分布式锁。基于不同应用系统实现的语言,也可以通过其他一些如Jedis,或者Spring的RedisOperations 等,来执行Reids命令Redis command list
分布式锁主要需要以下redis命令,这里列举一下。在实现部分可以继续参照命令的操作含义。

  1. SETNX key value (SET if Not eXists):当且仅当 key 不存在,将 key 的值设为 value ,并返回1;若给定的 key 已经存在,则 SETNX 不做任何动作,并返回0。详见:SETNX commond
  2. GETSET key value:将给定 key 的值设为 value ,并返回 key 的旧值 (old value),当 key 存在但不是字符串类型时,返回一个错误,当key不存在时,返回nil。详见:GETSET commond
  3. GET key:返回 key 所关联的字符串值,如果 key 不存在那么返回 nil 。详见:GET Commond
  4. DEL key [KEY …]:删除给定的一个或多个 key ,不存在的 key 会被忽略,返回实际删除的key的个数(integer)。详见:DEL Commond
  5. HSET key field value:给一个key 设置一个{field=value}的组合值,如果key没有就直接赋值并返回1,如果field已有,那么就更新value的值,并返回0.详见:HSET Commond
  6. HEXISTS key field:当key 中存储着field的时候返回1,如果key或者field至少有一个不存在返回0。详见HEXISTS Commond
  7. HINCRBY key field increment:将存储在 key 中的哈希(Hash)对象中的指定字段 field 的值加上增量 increment。如果键 key 不存在,一个保存了哈希对象的新建将被创建。如果字段 field 不存在,在进行当前操作前,其将被创建,且对应的值被置为 0。返回值是增量之后的值。详见:HINCRBY Commond
  8. PEXPIRE key milliseconds:设置存活时间,单位是毫秒。expire操作单位是秒。详见:PEXPIRE Commond
  9. PUBLISH channel message:向channel post一个message内容的消息,返回接收消息的客户端数。详见PUBLISH Commond

Redis 实现分布式锁

假设我们现在要给itemId 1234 和下单操作 OPORDER 加锁,key是**OPORDER1234,结合上面的redis命令,似乎加锁的时候只要一个SETNX OPORDER1234 currentTimestamp ,如果返回1代表加锁成功,返回0 表示锁被占用着。然后再用DEL OPORDER_1234** 解锁,返回1表示解锁成功,0表示已经被解锁过。然而却还存在着很多问题:SETNX会存在锁竞争,如果在执行过程中客户端宕机,也会引起死锁问题,即锁 资源无法释放。并且当一个资源解锁的时候,释放锁之后,其他之前等待的锁没有办法再次自动重试申请锁(除非重新申请锁)。解决死锁的问题其实可以可以向 Mysql的死锁检测学习,设置一个失效时间,通过key的时间戳来判断是否需要强制解锁。但是强制解锁也存在问题,一个就是时间差问题,不同的机器的本 地时间可能也存在时间差,在很小事务粒度的高并发场景下还是会存在问题,比如删除锁的时候,在判断时间戳已经超过时效,有可能删除了其他已经获取锁的客户 端的锁。另外,如果设置了一个超时时间,但是确实执行时间超过了超时时间,那么锁会被自动释放,原来持锁的客户端再次解锁的时候会出现问题,而且最为严重 的还是一致性没有得到保障。
所以设计的时候需要考虑以下几点:

  1. 锁的时效设置。避免单点故障造成死锁,影响其他客户端获取锁。但是也要保证一旦一个客户端持锁,在客户端可用时不会被其他客户端解锁。(网上很多解决方案都是其他客户端等待队列长度判断是否强制解锁,但其实在偶发情况下就不能保证一致性,也就失去了分布式锁的意义)。
  2. 持锁期间的check,尽量在关键节点检查锁的状态,所以要设计成可重入锁,但在客户端使用时要做好吞吐量的权衡。
  3. 减少获取锁的操作,尽量减少redis压力。所以需要让客户端的申请锁有一个等待时间,而不是所有申请锁的请求要循环申请锁。
  4. 加锁的事务或者操作尽量粒度小,减少其他客户端申请锁的等待时间,提高处理效率和并发性。
  5. 持锁的客户端解锁后,要能通知到其他等待锁的节点,否则其他节点只能一直等待一个预计的时间再触发申请锁。类似线程的notifyAll,要能同步锁状态给其他客户端,并且是分布式消息。
  6. 考虑任何执行句柄中可能出现的异常,状态的正确流转和处理。比如,不能因为一个节点解锁失败,或者锁查询失败(redis 超时或者其他运行时异常),影响整个等待的任务队列,或者任务池。

锁设计

由于时间戳的设计有很多问题,以及上述几个问题,所以再换一种思路。先回顾几个关于锁的概念和经典java API。通过一些java.util.concurrent的API来处理一些本地队列的同步以及等待信号量的处理。

  • Semaphore :Semaphore可以控制某个资源可被同时访问的个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。其内部维护了一个int 类型的permits。有一个关于厕所的比喻很贴切,10个人在厕所外面排队,厕所有5个坑,只能最多进去五个人,那么就是初始化一个 permits=5的Semaphore。当一个人出来,会release一个坑位,其他等坑的人会被唤醒然后开始要有人进坑。SemaphoreReentrantLock一 样都是基于AbstractQueuedSynchronizer提供了公平锁和非公平锁两种实现。如果等待的人有秩序的排队等着,就说明选择了 Semaphore的公平锁实现,如果外面的人没有秩序,谁抢到是谁的(活跃线程就会一直有机会,存在线程饥饿可能),那就是Semaphore的非公平 锁实现。无论外面人怎么个等法Semaphore对于出坑的控制是一致的,每次只能是从一个坑里出来一个人。理解起来,其实就是厕所的5个坑位是一个共享 资源,也就是permits的值=5,每次acquire一下就是外面来了个人排队,每次release一下就是里面出来个人。厕所聊多有点不雅观,再回 归到分布式锁的话题。在刚才讲述的redis实现分布式锁的“第三点”,减少redis申请锁调用频率上就可以通过Semaphore来控制请求。虽然 Semaphore只是虚拟机内部的锁粒度的实现(不能跨进程),但是也可以一定程度减轻最后请求redis节点的压力。当然,也有种方法是,随机 sleep一段时间再去tryLock之类的,也可以达到减轻最后redis节点压力,但是毕竟使用信号量能更好得控制。而且我们可以再简单点,对于同一 个锁对象的申请锁操作,可以设计一个初始化permits = 0的LockEntry,permits = 0也就顾名思义,谁都进不来,厕所维修中。当有一个持锁对象unlock的时候,通过分布式消息机制通知所有等待节点,这时候,再release,这时候 permits=1,也就是本虚拟机中只能有一个线程能在acquire()的阻塞中脱颖而出(当然只是进了坑,但不一定能获取得到分布式锁)。
  • ConcurrentHashMap:这个应该不必多说,之谈谈在设计分布式锁中的用途。在上述的“第一点”,对于锁 的时效性的设置里提到了,要在持锁线程正常运行(持锁节点没有宕机或内部异常)的时候,保证其一直占用锁。只要占着茅坑的人还在用着,只要他还没有暴毙或 者无聊占着茅坑不XX,那就应该让外面的人都等着,不能强行开门托人。再收回来。。。这里ConcurrentHashMap的key无疑是锁对象的标识 (我们需要设计的redis的key),value就是一个时间任务对象,比如可以netty的TimerTask或其他定时API,定时得触发给我的锁 重新设置延时。这就是好比(好吧,再次用厕所比喻),蹲在里面的人的一种主动行为,隔1分钟敲两下厕所门,让外面的等的人知道,里面的人正在使用中,如果 里面的人1分钟超过还没有敲门,可能是里面人挂掉了,那么再采取强制措施,直接开门拽人,释放坑位。

并发API以及一些框架的使用主要是控制锁的进入和调度,加锁的流程以及锁的逻辑也是非常重要。因为redis支持hash结构,除了key作为锁 的标识,还可以利用value的结构<field,value>再给锁添加一个field存储一个唯一标识,可以通过UUID.randomUUID();加上一个线程号Thread.currentThread().getId()来 生成一个锁的name。这样通过key加field的双重保障,一来可以用作锁重入时的判断,而来可以确保解锁的时候,不会被其他客户端解锁掉(因为 unlock的时候del直接按照key删除)。value设计成一个int类型的计数器,最初设置成=1,当锁重入的时候,可以用HINCRBY 加1,解锁的时候,如果减一之后值不等于0,就说明锁在重入,那么不能删除,知道HINCRBY减到0,解锁。详细过程可以参照Redisson的批处理 命令,下面对加锁解锁过程加以描述。

加锁

下面参数的含义先说明下 :

  • KEYS[1] :需要加锁的key,这里需要是字符串类型。
  • ARGV[1] :锁的超时时间,防止死锁
  • ARGV[2] :锁的唯一标识,也就是刚才介绍的 id(UUID.randomUUID()) + ":" + threadId

 

C代码  收藏代码
  1. <code>// 检查是否key已经被占用,如果没有则设置超时时间和唯一标识,初始化value=1  
  2. if (redis.call('exists', KEYS[1]) == 0)   
  3. then    
  4. redis.call('hset', KEYS[1], ARGV[2], 1);   
  5. redis.call('pexpire', KEYS[1], ARGV[1]);    
  6. return nil;   
  7. end;   
  8. // 如果锁重入,需要判断锁的key field 都一直情况下 value 加一  
  9. if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)   
  10. then   
  11. redis.call('hincrby', KEYS[1], ARGV[2], 1);  
  12. redis.call('pexpire', KEYS[1], ARGV[1]);//锁重入重新设置超时时间  
  13. return nil;   
  14. end;   
  15. // 返回剩余的过期时间  
  16. return redis.call('pttl', KEYS[1]); </code>  

 

 

以上的方法,当返回空是,说明获取到锁,如果返回一个long数值(pttl 命令的返回值),说明锁已被占用,通过返回剩余时间,外部可以做一些等待时间的判断和调整。

解锁

也还是先说明一下参数信息: - KEYS[1] :需要加锁的key,这里需要是字符串类型。 - KEYS[2] :redis消息的ChannelName,一个分布式锁对应唯一的一个channelName:"redisson_lockchannel{" + getName() + "}" - ARGV[1] :reids消息体,这里只需要一个字节的标记就可以,主要标记redis的key已经解锁,再结合redis的Subscribe,能唤醒其他订阅解锁 消息的客户端线程申请锁。 - ARGV[2] :锁的超时时间,防止死锁 - ARGV[3] :锁的唯一标识,也就是刚才介绍的 id(UUID.randomUUID()) + ":" + threadId

 

C代码  收藏代码
  1. <code>// 如果key已经不存在,说明已经被解锁,直接发布(publihs)redis消息  
  2. if (redis.call('exists', KEYS[1]) == 0)   
  3. then  
  4. redis.call('publish', KEYS[2], ARGV[1]);  
  5. return 1;  
  6. end;  
  7. // key和field不匹配,说明当前客户端线程没有持有锁,不能主动解锁。  
  8. if (redis.call('hexists', KEYS[1], ARGV[3]) == 0)  
  9. then   
  10. return nil;  
  11. end;   
  12. // 将value减1  
  13. local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);   
  14. // 如果counter>0说明锁在重入,不能删除key  
  15. if (counter > 0)    
  16. then  
  17. redis.call('pexpire', KEYS[1], ARGV[2]);            
  18. return 0;   
  19. else   
  20. // 删除key并且publish 解锁消息  
  21. redis.call('del', KEYS[1]);                             
  22. redis.call('publish', KEYS[2], ARGV[1]);   
  23. return 1;   
  24. end;   
  25. return nil;</code>  

 

 


这就是解锁过程,当然建议提供强制解锁的接口,直接删除key,以防一些紧急故障出现的时候,关键业务节点受到影响。这里还有一个关键点, 就是publish命令,通过在锁的唯一通道发布解锁消息,可以减少其他分布式节点的等待或者空转,整体上能提高加锁效率。至于redis的消息订阅可以 有多种方式,基于Jedis的订阅API或者Spring的MessageListener都可以实现订阅,这里就可以结合刚才说的Semaphore,在第一次申请锁失败后acquire,接收到分布式消息后release就可以控制申请锁流程的再次进入。下面结合Redisson源码,相信会有更清晰的认识。

使用Redisson示例

Redisson使用起来很方便,但是需要redis环境支持eval命令,否则一切都是悲剧,比如me.结果还是要用RedisCommands 去写一套。例子就如下,获得一个RLock锁对象,然后tryLock 和unlock。trylock方法提供了锁重入的实现,并且客户端一旦持有锁,就会在能正常运行期间一直持有锁,直到主动unlock或者节点故障,主 动失效(超过默认的过期时间)释放锁。

 

Java代码  收藏代码
  1. <code>public boolean doMyBusiness(Object t) {  
  2.         RLock lock = redissonClient.getLock(getLockKey(t));  
  3.         try {  
  4.             if (lock.tryLock()) {  
  5.                 //do need Business  
  6.                 return true;  
  7.             } else {  
  8.                 // do other Business or return error.  
  9.                 return false;  
  10.             }  
  11.         } finally {  
  12.             lock.unlock();  
  13.         }  
  14. } </code>  

 

 

Redisson还提供了设置最长等待时间以及设置释放锁时间的含参tryLock接口 boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException; 。Redisson的lock 扩展了java.util.concurrent.locks.Lock的实现,也基本按照了Lock接口的实现方案。lock()方法会一直阻塞申请锁资源,直到有可用的锁释放。下面一部分会详细解析一部分关键实现的代码。

Redisson源码解析

Redisson 的异步任务(Future,Promise,FutureListener API),任务计时器(Timeout,TimerTask),以及通过AbstractChannel连接redis以及写入执行批处理命令等很多都是 基于netty框架的。po主因为不能使用eval,所以用Spring提供的redisApi ,RedisOperations来处理redis指令,异步调度等用了Spring的AsyncResultMessageListener以及一些concurrent api。这里还是先看一下Redisson的实现。

trylock

这里以带参数的trylock解析一下,无参的trylock是一种默认参数的实现。先源码走读一下。

 

Java代码  收藏代码
  1. @Override  
  2.     public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {  
  3.         long time = unit.toMillis(waitTime);  
  4.         // 申请锁,返回还剩余的锁过期时间  
  5.         Long ttl = tryAcquire(leaseTime, unit);  
  6.         // 如果为空,表示申请锁成功  
  7.         if (ttl == null) {  
  8.             return true;  
  9.         }  
  10.         // 订阅监听redis消息,并且创建RedissonLockEntry,其中RedissonLockEntry中比较关键的是一个 Semaphore属性对象用来控制本地的锁请求的信号量同步,返回的是netty框架的Future实现。  
  11.         Future<RedissonLockEntry> future = subscribe();  
  12.         // 阻塞等待subscribe的future的结果对象,如果subscribe方法调用超过了time,说明已经超过了客户端设置的最大wait time,则直接返回false,取消订阅,不再继续申请锁了。  
  13.         if (!future.await(time, TimeUnit.MILLISECONDS)) {  
  14.             future.addListener(new FutureListener<RedissonLockEntry>() {  
  15.                 @Override  
  16.                 public void operationComplete(Future<RedissonLockEntry> future) throws Exception {  
  17.                     if (future.isSuccess()) {    
  18.                         unsubscribe(future);  
  19.                     }  
  20.                 }  
  21.             });  
  22.             return false;  
  23.         }  
  24.   
  25.         try {  
  26.             while (true) {  
  27.             // 再次尝试一次申请锁  
  28.                 ttl = tryAcquire(leaseTime, unit);  
  29.                 // 获得锁,返回  
  30.                 if (ttl == null) {  
  31.                     return true;  
  32.                 }  
  33.                 // 不等待申请锁,返回  
  34.                 if (time <= 0) {  
  35.                     return false;  
  36.                 }  
  37.   
  38.                 // 阻塞等待锁  
  39.                 long current = System.currentTimeMillis();  
  40.                 RedissonLockEntry entry = getEntry();  
  41.   
  42.                 if (ttl >= 0 && ttl < time) {  
  43.                    // 通过信号量(共享锁)阻塞,等待解锁消息.  
  44.                 // 如果剩余时间(ttl)小于wait time ,就在 ttl 时间内,从Entry的信号量获取一个许可(除非被中断或者一直没有可用的许可)。  
  45.                 // 否则就在wait time 时间范围内等待可以通过信号量  
  46.                     entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);  
  47.                 } else {  
  48.                     entry.getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);  
  49.                 }  
  50.     // 更新等待时间(最大等待时间-已经消耗的阻塞时间)  
  51.                 long elapsed = System.currentTimeMillis() - current;  
  52.                 time -= elapsed;  
  53.             }  
  54.         } finally {  
  55.            // 无论是否获得锁,都要取消订阅解锁消息  
  56.             unsubscribe(future);  
  57.         }  
  58.   
  59.     }  

 

 

 

上述方法,调用加锁的逻辑就是在tryAcquire(long leaseTime, TimeUnit unit)

private Long tryAcquire(long leaseTime, TimeUnit unit) {
    if (leaseTime != -1) {
        return get(tryLockInnerAsync(leaseTime, unit, Thread.currentThread().getId()));
    }
    return get(tryLockInnerAsync(Thread.currentThread().getId()));
}


tryAcquire(long leaseTime, TimeUnit unit)只是针对leaseTime的不同参数进行不同的转发处理,再提一下,trylock的无参方法就是直接调用了get(tryLockInnerAsync(Thread.currentThread().getId())); 所以下面再看核心的tryLockInnerAsync 基本命令已经在之前解析过,相信这里看起来应该比较轻松,返回的是一个future对象,是为了异步处理IO,提高系统吞吐量。

Java代码  收藏代码
  1. Future<Long> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId) {  
  2.         internalLockLeaseTime = unit.toMillis(leaseTime);  
  3.   
  4.         return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,  
  5. "if (redis.call('exists', KEYS[1]) == 0) then " +  
  6. "redis.call('hset', KEYS[1], ARGV[2], 1); " +  
  7. "redis.call('pexpire', KEYS[1], ARGV[1]); " +  
  8. "return nil; " +  
  9.  "end; " +  
  10. "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +  
  11. "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +  
  12. "redis.call('pexpire', KEYS[1], ARGV[1]); " +  
  13. "return nil; " +  
  14. "end; " +  
  15. "return redis.call('pttl', KEYS[1]);",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));  
  16. }  

 

 

再说明一下,tryLock(long waitTime, long leaseTime, TimeUnit unit)有leaseTime参数的申请锁方法是会按照leaseTime时间来自动释放锁的。但是没有leaseTime参数的,比如tryLock()或者tryLock(long waitTime, TimeUnit unit)以及lock()是会一直持有锁的。再来看一下没有leaseTime参数的tryLockInnerAsync(Thread.currentThread().getId())

 

Java代码  收藏代码
  1. private Future<Long> tryLockInnerAsync(long threadId) {  
  2.   // 设置了默认的30秒的失效时间  
  3.        Future<Long> ttlRemaining = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId);  
  4.          
  5.        ttlRemaining.addListener(new FutureListener<Long>() {  
  6.            @Override  
  7.            public void operationComplete(Future<Long> future) throws Exception {  
  8.            // 如果future方法没有执行完成(IO被中断等原因)直接返回,不继续处理  
  9.                if (!future.isSuccess()) {  
  10.                    return;  
  11.                }  
  12.   
  13.                Long ttlRemaining = future.getNow();  
  14.                // 成功申请到锁,开始一个调度程序  
  15.               
  16.                if (ttlRemaining == null) {  
  17.                    scheduleExpirationRenewal();  
  18.                }  
  19.            }  
  20.        });  
  21.        return ttlRemaining;  
  22.    }  

 

 

这里比有leaseTime参数的trylock就多了异步scheduleExpirationRenewal调度。可以继续看一下,这里的expirationRenewalMap就是之前降到的一个ConcurrentMap结构。下面的这个调度方式很精妙。除非被unlock的cancleTask方法触发,否则会一直循环重置过期时间。

Java代码  收藏代码
  1. private static final ConcurrentMap<String, Timeout> expirationRenewalMap = PlatformDependent.newConcurrentHashMap();    
  2.   
  3.  private void scheduleExpirationRenewal() {  
  4.         // 保证任务不会被重复创建  
  5.         if (expirationRenewalMap.containsKey(getName())) {  
  6.             return;  
  7.         }  
  8.         // 添加一个netty的Timeout回调任务,每(internalLockLeaseTime / 3)毫秒执行一次  
  9.         Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {  
  10.             @Override  
  11.             public void run(Timeout timeout) throws Exception {  
  12.             // 异步调用redis的pexpire命令,重置过期时间  
  13.                 expireAsync(internalLockLeaseTime, TimeUnit.MILLISECONDS);  
  14.                 // 移除,确保下一次调用  
  15.                 expirationRenewalMap.remove(getName());  
  16.                 scheduleExpirationRenewal(); // 再次循环调用  
  17.             }  
  18.         }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);  
  19.         // expirationRenewalMap如果已经有getName()任务,停止任务,也是为了在极端的并发情况下,保证任务不会被重复创建  
  20.         if (expirationRenewalMap.putIfAbsent(getName(), task) != null) {  
  21.             task.cancel();  
  22.         }  
  23.     }  

 

 

这个任务,其实还有一个问题,个人觉得在expirationRenewalMap.containsKey判断时也加上isLocked判断会比 较好,以防止unlock时出现redis节点异常的时候,任务没有办法自动停止,或者设置一个最大执行次数的限制也可以,否则极端情况下也会耗尽本地节 点的CPU资源。

unlock

解锁的逻辑相对简单,如下,redis 命令相信看起来也会比较轻松了。

Java代码  收藏代码
  1.  @Override  
  2.     public void unlock() {  
  3.         Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,  
  4. "if (redis.call('exists', KEYS[1]) == 0) then " +  
  5. "redis.call('publish', KEYS[2], ARGV[1]); " +  
  6. "return 1; " +  
  7. "end;" +  
  8. "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +  
  9. "return nil;" +  
  10. "end; " +  
  11. "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +  
  12. "if (counter > 0) then " +  
  13. "redis.call('pexpire', KEYS[1], ARGV[2]); " +  
  14. "return 0; " +  
  15. "else " +  
  16. "redis.call('del', KEYS[1]); " +  
  17. "redis.call('publish', KEYS[2], ARGV[1]); " +  
  18. "return 1; "+  
  19. "end; " +  
  20. "return nil;",  
  21. Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(Thread.currentThread().getId()));  
  22.     if (opStatus == null) {  
  23.             throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "  
  24.                     + id + " thread-id: " + Thread.currentThread().getId());  
  25.     }  
  26.     // 解锁成功之后取消更新锁expire的时间任务  
  27.     if (opStatus) {  
  28.             cancelExpirationRenewal();  
  29.     }  

 

这里的 cancelExpirationRenewal对应着取消 scheduleExpirationRenewal的重置expire时间任务。

Java代码  收藏代码
  1. void cancelExpirationRenewal() {  
  2.       Timeout task = expirationRenewalMap.remove(getName());  
  3.       if (task != null) {  
  4.           task.cancel();  
  5.       }  
  6.   }  

 

 

再看一下Redisson是如何处理unlock的redis消息的。这里的消息内容就是unlockMessage = 0L和unlock方法中publish的内容是对应的。

 

Java代码  收藏代码
  1. public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {  
  2.   
  3.     public static final Long unlockMessage = 0L;  
  4.   
  5.     @Override  
  6.     protected RedissonLockEntry createEntry(Promise<RedissonLockEntry> newPromise) {  
  7.         return new RedissonLockEntry(newPromise);  
  8.     }  
  9.   
  10.     @Override  
  11.     protected void onMessage(RedissonLockEntry value, Long message) {  
  12.         if (message.equals(unlockMessage)) {  
  13.         // 释放一个许可,唤醒等待的entry.getLatch().tryAcquire去再次尝试获取锁。  
  14.             value.getLatch().release();  
  15.             // 如果entry还有其他Listeners回调,也唤醒执行。  
  16.             synchronized (value) {  
  17.                 Runnable runnable = value.getListeners().poll();  
  18.                 if (runnable != null) {  
  19.                     if (value.getLatch().tryAcquire()) {  
  20.                         runnable.run();  
  21.                     } else {  
  22.                         value.getListeners().add(runnable);  
  23.                     }  
  24.                 }  
  25.             }  
  26.         }  
  27.     }  
  28.   
  29. }  

 

Redisson还支持Redis的多种集群配置,一主一备,一主多备,单机等等。也是通过netty的EventExecutorGroup,Promise,Future等API实现调度的。

结语

在思考是否采用分布式锁以及采用哪种实现方案的时候,还是要基于业务,技术方案一定是基于业务基础,服务于业务,并且衡量过投入产出比的。所以如果 有成熟的解决方案,在业务可承受规模肯定是不要重复造轮子,当然还要经过严谨的测试。在po主用Spring的redis api实现时,也遇到了一些问题。
比如hIncrBy 的字符集问题,在使用命令的时候,当然可以直接set a 1然后incr a 1,这个问题可以参考ERR value is not an integer or out of range 问题,但在使用RedisConnection的时候,需要通过转码,byte[] value =SafeEncoder.encode(String.valueOf("1"))connection.hSet(key, field, value)这样才可以,或者自己通过String转成正确的编码也可以。
还有刚才说的调度pexpire任务,在unlock异常的时候,任务池中的任务无法自动结束。另外就是Spring的MessageListeneronMessage(Message message, byte[] pattern)回调方法message.getBody()是byte数组,消息内容转化的时候要处理一下。

分享到:
评论

相关推荐

    Redisson分布式锁源码解析

    Redisson分布式锁是一种基于Redis的分布式锁实现,继承了Reentrant Lock的特性,具有超时、重试、中断等功能,并且由于Redis的分布式特性,非常适合用来做Java中的分布式锁。本文将对Redisson分布式锁的源码进行解析...

    Redisson分布式锁.docx

    本文将详细介绍 Redisson 分布式锁的实现原理、使用方法和源码分析。 Redisson 分布式锁的实现原理 Redisson 分布式锁的实现原理基于 Redis 的 SETNX 命令, SETNX 命令可以设置一个键值,并且只有在键不存在时才...

    java版的分布式锁示例源码(Redission).zip

    分布式锁是一种在分布式系统中实现锁的机制,用于控制不同节点之间的并发操作。在Java中,通常通过协调服务(如Redis)来实现。分布式锁的主要目标是确保在分布式环境中同一时间只有一个线程或进程可以访问特定的...

    Redis学习实践 - 超实用超详细

    redis基本命令 Redis数据结构、原理分析、应用实战 什么是Redis Redis的作用 ...Redis实战及源码分析 分布式锁实战 管道模式 Redis的应用架构 缓存与数据一致性问题 缓存雪崩与缓存穿透 布隆过滤器

    Redis学习实践 - 适合初学者 从0到精通

    redis命令实践 Redis数据结构、原理分析、应用实战 什么是Redis Redis的作用 ...Redis实战及源码分析 分布式锁实战 管道模式 Redis的应用架构 缓存与数据一致性问题 缓存雪崩与缓存穿透 布隆过滤器

    踩到一个关于分布式锁的非比寻常的BUG!.doc

    Redisson 是一个流行的 Java 客户端,提供了对 Redis 的丰富支持,包括分布式锁的实现。其内部的“看门狗”机制是为了解决分布式锁在特定场景下的问题,比如锁的自动续期和防止死锁。 看门狗(Watchdog)是 ...

    《Redis 开发与运维》1

    4. 分布式锁:利用SETNX或Redisson等工具实现分布式环境下的锁服务。 5. 数据聚合:利用集合、有序集合进行数据的聚合计算。 但同时,Redis也有一些不能做的事情: 1. 大数据存储:Redis不适用于存储大量数据,因为...

    SpringBoot SpringCloud Redis

    标题和描述中提到的“SpringBoot、SpringCloud的案例源码,以及Redis安装,并附带上高可用集群”,涉及的知识点涵盖了SpringBoot、SpringCloud、Redis以及高可用集群的构建。以下是对这些技术知识点的详细介绍: ...

    jedis-2.9.0源码+jar包

    Lettuce提供了更现代的Netty支持,而Redisson则集成了更多高级特性,如分布式锁、ID生成器等。 总结,Jedis 2.9.0作为Java开发者与Redis交互的首选工具,其源码不仅提供了对Redis命令的详细实现,也展现了良好的...

    redisson-priority-queue

    在Redisson中,`redisson-priority-queue`是实现优先级队列(Priority Queue)的一个组件,它基于Redis的数据存储,用于处理需要按特定顺序执行或检索的元素。优先级队列通常在任务调度、事件处理、多线程环境中的...

    springboot高级-后8章-源码、资料、课件.rar

    6. **分布式系统**:在分布式环境中,SpringBoot支持分布式会话(例如RedisSession)、分布式锁(如Redisson)和服务发现(如Netflix Eureka、Consul或Zookeeper)。这些技术有助于构建高可用和可扩展的应用。 7. *...

    梳理的一些java开发中用上的框架和开发工具,肯定会遗漏,欢迎补充

    Redisson 是一个在Redis之上的Java客户端库,实现了分布式对象。 #### 日志记录 - **Log4j2**: 框架。Apache Log4j 2 是一款日志框架,提供比原版Log4j更加强大的功能。 - **Logback**: 框架。Logback 是Log4j的一...

    boot-multi-datasource:SpringBoot项目中的多数据源支持+redis缓存

    5. **分布式锁**:Redis的原子性操作(如INCR)使得它非常适合实现分布式锁,解决多节点并发问题。 三、整合应用 1. **数据源与缓存结合**:在多数据源环境中,可以根据业务需求将部分数据缓存在Redis中,提高读取...

    基于springboot+vue的二手图书交易平台源码数据库.zip

    4. 分布式锁和队列服务,如Redisson和RabbitMQ,解决并发问题。 5. 日志收集与分析,如ELK(Elasticsearch、Logstash、Kibana)堆栈,便于故障排查。 综上所述,基于SpringBoot和Vue的二手图书交易平台是一个综合性...

    lixuemin.github.io:个人博客

    Redisson是一个基于Redis的Java客户端,提供了丰富的分布式服务功能,如分布式锁、队列、信号量、计数器等。它使得开发者能够轻松地在分布式环境中实现高级数据结构和同步服务。在关于Redisson的内容中,李雪敏可能...

Global site tag (gtag.js) - Google Analytics