`
rxin2009
  • 浏览: 17418 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

认识ArrayBlockingQueue、LinkedBlockingQueue、ConcurrentLinkedQueue

 
阅读更多

本文的主要内容是对jdk并发包中ArrayBlockingQueue、LinkedBlockingQueue、ConcurrentLinkedQueue类的代码解析,主要对比他们的不同。

 

前面两个类都是生产者消费者模型的实现,性能都不错,这毕竟都是大师的杰作。先来认识下ArrayBlockingQueue,这个类的内部数据结构是一个Object数组,通过ReentrantLock控制同步操作,由这个锁派生出两个条件对象,如下代码

public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = (E[]) new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

 

这是非常经典的条件操作的应用。下面看下put方法的实现

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == items.length)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            insert(e);
        } finally {
            lock.unlock();
        }
    }


 private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
    }

它的思路是:如果队列满,等待,否则入队操作,并发出队列不空的信号。

      以上的思路似乎很合理,但是仔细想想还可以改进:只要在队列为空的时候,入队成功才需发信号说队列不空了;还有一点就是,入队成功后,如果队列依然不满,可以唤醒正在等待的入队操作。这两点都是可以考虑的。

 

以上提到的在LinkedBlockingQueue中得到了实现,它的内部数据结构是一个单向链表,通过两个ReentrantLock来控制同步操作,下面是put方法的实现

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset
        // local var holding count  negative to indicate failure unless set.
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from
             * capacity. Similarly for all other uses of count in
             * other wait guards.
             */
            try {
                while (count.get() == capacity)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            insert(e);
            c = count.getAndIncrement();
            if (c + 1 < capacity) // 标记1
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)  // 标记2
            signalNotEmpty();
    }

 private void insert(E x) {
        last = last.next = new Node<E>(x);
    }

上述代码中的两个标记处就实现了前面提到的两个不足。

take()方法的对比类似。

 

下面再来说说ConcurrentLinkedQueue,这个类和上面两个有本质的不同,它的内部采用的是非阻塞算法实现,性能比前两个都要好(书上说的,用机会测试下),这个类中采用的原子类是域更新器,看下代码更清楚

 private static final
        AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, Node>
        tailUpdater =
        AtomicReferenceFieldUpdater.newUpdater
        (ConcurrentLinkedQueue.class, Node.class, "tail");
    private static final
        AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, Node>
        headUpdater =
        AtomicReferenceFieldUpdater.newUpdater
        (ConcurrentLinkedQueue.class,  Node.class, "head");

Node节点中还有两个

 private static class Node<E> {
        private volatile E item;
        private volatile Node<E> next;

        private static final
            AtomicReferenceFieldUpdater<Node, Node>
            nextUpdater =
            AtomicReferenceFieldUpdater.newUpdater
            (Node.class, Node.class, "next");
        private static final
            AtomicReferenceFieldUpdater<Node, Object>
            itemUpdater =
            AtomicReferenceFieldUpdater.newUpdater
            (Node.class, Object.class, "item");

        Node(E x) { item = x; }

        Node(E x, Node<E> n) { item = x; next = n; }

分别是用来更新队列的头结点、尾节点以及每个节点的item和next。

下面看下它的代码

public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        Node<E> n = new Node<E>(e, null);
        for (;;) {
            Node<E> t = tail;
            Node<E> s = t.getNext();
            if (t == tail) {
                if (s == null) {
                    if (t.casNext(s, n)) {
                        casTail(t, n);
                        return true;
                    }
                } else {
                    casTail(t, s);// 标记1
                }
            }
        }
    }

    public E poll() {
        for (;;) {
            Node<E> h = head;
            Node<E> t = tail;
            Node<E> first = h.getNext();
            if (h == head) {
                if (h == t) {
                    if (first == null)
                        return null;
                    else
                        casTail(t, first);// 标记2
                } else if (casHead(h, first)) {
                    E item = first.getItem();
                    if (item != null) {
                        first.setItem(null);
                        return item;
                    }
                    // else skip over deleted item, continue loop,
                }
            }
        }
    }

代码中的关键问题就是操作的原子性:单个原子操作当然是原子性的,但两个原子操作合起来就不是原子的了。比如在入队操作中要加入节点n,在更新tail节点的next成功后,tail节点指向n不一定会成功;这就需要在其他的操作中弥补这一“过错”,这在offer和poll中都有考虑的,如上述代码中的标记1和标记2。

分享到:
评论

相关推荐

    Java容器.xmind

    container Collection 标记: 顶级接口 List 标记: interface ArrayList 标记: class ...ArrayBlockingQueue ...LinkedBlockingQueue ...LinkedBlockingQueue 链表结构实现,无界队列(默认上限Integer.MAX_VALUE)

    实战Concurrent-BlockQueue

    本文将深入探讨`ConcurrentLinkedQueue`、`ArrayBlockingQueue`以及`LinkedBlockingQueue`这三种实现,并分析它们的设计原理与应用场景。 首先,我们来看`ConcurrentLinkedQueue`。它是基于非阻塞算法(CAS,...

    【2018最新最详细】并发多线程教程

    20.并发容器之ArrayBlockingQueue和LinkedBlockingQueue实现原理详解 21.线程池ThreadPoolExecutor实现原理 22.线程池之ScheduledThreadPoolExecutor 23.FutureTask基本操作总结 24.Java中atomic包中的原子操作类...

    并发容器的原理,7大并发容器详解、及使用场景

    它们分别基于链表、数组和优先级堆实现,其中 ArrayBlockingQueue 和 LinkedBlockingQueue 使用 ReentrantLock 和 Condition 实现线程安全,PriorityBlockingQueue 则是一个按优先级排序的队列。 在选择并发容器时...

    java多线程与并发1

    1. 并发集合改进:如ArrayBlockingQueue、LinkedBlockingQueue等,提供了并发访问和线程安全的性能提升。 2. ForkJoinPool:Java 7引入的新特性,用于实现分治策略,提高并行计算能力。 七、实战应用 1. 高并发场景...

    Java并发编程进阶练习代码

    Java 5引入了`java.util.concurrent`包,其中`BlockingQueue`接口和其实现如`ArrayBlockingQueue`、`LinkedBlockingQueue`等提供了更高级的线程间通信方式,它们可以用于生产者-消费者模式或者工作窃取模式。...

    Java数据结构实现之Queue.zip

    例如,如果需要高效的并发操作,可以选择`ConcurrentLinkedQueue`或`LinkedBlockingQueue`;如果对元素的顺序有特定要求,可以使用`PriorityQueue`;对于一般用途且对性能要求较高,`ArrayDeque`是不错的选择。 ...

    Java并发程序设计+并发

    - **BlockingQueue**:一种线程安全的数据结构,用于线程间的数据交换,如ArrayBlockingQueue、LinkedBlockingQueue等。 4. **并发工具类**: - **CountDownLatch**:一次性计数器,用于线程同步,等待其他线程...

    并发编程二进阶部分.rar

    Java的`java.util.concurrent`包提供了多种阻塞队列实现,如ArrayBlockingQueue、LinkedBlockingQueue和PriorityBlockingQueue等。这些示例代码可能包含如何创建、插入、删除和操作阻塞队列的实例。 2. **卖票的...

    java队列Java系列2021.pdf

    阻塞队列在Java并发包java.util.concurrent中提供了多种实现,如ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue和SynchronousQueue等,每种阻塞队列都根据其特性适用于不同的场景。...

    14个Java并发容器,你用过几个?.docx

    9. **LinkedBlockingQueue**: 类似于ArrayBlockingQueue,但它基于链表结构,无固定容量限制,但是添加和获取元素的操作仍然有阻塞效果。 10. **LinkedBlockingDeque**: 双端阻塞队列,可以像LinkedBlockingQueue...

    javabitset源码-JerrySoundCode:杰瑞声码

    java bitset源码Java源码分析 ...ArrayBlockingQueue(done) LinkedBlockingDeque (done) LinkedBlockingQueue (done) PriorityBlockingQueue (done) ConcurrentHashMap (done) ConcurrentLinkedQueue (done) C

    多线程并发集合资料.zip

    - `BlockingQueue`:阻塞队列,如`ArrayBlockingQueue`和`LinkedBlockingQueue`,在队列满或空时,线程会被阻塞,实现生产者-消费者模式。 - `CopyOnWriteArrayList` 和 `CopyOnWriteArraySet`:在读多写少的场景...

    architect-awesome:初步架构师技术图谱

    双向队列:ArrayBlockingQueue(有界),LinkedBlockingQueue(无界),DelayQueue,PriorityBlockingQueue,采用锁机制;使用ReentrantLock锁。 集合 链表,纹理 字典,关联数组 栈 Stack是线程安全的。 内部...

    java 多线程 队列工厂

    常见的`BlockingQueue`实现包括`ArrayBlockingQueue`、`LinkedBlockingQueue`和`PriorityBlockingQueue`等。 ### 5. 并发集合 Java并发库(`java.util.concurrent`包)提供了一系列优化的线程安全集合,如`...

    多线程 队列利用

    3. **并发容器**:Java的`java.util.concurrent`包提供了多种并发队列,如`ArrayBlockingQueue`、`LinkedBlockingQueue`和`ConcurrentLinkedQueue`等,它们为多线程环境提供了高效的队列操作。 4. **工作窃取算法**...

    javabitset源码-java_master:后端架构师技术图谱

    阻塞队列:ArrayBlockingQueue(有界)、LinkedBlockingQueue(无界)、DelayQueue、PriorityBlockingQueue,采用锁机制;使用 ReentrantLock 锁。 集合 链表、数组 字典、关联数组 栈 Stack 是线程安全的。 内部使用...

    高级java笔试题-architect-awesome:后端架构师技术图谱

    阻塞队列:ArrayBlockingQueue(有界)、LinkedBlockingQueue(无界)、DelayQueue、PriorityBlockingQueue,采用锁机制;使用 ReentrantLock 锁。 集合 链表、数组 字典、关联数组 栈 Stack 是线程安全的。 内部使用...

    亚信java笔试题-Book:架构师之路

    阻塞队列:ArrayBlockingQueue(有界)、LinkedBlockingQueue(无界)、DelayQueue、PriorityBlockingQueue,采用锁机制;使用 ReentrantLock 锁。 集合 链表、数组 字典、关联数组 栈 Stack 是线程安全的。 内部使用...

    java简易投票系统源码下载-study-way:学习路线

    阻塞队列:ArrayBlockingQueue(有界)、LinkedBlockingQueue(无界)、DelayQueue、PriorityBlockingQueue,采用锁机制;使用 ReentrantLock 锁。 集合 链表、数组 字典、关联数组 栈 Stack 是线程安全的。 内部使用...

Global site tag (gtag.js) - Google Analytics