`
uule
  • 浏览: 6348804 次
  • 性别: Icon_minigender_1
  • 来自: 一片神奇的土地
社区版块
存档分类
最新评论

ArrayBlockingQueue

 
阅读更多

 ArrayBlockingQueue

     

public interface BlockingQueue<E> extends Queue<E> {  
     
    /**
	* 添加元素
	*/
    boolean add(E e);       //将指定的元素添加到此队列中(如果立即可行),在成功时返回 true,其他情况则抛出 IllegalStateException。     
          
    void put(E e) throws InterruptedException;   //将指定元素添加到此队列中,如果没有可用空间,将一直等待(如果有必要)  
         
    boolean offer(E e);    //将指定元素插入此队列中  
      
	  	  
    /**
	* 删除元素
	*/   
    
    E take() throws InterruptedException;   //检索并移除此队列的头部,如果此队列不存在任何元素,则一直等待  
         
    E poll(long timeout, TimeUnit unit)  
        throws InterruptedException;    //检索并移除此队列的头部,如果此队列中没有任何元素,则等待指定等待的时间(如果有必要)  
                
    boolean remove(Object o);       //从队列中删除指定元素  
		
	E peek();
}  

 

Java代码  收藏代码
  1. public class ArrayBlockingQueue<E> extends AbstractQueue<E>  
  2.         implements BlockingQueue<E>, java.io.Serializable {  
  3.   
  4.     private final E[] items;  
  5.       
  6.     /** items index for next take, poll or remove */  
  7.     private int takeIndex;  
  8.       
  9.     /** items index for next put, offer, or add. */  
  10.     private int putIndex;  
  11.   
  12.     private int count;  
  13.       
  14.     /*ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象, 
  15.     * 由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue 
  16.     */  
  17.     private final ReentrantLock lock;  
  18.   
  19.     private final Condition notEmpty;  
  20.   
  21.     private final Condition notFull;  
  22.       
  23.     ...   
  24. }  

 

 /**
 *  构造函数
 */
 public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }


    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
			
        this.items = (E[]) new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

 

Java代码  收藏代码
  1. final int inc(int i) {  
  2.     return (++i == items.length)? 0 : i;  
  3. }     
  4.   
  5. //塞数据、取数据内部主要引用这两个方法  
  6.   
  7. //塞数据:  
  8. private void insert(E x) {  
  9.     items[putIndex] = x;  
  10.     putIndex = inc(putIndex);  
  11.     ++count;  
  12.     notEmpty.signal();  
  13. }  
  14.   
  15.       
  16. //取数据:  
  17. private E extract() {  
  18.     final E[] items = this.items;  
  19.     E x = items[takeIndex];  
  20.     items[takeIndex] = null;  
  21.     takeIndex = inc(takeIndex);  
  22.     --count;  
  23.     notFull.signal();  
  24.     return x;  
  25. }  

 put()/take() 可打断,并且会等待

 

 

put()/take() 可打断,并且会等待

/**
* 放数据
*/

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
		
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == items.length)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            insert(e);
        } finally {
            lock.unlock();
        }
    }
	
	
public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                insert(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

/**
* 取数据
*/

 public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = extract();
            return x;
        } finally {
            lock.unlock();
        }
    }
	
	
public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == 0)
                return null;
            E x = extract();
            return x;
        } finally {
            lock.unlock();
        }
    }	
	
 public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : items[takeIndex];
        } finally {
            lock.unlock();
        }
    }		
	
public boolean remove(Object o) {
        if (o == null) return false;
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = takeIndex;
            int k = 0;
            for (;;) {
                if (k++ >= count)
                    return false;
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                i = inc(i);
            }

        } finally {
            lock.unlock();
        }
    }

 

从 extract()方法里可见,tabkeIndex维护一个可以提取/移除元素的索引位置,因为takeIndex是从0递增的,所以这个类是FIFO队列

putIndex维护一个可以插入的元素的位置索引。

count显然是维护队列中已经存在的元素总数。

 

ArrayBlockingQueue是JAVA5中的一个阻塞队列,能够自定义队列大小,当插入时,如果队列已经没有空闲位置,那么新的插入线程将阻塞到该队列,一旦该队列有空闲位置,那么阻塞的线程将执行插入。从队列中取数据为:take,放数据为:put

 

 

     基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。

 

  ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。

分享到:
评论

相关推荐

    ArrayBlockingQueue源码分析.docx

    `ArrayBlockingQueue` 是 Java 中实现并发编程时常用的一个线程安全的数据结构,它是一个有界的阻塞队列。在 `java.util.concurrent` 包下,`ArrayBlockingQueue` 继承自 `java.util.concurrent.BlockingQueue` 接口...

    ArrayBlockingQueue源码解析-动力节点共

    ArrayBlockingQueue是Java并发编程中一个重要的数据结构,它是Java并发包`java.util.concurrent`中的一个线程安全的阻塞队列。这个队列基于数组实现,提供了在生产者和消费者之间进行数据交换的能力,同时确保了线程...

    并发容器之ArrayBlockingQueue和LinkedBlockingQueue实现原理详解

    "并发容器之ArrayBlockingQueue和LinkedBlockingQueue实现原理详解" ArrayBlockingQueue和LinkedBlockingQueue是Java并发容器中两个常用的阻塞队列实现,分别基于数组和链表存储元素。它们都继承自AbstractQueue类...

    26不让我进门,我就在门口一直等!—BlockingQueue和ArrayBlockingQueue.pdf

    —BlockingQueue和ArrayBlockingQueue.pdf” 【描述】:此文档是关于Java并发编程的学习资料,以漫画形式讲解,聚焦于Java并发编程中的核心概念——BlockingQueue接口及其具体实现ArrayBlockingQueue。 【标签】:...

    详细分析Java并发集合ArrayBlockingQueue的用法

    Java并发集合ArrayBlockingQueue的用法详解 Java并发集合ArrayBlockingQueue是Java并发集合框架下的一个重要组件,它提供了阻塞队列的实现,用于多线程环境下的并发操作。下面是对ArrayBlockingQueue的用法详解: ...

    java并发之ArrayBlockingQueue详细介绍

    Java并发之ArrayBlockingQueue详细介绍 ArrayBlockingQueue是Java并发编程中常用的线程安全队列,经常被用作任务队列在线程池中。它是基于数组实现的循环队列,具有线程安全的实现。 ArrayBlockingQueue的实现 ...

    Java源码解析阻塞队列ArrayBlockingQueue常用方法

    Java中的ArrayBlockingQueue是一个高效的并发数据结构,它是Java并发包`java.util.concurrent`下的一个阻塞队列。本文将深入解析ArrayBlockingQueue的常用方法及其内部实现机制。 ArrayBlockingQueue的核心是一个...

    Java concurrency集合之ArrayBlockingQueue_动力节点Java学院整理

    ArrayBlockingQueue是Java并发编程中一个重要的集合类,它属于 BlockingQueue 接口的一个实现,主要特点是线程安全、有界以及基于数组的阻塞队列。线程安全得益于其内部使用了Java并发包中的ReentrantLock(可重入锁...

    java中LinkedBlockingQueue与ArrayBlockingQueue的异同

    Java中的`LinkedBlockingQueue`和`ArrayBlockingQueue`都是`java.util.concurrent`包下的线程安全队列,它们都实现了`BlockingQueue`接口,提供了一种高效、线程安全的数据同步方式。这两种队列在很多方面都有相似之...

    Java源码解析阻塞队列ArrayBlockingQueue介绍

    Java源码解析阻塞队列ArrayBlockingQueue介绍 Java源码解析阻塞队列ArrayBlockingQueue介绍是Java中的一种阻塞队列实现,使用ReentrantLock和Condition来实现同步和阻塞机制。本文将对ArrayBlockingQueue的源码进行...

    Java源码解析阻塞队列ArrayBlockingQueue功能简介

    Java源码解析阻塞队列ArrayBlockingQueue功能简介 ArrayBlockingQueue是Java中一个重要的阻塞队列实现,它基于数组实现了有界阻塞队列,提供FIFO(First-In-First-Out)功能。该队列的头元素是最长时间呆在队列中的...

    Java可阻塞队列-ArrayBlockingQueue

    在前面的的文章,写了一个带有缓冲区的队列,是用JAVA的Lock下的Condition实现的,但是JAVA类中提供了这项功能,是ArrayBlockingQueue,  ArrayBlockingQueue是由数组支持的有界阻塞队列,次队列按照FIFO(先进先...

    spring-blockingqueue:用Spring Boot阻止队列

    通常,我们会选择实现BlockingQueue的类,如ArrayBlockingQueue、LinkedBlockingQueue或PriorityBlockingQueue,根据实际需求选择合适的实现。 例如,我们可以创建一个配置类,如下所示: ```java @Configuration ...

    阻塞队列阻塞队列阻塞队列

    ArrayBlockingQueue支持公平和非公平两种策略,公平策略意味着等待最久的线程优先获得资源,而非公平策略则不保证这一点。在高并发场景下,非公平策略通常能提供更好的性能。 2. LinkedBlockingQueue ...

    生产者消费者问题java

    ArrayBlockingQueue&lt;Integer&gt; queue = new ArrayBlockingQueue(10); Thread producer = new Thread(new Producer(queue)); Thread consumer = new Thread(new Consumer(queue)); producer.start(); consumer....

    JAVA并发编程深度学习-无锁并行计算框架1

    本节我们将深入探讨Disruptor框架以及与ArrayBlockingQueue的对比。 ArrayBlockingQueue是Java并发库中的一个阻塞队列实现,基于循环数组,提供了线程安全的入队和出队操作。在上述代码中,我们创建了一个固定大小...

Global site tag (gtag.js) - Google Analytics