`
春花秋月何时了
  • 浏览: 41835 次
  • 性别: Icon_minigender_1
  • 来自: 成都
社区版块
存档分类
最新评论

Java并发包核心框架AQS之一同步阻塞与唤醒续

 
阅读更多

本文承接未完上文。

四、AQS共享获取/释放源码分析

 在上文中对Du占方式获取和释放共享资源相关的源码进行了分析,本节接着开始对共享式获取/释放资源的源码进行分析。共享式与Du占式的最主要区别在于同一时刻Du占式只能有一个线程成功获取同步资源,而共享式在同一时刻可以有多个线程成功获取同步资源。例如读操作可以有多个线程同时进行,而写操作同一时刻只能有一个线程进行写操作,其他操作都会被阻塞。

4.1 void acquireShared(int arg)

此方法是共享模式下线程获取共享资源的顶层入口。它会通过自定义共享资源获取方法acquireShared(int)获取指定量的资源,获取成功则直接返回,获取失败则通过doAcquireShared(int)被加入同步等待队列,直到获取到资源为止,整个过程忽略中断。 

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
}

   这里tryAcquireShared()依然需要自定义同步器去实现。但是AQS已经把其返回值的语义定义好了:

  1. 负值,代表获取资源失败
  2. 0,代表获取资源成功,但没有剩余资源
  3. 正值,表示获取资源成功,并且还有剩余资源,其他线程还可以尝试去获取。

4.1.1 doAcquireShared(int) 

此方法用于将当前线程加入同步等待队列尾部阻塞,直到被其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回。

private void doAcquireShared(int arg) {
	 //addWaiter方法在上一文Du占模式中已经分析过,就是将当前线程加入到同步等待队列的队尾,并返回当前线程所在的节点。
	//从这里可以看出,不管是共享模式还是Du占模式,都共享同一个等待队列。
	final Node node = addWaiter(Node.SHARED);
	boolean failed = true; //标记是否成功获取资源
	try {
		boolean interrupted = false; //标记在等待过程中是否被中断过
		for (;;) {   //又是一个CAS“自旋”
			final Node p = node.predecessor();//拿到前驱节点
			if (p == head) {  //如果前驱是头节点即该节点是第二节点,那么就有资格去尝试获取资源
				int r = tryAcquireShared(arg); //尝试获取资源
				if (r >= 0) {  //0表示获取成功,大于0表示获取成功还有剩余资源
					setHeadAndPropagate(node, r);  //将head指向自己,还有剩余资源或者后继节点状态小于0时,可以再唤醒之后的线程
					p.next = null; // help GC
					if (interrupted) //如果等待过程中被打断过,此时将中断补上。
						selfInterrupt();
					failed = false;
					return;
				}
			}
			//重排序(如果有必要的话),然后阻塞进入waiting状态,等待被唤醒
			if (shouldParkAfterFailedAcquire(p, node) && 
				parkAndCheckInterrupt())
				interrupted = true;
		}
	} finally {
		if (failed)
			cancelAcquire(node);
	}
}

    通过分析其源码可以发现其逻辑个Du占式获取资源的过程基本一致,唯一的区别就在于这里有一个新的方法setHeadAndPropagate()出现,该方法的作用是重新设置头节点为当前成功获取资源的线程节点,并且如果还有剩余资源或者后继节点告诉了当前节点需要被唤醒,则还要继续唤醒后面的线程,这也是所谓共享式获取资源的最直接的体现。那么它到底是如何继续唤醒后面的线程的,我们接着看起源码:

private void setHeadAndPropagate(Node node, int propagate) {
	Node h = head; 
	setHead(node); //设置当前节点为头节点,并且会把当前节点的前驱节点置为null
	//如果还有剩余资源 或者 后继节点正在等待被唤醒 或者没有后继节点 或者后继节点为共享,就尝试唤醒下一个节点
	if (propagate > 0 || h == null || h.waitStatus < 0 ||
		(h = head) == null || h.waitStatus < 0) {//这里的一堆判断条件有些保守,在有多个线程竞争获取/释放时可能导致不必要的唤醒但是不会造成任何危害。
		Node s = node.next;
		if (s == null || s.isShared())
		//如果后继是独占模式,那么即使剩下的许可大于0也不会继续往后传递唤醒操作,即使后面有结点是共享模式。
                //但是当没有后继节点(s==null)时,还是会去自旋中继续尝试将来新加入进来的不论是共享还是Du占模式的节点。
		doReleaseShared();
	}
}

    通过以上源码可见,此方法在setHead()的基础上多了一步,就是自己苏醒的同时,如果条件符合(比如还有剩余资源等多个保守条件),还会尝试去唤醒后继结点,因为这是是共享模式!至于最终是如何唤醒后继节点的逻辑这里依然是通过调用doReleaseShared()方法实现的,那么我们继续往下看它的源码:

private void doReleaseShared() {
	for (;;) { //"自旋" 操作
		Node h = head; //现在的头节点是已经成功获取共享资源的节点
		if (h != null && h != tail) {
			int ws = h.waitStatus;
			if (ws == Node.SIGNAL) { //如果后继节点正在等待同步资源,并要求被唤醒
				if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //在唤醒后继节点之前先将自身同步状态置为0
					continue;            //如果头结点状态被改变(例如取消了等待或者等待新的条件满足),不再是SIGNAL,需要重新进行自旋,找到另外的合适的后继节点
				unparkSuccessor(h); //如果设置节点同步状态的CAS操作成功,表示后继节点确实需要被唤醒,那么就唤醒h的后继节点
			}/**
	                  为什么这里要把state状态修改为Node.PROPAGATE?
                          我觉得应该是出于在多线程并发情况下尽可能提高运行效率,达到能够快速释放资源,并及时响应新加入节点的唤醒传播。
                         **/
                        else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) 
				continue; // CAS更改节点同步状态失败,需要重新进行自旋
		}
                /**
	          为什么这里要加个h == head?
	          什么情况下这里的头节点会被改变?
	          假设当前AQS队列没有任何等待的节点,即head==tail。这时候上面的if判断不成立,执行到这里适合再次判断h==head,如果有新节点添加
	          进来,则h!=head,会重新尝试释放。另外如果在高并发下,头节点也可能会被其他线程获取到资源之后更改。所以这里的判断估计是考虑到并发竞争的情况。
                **/
		if (h == head) 
			break;
	}
}

    可能一开始有点看不明白到底是怎样唤醒后继的,我这里就简单梳理如下:

  1. 首先从doAcquireShared()方法成功获取共享资源开始,发现还有剩余资源,执行setHeadAndPropagate()方法。
  2. setHeadAndPropagate()方法重新设置当前成功获取资源的线程的节点为head节点,并发现其下一个紧接着的节点是共享模式或者已经没有后继节点时,继续执行doReleaseShared()方法。
  3. doReleaseShared()方法发现后继节点确实需要被唤醒,则执行unparkSuccessor()方法唤醒其后继节点。
  4. 被unparkSuccessor()方法唤醒的后继节点从doAcquireShared()方法中的parkAndCheckInterrupt()方法返回之后(假设没有被执行中断)继续尝试获取共享资源,如果成功获取资源,发现依然还有共享资源则重复过程1.2,3,4. 直到某一个被唤醒的线程没有成功获取到指定量的资源,或者其后继节点不是共享模式,继续向后唤醒线程的过程就终止。但在没有后继节点的时候也会继续自旋唤醒被新加入进来的节点,即使其不是共享模式而是Du占模式。

由此可见在共享式的获取同步资源的时候,由于需要在获取资源之后,如果还有剩余资源,还需要对后继节点的线程进行唤醒,所以其实现逻辑比起独占式单纯的获取资源更复杂,如果存在多线程并发的话,那情况将更加复杂。值得一提的是,在共享式获取同步资源的过程中,如果一个线程成功获取到共享资源之后,发现还有剩余的资源,于是唤醒排在它后面的节点,但是被唤醒的线程发现剩余的资源并不足以满足自己的需要(例如剩余5个资源,但是它却需要6个资源). 这时候线程就会再次进入阻塞等待状态,这时候即使剩余的资源满足排在更后面的线程的需要,也不会跳过这个线程去唤醒更后面的线程。因为AQS保证严格按照入队顺序唤醒罢了(保证公平,但降低了并发)。

4.2 boolean releaseShared(int arg)

4.1 小节讲述了共享式获取同步资源的过程,这里开始共享式释放资源的逻辑releaseShared(),此方法是共享模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。

 

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) { //通过自定义共享资源释放方法尝试释放资源
            doReleaseShared();  //唤醒后继节点
            return true;
        }
        return false;
}
   有了前面章节的理解,对这个方法应该就比较简单了,首先还是会执行被覆写过的共享资源释放方法tryReleaseShared(),该方法返回true表示成功释放掉指定量的资源,释放成功之后,又调用了doReleaseShared()方法,该方法在上面的4.1.1中已经详细的分析过了,其作用就是将后继节点的线程从doAcquireShared()方法中的阻塞点唤醒,继续尝试获取资源,如果成功则又执行setHeadAndPropagate()方法,若还有剩余则继续执行doReleaseShared(),这样传播式的向后唤醒线程的过程又和4.1.1中的过程一致了。值得一提的是,在doReleaseShared()方法以及unparkSuccessor()方法中并没有对节点模式进行判断,所以只要是在等待的线程都会被唤醒去尝试获取资源,不论它是共享模式还是Du占模式。
 

自此,共享式获取/释放同步资源的基础方法的源码就分析完毕,和独占式类似的,支持响应中断和超时机制的acquireSharedInterruptibly(int)和tryAcquireSharedNanos(int arg, long nanosTimeout)与独占式相对应的方法原理一致,这里就不再详解了。

AQS中还提供了其他一些重要或有用的方法,例如查看所有正在同步队列中等待的线程方法getQueuedThreads(),getExclusiveQueuedThreads(),getSharedQueuedThreads()等,这里也就不再做全面的介绍了。

 

五、自定义AQS同步器

通过前面的分析学习,我们已经对AQS的源码进行详细的分析,对其原理也有了了解即:AQS同步阻塞唤醒机制的主要思想是对共享资源state的获取与释放,以及对同步等待队列的维护。AQS本身已经对同步队列的维护进行了完整的支持,而对共享资源的获取与释放则需要被不同的自定义同步器来实现,应该实现的模板方法已经被AQS设计规范好了,其自定义同步器实现时主要实现以下几种方法:

  • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。如果允许重入,则state允许累加。
  • tryRelease(int):独占方式。尝试释放资源,已经彻底释放掉资源(例如重入之后,要释放多次,直到state为0)则返回true,否则返回false。
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,如果成功释放资源则返回true,否则返回false。
  • isHeldExclusively():该线程是否正在独占资源。一般只有用到condition才需要去实现它。
由于ASQ这种基于模板方式的设计实现,已经对大量细节进行了实现,当我们对其内部原理以及需要被不同类型的自定义同步器实现的模板方法有了了解之后,我们可以很方面的实现我们自己的同步器。 

编写自定义同步组件需要注意的地方

1. 使用新的接口和实现包装同步组件

一般来说,自定义同步器要么是独占方法,要么是共享方式,我们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种对共享资源或者说执行许可state变量的维护方法即可。但是由于AQS同时对这两种模式的资源共享方式都进行了支持,所以如果我们的自定义同步组件属于其中某一种模式(例如独占模式)的时候,采用直接继承AbstractQueuedSynchronizer会将另一种模式资源获取与释放等方法(tryAcquireShared、tryReleaseShared等)也继承下来,但是我们却不需要去实现这些不必要的模板方法,因此应该将这些方法屏蔽掉,防止用户误操作。按照最佳实现,屏蔽的方式是定义一个新的接口,这个接口只定义了该模式下的相关方法,另外再编写一个实现类,对该同步组件的具体实现都封装在实现类中。

2. 同步组件推荐定义为静态内部类

因为某个同步组件通常是为实现特定的目的而实现,可能只适用于特定的场合。如果某个同步组件不具备通用性,我们应该将其定义为一个私有的静态内部类。结合第一点,我们编写的同步组件的最终实现就应该是我们定义的外部接口的实现类中的一个私有静态内部类。

自定义独占式同步组件实例

1. 首先定义一个外部接口Mutex
//不可重入的独占锁接口
public interface Mutex {
    //获取锁
    public void lock();
    //释放锁
    public void unlock();
}
2.  通过私有静态内部类实现同步组件的接口实例MutexImpl
//实现
public class MutexImpl implements Mutex{
    // 仅需要将操作代理到Sync上即可
    private Sync sync=new Sync();
    @Override //lock<-->acquire。两者语义一样:获取资源,即便等待,直到成功才返回。
    public void lock() {
        sync.acquire(1);
    }

    @Override  //unlock<-->release。两者语义一样:释放资源。
    public void unlock() {
        sync.release(1);
    }

    //私有静态内部类,独占式同步组件实现
    private static class Sync extends AbstractQueuedSynchronizer{
 
        @Override
        protected boolean tryAcquire(int arg) {
            return compareAndSetState(0,1); //state为0才设置为1,表明不可重入!
        }
 
        @Override
        protected boolean tryRelease(int arg) {
            return compareAndSetState(1,0);//重新将state从1设置为0
        }
    }
}
3.  测试同步组件类MutexMain
public class MutexMain {
	
	static class MutexThread extends Thread{
        private Mutex mutex;
 
        public MutexThread(String name,Mutex mutex) {
            this.mutex = mutex;
            this.setName(name);
        }
 
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName()+"启动..");
            mutex.lock();
            System.out.println(Thread.currentThread().getName()+"获取锁成功..");
            try {
                System.out.println(Thread.currentThread().getName()+"开始执行,当前时间:"+new Date().toLocaleString());
                Thread.currentThread().sleep(1000);//假设线程执行需要1秒钟
                System.out.println(Thread.currentThread().getName()+"结束执行,当前时间:"+new Date().toLocaleString());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                System.out.println(Thread.currentThread().getName()+"释放锁..");
                mutex.unlock();
            }
        }
    }
	
	public static void main(String[] args) {
		Mutex mutex=new MutexImpl();
        for (int i = 0; i <5 ; i++) {
            new MutexThread("线程"+i,mutex).start();
        }
	}

}
通过运行测试方法,可以看到,我们的独占锁的确是起作用了,任意一时刻只有一个线程在运行。 并且线程的执行顺序是严格按照等待顺序的(注意这里的等待顺序有可能和程序编写的启动顺序不一致). 因为同步等待队列是一个FIFO的队列,所以先进入等待队列的先获取到锁。

自定义共享式同步组件实例

该例以银行柜台业务办理窗口为例,假设总共有3个窗口,一共来了十个人排队等候,那么每三个人就可以同时办理业务(共享银行窗口资源),这是典型的共享式同步模式。实现代码如下:
1. 首先定义一个外部接口BankServiceWindows
public interface BankServiceWindows {

	public void handle();
	
	public void release();
}
2.  通过私有静态内部类实现同步组件的接口实例BankServiceWindowsImpl
public class BankServiceWindowsImpl implements BankServiceWindows{
	
	private Sync sync;
	
	public BankServiceWindowsImpl(int count){
		sync = new Sync(count);
	}

	@Override
	public void handle() {
		sync.acquireShared(1);
	}

	@Override
	public void release() {
		sync.releaseShared(1);
	}
	
	private static class Sync extends AbstractQueuedSynchronizer{
		
		private  Sync(int count){
			setState(count);
		}

		@Override
		protected int tryAcquireShared(int arg) {
			for (;;) {
				int current = getState();
				int newCount = current - 1;
				if (newCount < 0 || compareAndSetState(current, newCount)) {
					return newCount;
				}
			}
		}

		@Override
		protected boolean tryReleaseShared(int arg) {
			for (;;) {
				int current = getState();
				int newCount = current + 1;
				if (compareAndSetState(current, newCount)) {
					return true;
				}
			}
		}
		
	}

}
3.  测试同步组件类BankServiceWindowsTest
public class BankServiceWindowsTest {
	
	static class handleThread extends Thread{
		
		private BankServiceWindows windows;
		
		public handleThread(BankServiceWindows windows, String name) {
			super();
			this.windows = windows;
			setName(name);
		}
		
		@Override
        public void run() {
			System.out.println(Thread.currentThread().getName() +" 开始等候");
			windows.handle();
			try {
				System.out.println(Thread.currentThread().getName() +" 开始办理");
				Thread.currentThread().sleep(5000);
				System.out.println(Thread.currentThread().getName() +" 办理结束");
			} catch(Exception e){
				e.printStackTrace();
			}finally{
				windows.release();
			}
		}
	}

	public static void main(String[] args) {
		BankServiceWindows windows =new BankServiceWindowsImpl(3);
		for (int i = 0; i < 10 ; i++) {
			new handleThread(windows, "线程"+i).start();
		}
	}
}
 
通过运行上面的测试代码,可以看到,我们的共享锁可以支持3个线程同时运行。这就是共享式同步组件的意义。
 
 
 
分享到:
评论

相关推荐

    Java 多线程与并发(10-26)-JUC锁- 锁核心类AQS详解.pdf

    总结来说,AQS作为Java并发包中的重要组件,它提供了一种高效、可扩展的同步器实现机制。开发者可以通过继承AQS并实现一些必要的方法来创建自定义的同步器。AQS的使用大大简化了并发编程的复杂度,使得并发控制更加...

    The java. util. concurrent synchronizer framework.pdf

    AQS(AbstractQueuedSynchronizer)是Java.util.concurrent包中同步器的基础框架,它的核心设计思想与实现方法在Doug Lea先生的这篇论文中有详细的介绍。论文详细阐述了AQS框架的原理、设计、实现、应用以及性能等...

    JUC AQS的加解锁.pdf

    Java的并发编程是多线程和多任务处理的核心技术之一,而在Java并发包 java.util.concurrent 中,AQS(AbstractQueuedSynchronizer)扮演了至关重要的角色。AQS是一种框架,用来构建锁或其他同步组件的基础。它提供了...

    7 AQS源码分析.docx

    它基于一种称为CLH(Craig, Landin, and Hagersten)队列的等待队列实现,是Java并发包`java.util.concurrent.locks`中的核心类。本文将详细分析AQS的源码,探讨其工作机制,以及在Java中如何实现不同类型的锁。 ...

    笔记-4、显式锁和AQS(1)1

    再来看AQS,全称为AbstractQueuedSynchronizer,它是Java并发包中一个强大的同步组件,主要用于构建锁和其他同步组件的基础框架。AQS的核心思想是基于一个整型的同步状态(state),并通过CAS操作来保证其更新的原子...

    尚硅谷大厂面试题第三季周阳主讲

    【描述】提到的重点在于JUC(Java并发包)中的可重入锁概念,以及与之相关的锁机制,如LockSupport工具类的使用。此外,还提到了LockSupport如何实现线程的阻塞和唤醒,以及AbstractQueuedSynchronizer (AQS) 在锁和...

    The java.util.concurrent synchronizer framework.pdf

    文档标题“java.util.concurrent同步器框架”和描述“Doug Lea的java.util.concurrent同步器框架”表明本文将探讨由Doug Lea所撰写的关于Java并发编程中同步器框架的内容。文档中提到了AbstractQueuedSynchronizer类...

    Java concurrency之CountDownLatch原理和示例_动力节点Java学院整理

    总的来说,CountDownLatch是Java并发编程中用于协调多线程间同步的一种高效工具,它的核心是基于AQS的计数器管理。理解其工作原理和使用方法对于编写高效的并发程序至关重要。在实际编程中,应根据具体需求选择...

    多线程编程的核心思想.doc

    Lock 接口是 Java 并发包中的一个核心接口,提供了同步机制,用于多线程环境下的线程安全。Lock 接口中的方法有 lock()、lockInterruptibly()、tryLock()、tryLock(long time, TimeUnit unit)和 unlock()。 * lock...

    Java分布式应用学习笔记06浅谈并发加锁机制分析

    `AbstractQueuedSynchronizer`(AQS)是`ReentrantLock`的核心组件之一,它提供了一个框架来构建锁和其他同步组件。AQS维护了一个FIFO线程等待队列,当线程无法获取锁时,会被插入到队列中。队列中的线程会被暂时挂...

    04 并发编程专题08.zip

    AQS是Java并发包中一个重要的抽象类,它为实现锁和其他同步组件提供了基础框架。AQS维护了一个FIFO的等待队列,用于管理线程的等待状态。基于AQS的锁有ReentrantLock、ReadWriteLock等。 AQS的核心是state变量,...

    Java 多线程与并发(12-26)-JUC锁- ReentrantReadWriteLock详解.pdf

    在Java多线程并发编程中,ReentrantReadWriteLock(可重入读写锁)是一个重要的同步工具,它属于Java并发包(java.util.concurrent.locks)中的一个类。这个锁提供了比标准的synchronized关键字更细粒度的控制,允许...

    大并发编程交流

    - **AbstractQueuedSynchronizer (AQS)**:这是一个抽象类,作为 Java 并发包中许多同步器的基础框架。它提供了一个队列同步器,用于构建自定义的同步组件。 - **LockSupport 类**:该类提供了线程之间的阻塞和唤醒...

    最热门的Java 面试题汇总

    10. **AQS(AbstractQueuedSynchronizer)**:AQS是一个抽象的队列同步器,它是Java并发包中的核心组件,支持独占锁和共享锁。 11. **AQS中的共享锁与独占锁**:独占锁只有一个线程能获得,如synchronized;共享锁...

    ReentrantLock流程浅析

    ReentrantLock,即可重入锁,是Java并发包(java.util.concurrent.locks)中的一个核心组件,它提供了比synchronized更灵活的锁机制。ReentrantLock实现了Lock接口,具备公平锁与非公平锁两种模式,同时支持中断等待...

    了解Java线程池执行原理

    AQS是Java并发包里一系列同步工具的基础实现,原理是根据状态位来控制线程的入队阻塞、出队唤醒来处理同步。Worker包装了Thread,由它去执行任务。 execute方法是线程池执行原理的入口点。当execute方法被调用时,...

    线程面试汇总.docx

    - **AbstractQueuedSynchronizer**(AQS)是Java并发包中的基础框架之一,用于构建锁和其他同步组件的基础框架。AQS维护了一个内部队列来管理等待的线程,并实现了FIFO队列的锁。 ### 原子操作与原子类 - **原子...

    详解java CountDownLatch和CyclicBarrier在内部实现和场景上的区别

    首先,CountDownLatch的内部实现基于Java并发包(java.util.concurrent)中的AbstractQueuedSynchronizer(AQS)类。AQS是一个抽象类,提供了线程同步的基础框架,它使用一个整型状态变量(在这里是count)来管理...

Global site tag (gtag.js) - Google Analytics