引言及简介
上一章我们介绍了独占式的同步组件ReentrantLock,今天我们先不忙着继续介绍其他Lock接口的实现类同步组件,先来看看JDK基于ReentrantLock同步组件以及Condition条件等待机制实现的一个同步辅助器CyclicBarrier。
CyclicBarrier是一个同步辅助工具,它允许一组线程相互等待达到一个公共的屏障(barrier)点。CyclicBarrier的字面意思可以叫做可循环(Cyclic)使用的屏障(Barrier),之所以说它是可循环(Cyclic)使用的屏障,是因为当所有相互等待的线程都释放之后(或者说都通过屏障之后),它是可以被重复使用的。另外,CyclicBarrier还可以让这一组相互等待的线程在都到达这个公共的屏障(barrier)之后,让最晚到达屏障的线程先去完成一件特殊指定的任务,在这项指定的任务完成之后,再让这一组线程接着往下执行(可能已经执行完成就意味着结束),包括最晚到达屏障的那一个线程也会继续往下执行自己本来的逻辑。
可能通过这样的描述,我们很难理解CyclicBarrier到底有什么作用,以我的理解可以简单的描述为:使用CyclicBarrier可以使一组线程在至少有其中一个线程没有就位之前都让已经就位的线程陷入等待,直到所有的线程都就位之后,大家再一起继续往下执行(如果已经执行完就意味着结束)。所谓的“就位”就相当于是一个屏障或者一扇门,只有当所有的参与者都来到这扇大门前,门才会开启让大家一起通过,在大家通过之后,这扇门又会关闭复原,使其能够被重复用于进行“就位”等待放行使用。作为可选的特殊任务就是在大家都就位之后,在门打开之前的这一个间隙之间,可以让最晚就位的参与者临时去完成一件特殊任务(谁让你来的最晚呢),在这件任务完成之后,大家再一起通过这扇门,然后该干嘛干嘛。
上图是我理解的CyclicBarrier,线程A,B,C花费不同的时间之后达到一个公共的屏障之后,再一起继续往后执行(当然也可以是后面没有逻辑需要执行而结束),在线程B达到之前,线程C已经等待了5秒,线程A已经等待了3秒,如果有指定一个在屏障点的特殊任务,那么在线程A,B,C通过屏障继续往下执行之前,将由线程B(因为它最晚到达)去完成该特殊任务。
使用示例
通过上面的简介,我相信应该对CyclicBarrier的作用有了了解,下面我们就使用几个示例来加深对其了解。在使用之前,我们先要了解一下CyclicBarrier的构造方法,它有两个构造方法:
public CyclicBarrier(int parties, Runnable barrierAction) { //接受一个在屏障点的特殊任务barrierAction if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } //直接构造一个CyclicBarrier,它将使指定个数的参与者(线程)都到达屏障点之前相互等待 public CyclicBarrier(int parties) { this(parties, null); }
通过构造方法我们可以发现,在屏障点的特殊任务是可选的,该任务是一个实现了Runnable接口的实例,整形参数parties表示让多少个线程相互等待至屏障点。
示例一:假若有若干个线程都要进行写数据操作,并且只有所有线程都完成写数据操作之后,这些线程才能继续做后面的事情
public static void main(String[] args) { int N = 4; CyclicBarrier barrier = new CyclicBarrier(N, new Runnable(){ @Override public void run() { System.out.println("由线程"+Thread.currentThread().getName()+"开始执行屏障点特殊任务"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }); for(int i=0;i<N;i++) new Writer(barrier).start(); } static class Writer extends Thread{ private CyclicBarrier cyclicBarrier; public Writer(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据..."); try { Thread.sleep((long)(Math.random() * 10000)); //以睡眠来模拟写入数据操作 System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕"); int index = cyclicBarrier.await(); System.out.println("所有线程写入完毕,当前线程"+Thread.currentThread().getName()+"(await返回:"+index+")继续处理其他任务..."); } catch (InterruptedException e) { e.printStackTrace(); }catch(BrokenBarrierException e){ e.printStackTrace(); } } }
执行结果:
线程Thread-2正在写入数据... 线程Thread-1正在写入数据... 线程Thread-3正在写入数据... 线程Thread-0正在写入数据... 线程Thread-0写入数据完毕,等待其他线程写入完毕 线程Thread-2写入数据完毕,等待其他线程写入完毕 线程Thread-3写入数据完毕,等待其他线程写入完毕 线程Thread-1写入数据完毕,等待其他线程写入完毕 由线程Thread-1开始执行屏障点特殊任务 所有线程写入完毕,当前线程Thread-1(await返回:0)继续处理其他任务... 所有线程写入完毕,当前线程Thread-0(await返回:3)继续处理其他任务... 所有线程写入完毕,当前线程Thread-2(await返回:2)继续处理其他任务... 所有线程写入完毕,当前线程Thread-3(await返回:1)继续处理其他任务...
通过示例一可以看到,我们构造了四个相互等待的线程,在线程Thread-1没有写入数据完毕之前,其它三个最快的线程一直等待,当四个线程都到达屏障点时,让最晚执行完的线程Thread-1执行了屏障点的特殊任务,特殊任务执行完之后,四个线程再一起继续往后执行。并且最早达到的线程Thread-0的await()的返回值为3,往后依次递减,最晚达到屏障点的线程Thread-1的await()的返回值为0,
示例二:继续改造示例一,演示 CyclicBarrier的可重用性。
public static void main(String[] args) { int N = 4; CyclicBarrier barrier = new CyclicBarrier(N,new Runnable() { @Override public void run() { System.out.println("由线程"+Thread.currentThread().getName()+"开始执行屏障点特殊任务"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }); for(int i=0;i<N;i++) new Writer(barrier).start(); } static class Writer extends Thread{ private CyclicBarrier cyclicBarrier; public Writer(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { int count = 2; try { while (count > 0) { count--; System.out.println("线程"+Thread.currentThread().getName()+"第"+(2-count)+"次正在写入数据..."); Thread.sleep(5000); //以睡眠来模拟写入数据操作 System.out.println("线程"+Thread.currentThread().getName()+"第"+(2-count)+"写入数据完毕,等待其他线程写入完毕"); cyclicBarrier.await(); } System.out.println("所有线程写入完毕,当前线程"+Thread.currentThread().getName()+"继续处理其他任务..."); } catch (InterruptedException e) { e.printStackTrace(); }catch(BrokenBarrierException e){ e.printStackTrace(); } } }
执行结果:
线程Thread-0第1次正在写入数据... 线程Thread-2第1次正在写入数据... 线程Thread-3第1次正在写入数据... 线程Thread-1第1次正在写入数据... 线程Thread-3第1写入数据完毕,等待其他线程写入完毕 线程Thread-1第1写入数据完毕,等待其他线程写入完毕 线程Thread-0第1写入数据完毕,等待其他线程写入完毕 线程Thread-2第1写入数据完毕,等待其他线程写入完毕 由线程Thread-2开始执行屏障点特殊任务 线程Thread-2第2次正在写入数据... 线程Thread-0第2次正在写入数据... 线程Thread-1第2次正在写入数据... 线程Thread-3第2次正在写入数据... 线程Thread-3第2写入数据完毕,等待其他线程写入完毕 线程Thread-2第2写入数据完毕,等待其他线程写入完毕 线程Thread-1第2写入数据完毕,等待其他线程写入完毕 线程Thread-0第2写入数据完毕,等待其他线程写入完毕 由线程Thread-0开始执行屏障点特殊任务 所有线程写入完毕,当前线程Thread-0继续处理其他任务... 所有线程写入完毕,当前线程Thread-3继续处理其他任务... 所有线程写入完毕,当前线程Thread-2继续处理其他任务... 所有线程写入完毕,当前线程Thread-1继续处理其他任务...
在示例二中,我让这四个线程循环执行两次模拟数据写入操作,所以他们会遇到两次CyclicBarrier设定的屏障,再他们第一次通过屏障之后,屏障将会被复原,所以第二次遇到屏障的时候,依然会产生和第一次相同的效果,当然这四个线程具体的执行顺序几乎不会和第一次相同了,执行屏障点的特殊任务的线程还是由最晚达到屏障点的线程去完成,第一次是线程Thread-2,第二次是线程Thread-0.
通过以上的两个示例我们只能基本的了解CyclicBarrier循环屏障的简单使用,至于其深层次的其他特性,比如:万一存在线程在未到达屏障之前出现了未被捕获的异常,或者线程在执行屏障点的特殊任务的时候抛出了异常,那么又会对其他线程的执行有什么影响呢?这些问题我们稍后再来回答,虽然我们可以通过继续举例来得到验证,但是我还是想先看看CyclicBarrier的源码之后再进行说明,毕竟举例不可能把所有的情况都包含,通过源码分析才能得到最直接最有力的结论。
源码分析
首先我们看看CyclicBarrier的成员属性:
public class CyclicBarrier { private static class Generation { boolean broken = false; } //锁对象 private final ReentrantLock lock = new ReentrantLock(); private final Condition trip = lock.newCondition(); /** 参与者个数,构造方法参数之一 */ private final int parties; /* 屏障点特殊任务,构造方法参数之一 */ private final Runnable barrierCommand; //用于表示屏障状态 private Generation generation = new Generation(); //表示还未达到屏障点的线程个数,为0表示所有线程都达到了屏障点 private int count; .... }通过以上对成员属性的分析,可以发现,CyclicBarrier确实是基于ReentrantLock同步组件以及Condition条件等待机制实现的,另外,CyclicBarrier还存在一个私有的静态内部类Generation,它只有一个布尔型的broken属性,用于表示当前屏障是否被破坏,在默认情况下或者所有参与者都通过屏障之后屏障都是没有被破坏的。Generation其实也是实现CyclicBarrier能够被重复使用的关键。
接着分析CyclicBarrier最核心的方法,通过上面的示例可以发现,CyclicBarrier最核心的关系方法就是await()方法,所以我们直接看这个方法就可以了,当然CyclicBarrier内部还提供了带有超时时间的await()方法。
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L);//第一个参数为false.超时时间为0,表示没有超时设定 } catch (TimeoutException toe) { throw new Error(toe); // 因为dowait()的时间参数为0, 所以这里永远不会抛出TimeoutException。 } } //带有超时时间的await()方法 public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); //因为dowait()的时间参数由使用者传入,所以有可能会抛出TimeoutException异常}从表面上可以看到await()是可以抛出中断异常和BrokenBarrierException异常的,甚至await(long, TimeUnit)方法还可能抛出TimeoutException超时异常,他们都是调用的dowait()方法,其返回值是一个由dowait()方法返回的整形数值,通过Java Doc的描述,await()/dowait()方法的返回值代表当前线程到达屏障点的序列号,假设有5个线程,最先到达屏障点的线程该方法返回值的就是4,往后依次递减,最后一个达到屏障点的线程调用该方法的返回值就是0.
相关推荐
8. **CyclicBarrier和CountDownLatch**:这两个是同步辅助类。CyclicBarrier允许多个线程等待直到所有线程到达屏障点后一起继续;CountDownLatch则是一次性的计数器,线程调用countDown()方法,当计数到零时所有等待...
- **CyclicBarrier** 是同步辅助类,允许多个线程等待彼此到达一个屏障点后一起继续执行(thread-t053-jdk1.5-cyclicbarrier)。 - **SingleTaskThreadpool** 涉及到单任务线程池,可能是一种特定场景下的定制化...
4. **CountDownLatch**:这是一个一次性使用的同步辅助类,用于让一组线程等待其他线程完成操作。在批量处理中,主线程可能使用CountDownLatch来等待所有子线程完成任务,然后继续执行后续操作。 5. **...
Java并发包还提供了一系列的同步辅助类,比如CountDownLatch、CyclicBarrier、Semaphore等。这些类可以用来控制线程的执行流程,例如CountDownLatch用于让一个或多个线程等待直到在其他线程中的一组操作完成。 ### ...
在并发程序设计中,我们需要控制多个线程的执行流程,CountDownLatch是一种同步辅助类,允许一个或多个线程等待其他线程完成操作。CyclicBarrier是一种用于多线程协作的同步工具,允许一组线程相互等待直到所有线程...