`

Java并发编程: CyclicBarrier让多线程齐步走

 
阅读更多

以前在<<编写高质量代码-改善Java程序的151个建议>>一书中看到有一节的标题是“CyclicBarrier让多线程齐步走”,觉得这标题挺不错的,所以在写这篇博文的时候也采用了这个名字。 

本文首先会介绍CyclicBarrier辅助工具类,其次将用CyclicBarrier工具类来完成一个实例,最后将给出CyclicBarrier和CountDownLatch的几点比较。 

之前关于CountDownLatch的博文,请参考如下链接: 
Java并发编程: 使用CountDownLatch协调子线程 - 
http://mouselearnjava.iteye.com/blog/1915438 

1. CyclicBarrier工具类介绍。 
CyclicBarrier是一个同步辅助工具类,它允许一组线程相互等待,直到到达一个公共的栏栅点。CyclicBarriers对于那些包含一组固定大小线程,并且这些线程必须不时地相互等待的程序非常有用。之所以将其称之为循环的Barrier是因为该Barrier在等待的线程释放之后可以重用。 
CyclicBarrier 支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作 很有用。 

上面的介绍来自于CyclicBarrier类的注释。 

Java代码  收藏代码
  1. /** 
  2.  * A synchronization aid that allows a set of threads to all wait for 
  3.  * each other to reach a common barrier point.  CyclicBarriers are 
  4.  * useful in programs involving a fixed sized party of threads that 
  5.  * must occasionally wait for each other. The barrier is called 
  6.  * [i]cyclic[/i] because it can be re-used after the waiting threads 
  7.  * are released. 
  8.  * 
  9.  * <p>A <tt>CyclicBarrier</tt> supports an optional {@link Runnable} command 
  10.  * that is run once per barrier point, after the last thread in the party 
  11.  * arrives, but before any threads are released. 
  12.  * This [i]barrier action[/i] is useful 
  13.  * for updating shared-state before any of the parties continue. 
  14.  */  




CyclicBarrier采用ConditionLock来完成线程之间的同步。相关的类图是CyclicBarrier类内容如下: 


 

Java代码  收藏代码
  1. /* 
  2.  * @(#)CyclicBarrier.java   1.12 06/03/30 
  3.  * 
  4.  * Copyright 2006 Sun Microsystems, Inc. All rights reserved. 
  5.  * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms. 
  6.  */  
  7.   
  8. package java.util.concurrent;  
  9. import java.util.concurrent.locks.*;  
  10.   
  11. /** 
  12.  * A synchronization aid that allows a set of threads to all wait for 
  13.  * each other to reach a common barrier point.  CyclicBarriers are 
  14.  * useful in programs involving a fixed sized party of threads that 
  15.  * must occasionally wait for each other. The barrier is called 
  16.  * [i]cyclic[/i] because it can be re-used after the waiting threads 
  17.  * are released. 
  18.  * 
  19.  * <p>A <tt>CyclicBarrier</tt> supports an optional {@link Runnable} command 
  20.  * that is run once per barrier point, after the last thread in the party 
  21.  * arrives, but before any threads are released. 
  22.  * This [i]barrier action[/i] is useful 
  23.  * for updating shared-state before any of the parties continue. 
  24.  * 
  25.  * <p><b>Sample usage:</b> Here is an example of 
  26.  *  using a barrier in a parallel decomposition design: 
  27.  * <pre> 
  28.  * class Solver { 
  29.  *   final int N; 
  30.  *   final float[][] data; 
  31.  *   final CyclicBarrier barrier; 
  32.  * 
  33.  *   class Worker implements Runnable { 
  34.  *     int myRow; 
  35.  *     Worker(int row) { myRow = row; } 
  36.  *     public void run() { 
  37.  *       while (!done()) { 
  38.  *         processRow(myRow); 
  39.  * 
  40.  *         try { 
  41.  *           barrier.await(); 
  42.  *         } catch (InterruptedException ex) { 
  43.  *           return; 
  44.  *         } catch (BrokenBarrierException ex) { 
  45.  *           return; 
  46.  *         } 
  47.  *       } 
  48.  *     } 
  49.  *   } 
  50.  * 
  51.  *   public Solver(float[][] matrix) { 
  52.  *     data = matrix; 
  53.  *     N = matrix.length; 
  54.  *     barrier = new CyclicBarrier(N, 
  55.  *                                 new Runnable() { 
  56.  *                                   public void run() { 
  57.  *                                     mergeRows(...); 
  58.  *                                   } 
  59.  *                                 }); 
  60.  *     for (int i = 0; i < N; ++i) 
  61.  *       new Thread(new Worker(i)).start(); 
  62.  * 
  63.  *     waitUntilDone(); 
  64.  *   } 
  65.  * } 
  66.  * </pre> 
  67.  * Here, each worker thread processes a row of the matrix then waits at the 
  68.  * barrier until all rows have been processed. When all rows are processed 
  69.  * the supplied {@link Runnable} barrier action is executed and merges the 
  70.  * rows. If the merger 
  71.  * determines that a solution has been found then <tt>done()</tt> will return 
  72.  * <tt>true</tt> and each worker will terminate. 
  73.  * 
  74.  * <p>If the barrier action does not rely on the parties being suspended when 
  75.  * it is executed, then any of the threads in the party could execute that 
  76.  * action when it is released. To facilitate this, each invocation of 
  77.  * {@link #await} returns the arrival index of that thread at the barrier. 
  78.  * You can then choose which thread should execute the barrier action, for 
  79.  * example: 
  80.  * <pre>  if (barrier.await() == 0) { 
  81.  *     // log the completion of this iteration 
  82.  *   }</pre> 
  83.  * 
  84.  * <p>The <tt>CyclicBarrier</tt> uses an all-or-none breakage model 
  85.  * for failed synchronization attempts: If a thread leaves a barrier 
  86.  * point prematurely because of interruption, failure, or timeout, all 
  87.  * other threads waiting at that barrier point will also leave 
  88.  * abnormally via {@link BrokenBarrierException} (or 
  89.  * {@link InterruptedException} if they too were interrupted at about 
  90.  * the same time). 
  91.  * 
  92.  * <p>Memory consistency effects: Actions in a thread prior to calling 
  93.  * {@code await()} 
  94.  * [url=package-summary.html#MemoryVisibility]<i>happen-before</i>[/url] 
  95.  * actions that are part of the barrier action, which in turn 
  96.  * <i>happen-before</i> actions following a successful return from the 
  97.  * corresponding {@code await()} in other threads. 
  98.  * 
  99.  * @since 1.5 
  100.  * @see CountDownLatch 
  101.  * 
  102.  * @author Doug Lea 
  103.  */  
  104. public class CyclicBarrier {  
  105.     /** 
  106.      * Each use of the barrier is represented as a generation instance. 
  107.      * The generation changes whenever the barrier is tripped, or 
  108.      * is reset. There can be many generations associated with threads 
  109.      * using the barrier - due to the non-deterministic way the lock 
  110.      * may be allocated to waiting threads - but only one of these 
  111.      * can be active at a time (the one to which <tt>count</tt> applies) 
  112.      * and all the rest are either broken or tripped. 
  113.      * There need not be an active generation if there has been a break 
  114.      * but no subsequent reset. 
  115.      */  
  116.     private static class Generation {  
  117.         boolean broken = false;  
  118.     }  
  119.   
  120.     /** The lock for guarding barrier entry */  
  121.     private final ReentrantLock lock = new ReentrantLock();  
  122.     /** Condition to wait on until tripped */  
  123.     private final Condition trip = lock.newCondition();  
  124.     /** The number of parties */  
  125.     private final int parties;  
  126.     /* The command to run when tripped */  
  127.     private final Runnable barrierCommand;  
  128.     /** The current generation */  
  129.     private Generation generation = new Generation();  
  130.   
  131.     /** 
  132.      * Number of parties still waiting. Counts down from parties to 0 
  133.      * on each generation.  It is reset to parties on each new 
  134.      * generation or when broken. 
  135.      */  
  136.     private int count;  
  137.   
  138.     /** 
  139.      * Updates state on barrier trip and wakes up everyone. 
  140.      * Called only while holding lock. 
  141.      */  
  142.     private void nextGeneration() {  
  143.         // signal completion of last generation  
  144.         trip.signalAll();  
  145.         // set up next generation  
  146.         count = parties;  
  147.         generation = new Generation();  
  148.     }  
  149.   
  150.     /** 
  151.      * Sets current barrier generation as broken and wakes up everyone. 
  152.      * Called only while holding lock. 
  153.      */  
  154.     private void breakBarrier() {  
  155.         generation.broken = true;  
  156.     count = parties;  
  157.         trip.signalAll();  
  158.     }  
  159.   
  160.     /** 
  161.      * Main barrier code, covering the various policies. 
  162.      */  
  163.     private int dowait(boolean timed, long nanos)  
  164.         throws InterruptedException, BrokenBarrierException,  
  165.                TimeoutException {  
  166.         final ReentrantLock lock = this.lock;  
  167.         lock.lock();  
  168.         try {  
  169.             final Generation g = generation;  
  170.   
  171.             if (g.broken)  
  172.                 throw new BrokenBarrierException();  
  173.   
  174.             if (Thread.interrupted()) {  
  175.                 breakBarrier();  
  176.                 throw new InterruptedException();  
  177.             }  
  178.   
  179.            int index = --count;  
  180.            if (index == 0) {  // tripped  
  181.                boolean ranAction = false;  
  182.                try {  
  183.            final Runnable command = barrierCommand;  
  184.                    if (command != null)  
  185.                        command.run();  
  186.                    ranAction = true;  
  187.                    nextGeneration();  
  188.                    return 0;  
  189.                } finally {  
  190.                    if (!ranAction)  
  191.                        breakBarrier();  
  192.                }  
  193.            }  
  194.   
  195.             // loop until tripped, broken, interrupted, or timed out  
  196.             for (;;) {  
  197.                 try {  
  198.                     if (!timed)  
  199.                         trip.await();  
  200.                     else if (nanos > 0L)  
  201.                         nanos = trip.awaitNanos(nanos);  
  202.                 } catch (InterruptedException ie) {  
  203.                     if (g == generation && ! g.broken) {  
  204.                         breakBarrier();  
  205.             throw ie;  
  206.             } else {  
  207.             // We're about to finish waiting even if we had not  
  208.             // been interrupted, so this interrupt is deemed to  
  209.             // "belong" to subsequent execution.  
  210.             Thread.currentThread().interrupt();  
  211.             }  
  212.                 }  
  213.   
  214.                 if (g.broken)  
  215.                     throw new BrokenBarrierException();  
  216.   
  217.                 if (g != generation)  
  218.                     return index;  
  219.   
  220.                 if (timed && nanos <= 0L) {  
  221.                     breakBarrier();  
  222.                     throw new TimeoutException();  
  223.                 }  
  224.             }  
  225.         } finally {  
  226.             lock.unlock();  
  227.         }  
  228.     }  
  229.   
  230.     /** 
  231.      * Creates a new <tt>CyclicBarrier</tt> that will trip when the 
  232.      * given number of parties (threads) are waiting upon it, and which 
  233.      * will execute the given barrier action when the barrier is tripped, 
  234.      * performed by the last thread entering the barrier. 
  235.      * 
  236.      * @param parties the number of threads that must invoke {@link #await} 
  237.      *        before the barrier is tripped 
  238.      * @param barrierAction the command to execute when the barrier is 
  239.      *        tripped, or {@code null} if there is no action 
  240.      * @throws IllegalArgumentException if {@code parties} is less than 1 
  241.      */  
  242.     public CyclicBarrier(int parties, Runnable barrierAction) {  
  243.         if (parties <= 0throw new IllegalArgumentException();  
  244.         this.parties = parties;  
  245.         this.count = parties;  
  246.         this.barrierCommand = barrierAction;  
  247.     }  
  248.   
  249.     /** 
  250.      * Creates a new <tt>CyclicBarrier</tt> that will trip when the 
  251.      * given number of parties (threads) are waiting upon it, and 
  252.      * does not perform a predefined action when the barrier is tripped. 
  253.      * 
  254.      * @param parties the number of threads that must invoke {@link #await} 
  255.      *        before the barrier is tripped 
  256.      * @throws IllegalArgumentException if {@code parties} is less than 1 
  257.      */  
  258.     public CyclicBarrier(int parties) {  
  259.         this(parties, null);  
  260.     }  
  261.   
  262.     /** 
  263.      * Returns the number of parties required to trip this barrier. 
  264.      * 
  265.      * @return the number of parties required to trip this barrier 
  266.      */  
  267.     public int getParties() {  
  268.         return parties;  
  269.     }  
  270.   
  271.     /** 
  272.      * Waits until all {@linkplain #getParties parties} have invoked 
  273.      * <tt>await</tt> on this barrier. 
  274.      * 
  275.      * <p>If the current thread is not the last to arrive then it is 
  276.      * disabled for thread scheduling purposes and lies dormant until 
  277.      * one of the following things happens: 
  278.      * [list] 
  279.      * <li>The last thread arrives; or 
  280.      * <li>Some other thread {@linkplain Thread#interrupt interrupts} 
  281.      * the current thread; or 
  282.      * <li>Some other thread {@linkplain Thread#interrupt interrupts} 
  283.      * one of the other waiting threads; or 
  284.      * <li>Some other thread times out while waiting for barrier; or 
  285.      * <li>Some other thread invokes {@link #reset} on this barrier. 
  286.      * [/list] 
  287.      * 
  288.      * <p>If the current thread: 
  289.      * [list] 
  290.      * <li>has its interrupted status set on entry to this method; or 
  291.      * <li>is {@linkplain Thread#interrupt interrupted} while waiting 
  292.      * [/list] 
  293.      * then {@link InterruptedException} is thrown and the current thread's 
  294.      * interrupted status is cleared. 
  295.      * 
  296.      * <p>If the barrier is {@link #reset} while any thread is waiting, 
  297.      * or if the barrier {@linkplain #isBroken is broken} when 
  298.      * <tt>await</tt> is invoked, or while any thread is waiting, then 
  299.      * {@link BrokenBarrierException} is thrown. 
  300.      * 
  301.      * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting, 
  302.      * then all other waiting threads will throw 
  303.      * {@link BrokenBarrierException} and the barrier is placed in the broken 
  304.      * state. 
  305.      * 
  306.      * <p>If the current thread is the last thread to arrive, and a 
  307.      * non-null barrier action was supplied in the constructor, then the 
  308.      * current thread runs the action before allowing the other threads to 
  309.      * continue. 
  310.      * If an exception occurs during the barrier action then that exception 
  311.      * will be propagated in the current thread and the barrier is placed in 
  312.      * the broken state. 
  313.      * 
  314.      * @return the arrival index of the current thread, where index 
  315.      *         <tt>{@link #getParties()} - 1</tt> indicates the first 
  316.      *         to arrive and zero indicates the last to arrive 
  317.      * @throws InterruptedException if the current thread was interrupted 
  318.      *         while waiting 
  319.      * @throws BrokenBarrierException if [i]another[/i] thread was 
  320.      *         interrupted or timed out while the current thread was 
  321.      *         waiting, or the barrier was reset, or the barrier was 
  322.      *         broken when {@code await} was called, or the barrier 
  323.      *         action (if present) failed due an exception. 
  324.      */  
  325.     public int await() throws InterruptedException, BrokenBarrierException {  
  326.         try {  
  327.             return dowait(false, 0L);  
  328.         } catch (TimeoutException toe) {  
  329.             throw new Error(toe); // cannot happen;  
  330.         }  
  331.     }  
  332.   
  333.     /** 
  334.      * Waits until all {@linkplain #getParties parties} have invoked 
  335.      * <tt>await</tt> on this barrier, or the specified waiting time elapses. 
  336.      * 
  337.      * <p>If the current thread is not the last to arrive then it is 
  338.      * disabled for thread scheduling purposes and lies dormant until 
  339.      * one of the following things happens: 
  340.      * [list] 
  341.      * <li>The last thread arrives; or 
  342.      * <li>The specified timeout elapses; or 
  343.      * <li>Some other thread {@linkplain Thread#interrupt interrupts} 
  344.      * the current thread; or 
  345.      * <li>Some other thread {@linkplain Thread#interrupt interrupts} 
  346.      * one of the other waiting threads; or 
  347.      * <li>Some other thread times out while waiting for barrier; or 
  348.      * <li>Some other thread invokes {@link #reset} on this barrier. 
  349.      * [/list] 
  350.      * 
  351.      * <p>If the current thread: 
  352.      * [list] 
  353.      * <li>has its interrupted status set on entry to this method; or 
  354.      * <li>is {@linkplain Thread#interrupt interrupted} while waiting 
  355.      * [/list] 
  356.      * then {@link InterruptedException} is thrown and the current thread's 
  357.      * interrupted status is cleared. 
  358.      * 
  359.      * <p>If the specified waiting time elapses then {@link TimeoutException} 
  360.      * is thrown. If the time is less than or equal to zero, the 
  361.      * method will not wait at all. 
  362.      * 
  363.      * <p>If the barrier is {@link #reset} while any thread is waiting, 
  364.      * or if the barrier {@linkplain #isBroken is broken} when 
  365.      * <tt>await</tt> is invoked, or while any thread is waiting, then 
  366.      * {@link BrokenBarrierException} is thrown. 
  367.      * 
  368.      * <p>If any thread is {@linkplain Thread#interrupt interrupted} while 
  369.      * waiting, then all other waiting threads will throw {@link 
  370.      * BrokenBarrierException} and the barrier is placed in the broken 
  371.      * state. 
  372.      * 
  373.      * <p>If the current thread is the last thread to arrive, and a 
  374.      * non-null barrier action was supplied in the constructor, then the 
  375.      * current thread runs the action before allowing the other threads to 
  376.      * continue. 
  377.      * If an exception occurs during the barrier action then that exception 
  378.      * will be propagated in the current thread and the barrier is placed in 
  379.      * the broken state. 
  380.      * 
  381.      * @param timeout the time to wait for the barrier 
  382.      * @param unit the time unit of the timeout parameter 
  383.      * @return the arrival index of the current thread, where index 
  384.      *         <tt>{@link #getParties()} - 1</tt> indicates the first 
  385.      *         to arrive and zero indicates the last to arrive 
  386.      * @throws InterruptedException if the current thread was interrupted 
  387.      *         while waiting 
  388.      * @throws TimeoutException if the specified timeout elapses 
  389.      * @throws BrokenBarrierException if [i]another[/i] thread was 
  390.      *         interrupted or timed out while the current thread was 
  391.      *         waiting, or the barrier was reset, or the barrier was broken 
  392.      *         when {@code await} was called, or the barrier action (if 
  393.      *         present) failed due an exception 
  394.      */  
  395.     public int await(long timeout, TimeUnit unit)  
  396.         throws InterruptedException,  
  397.                BrokenBarrierException,  
  398.                TimeoutException {  
  399.         return dowait(true, unit.toNanos(timeout));  
  400.     }  
  401.   
  402.     /** 
  403.      * Queries if this barrier is in a broken state. 
  404.      * 
  405.      * @return {@code true} if one or more parties broke out of this 
  406.      *         barrier due to interruption or timeout since 
  407.      *         construction or the last reset, or a barrier action 
  408.      *         failed due to an exception; {@code false} otherwise. 
  409.      */  
  410.     public boolean isBroken() {  
  411.         final ReentrantLock lock = this.lock;  
  412.         lock.lock();  
  413.         try {  
  414.             return generation.broken;  
  415.         } finally {  
  416.             lock.unlock();  
  417.         }  
  418.     }  
  419.   
  420.     /** 
  421.      * Resets the barrier to its initial state.  If any parties are 
  422.      * currently waiting at the barrier, they will return with a 
  423.      * {@link BrokenBarrierException}. Note that resets [i]after[/i] 
  424.      * a breakage has occurred for other reasons can be complicated to 
  425.      * carry out; threads need to re-synchronize in some other way, 
  426.      * and choose one to perform the reset.  It may be preferable to 
  427.      * instead create a new barrier for subsequent use. 
  428.      */  
  429.     public void reset() {  
  430.         final ReentrantLock lock = this.lock;  
  431.         lock.lock();  
  432.         try {  
  433.             breakBarrier();   // break the current generation  
  434.             nextGeneration(); // start a new generation  
  435.         } finally {  
  436.             lock.unlock();  
  437.         }  
  438.     }  
  439.   
  440.     /** 
  441.      * Returns the number of parties currently waiting at the barrier. 
  442.      * This method is primarily useful for debugging and assertions. 
  443.      * 
  444.      * @return the number of parties currently blocked in {@link #await} 
  445.      */  
  446.     public int getNumberWaiting() {  
  447.         final ReentrantLock lock = this.lock;  
  448.         lock.lock();  
  449.         try {  
  450.             return parties - count;  
  451.         } finally {  
  452.             lock.unlock();  
  453.         }  
  454.     }  
  455. }  



2. CyclicBarrier工具类的使用案例 

CyclicBarrier可以让所有线程都处于等待状态(阻塞),然后在满足条件的情况下继续执行。打个比方: 几个小组包一辆车去旅游,一天行程包括上午小组自由活动和下午自由活动:各个小组早上自由活动,但是11点半大巴车上集合,然后吃饭并赶赴下一个景区。 
各个小组下午自由活动,但是要5点半大巴车上集合,然后一起回去。 

Java代码  收藏代码
  1. package my.concurrent.cyclicbarrier;  
  2.   
  3. import java.util.concurrent.BrokenBarrierException;  
  4. import java.util.concurrent.CyclicBarrier;  
  5.   
  6. public class TeamGroup implements Runnable {  
  7.   
  8.     private final CyclicBarrier barrier;  
  9.   
  10.     private int groupNumber;  
  11.   
  12.     /** 
  13.      * @param barrier 
  14.      * @param groupNumber 
  15.      */  
  16.     public TeamGroup(CyclicBarrier barrier, int groupNumber) {  
  17.         this.barrier = barrier;  
  18.         this.groupNumber = groupNumber;  
  19.     }  
  20.   
  21.     public void run() {  
  22.   
  23.         try {  
  24.             print();  
  25.             barrier.await();  
  26.         } catch (InterruptedException e) {  
  27.             // TODO Auto-generated catch block  
  28.             e.printStackTrace();  
  29.         } catch (BrokenBarrierException e) {  
  30.             // TODO Auto-generated catch block  
  31.             e.printStackTrace();  
  32.         }  
  33.     }  
  34.   
  35.     private void print() {  
  36.         System.out.println(String.format("第%d组完成该地景点浏览,并回到集合点", groupNumber));  
  37.     }  
  38.   
  39. }  



Java代码  收藏代码
  1. package my.concurrent.cyclicbarrier;  
  2.   
  3. import java.util.concurrent.CyclicBarrier;  
  4. import java.util.concurrent.ExecutorService;  
  5. import java.util.concurrent.Executors;  
  6.   
  7. public class CyclicBarrierTest {  
  8.   
  9.     private static final int THREAD_SLEEP_MILLIS = 6000;  
  10.   
  11.     /** 旅游小数的个数 */  
  12.     private static final int NUMBER_OF_GROUPS = 6;  
  13.   
  14.     /** 观光是否结束的标识 */  
  15.     private static boolean tourOver = false;  
  16.   
  17.     public static void main(String[] args) {  
  18.   
  19.         ExecutorService service = Executors  
  20.                 .newFixedThreadPool(NUMBER_OF_GROUPS);  
  21.   
  22.         CyclicBarrier cb = new CyclicBarrier(NUMBER_OF_GROUPS, new Runnable() {  
  23.   
  24.             public void run() {  
  25.                 /* 
  26.                  * 如果一天的游玩结束了,大家可以坐大巴回去了... ... 
  27.                  */  
  28.                 if (isTourOver()) {  
  29.                     System.out.println("各个小组都集合到大巴上,准备回家.. ...");  
  30.                 }  
  31.   
  32.             }  
  33.         });  
  34.   
  35.         System.out.println("用CyclicBarrier辅助工具类模拟旅游过程中小组集合::");  
  36.   
  37.         /* 
  38.          * 上午各个小组自由活动,然后在某个点,比如11点半集合到大巴上。 
  39.          */  
  40.         tourInTheMorning(service, cb);  
  41.         sleep(THREAD_SLEEP_MILLIS);  
  42.   
  43.         /* 
  44.          * 调用reset方法,将barrier设置到初始化状态。 
  45.          *  
  46.          * TODO://不知道这样的调用是否是合理的? 
  47.          */  
  48.         cb.reset();  
  49.   
  50.         /* 
  51.          * 下午各个小组自由活动,然后在某个点,比如11点半集合到大巴上。 
  52.          */  
  53.         tourInTheAfternoon(service, cb);  
  54.   
  55.         /* 
  56.          * 下午小组集合完毕后,一天的观光就结束了,将标志位记为true; 
  57.          */  
  58.         tourOver = true;  
  59.   
  60.         sleep(THREAD_SLEEP_MILLIS);  
  61.         service.shutdown();  
  62.   
  63.     }  
  64.   
  65.     /** 
  66.      * @return the tourOver 
  67.      */  
  68.     public static boolean isTourOver() {  
  69.         return tourOver;  
  70.     }  
  71.   
  72.     /** 
  73.      * @param tourOver 
  74.      *            the tourOver to set 
  75.      */  
  76.     public static void setTourOver(boolean tourOver) {  
  77.         CyclicBarrierTest.tourOver = tourOver;  
  78.     }  
  79.   
  80.     private static void tourInTheMorning(ExecutorService service,  
  81.             final CyclicBarrier cb) {  
  82.         System.out.println("早上自由玩... ... ");  
  83.         for (int groupNumber = 1; groupNumber <= NUMBER_OF_GROUPS; groupNumber++) {  
  84.             service.execute(new TeamGroup(cb, groupNumber));  
  85.         }  
  86.     }  
  87.   
  88.     private static void tourInTheAfternoon(ExecutorService service,  
  89.             final CyclicBarrier cb) {  
  90.         System.out.println("下午自由玩... ... ");  
  91.         for (int groupNumber = 1; groupNumber <= NUMBER_OF_GROUPS; groupNumber++) {  
  92.             service.execute(new TeamGroup(cb, groupNumber));  
  93.         }  
  94.     }  
  95.   
  96.     private static void sleep(long millis) {  
  97.         try {  
  98.             Thread.sleep(millis);  
  99.         } catch (InterruptedException e) {  
  100.             // TODO Auto-generated catch block  
  101.             e.printStackTrace();  
  102.         }  
  103.     }  
  104. }  



某一次运行的结果如下: 

用CyclicBarrier辅助工具类模拟旅游过程中小组集合:: 
早上自由玩... ... 
第1组完成该地景点浏览,并回到集合点 
第6组完成该地景点浏览,并回到集合点 
第2组完成该地景点浏览,并回到集合点 
第5组完成该地景点浏览,并回到集合点 
第4组完成该地景点浏览,并回到集合点 
第3组完成该地景点浏览,并回到集合点 
下午自由玩... ... 
第1组完成该地景点浏览,并回到集合点 
第3组完成该地景点浏览,并回到集合点 
第4组完成该地景点浏览,并回到集合点 
第5组完成该地景点浏览,并回到集合点 
第6组完成该地景点浏览,并回到集合点 
第2组完成该地景点浏览,并回到集合点 
各个小组都集合到大巴上,准备回家.. ... 

3. CyclicBarrier vs. CountDownLatch 

相同点:   
    两者都是用于线程同步的辅助工具类,都提供了await方法来达到线程等待。 

不同点: 
1. 从类的实现上看: 
    CountDownLatch通过一个继承AbstractQueuedSynchronizer的内部类Sync来完成同步。 
    CyclicBarrier通过Condition和Lock来完成同步。 
2. 从类的用途上看: 
    CountDownLatch: 一个或者是一部分线程,等待另外一部线程都完成操作。 
    CyclicBarrier: 所有线程互相等待完成。 
3. 从适合场合来看: 
CountDownLatch中计数是不能被重置的。如果需要一个可以重置计数的版本,需要考虑使用CyclicBarrie。 
CountDownLatch适用于一次同步。当使用CountDownLatch时,任何线程允许多次调用countDown(). 那些调用了await()方法的线程将被阻塞,直到那些没有被阻塞线程调用countDown()使计数到达0为止。 

 

相反,CyclicBarrier适用于多个同步点。例如:一组正在运算的线程,在进入下一个阶段计算之前需要同步。 

 

与CountDownLatch不同,一个处于某个阶段的线程调用了await()方法将会被阻塞,直到所有属于这个阶段的线程都调用了await()方法为止。 

在CyclicBarrier中,如果一个线程由于中断,失败或者超时等原因,过早地离开了栅栏点,那么所有在栅栏点等待的其它线程也会通过BrokenBarrierException或者IterupedException异常地离开。 

 

4. 从关注点上来看: 使用CountDownLatch时,它关注的一个线程或者多个线程需要在其它在一组线程完成操作之后,在去做一些事情。比如:服务的启动等。CyclicBarrier更加关注的是公共的栅栏点(Common Barrier point),关注的是这个点上的同步。这个点之前之后的事情并不需要太多的关注。比如:一个并行计算需要分几个阶段完成,在一个阶段完成进入到下一个阶段之前,需要同步,这时候CyclicBarrie很适合。 

由于知识的原因,上述例子以及CountDownLatch和CyclicBarrier的比较上会存在不足,如果有问题请大家指正,也希望大家能够提供两者其它方面的不同之处,一起学习分享。

分享到:
评论

相关推荐

    Java并发编程:设计原则与模式(第二版)-3

    《Java并发编程:设计原则与模式(第二版)》是一本深入探讨Java多线程编程技术的权威著作。这本书详细阐述了在Java平台中进行高效并发处理的关键概念、设计原则和实用模式。以下是对该书内容的一些核心知识点的概述...

    java并发编程:设计原则与模式.rar

    《Java并发编程:设计原则与模式》是一本深入探讨Java多线程编程的书籍,它涵盖了并发编程中的关键概念、原则和模式。在Java中,并发处理是优化应用程序性能、提高资源利用率的重要手段,尤其在现代多核处理器的环境...

    Java并发编程:设计原则与模式(第二版)-3PDF

    《Java并发编程:设计原则与模式(第二版)》是一本深入探讨Java平台上的多线程和并发编程的权威著作。这本书旨在帮助开发者理解和掌握如何有效地编写可扩展且高效的并发程序。以下是书中涵盖的一些关键知识点: 1....

    一本经典的多线程书籍 Java并发编程 设计原则与模式 第二版 (英文原版)

    这本书深入探讨了Java平台上的多线程编程技术,为开发者提供了丰富的设计原则和模式,帮助他们理解和解决并发环境中的各种挑战。通过阅读这本书,读者可以提升在并发编程领域的专业知识,掌握构建高效、稳定、可扩展...

    Java 并发编程:设计原则与模式

    总的来说,“Java并发编程:设计原则与模式”涵盖了从基础到高级的并发编程概念,帮助开发者理解和应用Java平台的并发特性,以构建可扩展、高性能的多线程应用程序。通过深入学习和实践这些知识点,开发者可以在...

    《Java并发编程:设计原则与模式(第二版)》

    《Java并发编程:设计原则与模式(第二版)》是一本深入探讨Java多线程编程技术的权威著作。这本书详细阐述了如何在Java环境中高效、安全地进行并发编程,涵盖了多线程设计的关键原则和常见模式。对于Java开发者来说...

    Java并发编程的设计原则与模式

    在Java编程领域,并发编程是一项核心技能,尤其是在多核处理器和分布式系统中,它能显著提升应用程序的性能和效率。本文将深入探讨Java并发编程的设计原则与模式,旨在帮助开发者理解并有效地应用这些原则和模式。 ...

    Java并发编程:设计原则与模式(第二版)_阅读密码www.zasp.net_仅提供试看如需要请购买原版书

    5. **并发工具类**:Java.util.concurrent包提供了许多并发工具类,如CountDownLatch、CyclicBarrier、Semaphore和Exchanger,它们在多线程协作中起到关键作用。 6. **并发模式**:书中可能会介绍一系列并发编程...

    Java 并发编程实战.pdf

    - **Java并发机制**:Java提供了丰富的并发机制来支持多线程程序的编写,包括但不限于`Thread`类、`Runnable`接口、`Callable`接口以及`Executor`框架等。 - **锁机制**:Java中的锁机制主要包括synchronized关键字...

    JAVA并发编程艺术pdf版

    通过深入学习《JAVA并发编程艺术》,开发者能更好地理解并发编程的原理,熟练运用Java提供的并发工具和API,解决实际开发中的多线程问题,提高软件的性能和稳定性。这是一本值得每一位Java开发者研读的书。

    java 并发编程的艺术pdf清晰完整版 源码

    这本书全面地介绍了Java平台上的并发和多线程编程技术,旨在帮助开发者解决在实际工作中遇到的并发问题,提高程序的性能和可伸缩性。 并发编程是现代计算机系统中不可或缺的一部分,尤其是在多核处理器成为主流的...

    java并发编程2

    - **`java.util.concurrent` 包** 提供了丰富的并发工具类,如`ExecutorService`用于管理线程池,`Semaphore`用于许可证管理,`CountDownLatch`用于同步多个线程,`CyclicBarrier`用于多线程间的协作等。...

    《java 并发编程实战高清PDF版》

    然而,多线程编程也带来了同步和竞态条件等问题,这需要开发者具备良好的线程管理和同步机制的知识。 书中详细讨论了Java中的线程管理,包括如何创建和启动线程,以及如何控制线程的生命周期。读者将学习到`Thread`...

    Java并发编程_设计原则和模式(CHM)

    Java并发编程是软件开发中的重要领域,特别是在多核处理器和分布式系统中,高效地利用并发可以极大地提升程序的性能和响应速度。本资源"Java并发编程_设计原则和模式(CHM)"聚焦于Java语言在并发环境下的编程技巧、...

    Java并发编程-3.pdf

    多线程协作机制是指在多线程编程中,多个线程之间如何协作、同步和通信,以达到共同完成某个任务的目的。Java 提供了多种多线程协作机制,包括CountDownLatch、CyclicBarrier、Semaphore 等。 1. CountDownLatch ...

    Java并发编程实践高清pdf及源码

    《Java并发编程实践》是一本深入探讨Java多线程编程的经典著作,由Brian Goetz、Tim Peierls、Joshua Bloch、Joseph Bowles和David Holmes等专家共同编写。这本书全面介绍了Java平台上的并发编程技术,是Java开发...

    java并发编程艺术

    总而言之,《Java并发编程艺术》这本书将系统性地介绍Java并发编程的各种技术和最佳实践,帮助读者提升在多线程环境下的编程能力,从而设计出更加健壮、高效的Java应用。无论你是初级开发者还是经验丰富的工程师,这...

    java并发编程:juc、aqs

    Java并发编程中的`JUC`(Java Util Concurrency)库是Java平台中用于处理多线程问题的核心工具包,它提供了一系列高效、线程安全的工具类,帮助开发者编写并发应用程序。`AQS`(AbstractQueuedSynchronizer)是JUC库中的...

    《java并发编程的核心方法和框架》

    - **ExecutorService**:Java并发框架中的核心组件,用于管理线程池,提高线程复用率。 - **Future和Callable**:`Future`接口代表异步计算的结果,`Callable`接口定义了计算任务,可以返回结果。 - **Semaphore*...

Global site tag (gtag.js) - Google Analytics