`
ftj20003
  • 浏览: 132350 次
  • 性别: Icon_minigender_1
  • 来自: ...
社区版块
存档分类
最新评论

多线程基础总结十--LinkedBlockingQueue

    博客分类:
  • Java
阅读更多
    随着多线程基础总结的增多,却明显的感觉知道的越来越少,好像转了一圈又回到了什么都不懂的起点。不过还是试着介绍一下队列的并发实现,努力尽快的驱散迷雾。队列这个数据结构已经很熟悉了,利用其先进先出的特性,多数生产消费模型的首选数据结构就是队列。对于有多个生产者和多个消费者线程的模型来说,最重要是他们共同访问的Queue是线程安全的。JDK中提供的线程安全的Queue的实现还是很丰富的:ArrayBlockingQueue,LinkedBlockingQueue,PriorityBlockingQueue,DelayQueue,ConcurrentLinkedQueue等等,多数情况下使用这些数据结构编写并发程序足够了。ArrayBlockingQueue的实现之前的总结中已经有介绍,所以这次是分析一下LinkedBlockingQueue和ConcurrentLinkedQueue的源码实现。

    首先从简单的开始,先看看LinkedBlockingQueue线程安全的实现。之所以介绍它是因为其实现比较典型,对比ArrayBlokcingQueue使用一个ReentrantLock和两个Condition维护内部的数组来说,它使用了两个ReentrantLock,并且分别对应一个Condition来实现对内部数据结构Node型变量的维护。
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -6903933977591709194L;

    /**
     * 节点数据结构
     */
    static class Node<E> {
        /** The item, volatile to ensure barrier separating write and read */
        volatile E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    /** 队列的容量 */
    private final int capacity;

    /** 持有节点计数器 */
    private final AtomicInteger count = new AtomicInteger(0);

    /** 头指针 */
    private transient Node<E> head;

    /** 尾指针 */
    private transient Node<E> last;

    /** 用于读取的独占锁*/
    private final ReentrantLock takeLock = new ReentrantLock();

    /** 队列是否为空的条件 */
    private final Condition notEmpty = takeLock.newCondition();

    /** 用于写入的独占锁 */
    private final ReentrantLock putLock = new ReentrantLock();

    /** 队列是否已满的条件 */
    private final Condition notFull = putLock.newCondition();

    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    private void insert(E x) {
        last = last.next = new Node<E>(x);
    }

    private E extract() {
        Node<E> first = head.next;
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    private void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    private void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
   ...
}

这里仅仅展示部分源码,主要的方法在后面的分析中列出。分析之前明确一个最基本的概念。天天念叨着编写线程安全的类,什么是线程安全的类?那就是类内共享的全局变量的访问必须保证是不受多线程形式影响的。如果由于多线程的访问(改变,遍历,查看)而使这些变量结构被破坏或者针对这些变量操作的原子性被破坏,则这个类的编写不是线程安全的。

    明确了这个基本的概念就可以很好的理解这个Queue的实现为什么是线程安全的了。在LinkedBlockingQueue的所有共享的全局变量中,final声明的capacity在构造器生成实例时就成了不变量了。而final声明的count由于是AtomicInteger类型的,所以能够保证其操作的原子性。剩下的final的变量都是初始化成了不变量,并且不包含可变属性,所以都是访问安全的。那么剩下的就是Node类型的head和last两个可变量。所以要保证LinkedBlockingQueue是线程安全的就是要保证对head和last的访问是线程安全的

    首先从上面的源码可以看到insert(E x),extract()是真正的操作head,last来入队和出对的方法,但是由于是私有的,所以不能被直接访问,不用担心线程的问题。实际入队的公开的方法是put(E e),offer(E e)和offer(E e, long timeout, TimeUnit unit)。put(...)方法与offer(...)都是把新元素加入到队尾,所不同的是如果不满足条件put会把当前执行的线程扔到等待集中等待被唤醒继续执行,而offer则是直接退出,所以如果是需要使用它的阻塞特性的话,不能直接使用poll(...)。
   
    put(...)方法中加入元素的操作使用this.putLock来限制多线程的访问,并且使用了可中断的方式:
public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count; //----------------a
        putLock.lockInterruptibly();//随时保证响应中断 //--------b
        try {
            //*****************************(1)*********************************
            try {
                while (count.get() == capacity)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
           //*****************************end*********************************
            insert(e);//真正的入队操作
           //********************(2)**********************
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
            //******************end**********************
        } finally {
            putLock.unlock();
        } //-------------------------c
        if (c == 0) //---------------d
            signalNotEmpty();
}

代码段(1)是阻塞操作,代码段(2)是count递增和唤醒等待的操作。两者之间的insert(e)才是入队操作,其实际是操作的队尾引用last,并且没有牵涉到head。所以设计两个锁的原因就在这里!因为出队操作take(),poll()实际是执行extract()仅仅操作队首引用head。增加了this.takeLock这个锁,就实现了多个不同任务的线程入队的同时可以进行出对的操作,并且由于两个操作所共同使用的count是AtomicInteger类型的,所以完全不用考虑计数器递增递减的问题。假设count换成int,则相应的putLock内的count++和takeLock内的count--有可能相互覆盖,最终造成count的值被腐蚀,故这种设计必须使用原子操作类。
      我之前说过,保证类的线程安全只要保证head和last的操作的线程安全,也就是保证insert(E x)和extract()线程安全即可。那么上面的put方法中的代码段(1)放在a,b之间,代码段(2)放在c,d之间不是更好?毕竟锁的粒度越小越好。单纯的考虑count的话这样的改变是正确的,但是await()和singal()这两个方法执行时都会检查当前线程是否是独占锁的那个线程,如果不是则抛出java.lang.IllegalMonitorStateException异常。而这两段代码中包含notFull.await()和notFull.signal()这两句使得(1),(2)必须放在lock保护块内。这里说明主要是count本身并不需要putLock或者takeLock的保护,从  
public int size() {
        return count.get();
}

可以看出count的访问是不需要任何锁的。而在put等方法中,其与锁机制的混用很容易造成迷惑。最后put中的代码d的作用主要是一个低位及时通知的作用,也就是队列刚有值试图获得takeLock去通知等待集中的出队线程。因为c==0意味着count.getAndIncrement()原子递增成功,所以count > 0成立。类似作用的代码:
if (c == capacity)
       signalNotFull();

在take和poll中也有出现,实现了高位及时通知。

    分析完了put,对应的offer,take,poll方法都是类似的实现。下面看看遍历队列的操作:
public Object[] toArray() {
        fullyLock();
        try {
            int size = count.get();
            Object[] a = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = p.item;
            return a;
        } finally {
            fullyUnlock();
        }
}

这个方法很简单主要是要清楚一点:这个操作执行时不允许其他线程再修改队首和队尾,所以使用了fullyLock去获取putLock和takeLock,只要成功则可以保证不会再有修改队列的操作。然后就是安心的遍历到最后一个元素为止了。

    另外在offer(E e, long timeout, TimeUnit unit)这个方法中提供了带有超时的入队操作,如果一直不成功的话,它会尝试在timeout的时间内入队:
for (;;) {
     ...//入队操作
     if (nanos <= 0)
         return false;
     try {
          nanos = notFull.awaitNanos(nanos);
     } catch (InterruptedException ie) {
           notFull.signal(); // propagate to a non-interrupted thread
           throw ie;
     }
}

其内部循环使用notFull.awaitNanos(nanos)方法反复的计算剩余时间的大概值用于实现延时功能。nanos<=0则放弃尝试,直接退出。

    整体而言,LinkedBlockingQueue的实现还是很清晰的。相对于后面要介绍的ConcurrentLinkedQueue来说,它属于简单的实现。这些看似复杂的数据结构的实现实质都是多线程的基础的综合应用。就好像数学中千变万化的难题其实都是基础公式的组合一样,如果有清晰的基础认知,还是能找到自己分析的思路的。本来是想从mina中找找类似的实现,不过很遗憾的是它好像仅仅实现了一个非线程安全的循环队列,然后在其基础上使用synchronized进行封装成线程安全的Queue。
3
1
分享到:
评论
发表评论

文章已被作者锁定,不允许评论。

相关推荐

    IBM多线程编程基础教程

    1. **线程基础**(IBM多线程教程一-线程基础.doc): - 线程的概念:线程是操作系统分配CPU时间的基本单元,一个进程中可以有多个线程同时执行。 - 创建线程:通过实现`Runnable`接口或继承`Thread`类来创建线程...

    思维导图-多线程进阶总结02

    本文将围绕“思维导图-多线程进阶总结02”的主题,深入探讨线程间的通信、生产者消费者问题以及如何处理冻结状态的线程。 一、线程间的通信 线程间通信是多线程编程中必不可少的一部分,它确保了共享资源的有效...

    Java 多线程 设计模式

    在Java编程中,多线程是一项关键特性,它允许程序同时执行多个任务,极大地提高了程序的效率和响应性。设计模式则是解决特定问题的成熟、可重用的解决方案,是软件设计中的宝贵经验总结。当将多线程与设计模式结合时...

    Java多线程与高并发入门到精通-视频教程网盘链接提取码下载.txt

    综上所述,《Java多线程与高并发入门到精通》是一门全面涵盖Java多线程编程基础及高级技术的课程。无论您是初学者还是有一定基础的开发者,都能够从中获得宝贵的知识和经验。通过学习这门课程,您将能够掌握多线程的...

    java-Thread-study-summary.zip_java 多线程

    Java多线程是Java编程中的核心概念,它允许程序同时执行多个任务,...通过阅读“java多线程编程总结.pdf”,读者将全面了解Java多线程的概念、实现方式、同步机制以及如何处理并发问题,为实际项目开发打下坚实基础。

    Java线程学习和总结

    - **synchronized关键字**:用于控制多线程对共享资源的访问,保证同一时刻只有一个线程可以访问特定代码块。 - **wait(), notify(), notifyAll()**:这些方法用于线程间通信,配合synchronized使用,改变线程的...

    Java互联网架构多线程并发编程原理及实战 视频教程 下载.zip

    Java互联网架构多线程并发编程原理及实战 视频教程 下载 1-1 课程简介.mp4 1-2 什么是并发编程.mp4 1-3 并发编程的挑战之频繁的上下文切换.mp4 1-4 并发编程的挑战之死锁.mp4 1-5 并发编程的挑战之线程安全....

    Java互联网架构多线程并发编程原理及实战 视频教程 下载4.zip

    Java互联网架构多线程并发编程原理及实战 视频教程 下载 1-1 课程简介.mp4 1-2 什么是并发编程.mp4 1-3 并发编程的挑战之频繁的上下文切换.mp4 1-4 并发编程的挑战之死锁.mp4 1-5 并发编程的挑战之线程安全....

    Java互联网架构多线程并发编程原理及实战 视频教程 下载2.zip

    Java互联网架构多线程并发编程原理及实战 视频教程 下载 1-1 课程简介.mp4 1-2 什么是并发编程.mp4 1-3 并发编程的挑战之频繁的上下文切换.mp4 1-4 并发编程的挑战之死锁.mp4 1-5 并发编程的挑战之线程安全....

    Java互联网架构多线程并发编程原理及实战 视频教程 下载3.zip

    Java互联网架构多线程并发编程原理及实战 视频教程 下载 1-1 课程简介.mp4 1-2 什么是并发编程.mp4 1-3 并发编程的挑战之频繁的上下文切换.mp4 1-4 并发编程的挑战之死锁.mp4 1-5 并发编程的挑战之线程安全....

    Java互联网架构多线程并发编程原理及实战 视频教程 下载1.zip

    Java互联网架构多线程并发编程原理及实战 视频教程 下载 1-1 课程简介.mp4 1-2 什么是并发编程.mp4 1-3 并发编程的挑战之频繁的上下文切换.mp4 1-4 并发编程的挑战之死锁.mp4 1-5 并发编程的挑战之线程安全....

    Java多线程与并发库高级应用

    #### 一、Java多线程基础 在深入探讨Java多线程与并发库的高级应用之前,我们首先需要回顾一下Java多线程的基础概念和技术要点。 ##### 1.1 线程的概念 在计算机科学中,线程是操作系统能够进行运算调度的最小...

    Java 线程总结

    - 阻塞队列和阻塞栈:如`ArrayBlockingQueue`和`LinkedBlockingQueue`,它们在多线程协作中起到数据交换的作用。 - 原子量(Atomic类):提供原子操作,保证在并发环境下的数据一致性。 - 障碍器(CyclicBarrier...

    多线程BUFFER的实现JAVA

    **一、多线程基础** 1. **线程的创建与启动**:Java通过`Thread`类或者`Runnable`接口来创建线程。你可以直接继承`Thread`类并重写`run()`方法,或者实现`Runnable`接口并提供`run()`方法,然后将`Runnable`对象传...

    java多线程编程起步

    本文将深入探讨Java多线程编程的基础知识,包括线程的创建、同步、通信以及相关实例。 一、线程的创建 在Java中,有两种主要的创建线程的方式: 1. 继承Thread类:创建一个新的类,继承自Thread类,然后重写它的...

    多线程面试专题及答案.pdf

    程同步的基础面试问题,主要考察...总结起来,这些面试问题涵盖了多线程编程的核心概念,包括线程同步、锁机制、并发控制、异常处理以及线程状态的管理。熟练掌握这些知识点对于成为一名专业的Java并发程序员至关重要。

    JAVA 基础知识总结(手写)

    `LinkedHashSet`保持了元素的插入顺序,而`LinkedBlockingQueue`是线程安全的队列,遵循先进先出(FIFO)原则,并且在多线程环境中保证了操作的顺序性。 `Vector`是较旧的线程安全的列表实现,但其性能较低,因为它...

    java并发编程-超级大全整理

    在多核处理器、对称多处理机(SMP)以及芯片级多处理(CMP)或同时多线程(SMT)处理器的系统中,多线程技术尤其显著地提高了处理器的利用率。 #### 1.1 多线程使用场景 - **后台任务**:例如定时发送邮件、定期...

    Java中实现多线程关键词整理(总结)

    总结来说,Java中实现多线程的关键在于选择合适的方式(`Runnable`/`Callable`)、利用`ExecutorService`和线程池(`ThreadPoolExecutor`)进行任务调度,以及使用`Future`或`FutureTask`来处理任务结果。...

Global site tag (gtag.js) - Google Analytics