`

读SynchronousQueue源码

阅读更多
//先看构造方法
public SynchronousQueue() {
        this(false);
    }
 //公平模式或者非公平模式
 public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue() : new TransferStack();
    }



//所有的生产和消费都走的这个方法
Object transfer(Object e, boolean timed, long nanos) {
            
            SNode s = null; // constructed/reused as needed
	    //e==null是消费者,否则为生产者。
            int mode = (e == null) ? REQUEST : DATA;

            for (;;) {
                SNode h = head;
		//如果头为空或者有相同的模式说明没有其他线程
                if (h == null || h.mode == mode) {  
		  //说明不能等待
                    if (timed && nanos <= 0) {    
		     //再次判断有没有head
                        if (h != null && h.isCancelled())
                            casHead(h, h.next);     // pop cancelled node
                        else
                            return null;
                      //如果可以等待,CAS设置当前node为head。
                    } else if (casHead(h, s = snode(s, e, h, mode))) {
		        //一直自旋等到匹配
                        SNode m = awaitFulfill(s, timed, nanos);
                        if (m == s) {               // wait was cancelled
                            clean(s);
                            return null;
                        }
                        if ((h = head) != null && h.next == s)
                            casHead(h, s.next);     // help s's fulfiller
                        return (mode == REQUEST) ? m.item : s.item;
                    }
                } else if (!isFulfilling(h.mode)) { // try to fulfill
                    if (h.isCancelled())            // already cancelled
                        casHead(h, h.next);         // pop and retry
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                        for (;;) { // loop until matched or waiters disappear
                            SNode m = s.next;       // m is s's match
                            if (m == null) {        // all waiters are gone
                                casHead(s, null);   // pop fulfill node
                                s = null;           // use new node next time
                                break;              // restart main loop
                            }
                            SNode mn = m.next;
			    //尝试匹配
                            if (m.tryMatch(s)) {
                                casHead(s, mn);     // pop both s and m
                                return (mode == REQUEST) ? m.item : s.item;
                            } else                  // lost match
                                s.casNext(m, mn);   // help unlink
                        }
                    }
                } else {                            // help a fulfiller
                    SNode m = h.next;               // m is h's match
                    if (m == null)                  // waiter is gone
                        casHead(h, null);           // pop fulfilling node
                    else {
                        SNode mn = m.next;
                        if (m.tryMatch(h))          // help match
                            casHead(h, mn);         // pop both h and m
                        else                        // lost match
                            h.casNext(m, mn);       // help unlink
                    }
                }
            }
        }


boolean casHead(SNode h, SNode nh) {
            return h == head &&
                UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
        }


 SNode awaitFulfill(SNode s, boolean timed, long nanos) {
            //阻塞版本timed为false所以这里是0
            long lastTime = timed ? System.nanoTime() : 0;
            Thread w = Thread.currentThread();
            SNode h = head;
	    //是否需要旋转如果需要则自旋需要则根据timed判断自旋多少次。
	    //这里有一个疑问为什么要自旋而不直接挂起。我觉得是因为效率问题。
	    //自旋等待可以减少线程调度。
            int spins = (shouldSpin(s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                if (w.isInterrupted())
                    s.tryCancel();
                SNode m = s.match;
                if (m != null)
                    return m;
                if (timed) {
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                    if (nanos <= 0) {
                        s.tryCancel();
                        continue;
                    }
                }
                if (spins > 0)
                    spins = shouldSpin(s) ? (spins-1) : 0;
                else if (s.waiter == null)
                    s.waiter = w; // establish waiter so can park next iter
               //自旋完了就挂起
		else if (!timed)
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }

boolean shouldSpin(SNode s) {
            SNode h = head;
	    //如果是头节点或者头节点为null
            return (h == s || h == null || isFulfilling(h.mode));
        }

 boolean tryMatch(SNode s) {
                if (match == null &&
                    UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
                    Thread w = waiter;
                    if (w != null) {    // waiters need at most one unpark
                        waiter = null;
                        LockSupport.unpark(w);
                    }
                    return true;
                }
                return match == s;
            }


/**
总结:这个类相当于一个生产者对应一个消费者。一个人生产了就必须要有一个来消费。
相当于一对一。一个线程进来然后设置为头结点,然后自旋等待。另外一个线程进来,发现有头结点了,
说明有线程了。然后将自己设置为头结点,并将next指向刚才的头结点。然后尝试匹配。匹配成功。
判断当前线程的模式,是生产者则取头结点的值,消费者则取第二个节点的值。
*/

分享到:
评论

相关推荐

    java 同步器SynchronousQueue详解及实例

    Java 同步器 SynchronousQueue 详解及实例 Java 中的同步器 SynchronousQueue 是一种特殊的阻塞队列,它最多只能放一个元素,这个元素如果不在特定的时间消费掉就会被删除,队列的长度始终为 0。SynchronousQueue ...

    SynchronousQueue核心属性和方法源码的分析

    SynchronousQueue核心属性和方法源码的分析的代码

    SynchronousQueue实现原理.pdf

    SynchronousQueue是Java中的一个阻塞队列实现,它在Java并发编程中扮演着重要的角色。SynchronousQueue特别之处在于它没有进行数据存储的能力,也就是说它的容量为0。其核心功能是用于线程之间的直接传递数据,实现...

    SynchronousQueue详解

    与Exchanger相比,SynchronousQueue同样允许两个线程间交换数据,但Exchanger是在两个线程交换数据后才继续执行,而SynchronousQueue则是实时交换,一旦一个线程完成其操作,另一个线程立即接手。 SynchronousQueue...

    JDK之ThreadPoolExecutor源码分析1

    例如,线程池的SHUTDOWN、STOP等状态会影响是否接收新任务,而工作队列的类型(如ArrayBlockingQueue、SynchronousQueue等)则会影响任务的入队和出队行为。 在分析源码时,我们需要关注以下几个关键点: 1. `...

    Java线程池,正式上线运行的源码,分享欢迎使用并提意见

    根据不同的应用场景,可以选择不同类型的队列,如无界队列(LinkedBlockingQueue)、有界队列(ArrayBlockingQueue)或同步队列(SynchronousQueue)。 线程池的工作流程大致如下: 1. 当一个任务被提交到线程池时...

    java线程池threadpool简单使用源码

    常见的工作队列有`ArrayBlockingQueue`、`LinkedBlockingQueue`和`SynchronousQueue`等。选择哪种队列取决于应用的需求,如是否需要限制并发数量、任务提交速度与处理速度的关系等。 5. **ThreadGroup**:在Java中...

    JavaThreaddemo_DEMO_tidecme_线程池Java_源码.zip

    - **workQueue**: 用于存放等待执行的任务的阻塞队列,常见的有ArrayBlockingQueue、LinkedBlockingQueue和SynchronousQueue等。 - **threadFactory**: 创建新线程的工厂,用于定制线程的创建行为。 - **handler*...

    java线程池源码-java-source:Java源码学习多线程、线程池、集合

    5. 工作队列:用来存放待执行任务的队列,有多种类型,如ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue等,它们有不同的容量和竞争策略。 线程池的工作流程如下: 1. 当有新任务提交时,如果当前线程...

    一个小的java Demo , 非常适合Java初学者学习阅读.rar

    同步队列 SynchronousQueue,阻塞双端队列 BlockingDeque, 链阻塞双端队列 LinkedBlockingDeque,并发 Map(映射) ConcurrentMap, 并发导航映射 ConcurrentNavigableMap,交换机 Exchanger, 信号量 Semaphore,执行器...

    04 并发编程专题02.zip

    4. `SynchronousQueue`: 它不是一个真正的队列,而是一个同步点。任何插入操作必须等待一个匹配的删除操作,反之亦然。这种队列常用于实现生产者-消费者模型,或者在`ThreadPoolExecutor`中实现“线程池”中的“线程...

    线程池调用队列

    3. **直接提交(Direct Handoff)**:如`SynchronousQueue`,它不是一个真正的队列,而是一种特殊的线程间传递。当一个任务被提交时,如果线程池中没有空闲线程,则会尝试直接将任务交给另一个线程,如果没有其他...

    多线程编程.docx

    - **SynchronousQueue**:一个特殊的阻塞队列,它不存储元素,而是在生产和消费之间直接传递数据。 - **双端阻塞队列**: - **LinkedBlockingDeque**:基于链表实现的阻塞双端队列,可以从两端进行插入或删除操作...

    Android开发经验谈:并发编程(线程与线程池)(推荐)

    Android开发经验谈:并发编程(线程与线程池) Android开发中,线程和线程池是两个非常重要的概念。...* ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等队列的使用方法

    java并发工具包 java.util.concurrent中文版用户指南pdf

    7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航映射 ConcurrentNavigableMap 12. 闭锁 CountDownLatch 13. ...

    Java并发工具包java.util.concurrent用户指南中英文对照阅读版.pdf

    同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航 映射 ConcurrentNavigableMap 12. 闭锁 CountDownLatch 13. ...

    阿里各岗位技术面试题含答案最新.docx

    SynchronousQueue 是一个特殊的 BlockingQueue,它不存储任何元素,所有的插入操作必须等到其他线程进行相应的删除操作才能完成。在给定的代码中,试图向空的 SynchronousQueue 添加三个元素(1, 2, 3)并尝试取出...

    2004_DISC_dual_DS.pdf

    标题《2004_DISC_dual_DS.pdf》和描述“SynchronousQueue 底层算法相关实现论文”暗示了文档与Java并发编程中的非阻塞同步队列实现及其底层算法相关。从提供的部分内容,我们可以看出,文档讨论了线性化的经典理论...

    Concurrent In java

    这种方式适用于读多写少的场景,因为在写操作时需要创建一个完整的副本,这可能会导致较大的内存消耗和性能开销。 #### 三、ThreadPool 线程池是Java并发编程的核心组成部分之一,它允许预先创建和复用一组线程,...

Global site tag (gtag.js) - Google Analytics