`
森林的天空
  • 浏览: 15238 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

JAVA并发编程学习笔记之AQS源码分析(获取与释放)

 
阅读更多

同步状态

AQS采用的是CLH队列,CLH队列是由一个一个结点构成的,前面提到结点中有一个状态位,这个状态位与线程状态密切相关,这个状态位(waitStatus)是一个32位的整型常量,它的取值如下:

 

  1. static final int CANCELLED =  1;  
  2. static final int SIGNAL    = -1;  
  3. static final int CONDITION = -2;  
  4. static final int PROPAGATE = -3;  

下面解释一下每个值的含义

 

CANCELLED:因为超时或者中断,结点会被设置为取消状态,被取消状态的结点不应该去竞争锁,只能保持取消状态不变,不能转换为其他状态。处于这种状态的结点会被踢出队列,被GC回收;

SIGNAL:表示这个结点的继任结点被阻塞了,到时需要通知它;

CONDITION:表示这个结点在条件队列中,因为等待某个条件而被阻塞;

PROPAGATE:使用在共享模式头结点有可能牌处于这种状态,表示锁的下一次获取可以无条件传播;

0:None of the above,新结点会处于这种状态。

 

获取

AQS中比较重要的两个操作是获取和释放,以下是各种获取操作:

 

  1. public final void acquire(int arg);  
  2. public final void acquireInterruptibly(int arg);  
  3. public final void acquireShared(int arg);  
  4. public final void acquireSharedInterruptibly(int arg);  
  5. protected boolean tryAcquire(int arg);   
  6. protected int tryAcquireShared(int arg);  
  7. public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException;  
  8. public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException;       

获取操作的流程图如下:

1、如果尝试获取锁成功整个获取操作就结束,否则转到2. 尝试获取锁是通过方法tryAcquire来实现的,AQS中并没有该方法的具体实现,只是简单地抛出一个不支持操作异常,在AQS简介中谈到tryAcquire有很多实现方法,这里不再细化,只需要知道如果获取锁成功该方法返回true即可;

2、如果获取锁失败,那么就创建一个代表当前线程的结点加入到等待队列的尾部,是通过addWaiter方法实现的,来看该方法的具体实现:

 

  1. /** 
  2.  * Creates and enqueues node for current thread and given mode. 
  3.  * 
  4.  * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared 
  5.  * @return the new node 
  6.  */  
  7. private Node addWaiter(Node mode) {  
  8.     Node node = new Node(Thread.currentThread(), mode);  
  9.     // Try the fast path of enq; backup to full enq on failure  
  10.     Node pred = tail;  
  11.     if (pred != null) {  
  12.         node.prev = pred;  
  13.         if (compareAndSetTail(pred, node)) {  
  14.             pred.next = node;  
  15.             return node;  
  16.         }  
  17.     }  
  18.     enq(node);  
  19.     return node;  
  20. }  

该方法创建了一个独占式结点,然后判断队列中是否有元素,如果有(pred!=null)就设置当前结点为队尾结点,返回;

 

如果没有元素(pred==null),表示队列为空,走的是入队操作

 

  1. /** 
  2.  * Inserts node into queue, initializing if necessary. See picture above. 
  3.  * @param node the node to insert 
  4.  * @return node's predecessor 
  5.  */  
  6. private Node enq(final Node node) {  
  7.     for (;;) {  
  8.         Node t = tail;  
  9.         if (t == null) { // Must initialize  
  10.             if (compareAndSetHead(new Node()))  
  11.                 tail = head;  
  12.         } else {  
  13.             node.prev = t;  
  14.             if (compareAndSetTail(t, node)) {  
  15.                 t.next = node;  
  16.                 return t;  
  17.             }  
  18.         }  
  19.     }  
  20. }  

enq方法采用的是变种CLH算法,先看头结点是否为空,如果为空就创建一个傀儡结点,头尾指针都指向这个傀儡结点,这一步只会在队列初始化时会执行;

 

如果头结点非空,就采用CAS操作将当前结点插入到头结点后面,如果在插入的时候尾结点有变化,就将尾结点向后移动直到移动到最后一个结点为止,然后再把当前结点插入到尾结点后面,尾指针指向当前结点,入队成功。

3、将新加入的结点放入队列之后,这个结点有两种状态,要么获取锁,要么就挂起,如果这个结点不是头结点,就看看这个结点是否应该挂起,如果应该挂起,就挂起当前结点,是否应该挂起是通过shouldParkAfterFailedAcquire方法来判断的

 

  1. /** 
  2.     * Checks and updates status for a node that failed to acquire. 
  3.     * Returns true if thread should block. This is the main signal 
  4.     * control in all acquire loops.  Requires that pred == node.prev 
  5.     * 
  6.     * @param pred node's predecessor holding status 
  7.     * @param node the node 
  8.     * @return {@code true} if thread should block 
  9.     */  
  10.    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {  
  11.        int ws = pred.waitStatus;  
  12.        if (ws == Node.SIGNAL)  
  13.            /* 
  14.             * This node has already set status asking a release 
  15.             * to signal it, so it can safely park. 
  16.             */  
  17.            return true;  
  18.        if (ws > 0) {  
  19.            /* 
  20.             * Predecessor was cancelled. Skip over predecessors and 
  21.             * indicate retry. 
  22.             */  
  23.            do {  
  24.                node.prev = pred = pred.prev;  
  25.            } while (pred.waitStatus > 0);  
  26.            pred.next = node;  
  27.        } else {  
  28.            /* 
  29.             * waitStatus must be 0 or PROPAGATE.  Indicate that we 
  30.             * need a signal, but don't park yet.  Caller will need to 
  31.             * retry to make sure it cannot acquire before parking. 
  32.             */  
  33.            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);  
  34.        }  
  35.        return false;  
  36.    }  

该方法首先检查前趋结点的waitStatus位,如果为SIGNAL,表示前趋结点会通知它,那么它可以放心大胆地挂起了;

 

如果前趋结点是一个被取消的结点怎么办呢?那么就向前遍历跳过被取消的结点,直到找到一个没有被取消的结点为止,将找到的这个结点作为它的前趋结点,将找到的这个结点的waitStatus位设置为SIGNAL,返回false表示线程不应该被挂起。
上面谈的不是头结点的情况决定是否应该挂起,是头结点的情况呢?

是头结点的情况,当前线程就调用tryAcquire尝试获取锁,如果获取成功就将头结点设置为当前结点,返回;如果获取失败就循环尝试获取锁,直到获取成功为止。整个acquire过程就分析完了。

 

释放

释放操作有以下方法:

 

  1. public final boolean release(int arg);   
  2. protected boolean tryRelease(int arg);   
  3. protected boolean tryReleaseShared(int arg);   

 

下面看看release方法的实现过程

1、release过程比acquire要简单,首先调用tryRelease释放锁,如果释放失败,直接返回;

2、释放锁成功后需要唤醒继任结点,是通过方法unparkSuccessor实现的

 

  1. /** 
  2.     * Wakes up node's successor, if one exists. 
  3.     * 
  4.     * @param node the node 
  5.     */  
  6.    private void unparkSuccessor(Node node) {  
  7.        /* 
  8.         * If status is negative (i.e., possibly needing signal) try 
  9.         * to clear in anticipation of signalling.  It is OK if this 
  10.         * fails or if status is changed by waiting thread. 
  11.         */  
  12.        int ws = node.waitStatus;  
  13.        if (ws < 0)  
  14.            compareAndSetWaitStatus(node, ws, 0);  
  15.   
  16.        /* 
  17.         * Thread to unpark is held in successor, which is normally 
  18.         * just the next node.  But if cancelled or apparently null, 
  19.         * traverse backwards from tail to find the actual 
  20.         * non-cancelled successor. 
  21.         */  
  22.        Node s = node.next;  
  23.        if (s == null || s.waitStatus > 0) {  
  24.            s = null;  
  25.            for (Node t = tail; t != null && t != node; t = t.prev)  
  26.                if (t.waitStatus <= 0)  
  27.                    s = t;  
  28.        }  
  29.        if (s != null)  
  30.            LockSupport.unpark(s.thread);  
  31.    }  

1、node参数传进来的是头结点,首先检查头结点的waitStatus位,如果为负,表示头结点还需要通知后继结点,这里不需要头结点去通知后继,因此将该该标志位清0.

 

2、然后查看头结点的下一个结点,如果下一个结点不为空且它的waitStatus<=0,表示后继结点没有被取消,是一个可以唤醒的结点,于是唤醒后继结点返回;如果后继结点为空或者被取消了怎么办?寻找下一个可唤醒的结点,然后唤醒它返回。

这里并没有从头向尾寻找,而是相反的方向寻找,为什么呢?

因为在CLH队列中的结点随时有可能被中断,被中断的结点的waitStatus设置为CANCEL,而且它会被踢出CLH队列,如何个踢出法,就是它的前趋结点的next并不会指向它,而是指向下一个非CANCEL的结点,而它自己的next指针指向它自己。一旦这种情况发生,如何从头向尾方向寻找继任结点会出现问题,因为一个CANCEL结点的next为自己,那么就找不到正确的继任接点。

有的人又会问了,CANCEL结点的next指针为什么要指向它自己,为什么不指向真正的next结点?为什么不为NULL?

第一个问题的答案是这种被CANCEL的结点最终会被GC回收,如果指向next结点,GC无法回收。

对于第二个问题的回答,JDK中有这么一句话: The next field of cancelled nodes is set to point to the node itself instead of null, to make life easier for isOnSyncQueue.大至意思是为了使isOnSyncQueue方法更新简单。isOnSyncQueue方法判断一个结点是否在同步队列,实现如下:

 

  1. /** 
  2.  * Returns true if a node, always one that was initially placed on 
  3.  * a condition queue, is now waiting to reacquire on sync queue. 
  4.  * @param node the node 
  5.  * @return true if is reacquiring 
  6.  */  
  7. final boolean isOnSyncQueue(Node node) {  
  8.     if (node.waitStatus == Node.CONDITION || node.prev == null)  
  9.         return false;  
  10.     if (node.next != null// If has successor, it must be on queue  
  11.         return true;  
  12.     /* 
  13.      * node.prev can be non-null, but not yet on queue because 
  14.      * the CAS to place it on queue can fail. So we have to 
  15.      * traverse from tail to make sure it actually made it.  It 
  16.      * will always be near the tail in calls to this method, and 
  17.      * unless the CAS failed (which is unlikely), it will be 
  18.      * there, so we hardly ever traverse much. 
  19.      */  
  20.     return findNodeFromTail(node);  
  21. }  

如果一个结点next不为空,那么它在同步队列中,如果CANCEL结点的后继为空那么CANCEL结点不在同步队列中,这与事实相矛盾。

 

因此将CANCEL结点的后继指向它自己是合理的选择。

 

参考资料:

 

 

分享到:
评论

相关推荐

    Java并发编程学习笔记

    Java并发编程是Java开发中必不可少的一部分,涉及到多线程、同步机制、线程池以及并发工具类等多个核心知识点。以下是对这些主题的详细说明: 1. **线程安全与锁 Synchronized 底层实现原理**: 线程安全是指在多...

    java并发编程:juc、aqs

    Java并发编程中的`JUC`(Java Util Concurrency)库是Java平台中用于处理多线程问题的核心工具包,它提供了一系列高效、线程安全的工具类,帮助开发者编写并发应用程序。`AQS`(AbstractQueuedSynchronizer)是JUC库中的...

    Java并发编程解析 | 解析AQS基础同步器的设计与实现

    "Java并发编程解析 | 解析AQS基础同步器的设计与实现" 在Java领域中,解决并发编程问题的关键是解决同步和互斥的问题。同步是指线程之间的通信和协作,互斥是指同一时刻只能允许一个线程访问共享资源。Java领域中有...

    Java 并发编程实战.pdf

    本书名为《Java 并发编程实战》,主要针对Java开发者,在内容上深入探讨了Java语言中的线程和并发编程机制。本书的描述表明了它将复杂的技术内容用浅显易懂的方式表达出来,使之成为读者在学习和使用Java进行并发...

    AQS源码分析 (1).pdf

    接下来,我们来具体分析一下AQS的源码。AQS中定义了一个名为state的volatile变量,用于表示同步状态。这个变量有三种操作方法:getstate()、setstate()和compareAndSetState(),分别用于获取、设置和原子性地更新...

    JAVA并发编程与高并发解决方案-并发编程四之J.U.C之AQS.docx

    ### JAVA并发编程与高并发解决方案-并发编程四之J.U.C之AQS #### 引言 《JAVA并发编程与高并发解决方案-并发编程四之J.U.C之AQS》是一篇详细介绍Java实用并发工具包(Java Util Concurrency,简称J.U.C.)中重要...

    Java并发编程全景图.pdf

    Java并发编程是Java语言中最为复杂且重要的部分之一,它涉及了多线程编程、内存模型、同步机制等多个领域。为了深入理解Java并发编程,有必要了解其核心技术点和相关实现原理,以下将详细介绍文件中提及的关键知识点...

    图灵Java高级互联网架构师第6期并发编程专题笔记.zip

    04-Java并发线程池底层原理详解与源码分析-monkey 05-并发编程之深入理解Java线程-fox 06-并发编程之CAS&Atomic原子操作详解-fox 07-并发锁机制之深入理解synchronized(一)-fox 08-并发锁机制之深入理解...

    Java并发编程:深入解析抽象队列同步器(AQS)及其在Lock中的应用

    本文深入探讨了Java并发编程的关键组件——抽象队列同步器(AQS)及其在ReentrantLock的应用。AQS是处理线程同步问题的高效工具,是Java并发编程中的核心。文章首先简要介绍了并发编程领域的先驱Doug Lea。重点在于...

    阿里专家级并发编程架构师课程 彻底解决JAVA并发编程疑难杂症 JAVA并发编程高级教程

    课程内容包括了JAVA手写线程池,UC线程池API详解,线程安全根因详解,锁与原子类,分布式锁原理与实现方式,并发编程-AQS等等针对性非常强的JAVA编程开发教程,这其中的内容对JAVA开发技能的拔尖,非常的有帮助。...

    多线程与高并发编程笔记、源码等

    标题“多线程与高并发编程笔记、源码等”表明了资源的核心内容,涵盖了多线程和高并发编程的理论与实践。多线程允许一个应用程序同时执行多个任务,而高并发则指系统能够处理大量并发请求的能力。这两个概念在现代...

    阿里专家级并发编程架构师课程-网盘链接提取码下载 .txt

    课程内容包括了JAVA手写线程池,UC线程池API详解,线程安全根因详解,锁与原子类,分布式锁原理与实现方式,并发编程-AQS等等针对性非常强的JAVA编程开发教程,这其中的内容对JAVA开发技能的拔尖,非常的有帮助。...

    Java并发编程原理与实战

    线程之间通信之join应用与实现原理剖析.mp4 ThreadLocal 使用及实现原理.mp4 并发工具类CountDownLatch详解.mp4 并发工具类CyclicBarrier 详解.mp4 并发工具类Semaphore详解.mp4 并发工具类Exchanger详解.mp4 ...

    Java并发之AQS详解.pdf

    Java并发之AQS详解 AbstractQueuedSynchronizer(AQS)是 Java 并发编程中的一个核心组件,提供了一套多线程访问共享资源的同步器框架。AQS 定义了两种资源共享方式:Exclusive(独占)和 Share(共享)。在 AQS 中...

    java并发编程-AQS和JUC实战

    ### Java并发编程-AQS和JUC实战 #### 一、ReentrantLock 重入锁 **1.1 概述** - **基本介绍**: `ReentrantLock` 是一个实现了 `Lock` 接口的可重入互斥锁,提供比 `synchronized` 更丰富的功能。与 `synchronized...

    龙果 java并发编程原理实战

    第4节学习并发的四个阶段并推荐学习并发的资料 [免费观看] 00:09:13分钟 | 第5节线程的状态以及各状态之间的转换详解00:21:56分钟 | 第6节线程的初始化,中断以及其源码讲解00:21:26分钟 | 第7节多种创建线程的...

    7 AQS源码分析.docx

    《深入解析AQS源码:理解Java并发编程的核心机制》 AQS,即AbstractQueuedSynchronizer,是Java并发编程中的重要组件,主要用于构建锁和同步器。它基于一种称为CLH(Craig, Landin, and Hagersten)队列的等待队列...

    Java并发 结合源码分析AQS原理

    Java并发结合源码分析AQS原理 Java并发编程中,AQS(AbstractQueuedSynchronizer)是一个核心组件,它提供了一个基于FIFO队列和状态变量的基础框架,用于构建锁和其他同步装置。在这篇文章中,我们将深入探讨AQS的...

    基于JDK源码解析Java领域中的并发锁之设计与实现.pdf

    通过以上分析,我们可以看到Java并发包提供了丰富的并发锁机制,从简单的synchronized关键字到复杂的AQS、LockSupport、Condition等,这些都是为了解决并发编程中的互斥和同步问题。在实际应用中,开发者需要根据...

Global site tag (gtag.js) - Google Analytics