- 浏览: 985132 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
Queue接口定义:http://donald-draper.iteye.com/blog/2363491
AbstractQueue简介:http://donald-draper.iteye.com/blog/2363608
ConcurrentLinkedQueue解析:http://donald-draper.iteye.com/blog/2363874
BlockingQueue接口的定义:http://donald-draper.iteye.com/blog/2363942
LinkedBlockingQueue解析:http://donald-draper.iteye.com/blog/2364007
ArrayBlockingQueue解析:http://donald-draper.iteye.com/blog/2364034
PriorityBlockingQueue解析:http://donald-draper.iteye.com/blog/2364100
SynchronousQueue是同步的队列,里面涉及到数据结构和一些算法的知识,今天我们虚心来看一下,能得到多少,是多少。欢迎网友,给出不同的建议,进行共同学习交流。
下面来看dual队列和栈的实现
先看栈:
自此同步队列的TransferStack已经看完,由于同步队列的内容量较大,我们这一篇先分析到这,下一篇再看TransferQueue和其他部分,先小节一下吧:
SynchronousQueue阻塞队列,每次插入操作必须等待一个协同的移除线程,反之亦然。SynchronousQueue同步队列没有容量,可以说,没有一个容量。由于队列中只有在消费线程,尝试消费元素的时候,才会出现元素,所以不能进行peek操作;不能用任何方法,生产元素,除非有消费者在尝试消费元素,同时由于队列中没有元素,所以不能迭代。head是第一个生产线程尝试生产的元素;如果没有这样的生产线程,那么没有元素可利用,remove和poll操作将会返回null。SynchronousQueue实际一个空集合类。同时同步队列不允许为null。同步队列支持生产者和消费者等待的公平性策略。默认情况下,不能保证生产消费的顺序。如果一个同步队列构造为公平性,则可以线程以FIFO访问队列元素。当时非公平策略用的是TransferStack,公平策略用的是TransferQueue;TransferStack和TransferQueue是存放等待操作线程的描述,从TransferStack中Snode节点可以看出:节点关联一个等待线程waiter,后继next,匹配节点match,节点元素item和模式mode;模式由三种,REQUEST节点表示消费者等待消费资源,DATA表示生产者等待生产资源。FULFILLING节点表示生产者正在给等待资源的消费者补给资源,或生产者在等待消费者消费资源。当有线程take/put操作时,查看栈头,如果是空队列,或栈头节点的模式与要放入的节点模式相同;如果是超时等待,判断时间是否小于0,小于0则取消节点等待;如果非超时,则将创建的新节点入栈成功,即放在栈头,自旋等待匹配节点(timed决定超时,不超时);如果匹配返回的是自己,节点取消等待,从栈中移除,并遍历栈移除取消等待的节点;匹配成功,两个节点同时出栈,REQUEST模式返回,匹配到的节点元素(DATA),DATA模式返回返回当前节点元素)。如果与栈头节点的模式不同且不为FULFILLING,匹配节点,成功者,两个节点同时出栈,REQUEST模式返回,匹配到的节点元素(DATA),DATA(put)模式返回返回当前节点元素。如果栈头为FULFILLING,找出栈头的匹配节点,栈头与匹配到的节点同时出栈。从分析非公平模式下的TransferStack,可以看出一个REQUEST操作必须同时伴随着一个DATA操作,一个DATA操作必须同时伴随着一个REQUEST操作,这也是同步队列的命名中含Synchronous原因。这也应了这句话
SynchronousQueue像一个管道,一个操作必须等待另一个操作的发生。
SynchronousQueue解析下-TransferQueue:http://donald-draper.iteye.com/blog/2364842
AbstractQueue简介:http://donald-draper.iteye.com/blog/2363608
ConcurrentLinkedQueue解析:http://donald-draper.iteye.com/blog/2363874
BlockingQueue接口的定义:http://donald-draper.iteye.com/blog/2363942
LinkedBlockingQueue解析:http://donald-draper.iteye.com/blog/2364007
ArrayBlockingQueue解析:http://donald-draper.iteye.com/blog/2364034
PriorityBlockingQueue解析:http://donald-draper.iteye.com/blog/2364100
SynchronousQueue是同步的队列,里面涉及到数据结构和一些算法的知识,今天我们虚心来看一下,能得到多少,是多少。欢迎网友,给出不同的建议,进行共同学习交流。
package java.util.concurrent; import java.util.concurrent.locks.*; import java.util.concurrent.atomic.*; import java.util.*; /** * A {@linkplain BlockingQueue blocking queue} in which each insert * operation must wait for a corresponding remove operation by another * thread, and vice versa. A synchronous queue does not have any * internal capacity, not even a capacity of one. You cannot * <tt>peek</tt> at a synchronous queue because an element is only * present when you try to remove it; you cannot insert an element * (using any method) unless another thread is trying to remove it; * you cannot iterate as there is nothing to iterate. The * [i]head[/i] of the queue is the element that the first queued * inserting thread is trying to add to the queue; if there is no such * queued thread then no element is available for removal and * <tt>poll()</tt> will return <tt>null</tt>. For purposes of other * <tt>Collection</tt> methods (for example <tt>contains</tt>), a * <tt>SynchronousQueue</tt> acts as an empty collection. This queue * does not permit <tt>null</tt> elements. * SynchronousQueue阻塞队列,每次插入操作必须等待一个协同的移除线程,反之亦然。 SynchronousQueue同步队列没有容量,可以说,没有一个容量。由于队列中只有在消费线程, 尝试消费元素的时候,才会出现元素,所以不能进行peek操作;不能用任何方法, 生产元素,除非有消费者在尝试消费元素,同时由于队列中没有元素,所以不能迭代。 head是第一个生产线程尝试生产的元素;如果没有这样的生产线程,那么没有元素可利用, remove和poll操作将会返回null。SynchronousQueue实际一个空集合类。同时同步队列不允许为null。 * <p>Synchronous queues are similar to rendezvous channels used in * CSP and Ada. They are well suited for handoff designs, in which an * object running in one thread must sync up with an object running * in another thread in order to hand it some information, event, or * task. * 同步队列与CSP和Ada场景下的通道相似(具体CSP和Ada可以google,我查的意思 为CSP-Constraint Satisfaction Problem,只有这个意思看上去有点像,怎么感觉不对, 据说CSP在机器学习中很有用,Ada查的靠谱一点的意思为美国军方的程序设计语言,其他的 都不靠谱,看到这篇文章的网友,可以看一下,可以给我发私信或留言,探讨一下)。 同步队列适用于传输通道设计,一个线程同步或生产一个元素,消息,资源,同时 另一个线程消费这些资源或任务。 * <p> This class supports an optional fairness policy for ordering * waiting producer and consumer threads. By default, this ordering * is not guaranteed. However, a queue constructed with fairness set * to <tt>true</tt> grants threads access in FIFO order. * 同步队列支持生产者和消费者等待的公平性策略。默认情况下,不能保证生产消费的顺序。 如果一个同步队列构造为公平性,则可以线程以FIFO访问队列元素。 * <p>This class and its iterator implement all of the * [i]optional[/i] methods of the {@link Collection} and {@link * Iterator} interfaces. * 实现了所有Collection和Iterator接口 * <p>This class is a member of the * <a href="{@docRoot}/../technotes/guides/collections/index.html"> * Java Collections Framework</a>. * * @since 1.5 * @author Doug Lea and Bill Scherer and Michael Scott * @param <E> the type of elements held in this collection */ public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = -3223113410248163686L; /* * This class implements extensions of the dual stack and dual * queue algorithms described in "Nonblocking Concurrent Objects * with Condition Synchronization", by W. N. Scherer III and * M. L. Scott. 18th Annual Conf. on Distributed Computing, * Oct. 2004 (see also * http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html). 同步队列实现拓展了双栈和双队列算法(条件同步的非阻塞并发对象), 在分布计算年刊中有具体描述,见下面连接 * The (Lifo) stack is used for non-fair mode, and the (Fifo) * queue for fair mode. The performance of the two is generally * similar. Fifo usually supports higher throughput under * contention but Lifo maintains higher thread locality in common * applications. * LIFO栈用于非公平模式,FIFO队列用于公平模式。两者的性能大体相同。 FIFO通常用于有高吞吐量存在竞争的场景,LIFO栈用于 Lifo maintains higher thread locality in common applications.这句不翻译了,保持原味。 * A dual queue (and similarly stack) is one that at any given * time either holds "data" -- items provided by put operations, * or "requests" -- slots representing take operations, or is * empty. A call to "fulfill" (i.e., a call requesting an item * from a queue holding data or vice versa) dequeues a * complementary node. The most interesting feature of these * queues is that any operation can figure out which mode the * queue is in, and act accordingly without needing locks. * 双队列是一个在任何时候持有由put操作提供元素的data,slots表示的 take操作的请求,或为空队列,与栈相似。一个调用fulfill操作(请求队列中 的持有元素,即进行put操作),将会有一个不足元素出队列,反之亦然, 意思为一个take操作对一个put操作,一个put操作必须对应一个take操作。 这种队列最有趣的特点是,任何操作不根据锁,可以判断进队列的模式, 是非公平的LIFO栈stack还是公平的FIFO队列queue。 * Both the queue and stack extend abstract class Transferer * defining the single method transfer that does a put or a * take. These are unified into a single method because in dual * data structures, the put and take operations are symmetrical, * so nearly all code can be combined. The resulting transfer * methods are on the long side, but are easier to follow than * they would be if broken up into nearly-duplicated parts. * 队列和栈继承了Transferer类,Transferer定义简单的方法(转换,转让) 做put或take操作。因为在双数据结构中,put和take操作是对称的,所以他们 统一定义在一个方法中,所以几乎所有的代码可以放在一起。 The resulting transfer methods are on the long side, but are easier to follow than they would be if broken up into nearly-duplicated parts. 这段不翻译保持原味。 * The queue and stack data structures share many conceptual * similarities but very few concrete details. For simplicity, * they are kept distinct so that they can later evolve * separately. * 队列和栈数据结构有许多概念上相同的属性,但也有一些具体的不同。 为了简单起见,他们保持着区别,确保later evolve separately。 * The algorithms here differ from the versions in the above paper * in extending them for use in synchronous queues, as well as * dealing with cancellation. The main differences include: * 这个算法与上面论文中的算法有所不同,我们扩展为了论文中的算法用在同步 队列中,也用于处理cancellation。主要的不同包括: * 1. The original algorithms used bit-marked pointers, but * the ones here use mode bits in nodes, leading to a number * of further adaptations. * 2. SynchronousQueues must block threads waiting to become * fulfilled. * 3. Support for cancellation via timeout and interrupts, * including cleaning out cancelled nodes/threads * from lists to avoid garbage retention and memory depletion. * 1.原始算法中用了bit标记指针,本同步队列实现算法中,在节点中使用bits模式, 将导致number进一步的调整。 2.同步队列必须阻塞线程等待变的可填充。 3.支持通过中断和超时取消等待策略,包括从等待队列中清除取消的节点或线程, 以避免产生垃圾,和内存泄漏。 * Blocking is mainly accomplished using LockSupport park/unpark, * except that nodes that appear to be the next ones to become * fulfilled first spin a bit (on multiprocessors only). On very * busy synchronous queues, spinning can dramatically improve * throughput. And on less busy ones, the amount of spinning is * small enough not to be noticeable. * 通过LockSupport的park/unpark方法,实现阻塞,除了在多处理器上, 下一个变得可填充的先自旋的节点或线程。在繁忙的同步队列中,自旋可以显著 提高吞吐量。在不繁忙时,自旋并不太多的消耗。 * Cleaning is done in different ways in queues vs stacks. For * queues, we can almost always remove a node immediately in O(1) * time (modulo retries for consistency checks) when it is * cancelled. But if it may be pinned as the current tail, it must * wait until some subsequent cancellation. For stacks, we need a * potentially O(n) traversal to be sure that we can remove the * node, but this can run concurrently with other threads * accessing the stack. * 在队列和栈中,清除操作有着不同的实现。在队列中,当一个节点或线程取消时, 我们大多数情况下,可以立即以常量1(一致性检查尝试次数的模)的时间移除一个节点或线程。 但是如果一直在队列的尾部,则必须等后来的线程节点取消。对于栈, 我们可能需要时间O(n)遍历已确定那个节点我们可以移除,但是这个可以与 其他线程并发访问栈。 * While garbage collection takes care of most node reclamation * issues that otherwise complicate nonblocking algorithms, care * is taken to "forget" references to data, other nodes, and * threads that might be held on to long-term by blocked * threads. In cases where setting to null would otherwise * conflict with main algorithms, this is done by changing a * node's link to now point to the node itself. This doesn't arise * much for Stack nodes (because blocked threads do not hang on to * old head pointers), but references in Queue nodes must be * aggressively forgotten to avoid reachability of everything any * node has ever referred to since arrival. 然而垃圾回收器必须关注其他复杂非阻塞算法的节点再生问题,数据,节点的引用 及线程也在会通过阻塞其他线程,以便长期持有锁。以防此类情况的发生, 引用将会为设置为null,以免与主要算法冲突,本算法姐姐方法是节点链接指向其自己。 这样不为引起大量的栈节点(因为阻塞线程,不能停留在head指针上),但是为了 避免其他的所有节点与以前引用的节点可达,队列节点的引用必须显示忘记索引。 */ /** * Shared internal API for dual stacks and queues. 双栈和队列共享内部API,队列和栈的父类 */ abstract static class Transferer { /** * Performs a put or take. * 执行一个put或take操作 * @param e if non-null, the item to be handed to a consumer; * if null, requests that transfer return an item * offered by producer. 如果元素为非空,则交给消费者处理,如果为null,请求生产者 生产一个元素,并返回元素 * @param timed if this operation should timeout 是否超时 * @param nanos the timeout, in nanoseconds 超时时间 * @return if non-null, the item provided or received; if null, * the operation failed due to timeout or interrupt -- * the caller can distinguish which of these occurred * by checking Thread.interrupted. 返回元素,如果非null,要不是队列中已经存在的,要不是生产者刚生产的。 如果为null,以为着由于超时,中断导致操作失败,调用可以通过检查线程中断位, 辨别放生了哪一种情况。 */ abstract Object transfer(Object e, boolean timed, long nanos); } /** The number of CPUs, for spin control 获取运行时环境的处理个数*/ static final int NCPUS = Runtime.getRuntime().availableProcessors(); /** * The number of times to spin before blocking in timed waits. * The value is empirically derived -- it works well across a * variety of processors and OSes. Empirically, the best value * seems not to vary with number of CPUs (beyond 2) so is just * a constant. 在超时等待阻塞前,自旋尝试的次数,这个值是一个,在不同处理器和系统性能上 良好工作的经验值。经验上来讲,最好的值,不要随着CPUS的个数/2的值变动, 所以它是一个常量,当处理器个数小于2,则为0,否则为32。 */ static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32; /** * The number of times to spin before blocking in untimed waits. * This is greater than timed value because untimed waits spin * faster since they don't need to check times on each spin. 在非超时等待阻塞之前,自旋的次数,最大非超时自旋时间大于最大自旋 时间,因为由于非超时自旋不需要在每次自旋时,不需要检查时间,所以, 非超时自旋非常快。 */ static final int maxUntimedSpins = maxTimedSpins * 16; /** * The number of nanoseconds for which it is faster to spin * rather than to use timed park. A rough estimate suffices. 快速自旋的时间,而不是park的时间,一个粗略的估计值。 */ static final long spinForTimeoutThreshold = 1000L; }
下面来看dual队列和栈的实现
先看栈:
/** Dual stack */ static final class TransferStack extends Transferer { /* * This extends Scherer-Scott dual stack algorithm, differing, * among other ways, by using "covering" nodes rather than * bit-marked pointers: Fulfilling operations push on marker * nodes (with FULFILLING bit set in mode) to reserve a spot * to match a waiting node. 本stack实现的是算法是拓展了Scherer-Scott双栈的算法,所不同的时,用 covering节点,而不是bit-marked指针:在bit集填充模式下,填充操作将会为 匹配一个等待节点保留资源,生产一个标记节点。 */ /* Modes for SNodes, ORed together in node fields */ /** Node represents an unfulfilled consumer REQUEST节点表示一个未填充的消费者*/ static final int REQUEST = 0; /** Node represents an unfulfilled producer DATA节点表示一个未填充的生产者*/ static final int DATA = 1; /** Node is fulfilling another unfulfilled DATA or REQUEST FULFILLING节点表示生产者正在给等待资源的消费者补给资源,或生产者在等待消费者消费资源/ static final int FULFILLING = 2; /** Return true if m has fulfilling bit set 如果m是一个填充为单元,则返回true*/ static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; } /** Node class for TransferStacks. 栈节点 */ static final class SNode { volatile SNode next; // next node in stack 节点的后继 volatile SNode match; // the node matched to this 匹配节点 volatile Thread waiter; // to control park/unpark 等待者线程 Object item; // data; or null for REQUESTs 数据,消费者消费的资源 int mode;//节点模式 // Note: item and mode fields don't need to be volatile // since they are always written before, and read after, // other volatile/atomic operations. //元素item和mode需要要可见,由于他们总是在其他可见/原子操作写之前,读之后 SNode(Object item) { this.item = item; } //设置节点后继 boolean casNext(SNode cmp, SNode val) { return cmp == next && UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } /** * Tries to match node s to this node, if so, waking up thread. * Fulfillers call tryMatch to identify their waiters. * Waiters block until they have been matched. * 尝试匹配目标节点与本节点,如果匹配,可以唤醒线程。补给者调用tryMatch方法 确定它们的等待线程。等待线程阻塞到它们自己被匹配。如果匹配返回true。 * @param s the node to match * @return true if successfully matched to s */ boolean tryMatch(SNode s) { if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) { Thread w = waiter; //如果等待者不为null,则unpark等待线程 if (w != null) { // waiters need at most one unpark waiter = null; LockSupport.unpark(w); } return true; } return match == s; } /** * Tries to cancel a wait by matching node to itself.节点尝试取消等待 */ void tryCancel() { UNSAFE.compareAndSwapObject(this, matchOffset, null, this); } //match指向自己,则取消等待 boolean isCancelled() { return match == this; } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long matchOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class k = SNode.class; matchOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("match")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } } /** The head (top) of the stack 栈头节点*/ volatile SNode head; //CAS操作nh为当前head,并比较head旧值是否为h boolean casHead(SNode h, SNode nh) { return h == head && UNSAFE.compareAndSwapObject(this, headOffset, h, nh); } /** * Creates or resets fields of a node. Called only from transfer * where the node to push on stack is lazily created and * reused when possible to help reduce intervals between reads * and CASes of head and to avoid surges of garbage when CASes * to push nodes fail due to contention. 创建或重新设置节点的fields。在节点入栈懒创建,在当可能需要保证减少intervals(间隔) 读和head的CAS操或避免由于竞争CAS操作节点入栈引起的垃圾时,此方法会被transfer调用 */ static SNode snode(SNode s, Object e, SNode next, int mode) { if (s == null) s = new SNode(e); s.mode = mode; s.next = next; return s; } /** * Puts or takes an item. put或take一个元素 */ Object transfer(Object e, boolean timed, long nanos) { /* * Basic algorithm is to loop trying one of three actions: * 算法的基本步骤是,循环尝试一下3步 * 1. If apparently empty or already containing nodes of same * mode, try to push node on stack and wait for a match, * returning it, or null if cancelled. * 1.如果队列为空或已经包含相同模式的节点,则尝试节点入栈,等待匹配, 返回,如果取消返回null。 * 2. If apparently containing node of complementary mode, * try to push a fulfilling node on to stack, match * with corresponding waiting node, pop both from * stack, and return matched item. The matching or * unlinking might not actually be necessary because of * other threads performing action 3: * 2.如果包含一个互补模式的节点(take(REQUEST)->put(DATA);put(DATA)->take(REQUEST)), 则尝试一个FULFILLING节点入栈,同时匹配等待的协同节点,两个节点同时出栈,返回匹配的元素。 由于其他线程执行步骤3,实际匹配和解除链接指针动作不会发生。 * 3. If top of stack already holds another fulfilling node, * help it out by doing its match and/or pop * operations, and then continue. The code for helping * is essentially the same as for fulfilling, except * that it doesn't return the item. 3.如果栈顶存在另外一个FULFILLING的节点,则匹配节点,并出栈。这段的代码 与fulfilling相同,除非没有元素返回 */ SNode s = null; // constructed/reused as needed //根据元素判断节点模式,元素不为null,则为DATA,否则为REQUEST int mode = (e == null) ? REQUEST : DATA; for (;;) { SNode h = head; if (h == null || h.mode == mode) { // empty or same-mode //如果是空队列,或栈头节点的模式与要放入的节点模式相同 if (timed && nanos <= 0) { // can't wait //如果超时,则取消等待,出栈,设置栈头为其后继 if (h != null && h.isCancelled()) casHead(h, h.next); // pop cancelled node else //否则返回null return null; } else if (casHead(h, s = snode(s, e, h, mode))) { //如果非超时,则将创建的新节点入栈成功,即放在栈头,自旋等待匹配节点(timed决定超时,不超时) SNode m = awaitFulfill(s, timed, nanos); if (m == s) { // wait was cancelled //如果返回的是自己,节点取消等待,从栈中移除,并遍历栈移除取消等待的节点 clean(s); return null; } if ((h = head) != null && h.next == s) //s节点匹配成功,则设置栈头为s的后继 casHead(h, s.next); // help s's fulfiller //匹配成功,REQUEST模式返回,匹配到的节点元素(DATA),DATA模式返回当前节点元素 return (mode == REQUEST) ? m.item : s.item; } } else if (!isFulfilling(h.mode)) { // try to fulfill //如果栈头节点模式不为Fulfilling,判断是否取消等待,是则出栈 if (h.isCancelled()) // already cancelled casHead(h, h.next); // pop and retry //非取消等待,则是节点入栈 else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { for (;;) { // loop until matched or waiters disappear SNode m = s.next; // m is s's match //后继节点为null,则出栈 if (m == null) { // all waiters are gone casHead(s, null); // pop fulfill node s = null; // use new node next time break; // restart main loop } SNode mn = m.next; //尝试匹配是s节点 if (m.tryMatch(s)) { //匹配成功两个节点则出栈, casHead(s, mn); // pop both s and m return (mode == REQUEST) ? m.item : s.item; } else // lost match //否则,跳过s的后继节点 s.casNext(m, mn); // help unlink } } } else { // help a fulfiller //如果栈头节点模式为Fulfilling,找出栈头的匹配节点 SNode m = h.next; // m is h's match if (m == null) // waiter is gone //如果无后继等待节点,则栈头出栈 casHead(h, null); // pop fulfilling node else { //尝试匹配,如果匹配成功,栈头和匹配节点出栈,否则跳过后继节点 SNode mn = m.next; if (m.tryMatch(h)) // help match casHead(h, mn); // pop both h and m else // lost match h.casNext(m, mn); // help unlink } } } } /** * Spins/blocks until node s is matched by a fulfill operation. 自旋或阻塞,直到节点被一个fulfill操作匹配 * * @param s the waiting node 等待被匹配的节点 * @param timed true if timed wait 是否超时等待 * @param nanos timeout value 时间值 * @return matched node, or s if cancelled 如果匹配返回节点,否则取消等待 */ SNode awaitFulfill(SNode s, boolean timed, long nanos) { /* * When a node/thread is about to block, it sets its waiter * field and then rechecks state at least one more time * before actually parking, thus covering race vs * fulfiller noticing that waiter is non-null so should be * woken. * 当一个节点线程将要阻塞时,在实际park之前,设置等待线程的field,重新至少检查 自身状态一次,这样可以避免在fulfiller注意到有等待线程非null,可以操作时,掩盖了竞争。 * When invoked by nodes that appear at the point of call * to be at the head of the stack, calls to park are * preceded by spins to avoid blocking when producers and * consumers are arriving very close in time. This can * happen enough to bother only on multiprocessors. * 当awaitFulfill被栈头节点调用时,通过自旋park一段时间,以免在刚要阻塞的时刻, 有生产者或消费者到达。这在多处理机上将会发生。 * The order of checks for returning out of main loop * reflects fact that interrupts have precedence over * normal returns, which have precedence over * timeouts. (So, on timeout, one last check for match is * done before giving up.) Except that calls from untimed * SynchronousQueue.{poll/offer} don't check interrupts * and don't wait at all, so are trapped in transfer * method rather than calling awaitFulfill. 主循环检查返回的顺序将会反应,在正常返回时,中断是否处理,还是超时处理。 (在放弃匹配之前,及最后一次检查,正好超时),除非调用SynchronousQueue的 非超时poll/offer操作,不会检查中断,不等待,那么将调用transfer方法中的其他部分逻辑, 而不是调用awaitFulfill。 */ long lastTime = timed ? System.nanoTime() : 0; Thread w = Thread.currentThread(); SNode h = head; //获取自旋的次数 int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { if (w.isInterrupted()) //如果线程被中断,则取消等待 s.tryCancel(); SNode m = s.match; if (m != null) //如果节点的匹配节点不为null,则返回匹配节点 return m; if (timed) { long now = System.nanoTime(); nanos -= now - lastTime; lastTime = now; if (nanos <= 0) { //如果超时,则取消等待 s.tryCancel(); continue; } } if (spins > 0) //如果自旋次数大于零,且可以自旋,则自旋次数减1 spins = shouldSpin(s) ? (spins-1) : 0; else if (s.waiter == null) //如果节点S的等待线程为空,则设置当前节点为S节点的等待线程,以便可以park后继节点。 s.waiter = w; // establish waiter so can park next iter else if (!timed) //非超时等在者,park当前线程 LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) //如果超时时间大于,最大自旋阈值,则超时park当前线程 LockSupport.parkNanos(this, nanos); } } /** * Returns true if node s is at head or there is an active * fulfiller. 如果节点在栈头或栈头为FULFILLING的节点,则返回true */ boolean shouldSpin(SNode s) { SNode h = head; return (h == s || h == null || isFulfilling(h.mode)); } /** * Unlinks s from the stack. */ void clean(SNode s) { s.item = null; // forget item s.waiter = null; // forget thread /* * At worst we may need to traverse entire stack to unlink * s. If there are multiple concurrent calls to clean, we * might not see s if another thread has already removed * it. But we can stop when we see any node known to * follow s. We use s.next unless it too is cancelled, in * which case we try the node one past. We don't check any * further because we don't want to doubly traverse just to * find sentinel. 最糟糕的情况是我们需要遍历整个栈,unlink节点s。如果有多个线程同时访问 clean方法,由于其他线程可能移除s节点,我们也许看不到s节点。但是我们可以停止 操作,当发现一个节点的后继为s。我们可以用s节点的后继,除非s节点取消,否则, 我们可越过s节点。我们不会进一步地检查,因为我们不想仅仅为了发现s节点,遍历两次。 */ SNode past = s.next; if (past != null && past.isCancelled()) past = past.next; // Absorb cancelled nodes at head SNode p; while ((p = head) != null && p != past && p.isCancelled()) //设置栈头节点的后继为第一个非取消等待的节点 casHead(p, p.next); // Unsplice embedded nodes,遍历栈,移除取消等待的节点 while (p != null && p != past) { SNode n = p.next; if (n != null && n.isCancelled()) p.casNext(n, n.next); else p = n; } } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class k = TransferStack.class; headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head")); } catch (Exception e) { throw new Error(e); } } } }
自此同步队列的TransferStack已经看完,由于同步队列的内容量较大,我们这一篇先分析到这,下一篇再看TransferQueue和其他部分,先小节一下吧:
SynchronousQueue阻塞队列,每次插入操作必须等待一个协同的移除线程,反之亦然。SynchronousQueue同步队列没有容量,可以说,没有一个容量。由于队列中只有在消费线程,尝试消费元素的时候,才会出现元素,所以不能进行peek操作;不能用任何方法,生产元素,除非有消费者在尝试消费元素,同时由于队列中没有元素,所以不能迭代。head是第一个生产线程尝试生产的元素;如果没有这样的生产线程,那么没有元素可利用,remove和poll操作将会返回null。SynchronousQueue实际一个空集合类。同时同步队列不允许为null。同步队列支持生产者和消费者等待的公平性策略。默认情况下,不能保证生产消费的顺序。如果一个同步队列构造为公平性,则可以线程以FIFO访问队列元素。当时非公平策略用的是TransferStack,公平策略用的是TransferQueue;TransferStack和TransferQueue是存放等待操作线程的描述,从TransferStack中Snode节点可以看出:节点关联一个等待线程waiter,后继next,匹配节点match,节点元素item和模式mode;模式由三种,REQUEST节点表示消费者等待消费资源,DATA表示生产者等待生产资源。FULFILLING节点表示生产者正在给等待资源的消费者补给资源,或生产者在等待消费者消费资源。当有线程take/put操作时,查看栈头,如果是空队列,或栈头节点的模式与要放入的节点模式相同;如果是超时等待,判断时间是否小于0,小于0则取消节点等待;如果非超时,则将创建的新节点入栈成功,即放在栈头,自旋等待匹配节点(timed决定超时,不超时);如果匹配返回的是自己,节点取消等待,从栈中移除,并遍历栈移除取消等待的节点;匹配成功,两个节点同时出栈,REQUEST模式返回,匹配到的节点元素(DATA),DATA模式返回返回当前节点元素)。如果与栈头节点的模式不同且不为FULFILLING,匹配节点,成功者,两个节点同时出栈,REQUEST模式返回,匹配到的节点元素(DATA),DATA(put)模式返回返回当前节点元素。如果栈头为FULFILLING,找出栈头的匹配节点,栈头与匹配到的节点同时出栈。从分析非公平模式下的TransferStack,可以看出一个REQUEST操作必须同时伴随着一个DATA操作,一个DATA操作必须同时伴随着一个REQUEST操作,这也是同步队列的命名中含Synchronous原因。这也应了这句话
SynchronousQueue像一个管道,一个操作必须等待另一个操作的发生。
SynchronousQueue解析下-TransferQueue:http://donald-draper.iteye.com/blog/2364842
发表评论
-
Executors解析
2017-04-07 14:38 1256ThreadPoolExecutor解析一(核心线程池数量、线 ... -
ScheduledThreadPoolExecutor解析三(关闭线程池)
2017-04-06 20:52 4459ScheduledThreadPoolExecutor解析一( ... -
ScheduledThreadPoolExecutor解析二(任务调度)
2017-04-06 12:56 2128ScheduledThreadPoolExecutor解析一( ... -
ScheduledThreadPoolExecutor解析一(调度任务,任务队列)
2017-04-04 22:59 4992Executor接口的定义:http://donald-dra ... -
ThreadPoolExecutor解析四(线程池关闭)
2017-04-03 23:02 9113Executor接口的定义:http: ... -
ThreadPoolExecutor解析三(线程池执行提交任务)
2017-04-03 12:06 6088Executor接口的定义:http://donald-dra ... -
ThreadPoolExecutor解析二(线程工厂、工作线程,拒绝策略等)
2017-04-01 17:12 3042Executor接口的定义:http://donald-dra ... -
ThreadPoolExecutor解析一(核心线程池数量、线程池状态等)
2017-03-31 22:01 20523Executor接口的定义:http://donald-dra ... -
ScheduledExecutorService接口定义
2017-03-29 12:53 1511Executor接口的定义:http://donald-dra ... -
AbstractExecutorService解析
2017-03-29 08:27 1081Executor接口的定义:http: ... -
ExecutorCompletionService解析
2017-03-28 14:27 1596Executor接口的定义:http://donald-dra ... -
CompletionService接口定义
2017-03-28 12:39 1069Executor接口的定义:http://donald-dra ... -
FutureTask解析
2017-03-27 12:59 1330package java.util.concurrent; ... -
Future接口定义
2017-03-26 09:40 1199/* * Written by Doug Lea with ... -
ExecutorService接口定义
2017-03-25 22:14 1164Executor接口的定义:http://donald-dra ... -
Executor接口的定义
2017-03-24 23:24 1678package java.util.concurrent; ... -
简单测试线程池拒绝执行任务策略
2017-03-24 22:37 2031线程池多余任务的拒绝执行策略有四中,分别是直接丢弃任务Disc ... -
JAVA集合类简单综述
2017-03-23 22:51 926Queue接口定义:http://donald-draper. ... -
DelayQueue解析
2017-03-23 11:00 1738Queue接口定义:http://donald-draper. ... -
SynchronousQueue解析下-TransferQueue
2017-03-22 22:20 2141Queue接口定义:http://donald-draper. ...
相关推荐
Java 同步器 SynchronousQueue 详解及实例 Java 中的同步器 SynchronousQueue 是一种特殊的阻塞队列,它最多只能放一个元素,这个元素如果不在特定的时间消费掉就会被删除,队列的长度始终为 0。SynchronousQueue ...
SynchronousQueue是Java中的一个阻塞队列实现,它在Java并发编程中扮演着重要的角色。SynchronousQueue特别之处在于它没有进行数据存储的能力,也就是说它的容量为0。其核心功能是用于线程之间的直接传递数据,实现...
与Exchanger相比,SynchronousQueue同样允许两个线程间交换数据,但Exchanger是在两个线程交换数据后才继续执行,而SynchronousQueue则是实时交换,一旦一个线程完成其操作,另一个线程立即接手。 SynchronousQueue...
SynchronousQueue核心属性和方法源码的分析的代码
- 主要涉及到`ThreadPoolExecutor`实例的创建,参数包括核心线程数、最大线程数、线程存活时间、时间单位以及工作队列的选择(如ArrayBlockingQueue或SynchronousQueue)。 - 任务提交:`execute()`方法用于提交...
- **runnableTaskQueue**:用于存放待处理任务的阻塞队列,如ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue等。 - **ThreadFactory**:创建线程的工厂,可以定制线程属性。 - **...
- **workQueue**: 用于存放等待执行的任务的阻塞队列,常见的有ArrayBlockingQueue、LinkedBlockingQueue和SynchronousQueue等。 - **threadFactory**: 创建新线程的工厂,用于定制线程的创建行为。 - **handler*...
- SynchronousQueue:不存储元素的阻塞队列,提交任务必须等待另一个线程取走。 - PriorityBlockingQueue:具有优先级的无界阻塞队列。 - DelayQueue:用于延迟执行元素的无界阻塞队列。 - LinkedTransferQueue:...
【阿里技术面试题解析】 1. **SynchronousQueue Quiz** SynchronousQueue 是一个特殊的 BlockingQueue,它不存储任何元素,所有的插入操作必须等到其他线程进行相应的删除操作才能完成。在给定的代码中,试图向空...
### Java多线程面试知识点详解 #### 一、线程与进程的区别 ...- **SynchronousQueue**: - 不存储元素的阻塞队列。 以上内容覆盖了Java多线程面试中的常见问题及知识点,希望能帮助你更好地准备面试。
- **SynchronousQueue实现原理**:一个不存储元素的阻塞队列,适用于传递任务。 - **自定义类的应用场景**:当现有类无法满足需求时,可以自定义类来实现特定功能。 - **双亲委派模型**:类加载机制的一部分,确保类...
线程池可以预先配置最大线程数、空闲线程存活时间等参数,从而优化系统资源的使用,避免过多线程导致的上下文切换开销。 `ThreadPoolExecutor`的构造函数接收以下参数: 1. `corePoolSize`: 核心线程数,即使无任务...
### Java核心知识点解析 #### 1. `hashCode`与`equals`的关系 - **问题**:`hashCode`相等的两个对象一定相等吗?`equals`呢?反过来相等吗? - **解答**: - `hashCode`相等并不意味着两个对象一定相等。这是...
- `workQueue`: 用于保存待执行任务的阻塞队列,有多种类型可供选择,如ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue和PriorityBlockingQueue。 - `threadFactory`: 创建线程的工厂,可定制线程属性...
- **选择合适的工作队列**:无界队列(如`LinkedBlockingQueue`)、有界队列(如`ArrayBlockingQueue`)、直接提交给线程(如`SynchronousQueue`)。 - **监控和调整线程池**:使用`ThreadPoolExecutor`的`...
以上是针对阿里技术面试题中的部分知识点解析,涵盖了并发编程、网络编程、软件质量保障、算法优化、系统设计等多个领域,这些都是互联网行业中尤其是阿里集团在招聘技术人才时关注的关键技能。
- **SynchronousQueue**: 特殊的`BlockingQueue`,要求对它的操作必须是交替进行的,即一次放一个元素,然后才能取一个元素。 #### 4. ArrayBlockingQueue与LinkedBlockingQueue的比较 - **数据结构不同**:`...
- **SynchronousQueue**:阻塞队列,不存储任务,新任务会直接交给线程处理。如果所有线程都在工作,会尝试创建新线程,通常将 maximumPoolSize 设置为 Integer.MAX_VALUE。 - **DelayQueue**:任务需要延迟一定...