`
tianruirui
  • 浏览: 5560 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

源码阅读之CyclicBarrier

阅读更多

源码阅读是基于JDK7,本篇主要涉及CyclicBarrier常用方法源码分析。Java技术分享微信公众号JavaQ,欢迎围观吐槽,最新文章分享公众号同步更新!

1.概述
CyclicBarrier是一个同步辅助类,它允许一组线程互相等待,直到所有线程都到达某个公共屏障点(也可以叫同步点),即相互等待的线程都完成调用await方法,所有被屏障拦截的线程才会继续运行await方法后面的程序。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时CyclicBarrier很有用。因为该屏障点在释放等待线程后可以重用,所以称它为循环的屏障点。CyclicBarrier支持一个可选的Runnable命令,在一组线程中的最后一个线程到达屏障点之后(但在释放所有线程之前),该命令只在所有线程到达屏障点之后运行一次,并且该命令由最后一个进入屏障点的线程执行。

2.使用样例
下面的代码演示了CyclicBarrier简单使用的样例。

public class CyclicBarrierDemo {

    @Test
    public void test() {
        final CyclicBarrier barrier = new CyclicBarrier(2, myThread);
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName());
                    barrier.await();
                    System.out.println(Thread.currentThread().getName());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "thread1").start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName());
                    barrier.await();
                    System.out.println(Thread.currentThread().getName());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "thread2").start();
    }

    Thread myThread = new Thread(new Runnable() {
        @Override
        public void run() {
            System.out.println("myThread");
        }
    }, "thread3");
}

输出结果如下所示:

thread1
thread2
myThread
thread2
thread1

3.数据结构
CyclicBarrier中声明了如下一些属性及变量:

private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
private final int parties;
private final Runnable barrierCommand;
private Generation generation = new Generation();
private int count;

(1)lock用于保护屏障入口的锁;
(2)trip线程等待条件;
(3)parties参与等待的线程数;
(4)barrierCommand当所有线程到达屏障点之后,首先执行的命令;
(5)count实际中仍在等待的线程数,每当有一个线程到达屏障点,count值就会减一;当一次新的运算开始后,count的值被重置为parties。

4.构造方法
提供了两个构造函数可供使用。

    //创建一个CyclicBarrier实例,parties指定参与相互等待的线程数,
    //barrierAction指定当所有线程到达屏障点之后,首先执行的操作,该操作由最后一个进入屏障点的线程执行。
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    //创建一个CyclicBarrier实例,parties指定参与相互等待的线程数
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

5.getParties方法

    //返回参与相互等待的线程数
    public int getParties() {
        return parties;
    }

6.await方法

    //该方法被调用时表示当前线程已经到达屏障点,当前线程阻塞进入休眠状态
    //直到所有线程都到达屏障点,当前线程才会被唤醒
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen;
        }
    }

    //该方法被调用时表示当前线程已经到达屏障点,当前线程阻塞进入休眠状态
    //在timeout指定的超时时间内,等待其他参与线程到达屏障点
    //如果超出指定的等待时间,则抛出TimeoutException异常,如果该时间小于等于零,则此方法根本不会等待
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

           int index = --count;
           if (index == 0) {  // tripped
               boolean ranAction = false;
               try {
                   final Runnable command = barrierCommand;
                   if (command != null)
                       command.run();
                   ranAction = true;
                   //当所有参与的线程都到达屏障点,立即去唤醒所有处于休眠状态的线程,恢复执行
                   nextGeneration();
                   return 0;
               } finally {
                   if (!ranAction)
                       breakBarrier();
               }
           }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                        //让当前执行的线程阻塞,处于休眠状态
                        trip.await();
                    else if (nanos > 0L)
                        //让当前执行的线程阻塞,在超时时间内处于休眠状态
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    //唤醒所有处于休眠状态的线程,恢复执行
    //重置count值为parties
    //重置中断状态为false
    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

    //唤醒所有处于休眠状态的线程,恢复执行
    //重置count值为parties
    //重置中断状态为true
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

这个等待的await方法,其实是使用ReentrantLock和Condition控制实现的。

7.isBroken方法

    public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }

判断此屏障是否处于中断状态。如果因为构造或最后一次重置而导致中断或超时,从而使一个或多个参与者摆脱此屏障点,或者因为异常而导致某个屏障操作失败,则返回true;否则返回false。

8.reset方法

    //将屏障重置为其初始状态。
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //唤醒所有等待的线程继续执行,并设置屏障中断状态为true
            breakBarrier();   // break the current generation
            //唤醒所有等待的线程继续执行,并设置屏障中断状态为false
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

9.getNumberWaiting方法

    //返回当前在屏障处等待的参与者数目,此方法主要用于调试和断言。
    public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }

小结:
1.CyclicBarrier可以用于多线程计算数据,最后合并计算结果的应用场景。
2.这个等待的await方法,其实是使用ReentrantLock和Condition控制实现的。

0
1
分享到:
评论

相关推荐

    Java并发系列之CyclicBarrier源码分析

    Java并发系列之CyclicBarrier源码分析 CyclicBarrier是Java并发系列中的一种同步工具类,用于实现一组线程相互等待。当所有线程都到达某个屏障点后,再进行后续的操作。下面是对CyclicBarrier源码的详细分析。 ...

    CyclicBarrier的用法

    在Java多线程编程中,`CyclicBarrier`是一个非常重要的同步工具类,它允许一组线程等待其他线程到达某个屏障点后再一起继续执行。这个屏障点就是我们所说的“循环栅栏”,顾名思义,它就像一个旋转门,所有线程必须...

    jdk源码阅读.zip

    阅读JDK源码是提升JAVA技术的关键步骤,因为它揭示了Java平台的基础构造和设计理念。JDK1.8源码包含了众多重要的API,如IO框架、集合框架和并发框架等,这些都是Java开发者日常工作中不可或缺的部分。下面,我们将...

    Java多线程开发之CyclicBarrier

    近研究了一个别人的源码,其中用到多个线程并行操作一个文件,并且在所有线程全部结束后才进行主线程后面的处理。  其用到java.util.concurrent.CyclicBarrier 这个类。  CyclicBarrier是一个同步辅助类,它允许...

    Android 7.0源码(Nougat)

    `jsr166`是Java并发包的一部分,包含了Java并发特性的实现,如线程池、Future和CyclicBarrier等。 综上所述,Android 7.0源码提供了深入研究Android系统的机会,它涵盖了从低级硬件交互到高级应用开发的全部层面。...

    疯狂Java flashget源码

    Java提供了多种线程同步机制,如synchronized关键字、wait()、notify()和notifyAll()方法,以及Semaphore、CyclicBarrier等高级并发工具。 2. **并发下载策略**:在FlashGet的实现中,关键在于如何有效地分配和管理...

    Java编程方法学源码 Java源码 Java编程的艺术

    11. **并发工具类**:Java并发包(java.util.concurrent)提供了一系列高级并发工具,如Semaphore、CyclicBarrier等,源码可以帮助理解这些工具的实现和使用。 通过对这些知识点的学习和研究,不仅可以深化对Java...

    java并发编程源码

    Java提供了丰富的并发工具和API,如`Thread`类、`ExecutorService`、`Semaphore`、`CountDownLatch`、`CyclicBarrier`等,这些都将在源码中有所体现。 1. **线程与线程安全** Java并发编程的基础是`Thread`类,...

    java并发源码分析之实战编程

    "java并发源码分析之实战编程"这个主题深入探讨了Java平台上的并发处理机制,旨在帮助开发者理解并有效地利用这些机制来提高程序性能和可扩展性。在这个专题中,我们将围绕Java并发库、线程管理、锁机制、并发容器...

    JDK1.6源码

    Java的并发库在JDK 1.6中得到了显著增强,包括Executor框架、Future接口、CountDownLatch、CyclicBarrier等。源码解析有助于掌握这些并发工具类的实现原理,从而编写更高效的多线程程序。 4. **I/O流** JDK 1.6对...

    Java开发实战经典源码

    对于初学者来说,通过阅读和分析源码可以了解如何组织代码结构、定义类和方法、处理异常、实现接口等基本概念。 在"第八章"中,可能涉及了Java的一些高级主题,例如: 1. **异常处理**:Java中的异常处理是通过try...

    《JAVA.核心技术.卷II:高级特性》源码

    源码可能包括同步机制(如synchronized关键字、wait/notify、ReentrantLock等)和并发工具类(如Semaphore、CountDownLatch、CyclicBarrier)的应用示例。 2. **集合框架**:Java集合框架是处理对象集合的重要工具...

    葛一鸣 实战java高并发程序设计 源码

    4. **并发工具类**:`java.util.concurrent`包还包含了许多并发工具类,如`CountDownLatch`、`CyclicBarrier`、`Semaphore`等,用于协调线程间的同步和通信。这些工具在处理并发问题时非常有用。 5. **并发编程模式...

    多线程环境下快速登录YY案例源码

    在IT行业中,多线程是实现并发执行任务的关键技术,特别是在...阅读并分析这份源码,可以更深入地理解上述知识点在实际项目中的应用。学习并实践这个案例,对于提升你在多线程编程和网络应用开发方面的能力将大有裨益。

    java核心技术第十版源码

    - **ConcurrentHashMap, CopyOnWriteArrayList, CountDownLatch, CyclicBarrier, Semaphore** 等并发工具类的使用,优化多线程环境下的程序设计。 通过这些源码,读者不仅可以加深对Java语法的理解,还能掌握面向...

    java多线程并发实战和源码

    通过阅读这些示例,可以深入理解并发设计模式,如生产者-消费者模型、双端队列、线程池的实现原理等。 总结来说,Java多线程并发实战和源码的学习涵盖了线程创建与管理、同步机制、并发容器、内存模型以及并发工具...

    大漠多线程范例源码

    Java提供了`synchronized`关键字、`Lock`接口(如`ReentrantLock`)、`Semaphore`、`CyclicBarrier`等工具。源码中可能包含这些同步机制的示例。 5. **死锁**:当两个或更多线程相互等待对方释放资源而形成的一种...

    jdk:jdk源码阅读

    【标题】:深入理解JDK源码 在Java开发领域,深入阅读JDK源码是提升技术水平...总之,JDK源码阅读是一场深度技术探索之旅,它将帮助我们揭开Java语言的神秘面纱,提升我们的编程技艺,使我们成为更优秀的Java开发者。

    28个java常用的工具类源码

    18. **CountDownLatch和CyclicBarrier**:同步原语,用于多线程间的协调。 19. **Semaphore**:信号量,用于控制并发访问的资源数量。 20. **Future和ExecutorService**:异步编程和任务调度的重要组件。 21. **...

    《Java Concurrency in Practice》源码

    2. **并发工具类**:Java Concurrency API提供了丰富的并发工具类,如ExecutorService、Semaphore、CountDownLatch、CyclicBarrier等,这些工具可以帮助我们更好地管理和协调线程,实现更高效的并发程序。...

Global site tag (gtag.js) - Google Analytics