`
jiangwenfeng762
  • 浏览: 288617 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

通过CountDownLatch来分析AbstractQueuedSynchronizer的源码

阅读更多

CountDownLatch:计数门闩,可以用来协调多个线程的协作,使用CountDownLatch的典型场景:某项工作需要多个线程共同来完成,并且其中一个线程(往往是主线程)需要等待其他线程都已经完成了自己的工作时才能继续进行,否则要等待。下面分析CountDownLatch源码:

 

package java.util.concurrent;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;

public class CountDownLatch {
    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        public int tryAcquireShared(int acquires) {
            return getState() == 0? 1 : -1;
        }

        public boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

    /**
     * 构造函数,关键代码是构建了Sync对象
     *
     */
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    /**
	 * 如果门闩值>0,则当前线程等待。否则该方法立刻返回。
	 * 调用countDown方法会使门闩值减少。
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * 如果门闩值>0,则当前线程等待,直到门闩值<=0|| timeout时间到。
	 * 否则该方法立刻返回。
	 * 调用countDown方法会使门闩值减少。
     */
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /**
     * 减少门闩值, 如果门闩值==0,则释放所有等待的线程。
     *
     */
    public void countDown() {
        sync.releaseShared(1);
    }

    public long getCount() {
        return sync.getCount();
    }

    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
}

 CountDownLatch代码非常简单,关键方法已经加上了注释,CountDownLatch的核心逻辑都委托给了Sync这个静态内部类。Sync类继承了AbstractQueuedSynchronizer 。AbstractQueuedSynchronizer 为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁定和相关同步器(信号量、事件,等等)提供一个框架。此类的设计目标是成为依靠单个原子 int 值来表示状态的大多数同步器的一个有用基础。通俗点说,用java的人都知道synchronized能够对一个需要确保线程安全的对象,方法实现多线程并发控制,这是在java语法层次的实现,而AbstractQueuedSynchronizer 则是在应用层次而不是语法层次(更高的层次)提供了实现多线程并发控制组件的基础框架,通过AbstractQueuedSynchronizer

我们可以根据自己的需要设计灵活的多线程并发控制组件,CountDownLatch就是这种组件的典型代表。AbstractQueuedSynchronizer 代码比较复杂,先由浅至深的开始学习,

首先看一下其类层次结构:




 可以看出AbstractQueuedSynchronizer的子类都是静态内部类(比如CountDownLatch.Sync),其javadoc已明确说明了具体用法:

* <p>Subclasses should be defined as non-public internal helper
* classes that are used to implement the synchronization properties
* of their enclosing class. 

 

AbstractQueuedSynchronizer的子类都被定义成非公有内部帮助类,来实现附属类的同步策略。

 

AbstractQueuedSynchronizer的关键设计思想:

1. 使用等待队列来维护所有等待的线程,等待队列的具体实现是一个双向链表,链表的每个节点由AbstractQueuedSynchronizer.Node类体现。AbstractQueuedSynchronizer的私有成员head,tail是链表的头尾节点:

 

    /**
     * Head of the wait queue, lazily initialized.  Except for
     * initialization, it is modified only via method setHead.  Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;

    /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;

AbstractQueuedSynchronizer的等待队列是"CLH"队列的变体,"CLH"队列经常用来实现"自旋锁",而这里主要是借鉴"CLH"队列的思想来持有等待线程的控制信息。队列中的每个节点保存如下信息:

 

        /**
	 * 状态域,取值仅限于下面几个:
         *   SIGNAL:     
	 *				当前节点的后继节点被阻塞(通过park),所以当当前节点释放或者撤销的时候必须启动(unpark)它的后继节点。
	*				为了避免竞争,acquire的相关方法必须首先指出他们需要一个signal,然后不断地重试原子的acquire,如果失败就阻塞(block)。
		 
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened. 
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn't need to
         * signal. So, most code doesn't need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes.  It is modified only using
         * CAS.
         */
        volatile int waitStatus;

        /**
	*
         */
        volatile Node prev;

        /**
         * 
         */
        volatile Node next;

        /**
         * The thread that enqueued this node.  Initialized on
         * construction and nulled out after use.
         */
        volatile Thread thread;

        /**
         * Link to next node waiting on condition, or the special
         * value SHARED.  Because condition queues are accessed only
         * when holding in exclusive mode, we just need a simple
         * linked queue to hold nodes while they are waiting on
         * conditions. They are then transferred to the queue to
         * re-acquire. And because conditions can only be exclusive,
         * we save a field by using special value to indicate shared
         * mode.
         */
        Node nextWaiter;
 

 

 2. 使用int值来代表同步器的状态:

 

    /**
     * The synchronization state.
     */
    private volatile int state;

    /**
     * Returns the current value of synchronization state.
     * This operation has memory semantics of a <tt>volatile</tt> read.
     * @return current state value
     */
    protected final int getState() {
        return state;
    }

    /**
     * Sets the value of synchronization state.
     * This operation has memory semantics of a <tt>volatile</tt> write.
     * @param newState the new state value
     */
    protected final void setState(int newState) {
        state = newState;
    }

3. 使用"自旋锁"的思想通过对volatile的类成员的安全访问来实现锁控制和管理

以CountDownLatch.await方法为例:

 

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
 

 F3进去:

 

    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

 F3进去:

 

    /**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    break;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
        // Arrive here only if interrupted
        cancelAcquire(node);
        throw new InterruptedException();
    }
  • 大小: 69.3 KB
分享到:
评论

相关推荐

    Java并发系列之AbstractQueuedSynchronizer源码分析(条件队列)

    在本篇中,我们将深入分析AQS的条件队列,它是实现高级同步机制如`ReentrantLock`和`CountDownLatch`的关键部分。 条件队列是AQS中与`Condition`接口相关的部分,它允许线程在满足特定条件时等待,而不是简单地阻塞...

    Java并发系列之AbstractQueuedSynchronizer源码分析(概要分析)

    **Java并发系列之AbstractQueuedSynchronizer源码分析概要** **1. AbstractQueuedSynchronizer(AQS)的定义与作用** AbstractQueuedSynchronizer(AQS)是Java并发编程中的核心组件,它是一个抽象的、基于FIFO...

    Java并发系列之AbstractQueuedSynchronizer源码分析(独占模式)

    【Java并发系列之AbstractQueuedSynchronizer源码分析(独占模式)】 AbstractQueuedSynchronizer(AQS)是Java并发编程中一个重要的工具,它是Java并发包`java.util.concurrent.locks`中的核心抽象类,用于构建锁...

    CountDownLatch源码解析之await()

    CountDownLatch源码解析之await() CountDownLatch是Java并发编程中常用的同步工具,通过await()方法可以让当前线程处于阻塞状态,直到锁存器计数为零(或者线程中断)。下面我们将详细解析CountDownLatch源码之...

    多线程countDownLatch方法介绍

    本文将深入探讨CountDownLatch的工作原理、使用场景以及相关源码分析。 CountDownLatch是一个计数器,初始化时设定一个初始值,通常表示一个任务的子任务数量。每个线程完成其工作后会调用`countDown()`方法,...

    Java并发系列之AbstractQueuedSynchronizer源码分析(共享模式)

    《Java并发系列之AbstractQueuedSynchronizer源码分析(共享模式)》 AbstractQueuedSynchronizer(AQS)是Java并发编程中一个重要的工具,它是Java并发包`java.util.concurrent.locks`中的核心抽象类,用于构建锁...

    Java并发系列之CountDownLatch源码分析

    Java并发系列之CountDownLatch源码分析 CountDownLatch是一种非常有用的工具类,用于拦截一个或多个线程,使其在某个条件成熟后再执行。它的内部提供了一个计数器,在构造闭锁时必须指定计数器的初始值,且计数器的...

    AQS源码分析 (1).pdf

    接下来,我们来具体分析一下AQS的源码。AQS中定义了一个名为state的volatile变量,用于表示同步状态。这个变量有三种操作方法:getstate()、setstate()和compareAndSetState(),分别用于获取、设置和原子性地更新...

    CountDownLatch源码解析之countDown()

    CountDownLatch源码解析之countDown()方法详解 ...通过对tryReleaseShared()方法和doReleaseShared()方法的实现原理的分析,我们可以更好地理解CountDownLatch的工作机制,并更好地应用于实际开发中。

    源码详解CountDownLatch

    在 CountDownLatch 的实现中,它依赖于 AbstractQueuedSynchronizer(简称 AQS)来管理内部状态和同步。AQS 是一个抽象类,提供了一套基于共享模式的同步框架,许多并发工具如 ReentrantLock、Semaphore 和 ...

    Java并发包源码分析(JDK1.8)

    Java并发包源码分析(JDK1.8):囊括了java.util.concurrent包中大部分类的源码分析,其中涉及automic包,locks包(AbstractQueuedSynchronizer、ReentrantLock、ReentrantReadWriteLock、LockSupport等),queue...

    Java并发 结合源码分析AQS原理

    Java并发结合源码分析AQS原理 Java并发编程中,AQS(AbstractQueuedSynchronizer)是一个核心组件,它提供了一个基于FIFO队列和状态...通过对AQS的深入了解和源码分析,我们可以更好地理解Java并发编程的机制和原理。

    7 AQS源码分析.docx

    总的来说,AQS通过维护状态和线程队列,实现了对资源的高效管理,使得开发者能够方便地构建各种锁和同步器。理解AQS的工作原理,有助于我们在编写高并发程序时,更好地利用Java提供的并发工具,提高系统的并行性和...

    基于JDK源码解析Java领域中的并发锁之设计与实现.pdf

    通过以上分析,我们可以看到Java并发包提供了丰富的并发锁机制,从简单的synchronized关键字到复杂的AQS、LockSupport、Condition等,这些都是为了解决并发编程中的互斥和同步问题。在实际应用中,开发者需要根据...

    多线程与高并发编程笔记、源码等

    线程池是一种管理线程的机制,通过复用线程来减少创建和销毁线程的开销,提高系统的响应速度和吞吐量。 此外,资源可能还包含了对并发工具类(如CountDownLatch、CyclicBarrier、Semaphore等)的讲解,这些工具可以...

    Java互联网架构多线程并发编程原理及实战 视频教程 下载4.zip

    8-1 CountDownLatch的使用及其源码探秘.mp4 8-2 CyclicBarrier的使用及其源码探秘.mp4 8-3 Semaphore的使用及其源码探秘.mp4 8-4 Exchanger的使用.mp4 9-1 为什么要使用线程池?.mp4 9-2 创建线程池及其使用....

    Java互联网架构多线程并发编程原理及实战 视频教程 下载2.zip

    8-1 CountDownLatch的使用及其源码探秘.mp4 8-2 CyclicBarrier的使用及其源码探秘.mp4 8-3 Semaphore的使用及其源码探秘.mp4 8-4 Exchanger的使用.mp4 9-1 为什么要使用线程池?.mp4 9-2 创建线程池及其使用....

    Java互联网架构多线程并发编程原理及实战 视频教程 下载3.zip

    8-1 CountDownLatch的使用及其源码探秘.mp4 8-2 CyclicBarrier的使用及其源码探秘.mp4 8-3 Semaphore的使用及其源码探秘.mp4 8-4 Exchanger的使用.mp4 9-1 为什么要使用线程池?.mp4 9-2 创建线程池及其使用....

    Java互联网架构多线程并发编程原理及实战 视频教程 下载.zip

    8-1 CountDownLatch的使用及其源码探秘.mp4 8-2 CyclicBarrier的使用及其源码探秘.mp4 8-3 Semaphore的使用及其源码探秘.mp4 8-4 Exchanger的使用.mp4 9-1 为什么要使用线程池?.mp4 9-2 创建线程池及其使用....

    Java互联网架构多线程并发编程原理及实战 视频教程 下载1.zip

    8-1 CountDownLatch的使用及其源码探秘.mp4 8-2 CyclicBarrier的使用及其源码探秘.mp4 8-3 Semaphore的使用及其源码探秘.mp4 8-4 Exchanger的使用.mp4 9-1 为什么要使用线程池?.mp4 9-2 创建线程池及其使用....

Global site tag (gtag.js) - Google Analytics