`
wode66
  • 浏览: 742666 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

java并发编程--AbstractQueuedSynchronizer加锁和解锁分析(二)

阅读更多

 

在java.util.concurrent.locks包中有很多Lock的实现类,常用的有ReentrantLock、ReadWriteLock(实现类ReentrantReadWriteLock),其实现都依赖java.util.concurrent.AbstractQueuedSynchronizer类,实现思路都大同小异,因此我们以ReentrantLock作为讲解切入点。

1. ReentrantLock的调用过程

经过观察ReentrantLock把所有Lock接口的操作都委派到一个Sync类上,该类继承了AbstractQueuedSynchronizer:

 

static abstract class Sync extends AbstractQueuedSynchronizer 
 

 

Sync又有两个子类:

final static class NonfairSync extends Sync
final static class FairSync extends Sync
 

显然是为了支持公平锁和非公平锁而定义,默认情况下为非公平锁。

先理一下Reentrant.lock()方法的调用过程(默认非公平锁):


这些讨厌的Template模式导致很难直观的看到整个调用过程,其实通过上面调用过程及AbstractQueuedSynchronizer的注释可以发现,AbstractQueuedSynchronizer中抽象了绝大多数Lock的功能,而只把tryAcquire方法延迟到子类中实现。tryAcquire方法的语义在于用具体子类判断请求线程是否可以获得锁,无论成功与否AbstractQueuedSynchronizer都将处理后面的流程。

2. 锁实现(加锁)

简单说来,AbstractQueuedSynchronizer会把所有的请求线程构成一个CLH队列,当一个线程执行完毕(lock.unlock())时会激活自己的后继节点,但正在执行的线程并不在队列中,而那些等待执行的线程全部处于阻塞状态,经过调查线程的显式阻塞是通过调用LockSupport.park()完成,而LockSupport.park()则调用sun.misc.Unsafe.park()本地方法,再进一步,HotSpot在Linux中中通过调用pthread_mutex_lock函数把线程交给系统内核进行阻塞。

该队列如图:


与synchronized相同的是,这也是一个虚拟队列,不存在队列实例,仅存在节点之间的前后关系。令人疑惑的是为什么采用CLH队列呢?原生的CLH队列是用于自旋锁,但Doug Lea把其改造为阻塞锁。

当有线程竞争锁时,该线程会首先尝试获得锁,这对于那些已经在队列中排队的线程来说显得不公平,这也是非公平锁的由来,与synchronized实现类似,这样会极大提高吞吐量。

如果已经存在Running线程,则新的竞争线程会被追加到队尾,具体是采用基于CAS的Lock-Free算法,因为线程并发对Tail调用CAS可能会导致其他线程CAS失败,解决办法是循环CAS直至成功。AbstractQueuedSynchronizer的实现非常精巧,令人叹为观止,不入细节难以完全领会其精髓,下面详细说明实现过程:

2.1 NonFairSync.lock

 

/**
 * Performs lock.  Try immediate barge, backing up to normal
 * acquire on failure.
 */
final void lock() {
    // 如果锁没有被任何线程锁定且加锁成功则设定当前线程为锁的拥有者
    // 如果锁已被当前线程锁定,则在acquire中将状态加1并返回
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        // 加锁失败,再次尝试加锁,失败则加入等待队列,禁用当前线程,直到被中断或有线程释放锁时被唤醒
        acquire(1);
}
 

 

  

2.2 Sync.nonfairTryAcquire

nonfairTryAcquire方法将是lock方法间接调用的第一个方法,每次请求锁时都会首先调用该方法。

/**
         * Performs non-fair tryLock.  tryAcquire is
         * implemented in subclasses, but both need nonfair
         * try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // 如果锁空闲则尝试锁定,成功则设当前线程为锁拥有者
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 若当前线程为锁拥有者则直接修改锁状态计数
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            // 尝试获取失败,返回
            return false;
        }

该方法会首先判断当前状态,如果c==0说明没有线程正在竞争该锁,如果不c !=0 说明有线程正拥有了该锁。

1.如果发现c==0,则通过CAS设置该状态值为acquires,acquires的初始调用值为1,如果CAS设置成功,则可以预计其他任何线程调用CAS都不会再成功,也就认为当前线程得到了该锁,也作为Running线程,很显然这个Running线程并未进入等待队列。

2.如果c !=0 但发现自己已经拥有锁,只是简单地重新计算 status + acquires,并修改status值,但因为没有竞争,所以通过setStatus修改,而非CAS,也就是说这段代码实现了偏向锁的功能,并且实现的非常漂亮。每次线程重入该锁都会+1,每次unlock都会-1,但为0时释放锁。

2.3 AbstractQueuedSynchronizer.addWaiter

addWaiter方法负责把当前无法获得锁的线程包装为一个Node添加到队尾:

/**
     * Creates and enqueues node for given thread and mode.
     *
     * @param current the thread
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

  其中参数mode是独占锁还是共享锁,默认为null,独占锁。追加到队尾的动作分两步:

  1. 如果当前队尾已经存在(tail!=null),则使用CAS把当前线程更新为Tail
  2. 如果当前Tail为null或则线程调用CAS设置队尾失败,则通过enq方法继续设置Tail

下面是enq方法:

 

/**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                Node h = new Node(); // Dummy header
                h.next = node;
                node.prev = h;
                if (compareAndSetHead(h)) {
                    tail = node;
                    return h;
                }
            }
            else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
 

 

该方法就是循环调用CAS,即使有高并发的场景,无限循环将会最终成功把当前线程追加到队尾(或设置队头)。总而言之,addWaiter的目的就是通过CAS把当前线程追加到队尾,并返回包装后的Node实例。

把线程要包装为Node对象的主要原因,除了用Node构造供虚拟队列外,还用Node包装了各种线程状态,这些状态被精心设计为一些数字值:

  • SIGNAL(-1) :线程的后继线程正/已被阻塞,当该线程release或cancel时要重新这个后继线程(unpark)
  • CANCELLED(1):因为超时或中断,该线程已经被取消
  • CONDITION(-2):表明该线程被处于条件队列,就是因为调用了Condition.await而被阻塞
  • PROPAGATE(-3):传播共享锁
  • 0:0代表无状态

2.3 AbstractQueuedSynchronizer.acquireQueued

acquireQueued的主要作用是把已经追加到队列的线程节点(addWaiter方法返回值)进行阻塞,但阻塞前又通过tryAccquire重试是否能获得锁,如果重试成功能则无需阻塞,直接返回

 

/**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        try {
            boolean interrupted = false;
            for (;;) {
                // 如果当前线程是head的直接后继则尝试获取锁
                // 这里不会和等待队列中其它线程发生竞争,但会和尝试获取锁且尚未进入等待队列的线程发生竞争。这是非公平锁和公平锁的一个重要区别。
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);//当前线程获得锁后,node的thread变量设置为了null,此时该node结点就变为了等待队列的哑结点(dummy node)
                    p.next = null; // help GC
                    return interrupted;
                }
                // 如果不是head直接后继或获取锁失败,则检查是否要禁用当前线程
                // 是则禁用,直到被lock.release唤醒或线程中断
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
    }
 

仔细看看这个方法是个无限循环,感觉如果p == head && tryAcquire(arg)条件不满足循环将永远无法结束,当然不会出现死循环,奥秘在于第12行的parkAndCheckInterrupt会把当前线程挂起,从而阻塞住线程的调用栈。

 

/**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
 

 

如前面所述,LockSupport.park最终把线程交给系统(Linux)内核进行阻塞。当然也不是马上把请求不到锁的线程进行阻塞,还要检查该线程的状态,比如如果该线程处于Cancel状态则没有必要,具体的检查在shouldParkAfterFailedAcquire中:

/**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int s = pred.waitStatus;
        if (s < 0)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park
             */
            // 如果前置结点waitStatus已经被置为SIGNAL,则返回true,可以禁用线程
            return true;
        if (s > 0) {
            // 如果前置结果已被CALCEL,则移除。
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
        do {
        node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    }
        else
            /*
             * Indicate that we need a signal, but don't park yet. Caller
             * will need to retry to make sure it cannot acquire before
             * parking.
             */

            // 原子性将前置结点waitStatus设为SIGNAL
            compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
        // 这里一定要返回false,有可能前置结点这时已经释放了锁,但因其 waitStatus在释放锁时还未被置为SIGNAL而未触发唤醒等待线程操作,因此必须通过return false来重新尝试一次获取锁
        return false;
    }
 

检查原则在于:

  • 规则1:如果前继的节点状态为SIGNAL,表明当前节点需要unpark,则返回成功,此时acquireQueued方法的第12行(parkAndCheckInterrupt)将导致线程阻塞
  • 规则2:如果前继节点状态为CANCELLED(ws>0),说明前置节点已经被放弃,则回溯到一个非取消的前继节点,返回false,acquireQueued方法的无限循环将递归调用该方法,直至规则1返回true,导致线程阻塞
  • 规则3:如果前继节点状态为非SIGNAL、非CANCELLED,则设置前继的状态为SIGNAL,返回false后进入acquireQueued的无限循环,与规则2同

总体看来,shouldParkAfterFailedAcquire就是靠前继节点判断当前线程是否应该被阻塞,如果前继节点处于CANCELLED状态,则顺便删除这些节点重新构造队列。等待状态在刚进入队列时都是0,如果等待被取消则被设为Node.CANCELLED,若线程释放锁时需要唤醒等待队列里的其它线程则被置为Node.SIGNAL,

至此,锁住线程的逻辑已经完成,下面讨论解锁的过程。

3. 解锁

请求锁不成功的线程会被挂起在acquireQueued方法的第12行,12行以后的代码必须等线程被解锁锁才能执行,假如被阻塞的线程得到解锁,则执行第13行,即设置interrupted = true,之后又进入无限循环。

从无限循环的代码可以看出,并不是得到解锁的线程一定能获得锁,必须在第6行中调用tryAccquire重新竞争,因为锁是非公平的,有可能被新加入的线程获得,从而导致刚被唤醒的线程再次被阻塞,这个细节充分体现了“非公平”的精髓。通过之后将要介绍的解锁机制会看到,第一个被解锁的线程就是Head,因此p == head的判断基本都会成功。

至此可以看到,把tryAcquire方法延迟到子类中实现的做法非常精妙并具有极强的可扩展性,令人叹为观止!当然精妙的不是这个Templae设计模式,而是Doug Lea对锁结构的精心布局。

解锁代码相对简单,主要体现在AbstractQueuedSynchronizer.release和Sync.tryRelease方法中:

class AbstractQueuedSynchronizer

 

 /**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
 

class Sync

 

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
 

tryRelease与tryAcquire语义相同,把如何释放的逻辑延迟到子类中。tryRelease语义很明确:如果线程多次锁定,则进行多次释放,直至status==0则真正释放锁,所谓释放锁即设置status为0,因为无竞争所以没有使用CAS。

release的语义在于:如果可以释放锁,则唤醒队列第一个线程(Head),具体唤醒代码如下:

 

/**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling. It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0); 

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
 

这段代码的意思在于找出第一个可以unpark的线程,一般说来head.next == head,Head就是第一个线程,但Head.next可能被取消或被置为null,因此比较稳妥的办法是从后往前找第一个可用线程。貌似回溯会导致性能降低,其实这个发生的几率很小,所以不会有性能影响。之后便是通知系统内核继续该线程,在Linux下是通过pthread_mutex_unlock完成。之后,被解锁的线程进入上面所说的重新竞争状态。

4. Lock VS Synchronized

AbstractQueuedSynchronizer通过构造一个基于阻塞的CLH队列容纳所有的阻塞线程,而对该队列的操作均通过Lock-Free(CAS)操作,但对已经获得锁的线程而言,ReentrantLock实现了偏向锁的功能。

synchronized的底层也是一个基于CAS操作的等待队列,但JVM实现的更精细,把等待队列分为ContentionList和EntryList,目的是为了降低线程的出列速度;当然也实现了偏向锁,从数据结构来说二者设计没有本质区别。但synchronized还实现了自旋锁,并针对不同的系统和硬件体系进行了优化,而Lock则完全依靠系统阻塞挂起等待线程。

当然Lock比synchronized更适合在应用层扩展,可以继承AbstractQueuedSynchronizer定义各种实现,比如实现读写锁(ReadWriteLock),公平或不公平锁;同时,Lock对应的Condition也比wait/notify要方便的多、灵活的多。

 

 

参考资料:

http://blog.csdn.net/chen77716/article/details/6641477 深入JVM锁机制2-Lock

http://www.iteye.com/topic/623398 非阻塞算法-ReentrantLock代码剖析之ReentrantLock.lock

 

  • 大小: 20 KB
  • 大小: 12.2 KB
分享到:
评论
2 楼 ljzxloaf 2016-09-06  
阻塞和等待不一样吧,condition持有等待队列,而AQS持有阻塞队列。文中一直在说阻塞看得有点乱
1 楼 liguanqun811 2014-08-20  
"规则1:如果前继的节点状态为SIGNAL,表明当前节点需要unpark" 这里应该是park 而不是unpark吧

相关推荐

    62-Java并发编程实战

    62-Java并发编程实战62-Java并发编程实战62-Java并发编程实战62-Java并发编程实战62-Java并发编程实战62-Java并发编程实战62-Java并发编程实战62-Java并发编程实战62-Java并发编程实战62-Java并发编程实战62-Java...

    Java并发编程-3.pdf

    CountDownLatch、CyclicBarrier 和 Semaphore 等多线程协作机制都是 Java 并发编程中的重要组成部分。它们可以帮助开发者编写高效、可靠的多线程程序,解决复杂的并发问题。 在实际开发中,我们可以根据具体情况...

    Java并发编程-设计原则与模式

    Java并发编程-设计原则与模式 pdf格式

    Java并发编程实践-电子书

    Java并发编程实践-电子书-01章.pdf Java并发编程实践-电子书-02章.pdf Java并发编程实践-电子书-03章.pdf Java并发编程实践-电子书-04章.pdf Java并发编程实践-电子书-05章.pdf Java并发编程实践-电子书-06章.pdf ...

    java并发编程-从入门到精通

    Java提供了丰富的并发工具和API来支持并发编程,如线程(Thread)、Executor框架、Future、Callable和Runnable接口等。 在Java中,线程的创建和管理是通过继承Thread类或实现Runnable接口来完成的。理解如何正确地...

    阿里专家级并发编程架构师课程 彻底解决JAVA并发编程疑难杂症 JAVA并发编程高级教程

    阿里专家级并发编程架构师级课程,完成课程的学习可以帮助同学们解决非常多的JAVA并发编程疑难杂症,极大的提高JAVA并发编程的效率。课程内容包括了JAVA手写线程池,UC线程池API详解,线程安全根因详解,锁与原子类...

    Java并发编程-Doug Lea

    Java并发编程原汁原味英文版,Doug Lea大神经典著作, 内容:Concurrency Models, design forces, Java Designing objects for concurrency Immutability, locking, state dependence, containment, splitting ...

    java高级技术JUC高并发编程教程2021(1.5G)

    05-JUC高并发编程-Synchronized复习和案例分析.mp4& R: O+ [6 A8 h1 L9 | 06-JUC高并发编程-Synchronized实现案例.mp4+ ?2 `( V" U4 z3 F0 w 07-JUC高并发编程-Lock接口概述和实现案例.mp48 G J/ u; W' _$ o: {2 M ...

    Java 并发编程实战.pdf

    在Java平台上,由于其本身提供了强大的并发编程支持,因此,掌握并发编程对于提高Java应用的性能和响应能力至关重要。 书中会首先介绍Java并发编程的基础知识,包括线程的创建和运行,同步机制的基本用法,以及Java...

    java并发编程-超级大全整理

    总结,Java并发编程涵盖了大量的概念和技术,包括线程的创建、同步、通信以及并发工具的使用。理解和掌握这些知识点,是成为一名合格的Java并发程序员的基础。在实际开发中,应结合具体场景选择合适的并发策略,以...

    Java并发编程常识-梁飞.rar

    Java并发编程是Java开发中的重要领域,特别是在多核处理器和分布式系统中,高效地利用并发可以极大地提升程序的性能和响应速度。阿里大牛梁飞编写的《Java并发编程常识》PPT,深入浅出地讲解了这个主题,对开发者来...

    java并发编程-构建块

    "java并发编程-构建块"这个主题涵盖了使程序能够同时处理多个任务的关键概念和技术。在这个主题下,我们将深入探讨Java中用于构建高效并发应用的核心工具和概念。 1. **线程**:Java中的线程是并发编程的基础,每个...

    java并发编程2

    Java并发编程是Java开发中的重要领域,特别是在多核处理器和分布式系统中,高效地利用并发可以极大地提升程序的性能和响应速度。以下是对标题和描述中所提及的几个知识点的详细解释: 1. **线程与并发** - **线程*...

    《java 并发编程实战高清PDF版》

    《Java并发编程实战》是一本深入探讨Java平台并发编程的权威指南。这本书旨在帮助开发者理解和掌握在Java环境中创建高效、可扩展且可靠的多线程应用程序的关键技术和实践。它涵盖了从基本概念到高级主题的广泛内容,...

    Java并发编程-线程安全与基础构建模块

    在Java编程领域,多线程并发是不可或缺的一部分,特别是在服务器端和高并发应用中。本文将深入探讨"Java并发编程-线程安全与基础构建模块"这一主题,旨在帮助开发者理解如何有效地处理并发问题,提高程序性能和稳定...

    java 并发编程的艺术pdf清晰完整版 源码

    这本书全面地介绍了Java平台上的并发和多线程编程技术,旨在帮助开发者解决在实际工作中遇到的并发问题,提高程序的性能和可伸缩性。 并发编程是现代计算机系统中不可或缺的一部分,尤其是在多核处理器成为主流的...

    JAVA并发编程艺术 高清pdf

    JAVA并发编程艺术 高清pdf : 1.并发变成的挑战 2. java并发机制的底层实现原理 3. java 内存模型 4. java并发编程基础 5.java中的锁。。。。。。。

    java并发编程艺术

    总而言之,《Java并发编程艺术》这本书将系统性地介绍Java并发编程的各种技术和最佳实践,帮助读者提升在多线程环境下的编程能力,从而设计出更加健壮、高效的Java应用。无论你是初级开发者还是经验丰富的工程师,这...

Global site tag (gtag.js) - Google Analytics