`

Concurrent - Phaser - awaitAdvance & awaitAdvanceInterruptibly

 
阅读更多

原创转载请注明出处:http://agilestyle.iteye.com/blog/2344661

 

awaitAdvance(int phase)

awaitAdvance(int phase)作用是如果传入参数phase值和当前getPhase()返回值一样,则在屏障处等待,否则立刻返回执行下面的代码


PhaserTest11.java

package org.fool.java.concurrent.phaser;

import java.util.concurrent.Phaser;

public class PhaserTest11 {
    public static class ThreadA implements Runnable {
        private Phaser phaser;

        public ThreadA(Phaser phaser) {
            this.phaser = phaser;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " A1 begin " + System.currentTimeMillis());

            phaser.arriveAndAwaitAdvance();

            System.out.println(Thread.currentThread().getName() + " A1 end " + System.currentTimeMillis());
        }
    }

    public static class ThreadB implements Runnable {
        private Phaser phaser;

        public ThreadB(Phaser phaser) {
            this.phaser = phaser;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " A1 begin " + System.currentTimeMillis());

            phaser.arriveAndAwaitAdvance();

            System.out.println(Thread.currentThread().getName() + " A1 end " + System.currentTimeMillis());
        }
    }

    public static class ThreadC implements Runnable {
        private Phaser phaser;

        public ThreadC(Phaser phaser) {
            this.phaser = phaser;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " A1 begin " + System.currentTimeMillis());

                Thread.sleep(3000);

                System.out.println("phaser.getPhase()=" + phaser.getPhase());
                phaser.awaitAdvance(0);

                System.out.println(Thread.currentThread().getName() + " A1 end " + System.currentTimeMillis());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static class ThreadD implements Runnable {
        private Phaser phaser;

        public ThreadD(Phaser phaser) {
            this.phaser = phaser;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " A1 begin " + System.currentTimeMillis());

                Thread.sleep(5000);

                phaser.arriveAndAwaitAdvance();

                System.out.println(Thread.currentThread().getName() + " A1 end " + System.currentTimeMillis());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        Phaser phaser = new Phaser(3);

        Thread t1 = new Thread(new ThreadA(phaser));
        Thread t2 = new Thread(new ThreadB(phaser));
        Thread t3 = new Thread(new ThreadC(phaser));
        Thread t4 = new Thread(new ThreadD(phaser));

        t1.setName("ThreadA");
        t2.setName("ThreadB");
        t3.setName("ThreadC");
        t4.setName("ThreadD");

        t1.start();
        t2.start();
        t3.start();
        t4.start();
    }
}

Run


Note:

awaitAdvance(int phase)并不参与parties计数的操作,仅仅具有判断的功能

 

更改ThreadC,将 

phaser.awaitAdvance(0);

改为

phaser.awaitAdvance(1);

再Run


Note:

由于传入参数phase值和当前getPhase()返回值不一致,则立即返回

 

awaitAdvanceInterruptibly(int phase)

awaitAdvanceInterruptibly(int phase)作用是传入参数phase值和当前getPhase()返回值不一致时,则继续执行下面的代码

PhaserTest14.java

package org.fool.java.concurrent.phaser;

import java.util.concurrent.Phaser;

public class PhaserTest14 {
    public static class ThreadA implements Runnable {
        private Phaser phaser;

        public ThreadA(Phaser phaser) {
            this.phaser = phaser;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " A1 begin " + System.currentTimeMillis());

                System.out.println("phaser.getPhase()=" + phaser.getPhase());
                phaser.awaitAdvanceInterruptibly(10);

                System.out.println(Thread.currentThread().getName() + " A1 end " + System.currentTimeMillis());
            } catch (InterruptedException e) {
                System.out.println("InterruptedException invoked...");
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        Phaser phaser = new Phaser(3);

        Thread t1 = new Thread(new ThreadA(phaser));
        t1.setName("ThreadA");
        t1.start();
    }
}

Run


Note:

由于传入参数phase值和当前getPhase()返回值不一致,控制台程序快速继续向下执行,运行的时间几乎是一样的

 

awaitAdvance(int phase)是不可中断的

PhaserTest12.java

package org.fool.java.concurrent.phaser;

import java.util.concurrent.Phaser;

public class PhaserTest12 {
    public static class ThreadA implements Runnable {
        private Phaser phaser;

        public ThreadA(Phaser phaser) {
            this.phaser = phaser;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " A1 begin " + System.currentTimeMillis());

            System.out.println("phaser.getPhase()=" + phaser.getPhase());
            phaser.awaitAdvance(0);

            System.out.println(Thread.currentThread().getName() + " A1 end " + System.currentTimeMillis());
        }
    }

    public static void main(String[] args) {
        try {
            Phaser phaser = new Phaser(3);

            Thread t1 = new Thread(new ThreadA(phaser));
            t1.setName("ThreadA");
            t1.start();

            Thread.sleep(3000);

            t1.interrupt();

            System.out.println("interrupt() invoked..");
        } catch (InterruptedException e) {
            System.out.println("InterruptedException invoked...");
            e.printStackTrace();
        }
    }
}

Run


Note:

控制台没有出现异常,说明线程并未中断 

 

awaitAdvanceInterruptibly(int phase)是可中断的

PhaserTest13.java

package org.fool.java.concurrent.phaser;

import java.util.concurrent.Phaser;

public class PhaserTest13 {
    public static class ThreadA implements Runnable {
        private Phaser phaser;

        public ThreadA(Phaser phaser) {
            this.phaser = phaser;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " A1 begin " + System.currentTimeMillis());

                System.out.println("phaser.getPhase()=" + phaser.getPhase());
                phaser.awaitAdvanceInterruptibly(0);

                System.out.println(Thread.currentThread().getName() + " A1 end " + System.currentTimeMillis());
            } catch (InterruptedException e) {
                System.out.println("InterruptedException invoked...");
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        try {
            Phaser phaser = new Phaser(3);

            Thread t1 = new Thread(new ThreadA(phaser));
            t1.setName("ThreadA");
            t1.start();

            Thread.sleep(5000);

            t1.interrupt();

            System.out.println("interrupt() invoked..");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Run


Note:

控制台出现异常,线程被中断了 

 

awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)

awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)作用是在指定的Phase等待最大的单位时间,如果在指定的时间内,Phase值未变,则出现异常,否则继续向下运行。


PhaserTest15.java

package org.fool.java.concurrent.phaser;

import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PhaserTest15 {
    public static class ThreadA implements Runnable {
        private Phaser phaser;

        public ThreadA(Phaser phaser) {
            this.phaser = phaser;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " A1 begin " + System.currentTimeMillis());

                System.out.println("phaser.getPhase()=" + phaser.getPhase());
                phaser.awaitAdvanceInterruptibly(0, 5, TimeUnit.SECONDS);

                System.out.println(Thread.currentThread().getName() + " A1 end " + System.currentTimeMillis());
            } catch (InterruptedException e) {
                System.out.println("InterruptedException invoked...");
                e.printStackTrace();
            } catch (TimeoutException e) {
                System.out.println("TimeoutException invoked...");
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        Phaser phaser = new Phaser(3);

        Thread t1 = new Thread(new ThreadA(phaser));
        t1.setName("ThreadA");
        t1.start();
    }
}

Run  


Note:

出现了超时异常,因为5秒之后phase值并没有发生改变 

 

修改main方法

public static void main(String[] args) {
	try {
		Phaser phaser = new Phaser(3);

		Thread t1 = new Thread(new ThreadA(phaser));
		t1.setName("ThreadA");
		t1.start();

		System.out.println("phaser.getArrivedParties()=" + phaser.getArrivedParties());

		Thread.sleep(1000);
		phaser.arrive();
		System.out.println("phaser.getArrivedParties()=" + phaser.getArrivedParties());

		Thread.sleep(1000);
		phaser.arrive();
		System.out.println("phaser.getArrivedParties()=" + phaser.getArrivedParties());

		Thread.sleep(1000);
		phaser.arrive();
		System.out.println("phaser.getArrivedParties()=" + phaser.getArrivedParties());

		Thread.sleep(3000);
		System.out.println("phaser.getPhase()=" + phaser.getPhase());
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
}

再Run


Note:

5秒之后phase值发生改变,继续向下运行 

 

再次修改main方法

public static void main(String[] args) {
	try {
		Phaser phaser = new Phaser(3);

		Thread t1 = new Thread(new ThreadA(phaser));
		t1.setName("ThreadA");
		t1.start();

		Thread.sleep(1000);
		t1.interrupt();
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
}

再Run


Note:

提前将还未到达5秒的线程进行了中断 

 

Reference

Java并发编程核心方法与框架  

  

  • 大小: 8.7 KB
  • 大小: 22.2 KB
  • 大小: 22 KB
  • 大小: 12.4 KB
  • 大小: 10.8 KB
  • 大小: 24.3 KB
  • 大小: 13.6 KB
  • 大小: 14.4 KB
  • 大小: 22.2 KB
  • 大小: 21.4 KB
  • 大小: 22 KB
分享到:
评论

相关推荐

    JavaEE源代码 concurrent-1.3.2

    JavaEE源代码 concurrent-1.3.2JavaEE源代码 concurrent-1.3.2JavaEE源代码 concurrent-1.3.2JavaEE源代码 concurrent-1.3.2JavaEE源代码 concurrent-1.3.2JavaEE源代码 concurrent-1.3.2JavaEE源代码 concurrent-...

    backport-util-concurrent-3.1.jar

    - copy %AXIS2_HOME%\lib\backport-util-concurrent-3.1.jar 到%ECLIPSE_HOME%\plugins\Axis2_Codegen_Wizard_1.3.0\lib - 注册此 jar 包: 編輯 %ECLIPSE_HOME%\plugins\Axis2_Codegen_Wizard_1.3.0\plugin.xml , ...

    concurrent-1.3.4.jar

    concurrent-1.3.4.jar

    backport-util-concurrent-3.1.jar geronimo-stax-api_1.0_spec-1.0.1.jar 下载

    backport-util-concurrent-3.1.jar 和 geronimo-stax-api_1.0_spec-1.0.1.jar 复制到 MyEclipse 6.5\eclipse\plugins\Axis2_Codegen_Wizard_1.3.0\lib 文件夹下。 (3).注册此 jar 包: 修改MyEclipse 6.5\eclipse...

    backport-util-concurrent-java12-3.1-sources.jar

    官方版本,亲测可用

    jodconverter-core-3.0-beta-4jar&sources&javadoc

    3. **并发处理**:`org.jodconverter.concurrent`包提供了并发处理工具,允许在多线程环境中高效地执行大量转换任务。 4. **日志记录**:JodConverter集成了SLF4J(Simple Logging Facade for Java),方便开发者...

    concurrent-1.3.4-sources.jar

    《并发编程库 concurrent-1.3.4-sources.jar 深度解析》 在Java编程领域,"concurrent"一词通常与多线程和并发处理相关,它指的是能够同时执行多个任务的能力。这里提到的`concurrent-1.3.4-sources.jar`是一个特定...

    backport-util-concurrent-3.1.jar和geronimo-stax-api_1.0_spec-1.0.1.jar

    "backport-util-concurrent-3.1.jar" 文件到 Axis2_Codegen_Wizard_1.3.0 的 lib 目录中 , 同时修改 Axis2_Codegen_Wizard_1.3.0 下的 plugin.xml 文件 , 在 <runtime> 中添加 <library name="lib/geronimo-stax-...

    Android代码-Concurrent-Utils

    Concurrent-Utils Utilities for Java concurrent library. This is a library contains some useful and smart utility class for Java concurrent library. Shelly, HermesEventBus and AndroidDataStorage are ...

    concurrent-1.3.2.jar

    concurrent-1.3.2.ja

    atlassian-util-concurrent-0.0.12.jar.zip

    《深入解析Atlassian Util Concurrent库:0.0.12版本》 在IT行业中,高效且可靠的并发处理是系统性能优化的关键因素之一。Atlassian公司,以其强大的协作工具如Jira、Confluence等闻名,也提供了许多开源工具来支持...

    concurrent-all-in-one.pdf

    《并发编程全方位解析》 并发编程是现代计算机系统中不可或缺的一部分,尤其是在多核处理器和分布式系统环境下。本章将深入探讨Java并发编程的核心概念、工具和最佳实践。 1. **Java并发组件概述** ...

    backport-util-concurrent(2.2 /3.1)

    - 在3.1版本中,backport-util-concurrent引入了Phaser,这是一个可重用的同步帮助器类,支持多个参与者之间的有界同步,类似于CyclicBarrier和CountDownLatch,但更灵活。Phaser可以自动调整参与者的数量,并且在...

    backport-util-concurrent-1.0.jar

    backport-util-concurrent-1.0.jar,

    patterns-for-concurrent-and-networked-objects

    patterns-for-concurrent-and-networked-objects

Global site tag (gtag.js) - Google Analytics