`
chenjingbo
  • 浏览: 459981 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

基于zookeeper的分布式锁实现

阅读更多

工作中需要写一个定时任务,由于是集群环境,自然而然想到需要通过分布式锁来保证单台执行..相信大家都会想到使用zk来实现对应的分布式锁.下面就简单介绍一下几种实现

 

准备工作

有几个帮助类,先把代码放上来

ZKClient 对zk的操作做了一个简单的封装

 

package zk.lock;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import zk.util.ZKUtil;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * User: zhenghui
 * Date: 14-3-26
 * Time: 下午8:50
 * 封装一个zookeeper实例.
 */
public class ZKClient implements Watcher {

    private ZooKeeper zookeeper;

    private CountDownLatch connectedSemaphore = new CountDownLatch(1);


    public ZKClient(String connectString, int sessionTimeout) throws Exception {
        zookeeper = new ZooKeeper(connectString, sessionTimeout, this);
        System.out.println("connecting zk server");
        if (connectedSemaphore.await(10l, TimeUnit.SECONDS)) {
            System.out.println("connect zk server success");
        } else {
            System.out.println("connect zk server error.");
            throw new Exception("connect zk server error.");
        }
    }

    public void close() throws InterruptedException {
        if (zookeeper != null) {
            zookeeper.close();
        }
    }

    public void createPathIfAbsent(String path, boolean isPersistent) throws Exception {
        CreateMode createMode = isPersistent ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;
        path = ZKUtil.normalize(path);
        if (!this.exists(path)) {
            zookeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode);
        }
    }

    public boolean exists(String path) throws Exception {
        path = ZKUtil.normalize(path);
        Stat stat = zookeeper.exists(path, null);
        return stat != null;
    }

    public String getData(String path) throws Exception {
        path = ZKUtil.normalize(path);
        try {
            byte[] data = zookeeper.getData(path, null, null);
            return new String(data);
        } catch (KeeperException e) {
            if (e instanceof KeeperException.NoNodeException) {
                throw new Exception("Node does not exist,path is [" + e.getPath() + "].", e);
            } else {
                throw new Exception(e);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new Exception(e);
        }
    }

    @Override
    public void process(WatchedEvent event) {
        if (event == null) return;

        // 连接状态
        Watcher.Event.KeeperState keeperState = event.getState();
        // 事件类型
        Watcher.Event.EventType eventType = event.getType();
        // 受影响的path
//        String path = event.getPath();
        if (Watcher.Event.KeeperState.SyncConnected == keeperState) {
            // 成功连接上ZK服务器
            if (Watcher.Event.EventType.None == eventType) {
                System.out.println("zookeeper connect success");
                connectedSemaphore.countDown();
            }
        }
        //下面可以做一些重连的工作.
        else if (Watcher.Event.KeeperState.Disconnected == keeperState) {
            System.out.println("zookeeper Disconnected");
        } else if (Watcher.Event.KeeperState.AuthFailed == keeperState) {
            System.out.println("zookeeper AuthFailed");
        } else if (Watcher.Event.KeeperState.Expired == keeperState) {
            System.out.println("zookeeper Expired");
        }
    }
}

 ZKUtil 针对zk路径的一个工具类

package zk.util;

/**
 * User: zhenghui
 * Date: 14-3-26
 * Time: 下午9:56
 */
public class ZKUtil {

    public static final String SEPARATOR = "/";

    /**
     * 转换path为zk的标准路径 以/开头,最后不带/
     */
    public static String normalize(String path) {
        String temp = path;
        if(!path.startsWith(SEPARATOR)) {
            temp = SEPARATOR + path;
        }
        if(path.endsWith(SEPARATOR)) {
            temp = temp.substring(0, temp.length()-1);
            return normalize(temp);
        }else {
            return temp;
        }
    }

    /**
     * 链接两个path,并转化为zk的标准路径
     */
    public static String contact(String path1,String path2){
        if(path2.startsWith(SEPARATOR)) {
            path2 = path2.substring(1);
        }
        if(path1.endsWith(SEPARATOR)) {
            return normalize(path1 + path2);
        } else {
            return normalize(path1 + SEPARATOR + path2);
        }
    }

    /**
     * 字符串转化成byte类型
     */
    public static byte[] toBytes(String data) {
        if(data == null || data.trim().equals("")) return null;
        return data.getBytes();
    }
}

 NetworkUtil 获取本机IP的工具方法

package zk.util;

import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.Enumeration;

/**
 * User: zhenghui
 * Date: 14-4-1
 * Time: 下午4:47
 */
public class NetworkUtil {

    static private final char COLON = ':';

    /**
     * 获取当前机器ip地址
     * 据说多网卡的时候会有问题.
     */
    public static String getNetworkAddress() {
        Enumeration<NetworkInterface> netInterfaces;
        try {
            netInterfaces = NetworkInterface.getNetworkInterfaces();
            InetAddress ip;
            while (netInterfaces.hasMoreElements()) {
                NetworkInterface ni = netInterfaces
                        .nextElement();
                Enumeration<InetAddress> addresses=ni.getInetAddresses();
                while(addresses.hasMoreElements()){
                    ip = addresses.nextElement();
                    if (!ip.isLoopbackAddress()
                            && ip.getHostAddress().indexOf(COLON) == -1) {
                        return ip.getHostAddress();
                    }
                }
            }
            return "";
        } catch (Exception e) {
            return "";
        }
    }
}

 

--------------------------- 正文开始  -----------------------------------

这种实现非常简单,具体的流程如下



 对应的实现如下

package zk.lock;


import zk.util.NetworkUtil;
import zk.util.ZKUtil;

/**
 * User: zhenghui
 * Date: 14-3-26
 * Time: 下午8:37
 * 分布式锁实现.
 *
 * 这种实现的原理是,创建某一个任务的节点,比如 /lock/tasckname 然后获取对应的值,如果是当前的Ip,那么获得锁,如果不是,则没获得
 * .如果该节点不存在,则创建该节点,并把改节点的值设置成当前的IP
 */
public class DistributedLock01 {

    private ZKClient zkClient;


    public static final String LOCK_ROOT = "/lock";
    private String lockName;


    public DistributedLock01(String connectString, int sessionTimeout,String lockName) throws Exception {
        //先创建zk链接.
        this.createConnection(connectString,sessionTimeout);

        this.lockName = lockName;
    }

    public boolean tryLock(){
        String path = ZKUtil.contact(LOCK_ROOT,lockName);
        String localIp = NetworkUtil.getNetworkAddress();
        try {
            if(zkClient.exists(path)){
                String ownnerIp = zkClient.getData(path);
                if(localIp.equals(ownnerIp)){
                    return true;
                }
            } else {
                zkClient.createPathIfAbsent(path,false);
                if(zkClient.exists(path)){
                    String ownnerIp = zkClient.getData(path);
                    if(localIp.equals(ownnerIp)){
                        return true;
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }


    /**
     * 创建zk连接
     *
     */
    protected void createConnection(String connectString, int sessionTimeout) throws Exception {
        if(zkClient != null){
            releaseConnection();
        }
        zkClient = new ZKClient(connectString,sessionTimeout);
        zkClient.createPathIfAbsent(LOCK_ROOT,true);
    }
    /**
     * 关闭ZK连接
     */
    protected void releaseConnection() throws InterruptedException {
        if (zkClient != null) {
            zkClient.close();
        }
    }

}

 

总结

网上有很多文章,大家的方法大多数都是创建一个root根节点,每一个trylock的客户端都会在root下创建一个 EPHEMERAL_SEQUENTIAL 的子节点,同时设置root的child 变更watcher(为了避免羊群效应,可以只添加前一个节点的变更通知) .如果创建的节点的序号是最小,则获取到锁,否则继续等待root的child 变更..这种方式我个人觉得有点繁琐,实现起来有点麻烦.具体的现有实现可以查看

https://svn.apache.org/repos/asf/zookeeper/trunk/src/recipes/lock/

其他的,想了半天没啥好说的..over

 

 

  • 大小: 13.2 KB
分享到:
评论
5 楼 wangyaopeng1992 2016-05-18  
同一机器不同线程都会获取锁,有并发问题
4 楼 icanfly 2016-01-05  
我只能说你实现的有严重的并发问题,并没有考虑并发的情况。
3 楼 aiheng1988 2015-06-11  
若多个客户端同时调用tryLock去获取锁,最开始不存在锁的节点,这样都会去创建节点,不会报节点冲突?
2 楼 Mynameisyuan 2015-05-21  
1 楼 其疾如风 2014-08-29  
release lock就是关闭zk连接么?要是zk client连接关闭失败,server要等待session timeout才删除这个临时节点,那么这段时间内其他人也拿不到锁了?

相关推荐

    基于zookeeper的分布式锁实现demo

    **基于Zookeeper的分布式锁实现机制:** - **锁的节点创建:** 当客户端尝试获取锁时,它会在Zookeeper上创建一个临时顺序节点,节点名通常包含客户端标识,以便区分不同客户端。 - **节点的顺序性:** 由于...

    基于zookeeper的分布式锁简单实现

    实现基于Zookeeper的分布式锁,主要涉及以下核心概念: 1. **节点创建**:在Zookeeper中,每个锁可以表示为一个临时节点。当客户端需要获取锁时,会在特定路径下创建一个临时节点。由于Zookeeper的顺序性,创建的...

    C#基于zookeeper分布式锁的实现源码

    总之,C#中基于ZooKeeper的分布式锁实现涉及对ZooKeeper的操作,包括创建临时顺序节点、监听节点变化以及正确释放锁。这样的实现方式保证了在分布式环境下的并发控制和数据一致性,同时具备良好的扩展性和容错性。...

    基于zookeeper实现分布式锁

    zooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是集群的管理者。提供了文件系统和通知机制。...在开发项目的过程中,很多大型项目都是分布式部署的,那么我们现在使用zookeeper实现一个分布式锁。

    zookeeper做分布式锁

    基于一致性模型,ZooKeeper确保了数据的一致性和有序性,使其成为构建分布式锁的理想选择。 **2. 分布式锁原理** 分布式锁的目的是在多个节点之间实现对共享资源的互斥访问。在ZooKeeper中,通常通过创建临时顺序...

    zk分布式锁1

    ZooKeeper分布式锁的实现基于ZooKeeper的节点机制。ZooKeeper的节点可以分为四种类型:持久节点、临时节点、顺序节点和ephemeral节点。其中,临时节点和ephemeral节点可以被用来实现分布式锁。 临时节点实现分布式...

    浅谈Java(SpringBoot)基于zookeeper的分布式锁实现

    Java(SpringBoot)基于zookeeper的分布式锁实现 本文主要介绍了Java(SpringBoot)基于zookeeper的分布式锁实现,通过示例代码详细介绍了分布式锁的实现过程,对大家的学习或者工作具有一定的参考学习价值。 分布式锁...

    zookeeper分布式锁实例源码

    ZooKeeper 提供的分布式锁通常基于其原子的创建、删除节点和监视事件机制来实现。 1. **不可重入锁(Non-Recursive Lock)**: 不可重入锁不允许同一个线程再次获取已持有的锁。在ZooKeeper中,实现不可重入锁通常...

    一文彻底理解ZooKeeper分布式锁的实现原理

    《彻底理解ZooKeeper分布式锁实现原理》 ZooKeeper,简称zk,作为一个高可用的分布式协调服务,常被用于构建分布式系统中的各种组件,如分布式锁。在本篇文章中,我们将深入探讨如何利用Curator这个流行的开源框架...

    从Paxos到Zookeeper分布式一致性原理与实践PDF

    《从Paxos到Zookeeper分布式一致性原理与实践》是一本深入探讨分布式系统一致性问题的著作,其中重点讲解了Paxos算法与Zookeeper在实际应用中的理论与实践。Paxos是分布式计算领域中著名的共识算法,为解决分布式...

    redis和zookeeper实现分布式锁的区别

    **Zookeeper分布式锁** Zookeeper是一个高可用的分布式协调服务,它维护了一致的命名空间和分布式事件通知。Zookeeper中的分布式锁通常通过创建临时节点(ephemeral nodes)来实现。当客户端创建一个临时节点并持有...

    从PAXOS到ZOOKEEPER分布式一致性原理与实践

    2. 分布式锁:利用ZNODE的创建、删除操作,实现跨节点的互斥访问控制。 3. 配置管理:集中式存储和分发系统的配置信息,保证所有节点访问的配置是一致的。 4. 命名服务:为分布式组件提供全局唯一的ID或名称,简化...

    从Paxos到Zookeeper分布式一致性原理与实践 + ZooKeeper-分布式过程协同技术详解 pdf

    ZooKeeper基于Paxos等一致性算法,实现了简单易用的接口,使得开发者无需关心底层复杂的共识机制,就能轻松实现诸如命名服务、配置管理、集群同步、分布式锁等功能。ZooKeeper的数据模型是一个层次化的命名空间,...

    从Paxos到Zookeeper分布式一致性原理与实践包括源码

    Zookeeper基于Paxos和其他一致性算法的实现,为分布式应用程序提供了命名服务、配置管理、分布式锁、群组服务等功能。Zookeeper通过ZNode(类似于文件系统的节点)来存储和操作数据,并采用观察者模式来实时监控数据...

    ZooKeeper分布式过程协同技术详解_new.pdf

    此外,书中还会深入探讨ZooKeeper在实际应用场景中的最佳实践,如如何利用ZooKeeper进行服务发现、实现分布式锁、构建分布式队列等。通过实例分析,读者可以更好地掌握ZooKeeper在分布式系统中的作用和价值。 在...

    吊打面试官之基于zookeeper实现分布式锁源码

    总结,基于ZooKeeper实现的分布式锁方案,充分利用了ZooKeeper的特性,提供了高可用、高性能的锁服务。在SpringCloud框架下,可以轻松地整合ZooKeeper,简化分布式系统的开发。不过,在实际应用中,还需要根据业务...

    springboot zookeeper 分布式锁

    以下是一个简单的基于Zookeeper的分布式锁实现示例: ```java @Service public class ZookeeperDistributedLock { @Autowired private CuratorFramework curatorFramework; private String lockPath = "/...

    基于zookeeper实现的分布式读写锁

    在分布式系统中,数据一致性是至关重要的,而实现这一目标的一种常见方法是使用分布式锁。本文将深入探讨基于Zookeeper实现的分布式读写锁,并利用Zkclient客户端进行操作。Zookeeper是一个分布式服务协调框架,它...

    Zookeeper 分布式重入排它锁实现

    Zookeeper提供了基于其强大的数据一致性特性的分布式锁实现,其中包括了重入排他锁。 **一、Zookeeper的分布式锁原理** 1. **节点创建与删除**:在Zookeeper中,锁的实现通常依赖于临时节点(ephemeral nodes)。...

    ZookeeperNet实现分布式锁

    **分布式锁实现** 分布式锁是解决分布式系统中多个进程或线程对共享资源的并发访问问题的关键技术。以下是如何使用ZookeeperNet实现分布式锁的步骤: 1. **创建临时节点**:每个客户端在尝试获取锁时,会在特定的...

Global site tag (gtag.js) - Google Analytics