- 浏览: 984346 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
AtomicInteger解析:http://donald-draper.iteye.com/blog/2359555
锁持有者管理器AbstractOwnableSynchronizer:http://donald-draper.iteye.com/blog/2360109
AQS线程挂起辅助类:LockSupport:http://donald-draper.iteye.com/blog/2360206
AQS详解-CLH队列,线程等待状态:http://donald-draper.iteye.com/blog/2360256
//AQS
将一个条件队列节点,转移到同步等待队列
小节:从上面可以看出,唤醒节点时,首先等待条件线程节点状态,
设置为线程等待的初始状态0,然后添加到等待队列,
如果线程取消等待,则移除节点线程,否则通知节点前驱,当前驱
节点线程释放锁时,unpark线程。
再回到ConditionObject
下面来看fullyRelease(node);
//AQS
回到 awaitUninterruptibly
看下面一句
//AQS
回到 awaitUninterruptibly
//Thread
回到 awaitUninterruptibly
再看这一段
//AQS
acquireQueued过程是这样的:
1. 如果当前节点是AQS队列的头结点(如果第一个节点是DUMP节点也就是傀儡节点,
那么第二个节点实际上就是头结点了),
就尝试在此获取锁tryAcquire(arg)。
如果成功就将头结点设置为当前节点(不管第一个结点是否是DUMP节点),返回中断位。否则进行2。
2. 检测当前节点是否应该park(),如果应该park()就挂起当前线程并且返回当前线程中断位。进行操作1。
再来看
在回到这个方法
小节:非中断模式等待,首先添加新的等待条件线程节点,到等待条件线程队列;
释放节点状态,释放节点锁状态过程,待子类扩展,在过程中,同时唤醒等待队列
头结点;再判断节点是在同步等待队列上,如果不在,则park当前线程;
如果线程已经中断,则取消线程中断状态,即线程非中断等待条件。
回到ConditionObject的其他方法
//ConditionObject
//AQS
//Thread
该方法与sleep()类似,只是不能由用户指定暂停多长时间,
并且yield()方法只能让同优先级的线程有执行的机会。
//ConditionObject
总结:
ConditionObject是AQS的一个内部类,其实现是基于AQS;ConditionObject中的条件等待队列中的节点与同步队列中的节点基本相同,最大的不同时,同步等待队列
中的节点有先驱pre和后继next,而条件等待队列中的节点只有后继nextWaiter。
条件等待有两种方法,一种为非中断等待awaitUninterruptibly,在线程等待条件时不可中断线程;另外一种,可中断等待await,等待后,确定中断当前线程,还是抛出异常。
非中断模式等待,首先添加新的等待条件线程节点,到等待条件线程队列;
释放节点状态,释放节点锁状态过程,待子类扩展,在过程中,同时唤醒等待队列
头结点;再判断节点是在同步等待队列上,如果不在,则park当前线程;
如果线程已经中断,则取消线程中断状态,即线程非中断等待条件。
唤醒有两种,一种是唤醒等待条件队列中的头结点,另一种,唤醒条件等待队列中,
所有等待此条件的节点线程。唤醒节点时,首先设置线程等待的节点状态的初始状态0,
然后添加到同步等待队列,如果线程取消等待,则移除线程,否则通知节点前驱,当前驱
节点线程释放锁时,unpark线程。
在唤醒等待条件的线程时,为什么要将,节点移动同步等待线程节点能,这是因为条件时锁的一部分,先创建锁,再创建条件;当线程被唤醒时,只能说明,线程条件发生,能处于park状态,要获取锁,才能unpark线程。
锁持有者管理器AbstractOwnableSynchronizer:http://donald-draper.iteye.com/blog/2360109
AQS线程挂起辅助类:LockSupport:http://donald-draper.iteye.com/blog/2360206
AQS详解-CLH队列,线程等待状态:http://donald-draper.iteye.com/blog/2360256
/** * Condition implementation for a {@link * AbstractQueuedSynchronizer} serving as the basis of a {@link * Lock} implementation. * 作为AQS实现锁的一个基础实现Condition。 * <p>Method documentation for this class describes mechanics, * not behavioral specifications from the point of view of Lock * and Condition users. Exported versions of this class will in * general need to be accompanied by documentation describing * condition semantics that rely on those of the associated * <tt>AbstractQueuedSynchronizer</tt>. *方法文档用于描述这个条件实现机制,不是锁和条件的使用者,可以使用的操作。 此类的版本与AbstractQueuedSynchronizer相关联。 * <p>This class is Serializable, but all fields are transient, * so deserialized conditions have no waiters. */ //这个所有的all fields are transient,所以反序列化时,条件没有等待者。 public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** First node of condition queue. */ 队列中第一个等待节点线程 private transient Node firstWaiter; /** Last node of condition queue. */ 队列中最后一个等待条件的节点线程 private transient Node lastWaiter; /** * Creates a new <tt>ConditionObject</tt> instance. */ //初始化实例 public ConditionObject() { } // Internal methods /** * Adds a new waiter to wait queue. * @return its new wait node */ //添加一个条件等待线程节点,到条件等待队列 private Node addConditionWaiter() { Node t = lastWaiter;//取得队列的尾节点 // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { //移除队列中非等待条件的线程节点 unlinkCancelledWaiters(); t = lastWaiter; } //创建新节点条件线程等待节点 Node node = new Node(Thread.currentThread(), Node.CONDITION); //等队列为空,则新节点为头节点,否则加入到队尾 if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } /** * Unlinks cancelled waiter nodes from condition queue. * Called only while holding lock. This is called when * cancellation occurred during condition wait, and upon * insertion of a new waiter when lastWaiter is seen to have * been cancelled. This method is needed to avoid garbage * retention in the absence of signals. So even though it may * require a full traversal, it comes into play only when * timeouts or cancellations occur in the absence of * signals. It traverses all nodes rather than stopping at a * particular target to unlink all pointers to garbage nodes * without requiring many re-traversals during cancellation * storms. */ //从条件队列,移除取消等待条件的节点线程。当线程只有锁时,会调用此方法。 当在线程等待条件时,被取消,或者添加新节点时,发现尾节点已经取消等待,则 调用此方法。在没有信号条件的时候,方法需要避免垃圾的产生。尽管可能需要遍历 队列,当超时或取消条件等待时候,仍然会触发此方法。此方法会遍历会有节点, 不会因为某个特殊事件的发生,不移除取消条件等待的垃圾节点。 private void unlinkCancelledWaiters() { //取得头结点 Node t = firstWaiter; Node trail = null; //遍历队列,移除非等待条件的节点 while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next;//队列头 else trail.nextWaiter = next; if (next == null) lastWaiter = trail;//队列尾 } else trail = t; t = next; } } /** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ //唤醒等待队列中,第一个等待条件的线程节点。 private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; // } while (!transferForSignal(first) && (first = firstWaiter) != null); } }
//AQS
将一个条件队列节点,转移到同步等待队列
transferForSignal(Node node)
/** * Transfers a node from a condition queue onto sync queue. * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal). */ //将一个条件队列节点,转移到同步等待队列 final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ //将节点状态为等待条件,则将节点的状态,设置线程等待的初始状态0 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node);//添加到队列 int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) //如果线程取消等待,或设置唤醒成功,则unpark线程 LockSupport.unpark(node.thread); return true; } /** * CAS waitStatus field of a node. */ //修改节点的等待状态 private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) { return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update); } /** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ //添加节点到等待队列,以CAS操作,将节点添加到等待队列中 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
小节:从上面可以看出,唤醒节点时,首先等待条件线程节点状态,
设置为线程等待的初始状态0,然后添加到等待队列,
如果线程取消等待,则移除节点线程,否则通知节点前驱,当前驱
节点线程释放锁时,unpark线程。
再回到ConditionObject
//ConditionObject /** * Removes and transfers all nodes. * @param first (non-null) the first node on condition queue */ //唤醒队列中所有等待条件的节点 private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); } // public methods /** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ //如果线程持有独占锁,从头结点开始,唤醒第一个等待条件的线程节点 public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } /** * Moves all threads from the wait queue for this condition to * the wait queue for the owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ //如果线程持有独占锁,从头结点开始,唤醒所有等待条件的线程节点 public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } /** * Implements uninterruptible condition wait. * [list=1]不可中断条件等待 * <li> Save lock state returned by {@link #getState}. 保存锁的当前state * <li> Invoke {@link #release} with * saved state as argument, throwing * IllegalMonitorStateException if it fails. * <li> Block until signalled. 调用此方法的线程,阻塞至被唤醒 * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * [/list] */ public final void awaitUninterruptibly() { Node node = addConditionWaiter();//添加条件等待节点 int savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); }
下面来看fullyRelease(node);
//AQS
/** * Invokes release with current state value; returns saved state. * Cancels node and throws exception on failure. * @param node the condition node for this wait * @return previous sync state */ //释放锁的当前状态,返回保存值,如果是取消等待节点,则抛出异常 final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState();//获取当前等待状态 if (release(savedState)) { failed = false; return savedState; } else { //释放失败,抛出异常 throw new IllegalMonitorStateException(); } } finally { if (failed) //如果失释放败,设置节点状态为取消 node.waitStatus = Node.CANCELLED; } } /** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * 释放独占模式锁,用unblock线程 * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release(int arg) { if (tryRelease(arg)) { //如果尝试释放锁成功 Node h = head; if (h != null && h.waitStatus != 0) //如果头结点不为null,且非初始等待状态0,则unpark头结点的后继 unparkSuccessor(h); return true; } return false; } //待子类扩展 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } /** * Wakes up node's successor, if one exists. *唤醒接待的后继 * @param node the node */ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ //如果状态值为负,线程需要唤醒,尝试着清除状态值, int ws = node.waitStatus; if (ws < 0) //设置线程状态为初始值 compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; //从队尾遍历器前驱,找到最后一个状态值为负的节点 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //unpark节点线程 LockSupport.unpark(s.thread); }
回到 awaitUninterruptibly
public final void awaitUninterruptibly() { Node node = addConditionWaiter();//添加条件等待节点 int savedState = fullyRelease(node);//释放节点持有的锁,唤醒队列第一个节点线程 boolean interrupted = false; //如果节点在同步等待队列上 while (!isOnSyncQueue(node)) { //park当前线程 LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); }
看下面一句
isOnSyncQueue(node)
//AQS
/** * Returns true if a node, always one that was initially placed on * a condition queue, is now waiting to reacquire on sync queue. * @param node the node * @return true if is reacquiring */ //如果一个节点,刚开始在条件队列,现在,再同步等待队列 final boolean isOnSyncQueue(Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null) //条件等待节点线程,返回false return false; if (node.next != null) // If has successor, it must be on queue 如果有前驱,代表在同步等待队列上 return true; /* * node.prev can be non-null, but not yet on queue because * the CAS to place it on queue can fail. So we have to * traverse from tail to make sure it actually made it. It * will always be near the tail in calls to this method, and * unless the CAS failed (which is unlikely), it will be * there, so we hardly ever traverse much. */ 节点的前驱可能为非null,但不在队列中,可能由于CAS操作失败。 所以我们不得不确定,,节点是否在队尾附近。 return findNodeFromTail(node); } /** * Returns true if node is on sync queue by searching backwards from tail. * Called only when needed by isOnSyncQueue. * @return true if present */ //从队尾遍历,查看节点是否在同步等待队列上 private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }
回到 awaitUninterruptibly
public final void awaitUninterruptibly() { Node node = addConditionWaiter();//添加条件等待节点 int savedState = fullyRelease(node);//释放节点持有的锁,唤醒队列第一个节点线程 boolean interrupted = false; //如果节点在同步等待队列上 while (!isOnSyncQueue(node)) { //park当前线程 LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); }
//Thread
/** * Tests whether the current thread has been interrupted. The * <i>interrupted status</i> of the thread is cleared by this method. In * other words, if this method were to be called twice in succession, the * second call would return false (unless the current thread were * interrupted again, after the first call had cleared its interrupted * status and before the second call had examined it). * * <p>A thread interruption ignored because a thread was not alive * at the time of the interrupt will be reflected by this method * returning false. * 判断当前线程是否已经被中断,这个方法可以清楚线程的中断状态。换种说法, 当此方法成功调用两次时,第二次返回为false, * @return <code>true</code> if the current thread has been interrupted; * <code>false</code> otherwise. * @see #isInterrupted() * @revised 6.0 */ 查看当前线程是否已经被可中断,如果当前线程处于非中断状态,可返回true,否 返回else。 public static boolean interrupted() { return currentThread().isInterrupted(true); }
回到 awaitUninterruptibly
public final void awaitUninterruptibly() { Node node = addConditionWaiter();//添加条件等待节点 int savedState = fullyRelease(node);//释放节点持有的锁,唤醒队列第一个节点线程 boolean interrupted = false; //如果节点在同步等待队列上 while (!isOnSyncQueue(node)) { //park当前线程 LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); }
再看这一段
if (acquireQueued(node, savedState) || interrupted) selfInterrupt();
//AQS
/* * Various flavors of acquire, varying in exclusive/shared and * control modes. Each is mostly the same, but annoyingly * different. Only a little bit of factoring is possible due to * interactions of exception mechanics (including ensuring that we * cancel if tryAcquire throws exception) and other control, at * least not without hurting performance too much. */ 不同的获取锁方式,对应着不同的控制模式,不如独占、共享。 每一种大部分相同,但极少数不同。不同的,不如获取出现异常时的,取消 方式。 /** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //如果节点的前继,是头节点则,尝试获取锁 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //前驱线程释放锁之后,是否应该唤醒后继节点,如果是,则 //park当前线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) //如果失败,则取消线程 cancelAcquire(node); } }
acquireQueued过程是这样的:
1. 如果当前节点是AQS队列的头结点(如果第一个节点是DUMP节点也就是傀儡节点,
那么第二个节点实际上就是头结点了),
就尝试在此获取锁tryAcquire(arg)。
如果成功就将头结点设置为当前节点(不管第一个结点是否是DUMP节点),返回中断位。否则进行2。
2. 检测当前节点是否应该park(),如果应该park()就挂起当前线程并且返回当前线程中断位。进行操作1。
/** * Sets head of queue to be node, thus dequeuing. Called only by * acquire methods. Also nulls out unused fields for sake of GC * and to suppress unnecessary signals and traversals. *设置头节点 * @param node the node */ private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } /** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev *如果一个节点获取锁失败,则检查和更新节点状态 如果节点应该block,返回true * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. 先驱节点释放锁时,需要唤醒后继节点 */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; 如果先驱节点,等待被取消,向前遍历,找到第一个等待条件的节点 } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ 设置先驱节点状态为SIGNAL,当先驱释放锁时,需要唤醒后继节点线程 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } /** * Convenience method to park and then check if interrupted * park当前线程 * @return {@code true} if interrupted */ private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } // Utilities for various versions of acquire /** * Cancels an ongoing attempt to acquire. *取消一个尝试获取锁的线程 * @param node the node */ private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; //清空节点线程 node.thread = null; // Skip cancelled predecessors Node pred = node.prev; //找到第一个,等待条件的节点 while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. //设置节点状态,为取消状态 node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } //节点指向自己,等待gc回收 node.next = node; // help GC } }
再来看
selfInterrupt();
/** * Convenience method to interrupt current thread. */ private static void selfInterrupt() { Thread.currentThread().interrupt(); }
在回到这个方法
public final void awaitUninterruptibly() { Node node = addConditionWaiter();//添加条件等待节点 int savedState = fullyRelease(node);//释放节点持有的锁,唤醒队列第一个节点线程 boolean interrupted = false; //如果节点在同步等待队列上 while (!isOnSyncQueue(node)) { //park当前线程 LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); }
小节:非中断模式等待,首先添加新的等待条件线程节点,到等待条件线程队列;
释放节点状态,释放节点锁状态过程,待子类扩展,在过程中,同时唤醒等待队列
头结点;再判断节点是在同步等待队列上,如果不在,则park当前线程;
如果线程已经中断,则取消线程中断状态,即线程非中断等待条件。
回到ConditionObject的其他方法
//ConditionObject
/* * For interruptible waits, we need to track whether to throw * InterruptedException, if interrupted while blocked on * condition, versus reinterrupt current thread, if * interrupted while blocked waiting to re-acquire. */ 当线程是可中断等待时,我们需要捕捉是否抛出中断异常, 当阻塞在条件上时,则中断 /** Mode meaning to reinterrupt on exit from wait */ //在此模式上表示,当退出等待条件时,需要二次中断,即消除中断状态 private static final int REINTERRUPT = 1; /** Mode meaning to throw InterruptedException on exit from wait */ //此模式用于描述,当退出等待条件时,需要抛出异常 private static final int THROW_IE = -1; /** * Checks for interrupt, returning THROW_IE if interrupted * before signalled, REINTERRUPT if after signalled, or * 0 if not interrupted. */ //检查节点线程,是否需要消除中断,抛出异常,还是不需要中断 private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; }
//AQS
/** * Transfers node, if necessary, to sync queue after a cancelled * wait. Returns true if thread was cancelled before being * signalled. * @param current the waiting thread * @param node its node * @return true if cancelled before the node was signalled */ 在线程被唤醒之前,取消等待则返回true。如果需要的话, 当线程等待被取消时,将线程节点放到,同步等待队列中 final boolean transferAfterCancelledWait(Node node) { if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { //如果设置节点为初始化状态成功,则添加到同步等待队列 enq(node); return true; } /* * If we lost out to a signal(), then we can't proceed * until it finishes its enq(). Cancelling during an * incomplete transfer is both rare and transient, so just * spin. */ while (!isOnSyncQueue(node)) Thread.yield(); return false; }
//Thread
该方法与sleep()类似,只是不能由用户指定暂停多长时间,
并且yield()方法只能让同优先级的线程有执行的机会。
/** * A hint to the scheduler that the current thread is willing to yield * its current use of a processor. The scheduler is free to ignore this * hint. * * <p> Yield is a heuristic attempt to improve relative progression * between threads that would otherwise over-utilise a CPU. Its use * should be combined with detailed profiling and benchmarking to * ensure that it actually has the desired effect. * * <p> It is rarely appropriate to use this method. It may be useful * for debugging or testing purposes, where it may help to reproduce * bugs due to race conditions. It may also be useful when designing * concurrency control constructs such as the ones in the * {@link java.util.concurrent.locks} package. */ public static native void yield();
//ConditionObject
/** * Implements interruptible condition wait. * [list=1] * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with * saved state as argument, throwing * IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * [/list] */ //可中断的条件等待 public final void await() throws InterruptedException { if (Thread.interrupted()) //如果线程中断,则抛出异常 throw new InterruptedException(); //添加新的线程节点到等待条件队列 Node node = addConditionWaiter(); //释放节点锁状态,unpark等待队列头节点 int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { //如果节点不在同步等待队列,则park LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //判断线程等待条件后,是中断,还是抛出异常 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; //移除取消等待条件的节点 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } /** * Throws InterruptedException, reinterrupts current thread, or * does nothing, depending on mode. */ //根据线程当前中断模式,确定中断,还是抛出异常 private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) //抛出异常 throw new InterruptedException(); else if (interruptMode == REINTERRUPT) //中断 selfInterrupt(); } /** * Implements timed condition wait. * [list=1] * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with * saved state as argument, throwing * IllegalMonitorStateException if it fails. * <li> Block until signalled, interrupted, or timed out. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * [/list] */ 超时等待条件,如果当前线程处于中断,则抛出中断异常, 线程阻塞到,知道唤醒,或时间超时,或被中断。 public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); long lastTime = System.nanoTime(); int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { transferAfterCancelledWait(node); break; } LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; long now = System.nanoTime(); nanosTimeout -= now - lastTime; lastTime = now; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return nanosTimeout - (System.nanoTime() - lastTime); } /** * Implements absolute timed condition wait. * [list=1] * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with * saved state as argument, throwing * IllegalMonitorStateException if it fails. * <li> Block until signalled, interrupted, or timed out. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * <li> If timed out while blocked in step 4, return false, else true. * [/list] */ 超时等待条件,如果当前线程处于中断,则抛出中断异常, 线程阻塞到,知道唤醒,或到指定时间,或被中断。 public final boolean awaitUntil(Date deadline) throws InterruptedException { if (deadline == null) throw new NullPointerException(); long abstime = deadline.getTime(); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (System.currentTimeMillis() > abstime) { timedout = transferAfterCancelledWait(node); break; } LockSupport.parkUntil(this, abstime); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; } /** * Implements timed condition wait. * [list=1] * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with * saved state as argument, throwing * IllegalMonitorStateException if it fails. * <li> Block until signalled, interrupted, or timed out. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * <li> If timed out while blocked in step 4, return false, else true. * [/list] */ 超时等待条件,如果当前线程处于中断,则抛出中断异常, 线程阻塞到,知道唤醒,或到时间超时,或被中断。 public final boolean await(long time, TimeUnit unit) throws InterruptedException { if (unit == null) throw new NullPointerException(); long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); long lastTime = System.nanoTime(); boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { timedout = transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; long now = System.nanoTime(); nanosTimeout -= now - lastTime; lastTime = now; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; } // support for instrumentation /** * Returns true if this condition was created by the given * synchronization object. * * @return {@code true} if owned */ //判断条件是否为AQS创建 final boolean isOwnedBy(AbstractQueuedSynchronizer sync) { return sync == AbstractQueuedSynchronizer.this; } /** * Queries whether any threads are waiting on this condition. * Implements {@link AbstractQueuedSynchronizer#hasWaiters}. * * @return {@code true} if there are any waiting threads * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ //判断时候还有线程在等待这个条件 protected final boolean hasWaiters() { if (!isHeldExclusively()) //如果当前线程非独占锁持有者,抛出异常 throw new IllegalMonitorStateException(); for (Node w = firstWaiter; w != null; w = w.nextWaiter) { //如果等待条件队列中,有等在次条件的节点线程,则返回true if (w.waitStatus == Node.CONDITION) return true; } return false; } /** * Returns an estimate of the number of threads waiting on * this condition. * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength}. * * @return the estimated number of waiting threads * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ //返回等待此条件的节点线程数 protected final int getWaitQueueLength() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int n = 0; for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) ++n; } return n; } /** * Returns a collection containing those threads that may be * waiting on this Condition. * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads}. * * @return the collection of threads * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ //获取等待此条件的线程 protected final Collection<Thread> getWaitingThreads() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); ArrayList<Thread> list = new ArrayList<Thread>(); for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) { Thread t = w.thread; if (t != null) list.add(t); } } return list; } }
总结:
ConditionObject是AQS的一个内部类,其实现是基于AQS;ConditionObject中的条件等待队列中的节点与同步队列中的节点基本相同,最大的不同时,同步等待队列
中的节点有先驱pre和后继next,而条件等待队列中的节点只有后继nextWaiter。
条件等待有两种方法,一种为非中断等待awaitUninterruptibly,在线程等待条件时不可中断线程;另外一种,可中断等待await,等待后,确定中断当前线程,还是抛出异常。
非中断模式等待,首先添加新的等待条件线程节点,到等待条件线程队列;
释放节点状态,释放节点锁状态过程,待子类扩展,在过程中,同时唤醒等待队列
头结点;再判断节点是在同步等待队列上,如果不在,则park当前线程;
如果线程已经中断,则取消线程中断状态,即线程非中断等待条件。
唤醒有两种,一种是唤醒等待条件队列中的头结点,另一种,唤醒条件等待队列中,
所有等待此条件的节点线程。唤醒节点时,首先设置线程等待的节点状态的初始状态0,
然后添加到同步等待队列,如果线程取消等待,则移除线程,否则通知节点前驱,当前驱
节点线程释放锁时,unpark线程。
在唤醒等待条件的线程时,为什么要将,节点移动同步等待线程节点能,这是因为条件时锁的一部分,先创建锁,再创建条件;当线程被唤醒时,只能说明,线程条件发生,能处于park状态,要获取锁,才能unpark线程。
发表评论
-
Executors解析
2017-04-07 14:38 1255ThreadPoolExecutor解析一(核心线程池数量、线 ... -
ScheduledThreadPoolExecutor解析三(关闭线程池)
2017-04-06 20:52 4455ScheduledThreadPoolExecutor解析一( ... -
ScheduledThreadPoolExecutor解析二(任务调度)
2017-04-06 12:56 2127ScheduledThreadPoolExecutor解析一( ... -
ScheduledThreadPoolExecutor解析一(调度任务,任务队列)
2017-04-04 22:59 4992Executor接口的定义:http://donald-dra ... -
ThreadPoolExecutor解析四(线程池关闭)
2017-04-03 23:02 9111Executor接口的定义:http: ... -
ThreadPoolExecutor解析三(线程池执行提交任务)
2017-04-03 12:06 6087Executor接口的定义:http://donald-dra ... -
ThreadPoolExecutor解析二(线程工厂、工作线程,拒绝策略等)
2017-04-01 17:12 3041Executor接口的定义:http://donald-dra ... -
ThreadPoolExecutor解析一(核心线程池数量、线程池状态等)
2017-03-31 22:01 20520Executor接口的定义:http://donald-dra ... -
ScheduledExecutorService接口定义
2017-03-29 12:53 1510Executor接口的定义:http://donald-dra ... -
AbstractExecutorService解析
2017-03-29 08:27 1080Executor接口的定义:http: ... -
ExecutorCompletionService解析
2017-03-28 14:27 1593Executor接口的定义:http://donald-dra ... -
CompletionService接口定义
2017-03-28 12:39 1068Executor接口的定义:http://donald-dra ... -
FutureTask解析
2017-03-27 12:59 1329package java.util.concurrent; ... -
Future接口定义
2017-03-26 09:40 1198/* * Written by Doug Lea with ... -
ExecutorService接口定义
2017-03-25 22:14 1162Executor接口的定义:http://donald-dra ... -
Executor接口的定义
2017-03-24 23:24 1676package java.util.concurrent; ... -
简单测试线程池拒绝执行任务策略
2017-03-24 22:37 2030线程池多余任务的拒绝执行策略有四中,分别是直接丢弃任务Disc ... -
JAVA集合类简单综述
2017-03-23 22:51 925Queue接口定义:http://donald-draper. ... -
DelayQueue解析
2017-03-23 11:00 1737Queue接口定义:http://donald-draper. ... -
SynchronousQueue解析下-TransferQueue
2017-03-22 22:20 2140Queue接口定义:http://donald-draper. ...
相关推荐
- `Condition`的实现基于`AbstractQueuedSynchronizer`(AQS),一个内部维护了同步队列和等待队列的抽象类。`ConditionObject`是`AQS`的内部类,用于实现`Condition`接口。 - 等待队列由`firstWaiter`和`lastWaiter...
Java并发之AQS详解 AbstractQueuedSynchronizer(AQS)是 Java 并发编程中的一个核心组件,提供了一套多线程访问共享资源的同步器框架。AQS 定义了两种资源共享方式:Exclusive(独占)和 Share(共享)。在 AQS 中...
这个类位于java.util.concurrent.locks包下,是实现并发编程中AQS(AbstractQueuedSynchronizer)框架的重要基础之一。LockSupport的目的是为了简化锁和其他同步类的实现,允许开发者创建更加高级的并发工具。 ...
### CountDownLatch 和 CyclicBarrier 的运用(含AQS详解) #### CountDownLatch **定义与特点:** CountDownLatch 是 Java 并发包中的一个重要组件,它主要用于解决“一个或多个线程等待其他线程完成任务”的问题。...
ReentrantLock源码详解--条件锁 ReentrantLock源码详解中最重要的一个部分就是条件锁,条件锁是指在获取锁之后发现当前业务场景自己无法处理,而需要等待某个条件的出现才可以继续处理时使用的一种锁。今天我们来...
AQS组件详解 AQS(AbstractQueuedSynchronizer),是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器。下面对AQS的基本执行过程、队列节点结构、独占锁模式、共享锁模式、条件...
【Condition接口详解】 在Java并发编程中,`Condition`接口是一个非常重要的概念,它提供了类似于`Object`类的监视器方法(如`wait()`、`notify()`和`notifyAll()`),但具有更高的灵活性。`Condition`接口是Java `...
《Lock详解——深入理解Java并发编程的基石》 在Java并发编程中,Lock接口及其实现是不可或缺的一部分。本文将深入探讨Lock接口的核心实现——ReentrantLock,以及它背后的抽象队列同步器...
4-9 源码探秘之AQS如何用单一int值表示读写两种状态.mp4 4-10 深入剖析ReentrantReadWriteLock之读锁源码实现.mp4 4-11 深入剖析ReentrantReadWriteLock之写锁源码实现.mp4 4-12 锁降级详解.mp4 4-13 ...
4-9 源码探秘之AQS如何用单一int值表示读写两种状态.mp4 4-10 深入剖析ReentrantReadWriteLock之读锁源码实现.mp4 4-11 深入剖析ReentrantReadWriteLock之写锁源码实现.mp4 4-12 锁降级详解.mp4 4-13 ...
4-9 源码探秘之AQS如何用单一int值表示读写两种状态.mp4 4-10 深入剖析ReentrantReadWriteLock之读锁源码实现.mp4 4-11 深入剖析ReentrantReadWriteLock之写锁源码实现.mp4 4-12 锁降级详解.mp4 4-13 ...
4-9 源码探秘之AQS如何用单一int值表示读写两种状态.mp4 4-10 深入剖析ReentrantReadWriteLock之读锁源码实现.mp4 4-11 深入剖析ReentrantReadWriteLock之写锁源码实现.mp4 4-12 锁降级详解.mp4 4-13 ...
4-9 源码探秘之AQS如何用单一int值表示读写两种状态.mp4 4-10 深入剖析ReentrantReadWriteLock之读锁源码实现.mp4 4-11 深入剖析ReentrantReadWriteLock之写锁源码实现.mp4 4-12 锁降级详解.mp4 4-13 ...
AbstractQueuedSynchronizer(AQS)详解.mp4 使用AQS重写自己的锁.mp4 重入锁原理与演示.mp4 读写锁认识与原理.mp4 细读ReentrantReadWriteLock源码.mp4 ReentrantReadWriteLock锁降级详解.mp4 线程安全性问题简单总结...
12.详解Condition的await和signal等待通知机制 13.LockSupport工具 14.并发容器之ConcurrentHashMap(JDK 1.8版本) 15.并发容器之ConcurrentLinkedQueue 16.并发容器之CopyOnWriteArrayList 17.并发容器之ThreadLocal...
"04 并发编程专题07.zip"这个压缩包文件包含的两个子文件" Synchronized&Lock&AQS详解(上)(1).vep"和"Synchronized&Lock&AQS详解(上)(2).vep"很显然是关于Java并发编程中的关键概念——同步机制的深入讲解。...
`Condition`接口是`AbstractQueuedSynchronizer(AQS)`类的一部分,AQS是Java并发包中许多锁和同步器的基础。每个`Condition`对象都关联了一个等待队列,当线程调用`Condition`的`await()`方法时,线程会被阻塞并加入...
第22节AbstractQueuedSynchronizer(AQS)详解00:49:04分钟 | 第23节使用AQS重写自己的锁00:31:04分钟 | 第24节重入锁原理与演示00:12:24分钟 | 第25节读写锁认识与原理00:18:04分钟 | 第26节细读...
第22节AbstractQueuedSynchronizer(AQS)详解00:49:04分钟 | 第23节使用AQS重写自己的锁00:31:04分钟 | 第24节重入锁原理与演示00:12:24分钟 | 第25节读写锁认识与原理00:18:04分钟 | 第26节细读...