`

读CyclicBarrier源码

阅读更多
//一个循环的屏障。所有的线程在屏障处等待其他线程执行完毕。然后再各自执行。
//先看构造函数
 public CyclicBarrier(int parties) {
        this(parties, null);
    }


//barrierAction代表在屏障上等待的最后一个线程已经执行完后,执行的runnable
public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }


//等待其他线程执行到这
 public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen;
        }
    }


//如果count-1不为0则线程在trip条件上等待。否则唤醒在trip条件上等待的线程。
 private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();
            //如果线程已经被中断了
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

           int index = --count;
	   //如果已经全部执行完了
           if (index == 0) {  // tripped
               boolean ranAction = false;
               try {
	           //执行runnable
                   final Runnable command = barrierCommand;
                   if (command != null)
                       command.run();
                   ranAction = true;
                   nextGeneration();
                   return 0;
               } finally {
                   if (!ranAction)
                       breakBarrier();
               }
           }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
		        //在trip条件上等待
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

 //中断此barrier将所有在trip条件上等待的线程加入唤醒的队列
 private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }


 //重新生成下一代CyclicBarrier
 private void nextGeneration() {
        //唤醒在trip上等待的所有线程        
        trip.signalAll();
        // set up next generation
	//设置下一代操作(重用CyclicBarrier)
        count = parties;
        generation = new Generation();
    }

 //在此时间段等待
 public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

 //是否已经被损坏
 public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }

//返回要求启动此CyclicBarrier的参与者数量。
public int getParties() {
        return parties;
    }

//还有几个参与者在等待
public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }

//损坏旧屏障。创建新一代
 public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }
分享到:
评论

相关推荐

    java并发编程实战源码,java并发编程实战pdf,Java源码.zip

    以下将围绕这本书的源码、PDF及Java并发编程的相关知识点进行详细阐述。 1. **并发基础** - **线程**:Java中的线程是执行流程的基本单元,它共享同一进程的内存空间,但拥有独立的执行路径。 - **线程创建**:...

    《Java并发编程高阶技术-高性能并发框架源码解析与实战》学习.zip

    再者,CopyOnWriteArrayList是一种特殊的线程安全列表,它通过复制原列表来保证并发安全,适用于读多写少的场景,提供了高效的迭代性能。 同步工具类如Semaphore(信号量)、CyclicBarrier(回环栅栏)和...

    java多线程设计模式详解(PDF及源码)

    - **CopyOnWriteArrayList/CopyOnWriteArraySet**:适用于读多写少的情况,读操作无需加锁,写操作则复制整个集合。 7. **源码分析** - PDF文档和源码结合可以帮助读者深入理解多线程设计模式的实际应用,通过...

    java高并发编程源码.zip

    - **CopyOnWriteArrayList/CopyOnWriteArraySet**:写时复制策略,读操作无锁,写操作复制整个数组,适合读多写少的场景。 5. **线程池**: - **ExecutorService**:Java并发框架的一部分,提供线程池服务,可以...

    Java并发编程的艺术源码

    - `CyclicBarrier`和`CountDownLatch`: 同步辅助类,允许一组线程等待其他线程到达某个点后继续执行。 - `Semaphore`:信号量,控制同时访问特定资源的线程数量。 4. **并发集合** - `ConcurrentHashMap`: 并发...

    基于JDK源码解析Java领域中的并发锁,我们需要特别关注哪些内容?

    基于JDK源码解析Java并发锁,我们需要关注以下几个关键知识点: 1. **AQS(AbstractQueuedSynchronizer)基础同步器**: AQS是一个用于构建锁和同步器的框架,它维护了一个FIFO的等待队列,提供了两种模式:独占和...

    Java互联网架构多线程并发编程原理及实战 视频教程 下载4.zip

    4-10 深入剖析ReentrantReadWriteLock之读锁源码实现.mp4 4-11 深入剖析ReentrantReadWriteLock之写锁源码实现.mp4 4-12 锁降级详解.mp4 4-13 StampedLock原理及使用.mp4 5-1 wait、notify、notifyAll.mp4 5-2 ...

    Java互联网架构多线程并发编程原理及实战 视频教程 下载2.zip

    4-10 深入剖析ReentrantReadWriteLock之读锁源码实现.mp4 4-11 深入剖析ReentrantReadWriteLock之写锁源码实现.mp4 4-12 锁降级详解.mp4 4-13 StampedLock原理及使用.mp4 5-1 wait、notify、notifyAll.mp4 5-2 ...

    Java互联网架构多线程并发编程原理及实战 视频教程 下载3.zip

    4-10 深入剖析ReentrantReadWriteLock之读锁源码实现.mp4 4-11 深入剖析ReentrantReadWriteLock之写锁源码实现.mp4 4-12 锁降级详解.mp4 4-13 StampedLock原理及使用.mp4 5-1 wait、notify、notifyAll.mp4 5-2 ...

    Java互联网架构多线程并发编程原理及实战 视频教程 下载.zip

    4-10 深入剖析ReentrantReadWriteLock之读锁源码实现.mp4 4-11 深入剖析ReentrantReadWriteLock之写锁源码实现.mp4 4-12 锁降级详解.mp4 4-13 StampedLock原理及使用.mp4 5-1 wait、notify、notifyAll.mp4 5-2 ...

    Java互联网架构多线程并发编程原理及实战 视频教程 下载1.zip

    4-10 深入剖析ReentrantReadWriteLock之读锁源码实现.mp4 4-11 深入剖析ReentrantReadWriteLock之写锁源码实现.mp4 4-12 锁降级详解.mp4 4-13 StampedLock原理及使用.mp4 5-1 wait、notify、notifyAll.mp4 5-2 ...

    Java多线程源码笔记.pdf

    ReentrantReadWriteLock是读写锁,允许多个读线程同时访问,但写线程独占资源,提高了并发性能。 7. 线程池:Java提供了ExecutorService和ThreadPoolExecutor等工具来管理线程池,线程池可以有效控制运行的线程数量...

    java并发编程艺术源码-ArtConcurrentBook:JAVA并发编程的艺术

    - **读写锁**:`ReentrantReadWriteLock`允许多个读线程同时访问,但写线程独占资源,提高了并发性能。 3. **并发工具类** - **Future和Callable**:`Future`代表异步计算的结果,`Callable`定义了计算任务,它们...

    File 线程读写

    - 并发读操作通常是安全的,因为读操作不会改变文件内容。但并发写操作需要特别注意,一般需要使用互斥锁或者文件通道的原子操作来确保写操作的顺序。 3. **文件锁(File Locking)**: - 在Java中,`java.nio....

    Java分布式应用学习笔记06浅谈并发加锁机制分析

    这段代码来自`java.util.concurrent.CyclicBarrier`类,其中使用了`ReentrantLock`来进行资源的加锁。这种加锁方式确保了在代码块中对共享资源的操作不会受到其他线程的影响。下面我们将深入分析`ReentrantLock`的...

    java并发集合

    这种方式保证了读操作的高性能,但写操作相对较慢,适用于读多写少的场景。 3. **BlockingQueue**:阻塞队列是一种特殊类型的线程安全队列,它支持阻塞的插入(put)和移除(take)操作。常见的实现有...

    java多线程设计模式详解

    此外,还可以使用Semaphore信号量、CountDownLatch倒计时锁、CyclicBarrier同步屏障等工具类。 以上是部分Java多线程设计模式的概述,每个模式都有其适用场景和优缺点。实际开发中,开发者应根据需求选择合适的模式...

    多线程面试题及处理方案和详解

    - **CyclicBarrier**:让一组线程等待至某个时间点,当所有线程到达时才全部继续运行。 #### 七、JUC阻塞队列 - **ArrayBlockingQueue**:基于数组的有界阻塞队列。 - **LinkedBlockingQueue**:基于链表的阻塞...

    guava 20.0

    Guava提供了强大的并发工具,如`CountDownLatch`、`CyclicBarrier`和`Semaphore`,这些都是Java并发库的重要补充。此外,`ListenableFuture`接口允许对异步操作进行监听和协调,增强了异步编程的能力。 3. **缓存...

Global site tag (gtag.js) - Google Analytics