`
zhaomingzm_23
  • 浏览: 33546 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

Java并发包探秘 (一) ConcurrentLinkedQueue

阅读更多
本文是Java并发包探秘的第一篇,旨在介绍一下Java并发容器中用的一些思路和技巧,帮助大家更好的理解Java并发容器,让我们更好的使用并发容器打造更高效的程序。本人能力有限,错误难免。希望及时指出。

Java并发包中有很多精心设计的高并发容器。有ConcurrentHashMapConcurrentSkipListMapConcurrentLinkedQueue等。ConcurrentLinkedQueue就是其中设计最为优雅的高并发容器。它被设计成了无锁的、无界的、非阻塞式的单向链表结构。现在就让我们来一步一步揭开他们神秘的面纱。

G+

正文开始:

一说到链表结构,我们首先就会想到的就是组成链表结构的原件,那就是节点。或者有的人称之为元素。ConcurrentLinkedQueue(为了叙述方便后面用CLQ指代)中称之为Node.

我们先来看看CLQ中Node的结构:

private static class Node<E> {
        private volatile E item;
        private volatile Node<E> next;

        private static final
            AtomicReferenceFieldUpdater<Node, Node>
            nextUpdater =
            AtomicReferenceFieldUpdater.newUpdater
            (Node.class, Node.class, "next");
        private static final
            AtomicReferenceFieldUpdater<Node, Object>
            itemUpdater =
            AtomicReferenceFieldUpdater.newUpdater
            (Node.class, Object.class, "item");

        Node(E x) { item = x; }

        Node(E x, Node<E> n) { item = x; next = n; }

        E getItem() {
            return item;
        }

        boolean casItem(E cmp, E val) {
            return itemUpdater.compareAndSet(this, cmp, val);
        }

        void setItem(E val) {
            itemUpdater.set(this, val);
        }

        Node<E> getNext() {
            return next;
        }

        boolean casNext(Node<E> cmp, Node<E> val) {
            return nextUpdater.compareAndSet(this, cmp, val);
        }

        void setNext(Node<E> val) {
            nextUpdater.set(this, val);
        }

    }


1.CLQ中的Node定义成了私有的静态类说明该节点描述只适用在CLQ中。
2.其中用到了AtomicReferenceFieldUpdater原子属性引用原子更新器。该类是抽象的,但该类的内部已经给出了包访问控制级别的一个实现AtomicReferenceFieldUpdaterImpl,原理是利用反射将一个被声明成 volatile 的属性通过Java native interface (JNI)调用,使用cpu指令级的命令将一个变量进行更新,该操作是原子的。atomic 包中还有很多类似的更新器分别针对不同的类型进行原子级别的比较更新原子操作。这里用到了sun.misc.Unsafe 的 compareAndSwap操作(简称CAS)它有三种不同的本地命令分别针对Int、Long、Object进行原子更新操作。
3.我们可以看出CLQ中的Node结构是一个单向的链表结构,因为每个Node只有一个向后的next和一个item用来装内容。CLQ将通过casNext和casItem方法来原子更新Node链的结构。setNext 和setItem则是直接放入

我们再来看CLQ的链结构

private static final
        AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, Node>
        tailUpdater =
        AtomicReferenceFieldUpdater.newUpdater
        (ConcurrentLinkedQueue.class, Node.class, "tail");
    private static final
        AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, Node>
        headUpdater =
        AtomicReferenceFieldUpdater.newUpdater
        (ConcurrentLinkedQueue.class,  Node.class, "head");

    private boolean casTail(Node<E> cmp, Node<E> val) {
        return tailUpdater.compareAndSet(this, cmp, val);
    }

    private boolean casHead(Node<E> cmp, Node<E> val) {
        return headUpdater.compareAndSet(this, cmp, val);
    }


    /**
     * Pointer to header node, initialized to a dummy node.  The first
     * actual node is at head.getNext().
     */
    private transient volatile Node<E> head = new Node<E>(null, null);

    /** Pointer to last node on list **/
    private transient volatile Node<E> tail = head;


1.实际上经过对Node的分析。CLQ中的头尾指针的更新原理其实也是一样的。都是通过cpu原子操作命令进行的更新。

2.这样我们就有了在高并发下原子更新的基础支持,但是除了原子更新的支持是不够的。原因很简单,这是因为当多个线程同时使用原子更新操作来更新一个链表结构的时候只有一个成功其它的都会失败。失败的操作如何再让它成功才是问题的关键。CLQ优雅的解决了这一问题。

我们再来看看CLQ的放入元素操作:
public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        Node<E> n = new Node<E>(e, null);
        for (;;) {                                   //1
            Node<E> t = tail;                        //2
            Node<E> s = t.getNext();                 //3
            if (t == tail) {                         //4
                if (s == null) {                     //5
                    if (t.casNext(s, n)) {           //6
                        casTail(t, n);               //7
                        return true;                 //8
                    }
                } else {
                    casTail(t, s);                   //9
                }
            }
        }
    }


在有锁得情况下我们只要让获得锁得线程更新,其它线程等待即可解决并发更新的问题,但是在上述的单向链表结构中有更好的无锁解决方法。

1.代码1 啊! 死循环,对,就是利用反复轮询的重复一段逻辑操作。
2.代码2 代码3 先用两个临时变量指向CLQ的尾和尾的下一个节点。这样有什么好处?直接用tail和tail.getNext不行吗?我们说了。这是一个无锁得方法。可能有多个线程同时执行到代码3处,因为临时变量是每线程的而tail是公共的。这样成功执行到代码3的线程都有自己当时的临时CLQ队列结构引用。为后面的判断做好准备。
3.开始判断 代码4 证明 在代码2代码4之间没有被其它线程修改过,因为有可能已经被修改了。那么这时进入新的轮询。
4.代码5 在看代码5之前我们先要明确一个概念就是把一个Node放入一个CLQ队列有两步操作。第一步是tail的next指向新的节点。第二步是tail指向新的节点。代码5 先判断是不是有线程已经在完成加入一个节点的第一步,如果是就帮助它完成第二步,再次进入循环。如果没有线程已经完成第一步。那就自己来完成插入节点的第一步,当然就是调用casNext比较更新的原子操作。上文已经讲过。再来完成插入元素的第二步,以上逻辑由代码6、代码7完成。注意看代码8 恒为真? 为什么?自己调用casTail如果成功返回真毫无疑问。如果失败为什么也返回真?答案很简单,这是因为如果失败说明一定有其它线程进入了代码9 帮自己完成了插入一个节点的第二步操作。所以自己操作肯定是失败的。所以也返回真。

从上面的代码分析可以看出打造一个无锁得并发容器处处都要十分小心。这也是CLQ的高明之处。

我们再来看看删除一个元素的代码:

public E poll() {
        for (;;) {
            Node<E> h = head;                          //1
            Node<E> t = tail;                          //2
            Node<E> first = h.getNext();               //3
            if (h == head) {                           //4
                if (h == t) {                          //5
                    if (first == null)                 //6
                        return null;                   //7
                    else
                        casTail(t, first);             //8
                } else if (casHead(h, first)) {        //9
                    E item = first.getItem();          //10
                    if (item != null) {                //11
                        first.setItem(null);           //12
                        return item;                   //13
                    }
                    // else skip over deleted item, continue loop,
                }
            }
        }
    }

1.同样是轮询,当h!=head的时候继续循环。因为在代码1代码4之间已经有其它线程删除了头元素。从而造成h != head.
2.代码5 是否是空的CLQ。
3.如果是空的CLQ判断头得下一节点是否是null.因为只有时空的才说明没有元素。否则有可能其它线程正在插入元素造成first!=null,这时就帮助其它线程完成为指针更新操作。再继续轮询。
4.如果是非空的CLQ用casHead来原子更新头节点。因为删除一个CLQ的元素是从头开始删除的。如果失败说明有其它线程在删除元素。继续轮询。
5.代码10 如果第一个元素的内容为空说明有线程已经执行到代码12了。所以又开始轮询。
6.只有成功执行到代码13才正真是由当前线程完成了删除一个元素操作。CLQ的peek()操作和poll操作只差代码12的操作,即一个删除元素,一个不删除元素。

在CLQ中迭代器的方法也很精妙:
private E advance() {
            lastRet = nextNode;
            E x = nextItem;

            Node<E> p = (nextNode == null)? first() : nextNode.getNext();
            for (;;) {
                if (p == null) {
                    nextNode = null;
                    nextItem = null;
                    return x;
                }
                E item = p.getItem();
                if (item != null) {
                    nextNode = p;
                    nextItem = item;
                    return x;
                } else // skip over nulls
                    p = p.getNext();
            }
        }


由于CLQ单向链表的特殊性,元素的变化只可能头处删除,在尾处添加。所以使用CLQ的迭代器时元素可能比实际的要多。原因很简单,当你在迭代的时候元素可能已经删除,当然这是你迭代的线程是不可见的。而删除是可见的。


ConcurrentLinkeQueue的其它操作大同小异。都是在不断的轮询中步步判断其它线程的影响,一步一步推进自己的操作逻辑。从而最终完成操作的。
分享到:
评论

相关推荐

    JAVA高并发包介绍

    它是Java并发包中一个线程安全的有序映射表,基于跳表(Skip List)的数据结构实现。ConcurrentSkipListMap通过一种分层的多级链表来维护其内部结构,这使得它在高并发环境下不仅能够保持良好的读写性能,同时还能...

    java5 并发包 (concurrent)思维导图

    Java 5并发包(`java.util.concurrent`,简称`Concurrent`包)是Java平台中用于多线程编程的重要组成部分,它提供了丰富的并发工具类,极大地简化了在多线程环境下的编程工作。这个包的设计目标是提高并发性能,减少...

    ConcurrentLinkedQueue源码分析.rar

    首先,我们要明白`ConcurrentLinkedQueue`是Java并发包`java.util.concurrent`中的一个类,它实现了`Queue`接口,并且遵循FIFO(先进先出)原则。与传统的同步队列如`ArrayBlockingQueue`不同,`...

    Java并发包高级特性1

    对象交换器:当两个线程处理的结果需要交换进行处理的时候,使用并发包中适合多线程情况下的数据结构:非阻塞队列:ConcurrentLinkedQueue Conc

    LinkedBlockingQueue 和 ConcurrentLinkedQueue的区别.docx

    LinkedBlockingQueue和ConcurrentLinkedQueue是Java并发包中两个常用的线程安全队列,它们各有特点,适用于不同的场景。本文将深入探讨两者之间的差异以及如何根据需求选择合适的队列。 首先,LinkedBlockingQueue...

    juc详解juc详解juc详解juc详解juc详解juc详解juc详解

    Java并发编程库(Java Util Concurrency,简称JUC)是Java平台中用于高效并发处理的重要工具包,包含在`java.util.concurrent`包下。JUC提供了丰富的并发原语,如线程池、同步器、并发容器等,极大地简化了多线程...

    java 并发学习总结

    3. **同步工具类**:Java并发包`java.util.concurrent`中的工具类,如`Semaphore`(信号量)、`CyclicBarrier`(回环栅栏)、`CountDownLatch`(倒计时器)和`FutureTask`(未来任务)等,提供了更灵活的线程同步和...

    Java concurrency集合之ConcurrentLinkedQueue_动力节点Java学院整理

    ConcurrentLinkedQueue是Java并发包`java.util.concurrent`下的一个类,它实现了`Queue`接口,并且是一个完全由原子操作构建的无锁队列。队列遵循FIFO(先进先出)原则,这意味着元素会按照它们被插入的顺序被取出。...

    并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue用法

    `ConcurrentLinkedQueue` 是 Java 并发包 `java.util.concurrent` 提供的一个高性能的线程安全队列实现,基于链表结构,它适用于对吞吐量有较高要求的场景。`ConcurrentLinkedQueue` 不提供容量限制,并且在队列为空...

    自己动手让springboot异步处理浏览器发送的请求(只需要使用ConcurrentLinkedQueue即可)

    首先,`ConcurrentLinkedQueue`是Java并发包`java.util.concurrent`中的一个无界线程安全队列,基于链接节点实现,具有很好的性能。它的插入和删除操作都是O(1)的时间复杂度,非常适合用于高并发场景下的任务队列。...

    Java_算法和数据结构的集合.zip

    此外,Java并发包中的ConcurrentLinkedQueue、CopyOnWriteArrayList等类,它们的实现都基于高级的算法设计。 深入理解这些概念不仅可以提升编程技能,还有助于通过技术面试,因为许多公司会在面试过程中考察候选人...

    Java 多线程与并发(15-26)-JUC集合- ConcurrentLinkedQueue详解.pdf

    `ConcurrentLinkedQueue`是Java实用工具包(J.U.C)中的一个高性能线程安全队列,主要用于解决多线程环境下并发访问集合的问题。它基于链表实现,具有非阻塞特性和无界队列的特点。`ConcurrentLinkedQueue`遵循FIFO...

    ConcurrentLinkedQueue

    ConcurrentLinkedQueue

    Java 线程 ? ConcurrentLinkedQueue

    ConcurrentLinkedQueue  在考虑并发的时候可以先考虑单线程的情况,然后再将并发的情况考虑进来。  比如ConcurrentLinkedQueue:  1、先考虑单线的offer ... // 新建一个node final Node&lt;E&gt; newNode = new

    聊聊并发(6)ConcurrentLinkedQueue的

    Java平台为此提供了一系列的并发工具类,其中`ConcurrentLinkedQueue`是一个无界的线程安全队列,它在Java并发编程中有着广泛的应用。本文将深入探讨`ConcurrentLinkedQueue`的实现原理和设计思路。 首先,`...

    java.util.concurrent

    java.util.concurrent总体概览图。 收取资源分3分。需要的同学可以下载一下。 java.util.concurrent主要包括5个部分executor,colletions,locks,atomic...该图详细的列举了并发包下面的结构,包含所有接口和具体实现类。

    Java并发编程ppt.rar

    5. **并发集合**:Java并发包提供了线程安全的集合类,如`ConcurrentHashMap`、`CopyOnWriteArrayList`和`ConcurrentLinkedQueue`等,它们在多线程环境下性能优异。 6. **线程池**:`ExecutorService`、`...

    java并发编程实战源码,java并发编程实战pdf,Java

    《Java并发编程实战》是Java并发编程领域的一本经典著作,它深入浅出地介绍了如何在Java平台上进行高效的多线程编程。这本书的源码提供了丰富的示例,可以帮助读者更好地理解书中的理论知识并将其应用到实际项目中。...

    PDF-Java7ConcurrencyCookbook-英文版.rar

    4. **原子变量**:Java并发包中的Atomic类提供了一组原子操作,如AtomicInteger、AtomicLong等。这些类的使用可以实现无锁编程,提高并发性能。了解其背后的CAS(Compare and Swap)机制,以及如何在实践中应用,是...

    java并发(二十四)多线程结果组装

    6. **并发工具类**:Java并发包提供了许多工具类,如`CountDownLatch`、`CyclicBarrier`和`Semaphore`,它们在多线程结果组装中起到协调和同步的作用。例如,`CountDownLatch`可以用于让主线程等待所有子线程完成后...

Global site tag (gtag.js) - Google Analytics