`
wen866595
  • 浏览: 268377 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

Java8 Striped64 和 LongAdder

    博客分类:
  • java
阅读更多

 

原文链接码蜂笔记 - Java8 Striped64 和 LongAdder

 

数据 striping

根据维基百科的这段说明

In computer data storage, data striping is the technique of segmenting logically sequential data, such as a file, so that consecutive segments are stored on different physical storage devices.

Striping is useful when a processing device requests data more quickly than a single storage device can provide it. By spreading segments across multiple devices which can be accessed concurrently, total data throughput is increased. It is also a useful method for balancing I/O load across an array of disks. Striping is used across disk drives in redundant array of independent disks (RAID) storage, network interface controllers, different computers in clustered file systems and grid-oriented storage, and RAM in some systems.

数据 striping 就是把逻辑上连续的数据分为多个段,使这一序列的段存储在不同的物理设备上。通过把段分散到多个设备上可以增加访问并发性,从而提升总体的吞吐量。

 

Striped64

JDK 8 的 java.util.concurrent.atomic 下有一个包本地的类 Striped64 ,它持有常见表示和机制用于类支持动态 striping 到 64bit 值上。

 

设计思路

这个类维护一个延迟初始的、原子地更新值的表,加上额外的 “base” 字段。表的大小是 2 的幂。索引使用每线程的哈希码来masked。这个的几乎所有声明都是包私有的,通过子类直接访问。

 

表的条目是 Cell 类,一个填充过(通过 sun.misc.Contended )的 AtomicLong 的变体,用于减少缓存竞争。填充对于多数 Atomics 是过度杀伤的,因为它们一般不规则地分布在内存里,因此彼此间不会有太多冲突。但存在于数组的原子对象将倾向于彼此相邻地放置,因此将通常共享缓存行(对 性能有巨大的副作用),在没有这个防备下。

 

部分地,因为Cell相对比较大,我们避免创建它们直到需要时。当没有竞争时,所有的更新都作用到 base 字段。根据第一次竞争(更新 base 的 CAS 失败),表被初始化为大小 2。表的大小根据更多的竞争加倍,直到大于或等于CPU数量的最小的 2 的幂。表的槽在它们需要之前保持空。

 

一个单独的自旋锁(“cellsBusy”)用于初始化和resize表,还有用新的Cell填充槽。不需要阻塞锁,当锁不可得,线程尝试其他槽(或 base)。在这些重试中,会增加竞争和减少本地性,这仍然好于其他选择。

 

 

通过 ThreadLocalRandom 维护线程探针字段,作为每线程的哈希码。我们让它们为 0 来保持未初始化直到它们在槽 0 竞争。然后初始化它们为通常不会互相冲突的值。当执行更新操作时,竞争和/或表冲突通过失败了的 CAS 来指示。根据冲突,如果表的大小小于容量,它的大小加倍,除非有些线程持有了锁。如果一个哈希后的槽是空的,且锁可得,创建新的Cell。否则,如果槽存 在,重试CAS。重试通过 “重散列,double hashing” 来继续,使用一个次要的哈希算法(Marsaglia XorShift)来尝试找到一个自由槽位。

 

表的大小是有上限的,因为,当线程数多于CPU数时,假如每个线程绑定到一个CPU上,存在一个完美的哈希函数映射线程到槽上,消除了冲突。当我们 到达容量,我们随机改变碰撞线程的哈希码搜索这个映射。因为搜索是随机的,冲突只能通过CAS失败来知道,收敛convergence 是慢的,因为线程通常不会一直绑定到CPU上,可能根本不会发生。然而,尽管有这些限制,在这些案例下观察到的竞争频率显著地低。

 

当哈希到特定 Cell 的线程终止后,Cell 可能变为空闲的,表加倍后导致没有线程哈希到扩展的 Cell 也会出现这种情况。我们不尝试去检测或移除这些 Cell,在实例长期运行的假设下,观察到的竞争水平将重现,所以 Cell 将最终被再次需要。对于短期存活的实例,这没关系。

 

设计思路小结

  • striping和缓存行填充:通过把类数据 striping 为 64bit 的片段,使数据成为缓存行友好的,减少CAS竞争。
  • 分解表示:对于一个数字 5,可以分解为一序列数的和:2 + 3,这个数字加 1 也等价于它的分解序列中的任一 数字加 1:5 + 1 = 2 + (3 + 1)
  • 通过把分解序列存放在表里面,表的条目都是填充后的 Cell;限制表的大小为 2 的幂,则可以用掩码来实现索引;同时把表的大小限制为大于等于CPU数量的最小的 2 的幂。
  • 当表的条目上出现竞争时,在到达容量前表扩容一倍,通过增加条目来减少竞争。

 

Cell 类

Cell 类是 Striped64 的静态内部类。通过注解 @sun.misc.Contended 来自动实现缓存行填充,让Java编译器和JRE运行时来决定如何填充。本质上是一个填充了的、提供了CAS更新的volatile变量。

@sun.misc.Contended static final class Cell {
     volatile long value;
     Cell(long x) { value = x; }
     final boolean cas(long cmp, long val) {
          return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
     }

     // Unsafe mechanics
     private static final sun.misc.Unsafe UNSAFE;
     private static final long valueOffset;
     static {
          try {
               UNSAFE = sun.misc.Unsafe.getUnsafe();
               Class<?> ak = Cell.class;
               valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
          } catch (Exception e) {
               throw new Error(e);
          }
     }
}

 

Striped64

Striped64 通过一个 Cell 数组维持了一序列分解数的表示,通过 base 字段维持数的初始值,通过 cellsBusy 字段来控制 resing 和/或 创建Cell。它还提供了对数进行累加的机制。

abstract class Striped64 extends Number {
    static final int NCPU = Runtime.getRuntime().availableProcessors();

     // 存放 Cell 的表。当不为空时大小是 2 的幂。
    transient volatile Cell[] cells;

     // base 值,在没有竞争时使用,也作为表初始化竞争时的一个后备。
    transient volatile long base;

     // 自旋锁,在 resizing 和/或 创建Cell时使用。
    transient volatile int cellsBusy;
}

 

累加机制 longAccumulate

设计思路里针对机制的实现,核心逻辑。该方法处理涉及初始化、resing、创建新cell、和/或竞争的更新。

逻辑如下:

  • if 表已初始化

    • if 映射到的槽是空的,加锁后再次判断,如果仍然是空的,初始化cell并关联到槽。
    • else if (槽不为空)在槽上之前的CAS已经失败,重试。
    • else if (槽不为空、且之前的CAS没失败,)在此槽的cell上尝试更新
    • else if 表已达到容量上限或被扩容了,重试。
    • else if 如果不存在冲突,则设置为存在冲突,重试。
    • else if 如果成功获取到锁,则扩容。
    • else 重散列,尝试其他槽。
  • else if 锁空闲且获取锁成功,初始化表

  • else if 回退 base 上更新且成功则退出
  • else 继续
final void longAccumulate(long x, LongBinaryOperator fn,
                                boolean wasUncontended) {
     int h;
     if ((h = getProbe()) == 0) {
          // 未初始化的
          ThreadLocalRandom.current(); // 强制初始化
          h = getProbe();
          wasUncontended = true;
     }

     // 最后的槽不为空则 true,也用于控制扩容,false重试。
     boolean collide = false;
     for (;;) {
          Cell[] as; Cell a; int n; long v;
          if ((as = cells) != null && (n = as.length) > 0) {
               // 表已经初始化

               if ((a = as[(n - 1) & h]) == null) {
                    // 线程所映射到的槽是空的。

                    if (cellsBusy == 0) {       // 尝试关联新的Cell

                         // 锁未被使用,乐观地创建并初始化cell。
                         Cell r = new Cell(x);

                         if (cellsBusy == 0 && casCellsBusy()) {
                              // 锁仍然是空闲的、且成功获取到锁

                              boolean created = false;
                              try {               // 在持有锁时再次检查槽是否空闲。
                                   Cell[] rs; int m, j;
                                   if ((rs = cells) != null &&
                                        (m = rs.length) > 0 &&
                                        rs[j = (m - 1) & h] == null) {
                                        // 所映射的槽仍为空

                                        rs[j] = r;          // 关联 cell 到槽
                                        created = true;
                                   }
                              } finally {
                                   cellsBusy = 0;     // 释放锁
                              }
                              if (created)
                                   break;               // 成功创建cell并关联到槽,退出
                              continue;           // 槽现在不为空了
                         }
                    }
                    // 锁被占用了,重试
                    collide = false;
               }
               // 槽被占用了

               else if (!wasUncontended)       // 已经知道 CAS 失败
                    wasUncontended = true;      // 在重散列后继续

               // 在当前槽的cell上尝试更新
               else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                                  fn.applyAsLong(v, x))))
                    break;

               // 表大小达到上限或扩容了;
               // 表达到上限后就不会再尝试下面if的扩容了,只会重散列,尝试其他槽
               else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale

               //  如果不存在冲突,则设置为存在冲突
               else if (!collide)
                    collide = true;

               // 有竞争,需要扩容
               else if (cellsBusy == 0 && casCellsBusy()) {
                    // 锁空闲且成功获取到锁
                    try {
                         if (cells == as) {      // 距上一次检查后表没有改变,扩容:加倍
                              Cell[] rs = new Cell[n << 1];
                              for (int i = 0; i < n; ++i)
                                   rs[i] = as[i];
                              cells = rs;
                         }
                    } finally {
                         cellsBusy = 0;     // 释放锁
                    }
                    collide = false;
                    continue;                   // 在扩容后的表上重试
               }

               // 没法获取锁,重散列,尝试其他槽
               h = advanceProbe(h);
          }
          else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
               // 加锁的情况下初始化表

               boolean init = false;
               try {                           // Initialize table
                    if (cells == as) {
                         Cell[] rs = new Cell[2];
                         rs[h & 1] = new Cell(x);
                         cells = rs;
                         init = true;
                    }
               } finally {
                    cellsBusy = 0;     // 释放锁
               }
               if (init)
                    break;     // 成功初始化,已更新,跳出循环
          }
          else if (casBase(v = base, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
               // 表未被初始化,可能正在初始化,回退使用 base。
               break;                          // 回退到使用 base
     }
}

 

LongAdder

LongAdder 继承自 Striped64,它的方法只针对简单的情况:cell存在且更新无竞争,其余情况都通过 Striped64 的longAccumulate方法来完成。

public void add(long x) {
     Cell[] as; long b, v; int m; Cell a;
     if ((as = cells) != null || !casBase(b = base, b + x)) {
          // cells 不为空 或在 base 上cas失败。也即出现了竞争。
          boolean uncontended = true;

          //
          if (as == null || (m = as.length - 1) < 0 ||
               (a = as[getProbe() & m]) == null ||
               !(uncontended = a.cas(v = a.value, v + x)))
               // 如果所映射的槽不为空,且成功更新则返回,否则进入复杂处理流程。
               longAccumulate(x, null, uncontended);
     }
}

// 获取当前的和。base值加上每个cell的值。
public long sum() {
     Cell[] as = cells; Cell a;
     long sum = base;
     if (as != null) {
          for (int i = 0; i < as.length; ++i) {
               if ((a = as[i]) != null)
                    sum += a.value;
          }
     }
     return sum;
}
0
0
分享到:
评论

相关推荐

    Java并发编程包中atomic的实现原理示例详解

    他们都是继承于Striped64。LongAdder与AtomicLong的区别是,LongAdder引入了分段锁的概念,可以根据当前线程哈希到对于Cell上进行修改。 五、LongAdder的实现原理 LongAdder的实现原理是通过CAS乐观锁保证原子性,...

    java进度条的使用视频

    Java进度条是GUI(图形用户界面)编程中一个常见的组件,通常用于在执行长时间操作时向用户显示任务的进度,从而提高用户体验。本教程将详细讲解如何在Java中使用进度条,以配合视频学习。 首先,Java中的进度条...

    java 1.7 api 中文

    10. **并发编程改进**:`ConcurrentHashMap`的性能提升,以及`Phaser`和`Striped64`等新的并发工具类,增强了Java的多线程编程能力。 Java 1.7 API中文手册不仅涵盖了这些核心特性,还详细解释了每一个类、接口、...

    java 数据缓存

    源码中,你可以看到它使用了Striped64来实现并发控制,以及WeakReference和SoftReference来管理内存占用。Guava Cache的使用非常简单,可以通过CacheBuilder构建器定制缓存配置,并通过LoadingCache接口进行操作。 ...

    java笔试题算法-Complete-Striped-Smith-Waterman-Library:Complete-Striped-Smit

    java笔试题算法SSW 图书馆 用于基因组应用的 SIMD Smith-Waterman C/C++/Python/Java/R 库 许可证:麻省理工学院 版权所有 (c) 2012-2015 波士顿学院 特此授予任何人免费获得本软件副本和相关文档文件(“软件”)的...

    Bootstrap表格控件的使用,与JAVA后台数据连接,可自动翻页

    在使用`DataTables`时,需要在HTML文件中引入相关的CSS和JS文件,并对目标表格添加特定的类名,如`table table-striped table-bordered`等,以激活这个插件。 接着,我们来看如何与JAVA后台进行数据连接。通常,...

    Guava for java 实例学习

    3. **并发工具**:Guava包含了一些线程安全的数据结构,如LoadingCache和Striped锁,以及并发集合,如ConcurrentHashMultiset和CountingDownLatch,帮助开发者编写高效的并发代码。 4. **函数式编程**:Guava提供了...

    无锁队列Disruptor超详细教程

    Java 8引入了LongAdder和Striped64类,它们利用Cell元素来消除伪共享,提高并发性能。 ### 2. Disruptor的使用实战 #### (1) 定义Event和工厂 Disruptor的运作基于Event,开发者需要定义Event类以存储传递的数据。...

    Java基础学习55.pdf

    在Java基础学习中,Bootstrap是一个重要的前端开发框架,它以其响应式布局和移动优先的设计理念被广泛应用于网页开发。Bootstrap包含了大量的CSS和JavaScript组件,简化了前端开发过程。在学习Bootstrap时,了解并...

    JDK8Translation:学习Java8代码,了解Java常用类库的原理,该工程转换为JDK8常用的类库注释

    2.Striped64翻译初版。该类关于并发计数提出了新的解决思路,ConcurrentHashMap元素个数的统计都是基于该思想做的。 3.翻译线程中的状态类,对应线程类中的枚举类State。 4.翻译线程池相关类中的部分注释。 5....

    jdk课程配套课件资料.zip

    5. **Striped64**:这是一种新的基础线程安全的数据结构,提高了并发性能。 **JDK1.8的重要特性** JDK1.8(也称为Java 8)是Java历史上的一个关键版本,引入了许多创新特性,包括: 1. **Lambda表达式**:Lambda...

    JDK中文版1.7

    `java.util.concurrent`包中添加了一些新的并发工具类,如`Phaser`,`Striped64`等,以帮助开发者更好地管理多线程环境中的同步问题。 ### **JAVA_API_1.7中文.chm** `JAVA_API_1.7中文.chm`是JDK 1.7的中文版API...

    Molecular Cloning and Characterization of the First Caspase in the Striped Stem Borer, Chilo suppressalis

    标题所指的知识点包括:分子克隆、半胱天冬氨酶(caspases)、水稻二化螟(Striped Stem Borer, Chilo suppressalis)、基因克隆与特性描述。 描述所涉及的知识点有:水稻害虫、细胞凋亡、半胱天冬氨酶的作用、水稻...

    google guava23.5

    - **Lists**, **Sets** 和 **Maps** 的并发实现,如 `ConcurrentHashMultiset` 和 `Striped64`。 - **Futures**:增强的异步计算模型,支持取消任务和检查结果是否可用。 - **CountDownLatch** 和 **...

    toolkit:Java工具箱(常用工具,省时省力)

    Java工具箱和Google Guava库是Java开发者的得力助手,它们封装了许多常用功能,简化了复杂操作,提高了代码的可读性和维护性。在实际开发中,熟练掌握并合理运用这些工具,将极大地提升工作效率。

    batik-all-striped-1.7.jar

    官方版本,亲测可用

    com.google.common.jar的替换包guava-r07.jar

    2. **并发工具**:Guava提供了线程安全的数据结构,如LoadingCache、Striped锁、CountDownLatch、CyclicBarrier等,以及Futures和ListenableFutures,帮助开发者更方便地处理异步计算。 3. **字符串处理**:Guava的...

    基于javaweb 的bootstrap table使用案例源码下载

    8. **扩展功能**:Bootstrap Table还有许多其他特性,如行选择、行编辑、导出数据等,开发者可以根据需要进行探索和实现。 通过这个案例,开发者不仅可以学习到如何使用Bootstrap Table和Ajax构建动态数据表格,还...

    guava相关资源

    8. **并发容器**:Guava提供了线程安全的容器,如LoadingCache、Striped等,适应多线程环境。 总之,Guava是一个强大的工具库,它的各种功能和优化手段可以帮助开发者编写出更加高效、健壮的Java应用。通过使用...

Global site tag (gtag.js) - Google Analytics