- 浏览: 83222 次
- 性别:
- 来自: 大连
最新评论
-
RepublicW:
大海lb 写道楼主,我最近也在看jdk的一些源码,昨天看了ha ...
HashMap的初始容量(initialCapacity)和装载因子(loadFactor) -
大海lb:
楼主,我最近也在看jdk的一些源码,昨天看了hashmap,那 ...
HashMap的初始容量(initialCapacity)和装载因子(loadFactor)
concurrent包详细解释
编写多线程的程序一直都是一件比较麻烦的事情,要考虑很多事情,处理不好还会出很多意想不到的麻烦。加上现在很多开发者接触到的项目都是打着企业级旗号的B/S项目,大多数人都很少涉及多线程,这又为本文的主角增加了一份神秘感。
讲到Java多线程,大多数人脑海中跳出来的是Thread、Runnable、synchronized……这些是最基本的东西,虽然已经足够强大,但想要用好还真不容易。从JDK 1.5开始,增加了java.util.concurrent包,它的引入大大简化了多线程程序的开发(要感谢一下大牛Doug Lee)。
java.util.concurrent包分成了三个部分,分别是java.util.concurrent、java.util.concurrent.atomic和java.util.concurrent.lock。内容涵盖了并发集合类、线程池机制、同步互斥机制、线程安全的变量更新工具类、锁等等常用工具。
为了便于理解,本文使用一个例子来做说明,交代一下它的场景:
假设要对一套10个节点组成的环境进行检查,这个环境有两个入口点,通过节点间的依赖关系可以遍历到整个环境。依赖关系可以构成一张有向图,可能存在环。为了提高检查的效率,考虑使用多线程。
1、Executors
通过这个类能够获得多种线程池的实例,例如可以调用newSingleThreadExecutor()获得单线程的ExecutorService,调用newFixedThreadPool()获得固定大小线程池的ExecutorService。拿到ExecutorService可以做的事情就比较多了,最简单的是用它来执行Runnable对象,也可以执行一些实现了Callable<T>的对象。用Thread的start()方法没有返回值,如果该线程执行的方法有返回值那用ExecutorService就再好不过了,可以选择submit()、invokeAll()或者invokeAny(),根据具体情况选择合适的方法即可。
这里要额外说明一下invokeAll()和invokeAny()方法。前者会执行给定的所有Callable<T>对象,等所有任务完成后返回一个包含了执行结果的List<Future<T>>,每个Future.isDone()都是true,可以用Future.get()拿到结果;后者只要完成了列表中的任意一个任务就立刻返回,返回值就是执行结果。
还有一个比较诡异的地方
本代码是在JDK 1.6下编译测试的,如果在JDK 1.5下测试,很可能在invokeAll和invokeAny的地方出错。明明ValidationTask实现了 Callable<Node>,可是它死活不认,类型不匹配,这时可以将参数声明由List<ValidationTask>改为 List<Callable<Node>>。
造成这个问题的主要原因是两个版本中invokeAll和invokeAny的方法签名不同,1.6里是invokeAll(Collection<? extends Callable<T>> tasks),而1.5里是invokeAll(Collection<Callable<T>> tasks)。网上也有人遇到类似的问题(invokeAll() is not willing to acept a Collection<Callable<T>> )。
和其他资源一样,线程池在使用完毕后也需要释放,用shutdown()方法可以关闭线程池,如果当时池里还有没有被执行的任务,它会等待任务执行完毕,在等待期间试图进入线程池的任务将被拒绝。也可以用shutdownNow()来关闭线程池,它会立刻关闭线程池,没有执行的任务作为返回值返回。
2、Lock
多线程编程中常常要锁定某个对象,之前会用synchronized来实现,现在又多了另一种选择,那就是java.util.concurrent.locks。通过Lock能够实现更灵活的锁定机制,它还提供了很多synchronized所没有的功能,例如尝试获得锁(tryLock())。
使用Lock时需要自己获得锁并在使用后手动释放,这一点与synchronized有所不同,所以通常Lock的使用方式是这样的:
java.util.concurrent.locks中提供了几个Lock接口的实现类,比较常用的应该是ReentrantLock。以下范例中使用了ReentrantLock进行节点锁定:
请注意ValidationTask的call()方法,这里会先检查节点是否被锁定,如果被锁定则表示当前有另一个线程正在验证该节点,那就不用重复进行验证。第50行和第51行,那到锁后立即释放,这里只是为了等待验证结束。
讲到Lock,就不能不讲Conditon,前者代替了synchronized,而后者则代替了Object对象上的wait()、notify()和notifyAll()方法(Condition中提供了await()、signal()和signalAll()方法),当满足运行条件前挂起线程。Condition是与Lock结合使用的,通过Lock.newCondition()方法能够创建与Lock绑定的Condition实例。JDK的JavaDoc中有一个例子能够很好地说明Condition的用途及用法:
说到这里,让我解释一下之前的例子里为什么没有选择Condition来等待验证结束。await()方法在调用时当前线程先要获得对应的锁,既然我都拿到锁了,那也就是说验证已经结束了。。。
3、并发集合类
集合类是大家编程时经常要使用的东西,ArrayList、HashMap什么的,java.util包中的集合类有的是线程安全的,有的则不是,在编写多线程的程序时使用线程安全的类能省去很多麻烦,但这些类的性能如何呢?java.util.concurrent包中提供了几个并发结合类,例如ConcurrentHashMap、ConcurrentLinkedQueue和CopyOnWriteArrayList等等,根据不同的使用场景,开发者可以用它们替换java.util包中的相应集合类。
CopyOnWriteArrayList是ArrayList的一个变体,比较适合用在读取比较频繁、修改较少的情况下,因为每次修改都要复制整个底层数组。ConcurrentHashMap中为Map接口增加了一些方法(例如putIfAbsenct()),同时做了些优化,总之灰常之好用,下面的代码中使用ConcurrentHashMap来作为全局节点表,完全无需考虑并发问题。ValidationService中只是声明(第17行),具体的使用是在上面的ValidationTask中。
4、AtomicInteger
对变量的读写操作都是原子操作(除了long或者double的变量),但像数值类型的++ --操作不是原子操作,像i++中包含了获得i的原始值、加1、写回i、返回原始值,在进行类似i++这样的操作时如果不进行同步问题就大了。好在java.util.concurrent.atomic为我们提供了很多工具类,可以以原子方式更新变量。
以AtomicInteger为例,提供了代替++ --的getAndIncrement()、incrementAndGet()、getAndDecrement()和decrementAndGet()方法,还有加减给定值的方法、当前值等于预期值时更新的compareAndSet()方法。
下面的例子中用AtomicInteger保存全局验证次数(第69行做了自增的操作),因为validateNode()方法会同时被多个线程调用,所以直接用int不同步是不行的,但用AtomicInteger在这种场合下就很合适。
上述代码还有另一个功能,就是构造测试用的节点数据,一共10个节点,有2个入口点,通过这两个点能够遍历整个系统。每次调用会模拟远程访问,等待500ms。环境间节点依赖如下:
环境依赖
5、CountDownLatch
CountDownLatch是一个一次性的同步辅助工具,允许一个或多个线程一直等待,直到计数器值变为0。它有一个构造方法,设定计数器初始值,即在await()结束等待前需要调用多少次countDown()方法。CountDownLatch的计数器不能重置,所以说它是“一次性”的,如果需要重置计数器,可以使用CyclicBarrier。在运行环境检查的主类中,使用了CountDownLatch来等待所有验证结束,在各个并发验证的线程完成任务结束前都会调用countDown(),因为有3个并发的验证,所以将计数器设置为3。
最后将所有这些类整合起来,运行环境检查的主类如下。它会创建线程池服务和验证服务,先做一次验证(相当于是对系统做次初始化),随后并发3个验证请求。系统运行完毕会显示实际执行的节点验证次数和执行时间。如果是顺序执行,验证次数应该是13*4=52,但实际的验证次数会少于这个数字(我这里最近一次执行了33次验证),因为如果同时有两个线程要验证同一节点时只会做一次验证。关于时间,如果是顺序执行,52次验证每次等待500ms,那么验证所耗费的时间应该是26000ms,使用了多线程后的实际耗时远小于该数字(最近一次执行耗时4031ms)。
讲到Java多线程,大多数人脑海中跳出来的是Thread、Runnable、synchronized……这些是最基本的东西,虽然已经足够强大,但想要用好还真不容易。从JDK 1.5开始,增加了java.util.concurrent包,它的引入大大简化了多线程程序的开发(要感谢一下大牛Doug Lee)。
java.util.concurrent包分成了三个部分,分别是java.util.concurrent、java.util.concurrent.atomic和java.util.concurrent.lock。内容涵盖了并发集合类、线程池机制、同步互斥机制、线程安全的变量更新工具类、锁等等常用工具。
为了便于理解,本文使用一个例子来做说明,交代一下它的场景:
假设要对一套10个节点组成的环境进行检查,这个环境有两个入口点,通过节点间的依赖关系可以遍历到整个环境。依赖关系可以构成一张有向图,可能存在环。为了提高检查的效率,考虑使用多线程。
1、Executors
通过这个类能够获得多种线程池的实例,例如可以调用newSingleThreadExecutor()获得单线程的ExecutorService,调用newFixedThreadPool()获得固定大小线程池的ExecutorService。拿到ExecutorService可以做的事情就比较多了,最简单的是用它来执行Runnable对象,也可以执行一些实现了Callable<T>的对象。用Thread的start()方法没有返回值,如果该线程执行的方法有返回值那用ExecutorService就再好不过了,可以选择submit()、invokeAll()或者invokeAny(),根据具体情况选择合适的方法即可。
package service; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** * 线程池服务类 * * @author DigitalSonic */ public class ThreadPoolService { /** * 默认线程池大小 */ public static final int DEFAULT_POOL_SIZE = 5; /** * 默认一个任务的超时时间,单位为毫秒 */ public static final long DEFAULT_TASK_TIMEOUT = 1000; private int poolSize = DEFAULT_POOL_SIZE; private ExecutorService executorService; /** * 根据给定大小创建线程池 */ public ThreadPoolService(int poolSize) { setPoolSize(poolSize); } /** * 使用线程池中的线程来执行任务 */ public void execute(Runnable task) { executorService.execute(task); } /** * 在线程池中执行所有给定的任务并取回运行结果,使用默认超时时间 * * @see #invokeAll(List, long) */ public List<Node> invokeAll(List<ValidationTask> tasks) { return invokeAll(tasks, DEFAULT_TASK_TIMEOUT * tasks.size()); } /** * 在线程池中执行所有给定的任务并取回运行结果 * * @param timeout 以毫秒为单位的超时时间,小于0表示不设定超时 * @see java.util.concurrent.ExecutorService#invokeAll(java.util.Collection) */ public List<Node> invokeAll(List<ValidationTask> tasks, long timeout) { List<Node> nodes = new ArrayList<Node>(tasks.size()); try { List<Future<Node>> futures = null; if (timeout < 0) { futures = executorService.invokeAll(tasks); } else { futures = executorService.invokeAll(tasks, timeout, TimeUnit.MILLISECONDS); } for (Future<Node> future : futures) { try { nodes.add(future.get()); } catch (ExecutionException e) { e.printStackTrace(); } } } catch (InterruptedException e) { e.printStackTrace(); } return nodes; } /** * 关闭当前ExecutorService * * @param timeout 以毫秒为单位的超时时间 */ public void destoryExecutorService(long timeout) { if (executorService != null && !executorService.isShutdown()) { try { executorService.awaitTermination(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); } executorService.shutdown(); } } /** * 关闭当前ExecutorService,随后根据poolSize创建新的ExecutorService */ public void createExecutorService() { destoryExecutorService(1000); executorService = Executors.newFixedThreadPool(poolSize); } /** * 调整线程池大小 * @see #createExecutorService() */ public void setPoolSize(int poolSize) { this.poolSize = poolSize; createExecutorService(); } }
这里要额外说明一下invokeAll()和invokeAny()方法。前者会执行给定的所有Callable<T>对象,等所有任务完成后返回一个包含了执行结果的List<Future<T>>,每个Future.isDone()都是true,可以用Future.get()拿到结果;后者只要完成了列表中的任意一个任务就立刻返回,返回值就是执行结果。
还有一个比较诡异的地方
本代码是在JDK 1.6下编译测试的,如果在JDK 1.5下测试,很可能在invokeAll和invokeAny的地方出错。明明ValidationTask实现了 Callable<Node>,可是它死活不认,类型不匹配,这时可以将参数声明由List<ValidationTask>改为 List<Callable<Node>>。
造成这个问题的主要原因是两个版本中invokeAll和invokeAny的方法签名不同,1.6里是invokeAll(Collection<? extends Callable<T>> tasks),而1.5里是invokeAll(Collection<Callable<T>> tasks)。网上也有人遇到类似的问题(invokeAll() is not willing to acept a Collection<Callable<T>> )。
和其他资源一样,线程池在使用完毕后也需要释放,用shutdown()方法可以关闭线程池,如果当时池里还有没有被执行的任务,它会等待任务执行完毕,在等待期间试图进入线程池的任务将被拒绝。也可以用shutdownNow()来关闭线程池,它会立刻关闭线程池,没有执行的任务作为返回值返回。
2、Lock
多线程编程中常常要锁定某个对象,之前会用synchronized来实现,现在又多了另一种选择,那就是java.util.concurrent.locks。通过Lock能够实现更灵活的锁定机制,它还提供了很多synchronized所没有的功能,例如尝试获得锁(tryLock())。
使用Lock时需要自己获得锁并在使用后手动释放,这一点与synchronized有所不同,所以通常Lock的使用方式是这样的:
Lock l = ...; l.lock(); try { // 执行操作 } finally { l.unlock(); }
java.util.concurrent.locks中提供了几个Lock接口的实现类,比较常用的应该是ReentrantLock。以下范例中使用了ReentrantLock进行节点锁定:
package service; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 节点类 * * @author DigitalSonic */ public class Node { private String name; private String wsdl; private String result = "PASS"; private String[] dependencies = new String[] {}; private Lock lock = new ReentrantLock(); /** * 默认构造方法 */ public Node() { } /** * 构造节点对象,设置名称及WSDL */ public Node(String name, String wsdl) { this.name = name; this.wsdl = wsdl; } /** * 返回包含节点名称、WSDL以及验证结果的字符串 */ @Override public String toString() { String toString = "Node: " + name + " WSDL: " + wsdl + " Result: " + result; return toString; } // Getter & Setter public String getName() { return name; } public void setName(String name) { this.name = name; } public String getWsdl() { return wsdl; } public void setWsdl(String wsdl) { this.wsdl = wsdl; } public String getResult() { return result; } public void setResult(String result) { this.result = result; } public String[] getDependencies() { return dependencies; } public void setDependencies(String[] dependencies) { this.dependencies = dependencies; } public Lock getLock() { return lock; } }
package service; import java.util.concurrent.Callable; import java.util.concurrent.locks.Lock; import java.util.logging.Logger; import service.mock.MockNodeValidator; /** * 执行验证的任务类 * * @author DigitalSonic */ public class ValidationTask implements Callable<Node> { private static Logger logger = Logger.getLogger("ValidationTask"); private String wsdl; /** * 构造方法,传入节点的WSDL */ public ValidationTask(String wsdl) { this.wsdl = wsdl; } /** * 执行针对某个节点的验证<br/> * 如果正有别的线程在执行同一节点的验证则等待其结果,不重复执行验证 */ @Override public Node call() throws Exception { Node node = ValidationService.NODE_MAP.get(wsdl); Lock lock = null; logger.info("开始验证节点:" + wsdl); if (node != null) { lock = node.getLock(); if (lock.tryLock()) { // 当前没有其他线程验证该节点 logger.info("当前没有其他线程验证节点" + node.getName() + "[" + wsdl + "]"); try { Node result = MockNodeValidator.validateNode(wsdl); mergeNode(result, node); } finally { lock.unlock(); } } else { // 当前有别的线程正在验证该节点,等待结果 logger.info("当前有别的线程正在验证节点" + node.getName() + "[" + wsdl + "],等待结果"); lock.lock(); lock.unlock(); } } else { // 从未进行过验证,这种情况应该只出现在系统启动初期 // 这时是在做初始化,不应该有冲突发生 logger.info("首次验证节点:" + wsdl); node = MockNodeValidator.validateNode(wsdl); ValidationService.NODE_MAP.put(wsdl, node); } logger.info("节点" + node.getName() + "[" + wsdl + "]验证结束,验证结果:" + node.getResult()); return node; } /** * 将src的内容合并进dest节点中,不进行深度拷贝 */ private Node mergeNode(Node src, Node dest) { dest.setName(src.getName()); dest.setWsdl(src.getWsdl()); dest.setDependencies(src.getDependencies()); dest.setResult(src.getResult()); return dest; } }
请注意ValidationTask的call()方法,这里会先检查节点是否被锁定,如果被锁定则表示当前有另一个线程正在验证该节点,那就不用重复进行验证。第50行和第51行,那到锁后立即释放,这里只是为了等待验证结束。
讲到Lock,就不能不讲Conditon,前者代替了synchronized,而后者则代替了Object对象上的wait()、notify()和notifyAll()方法(Condition中提供了await()、signal()和signalAll()方法),当满足运行条件前挂起线程。Condition是与Lock结合使用的,通过Lock.newCondition()方法能够创建与Lock绑定的Condition实例。JDK的JavaDoc中有一个例子能够很好地说明Condition的用途及用法:
class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); return x; } finally { lock.unlock(); } } }
说到这里,让我解释一下之前的例子里为什么没有选择Condition来等待验证结束。await()方法在调用时当前线程先要获得对应的锁,既然我都拿到锁了,那也就是说验证已经结束了。。。
3、并发集合类
集合类是大家编程时经常要使用的东西,ArrayList、HashMap什么的,java.util包中的集合类有的是线程安全的,有的则不是,在编写多线程的程序时使用线程安全的类能省去很多麻烦,但这些类的性能如何呢?java.util.concurrent包中提供了几个并发结合类,例如ConcurrentHashMap、ConcurrentLinkedQueue和CopyOnWriteArrayList等等,根据不同的使用场景,开发者可以用它们替换java.util包中的相应集合类。
CopyOnWriteArrayList是ArrayList的一个变体,比较适合用在读取比较频繁、修改较少的情况下,因为每次修改都要复制整个底层数组。ConcurrentHashMap中为Map接口增加了一些方法(例如putIfAbsenct()),同时做了些优化,总之灰常之好用,下面的代码中使用ConcurrentHashMap来作为全局节点表,完全无需考虑并发问题。ValidationService中只是声明(第17行),具体的使用是在上面的ValidationTask中。
package service; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * 执行验证的服务类 * * @author DigitalSonic */ public class ValidationService { /** * 全局节点表 */ public static final Map<String, Node> NODE_MAP = new ConcurrentHashMap<String, Node>(); private ThreadPoolService threadPoolService; public ValidationService(ThreadPoolService threadPoolService) { this.threadPoolService = threadPoolService; } /** * 给出一个入口节点的WSDL,通过广度遍历的方式验证与其相关的各个节点 * * @param wsdl 入口节点WSDL */ public void validate(List<String> wsdl) { List<String> visitedNodes = new ArrayList<String>(); List<String> nextRoundNodes = new ArrayList<String>(); nextRoundNodes.addAll(wsdl); while (nextRoundNodes.size() > 0) { List<ValidationTask> tasks = getTasks(nextRoundNodes); List<Node> nodes = threadPoolService.invokeAll(tasks); visitedNodes.addAll(nextRoundNodes); nextRoundNodes.clear(); getNextRoundNodes(nodes, visitedNodes, nextRoundNodes); } } private List<String> getNextRoundNodes(List<Node> nodes, List<String> visitedNodes, List<String> nextRoundNodes) { for (Node node : nodes) { for (String wsdl : node.getDependencies()) { if (!visitedNodes.contains(wsdl)) { nextRoundNodes.add(wsdl); } } } return nextRoundNodes; } private List<ValidationTask> getTasks(List<String> nodes) { List<ValidationTask> tasks = new ArrayList<ValidationTask>(nodes.size()); for (String wsdl : nodes) { tasks.add(new ValidationTask(wsdl)); } return tasks; } }
4、AtomicInteger
对变量的读写操作都是原子操作(除了long或者double的变量),但像数值类型的++ --操作不是原子操作,像i++中包含了获得i的原始值、加1、写回i、返回原始值,在进行类似i++这样的操作时如果不进行同步问题就大了。好在java.util.concurrent.atomic为我们提供了很多工具类,可以以原子方式更新变量。
以AtomicInteger为例,提供了代替++ --的getAndIncrement()、incrementAndGet()、getAndDecrement()和decrementAndGet()方法,还有加减给定值的方法、当前值等于预期值时更新的compareAndSet()方法。
下面的例子中用AtomicInteger保存全局验证次数(第69行做了自增的操作),因为validateNode()方法会同时被多个线程调用,所以直接用int不同步是不行的,但用AtomicInteger在这种场合下就很合适。
package service.mock; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; import service.Node; /** * 模拟执行节点验证的Mock类 * * @author DigitalSonic */ public class MockNodeValidator { public static final List<Node> ENTRIES = new ArrayList<Node>(); private static final Map<String, Node> NODE_MAP = new HashMap<String, Node>(); private static AtomicInteger count = new AtomicInteger(0); private static Logger logger = Logger.getLogger("MockNodeValidator"); /* * 构造模拟数据 */ static { Node node0 = new Node("NODE0", "http://node0/check?wsdl"); //入口0 Node node1 = new Node("NODE1", "http://node1/check?wsdl"); Node node2 = new Node("NODE2", "http://node2/check?wsdl"); Node node3 = new Node("NODE3", "http://node3/check?wsdl"); Node node4 = new Node("NODE4", "http://node4/check?wsdl"); Node node5 = new Node("NODE5", "http://node5/check?wsdl"); Node node6 = new Node("NODE6", "http://node6/check?wsdl"); //入口1 Node node7 = new Node("NODE7", "http://node7/check?wsdl"); Node node8 = new Node("NODE8", "http://node8/check?wsdl"); Node node9 = new Node("NODE9", "http://node9/check?wsdl"); node0.setDependencies(new String[] { node1.getWsdl(), node2.getWsdl() }); node1.setDependencies(new String[] { node3.getWsdl(), node4.getWsdl() }); node2.setDependencies(new String[] { node5.getWsdl() }); node6.setDependencies(new String[] { node7.getWsdl(), node8.getWsdl() }); node7.setDependencies(new String[] { node5.getWsdl(), node9.getWsdl() }); node8.setDependencies(new String[] { node3.getWsdl(), node4.getWsdl() }); node2.setResult("FAILED"); NODE_MAP.put(node0.getWsdl(), node0); NODE_MAP.put(node1.getWsdl(), node1); NODE_MAP.put(node2.getWsdl(), node2); NODE_MAP.put(node3.getWsdl(), node3); NODE_MAP.put(node4.getWsdl(), node4); NODE_MAP.put(node5.getWsdl(), node5); NODE_MAP.put(node6.getWsdl(), node6); NODE_MAP.put(node7.getWsdl(), node7); NODE_MAP.put(node8.getWsdl(), node8); NODE_MAP.put(node9.getWsdl(), node9); ENTRIES.add(node0); ENTRIES.add(node6); } /** * 模拟执行远程验证返回节点,每次调用等待500ms */ public static Node validateNode(String wsdl) { Node node = cloneNode(NODE_MAP.get(wsdl)); logger.info("验证节点" + node.getName() + "[" + node.getWsdl() + "]"); count.getAndIncrement(); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } return node; } /** * 获得计数器的值 */ public static int getCount() { return count.intValue(); } /** * 克隆一个新的Node对象(未执行深度克隆) */ public static Node cloneNode(Node originalNode) { Node newNode = new Node(); newNode.setName(originalNode.getName()); newNode.setWsdl(originalNode.getWsdl()); newNode.setResult(originalNode.getResult()); newNode.setDependencies(originalNode.getDependencies()); return newNode; } }
上述代码还有另一个功能,就是构造测试用的节点数据,一共10个节点,有2个入口点,通过这两个点能够遍历整个系统。每次调用会模拟远程访问,等待500ms。环境间节点依赖如下:
环境依赖
Node0 [Node1, Node2] Node1 [Node3, Node4] Node2 [Node5] Node6 [Node7, Node8] Node7 [Node5, Node9] Node8 [Node3, Node4]
5、CountDownLatch
CountDownLatch是一个一次性的同步辅助工具,允许一个或多个线程一直等待,直到计数器值变为0。它有一个构造方法,设定计数器初始值,即在await()结束等待前需要调用多少次countDown()方法。CountDownLatch的计数器不能重置,所以说它是“一次性”的,如果需要重置计数器,可以使用CyclicBarrier。在运行环境检查的主类中,使用了CountDownLatch来等待所有验证结束,在各个并发验证的线程完成任务结束前都会调用countDown(),因为有3个并发的验证,所以将计数器设置为3。
最后将所有这些类整合起来,运行环境检查的主类如下。它会创建线程池服务和验证服务,先做一次验证(相当于是对系统做次初始化),随后并发3个验证请求。系统运行完毕会显示实际执行的节点验证次数和执行时间。如果是顺序执行,验证次数应该是13*4=52,但实际的验证次数会少于这个数字(我这里最近一次执行了33次验证),因为如果同时有两个线程要验证同一节点时只会做一次验证。关于时间,如果是顺序执行,52次验证每次等待500ms,那么验证所耗费的时间应该是26000ms,使用了多线程后的实际耗时远小于该数字(最近一次执行耗时4031ms)。
package service.mock; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import service.Node; import service.ThreadPoolService; import service.ValidationService; /** * 模拟执行这个环境的验证 * * @author DigitalSonic */ public class ValidationStarter implements Runnable { private List<String> entries; private ValidationService validationService; private CountDownLatch signal; public ValidationStarter(List<String> entries, ValidationService validationService, CountDownLatch signal) { this.entries = entries; this.validationService = validationService; this.signal = signal; } /** * 线程池大小为10,初始化执行一次,随后并发三个验证 */ public static void main(String[] args) { ThreadPoolService threadPoolService = new ThreadPoolService(10); ValidationService validationService = new ValidationService(threadPoolService); List<String> entries = new ArrayList<String>(); CountDownLatch signal = new CountDownLatch(3); long start; long stop; for (Node node : MockNodeValidator.ENTRIES) { entries.add(node.getWsdl()); } start = System.currentTimeMillis(); validationService.validate(entries); threadPoolService.execute(new ValidationStarter(entries, validationService, signal)); threadPoolService.execute(new ValidationStarter(entries, validationService, signal)); threadPoolService.execute(new ValidationStarter(entries, validationService, signal)); try { signal.await(); } catch (InterruptedException e) { e.printStackTrace(); } stop = System.currentTimeMillis(); threadPoolService.destoryExecutorService(1000); System.out.println("实际执行验证次数: " + MockNodeValidator.getCount()); System.out.println("实际执行时间: " + (stop - start) + "ms"); } @Override public void run() { validationService.validate(entries); signal.countDown(); } }
相关推荐
书中详细解释了这些机制的工作原理和使用场景,以及死锁、活锁和饥饿等问题的预防。 3. **高级并发工具**:随着Java 5的发布,`java.util.concurrent`包引入了许多高级并发工具,如`ExecutorService`、`Future`、`...
作者不仅详细解释了理论基础,还提供了大量实际案例,帮助读者理解并掌握复杂的并发概念。 #### 标签:Joe Duffy Joe Duffy是一位知名的软件工程师和技术作家,在并发编程领域拥有深厚的专业知识和实践经验。通过...
在Java并发编程领域,Doug Lea是一位杰出的专家,他的工作对Java内存模型(JMM)和并发工具类(如java.util.concurrent包)的开发产生了深远影响。本书主要围绕以下几个关键知识点展开: 1. **并发基础**:首先,书...
下面将详细解释这个过程以及涉及的相关知识点。 首先,`java.concurrent`包是Java标准库的一部分,提供了并发编程的支持,包括线程池、同步机制、并发容器等。在生成PDF的过程中,如果数据处理或转换工作量较大,...
书中的`.chm`文件可能包含了完整的书籍内容,包括详细的章节、示例代码和解释,对于学习和提升Java并发编程能力是非常宝贵的资源。阅读并实践这本书中的内容,可以帮助开发者构建出更稳定、高效且易于维护的并发程序...
文档的一部分内容列出了部分接口和类,以及一些关键词,接下来将详细解释这些知识点: Executor是一个顶层接口,定义了一个执行方法execute(Runnable command),该方法允许执行一个没有返回值的任务。它是Java并...
以下是一些基于这个源码包的关键知识点的详细解释: 1. **Lambda 表达式**:Java 8 最显著的新特性之一就是 Lambda 表达式,它简化了函数式编程。在 `java.util.function` 包中,你可以看到各种函数接口,如 `...
下面将详细阐述`concurrent.futures`模块的使用方法和核心功能。 ### 1. `concurrent.futures`概述 `concurrent.futures`模块主要包含了两个核心类:`ThreadPoolExecutor`和`ProcessPoolExecutor`。这两个类分别...
Java中文解释的API是针对Java开发者的极其重要的参考资料,它为Java API提供了详细的中文注解,使得开发者在阅读和理解API时能更加便捷,尤其对于非英语为母语的程序员来说,这是一个非常实用的工具。Java API包含了...
以下是这些包的功能及重要性的详细解释: 1. **JainSipApi1.2**:这是JAIN SIP API的实现,JAIN是Java电信接入接口(Java Advanced Intelligence Network)的缩写,它提供了一个标准接口,使得开发者能够编写与底层...
此外,文件`mult_process_concurrent`可能包含了示例代码或更深入的解释,建议读者结合代码学习,以便更好地理解多进程并发TCP/IP网络编程的细节。 总之,通过掌握Linux下的多进程TCP/IP网络编程,开发者可以构建出...
这个"JAVA很详细的类库"资源很可能包含了这些类库的详细解释、示例代码以及使用技巧,对于Java学习者来说是一份非常宝贵的资料。通过深入学习和实践,开发者不仅可以掌握Java的基础知识,还能提高解决问题和开发高...
6. **多线程**:"java.lang.Thread"和"java.util.concurrent"包展示了Java如何支持并发编程。 7. **反射机制**:"java.lang.reflect"包提供了运行时访问类和对象的能力,是动态编程的重要部分。 8. **编译器与...
"不错的自述文件"可能是指该项目提供了一份详细的文档,解释了其设计原理和使用方法。 标签"Java"表明这个话题与Java语言密切相关,意味着我们讨论的是Java平台上的并发编程技术。 至于"Concurrent-master",这...
以下是对标题"spring依赖架包part2"及描述"spring依赖架包"中涉及的一些关键依赖的详细解释: 1. **Commons FileUpload**:Apache Commons FileUpload 是一个用于处理HTTP请求中的多部分数据(如上传文件)的库。它...
3. **功能详解**:这是手册的核心部分,详细解释了独立与协调控制功能的具体实现方法,包括安装、接线、规格参数以及维护保养等方面的信息。 - **安装**:提供了详细的步骤说明,指导用户如何正确安装NX100控制器...
以下是关于这些jar包的详细解释: 1. **jpe_sdk.jar**:这是ArcSDE Java Platform Edition (JPE)的核心库,包含了访问和操作ArcSDE地理数据库的API。开发者可以使用这个jar包来创建、读取、更新和删除空间数据,...
5. **多线程**:Java.lang.Thread类和java.util.concurrent包提供了多线程支持。文档详细介绍了如何创建和管理线程,以及同步机制如synchronized关键字、Lock接口和Semaphore类。 6. **网络编程**:Java.net包提供...
下面将详细阐述资源包中涉及的知识点。 1. Maven插件3.8.4: Maven是一个广泛使用的Java项目管理工具,它通过POM(Project Object Model)文件来管理项目的构建、依赖和报告。Maven 3.8.4是最新版本,提供了一些...
压缩包中的`jdi-overview.html`、`overview-bundled.html`和`overview-core.html`可能是JDK1.7的文档概述,它们详细解释了JDK1.7的各个部分,包括Java Debug Interface (JDI)、内置的库和核心API。这些文档对于理解...