Jdk1.6 JUC源码解析(27)-Exchanger
作者:大飞
功能简介:
- Exchanger是一种线程间安全交换数据的机制。可以和之前分析过的SynchronousQueue对比一下:线程A通过SynchronousQueue将数据a交给线程B;线程A通过Exchanger和线程B交换数据,线程A把数据a交给线程B,同时线程B把数据b交给线程A。可见,SynchronousQueue是交给一个数据,Exchanger是交换两个数据。
源码分析:
- 先看下内部结构:
private static final class Node extends AtomicReference<Object> { /** 创建这个节点的线程提供的用于交换的数据。 */ public final Object item; /** 等待唤醒的线程 */ public volatile Thread waiter; /** * Creates node with given item and empty hole. * @param item the item */ public Node(Object item) { this.item = item; } } /** * 一个Slot就是一对线程交换数据的地方。 * 这里对Slot做了缓存行填充,能够避免伪共享问题。 * 虽然填充导致浪费了一些空间,但Slot是按需创建,一般没什么问题。 */ private static final class Slot extends AtomicReference<Object> { // Improve likelihood of isolation on <= 64 byte cache lines long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe; } /** * Slot数组,在需要时才进行初始化。 * 用volatile修饰,因为这样可以安全的使用双重锁检测方式构建。 */ private volatile Slot[] arena = new Slot[CAPACITY]; /** * arena(Slot数组)的容量。设置这个值用来避免竞争。 */ private static final int CAPACITY = 32; /** * 正在使用的slot下标的最大值。当一个线程经历了多次CAS竞争后, * 这个值会递增;当一个线程自旋等待超时后,这个值会递减。 */ private final AtomicInteger max = new AtomicInteger();
内部结构很清晰,首先内部包含一个Slot数组,默认容量是32,用来避免以一些竞争,有点类似于ConcurrentHashMap的策略;其次,交换数据的场所就是Slot,它本身进行了cache line填充,避免了伪共享问题;最后,每个要进行数据交换的线程在内部会用一个Node来表示。
伪共享说明:假设一个类的两个相互独立的属性a和b在内存地址上是连续的(比如FIFO队列的头尾指针),那么它们通常会被加载到相同的cpu cache line里面。并发情况下,如果一个线程修改了a,会导致整个cache line失效(包括b),这时另一个线程来读b,就需要从内存里再次加载了,这种多线程频繁修改ab的情况下,虽然a和b看似独立,但它们会互相干扰,非常影响性能。
- 看完了内部结构,接下来就从Exchanger的交换数据方法exchange入手来分析代码:
/** * 等待其他线程到达交换点,然后与其进行数据交换。 * * 如果其他线程到来,那么交换数据,返回。 * * 如果其他线程未到来,那么当前线程等待,知道如下情况发生: * 1.有其他线程来进行数据交换。 * 2.当前线程被中断。 */ public V exchange(V x) throws InterruptedException { if (!Thread.interrupted()) {//检测当前线程是否被中断。 //进行数据交换。 Object v = doExchange(x == null? NULL_ITEM : x, false, 0); if (v == NULL_ITEM) return null; //检测结果是否为null。 if (v != CANCEL) //检测是否被取消。 return (V)v; Thread.interrupted(); // 清除中断标记。 } throw new InterruptedException(); } /** * 等待其他线程到达交换点,然后与其进行数据交换。 * * 如果其他线程到来,那么交换数据,返回。 * * 如果其他线程未到来,那么当前线程等待,知道如下情况发生: * 1.有其他线程来进行数据交换。 * 2.当前线程被中断。 * 3.超时。 */ public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { if (!Thread.interrupted()) { Object v = doExchange(x == null? NULL_ITEM : x, true, unit.toNanos(timeout)); if (v == NULL_ITEM) return null; if (v != CANCEL) return (V)v; if (!Thread.interrupted()) throw new TimeoutException(); } throw new InterruptedException(); }
上面的方法都调用了doExchange方法,主要逻辑在这个方法里,分析下这个方法:
/** * 这个方法会处理不同的情况,使用Object而不是泛型,主要是为了返回一些 * 哨兵值(比如表示null和取消的对象)。 * * @param item 用来进行交换的数据。 * @param timed 如果有超时延迟,设置为true * @param nanos 具体的超时时间。 * @return 返回另一个线程(与当前线程交换数据)的数据,或者CANCEL(表示取消) */ private Object doExchange(Object item, boolean timed, long nanos) { Node me = new Node(item); // 创建当前节点me。 int index = hashIndex(); // 计算出当前slot的下标。 int fails = 0; // 用来保存CAS失败的次数。 for (;;) { Object y; // 用来保存当前slot中可能存在的Node。 Slot slot = arena[index]; // 按照前面计算出的下标获取当前slot。 if (slot == null) createSlot(index); // 如果slot为null,那么创建一个slot,然后继续循环。 else if ((y = slot.get()) != null && slot.compareAndSet(y, null)) { // 如果slot不为空,那么slot可能被另一个Node给占了,如果确实存在这个Node,尝试将其置空。(表示当前节点要和这个Node交换数据了) Node you = (Node)y; // 给这个Node转型,赋给you。 if (you.compareAndSet(null, item)) { // 将item设置给you,注意you本身是一个AtomicReference,这里相当于把item设置到you的value字段上。 LockSupport.unpark(you.waiter); // 然后唤醒you节点上等待的线程。 return you.item; // 返回you的item。 } // 竞争失败,放弃,继续循环。 } else if (y == null && // 如果slot为空,那么说明没有要和当前线程交换数据的线程, slot.compareAndSet(null, me)) { //那么当前线程先尝试把这个slot给占了。 if (index == 0) // 如果slot下标为0,那么阻塞等待。 return timed? awaitNanos(me, slot, nanos): await(me, slot); // 有超时的话,会阻塞给定的时间。 Object v = spinWait(me, slot); // 如果slot下标不是0,自旋等待,等待其他线程来和当前线程交换数据,然后返回交换后的数据。 if (v != CANCEL) return v; me = new Node(item); // 如果取消的话,重试,重建一个Node,之前的Node就丢弃了。 int m = max.get(); // 获取当前slot下标的最大值。 if (m > (index >>>= 1)) // 如果当前允许的最大索引太大。 max.compareAndSet(m, m - 1); // 递减最大索引 } else if (++fails > 1) { // 如果1个slot竞争失败超过2次。 int m = max.get(); if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1)) //如果竞争失败超过3次,尝试递增最大索引值。 index = m + 1; // 增加索引值。 else if (--index < 0) // 换个index。 index = m; // 绕回逻辑,防止index越界。 } } }
这里形象的理解一下:
其实就是"我"和"你"(可能有多个"我",多个"你")在一个叫Slot的地方做交易(一手交钱,一手交货),过程分以下步骤:
1.我到交易地点(Slot)的时候,你已经到了,那我就尝试喊你交易,如果你回应了我,决定和我交易那么进入第2步;如果别人抢先一步把你喊走了,那我只能再找别人了,进入第5步。
2.我拿出钱交给你,你可能会接收我的钱,然后把货给我,交易结束;也可能嫌我掏钱太慢(超时)或者接个电话(中断),TM的不卖了,走了,那我只能再找别人买货了(从头开始)。
3.我到交易地点的时候,你不在,那我先尝试把这个交易点给占了(一屁股做凳子上...),如果我成功抢占了单间(交易点),那就坐这儿等着你拿货来交易,进入第4步;如果被别人抢座了,那我只能在找别的地方儿了,进入第5步。
4.你拿着货来了,喊我交易,然后完成交易;也可能我等了好长时间你都没来,我不等了,继续找别人交易去,走的时候我看了一眼,一共没多少人,弄了这么多单间(交易地点Slot),太TM浪费了,我喊来交易地点管理员:一共也没几个人,搞这么多单间儿干毛,给哥撤一个!。然后再找别人买货(从头开始);或者我老大给我打了个电话,不让我买货了(中断)。
5.如果之前我尝试交易了2次都没成功,那我就想我TM选的这个位置(Slot下标)是不是风水不好啊,换个地儿继续(从头开始);如果之前都尝试交易了4次还没成功,我怒了,喊过来交易地点的管理员:给哥再开一个单间(Slot),加一个凳子,这么多人就这么几个破凳子够谁用!
看一下doExchange调用的计算slot下标的方法:
/** * Returns a hash index for the current thread. Uses a one-step * FNV-1a hash code (http://www.isthe.com/chongo/tech/comp/fnv/) * based on the current thread's Thread.getId(). These hash codes * have more uniform distribution properties with respect to small * moduli (here 1-31) than do other simple hashing functions. * * <p>To return an index between 0 and max, we use a cheap * approximation to a mod operation, that also corrects for bias * due to non-power-of-2 remaindering (see {@link * java.util.Random#nextInt}). Bits of the hashcode are masked * with "nbits", the ceiling power of two of table size (looked up * in a table packed into three ints). If too large, this is * retried after rotating the hash by nbits bits, while forcing new * top bit to 0, which guarantees eventual termination (although * with a non-random-bias). This requires an average of less than * 2 tries for all table sizes, and has a maximum 2% difference * from perfectly uniform slot probabilities when applied to all * possible hash codes for sizes less than 32. * * @return a per-thread-random index, 0 <= index < max */ private final int hashIndex() { long id = Thread.currentThread().getId(); int hash = (((int)(id ^ (id >>> 32))) ^ 0x811c9dc5) * 0x01000193; int m = max.get(); int nbits = (((0xfffffc00 >> m) & 4) | // Compute ceil(log2(m+1)) ((0x000001f8 >>> m) & 2) | // The constants hold ((0xffff00f2 >>> m) & 1)); // a lookup table int index; while ((index = hash & ((1 << nbits) - 1)) > m) // May retry on hash = (hash >>> nbits) | (hash << (33 - nbits)); // non-power-2 m return index; }
这里就是根据当前线程的ID,算一个hash值,然后针对slot最大index值做了一个近似取模的操作来计算slot的下标。
接下来看一下createSlot方法:
private void createSlot(int index) { // 在同步块外面创建Slot实例,以减小同步块范围。 Slot newSlot = new Slot(); Slot[] a = arena; synchronized (a) { if (a[index] == null) a[index] = newSlot; } }
/** * 在下标为0的Slot上等待获取其他线程填充的值。 * 如果在Slot被填充之前超时或者被中断,那么操作失败。 */ private Object awaitNanos(Node node, Slot slot, long nanos) { int spins = TIMED_SPINS; long lastTime = 0; Thread w = null; for (;;) { Object v = node.get(); if (v != null) //如果已经被其他线程填充了值,那么返回这个值。 return v; long now = System.nanoTime(); if (w == null) w = Thread.currentThread(); else nanos -= now - lastTime; lastTime = now; if (nanos > 0) { if (spins > 0) --spins; //先自旋几次。 else if (node.waiter == null) node.waiter = w; //自旋阶段完毕后,将当前线程设置到node的waiter域。 else if (w.isInterrupted()) tryCancel(node, slot); //如果当前线程被中断,尝试取消node。 else LockSupport.parkNanos(node, nanos); //阻塞给定的时间。 } else if (tryCancel(node, slot) && !w.isInterrupted()) //超时后,如果当前线程没有被中断,那么从Slot数组的其他位置看看有没有等待交换数据的节点 return scanOnTimeout(node); } }awaitNanos中的自旋次数为TIMED_SPINS,这里说明一下自旋次数:
/** * 单核处理器下这个自旋次数为0 * 多核情况下,这个值设置为大多数系统中上下文切换时间的平均值。 */ private static final int SPINS = (NCPU == 1) ? 0 : 2000; /** * 在有超时情况下阻塞等待之前自旋的次数。. * 超时等待的自旋次数之所以更少,是因为检测时间也需要耗费时间。 * 这里的值是一个经验值。 */ private static final int TIMED_SPINS = SPINS / 20;
private static boolean tryCancel(Node node, Slot slot) { if (!node.compareAndSet(null, CANCEL))//尝试取消node return false; if (slot.get() == node) // pre-check to minimize contention slot.compareAndSet(node, null); //如果还关联在sot上,断开关联。 return true; }
继续看awaitNanos方法中最后调用的scanOnTimeout方法,这个方法在要取消的时候调用,找一下其他下标的Slot上有没有可以交换数据的节点,找到的话就可以成功交换数据,而不取消了:
private Object scanOnTimeout(Node node) { Object y; for (int j = arena.length - 1; j >= 0; --j) { //从Slot数组的后面往前找 Slot slot = arena[j]; if (slot != null) { //找到了有初始化好的Slot,然后看看里面有没有node。 while ((y = slot.get()) != null) { //发现有node,尝试和这个node进行数据交换。 if (slot.compareAndSet(y, null)) { Node you = (Node)y; //尝试进行数据交换, if (you.compareAndSet(null, node.item)) { //如果交换成功(把当前节点的数据交给you),唤醒you上面等待的线程。 LockSupport.unpark(you.waiter); //返回you的数据。 return you.item; } } } } } //没找到其他等待交换数据的线程,最后取消当前节点node。 return CANCEL; }
上面看的awaitNanos方法是在下标为0的Slot里面,有超时情况下的处理方式。再看下没有超时情况的处理方法await:
private static Object await(Node node, Slot slot) { Thread w = Thread.currentThread(); int spins = SPINS; for (;;) { Object v = node.get(); if (v != null) //如果已经被其他线程填充了值,那么返回这个值。 return v; else if (spins > 0) // 先自旋几次。 --spins; else if (node.waiter == null) // 自旋阶段完毕后,将当前线程设置到node的waiter域。 node.waiter = w; else if (w.isInterrupted()) // 如果当前线程被中断,尝试取消当前node。 tryCancel(node, slot); else // 否则阻塞当前线程。 LockSupport.park(node); } }
之前看的awaitNanos和await方法都是在下标为0的Slot的情况下采取的有阻塞行为的处理方式,如果下标不为0,采取完全自旋的方式,调用方法spinWait:
private static Object spinWait(Node node, Slot slot) { int spins = SPINS; for (;;) { Object v = node.get(); if (v != null) return v; else if (spins > 0) --spins; //先自旋 else tryCancel(node, slot); //自旋了指定的次数还没等到交换的数据,尝试取消。 } }
最后看一下arena(Slot数组),默认的容量和实际使用的下标最大值:
private static final int CAPACITY = 32; /** * The value of "max" that will hold all threads without * contention. When this value is less than CAPACITY, some * otherwise wasted expansion can be avoided. */ private static final int FULL = Math.max(0, Math.min(CAPACITY, NCPU / 2) - 1);前面说过arena容量默认为32,目的是为了减少线程的竞争,但实际上对arena的使用不会超过FULL这个值(避免一些空间浪费)。这个值取的是32(默认CAPACITY)和CPU核心数量的一半,这两个数的较小值在减1的数和0的较大值.... 也就是说,如果CPU核很多的情况下,这个值最大也就是31,;如果是单核或者双核CPU,这个值就是0,也就是说只能用arena[0]。这也是为什么前面的hashIndex方法里面会做的(近似)取模操作比较复杂,因为实际的能使用的Slot数组范围可能不是2的幂。
Exchanger的代码解析完毕!
相关推荐
aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-...
2部分: jdk-1.6-windows-64-01 jdk-1.6-windows-64-02
1.okhttp3.8源码使用jdk1.6重新编译,已集成了okio,在javaweb项目中使用,未在安卓项目中使用 2.okhttp3.8源码使用jdk1.6重新编译_okhttp3.8.0-jdk1.6.jar
下载的压缩包文件"jdk-6u45-windows-x64(1.6 64).exe"是Windows 64位系统的安装程序。安装过程中,用户需要选择安装路径,并设置环境变量,包括`JAVA_HOME`指向JDK的安装目录,`PATH`添加JDK的bin目录,确保系统可以...
三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3
标题中的"jdk-jdk1.6.0.24-windows-i586.exe"是一个Java Development Kit(JDK)的安装程序,适用于Windows操作系统且为32位版本。JDK是Oracle公司提供的一个用于开发和运行Java应用程序的软件包。这个特定的版本,...
标题中的“jdk1.6集成jjwt的问题”指的是在Java Development Kit (JDK) 版本1.6的环境下,尝试整合JSON Web Token (JWT) 库jjwt时遇到的挑战。JWT是一种开放标准(RFC 7519),用于在各方之间安全地传输信息作为 ...
1. 解压缩"java-jdk1.6-jdk-6u45-windows-x64.zip"文件,这将释放出"jdk-6u45-windows-x64.exe"可执行文件。 2. 双击运行"jdk-6u45-windows-x64.exe",安装向导会引导你完成安装过程。通常,你需要选择安装路径,...
三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3
三部分: jdk-1.6-windows-32-1 jdk-1.6-windows-32-2 jdk-1.6-windows-32-3
三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3
### JDK1.6安装及与JDK-1.5版本共存 #### 一、前言 随着软件开发环境的变化和技术的进步,不同的项目可能需要不同的Java版本来支持其运行。例如,在某些特定环境下,可能既需要使用JDK1.5(Java Development Kit ...
logback-cfca-jdk1.6-3.1.0.0.jar
- 这可能是ZXing库的完整源码包,专门针对JDK1.6编译,包含了所有必要的源文件和资源,供开发者进行更深度的定制和集成。 总之,ZXing库是一个强大的条形码和二维码工具,这个特别适配JDK1.6的版本为那些仍在使用...
jdk-1.6-linux-32-1 jdk-1.6-linux-32-2 jdk-1.6-linux-32-3
这个压缩包文件"jdk-6u45-linux-x64.zip"包含的是JDK 1.6.0_45(也被称为6u45或1.6u45)的64位Linux版本。JDK 1.6是Java平台标准版的一个重要版本,它提供了许多功能和性能改进,是许多企业级应用的基础。 JDK 1.6u...
java环境搭建 jdk6(包含jre)64位 jdk-6u45-windows-x64
Java编程开发工具包,最新版本,很好用,经典
《深入解析JDK 1.6:构建高效Java开发环境》 JDK(Java Development Kit)是Oracle公司发布的用于开发Java应用程序的重要工具集,它为Java开发者提供了编译、调试和运行Java程序所需的环境。JDK 1.6,又称为Java SE...
Linux64位环境下的jdk6安装包:jdk-6u45-linux-x64.bin。 由于积分无法修改,现提供网盘下载地址: https://pan.baidu.com/s/1BE55ImTxZTQO6T22051P2g 提取码:5wvm