`
DavidIsOK
  • 浏览: 75585 次
社区版块
存档分类
最新评论

java线程(四):阻塞队列(BlockingQueue)

    博客分类:
  • java
阅读更多

 

1. 阻塞队列定义

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

阻塞队列提供了四种处理方法:

当没有空间可以用或者空间已满时下面各个方法的处理方式

方法\处理方式

抛出异常

返回布尔值

一直阻塞

超时退出

插入方法

add(e)

offer(e)

put(e)

offer(e,time,unit)

移除方法

remove()

poll()

take()

poll(time,unit)

获取头部方法

element()

peek()

   

 

2. Java里的阻塞队列

JDK7提供了6个阻塞队列。分别是ArrayBlockingQueueLinkedBlockingQueuePriorityBlockingQueueDelayQueueSynchronousQueueLinkedTransferQueue,下面我们一一介绍他们的特点。

 

2.1 ArrayBlockingQueue

ArrayBlockingQueue实现了数组支持的有界阻塞队列,实现了FIFO(先进先出)排序机制。队列大小固定,之后不允许在增加数组容量,重载的构造函数允许指定公平策略来排序等待的线程。如果为真,所有阻塞的插入和删除操作都按照FIFO顺序处理,默认为假,这样可能导致等待线程顺序不公平,而且可能导致部分饿死(就是一直获取不到资源)的问题,但同时也带来了提高吞吐量的好处。

 

2.2 LinkedBlockingQueue

LinkedBlockingQueue 此队列按先进先出原则对元素排序,而且默认最大容量是最大整数值Integer.MAX_VALUE

 

2.3 PriorityBlockingQueue

    PriorityBlockingQueue一个无界阻塞队列,根据自然顺序决定队列中的元素优先级。它使用与类 PriorityQueue 相同的顺序规则,并且提供了阻塞获取操作。虽然此队列逻辑上是无界的,但是资源被耗尽时试图执行 add 操作也将失败(导致 OutOfMemoryError)。此类不允许使用 null 元素。依赖自然顺序的优先级队列也不允许插入不可比较的对象(这样做会导致抛出 ClassCastException)。

 

2.4 DelayQueue

DelayQueue是一种特殊的优先级队列,按照每个元素的延迟时间(也就是在元素可以从队列中移除的剩余时间)进行排序。可想而知,队头肯定是离到期时间最短的元素了。如果都没到期,那就没有队头,poll是会返回null的。当然peek()查询队头元素和size()查询元素总数还是可以的。

 

2.5 SynchronousQueue

   SynchronousQueue就跟我们之前的消费者和生产者类似了。此队列没有任何容量,它定义了每一个插入操作都必须等待对应的移除操作,反之亦然。

 

2.6 LinkedTransferQueuejava7):

   LinkedTransferQueue是个非常有用的类,java7中新加入的阻塞队列,我们会有一个专门的例子来展示它。也是是TransferQueue接口的实现类,而TransferQueueBlockingQueue的扩展。很多开源项目里面都用到了这类。这个队列的容量是有限的,有以下方法


A. transfer(E e)
若当前存在一个正在等待获取的消费者线程,即立刻移交之;否则,会插入当前元素e到队列尾部,并且等待进入阻塞状态,直到到有消费者线程取走该元素。(这里有点类似SynchronousQueue队列,下面会说为什么),此时要注意,在队列为空时,只要有>=1个消费者的话,就立即转交,否则如果队列不为空它会等消费者消费完了队列的元素后再移交这个元素。(其实我有点不清楚这跟进去队列有什么区别吗?我后期会去研究研究,到时贴在这里。也希望有牛人能在下面解答一下,我将在这里注明作者)

 

B. tryTransfer(E e)若当前存在一个正在等待获取的消费者线程(使用take()或者poll()函数),使用该方法会即刻移交对象元素e;若不存在,则返回false,并且不进入队列。这是一个不阻塞的操作,与上面的方法就区别开了

 

C. tryTransfer(E e, long timeout, TimeUnit unit)若当前存在一个正在等待获取的消费者线程,会立即移交给它;否则将插入元素e到队列尾部,并且等待被消费者线程获取消费掉;若在指定的时间内元素e无法被消费者线程获取,则返回false同时该元素被移除。

 

D. hasWaitingConsumer()判断是否存在消费者线程。

 

E. getWaitingConsumerCount()获取所有等待获取元素的消费线程数量。

 

F. size()返回队列的元素个数,因为队列的异步特性,检测当前队列的元素个数需要逐一迭代,可能会得到一个不太准确的结果,尤其是在遍历时有可能队列发生更改,一般不用。

PS:其实transfer方法在SynchronousQueue的实现中就已存在了,只是没有做为API暴露出来。SynchronousQueue有一个特性:它本身不存在容量,只能进行线程之间的

元素传送。SynchronousQueue在执行offer操作时,如果没有其他线程执行poll,则直接返回false.线程之间元素传送正是通过transfer方法完成的。

 

 

3.关于阻塞队列的两个例子

前一个是普通阻塞队列,后一个是LinkedTransferQueue

3.1 :模拟一个股票交易系统

package com.hxw.Threads;
 
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
 
public class StockExchange {
   /**
    * 模拟一个股票交易系统,100卖方,100个买方不停执行买卖操作
    */
   public static void main(String[] args) {
      System.out.println("按下回车键停止买卖!!!!!!!!!!");
      BlockingDeque<Integer> orderQueue=new LinkedBlockingDeque<Integer>();   //构建阻塞订单队列
      Seller seller=new Seller(orderQueue);
      Thread[] sellerThread=new Thread[10];
      for (int i = 0; i < sellerThread.length; i++) {   //加入100个卖方线程
          sellerThread[i]=new Thread(seller);
          sellerThread[i].start();
      }
     
      Buyer buyer=new Buyer(orderQueue);
      Thread[] buyerThread=new Thread[10];
      for (int i = 0; i < buyerThread.length; i++) { //加入100个买方线程
          sellerThread[i]=new Thread(buyer);
          sellerThread[i].start();
      }
     
      try {
         while(System.in.read()!='\n')  //当按下回车键时
         System.out.println("所有买卖方全部禁止交易!");
         for(Thread t: sellerThread){   //让卖方线程停止
            t.interrupt();
         }
         for(Thread t: buyerThread){    //让买方线程停止
            t.interrupt();
         }
      } catch (IOException e) {
         e.printStackTrace();
      }
   }
 
}
 
class Seller implements Runnable{
 
   private BlockingDeque<Integer> orderQueue;
   private boolean shutdownRequest=false;  //设定一个关闭标志
   public Seller(BlockingDeque<Integer> orderQueue) {
      this.orderQueue=orderQueue;
   }
   @Override
   public void run() {
      while(shutdownRequest==false){ //正常情况下无限循环抛售
         Integer quantity=(int)(Math.random()*100);
         try {
            orderQueue.put(quantity);
            System.out.println("卖方 "+Thread.currentThread().getName()+"抛售了股票 "+quantity+"股");
         } catch (InterruptedException e) {
            shutdownRequest=true; //抛出异常后停止
            e.printStackTrace();
         }
      }
   }
  
}
 
class Buyer implements Runnable{
 
   private BlockingDeque<Integer> orderQueue;
   private boolean shutdownRequest=false;  //同上
   public Buyer(BlockingDeque<Integer> orderQueue) {
      this.orderQueue=orderQueue;
   }
   @Override
   public void run() {
      while(shutdownRequest==false){
         try {
            Integer quantity=orderQueue.take();
            System.out.println("买方"+Thread.currentThread().getName()+"购买了股票 "+quantity+"股");
         } catch (InterruptedException e) {
            shutdownRequest=true;
            e.printStackTrace();
         }
      }
   }
  
}

 【运行结果】:

卖方 Thread-3抛售了股票 60股

买方Thread-18购买了股票 91股

卖方 Thread-5抛售了股票 32股

卖方 Thread-4抛售了股票 36股

卖方 Thread-0抛售了股票 70股

买方Thread-13购买了股票 42股

卖方 Thread-7抛售了股票 79股

买方Thread-14购买了股票 40股

买方Thread-11购买了股票 24股

卖方 Thread-2抛售了股票 73股

买方Thread-10购买了股票 32股

.....

 

 

3.2 模拟一个为顾客创建幸运数字的机器,顾客来了机器才创建,没有顾客不会创建幸运数字

package com.hxw.Threads;
 
import java.util.Random;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
 
public class LuckyNumberGenerate {
 
   /**
    * LinkedTransferQueue 示例,模拟一个为顾客创建幸运数字的机器,顾客来了机器才创建,没有顾客不会创建幸运数字
    */ public static void main(String[] args) {
      TransferQueue<Integer> queue = new LinkedTransferQueue<Integer>();
      Thread producer = new Thread(new NumberProducer(queue));
      producer.setDaemon(true);
      producer.start();
      for (int i = 0; i < 10; i++) {
         Thread consumer = new Thread(new LuckyComsumer(queue));
         consumer.setDaemon(true);
         consumer.start();
         try {
            Thread.sleep(1794);   //平均两秒来一个顾客
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }
   }
 
}
 
class NumberProducer implements Runnable {
   private TransferQueue<Integer> queue = null;
 
   NumberProducer(TransferQueue<Integer> queue) {
      this.queue = queue;
   }
 
   @Override
   public void run() {
      try {
         while (true) {
            if (queue.hasWaitingConsumer()) {
                System.out.println("顾客请稍等,正在创建幸运数字....");
                TimeUnit.SECONDS.sleep(1);  //创建一个幸运数字需要一秒
                queue.transfer(produce());
            }
           
         }
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
 
   }
   private Integer produce() {
      return (new Random().nextInt(100));
   }
 
}
 
class LuckyComsumer implements Runnable {
   private TransferQueue<Integer> queue = null;
 
   public LuckyComsumer(TransferQueue<Integer> queue) {
      this.queue = queue;
      System.out.println("有客来了!");
   }
 
   @Override
   public void run() {
      try {
         System.out.println("客户"+ Thread.currentThread().getName() + "获得幸运数字,幸运数字为"+queue.take()+"\n");
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
 
   }
 
}

   【运行结果】:

有客来了!

顾客请稍等,正在创建幸运数字....

客户Thread-1获得幸运数字,幸运数字为31

 

有客来了!

顾客请稍等,正在创建幸运数字....

客户Thread-2获得幸运数字,幸运数字为73

 

有客来了!

顾客请稍等,正在创建幸运数字....

客户Thread-3获得幸运数字,幸运数字为31

 

有客来了!

顾客请稍等,正在创建幸运数字....

客户Thread-4获得幸运数字,幸运数字为82

 

有客来了!

顾客请稍等,正在创建幸运数字....

客户Thread-5获得幸运数字,幸运数字为81

 

有客来了!

顾客请稍等,正在创建幸运数字....

客户Thread-6获得幸运数字,幸运数字为54

 

有客来了!

顾客请稍等,正在创建幸运数字....

客户Thread-7获得幸运数字,幸运数字为87

 

有客来了!

顾客请稍等,正在创建幸运数字....

客户Thread-8获得幸运数字,幸运数字为69

 

有客来了!

顾客请稍等,正在创建幸运数字....

客户Thread-9获得幸运数字,幸运数字为61

 

有客来了!

顾客请稍等,正在创建幸运数字....

客户Thread-10获得幸运数字,幸运数字为25

 

2
1
分享到:
评论

相关推荐

    14-阻塞队列BlockingQueue实战及其原理分析二.pdf

    阻塞队列(BlockingQueue)是一种特殊的队列,它支持两个附加操作:阻塞的插入方法put和阻塞的移除方法take。BlockingQueue继承了Queue接口,是Java 5中加入的。 BlockingQueue常用方法示例: 1. add(E e):添加一...

    java中线程队列BlockingQueue的用法

    在Java编程中,`BlockingQueue`(阻塞队列)是一种重要的并发工具,它结合了队列的数据结构和线程同步机制。`BlockingQueue`接口位于`java.util.concurrent`包中,提供了线程安全的数据结构,可以用于实现生产者-...

    10、阻塞队列BlockingQueue实战及其原理分析

    阻塞队列BlockingQueue是Java并发编程中一个重要的数据结构,它是线程安全的队列,主要用于生产者消费者模型中的数据交换。在Java的`java.util.concurrent`包中,提供了多种实现阻塞队列的类,如`ArrayBlockingQueue...

    10、阻塞队列BlockingQueue实战及其原理分析.pdf

    ### 10、阻塞队列BlockingQueue 实战及其原理分析 #### 一、阻塞队列概述 阻塞队列(BlockingQueue)是Java语言中`java.util.concurrent`包下提供的一种重要的线程安全队列。它继承自`Queue`接口,并在此基础上...

    BlockingQueue(阻塞队列)详解

    ### BlockingQueue(阻塞队列)详解 #### 一、前言 随着现代软件系统对并发性能需求的不断提高,多线程编程技术逐渐成为开发人员不可或缺的技能之一。在Java平台中,`java.util.concurrent`包提供了丰富的工具来...

    支持多线程和泛型的阻塞队列

    阻塞队列是一种在多线程编程中广泛使用的并发数据结构,它在计算机科学和编程领域,特别是Java和C++等面向对象语言中扮演着重要角色。标题中的“支持多线程和泛型的阻塞队列”意味着我们讨论的是一个能够同时处理多...

    java 中 阻塞队列BlockingQueue详解及实例

    Java中的阻塞队列BlockingQueue是一种并发编程中常用的工具,它实现了线程间的同步和通信。阻塞队列的核心特性在于当队列为空时,尝试获取元素的线程会被阻塞,直到其他线程添加元素;当队列满时,尝试添加元素的...

    线程----BlockingQueue

    `BlockingQueue`是一种特殊类型的队列,主要用于多线程环境中的任务管理。它具有以下特性:当队列为空时,从队列中获取元素的操作会被阻塞;同样地,当队列满时,向队列中添加元素的操作也会被阻塞。这种特性使得`...

    java线程聊天室(阻塞队列实现)

    【Java线程聊天室(阻塞队列实现)】 在Java编程中,多线程是构建并发应用程序的关键技术。在创建一个线程聊天室时,我们通常会涉及到多个线程之间的交互,例如用户发送消息、接收消息以及处理网络通信等。而阻塞...

    java模拟阻塞队列

    Java中的阻塞队列是一种基于同步原语的高级数据结构,它在多线程编程中扮演着重要角色,尤其在并发处理和优化系统资源利用率方面。阻塞队列结合了队列的数据结构与线程同步机制,使得生产者可以在队列满时被阻塞,而...

    java多线程加队列上传文件_后台处理

    2. **阻塞队列**:利用`BlockingQueue`接口实现的队列,如`ArrayBlockingQueue`、`LinkedBlockingQueue`等,可以确保当队列满时,新添加的任务会阻塞等待,直到有空闲空间为止;同样,当队列为空时,从队列中取出...

    BlockingQueue队列自定义超时时间取消线程池任务

    首先,`BlockingQueue`是一个并发容器,它遵循先进先出(FIFO)原则,具有阻塞性质,当队列满时,生产者线程会被阻塞,直到有消费者取走元素;当队列空时,消费者线程会被阻塞,直到生产者放入新的元素。常用实现如`...

    java 多线程 队列工厂

    `java.util.concurrent.BlockingQueue`是Java并发库中的一个重要接口,它扩展了`Queue`接口,增加了线程安全的阻塞操作。阻塞队列在多线程环境中特别有用,因为它允许线程在队列满时等待,直到有空间可用;同样,当...

    Java-concurrent-collections-concurrenthashmap-blockingqueue.pdf

    * 阻塞机制:BlockingQueue 提供了阻塞机制,当队列为空时,消费者线程会被阻塞,直到生产者线程添加数据。 Java 并发集合的应用 Java 并发集合可以应用于多种场景,例如: * 多线程编程:Java 并发集合可以用于...

    Java实现简单的阻塞队列2种方式

    在Java编程中,阻塞队列是一种特殊类型的并发数据结构,它在多线程环境中的应用广泛,主要用于线程间的协作通信。阻塞队列在队列满时会阻止生产者线程添加元素,在队列空时会阻止消费者线程取出元素,直到条件满足...

    阻塞队列阻塞队列阻塞队列

    在Java编程语言中,阻塞队列是一种线程安全的数据结构,它在多线程并发控制中发挥着重要作用。阻塞队列的核心特性是当队列为空时,尝试获取元素的线程会被阻塞,直到其他线程添加元素;同样,当队列满时,试图插入...

    java多线程程序设计:Java NIO+多线程实现聊天室

    阻塞队列BlockingQueue,生产者消费者模式 Selector Channel ByteBuffer ProtoStuff 高性能序列化 HttpClient连接池 Spring依赖注入 lombok简化POJO开发 原子变量 内置锁 CompletionService log4j+slf4j日志 实现的...

    Java多线程(五)之BlockingQueue深入分析

    Java多线程中,BlockingQueue是一种特殊的队列,它可以为线程同步提供有力的保障。在Java多线程(五)之BlockingQueue深入分析中,我们将深入分析BlockingQueue的定义、常用方法、注意点、实现类等。 一、概述 ...

    Java多线程编程总结

    #### 十六、Java线程:新特征-阻塞队列 - `BlockingQueue` 是一种特殊的队列,当队列为空或满时,插入或移除元素的操作将被阻塞。 #### 十七、Java线程:新特征-阻塞栈 - `BlockingStack` 提供了阻塞栈的功能,与...

Global site tag (gtag.js) - Google Analytics