- 浏览: 1595946 次
- 性别:
- 来自: 杭州
文章分类
最新评论
-
jsrgzhangzhiyong:
关于null值的转换还是感觉不太友好,就像 mapstruct ...
我也造了个轮子:BeanMapping(属性拷贝) -
he037:
<div class="quote_title ...
基于zookeeper的分布式lock实现 -
seancheer:
qianshangding 写道首先节点启动后,尝试读取本地的 ...
zookeeper学习记录三(session,watcher,persit机制) -
雪夜归人:
您好,我想咨询一下,开源的canal都能支持mysql的哪些版 ...
Canal BinlogChange(mysql5.6) -
zhoudengyun:
copy 一份做记录,后续学习,请知悉
阿里巴巴开源项目: 基于mysql数据库binlog的增量订阅&消费
背景
继续上一篇文章:http://agapple.iteye.com/blog/1183972 ,项目中需要对分布式任务进行调度,那对应的分布式lock实现在所难免。
这一周,在基于BooleanMutex的基础上,实现了zookeeper的分布式锁,用于控制多进程+多线程的lock控制
算法
可以预先看一下zookeeper的官方文档:
lock操作过程:
- 首先为一个lock场景,在zookeeper中指定对应的一个根节点,用于记录资源竞争的内容
- 每个lock创建后,会lazy在zookeeper中创建一个node节点,表明对应的资源竞争标识。 (小技巧:node节点为EPHEMERAL_SEQUENTIAL,自增长的临时节点)
- 进行lock操作时,获取对应lock根节点下的所有字节点,也即处于竞争中的资源标识
- 按照Fair竞争的原则,按照对应的自增内容做排序,取出编号最小的一个节点做为lock的owner,判断自己的节点id是否就为owner id,如果是则返回,lock成功。
- 如果自己非owner id,按照排序的结果找到序号比自己前一位的id,关注它锁释放的操作(也就是exist watcher),形成一个链式的触发过程。
unlock操作过程:
- 将自己id对应的节点删除即可,对应的下一个排队的节点就可以收到Watcher事件,从而被唤醒得到锁后退出
其中的几个关键点:
- node节点选择为EPHEMERAL_SEQUENTIAL很重要。
* 自增长的特性,可以方便构建一个基于Fair特性的锁,前一个节点唤醒后一个节点,形成一个链式的触发过程。可以有效的避免"惊群效应"(一个锁释放,所有等待的线程都被唤醒),有针对性的唤醒,提升性能。
* 选择一个EPHEMERAL临时节点的特性。因为和zookeeper交互是一个网络操作,不可控因素过多,比如网络断了,上一个节点释放锁的操作会失败。临时节点是和对应的session挂接的,session一旦超时或者异常退出其节点就会消失,类似于ReentrantLock中等待队列Thread的被中断处理。 - 获取lock操作是一个阻塞的操作,而对应的Watcher是一个异步事件,所以需要使用信号进行通知,正好使用上一篇文章中提到的BooleanMutex,可以比较方便的解决锁重入的问题。(锁重入可以理解为多次读操作,锁释放为写抢占操作)
注意:
- 使用EPHEMERAL会引出一个风险:在非正常情况下,网络延迟比较大会出现session timeout,zookeeper就会认为该client已关闭,从而销毁其id标示,竞争资源的下一个id就可以获取锁。这时可能会有两个process同时拿到锁在跑任务,所以设置好session timeout很重要。
- 同样使用PERSISTENT同样会存在一个死锁的风险,进程异常退出后,对应的竞争资源id一直没有删除,下一个id一直无法获取到锁对象。
没有两全其美的做法,两者取其一,选择自己一个能接受的即可
代码
public class DistributedLock { private static final byte[] data = { 0x12, 0x34 }; private ZooKeeperx zookeeper = ZooKeeperClient.getInstance(); private final String root; //根节点路径 private String id; private LockNode idName; private String ownerId; private String lastChildId; private Throwable other = null; private KeeperException exception = null; private InterruptedException interrupt = null; public DistributedLock(String root) { this.root = root; ensureExists(root); } /** * 尝试获取锁操作,阻塞式可被中断 */ public void lock() throws InterruptedException, KeeperException { // 可能初始化的时候就失败了 if (exception != null) { throw exception; } if (interrupt != null) { throw interrupt; } if (other != null) { throw new NestableRuntimeException(other); } if (isOwner()) {//锁重入 return; } BooleanMutex mutex = new BooleanMutex(); acquireLock(mutex); // 避免zookeeper重启后导致watcher丢失,会出现死锁使用了超时进行重试 try { mutex.get(DEFAULT_TIMEOUT_PERIOD, TimeUnit.MICROSECONDS);// 阻塞等待值为true // mutex.get(); } catch (TimeoutException e) { if (!mutex.state()) { lock(); } } if (exception != null) { throw exception; } if (interrupt != null) { throw interrupt; } if (other != null) { throw new NestableRuntimeException(other); } } /** * 尝试获取锁对象, 不会阻塞 * * @throws InterruptedException * @throws KeeperException */ public boolean tryLock() throws KeeperException { // 可能初始化的时候就失败了 if (exception != null) { throw exception; } if (isOwner()) {//锁重入 return true; } acquireLock(null); if (exception != null) { throw exception; } if (interrupt != null) { Thread.currentThread().interrupt(); } if (other != null) { throw new NestableRuntimeException(other); } return isOwner(); } /** * 释放锁对象 */ public void unlock() throws KeeperException { if (id != null) { try { zookeeper.delete(root + "/" + id, -1); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (KeeperException.NoNodeException e) { // do nothing } finally { id = null; } } else { //do nothing } } private void ensureExists(final String path) { try { Stat stat = zookeeper.exists(path, false); if (stat != null) { return; } zookeeper.create(path, data, CreateMode.PERSISTENT); } catch (KeeperException e) { exception = e; } catch (InterruptedException e) { Thread.currentThread().interrupt(); interrupt = e; } } /** * 返回锁对象对应的path */ public String getRoot() { return root; } /** * 判断当前是不是锁的owner */ public boolean isOwner() { return id != null && ownerId != null && id.equals(ownerId); } /** * 返回当前的节点id */ public String getId() { return this.id; } // ===================== helper method ============================= /** * 执行lock操作,允许传递watch变量控制是否需要阻塞lock操作 */ private Boolean acquireLock(final BooleanMutex mutex) { try { do { if (id == null) {//构建当前lock的唯一标识 long sessionId = zookeeper.getDelegate().getSessionId(); String prefix = "x-" + sessionId + "-"; //如果第一次,则创建一个节点 String path = zookeeper.create(root + "/" + prefix, data, CreateMode.EPHEMERAL_SEQUENTIAL); int index = path.lastIndexOf("/"); id = StringUtils.substring(path, index + 1); idName = new LockNode(id); } if (id != null) { List<String> names = zookeeper.getChildren(root, false); if (names.isEmpty()) { id = null;//异常情况,重新创建一个 } else { //对节点进行排序 SortedSet<LockNode> sortedNames = new TreeSet<LockNode>(); for (String name : names) { sortedNames.add(new LockNode(name)); } if (sortedNames.contains(idName) == false) { id = null;//清空为null,重新创建一个 continue; } //将第一个节点做为ownerId ownerId = sortedNames.first().getName(); if (mutex != null && isOwner()) { mutex.set(true);//直接更新状态,返回 return true; } else if (mutex == null) { return isOwner(); } SortedSet<LockNode> lessThanMe = sortedNames.headSet(idName); if (!lessThanMe.isEmpty()) { //关注一下排队在自己之前的最近的一个节点 LockNode lastChildName = lessThanMe.last(); lastChildId = lastChildName.getName(); //异步watcher处理 zookeeper.exists(root + "/" + lastChildId, new AsyncWatcher() { public void asyncProcess(WatchedEvent event) { acquireLock(mutex); } }); if (stat == null) { acquireLock(mutex);// 如果节点不存在,需要自己重新触发一下,watcher不会被挂上去 } } else { if (isOwner()) { mutex.set(true); } else { id = null;// 可能自己的节点已超时挂了,所以id和ownerId不相同 } } } } } while (id == null); } catch (KeeperException e) { exception = e; if (mutex != null) { mutex.set(true); } } catch (InterruptedException e) { interrupt = e; if (mutex != null) { mutex.set(true); } } catch (Throwable e) { other = e; if (mutex != null) { mutex.set(true); } } if (isOwner() && mutex != null) { mutex.set(true); } return Boolean.FALSE; } }
相关说明:
测试代码:
@Test public void test_lock() { ExecutorService exeucotr = Executors.newCachedThreadPool(); final int count = 50; final CountDownLatch latch = new CountDownLatch(count); final DistributedLock[] nodes = new DistributedLock[count]; for (int i = 0; i < count; i++) { final DistributedLock node = new DistributedLock(dir); nodes[i] = node; exeucotr.submit(new Runnable() { public void run() { try { Thread.sleep(1000); node.lock(); //获取锁 Thread.sleep(100 + RandomUtils.nextInt(100)); System.out.println("id: " + node.getId() + " is leader: " + node.isOwner()); } catch (InterruptedException e) { want.fail(); } catch (KeeperException e) { want.fail(); } finally { latch.countDown(); try { node.unlock(); } catch (KeeperException e) { want.fail(); } } } }); } try { latch.await(); } catch (InterruptedException e) { want.fail(); } exeucotr.shutdown(); }
升级版
实现了一个分布式lock后,可以解决多进程之间的同步问题,但设计多线程+多进程的lock控制需求,单jvm中每个线程都和zookeeper进行网络交互成本就有点高了,所以基于DistributedLock,实现了一个分布式二层锁。
大致原理就是ReentrantLock 和 DistributedLock的一个结合。
- 单jvm的多线程竞争时,首先需要先拿到第一层的ReentrantLock的锁
- 拿到锁之后这个线程再去和其他JVM的线程竞争锁,最后拿到之后锁之后就开始处理任务。
锁的释放过程是一个反方向的操作,先释放DistributedLock,再释放ReentrantLock。 可以思考一下,如果先释放ReentrantLock,假如这个JVM ReentrantLock竞争度比较高,一直其他JVM的锁竞争容易被饿死。
代码:
public class DistributedReentrantLock extends DistributedLock { private static final String ID_FORMAT = "Thread[{0}] Distributed[{1}]"; private ReentrantLock reentrantLock = new ReentrantLock(); public DistributedReentrantLock(String root) { super(root); } public void lock() throws InterruptedException, KeeperException { reentrantLock.lock();//多线程竞争时,先拿到第一层锁 super.lock(); } public boolean tryLock() throws KeeperException { //多线程竞争时,先拿到第一层锁 return reentrantLock.tryLock() && super.tryLock(); } public void unlock() throws KeeperException { super.unlock(); reentrantLock.unlock();//多线程竞争时,释放最外层锁 } @Override public String getId() { return MessageFormat.format(ID_FORMAT, Thread.currentThread().getId(), super.getId()); } @Override public boolean isOwner() { return reentrantLock.isHeldByCurrentThread() && super.isOwner(); } }
测试代码:
@Test public void test_lock() { ExecutorService exeucotr = Executors.newCachedThreadPool(); final int count = 50; final CountDownLatch latch = new CountDownLatch(count); final DistributedReentrantLock lock = new DistributedReentrantLock(dir); //单个锁 for (int i = 0; i < count; i++) { exeucotr.submit(new Runnable() { public void run() { try { Thread.sleep(1000); lock.lock(); Thread.sleep(100 + RandomUtils.nextInt(100)); System.out.println("id: " + lock.getId() + " is leader: " + lock.isOwner()); } catch (InterruptedException e) { want.fail(); } catch (KeeperException e) { want.fail(); } finally { latch.countDown(); try { lock.unlock(); } catch (KeeperException e) { want.fail(); } } } }); } try { latch.await(); } catch (InterruptedException e) { want.fail(); } exeucotr.shutdown(); }
最后
其实再可以发散一下,实现一个分布式的read/write lock,也差不多就是这个理了。项目结束后,有时间可以写一下
大致思路:
- 竞争资源标示: read_自增id , write_自增id
- 首先按照自增id进行排序,如果队列的前边都是read标识,对应的所有read都获得锁。如果队列的前边是write标识,第一个write节点获取锁
- watcher监听: read监听距离自己最近的一个write节点的exist,write监听距离自己最近的一个节点(read或者write节点)
评论
8 楼
he037
2018-07-20
a417930422 写道
引用
使用EPHEMERAL会引出一个风险:在非正常情况下,网络延迟比较大会出现session timeout,zookeeper就会认为该client已关闭,从而销毁其id标示,竞争资源的下一个id就可以获取锁。这时可能会有两个process同时拿到锁在跑任务,所以设置好session timeout很重要。
有几个问题:
1
引用
竞争资源的下一个id就可以获取锁
如果id标示被销毁,只会通知此id后的watcher。
根据这段代码:ownerId = sortedNames.first().getName();
如果被销毁的id非最小的id,此id后的watcher如何可以获得锁?
2
引用
这时可能会有两个process同时拿到锁在跑任务
如果销毁了某个id,zookeeper只会通知比它大的client,只通知一个客户端,怎么会有两个process同时拿到锁?
设想如下的执行序列:
客户端1创建了znode节点 /lock ,获得了锁。
客户端1进入了长时间的GC pause。
客户端1连接到ZooKeeper的Session过期了。znode节点 /lock 被自动删除。
客户端2创建了znode节点 /lock ,从而获得了锁。
客户端1从GC pause中恢复过来,它仍然认为自己持有锁。
最后,客户端1和客户端2都认为自己持有了锁,冲突了
7 楼
a417930422
2016-02-18
引用
使用EPHEMERAL会引出一个风险:在非正常情况下,网络延迟比较大会出现session timeout,zookeeper就会认为该client已关闭,从而销毁其id标示,竞争资源的下一个id就可以获取锁。这时可能会有两个process同时拿到锁在跑任务,所以设置好session timeout很重要。
有几个问题:
1
引用
竞争资源的下一个id就可以获取锁
如果id标示被销毁,只会通知此id后的watcher。
根据这段代码:ownerId = sortedNames.first().getName();
如果被销毁的id非最小的id,此id后的watcher如何可以获得锁?
2
引用
这时可能会有两个process同时拿到锁在跑任务
如果销毁了某个id,zookeeper只会通知比它大的client,只通知一个客户端,怎么会有两个process同时拿到锁?
6 楼
weiboxie
2015-12-06
大哥,能提供一下LockNode这个对象结构吗
5 楼
zhaoshijie
2015-09-12
建议楼主把源码弄出来
4 楼
kojavaee
2015-03-15
大哥,你的分布锁有遇到zkclient节点宕机了,造成死锁的问题么?
可以提供一下ZooKeeperClient的代码么?
可以提供一下ZooKeeperClient的代码么?
3 楼
agapple
2013-11-18
accp_huangxin 写道
大哥,能提供一下LockNode这个对象结构吗
public class LockNode implements Comparable<LockNode> { private final String name; private String prefix; private int sequence = -1; public LockNode(String name){ Assert.notNull(name, "id cannot be null"); this.name = name; this.prefix = name; int idx = name.lastIndexOf('-'); if (idx >= 0) { this.prefix = name.substring(0, idx); try { this.sequence = Integer.parseInt(name.substring(idx + 1)); } catch (Exception e) { // ignore } } } public int compareTo(LockNode that) { int s1 = this.sequence; int s2 = that.sequence; if (s1 == -1 && s2 == -1) { return this.name.compareTo(that.name); } if (s1 == -1) { return -1; } else if (s2 == -1) { return 1; } else { return s1 - s2; } } public String getName() { return name; } public int getSequence() { return sequence; } public String getPrefix() { return prefix; } public String toString() { return name.toString(); } // ==================== hashcode & equals方法======================= @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + ((name == null) ? 0 : name.hashCode()); return result; } @Override public boolean equals(Object obj) { if (this == obj) { return true; } if (obj == null) { return false; } if (getClass() != obj.getClass()) { return false; } LockNode other = (LockNode) obj; if (name == null) { if (other.name != null) { return false; } } else if (!name.equals(other.name)) { return false; } return true; } }
2 楼
accp_huangxin
2013-11-15
大哥,能提供一下LockNode这个对象结构吗
1 楼
ykdsg
2013-01-08
能提供源码不
发表评论
-
yugong QuickStart
2016-03-05 01:52 0几点说明 a. 数据迁移的方案可参见设计文档,oracl ... -
阿里巴巴开源项目: 阿里巴巴去Oracle数据迁移同步工具
2016-03-05 18:29 6514背景 08年左右,阿里巴巴开始尝试MySQL的相关 ... -
愚公performance
2016-03-02 17:29 0性能测试 全量测试 场景1 (单主键, ... -
yugong AdminGuide
2016-03-02 16:40 0环境要求 操作系统 数据库 迁移方案 部署 ... -
Tddl_hint
2014-01-27 13:52 0背景 工作原理 Hint格式 direct模 ... -
tddl5分库规则
2014-01-26 14:41 0背景 工作原理 构建语法树 元数据 基于 ... -
tddl5优化器
2014-01-22 15:12 0背景 工作原理 构建语法树 元数据 抽象语 ... -
Canal BinlogChange(mariadb5/10)
2014-01-20 17:25 4592背景 先前开源了一个 ... -
asynload quickstart
2013-10-08 22:49 0几点说明: 1. asyncload是做为一个j ... -
映射规则配置
2013-09-26 11:25 0背景 因为alibaba的特殊业务,比如: 同 ... -
网友文档贡献
2013-09-18 15:50 01. Otter源代码解析系列 链接:http://e ... -
Manager配置介绍
2013-09-16 13:00 0通道配置说明 多种同步方式配置 a. 单向同步 ... -
canal&otter FAQ
2013-09-05 17:30 0常见问题 1. canal和 ... -
阿里巴巴开源项目:分布式数据库同步系统otter(解决中美异地机房)
2013-08-22 16:48 40437项目背景 阿里巴巴B2B公司,因为业务的特性 ... -
Otter AdminGuide
2013-08-19 11:06 0几点说明 otter系统自带了manager,所以简化了一 ... -
Otter高可用性
2013-08-17 23:41 0基本需求 网络不可靠,异地机房尤为明显. man ... -
Otter数据一致性
2013-08-17 23:39 0技术选型分析 需要处理一致性的业务场景: 多地修改 ( ... -
Otter扩展性
2013-08-17 22:20 0扩展性定义 按照实现不同,可分为两类: 数据处理自定 ... -
Otter双向回环控制
2013-08-17 21:37 0基本需求 支持mysql/oracle的异构数据库的双 ... -
Otter调度模型
2013-08-17 20:13 0背景 在介绍调度模型之前,首先了解一下otter系统要解 ...
相关推荐
**Zookeeper的分布式锁实现原理** 1. **节点创建与监视**: Zookeeper允许客户端创建临时节点,这些节点会在客户端断开连接时自动删除。分布式锁的实现通常会为每个请求创建一个临时顺序节点,按照创建的顺序形成一...
总之,C#中基于ZooKeeper的分布式锁实现涉及对ZooKeeper的操作,包括创建临时顺序节点、监听节点变化以及正确释放锁。这样的实现方式保证了在分布式环境下的并发控制和数据一致性,同时具备良好的扩展性和容错性。...
它提供了多种同步原语,如`Semaphore`、`Lock`和`Event`,帮助开发者实现分布式环境下的互斥访问、资源限制和条件等待。这些原语对于构建分布式算法和保证数据一致性至关重要。 此外,Zookeeper还提供了**组服务**...
在处理订单生成的场景中,我们可以这样应用ZooKeeper分布式锁: 1. 当用户发起订单请求时,服务端会尝试在ZooKeeper上创建一个临时顺序节点。 2. 如果创建成功,服务端会检查当前最小序号的节点是否是自己创建的。...
Zookeeper 是一个广泛使用的分布式协调服务,它可以用来实现高效的分布式锁,尤其适合那些对数据一致性要求较高的场景。 Zookeeper 的分布式锁实现依赖于其核心特性: 1. **节点的互斥性**:Zookeeper 允许客户端...
**分布式重入排他锁(Reentrant Lock)在Zookeeper中的实现** Zookeeper是一个开源的分布式协调服务,常用于分布式环境中的一致性问题,如配置管理、命名服务、分布式同步等。在分布式系统中,为了保证数据的一致性...
ZooKeeper 提供的分布式锁通常基于其原子的创建、删除节点和监视事件机制来实现。 1. **不可重入锁(Non-Recursive Lock)**: 不可重入锁不允许同一个线程再次获取已持有的锁。在ZooKeeper中,实现不可重入锁通常...
《彻底理解ZooKeeper分布式锁实现原理》 ZooKeeper,简称zk,作为一个高可用的分布式协调服务,常被用于构建分布式系统中的各种组件,如分布式锁。在本篇文章中,我们将深入探讨如何利用Curator这个流行的开源框架...
在本场景中,我们利用Zookeeper的临时顺序节点来实现一个分布式调度解决方案,并结合Quartz进行作业调度。Zookeeper是一个分布式协调服务,而Quartz则是一个强大的、开源的作业调度框架,两者结合能构建出高可用、可...
分布式锁的实现基于Zookeeper的临时节点和监视器机制。通常,每个想要获取锁的客户端会在特定的zNode下创建一个临时节点。如果创建成功,说明获取了锁;如果失败,说明已有其他客户端持有锁。客户端还可以设置监视器...
使用zookeeper来实现分布式锁 原理 监听zookeeper的临时有序节点,监听到NodeDeleted事件,就会让线程重新获取锁 测试方法 public class ZookeeperLockTest { public static void main(String[] args) throws ...
在java中对于同一个jvm而言,jdk已经提供了lock和同步等。但是在分布式情况下,往往存在多个进程对一些资源产生竞争关系,而这些进程往往在不同的机器上,这个时候jdk中提供的已经不能满足。分布式锁顾明思议就是...
以下是一个简单的基于Zookeeper的分布式锁实现示例: ```java @Service public class ZookeeperDistributedLock { @Autowired private CuratorFramework curatorFramework; private String lockPath = "/...
本项目基于SpringBoot框架,结合Redis、Zookeeper和RabbitMQ实现了分布式锁,让我们一起深入探讨这三个组件在分布式锁中的作用和实现机制。 **SpringBoot** 是一个轻量级的Java开发框架,它简化了新Spring应用的...
在Spring XD中使用ZooKeeper可以实现分布式环境下的协调,例如在集群中管理服务的分布和任务分配。 对于领导者选举,ZooKeeper提供了一种无需羊群效应(Herd Effect)的锁机制。这种锁的实现依赖于临时顺序节点的...
4. **zkLockTest**:这个文件很可能是实现Zookeeper分布式锁的测试代码,可能包含客户端的连接、锁的获取和释放、异常处理等逻辑。通过对这个测试代码的分析和运行,我们可以深入理解Zookeeper分布式锁的工作机制。 ...
【Zookeeper实现分布式锁】 Zookeeper 是 Apache ...总之,Zookeeper 的分布式锁机制是基于其强大的一致性特性和事件通知能力实现的,它在分布式系统中扮演着至关重要的角色,帮助协调和管理分布式环境下的并发操作。
本文将详细讲解如何使用Java与Apache ZooKeeper实现一个分布式锁的示例。 ZooKeeper是一个分布式协调服务,它提供了一种可靠的方式来管理和同步分布式系统的数据。在分布式锁的场景中,ZooKeeper可以作为一个中心化...
本项目“springboot redis zookeeperlock rabbit实现的分布式锁”结合了Spring Boot、Redis、Zookeeper以及RabbitMQ这四款强大的工具,旨在构建一个健壮的分布式锁系统。以下是关于这些技术及其在分布式锁中的应用的...
通过深入理解Zookeeper的工作原理以及ZookeeperNet库的使用,开发者可以有效地在C#环境中实现高可用的分布式锁,保障多节点之间的协同工作和数据一致性。在实际项目中,分布式锁可以广泛应用于数据库操作、并发任务...