`

并发集合 Queue

阅读更多

ArrayBlockingQueue

内部结构

 

private final E[] items;/** The queued items */


private int takeIndex;/** items index for next take, poll or remove */


private int putIndex;/** items index for next put, offer, or add. */


private int count;/** Number of items in the queue */
 

offer 在队 列满了的时候,直接返回false,put 是block住,直到有空位置。

 

	public boolean offer(E e) {


		if (e == null)


			throw new NullPointerException();


		final ReentrantLock lock = this.lock;


		lock.lock();


		try {


			if (count == items.length)


				return false;


			else {


				insert(e);


				return true;


			}


		} finally {


			lock.unlock();


		}


	}
	public void put(E e) throws InterruptedException {


		if (e == null)


			throw new NullPointerException();


		final E[] items = this.items;


		final ReentrantLock lock = this.lock;


		lock.lockInterruptibly();


		try {


			try {


				while (count == items.length)


					notFull.await();


			} catch (InterruptedException ie) {


				notFull.signal(); // propagate to non-interrupted thread


				throw ie;


			}


			insert(e);


		} finally {


			lock.unlock();


		}


	}
	private E extract() {


		final E[] items = this.items;


		E x = items[takeIndex];


		items[takeIndex] = null;


		takeIndex = inc(takeIndex);


		--count;


		notFull.signal();


		return x;


	}
 

同理,take 是block的,poll 不是。

 

public E poll() {


		final ReentrantLock lock = this.lock;


		lock.lock();


		try {


			if (count == 0)


				return null;


			E x = extract();


			return x;


		} finally {


			lock.unlock();


		}


	}
	public E take() throws InterruptedException {


		final ReentrantLock lock = this.lock;


		lock.lockInterruptibly();


		try {


			try {


				while (count == 0)


					notEmpty.await();


			} catch (InterruptedException ie) {


				notEmpty.signal(); // propagate to non-interrupted thread


				throw ie;


			}


			E x = extract();


			return x;


		} finally {


			lock.unlock();


		}


	}
	












private void insert(E x) {


		items[putIndex] = x;


		putIndex = inc(putIndex);


		++count;


		notEmpty.signal();


	}
 

两个超时参数的方法

 

public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException
public E poll(long timeout, TimeUnit unit) throws InterruptedException

生产者-消费者

 

public class Producer_Consumer_Pattern {


	public static void main(String[] args) {


		BlockingQueue<String> q = new ArrayBlockingQueue<String>(10);


		Producer p = new Producer(q);


		Consumer c1 = new Consumer(q);


		Consumer c2 = new Consumer(q);


		new Thread(p).start();


		new Thread(c1).start();


		new Thread(c2).start();


	}


}





class Producer implements Runnable {


	private final BlockingQueue<String> queue;





	Producer(BlockingQueue<String> q) {


		queue = q;


	}





	public void run() {


		try {


			while (true) {


				queue.put(produce());


			}


		} catch (InterruptedException ex) {





		}


	}





	String produce() {


		return "1";


	}


}





class Consumer implements Runnable {


	private final BlockingQueue<String> queue;





	Consumer(BlockingQueue<String> q) {


		queue = q;


	}





	public void run() {


		try {


			while (true) {


				consume(queue.take());


			}


		} catch (InterruptedException ex) {


		}


	}





	void consume(String x) {


		System.out.println(Thread.currentThread().getName() + " " + x);


	}


}

 

LinkedBlockingQueue

内部是一个单项链表

static class Node<E> {


	E item;


	Node<E> next;


	Node(E x) {


		item = x;


	}


}





private transient Node<E> head;	/** Head of linked list */


private transient Node<E> last;	/** Tail of linked list */


 
private final int capacity;	/** The capacity bound, or Integer.MAX_VALUE if none */


private final AtomicInteger count = new AtomicInteger(0);	/** Current number of elements */





private final ReentrantLock takeLock = new ReentrantLock();	/** Lock held by take, poll, etc */


private final ReentrantLock putLock = new ReentrantLock();	/** Lock held by put, offer, etc */





private final Condition notEmpty = takeLock.newCondition();	/** Wait queue for waiting takes */


private final Condition notFull = putLock.newCondition();	/** Wait queue for waiting puts */

 

入队

private void enqueue(E x) {


	last = last.next = new Node<E>(x);


}
 

出队

private E dequeue() {


	// assert takeLock.isHeldByCurrentThread();


	Node<E> h = head;


	Node<E> first = h.next;


	h.next = h; // help GC


	head = first;


	E x = first.item;


	first.item = null;


	return x;


}


删除,需要将两把锁都锁好

public boolean remove(Object o) {


	if (o == null)


		return false;


	fullyLock();


	try {


		for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) {


			if (o.equals(p.item)) {


				unlink(p, trail);


				return true;


			}


		}


		return false;


	} finally {


		fullyUnlock();


	}


}

 

PriorityBlockingQueue

是对 java.util.PriorityQueue<E> 的封装。加入了锁机制。

private final PriorityQueue<E> q;


private final ReentrantLock lock = new ReentrantLock(true);


private final Condition notEmpty = lock.newCondition();

 

public E take() throws InterruptedException {


	final ReentrantLock lock = this.lock;


	lock.lockInterruptibly();


	try {


		try {


			while (q.size() == 0)


				notEmpty.await();


		} catch (InterruptedException ie) {


			notEmpty.signal(); // propagate to non-interrupted thread


			throw ie;


		}


		E x = q.poll();


		assert x != null;


		return x;


	} finally {


		lock.unlock();


	}


}
 
public boolean offer(E e) {


	final ReentrantLock lock = this.lock;


	lock.lock();


	try {


		boolean ok = q.offer(e);


		assert ok;


		notEmpty.signal();


		return true;


	} finally {


		lock.unlock();


	}


}

 

入队时排序

private void siftUp(int k, E x) {


	if (comparator != null)


		siftUpUsingComparator(k, x);


	else


		siftUpComparable(k, x);


}
 
    private void siftUpComparable(int k, E x) {


        Comparable<? super E> key = (Comparable<? super E>) x;


        while (k > 0) {


            int parent = (k - 1) >>> 1;


            Object e = queue[parent];


            if (key.compareTo((E) e) >= 0)


                break;


            queue[k] = e;


            k = parent;


        }


        queue[k] = key;


    }





    private void siftUpUsingComparator(int k, E x) {


        while (k > 0) {


            int parent = (k - 1) >>> 1;


            Object e = queue[parent];


            if (comparator.compare(x, (E) e) >= 0)


                break;


            queue[k] = e;


            k = parent;


        }


        queue[k] = x;


    }

 

SynchrousQueue

内部的主角是一个队列或者堆栈,两者都是Transferer的子类

private transient volatile Transferer transferer;

一个阻塞队列在每次的插入操作中必须等等另一线程执行对应的删除线程,反之亦然。同步队列并没有任何内部的存储空间。

 

只有当有线程正在等待消费某个元素时,SynchronousQueue才会允许将该元素插入到队列中。
就实践方式来看,SynchronousQueue类似于Ada或CSP等语言中的"交会通道(Rendezvous Channel)"。在其它环境中,有时候被称为"连接"。

 

DelayQueue
DelayQueue = BlockingQueue + PriorityQueue + Delayed
DelayQueue 是一个使用优先队列( PriorityQueue )实现的 BlockingQueue ,优先队列的比较基准值是时间。

 

LinkedBlockingDeque [deck]

双向阻塞队列

	/** Doubly-linked list node class */
	static final class Node<E> {
		E item;
		Node<E> prev;
		Node<E> next;

		Node(E x, Node<E> p, Node<E> n) {
			item = x;
			prev = p;
			next = n;
		}
	}
 

 

private boolean linkFirst(E e) {
	// assert lock.isHeldByCurrentThread();
	if (count >= capacity)
		return false;
	Node<E> f = first;
	Node<E> x = new Node<E>(e, null, f);
	first = x;
	if (last == null)
		last = x;
	else
		f.prev = x;
	++count;
	notEmpty.signal();
	return true;
}

private boolean linkLast(E e) {
	// assert lock.isHeldByCurrentThread();
	if (count >= capacity)
		return false;
	Node<E> l = last;
	Node<E> x = new Node<E>(e, l, null);
	last = x;
	if (first == null)
		first = x;
	else
		l.next = x;
	++count;
	notEmpty.signal();
	return true;
}

private E unlinkFirst() {
	// assert lock.isHeldByCurrentThread();
	Node<E> f = first;
	if (f == null)
		return null;
	Node<E> n = f.next;
	E item = f.item;
	f.item = null;
	f.next = f; // help GC
	first = n;
	if (n == null)
		last = null;
	else
		n.prev = null;
	--count;
	notFull.signal();
	return item;
}

private E unlinkLast() {
	// assert lock.isHeldByCurrentThread();
	Node<E> l = last;
	if (l == null)
		return null;
	Node<E> p = l.prev;
	E item = l.item;
	l.item = null;
	l.prev = l; // help GC

	last = p;
	if (p == null)
		first = null;
	else
		p.next = null;
	--count;
	notFull.signal();
	return item;
}

void unlink(Node<E> x) {
	// assert lock.isHeldByCurrentThread();
	Node<E> p = x.prev;
	Node<E> n = x.next;
	if (p == null) {
		unlinkFirst();
	} else if (n == null) {
		unlinkLast();
	} else {
		p.next = n;
		n.prev = p;
		x.item = null;
		// Don't mess with x's links. They may still be in use by
		// an iterator.
		--count;
		notFull.signal();
	}
}

 

分享到:
评论

相关推荐

    动态加载DLL,并发集合

    Windows环境下,Microsoft的.NET Framework提供了一系列并发集合,如ConcurrentDictionary、ConcurrentQueue、ConcurrentStack等。这些类提供了线程安全的插入、删除和查询操作,避免了传统线程同步(如Mutex、...

    C#并发集合的简单方法.pdf

    除了ConcurrentQueue之外,C#并发集合还包括其他几种类型: 3. **ConcurrentStack**:并发栈,一种后进先出(LIFO)的数据结构,与ConcurrentQueue类似,但操作顺序相反。 4. **ConcurrentDictionary, TValue&gt;**:...

    c#官方线程安全集合源码

    让我们深入探讨其中几个关键的线程安全集合:`ConcurrentBag&lt;T&gt;`, `ConcurrentQueue&lt;T&gt;`, `ConcurrentStack&lt;T&gt;`, 和 `ConcurrentDictionary, TValue&gt;`。 1. **ConcurrentBag** `ConcurrentBag&lt;T&gt;` 是一个线程安全...

    并发容器的原理,7大并发容器详解、及使用场景

    而如果需要一个有顺序的并发集合,ConcurrentSkipListMap 或 ConcurrentSkipListSet 可能满足需求;对于需要阻塞操作的队列,ArrayBlockingQueue、LinkedBlockingQueue 和 PriorityBlockingQueue 提供了不同的选项。...

    java集合框架全面进阶.pdf

    6. **fail-fast机制**:这是集合框架中处理并发修改的一种机制。当集合在迭代过程中被修改,并且检测到这种修改时,迭代器会立即抛出ConcurrentModificationException异常,以快速失败的方式阻止继续遍历。这是为了...

    java 自定义Queue队列

    在Java编程语言中,`Queue`接口是集合框架的一部分,它代表了先进先出(FIFO)的数据结构,也就是我们通常所说的队列。队列是一种非常基础且实用的数据结构,广泛应用于多线程同步、任务调度、缓存管理等多个场景。...

    测试并发小工具

    5. **并发集合(Concurrent Collections)**:为了解决多线程访问集合时可能出现的问题,.NET Framework提供了一组并发安全的集合类,如`ConcurrentBag`、`ConcurrentDictionary`、`ConcurrentQueue`和`...

    c# queue 队列例子

    5. **性能优化**:根据需求,你可能要考虑队列的大小限制,避免内存过度消耗,或者使用`ConcurrentQueue`等线程安全的集合,以提高并发性能。 在“WindowsApplication3”项目中,这个示例可能包含了一个简单的UI,...

    swift-Concurrent-函数式并发原语的集合

    "swift-Concurrent-函数式并发原语的集合" 主题着重于如何利用Swift语言的特性来实现高效的并行处理。这里我们将深入探讨Swift中的并发编程概念、函数式并发原语以及如何使用它们来优化代码性能。 首先,Swift 支持...

    高并发回应服务器

    首先,Boost是一个开源的C++库集合,它为C++标准库提供了大量的扩展。Boost库包含了许多实用的功能,如智能指针、线程管理、文件系统操作、正则表达式、序列化等,极大地丰富了C++的生态系统。在高并发服务器的开发...

    mapdb,mapdb提供由磁盘存储或堆外内存支持的并发映射、集合和队列。它是一种快速、易于使用的嵌入式Java数据库引擎。.zip

    3. **多种数据结构**:MapDB 提供了多种数据结构,如BTreeMap(支持排序和范围查询)、HTreeMap(适合内存中的高速查找)、HashTreeMap(适用于大容量数据的存储)以及Queue、Set、List等集合类。 4. **事务处理**...

    一个讲解很清晰的Java集合框架PPT

    此外,还可能讲解到泛型、迭代器、并发集合(如ConcurrentHashMap)等内容,这些都是Java集合框架中的重要知识点。 对于初学者来说,了解这些接口的基本操作,如add、remove、contains等,以及它们的区别,是掌握...

    java中关于集合的操作

    - 在多线程环境下考虑使用并发集合。 - 了解并熟练运用Java 8的流API,提升代码简洁性和性能。 以上内容涵盖了Java中集合操作的基本概念和常见用法,对于初学者来说,理解和掌握这些知识点是学习Java编程的基础。...

    java资料各种集合

    Java提供了ConcurrentHashMap、CopyOnWriteArrayList、CopyOnWriteArraySet等并发集合,它们在多线程环境下保证了线程安全,提高了并发性能。 7. **泛型** 泛型是Java集合框架的重要特性,可以限制集合中存储的...

    集合框架学习笔记

    最后,Java集合框架的并发处理也是重要一环。ConcurrentHashMap和CopyOnWriteArrayList等线程安全的集合类,在多线程环境下保证数据一致性,提升了性能。 总的来说,Java集合框架是一个强大的工具箱,为开发者提供...

    JAVA基础集合代码

    在编写代码时,要考虑到集合的线程安全性,必要时使用synchronized关键字或者并发集合如ConcurrentHashMap和CopyOnWriteArrayList。 总的来说,Java集合框架提供了丰富的选择,满足了各种数据存储和操作的需求。...

    Go-delay-queue基于Redis实现的延迟队列

    通过分析`ouqiang-delay-queue-35d75ae`这个压缩包中的源代码,我们可以深入理解上述概念如何转化为实际的代码实现。源码可能包含了任务的创建、调度、消费等接口,以及与Redis交互的具体实现。同时,它可能还包含了...

    C#集合的概念

    在处理大量数据或并发访问时,线程安全的集合则不可或缺。 总的来说,理解C#集合的概念及其与泛型的关系,有助于编写更高效、更安全的代码,提高软件的可维护性和性能。无论是开发简单的应用程序还是复杂的系统,...

Global site tag (gtag.js) - Google Analytics