ZooKeeper是一个分布式协调服务,在很多开源的分布式服务中都有使用!现在介绍一下ZK的基本API的使用
ZK的主要特性:
- 客户端如果对ZK的一个数据节点注册一个Watcher监听,那么当该数据节点的内容或子节点列表发生变更时zk服务器都会向所有订阅客户端发送变更通知
- 对于在ZK上创建的临时节点,一旦客户端与服务器见的会话失败,那么该临时节点也会自动清除
- ZK将会保证客户端无法重复创建一个已经存在的数据节点
基于zk的这两个特性可以解决很多分布式问题。
ZK作为一个分布式服务框架,主要用来解决分布式数据一致性问题,它提供了简单分布式元语,并且对多种语言提供了API,下面是介绍JAVA客户端API的使用。
ZK提供的API接口一般有一个同步接口一个异步接口,使用方法基本相同。
- 客户端可以通过创建一个org.apache.zookeeper.ZooKeeper的实例来连接服务器。主要的构造方法有:
ZooKeeper(String connectString,int sessionTimeout,Watcher watcher)
ZooKeeper(String connectString,int sessionTimeout,Watcher watcher,boolean canReadOnly)
ZooKeeper(String connectString,int sessionTimeout,Watcher watcher,long sessionId,byte[] sessionPasswd)
ZooKeeper(String connectString,int sessionTimeout,Watcher watcher,long sessionId,byte[] sessionPasswd,boolean canReadOnly)
connectString:值ZK服务器列表,如localhost:2181,127.0.0.1:2181
sessionTimeout:指会话超时时间,以毫秒为单位的整数值;在一个会话周期内,zk客户端和服务器会通过心跳维持会话的有效性,在sessionTimeout时间内,如果连接断开,zk客户端会主动和服务器建立连接
watcher:Watcher事件通知处理器
canReadOnly:这是一个boolean值,用于标识当前会话是否支持“read-only”模式。默认情况下,在ZK集群中,一个机器如果和集群中的半数以及半数以上的机器失去连接,那么这个机器将不再处理客户端请求(读请求+写请求均不处理);但是有时候我们希望在发生此类故障时不影响读取请求的处理,这个就是zk的read-only 模式
sessionId和sessionPasswd,分别代表会话Id和会话密钥。这连个参数可以唯一确定一个会话。
- 创建节点常用的接口是:
String create(final String path,byte[] data,List<ACL>acl,CreateModel createModel);
String create(final String path,byte[] data,List<ACL> acl,CreateModel,StringCallback cb,Object ctx);
如果节点已经存在则抛出一个NodeExistsException异常
回调接口
StringCallback{
public void (int rc,String path,Object ctx,String name)
}
rc是result code 服务端响应结果码。客户端可以从这个结果码中识别出API的调用结果,常见的结果码有:
0(OK),接口调用成功
-4(ConnectionLoss),客户端和服务器连接断开
-110(NodeExists) 节点已存在
-112(SessionExpired)会话已过期
path: 接口调用传入的数据节点的节点路径
ctx: 接口调用传入的ctx参数
name: 实际在服务器端创建的节点名
- 删除节点常用的接口:
void delete(final String path,int version);
void delete(final String path,int version,VoidCallback cb,Object ctx);
如果删除的节点不存在则会抛出一个NoNodeException,如果删除数据的版本号不正确则抛出一个BadVersionException
- 获取子节点列表的接口:
List<String> getChildren(final String path,Watcher watcher);
List<String> getChildren(final String path,boolean watcher);
void getChildren(final String path,Watcher watcher,ChildrenCallback cb,Object ctx);
void getChildren(final String path,boolean watcher,ChildrenCallback cb,Object ctx);
List<String> getChildren(final String path,Watcher watcher,Stat stat);
List<String> getChildren(final String path,boolean watcher,Stat stat);
void getChildren(final String path,Watcher watcher,Stat stat,ChildrenCallback cb,Object ctx);
void getChildren(final String path,boolean watcher,Stat stat,ChildrenCallback cb,Object ctx);
- 获取节点数据的接口
byte[] getData(final String path,Watcher watcher,Stat stat);
byte[] getData(final String path,boolean watcher,Stat stat);
void getData(final String path,Watcher watcher,Stat stat,DataCallback cb,Object ctx);
void getData(final String path,boolean watcher,Stat stat,DataCallback cb,Object ctx);
- 更新数据的接口
Stat setData(final String path,byte[] data,int version);
void setData(final String path,byte[] data,int version,StatCallback cb,Object ctx);
如果数据的版本不一致则抛出一个BadVersionExcepion
zk的数据版本是从0开始计数的。如果客户端传入的是-1,则表示zk服务器需要基于最新的数据进行更新。如果对zk的数据节点的更新操作没有原子性要求则可以使用-1.
- 检测节点是否存在的接口
Stat exists(final String path,Watcher watcher);
Stat exists(final String path,boolean watcher);
void exists(final String path, Watcher watcher,StatCallback cb,Object ctx);
void exists(final String path,boolean watcher,StatCallback cb,Object ctx);
- version 说明
version参数用于指定节点数据的版本,表明本次更新操作是针对指定数据版本进行的。这是ZK对 CAS(Compare and Swap)的实现,只有数据的版本和预期的版本一致时才会更新数据,这样可以有效避免分布式环境下并发更新的问题。如果多个客户端并发更新,则只有一个可 以更新成功,其他的因为版本不一致则不会更新数据。zk中version和传统意义上的软件版本概念上有很大的不同,在ZK中,version表示的是对 节点数据内容,子节点列表或是ACL信息修改的次数。在一个节点数据创建完毕后,其viersion=0,表示当前节点数据自创建以后被修改0次,修改一 次版本的值增加1;使用-1表示使用基于最新版本进行修改,即每次都会执行更新!version强调的是变更次数,即使前后两次变更的内容的值没有发生变化,version的值依然会变更。
在创建zk的客户端时在构造方法中默认设置一个Watcher,这个Watcher作为zk会话期间的默认watcher.同时zk也可以通过 getData, getChildren,exist三个接口向zk注册watcher.
Watcher是通知的处理器,WatchedEvent的属性有 KeeperState ,EventType
public class WatchedEvent { final private KeeperState keeperState; final private EventType eventType; private String path; /** * Create a WatchedEvent with specified type, state and path */ public WatchedEvent(EventType eventType, KeeperState keeperState, String path) { this.keeperState = keeperState; this.eventType = eventType; this.path = path; }
WatchedEvent的KeeperStat和EventType对应关系
KeeperState | EventType | 触发条件 | 备注 |
SyncConnected | None | 客户端与服务器成功建立会话 | 会话是连接状态 |
NodeCreated | Watcher监听的节点创建成功 | ||
NodeDeleted | Watcher监听的节点被删除 | ||
NodeDataChanged | Watcher监听的节点数据内容发生变更 | ||
NodeChildrenChanged | Watcher监听的节点子列表发生变更 | ||
Disconnected | None | 客户端与服务器断开连接 | |
Expired | None | 会话超时 | |
AuthFailed | None |
- Watcher特性说明
一旦watcher被触发,ZK都会从相应的存储中移除。因此在使用Watcher时需要谨记使用前一定要注册
客户端串行执行
客户端Watcher回调的过程是一个串行同步的过程,这是为了保证顺序。同时需要谨记千万不要因为一个Watcher的处理逻辑影响了整个客户端的Watcher回调
轻量
WatchedEvent是ZK整个Watcher通知机制的最小通知单元。从上文已经介绍了这个数据结构中只包含三部分:通知状态,事件类型,节点路径。也就是说,Watcher通知仅仅告诉客户端发生了什么事情,而不会说明事件的具体内容。
/** * * @author zhangwei_david * @version $Id: ZKDemo.java, v 0.1 2015年5月2日 上午9:10:56 zhangwei_david Exp $ */ public class ZKDemo { private static ZooKeeper authZK = null; private static String path = "/zk-demo"; /** * * @param args * @throws Exception */ public static void main(String[] args) throws Exception { sync(); TimeUnit.SECONDS.sleep(300); createZKWithSessionIdAndSessionPasswd(); authCreate(); } /** * 指定权限 * * @throws Exception */ private static void authCreate() throws Exception { final CountDownLatch latch = new CountDownLatch(1); authZK = new ZooKeeper("localhost:2181", 5000, new Watcher() { public void process(WatchedEvent event) { System.out.println("监听器,监听到的事件时:" + event); if (KeeperState.SyncConnected == event.getState()) { //如果客户端已经建立连接闭锁减一 System.out.println("建立连接"); latch.countDown(); } } }); // 等待连接建立 latch.await(); // 增加权限 authZK.addAuthInfo("digest", "foo:true".getBytes()); // 判断path 是否已经存在 if (authZK.exists(path, true) == null) { authZK.create(path, "init".getBytes(), Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT); } String str = authZK.create(path + "/auth", "test".getBytes(), Ids.CREATOR_ALL_ACL, CreateMode.EPHEMERAL); System.out.println(str); } private static void createZKWithSessionIdAndSessionPasswd() throws Exception { // 初始化一个闭锁 final CountDownLatch latch = new CountDownLatch(1); /** * 创建一个ZooKeeper对象,localhost:2181是zk服务器的主机名和端口号 * 50000 sessionTimeout session超时时间 * Watcher 默认的WatchedEvent事件处理器 */ ZooKeeper originalZk = new ZooKeeper("localhost:2181", 50000, new Watcher() { public void process(WatchedEvent event) { /** * 如果zk客户端和服务器已经建立连接,闭锁减一 */ if (KeeperState.SyncConnected == event.getState()) { latch.countDown(); } } }); // 等待闭锁为0,即一直等待zk客户端和服务器建立连接 latch.await(); // 通过sessionId,和sessionPasswd创建一个复用的ZK客户端 ZooKeeper zkCopy = new ZooKeeper(path, 50000, new Watcher() { public void process(WatchedEvent event) { } }, originalZk.getSessionId(), originalZk.getSessionPasswd()); System.out.println("复用zk" + zkCopy + " original zk=" + originalZk); } /** * * * @return * @throws Exception */ private static ZooKeeper sync() throws Exception { // 初始化一个闭锁 final CountDownLatch latch = new CountDownLatch(1); /** * 创建一个ZooKeeper对象,localhost:2181是zk服务器的主机名和端口号 * 50000 sessionTimeout session超时时间 * Watcher 默认的WatchedEvent事件处理器 */ ZooKeeper zk = new ZooKeeper("localhost:2181", 50000, new Watcher() { public void process(WatchedEvent event) { System.out.println("默认监听器,KeeperStat:" + event.getState() + ", EventType:" + event.getType() + ", path:" + event.getPath()); /** * 如果zk客户端和服务器已经建立连接,闭锁减一 */ if (KeeperState.SyncConnected == event.getState()) { latch.countDown(); } } }); // 等待闭锁为0,即一直等待zk客户端和服务器建立连接 latch.await(); zk.getData(path, true, new DataCallback() { public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { System.out.println("rc=" + rc + ",path=" + path + "data=" + data + " Stat=" + stat + ", data=" + (data == null ? "null" : new String(data))); } }, null); // 创建一个临时节点,非安全的权限 zk.create(path + "/linshi", "ephemeral".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new StringCallback() { public void processResult(int rc, String path, Object ctx, String name) { if (rc == 0) { System.out.println("成功创建一个临时节点"); } } }, null); // 更新path节点数据 zk.setData(path, String.valueOf(new Date().getTime()).getBytes(), -1, new StatCallback() { public void processResult(int rc, String path, Object ctx, Stat stat) { if (rc == 0) { System.out.println("成功更新节点数据, rc=0, path=" + path + ", stat=" + stat); } } }, null); /** * 如果该节点不存在则创建持久化的节点 * ZK的节点有三位,持久化节点(PERSISTENT),临时节点(EPHEMERAL),顺序节点(SEQUENTIAL) * 具体的组合有 /** PERSISTENT, //持久化序列节点 PERSISTENT_SEQUENTIAL, EPHEMERAL , //临时序列节点 EPHEMERAL_SEQUENTIAL * */ // 创建一个持久化节点,如果该节点已经存在则不需要再次创建 if (zk.exists(path, true) == null) { zk.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } // 删除持久化序列节点 try { zk.delete(path + "/seq ", 1, new VoidCallback() { public void processResult(int rc, String path, Object ctx) { System.out.println("删除" + ZKDemo.path + "/seq 的结果是:rc=" + rc + " path=" + path + ",context=" + ctx); } }, null); } catch (Exception e) { // 示例代码创建新的持久节点前,如果以前存在则删除,如果不存在则或抛出一个NoNodeException } // 创建一个持久化序列节点 String seqPath = zk .create(path + "/seq ", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //读取子节点列表,并注册一个监听器 zk.getChildren(path, new Watcher() { public void process(WatchedEvent event) { System.out.println(event); } }); //更新节点数据 zk.setData(seqPath, "seqDemo".getBytes(), -1); //创建临时节点 zk.create(path + "/test", "test".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); // 删除节点 zk.delete(path + "/test", 0); // 异步创建临时序列节点 zk.create(path + "/test", "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new StringCallback() { public void processResult(int rc, String path, Object ctx, String name) { System.out.println("创建临时节点的结果是:result code=" + rc + ", path=" + path + " context=" + ctx + ", name=" + name); } }, "Test Context"); TimeUnit.SECONDS.sleep(5); // 更新数据 if (zk.exists(path + "/test", true) != null) { zk.setData(path + "/test", "testData".getBytes(), -1); } zk.create(path + "/test2", "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); // 使用默认监听器,获取子节点 List<String> znodes = zk.getChildren(path, true); // 获取所有节点下的数据 Stat stat = new Stat(); for (String str : znodes) { byte[] data = zk.getData(path + "/" + str, true, stat); System.out.println("获取节点:" + str + " 的数据是:" + (data == null ? "null" : new String(data))); } return zk; } }运行的结果是:
相关推荐
本文将深入探讨ZooKeeper客户端API的使用方法和核心概念。 ### 1. 连接ZooKeeper 在使用ZooKeeper客户端API之前,首先需要建立与ZooKeeper服务器的连接。这通常通过`ZooKeeper`类的构造函数完成,传入服务器地址...
在Java开发中,我们通常使用三种方式来操作Zookeeper:原生API、zkClient和Curator。接下来,我们将详细探讨这三种方式。 **一、Zookeeper原生API** Zookeeper提供了Java API,可以直接与Zookeeper服务器进行交互...
本API文档是由doxygen工具自动生成,它详细解析了`zookeeper.h`头文件中的函数、数据类型、枚举和宏定义,帮助开发者理解和使用ZooKeeper的C接口。 首先,我们需要理解ZooKeeper的核心概念。ZNode是ZooKeeper中的...
ZooKeeper基本概念** - **节点(ZNode)**: ZooKeeper的数据存储结构是树形的,每个节点称为ZNode,包括数据、元数据和ACL(访问控制列表)。 - **会话(Session)**: 一个客户端连接到ZooKeeper服务器的实例被称为...
本篇将详细介绍Zookeeper的Java API使用方法以及其在实际场景中的应用。 ### 1. Zookeeper基本概念 - **节点(Znode)**: Zookeeper中的数据存储单元,分为临时节点和持久节点。 - **会话(Session)**: 用户与...
下面是一个完整的 Java 示例,演示了如何使用 Zookeeper API 创建节点、设置数据、获取数据等: ```java import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache....
在实际使用中,ZooKeeper提供了丰富的API,如创建、删除、更新和读取ZNode,以及设置和触发Watcher。例如,`Demo1.java`和`Demo2.java`可能包含这样的示例代码: ```java import org.apache.zookeeper.*; public ...
本文将深入探讨如何使用Java客户端API来对Zookeeper的Znode进行增删改查操作,并讨论同步与异步两种方式的使用。 首先,要使用Java客户端API访问Zookeeper,你需要引入相关的依赖。在Maven项目中,可以在pom.xml...
4. **数据操作**:Curator 提供了简单的 API 来读写 ZooKeeper 节点的数据,如 `create()`, `setData()`, `getData()` 等。例如,创建节点和设置数据: ```java client.create().creatingParentsIfNeeded().for...
ZooKeeper 的客户端是连接到 ZooKeeper 服务器的接口,它提供了丰富的 API 供开发者操作 ZooKeeper 的数据模型。ZooKeeper 的数据模型基于树形结构,每个节点称为 ZNode,ZNode 可以存储数据,也可以作为路径,用于...
下面将详细介绍ZooKeeper的基本概念、使用方法和实践场景。 基本概念: 1. 分布式协调服务:ZooKeeper设计之初,就是为了提供高效可靠的分布式协调服务。在分布式系统中,多个节点可能需要进行协调工作,如配置同步...
在 ZooKeeper 的 Java API 中,开发人员可以使用 Executor 类来维护与服务器端的连接,DataMonitor 类来监视 Znode 节点的数据。Executor 类包括 ZooKeeper 对象和 DataMonitor 对象,负责维护 ZooKeeper 连接,监视...
ZkClient是基于Zookeeper原生API封装的轻量级客户端,它简化了API的使用,提供了更友好的API接口和线程安全的访问模式。ZkClient支持事件监听、异步操作,以及自动重连和会话过期处理等功能,使得在开发中更加便捷...
在本文中,我们将深入理解Zookeeper的基本操作,包括通过shell客户端进行交互,以及使用Java API进行编程,并探讨其在实际应用中的示例。 一、Zookeeper Shell操作 1. 连接Zookeeper客户端:使用`zkCli.sh`命令...
安装完成后,你可以进一步学习Zookeeper的基本操作,如创建、删除节点,以及利用Java API和Curator框架进行更复杂的分布式操作,如服务注册与发现、分布式锁等。Zookeeper作为分布式系统中的重要组件,其稳定性和...
**正文** 在分布式系统领域,ZooKeeper 是一...通过本文的介绍,你应该对 ZooKeeper 客户端的基本使用有了初步认识。为了深入理解,可以继续学习 ZooKeeper 的高级特性,如 ZNode 的权限控制、Watcher 的触发机制等。
ZooKeeper是一个专门为大型分布式系统设计的高可用协调服务,其目标是简化并封装复杂的、易出错的关键服务,提供简单易用的API和高效、稳定的系统给用户。ZooKeeper的功能主要包括配置管理、命名服务、分布式同步...
8. **JavaApiSample.java**:这个文件可能是示例代码,演示了如何在Java中使用Zookeeper API进行基本操作,如连接、创建节点、读取数据等。 通过学习这些内容,开发者能够掌握如何在Java环境中有效地使用Zookeeper...
2. Curator:Curator 是由 Netflix 开发并贡献给 Apache 的 ZooKeeper 客户端库,它提供了一套高级API,使得开发者能够更方便地处理 ZooKeeper 中常见的任务,例如创建、删除节点、设置/获取节点数据,以及实现...
5. **API**:Zookeeper提供了丰富的Java API,供开发者在应用程序中使用,包括同步和异步的操作,以及watcher的注册和处理。 6. **故障恢复与一致性**:Zookeeper通过多数派投票机制保证数据一致性,即使在部分节点...