`
vitoer
  • 浏览: 6505 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Java进阶(四)线程间通信剖析

 
阅读更多
本文转发自Jason’s Blog,原文链接 http://www.jasongj.com/java/thread_communication/

CountDownLatch
CountDownLatch适用场景
Java多线程编程中经常会碰到这样一种场景——某个线程需要等待一个或多个线程操作结束(或达到某种状态)才开始执行。比如开发一个并发测试工具时,主线程需要等到所有测试线程均执行完成再开始统计总共耗费的时间,此时可以通过CountDownLatch轻松实现。

CountDownLatch实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.test.thread;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {
  public static void main(String[] args) throws InterruptedException {
    int totalThread = 3;
    long start = System.currentTimeMillis();
    CountDownLatch countDown = new CountDownLatch(totalThread);
    for(int i = 0; i < totalThread; i++) {
      final String threadName = "Thread " + i;
      new Thread(() -> {
        System.out.println(String.format("%s\t%s %s", new Date(), threadName, "started"));
        try {
          Thread.sleep(1000);
        } catch (Exception ex) {
          ex.printStackTrace();
        }
        countDown.countDown();
        System.out.println(String.format("%s\t%s %s", new Date(), threadName, "ended"));
      }).start();;
    }
    countDown.await();
    long stop = System.currentTimeMillis();
    System.out.println(String.format("Total time : %sms", (stop - start)));
  }
}
执行结果

1
2
3
4
5
6
7
Sun Jun 19 20:34:31 CST 2016  Thread 1 started
Sun Jun 19 20:34:31 CST 2016  Thread 0 started
Sun Jun 19 20:34:31 CST 2016  Thread 2 started
Sun Jun 19 20:34:32 CST 2016  Thread 2 ended
Sun Jun 19 20:34:32 CST 2016  Thread 1 ended
Sun Jun 19 20:34:32 CST 2016  Thread 0 ended
Total time : 1072ms
可以看到,主线程等待所有3个线程都执行结束后才开始执行。

CountDownLatch主要接口分析
CountDownLatch工作原理相对简单,可以简单看成一个倒计时器,在构造方法中指定初始值,每次调用countDown()方法时讲计数器减1,而await()会等待计数器变为0。CountDownLatch关键接口如下

countDown() 如果当前计数器的值大于1,则将其减1;若当前值为1,则将其置为0并唤醒所有通过await等待的线程;若当前值为0,则什么也不做直接返回。
await() 等待计数器的值为0,若计数器的值为0则该方法返回;若等待期间该线程被中断,则抛出InterruptedException并清除该线程的中断状态。
await(long timeout, TimeUnit unit) 在指定的时间内等待计数器的值为0,若在指定时间内计数器的值变为0,则该方法返回true;若指定时间内计数器的值仍未变为0,则返回false;若指定时间内计数器的值变为0之前当前线程被中断,则抛出InterruptedException并清除该线程的中断状态。
getCount() 读取当前计数器的值,一般用于调试或者测试。
CyclicBarrier
CyclicBarrier适用场景
在《当我们说线程安全时,到底在说什么》一文中讲过内存屏障,它能保证屏障之前的代码一定在屏障之后的代码之前被执行。CyclicBarrier可以译为循环屏障,也有类似的功能。CyclicBarrier可以在构造时指定需要在屏障前执行await的个数,所有对await的调用都会等待,只到调用await的次数达到预定指,所有等待都会立即被唤醒。

从使用场景上来说,CyclicBarrier是让多个线程互相等待某一事件的发生,然后同时被唤醒。而上文讲的CountDownLatch是让某一线程等待多个线程的状态,然后该线程被唤醒。

CyclicBarrier实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.test.thread;
import java.util.Date;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
  public static void main(String[] args) {
    int totalThread = 5;
    CyclicBarrier barrier = new CyclicBarrier(totalThread);
   
    for(int i = 0; i < totalThread; i++) {
      String threadName = "Thread " + i;
      new Thread(() -> {
        System.out.println(String.format("%s\t%s %s", new Date(), threadName, " is waiting"));
        try {
          barrier.await();
        } catch (Exception ex) {
          ex.printStackTrace();
        }
        System.out.println(String.format("%s\t%s %s", new Date(), threadName, "ended"));
      }).start();
    }
  }
}
执行结果如下

1
2
3
4
5
6
7
8
9
10
Sun Jun 19 21:04:49 CST 2016  Thread 1  is waiting
Sun Jun 19 21:04:49 CST 2016  Thread 0  is waiting
Sun Jun 19 21:04:49 CST 2016  Thread 3  is waiting
Sun Jun 19 21:04:49 CST 2016  Thread 2  is waiting
Sun Jun 19 21:04:49 CST 2016  Thread 4  is waiting
Sun Jun 19 21:04:49 CST 2016  Thread 4 ended
Sun Jun 19 21:04:49 CST 2016  Thread 0 ended
Sun Jun 19 21:04:49 CST 2016  Thread 2 ended
Sun Jun 19 21:04:49 CST 2016  Thread 1 ended
Sun Jun 19 21:04:49 CST 2016  Thread 3 ended
从执行结果可以看到,每个线程都不会在其它所有线程执行await()方法前继续执行,而等所有线程都执行await()方法后所有线程的等待都被唤醒从而继续执行。

CyclicBarrier主要接口分析
CyclicBarrier提供的关键方法如下

await() 等待其它参与方的到来(调用await())。如果当前调用是最后一个调用,则唤醒所有其它的线程的等待并且如果在构造CyclicBarrier时指定了action,当前线程会去执行该action,然后该方法返回该线程调用await的次序(getParties()-1说明该线程是第一个调用await的,0说明该线程是最后一个执行await的),接着该线程继续执行await后的代码;如果该调用不是最后一个调用,则阻塞等待;如果等待过程中,当前线程被中断,则抛出InterruptedException;如果等待过程中,其它等待的线程被中断,或者其它线程等待超时,或者该barrier被reset,或者当前线程在执行barrier构造时注册的action时因为抛出异常而失败,则抛出BrokenBarrierException。
await(long timeout, TimeUnit unit) 与await()唯一的不同点在于设置了等待超时时间,等待超时时会抛出TimeoutException。
reset() 该方法会将该barrier重置为它的初始状态,并使得所有对该barrier的await调用抛出BrokenBarrierException。
Phaser
Phaser适用场景
CountDownLatch和CyclicBarrier都是JDK 1.5引入的,而Phaser是JDK 1.7引入的。Phaser的功能与CountDownLatch和CyclicBarrier有部分重叠,同时也提供了更丰富的语义和更灵活的用法。

Phaser顾名思义,与阶段相关。Phaser比较适合这样一种场景,一种任务可以分为多个阶段,现希望多个线程去处理该批任务,对于每个阶段,多个线程可以并发进行,但是希望保证只有前面一个阶段的任务完成之后才能开始后面的任务。这种场景可以使用多个CyclicBarrier来实现,每个CyclicBarrier负责等待一个阶段的任务全部完成。但是使用CyclicBarrier的缺点在于,需要明确知道总共有多少个阶段,同时并行的任务数需要提前预定义好,且无法动态修改。而Phaser可同时解决这两个问题。

Phaser实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class PhaserDemo {
  public static void main(String[] args) throws IOException {
    int parties = 3;
    int phases = 4;
    final Phaser phaser = new Phaser(parties) {
      @Override 
      protected boolean onAdvance(int phase, int registeredParties) { 
          System.out.println("====== Phase : " + phase + " ======"); 
          return registeredParties == 0; 
      } 
    };
   
    for(int i = 0; i < parties; i++) {
      int threadId = i;
      Thread thread = new Thread(() -> {
        for(int phase = 0; phase < phases; phase++) {
          System.out.println(String.format("Thread %s, phase %s", threadId, phase));
          phaser.arriveAndAwaitAdvance();
        }
      });
      thread.start();
    }
  }
}
执行结果如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Thread 0, phase 0
Thread 1, phase 0
Thread 2, phase 0
====== Phase : 0 ======
Thread 2, phase 1
Thread 0, phase 1
Thread 1, phase 1
====== Phase : 1 ======
Thread 1, phase 2
Thread 2, phase 2
Thread 0, phase 2
====== Phase : 2 ======
Thread 0, phase 3
Thread 1, phase 3
Thread 2, phase 3
====== Phase : 3 ======
从上面的结果可以看到,多个线程必须等到其它线程的同一阶段的任务全部完成才能进行到下一个阶段,并且每当完成某一阶段任务时,Phaser都会执行其onAdvance方法。

Phaser主要接口分析
Phaser主要接口如下

arriveAndAwaitAdvance() 当前线程当前阶段执行完毕,等待其它线程完成当前阶段。如果当前线程是该阶段最后一个未到达的,则该方法直接返回下一个阶段的序号(阶段序号从0开始),同时其它线程的该方法也返回下一个阶段的序号。
arriveAndDeregister() 该方法立即返回下一阶段的序号,并且其它线程需要等待的个数减一,并且把当前线程从之后需要等待的成员中移除。如果该Phaser是另外一个Phaser的子Phaser(层次化Phaser会在后文中讲到),并且该操作导致当前Phaser的成员数为0,则该操作也会将当前Phaser从其父Phaser中移除。
arrive() 该方法不作任何等待,直接返回下一阶段的序号。
awaitAdvance(int phase) 该方法等待某一阶段执行完毕。如果当前阶段不等于指定的阶段或者该Phaser已经被终止,则立即返回。该阶段数一般由arrive()方法或者arriveAndDeregister()方法返回。返回下一阶段的序号,或者返回参数指定的值(如果该参数为负数),或者直接返回当前阶段序号(如果当前Phaser已经被终止)。
awaitAdvanceInterruptibly(int phase) 效果与awaitAdvance(int phase)相当,唯一的不同在于若该线程在该方法等待时被中断,则该方法抛出InterruptedException。
awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) 效果与awaitAdvanceInterruptibly(int phase)相当,区别在于如果超时则抛出TimeoutException。
bulkRegister(int parties) 注册多个party。如果当前phaser已经被终止,则该方法无效,并返回负数。如果调用该方法时,onAdvance方法正在执行,则该方法等待其执行完毕。如果该Phaser有父Phaser则指定的party数大于0,且之前该Phaser的party数为0,那么该Phaser会被注册到其父Phaser中。
forceTermination() 强制让该Phaser进入终止状态。已经注册的party数不受影响。如果该Phaser有子Phaser,则其所有的子Phaser均进入终止状态。如果该Phaser已经处于终止状态,该方法调用不造成任何影响。
分享到:
评论

相关推荐

    java多线程进阶

    3. **线程通信**:Java的`wait()`, `notify()`, `notifyAll()`方法用于线程间的通信,以及`BlockingQueue`等并发容器在通信中的应用,也是书中重点内容。此外,可能会探讨`join()`, `Thread.yield()`等控制线程行为...

    Java进阶路线

    - **wait, notify, sleep**:这些方法用于线程间的通信。 - **Lock**:更灵活的锁机制。 - **ThreadLocal**:提供线程本地变量。 - **CopyOnWriteArrayList**:线程安全的列表实现。 - **BlockingQueue**:线程安全...

    Java进阶学习网络服务器编程.rar

    总之,Java进阶学习网络服务器编程涵盖了广泛的知识点,包括基础的网络原理、Java的网络API、多线程、NIO、Web服务、RESTful设计、安全性和测试方法。通过深入学习和实践,你可以掌握构建高性能、高可用性的网络...

    java进阶开发,高级版web项目 基于dubbo实现分布式微服务架构

    Java进阶开发与高级Web项目的实施常常涉及到对技术栈的深度理解和应用,特别是当我们要构建一个基于Dubbo的分布式微服务架构时。Dubbo是阿里巴巴开源的一款高性能、轻量级的服务治理框架,它主要应用于服务化时代的...

    圣思园张龙java开发进阶视频网盘.txt

    - **线程间通信**:`wait()`、`notify()`、`join()`方法以及`CountDownLatch`、`CyclicBarrier`、`Semaphore`等工具类的使用。 - **死锁问题**:原因分析及解决方案。 - **线程池**:`ThreadPoolExecutor`的配置参数...

    Java线程入门

    #### 五、线程间通信与同步 - **同步机制**:为了保证多线程程序的正确性和稳定性,需要采取适当的同步措施,如使用`synchronized`关键字、`ReentrantLock`等工具类来防止数据竞争条件。 - **线程间的通信**:线程...

    Java语言程序设计-进阶篇(原书第8版)

    《Java语言程序设计-进阶篇》是Java编程学习的重要参考资料,尤其对于已经掌握Java基础知识的开发者来说,这本书是进一步提升技能的关键。该书的第八版涵盖了Java编程的诸多高级主题,旨在帮助读者深入理解Java的...

    多线程终极案例程序(多线程进阶)

    在多线程环境中,线程间通信和同步是非常重要的,Java提供了多种机制,如`synchronized`关键字、`wait()`、`notify()`和`notifyAll()`方法,以及`Lock`和`Condition`接口,用于控制线程的执行顺序和资源访问。...

    java线程.pdf

    6. **线程通信**:线程间通信主要包括等待/通知机制(`wait()`、`notify()`、`notifyAll()`),这些方法通常用于解决生产者-消费者问题等经典并发问题。 7. **线程池**:通过使用线程池(`ExecutorService`接口)可以...

    java语言程序设计提高篇+进阶篇第十版

    《Java语言程序设计提高篇+进阶篇第十版》是一本深入探讨Java编程技术的权威著作,适合已经掌握Java基础的开发者进一步提升自己的技能。这本书的第十版充分反映了Java语言的最新发展,包括Java 8及更高版本的重要...

    Java常见笔试、面试题目深度剖析第二、三讲下载地址

    - **线程间通信**:`wait()`、`notify()`方法等。 #### 4. 泛型 - **泛型类**:可以在定义类时指定一个类型参数,提高代码的复用性和安全性。 - **泛型方法**:在方法级别上使用泛型,增强方法的功能性和通用性。 -...

    由浅入深学Java—基础、进阶与必做260题

    9. 多线程和并发:涉及Thread类和Runnable接口的使用,同步机制和线程间的通信。 10. Java网络编程基础:包括Socket通信、URL访问等基础知识。 进阶部分则可能涵盖: 1. 设计模式:介绍常用的设计模式,如单例...

    thinking in java 第四版 源码

    《Thinking in Java》是Bruce Eckel的经典之作,第四版更是被广大Java开发者视为学习和进阶的重要参考书籍。这本书深入浅出地介绍了Java语言的核心概念和技术,包括面向对象编程、集合框架、多线程、网络编程、GUI...

    《Java语言程序设计(进阶篇)》课后习题第19章代码chapter19.rar

    此外,还会介绍线程同步机制,如`synchronized`关键字,`wait()`, `notify()`, `notifyAll()`方法,以及`Lock`接口和`ReentrantLock`类的应用,用于防止线程间的竞态条件和死锁。 2. **网络编程**:Java提供了丰富...

    Android高手进阶指南

    1. **Activity与Intent**:深入解析Activity生命周期和Intent机制,如何合理管理Activity栈,以及如何通过Intent进行组件间通信,包括隐式Intent和显式Intent的使用。 2. **Service**:详细讲解Service的启动、绑定...

    Java面试常见问题从基础到进阶

    其他热门话题如Docker和Kubernetes用于容器化和集群管理,Redis作为缓存系统,消息队列如RabbitMQ、Kafka用于异步通信,分布式追踪系统Zipkin、Jaeger用于监控服务间的调用,Java在大数据处理中的应用,以及Lambda...

    JAVA并发编程实战.pdf

    Java并发编程是构建高性能应用的基础,涉及到的知识点非常广泛,包括但不限于线程模型、线程同步、线程间通信以及Java内存模型等方面。掌握这些基础知识,能够帮助开发者更好地理解和解决实际开发中遇到的并发问题,...

    JavaDemo_java_云课堂_

    6. **多线程**:理解线程的概念,创建和管理线程,同步和互斥的概念,以及线程间的通信。 7. **反射机制**:通过反射动态获取类的信息,实例化对象,调用方法,修改字段值等。 8. **网络编程**:利用Socket进行...

    java语言十大经典案例

    `wait()`, `notify()` 和 `notifyAll()` 方法用于线程间的通信。另外,Java 5引入的`ExecutorService`和`Future`接口提供了更高级的线程池管理机制。 3. **网络编程**: Java的`java.net`包提供了网络编程的支持。...

Global site tag (gtag.js) - Google Analytics