`
woodding2008
  • 浏览: 290774 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

并发同步工具之Phaser

 
阅读更多

 

Phaser是一个可重用同步屏障,类似于CyclicBarrier、CountDownLatch,但是使用起来更灵活。

A reusable synchronization barrier, similar in functionality to
 * {@link java.util.concurrent.CyclicBarrier CyclicBarrier} and
 * {@link java.util.concurrent.CountDownLatch CountDownLatch}
 * but supporting more flexible usage.
<p><b>Registration.</b> Unlike the case for other barriers, the
 * number of parties <em>registered</em> to synchronize on a phaser
 * may vary over time.  Tasks may be registered at any time (using
 * methods {@link #register}, {@link #bulkRegister}, or forms of
 * constructors establishing initial numbers of parties), and
 * optionally deregistered upon any arrival (using {@link
 * #arriveAndDeregister}).  As is the case with most basic
 * synchronization constructs, registration and deregistration affect
 * only internal counts; they do not establish any further internal
 * bookkeeping, so tasks cannot query whether they are registered.
 * (However, you can introduce such bookkeeping by subclassing this
 * class.)

 

 

 

Phaser主要接口如下

  • arriveAndAwaitAdvance() 当前线程当前阶段执行完毕,等待其它线程完成当前阶段。如果当前线程是该阶段最后一个未到达的,则该方法直接返回下一个阶段的序号(阶段序号从0开始),同时其它线程的该方法也返回下一个阶段的序号。
  • arriveAndDeregister() 该方法立即返回下一阶段的序号,并且其它线程需要等待的个数减一,并且把当前线程从之后需要等待的成员中移除。如果该Phaser是另外一个Phaser 的子Phaser(层次化Phaser会在后文中讲到),并且该操作导致当前Phaser的成员数为0,则该操作也会将当前Phaser从其父 Phaser中移除。
  • arrive() 该方法不作任何等待,直接返回下一阶段的序号。
  • awaitAdvance(int phase) 该方法等待某一阶段执行完毕。如果当前阶段不等于指定的阶段或者该Phaser已经被终止,则立即返回。该阶段数一般由arrive()方法或者arriveAndDeregister()方法返回。返回下一阶段的序号,或者返回参数指定的值(如果该参数为负数),或者直接返回当前阶段序号(如果当前Phaser已经被终止)。
  • awaitAdvanceInterruptibly(int phase) 效果与awaitAdvance(int phase)相当,唯一的不同在于若该线程在该方法等待时被中断,则该方法抛出InterruptedException
  • awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) 效果与awaitAdvanceInterruptibly(int phase)相当,区别在于如果超时则抛出TimeoutException
  • bulkRegister(int parties) 注册多个party。如果当前phaser已经被终止,则该方法无效,并返回负数。如果调用该方法时,onAdvance方法正在执行,则该方法等待其执行完毕。如果该Phaser有父Phaser则指定的party数大于0,且之前该Phaser的party数为0,那么该Phaser会被注册到其父Phaser中。
  • forceTermination() 强制让该Phaser进入终止状态。已经注册的party数不受影响。如果该Phaser有子Phaser,则其所有的子Phaser均进入终止状态。如果该Phaser已经处于终止状态,该方法调用不造成任何影响。

玩具代码

import java.io.File;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

//Phaser类提供的同步线程机制是在每个步骤的末端, 所以全部的线程都完成第一步后,才能开始执行第二步。

public class FileSearch implements Runnable {

	private String initPath;
	private String end;
	private List<String> results;
	private Phaser phaser;
	
	public FileSearch(String initPath,String end,Phaser phaser){
		this.initPath = initPath;
		this.end = end;
		this.phaser = phaser;
		results = new ArrayList<>();
	}
	
	//根据目录查找文件
	private void directoryProcess(File file){
		File list[] = file.listFiles();
		if(list != null){
			for (int i = 0; i < list.length; i++) {
				if(list[i].isDirectory()){
					directoryProcess(list[i]);
				}else{
					fileProcess(list[i]);
				}
			}
		}
	}
	
	//记录文件
	private void fileProcess(File file){
		if(file.getName().endsWith(end)){
			results.add(file.getAbsolutePath());
		}
	}
	
	//过滤超过24小时的数据
	private void filterResults(){
		List<String> newResults = new ArrayList<>();
		long actualDate = new Date().getTime();
		for(int i=0;i<results.size();i++){
			File file = new File(results.get(i));
			long fileDate  = file.lastModified();
			if(actualDate-fileDate < TimeUnit.MICROSECONDS.convert(1, TimeUnit.DAYS)){
				newResults.add(results.get(i));
			}
		}
		results = newResults;
	}
	
	private boolean checkResults(){
		//检查文件列表
		if(results.isEmpty()){
			System.out.printf("%s: Phase %d: 0 results.\n", 
					Thread.currentThread().getName(),phaser.getPhase());
			System.out.printf("%s: Phase %d: 0 End.\n", 
					Thread.currentThread().getName(),phaser.getPhase());
			//通知phaser,当前工作已经完成,并且在不参与下一个阶段的工作。
			phaser.arriveAndDeregister();
			return false;
		}else{
			System.out.printf("%s: Phase %d: 0 results.\n", 
					Thread.currentThread().getName(),phaser.getPhase());

			//通知phaser,当前工作已经完成,并且在等待其他线程完成。
			phaser.arriveAndAwaitAdvance();
			return true;
		}
	}
	
	private void showInfo(){
		for (int i = 0; i < results.size(); i++) {
			File file = new File(results.get(i));
			System.out.printf("%s: %s\n", Thread.currentThread().getName(),
							file.getAbsolutePath());
		}
		phaser.arriveAndAwaitAdvance();
	}
	
	
	@Override
	public void run() {
		
		phaser.arriveAndAwaitAdvance();
		System.out.printf("%s:Starting.\n", Thread.currentThread().getName());
		File file = new File(initPath);
		if(file.isDirectory()){
			directoryProcess(file);
		}
		
		if(!checkResults()){
			return;
		}
		
		filterResults();
		if(!checkResults()){
			return;
		}
		
		showInfo();
		phaser.arriveAndDeregister();
		System.out.printf("%s:Work completed.\n", Thread.currentThread().getName		
	}

	public static void main(String[] args) {

		Phaser phaser = new Phaser(3);
		FileSearch system = new FileSearch("C:\\Windows","log",phaser);
		FileSearch apps = new FileSearch("C:\\Program Files","log",phaser);
		FileSearch documents = new FileSearch("C:\\Documents And Settings","log",phaser);
		
		Thread systemThread = new Thread(system,"system");
		systemThread.start();
		
		Thread appsThread = new Thread(apps,"apps");
		appsThread.start();
		
		Thread documentsThread = new Thread(documents,"documents");
		documentsThread.start();
				
		//等待三个线程结束
		try{
			systemThread.join();
			appsThread.join();
			documentsThread.join();
		}catch(InterruptedException e){
			e.printStackTrace();
		}
		
		System.out.printf("Terminated:%s", phaser.isTerminated());
	}

}

 

分享到:
评论

相关推荐

    Java并发工具包

    包括 TimeUnit、ThreadLocalRandom 和 Phaser 等,这些工具类提供了在并发编程中常用的实用功能,如时间单位转换、随机数生成和同步阶段控制。 总结来说,Java并发工具包提供了丰富的工具和机制,帮助开发者构建...

    CountDownLatch、Semaphore等4大并发工具类详解

    Phaser 是一个更加灵活的同步工具类,可以用来实现复杂的同步逻辑。Phaser 可以注册多个 partiecipants,每个 partiicipant 可以在 Phaser 中注册,Phaser 会等待所有 partiicipants 都执行完毕后,再执行下一步操作...

    第20章 Part4 并发工具同步器.pdf

    ### 第20章 Part4 并发工具同步器 #### 主要内容概述 本章节主要介绍了Java中的并发工具,特别是同步器的相关概念和技术。这部分内容是构建高效、可靠的并发应用程序的基础,尤其对于需要处理大量并发任务的应用...

    Java并发工具类示例

    `Phaser`是Java 7引入的一个更为灵活的同步工具,可以看作是`CyclicBarrier`的增强版。`Demo_Phaser.java`展示了如何初始化Phaser,注册线程并在每个阶段调用`arriveAndAwaitAdvance()`。Phaser可以处理动态的线程...

    JCToolsJDK中缺失的并发工具

    - `Phaser`:JDK 7引入的先进同步工具,可以替代`CountDownLatch`和`CyclicBarrier`,提供更复杂的协调功能。 5. **栅栏(Fences)和内存模型**: - `VolatileReference`和`AtomicReference`的增强版,如`...

    Java并发工具包对并发编程的优化.zip

    Java并发工具包是Java平台中一个非常重要的组成部分,它为开发者提供了高级的并发和多线程编程工具,极大地简化了并发编程的复杂性。在Java 5及更高版本中,Java并发工具包(java.util.concurrent)引入了一系列新的...

    Java并发编程实践高清pdf及源码

    5. **并发工具类**:如`CountDownLatch`、`CyclicBarrier`、`Semaphore`和`Phaser`等,它们为多线程间的协作提供了方便。 6. **活锁与死锁**:活锁是两个或更多线程相互等待对方释放资源,导致无限期等待;死锁则是...

    33 分阶段执行你的任务-学习使用Phaser运行多阶段任务.pdf

    在编写并发程序时,选择合适的同步工具至关重要。Phaser 提供了一种高效的方式来管理多阶段的任务执行,尤其适用于那些需要在不同阶段协调多个线程的场景。通过合理利用 Phaser 的 API,开发者可以构建出复杂而灵活...

    汪文君高并发编程实战视频资源下载.txt

     高并发编程第三阶段35讲 Phaser工具的实战案例使用第一部分_.mp4  高并发编程第三阶段36讲 Phaser工具的实战案例使用第二部分_.mp4  高并发编程第三阶段37讲 Phaser工具的实战案例使用第三部分_.mp4  高并发...

    JAVA并发编程经典书籍

    4. **并发工具类**(04章):介绍Java并发包(java.util.concurrent)中的各种工具类,如ExecutorService、Future、Semaphore、CountDownLatch、CyclicBarrier和ThreadPoolExecutor等,它们为并发编程提供了强大的...

    java 7并发编程实战手册 源码

    总的来说,《Java 7并发编程实战手册》的源码提供了丰富的实例,涵盖了Java 7在并发编程中的核心知识点,包括线程管理、同步工具、并发控制、异步计算和线程安全集合等,为开发者提供了深入了解和实践Java并发编程的...

    java并发编程经典书籍(英文版)

    - **并发工具类的高级用法**:Executor框架、Future、Callable、CyclicBarrier、Phaser等,帮助构建高效的并发程序。 - **并发设计原则**:如最小化锁的使用,正确处理中断,避免活跃性问题等。 - **并发测试**:...

    Java并发编程实践.rar

    详细讲解了Java中多种同步机制,包括wait()、notify()和notifyAll()方法,Lock接口以及相关的ReentrantLock、Condition等高级同步工具。 第七章:并发工具 介绍其他并发工具,如Semaphore(信号量)、CyclicBarrier...

    Java并发编程实践-电子书1-9章pdf

    2. **第二章:同步机制** - 这部分可能讨论了Java中的同步工具,如synchronized关键字、wait()和notify()方法,以及如何避免数据竞争和死锁问题。此外,还可能涉及线程安全的数据结构,如Vector和Collections....

    Java并发编程全景图.pdf

    Java并发工具类扩展了基本的并发功能,例如CountDownLatch、CyclicBarrier、Semaphore和Phaser提供了不同场景下的同步支持。Exchanger用于两个线程之间交换数据。 12. 硬件和操作系统支持 Java并发编程的成功在很大...

    java并发编程实战

    - `Phaser`:介绍了这个新的同步工具,可实现复杂的多阶段同步。 7. **并发性能调优** - 线程池配置:讨论了如何根据应用需求优化线程池的大小和配置。 - 死锁、活锁和饥饿:分析了这些并发问题的原因和解决策略...

    汪文君高并发编程实战视频资源全集

     高并发编程第三阶段35讲 Phaser工具的实战案例使用第一部分_.mp4  高并发编程第三阶段36讲 Phaser工具的实战案例使用第二部分_.mp4  高并发编程第三阶段37讲 Phaser工具的实战案例使用第三部分_.mp4  高并发...

    java并发编程实践中文版和英文版

    4. **并发工具类**:Java并发库提供了丰富的工具类,如 Executors 框架、Future 和 Callable 接口、Phaser 和 CyclicBarrier 等。这些工具可以帮助开发者构建更高效、更易于管理的并发程序。 5. **并发设计模式**:...

    java 高并发解决 思路

    4. **并发工具类** - **CountDownLatch**:一次性计数器,用于多线程同步。 - **CyclicBarrier/Phaser**:循环栅栏,允许一组线程等待其他线程到达某个点后继续执行。 - **Semaphore**:信号量,用于限制同时访问...

Global site tag (gtag.js) - Google Analytics