`
supben
  • 浏览: 330909 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

zookeeper学习之三(Curator客户端)

 
阅读更多

Curator框架是最好用,最流行的zookeeper的客户端。

它有以下三个优点

1.提供了一套非常友好的操作API;

2. 提供一些高级特性(包括但不仅限于前篇文章中提到的)的封装

3.易测试

 

maven依赖如下

 

		<dependency>
			<groupId>org.apache.curator</groupId>
			<artifactId>curator-recipes</artifactId>
			<version>2.5.0</version>
		</dependency>

 

 

按照官方给出的文档和包结构,可以轻松的看出Curator功能分两大类,一是对zookeeper的一些基本命令的封装,比如增删改查。是他的framework模块,一个是他的高级特性,即recipes模块。

 

一、framework模块

Curator提供了一套Fluent风格的操作API。这在很多脚本类语言里比较流行。

 

比如他创建client的代码是这样

CuratorFramework client = builder.connectString("192.168.11.56:2180")
		.sessionTimeoutMs(30000)
		.connectionTimeoutMs(30000)
		.canBeReadOnly(false)
		.retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
		.namespace(namespace)
		.defaultData(null)
		.build();
client.start();

 一路点到底,这就是所谓的Fluent风格。 

 

我们再看增删改查的

public class CrudExamples {
	private static CuratorFramework client = ClientFactory.newClient();
	private static final String PATH = "/crud";

	public static void main(String[] args) {
		try {
			client.start();

			client.create().forPath(PATH, "I love messi".getBytes());

			byte[] bs = client.getData().forPath(PATH);
			System.out.println("新建的节点,data为:" + new String(bs));

			client.setData().forPath(PATH, "I love football".getBytes());

			// 由于是在background模式下获取的data,此时的bs可能为null
			byte[] bs2 = client.getData().watched().inBackground().forPath(PATH);
			System.out.println("修改后的data为" + new String(bs2 != null ? bs2 : new byte[0]));

			client.delete().forPath(PATH);
			Stat stat = client.checkExists().forPath(PATH);

			// Stat就是对zonde所有属性的一个映射, stat=null表示节点不存在!
			System.out.println(stat);
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			CloseableUtils.closeQuietly(client);
		}
	}
}

 常用接口有

create()增

delete(): 删

checkExists(): 判断是否存在

setData():  改

getData(): 查

所有这些方法都以forpath()结尾,辅以watch(监听),withMode(指定模式),和inBackground(后台运行)等方法来使用。

 

 此外,Curator还支持事务,一组crud操作同生同灭。代码如下

/**
 * 事务操作
 * 
 * @author shencl
 */
public class TransactionExamples {
	private static CuratorFramework client = ClientFactory.newClient();

	public static void main(String[] args) {
		try {
			client.start();
			// 开启事务
			CuratorTransaction transaction = client.inTransaction();

			Collection<CuratorTransactionResult> results = transaction.create()
					.forPath("/a/path", "some data".getBytes()).and().setData()
					.forPath("/another/path", "other data".getBytes()).and().delete().forPath("/yet/another/path")
					.and().commit();

			for (CuratorTransactionResult result : results) {
				System.out.println(result.getForPath() + " - " + result.getType());
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			// 释放客户端连接
			CloseableUtils.closeQuietly(client);
		}

	}
}

 这段的代码的运行结果,由于最后一步delete的节点不存在,所以整个事务commit失败。失败的原因会放在Collection<CuratorTransactionResult>中,非常友好。

 

好了framework部分的内容就这么多,是不是特别简单呢。下面就来看看recipes包的内容吧。。

 

Recipes部分提供的功能官网列的很详细,点击这里。注意文章第一段:Curator宣称,Recipes模块实现了除二阶段提交之外的所有zookeeper特性。

 

二、Recipes模块

 

主要有

Elections(选举),Locks(锁),Barriers(关卡),Atomic(原子量),Caches,Queues等

 

1、 Elections

选举主要依赖于LeaderSelector和LeaderLatch2个类。前者是所有存活的客户端不间断的轮流做Leader,大同社会。后者是一旦选举出Leader,除非有客户端挂掉重新触发选举,否则不会交出领导权。某党?

 

这两者在实现上是可以切换的,直接上代码,怎么切换注释里有。由于篇幅所限,这里仅贴出基于LeaderSelector的选举,更多代码见附件

/**
 * 本类基于leaderSelector实现,所有存活的client会公平的轮流做leader
 * 如果不想频繁的变化Leader,需要在takeLeadership方法里阻塞leader的变更! 或者使用 {@link}
 * LeaderLatchClient
 */
public class LeaderSelectorClient extends LeaderSelectorListenerAdapter implements Closeable {
	private final String name;
	private final LeaderSelector leaderSelector;
	private final String PATH = "/leaderselector";

	public LeaderSelectorClient(CuratorFramework client, String name) {
		this.name = name;
		leaderSelector = new LeaderSelector(client, PATH, this);
		leaderSelector.autoRequeue();
	}

	public void start() throws IOException {
		leaderSelector.start();
	}

	@Override
	public void close() throws IOException {
		leaderSelector.close();
	}

	/**
	 * client成为leader后,会调用此方法
	 */
	@Override
	public void takeLeadership(CuratorFramework client) throws Exception {
		int waitSeconds = (int) (5 * Math.random()) + 1;
		System.out.println(name + "是当前的leader");
		try {
			Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		} finally {
			System.out.println(name + " 让出领导权\n");
		}
	}

 

/**
 * leader选举
 * 
 * @author shencl
 */
public class LeaderSelectorExample {

	public static void main(String[] args) {

		List<CuratorFramework> clients = Lists.newArrayList();
		List<LeaderSelectorClient> examples = Lists.newArrayList();
		try {
			for (int i = 0; i < 10; i++) {
				CuratorFramework client = ClientFactory.newClient();
				LeaderSelectorClient example = new LeaderSelectorClient(client, "Client #" + i);
				clients.add(client);
				examples.add(example);

				client.start();
				example.start();
			}

            System.out.println("----------先观察一会选举的结果-----------");
			Thread.sleep(10000);

			System.out.println("----------关闭前5个客户端,再观察选举的结果-----------");
			for (int i = 0; i < 5; i++) {
				clients.get(i).close();
			}

			// 这里有个小技巧,让main程序一直监听控制台输入,异步的代码就可以一直在执行。不同于while(ture)的是,按回车或esc可退出
			new BufferedReader(new InputStreamReader(System.in)).readLine();

		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			for (LeaderSelectorClient exampleClient : examples) {
				CloseableUtils.closeQuietly(exampleClient);
			}
			for (CuratorFramework client : clients) {
				CloseableUtils.closeQuietly(client);
			}
		}
	}
}

 

2、locks

curator lock相关的实现在recipes.locks包里。顶级接口都是InterProcessLock。我们直接看最有代表性的InterProcessReadWriteLock 进程内部读写锁(可重入读写锁)。什么叫可重入,什么叫读写锁。不清楚的先查好资料吧。总之读写锁一定是成对出现的。    简易传送门

 

我们先定义两个任务,可并行的执行的,和互斥执行的。

/**
 * 并行任务
 * 
 * @author shencl
 */
public class ParallelJob implements Runnable {

	private final String name;

	private final InterProcessLock lock;

	// 锁等待时间
	private final int wait_time = 5;

	ParallelJob(String name, InterProcessLock lock) {
		this.name = name;
		this.lock = lock;
	}

	@Override
	public void run() {
		try {
			doWork();
		} catch (Exception e) {
			// ingore;
		}
	}

	public void doWork() throws Exception {
		try {
			if (!lock.acquire(wait_time, TimeUnit.SECONDS)) {
				System.err.println(name + "等待" + wait_time + "秒,仍未能获取到lock,准备放弃。");
			}
			// 模拟job执行时间0-4000毫秒
			int exeTime = new Random().nextInt(4000);
			System.out.println(name + "开始执行,预计执行时间= " + exeTime + "毫秒----------");
			Thread.sleep(exeTime);
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			lock.release();
		}
	}
}

 

/**
 * 互斥任务
 * 
 * @author shencl
 */
public class MutexJob implements Runnable {

	private final String name;

	private final InterProcessLock lock;

	// 锁等待时间
	private final int wait_time = 10;

	MutexJob(String name, InterProcessLock lock) {
		this.name = name;
		this.lock = lock;
	}

	@Override
	public void run() {
		try {
			doWork();
		} catch (Exception e) {
			// ingore;
		}
	}

	public void doWork() throws Exception {
		try {
			if (!lock.acquire(wait_time, TimeUnit.SECONDS)) {
				System.err.println(name + "等待" + wait_time + "秒,仍未能获取到lock,准备放弃。");
			}
			// 模拟job执行时间0-2000毫秒
			int exeTime = new Random().nextInt(2000);
			System.out.println(name + "开始执行,预计执行时间= " + exeTime + "毫秒----------");
			Thread.sleep(exeTime);
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			lock.release();
		}
	}
}

 

锁测试代码

 

/**
 * 分布式锁实例
 * 
 * @author shencl
 */
public class DistributedLockExample {
	private static CuratorFramework client = ClientFactory.newClient();
	private static final String PATH = "/locks";

	// 进程内部(可重入)读写锁
	private static final InterProcessReadWriteLock lock;
	// 读锁
	private static final InterProcessLock readLock;
	// 写锁
	private static final InterProcessLock writeLock;

	static {
		client.start();
		lock = new InterProcessReadWriteLock(client, PATH);
		readLock = lock.readLock();
		writeLock = lock.writeLock();
	}

	public static void main(String[] args) {
		try {
			List<Thread> jobs = Lists.newArrayList();
			for (int i = 0; i < 10; i++) {
				Thread t = new Thread(new ParallelJob("Parallel任务" + i, readLock));
				jobs.add(t);
			}

			for (int i = 0; i < 10; i++) {
				Thread t = new Thread(new MutexJob("Mutex任务" + i, writeLock));
				jobs.add(t);
			}

			for (Thread t : jobs) {
				t.start();
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			CloseableUtils.closeQuietly(client);
		}
	}
}

 

看到没,用法和java concurrent包里的ReentrantReadWriteLock 是一模一样的。

事实上,整个recipes包的目录结构、实现原理同java concurrent包的设置是很一致的。比如有queue,Semaphore,Barrier等类,。他整个就是模仿jdk的实现,只不过是基于分布式的!

 

后边的几项,Barriers(关卡),Atomic(原子量),Caches,Queues和java concurrent包里的类的用法是一样的,就不继续贴了,有些附件里有。

要说明的是:有的功能性能不是特别理想,网上也没见有大的项目的使用案例。比如基于CAS机制的atomic,在某些情况重试的效率还不如硬同步,要是zookeeper节点再一多,各个节点之间通过event触发的数据同步极其频繁。那性能可以想象。

 

三、测试方法

 curator提供了很好的测试工具,你甚至是可以在完全没有搭建zookeeper server端的情况下,完成测试。

有2个重要的类

TestingServer 模拟单点, TestingCluster模拟集群。

需要使用的话,得依赖

		<dependency>
			<groupId>org.apache.curator</groupId>
			<artifactId>curator-test</artifactId>
			<version>2.5.0</version>
		</dependency>

 

 

 

全文完。

 

本文参考:

http://curator.apache.org/

http://www.cnblogs.com/hzhuxin/archive/2012/11/01/2749341.html

http://www.chengxuyuans.com/Java+/72042.html

http://macrochen.iteye.com/blog/1366136

 

分享到:
评论
2 楼 liyonghui160com 2015-10-14  
楼主58的
1 楼 liutingfeng2010 2015-02-07  
哥们 DistributedAtomicIntegerExample 由于并发太大,延迟比较长,你把这一行
counter = new DistributedAtomicInteger(client, PATH, new RetryNTimes(100, 1000));
里的重试和等待时间加长些就行了,
另外程序里可以用CountDownLatch来替代不确定的sleep();

修改后可以这样:

package com.bj58.emc.study.curator.demo.atomic;

import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.utils.CloseableUtils;

import com.bj58.emc.study.curator.demo.utils.ClientFactory;
import com.google.common.collect.Lists;

/**
* 这个类我没能运行出期待的结果来,谁检查一下我的代码?
*
* @author shencl
*/
public class DistributedAtomicIntegerExample {
private static final int _1000 = 1000;
private static CuratorFramework client = ClientFactory.newClient();
private static final String PATH = "/counter";
public static volatile DistributedAtomicInteger counter;

static {
client.start();
counter = new DistributedAtomicInteger(client, PATH, new RetryNTimes(100, 1000));
}

public static void main(String[] args) {
final CountDownLatch countDownLatch = new CountDownLatch(_1000);
try {
counter.trySet(0);
List<Thread> jobs = Lists.newArrayList();
// 开1k个线程,不用同步机制,同时启动
for (int i = 0; i < _1000; i++) {
jobs.add(new Thread(new Runnable() {

@Override
public void run() {
try {
AtomicValue<Integer> rc = counter.increment();
System.out.println("success:" + rc.succeeded() + ";before:" + rc.preValue() + ";after:" + rc.postValue());
countDownLatch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
}));

}

for (Thread t : jobs) {
t.start();
}

// 保证线程全部执行完毕
countDownLatch.await();
System.out.println("计数器最终的值=" + counter.get().postValue());
AtomicValue<Integer> rc = counter.get();
System.out.println("success:" + rc.succeeded() + ";before:" + rc.preValue() + ";after:" + rc.postValue());
} catch (Exception e) {
e.printStackTrace();
} finally {
CloseableUtils.closeQuietly(client);
}
}
}

相关推荐

    5、zookeeper的java -Curator(服务注册与发现)

    在Java环境中,Curator是一个优秀的Zookeeper客户端库,简化了与Zookeeper的交互,包括服务注册与发现。本文将深入探讨如何利用Curator实现这一功能。 首先,Curator提供了一套完整的API来抽象服务注册与发现,包括...

    Zookeeper-Java客户端Curator

    Zookeeper_Java客户端Curator

    zookeeper开源客户端Curator

    Curator是Netflix公司开源的一套ZooKeeper客户端框架,Curator解决了很多ZooKeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等,实现了Fluent风格的API接口,目前已经...

    Zookeeper开源客户端框架Curator简介与示例

    Curator 是一个基于 ZooKeeper 的开源客户端框架,它为 ZooKeeper 提供了高级抽象和功能,使得开发人员能够更方便地使用 ZooKeeper。 **Curator 框架概述** Curator 包含多个模块,如 ZooKeeper 客户端连接管理、...

    zookeeper两种客户端demo

    《Zookeeper两种客户端实战解析》 Zookeeper,作为分布式协调...以上就是关于Zookeeper的ZkClient和Curator客户端的介绍,希望对初学者和进阶者都有所启发。通过实践,相信你能更好地驾驭Zookeeper这个强大的工具。

    zookeeper客户端curator操作示例

    ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,它是集群的管理者,监视着集群中各个节点的状态根据节点提交的反馈进行...这将有助于你掌握Curator客户端的使用,并为实际项目中应用ZooKeeper打下基础。

    4、zookeeper的java三种客户端介绍-Curator(crud、事务操作、监听、分布式计数器、分布式锁)

    这篇文章主要介绍了Zookeeper的三种Java客户端:原生Java API、ZkClient以及Apache Curator,并着重讨论了Curator的特性与优势。 一、Zookeeper原生Java API Zookeeper的原生Java API提供了基本的CRUD(创建、读取...

    Zookeeper 原生api zkClient Curator操作

    在Java开发中,我们通常使用三种方式来操作Zookeeper:原生API、zkClient和Curator。接下来,我们将详细探讨这三种方式。 **一、Zookeeper原生API** Zookeeper提供了Java API,可以直接与Zookeeper服务器进行交互...

    ZooKeeper-Curator:zookeeper的curator客户端

    ZooKeeper-Curator是Apache ZooKeeper的一个高级客户端,它提供了许多实用工具和抽象,使得与ZooKeeper交互变得更加简单和可靠。ZooKeeper是一个分布式协调服务,广泛用于管理分布式应用程序的状态,实现一致性、...

    zookeeper Java api - curator 5.6.0

    Apache Curator 是一个高度封装的 ZooKeeper Java 客户端库,它简化了与 ZooKeeper 交互的复杂性,提供了更高级别的抽象和实用工具。ZooKeeper 是一个分布式的,开放源码的协调服务,用于分布式应用程序,提供命名...

    项目加入zookeeper的依赖包(Curator框架)

    1. `curator-client-4.0.1.jar`: 这是Curator客户端的核心库,包含了与Zookeeper服务器通信的基本组件。 2. `guava-18.0.jar`: Guava是Google的一个Java工具包,提供了很多实用的类和函数,Curator可能依赖于Guava的...

    Zookeeper客户端Curator Framework使用代码

    创建Curator客户端的基本步骤是初始化`CuratorFrameworkFactory`,设置连接字符串、会话超时时间以及重试策略,例如: ```java ZooKeeperClientConfig clientConfig = new ZooKeeperClientConfig(); client...

    curator zookeeper 3.4.6 2.9.1

    而Curator则是Zookeeper的一个客户端库,为开发者提供了更高级别的抽象和工具,简化了Zookeeper的使用。 Zookeeper 3.4.6是其稳定且广泛采用的一个版本,它提供了丰富的API和强大的一致性模型。在这个版本中,...

    zookeeper 使用 Curator 示例监听、分布式锁

    ZooKeeper 是一个分布式服务协调框架,常用于构建高可用的分布式系统。Curator 是 Apache ZooKeeper 的一个客户端库,提供了丰富的工具...通过学习这个示例,你可以深入理解 ZooKeeper 和 Curator 在实际应用中的用法。

    浅谈Zookeeper开源客户端框架Curator

    浅谈Zookeeper开源客户端框架Curator Curator是一个基于Zookeeper的开源客户端框架,由Netflix开发,旨在解决Zookeeper客户端使用过程中的各种问题。Curator提供了封装ZooKeeper client与ZooKeeper server之间的...

    zk客户端curator2.11

    客户端是Curator Framework,是Apache的项目,它主要的功能是为ZK的客户端使用提供了高可用的封装。在Curator Framework基础上封装的curator-recipes,实现了很多经典场景。比如:集群管理(Leader选举)、共享锁、...

    curator_zookeeper需要的jar

    1. `curator-client-2.8.0.jar`:这是Curator的基本客户端库,提供了连接Zookeeper服务器、执行基本操作(如创建、删除节点)的类和方法。例如,`org.apache.curator.framework.CuratorFramework`是Curator的主要...

    3天全面深入学习zookeeper视频教程

    3天精通zookeeper视频详细讲解,需要的小伙伴自行百度网盘下载,链接见附件,永久有效...zookeeper 开源客户端curator介绍 zookeeper四字监控命令 zookeeper图形化的客户端工具(ZooInspector) taokeeper监控工具的使用

    zookeeper客户端工具

    zookeeper客户端工具,下载到本地后请使用ide工具进行编译,运行工程目录下的\zooinspector-master\target\zooinspector-pkg\bin里的bat或者sh。即可使用。

Global site tag (gtag.js) - Google Analytics