在分布式场景下,有很多种情况都需要实现最终一致性。在设计远程上下文的领域事件的时候,为了保证最终一致性,在通过领域事件进行通讯的方式中,可以共享存储(领域模型和消息的持久化数据源),或者做全局XA事务(两阶段提交,数据源可分开),也可以借助消息中间件(消费者处理需要能幂等)。通过Observer模式来发布领域事件可以提供很好的高并发性能,并且事件存储也能追溯更小粒度的事件数据,使各个应用系统拥有更好的自治性。
本文主要探讨了一种实现分布式最终一致性的解决方案——采用分布式锁。基于分布式锁的解决方案,比如zookeeper,redis都是相较于持久化(如利用InnoDB行锁,或事务,或version乐观锁)方案提供了高可用性,并且支持丰富化的使用场景。 本文通过Java版本的redis分布式锁开源框架——Redisson来解析一下实现分布式锁的思路。
Redis本身支持的事务
MULTI 、 EXEC 、 DISCARD 和 WATCH 是 Redis 事务的基础,MULTI, EXEC, DISCARD 和 WATCH 命令是Redis事务的基石。一个Redis事务允许一组Redis命令单步执行,并提供下面两个重要保证:一个事务中的所有命令串行执行;要么全部命令要么没有任何命令被处理。具体可参开这篇文章:http://blog.xiping.me/2010/12/transaction-in-redis.html。
事务可以一次执行多个命令, 并且带有以下两个重要的保证:
- 事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。
- 事务是一个原子操作:事务中的命令要么全部被执行,要么全部都不执行。
EXEC 命令负责触发并执行事务中的所有命令:
如果客户端在使用 MULTI 开启了一个事务之后,却因为断线而没有成功执行 EXEC ,那么事务中的所有命令都不会被执行。另一方面,如果客户端成功在开启事务之后执行 EXEC ,那么事务中的所有命令都会被执行。当使用 AOF 方式做持久化的时候, Redis 会使用单个 write(2) 命令将事务写入到磁盘中。然而,如果 Redis 服务器因为某些原因被管理员杀死,或者遇上某种硬件故障,那么可能只有部分事务命令会被成功写入到磁盘中。如果 Redis 在重新启动时发现 AOF 文件出了这样的问题,那么它会退出,并汇报一个错误。
自己实现Redis分布式锁
根据一些分布式锁相关的文档,开始动手根据redis提供的原子操作进行实现:
在其中主要用到了Redis中的两条原子命令:
1.SETNX key value
Available since 1.0.0. Time complexity: O(1) Set key to hold string value if key does not exist. In that case, it is equal to SET. When key already holds a value, no operation is performed. SETNX is short for "SET if Not eXists". Return value Integer reply, specifically: 1 if the key was set 0 if the key was not set
2.GETSET key value
起始版本:1.0.0 时间复杂度:O(1) 自动将key对应到value并且返回原来key对应的value,返回之前的旧值,如果之前Key不存在将返回nil。
摘自文章:http://www.jeffkit.info/2011/05/994/,其中有一节:GETSET的妙用,其中有一段:
上一个经验虽说可以解决这条数据该“插入还是更新”的问题,但需要知道当前操作是否针对某数据的首次操作的需求还不少。例如我的程序会在不同时间接收到同一条消息的不同分片信息包,我需要在收到该消息的首个信息包(发送是无序的)时做些特殊处理。 早些时候的做法是为消息在MongoDB维护一个状态对象,有信息包来的时候就走“上锁->检查消息状态->根据状态决定做不做特殊操作->解锁” 这个流程,虽然同事已经把锁的粒度控制得非常细了,但有锁的程序遇上多实例部署就歇了。 Redis的GETSET是解决这个问题的终极武器,只需要把当前信息包的唯一标识对指定的状态属性进行一次GETSET操作,再判断返回值是否为空则知道是否首次操作。GETSET替我们把两次读写的操作封装成了原子操作。
实现逻辑
获取redis针对某个锁的时候,需要根据lockKey区进行setnx(set not exist,顾名思义,如果key值为空,则正常设置,返回1,否则不会进行设置并返回0)操作,如果设置成功,表示已经获得锁,否则并没有获取锁。
如果没有获得锁,去Redis上拿到该key对应的值,在该key上我们存储一个时间戳(用毫秒表示,t1),为了避免死锁以及其他客户端占用该锁超过一定时间(5秒),使用该客户端当前时间戳,与存储的时间戳作比较。
如果没有超过该key的使用时限,返回false,表示其他人正在占用该key,不能强制使用;如果已经超过时限,那我们就可以进行解锁,使用我们的时间戳来代替该字段的值。使用getset来设置该字段的值,getset可以返回设置完成之前的值t2,如果t2!=t1,说明在getset之前已经有其他线程已经设置了该字段,事实上,我们还是没有获得该锁(已经被其他人抢占)。但是这会造成一定的数据错误,因为我们已经用getset设置了一个新的时间戳(并不是抢占该锁的客户端设置的时间戳),所幸这样操作的误差比较小可以忽略不计。
但是如果在setnx失败后,get该值却无法拿到该字段时,说明在我们操作之前该锁已经被释放,这个时候,最好的办法就是重新执行一遍setnx方法来获取其值以获得该锁。当然,这仍然可能失败,但失败的逻辑与之前的相同。虽然出现这种情况非常少见,但是为了避免每次都出现导致StackOverflowError的错误,设置调用层次。
public static final int ACQUIRE_LOCK_MAX_ATTEMPTS = 10; public static final long EXPIRE_TIME = 5000L; /** * 基于时间戳根据lockKey尝试获取锁,需要与releaseLock成对使用; * <p/>使用该方法的前提是必须要保证服务器之间时间同步 * <p/>如果持有锁的时间超过 #{EXPIRE_TIME},视为超时,其他客户端可以对其进行重新获取锁的操作 * * @param lockKey - 锁键值,即争夺的资源 * @return - 当成功获取锁后,返回true,否则返回false;如果没有获取锁,需要客户端进行轮询来尝试获取 */ public boolean acquireLock(String lockKey) { return acquireLock(lockKey, 0); } private boolean acquireLock(String lockKey, int depth) { long setnx = jedis.setnx(lockKey, String.valueOf(System.currentTimeMillis())); if (setnx == 1L) { //说明客户端已经获得锁 LOG.info(String.format("lock key : %s is acquired!", lockKey)); return true; } else { //此时,该lockKey已经被其他客户端加锁 String keyTimestamp = jedis.get(lockKey); if (keyTimestamp == null) { //如果该值已经被清空,就尝试去重新获取 if (depth == ACQUIRE_LOCK_MAX_ATTEMPTS) { //如果尝试次数超过10次,则不再尝试,直接返回false LOG.info(String.format("lock key : %s exceed max attemps: %s, quit!", lockKey, ACQUIRE_LOCK_MAX_ATTEMPTS)); return false; } return acquireLock(lockKey, depth + 1); } long intervalTime = System.currentTimeMillis() - Long.valueOf(keyTimestamp); if (intervalTime < EXPIRE_TIME) { //锁在一定时间内并没有超时,获取锁失败 LOG.info("lock key : %s acquire failed! other client persist this lock!", lockKey); return false; } else { //锁已经超时,尝试执行getset操作,设置当前时间戳 String getSetTimestamp = jedis.getSet(lockKey, String.valueOf(System.currentTimeMillis())); if (getSetTimestamp == null) { //考虑非常特殊的情况,有人释放了锁执行del操作 //此时get/set拿到的是nil值,说明已经获得了锁 LOG.info(String.format("lock key : %s is acquired! GETSET returns null!", lockKey)); return true; } if (!getSetTimestamp.equals(keyTimestamp)) { //在设置时,说明该锁已经被其他client加上 //此时会有对应的副作用,比如 LOG.info("lock key : %s acquire failed! other client acquire this lock!", lockKey); return false; } else { //锁已更新,可以正常返回 LOG.info(String.format("lock key : %s is acquired! origin client is time out!", lockKey)); return true; } } } }
释放锁的过程相对来说比较简单,但是我们采用这种方式的话,不能控制什么时候该释放锁,当然也可以采用回调的方式来实现默认释放掉锁,以便于控制释放过程。
/** * 释放lockKey对应的锁,注意需要与acquireLock成对使用 * <p/>不能随意对其他人使用的锁进行释放操作 * * @param lockKey * @return - 如果释放成功返回true */ public boolean releaseLock(String lockKey) { boolean result = jedis.del(lockKey) == 1L; LOG.info(String.format("lock key: %s is released!", lockKey)); return result; }
但这套毕竟是自己实现的,还会有很多漏洞,在github发现了一套开源的实现:https://github.com/mrniko/redisson/wiki,可以对其进行深入研究并应用到我们的系统中,这样系统会更加健壮。
在简单对其使用Jmeter进行性能测试,发现库存控制比较好,能够满足实际需求,但测试过程中也遇到了一些问题。
我们如果使用这种方式,能否让没有拿到锁的线程能够及时收到通知,重新连接?
对于使用的JedisClient时,不能每次都使用同一个实例,需要在必要的情况化对其执行回收。
四月 28, 2016 5:03:18 下午 org.apache.catalina.core.StandardWrapperValve invoke 严重: Servlet.service() for servlet [springmvc] in context with path [] threw exception [Request processing failed; nested exception is redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketException: Broken pipe] with root cause java.net.SocketException: Broken pipe at java.net.SocketOutputStream.socketWrite0(Native Method) at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109) at java.net.SocketOutputStream.write(SocketOutputStream.java:153) at redis.clients.util.RedisOutputStream.flushBuffer(RedisOutputStream.java:52) at redis.clients.util.RedisOutputStream.flush(RedisOutputStream.java:213) at redis.clients.jedis.Connection.flush(Connection.java:288) at redis.clients.jedis.Connection.getBinaryBulkReply(Connection.java:214) at redis.clients.jedis.Connection.getBulkReply(Connection.java:205) at redis.clients.jedis.Jedis.get(Jedis.java:101) at com.api.example.controller.UserController.decrElement(UserController.java:52) at sun.reflect.GeneratedMethodAccessor25.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221) at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:137) at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:110) at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandleMethod(RequestMappingHandlerAdapter.java:777) at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:706) at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85) at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:943) at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:877) at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:966) at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:857) at javax.servlet.http.HttpServlet.service(HttpServlet.java:620) at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:842) at javax.servlet.http.HttpServlet.service(HttpServlet.java:727) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:303) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208) at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208) at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:88) at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208) at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:220) at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:122) at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:501) at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:171) at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:102) at org.apache.catalina.valves.AccessLogValve.invoke(AccessLogValve.java:950) at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:116) at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:408) at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1040) at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:607) at org.apache.tomcat.util.net.JIoEndpoint$SocketProcessor.run(JIoEndpoint.java:314) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) at java.lang.Thread.run(Thread.java:745)
此时就需要使用JedisPool来获取资源,可以避免出现这个问题,注意在最后要回收资源:
Jedis jedis = jedisPool.getResource(); try { while (true) { String productCountString = jedis.get("product"); if (Integer.parseInt(productCountString) > 0) { if (acquireLock(jedis, "abc")) { int productCount = Integer.parseInt(jedis.get("product")); System.out.println(String.format("%tT --- Get product: %s", new Date(), productCount)); // System.out.println(productCount); jedis.decr("product"); releaseLock(jedis, "abc"); return "Success"; } Thread.sleep(1000L); } else { return "Over"; } } } finally { jedis.close(); }
相关推荐
Java基于Redis实现分布式锁代码实例 分布式锁的必要性 在多线程环境中,资源竞争是一个常见的问题。例如,在一个简单的用户操作中,一个线程修改用户状态,首先在内存中读取用户状态,然后在内存中进行修改,然后...
本篇重点介绍基于Redis实现分布式锁的方法。Redis提供了高效的键值存储能力,可以很好地支持分布式锁的需求。 ##### 实现原理 Redis实现分布式锁主要依赖三个基本操作:`SETNX`(Set If Not eXists)、`EXPIRE` 和...
本文将深入探讨如何使用Redis实现分布式锁,以及如何利用自旋式加锁和Lua脚本实现原子性解锁。 首先,我们来理解分布式锁的基本概念。分布式锁是在多节点之间共享资源时,用于协调各个节点的访问控制机制。在分布式...
本文讨论了分布式锁的实现方案,主要基于Redis实现分布式锁,以解决分布式系统中资源访问的同步问题。在分布式系统中,需要协调各个系统或主机之间的资源访问,以避免彼此干扰和保证一致性。分布式锁是控制分布式...
本文将深入探讨如何使用C++结合Redis实现分布式锁,并详细讲解Redis API在C++中的应用,以及如何处理与Boost库的集成。 首先,Redis是一个高性能的键值存储数据库,广泛用于缓存、消息队列、分布式锁等场景。分布式...
在实现基于Redis的分布式锁时,通常会用到两个命令:NX(Not eXists)和EX(过期时间)。NX命令确保只有在键不存在时才能被设置,这样可以保证锁的互斥性。EX命令则是用来设置键的过期时间,保证锁可以在一段时间后...
接下来,我们将实现基于Redis的分布式锁。分布式锁的主要目的是在多节点环境下确保同一时刻只有一个节点可以执行特定操作。以下是一个简单的分布式锁实现: ```java public class DistributedLock { private ...
本教程将深入探讨如何在SpringBoot应用中实现基于Redis的分布式锁。 首先,Redis之所以常被用作分布式锁的实现,是因为其具有以下优点: 1. **高可用性**:Redis支持主从复制,可以确保在单点故障时仍有服务可用。...
以下是一个简单的分布式锁实现的Lua脚本示例: ```lua if redis.call('setnx', KEYS[1], ARGV[1]) == 1 then redis.call('expire', KEYS[1], ARGV[2]) return true else return false end ``` 在Java中,我们...
总结,基于Go和Redis实现的分布式锁结合看门狗和红锁策略,能够在保证高可用性的同时,解决分布式环境中的并发控制问题。在实际应用中,还需要关注性能优化、错误处理和监控等方面,确保系统的稳定运行。
本文将深入探讨如何基于Redis实现分布式方法锁,并分析其工作原理、优势以及应用场景。 **1. Redis分布式锁的概念** Redis分布式锁是一种基于键值存储系统的锁机制,它通过在Redis中设置一个带有超时时间的key来...
3. **操作难度**:Redis实现分布式锁较为复杂,需要处理时间计算、锁续期等问题;而Zookeeper通过创建和监听临时节点,实现逻辑更为清晰简单。 总结来说,Redis和Zookeeper在实现分布式锁时各有优缺点。Redis适合对...
总结起来,基于 Redis 实现的分布式锁主要依赖 Redis 的原子操作,如 SETNX 和 Lua 脚本,确保加锁、解锁的互斥性。同时,通过设置过期时间和锁续约机制来防止死锁。在实际应用中,还需要考虑锁的可重入性、公平性、...
本篇文章将深入探讨如何利用Redis实现分布式锁以及如何构建一个基于Redis的任务队列。 分布式锁是解决多节点共享资源时防止数据冲突的关键机制。在Go中,我们通常通过与Redis交互来实现这一功能。Redis提供了`SETNX...
Redis是一种高性能的键值存储系统,常被用作数据缓存、消息中间件以及分布式锁等场景。由于其支持网络通信且操作速度快,非常适合用来存储和共享Session数据。 首先,让我们来了解如何配置基于Redis的分布式Session...
以下是一个基于Redis实现分布式锁的简要介绍: 1. **加锁实现**: 使用`SET`命令的`NX`和`EX`选项,确保在键不存在时设置,并设定超时时间。例如,使用Jedis客户端的`set`方法,代码如下: ```java private ...
“基于Redis和Lua脚本的分布式锁的实现” 基于Redis和Lua脚本的分布式锁的实现是使用Redis和Lua脚本来实现分布式锁的技术。分布式锁是指在分布式系统中,多个节点之间需要协调和同步的机制,以避免同时访问共享资源...
【Redis分布式锁实现原理】 Redis 分布式锁是一种在分布式系统中实现锁的常见方法,其核心在于利用Redis的原子性操作来保证并发环境下的安全。在本例中,使用了`Redis Template`,它是Spring Data Redis提供的一个...
在多节点项目中,这种基于Redis的分布式锁可以有效防止重复操作,例如防止同一订单的多次支付、防止同一用户被多次扣减积分等问题。同时,由于锁的获取和释放都是基于Redis操作,性能开销较小,适用于高并发场景。 ...