毫不为过的说,AbstractQueuedSynchronizer(以下简称AQS)是java.util.concurrent包下同步器类的灵魂组件,很多同步组件都是基于它实现的,比如CountDownLatch、CyclicBarrier、ReentrantLock、ReentrantReadWriteLock和ConcurrentHashMap等。
我们不要一头栽进AQS的代码里直接看,因为是DougLea大神写的,直接看会有点晕。我们来回顾一下我们用过的同步工具类,其中通用的核心操作有如下几种方式:
场景1:阻塞直到获取指定资源数
场景2:可中断限时等待直到获取指定资源数
场景3:直接尝试获取指定资源数
场景4:释放指定资源数
上述四个步骤又都可以分为共享(share)操作和独占(exclusive)操作两种,所以通用的就8种场景,如果AQS设计的足够好,则所有的容器类只需要控制资源数目、获取的资源量和释放的资源量即可,我们先从简单的特性说起。
我们先看下图(独占和共享的方法调用):
图中分为左右两个大矩形框,左边代表的独占访问的操作,右边代表着共享访问的操作,其中的小矩形框都代表的AQS里面的方法。其中,蓝色的小矩形框代表着AQS中默认已经实现了的方法,而红色小圆角矩形框代表着需要你自己去实现并覆盖的方法。箭头表示方法的调用次序。从方法的名称上,你就可以大概明白方法的作用,acquire用来表示是获取资源数的操作,而release表示用来释放资源数的操作,不带Shared表示是独占的操作。如果我们没有实现红色圆角矩形框的方法却间接调用了,将会抛出著名的UnsupportedOperationException异常。
在了解AQS之前,我们先来聊聊CLH(Craig, Landin, and Hagersten)锁,简单的说,它使用队列的方式来解决n个线程来争夺m把锁的问题,每当一个新的线程需要获取锁,为其创建一个节点并放到队尾,如果该线程是队列中的第一个节点,则节点的locked设置成false,如果它不是队列的第一个节点,则它的节点的prev指向原来的队尾节点,并不断自旋查看prev指向节点的locked属性,如果该值变为false,表示轮到它来尝试获取锁了,如果获取成功并最终用完释放后,则将自己的locked设置成false,如果获取失败,locked值不变,还是true,并不断尝试获取锁。MSC也是可扩展、高性能的自旋锁,它和CLH不同的是,它是对自己节点的locked属性进行自旋,这意味着prev节点释放锁后,需要去主动改变它的后继next节点的locked的状态。对比可以看出,CLH用的是隐式的队列,因为节点不需要关心它的prev节点是谁,关心的只是prev节点的locked属性,而MCS需要主动去通知next节点的locked属性,所以它的本质确实是队列。
从这里看,CLH和MCS线程争用锁似乎是公平的,先来的线程肯定会先获得锁,其实不然,比如,新来一个线程需要5把锁,但现在只有2把,于是该线程得等,但后面又来了一个只需要2把锁的线程,那有人立马抢答,因为是公平的,所以这个需要2把锁的线程也得等着,这就错了,其实如果有2把锁剩余,这个线程根本不会进入等待队列中,而是直接拿着走人了!但我们可以这样说,对于进入队列来等待锁的线程,是公平的,先来先得!
具体的CLH和MCS细节可以参看网文:http://www.bubuko.com/infodetail-638815.html 和http://coderbee.net/index.php/concurrent/20131115/577/comment-page-1
AQS参考了CLH锁的设计,但AQS没有采用CLH中的自旋来查看前驱(prev)节点的状态,因为在多核处理器时代,对volatile变量的自旋有可能是高代价的,AQS提供了诸如共享锁、独占锁的获取和释放的基本语义,既然AQS的名称中就有队列关键字Queued,可见队列是其使用的核心数据结构之一。接下来我们从源码角度看AQS是如何通过CLH锁的思想来实现上述语义的。队列有多种实现方式,比如我们维护一个head和tail的变量,用来指向队列的头节点和尾节点,再维护一个size的变量,用来记录队列的当前节点个数,当然还得维护一个表明队列最大容量的变量capacity。有了上述的“四件套”,则找一个存储队列节点的容器就行了,这个存储容器可以是数组(比如ArrayBlockingQueue)、链表(比如LinkedBlockingQueue)甚至是哈希表(比如FlumeEventQueue)。我们可以看到,在AQS中,队列节点是放在一个双向链表结构中的,我们立马看下AQS的静态内部类Node类有哪些主要属性:
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** SIGNAL,CANCELLED,CONDITION,PROPAGATE,0 */
volatile int waitStatus;
volatile Node prev;
volatile Node next;
/** Node上绑定的线程,由构造函数传入,用完后需要set null */
volatile Thread thread;
/** 因为条件只能是独占的,所以nextWaiter指向的是下一个等待该条件的Node。当然,在共享模式中,nextWaiter会被设置成一个特殊值:SHARED*/
Node nextWaiter;
/** 通过nextWaiter的值来判断该Node是处于独占模式还是共享模式 */
final boolean isShared() {
return nextWaiter == SHARED;
}
我们看Node的非静态属性,因为是双向链表,所以有prev和next,还有一个thread,用来记录该节点对应的线程,还有一个表示该节点状态的waitStatus,它有四种状态:
SIGNAL,值为-1,表示当前节点的next节点需要获取资源数,也就是需要unpark
CANCELLED,值为1,表示当前的线程被取消
CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中
PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够执行
0,新建的Node的状态都是0,表示初始状态
我们再来看看在AQS中资源数(或者锁)的个数是放在哪个变量中的,没错,就是state:
/**
* The synchronization state.
*/
private volatile int state;
注意,这里没有直接使用AtomicInteger,而是使用了保证可见性却不保证一致性的volatile关键字,这是为了减少对其他并发类AtomicInteger的依赖,所以在AQS中更新state字段和AtomicInteger中的语意一样,使用的都是unsafe.compareAndSwapInt。
我们先看看资源获取。先看看需要我们自己来实现的tryAcquire和tryAcquireShared的操作,tryAcquire通过返回一个boolean值来告诉调用者获取资源数是否成功,而tryAcquireShared则返回的是个int型,小于0表示获取失败,等于0表示获取成功但后续还有其他获取将失败,大于0也表示获取成功但后续的获取有可能成功。
我们先来看看上面提到的共享下的场景1:阻塞直到获取指定资源数。从上面的独占和共享的方法调用图中可以看出,对应的是acquireShared方法,看了代码就知道acquireSharedInterruptibly和acquireShared的区别就是acquireSharedInterruptibly在操作之前会先去检查当前线程的中断标记,如果已经被中断了,则直接抛出InterruptedException,所以acquireSharedInterruptibly就比acquireShared多了如下两行代码:
if (Thread.interrupted())
throw new InterruptedException();
所以,如果能看懂acquireShared,acquireSharedInterruptibly自然就不在话下。这里我们先来看下acquireShared的内部实现:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
前面已经说过,tryAcquireShared就是我们需要自己实现的方法之一,如果返回值小于0,表示获取资源数失败,从上面的代码可以看出,如果获取成功(返回值大于或等于0),该方法就直接返回了,如果获取失败(返回值小于0),则直接调用doAcquireShared方法。我们试想一下,如果当前线程调用了获取资源数的阻塞方法,如果当前没有足够的资源数,那它会做什么?对!它会一直等,因为acquireShared既不会接收中断信号,也没有超时时间,他能做的事情就是“等待机会”。我们来看看doAcquireShared做了哪些事情,AQS中有两个Node类型的属性,用来指向队列的头和尾:
private transient volatile Node head;
private transient volatile Node tail;
每个由于获取资源数而被阻塞的线程都会“化身”成一个Node节点而被添加到这head和tail之间的队列中,这也是Node类中有个Thread类型的成员属性thread的原因。为了便于理解,我们假设当前被阻塞的线程不止一个,也就是说该队列会有多个Node节点。就在这时,又一个线程threand-n(这里的n表示队列中已经被阻塞了n-1个线程了,也就是已经有个n-1个Node节点了)来通过调用acquireShared来以阻塞获取资源数,此时我们自定义实现的tryAcquireShared方法的返回的值小于0(因为现在真的没有可用的资源数了,其他线程还未释放),所以就进入到了doAcquireShared方法中。
doAcquireShared方法中会包含如下几个大体步骤:
步骤1:创建一个Node变量node-n,将node-n.thread指向线程thread-n,node-n.nextWaiter指向SHARED(上面讲Node类的属性时提到的SHARED),node-n.waitStatus值为0。
步骤2:不断进行CAS(unsafe.compareAndSwapObject)操作,直到成功将node-n添加到队尾(如果head=tail,则会先创建一个Node作为头节点,使head和tail同时指向该节点)即tail指向node-n。此时状态是原来的尾节点node-(n-1).next指向node-n,而node-n.prev指向node-(n-1),并且tail指向新的尾节点node-n。之所以要不断CAS,是因为有可能有多个线程在进行此操作。
步骤3:获取node-n的前一个节点,这里是node-(n-1)。如果node-(n-1)是头节点(即head指向的节点),则会将会再一次尝试获取资源数(调用tryAcquireShared),如果成功(tryAcquireShared返回值大于等于0),则进入步骤4;如果获取资源数失败或者node-(n-1)根本就不是头节点时,则进入步骤5。
步骤4:将node-n设置成新的头结点,并将node-n.waitStatus设置成0,node-n.thread设置成null。从队尾开始向队头遍历,找到一个最接近队头的并且waitStatus是SIGNAL,CONDITION,PROPAGATE之一的Node,将其解除阻塞状态(LockSupport.unpark(s.thread);),最后结束整个调用。
步骤5:如果node-(n-1).waitStatus为0(新建的Node该值都是0),则尝试将其设置成SIGNAL(值为-1,设置成SIGNAL就相当于当前节点的next节点需要获取资源数,注意,这里也是通过一次CAS操作去设值,是否设值成功并不重要,重要的是至少有一个线程会将其设置成功);如果node-(n-1).waitStatus为SIGNAL,则阻塞自己(LockSupport.park(this)),因为你的上游Node还没获取到资源数呢,所以你得等,如果走到这一步,就真的只能等别的线程来救自己了。
步骤6:不断重复步骤3-5,直到在步骤5中阻塞自己。
为了更好的做对比,我们看看对应的共享访问的资源数的释放操作releaseShared的实现:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
如我们所料,当我们的自定义方法tryReleaseShared返回true时,表明我们释放的资源数已经成功加回到AQS的state了。此时就会直接调用内部操作doReleaseShared。
doReleaseShared方法中会包含如下几个大体步骤:
步骤1:通过head取得头节点node-0,如果node-0.waitStatus为SIGNAL,则使用CAS尝试将node-0.waitStatus设置成0,如果也成功,则进入步骤2;如果node-0.waitStatus为0,则通过CAS尝试将node-0.waitStatus设置成PROPAGATE,如果设置成功则退出,否则再进行步骤1。
步骤2:获取node-0的下一个节点node-1,如果node-1.waitStatus不是CANCELLED和0,则直接解放node-1的线程(LockSupport.unpark(s.thread))。否则则从尾节点开始向节点node-1遍历,找到最靠近节点node-1的且waitStatus不是CANCELLED的节点node-x,如果存在node-x,则解放node-x的线程。
从上述获取资源数和释放资源数的步骤看出,1. 能直接获取到资源数的线程是不会进入队列中的;2. 头节点节点要么是初始化时新建的空Node,要么是上一个已经获取到资源数后线程离开后的Node;3. node-n“关心”的只是节点node-(n-1)的状态;4. 如果node-x相比node-y更靠近head,且这两个node状态都是SIGNAL且没有超时时间,那么node-x一定比node-y先得到资源数。
关于这个第4点,有人会想,如果node-x排在node-y前面,但node-x需要4个资源数,而node-y只需要2个,那么如果这时有个线程释放了2个资源数,那么此时node-y是不能直接获取这2个资源数,这也许不是我们想要的(大家可以用Semaphore来试试)!不过这也许也是正确的,不然将有可能导致node-x之类的节点被活活“饿死”。
我们先通过简单一个例子来回顾上述的几个步骤,为了简单起见,我们借助AQS定义一个简单的同步工具类MzzConcurrentTool ,如下:
/**
* 自定义并发工具
* 类似于简单的Semaphore
* Created by yizhenqiang on 2016/6/14.
*/
public class MzzConcurrentTool {
private MzzSync mzzSync;
public MzzConcurrentTool(int lockNum) {
mzzSync = new MzzSync(lockNum);
}
/**
* 获取当前还剩余的锁
* @return
*/
public int getLockNum() {
return mzzSync.getLockNum();
}
public void acquireLock(int num) {
mzzSync.acquireShared(num);
}
public void release(int num) {
mzzSync.releaseShared(num);
}
private static class MzzSync extends AbstractQueuedSynchronizer {
public MzzSync(int lockNum) {
setState(lockNum);
}
public int getLockNum() {
return getState();
}
@Override
protected final int tryAcquireShared(int lockNum) {
for (;;) {
int available = getState();
int remaining = available - lockNum;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
protected final boolean tryReleaseShared(int lockNum) {
for (;;) {
int current = getState();
int next = current + lockNum;
if (compareAndSetState(current, next))
return true;
}
}
}
}
上面已经说过,如果简单实现的话,我们只需要实现上图红色圆角矩形框中的方法,在这里,就是tryAcquireShared和tryReleaseShared方法,MzzConcurrentTool就是个最简单的信号量工具。
假设一把锁对应一个资源,那么假设当前我们有10把锁:
final static MzzConcurrentTool tool = new MzzConcurrentTool(10);
我们通过如下步骤来逐步观察AQS中队列的情况:
步骤1:thread-a来获取8把锁,因为有10把,所以它成功了
步骤2:thread-b来获取5把锁,因为只剩2把,所以它只能等了
步骤3:thread-c来获取3把锁,因为只剩2把,所以它只能等了
步骤4:thread-d来获取2把锁,即使现在有2把可用的锁,但队列中thread-b和thread-c排在了它的前面,所以thread-d也只能等
步骤5:thread-a此时释放了5把锁,于是thread-b成功获取了这5把锁,thread-b的Node成为了新的头节点
步骤6:thread-a和thread-b此时释放了所有的锁,于是thread-c和thread-d都成功获取了锁,thread-b的Node成为了新的头节点
到目前为止,我们简单的介绍了acquireShared和releaseShared操作中的基本行为,前面也说过,acquireInterruptibly和acquireShared功能类似,只是操作之前进行了线程中断判断,现在我们看看tryAcquireSharedNanos是怎么做到超时返回的,其代码如下:
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
其中tryAcquireShared还是我们需要自己实现的代码,如果tryAcquireShared返回小于0,表示资源数不够,我们就将进入限时等待的doAcquireSharedNanos操作之中,我们看看doAcquireSharedNanos中的内部实现,发现和doAcquireShared内部实现类似,doAcquireSharedNanos只是在doAcquireShared的循环操作的尾部添加了线程阻塞等相关操作,如下:
// 确认完prev节点的waitStatus为SIGNAL后且超时时间大于1秒时,进入等待状态
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
long now = System.nanoTime();
// 计算下一轮循环剩余的可等待时间
nanosTimeout -= now - lastTime;
lastTime = now;
可见,上面doAcquireSharedNanos的if语句的条件判断部分没有像doAcquireShared的if条件判断语句的parkAndCheckInterrupt()操作,因为parkAndCheckInterrupt()会一直阻塞当前线程。
其实,整体上看,其实AQS的核心操作就是获取资源失败时进入队列告知prev节点我需要获取资源并阻塞(park)自己并、获取资源数成功时唤醒next节点并离开队列(成为新的头节点)、被唤醒后尝试获取资源数,如果失败则继续等待、释放资源数时唤醒next节点线程。
至于独占(EXCLUSIVE)模式,意味着一次只能有一个线程获取到锁,我们可以从独占模式的tryAcquire签名来看出它与共享模式的区别:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
而共享模式的tryAcquireShared签名如下:
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
我们可以看出,tryAcquire返回的是布尔型来表明是成功还是失败,而tryAcquireShared通过返回一个整数值来表明是成功还是失败,就是说tryAcquire关系的不是资源数量,它关系的是“有”还是“没有”的问题。对于独占模式,我们标记出锁的拥有者线程,凡是不是该线程来获取锁(因为独占模式的拥有者线程可以重复获取某把锁),都直接拒绝,AQS继承与抽象类AbstractOwnableSynchronizer,AbstractOwnableSynchronizer中添加了独占线程属性和相关的操作,如下:
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread t) {
exclusiveOwnerThread = t;
}
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
表面上看,独占模式似乎就是资源数为1的共享模式,实际上独占模式指资源只能同时被一个线程锁持有,即使还有可用的资源,其他线程也只能排队,而不能直接获取资源,只有等资源拥有者线程释放所有的资源,其他线程才能去争夺资源拥有者的位置。
相关推荐
COM技术初探.doc
天干地支在择时中的应用初探 天干地支是中国古代的计时系统,用于记录和预测时间周期。天干地支系统由十个天干(甲乙丙丁戊己庚辛壬癸)和十二个地支(子丑寅卯辰巳午未申酉戌亥)组成。天干地支的阴阳属性奇数为阳...
地产行业杂谈系列之十六:社区增值服务模式初探
网络公关初探资料.pdf
企业信息化初探。。。。。
关于物联网智能家居的初探. 关于物联网智能家居的初探
元宇宙初探React+Three.js制作3D全景漫游.zip元宇宙初探React+Three.js制作3D全景漫游.zip元宇宙初探React+Three.js制作3D全景漫游.zip元宇宙初探React+Three.js制作3D全景漫游.zip元宇宙初探React+Three.js制作3D...
多因子模型体系初探 多因子模型是风险-收益关系的定量表达,因子是不同类型风险的解释变量。多因子模型是由 APT 理论发展而来,其一般表达式为:̃ = ∑ ∗ ̃ + ̃=1。多因子模型本质是将对只股票的收益-风险预测...
基于SNS互动的就业指导网站网络营销运营初探.docx基于SNS互动的就业指导网站网络营销运营初探.docx基于SNS互动的就业指导网站网络营销运营初探.docx基于SNS互动的就业指导网站网络营销运营初探.docx基于SNS互动的...
SpringCloud服务拆分初探与案例解析 cloud-demo第一次 SpringCloud服务拆分初探与案例解析 cloud-demo第一次 SpringCloud服务拆分初探与案例解析 cloud-demo第一次 SpringCloud服务拆分初探与案例解析 cloud-demo第...
基于云平台的多系统安全等保合规方案初探.pdf基于云平台的多系统安全等保合规方案初探.pdf基于云平台的多系统安全等保合规方案初探.pdf基于云平台的多系统安全等保合规方案初探.pdf基于云平台的多系统安全等保合规...
基于云平台的多系统安全等保合规方案初探.docx基于云平台的多系统安全等保合规方案初探.docx基于云平台的多系统安全等保合规方案初探.docx基于云平台的多系统安全等保合规方案初探.docx基于云平台的多系统安全等保...
在计算机软件开发中,Java编程语言的应用是极为广泛与深远的。Java语言自问世以来就以其独特的特性和优势吸引了大量开发者的注意,这些特性包括但不限于其平台独立性、内存管理优化、面向对象的编程设计以及其强大的...
在这一系列报告的首篇中,华泰证券通过对多因子模型的初探,为后续的深入分析奠定了基础。 首先,报告指出主动定量管理的本质是统计套利,其中的“因子”是研究的核心,代表了不同类型的共性风险。与定性管理相比,...
### 初探uCOS-II:嵌入式操作系统的基础与应用 #### 一、uCOS-II简介 uCOS-II,全称MicroC/OS-II,是一款专门为嵌入式系统设计的操作系统内核。它由Jean J. Labrosse创建,并在开源社区中得到了广泛的认可和支持。...
六爻预测彩票初探.pdf
区块链与人工智能技术融合发展初探 本文研究了区块链和人工智能技术的融合发展,分析了两者技术特点,探究了如何将区块链和人工智能技术结合起来,为学界和业界提供了新的思路。 一、区块链技术概述 区块链技术是...
15、培训学校成本管理初探.doc
电商英语翻译任务型教学模式初探 随着经济全球化进程的加快,电子商务在外贸领域的作用日益重要,对外贸人才的需求不断增长。电商英语翻译作为电商行业的重要技能之一,其教学质量直接关系到电商人才的专业能力。...