/** The queued items */
private final E[] items;
/** items index for next take, poll or remove */
private int takeIndex;
/** items index for next put, offer, or add. */
private int putIndex;
/** Number of items in the queue */
private int count;
// NOTE(review): the methods below read and write these fields only between
// lock.lock()/lock.unlock(); the lock field itself is declared elsewhere.
offer 在队列满了的时候直接返回 false;put 则会阻塞(block),直到队列有空位置。
/**
 * Tries to add the element without waiting.
 *
 * @param e the element to add
 * @return {@code true} if the element was inserted, {@code false} if the
 *         backing array was already full
 * @throws NullPointerException if {@code e} is null
 */
public boolean offer(E e) {
    if (e == null) {
        throw new NullPointerException();
    }
    final ReentrantLock mutex = this.lock;
    mutex.lock();
    try {
        final boolean hasRoom = count != items.length;
        if (hasRoom) {
            insert(e);
        }
        return hasRoom;
    } finally {
        mutex.unlock();
    }
}
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == items.length) notFull.await(); } catch (InterruptedException ie) { notFull.signal(); // propagate to non-interrupted thread throw ie; } insert(e); } finally { lock.unlock(); } }
/**
 * Removes and returns the element at {@code takeIndex}, clears that slot,
 * advances {@code takeIndex}, decrements {@code count} and signals
 * {@code notFull}.
 *
 * @return the element that was stored at the take position
 */
private E extract() {
    final E[] buffer = this.items;
    final int slot = takeIndex;
    final E taken = buffer[slot];
    buffer[slot] = null;
    takeIndex = inc(slot);
    count--;
    notFull.signal();
    return taken;
}
同理,take 是阻塞(block)的,poll 不是:队列为空时 poll 直接返回 null。
/**
 * Removes and returns the next element without waiting.
 *
 * @return the extracted element, or {@code null} when {@code count} is zero
 */
public E poll() {
    final ReentrantLock mutex = this.lock;
    mutex.lock();
    try {
        return (count == 0) ? null : extract();
    } finally {
        mutex.unlock();
    }
}
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == 0) notEmpty.await(); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } E x = extract(); return x; } finally { lock.unlock(); } }
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException
public E poll(long timeout, TimeUnit unit) throws InterruptedException
/**
 * Minimal producer/consumer demo: one producer and two consumers share a
 * bounded {@link ArrayBlockingQueue} of capacity 10.
 */
public class Producer_Consumer_Pattern {
    public static void main(String[] args) {
        BlockingQueue<String> q = new ArrayBlockingQueue<String>(10);
        Producer p = new Producer(q);
        Consumer c1 = new Consumer(q);
        Consumer c2 = new Consumer(q);
        new Thread(p).start();
        new Thread(c1).start();
        new Thread(c2).start();
    }
}

/** Puts produced items into the shared queue until interrupted. */
class Producer implements Runnable {
    private final BlockingQueue<String> queue;

    Producer(BlockingQueue<String> q) {
        queue = q;
    }

    /** Loops forever; {@code put} blocks while the queue is full. */
    @Override
    public void run() {
        try {
            while (true) {
                queue.put(produce());
            }
        } catch (InterruptedException ex) {
            // Interruption is the stop signal. Restore the flag so code
            // further up the stack can still observe it.
            Thread.currentThread().interrupt();
        }
    }

    /** @return the item to enqueue (always the string "1" in this demo) */
    String produce() {
        return "1";
    }
}

/** Takes items from the shared queue and prints them until interrupted. */
class Consumer implements Runnable {
    private final BlockingQueue<String> queue;

    Consumer(BlockingQueue<String> q) {
        queue = q;
    }

    /** Loops forever; {@code take} blocks while the queue is empty. */
    @Override
    public void run() {
        try {
            while (true) {
                consume(queue.take());
            }
        } catch (InterruptedException ex) {
            // Interruption is the stop signal. Restore the flag so code
            // further up the stack can still observe it.
            Thread.currentThread().interrupt();
        }
    }

