`

zookeeper学习笔记-zkclient,curator使用(转)

 
阅读更多

 

转自:http://blog.csdn.net/zhousenshan/article/details/52357558

 

开源客户端,原生api的不足 

连接的创建是异步的,需要开发人员自行编码实现等待 
连接没有自动的超时重连机制 
Zk本身没提供序列化机制,需要开发人员自行指定,从而实现数据的序列化和反序列化 
Watcher注册一次只会生效一次,需要不断的重复注册 
Watcher的使用方式不符合java本身的术语,如果采用监听器方式,更容易理解 
不支持递归创建树形节点 

开源客户端---ZkClient介绍 

Github上一个开源的zk客户端,由datameer的工程师Stefan Groschupf和Peter Voss一起开发 
– 解决session会话超时重连 
– Watcher反复注册 
– 简化开发api 
– 还有..... 
– https://github.com/sgroschupf/zkclient 

开源客户端---Curator介绍 
1. 使用CuratorFrameworkFactory工厂的两个静态方法创建客户端 
a) static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, 
RetryPolicy retryPolicy) 
b) static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy) 
2. Start()方法启动 
参数说明 
connectString 分开的ip:port对 
retryPolicy 重试策略,默认四种:Exponential BackoffRetry,RetryNTimes ,RetryOneTime, 
RetryUntilElapsed 
sessionTimeoutMs 会话超时时间,单位为毫秒,默认60000ms 
connectionTimeoutMs 连接创建超时时间,单位为毫秒,默认是15000ms 

重试策略 
– 实现接口RetryPolicy可以自定重重试策略 
• boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper) 

retryCount 已经重试的次数,如果第一次重试 此值为0 
elapsedTimeMs 重试花费的时间,单位为毫秒 
sleeper 类似于Thread.sleep,用于sleep指定时间 
返回值 如果还会继续重试,则返回true 
四种默认重试策略 
– ExponentialBackoffRetry 
• ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries) 
• ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs) 
• 当前应该sleep的时间: baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1))),随着重试次数增加重试时间间隔变大,指数倍增长 

参数说明 
baseSleepTimeMs 初始sleep时间 
maxRetries 最大重试次数 
maxSleepMs 最大重试时间 
返回值 如果还会继续重试,则返回true 

默认重试策略 
– RetryNTimes 
• RetryNTimes(int n, int sleepMsBetweenRetries) 
• 当前应该sleep的时间 
参数说明 
n 最大重试次数 
sleepMsBetweenRetries 每次重试的间隔时间 

– RetryOneTime 
• 只重试一次 
• RetryOneTime(int sleepMsBetweenRetry), sleepMsBetweenRetry为重试间隔的时间 

默认重试策略 
– RetryUntilElapsed 
• RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries) 
• 重试的时间超过最大时间后,就不再重试 
参数说明 
maxElapsedTimeMs 最大重试时间 
sleepMsBetweenRetries 每次重试的间隔时间 

Fluent风格的API 
– 定义:一种面向对象的开发方式,目的是提高代码的可读性 
– 实现方式:通过方法的级联或者方法链的方式实现 
– 举例: 
        zkclient = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("tests").build();

创建节点 
– 构建操作包装类(Builder): CreateBuilder create()---- CuratorFramework 
– CreateBuilder 
• creatingParentsIfNeeded() //递归创建父目录 
• withMode(CreateMode mode)//设置节点属性,比如:CreateMode.PERSISTENT,如果是递归创建,创建模式 
为临时节点,则只有叶子节点是临时节点,非叶子节点都为持久节点 
• withACL(List aclList) //设置acl 
• forPath(String path) //指定路劲 

删除节点 
– 构建操作包装类(Builder):DeleteBuilder delete() -----CuratorFramework 
– DeleteBuilder 
• withVersion(int version) //特定版本号 
• guaranteed() //确保节点被删除 
• forPath(String path) //指定路径 
• deletingChildrenIfNeeded() //递归删除所有子节点 

关于guaranteed: 
Solves edge cases where an operation may succeed on the server but connection failure 
occurs before a response can be successfully returned to the client 
意思是:解决当某个删除操作在服务器端可能成功,但是此时客户端与服务器端的连接中断,而删除的响 
应没有成功返回到客户端 
底层的本质是重试 


关于异步操作 
– inBackground() 
– inBackground(Object context) 
– inBackground(BackgroundCallback callback) 
– inBackground(BackgroundCallback callback, Object context) 
– inBackground(BackgroundCallback callback, Executor executor) 
– inBackground(BackgroundCallback callback, Object context, Executor executor) 
从参数看跟zk的原生异步api相同,多了一个线程池,用于执行回调 

读取数据 
– 构建操作包装类(Builder): GetDataBuilder getData() -----CuratorFramework 
– GetDataBuilder 
• storingStatIn(org.apache.zookeeper.data.Stat stat) //把服务器端获取的状态数据存储到stat对象 
• Byte[] forPath(String path)//节点路径 

读取子节点 
– 构建操作包装类(Builder): GetChildrenBuilder getChildren() -----CuratorFramework 
– GetChildrenBuilder 
• storingStatIn(org.apache.zookeeper.data.Stat stat) //把服务器端获取的状态数据存储到stat对象 
• Byte[] forPath(String path)//节点路径 
• usingWatcher(org.apache.zookeeper.Watcher watcher) //设置watcher,类似于zk本身的api,也只能使用一次 
• usingWatcher(CuratorWatcher watcher) //设置watcher ,类似于zk本身的api,也只能使用一次 


设置watcher 
– NodeCache 
• 监听数据节点的内容变更 
• 监听节点的创建,即如果指定的节点不存在,则节点创建后,会触发这个监听 
– PathChildrenCache 
• 监听指定节点的子节点变化情况 
• 包括:新增子节点 子节点数据变更 和子节点删除 
NodeCache 
– 构造函数 
• NodeCache(CuratorFramework client, String path) 
• NodeCache(CuratorFramework client, String path, boolean dataIsCompressed) 
参数说明 
client 客户端实例 
path 数据节点路径 
dataIsCompressed 是否进行数据压缩 
– 回调接口 
• public interface NodeCacheListener 
void nodeChanged() //没有参数,怎么获取事件信息以及节点数据? 
PathChildrenCache 
client 客户端实例 
path 数据节点路径 
dataIsCompressed 是否进行数据压缩 
cacheData 用于配置是否把节点内容缓存起来,如果配置为true,那么客户端在接 
收到节点列表变更的同时,也能够获取到节点的数据内容,如果为false 
则无法取到数据内容 
threadFactory 通过这两个参数构造专门的线程池来处理事件通知 
executorService 

PathChildrenCache 
– 监听接口 
• 时间类型包括:新增子节点(CHILD_ADDED),子节点数据变更(CHILD_UPDATED),子节点删除(CHILD_REMOVED)
– PathChildrenCache.StartMode 
• BUILD_INITIAL_CACHE //同步初始化客户端的cache,及创建cache后,就从服务器端拉入对应的数据 
• NORMAL //异步初始化cache 
• POST_INITIALIZED_EVENT //异步初始化,初始化完成触发事件PathChildrenCacheEvent.Type.INITIALIZED 

zkclient举例 

Java代码  收藏代码
  1. package com.zk.dev.zkClient.day1;  
  2.   
  3. import org.I0Itec.zkclient.IZkDataListener;  
  4. import org.I0Itec.zkclient.ZkClient;  
  5. import org.junit.After;  
  6. import org.junit.Before;  
  7. import org.junit.Test;  
  8.   
  9. import java.util.concurrent.TimeUnit;  
  10.   
  11. public class ZKTest  {  
  12.   
  13.     private ZkClient zk;  
  14.   
  15.     private String nodeName = "/test";  
  16.   
  17.     @Before  
  18.     public void initTest() {  
  19.         zk = new ZkClient("localhost:2181");  
  20.     }  
  21.   
  22.     @After  
  23.     public void dispose() {  
  24.         zk.close();  
  25.     }  
  26.   
  27.     @Test  
  28.     public void testListener() throws InterruptedException {  
  29.         // 监听指定节点的数据变化  
  30.   
  31.         zk.subscribeDataChanges(nodeName, new IZkDataListener() {  
  32.             public void handleDataChange(String s, Object o) throws Exception {  
  33.                 System.out.println("node data changed!");  
  34.                  System.out.println("node=>" + s);  
  35.                  System.out.println("data=>" + o);  
  36.                  System.out.println("--------------");  
  37.             }  
  38.   
  39.             public void handleDataDeleted(String s) throws Exception {  
  40.                 System.out.println("node data deleted!");  
  41.                 System.out.println("s=>" + s);  
  42.                 System.out.println("--------------");  
  43.   
  44.             }  
  45.         });  
  46.   
  47.                 System.out.println("ready!");  
  48.   
  49.         // junit测试时,防止线程退出  
  50.         while (true) {  
  51.             TimeUnit.SECONDS.sleep(5);  
  52.         }  
  53.     }  
  54.   
  55.     @Test  
  56.     public void testUpdateConfig() throws InterruptedException {  
  57.         if (!zk.exists(nodeName)) {  
  58.             zk.createPersistent(nodeName);  
  59.         }  
  60.         zk.writeData(nodeName, "1");  
  61.         zk.writeData(nodeName, "2");  
  62.         zk.delete(nodeName);  
  63.         zk.delete(nodeName);   
  64.         zk.writeData("/test/ba""bbb");  
  65.     }  
  66. }  


curator举例 

Java代码  收藏代码
  1. package com.zk.dev.zkClient.day1;  
  2.   
  3. import org.apache.curator.RetryPolicy;  
  4. import org.apache.curator.framework.CuratorFramework;  
  5. import org.apache.curator.framework.CuratorFrameworkFactory;  
  6. import org.apache.curator.framework.recipes.cache.NodeCache;  
  7. import org.apache.curator.framework.recipes.cache.NodeCacheListener;  
  8. import org.apache.curator.framework.recipes.cache.PathChildrenCache;  
  9. import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;  
  10. import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;  
  11. import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;  
  12. import org.apache.curator.retry.ExponentialBackoffRetry;  
  13. import org.apache.zookeeper.CreateMode;  
  14. import org.apache.zookeeper.ZooDefs.Ids;  
  15.   
  16. /** 
  17.  * @see 测试curator框架例子 
  18.  * @Author:xuehan 
  19.  * @Date:2016年5月14日下午8:44:49 
  20.  */  
  21. public class CuratorUtils {  
  22.       
  23.     public String connectString = "localhost:2181";  
  24.     CuratorFramework  zkclient = null ;  
  25.     public CuratorUtils(){  
  26.         /** 
  27.          * connectString连接字符串中间用分号隔开,sessionTimeoutMs session过期时间,connectionTimeoutMs连接超时时间,retryPolicyc连接重试策略 
  28.          */  
  29.         //CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy)  
  30.         // fluent风格aip   
  31.   //    CuratorFrameworkFactory.builder().sessionTimeoutMs(5000).connectString(connectString).namespace("/test").build();  
  32.         // 重连策略,没1一秒重试一次,最大重试次数3次  
  33.         RetryPolicy retryPolicy = new ExponentialBackoffRetry(10003);  
  34.         zkclient = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("tests").build();  
  35.         zkclient.start();  
  36.     }  
  37.     /** 
  38.      * 递归创建节点 
  39.      * @param path 
  40.      * @param data 
  41.      * @throws Exception 
  42.      */  
  43.     public void createNode(String path, byte[] data) throws Exception{  
  44.         zkclient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(Ids.OPEN_ACL_UNSAFE).forPath(path, data);  
  45.     }  
  46.     /** 
  47.      * 递归删除节点 
  48.      * @param path 
  49.      * @throws Exception 
  50.      */  
  51.     public void delNode(String path) throws Exception{  
  52.         zkclient.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);  
  53.     }   public void zkClose(){  
  54.         zkclient.close();  
  55.     }  
  56.     public void delNodeCallBack(String path) throws Exception{  
  57.         zkclient.delete().guaranteed().deletingChildrenIfNeeded().inBackground(new DeleteCallBack()).forPath(path);  
  58.     }      
  59.     public void dataChanges(String path) throws Exception{  
  60.         final NodeCache  dataWatch =  new NodeCache(zkclient, path);  
  61.         dataWatch.start(true);  
  62.         dataWatch.getListenable().addListener(new NodeCacheListener(){  
  63.   
  64.             public void nodeChanged() throws Exception {  
  65.                 System.out.println("path==>" + dataWatch.getCurrentData().getPath() + "==data==>" + new String(dataWatch.getCurrentData().getData()));  
  66.             }  
  67.               
  68.         });  
  69.         zkclient.delete().guaranteed().deletingChildrenIfNeeded().inBackground(new DeleteCallBack()).forPath(path);  
  70.     }      
  71.     public void addChildWatcher(String path) throws Exception{  
  72.         final PathChildrenCache pc = new PathChildrenCache(zkclient, path, true);  
  73.         pc.start(StartMode.POST_INITIALIZED_EVENT);  
  74.         System.out.println("节点个数===>" + pc.getCurrentData().size());  
  75.         pc.getListenable().addListener(new  PathChildrenCacheListener() {  
  76.               
  77.             public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {  
  78.                 System.out.println("事件监听到"  + event.getData().getPath());  
  79.                 if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){  
  80.                     System.out.println("客户端初始化节点完成"  + event.getData().getPath());  
  81.                 }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){  
  82.                     System.out.println("添加节点完成"  + event.getData().getPath());  
  83.                 }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){  
  84.                     System.out.println("删除节点完成"  + event.getData().getPath());  
  85.                 }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){  
  86.                     System.out.println("修改节点完成"  + event.getData().getPath());  
  87.                 }  
  88.             }  
  89.         });  
  90.           
  91.     }  
  92.       
  93.     public static void main(String[] args) throws Exception{  
  94.         CuratorUtils cu = new CuratorUtils();  
  95. //      cu.createNode("/test/sb/aa/bb", "erhu".getBytes());  
  96. //      cu.delNode("/test");  
  97.           
  98.         cu.zkclient.setData().forPath("/aa""love is not".getBytes());  
  99.         cu.addChildWatcher("/aa");  
  100.         try{  
  101.             Thread.sleep(20000000);  
  102.         }catch(Exception e){};  
  103.     }  
  104.  }  
分享到:
评论

相关推荐

    apache-zookeeper(apache-zookeeper-3.7.1-bin.tar.gz)

    apache-zookeeper分布式框架,压缩包内容:(apache-zookeeper-3.7.1-bin.tar.gz、apache-zookeeper-3.7.1.tar.gz、apache-zookeeper-3.6.4-bin.tar.gz、apache-zookeeper-3.6.4.tar.gz、apache-zookeeper-3.5.10-...

    5、zookeeper的java -Curator(服务注册与发现)

    Curator的`ServiceCache`提供了更加高效的服务查询方式,它会在内存中缓存服务实例,减少对Zookeeper的频繁查询,同时使用Watcher机制实时更新服务列表。 总的来说,Zookeeper结合Curator的`ServiceDiscovery`机制...

    zookeeper-3.4.5-cdh5.16.2.tar.gz

    《Zookeeper-3.4.5-cdh5.16.2:分布式协调服务的核心解析》 Apache ZooKeeper,一个高度可靠的分布式协调系统,是大数据生态中的重要组件。本资源包"zookeeper-3.4.5-cdh5.16.2.tar.gz"包含了Zookeeper的3.4.5版本...

    apache-zookeeper-3.5.10-bin 环境搭配

    apache-zookeeper-3.5.10-bin 环境搭配 ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,...

    zookeeper-3.4.5-cdh5.15.1.tar.gz

    下载完成后,使用`tar -zxvf zookeeper-3.4.5-cdh5.15.1.tar.gz`命令进行解压。 2. **配置环境**:解压后,进入解压后的目录,找到conf目录下的zoo.cfg配置文件。这是Zookeeper的主要配置文件,需要根据实际环境...

    apache-zookeeper-3.6.2-bin.tar

    apache-zookeeper-3.6.2-bin.tar apache-zookeeper-3.6.2-bin.tar apache-zookeeper-3.6.2-bin.tar apache-zookeeper-3.6.2-bin.tar apache-zookeeper-3.6.2-bin.tar apache-zookeeper-3.6.2-bin.tar apache-...

    apache-zookeeper-3.6.3-bin.tar的压缩包,解压到本地即可使用,还有zk.sh的脚本以及zoo.cfg

    apache-zookeeper-3.6.3-bin.tar的压缩包,解压到本地即可使用,还有zk.sh的脚本以及zoo.cfg和xsync。ZooKeeper 是一个分布式协调服务 ,由 Apache 进行维护。ZooKeeper 可以视为一个高可用的文件系统。ZooKeeper ...

    apache-zookeeper-3.5.7-bin.tar.gz

    这个压缩包"apache-zookeeper-3.5.7-bin.tar.gz"是Zookeeper的可执行二进制文件包,用于在Linux或Unix类操作系统上部署和运行Zookeeper服务器。 **Zookeeper的基本概念:** 1. **节点(ZNode)**:Zookeeper的数据...

    apache-zookeeper-3.7.1-bin.tar.gz

    apache-zookeeper-3.7.1-bin.tar.gz 内容概要:通过带着读者手写简化版Spring框架,了解Spring核心原理。在手写Spring源码的过程中会摘取整体框架中的核心逻辑,简化代码实现过程,保留核心功能,例如:IOC, AOP、 Bean...

    apache-zookeeper-3.5.9-bin.tar.gz

    在"apache-zookeeper-3.5.9-bin.tar.gz"这个压缩包中,包含了Zookeeper 3.5.9版本的二进制发行版,用于在各种操作系统上部署和运行Zookeeper服务。 1. **Zookeeper基本概念** - **Znode**: Zookeeper中的数据存储...

    apache-zookeeper-3.6.3-bin.zip

    打开“系统属性” -&gt; “高级” -&gt; “环境变量”,在“系统变量”部分新建一个变量,变量名为"ZOOKEEPER_HOME",变量值设置为Zookeeper解压后的路径,即"C:\Zookeeper\apache-zookeeper-3.6.3-bin"。 然后,在系统...

    最新版linux apache-zookeeper-3.7.0-bin.tar.gz

    - 解压`apache-zookeeper-3.7.0-bin.tar.gz`到你选择的目录,例如 `/opt`. - 配置`conf/zoo.cfg`,设置数据目录(dataDir)和日志目录(dataLogDir),并根据需求调整其他配置。 - 初始化数据目录,创建myid文件...

    apache-zookeeper-3.6.1-bin.tar.gz

    ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性...window下先解压为apache-zookeeper-3.6.1-bin.tar.gz

    apache-zookeeper-3.8.4-bin.tar

    1. 解压`apache-zookeeper-3.8.4-bin.tar`到指定目录。 2. 配置`conf/zoo.cfg`,设置服务器ID、数据存储路径、集群配置等。 3. 启动Zookeeper服务,使用`bin/zkServer.sh start`命令。 4. 使用`bin/zkCli.sh`命令行...

    zookeeper-3.4.5-cdh5.10.0.tar.gz

    Zookeeper是Apache Hadoop项目中的一个关键组件,主要用于分布式系统...通过"zookeeper-3.4.5-cdh5.10.0.tar.gz"这个工具包,我们可以方便地在CDH 5.10.0环境中部署和使用Zookeeper,从而提升整个Hadoop集群的稳定性。

    apache-zookeeper-3.5.8-bin.tar.gz

    解压压缩包的命令通常为`tar -zxvf apache-zookeeper-3.5.8-bin.tar.gz`,这会在当前目录下创建一个名为"apache-zookeeper-3.5.8-bin"的文件夹。 Zookeeper的核心概念包括: 1. **节点(ZNode)**:Zookeeper的...

    apache-zookeeper-3.6.4-bin.tar.gz

    ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、...

    Zookeeper 原生api zkClient Curator操作

    在Java开发中,我们通常使用三种方式来操作Zookeeper:原生API、zkClient和Curator。接下来,我们将详细探讨这三种方式。 **一、Zookeeper原生API** Zookeeper提供了Java API,可以直接与Zookeeper服务器进行交互...

    twill-zookeeper-0.6.0-incubating-API文档-中文版.zip

    赠送jar包:twill-zookeeper-0.6.0-incubating.jar; 赠送原API文档:twill-zookeeper-0.6.0-incubating-javadoc.jar; 赠送源代码:twill-zookeeper-0.6.0-incubating-sources.jar; 赠送Maven依赖信息文件:twill-...

    apache-zookeeper-3.7.0-bin.tar.gz

    apache-zookeeper-3.7.0-bin.tar.gz

Global site tag (gtag.js) - Google Analytics