`
m635674608
  • 浏览: 5053223 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

AbstractQueuedSynchronizer初探

    博客分类:
  • java
 
阅读更多

毫不为过的说,AbstractQueuedSynchronizer(以下简称AQS)java.util.concurrent包下同步器类的灵魂组件,很多同步组件都是基于它实现的,比如CountDownLatchCyclicBarrierReentrantLockReentrantReadWriteLockConcurrentHashMap等。

我们不要一头栽进AQS的代码里直接看,因为是DougLea大神写的,直接看会有点晕。我们来回顾一下我们用过的同步工具类,其中通用的核心操作有如下几种方式:

场景1:阻塞直到获取指定资源数

场景2:可中断限时等待直到获取指定资源数

场景3:直接尝试获取指定资源数

场景4:释放指定资源数

上述四个步骤又都可以分为共享share)操作和独占exclusive)操作两种,所以通用的就8种场景,如果AQS设计的足够好,则所有的容器类只需要控制资源数目、获取的资源量和释放的资源量即可,我们先从简单的特性说起。

我们先看下图(独占和共享的方法调用):


图中分为左右两个大矩形框,左边代表的独占访问的操作,右边代表着共享访问的操作,其中的小矩形框都代表的AQS里面的方法。其中,蓝色的小矩形框代表着AQS中默认已经实现了的方法,而红色小圆角矩形框代表着需要你自己去实现并覆盖的方法。箭头表示方法的调用次序。从方法的名称上,你就可以大概明白方法的作用,acquire用来表示是获取资源数的操作,而release表示用来释放资源数的操作,不带Shared表示是独占的操作。如果我们没有实现红色圆角矩形框的方法却间接调用了,将会抛出著名的UnsupportedOperationException异常。

 

在了解AQS之前,我们先来聊聊CLH(Craig, Landin, and Hagersten),简单的说,它使用队列的方式来解决n个线程来争夺m把锁的问题,每当一个新的线程需要获取锁,为其创建一个节点并放到队尾,如果该线程是队列中的第一个节点,则节点的locked设置成false,如果它不是队列的第一个节点,则它的节点的prev指向原来的队尾节点,并不断轮询查看prev指向节点的locked属性,如果该值变为false,表示轮到它来尝试获取锁了,如果获取成功,则将自己的locked设置成false,如果获取失败,locked值不变,还是true,并不断尝试获取锁。从这里看,CLH线程争用锁似乎是公平的,先来的线程肯定会先获得锁,其实不然,比如,新来一个线程需要5把锁,但现在只有2把,于是该线程得等,但后面又来了一个只需要2把锁的线程,那有人立马抢答,因为是公平的,所以这个需要2把锁的线程也得等着,这就错了,其实如果有2把锁剩余,这个线程根本不会进入等待队列中,而是直接拿着走人了!但我们可以这样说,对于进入队列来等待锁的线程,是公平的,先来先得!具体的CLH细节可以参看网文:http://www.bubuko.com/infodetail-638815.html

 

AQS提供了诸如共享锁、独占锁的获取和释放的基本语义,既然AQS的名称中就有队列关键字Queued,可见队列是其使用的核心数据结构之一。接下来我们从源码角度看AQS是如何通过CLH锁的思想来实现上述语义的。队列有多种实现方式,比如我们维护一个headtail的变量,用来指向队列的头节点和尾节点,再维护一个size的变量,用来记录队列的当前节点个数,当然还得维护一个表明队列最大容量的变量capacity。有了上述的“四件套”,则找一个存储队列节点的容器就行了,这个存储容器可以是数组(比如ArrayBlockingQueue)、链表(比如LinkedBlockingQueue)甚至是哈希表(比如FlumeEventQueue)。我们可以看到,在AQS中,队列节点是放在一个双向链表结构中的,我们立马看下AQS的静态内部类Node类有哪些主要属性:

/** Marker to indicate a node is waiting in shared mode */

static final Node SHARED = new Node();

/** Marker to indicate a node is waiting in exclusive mode */

static final Node EXCLUSIVE = null;

/** SIGNAL,CANCELLED,CONDITION,PROPAGATE,0 */

volatile int waitStatus;

volatile Node prev;

volatile Node next;

/** Node上绑定的线程,由构造函数传入,用完后需要set null */

volatile Thread thread;

/** 因为条件只能是独占的,所以nextWaiter指向的是下一个等待该条件的Node。当然,在共享模式中,nextWaiter会被设置成一个特殊值:SHARED*/

Node nextWaiter;

/** 通过nextWaiter的值来判断该Node是处于独占模式还是共享模式 */

final boolean isShared() {

    return nextWaiter == SHARED;

}

我们看Node的非静态属性,因为是双向链表,所以有prevnext,还有一个thread,用来记录该节点对应的线程,还有一个表示该节点状态的waitStatus,它有四种状态:

SIGNAL,值为-1,表示当前节点的next节点需要获取资源数,也就是需要unpark

CANCELLED,值为1,表示当前的线程被取消

CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中

PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够执行

0,新建的Node的状态都是0,表示初始状态

 

 

我们再来看看在AQS中资源数(或者锁)的个数是放在哪个变量中的,没错,就是state

/**

 * The synchronization state.

 */

private volatile int state;

注意,这里没有直接使用AtomicInteger,而是使用了保证可见性却不保证一致性的volatile关键字,这是为了减少对AtomicInteger,所以在AQS中更新state字段和AtomicInteger中的语意一样,使用的都是unsafe.compareAndSwapInt

 

 

我们先看看资源获取。先看看需要我们自己来实现的tryAcquiretryAcquireShared的操作,tryAcquire通过返回一个boolean值来告诉调用者获取资源数是否成功,而tryAcquireShared则返回的是个int型,小于0表示获取失败,等于0表示获取成功但后续还有其他获取将失败,大于0也表示获取成功但后续的获取有可能成功。

 

我们先来看看上面提到的共享下的场景1:阻塞直到获取指定资源数。从上面的独占和共享的方法调用图中可以看出,对应的是acquireShared方法,看了代码就知道acquireSharedInterruptiblyacquireShared的区别就是acquireSharedInterruptibly在操作之前会先去检查当前线程的中断标记,如果已经被中断了,则直接抛出InterruptedException,所以acquireSharedInterruptibly就比acquireShared多了如下两行代码:

if (Thread.interrupted())

    throw new InterruptedException();

所以,如果能看懂acquireSharedacquireSharedInterruptibly自然就不在话下。这里我们先来看下acquireShared的内部实现:

public final void acquireShared(int arg) {

    if (tryAcquireShared(arg) < 0)

        doAcquireShared(arg);

}

前面已经说过,tryAcquireShared就是我们需要自己实现的方法之一,如果返回值小于0,表示获取资源数失败,从上面的代码可以看出,如果获取成功(返回值大于或等于0),该方法就直接返回了,如果获取失败(返回值小于0),则直接调用doAcquireShared方法。我们试想一下,如果当前线程调用了获取资源数的阻塞方法,如果当前没有足够的资源数,那它会做什么?对!它会一直等,因为acquireShared既不会接收中断信号,也没有超时时间,他能做的事情就是“等待机会”。我们来看看doAcquireShared做了哪些事情,AQS中有两个Node类型的属性,用来指向队列的头和尾:

private transient volatile Node head;

private transient volatile Node tail;

每个由于获取资源数而被阻塞的线程都会“化身”成一个Node节点而被添加到这headtail之间的队列中,这也是Node类中有个Thread类型的成员属性thread的原因。为了便于理解,我们假设当前被阻塞的线程不止一个,也就是说该队列会有多个Node节点。就在这时,又一个线程threand-n(这里的n表示队列中已经被阻塞了n-1个线程了,也就是已经有个n-1Node节点了)来通过调用acquireShared来以阻塞获取资源数,此时我们自定义实现的tryAcquireShared方法的返回的值小于0(因为现在真的没有可用的资源数了,其他线程还未释放),所以就进入到了doAcquireShared方法中。

 

doAcquireShared方法中会包含如下几个大体步骤

步骤1:创建一个Node变量node-n,将node-n.thread指向线程thread-nnode-n.nextWaiter指向SHARED(上面讲Node类的属性时提到的SHARED),node-n.waitStatus值为0

步骤2:不断进行CASunsafe.compareAndSwapObject)操作,直到成功将node-n添加到队尾(如果head=tail,则会先创建一个Node作为头节点,使headtail同时指向该节点)即tail指向node-n。此时状态是原来的尾节点node-(n-1).next指向node-n,而node-n.prev指向node-(n-1),并且tail指向新的尾节点node-n。之所以要不断CAS,是因为有可能有多个线程在进行此操作。

步骤3:获取node-n的前一个节点,这里是node-(n-1)。如果node-(n-1)是头节点(即head指向的节点),则会将会再一次尝试获取资源数(调用tryAcquireShared),如果成功(tryAcquireShared返回值大于等于0),则进入步骤4;如果获取资源数失败或者node-(n-1)根本就不是头节点时,则进入步骤5

步骤4:将node-n设置成新的头结点,并将node-n.waitStatus设置成0node-n.thread设置成null。从队尾开始向队头遍历,找到一个最接近队头的并且waitStatusSIGNAL,CONDITION,PROPAGATE之一的Node,将其解除阻塞状态(LockSupport.unpark(s.thread);),最后结束整个调用。

步骤5:如果node-(n-1).waitStatus0(新建的Node该值都是0),则尝试将其设置成SIGNAL(值为-1,设置成SIGNAL就相当于当前节点的next节点需要获取资源数,注意,这里也是通过一次CAS操作去设值,是否设值成功并不重要,重要的是至少有一个线程会将其设置成功);如果node-(n-1).waitStatusSIGNAL,则阻塞自己(LockSupport.park(this)),因为你的上游Node还没获取到资源数呢,所以你得等,如果走到这一步,就真的只能等别的线程来救自己了。

步骤6:不断重复步骤3-5,直到在步骤5中阻塞自己。

 

为了更好的做对比,我们看看对应的共享访问的资源数的释放操作releaseShared的实现:

public final boolean releaseShared(int arg) {

    if (tryReleaseShared(arg)) {

        doReleaseShared();

        return true;

    }

    return false;

}

如我们所料,当我们的自定义方法tryReleaseShared返回true时,表明我们释放的资源数已经成功加回到AQSstate了。此时就会直接调用内部操作doReleaseShared

 

doReleaseShared方法中会包含如下几个大体步骤

步骤1:通过head取得头节点node-0,如果node-0.waitStatusSIGNAL,则使用CAS尝试将node-0.waitStatus设置成0,如果也成功,则进入步骤2;如果node-0.waitStatus0,则通过CAS尝试将node-0.waitStatus设置成PROPAGATE,如果设置成功则退出,否则再进行步骤1

步骤2:获取node-0的下一个节点node-1,如果node-1.waitStatus不是CANCELLED0,则直接解放node-1的线程(LockSupport.unpark(s.thread))。否则则从尾节点开始向节点node-1遍历,找到最靠近节点node-1的且waitStatus不是CANCELLED的节点node-x,如果存在node-x,则解放node-x的线程。

 

从上述获取资源数和释放资源数的步骤看出,1. 能直接获取到资源数的线程是不会进入队列中的;2. 头节点节点要么是初始化时新建的空Node,要么是上一个已经获取到资源数后线程离开后的Node3. node-n“关心”的只是节点node-(n-1)的状态;4. 如果node-x相比node-y更靠近head,且这两个node状态都是SIGNAL且没有超时时间,那么node-x一定比node-y先得到资源数。

 

关于这个第4点,有人会想,如果node-x排在node-y前面,但node-x需要4个资源数,而node-y只需要2个,那么如果这时有个线程释放了2个资源数,那么此时node-y是不能直接获取这2个资源数,这也许不是我们想要的(大家可以用Semaphore来试试)!不过这也许也是正确的,不然将有可能导致node-x之类的节点被活活“饿死”。

 

我们先通过简单一个例子来回顾上述的几个步骤,为了简单起见,我们借助AQS定义一个简单的同步工具类MzzConcurrentTool ,如下:

/**

 * 自定义并发工具

 * 类似于简单的Semaphore

 * Created by yizhenqiang on 2016/6/14.

 */

public class MzzConcurrentTool {

 

    private MzzSync mzzSync;

 

    public MzzConcurrentTool(int lockNum) {

        mzzSync = new MzzSync(lockNum);

    }

 

    /**

     * 获取当前还剩余的锁

     * @return

     */

    public int getLockNum() {

        return mzzSync.getLockNum();

    }

 

    public void acquireLock(int num) {

        mzzSync.acquireShared(num);

    }

 

    public void release(int num) {

        mzzSync.releaseShared(num);

    }

 

    private static class MzzSync extends AbstractQueuedSynchronizer {

 

        public MzzSync(int lockNum) {

            setState(lockNum);

        }

 

        public int getLockNum() {

            return getState();

        }

 

        @Override

        protected final int tryAcquireShared(int lockNum) {

            for (;;) {

                int available = getState();

                int remaining = available - lockNum;

                if (remaining < 0 ||

                        compareAndSetState(available, remaining))

                    return remaining;

            }

        }

 

        protected final boolean tryReleaseShared(int lockNum) {

            for (;;) {

                int current = getState();

                int next = current + lockNum;

 

                if (compareAndSetState(current, next))

                    return true;

            }

        }

    }

}

 

上面已经说过,如果简单实现的话,我们只需要实现上图红色圆角矩形框中的方法,在这里,就是tryAcquireSharedtryReleaseShared方法,MzzConcurrentTool就是个最简单的信号量工具。

 

假设一把锁对应一个资源,那么假设当前我们有10把锁:

final static MzzConcurrentTool tool = new MzzConcurrentTool(10);

我们通过如下步骤来逐步观察AQS中队列的情况:

 

步骤1thread-a来获取8把锁,因为有10把,所以它成功了


 

步骤2thread-b来获取5把锁,因为只剩2把,所以它只能等了


 

步骤3thread-c来获取3把锁,因为只剩2把,所以它只能等了


 

步骤4thread-d来获取2把锁,即使现在有2把可用的锁,但队列中thread-bthread-c排在了它的前面,所以thread-d也只能等


 

步骤5thread-a此时释放了5把锁,于是thread-b成功获取了这5把锁,thread-bNode成为了新的头节点


 

步骤6thread-athread-b此时释放了所有的锁,于是thread-cthread-d都成功获取了锁,thread-bNode成为了新的头节点


 

 

到目前为止,我们简单的介绍了acquireSharedreleaseShared操作中的基本行为,前面也说过,acquireInterruptiblyacquireShared功能类似,只是操作之前进行了线程中断判断,现在我们看看tryAcquireSharedNanos是怎么做到超时返回的,其代码如下:

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)

        throws InterruptedException {

    if (Thread.interrupted())

        throw new InterruptedException();

    return tryAcquireShared(arg) >= 0 ||

        doAcquireSharedNanos(arg, nanosTimeout);

}

 其中tryAcquireShared还是我们需要自己实现的代码,如果tryAcquireShared返回小于0,表示资源数不够,我们就将进入限时等待的doAcquireSharedNanos操作之中,我们看看doAcquireSharedNanos中的内部实现,发现和doAcquireShared内部实现类似,只是在确认完prev节点的waitStatusSIGNAL后且超时时间大于1秒时,进入等待状态:

if (shouldParkAfterFailedAcquire(p, node) &&

    nanosTimeout > spinForTimeoutThreshold)

    LockSupport.parkNanos(this, nanosTimeout);

 

其实,整体上看,其实AQS的核心操作就是获取资源失败时进入队列告知prev节点我需要获取资源并阻塞(park)自己并、获取资源数成功时唤醒next节点并离开队列(成为新的头节点)、被唤醒后尝试获取资源数,如果失败则继续等待、释放资源数时唤醒next节点线程。

 

 

至于独占(EXCLUSIVE)模式,意味着一次只能有一个线程获取到锁,可以想象成资源数为1的共享模式,锁的争夺策略和共享模式是类似的,这里就不过多介绍了。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics