`

java栅栏--CyclicBarrier

阅读更多

CyclicBarrier介绍

 

java栅栏--CyclicBarrier,直接翻译为循环屏障。其作用与前面讲的闭锁CountDownLatch有些类似,可以构造一个可循环利用的“屏障”,在一组线程执行任务到达指定位置之前 阻塞已经到达的线程,等所有线程都到达指定位置后,同时依次唤醒所有线程。

 

CountDownLatch也可以阻塞一组线程,但在使用上与CyclicBarrier有所区别。CountDownLatch可以实现阻塞一组线程等待另一组线程执行完成,这里可以有两组线程;而CyclicBarrier只是阻塞一组线程,另外它还可以重复使用,并且还允许所有线程到达指定位置后 先执行一段方法,再依次唤醒这组线程继续继续,这些都是CountDownLatch所不具备的。

 

下面还是以讲解CountDownLatch时,使用的多人准备游戏为例,简单暂时下CyclicBarrier的用法:

public class CyclicBarrierTest {
    public static void main(String[] args) {
        ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
 
        CyclicBarrier playersCounter = new CyclicBarrier(10, new Runnable() {
            @Override
            public void run() {
                System.out.println("所有玩家准备就绪,开始游戏!");
            }
        });
        System.out.println("等待玩家加入游戏");
 
        for (int i=0;i<20;i++){
            executorService.execute(new Player(playersCounter,i+""));
        }
 
        playersCounter.isBroken();
 
    }
}
 
 
//多线程操作线程不安全容器 ThreadSafe.datas
class Player implements Runnable{
    private CyclicBarrier playersCounter;
    private String name;
 
    public Player(CyclicBarrier playersCounter,String name) {
        this.playersCounter = playersCounter;
        this.name = name;
    }
    @Override
    public void run() {
        System.out.println("玩家:"+name+"加入游戏");
        try {
            Thread.sleep(1000);
            playersCounter.await();
            System.out.println("玩家:"+name+"开始选英雄");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

注意与CountDownLatch版本的实现有三点区别:

1CountDownLatch是在主线程调用await阻塞(实际上可以在一组线程上await),而CyclicBarrier是在任务线程里await阻塞。

2CountDownLatch只能使用一次,也就是新建一个CountDownLatch对象对应10个玩家准备游戏,另外10个玩家准备时 又得创建一个新的CountDownLatch对象;而CyclicBarrier可以重复使用,这里可以看到20个玩家,分别被分成了两组开始游戏。

3、每10个玩家准备就绪后,在CyclicBarrier实现中可以先执行一段Runnablerun方法,再依次唤醒线程,这种处理方式在阶段性为题中非常有用。

 

在简单了解CyclicBarrier的使用方法之后,继续开始深入理解CyclicBarrier的实现原理。

 

CyclicBarrier实现原理

 

前面讲过CountDownLatch是基于AQS实现的;而CyclicBarrier是基于ReentrantLock重入锁实现的,当然ReentrantLock也是基于AQS实现的,非要说CyclicBarrier也是基于AQS实现的也不为过。

 

重要成员变量

CyclicBarrier定义了下列几个成员变量,其核心方法都会操作这几个成员变量。这里先列出来,结合具体方法使用到时 方便查看和理解。

 

    //可以理解为初始化时 需要阻塞的任务个数
    private final int parties;
    //剩余需要等待的任务个数,初始值为parties,直到为0时依次唤醒所有被阻塞的任务线程。
    private int count;
 
    //每次对“栅栏”的主要成员变量进行变更操作,都应该加锁
    private final ReentrantLock lock = new ReentrantLock();
    //用于阻塞和唤醒任务线程
   private final Condition trip = lock.newCondition();
 
    //在所有线程被唤醒前,需要执行的一个Runable对应的run方法
    private final Runnable barrierCommand;
    //用于表示“栅栏”当前的状态
    private Generation generation = new Generation();
 

 

其中只有countgeneration不是final的,也就是说其他几个成员变量初始化后是不允许修改的。

 

构造方法

CyclicBarrier有两个重载的构造方法,一个是不带Runnable参数,另一个带有Runnable参数。本质上都会调用带Runnable参数的构造方法进行实例化,这里只贴出带Runnable参数的构造方法实现:

public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties; //为了实现复用,进行备份
        this.count = parties;//初始化,待阻塞的任务总数
        this.barrierCommand = barrierAction;//初始化
    }

 

构造方法很好理解,主要工作就是初始化三个重要的成员变量。

 

await方法

await方法是CyclicBarrier的核心方法,本质上调用的是dowait,一组线程的阻塞和唤醒工作都在这个方法中实现:

public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
 
private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock(); //加锁
        try {
            final Generation g = generation;
 
            if (g.broken)
                throw new BrokenBarrierException();
            //有一个线程线程被中断,整个CyclicBarrier将不可用
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
 
            int index = --count; //待等待的任务数减1
            if (index == 0) {  // 如果待等待的任务数减至0,依次唤醒所有线程
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();//唤醒前先执行Runnable对象的run方法
                    ranAction = true;
                    nextGeneration();//重置整个CyclicBarrier,方便下次重用
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }
 
            //如果待等待的任务数大于0,进行线程阻塞,直到count为0时被唤醒
            for (;;) {
                try {
                    if (!timed)
                        trip.await();//阻塞当前线程
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);//延时阻塞当前线程
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
 
                if (g.broken)//异常唤醒
                    throw new BrokenBarrierException();
 
                if (g != generation)//正常被唤醒,generation会被新建
                    return index;
 
                if (timed && nanos <= 0L) {//延迟阻塞时间到后唤醒
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
}
 

 

理解这个方法主要是理解两部分代码块:在count还没有到0之前,都会执行for循环着块代码,对线程进行阻塞;直到count变为0,执行if (index == 0)所在的代码块依次唤醒所有阻塞的线程,并对CyclicBarrier对象进行重置,方便继续使用。结合给出的注释,对await方法的实现应该就不难理解了。

 

另外await方法还有一个延时版本,主要是为了防止没有足够多线程调用await方法,count就不会减为0,线程就会被永久阻塞;如果使用延时版本的await方法就可以避免这个问题,时间到后,所有线程阻塞的线程都会被依次唤醒,并收到TimeoutException异常,避免被长时间阻塞。

 

isBroken方法

该方法主要用于判断当前CyclicBarrier对象的状态,如果状态为false,说明现在CyclicBarrier不可用,如果此时调用await方法 会直接收到一个BrokenBarrierException异常,见dowait方法。

 

reset()方法

该方法主要用于重置CyclicBarrier对象:

public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   //先把状态置为不可用,并唤醒被阻塞的线程执行完成任务
            nextGeneration(); //重置CyclicBarrier对象
        } finally {
            lock.unlock();
        }
    }
 

 

getNumberWaiting()方法

该方法用于获取当前已经被阻塞的任务数:

public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count; //计算已被阻塞的任务数
        } finally {
            lock.unlock();
        }
    }
 

 

到这里CyclicBarrier的实现原理分析结束。

 

CyclicBarrier的不足

 

CyclicBarrier相对于CountDownLatch来说,在分段执行任务上进行了改进,可以重复使用。并且在每个分段完成时,可以执行一个Runnable对应的run方法 做一些业务处理,比如:统计汇总。使用CyclicBarrier实现分阶段汇总:



 

假设把任务分为三个阶段,每个阶段结束的汇总方法相同,使用CyclicBarrier可以很简单的实现,这个“汇总方法”其实就是CyclicBarrier构造方法的第二参数,Runnable对应的run方法。 但如果每个阶段的汇总方法如果不一样时,CyclicBarrier就显得束手无措,因为它只能定义一个Runnable参数。如果遇到这种场景可以使用java1.7中新增的Phaser

 

Phaser的名字就可以看出,它主要就是用于解决分阶段问题,比起CyclicBarrier来,Phaser可以在每个阶段结束后执行不同的操作,从分阶段的角度看PhaserCyclicBarrier的增强版。从CountDownLatchCyclicBarrier 再到Phaser是一个依次增强的过程,但又不能相互完全取到。关于Phaser这里就不再深入讨论,后面有时间再单独总结。

 

 

  • 大小: 10.2 KB
0
0
分享到:
评论

相关推荐

    Java-concurrency-master.zip

    7. **并发工具类**:`java.util.concurrent`包提供了丰富的并发工具,如`Semaphore`(信号量)、`CountDownLatch`(计数器)、`CyclicBarrier`(栅栏)和`Phaser`(同步屏障),用于控制并发流程。 8. **并发模式**...

    JAVA线程高级-线程按序交替执行

    - JUC包含了许多高级并发组件,如`Semaphore`信号量、`CyclicBarrier`回环栅栏、`CountDownLatch`倒计时器和`Exchanger`交换器等,它们可以帮助控制线程的执行顺序。 - `Semaphore`可以限制同时访问特定资源的线程...

    Java并发编程之栅栏(CyclicBarrier)实例介绍

    Java并发编程中的栅栏(CyclicBarrier)是一个同步辅助类,它允许一组线程等待彼此到达某个特定点,然后一起继续执行。这个特定点被称为屏障点。与闭锁(CountDownLatch)不同,闭锁通常是一次性的,而CyclicBarrier...

    CyclicBarrier的用法

    在Java多线程编程中,`CyclicBarrier`是一个非常重要的同步工具类,它允许一组线程等待其他线程到达某个屏障点后再一起继续执行。这个屏障点就是我们所说的“循环栅栏”,顾名思义,它就像一个旋转门,所有线程必须...

    backport-util-concurrent-Java50-3.1-src:我的项目只需要库文件

    此外,backport-util-concurrent还包括其他关键组件,如`Semaphore`(信号量)、`CyclicBarrier`(循环栅栏)、`Phaser`(屏障)等,它们都是协调多线程操作的强大工具。例如,Semaphore用于限制对资源的并发访问,...

    计算机后端-Java-Java高并发从入门到面试教程-容思路.zip

    接下来,我们要探讨Java并发工具类,如Semaphore信号量、CyclicBarrier回环栅栏、CountDownLatch倒计时器等,这些工具可以帮助我们更好地管理和协调并发任务。例如,Semaphore用于限制同时访问特定资源的线程数量,...

    java并发编程-AQS和JUC实战

    #### 六、CyclicBarrier 循环栅栏 - **概述**:`CyclicBarrier` 是一个让一组线程等待至某个状态点的同步工具类。 - **常用方法**: - `await()`:使所有线程等待至屏障点。 - `reset()`:重置屏障。 - **应用...

    java常见面试题---线程篇

    - `CyclicBarrier`:栅栏,使一组线程等待其他线程到达某个点后一起继续执行。 - `Semaphore`:信号量,限制同时访问特定资源的线程数量。 理解并熟练掌握这些知识点,对于解决Java并发问题和优化多线程应用至关...

    Java并发系列之CyclicBarrier源码分析

    Java并发系列之CyclicBarrier源码分析 CyclicBarrier是Java并发系列中的一种同步工具类,用于实现一组线程相互等待。当所有线程都到达某个屏障点后,再进行后续的操作。下面是对CyclicBarrier源码的详细分析。 ...

    java并发工具包 java.util.concurrent中文版用户指南pdf

    13. 栅栏 CyclicBarrier 14. 交换机 Exchanger 15. 信号量 Semaphore 16. 执行器服务 ExecutorService 17. 线程池执行者 ThreadPoolExecutor 18. 定时执行者服务 ScheduledExecutorService 19. 使用 ForkJoinPool ...

    PDF-Java7ConcurrencyCookbook-英文版.rar

    2. **并发工具类**:Java并发包(java.util.concurrent)提供了丰富的并发工具类,如Semaphore(信号量)、CyclicBarrier(循环栅栏)、CountDownLatch(倒计时锁)等。这些工具帮助开发者更好地控制线程间的同步和...

    java并发编程源码-Java-Programs-Source-Code:并发和并行编程,具有线程池,forkJoin,同步,信号量,Reen

    4. **同步机制**: Java提供了多种同步工具,如`synchronized`关键字、`Lock`接口(`ReentrantLock`、`公平锁`、`非公平锁`)、`Semaphore`(信号量)和`CyclicBarrier`(循环栅栏)。这些工具用于控制对共享资源的...

    Java并发工具包java.util.concurrent用户指南中英文对照阅读版

    13. 栅栏 CyclicBarrier 14. 交换机 Exchanger 15. 信号量 Semaphore 16. 执行器服务 ExecutorService 17. 线程池执行者 ThreadPoolExecutor 18. 定时执行者服务 ScheduledExecutorService 19. 使用 ForkJoinPool ...

    Java线程间的通信----生产者消费者模型

    - `CyclicBarrier`循环栅栏:允许一组线程等待彼此到达某个点,然后一起继续。 - `CountDownLatch`计数器门锁:一次性释放所有等待线程,常用于初始化操作。 3. **生产者消费者模型的实现步骤**: - 创建一个...

    Java并发工具包java.util.concurrent用户指南中英文对照阅读版.pdf

    栅栏 CyclicBarrier 14. 交换机 Exchanger 15. 信号量 Semaphore 16. 执行器服务 ExecutorService 17. 线程池执行者 ThreadPoolExecutor 18. 定时执行者服务 ScheduledExecutorService 19. 使用 ForkJoinPool 进行...

    实战Java高并发程序设计第二版随书代码

    - **java.util.concurrent** 包:学习并发工具类,如Semaphore(信号量)、CountDownLatch(计数器)、CyclicBarrier(回环栅栏)、Exchanger(交换器)等,它们提供了高级并发控制手段。 3. **并发容器** - **...

    【IT十八掌徐培成】Java基础第08天-02.多线程-join-daemon-同步.zip

    `Semaphore`信号量控制对有限资源的访问,`CountDownLatch`计数器用于等待一组线程完成操作,`CyclicBarrier`循环栅栏允许一组线程等待彼此到达某个点后再继续执行,而`Phaser`是Java 7引入的更高级的同步工具。...

    java并发工具包详解

    13. 栅栏 CyclicBarrier 14. 交换机 Exchanger 15. 信号量 Semaphore 16. 执行器服务 ExecutorService 17. 线程池执行者 ThreadPoolExecutor 18. 定时执行者服务 ScheduledExecutorService 19. 使用 ForkJoinPool ...

    java并发包资源

    13. 栅栏 CyclicBarrier 14. 交换机 Exchanger 15. 信号量 Semaphore 16. 执行器服务 ExecutorService 17. 线程池执行者 ThreadPoolExecutor 18. 定时执行者服务 ScheduledExecutorService 19. 使用 ForkJoinPool ...

Global site tag (gtag.js) - Google Analytics