`
whitesock
  • 浏览: 485130 次
  • 性别: Icon_minigender_1
  • 来自: 大连
社区版块
存档分类
最新评论

What's New on Java 7 Phaser

阅读更多

1 Overview

   Java 7的并发包中推出了Phaser,其功能跟CyclicBarrier和CountDownLatch有些重叠,但是提供了更灵活的用法,例如支持动态调整注册任务的数量等。本文在Phaser自带的示例代码基础上进行一下简单的分析。

 

2 Glossary

2.1 Registration

    Phaser支持通过register()和bulkRegister(int parties)方法来动态调整注册任务的数量,此外也支持通过其构造函数进行指定初始数量。在适当的时机,Phaser支持减少注册任务的数量,例如 arriveAndDeregister()。单个Phaser实例允许的注册任务数的上限是65535。


2.2 Arrival

    正如Phaser类的名字所暗示,每个Phaser实例都会维护一个phase number,初始值为0。每当所有注册的任务都到达Phaser时,phase number累加,并在超过Integer.MAX_VALUE后清零。arrive()和arriveAndDeregister()方法用于记录到 达,arriveAndAwaitAdvance()方法用于记录到达,并且等待其它未到达的任务。


2.3 Termination

    Phaser支持终止。Phaser终止之后,调用register()和bulkRegister(int parties)方法没有任何效果,arriveAndAwaitAdvance()方法也会立即返回。触发终止的时机是在protected boolean onAdvance(int phase, int registeredParties)方法返回时,如果该方法返回true,那么Phaser会被终止。默认实现是在注册任务数为0时返回true(即 return registeredParties == 0;)。此外,forceTermination()方法用于强制终止,isTerminated()方法用于判断是否已经终止。


2.4 Tiering

    Phaser支持层次结构,即通过构造函数Phaser(Phaser parent)和Phaser(Phaser parent, int parties)构造一个树形结构。这有助于减轻因在单个的Phaser上注册过多的任务而导致的竞争,从而提升吞吐量,代价是增加单个操作的开销。

 

3 Sample Usage

3.1 Sample 1

    在有些场景下,我们希望控制多个线程的启动时机:例如在并发相关的单元测试中,有时需要控制线程的启动时机,以期获得最大程度的并发,通常我们会使用CountDownLatch,以下是使用Phaser的版本。

import java.util.concurrent.Phaser;

public class PhaserTest1 {

    public static void main(String args[]) {
        //
        final int count = 5;
        final Phaser phaser = new Phaser(count);
        for(int i = 0; i < count; i++) {
            System.out.println("starting thread, id: " + i);
            final Thread thread = new Thread(new Task(i, phaser));
            thread.start();
        }
    }
    
    public static class Task implements Runnable {
        //
        private final int id;
        private final Phaser phaser;

        public Task(int id, Phaser phaser) {
            this.id = id;
            this.phaser = phaser;
        }
        
        @Override
        public void run() {
            phaser.arriveAndAwaitAdvance();
            System.out.println("in Task.run(), phase: " + phaser.getPhase() + ", id: " + this.id);
        }
    }
}

   以上例子中,由于线程是在一个循环中start,因此start的时机有一定的间隔。本例中这些线程实际开始工作的时机是在所有的线程都调用了phaser.arriveAndAwaitAdvance()之后。

    此外,如果留心arriveAndAwaitAdvance()方法的签名,会发现它并没有抛出InterruptedException,实际上,即使 当前线程被中断,arriveAndAwaitAdvance()方法也不会返回,而是继续等待。如果在等待时希望可中断,或者可超时,那么需要使用以下 方法:

awaitAdvance(arrive())  // 等效于arriveAndAwaitAdvance()
awaitAdvanceInterruptibly(int phase)
awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
 

3.2 Sample 2

    有些时候我们希望只有在某些外部条件满足时,才真正开始任务的执行,例如:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.concurrent.Phaser;

public class PhaserTest2 {

    public static void main(String args[]) throws Exception {
        //
        final Phaser phaser = new Phaser(1);
        for(int i = 0; i < 5; i++) {
            phaser.register();
            System.out.println("starting thread, id: " + i);
            final Thread thread = new Thread(new Task(i, phaser));
            thread.start();
        }
        
        //
        System.out.println("Press ENTER to continue");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        reader.readLine();
        phaser.arriveAndDeregister();
    }
    
    public static class Task implements Runnable {
        //
        private final int id;
        private final Phaser phaser;

        public Task(int id, Phaser phaser) {
            this.id = id;
            this.phaser = phaser;
        }
        
        @Override
        public void run() {
            phaser.arriveAndAwaitAdvance();
            System.out.println("in Task.run(), phase: " + phaser.getPhase() + ", id: " + this.id);
        }
    }
}

    以上例子中,只有当用户按下回车之后,任务才真正开始执行。需要注意的是,arriveAndDeregister()方法不会被阻塞,并且返回到达时的phase number(arrive方法也是如此)。

 

3.3 Sample 3

    CyclicBarrier支持barrier action, Phaser同样也支持。不同之处是Phaser的barrier action需要改写onAdvance方法来进行定制。

import java.util.concurrent.Phaser;

public class PhaserTest3 {

    public static void main(String args[]) throws Exception {
        //
        final int count = 5;
        final int phaseToTerminate = 3;
        final Phaser phaser = new Phaser(count) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("====== " + phase + " ======");
                return phase >= phaseToTerminate || registeredParties == 0;
            }
        };
        
        //
        for(int i = 0; i < count; i++) {
            System.out.println("starting thread, id: " + i);
            final Thread thread = new Thread(new Task(i, phaser));
            thread.start();
        }
    }
    
    public static class Task implements Runnable {
        //
        private final int id;
        private final Phaser phaser;

        public Task(int id, Phaser phaser) {
            this.id = id;
            this.phaser = phaser;
        }
        
        @Override
        public void run() {
            do {
                try {
                    Thread.sleep(500);
                } catch(InterruptedException e) {
                    // NOP
                }
                System.out.println("in Task.run(), phase: " + phaser.getPhase() + ", id: " + this.id);
                phaser.arriveAndAwaitAdvance();
            } while(!phaser.isTerminated());
        }
    }
}

   本例中的barrier action只是简单地打印了一条信息,此外在超过指定的迭代次数后终止了Phaser。


3.4 Sample 4

    在Smaple 3的例子中,主线程在其它工作线程结束之前已经终止。如果希望主线程等待这些工作线程结束,除了使用Thread.join()之外,也可以尝试以下的方式:

import java.util.concurrent.Phaser;

public class PhaserTest4 {

    public static void main(String args[]) throws Exception {
        //
        final int count = 5;
        final int phaseToTerminate = 3;
        final Phaser phaser = new Phaser(count) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("====== " + phase + " ======");
                return phase == phaseToTerminate || registeredParties == 0;
            }
        };
        
        //
        for(int i = 0; i < count; i++) {
            System.out.println("starting thread, id: " + i);
            final Thread thread = new Thread(new Task(i, phaser));
            thread.start();
        }
        
        //
        phaser.register();
        while (!phaser.isTerminated()) {
            phaser.arriveAndAwaitAdvance();
        }
        System.out.println("done");
    }
    
    public static class Task implements Runnable {
        //
        private final int id;
        private final Phaser phaser;

        public Task(int id, Phaser phaser) {
            this.id = id;
            this.phaser = phaser;
        }
        
        @Override
        public void run() {
            while(!phaser.isTerminated()) {
                try {
                    Thread.sleep(500);
                } catch(InterruptedException e) {
                    // NOP
                }
                System.out.println("in Task.run(), phase: " + phaser.getPhase() + ", id: " + this.id);
                phaser.arriveAndAwaitAdvance();
            }
        }
    }
}

   如果希望主线程在特定的phase结束之后终止,那么可以在主线程中调用下述方法:

public static void awaitPhase(Phaser phaser, int phase) {
    int p = phaser.register(); // assumes caller not already registered
    while (p < phase) {
        if (phaser.isTerminated()) {
            break; // ... deal with unexpected termination
        } else {
            p = phaser.arriveAndAwaitAdvance();
        }
    }
    phaser.arriveAndDeregister();
}

    需要注意的是,awaitPhase方法中的if (phaser.isTerminated()) 分支里需要能够正确处理Phaser终止的情况。否则由于在Phaser终止之后, phaser.register()和arriveAndAwaitAdvance()方法均返回负值,那么上述方法可能陷入死循环。


3.5 Sample 5

    以下对Phaser进行分层的例子:

import java.util.concurrent.Phaser;

public class PhaserTest6 {
    //
    private static final int TASKS_PER_PHASER = 4;

    public static void main(String args[]) throws Exception {
        //
        final int phaseToTerminate = 3;
        final Phaser phaser = new Phaser() {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("====== " + phase + " ======");
                return phase == phaseToTerminate || registeredParties == 0;
            }
        };
        
        //
        final Task tasks[] = new Task[10];
        build(tasks, 0, tasks.length, phaser);
        for (int i = 0; i < tasks.length; i++) {
            System.out.println("starting thread, id: " + i);
            final Thread thread = new Thread(tasks[i]);
            thread.start();
        }
    }

    public static void build(Task[] tasks, int lo, int hi, Phaser ph) {
        if (hi - lo > TASKS_PER_PHASER) {
            for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
                int j = Math.min(i + TASKS_PER_PHASER, hi);
                build(tasks, i, j, new Phaser(ph));
            }
        } else {
            for (int i = lo; i < hi; ++i)
                tasks[i] = new Task(i, ph);
        }
    }

    public static class Task implements Runnable {
        //
        private final int id;
        private final Phaser phaser;

        public Task(int id, Phaser phaser) {
            this.id = id;
            this.phaser = phaser;
            this.phaser.register();
        }

        @Override
        public void run() {
            while (!phaser.isTerminated()) {
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    // NOP
                }
                System.out.println("in Task.run(), phase: " + phaser.getPhase()    + ", id: " + this.id);
                phaser.arriveAndAwaitAdvance();
            }
        }
    }
}

    需要注意的是,TASKS_PER_PHASER的值取决于具体的Task实现。对于Task执行时间很短的场景(也就是竞争相对激烈),可以考虑使用较小的TASKS_PER_PHASER值,例如4。反之可以适当增大TASKS_PER_PHASER。

分享到:
评论
4 楼 龘龘龘 2015-10-10  
TrueBrian 写道
有个问题,
Sample 1中,为了控制线程的启动时机,博主实际上是通过线程之间相互等待来实现的,即等待所有的线程都start后,再大家一起执行。这种情形据我了解应该是使用CyclicBarrier而不是CountDownLatch啊?望博主指教一下

这个确实好像搞错了!
3 楼 龘龘龘 2015-10-10  
楼主总结的不错。
2 楼 TrueBrian 2015-03-30  
有个问题,
Sample 1中,为了控制线程的启动时机,博主实际上是通过线程之间相互等待来实现的,即等待所有的线程都start后,再大家一起执行。这种情形据我了解应该是使用CyclicBarrier而不是CountDownLatch啊?望博主指教一下
1 楼 childz 2012-08-07  
非常不错,感谢

相关推荐

    java7帮助文档

    Java Virtual Machine Support for Non-Java Languages: Java SE 7 introduces a new JVM instruction that simplifies the implementation of dynamically typed programming languages on the JVM. Garbage-First...

    java多线程之Phaser的使用详解

    Phaser 是 Java 7 中引入的新的并发 API,主要用于控制多个线程之间的同步和协作。Phaser 机制可以看作是一个阶段,每个阶段都有需要执行的线程任务,任务执行完毕就进入下一个阶段。Phaser 特别适合使用在重复执行...

    Java 7 Concurrency Cookbook

    of how to use the new Java 7 Phaser class to synchronize tasks divided into phases. Chapter 4, Thread Executors will teach the readers to delegate the thread management to executors. They allow ...

    java 7 api chm版本

    Java 7 API CHM版本是Java开发者的重要参考资料,它包含了Java SE 7(Java Standard Edition 7)的所有公共类、接口、枚举和注解的详细文档。CHM(Compiled HTML Help)是一种由Microsoft开发的帮助文件格式,它将...

    docs_Phaser3.22最新文档_

    7. **国际化和多语言支持**:为了使Phaser成为全球开发者的首选框架,3.22版可能加强了对多语言的支持,包括文档的本地化,让非英语国家的开发者也能无障碍地学习和使用Phaser。 在实际应用中,开发者可以通过阅读...

    Phaser 刮刮樂 Demo

    7. **状态管理**:Phaser的状态管理系统可以帮助组织游戏的不同阶段,例如加载屏幕、游戏主界面和游戏结束画面。 8. **声音和音乐**:Phaser支持音频播放,可以添加刮擦声效或者背景音乐,提升用户体验。 9. **...

    xerox phaser3155驱动

    富士施乐(xerox) phaser3155驱动软件,英文版

    适配微信小游戏的Phaser以及它的demo集合phaser-wxdemo-master.zip

    7. **渲染技术**:理解Phaser-WX的渲染机制,包括批处理渲染、图层管理和优化策略。 8. **保存与加载进度**:学习如何利用微信小游戏的本地存储功能来保存和加载玩家的游戏进度。 9. **自适应屏幕**:掌握如何使...

    phaser.min.js

    phaser.min.js,免费,实用,提供phaser.min.js,前端JS

    java 7并发编程实战手册 源码

    其次,Java 7中改进了`java.util.concurrent`包,增加了许多并发工具类,如`CountDownLatch`、`CyclicBarrier`和`Phaser`,它们是同步和协调多线程的重要工具。`CountDownLatch`允许一个或多个线程等待其他线程完成...

    phaser-inspector, Phaser检查器插件允许你检查Phaser游戏.zip

    phaser-inspector, Phaser检查器插件允许你检查Phaser游戏 Phaser检查器插件将重新设计 [UPDATE] Phaser检查器插件重新设计和重构,以使它的成为更好的插件。 [UPDATE] Phaser检查器插件支持RenderTexture上的。 ...

    phaser游戏入门4(接金币游戏2)

    5. Phaser API应用:在实现这些功能时,我们会频繁使用Phaser的API,例如`Phaser.Physics.Arcade`模块进行物理碰撞检测,`Phaser.Group`管理游戏对象,`Phaser.Text`创建分数显示文本,以及`Phaser.SoundManager`...

    Phaser.js 贪吃蛇游戏

    7. **音频管理**:Phaser支持音频播放,可以在适当的时候添加背景音乐或音效,提高游戏体验。 8. **资产(Assets)管理**:在`assets`目录下,通常包含游戏所需的图像、音频和其他资源。Phaser提供了一种加载和管理...

    Phaser库文件

    Phaser是一个广泛使用的开源JavaScript游戏框架,专为创建2D游戏而设计。它提供了一整套工具和服务,帮助开发者快速构建各种类型的游戏,从简单的休闲游戏到复杂的动作冒险游戏。这个压缩包包含了Phaser库的两个主要...

    Phaser_3117_x64.rar 驱动

    正确安装并更新驱动可以确保Phaser 3117打印机在Windows 7 64位系统下发挥最佳性能,提供清晰、快速的打印效果。同时,保持驱动程序的最新状态有助于解决可能出现的兼容性问题和提高打印效率。如果在安装过程中遇到...

    java core 7源码

    6. **钻石操作符**:在创建泛型实例时,Java 7允许省略类型参数,编译器会自动推断出类型,如`new ArrayList()`。 7. **Strings in Switch**:Java 7允许在switch语句中直接使用字符串,增强了代码的可读性。 学习...

    富士施乐xerox phaser 3116打印机驱动 for win7 64位_官网版

    富士施乐3116驱动是由富士施乐官方专门为富士施乐xeroxphaser3116型号打印机打造的打印机驱动程序,如果您打印机无法正常打印或者不能连接电脑,您可以下载phaser3116驱动,即可快速帮你解决这类问题。富士施乐...

    打印机Phaser3200驱动

    《Xerox富士施乐Phaser 3200MFP激光打印机Vista-64位驱动详解》 Xerox富士施乐Phaser 3200MFP是一款高效能的激光打印机,专为满足中小型企业及个人用户对高质量打印的需求而设计。这款打印机以其出色的打印速度、...

    core java 7 第2卷

    《Core Java 7 第2卷》是一本专为Java开发者准备的权威指南,主要涵盖了Java 7版本中的高级特性和核心概念。这本书是Java开发人员深入理解语言、提高编程技能的重要参考资料。描述中的"core java 7 II"进一步强调了...

Global site tag (gtag.js) - Google Analytics