- 浏览: 28152 次
- 性别:
- 来自: 深圳
最新评论
博客分类: opensourcejavadistributed
参考地址
https://github.com/alibaba/canal/blob/5b56a027e59e46f13e9024bc2f24399981c29d7a/common/src/main/java/com/alibaba/otter/canal/common/zookeeper/ZooKeeperx.java
转载这位大神的,做个记录,因为项目中用到zookeeper的分布式锁
.背景
继续上一篇文章:http://agapple.iteye.com/blog/1183972 ,项目中需要对分布式任务进行调度,那对应的分布式lock实现在所难免。
这一周,在基于BooleanMutex的基础上,实现了zookeeper的分布式锁,用于控制多进程+多线程的lock控制
算法
可以预先看一下zookeeper的官方文档:
•http://zookeeper.apache.org/doc/trunk/recipes.html
lock操作过程:
•首先为一个lock场景,在zookeeper中指定对应的一个根节点,用于记录资源竞争的内容
•每个lock创建后,会lazy在zookeeper中创建一个node节点,表明对应的资源竞争标识。 (小技巧:node节点为EPHEMERAL_SEQUENTIAL,自增长的临时节点)
•进行lock操作时,获取对应lock根节点下的所有字节点,也即处于竞争中的资源标识
•按照Fair竞争的原则,按照对应的自增内容做排序,取出编号最小的一个节点做为lock的owner,判断自己的节点id是否就为owner id,如果是则返回,lock成功。
•如果自己非owner id,按照排序的结果找到序号比自己前一位的id,关注它锁释放的操作(也就是exist watcher),形成一个链式的触发过程。
unlock操作过程:
•将自己id对应的节点删除即可,对应的下一个排队的节点就可以收到Watcher事件,从而被唤醒得到锁后退出
其中的几个关键点:
1.node节点选择为EPHEMERAL_SEQUENTIAL很重要。
* 自增长的特性,可以方便构建一个基于Fair特性的锁,前一个节点唤醒后一个节点,形成一个链式的触发过程。可以有效的避免"惊群效应"(一个锁释放,所有等待的线程都被唤醒),有针对性的唤醒,提升性能。
* 选择一个EPHEMERAL临时节点的特性。因为和zookeeper交互是一个网络操作,不可控因素过多,比如网络断了,上一个节点释放锁的操作会失败。临时节点是和对应的session挂接的,session一旦超时或者异常退出其节点就会消失,类似于ReentrantLock中等待队列Thread的被中断处理。
2.获取lock操作是一个阻塞的操作,而对应的Watcher是一个异步事件,所以需要使用信号进行通知,正好使用上一篇文章中提到的BooleanMutex,可以比较方便的解决锁重入的问题。(锁重入可以理解为多次读操作,锁释放为写抢占操作)
注意:
•使用EPHEMERAL会引出一个风险:在非正常情况下,网络延迟比较大会出现session timeout,zookeeper就会认为该client已关闭,从而销毁其id标示,竞争资源的下一个id就可以获取锁。这时可能会有两个process同时拿到锁在跑任务,所以设置好session timeout很重要。
•同样使用PERSISTENT同样会存在一个死锁的风险,进程异常退出后,对应的竞争资源id一直没有删除,下一个id一直无法获取到锁对象。
没有两全其美的做法,两者取其一,选择自己一个能接受的即可
代码
Java代码
1.public class DistributedLock {
2.
3. private static final byte[] data = { 0x12, 0x34 };
4. private ZooKeeperx zookeeper = ZooKeeperClient.getInstance();
5. private final String root; //根节点路径
6. private String id;
7. private LockNode idName;
8. private String ownerId;
9. private String lastChildId;
10. private Throwable other = null;
11. private KeeperException exception = null;
12. private InterruptedException interrupt = null;
13.
14. public DistributedLock(String root) {
15. this.root = root;
16. ensureExists(root);
17. }
18.
19. /**
20. * 尝试获取锁操作,阻塞式可被中断
21. */
22. public void lock() throws InterruptedException, KeeperException {
23. // 可能初始化的时候就失败了
24. if (exception != null) {
25. throw exception;
26. }
27.
28. if (interrupt != null) {
29. throw interrupt;
30. }
31.
32. if (other != null) {
33. throw new NestableRuntimeException(other);
34. }
35.
36. if (isOwner()) {//锁重入
37. return;
38. }
39.
40. BooleanMutex mutex = new BooleanMutex();
41. acquireLock(mutex);
42. // 避免zookeeper重启后导致watcher丢失,会出现死锁使用了超时进行重试
43. try {
44. mutex.get(DEFAULT_TIMEOUT_PERIOD, TimeUnit.MICROSECONDS);// 阻塞等待值为true
45. // mutex.get();
46. } catch (TimeoutException e) {
47. if (!mutex.state()) {
48. lock();
49. }
50. }
51.
52. if (exception != null) {
53. throw exception;
54. }
55.
56. if (interrupt != null) {
57. throw interrupt;
58. }
59.
60. if (other != null) {
61. throw new NestableRuntimeException(other);
62. }
63. }
64.
65. /**
66. * 尝试获取锁对象, 不会阻塞
67. *
68. * @throws InterruptedException
69. * @throws KeeperException
70. */
71. public boolean tryLock() throws KeeperException {
72. // 可能初始化的时候就失败了
73. if (exception != null) {
74. throw exception;
75. }
76.
77. if (isOwner()) {//锁重入
78. return true;
79. }
80.
81. acquireLock(null);
82.
83. if (exception != null) {
84. throw exception;
85. }
86.
87. if (interrupt != null) {
88. Thread.currentThread().interrupt();
89. }
90.
91. if (other != null) {
92. throw new NestableRuntimeException(other);
93. }
94.
95. return isOwner();
96. }
97.
98. /**
99. * 释放锁对象
100. */
101. public void unlock() throws KeeperException {
102. if (id != null) {
103. try {
104. zookeeper.delete(root + "/" + id, -1);
105. } catch (InterruptedException e) {
106. Thread.currentThread().interrupt();
107. } catch (KeeperException.NoNodeException e) {
108. // do nothing
109. } finally {
110. id = null;
111. }
112. } else {
113. //do nothing
114. }
115. }
116.
117. private void ensureExists(final String path) {
118. try {
119. Stat stat = zookeeper.exists(path, false);
120. if (stat != null) {
121. return;
122. }
123.
124. zookeeper.create(path, data, CreateMode.PERSISTENT);
125. } catch (KeeperException e) {
126. exception = e;
127. } catch (InterruptedException e) {
128. Thread.currentThread().interrupt();
129. interrupt = e;
130. }
131. }
132.
133. /**
134. * 返回锁对象对应的path
135. */
136. public String getRoot() {
137. return root;
138. }
139.
140. /**
141. * 判断当前是不是锁的owner
142. */
143. public boolean isOwner() {
144. return id != null && ownerId != null && id.equals(ownerId);
145. }
146.
147. /**
148. * 返回当前的节点id
149. */
150. public String getId() {
151. return this.id;
152. }
153.
154. // ===================== helper method =============================
155.
156. /**
157. * 执行lock操作,允许传递watch变量控制是否需要阻塞lock操作
158. */
159. private Boolean acquireLock(final BooleanMutex mutex) {
160. try {
161. do {
162. if (id == null) {//构建当前lock的唯一标识
163. long sessionId = zookeeper.getDelegate().getSessionId();
164. String prefix = "x-" + sessionId + "-";
165. //如果第一次,则创建一个节点
166. String path = zookeeper.create(root + "/" + prefix, data,
167. CreateMode.EPHEMERAL_SEQUENTIAL);
168. int index = path.lastIndexOf("/");
169. id = StringUtils.substring(path, index + 1);
170. idName = new LockNode(id);
171. }
172.
173. if (id != null) {
174. List<String> names = zookeeper.getChildren(root, false);
175. if (names.isEmpty()) {
176. id = null;//异常情况,重新创建一个
177. } else {
178. //对节点进行排序
179. SortedSet<LockNode> sortedNames = new TreeSet<LockNode>();
180. for (String name : names) {
181. sortedNames.add(new LockNode(name));
182. }
183.
184. if (sortedNames.contains(idName) == false) {
185. id = null;//清空为null,重新创建一个
186. continue;
187. }
188.
189. //将第一个节点做为ownerId
190. ownerId = sortedNames.first().getName();
191. if (mutex != null && isOwner()) {
192. mutex.set(true);//直接更新状态,返回
193. return true;
194. } else if (mutex == null) {
195. return isOwner();
196. }
197.
198. SortedSet<LockNode> lessThanMe = sortedNames.headSet(idName);
199. if (!lessThanMe.isEmpty()) {
200. //关注一下排队在自己之前的最近的一个节点
201. LockNode lastChildName = lessThanMe.last();
202. lastChildId = lastChildName.getName();
203. //异步watcher处理
204. zookeeper.exists(root + "/" + lastChildId, new AsyncWatcher() {
205.
206. public void asyncProcess(WatchedEvent event) {
207. acquireLock(mutex);
208. }
209.
210. });
211.
212. if (stat == null) {
213. acquireLock(mutex);// 如果节点不存在,需要自己重新触发一下,watcher不会被挂上去
214. }
215. } else {
216. if (isOwner()) {
217. mutex.set(true);
218. } else {
219. id = null;// 可能自己的节点已超时挂了,所以id和ownerId不相同
220. }
221. }
222. }
223. }
224. } while (id == null);
225. } catch (KeeperException e) {
226. exception = e;
227. if (mutex != null) {
228. mutex.set(true);
229. }
230. } catch (InterruptedException e) {
231. interrupt = e;
232. if (mutex != null) {
233. mutex.set(true);
234. }
235. } catch (Throwable e) {
236. other = e;
237. if (mutex != null) {
238. mutex.set(true);
239. }
240. }
241.
242. if (isOwner() && mutex != null) {
243. mutex.set(true);
244. }
245. return Boolean.FALSE;
246. }
247.}
public class DistributedLock {
private static final byte[] data = { 0x12, 0x34 };
private ZooKeeperx zookeeper = ZooKeeperClient.getInstance();
private final String root; //根节点路径
private String id;
private LockNode idName;
private String ownerId;
private String lastChildId;
private Throwable other = null;
private KeeperException exception = null;
private InterruptedException interrupt = null;
public DistributedLock(String root) {
this.root = root;
ensureExists(root);
}
/**
* 尝试获取锁操作,阻塞式可被中断
*/
public void lock() throws InterruptedException, KeeperException {
// 可能初始化的时候就失败了
if (exception != null) {
throw exception;
}
if (interrupt != null) {
throw interrupt;
}
if (other != null) {
throw new NestableRuntimeException(other);
}
if (isOwner()) {//锁重入
return;
}
BooleanMutex mutex = new BooleanMutex();
acquireLock(mutex);
// 避免zookeeper重启后导致watcher丢失,会出现死锁使用了超时进行重试
try {
mutex.get(DEFAULT_TIMEOUT_PERIOD, TimeUnit.MICROSECONDS);// 阻塞等待值为true
// mutex.get();
} catch (TimeoutException e) {
if (!mutex.state()) {
lock();
}
}
if (exception != null) {
throw exception;
}
if (interrupt != null) {
throw interrupt;
}
if (other != null) {
throw new NestableRuntimeException(other);
}
}
/**
* 尝试获取锁对象, 不会阻塞
*
* @throws InterruptedException
* @throws KeeperException
*/
public boolean tryLock() throws KeeperException {
// 可能初始化的时候就失败了
if (exception != null) {
throw exception;
}
if (isOwner()) {//锁重入
return true;
}
acquireLock(null);
if (exception != null) {
throw exception;
}
if (interrupt != null) {
Thread.currentThread().interrupt();
}
if (other != null) {
throw new NestableRuntimeException(other);
}
return isOwner();
}
/**
* 释放锁对象
*/
public void unlock() throws KeeperException {
if (id != null) {
try {
zookeeper.delete(root + "/" + id, -1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (KeeperException.NoNodeException e) {
// do nothing
} finally {
id = null;
}
} else {
//do nothing
}
}
private void ensureExists(final String path) {
try {
Stat stat = zookeeper.exists(path, false);
if (stat != null) {
return;
}
zookeeper.create(path, data, CreateMode.PERSISTENT);
} catch (KeeperException e) {
exception = e;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
interrupt = e;
}
}
/**
* 返回锁对象对应的path
*/
public String getRoot() {
return root;
}
/**
* 判断当前是不是锁的owner
*/
public boolean isOwner() {
return id != null && ownerId != null && id.equals(ownerId);
}
/**
* 返回当前的节点id
*/
public String getId() {
return this.id;
}
// ===================== helper method =============================
/**
* 执行lock操作,允许传递watch变量控制是否需要阻塞lock操作
*/
private Boolean acquireLock(final BooleanMutex mutex) {
try {
do {
if (id == null) {//构建当前lock的唯一标识
long sessionId = zookeeper.getDelegate().getSessionId();
String prefix = "x-" + sessionId + "-";
//如果第一次,则创建一个节点
String path = zookeeper.create(root + "/" + prefix, data,
CreateMode.EPHEMERAL_SEQUENTIAL);
int index = path.lastIndexOf("/");
id = StringUtils.substring(path, index + 1);
idName = new LockNode(id);
}
if (id != null) {
List<String> names = zookeeper.getChildren(root, false);
if (names.isEmpty()) {
id = null;//异常情况,重新创建一个
} else {
//对节点进行排序
SortedSet<LockNode> sortedNames = new TreeSet<LockNode>();
for (String name : names) {
sortedNames.add(new LockNode(name));
}
if (sortedNames.contains(idName) == false) {
id = null;//清空为null,重新创建一个
continue;
}
//将第一个节点做为ownerId
ownerId = sortedNames.first().getName();
if (mutex != null && isOwner()) {
mutex.set(true);//直接更新状态,返回
return true;
} else if (mutex == null) {
return isOwner();
}
SortedSet<LockNode> lessThanMe = sortedNames.headSet(idName);
if (!lessThanMe.isEmpty()) {
//关注一下排队在自己之前的最近的一个节点
LockNode lastChildName = lessThanMe.last();
lastChildId = lastChildName.getName();
//异步watcher处理
zookeeper.exists(root + "/" + lastChildId, new AsyncWatcher() {
public void asyncProcess(WatchedEvent event) {
acquireLock(mutex);
}
});
if (stat == null) {
acquireLock(mutex);// 如果节点不存在,需要自己重新触发一下,watcher不会被挂上去
}
} else {
if (isOwner()) {
mutex.set(true);
} else {
id = null;// 可能自己的节点已超时挂了,所以id和ownerId不相同
}
}
}
}
} while (id == null);
} catch (KeeperException e) {
exception = e;
if (mutex != null) {
mutex.set(true);
}
} catch (InterruptedException e) {
interrupt = e;
if (mutex != null) {
mutex.set(true);
}
} catch (Throwable e) {
other = e;
if (mutex != null) {
mutex.set(true);
}
}
if (isOwner() && mutex != null) {
mutex.set(true);
}
return Boolean.FALSE;
}
}相关说明:
测试代码:
Java代码
1.@Test
2. public void test_lock() {
3. ExecutorService exeucotr = Executors.newCachedThreadPool();
4. final int count = 50;
5. final CountDownLatch latch = new CountDownLatch(count);
6. final DistributedLock[] nodes = new DistributedLock[count];
7. for (int i = 0; i < count; i++) {
8. final DistributedLock node = new DistributedLock(dir);
9. nodes[i] = node;
10. exeucotr.submit(new Runnable() {
11.
12. public void run() {
13. try {
14. Thread.sleep(1000);
15. node.lock(); //获取锁
16. Thread.sleep(100 + RandomUtils.nextInt(100));
17.
18. System.out.println("id: " + node.getId() + " is leader: " + node.isOwner());
19. } catch (InterruptedException e) {
20. want.fail();
21. } catch (KeeperException e) {
22. want.fail();
23. } finally {
24. latch.countDown();
25. try {
26. node.unlock();
27. } catch (KeeperException e) {
28. want.fail();
29. }
30. }
31.
32. }
33. });
34. }
35.
36. try {
37. latch.await();
38. } catch (InterruptedException e) {
39. want.fail();
40. }
41.
42. exeucotr.shutdown();
43. }
@Test
public void test_lock() {
ExecutorService exeucotr = Executors.newCachedThreadPool();
final int count = 50;
final CountDownLatch latch = new CountDownLatch(count);
final DistributedLock[] nodes = new DistributedLock[count];
for (int i = 0; i < count; i++) {
final DistributedLock node = new DistributedLock(dir);
nodes[i] = node;
exeucotr.submit(new Runnable() {
public void run() {
try {
Thread.sleep(1000);
node.lock(); //获取锁
Thread.sleep(100 + RandomUtils.nextInt(100));
System.out.println("id: " + node.getId() + " is leader: " + node.isOwner());
} catch (InterruptedException e) {
want.fail();
} catch (KeeperException e) {
want.fail();
} finally {
latch.countDown();
try {
node.unlock();
} catch (KeeperException e) {
want.fail();
}
}
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
want.fail();
}
exeucotr.shutdown();
}
升级版
实现了一个分布式lock后,可以解决多进程之间的同步问题,但设计多线程+多进程的lock控制需求,单jvm中每个线程都和zookeeper进行网络交互成本就有点高了,所以基于DistributedLock,实现了一个分布式二层锁。
大致原理就是ReentrantLock 和 DistributedLock的一个结合。
• 单jvm的多线程竞争时,首先需要先拿到第一层的ReentrantLock的锁
•拿到锁之后这个线程再去和其他JVM的线程竞争锁,最后拿到之后锁之后就开始处理任务。
锁的释放过程是一个反方向的操作,先释放DistributedLock,再释放ReentrantLock。 可以思考一下,如果先释放ReentrantLock,假如这个JVM ReentrantLock竞争度比较高,一直其他JVM的锁竞争容易被饿死。
代码:
Java代码
1.public class DistributedReentrantLock extends DistributedLock {
2.
3. private static final String ID_FORMAT = "Thread[{0}] Distributed[{1}]";
4. private ReentrantLock reentrantLock = new ReentrantLock();
5.
6. public DistributedReentrantLock(String root) {
7. super(root);
8. }
9.
10. public void lock() throws InterruptedException, KeeperException {
11. reentrantLock.lock();//多线程竞争时,先拿到第一层锁
12. super.lock();
13. }
14.
15. public boolean tryLock() throws KeeperException {
16. //多线程竞争时,先拿到第一层锁
17. return reentrantLock.tryLock() && super.tryLock();
18. }
19.
20. public void unlock() throws KeeperException {
21. super.unlock();
22. reentrantLock.unlock();//多线程竞争时,释放最外层锁
23. }
24.
25. @Override
26. public String getId() {
27. return MessageFormat.format(ID_FORMAT, Thread.currentThread().getId(), super.getId());
28. }
29.
30. @Override
31. public boolean isOwner() {
32. return reentrantLock.isHeldByCurrentThread() && super.isOwner();
33. }
34.
35.}
public class DistributedReentrantLock extends DistributedLock {
private static final String ID_FORMAT = "Thread[{0}] Distributed[{1}]";
private ReentrantLock reentrantLock = new ReentrantLock();
public DistributedReentrantLock(String root) {
super(root);
}
public void lock() throws InterruptedException, KeeperException {
reentrantLock.lock();//多线程竞争时,先拿到第一层锁
super.lock();
}
public boolean tryLock() throws KeeperException {
//多线程竞争时,先拿到第一层锁
return reentrantLock.tryLock() && super.tryLock();
}
public void unlock() throws KeeperException {
super.unlock();
reentrantLock.unlock();//多线程竞争时,释放最外层锁
}
@Override
public String getId() {
return MessageFormat.format(ID_FORMAT, Thread.currentThread().getId(), super.getId());
}
@Override
public boolean isOwner() {
return reentrantLock.isHeldByCurrentThread() && super.isOwner();
}
}
测试代码:
Java代码
1.@Test
2. public void test_lock() {
3. ExecutorService exeucotr = Executors.newCachedThreadPool();
4. final int count = 50;
5. final CountDownLatch latch = new CountDownLatch(count);
6.
7. final DistributedReentrantLock lock = new DistributedReentrantLock(dir); //单个锁
8. for (int i = 0; i < count; i++) {
9. exeucotr.submit(new Runnable() {
10.
11. public void run() {
12. try {
13. Thread.sleep(1000);
14. lock.lock();
15. Thread.sleep(100 + RandomUtils.nextInt(100));
16.
17. System.out.println("id: " + lock.getId() + " is leader: " + lock.isOwner());
18. } catch (InterruptedException e) {
19. want.fail();
20. } catch (KeeperException e) {
21. want.fail();
22. } finally {
23. latch.countDown();
24. try {
25. lock.unlock();
26. } catch (KeeperException e) {
27. want.fail();
28. }
29. }
30.
31. }
32. });
33. }
34.
35. try {
36. latch.await();
37. } catch (InterruptedException e) {
38. want.fail();
39. }
40.
41. exeucotr.shutdown();
42. }
@Test
public void test_lock() {
ExecutorService exeucotr = Executors.newCachedThreadPool();
final int count = 50;
final CountDownLatch latch = new CountDownLatch(count);
final DistributedReentrantLock lock = new DistributedReentrantLock(dir); //单个锁
for (int i = 0; i < count; i++) {
exeucotr.submit(new Runnable() {
public void run() {
try {
Thread.sleep(1000);
lock.lock();
Thread.sleep(100 + RandomUtils.nextInt(100));
System.out.println("id: " + lock.getId() + " is leader: " + lock.isOwner());
} catch (InterruptedException e) {
want.fail();
} catch (KeeperException e) {
want.fail();
} finally {
latch.countDown();
try {
lock.unlock();
} catch (KeeperException e) {
want.fail();
}
}
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
want.fail();
}
exeucotr.shutdown();
}最后
其实再可以发散一下,实现一个分布式的read/write lock,也差不多就是这个理了。项目结束后,有时间可以写一下
大致思路:
1.竞争资源标示: read_自增id , write_自增id
2.首先按照自增id进行排序,如果队列的前边都是read标识,对应的所有read都获得锁。如果队列的前边是write标识,第一个write节点获取锁
3.watcher监听: read监听距离自己最近的一个write节点的exist,write监听距离自己最近的一个节点(read或者write节点)
参考地址
https://github.com/alibaba/canal/blob/5b56a027e59e46f13e9024bc2f24399981c29d7a/common/src/main/java/com/alibaba/otter/canal/common/zookeeper/ZooKeeperx.java
转载这位大神的,做个记录,因为项目中用到zookeeper的分布式锁
.背景
继续上一篇文章:http://agapple.iteye.com/blog/1183972 ,项目中需要对分布式任务进行调度,那对应的分布式lock实现在所难免。
这一周,在基于BooleanMutex的基础上,实现了zookeeper的分布式锁,用于控制多进程+多线程的lock控制
算法
可以预先看一下zookeeper的官方文档:
•http://zookeeper.apache.org/doc/trunk/recipes.html
lock操作过程:
•首先为一个lock场景,在zookeeper中指定对应的一个根节点,用于记录资源竞争的内容
•每个lock创建后,会lazy在zookeeper中创建一个node节点,表明对应的资源竞争标识。 (小技巧:node节点为EPHEMERAL_SEQUENTIAL,自增长的临时节点)
•进行lock操作时,获取对应lock根节点下的所有字节点,也即处于竞争中的资源标识
•按照Fair竞争的原则,按照对应的自增内容做排序,取出编号最小的一个节点做为lock的owner,判断自己的节点id是否就为owner id,如果是则返回,lock成功。
•如果自己非owner id,按照排序的结果找到序号比自己前一位的id,关注它锁释放的操作(也就是exist watcher),形成一个链式的触发过程。
unlock操作过程:
•将自己id对应的节点删除即可,对应的下一个排队的节点就可以收到Watcher事件,从而被唤醒得到锁后退出
其中的几个关键点:
1.node节点选择为EPHEMERAL_SEQUENTIAL很重要。
* 自增长的特性,可以方便构建一个基于Fair特性的锁,前一个节点唤醒后一个节点,形成一个链式的触发过程。可以有效的避免"惊群效应"(一个锁释放,所有等待的线程都被唤醒),有针对性的唤醒,提升性能。
* 选择一个EPHEMERAL临时节点的特性。因为和zookeeper交互是一个网络操作,不可控因素过多,比如网络断了,上一个节点释放锁的操作会失败。临时节点是和对应的session挂接的,session一旦超时或者异常退出其节点就会消失,类似于ReentrantLock中等待队列Thread的被中断处理。
2.获取lock操作是一个阻塞的操作,而对应的Watcher是一个异步事件,所以需要使用信号进行通知,正好使用上一篇文章中提到的BooleanMutex,可以比较方便的解决锁重入的问题。(锁重入可以理解为多次读操作,锁释放为写抢占操作)
注意:
•使用EPHEMERAL会引出一个风险:在非正常情况下,网络延迟比较大会出现session timeout,zookeeper就会认为该client已关闭,从而销毁其id标示,竞争资源的下一个id就可以获取锁。这时可能会有两个process同时拿到锁在跑任务,所以设置好session timeout很重要。
•同样使用PERSISTENT同样会存在一个死锁的风险,进程异常退出后,对应的竞争资源id一直没有删除,下一个id一直无法获取到锁对象。
没有两全其美的做法,两者取其一,选择自己一个能接受的即可
代码
Java代码
1.public class DistributedLock {
2.
3. private static final byte[] data = { 0x12, 0x34 };
4. private ZooKeeperx zookeeper = ZooKeeperClient.getInstance();
5. private final String root; //根节点路径
6. private String id;
7. private LockNode idName;
8. private String ownerId;
9. private String lastChildId;
10. private Throwable other = null;
11. private KeeperException exception = null;
12. private InterruptedException interrupt = null;
13.
14. public DistributedLock(String root) {
15. this.root = root;
16. ensureExists(root);
17. }
18.
19. /**
20. * 尝试获取锁操作,阻塞式可被中断
21. */
22. public void lock() throws InterruptedException, KeeperException {
23. // 可能初始化的时候就失败了
24. if (exception != null) {
25. throw exception;
26. }
27.
28. if (interrupt != null) {
29. throw interrupt;
30. }
31.
32. if (other != null) {
33. throw new NestableRuntimeException(other);
34. }
35.
36. if (isOwner()) {//锁重入
37. return;
38. }
39.
40. BooleanMutex mutex = new BooleanMutex();
41. acquireLock(mutex);
42. // 避免zookeeper重启后导致watcher丢失,会出现死锁使用了超时进行重试
43. try {
44. mutex.get(DEFAULT_TIMEOUT_PERIOD, TimeUnit.MICROSECONDS);// 阻塞等待值为true
45. // mutex.get();
46. } catch (TimeoutException e) {
47. if (!mutex.state()) {
48. lock();
49. }
50. }
51.
52. if (exception != null) {
53. throw exception;
54. }
55.
56. if (interrupt != null) {
57. throw interrupt;
58. }
59.
60. if (other != null) {
61. throw new NestableRuntimeException(other);
62. }
63. }
64.
65. /**
66. * 尝试获取锁对象, 不会阻塞
67. *
68. * @throws InterruptedException
69. * @throws KeeperException
70. */
71. public boolean tryLock() throws KeeperException {
72. // 可能初始化的时候就失败了
73. if (exception != null) {
74. throw exception;
75. }
76.
77. if (isOwner()) {//锁重入
78. return true;
79. }
80.
81. acquireLock(null);
82.
83. if (exception != null) {
84. throw exception;
85. }
86.
87. if (interrupt != null) {
88. Thread.currentThread().interrupt();
89. }
90.
91. if (other != null) {
92. throw new NestableRuntimeException(other);
93. }
94.
95. return isOwner();
96. }
97.
98. /**
99. * 释放锁对象
100. */
101. public void unlock() throws KeeperException {
102. if (id != null) {
103. try {
104. zookeeper.delete(root + "/" + id, -1);
105. } catch (InterruptedException e) {
106. Thread.currentThread().interrupt();
107. } catch (KeeperException.NoNodeException e) {
108. // do nothing
109. } finally {
110. id = null;
111. }
112. } else {
113. //do nothing
114. }
115. }
116.
117. private void ensureExists(final String path) {
118. try {
119. Stat stat = zookeeper.exists(path, false);
120. if (stat != null) {
121. return;
122. }
123.
124. zookeeper.create(path, data, CreateMode.PERSISTENT);
125. } catch (KeeperException e) {
126. exception = e;
127. } catch (InterruptedException e) {
128. Thread.currentThread().interrupt();
129. interrupt = e;
130. }
131. }
132.
133. /**
134. * 返回锁对象对应的path
135. */
136. public String getRoot() {
137. return root;
138. }
139.
140. /**
141. * 判断当前是不是锁的owner
142. */
143. public boolean isOwner() {
144. return id != null && ownerId != null && id.equals(ownerId);
145. }
146.
147. /**
148. * 返回当前的节点id
149. */
150. public String getId() {
151. return this.id;
152. }
153.
154. // ===================== helper method =============================
155.
156. /**
157. * 执行lock操作,允许传递watch变量控制是否需要阻塞lock操作
158. */
159. private Boolean acquireLock(final BooleanMutex mutex) {
160. try {
161. do {
162. if (id == null) {//构建当前lock的唯一标识
163. long sessionId = zookeeper.getDelegate().getSessionId();
164. String prefix = "x-" + sessionId + "-";
165. //如果第一次,则创建一个节点
166. String path = zookeeper.create(root + "/" + prefix, data,
167. CreateMode.EPHEMERAL_SEQUENTIAL);
168. int index = path.lastIndexOf("/");
169. id = StringUtils.substring(path, index + 1);
170. idName = new LockNode(id);
171. }
172.
173. if (id != null) {
174. List<String> names = zookeeper.getChildren(root, false);
175. if (names.isEmpty()) {
176. id = null;//异常情况,重新创建一个
177. } else {
178. //对节点进行排序
179. SortedSet<LockNode> sortedNames = new TreeSet<LockNode>();
180. for (String name : names) {
181. sortedNames.add(new LockNode(name));
182. }
183.
184. if (sortedNames.contains(idName) == false) {
185. id = null;//清空为null,重新创建一个
186. continue;
187. }
188.
189. //将第一个节点做为ownerId
190. ownerId = sortedNames.first().getName();
191. if (mutex != null && isOwner()) {
192. mutex.set(true);//直接更新状态,返回
193. return true;
194. } else if (mutex == null) {
195. return isOwner();
196. }
197.
198. SortedSet<LockNode> lessThanMe = sortedNames.headSet(idName);
199. if (!lessThanMe.isEmpty()) {
200. //关注一下排队在自己之前的最近的一个节点
201. LockNode lastChildName = lessThanMe.last();
202. lastChildId = lastChildName.getName();
203. //异步watcher处理
204. zookeeper.exists(root + "/" + lastChildId, new AsyncWatcher() {
205.
206. public void asyncProcess(WatchedEvent event) {
207. acquireLock(mutex);
208. }
209.
210. });
211.
212. if (stat == null) {
213. acquireLock(mutex);// 如果节点不存在,需要自己重新触发一下,watcher不会被挂上去
214. }
215. } else {
216. if (isOwner()) {
217. mutex.set(true);
218. } else {
219. id = null;// 可能自己的节点已超时挂了,所以id和ownerId不相同
220. }
221. }
222. }
223. }
224. } while (id == null);
225. } catch (KeeperException e) {
226. exception = e;
227. if (mutex != null) {
228. mutex.set(true);
229. }
230. } catch (InterruptedException e) {
231. interrupt = e;
232. if (mutex != null) {
233. mutex.set(true);
234. }
235. } catch (Throwable e) {
236. other = e;
237. if (mutex != null) {
238. mutex.set(true);
239. }
240. }
241.
242. if (isOwner() && mutex != null) {
243. mutex.set(true);
244. }
245. return Boolean.FALSE;
246. }
247.}
public class DistributedLock {
private static final byte[] data = { 0x12, 0x34 };
private ZooKeeperx zookeeper = ZooKeeperClient.getInstance();
private final String root; //根节点路径
private String id;
private LockNode idName;
private String ownerId;
private String lastChildId;
private Throwable other = null;
private KeeperException exception = null;
private InterruptedException interrupt = null;
public DistributedLock(String root) {
this.root = root;
ensureExists(root);
}
/**
* 尝试获取锁操作,阻塞式可被中断
*/
public void lock() throws InterruptedException, KeeperException {
// 可能初始化的时候就失败了
if (exception != null) {
throw exception;
}
if (interrupt != null) {
throw interrupt;
}
if (other != null) {
throw new NestableRuntimeException(other);
}
if (isOwner()) {//锁重入
return;
}
BooleanMutex mutex = new BooleanMutex();
acquireLock(mutex);
// 避免zookeeper重启后导致watcher丢失,会出现死锁使用了超时进行重试
try {
mutex.get(DEFAULT_TIMEOUT_PERIOD, TimeUnit.MICROSECONDS);// 阻塞等待值为true
// mutex.get();
} catch (TimeoutException e) {
if (!mutex.state()) {
lock();
}
}
if (exception != null) {
throw exception;
}
if (interrupt != null) {
throw interrupt;
}
if (other != null) {
throw new NestableRuntimeException(other);
}
}
/**
* 尝试获取锁对象, 不会阻塞
*
* @throws InterruptedException
* @throws KeeperException
*/
public boolean tryLock() throws KeeperException {
// 可能初始化的时候就失败了
if (exception != null) {
throw exception;
}
if (isOwner()) {//锁重入
return true;
}
acquireLock(null);
if (exception != null) {
throw exception;
}
if (interrupt != null) {
Thread.currentThread().interrupt();
}
if (other != null) {
throw new NestableRuntimeException(other);
}
return isOwner();
}
/**
* 释放锁对象
*/
public void unlock() throws KeeperException {
if (id != null) {
try {
zookeeper.delete(root + "/" + id, -1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (KeeperException.NoNodeException e) {
// do nothing
} finally {
id = null;
}
} else {
//do nothing
}
}
private void ensureExists(final String path) {
try {
Stat stat = zookeeper.exists(path, false);
if (stat != null) {
return;
}
zookeeper.create(path, data, CreateMode.PERSISTENT);
} catch (KeeperException e) {
exception = e;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
interrupt = e;
}
}
/**
* 返回锁对象对应的path
*/
public String getRoot() {
return root;
}
/**
* 判断当前是不是锁的owner
*/
public boolean isOwner() {
return id != null && ownerId != null && id.equals(ownerId);
}
/**
* 返回当前的节点id
*/
public String getId() {
return this.id;
}
// ===================== helper method =============================
/**
* 执行lock操作,允许传递watch变量控制是否需要阻塞lock操作
*/
private Boolean acquireLock(final BooleanMutex mutex) {
try {
do {
if (id == null) {//构建当前lock的唯一标识
long sessionId = zookeeper.getDelegate().getSessionId();
String prefix = "x-" + sessionId + "-";
//如果第一次,则创建一个节点
String path = zookeeper.create(root + "/" + prefix, data,
CreateMode.EPHEMERAL_SEQUENTIAL);
int index = path.lastIndexOf("/");
id = StringUtils.substring(path, index + 1);
idName = new LockNode(id);
}
if (id != null) {
List<String> names = zookeeper.getChildren(root, false);
if (names.isEmpty()) {
id = null;//异常情况,重新创建一个
} else {
//对节点进行排序
SortedSet<LockNode> sortedNames = new TreeSet<LockNode>();
for (String name : names) {
sortedNames.add(new LockNode(name));
}
if (sortedNames.contains(idName) == false) {
id = null;//清空为null,重新创建一个
continue;
}
//将第一个节点做为ownerId
ownerId = sortedNames.first().getName();
if (mutex != null && isOwner()) {
mutex.set(true);//直接更新状态,返回
return true;
} else if (mutex == null) {
return isOwner();
}
SortedSet<LockNode> lessThanMe = sortedNames.headSet(idName);
if (!lessThanMe.isEmpty()) {
//关注一下排队在自己之前的最近的一个节点
LockNode lastChildName = lessThanMe.last();
lastChildId = lastChildName.getName();
//异步watcher处理
zookeeper.exists(root + "/" + lastChildId, new AsyncWatcher() {
public void asyncProcess(WatchedEvent event) {
acquireLock(mutex);
}
});
if (stat == null) {
acquireLock(mutex);// 如果节点不存在,需要自己重新触发一下,watcher不会被挂上去
}
} else {
if (isOwner()) {
mutex.set(true);
} else {
id = null;// 可能自己的节点已超时挂了,所以id和ownerId不相同
}
}
}
}
} while (id == null);
} catch (KeeperException e) {
exception = e;
if (mutex != null) {
mutex.set(true);
}
} catch (InterruptedException e) {
interrupt = e;
if (mutex != null) {
mutex.set(true);
}
} catch (Throwable e) {
other = e;
if (mutex != null) {
mutex.set(true);
}
}
if (isOwner() && mutex != null) {
mutex.set(true);
}
return Boolean.FALSE;
}
}相关说明:
测试代码:
Java代码
1.@Test
2. public void test_lock() {
3. ExecutorService exeucotr = Executors.newCachedThreadPool();
4. final int count = 50;
5. final CountDownLatch latch = new CountDownLatch(count);
6. final DistributedLock[] nodes = new DistributedLock[count];
7. for (int i = 0; i < count; i++) {
8. final DistributedLock node = new DistributedLock(dir);
9. nodes[i] = node;
10. exeucotr.submit(new Runnable() {
11.
12. public void run() {
13. try {
14. Thread.sleep(1000);
15. node.lock(); //获取锁
16. Thread.sleep(100 + RandomUtils.nextInt(100));
17.
18. System.out.println("id: " + node.getId() + " is leader: " + node.isOwner());
19. } catch (InterruptedException e) {
20. want.fail();
21. } catch (KeeperException e) {
22. want.fail();
23. } finally {
24. latch.countDown();
25. try {
26. node.unlock();
27. } catch (KeeperException e) {
28. want.fail();
29. }
30. }
31.
32. }
33. });
34. }
35.
36. try {
37. latch.await();
38. } catch (InterruptedException e) {
39. want.fail();
40. }
41.
42. exeucotr.shutdown();
43. }
@Test
public void test_lock() {
ExecutorService exeucotr = Executors.newCachedThreadPool();
final int count = 50;
final CountDownLatch latch = new CountDownLatch(count);
final DistributedLock[] nodes = new DistributedLock[count];
for (int i = 0; i < count; i++) {
final DistributedLock node = new DistributedLock(dir);
nodes[i] = node;
exeucotr.submit(new Runnable() {
public void run() {
try {
Thread.sleep(1000);
node.lock(); //获取锁
Thread.sleep(100 + RandomUtils.nextInt(100));
System.out.println("id: " + node.getId() + " is leader: " + node.isOwner());
} catch (InterruptedException e) {
want.fail();
} catch (KeeperException e) {
want.fail();
} finally {
latch.countDown();
try {
node.unlock();
} catch (KeeperException e) {
want.fail();
}
}
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
want.fail();
}
exeucotr.shutdown();
}
升级版
实现了一个分布式lock后,可以解决多进程之间的同步问题,但设计多线程+多进程的lock控制需求,单jvm中每个线程都和zookeeper进行网络交互成本就有点高了,所以基于DistributedLock,实现了一个分布式二层锁。
大致原理就是ReentrantLock 和 DistributedLock的一个结合。
• 单jvm的多线程竞争时,首先需要先拿到第一层的ReentrantLock的锁
•拿到锁之后这个线程再去和其他JVM的线程竞争锁,最后拿到之后锁之后就开始处理任务。
锁的释放过程是一个反方向的操作,先释放DistributedLock,再释放ReentrantLock。 可以思考一下,如果先释放ReentrantLock,假如这个JVM ReentrantLock竞争度比较高,一直其他JVM的锁竞争容易被饿死。
代码:
Java代码
1.public class DistributedReentrantLock extends DistributedLock {
2.
3. private static final String ID_FORMAT = "Thread[{0}] Distributed[{1}]";
4. private ReentrantLock reentrantLock = new ReentrantLock();
5.
6. public DistributedReentrantLock(String root) {
7. super(root);
8. }
9.
10. public void lock() throws InterruptedException, KeeperException {
11. reentrantLock.lock();//多线程竞争时,先拿到第一层锁
12. super.lock();
13. }
14.
15. public boolean tryLock() throws KeeperException {
16. //多线程竞争时,先拿到第一层锁
17. return reentrantLock.tryLock() && super.tryLock();
18. }
19.
20. public void unlock() throws KeeperException {
21. super.unlock();
22. reentrantLock.unlock();//多线程竞争时,释放最外层锁
23. }
24.
25. @Override
26. public String getId() {
27. return MessageFormat.format(ID_FORMAT, Thread.currentThread().getId(), super.getId());
28. }
29.
30. @Override
31. public boolean isOwner() {
32. return reentrantLock.isHeldByCurrentThread() && super.isOwner();
33. }
34.
35.}
public class DistributedReentrantLock extends DistributedLock {
private static final String ID_FORMAT = "Thread[{0}] Distributed[{1}]";
private ReentrantLock reentrantLock = new ReentrantLock();
public DistributedReentrantLock(String root) {
super(root);
}
public void lock() throws InterruptedException, KeeperException {
reentrantLock.lock();//多线程竞争时,先拿到第一层锁
super.lock();
}
public boolean tryLock() throws KeeperException {
//多线程竞争时,先拿到第一层锁
return reentrantLock.tryLock() && super.tryLock();
}
public void unlock() throws KeeperException {
super.unlock();
reentrantLock.unlock();//多线程竞争时,释放最外层锁
}
@Override
public String getId() {
return MessageFormat.format(ID_FORMAT, Thread.currentThread().getId(), super.getId());
}
@Override
public boolean isOwner() {
return reentrantLock.isHeldByCurrentThread() && super.isOwner();
}
}
测试代码:
Java代码
1.@Test
2. public void test_lock() {
3. ExecutorService exeucotr = Executors.newCachedThreadPool();
4. final int count = 50;
5. final CountDownLatch latch = new CountDownLatch(count);
6.
7. final DistributedReentrantLock lock = new DistributedReentrantLock(dir); //单个锁
8. for (int i = 0; i < count; i++) {
9. exeucotr.submit(new Runnable() {
10.
11. public void run() {
12. try {
13. Thread.sleep(1000);
14. lock.lock();
15. Thread.sleep(100 + RandomUtils.nextInt(100));
16.
17. System.out.println("id: " + lock.getId() + " is leader: " + lock.isOwner());
18. } catch (InterruptedException e) {
19. want.fail();
20. } catch (KeeperException e) {
21. want.fail();
22. } finally {
23. latch.countDown();
24. try {
25. lock.unlock();
26. } catch (KeeperException e) {
27. want.fail();
28. }
29. }
30.
31. }
32. });
33. }
34.
35. try {
36. latch.await();
37. } catch (InterruptedException e) {
38. want.fail();
39. }
40.
41. exeucotr.shutdown();
42. }
@Test
public void test_lock() {
ExecutorService exeucotr = Executors.newCachedThreadPool();
final int count = 50;
final CountDownLatch latch = new CountDownLatch(count);
final DistributedReentrantLock lock = new DistributedReentrantLock(dir); //单个锁
for (int i = 0; i < count; i++) {
exeucotr.submit(new Runnable() {
public void run() {
try {
Thread.sleep(1000);
lock.lock();
Thread.sleep(100 + RandomUtils.nextInt(100));
System.out.println("id: " + lock.getId() + " is leader: " + lock.isOwner());
} catch (InterruptedException e) {
want.fail();
} catch (KeeperException e) {
want.fail();
} finally {
latch.countDown();
try {
lock.unlock();
} catch (KeeperException e) {
want.fail();
}
}
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
want.fail();
}
exeucotr.shutdown();
}最后
其实再可以发散一下,实现一个分布式的read/write lock,也差不多就是这个理了。项目结束后,有时间可以写一下
大致思路:
1.竞争资源标示: read_自增id , write_自增id
2.首先按照自增id进行排序,如果队列的前边都是read标识,对应的所有read都获得锁。如果队列的前边是write标识,第一个write节点获取锁
3.watcher监听: read监听距离自己最近的一个write节点的exist,write监听距离自己最近的一个节点(read或者write节点)
相关推荐
**Zookeeper的分布式锁实现原理** 1. **节点创建与监视**: Zookeeper允许客户端创建临时节点,这些节点会在客户端断开连接时自动删除。分布式锁的实现通常会为每个请求创建一个临时顺序节点,按照创建的顺序形成一...
总之,C#中基于ZooKeeper的分布式锁实现涉及对ZooKeeper的操作,包括创建临时顺序节点、监听节点变化以及正确释放锁。这样的实现方式保证了在分布式环境下的并发控制和数据一致性,同时具备良好的扩展性和容错性。...
它提供了多种同步原语,如`Semaphore`、`Lock`和`Event`,帮助开发者实现分布式环境下的互斥访问、资源限制和条件等待。这些原语对于构建分布式算法和保证数据一致性至关重要。 此外,Zookeeper还提供了**组服务**...
在处理订单生成的场景中,我们可以这样应用ZooKeeper分布式锁: 1. 当用户发起订单请求时,服务端会尝试在ZooKeeper上创建一个临时顺序节点。 2. 如果创建成功,服务端会检查当前最小序号的节点是否是自己创建的。...
Zookeeper 是一个广泛使用的分布式协调服务,它可以用来实现高效的分布式锁,尤其适合那些对数据一致性要求较高的场景。 Zookeeper 的分布式锁实现依赖于其核心特性: 1. **节点的互斥性**:Zookeeper 允许客户端...
**分布式重入排他锁(Reentrant Lock)在Zookeeper中的实现** Zookeeper是一个开源的分布式协调服务,常用于分布式环境中的一致性问题,如配置管理、命名服务、分布式同步等。在分布式系统中,为了保证数据的一致性...
ZooKeeper 提供的分布式锁通常基于其原子的创建、删除节点和监视事件机制来实现。 1. **不可重入锁(Non-Recursive Lock)**: 不可重入锁不允许同一个线程再次获取已持有的锁。在ZooKeeper中,实现不可重入锁通常...
《彻底理解ZooKeeper分布式锁实现原理》 ZooKeeper,简称zk,作为一个高可用的分布式协调服务,常被用于构建分布式系统中的各种组件,如分布式锁。在本篇文章中,我们将深入探讨如何利用Curator这个流行的开源框架...
在本场景中,我们利用Zookeeper的临时顺序节点来实现一个分布式调度解决方案,并结合Quartz进行作业调度。Zookeeper是一个分布式协调服务,而Quartz则是一个强大的、开源的作业调度框架,两者结合能构建出高可用、可...
分布式锁的实现基于Zookeeper的临时节点和监视器机制。通常,每个想要获取锁的客户端会在特定的zNode下创建一个临时节点。如果创建成功,说明获取了锁;如果失败,说明已有其他客户端持有锁。客户端还可以设置监视器...
使用zookeeper来实现分布式锁 原理 监听zookeeper的临时有序节点,监听到NodeDeleted事件,就会让线程重新获取锁 测试方法 public class ZookeeperLockTest { public static void main(String[] args) throws ...
在java中对于同一个jvm而言,jdk已经提供了lock和同步等。但是在分布式情况下,往往存在多个进程对一些资源产生竞争关系,而这些进程往往在不同的机器上,这个时候jdk中提供的已经不能满足。分布式锁顾明思议就是...
以下是一个简单的基于Zookeeper的分布式锁实现示例: ```java @Service public class ZookeeperDistributedLock { @Autowired private CuratorFramework curatorFramework; private String lockPath = "/...
本项目基于SpringBoot框架,结合Redis、Zookeeper和RabbitMQ实现了分布式锁,让我们一起深入探讨这三个组件在分布式锁中的作用和实现机制。 **SpringBoot** 是一个轻量级的Java开发框架,它简化了新Spring应用的...
在Spring XD中使用ZooKeeper可以实现分布式环境下的协调,例如在集群中管理服务的分布和任务分配。 对于领导者选举,ZooKeeper提供了一种无需羊群效应(Herd Effect)的锁机制。这种锁的实现依赖于临时顺序节点的...
4. **zkLockTest**:这个文件很可能是实现Zookeeper分布式锁的测试代码,可能包含客户端的连接、锁的获取和释放、异常处理等逻辑。通过对这个测试代码的分析和运行,我们可以深入理解Zookeeper分布式锁的工作机制。 ...
【Zookeeper实现分布式锁】 Zookeeper 是 Apache ...总之,Zookeeper 的分布式锁机制是基于其强大的一致性特性和事件通知能力实现的,它在分布式系统中扮演着至关重要的角色,帮助协调和管理分布式环境下的并发操作。
本文将详细讲解如何使用Java与Apache ZooKeeper实现一个分布式锁的示例。 ZooKeeper是一个分布式协调服务,它提供了一种可靠的方式来管理和同步分布式系统的数据。在分布式锁的场景中,ZooKeeper可以作为一个中心化...
本项目“springboot redis zookeeperlock rabbit实现的分布式锁”结合了Spring Boot、Redis、Zookeeper以及RabbitMQ这四款强大的工具,旨在构建一个健壮的分布式锁系统。以下是关于这些技术及其在分布式锁中的应用的...
通过深入理解Zookeeper的工作原理以及ZookeeperNet库的使用,开发者可以有效地在C#环境中实现高可用的分布式锁,保障多节点之间的协同工作和数据一致性。在实际项目中,分布式锁可以广泛应用于数据库操作、并发任务...