关于Curator不多介绍,网上很多,可以参考这篇:
引用
import java.util.List;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import com.zgjky.hec.common.constants.Constants;
/**
* 客户端连接zookeeper的工具类 通过Curator 客户端框架实现
* @author sheungxin
*
*/
public class CuratorClient {
private CuratorFramework curatorClient;
private String namespace;
private String connStr;
private int retryInterval;
private int maxRetries;
private int connTimeoutMs;
private int sessionTimeoutMs;
public CuratorClient(String namespace,String connStr,int retryInterval,int maxRetries,int connTimeoutMs,int sessionTimeoutMs){
this.namespace=namespace;
this.connStr=connStr;
this.retryInterval=retryInterval;
this.maxRetries=maxRetries;
this.connTimeoutMs=connTimeoutMs;
this.sessionTimeoutMs=sessionTimeoutMs;
connect(this.namespace, this.connStr, this.retryInterval, this.maxRetries, this.connTimeoutMs, this.sessionTimeoutMs);
}
public CuratorClient(String namespace,String connStr,int retryInterval,int maxRetries){
this.namespace=namespace;
this.connStr=connStr;
this.retryInterval=retryInterval;
this.maxRetries=maxRetries;
this.connTimeoutMs=Constants.ZK_CONNECTION_TIMEOUT;
this.sessionTimeoutMs=Constants.ZK_SESSION_TIMEOUT;
connect(this.namespace, this.connStr, this.retryInterval, this.maxRetries, this.connTimeoutMs, this.sessionTimeoutMs);
}
public CuratorClient(String namespace,String connStr){
this.namespace=namespace;
this.connStr=connStr;
this.retryInterval=Constants.ZK_RETRY_INTERVAL;
this.maxRetries=Constants.ZK_MAX_RETRIES;
this.connTimeoutMs=Constants.ZK_CONNECTION_TIMEOUT;
this.sessionTimeoutMs=Constants.ZK_SESSION_TIMEOUT;
connect(namespace, this.connStr, this.retryInterval, this.maxRetries, this.connTimeoutMs, this.sessionTimeoutMs);
}
public CuratorClient(String connStr){
this.namespace=Constants.ZK_NAMESPACE;
this.connStr=connStr;
this.retryInterval=Constants.ZK_RETRY_INTERVAL;
this.maxRetries=Constants.ZK_MAX_RETRIES;
this.connTimeoutMs=Constants.ZK_CONNECTION_TIMEOUT;
this.sessionTimeoutMs=Constants.ZK_SESSION_TIMEOUT;
connect(this.namespace, this.connStr, this.retryInterval, this.maxRetries, this.connTimeoutMs, this.sessionTimeoutMs);
}
/**
* 创建到zookeeper的连接
* @param connectionString zookeeper服务器连接字符串 格式: ip/域名:port 127.0.0.1:2181
* @param retryInterval 重试间隔
* @param maxRetries 重试次数
* @param connectionTimeoutMs 连接timeout时间
* @param sessionTimeoutMs session 失效时间
* @return
*/
public void connect(String namespace,String connStr, int retryInterval,int maxRetries, int connTimeoutMs,int sessionTimeoutMs){
ACLProvider aclProvider=new ACLProvider() {
private List<ACL> aclList;
@Override
public List<ACL> getDefaultAcl() {
if(aclList==null){
aclList=ZooDefs.Ids.CREATOR_ALL_ACL;
aclList.clear();
aclList.add(new ACL(Perms.ALL, new Id("auth",Constants.ZK_AUTH)));
}
return aclList;
}
@Override
public List<ACL> getAclForPath(String arg0) {
return aclList;
}
};
curatorClient = CuratorFrameworkFactory.builder()
.aclProvider(aclProvider)
.authorization("digest", Constants.ZK_AUTH.getBytes())
.namespace(namespace)
.connectString(connStr)
.retryPolicy(new ExponentialBackoffRetry(retryInterval, maxRetries))
.connectionTimeoutMs(connTimeoutMs)
.sessionTimeoutMs(sessionTimeoutMs)
.build();
start();
}
/**
* 添加连接状态监听器
* @param listener
*/
public void addConnListener(ConnectionStateListener listener){
curatorClient.getConnectionStateListenable().addListener(listener);
}
public void start(){
if(curatorClient.getState()!=CuratorFrameworkState.STARTED){
curatorClient.start();
}
}
public void disconnect(){
CloseableUtils.closeQuietly(curatorClient);
}
public CuratorFrameworkState getState(){
return curatorClient.getState();
}
/**
* 创建全局可见的持久化节点
* @param path 节点路径
* @param payload 节点数据
* @throws Exception
*/
public void createNodes(String path, byte[] payload) throws Exception{
curatorClient.create().creatingParentsIfNeeded().forPath(path, payload);
}
/**
* 创建全局可见的持久化节点
* @param path 节点路径
* @throws Exception
*/
public void createNodes(String path) throws Exception{
curatorClient.create().creatingParentsIfNeeded().forPath(path);
}
/**
* 创建全局可见的临时节点
* @param path 节点路径
* @param payload 节点数据
* @throws Exception
*/
public void createEphemeral(String path, byte[] payload) throws Exception{
curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
}
/**
* 创建全局可见的临时节点
* @param path
* @throws Exception
*/
public String createEphemeral(String path) throws Exception{
return curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
}
/**
* 临时顺序节点
* @param path
* @return
* @throws Exception
*/
public String createEphemeralSeq(String path) throws Exception{
return curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
/**
* 写入节点数据
* @param path 节点路径
* @param payload 节点数据
* @throws Exception
*/
public void setData(String path, byte[] payload) throws Exception{
curatorClient.setData().forPath(path,payload);
}
/**
* 获取节点数据
* @param path 节点路径
* @return
* @throws Exception
*/
public String getData(String path) throws Exception{
String result=null;
if(isNodeExists(path)){
result=new String(curatorClient.getData().forPath(path),Constants.ZK_CHARSET);
}
return result;
}
/**
* 获取节点数据
* @param path 节点路径
* @return
* @throws Exception
*/
public List<String> getChildData(String path) throws Exception{
List<String> resultList=null;
if(isNodeExists(path)){
resultList=curatorClient.getChildren().forPath(path);
}
return resultList;
}
/**
* 判断节点是否存在
* @param path
* @return
*/
public boolean isNodeExists(String path){
boolean flag=false;
try {
flag=curatorClient.checkExists().forPath(path)!=null?true:false;
} catch (Exception e) {
e.printStackTrace();
}
return flag;
}
/**
* 删除指定节点
* @param path
* @throws Exception
*/
public void delNode(String path) throws Exception{
curatorClient.delete().guaranteed().inBackground().forPath(path);
}
/**
* 监控子节点的变化:添加、修改、删除
* @param path
* @param childCacheListener
* @return
* @throws Exception
*/
public PathChildrenCache pathChildrenCache(String path,PathChildrenCacheListener childCacheListener) throws Exception {
final PathChildrenCache childCache = new PathChildrenCache(curatorClient, path, true);
childCache.getListenable().addListener(childCacheListener);
return childCache;
}
/**
* 监控节点的变化:增加、修改
* @param path
* @param nodeCacheListener
* @return
*/
public NodeCache nodeCache(String path,NodeCacheListener nodeCacheListener) {
final NodeCache nodeCache = new NodeCache(curatorClient, path);
nodeCache.getListenable().addListener(nodeCacheListener);
return nodeCache;
}
/**
* 监控节点及子节点的变化:增加、修改、删除
* @param path
* @param treeCacheListener
* @return
*/
public TreeCache nodeCache(String path,TreeCacheListener treeCacheListener) {
final TreeCache treeCache = new TreeCache(curatorClient, path);
treeCache.getListenable().addListener(treeCacheListener);
return treeCache;
}
public void addCuratorListener(CuratorListener curatorListener){
curatorClient.getCuratorListenable().addListener(curatorListener);
}
}
分享到:
相关推荐
Curator 是一个基于 ZooKeeper 的开源客户端框架,它为 ZooKeeper 提供了高级抽象和功能,使得开发人员能够更方便地使用 ZooKeeper。 **Curator 框架概述** Curator 包含多个模块,如 ZooKeeper 客户端连接管理、...
Curator是Netflix公司开源的一套ZooKeeper客户端框架,Curator解决了很多ZooKeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等,实现了Fluent风格的API接口,目前已经...
浅谈Zookeeper开源客户端框架Curator Curator是一个基于Zookeeper的开源客户端框架,由Netflix开发,旨在解决Zookeeper客户端使用过程中的各种问题。Curator提供了封装ZooKeeper client与ZooKeeper server之间的...
Curator客户端是Apache Curator框架的一部分,该框架为开发人员提供了一系列高级API和工具,用于处理常见的ZooKeeper用例,降低了使用ZooKeeper的复杂性。以下是对Curator客户端及其主要特性的详细阐述: 1. **连接...
一、Apache Curator:Zookeeper客户端的利器 Apache Curator是Apache软件基金会的一个项目,它为Zookeeper提供了丰富的Java客户端库,简化了与Zookeeper交互的复杂性。在`apache-curator-2.5.0-source-release.zip`...
而Curator是Facebook开源的一个基于Zookeeper的客户端框架,它对Zookeeper的API进行了封装,提供了更高级别的抽象,使得开发者能够更方便地与Zookeeper进行交互。 标题提到的“项目加入Zookeeper的依赖包(Curator...
在Java环境中,Curator是一个优秀的Zookeeper客户端库,简化了与Zookeeper的交互,包括服务注册与发现。本文将深入探讨如何利用Curator实现这一功能。 首先,Curator提供了一套完整的API来抽象服务注册与发现,包括...
ZkClient是一个轻量级的Zookeeper客户端,它对原生API进行了封装,使得操作更加简单易用。主要特点包括: 1. **更友好的API**:ZkClient提供了一些简洁的接口,如`createEphemeral()`、`readData()`、`writeData()`...
Curator 是 Apache ZooKeeper 的一个客户端库,提供了丰富的工具和模式,简化了 ZooKeeper 的使用。本示例将详细介绍如何利用 Curator 在 ZooKeeper 上进行数据操作以及实现分布式锁。 一、ZooKeeper 与 Curator 的...
Curator是Netflix公司开源的一个针对ZooKeeper的高级Java客户端框架。它不仅简化了ZooKeeper的使用,还提供了丰富的抽象和工具类,使得开发者能够更加专注于业务逻辑而不用过多关注ZooKeeper的具体实现细节。Curator...
而Curator则是Zookeeper的一个客户端库,为开发者提供了更高级别的抽象和工具,简化了Zookeeper的使用。 Zookeeper 3.4.6是其稳定且广泛采用的一个版本,它提供了丰富的API和强大的一致性模型。在这个版本中,...
ZooKeeper-Curator是Apache ZooKeeper的一个高级客户端,它提供了许多实用工具和抽象,使得与ZooKeeper交互变得更加简单和可靠。ZooKeeper是一个分布式协调服务,广泛用于管理分布式应用程序的状态,实现一致性、...
Apache Curator 是一个高度封装的 ZooKeeper Java 客户端库,它简化了与 ZooKeeper 交互的复杂性,提供了更高级别的抽象和实用工具。ZooKeeper 是一个分布式的,开放源码的协调服务,用于分布式应用程序,提供命名...
Curator是Apache的一个顶级项目,它是对Zookeeper客户端API的进一步封装,提供了更高的抽象层次和更多的实用工具,如recipes(配方),如分布式锁、领导选举、队列和命名服务等。Curator的出现降低了使用Zookeeper...
Curator是Netflix开源的ZooKeeper客户端框架,它提供了一套高级API来简化ZooKeeper客户端的使用。Curator封装了许多常见的操作,使得开发者可以更加方便地利用ZooKeeper的特性,无需深入底层细节。Curator的常用API...
接着,创建Zookeeper客户端实例是客户端应用的第一步。通常,我们会使用`ZooKeeper`构造函数,指定连接字符串(包含Zookeeper服务器地址和端口)、会话超时时间及一个Watcher对象,用于监听连接状态: ```java ...
1. `curator-client-2.8.0.jar`:这是Curator的基本客户端库,提供了连接Zookeeper服务器、执行基本操作(如创建、删除节点)的类和方法。例如,`org.apache.curator.framework.CuratorFramework`是Curator的主要...
Apache Curator是Facebook开源的Zookeeper客户端,其设计目标是提供一个更加易用、功能强大且稳定的Zookeeper客户端框架。它不仅包含了基本的Zookeeper操作,还提供了一套完整的解决方案,包括分布式锁、分布式...
4. **curator-framework-3.2.1.jar, curator-recipes-3.2.1.jar, curator-client-3.2.1.jar**:这些是Apache Curator库的不同组件,Curator是Facebook贡献的一个Zookeeper客户端,简化了与Zookeeper交互的复杂性。...
Curator是Apache开源的一个Zookeeper客户端连接和操作的组件,Curator框架在Zookeeper原生API接口上进行二次包装。提供ZooKeeper各种应用场景:比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列...