`
wang7839186
  • 浏览: 42053 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

BlockQueue之LinkedBlockingQueue源码解析

阅读更多

     最近在研究blockqueue的源码,从今天开始,和大家分享一下我看源码的一些心得体会

     (1)LinkedBlockingQueue源码解析

     (2)ArrayBlockingQueue源码解析

    

     LinkedBlockingQueue实现了BlockingQueue接口以及Serializable接口,是有序的FIFO队列,构造函数中,可传入一个最大容量值,如果没有传入,则默认是Integer.MAX_VALUE

    一 首先看一下重要的几个类变量:

 

 

/** 保存当前队列中元素的个数 */
    private final AtomicInteger count = new AtomicInteger(0);

    /**
     * 头元素
     * Invariant: head.item == null
     */
    private transient Node<E> head;

    /**
     * 尾元素
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** 消费者锁,Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** 使消费者线程等待,直到被唤醒或者打断 */
    private final Condition notEmpty = takeLock.newCondition();

    /** 生产者锁,Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** 使生产者线程等待,直到被唤醒或者打断 */
    private final Condition notFull = putLock.newCondition();

 

二 put方法

 

 

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
              while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

   

执行过程如下:

   1 如果传入元素为空,抛出空指针异常

   2 获得put的锁,以及原子的count,然后lock,注意,是可打断的lock
   3 判断当前队列是否饱和,若饱和,生产者线程进入等待状态
   4 如果队列不饱和,则将元素包装为一个node放到队列中
   5 count+1,如果count+1仍然小于队列的最大容量,则生产者线程被唤醒
   6 在finnally中释放锁,最后唤醒消费者,提醒消费者可以从队列中取对象了
 
三 offer方法
   offer提供了两个方法,一个方法是可传入等待时间,另一个则没有
 
   
public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }
 
执行过程如下:
1 如果传入元素为空,抛出空指针异常
2 根据传入的timeout和unit计算出最长等待的时间
获得put的锁,以及原子的count,表示当前队列中的元素个数,然后lock,注意,是可打断的lock
4 如果队列饱和,则超过等待时间后,直接返回false
5 以下处理构成同put
 
另外一个不带参数的方法,如果判断队列饱和,直接返回false,不阻塞
 
四 take方法
 
 
public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
 
执行过程如下:
获得put的锁,以及原子的count,表示当前队列中的元素个数,然后lock,注意,是可打断的lock
2 如果当前队列为空,则阻塞
3 如果非空,则元素出列,count-1,如果count>1,表示队列非空,则消费者线程被唤醒
4 在finally中释放lock,唤醒生产者生产

 

五 pool方法

 

 

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

 

执行过程如下:

1 获得put的锁,以及原子的count,表示当前队列中的元素个数,然后lock,注意,是可打断的lock
2 如果当前队列为空,则线程阻塞,并等待指定时间,如果在指定时间还是空,则直接返回空

 

3 以下过程同take
 
pool也提供了不带参数的方法,表示如果队列为空,则直接返回null,不阻塞
 
六 peek
 
public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }
 
不阻塞,不移除队首元素
 
根据以上源码分析,得出方法之间的异同:
(1) 生产者方法:put,offer
(2)消费者方法:take,poll,peek
(3)put方法中,如果队列始终饱和,则当前线程会一直等待,直到有对象出列
     offer(可传入等待时间),如果队列饱满,会等待指定的时间,如果在指定时间内还饱满,则直接返回false
     offer(没有参数),如果队列饱满,直接返回false,线程不等待
     put和offer(可传入等待时间)都是可被打断的

 (4)take在队列为空时,会始终阻塞

          poll分为带等待时间和不带的,如果不带等待时间,则不阻塞,移除队首元素(FIFO)
          peek是不移除队首元素,不阻塞
0
5
分享到:
评论

相关推荐

    linkedblockingqueue

    《LinkedBlockingQueue深度解析与应用实践》 在Java并发编程领域,`LinkedBlockingQueue`是一个不可或缺的工具。作为`java.util.concurrent`包中的一个线程安全的队列,它基于链表结构实现,具备先进先出(FIFO)的...

    LinkedBlockingQueue + 单向链表基本结构

    在分析`SingleLinked.java`源码时,我们可以看到具体如何实现这些概念。文件中的类可能定义了一个简单的单向链表节点,包括节点的数据成员(如数据item和指向下一个节点的引用next)以及可能的辅助方法,例如插入新...

    LinkedBlockingQueue 和 ConcurrentLinkedQueue的区别.docx

    《LinkedBlockingQueue与ConcurrentLinkedQueue的比较与应用》 在Java并发编程中,队列是一种重要的数据结构,尤其在多线程环境下的任务调度和数据传递中扮演着关键角色。LinkedBlockingQueue和...

    JDK容器学习之Queue:LinkedBlockingQueue

    接下来,我们详细解析`LinkedBlockingQueue`的阻塞实现原理: 1. 入队操作`put(E e)`:如果队列已满(`count.get() == capacity`),入队操作会获取`putLock`并进入等待状态,直到`notFull`条件被满足(即队列中有...

    并发容器之ArrayBlockingQueue和LinkedBlockingQueue实现原理详解

    "并发容器之ArrayBlockingQueue和LinkedBlockingQueue实现原理详解" ArrayBlockingQueue和LinkedBlockingQueue是Java并发容器中两个常用的阻塞队列实现,分别基于数组和链表存储元素。它们都继承自AbstractQueue类...

    并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue用法

    ### 并发队列 ConcurrentLinkedQueue 和阻塞队列 LinkedBlockingQueue 用法详解 #### 一、并发队列 ConcurrentLinkedQueue 概述 `ConcurrentLinkedQueue` 是 Java 并发包 `java.util.concurrent` 提供的一个高性能...

    实战Concurrent-BlockQueue

    《实战Concurrent-BlockQueue》 在Java并发编程领域,`Concurrent-BlockQueue`是一个重要的数据结构,它结合了线程安全与高效性能。本文将深入探讨`ConcurrentLinkedQueue`、`ArrayBlockingQueue`以及`...

    ArrayBlockingQueue源码解析-动力节点共

    这时,可以通过调整公平锁、容量大小、以及使用其他的并发数据结构如LinkedBlockingQueue来优化性能。 6. 应用场景:ArrayBlockingQueue常用于生产者-消费者模型,如多线程环境中的任务调度、消息传递等。它的固定...

    LinkedBlockingQueuejava.pdf

    《LinkedBlockingQueue in Java》 Java中的`LinkedBlockingQueue`是`java.util.concurrent`包下的一种线程安全的阻塞队列,它是基于链表结构实现的,具有很好的性能表现。这种队列在多线程环境下的并发操作中被广泛...

    BlockQueue练习

    - `LinkedBlockingQueue`: 基于链表的阻塞队列,大小可变,初始容量默认为Integer.MAX_VALUE。 - `PriorityBlockingQueue`: 优先级阻塞队列,元素按照自然顺序或自定义比较器排序。 - `DelayQueue`: 延迟元素插入...

    详细分析Java并发集合LinkedBlockingQueue的用法

    详细分析Java并发集合LinkedBlockingQueue的用法 Java并发集合LinkedBlockingQueue是Java并发集合中的一种阻塞队列实现,使用链表方式实现的阻塞队列。LinkedBlockingQueue的实现主要包括链表实现、插入和删除节点...

    元素唯一的LinkedBlockingQueue阻塞队列

    《元素唯一的LinkedBlockingQueue阻塞队列》 在Java并发编程中,`LinkedBlockingQueue`是一种基于链表结构的阻塞队列,它在多线程环境下的性能表现优秀,常用于实现生产者消费者模型。这个队列的一个关键特性是其...

    生产者/消费者模式 阻塞队列 LinkedBlockingQueue

    在Java中,阻塞队列(BlockingQueue)是一个很好的实现生产者/消费者模式的工具,而LinkedBlockingQueue则是Java并发包(java.util.concurrent)中提供的一个具体实现。 LinkedBlockingQueue是一个基于链表结构的...

    java中LinkedBlockingQueue与ArrayBlockingQueue的异同

    这两种队列在很多方面都有相似之处,但在实现机制和性能特性上存在一些差异。 **相同点** 1. **接口实现**:两者都是`BlockingQueue`接口的实现,提供了`put`、`take`等阻塞操作,使得生产者线程在队列满时会被...

    28个java常用的类库源码

    源码解析能帮助你理解日期时间计算的复杂性。 6. **反射**: java.lang.reflect包中的类(如Class、Constructor、Method)用于运行时获取类的信息和动态调用方法。源码分析有助于理解Java的元数据机制。 7. **...

    Java线程池,正式上线运行的源码,分享欢迎使用并提意见

    本文将深入探讨Java线程池的原理、使用方式以及其源码解析。 首先,Java线程池主要由`java.util.concurrent.ThreadPoolExecutor`类实现。线程池的核心参数包括核心线程数(corePoolSize)、最大线程数...

    完全解析Android多线程中线程池ThreadPool的原理和使用

    5. **工作队列** (workQueue):用于存放待执行任务的队列,如ArrayBlockingQueue、LinkedBlockingQueue等。 6. **线程工厂** (threadFactory):创建新线程的工厂,用于定制线程的创建行为。 线程池的工作逻辑如下:...

    TOMCAT的线程池源码

    《深入解析Tomcat线程池源码》 Tomcat作为一款广泛应用的开源Servlet容器,其在性能优化上有着独到之处。线程池是Tomcat处理并发请求的关键组件,它负责调度和管理线程,有效地提高了服务器的响应速度和并发处理...

Global site tag (gtag.js) - Google Analytics