一。为何使用分布式锁?
当应用服务器数量超过1台,对相同数据的访问可能造成访问冲突(特别是写冲突)。单纯使用关系数据库比如MYSQL的应用可以借助于事务来实现锁,也可以使用版本号等实现乐观锁,最大的缺陷就是可用性降低(性能差)。对于GLEASY这种满足大规模并发访问请求的应用来说,使用数据库事务来实现数据库就有些捉襟见肘了。另外对于一些不依赖数据库的应用,比如分布式文件系统,为了保证同一文件在大量读写操作情况下的正确性,必须引入分布式锁来约束对同一文件的并发操作。
二。对分布式锁的要求
1.高性能(分布式锁不能成为系统的性能瓶颈)
2.避免死锁(拿到锁的结点挂掉不会导致其它结点永远无法继续)
3.支持锁重入
三。方案1,基于zookeeper的分布式锁
001 |
/** |
002 |
* DistributedLockUtil.java |
003 |
* 分布式锁工厂类,所有分布式请求都由该工厂类负责 |
004 |
**/ |
005 |
public class DistributedLockUtil {
|
006 |
private static Object schemeLock = new Object();
|
007 |
private static Object mutexLock = new Object();
|
008 |
private static Map<String,Object> mutexLockMap = new ConcurrentHashMap();
|
009 |
private String schema;
|
010 |
private Map<String,DistributedReentrantLock> cache = new ConcurrentHashMap<String,DistributedReentrantLock>();
|
011 |
|
012 |
private static Map<String,DistributedLockUtil> instances = new ConcurrentHashMap();
|
013 |
public static DistributedLockUtil getInstance(String schema){
|
014 |
DistributedLockUtil u = instances.get(schema);
|
015 |
if (u== null ){
|
016 |
synchronized (schemeLock){
|
017 |
u = instances.get(schema);
|
018 |
if (u == null ){
|
019 |
u = new DistributedLockUtil(schema);
|
020 |
instances.put(schema, u);
|
021 |
}
|
022 |
}
|
023 |
}
|
024 |
return u;
|
025 |
}
|
026 |
|
027 |
private DistributedLockUtil(String schema){
|
028 |
this .schema = schema;
|
029 |
}
|
030 |
|
031 |
private Object getMutex(String key){
|
032 |
Object mx = mutexLockMap.get(key);
|
033 |
if (mx == null ){
|
034 |
synchronized (mutexLock){
|
035 |
mx = mutexLockMap.get(key);
|
036 |
if (mx== null ){
|
037 |
mx = new Object();
|
038 |
mutexLockMap.put(key,mx);
|
039 |
}
|
040 |
}
|
041 |
}
|
042 |
return mx;
|
043 |
}
|
044 |
|
045 |
private DistributedReentrantLock getLock(String key){
|
046 |
DistributedReentrantLock lock = cache.get(key);
|
047 |
if (lock == null ){
|
048 |
synchronized (getMutex(key)){
|
049 |
lock = cache.get(key);
|
050 |
if (lock == null ){
|
051 |
lock = new DistributedReentrantLock(key,schema);
|
052 |
cache.put(key, lock);
|
053 |
}
|
054 |
}
|
055 |
}
|
056 |
return lock;
|
057 |
}
|
058 |
|
059 |
public void reset(){
|
060 |
for (String s : cache.keySet()){
|
061 |
getLock(s).unlock();
|
062 |
}
|
063 |
}
|
064 |
|
065 |
/**
|
066 |
* 尝试加锁
|
067 |
* 如果当前线程已经拥有该锁的话,直接返回false,表示不用再次加锁,此时不应该再调用unlock进行解锁
|
068 |
*
|
069 |
* @param key
|
070 |
* @return
|
071 |
* @throws InterruptedException
|
072 |
* @throws KeeperException
|
073 |
*/
|
074 |
public LockStat lock(String key) throws InterruptedException, KeeperException{
|
075 |
if (getLock(key).isOwner()){
|
076 |
return LockStat.NONEED;
|
077 |
}
|
078 |
getLock(key).lock();
|
079 |
return LockStat.SUCCESS;
|
080 |
}
|
081 |
|
082 |
public void clearLock(String key) throws InterruptedException, KeeperException{
|
083 |
synchronized (getMutex(key)){
|
084 |
DistributedReentrantLock l = cache.get(key);
|
085 |
l.clear();
|
086 |
cache.remove(key);
|
087 |
}
|
088 |
}
|
089 |
|
090 |
public void unlock(String key,LockStat stat) throws InterruptedException, KeeperException{
|
091 |
unlock(key,stat, false );
|
092 |
}
|
093 |
094 |
public void unlock(String key,LockStat stat, boolean keepalive) throws InterruptedException, KeeperException{
|
095 |
if (stat == null ) return ;
|
096 |
if (LockStat.SUCCESS.equals(stat)){
|
097 |
DistributedReentrantLock lock = getLock(key);
|
098 |
boolean hasWaiter = lock.unlock();
|
099 |
if (!hasWaiter && !keepalive){
|
100 |
synchronized (getMutex(key)){
|
101 |
lock.clear();
|
102 |
cache.remove(key);
|
103 |
}
|
104 |
}
|
105 |
}
|
106 |
}
|
107 |
|
108 |
public static enum LockStat{
|
109 |
NONEED,
|
110 |
SUCCESS
|
111 |
}
|
112 |
} |
001 |
/** |
002 |
*DistributedReentrantLock.java |
003 |
*本地线程之间锁争用,先使用虚拟机内部锁机制,减少结点间通信开销 |
004 |
*/ |
005 |
public class DistributedReentrantLock {
|
006 |
private static final Logger logger = Logger.getLogger(DistributedReentrantLock. class );
|
007 |
private ReentrantLock reentrantLock = new ReentrantLock();
|
008 |
009 |
private WriteLock writeLock;
|
010 |
private long timeout = 3 * 60 * 1000 ;
|
011 |
|
012 |
private final Object mutex = new Object();
|
013 |
private String dir;
|
014 |
private String schema;
|
015 |
|
016 |
private final ExitListener exitListener = new ExitListener(){
|
017 |
@Override
|
018 |
public void execute() {
|
019 |
initWriteLock();
|
020 |
}
|
021 |
};
|
022 |
|
023 |
private synchronized void initWriteLock(){
|
024 |
logger.debug( "初始化writeLock" );
|
025 |
writeLock = new WriteLock(dir, new LockListener(){
|
026 |
027 |
@Override
|
028 |
public void lockAcquired() {
|
029 |
synchronized (mutex){
|
030 |
mutex.notify();
|
031 |
}
|
032 |
}
|
033 |
@Override
|
034 |
public void lockReleased() {
|
035 |
}
|
036 |
|
037 |
},schema);
|
038 |
|
039 |
if (writeLock != null && writeLock.zk != null ){
|
040 |
writeLock.zk.addExitListener(exitListener);
|
041 |
}
|
042 |
|
043 |
synchronized (mutex){
|
044 |
mutex.notify();
|
045 |
}
|
046 |
}
|
047 |
|
048 |
public DistributedReentrantLock(String dir,String schema) {
|
049 |
this .dir = dir;
|
050 |
this .schema = schema;
|
051 |
initWriteLock();
|
052 |
}
|
053 |
054 |
public void lock( long timeout) throws InterruptedException, KeeperException {
|
055 |
reentrantLock.lock(); //多线程竞争时,先拿到第一层锁
|
056 |
try {
|
057 |
boolean res = writeLock.trylock();
|
058 |
if (!res){
|
059 |
synchronized (mutex){
|
060 |
mutex.wait(timeout);
|
061 |
}
|
062 |
if (writeLock == null || !writeLock.isOwner()){
|
063 |
throw new InterruptedException( "锁超时" );
|
064 |
}
|
065 |
}
|
066 |
} catch (InterruptedException e){
|
067 |
reentrantLock.unlock();
|
068 |
throw e;
|
069 |
} catch (KeeperException e){
|
070 |
reentrantLock.unlock();
|
071 |
throw e;
|
072 |
}
|
073 |
}
|
074 |
|
075 |
public void lock() throws InterruptedException, KeeperException {
|
076 |
lock(timeout);
|
077 |
}
|
078 |
079 |
public void destroy() throws KeeperException {
|
080 |
writeLock.unlock();
|
081 |
}
|
082 |
|
083 |
084 |
public boolean unlock(){
|
085 |
if (!isOwner()) return false ;
|
086 |
try {
|
087 |
writeLock.unlock();
|
088 |
reentrantLock.unlock(); //多线程竞争时,释放最外层锁
|
089 |
} catch (RuntimeException e){
|
090 |
reentrantLock.unlock(); //多线程竞争时,释放最外层锁
|
091 |
throw e;
|
092 |
}
|
093 |
|
094 |
return reentrantLock.hasQueuedThreads();
|
095 |
}
|
096 |
097 |
098 |
099 |
public boolean isOwner() {
|
100 |
return reentrantLock.isHeldByCurrentThread() && writeLock.isOwner();
|
101 |
}
|
102 |
103 |
public void clear() {
|
104 |
writeLock.clear();
|
105 |
}
|
106 |
107 |
} |
001 |
/** |
002 |
*WriteLock.java |
003 |
*基于zk的锁实现 |
004 |
*一个最简单的场景如下: |
005 |
*1.结点A请求加锁,在特定路径下注册自己(会话自增结点),得到一个ID号1 |
006 |
*2.结点B请求加锁,在特定路径下注册自己(会话自增结点),得到一个ID号2 |
007 |
*3.结点A获取所有结点ID,判断出来自己是最小结点号,于是获得锁 |
008 |
*4.结点B获取所有结点ID,判断出来自己不是最小结点,于是监听小于自己的最大结点(结点A)变更事件 |
009 |
*5.结点A拿到锁,处理业务,处理完,释放锁(删除自己) |
010 |
*6.结点B收到结点A变更事件,判断出来自己已经是最小结点号,于是获得锁。 |
011 |
*/ |
012 |
public class WriteLock extends ZkPrimative {
|
013 |
private static final Logger LOG = Logger.getLogger(WriteLock. class );
|
014 |
015 |
private final String dir;
|
016 |
private String id;
|
017 |
private LockNode idName;
|
018 |
private String ownerId;
|
019 |
private String lastChildId;
|
020 |
private byte [] data = { 0x12 , 0x34 };
|
021 |
private LockListener callback;
|
022 |
|
023 |
public WriteLock(String dir,String schema) {
|
024 |
super (schema, true );
|
025 |
this .dir = dir;
|
026 |
}
|
027 |
|
028 |
public WriteLock(String dir,LockListener callback,String schema) {
|
029 |
this (dir,schema);
|
030 |
<a href= "http://www.nbso.ca/" >nbso online casino reviews</a> this .callback = callback;
|
031 |
}
|
032 |
033 |
public LockListener getLockListener() {
|
034 |
return this .callback;
|
035 |
}
|
036 |
|
037 |
public void setLockListener(LockListener callback) {
|
038 |
this .callback = callback;
|
039 |
}
|
040 |
041 |
public synchronized void unlock() throws RuntimeException {
|
042 |
if (zk == null || zk.isClosed()){
|
043 |
return ;
|
044 |
}
|
045 |
if (id != null ) {
|
046 |
try {
|
047 |
zk.delete(id, - 1 );
|
048 |
} catch (InterruptedException e) {
|
049 |
LOG.warn( "Caught: " e, e);
|
050 |
//set that we have been interrupted.
|
051 |
Thread.currentThread().interrupt();
|
052 |
} catch (KeeperException.NoNodeException e) {
|
053 |
// do nothing
|
054 |
} catch (KeeperException e) {
|
055 |
LOG.warn( "Caught: " e, e);
|
056 |
throw (RuntimeException) new RuntimeException(e.getMessage()).
|
057 |
initCause(e);
|
058 |
} finally {
|
059 |
if (callback != null ) {
|
060 |
callback.lockReleased();
|
061 |
}
|
062 |
id = null ;
|
063 |
}
|
064 |
}
|
065 |
}
|
066 |
|
067 |
private class LockWatcher implements Watcher {
|
068 |
public void process(WatchedEvent event) {
|
069 |
LOG.debug( "Watcher fired on path: " event.getPath() " state: "
|
070 |
event.getState() " type " event.getType());
|
071 |
try {
|
072 |
trylock();
|
073 |
} catch (Exception e) {
|
074 |
LOG.warn( "Failed to acquire lock: " e, e);
|
075 |
}
|
076 |
}
|
077 |
}
|
078 |
|
079 |
private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir)
|
080 |
throws KeeperException, InterruptedException {
|
081 |
List<String> names = zookeeper.getChildren(dir, false );
|
082 |
for (String name : names) {
|
083 |
if (name.startsWith(prefix)) {
|
084 |
id = dir "/" name;
|
085 |
if (LOG.isDebugEnabled()) {
|
086 |
LOG.debug( "Found id created last time: " id);
|
087 |
}
|
088 |
break ;
|
089 |
}
|
090 |
}
|
091 |
if (id == null ) {
|
092 |
id = zookeeper.create(dir "/" prefix, data,
|
093 |
acl, EPHEMERAL_SEQUENTIAL);
|
094 |
095 |
if (LOG.isDebugEnabled()) {
|
096 |
LOG.debug( "Created id: " id);
|
097 |
}
|
098 |
}
|
099 |
100 |
}
|
101 |
102 |
public void clear() {
|
103 |
if (zk == null || zk.isClosed()){
|
104 |
return ;
|
105 |
}
|
106 |
try {
|
107 |
zk.delete(dir, - 1 );
|
108 |
} catch (Exception e) {
|
109 |
LOG.error( "clear error: " e,e);
|
110 |
}
|
111 |
}
|
112 |
|
113 |
public synchronized boolean trylock() throws KeeperException, InterruptedException {
|
114 |
if (zk == null ){
|
115 |
LOG.info( "zk 是空" );
|
116 |
return false ;
|
117 |
}
|
118 |
if (zk.isClosed()) {
|
119 |
LOG.info( "zk 已经关闭" );
|
120 |
return false ;
|
121 |
}
|
122 |
ensurePathExists(dir);
|
123 |
|
124 |
LOG.debug( "id:" id);
|
125 |
do {
|
126 |
if (id == null ) {
|
127 |
long sessionId = zk.getSessionId();
|
128 |
String prefix = "x-" sessionId "-" ;
|
129 |
idName = new LockNode(id);
|
130 |
LOG.debug( "idName:" idName);
|
131 |
}
|
132 |
if (id != null ) {
|
133 |
List<String> names = zk.getChildren(dir, false );
|
134 |
if (names.isEmpty()) {
|
135 |
LOG.warn( "No children in: " dir " when we've just "
|
136 |
"created one! Lets recreate it..." );
|
137 |
id = null ;
|
138 |
} else {
|
139 |
SortedSet<LockNode> sortedNames = new TreeSet<LockNode>();
|
140 |
for (String name : names) {
|
141 |
sortedNames.add( new LockNode(dir "/" name));
|
142 |
}
|
143 |
ownerId = sortedNames.first().getName();
|
144 |
LOG.debug( "all:" sortedNames);
|
145 |
SortedSet<LockNode> lessThanMe = sortedNames.headSet(idName);
|
146 |
LOG.debug( "less than me:" lessThanMe);
|
147 |
if (!lessThanMe.isEmpty()) {
|
148 |
LockNode lastChildName = lessThanMe.last();
|
149 |
lastChildId = lastChildName.getName();
|
150 |
if (LOG.isDebugEnabled()) {
|
151 |
LOG.debug( "watching less than me node: " lastChildId);
|
152 |
}
|
153 |
Stat stat = zk.exists(lastChildId, new LockWatcher());
|
154 |
if (stat != null ) {
|
155 |
return Boolean.FALSE;
|
156 |
} else {
|
157 |
LOG.warn( "Could not find the"
|
158 |
" stats for less than me: " lastChildName.getName());
|
159 |
}
|
160 |
} else {
|
161 |
if (isOwner()) {
|
162 |
if (callback != null ) {
|
163 |
callback.lockAcquired();
|
164 |
}
|
165 |
return Boolean.TRUE;
|
166 |
}
|
167 |
}
|
168 |
}
|
169 |
}
|
170 |
}
|
171 |
while (id == null );
|
172 |
return Boolean.FALSE;
|
173 |
}
|
174 |
175 |
public String getDir() {
|
176 |
return dir;
|
177 |
}
|
178 |
179 |
public boolean isOwner() {
|
180 |
return id != null && ownerId != null && id.equals(ownerId);
|
181 |
}
|
182 |
183 |
public String getId() {
|
184 |
return this .id;
|
185 |
}
|
186 |
} |
使用本方案实现的分布式锁,可以很好地解决锁重入的问题,而且使用会话结点来避免死锁;性能方面,根据笔者自测结果,加锁解锁各一次算是一个操作,本方案实现的分布式锁,TPS大概为2000-3000,性能比较一般;
相关推荐
Redis和Zookeeper作为两种常用的分布式锁实现方式,各有优缺点。在实际应用中,应根据业务需求和系统特性来选择适合的方案。同时,使用分布式锁时,必须考虑锁的可扩展性、公平性、安全性和容错性,确保在分布式环境...
在分布式系统中,为了保证数据的一致性和安全性,分布式锁是一种常见的解决方案。本文将深入探讨如何使用Redisson和...提供的代码示例可以作为学习和参考的起点,帮助开发者更好地理解和应用这两种分布式锁的实现。
redis分布式锁的工具类,采用的是Lua代码的方式,保证了Java执行方法的原子性。
分布式锁是一种在分布式系统中实现同步访问同一资源的机制,特别是在多节点环境下,它能确保在任何时刻只有一个节点能够对共享资源进行操作。基于Netty实现的分布式锁,利用了Netty作为高效的异步事件驱动网络应用...
分布式锁是分布式系统中用于同步资源访问的一种机制,它能够保证在分布式部署的应用系统中,同一时刻只允许一个客户端对共享...在实际应用中,需要根据具体业务场景和需求,权衡利弊,选择最适合的分布式锁实现方案。
当前使用较多的分布式锁方案主要基于 Redis 和 ZooKeeper 提供的功能特性加以封装来实现的。 Redis 实现的锁服务: 加锁流程: * SET resource_name my_random_value NX PX max-lock-time 解锁流程: * if ...
Redisson的分布式锁实现更加完善,支持可重入锁、公平锁、读写锁,还具有锁自动续期功能,避免了因网络延迟导致的锁丢失。使用Redisson创建分布式锁只需几行代码,通过`RLock`接口的`lock()`和`unlock()`方法即可...
分布式锁是一种在分布式系统中实现同步的技术,它允许多个节点在同一时间访问共享资源。在大型分布式环境中,确保数据的一致性和正确性至关重要,这就是分布式锁发挥作用的地方。Zookeeper,一个由Apache开发的...
Redlock算法是一种更为高级的分布式锁解决方案,它可以解决锁提前释放的问题。Redlock算法通过在多个Redis实例上尝试获取锁,并根据多数投票原则来决定是否真正获得锁,从而提高了锁的可靠性。此外,Redlock还提供了...
分布式锁是分布式系统中一个非常重要的概念,通过不断的技术演进和完善,我们已经可以从最初的简单实现逐步过渡到更加成熟稳定的解决方案。无论是Redis还是ZooKeeper,它们都在各自的领域内发挥了重要作用,为企业级...
lock4j作为一款高性能的分布式锁实现,自v2.2.5版本以来,凭借其优秀的性能和稳定性,受到了广大开发者的青睐。本文将深入探讨lock4j的核心特性和应用场景,以期帮助读者理解并掌握这款分布式锁的使用。 一、lock4j...
Java中常见的分布式锁实现有基于Zookeeper、Redis以及数据库的解决方案。例如,使用Redis的`setnx`命令可以创建一个唯一键来实现锁,或者通过Zookeeper的临时节点来模拟锁的逻辑。 在使用多线程和分布式锁时,需要...
综上所述,根据实际应用场景的需求选择合适的分布式锁实现方案是非常重要的。例如,在对性能要求极高的场景下,可以选择基于缓存的实现;而在需要更高可靠性和一致性保障的场景下,则可以选择基于Zookeeper的实现。
在实际应用中,根据业务需求和系统架构,选择合适的分布式锁实现方案至关重要。例如,对于读多写少的场景,可以使用读写锁来提高读取性能;对于强一致性要求,可能需要使用更复杂的分布式锁策略,如两阶段锁或乐观锁...
1. 存储空间:分布式锁需要一个所有参与节点都能访问的共享存储空间。常见的实现包括数据库(如行锁、乐观锁)、缓存系统(如Redis、Tair、Memcached、MongoDB)以及专门的分布式协调服务(如Zookeeper)。这些存储...
掌握分布式锁和信号量不仅有助于解决现有业务中的技术难题,还能为未来可能出现的新需求提供灵活的解决方案。 ### 总结 总而言之,分布式锁与信号量是实现分布式系统中资源共享与访问控制不可或缺的技术手段。通过...
Redis和Zookeeper是两种常用的分布式协调服务,它们都可以用来实现分布式锁,但具体实现方式和特性有所不同。 **Redis分布式锁** Redis作为内存数据库,具有高性能、低延迟的特点,常被用作实现分布式锁。Redis...
Redis 分布式锁是解决分布式系统中多个服务并发访问共享资源问题的一种常见方案。在分布式环境中,传统的单机锁无法满足需求,因为它们不能确保跨多个服务器或进程的同步。Redis,作为一个高性能的键值存储系统,...
在实践中,ZooKeeper的分布式锁解决方案需要考虑的方面还包括异常处理(如网络分区)、锁的超时机制、公平性策略(确保等待时间最长的线程优先获得锁)以及锁的粒度(细粒度锁可减少锁竞争,但会增加系统复杂性)。...
基于Redis的分布式锁实现通常简单易用,但并非无懈可击。一个基本的实现方式包括以下两个关键点: 1. 加锁与解锁必须使用相同的标识,通常通过生成唯一的ID来实现。 2. 设置锁的过期时间,以防止资源被永久锁定。 ...