来源:https://blog.csdn.net/hanchao5272/article/details/79779639
本章主要对CyclicBarrier进行学习。
1.CyclicBarrier简介
CyclicBarrier,是JDK1.5的java.util.concurrent并发包中提供的一个并发工具类。
所谓Cyclic即 循环 的意思,所谓Barrier即 屏障 的意思。
所以综合起来,CyclicBarrier指的就是 循环屏障,虽然这个叫法很奇怪,但是确能很好地表示它的作用。
其作用在JDK注释中是这样描述的:
A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.
CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other.
The barrier is called cyclic because it can be re-used after the waiting threads are released.
翻译过来,如下:
CyclicBarrier是一个同步辅助类,它允许一组线程相互等待直到所有线程都到达一个公共的屏障点。
在程序中有固定数量的线程,这些线程有时候必须等待彼此,这种情况下,使用CyclicBarrier很有帮助。
这个屏障之所以用循环修饰,是因为在所有的线程释放彼此之后,这个屏障是可以重新使用的。
CyclicBarrier的简单理解
其实,我更喜欢[人满发车]这个词来理解CyclicBarrier的作用:
长途汽车站提供长途客运服务。
当等待坐车的乘客到达20人时,汽车站就会发出一辆长途汽车,让这20个乘客上车走人。
等到下次等待的乘客又到达20人是,汽车站就会又发出一辆长途汽车。
CyclicBarrier的应用场景
CyclicBarrier常用于多线程分组计算。
2.CyclicBarrier方法说明
CyclicBarrier提供的方法有:
——CyclicBarrier(parties)
初始化相互等待的线程数量的构造方法。
——CyclicBarrier(parties,Runnable barrierAction)
初始化相互等待的线程数量以及屏障线程的构造方法。
屏障线程的运行时机:等待的线程数量=parties之后,CyclicBarrier打开屏障之前。
举例:在分组计算中,每个线程负责一部分计算,最终这些线程计算结束之后,交由屏障线程进行汇总计算。
——getParties()
获取CyclicBarrier打开屏障的线程数量,也成为方数。
——getNumberWaiting()
获取正在CyclicBarrier上等待的线程数量。
——await()
在CyclicBarrier上进行阻塞等待,直到发生以下情形之一:
在CyclicBarrier上等待的线程数量达到parties,则所有线程被释放,继续执行。
当前线程被中断,则抛出InterruptedException异常,并停止等待,继续执行。
其他等待的线程被中断,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。
其他等待的线程超时,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。
其他线程调用CyclicBarrier.reset()方法,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。
——await(timeout,TimeUnit)
在CyclicBarrier上进行限时的阻塞等待,直到发生以下情形之一:
在CyclicBarrier上等待的线程数量达到parties,则所有线程被释放,继续执行。
当前线程被中断,则抛出InterruptedException异常,并停止等待,继续执行。
当前线程等待超时,则抛出TimeoutException异常,并停止等待,继续执行。
其他等待的线程被中断,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。
其他等待的线程超时,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。
其他线程调用CyclicBarrier.reset()方法,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。
——isBroken()
获取是否破损标志位broken的值,此值有以下几种情况:
CyclicBarrier初始化时,broken=false,表示屏障未破损。
如果正在等待的线程被中断,则broken=true,表示屏障破损。
如果正在等待的线程超时,则broken=true,表示屏障破损。
如果有线程调用CyclicBarrier.reset()方法,则broken=false,表示屏障回到未破损状态。
——reset()
使得CyclicBarrier回归初始状态,直观来看它做了两件事:
如果有正在等待的线程,则会抛出BrokenBarrierException异常,且这些线程停止等待,继续执行。
将是否破损标志位broken置为false。
3.CyclicBarrier方法练习
3.1.练习一
练习目的:
了解CyclicBarrier(parties)/getParties()/await()/getNumberWaiting()的基本用法。
理解循环的意义。
示例代码:
//构造函数1:初始化-开启屏障的方数
CyclicBarrier barrier0 = new CyclicBarrier(2);
//通过barrier.getParties()获取开启屏障的方数
LOGGER.info("barrier.getParties()获取开启屏障的方数:" + barrier0.getParties());
System.out.println();
//通过barrier.getNumberWaiting()获取正在等待的线程数
LOGGER.info("通过barrier.getNumberWaiting()获取正在等待的线程数:初始----" + barrier0.getNumberWaiting());
System.out.println();
new Thread(() -> {
//添加一个等待线程
LOGGER.info("添加第1个等待线程----" + Thread.currentThread().getName());
try {
barrier0.await();
LOGGER.info(Thread.currentThread().getName() + " is running...");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
LOGGER.info(Thread.currentThread().getName() + " is terminated.");
}).start();
Thread.sleep(10);
//通过barrier.getNumberWaiting()获取正在等待的线程数
LOGGER.info("通过barrier.getNumberWaiting()获取正在等待的线程数:添加第1个等待线程---" + barrier0.getNumberWaiting());
Thread.sleep(10);
System.out.println();
new Thread(() -> {
//添加一个等待线程
LOGGER.info("添加第2个等待线程----" + Thread.currentThread().getName());
try {
barrier0.await();
LOGGER.info(Thread.currentThread().getName() + " is running...");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
LOGGER.info(Thread.currentThread().getName() + " is terminated.");
}).start();
Thread.sleep(100);
System.out.println();
//通过barrier.getNumberWaiting()获取正在等待的线程数
LOGGER.info("通过barrier.getNumberWaiting()获取正在等待的线程数:打开屏障之后---" + barrier0.getNumberWaiting());
//已经打开的屏障,再次有线程等待的话,还会重新生效--视为循环
new Thread(() -> {
LOGGER.info("屏障打开之后,再有线程加入等待:" + Thread.currentThread().getName());
try {
//BrokenBarrierException
barrier0.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
LOGGER.info(Thread.currentThread().getName() + " is terminated.");
}).start();
System.out.println();
Thread.sleep(10);
LOGGER.info("通过barrier.getNumberWaiting()获取正在等待的线程数:打开屏障之后---" + barrier0.getNumberWaiting());
Thread.sleep(10);
new Thread(() -> {
LOGGER.info("屏障打开之后,再有线程加入等待:" + Thread.currentThread().getName());
try {
//BrokenBarrierException
barrier0.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
LOGGER.info(Thread.currentThread().getName() + " is terminated.");
}).start();
Thread.sleep(10);
LOGGER.info("通过barrier.getNumberWaiting()获取正在等待的线程数:打开屏障之后---" + barrier0.getNumberWaiting());
运行结果:
2018-04-01 13:27:55 INFO - barrier.getParties()获取开启屏障的方数:2
2018-04-01 13:27:55 INFO - 通过barrier.getNumberWaiting()获取正在等待的线程数:初始----0
2018-04-01 13:27:55 INFO - 添加第1个等待线程----Thread-0
2018-04-01 13:27:55 INFO - 通过barrier.getNumberWaiting()获取正在等待的线程数:添加第1个等待线程---1
2018-04-01 13:27:55 INFO - 添加第2个等待线程----Thread-1
2018-04-01 13:27:55 INFO - Thread-1 is running...
2018-04-01 13:27:55 INFO - Thread-0 is running...
2018-04-01 13:27:55 INFO - Thread-1 is terminated.
2018-04-01 13:27:55 INFO - Thread-0 is terminated.
2018-04-01 13:27:55 INFO - 通过barrier.getNumberWaiting()获取正在等待的线程数:打开屏障之后---0
2018-04-01 13:27:55 INFO - 屏障打开之后,再有线程加入等待:Thread-2
2018-04-01 13:27:55 INFO - 通过barrier.getNumberWaiting()获取正在等待的线程数:打开屏障之后---1
2018-04-01 13:27:55 INFO - 屏障打开之后,再有线程加入等待:Thread-3
2018-04-01 13:27:55 INFO - Thread-3 is terminated.
2018-04-01 13:27:55 INFO - Thread-2 is terminated.
2018-04-01 13:27:55 INFO - 通过barrier.getNumberWaiting()获取正在等待的线程数:打开屏障之后---0
从运行结果,可以更好的理解循环的意义。
3.2.练习二
练习目的:
熟悉reset()的用法
理解回归初始状态的意义
实例代码:
CyclicBarrier barrier2 = new CyclicBarrier(2);
//如果是一个初始的CyclicBarrier,则reset()之后,什么也不会发生
LOGGER.info("如果是一个初始的CyclicBarrier,则reset()之后,什么也不会发生");
barrier2.reset();
System.out.println();
Thread.sleep(100);
//如果是一个已经打开一次的CyclicBarrier,则reset()之后,什么也不会发生
ExecutorService executorService2 = Executors.newCachedThreadPool();
//等待两次
for (int i = 0; i < 2; i++) {
executorService2.submit(() -> {
try {
barrier2.await();
LOGGER.info("222屏障已经打开.");
} catch (InterruptedException e) {
//e.printStackTrace();
LOGGER.info("222被中断");
} catch (BrokenBarrierException e) {
//e.printStackTrace();
LOGGER.info("222被重置");
}
});
}
barrier2.reset();
Thread.sleep(100);
System.out.println();
//如果是一个 有线程正在等待的线程,则reset()方法会使正在等待的线程抛出异常
executorService2.submit(() -> {
executorService2.submit(() -> {
try {
barrier2.await();
LOGGER.info("333屏障已经打开.");
} catch (InterruptedException e) {
//e.printStackTrace();
LOGGER.info("333被中断");
} catch (BrokenBarrierException e) {
LOGGER.info("在等待过程中,执行reset()方法,等待的线程抛出BrokenBarrierException异常,并不再等待");
//e.printStackTrace();
}
});
});
Thread.sleep(100);
barrier2.reset();
executorService2.shutdown();
break;
执行结果:
2018-04-01 16:53:12 INFO - 如果是一个初始的CyclicBarrier,则reset()之后,什么也不会发生
2018-04-01 16:53:13 INFO - 222屏障已经打开.
2018-04-01 16:53:13 INFO - 222屏障已经打开.
2018-04-01 16:53:13 INFO - 在等待过程中,执行reset()方法,等待的线程跑出BrokenBarrierException异常,并不再等待
3.3.练习三
练习目的:
练习await()/await(timeout,TimeUnit)/isBroken()的使用方法
理解破损标志位broken的状态转换
实例代码:
CyclicBarrier barrier1 = new CyclicBarrier(3);
ExecutorService executorService = Executors.newCachedThreadPool();
//添加一个用await()等待的线程
executorService.submit(() -> {
try {
//等待,除非:1.屏障打开;2.本线程被interrupt;3.其他等待线程被interrupted;4.其他等待线程timeout;5.其他线程调用reset()
barrier1.await();
} catch (InterruptedException e) {
LOGGER.info(Thread.currentThread().getName() + " is interrupted.");
//e.printStackTrace();
} catch (BrokenBarrierException e) {
LOGGER.info(Thread.currentThread().getName() + " is been broken.");
//e.printStackTrace();
}
});
Thread.sleep(10);
LOGGER.info("刚开始,屏障是否破损:" + barrier1.isBroken());
//添加一个等待线程-并超时
executorService.submit(() -> {
try {
//等待1s,除非:1.屏障打开(返回true);2.本线程被interrupt;3.本线程timeout;4.其他等待线程被interrupted;5.其他等待线程timeout;6.其他线程调用reset()
barrier1.await(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOGGER.info(Thread.currentThread().getName() + " is interrupted.");
//e.printStackTrace();
} catch (BrokenBarrierException e) {
LOGGER.info(Thread.currentThread().getName() + " is been reset().");
//e.printStackTrace();
} catch (TimeoutException e) {
LOGGER.info(Thread.currentThread().getName() + " is timeout.");
//e.printStackTrace();
}
});
Thread.sleep(100);
LOGGER.info("当前等待线程数量:" + barrier1.getNumberWaiting());
Thread.sleep(1000);
LOGGER.info("当前等待线程数量:" + barrier1.getNumberWaiting());
LOGGER.info("当等待的线程timeout时,当前屏障是否破损:" + barrier1.isBroken());
LOGGER.info("等待的线程中,如果有一个出现问题,则此线程会抛出相应的异常;其他线程都会抛出BrokenBarrierException异常。");
System.out.println();
Thread.sleep(5000);
//通过reset()重置屏障回初始状态,也包括是否破损
barrier1.reset();
LOGGER.info("reset()之后,当前屏障是否破损:" + barrier1.isBroken());
LOGGER.info("reset()之后,当前等待线程数量:" + barrier1.getNumberWaiting());
executorService.shutdown();
运行结果:
2018-04-01 17:01:16 INFO - 刚开始,屏障是否破损:false
2018-04-01 17:01:16 INFO - 当前等待线程数量:2
2018-04-01 17:01:17 INFO - pool-1-thread-1 is been broken.
2018-04-01 17:01:17 INFO - pool-1-thread-2 is timeout.
2018-04-01 17:01:17 INFO - 当前等待线程数量:0
2018-04-01 17:01:17 INFO - 当等待的线程timeout时,当前屏障是否破损:true
2018-04-01 17:01:17 INFO - 等待的线程中,如果有一个出现问题,则此线程会抛出相应的异常;其他线程都会抛出BrokenBarrierException异常。
2018-04-01 17:01:22 INFO - reset()之后,当前屏障是否破损:false
2018-04-01 17:01:22 INFO - reset()之后,当前等待线程数量:0
3.4.练习四
练习目的:
练习CyclicBarrier(int parties, Runnable barrierAction)的用法
理解屏障线程的意义
实例代码:
//构造器:设置屏障放开前做的事情
CyclicBarrier barrier3 = new CyclicBarrier(2, () -> {
LOGGER.info("屏障放开,[屏障线程]先运行!");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
LOGGER.info("[屏障线程]的事情做完了!");
});
for (int i = 0; i < 2; i++) {
new Thread(() -> {
LOGGER.info(Thread.currentThread().getName() + " 等待屏障放开");
try {
barrier3.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
LOGGER.info(Thread.currentThread().getName() + "开始干活...干活结束");
}).start();
}
运行结果:
2018-04-01 17:01:56 INFO - Thread-0 等待屏障放开
2018-04-01 17:01:56 INFO - Thread-1 等待屏障放开
2018-04-01 17:01:56 INFO - 屏障放开,[屏障线程]先运行!
2018-04-01 17:01:58 INFO - [屏障线程]的事情做完了!
2018-04-01 17:01:58 INFO - Thread-1开始干活...干活结束
2018-04-01 17:01:58 INFO - Thread-0开始干活...干活结束
4.应用场景
场景说明:
模拟多线程分组计算
有一个大小为50000的随机数组,用5个线程分别计算10000个元素的和
然后在将计算结果进行合并,得出最后的结果。
重点分析:
用5个线程分别计算:定义一个大小为5的线程池。
计算结果进行合并:定义一个屏障线程,将上面5个线程计算的子结果信息合并。
实例代码:
/**
* <p>CyclicBarrier-循环屏障-模拟多线程计算</p>
*
* @author hanchao 2018/3/29 22:48
**/
public static void main(String[] args) {
//数组大小
int size = 50000;
//定义数组
int[] numbers = new int[size];
//随机初始化数组
for (int i = 0; i < size; i++) {
numbers[i] = RandomUtils.nextInt(100, 1000);
}
//单线程计算结果
System.out.println();
Long sum = 0L;
for (int i = 0; i < size; i++) {
sum += numbers[i];
}
LOGGER.info("单线程计算结果:" + sum);
//多线程计算结果
//定义线程池
ExecutorService executorService = Executors.newFixedThreadPool(5);
//定义五个Future去保存子数组计算结果
final int[] results = new int[5];
//定义一个循环屏障,在屏障线程中进行计算结果合并
CyclicBarrier barrier = new CyclicBarrier(5, () -> {
int sums = 0;
for (int i = 0; i < 5; i++) {
sums += results[i];
}
LOGGER.info("多线程计算结果:" + sums);
});
//子数组长度
int length = 10000;
//定义五个线程去计算
for (int i = 0; i < 5; i++) {
//定义子数组
int[] subNumbers = Arrays.copyOfRange(numbers, (i * length), ((i + 1) * length));
//盛放计算结果
int finalI = i;
executorService.submit(() -> {
for (int j = 0; j < subNumbers.length; j++) {
results[finalI] += subNumbers[j];
}
//等待其他线程进行计算
try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
});
}
//关闭线程池
executorService.shutdown();
}
运行结果:
2018-04-01 17:05:47 INFO - 单线程计算结果:27487277
2018-04-01 17:05:47 INFO - 多线程计算结果:27487277
---------------------
作者:hanchao5272
来源:CSDN
原文:https://blog.csdn.net/hanchao5272/article/details/79779639
版权声明:本文为博主原创文章,转载请附上博文链接!
该类就是等到所有的线程都到达指定的地点之后,在进行下一步的执行过程。
package thread.cyclicbarrier; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class CyclicBarrierTest { public static void main(String[] args) { ExecutorService pool = Executors.newCachedThreadPool(); final CyclicBarrier cb = new CyclicBarrier(3);//申明的等待的线程有3个那么就立即往下执行 //遇到下一个等待点则继续等待只要满足有3个线程就继续往下执行 //无论在哪个等待点 for (int i = 0; i < 3; i++) { final int c = i; pool.execute(new Runnable() { @Override public void run() { try { Thread.sleep((long)(Math.random()*1000)); System.out.println("线程"+Thread.currentThread().getName()+"已经到达集合点1,当前已经有"+cb.getNumberWaiting()+"在等待!" +(cb.getNumberWaiting()+1==3?"人到齐了,一起走吧!":"")); cb.await();//如果有线程提前到达之后,则再次等待后续的线程到达。全部到达之后则继续往下执行 Thread.sleep((long)(Math.random()*1000)); System.out.println("线程"+Thread.currentThread().getName()+"已经到达集合点2,当前已经有"+cb.getNumberWaiting()+"在等待!" +(cb.getNumberWaiting()+1==3?"人到齐了,一起走吧!":"")); cb.await();//如果有线程提前到达之后,则再次等待后续的线程到达。全部到达之后则继续往下执行 Thread.sleep((long)(Math.random()*1000)); System.out.println("线程"+Thread.currentThread().getName()+"已经到达集合点3,当前已经有"+cb.getNumberWaiting()+"在等待!" +(cb.getNumberWaiting()+1==3?"人到齐了,一起走吧!":"")); cb.await();//如果有线程提前到达之后,则再次等待后续的线程到达。全部到达之后则继续往下执行 } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); }finally { } } }); } pool.shutdown(); } }
可以看到每隔线程到达障碍点之后就会等到其他线程,直到等待线程数量为3个,则继续往下去执行。代码当中有多个障碍点,无论哪种情况只要等待下线程数目达到3个 就会立即执行。
比如:第一个障碍点有1个线程等待,第二障碍点有2个线程等待,总的等待线程数目已经达到了3个。则:那么第一个障碍点线程立即执行,第二个障碍点线程也是立即执行。
注意:所以每次申明等待线程数目,最好是总线程数的约数。这样才不会出现死锁的现象(申明等待线程数为3,总线程数为4,这样容易出现死锁的现象,切记!)
相关推荐
java.CyclicBarrier(解决方案).md
java.CyclicBarrier(处理方案示例).md
Java多线程之CyclicBarrier的使用方法 Java多线程之CyclicBarrier的使用方法是Java多线程编程中的一种同步机制,用于实现多个线程之间的同步协作。CyclicBarrier是Java 5中引入的一种同步机制,用于让多个线程等待...
Java并发实例之CyclicBarrier的使用 CyclicBarrier是Java中的一种多线程并发控制实用工具,和CountDownLatch非常类似,它也可以实现线程间的计数等待,但是它的功能比CountDownLatch更加复杂且强大。CyclicBarrier...
Java中的`CyclicBarrier`是一个同步辅助类,它允许一组线程互相等待,直到所有线程都到达一个公共的屏障点。这个屏障点就像一个交通信号灯,只有当所有的车辆(线程)都到达了交叉口,信号灯才会变为绿灯,允许它们...
在Java多线程编程中,`CyclicBarrier`是一个非常重要的同步工具类,它允许一组线程等待其他线程到达某个屏障点后再一起继续执行。这个屏障点就是我们所说的“循环栅栏”,顾名思义,它就像一个旋转门,所有线程必须...
项目中碰到的,记录一下
`await()`方法是`CyclicBarrier`的核心方法之一。每个参与线程需要调用`await()`方法,表示自己已经到达了屏障点。一旦所有参与线程都调用了`await()`,那么所有线程都会被释放,可以继续执行后续任务。 下面是一个...
下面是一个使用CyclicBarrier的典型例子,展示了一个并行分解问题的解决过程: ```java class Solver { final int N; final float[][] data; final CyclicBarrier barrier; class Worker implements Runnable...
java并发编程中CountDownLatch和CyclicBarrier的使用借鉴 java并发编程中CountDownLatch和CyclicBarrier是两个非常重要的线程控制和调度工具,经常被用于解决多线程程序设计中的线程等待问题。本文将对...
Java并发系列之CyclicBarrier源码分析 CyclicBarrier是Java并发系列中的一种同步工具类,用于实现一组线程相互等待。当所有线程都到达某个屏障点后,再进行后续的操作。下面是对CyclicBarrier源码的详细分析。 ...
在Java并发编程中,CountDownLatch和CyclicBarrier是两种非常重要的同步工具,用于协调多个线程之间的交互。它们都属于java.util.concurrent包下的类,为多线程编程提供了强大的支持。 **CountDownLatch** 是一个...
本文将详细介绍CountDownLatch和CyclicBarrier的工作原理、使用场景以及如何在实际项目中应用它们。 CountDownLatch和CyclicBarrier是Java并发编程中两个非常有用的同步工具,它们在不同的场景下有着各自的优势。...
Java 并发编程专题(九)----(JUC)浅析 CyclicBarrier CyclicBarrier 是 Java 并发编程中的一个同步辅助工具,它允许一组线程全部等待彼此到达公共屏障点。它的字面意思是可循环使用的屏障,用于让一组线程到达一...
在Java并发编程中,CountDownLatch和CyclicBarrier是两种非常重要的同步工具类,它们用于协调多个线程间的协作。这两个工具都是在`java.util.concurrent`包下,是Java并发库的重要组成部分。 **CountDownLatch** ...
在Java并发编程中,CountDownLatch和CyclicBarrier都是用于协调多线程间同步的重要工具,它们可以帮助开发者在特定条件满足时启动或者结束线程的执行。本文将详细探讨这两个类的内部实现机制以及它们在实际应用场景...
Java中的`CyclicBarrier`是用于多线程同步的一个强大工具,主要应用于一组线程需要在继续执行之前等待其他线程到达某个特定点的情况。它允许开发者定义一个屏障点,只有当所有参与的线程都到达这个屏障点时,它们才...