    /** Prints the current thread name followed by the consumed item. */
    void consume(String x) {
        System.out.println(Thread.currentThread().getName() + " " + x);
    }
}
/** Singly-linked list node: holds one item and a reference to the next node. */
static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } }
/** Head of linked list */
private transient Node<E> head;
/** Tail of linked list */
private transient Node<E> last;
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
/** Current number of elements */
// NOTE(review): presumably atomic because puts and takes run under
// different locks (putLock / takeLock) — confirm against the callers.
private final AtomicInteger count = new AtomicInteger(0);
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
/**
 * Links a new node holding {@code x} after the current {@code last} node
 * and makes it the new {@code last}.
 *
 * @param x the item to append
 */
private void enqueue(E x) {
    final Node<E> appended = new Node<E>(x);
    last.next = appended;
    last = appended;
}
private E dequeue() { // assert takeLock.isHeldByCurrentThread(); Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
/**
 * Removes the first node whose item equals {@code o}, if there is one.
 * The scan runs between {@code fullyLock()} and {@code fullyUnlock()}.
 *
 * @param o the object to remove; {@code null} is never found
 * @return {@code true} if a matching node was unlinked
 */
public boolean remove(Object o) {
    if (o == null) {
        return false;
    }
    fullyLock();
    try {
        Node<E> pred = head;
        Node<E> cur = pred.next;
        while (cur != null) {
            if (o.equals(cur.item)) {
                unlink(cur, pred);
                return true;
            }
            pred = cur;
            cur = cur.next;
        }
        return false;
    } finally {
        fullyUnlock();
    }
}
PriorityBlockingQueue 是对 java.util.PriorityQueue<E> 的封装,并在其基础上加入了锁机制。
/** The wrapped priority queue that actually stores the elements. */
private final PriorityQueue<E> q;
/** Lock taken by take() and offer(); created with the fairness flag set to true. */
private final ReentrantLock lock = new ReentrantLock(true);
/** Condition that take() waits on while q is empty; offer() signals it. */
private final Condition notEmpty = lock.newCondition();
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (q.size() == 0) notEmpty.await(); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } E x = q.poll(); assert x != null; return x; } finally { lock.unlock(); } }
/**
 * Adds the element to the wrapped priority queue and signals
 * {@code notEmpty}.
 *
 * @param e the element to add
 * @return always {@code true}
 */
public boolean offer(E e) {
    final ReentrantLock mutex = this.lock;
    mutex.lock();
    try {
        final boolean accepted = q.offer(e);
        assert accepted;
        notEmpty.signal();
        return true;
    } finally {
        mutex.unlock();
    }
}
/**
 * Dispatches to the comparator-based or the {@code Comparable}-based
 * sift-up, depending on whether {@code comparator} is set.
 *
 * @param k the position to start from
 * @param x the item being placed
 */
private void siftUp(int k, E x) {
    if (comparator == null) {
        siftUpComparable(k, x);
    } else {
        siftUpUsingComparator(k, x);
    }
}
/**
 * Moves {@code x} up from position {@code k} while it compares lower than
 * its parent, using the element's own {@code compareTo}.
 *
 * @param k the position to start from
 * @param x the item being placed; cast to {@code Comparable}
 */
private void siftUpComparable(int k, E x) {
    Comparable<? super E> key = (Comparable<? super E>) x;
    int slot = k;
    for (int parent; slot > 0; slot = parent) {
        parent = (slot - 1) >>> 1;
        Object above = queue[parent];
        if (key.compareTo((E) above) >= 0) {
            break;
        }
        // Parent is larger: pull it down into the current slot.
        queue[slot] = above;
    }
    queue[slot] = key;
}

/**
 * Same as {@code siftUpComparable}, but ordering is decided by the
 * {@code comparator} field.
 *
 * @param k the position to start from
 * @param x the item being placed
 */
