Java实现基于Redis的分布式锁
单JVM内同步好办, 直接用JDK提供的锁就可以了,但是跨进程同步靠这个肯定是不可能的,这种情况下肯定要借助第三方,我这里实现用Redis,当然还有很多其他的实现方式。其实基于Redis实现的原理还算比较简单的,在看代码之前建议大家先去这里看看原理,我就不翻译了,免得变味了,看懂了之后看代码应该就容易理解了。
时间统一问题:各个客户端加锁时需要获取时间,而这个时间都不应当从本地获取,因为各个客户端的时间并不是一致的,因此需要提供一个TimeServer提供获取时间的服务,下面源码中用到的关于时间服务的三个类(包括TimeServer、TimeClient和Time Client Exception)会在另一篇博客《Java NIO时间服务》给源码.
我这里不实现JDK的java.util.concurrent.locks.Lock接口,而是自定义一个,因为JDK的有个newCondition()方法我这里暂时没实现。这个Lock提供了5个lock方法的变体,可以自行选择使用哪一个来获取锁,我的想法是
最好用带超时返回的那几个方法,因为不这样的话,假如redis挂了,线程永远都在那死循环了(关于这里,应该还可以进一步优化,如果redis挂了,Jedis的操作肯定会抛异常之类的,可以定义个机制让redis挂了的时候通知使用这个lock的用户,或者说是线程)。
package cc.lixiaohui.lock; import java.util.concurrent.TimeUnit; public interface Lock extends Releasable{ /** * 阻塞性的获取锁, 不响应中断 */ void lock(); /** * 阻塞性的获取锁, 响应中断 * * @throws InterruptedException */ void lockInterruptibly() throws InterruptedException; /** * 尝试获取锁, 获取不到立即返回, 不阻塞 */ boolean tryLock(); /** * 超时自动返回的阻塞性的获取锁, 不响应中断 * * @param time * @param unit * @return {@code true} 若成功获取到锁, {@code false} 若在指定时间内未获取到锁 * */ boolean tryLock(long time, TimeUnit unit); /** * 超时自动返回的阻塞性的获取锁, 响应中断 * * @param time * @param unit * @return {@code true} 若成功获取到锁, {@code false} 若在指定时间内未获取到锁 * @throws InterruptedException 在尝试获取锁的当前线程被中断 */ boolean tryLockInterruptibly(long time, TimeUnit unit) throws InterruptedException; /** * 释放锁 */ void unlock(); }
Releasable.java :
package cc.lixiaohui.lock; /** * 代表持有资源的对象, 例如 * <ul> * <li> 基于jedis的锁自然持有与redis server的连接 </li> * <li> 基于时间统一的的锁自然持有与time server的连接</li> * </ul> * 因此锁应该实现该接口, 并在{@link Releasable#resease() release} 方法中释放相关的连接 * * @author lixiaohui * */ public interface Releasable { /** * 释放持有的所有资源 */ void release(); }
看Lock的抽象实现:
package cc.lixiaohui.lock; import java.util.concurrent.TimeUnit; /** * 锁的骨架实现, 真正的获取锁的步骤由子类去实现. * * @author lixiaohui * */ public abstract class AbstractLock implements Lock { /** * <pre> * 这里需不需要保证可见性值得讨论, 因为是分布式的锁, * 1.同一个jvm的多个线程使用不同的锁对象其实也是可以的, 这种情况下不需要保证可见性 * 2.同一个jvm的多个线程使用同一个锁对象, 那可见性就必须要保证了. * </pre> */ protected volatile boolean locked; /** * 当前jvm内持有该锁的线程(if have one) */ private Thread exclusiveOwnerThread; public void lock() { try { lock(false, 0, null, false); } catch (InterruptedException e) { // TODO ignore } } public void lockInterruptibly() throws InterruptedException { lock(false, 0, null, true); } public boolean tryLock(long time, TimeUnit unit) { try { return lock(true, time, unit, false); } catch (InterruptedException e) { // TODO ignore } return false; } public boolean tryLockInterruptibly(long time, TimeUnit unit) throws InterruptedException { return lock(true, time, unit, true); } public void unlock() { // TODO 检查当前线程是否持有锁 if (Thread.currentThread() != getExclusiveOwnerThread()) { throw new IllegalMonitorStateException("current thread does not hold the lock"); } unlock0(); setExclusiveOwnerThread(null); } protected void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } protected abstract void unlock0(); /** * 阻塞式获取锁的实现 * * @param useTimeout * @param time * @param unit * @param interrupt 是否响应中断 * @return * @throws InterruptedException */ protected abstract boolean lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) throws InterruptedException; }
基于Redis的最终实现(not reentrant),关键的获取锁,释放锁的代码在这个类的lock方法和unlock0方法里,大家可以只看这两个方法然后完全自己写一个:
package cc.lixiaohui.lock; import java.io.IOException; import java.net.SocketAddress; import java.util.concurrent.TimeUnit; import redis.clients.jedis.Jedis; import cc.lixiaohui.lock.time.nio.client.TimeClient; /** * <pre> * 基于Redis的SETNX操作实现的分布式锁 * * 获取锁时最好用lock(long time, TimeUnit unit), 以免网路问题而导致线程一直阻塞 * * <a href="http://redis.io/commands/setnx">SETNX操作参考资料</a> * </pre> * * @author lixiaohui * */ public class RedisBasedDistributedLock extends AbstractLock { private Jedis jedis; private TimeClient timeClient; // 锁的名字 protected String lockKey; // 锁的有效时长(毫秒) protected long lockExpires; public RedisBasedDistributedLock(Jedis jedis, String lockKey, long lockExpires, SocketAddress timeServerAddr) throws IOException { this.jedis = jedis; this.lockKey = lockKey; this.lockExpires = lockExpires; timeClient = new TimeClient(timeServerAddr); } // 阻塞式获取锁的实现 protected boolean lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) throws InterruptedException{ if (interrupt) { checkInterruption(); } // 超时控制 的时间可以从本地获取, 因为这个和锁超时没有关系, 只是一段时间区间的控制 long start = localTimeMillis(); long timeout = unit.toMillis(time); // if !useTimeout, then it's useless while (useTimeout ? isTimeout(start, timeout) : true) { if (interrupt) { checkInterruption(); } long lockExpireTime = serverTimeMillis() + lockExpires + 1;//锁超时时间 String stringOfLockExpireTime = String.valueOf(lockExpireTime); if (jedis.setnx(lockKey, stringOfLockExpireTime) == 1) { // 获取到锁 // TODO 成功获取到锁, 设置相关标识 locked = true; setExclusiveOwnerThread(Thread.currentThread()); return true; } String value = jedis.get(lockKey); if (value != null && isTimeExpired(value)) { // lock is expired // 假设多个线程(非单jvm)同时走到这里 String oldValue = jedis.getSet(lockKey, stringOfLockExpireTime); // getset is atomic // 但是走到这里时每个线程拿到的oldValue肯定不可能一样(因为getset是原子性的) // 加入拿到的oldValue依然是expired的,那么就说明拿到锁了 if (oldValue != null && isTimeExpired(oldValue)) { // TODO 成功获取到锁, 设置相关标识 locked = true; setExclusiveOwnerThread(Thread.currentThread()); return true; } } else { // TODO lock is not expired, enter next loop retrying } } return false; } public boolean tryLock() { long lockExpireTime = serverTimeMillis() + lockExpires + 1;//锁超时时间 String stringOfLockExpireTime = String.valueOf(lockExpireTime); if (jedis.setnx(lockKey, stringOfLockExpireTime) == 1) { // 获取到锁 // TODO 成功获取到锁, 设置相关标识 locked = true; setExclusiveOwnerThread(Thread.currentThread()); return true; } String value = jedis.get(lockKey); if (value != null && isTimeExpired(value)) { // lock is expired // 假设多个线程(非单jvm)同时走到这里 String oldValue = jedis.getSet(lockKey, stringOfLockExpireTime); // getset is atomic // 但是走到这里时每个线程拿到的oldValue肯定不可能一样(因为getset是原子性的) // 假如拿到的oldValue依然是expired的,那么就说明拿到锁了 if (oldValue != null && isTimeExpired(oldValue)) { // TODO 成功获取到锁, 设置相关标识 locked = true; setExclusiveOwnerThread(Thread.currentThread()); return true; } } else { // TODO lock is not expired, enter next loop retrying } return false; } /** * Queries if this lock is held by any thread. * * @return {@code true} if any thread holds this lock and * {@code false} otherwise */ public boolean isLocked() { if (locked) { return true; } else { String value = jedis.get(lockKey); // TODO 这里其实是有问题的, 想:当get方法返回value后, 假设这个value已经是过期的了, // 而就在这瞬间, 另一个节点set了value, 这时锁是被别的线程(节点持有), 而接下来的判断 // 是检测不出这种情况的.不过这个问题应该不会导致其它的问题出现, 因为这个方法的目的本来就 // 不是同步控制, 它只是一种锁状态的报告. return !isTimeExpired(value); } } @Override protected void unlock0() { // TODO 判断锁是否过期 String value = jedis.get(lockKey); if (!isTimeExpired(value)) { doUnlock(); } } public void release() { jedis.close(); timeClient.close(); } private void doUnlock() { jedis.del(lockKey); } private void checkInterruption() throws InterruptedException { if(Thread.currentThread().isInterrupted()) { throw new InterruptedException(); } } private boolean isTimeExpired(String value) { // 这里拿服务器的时间来比较 return Long.parseLong(value) < serverTimeMillis(); } private boolean isTimeout(long start, long timeout) { // 这里拿本地的时间来比较 return start + timeout > System.currentTimeMillis(); } private long serverTimeMillis(){ return timeClient.currentTimeMillis(); } private long localTimeMillis() { return System.currentTimeMillis(); } }
如果将来还换一种实现方式(比如memcached,数据库之类的),到时直接继承AbstractLock并实现lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt), unlock0()方法即可(所谓抽象嘛)
测试
模拟全局ID增长器,设计一个IDGenerator类,该类负责生成全局递增ID,其代码如下:
package cc.lixiaohui.lock.example; import java.math.BigInteger; import java.util.concurrent.TimeUnit; import cc.lixiaohui.lock.Lock; import cc.lixiaohui.lock.Releasable; /** * 模拟分布式环境中的ID生成 * @author lixiaohui * */ public class IDGenerator implements Releasable{ private static BigInteger id = BigInteger.valueOf(0); private final Lock lock; private static final BigInteger INCREMENT = BigInteger.valueOf(1); public IDGenerator(Lock lock) { this.lock = lock; } public String getAndIncrement() { if (lock.tryLock(3, TimeUnit.SECONDS)) { try { // TODO 这里获取到锁, 访问临界区资源 System.out.println(Thread.currentThread().getName() + " get lock"); return getAndIncrement0(); } finally { lock.unlock(); } } return null; //return getAndIncrement0(); } public void release() { lock.release(); } private String getAndIncrement0() { String s = id.toString(); id = id.add(INCREMENT); return s; } }
测试主逻辑:同一个JVM内开两个线程死循环地(循环之间无间隔,有的话测试就没意义了)获取ID(我这里并不是死循环而是跑20s),获取到ID存到同一个Set里面,在存之前先检查该ID在set中是否存在,如果已存在,则让两个线程都停止。如果程序能正常跑完20s,那么说明这个分布式锁还算可以满足要求,如此测试的效果应该和不同JVM(也就是真正的分布式环境中)测试的效果是一样的,下面是测试类的代码:
package cc.lixiaohui.DistributedLock.DistributedLock; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.HashSet; import java.util.Set; import org.junit.Test; import redis.clients.jedis.Jedis; import cc.lixiaohui.lock.Lock; import cc.lixiaohui.lock.RedisBasedDistributedLockV0_0; import cc.lixiaohui.lock.RedisBasedDistributedLock; import cc.lixiaohui.lock.example.IDGenerator; public class IDGeneratorTest { private static Set<String> generatedIds = new HashSet<String>(); private static final String LOCK_KEY = "lock.lock"; private static final long LOCK_EXPIRE = 5 * 1000; @Test public void testV1_0() throws Exception { SocketAddress addr = new InetSocketAddress("localhost", 9999); Jedis jedis1 = new Jedis("localhost", 6379); Lock lock1 = new RedisBasedDistributedLock(jedis1, LOCK_KEY, LOCK_EXPIRE, addr); IDGenerator g1 = new IDGenerator(lock1); IDConsumeTask consume1 = new IDConsumeTask(g1, "consume1"); Jedis jedis2 = new Jedis("localhost", 6379); Lock lock2 = new RedisBasedDistributedLock(jedis2, LOCK_KEY, LOCK_EXPIRE, addr); IDGenerator g2 = new IDGenerator(lock2); IDConsumeTask consume2 = new IDConsumeTask(g2, "consume2"); Thread t1 = new Thread(consume1); Thread t2 = new Thread(consume2); t1.start(); t2.start(); Thread.sleep(20 * 1000); //让两个线程跑20秒 IDConsumeTask.stopAll(); t1.join(); t2.join(); } static String time() { return String.valueOf(System.currentTimeMillis() / 1000); } static class IDConsumeTask implements Runnable { private IDGenerator idGenerator; private String name; private static volatile boolean stop; public IDConsumeTask(IDGenerator idGenerator, String name) { this.idGenerator = idGenerator; this.name = name; } public static void stopAll() { stop = true; } public void run() { System.out.println(time() + ": consume " + name + " start "); while (!stop) { String id = idGenerator.getAndIncrement(); if (id != null) { if(generatedIds.contains(id)) { System.out.println(time() + ": duplicate id generated, id = " + id); stop = true; continue; } generatedIds.add(id); System.out.println(time() + ": consume " + name + " add id = " + id); } } // 释放资源 idGenerator.release(); System.out.println(time() + ": consume " + name + " done "); } } }
说明一点,我这里停止两个线程的方式并不是很好,我是为了方便才这么做的,因为只是测试,最好不要这么做。
测试结果
跑20s打印的东西太多,前面打印的被clear了,只有差不多跑完的时候才有,下面截图。说明了这个锁能正常工作:
当IDGererator没有加锁(即IDGererator的getAndIncrement方法内部获取id时不上锁)时,测试是不通过的,非常大的概率中途就会停止,下面是不加锁时的测试结果:
这个1秒都不到:
这个也1秒都不到:
改进
在Java 实现基于Redis的分布式可重入锁 中改进使其可重入
如有问题麻烦指正
相关推荐
Java基于Redis实现分布式锁代码实例 分布式锁的必要性 在多线程环境中,资源竞争是一个常见的问题。例如,在一个简单的用户操作中,一个线程修改用户状态,首先在内存中读取用户状态,然后在内存中进行修改,然后...
本教程将深入探讨如何在SpringBoot应用中实现基于Redis的分布式锁。 首先,Redis之所以常被用作分布式锁的实现,是因为其具有以下优点: 1. **高可用性**:Redis支持主从复制,可以确保在单点故障时仍有服务可用。...
Redisson是基于Redis的Java客户端,它提供了丰富的数据结构和服务,包括分布式锁、信号量、队列、计数器等,极大地扩展了Redis在分布式系统中的应用能力。本篇文章将详细探讨如何使用Redisson实现Redis分布式事务锁...
Redis中的分布式锁实现通常基于`SETNX`命令或`SET`命令的`nx`与`ex`组合。`SETNX`命令用于设置键值,但如果键已经存在,则不执行任何操作,这可以确保锁的互斥性。`SET key value EX timeout NX`则同时设置了超时...
以下是一个简单的Java示例,展示了如何使用Jedis客户端库来实现Redis分布式锁。 ```java public class RedisLock { private JedisPool jedisPool; public RedisLock(JedisPool jedisPool) { this.jedisPool = ...
Redis 分布式锁工具包是为了解决在分布式系统中实现锁的问题,它提供了一种纯Java的方式来调用,使得在传统的Spring工程中可以轻松集成和使用。在现代的高并发、分布式环境下,单机锁已经无法满足需求,因为它们不能...
Java开发基于SpringBoot+WebSocket+Redis分布式即时通讯群聊系统。一个基于Spring Boot + WebSocket + Redis,可快速开发的分布式即时通讯群聊系统。适用于直播间聊天、游戏内聊天、客服聊天等临时性群聊场景。 ...
Java Redis分布式锁的正确实现方式详解 Java Redis分布式锁是指使用Redis实现的分布式锁机制,旨在解决分布式系统中的并发问题。分布式锁有三种实现方式:数据库乐观锁、基于Redis的分布式锁和基于ZooKeeper的...
在实现基于Redis的分布式锁时,通常会用到两个命令:NX(Not eXists)和EX(过期时间)。NX命令确保只有在键不存在时才能被设置,这样可以保证锁的互斥性。EX命令则是用来设置键的过期时间,保证锁可以在一段时间后...
### 基于Redis分布式锁实现“秒杀” #### 一、引言 在现代互联网应用中,“秒杀”作为一种常见的促销手段,被广泛应用于电商领域。为了保证系统的稳定性和公平性,在高并发环境下实现秒杀功能时,合理地利用分布式...
实现基于Redis的分布式锁,可以通过`SETNX`命令来设置键(key)并检查是否已存在,如果不存在则设置成功,表示获取锁。为了防止死锁,通常会为锁设置一个超时时间,利用`EXPIRE`命令进行设置。此外,解锁时需使用`...
分布式锁是一种在分布式系统中实现锁的机制,用于协调分布在不同节点上的进程对共享资源的访问。在单机系统中,我们通常使用线程锁(如Java中的`synchronized`关键字或`java.util.concurrent.locks.Lock`接口)来...
接下来,我们将实现基于Redis的分布式锁。分布式锁的主要目的是在多节点环境下确保同一时刻只有一个节点可以执行特定操作。以下是一个简单的分布式锁实现: ```java public class DistributedLock { private ...
"java基于jedisLock—redis分布式锁实现示例代码" java基于jedisLock—redis分布式锁实现示例代码主要介绍了jedisLock—redis分布式锁实现示例代码,以下是对标题和描述中所说的知识点的详细说明: 分布式锁是啥?...
通过学习和实践这个“redis分布式锁使用实例”,你可以深入理解如何在实际项目中使用 Redis 和 Redisson 来实现高效且可靠的分布式锁,这对于处理分布式系统中的并发控制至关重要。同时,这也为你提供了设计和实现...
Redisson是一个基于Redis的Java客户端,提供了丰富的数据结构和分布式服务,其中包括分布式锁的实现。下面我们将详细介绍如何使用Redisson创建和管理分布式锁。 #### 1. 添加Redisson依赖 在项目中引入Redisson的...
Redis作为一款高性能的键值存储系统,常被用作实现分布式锁的工具,而Redisson是基于Redis的Java客户端,提供了丰富的数据结构和服务功能,包括对分布式锁的支持。本文将深入探讨如何使用Redis和Redisson来构建...
分布式锁的实现方式有多种,包括数据库乐观锁、基于Redis的分布式锁以及基于ZooKeeper的分布式锁。 本文主要关注基于Redis的分布式锁。Redis是一个内存数据存储系统,由于其快速响应和丰富的数据结构,常被用来实现...
本篇文章将深入探讨如何通过注解的方式在Java应用中实现基于Redis的分布式锁。 首先,我们要理解什么是分布式锁。分布式锁是分布式系统中的一个概念,它允许多个节点在同一时刻只有一个能够执行特定的操作,以避免...
### Java基于Redis分布式消息队的报文过滤系统的设计与实现 #### 一、课题背景与研究意义 随着民用航空业的快速发展,特别是近年来各地新建或扩建民用机场的趋势,导致航空报文数量急剧增加。传统的报文处理系统...