`
Donald_Draper
  • 浏览: 981487 次
社区版块
存档分类
最新评论

SynchronousQueue解析下-TransferQueue

    博客分类:
  • JUC
阅读更多
Queue接口定义:http://donald-draper.iteye.com/blog/2363491
AbstractQueue简介:http://donald-draper.iteye.com/blog/2363608
ConcurrentLinkedQueue解析:http://donald-draper.iteye.com/blog/2363874
BlockingQueue接口的定义:http://donald-draper.iteye.com/blog/2363942
LinkedBlockingQueue解析:http://donald-draper.iteye.com/blog/2364007
ArrayBlockingQueue解析:http://donald-draper.iteye.com/blog/2364034
PriorityBlockingQueue解析:http://donald-draper.iteye.com/blog/2364100
SynchronousQueue解析上-TransferStack:http://donald-draper.iteye.com/blog/2364622
由于篇幅问题,上一篇只讲到TransferStack,而没讲到TransferQueue和其余部分,试着看看这篇能不能
讲完,不能讲完的话,再分一篇讲。
先回顾一下上一篇:
SynchronousQueue阻塞队列,每次插入操作必须等待一个协同的移除线程,反之亦然。
SynchronousQueue同步队列没有容量,可以说,没有一个容量。由于队列中只有在消费线程,
尝试消费元素的时候,才会出现元素,所以不能进行peek操作;不能用任何方法,生产元素,除非有消费者在尝试消费元素,同时由于队列中没有元素,所以不能迭代。head是第一个生产线程尝试生产的元素;如果没有这样的生产线程,那么没有元素可利用,remove和poll操作将会返回null。SynchronousQueue实际一个空集合类。同时同步队列不允许为null。同步队列支持生产者和消费者等待的公平性策略。默认情况下,不能保证生产消费的顺序。如果一个同步队列构造为公平性,则可以线程以FIFO访问队列元素。当时非公平策略用的是
TransferStack,公平策略用的是TransferQueue;TransferStack和TransferQueue是存放等待操作线程的描述,从TransferStack中Snode节点可以看出:节点关联一个等待线程waiter,后继next,匹配节点match,节点元素item和模式mode;模式由三种,REQUEST节点表示消费者等待消费资源,DATA表示生产者等待生产资源。FULFILLING节点表示生产者正在给等待资源的消费者补给资源,或生产者在等待消费者消费资源。当有线程take/put操作时,查看栈头,如果是空队列,或栈头节点的模式与要放入的节点模式相同;如果是超时等待,判断时间是否小于0,小于0则取消节点等待;如果非超时,则将创建的新节点入栈成功,即放在栈头,自旋等待匹配节点(timed决定超时,不超时);如果匹配返回的是自己,节点取消等待,从栈中移除,并遍历栈移除取消等待的节点;匹配成功,两个节点同时出栈,REQUEST模式返回,匹配到的节点元素(DATA),DATA模式返回节点元素(REQUEST)。如果与栈头节点的模式不同且不为FULFILLING,匹配节点,成功者,两个节点同时出栈,REQUEST模式返回,匹配到的节点元素(DATA),DATA模式返回节点元素(REQUEST)。如果栈头为FULFILLING,找出栈头的匹配节点,栈头与匹配到的节点同时出栈。从分析非公平模式下的TransferStack,可以看出一个REQUEST操作必须同时伴随着一个DATA操作,一个DATA操作必须同时伴随着一个REQUEST操作,这也是同步队列的命名中含Synchronous原因。SynchronousQueue像一个管道,一个操作必须等待另一个操作的发生。

 /** Dual Queue */
    static final class TransferQueue extends Transferer {
        /*
         * This extends Scherer-Scott dual queue algorithm, differing,
         * among other ways, by using modes within nodes rather than
         * marked pointers. The algorithm is a little simpler than
         * that for stacks because fulfillers do not need explicit
         * nodes, and matching is done by CAS'ing QNode.item field
         * from non-null to null (for put) or vice versa (for take).
	 本算法实现拓展了Scherer-Scott双队列算法,不同的是用节点模式,
	 而不是标记指针来区分节点操作类型。这个算法比栈算法的实现简单,
	 因为fulfillers需要明确指定节点,同时匹配节点用CAS操作QNode的
	 元素field即可,put操作从非null到null,反则亦然,take从null到非null。
         */

        /** Node class for TransferQueue. */
        static final class QNode {
            volatile QNode next;          // next node in queue 后继
            volatile Object item;         // CAS'ed to or from null 节点元素
            volatile Thread waiter;       // to control park/unpark 等待线程
            final boolean isData; //是否为DATA模式
            //设置元素和模式
            QNode(Object item, boolean isData) {
                this.item = item;
                this.isData = isData;
            }
            //设置节点的后继
            boolean casNext(QNode cmp, QNode val) {
                return next == cmp &&
                    UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
            }
	    //设置节点的元素
            boolean casItem(Object cmp, Object val) {
                return item == cmp &&
                    UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
            }

            /**
             * Tries to cancel by CAS'ing ref to this as item.
	     取消节点等待,元素指向自己
             */
            void tryCancel(Object cmp) {
                UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
            }
            //是否取消等待
            boolean isCancelled() {
                return item == this;
            }

            /**
             * Returns true if this node is known to be off the queue
             * because its next pointer has been forgotten due to
             * an advanceHead operation.
	     是否出队列
             */
            boolean isOffList() {
                return next == this;
            }

            // Unsafe mechanics
            private static final sun.misc.Unsafe UNSAFE;
            private static final long itemOffset;
            private static final long nextOffset;

            static {
                try {
                    UNSAFE = sun.misc.Unsafe.getUnsafe();
                    Class k = QNode.class;
                    itemOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("item"));
                    nextOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("next"));
                } catch (Exception e) {
                    throw new Error(e);
                }
            }
        }

        /** Head of queue  队列头节点*/
        transient volatile QNode head;
        /** Tail of queue 队列尾节点*/
        transient volatile QNode tail;
        /**
         * Reference to a cancelled node that might not yet have been
         * unlinked from queue because it was the last inserted node
         * when it cancelled.
	 刚入队列的节点,取消等待,但还没有出队列的节点,
         */
        transient volatile QNode cleanMe;

        TransferQueue() {
	    //构造队列
            QNode h = new QNode(null, false); // initialize to dummy node.
            head = h;
            tail = h;
        }

        /**
         * Tries to cas nh as new head; if successful, unlink
         * old head's next node to avoid garbage retention.
	 尝试设置新的队头节点为nh,并比较旧头节点,成功则,解除旧队列头节点的next链接,及指向自己
         */
        void advanceHead(QNode h, QNode nh) {
            if (h == head &&
                UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
                h.next = h; // forget old next
        }

        /**
         * Tries to cas nt as new tail.
	 尝试设置队尾
         */
        void advanceTail(QNode t, QNode nt) {
            if (tail == t)
                UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
        }

        /**
         * Tries to CAS cleanMe slot.
	 尝试设置取消等待节点为val。并比较旧的等待节点是否为cmp
         */
        boolean casCleanMe(QNode cmp, QNode val) {
            return cleanMe == cmp &&
                UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
        }

        /**
         * Puts or takes an item.
	 生产或消费一个元素
         */
        Object transfer(Object e, boolean timed, long nanos) {
            /* Basic algorithm is to loop trying to take either of
             * two actions:
             *
	     基本算法是循环尝试,执行下面两个步中的,其中一个:
             * 1. If queue apparently empty or holding same-mode nodes,
             *    try to add node to queue of waiters, wait to be
             *    fulfilled (or cancelled) and return matching item.
             *
	     1.如果队列为空,或队列中为相同模式的节点,尝试节点入队列等待,
	     直到fulfilled,返回匹配元素,或者由于中断,超时取消等待。
             * 2. If queue apparently contains waiting items, and this
             *    call is of complementary mode, try to fulfill by CAS'ing
             *    item field of waiting node and dequeuing it, and then
             *    returning matching item.
             *
	     2.如果队列中包含节点,transfer方法被一个协同模式的节点调用,
	     则尝试补给或填充等待线程节点的元素,并出队列,返回匹配元素。
             * In each case, along the way, check for and try to help
             * advance head and tail on behalf of other stalled/slow
             * threads.
             *
	     在每一种情况,执行的过程中,检查和尝试帮助其他stalled/slow线程移动队列头和尾节点
             * The loop starts off with a null check guarding against
             * seeing uninitialized head or tail values. This never
             * happens in current SynchronousQueue, but could if
             * callers held non-volatile/final ref to the
             * transferer. The check is here anyway because it places
             * null checks at top of loop, which is usually faster
             * than having them implicitly interspersed.
	     循环开始,首先进行null检查,防止为初始队列头和尾节点。当然这种情况,
	     在当前同步队列中,不可能发生,如果调用持有transferer的non-volatile/final引用,
	     可能出现这种情况。一般在循环的开始,都要进行null检查,检查过程非常快,不用过多担心
	     性能问题。
             */

            QNode s = null; // constructed/reused as needed
	    //如果元素e不为null,则为DATA模式,否则为REQUEST模式
            boolean isData = (e != null);

            for (;;) {
                QNode t = tail;
                QNode h = head;
		//如果队列头或尾节点没有初始化,则跳出本次自旋
                if (t == null || h == null)         // saw uninitialized value
                    continue;                       // spin

                if (h == t || t.isData == isData) { // empty or same-mode
		    //如果队列为空,或当前节点与队尾模式相同
                    QNode tn = t.next;
                    if (t != tail)                  // inconsistent read
		        //如果t不是队尾,非一致性读取,跳出本次自旋
                        continue;
                    if (tn != null) {               // lagging tail
		        //如果t的next不为null,设置新的队尾,跳出本次自旋
                        advanceTail(t, tn);
                        continue;
                    }
                    if (timed && nanos <= 0)        // can't wait
		        //如果超时,且超时时间小于0,则返回null
                        return null;
                    if (s == null)
		        //根据元素和模式构造节点
                        s = new QNode(e, isData);
                    if (!t.casNext(null, s))        // failed to link in
		        //新节点入队列
                        continue;
                    //设置队尾为当前节点
                    advanceTail(t, s);              // swing tail and wait
		    //自旋或阻塞直到节点被fulfilled
                    Object x = awaitFulfill(s, e, timed, nanos);
                    if (x == s) {                   // wait was cancelled
		        //如果s指向自己,s出队列,并清除队列中取消等待的线程节点
                        clean(t, s);
                        return null;
                    }

                    if (!s.isOffList()) {           // not already unlinked
		        //如果s节点已经不再队列中,移除
                        advanceHead(t, s);          // unlink if head
                        if (x != null)              // and forget fields
                            s.item = s;
                        s.waiter = null;
                    }
		    //如果自旋等待匹配的节点元素不为null,则返回x,否则返回e
                    return (x != null) ? x : e;

                } else {                            // complementary-mode
		    //如果队列不为空,且与队头的模式不同,及匹配成功
                    QNode m = h.next;               // node to fulfill
                    if (t != tail || m == null || h != head)
		        //如果h不为当前队头,则返回,即读取不一致
                        continue;                   // inconsistent read
                    Object x = m.item;
                    if (isData == (x != null) ||    // m already fulfilled
                        x == m ||                   // m cancelled
                        !m.casItem(x, e)) {         // lost CAS
			//如果队头后继,取消等待,则出队列
                        advanceHead(h, m);          // dequeue and retry
                        continue;
                    }
		    //否则匹配成功
                    advanceHead(h, m);              // successfully fulfilled
		    //unpark等待线程
                    LockSupport.unpark(m.waiter);
		    //如果匹配节点元素不为null,则返回x,否则返回e,即take操作,返回等待put线程节点元素,
		    //put操作,返回put元素
                    return (x != null) ? x : e;
                }
            }
        }

        /**
         * Spins/blocks until node s is fulfilled.
         *
	 自旋或阻塞直到节点被fulfilled
         * @param s the waiting node,等待节点
         * @param e the comparison value for checking match,检查匹配的比较元素
         * @param timed true if timed wait 是否超时等待
         * @param nanos timeout value 超时等待时间
         * @return matched item, or s if cancelled 成功返回匹配元素,取消返回等待元素
         */
        Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
            /* Same idea as TransferStack.awaitFulfill 这里与栈中的实现思路是一样的*/
	    //获取超时的当前时间,当前线程,自旋数
            long lastTime = timed ? System.nanoTime() : 0;
            Thread w = Thread.currentThread();
            int spins = ((head.next == s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                if (w.isInterrupted())
		    //如果中断,则取消等待
                    s.tryCancel(e);
                Object x = s.item;
                if (x != e)
                    return x;//如果s的节点的元素不相等,则返回x,即s节点指向自身,等待clean
                if (timed) {
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                    if (nanos <= 0) {
		        //如果超时,则取消等待
                        s.tryCancel(e);
                        continue;
                    }
                }
                if (spins > 0)
		    //自旋数减一
                    --spins;
                else if (s.waiter == null)
		     //如果是节点的等待线程为空,则设置为当前线程
                    s.waiter = w;
                else if (!timed)
		    //非超时,则park
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
		    //超时时间大于自旋时间,则超时park
                    LockSupport.parkNanos(this, nanos);
            }
        }

        /**
         * Gets rid of cancelled node s with original predecessor pred.
	 移除队列中取消等待的线程节点
         */
        void clean(QNode pred, QNode s) {
            s.waiter = null; // forget thread
            /*
             * At any given time, exactly one node on list cannot be
             * deleted -- the last inserted node. To accommodate this,
             * if we cannot delete s, we save its predecessor as
             * "cleanMe", deleting the previously saved version
             * first. At least one of node s or the node previously
             * saved can always be deleted, so this always terminates.
	     在任何时候,最后一个节点入队列时,队列中都有可能存在取消等待,但没有删除的节点。
	     为了将这些节点删除,如果我们不能删除最后入队列的节点,我们可以用cleanMe记录它的前驱,
	     删除cleanMe后继节点。s节点和cleanMe后继节点至少一个删除,则停止。
             */
            while (pred.next == s) { // Return early if already unlinked
	        //如果s为队尾节点,且前驱为旧队尾
                QNode h = head;
                QNode hn = h.next;   // Absorb cancelled first node as head
                if (hn != null && hn.isCancelled()) {
		    //如果队头不为空,且取消等待,设置后继为新的队头元素
                    advanceHead(h, hn);
                    continue;
                }
                QNode t = tail;      // Ensure consistent read for tail
                if (t == h)
		    //空队列,则返回
                    return;
                QNode tn = t.next;
                if (t != tail)
		    //如果队尾有变化,跳出循环
                    continue;
                if (tn != null) {
		    //如果队尾后继不为null,则设置新的队尾
                    advanceTail(t, tn);
                    continue;
                }
                if (s != t) {        // If not tail, try to unsplice
                    QNode sn = s.next;
                    if (sn == s || pred.casNext(s, sn))
		        //s节点指向自己,则返回
                        return;
                }
                QNode dp = cleanMe;
                if (dp != null) {    // Try unlinking previous cancelled node
		    //移除前一个取消等待的节点
                    QNode d = dp.next;
                    QNode dn;
                    if (d == null ||               // d is gone or
                        d == dp ||                 // d is off list or
                        !d.isCancelled() ||        // d not cancelled or
                        (d != t &&                 // d not tail and
                         (dn = d.next) != null &&  //   has successor
                         dn != d &&                //   that is on list
                         dp.casNext(d, dn)))       // d unspliced
                        casCleanMe(dp, null);
                    if (dp == pred)
                        return;      // s is already saved node
                } else if (casCleanMe(null, pred))
		    //先前取消等待的节点为null,则将cleanMe设为刚取消等待节点的前驱
                    return;          // Postpone cleaning s
            }
        }

        private static final sun.misc.Unsafe UNSAFE;
        private static final long headOffset;
        private static final long tailOffset;
        private static final long cleanMeOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class k = TransferQueue.class;
                headOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("head"));
                tailOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("tail"));
                cleanMeOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("cleanMe"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

到这里TransferQueue我们已经看完,我们简单的总结一下:
TransferQueue在执行take/put操作时,首先根据元素是否判断当前节点的模式,
如果元素为null则为REQUEST(take)模式,否则为DATA模式(put)。
然后自旋匹配节点,如果队列头或尾节点没有初始化,则跳出本次自旋,
如果队列为空,或当前节点与队尾模式相同,自旋或阻塞直到节点被fulfilled;
如果队列不为空,且与队头的模式不同,及匹配成功,出队列,如果是REQUEST操作,
返回匹配到节点的元素,如果为DATA操作,返回当前节点元素。
TransferQueue相对于TransferStack来说,操作匹配过程更简单,TransferStack为非公平策略下的实现LIFO,TransferQueue是公平策略下的实现FIFO。TransferQueue中的QNODE与TransferStack的SNODE节点有所不同处理后继next,等待线程,节点元素外,SNODE还有一个对应的模式REQUEST,DATA或FULFILLING,而QNODE中用一个布尔值isData来表示模式,这个模式的判断主要根据是元素是否为null,如果为null,则为REQUEST(take)模式,否则为DATA模式(put)。

再来看SynchronousQueue的构造和相关操作
构造:
先看内部Transferer的声明:
 
/**
     * The transferer. Set only in constructor, but cannot be declared
     * as final without further complicating serialization.  Since
     * this is accessed only at most once per public method, there
     * isn't a noticeable performance penalty for using volatile
     * instead of final here.
      transferer在构造函数中初始化,没有进一步的复杂序列化的情况下,不需要
      声明为final。由于transferer至多在public方法中用一次,所以volatile取代final不会有
      太多的性能代价。
     */
    private transient volatile Transferer transferer;

    /**
     * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
     */
    public SynchronousQueue() {
       //默认为非公平,栈
        this(false);
    }

    /**
     * Creates a <tt>SynchronousQueue</tt> with the specified fairness policy.
     *
     * @param fair if true, waiting threads contend in FIFO order for
     *        access; otherwise the order is unspecified.
     */
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue() : new TransferStack();
    }

如果公平则为TransferQueue,否则为TransferStack。
再看其他操作:
put操作:
 /**
     * Adds the specified element to this queue, waiting if necessary for
     * another thread to receive it.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E o) throws InterruptedException {
        if (o == null) throw new NullPointerException();
        if (transferer.transfer(o, false, 0) == null) {
	    //返回为null,则put失败,中断当前线程
            Thread.interrupted();
            throw new InterruptedException();
        }
    }

超时offer:
/**
     * Inserts the specified element into this queue, waiting if necessary
     * up to the specified wait time for another thread to receive it.
     *
     * @return <tt>true</tt> if successful, or <tt>false</tt> if the
     *         specified waiting time elapses before a consumer appears.
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E o, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (o == null) throw new NullPointerException();
        if (transferer.transfer(o, true, unit.toNanos(timeout)) != null)
            return true;
        if (!Thread.interrupted())
            return false;
        throw new InterruptedException();
    }

再看offer
 /**
     * Inserts the specified element into this queue, if another thread is
     * waiting to receive it.
     *
     * @param e the element to add
     * @return <tt>true</tt> if the element was added to this queue, else
     *         <tt>false</tt>
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        return transferer.transfer(e, true, 0) != null;
    }

take操作:
 /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * for another thread to insert it.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        Object e = transferer.transfer(null, false, 0);
        if (e != null)
            return (E)e;
        Thread.interrupted();
        t
hrow new InterruptedException();
    }
超时poll操作:
    /**
     * Retrieves and removes the head of this queue, waiting
     * if necessary up to the specified wait time, for another thread
     * to insert it.
     *
     * @return the head of this queue, or <tt>null</tt> if the
     *         specified waiting time elapses before an element is present.
     * @throws InterruptedException {@inheritDoc}
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        Object e = transferer.transfer(null, true, unit.toNanos(timeout));
        if (e != null || !Thread.interrupted())
            return (E)e;
        throw new InterruptedException();
    }

poll操作:
/**
     * Retrieves and removes the head of this queue, if another thread
     * is currently making an element available.
     *
     * @return the head of this queue, or <tt>null</tt> if no
     *         element is available.
     */
    public E poll() {
        return (E)transferer.transfer(null, true, 0);
    }

是否为空
    /**
     * Always returns <tt>true</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @return <tt>true</tt>
     */
    public boolean isEmpty() {
        return true;
    }

总是返回true,说明同步队列总是为空。
size:
 /**
     * Always returns zero.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @return zero.
     */
    public int size() {
        return 0;
    }
remainingCapacity:
    /**
     * Always returns zero.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @return zero.
     */
    public int remainingCapacity() {
        return 0;
    }
clear:
    /**
     * Does nothing.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     */
    public void clear() {
    }
contains:
    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param o the element
     * @return <tt>false</tt>
     */
    public boolean contains(Object o) {
        return false;
    }
remove:
    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param o the element to remove
     * @return <tt>false</tt>
     */
    public boolean remove(Object o) {
        return false;
    }

    /**
     * Returns <tt>false</tt> unless the given collection is empty.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param c the collection
     * @return <tt>false</tt> unless given collection is empty
     */
    public boolean containsAll(Collection<?> c) {
        return c.isEmpty();
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param c the collection
     * @return <tt>false</tt>
     */
    public boolean removeAll(Collection<?> c) {
        return false;
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param c the collection
     * @return <tt>false</tt>
     */
    public boolean retainAll(Collection<?> c) {
        return false;
    }

    /**
     * Always returns <tt>null</tt>.
     * A <tt>SynchronousQueue</tt> does not return elements
     * unless actively waited on.
     *
     * @return <tt>null</tt>
     */
    public E peek() {
        return null;
    }

    /**
     * Returns an empty iterator in which <tt>hasNext</tt> always returns
     * <tt>false</tt>.
     *
     * @return an empty iterator
     */
    public Iterator<E> iterator() {
        return Collections.emptyIterator();
    }


从上面这些方法可以出,由于同步队列总是为空,所以size为0.剩余容量为0,peek返回false,contains返回false,remove返回false。
drainTo操作:
**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while ( (e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while (n < maxElements && (e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }


下面再看序列化与反序列化:
/*
     * To cope with serialization strategy in the 1.5 version of
     * SynchronousQueue, we declare some unused classes and fields
     * that exist solely to enable serializability across versions.
     * These fields are never used, so are initialized only if this
     * object is ever serialized or deserialized.
     */

    static class WaitQueue implements java.io.Serializable { }
    static class LifoWaitQueue extends WaitQueue {
        private static final long serialVersionUID = -3633113410248163686L;
    }
    static class FifoWaitQueue extends WaitQueue {
        private static final long serialVersionUID = -3623113410248163686L;
    }
    private ReentrantLock qlock;
    private WaitQueue waitingProducers;
    private WaitQueue waitingConsumers;

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {
        boolean fair = transferer instanceof TransferQueue;
        if (fair) {
            qlock = new ReentrantLock(true);
            waitingProducers = new FifoWaitQueue();
            waitingConsumers = new FifoWaitQueue();
        }
        else {
            qlock = new ReentrantLock();
            waitingProducers = new LifoWaitQueue();
            waitingConsumers = new LifoWaitQueue();
        }
        s.defaultWriteObject();
    }

    private void readObject(final java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        if (waitingProducers instanceof FifoWaitQueue)
            transferer = new TransferQueue();
        else
            transferer = new TransferStack();
    }

序列化与反序列的作用主要是判断同步队列到底是公平的,还是非公平的。

总结:
TransferQueue在执行take/put操作时,首先根据元素是否判断当前节点的模式,如果元素为null则为REQUEST(take)模式,否则为DATA模式(put)。然后自旋匹配节点,如果队列头或尾节点没有初始化,则跳出本次自旋,如果队列为空,或当前节点与队尾模式相同,自旋或阻塞直到节点被fulfilled;如果队列不为空,且与队头的模式不同,及匹配成功,出队列,如果是REQUEST操作,返回匹配到节点的元素,如果为DATA操作,返回当前节点元素。TransferQueue相对于TransferStack来说,操作匹配过程更简单,TransferStack为非公平策略下的实现LIFO,TransferQueue是公平策略下的实现FIFO。TransferQueue中的QNODE与TransferStack的SNODE节点有所不同处理后继next,等待线程,节点元素外,SNODE还有一个对应的模式REQUEST,DATA或FULFILLING,而QNODE中用一个布尔值isData来表示模式,这个模式的判断主要根据是元素是否为null,如果为null,则为REQUEST(take)模式,否则为DATA模式(put)。SynchronousQueue根据构造公平参数,确定transferer为TransferStack还是TransferQueue,默认为TransferStack,SynchronousQueue的put/offer和take/poll统一委托给transferer,即通过TransferStack和TransferQueue的transfer(Object e, boolean timed, long nanos) 方法。由于同步队列一个take伴随着一个put,反之亦然,所有队列总是为空,所以size为0.剩余容量为0,peek返回false,contains返回false,remove返回false。



1
0
分享到:
评论

相关推荐

    java 同步器SynchronousQueue详解及实例

    Java 同步器 SynchronousQueue 详解及实例 Java 中的同步器 SynchronousQueue 是一种特殊的阻塞队列,它最多只能放一个元素,这个元素如果不在特定的时间消费掉就会被删除,队列的长度始终为 0。SynchronousQueue ...

    SynchronousQueue实现原理.pdf

    SynchronousQueue的非公平模式则不同,它不保证操作的顺序性,系统可能会偏向于让最先唤醒的线程进行操作,这在某些高并发的场景下可能会提升性能,但同时也可能导致饥饿现象,即某些线程可能长时间得不到服务。...

    SynchronousQueue详解

    因此,理解SynchronousQueue的工作原理并在适当的情境下使用它是非常重要的。 总的来说,SynchronousQueue是Java并发库中一个强大的工具,它可以提供高效、精确的线程间通信。尽管它有一定的学习曲线,但掌握它的...

    SynchronousQueue核心属性和方法源码的分析

    SynchronousQueue核心属性和方法源码的分析的代码

    PollDemo.rar

    - 主要涉及到`ThreadPoolExecutor`实例的创建,参数包括核心线程数、最大线程数、线程存活时间、时间单位以及工作队列的选择(如ArrayBlockingQueue或SynchronousQueue)。 - 任务提交:`execute()`方法用于提交...

    JAVA线程池的分析和使用

    - **提高可管理性**:线程池提供了对线程数量的控制,便于管理和监控,确保系统在高负载下仍能保持稳定。 2. **线程池的创建与参数** 创建线程池通常使用`ThreadPoolExecutor`类,需要传入以下参数: - **...

    JavaThreaddemo_DEMO_tidecme_线程池Java_源码.zip

    - **workQueue**: 用于存放等待执行的任务的阻塞队列,常见的有ArrayBlockingQueue、LinkedBlockingQueue和SynchronousQueue等。 - **threadFactory**: 创建新线程的工厂,用于定制线程的创建行为。 - **handler*...

    通过实例了解java TransferQueue

    `LinkedTransferQueue` 是 `TransferQueue` 的一个具体实现,它同时具有 `ConcurrentLinkedQueue` 的无锁特性,`SynchronousQueue` 的公平性以及 `LinkedBlockingQueue` 的链式结构。`LinkedTransferQueue` 在性能上...

    面试必备:Java线程池解析.pdf

    - SynchronousQueue:不存储元素的阻塞队列,提交任务必须等待另一个线程取走。 - PriorityBlockingQueue:具有优先级的无界阻塞队列。 - DelayQueue:用于延迟执行元素的无界阻塞队列。 - LinkedTransferQueue:...

    阿里各岗位技术面试题含答案最新.docx

    然后,可以修改慢指针的前一个节点,使其指向慢指针的下一个节点,从而删除目标节点。 8. **最大频率栈实现** 实现 FreqStack,可以维护两个数据结构:一个栈(用于存储元素),一个哈希表(用于存储元素频率和...

    后端开发基础知识整理JAVA、JVM、操作系统、网络、设计模式、mysql、redis、多线程、spring、springboo

    - **SynchronousQueue实现原理**:一个不存储元素的阻塞队列,适用于传递任务。 - **自定义类的应用场景**:当现有类无法满足需求时,可以自定义类来实现特定功能。 - **双亲委派模型**:类加载机制的一部分,确保类...

    java面试题2024资源下载

    ### Java多线程面试知识点详解 #### 一、线程与进程的区别 ...- **SynchronousQueue**: - 不存储元素的阻塞队列。 以上内容覆盖了Java多线程面试中的常见问题及知识点,希望能帮助你更好地准备面试。

    core_java_20.txt

    ### Java核心知识点解析 #### 1. `hashCode`与`equals`的关系 - **问题**:`hashCode`相等的两个对象一定相等吗?`equals`呢?反过来相等吗? - **解答**: - `hashCode`相等并不意味着两个对象一定相等。这是...

    java 手术任务(线程池)

    - **选择合适的工作队列**:无界队列(如`LinkedBlockingQueue`)、有界队列(如`ArrayBlockingQueue`)、直接提交给线程(如`SynchronousQueue`)。 - **监控和调整线程池**:使用`ThreadPoolExecutor`的`...

    java线程池实例

    常见的有`ArrayBlockingQueue`、`LinkedBlockingQueue`和`SynchronousQueue`等。 6. `threadFactory`: 用于创建新线程的工厂,可以自定义线程的命名、优先级等。 7. `handler`: 当线程池无法接受新任务时,例如达到...

    线程池之ThreadPoolExecutor.docx

    - `workQueue`: 用于保存待执行任务的阻塞队列,有多种类型可供选择,如ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue和PriorityBlockingQueue。 - `threadFactory`: 创建线程的工厂,可定制线程属性...

    线程----BlockingQueue

    - **SynchronousQueue**: 特殊的`BlockingQueue`,要求对它的操作必须是交替进行的,即一次放一个元素,然后才能取一个元素。 #### 4. ArrayBlockingQueue与LinkedBlockingQueue的比较 - **数据结构不同**:`...

    阿里各岗位技术面试题含答案最新.pdf

    以上是针对阿里技术面试题中的部分知识点解析,涵盖了并发编程、网络编程、软件质量保障、算法优化、系统设计等多个领域,这些都是互联网行业中尤其是阿里集团在招聘技术人才时关注的关键技能。

Global site tag (gtag.js) - Google Analytics