前面我们看到了Lock和synchronized都能正常的保证数据的一致性(上文例子中执行的结果都是20000000),也看到了Lock的优势,那究竟他们是什么原理来保障的呢?今天我们就来探讨下Java中的锁机制!
Synchronized是基于JVM来保证数据同步的,而Lock则是在硬件层面,依赖特殊的CPU指令实现数据同步的,那究竟是如何来实现的呢?我们一一看来!
一、synchronized的实现方案
synchronized比较简单,语义也比较明确,尽管Lock推出后性能有较大提升,但是基于其使用简单,语义清晰明了,使用还是比较广泛的,其应用层的含义是把任意一个非NULL的对象当作锁。当synchronized作用于方法时,锁住的是对象的实例(this),当作用于静态方法时,锁住的是Class实例,又因为Class的相关数据存储在永久带,因此静态方法锁相当于类的一个全局锁,当synchronized作用于一个对象实例时,锁住的是对应的代码块。在Sun的HotSpot JVM实现中,其实synchronized锁还有一个名字:对象监视器。
当多个线程一起访问某个对象监视器的时候,对象监视器会将这些请求存储在不同的容器中。
1、 Contention List:竞争队列,所有请求锁的线程首先被放在这个竞争队列中
2、 Entry List:Contention List中那些有资格成为候选资源的线程被移动到Entry List中
3、 Wait Set:哪些调用wait方法被阻塞的线程被放置在这里
4、 OnDeck:任意时刻,最多只有一个线程正在竞争锁资源,该线程被成为OnDeck
5、 Owner:当前已经获取到所资源的线程被称为Owner
6、 !Owner:当前释放锁的线程
下图展示了他们之前的关系
ContentionList
并不是真正意义上的一个队列。仅仅是一个虚拟队列,它只有Node以及对应的Next指针构成,并没有Queue的数据结构。每次新加入Node会在队头进行,通过CAS改变第一个节点为新增节点,同时新增阶段的next指向后续节点,而取数据都在队列尾部进行。
JVM
每次从队列的尾部取出一个数据用于锁竞争候选者(OnDeck),但是并发情况下,ContentionList会被大量的并发线程进行CAS访问,为了降低对尾部元素的竞争,JVM会将一部分线程移动到EntryList中作为候选竞争线程。Owner线程会在unlock时,将ContentionList中的部分线程迁移到EntryList中,并指定EntryList中的某个线程为OnDeck线程(一般是最先进去的那个线程)。Owner线程并不直接把锁传递给OnDeck线程,而是把锁竞争的权利交个OnDeck,OnDeck需要重新竞争锁。这样虽然牺牲了一些公平性,但是能极大的提升系统的吞吐量,在JVM中,也把这种选择行为称之为“竞争切换”。
OnDeck线程获取到锁资源后会变为Owner线程,而没有得到锁资源的仍然停留在EntryList中。如果Owner线程被wait方法阻塞,则转移到WaitSet队列中,直到某个时刻通过notify或者notifyAll唤醒,会重新进去EntryList中。
处于ContentionList、EntryList、WaitSet中的线程都处于阻塞状态,该阻塞是由操作系统来完成的(Linux内核下采用pthread_mutex_lock内核函数实现的)。该线程被阻塞后则进入内核调度状态,会导致系统在用户和内核之间进行来回切换,严重影响锁的性能。为了缓解上述性能问题,JVM引入了自旋锁。原理非常简单,如果Owner线程能在很短时间内释放锁资源,那么哪些等待竞争锁的线程可以稍微等一等(自旋)而不是立即阻塞,当Owner线程释放锁后可立即获取锁,进而避免用户线程和内核的切换。但是Owner可能执行的时间会超过设定的阈值,争用线程在一定时间内还是获取不到锁,这是争用线程会停止自旋进入阻塞状态。基本思路就是先自旋等待一段时间看能否成功获取,如果不成功再执行阻塞,尽可能的减少阻塞的可能性,这对于占用锁时间比较短的代码块来说性能能大幅度的提升!
但是有个头大的问题,何为自旋?其实就是执行几个空方法,稍微等一等,也许是一段时间的循环,也许是几行空的汇编指令,其目的是为了占着CPU的资源不释放,等到获取到锁立即进行处理。但是如何去选择自旋的执行时间呢?如果自旋执行时间太长,会有大量的线程处于自旋状态占用CPU资源,进而会影响整体系统的性能。因此自旋的周期选的额外重要!
JVM对于自旋周期的选择,基本认为一个线程上下文切换的时间是最佳的一个时间,同时JVM还针对当前CPU的负荷情况做了较多的优化
1、 如果平均负载小于CPUs则一直自旋
2、 如果有超过(CPUs/2)个线程正在自旋,则后来线程直接阻塞
3、 如果正在自旋的线程发现Owner发生了变化则延迟自旋时间(自旋计数)或进入阻塞
4、 如果CPU处于节电模式则停止自旋
5、 自旋时间的最坏情况是CPU的存储延迟(CPU A存储了一个数据,到CPU B得知这个数据直接的时间差)
6、 自旋时会适当放弃线程优先级之间的差异
Synchronized在线程进入ContentionList时,等待的线程就通过自旋先获取锁,如果获取不到就进入ContentionList,这明显对于已经进入队列的线程是不公平的,还有一个不公平的事情就是自旋获取锁的线程还可能直接抢占OnDeck线程的锁资源。
在JVM6以后还引入了一种偏向锁,主要用于解决无竞争下面锁的性能问题。我们首先来看没有这个会有什么样子的问题。
现在基本上所有的锁都是可重入的,即已经获取锁的线程可以多次锁定/解锁监视对象,但是按照之前JVM的设计,每次加锁解锁都采用CAS操作,而CAS会引发本地延迟(下面会讲原因),因此偏向锁希望线程一旦获取到监视对象后,之后让监视对象偏向这个锁,进而避免多次CAS操作,说白了就是设置了一个变量,发现是这个线程过来的就避免再走加锁解锁流程。
那CAS为什么会引发本地延迟呢?这要从多核处(SMP)理架构说起(前面有提到过--JVM内存模型),下图基本上表明了多核处理的架构
多核
CPU会共享一条系统总线,靠总线和主存通讯,但是每个CPU又有自己的一级缓存,而CAS是一条原子指令,其作用是让CPU比较,如果相同则进行数据更新,而这些是基于硬件实现的(JVM只是封装了硬件的汇编调用,AtomicInteger其实是通过调用这些封装后的接口实现的)。多核运算时,由于线程切换,很有可能第二次取值是在另外一核CPU上执行的。假设Core1和Core2把对应的某个值加载到自己的一级缓存时,某个时刻,core1更新了这个数据并通过总线通知主存,此时core2的一级缓存中的数据就失效了,他需要从主存中重新加载一次到一级缓存中,大家通过总线通讯被称之为一致性流量,总线的通讯能力有限,当缓存一致性流量过大时,总线会成为瓶颈,而当Core1和Core2的数据再次一致时,被称为缓存一致性!
而CAS要保证数据的一致性,恰好会引发比较多的一致性流量,如果有很多线程共享一个对象,当某个线程成功执行一次CAS时会引发总线风暴,这就是本地延迟,而偏向锁就是为了消除CAS,降低Cache一致性流量!
当然并不是所有的CAS都会引发总线风暴,这和Cache一致性协议有关系的。但是偏向锁的引入却带来了另外一个问题,在很多线程竞争使用中,如果一个线程持有偏向锁,另外一个线程想争用偏向对象,拥有者想释放这个偏向锁,释放会带来额外的性能开销,但是总体来说偏向锁带来的好处还是大于CAS的代价的。
二、Lock的实现
与synchronized不同的是,Lock书纯Java实现的,与底层的JVM无关。在java.util.concurrent.locks包中有很多Lock的实现类,常用的有ReentrantLock、ReadWriteLock(实现类ReentrantReadWriteLock),其实现都依赖java.util.concurrent.AbstractQueuedSynchronizer类(简称AQS),实现思路都大同小异,因此我们以ReentrantLock作为讲解切入点。
分析之前我们先来花点时间看下AQS。AQS是我们后面将要提到的CountDownLatch/FutureTask/ReentrantLock/RenntrantReadWriteLock/Semaphore的基础,因此AQS也是Lock和Excutor实现的基础。它的基本思想就是一个同步器,支持获取锁和释放锁两个操作。
获取锁:首先判断当前状态是否允许获取锁,如果是就获取锁,否则就阻塞操作或者获取失败,也就是说如果是独占锁就可能阻塞,如果是共享锁就可能失败。另外如果是阻塞线程,那么线程就需要进入阻塞队列。当状态位允许获取锁时就修改状态,并且如果进了队列就从队列中移除。
while(synchronization state does not allow acquire){
enqueue current thread if not already queued;
possibly block current thread;
}
dequeue current thread if it was queued;
释放锁:这个过程就是修改状态位,如果有线程因为状态位阻塞的话,就唤醒队列中的一个或者更多线程。
update synchronization state;
if(state may permit a blocked thread to acquire)
unlock one or more queued threads;
要支持上面两个操作就必须有下面的条件:
1、 状态位必须是原子操作的
2、 阻塞和唤醒线程
3、 一个有序的队列,用于支持锁的公平性
怎么样才能满足这几个条件呢?
1、 原子操作状态位,前面我们已经提到了,实际JDK中也是通过一个32bit的整数位进行CAS操作来实现的。
2、 阻塞和唤醒,JDK1.5之前的API中并没有阻塞一个线程,然后在将来的某个时刻唤醒它(wait/notify是基于synchronized下才生效的,在这里不算),JDK5之后利用JNI在LockSupport 这个类中实现了相关的特性!
3、 有序队列:在AQS中采用CLH队列来解决队列的有序问题。
我们来看下ReentrantLock的调用过程
经过源码分析,我们看到ReentrantLock把所有的Lock都委托给Sync类进行处理,该类继承自AQS,其类关系图如下
其中
Sync又有两个final static的子类NonfairSync和FairSync用于支持非公平锁和公平锁。我们先来挑一个看下对应Reentrant.lock()的调用过程(默认为非公平锁)
这些模版很难让我们直观的看到整个调用过程,但是通过上面的过程图和
AbstractQueuedSynchronizer的注释可以看出,AbstractQueuedSynchronizer抽象了大多数Lock的功能,而只把tryAcquire(int)委托给子类进行多态实现。tryAcquire用于判断对应线程事都能够获取锁,无论成功与否,AbstractQueuedSynchronizer都将处理后面的流程。
简单来讲,AQS会把所有请求锁的线程组成一个CLH的队列,当一个线程执行完毕释放锁(Lock.unlock())的时候,AQS会激活其后继节点,正在执行的线程不在队列当中,而那些等待的线程全部处于阻塞状态,经过源码分析,我们可以清楚的看到最终是通过LockSupport.park()实现的,而底层是调用sun.misc.Unsafe.park()本地方法,再进一步,HotSpot在Linux中中通过调用pthread_mutex_lock函数把线程交给系统内核进行阻塞。其运行示意图如下
与
synchronized相同的是,这个也是一个虚拟队列,并不存在真正的队列示例,仅存在节点之前的前后关系。(注:原生的CLH队列用于自旋锁,JUC将其改造为阻塞锁)。和synchronized还有一点相同的是,就是当获取锁失败的时候,不是立即进行阻塞,而是先自旋一段时间看是否能获取锁,这对那些已经在阻塞队列里面的线程显然不公平(非公平锁的实现,公平锁通过有序队列强制线程顺序进行),但会极大的提升吞吐量。如果自旋还是获取失败了,则创建一个节点加入队列尾部,加入方法仍采用CAS操作,并发对队尾CAS操作有可能会发生失败,AQS是采用自旋循环的方法,知道CAS成功!下面我们来看下锁的实现细节!
锁的实现依赖与lock()方法,Lock()方法首先是调用acquire(int)方法,不管是公平锁还是非公平锁
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } |
Acquire()方法默认首先调用tryAcquire(int)方法,而此时公平锁和不公平锁的实现就不一样了。
1、Sync.NonfairSync.TryAcquire(非公平锁)
nonfairTryAcquire方法是lock方法间接调用的第一个方法,每次调用都会首先调用这个方法,我们来看下对应的实现代码:
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } |
该方法首先会判断当前线程的状态,如果c==0 说明没有线程正在竞争锁。(反过来,如果c!=0则说明已经有其他线程已经拥有了锁)。如果c==0,则通过CAS将状态设置为acquires(独占锁的acquires为1),后续每次重入该锁都会+1,每次unlock都会-1,当数据为0时则释放锁资源。其中精妙的部分在于:并发访问时,有可能多个线程同时检测到c为0,此时执行compareAndSetState(0, acquires))设置,可以预见,如果当前线程CAS成功,则其他线程都不会再成功,也就默认当前线程获取了锁,直接作为running线程,很显然这个线程并没有进入等待队列。如果c!=0,首先判断获取锁的线程是不是当前线程,如果是当前线程,则表明为锁重入,继续+1,修改state的状态,此时并没有锁竞争,也非CAS,因此这段代码也非常漂亮的实现了偏向锁。
2、Sync.FairSync.TryAcquire(公平锁)
我们直接来看代码
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (isFirst(current) && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } |
和明细我们可以看出,公平锁就比不公平锁多了一个判断头结点的方法,就是采用此方法来保证锁的公平性。
3、AbstractQueuedSynchronizer.addWaiter
tryAcquire失败就意味着入队列了。此时AQS的队列中节点Node就开始发挥作用了。一般情况下AQS支持独占锁和共享锁,而独占锁在Node中就意味着条件(Condition)队列为空。在java.util.concurrent.locks.AbstractQueuedSynchronizer.Node中有两个常量
static final Node EXCLUSIVE = null; //独占节点模式
static final Node SHARED = new Node(); //共享节点模式 |
addWaiter(mode)中的mode就是节点模式,也就是共享锁还是独占锁模式。添加的节点是当前线程。(注:ReentrantLock是独占锁模式),我们来看下对应的实现代码:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } |
这块代码并不复杂,如果当前队尾存在元素(tail!=null),则通过CAS添加当前线程到队尾,如果队尾为空或者CAS失败,则通过enq方法设置tail。我们来看下enq的代码
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize Node h = new Node(); // Dummy header h.next = node; node.prev = h; if (compareAndSetHead(h)) { tail = node; return h; } } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } |
该方法就是循环调用CAS,即使有高并发的场景,无限循环将会最终成功把当前线程追加到队尾(或设置队头)。总而言之,addWaiter的目的就是通过CAS把当前现在追加到队尾,并返回包装后的Node实例。
把线程要包装为Node对象的主要原因,除了用Node构造供虚拟队列外,还用Node包装了各种线程状态,这些状态被精心设计为一些数字值:
1)、 SIGNAL(-1) :线程的后继线程正/已被阻塞,当该线程release或cancel时要重新这个后继线程(unpark)
2)、 CANCELLED(1):因为超时或中断,该线程已经被取消
3)、 CONDITION(-2):表明该线程被处于条件队列,就是因为调用了Condition.await而被阻塞
4)、 PROPAGATE(-3):传播共享锁
5)、 0:0代表无状态
3、AbstractQueuedSynchronizer.acquireQueued(进行阻塞)
acquireQueued的主要作用是把已经追加到队列的线程节点(addWaiter方法返回值)进行阻塞,但阻塞前又通过tryAccquire重试是否能获得锁,如果重试成功能则无需阻塞,直接返回。下面我们来看以下它对应的源码信息
final boolean acquireQueued(final Node node, int arg) { try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } catch (RuntimeException ex) { cancelAcquire(node); throw ex; } } |
仔细看看这个方法是个无限循环,感觉如果p == head && tryAcquire(arg)条件不满足循环将永远无法结束,当然不会出现死循环,奥秘在于parkAndCheckInterrupt会把当前线程挂起,从而阻塞住线程的调用栈。我们来看下他的实现方法:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } |
如前面所述,LockSupport.park最终把线程交给系统(Linux)内核进行阻塞。当然也不是马上把请求不到锁的线程进行阻塞,还要检查该线程的状态,比如如果该线程处于Cancel状态则没有必要,具体的检查在shouldParkAfterFailedAcquire中:
private static boolean shouldParkAfterFailedAcquire(Node pred, Nodenode) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } |
检查原则如下:
1、如果前继节点的waitStatus为signal,则说明前面的节点都还灭有获取到锁,此时当前线程需要阻塞,直接返回true
2、如果前继节点waitStatus>0,说明前继节点已经被取消,则重新设置当前节点的前继节点,返回false,之后无限循环直到第一步状态返回true,导致线程阻塞
3、如果前继节点waitStatus小于0而且不等于-1(signal),则通过CAS设置前继节点额外isignal,并返回false,之后无限循环直到步骤1返回true,线程阻塞。
请求锁不成功的线程会被挂起在acquireQueued方法的第12行,12行以后的代码必须等线程被解锁锁才能执行,假如被阻塞的线程得到解锁,则执行第13行,即设置interrupted = true,之后又进入无限循环。
从无限循环的代码可以看出,并不是得到解锁的线程一定能获得锁,必须在第6行中调用tryAccquire重新竞争,非公平锁中有可能被新加入的线程获取到,从而导致刚刚被唤醒的线程再次阻塞;公平锁通过判断当前节点是否是头结点来保证锁的公平性。上面的代码我们还可以看到,因为每次第一个被解锁的是头结点,因此一般p==head的判断都会成功。解锁相对比较简单,主要体现在AbstractQueuedSynchronizer.release和Sync.tryRelease方法中:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } |
这个逻辑也比较简单:
1.判断持有锁的线程是否是当前线程,如果不是就抛出IllegalMonitorStateExeception(),因为一个线程是不能释放另一个线程持有的锁(否则锁就失去了意义)。否则进行2。
2.将AQS状态位减少要释放的次数(对于独占锁而言总是1),如果剩余的状态位0(也就是没有线程持有锁),那么当前线程就是最后一个持有锁的线程,清空AQS持有锁的独占线程。进行3。
3.将剩余的状态位写回AQS,如果没有线程持有锁就返回true,否则就是false。
从上面我们可以知道,这里c==0决定了是否完全释放了锁。由于ReentrantLock是可重入锁,因此同一个线程可能多重持有锁,那么当且仅当最后一个持有锁的线程释放锁是才能将AQS中持有锁的独占线程清空,这样接下来的操作才需要唤醒下一个需要锁的AQS节点(Node),否则就只是减少锁持有的计数器,并不能改变其他操作。
当tryRelease操作成功后(也就是完全释放了锁),release操作才能检查是否需要唤醒下一个继任节点。这里的前提是AQS队列的头结点需要锁(waitStatus!=0),如果头结点需要锁,就开始检测下一个继任节点是否需要锁操作。
上文说道acquireQueued操作完成后(拿到了锁),会将当前持有锁的节点设为头结点,所以一旦头结点释放锁,那么就需要寻找头结点的下一个需要锁的继任节点,并唤醒它。我们来看下对应的实现代码:
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0);
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); } |
对比对应的代码我们可以看出,一旦头结点的后继结点被唤醒,那么后继结点就尝试去获取锁,如果获取成功就将头结点设置为自身,并将头结点的前任节点清空。
final boolean acquireQueued(final Node node, int arg) { try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } catch (RuntimeException ex) { cancelAcquire(node); throw ex; } } |
对比lock,unlock是相当比较简单的,主要是释放需要响应的资源,并唤醒AQS队列中有效的后继结点,这样就试图以请求的顺序获取锁资源了。
对比公平锁和不公平锁,其实就是在获取锁的时候有区别,释放锁的时候都是一样的。非公平锁总是尝试看当前有没有线程持有锁,如果没有则使用现有的线程去抢占锁资源,但是一旦抢占失败,也就和公平锁一样,进入阻塞队列老老实实排队去了,也就是说公平锁和非公平锁只有在进入AQS的CLH队列之前有区别,后面都是按照队列的顺序请求锁资源的。
由锁衍生的下一个对象是条件变量,这个对象的存在很大程度上是为了解决Object.wait/notify/notifyAll难以使用的问题。
条件(也称为条件队列 或条件变量)为线程提供了一个含义,以便在某个状态条件现在可能为 true 的另一个线程通知它之前,一直挂起该线程(即让其“等待”)。因为访问此共享状态信息发生在不同的线程中,所以它必须受保护,因此要将某种形式的锁与该条件相关联。等待提供一个条件的主要属性是:以原子方式 释放相关的锁,并挂起当前线程,就像Object.wait 做的那样。
上述API说明表明条件变量需要与锁绑定,而且多个Condition需要绑定到同一锁上。前面的Lock中提到,获取一个条件变量的方法是Lock.newCondition()。
Condition 将 Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set(wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。await对应于Object.wait,signal对应于Object.notify,signalAll对应于Object.notifyAll。特别说明的是Condition的接口改变名称就是为了避免与Object中的wait/notify/notifyAll的语义和使用上混淆,因为Condition同样有wait/notify/notifyAll方法。
每一个Lock可以有任意数据的Condition对象,Condition是与Lock绑定的,所以就有Lock的公平性特性:如果是公平锁,线程为按照FIFO的顺序从Condition.await中释放,如果是非公平锁,那么后续的锁竞争就不保证FIFO顺序了。我们通过一个生产者消费者模型来看一下相关的实现!
package com.yhj.lock;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * @Described 生产者消费者模型 * @Author YHJ create at 2013-6-6 下午09:15:27 */ public class ProductQueue<T> {
private final T[] items; //队列存储区
private final Lock lock = new ReentrantLock(); //独占锁
private Condition notFull = lock.newCondition(); //条件
private Condition notEmpty = lock.newCondition();
private int head, tail, count; //下标
@SuppressWarnings("unchecked") public ProductQueue(int maxSize) { items = (T[]) new Object[maxSize]; }
/** * 默认10个元素 * @Constructors * @Author YHJ create at 2013-6-6 下午09:15:21 */ public ProductQueue() { this(10); }
/** * 放置数据 * @param t * @throws InterruptedException * @Author YHJ create at 2013-6-6 下午09:15:27 */ public void put(T t) throws InterruptedException { lock.lock(); try { while (count == getCapacity()) { notFull.await(); } items[tail] = t; if (++tail == getCapacity()) { tail = 0; } ++count; notEmpty.signalAll(); } finally { lock.unlock(); } }
/** * 取数据 * @return * @throws InterruptedException * @Author YHJ create at 2013-6-6 下午09:18:36 */ public T take() throws InterruptedException { lock.lock(); try { while (count == 0) { notEmpty.await(); } T ret = items[head]; items[head] = null;//GC if (++head == getCapacity()) { head = 0; } --count; notFull.signalAll(); return ret; } finally { lock.unlock(); } }
/** * 获取容量(队列) * @return * @Author YHJ create at 2013-6-6 下午09:18:45 */ public int getCapacity() { return items.length; }
/** * 获取元素数目 * @return * @Author YHJ create at 2013-6-6 下午09:19:04 */ public int size() { lock.lock(); try { return count; } finally { lock.unlock(); } }
} |
在这个例子中消费take()需要 队列不为空,如果为空就挂起(await()),直到收到notEmpty的信号;生产put()需要队列不满,如果满了就挂起(await()),直到收到notFull的信号。可能有人会问:如果一个线程lock()对象后被挂起还没有unlock,那么另外一个线程就拿不到锁了(lock()操作会挂起),那么就无法通知(notify)前一个线程,这样岂不是“死锁”了?
是这样子么?当然不是,如果是这样有这么大的问题,锁性能再好又有什么用呢?我们来看下await方法的代码:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); long savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode !=THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } final boolean isOnSyncQueue(Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null) return false; if (node.next != null) // If has successor, it must be on queue return true; return findNodeFromTail(node); } |
很显然,执行await方法的时候,首先将当前节点加入Condition队列,然后会做一次锁的释放(如果不释放其他线程就会等待而无法获取锁,进而更没有办法notify此条件,引发死锁),然后自旋尝试挂起当前线程(LockSupport.park(this);),直到有线程condition的signal来解除(被唤醒继续操作或被取消,如果被取消则直接剔除),如果被唤醒而且没有被取消的话,尝试重新进入锁获取的等待队列(acquireQueued(node, savedState)),尝试成功后从Condition队列中删除(再次拿到了之前的锁对象)!
这里再回头介绍Condition的数据结构。我们知道一个Condition可以在多个地方被await(),那么就需要一个FIFO的结构将这些Condition串联起来,然后根据需要唤醒一个或者多个(通常是所有)。所以在Condition内部就需要一个FIFO的队列。我们再结合前面提到的节点(Node)数据结构。我们就发现Node.nextWaiter就派上用场了!nextWaiter就是将一系列的Condition.await()串联起来组成一个FIFO的队列。所以当某一个节点被唤醒的时候,需要进行一次队列关系重建(unlinkCancelledWaiters())。
await()清楚了,现在再来看signal/signalAll就容易多了。按照signal/signalAll的需求,就是要将Condition.await()中FIFO队列中第一个Node/全部Node唤醒。尽管所有Node可能都被唤醒,但是要知道的是仍然只有一个线程能够拿到锁,其它没有拿到锁的线程仍然需要自旋等待(acquireQueued)。我们来看下相关的代码实现:
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); } |
上面的代码很容易看出来,signal就是唤醒Condition队列中的第一个非CANCELLED节点线程,而signalAll就是唤醒所有非CANCELLED节点线程。当然了遇到CANCELLED线程就需要将其从FIFO队列中剔除。
final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); int c = p.waitStatus; if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
|
上面就是唤醒一个await()线程的过程,根据前面介绍的,如果要unpark线程,并使线程拿到锁,那么就需要线程节点进入AQS的队列。所以可以看到在LockSupport.unpark之前调用了enq(node)操作,将当前节点加入到AQS队列。
相关推荐
本文将深入浅出地探讨J.U.C的原理和实现机制,主要包括以下几个方面: 1. 原子操作:原子操作是并发编程中的基础,它能够保证线程安全的进行加减操作或其他简单的计算。Java中提供了Atomic类来支持原子操作,例如...
《深入浅出Java》这本书以其独特的讲解方式,旨在让学习者轻松掌握复杂的Java编程语言。"深入浅出"这一理念,意味着作者通过直观、生动的示例和丰富的图解,帮助读者逐步理解Java的核心概念和技术。 Java是一种广泛...
《深入浅出Java语言程序设计》这本书旨在帮助初学者和有一定经验的程序员深入理解Java语言的核心概念和技术,从而能够熟练地进行Java程序开发。 本书首先会从Java的基础知识入手,包括Java的安装与配置环境,解释...
《深入浅出 Java 虚拟机》是一本旨在帮助开发者深入理解Java虚拟机(Java Virtual Machine, JVM)的著作。JVM是Java语言的核心组成部分,它负责将编译后的字节码转换为机器可执行的指令,是Java平台的重要特性之一。...
本资源“深入浅出java虚拟机”旨在帮助开发者深入理解JVM的工作原理及其内在机制。下面将详细探讨JVM的主要组件、内存模型、类加载机制、垃圾收集、性能优化等多个方面。 1. **JVM结构** JVM主要由类装载器、运行...
从提供的信息来看,这份资料似乎是一本关于Java编程语言的深度教程——《深入浅出Java程序语言设计》的完整版PDF。尽管没有直接提供书籍的内容,但从标题、描述及标签中,我们可以推断出该书可能涵盖的一些核心知识...
《深入浅出JAVA》这本书是为那些希望深入了解Java编程语言的初学者和有一定经验的开发者量身打造的。书中的内容全面且深入,旨在帮助读者巩固基础,理解Java的核心概念,提升编程技能。 首先,书中的第一章通常会...
Java并发API在java.util.concurrent包下,提供了如ExecutorService、Future、Callable等高级工具,它们可以帮助我们更方便、更高效地管理线程。Executor框架允许我们创建线程池,这有助于减少线程创建和销毁的开销,...
### 深入浅出Java多线程程序设计 #### 知识点一:理解多线程机制 多线程是一种让程序中的多个指令流能够并发执行的机制,每个指令流被称为一个线程,它们之间相对独立。线程与进程不同,主要体现在资源分配和隔离...
综上所述,《深入浅出Java多线程.pdf》覆盖了Java多线程从基础知识到高级特性的各个方面,适合于想要深入理解Java多线程机制的开发人员阅读。无论是初学者还是有一定经验的开发者,都能从中获得宝贵的理论知识和实践...
### 深入浅出Java_Concurrency #### J.U.C的整体认识 Java的并发编程模型在J.U.C(`java.util.concurrent`)包中得到了全面的展现,这不仅仅是Java语言本身的一大亮点,更是多线程编程领域的重要组成部分。本文...
本文将深入探讨Java多线程中的`join()`方法,以及它在实际开发中的应用。 `join()`方法是Java线程同步的一种机制,主要用于控制线程的执行顺序。在主线程中调用某个子线程的`join()`方法,主线程会等待该子线程执行...
Java并发编程实践笔记 Java并发编程实践笔记是一份关于Java并发编程的实践笔记,涵盖了多种关于线程安全、并发编程的实践经验和原则。下面是从笔记中总结的知识点: 1. 保证线程安全的三种方法:不要跨线程访问...
这本书深入浅出地介绍了如何在Java环境中有效地管理和控制并发操作,从而提高程序的性能和可扩展性。 并发编程是现代软件开发中的核心技能之一,尤其是在多核处理器和分布式系统普及的今天。Java平台提供了丰富的...
在《Java并发编程实战》这本书中,作者深入浅出地介绍了Java 5.0和Java 6中新增的并发特性。这些特性旨在帮助开发者更高效、安全地编写多线程程序。书中通过实例解释了诸如`ExecutorService`、`Future`、`Callable`...
《Java并发编程实战》提供了深入浅出的指导,涵盖了从基础理论到高级技术的广泛内容。 第一部分介绍了并发编程的基础,包括线程安全性的概念,如何构建线程安全的类,以及Java平台提供的并发工具如线程、同步机制等...
这本书是Java并发领域的经典之作,深入浅出地讲解了Java平台上的并发编程原理和最佳实践。书中涵盖了线程安全、同步机制、并发工具类、并发设计模式等多个方面,帮助读者理解如何编写出高效、正确且可维护的并发程序...
在《深入浅出Java语言程序设计》中,你将接触到异常处理,这是Java中处理错误和异常情况的关键机制。学习如何使用try-catch-finally语句块来捕获并处理可能出现的错误,可以提高程序的健壮性。 此外,本书还将涵盖...
它不仅深入浅出地介绍了Java线程和并发的相关知识,还从实战角度出发,提供了丰富的代码示例和最佳实践,帮助读者快速掌握Java并发编程的核心技能。 《Java并发编程的艺术》是一本全面探讨Java并发编程技术的专业...
阿里大牛梁飞编写的《Java并发编程常识》PPT,深入浅出地讲解了这个主题,对开发者来说是一份宝贵的资源。 首先,我们来探讨Java并发编程的基础概念。并发是指多个执行单元(线程或进程)在同一时间间隔内同时进行...