`
乡里伢崽
  • 浏览: 112312 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

ZooKeeper 分布式锁实现

阅读更多
场景描述

在分布式应用, 往往存在多个进程提供同一服务. 这些进程有可能在相同的机器上, 也有可能分布在不同的机器上. 如果这些进程共享了一些资源, 可能就需要分布式锁来锁定对这些资源的访问.
本文将介绍如何利用zookeeper实现分布式锁.

获取锁实现思路:

1.     首先创建一个作为锁目录(znode),通常用它来描述锁定的实体,称为:/lock_node
2.     希望获得锁的客户端在锁目录下创建znode,作为锁/lock_node的子节点,并且节点类型为有序临时节点(EPHEMERAL_SEQUENTIAL);
        例如:有两个客户端创建znode,分别为/lock_node/lock-1和/lock_node/lock-2
3.     当前客户端调用getChildren(/lock_node)得到锁目录所有子节点,不设置watch,接着获取小于自己(步骤2创建)的兄弟节点
4.     步骤3中获取小于自己的节点不存在 && 最小节点与步骤2中创建的相同,说明当前客户端顺序号最小,获得锁,结束。
5.     客户端监视(watch)相对自己次小的有序临时节点状态
6.     如果监视的次小节点状态发生变化,则跳转到步骤3,继续后续操作,直到退出锁竞争。

实现

以一个DistributedClient对象模拟一个进程的形式, 演示zookeeper分布式锁的实现.
public class DistributedClient {
    // 超时时间
    private static final int SESSION_TIMEOUT = 5000;
    // zookeeper server列表
    private String hosts = "localhost:4180,localhost:4181,localhost:4182";
    private String groupNode = "locks";
    private String subNode = "sub";

    private ZooKeeper zk;
    // 当前client创建的子节点
    private String thisPath;
    // 当前client等待的子节点
    private String waitPath;

    private CountDownLatch latch = new CountDownLatch(1);

    /**
     * 连接zookeeper
     */
    public void connectZookeeper() throws Exception {
        zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher() {
            public void process(WatchedEvent event) {
                try {
                    // 连接建立时, 打开latch, 唤醒wait在该latch上的线程
                    if (event.getState() == KeeperState.SyncConnected) {
                        latch.countDown();
                    }

                    // 发生了waitPath的删除事件
                    if (event.getType() == EventType.NodeDeleted && event.getPath().equals(waitPath)) {
                        doSomething();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        // 等待连接建立
        latch.await();

        // 创建子节点
        thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);

        // wait一小会, 让结果更清晰一些
        Thread.sleep(10);

        // 注意, 没有必要监听"/locks"的子节点的变化情况
        List<String> childrenNodes = zk.getChildren("/" + groupNode, false);

        // 列表中只有一个子节点, 那肯定就是thisPath, 说明client获得锁
        if (childrenNodes.size() == 1) {
            doSomething();
        } else {
            String thisNode = thisPath.substring(("/" + groupNode + "/").length());
            // 排序
            Collections.sort(childrenNodes);
            int index = childrenNodes.indexOf(thisNode);
            if (index == -1) {
                // never happened
            } else if (index == 0) {
                // inddx == 0, 说明thisNode在列表中最小, 当前client获得锁
                doSomething();
            } else {
                // 获得排名比thisPath前1位的节点
                this.waitPath = "/" + groupNode + "/" + childrenNodes.get(index - 1);
                // 在waitPath上注册监听器, 当waitPath被删除时, zookeeper会回调监听器的process方法
                zk.getData(waitPath, true, new Stat());
            }
        }
    }

    private void doSomething() throws Exception {
        try {
            System.out.println("gain lock: " + thisPath);
            Thread.sleep(2000);
            // do something
        } finally {
            System.out.println("finished: " + thisPath);
            // 将thisPath删除, 监听thisPath的client将获得通知
            // 相当于释放锁
            zk.delete(this.thisPath, -1);
        }
    }

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 10; i++) {
            new Thread() {
                public void run() {
                    try {
                        DistributedClient dl = new DistributedClient();
                        dl.connectZookeeper();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }.start();
        }

        Thread.sleep(Long.MAX_VALUE);
    }
}


思考

思维缜密的朋友可能会想到, 上述的方案并不安全. 假设某个client在获得锁之前挂掉了, 由于client创建的节点是ephemeral类型的, 因此这个节点也会被删除, 从而导致排在这个client之后的client提前获得了锁. 此时会存在多个client同时访问共享资源.
如何解决这个问题呢? 可以在接到waitPath的删除通知的时候, 进行一次确认, 确认当前的thisPath是否真的是列表中最小的节点.

// 发生了waitPath的删除事件
if (event.getType() == EventType.NodeDeleted && event.getPath().equals(waitPath)) {
	// 确认thisPath是否真的是列表中的最小节点
	List<String> childrenNodes = zk.getChildren("/" + groupNode, false);
	String thisNode = thisPath.substring(("/" + groupNode + "/").length());
	// 排序
	Collections.sort(childrenNodes);
	int index = childrenNodes.indexOf(thisNode);
	if (index == 0) {
		// 确实是最小节点
		doSomething();
	} else {
		// 说明waitPath是由于出现异常而挂掉的
		// 更新waitPath
		waitPath = "/" + groupNode + "/" + childrenNodes.get(index - 1);
		// 重新注册监听, 并判断此时waitPath是否已删除
		if (zk.exists(waitPath, true) == null) {
			doSomething();
		}
	}
}


另外, 由于thisPath和waitPath这2个成员变量会在多个线程中访问, 最好将他们声明为volatile, 以防止出现线程可见性问题.

另一种思路

下面介绍一种更简单, 但是不怎么推荐的解决方案.
每个client在getChildren的时候, 注册监听子节点的变化. 当子节点的变化通知到来时, 再一次通过getChildren获取子节点列表, 判断thisPath是否是列表中的最小节点, 如果是, 则执行资源访问逻辑.

public class DistributedClient2 {
	// 超时时间
	private static final int SESSION_TIMEOUT = 5000;
	// zookeeper server列表
	private String hosts = "localhost:4180,localhost:4181,localhost:4182";
	private String groupNode = "locks";
	private String subNode = "sub";

	private ZooKeeper zk;
	// 当前client创建的子节点
	private volatile String thisPath;

	private CountDownLatch latch = new CountDownLatch(1);

	/**
	 * 连接zookeeper
	 */
	public void connectZookeeper() throws Exception {
		zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher() {
			public void process(WatchedEvent event) {
				try {
					// 连接建立时, 打开latch, 唤醒wait在该latch上的线程
					if (event.getState() == KeeperState.SyncConnected) {
						latch.countDown();
					}

					// 子节点发生变化
					if (event.getType() == EventType.NodeChildrenChanged && event.getPath().equals("/" + groupNode)) {
						// thisPath是否是列表中的最小节点
						List<String> childrenNodes = zk.getChildren("/" + groupNode, true);
						String thisNode = thisPath.substring(("/" + groupNode + "/").length());
						// 排序
						Collections.sort(childrenNodes);
						if (childrenNodes.indexOf(thisNode) == 0) {
							doSomething();
						}
					}
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		});

		// 等待连接建立
		latch.await();

		// 创建子节点
		thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
				CreateMode.EPHEMERAL_SEQUENTIAL);

		// wait一小会, 让结果更清晰一些
		Thread.sleep(10);

		// 监听子节点的变化
		List<String> childrenNodes = zk.getChildren("/" + groupNode, true);

		// 列表中只有一个子节点, 那肯定就是thisPath, 说明client获得锁
		if (childrenNodes.size() == 1) {
			doSomething();
		}
	}

	/**
	 * 共享资源的访问逻辑写在这个方法中
	 */
	private void doSomething() throws Exception {
		try {
			System.out.println("gain lock: " + thisPath);
			Thread.sleep(2000);
			// do something
		} finally {
			System.out.println("finished: " + thisPath);
			// 将thisPath删除, 监听thisPath的client将获得通知
			// 相当于释放锁
			zk.delete(this.thisPath, -1);
		}
	}

	public static void main(String[] args) throws Exception {
		for (int i = 0; i < 10; i++) {
			new Thread() {
				public void run() {
					try {
						DistributedClient2 dl = new DistributedClient2();
						dl.connectZookeeper();
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			}.start();
		}

		Thread.sleep(Long.MAX_VALUE);
	}
}


为什么不推荐这个方案呢? 是因为每次子节点的增加和删除都要广播给所有client, client数量不多时还看不出问题. 如果存在很多client, 那么就可能导致广播风暴--过多的广播通知阻塞了网络. 使用第一个方案, 会使得通知的数量大大下降. 当然第一个方案更复杂一些, 复杂的方案同时也意味着更容易引进bug.
分享到:
评论

相关推荐

    zookeeper分布式锁实现和客户端简单实现

    **Zookeeper的分布式锁实现原理** 1. **节点创建与监视**: Zookeeper允许客户端创建临时节点,这些节点会在客户端断开连接时自动删除。分布式锁的实现通常会为每个请求创建一个临时顺序节点,按照创建的顺序形成一...

    基于zookeeper的分布式锁实现demo

    **Zookeeper分布式锁的关键特性包括:** 1. **顺序一致性:** Zookeeper中的节点被创建顺序是全局唯一的,这有助于实现锁的唯一性。 2. **原子性:** 创建和删除节点的操作在Zookeeper中都是原子性的,这保证了...

    基于zookeeper的分布式锁简单实现

    这时,Zookeeper,一个高可用的分布式协调服务,常被用来实现分布式锁。 Zookeeper由Apache基金会开发,它提供了一种可靠的分布式一致性服务,包括命名服务、配置管理、集群同步、领导者选举等功能。Zookeeper基于...

    使用ZooKeeper实现分布式锁

    在处理订单生成的场景中,我们可以这样应用ZooKeeper分布式锁: 1. 当用户发起订单请求时,服务端会尝试在ZooKeeper上创建一个临时顺序节点。 2. 如果创建成功,服务端会检查当前最小序号的节点是否是自己创建的。...

    一文彻底理解ZooKeeper分布式锁的实现原理

    《彻底理解ZooKeeper分布式锁实现原理》 ZooKeeper,简称zk,作为一个高可用的分布式协调服务,常被用于构建分布式系统中的各种组件,如分布式锁。在本篇文章中,我们将深入探讨如何利用Curator这个流行的开源框架...

    zookeeper做分布式锁

    分布式锁是解决多节点系统中同步问题的一种常见技术,ZooKeeper,由Apache基金会开发的分布式协调服务,常被用于实现高效可靠的分布式锁。本文将深入探讨如何利用ZooKeeper来构建分布式锁,并讨论其背后的关键概念和...

    zookeeper 分布式锁的实现1

    Zookeeper 的分布式锁实现依赖于其核心特性: 1. **节点的互斥性**:Zookeeper 允许客户端创建临时节点(EPHEMERAL)。这些节点在客户端会话结束(比如客户端宕机)时会被自动删除。通过竞争创建特定路径下的临时...

    C#基于zookeeper分布式锁的实现源码

    总之,C#中基于ZooKeeper的分布式锁实现涉及对ZooKeeper的操作,包括创建临时顺序节点、监听节点变化以及正确释放锁。这样的实现方式保证了在分布式环境下的并发控制和数据一致性,同时具备良好的扩展性和容错性。...

    基于zookeeper实现分布式锁

    zooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是集群的管理者。提供了文件系统和通知机制。...在开发项目的过程中,很多大型项目都是分布式部署的,那么我们现在使用zookeeper实现一个分布式锁。

    zookeeper分布式锁实例源码

    在这个场景下,我们将关注ZooKeeper如何实现分布式锁,特别是不可重入锁、可重入锁以及可重入读写锁的概念与实践。 首先,我们要理解什么是分布式锁。在多节点并发访问共享资源时,分布式锁能确保同一时刻只有一个...

    从Paxos到Zookeeper分布式一致性原理与实践PDF

    《从Paxos到Zookeeper分布式一致性原理与实践》是一本深入探讨分布式系统一致性问题的著作,其中重点讲解了Paxos算法与Zookeeper在实际应用中的理论与实践。Paxos是分布式计算领域中著名的共识算法,为解决分布式...

    zookeeper分布式锁

    Zookeeper分布式锁的工作原理: 1. **会话和临时节点**:Zookeeper支持两种类型的节点,持久节点和临时节点。临时节点在客户端会话失效(例如,客户端崩溃或网络断开)时会被自动删除,这为实现分布式锁提供了一个...

    从PAXOS到ZOOKEEPER分布式一致性原理与实践

    2. 分布式锁:利用ZNODE的创建、删除操作,实现跨节点的互斥访问控制。 3. 配置管理:集中式存储和分发系统的配置信息,保证所有节点访问的配置是一致的。 4. 命名服务:为分布式组件提供全局唯一的ID或名称,简化...

    从Paxos到Zookeeper分布式一致性原理与实践 + ZooKeeper-分布式过程协同技术详解 pdf

    《从Paxos到Zookeeper分布式一致性原理与实践》与《ZooKeeper-分布式过程协同技术详解》这两本书深入探讨了分布式系统中的一个重要概念——一致性,以及如何通过ZooKeeper这一工具来实现高效的分布式协同。...

    zookeeper实现分布式锁

    在程序开发过程中不得不考虑的就是并发问题。在java中对于同一个jvm而言,jdk已经提供了lock和同步等。但是在分布式情况下,往往存在多个进程对一些资源产生竞争...分布式锁顾明思议就是可以满足分布式情况下的并发锁。

    从Paxos到Zookeeper分布式一致性原理与实践包括源码

    Zookeeper基于Paxos和其他一致性算法的实现,为分布式应用程序提供了命名服务、配置管理、分布式锁、群组服务等功能。Zookeeper通过ZNode(类似于文件系统的节点)来存储和操作数据,并采用观察者模式来实时监控数据...

    ZooKeeper分布式过程协同技术详解_new.pdf

    此外,书中还会深入探讨ZooKeeper在实际应用场景中的最佳实践,如如何利用ZooKeeper进行服务发现、实现分布式锁、构建分布式队列等。通过实例分析,读者可以更好地掌握ZooKeeper在分布式系统中的作用和价值。 在...

    zk分布式锁1

    ZooKeeper是一个广泛使用的分布式锁实现方案,本文将对ZooKeeper分布式锁进行详细的介绍。 什么是分布式锁 分布式锁是指在分布式系统中,多个节点之间对共享资源的访问控制机制。分布式锁可以确保在分布式环境中,...

    springboot zookeeper 分布式锁

    以下是一个简单的基于Zookeeper的分布式锁实现示例: ```java @Service public class ZookeeperDistributedLock { @Autowired private CuratorFramework curatorFramework; private String lockPath = "/...

    浅谈Java(SpringBoot)基于zookeeper的分布式锁实现

    Java(SpringBoot)基于zookeeper的分布式锁实现 本文主要介绍了Java(SpringBoot)基于zookeeper的分布式锁实现,通过示例代码详细介绍了分布式锁的实现过程,对大家的学习或者工作具有一定的参考学习价值。 分布式锁...

Global site tag (gtag.js) - Google Analytics