`
sheungxin
  • 浏览: 105508 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

ZooKeeper客户端框架Curator

阅读更多
  • Curator介绍
关于Curator不多介绍,网上很多,可以参考这篇:
引用

  • Curator工具类
import java.util.List;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;

import com.zgjky.hec.common.constants.Constants;


/**
 * 客户端连接zookeeper的工具类 通过Curator 客户端框架实现
 * @author sheungxin
 *
 */
public class CuratorClient {
	private CuratorFramework curatorClient;
	private String namespace;
	private String connStr;
	private int retryInterval;
	private int maxRetries;
	private int connTimeoutMs;
	private int sessionTimeoutMs;
	
	public CuratorClient(String namespace,String connStr,int retryInterval,int maxRetries,int connTimeoutMs,int sessionTimeoutMs){
		this.namespace=namespace;
		this.connStr=connStr;
		this.retryInterval=retryInterval;
		this.maxRetries=maxRetries;
		this.connTimeoutMs=connTimeoutMs;
		this.sessionTimeoutMs=sessionTimeoutMs;
		connect(this.namespace, this.connStr, this.retryInterval, this.maxRetries, this.connTimeoutMs, this.sessionTimeoutMs);
	}
	
	public CuratorClient(String namespace,String connStr,int retryInterval,int maxRetries){
		this.namespace=namespace;
		this.connStr=connStr;
		this.retryInterval=retryInterval;
		this.maxRetries=maxRetries;
		this.connTimeoutMs=Constants.ZK_CONNECTION_TIMEOUT;
		this.sessionTimeoutMs=Constants.ZK_SESSION_TIMEOUT;
		connect(this.namespace, this.connStr, this.retryInterval, this.maxRetries, this.connTimeoutMs, this.sessionTimeoutMs);
	}

	public CuratorClient(String namespace,String connStr){
		this.namespace=namespace;
		this.connStr=connStr;
		this.retryInterval=Constants.ZK_RETRY_INTERVAL;
		this.maxRetries=Constants.ZK_MAX_RETRIES;
		this.connTimeoutMs=Constants.ZK_CONNECTION_TIMEOUT;
		this.sessionTimeoutMs=Constants.ZK_SESSION_TIMEOUT;
		connect(namespace, this.connStr, this.retryInterval, this.maxRetries, this.connTimeoutMs, this.sessionTimeoutMs);
	}
	
	public CuratorClient(String connStr){
		this.namespace=Constants.ZK_NAMESPACE;
		this.connStr=connStr;
		this.retryInterval=Constants.ZK_RETRY_INTERVAL;
		this.maxRetries=Constants.ZK_MAX_RETRIES;
		this.connTimeoutMs=Constants.ZK_CONNECTION_TIMEOUT;
		this.sessionTimeoutMs=Constants.ZK_SESSION_TIMEOUT;
		connect(this.namespace, this.connStr, this.retryInterval, this.maxRetries, this.connTimeoutMs, this.sessionTimeoutMs);
	}
	
	/**
	 * 创建到zookeeper的连接
	 * @param connectionString zookeeper服务器连接字符串 格式: ip/域名:port  127.0.0.1:2181
	 * @param retryInterval 重试间隔
	 * @param maxRetries 重试次数
	 * @param connectionTimeoutMs 连接timeout时间
	 * @param sessionTimeoutMs session 失效时间
	 * @return
	 */
    public void connect(String namespace,String connStr, int retryInterval,int maxRetries, int connTimeoutMs,int sessionTimeoutMs){
        ACLProvider aclProvider=new ACLProvider() {
			private List<ACL> aclList;
			
			@Override
			public List<ACL> getDefaultAcl() {
				if(aclList==null){
					aclList=ZooDefs.Ids.CREATOR_ALL_ACL;
					aclList.clear();
					aclList.add(new ACL(Perms.ALL, new Id("auth",Constants.ZK_AUTH)));
				}
				return aclList;
			}
			
			@Override
			public List<ACL> getAclForPath(String arg0) {
				return aclList;
			}
		};
    	curatorClient = CuratorFrameworkFactory.builder()
        	.aclProvider(aclProvider)
        	.authorization("digest", Constants.ZK_AUTH.getBytes())
        	.namespace(namespace)
            .connectString(connStr)
            .retryPolicy(new ExponentialBackoffRetry(retryInterval, maxRetries))
            .connectionTimeoutMs(connTimeoutMs)
            .sessionTimeoutMs(sessionTimeoutMs)
            .build();
    	start();
    }
    
    /**
     * 添加连接状态监听器
     * @param listener
     */
    public void addConnListener(ConnectionStateListener listener){
    	curatorClient.getConnectionStateListenable().addListener(listener);
    }
    
    public void start(){
    	if(curatorClient.getState()!=CuratorFrameworkState.STARTED){
    		curatorClient.start();
    	}
    }
    
    public void disconnect(){
    	CloseableUtils.closeQuietly(curatorClient);
    }
    
    public CuratorFrameworkState getState(){
    	return curatorClient.getState();
    }
    
    /**
     * 创建全局可见的持久化节点
     * @param path 节点路径
     * @param payload 节点数据
     * @throws Exception
     */
    public void createNodes(String path, byte[] payload) throws Exception{
    	curatorClient.create().creatingParentsIfNeeded().forPath(path, payload);
    }
    
    /**
     * 创建全局可见的持久化节点
     * @param path 节点路径
     * @throws Exception
     */
    public void createNodes(String path) throws Exception{
    	curatorClient.create().creatingParentsIfNeeded().forPath(path);
    }
    
    /**
     * 创建全局可见的临时节点
     * @param path 节点路径
     * @param payload 节点数据
     * @throws Exception
     */
    public void createEphemeral(String path, byte[] payload) throws Exception{    	 
    	curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
    }
    
    /**
     *  创建全局可见的临时节点
     * @param path
     * @throws Exception
     */
    public String createEphemeral(String path) throws Exception{    	 
    	return curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
    }
    /**
     * 临时顺序节点
     * @param path
     * @return
     * @throws Exception
     */
    public String createEphemeralSeq(String path) throws Exception{    	 
    	return curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
    }
    /**
     * 写入节点数据
     * @param path 节点路径
     * @param payload 节点数据
     * @throws Exception
     */
    public void setData(String path, byte[] payload) throws Exception{    	
    	curatorClient.setData().forPath(path,payload);
    }
    
    /**
     * 获取节点数据
     * @param path 节点路径
     * @return
     * @throws Exception
     */
    public String getData(String path) throws Exception{
    	String result=null;
    	if(isNodeExists(path)){
    		 result=new String(curatorClient.getData().forPath(path),Constants.ZK_CHARSET);
    	}
        return result;
    }  
    
    /**
     * 获取节点数据
     * @param path 节点路径
     * @return
     * @throws Exception
     */
    public List<String> getChildData(String path) throws Exception{
    	List<String> resultList=null;
    	if(isNodeExists(path)){
    		resultList=curatorClient.getChildren().forPath(path);
    	}
        return resultList;
    } 
    
    /**
     * 判断节点是否存在
     * @param path
     * @return
     */
    public boolean isNodeExists(String path){
    	boolean flag=false;
    	try {
    		flag=curatorClient.checkExists().forPath(path)!=null?true:false;
		} catch (Exception e) {
			e.printStackTrace();
		}
    	return flag;
    }
    
    /**
     * 删除指定节点
     * @param path
     * @throws Exception
     */
    public void delNode(String path) throws Exception{
    	curatorClient.delete().guaranteed().inBackground().forPath(path);
    }
    
    /**
     * 监控子节点的变化:添加、修改、删除
     * @param path
     * @param childCacheListener
     * @return 
     * @throws Exception
     */
    public PathChildrenCache pathChildrenCache(String path,PathChildrenCacheListener childCacheListener) throws Exception {
        final PathChildrenCache childCache = new PathChildrenCache(curatorClient, path, true);
        childCache.getListenable().addListener(childCacheListener);
        return childCache;
    }
    
    /**
     * 监控节点的变化:增加、修改
     * @param path
     * @param nodeCacheListener
     * @return
     */
    public NodeCache nodeCache(String path,NodeCacheListener nodeCacheListener) {
        final NodeCache nodeCache = new NodeCache(curatorClient, path);
        nodeCache.getListenable().addListener(nodeCacheListener);
        return nodeCache;
    }
 
    /**
     * 监控节点及子节点的变化:增加、修改、删除
     * @param path
     * @param treeCacheListener
     * @return
     */
    public TreeCache nodeCache(String path,TreeCacheListener treeCacheListener) {
        final TreeCache treeCache = new TreeCache(curatorClient, path);
        treeCache.getListenable().addListener(treeCacheListener);
        return treeCache;
    }
    
    public void addCuratorListener(CuratorListener curatorListener){
    	curatorClient.getCuratorListenable().addListener(curatorListener);
    }
    
}

1
0
分享到:
评论

相关推荐

    Zookeeper开源客户端框架Curator简介与示例

    Curator 是一个基于 ZooKeeper 的开源客户端框架,它为 ZooKeeper 提供了高级抽象和功能,使得开发人员能够更方便地使用 ZooKeeper。 **Curator 框架概述** Curator 包含多个模块,如 ZooKeeper 客户端连接管理、...

    zookeeper开源客户端Curator

    Curator是Netflix公司开源的一套ZooKeeper客户端框架,Curator解决了很多ZooKeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等,实现了Fluent风格的API接口,目前已经...

    浅谈Zookeeper开源客户端框架Curator

    浅谈Zookeeper开源客户端框架Curator Curator是一个基于Zookeeper的开源客户端框架,由Netflix开发,旨在解决Zookeeper客户端使用过程中的各种问题。Curator提供了封装ZooKeeper client与ZooKeeper server之间的...

    zookeeper学习之三(Curator客户端)

    Curator客户端是Apache Curator框架的一部分,该框架为开发人员提供了一系列高级API和工具,用于处理常见的ZooKeeper用例,降低了使用ZooKeeper的复杂性。以下是对Curator客户端及其主要特性的详细阐述: 1. **连接...

    zookeeper客户端与服务器管理包

    一、Apache Curator:Zookeeper客户端的利器 Apache Curator是Apache软件基金会的一个项目,它为Zookeeper提供了丰富的Java客户端库,简化了与Zookeeper交互的复杂性。在`apache-curator-2.5.0-source-release.zip`...

    项目加入zookeeper的依赖包(Curator框架)

    而Curator是Facebook开源的一个基于Zookeeper的客户端框架,它对Zookeeper的API进行了封装,提供了更高级别的抽象,使得开发者能够更方便地与Zookeeper进行交互。 标题提到的“项目加入Zookeeper的依赖包(Curator...

    5、zookeeper的java -Curator(服务注册与发现)

    在Java环境中,Curator是一个优秀的Zookeeper客户端库,简化了与Zookeeper的交互,包括服务注册与发现。本文将深入探讨如何利用Curator实现这一功能。 首先,Curator提供了一套完整的API来抽象服务注册与发现,包括...

    Zookeeper 原生api zkClient Curator操作

    ZkClient是一个轻量级的Zookeeper客户端,它对原生API进行了封装,使得操作更加简单易用。主要特点包括: 1. **更友好的API**:ZkClient提供了一些简洁的接口,如`createEphemeral()`、`readData()`、`writeData()`...

    zookeeper 使用 Curator 示例监听、分布式锁

    Curator 是 Apache ZooKeeper 的一个客户端库,提供了丰富的工具和模式,简化了 ZooKeeper 的使用。本示例将详细介绍如何利用 Curator 在 ZooKeeper 上进行数据操作以及实现分布式锁。 一、ZooKeeper 与 Curator 的...

    VIP-02 Zookeeper客户端使用与集群特性(1)

    Curator是Netflix公司开源的一个针对ZooKeeper的高级Java客户端框架。它不仅简化了ZooKeeper的使用,还提供了丰富的抽象和工具类,使得开发者能够更加专注于业务逻辑而不用过多关注ZooKeeper的具体实现细节。Curator...

    curator zookeeper 3.4.6 2.9.1

    而Curator则是Zookeeper的一个客户端库,为开发者提供了更高级别的抽象和工具,简化了Zookeeper的使用。 Zookeeper 3.4.6是其稳定且广泛采用的一个版本,它提供了丰富的API和强大的一致性模型。在这个版本中,...

    ZooKeeper-Curator:zookeeper的curator客户端

    ZooKeeper-Curator是Apache ZooKeeper的一个高级客户端,它提供了许多实用工具和抽象,使得与ZooKeeper交互变得更加简单和可靠。ZooKeeper是一个分布式协调服务,广泛用于管理分布式应用程序的状态,实现一致性、...

    zookeeper Java api - curator 5.6.0

    Apache Curator 是一个高度封装的 ZooKeeper Java 客户端库,它简化了与 ZooKeeper 交互的复杂性,提供了更高级别的抽象和实用工具。ZooKeeper 是一个分布式的,开放源码的协调服务,用于分布式应用程序,提供命名...

    zookeeper_demo maven项目:包含原生API、zkclient、Curator操作

    Curator是Apache的一个顶级项目,它是对Zookeeper客户端API的进一步封装,提供了更高的抽象层次和更多的实用工具,如recipes(配方),如分布式锁、领导选举、队列和命名服务等。Curator的出现降低了使用Zookeeper...

    zookeeper实战

    Curator是Netflix开源的ZooKeeper客户端框架,它提供了一套高级API来简化ZooKeeper客户端的使用。Curator封装了许多常见的操作,使得开发者可以更加方便地利用ZooKeeper的特性,无需深入底层细节。Curator的常用API...

    深入探索Zookeeper:从客户端使用到集群特性的全面指南

    接着,创建Zookeeper客户端实例是客户端应用的第一步。通常,我们会使用`ZooKeeper`构造函数,指定连接字符串(包含Zookeeper服务器地址和端口)、会话超时时间及一个Watcher对象,用于监听连接状态: ```java ...

    curator_zookeeper需要的jar

    1. `curator-client-2.8.0.jar`:这是Curator的基本客户端库,提供了连接Zookeeper服务器、执行基本操作(如创建、删除节点)的类和方法。例如,`org.apache.curator.framework.CuratorFramework`是Curator的主要...

    curator-client

    Apache Curator是Facebook开源的Zookeeper客户端,其设计目标是提供一个更加易用、功能强大且稳定的Zookeeper客户端框架。它不仅包含了基本的Zookeeper操作,还提供了一套完整的解决方案,包括分布式锁、分布式...

    zookeeper的jar包

    4. **curator-framework-3.2.1.jar, curator-recipes-3.2.1.jar, curator-client-3.2.1.jar**:这些是Apache Curator库的不同组件,Curator是Facebook贡献的一个Zookeeper客户端,简化了与Zookeeper交互的复杂性。...

    springboot整合Zookeeper组件,管理架构中服务协调

    Curator是Apache开源的一个Zookeeper客户端连接和操作的组件,Curator框架在Zookeeper原生API接口上进行二次包装。提供ZooKeeper各种应用场景:比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列...

Global site tag (gtag.js) - Google Analytics