package org.gjp;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ExistsBuilder;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
public class ZkCreateNode {
/**
* 检查节点是否存在
*
* @param client
* @param path
* @return
*/
public Stat checkNode(CuratorFramework client, String path) {
ExistsBuilder eb = client.checkExists();
Stat stat = null;
try {
stat = eb.forPath(path);
if (stat != null) {
System.out.println(stat);
System.out.println("version:" + stat.getVersion());
} else {
System.out.println(" null ......");
}
} catch (Exception e) {
e.printStackTrace();
}
return stat;
}
/**
* 创建节点
*
* @param client
* @param path
* @return
*/
public String createNode(CuratorFramework client, final String path, final String value) {
String result = "";
try {
result = client.create().forPath(path, value.getBytes());
System.out.println("res:" + result);
} catch (Exception e) {
e.printStackTrace();
}
return result;
}
/**
* 设置具有序列化的临时节点
* @param client
* @param path
* @param value
* @return
*/
public String createModeEphSeqNode(CuratorFramework client, final String path, final String value) {
String result = "";
try {
result = client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, value.getBytes());
System.out.println("res:" + result);
} catch (Exception e) {
e.printStackTrace();
}
return result;
}
/**
* 删除节点
*
* @param client
* @param path
* @return
*/
public boolean deleteNode(CuratorFramework client, final String path) {
boolean result = true;
try {
client.delete().forPath(path);
} catch (Exception e) {
result = false;
e.printStackTrace();
}
return result;
}
/**
* 修改节点值
* @param client
* @param path
* @param val
* @return
*/
public boolean changeNodeData(CuratorFramework client, final String path,final String val){
boolean result =true;
try {
client.setData().forPath(path, val.getBytes());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return result;
}
public void getNodeChild(CuratorFramework client, final String path) throws Exception{
List<String> pathChild = client.getChildren().forPath(path);
for(String item:pathChild){
System.out.println("child:"+item);
}
}
/**
* 分布式锁
*
* @param client
* @param path
* @param val
*/
public boolean disLock(CuratorFramework client, final String path, final String val) {
boolean result = false;
// 首先查看节点是否存在
Stat stat = checkNode(client, path);
if (null == stat) {
// 创建节点
String res = createNode(client, path, val);
if (null != res && "".equals(res.trim())) {
// 创建节点成功
InterProcessMutex lock = new InterProcessMutex(client, path);
try {
if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) {
//业务逻辑内容。。。。。。。
System.out.println(Thread.currentThread().getName() + " hold lock");
Thread.sleep(5000L);
System.out.println(Thread.currentThread().getName() + " release lock");
result = true;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
return result;
}
//节点监听说明 /////////////////////////////////////////////////////////////
//1.path Cache 连接 路径 是否获取数据
//能监听所有的字节点 且是无限监听的模式 但是 指定目录下节点的子节点不再监听
public void setListenterThreeOne(CuratorFramework client,final String path) throws Exception{
PathChildrenCache childrenCache = new PathChildrenCache(client, path, true);
PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() {
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
System.out.println("开始进行事件分析:-----"+event.getType());
ChildData data = event.getData();
StringBuffer sb = new StringBuffer(32);
switch (event.getType()) {
case CHILD_ADDED:
sb.append("CHILD_ADDED : "+ data.getPath());
break;
case CHILD_REMOVED:
sb.append("CHILD_REMOVED : "+ data.getPath());
break;
case CHILD_UPDATED:
sb.append("CHILD_UPDATED : "+ data.getPath());
break;
default:
break;
}
}
};
childrenCache.getListenable().addListener(childrenCacheListener);
System.out.println("Register zk watcher successfully!");
childrenCache.start(StartMode.POST_INITIALIZED_EVENT);
}
//2.Node Cache 监控本节点的变化情况 连接 目录 是否压缩
//监听本节点的变化 节点可以进行修改操作 删除节点后会再次创建(空节点)
public void setListenterThreeTwo(CuratorFramework client,final String path) throws Exception{
//设置节点的cache
final NodeCache nodeCache = new NodeCache(client, path, false);
nodeCache.getListenable().addListener(new NodeCacheListener() {
public void nodeChanged() throws Exception {
System.out.println("the test node is change and result is :");
System.out.println("path : "+nodeCache.getCurrentData().getPath());
System.out.println("data : "+new String(nodeCache.getCurrentData().getData()));
System.out.println("stat : "+nodeCache.getCurrentData().getStat());
}
});
nodeCache.start();
}
//3.Tree Cache
// 监控 指定节点和节点下的所有的节点的变化--无限监听 可以进行本节点的删除(不在创建)
public void setListenterThreeThree(CuratorFramework client,final String path) throws Exception{
ExecutorService pool = Executors.newCachedThreadPool();
//设置节点的cache
TreeCache treeCache = new TreeCache(client, path);
//设置监听器和处理过程
treeCache.getListenable().addListener(new TreeCacheListener() {
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
ChildData data = event.getData();
StringBuffer sb = new StringBuffer(32);
switch (event.getType()) {
case NODE_ADDED:
sb.append("NODE_ADDED : "+ data.getPath());
break;
case NODE_REMOVED:
sb.append("NODE_REMOVED : "+ data.getPath());
break;
case NODE_UPDATED:
sb.append("NODE_UPDATED : "+ data.getPath());
break;
default:
break;
}
if(null !=data){
sb.append(" ;data=");
sb.append(new String(data.getData()));
}
sb.append(" end");
System.out.println(sb.toString());
}
});
//开始监听
treeCache.start();
}
public static void main(String[] args) {
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new RetryNTimes(10, 5000));
client.start();
String parentPath = "/zkpath2";
String path = "/zkpath2/product13";
ZkCreateNode zk = new ZkCreateNode();
try {
//zk.setListenterThreeOne(client, path);
//zk.setListenterThreeTwo(client, path);
zk.setListenterThreeThree(client,path);
//zk.checkNode(client, path);
//zk.createNode(client, path, "val2");
// zk.createNode(client, "/zkpath2/product6", "val6");
//zk.changeNodeData(client, path, "bocetest13");
//zk.changeNodeData(client, path, "bocetest13");
//System.out.println("del="+zk.deleteNode(client,path));
//zk.getNodeChild(client, parentPath);
zk.createModeEphSeqNode(client, "/seq/test", "sql1");
} catch (Exception e) {
e.printStackTrace();
}
try {
Thread.sleep(10000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(client !=null){
client.close();
}
}
}
相关推荐
它隐藏了底层的ZooKeeper操作,使得开发者只需调用几个方法即可实现分布式锁的获取和释放。 此外,Curator还提供了一些附加功能,如锁的公平性(按照创建顺序获取锁)、租约(设置锁的持有时间限制)以及锁竞争的...
以下是使用Curator实现分布式锁的步骤: 1. 添加Curator依赖: ```xml <groupId>org.apache.curator <artifactId>curator-framework <version>4.x.x</version> <!-- 使用最新版本 --> ``` 2. 初始化...
分布式锁Curator是Java开发中的一个关键工具,用于在分布式环境中实现锁的机制。Curator是Apache ZooKeeper的一个客户端库,提供了丰富的高级API和工具,简化了ZooKeeper的使用,同时也包括了分布式锁的实现。...
本示例将详细介绍如何利用 Curator 在 ZooKeeper 上进行数据操作以及实现分布式锁。 一、ZooKeeper 与 Curator 的基本概念 1. ZooKeeper:ZooKeeper 是一款分布式协调服务,它为分布式应用提供一致性服务,如命名...
在分布式应用系统中,实现分布式锁是解决多系统或多个服务实例之间资源互斥访问的一种常用机制。本部分将详细介绍使用Zookeeper实现分布式锁的知识点。 Zookeeper是Apache的一个开源项目,它为分布式应用提供了高...
本项目可能使用了如ZooKeeper、Redis或Apache Curator等工具来实现分布式锁。这些工具能够提供高可用性和可扩展性,确保在分布式系统中的任务调度一致性。 而“xxx-job”可能指的是某种特定的分布式任务调度框架,...
2. **框架方式**:除了直接使用Zookeeper API,还可以借助于像Curator这样的框架,它提供了更高级别的抽象和便利的功能,如Lock接口,简化了分布式锁的实现,降低了开发难度。 接下来是Redis的分布式锁。Redis是一...
本实践教程将指导你如何利用Zookeeper实现分布式锁,以便在分布式环境中保证资源访问的互斥性。 **1. Zookeeper概述** Zookeeper是一个分布式协调服务,它为分布式应用提供了简单而强大的命名服务、配置管理、集群...
Apache Curator是一个ZooKeeper的客户端库,它提供了更高层次的API来简化ZooKeeper的使用,包括实现分布式锁的配方。 在提供的代码示例中,我们展示了如何使用Apache Curator来实现一个健壮的分布式锁。这个实现...
在实际项目中,可以使用Java的ZooKeeper客户端库(如Curator)来简化分布式锁的实现。这些库提供了高级API,帮助开发者更方便地处理ZooKeeper的操作,例如创建、删除节点,以及设置节点监视器等。 总之,ZooKeeper...
它被广泛用于实现分布式锁、配置管理、服务发现等多个场景。本篇文章将深入探讨如何使用Zookeeper实现分布式共享锁。 分布式锁是一种在多节点之间共享资源的机制,它允许在同一时间只有一个节点对资源进行操作。在...
zookeeper是一种分布式协调服务,可以用于实现分布式锁。zookeeper提供了一个树形结构的命名空间,客户端可以在这个命名空间中创建、删除、修改节点。在分布式锁的实现中,zookeeper可以作为一个中间件,用于协调多...
ZooKeeper因其强一致性、高可用性和实时性,成为实现分布式锁的理想选择。 二、ZooKeeper分布式锁机制 1. 临时顺序节点的创建 当客户端需要获取锁时,它会在zk上的一个特定路径(例如 `/my_lock`)下创建一个临时...
Zookeeper是一个分布式协调服务,提供了丰富的数据一致性操作,如创建临时节点实现分布式锁。在Java中,通常使用ZKClient或Curator库来操作Zookeeper。Zookeeper的分布式锁特点是强一致性和高可用性,但由于其操作...
集成Zookeeper,我们可以利用其提供的分布式协调服务,包括节点创建、监听、删除等操作,来实现分布式锁。 Zookeeper是一个分布式服务框架,它是Apache Hadoop的一个子项目,设计用于处理高并发、高可用性的分布式...
使用Curator可以极大地简化与Zookeeper交互的代码,例如实现分布式锁只需要几行代码。此外,Curator还提供了各种场景的“Recipe”,如分布式锁服务、选举机制和分布式计数器等,这些预定义的解决方案使开发者能更...
ZooKeeper 实现分布式锁的方法示例 ZooKeeper 是一个典型的分布式数据一致性解决方案,分布式应用程序可以基于 ZooKeeper 实现诸如数据发布/订阅、负载均衡、分布式协调/通知、集群管理、Master 选举、分布式锁等...
ZooKeeper经典应用场景 ZooKeeper是一个高可用的... ZooKeeper分布式锁的实现可以基于临时znode和Curator客户端中的各种官方实现的分布式锁,而ZooKeeper服务注册中心可以提供高可用性、强一致性和实时性等特性。
- **使用库**:Curator是Apache的ZooKeeper客户端,提供了`InterProcessMutex`实现分布式锁。 **对比分析**: - **数据库分布式锁**:性能较差,可能锁表,需轮询检查,占用连接资源。 - **Redis分布式锁**:锁...
Curator的分布式锁食谱可以防止多个节点同时修改同一数据,确保数据的一致性。 3. **分组服务**:通过Group服务,节点可以加入到特定的组中,实现组内的通信和协作。 4. **队列和栈**:Curator提供了FIFO(先进先...