`

java阻塞队列BlockingQueue--LinkedBlockingQueue

阅读更多

前言

 

java中的阻塞队列,主要用于在多线程环境解决生产者和消费者问题。也就是说有多个生产者、多个消费者并行执行的情况下,如何保证队列的线程安全问题,是BlockingQueue的主要职责。

 

试想下,如果我们如果不用阻塞队列,而是使用一个LinkedList作为消息存储队列。在多线程情况下生产者从头部放入队列,消费者从尾部取出队列中的数据时,需要对LinkedList的头尾进行加锁;如果要实现阻塞可以利用条件队列Condition在队列为空时阻塞消费者线程,在队列满时阻塞生产者线程。

 

这是一种通用的做法,java API已经把这种通用的做法做成工具类方便大家使用,避免每次都需要自己开发。比如上述场景其实就是LinkedBlockingQueue的核心实现流程,假如我们直接使用LinkedBlockingQueue来实现生产者和消费者问题,就会简单很多,所有的加锁以及条件队列操作都被封装到LinkedBlockingQueue的具体实现中:

public class BlockingQueueTest {
    public static void main(String[] args) {
        BlockingQueue<String> q = new LinkedBlockingQueue();
        Producer p = new Producer(q,"生产者1");
        Consumer c1 = new Consumer(q,"消费者1");
        Consumer c2 = new Consumer(q,"消费者2");
        new Thread(p).start();
        new Thread(c1).start();
        new Thread(c2).start();
    }
}
 
class Producer implements Runnable {
  private final BlockingQueue<String> queue;
  private String name;
 
    public Producer(BlockingQueue<String> queue, String name) {
        this.queue = queue;
        this.name = name;
    }
 
   public void run() {
     try {
         while (true) {
             Thread.sleep(1000);//1秒生成一个
             queue.put(produce());
         }
     } catch (InterruptedException ex) {
 
     }
   }
   String produce() {
       Random  random = new Random();
       String p = random.nextInt(1000)+"";
       System.out.println(name+"生成:"+p);
       return p;
   }
 }
 
 class Consumer implements Runnable {
     private final BlockingQueue<String> queue;
     private String name;
 
     public Consumer(BlockingQueue<String> queue, String name) {
         this.queue = queue;
         this.name = name;
     }
 
    public void run() {
      try {
        while (true) { consume(queue.take()); }
      } catch (InterruptedException ex) {
 
      }
    }
    void consume(Object x) {
        System.out.println(name+"消费:"+x);
    }
 }
 

 

这里只模拟了一个生产者线程和两个消费者线程,使用的队列是LinkedBlockingQueueBlockingQueue只是定了接口,在java API中有如下几个实现类:ArrayBlockingQueue(数组 有界)、DelayQueue(延迟 无界)、LinkedBlockingQueue(链表 无界)、PriorityBlockingQueue(优先 无界)、SynchronousQueueLinkedTransferQueue(jdk 1.7引入)LinkedBlockingDeque(双端 无界)、DelayedWorkQueue

 

可以根据业务的需要选择不同的BlockingQueue。在上述示例中直接使用的new Thread来开启线程,在实际工作中 我们不会很次都new Thread,因为每次开启和关闭线程的开销都很大。一般我们使用线程池。

 

BlockingQueue与线程池

 

简单的理解阻塞队列,就是把数据“安全的”从一个线程转移到另一个线程。这就像工厂的流水线一样,一个机器只做一件事情 比如打包,做好之后放到“转送带”上;下一个机器从传送带上取出包裹贴标签,完成后又放到传送带上。这里的传送带就可以理解为阻塞队列,做事的机器可以理解为线程,可以看到有的线程即是生产者又是消费者

 

我们编程也是同样的道理,首先把一个大任务拆分成几个小任务,然后创建对应的几组线程,每组线程只处理一个小任务,待所有的小任务完成后,整个大任务就完成了:



 

这样做的好处是什么呢?首先职责更清晰,每个线程池只做一件事情,便于维护;其次,根据具体情况,可以动态的把资源分配到不同的线程池上;最后,任务没有前后依赖的任务可以并行执行。缺点就是,需要有扎实的多线程并发编程基础。

 

文章开头的示例中,我们是直接new Thread创建的线程,这种做法不便于线程的重复利用。其实java的线程池技术,从根本上讲就是对生产者消费者问题的封装。java线程池ThreadPoolExecutor的构造方法中就有个BlockingQueue参数(关于ThreadPoolExecutor,详见这里),用于指定线程池的队列,调用ThreadPoolExecutorexecute方法,本质上调用的是BlockingQueueoffer方法往队列中添加数据;当线程执行任务时,是调用BlockingQueuetake方法获取任务执行。也就是说线程池即提供了生产者接口(调用该接口的线程是生产者),又是消费者。

 

当生产者生产的速度大于消费者消费的速度时,就会产生数据积压,这些积压的数据都会存到BlockingQueue中。所以使用无界队列作为线程池的队列时要注意,如果生产速度过快,队列有可能占用所有jvm的内存 导致系统崩溃。所以在生产环境下,应该尽量少的使用无界队列创建线程池,建议直接使用ThreadPoolExecutor的构造方法自己创建。另外,Executors4个静态方法在生产环境中如果不知道生产者和消费者的生成和消费速度的情况下,尽量少用:newFixedThreadPool使用的是LinkedBlockingQueue无界队列;newCachedThreadPool虽然使用的不是无界队列,但创建线程数量没有限制,同样会出现内存耗尽的情况;newScheduledThreadPool使用的DelayedWorkQueue同样是无界队列;newSingleThreadExecutor使用的LinkedBlockingQueue无界队列。

 

另外,SpringThreadPoolTaskExecutorThreadPoolExecutor进行了包装,可以手动指定队列长度,建议在使用spring框架的系统中使用(其实就是实现了对ThreadPoolExecutor构造方法7个参数的配置化)。

 

LinkedBlockingQueue类实现原理

 

构造方法

使用LinkedBlockingQueue的无参构造方法,默认队列容量为:231次方-1,可以理解为无界队列;但如果使用待参数的构造方法,会创建指定容量的队列。也就是LinkedBlockingQueue也可以作为有界队列使用,但必须带参数:

 

//final成员,只能被构造方法赋值一次,不能更改
private final int capacity;
public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
 

 

LinkedBlockingQueue本质上是一个单向链表,从其成员变量,以及Node内部类就可以看出:

 
transient Node<E> head;//头结点
private transient Node<E> last;//尾结点
//节点类
static class Node<E> {
        E item;//节点数据
        Node<E> next;//下一个结点
        Node(E x) { item = x; }
    }

 

可以看出这是典型的单向链表结构。LinkedBlockingQueue的核心方法就是以线程安全的方式操作这个单向链表。另外 要实现线程安全就得加锁,要实现阻塞可以使用ConditionLinkedBlockingQueue的另外几个核心成员变量:

 
//队列头部锁,控制取数据端,在take, poll等方法中使用。非公平锁
    private final ReentrantLock takeLock = new ReentrantLock();
 
//队列头部条件队列
private final Condition notEmpty = takeLock.newCondition();
 
    //队列尾部锁,控制放入数据端,在put, offer等方法中使用。非公平锁
    private final ReentrantLock putLock = new ReentrantLock();
 
    //队列尾部条件队列
private final Condition notFull = putLock.newCondition();
 

 

LinkedBlockingQueue的取数据是在队列头部进行,放入数据是在队列尾部进行。为了不存取数据互不干扰,需要对头尾两端分别进行加锁,所以需要两个锁 以及对应的两个条件队列。

 

take方法:

public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            //当队列为空是,调用await阻塞
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();//取出头部节点
            //注意这里的c是从队列取出1个节点之前的队列长度
            c = count.getAndDecrement();
            //如果队列中还有数据,唤醒其他线程的await
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        //如果现在的队列长度为capacity-1,就唤醒头部放入队列阻塞
        if (c == capacity)
            signalNotFull();
        return x;
}
 
private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
//唤醒put时队列已满导致的注释阻塞,详见put方法
            notFull.signal();
        } finally {
            putLock.unlock();
        }
}

 

在理解这个方法时,一定要在多线程的环境下去理解。当队列为空时,多个消费者线程调用take方法,这时多个线程都会被阻塞在notEmpty.await()这一步;此时只能等到put方法,只要put成功一条数据,就应该唤醒上一步take方法中阻塞的一个线程(见put方法实现末尾);一个线程被唤醒后,从队列中取出一条数据,如果队列里还有数据,继续通过notEmpty.signal()唤醒下一个线程继续获取。

 

take方法末尾,如果现在的队列长度为capacity-1,说明刚刚队列是满的,如果是manput方法应该是阻塞的,现在被取走了一条数据,就应该唤醒put方法的一个阻塞线程。

 

put方法

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;//队列长度
        putLock.lockInterruptibly();
        try {
            //当队列长度已满,就阻塞调用put方法的线程
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);//加入队列末尾
            c = count.getAndIncrement();
            //如果容量还没有满,唤醒其他线程继续put
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        //put成功后,如果队列中只有1条数据,说明之前队列为空,take方法应该被阻塞,这时应该唤醒一个阻塞在take方法上的线程,可以获取数据了
        if (c == 0)
            signalNotEmpty();
    }
 

 

仔细对比下put方法做的事情刚好与take方法相反。理解了take方法,再结合给出的注释,就很好理解了。

 

offerpoll方法

这两个方法的实现与puttake方法基本相同,唯一的区别是不会产生阻塞:offer方法如果队列已满直接返回falsepoll方法如果队列已满,直接返回null。后续操作交给调用方去处理,一般做法是睡眠一会儿,继续offer或者poll

 

延时版offerpoll方法

这两个方法的实现与puttake方法基本相同,唯一区别在使用条件队列阻塞时,延时的offer或者poll方法使用的是延时的阻塞方法awaitNanos(xxx)

 

peek方法

peek的中文意思是瞟一眼,也就是说这个方法只会返回一个指向头结点的指针,而不会从队列中移除节点,这是与takepoll方法的区别。主要用于判断队列头是不是为空,也可以在外部修改这个节点。具体实现如下:

public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock(); //使用头部锁 加锁
        try {
            Node<E> first = head.next; //创建一个指针指向头结点
            if (first == null)
                return null;
            else
                return first.item;//返回节点数据指针
        } finally {
            takeLock.unlock();
        }
}
 

 

至此LinkedBlockingQueue7个核心方法讲解完毕:puttakeofferpoll、延时版offer、延时版pollpeek

 

总结

 

文章前部分主要是在讲解了BlockingQueue,以及与线程池的关系。后面中断就LinkedBlockingQueue实现过程进行了分析,对应其他实现类,后面有时间再一一分析。

 

 

 

 

  • 大小: 43.2 KB
0
0
分享到:
评论

相关推荐

    14-阻塞队列BlockingQueue实战及其原理分析二.pdf

    阻塞队列(BlockingQueue)是一种特殊的队列,它支持两个附加操作:阻塞的插入方法put和阻塞的移除方法take。BlockingQueue继承了Queue接口,是Java 5中加入的。 BlockingQueue常用方法示例: 1. add(E e):添加一...

    java 中 阻塞队列BlockingQueue详解及实例

    Java中的阻塞队列BlockingQueue是一种并发编程中常用的工具,它实现了线程间的同步和通信。阻塞队列的核心特性在于当队列为空时,尝试获取元素的线程会被阻塞,直到其他线程添加元素;当队列满时,尝试添加元素的...

    10、阻塞队列BlockingQueue实战及其原理分析

    阻塞队列BlockingQueue是Java并发编程中一个重要的数据结构,它是线程安全的队列,主要用于生产者消费者模型中的数据交换。在Java的`java.util.concurrent`包中,提供了多种实现阻塞队列的类,如`ArrayBlockingQueue...

    10、阻塞队列BlockingQueue实战及其原理分析.pdf

    ### 10、阻塞队列BlockingQueue 实战及其原理分析 #### 一、阻塞队列概述 阻塞队列(BlockingQueue)是Java语言中`java.util.concurrent`包下提供的一种重要的线程安全队列。它继承自`Queue`接口,并在此基础上...

    Java并发编程--BlockingQueue.docx

    2. LinkedBlockingQueue:基于链表的有界阻塞队列,默认无大小限制,也可以指定最大容量。 3. PriorityBlockingQueue:支持优先级排序的无界阻塞队列,元素按照自然顺序或自定义比较器进行排序。 4. DelayQueue:...

    阻塞队列阻塞队列阻塞队列

    这些阻塞队列都实现了java.util.concurrent.BlockingQueue接口,提供了如put、take、offer、poll等方法,用于进行元素的插入、移除以及检查操作。它们广泛应用于生产者-消费者模型、线程池的工作队列等并发场景,...

    java中线程队列BlockingQueue的用法

    在Java编程中,`BlockingQueue`(阻塞队列)是一种重要的并发工具,它结合了队列的数据结构和线程同步机制。`BlockingQueue`接口位于`java.util.concurrent`包中,提供了线程安全的数据结构,可以用于实现生产者-...

    blockingqueue-example

    《深入理解Java阻塞队列BlockingQueue:基于blockingqueue-example示例》 在Java编程中,阻塞队列(BlockingQueue)是一种线程安全的数据结构,它在并发编程中扮演着重要角色。本文将深入探讨 BlockingQueue 的概念...

    生产者/消费者模式 阻塞队列 LinkedBlockingQueue

    在Java中,阻塞队列(BlockingQueue)是一个很好的实现生产者/消费者模式的工具,而LinkedBlockingQueue则是Java并发包(java.util.concurrent)中提供的一个具体实现。 LinkedBlockingQueue是一个基于链表结构的...

    并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue用法

    `LinkedBlockingQueue` 同样是 `java.util.concurrent` 包下的一个线程安全的阻塞队列实现,它继承自 `AbstractQueue` 并实现了 `BlockingQueue` 接口。`LinkedBlockingQueue` 的特点是可以在队列满时阻塞生产者线程...

    java模拟阻塞队列

    Java中的阻塞队列实现主要依赖于`java.util.concurrent`包下的几个类,如`BlockingQueue`接口、`ArrayBlockingQueue`、`LinkedBlockingQueue`等。`BlockingQueue`接口定义了一组操作,如`put`、`take`、`offer`等,...

    详解Java阻塞队列(BlockingQueue)的实现原理

    "详解Java阻塞队列(BlockingQueue)的实现原理" Java阻塞队列(BlockingQueue)是Java.util.concurrent包下重要的数据结构,提供了线程安全的队列访问方式。BlockingQueue的实现原理主要是基于四组不同的方法用于...

    java阻塞队列实现原理及实例解析.docx

    在Java中,自Java 5.0起,`java.util.concurrent`包提供了多种阻塞队列的实现,例如`ArrayBlockingQueue`、`LinkedBlockingQueue`、`PriorityBlockingQueue`等。这些类都实现了`java.util.concurrent.BlockingQueue`...

    java阻塞队列实现原理及实例解析

    Java阻塞队列实现原理及实例解析 Java阻塞队列是一种特殊的队列,它能够在队列为空或满时阻塞线程,使得线程之间能够更好地协作和通信。阻塞队列的实现原理是基于锁机制和条件变量机制的,通过wait和notify方法来...

    消息分发框架(基于JAVA阻塞队列实现、 生产者消费者模型)

    基于Java的实现通常会利用阻塞队列(BlockingQueue)和生产者消费者模型来确保线程安全和高效率。在这个框架中,生产者负责生成任务或消息,而消费者则负责处理这些任务或消息。 ### Java 阻塞队列 Java阻塞队列是...

    支持多线程和泛型的阻塞队列

    在Java中,`java.util.concurrent`包提供了多种阻塞队列实现,如`ArrayBlockingQueue`, `LinkedBlockingQueue`等。它们都实现了`BlockingQueue`接口,提供了一套线程安全的方法来添加和移除元素,如`put()`, `take()...

    java并发工具包 java.util.concurrent中文版用户指南pdf

    2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 SynchronousQueue 8. 阻塞...

    java并发工具包详解

    2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 Synchronou sQueue 8. ...

    BlockingQueue队列自定义超时时间取消线程池任务

    首先,`BlockingQueue`是一个并发容器,它遵循先进先出(FIFO)原则,具有阻塞性质,当队列满时,生产者线程会被阻塞,直到有消费者取走元素;当队列空时,消费者线程会被阻塞,直到生产者放入新的元素。常用实现如`...

    并发-线程池和阻塞队列

    阻塞队列(BlockingQueue)是Java并发包(java.util.concurrent)中的一个重要数据结构,它实现了队列的特性,同时具备线程安全的特性。当队列满时,添加元素的线程会被阻塞,直到队列有空位;当队列为空时,取出...

Global site tag (gtag.js) - Google Analytics