- 浏览: 26120 次
- 性别:
- 来自: 泰安
文章分类
最新评论
讲到Java多线程,大多数人脑海中跳出来的是Thread、Runnable、synchronized……这些是最基本的东西,虽然已经足够强大,但想要用好还真不容易。从JDK 1.5开始,增加了java.util.concurrent包,它的引入大大简化了多线程程序的开发(要感谢一下大牛Doug Lee)。
java.util.concurrent包分成了三个部分,分别是java.util.concurrent、java.util.concurrent.atomic和java.util.concurrent.lock。内容涵盖了并发集合类、线程池机制、同步互斥机制、线程安全的变量更新工具类、锁等等常用工具。
为了便于理解,本文使用一个例子来做说明,交代一下它的场景:
假设要对一套10个节点组成的环境进行检查,这个环境有两个入口点,通过节点间的依赖关系可以遍历到整个环境。依赖关系可以构成一张有向图,可能存在环。为了提高检查的效率,考虑使用多线程。
1、Executors
通过这个类能够获得多种线程池的实例,例如可以调用newSingleThreadExecutor()获得单线程的ExecutorService,调用newFixedThreadPool()获得固定大小线程池的ExecutorService。拿到ExecutorService可以做的事情就比较多了,最简单的是用它来执行Runnable对象,也可以执行一些实现了Callable<T>的对象。用Thread的start()方法没有返回值,如果该线程执行的方法有返回值那用ExecutorService就再好不过了,可以选择submit()、invokeAll()或者invokeAny(),根据具体情况选择合适的方法即可。
Java代码
package service;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class ThreadPoolService {
public static final int DEFAULT_POOL_SIZE = 5;
public static final long DEFAULT_TASK_TIMEOUT = 1000;
private int poolSize = DEFAULT_POOL_SIZE;
private ExecutorService executorService;
public ThreadPoolService(int poolSize) {
setPoolSize(poolSize);
}
public void execute(Runnable task) {
executorService.execute(task);
}
public List<Node> invokeAll(List<ValidationTask> tasks) {
return invokeAll(tasks, DEFAULT_TASK_TIMEOUT * tasks.size());
}
public List<Node> invokeAll(List<ValidationTask> tasks, long timeout) {
List<Node> nodes = new ArrayList<Node>(tasks.size());
try {
List<Future<Node>> futures = null;
if (timeout < 0) {
futures = executorService.invokeAll(tasks);
} else {
futures = executorService.invokeAll(tasks, timeout, TimeUnit.MILLISECONDS);
}
for (Future<Node> future : futures) {
try {
nodes.add(future.get());
} catch (ExecutionException e) {
e.printStackTrace();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return nodes;
}
public void destoryExecutorService(long timeout) {
if (executorService != null && !executorService.isShutdown()) {
try {
executorService.awaitTermination(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
executorService.shutdown();
}
}
public void createExecutorService() {
destoryExecutorService(1000);
executorService = Executors.newFixedThreadPool(poolSize);
}
public void setPoolSize(int poolSize) {
this.poolSize = poolSize;
createExecutorService();
}
}
这里要额外说明一下invokeAll()和invokeAny()方法。前者会执行给定的所有Callable<T>对象,等所有任务完成后返回一个包含了执行结果的List<Future<T>>,每个Future.isDone()都是true,可以用Future.get()拿到结果;后者只要完成了列表中的任意一个任务就立刻返回,返回值就是执行结果。
还有一个比较诡异的地方
本代码是在JDK 1.6下编译测试的,如果在JDK 1.5下测试,很可能在invokeAll和invokeAny的地方出错。明明ValidationTask实现了 Callable<Node>,可是它死活不认,类型不匹配,这时可以将参数声明由List<ValidationTask>改为 List<Callable<Node>>。
造成这个问题的主要原因是两个版本中invokeAll和invokeAny的方法签名不同,1.6里是invokeAll(Collection<? extends Callable<T>> tasks),而1.5里是invokeAll(Collection<Callable<T>> tasks)。网上也有人遇到类似的问题(invokeAll() is not willing to acept a Collection<Callable<T>> )。
和其他资源一样,线程池在使用完毕后也需要释放,用shutdown()方法可以关闭线程池,如果当时池里还有没有被执行的任务,它会等待任务执行完毕,在等待期间试图进入线程池的任务将被拒绝。也可以用shutdownNow()来关闭线程池,它会立刻关闭线程池,没有执行的任务作为返回值返回。
2、Lock
多线程编程中常常要锁定某个对象,之前会用synchronized来实现,现在又多了另一种选择,那就是java.util.concurrent.locks。通过Lock能够实现更灵活的锁定机制,它还提供了很多synchronized所没有的功能,例如尝试获得锁(tryLock())。
使用Lock时需要自己获得锁并在使用后手动释放,这一点与synchronized有所不同,所以通常Lock的使用方式是这样的:
Java代码
Lock l = ...;
l.lock();
try {
// 执行操作
} finally {
l.unlock();
}
java.util.concurrent.locks中提供了几个Lock接口的实现类,比较常用的应该是ReentrantLock。以下范例中使用了ReentrantLock进行节点锁定:
Java代码
package service;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Node {
private String name;
private String wsdl;
private String result = "PASS";
private String[] dependencies = new String[] {};
private Lock lock = new ReentrantLock();
public Node() {
}
public Node(String name, String wsdl) {
this.name = name;
this.wsdl = wsdl;
}
@Override
public String toString() {
String toString = "Node: " + name + " WSDL: " + wsdl + " Result: " + result;
return toString;
}
// Getter & Setter
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getWsdl() {
return wsdl;
}
public void setWsdl(String wsdl) {
this.wsdl = wsdl;
}
public String getResult() {
return result;
}
public void setResult(String result) {
this.result = result;
}
public String[] getDependencies() {
return dependencies;
}
public void setDependencies(String[] dependencies) {
this.dependencies = dependencies;
}
public Lock getLock() {
return lock;
}
}
Java代码
package service;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.Lock;
import java.util.logging.Logger;
import service.mock.MockNodeValidator;
public class ValidationTask implements Callable<Node> {
private static Logger logger = Logger.getLogger("ValidationTask");
private String wsdl;
public ValidationTask(String wsdl) {
this.wsdl = wsdl;
}
@Override
public Node call() throws Exception {
Node node = ValidationService.NODE_MAP.get(wsdl);
Lock lock = null;
logger.info("开始验证节点:" + wsdl);
if (node != null) {
lock = node.getLock();
if (lock.tryLock()) {
// 当前没有其他线程验证该节点
logger.info("当前没有其他线程验证节点" + node.getName() + "[" + wsdl + "]");
try {
Node result = MockNodeValidator.validateNode(wsdl);
mergeNode(result, node);
} finally {
lock.unlock();
}
} else {
// 当前有别的线程正在验证该节点,等待结果
logger.info("当前有别的线程正在验证节点" + node.getName() + "[" + wsdl + "],等待结果");
lock.lock();
lock.unlock();
}
} else {
// 从未进行过验证,这种情况应该只出现在系统启动初期
// 这时是在做初始化,不应该有冲突发生
logger.info("首次验证节点:" + wsdl);
node = MockNodeValidator.validateNode(wsdl);
ValidationService.NODE_MAP.put(wsdl, node);
}
logger.info("节点" + node.getName() + "[" + wsdl + "]验证结束,验证结果:" + node.getResult());
return node;
}
private Node mergeNode(Node src, Node dest) {
dest.setName(src.getName());
dest.setWsdl(src.getWsdl());
dest.setDependencies(src.getDependencies());
dest.setResult(src.getResult());
return dest;
}
}
请注意ValidationTask的call()方法,这里会先检查节点是否被锁定,如果被锁定则表示当前有另一个线程正在验证该节点,那就不用重复进行验证。第50行和第51行,那到锁后立即释放,这里只是为了等待验证结束。
讲到Lock,就不能不讲Conditon,前者代替了synchronized,而后者则代替了Object对象上的wait()、notify()和notifyAll()方法(Condition中提供了await()、signal()和signalAll()方法),当满足运行条件前挂起线程。Condition是与Lock结合使用的,通过Lock.newCondition()方法能够创建与Lock绑定的Condition实例。JDK的JavaDoc中有一个例子能够很好地说明Condition的用途及用法:
Java代码
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
说到这里,让我解释一下之前的例子里为什么没有选择Condition来等待验证结束。await()方法在调用时当前线程先要获得对应的锁,既然我都拿到锁了,那也就是说验证已经结束了。。。
3、并发集合类
集合类是大家编程时经常要使用的东西,ArrayList、HashMap什么的,java.util包中的集合类有的是线程安全的,有的则不是,在编写多线程的程序时使用线程安全的类能省去很多麻烦,但这些类的性能如何呢?java.util.concurrent包中提供了几个并发结合类,例如ConcurrentHashMap、ConcurrentLinkedQueue和CopyOnWriteArrayList等等,根据不同的使用场景,开发者可以用它们替换java.util包中的相应集合类。
CopyOnWriteArrayList是ArrayList的一个变体,比较适合用在读取比较频繁、修改较少的情况下,因为每次修改都要复制整个底层数组。ConcurrentHashMap中为Map接口增加了一些方法(例如putIfAbsenct()),同时做了些优化,总之灰常之好用,下面的代码中使用ConcurrentHashMap来作为全局节点表,完全无需考虑并发问题。ValidationService中只是声明(第17行),具体的使用是在上面的ValidationTask中。
Java代码
package service;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ValidationService {
public static final Map<String, Node> NODE_MAP = new ConcurrentHashMap<String, Node>();
private ThreadPoolService threadPoolService;
public ValidationService(ThreadPoolService threadPoolService) {
this.threadPoolService = threadPoolService;
}
public void validate(List<String> wsdl) {
List<String> visitedNodes = new ArrayList<String>();
List<String> nextRoundNodes = new ArrayList<String>();
nextRoundNodes.addAll(wsdl);
while (nextRoundNodes.size() > 0) {
List<ValidationTask> tasks = getTasks(nextRoundNodes);
List<Node> nodes = threadPoolService.invokeAll(tasks);
visitedNodes.addAll(nextRoundNodes);
nextRoundNodes.clear();
getNextRoundNodes(nodes, visitedNodes, nextRoundNodes);
}
}
private List<String> getNextRoundNodes(List<Node> nodes,
List<String> visitedNodes, List<String> nextRoundNodes) {
for (Node node : nodes) {
for (String wsdl : node.getDependencies()) {
if (!visitedNodes.contains(wsdl)) {
nextRoundNodes.add(wsdl);
}
}
}
return nextRoundNodes;
}
private List<ValidationTask> getTasks(List<String> nodes) {
List<ValidationTask> tasks = new ArrayList<ValidationTask>(nodes.size());
for (String wsdl : nodes) {
tasks.add(new ValidationTask(wsdl));
}
return tasks;
}
}
4、AtomicInteger
对变量的读写操作都是原子操作(除了long或者double的变量),但像数值类型的++ --操作不是原子操作,像i++中包含了获得i的原始值、加1、写回i、返回原始值,在进行类似i++这样的操作时如果不进行同步问题就大了。好在java.util.concurrent.atomic为我们提供了很多工具类,可以以原子方式更新变量。
以AtomicInteger为例,提供了代替++ --的getAndIncrement()、incrementAndGet()、getAndDecrement()和decrementAndGet()方法,还有加减给定值的方法、当前值等于预期值时更新的compareAndSet()方法。
下面的例子中用AtomicInteger保存全局验证次数(第69行做了自增的操作),因为validateNode()方法会同时被多个线程调用,所以直接用int不同步是不行的,但用AtomicInteger在这种场合下就很合适。
Java代码
package service.mock;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import service.Node;
public class MockNodeValidator {
public static final List<Node> ENTRIES = new ArrayList<Node>();
private static final Map<String, Node> NODE_MAP = new HashMap<String, Node>();
private static AtomicInteger count = new AtomicInteger(0);
private static Logger logger = Logger.getLogger("MockNodeValidator");
static {
Node node0 = new Node("NODE0", "http://node0/check?wsdl"); //入口0
Node node1 = new Node("NODE1", "http://node1/check?wsdl");
Node node2 = new Node("NODE2", "http://node2/check?wsdl");
Node node3 = new Node("NODE3", "http://node3/check?wsdl");
Node node4 = new Node("NODE4", "http://node4/check?wsdl");
Node node5 = new Node("NODE5", "http://node5/check?wsdl");
Node node6 = new Node("NODE6", "http://node6/check?wsdl"); //入口1
Node node7 = new Node("NODE7", "http://node7/check?wsdl");
Node node8 = new Node("NODE8", "http://node8/check?wsdl");
Node node9 = new Node("NODE9", "http://node9/check?wsdl");
node0.setDependencies(new String[] { node1.getWsdl(), node2.getWsdl() });
node1.setDependencies(new String[] { node3.getWsdl(), node4.getWsdl() });
node2.setDependencies(new String[] { node5.getWsdl() });
node6.setDependencies(new String[] { node7.getWsdl(), node8.getWsdl() });
node7.setDependencies(new String[] { node5.getWsdl(), node9.getWsdl() });
node8.setDependencies(new String[] { node3.getWsdl(), node4.getWsdl() });
node2.setResult("FAILED");
NODE_MAP.put(node0.getWsdl(), node0);
NODE_MAP.put(node1.getWsdl(), node1);
NODE_MAP.put(node2.getWsdl(), node2);
NODE_MAP.put(node3.getWsdl(), node3);
NODE_MAP.put(node4.getWsdl(), node4);
NODE_MAP.put(node5.getWsdl(), node5);
NODE_MAP.put(node6.getWsdl(), node6);
NODE_MAP.put(node7.getWsdl(), node7);
NODE_MAP.put(node8.getWsdl(), node8);
NODE_MAP.put(node9.getWsdl(), node9);
ENTRIES.add(node0);
ENTRIES.add(node6);
}
public static Node validateNode(String wsdl) {
Node node = cloneNode(NODE_MAP.get(wsdl));
logger.info("验证节点" + node.getName() + "[" + node.getWsdl() + "]");
count.getAndIncrement();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return node;
}
public static int getCount() {
return count.intValue();
}
public static Node cloneNode(Node originalNode) {
Node newNode = new Node();
newNode.setName(originalNode.getName());
newNode.setWsdl(originalNode.getWsdl());
newNode.setResult(originalNode.getResult());
newNode.setDependencies(originalNode.getDependencies());
return newNode;
}
}
上述代码还有另一个功能,就是构造测试用的节点数据,一共10个节点,有2个入口点,通过这两个点能够遍历整个系统。每次调用会模拟远程访问,等待500ms。环境间节点依赖如下:
环境依赖
Node0 [Node1, Node2]
Node1 [Node3, Node4]
Node2 [Node5]
Node6 [Node7, Node8]
Node7 [Node5, Node9]
Node8 [Node3, Node4]
5、CountDownLatch
CountDownLatch是一个一次性的同步辅助工具,允许一个或多个线程一直等待,直到计数器值变为0。它有一个构造方法,设定计数器初始值,即在await()结束等待前需要调用多少次countDown()方法。CountDownLatch的计数器不能重置,所以说它是“一次性”的,如果需要重置计数器,可以使用CyclicBarrier。在运行环境检查的主类中,使用了CountDownLatch来等待所有验证结束,在各个并发验证的线程完成任务结束前都会调用countDown(),因为有3个并发的验证,所以将计数器设置为3。
最后将所有这些类整合起来,运行环境检查的主类如下。它会创建线程池服务和验证服务,先做一次验证(相当于是对系统做次初始化),随后并发3个验证请求。系统运行完毕会显示实际执行的节点验证次数和执行时间。如果是顺序执行,验证次数应该是13*4=52,但实际的验证次数会少于这个数字(我这里最近一次执行了33次验证),因为如果同时有两个线程要验证同一节点时只会做一次验证。关于时间,如果是顺序执行,52次验证每次等待500ms,那么验证所耗费的时间应该是26000ms,使用了多线程后的实际耗时远小于该数字(最近一次执行耗时4031ms)。
Java代码
package service.mock;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import service.Node;
import service.ThreadPoolService;
import service.ValidationService;
public class ValidationStarter implements Runnable {
private List<String> entries;
private ValidationService validationService;
private CountDownLatch signal;
public ValidationStarter(List<String> entries, ValidationService validationService,
CountDownLatch signal) {
this.entries = entries;
this.validationService = validationService;
this.signal = signal;
}
public static void main(String[] args) {
ThreadPoolService threadPoolService = new ThreadPoolService(10);
ValidationService validationService = new ValidationService(threadPoolService);
List<String> entries = new ArrayList<String>();
CountDownLatch signal = new CountDownLatch(3);
long start;
long stop;
for (Node node : MockNodeValidator.ENTRIES) {
entries.add(node.getWsdl());
}
start = System.currentTimeMillis();
validationService.validate(entries);
threadPoolService.execute(new ValidationStarter(entries, validationService, signal));
threadPoolService.execute(new ValidationStarter(entries, validationService, signal));
threadPoolService.execute(new ValidationStarter(entries, validationService, signal));
try {
signal.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
stop = System.currentTimeMillis();
threadPoolService.destoryExecutorService(1000);
System.out.println("实际执行验证次数: " + MockNodeValidator.getCount());
System.out.println("实际执行时间: " + (stop - start) + "ms");
}
@Override
public void run() {
validationService.validate(entries);
signal.countDown();
}
}
=================================我是分割线==============================
本文没有覆盖java.util.concurrent中的所有内容,只是挑选一些比较常用的东西,想要获得更多详细信息请阅读JavaDoc。自打有了“轮子”理论,重复造大轮子的情况的确少了,但还是有人会做些小轮子,例如编写多线程程序时用到的小工具(线程池、锁等等),如果可以,请让自己再“懒惰”一点吧
java.util.concurrent包分成了三个部分,分别是java.util.concurrent、java.util.concurrent.atomic和java.util.concurrent.lock。内容涵盖了并发集合类、线程池机制、同步互斥机制、线程安全的变量更新工具类、锁等等常用工具。
为了便于理解,本文使用一个例子来做说明,交代一下它的场景:
假设要对一套10个节点组成的环境进行检查,这个环境有两个入口点,通过节点间的依赖关系可以遍历到整个环境。依赖关系可以构成一张有向图,可能存在环。为了提高检查的效率,考虑使用多线程。
1、Executors
通过这个类能够获得多种线程池的实例,例如可以调用newSingleThreadExecutor()获得单线程的ExecutorService,调用newFixedThreadPool()获得固定大小线程池的ExecutorService。拿到ExecutorService可以做的事情就比较多了,最简单的是用它来执行Runnable对象,也可以执行一些实现了Callable<T>的对象。用Thread的start()方法没有返回值,如果该线程执行的方法有返回值那用ExecutorService就再好不过了,可以选择submit()、invokeAll()或者invokeAny(),根据具体情况选择合适的方法即可。
Java代码
package service;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class ThreadPoolService {
public static final int DEFAULT_POOL_SIZE = 5;
public static final long DEFAULT_TASK_TIMEOUT = 1000;
private int poolSize = DEFAULT_POOL_SIZE;
private ExecutorService executorService;
public ThreadPoolService(int poolSize) {
setPoolSize(poolSize);
}
public void execute(Runnable task) {
executorService.execute(task);
}
public List<Node> invokeAll(List<ValidationTask> tasks) {
return invokeAll(tasks, DEFAULT_TASK_TIMEOUT * tasks.size());
}
public List<Node> invokeAll(List<ValidationTask> tasks, long timeout) {
List<Node> nodes = new ArrayList<Node>(tasks.size());
try {
List<Future<Node>> futures = null;
if (timeout < 0) {
futures = executorService.invokeAll(tasks);
} else {
futures = executorService.invokeAll(tasks, timeout, TimeUnit.MILLISECONDS);
}
for (Future<Node> future : futures) {
try {
nodes.add(future.get());
} catch (ExecutionException e) {
e.printStackTrace();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return nodes;
}
public void destoryExecutorService(long timeout) {
if (executorService != null && !executorService.isShutdown()) {
try {
executorService.awaitTermination(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
executorService.shutdown();
}
}
public void createExecutorService() {
destoryExecutorService(1000);
executorService = Executors.newFixedThreadPool(poolSize);
}
public void setPoolSize(int poolSize) {
this.poolSize = poolSize;
createExecutorService();
}
}
这里要额外说明一下invokeAll()和invokeAny()方法。前者会执行给定的所有Callable<T>对象,等所有任务完成后返回一个包含了执行结果的List<Future<T>>,每个Future.isDone()都是true,可以用Future.get()拿到结果;后者只要完成了列表中的任意一个任务就立刻返回,返回值就是执行结果。
还有一个比较诡异的地方
本代码是在JDK 1.6下编译测试的,如果在JDK 1.5下测试,很可能在invokeAll和invokeAny的地方出错。明明ValidationTask实现了 Callable<Node>,可是它死活不认,类型不匹配,这时可以将参数声明由List<ValidationTask>改为 List<Callable<Node>>。
造成这个问题的主要原因是两个版本中invokeAll和invokeAny的方法签名不同,1.6里是invokeAll(Collection<? extends Callable<T>> tasks),而1.5里是invokeAll(Collection<Callable<T>> tasks)。网上也有人遇到类似的问题(invokeAll() is not willing to acept a Collection<Callable<T>> )。
和其他资源一样,线程池在使用完毕后也需要释放,用shutdown()方法可以关闭线程池,如果当时池里还有没有被执行的任务,它会等待任务执行完毕,在等待期间试图进入线程池的任务将被拒绝。也可以用shutdownNow()来关闭线程池,它会立刻关闭线程池,没有执行的任务作为返回值返回。
2、Lock
多线程编程中常常要锁定某个对象,之前会用synchronized来实现,现在又多了另一种选择,那就是java.util.concurrent.locks。通过Lock能够实现更灵活的锁定机制,它还提供了很多synchronized所没有的功能,例如尝试获得锁(tryLock())。
使用Lock时需要自己获得锁并在使用后手动释放,这一点与synchronized有所不同,所以通常Lock的使用方式是这样的:
Java代码
Lock l = ...;
l.lock();
try {
// 执行操作
} finally {
l.unlock();
}
java.util.concurrent.locks中提供了几个Lock接口的实现类,比较常用的应该是ReentrantLock。以下范例中使用了ReentrantLock进行节点锁定:
Java代码
package service;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Node {
private String name;
private String wsdl;
private String result = "PASS";
private String[] dependencies = new String[] {};
private Lock lock = new ReentrantLock();
public Node() {
}
public Node(String name, String wsdl) {
this.name = name;
this.wsdl = wsdl;
}
@Override
public String toString() {
String toString = "Node: " + name + " WSDL: " + wsdl + " Result: " + result;
return toString;
}
// Getter & Setter
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getWsdl() {
return wsdl;
}
public void setWsdl(String wsdl) {
this.wsdl = wsdl;
}
public String getResult() {
return result;
}
public void setResult(String result) {
this.result = result;
}
public String[] getDependencies() {
return dependencies;
}
public void setDependencies(String[] dependencies) {
this.dependencies = dependencies;
}
public Lock getLock() {
return lock;
}
}
Java代码
package service;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.Lock;
import java.util.logging.Logger;
import service.mock.MockNodeValidator;
public class ValidationTask implements Callable<Node> {
private static Logger logger = Logger.getLogger("ValidationTask");
private String wsdl;
public ValidationTask(String wsdl) {
this.wsdl = wsdl;
}
@Override
public Node call() throws Exception {
Node node = ValidationService.NODE_MAP.get(wsdl);
Lock lock = null;
logger.info("开始验证节点:" + wsdl);
if (node != null) {
lock = node.getLock();
if (lock.tryLock()) {
// 当前没有其他线程验证该节点
logger.info("当前没有其他线程验证节点" + node.getName() + "[" + wsdl + "]");
try {
Node result = MockNodeValidator.validateNode(wsdl);
mergeNode(result, node);
} finally {
lock.unlock();
}
} else {
// 当前有别的线程正在验证该节点,等待结果
logger.info("当前有别的线程正在验证节点" + node.getName() + "[" + wsdl + "],等待结果");
lock.lock();
lock.unlock();
}
} else {
// 从未进行过验证,这种情况应该只出现在系统启动初期
// 这时是在做初始化,不应该有冲突发生
logger.info("首次验证节点:" + wsdl);
node = MockNodeValidator.validateNode(wsdl);
ValidationService.NODE_MAP.put(wsdl, node);
}
logger.info("节点" + node.getName() + "[" + wsdl + "]验证结束,验证结果:" + node.getResult());
return node;
}
private Node mergeNode(Node src, Node dest) {
dest.setName(src.getName());
dest.setWsdl(src.getWsdl());
dest.setDependencies(src.getDependencies());
dest.setResult(src.getResult());
return dest;
}
}
请注意ValidationTask的call()方法,这里会先检查节点是否被锁定,如果被锁定则表示当前有另一个线程正在验证该节点,那就不用重复进行验证。第50行和第51行,那到锁后立即释放,这里只是为了等待验证结束。
讲到Lock,就不能不讲Conditon,前者代替了synchronized,而后者则代替了Object对象上的wait()、notify()和notifyAll()方法(Condition中提供了await()、signal()和signalAll()方法),当满足运行条件前挂起线程。Condition是与Lock结合使用的,通过Lock.newCondition()方法能够创建与Lock绑定的Condition实例。JDK的JavaDoc中有一个例子能够很好地说明Condition的用途及用法:
Java代码
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
说到这里,让我解释一下之前的例子里为什么没有选择Condition来等待验证结束。await()方法在调用时当前线程先要获得对应的锁,既然我都拿到锁了,那也就是说验证已经结束了。。。
3、并发集合类
集合类是大家编程时经常要使用的东西,ArrayList、HashMap什么的,java.util包中的集合类有的是线程安全的,有的则不是,在编写多线程的程序时使用线程安全的类能省去很多麻烦,但这些类的性能如何呢?java.util.concurrent包中提供了几个并发结合类,例如ConcurrentHashMap、ConcurrentLinkedQueue和CopyOnWriteArrayList等等,根据不同的使用场景,开发者可以用它们替换java.util包中的相应集合类。
CopyOnWriteArrayList是ArrayList的一个变体,比较适合用在读取比较频繁、修改较少的情况下,因为每次修改都要复制整个底层数组。ConcurrentHashMap中为Map接口增加了一些方法(例如putIfAbsenct()),同时做了些优化,总之灰常之好用,下面的代码中使用ConcurrentHashMap来作为全局节点表,完全无需考虑并发问题。ValidationService中只是声明(第17行),具体的使用是在上面的ValidationTask中。
Java代码
package service;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ValidationService {
public static final Map<String, Node> NODE_MAP = new ConcurrentHashMap<String, Node>();
private ThreadPoolService threadPoolService;
public ValidationService(ThreadPoolService threadPoolService) {
this.threadPoolService = threadPoolService;
}
public void validate(List<String> wsdl) {
List<String> visitedNodes = new ArrayList<String>();
List<String> nextRoundNodes = new ArrayList<String>();
nextRoundNodes.addAll(wsdl);
while (nextRoundNodes.size() > 0) {
List<ValidationTask> tasks = getTasks(nextRoundNodes);
List<Node> nodes = threadPoolService.invokeAll(tasks);
visitedNodes.addAll(nextRoundNodes);
nextRoundNodes.clear();
getNextRoundNodes(nodes, visitedNodes, nextRoundNodes);
}
}
private List<String> getNextRoundNodes(List<Node> nodes,
List<String> visitedNodes, List<String> nextRoundNodes) {
for (Node node : nodes) {
for (String wsdl : node.getDependencies()) {
if (!visitedNodes.contains(wsdl)) {
nextRoundNodes.add(wsdl);
}
}
}
return nextRoundNodes;
}
private List<ValidationTask> getTasks(List<String> nodes) {
List<ValidationTask> tasks = new ArrayList<ValidationTask>(nodes.size());
for (String wsdl : nodes) {
tasks.add(new ValidationTask(wsdl));
}
return tasks;
}
}
4、AtomicInteger
对变量的读写操作都是原子操作(除了long或者double的变量),但像数值类型的++ --操作不是原子操作,像i++中包含了获得i的原始值、加1、写回i、返回原始值,在进行类似i++这样的操作时如果不进行同步问题就大了。好在java.util.concurrent.atomic为我们提供了很多工具类,可以以原子方式更新变量。
以AtomicInteger为例,提供了代替++ --的getAndIncrement()、incrementAndGet()、getAndDecrement()和decrementAndGet()方法,还有加减给定值的方法、当前值等于预期值时更新的compareAndSet()方法。
下面的例子中用AtomicInteger保存全局验证次数(第69行做了自增的操作),因为validateNode()方法会同时被多个线程调用,所以直接用int不同步是不行的,但用AtomicInteger在这种场合下就很合适。
Java代码
package service.mock;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import service.Node;
public class MockNodeValidator {
public static final List<Node> ENTRIES = new ArrayList<Node>();
private static final Map<String, Node> NODE_MAP = new HashMap<String, Node>();
private static AtomicInteger count = new AtomicInteger(0);
private static Logger logger = Logger.getLogger("MockNodeValidator");
static {
Node node0 = new Node("NODE0", "http://node0/check?wsdl"); //入口0
Node node1 = new Node("NODE1", "http://node1/check?wsdl");
Node node2 = new Node("NODE2", "http://node2/check?wsdl");
Node node3 = new Node("NODE3", "http://node3/check?wsdl");
Node node4 = new Node("NODE4", "http://node4/check?wsdl");
Node node5 = new Node("NODE5", "http://node5/check?wsdl");
Node node6 = new Node("NODE6", "http://node6/check?wsdl"); //入口1
Node node7 = new Node("NODE7", "http://node7/check?wsdl");
Node node8 = new Node("NODE8", "http://node8/check?wsdl");
Node node9 = new Node("NODE9", "http://node9/check?wsdl");
node0.setDependencies(new String[] { node1.getWsdl(), node2.getWsdl() });
node1.setDependencies(new String[] { node3.getWsdl(), node4.getWsdl() });
node2.setDependencies(new String[] { node5.getWsdl() });
node6.setDependencies(new String[] { node7.getWsdl(), node8.getWsdl() });
node7.setDependencies(new String[] { node5.getWsdl(), node9.getWsdl() });
node8.setDependencies(new String[] { node3.getWsdl(), node4.getWsdl() });
node2.setResult("FAILED");
NODE_MAP.put(node0.getWsdl(), node0);
NODE_MAP.put(node1.getWsdl(), node1);
NODE_MAP.put(node2.getWsdl(), node2);
NODE_MAP.put(node3.getWsdl(), node3);
NODE_MAP.put(node4.getWsdl(), node4);
NODE_MAP.put(node5.getWsdl(), node5);
NODE_MAP.put(node6.getWsdl(), node6);
NODE_MAP.put(node7.getWsdl(), node7);
NODE_MAP.put(node8.getWsdl(), node8);
NODE_MAP.put(node9.getWsdl(), node9);
ENTRIES.add(node0);
ENTRIES.add(node6);
}
public static Node validateNode(String wsdl) {
Node node = cloneNode(NODE_MAP.get(wsdl));
logger.info("验证节点" + node.getName() + "[" + node.getWsdl() + "]");
count.getAndIncrement();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return node;
}
public static int getCount() {
return count.intValue();
}
public static Node cloneNode(Node originalNode) {
Node newNode = new Node();
newNode.setName(originalNode.getName());
newNode.setWsdl(originalNode.getWsdl());
newNode.setResult(originalNode.getResult());
newNode.setDependencies(originalNode.getDependencies());
return newNode;
}
}
上述代码还有另一个功能,就是构造测试用的节点数据,一共10个节点,有2个入口点,通过这两个点能够遍历整个系统。每次调用会模拟远程访问,等待500ms。环境间节点依赖如下:
环境依赖
Node0 [Node1, Node2]
Node1 [Node3, Node4]
Node2 [Node5]
Node6 [Node7, Node8]
Node7 [Node5, Node9]
Node8 [Node3, Node4]
5、CountDownLatch
CountDownLatch是一个一次性的同步辅助工具,允许一个或多个线程一直等待,直到计数器值变为0。它有一个构造方法,设定计数器初始值,即在await()结束等待前需要调用多少次countDown()方法。CountDownLatch的计数器不能重置,所以说它是“一次性”的,如果需要重置计数器,可以使用CyclicBarrier。在运行环境检查的主类中,使用了CountDownLatch来等待所有验证结束,在各个并发验证的线程完成任务结束前都会调用countDown(),因为有3个并发的验证,所以将计数器设置为3。
最后将所有这些类整合起来,运行环境检查的主类如下。它会创建线程池服务和验证服务,先做一次验证(相当于是对系统做次初始化),随后并发3个验证请求。系统运行完毕会显示实际执行的节点验证次数和执行时间。如果是顺序执行,验证次数应该是13*4=52,但实际的验证次数会少于这个数字(我这里最近一次执行了33次验证),因为如果同时有两个线程要验证同一节点时只会做一次验证。关于时间,如果是顺序执行,52次验证每次等待500ms,那么验证所耗费的时间应该是26000ms,使用了多线程后的实际耗时远小于该数字(最近一次执行耗时4031ms)。
Java代码
package service.mock;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import service.Node;
import service.ThreadPoolService;
import service.ValidationService;
public class ValidationStarter implements Runnable {
private List<String> entries;
private ValidationService validationService;
private CountDownLatch signal;
public ValidationStarter(List<String> entries, ValidationService validationService,
CountDownLatch signal) {
this.entries = entries;
this.validationService = validationService;
this.signal = signal;
}
public static void main(String[] args) {
ThreadPoolService threadPoolService = new ThreadPoolService(10);
ValidationService validationService = new ValidationService(threadPoolService);
List<String> entries = new ArrayList<String>();
CountDownLatch signal = new CountDownLatch(3);
long start;
long stop;
for (Node node : MockNodeValidator.ENTRIES) {
entries.add(node.getWsdl());
}
start = System.currentTimeMillis();
validationService.validate(entries);
threadPoolService.execute(new ValidationStarter(entries, validationService, signal));
threadPoolService.execute(new ValidationStarter(entries, validationService, signal));
threadPoolService.execute(new ValidationStarter(entries, validationService, signal));
try {
signal.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
stop = System.currentTimeMillis();
threadPoolService.destoryExecutorService(1000);
System.out.println("实际执行验证次数: " + MockNodeValidator.getCount());
System.out.println("实际执行时间: " + (stop - start) + "ms");
}
@Override
public void run() {
validationService.validate(entries);
signal.countDown();
}
}
=================================我是分割线==============================
本文没有覆盖java.util.concurrent中的所有内容,只是挑选一些比较常用的东西,想要获得更多详细信息请阅读JavaDoc。自打有了“轮子”理论,重复造大轮子的情况的确少了,但还是有人会做些小轮子,例如编写多线程程序时用到的小工具(线程池、锁等等),如果可以,请让自己再“懒惰”一点吧
发表评论
-
log temple xml
2014-04-11 10:12 639<?xml version="1.0&qu ... -
class proxy
2014-03-06 16:53 7791.Proxy.newProxyInstance(source ... -
[转]Java多线程总结之由synchronized说开去
2012-11-19 16:22 685Java多线程总结之由synchr ... -
JES服务程序
2012-11-13 14:02 758JES,是Java嵌入服务器-Java Embedded Se ... -
[转]JAVA 中止线程
2012-10-19 17:30 708使用Java内置支持的线程写多线程程序是很常见的事情。然而,多 ... -
[转]深入Java对象及元素的存储区域
2012-10-15 09:46 652在JAVA平台上开发应 ... -
【转】【JAVA】 hashcode相关
2012-10-11 15:20 629有许多人学了很长时间 ...
相关推荐
本资源“java concurrent 精简源码”着重关注Java并发库(java.util.concurrent)的核心概念,包括阻塞队列和线程管理。下面将详细阐述这些知识点。 1. **Java并发库(java.util.concurrent)** Java并发库是Java ...
Java Concurrent in practice (animated)
为了简化多线程编程,Java提供了一系列工具和API,如`java.util.Timer`和`java.util.concurrent`包,这些工具可以帮助开发者更高效地管理线程间的同步问题。 ##### 1.2 synchronized关键字 `synchronized`关键字是...
### Java Concurrent处理并发需求 #### 一、Java并发基础与Concurrent API介绍 在现代软件开发中,尤其是在服务器端应用中,对并发处理的需求日益增长。为了满足这种需求,Java平台提供了一系列强大的工具和API来...
java concurrent包分类结构图
JavaConcurrent是Java平台提供的高级并发API,它使得多线程编程更加高效和安全。本资料集合了Java面试中与多线程相关的常见问题及解答,旨在帮助求职者充分准备这一关键领域的知识。 1. **线程的概念与创建** - ...
在JavaConcurrent库中,提供了丰富的工具类和接口来支持高效的并发编程。 首先,了解多线程的作用至关重要。多线程的主要目的是充分利用多核CPU的计算能力,使得程序能够并行执行不同的任务,提高整体效率。同时,...
EBS java concurrent program的实现
Java并发包(java.concurrent)是Java平台中处理多线程编程的核心工具包,它提供了丰富的类和接口,使得开发者能够高效、安全地编写多线程程序。这个包的设计目标是提高并发性能,减少同步代码的复杂性,并提供高级...
这里我们关注的是如何使用`java.concurrent`包中的工具和XML Processing API(通常指的是JAXB或DOM4J等处理XML的库)来高效地生成PDF。下面将详细解释这个过程以及涉及的相关知识点。 首先,`java.concurrent`包是...
资深Java专家10年经验总结,全程案例式讲解,首本全面介绍Java多线程编程技术的专著 结合大量实例,全面讲解Java多线程编程中的并发访问、线程间通信、锁等最难突破的核心技术与应用实践 封底 Java多线程无处不在,...
《Java并发编程实战》还会讨论`java.util.concurrent`包中的高级并发工具,如`ExecutorService`和`Future`,它们可以方便地管理和控制线程池,提高系统的并行处理能力。`CountDownLatch`、`CyclicBarrier`和`...
JAVA的CONCURRENT用法详解.pdf
Java 1.5引入了`java.util.concurrent`包,包含了一系列的并发工具类,如线程池、阻塞队列、并发集合等。这些工具旨在提高并发性能并简化编程模型。例如,`ExecutorService`和`ThreadPoolExecutor`提供了线程池管理...
### Java并发工具包 `java.util.concurrent` 知识点详解 #### 一、引言 随着多核处理器的普及和应用程序复杂度的增加,多线程编程成为了现代软件开发不可或缺的一部分。为了简化并发编程的复杂性,Java 5 引入了 `...
JUC使用指导手册 http://tutorials.jenkov.com/java-util-concurrent/blockingqueue.html 中文译文
如何启动:以win7系统为例,最好jdk8 1.打开cmd,cd到jdk的path,本机是:cd C:\Java\jdk6\bin ...java -cp D:\javaConcurrentAnimated.jar vgrazi.concurrent.samples.launcher.ConcurrentExampleLauncher
java concurrent源码 Java7 核心类库源码解析 请直接查看.java : 通过JavaDoc+Test书写 ,方便链接到源码 Tracker 20181014 Java11正式发布并作为新的长期支持版本, 未来的应用会逐步迁移到Java11. 因此Java8以前的...
标题中提到了“java.util.concurrent.uml.pdf”,这表明文件是一份Java并发编程工具包java.util.concurrent的UML(统一建模语言)类结构图的PDF格式文件。UML图能够帮助开发者理解Java并发包中的类、接口及其关系,...