Curator框架是最好用,最流行的zookeeper的客户端。
它有以下三个优点
1.提供了一套非常友好的操作API;
2. 提供一些高级特性(包括但不仅限于前篇文章中提到的)的封装
3.易测试
maven依赖如下
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.5.0</version> </dependency>
按照官方给出的文档和包结构,可以轻松的看出Curator功能分两大类,一是对zookeeper的一些基本命令的封装,比如增删改查。是他的framework模块,一个是他的高级特性,即recipes模块。
一、framework模块
Curator提供了一套Fluent风格的操作API。这在很多脚本类语言里比较流行。
比如他创建client的代码是这样
CuratorFramework client = builder.connectString("192.168.11.56:2180") .sessionTimeoutMs(30000) .connectionTimeoutMs(30000) .canBeReadOnly(false) .retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE)) .namespace(namespace) .defaultData(null) .build(); client.start();
增删改查的
public class CrudExamples { private static CuratorFramework client = ClientFactory.newClient(); private static final String PATH = "/crud"; public static void main(String[] args) { try { client.start(); client.create().forPath(PATH, "I love messi".getBytes()); byte[] bs = client.getData().forPath(PATH); System.out.println("新建的节点,data为:" + new String(bs)); client.setData().forPath(PATH, "I love football".getBytes()); // 由于是在background模式下获取的data,此时的bs可能为null byte[] bs2 = client.getData().watched().inBackground().forPath(PATH); System.out.println("修改后的data为" + new String(bs2 != null ? bs2 : new byte[0])); client.delete().forPath(PATH); Stat stat = client.checkExists().forPath(PATH); // Stat就是对zonde所有属性的一个映射, stat=null表示节点不存在! System.out.println(stat); } catch (Exception e) { e.printStackTrace(); } finally { CloseableUtils.closeQuietly(client); } } }
此外,Curator还支持事务,一组crud操作同生同灭。代码如下
/** * 事务操作 * * @author shencl */ public class TransactionExamples { private static CuratorFramework client = ClientFactory.newClient(); public static void main(String[] args) { try { client.start(); // 开启事务 CuratorTransaction transaction = client.inTransaction(); Collection<CuratorTransactionResult> results = transaction.create() .forPath("/a/path", "some data".getBytes()).and().setData() .forPath("/another/path", "other data".getBytes()).and().delete().forPath("/yet/another/path") .and().commit(); for (CuratorTransactionResult result : results) { System.out.println(result.getForPath() + " - " + result.getType()); } } catch (Exception e) { e.printStackTrace(); } finally { // 释放客户端连接 CloseableUtils.closeQuietly(client); } } }
这段的代码的运行结果,由于最后一步delete的节点不存在,所以整个事务commit失败。失败的原因会放在Collection<CuratorTransactionResult>中,非常友好。
好了framework部分的内容就这么多,是不是特别简单呢。下面就来看看recipes包的内容吧。。
Recipes部分提供的功能官网列的很详细,点击这里。注意文章第一段:Curator宣称,Recipes模块实现了除二阶段提交之外的所有zookeeper特性。
二、Recipes模块
主要有
Elections(选举),Locks(锁),Barriers(关卡),Atomic(原子量),Caches,Queues等
1、 Elections
选举主要依赖于LeaderSelector和LeaderLatch2个类。前者是所有存活的客户端不间断的轮流做Leader,大同社会。后者是一旦选举出Leader,除非有客户端挂掉重新触发选举,否则不会交出领导权。某党?
这两者在实现上是可以切换的,直接上代码,怎么切换注释里有。由于篇幅所限,这里仅贴出基于LeaderSelector的选举,更多代码见附件
/** * 本类基于leaderSelector实现,所有存活的client会公平的轮流做leader * 如果不想频繁的变化Leader,需要在takeLeadership方法里阻塞leader的变更! 或者使用 {@link} * LeaderLatchClient */ public class LeaderSelectorClient extends LeaderSelectorListenerAdapter implements Closeable { private final String name; private final LeaderSelector leaderSelector; private final String PATH = "/leaderselector"; public LeaderSelectorClient(CuratorFramework client, String name) { this.name = name; leaderSelector = new LeaderSelector(client, PATH, this); leaderSelector.autoRequeue(); } public void start() throws IOException { leaderSelector.start(); } @Override public void close() throws IOException { leaderSelector.close(); } /** * client成为leader后,会调用此方法 */ @Override public void takeLeadership(CuratorFramework client) throws Exception { int waitSeconds = (int) (5 * Math.random()) + 1; System.out.println(name + "是当前的leader"); try { Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { System.out.println(name + " 让出领导权\n"); } }
/** * leader选举 * * @author shencl */ public class LeaderSelectorExample { public static void main(String[] args) { List<CuratorFramework> clients = Lists.newArrayList(); List<LeaderSelectorClient> examples = Lists.newArrayList(); try { for (int i = 0; i < 10; i++) { CuratorFramework client = ClientFactory.newClient(); LeaderSelectorClient example = new LeaderSelectorClient(client, "Client #" + i); clients.add(client); examples.add(example); client.start(); example.start(); } System.out.println("----------先观察一会选举的结果-----------"); Thread.sleep(10000); System.out.println("----------关闭前5个客户端,再观察选举的结果-----------"); for (int i = 0; i < 5; i++) { clients.get(i).close(); } // 这里有个小技巧,让main程序一直监听控制台输入,异步的代码就可以一直在执行。不同于while(ture)的是,按回车或esc可退出 new BufferedReader(new InputStreamReader(System.in)).readLine(); } catch (Exception e) { e.printStackTrace(); } finally { for (LeaderSelectorClient exampleClient : examples) { CloseableUtils.closeQuietly(exampleClient); } for (CuratorFramework client : clients) { CloseableUtils.closeQuietly(client); } } } }
2、locks
curator lock相关的实现在recipes.locks包里。顶级接口都是InterProcessLock。我们直接看最有代表性的InterProcessReadWriteLock 进程内部读写锁(可重入读写锁)。什么叫可重入,什么叫读写锁。不清楚的先查好资料吧。总之读写锁一定是成对出现的。 简易传送门
我们先定义两个任务,可并行的执行的,和互斥执行的。
/** * 并行任务 * * @author shencl */ public class ParallelJob implements Runnable { private final String name; private final InterProcessLock lock; // 锁等待时间 private final int wait_time = 5; ParallelJob(String name, InterProcessLock lock) { this.name = name; this.lock = lock; } @Override public void run() { try { doWork(); } catch (Exception e) { // ingore; } } public void doWork() throws Exception { try { if (!lock.acquire(wait_time, TimeUnit.SECONDS)) { System.err.println(name + "等待" + wait_time + "秒,仍未能获取到lock,准备放弃。"); } // 模拟job执行时间0-4000毫秒 int exeTime = new Random().nextInt(4000); System.out.println(name + "开始执行,预计执行时间= " + exeTime + "毫秒----------"); Thread.sleep(exeTime); } catch (Exception e) { e.printStackTrace(); } finally { lock.release(); } } }
/** * 互斥任务 * * @author shencl */ public class MutexJob implements Runnable { private final String name; private final InterProcessLock lock; // 锁等待时间 private final int wait_time = 10; MutexJob(String name, InterProcessLock lock) { this.name = name; this.lock = lock; } @Override public void run() { try { doWork(); } catch (Exception e) { // ingore; } } public void doWork() throws Exception { try { if (!lock.acquire(wait_time, TimeUnit.SECONDS)) { System.err.println(name + "等待" + wait_time + "秒,仍未能获取到lock,准备放弃。"); } // 模拟job执行时间0-2000毫秒 int exeTime = new Random().nextInt(2000); System.out.println(name + "开始执行,预计执行时间= " + exeTime + "毫秒----------"); Thread.sleep(exeTime); } catch (Exception e) { e.printStackTrace(); } finally { lock.release(); } } }
/** * 分布式锁实例 * * @author shencl */ public class DistributedLockExample { private static CuratorFramework client = ClientFactory.newClient(); private static final String PATH = "/locks"; // 进程内部(可重入)读写锁 private static final InterProcessReadWriteLock lock; // 读锁 private static final InterProcessLock readLock; // 写锁 private static final InterProcessLock writeLock; static { client.start(); lock = new InterProcessReadWriteLock(client, PATH); readLock = lock.readLock(); writeLock = lock.writeLock(); } public static void main(String[] args) { try { List<Thread> jobs = Lists.newArrayList(); for (int i = 0; i < 10; i++) { Thread t = new Thread(new ParallelJob("Parallel任务" + i, readLock)); jobs.add(t); } for (int i = 0; i < 10; i++) { Thread t = new Thread(new MutexJob("Mutex任务" + i, writeLock)); jobs.add(t); } for (Thread t : jobs) { t.start(); } } catch (Exception e) { e.printStackTrace(); } finally { CloseableUtils.closeQuietly(client); } } }
三、测试方法
curator提供了很好的测试工具,你甚至是可以在完全没有搭建zookeeper server端的情况下,完成测试。
有2个重要的类
TestingServer 模拟单点, TestingCluster模拟集群。
需要使用的话,得依赖
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-test</artifactId> <version>2.5.0</version> </dependency>
附件:http://dl2.iteye.com/upload/attachment/0099/3974/33947bab-1b45-3593-8c30-616d66194a9d.rar
相关推荐
Curator客户端是Apache Curator框架的一部分,该框架为开发人员提供了一系列高级API和工具,用于处理常见的ZooKeeper用例,降低了使用ZooKeeper的复杂性。以下是对Curator客户端及其主要特性的详细阐述: 1. **连接...
ZooKeeper-Curator是Apache ZooKeeper的一个高级客户端,它提供了许多实用工具和抽象,使得与ZooKeeper交互变得更加简单和可靠。ZooKeeper是一个分布式协调服务,广泛用于管理分布式应用程序的状态,实现一致性、...
Zookeeper_Java客户端Curator
《Zookeeper两种客户端实战解析》 Zookeeper,作为分布式协调...以上就是关于Zookeeper的ZkClient和Curator客户端的介绍,希望对初学者和进阶者都有所启发。通过实践,相信你能更好地驾驭Zookeeper这个强大的工具。
创建Curator客户端的基本步骤是初始化`CuratorFrameworkFactory`,设置连接字符串、会话超时时间以及重试策略,例如: ```java ZooKeeperClientConfig clientConfig = new ZooKeeperClientConfig(); client...
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,它是集群的管理者,监视着集群中各个节点的状态根据节点提交的反馈进行...这将有助于你掌握Curator客户端的使用,并为实际项目中应用ZooKeeper打下基础。
ZooKeeper 是一个分布式服务协调框架,常用于构建高可用的分布式系统。Curator 是 Apache ZooKeeper 的一个客户端库,提供了丰富的工具...通过学习这个示例,你可以深入理解 ZooKeeper 和 Curator 在实际应用中的用法。
Curator 是一个基于 ZooKeeper 的开源客户端框架,它为 ZooKeeper 提供了高级抽象和功能,使得开发人员能够更方便地使用 ZooKeeper。 **Curator 框架概述** Curator 包含多个模块,如 ZooKeeper 客户端连接管理、...
Curator是Netflix公司开源的一套ZooKeeper客户端框架,Curator解决了很多ZooKeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等,实现了Fluent风格的API接口,目前已经...
而Curator是Facebook贡献的Zookeeper客户端库,它对Zookeeper的API进行了封装,提供了更高级别的抽象,降低了开发人员使用Zookeeper的难度。 要将SpringBoot与Zookeeper Curator整合,首先你需要在你的项目中添加...
1. `curator-client-4.0.1.jar`: 这是Curator客户端的核心库,包含了与Zookeeper服务器通信的基本组件。 2. `guava-18.0.jar`: Guava是Google的一个Java工具包,提供了很多实用的类和函数,Curator可能依赖于Guava的...
本文将详细阐述Zookeeper的核心功能、客户端应用以及集群管理,特别关注如何使用官方Java客户端和Apache Curator框架。 首先,要使用Zookeeper的Java客户端,我们需要在项目中添加相应的依赖。如文中所示,可以引入...
而Curator则是Zookeeper的一个客户端库,为开发者提供了更高级别的抽象和工具,简化了Zookeeper的使用。 Zookeeper 3.4.6是其稳定且广泛采用的一个版本,它提供了丰富的API和强大的一致性模型。在这个版本中,...
在Java环境中,Curator是一个优秀的Zookeeper客户端库,简化了与Zookeeper的交互,包括服务注册与发现。本文将深入探讨如何利用Curator实现这一功能。 首先,Curator提供了一套完整的API来抽象服务注册与发现,包括...
`curator-example` 是一个基于 Apache ZooKeeper 的 Java 客户端库 —— Apache Curator 的示例项目。这个项目旨在帮助开发者更好地理解和使用 Curator 提供的各种功能,以实现与 ZooKeeper 的高效交互。ZooKeeper ...
在本项目`zookeeper_demo`中,我们将深入探讨Zookeeper的三种主要客户端库——原生API、ZkClient和Curator的操作方式,以及它们在集群配置和分布式锁等实际场景中的应用。 1. **Zookeeper原生API** Zookeeper原生...
浅谈Zookeeper开源客户端框架Curator Curator是一个基于Zookeeper的开源客户端框架,由Netflix开发,旨在解决Zookeeper客户端使用过程中的各种问题。Curator提供了封装ZooKeeper client与ZooKeeper server之间的...
1. `curator-client-2.8.0.jar`:这是Curator的基本客户端库,提供了连接Zookeeper服务器、执行基本操作(如创建、删除节点)的类和方法。例如,`org.apache.curator.framework.CuratorFramework`是Curator的主要...
客户端是Curator Framework,是Apache的项目,它主要的功能是为ZK的客户端使用提供了高可用的封装。在Curator Framework基础上封装的curator-recipes,实现了很多经典场景。比如:集群管理(Leader选举)、共享锁、...
ZkClient是一个轻量级的Zookeeper客户端,它对原生API进行了封装,使得操作更加简单易用。主要特点包括: 1. **更友好的API**:ZkClient提供了一些简洁的接口,如`createEphemeral()`、`readData()`、`writeData()`...