private void siftUpUsingComparator(int k, E x) {
    int slot = k;
    for (int parent; slot > 0; slot = parent) {
        parent = (slot - 1) >>> 1;
        Object above = queue[parent];
        if (comparator.compare(x, (E) above) >= 0) {
            break;
        }
        // Parent is larger: pull it down into the current slot.
        queue[slot] = above;
    }
    queue[slot] = x;
}
/**
 * The Transferer used by this queue. Declared {@code transient} (skipped by
 * default serialization) and {@code volatile}.
 * NOTE(review): Transferer is declared elsewhere; presumably it implements
 * the hand-off between putting and taking threads — confirm.
 */
private transient volatile Transferer transferer;
就实践方式来看,SynchronousQueue类似于Ada或CSP等语言中的"交会通道(Rendezvous Channel)"。在其它环境中,有时候被称为"连接"。
DelayQueue = BlockingQueue + PriorityQueue + Delayed
DelayQueue 是一个使用优先队列( PriorityQueue )实现的 BlockingQueue ,优先队列的比较基准值是时间。
LinkedBlockingDeque [deck]
/**
 * Doubly-linked list node class: holds one item plus references to the
 * previous and next nodes. The constructor takes the item, the predecessor
 * and the successor, in that order.
 */
static final class Node<E> { E item; Node<E> prev; Node<E> next; Node(E x, Node<E> p, Node<E> n) { item = x; prev = p; next = n; } }
/**
 * Links e as the first element.
 *
 * @return false if count has already reached capacity, true otherwise
 */
private boolean linkFirst(E e) {
    // assert lock.isHeldByCurrentThread();
    if (count >= capacity) return false;
    Node<E> f = first;
    Node<E> x = new Node<E>(e, null, f);
    first = x;
    // Empty list: the new node is also the last one.
    if (last == null) last = x;
    else f.prev = x;
    ++count;
    notEmpty.signal();
    return true;
}

/**
 * Links e as the last element.
 *
 * @return false if count has already reached capacity, true otherwise
 */
private boolean linkLast(E e) {
    // assert lock.isHeldByCurrentThread();
    if (count >= capacity) return false;
    Node<E> l = last;
    Node<E> x = new Node<E>(e, l, null);
    last = x;
    // Empty list: the new node is also the first one.
    if (first == null) first = x;
    else l.next = x;
    ++count;
    notEmpty.signal();
    return true;
}

/**
 * Removes the first node and returns its item.
 *
 * @return the removed item, or null if the list is empty
 */
private E unlinkFirst() {
    // assert lock.isHeldByCurrentThread();
    Node<E> f = first;
    if (f == null) return null;
    Node<E> n = f.next;
    E item = f.item;
    f.item = null;
    f.next = f; // help GC
    first = n;
    // Removed the only node: the list is now empty.
    if (n == null) last = null;
    else n.prev = null;
    --count;
    notFull.signal();
    return item;
}

/**
 * Removes the last node and returns its item.
 *
 * @return the removed item, or null if the list is empty
 */
private E unlinkLast() {
    // assert lock.isHeldByCurrentThread();
    Node<E> l = last;
    if (l == null) return null;
    Node<E> p = l.prev;
    E item = l.item;
    l.item = null;
    l.prev = l; // help GC
    last = p;
    // Removed the only node: the list is now empty.
    if (p == null) first = null;
    else p.next = null;
    --count;
    notFull.signal();
    return item;
}

/**
 * Unlinks node x. Delegates to unlinkFirst/unlinkLast when x has no
 * predecessor or no successor; otherwise splices x out of the middle.
 */
void unlink(Node<E> x) {
    // assert lock.isHeldByCurrentThread();
    Node<E> p = x.prev;
    Node<E> n = x.next;
    if (p == null) {
        unlinkFirst();
    } else if (n == null) {
        unlinkLast();
    } else {
        p.next = n;
        n.prev = p;
        x.item = null;
        // Don't mess with x's links. They may still be in use by
        // an iterator.
        --count;
        notFull.signal();
    }
}
