`
headof
  • 浏览: 23540 次
  • 来自: ...
社区版块
存档分类
最新评论

ConcurrentLinkedQueue

 
阅读更多
ConcurrentLinkedQueue是Queue的一个安全实现.Queue中元素按FIFO原则进行排序.采用CAS操作,来保证元素的一致性.
数据结构为:单向链表.
变量使用volatile修改,保证内在可见性(happens-before,对变量的写操作对后续的读操作是可见的),同样也不会导致CPU指令的重排序.
    private static class Node<E> {
        volatile E item;
        volatile Node<E> next;

        /**
         * Constructs a new node.  Uses relaxed write because item can
         * only be seen after publication via casNext.
         */
        Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
        }

        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }

        void lazySetNext(Node<E> val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }

        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        // Unsafe mechanics

        private static final sun.misc.Unsafe UNSAFE;
        private static final long itemOffset;
        private static final long nextOffset;

        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class k = Node.class;
                itemOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

  /*** 
   * Sets the value of the object field at the specified offset in the 
   * supplied object to the given value, with volatile store semantics. 
   * 设置obj对象中offset偏移地址对应的object型field的值为指定值。支持volatile store语义 
   *  
   * @param obj the object containing the field to modify. 
   *    包含需要修改field的对象 
   * @param offset the offset of the object field within <code>obj</code>. 
   *     <code>obj</code>中object型field的偏移量 
   * @param value the new value of the field. 
   *       field将被设置的新值 
   * @see #putObject(Object,long,Object) 
   */  
  public native void putObjectVolatile(Object obj, long offset, Object value); 

  /*** 
   * Compares the value of the object field at the specified offset 
   * in the supplied object with the given expected value, and updates 
   * it if they match.  The operation of this method should be atomic, 
   * thus providing an uninterruptible way of updating an object field. 
   * 在obj的offset位置比较object field和期望的值,如果相同则更新。这个方法 
   * 的操作应该是原子的,因此提供了一种不可中断的方式更新object field。 
   *  
   * @param obj the object containing the field to modify. 
   *    包含要修改field的对象  
   * @param offset the offset of the object field within <code>obj</code>. 
   *         <code>obj</code>中object型field的偏移量 
   * @param expect the expected value of the field. 
   *               希望field中存在的值 
   * @param update the new value of the field if it equals <code>expect</code>. 
   *               如果期望值expect与field的当前值相同,设置filed的值为这个新值 
   * @return true if the field was changed. 
   *              如果field的值被更改 
   */  
  public native boolean compareAndSwapObject(Object obj, long offset,  
                                             Object expect, Object update);  

  /*** 
   * Sets the value of the object field at the specified offset in the 
   * supplied object to the given value.  This is an ordered or lazy 
   * version of <code>putObjectVolatile(Object,long,Object)</code>, which 
   * doesn't guarantee the immediate visibility of the change to other 
   * threads.  It is only really useful where the object field is 
   * <code>volatile</code>, and is thus expected to change unexpectedly. 
   * 设置obj对象中offset偏移地址对应的object型field的值为指定值。这是一个有序或者 
   * 有延迟的<code>putObjectVolatile</cdoe>方法,并且不保证值的改变被其他线程立 
   * 即看到。只有在field被<code>volatile</code>修饰并且期望被意外修改的时候 
   * 使用才有用。 
   * 
   * @param obj the object containing the field to modify. 
   *    包含需要修改field的对象 
   * @param offset the offset of the object field within <code>obj</code>. 
   *       <code>obj</code>中long型field的偏移量 
   * @param value the new value of the field. 
   *      field将被设置的新值 
   */  
  public native void putOrderedObject(Object obj, long offset, Object value); 


构造head和tail都为一个空的Node
    private transient volatile Node<E> head;
    private transient volatile Node<E> tail;
    public ConcurrentLinkedQueue() {
        head = tail = new Node<E>(null);
    }

因为只继承AbstractQueue,Queue所以只有add(e),remove,offer(e),poll.主要的方法.

    public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);

        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p is last node
                if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

放入到Queue整个过程.
1.获取尾结点t,设置p=t,q为t的下一个结点,判断q是否为空.如果为空,说明为最后的结点,使用CAS更新p的next node,
如果成功,返回true;如果失败说明next node已经被更新,那么执行循环从1开始执行;如果成功执行2.
2.判断p和q是否为同一对象,如果不是使用CAS更新tail(并发时可能出现),更新失败说明已经有tail了.不管成功失败都返回true.
3.如果q不为null 并且p == q,如果tail没有改变,那么jump to head,重新检查,继续执行1
4.如果q不为null 并且p <> q,也就是说此时尾节点后面还有元素,那么就需要把尾节点往后移,继续执行1.

    public E poll() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;

                if (item != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

从Queue取item整个过程。
1.p = head,如果p.item不过空,那么通过CAS将p.item更新为null,如果更新成功返回item。
2.如果p.item为空,判断p.next是否为空,如果为空更新head为p。
3.如果p.next不为空,那么p = q(p.next).重新执行1。
4.如果p.next不为空,当p==q说明1中更新没有成功,重新执行1。

    public int size() {
        int count = 0;
        for (Node<E> p = first(); p != null; p = succ(p))
            if (p.item != null)
                // Collection.size() spec says to max out
                if (++count == Integer.MAX_VALUE)
                    break;
        return count;
    }

获取队列大小的过程,由于没有一个计数器来对队列大小计数,所以获取队列的大小只能通过从头到尾完整的遍历队列,代价是很大的。所以通常情况下ConcurrentLinkedQueue需要和一个AtomicInteger搭配来获取队列大小。

需要说明的是,对尾节点的tail的操作需要换成临时变量t和s,一方面是为了去掉volatile变量的可变性,另一方面是为了减少volatile的性能影响。
分享到:
评论

相关推荐

    ConcurrentLinkedQueue源码分析.rar

    《并发编程:深入剖析ConcurrentLinkedQueue》 在Java并发编程领域,`ConcurrentLinkedQueue`是一个非常重要的数据结构,它是线程安全的无界队列,基于链接节点实现,性能高效。本篇文章将深入探讨`...

    LinkedBlockingQueue 和 ConcurrentLinkedQueue的区别.docx

    《LinkedBlockingQueue与ConcurrentLinkedQueue的比较与应用》 在Java并发编程中,队列是一种重要的数据结构,尤其在多线程环境下的任务调度和数据传递中扮演着关键角色。LinkedBlockingQueue和...

    并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue用法

    ### 并发队列 ConcurrentLinkedQueue 和阻塞队列 LinkedBlockingQueue 用法详解 #### 一、并发队列 ConcurrentLinkedQueue 概述 `ConcurrentLinkedQueue` 是 Java 并发包 `java.util.concurrent` 提供的一个高性能...

    自己动手让springboot异步处理浏览器发送的请求(只需要使用ConcurrentLinkedQueue即可)

    本文将详细介绍如何使用Java中的`ConcurrentLinkedQueue`数据结构,结合Spring Boot的特性,实现异步处理浏览器发送的请求。 首先,`ConcurrentLinkedQueue`是Java并发包`java.util.concurrent`中的一个无界线程...

    聊聊并发(6)ConcurrentLinkedQueue的

    【标题】:“聊聊并发(6)ConcurrentLinkedQueue的实现原理分析” 【正文】: 并发编程是现代软件开发中的重要组成部分,特别是在多核处理器和分布式系统中,有效地处理并发能够显著提升系统的性能和响应能力。...

    Java 多线程与并发(15-26)-JUC集合- ConcurrentLinkedQueue详解.pdf

    ### Java多线程与并发(15-26)-JUC集合-ConcurrentLinkedQueue详解 #### 一、ConcurrentLinkedQueue概述 `ConcurrentLinkedQueue`是Java实用工具包(J.U.C)中的一个高性能线程安全队列,主要用于解决多线程环境下...

    Java concurrency集合之ConcurrentLinkedQueue_动力节点Java学院整理

    【Java并发集合与ConcurrentLinkedQueue详解】 在Java并发编程中,集合类的线程安全性是至关重要的。本文将深入探讨Java并发集合中的ConcurrentLinkedQueue,这是一个无界线程安全队列,特别适合处理高并发场景。 ...

    Java 线程 ? ConcurrentLinkedQueue

    ConcurrentLinkedQueue  在考虑并发的时候可以先考虑单线程的情况,然后再将并发的情况考虑进来。  比如ConcurrentLinkedQueue:  1、先考虑单线的offer  2、再考虑多线程时候的offer:  · 多个线程offer...

    线程安全的jdbc连接池

    在这个简单的实现中,我们利用了`ConcurrentLinkedQueue`数据结构来确保多线程环境下的安全性和效率。 首先,让我们了解什么是JDBC连接池。JDBC连接池(Java Database Connectivity Connection Pool)是数据库资源...

    Android代码-MutilDialogManger

    随着项目的不断迭代,加上产品经理大法(这里加一个弹窗提示,...为了防止多个线程同时操作DialogManager中的queue对象,所以我们采用线程安全的ConcurrentLinkedQueue,这里简单的介绍下ConcurrentLinkedQueue实现和数

    Android eventbus

    private final ConcurrentLinkedQueue&lt;Object&gt; eventQueue = new ConcurrentLinkedQueue(); private ConcurrentHashMap, ConcurrentLinkedQueue&lt;Subscriber&gt;&gt; subscribersMap = new ConcurrentHashMap(); public...

    实战Concurrent-BlockQueue

    本文将深入探讨`ConcurrentLinkedQueue`、`ArrayBlockingQueue`以及`LinkedBlockingQueue`这三种实现,并分析它们的设计原理与应用场景。 首先,我们来看`ConcurrentLinkedQueue`。它是基于非阻塞算法(CAS,...

    使用-Java-构造高可扩展应用

    使用 Java 构造高可扩展应用需要遵循一些简单规则和工具,例如使用 LockFreeQueue 替换标准库中的 ConcurrentLinkedQueue,检测锁的使用和冲突,等等。本文提供了一些实用的经验和建议,帮助开发人员提高 Java 多...

    concurrent-1.3.4.jar

    并发控制:concurrent包提供了一些线程安全的集合类,如ConcurrentHashMap、ConcurrentLinkedQueue等,可以在多线程环境下安全地对集合进行操作,而无需手动添加同步机制。 原子操作:concurrent包提供了一些原子...

    Java 常见并发容器总结

    - **`ConcurrentLinkedQueue`** : 高效的并发队列,使用链表实现。可以看做一个线程安全的 `LinkedList`,这是一个非阻塞队列。 - **`BlockingQueue`** : 这是一个接口,JDK 内部通过链表、数组等方式实现了这个接口...

    并发容器和线程池,java并发编程3

    `ConcurrentLinkedQueue`是一个基于链接节点的无界线程安全队列,它遵循先进先出(FIFO)原则。队列的插入操作发生在队列的尾部,而删除操作发生在队列的头部。 **特点**: - **无界队列**:理论上可以无限存储元素,...

    无锁队列

    在Java中,一个著名的无锁队列实现是`ConcurrentLinkedQueue`,它是Java并发包`java.util.concurrent`的一部分。这个队列基于循环队列的设计,利用了链表节点的CAS操作来实现元素的插入和移除。在`...

    Java数据结构实现之Queue.zip

    4. **ConcurrentLinkedQueue**:`java.util.concurrent.ConcurrentLinkedQueue` 是一个线程安全的无界队列,基于链接节点的非阻塞算法实现。它在多线程环境下表现出色,插入和删除操作的时间复杂度均为O(1)。 5. **...

    Java并发编程相关源码集 包括多任务线程,线程池等.rar

    Java并发编程常见知识点源码集锦,涉及到对象锁,Executors多任务线程框架,线程池等... ConcurrentLinkedQueue、DelayQueue示例、自定义的线程拒绝策略、自定义线程池(使用有界队列)、自定义线程池(使用无界队列)。。。

Global site tag (gtag.js) - Google Analytics