队列结点
Node类型的waitStatus、prev、next 字段都用volatile 修饰,这样直接的读写操作就具有内存可视性。表示Node状态的waitStatus字段是个int类型,这样通过数值比较就可以判断Node的状态,而不需要很多的分支语句。
它的构造函数也是比较有意思的,有三个,分别用于构建同步队列的初始头结点或共享标识、构造同步队列的有效结点、构造条件队列的结点。也就是说,同步队列和条件队列的结点是相同的类型,所以可以从条件队列转移到同步队列去获取许可。
static final class Node {
// 表明节点是否以共享模式等待的标记
static final Node SHARED = new Node();
// 表明节点是否以独占模式等待的标记
static final Node EXCLUSIVE = null;
// 表明线程已被取消
static final int CANCELLED = 1;
// 表明后续节点的线程需要unparking
static final int SIGNAL = -1;
// 表明线程正在等待一个条件
static final int CONDITION = -2;
// 表明下一次acquireShared应该无条件传播
static final int PROPAGATE = -3;
/*
* 状态字段,只能取下面的值:
* SIGNAL(-1): 这个结点的后继是(或很快是)阻塞的(通过park),所以当前结点
* 必须unpark它的后继,当它释放或取消时。为了避免竞争,acquire方法必须
* 首先表明它们需要一个信号,然后再次尝试原子性acquire,如果失败了就阻塞。
*
* CANCELLED(1): 这个结点由于超时或中断已被取消。结点从不离开这种状态。尤其是,
* 这种状态的线程从不再次阻塞。
*
* CONDITION(-2): 这个结点当前在一个条件队列上。它将不会用于sync队列的结点,
* 直到被转移,在那时,结点的状态将被设为0.
* 这个值在这里的使用与其他字段的使用没有关系,仅仅是简化结构。
*
* PROPAGATE(-3): releaseShared应该传递给其他结点。这是在doReleaseShared里设置
* (仅仅是头结点)以确保传递继续,即使其他操作有干涉。
*
* 0: 非以上任何值。
*
* 值是组织为数字的用以简化使用。非负值表示结点不需要信号。这样,大部分代码不需要
* 检查特定的值,只需要(检查)符号。
*
* 对于普通同步结点,字段初始化为0;对于条件结点初始化为CONDITION(-2)。
* 通过CAS操作修改(或者,当允许时,用无条件volatile写。)
*/
volatile int waitStatus;
/*
* 连接到当前结点/线程依赖的用来检查等待状态的前驱结点。
* 在进入队列时赋值,只在出队列时置为空(为了GC考虑)。
* 根据前驱结点的取消,我们使查找一个非取消结点的while循环短路,这个总是会退出,
* 因为头结点从不会是取消了的:一个结点成为头只能是一次成功的acquire操作结果。
*
* 一个取消了的线程从不会在获取操作成功,线程只能取消自己,不能是其他结点。
*/
volatile Node prev;
/*
* 连接到当前结点/线程释放时解除阻塞的后续结点。
* 在入队列时赋值,在绕过已取消前驱节点时调整,出队列时置为空(for GC)。
* 入队操作不会给前驱结点的next字段赋值,直到附件后(把新节点赋值给队列的tail属性?),
* 所以看到next字段为空不一定表示它就是队列的尾结点。然而,如果next字段看起来是空,
* 我们可以从tail向前遍历进行双重检查。
* 被取消了的结点的next字段被设置为指向它自己而不是空,这让isOnSyncQueue变得容易。
*/
volatile Node next;
/*
* 列队在这个结点的线程,在构造时初始化,用完后置空。
*/
volatile Thread thread;
/*
* 连接到下一个在条件上等待的结点或是特殊的值SHARED。
* 因为条件队列只在独占模式下持有时访问,我们只需要一个简单的链表队列来持有在条件上等待的结点。
* 他们然后被转移到队列去re-acquire。
* 因为条件只能是独占的,我们通过用一个特殊的值来表明共享模式 来节省一个字段。
*/
Node nextWaiter;
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
添加结点到等待队列
对于添加结点到队列的操作最重要的是要保证:即使添加的CAS操作失败了,也不能影响队列结点现有的连接关系。
对于新结点,它在CAS之前指向它的预期前驱,CAS成功之后再更新预期前驱的后继指针。
在步骤1成功之后、步骤2完成之前,其他线程通过结点的 “next” 连接可能看到“尾结点”(即代码里的 pred)的 “next” 为空,但其实队列里已经加入新的结点,这也是为什么通过 “next” 连接遍历队列时碰到后继为空的,必须从原子地更新的 “tail” 结点向后遍历。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 尝试enq的快速路径;失败后回退到完整的enq。
Node pred = tail;
if (pred != null) {
// 把新结点的前驱指向pred,必须在下面的CAS完成之前设置,
// 这样确保一旦CAS成功后,从tail向后遍历是ok的。
node.prev = pred;
if (compareAndSetTail(pred, node)) { // 步骤 1
// 新节点成功进入队列
// 前驱结点的next字段指向新结点,建立完整的连接。
pred.next = node; // 步骤 2
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 队列是空,必须初始化。
if (compareAndSetHead(new Node())) // 原子地设置头结点
tail = head; // 尾结点也指向头结点
} else {
node.prev = t;
if (compareAndSetTail(t, node)) { // 步骤 1
t.next = node; // 步骤 2
// 在把新结点设置为tail后才能更新前驱的next字段,这样,即使CAS失败了也不会影响原来的连接关系。
return t;
}
}
}
}
acquire操作
acquire方法不提供绝对公平的保证,因为现在在加入队列之前先进行tryAcquire操作,如果这个线程先于头结点锁定,那么头结点就只能继续等待了。这种情形称为闯入。
这个acquire之所以先尝试获取是为了在无竞争的情况下提高性能,并可以实现非公平的获取。如果要保证绝对的公平性,则可以在子类实现的tryAcquire方法里判断当前线程是否是头结点,是则尝试获取,不是则直接返回false。
// 以独占模式获取
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 首先尝试获取
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 失败后加入等待队列,再从队列里再次尝试获取;成功获取后才返回,
// 返回的boolean表示线程是否曾经被中断。
// 在acquireQueued方法里,线程可能被反复park、unpark,直到获取锁。
selfInterrupt(); // 重新设置中断状态位,是否执行取决于acquireQueued的返回值
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false; // 线程是否曾被中断是由这个变量记录的。
for (;;) { // 死循环,用于acquire失败后重试
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {// 前驱是头结点,继续尝试获取
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 检测是否需要等待,如果需要,则park当前线程
// 只有前驱在等待时才进入等待,否则继续重试
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 线程进入等待,需要其他线程来唤醒这个线程以继续执行
interrupted = true; // 只要线程在等待过程中被中断过一次就会记录下来
}
} finally {
if (failed)
// acquire失败,取消acquire
cancelAcquire(node);
}
}
/*
* 这个方法是信号控制的核心。检查和更新没有成功获取的结点的状态。
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 前驱结点也在等待,说明这是一个稳定的等待状态。
return true ;
if (ws > 0) {
// 前驱结点已取消,向前遍历直到找到一个非取消结点。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 把找到的结点的后继指向node,那么当前pred与node之间的已取消结点就不再被引用了,可以被垃圾回收。
pred.next = node;
} else {
// 前驱的状态必是 0 或 PROPAGATE之一。表明需要一个信号,但不park先。
// 调用者需要重试来确保它在park之前没法获取。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
// park当前执行线程, 其他线程unpark这个线程后继续执行
LockSupport.park( this);
return Thread.interrupted();
}
结点取消acquire
当前结点CN取消acquire的处理逻辑是:首先获取到CN之前的最后一个非取消结点PN,if CN处于队列末尾 且 成功地把PN设为尾结点后,再把PN的后继置为空即可;如果前面的if语句失败说明CN后面还有结点SN,如果PN处于需要通知状态,那么需要把PN指向SN,否则唤醒CN的后继。
private void cancelAcquire(Node node) {
// 如果结点是null则忽略
if (node == null)
return;
node.thread = null;
// 跳过已取消的前驱
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext是需要移除的结点。如果不是,CAS将失败,这种情况下,
// 我们与其他的取消或信号竞争失败,不需要更多的操作。
// (也就是说,通过predNext来感知其他的取消或信号操作)
Node predNext = pred.next;
// 用无条件的写替代CAS操作,在这个原子操作后,其他的结点可以跳过这个结点。
// 在这之前,我们不受其他线程的干扰。
node.waitStatus = Node.CANCELLED;
if (node == tail && compareAndSetTail(node, pred)) {
// 如果处于链尾,直接移除,再修复前驱的连接关系
compareAndSetNext(pred, predNext, null);
// 如果失败了,说明有其他线程的取消/信号操作移除了predNext
} else {
// 说明有node有后继。用前驱的next指针指向它,这样它会得到(signal)。
// 否则唤醒它来传播。
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
// if条件成立,说明node有一个需要signal的前驱,让这个前驱指向node的后继,这样传播的链是完整的。
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next); // 同上
} else {
// node没有需要signal的前驱,通知后继结点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
唤醒后继
private void unparkSuccessor(Node node) {
/*
* 如果status是负的(比如,可能需要信号)尝试清除预期的信号。
* 如果这失败了或status被其他等待线程修改也是没关系的。
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 准备unpark的线程在后继里持有,一般就是下一个结点。
* 但如果被取消或是空,从tail向后遍历来找到实际的非取消后继。
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
// 没有直接后继或直接后继不需要通知
s = null;
// 从tail向后遍历,查找需要通知的结点
for (Node t = tail; t != null && t != node; t = t.prev)
// 找到一个后不跳出循环是为了找到最老的需要通知的结点。
if (t.waitStatus <= 0)
s = t;
}
if (s != null) // 结点不为null,唤醒后继的等待线程
LockSupport.unpark(s.thread);
}
释放独占的资源
此方法可以用来实现Lock.unlock()
方法。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true ;
}
return false;
}
acquireInterruptibly
acquire过程中检测中断则取消acquire。
中断并不是立即终止acquire过程,而是在线程恢复执行才能检测到中断才终止。线程总是会恢复执行,因为它的前驱会通知它。
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
// 这个方法与acquireQueued基本一样,只不过是检测到中断时则抛出异常。
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
// 先添加结点到等待队列,返回新结点
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
// 如果前驱是头结点,尝试获取。这是为了尽快获取。
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
带超时的获取:tryAcquireNanos
如果线程在指定的时间内加锁失败或在指定时间内被中断则终止加锁尝试。实现带超时的逻辑是:首先进行获取,成功则直接返回,失败则park指定的时间,到期后再次进行获取,
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) || // 首先尝试获取,是否公平取决于子类的tryAcquire实现
doAcquireNanos(arg, nanosTimeout);// 第一次尝试失败,进入常规流程
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
long lastTime = System.nanoTime();
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {// 处于对头,再次尝试获取
setHead(node);
p.next = null; // help GC
failed = false;
return true ;
}
if (nanosTimeout <= 0)// 超时了
return false ;
if (shouldParkAfterFailedAcquire(p, node) &&// 判断是否需要等待
nanosTimeout > spinForTimeoutThreshold) // 可用于acquire的剩余时间大于指定的阀值
// 这里之所以要大于一个给定的阀值才park线程,是因为park线程涉及上下文切换,
// 如果等待时间很短,那么上下文切换的代价会大于自旋的代价。
// 所以如果等待时间很短,线程进行自旋。
// if的两个条件都满足,进入等待。
LockSupport.parkNanos( this, nanosTimeout);
// 线程等待到期自动醒来,继续
long now = System.nanoTime();
nanosTimeout -= now - lastTime; // 计算新的剩余时间
lastTime = now;
if (Thread.interrupted())
throw new InterruptedException();
// 这里不立即判断剩余时间是否<=0是为了在线程醒来后再次获取
}
} finally {
if (failed)
cancelAcquire(node);
}
}
到这里,已完整展示了等待队列的管理(添加结点、移除取消结点)、独占模式下的acquire操作、acquire中断取消、前驱如何通知后继、如何在指定时间内acquire。
相关推荐
《JUC并发编程与源码分析视频课》是一门深入探讨Java并发编程的课程,主要聚焦于Java Util Concurrency(JUC)库的使用和源码解析。JUC是Java平台提供的一组高级并发工具包,它极大地简化了多线程编程,并提供了更...
ReentrantLock Lock 加锁过程源码分析图,AQS 源码分析
Java并发编程是现代Java开发中的重要组成部分,Java并发 utilities(JUC)库是Java平台标准版(Java SE)的一部分,提供了强大的工具和类库来帮助开发者编写高效的多线程和并发程序。本教程将深入探讨JUC源码,旨在...
集合源码分析 高并发与多线程 Stargazers over time 线程 线程的创建和启动 线程的sleep、yield、join 线程的状态 代码在 部分。 synchronized关键字(悲观锁) synchronized(Object) 不能用String常量、Integer、Long...
juc并发编程脑图以及相关示例代码
【尚硅谷】大厂必备技术之JUC并发编程视频 配套资料,自己根据视频整理 pdf 课件,和代码 视频地址:...
Java 并发库(Java Util Concurrency, JUC)是 Java 平台中用于处理并发问题的核心工具包,提供了丰富的类和接口来简化并发编程。 在计算机系统中,进程是操作系统资源分配的基本单位,它拥有独立的内存空间,而...
1、根据尚硅谷JUC并发编程(对标阿里P6-P7)视频自己整理的pdf文档 2、包含源码 视频地址:https://www.bilibili.com/video/BV1ar4y1x727/?p=1&vd_source=c634d163b940964d44747b4c3976117b 参考资料:...
AQS源码分析一、锁的介绍1.1 乐观锁/悲观锁1.2 共享锁/独占锁1.3 公平锁/非公平锁1.4 小结二、AQS框架结构介绍2.1 类图2.2 AQS数据结构三、源码详解3.1 acquire源码详解3.2 release源码详解四、从ReentranLock看公平...
JUC(Java Util Concurrent),即Java的并发工具包,是Java提供的一套并发编程解决方案,它通过一系列接口和类简化了并发编程的复杂性。本笔记整理涉及了JUC的内存可见性、volatile关键字以及CAS算法和原子变量等多...
尚硅谷_JUC线程高级_源码、课件 ·1. 尚硅谷_JUC线程高级_volatile 关键字与内存可见性 ·2. 尚硅谷_JUC线程高级_原子变量与 CAS 算法 ·3. 尚硅谷_JUC线程高级_模拟 CAS 算法 ·4. 尚硅谷_JUC线程高级_同步容器类...
在`juc_atguigu`这个压缩包中,包含了上述组件的示例代码,通过实际运行和分析这些代码,你可以深入理解JUC的工作原理和使用场景。同时,这些示例也能帮助你学习如何在实际项目中有效地利用JUC来提高并发程序的性能...
JUC提供了一系列的原子变量类,如AtomicInteger、AtomicLong等,它们实现了在无同步的情况下进行原子操作,确保在多线程环境下数据的一致性。这些类通过CAS(Compare and Swap)算法实现,能够在高并发场景下高效地...
JUC(java.util.concurrent)是Java提供的一个并发编程工具包...要掌握JUC并发编程及其底层原理,除了通过阅读官方文档和源码学习外,还需要大量实践和经验积累,才能够真正理解和应用JUC中的并发工具来解决实际问题。
【狂神说JUC代码】是一系列专注于Java并发编程(JUC,Java Util Concurrency)的教程或笔记,旨在帮助开发者深入理解并掌握Java平台上的并发处理机制。JUC是Java标准库中的一组高级并发工具类,为多线程环境下的程序...
尚硅谷作为一家知名的IT在线教育平台,提供了丰富的Java编程课程资源,其中“尚硅谷Java视频_JUC视频教程”是针对Java并发编程的一个系列教程。该教程旨在帮助学习者掌握Java多线程编程的核心技术和高级特性,通过一...
JUC(Java Util Concurrent)是Java中用于并发编程的工具包,它提供了一系列用于多线程编程的类和接口,以帮助开发者实现高效的多线程程序。在并发编程中,我们需要理解几个核心概念,比如线程和进程,以及它们之间...
根据提供的文件信息,“尚硅谷JUC百度云连接”这一标题和描述主要指向的是尚硅谷教育机构所提供的关于Java并发编程(Java Util Concurrency,简称JUC)的学习资源,并且通过一个百度网盘链接来分享这些资源。...
这个压缩包文件“个人学习JUC代码笔记总集”显然是一个个人的学习资源,记录了对JUC组件的理解和应用实例,特别适合已经有一定Java基础,想要深入学习并发编程的开发者。 JUC的主要目标是简化并发编程,提高多线程...
"JUC底层讲解,面试可用" JUC是Java并发编程的API,java.util.concurrent包名的简写。JUC相关的包有三个:java.util.concurrent、java.util.concurrent.atomic、java.util.concurrent.locks。 并发编程相关概念: ...