`

LinkedBlockingQueue源码分析

jdk 
阅读更多
首先Node的数据结构:

static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;  //并没有使用volatile 关键词进行修饰

        Node(E x) { item = x; }
    }


ps:  突然发现学习源码使用jdk7 比较好,没有8 的函数式风格的内容。更方便

LinkedBlockingQueue中的状态信息如下:

/** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger(0); //count 被final 关键字修饰。

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    private transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();



public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();  //确保放入的元素是非空的
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();  //响应中断
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();  //具有阻塞性
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();  //使用了signal() 提高性能,  put的时候对notFull 条件队列中阻塞的线程进行唤醒。
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty(); // put 的时候, 会在c == 0 的情况下对 notEmpty 条件队列中阻塞的线程进行唤醒,当时这个需要获取takeLock锁。
    }


public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }


public boolean offer(E e) {
        if (e == null) throw new NullPointerException(); //同样对放入队列中的元素进行非空检查
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;  //这里可以看出,如果队列满的情况下,直接返回false, 并不会把数据放入到队列中。 剩余的和put() 方法类似。
        int c = -1;
        Node<E> node = new Node(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }


public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);  // 使用到了策略模式,TimeUnit的使用。
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);  // 支持超时的操作 !!!! 
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)  //如果队列为空直接返回null,
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }



public boolean contains(Object o) {
        if (o == null) return false;
        fullyLock();  // 像contains(Object o) 和 remove(Object o) 这样的操作需要同时获取putLock 和 takeLock 这两个锁。
        try {
            for (Node<E> p = head.next; p != null; p = p.next)
                if (o.equals(p.item))
                    return true;
            return false;
        } finally {
            fullyUnlock();
        }
    }


ps: 还有一些方法是在AbstractQueue中定义的,这里只是对LinkedBlockingQueue 中使用到的数据结构进行分析!
分享到:
评论

相关推荐

    LinkedBlockingQueue + 单向链表基本结构

    在分析`SingleLinked.java`源码时,我们可以看到具体如何实现这些概念。文件中的类可能定义了一个简单的单向链表节点,包括节点的数据成员(如数据item和指向下一个节点的引用next)以及可能的辅助方法,例如插入新...

    java并发源码分析之实战编程

    "java并发源码分析之实战编程"这个主题深入探讨了Java平台上的并发处理机制,旨在帮助开发者理解并有效地利用这些机制来提高程序性能和可扩展性。在这个专题中,我们将围绕Java并发库、线程管理、锁机制、并发容器...

    ArrayBlockingQueue源码分析.docx

    下面我们将深入分析其主要的实现机制、方法以及源码。 1. **数据结构与容量** `ArrayBlockingQueue` 内部使用一个数组 `items` 来存储元素,因此它的容量在创建时就需要指定,且不可改变。这个容量限制确保了队列...

    Java并发包源码分析(JDK1.8)

    囊括了java.util.concurrent包中大部分类的源码分析,其中涉及automic包,locks包(AbstractQueuedSynchronizer、ReentrantLock、ReentrantReadWriteLock、LockSupport等),queue(ArrayBlockingQueue、...

    元素唯一的LinkedBlockingQueue阻塞队列

    《元素唯一的LinkedBlockingQueue阻塞队列》 在Java并发编程中,`LinkedBlockingQueue`是一种基于...对于想要深入理解Java并发编程和数据结构的人来说,分析和实现`UniqueLinkedBlockingQueue`是一个很好的学习机会。

    28个java常用的类库源码

    源码分析有助于掌握数据传输和文件操作的底层机制。 3. **多线程**: Java提供了Thread类和Runnable接口来实现多线程。理解并发控制(如synchronized、volatile、ReentrantLock)和并发工具类(如ExecutorService...

    java队列源码

    4. **源码分析** - 通常,实现线程安全队列的源码会包含如下关键部分: - **插入操作**:确保在队尾添加元素时不会与其他线程的读写操作冲突。 - **移除操作**:在队头移除元素时,确保元素只被一个线程处理一次...

    javabitset源码-JerrySoundCode:杰瑞声码

    bitset源码Java源码分析 基础集合列表 ArrayList (done) Vector (done) LinkedList (done) Stack (done) ReferenceQueue (done) ArrayDeque (done) Set HashSet (done) TreeSet (done) LinkedHashSet (done) BitSet ...

    JavaInterview:最开源的Java技术知识点,以及Java源码分析。为开源贡献自己的一份力

    LinkedBlockingQueue LinkedBlockingDeque :scroll: 主要介绍LeetCode上面的算法译文,以及面试过程中遇到的实际编码问题总结。 :locked: :file_folder: :laptop: :globe_showing_Asia-Australia: :floppy_...

    Java高并发核心编程(卷2)源码

    源码分析可以帮助理解它们如何保证线程安全和并发性能。 5. **原子类**:`Atomic`系列类(如`AtomicInteger`、`AtomicLong`、`AtomicReference`等)提供了无锁编程的支持。源码中可能会展示如何使用原子类实现高效...

    Java多线程设计模式(带源码)

    在这个主题中,我们将深入探讨几个关键的多线程设计模式,以及如何通过源码分析来理解它们的实现。 1. **生产者消费者模式**:这种模式用于在两个线程之间共享资源,一个线程负责生成数据(生产者),另一个线程...

    java concurrent 精简源码

    这个“java concurrent 精简源码”资源很可能会包含上述概念的实际应用示例,通过学习和分析这些代码,你可以深入理解Java并发编程的精髓,并能更好地应用于实际项目中。在研究时,建议结合Java官方文档和相关的书籍...

    喜提JDK的BUG一枚!多线程的情况下请谨慎使用这个类的stream遍历。.doc

    此时,我们需要深入到JDK源码层面,研究`LinkedBlockingQueue`的`stream`实现以及`tryAdvance`方法与队列遍历的交互。在JDK 8中,`tryAdvance`可能在多线程环境下与队列的内部结构发生冲突,导致意外的阻塞。而后续...

    java多线程设计模式详解(PDF及源码)

    7. **源码分析** - PDF文档和源码结合可以帮助读者深入理解多线程设计模式的实际应用,通过阅读源码可以更好地掌握并发编程的实践技巧。 以上内容涵盖了Java多线程设计模式的各个方面,通过深入学习,开发者可以...

    生产者消费者java源码

    在Java编程中,"生产者消费者"模式是一种经典的多线程设计模式,它主要用于解决并发问题,特别是数据处理和资源管理。...通过分析提供的源码,我们可以深入学习和掌握如何在Java中实现这一模式,以及如何优化并发性能。

    java线程池概念.txt

    这部分代码就不再追踪下去,有兴趣的读者可以自己打开源码分析,不必害怕,学习大神们的编码方式,看源码能让你学习到很多 } } private void runTask(Runnable task) { final ReentrantLock runLock = this....

    Java线程池,正式上线运行的源码,分享欢迎使用并提意见

    源码分析** 阅读源码可以帮助我们更好地理解线程池的内部机制。例如,`ThreadPoolExecutor.execute()`方法是如何处理任务的提交,`ThreadPoolExecutor.shutdown()`和`shutdownNow()`又是如何优雅地关闭线程池。 **...

    多线程面试题及处理方案和详解

    #### 三、ReentrantReadWriteLock源码分析 - **特点**:`ReentrantReadWriteLock`支持读写分离,允许多个读锁并发持有但只有一个写锁可以被持有。这可以显著提高读多写少场景下的并发性能。 - **关键特性**: - **...

    基于Java的源码-P2P源码 Azureus 2.5.0.2(JAVA).zip

    理解如何避免内存泄漏、有效使用对象池和缓存,以及调整JVM参数以优化性能,都是分析源码时需要关注的点。 8. **日志和调试**:Azureus可能使用了如Log4j这样的日志框架,用于记录程序运行状态和错误信息,这对于...

    Queue-Simulation_java_learnvwf_源码.zip

    通过分析源码,我们可以深入理解Java数据结构、多线程编程以及工作流管理,这些对于提升软件开发技能大有裨益。同时,这个项目也鼓励我们思考如何优化服务系统的效率,以应对现实生活中的挑战。

Global site tag (gtag.js) - Google Analytics