`

CountDownLatch源码解析

 
阅读更多

JUC 中倒数计数器 CountDownLatch 的使用与原理分析,当需要等待多个线程执行完毕后在做一件事情时候 CountDownLatch 是比调用线程的 join 方法更好的选择,CountDownLatch 与 线程的 join 方法区别是什么?

日常开发中经常会遇到需要在主线程中开启多线程去并行执行任务,并且主线程需要等待所有子线程执行完毕后再进行汇总的场景,它的内部提供了一个计数器,在构造闭锁时必须指定计数器的初始值,且计数器的初始值必须大于0。另外它还提供了一个countDown方法来操作计数器的值,每调用一次countDown方法计数器都会减1,直到计数器的值减为0时就代表条件已成熟,所有因调用await方法而阻塞的线程都会被唤醒。这就是CountDownLatch的内部机制,看起来很简单,无非就是阻塞一部分线程让其在达到某个条件之后再执行。但是CountDownLatch的应用场景却比较广泛,只要你脑洞够大利用它就可以玩出各种花样。最常见的一个应用场景是开启多个线程同时执行某个任务,等到所有任务都执行完再统计汇总结果。



在CountDownLatch出现之前一般都是使用线程的join()方法来实现,但是join不够灵活,不能够满足不同场景的需求。接下来我们看看CountDownLatch的原理实现。

 

一.CountDownLatch原理探究

  从CountDownLatch的名字可以猜测内部应该有个计数器,并且这个计数器是递减的,下面就通过源码看看JDK开发组是何时初始化计数器,何时递减的,计数器变为 0 的时候做了什么操作,多个线程是如何通过计时器值实现同步的,首先我们先看看CountDownLatch内部结构,类图如下:



 https://img2.mukewang.com/5b40bcbf00018d8706320278.jpg

 

从类图可以知道CountDownLatch内部还是使用AQS实现的,通过下面构造函数初始化计数器的值,可知实际上是把计数器的值赋值给了AQS的state,也就是这里AQS的状态值来表示计数器值。

构造函数源码如下:

 

   public CountDownLatch(int count) {        
    if (count < 0) throw new IllegalArgumentException("count < 0");        
    this.sync = new Sync(count);
    }

   Sync(int count) {
       setState(count);
   }

 

接下来主要看一下CountDownLatch中几个重要的方法内部是如何调用AQS来实现功能的。

  1.void await()方法,当前线程调用了CountDownLatch对象的await方法后,当前线程会被阻塞,直到下面的情况之一才会返回:(1)当所有线程都调用了CountDownLatch对象的countDown方法后,

也就是说计时器值为 0 的时候。(2)其他线程调用了当前线程的interrupt()方法中断了当前线程,当前线程会抛出InterruptedException异常后返回。接下来让我们看看await()方法内部是如何调用

AQS的方法的,源码如下:

 

//CountDownLatch的await()方法public void await() throws InterruptedException {
   sync.acquireSharedInterruptibly(1);
}    
    //AQS的获取共享资源时候可被中断的方法
    public final void acquireSharedInterruptibly(int arg)throws InterruptedException {    //如果线程被中断则抛异常
    if (Thread.interrupted())         throw new InterruptedException();        //尝试看当前是否计数值为0,为0则直接返回,否者进入AQS的队列等待
    if (tryAcquireShared(arg) < 0)
         doAcquireSharedInterruptibly(arg);
} //sync类实现的AQS的接口
 protected int tryAcquireShared(int acquires) {       
    return (getState() == 0) ? 1 : -1;
 }

 

从上面代码可以看到await()方法委托sync调用了AQS的acquireSharedInterruptibly方法,该方法的特点是线程获取资源的时候可以被中断,并且获取到的资源是共享资源,这里为什么要调用AQS的这个方法,而不是调用独占锁的accquireInterruptibly方法呢?这是因为这里状态值需要的并不是非 0 即 1 的效果,而是和初始化时候指定的计数器值有关系,比如你初始化的时候计数器值为 8 ,那么state的值应该就有 0 到 8 的状态,而不是只有  0  和  1 的独占效果。

  这里await()方法调用acquireSharedInterruptibly的时候传递的是 1 ,就是说明要获取一个资源,而这里计数器值是资源总数,也就是意味着是让总的资源数减 1 ,acquireSharedInterruptibly内部首先判断如果当前线程被中断了则抛出异常,否则调用sync实现的tryAcquireShared方法看当前状态值(计数器值)是否为 0  ,是则当前线程的await()方法直接返回,否则调用AQS的doAcquireSharedInterruptibly让当前线程阻塞。另外调用tryAcquireShared的方法仅仅是检查当前状态值是不是为 0 ,并没有调用CAS让当前状态值减去 1 。

 

  2.boolean await(long timeout, TimeUnit unit),当线程调用了 CountDownLatch 对象的该方法后,当前线程会被阻塞,直到下面的情况之一发生才会返回: (1)当所有线程都调用了 CountDownLatch 对象的 countDown 方法后,也就是计时器值为 0 的时候,这时候返回 true; (2) 设置的 timeout 时间到了,因为超时而返回 false; (3)其它线程调用了当前线程的 interrupt()方法中断了当前线程,当前线程会抛出 InterruptedException 异常后返回。源码如下:

 

public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {        
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

   3.void countDown() 当前线程调用了该方法后,会递减计数器的值,递减后如果计数器为 0 则会唤醒所有调用await 方法而被阻塞的线程,否则什么都不做,接下来看一下countDown()方法内部是如何调用AQS的方法的,源码如下:

 

 

   //CountDownLatch的countDown()方法
    public void countDown() {       //委托sync调用AQS的方法
        sync.releaseShared(1);
    }   //AQS的方法
    public final boolean releaseShared(int arg) {        //调用sync实现的tryReleaseShared
        if (tryReleaseShared(arg)) {            //AQS的释放资源方法            doReleaseShared();            return true;
        }        return false;
    }

 

如上面代码可以知道CountDownLatch的countDown()方法是委托sync调用了AQS的releaseShared方法,后者调用了sync 实现的AQS的tryReleaseShared,源码如下:


 

//syn的方法protected boolean tryReleaseShared(int releases) {  //循环进行cas,直到当前线程成功完成cas使计数值(状态值state)减一并更新到state
  for (;;) {      int c = getState();      //如果当前状态值为0则直接返回(1)
      if (c == 0)          return false;      //CAS设置计数值减一(2)
      int nextc = c-1;      if (compareAndSetState(c, nextc))          return nextc == 0;
  }
}

 

 

 

如上代码可以看到首先获取当前状态值(计数器值),代码(1)如果当前状态值为 0 则直接返回 false ,则countDown()方法直接返回;否则执行代码(2)使用CAS设置计数器减一,CAS失败则循环重试,否则如果当前计数器为 0 则返回 true 。返回 true 后,说明当前线程是最后一个调用countDown()方法的线程,那么该线程除了让计数器减一外,还需要唤醒调用CountDownLatch的await 方法而被阻塞的线程。这里的代码(1)貌似是多余的,其实不然,之所以添加代码 (1) 是为了防止计数器值为 0 后,其他线程又调用了countDown方法,如果没有代码(1),状态值就会变成负数。

 

  4.long getCount() 获取当前计数器的值,也就是 AQS 的 state 的值,一般在 debug 测试时候使用,源码如下:

 

public long getCount() {     
   return sync.getCount();
}
int getCount() {     
   return getState();
}

 

 

 

如上代码可知内部还是调用了 AQS 的 getState 方法来获取 state 的值(计数器当前值)。

 

到目前为止原理理解的差不多了,接下来用一个例子进行讲解CountDownLatch的用法,例子如下:

 

package com.hjc;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;/**
 * Created by cong on 2018/7/6. */public class CountDownLatchTest {    

    private static AtomicInteger id = new AtomicInteger();    // 创建一个CountDownLatch实例,管理计数为ThreadNum
    private static volatile CountDownLatch countDownLatch = new CountDownLatch(3);    
    public static void main(String[] args) throws InterruptedException {

        Thread threadOne = new Thread(new Runnable() {

            @Override            public void run() {                
           try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {                    
                    // TODO Auto-generated catch block                    
                    e.printStackTrace();
                }

                System.out.println("【玩家" + id.getAndIncrement() + "】已入场");
                countDownLatch.countDown();
            }
        });

        Thread threadTwo = new Thread(new Runnable() {

            @Override            public void run() {                
           try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {                    
                    // TODO Auto-generated catch block                    
                    e.printStackTrace();
                }

                System.out.println("【玩家" + id.getAndIncrement() + "】已入场");
                countDownLatch.countDown();

            }
        });

        Thread threadThree = new Thread(new Runnable() {

            @Override            public void run() {                
           try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {                    
                    // TODO Auto-generated catch block                    
                    e.printStackTrace();
                }

                System.out.println("【玩家" + id.getAndIncrement() + "】已入场");
                countDownLatch.countDown();

            }
        });        // 启动子线程        threadOne.start();
        threadTwo.start();
        threadThree.start();
        System.out.println("等待斗地主玩家进场");        // 等待子线程执行完毕,返回
        countDownLatch.await();

        System.out.println("斗地主玩家已经满人,开始发牌.....");

    }
}


 

 

 

运行结果如下:

https://img1.mukewang.com/5b40bcaf00019b4a11560278.jpg

 

如上代码,创建了一个 CountDownLatch 实例,因为有两个子线程所以构造函数参数传递为 3,主线程调用 countDownLatch.await()方法后会被阻塞。子线程执行完毕后调用 countDownLatch.countDown() 方法让 countDownLatch 内部的计数器减一,等所有子线程执行完毕调用 countDown()后计数器会变为 0,这时候主线程的 await()才会返回。

 

如果把上面的代码中Thread.sleep和countDownLatch.await()的代码注释掉,运行几遍,运行结果就可能会出现如下结果,如下图:

 


https://img2.mukewang.com/5b40bca4000189a210060262.jpg

 

可以看到在注释掉latch.await()这行之后,就不能保证在所有玩家入场后才开始发牌了。

 

总结:CountDownLatch 与 join 方法的区别,一个区别是调用一个子线程的 join()方法后,该线程会一直被阻塞直到该线程运行完毕,而 CountDownLatch 则使用计数器允许子线程运行完毕或者运行中时候递减计数,也就是 CountDownLatch 可以在子线程运行任何时候让 await 方法返回而不一定必须等到线程结束;另外使用线程池来管理线程时候一般都是直接添加 Runable 到线程池这时候就没有办法在调用线程的 join 方法了,countDownLatch 相比 Join 方法让我们对线程同步有更灵活的控制。



 

分享到:
评论

相关推荐

    CountDownLatch源码解析之await()

    CountDownLatch源码解析之await() CountDownLatch是Java并发编程中常用的同步工具,通过await()方法可以让当前线程处于阻塞状态,直到锁存器计数为零(或者线程中断)。下面我们将详细解析CountDownLatch源码之...

    CountDownLatch源码解析之countDown()

    CountDownLatch源码解析之countDown()方法详解 CountDownLatch是Java并发编程中的一种同步工具,用于控制线程的执行顺序。其中,countDown()方法是CountDownLatch的核心方法之一,负责减少计数器的值,并在计数器值...

    多线程countDownLatch方法介绍

    **三、源码解析** 1. `countDown()`方法: ```java public void countDown() { sync.countDown(); } protected final void countDown() { doAcquireSharedInterruptibly(1); } ``` `countDown()`调用`sync....

    《Java并发编程高阶技术-高性能并发框架源码解析与实战》学习.zip

    《Java并发编程高阶技术-高性能并发框架源码解析与实战》是一本深入探讨Java并发编程的书籍,旨在帮助读者掌握高性能并发框架的核心原理,并通过源码解析与实战演练提升技术水平。在Java的世界里,并发编程是提升...

    countdownlatch-example-sourcecode.zip

    6. **源码解析** CountDownLatch的实现基于`java.util.concurrent.atomic.AtomicInteger`,通过原子操作保证线程安全。`await()`方法利用了`java.util.concurrent.locks.Condition`来阻塞和唤醒线程,确保线程间的...

    Java工程师成长之路,包含JDK源码解析、Java并发编程、JVM实例解析、SpringCloud以及各类中间件代码实例与教程

    源码解析可以帮助我们理解Java的核心机制,如垃圾回收、类加载、线程管理等,这对于优化代码性能、排查问题至关重要。深入学习JDK源码可以提升对语言特性的理解和应用,比如深入探讨Collections框架的实现,理解...

    基于JDK源码解析Java领域中的并发锁之设计与实现.pdf

    本文将基于JDK源码解析Java领域中的并发锁,探讨AQS基础同步器、LockSupport、Condition接口、Lock接口、ReadWriteLock接口以及自定义API操作的设计与实现。 一、AQS(AbstractQueuedSynchronizer)基础同步器的...

    JDK1.8:JDK1.8源码解析-源码解析

    在深入探讨JDK1.8源码解析之前,先理解一下JDK的含义:Java Development Kit,即Java开发工具包,是Java编程语言的核心组成部分,提供了编写、编译和运行Java应用程序所需的所有工具。JDK1.8是Oracle公司发布的Java ...

    基于JDK源码解析Java领域中的并发锁,我们需要特别关注哪些内容?

    基于JDK源码解析Java并发锁,我们需要关注以下几个关键知识点: 1. **AQS(AbstractQueuedSynchronizer)基础同步器**: AQS是一个用于构建锁和同步器的框架,它维护了一个FIFO的等待队列,提供了两种模式:独占和...

    系统解析JDK源码,领略大牛设计思想,JAVA进阶必备(2023新课,已完结)

    2. **并发编程**:Java并发API如ExecutorService、Semaphore、CountDownLatch等,其源码解析可以帮助我们理解线程池的工作流程、同步原语的实现,加深对并发控制的理解。 3. **IO/NIO/BIO**:Java的输入输出系统是...

    java源码api-java-:JAVA常用API源码解析

    在"java--master"这个项目中,很可能是包含了对以上这些Java API的源码分析和实例解析,这对于深入学习Java编程和提高代码阅读能力非常有帮助。通过研究源码,可以理解Java库的内部工作原理,从而更好地运用到实际...

    28个java常用的类库源码

    源码解析能帮助你理解日期时间计算的复杂性。 6. **反射**: java.lang.reflect包中的类(如Class、Constructor、Method)用于运行时获取类的信息和动态调用方法。源码分析有助于理解Java的元数据机制。 7. **...

    JDK1.6源码

    源码解析有助于掌握这些并发工具类的实现原理,从而编写更高效的多线程程序。 4. **I/O流** JDK 1.6对I/O流进行了改进,引入了NIO(Non-blocking I/O)框架,提供了一种异步数据传输方式。通过分析源码,可以学习...

    jdk:jdk源码阅读

    了解HotSpot VM的内存结构(堆、栈、方法区等)、垃圾收集机制(如分代收集、可达性分析)、类加载过程(加载、验证、准备、解析、初始化)等,能帮助我们优化程序性能。 2. **集合框架**:包括ArrayList、...

    java常用类库源码

    9. **字符串处理**:String类是Java中最常用的类之一,源码解析可以帮助我们理解字符串的不可变性、substring、replace等方法的实现。 10. **枚举和注解**:Java枚举提供了一种安全的常量表示,而注解则提供了一种...

    javaconcurrent源码-java7-source-code:Java7源码/Concurrency同步

    核心类库源码解析 请直接查看.java : 通过JavaDoc+Test书写 ,方便链接到源码 Tracker 20181014 Java11正式发布并作为新的长期支持版本, 未来的应用会逐步迁移到Java11. 因此Java8以前的API参考价值越来越弱, 因此本...

    Java1.6源码

    `String`类是不可变字符串的实现,其源码解析可以帮助理解字符串的拼接、比较等操作。 2. **集合框架**:Java 1.6的集合框架在`java.util`包中,包括`List`、`Set`、`Map`接口及其实现类如`ArrayList`、`LinkedList...

    java1234:j2se源码2

    4. 第四十讲:可能涉及线程和并发编程,如线程的创建、同步机制(synchronized关键字、wait/notify方法)、并发工具类(Semaphore、CountDownLatch)的源码解析。 5. 第十七讲:这部分可能回到了基础,讨论了基本...

    葛一鸣 实战java高并发程序设计 源码

    《葛一鸣实战Java高并发程序设计》源码解析 在Java编程领域,高并发程序设计是一项关键且复杂的技能,涉及到多线程、同步机制、并发控制、性能优化等多个方面。该资源“葛一鸣实战Java高并发程序设计”提供了一套...

    java源码笔记

    5. **Java源码解析** - 核心库分析:深入JDK源码,如Collections、Stream、反射等模块,了解其内部实现机制。 - 设计模式:学习Java源码中常见的设计模式,如单例、工厂、观察者等,提升代码质量。 - 内存模型:...

Global site tag (gtag.js) - Google Analytics