`
marb
  • 浏览: 422595 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Java Phaser使用

    博客分类:
  • JAVA
 
阅读更多

1 Overview

   Java 7的并发包中推出了Phaser,其功能跟CyclicBarrier和CountDownLatch有些重叠,但是提供了更灵活的用法,例如支持动态调整注册任务的数量等。本文在Phaser自带的示例代码基础上进行一下简单的分析。

 

2 Glossary

2.1 Registration

    Phaser支持通过register()和bulkRegister(int parties)方法来动态调整注册任务的数量,此外也支持通过其构造函数进行指定初始数量。在适当的时机,Phaser支持减少注册任务的数量,例如 arriveAndDeregister()。单个Phaser实例允许的注册任务数的上限是65535。

 

2.2 Arrival

    正如Phaser类的名字所暗示,每个Phaser实例都会维护一个phase number,初始值为0。每当所有注册的任务都到达Phaser时,phase number累加,并在超过Integer.MAX_VALUE后清零。arrive()和arriveAndDeregister()方法用于记录到 达,arriveAndAwaitAdvance()方法用于记录到达,并且等待其它未到达的任务。

 

2.3 Termination

    Phaser支持终止。Phaser终止之后,调用register()和bulkRegister(int parties)方法没有任何效果,arriveAndAwaitAdvance()方法也会立即返回。触发终止的时机是在protected boolean onAdvance(int phase, int registeredParties)方法返回时,如果该方法返回true,那么Phaser会被终止。默认实现是在注册任务数为0时返回true(即 return registeredParties == 0;)。此外,forceTermination()方法用于强制终止,isTerminated()方法用于判断是否已经终止。

 

2.4 Tiering

    Phaser支持层次结构,即通过构造函数Phaser(Phaser parent)和Phaser(Phaser parent, int parties)构造一个树形结构。这有助于减轻因在单个的Phaser上注册过多的任务而导致的竞争,从而提升吞吐量,代价是增加单个操作的开销。

 

3 Sample Usage

3.1 Sample 1

    在有些场景下,我们希望控制多个线程的启动时机:例如在并发相关的单元测试中,有时需要控制线程的启动时机,以期获得最大程度的并发,通常我们会使用CountDownLatch,以下是使用Phaser的版本。

Java代码  收藏代码
  1. import java.util.concurrent.Phaser;  
  2.   
  3. public class PhaserTest1 {  
  4.   
  5.     public static void main(String args[]) {  
  6.         //  
  7.         final int count = 5;  
  8.         final Phaser phaser = new Phaser(count);  
  9.         for(int i = 0; i < count; i++) {  
  10.             System.out.println("starting thread, id: " + i);  
  11.             final Thread thread = new Thread(new Task(i, phaser));  
  12.             thread.start();  
  13.         }  
  14.     }  
  15.       
  16.     public static class Task implements Runnable {  
  17.         //  
  18.         private final int id;  
  19.         private final Phaser phaser;  
  20.   
  21.         public Task(int id, Phaser phaser) {  
  22.             this.id = id;  
  23.             this.phaser = phaser;  
  24.         }  
  25.           
  26.         @Override  
  27.         public void run() {  
  28.             phaser.arriveAndAwaitAdvance();  
  29.             System.out.println("in Task.run(), phase: " + phaser.getPhase() + ", id: " + this.id);  
  30.         }  
  31.     }  
  32. }  

   以上例子中,由于线程是在一个循环中start,因此start的时机有一定的间隔。本例中这些线程实际开始工作的时机是在所有的线程都调用了phaser.arriveAndAwaitAdvance()之后。

    此外,如果留心arriveAndAwaitAdvance()方法的签名,会发现它并没有抛出InterruptedException,实际上,即使 当前线程被中断,arriveAndAwaitAdvance()方法也不会返回,而是继续等待。如果在等待时希望可中断,或者可超时,那么需要使用以下 方法:

Java代码  收藏代码
  1. awaitAdvance(arrive())  // 等效于arriveAndAwaitAdvance()  
  2. awaitAdvanceInterruptibly(int phase)  
  3. awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)  

 

3.2 Sample 2

    有些时候我们希望只有在某些外部条件满足时,才真正开始任务的执行,例如:

Java代码  收藏代码
  1. import java.io.BufferedReader;  
  2. import java.io.InputStreamReader;  
  3. import java.util.concurrent.Phaser;  
  4.   
  5. public class PhaserTest2 {  
  6.   
  7.     public static void main(String args[]) throws Exception {  
  8.         //  
  9.         final Phaser phaser = new Phaser(1);  
  10.         for(int i = 0; i < 5; i++) {  
  11.             phaser.register();  
  12.             System.out.println("starting thread, id: " + i);  
  13.             final Thread thread = new Thread(new Task(i, phaser));  
  14.             thread.start();  
  15.         }  
  16.           
  17.         //  
  18.         System.out.println("Press ENTER to continue");  
  19.         BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));  
  20.         reader.readLine();  
  21.         phaser.arriveAndDeregister();  
  22.     }  
  23.       
  24.     public static class Task implements Runnable {  
  25.         //  
  26.         private final int id;  
  27.         private final Phaser phaser;  
  28.   
  29.         public Task(int id, Phaser phaser) {  
  30.             this.id = id;  
  31.             this.phaser = phaser;  
  32.         }  
  33.           
  34.         @Override  
  35.         public void run() {  
  36.             phaser.arriveAndAwaitAdvance();  
  37.             System.out.println("in Task.run(), phase: " + phaser.getPhase() + ", id: " + this.id);  
  38.         }  
  39.     }  
  40. }  

    以上例子中,只有当用户按下回车之后,任务才真正开始执行。需要注意的是,arriveAndDeregister()方法不会被阻塞,并且返回到达时的phase number(arrive方法也是如此)。

 

3.3 Sample 3

    CyclicBarrier支持barrier action, Phaser同样也支持。不同之处是Phaser的barrier action需要改写onAdvance方法来进行定制。

Java代码  收藏代码
  1. import java.util.concurrent.Phaser;  
  2.   
  3. public class PhaserTest3 {  
  4.   
  5.     public static void main(String args[]) throws Exception {  
  6.         //  
  7.         final int count = 5;  
  8.         final int phaseToTerminate = 3;  
  9.         final Phaser phaser = new Phaser(count) {  
  10.             @Override  
  11.             protected boolean onAdvance(int phase, int registeredParties) {  
  12.                 System.out.println("====== " + phase + " ======");  
  13.                 return phase >= phaseToTerminate || registeredParties == 0;  
  14.             }  
  15.         };  
  16.           
  17.         //  
  18.         for(int i = 0; i < count; i++) {  
  19.             System.out.println("starting thread, id: " + i);  
  20.             final Thread thread = new Thread(new Task(i, phaser));  
  21.             thread.start();  
  22.         }  
  23.     }  
  24.       
  25.     public static class Task implements Runnable {  
  26.         //  
  27.         private final int id;  
  28.         private final Phaser phaser;  
  29.   
  30.         public Task(int id, Phaser phaser) {  
  31.             this.id = id;  
  32.             this.phaser = phaser;  
  33.         }  
  34.           
  35.         @Override  
  36.         public void run() {  
  37.             do {  
  38.                 try {  
  39.                     Thread.sleep(500);  
  40.                 } catch(InterruptedException e) {  
  41.                     // NOP  
  42.                 }  
  43.                 System.out.println("in Task.run(), phase: " + phaser.getPhase() + ", id: " + this.id);  
  44.                 phaser.arriveAndAwaitAdvance();  
  45.             } while(!phaser.isTerminated());  
  46.         }  
  47.     }  
  48. }  

   本例中的barrier action只是简单地打印了一条信息,此外在超过指定的迭代次数后终止了Phaser。

 

3.4 Sample 4

    在Smaple 3的例子中,主线程在其它工作线程结束之前已经终止。如果希望主线程等待这些工作线程结束,除了使用Thread.join()之外,也可以尝试以下的方式:

Java代码  收藏代码
  1. import java.util.concurrent.Phaser;  
  2.   
  3. public class PhaserTest4 {  
  4.   
  5.     public static void main(String args[]) throws Exception {  
  6.         //  
  7.         final int count = 5;  
  8.         final int phaseToTerminate = 3;  
  9.         final Phaser phaser = new Phaser(count) {  
  10.             @Override  
  11.             protected boolean onAdvance(int phase, int registeredParties) {  
  12.                 System.out.println("====== " + phase + " ======");  
  13.                 return phase == phaseToTerminate || registeredParties == 0;  
  14.             }  
  15.         };  
  16.           
  17.         //  
  18.         for(int i = 0; i < count; i++) {  
  19.             System.out.println("starting thread, id: " + i);  
  20.             final Thread thread = new Thread(new Task(i, phaser));  
  21.             thread.start();  
  22.         }  
  23.           
  24.         //  
  25.         phaser.register();  
  26.         while (!phaser.isTerminated()) {  
  27.             phaser.arriveAndAwaitAdvance();  
  28.         }  
  29.         System.out.println("done");  
  30.     }  
  31.       
  32.     public static class Task implements Runnable {  
  33.         //  
  34.         private final int id;  
  35.         private final Phaser phaser;  
  36.   
  37.         public Task(int id, Phaser phaser) {  
  38.             this.id = id;  
  39.             this.phaser = phaser;  
  40.         }  
  41.           
  42.         @Override  
  43.         public void run() {  
  44.             while(!phaser.isTerminated()) {  
  45.                 try {  
  46.                     Thread.sleep(500);  
  47.                 } catch(InterruptedException e) {  
  48.                     // NOP  
  49.                 }  
  50.                 System.out.println("in Task.run(), phase: " + phaser.getPhase() + ", id: " + this.id);  
  51.                 phaser.arriveAndAwaitAdvance();  
  52.             }  
  53.         }  
  54.     }  
  55. }  

   如果希望主线程在特定的phase结束之后终止,那么可以在主线程中调用下述方法:

Java代码  收藏代码
  1. public static void awaitPhase(Phaser phaser, int phase) {  
  2.     int p = phaser.register(); // assumes caller not already registered  
  3.     while (p < phase) {  
  4.         if (phaser.isTerminated()) {  
  5.             break// ... deal with unexpected termination  
  6.         } else {  
  7.             p = phaser.arriveAndAwaitAdvance();  
  8.         }  
  9.     }  
  10.     phaser.arriveAndDeregister();  
  11. }  

    需要注意的是,awaitPhase方法中的if (phaser.isTerminated()) 分支里需要能够正确处理Phaser终止的情况。否则由于在Phaser终止之后, phaser.register()和arriveAndAwaitAdvance()方法均返回负值,那么上述方法可能陷入死循环。

 

3.5 Sample 5

    以下对Phaser进行分层的例子:

Java代码  收藏代码
  1. import java.util.concurrent.Phaser;  
  2.   
  3. public class PhaserTest6 {  
  4.     //  
  5.     private static final int TASKS_PER_PHASER = 4;  
  6.   
  7.     public static void main(String args[]) throws Exception {  
  8.         //  
  9.         final int phaseToTerminate = 3;  
  10.         final Phaser phaser = new Phaser() {  
  11.             @Override  
  12.             protected boolean onAdvance(int phase, int registeredParties) {  
  13.                 System.out.println("====== " + phase + " ======");  
  14.                 return phase == phaseToTerminate || registeredParties == 0;  
  15.             }  
  16.         };  
  17.           
  18.         //  
  19.         final Task tasks[] = new Task[10];  
  20.         build(tasks, 0, tasks.length, phaser);  
  21.         for (int i = 0; i < tasks.length; i++) {  
  22.             System.out.println("starting thread, id: " + i);  
  23.             final Thread thread = new Thread(tasks[i]);  
  24.             thread.start();  
  25.         }  
  26.     }  
  27.   
  28.     public static void build(Task[] tasks, int lo, int hi, Phaser ph) {  
  29.         if (hi - lo > TASKS_PER_PHASER) {  
  30.             for (int i = lo; i < hi; i += TASKS_PER_PHASER) {  
  31.                 int j = Math.min(i + TASKS_PER_PHASER, hi);  
  32.                 build(tasks, i, j, new Phaser(ph));  
  33.             }  
  34.         } else {  
  35.             for (int i = lo; i < hi; ++i)  
  36.                 tasks[i] = new Task(i, ph);  
  37.         }  
  38.     }  
  39.   
  40.     public static class Task implements Runnable {  
  41.         //  
  42.         private final int id;  
  43.         private final Phaser phaser;  
  44.   
  45.         public Task(int id, Phaser phaser) {  
  46.             this.id = id;  
  47.             this.phaser = phaser;  
  48.             this.phaser.register();  
  49.         }  
  50.   
  51.         @Override  
  52.         public void run() {  
  53.             while (!phaser.isTerminated()) {  
  54.                 try {  
  55.                     Thread.sleep(200);  
  56.                 } catch (InterruptedException e) {  
  57.                     // NOP  
  58.                 }  
  59.                 System.out.println("in Task.run(), phase: " + phaser.getPhase()    + ", id: " + this.id);  
  60.                 phaser.arriveAndAwaitAdvance();  
  61.             }  
  62.         }  
  63.     }  
  64. }  

    需要注意的是,TASKS_PER_PHASER的值取决于具体的Task实现。对于Task执行时间很短的场景(也就是竞争相对激烈),可以考虑使用较小的TASKS_PER_PHASER值,例如4。反之可以适当增大TASKS_PER_PHASER。

分享到:
评论
1 楼 shengshihouzhe 2018-01-22  
第一个eg实现的应该是循环栅栏,不是计数器

相关推荐

    java多线程之Phaser的使用详解

    Java 多线程之 Phaser 的使用详解 Phaser 是 Java 7 中引入的新的并发 API,主要用于控制多个线程之间的同步和协作。Phaser 机制可以看作是一个阶段,每个阶段都有需要执行的线程任务,任务执行完毕就进入下一个...

    java8.pdf详细说明java8的使用说明

    Java 8对并发API也进行了大量改进,例如增加了新的并发工具,如CompletableFuture和Phaser。这些改进旨在简化并发编程,提高性能和可读性。 9. Nashorn引擎: Nashorn是一个JavaScript引擎,允许在Java平台内执行...

    Java synchronized使用案例

    在`Synchronized`压缩包中,可能包含了通过实验来演示`synchronized`关键字使用的一些Java代码。例如,创建一个共享资源类,然后通过多个线程去操作这个资源,使用`synchronized`关键字来保证线程安全。实验可能涉及...

    Java并发工具类示例

    本文将详细解析Java并发工具类,并通过示例代码介绍`CountDownLatch`、`CyclicBarrier`、`Phaser`、`Semaphore`和`ThreadLocal`的用法。 1. **CountDownLatch** `CountDownLatch`是一个计数器,通常用于等待多个...

    java-jdk1.6-jdk-6u45-windows-x64.zip

    "使用说明.txt"文件可能包含关于如何配置环境变量、启动Java应用程序或使用特定工具的详细步骤。务必仔细阅读并按照说明进行操作,以确保Java开发环境的正确配置。 总的来说,Java JDK 1.6对于当时的开发者来说是一...

    33 分阶段执行你的任务-学习使用Phaser运行多阶段任务.pdf

    Phaser 是 Java 7 引入的一个并发工具类,它扩展了 CyclicBarrier 和 CountDownLatch 的功能,提供了更灵活的线程同步机制。Phaser 主要用于管理一系列阶段性的任务,允许在每个阶段的开始和结束时进行同步,且支持...

    java7帮助文档

    The Phaser class is a new synchronization barrier, similar to CyclicBarrier. Rich Internet Applications (RIA) and Deployment The window of a dragged applet can be decorated with a default or custom ...

    JAVA API文档|JAVA API 1.7中文文档

    Java API文档是Java开发者的重要参考资料,它包含了Java标准库中所有类、接口和方法的详细说明,便于开发者理解和使用Java平台的各种功能。本篇将详细解读Java API 1.7中文版中的关键知识点。 首先,Java API 1.7,...

    Java1.6 32.rar

    7. **并发工具**:Java Concurrency Utilities进一步扩展,提供了更多线程同步和并发编程的工具,如CountDownLatch、CyclicBarrier和Phaser。 8. **安全增强**:加强了安全管理器,提供了更强的加密功能,并修复了...

    java6 api中文版

    9. 并发工具:Java 6加强了并发编程的支持,新增了`ConcurrentHashMap`、`Phaser`、`ForkJoinPool`等并发工具类,简化了多线程编程。 10. 安全性改进:Java 6增强了安全模型,提供了更好的数字证书管理和SSL/TLS...

    JAVA_API_1.7中文版

    10. **并发改进**:`java.util.concurrent`包中增加了`ConcurrentHashMap`的性能优化,以及`Phaser`类,用于协调多线程间的同步。 这些新特性大大提升了Java的生产力和灵活性,使得开发者能更高效地编写和管理代码...

    Java多线程编程实战指南-核心篇

    此外,Phaser、CyclicBarrier和CountDownLatch等同步辅助类也是线程间协调的重要工具。 线程池是Java并发编程中不可或缺的一部分。Executor框架提供了ThreadPoolExecutor,它允许我们预先创建一定数量的线程,管理...

    Java并发编程实践高清pdf及源码

    5. **并发工具类**:如`CountDownLatch`、`CyclicBarrier`、`Semaphore`和`Phaser`等,它们为多线程间的协作提供了方便。 6. **活锁与死锁**:活锁是两个或更多线程相互等待对方释放资源,导致无限期等待;死锁则是...

    java API文档chm中文+英文

    这份资源提供了jdk 1.6版本的API文档,分别以中文和英文两种语言呈现,对于中国开发者来说,尤其方便,可以更好地理解和使用Java语言进行开发。 一、Java API文档的重要性 Java API文档是学习和开发Java应用的基础...

    JAVA100例之实例64 JAVA线程间通讯

    6. **Phaser**:Phaser是Java 7引入的新工具,类似于CyclicBarrier,但更灵活。它可以动态地添加和移除参与线程,并且在每个阶段可以执行不同的动作。 7. **BlockingQueue**:阻塞队列是一种特殊的队列,当队列为空...

    java1.6JDK

    4. **并发工具集**:提供了一些高级的并发控制和同步机制,如`ConcurrentHashMap`、`Phaser`等,便于开发者编写多线程程序。 5. **改进的安全性**:增强了安全策略和证书管理,提升了Java应用程序的安全性。 6. **...

    JAVA高级编程培训讲义

    接下来,关于并发编程,Java提供了并发工具包java.util.concurrent,这个包中包含很多并发编程中经常会用到的类和接口,例如Executor框架、CountDownLatch、CyclicBarrier、Semaphore、Phaser等。这些工具类提供了...

    JDK(Java Development Kit)

    5. **动态语言支持**:JDK 1.6引入了JSR 223(Scripting for the Java Platform),允许在Java中使用脚本语言,如JavaScript。 6. **改进的数据库连接(JDBC)**:提供了更强大的数据库连接池管理和SQL语句执行优化...

    javaJDK1.6-Win64

    尽管现代Java开发通常推荐使用较新版本的JDK,但了解并掌握这些历史版本的知识仍然是必要的,因为它们可以帮助开发者理解Java技术的发展历程,同时在遇到需要维护旧系统时能迅速上手。在某些情况下,由于兼容性和...

Global site tag (gtag.js) - Google Analytics