`
BrokenDreams
  • 浏览: 255476 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
68ec41aa-0ce6-3f83-961b-5aa541d59e48
Java并发包源码解析
浏览量:100733
社区版块
存档分类
最新评论

Jdk1.6 JUC源码解析(27)-Exchanger

阅读更多

Jdk1.6 JUC源码解析(27)-Exchanger

作者:大飞

 

功能简介:
  • Exchanger是一种线程间安全交换数据的机制。可以和之前分析过的SynchronousQueue对比一下:线程A通过SynchronousQueue将数据a交给线程B;线程A通过Exchanger和线程B交换数据,线程A把数据a交给线程B,同时线程B把数据b交给线程A。可见,SynchronousQueue是交给一个数据,Exchanger是交换两个数据。
 
源码分析:
  • 先看下内部结构:
    private static final class Node extends AtomicReference<Object> {
        /** 创建这个节点的线程提供的用于交换的数据。 */
        public final Object item;
        /** 等待唤醒的线程 */
        public volatile Thread waiter;
        /**
         * Creates node with given item and empty hole.
         * @param item the item
         */
        public Node(Object item) {
            this.item = item;
        }
    }

    /**
     * 一个Slot就是一对线程交换数据的地方。
     * 这里对Slot做了缓存行填充,能够避免伪共享问题。
     * 虽然填充导致浪费了一些空间,但Slot是按需创建,一般没什么问题。
     */
    private static final class Slot extends AtomicReference<Object> {
        // Improve likelihood of isolation on <= 64 byte cache lines
        long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe;
    }

    /**
     * Slot数组,在需要时才进行初始化。
     * 用volatile修饰,因为这样可以安全的使用双重锁检测方式构建。
     */
    private volatile Slot[] arena = new Slot[CAPACITY];
    /**
     * arena(Slot数组)的容量。设置这个值用来避免竞争。
     */
    private static final int CAPACITY = 32;
    /**
     * 正在使用的slot下标的最大值。当一个线程经历了多次CAS竞争后,
     * 这个值会递增;当一个线程自旋等待超时后,这个值会递减。
     */
    private final AtomicInteger max = new AtomicInteger();
       内部结构很清晰,首先内部包含一个Slot数组,默认容量是32,用来避免以一些竞争,有点类似于ConcurrentHashMap的策略;其次,交换数据的场所就是Slot,它本身进行了cache line填充,避免了伪共享问题;最后,每个要进行数据交换的线程在内部会用一个Node来表示。
       伪共享说明:假设一个类的两个相互独立的属性a和b在内存地址上是连续的(比如FIFO队列的头尾指针),那么它们通常会被加载到相同的cpu cache line里面。并发情况下,如果一个线程修改了a,会导致整个cache line失效(包括b),这时另一个线程来读b,就需要从内存里再次加载了,这种多线程频繁修改ab的情况下,虽然a和b看似独立,但它们会互相干扰,非常影响性能。
 
  • 看完了内部结构,接下来就从Exchanger的交换数据方法exchange入手来分析代码:
    /**
     * 等待其他线程到达交换点,然后与其进行数据交换。
     *
     * 如果其他线程到来,那么交换数据,返回。
     *
     * 如果其他线程未到来,那么当前线程等待,知道如下情况发生:
     *   1.有其他线程来进行数据交换。
     *   2.当前线程被中断。
     */
    public V exchange(V x) throws InterruptedException {
        if (!Thread.interrupted()) {//检测当前线程是否被中断。
            //进行数据交换。
            Object v = doExchange(x == null? NULL_ITEM : x, false, 0);
            if (v == NULL_ITEM)
                return null; //检测结果是否为null。
            if (v != CANCEL) //检测是否被取消。
                return (V)v;
            Thread.interrupted(); // 清除中断标记。
        }
        throw new InterruptedException();
    }
    /**
     * 等待其他线程到达交换点,然后与其进行数据交换。
     * 
     * 如果其他线程到来,那么交换数据,返回。
     * 
     * 如果其他线程未到来,那么当前线程等待,知道如下情况发生:
     *   1.有其他线程来进行数据交换。
     *   2.当前线程被中断。
     *   3.超时。
     */
    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        if (!Thread.interrupted()) {
            Object v = doExchange(x == null? NULL_ITEM : x,
                                  true, unit.toNanos(timeout));
            if (v == NULL_ITEM)
                return null;
            if (v != CANCEL)
                return (V)v;
            if (!Thread.interrupted())
                throw new TimeoutException();
        }
        throw new InterruptedException();
    }

       上面的方法都调用了doExchange方法,主要逻辑在这个方法里,分析下这个方法: 

    /**
     * 这个方法会处理不同的情况,使用Object而不是泛型,主要是为了返回一些
     * 哨兵值(比如表示null和取消的对象)。
     *
     * @param item 用来进行交换的数据。
     * @param timed 如果有超时延迟,设置为true
     * @param nanos 具体的超时时间。
     * @return 返回另一个线程(与当前线程交换数据)的数据,或者CANCEL(表示取消)
     */
    private Object doExchange(Object item, boolean timed, long nanos) {
        Node me = new Node(item);                 // 创建当前节点me。
        int index = hashIndex();                  // 计算出当前slot的下标。
        int fails = 0;                            // 用来保存CAS失败的次数。
        for (;;) {
            Object y;                             // 用来保存当前slot中可能存在的Node。
            Slot slot = arena[index];             // 按照前面计算出的下标获取当前slot。
            if (slot == null)                     
                createSlot(index);                // 如果slot为null,那么创建一个slot,然后继续循环。
            else if ((y = slot.get()) != null && slot.compareAndSet(y, null)) { // 如果slot不为空,那么slot可能被另一个Node给占了,如果确实存在这个Node,尝试将其置空。(表示当前节点要和这个Node交换数据了)
                Node you = (Node)y;               // 给这个Node转型,赋给you。
                if (you.compareAndSet(null, item)) { // 将item设置给you,注意you本身是一个AtomicReference,这里相当于把item设置到you的value字段上。
                    LockSupport.unpark(you.waiter); // 然后唤醒you节点上等待的线程。
                    return you.item;                // 返回you的item。
                }                                 // 竞争失败,放弃,继续循环。
            }
            else if (y == null &&                 // 如果slot为空,那么说明没有要和当前线程交换数据的线程,
                     slot.compareAndSet(null, me)) { //那么当前线程先尝试把这个slot给占了。
                if (index == 0)                   // 如果slot下标为0,那么阻塞等待。
                    return timed? awaitNanos(me, slot, nanos): await(me, slot); // 有超时的话,会阻塞给定的时间。
                Object v = spinWait(me, slot);    // 如果slot下标不是0,自旋等待,等待其他线程来和当前线程交换数据,然后返回交换后的数据。
                if (v != CANCEL)
                    return v;
                me = new Node(item);              // 如果取消的话,重试,重建一个Node,之前的Node就丢弃了。
                int m = max.get();                // 获取当前slot下标的最大值。
                if (m > (index >>>= 1))           // 如果当前允许的最大索引太大。
                    max.compareAndSet(m, m - 1);  // 递减最大索引
            }
            else if (++fails > 1) {               // 如果1个slot竞争失败超过2次。
                int m = max.get();
                if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1)) //如果竞争失败超过3次,尝试递增最大索引值。
                    index = m + 1;                // 增加索引值。
                else if (--index < 0)             // 换个index。
                    index = m;                    // 绕回逻辑,防止index越界。
            }
        }
    }
       这里形象的理解一下:
              其实就是"我"和"你"(可能有多个"我",多个"你")在一个叫Slot的地方做交易(一手交钱,一手交货),过程分以下步骤:
              1.我到交易地点(Slot)的时候,你已经到了,那我就尝试喊你交易,如果你回应了我,决定和我交易那么进入第2步;如果别人抢先一步把你喊走了,那我只能再找别人了,进入第5步。
              2.我拿出钱交给你,你可能会接收我的钱,然后把货给我,交易结束;也可能嫌我掏钱太慢(超时)或者接个电话(中断),TM的不卖了,走了,那我只能再找别人买货了(从头开始)。
              3.我到交易地点的时候,你不在,那我先尝试把这个交易点给占了(一屁股做凳子上...),如果我成功抢占了单间(交易点),那就坐这儿等着你拿货来交易,进入第4步;如果被别人抢座了,那我只能在找别的地方儿了,进入第5步。
              4.你拿着货来了,喊我交易,然后完成交易;也可能我等了好长时间你都没来,我不等了,继续找别人交易去,走的时候我看了一眼,一共没多少人,弄了这么多单间(交易地点Slot),太TM浪费了,我喊来交易地点管理员:一共也没几个人,搞这么多单间儿干毛,给哥撤一个!。然后再找别人买货(从头开始);或者我老大给我打了个电话,不让我买货了(中断)。
              5.如果之前我尝试交易了2次都没成功,那我就想我TM选的这个位置(Slot下标)是不是风水不好啊,换个地儿继续(从头开始);如果之前都尝试交易了4次还没成功,我怒了,喊过来交易地点的管理员:给哥再开一个单间(Slot),加一个凳子,这么多人就这么几个破凳子够谁用!
 

       看一下doExchange调用的计算slot下标的方法:

    /**
     * Returns a hash index for the current thread.  Uses a one-step
     * FNV-1a hash code (http://www.isthe.com/chongo/tech/comp/fnv/)
     * based on the current thread's Thread.getId().  These hash codes
     * have more uniform distribution properties with respect to small
     * moduli (here 1-31) than do other simple hashing functions.
     *
     * <p>To return an index between 0 and max, we use a cheap
     * approximation to a mod operation, that also corrects for bias
     * due to non-power-of-2 remaindering (see {@link
     * java.util.Random#nextInt}).  Bits of the hashcode are masked
     * with "nbits", the ceiling power of two of table size (looked up
     * in a table packed into three ints).  If too large, this is
     * retried after rotating the hash by nbits bits, while forcing new
     * top bit to 0, which guarantees eventual termination (although
     * with a non-random-bias).  This requires an average of less than
     * 2 tries for all table sizes, and has a maximum 2% difference
     * from perfectly uniform slot probabilities when applied to all
     * possible hash codes for sizes less than 32.
     *
     * @return a per-thread-random index, 0 <= index < max
     */
    private final int hashIndex() {
        long id = Thread.currentThread().getId();
        int hash = (((int)(id ^ (id >>> 32))) ^ 0x811c9dc5) * 0x01000193;
        int m = max.get();
        int nbits = (((0xfffffc00  >> m) & 4) | // Compute ceil(log2(m+1))
                     ((0x000001f8 >>> m) & 2) | // The constants hold
                     ((0xffff00f2 >>> m) & 1)); // a lookup table
        int index;
        while ((index = hash & ((1 << nbits) - 1)) > m)       // May retry on
            hash = (hash >>> nbits) | (hash << (33 - nbits)); // non-power-2 m
        return index;
    }
       这里就是根据当前线程的ID,算一个hash值,然后针对slot最大index值做了一个近似取模的操作来计算slot的下标。
 

       接下来看一下createSlot方法:  

    private void createSlot(int index) {
        // 在同步块外面创建Slot实例,以减小同步块范围。
        Slot newSlot = new Slot();
        Slot[] a = arena;
        synchronized (a) {
            if (a[index] == null)
                a[index] = newSlot;
        }
    }

 

       再看一下awaitNanos方法:
    /**
     * 在下标为0的Slot上等待获取其他线程填充的值。
     * 如果在Slot被填充之前超时或者被中断,那么操作失败。
     */
    private Object awaitNanos(Node node, Slot slot, long nanos) {
        int spins = TIMED_SPINS;
        long lastTime = 0;
        Thread w = null;
        for (;;) {
            Object v = node.get();
            if (v != null)
                //如果已经被其他线程填充了值,那么返回这个值。
                return v;
            long now = System.nanoTime();
            if (w == null)
                w = Thread.currentThread();
            else
                nanos -= now - lastTime;
            lastTime = now;
            if (nanos > 0) {
                if (spins > 0)
                    --spins; //先自旋几次。
                else if (node.waiter == null)
                    node.waiter = w; //自旋阶段完毕后,将当前线程设置到node的waiter域。
                else if (w.isInterrupted())
                    tryCancel(node, slot); //如果当前线程被中断,尝试取消node。
                else
                    LockSupport.parkNanos(node, nanos); //阻塞给定的时间。
            }
            else if (tryCancel(node, slot) && !w.isInterrupted())
                //超时后,如果当前线程没有被中断,那么从Slot数组的其他位置看看有没有等待交换数据的节点
                return scanOnTimeout(node);
        }
    }
       awaitNanos中的自旋次数为TIMED_SPINS,这里说明一下自旋次数: 
    /**
     * 单核处理器下这个自旋次数为0
     * 多核情况下,这个值设置为大多数系统中上下文切换时间的平均值。
     */
    private static final int SPINS = (NCPU == 1) ? 0 : 2000;
    /**
     * 在有超时情况下阻塞等待之前自旋的次数。.
     * 超时等待的自旋次数之所以更少,是因为检测时间也需要耗费时间。
     * 这里的值是一个经验值。
     */
    private static final int TIMED_SPINS = SPINS / 20;
 
       继续看一下tryCancel方法:
    private static boolean tryCancel(Node node, Slot slot) {
        if (!node.compareAndSet(null, CANCEL))//尝试取消node
            return false;
        if (slot.get() == node) // pre-check to minimize contention
            slot.compareAndSet(node, null); //如果还关联在sot上,断开关联。
        return true;
    }
 
       继续看awaitNanos方法中最后调用的scanOnTimeout方法,这个方法在要取消的时候调用,找一下其他下标的Slot上有没有可以交换数据的节点,找到的话就可以成功交换数据,而不取消了:
    private Object scanOnTimeout(Node node) {
        Object y;
        for (int j = arena.length - 1; j >= 0; --j) {
            //从Slot数组的后面往前找
            Slot slot = arena[j];
            if (slot != null) {
                //找到了有初始化好的Slot,然后看看里面有没有node。
                while ((y = slot.get()) != null) {
                    //发现有node,尝试和这个node进行数据交换。
                    if (slot.compareAndSet(y, null)) {
                        Node you = (Node)y;
                        //尝试进行数据交换,
                        if (you.compareAndSet(null, node.item)) {
                            //如果交换成功(把当前节点的数据交给you),唤醒you上面等待的线程。
                            LockSupport.unpark(you.waiter);
                            //返回you的数据。
                            return you.item;
                        }
                    }
                }
            }
        }
        //没找到其他等待交换数据的线程,最后取消当前节点node。
        return CANCEL;
    }
 
       上面看的awaitNanos方法是在下标为0的Slot里面,有超时情况下的处理方式。再看下没有超时情况的处理方法await:
    private static Object await(Node node, Slot slot) {
        Thread w = Thread.currentThread();
        int spins = SPINS;
        for (;;) {
            Object v = node.get();
            if (v != null)
                //如果已经被其他线程填充了值,那么返回这个值。
                return v;
            else if (spins > 0)                 // 先自旋几次。
                --spins;
            else if (node.waiter == null)       // 自旋阶段完毕后,将当前线程设置到node的waiter域。
                node.waiter = w;
            else if (w.isInterrupted())         // 如果当前线程被中断,尝试取消当前node。
                tryCancel(node, slot);
            else                                // 否则阻塞当前线程。
                LockSupport.park(node);
        }
    }
 
      之前看的awaitNanos和await方法都是在下标为0的Slot的情况下采取的有阻塞行为的处理方式,如果下标不为0,采取完全自旋的方式,调用方法spinWait:
    private static Object spinWait(Node node, Slot slot) {
        int spins = SPINS;
        for (;;) {
            Object v = node.get();
            if (v != null)
                return v;
            else if (spins > 0)
                --spins; //先自旋
            else
                tryCancel(node, slot); //自旋了指定的次数还没等到交换的数据,尝试取消。
        }
    }
 
      最后看一下arena(Slot数组),默认的容量和实际使用的下标最大值:
    private static final int CAPACITY = 32;
    /**
     * The value of "max" that will hold all threads without
     * contention.  When this value is less than CAPACITY, some
     * otherwise wasted expansion can be avoided.
     */
    private static final int FULL =
        Math.max(0, Math.min(CAPACITY, NCPU / 2) - 1);
        前面说过arena容量默认为32,目的是为了减少线程的竞争,但实际上对arena的使用不会超过FULL这个值(避免一些空间浪费)。这个值取的是32(默认CAPACITY)和CPU核心数量的一半,这两个数的较小值在减1的数和0的较大值.... 也就是说,如果CPU核很多的情况下,这个值最大也就是31,;如果是单核或者双核CPU,这个值就是0,也就是说只能用arena[0]。这也是为什么前面的hashIndex方法里面会做的(近似)取模操作比较复杂,因为实际的能使用的Slot数组范围可能不是2的幂。
 
       Exchanger的代码解析完毕!

 

分享到:
评论
2 楼 BrokenDreams 2015-12-22  
pcgreat 写道
写的很好 。

感谢支持
1 楼 pcgreat 2015-12-22  
写的很好 。

相关推荐

    aspose-words-15.8.0-jdk1.6

    aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-...

    jdk-1.6-windows-64-01

    2部分: jdk-1.6-windows-64-01 jdk-1.6-windows-64-02

    okhttp3.8源码使用jdk1.6重新编译_okhttp3.8.0-jdk1.6.zip

    1.okhttp3.8源码使用jdk1.6重新编译,已集成了okio,在javaweb项目中使用,未在安卓项目中使用 2.okhttp3.8源码使用jdk1.6重新编译_okhttp3.8.0-jdk1.6.jar

    JDK 1.6 64位(jdk-6u45-windows-x64(1.6 64))

    下载的压缩包文件"jdk-6u45-windows-x64(1.6 64).exe"是Windows 64位系统的安装程序。安装过程中,用户需要选择安装路径,并设置环境变量,包括`JAVA_HOME`指向JDK的安装目录,`PATH`添加JDK的bin目录,确保系统可以...

    jdk-1.6-linux-64-3

    三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3

    jdk-jdk1.6.0.24-windows-i586.exe

    标题中的"jdk-jdk1.6.0.24-windows-i586.exe"是一个Java Development Kit(JDK)的安装程序,适用于Windows操作系统且为32位版本。JDK是Oracle公司提供的一个用于开发和运行Java应用程序的软件包。这个特定的版本,...

    jdk1.6集成jjwt的问题

    标题中的“jdk1.6集成jjwt的问题”指的是在Java Development Kit (JDK) 版本1.6的环境下,尝试整合JSON Web Token (JWT) 库jjwt时遇到的挑战。JWT是一种开放标准(RFC 7519),用于在各方之间安全地传输信息作为 ...

    java-jdk1.6-jdk-6u45-windows-x64.zip

    1. 解压缩"java-jdk1.6-jdk-6u45-windows-x64.zip"文件,这将释放出"jdk-6u45-windows-x64.exe"可执行文件。 2. 双击运行"jdk-6u45-windows-x64.exe",安装向导会引导你完成安装过程。通常,你需要选择安装路径,...

    jdk-1.6-linux-64-2

    三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3

    jdk-1.6-windows-32-3

    三部分: jdk-1.6-windows-32-1 jdk-1.6-windows-32-2 jdk-1.6-windows-32-3

    jdk-1.6-linux-64-1

    三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3

    JDK1.6安装及与JDK-1.5版本共存

    ### JDK1.6安装及与JDK-1.5版本共存 #### 一、前言 随着软件开发环境的变化和技术的进步,不同的项目可能需要不同的Java版本来支持其运行。例如,在某些特定环境下,可能既需要使用JDK1.5(Java Development Kit ...

    logback-cfca-jdk1.6-3.1.0.0.jar

    logback-cfca-jdk1.6-3.1.0.0.jar

    zxing jar包,支持jdk1.6,包括源码

    - 这可能是ZXing库的完整源码包,专门针对JDK1.6编译,包含了所有必要的源文件和资源,供开发者进行更深度的定制和集成。 总之,ZXing库是一个强大的条形码和二维码工具,这个特别适配JDK1.6的版本为那些仍在使用...

    jdk-1.6-linux-32-2

    jdk-1.6-linux-32-1 jdk-1.6-linux-32-2 jdk-1.6-linux-32-3

    jdk-6u45-linux-x64.zip_jdk-1.6u45_jdk-6u45_jdk-6u45-linux-x64_jd

    这个压缩包文件"jdk-6u45-linux-x64.zip"包含的是JDK 1.6.0_45(也被称为6u45或1.6u45)的64位Linux版本。JDK 1.6是Java平台标准版的一个重要版本,它提供了许多功能和性能改进,是许多企业级应用的基础。 JDK 1.6u...

    jdk1.6官方版 jdk-6u45-windows-x64 下载

    java环境搭建 jdk6(包含jre)64位 jdk-6u45-windows-x64

    jdk1.6 Java编程工具jdk-6u10-windows-i586-p.exe

    Java编程开发工具包,最新版本,很好用,经典

    JDK1.6_10_windows-i586.exe

    《深入解析JDK 1.6:构建高效Java开发环境》 JDK(Java Development Kit)是Oracle公司发布的用于开发Java应用程序的重要工具集,它为Java开发者提供了编译、调试和运行Java程序所需的环境。JDK 1.6,又称为Java SE...

    Linux64位JDK1.6(jdk-6u45-linux-x64.bin)

    Linux64位环境下的jdk6安装包:jdk-6u45-linux-x64.bin。 由于积分无法修改,现提供网盘下载地址: https://pan.baidu.com/s/1BE55ImTxZTQO6T22051P2g 提取码:5wvm

Global site tag (gtag.js) - Google Analytics