首先,说说我们的场景,订单服务是做成集群的,当两个以上结点同时收到一个相同订单的创建指令,这时并发就产生了,系统就会重复创建订单。等等......场景。这时,分布式共享锁就闪亮登场了。
共享锁在同一个进程中是很容易实现的,但在跨进程或者在不同Server之间就不好实现了。Zookeeper就很容易实现。具体的实现原理官网和其它网站也有翻译,这里就不在赘述了。
官网资料:http://zookeeper.apache.org/doc/r3.4.5/recipes.html
中文资料:https://www.ibm.com/developerworks/cn/opensource/os-cn-zookeeper
详见Locks章节。
原理都知道了,网上一搜索Apache上面已经有提供了,既然已经有轮子了,哪我们也没必要重复造轮子了吧!直接使用Curator。但是,我们在测试中发现,用于共享锁的结点无法自动回收,除了最末一级的临时结点会在锁释放和session超时的时候能自动回收外,其它结点均无法自动回收。我们的订单一天有好几万,遇到618和双十一的时候每天的订单量超50W,如果结点长期不回收的话,肯定会影响Zookeeper的性能。这时,我们就想到了一句话“自己动手,丰衣足食”。下面直接上代码:
首先,创建一个Maven工程,在pom文件里导入下面的包:
<dependencies> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>2.8.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.8.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.8.0</version> </dependency> <dependency> <groupId>commons-beanutils</groupId> <artifactId>commons-beanutils</artifactId> <version>1.9.2</version> </dependency> <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <version>1.2</version> </dependency> <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> <version>2.6</version> </dependency> </dependencies>
LockZookeeperClient接口:
package com.XXX.framework.lock; import org.apache.curator.framework.CuratorFramework; /** * * description * * @author Roadrunners * @version 1.0, 2015年7月9日 */ public interface LockZookeeperClient { /** * * @return */ CuratorFramework getCuratorFramework(); /** * * @return */ String getBasePath(); /** * garbage collector * * @param gcPath */ void gc(String gcPath); }
LockZookeeperClient接口的实现LockZookeeperClientFactory:
package com.XXX.framework.lock; import java.util.Date; import java.util.List; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentSkipListSet; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; /** * * description * * @author Roadrunners * @version 1.0, 2015年7月9日 */ public class LockZookeeperClientFactory implements LockZookeeperClient { private static final Log LOG = LogFactory.getLog(LockZookeeperClientFactory.class); private boolean hasGc = true; private Timer gcTimer; private TimerTask gcTimerTask; private ConcurrentSkipListSet<String> gcPaths = new ConcurrentSkipListSet<String>(); private int gcIntervalSecond = 60; private CuratorFramework curatorFramework; private String zookeeperIpPort = "localhost:2181"; private int sessionTimeoutMs = 10000; private int connectionTimeoutMs = 10000; private String basePath = "/locks"; public void setHasGc(boolean hasGc) { this.hasGc = hasGc; } public void setGcIntervalSecond(int gcIntervalSecond) { this.gcIntervalSecond = gcIntervalSecond; } public void setZookeeperIpPort(String zookeeperIpPort) { this.zookeeperIpPort = zookeeperIpPort; } public void setSessionTimeoutMs(int sessionTimeoutMs) { this.sessionTimeoutMs = sessionTimeoutMs; } public void setConnectionTimeoutMs(int connectionTimeoutMs) { this.connectionTimeoutMs = connectionTimeoutMs; } public void setBasePath(String basePath) { basePath = basePath.trim(); if (basePath.endsWith("/")) { basePath = basePath.substring(0, basePath.length() - 1); } this.basePath = basePath; } public void init() { if(StringUtils.isBlank(zookeeperIpPort)){ throw new NullPointerException("zookeeperIpPort"); } ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3); curatorFramework = CuratorFrameworkFactory.newClient(zookeeperIpPort.trim(), sessionTimeoutMs, connectionTimeoutMs, retryPolicy); curatorFramework.start(); LOG.info("CuratorFramework initialise succeed."); if (hasGc) { gc(); } } public void destroy() { gcPaths.clear(); gcPaths = null; gcStop(); curatorFramework.close(); curatorFramework = null; } @Override public void gc(String gcPath) { if (hasGc && StringUtils.isNotBlank(gcPath)) { gcPaths.add(gcPath.trim()); } } @Override public CuratorFramework getCuratorFramework() { return this.curatorFramework; } @Override public String getBasePath() { return this.basePath; } private synchronized void gc() { gcStop(); try { scanningGCNodes(); } catch (Throwable e) { LOG.warn(e); } gcTimerTask = new TimerTask() { @Override public void run() { doingGc(); } }; Date begin = new Date(); begin.setTime(begin.getTime() + (10 * 1000L)); gcTimer = new Timer("lock-gc", true); gcTimer.schedule(gcTimerTask, begin, gcIntervalSecond * 1000L); } private synchronized void gcStop() { if (null != gcTimer) { gcTimer.cancel(); gcTimer = null; } if (null != gcTimerTask) { gcTimerTask.cancel(); gcTimerTask = null; } } private synchronized void scanningGCNodes() throws Exception { if (null == curatorFramework.checkExists().forPath(basePath)) { return; } List<String> paths = curatorFramework.getChildren().forPath(basePath); if (CollectionUtils.isEmpty(paths)) { gcPaths.add(basePath); return; } for (String path : paths) { try{ String tmpPath = basePath + "/" + path; if (null == curatorFramework.checkExists().forPath(tmpPath)) { continue; } gcPaths.add(tmpPath); } catch(Throwable e){ LOG.warn("scanning gc nodes error.", e); } } } private synchronized void doingGc() { LOG.debug("GC beginning."); if (CollectionUtils.isNotEmpty(gcPaths)) { for (String path : gcPaths) { try { if (null != curatorFramework.checkExists().forPath(path)) { if (CollectionUtils.isEmpty(curatorFramework.getChildren().forPath(path))) { curatorFramework.delete().forPath(path); gcPaths.remove(path); LOG.debug("GC " + path); } } else { gcPaths.remove(path); } } catch (Throwable e) { gcPaths.remove(path); LOG.warn(e); } } } LOG.debug("GC ended."); } }
SharedLock共享锁:
package com.XXX.framework.lock.shared; import java.util.concurrent.TimeUnit; import org.apache.commons.lang.StringUtils; import org.apache.curator.framework.recipes.locks.InterProcessLock; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import com.XXX.framework.lock.LockZookeeperClient; /** * * description * * @author Roadrunners * @version 1.0, 2015年7月9日 */ public class SharedLock { private InterProcessLock interProcessLock; public SharedLock(LockZookeeperClient lockZookeeperClient, String resourceId) { super(); if (StringUtils.isBlank(resourceId)) { throw new NullPointerException("resourceId"); } String path = lockZookeeperClient.getBasePath(); path += ("/" + resourceId.trim()); interProcessLock = new InterProcessMutex(lockZookeeperClient.getCuratorFramework(), path); lockZookeeperClient.gc(path); } /** * Acquire the mutex - blocking until it's available. Each call to acquire must be balanced by a call * to {@link #release()} * * @throws Exception ZK errors, connection interruptions */ public void acquire() throws Exception { interProcessLock.acquire(); } /** * Acquire the mutex - blocks until it's available or the given time expires. Each call to acquire that returns true must be balanced by a call * to {@link #release()} * * @param time time to wait * @param unit time unit * @return true if the mutex was acquired, false if not * @throws Exception ZK errors, connection interruptions */ public boolean acquire(long time, TimeUnit unit) throws Exception { return interProcessLock.acquire(time, unit); } /** * Perform one release of the mutex. * * @throws Exception ZK errors, interruptions, current thread does not own the lock */ public void release() throws Exception { interProcessLock.release(); } /** * Returns true if the mutex is acquired by a thread in this JVM * * @return true/false */ public boolean isAcquiredInThisProcess() { return interProcessLock.isAcquiredInThisProcess(); } }
到此代码已经完成。下面写一个简单的Demo:
//LockZookeeperClientFactory通常是通过Spring配置注入的,此处是为了Demo的简单明了才这样写的,不建议这样写 LockZookeeperClientFactory lzc = new LockZookeeperClientFactory(); lzc.setZookeeperIpPort("10.100.15.1:8900"); lzc.setBasePath("/locks/sharedLock/"); lzc.init(); SharedLock sharedLock = new SharedLock(lzc, "sharedLock1"); try { if (sharedLock.acquire(100, TimeUnit.MILLISECONDS)) { System.out.println("sharedLock1 get"); } } catch (Exception e) { e.printStackTrace(); } finally { try { sharedLock.release(); } catch (Exception e) { e.printStackTrace(); } } lzc.destroy();
就这样,系统就会每隔一分钟去回收一次没有使用的结点。
相关推荐
本篇文章将深入探讨如何使用Zookeeper实现分布式共享锁。 分布式锁是一种在多节点之间共享资源的机制,它允许在同一时间只有一个节点对资源进行操作。在Java环境中,我们可以利用Zookeeper的API来创建和管理这种锁...
分布式锁是一种在分布式系统中实现同步的技术,它允许多个节点在同一时刻访问共享资源。在大型分布式环境中,由于网络延迟和并发操作,简单的本地锁可能无法有效解决数据一致性问题。这时,Zookeeper,一个高可用的...
**Zookeeper分布式锁的关键特性包括:** 1. **顺序一致性:** Zookeeper中的节点被创建顺序是全局唯一的,这有助于实现锁的唯一性。 2. **原子性:** 创建和删除节点的操作在Zookeeper中都是原子性的,这保证了...
《从Paxos到Zookeeper分布式一致性原理与实践》是一本深入探讨分布式系统一致性问题的著作,其中重点讲解了Paxos算法与Zookeeper在实际应用中的理论与实践。Paxos是分布式计算领域中著名的共识算法,为解决分布式...
**分布式锁的概念与重要性** 在分布式系统中,由于多台服务器之间可能存在数据不一致的情况,因此需要一种机制来确保在并发...理解并熟练掌握Zookeeper的分布式锁机制,对于构建高可用、高性能的分布式系统至关重要。
基于一致性模型,ZooKeeper确保了数据的一致性和有序性,使其成为构建分布式锁的理想选择。 **2. 分布式锁原理** 分布式锁的目的是在多个节点之间实现对共享资源的互斥访问。在ZooKeeper中,通常通过创建临时顺序...
Zookeeper基于Paxos和其他一致性算法的实现,为分布式应用程序提供了命名服务、配置管理、分布式锁、群组服务等功能。Zookeeper通过ZNode(类似于文件系统的节点)来存储和操作数据,并采用观察者模式来实时监控数据...
本文将深入探讨基于Zookeeper实现的分布式读写锁,并利用Zkclient客户端进行操作。Zookeeper是一个分布式服务协调框架,它提供了一种简单且高效的方式来实现分布式锁。 **一、Zookeeper简介** Zookeeper是由Apache...
ZooKeeper分布式锁的实现基于ZooKeeper的节点机制。ZooKeeper的节点可以分为四种类型:持久节点、临时节点、顺序节点和ephemeral节点。其中,临时节点和ephemeral节点可以被用来实现分布式锁。 临时节点实现分布式...
ZooKeeper 提供的分布式锁通常基于其原子的创建、删除节点和监视事件机制来实现。 1. **不可重入锁(Non-Recursive Lock)**: 不可重入锁不允许同一个线程再次获取已持有的锁。在ZooKeeper中,实现不可重入锁通常...
Zookeeper提供了基于其强大的数据一致性特性的分布式锁实现,其中包括了重入排他锁。 **一、Zookeeper的分布式锁原理** 1. **节点创建与删除**:在Zookeeper中,锁的实现通常依赖于临时节点(ephemeral nodes)。...
- **分布式锁**:通过特定的ZNode结构,实现共享锁服务。 - **队列服务**:FIFO(先进先出)的队列可以通过ZNode的顺序创建实现。 3. **ZooKeeper的架构** - **客户端-服务器模型**:每个客户端连接到一个或多个...
Zookeeper分布式锁的工作原理: 1. **会话和临时节点**:Zookeeper支持两种类型的节点,持久节点和临时节点。临时节点在客户端会话失效(例如,客户端崩溃或网络断开)时会被自动删除,这为实现分布式锁提供了一个...
分布式全局锁是分布式系统中一个重要的同步控制工具,它允许在多节点环境下,对共享资源进行独占式访问,防止并发问题。Zookeeper,作为Apache的一个高性能、高可用的分布式协调服务,常被用于实现这样的功能。在这...
Zookeeper,是由Apache开发的开源分布式协调服务,它基于Paxos等一致性算法实现,并提供了更高级别的API,使得开发者能够更容易地实现分布式锁、配置管理、命名服务等功能。Zookeeper的主要特性包括: 1. **原子性*...
分布式锁是解决分布式系统中多个进程或线程对共享资源的并发访问问题的关键技术。以下是如何使用ZookeeperNet实现分布式锁的步骤: 1. **创建临时节点**:每个客户端在尝试获取锁时,会在特定的Zookeeper路径下创建...
《Zookeeper分布式协调案例详解》 Zookeeper,作为Apache软件基金会的一个开源项目,是分布式系统中的核心组件,专为管理分布式应用提供高可用性、一致性以及数据同步等服务。其设计目标是简化分布式环境下的复杂...
在分布式锁的场景中,ZooKeeper可以作为一个中心化的锁服务,帮助各个节点之间协调并发操作,防止多个节点同时访问共享资源。 在提供的代码示例中,我们看到一个名为`DistributedLock`的类,它实现了Java的`Lock`...
《从Paxos到Zookeeper分布式一致性原理与实践》是一本深入探讨分布式系统一致性问题的著作。本书主要围绕Paxos算法和Zookeeper两大主题展开,旨在帮助读者理解分布式环境中如何保证数据的一致性,这对于大规模分布式...
分布式锁通常有三种实现方式:基于关系数据库的乐观锁、基于Redis的分布式锁以及基于ZooKeeper的分布式锁。 #### 1. 关系数据库乐观锁 基于关系数据库实现分布式锁主要利用数据库本身的事务机制和锁定机制。在高...