`

java实现的redis分布式锁

阅读更多

业务场景多了,就应该把场景代码工具化,减少重复代码;趁周六在这里总结一下java实现的redis分布式锁代码;

 

使用的技术点:

1、redis函数setnx;

2、redis监控函数watch;

3、String.intern在jvm的内存操作,和String.intern的替代方案:guava;

4、@FunctionalInterface函数接口和lambda表达式的应用。

5、RedisTemplate;尽量使用RedisTemplate操作redis;直接操作jedispool人多的时候会乱。

 

 

reidis锁操作类

 

package com.framework.cache;

import java.util.Random;

import com.framework.cache.execption.RedisExecption;

/**
 * redis锁操作类
 * @author lyq
 *
 */
public class RedisLock {
	public static final String LOCKED = "TRUE";
	public static final long MILLI_NANO_CONVERSION = 1000000L;
	public static final long DEFAULT_TIME_OUT = 1000L;
	public static final Random RANDOM = new Random();
	public static final int EXPIRE = 1200;
	private String key;
	private String lockId;

	private boolean locked = false;

	public RedisLock(String key) {
		lockId = System.currentTimeMillis()+"";
		this.key = String.format("lock:%s", key);
	}



	/**
	 *  加锁
	 * @param expireSeconds 超时时间,单位:秒
	 * @return 获取锁成功返回true;超时返回false;其它报异常
	 */
	public boolean lock(int expireSeconds) {

		long nano = System.nanoTime();
		long timeout=  expireSeconds * 1000000000L;
		try {
			// 当前任务超时的时候,可能会被其它任务抢到锁,该任务会误删其它任务的锁;所以加入判断。
			if (RedisHelper.getExpire(this.key) == -1) {
				RedisHelper.expire(this.key, expireSeconds);
			}
			while (System.nanoTime() - nano < timeout) {
				if (RedisHelper.setnx(this.key, lockId)) {
					RedisHelper.expire(this.key, expireSeconds);
					this.locked = true;
					return this.locked;
				}
				Thread.sleep(3L, RANDOM.nextInt(500));
			}
		} catch (Exception e) {
			throw new RedisExecption("Locking error", e);
		}
		return false;
	}

	public boolean lock() {
		return lock(2);
	}

	/**
	 * 解锁
	 */
	@SuppressWarnings("unchecked")
	public void unlock() {
		if (this.locked) {
			String _lockId = this.lockId;
			
			RedisHelper.getRedisTemplate().execute(connection->{
				byte[] keyByte = RedisHelper.serialize(key);
				//开启watch之后,如果key的值被修改,则事务失败,exec方法返回null
				connection.watch(keyByte);
				String startTime = RedisHelper.unserialize(connection.get(keyByte), String.class);
				if (_lockId.equals(startTime)) { // 当前任务超时的时候,可能会被其它任务抢到锁,该任务会误删其它任务的锁;所以加入判断。
					connection.multi();
					connection.del(keyByte);
					connection.exec();
				}
				connection.unwatch();
				return true;
			},true);
			
		}
	}
	
	public String getLockKey(){
		return this.key;
	}

}

 

 

 

 

redis对外帮助类

 

package com.framework.cache;

import java.lang.reflect.Type;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;

import com.framework.cache.execption.RedisExecption;
import com.framework.cache.execption.RedisLockTimeException;
import com.framework.cache.lock.LockCallback;
import com.framework.utils.NumUtil;
import com.google.common.collect.Interner;
import com.google.common.collect.Interners;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

import redis.clients.jedis.JedisPoolConfig;

/**
 * redis对外静态帮助类
 * @author lyq
 *
 */
public class RedisHelper {
	public static RedisTemplate redisTemplate = null;

	private static Interner<Object> pool = Interners.newWeakInterner();

	private static final Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd hh:mm:ss").create();

	private static final String IGNORE_EXEC_KEY = "_ignoreExec";

	/**
	 * 加锁执行
	 * @param callBack 回调函数
	 * @param lockKey 
	 * @param expireSeconds 执行超时时间,单位:秒
	 * @return 返回callback回调函数执行结果
	 */
	public static <T> T lockExec(LockCallback<T> callBack, String lockKey, Integer expireSeconds) {
		/**
		 * redisLock其实就是规定时间内一直循环网络请求redis资源;
		 * 为了防止并发的时候,太多线程循环抢redis锁;导致资源不足,需要对字符串加锁;
		 * 字符加锁,需要使用intern;而1.7以后,String.intern的对象会一直存在jvm内存里面,大量使用会导致内存异常;替代方案就是使用guava的Interners.newWeakInterner()代替String.intern;
		 * synchronized不支持过期,如果需要加过期时间,则使用ReentrantLock 的 tryLock(long timeout, TimeUnit unit)方法。
                 */
		synchronized (pool.intern(lockKey)) {
			RedisLock lock = new RedisLock(lockKey);
			try {
				if(lock.lock(expireSeconds)){
					return callBack.exec();
				}else{
					throw new RedisLockTimeException(lockKey,expireSeconds);
				}
			} catch (Exception e) {
				throw new RedisExecption(e);
			} finally {
				lock.unlock();
			}
		}
	}



	
	/**
	 * 获取超时时间
	 * @param key 
	 * @return 单位秒
	 */
	public static Long getExpire(String key){
		byte[] rawKey = RedisHelper.serialize(key);
		return (Long) redisTemplate.execute(connection -> connection.ttl(rawKey), true);
	}
	
	/**
	 * 设置超时时间
	 * @param key
	 * @param seconds 单位秒
	 * @return
	 */
	public static Boolean expire(String key,Integer seconds){
		byte[] rawKey = RedisHelper.serialize(key);
		return (Boolean) redisTemplate.execute(connection -> connection.expire(rawKey,seconds), true);
	}
	


	/**
	 * 尝试加锁并执行操作 如果加锁失败直接返回 如果加锁成功,则指定锁时间
	 *
	 * @param action
	 *            待执行的动作
	 * @param key
	 *            加锁的key
	 * @param expireSeconds
	 *            锁超时时间,单位秒
	 * @param <T>
	 *            泛型参数
	 * @return
	 */
	public static <T> T mutexExec(LockCallback<T> action, String key, int expireSeconds) {

		if (setnx(key + IGNORE_EXEC_KEY, "TRUE", expireSeconds)) {
			try {
				return action.exec();
			} catch (Exception e) {
				throw new RedisExecption("ignoreExec error", e);
			} finally {
				del(key + IGNORE_EXEC_KEY);
			}
		}
		return null;
	}



/**
 * 设置值成功返回true,key已存在则设置值失败,返回false
 * @param key
 * @param value
 * @param expireSeconds,设置值过期时间,即使设置值失败,也会一直覆盖过期是时间
 * @return
 * @throws RedisExecption
 */
	@SuppressWarnings({ "unchecked" })
	public static boolean setnx(final String key, final Object value, final Integer expireSeconds)
			throws RedisExecption {
		if (StringUtils.isEmpty(key) || value == null) {
			throw new RedisExecption("key或value不能为空");
		}
		return (Boolean) redisTemplate.execute(connection-> {
				try {
					Boolean result = false;
					connection.multi();
					setnx(key, value, expireSeconds, connection);
					List<Object> list = connection.exec();
					if (CollectionUtils.isNotEmpty(list)) {
						result = (Boolean) list.get(0);
					}
					return result;
				} catch (Exception e) {
					// connection.discard();
					throw new RuntimeException(e);
				}
			

		}, true);
	}

	/**
	 * 设置值成功返回true,key已存在则设置值失败,返回false
	 * @param key
	 * @param value
	 * @return
	 * @throws RedisExecption
	 */
	@SuppressWarnings({ "unchecked" })
	public static boolean setnx(final String key, final Object value) throws RedisExecption {
		if (StringUtils.isEmpty(key) || value == null) {
			throw new RedisExecption("key或value不能为空");
		}
		return (Boolean) redisTemplate.execute(connection-> {
				try {
					Boolean result = false;
					connection.multi();
					 setnx(key, value, -1, connection);
					List<Object> list = connection.exec();
					if (CollectionUtils.isNotEmpty(list)) {
						result = (Boolean) list.get(0);
					}
					return result;
				} catch (Exception e) {
					// connection.discard();
					throw new RuntimeException(e);
				}

		}, true);
	}
	/**
	 * 事务执行,不返回
	 * @param key
	 * @param value
	 * @param expireSeconds
	 * @param connection
	 */
	public static void setnx(final String key, final Object value, final Integer expireSeconds,
			final RedisConnection connection) {
		byte[] byteKey = serialize(key);

		connection.setNX(byteKey, serialize(value));
		if (NumUtil.intValue(expireSeconds) != -1) {
			connection.expire(byteKey, expireSeconds);
		}
	}

	/**
	 * 删除键
	 * @param key
	 * @return
	 * @throws RedisExecption
	 */
	@SuppressWarnings("unchecked")
	public static Long del(final String key) throws RedisExecption {
		return (Long) redisTemplate.execute(connection-> del(key, connection), true);
	}

	public static Long del(final String key, final RedisConnection connection) {
		try {
			return connection.del(serialize(key));
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}

	/**
	 * 序列化
	 * @param value
	 * @return
	 */
	public static byte[] serialize(Object value) {
		String valStr = getJsonStr(value);
		return redisTemplate.getStringSerializer().serialize(valStr);
	}

	/**
	 * 返序列化
	 * @param bytes
	 * @param clz
	 * @return
	 */
	public static <T> T unserialize(byte[] bytes, Class<T> clz) {
		// String valStr = getJsonStr);

		Object json = redisTemplate.getStringSerializer().deserialize(bytes);
		if (json == null) {
			return null;
		}
		if (clz == null || clz.getName().equals("java.lang.String")) {
			return (T) json;
		} else {
			return gson.fromJson(json.toString(), clz);
		}
	}

	/**
	 * private static java.lang.reflect.Type imgType = new TypeToken<ArrayList
	 * <T>>() {}.getType();
	 *
	 * @param bytes
	 * @param type
	 * @return
	 */
	public static <T> T unserialize(byte[] bytes, Type type) {
		// String valStr = getJsonStr);

		Object json = redisTemplate.getStringSerializer().deserialize(bytes);
		if (json == null) {
			return null;
		}
		if (type == null) {
			return (T) json;
		} else {
			return gson.fromJson(json.toString(), type);
		}
	}



	private static String getJsonStr(Object value) {
		String valStr = null;
		if (value == null) {
			valStr = null;
		}

		if (value instanceof String) {
			valStr = value.toString();
		} else if (value instanceof Number) {
			valStr = value + "";
		} else {
			valStr = gson.toJson(value);
		}
		return valStr;
	}

	@SuppressWarnings("rawtypes")
	public static RedisTemplate getRedisTemplate() {
		return redisTemplate;
	}

	@SuppressWarnings({ "rawtypes", "static-access" })
	public void setRedisTemplate(RedisTemplate redisTemplate) {
		this.redisTemplate = redisTemplate;
	}
	

}

 

 

 

 

 

redis异常处理类

 

package com.framework.cache.execption;

import java.io.PrintWriter;
import java.io.StringWriter;

/**
 * redis异常类,redis执行异常报错
 * @author lyq
 *
 */
public class RedisExecption extends RuntimeException {
private static final long serialVersionUID = 155L;
//private Logger log =  Logger.getLogger(this.getClass());
	public RedisExecption() {
	}

	public RedisExecption(String message) {
		super(message);
	}

	public RedisExecption(Throwable cause) {
		super(cause);
	}

	public RedisExecption(String message, Throwable cause) {
		super(message, cause);
	}
	
	/** 
	  * 以字符串形式返回异常堆栈信息 
	  * @param e 
	  * @return 异常堆栈信息字符串 
	  */ 
	public static String getStackTrace(Exception e) { 
	  StringWriter writer = new StringWriter(); 
	  e.printStackTrace(new PrintWriter(writer,true)); 
	  return writer.toString(); 
	} 
}

 

 

分布式锁超时异常

 

package com.framework.cache.execption;

/**
 * 分布式锁超时报错
 * @author lyq
 *
 */
public class RedisLockTimeException extends RedisExecption {

	/**
	 * 超时报错
	 * @param expireSeconds 单位秒
	 */
	private static final long serialVersionUID = 1L;

	public RedisLockTimeException(String lockKey,Integer expireSeconds) {
		super(String.format("分布式锁 ‘%s’等待超时: %ss",lockKey,expireSeconds));
	}
}

 

 分布式锁测试类

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;

import com.framework.cache.RedisHelper;

import redis.clients.jedis.JedisPoolConfig;

public class RedisTest {
	public static void main(String[] args) {
		RedisTemplate t = new RedisTemplate();

		// InputStream is =
		// RedisHelper.class.getClassLoader().getResourceAsStream("redis.properties");
		String hostname = "";
		String password = "";
		String port = "";
		// try {
		// prop.load(is);
		

		// } catch (IOException e) {
		// // TODO Auto-generated catch block
		// e.printStackTrace();
		// }

		/*** 热备start ***/
		// String sentinelHost = "localhost";
		// int sentinelport = 26379;
		// RedisSentinelConfiguration sentinelConfiguration = new
		// RedisSentinelConfiguration();
		// sentinelConfiguration.setMaster("mymaster");
		// sentinelConfiguration.sentinel(sentinelHost, sentinelport);
		// JedisConnectionFactory connectionFactory = new
		// JedisConnectionFactory(sentinelConfiguration);
		/*** 热备end ***/

		/*** 单机start ***/
		// hostname = "120.25.226.230";
		hostname = "127.0.0.1";
		password = "";
		
		port = "6379";
		JedisConnectionFactory connectionFactory = new JedisConnectionFactory();
		connectionFactory.setPassword(password);
		connectionFactory.setDatabase(5);
		connectionFactory.setHostName(hostname);
		connectionFactory.setPort(Integer.parseInt(port));
		/*** 单机end ***/
		JedisPoolConfig config = new redis.clients.jedis.JedisPoolConfig();
		config.setMaxTotal(5000);
		config.setMaxIdle(200);
		config.setMaxWaitMillis(5000);
		config.setTestOnBorrow(true);
		connectionFactory.afterPropertiesSet();
		t.setConnectionFactory(connectionFactory);
		connectionFactory.setPoolConfig(config);
		t.afterPropertiesSet();
		RedisHelper.redisTemplate = t;

	
		/************************************
		 * 分布式锁测试 start
		 *****************************************/

		ThreadPoolExecutor executor = null;
		// 有限队列,缓存30个请求等待,线程池默认策略,缓存超出上限报错
		BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(30);
		// 初始化30个线程,最多30个线程
		executor = new ThreadPoolExecutor(30, 30, 10, TimeUnit.SECONDS, workQueue, new ThreadFactory() {

			public Thread newThread(Runnable r) {
				Thread thread = new Thread(r);
				thread.setName("redis-lock-thread");
				return thread;
			}
		});

		
		CyclicBarrier barrier = new CyclicBarrier(30);
		for (Integer i = 0; i < 30; i++) {
			final int _i = i;
			executor.execute(() -> {
				try {
					barrier.await();
				} catch (Exception e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
                        //锁等待10秒超时
				RedisHelper.lockExec(() -> {
					System.out.println(_i + "=====");
					try {
						Thread.sleep(2000);
					} catch (Exception e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					return 2;
				}, "test",10);
			});

		}
		/************************************
		 * 分布式锁测试 end
		 *****************************************/

	
	}
}

 

 

分享到:
评论

相关推荐

    Java基于redis实现分布式锁代码实例

    Java基于Redis实现分布式锁代码实例 分布式锁的必要性 在多线程环境中,资源竞争是一个常见的问题。例如,在一个简单的用户操作中,一个线程修改用户状态,首先在内存中读取用户状态,然后在内存中进行修改,然后...

    C#实操控制并发之Lock和Redis分布式锁

    以下是一个简单的C# Redis分布式锁实现: ```csharp public bool TryAcquireLock(string lockKey, int lockTimeout) { var redis = ConnectionMultiplexer.Connect("localhost:6379"); var db = redis.Get...

    redis分布式锁工具类

    现在很多项目单机版已经不满足了,分布式变得越受欢迎,同时也带来很多问题,分布式锁也变得没那么容易实现,分享一个redis分布式锁工具类,里面的加锁采用lua脚本(脚本比较简单,采用java代码实现,无须外部调用...

    redis实现分布式锁(java/jedis)

    redis实现分布式锁(java/jedis),其中包含工具方法以及使用demo 本资源是利用java的jedis实现 redis实现分布式锁(java/jedis),其中包含工具方法以及使用demo 本资源是利用java的jedis实现

    redis分布式锁.zip

    在实际项目中,为了提高代码的健壮性和可维护性,通常会封装一个 Redis 分布式锁的客户端库,例如 Java 中的 Jedis 或 lettuce 库。这些库提供了更高级别的接口,隐藏了底层的 Redis 操作细节,使开发者更容易地使用...

    springboot基于redis分布式锁

    3. **简单的API**:Redis提供了丰富的命令集,如`SETNX`、`EXPIRE`等,方便实现锁的逻辑。 要在SpringBoot中集成Redis,你需要先在项目中添加对应的依赖。SpringBoot提供了对Redis的自动配置支持,只需在`pom.xml`...

    记录redisson实现redis分布式事务锁

    本篇文章将详细探讨如何使用Redisson实现Redis分布式事务锁,以及在Spring Boot环境中如何进行集成。 首先,Redis作为一个内存数据库,其高速读写性能使其成为实现分布式锁的理想选择。分布式锁的主要作用是在多...

    redis实现分布式锁,自旋式加锁,lua原子性解锁

    本文将深入探讨如何使用Redis实现分布式锁,以及如何利用自旋式加锁和Lua脚本实现原子性解锁。 首先,我们来理解分布式锁的基本概念。分布式锁是在多节点之间共享资源时,用于协调各个节点的访问控制机制。在分布式...

    Java Redis分布式锁的正确实现方式详解

    Java Redis分布式锁的正确实现方式详解 Java Redis分布式锁是指使用Redis实现的分布式锁机制,旨在解决分布式系统中的并发问题。分布式锁有三种实现方式:数据库乐观锁、基于Redis的分布式锁和基于ZooKeeper的...

    redis分布式锁工具包,提供纯Java方式调用,支持传统Spring工程.zip

    Redis 分布式锁工具包是为了解决在分布式系统中实现锁的问题,它提供了一种纯Java的方式来调用,使得在传统的Spring工程中可以轻松集成和使用。在现代的高并发、分布式环境下,单机锁已经无法满足需求,因为它们不能...

    redis分布式锁实现抢单秒杀

    在IT行业中,尤其是在高并发的电子商务系统中,"redis分布式锁实现抢单秒杀"是一个常见的挑战。这个场景模拟了多个用户同时参与秒杀活动,系统需要确保库存的准确性和抢单的公平性,避免超卖和数据不一致的问题。...

    用Redis实现分布式锁_redis_分布式_

    2. Redlock:由Redis作者Antirez提出的分布式锁算法,通过在多台Redis实例上实现锁来提高可用性和容错性。 综上所述,Redis是实现分布式锁的理想选择,其高效、原子性的特性使得在分布式系统中控制共享资源成为可能...

    java开发基于SpringBoot+WebSocket+Redis分布式即时通讯群聊系统.zip

    Java开发基于SpringBoot+WebSocket+Redis分布式即时通讯群聊系统。一个基于Spring Boot + WebSocket + Redis,可快速开发的分布式即时通讯群聊系统。适用于直播间聊天、游戏内聊天、客服聊天等临时性群聊场景。 ...

    redis分布式锁实现类

    redis分布式锁的工具类,采用的是Lua代码的方式,保证了Java执行方法的原子性。

    redis 分布式锁java 实现

    redis 分布式锁java 实现

    java面试题Redis分布式锁+zk 分布式锁.md

    java面试题Redis分布式锁+zk 分布式锁.md

    redis分布式锁工具包提供纯Java方式调用

    Redis分布式锁工具包简化了这一过程,提供了封装好的API,使开发者能够快速、安全地在代码中实现锁的功能。 该工具包可能包含以下核心功能: 1. **锁的获取与释放**:工具包通常会提供一个`lock()`方法用于获取锁...

    Redis实现分布式锁(附源码+讲义)

    在学习Java多线程编程的时候,锁是一个很重要也很基础的概念,锁可以看成是多... 掌握redis分布式锁的实现原理  掌握redis分布式锁在微服务项目中的应用  掌握redis分布式锁常见的面试题 以下是课程部分讲义截图:

    基于Redis方式实现分布式锁

    以下是一个简单的Java示例,展示了如何使用Jedis客户端库来实现Redis分布式锁。 ```java public class RedisLock { private JedisPool jedisPool; public RedisLock(JedisPool jedisPool) { this.jedisPool = ...

Global site tag (gtag.js) - Google Analytics