`
BrokenDreams
  • 浏览: 254069 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
68ec41aa-0ce6-3f83-961b-5aa541d59e48
Java并发包源码解析
浏览量:100190
社区版块
存档分类
最新评论

Jdk1.8 JUC源码增量解析(1)-atomic-Striped64

阅读更多

Jdk1.8 JUC源码增量解析(1)-atomic-Striped64

作者:大飞

 

功能简介:
  • Striped64是jdk1.8提供的用于支持如Long累加器,Double累加器这样机制的基础类。
  • Striped64的设计核心思路就是通过内部的分散计算来避免竞争(比如多线程CAS操作时的竞争)。
  • Striped64内部包含一个基础值和一个单元哈希表。没有竞争的情况下,要累加的数会累加到这个基础值上;如果有竞争的话,会将要累加的数累加到单元哈希表中的某个单元里面。所以整个Striped64的值包括基础值和单元哈希表中所有单元的值的总和。
源码分析:
  • 先看一下内部结构:
    /**
     * 存放Cell的hash表,大小为2的幂。
     */
    transient volatile Cell[] cells;
    /**
     * 基础值,没有竞争时会使用(更新)这个值,同时做为初始化竞争失败的回退方案。
     * 原子更新。
     */
    transient volatile long base;
    /**
     * 自旋锁,通过CAS操作加锁,用于保护创建或者扩展Cell表。
     */
    transient volatile int cellsBusy;

 

       看下Cell的内部结构:

    @sun.misc.Contended 
    static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }
        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
       Cell内部保存了一个volatile修饰的long型域,同时提供了原子操作,看起来像一个原子量。
       注意到Cell类被一个Contended注解修饰,Contended的作用是对Cell做缓存行填充,避免伪共享。
 
  • Striped64主要提供了longAccumulate和doubleAccumulate方法来支持子类,先看下longAccumulate:
    static final int NCPU = Runtime.getRuntime().availableProcessors();
    final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        int h;
        //获取当前线程的probe值作为hash值。
        if ((h = getProbe()) == 0) {
            //如果probe值为0,强制初始化当前线程的probe值,这次初始化的probe值不会为0。
            ThreadLocalRandom.current(); 
            //再次获取probe值作为hash值。
            h = getProbe();
            //这次相当于再次计算了hash,所以设置未竞争标记为true。
            wasUncontended = true;
        }
        boolean collide = false;
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) != null && (n = as.length) > 0) {
                //通过h从cell表中选定一个cell位置。
                if ((a = as[(n - 1) & h]) == null) {
                    //如果当前位置没有cell,尝试新建一个。
                    if (cellsBusy == 0) {
                        //创建一个Cell。       
                        Cell r = new Cell(x); 
                        //尝试或者cellsBusy锁。
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               
                                Cell[] rs; int m, j;
                                //在获取锁的情况下再次检测一下。
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    //设置新建的cell到指定位置。
                                    rs[j] = r;
                                    //创建标记设置为true。
                                    created = true;
                                }
                            } finally {
                                //释放cellsBusy锁。
                                cellsBusy = 0;
                            }
                            if (created)
                                //如果创建成功,直接跳出循环,退出方法。
                                break;
                            //说明上面指定的cell的位置上有cell了,继续尝试。
                            continue;  
                        }
                    }
                    //走到这里说明获取cellsBusy锁失败。
                    collide = false;
                }
                //以下条件说明上面通过h选定的cell表的位置上有Cell,就是a。
                else if (!wasUncontended)       // CAS already known to fail
                    //如果之前的CAS失败,说明已经发生竞争,
                    //这里会设置未竞争标志位true,然后再次算一个probe值,然后重试。
                    wasUncontended = true;      // Continue after rehash
                //这里尝试将x值加到a的value上。
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    //如果尝试成功,跳出循环,方法退出。
                    break;
                else if (n >= NCPU || cells != as)
                    //如果cell表的size已经最大,或者cell表已经发生变化(as是一个过时的)。
                    collide = false;           
                else if (!collide)
                    //设置冲突标志,表示发生了冲突,重试。
                    collide = true;
                //尝试获取cellsBusy锁。
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        //检测as是否过时。
                        if (cells == as) {      
                            //给cell表扩容。
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        //释放cellsBusy锁。
                        cellsBusy = 0;
                    }
                    collide = false;
                    //扩容cell表后,再次重试。
                    continue;                  
                }
                //算出下一个hash值。
                h = advanceProbe(h);
            }
            //如果cell表还未创建,先尝试获取cellsBusy锁。
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                          
                    if (cells == as) {
                        //初始化cell表,初始容量为2。
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    //释放cellsBusy锁。
                    cellsBusy = 0;
                }
                if (init)
                    //初始化cell表成功后,退出方法。
                    break;
            }
            //如果创建cell表由于竞争导致失败,尝试将x累加到base上。
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          
        }
    }
       说明一下这个方法,方法的作用是将给定的值x累加到当前值(Striped64本身)上,x值为正就是加、为负就是减。
       方法流程细节:
       首先,方法内部首先会算一个hash值,用来确定cell数组的下标。hash值初始源于当前Thread中的threadLocalRandomProbe域,如果hash值初始后为0,会初始化一下当前线程的threadLocalRandomProbe值,然后再次赋给hash值。注意方法传入第三个参数wasUncontended表示调用方法之前是否未发生竞争,加入前面走了初始化threadLocalRandomProbe的过程,就会将wasUncontended设置为true。
       接下来,方法进入主循环。
       1.先判断Cell表是否创建。
       1.1.如果Cell表未创建,尝试获取cellsBusy锁。
       1.1.1.如果获取cellsBusy锁成功,会创建一个size为2的Cell表作为初始cell表,然后新建一个保存给定x的Cell实例,然后根据hash值设置到Cell表对应的位置上;
       1.1.2.如果获取cellsBusy锁失败,会尝试将x累加到base上,失败重试。
       1.2.如果Cell表已经创建,通过hash值算出一个Cell表中的位置,然后获取这个位置上的Cell,称为a。
       1.2.1.如果a为null,尝试获取cellsBusy锁。
       1.2.1.1.如果获取cellsBusy成功,创建一个新的Cell,然后赋值给a,方法退出。(过程中需要多次检测冲突)
       1.2.1.2.如果获取cellsBusy失败,会将collide设置为false(实际上是表示发生了冲突),然后重试。
       1.2.2.如果a不为null。
       1.2.2.1.如果wasUncontended为false,说明之前发生过CAS竞争失败,设置wasUncontended为true,重新计算hash值,重试;如果wasUncontended为true,继续尝试下面过程。
       1.2.2.2.尝试通过CAS方式将x累加到a的value上,如果尝试成功,方法退出;如果尝试失败,继续尝试下面过程。
       1.2.2.3.如果当前Cell表的大小以及达到最大值(当前处理器核数),或者Cell表发生了变化(竞争导致过时),那么会设置collide为false,重新计算hash值,然后重试;否则,继续尝试下面过程。
       1.2.2.4.如果collide为false,说明之前发生过冲突,将collide设置为true,重新计算hash值,然后重试;否则,继续尝试下面过程。
       1.2.2.5.尝试获取cellsBusy,如果成功,扩展Cell表,并将collide设置为false,然后重试;否则,重新计算hash值,然后重试;
 

       看下longAccumulate中使用到的一些方法:

    final boolean casBase(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
    }

    final boolean casCellsBusy() {
        return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
    }

    static final int getProbe() {
        return UNSAFE.getInt(Thread.currentThread(), PROBE);
    }

    //计算下一个随机值作为hash值,使用xorshift算法。
    static final int advanceProbe(int probe) {
        probe ^= probe << 13;
        probe ^= probe >>> 17;
        probe ^= probe << 5;
        //设置到当前线程的threadLocalRandomProbe域。
        UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
        return probe;
    }

  

  • 再看下doubleAccumulate:
    final void doubleAccumulate(double x, DoubleBinaryOperator fn,
                                boolean wasUncontended) {
        int h;
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(Double.doubleToRawLongBits(x));
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                else if (a.cas(v = a.value,
                               ((fn == null) ?
                                Double.doubleToRawLongBits
                                (Double.longBitsToDouble(v) + x) :
                                Double.doubleToRawLongBits
                                (fn.applyAsDouble
                                 (Double.longBitsToDouble(v), x)))))
                    break;
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = advanceProbe(h);
            }
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(Double.doubleToRawLongBits(x));
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (casBase(v = base,
                             ((fn == null) ?
                              Double.doubleToRawLongBits
                              (Double.longBitsToDouble(v) + x) :
                              Double.doubleToRawLongBits
                              (fn.applyAsDouble
                               (Double.longBitsToDouble(v), x)))))
                break;                          // Fall back on using base
        }
    }
       doubleAccumulate方法是针对double值做累加的,逻辑和longAccumulate一致。但由于Cell内部用long保存数据,所以在累加的时候会利用Double的doubleToRawLongBits和longBitsToDouble方法做double和longBits形式的double之间的转换。
 
       Striped64的代码解析完毕!
      

 

 

分享到:
评论

相关推荐

    java-jdk1.8-8u361-all-jdk-win-linux

    java-jdk1.8-8u361-all-jdk-win-linux 该压缩包中包含jdk1.8-8u361下windows版本和linux版本,其包含快速安装包和对应的jdk压缩包版本,具体内容如下: jdk-8u361-linux-aarch64.rpm jdk-8u361-linux-i586.rpm jdk-8...

    java-jdk1.8-jdk-8u202-linux-x64.zip

    1. **下载**:首先,你需要从Oracle官方网站或者通过提供的链接(blog.csdn.net/FL1623863129/article/details/134426730)下载适用于Linux x64的`java-jdk1.8-jdk-8u202-linux-x64.zip`压缩文件。确保文件完整无损...

    JDK1.8安装包Linux可用(jdk-8u391-linux-aarch64.tar)

    这里的"jdk-8u391-linux-aarch64.tar"是一个针对64位(aarch64架构)Linux系统的归档文件,我们需要对其进行解压并安装。 安装步骤如下: 1. **下载**:首先,你需要将JDK1.8的安装包下载到你的Linux系统中。这...

    java-jdk1.8-jdk-8u202-windows-x64.zip

    1. **下载**:从官方Oracle网站或其他可信赖的源下载`jdk-8u202-windows-x64.exe`安装程序。 2. **验证文件**:确保下载的文件完整且未被篡改,可以检查MD5或SHA校验和。 3. **双击安装**:在Windows x64系统上,...

    java-jdk1.8-jdk-8u181-windows-x64.zip

    这个压缩包"java-jdk1.8-jdk-8u181-windows-x64.zip"内包含两个文件:一个是主安装程序“jdk-8u181-windows-x64.exe”,用于在Windows 64位系统上安装JDK 1.8的更新181版本;另一个是“使用说明.txt”,通常会提供...

    java-jdk1.8-jdk-8u201-windows-x64.zip

    安装JDK 1.8的过程非常简单,只需要双击下载的“jdk-8u201-windows-x64.exe”文件,然后按照安装向导的提示进行操作。通常,安装过程中会提供选择安装路径、是否设置环境变量等选项。为了确保Java环境的正确配置,...

    jdk1.8可用的dubbo-admin-2.5.4.war

    jdk1.8版本可用,本地测试成功,本地版本 java version "1.8.0_91" Java(TM) SE Runtime Environment (build 1.8.0_91-b14) Java HotSpot(TM) 64-Bit Server VM (build 25.91-b14, mixed mode)

    MacOS M2 jdk1.8资源 jdk-8u381-macosx-aarch64.dmg

    首先,`jdk-8u381-macosx-aarch64.dmg`是JDK 1.8更新381的MacOS版安装文件,其扩展名`.dmg`表明这是苹果操作系统使用的磁盘映像文件。这个版本特别针对M2芯片进行了优化,因为`aarch64`代表的是ARM架构的64位版本,...

    java-jdk1.8-jdk-8u191-linux-x64.zip

    这个压缩包文件“java-jdk1.8-jdk-8u191-linux-x64.zip”包含了用于在64位Linux系统上安装和使用的JDK 1.8更新191的所有必要组件。JDK(Java Development Kit)是开发和运行Java应用程序的基础,它包括了Java编译器...

    java-jdk1.8-jdk-8u201-linux-x64.zip

    1. 解压下载的zip文件,使用命令`unzip java-jdk1.8-jdk-8u201-linux-x64.zip`。 2. 这将解压出名为`jdk-8u201-linux-x64.tar.gz`的文件,接着使用`tar -zxvf jdk-8u80-linux-x64.tar.gz`将其解压到指定目录。 3. ...

    JDK 1.8 jdk-8u171-windows-x64

    JDK 1.8 jdk-8u171-windows-x64

    推荐一款JDK1.8版本非常好用-jdk-8u361-windows-x64

    JDK1.8安装文件安装即可,非常Nice~ 步骤1:下载JDK 首先,您需要把当前资源下载下来节约到指定目录 步骤2:安装JDK 一旦下载完成,您需要运行安装程序来安装JDK。如果您下载的是压缩包,需要解压缩到您希望安装...

    linux版jdk1.8 jdk-8u212-linux-x64.tar.gz

    Linux x64版本并下载jdk-8u212-linux-x64.tar.gz文件。 将下载的文件解压缩到您选择的目录中。 配置环境变量,使系统能够找到Java运行时环境。可以通过设置JAVA_HOME和PATH环境变量来实现。

    jdk-1.8-linux-64-03

    三部分: jdk-1.8-linux-64-01 jdk-1.8-linux-64-02 jdk-1.8-linux-64-03

    jdk1.8-windows-32位-免安装

    "jdk1.8-windows-32位-免安装"是指针对Windows操作系统,适用于32位系统的JDK 1.8版本,它具有无需安装的特点,用户只需将其解压缩到指定位置即可开始使用。 JDK 1.8,也被称为Java 8,是Java编程语言的一个重要...

    java-jdk1.8-jdk-8u192-linux-x64.zip

    1. 首先,使用`unzip java-jdk1.8-jdk-8u192-linux-x64.zip`命令解压缩文件。 2. 解压后,你会得到一个名为`jdk-8u192-linux-x64.tar.gz`的文件,使用`tar -zxvf jdk-8u192-linux-x64.tar.gz`命令将其解压到指定目录...

    最后一版支持JDK1.8的eclipse eclipse-jee-2020-06-R-win32-x86_64

    1. JDK 1.8:Java Development Kit(JDK)的第8个主要版本,它引入了多项重要特性,如 Lambda 表达式、函数式编程、Stream API、方法引用、新的日期/时间API以及对并发的改进。JDK 1.8的长期支持(LTS)状态也使得...

    java-jdk1.8-jdk-8u191-windows-x64.zip

    关于安装步骤,描述中提到“双击安装即可”,这意味着在Windows x64环境下,用户只需要下载`jdk-8u191-windows-x64.exe`这个安装文件,然后双击运行,按照安装向导的提示进行操作,选择安装路径,确认许可协议,最后...

    java-jdk1.8-jdk-8u152-linux-x64.zip

    1. **下载**:从官方Oracle网站或其他可靠的来源下载适用于Linux x64的JDK 1.8u152压缩包,文件名为`jdk-8u152-linux-x64.tar.gz`。 2. **解压**:使用`tar`命令解压下载的文件。例如,`tar -zxvf jdk-8u152-linux-...

    jdk1.8 32位官方正式版 jdk-8u91-windows-i586

    jdk1.8 32位官方正式版 jdk-8u91-windows-i586

Global site tag (gtag.js) - Google Analytics