Doug Lea 最初编写的 util.concurrent
包变成了 JSR-166 ,然后又变成了 J2SE 平台的 Tiger 版本。这个新库提供的是并发程序中通常需要的一组实用程序。如果对于优化对集合的多线程访问有兴趣,那么您就找对地方了。请在本文对应的讨论论坛上与作者 John Zukowski 及其他读者分享您对本文的想法。(您也可以单击文章顶部或底部的 讨论来访问论坛)。
在 Java 编程的早期阶段,位于 Oswego 市的纽约州立大学(SUNY) 的一位教授决定创建一个简单的库,以帮助开发人员构建可以更好地处理多线程情况的应用程序。这并不是说用现有的库就不能实现,但是就像有了标准网络库一样,用经过调试的、可信任的库更容易自己处理多线程。在 Addision-Wesley 的一本相关书籍的帮助下,这个库变得越来越流行了。最终,作者 Doug Lea 决定设法让它成为 Java 平台的标准部分 —— JSR-166。这个库最后变成了 Tiger 版本的 java.util.concurrent
包。在这篇新的 驯服 Tiger 技巧中,我们将探讨 Collection Framework 中新的 Queue
接口、这个接口的非并发和并发实现、并发 Map
实现和专用于读操作大大超过写操作这种情况的并发 List
和 Set
实现。
介绍 Queue 接口
java.util
包为集合提供了一个新的基本接口: java.util.Queue
。虽然肯定可以在相对应的两端进行添加和删除而将 java.util.List
作为队列对待,但是这个新的 Queue
接口提供了支持添加、删除和检查集合的更多方法,如下所示:
public boolean offer(Object element)
public Object remove()
public Object poll()
public Object element()
public Object peek()
|
基本上,一个队列就是一个先入先出(FIFO)的数据结构。一些队列有大小限制,因此如果想在一个满的队列中加入一个新项,多出的项就会被拒绝。这时新的 offer
方法就可以起作用了。它不是对调用 add()
方法抛出一个 unchecked 异常,而只是得到由 offer()
返回的 false。 remove()
和 poll()
方法都是从队列中删除第一个元素(head)。 remove()
的行为与 Collection
接口的版本相似,但是新的 poll()
方法在用空集合调用时不是抛出异常,只是返回 null。因此新的方法更适合容易出现异常条件的情况。后两个方法 element()
和 peek()
用于在队列的头部查询元素。与 remove()
方法类似,在队列为空时, element()
抛出一个异常,而 peek()
返回 null。
使用基本队列
在 Tiger 中有两组 Queue
实现:实现了新 BlockingQueue
接口的和没有实现这个接口的。我将首先分析那些没有实现的。
在最简单的情况下,原来有的 java.util.LinkedList
实现已经改造成不仅实现 java.util.List
接口,而且还实现 java.util.Queue
接口。可以将集合看成这两者中的任何一种。清单 1 显示将 LinkedList
作为 Queue
使用的一种方法:
清单 1. 使用 Queue 实现
Queue queue = new LinkedList();
queue.offer("One");
queue.offer("Two");
queue.offer("Three");
queue.offer("Four");
// Head of queue should be One
System.out.println("Head of queue is: " + queue.poll());
|
再复杂一点的是新的 java.util.AbstractQueue
类。这个类的工作方式类似于 java.util.AbstractList
和 java.util.AbstractSet
类。在创建自定义集合时,不用自己实现整个接口,只是继承抽象实现并填入细节。使用 AbstractQueue
时,必须为方法 offer()
、 poll()
和 peek()
提供实现。像 add()
和 addAll()
这样的方法修改为使用 offer()
,而 clear()
和 remove()
使用 poll()
。最后, element()
使用 peek()
。当然可以在子类中提供这些方法的优化实现,但是不是必须这么做。而且,不必创建自己的子类,可以使用几个内置的实现, 其中两个是不阻塞队列: PriorityQueue
和 ConcurrentLinkedQueue
。
PriorityQueue
和 ConcurrentLinkedQueue
类在 Collection Framework 中加入两个具体集合实现。 PriorityQueue
类实质上维护了一个有序列表。加入到 Queue
中的元素根据它们的天然排序(通过其 java.util.Comparable
实现)或者根据传递给构造函数的 java.util.Comparator
实现来定位。将清单 2 中的 LinkedList
改变为 PriorityQueue
将会打印出 Four 而不是 One,因为按字母排列 —— 字符串的天然顺序 —— Four 是第一个。 ConcurrentLinkedQueue
是基于链接节点的、线程安全的队列。并发访问不需要同步。因为它在队列的尾部添加元素并从头部删除它们,所以只要不需要知道队列的大小, ConcurrentLinkedQueue
对公共集合的共享访问就可以工作得很好。收集关于队列大小的信息会很慢,需要遍历队列。
使用阻塞队列
新的 java.util.concurrent
包在 Collection Framework 中可用的具体集合类中加入了 BlockingQueue
接口和五个阻塞队列类。假如不熟悉阻塞队列概念,它实质上就是一种带有一点扭曲的 FIFO 数据结构。不是立即从队列中添加或者删除元素,线程执行操作阻塞,直到有空间或者元素可用。 BlockingQueue
接口的 Javadoc 给出了阻塞队列的基本用法,如清单 2 所示。生产者中的 put()
操作会在没有空间可用时阻塞,而消费者的 take()
操作会在队列中没有任何东西时阻塞。
清单 2. 使用 BlockingQueue
class Producer implements Runnable {
private final BlockingQueue queue;
Producer(BlockingQueue q) { queue = q; }
public void run() {
try {
while(true) { queue.put(produce()); }
} catch (InterruptedException ex) { ... handle ...}
}
Object produce() { ... }
}
class Consumer implements Runnable {
private final BlockingQueue queue;
Consumer(BlockingQueue q) { queue = q; }
public void run() {
try {
while(true) { consume(queue.take()); }
} catch (InterruptedException ex) { ... handle ...}
}
void consume(Object x) { ... }
}
class Setup {
void main() {
BlockingQueue q = new SomeQueueImplementation();
Producer p = new Producer(q);
Consumer c1 = new Consumer(q);
Consumer c2 = new Consumer(q);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
}
}
|
五个队列所提供的各有不同:
ArrayBlockingQueue
:一个由数组支持的有界队列。
LinkedBlockingQueue
:一个由链接节点支持的可选有界队列。
PriorityBlockingQueue
:一个由优先级堆支持的无界优先级队列。
DelayQueue
:一个由优先级堆支持的、基于时间的调度队列。
SynchronousQueue
:一个利用 BlockingQueue
接口的简单聚集(rendezvous)机制。
前两个类 ArrayBlockingQueue
和 LinkedBlockingQueue
几乎相同,只是在后备存储器方面有所不同, LinkedBlockingQueue
并不总是有容量界限。无大小界限的 LinkedBlockingQueue
类在添加元素时永远不会有阻塞队列的等待(至少在其中有 Integer.MAX_VALUE
元素之前不会)。
PriorityBlockingQueue
是具有无界限容量的队列,它利用所包含元素的 Comparable
排序顺序来以逻辑顺序维护元素。可以将它看作 TreeSet
的可能替代物。例如,在队列中加入字符串 One、Two、Three 和 Four 会导致 Four 被第一个取出来。对于没有天然顺序的元素,可以为构造函数提供一个 Comparator
。不过对 PriorityBlockingQueue
有一个技巧。从 iterator()
返回的 Iterator
实例不需要以优先级顺序返回元素。如果必须以优先级顺序遍历所有元素,那么让它们都通过 toArray()
方法并自己对它们排序,像 Arrays.sort(pq.toArray())
。
新的 DelayQueue
实现可能是其中最有意思(也是最复杂)的一个。加入到队列中的元素必须实现新的 Delayed
接口(只有一个方法 —— long getDelay(java.util.concurrent.TimeUnit unit)
)。因为队列的大小没有界限,使得添加可以立即返回,但是在延迟时间过去之前,不能从队列中取出元素。如果多个元素完成了延迟,那么最早失效/失效时间最长的元素将第一个取出。实际上没有听上去这样复杂。清单 3 演示了这种新的阻塞队列集合的使用:
清单 3. 使用 DelayQueue 实现
import java.util.*;
import java.util.concurrent.*;
public class Delay {
/**
* Delayed implementation that actually delays
*/
static class NanoDelay implements Delayed {
long trigger;
NanoDelay(long i) {
trigger = System.nanoTime() + i;
}
public int compareTo(Object y) {
long i = trigger;
long j = ((NanoDelay)y).trigger;
if (i < j) return -1;
if (i > j) return 1;
return 0;
}
public boolean equals(Object other) {
return ((NanoDelay)other).trigger == trigger;
}
public boolean equals(NanoDelay other) {
return ((NanoDelay)other).trigger == trigger;
}
public long getDelay(TimeUnit unit) {
long n = trigger - System.nanoTime();
return unit.convert(n, TimeUnit.NANOSECONDS);
}
public long getTriggerTime() {
return trigger;
}
public String toString() {
return String.valueOf(trigger);
}
}
public static void main(String args[]) throws InterruptedException {
Random random = new Random();
DelayQueue queue = new DelayQueue();
for (int i=0; i < 5; i++) {
queue.add(new NanoDelay(random.nextInt(1000)));
}
long last = 0;
for (int i=0; i < 5; i++) {
NanoDelay delay = (NanoDelay)(queue.take());
long tt = delay.getTriggerTime();
System.out.println("Trigger time: " + tt);
if (i != 0) {
System.out.println("Delta: " + (tt - last));
}
last = tt;
}
}
}
|
这个例子首先是一个内部类 NanoDelay
,它实质上将暂停给定的任意纳秒(nanosecond)数,这里利用了 System
的新 nanoTime()
方法。然后 main()
方法只是将 NanoDelay
对象放到队列中并再次将它们取出来。如果希望队列项做一些其他事情,就需要在 Delayed
对象的实现中加入方法,并在从队列中取出后调用这个新方法。(请随意扩展 NanoDelay
以试验加入其他方法做一些有趣的事情。)显示从队列中取出元素的两次调用之间的时间差。如果时间差是负数,可以视为一个错误,因为永远不会在延迟时间结束后,在一个更早的触发时间从队列中取得项。
SynchronousQueue
类是最简单的。它没有内部容量。它就像线程之间的手递手机制。在队列中加入一个元素的生产者会等待另一个线程的消费者。当这个消费者出现时,这个元素就直接在消费者和生产者之间传递,永远不会加入到阻塞队列中。
使用 ConcurrentMap 实现
新的 java.util.concurrent.ConcurrentMap
接口和 ConcurrentHashMap
实现只能在键不存在时将元素加入到 map 中,只有在键存在并映射到特定值时才能从 map 中删除一个元素。
有一个新的 putIfAbsent()
方法用于在 map 中进行添加。这个方法以要添加到 ConcurrentMap
实现中的键的值为参数,就像普通的 put()
方法,但是只有在 map 不包含这个键时,才能将键加入到 map 中。如果 map 已经包含这个键,那么这个键的现有值就会保留。 putIfAbsent()
方法是原子的。如果不调用这个原子操作,就需要从适当的同步块中调用清单 4 中的代码:
清单 4. 等价的 putIfAbsent() 代码
if (!map.containsKey(key)) {
return map.put(key, value);
} else {
return map.get(key);
}
|
像 putIfAbsent()
方法一样,重载后的 remove()
方法有两个参数 —— 键和值。在调用时,只有当键映射到指定的值时才从 map 中删除这个键。如果不匹配,那么就不删除这个键,并返回 false。如果值匹配键的当前映射内容,那么就删除这个键。清单 5 显示了这种操作的等价源代码:
清单 5. 等价的 remove() 代码
if (map.get(key).equals(value)) {
map.remove(key);
return true;
} else {
return false;
}
|
使用 CopyOnWriteArrayList 和 CopyOnWriteArraySet
在 Doug Lea 的 Concurrent Programming in Java一书的第 2 章第 2.4.4 节(请参阅 参考资料)中,对 copy-on-write 模式作了最好的描述。实质上,这个模式声明了,为了维护对象的一致性快照,要依靠不可变性(immutability)来消除在协调读取不同的但是相关的属性时需要的同步。对于集合,这意味着如果有大量的读(即 get()
) 和迭代,不必同步操作以照顾偶尔的写(即 add()
)调用。对于新的 CopyOnWriteArrayList
和 CopyOnWriteArraySet
类,所有可变的(mutable)操作都首先取得后台数组的副本,对副本进行更改,然后替换副本。这种做法保证了在遍历自身更改的集合时,永远不会抛出 ConcurrentModificationException
。遍历集合会用原来的集合完成,而在以后的操作中使用更新后的集合。
这些新的集合, CopyOnWriteArrayList
和 CopyOnWriteArraySet
,最适合于读操作通常大大超过写操作的情况。一个最常提到的例子是使用监听器列表。已经说过,Swing 组件还没有改为使用新的集合。相反,它们继续使用 javax.swing.event.EventListenerList
来维护它们的监听器列表。
如清单 6 所示,集合的使用与它们的非 copy-on-write 替代物完全一样。只是创建集合并在其中加入或者删除元素。即使对象加入到了集合中,原来的 Iterator
也可以进行,继续遍历原来集合中的项。
清单 6. 展示一个 copy-on-write 集合
import java.util.*;
import java.util.concurrent.*;
public class CopyOnWrite {
public static void main(String args[]) {
List list1 = new CopyOnWriteArrayList(Arrays.asList(args));
List list2 = new ArrayList(Arrays.asList(args));
Iterator itor1 = list1.iterator();
Iterator itor2 = list2.iterator();
list1.add("New");
list2.add("New");
try {
printAll(itor1);
} catch (ConcurrentModificationException e) {
System.err.println("Shouldn't get here");
}
try {
printAll(itor2);
} catch (ConcurrentModificationException e) {
System.err.println("Will get here.");
}
}
private static void printAll(Iterator itor) {
while (itor.hasNext()) {
System.out.println(itor.next());
}
}
}
|
这个示例程序用命令行参数创建 CopyOnWriteArrayList
和 ArrayList
这两个实例。在得到每一个实例的 Iterator
后,分别在其中加入一个元素。当 ArrayList
迭代因一个 ConcurrentModificationException
问题而立即停止时, CopyOnWriteArrayList
迭代可以继续,不会抛出异常,因为原来的集合是在得到 iterator 之后改变的。如果这种行为(比如通知原来一组事件监听器中的所有元素)是您需要的,那么最好使用 copy-on-write 集合。如果不使用的话,就还用原来的,并保证在出现异常时对它进行处理。
结束语
在 J2SE 平台的 Tiger 版中有许多重要的增加。除了语言级别的改变,如一般性支持,这个库也许是最重要的增加了,因为它会被最广泛的用户使用。不要忽视加入到平台中的其他包,像 Java Management Extensions (JMX),但是大多数其他重要的库增强只针对范围很窄的开发人员。但是这个库不是。除了用于锁定和原子操作的其他并发实用程序,这些类也会经常使用。尽早学习它们并利用它们所提供的功能。
分享到:
相关推荐
一个高性能的Java线程库,该库是 JDK 1.5 中的 java.util.concurrent 包的补充,可用于基于并发消息机制的应用。该类库不提供远程的消息功能,其设计的宗旨是实现一个内存中的消息传递机制. 主要特点有: * All ...
《backport-util-concurrent:Java并发编程的利器》 在Java的世界里,高效并发处理是提升应用程序性能的关键因素之一。backport-util-concurrent库,正如其名,是一种将Java 5及以上版本的并发特性“回移植”到Java...
1.打开cmd,cd到jdk的path,本机是:cd C:\Java\jdk6\bin 2.资源javaConcurrentAnimated.jar放在D盘根目录 3.使用java -cp命令: java -cp D:\javaConcurrentAnimated.jar vgrazi.concurrent.samples.launcher....
在并发编程方面,JDK1.5引入了并发工具类(java.util.concurrent),包括Semaphore、CyclicBarrier、CountDownLatch等,这些工具极大地简化了多线程编程中的同步和协调。 在内存模型和并发性能上,JDK1.5引入了Java...
9. **并发API(Concurrent APIs)**: JDK1.5加强了对多线程编程的支持,引入了并发工具类如`java.util.concurrent`包,包括`ExecutorService`、`Semaphore`、`CountDownLatch`等,以及`java.util.concurrent.atomic`...
在并发编程方面,JDK 1.5引入了并发工具类(java.util.concurrent package),如Executor框架、Semaphore、CountDownLatch和CyclicBarrier等,这些工具大大简化了多线程编程,提高了并发应用的效率和可靠性。...
- **并发集合**:`java.util.concurrent`包提供了线程安全的集合,如`ConcurrentHashMap`和`CopyOnWriteArrayList`。 - **Treeset和TreeMap**:基于红黑树的数据结构,提供了有序的集合和映射操作。 4. **I/O流...
12. **并发编程改进**:包括`java.util.concurrent`包的引入,提供了线程池、并发容器、并发工具类等,简化了多线程编程。 13. **XML支持的增强**:JAXB(Java Architecture for XML Binding)的引入,使得Java对象...
对于并发编程,JDK1.5引入了java.util.concurrent包,其中包括了线程池、Future、Callable接口以及CyclicBarrier和Semaphore等同步工具类,极大地丰富了并发处理能力,提升了多线程环境下的性能和可维护性。...
JDK 1.5引入了`java.util.concurrent`包,提供了线程安全的数据结构和并发编程工具,如`Executor`框架、`Future`接口、`Semaphore`、`CyclicBarrier`等,极大地简化了多线程编程。 ### 10. **Synchronized关键字...
13. **内存模型和并发工具**:`java.util.concurrent.atomic`和`java.util.concurrent.locks`包提供了原子变量和锁机制,帮助开发者正确地处理多线程环境下的共享数据。 在"docs"这个压缩包文件中,开发者可以深入...
JDK 1.5引入了java.util.concurrent包,包含了线程池、并发集合和并发工具类,极大地改善了多线程编程的效率和可靠性。 11. **NIO.2(New I/O 2)** 虽然NIO.2是在JDK 7中引入的,但JDK 1.5开始的NIO(非阻塞I/O...
10. **并发改进**:添加了`java.util.concurrent`包,包含许多线程管理和同步工具类,如Executor框架,Semaphore,CountDownLatch等。 11. **内存模型的增强**:JDK 1.5对Java内存模型(JMM)进行了改进,确保了多...
在并发处理上,JDK1.5引入了并发工具类(java.util.concurrent),包括线程池(ExecutorService)、并发容器(如ConcurrentHashMap)以及Future接口等,这些工具极大地提高了多线程环境下的程序设计效率和性能。...
9. **并发编程改进(Concurrency Improvements)**:提供了`java.util.concurrent`包,包含了许多线程安全的数据结构和同步工具类。 **绿色版JDK1.5的使用** 绿色版JDK1.5通常是一个.zip或.rar文件,解压后包含bin...
在Java编程中,`java.util.ConcurrentModificationException` 是一个常见的运行时异常,通常发生在尝试并发修改集合时。这个异常的产生是由于集合类(如HashMap)的非线程安全特性,当你在一个线程中使用迭代器遍历...
10. **线程并发库**:JDK 1.5加强了对多线程编程的支持,提供了`java.util.concurrent`包,包含如`ExecutorService`、`Future`、`Semaphore`等工具类,使得并发编程更加高效和安全。 11. **二进制表示(二进制 ...
9. **并发编程改进**:包括`java.util.concurrent`包的引入,提供了线程池、Future、CyclicBarrier等高级并发工具,简化了多线程编程。 **安装过程** JDK 1.5.0.22的64位安装版文件名为`jdk-1_5_0_22-windows-amd...
9. **并发改进(Concurrency Enhancements)**:包括`java.util.concurrent`包的引入,提供了线程安全的数据结构和高级并发工具,如`Executor`框架,简化了多线程编程。 10. **NIO.2(New I/O 2)**:虽然JDK 1.5中...
引入了`java.util.concurrent`包,包含了线程池、并发集合等高效并发工具类,提升了多线程编程的效率和安全性。 通过这些特性,JDK1.5极大地提升了Java语言的表达能力和编程效率,为后来的Java版本奠定了坚实的...