- 浏览: 71072 次
- 性别:
- 来自: 深圳
文章分类
最新评论
-
Alex_Cheung:
对了,第二个没有提取码,请知悉。
一大波视频分享 -
Alex_Cheung:
谢谢分享。
一大波视频分享 -
Jiy:
很详细,谢谢分享
java并发之同步辅助类Phaser -
walle1027:
非常不错,学习了。
java并发之同步辅助类Phaser -
huangjinjin520:
somefuture 写道除了单词写错了 其他挺好的已更正
dubbo注解使用详解
Phaser含义:
更加复杂和强大的同步辅助类。它允许并发执行多阶段任务。当我们有并发任务并且需要分解成几步执行时,(CyclicBarrier是分成两步),就可以选择使用Phaser。Phaser类机制是在每一步结束的位置对线程进行同步,当所有的线程都完成了这一步,才允许执行下一步。
跟其他同步工具一样,必须对Phaser类中参与同步操作的任务数进行初始化,不同的是,可以动态的增加或者减少任务数。
函数:
arriveAndAwaitAdvance():类似于CyclicBarrier的await()方法,等待其它线程都到来之后同步继续执行。
arriveAndDeregister():把执行到此的线程从Phaser中注销掉。
isTerminated():判断Phaser是否终止。
register():将一个新的参与者注册到Phaser中,这个新的参与者将被当成没有执行完本阶段的线程。
forceTermination():强制Phaser进入终止态。
... ...
例子
使用Phaser类同步三个并发任务。这三个任务将在三个不同的文件夹及其子文件夹中查找过去24小时内修改过扩展为为.log的文件。这个任务分成以下三个步骤:
1、在执行的文件夹及其子文件夹中获取扩展名为.log的文件
2、对每一步的结果进行过滤,删除修改时间超过24小时的文件
3、将结果打印到控制台
在第一步和第二步结束的时候,都会检查所查找到的结果列表是不是有元素存在。如果结果列表是空的,对应的线程将结束执行,并从Phaser中删除。(也就是动态减少任务数)
文件查找类:
import java.io.File;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
public class FileSearch implements Runnable {
private String initPath;
private String end;
private List<String> results;
private Phaser phaser;
public FileSearch(String initPath, String end, Phaser phaser) {
this.initPath = initPath;
this.end = end;
this.phaser=phaser;
results=new ArrayList<>();
}
@Override
public void run() {
phaser.arriveAndAwaitAdvance();//等待所有的线程创建完成,确保在进行文件查找的时候所有的线程都已经创建完成了
System.out.printf("%s: Starting.\n",Thread.currentThread().getName());
// 1st Phase: 查找文件
File file = new File(initPath);
if (file.isDirectory()) {
directoryProcess(file);
}
// 如果查找结果为false,那么就把该线程从Phaser中移除掉并且结束该线程的运行
if (!checkResults()){
return;
}
// 2nd Phase: 过滤结果,过滤出符合条件的(一天内的)结果集
filterResults();
// 如果过滤结果集结果是空的,那么把该线程从Phaser中移除,不让它进入下一阶段的执行
if (!checkResults()){
return;
}
// 3rd Phase: 显示结果
showInfo();
phaser.arriveAndDeregister();//任务完成,注销掉所有的线程
System.out.printf("%s: Work completed.\n",Thread.currentThread().getName());
}
private void showInfo() {
for (int i=0; i<results.size(); i++){
File file=new File(results.get(i));
System.out.printf("%s: %s\n",Thread.currentThread().getName(),file.getAbsolutePath());
}
// Waits for the end of all the FileSearch threads that are registered in the phaser
phaser.arriveAndAwaitAdvance();
}
private boolean checkResults() {
if (results.isEmpty()) {
System.out.printf("%s: Phase %d: 0 results.\n",Thread.currentThread().getName(),phaser.getPhase());
System.out.printf("%s: Phase %d: End.\n",Thread.currentThread().getName(),phaser.getPhase());
//结果为空,Phaser完成并把该线程从Phaser中移除掉
phaser.arriveAndDeregister();
return false;
} else {
// 等待所有线程查找完成
System.out.printf("%s: Phase %d: %d results.\n",Thread.currentThread().getName(),phaser.getPhase(),results.size());
phaser.arriveAndAwaitAdvance();
return true;
}
}
private void filterResults() {
List<String> newResults=new ArrayList<>();
long actualDate=new Date().getTime();
for (int i=0; i<results.size(); i++){
File file=new File(results.get(i));
long fileDate=file.lastModified();
if (actualDate-fileDate<TimeUnit.MILLISECONDS.convert(1,TimeUnit.DAYS)){
newResults.add(results.get(i));
}
}
results=newResults;
}
private void directoryProcess(File file) {
// Get the content of the directory
File list[] = file.listFiles();
if (list != null) {
for (int i = 0; i < list.length; i++) {
if (list[i].isDirectory()) {
// If is a directory, process it
directoryProcess(list[i]);
} else {
// If is a file, process it
fileProcess(list[i]);
}
}
}
}
private void fileProcess(File file) {
if (file.getName().endsWith(end)) {
results.add(file.getAbsolutePath());
}
}
}
测试主类:
import java.util.concurrent.Phaser;
public class PhaserMain {
public static void main(String[] args) {
Phaser phaser = new Phaser(3);
FileSearch system = new FileSearch("C:\\Windows", "log", phaser);
FileSearch apps = new FileSearch("C:\\Program Files", "log", phaser);
FileSearch documents = new FileSearch("C:\\Documents And Settings", "log", phaser);
Thread systemThread = new Thread(system, "System");
systemThread.start();
Thread appsThread = new Thread(apps, "Apps");
appsThread.start();
Thread documentsThread = new Thread(documents, "Documents");
documentsThread.start();
try {
systemThread.join();
appsThread.join();
documentsThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("Terminated: %s\n", phaser.isTerminated());
}
}
海量视频获取
更加复杂和强大的同步辅助类。它允许并发执行多阶段任务。当我们有并发任务并且需要分解成几步执行时,(CyclicBarrier是分成两步),就可以选择使用Phaser。Phaser类机制是在每一步结束的位置对线程进行同步,当所有的线程都完成了这一步,才允许执行下一步。
跟其他同步工具一样,必须对Phaser类中参与同步操作的任务数进行初始化,不同的是,可以动态的增加或者减少任务数。
函数:
arriveAndAwaitAdvance():类似于CyclicBarrier的await()方法,等待其它线程都到来之后同步继续执行。
arriveAndDeregister():把执行到此的线程从Phaser中注销掉。
isTerminated():判断Phaser是否终止。
register():将一个新的参与者注册到Phaser中,这个新的参与者将被当成没有执行完本阶段的线程。
forceTermination():强制Phaser进入终止态。
... ...
例子
使用Phaser类同步三个并发任务。这三个任务将在三个不同的文件夹及其子文件夹中查找过去24小时内修改过扩展为为.log的文件。这个任务分成以下三个步骤:
1、在执行的文件夹及其子文件夹中获取扩展名为.log的文件
2、对每一步的结果进行过滤,删除修改时间超过24小时的文件
3、将结果打印到控制台
在第一步和第二步结束的时候,都会检查所查找到的结果列表是不是有元素存在。如果结果列表是空的,对应的线程将结束执行,并从Phaser中删除。(也就是动态减少任务数)
文件查找类:
import java.io.File;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
public class FileSearch implements Runnable {
private String initPath;
private String end;
private List<String> results;
private Phaser phaser;
public FileSearch(String initPath, String end, Phaser phaser) {
this.initPath = initPath;
this.end = end;
this.phaser=phaser;
results=new ArrayList<>();
}
@Override
public void run() {
phaser.arriveAndAwaitAdvance();//等待所有的线程创建完成,确保在进行文件查找的时候所有的线程都已经创建完成了
System.out.printf("%s: Starting.\n",Thread.currentThread().getName());
// 1st Phase: 查找文件
File file = new File(initPath);
if (file.isDirectory()) {
directoryProcess(file);
}
// 如果查找结果为false,那么就把该线程从Phaser中移除掉并且结束该线程的运行
if (!checkResults()){
return;
}
// 2nd Phase: 过滤结果,过滤出符合条件的(一天内的)结果集
filterResults();
// 如果过滤结果集结果是空的,那么把该线程从Phaser中移除,不让它进入下一阶段的执行
if (!checkResults()){
return;
}
// 3rd Phase: 显示结果
showInfo();
phaser.arriveAndDeregister();//任务完成,注销掉所有的线程
System.out.printf("%s: Work completed.\n",Thread.currentThread().getName());
}
private void showInfo() {
for (int i=0; i<results.size(); i++){
File file=new File(results.get(i));
System.out.printf("%s: %s\n",Thread.currentThread().getName(),file.getAbsolutePath());
}
// Waits for the end of all the FileSearch threads that are registered in the phaser
phaser.arriveAndAwaitAdvance();
}
private boolean checkResults() {
if (results.isEmpty()) {
System.out.printf("%s: Phase %d: 0 results.\n",Thread.currentThread().getName(),phaser.getPhase());
System.out.printf("%s: Phase %d: End.\n",Thread.currentThread().getName(),phaser.getPhase());
//结果为空,Phaser完成并把该线程从Phaser中移除掉
phaser.arriveAndDeregister();
return false;
} else {
// 等待所有线程查找完成
System.out.printf("%s: Phase %d: %d results.\n",Thread.currentThread().getName(),phaser.getPhase(),results.size());
phaser.arriveAndAwaitAdvance();
return true;
}
}
private void filterResults() {
List<String> newResults=new ArrayList<>();
long actualDate=new Date().getTime();
for (int i=0; i<results.size(); i++){
File file=new File(results.get(i));
long fileDate=file.lastModified();
if (actualDate-fileDate<TimeUnit.MILLISECONDS.convert(1,TimeUnit.DAYS)){
newResults.add(results.get(i));
}
}
results=newResults;
}
private void directoryProcess(File file) {
// Get the content of the directory
File list[] = file.listFiles();
if (list != null) {
for (int i = 0; i < list.length; i++) {
if (list[i].isDirectory()) {
// If is a directory, process it
directoryProcess(list[i]);
} else {
// If is a file, process it
fileProcess(list[i]);
}
}
}
}
private void fileProcess(File file) {
if (file.getName().endsWith(end)) {
results.add(file.getAbsolutePath());
}
}
}
测试主类:
import java.util.concurrent.Phaser;
public class PhaserMain {
public static void main(String[] args) {
Phaser phaser = new Phaser(3);
FileSearch system = new FileSearch("C:\\Windows", "log", phaser);
FileSearch apps = new FileSearch("C:\\Program Files", "log", phaser);
FileSearch documents = new FileSearch("C:\\Documents And Settings", "log", phaser);
Thread systemThread = new Thread(system, "System");
systemThread.start();
Thread appsThread = new Thread(apps, "Apps");
appsThread.start();
Thread documentsThread = new Thread(documents, "Documents");
documentsThread.start();
try {
systemThread.join();
appsThread.join();
documentsThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("Terminated: %s\n", phaser.isTerminated());
}
}
海量视频获取
发表评论
-
一大波视频分享
2018-06-09 09:36 11361.ps 链接: https://pan.baidu ... -
利用Sharding-Jdbc实现分表
2018-05-24 22:32 3771你们团队使用SpringMVC+Spr ... -
MINA原理详解
2018-05-19 13:51 14841. 通过SocketConnector同服务器端建立连接 ... -
最近有人说我欺骗消费者,今天来一波视频分享
2018-05-12 21:00 1234最近有人说我欺骗消费者,今天来一波视频分享 dubbo入门 ... -
SVN多版本库环境的搭建
2018-05-02 21:00 1192一、 1、启动SVN sudo svn ... -
前端 Java Python等资源合集大放送
2018-04-21 22:11 694如果需要学习视频,欢 ... -
Nginx会话保持之nginx-sticky-module模块
2018-04-16 20:34 1963在使用负载均衡的时候会遇到会话保持的问题,常用的方法有: 1. ... -
dubbo源码学习(四):暴露服务的过程
2018-04-14 11:38 977dubbo采用的nio异步的通信,通信协议默认为 netty, ... -
dubbo源码学习(四)初始化过程细节:解析服务
2018-04-12 20:32 610今天将真正去看dubbo内部的实现过程,看dubbo的源码前我 ... -
dubbo源码学习(二) : spring 自定义标签
2018-04-09 20:29 630做dubbo的配置时很容易发现,dubbo有一套自己的标签,提 ... -
Dubbo多注册中心和Zookeeper服务的迁移
2018-04-06 08:58 1501一、Dubbo多注册中心 1、 应用场景 例如阿里有些服务 ... -
dubbo源码学习一:基础知识及使用的相关技术
2018-04-05 20:10 687Dubbo是Alibaba开源的分布式服务框架,它最大的特点是 ... -
worker模式
2018-03-29 20:16 633今天来学学,大家也好对线程池有一个更好的理解。 public ... -
线程各种状态转移分析
2018-03-28 22:13 896线程在它的生命周期 ... -
生产者-消费者模式实现
2018-03-26 22:45 1157生产者是指:生产数据的线程 消费者是指:使用数据的线程 生产者 ... -
对find xargs grep和管道的深入理解
2018-03-24 22:14 766问题: 相信大家都知道在目录中搜索含有固定字符串文件的命令: ... -
java并发之同步辅助类CyclicBarrier
2018-03-18 20:13 831CyclicBarrier含义: 栅栏允许两个或者多个线程在 ... -
java并发之同步辅助类semaphore
2018-03-14 21:24 778semaphore(seməˌfôr)含义: 信号量就是可以 ... -
MySQL常用命令
2018-03-13 22:09 7731、查看数据库状态 及 ... -
Tomcat 集群 文件上传下载的共享问题 NFS配置
2018-03-12 21:50 658Tomcat 集群时上传文件时如何使得多部tomcat中的文件 ...
相关推荐
Java并发工具包中包含了一些同步辅助类,如Semaphore(信号量)、CyclicBarrier(循环屏障)和CountDownLatch(计数器门锁)。它们帮助协调多线程间的协作,控制线程的并发访问数量或等待特定条件。 六、FutureTask...
本文将详细解析Java并发工具类,并通过示例代码介绍`CountDownLatch`、`CyclicBarrier`、`Phaser`、`Semaphore`和`ThreadLocal`的用法。 1. **CountDownLatch** `CountDownLatch`是一个计数器,通常用于等待多个...
`Phaser`是更为灵活的同步辅助类,可以实现类似功能并支持自定义回退行为。 另外,Java 7对`ExecutorService`接口进行了扩展,引入了`Executors`工具类,方便创建不同类型的线程池,如`FixedThreadPool`、`...
Java 并发工具类是 Java 并发编程的核心组件之一,提供了多种同步结构和并发容器,帮助开发者创建高效、可靠的并发程序。本文将详细介绍 Java 并发工具类的四大类:CountDownLatch、Semaphore、CyclicBarrier 和 ...
Java中的同步辅助类是用于控制并发执行和线程间协作的重要工具,它们提供了比`synchronized`关键字和`wait/notify`机制更为高级和灵活的同步机制。以下将详细介绍五种常用的同步辅助类: 1. **Semaphore(信号量)*...
它是一个一次性使用的同步辅助工具,可以通过`await()`方法阻塞当前线程,直到所有计数器减到0。 - **CyclicBarrier**:让一组线程在预定义的执行点进行等待,当所有线程到达这个点时,它们会被同时释放继续执行。 -...
8. ** CountDownLatch 计数器门**:它是一个一次性使用的同步辅助类,用于等待一组操作完成。计数器从正整数开始,每当一个线程完成其任务时,计数器减1,当计数器为零时,所有等待的线程都被释放。 9. ** ...
12. **Java并发工具类**:如`Future`和`CompletableFuture`用于异步编程,`CyclicBarrier`和`Phaser`用于协调多线程的执行。 13. **响应式编程**:如Reactor和Vavr库,利用函数式编程和流的概念,提供了一种处理高...
此外,Phaser、CyclicBarrier和CountDownLatch等同步辅助类也是线程间协调的重要工具。 线程池是Java并发编程中不可或缺的一部分。Executor框架提供了ThreadPoolExecutor,它允许我们预先创建一定数量的线程,管理...
Java并发工具包(java.util.concurrent)是Java编程中不可或缺的一部分,它为多线程环境提供了高效、安全且易用的工具。这个包包含了各种类和接口,帮助开发者编写高效的并发程序,避免了直接操作线程所带来的复杂性...
5. Phaser:Java并发库提供的一个同步辅助类,允许多个线程进行协作,完成一系列阶段性的任务。 6. CountDownLatch:计数器,初始化为一个整数值,线程调用countDown()方法时计数值减1,所有线程都在await()处等待...
Java并发编程中的JUC(Java Util Concurrent)库是开发者处理多线程问题的关键工具,尤其在多核处理器的环境中,其重要性不言而喻。JUC库包含了一系列的类和接口,帮助开发者构建高效、稳定的并发程序。下面将详细...
2. **CountDownLatch**:这个同步辅助类用于协调多个线程的启动或结束,它是一个计数器,当计数到零时,所有等待的线程才能继续执行。`java.util.concurrent.CountDownLatch`常用于实现并发任务间的依赖关系。 3. *...
3. **CyclicBarrier和Phaser**:这两个是同步辅助类,用于协调多个线程间的操作。CyclicBarrier允许一组线程等待彼此到达某个点后才继续执行,而Phaser则是在Java 6中引入的,提供更灵活的同步控制。 4. **...
8. ** Phaser**:Phaser是一个更灵活的同步辅助类,可以替代CyclicBarrier或Semaphore,它允许多个阶段的协调,并且可以动态调整参与者的数量。 在给定的文件中,`pom.xml`可能是Maven项目的配置文件,它包含了项目...
- **`java.util.concurrent`包**:包含高级并发工具,如`ThreadLocal`用于线程局部变量,`ReentrantLock`可重入锁,以及`CyclicBarrier`和`Phaser`等同步辅助类。 4. **网络编程**: - **`java.net`包**:提供了...
CountDownLatch是一个同步辅助类,允许一个或多个线程等待其他线程完成操作。它是一个一次性使用的计数器,当计数值为0时,所有等待的线程才能继续执行。CyclicBarrier则允许一组线程等待彼此到达屏障点后一起继续...
- **CyclicBarrier和CountDownLatch**:同步辅助类,用于协调多个线程之间的操作。 4. **线程间的通信**: - **wait()和notify()/notifyAll()**:在同步块中使用,使线程进入等待状态或唤醒等待的线程。 - **...