精华帖 (16) :: 良好帖 (11) :: 新手帖 (0) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2011-07-02
JUC之 AQS 状态依赖的抽象AQS全称为:AbstractQueuedSynchronizer,它是juc的synchronizer的基础
状态依赖的管理 在JUC中,不管是FutureTask、CountDownLatch、Lock、还是信号量,CyclicBarrier,从某种角度来说,他们都是依赖某种状态,或者说条件,虽然这些条件的值不同:
从某个角度来说,都是在做下面的事情(另一个线程会发送notify消息唤醒等待的线程):
while (!state){ wait(); } change state();
在单线程的程序中,进入等待状态,那么永远也不能被唤醒了,在并发程序中,基于状态的条件是会在其他线程中改变的,因此就是其他线程就需要被阻塞,直到可以继续.
依赖状态在等待的时候可以理解为进入了一个FIFO队列,前面提到的一些类,就是在状态未满足的情况下阻塞,并且加入队列,在合适的时机,唤醒。Java提供了这样的内部队列,但是它是和锁相关的,每个对象都有一把锁,每把锁都有对应的队列,Object中的wait、notify、notifyAll就是操作这个队列的API(参考系列2,话说,话说java这也太不直观了)。从这个角度来看,要操作锁的队列,需要先有锁,因此wait/notify需要在锁中调用,另外,这也确保了,在观察/操作状态的时候,不会有其他线程修改状态!类似这样: Syn{ while (!state){ wait(); } change state(); } AQS 状态 前面提到了状态以后,以及wait/notify的队列支持,但是内部锁很不灵活,wait/notify用起来也有很多麻烦的地方,特别是多个线程等待因为不同的原因等待同一个对象上的消息的时候,非常不直观,因此FutureTask/CountDownLatch等是用另一种方式实现的,看ReentrantLock的源码,会发现lock、tryLock等方法都是调用内部类Sync实现,Sync又是继承自AQS,其他类也是这样的,AQS是这些synchronizer类的基础。
状态依赖的抽象
AQS将上面的状态依赖合理抽象,设计理念主要有两部分:
状态主要由字段state、getSate、setState、compareAndSetState来表达、操作,状态的原子操作非常重要,可以保证实现synchronizer的语义,否则就会有并发问题;队列中保存的不仅仅是线程信息,它保存的是AQS的一个内部类,Node,它的结构后面介绍
在进行一个因为某些原因可能阻塞的操作时,大致流程是下面这样的:
2和3、5的操作,是需要不同的synchronizer自己实现的:
ReentrantLock判断state是否为0,这时将当前线程设置为拥有者并且修改状态改为1,否则挂起当前线程,加入等待队列
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 独占锁,判断状态是否为0 if (c == 0) { //通过原子操作设置状态 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
CountDownLatch则是判断状态是否减为0,不是的话则挂起线程,放入阻塞队列;
//如果失败,表示状态没有ok,需要放入等待队列,并且通过LockSupport挂起线程 public int tryAcquireShared(int acquires) { return getState() == 0? 1 : -1; }
Semaphore则是判断是否还有可用的许可,也就是判断(available-acquires)是否大于0,如果许可是1的话,其实也就是个独占锁,否则则挂起线程,放入阻塞队列;
final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
FutureTask的条件是判断任务是否结束:
protected int tryAcquireShared(int ignore) { //判断状态是被已经ok还是cancel了 return innerIsDone()? 1 : -1; }
在调用Get方法的时候,如果状态没有ok,会被放入队列,阻塞: V innerGet() throws InterruptedException, ExecutionException { //here acquireSharedInterruptibly(0); if (getState() == CANCELLED) throw new CancellationException(); if (exception != null) throw new ExecutionException(exception); return result; }
对应步骤5,ReentrantLock调用unLock,会修改state为0,并且设置锁拥有者为null,如果有等待中的线程,唤醒一个:
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
CountDownLatch会调用countDown方法,每次state减1,直到state为0,表示状态ok,唤醒所有其他线程。
public boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; // 注意状态的修改都是原子操作,并且都是for循环里的 if (compareAndSetState(c, nextc)) return nextc == 0; } }Semaphore会调用release方法,每次将许可加1,这样其他线程可以重新获得许可:如果队列里有阻塞的线程的话,唤醒
protected final boolean tryReleaseShared(int releases) { for (;;) { int p = getState(); if (compareAndSetState(p, p + releases)) return true; } }
FutureTask则是在设置状态为RUN,并且唤醒阻塞的线程
void innerSet(V v) { for (;;) { int s = getState(); if (s == RAN) return; if (s == CANCELLED) { releaseShared(0); return; } if (compareAndSetState(s, RAN)) { result = v; releaseShared(0); done(); return; } } }
看了这么多典型的synchronizer可以发现,这种颜色标记出来的,是每种synchronizer可能不同的地方,需要子类自己实现;而这种颜色的,不管是什么synchronizer都是一样的,一个队列,就那么几个操作,比如入对挂起,单个唤醒出对列、全部唤醒出对列,所以它的代码在AQS里面,主要是一些队列和锁的操作,后面再抓个例子看看;需要自定义的主要有以下几个操作:
tryAcquire 返回独占模式下状态的判断, Reentrant根据返回决定是否阻塞线程,加入等待队列 tryRelease,返回独占模式下的状态判断,Reentrant判断是否成功释放锁,并且是否唤醒后面的等待线程 isHeldExclusively:返回独占模式下,当前线程是否占用了锁
tryAcquireShared,非独占模式下的状态判断,Latch判断state是否为0,根据返回决定是否阻塞当前线程,加入等待 tryReleaseShared,非独占模式下的状态判断,Latch将state减1状态,并且父类根据返回决定是否需要唤醒所有等待线程
tryAcquire,tryRelease和isHeldExclusively三个方法为需要独占形式获取的synchronizer实现的,而tryAcquireShared和tryReleasedShared为需要共享形式获取的synchronizer实现。(从我的角度来看,有tryAcquire和tryRelease就够了,可能是为了语义上更加明显吧)
Node的结构 队列中保存的Node元素结构是这样的:
节点的状态
状态的判定非常简单,不用判断特定的数值,非负数表示当前节点不必去发出通知;
SIGNAL状态表示该节点的后继节点需要被唤醒。这个状态的作用主要是告诉前置节点:“你结束以后,你后面还有兄弟需要被唤醒”,如果没有这个状态,那么unlock以后,仅仅是修改锁的状态,不会有什么操作! 如果因为中断/超时,该请求已经取消,会修改状态未Cancled,遇到取消的任务,那么需要踢出这些节点,并把后面的节点接上,Pre这个引用在一般的CLH锁中是没有的,这里主要是为了在取消的时候,保证取消节点的next可以指向取消节点的pre。
另外, 注意:依赖Node状态去判断是否有后继结点需要唤醒,又会牵涉到变量的竞争,为了避免竞争,必须用1.原子操作先设置Node状态,2.再次尝试获取锁,3.失败以后再阻塞线程!看下面shouldParkAfterFailedAcquire的例子
它是是一个双向链表:
+----------+ prev +--------+ +------+ | head | <------- | | <-------- | tail | | | ------->| | --------à| | +---------+ next +---------+ +------+
Head节点就是代表正在占用锁的节点,但是该链表是延迟初始化的,也就是说,并不会在第一次有代码获取锁的时候就初始化这个队列,只会在第一次真正有线程阻塞,需要加入阻塞队列的时候初始化!因此:
写道
1.在只有一个线程占用锁的时候,队列为空
2.在有一个阻塞线程的时候是这样: Dummy head----àwait thread1(tail) 2.两个阻塞线程的时候是这样: Dummy head----àwait thread1----àwait thread 2(tail) 4.当前面一个线程结束的时候,会唤醒head后的第一个节点: wait thread1(head)----àwait thread 2(tail) LockSupport
另外,还必须了解下一个工具类,LockSupport 为阻塞线程提供基础的功能,它由一对park和unpark组成,park会阻塞当前线程,unpark“唤醒”等待线程;内部使用了类似信号量的“许可”机制,该许可为0,park会在许可等于0的时候下阻塞,等于1的时候立即返回,并且将许可减为0,umpark会尝试唤醒线程,并且将许可+1(最大值就是1)。因此,如果先调用unpark方法,再调用park是无效的,因为这时候许可为1,park会立即返回,搞段简单的代码测试下:
public static void main(String args[]) throws Exception { //先调用下unpark LockSupport.unpark(Thread.currentThread()); LockSupport.park(); //打出了not work,说明没起作用 System.out.println("not work"); LockSupport.park(); System.out.println(" wok"); }因此,从原则上来说,最好park/unpark按顺序,依次出现。
另外也并不是只有调用unpark方法才会返回,在下面3中情况下,park都会返回: 1.其他线程对当前线程调用了unpark方法, 2.其他线程中断了当前阻塞的线程 3.“不靠谱的”,毫无理由的返回,这类似一种“忙等待”的机制,不断地返回,不断地检查条件,不过这种方式自旋的时间更短一些,因为这个原因,需要向下面这样调用park方法:
while (!canProceed()) { ... LockSupport.park(this); }wait/notify是和对象、锁、等待队列、线程强关联的,在调用wait的时候,必须有锁,这时候释放对象上的锁,将当前线程加入该对象锁的等待队列;
park/unpark和当前对象、锁、等待队列无关,park方法只是会挂起当前线程;unpark(thread)方法唤醒对应的线程,至于是否有锁、是否放入对待队列,我们并不关心!下面的例子证明了park/wait他们之间是没什么关联的:
public class LockSupportAndWait { public static void main(String[] args) throws InterruptedException{ Thread t=new Thread(){ public void run(){ try { System.out.println("wait"); synchronized(this){ this.wait(); } System.out.println("notify work"); System.out.println("============================="); System.out.println("park"); //因为上面提到的原因,这里调用两次park LockSupport.park(); LockSupport.park(); System.out.println("unpark work"); } catch (InterruptedException ex) { } } }; t.start(); TimeUnit.MILLISECONDS.sleep(100); System.out.println("go to unpark"); LockSupport.unpark(t); System.out.println("go to notify"); goNotify(t); TimeUnit.MILLISECONDS.sleep(1000); System.out.println("go to notify"); goNotify(t); System.out.println("go to unpark"); LockSupport.unpark(t); } public static void goNotify(Thread t){ synchronized(t){ t.notify(); } } }结果(可以看到他们之间没有影响): wait go to unpark go to notify notify work ============================= park go to notify go to unpark unpark work
ps:下一篇再找个源码分析下吧,这篇长了点 声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |
浏览 2298 次