- 浏览: 483679 次
- 性别:
- 来自: 大连
文章分类
最新评论
-
龘龘龘:
TrueBrian 写道有个问题,Sample 1中,为了控制 ...
What's New on Java 7 Phaser -
龘龘龘:
楼主总结的不错。
What's New on Java 7 Phaser -
TrueBrian:
有个问题,Sample 1中,为了控制线程的启动时机,博主实际 ...
What's New on Java 7 Phaser -
liguanqun811:
不知道楼主是否对zookeeper实现的分布式锁进行过性能测试 ...
Distributed Lock -
hobitton:
mysql的get lock有版本限制,否则get lock可 ...
Distributed Lock
1 Overview
在分布式系统中,通常会避免使用分布式锁。然而在某些场景下,还是存在对分布式锁的需求。跟普通锁相比,分布式锁面需要对的问题更多,例如怎样保证某个进程在持有锁时意外终止之后,其它进程也能够正常地获得锁等等。笔者认为一个比较好的分布式锁实现是Terracotta,但是这不是本文的重点,感兴趣的读者可以参考笔者的Terracotta in Action 系列文章(http://whitesock.iteye.com/blog/351780 , http://whitesock.iteye.com/blog/352876 , http://whitesock.iteye.com/blog/354587 )。
除了Terracotta,不少其它开源项目也声称支持分布式锁,例如ZooKeeper,JGroups和Hazelcast等。在这些项目中,笔者倾向于使用ZooKeeper。ZooKeeper在其官方文档的ZooKeeper Recipes and Solutions章节中介绍了一个分布式锁的实现,本文主要对该版本进行了改良。关于Hazelcast,笔者不得不说,其官方文档文字不少但却苍白,很多内容介绍的都是浅尝辄止,难道是强迫开发人员去仔细地阅读源码,或者参加其价格不菲的培训?
2 Implementation
首先,笔者希望分布式锁能够支持Java并发包中的Lock接口,并且最好是可重入的。此外,在某个进程持有分布式锁的过程中,如果不能保证该锁不会被其它进程同时持有(例如网络故障),那么至少应该能够通知锁的持有者,以便其采取相应的应对措施。以下是笔者对分布式锁的定义:
import java.util.concurrent.locks.Lock; public interface DistributedLock extends Lock { Listener getListener(); void setListener(Listener listener); /** * */ interface Listener { void onAbort(DistributedLock lock, Exception e); } }
其中Listener接口的作用是,在无法排它独占该锁时进行回调。接下来是笔者的两个实现的共通父类。
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public abstract class AbstractDistributedLock implements DistributedLock { // protected volatile boolean verbose; protected volatile Listener listener; protected final ReentrantLock lock = new ReentrantLock(); // protected abstract void doLock(); protected abstract void doUnlock(); protected abstract boolean doTryLock(); protected abstract void doLockInterruptibly() throws InterruptedException; protected abstract boolean doTryLock(long timeout, TimeUnit unit) throws InterruptedException; /** * */ public boolean isVerbose() { return verbose; } public void setVerbose(boolean verbose) { this.verbose = verbose; } public boolean isLocked() { return this.lock.isLocked(); } public boolean isHeldByCurrentThread() { return this.lock.isHeldByCurrentThread(); } /** * */ @Override public Listener getListener() { return this.listener; } @Override public void setListener(Listener listener) { this.listener = listener; } /** * */ @Override public void lock() { // this.lock.lock(); if(this.lock.getHoldCount() > 1) return; // boolean succeed = false; try { doLock(); succeed = true; } finally { if(!succeed) { this.lock.unlock(); } } } @Override public void lockInterruptibly() throws InterruptedException { // this.lock.lockInterruptibly(); if(this.lock.getHoldCount() > 1) return; // boolean succeed = false; try { doLockInterruptibly(); succeed = true; } finally { if(!succeed) { this.lock.unlock(); } } } @Override public boolean tryLock() { // if(!this.lock.tryLock()) return false; if(this.lock.getHoldCount() > 1) return true; // boolean succeed = false; try { succeed = doTryLock(); } finally { if(!succeed) { this.lock.unlock(); } } return succeed; } @Override public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { // final long mark = System.nanoTime(); if(!this.lock.tryLock(timeout, unit)) return false; if(this.lock.getHoldCount() > 1) return true; // boolean succeed = false; try { timeout = TimeUnit.NANOSECONDS.convert(timeout, unit) - (System.nanoTime() - mark); if(timeout >= 0) { succeed = doTryLock(timeout, TimeUnit.NANOSECONDS); } } finally { if(!succeed) { this.lock.unlock(); } } return succeed; } @Override public void unlock() { // if(!this.lock.isHeldByCurrentThread()) return; if(this.lock.getHoldCount() > 1) return; // try { doUnlock(); } finally { this.lock.unlock(); } } @Override public Condition newCondition() { throw new UnsupportedOperationException(); } }
2.1 MySQL Named Lock
在讨论ZooKeeper的分布式锁实现之前,先介绍一下笔者基于MySQL Named Lock的一个实现。
import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import javax.sql.DataSource; import org.apache.commons.lang.builder.ToStringBuilder; import org.apache.commons.lang.builder.ToStringStyle; import org.apache.commons.lang.exception.NestableRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public final class MySQLNamedLock extends AbstractDistributedLock { // private static final Logger LOGGER = LoggerFactory.getLogger(MySQLNamedLock.class); // private String name; private DataSource dataSource; private long validationInterval = 1000L; private ScheduledExecutorService scheduler; private final AtomicReference<Connection> connection; private final AtomicReference<ScheduledFuture<?>> future; /** * */ public MySQLNamedLock() { this(null, null, null); } public MySQLNamedLock(String name, DataSource dataSource, ScheduledExecutorService scheduler) { this.name = name; this.scheduler = scheduler; this.dataSource = dataSource; this.connection = new AtomicReference<Connection>(); this.future = new AtomicReference<ScheduledFuture<?>>(); } /** * */ @Override public String toString() { return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) .append("name", this.name).toString(); } /** * */ public String getName() { return name; } public void setName(String name) { this.name = name; } public long getValidationInterval() { return validationInterval; } public void setValidationInterval(long interval) { this.validationInterval = interval; } public DataSource getDataSource() { return dataSource; } public void setDataSource(DataSource dataSource) { this.dataSource = dataSource; } public ScheduledExecutorService getScheduler() { return scheduler; } public void setScheduler(ScheduledExecutorService scheduler) { this.scheduler = scheduler; } /** * */ @Override protected void doLock() { doTryLock(Integer.MAX_VALUE, TimeUnit.SECONDS); } @Override protected void doLockInterruptibly() { doTryLock(Integer.MAX_VALUE, TimeUnit.SECONDS); } @Override protected boolean doTryLock() { return doTryLock(0, TimeUnit.SECONDS); } @Override protected boolean doTryLock(long timeout, TimeUnit unit) { // Integer r = null; ResultSet rs = null; PreparedStatement ps = null; try { this.connection.set(this.dataSource.getConnection()); ps = this.connection.get().prepareStatement("SELECT GET_LOCK(?, ?)"); ps.setString(1, this.name); ps.setInt(2, (int)TimeUnit.SECONDS.convert(timeout, unit)); rs = ps.executeQuery(); if(rs.next()) { r = rs.getInt(1); if(rs.wasNull()) r = null; } } catch(Exception e) { throw new NestableRuntimeException("failed to lock, name: " + this.name, e); } finally { JdbcUtils.closeQuietly(rs); JdbcUtils.closeQuietly(ps); } // final boolean succeed = (r != null && r == 1); if(succeed && this.listener != null) { final long interval = this.validationInterval; this.future.set(this.scheduler.scheduleWithFixedDelay(new ValidationTask(), interval, interval, TimeUnit.MILLISECONDS)); } // return succeed; } @Override protected void doUnlock() { // final ScheduledFuture<?> f = this.future.getAndSet(null); if(f != null) f.cancel(true); // Integer r = null; ResultSet rs = null; PreparedStatement ps = null; try { // ps = this.connection.get().prepareStatement("SELECT RELEASE_LOCK(?)"); ps.setString(1, this.name); rs = ps.executeQuery(); if(rs.next()) { r = rs.getInt(1); if(rs.wasNull()) r = null; } // if(r == null) { LOGGER.warn("lock does NOT exist, name: {}", this.name); } else if(r == 0) { LOGGER.warn("lock was NOT accquired by current thread, name: {}", this.name); } else { LOGGER.warn("failed to unlock, name: {}, result: {}", this.name, r); } } catch(Exception e) { throw new NestableRuntimeException("failed to unlock, name: " + this.name, e); } finally { JdbcUtils.closeQuietly(rs); JdbcUtils.closeQuietly(ps); JdbcUtils.closeQuietly(this.connection.getAndSet(null)); } } /** * */ private class ValidationTask implements Runnable { @Override public void run() { try { ((com.mysql.jdbc.Connection)connection.get()).ping(); } catch(Exception e) { // if(isLocked() && listener != null && connection.get() != null) { listener.onAbort(MySQLNamedLock.this, e); } // throw new NestableRuntimeException(e); // Note: suppress subsequent executions } } } }
需要注意的是,如果在该锁上注册了Listener,并且Connection在持有锁的过程中失效,那么该Listener会被回调。
2.2 ZooKeeper Lock
以下代码是笔者对ZooKeeper官方版本的改良:
import java.lang.management.ManagementFactory; import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang.builder.ToStringBuilder; import org.apache.commons.lang.builder.ToStringStyle; import org.apache.commons.lang.exception.NestableRuntimeException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public final class ZooKeeperLock extends AbstractDistributedLock { // private static final Logger LOGGER = LoggerFactory.getLogger(ZooKeeperLock.class); // private String directory; private ZooKeeper zookeeper; private final String processName; private final AtomicReference<ZooKeeperLocker> locker; /** * */ public ZooKeeperLock() { this(null, null); } public ZooKeeperLock(ZooKeeper zookeeper, String directory) { this.zookeeper = zookeeper; this.directory = directory; this.locker = new AtomicReference<ZooKeeperLocker>(); this.processName = ManagementFactory.getRuntimeMXBean().getName(); } /** * */ @Override public String toString() { return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) .append("directory", this.directory).toString(); } /** * */ public String getDirectory() { return directory; } public void setDirectory(String directory) { this.directory = directory; } public ZooKeeper getZookeeper() { return zookeeper; } public void setZookeeper(ZooKeeper zookeeper) { this.zookeeper = zookeeper; } /** * */ @Override protected void doLock() { doTryLock(Integer.MAX_VALUE, TimeUnit.SECONDS); } @Override protected void doLockInterruptibly() { doTryLock(Integer.MAX_VALUE, TimeUnit.SECONDS); } @Override protected boolean doTryLock() { return doTryLock(0, TimeUnit.SECONDS); } @Override protected boolean doTryLock(long timeout, TimeUnit unit) { try { this.locker.set(new ZooKeeperLocker()); return this.locker.get().lock(timeout, unit); } catch(Exception e) { throw new NestableRuntimeException("failed to lock, directory: " + this.directory, e); } } @Override protected void doUnlock() { try { this.locker.get().unlock(); } catch(Exception e) { throw new NestableRuntimeException("failed to unlock, directory: " + this.directory, e); } finally { this.locker.set(null); } } /** * */ private class ZooKeeperLocker implements Watcher { // private volatile String name; private volatile CountDownLatch latch; /** * */ @Override public void process(WatchedEvent event) { // if(this.latch != null) { this.latch.countDown(); } // if(isVerbose() && LOGGER.isInfoEnabled()) { LOGGER.info("received an event: {}", event); } } public boolean lock(long timeout, TimeUnit unit) throws Exception { boolean succeed = false; try { do { final long mark = System.nanoTime(); timeout = TimeUnit.NANOSECONDS.convert(timeout, unit); try { succeed = doLock(timeout, TimeUnit.NANOSECONDS); break; } catch (KeeperException.ConnectionLossException e) { timeout -= (System.nanoTime() - mark); if(isVerbose() && LOGGER.isInfoEnabled()) { LOGGER.info("connection was lost, directory: {}, name: {}, message: {}", new Object[]{directory, this.name, e.getMessage()}); } } } while(timeout > 0); } finally { if(!succeed) { // Unlock quietly try { unlock(); } catch(Exception e) { LOGGER.warn("failed to unlock, directory: " + directory + ", name: " + this.name, e); } } } return succeed; } public void unlock() throws Exception { try { zookeeper.delete(directory + "/" + this.name, -1); } catch (KeeperException.NoNodeException e) { LOGGER.warn("node does NOT exist, directory: {}, name: {}, message: {}", new Object[]{directory, this.name, e.getMessage()}); } finally { this.name = null; } } /** * */ private Boolean doLock(long timeout, TimeUnit unit) throws Exception { boolean succeed = false; do { // final long mark = System.nanoTime(); timeout = TimeUnit.NANOSECONDS.convert(timeout, unit); // if (this.name == null) { this.name = findOrCreateChild(); } // final List<String> children = zookeeper.getChildren(directory, false); if (children.isEmpty()) { this.name = null; LOGGER.warn("could not find any child, directory: {}, name: {}", new Object[]{directory, this.name}); } else { final SequenceComparator comparator = new SequenceComparator(); Collections.sort(children, comparator); final int index = Collections.binarySearch(children, this.name, comparator); if (index > 0) { // Not the first one this.latch = new CountDownLatch(1); final String previous = children.get(index - 1); final Stat stat = zookeeper.exists(directory + "/" + previous, this); if (stat != null) { this.latch.await(timeout, TimeUnit.NANOSECONDS); this.latch = null; } else { LOGGER.warn("could not find the previous child, directory: {}, name: {}", new Object[]{directory, this.name}); } } else { final String owner = children.get(0); if (this.name != null && owner != null && this.name.equals(owner)) { succeed = true; } else { LOGGER.warn("the lock should be held by current thread, directory: {}, name: {}, owner: {}", new Object[]{directory, this.name, owner}); } } } // timeout -= (System.nanoTime() - mark); } while (!succeed && timeout >= 0); return succeed; } private String findOrCreateChild() throws Exception { // final String prefix = zookeeper.getSessionId() + "-"; final List<String> children = zookeeper.getChildren(directory, false); for (String child : children) { if (child.startsWith(prefix)) { if(isVerbose() && LOGGER.isInfoEnabled()) { LOGGER.info("found a child, directory: {}, child: {}", new Object[]{directory, child}); } return child; } } // final String data = Thread.currentThread().getId() + "@" + processName; final String path = zookeeper.create(directory + "/" + prefix, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); final String child = path.substring(path.lastIndexOf("/") + 1); if(isVerbose() && LOGGER.isInfoEnabled()) { LOGGER.info("created a child, directory: {}, path: {}", new Object[]{directory, child}); } return child; } } /** * */ private static class SequenceComparator implements Comparator<String> { @Override public int compare(String lhs, String rhs) { final int index1 = lhs.lastIndexOf('-'); final int index2 = rhs.lastIndexOf('-'); final int sequence1 = Integer.parseInt(lhs.substring(index1 + 1)); final int sequence2 = Integer.parseInt(rhs.substring(index2 + 1)); return sequence1 - sequence2; } } }
ZooKeeperLock是fair的,并且在Node中保存的数据是线程ID,进程ID以及主机名。需要注意的是,应该为ZooKeeper部署集群,此外还需要保证传入ZooKeeperLock构造函数中的ZooKepper实例已经跟Server建立的连接,否则zookeeper.getSessionId()会返回0,从而导致错误。
3 disclaimer
笔者只对以上代码进行了简单的测试,因此可能存在错误,请慎重使用。如果发现问题,感谢反馈。
评论
http://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_get-lock
5.7.5之前的解决方案真实一个蛋疼的解决方案。
上面的观点不认同,他的文档我觉得还是写的很好的,通俗易懂,而且把该说的都说了。 http://www.hazelcast.com/docs/2.3/manual/pdf/hazelcast-documentation.pdf
发表评论
-
Understanding the Hash Array Mapped Trie
2012-03-30 10:36 0mark -
Atomic Bit Operation in Linux Kernel
2012-02-08 00:27 2098Linux Kernel支持atomic bit operat ... -
A Hierarchical CLH Queue Lock
2012-01-14 19:01 2148A Hierarchical CLH Queue Lock ( ... -
Inside AbstractQueuedSynchronizer (4)
2012-01-08 17:06 3519Inside AbstractQueuedSynchroniz ... -
Inside AbstractQueuedSynchronizer (3)
2012-01-07 23:37 4728Inside AbstractQueuedSynchroniz ... -
Inside AbstractQueuedSynchronizer (2)
2012-01-07 17:54 6365Inside AbstractQueuedSynchroniz ... -
Inside AbstractQueuedSynchronizer (1)
2012-01-06 11:04 7947Inside AbstractQueuedSynchroniz ... -
Code Optimization
2011-10-14 00:11 1607当前开发人员在进行编码的时候,可能很少关注纯粹代码级别的优化了 ... -
What's New on Java 7 Phaser
2011-07-29 10:15 82711 Overview Java 7的并 ... -
Sequantial Lock in Java
2011-06-07 17:00 22141 Overview Linux内核中常见的同步机 ... -
Feature or issue?
2011-04-26 22:23 121以下代码中,为何CglibTest.intercept ... -
Bloom Filter
2010-10-19 00:41 50731 Overview Bloom filt ... -
Inside java.lang.Enum
2010-08-04 15:40 64761 Introduction to enum J ... -
Open Addressing
2010-07-07 17:59 34581 Overview Open addressi ... -
JLine
2010-06-17 09:11 11008Overview JLine 是一个用来处理控 ... -
ID Generator
2010-06-14 14:45 1676关于ID Generator,想 ... -
inotify-java
2009-07-22 22:58 82971 Overview 最近公 ... -
Perf4J
2009-06-11 23:13 84871 Overview Perf4j是一个用于计算 ... -
Progress Estimator
2009-02-22 19:37 1534Jakarta Commons Cookbook这本书 ... -
jManage
2008-12-22 00:40 39571 Overview 由于项目需要, 笔者开发了一个 ...
相关推荐
《Django分布式锁库django-distributedlock的深度解析》 在Python后端开发领域,Django框架因其高效、稳定及强大的功能而广受开发者青睐。然而,在多线程或多进程环境中,为了保证数据的一致性和安全性,我们经常...
DistributedLock.java
分布式锁DistributedLock是一个.NET库,它基于各种基础技术提供了健壮且易于使用的分布式互斥锁,读写器锁和信号灯。 使用DistributedLock,跨多个应用程序/机器同步对代码区域的访问非常简单: using ( await ...
标题"DistributedLock-master.zip"暗示了这是一个关于`.netCore`实现分布式锁的项目源代码,可能包含一个或多个实现分布式锁解决方案的类库或示例。通过在GitHub上搜索并下载这个项目,你可以深入理解如何在`....
在这个名为 "DistributedLock:redis分布式锁" 的项目中,我们可以探讨以下几个关键知识点: 1. **分布式锁的概念**:在分布式系统中,当多个节点需要并发访问共享资源时,为了避免数据不一致性和竞态条件,需要一种...
克隆包含`DistributedLock`项目的存储库后,运行其中的单元测试可以验证分布式锁的正确性。 总结,使用Redis作为分布式锁是解决分布式系统中资源同步问题的有效方法。通过Java中的Jedis客户端,我们可以方便地实现...
Zookeeper 非公平锁/公平锁/共享锁demo代码
分布式锁定 分布式锁的简单C#实现,可以在专注于可扩展性的多客户端(流程)环境中使用。快速开始该库旨在通过抽象使用,因此,无论选择哪种...var distributedLock = await provider . TryAcquireAsync ( resourceI
String lockKey = "distributedLock"; if (redisTemplate.opsForValue().setIfAbsent(lockKey, "lock")) { // 获取锁成功,执行业务逻辑 } else { // 获取锁失败,处理等待或回滚等逻辑 } ``` 这种方法简单易行,...
public Object lock(ProceedingJoinPoint joinPoint, DistributedLock distributedLock) throws Throwable { RLock lock = redissonClient.getLock(distributedLock.key()); try { if (distributedLock.waitTime...
1. 声明式锁:Lock4j 提供了注解 `@DistributedLock`,可以直接在方法上进行标注,实现自动加锁和解锁。这使得开发者无需在业务代码中处理锁的细节,提高了代码的可读性和可维护性。 2. 编程式锁:对于更复杂的需求...
这个名为“distributed-lock-seckill.zip”的压缩包很可能包含了实现这种秒杀场景的Java源代码,旨在演示如何利用分布式锁来解决秒杀过程中的问题。 首先,我们需要理解分布式锁的概念。分布式锁是在分布式系统中,...
import com.example.springbootdemo.commons.lock.DistributedLock; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.spring...
分布式锁分布式锁:使用mysql和zookeeper实现mysql_lock.py:Mysql 分布式锁zk_lock.py:zookeeper 分布式锁(需要 python 模块 'kazoo') test_mysql_lock.py:测试Mysql分布式锁test_zookeeper_lock.py:测试...
java7 源码 现在面试都会聊聊分布式系统,通常面试官都会从服务框架(Spring Cloud、Dubbo),一路聊到分布式事务、分布式锁、ZooKeeper等知识。今天就来聊聊分布式锁这块的知识,先具体的来看看Redis分布式锁的实现...
标题"redis-distributed-lock-starter.rar"暗示这是一个Spring Boot起步依赖项目,目的是简化集成Redis分布式锁到Spring Boot应用的过程。Spring Boot的“start”规范通常包含自动配置、Bean定义和其他便利功能,...
在Linux系统中,分布式锁管理(Distributed Lock Manager,简称ldlm)是用于控制多个进程或节点间对共享资源访问的重要机制。ldlm_lock是这个框架的核心部分,它处理锁的创建、获取、释放等操作。本篇文章将深入探讨...
分布式锁分布式锁定可确保您的方法无法从多个JVM(服务器,微服务集群等)并行运行。 它使用公用存储来跟踪已使用的锁,并且您的方法需要获取一个或多个锁才能运行。 默认情况下,锁遵循方法的生命周期,它们是在...
public Object around(ProceedingJoinPoint joinPoint, DistributedLock distributedLock) throws Throwable { String lockKey = distributedLock.key(); // 获取锁 if (!redisTemplate.opsForValue()....
"lock_dlm.c"文件是这次更新的核心部分,它涉及到GFS2文件系统中的分布式锁管理器(DLM,Distributed Lock Manager)。DLM是GFS2实现其并发访问控制的关键组件,确保在集群环境中对共享资源的正确锁定和解锁,避免...