前言
Phaser是从JDK7开始提供的一个可重复使用的同步机制,它在功能上类似于CyclicBarrier和CountDownLatch,但它支持更灵活的使用场景。使用Phaser不仅能够替代CyclicBarrier和CountDownLatch,还能够做到它们做不到的功能。
通过前面的章节我们知道,CyclicBarrier用于一组线程相互等待到达公共的屏障,而CountDownLatch用于一个或一组线程等待另一个或另一组线程(其实也相当于有一个公共屏障),不同的是CyclicBarrier的屏障能够被重复使用,而CountDownLatch却不能重复利用,但是它们有一个共同特点,那就是它们的参与线程(个数)在一开始构造各自的实例对象时就已经被指定,在往后的执行过程中无法改变,而Phaser却提供了动态注册和撤销功能,能够在执行过程中的任意时刻,添加新的参与线程和撤销原有的参与线程。
从大体上说,Phaser与CyclicBarrier相似的地方很多,它们都存在可重复使用的公共屏障,并且在参与者都到达屏障点之后,可以执行屏障点的特殊任务,但Phaser更精细化,它分离出了“到达”与“等待”机制,它还支持结束整个同步Phaser,并且还能对外提供内部的执行状态监控。Phaser还可以被构造成树型分层结构,这种父子关系的树形结构能够有效降低同步竞争,增加吞吐量,因为通过这种父子分层结构,相当于将参与者进行了分组,更小的分组将拥有更少的同步竞争参与者,当然这也导致创建更多的Phaser实例所带来的更多消耗。
CyclicBarrier的内部维护着三个在Phaser中也存在的概念:总的参与者线程个数parties、还未达到屏障点的参与线程个数count、由一个布尔变量表示屏障是否有效的屏障对象Generation。在Phaser中,代表屏障的对象不再是简单的布尔值,而是一个取值范围为0~Integer.MAX_VALUE的int变量phase(以下被称为阶段数),不过这个phase变量与总的参与者个数parties、未到达屏障的参与者个数unarrived以及当前Phaser是否终止的标记全部由一个long型的64字节变量state包装表示,如下所示:
Phaser将所有状态都由一个变量保存的方式能够高效的保证操作的原子性, 还能缩短竞争的窗口期提高并发速度。从上图可以看出,低32位中的2个16位用于表示参与者个数,所以一个Phaser支持的参与者上限为65535。但是,你可以而且应该创建分层的树型Phaser结构来适应任意大量超过这个最大值的参与者。高32位中的低31位用于表示屏障的阶段数phase,即0~Integer.MAX_VALUE的范围,当达到最大值,将会再次从0开始。最高位的一个bit位标识该Phaser是否被终止,如果是1表示被终止。
源码解析
先来看Phaser的内部结构:
public class Phaser { /** * 主状态state,分为4部分: * * unarrived -- 还没有到达栅栏的参与者计数。 (bits 0-15) * parties -- 栅栏全部参与者的计数。 (bits 16-31) * phase -- 栅栏当前所处的阶段 (bits 32-62) * terminated -- 栅栏的结束标记 (bit 63 / sign) */ private volatile long state; //下面是一些辅助常量用于对state进行位运算得出以上四种状态的值 private static final int MAX_PARTIES = 0xffff; // 最大参与者个数parties 65535,低16位全1 private static final int MAX_PHASE = Integer.MAX_VALUE; //最大阶段数phase private static final int PARTIES_SHIFT = 16; //总参与者位移个数,16位 private static final int PHASE_SHIFT = 32; //阶段数位移个数,32位 private static final int UNARRIVED_MASK = 0xffff; // 未到达参与者掩码,低16位全1,高16位全0 private static final long PARTIES_MASK = 0xffff0000L; // 总参与者个数掩码,高16位全1,低16位全0 private static final long COUNTS_MASK = 0xffffffffL; // 掩码,低32位全1,高32位全0 private static final long TERMINATION_BIT = 1L << 63; //终止标记 // some special values 一些特殊值 private static final int ONE_ARRIVAL = 1; private static final int ONE_PARTY = 1 << PARTIES_SHIFT; //一个参与者,1左移16位即2^16=十进制65536 private static final int ONE_DEREGISTER = ONE_ARRIVAL|ONE_PARTY; //按位或运算,十进制65537 private static final int EMPTY = 1; //注意,Empty并不是0而是1 //下面是利用辅助常量的位运算方法,获取不同域的值 //注意如果state为1时返回的是0,因为在没有注册任何参与者时,state的初始值为1,具体在构造方法中详解 private static int unarrivedOf(long s) {//获取未到达个数 int counts = (int)s; //long强转为int后为一个负数,溢出导致低32位不变,高32位都是1 //按位与低16位都是1的掩码,得到低16位表达的值,即未达到个数 return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK); } //无符号右移16位,得到低32位中的高16位,即总参与者个数 private static int partiesOf(long s) { return (int)s >>> PARTIES_SHIFT; } //无符号右移32位,取高32位,表示是否终止的最高位也在里面,所以如果已经终止(最高位为1)得到的结果将是负数 private static int phaseOf(long s) { return (int)(s >>> PHASE_SHIFT); } //已经到达的个数,即总的参与者个数减去还未到达的个数。 private static int arrivedOf(long s) { int counts = (int)s; return (counts == EMPTY) ? 0 : (counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK); } //当前Phaser的父Phaser,如果没有父节点则为null private final Phaser parent; //当前Phaser的根节点,如果不是树型结构则指向this private final Phaser root; //Treiber Stack的无锁并发栈分别来保存阶段数为奇偶数时的等待线程节点。 private final AtomicReference<QNode> evenQ;//当阶段数phase为偶数时的等待线程节点 private final AtomicReference<QNode> oddQ;//当阶段数phase为奇数时的等待线程节点 //等待队列的节点QNode类 static final class QNode implements ForkJoinPool.ManagedBlocker { final Phaser phaser; final int phase; final boolean interruptible; //支持中断 final boolean timed; //支持超时 boolean wasInterrupted; long nanos; final long deadline; volatile Thread thread; //节点线程,取消等待之后为null QNode next; //指向下一个节点 QNode(Phaser phaser, int phase, boolean interruptible, boolean timed, long nanos) { this.phaser = phaser; this.phase = phase; this.interruptible = interruptible; this.nanos = nanos; this.timed = timed; this.deadline = timed ? System.nanoTime() + nanos : 0L; thread = Thread.currentThread(); } //返回true表示可以释放等待阻塞,false表示需要继续阻塞等待 public boolean isReleasable() { if (thread == null) return true; if (phaser.getPhase() != phase) { thread = null; return true; } if (Thread.interrupted()) wasInterrupted = true; if (wasInterrupted && interruptible) { thread = null; return true; } if (timed) { if (nanos > 0L) { nanos = deadline - System.nanoTime(); } if (nanos <= 0L) { thread = null; return true; } } return false; } //阻塞方法,在需要阻塞时阻塞当前线程 public boolean block() { if (isReleasable()) return true; else if (!timed) LockSupport.park(this); else if (nanos > 0L) LockSupport.parkNanos(this, nanos); return isReleasable(); } } //其他省略 }
从以上的内部结构可以看出:
- Phaser包含一个被volatile修饰的主状态字段state,我们知道它被划分为了四个域,分别存储相应的信息,所以Phaser还提供了从主状态中获取这些域的静态辅助方法。另外,Phaser内部也是采用的CAS方式对state变量进行更新的,所以CAS+volatile再一次保证了并发过程中的原子性、可见性、有序性。
- Phaser内部维护了父Phaser和树形分层结构的根Phaser,如果没有父Phaser则为空,如果不是树形分层结构则根Phaser等于自身。
- Phaser采用了两个Treiber Stack结构(即一种基于CAS的无锁并发栈) 分别保存当阶段数phase为奇偶数时的等待参与者线程节点,这样奇偶分离的方式能够在一定程度上避免释放和添加线程节点的竞争。节点内部类QNode的结构很简单,主要保存了一些线程等待的相关信息,还有指向下一个QNode节点的域next,isReleasable方法中的逻辑比较简单:当QNode中的thread为null、或者和phaser的阶段值不相等、或者被中断、或者等待超时,方法都返回true即释放阻塞等待,而block方法就是一个阻塞的过程。另外,QNode实现了ForkJoinPool.ManagedBlocker接口,它的作用这里暂且不提,在我们后面分析ForkJoin框架的时候在研究。
构造方法
接下来,我们分析Phaser的构造方法:
public Phaser() { this(null, 0); } public Phaser(int parties) { this(null, parties); } public Phaser(Phaser parent) { this(parent, 0); } public Phaser(Phaser parent, int parties) { if (parties >>> PARTIES_SHIFT != 0) //无符号右移16位,即32位的高16位,parties不能超过65535 throw new IllegalArgumentException("Illegal number of parties"); int phase = 0; this.parent = parent; if (parent != null) { final Phaser root = parent.root; this.root = root; //如果父级不为空,共享父级的线程等待队列 this.evenQ = root.evenQ; this.oddQ = root.oddQ; if (parties != 0) //如果参与者不为0,注册1个参与者到父级 phase = parent.doRegister(1); } else { //如果父级为空,root就是自身 this.root = this; this.evenQ = new AtomicReference<QNode>(); this.oddQ = new AtomicReference<QNode>(); } this.state = (parties == 0) ? (long)EMPTY : ((long)phase << PHASE_SHIFT) | // 将pahse左移32位,即阶段数 ((long)parties << PARTIES_SHIFT) | //将参与者个数左移16位,即总的参与者个数 ((long)parties); //直接作为低16位,即未到达个数 }
所有的构造方法最终都是调用的同一个构造方法Phaser(Phaser parent, int parties)。通过构造方法可以知道如下几点:
- 构造Phaser的时候,参与者个数不能超过65535.
- 当有父级Phaser的时候,当前Phaser会共享父Phaser的线程等待队列.
- 当有父级Phaser的时候,并且参与者个数不为0时,会将当前Phaser作为一个参与者注册到父Phaser.同时阶段数phase也来至父级。所以子Phaser的阶段数和终止标记始终保持和父节点相同。
- 最后初始化当前Phaser的state成员时,如果参与者为0,那么state为1而不是0.
Register动态注册
通过构造方法我们知道在没有父级的存在下,Phaser非常简单,但存在父级的时候,需要将当前Phaser注册为父级的一个参与者,下面来看看doRegister(int)方法:
private int doRegister(int registrations) { // adjustment to state long adjust = ((long)registrations << PARTIES_SHIFT) | registrations; final Phaser parent = this.parent; int phase; for (;;) { long s = (parent == null) ? state : reconcileState(); //获取最新state int counts = (int)s; int parties = counts >>> PARTIES_SHIFT; //得到高16位 int unarrived = counts & UNARRIVED_MASK; //得到低16位 //注册的参与者数量和已存在的参与者数量加起来不能超过最大参与者数量 if (registrations > MAX_PARTIES - parties) throw new IllegalStateException(badRegister(s)); phase = (int)(s >>> PHASE_SHIFT); //得到高32位 if (phase < 0) //当前Phaser已经终止了,退出循环 break; if (counts != EMPTY) { // 如果不是第一次注册 //如果有父节点,需再次确认根节点的state没有变化 if (parent == null || reconcileState() == s) { if (unarrived == 0) //如果本阶段所有线程都到达了,等待root完成advance任务之后进入下一阶段之后才再次尝试注册 root.internalAwaitAdvance(phase, null); //否则尝试CAS直接将新注册的个数同时加到总参与个数和未到达数量上去 else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adjust)) break; } } else if (parent == null) { //否则是第一次注册,但没有父节点 long next = ((long)phase << PHASE_SHIFT) | adjust; if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) break; //直接尝试CAS更新当前state } else { //否则是第一次注册到子节点Phaser synchronized (this) { // 锁定当前Phaser实例 if (state == s) { // 再次确认状态无变化 //由于是第一次注册子节点Phaser的参与者,需要将当前子节点注册到父节点,作为父节点的一个参与者 phase = parent.doRegister(1); if (phase < 0) //如果根Phaser已经终止了,也就没有必要继续注册了。 break; // finish registration whenever parent registration // succeeded, even when racing with termination, // since these are part of the same "transaction". //不停尝试更新当前state,直到成功为止,不论是否已经终止。 while (!UNSAFE.compareAndSwapLong (this, stateOffset, s, ((long)phase << PHASE_SHIFT) | adjust)) { s = state; phase = (int)(root.state >>> PHASE_SHIFT); // assert (int)s == EMPTY; } break; } } } } return phase; //返回当前的阶段数 } //协调state,根据JavaDoc,这种情况发生在当根Phaser的阶段数phase已经递增,但是子Phaser的阶段数phase还没有,即根阶段数的滞后传播问题。 //协调的办法是: //若根Phaser已经终止了,则不变;若当前Phaser的参与者都被注销了(即总参与者为0),则高32位不变,低32位回到初始化Empty状态; //否则是其他情况的滞后,则state的阶段数更新为与根相同,总参与者个数不变,并且将所有参与者都标记为未到达状态。 //由于协调之后的state的高32位是来自root的state的高32位,所以子Phaser的阶段数和是否终止的标识都是和根是一致的。根终止了,所有的子Phaser也终止了。 private long reconcileState() { final Phaser root = this.root; long s = state; if (root != this) { int phase, p; // CAS to root phase with current parties, tripping unarrived //这里是while循环+CAS的方式,不断尝试更新当前Phaser的state,当CAS失败之后,重新拿当前Phaser的最新state进行操作, //直到CAS成功(成功的同时局部变量s也被更新为当前Phaser的最新state值),返回最终的state值。 while ((phase = (int)(root.state >>> PHASE_SHIFT)) != (int)(s >>> PHASE_SHIFT) && //根的阶段数与当前Phaser不匹配 !UNSAFE.compareAndSwapLong (this, stateOffset, s, s = (((long)phase << PHASE_SHIFT) | //这是根state的高32位; ((phase < 0) ? (s & COUNTS_MASK) : //根root已经终止,取当前Phaser的state低32位. (((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY : //根未终止,但总参与者个数为0,取Empty. ((s & PARTIES_MASK) | p)))))) //根未终止,总参与者个数不变,未达到个数重置为所有参与者均未达到。即抛弃了原有的unarrived值。 s = state; //CAS失败之后,更新局部变量s为当前Phaser的最新state值。 } return s; }
根据以上的注册参与者方法简单梳理逻辑如下:
- 首先如果是存在父节点的树形Phaser结构,需要根据root节点的Phaser协调当前Phaser的状态state,具体的协调方式在上面注释已然很清楚,值得注意的是,协调之后当前Phaser的state高32位会保证与root一致,也就是说,当前Phaser的阶段数和终止标识会与root节点一致,所以终止root节点其实也会终止它的子节点。
- 接下来会对当前Phaser的状态进行判断,主要就是参与者个数不能超过最大值65535,如果已经终止了,也就没必要进行注册了,直接返回。
- 如果不是第一次注册参与者,那么如果本阶段所有线程都已经到达,那么需要等待root完成阶段数的递增,进入下一个阶段才能继续尝试注册;否则可以直接尝试将注册的数量同时加到当前总参与者个数和未到达个数上去。
- 如果是第一次注册参与者但不是存在父节点的树形结构,那么可以直接尝试将注册的数量同时加到当前总参与者个数和未到达个数上去。
- 如果是第一次注册参与者并且是存在父节点的树形结构,那么在锁定当前Phaser实例之后,先要将当前Phaser作为一个参与者注册到它的父节点中,然后在将注册的数量同时加到当前总参与者个数和未到达个数上去。
- 返回值是state的高32位,即阶段数和是否终止的标识,所以如果已经终止了,返回值小于0. 否则返回值表示当前参与者被注册到的阶段数。
从上面的逻辑发现,当且仅当第一次向子节点Phaser注册参与者时,都会自动向其父节点注册一个参与者(如果存在父节点的话),这样就构成了一个类似二叉树的结构,即每一个子节点Phaser都作为一个参与者被注册到父节点。所以每一个节点能被注册的参与者个数=65535 - 子节点个数。
doRegister是动态注册的最底层实现,Phaser对外部提供的注册方法都是调用的该方法完成的,这些对外部提供的注册方法如下:
public int register() { return doRegister(1); //注册一个参与者 } public int bulkRegister(int parties) {//一次性注册多个参与者 if (parties < 0) throw new IllegalArgumentException(); if (parties == 0) return getPhase(); return doRegister(parties); }
其实就是一次注册一个或多个的区别,只是批量注册方法里面多个对参数的校验,当为0时,不会出错而是返回阶段数。 注册和后文的撤销只会影响内部的参与者计数,他们不会在Phaser内部保留对注册对象的跟踪,所以参与者线程不能通过查询得知自身是否被注册。(但是,你可以通过继承该类在子类中实现这样的跟踪逻辑)。
AwaitAdvance阻塞等待
在上面的逻辑第3步,当发现当前阶段已经完成,那么注册需要阻塞等待直到root完成advance结束(就是阶段数增加),才能进行,下面继续分析internalAwaitAdvance方法。
private int internalAwaitAdvance(int phase, QNode node) { // assert root == this; releaseWaiters(phase-1); // 释放无效等待的节点 boolean queued = false; // 节点入队标识 int lastUnarrived = 0; // 在发生变化时增加自旋次数 int spins = SPINS_PER_ARRIVAL; long s; int p; //循环操作,直到当前阶段数发生变化,与指定的阶段数不一致 while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) { if (node == null) { //不可中断的自旋 //这里的代码是指当未到达参与者数量发生变化,且变化后的未到达数量小于CPU核数时,增加自旋次数 //其实就是为了在CPU资源可用时尽量通过自旋等待,从而避免线程状态在用户态和内核态之间切换带来的性能消耗。 int unarrived = (int)s & UNARRIVED_MASK; if (unarrived != lastUnarrived && (lastUnarrived = unarrived) < NCPU) spins += SPINS_PER_ARRIVAL; //自旋等待结束,或者当前线程被中断,就创建一个不可中断的节点 boolean interrupted = Thread.interrupted();//获取并清除线程的中断标记 if (interrupted || --spins < 0) { // need node to record intr node = new QNode(this, phase, false, false, 0L); node.wasInterrupted = interrupted; } } else if (node.isReleasable()) // 如果当前节点可以释放,退出 break; else if (!queued) { // 节点不可释放,且还没入队 AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ; QNode q = node.next = head.get(); //在队列为空,或头节点阶段数与指定的阶段数一致,并且当前阶段数无变化时,入队作为新的头节点 if ((q == null || q.phase == phase) && (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq queued = head.compareAndSet(q, node); } else { //node节点入队之后,阻塞等待,直到节点isReleasable或block返回true //这里就是为什么QNode类要实现ForkJoinPool.ManagedBlocker的原因,因为ForkJoinPool.managedBlock控制同步阻塞的内部会通过执行被QNode重写的那两个方法来判断是否需要退出阻塞 try { ForkJoinPool.managedBlock(node); } catch (InterruptedException ie) { node.wasInterrupted = true; } } } if (node != null) { //对于进入队列的node,重置一些属性 if (node.thread != null) node.thread = null; //释放线程 if (node.wasInterrupted && !node.interruptible) Thread.currentThread().interrupt(); //不可中断的节点发生了中断时,要补一个中断 //阶段数依旧没有发生变化,表明阻塞过程超时或者发生中断 if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase) return abortWait(phase); // 释放清理超时或中断而不再等待当前阶段的节点 } releaseWaiters(phase);//释放另一个队列中无效等待的节点 return p; } //释放那些无效的等待队列节点(就是那些阶段数与当前阶段数不一致的节点),并唤醒节点对应的线程 private void releaseWaiters(int phase) { QNode q; // first element of queue Thread t; // its thread //参数phase仅仅用于取奇数或偶数队列 AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ; while ((q = head.get()) != null && q.phase != (int)(root.state >>> PHASE_SHIFT)) { //如果节点对应的阶段数不等于当前阶段数,释放节点并转到下一个节点 if (head.compareAndSet(q, q.next) && (t = q.thread) != null) { q.thread = null;//释放节点线程 LockSupport.unpark(t); //通知线程结束等待 } } } //该方法是releaseWaiters的变体,释放那些因为超时或者中断而不再等待当前阶段的节点,目前只释放位于队列前端的以减小内存占用 private int abortWait(int phase) { AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ; for (;;) { Thread t; QNode q = head.get(); int p = (int)(root.state >>> PHASE_SHIFT); if (q == null || ((t = q.thread) != null && q.phase == p)) return p; if (head.compareAndSet(q, q.next) && t != null) { q.thread = null; LockSupport.unpark(t); } } }
internalAwaitAdvance的逻辑有点复杂,因为整个Phaser中所有的阻塞等待系列方法都是调用的这个方法实现的,它的作用就是阻塞当前参与者直至:①阶段数增加,即进入下一个阶段,或②节点线程被释放(这种情况只是理论上的,但是应该不会发生),或③可中断节点等待超时或中断。同时它还会对所有Phaser共享的这两个奇偶队列中的节点进行清理并唤醒对应的线程,让它们被移除队列或者继续下一个阶段的执行。具体的逻辑过程就不再一一梳理,只针对doRegister中的情形进行分析。
在doRegister中,当所有参与者都到达之后,注册操作需要阻塞直到阶段数增长(当然还有一种情况就是在internalAwaitAdvance内部创建的不可中断节点线程被释放,即thread被赋值为null)。
关于Phaser中的队列(一奇一偶),上面对于节点入队的internalAwaitAdvance方法都是被root根Phaser实例调用的(不论是AwaitAdvance系列方法还是doRegister,以及后面的arriveAndAwaitAdvance方法),所以虽然所有的子Phaser都共享了根Phaser的两个队列,但其实那些入队的节点还是严格在根Phaser的队列上操作的。
internalAwaitAdvance方法是Phaser中同步操作AwaitAdvance系列方法的实现基础,该系列的其他方法都是在它的基础上实现的,该系列的其他方法是:
public int awaitAdvance(int phase) { final Phaser root = this.root; long s = (root == this) ? state : reconcileState(); int p = (int)(s >>> PHASE_SHIFT); if (phase < 0) //已经终止了,直接返回 return phase; if (p == phase) //指定的阶段数与当前阶段数相等,则进行阻塞等待直至阶段数增加 return root.internalAwaitAdvance(phase, null); return p; } public int awaitAdvanceInterruptibly(int phase) throws InterruptedException{ .......//省略相同的代码 if (p == phase) { QNode node = new QNode(this, phase, true, false, 0L); p = root.internalAwaitAdvance(phase, node); if (node.wasInterrupted) throw new InterruptedException(); } } public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { .......//省略相同的代码 if (p == phase) { QNode node = new QNode(this, phase, true, true, nanos); p = root.internalAwaitAdvance(phase, node); if (node.wasInterrupted) throw new InterruptedException(); else if (p == phase) throw new TimeoutException(); } }AwaitAdvance系列方法是用来阻塞当前线程直到参数指定的阶段数增长,它们的实现大同小异,不同的是增加了对中断和超时机制的支持。它们的实现中前面部分都是一样的,都需要对当前阶段数进行check,如果当前Phaser已经终止了或者当前阶段数与传入的阶段数参数不相等,则立即返回;否则阻塞当前线程直到阶段数增长,或者当前线程等待被中断或超时。这些方法正常返回的返回值是下一个阶段数,否则要么小于0,要么抛出中断或者超时异常。在抛出中断或超时异常之后。将不会改变Phaser的状态,所以通常如果需要都会在捕捉到这些异常之后通过执行forceTermination方法来强制终结整个Phaser。Phaser也可以被在ForkJoinPool中执行的任务使用,这样可以在当其他任务在一个阶段上阻塞时确保充足的并行性。
相关推荐
Java 5并发包(`java.util.concurrent`,简称`Concurrent`包)是Java平台中用于多线程编程的重要组成部分,它提供了丰富的并发工具类,极大地简化了在多线程环境下的编程工作。这个包的设计目标是提高并发性能,减少...
Java并发编程库(Java Util Concurrency,简称JUC)是Java平台中用于高效并发处理的重要工具包,包含在`java.util.concurrent`包下。JUC提供了丰富的并发原语,如线程池、同步器、并发容器等,极大地简化了多线程...
6. **Phaser**:Phaser是Java 7引入的新工具,类似于CyclicBarrier,但更灵活。它可以动态地添加和移除参与线程,并且在每个阶段可以执行不同的动作。 7. **BlockingQueue**:阻塞队列是一种特殊的队列,当队列为空...
4. **改进的并发工具**:Java并发包(java.util.concurrent)得到了扩展,添加了如CountDownLatch、CyclicBarrier和Phaser等同步工具类,使得多线程编程更易于管理和控制。 5. **XML处理增强**:JAXP(Java API for...
4. **并发工具类**(04章):介绍Java并发包(java.util.concurrent)中的各种工具类,如ExecutorService、Future、Semaphore、CountDownLatch、CyclicBarrier和ThreadPoolExecutor等,它们为并发编程提供了强大的...
2. **第二章:同步机制** - 这部分可能讨论了Java中的同步工具,如synchronized关键字、wait()和notify()方法,以及如何避免数据竞争和死锁问题。此外,还可能涉及线程安全的数据结构,如Vector和Collections....
3. 学习并发工具:熟悉Java并发包(java.util.concurrent)中的各种工具,如Semaphore用于控制并发访问的数量,CyclicBarrier和CountDownLatch用于协调多个线程间的同步,以及ExecutorService和ThreadPoolExecutor...
9. **并发工具类**:Java并发包(`java.util.concurrent`)提供了丰富的工具类,如`Atomic*`类(原子操作)、`CyclicBarrier`(回旋栅栏)、`Phaser`(屏障)等,这些工具可以帮助我们编写高效且线程安全的代码。...
Java并发包(java.util.concurrent)提供了更多的高级并发工具,例如CountDownLatch、CyclicBarrier、Semaphore、Phaser等,它们用于不同场景下的线程同步与协作。例如,CountDownLatch用于一个或多个线程等待其他...
- `Phaser`: Java并发包中的先进同步机制,允许多个线程分阶段协作。 9. **线程安全的集合** - `Collections.synchronizedXXX`方法:对普通集合进行同步包装,使其变为线程安全。 - `ConcurrentHashMap`: 并发...
Java并发包`java.util.concurrent`提供了许多高级同步工具,如`Semaphore`、`BlockingQueue`、`Condition`、`Atomic`类和`Phaser`等,用于实现更复杂的并发控制和线程协作。 #### 十一、Java线程:大总结 Java多...
Java并发包还提供了许多有用的工具类,如: - `Semaphore`:用来控制对有限资源的访问。 - `Exchanger`:允许两个线程交换对象。 - `Phaser`:类似于`CyclicBarrier`,但更加灵活,可以动态注册/注销参与者。 ### ...
10. **并发工具**:Java并发包(java.util.concurrent)得到进一步扩展,添加了如Phaser、CountDownLatch等新的同步工具类,帮助开发者编写多线程程序。 JDK 1.6的压缩包文件"jdk1.6.0_43"包含了该版本的完整安装...
3. **线程安全的数据结构**:Java并发包`java.util.concurrent`中提供了线程安全的数据结构,如`ConcurrentHashMap`、`CopyOnWriteArrayList`等,它们在并发场景下具有高效性能。 4. **并发工具类**:`...
在Java中,可以通过同步块或使用并发包中的原子类(如AtomicInteger、AtomicReference等)来保证操作的原子性。 2. 可见性(Visibility):可见性是指当一个线程修改了共享变量的值时,其他线程可以立即看到这个新...
5. **并发工具类**:Java并发包(java.util.concurrent)中包含许多工具类,如Semaphore(信号量)、CountDownLatch(倒计时锁)、CyclicBarrier(回环栅栏)和Exchanger(交换器),它们是设计复杂多线程同步场景的...
Java并发包提供了一些线程安全的数据结构,如`ConcurrentHashMap`, `CopyOnWriteArrayList`, `Atomic*`类等,它们内部已经实现了同步机制,简化了多线程编程。 10. **线程优先级与调度**: Java的`Thread`类提供...
Java多线程是Java编程中的...同时,面试中可能还会涉及到J.U.C(Java并发包)中的高级特性和最佳实践,例如CountDownLatch、CyclicBarrier、Semaphore等工具类的使用。熟悉并掌握这些内容,将有助于在面试中表现出色。
5. 并发包JUC(java.util.concurrent): - JUC是Java并发编程的核心工具包,包含各种并发工具类,如线程池(ExecutorService)、并发容器(ConcurrentHashMap、CopyOnWriteArrayList等)和并发实用类...