`

【转】深入浅出CyclicBarrier

阅读更多

如果说CountDownLatch是一次性的,那么CyclicBarrier正好可以循环使用。它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。所谓屏障点就是一组任务执行完毕的时刻。

 

清单1 一个使用CyclicBarrier的例子

package xylz.study.concurrency.lock;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {

    final CyclicBarrier barrier;

    final int MAX_TASK;

    public CyclicBarrierDemo(int cnt) {
        barrier = new CyclicBarrier(cnt + 1);
        MAX_TASK = cnt;
    }

    public void doWork(final Runnable work) {
        new Thread() {

            public void run() {
                work.run();
                try {
                    int index = barrier.await();
                    doWithIndex(index);
                } catch (InterruptedException e) {
                    return;
                } catch (BrokenBarrierException e) {
                    return;
                }
            }
        }.start();
    }

    private void doWithIndex(int index) {
        if (index == MAX_TASK / 3) {
            System.out.println("Left 30%.");
        } else if (index == MAX_TASK / 2) {
            System.out.println("Left 50%");
        } else if (index == 0) {
            System.out.println("run over");
        }
    }

    public void waitForNext() {
        try {
            doWithIndex(barrier.await());
        } catch (InterruptedException e) {
            return;
        } catch (BrokenBarrierException e) {
            return;
        }
    }

    public static void main(String[] args) {
        final int count = 10;
        CyclicBarrierDemo demo = new CyclicBarrierDemo(count);
        for (int i = 0; i < 100; i++) {
            demo.doWork(new Runnable() {

                public void run() {
                    //do something
                    try {
                        Thread.sleep(1000L);
                    } catch (Exception e) {
                        return;
                    }
                }
            });
            if ((i + 1) % count == 0) {
                demo.waitForNext();
            }
        }
    }

}

清单1描述的是一个周期性处理任务的例子,在这个例子中有一对的任务(100个),希望每10个为一组进行处理,当前仅当上一组任务处理完成后才能进行下一组,另外在每一组任务中,当任务剩下50%,30%以及所有任务执行完成时向观察者发出通知。

在这个例子中,CyclicBarrierDemo 构建了一个count+1的任务组(其中一个任务时为了外界方便挂起主线程)。每一个子任务里,人物本身执行完毕后都需要等待同组内其它任务执行完成后才能继续。同时在剩下任务50%、30%已经0时执行特殊的其他任务(发通知)。

很显然CyclicBarrier有以下几个特点:

  • await()方法将挂起线程,直到同组的其它线程执行完毕才能继续
  • await()方法返回线程执行完毕的索引,注意,索引时从任务数-1开始的,也就是第一个执行完成的任务索引为parties-1,最后一个为0,这个parties为总任务数,清单中是cnt+1
  • CyclicBarrier 是可循环的,显然名称说明了这点。在清单1中,每一组任务执行完毕就能够执行下一组任务。

另外除了CyclicBarrier除了以上特点外,还有以下几个特点:

  • 如果屏障操作不依赖于挂起的线程,那么任何线程都可以执行屏障操作。在清单1中可以看到并没有指定那个线程执行50%、30%、0%的操作,而是一组线程(cnt+1)个中任何一个线程只要到达了屏障点都可以执行相应的操作
  • CyclicBarrier 的构造函数允许携带一个任务,这个任务将在0%屏障点执行,它将在await()==0后执行。
  • CyclicBarrier 如果在await时因为中断、失败、超时等原因提前离开了屏障点,那么任务组中的其他任务将立即被中断,以InterruptedException异常离开线程。
  • 所有await()之前的操作都将在屏障点之前运行,也就是CyclicBarrier 的内存一致性效果

 

CyclicBarrier 的所有API如下:

  • public CyclicBarrier(int parties) 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。
  • public CyclicBarrier(int parties, Runnable barrierAction) 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。
  • public int await() throws InterruptedException, BrokenBarrierException 在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
  • public int await(long timeout,TimeUnit unit) throws InterruptedException, BrokenBarrierException,TimeoutException 在所有参与者都已经在此屏障上调用 await 方法之前将一直等待,或者超出了指定的等待时间。
  • public int getNumberWaiting() 返回当前在屏障处等待的参与者数目。此方法主要用于调试和断言。
  • public int getParties() 返回要求启动此 barrier 的参与者数目。
  • public boolean isBroken() 查询此屏障是否处于损坏状态。
  • public void reset() 将屏障重置为其初始状态。

针对以上API,下面来探讨下CyclicBarrier 的实现原理,以及为什么有这样的API。

清单2 CyclicBarrier.await*()的实现片段

    private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;
        if (g.broken)
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

       int index = --count;
       if (index == 0) {  // tripped
           boolean ranAction = false;
           try {
               final Runnable command = barrierCommand;
               if (command != null)
                   command.run();
               ranAction = true;
               nextGeneration();
               return 0;
           } finally {
               if (!ranAction)
                   breakBarrier();
           }
       }

        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

清单2有点复杂,这里一点一点的剖析,并且还原到最原始的状态。

利用前面学到的知识,我们知道要想让线程等待其他线程执行完毕,那么已经执行完毕的线程(进入await*()方法)就需要park(),直到超时或者被中断,或者被其它线程唤醒。

前面说过CyclicBarrier 的特点是要么大家都正常执行完毕,要么大家都异常被中断,不会其中有一个被中断而其它正常执行完毕的现象存在。这种特点叫all-or-none。类似的概念是原子操作中的要么大家都执行完,要么一个操作都不执行完。当前这其实是两个概念了。要完成这样的特点就必须有一个状态来描述曾经是否有过线程被中断(broken)了,这样后面执行完的线程就该知道是否需要继续等待了。而在CyclicBarrier 中Generation 就是为了完成这件事情的。Generation的定义非常简单,整个结构就只有一个变量boolean broken = false;,定义是否发生了broken操作。

由于有竞争资源的存在(broken/index),所以毫无疑问需要一把锁lock。拿到锁后整个过程是这样的:

  1. 检查是否存在中断位(broken),如果存在就立即以BrokenBarrierException异常返回。此异常描述的是线程进入屏障被破坏的等待状态。否则进行2。
  2. 检查当前线程是否被中断,如果是那么就设置中断位(使其它将要进入等待的线程知道),另外唤醒已经等待的线程,同时以InterruptedException异常返回,表示线程要处理中断。否则进行3。
  3. 将剩余任务数减1,如果此时剩下的任务数为0,也就是达到了公共屏障点,那么就执行屏障点任务(如果有的话),同时创建新的Generation(在这个过程中会唤醒其它所有线程,因此当前线程是屏障点线程,那么其它线程就都应该在等待状态)。否则进行4。
  4. 到这里说明还没有到达屏障点,那么此时线程就应该park()。很显然在下面的for循环中就是要park线程。这里park线程采用的是Condition.await()方法。也就是trip.await*()。为什么需要Condition?因为所有的await*()其实等待的都是一个条件,一旦条件满足就应该都被唤醒,所以Condition整好满足这个特点。所以到这里就会明白为什么在步骤3中到达屏障点时创建新的Generation的时候是一定要唤醒其它线程的原因了。

上面4个步骤其实只是描述主体结构,事实上整个过程中有非常多的逻辑来处理异常引发的问题,比如执行屏障点任务引发的异常,park线程超时引发的中断异常和超时异常等等。所以对于await()而言,异常的处理比业务逻辑的处理更复杂,这就解释了为什么await()的时候可能引发InterruptedException,BrokenBarrierException,TimeoutException 三种异常。

清单3 生成下一个循环周期并唤醒其它线程

private void nextGeneration() {
     trip.signalAll();
     count = parties;
     generation = new Generation();
}

清单3 描述了如何生成下一个循环周期的过程,在这个过程中当然需要使用Condition.signalAll()唤醒所有已经执行完成并且正在等待的线程。另外这里count描述的是还有多少线程需要执行,是为了线程执行完毕索引计数。

isBroken() 方法描述的就是generation.broken,也即线程组是否发生了异常。这里再一次解释下为什么要有这个状态的存在。

如果一个将要位于屏障点或者已经位于屏障点的而执行屏障点任务的线程发生了异常,那么即使唤醒了其它等待的线程,其它等待的线程也会因为循环等待而“死去”,因为再也没有一个线程来唤醒这些第二次进行park的线程了。还有一个意图是,如果屏障点都已经损坏了,那么其它将要等待屏障点的再线程挂起就没有意义了。

写到这里的时候非常不幸,用了4年多了台灯终于“寿终正寝了”。

其实CyclicBarrier 还有一个reset方法,描述的是手动立即将所有线程中断,恢复屏障点,进行下一组任务的执行。也就是与重新创建一个新的屏障点相比,可能维护的代价要小一些(减少同步,减少上一个CyclicBarrier 的管理等等)。

本来是想和Semaphore 一起将的,最后发现铺开后就有点长了,而且也不利于理解和吸收,所以放到下一篇吧。

 

参考资料:

  1. 使用 CyclicBarrier 做线程间同步

  2. CyclicBarrier And CountDownLatch Tutorial

  3. 线程—CyclicBarrier

  4. Java线程学习笔记(十)CountDownLatch 和CyclicBarrier

  5. 关于多线程同步的初步教程--Barrier的设计及使用

  6. Thread coordination with CountDownLatch and CyclicBarrier

  7. 如何充分利用多核CPU,计算很大的List中所有整数的和

 

转自:http://www.blogjava.net/xylz/archive/2010/07/12/325913.html

分享到:
评论

相关推荐

    深入浅出Java多线程.pdf

    ### 深入浅出Java多线程.pdf #### 知识点概览 本PDF文档涉及了Java多线程的全面介绍,分为基础篇、原理篇和JDK工具篇三个部分,旨在帮助读者深入了解Java多线程的概念、原理及实践应用。 #### 基础篇 **1. 进程...

    深入浅出Java_Concurrency

    ### 深入浅出Java_Concurrency #### J.U.C的整体认识 Java的并发编程模型在J.U.C(`java.util.concurrent`)包中得到了全面的展现,这不仅仅是Java语言本身的一大亮点,更是多线程编程领域的重要组成部分。本文...

    深入浅出Java语言程序设计

    《深入浅出Java语言程序设计》是一本旨在帮助初学者和有一定经验的开发者更深入理解Java编程的教材。这本书不仅覆盖了Java的基础知识,还深入探讨了许多高级特性,帮助读者构建坚实的Java编程基础。 首先,Java的...

    深入浅出_Java并发工具包原理讲解

    本文将深入浅出地探讨J.U.C的原理和实现机制,主要包括以下几个方面: 1. 原子操作:原子操作是并发编程中的基础,它能够保证线程安全的进行加减操作或其他简单的计算。Java中提供了Atomic类来支持原子操作,例如...

    【Java入门知识图谱】帮助Java初学者成长

    【对线面试官】深入浅出Java内存模型 Java虚拟机 【对线面试官】Java从编译到执行,发生了什么? 【对线面试官】双亲委派机制 【对线面试官】JVM内存结构 【对线面试官】垃圾回收机制 【对线面试官】CMS垃圾回收器 ...

    Thinking in Java第三版+第四版

    《Thinking in Java》是Bruce Eckel的经典之作,它深入浅出地介绍了Java编程语言的核心概念和技术。这本书分为多个部分,涵盖了从基础语法到高级特性的广泛内容,是学习和理解Java的重要参考资料。以下是对该书内容...

    从PAXOS到ZOOKEEPER分布式一致性原理与实践

    总之,《从PAXOS到ZOOKEEPER分布式一致性原理与实践》是一本深入浅出的分布式一致性学习资料,它将理论与实践相结合,帮助读者掌握分布式系统中一致性问题的解决之道,对于提升Java开发者的分布式系统设计能力...

    JAVA编程思想 第四版.zip

    《JAVA编程思想》第四版是 Bruce Eckel 编著的一本深入浅出的Java编程教程,被誉为Java领域的经典之作。这本书全面介绍了Java语言的核心概念、语法以及编程实践,旨在帮助读者掌握面向对象编程的精髓,并深入理解...

    图解java多线程设计模式-结城浩-完整高清带书签版本

    总之,《图解Java多线程设计模式》是一本深入浅出的多线程编程指南,它不仅涵盖了Java多线程的基础知识,还深入探讨了高级主题和最佳实践,对于想要提升并发编程能力的Java开发者来说,是一本不可多得的参考书。...

    Thingking in java第四版课后习题答案

    《Thinking in Java》是Bruce Eckel的经典之作,它深入浅出地介绍了Java编程语言的核心概念和技术。第四版更是涵盖了Java的最新发展,包括并发处理等内容。这个压缩包包含的资源是该书的完整课后习题解答,对于学习...

    第四版-Thinking+In+Java-练习题答案

    这本书以其深入浅出的讲解方式,全面系统地介绍了Java语言的核心概念和技术。在解答该书的练习题时,我们可以深入理解Java语言的精髓,提升编程技能。 首先,我们要知道Java是一种面向对象的编程语言,它以其强大的...

    javA编程思想第四版中文版课后答案

    这本书深入浅出地介绍了Java语言的核心概念和技术,包括面向对象编程、泛型、集合框架、IO流、多线程、网络编程等多个方面。课后答案则是对书中的习题进行了解答,帮助读者检验学习效果,深化理解。 1. 面向对象...

    Java并发编程从入门到精通(pdf)(附源码)

    这些基础知识是理解并发编程的基石,通过深入浅出的讲解,使读者能够轻松上手。 接着,书中将深入探讨Java并发工具类,如Executor框架、Semaphore信号量、CyclicBarrier和CountDownLatch等,这些工具在实际项目中...

    java 深度历险

    王森可能会以生动的例子和深入浅出的解释,使读者掌握这些基础知识。 其次,深入讨论了Java的面向对象特性,如封装、继承、多态,以及接口与抽象类的区别和使用场景。这部分内容对于理解和应用Java的OOP设计原则至...

    Java Thread Programming

    在"Java Thread Programming"这本书中,作者深入浅出地介绍了如何有效地利用Java进行多线程开发。 首先,线程是程序中的执行流,它允许一个应用程序同时执行多个任务。Java通过`java.lang.Thread`类提供了对线程的...

    Java并发编程常识-梁飞.rar

    阿里大牛梁飞编写的《Java并发编程常识》PPT,深入浅出地讲解了这个主题,对开发者来说是一份宝贵的资源。 首先,我们来探讨Java并发编程的基础概念。并发是指多个执行单元(线程或进程)在同一时间间隔内同时进行...

    java并发编程实践

    《Java并发编程实践》这本书是Java开发者在处理并发和多线程问题时的重要参考资料,它深入浅出地探讨了Java平台上的并发编程技术。获得Jolt大奖提名和成为2006年JavaOne大会的畅销书,证明了其在业界的权威地位。...

    Writing Robust Java Code

    《编写健壮的Java代码》是一本专注于提升Java编程质量的资源,旨在帮助开发者构建稳定、高效且易于维护的软件系统。在这个主题中,我们将会深入探讨一...这本书的内容深入浅出,适合不同层次的Java开发者阅读和参考。

    Java技术面试宝典

    这份宝典深入浅出地讲解了Java编程语言的关键知识点,涵盖了从基础语法到高级特性的全方位内容。 首先,宝典会从Java语言的基础入手,包括但不限于数据类型、变量、运算符、流程控制语句(如if、switch、for、while...

    北大计算机系JAVA培训讲义

    【北大计算机系JAVA培训讲义】是一份专为学习者设计的高质量教学资源,由北京大学计算机系专家编纂,旨在深入浅出地介绍JAVA编程语言的核心概念和技术。这份讲义包含10个PPT文件,涵盖了JAVA编程的各个方面,是初学...

Global site tag (gtag.js) - Google Analytics