`
m635674608
  • 浏览: 5043165 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

Zookeeper开源客户端框架Curator简介与示例

 
阅读更多

简介

        Curator最初由Netflix的Jordan Zimmerman开发, Curator提供了一套Java类库, 可以更容易的使用ZooKeeper.

        所谓ZooKeeper技巧(ZooKeeper Recipes),也可以称之为解决方案, 或者叫实现方案, 是指ZooKeeper的使用方法, 比如分布式的配置管理, Leader选举等

        Curator作为Apache ZooKeeper天生配套的组件。ZooKeeper的Java开发者自然而然的会选择它在项目中使用。

        官网链接:http://curator.apache.org/

提供的功能组件

  1. Framework 提供了一套高级的API, 简化了ZooKeeper的操作。 它增加了很多使用ZooKeeper开发的特性,可以处理ZooKeeper集群复杂的连接管理和重试机制

  2. Client是ZooKeeper客户端的一个替代品, 提供了一些底层处理和相关的工具方法

  3. Recipes实现了通用ZooKeeper的recipe, 该组件建立在Framework的基础之上

  4. Utilities各种工具类

  5. Errors异常处理, 连接, 恢复等.

  6. Extensionscurator-recipes包实现了通用的技巧,这些技巧在ZooKeeper文档中有介绍。为了避免是这个包(package)变得巨大, recipes/applications将会放入一个独立的extension包下。并使用命名规则curator-x-name.

        Curator 编译好的类库被发布到Maven Center中。Curator包含几个artifact. 你可以根据你的需要在你的项目中加入相应的依赖。对于大多数开发者来说,引入curator-recipes这一个就足够了

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.6.0</version>
</dependency> 
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.6.0</version>
</dependency>

 

代码示例

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.api.GetChildrenBuilder;
import org.apache.curator.framework.api.GetDataBuilder;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;

import com.google.common.base.Charsets;
import com.google.common.base.Objects;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;


/**
 * DateTime: 2015年1月9日 上午9:14:08
 *
 */
public class CuratorTest {
    private static String zkAddress = "hadoop02:2181,hadoop03:2181,hadoop04:2181";


    public static void main(String[] args) throws Exception {
        CuratorUtil curator = new CuratorUtil(zkAddress);
        curator.createNode("/zkroot/test1", "你好abc11");
        curator.createNode("/zkroot/test2", "你好abc22");
        curator.updateNode("/zkroot/test2", "你好abc22");
        List<String> list = curator.listChildren("/zkroot");
        Map<String, String> map = curator.listChildrenDetail("/zkroot");
        // curator.deleteNode("/zkroot");
        // curator.destory();
        System.out.println("=========================================");
        for (String str : list) {
            System.out.println(str);
        }

        System.out.println("=========================================");
        for (Map.Entry<String, String> entry : map.entrySet()) {
            System.out.println(entry.getKey() + "=>" + entry.getValue());
        }

        // 增加监听
        curator.addWatch("/zkroot", false);

        TimeUnit.SECONDS.sleep(600);
    }

}


class CuratorUtil {
    private CuratorFramework client;


    public CuratorUtil(String zkAddress) {
        client = CuratorFrameworkFactory.newClient(zkAddress, new ExponentialBackoffRetry(1000, 3));
        client.getCuratorListenable().addListener(new NodeEventListener());
        client.start();
    }


    /**
     * 创建node
     * 
     * @param nodeName
     * @param value
     * @return
     */
    public boolean createNode(String nodeName, String value) {
        boolean suc = false;
        try {
            Stat stat = getClient().checkExists().forPath(nodeName);
            if (stat == null) {
                String opResult = null;
                if (Strings.isNullOrEmpty(value)) {
                    opResult = getClient().create().creatingParentsIfNeeded().forPath(nodeName);
                }
                else {
                    opResult =
                            getClient().create().creatingParentsIfNeeded()
                                .forPath(nodeName, value.getBytes(Charsets.UTF_8));
                }
                suc = Objects.equal(nodeName, opResult);
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        return suc;
    }


    /**
     * 更新节点
     * 
     * @param nodeName
     * @param value
     * @return
     */
    public boolean updateNode(String nodeName, String value) {
        boolean suc = false;
        try {
            Stat stat = getClient().checkExists().forPath(nodeName);
            if (stat != null) {
                Stat opResult = getClient().setData().forPath(nodeName, value.getBytes(Charsets.UTF_8));
                suc = opResult != null;
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        return suc;
    }


    /**
     * 删除节点
     * 
     * @param nodeName
     */
    public void deleteNode(String nodeName) {
        try {
            getClient().delete().deletingChildrenIfNeeded().forPath(nodeName);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }


    /**
     * 找到指定节点下所有子节点的名称与值
     * 
     * @param node
     * @return
     */
    public Map<String, String> listChildrenDetail(String node) {
        Map<String, String> map = Maps.newHashMap();
        try {
            GetChildrenBuilder childrenBuilder = getClient().getChildren();
            List<String> children = childrenBuilder.forPath(node);
            GetDataBuilder dataBuilder = getClient().getData();
            if (children != null) {
                for (String child : children) {
                    String propPath = ZKPaths.makePath(node, child);
                    map.put(child, new String(dataBuilder.forPath(propPath), Charsets.UTF_8));
                }
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        return map;
    }


    /**
     * 列出子节点的名称
     * 
     * @param node
     * @return
     */
    public List<String> listChildren(String node) {
        List<String> children = Lists.newArrayList();
        try {
            GetChildrenBuilder childrenBuilder = getClient().getChildren();
            children = childrenBuilder.forPath(node);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        return children;
    }


    /**
     * 增加监听
     * 
     * @param node
     * @param isSelf
     *            true 为node本身增加监听 false 为node的子节点增加监听
     * @throws Exception
     */
    public void addWatch(String node, boolean isSelf) throws Exception {
        if (isSelf) {
            getClient().getData().watched().forPath(node);
        }
        else {
            getClient().getChildren().watched().forPath(node);
        }
    }


    /**
     * 增加监听
     * 
     * @param node
     * @param isSelf
     *            true 为node本身增加监听 false 为node的子节点增加监听
     * @param watcher
     * @throws Exception
     */
    public void addWatch(String node, boolean isSelf, Watcher watcher) throws Exception {
        if (isSelf) {
            getClient().getData().usingWatcher(watcher).forPath(node);
        }
        else {
            getClient().getChildren().usingWatcher(watcher).forPath(node);
        }
    }


    /**
     * 增加监听
     * 
     * @param node
     * @param isSelf
     *            true 为node本身增加监听 false 为node的子节点增加监听
     * @param watcher
     * @throws Exception
     */
    public void addWatch(String node, boolean isSelf, CuratorWatcher watcher) throws Exception {
        if (isSelf) {
            getClient().getData().usingWatcher(watcher).forPath(node);
        }
        else {
            getClient().getChildren().usingWatcher(watcher).forPath(node);
        }
    }


    /**
     * 销毁资源
     */
    public void destory() {
        if (client != null) {
            client.close();
        }
    }


    /**
     * 获取client
     * 
     * @return
     */
    public CuratorFramework getClient() {
        return client;
    }

}


// 监听器
final class NodeEventListener implements CuratorListener {
    @Override
    public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
        System.out.println(event.toString() + ".......................");
        final WatchedEvent watchedEvent = event.getWatchedEvent();
        if (watchedEvent != null) {
            System.out.println(watchedEvent.getState() + "=======================" + watchedEvent.getType());
            if (watchedEvent.getState() == KeeperState.SyncConnected) {
                switch (watchedEvent.getType()) {
                case NodeChildrenChanged:
                    // TODO
                    break;
                case NodeDataChanged:
                    // TODO
                    break;
                default:
                    break;
                }
            }
        }
    }

http://my.oschina.net/cloudcoder/blog?disp=2&p=1&catalog=418649

http://supben.iteye.com/blog/2094077

 

 

package com.cyyun;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;

public class CrudExamples {

	private static CuratorFramework client = get();
	private static final String PATH = "/crud";

	public static void main(String[] args) {
		try {
			client.start();

			client.create().forPath(PATH, "I love messi".getBytes());

			byte[] bs = client.getData().forPath(PATH);
			System.out.println("新建的节点,data为:" + new String(bs));

			// 注册观察者,当节点变动时触发
			client.getData().usingWatcher(new Watcher() {
				@Override
				public void process(WatchedEvent event) {
					System.out.println("node is changed");
				}
			}).inBackground().forPath(PATH);

			client.setData().forPath(PATH, "I love football".getBytes());

			// 注册观察者,当节点变动时触发
			client.getData().usingWatcher(new Watcher() {
				@Override
				public void process(WatchedEvent event) {
					System.out.println("node is changed");
				}
			}).inBackground().forPath(PATH);

			client.setData().forPath(PATH, "I love foot2222ball".getBytes());

			System.out.println(new String(client.getData().forPath(PATH)));

			// 由于是在background模式下获取的data,此时的bs可能为null
			byte[] bs2 = client.getData().watched().inBackground()
					.forPath(PATH);
			System.out.println("修改后的data为"
					+ new String(bs2 != null ? bs2 : new byte[0]));

			client.delete().forPath(PATH);
			Stat stat = client.checkExists().forPath(PATH);

			// Stat就是对zonde所有属性的一个映射, stat=null表示节点不存在!
			System.out.println(stat);
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			CloseableUtils.closeQuietly(client);
		}
	}

	public static CuratorFramework get() {
		String zookeeperConnectionString = "127.0.0.1:4180,127.0.0.1:5180,127.0.0.1:6180";

		CuratorFramework client = CuratorFrameworkFactory
				.builder()
				.connectString(zookeeperConnectionString)
				.sessionTimeoutMs(30000)
				.connectionTimeoutMs(30000)
				.canBeReadOnly(false)
				.retryPolicy(
						new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
				.namespace("zk").defaultData(null).build();
		return client;
	}

}

 

分享到:
评论

相关推荐

    zookeeper客户端curator操作示例

    而Apache Curator是Facebook开源的一个ZooKeeper客户端库,它提供了更高级别的API,简化了ZooKeeper的使用,并增加了诸如连接管理、重试策略、事务操作等功能。 在Java开发中,Curator是使用ZooKeeper最常用且推荐...

    zookeeper示例代码。

    6. **Curator应用**:Curator是Facebook开源的ZooKeeper客户端库,它封装了ZooKeeper的操作,提供了更高级的API,如连接管理、故障恢复、分布式锁、队列等。使用Curator可以使ZooKeeper的使用更加便捷和稳定。 7. *...

    VIP-02 Zookeeper客户端使用与集群特性(1)

    Curator是Netflix公司开源的一个针对ZooKeeper的高级Java客户端框架。它不仅简化了ZooKeeper的使用,还提供了丰富的抽象和工具类,使得开发者能够更加专注于业务逻辑而不用过多关注ZooKeeper的具体实现细节。Curator...

    4、zookeeper的java三种客户端介绍-Curator(crud、事务操作、监听、分布式计数器、分布式锁)

    这篇文章主要介绍了Zookeeper的三种Java客户端:原生Java API、ZkClient以及Apache Curator,并着重讨论了Curator的特性与优势。 一、Zookeeper原生Java API Zookeeper的原生Java API提供了基本的CRUD(创建、读取...

    springboot-zookeeper-curator.rar

    SpringBoot与Zookeeper Curator整合详解 在分布式系统中,协调服务是至关重要的,Apache ZooKeeper和Curator作为其中的佼佼者,被广泛应用于配置管理、服务发现、分布式锁等场景。本教程将深入讲解如何在SpringBoot...

    zookeeper实践操作下载指南 1

    Curator是Facebook开源的一个Zookeeper客户端库,它提供了更高级别的抽象和实用工具,简化了Zookeeper的使用。例如,Curator提供了分布式锁、领导选举、配置管理等功能,使得开发人员能够更方便地构建可靠的分布式...

    基于zookeeper的分布式锁实现demo

    在示例代码中,我们看到使用了Curator框架,它是一个开源的Zookeeper客户端库,封装了Zookeeper的复杂操作,简化了Zookeeper编程。Curator提供了现成的分布式锁实现,如`InterProcessMutex`和`...

    使用curator实现zookeeper锁服务的示例分享

    Curator是Netflix开源的一个用于简化Zookeeper使用的Java客户端库,它提供了很多高级功能,包括分布式锁、事件监听、配方等。在这个示例中,我们将使用Curator的`InterProcessMutex`类来实现分布式互斥锁(Mutex)。...

    springcloud集成zookeeper的方法示例

    Curator 是 Netflix 公司开源的一个 ZooKeeper 客户端,与 ZooKeeper 提供的原生客户端相比,Curator 的抽象层次更高,简化了 ZooKeeper 客户端编程。 最后,我们可以使用 CuratorFramework 来访问和操作 ZooKeeper...

    zookeeper ui界面源码(github)

    这些API可能会封装ZooKeeper的命令行工具,如`zkCli.sh`,或者直接使用Java客户端库(zkclient或curator)与ZooKeeper服务器交互。 3. **数据可视化**:UI界面需要展示ZooKeeper集群的状态,如节点结构、节点数据、...

    zk实现的分布式锁相关代码

    首先,我们通过CuratorFrameworkFactory创建了一个CuratorFramework实例,这是与ZooKeeper交互的主要对象。在创建CuratorFramework实例时,我们使用了RetryNTimes策略,这意味着在遇到连接问题时,客户端会尝试重新...

    workflow:基于ZooKeeper和Curator的分布式工作流管理库,可启用分布式任务工作流

    Curator是LinkedIn贡献的一个ZooKeeper客户端库,它为Java开发者提供了更高级别的抽象,简化了与ZooKeeper的交互。Curator包括一系列的工具、策略和拦截器,用于管理ZooKeeper的数据和会话,从而降低了开发复杂性的...

    注册管理(zk基本运用)

    6. **实战应用**:在实际开发中,我们可以通过Apache Curator、Java原生API或者其他语言的Zookeeper客户端库来操作Zookeeper。例如,使用Curator的`ServiceDiscovery`接口可以简化服务注册和发现的流程。 总结,...

    DubboxTest.zip

    5. `zkclient`或`curator`目录:Zookeeper客户端库,用于与Zookeeper交互。 通过这个项目,开发者可以学习如何配置SpringBoot应用,将Dubbo服务暴露出去,以及如何编写服务消费者来调用这些服务。同时,Zookeeper的...

    java技术面试------项目.md

    - **Dubbo**:阿里巴巴开源的一款高性能、轻量级的微服务框架。 - **Spring Cloud**:基于Spring Boot实现的一套微服务云应用及管理框架。 #### 二、Dubbo开发流程 Dubbo是一个分布式服务框架,它能够帮助开发者...

    SpringBoot(2.1.6) + dubbo(apache 2.7.5) + zookeeper 集成并测试

    在本示例中,我们将探讨如何将SpringBoot 2.1.6与Apache Dubbo 2.7.5以及Zookeeper集成,以便构建一个服务提供者(Provider)和服务消费者(Consumer)。首先,我们需要理解这三个核心组件的角色: 1. **SpringBoot...

    zk:redis分布式锁.zip

    2. **框架方式**:除了直接使用Zookeeper API,还可以借助于像Curator这样的框架,它提供了更高级别的抽象和便利的功能,如Lock接口,简化了分布式锁的实现,降低了开发难度。 接下来是Redis的分布式锁。Redis是一...

    1000道 互联网Java工程师面试题 485页_PDF密码解除.pdf

    - **会话:**客户端与ZooKeeper服务器之间的连接称为会话。 - **心跳检测:**用于检测会话的有效性。 **12. 服务器角色** - **Leader:**负责接收数据更新请求。 - **Follower:**复制Leader的数据。 - **Observer...

Global site tag (gtag.js) - Google Analytics