`

使用curator操作zookeeper

 
阅读更多

转自:http://blog.csdn.net/lzy_lizhiyang/article/details/48518731 

使用Java操作zookeeper时,一般有两种方式:使用zkclient或者curator,相比较来说,curator的使用较为简便。今天就来看看如何使用curator来操作zookeeper。

     需要的依赖如下:

 

 

<dependency>  
        <groupId>org.apache.curator</groupId>  
        <artifactId>curator-framework</artifactId>  
        <version>2.8.0</version>  
    </dependency>  
  
    <dependency>  
        <groupId>org.apache.curator</groupId>  
        <artifactId>curator-client</artifactId>  
        <version>2.8.0</version>  
    </dependency>  
<dependency>  
        <groupId>org.apache.zookeeper</groupId>  
       <artifactId>zookeeper</artifactId>  
       <version>3.4.6</version>  
</dependency>  
  
<dependency>  
         <groupId>com.google.guava</groupId>  
        <artifactId>guava</artifactId>  
        <version>16.0.1</version>  
</dependency>  
 

 

 

     首先,我们创建一个客户端类,来真正的操作zookeeper,包括创建连接、创建节点,写入值、读取值等。

 

import java.util.ArrayList;  
import java.util.HashMap;  
import java.util.List;  
import java.util.Map;  
import java.util.Random;  
import java.util.Set;  
import java.util.concurrent.ConcurrentHashMap;  
import java.util.concurrent.CopyOnWriteArraySet;  
  
import org.apache.commons.lang.StringUtils;  
import org.apache.curator.framework.CuratorFramework;  
import org.apache.curator.framework.CuratorFrameworkFactory;  
import org.apache.curator.framework.api.CuratorWatcher;  
import org.apache.curator.framework.state.ConnectionState;  
import org.apache.curator.framework.state.ConnectionStateListener;  
import org.apache.curator.retry.RetryNTimes;  
import org.apache.zookeeper.CreateMode;  
import org.apache.zookeeper.WatchedEvent;  
import org.apache.zookeeper.Watcher.Event;  
import org.slf4j.Logger;  
import org.slf4j.LoggerFactory;  
  
  
public class CuratorZookeeperClient {  
      
    private final int CONNECT_TIMEOUT = 15000;  
    private final int RETRY_TIME = Integer.MAX_VALUE;  
    private final int RETRY_INTERVAL = 1000;  
    private static final Logger logger = LoggerFactory.getLogger(CuratorZookeeperClient.class);  
    private CuratorFramework curator;  
      
    private volatile static CuratorZookeeperClient instance;  
      
    /** 
     * key:父路径,如/jobcenter/client/goodscenter 
     * value:Map-->key:子路径,如/jobcenter/client/goodscenter/goodscenter00000001 
     *              value:路径中的值 
     */  
    private static ConcurrentHashMap<String,Map<String,String>> zkCacheMap = new ConcurrentHashMap<String,Map<String,String>>();  
      
    public static Map<String,Map<String,String>> getZkCacheMap() {  
        return zkCacheMap;  
    }  
      
    private CuratorFramework newCurator(String zkServers) {  
        return CuratorFrameworkFactory.builder().connectString(zkServers)  
                .retryPolicy(new RetryNTimes(RETRY_TIME, RETRY_INTERVAL))  
                .connectionTimeoutMs(CONNECT_TIMEOUT).build();  
    }  
      
    private CuratorZookeeperClient(String zkServers) {  
        if(curator == null) {  
            curator = newCurator(zkServers);  
            curator.getConnectionStateListenable().addListener(new ConnectionStateListener() {  
                public void stateChanged(CuratorFramework client, ConnectionState state) {  
                    if (state == ConnectionState.LOST) {  
                        //连接丢失  
                        logger.info("lost session with zookeeper");  
                    } else if (state == ConnectionState.CONNECTED) {  
                        //连接新建  
                        logger.info("connected with zookeeper");  
                    } else if (state == ConnectionState.RECONNECTED) {  
                        logger.info("reconnected with zookeeper");  
                        //连接重连  
                        for(ZkStateListener s:stateListeners){  
                            s.reconnected();  
                        }  
                    }  
                }  
            });  
            curator.start();  
        }  
    }  
      
    public static CuratorZookeeperClient getInstance(String zkServers) {  
        if(instance == null) {  
            synchronized(CuratorZookeeperClient.class) {  
                if(instance == null) {  
                    logger.info("initial CuratorZookeeperClient instance");  
                    instance = new CuratorZookeeperClient(zkServers);  
                }  
            }  
        }  
        return instance;  
    }  
      
    /** 
     * 写数据:/docker/jobcenter/client/app/app0..../app1...../app2 
     * @param path 
     * @param content 
     *  
     * @return 返回真正写到的路径 
     * @throws Exception 
     */  
    public String write(String path,String content) throws Exception {  
        StringBuilder sb = new StringBuilder(path);  
        String writePath = curator.create().creatingParentsIfNeeded()  
            .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)  
            .forPath(sb.toString(), content.getBytes("utf-8"));  
        return writePath;  
    }  
      
    /** 
     * 随机读取一个path子路径 
     * 先从cache中读取,如果没有,再从zookeeper中查询 
     * @param path 
     * @return 
     * @throws Exception 
     */  
    public String readRandom(String path) throws Exception {  
        String parentPath = path;  
        Map<String,String> cacheMap = zkCacheMap.get(path);  
        if(cacheMap != null && cacheMap.size() > 0) {  
            logger.debug("get random value from cache,path="+path);  
            return getRandomValue4Map(cacheMap);  
        }  
        if(curator.checkExists().forPath(path) == null) {  
            logger.debug("path [{}] is not exists,return null",path);  
            return null;  
        } else {  
            logger.debug("read random from zookeeper,path="+path);  
            cacheMap = new HashMap<String,String>();  
            List<String> list = curator.getChildren().usingWatcher(new ZKWatcher(parentPath,path)).forPath(path);  
            if(list == null || list.size() == 0) {  
                logger.debug("path [{}] has no children return null",path);  
                return null;  
            }  
            Random rand = new Random();  
            String child = list.get(rand.nextInt(list.size()));  
            path = path + "/" + child;  
            byte[] b = curator.getData().usingWatcher(new ZKWatcher(parentPath,path))  
                                .forPath(path);  
            String value = new String(b,"utf-8");  
            if(StringUtils.isNotBlank(value)) {  
                cacheMap.put(path, value);  
                zkCacheMap.put(parentPath, cacheMap);  
            }  
            return value;  
        }  
    }  
      
    /** 
     * 读取path下所有子路径下的内容 
     * 先从map中读取,如果不存在,再从zookeeper中查询 
     * @param path 
     * @return 
     * @throws Exception 
     */  
    public List<String> readAll(String path) throws Exception {  
        String parentPath = path;  
        Map<String,String> cacheMap = zkCacheMap.get(path);  
        List<String> list = new ArrayList<String>();  
        if(cacheMap != null) {  
            logger.debug("read all from cache,path="+path);  
            list.addAll(cacheMap.values());  
            return list;  
        }  
        if(curator.checkExists().forPath(path) == null) {  
            logger.debug("path [{}] is not exists,return null",path);  
            return null;  
        } else {  
            cacheMap = new HashMap<String,String>();  
            List<String> children = curator.getChildren().usingWatcher(new ZKWatcher(parentPath,path)).forPath(path);  
            if(children == null || children.size() == 0) {  
                logger.debug("path [{}] has no children,return null",path);  
                return null;  
            } else {  
                logger.debug("read all from zookeeper,path="+path);  
                String basePath = path;  
                for(String child : children) {  
                    path = basePath + "/" + child;  
                    byte[] b = curator.getData().usingWatcher(new ZKWatcher(parentPath,path))  
                            .forPath(path);  
                    String value = new String(b,"utf-8");  
                    if(StringUtils.isNotBlank(value)) {  
                        list.add(value);  
                        cacheMap.put(path, value);  
                    }  
                }  
            }  
            zkCacheMap.put(parentPath, cacheMap);  
            return list;  
        }  
    }  
      
    /** 
     * 随机获取Map中的一个值 
     * @param map 
     * @return 
     */  
    private String getRandomValue4Map(Map<String,String> map) {  
        Object[] values = map.values().toArray();  
        Random rand = new Random();  
        return values[rand.nextInt(values.length)].toString();  
    }  
      
    public void delete(String path) throws Exception {  
        if(curator.checkExists().forPath(path) != null) {  
            curator.delete().inBackground().forPath(path);  
            zkCacheMap.remove(path);  
        }  
    }  
      
    /** 
     * 获取路径下的所有子路径 
     * @param path 
     * @return 
     */  
    public List<String> getChildren(String path) throws Exception {  
        if(curator.checkExists().forPath(path) == null) {  
            logger.debug("path [{}] is not exists,return null",path);  
            return null;  
        } else {  
            List<String> children = curator.getChildren().forPath(path);  
            return children;  
        }  
    }  
      
    public void close() {  
        if(curator != null) {  
            curator.close();  
            curator = null;  
        }  
        zkCacheMap.clear();  
    }  
      
    /** 
     * zookeeper监听节点数据变化 
     * @author lizhiyang 
     * 
     */  
    private class ZKWatcher implements CuratorWatcher {  
        private String parentPath;  
        private String path;  
        public ZKWatcher(String parentPath,String path) {  
            this.parentPath = parentPath;  
            this.path = path;  
        }  
  
        public void process(WatchedEvent event) throws Exception {  
            Map<String,String> cacheMap = zkCacheMap.get(parentPath);  
            if(cacheMap == null) {  
                cacheMap = new HashMap<String,String>();  
            }  
            if(event.getType() == Event.EventType.NodeDataChanged   
                     || event.getType() == Event.EventType.NodeCreated){  
                 byte[] data = curator.getData().  
                        usingWatcher(this).forPath(path);  
                 cacheMap.put(path, new String(data,"utf-8"));  
                 logger.info("add cache={}",new String(data,"utf-8"));  
            } else if(event.getType() == Event.EventType.NodeDeleted) {  
                 cacheMap.remove(path);  
                 logger.info("remove cache path={}",path);  
            } else if(event.getType() == Event.EventType.NodeChildrenChanged) {  
                //子节点发生变化,重新进行缓存  
                cacheMap.clear();  
                List<String> children = curator.getChildren().usingWatcher(new ZKWatcher(parentPath,path)).forPath(path);  
                if(children != null && children.size() > 0) {  
                    for(String child : children) {  
                        String childPath = parentPath + "/" + child;  
                        byte[] b = curator.getData().usingWatcher(new ZKWatcher(parentPath,childPath))  
                                .forPath(childPath);  
                        String value = new String(b,"utf-8");  
                        if(StringUtils.isNotBlank(value)) {  
                            cacheMap.put(childPath, value);  
                        }  
                    }  
                }  
                logger.info("node children changed,recaching path={}",path);  
            }  
            zkCacheMap.put(parentPath, cacheMap);  
        }  
    }  
    private final Set<ZkStateListener> stateListeners = new CopyOnWriteArraySet<ZkStateListener>();  
    public void addStateListener(ZkStateListener listener) {  
        stateListeners.add(listener);  
    }  

 


其中,我们对节点和值进行了缓存,避免频繁的访问zookeeper。在对zookeeper操作时,对连接丢失、连接新建、重连等事件进行了监听,使用到了类ZkStateListener

 

 

public interface ZkStateListener {  
  
    void reconnected();  
  
}  

 

下面,我们就使用ZkDockerService类来封住客户端的操作。

 

import java.util.ArrayList;  
import java.util.HashSet;  
import java.util.List;  
import java.util.Set;  
import java.util.concurrent.Executors;  
import java.util.concurrent.ScheduledExecutorService;  
import java.util.concurrent.ScheduledFuture;  
import java.util.concurrent.TimeUnit;  
  
import org.apache.commons.lang.StringUtils;  
import org.slf4j.Logger;  
import org.slf4j.LoggerFactory;  
  
/** 
 * jobclient docker改造 
 * 注册应用信息至zookeeper 
 * @author lizhiyang 
 * 
 */  
public class ZkDockerService {  
    private static final Logger logger = LoggerFactory.getLogger(ZkDockerService.class);  
      
    private CuratorZookeeperClient zkClient;  
      
    private Set<String> zkPathList = new HashSet<String>();  
     // 失败重试定时器,定时检查是否有请求失败,如有,无限次重试  
    private  ScheduledFuture<?> retryFuture;  
    // 定时任务执行器  
    private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1,  
            new NamedThreadFactory("RegistryFailedRetryTimer", true));  
    //需要重新注册的数据  
    private Set<ClientData> retrySet = new HashSet<ClientData>();  
      
    /** 
     * init-method,初始化执行 
     * 将本机docker的IP地址 端口都注册到zookeeper中 
     */  
    public void register2Zookeeper() {  
        try {  
            zkClient = CuratorZookeeperClient.getInstance(ZOOKEEPER_ADDRESS);  
            ClientData client = findClientData();  
            registerClientData(client);  
            zkClient.addStateListener(new ZkStateListener(){  
                @Override  
                public void reconnected() {  
                    ClientData client = findClientData();  
                    //将服务添加到重试列表  
                    retrySet.add(client);  
                }  
            });  
            //启动线程进行重试,1秒执行一次,因为jobcenter的定时触发时间最短的是1秒  
            this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {  
                public void run() {  
                    // 检测并连接注册中心  
                    try {  
                        retryRegister();  
                    } catch (Throwable t) { // 防御性容错  
                        logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);  
                    }  
                }  
            }, 1, 1, TimeUnit.SECONDS);  
              
        } catch (Exception e) {  
            logger.error("zookeeper write exception",e);  
        }         
    }  
  
    /** 
     * destrory-method,销毁时执行 
     */  
    public void destroy4Zookeeper() {  
        logger.info("zkDockerService destrory4Zookeeper path="+zkPathList);  
         try {  
             if(retryFuture != null){  
                 retryFuture.cancel(true);  
             }  
                  
       } catch (Throwable t) {  
           logger.warn(t.getMessage(), t);  
       }  
           
        if(zkPathList != null && zkPathList.size() > 0) {  
            for(String path : zkPathList) {  
                try {  
                    zkClient.delete(path);  
                } catch (Exception e) {  
                    logger.error("zkDockerService destrory4Zookeeper exception",e);  
                }  
            }  
        }  
        zkClient.close();         
    }  
      
        /** 构造要存储的对象 **/  
    private ClientData findClientData() {  
        ClientData client = new ClientData();  
        client.setIpAddress(ip);  
        client.setPort(port);  
        client.setSource(1);  
        return client;  
    }  
        /** 将值写入zookeeper中 **/  
        private void registerClientData(ClientData client) throws Exception{  
            String centerPath = "/server";  
            String content = "";  
            String strServer = zkClient.write(centerPath, content);  
            if(!StringUtils.isBlank(strServer)) {  
                zkPathList.add(strServer);  
            }  
    }  
    /** 
     * 重连到zookeeper时,自动重试 
     */  
    protected synchronized void retryRegister() {  
        if(!retrySet.isEmpty()){  
             logger.info("jobclient  begin retry register client to zookeeper");  
             Set<ClientData> retryClients = new HashSet<ClientData>(retrySet);  
             for(ClientData data :retryClients){  
                 logger.info("retry register="+data);  
                 try {  
                    registerJobcenterClient(data);  
                    retrySet.remove(data);  
                } catch (Exception e) {  
                    logger.error("registerJobcenterClient failed",e);  
                }  
             }  
        }  
    }  
}  

 

其中在使用定时任务时,使用到了NamedThreadFactory,如下:

 

import java.util.concurrent.ThreadFactory;  
import java.util.concurrent.atomic.AtomicInteger;  
  
public class NamedThreadFactory implements ThreadFactory  
{  
    private static final AtomicInteger POOL_SEQ = new AtomicInteger(1);  
  
    private final AtomicInteger mThreadNum = new AtomicInteger(1);  
  
    private final String mPrefix;  
  
    private final boolean mDaemo;  
  
    private final ThreadGroup mGroup;  
  
    public NamedThreadFactory()  
    {  
        this("pool-" + POOL_SEQ.getAndIncrement(),false);  
    }  
  
    public NamedThreadFactory(String prefix)  
    {  
        this(prefix,false);  
    }  
  
    public NamedThreadFactory(String prefix,boolean daemo)  
    {  
        mPrefix = prefix + "-thread-";  
        mDaemo = daemo;  
        SecurityManager s = System.getSecurityManager();  
        mGroup = ( s == null ) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();  
    }  
  
    public Thread newThread(Runnable runnable)  
    {  
        String name = mPrefix + mThreadNum.getAndIncrement();  
        Thread ret = new Thread(mGroup,runnable,name,0);  
        ret.setDaemon(mDaemo);  
        return ret;  
    }  
  
    public ThreadGroup getThreadGroup()  
    {  
        return mGroup;  
    }  
}

 

分享到:
评论

相关推荐

    zookeeper客户端curator操作示例

    而Apache Curator是Facebook开源的一个ZooKeeper客户端库,它提供了更高级别的API,简化了ZooKeeper的使用,并增加了诸如连接管理、重试策略、事务操作等功能。 在Java开发中,Curator是使用ZooKeeper最常用且推荐...

    curator zookeeper 3.4.6 2.9.1

    5. javassist.jar:这是一个Java字节码操作库,Curator可能使用它来动态修改类的行为,例如在客户端实现更复杂的监控和管理功能。 6. swift-codec-0.16.0.jar和swift-service-0.16.0.jar:这些可能与特定项目或库...

    curator_zookeeper需要的jar

    描述提到“curator封装zookeeper需要的jar”,这意味着我们将探讨Curator如何将复杂的Zookeeper操作包装成更易于使用的Java接口。 首先,`zookeeper-3.4.6.jar`是Zookeeper的核心库,包含了Zookeeper服务器和客户端...

    zookeeper 使用 Curator 示例监听、分布式锁

    综上所述,这个示例项目将展示如何使用 Curator 对 ZooKeeper 进行数据操作,以及如何实现基于 ZooKeeper 的分布式锁,这对于构建分布式系统中的并发控制和协调至关重要。通过学习这个示例,你可以深入理解 ...

    maven-zookeeper

    总的来说,`maven-zookeeper`项目为学习和实践如何使用Curator操作Zookeeper集群提供了一个很好的起点。通过这个项目,开发者不仅可以掌握Curator的使用,还能加深对Zookeeper的理解,并熟悉Maven在项目构建中的应用...

    curator zookeeper

    curator zookeeper 3.4.6 2.9.1

    Zookeeper 原生api zkClient Curator操作

    在Java开发中,我们通常使用三种方式来操作Zookeeper:原生API、zkClient和Curator。接下来,我们将详细探讨这三种方式。 **一、Zookeeper原生API** Zookeeper提供了Java API,可以直接与Zookeeper服务器进行交互...

    zookeeper开源客户端Curator

    Curator是Netflix公司开源的一套ZooKeeper客户端框架,Curator解决了很多ZooKeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等,实现了Fluent风格的API接口,目前已经...

    zookeeper学习之三(Curator客户端)

    Curator客户端是Apache Curator框架的一部分,该框架为开发人员提供了一系列高级API和工具,用于处理常见的ZooKeeper用例,降低了使用ZooKeeper的复杂性。以下是对Curator客户端及其主要特性的详细阐述: 1. **连接...

    使用curator实现zookeeper锁服务的示例分享

    Curator是Netflix开源的一个用于简化Zookeeper使用的Java客户端库,它提供了很多高级功能,包括分布式锁、事件监听、配方等。在这个示例中,我们将使用Curator的`InterProcessMutex`类来实现分布式互斥锁(Mutex)。...

    项目加入zookeeper的依赖包(Curator框架)

    总的来说,Curator框架极大地简化了Zookeeper的使用,提供了丰富的功能和易于使用的API,使开发者能够更专注于业务逻辑而不是底层的协调细节。在项目中集成这些依赖包,能够有效地支持分布式环境下的协调需求。

    curator-test单元测试zookeeper

    使用apache curator-test单元测试zookeeper

    zookeeper_demo maven项目:包含原生API、zkclient、Curator操作

    在本项目`zookeeper_demo`中,我们将深入探讨Zookeeper的三种主要客户端库——原生API、ZkClient和Curator的操作方式,以及它们在集群配置和分布式锁等实际场景中的应用。 1. **Zookeeper原生API** Zookeeper原生...

    Zookeeper客户端Curator Framework使用代码

    Zookeeper客户端Curator Framework是Apache Zookeeper项目的一个高级封装库,它为开发者提供了一系列便捷的API,使得操作Zookeeper变得更加简单。本篇文章将深入探讨Curator Framework的主要功能、使用方法以及示例...

    ZooKeeper-Curator:zookeeper的curator客户端

    - 操作:使用`CuratorFramework`提供的API进行增删改查等操作。 - 监听:注册Watchers来监听ZNode的变化。 - 关闭:在应用结束时,记得关闭`CuratorFramework`实例,释放资源。 通过ZooKeeper-Curator,开发者...

    5、zookeeper的java -Curator(服务注册与发现)

    Curator的`ServiceCache`提供了更加高效的服务查询方式,它会在内存中缓存服务实例,减少对Zookeeper的频繁查询,同时使用Watcher机制实时更新服务列表。 总的来说,Zookeeper结合Curator的`ServiceDiscovery`机制...

    zookeeper(java使用curator操作zooke.emmx

    使用mindmaster打开

    Zookeeper开源客户端框架Curator简介与示例

    Curator 是一个基于 ZooKeeper 的开源客户端框架,它为 ZooKeeper 提供了高级抽象和功能,使得开发人员能够更方便地使用 ZooKeeper。 **Curator 框架概述** Curator 包含多个模块,如 ZooKeeper 客户端连接管理、...

    zk使用curator实现分布式锁

    在分布式系统中,确保数据的一致性和正确性是至关重要的,而ZooKeeper与Curator的结合使用就是解决这一问题的有效方案。ZooKeeper是一个高度可靠的分布式协调服务,它提供了诸如分布式锁、队列和配置管理等核心功能...

    Curator的JAR包

    `Apache Curator入门实战.docx`文档很可能是对如何使用Curator进行Zookeeper操作的详细教程,包括安装、配置、基本用法以及食谱的实践示例。这份文档对于初学者来说是一份宝贵的资源,可以帮助他们快速理解和掌握...

Global site tag (gtag.js) - Google Analytics