`
talentkep
  • 浏览: 100484 次
  • 性别: Icon_minigender_1
  • 来自: 武汉
社区版块
存档分类
最新评论

java concurrent 探秘(2)

    博客分类:
 
阅读更多
java concurrent 探秘(2)

BlockingQueue
支持两个附加操作的 Queue,这两个操作是:检索元素时等待队列变为非空,以及存储元素时等待空间变得可用。

BlockingQueue 不接受 null 元素。试图 add、put 或 offer 一个 null 元素时,某些实现会抛出 NullPointerException。null 被用作指示 poll 操作失败的警戒值。

BlockingQueue 可以是限定容量的。它在任意给定时间都可以有一个 remainingCapacity,超出此容量,便无法无阻塞地 put 额外的元素。
没有任何内部容量约束的 BlockingQueue 总是报告 Integer.MAX_VALUE 的剩余容量。

BlockingQueue 实现主要用于生产者-使用者队列,但它另外还支持 Collection 接口。因此,举例来说,使用 remove(x) 从队列中移除任意一个元素是有可能的。
然而,这种操作通常不 会有效执行,只能有计划地偶尔使用,比如在取消排队信息时。

BlockingQueue 实现是线程安全的。所有排队方法都可以使用内部锁定或其他形式的并发控制来自动达到它们的目的。
然而,大量的 Collection 操作(addAll、containsAll、retainAll 和 removeAll)没有 必要自动执行,除非在实现中特别说明。
因此,举例来说,在只添加了 c 中的一些元素后,addAll(c) 有可能失败(抛出一个异常)。

BlockingQueue 实质上不 支持使用任何一种“close”或“shutdown”操作来指示不再添加任何项。
这种功能的需求和使用有依赖于实现的倾向。例如,一种常用的策略是:对于生产者,插入特殊的 end-of-stream 或 poison 对象,并根据使用者获取这些对象的时间来对它们进行解释。

下面的例子演示了这个阻塞队列的基本功能。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class MyBlockingQueue extends Thread {
public static BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);

private int index;

public MyBlockingQueue(int i) {
   this.index = i;
}

public void run() {
   try {
    queue.put(String.valueOf(this.index));
    System.out.println("{" + this.index + "} in queue!");
   } catch (Exception e) {
    e.printStackTrace();
   }
}

public static void main(String args[]) {
   ExecutorService service = Executors.newCachedThreadPool();
   for (int i = 0; i < 10; i++) {
    service.submit(new MyBlockingQueue(i));
   }
   Thread thread = new Thread() {
    public void run() {
     try {
      while (true) {
       Thread.sleep((int) (Math.random() * 1000));
       if(MyBlockingQueue.queue.isEmpty())
        break;
       String str = MyBlockingQueue.queue.take();
       System.out.println(str + " has take!");
      }
     } catch (Exception e) {
      e.printStackTrace();
     }
    }
   };
   service.submit(thread);
   service.shutdown();
}
}
---------------------执行结果-----------------
{0} in queue!
{1} in queue!
{2} in queue!
{3} in queue!
0 has take!
{4} in queue!
1 has take!
{6} in queue!
2 has take!
{7} in queue!
3 has take!
{8} in queue!
4 has take!
{5} in queue!
6 has take!
{9} in queue!
7 has take!
8 has take!
5 has take!
9 has take!

-----------------------------------------


CompletionService

将生产新的异步任务与使用已完成任务的结果分离开来的服务。生产者 submit 执行的任务。使用者 take 已完成的任务,
并按照完成这些任务的顺序处理它们的结果。例如,CompletionService 可以用来管理异步 IO ,执行读操作的任务作为程序或系统的一部分提交,
然后,当完成读操作时,会在程序的不同部分执行其他操作,执行操作的顺序可能与所请求的顺序不同。

通常,CompletionService 依赖于一个单独的 Executor 来实际执行任务,在这种情况下,
CompletionService 只管理一个内部完成队列。ExecutorCompletionService 类提供了此方法的一个实现。


import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MyCompletionService implements Callable<String> {
private int id;

public MyCompletionService(int i){
   this.id=i;
}
public static void main(String[] args) throws Exception{
   ExecutorService service=Executors.newCachedThreadPool();
   CompletionService<String> completion=new ExecutorCompletionService<String>(service);
   for(int i=0;i<10;i++){
    completion.submit(new MyCompletionService(i));
   }
   for(int i=0;i<10;i++){
    System.out.println(completion.take().get());
   }
   service.shutdown();
}
public String call() throws Exception {
   Integer time=(int)(Math.random()*1000);
   try{
    System.out.println(this.id+" start");
    Thread.sleep(time);
    System.out.println(this.id+" end");
   }
   catch(Exception e){
    e.printStackTrace();
   }
   return this.id+":"+time;
}
}


CountDownLatch


一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。

用给定的计数 初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。
之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier。

CountDownLatch 是一个通用同步工具,它有很多用途。将计数 1 初始化的 CountDownLatch 用作一个简单的开/关锁存器,
或入口:在通过调用 countDown() 的线程打开入口前,所有调用 await 的线程都一直在入口处等待。
用 N 初始化的 CountDownLatch 可以使一个线程在 N 个线程完成某项操作之前一直等待,或者使其在某项操作完成 N 次之前一直等待。

CountDownLatch 的一个有用特性是,它不要求调用 countDown 方法的线程等到计数到达零时才继续,
而在所有线程都能通过之前,它只是阻止任何线程继续通过一个 await。
一下的例子是别人写的,非常形象。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestCountDownLatch {
public static void main(String[] args) throws InterruptedException {
   // 开始的倒数锁
   final CountDownLatch begin = new CountDownLatch(1);
   // 结束的倒数锁
   final CountDownLatch end = new CountDownLatch(10);
   // 十名选手
   final ExecutorService exec = Executors.newFixedThreadPool(10);
  
   for (int index = 0; index < 10; index++) {
    final int NO = index + 1;
    Runnable run = new Runnable() {
     public void run() {
      try {
       begin.await();//一直阻塞
       Thread.sleep((long) (Math.random() * 10000));
       System.out.println("No." + NO + " arrived");
      } catch (InterruptedException e) {
      } finally {
       end.countDown();
      }
     }
    };
    exec.submit(run);
   }
   System.out.println("Game Start");
   begin.countDown();
   end.await();
   System.out.println("Game Over");
   exec.shutdown();
}
}
CountDownLatch最重要的方法是countDown()和await(),前者主要是倒数一次,后者是等待倒数到0,如果没有到达0,就只有阻塞等待了。


CyclicBarrier

一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。
在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。

CyclicBarrier 支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),
该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作 很有用。

示例用法:下面是一个在并行分解设计中使用 barrier 的例子,很经典的旅行团例子:
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestCyclicBarrier {
// 徒步需要的时间: Shenzhen, Guangzhou, Shaoguan, Changsha, Wuhan
private static int[] timeWalk = { 5, 8, 15, 15, 10 };
// 自驾游
private static int[] timeSelf = { 1, 3, 4, 4, 5 };
// 旅游大巴
private static int[] timeBus = { 2, 4, 6, 6, 7 };

static String now() {
    SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
    return sdf.format(new Date()) + ": ";
}

static class Tour implements Runnable {
    private int[] times;
    private CyclicBarrier barrier;
    private String tourName;
    public Tour(CyclicBarrier barrier, String tourName, int[] times) {
      this.times = times;
      this.tourName = tourName;
      this.barrier = barrier;
    }
    public void run() {
      try {
        Thread.sleep(times[0] * 1000);
        System.out.println(now() + tourName + " Reached Shenzhen");
        barrier.await();
        Thread.sleep(times[1] * 1000);
        System.out.println(now() + tourName + " Reached Guangzhou");
        barrier.await();
        Thread.sleep(times[2] * 1000);
        System.out.println(now() + tourName + " Reached Shaoguan");
        barrier.await();
        Thread.sleep(times[3] * 1000);
        System.out.println(now() + tourName + " Reached Changsha");
        barrier.await();
        Thread.sleep(times[4] * 1000);
        System.out.println(now() + tourName + " Reached Wuhan");
        barrier.await();
      } catch (InterruptedException e) {
      } catch (BrokenBarrierException e) {
      }
    }
}

public static void main(String[] args) {
    // 三个旅行团
    CyclicBarrier barrier = new CyclicBarrier(3);
    ExecutorService exec = Executors.newFixedThreadPool(3);
    exec.submit(new Tour(barrier, "WalkTour", timeWalk));
    exec.submit(new Tour(barrier, "SelfTour", timeSelf));
//当我们把下面的这段代码注释后,会发现,程序阻塞了,无法继续运行下去。
    exec.submit(new Tour(barrier, "BusTour", timeBus));
    exec.shutdown();
}
}

CyclicBarrier最重要的属性就是参与者个数,另外最要方法是await()。当所有线程都调用了await()后,就表示这些线程都可以继续执行,否则就会等待。

Future

Future 表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并检索计算的结果。
计算完成后只能使用 get 方法来检索结果,如有必要,计算完成前可以阻塞此方法。取消则由 cancel 方法来执行。
还提供了其他方法,以确定任务是正常完成还是被取消了。一旦计算完成,就不能再取消计算。
如果为了可取消性而使用 Future但又不提供可用的结果,则可以声明 Future<?> 形式类型、并返回 null 作为基础任务的结果。

这个我们在前面CompletionService已经看到了,这个Future的功能,而且这个可以在提交线程的时候被指定为一个返回对象的。


ScheduledExecutorService

一个 ExecutorService,可安排在给定的延迟后运行或定期执行的命令。

schedule 方法使用各种延迟创建任务,并返回一个可用于取消或检查执行的任务对象。scheduleAtFixedRate 和 scheduleWithFixedDelay 方法创建并执行某些在取消前一直定期运行的任务。

用 Executor.execute(java.lang.Runnable) 和 ExecutorService 的 submit 方法所提交的命令,通过所请求的 0 延迟进行安排。
schedule 方法中允许出现 0 和负数延迟(但不是周期),并将这些视为一种立即执行的请求。

所有的 schedule 方法都接受相对 延迟和周期作为参数,而不是绝对的时间或日期。将以 Date 所表示的绝对时间转换成要求的形式很容易。
例如,要安排在某个以后的日期运行,可以使用:schedule(task, date.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS)。
但是要注意,由于网络时间同步协议、时钟漂移或其他因素的存在,因此相对延迟的期满日期不必与启用任务的当前 Date 相符。
Executors 类为此包中所提供的 ScheduledExecutorService 实现提供了便捷的工厂方法。

一下的例子也是网上比较流行的。

import static java.util.concurrent.TimeUnit.SECONDS;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;

public class TestScheduledThread {
public static void main(String[] args) {
   final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
   final Runnable beeper = new Runnable() {
    int count = 0;

    public void run() {
     System.out.println(new Date() + " beep " + (++count));
    }
   };
   // 1秒钟后运行,并每隔2秒运行一次
   final ScheduledFuture beeperHandle = scheduler.scheduleAtFixedRate(beeper, 1, 2, SECONDS);
   // 2秒钟后运行,并每次在上次任务运行完后等待5秒后重新运行
   final ScheduledFuture beeperHandle2 = scheduler.scheduleWithFixedDelay(beeper, 2, 5, SECONDS);
   // 30秒后结束关闭任务,并且关闭Scheduler
   scheduler.schedule(new Runnable() {
    public void run() {
     beeperHandle.cancel(true);
     beeperHandle2.cancel(true);
     scheduler.shutdown();
    }
   }, 30, SECONDS);
}
}

这样我们就把concurrent包下比较重要的功能都已经总结完了,希望对我们理解能有帮助。

分享到:
评论

相关推荐

    java concurrent 精简源码

    本资源“java concurrent 精简源码”着重关注Java并发库(java.util.concurrent)的核心概念,包括阻塞队列和线程管理。下面将详细阐述这些知识点。 1. **Java并发库(java.util.concurrent)** Java并发库是Java ...

    Java Concurrent in practice (animated)

    Java Concurrent in practice (animated)

    Java Concurrent Programming

    为了简化多线程编程,Java提供了一系列工具和API,如`java.util.Timer`和`java.util.concurrent`包,这些工具可以帮助开发者更高效地管理线程间的同步问题。 ##### 1.2 synchronized关键字 `synchronized`关键字是...

    java concurrent 包 详细解析

    2. **Executor框架**:`java.util.concurrent.Executor`是执行任务的核心接口,它定义了运行任务的方法。`ExecutorService`是Executor的一个子接口,提供了管理和控制执行器的额外功能,如`shutdown()`用于关闭执行...

    JAVA的CONCURRENT用法详解.pdf

    JAVA的CONCURRENT用法详解.pdf

    使用java concurrent调用xmlp api生成pdf

    这里我们关注的是如何使用`java.concurrent`包中的工具和XML Processing API(通常指的是JAXB或DOM4J等处理XML的库)来高效地生成PDF。下面将详细解释这个过程以及涉及的相关知识点。 首先,`java.concurrent`包是...

    java concurrent source code

    资深Java专家10年经验总结,全程案例式讲解,首本全面介绍Java多线程编程技术的专著 结合大量实例,全面讲解Java多线程编程中的并发访问、线程间通信、锁等最难突破的核心技术与应用实践 封底 Java多线程无处不在,...

    java concurrent包分类结构图

    java concurrent包分类结构图

    java并发工具包 java.util.concurrent中文版pdf

    ### Java并发工具包 `java.util.concurrent` 知识点详解 #### 一、引言 随着多核处理器的普及和应用程序复杂度的增加,多线程编程成为了现代软件开发不可或缺的一部分。为了简化并发编程的复杂性,Java 5 引入了 `...

    java并发工具包 java.util.concurrent中文版用户指南pdf

    1. java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...

    JAVA Concurrent Programming

    2. Java 1.5+并发工具 Java 1.5引入了`java.util.concurrent`包,包含了一系列的并发工具类,如线程池、阻塞队列、并发集合等。这些工具旨在提高并发性能并简化编程模型。例如,`ExecutorService`和`...

    java concurrent program的实现

    EBS java concurrent program的实现

    java同步大杀器concurrent 包

    java同步大杀器concurrent 包java同步大杀器concurrent 包java同步大杀器concurrent 包java同步大杀器concurrent 包java同步大杀器concurrent 包java同步大杀器concurrent 包java同步大杀器concurrent 包java同步大...

    Java Concurrent处理并发需求

    ### Java Concurrent处理并发需求 #### 一、Java并发基础与Concurrent API介绍 在现代软件开发中,尤其是在服务器端应用中,对并发处理的需求日益增长。为了满足这种需求,Java平台提供了一系列强大的工具和API来...

    java concurrent in practive

    《Java并发编程实战》还会讨论`java.util.concurrent`包中的高级并发工具,如`ExecutorService`和`Future`,它们可以方便地管理和控制线程池,提高系统的并行处理能力。`CountDownLatch`、`CyclicBarrier`和`...

    java中的线程并发库-----javaconcurrent探秘

    我们都知道,在JDK1.5之前,Java中要进行业务并发时,通常需要有程序员独立完成代码实现,当然也有一些开源的框架提供了这些功能,但是这些依然没有JDK自带的功能使用起来方便。而当针对高质量Java多线程并发程序...

    java_util_concurrent_中文用户使用手册@微信公众号-架构探险之道.zip

    JUC使用指导手册 http://tutorials.jenkov.com/java-util-concurrent/blockingqueue.html 中文译文

    动画学习 java.util.concurrent并发工具包

    如何启动:以win7系统为例,最好jdk8 1.打开cmd,cd到jdk的path,本机是:cd C:\Java\jdk6\bin ...java -cp D:\javaConcurrentAnimated.jar vgrazi.concurrent.samples.launcher.ConcurrentExampleLauncher

    Concurrent Programming in Java

    2. **同步机制**:Java提供了多种同步工具,如`synchronized`关键字、`wait()`、`notify()`和`notifyAll()`方法,用于控制对共享资源的访问。书中详细解释了这些机制的工作原理和使用场景,以及死锁、活锁和饥饿等...

    使用Java并发编程Concurrent Programming Using Java

    Java平台提供了丰富的API支持并发编程,如`java.util.concurrent`包下的各种类和接口,这些工具可以帮助开发者更高效地管理多线程环境下的任务调度和数据共享问题。 ### Java并发编程基础 #### 1. 多线程基础 - **...

Global site tag (gtag.js) - Google Analytics