`
BrokenDreams
  • 浏览: 253993 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
68ec41aa-0ce6-3f83-961b-5aa541d59e48
Java并发包源码解析
浏览量:100161
社区版块
存档分类
最新评论

Jdk1.6 JUC源码解析(11)-CyclicBarrier

阅读更多

Jdk1.6 JUC源码解析(11)-CyclicBarrier

作者:大飞

 

功能简介:
  • CyclicBarrier是一种可重复使用的栅栏机制,可以让一组线程在某个点上相互等待,这个点就可以类比为栅栏。并且这个栅栏是可重复使用的,这点可以和前面分析过的CountDownLatch做对比,CountDownLatch只能用一次。
  • CyclicBarrier还支持在所有线程到达栅栏之后,在所有线程从等待状态转到可运行状态之前,执行一个命令(或者说是动作)。
  • 当然,在某些情况下,栅栏可以被打破。比如某个线程无法在规定的时间内到达栅栏。
源码分析:
  • 先看下CyclicBarrier内部的结构:
public class CyclicBarrier {
    /**
     * 每次对栅栏的使用可以表示为一个generation。栅栏每次开放或者重置,
     * generation都会发生改变。使用栅栏的线程可以关联多个generations,
     * 由于等待线程可能会以多种方式请求锁,但是在特定的时间只有一个是 
     * 可用的,其他的要么被打破,要么开放。
     * 如果一个栅栏已经被打破。且没有后续的重置动作,那么可以不存在可 
     * 用的generation。
     */
    private static class Generation {
        boolean broken = false;
    }
    /** 用于保护栅栏的锁 */
    private final ReentrantLock lock = new ReentrantLock();
    /** 栅栏开放的条件 */
    private final Condition trip = lock.newCondition();
    /** 表示当前使用栅栏的使用方(线程)数量 */
    private final int parties;
    /* 当栅栏开放时,要使用的命令(动作) */
    private final Runnable barrierCommand;
    /** The current generation */
    private Generation generation = new Generation();
    /**
     * 处于等待状态的使用方(线程)的数量,在每一个generation上从 
     * parties递减为0。当新建generation(栅栏开放)或者栅栏被打破 
     * 时,重置为parties。 
     */
    private int count;

       从上述的结构细节中可见,CyclicBarrier内部使用ReentrantLock来实现,并包含一个trip条件,来作为栅栏模拟栅栏的行为(所有使用方都在这个条件上等待)。

 

  • 具体使用栅栏时,各个线程会在要互相等待的地方调用一个"等待"方法,然后在这个方法处等待。当所有线程都到达次方法时,栅栏打开,所有线程从等待出继续执行。接下来就从这个"等待"方法入手开始分析:
    /**
     * 线程调用此方法后等待,直到所有parties都调用当前barrier的这个方法。
     *
     * 如果当前线程是不最后一个到达此方法的线程,那么会阻塞,直到下面的 
     * 事情发生:
     * 
     * 最后的线程也到达此方法。
     * 其他线程中断了当前线程。
     * 其他线程中断了在栅栏处等待的某个线程。
     * 某个线程调用了栅栏的reset方法。
     */
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen;
        }
    }
    /**
     * 和上面的方法相比,多了超时的情况。
     */
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

 

       看下上述等待方法中调用的dowait方法:

 

    /**
     * 栅栏主体代码,涵盖了所有情况。
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //获取当前的generation实例。
            final Generation g = generation;
            
            if (g.broken) //如果当前generation状态为broken,说明栅栏被打破,抛出BrokenBarrierException异常。
                throw new BrokenBarrierException();
            if (Thread.interrupted()) {
                //如果当前线程被中断,打破栅栏,然后抛出中断异常。
                breakBarrier();
                throw new InterruptedException();
            }
           //计算当前到达线程的下标。
           int index = --count;
           //下标为0表示当前线程为最后一个使用栅栏的线程。
           if (index == 0) {  // 栅栏开放。
               boolean ranAction = false;
               try {
                   final Runnable command = barrierCommand;
                   if (command != null)//如果有栅栏命令,执行栅栏命令。
                       command.run();//看来栅栏的命令是由最后一个到达栅栏的线程执行。
                   ranAction = true;
                   //产生新的generation。
                   nextGeneration();
                   return 0;
               } finally {
                   if (!ranAction) //如果栅栏命令未执行,打破栅栏。
                       breakBarrier();
               }
           }
            // 等待中的主循环,直到栅栏开放、栅栏被打破、线程被打断或者超时时退出。
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        //如果出于当前generation 且generation状态为未打破,那么打破栅栏。
                        breakBarrier();
                        throw ie;
                    } else {
                        // 如果没被中断的话,我们即将完成等待。
                        // 所以这个中断被算作下一次执行的中断。
                        Thread.currentThread().interrupt();
                    }
                }
                if (g.broken)
                    throw new BrokenBarrierException();
                if (g != generation)//如果generation改变了,说明之前的栅栏已经开放,返回index。
                    return index;
                if (timed && nanos <= 0L) {
                    breakBarrier();//如果超时,打破栅栏,并返回超时异常。
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

 

       从dowait方法中可以看到,当所有使用者都到达时,栅栏开放,会调用nextGeneration方法;如果有其他情况(超时、中断等)发生,会调用breakBarrier方法。先看下nextGeneration:

    /**
     * 更新栅栏状态,唤醒所有在栅栏处等待的线程。
     * 这个方法只有在持有锁的情况下被调用。
     */
    private void nextGeneration() {
        //唤醒所有在栅栏处等待的线程。
        trip.signalAll();
        //重置count。
        count = parties;
        //产生新的generation。
        generation = new Generation();
    }

 

       再看下breakBarrier方法: 

    /**
     * 设置当前栅栏generation状态为打破状态,并唤醒栅栏处的等待线程。
     * 这个方法只有在持有锁的情况下被调用。
     */
    private void breakBarrier() {
        //设置"打破"状态。
        generation.broken = true;
        //重置count。
        count = parties;
        //唤醒所有在栅栏处等待的线程。
        trip.signalAll();
    }

 

       dowait内部为栅栏的主要逻辑,经过上面的分析,应该很清晰了。最后看下其他的方法:

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    public int getParties() {
        return parties;
    }

    public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }

    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //重置逻辑,先打破当前的栅栏,然后建立一个新的。
            breakBarrier();
            nextGeneration(); 
        } finally {
            lock.unlock();
        }
    }
    /**
     * 获取在栅栏处等待的使用方(线程)数量。
     */
    public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }

 

 
       小总结一下:
              1.当建立一个使用方数量为n的栅栏时,栅栏内部有一个为n的计数。当使用方调用await方法时,如果其他n-1个使用方没有全部到达await方法(内部计数减1后,不等于0),那么使用方(线程)阻塞等待。

              2.当第n个使用方调用await时,栅栏开放(内部计数减1后等于0),会唤醒所有在await方法上等待着的使用方(线程),大家一起通过栅栏,然后重置栅栏(内部计数又变成n),栅栏变成新建后的状态,可以再次使用。 

 

       CyclicBarrier的代码解析完毕!
 
    
分享到:
评论

相关推荐

    JavaCommon:Java基础用法,集合,线程,JUC,jdk5--8各个版本特性。

    该项目还深入解析了从JDK 5到JDK 8各版本的重要特性,为开发者提供了丰富的代码示例和源码分析。 1. **Java基础用法**: - **变量与数据类型**:Java支持基本数据类型如int、float、char等,以及引用类型如类、...

    JDK1.8:JDK1.8源码解析-源码解析

    在深入探讨JDK1.8源码解析之前,先理解一下JDK的含义:Java Development Kit,即Java开发工具包,是Java编程语言的核心组成部分,提供了编写、编译和运行Java应用程序所需的所有工具。JDK1.8是Oracle公司发布的Java ...

    A-JUC-JVM-Java并发知识..pdf

    JUC的同步器包括CountDownLatch、CyclicBarrier、Semaphore等,它们是为了解决多个线程协作完成特定任务的工具。例如,CountDownLatch可以让一些线程等待其他线程完成操作后再继续执行,而CyclicBarrier则让一组线程...

    thread_analysis:JDK中JUC学习记录

    本篇将深入解析JDK中的JUC包,探讨其核心组件和机制,帮助开发者更好地理解和利用这些高级并发工具。 一、线程池ExecutorService ExecutorService是JUC包中的核心接口,它扩展了Executor接口,提供了管理和控制线程...

    javajdk8源码-concurrencyJava:基于JDK8源码学习并发编程知识

    总之,这份"javajdk8源码-concurrencyJava"的学习资料会带你深入理解JDK 8中并发编程的各种机制和工具,通过源码解析,你可以更好地掌握Java并发编程的精髓,从而编写出高效、安全的多线程应用。对于系统开源领域的...

    艾编程coding老师:JUC 并发编程 + 底层原理.pdf

    JUC(java.util.concurrent)是Java提供的一个并发编程工具包...要掌握JUC并发编程及其底层原理,除了通过阅读官方文档和源码学习外,还需要大量实践和经验积累,才能够真正理解和应用JUC中的并发工具来解决实际问题。

    Java并发包源码分析(JDK1.8)

    Java并发包源码分析(JDK1.8):囊括了java.util.concurrent包中大部分类的源码分析,其中涉及automic包,locks包(AbstractQueuedSynchronizer、ReentrantLock、ReentrantReadWriteLock、LockSupport等),queue...

    JAVA面试题大全

    - CountDownLatch、CyclicBarrier、Semaphore的用途。 - Atomic类的理解和使用,如AtomicInteger、AtomicReference。 13. **Spring框架** - Spring的核心模块和依赖注入(DI)的概念。 - AOP(面向切面编程)的...

    Java面试宝典2018-最全面试资料

    - **JUC并发工具类**:CountDownLatch、CyclicBarrier、Semaphore、ThreadPoolExecutor的使用。 - **Maven或Gradle**:构建工具的使用,配置管理,依赖管理。 - **单元测试**:JUnit、Mockito等工具的使用。 ...

    最新JAVA经典面试题(有答案)

    - 动态代理的理解,包括JDK动态代理和CGLIB动态代理。 7. **设计模式**: - 了解常见的设计模式,如单例、工厂、观察者、装饰者、适配器、代理等,并能结合实际场景应用。 8. **JVM**: - 垃圾回收机制(GC)的...

    JVM_JUC_Demo:练习案例

    在Java世界中,JVM(Java Virtual Machine)和JUC(Java Concurrency Utilities)是两个极其重要的概念。JVM是Java程序的运行平台,而JUC则是Java并发编程的重要工具集。这个名为“JVM_JUC_Demo”的实践案例旨在帮助...

    汪文君高并发编程实战视频资源全集

     高并发编程第三阶段11讲 AtomicXXXFieldUpdater源码分析及使用场景分析.mp4  高并发编程第三阶段12讲 sun.misc.Unsafe介绍以及几种Counter方案性能对比.mp4  高并发编程第三阶段13讲 一个JNI程序的编写,通过...

    汪文君高并发编程实战视频资源下载.txt

     高并发编程第三阶段11讲 AtomicXXXFieldUpdater源码分析及使用场景分析.mp4  高并发编程第三阶段12讲 sun.misc.Unsafe介绍以及几种Counter方案性能对比.mp4  高并发编程第三阶段13讲 一个JNI程序的编写,通过...

    jdkLearning:阅读java原始代码包含:集合,JUC,Executor体系

    这个"jdkLearning"项目专注于深入理解JDK的源代码,特别是集合、并发编程(JUC)和Executor服务这三个核心领域。通过阅读和分析这些源码,开发者可以更深入地了解Java平台的工作原理,提升编程技能,并优化应用程序...

    java并发编程经典书籍(英文版)

    - **并发集合**:详述了JUC(Java Util Concurrency)库,包括ArrayList、LinkedList、HashMap等线程安全的改进版本,以及CopyOnWriteArrayList、ConcurrentHashMap等高效并发集合。 - **原子类**:如...

    Java基础笔记(包括底层原理)

    JUC(Java Concurrency Utilities)提供了高级同步工具,如Semaphore、CyclicBarrier和CountDownLatch,以及并发容器如ConcurrentHashMap,提升了多线程编程的便利性和性能。 总的来说,Java基础知识覆盖了从简单的...

    资深程序员的Java面试题总结汇总.pdf

    13. JUC中的并发工具包括Semaphore(信号量)、CyclicBarrier(回环栅栏)、CountDownLatch(计数器门闩)等。 14. ReadWriteLock提供了读写锁分离,提高并发性能;StampedLock提供了更细粒度的锁控制。 15. 通过...

    安琪拉教百里守约学并发编程之多线程基础

    此外,Java的并发工具包JUC(Java Concurrency Utilities)提供了丰富的类和接口,如`ExecutorService`和`ThreadPoolExecutor`用于管理和调度线程,`CountDownLatch`和`CyclicBarrier`用于线程间的协作,`Future`和`...

Global site tag (gtag.js) - Google Analytics