private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
//如果设置t.next = node;
//compareAndSetTail如果设置tail失败,则需要解除t的next关联,所以在compareAndSetTail设置成功后再设置t.next = node
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
//分三种情况:
//1.如果node为tail,将pred设为tail,pred.next设为空
//2.如果node不为tail,且pred为signal时,不用unpark后续节点,
// 只需要设置pred.next = node.next,丢失中间所有cancell的节点
//3.pred为head节点,unpark后续节点
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
//导致unparkSuccessor只能从tail往前找waitStatus <= 0的节点
//从head节点往后找会出现闭环
node.next = node; // help GC
}
//比jdk1.5的实现要更加精确
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
/*
1.如果前一个节点的等待状态waitStatus<0,也就是前面的节点还没有获
得到锁,那么返回true,表示当前节点(线程)就应该park()了。否则进行2。
2.如果前一个节点的等待状态waitStatus>0,也就是前一个节点
被CANCELLED了,那么就将前一个节点去掉,递归此操作直到所有前一个节
点的waitStatus<=0,进行4。否则进行3。
3.前一个节点等待状态waitStatus=0,修改前一个节点状态位为SINGAL,
表示后面有节点等待你处理,需要根据它的等待状态来决定是否该park()。
进行4。
4.返回false,表示线程不应该park()。
*/
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
long lastTime = System.nanoTime();
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
if (nanosTimeout <= 0)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
//如果nanosTimeout < spinForTimeoutThreshold
//自旋会比线程切换更有效率,否则才做park
LockSupport.parkNanos(this, nanosTimeout);
long now = System.nanoTime();
nanosTimeout -= now - lastTime;
lastTime = now;
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
//前任节点状态为cancelled,compareAndSetWaitStatus设置前任节点
//状态失败,表明前任节点此时已经cancelled,唤醒线程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
分享到:
相关推荐
**Java并发系列之AbstractQueuedSynchronizer源码分析概要** **1. AbstractQueuedSynchronizer(AQS)的定义与作用** AbstractQueuedSynchronizer(AQS)是Java并发编程中的核心组件,它是一个抽象的、基于FIFO...
Java大神Doug Lea对AQS的解析:Most synchronizers (locks, barriers, etc.) in the J2SE1.5 java.util.concurrent package are constructed using a small framework based on class AbstractQueuedSynchronizer. ...
《ThreadPoolExecutor源码解析》 ThreadPoolExecutor是Java并发编程中重要的组件,它是ExecutorService接口的实现,用于管理和调度线程的执行。理解其源码有助于我们更好地控制并发环境下的任务执行,提高系统的...
本文将详细解析AQS中的关键方法以及其工作原理。 #### 锁的类图与架构 AQS采用模板方法模式,大多数与锁相关的操作都在`AbstractQueuedSynchronizer`类中完成。它提供了一个同步器的框架,其中包含了共享资源的状态...
"Java并发编程解析 | 解析AQS基础同步器的设计与实现" 在Java领域中,解决并发编程问题的关键是解决同步和互斥的问题。同步是指线程之间的通信和协作,互斥是指同一时刻只能允许一个线程访问共享资源。Java领域中有...
从源码中可以看到,unlock()方法调用了AbstractQueuedSynchronizer的release()方法,并传入参数1,表示要释放一个锁的数量。release()方法的源码如下: ```java public final boolean release(int arg) { if ...
《ReentrantLock深度解析》 在Java并发编程中,ReentrantLock是JDK提供的一个可重入互斥锁,它是java.util.concurrent.locks包下的核心类。与synchronized关键字相比,ReentrantLock提供了更高的灵活性,如尝试加锁...
1. AQS 的理解:AQS(AbstractQueuedSynchronizer)是 Java 中的一种同步器框架,提供了一种能够实现锁的机制,用于协调多个线程之间的访问。AQS 的核心思想是使用一个 volatile 变量来记录锁的状态,并使用 CAS ...
【标题】: "21 更高级的锁—深入解析Lock.pdf" 【描述】: 这份资料详细介绍了Java并发编程中的高级锁机制,特别是ReentrantLock的使用与原理。 【标签】: java, 并发, 编程, 宝典 在Java并发编程中,synchronized...
Java源码解析之可重入锁ReentrantLock ReentrantLock是一个可重入锁,在ConcurrentHashMap中使用了ReentrantLock。它是一个可重入的排他锁,它和synchronized的方法和代码有着相同的行为和语义,但有更多的功能。 ...
compareAndSetState()方法是AQS(AbstractQueuedSynchronizer)类中的一个方法,用于比较并设置当前状态的值。该方法的实现可以分为以下两步: * 首先,通过unsafe.compareAndSwapInt()方法比较当前状态的值expect...
《Chinese_AQS:为中国程序员解析AbstractQueuedSynchronizer》 在Java并发编程领域,AbstractQueuedSynchronizer(AQS)是一个至关重要的组件,它为实现锁和其他同步构建块提供了基础。然而,由于其复杂性和原始...
本文将基于JDK源码解析Java领域中的并发锁,探讨AQS基础同步器、LockSupport、Condition接口、Lock接口、ReadWriteLock接口以及自定义API操作的设计与实现。 一、AQS(AbstractQueuedSynchronizer)基础同步器的...
Semaphore的核心实现基于Java并发库中的AbstractQueuedSynchronizer(AQS)类,它是一个抽象类,提供了线程同步的基本框架。Semaphore类有两个内部静态类——NonfairSync和FairSync,分别对应非公平锁和公平锁。在...
CountDownLatch内部使用了AQS(AbstractQueuedSynchronizer)来实现同步控制。AQS维护了一个整型状态字段,对于CountDownLatch,这个状态就是计数器的值。`countDown()`方法相当于原子性地对状态进行减1操作,而`...
基于JDK源码解析Java并发锁,我们需要关注以下几个关键知识点: 1. **AQS(AbstractQueuedSynchronizer)基础同步器**: AQS是一个用于构建锁和同步器的框架,它维护了一个FIFO的等待队列,提供了两种模式:独占和...
下面我们将详细解析CountDownLatch源码之await()方法的原理。 首先,我们来看await()方法的源码: public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } 可以看到,await...
AbstractQueuedSynchronizer(AQS)详解.mp4 使用AQS重写自己的锁.mp4 重入锁原理与演示.mp4 读写锁认识与原理.mp4 细读ReentrantReadWriteLock源码.mp4 ReentrantReadWriteLock锁降级详解.mp4 线程安全性问题简单总结...
本篇文章将深入解析ReentrantLock的源码,重点讨论非公平锁和公平锁的获取过程。 1. **ReentrantLock的基本概念** ReentrantLock是由Java提供的可重入互斥锁,支持公平锁和非公平锁两种模式。非公平锁的特性是获取...
ReentrantLock的核心实现基于AbstractQueuedSynchronizer(AQS),这是一个抽象的队列式同步器。AQS维护了一个FIFO的等待队列,用于存储等待锁的线程。每个节点表示一个线程,节点间通过状态字段进行同步。 1.1 AQS...