`

读Exchanger源码

阅读更多
//用于线程间交换数据
public V exchange(V x) throws InterruptedException {
        if (!Thread.interrupted()) {
            Object v = doExchange((x == null) ? NULL_ITEM : x, false, 0);
            if (v == NULL_ITEM)
                return null;
            if (v != CANCEL)
                return (V)v;
            Thread.interrupted(); // Clear interrupt status on IE throw
        }
        throw new InterruptedException();
    }

 private Object doExchange(Object item, boolean timed, long nanos) {
        Node me = new Node(item);                 // Create in case occupying
        //计算hash值
	int index = hashIndex();                  // Index of current slot
        int fails = 0;                            // Number of CAS failures

        for (;;) {
            Object y;                             // Contents of current slot
            Slot slot = arena[index];
            if (slot == null) 
	        //创建slot
                createSlot(index);     
            //如果有值
            else if ((y = slot.get()) != null &&  // Try to fulfill
                     slot.compareAndSet(y, null)) {
                Node you = (Node)y;               // Transfer item
                if (you.compareAndSet(null, item)) {
		    //唤醒线程
                    LockSupport.unpark(you.waiter);
                    return you.item;
                }                                 // Else cancelled; continue
            }

            else if (y == null &&                 // Try to occupy
                     slot.compareAndSet(null, me)) {
                if (index == 0)                   // Blocking wait for slot 0
                    return timed ?
		     //阻塞当前线程
                        awaitNanos(me, slot, nanos) :
			await(me, slot);
                Object v = spinWait(me, slot);    // Spin wait for non-0
                if (v != CANCEL)
                    return v;
                me = new Node(item);              // Throw away cancelled node
                int m = max.get();
                if (m > (index >>>= 1))           // Decrease index
                    max.compareAndSet(m, m - 1);  // Maybe shrink table
            }
            else if (++fails > 1) {               // Allow 2 fails on 1st slot
                int m = max.get();
                if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))
                    index = m + 1;                // Grow on 3rd failed slot
                else if (--index < 0)
                    index = m;                    // Circularly traverse
            }
        }
    }


private void createSlot(int index) {
        // Create slot outside of lock to narrow sync region
        Slot newSlot = new Slot();
        Slot[] a = arena;
        synchronized (a) {
            if (a[index] == null)
                a[index] = newSlot;
        }
    }


private static Object spinWait(Node node, Slot slot) {
        int spins = SPINS;
        for (;;) {
            Object v = node.get();
            if (v != null)
                return v;
            else if (spins > 0)
                --spins;
            else
                tryCancel(node, slot);
        }
    }

//尝试取消
private static boolean tryCancel(Node node, Slot slot) {
        if (!node.compareAndSet(null, CANCEL))
            return false;
        if (slot.get() == node) // pre-check to minimize contention
            slot.compareAndSet(node, null);
        return true;
    }


//自旋挂起当前线程
private static Object await(Node node, Slot slot) {
        Thread w = Thread.currentThread();
        int spins = SPINS;
        for (;;) {
            Object v = node.get();
            if (v != null)
                return v;
            else if (spins > 0)                 // Spin-wait phase
                --spins;
            else if (node.waiter == null)       // Set up to block next
                node.waiter = w;
            else if (w.isInterrupted())         // Abort on interrupt
                tryCancel(node, slot);
            else                                // Block
                LockSupport.park(node);
        }
    }

//自选等待挂起当前线程指定时间
private Object awaitNanos(Node node, Slot slot, long nanos) {
        int spins = TIMED_SPINS;
        long lastTime = 0;
        Thread w = null;
        for (;;) {
            Object v = node.get();
            if (v != null)
                return v;
            long now = System.nanoTime();
            if (w == null)
                w = Thread.currentThread();
            else
                nanos -= now - lastTime;
            lastTime = now;
            if (nanos > 0) {
                if (spins > 0)
                    --spins;
                else if (node.waiter == null)
                    node.waiter = w;
                else if (w.isInterrupted())
                    tryCancel(node, slot);
                else
                    LockSupport.parkNanos(node, nanos);
            }
            else if (tryCancel(node, slot) && !w.isInterrupted())
                return scanOnTimeout(node);
        }
    }

    /**
    总结:该类的实现,是当一个线程过来先查看对应的slot中是否已经有数据在等待交换,
    没有则挂起当前线程等待交换数据。有则将数据交换然后唤醒之前等待交换的线程。
    */

分享到:
评论

相关推荐

    Exchanger_1.0.0_win32-源码.rar

    Exchanger_1.0.0_win32-源码.rar

    currency-exchanger

    "currency-exchanger" 是一个基于JavaScript的项目,主要功能可能是实现货币兑换的计算或实时汇率查询。这个项目可能包含了一套完整的前端解决方案,用于帮助用户转换不同国家的货币值。根据提供的信息,我们可以...

    Java Exchanger并发类使用方法

    Java Exchanger并发类使用方法 Java Exchanger是Java 5引入的并发类,用来实现两个线程之间的对象交换。Exchanger顾名思义,就是用来做交换的。这里主要是两个线程之间交换持有的对象。当Exchanger在一个线程中调用...

    Java:Exchanger类的作用.docx

    Java中的`Exchanger`类是`java.util.concurrent`包的一部分,设计用于在多线程环境中进行数据交换。这个类提供了一个同步点,使得两个线程能够有效地传递它们各自的数据。`Exchanger`的核心功能在于它维护了两个槽位...

    java并发Exchanger的使用

    Exchanger是java 5引入的并发类,Exchanger顾名思义就是用来做交换的。这里主要是两个线程之间交换持有的对象。当Exchanger在一个线程中调用exchange方法之后,会等待另外的线程调用同样的exchange方法。 两个线程都...

    29 一手交钱,一手交货—Exchanger详解.pdf

    在Java并发编程中,Exchanger是一个非常有用的工具类,它允许两个线程间进行数据交换。这个类在处理需要同步的交互式任务时特别有用,比如在多阶段处理或者需要线程间协作的情况。Exchanger的工作原理就像一个中介,...

    JUC并发编程与源码分析视频课.zip

    5. **并发工具类**:包括CountDownLatch、CyclicBarrier、Semaphore、Exchanger等,这些工具可以帮助开发者协调多个线程间的协作,实现复杂的并发控制逻辑。 6. **Future和CompletableFuture**:讲解如何使用Future...

    Java编程线程同步工具Exchanger的使用实例解析

    Java编程线程同步工具Exchanger的使用实例解析 Java编程线程同步工具Exchanger是Java编程语言中的一种线程同步工具,它提供了一种方便的方式来交换线程之间的信息。Exchanger类可以作为两个线程交换对象的同步点,...

    Multi-Objective Optimization of heat exchanger by entropy generation

    ### 多目标优化在换热器设计中的应用:基于熵产... Multi-Objective Optimization of Heat Exchanger Design by Entropy Generation Minimization. Journal of Heat Transfer, 132(8), 081006. DOI: 10.1115/1.4001317.

    Dubbo 源码分析

    深入理解Dubbo的源码有助于开发者优化服务性能,解决实际问题,以及更好地定制化服务。下面,我们将详细探讨Dubbo的几个关键模块。 **1. 服务提供者(Provider)** 服务提供者是Dubbo架构中的基础组件,它负责暴露...

    Hot Water Heat Exchanger Control.rar

    PLC程序会定义传感器(如温度传感器)如何读取热水温度,以及如何根据读数调整阀门或泵的运行以控制热交换效率。PID(比例-积分-微分)控制器是这类系统中常见的算法,它能自动调整设备参数以维持目标温度。 此外,...

    Aspen Plus Heat Exchanger Example换热器模拟案例.pdf

    Aspen Plus是一款用于化工过程模拟、工艺设计与模拟的严格稳态模拟工具。它主要利用以下物理关系: 1. 质量和能量平衡:这是模拟过程中最基本的原理,质量守恒和能量守恒是化工过程模拟的基础。...

    Java多线程编程之使用Exchanger数据交换实例

    Java多线程编程中的Exchanger是一个非常有用的工具类,它位于`java.util.concurrent`包下,主要用于线程间的数据交换。Exchanger的核心功能是让两个线程在一个同步点相遇,进行数据交换。当一个线程调用`exchange`...

    heat_exchanger_simulink换热器_simulink换热_heat_simulink热_simulink仿真_

    本压缩包文件“heat_exchanger_simulink换热器_simulink换热_heat_simulink热_simulink仿真_源码.zip”或"heat_exchanger_simulink换热器_simulink换热_heat_simulink热_simulink仿真_源码.rar"很可能包含了用于模拟...

    Air Cooled Heat Exchanger Design.zip

    风冷式换热器(Air Cooled Heat Exchanger, 简称ACH)利用大气作为冷却介质,通过强制对流的方式带走设备内部热量,与水冷式换热器相比,具有无需水源、维护成本低的优点。 在"Air Cooled Heat Exchanger Design....

    heat_exchanger_simulink换热器_simulink换热_heat_simulink热_simulink仿真.

    本项目涉及的主题是“heat_exchanger_simulink换热器”,它使用Simulink进行热交换过程的建模和仿真。Simulink是MATLAB的一个扩展工具,专门用于动态系统建模和仿真,包括物理系统、控制系统、信号处理和通信系统等...

    java并发工具类(CountDownLatch+Semaphore+Exchanger)

    Java并发工具类是Java并发编程中的重要组成部分,其中包括了多种实用的工具,如CountDownLatch、Semaphore和Exchanger,这些工具类极大地简化了多线程环境下的同步和协调问题。 1. **CountDownLatch**: ...

    CADExchanger:一个使用CAD Exchanger允许导入和导出其他CAD文件格式的FreeCAD插件

    用于FreeCAD的CAD Exchanger插件这个插件允许导入和导出到支持的所有商业CAD文件格式。描述是一种多平台(Windows,MacOS和Linux)商业付费应用程序,必须在其网站上购买(可以免费获得30天的评估)。 此附加组件...

    PDF文件的书签批量自动导入和导出 PDFBookmark-Exchanger-附件资源

    PDF文件的书签批量自动导入和导出 PDFBookmark-Exchanger-附件资源

Global site tag (gtag.js) - Google Analytics