CountDownLatch:计数门闩,可以用来协调多个线程的协作,使用CountDownLatch的典型场景:某项工作需要多个线程共同来完成,并且其中一个线程(往往是主线程)需要等待其他线程都已经完成了自己的工作时才能继续进行,否则要等待。下面分析CountDownLatch源码:
package java.util.concurrent;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;
public class CountDownLatch {
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
public int tryAcquireShared(int acquires) {
return getState() == 0? 1 : -1;
}
public boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
/**
* 构造函数,关键代码是构建了Sync对象
*
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
/**
* 如果门闩值>0,则当前线程等待。否则该方法立刻返回。
* 调用countDown方法会使门闩值减少。
*/
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* 如果门闩值>0,则当前线程等待,直到门闩值<=0|| timeout时间到。
* 否则该方法立刻返回。
* 调用countDown方法会使门闩值减少。
*/
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
/**
* 减少门闩值, 如果门闩值==0,则释放所有等待的线程。
*
*/
public void countDown() {
sync.releaseShared(1);
}
public long getCount() {
return sync.getCount();
}
public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}
}
CountDownLatch代码非常简单,关键方法已经加上了注释,CountDownLatch的核心逻辑都委托给了Sync这个静态内部类。Sync类继承了AbstractQueuedSynchronizer 。AbstractQueuedSynchronizer 为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁定和相关同步器(信号量、事件,等等)提供一个框架。此类的设计目标是成为依靠单个原子 int 值来表示状态的大多数同步器的一个有用基础。通俗点说,用java的人都知道synchronized能够对一个需要确保线程安全的对象,方法实现多线程并发控制,这是在java语法层次的实现,而AbstractQueuedSynchronizer 则是在应用层次而不是语法层次(更高的层次)提供了实现多线程并发控制组件的基础框架,通过AbstractQueuedSynchronizer
我们可以根据自己的需要设计灵活的多线程并发控制组件,CountDownLatch就是这种组件的典型代表。AbstractQueuedSynchronizer 代码比较复杂,先由浅至深的开始学习,
首先看一下其类层次结构:
可以看出AbstractQueuedSynchronizer的子类都是静态内部类(比如CountDownLatch.Sync),其javadoc已明确说明了具体用法:
* <p>Subclasses should be defined as non-public internal helper
* classes that are used to implement the synchronization properties
* of their enclosing class.
AbstractQueuedSynchronizer的子类都被定义成非公有内部帮助类,来实现附属类的同步策略。
AbstractQueuedSynchronizer的关键设计思想:
1. 使用等待队列来维护所有等待的线程,等待队列的具体实现是一个双向链表,链表的每个节点由AbstractQueuedSynchronizer.Node类体现。AbstractQueuedSynchronizer的私有成员head,tail是链表的头尾节点:
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
AbstractQueuedSynchronizer的等待队列是"CLH"队列的变体,"CLH"队列经常用来实现"自旋锁",而这里主要是借鉴"CLH"队列的思想来持有等待线程的控制信息。队列中的每个节点保存如下信息:
/**
* 状态域,取值仅限于下面几个:
* SIGNAL:
* 当前节点的后继节点被阻塞(通过park),所以当当前节点释放或者撤销的时候必须启动(unpark)它的后继节点。
* 为了避免竞争,acquire的相关方法必须首先指出他们需要一个signal,然后不断地重试原子的acquire,如果失败就阻塞(block)。
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified only using
* CAS.
*/
volatile int waitStatus;
/**
*
*/
volatile Node prev;
/**
*
*/
volatile Node next;
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;
/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;
2. 使用int值来代表同步器的状态:
/**
* The synchronization state.
*/
private volatile int state;
/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a <tt>volatile</tt> read.
* @return current state value
*/
protected final int getState() {
return state;
}
/**
* Sets the value of synchronization state.
* This operation has memory semantics of a <tt>volatile</tt> write.
* @param newState the new state value
*/
protected final void setState(int newState) {
state = newState;
}
3. 使用"自旋锁"的思想通过对volatile的类成员的安全访问来实现锁控制和管理
以CountDownLatch.await方法为例:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
F3进去:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
F3进去:
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
break;
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
// Arrive here only if interrupted
cancelAcquire(node);
throw new InterruptedException();
}
- 大小: 69.3 KB
分享到:
相关推荐
在本篇中,我们将深入分析AQS的条件队列,它是实现高级同步机制如`ReentrantLock`和`CountDownLatch`的关键部分。 条件队列是AQS中与`Condition`接口相关的部分,它允许线程在满足特定条件时等待,而不是简单地阻塞...
**Java并发系列之AbstractQueuedSynchronizer源码分析概要** **1. AbstractQueuedSynchronizer(AQS)的定义与作用** AbstractQueuedSynchronizer(AQS)是Java并发编程中的核心组件,它是一个抽象的、基于FIFO...
【Java并发系列之AbstractQueuedSynchronizer源码分析(独占模式)】 AbstractQueuedSynchronizer(AQS)是Java并发编程中一个重要的工具,它是Java并发包`java.util.concurrent.locks`中的核心抽象类,用于构建锁...
CountDownLatch源码解析之await() CountDownLatch是Java并发编程中常用的同步工具,通过await()方法可以让当前线程处于阻塞状态,直到锁存器计数为零(或者线程中断)。下面我们将详细解析CountDownLatch源码之...
本文将深入探讨CountDownLatch的工作原理、使用场景以及相关源码分析。 CountDownLatch是一个计数器,初始化时设定一个初始值,通常表示一个任务的子任务数量。每个线程完成其工作后会调用`countDown()`方法,...
《Java并发系列之AbstractQueuedSynchronizer源码分析(共享模式)》 AbstractQueuedSynchronizer(AQS)是Java并发编程中一个重要的工具,它是Java并发包`java.util.concurrent.locks`中的核心抽象类,用于构建锁...
Java并发系列之CountDownLatch源码分析 CountDownLatch是一种非常有用的工具类,用于拦截一个或多个线程,使其在某个条件成熟后再执行。它的内部提供了一个计数器,在构造闭锁时必须指定计数器的初始值,且计数器的...
接下来,我们来具体分析一下AQS的源码。AQS中定义了一个名为state的volatile变量,用于表示同步状态。这个变量有三种操作方法:getstate()、setstate()和compareAndSetState(),分别用于获取、设置和原子性地更新...
CountDownLatch源码解析之countDown()方法详解 ...通过对tryReleaseShared()方法和doReleaseShared()方法的实现原理的分析,我们可以更好地理解CountDownLatch的工作机制,并更好地应用于实际开发中。
在 CountDownLatch 的实现中,它依赖于 AbstractQueuedSynchronizer(简称 AQS)来管理内部状态和同步。AQS 是一个抽象类,提供了一套基于共享模式的同步框架,许多并发工具如 ReentrantLock、Semaphore 和 ...
Java并发包源码分析(JDK1.8):囊括了java.util.concurrent包中大部分类的源码分析,其中涉及automic包,locks包(AbstractQueuedSynchronizer、ReentrantLock、ReentrantReadWriteLock、LockSupport等),queue...
Java并发结合源码分析AQS原理 Java并发编程中,AQS(AbstractQueuedSynchronizer)是一个核心组件,它提供了一个基于FIFO队列和状态...通过对AQS的深入了解和源码分析,我们可以更好地理解Java并发编程的机制和原理。
总的来说,AQS通过维护状态和线程队列,实现了对资源的高效管理,使得开发者能够方便地构建各种锁和同步器。理解AQS的工作原理,有助于我们在编写高并发程序时,更好地利用Java提供的并发工具,提高系统的并行性和...
通过以上分析,我们可以看到Java并发包提供了丰富的并发锁机制,从简单的synchronized关键字到复杂的AQS、LockSupport、Condition等,这些都是为了解决并发编程中的互斥和同步问题。在实际应用中,开发者需要根据...
线程池是一种管理线程的机制,通过复用线程来减少创建和销毁线程的开销,提高系统的响应速度和吞吐量。 此外,资源可能还包含了对并发工具类(如CountDownLatch、CyclicBarrier、Semaphore等)的讲解,这些工具可以...
8-1 CountDownLatch的使用及其源码探秘.mp4 8-2 CyclicBarrier的使用及其源码探秘.mp4 8-3 Semaphore的使用及其源码探秘.mp4 8-4 Exchanger的使用.mp4 9-1 为什么要使用线程池?.mp4 9-2 创建线程池及其使用....
8-1 CountDownLatch的使用及其源码探秘.mp4 8-2 CyclicBarrier的使用及其源码探秘.mp4 8-3 Semaphore的使用及其源码探秘.mp4 8-4 Exchanger的使用.mp4 9-1 为什么要使用线程池?.mp4 9-2 创建线程池及其使用....
8-1 CountDownLatch的使用及其源码探秘.mp4 8-2 CyclicBarrier的使用及其源码探秘.mp4 8-3 Semaphore的使用及其源码探秘.mp4 8-4 Exchanger的使用.mp4 9-1 为什么要使用线程池?.mp4 9-2 创建线程池及其使用....
8-1 CountDownLatch的使用及其源码探秘.mp4 8-2 CyclicBarrier的使用及其源码探秘.mp4 8-3 Semaphore的使用及其源码探秘.mp4 8-4 Exchanger的使用.mp4 9-1 为什么要使用线程池?.mp4 9-2 创建线程池及其使用....
8-1 CountDownLatch的使用及其源码探秘.mp4 8-2 CyclicBarrier的使用及其源码探秘.mp4 8-3 Semaphore的使用及其源码探秘.mp4 8-4 Exchanger的使用.mp4 9-1 为什么要使用线程池?.mp4 9-2 创建线程池及其使用....