`
Jen
  • 浏览: 57391 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

JUC代码浅析[6]——基于AQS的CyclicBarrier

    博客分类:
  • java
阅读更多

 

JUC代码浅析[6]——基于AQSCyclicBarrier

       CyclicBarrier是一种同步机制允许一组线程相互等待,等到所有线程都到达一个屏障点才退出await方法,它没有直接实现AQS而是借助ReentrantLock来实现的同步机制。它是可循环使用的,而CountDownLatch是一次性的,另外它体现的语义也跟CountDownLatch不同,CountDownLatch减少计数到达条件采用的是release方式,而CyclicBarrier走向屏障点(await)采用的是Acquire方式,Acquire是会阻塞的,这也实现了CyclicBarrier的另外一个特点,只要有一个线程中断那么屏障点就被打破,所有线程都将被唤醒(CyclicBarrier自己负责这部分实现,不是由AQS调度的),这样也避免了因为一个线程中断引起永远不能到达屏障点而导致其他线程一直等待。屏障点被打破的CyclicBarrier将不可再使用(会抛出BrokenBarrierException)除非执行reset操作。

 

下面是一个使用的例子

class Solver {

   final int N;

   final float[][] data;

   final CyclicBarrier barrier;

 

   class Worker implements Runnable {

     int myRow;

     Worker(int row) { myRow = row; }

     public void run() {

       while (!done()) {

         processRow(myRow);

 

         try {

           barrier.await();

         } catch (InterruptedException ex) {

           return;

         } catch (BrokenBarrierException ex) {

           return;

         }

       }

     }

   }

 

   public Solver(float[][] matrix) {

     data = matrix;

     N = matrix.length;

     barrier = new CyclicBarrier(N,

                                 new Runnable() {

                                   public void run() {

                                     mergeRows(...);

                                   }

                                 });

     for (int i = 0; i < N; ++i)

       new Thread(new Worker(i)).start();

 

     waitUntilDone();

   }

 }

 

       这个例子的场景是每个Worker线程处理完一行数据后,等待其他线程到达屏障点。当所有线程都到达屏障点时进行一次合并操作(mergeRows方法),然后每个Worker线程继续处理下一行数据.如此反复。

 

       下面的代码片段说明了如何实现这种机制,整个过程为

1首先检查屏障点有没有打破,如果打破了就直接抛出异常退出。

2检查当前线程是否被中断了,如果中断了就打破屏障点,并抛出异常退出。

3检查当前线程是否最后一个到达屏障点,如果是的话就设置下一轮屏障,并唤醒所有线程,如果需要的话执行barrierCommand,退出。

4.以上都不满足的话,循环等待在trip这个condition上。如果设置了等待超时时间,那么只要有一个线程等待超时后就会打破屏障点唤醒所有线程,并抛出异常TimeoutException退出,其他线程可能会抛出BrokenBarrierException异常。所以如果不设置超时时间并且最后一个线程由于某种原因不能到达屏障点,那么所有线程都一直死等待了,因此才需要上面几步的检查。

整个过程还有很多逻辑来处理执行屏障点任务引发的异常、park线程超时引发的中断异常和超时异常等

    private int dowait(boolean timed, long nanos)

        throws InterruptedException, BrokenBarrierException,

               TimeoutException {

        final ReentrantLock lock = this.lock;

        lock.lock();

        try {

            final Generation g = generation;

 

            if (g.broken)

                throw new BrokenBarrierException();

 

            if (Thread.interrupted()) {

                breakBarrier();

                throw new InterruptedException();

            }

 

           int index = --count;

           if (index == 0) {  // tripped

               boolean ranAction = false;

               try {

          final Runnable command = barrierCommand;

                   if (command != null)

                       command.run();

                   ranAction = true;

                   nextGeneration();

                   return 0;

               } finally {

                   if (!ranAction)

                       breakBarrier();

               }

           }

 

            // loop until tripped, broken, interrupted, or timed out

            for (;;) {

                try {

                    if (!timed)

                        trip.await();

                    else if (nanos > 0L)

                        nanos = trip.awaitNanos(nanos);

                } catch (InterruptedException ie) {

                    if (g == generation && ! g.broken) {

                        breakBarrier();

           throw ie;

           } else {

           // We're about to finish waiting even if we had not

           // been interrupted, so this interrupt is deemed to

           // "belong" to subsequent execution.

           Thread.currentThread().interrupt();

           }

                }

 

                if (g.broken)

                    throw new BrokenBarrierException();

 

                if (g != generation)

                    return index;

 

                if (timed && nanos <= 0L) {

                    breakBarrier();

                    throw new TimeoutException();

                }

            }

        } finally {

            lock.unlock();

        }

    }

 

CyclicBarrier对外API只有7个:

l  public CyclicBarrier(int parties) 创建一个CyclicBarrier,指定参与者数量

l  public CyclicBarrier(int parties, Runnable barrierAction)创建一 CyclicBarrier,指定参与者数量。并且指定所有参与者到达屏障点之后需要执行的动作。

l  public int await() throws InterruptedException, BrokenBarrierException在所 有参与者到达屏障点之前一直等待(也就是说等待所有参与者都调用了await方法才往下执行)

l  public int await(long timeout, TimeUnit unit)

        throws InterruptedException,BrokenBarrierException,TimeoutException await()方法一样,只是指定了等待超时时间,具体实现请看上面的分析

l  public boolean isBroken()屏障点是否被打破

l  public void reset()重置屏障点(屏障点打破之后只能调用这个方法才可重新使用)

l  public int getNumberWaiting()处于等待状态的参与者的个数,也就是调用了await方法的参与者 个数

 

分享到:
评论

相关推荐

    狂神说JUC代码狂神说JUC代码

    【狂神说JUC代码】是一系列专注于Java并发编程(JUC,Java Util Concurrency)的教程或笔记,旨在帮助开发者深入理解并掌握Java平台上的并发处理机制。JUC是Java标准库中的一组高级并发工具类,为多线程环境下的程序...

    Java 多线程与并发(10-26)-JUC锁- 锁核心类AQS详解.pdf

    AQS是一个抽象的队列同步器,它基于FIFO(先进先出)队列来管理线程的排队和获取共享资源。它是实现Java并发包中锁和其他同步器的基础框架,例如ReentrantLock(可重入锁)、Semaphore(信号量)、CountDownLatch...

    java并发编程专题(九)----(JUC)浅析CyclicBarrier

    Java 并发编程专题(九)----(JUC)浅析 CyclicBarrier CyclicBarrier 是 Java 并发编程中的一个同步辅助工具,它允许一组线程全部等待彼此到达公共屏障点。它的字面意思是可循环使用的屏障,用于让一组线程到达一...

    juc aqs java

    juc 的aqs介绍。

    个人学习JUC代码笔记总集

    这个压缩包文件“个人学习JUC代码笔记总集”显然是一个个人的学习资源,记录了对JUC组件的理解和应用实例,特别适合已经有一定Java基础,想要深入学习并发编程的开发者。 JUC的主要目标是简化并发编程,提高多线程...

    JUC代码收集,java高并发多线程学习

    本资源"JUC代码收集,java高并发多线程学习"显然是一个专注于探讨和学习JUC库的资料包。 JUC库包含多个子包,如`concurrent`, `atomic`, `locks`等,每个子包都有其特定的功能和用途: 1. **concurrent**:这是JUC...

    AQS和JUC知识点讲解

    《AQS和JUC知识点详解》 在Java并发编程领域,AbstractQueuedSynchronizer(AQS)和Java Util Concurrency(JUC)是两个至关重要的概念。它们为开发高效、线程安全的多线程程序提供了强大的工具。本文将深入解析这...

    java并发编程:juc、aqs

    AQS通过内部维护一个基于链表的等待队列,有效地管理线程的同步和唤醒,从而实现锁和其他同步原语。 **AQS核心特性:** 1. **资源管理**:AQS定义了两种资源获取方式:独占和共享。独占模式下,只有一个线程能访问...

    JUC AQS的加解锁.pdf

    Java的并发编程是多线程和多任务处理的核心技术之一,而在Java并...开发者可以基于AQS创建复杂的同步结构,满足各种并发场景的需求。理解AQS的工作原理,对于使用Java并发包以及开发高性能的多线程应用具有重要的意义。

    JUC(一)-AQS源码分析

    为了学习JUC,AQS是基础中的基础,所以我们首先深入了解下AQS。 一、锁的介绍 为了了解AQS的源码,我们需要先大概下锁中的一些功能 1.1 乐观锁/悲观锁 乐观锁与悲观锁是一种广义上的概念,体现了看待线程同步的不同...

    这就是标题—— JUC.pdf

    JUC是什么 线程 进程 / 线程 线程状态 wait / sleep 并发 / 并行 Lock 使用Lock锁 可重入锁 公平锁 / 非公平锁 Synchronized / Lock 线程通讯 wait()、notify()和notifyAll() 虚假唤醒 Condition 定制化通信 多线程...

    Java——JUC

    Java并发编程是Java开发中的重要领域,而Java并发工具包(Java Concurrency Utility,简称JUC)则是Java标准库提供的一套强大而丰富的工具,它极大地简化了多线程环境下的编程工作。JUC主要包含在`java.util....

    juc学习代码。。。。

    在这个"juc学习代码"的资源中,我们很显然会接触到Java并发编程的核心概念和实践。 首先,JUC库中的`java.util.concurrent`包包含了大量并发工具类,如`Semaphore`(信号量)、`CyclicBarrier`(循环屏障)、`...

    JUC核心类AQS的底层原理

    AQS作为Java并发工具包(JUC)中的一个核心抽象类,其设计目的是为了实现各种同步器(如锁、信号量等)。AQS主要通过三个核心组成部分来实现这些同步组件的功能: 1. **State变量及其CAS操作**:AQS维护了一个名为`...

    JUC代码演示 Java多线程并发

    除了`synchronized`之外,Java还提供了更灵活的锁机制——`Lock`接口。`Lock`接口提供了比`synchronized`更强大的锁定机制,并且实现了更加精细的线程控制。 在示例中,使用了`ReentrantLock`类来实现自定义的锁: ...

    juc并发编程脑图以及相关示例代码

    juc并发编程脑图以及相关示例代码

    尚硅谷大厂必备技术之JUC并发编程基础视频 配套资料(自己根据视频整理课件,和代码)

    【尚硅谷】大厂必备技术之JUC并发编程视频 配套资料,自己根据视频整理 pdf 课件,和代码 视频地址:...

    JUC AQS(AbstractQueuedSynchronizer)

    ReentrantLock Lock 加锁过程源码分析图,AQS 源码分析

    JUC2019.6V1.5.zip

    juc笔记:这些知识点是怎么积累进来的呢,下面我以JUC的学习过程为例子来讲解我的知识框架图记录过程, 首先我们知道,JUC就是java.util.concurrent包,俗称java并发包,那首先我们要知道java并发包是用来干嘛 的...

    Java volatile与AQS锁内存可见性

    从JUC中的AQS引入,讲解Java volatile与AQS锁内存可见性

Global site tag (gtag.js) - Google Analytics