`
BrokenDreams
  • 浏览: 257101 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
68ec41aa-0ce6-3f83-961b-5aa541d59e48
Java并发包源码解析
浏览量:101817
社区版块
存档分类
最新评论

Jdk1.6 JUC源码解析(15)-SynchronousQueue

阅读更多

Jdk1.6 JUC源码解析(15)-SynchronousQueue

作者:大飞

 

功能简介:
  • SynchronousQueue是一种特殊的阻塞队列,它本身没有容量,只有当一个线程从队列取数据的同时,另一个线程才能放一个数据到队列中,反之亦然。存取过程相当于一个线程把数据(安全的)交给另一个线程的过程。
  • SynchronousQueue也支持公平和非公平模式。
源码分析:
  • SynchronousQueue内部采用伪栈和伪队列来实现,分别对应非公平模式和公平模式。先看下这部分实现。
       伪栈和伪队列的公共基类:
    static abstract class Transferer {
        /**
         * 转移数据的方法,用来实现put或者take。
         *
         * @param e 如果不为null,相当于将一个数据交给消费者;
         *          如果为null,相当于从一个生产者接收一个消费者交出的数据。
         * @param timed 操作是否支持超时。
         * @param nanos 超时时间,单位纳秒。
         * @return if non-null, the item provided or received; if null,
         *         the operation failed due to timeout or interrupt --
         *         the caller can distinguish which of these occurred
         *         by checking Thread.interrupted.
         */
        abstract Object transfer(Object e, boolean timed, long nanos);
    }
 

       先看下伪栈实现,内部结构如下:

    static final class TransferStack extends Transferer {
  
        /** 表示一个没有得到数据的消费者 */
        static final int REQUEST    = 0;
        /** 表示一个没有交出数据的生产者 */
        static final int DATA       = 1;
        /**
         * 表示正在匹配另一个生产者或者消费者。
         */
        static final int FULFILLING = 2;
        /** 判断是否包含正在匹配(FULFILLING)的标记 */
        static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
        /** Node class for TransferStacks. */
        static final class SNode {
            volatile SNode next;        // 栈中的下一个节点
            volatile SNode match;       // 和当前节点完成匹配的节点
            volatile Thread waiter;     // to control park/unpark
            Object item;                // data; or null for REQUESTs
            int mode;
            // Note: item和mode不需要volatile修饰, 
            // 是因为它们在其他的volatile/atomic操作之前写,之后读。
            SNode(Object item) {
                this.item = item;
            }
            static final AtomicReferenceFieldUpdater<SNode, SNode>
                nextUpdater = AtomicReferenceFieldUpdater.newUpdater
                (SNode.class, SNode.class, "next");
            boolean casNext(SNode cmp, SNode val) {
                return (cmp == next &&
                        nextUpdater.compareAndSet(this, cmp, val));
            }
            static final AtomicReferenceFieldUpdater<SNode, SNode>
                matchUpdater = AtomicReferenceFieldUpdater.newUpdater
                (SNode.class, SNode.class, "match");
            /**
             * 尝试匹配节点s和当前节点,如果匹配成功,唤醒等待线程。
             * (向消费者传递数据或向生产者获取数据)调用tryMatch方法 
             * 来确定它们的等待线程,然后唤醒这个等待线程。
             *
             * @param s the node to match
             * @return true if successfully matched to s
             */
            boolean tryMatch(SNode s) {
                if (match == null &&
                    matchUpdater.compareAndSet(this, null, s)) {
                    //如果当前节点的match为空,那么CAS设置s为match,然后唤醒waiter。
                    Thread w = waiter;
                    if (w != null) {    // waiters need at most one unpark
                        waiter = null;
                        LockSupport.unpark(w);
                    }
                    return true;
                }
                //如果match不为null,或者CAS设置match失败,那么比较match和s是否为相同对象。
                //如果相同,说明已经完成匹配,匹配成功。
                return match == s;
            }
            /**
             * 尝试取消当前节点(有线程等待),通过将match设置为自身。
             */
            void tryCancel() {
                matchUpdater.compareAndSet(this, null, this);
            }
            boolean isCancelled() {
                return match == this;
            }
        }
        /** The head (top) of the stack */
        volatile SNode head;
        static final AtomicReferenceFieldUpdater<TransferStack, SNode>
            headUpdater = AtomicReferenceFieldUpdater.newUpdater
            (TransferStack.class,  SNode.class, "head");

 

 

       下面看下伪栈中transfer方法实现细节吧:

        /**
         * Puts or takes an item.
         */
        Object transfer(Object e, boolean timed, long nanos) {
            /*
             * 基本算法是在一个无限循环中尝试下面三种情况里面的一种:
             *
             * 1. 如果当前栈为空或者包含与给定节点模式相同的节点,尝试 
             *    将节点压入栈内,并等待一个匹配节点,最后返回匹配节点 
             *    或者null(如果被取消)。
             *
             * 2. 如果当前栈包含于给定节点模式互补的节点,尝试将这个节 
             *    点打上FULFILLING标记,然后压入栈中,和相应的节点进行 
             *    匹配,然后将两个节点(当前节点和互补节点)弹出栈,并返 
             *    回匹配节点的数据。匹配和删除动作不是必须要做的,因为 
             *    其他线程会执行动作3:
             *
             * 3. 如果栈顶已经存在一个FULFILLING(正在满足其他节点)的节 
             *    点,帮助这个节点完成匹配和移除(出栈)的操作。然后继续 
             *    执行(主循环)。这部分代码基本和动作2的代码一样,只是 
             *    不会返回节点的数据。
             */
            SNode s = null; // constructed/reused as needed
            int mode = (e == null)? REQUEST : DATA;
            for (;;) {
                SNode h = head;
                if (h == null || h.mode == mode) {  // head为null或者head和e的mode相同。
                    if (timed && nanos <= 0) {      // 如果超时
                        if (h != null && h.isCancelled())
                            casHead(h, h.next);     // 如果h不为null且被取消,弹出h。
                        else
                            return null;            // 否则返回null。
                    } else if (casHead(h, s = snode(s, e, h, mode))) {//创建一个SNode,赋给s,将原本的head节点做为其next节点,并尝试将其设置为新的head。
                        //等待其他线程来满足当前线程。
                        SNode m = awaitFulfill(s, timed, nanos);
                        if (m == s) {               // awaitFulfill方法返回后,判断下是否被取消。
                            clean(s);               // 如果取消,清理一下s节点。
                            return null;
                        }
                        if ((h = head) != null && h.next == s) //因为上面已经将s设置为head,如果满足这个条件说明有其他节点t插入到s前面,变成了head,而且这个t就是和s匹配的节点,他们已经完成匹配。
                            casHead(h, s.next);     // 将s的next节点设置为head。相当于把s和t一起移除了。
                        return mode == REQUEST? m.item : s.item;
                    }
                } else if (!isFulfilling(h.mode)) { 
                    /*
                     * 如果栈中存在头节点,且和当前节点不是相同模式,
                     * 那么说明它们是一对儿对等的节点,尝试用当前节
                     * 点s来满足h节点。
                     */
                    if (h.isCancelled())            // 如果h节点已经被取消
                        casHead(h, h.next);         // 将h节点弹出,并将h节点的next节点设置为栈的head。
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {//尝试将当前节点打上"正在匹配"的标记,并设置为head。
                        for (;;) {
                            SNode m = s.next;       // s是当前节点,m是s的next节点,它们是正在匹配的两个节点。
                            if (m == null) {        // 如果m为空,可能其他节点把m匹配走了。
                                casHead(s, null);   // 将s弹出
                                s = null;           // 将s置空,下轮循环的时候还会新建。
                                break;              // 回退到主循环再来一次。
                            }
                            SNode mn = m.next;      // 获取m的next节点,如果s和m匹配成功,mn就得补上head的位置了。
                            if (m.tryMatch(s)) {    // 尝试匹配一下,匹配成功的话会把m上等待的线程唤醒。
                                casHead(s, mn);     // 如果匹配成功,把s和m弹出。
                                return (mode == REQUEST)? m.item : s.item;
                            } else                  // 没匹配成功的话,说明m可能被其他节点满足了。
                                s.casNext(m, mn);   // 说明m已经被其他节点匹配了,那就把m移除掉。
                        }
                    }
                } else {                            // 到这儿的话,说明栈顶的h正在匹配过程中。
                    SNode m = h.next;               // m是h的配对儿,h正在和m匹配。
                    if (m == null)                  // 如果m为空,其他节点把m匹配走了。
                        casHead(h, null);           // 弹出h。
                    else {
                        SNode mn = m.next;          // 获取m的next节点,如果m和h匹配成功,mn就得补上head的位置了。
                        if (m.tryMatch(h))          // 帮忙匹配一下m和h。
                            casHead(h, mn);         // 匹配成功的话,把h和m弹出。
                        else                        // 没匹配成功的话,说明m可能被其他节点满足了。
                            h.casNext(m, mn);       // 没成功的话,说明m已经被其他节点匹配了,那就把m移除掉。
                    }
                }
            }
        }

 

     看下上面方法中调用的创建节点的snode方法:

        static SNode snode(SNode s, Object e, SNode next, int mode) {
            if (s == null) s = new SNode(e);
            s.mode = mode;
            s.next = next;
            return s;
        }

 

     再看下等待被匹配的方法:

        /**
         * 自旋/阻塞直到节点被匹配。
         *
         * @param s the waiting node
         * @param timed true if timed wait
         * @param nanos timeout value
         * @return matched node, or s if cancelled
         */
        SNode awaitFulfill(SNode s, boolean timed, long nanos) {
            /*
             * 在s节点真正阻塞之前,将当前线程设置到s上面,然后检
             * 查中断状态(不少于一次),以确保后续和s匹配的节点来
             * 唤醒当前线程。
             *
             * 当执行此方法时,如果执行节点恰好在栈顶,阻塞之前会 
             * 做一些自旋,为的是如果有生产者或消费者马上到来,就 
             * 不需要执行节点阻塞了。这种优化在多核下是有意义的。
             */
            long lastTime = (timed)? System.nanoTime() : 0;
            Thread w = Thread.currentThread();
            SNode h = head;
            int spins = (shouldSpin(s)?
                         (timed? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                if (w.isInterrupted())
                    s.tryCancel();//如果当前线程被中断了,那么取消当前节点。
                SNode m = s.match;
                if (m != null)
                    return m; //如果已经匹配成功,就返回匹配的节点。
                if (timed) {
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                    if (nanos <= 0) {
                        s.tryCancel(); //如果超时了,也取消当前节点。
                        continue;
                    }
                }
                if (spins > 0)
                    spins = shouldSpin(s)? (spins-1) : 0; //自旋控制,每次循环都检测是否满足自旋条件,满足的话,自旋值就减去1,然后进入下次循环(一直减到0)
                else if (s.waiter == null)
                    s.waiter = w; //第一次循环时,会将当前线程设置到s上。
                else if (!timed)
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos); //有超时条件下,会检测超时时间是否大于超时阀值(这应该是一个经验值),大于就阻塞,小于就自旋。
            }
        }
        /**
         * 如果s节点就是当前栈中头节点,或者头节点正在匹配过程中,那么可以自旋一下。
         */
        boolean shouldSpin(SNode s) {
            SNode h = head;
            return (h == s || h == null || isFulfilling(h.mode));
        }
    
    //=========下面是自旋相关参数,定义在SynchronousQueue类中============================================
    /** The number of CPUs, for spin control */
    static final int NCPUS = Runtime.getRuntime().availableProcessors();
    /**
     * The number of times to spin before blocking in timed waits.
     * The value is empirically derived -- it works well across a
     * variety of processors and OSes. Empirically, the best value
     * seems not to vary with number of CPUs (beyond 2) so is just
     * a constant.
     */
    static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
    /**
     * The number of times to spin before blocking in untimed waits.
     * This is greater than timed value because untimed waits spin
     * faster since they don't need to check times on each spin.
     */
    static final int maxUntimedSpins = maxTimedSpins * 16;
    /**
     * The number of nanoseconds for which it is faster to spin
     * rather than to use timed park. A rough estimate suffices.
     */
    static final long spinForTimeoutThreshold = 1000L;  

 

     最后看清理节点方法:

        /**
         * 当s节点被取消时,才会调用这个方法。
         */
        void clean(SNode s) {
            s.item = null;   // forget item
            s.waiter = null; // forget thread
            /*
             * At worst we may need to traverse entire stack to unlink
             * s. If there are multiple concurrent calls to clean, we
             * might not see s if another thread has already removed
             * it. But we can stop when we see any node known to
             * follow s. We use s.next unless it too is cancelled, in
             * which case we try the node one past. We don't check any
             * further because we don't want to doubly traverse just to
             * find sentinel.
             */
            SNode past = s.next;
            if (past != null && past.isCancelled())
                past = past.next; 
            // 将从栈顶节点开始到past的连续的取消节点移除。
            SNode p;
            while ((p = head) != null && p != past && p.isCancelled())
                casHead(p, p.next);
            // 如果p本身未取消(上面的while碰到一个未取消的节点就会退出,但这个节点和past节点之间可能还有取消节点),再把p到past之间的取消节点都移除。
            while (p != null && p != past) {
                SNode n = p.next;
                if (n != null && n.isCancelled())
                    p.casNext(n, n.next);
                else
                    p = n;
            }
        }
    }

   

 
       再看下伪队列实现,内部结构如下:
    static final class TransferQueue extends Transferer {
    
        /** Node class for TransferQueue. */
        static final class QNode {
            volatile QNode next;          // next node in queue
            volatile Object item;         // CAS'ed to or from null
            volatile Thread waiter;       // to control park/unpark
            final boolean isData;
            QNode(Object item, boolean isData) {
                this.item = item;
                this.isData = isData;
            }
            ...
            /**
             * 尝试取消节点。
             * 取消就是将节点的item域指向自身。
             */
            void tryCancel(Object cmp) {
                itemUpdater.compareAndSet(this, cmp, this);
            }
            boolean isCancelled() {
                return item == this;
            }
            /**
             * 判断节点是否离开了队列。
             */
            boolean isOffList() {
                return next == this;
            }
        }
        /** 队列头节点 */
        transient volatile QNode head;
        /** 队列尾节点 */
        transient volatile QNode tail;
        /**
         * 指向一个被取消的节点,如果取消这个节点时,它是最后一个进入队列的节点,
         * 那么这个节点可能还没有离开队列。
         */
        transient volatile QNode cleanMe;
        TransferQueue() {
            QNode h = new QNode(null, false); // 初始化一个哨兵节点。
            head = h;
            tail = h;
        }
 

       看下伪队列中transfer方法实现:

        /**
         * Puts or takes an item.
         */
        Object transfer(Object e, boolean timed, long nanos) {
            /* 
             * 基本算法是在一个无限循环中尝试下面两种动作里面的一种:
             *
             * 1. 如果队列为空,或者包含相同模式(存或者取)的节点。
             *    尝试将节点加入等待的队列,直到被匹配(或被取消),
             *    同时返回匹配节点的数据。
             *
             * 2. 如果队列中包含等待的节点,并且当前节点和这个等待
             *    节点能相互匹配,那么尝试匹配等待节点并将这个节点 
             *    出队,然后返回匹配节点的数据。
             *
             * 在每个动作里面,都会检测并帮助其他线程来完成节点推进。
             *
             * 在循环开始的时候会做一个非空检测,以避免当前线程看到 
             * 未初始化的头尾节点。这种情况在当前SynchronousQueue中 
             * 永远不会发生,但如果调用者持有一个非volatile/final域 
             * 的话,就有可能会发生。在循环开始的时间做这个非空检测 
             * 要比在内部(分支里)做性能好一些。
             */
            QNode s = null; // constructed/reused as needed
            boolean isData = (e != null);
            for (;;) {
                QNode t = tail;
                QNode h = head;
                if (t == null || h == null)         // 如果看到未初始化的头尾节点
                    continue;                       
                if (h == t || t.isData == isData) { // 队列为空或者当前节点和队列中节点模式相同。
                    QNode tn = t.next;
                    if (t != tail)                  // 读取到不一致的结果,说明同时有其他线程修改了tail。
                        continue;                   
                    if (tn != null) {               // 说明其他线程已经添加了新节点tn,但还没将其设置为tail。
                        advanceTail(t, tn);         // 当前线程帮忙推进尾节点,就是尝试将tn设置为尾节点。
                        continue;                   
                    }
                    if (timed && nanos <= 0)        // 超时。
                        return null;
                    if (s == null)
                        s = new QNode(e, isData);   // 初始化s。
                    if (!t.casNext(null, s))        // 尝试将当前节点s拼接到t后面。
                        continue;                   // 不成功就继续下次循环
                    advanceTail(t, s);              // 尝试将s设置为队列尾节点。
                    Object x = awaitFulfill(s, e, timed, nanos); // 然后等着被匹配。
                    if (x == s) {                   // 如果被取消。
                        clean(t, s);                // 清理s节点。
                        return null;
                    }
                    if (!s.isOffList()) {           // 如果s节点还没有离开队列。
                        advanceHead(t, s);          // 尝试将s设置为头节点,移除t。
                        if (x != null)              
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null)? x : e;
                } else {                            // 模式正好互补。
                    QNode m = h.next;               // 找到能匹配的节点。
                    if (t != tail || m == null || h != head)
                        continue;                   // 读取到不一致的结果,进入下一轮循环。
                    Object x = m.item;
                    if (isData == (x != null) ||    // 如果m已经被匹配了。
                        x == m ||                   // 或者m被取消了。
                        !m.casItem(x, e)) {         // 如果尝试将数据e设置到m上失败。
                        advanceHead(h, m);          // 将h出队,m设置为头结点,然后重试。
                        continue;
                    }
                    advanceHead(h, m);              // 成功匹配,推进头节点。
                    LockSupport.unpark(m.waiter);   // 唤醒m上的等待线程。
                    return (x != null)? x : e;
                }
            }
        }

 

       看下transfer方法中调用的advanceHead和advanceTail方法:

        /**
         * Tries to cas nh as new head; if successful, unlink
         * old head's next node to avoid garbage retention.
         */
        void advanceHead(QNode h, QNode nh) {
            if (h == head && headUpdater.compareAndSet(this, h, nh))
                h.next = h; // forget old next
        }
        /**
         * Tries to cas nt as new tail.
         */
        void advanceTail(QNode t, QNode nt) {
            if (tail == t)
                tailUpdater.compareAndSet(this, t, nt);
        }

 

       再看下transfer方法中调用的awaitFulfill方法:

        /**
         * Spins/blocks until node s is fulfilled.
         *
         * @param s the waiting node
         * @param e the comparison value for checking match
         * @param timed true if timed wait
         * @param nanos timeout value
         * @return matched item, or s if cancelled
         */
        Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
            /* Same idea as TransferStack.awaitFulfill */
            long lastTime = (timed)? System.nanoTime() : 0;
            Thread w = Thread.currentThread();
            int spins = ((head.next == s) ?
                         (timed? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                if (w.isInterrupted())
                    s.tryCancel(e);
                Object x = s.item;
                if (x != e)
                    return x;
                if (timed) {
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                    if (nanos <= 0) {
                        s.tryCancel(e);
                        continue;
                    }
                }
                if (spins > 0)
                    --spins;
                else if (s.waiter == null)
                    s.waiter = w;
                else if (!timed)
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }

 

       TransferStack的awaitFulfill方法思路差不多,代码就不做解析了。再看下transfer中调用的clean方法:

        void clean(QNode pred, QNode s) {
            s.waiter = null; // forget thread
            /*
             * 在任意给定的时间点,能删除的节点一定是最后入队的节点。
             * 为了满足这个条件,如果当前无法删除s,就将其前驱节点保 
             * 存为"cleanMe",先删除之前保存的版本。至少节点s和之前 
             * 保存的节点里面有一个能被删除,所以方法一定会结束。
             */
            while (pred.next == s) { // Return early if already unlinked
                QNode h = head;
                QNode hn = h.next;   
                if (hn != null && hn.isCancelled()) {
                    advanceHead(h, hn); //如果head节点的next节点被取消,那么推进一下head节点。
                    continue;
                }
		        QNode t = tail;      // Ensure consistent read for tail
                if (t == h)          // 如果队列为空,
                    return;
		        QNode tn = t.next;    
		        if (t != tail)       // 出现不一致读,重试。
                    continue;
                if (tn != null) {
                    advanceTail(t, tn); // 帮助推进尾节点。
                    continue;
                }
                if (s != t) {        // 如果s不是尾节点,移除s。
                    QNode sn = s.next;
                    if (sn == s || pred.casNext(s, sn)) //如果s已经被移除退出循环,否则尝试断开s
                        return;
                }
                /*
                 * 下面要做的事情大体就是:如果s是位节点,那么不会马上删除s,
                 * 而是将s的前驱节点设置为cleanMe,下次清理其他取消节点的时候
                 * 会顺便把s移除。
                 */
                QNode dp = cleanMe;
                if (dp != null) {    // 如果dp不为null,说明是前一个被取消节点,将其移除。
                    QNode d = dp.next;
                    QNode dn;
                    if (d == null ||               // d is gone or
                        d == dp ||                 // d is off list or
                        !d.isCancelled() ||        // d not cancelled or
                        (d != t &&                 // d not tail and
                         (dn = d.next) != null &&  //   has successor
                         dn != d &&                //   that is on list
                         dp.casNext(d, dn)))       //  把之前标记为cleanMe节点的next节点d移除。
                        casCleanMe(dp, null);      
                    if (dp == pred)
                        return;      // 说明s的前驱已经是cleanMe了(后续会被删掉)。
                 } else if (casCleanMe(null, pred))
                    return;          // 如果当前cleanMe为null,那么将s前驱节点设置为cleanMe,并退出。
              }
           }

         

       小总结一下:
       从上面的分析可以看出,伪栈的结构下,新来的线程会作为栈顶节点或者优先和栈顶的等待节点进行匹配,并不是公平的;但伪队列的结构下,新来的线程会在队尾,或者和队头的等待节点(最前到的)进行匹配,能够保证一定的公平性。
 
  • 有了内部伪栈和伪队列的实现,SynchronousQueue实现起来就很容易了,看下代码:
    private transient volatile Transferer transferer;

    public SynchronousQueue() {
        this(false);
    }

    public SynchronousQueue(boolean fair) {
        transferer = (fair)? new TransferQueue() : new TransferStack();
    }
    /**
     * 添加一个数据到队列,等到其他线程接收这个数据。
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E o) throws InterruptedException {
        if (o == null) throw new NullPointerException();
        if (transferer.transfer(o, false, 0) == null) {
	        Thread.interrupted();
            throw new InterruptedException();
	    }
    }
    /**
     * 添加一个数据到队列,等到其他线程接收这个数据或者超时。
     *
     * @return <tt>true</tt> if successful, or <tt>false</tt> if the
     *         specified waiting time elapses before a consumer appears.
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E o, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (o == null) throw new NullPointerException();
        if (transferer.transfer(o, true, unit.toNanos(timeout)) != null)
            return true;
        if (!Thread.interrupted())
            return false;
        throw new InterruptedException();
    }
    /**
     * 添加一个数据到队列,如果有其他线程正等待接收这个数据且接收成功,返回true;否则返回false。
     * 
     * 这个方法不阻塞。
     * @param e the element to add
     * @return <tt>true</tt> if the element was added to this queue, else
     *         <tt>false</tt>
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        return transferer.transfer(e, true, 0) != null;
    }
    /**
     * 获取并移除队列前端的数据,如果队列中没有数据,就等待其他线程添加一个数据。
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        Object e = transferer.transfer(null, false, 0);
        if (e != null)
            return (E)e;
	Thread.interrupted();
        throw new InterruptedException();
    }
    /**
     * 获取并移除队列前端的数据,如果队列中没有数据,就等待其他线程添加一个数据或者超时。
     *
     * @return the head of this queue, or <tt>null</tt> if the
     *         specified waiting time elapses before an element is present.
     * @throws InterruptedException {@inheritDoc}
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        Object e = transferer.transfer(null, true, unit.toNanos(timeout));
        if (e != null || !Thread.interrupted())
            return (E)e;
        throw new InterruptedException();
    }
    /**
     * 如果其他线程正在添加数据到队列,那么尝试获取并移除这个数据。
     * 
     * 这个方法不阻塞。
     * @return the head of this queue, or <tt>null</tt> if no
     *         element is available.
     */
    public E poll() {
        return (E)transferer.transfer(null, true, 0);
    }

 

       由于SynchronousQueue没有实际的容量,所以其他方法实现起来很简单了:

    public boolean isEmpty() {
        return true;
    }

    public int size() {
        return 0;
    }

    public int remainingCapacity() {
        return 0;
    }
 
    public void clear() {
    }

    public boolean contains(Object o) {
        return false;
    }

    public boolean remove(Object o) {
        return false;
    }

    public boolean containsAll(Collection<?> c) {
        return c.isEmpty();
    }

    public boolean removeAll(Collection<?> c) {
        return false;
    }

    public boolean retainAll(Collection<?> c) {
        return false;
    }
    ...

 

  • 最后看一下SynchronousQueue的序列化,序列化比较特别,因为transferer域本身不需要序列化,但需要记住transferer是内部伪栈和伪队列:
    static class WaitQueue implements java.io.Serializable { }
    static class LifoWaitQueue extends WaitQueue {
        private static final long serialVersionUID = -3633113410248163686L;
    }
    static class FifoWaitQueue extends WaitQueue {
        private static final long serialVersionUID = -3623113410248163686L;
    }
    private ReentrantLock qlock;
    private WaitQueue waitingProducers;
    private WaitQueue waitingConsumers;
    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {
        //序列化时根据TransferQueue类型来创建WaitQueue实例。
        boolean fair = transferer instanceof TransferQueue;
        if (fair) {
            qlock = new ReentrantLock(true);
            waitingProducers = new FifoWaitQueue();
            waitingConsumers = new FifoWaitQueue();
        }
        else {
            qlock = new ReentrantLock();
            waitingProducers = new LifoWaitQueue();
            waitingConsumers = new LifoWaitQueue();
        }
        s.defaultWriteObject();
    }
    private void readObject(final java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        if (waitingProducers instanceof FifoWaitQueue)
            transferer = new TransferQueue();
        else
            transferer = new TransferStack();
    }

 

 
       SynchronousQueue的代码解析完毕!
 
 
 
 

 

分享到:
评论

相关推荐

    汪文君高并发编程实战视频资源全集

    │ 高并发编程第二阶段46讲、ClassLoader链接阶段(验证,准备,解析)过程详细介绍.mp4 │ 高并发编程第二阶段47讲、ClassLoader初始化阶段详细介绍clinit.mp4 │ 高并发编程第二阶段48讲、JVM内置三大类加载器...

    汪文君高并发编程实战视频资源下载.txt

    │ 高并发编程第二阶段46讲、ClassLoader链接阶段(验证,准备,解析)过程详细介绍.mp4 │ 高并发编程第二阶段47讲、ClassLoader初始化阶段详细介绍clinit.mp4 │ 高并发编程第二阶段48讲、JVM内置三大类加载器...

    Java并发编程实践分享PPT教案学习.pptx

    在JDK1.5及以后版本中,Java引入了Java并发工具包(java.util.concurrent,简称JUC),提供了一系列实用的并发编程类和接口。 2. **并发容器**: - 在示例中提到了`ArrayList`和`Vector`。`ArrayList`是非线程安全...

    智慧园区3D可视化解决方案PPT(24页).pptx

    在智慧园区建设的浪潮中,一个集高效、安全、便捷于一体的综合解决方案正逐步成为现代园区管理的标配。这一方案旨在解决传统园区面临的智能化水平低、信息孤岛、管理手段落后等痛点,通过信息化平台与智能硬件的深度融合,为园区带来前所未有的变革。 首先,智慧园区综合解决方案以提升园区整体智能化水平为核心,打破了信息孤岛现象。通过构建统一的智能运营中心(IOC),采用1+N模式,即一个智能运营中心集成多个应用系统,实现了园区内各系统的互联互通与数据共享。IOC运营中心如同园区的“智慧大脑”,利用大数据可视化技术,将园区安防、机电设备运行、车辆通行、人员流动、能源能耗等关键信息实时呈现在拼接巨屏上,管理者可直观掌握园区运行状态,实现科学决策。这种“万物互联”的能力不仅消除了系统间的壁垒,还大幅提升了管理效率,让园区管理更加精细化、智能化。 更令人兴奋的是,该方案融入了诸多前沿科技,让智慧园区充满了未来感。例如,利用AI视频分析技术,智慧园区实现了对人脸、车辆、行为的智能识别与追踪,不仅极大提升了安防水平,还能为园区提供精准的人流分析、车辆管理等增值服务。同时,无人机巡查、巡逻机器人等智能设备的加入,让园区安全无死角,管理更轻松。特别是巡逻机器人,不仅能进行360度地面全天候巡检,还能自主绕障、充电,甚至具备火灾预警、空气质量检测等环境感知能力,成为了园区管理的得力助手。此外,通过构建高精度数字孪生系统,将园区现实场景与数字世界完美融合,管理者可借助VR/AR技术进行远程巡检、设备维护等操作,仿佛置身于一个虚拟与现实交织的智慧世界。 最值得关注的是,智慧园区综合解决方案还带来了显著的经济与社会效益。通过优化园区管理流程,实现降本增效。例如,智能库存管理、及时响应采购需求等举措,大幅减少了库存积压与浪费;而设备自动化与远程监控则降低了维修与人力成本。同时,借助大数据分析技术,园区可精准把握产业趋势,优化招商策略,提高入驻企业满意度与营收水平。此外,智慧园区的低碳节能设计,通过能源分析与精细化管理,实现了能耗的显著降低,为园区可持续发展奠定了坚实基础。总之,这一综合解决方案不仅让园区管理变得更加智慧、高效,更为入驻企业与员工带来了更加舒适、便捷的工作与生活环境,是未来园区建设的必然趋势。

    labelme标注的json转mask掩码图,用于分割数据集 批量转化,生成cityscapes格式的数据集

    labelme标注的json转mask掩码图,用于分割数据集 批量转化,生成cityscapes格式的数据集

    (参考GUI)MATLAB GUI漂浮物垃圾分类检测.zip

    (参考GUI)MATLAB GUI漂浮物垃圾分类检测.zip

    人脸识别_OpenCV_活体检测_证件照拍照_Demo_1741778955.zip

    人脸识别项目源码实战

    人脸识别_科大讯飞_Face_签到系统_Swface_1741770704.zip

    人脸识别项目实战

    跟网型逆变器小干扰稳定性分析与控制策略优化simulink仿真模型和代码.zip

    本仿真模型基于MATLAB/Simulink(版本MATLAB 2016Rb)软件。建议采用matlab2016 Rb及以上版本打开。(若需要其他版本可联系代为转换) CSDN详情地址:https://blog.csdn.net/qq_50594161/article/details/146242453sharetype=blogdetail&sharerId=146242453&sharerefer=PC&sharesource=qq_50594161&spm=1011.2480.3001.8118

    16-1文本表示&词嵌入.ipynb

    实战练习分词、创建词表、文本处理

    45页-零碳智慧园区标准解决方案:模块化、可扩展且可复制的解决方案.pdf

    在智慧园区建设的浪潮中,一个集高效、安全、便捷于一体的综合解决方案正逐步成为现代园区管理的标配。这一方案旨在解决传统园区面临的智能化水平低、信息孤岛、管理手段落后等痛点,通过信息化平台与智能硬件的深度融合,为园区带来前所未有的变革。 首先,智慧园区综合解决方案以提升园区整体智能化水平为核心,打破了信息孤岛现象。通过构建统一的智能运营中心(IOC),采用1+N模式,即一个智能运营中心集成多个应用系统,实现了园区内各系统的互联互通与数据共享。IOC运营中心如同园区的“智慧大脑”,利用大数据可视化技术,将园区安防、机电设备运行、车辆通行、人员流动、能源能耗等关键信息实时呈现在拼接巨屏上,管理者可直观掌握园区运行状态,实现科学决策。这种“万物互联”的能力不仅消除了系统间的壁垒,还大幅提升了管理效率,让园区管理更加精细化、智能化。 更令人兴奋的是,该方案融入了诸多前沿科技,让智慧园区充满了未来感。例如,利用AI视频分析技术,智慧园区实现了对人脸、车辆、行为的智能识别与追踪,不仅极大提升了安防水平,还能为园区提供精准的人流分析、车辆管理等增值服务。同时,无人机巡查、巡逻机器人等智能设备的加入,让园区安全无死角,管理更轻松。特别是巡逻机器人,不仅能进行360度地面全天候巡检,还能自主绕障、充电,甚至具备火灾预警、空气质量检测等环境感知能力,成为了园区管理的得力助手。此外,通过构建高精度数字孪生系统,将园区现实场景与数字世界完美融合,管理者可借助VR/AR技术进行远程巡检、设备维护等操作,仿佛置身于一个虚拟与现实交织的智慧世界。 最值得关注的是,智慧园区综合解决方案还带来了显著的经济与社会效益。通过优化园区管理流程,实现降本增效。例如,智能库存管理、及时响应采购需求等举措,大幅减少了库存积压与浪费;而设备自动化与远程监控则降低了维修与人力成本。同时,借助大数据分析技术,园区可精准把握产业趋势,优化招商策略,提高入驻企业满意度与营收水平。此外,智慧园区的低碳节能设计,通过能源分析与精细化管理,实现了能耗的显著降低,为园区可持续发展奠定了坚实基础。总之,这一综合解决方案不仅让园区管理变得更加智慧、高效,更为入驻企业与员工带来了更加舒适、便捷的工作与生活环境,是未来园区建设的必然趋势。

    人脸识别_活体检测_数据录入_登录系统Face_Login_1741778308.zip

    人脸识别项目源码实战

    学生信息管理平台是一个基于Java Web技术的综合性管理平台

    学生信息管理系统是一个基于Java Web技术的综合性管理平台。通过此系统,可以实现对学生、教师、选课信息等的动态管理, 提升学校管理效率。系统采用分层架构设计,前端使用HTML、CSS,JavaScript和jQuery,后端基于Servlet,JSP和Spring框架,数据库采用MySQL。主要有四个大功能,学生管理( 增加学生信息、删除学生信息、修改学生信息、查询学生信息)、教师管理(增加教师信息、删除教师信息、修改教师信息、查询教师信息)、选课信息管理(添加选课、查询选课情况、删除选课记录)、系统管理( 登录与注册功能、 用户角色管理(老师,学生,管理员)、系统日志查看)。 技术架构 1.前端技术 HTML,CSS:静态页面布局与样式 JavaScript,jQuery:动态交互、DOM操作和AJAX请求 2.后端技术 Servlet:控制层,处理用户请求 JSP:页面动态生成 Spring:依赖注入,业务逻辑分离 3.数据库 MySQL:存储学生、教师,课程等数据 JDBC:数据库连接与操作

    PHP进阶系列之Swoole入门精讲(课程视频)

    本课程是 PHP 进阶系列之 Swoole 入门精讲,系统讲解 Swoole 在 PHP 高性能开发中的应用,涵盖 协程、异步编程、WebSocket、TCP/UDP 通信、任务投递、定时器等核心功能。通过理论解析和实战案例相结合,帮助开发者掌握 Swoole 的基本使用方法及其在高并发场景下的应用。 适用人群: 适合 有一定 PHP 基础的开发者、希望提升后端性能优化能力的工程师,以及 对高并发、异步编程感兴趣的学习者。 能学到什么: 掌握 Swoole 基础——理解 Swoole 的核心概念,如协程、异步编程、事件驱动等。 高并发处理——学习如何使用 Swoole 构建高并发的 Web 服务器、TCP/UDP 服务器。 实战项目经验——通过案例实践,掌握 Swoole 在 WebSocket、消息队列、微服务等场景的应用。 阅读建议: 建议先掌握 PHP 基础,了解 HTTP 服务器和并发处理相关概念。学习过程中,结合 官方文档和实际项目 进行实践,加深理解,逐步提升 Swoole 开发能力。

    人脸识别_表情分析_spider运行_数据采集用途_1741771318.zip

    人脸识别项目实战

    美颜_GPUimage_人脸识别_动态贴纸_Demo_1741771705.zip

    人脸识别项目实战

    人脸照片文件批量分辨率裁剪工具

    功能简介:本工具可实现批量对照片文件的人脸识别,并按指定分辨率进行转换保存。 可为人脸识别采集系统提供很好的辅助工具。 软件基本于OPENVC开发,识别精确,转换高效。 人脸识别工具 +人脸采集处理

    基于强化学习与肌肉长度反馈控制的高效无意识姿态稳定算法研究(可复现,有问题请联系博主)

    内容概要:本文探讨了利用肌长变化反馈控制(FCM-ML)和演员-评论家强化学习(ACRL-NGN)来有效实现人体上肢和下肢无意识姿态稳定的算法方法。通过构建一个包含949条肌肉和22个关节的全身计算模型,在不同初始姿势的情况下进行模拟试验,验证了这些方法的有效性和鲁棒性,结果显示FCM-ML方法比其他传统方法更适用于此类任务。研究指出人类及其他脊椎动物在无意识状态下,通过抗拮抗性的肌肉长度变化反馈机制来维持舒适状态下的自然身体姿势(NBP)。此外,研究还表明这种控制策略有助于机器人设计、运动员训练以及康复患者的治疗。 适用人群:生物力学、机器人学以及神经科学领域的研究人员、工程师,以及关注人体姿态控制及其应用的学者和技术人员。 使用场景及目标:①解释人和非人的脊椎动物如何在无意识情况下维持最佳姿势,特别是处于重力环境中的自然身体姿势(NBP)。②为机器人肌肉控制提供理论支持和发展方向,特别是在模拟多肌肉协调控制方面。③指导运动训练及病患恢复计划的设计与优化。 其他说明:研究发现ACRL-NGN结合FCM-ML不仅能够迅速有效地实现期望的姿态稳定性,而且不需要对肌肉分类,这使其在复

Global site tag (gtag.js) - Google Analytics