package org.gjp;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ExistsBuilder;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
public class ZkCreateNode {
public byte[] getData(CuratorFramework client,final String path){
byte [] b = null;
try {
b = client.getData().forPath(path);
} catch (Exception e) {
e.printStackTrace();
}
return b;
}
/**
* 检查节点是否存在
*
* @param client
* @param path
* @return
*/
public Stat checkNode(CuratorFramework client, String path) {
ExistsBuilder eb = client.checkExists();
Stat stat = null;
try {
stat = eb.forPath(path);
if (stat != null) {
//System.out.println(stat);
System.out.println("version:" + stat.getVersion());
} else {
System.out.println(" null ......");
}
} catch (Exception e) {
e.printStackTrace();
}
return stat;
}
/**
* 创建节点
*
* @param client
* @param path
* @return
*/
public String createNode(CuratorFramework client, final String path, final String value) {
String result = "";
try {
result = client.create().forPath(path, value.getBytes());
System.out.println("res:" + result);
} catch (Exception e) {
e.printStackTrace();
}
return result;
}
/**
* 设置具有序列化的临时节点
*
* @param client
* @param path
* @param value
* @return
*/
public String createModeEphSeqNode(CuratorFramework client, final String path, final String value) {
String result = "";
try {
result = client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, value.getBytes());
System.out.println("res:" + result);
} catch (Exception e) {
e.printStackTrace();
}
return result;
}
/**
* 删除节点
*
* @param client
* @param path
* @return
*/
public boolean deleteNode(CuratorFramework client, final String path) {
boolean result = true;
try {
client.delete().forPath(path);
} catch (Exception e) {
result = false;
e.printStackTrace();
}
return result;
}
/**
* 修改节点值
*
* @param client
* @param path
* @param val
* @return
*/
public boolean changeNodeData(CuratorFramework client, final String path, final String val) {
boolean result = true;
try {
client.setData().forPath(path, val.getBytes());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return result;
}
public void getNodeChild(CuratorFramework client, final String path) throws Exception {
List<String> pathChild = client.getChildren().forPath(path);
for (String item : pathChild) {
System.out.println("child:" + item);
}
}
/**
* 分布式锁 方式1
*
* @param client
* @param path
* @param val
*/
public boolean disLock(CuratorFramework client, final String path, final String val) {
boolean result = false;
// 首先查看节点是否存在
Stat stat = checkNode(client, path);
if (null == stat) {
// 创建节点
String res = createNode(client, path, val);
if (null != res && "".equals(res.trim())) {
// 创建节点成功
InterProcessMutex lock = new InterProcessMutex(client, path);
try {
if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) {
// 业务逻辑内容。。。。。。。
System.out.println(Thread.currentThread().getName() + " hold lock");
Thread.sleep(5000L);
System.out.println(Thread.currentThread().getName() + " release lock");
result = true;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
return result;
}
/** 分布式锁 方式2
* 1.当商品抢购时,把抢购商品的 发布数量和购买数量传入,当用户支付前执行disLockNum 方法在zookeeper中记录用户已经准备支付的商品数量,
* 2.当发布商品修改商品数量时,设置zookeeper 中数量为0
* 3.支付失败时,从zookeeper中减去支付失败的商品数量;
* @param client
* @param path
* @param num:购买数量
* @param releaseNum:剩余数量
* @return
*/
public boolean disLockProductAddNum(CuratorFramework client, final String path, final Double num, final Double releaseNum) {
boolean result = false;
InterProcessMutex lock = new InterProcessMutex(client, path);
try {
if (lock.acquire(100 * 1000, TimeUnit.SECONDS)) {
// 首先查看节点是否存在
Stat stat = checkNode(client, path);
if (null == stat) {
// 创建节点
String res = createNode(client, path, num.toString());
if (null != res && "".equals(res.trim())) {
try {
result = true;
} catch (Exception e) {
e.printStackTrace();
}
}
}else{
//获取zookeeper中的数量
byte[] preNum = getData(client, path);
if(null != preNum){
Double preTotal = 0.0;
if(preNum.length >0){
preTotal = Double.parseDouble(new String(preNum));
}
Double total = preTotal.doubleValue() + num.doubleValue();
if(total.doubleValue() < releaseNum.doubleValue()){
boolean b = changeNodeData(client, path, total.toString());
if(b){
result = true;
return result;
}else{
//操作失败
return false;
}
}else{
//库存不足
return false;
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
return result;
}
/**
* 支付失败后,把购买数量从zookeeper中去除
* @param client
* @param path
* @param num 减少的数量
* @return
*/
public boolean disLockProductdReduceNum(CuratorFramework client, final String path, final Double num) {
boolean result = false;
//获取zookeeper中的数量
byte[] preNum = getData(client, path);
if(null != preNum){
Double preTotal = 0.0;
if(preNum.length >0){
preTotal = Double.parseDouble(new String(preNum));
Double total = preTotal.doubleValue() - num.doubleValue();
if(total.doubleValue() >=0){
boolean b = changeNodeData(client, path, total.toString());
if(b){
result = true;
return result;
}
}
}
}
return result;
}
}
测试代码:
package org.gjp;
import java.util.Calendar;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.framework.CuratorFramework;
public class ProductNumThread {
public static void main(String[] args) {
ExecutorService exe = Executors.newFixedThreadPool(70);
final long start =Calendar.getInstance().getTimeInMillis();
for (int i = 1; i < 1990; i++) {
final double step = i;
final double iN = 1;
exe.execute(new Runnable() {
double num = iN; //购买数量
double storeNum = 2000;//发布数量
String path ="/orage_id"; //商品id
public void run() {
CuratorFramework client = ZkListenTest.getClient();
ZkCreateNode node = new ZkCreateNode();
boolean flag = node.disLockProductAddNum(client, path, num, storeNum);
System.out.println("flag:"+flag);
boolean pay = false;
if(flag){
pay = true; //开始支付货款
}
if(!pay){
//支付失败时
node.disLockProductdReduceNum(client, path, num);
}
if(null != client){
client.close();
}
if(step >1990){
long end = Calendar.getInstance().getTimeInMillis();
System.out.println("==========================="+(end -start));
}
}
});
}
}
}
相关推荐
有两种方式:悲观锁(`SELECT ... FOR UPDATE`)和乐观锁。悲观锁在事务开始时立即锁定资源,直到事务结束才释放,而乐观锁则在更新数据时检查是否有其他事务已经修改了数据。具体实现方式取决于业务需求和性能考虑...
Curator提供了现成的分布式锁实现,如`InterProcessMutex`和`InterProcessSemaphoreMutex`,并且提供了易用的API,使得开发人员可以更简单地使用Zookeeper进行分布式锁的实现。 在上述代码片段中,`LockUtil`类是...
本压缩包“zk:redis分布式锁.zip”提供了基于Zookeeper(zk)和Redis两种分布式锁实现的示例和相关资料。 首先,我们来看Zookeeper(zk)的分布式锁。Zookeeper是Apache的一个开源项目,提供了一个高可用的、高性能...
在分布式系统中,为了保证数据的一致性和安全性,分布式锁是一种常见的解决方案。本文将深入探讨如何使用Redisson和...提供的代码示例可以作为学习和参考的起点,帮助开发者更好地理解和应用这两种分布式锁的实现。
在IT行业中,尤其是在大型分布式系统的设计与开发中,分布式锁是一种关键的同步机制。本篇文章将深入探讨如何在C#.NET环境下利用Redis实现分布式锁,以及相关的核心知识点。 首先,让我们理解什么是分布式锁。...
Redis和Zookeeper作为两种常用的分布式锁实现方式,各有优缺点。在实际应用中,应根据业务需求和系统特性来选择适合的方案。同时,使用分布式锁时,必须考虑锁的可扩展性、公平性、安全性和容错性,确保在分布式环境...
分布式锁是解决分布式系统中多个进程间共享资源互斥访问的一种方法,它保证了即使在分布式环境下,也能维持数据的一致性。...在不同的场景和需求下,可能需要选择不同的分布式锁实现策略,来满足系统的最终一致性目标。
本文将详细介绍三种常用的分布式锁实现方式:基于数据库、基于缓存(如Redis)、以及基于Zookeeper。 #### 一、基于数据库实现分布式锁 数据库作为传统意义上的数据存储介质,在实现分布式锁方面也有其独特的优势...
redis分布式锁的工具类,采用的是Lua代码的方式,保证了Java执行方法的原子性。
文件"DistributedLockByRedis"可能包含了具体的Redis分布式锁实现代码,包括客户端的API设计、lua脚本、续租逻辑以及容错策略等,对于理解和实现分布式锁是一个宝贵的参考资料。通过深入研究这些内容,开发者可以更...
分布式锁是一种在分布式系统中实现同步访问同一资源的机制,特别是在多节点环境下,它能确保在任何时刻只有一个节点能够对共享资源进行操作。基于Netty实现的分布式锁,利用了Netty作为高效的异步事件驱动网络应用...
分布式锁是分布式系统中用于同步资源访问的一种机制,它能够保证在分布式部署的应用系统中,同一时刻只允许一个客户端对共享...在实际应用中,需要根据具体业务场景和需求,权衡利弊,选择最适合的分布式锁实现方案。
分布式锁技术实现原理 分布式锁是一种控制分布式系统中互斥访问共享资源的手段,以避免并行导致的结果不可控。其基本实现原理与单进程锁相同,通过一个共享标识来确定唯一性,对共享标识进行修改时能够保证原子性和...
Redisson的分布式锁实现更加完善,支持可重入锁、公平锁、读写锁,还具有锁自动续期功能,避免了因网络延迟导致的锁丢失。使用Redisson创建分布式锁只需几行代码,通过`RLock`接口的`lock()`和`unlock()`方法即可...
常见的分布式锁实现包括基于Zookeeper、Redis或数据库的分布式锁,它们通常采用乐观锁或悲观锁策略,确保在多个节点间安全地进行资源访问。 分布式事务则涉及多个节点之间的数据一致性保证。本地事务在单一资源管理...
本文将深入探讨Redis和ZooKeeper这两种流行的分布式系统中的分布式锁实现。 **Redis分布式锁** Redis是一个高性能的键值存储系统,广泛用于缓存、消息队列等场景。在Redis中实现分布式锁主要依赖于两个命令:`...
分布式锁是一种在分布式系统中实现同步的技术,它允许多个节点在同一时间访问共享资源。在大型分布式环境中,确保数据的一致性和正确性至关重要,这就是分布式锁发挥作用的地方。Zookeeper,一个由Apache开发的...
Curator的分布式锁实现主要有两种方式:`InterProcessMutex`和`InterProcessSemaphoreMutex`。`InterProcessMutex`实现了独占锁,即一次只有一个客户端能够获取到锁;而`InterProcessSemaphoreMutex`则提供了信号量...
8. **分布式锁的其他实现**:除了Redis,还可以利用ZooKeeper、MongoDB等其他数据库或服务来实现分布式锁,但每种方式都有其优缺点,需要根据实际场景选择合适的方法。 以上就是基于SpringBoot和Redis实现分布式锁...
分布式锁的实现有悲观锁和乐观锁之分,下面将分别介绍这两种锁的实现思路。 悲观锁是基于悲观的思想,认为在数据处理过程中总是会发生冲突,因此在整个数据处理过程中,会先对数据加锁。这种锁的实现方法通常依赖于...