业务场景多了,就应该把场景代码工具化,减少重复代码;趁周六在这里总结一下java实现的redis分布式锁代码;
使用的技术点:
1、redis函数setnx;
2、redis监控函数watch;
3、String.intern在jvm的内存操作,和String.intern的替代方案:guava;
4、@FunctionalInterface函数接口和lambda表达式的应用。
5、RedisTemplate;尽量使用RedisTemplate操作redis;直接操作jedispool人多的时候会乱。
reidis锁操作类
package com.framework.cache; import java.util.Random; import com.framework.cache.execption.RedisExecption; /** * redis锁操作类 * @author lyq * */ public class RedisLock { public static final String LOCKED = "TRUE"; public static final long MILLI_NANO_CONVERSION = 1000000L; public static final long DEFAULT_TIME_OUT = 1000L; public static final Random RANDOM = new Random(); public static final int EXPIRE = 1200; private String key; private String lockId; private boolean locked = false; public RedisLock(String key) { lockId = System.currentTimeMillis()+""; this.key = String.format("lock:%s", key); } /** * 加锁 * @param expireSeconds 超时时间,单位:秒 * @return 获取锁成功返回true;超时返回false;其它报异常 */ public boolean lock(int expireSeconds) { long nano = System.nanoTime(); long timeout= expireSeconds * 1000000000L; try { // 当前任务超时的时候,可能会被其它任务抢到锁,该任务会误删其它任务的锁;所以加入判断。 if (RedisHelper.getExpire(this.key) == -1) { RedisHelper.expire(this.key, expireSeconds); } while (System.nanoTime() - nano < timeout) { if (RedisHelper.setnx(this.key, lockId)) { RedisHelper.expire(this.key, expireSeconds); this.locked = true; return this.locked; } Thread.sleep(3L, RANDOM.nextInt(500)); } } catch (Exception e) { throw new RedisExecption("Locking error", e); } return false; } public boolean lock() { return lock(2); } /** * 解锁 */ @SuppressWarnings("unchecked") public void unlock() { if (this.locked) { String _lockId = this.lockId; RedisHelper.getRedisTemplate().execute(connection->{ byte[] keyByte = RedisHelper.serialize(key); //开启watch之后,如果key的值被修改,则事务失败,exec方法返回null connection.watch(keyByte); String startTime = RedisHelper.unserialize(connection.get(keyByte), String.class); if (_lockId.equals(startTime)) { // 当前任务超时的时候,可能会被其它任务抢到锁,该任务会误删其它任务的锁;所以加入判断。 connection.multi(); connection.del(keyByte); connection.exec(); } connection.unwatch(); return true; },true); } } public String getLockKey(){ return this.key; } }
redis对外帮助类
package com.framework.cache; import java.lang.reflect.Type; import java.util.List; import java.util.Properties; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import com.framework.cache.execption.RedisExecption; import com.framework.cache.execption.RedisLockTimeException; import com.framework.cache.lock.LockCallback; import com.framework.utils.NumUtil; import com.google.common.collect.Interner; import com.google.common.collect.Interners; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import redis.clients.jedis.JedisPoolConfig; /** * redis对外静态帮助类 * @author lyq * */ public class RedisHelper { public static RedisTemplate redisTemplate = null; private static Interner<Object> pool = Interners.newWeakInterner(); private static final Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd hh:mm:ss").create(); private static final String IGNORE_EXEC_KEY = "_ignoreExec"; /** * 加锁执行 * @param callBack 回调函数 * @param lockKey * @param expireSeconds 执行超时时间,单位:秒 * @return 返回callback回调函数执行结果 */ public static <T> T lockExec(LockCallback<T> callBack, String lockKey, Integer expireSeconds) { /** * redisLock其实就是规定时间内一直循环网络请求redis资源; * 为了防止并发的时候,太多线程循环抢redis锁;导致资源不足,需要对字符串加锁; * 字符加锁,需要使用intern;而1.7以后,String.intern的对象会一直存在jvm内存里面,大量使用会导致内存异常;替代方案就是使用guava的Interners.newWeakInterner()代替String.intern; * synchronized不支持过期,如果需要加过期时间,则使用ReentrantLock 的 tryLock(long timeout, TimeUnit unit)方法。 */ synchronized (pool.intern(lockKey)) { RedisLock lock = new RedisLock(lockKey); try { if(lock.lock(expireSeconds)){ return callBack.exec(); }else{ throw new RedisLockTimeException(lockKey,expireSeconds); } } catch (Exception e) { throw new RedisExecption(e); } finally { lock.unlock(); } } } /** * 获取超时时间 * @param key * @return 单位秒 */ public static Long getExpire(String key){ byte[] rawKey = RedisHelper.serialize(key); return (Long) redisTemplate.execute(connection -> connection.ttl(rawKey), true); } /** * 设置超时时间 * @param key * @param seconds 单位秒 * @return */ public static Boolean expire(String key,Integer seconds){ byte[] rawKey = RedisHelper.serialize(key); return (Boolean) redisTemplate.execute(connection -> connection.expire(rawKey,seconds), true); } /** * 尝试加锁并执行操作 如果加锁失败直接返回 如果加锁成功,则指定锁时间 * * @param action * 待执行的动作 * @param key * 加锁的key * @param expireSeconds * 锁超时时间,单位秒 * @param <T> * 泛型参数 * @return */ public static <T> T mutexExec(LockCallback<T> action, String key, int expireSeconds) { if (setnx(key + IGNORE_EXEC_KEY, "TRUE", expireSeconds)) { try { return action.exec(); } catch (Exception e) { throw new RedisExecption("ignoreExec error", e); } finally { del(key + IGNORE_EXEC_KEY); } } return null; } /** * 设置值成功返回true,key已存在则设置值失败,返回false * @param key * @param value * @param expireSeconds,设置值过期时间,即使设置值失败,也会一直覆盖过期是时间 * @return * @throws RedisExecption */ @SuppressWarnings({ "unchecked" }) public static boolean setnx(final String key, final Object value, final Integer expireSeconds) throws RedisExecption { if (StringUtils.isEmpty(key) || value == null) { throw new RedisExecption("key或value不能为空"); } return (Boolean) redisTemplate.execute(connection-> { try { Boolean result = false; connection.multi(); setnx(key, value, expireSeconds, connection); List<Object> list = connection.exec(); if (CollectionUtils.isNotEmpty(list)) { result = (Boolean) list.get(0); } return result; } catch (Exception e) { // connection.discard(); throw new RuntimeException(e); } }, true); } /** * 设置值成功返回true,key已存在则设置值失败,返回false * @param key * @param value * @return * @throws RedisExecption */ @SuppressWarnings({ "unchecked" }) public static boolean setnx(final String key, final Object value) throws RedisExecption { if (StringUtils.isEmpty(key) || value == null) { throw new RedisExecption("key或value不能为空"); } return (Boolean) redisTemplate.execute(connection-> { try { Boolean result = false; connection.multi(); setnx(key, value, -1, connection); List<Object> list = connection.exec(); if (CollectionUtils.isNotEmpty(list)) { result = (Boolean) list.get(0); } return result; } catch (Exception e) { // connection.discard(); throw new RuntimeException(e); } }, true); } /** * 事务执行,不返回 * @param key * @param value * @param expireSeconds * @param connection */ public static void setnx(final String key, final Object value, final Integer expireSeconds, final RedisConnection connection) { byte[] byteKey = serialize(key); connection.setNX(byteKey, serialize(value)); if (NumUtil.intValue(expireSeconds) != -1) { connection.expire(byteKey, expireSeconds); } } /** * 删除键 * @param key * @return * @throws RedisExecption */ @SuppressWarnings("unchecked") public static Long del(final String key) throws RedisExecption { return (Long) redisTemplate.execute(connection-> del(key, connection), true); } public static Long del(final String key, final RedisConnection connection) { try { return connection.del(serialize(key)); } catch (Exception e) { throw new RuntimeException(e); } } /** * 序列化 * @param value * @return */ public static byte[] serialize(Object value) { String valStr = getJsonStr(value); return redisTemplate.getStringSerializer().serialize(valStr); } /** * 返序列化 * @param bytes * @param clz * @return */ public static <T> T unserialize(byte[] bytes, Class<T> clz) { // String valStr = getJsonStr); Object json = redisTemplate.getStringSerializer().deserialize(bytes); if (json == null) { return null; } if (clz == null || clz.getName().equals("java.lang.String")) { return (T) json; } else { return gson.fromJson(json.toString(), clz); } } /** * private static java.lang.reflect.Type imgType = new TypeToken<ArrayList * <T>>() {}.getType(); * * @param bytes * @param type * @return */ public static <T> T unserialize(byte[] bytes, Type type) { // String valStr = getJsonStr); Object json = redisTemplate.getStringSerializer().deserialize(bytes); if (json == null) { return null; } if (type == null) { return (T) json; } else { return gson.fromJson(json.toString(), type); } } private static String getJsonStr(Object value) { String valStr = null; if (value == null) { valStr = null; } if (value instanceof String) { valStr = value.toString(); } else if (value instanceof Number) { valStr = value + ""; } else { valStr = gson.toJson(value); } return valStr; } @SuppressWarnings("rawtypes") public static RedisTemplate getRedisTemplate() { return redisTemplate; } @SuppressWarnings({ "rawtypes", "static-access" }) public void setRedisTemplate(RedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; } }
redis异常处理类
package com.framework.cache.execption; import java.io.PrintWriter; import java.io.StringWriter; /** * redis异常类,redis执行异常报错 * @author lyq * */ public class RedisExecption extends RuntimeException { private static final long serialVersionUID = 155L; //private Logger log = Logger.getLogger(this.getClass()); public RedisExecption() { } public RedisExecption(String message) { super(message); } public RedisExecption(Throwable cause) { super(cause); } public RedisExecption(String message, Throwable cause) { super(message, cause); } /** * 以字符串形式返回异常堆栈信息 * @param e * @return 异常堆栈信息字符串 */ public static String getStackTrace(Exception e) { StringWriter writer = new StringWriter(); e.printStackTrace(new PrintWriter(writer,true)); return writer.toString(); } }
分布式锁超时异常
package com.framework.cache.execption; /** * 分布式锁超时报错 * @author lyq * */ public class RedisLockTimeException extends RedisExecption { /** * 超时报错 * @param expireSeconds 单位秒 */ private static final long serialVersionUID = 1L; public RedisLockTimeException(String lockKey,Integer expireSeconds) { super(String.format("分布式锁 ‘%s’等待超时: %ss",lockKey,expireSeconds)); } }
分布式锁测试类
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import com.framework.cache.RedisHelper; import redis.clients.jedis.JedisPoolConfig; public class RedisTest { public static void main(String[] args) { RedisTemplate t = new RedisTemplate(); // InputStream is = // RedisHelper.class.getClassLoader().getResourceAsStream("redis.properties"); String hostname = ""; String password = ""; String port = ""; // try { // prop.load(is); // } catch (IOException e) { // // TODO Auto-generated catch block // e.printStackTrace(); // } /*** 热备start ***/ // String sentinelHost = "localhost"; // int sentinelport = 26379; // RedisSentinelConfiguration sentinelConfiguration = new // RedisSentinelConfiguration(); // sentinelConfiguration.setMaster("mymaster"); // sentinelConfiguration.sentinel(sentinelHost, sentinelport); // JedisConnectionFactory connectionFactory = new // JedisConnectionFactory(sentinelConfiguration); /*** 热备end ***/ /*** 单机start ***/ // hostname = "120.25.226.230"; hostname = "127.0.0.1"; password = ""; port = "6379"; JedisConnectionFactory connectionFactory = new JedisConnectionFactory(); connectionFactory.setPassword(password); connectionFactory.setDatabase(5); connectionFactory.setHostName(hostname); connectionFactory.setPort(Integer.parseInt(port)); /*** 单机end ***/ JedisPoolConfig config = new redis.clients.jedis.JedisPoolConfig(); config.setMaxTotal(5000); config.setMaxIdle(200); config.setMaxWaitMillis(5000); config.setTestOnBorrow(true); connectionFactory.afterPropertiesSet(); t.setConnectionFactory(connectionFactory); connectionFactory.setPoolConfig(config); t.afterPropertiesSet(); RedisHelper.redisTemplate = t; /************************************ * 分布式锁测试 start *****************************************/ ThreadPoolExecutor executor = null; // 有限队列,缓存30个请求等待,线程池默认策略,缓存超出上限报错 BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(30); // 初始化30个线程,最多30个线程 executor = new ThreadPoolExecutor(30, 30, 10, TimeUnit.SECONDS, workQueue, new ThreadFactory() { public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("redis-lock-thread"); return thread; } }); CyclicBarrier barrier = new CyclicBarrier(30); for (Integer i = 0; i < 30; i++) { final int _i = i; executor.execute(() -> { try { barrier.await(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } //锁等待10秒超时 RedisHelper.lockExec(() -> { System.out.println(_i + "====="); try { Thread.sleep(2000); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } return 2; }, "test",10); }); } /************************************ * 分布式锁测试 end *****************************************/ } }
相关推荐
Java基于Redis实现分布式锁代码实例 分布式锁的必要性 在多线程环境中,资源竞争是一个常见的问题。例如,在一个简单的用户操作中,一个线程修改用户状态,首先在内存中读取用户状态,然后在内存中进行修改,然后...
以下是一个简单的C# Redis分布式锁实现: ```csharp public bool TryAcquireLock(string lockKey, int lockTimeout) { var redis = ConnectionMultiplexer.Connect("localhost:6379"); var db = redis.Get...
现在很多项目单机版已经不满足了,分布式变得越受欢迎,同时也带来很多问题,分布式锁也变得没那么容易实现,分享一个redis分布式锁工具类,里面的加锁采用lua脚本(脚本比较简单,采用java代码实现,无须外部调用...
redis实现分布式锁(java/jedis),其中包含工具方法以及使用demo 本资源是利用java的jedis实现 redis实现分布式锁(java/jedis),其中包含工具方法以及使用demo 本资源是利用java的jedis实现
在实际项目中,为了提高代码的健壮性和可维护性,通常会封装一个 Redis 分布式锁的客户端库,例如 Java 中的 Jedis 或 lettuce 库。这些库提供了更高级别的接口,隐藏了底层的 Redis 操作细节,使开发者更容易地使用...
3. **简单的API**:Redis提供了丰富的命令集,如`SETNX`、`EXPIRE`等,方便实现锁的逻辑。 要在SpringBoot中集成Redis,你需要先在项目中添加对应的依赖。SpringBoot提供了对Redis的自动配置支持,只需在`pom.xml`...
本篇文章将详细探讨如何使用Redisson实现Redis分布式事务锁,以及在Spring Boot环境中如何进行集成。 首先,Redis作为一个内存数据库,其高速读写性能使其成为实现分布式锁的理想选择。分布式锁的主要作用是在多...
Java Redis分布式锁的正确实现方式详解 Java Redis分布式锁是指使用Redis实现的分布式锁机制,旨在解决分布式系统中的并发问题。分布式锁有三种实现方式:数据库乐观锁、基于Redis的分布式锁和基于ZooKeeper的...
本文将深入探讨如何使用Redis实现分布式锁,以及如何利用自旋式加锁和Lua脚本实现原子性解锁。 首先,我们来理解分布式锁的基本概念。分布式锁是在多节点之间共享资源时,用于协调各个节点的访问控制机制。在分布式...
Redis 分布式锁工具包是为了解决在分布式系统中实现锁的问题,它提供了一种纯Java的方式来调用,使得在传统的Spring工程中可以轻松集成和使用。在现代的高并发、分布式环境下,单机锁已经无法满足需求,因为它们不能...
在IT行业中,尤其是在高并发的电子商务系统中,"redis分布式锁实现抢单秒杀"是一个常见的挑战。这个场景模拟了多个用户同时参与秒杀活动,系统需要确保库存的准确性和抢单的公平性,避免超卖和数据不一致的问题。...
2. Redlock:由Redis作者Antirez提出的分布式锁算法,通过在多台Redis实例上实现锁来提高可用性和容错性。 综上所述,Redis是实现分布式锁的理想选择,其高效、原子性的特性使得在分布式系统中控制共享资源成为可能...
Java开发基于SpringBoot+WebSocket+Redis分布式即时通讯群聊系统。一个基于Spring Boot + WebSocket + Redis,可快速开发的分布式即时通讯群聊系统。适用于直播间聊天、游戏内聊天、客服聊天等临时性群聊场景。 ...
redis分布式锁的工具类,采用的是Lua代码的方式,保证了Java执行方法的原子性。
redis 分布式锁java 实现
java面试题Redis分布式锁+zk 分布式锁.md
Redis分布式锁工具包简化了这一过程,提供了封装好的API,使开发者能够快速、安全地在代码中实现锁的功能。 该工具包可能包含以下核心功能: 1. **锁的获取与释放**:工具包通常会提供一个`lock()`方法用于获取锁...
在学习Java多线程编程的时候,锁是一个很重要也很基础的概念,锁可以看成是多... 掌握redis分布式锁的实现原理 掌握redis分布式锁在微服务项目中的应用 掌握redis分布式锁常见的面试题 以下是课程部分讲义截图:
以下是一个简单的Java示例,展示了如何使用Jedis客户端库来实现Redis分布式锁。 ```java public class RedisLock { private JedisPool jedisPool; public RedisLock(JedisPool jedisPool) { this.jedisPool = ...