转自:http://blog.csdn.net/lzy_lizhiyang/article/details/48518731
使用Java操作zookeeper时,一般有两种方式:使用zkclient或者curator,相比较来说,curator的使用较为简便。今天就来看看如何使用curator来操作zookeeper。
需要的依赖如下:
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.8.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>2.8.0</version> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>16.0.1</version> </dependency>
首先,我们创建一个客户端类,来真正的操作zookeeper,包括创建连接、创建节点,写入值、读取值等。
import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; import org.apache.commons.lang.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.CuratorWatcher; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.RetryNTimes; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher.Event; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class CuratorZookeeperClient { private final int CONNECT_TIMEOUT = 15000; private final int RETRY_TIME = Integer.MAX_VALUE; private final int RETRY_INTERVAL = 1000; private static final Logger logger = LoggerFactory.getLogger(CuratorZookeeperClient.class); private CuratorFramework curator; private volatile static CuratorZookeeperClient instance; /** * key:父路径,如/jobcenter/client/goodscenter * value:Map-->key:子路径,如/jobcenter/client/goodscenter/goodscenter00000001 * value:路径中的值 */ private static ConcurrentHashMap<String,Map<String,String>> zkCacheMap = new ConcurrentHashMap<String,Map<String,String>>(); public static Map<String,Map<String,String>> getZkCacheMap() { return zkCacheMap; } private CuratorFramework newCurator(String zkServers) { return CuratorFrameworkFactory.builder().connectString(zkServers) .retryPolicy(new RetryNTimes(RETRY_TIME, RETRY_INTERVAL)) .connectionTimeoutMs(CONNECT_TIMEOUT).build(); } private CuratorZookeeperClient(String zkServers) { if(curator == null) { curator = newCurator(zkServers); curator.getConnectionStateListenable().addListener(new ConnectionStateListener() { public void stateChanged(CuratorFramework client, ConnectionState state) { if (state == ConnectionState.LOST) { //连接丢失 logger.info("lost session with zookeeper"); } else if (state == ConnectionState.CONNECTED) { //连接新建 logger.info("connected with zookeeper"); } else if (state == ConnectionState.RECONNECTED) { logger.info("reconnected with zookeeper"); //连接重连 for(ZkStateListener s:stateListeners){ s.reconnected(); } } } }); curator.start(); } } public static CuratorZookeeperClient getInstance(String zkServers) { if(instance == null) { synchronized(CuratorZookeeperClient.class) { if(instance == null) { logger.info("initial CuratorZookeeperClient instance"); instance = new CuratorZookeeperClient(zkServers); } } } return instance; } /** * 写数据:/docker/jobcenter/client/app/app0..../app1...../app2 * @param path * @param content * * @return 返回真正写到的路径 * @throws Exception */ public String write(String path,String content) throws Exception { StringBuilder sb = new StringBuilder(path); String writePath = curator.create().creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL_SEQUENTIAL) .forPath(sb.toString(), content.getBytes("utf-8")); return writePath; } /** * 随机读取一个path子路径 * 先从cache中读取,如果没有,再从zookeeper中查询 * @param path * @return * @throws Exception */ public String readRandom(String path) throws Exception { String parentPath = path; Map<String,String> cacheMap = zkCacheMap.get(path); if(cacheMap != null && cacheMap.size() > 0) { logger.debug("get random value from cache,path="+path); return getRandomValue4Map(cacheMap); } if(curator.checkExists().forPath(path) == null) { logger.debug("path [{}] is not exists,return null",path); return null; } else { logger.debug("read random from zookeeper,path="+path); cacheMap = new HashMap<String,String>(); List<String> list = curator.getChildren().usingWatcher(new ZKWatcher(parentPath,path)).forPath(path); if(list == null || list.size() == 0) { logger.debug("path [{}] has no children return null",path); return null; } Random rand = new Random(); String child = list.get(rand.nextInt(list.size())); path = path + "/" + child; byte[] b = curator.getData().usingWatcher(new ZKWatcher(parentPath,path)) .forPath(path); String value = new String(b,"utf-8"); if(StringUtils.isNotBlank(value)) { cacheMap.put(path, value); zkCacheMap.put(parentPath, cacheMap); } return value; } } /** * 读取path下所有子路径下的内容 * 先从map中读取,如果不存在,再从zookeeper中查询 * @param path * @return * @throws Exception */ public List<String> readAll(String path) throws Exception { String parentPath = path; Map<String,String> cacheMap = zkCacheMap.get(path); List<String> list = new ArrayList<String>(); if(cacheMap != null) { logger.debug("read all from cache,path="+path); list.addAll(cacheMap.values()); return list; } if(curator.checkExists().forPath(path) == null) { logger.debug("path [{}] is not exists,return null",path); return null; } else { cacheMap = new HashMap<String,String>(); List<String> children = curator.getChildren().usingWatcher(new ZKWatcher(parentPath,path)).forPath(path); if(children == null || children.size() == 0) { logger.debug("path [{}] has no children,return null",path); return null; } else { logger.debug("read all from zookeeper,path="+path); String basePath = path; for(String child : children) { path = basePath + "/" + child; byte[] b = curator.getData().usingWatcher(new ZKWatcher(parentPath,path)) .forPath(path); String value = new String(b,"utf-8"); if(StringUtils.isNotBlank(value)) { list.add(value); cacheMap.put(path, value); } } } zkCacheMap.put(parentPath, cacheMap); return list; } } /** * 随机获取Map中的一个值 * @param map * @return */ private String getRandomValue4Map(Map<String,String> map) { Object[] values = map.values().toArray(); Random rand = new Random(); return values[rand.nextInt(values.length)].toString(); } public void delete(String path) throws Exception { if(curator.checkExists().forPath(path) != null) { curator.delete().inBackground().forPath(path); zkCacheMap.remove(path); } } /** * 获取路径下的所有子路径 * @param path * @return */ public List<String> getChildren(String path) throws Exception { if(curator.checkExists().forPath(path) == null) { logger.debug("path [{}] is not exists,return null",path); return null; } else { List<String> children = curator.getChildren().forPath(path); return children; } } public void close() { if(curator != null) { curator.close(); curator = null; } zkCacheMap.clear(); } /** * zookeeper监听节点数据变化 * @author lizhiyang * */ private class ZKWatcher implements CuratorWatcher { private String parentPath; private String path; public ZKWatcher(String parentPath,String path) { this.parentPath = parentPath; this.path = path; } public void process(WatchedEvent event) throws Exception { Map<String,String> cacheMap = zkCacheMap.get(parentPath); if(cacheMap == null) { cacheMap = new HashMap<String,String>(); } if(event.getType() == Event.EventType.NodeDataChanged || event.getType() == Event.EventType.NodeCreated){ byte[] data = curator.getData(). usingWatcher(this).forPath(path); cacheMap.put(path, new String(data,"utf-8")); logger.info("add cache={}",new String(data,"utf-8")); } else if(event.getType() == Event.EventType.NodeDeleted) { cacheMap.remove(path); logger.info("remove cache path={}",path); } else if(event.getType() == Event.EventType.NodeChildrenChanged) { //子节点发生变化,重新进行缓存 cacheMap.clear(); List<String> children = curator.getChildren().usingWatcher(new ZKWatcher(parentPath,path)).forPath(path); if(children != null && children.size() > 0) { for(String child : children) { String childPath = parentPath + "/" + child; byte[] b = curator.getData().usingWatcher(new ZKWatcher(parentPath,childPath)) .forPath(childPath); String value = new String(b,"utf-8"); if(StringUtils.isNotBlank(value)) { cacheMap.put(childPath, value); } } } logger.info("node children changed,recaching path={}",path); } zkCacheMap.put(parentPath, cacheMap); } } private final Set<ZkStateListener> stateListeners = new CopyOnWriteArraySet<ZkStateListener>(); public void addStateListener(ZkStateListener listener) { stateListeners.add(listener); }
其中,我们对节点和值进行了缓存,避免频繁的访问zookeeper。在对zookeeper操作时,对连接丢失、连接新建、重连等事件进行了监听,使用到了类ZkStateListener
public interface ZkStateListener { void reconnected(); }
下面,我们就使用ZkDockerService类来封住客户端的操作。
import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * jobclient docker改造 * 注册应用信息至zookeeper * @author lizhiyang * */ public class ZkDockerService { private static final Logger logger = LoggerFactory.getLogger(ZkDockerService.class); private CuratorZookeeperClient zkClient; private Set<String> zkPathList = new HashSet<String>(); // 失败重试定时器,定时检查是否有请求失败,如有,无限次重试 private ScheduledFuture<?> retryFuture; // 定时任务执行器 private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("RegistryFailedRetryTimer", true)); //需要重新注册的数据 private Set<ClientData> retrySet = new HashSet<ClientData>(); /** * init-method,初始化执行 * 将本机docker的IP地址 端口都注册到zookeeper中 */ public void register2Zookeeper() { try { zkClient = CuratorZookeeperClient.getInstance(ZOOKEEPER_ADDRESS); ClientData client = findClientData(); registerClientData(client); zkClient.addStateListener(new ZkStateListener(){ @Override public void reconnected() { ClientData client = findClientData(); //将服务添加到重试列表 retrySet.add(client); } }); //启动线程进行重试,1秒执行一次,因为jobcenter的定时触发时间最短的是1秒 this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() { public void run() { // 检测并连接注册中心 try { retryRegister(); } catch (Throwable t) { // 防御性容错 logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t); } } }, 1, 1, TimeUnit.SECONDS); } catch (Exception e) { logger.error("zookeeper write exception",e); } } /** * destrory-method,销毁时执行 */ public void destroy4Zookeeper() { logger.info("zkDockerService destrory4Zookeeper path="+zkPathList); try { if(retryFuture != null){ retryFuture.cancel(true); } } catch (Throwable t) { logger.warn(t.getMessage(), t); } if(zkPathList != null && zkPathList.size() > 0) { for(String path : zkPathList) { try { zkClient.delete(path); } catch (Exception e) { logger.error("zkDockerService destrory4Zookeeper exception",e); } } } zkClient.close(); } /** 构造要存储的对象 **/ private ClientData findClientData() { ClientData client = new ClientData(); client.setIpAddress(ip); client.setPort(port); client.setSource(1); return client; } /** 将值写入zookeeper中 **/ private void registerClientData(ClientData client) throws Exception{ String centerPath = "/server"; String content = ""; String strServer = zkClient.write(centerPath, content); if(!StringUtils.isBlank(strServer)) { zkPathList.add(strServer); } } /** * 重连到zookeeper时,自动重试 */ protected synchronized void retryRegister() { if(!retrySet.isEmpty()){ logger.info("jobclient begin retry register client to zookeeper"); Set<ClientData> retryClients = new HashSet<ClientData>(retrySet); for(ClientData data :retryClients){ logger.info("retry register="+data); try { registerJobcenterClient(data); retrySet.remove(data); } catch (Exception e) { logger.error("registerJobcenterClient failed",e); } } } } }
其中在使用定时任务时,使用到了NamedThreadFactory,如下:
import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; public class NamedThreadFactory implements ThreadFactory { private static final AtomicInteger POOL_SEQ = new AtomicInteger(1); private final AtomicInteger mThreadNum = new AtomicInteger(1); private final String mPrefix; private final boolean mDaemo; private final ThreadGroup mGroup; public NamedThreadFactory() { this("pool-" + POOL_SEQ.getAndIncrement(),false); } public NamedThreadFactory(String prefix) { this(prefix,false); } public NamedThreadFactory(String prefix,boolean daemo) { mPrefix = prefix + "-thread-"; mDaemo = daemo; SecurityManager s = System.getSecurityManager(); mGroup = ( s == null ) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup(); } public Thread newThread(Runnable runnable) { String name = mPrefix + mThreadNum.getAndIncrement(); Thread ret = new Thread(mGroup,runnable,name,0); ret.setDaemon(mDaemo); return ret; } public ThreadGroup getThreadGroup() { return mGroup; } }
相关推荐
描述提到“curator封装zookeeper需要的jar”,这意味着我们将探讨Curator如何将复杂的Zookeeper操作包装成更易于使用的Java接口。 首先,`zookeeper-3.4.6.jar`是Zookeeper的核心库,包含了Zookeeper服务器和客户端...
而Apache Curator是Facebook开源的一个ZooKeeper客户端库,它提供了更高级别的API,简化了ZooKeeper的使用,并增加了诸如连接管理、重试策略、事务操作等功能。 在Java开发中,Curator是使用ZooKeeper最常用且推荐...
5. javassist.jar:这是一个Java字节码操作库,Curator可能使用它来动态修改类的行为,例如在客户端实现更复杂的监控和管理功能。 6. swift-codec-0.16.0.jar和swift-service-0.16.0.jar:这些可能与特定项目或库...
综上所述,这个示例项目将展示如何使用 Curator 对 ZooKeeper 进行数据操作,以及如何实现基于 ZooKeeper 的分布式锁,这对于构建分布式系统中的并发控制和协调至关重要。通过学习这个示例,你可以深入理解 ...
总的来说,`maven-zookeeper`项目为学习和实践如何使用Curator操作Zookeeper集群提供了一个很好的起点。通过这个项目,开发者不仅可以掌握Curator的使用,还能加深对Zookeeper的理解,并熟悉Maven在项目构建中的应用...
curator zookeeper 3.4.6 2.9.1
在Java开发中,我们通常使用三种方式来操作Zookeeper:原生API、zkClient和Curator。接下来,我们将详细探讨这三种方式。 **一、Zookeeper原生API** Zookeeper提供了Java API,可以直接与Zookeeper服务器进行交互...
Curator是Netflix公司开源的一套ZooKeeper客户端框架,Curator解决了很多ZooKeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等,实现了Fluent风格的API接口,目前已经...
Curator客户端是Apache Curator框架的一部分,该框架为开发人员提供了一系列高级API和工具,用于处理常见的ZooKeeper用例,降低了使用ZooKeeper的复杂性。以下是对Curator客户端及其主要特性的详细阐述: 1. **连接...
总的来说,Curator框架极大地简化了Zookeeper的使用,提供了丰富的功能和易于使用的API,使开发者能够更专注于业务逻辑而不是底层的协调细节。在项目中集成这些依赖包,能够有效地支持分布式环境下的协调需求。
Curator是Netflix开源的一个用于简化Zookeeper使用的Java客户端库,它提供了很多高级功能,包括分布式锁、事件监听、配方等。在这个示例中,我们将使用Curator的`InterProcessMutex`类来实现分布式互斥锁(Mutex)。...
CuratorFramework 是 Curator 的核心,它封装了 ZooKeeper 连接的创建、会话管理以及操作执行等。 3. **连接管理**:使用 `CuratorFrameworkFactory` 工厂类,你可以创建一个 ZooKeeper 客户端实例。例如: ```...
使用apache curator-test单元测试zookeeper
在本项目`zookeeper_demo`中,我们将深入探讨Zookeeper的三种主要客户端库——原生API、ZkClient和Curator的操作方式,以及它们在集群配置和分布式锁等实际场景中的应用。 1. **Zookeeper原生API** Zookeeper原生...
Zookeeper客户端Curator Framework是Apache Zookeeper项目的一个高级封装库,它为开发者提供了一系列便捷的API,使得操作Zookeeper变得更加简单。本篇文章将深入探讨Curator Framework的主要功能、使用方法以及示例...
- 操作:使用`CuratorFramework`提供的API进行增删改查等操作。 - 监听:注册Watchers来监听ZNode的变化。 - 关闭:在应用结束时,记得关闭`CuratorFramework`实例,释放资源。 通过ZooKeeper-Curator,开发者...
Curator的`ServiceCache`提供了更加高效的服务查询方式,它会在内存中缓存服务实例,减少对Zookeeper的频繁查询,同时使用Watcher机制实时更新服务列表。 总的来说,Zookeeper结合Curator的`ServiceDiscovery`机制...
使用mindmaster打开
Curator 是一个基于 ZooKeeper 的开源客户端框架,它为 ZooKeeper 提供了高级抽象和功能,使得开发人员能够更方便地使用 ZooKeeper。 **Curator 框架概述** Curator 包含多个模块,如 ZooKeeper 客户端连接管理、...
在分布式系统中,确保数据的一致性和正确性是至关重要的,而ZooKeeper与Curator的结合使用就是解决这一问题的有效方案。ZooKeeper是一个高度可靠的分布式协调服务,它提供了诸如分布式锁、队列和配置管理等核心功能...