package java.util.concurrent;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;
public class CountDownLatch {
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
int getCount() {
return getState();
public int tryAcquireShared(int acquires) {
return getState() == 0? 1 : -1;
public boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
private final Sync sync;
* 构造函数,关键代码是构建了Sync对象
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
* 如果门闩值>0,则当前线程等待。否则该方法立刻返回。
* 调用countDown方法会使门闩值减少。
public void await() throws InterruptedException {
* 如果门闩值>0,则当前线程等待,直到门闩值<=0|| timeout时间到。
* 否则该方法立刻返回。
* 调用countDown方法会使门闩值减少。
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
* 减少门闩值, 如果门闩值==0,则释放所有等待的线程。
public void countDown() {
public long getCount() {
return sync.getCount();
public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
CountDownLatch代码非常简单,关键方法已经加上了注释,CountDownLatch的核心逻辑都委托给了Sync这个静态内部类。Sync类继承了AbstractQueuedSynchronizer 。AbstractQueuedSynchronizer 为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁定和相关同步器(信号量、事件,等等)提供一个框架。此类的设计目标是成为依靠单个原子 int 值来表示状态的大多数同步器的一个有用基础。通俗点说,用java的人都知道synchronized能够对一个需要确保线程安全的对象,方法实现多线程并发控制,这是在java语法层次的实现,而AbstractQueuedSynchronizer 则是在应用层次而不是语法层次(更高的层次)提供了实现多线程并发控制组件的基础框架,通过AbstractQueuedSynchronizer
我们可以根据自己的需要设计灵活的多线程并发控制组件,CountDownLatch就是这种组件的典型代表。AbstractQueuedSynchronizer 代码比较复杂,先由浅至深的开始学习,

* <p>Subclasses should be defined as non-public internal helper
* classes that are used to implement the synchronization properties
* of their enclosing class.
1. 使用等待队列来维护所有等待的线程,等待队列的具体实现是一个双向链表,链表的每个节点由AbstractQueuedSynchronizer.Node类体现。AbstractQueuedSynchronizer的私有成员head,tail是链表的头尾节点:
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
private transient volatile Node head;
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
private transient volatile Node tail;
* 状态域,取值仅限于下面几个:
* 当前节点的后继节点被阻塞(通过park),所以当当前节点释放或者撤销的时候必须启动(unpark)它的后继节点。
* 为了避免竞争,acquire的相关方法必须首先指出他们需要一个signal,然后不断地重试原子的acquire,如果失败就阻塞(block)。
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified only using
* CAS.
volatile int waitStatus;
volatile Node prev;
volatile Node next;
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
volatile Thread thread;
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
Node nextWaiter;
2. 使用int值来代表同步器的状态:
* The synchronization state.
private volatile int state;
* Returns the current value of synchronization state.
* This operation has memory semantics of a <tt>volatile</tt> read.
* @return current state value
protected final int getState() {
return state;
* Sets the value of synchronization state.
* This operation has memory semantics of a <tt>volatile</tt> write.
* @param newState the new state value
protected final void setState(int newState) {
state = newState;
3. 使用"自旋锁"的思想通过对volatile的类成员的安全访问来实现锁控制和管理
public void await() throws InterruptedException {
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
* Acquires in shared interruptible mode.
* @param arg the acquire argument
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r); = null; // help GC
if (shouldParkAfterFailedAcquire(p, node) &&
} catch (RuntimeException ex) {
throw ex;
// Arrive here only if interrupted
throw new InterruptedException();

