上一篇介绍了分布式锁的概念、作用、基本原理(http://guwq2014.iteye.com/blog/2365658),
这一篇看看如何使用redis实现一个分布式锁:
第一步:分布式锁实现类:
import redis.clients.jedis.ShardedJedis; import com.suning.framework.sedis.ShardedJedisAction; import com.suning.framework.sedis.impl.ShardedJedisClientImpl; /** * 基于redis实现的分布式锁 * * @author guweiqiang */ public class DistributedSedisLock { private ShardedJedisClientImpl jedisClient; // jedis client private String lockKey; // 锁的redis key private int expireMsecs = 60 * 1000; // 锁超时,防止线程在入锁以后,无限的执行等待 private int timeoutMsecs = 10 * 1000; // 锁等待,防止线程饥饿 private boolean locked = false;// 拿到锁的标示:true表示拿到了锁 private static final long DEFAULT_SLEEP_TIME = 100; // 线程睡眠时间100毫秒 /*********************构造方法 start************************************/ public DistributedSedisLock(ShardedJedisClientImpl jedisClient, String lockKey) { this.jedisClient = jedisClient; this.lockKey = lockKey; } public DistributedSedisLock(ShardedJedisClientImpl jedisClient, String lockKey, int timeoutMsecs) { this(jedisClient, lockKey); this.timeoutMsecs = timeoutMsecs; } public DistributedSedisLock(ShardedJedisClientImpl jedisClient, String lockKey, int timeoutMsecs, int expireMsecs) { this(jedisClient, lockKey, timeoutMsecs); this.expireMsecs = expireMsecs; } /*********************构造方法 end************************************/ public String getLockKey() { return lockKey; } /** * 判断是否拿到了锁(对外提供的获取锁的方法) * @return true:拿到了锁;false:没有拿到锁 * @throws InterruptedException */ public synchronized boolean acquire() throws InterruptedException { return acquire(jedisClient); } /** * 判断是否拿到了锁 * @param redisClient * @return true:拿到了锁;false:没有拿到锁 * @throws InterruptedException */ private synchronized boolean acquire(ShardedJedisClientImpl jedisClient) throws InterruptedException { int timeout = timeoutMsecs; while (timeout >= 0) { long expires = System.currentTimeMillis() + expireMsecs + 1; final String expiresStr = String.valueOf(expires); // 锁到期时间 // 加锁 Long setnxResult = jedisClient.execute(new ShardedJedisAction<Long>() { public Long doAction(ShardedJedis jedis) { return jedis.setnx(lockKey, expiresStr); } }); if (setnxResult!=null && setnxResult.intValue()==1) { // setnx返回1,表示设置成功 // lock acquired success locked = true; return true; } // 获取redis里的时间 String currentValueStr = jedisClient.execute(new ShardedJedisAction<String>() { public String doAction(ShardedJedis jedis) { return jedis.get(lockKey); } }); if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) { // 判断是否为空,不为空的情况下,如果被其他线程设置了值,则第二个条件判断是过不去的 // lock is expired String oldValueStr = jedisClient.execute(new ShardedJedisAction<String>() { public String doAction(ShardedJedis jedis) { return jedis.getSet(lockKey, expiresStr); } }); // 获取上一个锁到期时间,并设置现在的锁到期时间, // 只有一个线程才能获取上一个线程的设置时间,因为jedis.getSet是同步的(原子的) if (oldValueStr != null && oldValueStr.equals(currentValueStr)) { // 如过这个时候,多个线程恰好都到了这里,但是只有一个线程的设置值和当前值相同,他才有权利获取锁 // lock acquired locked = true; return true; } } timeout -= DEFAULT_SLEEP_TIME; Thread.sleep(DEFAULT_SLEEP_TIME); } return false; } /** * 释放锁(对外提供的释放锁的方法) */ public synchronized void release() { release(jedisClient); } /** * 释放锁 */ private synchronized void release(ShardedJedisClientImpl jedisClient) { if (locked) { jedisClient.execute(new ShardedJedisAction<Long>() { public Long doAction(ShardedJedis jedis) { return jedis.del(lockKey); } }); locked = false; } } }
第二步:对外暴露的使用工具类:
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.suning.framework.sedis.impl.ShardedJedisClientImpl; /** * 分布式锁使用工具类 * * @author guweiqiang */ public class DistributedLock { private static final Logger LOGGER = LoggerFactory.getLogger(DistributedLock.class.getName()); private DistributedSedisLock distributedSedisLock; // 分布式锁 private static ShardedJedisClientImpl jedisClient; // jedis client private String lockKey; // 锁的redis key private int expireMsecs; // 锁超时,防止线程在入锁以后,无限的执行等待 private int timeoutMsecs; // 锁等待,防止线程饥饿 public DistributedLock(String lockKey){ this(lockKey, 3000, 300000); } public DistributedLock(String lockKey, int timeoutMsecs, int expireMsecs){ this.lockKey = "YEB:BYUTIL:SHP:LOCK:" + lockKey; this.timeoutMsecs = timeoutMsecs; this.expireMsecs = expireMsecs; this.distributedSedisLock = new DistributedSedisLock(jedisClient, this.lockKey.intern(), timeoutMsecs, expireMsecs); } /** * 线程包装 */ public void wrap(Runnable runnable){ long begin = System.currentTimeMillis(); try { // timeout超时,等待入锁的时间,设置为3秒;expiration过期,锁存在的时间设置为5分钟 LOGGER.info("begin logck,lockKey={},timeoutMsecs={},expireMsecs={}", lockKey, timeoutMsecs, expireMsecs); if(distributedSedisLock.acquire()){ // 拿到了锁,执行线程任务 runnable.run(); } else { LOGGER.info("The time wait for lock more than [{}] ms ", timeoutMsecs); } } catch(Exception e){ LOGGER.error("acquire lock Exception ", e); } finally { LOGGER.info("[{}]cost={}", lockKey, System.currentTimeMillis() - begin); // 释放锁 if(distributedSedisLock!=null){ distributedSedisLock.release(); } } } /** * 初始化jedisClient * @param jedisClient */ public static synchronized void setShardedJedisClient(ShardedJedisClientImpl jedisClient) { DistributedLock.jedisClient = jedisClient; } }
在配置一个监听器,用来初始化jedis client(使用其他方式进行初始化也可以):
import javax.servlet.ServletContextEvent; import org.springframework.context.ApplicationContext; import org.springframework.web.context.ContextLoaderListener; import com.suning.framework.sedis.impl.ShardedJedisClientImpl; import com.suning.shp.utils.DistributedLock; /** * 启动监听器 * * @author guweiqiang */ public class SystemListener extends ContextLoaderListener { private ApplicationContext applicationContext; @Override public void contextInitialized(ServletContextEvent event) { super.contextInitialized(event); /************* spring *********/ applicationContext = super.getCurrentWebApplicationContext(); /**** redis 分布式锁 *****/ DistributedLock.setShardedJedisClient(applicationContext.getBean("jedisClient", ShardedJedisClientImpl.class)); } @Override public void contextDestroyed(ServletContextEvent event) { super.contextDestroyed(event); } }
监听器写好之后,需要在web.xml里配置一下:
<listener> <listener-class>com.suning.shp.listener.SystemListener</listener-class> </listener>
至此,一个基于redis实现的分布式锁就可以使用了,使用方法如下:
DistributedLock lock = new DistributedLock(key, 10000, 5000); try { lock.wrap(new Runnable(){ @Override public void run() { // 这里写需要加分布式锁的业务代码 } }); } catch (Exception e){ LOGGER.error("发生异常:" + e.getMessage(), e); }
相关推荐
以下是一个基于Redis的分布式限流实现,利用了Redis的数据结构——有序集合(Sorted Set)来创建令牌桶算法的一个变体: ```java public class RedisRateLimiter { private static final String BUCKET = "BUCKET...
本文将详细介绍基于Redis实现的分布式锁——“Tedu五阶段Redis分布式锁”。 #### 二、Redis基本概念及应用场景 Redis是一种开源的、高性能的键值存储系统,提供了多种数据结构的支持,如字符串(String)、哈希...
这里我们可以利用Redis的发布订阅功能,当数据库中的数据发生变更时,触发一个消息通知,Java应用程序监听这些消息并更新对应的Redis缓存。此外,还可以通过设置Redis的过期时间,使得缓存数据在一段时间后自动失效...
#### 二、Redis实现分布式锁的基本步骤 最初尝试使用Redis实现分布式锁时,通常采用`setnx`命令来获取锁。`setnx`命令是Redis提供的一个设置键值对的方法,如果键不存在则设置成功并返回1,如果键已存在则返回0。...
《Redis分布式——客户端库tinyredis1详解》 在分布式系统中,Redis作为一种高性能的键值存储服务,常被用来构建高效的数据存储和缓存解决方案。本文将重点介绍一种名为tinyredis的客户端库,它是实现Redis分布式...
本文将详细介绍如何部署一个基于 Redis 3.2 版本的分布式缓存集群。 ##### 3.1 下载 Redis 首先下载 Redis 安装包: ``` wget http://download.redis.io/releases/redis-3.2.3.tar.gz ``` 解压并移动到指定目录...
文档接下来介绍了Redis分布式锁的使用,Redis是广泛使用的一种内存数据结构存储系统,由于其执行速度快,且提供了原子操作的支持,因此常被用于实现分布式锁。setnx(set if not exists)指令被用来“占坑”,即尝试...
Redisson 是一个流行的 Java 客户端,用于与 Redis 数据库进行交互,提供了丰富的数据结构支持,包括分布式锁。在分布式系统中,正确地管理锁是确保数据一致性的重要手段。Redisson 分布式锁提供了多种锁类型,如可...
Redis分布式锁是一种常用的解决方案。 ##### 4.1 SETNX命令 使用`SETNX`命令来实现分布式锁是最简单的方法之一。`SETNX`命令会在键不存在时将其设置为指定值,如果键已存在则不做任何操作。这种方式虽然简单,但...
Redis提供了一种机制——原子操作,如`SETNX`命令,用于实现分布式锁。`SETNX`(Set if Not eXists)命令只有在键不存在的情况下才会设置键值,如果键已存在,则返回0。这在解决并发问题时非常有用,例如在处理库存...
**Redis入门到精通系列之九:Redis实战(六)——基于Redis队列实现异步秒杀下单测试项目代码** ...在`redis_learning_dzdp`这个压缩包中,包含了实现这个系统的具体代码,你可以下载并学习其中的实现方式。
5. **发布订阅**:通过发布/订阅模式,Redis可以实现消息传递,是微服务架构中常见的一种通信方式。 6. **主从复制**:Redis支持主从复制,可以实现数据备份和负载均衡,提升系统的可用性和扩展性。 7. **Cluster...
"redis_netlock"库正是利用了Redis的特性,为Python应用程序提供了一种可靠的分布式锁解决方案。分布式锁是在多节点环境下确保同一时刻只有一个节点执行特定操作的关键工具,它可以避免并发问题,如数据不一致性和...
3. **分布式锁**:通过SetNX命令和Expire机制,可以实现分布式锁,确保多节点间的资源同步。 4. **计数器**:使用Incr/Decr命令,可以方便地实现访问统计、用户行为追踪等功能。 总结,Spring Data Redis是Java...
【标题】:“管理系统系列--【基于shiro的权限管理系统——分布式版】”是一个全面的教程,旨在介绍如何构建一个利用Spring、MyBatis、Redis和Shiro技术的分布式权限管理系统。这个项目旨在提供一个安全、高效且可...
本文将探讨一种基于Go语言实现的分布式消息队列——KiteQ,并深入剖析其设计理念和技术细节。 传统的RPC(Remote Procedure Call)调用模式虽然能够满足一些简单的服务间通信需求,但在复杂的微服务架构中却显得...
2. 滑动窗口算法:另一种可能的实现方式是滑动窗口限流,通过计算一段时间内请求的数量,当达到阈值时进行限流。滑动窗口可以更精确地控制瞬时流量。 六、集成与使用 在lightweight-rate-limiter-master中,包含了...
总结来说,这个解决方案巧妙地结合了Java编程、SQL查询、Redis缓存、分布式锁和异步处理技术,构建了一个高效且可靠的自动数据结转系统。在应对大数据量的场景时,这样的设计可以有效降低存储压力,同时保证了系统的...
《分布式对象存储——原理、架构及Go语言实现》这本书可能会涵盖以下几个核心知识点: 1. **对象存储的基本概念**:包括对象、桶(Bucket)、元数据等基本元素的定义和作用。 2. **分布式存储架构**:例如GFS...