`

java.util.concurrent 之LinkedBlockingQueue源码分析

阅读更多

   LinkedBlockingQueue类

一个基于已链接节点的、范围任意的 blocking queue。此队列按 FIFO(先进先出)排序元素。队列的头部 是在队列中时间最长的元素。队列的尾部 是在队列中时间最短的元素。新元素插入到队列的尾部,并且队列获取操作会获得位于队列头部的元素。链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。 

可选的容量范围构造方法参数作为防止队列过度扩展的一种方法。如果未指定容量,则它等于 Integer.MAX_VALUE。除非插入节点会使队列超出容量,否则每次插入后会动态地创建链接节点。 

 

   这个类的常见用法,应该用过的都应该知道了,就不再举例了,直接进入源码的分析。在分析其他方法前,先看下类变量

	/** The capacity bound, or Integer.MAX_VALUE if none */
	private final int capacity;

	/** Current number of elements */
	private final AtomicInteger count = new AtomicInteger(0);

	/** Head of linked list */
	private transient Node<E> head;

	/** Tail of linked list */
	private transient Node<E> last;

	/** Lock held by take, poll, etc */
	private final ReentrantLock takeLock = new ReentrantLock();

	/** Wait queue for waiting takes */
	private final Condition notEmpty = takeLock.newCondition();

	/** Lock held by put, offer, etc */
	private final ReentrantLock putLock = new ReentrantLock();

	/** Wait queue for waiting puts */
	private final Condition notFull = putLock.newCondition();

 

其中有两个锁和两个条件变量是最重要的,决定了下面的方法实现。。阻塞的效果是靠这些来实现的。具体看下面的方法分析。

 

new 创建对象

	/**
	 * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
	 * {@link Integer#MAX_VALUE}. 从类名可以看出它主要是用链表来保存数据的。。下面的分析也可以看出来确实如此
	 * 
	 */
	public LinkedBlockingQueue() {
		// 这里可以看出这个类是有容量限制的,默认是最大容量 int.max
		this(Integer.MAX_VALUE);
	}

	public LinkedBlockingQueue(int capacity) {
		if (capacity <= 0)
			throw new IllegalArgumentException();
		this.capacity = capacity;
		// 这里可以看出链表的头和尾默认都是null节点
		last = head = new Node<E>(null);
	}

	/**
	 * Linked list node class 这里是链表的节点实现了
	 */
	static class Node<E> {
		/** The item, volatile to ensure barrier separating write and read */
		// 这里的volatile变量确保了多线程中内存的一致性
		volatile E item;
		Node<E> next;

		Node(E x) {
			item = x;
		}
	}

 

put方法

	public void put(E e) throws InterruptedException {
		if (e == null)
			throw new NullPointerException();
		// Note: convention in all put/take/etc is to preset
		// local var holding count negative to indicate failure unless set.
		int c = -1;
		final ReentrantLock putLock = this.putLock;
		final AtomicInteger count = this.count;
		// put 加锁
		putLock.lockInterruptibly();
		try {
			/*
			 * Note that count is used in wait guard even though it is not
			 * protected by lock. This works because count can only decrease at
			 * this point (all other puts are shut out by lock), and we (or some
			 * other waiting put) are signalled if it ever changes from
			 * capacity. Similarly for all other uses of count in other wait
			 * guards.
			 */
			try {
				// 判断是否满了,如果满了则notFull 线程等待
				while (count.get() == capacity)
					notFull.await();
			} catch (InterruptedException ie) {
				notFull.signal(); // propagate to a non-interrupted thread
				throw ie;
			}
			// 插入链表
			insert(e);
			// 更新容量
			c = count.getAndIncrement();
			// 没满则唤醒notFull线程
			if (c + 1 < capacity)
				notFull.signal();
		} finally {
			putLock.unlock();
		}
		// 当c==0代表容量刚从空转为非空状态,则唤醒非空线程
		if (c == 0)
			signalNotEmpty();
	}

	/**
	 * Signals a waiting take. Called only from put/offer (which do not
	 * otherwise ordinarily lock takeLock.)
	 */
	private void signalNotEmpty() {
		final ReentrantLock takeLock = this.takeLock;
		takeLock.lock();
		try {
			// 唤醒非空线程
			notEmpty.signal();
		} finally {
			takeLock.unlock();
		}
	}

 

take 方法

	// 这里的加锁方式刚好和put 方法相反。就不多说了。
	public E take() throws InterruptedException {
		E x;
		int c = -1;
		final AtomicInteger count = this.count;
		final ReentrantLock takeLock = this.takeLock;
		takeLock.lockInterruptibly();
		try {
			try {
				while (count.get() == 0)
					notEmpty.await();
			} catch (InterruptedException ie) {
				notEmpty.signal(); // propagate to a non-interrupted thread
				throw ie;
			}

			x = extract();
			c = count.getAndDecrement();
			if (c > 1)
				notEmpty.signal();
		} finally {
			takeLock.unlock();
		}
		if (c == capacity)
			signalNotFull();
		return x;
	}

 

poll 方法

// 这个其实和poll()无参数的方法类似,只是多了for循环的一个计数的功能。poll()就不分析了。
	public E poll(long timeout, TimeUnit unit) throws InterruptedException {
		E x = null;
		int c = -1;
		long nanos = unit.toNanos(timeout);
		final AtomicInteger count = this.count;
		final ReentrantLock takeLock = this.takeLock;
		takeLock.lockInterruptibly();
		try {
			for (;;) {// 这个是为了计时而做的循环
				if (count.get() > 0) {
					x = extract();
					c = count.getAndDecrement();
					if (c > 1)
						notEmpty.signal();
					break;
				}
				if (nanos <= 0)
					return null;
				try {
					// awaitNanos 方法返回的是nanosTimeout
					// 值减去花费在等待此方法的返回结果的时间的估算,相当于就是剩余时间了。
					nanos = notEmpty.awaitNanos(nanos);
				} catch (InterruptedException ie) {
					notEmpty.signal(); // propagate to a non-interrupted thread
					throw ie;
				}
			}
		} finally {
			takeLock.unlock();
		}
		if (c == capacity)
			signalNotFull();
		return x;
	}

 

offer方法

// 这里和poll(long timeout, TimeUnit unit)方法的锁的实现相反。。也不多说了。
	public boolean offer(E e, long timeout, TimeUnit unit)
			throws InterruptedException {

		if (e == null)
			throw new NullPointerException();
		long nanos = unit.toNanos(timeout);
		int c = -1;
		final ReentrantLock putLock = this.putLock;
		final AtomicInteger count = this.count;
		putLock.lockInterruptibly();
		try {
			for (;;) {
				if (count.get() < capacity) {
					insert(e);
					c = count.getAndIncrement();
					if (c + 1 < capacity)
						notFull.signal();
					break;
				}
				if (nanos <= 0)
					return false;
				try {
					nanos = notFull.awaitNanos(nanos);
				} catch (InterruptedException ie) {
					notFull.signal(); // propagate to a non-interrupted thread
					throw ie;
				}
			}
		} finally {
			putLock.unlock();
		}
		if (c == 0)
			signalNotEmpty();
		return true;
	}

 

可以看出LinkedBlockingQueue 类的实现比较简单,也很容易理解,灵活应用了条件变量(Condition),减少了锁的竞争。。   by zhxing

0
1
分享到:
评论

相关推荐

    java并发工具包 java.util.concurrent中文版用户指南pdf

    1. java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...

    java.util.concurrent

    java.util.concurrent总体概览图。 收取资源分3分。需要的同学可以下载一下。 java.util.concurrent主要包括5个部分executor,colletions,locks,atomic,tools。 该图详细的列举了并发包下面的结构,包含所有接口和...

    java并发工具包 java.util.concurrent中文版pdf

    ### Java并发工具包 `java.util.concurrent` 知识点详解 #### 一、引言 随着多核处理器的普及和应用程序复杂度的增加,多线程编程成为了现代软件开发不可或缺的一部分。为了简化并发编程的复杂性,Java 5 引入了 `...

    Java并发工具包java.util.concurrent用户指南中英文对照阅读版.pdf

    java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...

    Java并发工具包java.util.concurrent用户指南中英文对照阅读版

    本资源包含两个 pdf 文档,一本根据 Jakob Jenkov 最新博客 (http://tutorials.jenkov.com/java-util-concurrent/index.html) 整理的 java_util_concurrent_user_guide_en.pdf,一个中文翻译的 java_util_concurrent...

    java.util.concurrent 实现线程池队列

    `java.util.concurrent` 包(简称JUC)是Java提供的一个强大的并发工具包,它提供了丰富的并发组件,如线程池、并发容器、锁和同步机制等,极大地简化了并发编程的复杂性。本篇文章将深入探讨如何使用`java.util....

    java并发工具包 java.util.concurrent中文版-带书签版

    Java并发工具包(java.util.concurrent)是Java平台上用于高效、安全地处理多线程编程的重要组件。这个包包含了丰富的并发工具类,旨在帮助开发者构建高度并发的程序,提高程序的性能和可伸缩性。本资源是该工具包的...

    Java并发工具包java.util.concurrent用户指南中英文对照阅读版pdf

    Java并发工具包java.util.concurrent是Java平台在Java 5版本中引入的一组新的并发编程类库,旨在帮助Java开发者更容易地实现复杂的并发程序。这一包的出现,极大地简化了开发者在处理线程和数据同步时所遇到的难题,...

    关于 java.util.concurrent 您不知道的 5 件事,第 2 部分

    在Java编程领域,`java.util.concurrent`包是并发编程的核心工具包,提供了高效、线程安全的类和接口,使得开发者能够更容易地处理多线程环境。本篇将深入探讨这个包中一些鲜为人知的知识点,以帮助你提升并发编程的...

    java.util源码-java-util:javautil源代码

    11. **队列和并发队列**:`java.util.Queue`接口及其实现如ArrayDeque、LinkedList(作为Queue的实现),以及`java.util.concurrent`包下的并发队列如LinkedBlockingQueue、ConcurrentLinkedQueue等,提供高效的数据...

    java_util_concurrent_user_guide

    `java.util.concurrent`(JUC)包是Java标准库提供的一组强大的并发工具,它为开发者提供了丰富的类和接口,帮助简化并发编程。本用户指南将深入探讨这个包中的核心概念和主要类。 1. **线程池**: `java.util....

    工作在同一个java虚拟机中的线程能实现消息互发(alpha)

    2. **线程间的阻塞队列**:如`java.util.concurrent`包下的`BlockingQueue`接口及其实现类,例如`ArrayBlockingQueue`、`LinkedBlockingQueue`等,线程可以通过`put()`和`take()`方法进行消息传递。 3. **信号量...

    concurrent 源代码

    3. **同步容器**: `java.util.concurrent`包中的`BlockingQueue`接口及其实现(如`ArrayBlockingQueue`, `LinkedBlockingQueue`)是线程安全的数据结构,适用于生产者-消费者模型。此外,还有`ConcurrentHashMap`,...

    编写程序,使用两个线程,一个队列,其中一个线程从键盘读取数据,放入到队列中,直到读取的数据是字符串quit则结束

    1. 编写程序,使用两个线程,一个队列, 其中一个线程从键盘读取数据,放入到队列中,直到读取的数据是字符串quit则结束,线程的任务就是循环读取数据直到... (b) 必须使用java.util.concurrent.LinkedBlockingQueue.

    java.concurrent包的应用

    Java标准库中的`java.util.concurrent`包提供了线程池的实现,主要由`ExecutorService`接口和`ThreadPoolExecutor`类组成。`ExecutorService`定义了执行任务的接口,而`ThreadPoolExecutor`则是具体的线程池实现,...

    java concurrent 精简源码

    这个“java concurrent 精简源码”资源很可能会包含上述概念的实际应用示例,通过学习和分析这些代码,你可以深入理解Java并发编程的精髓,并能更好地应用于实际项目中。在研究时,建议结合Java官方文档和相关的书籍...

    spider_java.rar_Java spider

    import java.util.concurrent.LinkedBlockingQueue; public class SimpleSpider { private Queue&lt;String&gt; urls = new LinkedBlockingQueue(); private Set&lt;String&gt; visited = new HashSet(); public void start...

    java的concurrent用法详解

    `Executor`框架是`java.util.concurrent`的核心组件之一,它为任务的执行提供了一个统一的接口。其中最重要的接口是`ExecutorService`,它定义了线程池的行为,使得我们可以将任务(`Runnable`或`Callable`对象)...

    Java数据结构实现之Queue.zip

    5. **LinkedBlockingQueue**:`java.util.concurrent.LinkedBlockingQueue` 是一个线程安全的有界队列,基于链接节点实现。它的大小可以指定,不指定则默认为Integer.MAX_VALUE。插入和删除操作的时间复杂度为O(1),...

    LinkedBlockingQueuejava.pdf

    Java中的`LinkedBlockingQueue`是`java.util.concurrent`包下的一种线程安全的阻塞队列,它是基于链表结构实现的,具有很好的性能表现。这种队列在多线程环境下的并发操作中被广泛使用,因为它允许生产者线程向队列...

Global site tag (gtag.js) - Google Analytics