1. 前因
以前实现过一个Redis实现的全局锁, 虽然能用, 但是感觉很不完善, 不可重入, 参数太多等等.
最近看到了一个新的Redis客户端Redisson, 看了下源码, 发现了一个比较好的锁实现RLock, 于是记录下.
2. Maven依赖
1
2
3
4
5
|
< dependency >
< groupId >org.redisson</ groupId >
< artifactId >redisson</ artifactId >
< version >1.2.1</ version >
</ dependency >
|
3. 初试
Redisson中RLock的使用很简单, 来看看一个最简单的例子.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
import org.redisson.Redisson;
import org.redisson.core.RLock;
public class Temp {
public static void main(String[] args) throws Exception {
Redisson redisson = Redisson.create();
RLock lock = redisson.getLock( "haogrgr" );
lock.lock();
try {
System.out.println( "hagogrgr" );
}
finally {
lock.unlock();
}
redisson.shutdown();
}
} |
4. RLock接口
通过上面的例子可以看出, 使用起来和juc里面的Lock接口使用很类似, 那么来看看RLock这个接口.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
Rlock | ----------Lock |
----------void lock()
|
----------void lockInterruptibly()
|
----------boolean tryLock()
|
----------boolean tryLock(long time, TimeUnit unit)
|
----------void unlock()
|
----------Condition newCondition()
| ----------RObject |
----------String getName()
|
----------void delete()
| ----------void lockInterruptibly(long leaseTime, TimeUnit unit) | ----------boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) | ----------void lock(long leaseTime, TimeUnit unit) | ----------void forceUnlock() | ----------boolean isLocked(); | ----------boolean isHeldByCurrentThread() | ----------int getHoldCount() |
可以看到, 该接口主要继承了Lock接口, 然后扩展了部分方法, 比如:
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit)
新加入的leaseTime主要是用来设置锁的过期时间, 形象的解释就是, 如果超过leaseTime还没有解锁的话, 我就强制解锁.
5. RLock接口的实现
具体的实现类是RedissonLock, 下面来大概看看实现原理. 先看看 (3) 中例子执行时, 所运行的命令(通过monitor命令):
1
2
3
4
5
6
7
8
|
127.0.0.1:6379> monitor OK 1434959509.494805 [0 127.0.0.1:57911] "SETNX" "haogrgr" "{\"@class\":\"org.redisson.RedissonLock$LockValue\",\"counter\":1,\"id\":\"c374addc-523f-4943-b6e0-c26f7ab061e3\",\"threadId\":1}" 1434959509.494805 [0 127.0.0.1:57911] "GET" "haogrgr" 1434959509.524805 [0 127.0.0.1:57911] "MULTI" 1434959509.529805 [0 127.0.0.1:57911] "DEL" "haogrgr" 1434959509.529805 [0 127.0.0.1:57911] "PUBLISH" "redisson__lock__channel__{haogrgr}" "0" 1434959509.529805 [0 127.0.0.1:57911] "EXEC" |
可以看到, 大概原理是, 通过判断Redis中是否有某一key, 来判断是加锁还是等待, 最后的publish是一个解锁后, 通知阻塞在lock的线程.
分布式锁的实现依赖的单点, 这里Redis就是单点, 通过在Redis中维护状态信息来实现全局的锁. 那么来看看RedissonLock如何
实现可重入, 保证原子性等等细节.
6. 加锁源码分析
从最简单的无参数的lock参数来看源码.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return ;
}
} public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(- 1 , null ); //leaseTime : -1 表示key不设置过期时间
} public void lockInterruptibly( long leaseTime, TimeUnit unit) throws InterruptedException {
Long ttl;
if (leaseTime != - 1 ) {
ttl = tryLockInner(leaseTime, unit);
} else {
ttl = tryLockInner();
}
// lock acquired
if (ttl == null ) {
return ;
}
subscribe().awaitUninterruptibly();
try {
while ( true ) {
if (leaseTime != - 1 ) {
ttl = tryLockInner(leaseTime, unit);
} else {
ttl = tryLockInner();
}
// lock acquired
if (ttl == null ) {
break ;
}
// waiting for message
RedissonLockEntry entry = ENTRIES.get(getEntryName());
if (ttl >= 0 ) {
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
entry.getLatch().acquire();
}
}
} finally {
unsubscribe();
}
} |
代码有点多, 但是没关系, 慢慢分解, 由于这里我们是调用无参数的lock方法, 所以最后执行到的方法是:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
private Long tryLockInner() {
final LockValue currentLock = new LockValue(id, Thread.currentThread().getId()); //保存锁的状态: 客户端UUID+线程ID来唯一标识某一JVM实例的某一线程
currentLock.incCounter(); //用来保存重入次数, 实现可重入功能, 初始情况是1
//Redisson封装了交互的细节, 具体的逻辑为execute方法逻辑.
return connectionManager.write(getName(), new SyncOperation<LockValue, Long>() {
@Override
public Long execute(RedisConnection<Object, LockValue> connection) {
Boolean res = connection.setnx(getName(), currentLock); //如果key:haogrgr不存在, 就set并返回true, 否则返回false
if (!res) { //如果设置失败, 那么表示有锁竞争了, 于是获取当前锁的状态, 如果拥有者是当前线程, 就累加重入次数并set新值
connection.watch(getName()); //通过watch命令配合multi来实现简单的事务功能
LockValue lock = (LockValue) connection.get(getName());
if (lock != null && lock.equals(currentLock)) { //LockValue的equals实现为比较客户id和threadid是否一样
lock.incCounter(); //如果当前线程已经获取过锁, 则累加加锁次数, 并set更新
connection.multi();
connection.set(getName(), lock);
if (connection.exec().size() == 1 ) {
return null ; //set成功,
}
}
connection.unwatch();
//走到这里, 说明上面set的时候, 其他客户端在 watch之后->set之前 有其他客户端修改了key值
//则获取key的过期时间, 如果是永不过期, 则返回-1, 具体处理后面说明
Long ttl = connection.pttl(getName());
return ttl;
}
return null ;
}
});
} |
tryLockInner的逻辑已经看完了, 可以知道, 有三种情况:
(1) key不存在, 加锁:
当key不存在时, 设置锁的初始状态并set, 具体来看就是 setnx haogrgr LockValue{ id: Redisson对象的id, threadId: 当前线程id, counter: 当前重入次数,这里为第一次获取,所以为1}
通过上面的操作. 达到获取锁的目的, 通过setnx来达到实现类似于 if(map.get(key) == null) { map.put(key) } 的功能, 防止多个客户端同时set时, 新值覆盖老值.
(2)key存在, 且获取锁的当前线程, 重入:
这里就是锁重入的情况, 也就是锁的拥有者第二次调用lock方法, 这时, 通过先get, 然后比较客户端ID和当前线程ID来判断拥有锁的线程是不是当前线程.(客户端ID+线程ID才能唯一定位锁拥有者线程)
判断发现当前是重入情况, 则累加LockValue的counter, 然后重新set回去, 这里使用到了watch和multi命令, 防止 get -> set 期间其他客户端修改了key的值.
(3)key存在, 且是其他线程获取的锁, 等待:
首先尝试获取锁(setnx), 失败后发现锁拥有者不是当前线程, 则获取key的过期时间, 返回过期时间
那么接下来看看tryLockInner调用完成后的处理代码.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
public void lockInterruptibly( long leaseTime, TimeUnit unit) throws InterruptedException {
Long ttl;
if (leaseTime != - 1 ) {
ttl = tryLockInner(leaseTime, unit);
} else {
ttl = tryLockInner(); //lock()方法调用会走的逻辑
}
// lock acquired
if (ttl == null ) { //加锁成功(新获取锁, 重入情况) tryLockInner会返回null, 失败会返回key超时时间, 或者-1(key未设置超时时间)
return ; //加锁成功, 返回
}
//subscribe这个方法代码有点多, Redisson通过netty来和redis通讯, 然后subscribe返回的是一个Future类型,
//Future的awaitUninterruptibly()调用会阻塞, 然后Redisson通过Redis的pubsub来监听unlock的topic(getChannelName())
//例如, 5中所看到的命令 "PUBLISH" "redisson__lock__channel__{haogrgr}" "0"
//当解锁时, 会向名为 getChannelName() 的topic来发送解锁消息("0")
//而这里 subscribe() 中监听这个topic, 在订阅成功时就会唤醒阻塞在awaitUninterruptibly()的方法.
//所以线程在这里只会阻塞很短的时间(订阅成功即唤醒, 并不代表已经解锁)
subscribe().awaitUninterruptibly();
try {
while ( true ) { //循环, 不断重试lock
if (leaseTime != - 1 ) {
ttl = tryLockInner(leaseTime, unit);
} else {
ttl = tryLockInner(); //不多说了
}
// lock acquired
if (ttl == null ) {
break ;
}
// 这里才是真正的等待解锁消息, 收到解锁消息, 就唤醒, 然后尝试获取锁, 成功返回, 失败则阻塞在acquire().
// 收到订阅成功消息, 则唤醒阻塞上面的subscribe().awaitUninterruptibly();
// 收到解锁消息, 则唤醒阻塞在下面的entry.getLatch().acquire();
RedissonLockEntry entry = ENTRIES.get(getEntryName());
if (ttl >= 0 ) {
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
entry.getLatch().acquire();
}
}
} finally {
unsubscribe(); //加锁成功或异常,解除订阅
}
} |
主要的代码都加上了详细的注释, subscribe() 方法的代码复杂些, 但具体就是利用redis的pubsub提供一个通知机制来减少不断的重试.
很多的Redis锁实现都是失败后sleep一定时间后重试, 在锁被占用时间较长时, 不断的重试是浪费, 而sleep也会导致不必要的时间浪费(在sleep期间可能已经解锁了), sleep时间太长, 时间浪费, 太短, 重试次数会增加~~~.
到这里lock的逻辑已经看完了, 其他的比如tryLock方法逻辑和lock类似, 不过加了超时时间, 然后还有一种lock方法就是对key加上了过期时间.
7. 解锁源码
unlock的逻辑相对简单.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
public void unlock() {
connectionManager.write(getName(), new SyncOperation<Object, Void>() {
@Override
public Void execute(RedisConnection<Object, Object> connection) {
LockValue lock = (LockValue) connection.get(getName());
if (lock != null ) {
LockValue currentLock = new LockValue(id, Thread.currentThread().getId());
if (lock.equals(currentLock)) {
if (lock.getCounter() > 1 ) {
lock.decCounter();
connection.set(getName(), lock);
} else {
unlock(connection);
}
} else {
throw new IllegalMonitorStateException( "Attempt to unlock lock, not locked by current id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
} else {
// could be deleted
}
return null ;
}
});
} private void unlock(RedisConnection<Object, Object> connection) {
int counter = 0 ;
while (counter < 5 ) {
connection.multi();
connection.del(getName());
connection.publish(getChannelName(), unlockMessage);
List<Object> res = connection.exec();
if (res.size() == 2 ) {
return ;
}
counter++;
}
throw new IllegalStateException( "Can't unlock lock after 5 attempts. Current id: "
+ id + " thread-id: " + Thread.currentThread().getId());
} |
具体的逻辑比较简单, 我就不注释了, 大概就是, 如果是多次重入的, 就以此递减然后 set, 如果是只lock一次的, 就删除, 然后publish一条解锁的message到getChannelName() tocpic.
这里解锁会重试五次, 失败就抛异常.
相关推荐
本文将深入探讨C#中如何使用Lock和Redis分布式锁来解决并发问题,以秒杀系统为例进行阐述。 首先,让我们理解什么是并发控制。并发控制是指在多线程环境下确保数据的一致性和完整性,防止多个线程同时访问同一资源...
Redis 分布式锁是分布式系统中解决并发控制和数据一致性问题的一种常见机制。在大型分布式应用中,单机锁无法满足需求,因为它们局限于单个服务器。Redis 的高可用性和低延迟特性使其成为实现分布式锁的理想选择。...
现在很多项目单机版已经不满足了,分布式变得越受欢迎,同时也带来很多问题,分布式锁也变得没那么容易实现,分享一个redis分布式锁工具类,里面的加锁采用lua脚本(脚本比较简单,采用java代码实现,无须外部调用...
综上所述,这个压缩包提供的Redis分布式锁工具包为Java开发者提供了一种简单、高效的方法来解决分布式环境下的锁问题,特别适合于处理高并发的快应用和企业级应用。通过集成到Spring工程中,开发人员可以利用Redis的...
Redis分布式锁实现Redisson 15问 Redis分布式锁是指在分布式系统中,多个服务实例之间对同一个资源加锁的机制,以保证数据的一致性和安全性。Redisson是一个基于Redis的分布式锁实现,它提供了一个高效、可靠的加锁...
本教程将深入探讨如何在SpringBoot应用中实现基于Redis的分布式锁。 首先,Redis之所以常被用作分布式锁的实现,是因为其具有以下优点: 1. **高可用性**:Redis支持主从复制,可以确保在单点故障时仍有服务可用。...
redlock-py, 在 python 中,Redis分布式锁 redlock - python 中的分布式锁这个 python 库实现了基于redis的分布式锁管理器算法( ) 。要创建锁定管理器:dlm = Redlock([{"host":"localhost","port":
redis分布式锁带方法名和过期时间,如果不传方法名自动获取改方法名做key,使锁的粒度到方法级别,释放锁的时间可以根据自己的需要自定义,默认5s,为了解决超大方法执行时间太长,还没有执行完,锁就被释放掉的问题.
redis分布式锁的工具类,采用的是Lua代码的方式,保证了Java执行方法的原子性。
Redis分布式锁是解决多节点集群部署中并发控制的重要工具,但在使用过程中,它存在一些问题,需要谨慎处理。本文将详细探讨这些问题及其解决方案。 首先,死锁问题是Redis分布式锁的一个常见挑战。当一个进程获取了...
在IT行业中,尤其是在高并发的电子商务系统中,"redis分布式锁实现抢单秒杀"是一个常见的挑战。这个场景模拟了多个用户同时参与秒杀活动,系统需要确保库存的准确性和抢单的公平性,避免超卖和数据不一致的问题。...
本压缩包“zk:redis分布式锁.zip”提供了基于Zookeeper(zk)和Redis两种分布式锁实现的示例和相关资料。 首先,我们来看Zookeeper(zk)的分布式锁。Zookeeper是Apache的一个开源项目,提供了一个高可用的、高性能...
一、Redis分布式锁的优势 1. 响应速度:Redis是内存数据库,读写速度快,非常适合处理高并发的锁请求。 2. 原子性:Redis的操作是原子性的,如`SETNX`(设置如果不存在)、`EXPIRE`(设置过期时间),确保了锁的获取与...
本篇文章将深入探讨如何在C#.NET环境下利用Redis实现分布式锁,以及相关的核心知识点。 首先,让我们理解什么是分布式锁。分布式锁是在分布式系统中,用于协调不同节点间对共享资源访问的一种工具。它确保在任何...
Redis分布式锁工具包简化了这一过程,提供了封装好的API,使开发者能够快速、安全地在代码中实现锁的功能。 该工具包可能包含以下核心功能: 1. **锁的获取与释放**:工具包通常会提供一个`lock()`方法用于获取锁...
Java基于Redis实现分布式锁代码实例 分布式锁的必要性 在多线程环境中,资源竞争是一个常见的问题。例如,在一个简单的用户操作中,一个线程修改用户状态,首先在内存中读取用户状态,然后在内存中进行修改,然后...
总的来说,Redis分布式锁通过Jedis或Redisson提供了可靠、高效的并发控制方案。分段锁作为一种优化策略,可以进一步提升大规模并发环境下的系统性能。在实际项目中,应根据业务需求和性能指标选择合适的锁实现,以...