`
g21121
  • 浏览: 694578 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Java并发之LinkedBlockingQueue

 
阅读更多

        上一篇我们已经学习过了 ArrayBlockingQueue的知识及相关方法的使用,这一篇我们就来再学习一下ArrayBlockingQueue的亲戚 LinkedBlockingQueue。在集合类中 ArrayList与 LinkedList会常常拿来比较,ArrayList内部实现是基于数组的,而 LinkedList内部实现是基于链表,所以他们之间会有很多不同,但是本文不会去重点讨论,感兴趣的朋友可以参考我之前发过的几篇文章,那么有请本节的主角 LinkedBlockingQueue!

        LinkedBlockingQueue

        LinkedBlockingQueue是一个一个基于已链接节点的、范围任意(相对而论)的 blocking queue。此队列按 FIFO(先进先出)排序元素。队列的头部 是在队列中时间最长的元素。队列的尾部 是在队列中时间最短的元素。新元素插入到队列的尾部,并且队列获取操作会获得位于队列头部的元素。链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。 

        可选的容量范围构造方法参数作为防止队列过度扩展的一种方法。如果未指定容量,则它等于 Integer.MAX_VALUE。除非插入节点会使队列超出容量,否则每次插入后会动态地创建链接节点。 

        LinkedBlockingQueue及其迭代器实现了 Collection 和 Iterator 接口的所有可选 方法。 

        我们已经学习过了 ArrayBlockingQueue,所以学习 LinkedBlockingQueue就自然比较轻松,所以本文对于已经明确的相关概念就不做过多介绍了,而是重点放在两者的区别之上。

 

        1.成员变量

        与ArrayBlockingQueue不同 LinkedBlockingQueue的成员变量有些变化,以下是 LinkedBlockingQueue的成员变量:

/** 容量范围,默认值为 Integer.MAX_VALUE */
private final int capacity;

/** 当前队列中元素数量 */
private final AtomicInteger count = new AtomicInteger(0);

/** 头节点 */
private transient Node<E> head;

/** 尾节点 */
private transient Node<E> last;

/** take, poll等方法的锁 */
private final ReentrantLock takeLock = new ReentrantLock();

/** 获取队列的 Condition(条件)实例 */
private final Condition notEmpty = takeLock.newCondition();

/** put, offer等方法的锁 */
private final ReentrantLock putLock = new ReentrantLock();

/** 插入队列的 Condition(条件)实例 */
private final Condition notFull = putLock.newCondition();

        1)首先 LinkedBlockingQueue明确了容量变量,当为指定容量时,默认容量为Int的最大值Integer.MAX_VALUE。

        2)队列元素数量变量 count采用的是 AtomicInteger ,而不是普通的Int型。CAS相关可参考http://286.iteye.com/blog/2295165

        3)LinkedBlockingQueue内部队列实现使用的是 Node节点类,这与 LinkedList类似。

        4)最后也是最重要的一点,那就是获取与插入操作分成了两个锁:takeLock与 putLock来处理,这点下面还会重点分析。

 

        2.构造方法

        有三个构造方法,分别为默认,指定容量,指定容量和初始元素。

/**
 * 创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingQueue
 */
public LinkedBlockingQueue() {
	this(Integer.MAX_VALUE);
}

/**
 * 创建一个具有给定(固定)容量的 LinkedBlockingQueue
 */
public LinkedBlockingQueue(int capacity) {
	if (capacity <= 0)
		throw new IllegalArgumentException();
	this.capacity = capacity;
	last = head = new Node<E>(null);
}

/**
 * 创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingQueue,
 * 最初包含给定 collection 的元素,元素按该 collection 迭代器的遍历顺序添加。
 */
public LinkedBlockingQueue(Collection<? extends E> c) {
	this(Integer.MAX_VALUE);
	for (E e : c)
		add(e);
}

        默认构造方法创建一个容量为 Integer.MAX_VALUE的 LinkedBlockingQueue实例。

        第二种构造方法,指定了队列容量,首先判断指定容量是否大于零,否则抛出异常。然后为 capacity 赋值,最后创建空节点,并指向 head与 last,两者的 item与 next此时均为 null。

 

 

        最后一种,利用循环向队列中添加指定集合中的元素。

 

        3.Node类

        LinkedBlockingQueue内部列表实现是使用的 Node内部类,Node类也并不复杂,以下是其源代码:

/**
 * 节点类
 */
static class Node<E> {
	/** volatile保障读写分离 */
	volatile E item;
	Node<E> next;

	Node(E x) {
		item = x;
	}
}

        item用于表示元素对象,next指向链表的下一个节点。

 



        LinkedBlockingQueue的大部分方法其实是与  ArrayBlockingQueue类似的,所以本文就只介绍不同于ArrayBlockingQueue的相关方法。

 

        4.添加元素

        1)add方法

        add方法相同就不介绍了,同样调用的是offer方法。

        2)offer方法

        将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量),在成功时返回 true,如果此队列已满,则返回 false。当使用有容量限制的队列时,此方法通常要优于 add 方法,后者可能无法插入元素,而只是抛出一个异常。 

        与ArrayBlockingQueue不同,LinkedBlockingQueue多了一些容量方面的判断。

/**
 * 将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量)
 * 在成功时返回 true,如果此队列已满,则返回 false。
 * 当使用有容量限制的队列时,此方法通常要优于 add 方法,
 * 后者可能无法插入元素,而只是抛出一个异常。 
 */
public boolean offer(E e) {
	//判断添加元素是否为null
	if (e == null)
		throw new NullPointerException();
	//第一点不同,使用原子类操作count,因为有两个锁
	final AtomicInteger count = this.count;
	//判断容量,队列是否已满
	if (count.get() == capacity)
		return false;
	int c = -1;
	final ReentrantLock putLock = this.putLock;
	//获取添加锁
	putLock.lock();
	try {
		//再次判断,如果队列未满
		if (count.get() < capacity) {
			//插入元素
			insert(e);
			//增加元素数count
			c = count.getAndIncrement();
			if (c + 1 < capacity)
				//未满则唤醒添加线程
				notFull.signal();
		}
	} finally {
		//释放锁
		putLock.unlock();
	}
	//c等于0说明添加成功
	if (c == 0)
		//唤醒读取线程
		signalNotEmpty();
	return c >= 0;
}

        可以看到offer方法的关键在于 insert方法。

        3)insert方法

         insert方法非常简单,但是却不要小看。

/**
 * 再队尾添加元素
 */
private void insert(E x) {
	last = last.next = new Node<E>(x);
}

        首先,根据指定参数x创建一个Node实例。

        然后,将原尾节点的next指向此节点。

        最后,将尾节点设置尾此节点。

        这样新添加的节点就成为了新的尾节点。



 

        当向链表中添加第一个节点时,因为在初始化时

last = head = new Node<E>(null);

        所以此时 head与 last指向的是同一个对象new Node<E>(null)。

        之后将last.next指向x。

last.next = new Node<E>(x);

        因为此时 head与 last是同一个对象,所以 head.next也指向x。

        最后将 last指向x。

last =  new Node<E>(x);

        这样 head的next就指向了 last。此时head中的 item仍为 null。

 

        4)put方法

        将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用。

/**
 * 将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用
 */
public void put(E e) throws InterruptedException {
	//判断添加元素是否为null
	if (e == null)
		throw new NullPointerException();
	int c = -1;
	final ReentrantLock putLock = this.putLock;
	final AtomicInteger count = this.count;
	//获取插入的可中断锁
	putLock.lockInterruptibly();
	try {
		try {
			//判断队列是否已满
			while (count.get() == capacity)
				//如果已满则阻塞添加线程
				notFull.await();
		} catch (InterruptedException ie) {
			//失败就唤醒添加线程
			notFull.signal(); 
			throw ie;
		}
		//添加元素
		insert(e);
		//修改c值
		c = count.getAndIncrement();
		//根据c值判断队列是否已满
		if (c + 1 < capacity)
			//未满则唤醒添加线程
			notFull.signal();
	} finally {
		//释放锁
		putLock.unlock();
	}
	//c等于0代表添加成功
	if (c == 0)
		signalNotEmpty();
}

 

        5.获取元素

        1)peek方法

        peek方法获取但不移除此队列的头;如果此队列为空,则返回 null。

/**
 * 获取但不移除此队列的头;如果此队列为空,则返回 null
 */
public E peek() {
	//判断元素数是否为0
	if (count.get() == 0)
		return null;
	final ReentrantLock takeLock = this.takeLock;
	//获取获取锁
	takeLock.lock();
	try {
		//头节点的 next节点即为添加的第一个节点
		Node<E> first = head.next;
		//如果不为空则返回该节点
		if (first == null)
			return null;
		else
			return first.item;
	} finally {
		//释放锁
		takeLock.unlock();
	}
}

        peek方法从头节点直接就可以获取到第一个添加的元素,所以效率是比较高的。如果不存在则返回null。

        2)poll方法

        poll方法获取并移除此队列的头,如果此队列为空,则返回 null。

/**
 * 获取并移除此队列的头,如果此队列为空,则返回 null
 */
public E poll() {
	final AtomicInteger count = this.count;
	//判断元素数量
	if (count.get() == 0)
		return null;
	E x = null;
	int c = -1;
	final ReentrantLock takeLock = this.takeLock;
	//获取获取锁
	takeLock.lock();
	try {
		//再次判断元素数量
		if (count.get() > 0) {
			//调用extract方法获取第一个元素
			x = extract();
			//c=count++
			c = count.getAndDecrement();
			//如果队列中含有元素
			if (c > 1)
				//唤醒读取线程
				notEmpty.signal();
		}
	} finally {
		//释放锁
		takeLock.unlock();
	}
	//如果队列已满
	if (c == capacity)
		//唤醒等待中的添加线程
		signalNotFull();
	return x;
}

        poll与 peek方法不同在于poll获取完元素后移除这个元素,获取与移除是通过 extract()方法实现的。

        注意:其中需要注意的是最后部分代码:

//如果队列已满
if (c == capacity)
	//唤醒等待中的添加线程
	signalNotFull();

        肯定会有朋友有以下疑问:

        1)队列都已经满了,还需要唤醒添加线程干什么?

        2)线程满了就不应该再向里面添加元素了啊?

        3)signalNotFull方法是干什么的?

    signalNotFull方法的作用是唤醒等待中的put线程,signalNotFull只能被 take/poll方法调用,以下是 signalNotFull方法的源代码:

/**
 * 唤醒等待中的put线程,只能被 take/poll方法调用
 */
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    //获取锁
    putLock.lock();
    try {
    	//唤醒添加线程
        notFull.signal();
    } finally {
    	//释放锁
        putLock.unlock();
    }
}

      前两点问题其实转换一下角度就能很好的理解了,虽然队列已经满了,但是此时本线程已经完成了添加,但是其他线程还在等待获取条件进行添加,如果不去主动唤醒的话,那么这些添加操作就只能无限期的等待下去,所以这些等待的添加操作就会失效。所以此时需要唤醒已经排队的添加线程,虽然他们已经无法添加元素至队列。

 

        3)extract方法

        extract方法用于获取并移除头节点。

/**
 * 获取并移除头节点
 */
private E extract() {
	//获取第一个节点,即 head的下一个元素
	Node<E> first = head.next;
	//将head指向此元素
	head = first;
	//获取元素值
	E x = first.item;
	//清除first的item元素为空,即head元素的item为空
	first.item = null;
	//返回
	return x;
}

        这里需要注意的是这里指的头节点并不是 head,而是 head的 next所指 Node的 item元素。因为 head的 item永远为 null。last的 next永远为 null。

        4)take方法

        获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。

/**
 * 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)
 */
public E take() throws InterruptedException {
	E x;
	int c = -1;
	final AtomicInteger count = this.count;
	final ReentrantLock takeLock = this.takeLock;
	//获取可中断锁
	takeLock.lockInterruptibly();
	try {
		try {
			//判断队列是否含有元素
			while (count.get() == 0)
				//没有元素就阻塞获取线程,因为没有元素所以获取线程也就没有必要运行
				notEmpty.await();
		} catch (InterruptedException ie) {
			//失败就唤醒获取线程
			notEmpty.signal(); 
			throw ie;
		}
		//调用 extract方法获取元素
		x = extract();
		//计数c的新值
		c = count.getAndDecrement();
		//如果元素数大于1
		if (c > 1)
			//唤醒获取线程
			notEmpty.signal();
	} finally {
		//释放锁
		takeLock.unlock();
	}
	//如果队列已满
	if (c == capacity)
		//唤醒还在等待的put线程
		signalNotFull();
	return x;
}

        与 poll方法类似,只是take方法采用阻塞的方式来获取元素。

 

        7.其他方法

        1)remainingCapacity方法

/**
 * 返回理想情况下(没有内存和资源约束)此队列可接受并且不会被阻塞的附加元素数量
 */
public int remainingCapacity() {
    return capacity - count.get();
}

        也就是返回可以立即添加元素的数量。

 

        2)iterator方法

        iterator方法返回在队列中的元素上按适当顺序进行迭代的迭代器。返回的 Iterator 是一个“弱一致”的迭代器,从不抛出 ConcurrentModificationException,并且确保可遍历迭代器构造后所存在的所有元素,并且可能(但并不保证)反映构造后的所有修改。 

/**
 * 返回Itr实例
 */
public Iterator<E> iterator() {
    return new Itr();
}

        iterator方法返回的是一个Itr内部类的实例,通过这个实例可以遍历整个队列。以下是Itr内部类的源代码:

private class Itr implements Iterator<E> {
	//当前节点
	private Node<E> current;
	private Node<E> lastRet;
	//当前元素
	private E currentElement;

	Itr() {
		final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
		final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
		//获取获取与添加锁
		putLock.lock();
		takeLock.lock();
		try {
			current = head.next;
			if (current != null)
				currentElement = current.item;
		} finally {
			takeLock.unlock();
			putLock.unlock();
		}
	}

	public boolean hasNext() {
		return current != null;
	}

	public E next() {
		final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
		final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
		putLock.lock();
		takeLock.lock();
		try {
			if (current == null)
				throw new NoSuchElementException();
			E x = currentElement;
			lastRet = current;
			current = current.next;
			if (current != null)
				currentElement = current.item;
			return x;
		} finally {
			takeLock.unlock();
			putLock.unlock();
		}
	}

	public void remove() {
		if (lastRet == null)
			throw new IllegalStateException();
		final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
		final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
		putLock.lock();
		takeLock.lock();
		try {
			Node<E> node = lastRet;
			lastRet = null;
			Node<E> trail = head;
			Node<E> p = head.next;
			while (p != null && p != node) {
				trail = p;
				p = p.next;
			}
			if (p == node) {
				p.item = null;
				trail.next = p.next;
				if (last == p)
					last = trail;
				int c = count.getAndDecrement();
				if (c == capacity)
					notFull.signalAll();
			}
		} finally {
			takeLock.unlock();
			putLock.unlock();
		}
	}
}

         Itr类不复杂,我就不详细解释了。

        3)清除方法

        clear,drainTo等方法与 ArrayBlockingQueue类似,这里就不说了。

 

        8,.LinkedBlockingQueue与 ArrayBlockingQueue

        1)内部实现不同

        ArrayBlockingQueue内部队列存储使用的是数组:

private final E[] items;

        而 LinkedBlockingQueue内部队列存储使用的是Node节点内部类:

static class Node<E> {
    /** The item, volatile to ensure barrier separating write and read */
    volatile E item;
    Node<E> next;
    Node(E x) { item = x; }
}

 

        2)队列中锁的实现不同

/** LinkedBlockingQueue的获取锁 */
private final ReentrantLock takeLock = new ReentrantLock();

/** LinkedBlockingQueue的添加锁 */
private final ReentrantLock putLock = new ReentrantLock();


/** ArrayBlockingQueue的唯一锁 */
private final ReentrantLock lock;

        从源代码就可以看出 ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加与获取使用的是同一个锁;而 LinkedBlockingQueue实现的队列中的锁是分离的,即添加用的是 putLock,获取是 takeLock。

        3)初始化条件不同

        ArrayBlockingQueue实现的队列中必须指定队列的大小。

        LinkedBlockingQueue实现的队列中可以不指定队列的大小,默认容量为Integer.MAX_VALUE。

        4)操作不同

        ArrayBlockingQueue无论是添加还是获取使用的是同一个锁,所以添加的同时就不能读取,读取的同时就不能添加,所以锁方面性能不如 LinkedBlockingQueue。

        LinkedBlockingQueue读取与添加操作使用不同的锁,因为其内部实现的特殊性,添加的时候只需要修改 last即可,而不会影响 head节点。而获取时也只需要修改 head节点即可,同样不会影响 last节点。所以在添加获取方面理论上性能会高于 ArrayBlockingQueue。

        所以 LinkedBlockingQueue更适合实现生产者-消费者队列。

  • 大小: 5.8 KB
  • 大小: 5.4 KB
  • 大小: 6.1 KB
分享到:
评论

相关推荐

    详细分析Java并发集合LinkedBlockingQueue的用法

    详细分析Java并发集合LinkedBlockingQueue的用法 Java并发集合LinkedBlockingQueue是Java并发集合中的一种阻塞队列实现,使用链表方式实现的阻塞队列。LinkedBlockingQueue的实现主要包括链表实现、插入和删除节点...

    Java并发编程实践.pdf

    - **Executor框架**:是Java并发工具包中的核心组件之一,提供了强大的任务管理和执行功能,如定时任务、周期性任务等。 #### 二、Java线程安全问题及解决方案 ##### 2.1 线程安全问题分析 在多线程环境下,如果不...

    JAVA并发编程实践 中文 高清 带书签 完整版 Doug Lea .pdf

    根据提供的文件信息,“JAVA并发编程实践 中文 高清 带书签 完整版 Doug Lea .pdf”,我们可以推断出这份文档主要聚焦于Java并发编程的技术实践与理论探讨。下面将从多个角度来解析这个文档可能涵盖的关键知识点。 ...

    java并发编程实践

    ### Java并发编程实践知识点详解 #### 一、Java并发编程基础 ##### 1.1 并发与并行概念区分 在Java并发编程实践中,首先需要理解“并发”与“并行”的区别。“并发”指的是多个任务同时进行,但实际上可能是在多...

    Java并发编程事件 mobi kindle版

    4. **并发工具类**:深入剖析了Java并发工具类的实现原理,如ConcurrentHashMap、LinkedBlockingQueue、ConcurrentSkipListMap等,并提供了实际应用示例。 5. **线程池**:详细解析了Executor框架,包括线程池的...

    java并发工具包 java.util.concurrent中文版用户指南pdf

    1. java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...

    Java并发编程进阶练习代码

    本练习集专注于提升你在Java并发编程中的技巧,通过一系列逐步进阶的代码实例,帮助你掌握从基础到高级的并发概念。 首先,我们从“线程安全”开始。线程安全是指在多线程环境下,一个类或方法能够正确地处理并发...

    Java并发编程实践

    《Java并发编程实践》这本书深入探讨了Java平台上的并发编程技术,涵盖了从基础概念到高级策略的广泛主题。在Java编程中,并发处理是优化性能、提高系统资源利用率的关键手段,尤其是在多核处理器和分布式系统中更为...

    我的java并发PPT

    【标题】"我的java并发PPT"所涉及的知识点主要集中在Java编程语言的并发处理上,这是一门关键且复杂的领域,特别是在多线程环境下的程序设计。在Java中,并发处理是提升程序效率和充分利用多核处理器能力的重要手段...

    java并发工具包

    Java并发工具包是Java平台中用于处理多线程并行计算的重要组件,它包含在`java.util.concurrent`包中。这个工具包提供了多种高级并发工具,使得开发者能够编写出高效、安全、易于理解和维护的多线程代码。下面将详细...

    java并发集合

    在Java编程中,"java并发集合"是一个关键领域,它涉及到多线程环境下如何有效地管理和操作数据集合。Java提供了一系列的并发集合类,使得在并发环境中实现高效且线程安全的数据处理成为可能。这些集合主要存在于`...

    java并发工具包详解

    1. java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...

    JAVA并发编程实践

    《JAVA并发编程实践》这本书是Java并发编程领域的经典之作,由Brian Goetz、Tim Peierls、Joshua Bloch、Joseph Bowbeer、David Holmes和Doug Lea合著,旨在帮助Java开发者理解和掌握多线程环境下的编程技巧。...

    实战Java高并发程序设计第二版随书代码

    这本书涵盖了Java并发编程的核心概念和技术,旨在帮助开发者在实际项目中高效地处理高并发场景。随书附带的代码提供了丰富的示例,以便读者能够更直观地理解并实践这些理论知识。 1. **Java并发基础** - **线程与...

    java并发编程.docx

    Java并发编程是Java开发中的重要领域,涉及到多线程、线程池以及线程局部变量等概念。在大型系统和高并发环境下,合理地利用这些技术可以极大地提高系统的性能和资源利用率。 一、线程池 线程池是Java并发编程中的...

    java并发源码分析之实战编程

    "java并发源码分析之实战编程"这个主题深入探讨了Java平台上的并发处理机制,旨在帮助开发者理解并有效地利用这些机制来提高程序性能和可扩展性。在这个专题中,我们将围绕Java并发库、线程管理、锁机制、并发容器...

    Java并发工具包java.util.concurrent用户指南中英文对照阅读版.pdf

    java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...

    Java并发编程的艺术.zip

    《Java并发编程的艺术》这本书是Java开发者深入了解并发编程的重要参考资料。在Java开发中,尤其是在多核处理器和高并发场景下,理解和掌握并发编程是至关重要的。以下是对书中的主要知识点的详细阐述: 1. **Java...

    java多线程并发编程例子

    `TestBlockingQueue.java`可能包含如何使用`ArrayBlockingQueue`、`LinkedBlockingQueue`等实现线程间的通信和数据交换的示例。 3. **CompletionService**:`CompletionService`提供了一种获取异步任务结果的机制,...

Global site tag (gtag.js) - Google Analytics