在实际应用中,有时候需要多个线程同时工作以完成同一件事情,而且在完成过程中,往往会等待其他线程都完成某一阶段后再执行,等所有线程都到达某一个阶段后再统一执行。
JDK:
一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。
CyclicBarrier 支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作 很有用。
对于失败的同步尝试,CyclicBarrier 使用了一种快速失败的、要么全部要么全不 (all-or-none) 的破坏模式:如果因为中断、失败或者超时等原因,导致线程过早地离开了屏障点,那么其他所有线程(甚至是那些尚未从以前的 await() 中恢复的线程)也将通过 BrokenBarrierException(如果它们几乎同时被中断,则用 InterruptedException)以反常的方式离开。
JDK地址:
//构造方法摘要 CyclicBarrier(int parties) //创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在每个 barrier 上执行预定义的操作。 CyclicBarrier(int parties, Runnable barrierAction) //创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。
Example1:
package tags; import java.util.Random; import java.util.concurrent.CyclicBarrier; /** * CyclicBarrier调用CyclicBarrier.await()进入等待的线程数, * 当线程数达到了CyclicBarrier初始时规定的数目时,所有进入等待状态的线程被唤醒并继续。 * CyclicBarrier就象它名字的意思一样,可看成是个障碍, 所有的线程必须到齐后才能一起通过这个障碍。 * CyclicBarrier初始时还可带一个Runnable的参数,此Runnable任务在CyclicBarrier的数目达到后,所有其它线程被唤醒前被执行。 */ public class CyclicBarrierTest { public static class ComponentThread implements Runnable { CyclicBarrier barrier;// 计数器 int ID; // 组件标识 int[] array; // 数据数组 // 构造方法 public ComponentThread(CyclicBarrier barrier, int[] array, int ID) { this.barrier = barrier; this.ID = ID; this.array = array; } public void run() { try { array[ID] = new Random().nextInt(100); System.out.println("Component " + ID + " generates: " + array[ID]); System.out.println("Component " + ID + " sleep"); // 在这里等待Barrier处 barrier.await(); System.out.println("Component " + ID + " awaked"); // 计算数据数组中的当前值和后续值 int result = array[ID] + array[ID + 1]; System.out.println("Component " + ID + " result: " + result); } catch (Exception ex) { } } } /** * 测试CyclicBarrier的用法 */ public static void testCyclicBarrier() { final int[] array = new int[3]; CyclicBarrier barrier = new CyclicBarrier(2, new Runnable() { // 在所有线程都到达Barrier时执行 public void run() { System.out.println("testCyclicBarrier run"); array[2] = array[0] + array[1]; } }); // 启动线程 new Thread(new ComponentThread(barrier, array, 0)).start(); new Thread(new ComponentThread(barrier, array, 1)).start(); } public static void main(String[] args) { CyclicBarrierTest.testCyclicBarrier(); } }
结果:
Component 1 generates: 40
Component 0 generates: 8
Component 1 sleep
Component 0 sleep
testCyclicBarrier run
Component 0 awaked
Component 0 result: 48
Component 1 awaked
Component 1 result: 88
两个线程分别执行,互不影响 ,执行到barrier.await();时该线程进入等待状态,
当两个线程都执行到barrier.await();时,达到CyclicBarrier启动所需的阻塞线程数,进入到new CyclicBarrier(2, new Runnable()...)里面的方法, 执行完里面的方法后,等待的两个线程再次被唤醒,继续各自执行线程后面的语句。
Example2:
比如有几个旅行团需要途经深圳、广州、韶关、长沙最后到达武汉。旅行团中有自驾游的,有徒步的,有乘坐旅游大巴的;这些旅行团同时出发,并且每到一个目的地,都要等待其他旅行团到达此地后再同时出发,直到都到达终点站武汉。
这时候CyclicBarrier就可以派上用场。CyclicBarrier最重要的属性就是参与者个数,另外最要方法是await()。当所有线程都调用了await()后,就表示这些线程都可以继续执行,否则就会等待。
package tags; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class TestCyclicBarrier { // 徒步需要的时间: Shenzhen, Guangzhou, Shaoguan private static int[] timeWalk = { 5, 8, 15}; // 自驾游 private static int[] timeSelf = { 1, 3, 4}; // 旅游大巴 private static int[] timeBus = { 2, 4, 6}; static String now() { SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); return sdf.format(new Date()) + ": "; } static class Tour implements Runnable { private int[] times; private CyclicBarrier barrier; private String tourName; public Tour(CyclicBarrier barrier, String tourName, int[] times) { this.times = times; this.tourName = tourName; this.barrier = barrier; } public void run() { try { Thread.sleep(times[0] * 1000); //使用 times岔开各组时间,使其显示不同的now() System.out.println(now() + tourName + " Reached Shenzhen"); barrier.await(); Thread.sleep(times[1] * 1000); System.out.println(now() + tourName + " Reached Guangzhou"); barrier.await(); Thread.sleep(times[2] * 1000); System.out.println(now() + tourName + " Reached Shaoguan"); } catch (InterruptedException e) { } catch (BrokenBarrierException e) { } } } public static void main(String[] args) { // 三个旅行团 CyclicBarrier barrier = new CyclicBarrier(3); ExecutorService exec = Executors.newFixedThreadPool(3); exec.submit(new Tour(barrier, "WalkTour", timeWalk)); exec.submit(new Tour(barrier, "SelfTour", timeSelf)); exec.submit(new Tour(barrier, "BusTour", timeBus)); exec.shutdown(); } }
结果:
15:13:11: SelfTour Reached Shenzhen
15:13:12: BusTour Reached Shenzhen
15:13:15: WalkTour Reached Shenzhen
15:13:18: SelfTour Reached Guangzhou
15:13:19: BusTour Reached Guangzhou
15:13:23: WalkTour Reached Guangzhou
15:13:27: SelfTour Reached Shaoguan
15:13:29: BusTour Reached Shaoguan
15:13:38: WalkTour Reached Shaoguan
例子来源:http://conkeyn.iteye.com/blog/546280(测试时稍有修改)
源码分析:
private final ReentrantLock lock = new ReentrantLock(); private final Condition trip = lock.newCondition(); /** 达到的数量 */ private final int parties; /* 数量满足后执行的线程 */ private final Runnable barrierCommand; /** The current generation */ private Generation generation = new Generation(); private static class Generation { boolean broken = false; } public CyclicBarrier(int parties) { this(parties, null); } public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen; } } private int dowait(boolean timed, long nanos) throws InterruptedException,BrokenBarrierException,TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; if (index == 0) { // index为0时,线程数量达到屏障数量,执行barrierCommand,并唤醒所有线程 boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) //若未达到屏障数量,每一个线程进入时都阻塞在这 //此时会释放锁,以便下一线程进入 trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch(InterruptedException ie) { if (g == generation && !g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
/** * Sets current barrier generation as broken and wakes up everyone. * Called only while holding lock. */ private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); } private void nextGeneration() { // 唤醒所有线程 trip.signalAll(); // set up next generation // 重置屏障数量,使其可循环使用 count = parties; generation = new Generation(); }
其实就是使用一个重入锁,一个Condition实现。
源码:
/* * CyclicBarrier可重复使用reset();可打断breakBarrier() */ public class CyclicBarrier { /** * Each use of the barrier is represented as a generation instance. * The generation changes whenever the barrier is tripped, or * is reset. There can be many generations associated with threads * using the barrier - due to the non-deterministic way the lock * may be allocated to waiting threads - but only one of these * can be active at a time (the one to which <tt>count</tt> applies) * and all the rest are either broken or tripped. * There need not be an active generation if there has been a break * but no subsequent reset. */ private static class Generation { boolean broken = false; } /** The lock for guarding barrier entry */ private final ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ private final Condition trip = lock.newCondition(); /** The number of parties */ private final int parties; /* The command to run when tripped */ private final Runnable barrierCommand; /** The current generation */ private Generation generation = new Generation(); /** * Number of parties still waiting. Counts down from parties to 0 * on each generation. It is reset to parties on each new * generation or when broken. */ private int count; /** * Updates state on barrier trip and wakes up everyone. * Called only while holding lock. */ private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); } /** * Sets current barrier generation as broken and wakes up everyone. * Called only while holding lock. */ private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); } /** * Main barrier code, covering the various policies. */ private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } /** * Creates a new <tt>CyclicBarrier</tt> that will trip when the * given number of parties (threads) are waiting upon it, and which * will execute the given barrier action when the barrier is tripped, * performed by the last thread entering the barrier. * * @param parties the number of threads that must invoke {@link #await} * before the barrier is tripped * @param barrierAction the command to execute when the barrier is * tripped, or {@code null} if there is no action * @throws IllegalArgumentException if {@code parties} is less than 1 */ public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } /** * Creates a new <tt>CyclicBarrier</tt> that will trip when the * given number of parties (threads) are waiting upon it, and * does not perform a predefined action when the barrier is tripped. * * @param parties the number of threads that must invoke {@link #await} * before the barrier is tripped * @throws IllegalArgumentException if {@code parties} is less than 1 */ public CyclicBarrier(int parties) { this(parties, null); } /** * Returns the number of parties required to trip this barrier. * * @return the number of parties required to trip this barrier */ public int getParties() { return parties; } /** * Waits until all {@linkplain #getParties parties} have invoked * <tt>await</tt> on this barrier. * * <p>If the current thread is not the last to arrive then it is * disabled for thread scheduling purposes and lies dormant until * one of the following things happens: * <ul> * <li>The last thread arrives; or * <li>Some other thread {@linkplain Thread#interrupt interrupts} * the current thread; or * <li>Some other thread {@linkplain Thread#interrupt interrupts} * one of the other waiting threads; or * <li>Some other thread times out while waiting for barrier; or * <li>Some other thread invokes {@link #reset} on this barrier. * </ul> * * <p>If the current thread: * <ul> * <li>has its interrupted status set on entry to this method; or * <li>is {@linkplain Thread#interrupt interrupted} while waiting * </ul> * then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. * * <p>If the barrier is {@link #reset} while any thread is waiting, * or if the barrier {@linkplain #isBroken is broken} when * <tt>await</tt> is invoked, or while any thread is waiting, then * {@link BrokenBarrierException} is thrown. * * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting, * then all other waiting threads will throw * {@link BrokenBarrierException} and the barrier is placed in the broken * state. * * <p>If the current thread is the last thread to arrive, and a * non-null barrier action was supplied in the constructor, then the * current thread runs the action before allowing the other threads to * continue. * If an exception occurs during the barrier action then that exception * will be propagated in the current thread and the barrier is placed in * the broken state. * * @return the arrival index of the current thread, where index * <tt>{@link #getParties()} - 1</tt> indicates the first * to arrive and zero indicates the last to arrive * @throws InterruptedException if the current thread was interrupted * while waiting * @throws BrokenBarrierException if <em>another</em> thread was * interrupted or timed out while the current thread was * waiting, or the barrier was reset, or the barrier was * broken when {@code await} was called, or the barrier * action (if present) failed due an exception. */ public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen; } } /** * Waits until all {@linkplain #getParties parties} have invoked * <tt>await</tt> on this barrier, or the specified waiting time elapses. * * <p>If the current thread is not the last to arrive then it is * disabled for thread scheduling purposes and lies dormant until * one of the following things happens: * <ul> * <li>The last thread arrives; or * <li>The specified timeout elapses; or * <li>Some other thread {@linkplain Thread#interrupt interrupts} * the current thread; or * <li>Some other thread {@linkplain Thread#interrupt interrupts} * one of the other waiting threads; or * <li>Some other thread times out while waiting for barrier; or * <li>Some other thread invokes {@link #reset} on this barrier. * </ul> * * <p>If the current thread: * <ul> * <li>has its interrupted status set on entry to this method; or * <li>is {@linkplain Thread#interrupt interrupted} while waiting * </ul> * then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. * * <p>If the specified waiting time elapses then {@link TimeoutException} * is thrown. If the time is less than or equal to zero, the * method will not wait at all. * * <p>If the barrier is {@link #reset} while any thread is waiting, * or if the barrier {@linkplain #isBroken is broken} when * <tt>await</tt> is invoked, or while any thread is waiting, then * {@link BrokenBarrierException} is thrown. * * <p>If any thread is {@linkplain Thread#interrupt interrupted} while * waiting, then all other waiting threads will throw {@link * BrokenBarrierException} and the barrier is placed in the broken * state. * * <p>If the current thread is the last thread to arrive, and a * non-null barrier action was supplied in the constructor, then the * current thread runs the action before allowing the other threads to * continue. * If an exception occurs during the barrier action then that exception * will be propagated in the current thread and the barrier is placed in * the broken state. * * @param timeout the time to wait for the barrier * @param unit the time unit of the timeout parameter * @return the arrival index of the current thread, where index * <tt>{@link #getParties()} - 1</tt> indicates the first * to arrive and zero indicates the last to arrive * @throws InterruptedException if the current thread was interrupted * while waiting * @throws TimeoutException if the specified timeout elapses * @throws BrokenBarrierException if <em>another</em> thread was * interrupted or timed out while the current thread was * waiting, or the barrier was reset, or the barrier was broken * when {@code await} was called, or the barrier action (if * present) failed due an exception */ public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); } /** * Queries if this barrier is in a broken state. * * @return {@code true} if one or more parties broke out of this * barrier due to interruption or timeout since * construction or the last reset, or a barrier action * failed due to an exception; {@code false} otherwise. */ public boolean isBroken() { final ReentrantLock lock = this.lock; lock.lock(); try { return generation.broken; } finally { lock.unlock(); } } /** * Resets the barrier to its initial state. If any parties are * currently waiting at the barrier, they will return with a * {@link BrokenBarrierException}. Note that resets <em>after</em> * a breakage has occurred for other reasons can be complicated to * carry out; threads need to re-synchronize in some other way, * and choose one to perform the reset. It may be preferable to * instead create a new barrier for subsequent use. */ public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } } /** * Returns the number of parties currently waiting at the barrier. * This method is primarily useful for debugging and assertions. * * @return the number of parties currently blocked in {@link #await} */ public int getNumberWaiting() { final ReentrantLock lock = this.lock; lock.lock(); try { return parties - count; } finally { lock.unlock(); } } }
相关推荐
CyclicBarrier是Java并发包(java.util.concurrent)中一个重要的同步辅助类,它的主要作用在于协调多个线程之间的协作,使得这些线程能够一起到达一个公共的“集结点”(称为屏障点)。在多线程编程中,尤其是在...
- **屏障点动作**:`CyclicBarrier`允许指定当所有线程到达屏障点时执行的动作,而`CountDownLatch`没有这个功能。 ## 最佳实践与注意事项 1. **避免死锁**:在使用`CyclicBarrier`时,确保所有线程都能够正确地...
Java中的`CyclicBarrier`是一个同步辅助类,它允许一组线程互相等待,直到所有线程都到达一个公共的屏障点。这个屏障点就像一个交通信号灯,只有当所有的车辆(线程)都到达了交叉口,信号灯才会变为绿灯,允许它们...
CyclicBarrier 是 Java 并发编程中的一个同步辅助工具,它允许一组线程全部等待彼此到达公共屏障点。它的字面意思是可循环使用的屏障,用于让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会...
CyclicBarrier是一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用,其相当...
在Java并发库中,`java.util.concurrent.CyclicBarrier`是一个同步辅助类,它允许一组线程等待彼此到达一个公共屏障点。这个屏障点在所有线程都到达之后才会被释放,从而允许所有线程继续执行。`CyclicBarrier`这个...
`CyclicBarrier`则是一个循环屏障,它允许一组线程互相等待,直到达到一个公共屏障点。与`CountDownLatch`不同,`CyclicBarrier`可以重用,即屏障点到达后,屏障可以重新设置,让线程再次进入等待状态。 在`...
CyclicBarrier是一个同步辅助类,允许一组线程等待其他线程到达一个公共屏障点。在所有线程到达屏障点后,屏障会释放,所有线程继续执行。CyclicBarrier在多线程排序场景中起到了关键作用,确保所有线程完成排序后再...
CyclicBarrier是一个同步辅助类,它允许一组线程等待彼此到达一个公共屏障点(barrier)后再继续执行。"Cyclic"一词意味着这个屏障可以重用,不同于一次性的CountDownLatch。在CyclicBarrier中,程序员可以设置一个...
1. **CyclicBarrier**:`CyclicBarrier`是一个同步辅助类,允许一组线程等待彼此到达一个公共屏障点。在所有线程都到达屏障后,屏障点被重置,然后线程可以继续执行。例如,`TestCyclicBarrier`和`TestCyclicBarrier...
CyclicBarrier 是一种同步工具,允许一组线程互相等待,直到到达某个公共屏障点(common barrier point),然后才能继续执行。它有两个构造函数:CyclicBarrier(int parties) 和 CyclicBarrier(int parties, Runnable ...
CyclicBarrier是一个同步辅助类,它允许一组线程等待彼此到达一个公共屏障点,这与游戏中的所有玩家必须同时进入副本的设定相吻合。 CyclicBarrier的主要功能包括: 1. **初始化屏障**: CyclicBarrier在创建时需要...
2. `CyclicBarrier`:循环屏障允许一组线程等待其他线程到达一个公共屏障点后,所有线程一起继续执行。它常用于多线程协作场景,例如分布式计算中的分治策略。 3. `CountDownLatch`:倒计时门锁通常用于一次性事件...
- **答案:** CyclicBarrier是一种同步辅助类,用于使一组线程相互等待,直到到达某个公共屏障点。一旦所有线程到达该屏障点,它们就可以一起继续执行。CyclicBarrier与CountDownLatch的不同之处在于: - ...
CyclicBarrier允许一组线程等待彼此到达一个公共屏障点后一起继续执行,而CountDownLatch则允许一个或多个线程等待其他线程完成特定数量的操作。 5. **Phaser**:Java 7新增的Phaser是一种更灵活的同步机制,它可以...
`CyclicBarrier`允许一组线程等待彼此到达一个公共屏障点。当所有线程都到达屏障点后,屏障会释放,允许所有线程继续执行。这个类支持重用,即当所有线程再次到达屏障点时,可以再次“打开”屏障。常用于实现多线程...
CyclicBarrier是一个同步辅助类,允许多个线程等待彼此达到一个公共屏障点,之后再一起继续执行。 ##### Semaphore Semaphore是一个计数信号量,用于控制对共享资源的并发访问量。它通常用于限制可以访问某些资源...
7. ** CyclicBarrier 循环栅栏**:它允许一组线程等待彼此到达一个公共屏障点。在所有线程到达屏障点后,它们会一起继续执行。 8. ** CountDownLatch 计数器门**:它是一个一次性使用的同步辅助类,用于等待一组...
- `CyclicBarrier`:循环屏障,让一组线程等待彼此到达一个公共屏障点。 - `CountDownLatch`:计数器门锁,一次性使用的计数器,常用于启动依赖于其他任务完成的并发操作。 6. **原子类**:这些类提供了在不使用...
`CyclicBarrier`允许一组线程等待其他线程到达一个公共屏障点后一起继续执行,而`CountDownLatch`则是一个计数器,当计数器减至零时,所有等待的线程都可以继续执行。 7. **Semaphore信号量**:用于限制同时访问...