本文承接未完上文。
四、AQS共享获取/释放源码分析
在上文中对Du占方式获取和释放共享资源相关的源码进行了分析,本节接着开始对共享式获取/释放资源的源码进行分析。共享式与Du占式的最主要区别在于同一时刻Du占式只能有一个线程成功获取同步资源,而共享式在同一时刻可以有多个线程成功获取同步资源。例如读操作可以有多个线程同时进行,而写操作同一时刻只能有一个线程进行写操作,其他操作都会被阻塞。
本文承接未完上文。
在上文中对Du占方式获取和释放共享资源相关的源码进行了分析,本节接着开始对共享式获取/释放资源的源码进行分析。共享式与Du占式的最主要区别在于同一时刻Du占式只能有一个线程成功获取同步资源,而共享式在同一时刻可以有多个线程成功获取同步资源。例如读操作可以有多个线程同时进行,而写操作同一时刻只能有一个线程进行写操作,其他操作都会被阻塞。
此方法是共享模式下线程获取共享资源的顶层入口。它会通过自定义共享资源获取方法acquireShared(int)获取指定量的资源,获取成功则直接返回,获取失败则通过doAcquireShared(int)被加入同步等待队列,直到获取到资源为止,整个过程忽略中断。
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
这里tryAcquireShared()依然需要自定义同步器去实现。但是AQS已经把其返回值的语义定义好了:
此方法用于将当前线程加入同步等待队列尾部阻塞,直到被其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回。
private void doAcquireShared(int arg) { //addWaiter方法在上一文Du占模式中已经分析过,就是将当前线程加入到同步等待队列的队尾,并返回当前线程所在的节点。 //从这里可以看出,不管是共享模式还是Du占模式,都共享同一个等待队列。 final Node node = addWaiter(Node.SHARED); boolean failed = true; //标记是否成功获取资源 try { boolean interrupted = false; //标记在等待过程中是否被中断过 for (;;) { //又是一个CAS“自旋” final Node p = node.predecessor();//拿到前驱节点 if (p == head) { //如果前驱是头节点即该节点是第二节点,那么就有资格去尝试获取资源 int r = tryAcquireShared(arg); //尝试获取资源 if (r >= 0) { //0表示获取成功,大于0表示获取成功还有剩余资源 setHeadAndPropagate(node, r); //将head指向自己,还有剩余资源或者后继节点状态小于0时,可以再唤醒之后的线程 p.next = null; // help GC if (interrupted) //如果等待过程中被打断过,此时将中断补上。 selfInterrupt(); failed = false; return; } } //重排序(如果有必要的话),然后阻塞进入waiting状态,等待被唤醒 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
通过分析其源码可以发现其逻辑个Du占式获取资源的过程基本一致,唯一的区别就在于这里有一个新的方法setHeadAndPropagate()出现,该方法的作用是重新设置头节点为当前成功获取资源的线程节点,并且如果还有剩余资源或者后继节点告诉了当前节点需要被唤醒,则还要继续唤醒后面的线程,这也是所谓共享式获取资源的最直接的体现。那么它到底是如何继续唤醒后面的线程的,我们接着看起源码:
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; setHead(node); //设置当前节点为头节点,并且会把当前节点的前驱节点置为null //如果还有剩余资源 或者 后继节点正在等待被唤醒 或者没有后继节点 或者后继节点为共享,就尝试唤醒下一个节点 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {//这里的一堆判断条件有些保守,在有多个线程竞争获取/释放时可能导致不必要的唤醒但是不会造成任何危害。 Node s = node.next; if (s == null || s.isShared()) //如果后继是独占模式,那么即使剩下的许可大于0也不会继续往后传递唤醒操作,即使后面有结点是共享模式。 //但是当没有后继节点(s==null)时,还是会去自旋中继续尝试将来新加入进来的不论是共享还是Du占模式的节点。 doReleaseShared(); } }
通过以上源码可见,此方法在setHead()的基础上多了一步,就是自己苏醒的同时,如果条件符合(比如还有剩余资源等多个保守条件),还会尝试去唤醒后继结点,因为这是是共享模式!至于最终是如何唤醒后继节点的逻辑这里依然是通过调用doReleaseShared()方法实现的,那么我们继续往下看它的源码:
private void doReleaseShared() { for (;;) { //"自旋" 操作 Node h = head; //现在的头节点是已经成功获取共享资源的节点 if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { //如果后继节点正在等待同步资源,并要求被唤醒 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //在唤醒后继节点之前先将自身同步状态置为0 continue; //如果头结点状态被改变(例如取消了等待或者等待新的条件满足),不再是SIGNAL,需要重新进行自旋,找到另外的合适的后继节点 unparkSuccessor(h); //如果设置节点同步状态的CAS操作成功,表示后继节点确实需要被唤醒,那么就唤醒h的后继节点 }/** 为什么这里要把state状态修改为Node.PROPAGATE? 我觉得应该是出于在多线程并发情况下尽可能提高运行效率,达到能够快速释放资源,并及时响应新加入节点的唤醒传播。 **/ else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // CAS更改节点同步状态失败,需要重新进行自旋 } /** 为什么这里要加个h == head? 什么情况下这里的头节点会被改变? 假设当前AQS队列没有任何等待的节点,即head==tail。这时候上面的if判断不成立,执行到这里适合再次判断h==head,如果有新节点添加 进来,则h!=head,会重新尝试释放。另外如果在高并发下,头节点也可能会被其他线程获取到资源之后更改。所以这里的判断估计是考虑到并发竞争的情况。 **/ if (h == head) break; } }
可能一开始有点看不明白到底是怎样唤醒后继的,我这里就简单梳理如下:
由此可见在共享式的获取同步资源的时候,由于需要在获取资源之后,如果还有剩余资源,还需要对后继节点的线程进行唤醒,所以其实现逻辑比起独占式单纯的获取资源更复杂,如果存在多线程并发的话,那情况将更加复杂。值得一提的是,在共享式获取同步资源的过程中,如果一个线程成功获取到共享资源之后,发现还有剩余的资源,于是唤醒排在它后面的节点,但是被唤醒的线程发现剩余的资源并不足以满足自己的需要(例如剩余5个资源,但是它却需要6个资源). 这时候线程就会再次进入阻塞等待状态,这时候即使剩余的资源满足排在更后面的线程的需要,也不会跳过这个线程去唤醒更后面的线程。因为AQS保证严格按照入队顺序唤醒罢了(保证公平,但降低了并发)。
4.1 小节讲述了共享式获取同步资源的过程,这里开始共享式释放资源的逻辑releaseShared(),此方法是共享模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { //通过自定义共享资源释放方法尝试释放资源 doReleaseShared(); //唤醒后继节点 return true; } return false; }有了前面章节的理解,对这个方法应该就比较简单了,首先还是会执行被覆写过的共享资源释放方法tryReleaseShared(),该方法返回true表示成功释放掉指定量的资源,释放成功之后,又调用了doReleaseShared()方法,该方法在上面的4.1.1中已经详细的分析过了,其作用就是将后继节点的线程从doAcquireShared()方法中的阻塞点唤醒,继续尝试获取资源,如果成功则又执行setHeadAndPropagate()方法,若还有剩余则继续执行doReleaseShared(),这样传播式的向后唤醒线程的过程又和4.1.1中的过程一致了。值得一提的是,在doReleaseShared()方法以及unparkSuccessor()方法中并没有对节点模式进行判断,所以只要是在等待的线程都会被唤醒去尝试获取资源,不论它是共享模式还是Du占模式。
自此,共享式获取/释放同步资源的基础方法的源码就分析完毕,和独占式类似的,支持响应中断和超时机制的acquireSharedInterruptibly(int)和tryAcquireSharedNanos(int arg, long nanosTimeout)与独占式相对应的方法原理一致,这里就不再详解了。
AQS中还提供了其他一些重要或有用的方法,例如查看所有正在同步队列中等待的线程方法getQueuedThreads(),getExclusiveQueuedThreads(),getSharedQueuedThreads()等,这里也就不再做全面的介绍了。
通过前面的分析学习,我们已经对AQS的源码进行详细的分析,对其原理也有了了解即:AQS同步阻塞唤醒机制的主要思想是对共享资源state的获取与释放,以及对同步等待队列的维护。AQS本身已经对同步队列的维护进行了完整的支持,而对共享资源的获取与释放则需要被不同的自定义同步器来实现,应该实现的模板方法已经被AQS设计规范好了,其自定义同步器实现时主要实现以下几种方法:
//不可重入的独占锁接口 public interface Mutex { //获取锁 public void lock(); //释放锁 public void unlock(); }2. 通过私有静态内部类实现同步组件的接口实例MutexImpl
//实现 public class MutexImpl implements Mutex{ // 仅需要将操作代理到Sync上即可 private Sync sync=new Sync(); @Override //lock<-->acquire。两者语义一样:获取资源,即便等待,直到成功才返回。 public void lock() { sync.acquire(1); } @Override //unlock<-->release。两者语义一样:释放资源。 public void unlock() { sync.release(1); } //私有静态内部类,独占式同步组件实现 private static class Sync extends AbstractQueuedSynchronizer{ @Override protected boolean tryAcquire(int arg) { return compareAndSetState(0,1); //state为0才设置为1,表明不可重入! } @Override protected boolean tryRelease(int arg) { return compareAndSetState(1,0);//重新将state从1设置为0 } } }3. 测试同步组件类MutexMain
public class MutexMain { static class MutexThread extends Thread{ private Mutex mutex; public MutexThread(String name,Mutex mutex) { this.mutex = mutex; this.setName(name); } @Override public void run() { System.out.println(Thread.currentThread().getName()+"启动.."); mutex.lock(); System.out.println(Thread.currentThread().getName()+"获取锁成功.."); try { System.out.println(Thread.currentThread().getName()+"开始执行,当前时间:"+new Date().toLocaleString()); Thread.currentThread().sleep(1000);//假设线程执行需要1秒钟 System.out.println(Thread.currentThread().getName()+"结束执行,当前时间:"+new Date().toLocaleString()); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println(Thread.currentThread().getName()+"释放锁.."); mutex.unlock(); } } } public static void main(String[] args) { Mutex mutex=new MutexImpl(); for (int i = 0; i <5 ; i++) { new MutexThread("线程"+i,mutex).start(); } } }通过运行测试方法,可以看到,我们的独占锁的确是起作用了,任意一时刻只有一个线程在运行。 并且线程的执行顺序是严格按照等待顺序的(注意这里的等待顺序有可能和程序编写的启动顺序不一致). 因为同步等待队列是一个FIFO的队列,所以先进入等待队列的先获取到锁。
public interface BankServiceWindows { public void handle(); public void release(); }2. 通过私有静态内部类实现同步组件的接口实例BankServiceWindowsImpl
public class BankServiceWindowsImpl implements BankServiceWindows{ private Sync sync; public BankServiceWindowsImpl(int count){ sync = new Sync(count); } @Override public void handle() { sync.acquireShared(1); } @Override public void release() { sync.releaseShared(1); } private static class Sync extends AbstractQueuedSynchronizer{ private Sync(int count){ setState(count); } @Override protected int tryAcquireShared(int arg) { for (;;) { int current = getState(); int newCount = current - 1; if (newCount < 0 || compareAndSetState(current, newCount)) { return newCount; } } } @Override protected boolean tryReleaseShared(int arg) { for (;;) { int current = getState(); int newCount = current + 1; if (compareAndSetState(current, newCount)) { return true; } } } } }3. 测试同步组件类BankServiceWindowsTest
public class BankServiceWindowsTest { static class handleThread extends Thread{ private BankServiceWindows windows; public handleThread(BankServiceWindows windows, String name) { super(); this.windows = windows; setName(name); } @Override public void run() { System.out.println(Thread.currentThread().getName() +" 开始等候"); windows.handle(); try { System.out.println(Thread.currentThread().getName() +" 开始办理"); Thread.currentThread().sleep(5000); System.out.println(Thread.currentThread().getName() +" 办理结束"); } catch(Exception e){ e.printStackTrace(); }finally{ windows.release(); } } } public static void main(String[] args) { BankServiceWindows windows =new BankServiceWindowsImpl(3); for (int i = 0; i < 10 ; i++) { new handleThread(windows, "线程"+i).start(); } } }
相关推荐
总结来说,AQS作为Java并发包中的重要组件,它提供了一种高效、可扩展的同步器实现机制。开发者可以通过继承AQS并实现一些必要的方法来创建自定义的同步器。AQS的使用大大简化了并发编程的复杂度,使得并发控制更加...
AQS(AbstractQueuedSynchronizer)是Java.util.concurrent包中同步器的基础框架,它的核心设计思想与实现方法在Doug Lea先生的这篇论文中有详细的介绍。论文详细阐述了AQS框架的原理、设计、实现、应用以及性能等...
Java的并发编程是多线程和多任务处理的核心技术之一,而在Java并发包 java.util.concurrent 中,AQS(AbstractQueuedSynchronizer)扮演了至关重要的角色。AQS是一种框架,用来构建锁或其他同步组件的基础。它提供了...
它基于一种称为CLH(Craig, Landin, and Hagersten)队列的等待队列实现,是Java并发包`java.util.concurrent.locks`中的核心类。本文将详细分析AQS的源码,探讨其工作机制,以及在Java中如何实现不同类型的锁。 ...
再来看AQS,全称为AbstractQueuedSynchronizer,它是Java并发包中一个强大的同步组件,主要用于构建锁和其他同步组件的基础框架。AQS的核心思想是基于一个整型的同步状态(state),并通过CAS操作来保证其更新的原子...
【描述】提到的重点在于JUC(Java并发包)中的可重入锁概念,以及与之相关的锁机制,如LockSupport工具类的使用。此外,还提到了LockSupport如何实现线程的阻塞和唤醒,以及AbstractQueuedSynchronizer (AQS) 在锁和...
文档标题“java.util.concurrent同步器框架”和描述“Doug Lea的java.util.concurrent同步器框架”表明本文将探讨由Doug Lea所撰写的关于Java并发编程中同步器框架的内容。文档中提到了AbstractQueuedSynchronizer类...
总的来说,CountDownLatch是Java并发编程中用于协调多线程间同步的一种高效工具,它的核心是基于AQS的计数器管理。理解其工作原理和使用方法对于编写高效的并发程序至关重要。在实际编程中,应根据具体需求选择...
Lock 接口是 Java 并发包中的一个核心接口,提供了同步机制,用于多线程环境下的线程安全。Lock 接口中的方法有 lock()、lockInterruptibly()、tryLock()、tryLock(long time, TimeUnit unit)和 unlock()。 * lock...
`AbstractQueuedSynchronizer`(AQS)是`ReentrantLock`的核心组件之一,它提供了一个框架来构建锁和其他同步组件。AQS维护了一个FIFO线程等待队列,当线程无法获取锁时,会被插入到队列中。队列中的线程会被暂时挂...
AQS是Java并发包中一个重要的抽象类,它为实现锁和其他同步组件提供了基础框架。AQS维护了一个FIFO的等待队列,用于管理线程的等待状态。基于AQS的锁有ReentrantLock、ReadWriteLock等。 AQS的核心是state变量,...
在Java多线程并发编程中,ReentrantReadWriteLock(可重入读写锁)是一个重要的同步工具,它属于Java并发包(java.util.concurrent.locks)中的一个类。这个锁提供了比标准的synchronized关键字更细粒度的控制,允许...
- **AbstractQueuedSynchronizer (AQS)**:这是一个抽象类,作为 Java 并发包中许多同步器的基础框架。它提供了一个队列同步器,用于构建自定义的同步组件。 - **LockSupport 类**:该类提供了线程之间的阻塞和唤醒...
10. **AQS(AbstractQueuedSynchronizer)**:AQS是一个抽象的队列同步器,它是Java并发包中的核心组件,支持独占锁和共享锁。 11. **AQS中的共享锁与独占锁**:独占锁只有一个线程能获得,如synchronized;共享锁...
ReentrantLock,即可重入锁,是Java并发包(java.util.concurrent.locks)中的一个核心组件,它提供了比synchronized更灵活的锁机制。ReentrantLock实现了Lock接口,具备公平锁与非公平锁两种模式,同时支持中断等待...
AQS是Java并发包里一系列同步工具的基础实现,原理是根据状态位来控制线程的入队阻塞、出队唤醒来处理同步。Worker包装了Thread,由它去执行任务。 execute方法是线程池执行原理的入口点。当execute方法被调用时,...
- **AbstractQueuedSynchronizer**(AQS)是Java并发包中的基础框架之一,用于构建锁和其他同步组件的基础框架。AQS维护了一个内部队列来管理等待的线程,并实现了FIFO队列的锁。 ### 原子操作与原子类 - **原子...
首先,CountDownLatch的内部实现基于Java并发包(java.util.concurrent)中的AbstractQueuedSynchronizer(AQS)类。AQS是一个抽象类,提供了线程同步的基础框架,它使用一个整型状态变量(在这里是count)来管理...