`
fsplove520
  • 浏览: 27788 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

基于zookeeper的分布式lock实现

 
阅读更多
博客分类: opensourcejavadistributed
参考地址
https://github.com/alibaba/canal/blob/5b56a027e59e46f13e9024bc2f24399981c29d7a/common/src/main/java/com/alibaba/otter/canal/common/zookeeper/ZooKeeperx.java

转载这位大神的,做个记录,因为项目中用到zookeeper的分布式锁
.背景
继续上一篇文章:http://agapple.iteye.com/blog/1183972 ,项目中需要对分布式任务进行调度,那对应的分布式lock实现在所难免。



这一周,在基于BooleanMutex的基础上,实现了zookeeper的分布式锁,用于控制多进程+多线程的lock控制



算法
可以预先看一下zookeeper的官方文档:



•http://zookeeper.apache.org/doc/trunk/recipes.html
lock操作过程:
•首先为一个lock场景,在zookeeper中指定对应的一个根节点,用于记录资源竞争的内容
•每个lock创建后,会lazy在zookeeper中创建一个node节点,表明对应的资源竞争标识。 (小技巧:node节点为EPHEMERAL_SEQUENTIAL,自增长的临时节点)
•进行lock操作时,获取对应lock根节点下的所有字节点,也即处于竞争中的资源标识
•按照Fair竞争的原则,按照对应的自增内容做排序,取出编号最小的一个节点做为lock的owner,判断自己的节点id是否就为owner id,如果是则返回,lock成功。
•如果自己非owner id,按照排序的结果找到序号比自己前一位的id,关注它锁释放的操作(也就是exist watcher),形成一个链式的触发过程。
unlock操作过程:
•将自己id对应的节点删除即可,对应的下一个排队的节点就可以收到Watcher事件,从而被唤醒得到锁后退出


其中的几个关键点:
1.node节点选择为EPHEMERAL_SEQUENTIAL很重要。
* 自增长的特性,可以方便构建一个基于Fair特性的锁,前一个节点唤醒后一个节点,形成一个链式的触发过程。可以有效的避免"惊群效应"(一个锁释放,所有等待的线程都被唤醒),有针对性的唤醒,提升性能。
* 选择一个EPHEMERAL临时节点的特性。因为和zookeeper交互是一个网络操作,不可控因素过多,比如网络断了,上一个节点释放锁的操作会失败。临时节点是和对应的session挂接的,session一旦超时或者异常退出其节点就会消失,类似于ReentrantLock中等待队列Thread的被中断处理。
2.获取lock操作是一个阻塞的操作,而对应的Watcher是一个异步事件,所以需要使用信号进行通知,正好使用上一篇文章中提到的BooleanMutex,可以比较方便的解决锁重入的问题。(锁重入可以理解为多次读操作,锁释放为写抢占操作)


注意:
•使用EPHEMERAL会引出一个风险:在非正常情况下,网络延迟比较大会出现session timeout,zookeeper就会认为该client已关闭,从而销毁其id标示,竞争资源的下一个id就可以获取锁。这时可能会有两个process同时拿到锁在跑任务,所以设置好session timeout很重要。
•同样使用PERSISTENT同样会存在一个死锁的风险,进程异常退出后,对应的竞争资源id一直没有删除,下一个id一直无法获取到锁对象。
没有两全其美的做法,两者取其一,选择自己一个能接受的即可


代码
Java代码 
1.public class DistributedLock {  
2. 
3.    private static final byte[]  data      = { 0x12, 0x34 };  
4.    private ZooKeeperx           zookeeper = ZooKeeperClient.getInstance();  
5.    private final String         root;                                     //根节点路径  
6.    private String               id;  
7.    private LockNode             idName;  
8.    private String               ownerId;  
9.    private String               lastChildId;  
10.    private Throwable            other     = null;  
11.    private KeeperException      exception = null;  
12.    private InterruptedException interrupt = null;  
13. 
14.    public DistributedLock(String root) {  
15.        this.root = root;  
16.        ensureExists(root);  
17.    }  
18. 
19.    /** 
20.     * 尝试获取锁操作,阻塞式可被中断 
21.     */ 
22.    public void lock() throws InterruptedException, KeeperException {  
23.        // 可能初始化的时候就失败了  
24.        if (exception != null) {  
25.            throw exception;  
26.        }  
27. 
28.        if (interrupt != null) {  
29.            throw interrupt;  
30.        }  
31. 
32.        if (other != null) {  
33.            throw new NestableRuntimeException(other);  
34.        }  
35. 
36.        if (isOwner()) {//锁重入  
37.            return;  
38.        }  
39. 
40.        BooleanMutex mutex = new BooleanMutex();  
41.        acquireLock(mutex);  
42.        // 避免zookeeper重启后导致watcher丢失,会出现死锁使用了超时进行重试  
43.        try {  
44.            mutex.get(DEFAULT_TIMEOUT_PERIOD, TimeUnit.MICROSECONDS);// 阻塞等待值为true  
45.            // mutex.get();  
46.        } catch (TimeoutException e) {  
47.            if (!mutex.state()) {  
48.                lock();  
49.            }  
50.        }  
51. 
52.        if (exception != null) {  
53.            throw exception;  
54.        }  
55. 
56.        if (interrupt != null) {  
57.            throw interrupt;  
58.        }  
59. 
60.        if (other != null) {  
61.            throw new NestableRuntimeException(other);  
62.        }  
63.    }  
64. 
65.    /** 
66.     * 尝试获取锁对象, 不会阻塞 
67.     *  
68.     * @throws InterruptedException 
69.     * @throws KeeperException 
70.     */ 
71.    public boolean tryLock() throws KeeperException {  
72.        // 可能初始化的时候就失败了  
73.        if (exception != null) {  
74.            throw exception;  
75.        }  
76. 
77.        if (isOwner()) {//锁重入  
78.            return true;  
79.        }  
80. 
81.        acquireLock(null);  
82. 
83.        if (exception != null) {  
84.            throw exception;  
85.        }  
86. 
87.        if (interrupt != null) {  
88.            Thread.currentThread().interrupt();  
89.        }  
90. 
91.        if (other != null) {  
92.            throw new NestableRuntimeException(other);  
93.        }  
94. 
95.        return isOwner();  
96.    }  
97. 
98.    /** 
99.     * 释放锁对象 
100.     */ 
101.    public void unlock() throws KeeperException {  
102.        if (id != null) {  
103.            try {  
104.                zookeeper.delete(root + "/" + id, -1);  
105.            } catch (InterruptedException e) {  
106.                Thread.currentThread().interrupt();  
107.            } catch (KeeperException.NoNodeException e) {  
108.                // do nothing  
109.            } finally {  
110.                id = null;  
111.            }  
112.        } else {  
113.            //do nothing  
114.        }  
115.    }  
116. 
117.    private void ensureExists(final String path) {  
118.        try {  
119.            Stat stat = zookeeper.exists(path, false);  
120.            if (stat != null) {  
121.                return;  
122.            }  
123. 
124.            zookeeper.create(path, data, CreateMode.PERSISTENT);  
125.        } catch (KeeperException e) {  
126.            exception = e;  
127.        } catch (InterruptedException e) {  
128.            Thread.currentThread().interrupt();  
129.            interrupt = e;  
130.        }  
131.    }  
132. 
133.    /** 
134.     * 返回锁对象对应的path 
135.     */ 
136.    public String getRoot() {  
137.        return root;  
138.    }  
139. 
140.    /** 
141.     * 判断当前是不是锁的owner 
142.     */ 
143.    public boolean isOwner() {  
144.        return id != null && ownerId != null && id.equals(ownerId);  
145.    }  
146. 
147.    /** 
148.     * 返回当前的节点id 
149.     */ 
150.    public String getId() {  
151.        return this.id;  
152.    }  
153. 
154.    // ===================== helper method =============================  
155. 
156.    /** 
157.     * 执行lock操作,允许传递watch变量控制是否需要阻塞lock操作 
158.     */ 
159.    private Boolean acquireLock(final BooleanMutex mutex) {  
160.        try {  
161.            do {  
162.                if (id == null) {//构建当前lock的唯一标识  
163.                    long sessionId = zookeeper.getDelegate().getSessionId();  
164.                    String prefix = "x-" + sessionId + "-";  
165.                    //如果第一次,则创建一个节点  
166.                    String path = zookeeper.create(root + "/" + prefix, data,  
167.                            CreateMode.EPHEMERAL_SEQUENTIAL);  
168.                    int index = path.lastIndexOf("/");  
169.                    id = StringUtils.substring(path, index + 1);  
170.                    idName = new LockNode(id);  
171.                }  
172. 
173.                if (id != null) {  
174.                    List<String> names = zookeeper.getChildren(root, false);  
175.                    if (names.isEmpty()) {  
176.                        id = null;//异常情况,重新创建一个  
177.                    } else {  
178.                        //对节点进行排序  
179.                        SortedSet<LockNode> sortedNames = new TreeSet<LockNode>();  
180.                        for (String name : names) {  
181.                            sortedNames.add(new LockNode(name));  
182.                        }  
183. 
184.                        if (sortedNames.contains(idName) == false) {  
185.                            id = null;//清空为null,重新创建一个  
186.                            continue;  
187.                        }  
188. 
189.                        //将第一个节点做为ownerId  
190.                        ownerId = sortedNames.first().getName();  
191.                        if (mutex != null && isOwner()) {  
192.                            mutex.set(true);//直接更新状态,返回  
193.                            return true;  
194.                        } else if (mutex == null) {  
195.                            return isOwner();  
196.                        }  
197. 
198.                        SortedSet<LockNode> lessThanMe = sortedNames.headSet(idName);  
199.                        if (!lessThanMe.isEmpty()) {  
200.                            //关注一下排队在自己之前的最近的一个节点  
201.                            LockNode lastChildName = lessThanMe.last();  
202.                            lastChildId = lastChildName.getName();  
203.                            //异步watcher处理  
204.                            zookeeper.exists(root + "/" + lastChildId, new AsyncWatcher() {  
205. 
206.                                public void asyncProcess(WatchedEvent event) {  
207.                                    acquireLock(mutex);  
208.                                }  
209. 
210.                            });  
211. 
212.                            if (stat == null) {  
213.                                acquireLock(mutex);// 如果节点不存在,需要自己重新触发一下,watcher不会被挂上去  
214.                            }  
215.                        } else {  
216.                            if (isOwner()) {  
217.                                mutex.set(true);  
218.                            } else {  
219.                                id = null;// 可能自己的节点已超时挂了,所以id和ownerId不相同  
220.                            }  
221.                        }  
222.                    }  
223.                }  
224.            } while (id == null);  
225.        } catch (KeeperException e) {  
226.            exception = e;  
227.            if (mutex != null) {  
228.                mutex.set(true);  
229.            }  
230.        } catch (InterruptedException e) {  
231.            interrupt = e;  
232.            if (mutex != null) {  
233.                mutex.set(true);  
234.            }  
235.        } catch (Throwable e) {  
236.            other = e;  
237.            if (mutex != null) {  
238.                mutex.set(true);  
239.            }  
240.        }  
241. 
242.        if (isOwner() && mutex != null) {  
243.            mutex.set(true);  
244.        }  
245.        return Boolean.FALSE;  
246.    }  
247.} 
public class DistributedLock {

    private static final byte[]  data      = { 0x12, 0x34 };
    private ZooKeeperx           zookeeper = ZooKeeperClient.getInstance();
    private final String         root;                                     //根节点路径
    private String               id;
    private LockNode             idName;
    private String               ownerId;
    private String               lastChildId;
    private Throwable            other     = null;
    private KeeperException      exception = null;
    private InterruptedException interrupt = null;

    public DistributedLock(String root) {
        this.root = root;
        ensureExists(root);
    }

    /**
     * 尝试获取锁操作,阻塞式可被中断
     */
    public void lock() throws InterruptedException, KeeperException {
        // 可能初始化的时候就失败了
        if (exception != null) {
            throw exception;
        }

        if (interrupt != null) {
            throw interrupt;
        }

        if (other != null) {
            throw new NestableRuntimeException(other);
        }

        if (isOwner()) {//锁重入
            return;
        }

        BooleanMutex mutex = new BooleanMutex();
        acquireLock(mutex);
        // 避免zookeeper重启后导致watcher丢失,会出现死锁使用了超时进行重试
        try {
            mutex.get(DEFAULT_TIMEOUT_PERIOD, TimeUnit.MICROSECONDS);// 阻塞等待值为true
            // mutex.get();
        } catch (TimeoutException e) {
            if (!mutex.state()) {
                lock();
            }
        }

        if (exception != null) {
            throw exception;
        }

        if (interrupt != null) {
            throw interrupt;
        }

        if (other != null) {
            throw new NestableRuntimeException(other);
        }
    }

    /**
     * 尝试获取锁对象, 不会阻塞
     *
     * @throws InterruptedException
     * @throws KeeperException
     */
    public boolean tryLock() throws KeeperException {
        // 可能初始化的时候就失败了
        if (exception != null) {
            throw exception;
        }

        if (isOwner()) {//锁重入
            return true;
        }

        acquireLock(null);

        if (exception != null) {
            throw exception;
        }

        if (interrupt != null) {
            Thread.currentThread().interrupt();
        }

        if (other != null) {
            throw new NestableRuntimeException(other);
        }

        return isOwner();
    }

    /**
     * 释放锁对象
     */
    public void unlock() throws KeeperException {
        if (id != null) {
            try {
                zookeeper.delete(root + "/" + id, -1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (KeeperException.NoNodeException e) {
                // do nothing
            } finally {
                id = null;
            }
        } else {
            //do nothing
        }
    }

    private void ensureExists(final String path) {
        try {
            Stat stat = zookeeper.exists(path, false);
            if (stat != null) {
                return;
            }

            zookeeper.create(path, data, CreateMode.PERSISTENT);
        } catch (KeeperException e) {
            exception = e;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            interrupt = e;
        }
    }

    /**
     * 返回锁对象对应的path
     */
    public String getRoot() {
        return root;
    }

    /**
     * 判断当前是不是锁的owner
     */
    public boolean isOwner() {
        return id != null && ownerId != null && id.equals(ownerId);
    }

    /**
     * 返回当前的节点id
     */
    public String getId() {
        return this.id;
    }

    // ===================== helper method =============================

    /**
     * 执行lock操作,允许传递watch变量控制是否需要阻塞lock操作
     */
    private Boolean acquireLock(final BooleanMutex mutex) {
        try {
            do {
                if (id == null) {//构建当前lock的唯一标识
                    long sessionId = zookeeper.getDelegate().getSessionId();
                    String prefix = "x-" + sessionId + "-";
                    //如果第一次,则创建一个节点
                    String path = zookeeper.create(root + "/" + prefix, data,
                            CreateMode.EPHEMERAL_SEQUENTIAL);
                    int index = path.lastIndexOf("/");
                    id = StringUtils.substring(path, index + 1);
                    idName = new LockNode(id);
                }

                if (id != null) {
                    List<String> names = zookeeper.getChildren(root, false);
                    if (names.isEmpty()) {
                        id = null;//异常情况,重新创建一个
                    } else {
                        //对节点进行排序
                        SortedSet<LockNode> sortedNames = new TreeSet<LockNode>();
                        for (String name : names) {
                            sortedNames.add(new LockNode(name));
                        }

                        if (sortedNames.contains(idName) == false) {
                            id = null;//清空为null,重新创建一个
                            continue;
                        }

                        //将第一个节点做为ownerId
                        ownerId = sortedNames.first().getName();
                        if (mutex != null && isOwner()) {
                            mutex.set(true);//直接更新状态,返回
                            return true;
                        } else if (mutex == null) {
                            return isOwner();
                        }

                        SortedSet<LockNode> lessThanMe = sortedNames.headSet(idName);
                        if (!lessThanMe.isEmpty()) {
                            //关注一下排队在自己之前的最近的一个节点
                            LockNode lastChildName = lessThanMe.last();
                            lastChildId = lastChildName.getName();
                            //异步watcher处理
                            zookeeper.exists(root + "/" + lastChildId, new AsyncWatcher() {

                                public void asyncProcess(WatchedEvent event) {
                                    acquireLock(mutex);
                                }

                            });

                            if (stat == null) {
                                acquireLock(mutex);// 如果节点不存在,需要自己重新触发一下,watcher不会被挂上去
                            }
                        } else {
                            if (isOwner()) {
                                mutex.set(true);
                            } else {
                                id = null;// 可能自己的节点已超时挂了,所以id和ownerId不相同
                            }
                        }
                    }
                }
            } while (id == null);
        } catch (KeeperException e) {
            exception = e;
            if (mutex != null) {
                mutex.set(true);
            }
        } catch (InterruptedException e) {
            interrupt = e;
            if (mutex != null) {
                mutex.set(true);
            }
        } catch (Throwable e) {
            other = e;
            if (mutex != null) {
                mutex.set(true);
            }
        }

        if (isOwner() && mutex != null) {
            mutex.set(true);
        }
        return Boolean.FALSE;
    }
}相关说明:








测试代码:



Java代码 
1.@Test 
2.    public void test_lock() {  
3.        ExecutorService exeucotr = Executors.newCachedThreadPool();  
4.        final int count = 50;  
5.        final CountDownLatch latch = new CountDownLatch(count);  
6.        final DistributedLock[] nodes = new DistributedLock[count];  
7.        for (int i = 0; i < count; i++) {  
8.            final DistributedLock node = new DistributedLock(dir);  
9.            nodes[i] = node;  
10.            exeucotr.submit(new Runnable() {  
11. 
12.                public void run() {  
13.                    try {  
14.                        Thread.sleep(1000);  
15.                        node.lock(); //获取锁  
16.                        Thread.sleep(100 + RandomUtils.nextInt(100));  
17. 
18.                        System.out.println("id: " + node.getId() + " is leader: " + node.isOwner());  
19.                    } catch (InterruptedException e) {  
20.                        want.fail();  
21.                    } catch (KeeperException e) {  
22.                        want.fail();  
23.                    } finally {  
24.                        latch.countDown();  
25.                        try {  
26.                            node.unlock();  
27.                        } catch (KeeperException e) {  
28.                            want.fail();  
29.                        }  
30.                    }  
31. 
32.                }  
33.            });  
34.        }  
35. 
36.        try {  
37.            latch.await();  
38.        } catch (InterruptedException e) {  
39.            want.fail();  
40.        }  
41. 
42.        exeucotr.shutdown();  
43.    } 
@Test
    public void test_lock() {
        ExecutorService exeucotr = Executors.newCachedThreadPool();
        final int count = 50;
        final CountDownLatch latch = new CountDownLatch(count);
        final DistributedLock[] nodes = new DistributedLock[count];
        for (int i = 0; i < count; i++) {
            final DistributedLock node = new DistributedLock(dir);
            nodes[i] = node;
            exeucotr.submit(new Runnable() {

                public void run() {
                    try {
                        Thread.sleep(1000);
                        node.lock(); //获取锁
                        Thread.sleep(100 + RandomUtils.nextInt(100));

                        System.out.println("id: " + node.getId() + " is leader: " + node.isOwner());
                    } catch (InterruptedException e) {
                        want.fail();
                    } catch (KeeperException e) {
                        want.fail();
                    } finally {
                        latch.countDown();
                        try {
                            node.unlock();
                        } catch (KeeperException e) {
                            want.fail();
                        }
                    }

                }
            });
        }

        try {
            latch.await();
        } catch (InterruptedException e) {
            want.fail();
        }

        exeucotr.shutdown();
    }

升级版
实现了一个分布式lock后,可以解决多进程之间的同步问题,但设计多线程+多进程的lock控制需求,单jvm中每个线程都和zookeeper进行网络交互成本就有点高了,所以基于DistributedLock,实现了一个分布式二层锁。



大致原理就是ReentrantLock 和 DistributedLock的一个结合。





• 单jvm的多线程竞争时,首先需要先拿到第一层的ReentrantLock的锁
•拿到锁之后这个线程再去和其他JVM的线程竞争锁,最后拿到之后锁之后就开始处理任务。
锁的释放过程是一个反方向的操作,先释放DistributedLock,再释放ReentrantLock。 可以思考一下,如果先释放ReentrantLock,假如这个JVM ReentrantLock竞争度比较高,一直其他JVM的锁竞争容易被饿死。


代码:
Java代码 
1.public class DistributedReentrantLock extends DistributedLock {  
2. 
3.    private static final String ID_FORMAT     = "Thread[{0}] Distributed[{1}]";  
4.    private ReentrantLock       reentrantLock = new ReentrantLock();  
5. 
6.    public DistributedReentrantLock(String root) {  
7.        super(root);  
8.    }  
9. 
10.    public void lock() throws InterruptedException, KeeperException {  
11.        reentrantLock.lock();//多线程竞争时,先拿到第一层锁  
12.        super.lock();  
13.    }  
14. 
15.    public boolean tryLock() throws KeeperException {  
16.        //多线程竞争时,先拿到第一层锁  
17.        return reentrantLock.tryLock() && super.tryLock();  
18.    }  
19. 
20.    public void unlock() throws KeeperException {  
21.        super.unlock();  
22.        reentrantLock.unlock();//多线程竞争时,释放最外层锁  
23.    }  
24. 
25.    @Override 
26.    public String getId() {  
27.        return MessageFormat.format(ID_FORMAT, Thread.currentThread().getId(), super.getId());  
28.    }  
29. 
30.    @Override 
31.    public boolean isOwner() {  
32.        return reentrantLock.isHeldByCurrentThread() && super.isOwner();  
33.    }  
34. 
35.} 
public class DistributedReentrantLock extends DistributedLock {

    private static final String ID_FORMAT     = "Thread[{0}] Distributed[{1}]";
    private ReentrantLock       reentrantLock = new ReentrantLock();

    public DistributedReentrantLock(String root) {
        super(root);
    }

    public void lock() throws InterruptedException, KeeperException {
        reentrantLock.lock();//多线程竞争时,先拿到第一层锁
        super.lock();
    }

    public boolean tryLock() throws KeeperException {
        //多线程竞争时,先拿到第一层锁
        return reentrantLock.tryLock() && super.tryLock();
    }

    public void unlock() throws KeeperException {
        super.unlock();
        reentrantLock.unlock();//多线程竞争时,释放最外层锁
    }

    @Override
    public String getId() {
        return MessageFormat.format(ID_FORMAT, Thread.currentThread().getId(), super.getId());
    }

    @Override
    public boolean isOwner() {
        return reentrantLock.isHeldByCurrentThread() && super.isOwner();
    }

}
测试代码:
Java代码 
1.@Test 
2.    public void test_lock() {  
3.        ExecutorService exeucotr = Executors.newCachedThreadPool();  
4.        final int count = 50;  
5.        final CountDownLatch latch = new CountDownLatch(count);  
6. 
7.        final DistributedReentrantLock lock = new DistributedReentrantLock(dir); //单个锁  
8.        for (int i = 0; i < count; i++) {  
9.            exeucotr.submit(new Runnable() {  
10. 
11.                public void run() {  
12.                    try {  
13.                        Thread.sleep(1000);  
14.                        lock.lock();  
15.                        Thread.sleep(100 + RandomUtils.nextInt(100));  
16. 
17.                        System.out.println("id: " + lock.getId() + " is leader: " + lock.isOwner());  
18.                    } catch (InterruptedException e) {  
19.                        want.fail();  
20.                    } catch (KeeperException e) {  
21.                        want.fail();  
22.                    } finally {  
23.                        latch.countDown();  
24.                        try {  
25.                            lock.unlock();  
26.                        } catch (KeeperException e) {  
27.                            want.fail();  
28.                        }  
29.                    }  
30. 
31.                }  
32.            });  
33.        }  
34. 
35.        try {  
36.            latch.await();  
37.        } catch (InterruptedException e) {  
38.            want.fail();  
39.        }  
40. 
41.        exeucotr.shutdown();  
42.    } 
@Test
    public void test_lock() {
        ExecutorService exeucotr = Executors.newCachedThreadPool();
        final int count = 50;
        final CountDownLatch latch = new CountDownLatch(count);

        final DistributedReentrantLock lock = new DistributedReentrantLock(dir); //单个锁
        for (int i = 0; i < count; i++) {
            exeucotr.submit(new Runnable() {

                public void run() {
                    try {
                        Thread.sleep(1000);
                        lock.lock();
                        Thread.sleep(100 + RandomUtils.nextInt(100));

                        System.out.println("id: " + lock.getId() + " is leader: " + lock.isOwner());
                    } catch (InterruptedException e) {
                        want.fail();
                    } catch (KeeperException e) {
                        want.fail();
                    } finally {
                        latch.countDown();
                        try {
                            lock.unlock();
                        } catch (KeeperException e) {
                            want.fail();
                        }
                    }

                }
            });
        }

        try {
            latch.await();
        } catch (InterruptedException e) {
            want.fail();
        }

        exeucotr.shutdown();
    }最后
其实再可以发散一下,实现一个分布式的read/write lock,也差不多就是这个理了。项目结束后,有时间可以写一下



大致思路:



1.竞争资源标示:  read_自增id , write_自增id
2.首先按照自增id进行排序,如果队列的前边都是read标识,对应的所有read都获得锁。如果队列的前边是write标识,第一个write节点获取锁
3.watcher监听: read监听距离自己最近的一个write节点的exist,write监听距离自己最近的一个节点(read或者write节点)
分享到:
评论

相关推荐

    zookeeper分布式锁实现和客户端简单实现

    **Zookeeper的分布式锁实现原理** 1. **节点创建与监视**: Zookeeper允许客户端创建临时节点,这些节点会在客户端断开连接时自动删除。分布式锁的实现通常会为每个请求创建一个临时顺序节点,按照创建的顺序形成一...

    C#基于zookeeper分布式锁的实现源码

    总之,C#中基于ZooKeeper的分布式锁实现涉及对ZooKeeper的操作,包括创建临时顺序节点、监听节点变化以及正确释放锁。这样的实现方式保证了在分布式环境下的并发控制和数据一致性,同时具备良好的扩展性和容错性。...

    Zookeeper分布式应用程序协调服务:zookeeper-3.4.13

    它提供了多种同步原语,如`Semaphore`、`Lock`和`Event`,帮助开发者实现分布式环境下的互斥访问、资源限制和条件等待。这些原语对于构建分布式算法和保证数据一致性至关重要。 此外,Zookeeper还提供了**组服务**...

    使用ZooKeeper实现分布式锁

    在处理订单生成的场景中,我们可以这样应用ZooKeeper分布式锁: 1. 当用户发起订单请求时,服务端会尝试在ZooKeeper上创建一个临时顺序节点。 2. 如果创建成功,服务端会检查当前最小序号的节点是否是自己创建的。...

    zookeeper 分布式锁的实现1

    Zookeeper 是一个广泛使用的分布式协调服务,它可以用来实现高效的分布式锁,尤其适合那些对数据一致性要求较高的场景。 Zookeeper 的分布式锁实现依赖于其核心特性: 1. **节点的互斥性**:Zookeeper 允许客户端...

    Zookeeper 分布式重入排它锁实现

    **分布式重入排他锁(Reentrant Lock)在Zookeeper中的实现** Zookeeper是一个开源的分布式协调服务,常用于分布式环境中的一致性问题,如配置管理、命名服务、分布式同步等。在分布式系统中,为了保证数据的一致性...

    zookeeper分布式锁实例源码

    ZooKeeper 提供的分布式锁通常基于其原子的创建、删除节点和监视事件机制来实现。 1. **不可重入锁(Non-Recursive Lock)**: 不可重入锁不允许同一个线程再次获取已持有的锁。在ZooKeeper中,实现不可重入锁通常...

    一文彻底理解ZooKeeper分布式锁的实现原理

    《彻底理解ZooKeeper分布式锁实现原理》 ZooKeeper,简称zk,作为一个高可用的分布式协调服务,常被用于构建分布式系统中的各种组件,如分布式锁。在本篇文章中,我们将深入探讨如何利用Curator这个流行的开源框架...

    基于zookeeper临时顺序节点实现分布式调度

    在本场景中,我们利用Zookeeper的临时顺序节点来实现一个分布式调度解决方案,并结合Quartz进行作业调度。Zookeeper是一个分布式协调服务,而Quartz则是一个强大的、开源的作业调度框架,两者结合能构建出高可用、可...

    基于Zookeeper实现分布式锁实践教程

    分布式锁的实现基于Zookeeper的临时节点和监视器机制。通常,每个想要获取锁的客户端会在特定的zNode下创建一个临时节点。如果创建成功,说明获取了锁;如果失败,说明已有其他客户端持有锁。客户端还可以设置监视器...

    zookeeper:基于Zookeeper的分布式锁

    使用zookeeper来实现分布式锁 原理 监听zookeeper的临时有序节点,监听到NodeDeleted事件,就会让线程重新获取锁 测试方法 public class ZookeeperLockTest { public static void main(String[] args) throws ...

    zookeeper实现分布式锁

    在java中对于同一个jvm而言,jdk已经提供了lock和同步等。但是在分布式情况下,往往存在多个进程对一些资源产生竞争关系,而这些进程往往在不同的机器上,这个时候jdk中提供的已经不能满足。分布式锁顾明思议就是...

    springboot zookeeper 分布式锁

    以下是一个简单的基于Zookeeper的分布式锁实现示例: ```java @Service public class ZookeeperDistributedLock { @Autowired private CuratorFramework curatorFramework; private String lockPath = "/...

    springboot redis zookeeperlock rabbit实现的分布式锁

    本项目基于SpringBoot框架,结合Redis、Zookeeper和RabbitMQ实现了分布式锁,让我们一起深入探讨这三个组件在分布式锁中的作用和实现机制。 **SpringBoot** 是一个轻量级的Java开发框架,它简化了新Spring应用的...

    陶隽-基于Apache Zookeeper的分布式协调原理及应用

    在Spring XD中使用ZooKeeper可以实现分布式环境下的协调,例如在集群中管理服务的分布和任务分配。 对于领导者选举,ZooKeeper提供了一种无需羊群效应(Herd Effect)的锁机制。这种锁的实现依赖于临时顺序节点的...

    Zookeeper实现分布式锁

    4. **zkLockTest**:这个文件很可能是实现Zookeeper分布式锁的测试代码,可能包含客户端的连接、锁的获取和释放、异常处理等逻辑。通过对这个测试代码的分析和运行,我们可以深入理解Zookeeper分布式锁的工作机制。 ...

    通过zookeeper实现分布式锁

    【Zookeeper实现分布式锁】 Zookeeper 是 Apache ...总之,Zookeeper 的分布式锁机制是基于其强大的一致性特性和事件通知能力实现的,它在分布式系统中扮演着至关重要的角色,帮助协调和管理分布式环境下的并发操作。

    java使用zookeeper实现的分布式锁示例

    本文将详细讲解如何使用Java与Apache ZooKeeper实现一个分布式锁的示例。 ZooKeeper是一个分布式协调服务,它提供了一种可靠的方式来管理和同步分布式系统的数据。在分布式锁的场景中,ZooKeeper可以作为一个中心化...

    springboot redis zookeeperlock rabbit实现的分布式锁.zip

    本项目“springboot redis zookeeperlock rabbit实现的分布式锁”结合了Spring Boot、Redis、Zookeeper以及RabbitMQ这四款强大的工具,旨在构建一个健壮的分布式锁系统。以下是关于这些技术及其在分布式锁中的应用的...

    ZookeeperNet实现分布式锁

    通过深入理解Zookeeper的工作原理以及ZookeeperNet库的使用,开发者可以有效地在C#环境中实现高可用的分布式锁,保障多节点之间的协同工作和数据一致性。在实际项目中,分布式锁可以广泛应用于数据库操作、并发任务...

Global site tag (gtag.js) - Google Analytics