`

zookeeper 共享锁代码的实现

 
阅读更多
zookeeper存储是树状结构,如/path1/path2(这里path1必须是PERSISTENT,否则path2不能创建),
节点有三种状态:
1、EPHEMERAL临时节点,server删除后该节点也会删除,但是我测试的时候,如果将server强制删除,EPHEMERAL节点会延迟删除;
2、PERSISTENT节点,这个是持久保存的节点。
3、SEQUENTIAL节点,自动标号的节点
如果父节点有子节点,父节点将不能删除。


watch是zookeeper最核心的一个功能;
watch = true;
监听某path1节点目录下是否创建删除子节点
zookeeper.getChildren("/path1", watch );
监听path2是否删除、修改
zookeeper.exists(path2,true);
zookeeper.getData(path2,true, null);




zookeeper能很好的协调分布式应用的任务执行,比如当多个应用同时处理分布式存储内存里面KEY VALUE、MQ时。锁只是其中一种表现。还有zookeeper 选举,集群管理,队列管理等;



public abstract class ZkLock implements Watcher {
private ZooKeeper zk;
private String hosts = "192.168.1.202:2181";
private String lockName = null;
private String nodePath = null;
private String nodeName = null;
private boolean extra = false;
private Integer SESSION_TIMEOUT = 5000;
private String waitPath = null;
public CountDownLatch latch = new CountDownLatch(1);

public ZkLock(String lockName,ZooKeeper zk) throws Exception{
this.lockName = StringUtils.contains(lockName, "/")?lockName:"/"+lockName;
if(zk==null){
  zk = new ZooKeeper(hosts, SESSION_TIMEOUT,this);
}
this.zk = zk;
if(zk.exists(this.lockName, false)==null){
zk.create(this.lockName, this.lockName.getBytes()
, Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
}
nodePath = zk.create(this.lockName + "/lock-", new byte[0],Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
nodeName = StringUtils.substringAfter(nodePath, this.lockName+"/");
}

public ZkLock(String lockName) throws Exception{
this(lockName,null);
}
@Override
public void process(WatchedEvent event) {
latch.countDown();
if (extra&&event.getType() == EventType.NodeDeleted && event.getPath().equals(waitPath)) {
action();
        }
}

private Object exec() throws Exception{
List<String> list = zk.getChildren(lockName, false);
String[] nodes = list.toArray(new String[list.size()]);
Arrays.sort(nodes);
if(StringUtils.equals(nodeName, nodes[0])){
Object obj = action();
return obj;
}else{
return waitLock(nodes[0]);
}
}

private Object waitLock(String lower) throws Exception{
Stat stat = zk.exists(lockName + "/" + lower,true);
if(stat!=null){
latch.await();
}
return this.exec();
}

//这里的锁,主线程后面要加Thread.sleep()
private void extraExec() throws Exception{
        latch.await();
        List<String> childrenNodes = zk.getChildren(lockName, false);
        if (childrenNodes.size() == 1) {
        action();
        } else {
          Collections.sort(childrenNodes);
              int index = childrenNodes.indexOf(nodeName);
            if (index == -1) {
                // never happened
            } else if (index == 0) {
            action();
            } else {
//            System.out.println("waitPath:"+childrenNodes.get(index - 1));
            this.waitPath = this.lockName + "/" + childrenNodes.get(index - 1);
                // 在waitPath上注册监听器, 当waitPath被删除时, zookeeper会回调监听器的process方法
            if(zk.exists(waitPath, true)==null){
            action();
                 }
            }
        }
}

private Object action() {
try{
return doAction();
}finally{
try {
zk.delete(nodePath, -1);
// zk.close();
} catch (Exception e) {
throw new RuntimeException(e);
}

}
}

/**
* 主程必须守候,因为是与脱离主线程,所以无返回值。
* @return
* @throws Exception
*/
public void extraExecute() throws Exception{
this.extra = true;
this.extraExec();
}

public Object execute() throws Exception{
return exec();
}

public abstract Object doAction() ;


public static void main(String[] args) throws Exception {
Object obj = null;
final CountDownLatch startSignal = new CountDownLatch(10);
        for (int i = 0; i < 10; i++) {//zookeeper默认是一个应用10个连接
            new Thread() {
                public void run() {
                ZkLock lock;
try {
lock = new ZkLock("test") {
@Override
public Object doAction() {
return null;
}
};
// startSignal.await();
lock.execute();
                                              //  lock.extraExecute();
                                      //Thread.sleep(Long.MAX_VALUE);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
                }
            }.start();
            startSignal.countDown();
        }
}

}
分享到:
评论

相关推荐

    基于zookeeper的分布式锁实现demo

    4. **锁节点的创建与检查:** 示例代码中创建了两个锁节点`/ExclusiveLockDemo`和`/ShardLockDemo`,分别用于实现非共享和共享锁。 5. **锁的等待机制:** `CountDownLatch`被用来阻塞锁获取操作,直到监听到锁可用...

    基于zookeeper的分布式锁简单实现

    - **测试代码**:展示了如何在实际应用中使用Zookeeper实现分布式锁的示例,包括创建锁、获取锁、释放锁以及异常处理等操作。 - **实用工具类**:封装了与Zookeeper交互的常用方法,如创建节点、设置监听、检查节点...

    zookeeper分布式锁实例源码

    通过分析和理解这些源代码,你可以更好地掌握如何在实际项目中利用ZooKeeper实现高效、可靠的分布式锁机制。 在实践中,ZooKeeper的分布式锁解决方案需要考虑的方面还包括异常处理(如网络分区)、锁的超时机制、...

    zookeeper的分布式全局锁纯代码解决方案

    在这个"zookeeper的分布式全局锁纯代码解决方案"中,我们将探讨如何利用Zookeeper来构建一套易于理解和扩展的分布式锁。 首先,Zookeeper的特性使其成为实现分布式锁的理想选择。Zookeeper提供了原子性的读写操作、...

    基于zookeeper临时顺序节点实现分布式调度

    在`ets-schedule`压缩包中,可能包含的是实现这个分布式调度方案的相关代码和配置文件,包括Quartz的配置、Zookeeper的连接代码以及任务调度逻辑等。通过对这些文件的分析和理解,我们可以深入学习如何在实际项目中...

    zookeeper客户端Java代码示例.zip

    在分布式环境中,Zookeeper可以用来实现共享锁。例如,基于临时顺序节点的独占锁: ```java String lockPath = "/lock"; String lockId = zookeeper.create(lockPath + "/lock-", "", ZooDefs.Ids.OPEN_ACL_UNSAFE, ...

    java使用zookeeper实现的分布式锁示例

    在分布式锁的场景中,ZooKeeper可以作为一个中心化的锁服务,帮助各个节点之间协调并发操作,防止多个节点同时访问共享资源。 在提供的代码示例中,我们看到一个名为`DistributedLock`的类,它实现了Java的`Lock`...

    springboot redis zookeeperlock rabbit实现的分布式锁.zip

    在分布式锁中,Redis常被用作共享锁的存储介质,因为它支持原子操作如`SETNX`(设置如果不存在)和`EXPIRE`(设置过期时间),可以保证在高并发环境下的锁操作一致性。 3. **Zookeeper**:Apache ZooKeeper是一个...

    ZookeeperNet实现分布式锁

    在提供的"ZookeeperConsole"项目中,你可以找到具体实现分布式锁的代码示例。通常,它会包含以下几个关键部分: - 连接Zookeeper服务器:`ZooKeeper.Connect("server地址", sessionTimeout)` - 创建临时节点:`...

    zookeeper 使用 Curator 示例监听、分布式锁

    1. 读写锁:Curator 提供了 `InterProcessMutex`(互斥锁) 和 `InterProcessReadWriteLock`(读写锁) 类,用于实现分布式环境中的独占锁和共享锁。通过竞争 ZooKeeper 节点的持有权来达到锁定目的。 2. 分布式锁流程...

    zookeeper实现分布式锁

    基于 Zookeeper 的分布式锁的代码实现 定义分布式锁接口 public interface DistributedLock { /获取锁,如果没有得到就等待*/ public void acquire() throws Exception; / * 获取锁,直到超时 * @param ...

    java中的zookeeper

    在`05_zk实现分布式锁的原理和实现.docx`中,应该详细讲解了这两种实现方式的原理和代码示例。 **使用Java操作ZooKeeper** Java提供了ZooKeeper的API,使得我们可以方便地进行节点的创建、读取、更新和删除操作。`...

    分布式锁/信号量的实现方式:基于Redis、Zookeeper 、etcd 的分布式锁 确保在分布式环境中安全地访问共享资源

    下面将详细介绍基于Redis、Zookeeper以及etcd的分布式锁实现方式。 #### 二、基于Redis的分布式锁实现 Redis作为一款高性能的键值存储系统,非常适合用来实现分布式锁。以下是一个简单的示例代码: - **获取锁**...

    ZOOKEEPER3.4.5

    在实际项目中,开发者可以通过解压`zookeeper-3.4.5`压缩包,了解ZooKeeper的源代码,学习其实现原理,以便更好地运用到自己的分布式系统设计中。 总之,ZooKeeper 3.4.5 在服务治理和分布式部署中的作用不可忽视,...

    zookeeper 自己学习资料

    《分布式服务框架 Zookeeper -- 管理分布式环境中的数据 - 给自己一片纯净的天空 - 开源中国社区_files》这个文件夹可能是上述网页的附件,包含图表、图片或者代码示例,用于辅助理解Zookeeper的工作机制和应用场景...

    zookeeper工具

    ZooKeeper的主要功能包括命名服务、配置管理、分布式锁、集群状态监控等,这些都是分布式应用程序在处理节点间通信和数据共享时不可或缺的组件。 1. **命名服务**:ZooKeeper提供了一种全局唯一的命名空间,使得...

    zk实现的分布式锁相关代码

    在提供的代码示例中,我们展示了如何使用Apache Curator来实现一个健壮的分布式锁。这个实现不仅能够处理锁的基本获取和释放,还加入了网络问题和服务中断的容错处理。 首先,我们通过CuratorFrameworkFactory创建...

    zookeeper-3.4.8.rar

    - **编写客户端代码**: 使用Zookeeper的Java API在Tomcat应用中实现所需功能,如创建ZNode、设置Watcher等。 5. **最佳实践** - **监控与日志**: 集成Zookeeper的监控工具,如ZK-Console或ZK-Admin,以便实时查看...

    zookeeper-server-client 简单例子

    Zookeeper 是一个分布式协调服务,由 Apache 开发并维护,它为分布式应用程序提供了高效且可靠的命名服务、配置管理、集群同步、分布式锁等核心功能。Zookeeper 的设计目标是简化分布式环境下的数据一致性问题,使得...

    zookeeper-3.3.2.tar.gz

    4. 分布式同步:Zookeeper提供了分布式锁和队列,使得多个分布式进程可以同步执行任务。这在处理并发操作或者实现分布式事务时尤其重要。 5. 领导者选举:在Zookeeper集群中,只有一个节点是领导者,负责处理所有的...

Global site tag (gtag.js) - Google Analytics