- 浏览: 980988 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
AtomicInteger解析:http://donald-draper.iteye.com/blog/2359555
锁持有者管理器AbstractOwnableSynchronizer:http://donald-draper.iteye.com/blog/2360109
AQS线程挂起辅助类LockSupport:http://donald-draper.iteye.com/blog/2360206
AQS作为高性能锁的基础,想要理解锁机制,我们需要深入地去剖析AbstractQueuedSynchronizer。今天我们不打算,将独占锁和共享锁的获取与释放,条件等待和唤醒,我们从源码帮助文档,简单看一下AQS是个什么东西,后面的文章我们详细说独占锁和共享锁的获取与释放,条件等待和唤醒。
总结:
从阅读源码帮助文档可看出,AQS使用CAS原始,修改锁的状态state;
等待锁的线程被放入到等待队列(CLH队列)中,每个线程等待状态用NODE来描述。
NODE有共享模式和独占模式,独占模式为NULL。NODE有CANCELLED,SIGNAL,SIGNAL,PROPAGATE
4中状态值。
SIGNAL:节点的后继由于park等原因被阻塞,当节点释放锁或取消时,要
unpark后继节点。为了避免竞争,acquire方法必须,首先检查他们是否
需要唤醒后继节点,再原子获取锁,获成功,失败,阻塞。
简单说,节点释放锁,是否需要唤醒后继节点
CANCELLED:节点有等待锁超时或者中断等原因,被取消,节点不会停留在这个状态。
如果一个线程被取消,线程就不会再被阻塞。
简单说,单节点处于这个状态,将被移除到等待队列
CONDITION: 处于这个状态的节点线程,放在条件队列中。它永远不会被
用作一个同步队列节点,直到等待的条件发生,节点将被转移到同步队列中。
(这个状态与其他状态,没有关联,只是一种简化的机制)。
PROPAGATE: 处于此模式下,释放共享锁具有传递性。头节点调用
doReleaseShared方法,保证传递释放共享锁,即使有其他的操作干涉。
这个时共享模式下的状态。
CLH队列由于虚头节点,队列中线程等待节点有一个前驱和一个后继节点,NODE有一个状态
waitStatus,描述线程的当前状态,有一个线程field用于表示当前等待线程,同时还有
nextWaiter节点,用于描述,节点时候有等待条件,或共享模式,获取锁时,需要通知其他线程。
Node nextWaiter:节点下一个等待条件或共享锁的节点。当线程持有独占锁时,只需要
访问条件队列,所以我们只需要一个简单的连接队列,存储等待条件的线程。
当他们转移到主队列时,可以重新获取锁。由于条件可以是互斥的,
所以我们用,特殊的值,去表示共享模式。
AQS有一个状态state表示锁的状态,一个CLH队列存放等待锁的线程节点。NODE还可以用于描述节点的等待条件节点线程,用nextWaiter去关联,组成的队列是条件队列。条件队列和等待队列并不冲突,当等待条件的线程被唤醒时,可以尝试获取锁,加入到等待对列。当一个等待队列节点线程获取独占锁时,可以访问条件队列,唤醒等待条件的线程。AQS还有一个ConditionObject我们,下一篇文章再讲。
锁持有者管理器AbstractOwnableSynchronizer:http://donald-draper.iteye.com/blog/2360109
AQS线程挂起辅助类LockSupport:http://donald-draper.iteye.com/blog/2360206
AQS作为高性能锁的基础,想要理解锁机制,我们需要深入地去剖析AbstractQueuedSynchronizer。今天我们不打算,将独占锁和共享锁的获取与释放,条件等待和唤醒,我们从源码帮助文档,简单看一下AQS是个什么东西,后面的文章我们详细说独占锁和共享锁的获取与释放,条件等待和唤醒。
/* * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms. * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain, as explained at * http://creativecommons.org/publicdomain/zero/1.0/ */ package java.util.concurrent.locks; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; import sun.misc.Unsafe; /** * Provides a framework for implementing blocking locks and related * synchronizers (semaphores, events, etc) that rely on * first-in-first-out (FIFO) wait queues. This class is designed to * be a useful basis for most kinds of synchronizers that rely on a * single atomic <tt>int</tt> value to represent state. Subclasses * must define the protected methods that change this state, and which * define what that state means in terms of this object being acquired * or released. Given these, the other methods in this class carry * out all queuing and blocking mechanics. Subclasses can maintain * other state fields, but only the atomically updated <tt>int</tt> * value manipulated using methods {@link #getState}, {@link * #setState} and {@link #compareAndSetState} is tracked with respect * to synchronization. * AbstractQueuedSynchronizer主要是依赖于FIFO等待队列,提供阻塞锁,和相关同步(信号量,事件等) 框架。AQS用一个原子的int值表示状态state,是多种同步器或锁的基础。 子类必须定义一个protected方法,用于改变状态state,此状态state以为着, 一个对象(锁)是否获取或释放。其他的方法,用于队列操作和阻塞机制。 子类可以通过setState和getState,#compareAndSetState方法,以原子,获取,改变state值。 * <p>Subclasses should be defined as non-public internal helper * classes that are used to implement the synchronization properties * of their enclosing class. Class * <tt>AbstractQueuedSynchronizer</tt> does not implement any * synchronization interface. Instead it defines methods such as * {@link #acquireInterruptibly} that can be invoked as * appropriate by concrete locks and related synchronizers to * implement their public methods. * 子类必须提供非公开的内部类,用于实现同步器相关功能,操作相关属性。 AQS不实现任何同步接口。AQS提供了具体的锁和相关同步器可以在其public方法, invoked的方法,比如acquireInterruptibly * <p>This class supports either or both a default [i]exclusive[/i] * mode and a [i]shared[/i] mode. When acquired in exclusive mode, * attempted acquires by other threads cannot succeed. Shared mode * acquires by multiple threads may (but need not) succeed. This class * does not "understand" these differences except in the * mechanical sense that when a shared mode acquire succeeds, the next * waiting thread (if one exists) must also determine whether it can * acquire as well. Threads waiting in the different modes share the * same FIFO queue. Usually, implementation subclasses support only * one of these modes, but both can come into play for example in a * {@link ReadWriteLock}. Subclasses that support only exclusive or * only shared modes need not define the methods supporting the unused mode. * AQS支持独占锁和共享锁模式。当一个线程持有独占锁,在其没释放之前,其他线程 尝试获取锁,则不能成功获取。共享锁可以被多个线程所持有。最大的不同是, 共享锁模式下,当一个线程成功获取锁,下一个等待线程必须确定其是否可以获取 锁。不同模式锁下等待的线程,在同一个FIFO队列中。子类可以实现两种模式中的 一种,也可以都实现,比如读写锁ReadWriteLock。子类如果只提供一种模式的锁, 不必实现另一种模式下的方法。 * <p>This class defines a nested {@link ConditionObject} class that * can be used as a {@link Condition} implementation by subclasses * supporting exclusive mode for which method {@link * #isHeldExclusively} reports whether synchronization is exclusively * held with respect to the current thread, method {@link #release} * invoked with the current {@link #getState} value fully releases * this object, and {@link #acquire}, given this saved state value, * eventually restores this object to its previous acquired state. No * <tt>AbstractQueuedSynchronizer</tt> method otherwise creates such a * condition, so if this constraint cannot be met, do not use it. The * behavior of {@link ConditionObject} depends of course on the * semantics of its synchronizer implementation. * AQS定义一个内部类ConditionObject,用于子类提供Condition的实现。实现独占锁模式的子类, isHeldExclusively方法可以用于判断当前线程是否持有锁, method {@link #release} * invoked with the current {@link #getState} value fully releases * this object, and {@link #acquire}, given this saved state value, * eventually restores this object to its previous acquired state. 上面这一段,暂时先放在这里,不能很好的翻译这段 release方法用于完全释放锁。 No * <tt>AbstractQueuedSynchronizer</tt> method otherwise creates such a * condition, so if this constraint cannot be met, do not use it. ConditionObject的行为依赖于同步器具体的实现语义的过程。 * <p>This class provides inspection, instrumentation, and monitoring * methods for the internal queue, as well as similar methods for * condition objects. These can be exported as desired into classes * using an <tt>AbstractQueuedSynchronizer</tt> for their * synchronization mechanics. * 此类提供一些方法用于监视,操作内部队列和条件对象。如果锁或同步器,使用AQS作为 它的同步机制,这些方法需要暴露出去,或者说作为一个接口。 * <p>Serialization of this class stores only the underlying atomic * integer maintaining state, so deserialized objects have empty * thread queues. Typical subclasses requiring serializability will * define a <tt>readObject</tt> method that restores this to a known * initial state upon deserialization. * 序列化AQS,只序列胡State的原子整数值,反序列化,只有线程等待队列为空。 需要序列化的子类必须定义一个readObject方法,用于恢复锁状态到一个先前的状态。 * <h3>Usage</h3> * * <p>To use this class as the basis of a synchronizer, redefine the * following methods, as applicable, by inspecting and/or modifying * the synchronization state using {@link #getState}, {@link * #setState} and/or {@link #compareAndSetState}: * 用AQS作为基本的同步器,需要从新定义一下方法。用#setState,#getState,#compareAndSetState 方法修改,监视,获取同步状态 * [list] * <li> {@link #tryAcquire}//尝试获取独占锁 * <li> {@link #tryRelease}//尝试释放独占锁 * <li> {@link #tryAcquireShared}//尝试获取共享锁 * <li> {@link #tryReleaseShared}//尝试释放共享锁 * <li> {@link #isHeldExclusively}//是否持有独占锁 *[/list] * * Each of these methods by default throws {@link * UnsupportedOperationException}. Implementations of these methods * must be internally thread-safe, and should in general be short and * not block. Defining these methods is the [i]only[/i] supported * means of using this class. All other methods are declared * <tt>final</tt> because they cannot be independently varied. 上述一些方法,默认抛出UnsupportedOperationException异常。 实现这些方法必须是内部线程安全的,同时这个过程,在时间上,应尽量的短,同时无阻塞。 定义这些方法意味着,锁或同步器的实现,是基于AQS。其他方法被定义为Final,以防被子类修改。 * * <p>You may also find the inherited methods from {@link * AbstractOwnableSynchronizer} useful to keep track of the thread * owning an exclusive synchronizer. You are encouraged to use them * -- this enables monitoring and diagnostic tools to assist users in * determining which threads hold locks. * 你可以用从AbstractOwnableSynchronizer继承的方法,获取独占锁的持有线程。我们鼓励 用AbstractOwnableSynchronizer继承的方法,去确定那个线程持有锁。 * <p>Even though this class is based on an internal FIFO queue, it * does not automatically enforce FIFO acquisition policies. The core * of exclusive synchronization takes the form: *尽管AQS的内部的队列是FIFO,但AQS不能保证FIFO的准确性,独占锁的核心同步如下 * <pre> * Acquire: * while (!tryAcquire(arg)) { * [i]enqueue thread if it is not already queued[/i]; * [i]possibly block current thread[/i]; * } * 自旋尝试获取锁,获取失败,则查看线程是否在等待队列中,如果没有, 则入队列,同时可能阻塞当前线程 * Release: * if (tryRelease(arg)) * [i]unblock the first queued thread[/i]; * </pre> * 如果释放锁成功,则唤醒队列头部的线程,持有锁 * (Shared mode is similar but may involve cascading signals.) * 共享模式下的锁,获取锁和释放锁类型,但是为引起级联效应 * <p><a name="barging">Because checks in acquire are invoked before * enqueuing, a newly acquiring thread may [i]barge[/i] ahead of * others that are blocked and queued. However, you can, if desired, * define <tt>tryAcquire</tt> and/or <tt>tryAcquireShared</tt> to * disable barging by internally invoking one or more of the inspection * methods, thereby providing a [i]fair[/i] FIFO acquisition order. * In particular, most fair synchronizers can define <tt>tryAcquire</tt> * to return <tt>false</tt> if {@link #hasQueuedPredecessors} (a method * specifically designed to be used by fair synchronizers) returns * <tt>true</tt>. Other variations are possible. * 在进入队列,进行获取锁检查时,一个新的获取锁线程也许会优先于,阻塞在队列中的 线程获取锁。如果不想出现这种情况,可以选择通过tryAcquire和tryAcquireShared方法, 通过一个或多个诊断方法,屏蔽这种情况,鉴于这种情况,提供一个精度较高的 FIFO队列。在一些特殊情况下,大部分同步器的#hasQueuedPredecessors方法(专为公平锁设计的方法) 返回true时,可以定义tryAcquire方法返回false,来保证公平性。 * <p>Throughput and scalability are generally highest for the * default barging (also known as [i]greedy[/i], * [i]renouncement[/i], and [i]convoy-avoidance[/i]) strategy. * While this is not guaranteed to be fair or starvation-free, earlier * queued threads are allowed to recontend before later queued * threads, and each recontention has an unbiased chance to succeed * against incoming threads. Also, while acquires do not * "spin" in the usual sense, they may perform multiple * invocations of <tt>tryAcquire</tt> interspersed with other * computations before blocking. This gives most of the benefits of * spins when exclusive synchronization is only briefly held, without * most of the liabilities when it isn't. If so desired, you can * augment this by preceding calls to acquire methods with * "fast-path" checks, possibly prechecking {@link #hasContended} * and/or {@link #hasQueuedThreads} to only do so if the synchronizer * is likely not to be contended. * 非公平锁(贪婪模式或闯入者模式)拥有一个相对较高的稳定性和吞吐量,强烈建议 使用这种模式的锁,这种模式不能保证公平性或饥渴度平衡树,先入队列的线程,允许在 后进入队列的线程之前竞争锁,每个竞争者与刚进来的线程拥有公平的机会,成功竞争锁。 一般情况下,在线程阻塞前,竞争者会自旋,多次执行tryAcquire方法,尽最大可能获取锁。 自旋对于锁的持有者有利,而对于获取者没有任务坏处。如果竞争者不可能获取锁,但同时 有强烈的愿望持有锁,则可以在acquire方法前,通过#hasContended或hasQueuedThreads方法, 检查或预检查锁。 * <p>This class provides an efficient and scalable basis for * synchronization in part by specializing its range of use to * synchronizers that can rely on <tt>int</tt> state, acquire, and * release parameters, and an internal FIFO wait queue. When this does * not suffice, you can build synchronizers from a lower level using * {@link java.util.concurrent.atomic atomic} classes, your own custom * {@link java.util.Queue} classes, and {@link LockSupport} blocking * support. * AQS利用获取和释放原子的int state,内部FIFO等待队列为同步器提供有效和稳定的基础。 当AQS性能不好时,可以利用Atomic和自己实现的Queue,和LockSupport,实现自己的锁机制。 * <h3>Usage Examples</h3> * * <p>Here is a non-reentrant mutual exclusion lock class that uses * the value zero to represent the unlocked state, and one to * represent the locked state. While a non-reentrant lock * does not strictly require recording of the current owner * thread, this class does so anyway to make usage easier to monitor. * It also supports conditions and exposes * one of the instrumentation methods: * 这里是一个非重入互斥锁的实现,用0表示锁打开状态,1表示锁住状态。 非重入互斥锁不需要记录当前锁的持有者,所以用了简单的监视方法实现 isHeldExclusively。Mutex支持Condition,暴露了一些使用方法。 * <pre> 非重入互斥锁 * class Mutex implements Lock, java.io.Serializable { * * // Our internal helper class,内部锁helper * private static class Sync extends AbstractQueuedSynchronizer { * // Report whether in locked state,监控锁状态 * protected boolean isHeldExclusively() { * return getState() == 1; * } * * // Acquire the lock if state is zero * public boolean tryAcquire(int acquires) { //断言acquires为1,当开启断言检查时(VM -ea),acquires不为1,则中断程序 * assert acquires == 1; // Otherwise unused * if (compareAndSetState(0, 1)) { //CAS操作获取锁,如果成功,则设置锁持有者为当前线程 * setExclusiveOwnerThread(Thread.currentThread()); //返回true获取成功 * return true; * } * return false; * } * * // Release the lock by setting state to zero * protected boolean tryRelease(int releases) { //断言releases为1,当开启断言检查时,releases不为1,则中断程序 * assert releases == 1; // Otherwise unused //如果锁为打开状态,抛出非法状态监控异常 * if (getState() == 0) throw new IllegalMonitorStateException(); //设置锁持有者为null,即锁无持有者 * setExclusiveOwnerThread(null); //设置锁为打开状态 * setState(0); //释放成功 * return true; * } * * // Provide a Condition,创建条件 * Condition newCondition() { return new ConditionObject(); } * * // Deserialize properly,反序列化方法 * private void readObject(ObjectInputStream s) * throws IOException, ClassNotFoundException { //调用默认的反序列化 * s.defaultReadObject(); //设置锁为打开状态 * setState(0); // reset to unlocked state * } * } * //同步器sync,做了所有的关键工作,我们只需要利用它实现锁机制 * // The sync object does all the hard work. We just forward to it. * private final Sync sync = new Sync(); * * public void lock() { sync.acquire(1); } //获取锁 * public boolean tryLock() { return sync.tryAcquire(1); }//尝试获取锁 * public void unlock() { sync.release(1); }//释放锁 * public Condition newCondition() { return sync.newCondition(); }//创建条件 * public boolean isLocked() { return sync.isHeldExclusively(); }//是否锁住 * public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }//锁是否有等待队列 //以可中断的方式获取锁 * public void lockInterruptibly() throws InterruptedException { * sync.acquireInterruptibly(1);// * } //等待超时时间,再尝试获取锁 * public boolean tryLock(long timeout, TimeUnit unit) * throws InterruptedException { * return sync.tryAcquireNanos(1, unit.toNanos(timeout)); * } * } * </pre> * * <p>Here is a latch class that is like a {@link CountDownLatch} * except that it only requires a single <tt>signal</tt> to * fire. Because a latch is non-exclusive, it uses the <tt>shared</tt> * acquire and release methods. *BooleanLatch是单个signal的闭锁,就像CountDownLatch一样。因为闭锁是非独占锁 ,所以它用acquire和release的共享版本,来获取与释放锁。 * <pre> * class BooleanLatch { * //内部同步器 * private static class Sync extends AbstractQueuedSynchronizer { //当锁状态不为零,代表锁处理打开状态,等待锁打开的线程,此时被唤醒 * boolean isSignalled() { return getState() != 0; } * //获取共享信号锁,锁打开,则获取锁成功。 * protected int tryAcquireShared(int ignore) { * return isSignalled() ? 1 : -1; * } * //释放共享锁,即打开锁 * protected boolean tryReleaseShared(int ignore) { * setState(1); * return true; * } * } * * private final Sync sync = new Sync(); * public boolean isSignalled() { return sync.isSignalled(); }//锁是否打开 * public void signal() { sync.releaseShared(1); }//已共享模式,打开锁 //已共享可中断方式,等待锁打开信号 * public void await() throws InterruptedException { * sync.acquireSharedInterruptibly(1); * } * } * </pre> * * @since 1.5 * @author Doug Lea */ public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 7373984972572414691L; /** * Creates a new <tt>AbstractQueuedSynchronizer</tt> instance * with initial synchronization state of zero. */ //创建一个实例,初始化化状态为0,及闭锁状态 protected AbstractQueuedSynchronizer() { } /** * Wait queue node class. *等待队列节点 * <p>The wait queue is a variant of a "CLH" (Craig, Landin, and * Hagersten) lock queue. CLH locks are normally used for * spinlocks. We instead use them for blocking synchronizers, but * use the same basic tactic of holding some of the control * information about a thread in the predecessor of its node. A * "status" field in each node keeps track of whether a thread * should block. A node is signalled when its predecessor * releases. Each node of the queue otherwise serves as a * specific-notification-style monitor holding a single waiting * thread. The status field does NOT control whether threads are * granted locks etc though. A thread may try to acquire if it is * first in the queue. But being first does not guarantee success; * it only gives the right to contend. So the currently released * contender thread may need to rewait. 线程等待队列是CLH 锁队列的一个变种。CLH锁一般用于自旋锁场景。 我们用它阻塞同步器,及一些基本的策略用于描述线程的前驱线程节点。 每个几点的状态status属性,用于描述一个线程是否应该被阻塞。当线程 节点的前驱节点释放锁时,将会唤醒其后继线程节点。队列中的每个线程 节点,描述的是等待线程的状态。节点的status field不能控制节点线程 ,是否可以持有锁。队列的头结点线程,会尝试着获取锁。头节点线程 ,虽然是第一个尝试获取锁的,但是不能保证能够成功获取锁,而是合适的 竞争者。所以当竞争线程释放锁时,想要重新获取锁,必须重新等待。 * * <p>To enqueue into a CLH lock, you atomically splice it in as new * tail. To dequeue, you just set the head field. * <pre> * +------+ prev +-----+ +-----+ * head | | <---- | | <---- | | tail * +------+ +-----+ +-----+ * </pre> * 对于CLH队列,当进入队列时,只需要,新建一个尾节点,挂入队列即可; 当出队列时,只需要设置队列的头节点,即可。 * <p>Insertion into a CLH queue requires only a single atomic * operation on "tail", so there is a simple atomic point of * demarcation from unqueued to queued. Similarly, dequeing * involves only updating the "head". However, it takes a bit * more work for nodes to determine who their successors are, * in part to deal with possible cancellation due to timeouts * and interrupts. * 每次进入CLH队列时,需要对尾节点进入队列过程,是一个原子性操作。 在出队列时,我们只需要更新head节点即可。在节点确定它的后继节点时, 需要花一些功夫,用于处理那些,由于等待超时时间结束或中断等原因, 而取消等待锁的线程。 * <p>The "prev" links (not used in original CLH locks), are mainly * needed to handle cancellation. If a node is cancelled, its * successor is (normally) relinked to a non-cancelled * predecessor. For explanation of similar mechanics in the case * of spin locks, see the papers by Scott and Scherer at * http://www.cs.rochester.edu/u/scott/synchronization/ *节点的前驱指针,主要用于处理,取消等待锁的线程。如果一个节点 取消等待锁,则此节点的前驱节点的后继指针,要指向,此节点后继节点中, 非取消等待锁的线程(有效等待锁的线程节点)。自旋锁的相同机制, 可以看Scott and Scherer的论文。 * <p>We also use "next" links to implement blocking mechanics. * The thread id for each node is kept in its own node, so a * predecessor signals the next node to wake up by traversing * next link to determine which thread it is. Determination of * successor must avoid races with newly queued nodes to set * the "next" fields of their predecessors. This is solved * when necessary by checking backwards from the atomically * updated "tail" when a node's successor appears to be null. * (Or, said differently, the next-links are an optimization * so that we don't usually need a backward scan.) * 我们用next指针连接实现阻塞机制。每个节点线程,控制着它自己的节点, 节点通过节点的后继连接唤醒其后继节点。为了避免节点的后继节点与 刚要进队列的线程竞争,通常把刚进的线程节点作为它后继,把节点的后继, 设为刚进来线程节点的后继。上面说的这一段,是非公平可重入锁的特性,为了 提高性能和吞吐量,这个我们后面的文章会说。上述的处理手段,节点了更新 尾节点时,尾节点的后继为null的问题。可以说时next连接的一种优化, 不必要再往后检查节点。 * <p>Cancellation introduces some conservatism to the basic * algorithms. Since we must poll for cancellation of other * nodes, we can miss noticing whether a cancelled node is * ahead or behind us. This is dealt with by always unparking * successors upon cancellation, allowing them to stabilize on * a new predecessor, unless we can identify an uncancelled * predecessor who will carry this responsibility. * 线程的取消,引入了一些保守的基本算法。由于我们必须poll其他节点 的cancellation,而忽略了节点是否是头结点或为节点后继。除非我们能确定 一个非取消前驱节点能够负责这些工作,否则Cancellation机制,总是unpark 后继节点,并需要他们有一个新的前驱。 * <p>CLH queues need a dummy header node to get started. But * we don't create them on construction, because it would be wasted * effort if there is never contention. Instead, the node * is constructed and head and tail pointers are set upon first * contention. * CLH队列需要一个头结点作为开始节点,头结点非实际线程节点。 我们不会再构造函数中,创建它,因为如果没有线程竞争锁,那么, 努力就白费了。取而代之额方案是,当有第一个竞争者时,我们才 构造头指针和尾指针。 * <p>Threads waiting on Conditions use the same nodes, but * use an additional link. Conditions only need to link nodes * in simple (non-concurrent) linked queues because they are * only accessed when exclusively held. Upon await, a node is * inserted into a condition queue. Upon signal, the node is * transferred to the main queue. A special value of status * field is used to mark which queue a node is on. * 线程以那个同一节点等待条件,但是用另外一个连接。条件只需要放在一个 非并发的连接队列与节点关联,因为只有当线程独占持有锁的时候,才会去访问条件。 当一个线程等待条件的时候,节点将会出入到条件队列中。当条件触发时, 节点将会转移到主队列中。有一个状态值,用于描述节点在哪一个队列上。 * <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill * Scherer and Michael Scott, along with members of JSR-166 * expert group, for helpful ideas, discussions, and critiques * on the design of this class. */感谢各位JSR-166规范的成员,对此类设计的批评与建议。 static final class Node { /** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node();//标记节点等待一个共享锁 /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null;//标记节点等待一个独占锁 /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1;//表示等待锁的线程,被取消 /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1;//表示后继线程需要被唤醒 /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2;//表示在等待条件 /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3;//表示下一个获取共享锁的线程,无条件传递获取 /** * Status field, taking on only the values: * SIGNAL: The successor of this node is (or will soon be) * blocked (via park), so the current node must * unpark its successor when it releases or * cancels. To avoid races, acquire methods must * first indicate they need a signal, * then retry the atomic acquire, and then, * on failure, block. SIGNAL:节点的后继由于park等原因被阻塞,当节点释放锁或取消时,要 unpark后继节点。为了避免竞争,acquire方法必须,首先检查他们是否 需要唤醒后继节点,再原子获取锁,获成功,失败,阻塞。 * CANCELLED: This node is cancelled due to timeout or interrupt. * Nodes never leave this state. In particular, * a thread with cancelled node never again blocks. CANCELLED:节点有等待锁超时或者中断等原因,被取消,节点不会停留在这个状态。 如果一个线程被取消,线程就不会再被阻塞。 * CONDITION: This node is currently on a condition queue. * It will not be used as a sync queue node * until transferred, at which time the status * will be set to 0. (Use of this value here has * nothing to do with the other uses of the * field, but simplifies mechanics.) CONDITION: 处于这个状态的节点线程,放在条件队列中。它永远不会被 用作一个同步队列节点,知道等待的条件发生,节点将被转移到同步队列中。 (这个状态与其他状态,没有关联,只是一种简化的机制)。 * PROPAGATE: A releaseShared should be propagated to other * nodes. This is set (for head node only) in * doReleaseShared to ensure propagation * continues, even if other operations have * since intervened. * 0: None of the above * PROPAGATE: 处于此模式下,释放共享锁具有传递性。头节点调用 doReleaseShared方法,保证传递释放共享锁,即使有其他的操作干涉。 * The values are arranged numerically to simplify use. * Non-negative values mean that a node doesn't need to * signal. So, most code doesn't need to check for particular * values, just for sign. * 这些状态值使用数字,表示状态。当值为负值时,表示节点不需要唤醒, 所以当编码时,不用检查精确的值,比较即可。 * The field is initialized to 0 for normal sync nodes, and * CONDITION for condition nodes. It is modified using CAS * (or when possible, unconditional volatile writes). */ field初始化为0,表示一个正常的同步节点。CONDITION属于条件节点。 此field,用CAS的手段进行修改等操作。 //等待状态 volatile int waitStatus; /** * Link to predecessor node that current node/thread relies on * for checking waitStatus. Assigned during enqueing, and nulled * out (for sake of GC) only upon dequeuing. Also, upon * cancellation of a predecessor, we short-circuit while * finding a non-cancelled one, which will always exist * because the head node is never cancelled: A node becomes * head only as a result of successful acquire. A * cancelled thread never succeeds in acquiring, and a thread only * cancels itself, not any other node. */ 当前线程用,前驱节点检查等待状态。为了给GC提供便利,当节点入队列以后, 如果出队列,前继为nulled。如果前驱节点,处于取消状态,我们应该进行一个短暂的 循环,剔除取消的节点,寻到一个非取消节点作为后继,节点总会存在, 因为队列的头结点是,成功获取锁的节点。取消线程节点,不会成功获取锁, 且只能取消它自己。 volatile Node prev; /** * Link to the successor node that the current node/thread * unparks upon release. Assigned during enqueuing, adjusted * when bypassing cancelled predecessors, and nulled out (for * sake of GC) when dequeued. The enq operation does not * assign next field of a predecessor until after attachment, * so seeing a null next field does not necessarily mean that * node is at end of queue. However, if a next field appears * to be null, we can scan prev's from the tail to * double-check. The next field of cancelled nodes is set to * point to the node itself instead of null, to make life * easier for isOnSyncQueue. */ 当前线程释放锁,根据后继连接,unpark线程。当出队列时,节点的后继为nulled, 以便gc回收。入队列操作不能保证next不为null,直到处理队列链接中,所以一个 节点的后继为null,不意味着,没有入队列。如果一个节点的后继为null, 我们可以从对尾,浏览他的前继,做双保险检查。为了是节点在同步队列中的 生命周期简单化,当一个取消线程节点,取消时,他的后继节点不为null,而是 指向自己。 volatile Node next; /** * The thread that enqueued this node. Initialized on * construction and nulled out after use. */ 进入队列的节点线程 volatile Thread thread; /** * Link to next node waiting on condition, or the special * value SHARED. Because condition queues are accessed only * when holding in exclusive mode, we just need a simple * linked queue to hold nodes while they are waiting on * conditions. They are then transferred to the queue to * re-acquire. And because conditions can only be exclusive, * we save a field by using special value to indicate shared * mode. */ 节点下一个等待条件或共享锁的节点。当线程持有独占锁时,只需要 访问条件队列,所以我们只需要一个简单的连接队列,存储等待条件的线程。 当他们转移到主队列时,可以重新获取锁。由于条件可以是互斥的, 所以我们用,特殊的值,去表示共享模式。 Node nextWaiter; /** * Returns true if node is waiting in shared mode */ 检点是否是共享模式 final boolean isShared() { return nextWaiter == SHARED; } /** * Returns previous node, or throws NullPointerException if null. * Use when predecessor cannot be null. The null check could * be elided, but is present to help the VM. * * @return the predecessor of this node */ 返回节点的前继,如果为null,抛出空指针异常。前继不内为null, 空值检查可以剔除这种情况,帮助VM回收。 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } //创建初始化head,和共享模式 Node() { // Used to establish initial head or SHARED marker } //构建等待条件节点 Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } //构建等待状态节点 Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } } /** * Head of the wait queue, lazily initialized. Except for * initialization, it is modified only via method setHead. Note: * If head exists, its waitStatus is guaranteed not to be * CANCELLED. */ //等待队列的头节点,懒加载,通过setHead方法,初始化及修改头节点。 如果头节点已经存在,要保证他的状态不能为CANCELLED. private transient volatile Node head; /** * Tail of the wait queue, lazily initialized. Modified only via * method enq to add new wait node. */ //等待队列的尾节点,懒加载。通过添加一个新的等待节点来修改 private transient volatile Node tail; /** * The synchronization state. */ //同步状态 private volatile int state; /** * Returns the current value of synchronization state. * This operation has memory semantics of a <tt>volatile</tt> read. * @return current state value */ 获取同步状态,从内存中直接读取 protected final int getState() { return state; } /** * Sets the value of synchronization state. * This operation has memory semantics of a <tt>volatile</tt> write. * @param newState the new state value */ 设置同步状态,直接写内存 protected final void setState(int newState) { state = newState; } /** * Setup to support compareAndSet. We need to natively implement * this here: For the sake of permitting future enhancements, we * cannot explicitly subclass AtomicInteger, which would be * efficient and useful otherwise. So, as the lesser of evils, we * natively implement using hotspot intrinsics API. And while we * are at it, we do the same for other CASable fields (which could * otherwise be done with atomic field updaters). */ 支持CAS操作。为了增强permitting future,我们需要本地化的实现,我们 不用使用实现AtomicInteger的子类,AtomicInteger在其他方面是高效有用的。 为了得到最优的性能,我们使用VM本地化的API,在CAS性质的fields,操作中 使用相同的机制。 private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long stateOffset; private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset; static { try { stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } } /** * Condition implementation for a {@link * AbstractQueuedSynchronizer} serving as the basis of a {@link * Lock} implementation. * 作为AQS实现锁的一个基础实现Condition。 * <p>Method documentation for this class describes mechanics, * not behavioral specifications from the point of view of Lock * and Condition users. Exported versions of this class will in * general need to be accompanied by documentation describing * condition semantics that rely on those of the associated * <tt>AbstractQueuedSynchronizer</tt>. *方法文档用于描述这个条件实现机制,不是锁和条件的使用者,可以使用的操作。 此类的版本与AbstractQueuedSynchronizer相关联。 * <p>This class is Serializable, but all fields are transient, * so deserialized conditions have no waiters. */ //这个所有的all fields are transient,所以反序列化时,条件没有等待者。 public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** First node of condition queue. */ 队列中第一个等待节点线程 private transient Node firstWaiter; /** Last node of condition queue. */ 队列中最后一个等待条件的节点线程 private transient Node lastWaiter; 剩下的我们会在后面的文章单独将,敬请期待.......... } }
总结:
从阅读源码帮助文档可看出,AQS使用CAS原始,修改锁的状态state;
等待锁的线程被放入到等待队列(CLH队列)中,每个线程等待状态用NODE来描述。
NODE有共享模式和独占模式,独占模式为NULL。NODE有CANCELLED,SIGNAL,SIGNAL,PROPAGATE
4中状态值。
SIGNAL:节点的后继由于park等原因被阻塞,当节点释放锁或取消时,要
unpark后继节点。为了避免竞争,acquire方法必须,首先检查他们是否
需要唤醒后继节点,再原子获取锁,获成功,失败,阻塞。
简单说,节点释放锁,是否需要唤醒后继节点
CANCELLED:节点有等待锁超时或者中断等原因,被取消,节点不会停留在这个状态。
如果一个线程被取消,线程就不会再被阻塞。
简单说,单节点处于这个状态,将被移除到等待队列
CONDITION: 处于这个状态的节点线程,放在条件队列中。它永远不会被
用作一个同步队列节点,直到等待的条件发生,节点将被转移到同步队列中。
(这个状态与其他状态,没有关联,只是一种简化的机制)。
PROPAGATE: 处于此模式下,释放共享锁具有传递性。头节点调用
doReleaseShared方法,保证传递释放共享锁,即使有其他的操作干涉。
这个时共享模式下的状态。
CLH队列由于虚头节点,队列中线程等待节点有一个前驱和一个后继节点,NODE有一个状态
waitStatus,描述线程的当前状态,有一个线程field用于表示当前等待线程,同时还有
nextWaiter节点,用于描述,节点时候有等待条件,或共享模式,获取锁时,需要通知其他线程。
Node nextWaiter:节点下一个等待条件或共享锁的节点。当线程持有独占锁时,只需要
访问条件队列,所以我们只需要一个简单的连接队列,存储等待条件的线程。
当他们转移到主队列时,可以重新获取锁。由于条件可以是互斥的,
所以我们用,特殊的值,去表示共享模式。
AQS有一个状态state表示锁的状态,一个CLH队列存放等待锁的线程节点。NODE还可以用于描述节点的等待条件节点线程,用nextWaiter去关联,组成的队列是条件队列。条件队列和等待队列并不冲突,当等待条件的线程被唤醒时,可以尝试获取锁,加入到等待对列。当一个等待队列节点线程获取独占锁时,可以访问条件队列,唤醒等待条件的线程。AQS还有一个ConditionObject我们,下一篇文章再讲。
发表评论
-
Executors解析
2017-04-07 14:38 1246ThreadPoolExecutor解析一(核心线程池数量、线 ... -
ScheduledThreadPoolExecutor解析三(关闭线程池)
2017-04-06 20:52 4451ScheduledThreadPoolExecutor解析一( ... -
ScheduledThreadPoolExecutor解析二(任务调度)
2017-04-06 12:56 2117ScheduledThreadPoolExecutor解析一( ... -
ScheduledThreadPoolExecutor解析一(调度任务,任务队列)
2017-04-04 22:59 4987Executor接口的定义:http://donald-dra ... -
ThreadPoolExecutor解析四(线程池关闭)
2017-04-03 23:02 9100Executor接口的定义:http: ... -
ThreadPoolExecutor解析三(线程池执行提交任务)
2017-04-03 12:06 6082Executor接口的定义:http://donald-dra ... -
ThreadPoolExecutor解析二(线程工厂、工作线程,拒绝策略等)
2017-04-01 17:12 3036Executor接口的定义:http://donald-dra ... -
ThreadPoolExecutor解析一(核心线程池数量、线程池状态等)
2017-03-31 22:01 20514Executor接口的定义:http://donald-dra ... -
ScheduledExecutorService接口定义
2017-03-29 12:53 1503Executor接口的定义:http://donald-dra ... -
AbstractExecutorService解析
2017-03-29 08:27 1072Executor接口的定义:http: ... -
ExecutorCompletionService解析
2017-03-28 14:27 1586Executor接口的定义:http://donald-dra ... -
CompletionService接口定义
2017-03-28 12:39 1061Executor接口的定义:http://donald-dra ... -
FutureTask解析
2017-03-27 12:59 1325package java.util.concurrent; ... -
Future接口定义
2017-03-26 09:40 1193/* * Written by Doug Lea with ... -
ExecutorService接口定义
2017-03-25 22:14 1159Executor接口的定义:http://donald-dra ... -
Executor接口的定义
2017-03-24 23:24 1672package java.util.concurrent; ... -
简单测试线程池拒绝执行任务策略
2017-03-24 22:37 2025线程池多余任务的拒绝执行策略有四中,分别是直接丢弃任务Disc ... -
JAVA集合类简单综述
2017-03-23 22:51 920Queue接口定义:http://donald-draper. ... -
DelayQueue解析
2017-03-23 11:00 1733Queue接口定义:http://donald-draper. ... -
SynchronousQueue解析下-TransferQueue
2017-03-22 22:20 2133Queue接口定义:http://donald-draper. ...
相关推荐
AQS_3抽象队列同步器
在 AQS 中,维护了一个 volatile 的整数 state,表示共享资源的状态另外还有一个 FIFO 的线程等待队列,用于存储多线程争用资源被阻塞时的线程。 AQS 提供了四种方法来控制资源的获取和释放:tryAcquire()、...
当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程。当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步...
AQS的acquire方法用于线程以独占模式获取资源,若资源被其他线程占有,则线程会加入到等待队列,并被阻塞。release方法则用于释放资源,它会唤醒队列中等待的下一个线程。 AQS还提供了ConditionObject类,它与AQS...
在线程获取锁时会调用AQS的acquire()方法,该方法第一次尝试获取锁如果失败,会将该线程加入到CLH队列中:public final void acqui
当多个线程试图访问一个共享资源时,AQS 使用一个FIFO(先进先出)的等待队列来管理这些线程。线程通过调用AQS提供的API来尝试获取或释放资源。 **二、AQS框架** AQS 的关键特性包括: 1. **state**:表示共享...
- wait():使当前执行的线程进入等待状态,并释放对象锁。 - notify():唤醒在此对象监视器上等待的单个线程。 - notifyAll():唤醒在此对象监视器上等待的所有线程。 - sleep():使当前线程暂停执行指定的时间...
### CountDownLatch 和 CyclicBarrier 的运用(含AQS详解) #### CountDownLatch **定义与特点:** CountDownLatch 是 Java 并发包中的一个重要组件,它主要用于解决“一个或多个线程等待其他线程完成任务”的问题。...
内容包括 ...10-深入理解AQS之Semaphorer&CountDownLatch&CyclicBarrie详解-fox 11-深入理解AQS之CyclicBarrier&ReentrantReadWriteLock详解-fox 12-深入理解AQS之ReentrantReadWriteLock详解-fox ...
- 通过FIFO队列管理等待线程,通过自旋和阻塞结合的方式提高性能。 #### 6. 公平锁与非公平锁 - **公平锁**:按照线程请求锁的时间顺序分配锁,可以避免饥饿现象。 - **非公平锁**:尝试优先获取锁,可能会导致...
- 等待队列由`firstWaiter`和`lastWaiter`两个`Node`类型的成员变量维护,`Node`是`AQS`中用于表示线程等待状态的类,同时也用于同步队列的管理。 - 调用`await()`方法的线程会变为等待状态,并加入等待队列。当被...
线程在队列中按照FIFO规则等待,当锁被释放时,AQS会使用CLH(自旋锁)队列算法唤醒下一个等待的线程。如果线程在等待过程中被中断,节点的`waitStatus`会被设置为CANCELLED,线程将不再参与同步。 6. **条件变量...
如果尝试获取锁失败,则会将当前线程加入到等待队列中并阻塞当前线程。 2. **tryAcquire(int arg)**: 这个方法需要在子类中实现。它用来尝试获取锁,返回值表示是否获取成功。 3. **acquireQueued(Node node, int ...
### Java多线程面试知识点详解 #### 一、线程与进程的区别 - **定义**: - **进程**:操作系统中的最小运行单位,每个进程都有独立的内存空间。 - **线程**:进程内的执行序列,是调度和执行的基本单位。 - **...
一、概念 AQS 是 AbstractQueuedSynchronizer 的简称,AQS 是一个抽象的队列式同步器框架,提供了...等到占有线程释放锁后唤醒队列中的任务争抢锁,这个队列为 CLH 队列。 使用state成员变量表示当前的同步状态,提供
- 在获取结果时,如果结果尚未准备好,则线程会阻塞在锁对象的wait()方法上,直到其他线程调用setResponse方法来设置结果并调用notifyAll()方法唤醒等待线程。 - 等待线程被唤醒后会重新检查结果是否已设置,如果...
否则,线程将变为等待状态,加入到等待队列。 - **入队**:新到来的线程在等待队列尾部插入。如果队列为空,会创建一个哨兵节点作为头节点和尾节点,新线程的节点插入到尾部,更新前驱节点的等待状态为1。 - **...
在公平模式下,等待线程按照进入队列的顺序依次获得资源;而非公平模式则允许新来的线程尝试直接获取资源,增加了获取资源的概率,但可能导致线程饥饿。 自旋锁是AQS的一个重要特性,当一个线程试图获取资源但发现...