Java 7 concurrency API 提供了Phaser类来执行那些能够被划分为不同阶段执行的任务。如果你有一个流程能够被清晰地分为几个步骤,这些步骤中,第一个步骤必须完成后才能开始第二个步骤,以此类推,那么你可以使用Phaser这个类来实现,Phaser类主要包括以下特性:
- Phaser类必须知道它所要控制的任务数。Java称它为注册参与者。一个参与者能够在任何时候向phaser注册。
- 当任务完成一个阶段时,必须通知phaser。Phaser 会让这个任务进入睡眠直到所有参与者都完成了那个阶段。
- 在 phaser 内部使用一个整型字段来保存已经发生的阶段数
- 一个参与者能在任何时间离开phaser的控制。Java称它为注销参与者。
- 你能够控制一个phaser的终止。如果一个phaser被终止了,新的参与者将不被接受,任务之间也不会同步。
- 你可以调用一些方法来获取phaser的状态以及参与者的数目。
参与者的注册和注销
根据以上所说,一个phaser必须知道它所控制的任务数。它必须知道有多少个不同线程来执行分阶段的算法以同时控制phase的改变。
Java称这一过程为参与者注册。通常情况下,参与者在运行初期注册自己,但是参与者也能在任何时候注册自己。
你可以使用以下不同方法来注册参与者:
- 当你创建 Phaser 对象时,Phaser 类提供了四种不同的构造函数。其中这两个最常用:Phaser() 创建一个没有参与者的 phaser 以及 Phaser(int parties) 创建有指定数量参与者的 phaser
- 直接调用 Phaser 的这两个方法:bulkRegister(int parties) 一次注册指定数量的新的参与者;register() 注册一个新的参与者
当一个 phaser 控制的任务完成运行时,它必须从 phaser 注销。如果你没有注销,这个任务将在下一个阶段一直处于等待状态。你可以使用 arriveAndDeregister() 方法注销一个参与者。使用这个方法来指示 phaser 该任务已经完成本阶段的运行,而且不会参与下一阶段。
同步阶段的更改
Phaser 的主要任务是允许用并行方式实现能够被清楚划分为不同运行阶段的算法。没有一个任务能够在所有任务完成本阶段前进入下一个阶段。Phaser 类提供了三个方法来指示任务完成了一个阶段:arrive(),arriveAndDeregister() 以及 arriveAndAwaitAdvance()。如果有一个任务没有调用任何一个这些方法,那么剩下的任务将被 phaser 无限期阻塞。使用以下的方法使任务进入下一个阶段:
- arriveAndAwaitAdvance():此方法通知 phaser 它已完成本阶段并要参与到下一个阶段。Phaser 将阻塞这个任务直到所有参与的任务都调用了以上三个之一的同步方法
- awaitAdvance (int phase):在指定的哪个阶段等待进入下一个阶段,如果当前阶段和传入的阶段不相等或者 phaser 已经被终止,则直接返回
其它功能
当所有参与的任务完成了一个阶段的运行并在它们进入下一个阶段之前, Phaser类会执行 onAdvance() 方法。此方法接收以下两个参数:
- phase:指已经完成的阶段的号码。第一个阶段的号码是0。
- registeredParties:指参与的任务的数目。
如果你想要在两个阶段之间运行一些代码,例如对一些数据进行排序或者变形,你可以通过继承 Phaser 类并重载这个方法来实现自定义的 phaser。
一个 phaser 可以有以下两种状态:
- Active:当 phaser 创建并注册了新的参与者后直到它结束之前都处于这个状态,处于该状态的 phaser 可以接受新的参与者。
- Termination:当 phaser 的 onAdvance() 方法返回 true 值时,phaser 进入这个阶段。默认情况下,该方法在所有的参与者都已经注销了时返回 true 值
另外, Phaser 类提供一些获取 phaser 状态和参与者信息的方法:
- getRegisteredParties():该方法返回一个 phaser 的参与者数量
- getPhase():这个方法返回当前阶段的号码
- getArrivedParties():这个方法返回当前阶段已经结束的参与者数量
- getUnarrivedParties():这个方法返回当前阶段尚未完成的参与者数量
- isTerminated():当 phaser 处于终止阶段时返回 true,否则返回 false
Phaser 使用实例
public class PhaserDemo { public static void main(String[] args) { ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(3); // 初始阶段我们只注册了一个参与者,即主线程 Phaser ph = new Phaser(1); System.out.println("Initial phase " + ph.getPhase() + " started."); Runnable readThread1 = new FileReaderThread("ReadThread1", ph, "stock1.xls"); Runnable readThread2 = new FileReaderThread("ReadThread2", ph, "stock2.xls"); Runnable readThread3 = new FileReaderThread("ReadThread3", ph, "stock3.xls"); executor.submit(readThread1); executor.submit(readThread2); executor.submit(readThread3); // 主线程等待其它线程并参与到下一个阶段 ph.arriveAndAwaitAdvance(); // 这里开始一个新的阶段 System.out.println("New phase " + ph.getPhase() + " started"); Runnable processThread1 = new FileProcessorThread("ProcessThread1", ph); Runnable processThread2 = new FileProcessorThread("ProcessThread2", ph); executor.submit(processThread1); executor.submit(processThread2); // 主线程进入等待直到其它所有线程均到达 ph.arriveAndDeregister(); try { executor.awaitTermination(4, TimeUnit.SECONDS); executor.shutdown(); } catch (InterruptedException e) { e.printStackTrace(); } } } class FileReaderThread implements Runnable { private String threadName; private Phaser ph; private String fileName; FileReaderThread(String threadName, Phaser ph, String fileName) { this.threadName = threadName; this.ph = ph; this.fileName = fileName; // 在构造函数中向 phaser 注册自己 ph.register(); } @Override public void run() { System.out.println("***Thread " + threadName + "*** starts reading file " + fileName + " in phase " + ph.getPhase()); try { Thread.sleep(20); System.out.println("***Thread " + threadName + "*** finishes reading file " + fileName + " in phase " + ph.getPhase()); // 线程完成本阶段任务,不等待其它线程完成而向phaser注销自己 // 注销的影响是 phaser 的参与者数量减一 ph.arriveAndDeregister(); } catch (InterruptedException e) { e.printStackTrace(); } } } class FileProcessorThread implements Runnable { private String threadName; private Phaser ph; FileProcessorThread(String threadName, Phaser ph) { this.threadName = threadName; this.ph = ph; ph.register(); } @Override public void run() { System.out.println("***Thread " + threadName + "*** starts processing data in phase " + ph.getPhase()); try { Thread.sleep(30); System.out.println("***Thread " + threadName + "*** finish processing data in phase " + ph.getPhase()); // 同上面一样,完成本阶段后向phaser注销自己 ph.arriveAndDeregister(); } catch (InterruptedException e) { e.printStackTrace(); } } }
相关推荐
- 线程间通信用于协调不同线程间的操作,如使用`BlockingQueue`、` Phaser`、`CountDownLatch`等工具类。 6. **线程状态** - 线程有多种状态,包括运行、新建、就绪、阻塞和死亡。线程可以通过`Thread.sleep()`、...
5. **第五章:原子变量** - 这一章可能深入讲解了Java的Atomic类,如AtomicInteger、AtomicLong等,它们提供了一种无锁编程的方法,可以实现高效且线程安全的更新操作。 6. **第六章:并发集合** - 讲解了...
在本压缩包“chapter25.rar”中,包含的是《Java语言程序设计(进阶篇)》一书的第25章课后习题的源代码。这是一份宝贵的资源,对于正在学习Java编程,尤其是深入阶段的学生来说,是提高编程技能和理解Java高级特性...
5. **CountDownLatch**: - `java.util.concurrent.CountDownLatch`允许一个线程或多个线程等待其他线程完成操作。计数器一旦达到0,所有等待的线程就可以继续执行。 6. ** Phaser**: - `java.util.concurrent....
5. **第五章:并发工具** - 介绍Java并发API中的工具类,如Semaphore、CountDownLatch、CyclicBarrier、Exchanger和Phaser,以及它们在并发控制中的应用。 6. **第六章:并发集合** - 讨论java.util.concurrent包下...
第五章:线程池 分析ExecutorService和ThreadPoolExecutor,解释如何配置和使用线程池,以及线程池的生命周期管理,以提高系统资源利用率和性能。 第六章:同步机制 详细讲解了Java中多种同步机制,包括wait()、...
### 第20章 Part4 并发工具同步器 #### 主要内容概述 本章节主要介绍了Java中的并发工具,特别是同步器的相关概念和技术。这部分内容是构建高效、可靠的并发应用程序的基础,尤其对于需要处理大量并发任务的应用...
对于【第9章:枚举&注解(day14)】的学习资料,可能是某个课程或教程的一部分,涵盖了这两个主题的深入讲解和实践示例。学习者可以通过这份资料深入了解枚举和注解的用法,以及如何在实际项目中应用这些概念,提升...
- **第5章:Essential Game Components**:探讨构成游戏的基本组件及其功能实现方法。 - **第6章:Your First Game: Alien Turtle Invasion**:通过实例项目“外星龟入侵”来实践前面所学的知识,完成一个简单的...
这款游戏使用了 Phaser框架。如果你想编写一个简单的游戏,那么这款射击游戏就是你必玩的游戏。这款游戏包含大量 JavaScript,用于对游戏的某些部分进行验证。 游戏玩法 要运行此游戏,您不需要任何类型的本地...
HTML5是超文本标记语言(HyperText Markup Language)的第五次重大修订,它在互联网内容创作领域具有广泛的应用,尤其在游戏开发中表现出色。以下是对这个主题的详细讲解: 1. HTML5基础:HTML5引入了许多新的元素...
第二章可能详细介绍了CSS3的选择器、布局模式(如Flexbox和Grid)以及动画效果,这些都是现代网页设计不可或缺的部分。 "淘宝的HTML5实践.pdf"则是一份实战案例,展示了HTML5在大型电商网站如淘宝的实际应用。这...
最后,通过Callable和Future接口使用线程执行可返回结果的任务也是本章内容的一部分。 线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。在Java中,实现多线程主要有两种...