`

并发包源码分析 -- AQS

    博客分类:
  • java
 
阅读更多

Java 的concurrent 包给我们的并发编程提供了更灵活高效的方案,开发人员可以很方便的利用API利用线程池去处理多任务。最近对concurrent包进行了比较细致的分析,最核心的基础莫过于AQS(AbstractQueuedSynchronizer)。下面看看来自官方的摘要。

 

Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state. Subclasses must define the protected methods that change this state, and which define what that state means in terms of this object being acquired or released. Given these, the other methods in this class carry out all queuing and blocking mechanics. Subclasses can maintain other state fields, but only the atomically updated int value manipulated using methods getState(),setState(int) and compareAndSetState(int,int) is tracked with respect to synchronization.

Subclasses should be defined as non-public internal helper classes that are used to implement the synchronization properties of their enclosing class. Class AbstractQueuedSynchronizer does not implement any synchronization interface. Instead it defines methods such asacquireInterruptibly(int) that can be invoked as appropriate by concrete locks and related synchronizers to implement their public methods.

 

 

提供基于FIFO的等待队列锁阻塞的一个框架,对于需要用atomic int 原子状态来控制锁的同步非常有用。并发包里面的各种Lock都是基于AQS实现,AQS 子类应该实现改变状态的方法。一般子类都是作为一个内部的帮助类去实现锁,下面来看一个不可重入锁的实现,状态0表示unlock,1表示lock。

class Mutex implements Lock, java.io.Serializable {

	   // Our internal helper class
	   private static class Sync extends AbstractQueuedSynchronizer {
	     // Report whether in locked state
	     protected boolean isHeldExclusively() {
	    	 
	       return getState() == 1;
	     }

	     // Acquire the lock if state is zero
	     public boolean tryAcquire(int acquires) {
	       assert acquires == 1; // Otherwise unused
	       if (compareAndSetState(0, 1)) {
	         setExclusiveOwnerThread(Thread.currentThread());
	         return true;
	       }
	       return false;
	     }

	     // Release the lock by setting state to zero
	     protected boolean tryRelease(int releases) {
	       assert releases == 1; // Otherwise unused
	       if (getState() == 0) throw new IllegalMonitorStateException();
	       setExclusiveOwnerThread(null);
	       setState(0);
	       return true;
	     }

	     // Provide a Condition
	     Condition newCondition() { return new ConditionObject(); }

	     // Deserialize properly
	     private void readObject(ObjectInputStream s)
	         throws IOException, ClassNotFoundException {
	       s.defaultReadObject();
	       setState(0); // reset to unlocked state
	     }
	   }

	   // The sync object does all the hard work. We just forward to it.
	   private final Sync sync = new Sync();

	   public void lock()                { sync.acquire(1); }
	   public boolean tryLock()          { return sync.tryAcquire(1); }
	   public void unlock()              { sync.release(1); }
	   public Condition newCondition()   { return sync.newCondition(); }
	   public boolean isLocked()         { return sync.isHeldExclusively(); }
	   public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
	   public void lockInterruptibly() throws InterruptedException {
	     sync.acquireInterruptibly(1);
	   }
	   public boolean tryLock(long timeout, TimeUnit unit)
	       throws InterruptedException {
	     return sync.tryAcquireNanos(1, unit.toNanos(timeout));
	   }
}

 

 AQS用了一个内部的静态类实现了一个双向链表,此链表用来存储所有等待的线程以及其状态。 由于代码太多,我作了提炼,请看看Node的基本结构。

 

 static final class Node {
//Marker to indicate a node is waiting in shared mode
         static final Node SHARED = new Node();
//Marker to indicate a node is waiting in exclusive mode
         static final Node EXCLUSIVE = null;       
//waitStatus value to indicate thread has cancelled
         static final int CANCELLED =  1;      
//waitStatus value to indicate successor's thread needs unparking
         static final int SIGNAL    = -1;
//waitStatus value to indicate thread is waiting on condition
//        static final int CONDITION = -2;
 
         volatile int waitStatus;

        
//Link to predecessor node that current node/thread relies on for checking waitStatus. Assigned during enqueing, and nulled out (for sake of GC) only upon dequeuing. Also, upon cancellation of a predecessor, we short-circuit while finding a non-cancelled one, which will always exist because the head node is never cancelled: A node becomes head only as a result of successful acquire. A cancelled thread never succeeds in acquiring, and a thread only cancels itself, not any other node.

         volatile Node prev;

        
//Link to the successor node that the current node/thread unparks upon release. Assigned during enqueuing, adjusted when bypassing cancelled predecessors, and nulled out (for sake of GC) when dequeued. The enq operation does not assign next field of a predecessor until after attachment, so seeing a null next field does not necessarily mean that node is at end of queue. However, if a next field appears to be null, we can scan prev's from the tail to double-check. The next field of cancelled nodes is set to point to the node itself instead of null, to make life easier for isOnSyncQueue.
 
         volatile Node next;

        
//The thread that enqueued this node. Initialized on construction and nulled out after use.

        volatile Thread thread;

        
//Link to next node waiting on condition, or the special value SHARED. Because condition queues are accessed only when holding in exclusive mode, we just need a simple linked queue to hold nodes while they are waiting on conditions. They are then transferred to the queue to re-acquire. And because conditions can only be exclusive, we save a field by using special value to indicate shared mode.

        Node nextWaiter;
}

 

 链表的节点有三个状态,这些状态将在后面的逻辑中会参与判断该节点是否可以被park,是否可以被unpark.

waitingStauts默认是0,不属于任何下面的状态。任何对状态的修改都是CAS操作,当一个线程被加入锁的阻塞队列的时候状态会编程SIGNAL,表示我需要upark来唤醒。

waitStatus value to indicate thread has cancelled

        static final int CANCELLED =  1;

waitStatus value to indicate successor's thread needs unparking

       static final int SIGNAL    = -1;      

waitStatus value to indicate thread is waiting on condition

       static final int CONDITION = -2;

 

 

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))   
         selfInterrupt();
    }

 这里简单的解释为:如果获取锁没有成功则入等待队列,入队是一定保证成功的,然后accquireQueued循环检查当前加入的节点是不是第一个节点,如果是则继续tryAcquire尝试获取锁成功则返回(如果为true则标记当前线程中断),否则将阻塞直到自己是第一个节点

 

 

 

final boolean acquireQueued(final Node node, int arg) {
         boolean failed = true;
         try {
             boolean interrupted = false;
             for (;;) {
                 final Node p = node.predecessor();
                 if (p == head && tryAcquire(arg)) {            
                     setHead(node);
                     p.next = null; // help GC
                     failed = false;
                     return interrupted;
                 }
                 if (shouldParkAfterFailedAcquire(p, node) &&     
                     parkAndCheckInterrupt())
                     interrupted = true;
             }
         } finally {
             if (failed)
                 cancelAcquire(node);
         }
     }

 这里判断当前节是否是处于队列的第一个节点,如果是则用tryAcquire尝试下(因为锁最终会被另外的线程释放),如果成功,则将节点从链表中移除重置头节点,并最终返回interrupted(注意:因为park在没有得到许可的时候会阻塞,唤醒必须是unpark或者响应interupt(),但是当唤醒的时候我们并不知道是哪种情况导致的唤醒,所以下面park的时候有个中断检查,如果interrupted为true则说明是由于中断唤醒的,否则是unpark唤醒的,这里无论interrupted是true还是false都表示获取锁成功了,不会阻塞,但是我们要将这个唤醒的原因告诉调用上层去继续决定怎么处理。实际上我们aquire方法中可以看到,又调用了一次标记中断,说明这里会继续交由线程去处理。中断导致的情况是极少发生的,因为我们不会随便调用中断

对于后面的if, 先进行状态检查看看是否真的应该阻塞(shouldParkAfterFailedAcquire,判断的逻辑在这个方法中,就是根据上面提到的状态判断的,初始状态为0,通过CAS将状态标记为SINGAL,表示需要unpark),如果是的则会通过LockSupport.park()将当前线程阻塞,等到其他线程unpark的时候唤醒进入下一个循环,直到获取锁。有点类似于while wait。

 

 

     private Node addWaiter(Node mode) {
         Node node = new Node(Thread.currentThread(), mode);
         // Try the fast path of enq; backup to full enq on failure
         Node pred = tail;
         if (pred != null) {
             node.prev = pred;
             if (compareAndSetTail(pred, node)) {  
                 pred.next = node;
                 return node;
             }
         }
         enq(node);
         return node;
     }

这个方法会将当前节点加入等待队列。首先判断尾巴节点是否为空,不为空则设通过CAS置尾巴节点设置为当前节点。 尾巴节点不为空说明等待队列不为空,直接将当前节点加入,如果失败还有后面的enq 方法通过轮询一定保证入队成功。

 

 

     private Node enq(final Node node) {
         for (;;) {                              //无限循环,通过CAS操作让当前节点安全入队。
             Node t = tail;
             if (t == null) { // Must initialize
                 if (compareAndSetHead(new Node()))  
                     tail = head;
             } else { 
                 node.prev = t;                       
                 if (compareAndSetTail(t, node)) {
                     t.next = node;
                     return t;
                 }
             }
         }
     }

 

对于第一个if 初始化时,首尾都为空,这个时候利用CAS加入一个new出来的空的头节点。

初始化完毕后,下一个循环既会进入else分支,对于第一个入队的节点,很显然前面初始化的时候已经成功入队,所以这里CAS尝试更新尾巴节点肯定失败,所以会继续循环更新尾巴节点,最终更新prev跟next(next指向自己)。对于队列大于1的入队这个CAS则有可能成功,并添加到尾巴节点。如果CAS失败则继续循环保证入队一定成功。

分享到:
评论

相关推荐

    7 AQS源码分析.docx

    本文将详细分析AQS的源码,探讨其工作机制,以及在Java中如何实现不同类型的锁。 首先,我们需要了解锁的基本类型。在Java中,锁主要分为两类:悲观锁和乐观锁。悲观锁认为并发操作会导致数据不一致,因此在操作...

    Java并发包源码分析(JDK1.8)

    Java并发包源码分析(JDK1.8):囊括了java.util.concurrent包中大部分类的源码分析,其中涉及automic包,locks包(AbstractQueuedSynchronizer、ReentrantLock、ReentrantReadWriteLock、LockSupport等),queue...

    第五章 ReentrantLock源码解析1--获得非公平锁与公平锁lock()1

    7. **源码分析** 在深入源码之前,我们需要对获取锁的整体流程有一个大致的理解。ReentrantLock的`lock()`方法调用`Sync`的`lock()`,然后由`NonfairSync`或`FairSync`完成具体的锁获取逻辑。这个过程中涉及到的...

    并发编程以及计算机底层原理

    3. **AQS(AbstractQueuedSynchronizer)**:`09-深入理解AQS之独占锁ReentrantLock源码分析-fox`中,AQS是Java并发包中的一种抽象同步器,它是许多高级锁(如ReentrantLock)的基础。AQS维护了一个等待队列,通过...

    Java并发系列之AbstractQueuedSynchronizer源码分析(共享模式)

    《Java并发系列之AbstractQueuedSynchronizer源码分析(共享模式)》 AbstractQueuedSynchronizer(AQS)是Java并发编程中一个重要的工具,它是Java并发包`java.util.concurrent.locks`中的核心抽象类,用于构建锁...

    基于JDK源码解析Java领域中的并发锁之设计与实现.pdf

    通过以上分析,我们可以看到Java并发包提供了丰富的并发锁机制,从简单的synchronized关键字到复杂的AQS、LockSupport、Condition等,这些都是为了解决并发编程中的互斥和同步问题。在实际应用中,开发者需要根据...

    Java并发系列之AbstractQueuedSynchronizer源码分析(独占模式)

    【Java并发系列之AbstractQueuedSynchronizer源码分析(独占模式)】 AbstractQueuedSynchronizer(AQS)是Java并发编程中一个重要的工具,它是Java并发包`java.util.concurrent.locks`中的核心抽象类,用于构建锁...

    Java 多线程与并发(11-26)-JUC锁- ReentrantLock详解.pdf

    **源码分析** ReentrantLock类实现了Lock接口,提供了lock()、unlock()等方法。Sync类是内部抽象类,继承自AQS,它有两个子类NonfairSync和FairSync。Sync类中有lock()抽象方法,以及nonfairTryAcquire()和...

    Java分布式应用学习笔记06浅谈并发加锁机制分析

    Java并发包源码分析 让我们先来看一个Java并发包中的示例代码片段: ```java private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { ...

    JUC并发工具包实例.zip

    AQS是Java并发包中的核心抽象类,它是基于FIFO队列的等待锁框架。许多并发工具,如ReentrantLock、Semaphore等,都基于AQS实现。AQS维护了一个内部状态以及一个等待线程的双端队列,通过共享或独占模式来控制资源的...

    阿里P7面试题整理集合

    - **AQS(AbstractQueuedSynchronizer)**:是Java并发包中的核心组件,用于实现锁和其他同步组件,如`ReentrantLock`和`Semaphore`。面试中可能会要求了解其内部队列管理、状态变更的源码细节。 - **...

    KnowledgeBase:技术笔记

    java 源码分析 几乎覆盖了所有常用的类,集合、多线程、并发包等。对源码尽我所能的进行了分析。 也对线程池,AQS的基础CLH 锁,提供了简易的实现,帮助理解。 jvm 将深入理解 jvm 的内容与oracle 官网的内容进行了...

    2021版本Java程序月薪30k简历模板.docx

    简历中提到了synchronized原理、Atomic原子类、BlockingQueue、AQS(并发队列同步器)、CAS(比较并交换)、Lock锁原理,以及线程池原理等,这些都是处理并发问题的核心工具。 3. **开发工具与Web开发**: - 熟练使用...

Global site tag (gtag.js) - Google Analytics