`
Donald_Draper
  • 浏览: 985126 次
社区版块
存档分类
最新评论

PriorityBlockingQueue解析

    博客分类:
  • JUC
阅读更多
Queue接口定义:http://donald-draper.iteye.com/blog/2363491
AbstractQueue简介:http://donald-draper.iteye.com/blog/2363608
ConcurrentLinkedQueue解析:http://donald-draper.iteye.com/blog/2363874
BlockingQueue接口的定义:http://donald-draper.iteye.com/blog/2363942
LinkedBlockingQueue解析:http://donald-draper.iteye.com/blog/2364007
ArrayBlockingQueue解析:http://donald-draper.iteye.com/blog/2364034
package java.util.concurrent;

import java.util.concurrent.locks.*;
import java.util.*;

/**
 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
 * the same ordering rules as class {@link PriorityQueue} and supplies
 * blocking retrieval operations.  While this queue is logically
 * unbounded, attempted additions may fail due to resource exhaustion
 * (causing {@code OutOfMemoryError}). This class does not permit
 * {@code null} elements.  A priority queue relying on {@linkplain
 * Comparable natural ordering} also does not permit insertion of
 * non-comparable objects (doing so results in
 * {@code ClassCastException}).
 *
 PriorityBlockingQueue是一个与PriorityQueue具有相同排序策略的无界阻塞队列,
 提供阻塞检索操作。队列虽说是无界的,但当内存资源耗尽时,尝试添加元素,则
 将会失败。队列不允许null元素的存在。优先级队列插入的元素依据元素的Comparable,
 不允许插入一个不可比较的元素。
 * <p>This class and its iterator implement all of the
 * [i]optional[/i] methods of the {@link Collection} and {@link
 * Iterator} interfaces.  The Iterator provided in method {@link
 * #iterator()} is [i]not[/i] guaranteed to traverse the elements of
 * the PriorityBlockingQueue in any particular order. If you need
 * ordered traversal, consider using
 * {@code Arrays.sort(pq.toArray())}.  Also, method {@code drainTo}
 * can be used to [i]remove[/i] some or all elements in priority
 * order and place them in another collection.

 队列实现了集合接口和迭代器的所有方法。迭代器的#iterator方法不能保证,
 不能以特殊的顺序traverse(移动)元素。如果需要以特殊的方式traverse(移动)元素,则
 可以使用Arrays.sort(pq.toArray()方法。drainTo用于将一些元素,或所有元素以优先级的
 顺序移动到另一个集合中。
 *
 * <p>Operations on this class make no guarantees about the ordering
 * of elements with equal priority. If you need to enforce an
 * ordering, you can define custom classes or comparators that use a
 * secondary key to break ties in primary priority values.  For
 * example, here is a class that applies first-in-first-out
 * tie-breaking to comparable elements. To use it, you would insert a
 * {@code new FIFOEntry(anEntry)} instead of a plain entry object.
 *
 队列的所有操作不能保证按照元素优先级的顺序。如果需要重新定义一个以原始优先级作为key的
 比较器,保证顺序。比如原始为FIFO队列,你可以用 FIFOEntry代替原始的Entry
 *  <pre> {@code
 * class FIFOEntry<E extends Comparable<? super E>>
 *     implements Comparable<FIFOEntry<E>> {
 *   static final AtomicLong seq = new AtomicLong(0);
 *   final long seqNum;
 *   final E entry;
 *   public FIFOEntry(E entry) {
 *     seqNum = seq.getAndIncrement();
 *     this.entry = entry;
 *   }
 *   public E getEntry() { return entry; }
 *   public int compareTo(FIFOEntry<E> other) {
 *     int res = entry.compareTo(other.entry);
 *     if (res == 0 && other.entry != this.entry)
 *       res = (seqNum < other.seqNum ? -1 : 1);
 *     return res;
 *   }
 * }}</pre>
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = 5595510919245408276L;
     /*
     * The implementation uses an array-based binary heap, with public
     * operations protected with a single lock. However, allocation
     * during resizing uses a simple spinlock (used only while not
     * holding main lock) in order to allow takes to operate
     * concurrently with allocation.  This avoids repeated
     * postponement of waiting consumers and consequent element
     * build-up. The need to back away from lock during allocation
     * makes it impossible to simply wrap delegated
     * java.util.PriorityQueue operations within a lock, as was done
     * in a previous version of this class. To maintain
     * interoperability, a plain PriorityQueue is still used during
     * serialization, which maintains compatibility at the espense of
     * transiently doubling overhead.
     用一个二进制的数组堆来实现,用一个lock来保护public方法操作。
     然而,为take操作与扩容操作的并发,我们用一个自旋锁来控制空间分配。
     这可以避免消费者的重复等待和元素的包装。
     The need to back away from lock during allocation
      makes it impossible to simply wrap delegated
     java.util.PriorityQueue operations within a lock, as was done
     in a previous version of this class.
     为了保证互操作性,在序列化的时候用了一个空白的PriorityQueue,
     which maintains compatibility at the espense of
     transiently doubling overhead.
     */

    /**
     * Default array capacity.
     */
    默认容量
    private static final int DEFAULT_INITIAL_CAPACITY = 11;

    /**
     * The maximum size of array to allocate.
     * Some VMs reserve some header words in an array.
     * Attempts to allocate larger arrays may result in
     * OutOfMemoryError: Requested array size exceeds VM limit
     */
     //最大队列容量
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    /**
     * Priority queue represented as a balanced binary heap: the two
     * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
     * priority queue is ordered by comparator, or by the elements'
     * natural ordering, if comparator is null: For each node n in the
     * heap and each descendant d of n, n <= d.  The element with the
     * lowest value is in queue[0], assuming the queue is nonempty.
     优先级队列,表示一个平衡二叉树堆: 
     queue[n]的两个字节点为queue[2*n+1] and queue[2*(n+1)]。优先级队列以元素
     的比较作为排序依据,如果比较器为null,则以元素的自然属性排序。每个堆元素n的
     子孙d满足n <= d。如果队列为非空,最低优先级元素放在queue[0]。
     */
    private transient Object[] queue;

    /**
     * The number of elements in the priority queue.队列元素数量
     */
    private transient int size;

    /**
     * The comparator, or null if priority queue uses elements'
     * natural ordering.
     比较器,为空,则按自然属性
     */
    private transient Comparator<? super E> comparator;

    /**
     * Lock used for all public operations,public操作保护lock
     */
    private final ReentrantLock lock;

    /**
     * Condition for blocking when empty,队列非空条件
     */
    private final Condition notEmpty;

    /**
     * Spinlock for allocation, acquired via CAS,通过CAS获取分配的自旋锁
     */
    private transient volatile int allocationSpinLock;

    /**
     * A plain PriorityQueue used only for serialization,
     * to maintain compatibility with previous versions
     * of this class. Non-null only during serialization/deserialization.
     空有限级队列用于序列化
     */
    private PriorityQueue q;
    /**
     * Creates a {@code PriorityBlockingQueue} with the default
     * initial capacity (11) that orders its elements according to
     * their {@linkplain Comparable natural ordering}.
     */
    public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }
     /**
     * Creates a {@code PriorityBlockingQueue} with the specified
     * initial capacity that orders its elements according to their
     * {@linkplain Comparable natural ordering}.
     *
     * @param initialCapacity the initial capacity for this priority queue
     * @throws IllegalArgumentException if {@code initialCapacity} is less
     *         than 1
     */
    public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }
    
    /**
     * Creates a {@code PriorityBlockingQueue} with the specified initial
     * capacity that orders its elements according to the specified
     * comparator.
     *
     待比较器和容量参数的构造
     * @param initialCapacity the initial capacity for this priority queue
     * @param  comparator the comparator that will be used to order this
     *         priority queue.  If {@code null}, the {@linkplain Comparable
     *         natural ordering} of the elements will be used.
     * @throws IllegalArgumentException if {@code initialCapacity} is less
     *         than 1
     */
     public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }

来看put操作:
 
 /**
     * Inserts the specified element into this priority queue.
     * As the queue is unbounded, this method will never block.
     *
     * @param e the element to add
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public void put(E e) {
        //委托给put操作
        offer(e); // never need to block
    }

//offer操作
 
/**
     * Inserts the specified element into this priority queue.
     * As the queue is unbounded, this method will never return {@code false}.
     *
     * @param e the element to add
     * @return {@code true} (as specified by {@link Queue#offer})
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        while ((n = size) >= (cap = (array = queue).length))
	    //如果队列已满,释放锁,获取扩容锁,成功则扩容,扩容后则重新获取锁,将原始队列拷贝的新的队列中。
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
	    //比较,确定元素存储的位置
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
	    //容量自增
            size = n + 1;
	    //唤醒等待take的线程
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }

这里有几点要关注:
1.
 while ((n = size) >= (cap = (array = queue).length))
	 //如果队列已满
          tryGrow(array, cap);

2.
 Comparator<? super E> cmp = comparator;
 if (cmp == null)
    siftUpComparable(n, e, array);

3.
 else
    siftUpUsingComparator(n, e, array, cmp);

先看第一点
1.
while ((n = size) >= (cap = (array = queue).length))
	 //如果队列已满
          tryGrow(array, cap);

/**
     * Tries to grow array to accommodate at least one more element
     * (but normally expand by about 50%), giving up (allowing retry)
     * on contention (which we expect to be rare). Call only while
     * holding lock.
     尝试调整至少一个元素,释放锁。在需要的时候再持有锁
     * @param array the heap array
     * @param oldCap the length of the array
     */
    private void tryGrow(Object[] array, int oldCap) {
        lock.unlock(); // must release and then re-acquire main lock,先释放锁,需要时,在重新获取
        Object[] newArray = null;
	//获取扩容锁,成功则,重新扩展队列容量,扩容后则重新获取锁,将原始队列拷贝的新的队列中。
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
	        //当容量小于64时,则增长容量为原来的2倍,大于64则每次增长两个。
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
		    //如果添加元素后,容量移除,或大于MAX_ARRAY_SIZE,则抛出OutOfMemoryError
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                allocationSpinLock = 0;
            }
        }
        if (newArray == null) // back off if another thread is allocating
	    //如果分配失败,则暂定当前线程
            Thread.yield();
	//自旋后,重新获取锁
        lock.lock();
        if (newArray != null && queue == array) {
            queue = newArray;
	    //将原始队列拷贝的新的队列中
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }

这一步,首先释放锁,获取扩容锁,成功则,重新扩展队列容量,扩容后则重新获取锁,将原始队列拷贝的新的队列中。
2.
Comparator<? super E> cmp = comparator;
 if (cmp == null)
    siftUpComparable(n, e, array);

 /**
     * Inserts item x at position k, maintaining heap invariant by
     * promoting x up the tree until it is greater than or equal to
     * its parent, or is the root.
     *
     * To simplify and speed up coercions and comparisons. the
     * Comparable and Comparator versions are separated into different
     * methods that are otherwise identical. (Similarly for siftDown.)
     * These methods are static, with heap state as arguments, to
     * simplify use in light of possible comparator exceptions.
     *
     * @param k the position to fill
     * @param x the item to insert
     * @param array the heap array
     * @param n heap size
     */
    private static <T> void siftUpComparable(int k, T x, Object[] array) {
        //获取元素的比较器
        Comparable<? super T> key = (Comparable<? super T>) x;
        while (k > 0) {
	    //数组k叶节点的父节点为(k - 1) >>> 1,无符号右移,左补0
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
	    //比较,确定元素存储的位置
            if (key.compareTo((T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = key;
    }

3.
 else
    siftUpUsingComparator(n, e, array, cmp);

这个以上一步没有什么太大的区别,唯一区别是,使用的自定义的比较器
   
  private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
                                       Comparator<? super T> cmp) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (cmp.compare(x, (T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = x;
    }

小节:
offer操作,获取锁,如果队列已满,释放锁,获取扩容锁,成功则扩容,扩容后则重新获取锁,将原始队列拷贝的新的队列中。获取当前队列中的尾元素的父节点,将当前要添加的元素与父节点比较,确定存储的位置。比较,确定元素存储的位置。最后容量自增,唤醒等待take的线程。从offer操作来看,数组队列存放的逻辑结构实际上是一个平衡二叉树(堆排序)。
add操作:
 /**
     * Inserts the specified element into this priority queue.
     *
     * @param e the element to add
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        return offer(e);
    }

超时offer操作:
**
     * Inserts the specified element into this priority queue.
     * As the queue is unbounded, this method will never block or
     * return {@code false}.
     *
     * @param e the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return {@code true} (as specified by
     *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e); // never need to block
    }


从上面来看无论是add,put,还是超时offer操作,都是委托给offer操作。
再来看take操作:
 public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
	//以可中断方式获取锁
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = extract()) == null)
	        //如果队列为空,则等待非空条件notEmpty
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }


这里我们有一点要关注的是
while ( (result = extract()) == null)
	//如果队列为空,则等待非空条件notEmpty
        notEmpty.await();

 /**
     * Mechanics for poll().  Call only while holding lock.
     */
    private E extract() {
        E result;
        int n = size - 1;
        if (n < 0)
	    //队列为空
            result = null;
        else {
            Object[] array = queue;
	    从队列头获取元素
            result = (E) array[0];
            E x = (E) array[n];
            array[n] = null;
            Comparator<? super E> cmp = comparator;
	    //从队列头去除元素,则重新调整平衡二叉树
            if (cmp == null)
                siftDownComparable(0, x, array, n);
            else
                siftDownUsingComparator(0, x, array, n, cmp);
            size = n;
        }
        return result;
    }
//调整平衡二叉树
 /**
     * Inserts item x at position k, maintaining heap invariant by
     * demoting x down the tree repeatedly until it is less than or
     * equal to its children or is a leaf.
     *
     * @param k the position to fill
     * @param x the item to insert
     * @param array the heap array
     * @param n heap size
     */
    private static <T> void siftDownComparable(int k, T x, Object[] array,
                                               int n) {
        Comparable<? super T> key = (Comparable<? super T>)x;
        int half = n >>> 1;           // loop while a non-leaf
        while (k < half) {
            int child = (k << 1) + 1; // assume left child is least
            Object c = array[child];
            int right = child + 1;
            if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                c = array[child = right];
            if (key.compareTo((T) c) <= 0)
                break;
            array[k] = c;
            k = child;
        }
        array[k] = key;
    }
//使用比较器调整平衡二叉树
    private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
                                                    int n,
                                                    Comparator<? super T> cmp) {
        int half = n >>> 1;
        while (k < half) {
            int child = (k << 1) + 1;
            Object c = array[child];
            int right = child + 1;
            if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
                c = array[child = right];
            if (cmp.compare(x, (T) c) <= 0)
                break;
            array[k] = c;
            k = child;
        }
        array[k] = x;
    }

从上面可以看出,take操首先以可中断方式获取锁,如果获取成功,则从队列头部获取元素,
并重新调整平衡二叉树,如果从队列头取的元素为null,则等待非空条件notEmpty。

poll操作
 public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        E result;
        try {
            result = extract();
        } finally {
            lock.unlock();
        }
        return result;
    }


poll操作与take的区别时,直接从队列头部获取元素为null,直接返回,而不是等待非空条件notEmpty。

再看超时poll,
 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = extract()) == null && nanos > 0)
                nanos = notEmpty.awaitNanos(nanos);
        } finally {
            lock.unlock();
        }
        return result;
    }

超时poll与take的区别为当队列为空时,超时等待非空条件notEmpty。

再看peek操作:
   public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        E result;
        try {
            result = size > 0 ? (E) queue[0] : null;
        } finally {
            lock.unlock();
        }
        return result;
    }

peek操作获取锁,返回队头元素。
再来看remove操作:
/**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.  Returns {@code true} if and only if this queue contained
     * the specified element (or equivalently, if this queue changed as a
     * result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        boolean removed = false;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
	    //定位数据在队列中的索引
            int i = indexOf(o);
            if (i != -1) {
	        //如果存在对应的元素,则移除
                removeAt(i);
                removed = true;
            }
        } finally {
            lock.unlock();
        }
        return removed;
    }


有两点要看:
1.
 //定位数据在队列中的索引
      int i = indexOf(o);


 private int indexOf(Object o) {
        if (o != null) {
            Object[] array = queue;
            int n = size;
            for (int i = 0; i < n; i++)
                if (o.equals(array[i]))
                    return i;
        }
        return -1;
    }

2.
 if (i != -1) {
    //如果存在对应的元素,则移除
           removeAt(i);
           removed = true;
       }


 /**
     * Removes the ith element from queue.
     */
    private void removeAt(int i) {
        Object[] array = queue;
        int n = size - 1;
	//移除元素
        if (n == i) // removed last element
            array[i] = null;
        else {
            E moved = (E) array[n];
            array[n] = null;
            Comparator<? super E> cmp = comparator;
	    //左旋
            if (cmp == null)
                siftDownComparable(i, moved, array, n);
            else
                siftDownUsingComparator(i, moved, array, n, cmp);
            //右旋
            if (array[i] == moved) {
                if (cmp == null)
                    siftUpComparable(i, moved, array);
                else
                    siftUpUsingComparator(i, moved, array, cmp);
            }
        }
        size = n;
    }

从上可以看出,remove操作,首先获取锁,再次定位元素的位置,移除元素,调整平衡二叉树。
再看contains
public boolean contains(Object o) {
        int index;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            index = indexOf(o);
        } finally {
            lock.unlock();
        }
        return index != -1;
    }

这个不需要多讲了吧。
有了上面的分析,下面的drainTo和clear应该很容易理解了吧。
drainTo操作:
/**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            E e;
            while ( (e = extract()) != null) {
                c.add(e);
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            E e;
            while (n < maxElements && (e = extract()) != null) {
                c.add(e);
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

clear操作:
  
 /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] array = queue;
            int n = size;
            size = 0;
            for (int i = 0; i < n; i++)
                array[i] = null;
        } finally {
            lock.unlock();
        }
    }


序列化:
 /**
     * Saves the state to a stream (that is, serializes it).  For
     * compatibility with previous version of this class,
     * elements are first copied to a java.util.PriorityQueue,
     * which is then serialized.
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {
        lock.lock();
        try {
            int n = size; // avoid zero capacity argument
            q = new PriorityQueue<E>(n == 0 ? 1 : n, comparator);
            q.addAll(this);
            s.defaultWriteObject();
        } finally {
            q = null;
            lock.unlock();
        }
    }

反序列:
  
 /**
     * Reconstitutes the {@code PriorityBlockingQueue} instance from a stream
     * (that is, deserializes it).
     *
     * @param s the stream
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        try {
            s.defaultReadObject();
            this.queue = new Object[q.size()];
            comparator = q.comparator();
            addAll(q);
        } finally {
            q = null;
        }
    }

序列化与反序列主要是将元素放在一个PriorityQueue中,进行序列化与反序列操作。
总结:
offer操作,获取锁,如果队列已满,释放锁,获取扩容锁,成功则扩容,扩容后则重新获取锁,将原始队列拷贝的新的队列中。获取当前队列中的尾元素的父节点,将当前要添加的元素与父节点比较,确定存储的位置。比较,确定元素存储的位置。最后容量自增,唤醒等待take的线程。从offer操作来看,数组队列存放的逻辑结构实际上是一个平衡二叉树(堆排序)。无论是add,put,还是超时offer操作,都是委托给offer操作。
take操首先以可中断方式获取锁,如果获取成功,则从队列头部获取元素,
并重新调整平衡二叉树,如果从队列头取的元素为null,则等待非空条件notEmpty。
poll操作与take的区别时,直接从队列头部获取元素为null,直接返回,而不是等待非空条件notEmpty。超时poll与take的区别为当队列为空时,超时等待非空条件notEmpty。
peek操作获取锁,返回队头元素。
remove操作,首先获取锁,再次定位元素的位置,移除元素,调整平衡二叉树。
序列化与反序列主要是将元素放在一个PriorityQueue中,进行序列化与反序列操作。



0
0
分享到:
评论
5 楼 zhanggang807 2017-04-01  
嗯。我也想明白了,是这个意思
4 楼 Donald_Draper 2017-03-31  
你说的是offer操作吧,这个不是自旋,感谢提醒,从allocationSpinLock(Spinlock for allocation, acquired via CAS.)的定义来看,字面上的意思为是否允许分配的自旋锁,并且为volatile,可以简单理解为锁。当队列满,需要扩容,因为队列可能有多个线程同时访问,没有allocationSpinLock锁控制,那么,将会有两个线程同时扩容队列,这个肯定不是我们希望看到的。allocationSpinLock可以理解为锁,获取锁,则可以扩容队列,否则不可。希望可以帮到您。
3 楼 Donald_Draper 2017-03-31  
您说的是哪个地方?
2 楼 zhanggang807 2017-03-31  
为什么要自旋呢?
1 楼 tairan_0729 2017-03-20  
朋友,你写的这些文章能否做成PDF,方便离线拜读,非常感谢

相关推荐

    面试必备:Java线程池解析.pdf

    - PriorityBlockingQueue:具有优先级的无界阻塞队列。 - DelayQueue:用于延迟执行元素的无界阻塞队列。 - LinkedTransferQueue:基于链表的无界阻塞队列。 - LinkedBlockingDeque:基于链表的双端阻塞队列。 使用...

    java阻塞队列实现原理及实例解析.docx

    在Java中,自Java 5.0起,`java.util.concurrent`包提供了多种阻塞队列的实现,例如`ArrayBlockingQueue`、`LinkedBlockingQueue`、`PriorityBlockingQueue`等。这些类都实现了`java.util.concurrent.BlockingQueue`...

    java阻塞队列实现原理及实例解析

    Java阻塞队列实现原理及实例解析 Java阻塞队列是一种特殊的队列,它能够在队列为空或满时阻塞线程,使得线程之间能够更好地协作和通信。阻塞队列的实现原理是基于锁机制和条件变量机制的,通过wait和notify方法来...

    解析Java中PriorityQueue优先级队列结构的源码及用法

    4. **非线程安全**:PriorityQueue不是线程安全的,如果在多线程环境中使用,应考虑使用PriorityBlockingQueue,它是线程安全的实现。 PriorityQueue的构造过程主要包含两个步骤: 1. **复制数据**:如果传入一个...

    Java多线程Queue、BlockingQueue和使用BlockingQueue实现生产消费者模型方法解析

    3. `PriorityBlockingQueue`:与`LinkedBlockingQueue`类似,但元素按照优先级排序,可以自定义比较器或者使用对象的自然排序。 4. `SynchronousQueue`:并非真正的队列,更像是一个传递点,每个插入操作必须等待另...

    线程池管理源码 java 源码

    本文将深入解析线程池的管理源码,帮助读者理解其工作原理和优化策略。 在Java中,`java.util.concurrent`包下的`ThreadPoolExecutor`类是线程池的核心实现。它提供了丰富的参数来定制线程池的行为,包括核心线程数...

    操作系统-消费者-生产者安卓实现

    下面我们将深入探讨这个问题,并结合安卓环境进行详细解析。 1. **消费者-生产者问题的背景**: 消费者-生产者问题是由Dijkstra提出的,用于模拟生产者(Producer)与消费者(Consumer)如何在共享资源池中协作。...

    最全的线程池的资料,里面包含了各种图片简单易懂

    ### 最全的线程池资料解析 #### 一、线程池的优点与工作原理 线程池技术在Java多线程编程中占有极其重要的地位,它不仅能够提高系统的响应速度和资源利用率,还能简化程序设计。下面将详细介绍线程池的优势及其...

    Java 线程池详解及创建简单实例

    本文将深入解析Java线程池的工作原理,并给出创建简单实例的步骤。 线程池的核心在于`java.util.concurrent`包中的`ExecutorService`接口,它是执行任务的中心接口。`ExecutorService`扩展了`Executor`,提供了更...

    java程序员面试大纲错过了金三银四你还要错过2018吗.docx

    20. **八种阻塞队列**:`ArrayBlockingQueue`、`LinkedBlockingQueue`、`PriorityBlockingQueue`等,每种队列都有不同的特点和适用场景。 #### 四、Spring框架 1. **BeanFactory和FactoryBean**:`BeanFactory`是...

    汪文君高并发编程实战视频资源全集

    │ 高并发编程第二阶段46讲、ClassLoader链接阶段(验证,准备,解析)过程详细介绍.mp4 │ 高并发编程第二阶段47讲、ClassLoader初始化阶段详细介绍clinit.mp4 │ 高并发编程第二阶段48讲、JVM内置三大类加载器...

    汪文君高并发编程实战视频资源下载.txt

    │ 高并发编程第二阶段46讲、ClassLoader链接阶段(验证,准备,解析)过程详细介绍.mp4 │ 高并发编程第二阶段47讲、ClassLoader初始化阶段详细介绍clinit.mp4 │ 高并发编程第二阶段48讲、JVM内置三大类加载器...

    java核心知识点整理.pdf

    1. 目录 1. 2. 目录 .........................................................................................................................................................1 JVM .........................

    JAVA核心知识点整理(有效)

    1. 目录 1. 2. 目录 .........................................................................................................................................................1 JVM ........................

Global site tag (gtag.js) - Google Analytics