- 浏览: 985126 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
Queue接口定义:http://donald-draper.iteye.com/blog/2363491
AbstractQueue简介:http://donald-draper.iteye.com/blog/2363608
ConcurrentLinkedQueue解析:http://donald-draper.iteye.com/blog/2363874
BlockingQueue接口的定义:http://donald-draper.iteye.com/blog/2363942
LinkedBlockingQueue解析:http://donald-draper.iteye.com/blog/2364007
ArrayBlockingQueue解析:http://donald-draper.iteye.com/blog/2364034
来看put操作:
//offer操作
这里有几点要关注:
1.
2.
3.
先看第一点
1.
这一步,首先释放锁,获取扩容锁,成功则,重新扩展队列容量,扩容后则重新获取锁,将原始队列拷贝的新的队列中。
2.
3.
这个以上一步没有什么太大的区别,唯一区别是,使用的自定义的比较器
小节:
offer操作,获取锁,如果队列已满,释放锁,获取扩容锁,成功则扩容,扩容后则重新获取锁,将原始队列拷贝的新的队列中。获取当前队列中的尾元素的父节点,将当前要添加的元素与父节点比较,确定存储的位置。比较,确定元素存储的位置。最后容量自增,唤醒等待take的线程。从offer操作来看,数组队列存放的逻辑结构实际上是一个平衡二叉树(堆排序)。
add操作:
超时offer操作:
从上面来看无论是add,put,还是超时offer操作,都是委托给offer操作。
再来看take操作:
这里我们有一点要关注的是
从上面可以看出,take操首先以可中断方式获取锁,如果获取成功,则从队列头部获取元素,
并重新调整平衡二叉树,如果从队列头取的元素为null,则等待非空条件notEmpty。
poll操作
poll操作与take的区别时,直接从队列头部获取元素为null,直接返回,而不是等待非空条件notEmpty。
再看超时poll,
超时poll与take的区别为当队列为空时,超时等待非空条件notEmpty。
再看peek操作:
peek操作获取锁,返回队头元素。
再来看remove操作:
有两点要看:
1.
2.
从上可以看出,remove操作,首先获取锁,再次定位元素的位置,移除元素,调整平衡二叉树。
再看contains
这个不需要多讲了吧。
有了上面的分析,下面的drainTo和clear应该很容易理解了吧。
drainTo操作:
clear操作:
序列化:
反序列:
序列化与反序列主要是将元素放在一个PriorityQueue中,进行序列化与反序列操作。
总结:
offer操作,获取锁,如果队列已满,释放锁,获取扩容锁,成功则扩容,扩容后则重新获取锁,将原始队列拷贝的新的队列中。获取当前队列中的尾元素的父节点,将当前要添加的元素与父节点比较,确定存储的位置。比较,确定元素存储的位置。最后容量自增,唤醒等待take的线程。从offer操作来看,数组队列存放的逻辑结构实际上是一个平衡二叉树(堆排序)。无论是add,put,还是超时offer操作,都是委托给offer操作。
take操首先以可中断方式获取锁,如果获取成功,则从队列头部获取元素,
并重新调整平衡二叉树,如果从队列头取的元素为null,则等待非空条件notEmpty。
poll操作与take的区别时,直接从队列头部获取元素为null,直接返回,而不是等待非空条件notEmpty。超时poll与take的区别为当队列为空时,超时等待非空条件notEmpty。
peek操作获取锁,返回队头元素。
remove操作,首先获取锁,再次定位元素的位置,移除元素,调整平衡二叉树。
序列化与反序列主要是将元素放在一个PriorityQueue中,进行序列化与反序列操作。
AbstractQueue简介:http://donald-draper.iteye.com/blog/2363608
ConcurrentLinkedQueue解析:http://donald-draper.iteye.com/blog/2363874
BlockingQueue接口的定义:http://donald-draper.iteye.com/blog/2363942
LinkedBlockingQueue解析:http://donald-draper.iteye.com/blog/2364007
ArrayBlockingQueue解析:http://donald-draper.iteye.com/blog/2364034
package java.util.concurrent; import java.util.concurrent.locks.*; import java.util.*; /** * An unbounded {@linkplain BlockingQueue blocking queue} that uses * the same ordering rules as class {@link PriorityQueue} and supplies * blocking retrieval operations. While this queue is logically * unbounded, attempted additions may fail due to resource exhaustion * (causing {@code OutOfMemoryError}). This class does not permit * {@code null} elements. A priority queue relying on {@linkplain * Comparable natural ordering} also does not permit insertion of * non-comparable objects (doing so results in * {@code ClassCastException}). * PriorityBlockingQueue是一个与PriorityQueue具有相同排序策略的无界阻塞队列, 提供阻塞检索操作。队列虽说是无界的,但当内存资源耗尽时,尝试添加元素,则 将会失败。队列不允许null元素的存在。优先级队列插入的元素依据元素的Comparable, 不允许插入一个不可比较的元素。 * <p>This class and its iterator implement all of the * [i]optional[/i] methods of the {@link Collection} and {@link * Iterator} interfaces. The Iterator provided in method {@link * #iterator()} is [i]not[/i] guaranteed to traverse the elements of * the PriorityBlockingQueue in any particular order. If you need * ordered traversal, consider using * {@code Arrays.sort(pq.toArray())}. Also, method {@code drainTo} * can be used to [i]remove[/i] some or all elements in priority * order and place them in another collection. 队列实现了集合接口和迭代器的所有方法。迭代器的#iterator方法不能保证, 不能以特殊的顺序traverse(移动)元素。如果需要以特殊的方式traverse(移动)元素,则 可以使用Arrays.sort(pq.toArray()方法。drainTo用于将一些元素,或所有元素以优先级的 顺序移动到另一个集合中。 * * <p>Operations on this class make no guarantees about the ordering * of elements with equal priority. If you need to enforce an * ordering, you can define custom classes or comparators that use a * secondary key to break ties in primary priority values. For * example, here is a class that applies first-in-first-out * tie-breaking to comparable elements. To use it, you would insert a * {@code new FIFOEntry(anEntry)} instead of a plain entry object. * 队列的所有操作不能保证按照元素优先级的顺序。如果需要重新定义一个以原始优先级作为key的 比较器,保证顺序。比如原始为FIFO队列,你可以用 FIFOEntry代替原始的Entry * <pre> {@code * class FIFOEntry<E extends Comparable<? super E>> * implements Comparable<FIFOEntry<E>> { * static final AtomicLong seq = new AtomicLong(0); * final long seqNum; * final E entry; * public FIFOEntry(E entry) { * seqNum = seq.getAndIncrement(); * this.entry = entry; * } * public E getEntry() { return entry; } * public int compareTo(FIFOEntry<E> other) { * int res = entry.compareTo(other.entry); * if (res == 0 && other.entry != this.entry) * res = (seqNum < other.seqNum ? -1 : 1); * return res; * } * }}</pre> * * <p>This class is a member of the * <a href="{@docRoot}/../technotes/guides/collections/index.html"> * Java Collections Framework</a>. * * @since 1.5 * @author Doug Lea * @param <E> the type of elements held in this collection */ public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = 5595510919245408276L; /* * The implementation uses an array-based binary heap, with public * operations protected with a single lock. However, allocation * during resizing uses a simple spinlock (used only while not * holding main lock) in order to allow takes to operate * concurrently with allocation. This avoids repeated * postponement of waiting consumers and consequent element * build-up. The need to back away from lock during allocation * makes it impossible to simply wrap delegated * java.util.PriorityQueue operations within a lock, as was done * in a previous version of this class. To maintain * interoperability, a plain PriorityQueue is still used during * serialization, which maintains compatibility at the espense of * transiently doubling overhead. 用一个二进制的数组堆来实现,用一个lock来保护public方法操作。 然而,为take操作与扩容操作的并发,我们用一个自旋锁来控制空间分配。 这可以避免消费者的重复等待和元素的包装。 The need to back away from lock during allocation makes it impossible to simply wrap delegated java.util.PriorityQueue operations within a lock, as was done in a previous version of this class. 为了保证互操作性,在序列化的时候用了一个空白的PriorityQueue, which maintains compatibility at the espense of transiently doubling overhead. */ /** * Default array capacity. */ 默认容量 private static final int DEFAULT_INITIAL_CAPACITY = 11; /** * The maximum size of array to allocate. * Some VMs reserve some header words in an array. * Attempts to allocate larger arrays may result in * OutOfMemoryError: Requested array size exceeds VM limit */ //最大队列容量 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; /** * Priority queue represented as a balanced binary heap: the two * children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The * priority queue is ordered by comparator, or by the elements' * natural ordering, if comparator is null: For each node n in the * heap and each descendant d of n, n <= d. The element with the * lowest value is in queue[0], assuming the queue is nonempty. 优先级队列,表示一个平衡二叉树堆: queue[n]的两个字节点为queue[2*n+1] and queue[2*(n+1)]。优先级队列以元素 的比较作为排序依据,如果比较器为null,则以元素的自然属性排序。每个堆元素n的 子孙d满足n <= d。如果队列为非空,最低优先级元素放在queue[0]。 */ private transient Object[] queue; /** * The number of elements in the priority queue.队列元素数量 */ private transient int size; /** * The comparator, or null if priority queue uses elements' * natural ordering. 比较器,为空,则按自然属性 */ private transient Comparator<? super E> comparator; /** * Lock used for all public operations,public操作保护lock */ private final ReentrantLock lock; /** * Condition for blocking when empty,队列非空条件 */ private final Condition notEmpty; /** * Spinlock for allocation, acquired via CAS,通过CAS获取分配的自旋锁 */ private transient volatile int allocationSpinLock; /** * A plain PriorityQueue used only for serialization, * to maintain compatibility with previous versions * of this class. Non-null only during serialization/deserialization. 空有限级队列用于序列化 */ private PriorityQueue q; /** * Creates a {@code PriorityBlockingQueue} with the default * initial capacity (11) that orders its elements according to * their {@linkplain Comparable natural ordering}. */ public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } /** * Creates a {@code PriorityBlockingQueue} with the specified * initial capacity that orders its elements according to their * {@linkplain Comparable natural ordering}. * * @param initialCapacity the initial capacity for this priority queue * @throws IllegalArgumentException if {@code initialCapacity} is less * than 1 */ public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null); } /** * Creates a {@code PriorityBlockingQueue} with the specified initial * capacity that orders its elements according to the specified * comparator. * 待比较器和容量参数的构造 * @param initialCapacity the initial capacity for this priority queue * @param comparator the comparator that will be used to order this * priority queue. If {@code null}, the {@linkplain Comparable * natural ordering} of the elements will be used. * @throws IllegalArgumentException if {@code initialCapacity} is less * than 1 */ public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; }
来看put操作:
/** * Inserts the specified element into this priority queue. * As the queue is unbounded, this method will never block. * * @param e the element to add * @throws ClassCastException if the specified element cannot be compared * with elements currently in the priority queue according to the * priority queue's ordering * @throws NullPointerException if the specified element is null */ public void put(E e) { //委托给put操作 offer(e); // never need to block }
//offer操作
/** * Inserts the specified element into this priority queue. * As the queue is unbounded, this method will never return {@code false}. * * @param e the element to add * @return {@code true} (as specified by {@link Queue#offer}) * @throws ClassCastException if the specified element cannot be compared * with elements currently in the priority queue according to the * priority queue's ordering * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; while ((n = size) >= (cap = (array = queue).length)) //如果队列已满,释放锁,获取扩容锁,成功则扩容,扩容后则重新获取锁,将原始队列拷贝的新的队列中。 tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; //比较,确定元素存储的位置 if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); //容量自增 size = n + 1; //唤醒等待take的线程 notEmpty.signal(); } finally { lock.unlock(); } return true; }
这里有几点要关注:
1.
while ((n = size) >= (cap = (array = queue).length)) //如果队列已满 tryGrow(array, cap);
2.
Comparator<? super E> cmp = comparator; if (cmp == null) siftUpComparable(n, e, array);
3.
else siftUpUsingComparator(n, e, array, cmp);
先看第一点
1.
while ((n = size) >= (cap = (array = queue).length)) //如果队列已满 tryGrow(array, cap);
/** * Tries to grow array to accommodate at least one more element * (but normally expand by about 50%), giving up (allowing retry) * on contention (which we expect to be rare). Call only while * holding lock. 尝试调整至少一个元素,释放锁。在需要的时候再持有锁 * @param array the heap array * @param oldCap the length of the array */ private void tryGrow(Object[] array, int oldCap) { lock.unlock(); // must release and then re-acquire main lock,先释放锁,需要时,在重新获取 Object[] newArray = null; //获取扩容锁,成功则,重新扩展队列容量,扩容后则重新获取锁,将原始队列拷贝的新的队列中。 if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { //当容量小于64时,则增长容量为原来的2倍,大于64则每次增长两个。 int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : // grow faster if small (oldCap >> 1)); if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow //如果添加元素后,容量移除,或大于MAX_ARRAY_SIZE,则抛出OutOfMemoryError int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } if (newCap > oldCap && queue == array) newArray = new Object[newCap]; } finally { allocationSpinLock = 0; } } if (newArray == null) // back off if another thread is allocating //如果分配失败,则暂定当前线程 Thread.yield(); //自旋后,重新获取锁 lock.lock(); if (newArray != null && queue == array) { queue = newArray; //将原始队列拷贝的新的队列中 System.arraycopy(array, 0, newArray, 0, oldCap); } }
这一步,首先释放锁,获取扩容锁,成功则,重新扩展队列容量,扩容后则重新获取锁,将原始队列拷贝的新的队列中。
2.
Comparator<? super E> cmp = comparator; if (cmp == null) siftUpComparable(n, e, array);
/** * Inserts item x at position k, maintaining heap invariant by * promoting x up the tree until it is greater than or equal to * its parent, or is the root. * * To simplify and speed up coercions and comparisons. the * Comparable and Comparator versions are separated into different * methods that are otherwise identical. (Similarly for siftDown.) * These methods are static, with heap state as arguments, to * simplify use in light of possible comparator exceptions. * * @param k the position to fill * @param x the item to insert * @param array the heap array * @param n heap size */ private static <T> void siftUpComparable(int k, T x, Object[] array) { //获取元素的比较器 Comparable<? super T> key = (Comparable<? super T>) x; while (k > 0) { //数组k叶节点的父节点为(k - 1) >>> 1,无符号右移,左补0 int parent = (k - 1) >>> 1; Object e = array[parent]; //比较,确定元素存储的位置 if (key.compareTo((T) e) >= 0) break; array[k] = e; k = parent; } array[k] = key; }
3.
else siftUpUsingComparator(n, e, array, cmp);
这个以上一步没有什么太大的区别,唯一区别是,使用的自定义的比较器
private static <T> void siftUpUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp) { while (k > 0) { int parent = (k - 1) >>> 1; Object e = array[parent]; if (cmp.compare(x, (T) e) >= 0) break; array[k] = e; k = parent; } array[k] = x; }
小节:
offer操作,获取锁,如果队列已满,释放锁,获取扩容锁,成功则扩容,扩容后则重新获取锁,将原始队列拷贝的新的队列中。获取当前队列中的尾元素的父节点,将当前要添加的元素与父节点比较,确定存储的位置。比较,确定元素存储的位置。最后容量自增,唤醒等待take的线程。从offer操作来看,数组队列存放的逻辑结构实际上是一个平衡二叉树(堆排序)。
add操作:
/** * Inserts the specified element into this priority queue. * * @param e the element to add * @return {@code true} (as specified by {@link Collection#add}) * @throws ClassCastException if the specified element cannot be compared * with elements currently in the priority queue according to the * priority queue's ordering * @throws NullPointerException if the specified element is null */ public boolean add(E e) { return offer(e); }
超时offer操作:
** * Inserts the specified element into this priority queue. * As the queue is unbounded, this method will never block or * return {@code false}. * * @param e the element to add * @param timeout This parameter is ignored as the method never blocks * @param unit This parameter is ignored as the method never blocks * @return {@code true} (as specified by * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer}) * @throws ClassCastException if the specified element cannot be compared * with elements currently in the priority queue according to the * priority queue's ordering * @throws NullPointerException if the specified element is null */ public boolean offer(E e, long timeout, TimeUnit unit) { return offer(e); // never need to block }
从上面来看无论是add,put,还是超时offer操作,都是委托给offer操作。
再来看take操作:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; //以可中断方式获取锁 lock.lockInterruptibly(); E result; try { while ( (result = extract()) == null) //如果队列为空,则等待非空条件notEmpty notEmpty.await(); } finally { lock.unlock(); } return result; }
这里我们有一点要关注的是
while ( (result = extract()) == null) //如果队列为空,则等待非空条件notEmpty notEmpty.await();
/** * Mechanics for poll(). Call only while holding lock. */ private E extract() { E result; int n = size - 1; if (n < 0) //队列为空 result = null; else { Object[] array = queue; 从队列头获取元素 result = (E) array[0]; E x = (E) array[n]; array[n] = null; Comparator<? super E> cmp = comparator; //从队列头去除元素,则重新调整平衡二叉树 if (cmp == null) siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); size = n; } return result; } //调整平衡二叉树 /** * Inserts item x at position k, maintaining heap invariant by * demoting x down the tree repeatedly until it is less than or * equal to its children or is a leaf. * * @param k the position to fill * @param x the item to insert * @param array the heap array * @param n heap size */ private static <T> void siftDownComparable(int k, T x, Object[] array, int n) { Comparable<? super T> key = (Comparable<? super T>)x; int half = n >>> 1; // loop while a non-leaf while (k < half) { int child = (k << 1) + 1; // assume left child is least Object c = array[child]; int right = child + 1; if (right < n && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) c = array[child = right]; if (key.compareTo((T) c) <= 0) break; array[k] = c; k = child; } array[k] = key; } //使用比较器调整平衡二叉树 private static <T> void siftDownUsingComparator(int k, T x, Object[] array, int n, Comparator<? super T> cmp) { int half = n >>> 1; while (k < half) { int child = (k << 1) + 1; Object c = array[child]; int right = child + 1; if (right < n && cmp.compare((T) c, (T) array[right]) > 0) c = array[child = right]; if (cmp.compare(x, (T) c) <= 0) break; array[k] = c; k = child; } array[k] = x; }
从上面可以看出,take操首先以可中断方式获取锁,如果获取成功,则从队列头部获取元素,
并重新调整平衡二叉树,如果从队列头取的元素为null,则等待非空条件notEmpty。
poll操作
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); E result; try { result = extract(); } finally { lock.unlock(); } return result; }
poll操作与take的区别时,直接从队列头部获取元素为null,直接返回,而不是等待非空条件notEmpty。
再看超时poll,
public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { while ( (result = extract()) == null && nanos > 0) nanos = notEmpty.awaitNanos(nanos); } finally { lock.unlock(); } return result; }
超时poll与take的区别为当队列为空时,超时等待非空条件notEmpty。
再看peek操作:
public E peek() { final ReentrantLock lock = this.lock; lock.lock(); E result; try { result = size > 0 ? (E) queue[0] : null; } finally { lock.unlock(); } return result; }
peek操作获取锁,返回队头元素。
再来看remove操作:
/** * Removes a single instance of the specified element from this queue, * if it is present. More formally, removes an element {@code e} such * that {@code o.equals(e)}, if this queue contains one or more such * elements. Returns {@code true} if and only if this queue contained * the specified element (or equivalently, if this queue changed as a * result of the call). * * @param o element to be removed from this queue, if present * @return {@code true} if this queue changed as a result of the call */ public boolean remove(Object o) { boolean removed = false; final ReentrantLock lock = this.lock; lock.lock(); try { //定位数据在队列中的索引 int i = indexOf(o); if (i != -1) { //如果存在对应的元素,则移除 removeAt(i); removed = true; } } finally { lock.unlock(); } return removed; }
有两点要看:
1.
//定位数据在队列中的索引 int i = indexOf(o);
private int indexOf(Object o) { if (o != null) { Object[] array = queue; int n = size; for (int i = 0; i < n; i++) if (o.equals(array[i])) return i; } return -1; }
2.
if (i != -1) { //如果存在对应的元素,则移除 removeAt(i); removed = true; }
/** * Removes the ith element from queue. */ private void removeAt(int i) { Object[] array = queue; int n = size - 1; //移除元素 if (n == i) // removed last element array[i] = null; else { E moved = (E) array[n]; array[n] = null; Comparator<? super E> cmp = comparator; //左旋 if (cmp == null) siftDownComparable(i, moved, array, n); else siftDownUsingComparator(i, moved, array, n, cmp); //右旋 if (array[i] == moved) { if (cmp == null) siftUpComparable(i, moved, array); else siftUpUsingComparator(i, moved, array, cmp); } } size = n; }
从上可以看出,remove操作,首先获取锁,再次定位元素的位置,移除元素,调整平衡二叉树。
再看contains
public boolean contains(Object o) { int index; final ReentrantLock lock = this.lock; lock.lock(); try { index = indexOf(o); } finally { lock.unlock(); } return index != -1; }
这个不需要多讲了吧。
有了上面的分析,下面的drainTo和clear应该很容易理解了吧。
drainTo操作:
/** * @throws UnsupportedOperationException {@inheritDoc} * @throws ClassCastException {@inheritDoc} * @throws NullPointerException {@inheritDoc} * @throws IllegalArgumentException {@inheritDoc} */ public int drainTo(Collection<? super E> c) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); final ReentrantLock lock = this.lock; lock.lock(); try { int n = 0; E e; while ( (e = extract()) != null) { c.add(e); ++n; } return n; } finally { lock.unlock(); } } /** * @throws UnsupportedOperationException {@inheritDoc} * @throws ClassCastException {@inheritDoc} * @throws NullPointerException {@inheritDoc} * @throws IllegalArgumentException {@inheritDoc} */ public int drainTo(Collection<? super E> c, int maxElements) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; final ReentrantLock lock = this.lock; lock.lock(); try { int n = 0; E e; while (n < maxElements && (e = extract()) != null) { c.add(e); ++n; } return n; } finally { lock.unlock(); } }
clear操作:
/** * Atomically removes all of the elements from this queue. * The queue will be empty after this call returns. */ public void clear() { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] array = queue; int n = size; size = 0; for (int i = 0; i < n; i++) array[i] = null; } finally { lock.unlock(); } }
序列化:
/** * Saves the state to a stream (that is, serializes it). For * compatibility with previous version of this class, * elements are first copied to a java.util.PriorityQueue, * which is then serialized. */ private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException { lock.lock(); try { int n = size; // avoid zero capacity argument q = new PriorityQueue<E>(n == 0 ? 1 : n, comparator); q.addAll(this); s.defaultWriteObject(); } finally { q = null; lock.unlock(); } }
反序列:
/** * Reconstitutes the {@code PriorityBlockingQueue} instance from a stream * (that is, deserializes it). * * @param s the stream */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { try { s.defaultReadObject(); this.queue = new Object[q.size()]; comparator = q.comparator(); addAll(q); } finally { q = null; } }
序列化与反序列主要是将元素放在一个PriorityQueue中,进行序列化与反序列操作。
总结:
offer操作,获取锁,如果队列已满,释放锁,获取扩容锁,成功则扩容,扩容后则重新获取锁,将原始队列拷贝的新的队列中。获取当前队列中的尾元素的父节点,将当前要添加的元素与父节点比较,确定存储的位置。比较,确定元素存储的位置。最后容量自增,唤醒等待take的线程。从offer操作来看,数组队列存放的逻辑结构实际上是一个平衡二叉树(堆排序)。无论是add,put,还是超时offer操作,都是委托给offer操作。
take操首先以可中断方式获取锁,如果获取成功,则从队列头部获取元素,
并重新调整平衡二叉树,如果从队列头取的元素为null,则等待非空条件notEmpty。
poll操作与take的区别时,直接从队列头部获取元素为null,直接返回,而不是等待非空条件notEmpty。超时poll与take的区别为当队列为空时,超时等待非空条件notEmpty。
peek操作获取锁,返回队头元素。
remove操作,首先获取锁,再次定位元素的位置,移除元素,调整平衡二叉树。
序列化与反序列主要是将元素放在一个PriorityQueue中,进行序列化与反序列操作。
评论
5 楼
zhanggang807
2017-04-01
嗯。我也想明白了,是这个意思
4 楼
Donald_Draper
2017-03-31
你说的是offer操作吧,这个不是自旋,感谢提醒,从allocationSpinLock(Spinlock for allocation, acquired via CAS.)的定义来看,字面上的意思为是否允许分配的自旋锁,并且为volatile,可以简单理解为锁。当队列满,需要扩容,因为队列可能有多个线程同时访问,没有allocationSpinLock锁控制,那么,将会有两个线程同时扩容队列,这个肯定不是我们希望看到的。allocationSpinLock可以理解为锁,获取锁,则可以扩容队列,否则不可。希望可以帮到您。
3 楼
Donald_Draper
2017-03-31
您说的是哪个地方?
2 楼
zhanggang807
2017-03-31
为什么要自旋呢?
1 楼
tairan_0729
2017-03-20
朋友,你写的这些文章能否做成PDF,方便离线拜读,非常感谢
发表评论
-
Executors解析
2017-04-07 14:38 1256ThreadPoolExecutor解析一(核心线程池数量、线 ... -
ScheduledThreadPoolExecutor解析三(关闭线程池)
2017-04-06 20:52 4459ScheduledThreadPoolExecutor解析一( ... -
ScheduledThreadPoolExecutor解析二(任务调度)
2017-04-06 12:56 2128ScheduledThreadPoolExecutor解析一( ... -
ScheduledThreadPoolExecutor解析一(调度任务,任务队列)
2017-04-04 22:59 4992Executor接口的定义:http://donald-dra ... -
ThreadPoolExecutor解析四(线程池关闭)
2017-04-03 23:02 9113Executor接口的定义:http: ... -
ThreadPoolExecutor解析三(线程池执行提交任务)
2017-04-03 12:06 6088Executor接口的定义:http://donald-dra ... -
ThreadPoolExecutor解析二(线程工厂、工作线程,拒绝策略等)
2017-04-01 17:12 3042Executor接口的定义:http://donald-dra ... -
ThreadPoolExecutor解析一(核心线程池数量、线程池状态等)
2017-03-31 22:01 20523Executor接口的定义:http://donald-dra ... -
ScheduledExecutorService接口定义
2017-03-29 12:53 1511Executor接口的定义:http://donald-dra ... -
AbstractExecutorService解析
2017-03-29 08:27 1081Executor接口的定义:http: ... -
ExecutorCompletionService解析
2017-03-28 14:27 1596Executor接口的定义:http://donald-dra ... -
CompletionService接口定义
2017-03-28 12:39 1069Executor接口的定义:http://donald-dra ... -
FutureTask解析
2017-03-27 12:59 1330package java.util.concurrent; ... -
Future接口定义
2017-03-26 09:40 1199/* * Written by Doug Lea with ... -
ExecutorService接口定义
2017-03-25 22:14 1164Executor接口的定义:http://donald-dra ... -
Executor接口的定义
2017-03-24 23:24 1678package java.util.concurrent; ... -
简单测试线程池拒绝执行任务策略
2017-03-24 22:37 2031线程池多余任务的拒绝执行策略有四中,分别是直接丢弃任务Disc ... -
JAVA集合类简单综述
2017-03-23 22:51 926Queue接口定义:http://donald-draper. ... -
DelayQueue解析
2017-03-23 11:00 1738Queue接口定义:http://donald-draper. ... -
SynchronousQueue解析下-TransferQueue
2017-03-22 22:20 2141Queue接口定义:http://donald-draper. ...
相关推荐
- PriorityBlockingQueue:具有优先级的无界阻塞队列。 - DelayQueue:用于延迟执行元素的无界阻塞队列。 - LinkedTransferQueue:基于链表的无界阻塞队列。 - LinkedBlockingDeque:基于链表的双端阻塞队列。 使用...
在Java中,自Java 5.0起,`java.util.concurrent`包提供了多种阻塞队列的实现,例如`ArrayBlockingQueue`、`LinkedBlockingQueue`、`PriorityBlockingQueue`等。这些类都实现了`java.util.concurrent.BlockingQueue`...
Java阻塞队列实现原理及实例解析 Java阻塞队列是一种特殊的队列,它能够在队列为空或满时阻塞线程,使得线程之间能够更好地协作和通信。阻塞队列的实现原理是基于锁机制和条件变量机制的,通过wait和notify方法来...
4. **非线程安全**:PriorityQueue不是线程安全的,如果在多线程环境中使用,应考虑使用PriorityBlockingQueue,它是线程安全的实现。 PriorityQueue的构造过程主要包含两个步骤: 1. **复制数据**:如果传入一个...
3. `PriorityBlockingQueue`:与`LinkedBlockingQueue`类似,但元素按照优先级排序,可以自定义比较器或者使用对象的自然排序。 4. `SynchronousQueue`:并非真正的队列,更像是一个传递点,每个插入操作必须等待另...
本文将深入解析线程池的管理源码,帮助读者理解其工作原理和优化策略。 在Java中,`java.util.concurrent`包下的`ThreadPoolExecutor`类是线程池的核心实现。它提供了丰富的参数来定制线程池的行为,包括核心线程数...
下面我们将深入探讨这个问题,并结合安卓环境进行详细解析。 1. **消费者-生产者问题的背景**: 消费者-生产者问题是由Dijkstra提出的,用于模拟生产者(Producer)与消费者(Consumer)如何在共享资源池中协作。...
### 最全的线程池资料解析 #### 一、线程池的优点与工作原理 线程池技术在Java多线程编程中占有极其重要的地位,它不仅能够提高系统的响应速度和资源利用率,还能简化程序设计。下面将详细介绍线程池的优势及其...
本文将深入解析Java线程池的工作原理,并给出创建简单实例的步骤。 线程池的核心在于`java.util.concurrent`包中的`ExecutorService`接口,它是执行任务的中心接口。`ExecutorService`扩展了`Executor`,提供了更...
20. **八种阻塞队列**:`ArrayBlockingQueue`、`LinkedBlockingQueue`、`PriorityBlockingQueue`等,每种队列都有不同的特点和适用场景。 #### 四、Spring框架 1. **BeanFactory和FactoryBean**:`BeanFactory`是...
│ 高并发编程第二阶段46讲、ClassLoader链接阶段(验证,准备,解析)过程详细介绍.mp4 │ 高并发编程第二阶段47讲、ClassLoader初始化阶段详细介绍clinit.mp4 │ 高并发编程第二阶段48讲、JVM内置三大类加载器...
│ 高并发编程第二阶段46讲、ClassLoader链接阶段(验证,准备,解析)过程详细介绍.mp4 │ 高并发编程第二阶段47讲、ClassLoader初始化阶段详细介绍clinit.mp4 │ 高并发编程第二阶段48讲、JVM内置三大类加载器...
1. 目录 1. 2. 目录 .........................................................................................................................................................1 JVM .........................
1. 目录 1. 2. 目录 .........................................................................................................................................................1 JVM ........................