`

读ConcurrentLinkedQueue

阅读更多
//这是一个无阻塞的队列没有加任何锁全部利用CAS机制实现。效率极高。
//先看构造函数
public ConcurrentLinkedQueue() {
        head = tail = new Node<E>(null);
    }

public ConcurrentLinkedQueue(Collection<? extends E> c) {
        Node<E> h = null, t = null;
        for (E e : c) {
	    //元素不可以为null
            checkNotNull(e);
            Node<E> newNode = new Node<E>(e);
	    //如果头节点为空
            if (h == null)
                h = t = newNode;
            else {
	        //设为下一个节点
                t.lazySetNext(newNode);
                t = newNode;
            }
        }
        if (h == null)
            h = t = new Node<E>(null);
        head = h;
        tail = t;
    }

//添加元素
public boolean add(E e) {
        return offer(e);
    }

 public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);

        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
	    //当前p是尾节点
            if (q == null) {
		//利用cas设置值
                if (p.casNext(null, newNode)) {
                    //这意味着一个线程已经设置了tail.next。相当于每两次更新下tail
                    if (p != t) 
		        //将newNode设置为尾节点
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
	    //在poll的时候设置了哨兵,正好遇到哨兵了。
            else if (p == q)
               
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
		//寻找尾节点。这里有一个优化假如tail已经被修改直接定位到tail作为最后一个节点。
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }


//获取并移除一个节点
public E poll() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;
                 //如果item有值并且将item设置为null成功
                if (item != null && p.casItem(item, null)) {
                    //这和offer思路一样
                    if (p != h) 
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
		//没有了
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
		//遇到哨兵节点了重新循环
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

//只获取但是不移除
public E peek() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;
                if (item != null || (q = p.next) == null) {
		    //设置p为头结点,也就是过滤p前面的空节点。
                    updateHead(h, p);
                    return item;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

//判断队列是否为空
public boolean isEmpty() {
        return first() == null;
    }

//找到第一个不为null的节点。并且设置为头节点。
Node<E> first() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                boolean hasItem = (p.item != null);
                if (hasItem || (q = p.next) == null) {
                    updateHead(h, p);
                    return hasItem ? p : null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

//获取元素个数
public int size() {
        int count = 0;
        for (Node<E> p = first(); p != null; p = succ(p))
            if (p.item != null)
                // Collection.size() spec says to max out
                if (++count == Integer.MAX_VALUE)
                    break;
        return count;
    }


final Node<E> succ(Node<E> p) {
        Node<E> next = p.next;
        return (p == next) ? head : next;
    }

//包含某个元素和size的实现差不多
public boolean contains(Object o) {
        if (o == null) return false;
        for (Node<E> p = first(); p != null; p = succ(p)) {
            E item = p.item;
            if (item != null && o.equals(item))
                return true;
        }
        return false;
    }

//移除一个元素
public boolean remove(Object o) {
        if (o == null) return false;
        Node<E> pred = null;
        for (Node<E> p = first(); p != null; p = succ(p)) {
            E item = p.item;
            if (item != null &&
                o.equals(item) &&
                p.casItem(item, null)) {
                Node<E> next = succ(p);
                if (pred != null && next != null)
                    pred.casNext(p, next);
                return true;
            }
            pred = p;
        }
        return false;
    }

//新增全部
 public boolean addAll(Collection<? extends E> c) {
        if (c == this)
            throw new IllegalArgumentException();

        // Copy c into a private chain of Nodes
	//将c变为链表结构
        Node<E> beginningOfTheEnd = null, last = null;
        for (E e : c) {
            checkNotNull(e);
            Node<E> newNode = new Node<E>(e);
            if (beginningOfTheEnd == null)
                beginningOfTheEnd = last = newNode;
            else {
                last.lazySetNext(newNode);
                last = newNode;
            }
        }
        if (beginningOfTheEnd == null)
            return false;

        // Atomically append the chain at the tail of this collection
        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
	    //如果当前p正好是队尾
            if (q == null) {
                // p is last node
		//CAS设置值
                if (p.casNext(null, beginningOfTheEnd)) {
                    //可能队尾已经被其他线程设置过了
                    if (!casTail(t, last)) {
                        //再次尝试
                        t = tail;
                        if (last.next == null)
                            casTail(t, last);
                    }
                    return true;
                }
                
            }
            else if (p == q)
              
                p = (t != (t = tail)) ? t : head;
            else
               
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

  public Object[] toArray() {
        
        ArrayList<E> al = new ArrayList<E>();
        for (Node<E> p = first(); p != null; p = succ(p)) {
            E item = p.item;
            if (item != null)
                al.add(item);
        }
        return al.toArray();
    }

//这个实现和其他集合的实现不太一样。我个人觉得是因为计算size有开销。所以这样写提升效率。
 public <T> T[] toArray(T[] a) {
       
        int k = 0;
        Node<E> p;
        for (p = first(); p != null && k < a.length; p = succ(p)) {
            E item = p.item;
            if (item != null)
                a[k++] = (T)item;
        }
	//如果a的length>队列的length
        if (p == null) {
            if (k < a.length)
                a[k] = null;
            return a;
        }

        //如果a的length<队列的length
        ArrayList<E> al = new ArrayList<E>();
        for (Node<E> q = first(); q != null; q = succ(q)) {
            E item = q.item;
            if (item != null)
                al.add(item);
        }
        return al.toArray(a);
    }

//迭代器
private class Itr implements Iterator<E> {
       
        private Node<E> nextNode;

        private E nextItem;

        private Node<E> lastRet;

        Itr() {
            advance();
        }

       
        private E advance() {
            lastRet = nextNode;
            E x = nextItem;

            Node<E> pred, p;
            if (nextNode == null) {
                p = first();
                pred = null;
            } else {
                pred = nextNode;
                p = succ(nextNode);
            }

            for (;;) {
                if (p == null) {
                    nextNode = null;
                    nextItem = null;
                    return x;
                }
                E item = p.item;
                if (item != null) {
                    nextNode = p;
                    nextItem = item;
                    return x;
                } else {
                    // skip over nulls
                    Node<E> next = succ(p);
                    if (pred != null && next != null)
                        pred.casNext(p, next);
                    p = next;
                }
            }
        }

        public boolean hasNext() {
            return nextNode != null;
        }

        public E next() {
            if (nextNode == null) throw new NoSuchElementException();
            return advance();
        }

        public void remove() {
            Node<E> l = lastRet;
            if (l == null) throw new IllegalStateException();
            
            l.item = null;
            lastRet = null;
        }
    }
分享到:
评论

相关推荐

    数据结构与算法分析 Java语言描述 读书笔记

    - 并发工具类:如ConcurrentHashMap、ConcurrentLinkedQueue等,为多线程环境下的数据结构操作提供了安全支持。 5. **工具应用**: - 源码分析:通过阅读Java标准库中数据结构和算法的源码,可以深入理解其实现...

    JUC最详细思维导图,一次了解读写锁,可重入锁,Cas原理,volatile 关键字原理

    `ConcurrentLinkedQueue`是一个无界线程安全队列,适合多生产者多消费者场景。`CopyOnWriteArrayList`是线程安全的动态数组,适用于读多写少的情况,通过复制底层数组实现并发访问,保证了读操作的高性能。 阻塞...

    并发容器和线程池,java并发编程3

    - **读操作不加锁**:读操作直接从原始数组中读取数据。 - **写操作创建副本**:写操作会复制原始数组,并在副本上进行修改,最后将引用指向新的副本。 - **适合读多写少的场景**:因为写操作涉及到创建副本,所以在...

    Java从同步容器到并发容器的操作过程

    `ConcurrentLinkedQueue`是一个无界的线程安全队列,基于链接节点实现,吞吐量通常高于`ArrayBlockingQueue`。`LinkedBlockingDeque`是双端队列,也支持线程安全的插入、删除和查看操作。 这些并发容器的设计目标是...

    数据结构面试题及正确答案

    - **并发Queue**:`ConcurrentLinkedQueue`是高性能的无锁队列,`BlockingQueue`接口提供了阻塞队列,如`LinkedBlockingQueue`,适用于生产者-消费者模型。 3. **并发锁与同步机制**: - **重入锁 ReentrantLock*...

    多线程并发集合资料.zip

    - `ConcurrentLinkedQueue`:无界并发队列,基于链接节点实现,内部使用CAS(Compare And Swap)操作保证线程安全。 - `BlockingQueue`:阻塞队列,如`ArrayBlockingQueue`和`LinkedBlockingQueue`,在队列满或空...

    多线程编程.docx

    - **ReadLock与WriteLock**:`ReentrantReadWriteLock`接口提供了读写锁,允许多个读线程同时访问共享资源,但在写操作时则不允许其他读或写线程访问。 - **StickyLock与Condition**:`StickyLock`是Java并发库中的...

    java模拟数据库事务

    在模拟数据库事务时,可能会用到这些机制来管理事务间的隔离级别,如读未提交(Read Uncommitted)、读已提交(Read Committed)、可重复读(Repeatable Read)和串行化(Serializable)。 4) **Java设计模式**:在...

    Java容器.xmind

    写有锁,读无锁,读写之间不阻塞,优于读写锁 写入时先copy一个容器副本,再添加新元素,最后替换引用 copy的容器副本过大时,速度慢,不易使用 CopyOnWriteArraySet 底层使用CopyOnWriteArrayList实现 使用...

    14个Java并发容器,你用过几个?.docx

    3. **CopyOnWriteArraySet**: 基于CopyOnWriteArrayList构建的并发Set,每次添加元素都需要遍历整个集合以检查元素是否已存在,因此适合小规模且读多写少的场景。 4. **ConcurrentLinkedQueue**: 这是一个基于链表...

    java_Thread.rar_java 多线程_java多线程

    在实际开发中,还需要考虑线程安全问题,例如避免脏读、幻读和不可重复读等并发问题。Java提供了一系列的并发工具类,如Atomic系列、ConcurrentHashMap、ConcurrentLinkedQueue等,帮助开发者编写高效、安全的多线程...

    java编发编程:JUC综合讲解

    2. **并发集合类(Concurrent Collections)**:JUC库提供了一系列线程安全的集合类,例如ConcurrentHashMap、ConcurrentLinkedQueue等,这些类在多线程环境下可以保证数据的一致性和完整性,避免竞态条件。...

    并发容器的原理,7大并发容器详解、及使用场景

    4. ConcurrentLinkedQueue 是一个基于链接节点的无界并发队列,它使用了链表结构实现 FIFO(先进先出),并且不使用锁,而是依赖于 CAS 操作。 5. LinkedBlockingQueue、ArrayBlockingQueue 和 ...

    Java并发程序设计+并发

    - **CopyOnWriteArrayList/CopyOnWriteArraySet**:写时复制策略,读操作非常高效,适用于读多写少的情况。 6. **原子变量**: - **AtomicInteger、AtomicLong等**:提供原子性的操作,如自增、自减等,避免`...

    优秀博文汇总1

    `ConcurrentLinkedQueue` 是一个基于链接节点的无界并发队列,它使用了 CAS 操作来保证线程安全。`FutureTask` 是一个可以代表异步计算结果的任务,它实现了`RunnableFuture`接口,可以被提交到线程池执行。 ...

    Java语言中非阻塞算法的实现.zip

    Java并发库还提供了一些线程安全的数据结构,如`ConcurrentLinkedQueue`、`LinkedBlockingQueue`等,它们在内部使用了非阻塞算法,可以在高并发场景下高效地工作。 理解并熟练掌握非阻塞算法在Java中的实现,对于...

    集合类资料

    `CopyOnWriteArrayList`在写操作时会创建底层数组的副本,确保读操作不受写操作的影响;`ConcurrentLinkedQueue`是一个无界的并发队列,基于链接节点实现,提供了高并发性能。 9. **集合转换与集合工具类**:`...

    数据结构面试专题.docx

    - **并发Queue**:`ConcurrentLinkedQueue`使用无锁算法实现,适用于高并发场景。`BlockingQueue`则用于生产者-消费者模式,当队列满时会阻塞生产者。 - **并发Deque**:`LinkedBlockingDeque`在读写时没有进行读写...

    Java并发编程:设计原则与模式(第二版)-3PDF

    - **线程安全的集合**:如`ConcurrentHashMap`、`ConcurrentLinkedQueue`等,内部实现了线程安全的算法。 - **CopyOnWriteArrayList/CopyOnWriteArraySet**:适合读多写少的场景,写操作时复制底层数据结构。 7. ...

Global site tag (gtag.js) - Google Analytics