- 浏览: 497127 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (250)
- concurrent (11)
- io (1)
- CI (10)
- linux (57)
- windows (2)
- java (38)
- mac (4)
- eclipse (9)
- db (13)
- python (5)
- groovy (5)
- flex (7)
- hibernate (5)
- odb (8)
- netbeans (1)
- web (31)
- book (14)
- erlang (2)
- communication (2)
- virtualization (5)
- jUnit (0)
- jsf (1)
- perl (1)
- java jax-rs (5)
- Jenkins (2)
- Jenkins Plugin (3)
- android (2)
- git (1)
- big data (0)
- 试读 (1)
最新评论
-
yzzy4793:
讲的很清楚,明白
同步synchronized方法和代码块 -
aa51513:
中文乱码式硬伤
Jersey2.x对REST请求处理流程的分析 -
feiwomoshu1991:
...
同步synchronized方法和代码块 -
marshan:
启动失败的原因是加载的类版本冲突,因此你首先要保证依赖的版本和 ...
richfaces中facelet版本升级到2时的典型错误和解决办法 -
zhaohang6688:
请问我按照你的方式修改还是报错 错误信息还是这个 是为什么啊 ...
richfaces中facelet版本升级到2时的典型错误和解决办法
ArrayBlockingQueue
内部结构
private final E[] items;/** The queued items */ private int takeIndex;/** items index for next take, poll or remove */ private int putIndex;/** items index for next put, offer, or add. */ private int count;/** Number of items in the queue */
offer 在队 列满了的时候,直接返回false,put 是block住,直到有空位置。
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { insert(e); return true; } } finally { lock.unlock(); } }
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == items.length) notFull.await(); } catch (InterruptedException ie) { notFull.signal(); // propagate to non-interrupted thread throw ie; } insert(e); } finally { lock.unlock(); } }
private E extract() { final E[] items = this.items; E x = items[takeIndex]; items[takeIndex] = null; takeIndex = inc(takeIndex); --count; notFull.signal(); return x; }
同理,take 是block的,poll 不是。
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { if (count == 0) return null; E x = extract(); return x; } finally { lock.unlock(); } }
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == 0) notEmpty.await(); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } E x = extract(); return x; } finally { lock.unlock(); } }
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}
两个超时参数的方法
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException
public E poll(long timeout, TimeUnit unit) throws InterruptedException
生产者-消费者
public class Producer_Consumer_Pattern { public static void main(String[] args) { BlockingQueue<String> q = new ArrayBlockingQueue<String>(10); Producer p = new Producer(q); Consumer c1 = new Consumer(q); Consumer c2 = new Consumer(q); new Thread(p).start(); new Thread(c1).start(); new Thread(c2).start(); } } class Producer implements Runnable { private final BlockingQueue<String> queue; Producer(BlockingQueue<String> q) { queue = q; } public void run() { try { while (true) { queue.put(produce()); } } catch (InterruptedException ex) { } } String produce() { return "1"; } } class Consumer implements Runnable { private final BlockingQueue<String> queue; Consumer(BlockingQueue<String> q) { queue = q; } public void run() { try { while (true) { consume(queue.take()); } } catch (InterruptedException ex) { } } void consume(String x) { System.out.println(Thread.currentThread().getName() + " " + x); } }
LinkedBlockingQueue
内部是一个单项链表
static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } } private transient Node<E> head; /** Head of linked list */ private transient Node<E> last; /** Tail of linked list */
private final int capacity; /** The capacity bound, or Integer.MAX_VALUE if none */ private final AtomicInteger count = new AtomicInteger(0); /** Current number of elements */ private final ReentrantLock takeLock = new ReentrantLock(); /** Lock held by take, poll, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Lock held by put, offer, etc */ private final Condition notEmpty = takeLock.newCondition(); /** Wait queue for waiting takes */ private final Condition notFull = putLock.newCondition(); /** Wait queue for waiting puts */
入队
private void enqueue(E x) { last = last.next = new Node<E>(x); }
出队
private E dequeue() { // assert takeLock.isHeldByCurrentThread(); Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
删除,需要将两把锁都锁好
public boolean remove(Object o) { if (o == null) return false; fullyLock(); try { for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { if (o.equals(p.item)) { unlink(p, trail); return true; } } return false; } finally { fullyUnlock(); } }
PriorityBlockingQueue
是对 java.util.PriorityQueue<E> 的封装。加入了锁机制。
private final PriorityQueue<E> q; private final ReentrantLock lock = new ReentrantLock(true); private final Condition notEmpty = lock.newCondition();
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (q.size() == 0) notEmpty.await(); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } E x = q.poll(); assert x != null; return x; } finally { lock.unlock(); } }
public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { boolean ok = q.offer(e); assert ok; notEmpty.signal(); return true; } finally { lock.unlock(); } }
入队时排序
private void siftUp(int k, E x) { if (comparator != null) siftUpUsingComparator(k, x); else siftUpComparable(k, x); }
private void siftUpComparable(int k, E x) { Comparable<? super E> key = (Comparable<? super E>) x; while (k > 0) { int parent = (k - 1) >>> 1; Object e = queue[parent]; if (key.compareTo((E) e) >= 0) break; queue[k] = e; k = parent; } queue[k] = key; } private void siftUpUsingComparator(int k, E x) { while (k > 0) { int parent = (k - 1) >>> 1; Object e = queue[parent]; if (comparator.compare(x, (E) e) >= 0) break; queue[k] = e; k = parent; } queue[k] = x; }
SynchrousQueue
内部的主角是一个队列或者堆栈,两者都是Transferer的子类
private transient volatile Transferer transferer;
一个阻塞队列在每次的插入操作中必须等等另一线程执行对应的删除线程,反之亦然。同步队列并没有任何内部的存储空间。
只有当有线程正在等待消费某个元素时,SynchronousQueue才会允许将该元素插入到队列中。
就实践方式来看,SynchronousQueue类似于Ada或CSP等语言中的"交会通道(Rendezvous Channel)"。在其它环境中,有时候被称为"连接"。
DelayQueue
DelayQueue = BlockingQueue + PriorityQueue + Delayed
DelayQueue 是一个使用优先队列( PriorityQueue )实现的 BlockingQueue ,优先队列的比较基准值是时间。
LinkedBlockingDeque [deck]
双向阻塞队列
/** Doubly-linked list node class */ static final class Node<E> { E item; Node<E> prev; Node<E> next; Node(E x, Node<E> p, Node<E> n) { item = x; prev = p; next = n; } }
private boolean linkFirst(E e) { // assert lock.isHeldByCurrentThread(); if (count >= capacity) return false; Node<E> f = first; Node<E> x = new Node<E>(e, null, f); first = x; if (last == null) last = x; else f.prev = x; ++count; notEmpty.signal(); return true; } private boolean linkLast(E e) { // assert lock.isHeldByCurrentThread(); if (count >= capacity) return false; Node<E> l = last; Node<E> x = new Node<E>(e, l, null); last = x; if (first == null) first = x; else l.next = x; ++count; notEmpty.signal(); return true; } private E unlinkFirst() { // assert lock.isHeldByCurrentThread(); Node<E> f = first; if (f == null) return null; Node<E> n = f.next; E item = f.item; f.item = null; f.next = f; // help GC first = n; if (n == null) last = null; else n.prev = null; --count; notFull.signal(); return item; } private E unlinkLast() { // assert lock.isHeldByCurrentThread(); Node<E> l = last; if (l == null) return null; Node<E> p = l.prev; E item = l.item; l.item = null; l.prev = l; // help GC last = p; if (p == null) first = null; else p.next = null; --count; notFull.signal(); return item; } void unlink(Node<E> x) { // assert lock.isHeldByCurrentThread(); Node<E> p = x.prev; Node<E> n = x.next; if (p == null) { unlinkFirst(); } else if (n == null) { unlinkLast(); } else { p.next = n; n.prev = p; x.item = null; // Don't mess with x's links. They may still be in use by // an iterator. --count; notFull.signal(); } }
发表评论
-
[童虎退壳系列]死锁演示
2011-10-13 01:53 946package creative.fire.multit ... -
[童虎退壳系列]方法加锁测试
2011-10-13 01:34 1161package creative.fire.multit ... -
线程池 浅析
2010-12-03 15:26 1050本文是对java线程池的粗浅分析,视野局限于线程池的基本实现, ... -
并发集合 CopyOnWrite
2010-11-28 01:59 1081CopyOnWriteArrayList 内部结构比较简 ... -
并发集合 ConcurrentHash
2010-11-21 21:09 1258Synchronized Collections -- ... -
同步器
2010-11-17 02:08 1050Latch 门闩 CountDownLatch 的一个有用特 ... -
线程池的实现
2009-12-19 21:51 1405自己实现了一个简单的线程池。希望回复者讨论技术问题,不要说都已 ... -
LRU缓存的实现之性能测试
2009-12-08 15:19 1407针对上一篇文章,这里给出性能测试:10000条随机数据,50个 ... -
LRU缓存的实现
2009-12-08 11:42 2426LinkedHashMap是一个现成的LRU实现。但其缺乏并发 ... -
同步synchronized方法和代码块
2009-03-15 22:40 7713同步synchronized方法和代码块 打个比方:一个ob ...
相关推荐
Windows环境下,Microsoft的.NET Framework提供了一系列并发集合,如ConcurrentDictionary、ConcurrentQueue、ConcurrentStack等。这些类提供了线程安全的插入、删除和查询操作,避免了传统线程同步(如Mutex、...
除了ConcurrentQueue之外,C#并发集合还包括其他几种类型: 3. **ConcurrentStack**:并发栈,一种后进先出(LIFO)的数据结构,与ConcurrentQueue类似,但操作顺序相反。 4. **ConcurrentDictionary, TValue>**:...
让我们深入探讨其中几个关键的线程安全集合:`ConcurrentBag<T>`, `ConcurrentQueue<T>`, `ConcurrentStack<T>`, 和 `ConcurrentDictionary, TValue>`。 1. **ConcurrentBag** `ConcurrentBag<T>` 是一个线程安全...
而如果需要一个有顺序的并发集合,ConcurrentSkipListMap 或 ConcurrentSkipListSet 可能满足需求;对于需要阻塞操作的队列,ArrayBlockingQueue、LinkedBlockingQueue 和 PriorityBlockingQueue 提供了不同的选项。...
6. **fail-fast机制**:这是集合框架中处理并发修改的一种机制。当集合在迭代过程中被修改,并且检测到这种修改时,迭代器会立即抛出ConcurrentModificationException异常,以快速失败的方式阻止继续遍历。这是为了...
在Java编程语言中,`Queue`接口是集合框架的一部分,它代表了先进先出(FIFO)的数据结构,也就是我们通常所说的队列。队列是一种非常基础且实用的数据结构,广泛应用于多线程同步、任务调度、缓存管理等多个场景。...
5. **并发集合(Concurrent Collections)**:为了解决多线程访问集合时可能出现的问题,.NET Framework提供了一组并发安全的集合类,如`ConcurrentBag`、`ConcurrentDictionary`、`ConcurrentQueue`和`...
5. **性能优化**:根据需求,你可能要考虑队列的大小限制,避免内存过度消耗,或者使用`ConcurrentQueue`等线程安全的集合,以提高并发性能。 在“WindowsApplication3”项目中,这个示例可能包含了一个简单的UI,...
"swift-Concurrent-函数式并发原语的集合" 主题着重于如何利用Swift语言的特性来实现高效的并行处理。这里我们将深入探讨Swift中的并发编程概念、函数式并发原语以及如何使用它们来优化代码性能。 首先,Swift 支持...
首先,Boost是一个开源的C++库集合,它为C++标准库提供了大量的扩展。Boost库包含了许多实用的功能,如智能指针、线程管理、文件系统操作、正则表达式、序列化等,极大地丰富了C++的生态系统。在高并发服务器的开发...
3. **多种数据结构**:MapDB 提供了多种数据结构,如BTreeMap(支持排序和范围查询)、HTreeMap(适合内存中的高速查找)、HashTreeMap(适用于大容量数据的存储)以及Queue、Set、List等集合类。 4. **事务处理**...
此外,还可能讲解到泛型、迭代器、并发集合(如ConcurrentHashMap)等内容,这些都是Java集合框架中的重要知识点。 对于初学者来说,了解这些接口的基本操作,如add、remove、contains等,以及它们的区别,是掌握...
- 在多线程环境下考虑使用并发集合。 - 了解并熟练运用Java 8的流API,提升代码简洁性和性能。 以上内容涵盖了Java中集合操作的基本概念和常见用法,对于初学者来说,理解和掌握这些知识点是学习Java编程的基础。...
Java提供了ConcurrentHashMap、CopyOnWriteArrayList、CopyOnWriteArraySet等并发集合,它们在多线程环境下保证了线程安全,提高了并发性能。 7. **泛型** 泛型是Java集合框架的重要特性,可以限制集合中存储的...
最后,Java集合框架的并发处理也是重要一环。ConcurrentHashMap和CopyOnWriteArrayList等线程安全的集合类,在多线程环境下保证数据一致性,提升了性能。 总的来说,Java集合框架是一个强大的工具箱,为开发者提供...
在编写代码时,要考虑到集合的线程安全性,必要时使用synchronized关键字或者并发集合如ConcurrentHashMap和CopyOnWriteArrayList。 总的来说,Java集合框架提供了丰富的选择,满足了各种数据存储和操作的需求。...
通过分析`ouqiang-delay-queue-35d75ae`这个压缩包中的源代码,我们可以深入理解上述概念如何转化为实际的代码实现。源码可能包含了任务的创建、调度、消费等接口,以及与Redis交互的具体实现。同时,它可能还包含了...
在处理大量数据或并发访问时,线程安全的集合则不可或缺。 总的来说,理解C#集合的概念及其与泛型的关系,有助于编写更高效、更安全的代码,提高软件的可维护性和性能。无论是开发简单的应用程序还是复杂的系统,...