简介
在java同步锁中,除了synchronized关键字之外,还有很多更灵活的显式锁可供选择,比如ReentrantLock,ReentrantReadWriteLock等。甚至如果jdk中的锁不能满足你的需求,你还可以自己来定义自己的显式锁。显式锁都是实现Lock接口,需要自己实现比如lock(),unlock(),trylock()等方法。而lock接口方法的实现基本上都是基于AbstractOwnableSynchronizer(AQS)抽象类。可以说AQS是java的锁中最重要也最基础的类之一。
AQS是一个半成品的抽象类。它封装好了线程如何等待锁和如何释放锁的规则。比如多个线程竞争锁,没有获得锁的线程将放到FIFO中排队,并且以自旋的方式去尝试获得锁。但是一个线程什么情况下获得锁和释放锁,需要自己去定义。简单的说AQS帮你实现了框架上的规则,但是框架下面的更具体的规则需要自己来实现。举个不恰当的例子——一个吃饭的游戏:一群人吃饭,但是只有一个饭桶,只有获得饭桶的人才能从饭吃桶里吃到饭,每次只有一个人能获得饭桶,没有抢到饭桶的人就去排队。AQS实现了这样一个规则:如果没有人获得饭桶,那么大家一起抢饭桶。谁先抢到谁就获得这个饭桶。其他没有获得饭桶的人就去排队,后面参加进来的人就排在队伍末尾。但是大家并不是老老实实呆在队伍里不动,也不是等着别人把饭桶让给你。而是队伍里的每个人都在关注着饭桶是否被前面得到的人释放了,并一遍一遍的去尝试着抢饭桶,直到抢到饭桶(获得锁)或者自己累了(异常退出)退出抢饭桶的行列,或者他妈喊它回家吃饭了(中断或取消)。这个抢饭桶规则就是AQS给你实现的规则。但是什么情况下被认定为抢到饭桶,这个规则需要你来补充,比如饭桶释放之后传给队列里排第一个的人,或者传给队列里排队最久人,获得传给队列里年龄最小的人等等。这也是就AQS获得锁的过程。如果你得到了饭桶,吃饱了饭就要释放饭桶,让给其他人,这就是释放锁的过程。下面具体讲讲AQS具体的原理。
队列里的节点定义
没有抢到锁的线程将会放到一个队列里面。这个队列是给双向队列,所以节点node的属性包含前驱节点和后记节点,以及排队的线程,同时还包含一些判断获得释放锁需要的元素。下表示Node的所有属性
属性 |
描述 |
Node prev |
前驱节点,比如某些锁中判断当前节点的线程能否获得锁的一个条件是
前一个节点是头节点。
|
Node next |
后继节点。 |
Thread thread |
竞争锁的线程。 |
Node nextWaiter |
存储condition队列中的后继节点。
1.SHARED:表示该节点线程处于共享模式等待。
2.EXCLUSIVE:表示该节点线程处于排他模式等待。
|
int waitStatus |
表示节点的状态。其中包含的状态有:
CANCELLED,值为1,表示当前的线程被取消;
SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark
CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;
PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;
值为0,表示当前节点在sync队列中,等待着获取锁。
|
AQS的属性及描述
属性名 |
属性描述 |
Node head |
FIFO队列头节点 |
Node tail |
FIFO队列尾节点 |
volatile int state |
状态。该属性是AQS管理锁的最重要的属性。可以用它来标记当前锁的状态,也可以充当线程重入的计数器。同时在读写锁中,他的高16位和低16位都可以表示不同的含义。因此对这个字段的灵活应用是定制自定义锁的关键。通过CAS的方法来实现对state的同步操作,进而实现锁的获得与释放。 |
Thread exclusiveOwnerThread |
当前拥有排他锁的线程。该字段一般和state联合起来使用,以确定锁的状态。一般state=0时exclusiveOwnerThread为null,表示没有线程获得锁。同时state不等于0时exclusiveOwnerThread也可为null,比如读写锁的读锁。 |
排他模式锁获得锁的方法
上面抢饭桶的例子中只有1个人能获得饭桶,这其实是一种排他锁。前面介绍了AQS提供了半成品的获得锁的方法:一是它已经定义好的一套规则框架,比如轮询,排队等等。二是需要自定义的具体规则,比如公平与非公平等。
排他模式获得锁的实现方法
对应的方法如下:
1.protected boolean tryAcquire(int arg)。这个接口是需要自己实现的具体锁规则。比如ReentrantLock的tryAcquire的规则如下:
final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
ReentrantLock定义的规则如下:如果state=0,那么通过compareAndSetState(0, acquires)方法先修改state,如果compareAndSetState的返回值为TRUE,说明state修改成功并且成功获得了锁。这时候其他线程调用compareAndSetState(0, acquires)一定返回false,state也不会修改。然后把exclusiveOwnerThread设置为当前线程,表示当前线程独占了锁。ReentrantLock是线程可重入,所以虽然state不等于0,但是exclusiveOwnerThread等于当前线程,那么也应该获得锁,这时候state充当计数器,state=c + acquires。因为执行这个操作的是同一个线程,所以不需要CAS进行同步。
2.public final void acquire(int arg)。这个方法就是AQS帮你实现了的规则。代码如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这段代码先调用自定义的tryAcquire(arg)方法。如果该方法返回为true,那么该方法执行并退出。表示获得锁。如果tryAcquire返回false,那么执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)。首先先执行addWaiter(Node.EXCLUSIVE), arg)。这个方法是往把当前线程封装成一个node,然后挂在队列末尾。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
addWaiter(Node mode)方法也是通过compareAndSetTail方法来实现队列同步的。addWaiter再性能上做了优化。在大部分情况下队列都不会为空,所以先按照队列不为空的情况进行处理把当前node挂到队列末尾。如果当前队列为空或者执行compareAndSetTail同步失败,那么在执行完整的把当前节点挂到队列末尾的的方法enq(node)。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
Node h = new Node(); // Dummy header
h.next = node;
node.prev = h;
if (compareAndSetHead(h)) {
tail = node;
return h;
}
}
else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq(node)方法进行自旋,直到node成功挂到了队列的末尾,期间还处理了队列为空时需要创建头节点的情况。
addWaiter方法执行完并创建node完成之后,那么再执行acquireQueued(final Node node, int arg)方法:
final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
}
该方法通过自旋方式去获得锁,要么获得锁返回,要么被中断退出,要么继续自旋等待锁。首先判断当前节点的前驱节点是不是头节点,如果是,则执行自定义的tryAcquire(arg)。如果返回为true,则把当前节点设为头节点并返回。如果p == head && tryAcquire(arg)为false,则执行中断检查shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()。方法shouldParkAfterFailedAcquire(p, node) 是检查并更新没有获得锁的node的状态并返回改节点的线程是否应该阻塞。原代码如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
该方法实现以下逻辑。首先如果前一个节点的状态为SIGNAL,那么表示当前节点的线程可以安全地去获得许可;如果一个节点的状态大于0(大于0的只有一个值1,即取消了),然后往head方向找到一个waitStatus小于等于0的node A,并把当前节点的前驱节点设置为node A,node A的后继节点设置为当前node。即把当前node往head方向连续的的cancel节点摘掉,然后返回false,即当前线程不阻塞;如果前驱节点的状态等于0或-3(-2是共享模式才用的状态),表示当前线程需要一个signal触发当前线程去获得许可,但是不是在这次自旋中去获得许可。所以需要把前驱节点的状态设置为signal,以便下次自旋的时候去获得许可,同时返回false。如果shouldParkAfterFailedAcquire返回true,即当前线程能够安全的去获得许可,那么执行parkAndCheckInterrupt()方法:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
该方法执行LockSupport.park(this);取获取许可,如果获取不到则一直等待,直到获得一个许可或中断,然后返回Thread.interrupted(),Thread.interrupted()方法会返回线程的中断状态并reset中断状态。
至此AQS定义的一个完成的获得锁的规则结束了。现在总结下如何AQS是如何实现获得锁的:首先需要自己实现tryAcquire方法,然后acquire方法会调用tryAcquire方法去获得锁,如果得不到则为当前线程创建一个node,加到队列末尾。然后acquireQueued方法自旋,直到当前线程获得锁或中断退出。在自旋过程中,当前线程会处于阻塞状态,直到被唤醒获得锁。
排他模式释放锁的实现方法
某人获得饭桶后,他吃饱了饭,就需要释放饭桶让给其他人,如果一直抱着饭桶不放,就造成了死锁。现在再来看看释放锁的原理。
1.boolean tryRelease(int arg),这是需要自己实现的释放锁的规则,同样以ReentrantLock为例子看看它是怎么实现的:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
如果当前线程并没有获得锁调用该方法会抛出IllegalMonitorStateException异常,另外ReentrantLock是可重入的,如果state还充当了计数器的作用,只有当state等于0时才释放锁。
2.public final boolean release(int arg),这个方法是AQS定义好的释放锁规则。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
如果当前线程释放锁,并且头结点存在且状态不为0,那么需要对头结点的后继节点进行唤醒操作,调用方法unparkSuccessor():
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
如果头结点状态小于0,则先清除头结点状态,置为0,那么唤醒头结点的下一个节点。如果该节点不存在,或者改节点状态是取消状态,那么从队列末尾开始找,找到最后一个状态没有取消的节点,然后调用LockSupport.unpark(s.thread)方法释放许可,唤醒该节点的线程。为什么只对头结点的后继节点释放许可呢?因为每次只有一个线程获得锁,前面在获得锁acquireQueued方法中说了,只有当前节点的前驱节点是头节点才能获得锁,所以只需要唤醒头结点的线程就可以了。
共享模式锁
前面举的吃饭的例子中,只有一个人能够获得饭桶吃饭,但是如果这个饭桶足够大,可以允许多个人同时获得饭桶。那么上面讲的获得锁和释放锁就满足不了,因为前面讲的是独占锁。下面来讲讲共享锁,即多个线程获得同一个锁。共享锁的一个典型的例子是读锁。
共享模式获得锁的实现方法
1.protected int tryAcquireShared(int arg),该方法是共享锁需要自己实现的接口,以读写锁的读锁例子举例:
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (!readerShouldBlock(current) &&
compareAndSetState(c, c + SHARED_UNIT)) {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
cachedHoldCounter = rh = readHolds.get();
rh.count++;
return 1;
}
return fullTryAcquireShared(current);
}
关于读写锁,请参见并发编程之读写锁ReentrantReadWriteLock实现。这里不再赘述。
2.public final void acquireShared(int arg) 。这个方法是AQS实现的共享锁规则:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
如果当前线程没有获得共享锁,则调用doAcquireShared(arg)方法:
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
}
该方法的主要功能是先创建一个共享模式的node,加到队列的末尾,然后判断当前节点的前驱节点是否是头节点,是的话则调用tryAcquireShared方法。如果tryAcquireShared的返回值大于等于0,则表示获得了共享锁,然后调用setHeadAndPropagate方法,setHeadAndPropagate方法先把当前节点设为为头节点,调用doReleaseShared方法。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
doReleaseShared方法实现的功能如下:遍历队列中的所有节点,如果节点状态为signal,把改siganl状态置为0,并调用unparkSuccessor(h);方法把该节点的后继节点线程唤醒;如果该节点状态为0,则把状态设置为PROPAGATE。
共享模式释放锁的实现方法
1.protected final boolean tryReleaseShared(int unused)。该方法是需要自己实现的共享模式释放锁方法。下面是读写锁的读锁的实现例子:
protected final boolean tryReleaseShared(int unused) {
HoldCounter rh = cachedHoldCounter;
Thread current = Thread.currentThread();
if (rh == null || rh.tid != current.getId())
rh = readHolds.get();
if (rh.tryDecrement() <= 0)
throw new IllegalMonitorStateException();
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
关于读写锁,请参见并发编程之读写锁ReentrantReadWriteLock实现。这里不再赘述。
2.releaseShared(int arg)。该方法是AQS实现的方法:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
调用自定义的tryReleaseShared方法成功,则调用doReleaseShared()。
其他获得锁、释放锁方法
除了前面介绍几个获得独占锁和排他锁的基本接口之外,AQS还提供了可中断接口acquireInterruptibly(int arg)、acquireSharedInterruptibly(int arg)和超时的接口tryAcquireNanos(int arg, long nanosTimeout)、doAcquireNanos(int arg, long nanosTimeout)、tryAcquireSharedNanos(int arg, long nanosTimeout)、doAcquireSharedNanos(int arg, long nanosTimeout)。这些中断接口和超时基本是只是在前面的基本方法中增加了中断处理逻辑和超时判断逻辑。这里不在详细描述。
分享到:
相关推荐
在Java并发编程中,`AbstractQueuedSynchronizer`(AQS)是一个重要的抽象类,用于构建锁和其他同步组件。AQS的核心是通过一个整型变量`state`来表示同步状态,并利用双端队列(FIFO)管理等待的线程。在本篇中,我们...
龙果 java并发编程原理实战 第2节理解多线程与并发的之间的联系与区别 [免费观看] 00:11:59分钟 | 第3节解析多线程与多进程的联系以及上下文切换所导致资源浪费问题 [免费观看] 00:13:03分钟 | 第4节学习并发的四...
线程之间通信之join应用与实现原理剖析.mp4 ThreadLocal 使用及实现原理.mp4 并发工具类CountDownLatch详解.mp4 并发工具类CyclicBarrier 详解.mp4 并发工具类Semaphore详解.mp4 并发工具类Exchanger详解.mp4 ...
9. **线程池的原理与实践**:深入剖析ThreadPoolExecutor的工作原理,包括线程池的配置参数、工作流程、拒绝策略等,并给出优化线程池使用的建议。 10. **并发编程的最佳实践**:提供编写并发程序的一般原则和技巧...
java并发编程原理实战 第2节理解多线程与并发的之间的联系与区别 [免费观看] 00:11:59分钟 | 第3节解析多线程与多进程的联系以及上下文切换所导致资源浪费问题 [免费观看] 00:13:03分钟 | 第4节学习并发的四个...
**Java并发系列之AbstractQueuedSynchronizer源码分析概要** **1. AbstractQueuedSynchronizer(AQS)的定义与作用** AbstractQueuedSynchronizer(AQS)是Java并发编程中的核心组件,它是一个抽象的、基于FIFO...
5. **原子操作与CAS**:`06-并发编程之CAS&Atomic原子操作详解-fox`介绍了CAS(Compare and Swap)和Java的Atomic类。CAS是一种无锁算法,通过硬件指令比较并交换内存位置的值来实现线程安全的操作。Java的Atomic类...
AbstractQueuedSynchronizer(AQS)是Java并发编程中一个重要的工具,它是Java并发包`java.util.concurrent.locks`中的核心抽象类,用于构建锁和同步器的基础框架。AQS主要依赖于一个FIFO(先进先出)的双端队列来...
《Java并发系列之AbstractQueuedSynchronizer源码分析(共享模式)》 AbstractQueuedSynchronizer(AQS)是Java并发编程中一个重要的工具,它是Java并发包`java.util.concurrent.locks`中的核心抽象类,用于构建锁...
Java互联网架构多线程并发编程原理及实战 视频教程 下载 1-1 课程简介.mp4 1-2 什么是并发编程.mp4 1-3 并发编程的挑战之频繁的上下文切换.mp4 1-4 并发编程的挑战之死锁.mp4 1-5 并发编程的挑战之线程安全....
Java互联网架构多线程并发编程原理及实战 视频教程 下载 1-1 课程简介.mp4 1-2 什么是并发编程.mp4 1-3 并发编程的挑战之频繁的上下文切换.mp4 1-4 并发编程的挑战之死锁.mp4 1-5 并发编程的挑战之线程安全....
Java互联网架构多线程并发编程原理及实战 视频教程 下载 1-1 课程简介.mp4 1-2 什么是并发编程.mp4 1-3 并发编程的挑战之频繁的上下文切换.mp4 1-4 并发编程的挑战之死锁.mp4 1-5 并发编程的挑战之线程安全....
Java互联网架构多线程并发编程原理及实战 视频教程 下载 1-1 课程简介.mp4 1-2 什么是并发编程.mp4 1-3 并发编程的挑战之频繁的上下文切换.mp4 1-4 并发编程的挑战之死锁.mp4 1-5 并发编程的挑战之线程安全....
Java互联网架构多线程并发编程原理及实战 视频教程 下载 1-1 课程简介.mp4 1-2 什么是并发编程.mp4 1-3 并发编程的挑战之频繁的上下文切换.mp4 1-4 并发编程的挑战之死锁.mp4 1-5 并发编程的挑战之线程安全....
《JAVA并发编程与高并发解决方案-并发编程四之J.U.C之AQS》是一篇详细介绍Java实用并发工具包(Java Util Concurrency,简称J.U.C.)中重要组成部分——AbstractQueuedSynchronizer(简称AQS)的文章。AQS是Java并发...
Java并发编程之美_部分71 本节主要讲解了Java并发包中线程同步器原理剖析,具体来说是 CountDownLatch 和 CyclicBarrier 的使用和原理剖析。 一、CountDownLatch CountDownLatch 是一个同步工具,它可以让一个...
Java并发编程实战 本书深入浅出地介绍了Java线程和并发,是一本完美的Java并发参考手册。书中从并发性和线程安全性的基本概念出发,介绍了如何使用类库提供的基本并发构建块,用于避免并发危险、构造线程安全的类及...
第35节线程之间通信之join应用与实现原理剖析00:10:17分钟 | 第36节ThreadLocal 使用及实现原理00:17:41分钟 | 第37节并发工具类CountDownLatch详解00:22:04分钟 | 第38节并发工具类CyclicBarrier 详解00:11:52...
第35节线程之间通信之join应用与实现原理剖析00:10:17分钟 | 第36节ThreadLocal 使用及实现原理00:17:41分钟 | 第37节并发工具类CountDownLatch详解00:22:04分钟 | 第38节并发工具类CyclicBarrier 详解00:11:52...