`
DigitalSonic
  • 浏览: 214873 次
社区版块
存档分类
最新评论

实战Concurrent

阅读更多

编写多线程的程序一直都是一件比较麻烦的事情,要考虑很多事情,处理不好还会出很多意想不到的麻烦。加上现在很多开发者接触到的项目都是打着企业级旗号的B/S项目,大多数人都很少涉及多线程,这又为本文的主角增加了一份神秘感。

 

讲到Java多线程,大多数人脑海中跳出来的是Thread、Runnable、synchronized……这些是最基本的东西,虽然已经足够强大,但想要用好还真不容易。从JDK 1.5开始,增加了java.util.concurrent包,它的引入大大简化了多线程程序的开发(要感谢一下大牛Doug Lee)。

 

java.util.concurrent包分成了三个部分,分别是java.util.concurrent、java.util.concurrent.atomic和java.util.concurrent.lock。内容涵盖了并发集合类、线程池机制、同步互斥机制、线程安全的变量更新工具类、锁等等常用工具。

 

为了便于理解,本文使用一个例子来做说明,交代一下它的场景:

假设要对一套10个节点组成的环境进行检查,这个环境有两个入口点,通过节点间的依赖关系可以遍历到整个环境。依赖关系可以构成一张有向图,可能存在环。为了提高检查的效率,考虑使用多线程。

 

1、Executors

通过这个类能够获得多种线程池的实例,例如可以调用newSingleThreadExecutor()获得单线程的ExecutorService,调用newFixedThreadPool()获得固定大小线程池的ExecutorService。拿到ExecutorService可以做的事情就比较多了,最简单的是用它来执行Runnable对象,也可以执行一些实现了Callable<T>的对象。用Thread的start()方法没有返回值,如果该线程执行的方法有返回值那用ExecutorService就再好不过了,可以选择submit()、invokeAll()或者invokeAny(),根据具体情况选择合适的方法即可。

package service;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * 线程池服务类
 * 
 * @author DigitalSonic
 */
public class ThreadPoolService {
    /**
     * 默认线程池大小
     */
    public static final int  DEFAULT_POOL_SIZE    = 5;

    /**
     * 默认一个任务的超时时间,单位为毫秒
     */
    public static final long DEFAULT_TASK_TIMEOUT = 1000;

    private int              poolSize             = DEFAULT_POOL_SIZE;
    private ExecutorService  executorService;

    /**
     * 根据给定大小创建线程池
     */
    public ThreadPoolService(int poolSize) {
        setPoolSize(poolSize);
    }

    /**
     * 使用线程池中的线程来执行任务
     */
    public void execute(Runnable task) {
        executorService.execute(task);
    }

    /**
     * 在线程池中执行所有给定的任务并取回运行结果,使用默认超时时间
     * 
     * @see #invokeAll(List, long)
     */
    public List<Node> invokeAll(List<ValidationTask> tasks) {
        return invokeAll(tasks, DEFAULT_TASK_TIMEOUT * tasks.size());
    }

    /**
     * 在线程池中执行所有给定的任务并取回运行结果
     * 
     * @param timeout 以毫秒为单位的超时时间,小于0表示不设定超时
     * @see java.util.concurrent.ExecutorService#invokeAll(java.util.Collection)
     */
    public List<Node> invokeAll(List<ValidationTask> tasks, long timeout) {
        List<Node> nodes = new ArrayList<Node>(tasks.size());
        try {
            List<Future<Node>> futures = null;
            if (timeout < 0) {
                futures = executorService.invokeAll(tasks);
            } else {
                futures = executorService.invokeAll(tasks, timeout, TimeUnit.MILLISECONDS);
            }
            for (Future<Node> future : futures) {
                try {
                    nodes.add(future.get());
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return nodes;
    }

    /**
     * 关闭当前ExecutorService
     * 
     * @param timeout 以毫秒为单位的超时时间
     */
    public void destoryExecutorService(long timeout) {
        if (executorService != null && !executorService.isShutdown()) {
            try {
                executorService.awaitTermination(timeout, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            executorService.shutdown();
        }
    }

    /**
     * 关闭当前ExecutorService,随后根据poolSize创建新的ExecutorService
     */
    public void createExecutorService() {
        destoryExecutorService(1000);
        executorService = Executors.newFixedThreadPool(poolSize);
    }

    /**
     * 调整线程池大小
     * @see #createExecutorService()
     */
    public void setPoolSize(int poolSize) {
        this.poolSize = poolSize;
        createExecutorService();
    }
}
 

这里要额外说明一下invokeAll()和invokeAny()方法。前者会执行给定的所有Callable<T>对象,等所有任务完成后返回一个包含了执行结果的List<Future<T>>,每个Future.isDone()都是true,可以用Future.get()拿到结果;后者只要完成了列表中的任意一个任务就立刻返回,返回值就是执行结果。

还有一个比较诡异的地方
本代码是在JDK 1.6下编译测试的,如果在JDK 1.5下测试,很可能在invokeAll和invokeAny的地方出错。明明ValidationTask实现了 Callable<Node>,可是它死活不认,类型不匹配,这时可以将参数声明由List<ValidationTask>改为 List<Callable<Node>>。
造成这个问题的主要原因是两个版本中invokeAll和invokeAny的方法签名不同,1.6里是invokeAll(Collection<? extends Callable<T>> tasks),而1.5里是invokeAll(Collection<Callable<T>> tasks)。网上也有人遇到类似的问题(invokeAll() is not willing to acept a Collection<Callable<T>> )。

 

和其他资源一样,线程池在使用完毕后也需要释放,用shutdown()方法可以关闭线程池,如果当时池里还有没有被执行的任务,它会等待任务执行完毕,在等待期间试图进入线程池的任务将被拒绝。也可以用shutdownNow()来关闭线程池,它会立刻关闭线程池,没有执行的任务作为返回值返回。

 

2、Lock

多线程编程中常常要锁定某个对象,之前会用synchronized来实现,现在又多了另一种选择,那就是java.util.concurrent.locks。通过Lock能够实现更灵活的锁定机制,它还提供了很多synchronized所没有的功能,例如尝试获得锁(tryLock())。

 

使用Lock时需要自己获得锁并在使用后手动释放,这一点与synchronized有所不同,所以通常Lock的使用方式是这样的:

Lock l = ...; 
l.lock();
try {
	// 执行操作
} finally {
	l.unlock();
}

 

java.util.concurrent.locks中提供了几个Lock接口的实现类,比较常用的应该是ReentrantLock。以下范例中使用了ReentrantLock进行节点锁定:

package service;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 节点类
 * 
 * @author DigitalSonic
 */
public class Node {
	private String name;
	private String wsdl;
	private String result = "PASS";
	private String[] dependencies = new String[] {};
	private Lock lock = new ReentrantLock();
	/**
	 * 默认构造方法
	 */
	public Node() {
	}
	
	/**
	 * 构造节点对象,设置名称及WSDL
	 */
	public Node(String name, String wsdl) {
		this.name = name;
		this.wsdl = wsdl;
	}

	/**
	 * 返回包含节点名称、WSDL以及验证结果的字符串
	 */
	@Override
	public String toString() {
		String toString = "Node: " + name + " WSDL: " + wsdl + " Result: " + result;
		return toString;
	}
	
	// Getter & Setter
	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public String getWsdl() {
		return wsdl;
	}

	public void setWsdl(String wsdl) {
		this.wsdl = wsdl;
	}

	public String getResult() {
		return result;
	}

	public void setResult(String result) {
		this.result = result;
	}

	public String[] getDependencies() {
		return dependencies;
	}

	public void setDependencies(String[] dependencies) {
		this.dependencies = dependencies;
	}

	public Lock getLock() {
		return lock;
	}

}
package service;

import java.util.concurrent.Callable;
import java.util.concurrent.locks.Lock;
import java.util.logging.Logger;

import service.mock.MockNodeValidator;

/**
 * 执行验证的任务类
 * 
 * @author DigitalSonic
 */
public class ValidationTask implements Callable<Node> {
    private static Logger logger = Logger.getLogger("ValidationTask");

    private String        wsdl;

    /**
     * 构造方法,传入节点的WSDL
     */
    public ValidationTask(String wsdl) {
        this.wsdl = wsdl;
    }

    /**
     * 执行针对某个节点的验证<br/>
     * 如果正有别的线程在执行同一节点的验证则等待其结果,不重复执行验证
     */
    @Override
    public Node call() throws Exception {
        Node node = ValidationService.NODE_MAP.get(wsdl);
        Lock lock = null;
        logger.info("开始验证节点:" + wsdl);
        if (node != null) {
            lock = node.getLock();
            if (lock.tryLock()) {
                // 当前没有其他线程验证该节点
                logger.info("当前没有其他线程验证节点" + node.getName() + "[" + wsdl + "]");
                try {
                    Node result = MockNodeValidator.validateNode(wsdl);
                    mergeNode(result, node);
                } finally {
                    lock.unlock();
                }
            } else {
                // 当前有别的线程正在验证该节点,等待结果
                logger.info("当前有别的线程正在验证节点" + node.getName() + "[" + wsdl + "],等待结果");
                lock.lock();
                lock.unlock();
            }
        } else {
            // 从未进行过验证,这种情况应该只出现在系统启动初期
            // 这时是在做初始化,不应该有冲突发生
            logger.info("首次验证节点:" + wsdl);
            node = MockNodeValidator.validateNode(wsdl);
            ValidationService.NODE_MAP.put(wsdl, node);
        }
        logger.info("节点" + node.getName() + "[" + wsdl + "]验证结束,验证结果:" + node.getResult());
        return node;
    }

    /**
     * 将src的内容合并进dest节点中,不进行深度拷贝
     */
    private Node mergeNode(Node src, Node dest) {
        dest.setName(src.getName());
        dest.setWsdl(src.getWsdl());
        dest.setDependencies(src.getDependencies());
        dest.setResult(src.getResult());
        return dest;
    }
}
   

请注意ValidationTask的call()方法,这里会先检查节点是否被锁定,如果被锁定则表示当前有另一个线程正在验证该节点,那就不用重复进行验证。第50行和第51行,那到锁后立即释放,这里只是为了等待验证结束。

 

讲到Lock,就不能不讲Conditon,前者代替了synchronized,而后者则代替了Object对象上的wait()、notify()和notifyAll()方法(Condition中提供了await()、signal()和signalAll()方法),当满足运行条件前挂起线程。Condition是与Lock结合使用的,通过Lock.newCondition()方法能够创建与Lock绑定的Condition实例。JDK的JavaDoc中有一个例子能够很好地说明Condition的用途及用法:

 class BoundedBuffer {
   final Lock lock = new ReentrantLock();
   final Condition notFull  = lock.newCondition(); 
   final Condition notEmpty = lock.newCondition(); 

   final Object[] items = new Object[100];
   int putptr, takeptr, count;

   public void put(Object x) throws InterruptedException {
     lock.lock();
     try {
       while (count == items.length) 
         notFull.await();
       items[putptr] = x; 
       if (++putptr == items.length) putptr = 0;
       ++count;
       notEmpty.signal();
     } finally {
       lock.unlock();
     }
   }

   public Object take() throws InterruptedException {
     lock.lock();
     try {
       while (count == 0) 
         notEmpty.await();
       Object x = items[takeptr]; 
       if (++takeptr == items.length) takeptr = 0;
       --count;
       notFull.signal();
       return x;
     } finally {
       lock.unlock();
     }
   } 
 }

说到这里,让我解释一下之前的例子里为什么没有选择Condition来等待验证结束。await()方法在调用时当前线程先要获得对应的锁,既然我都拿到锁了,那也就是说验证已经结束了。。。

 

3、并发集合类

集合类是大家编程时经常要使用的东西,ArrayList、HashMap什么的,java.util包中的集合类有的是线程安全的,有的则不是,在编写多线程的程序时使用线程安全的类能省去很多麻烦,但这些类的性能如何呢?java.util.concurrent包中提供了几个并发结合类,例如ConcurrentHashMap、ConcurrentLinkedQueue和CopyOnWriteArrayList等等,根据不同的使用场景,开发者可以用它们替换java.util包中的相应集合类。

 

CopyOnWriteArrayList是ArrayList的一个变体,比较适合用在读取比较频繁、修改较少的情况下,因为每次修改都要复制整个底层数组。ConcurrentHashMap中为Map接口增加了一些方法(例如putIfAbsenct()),同时做了些优化,总之灰常之好用,下面的代码中使用ConcurrentHashMap来作为全局节点表,完全无需考虑并发问题。ValidationService中只是声明(第17行),具体的使用是在上面的ValidationTask中。

package service;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 执行验证的服务类
 * 
 * @author DigitalSonic
 */
public class ValidationService {
	/**
	 * 全局节点表
	 */
	public static final Map<String, Node> NODE_MAP = new ConcurrentHashMap<String, Node>();

	private ThreadPoolService threadPoolService;
	
	public ValidationService(ThreadPoolService threadPoolService) {
		this.threadPoolService = threadPoolService;
	}

	/**
	 * 给出一个入口节点的WSDL,通过广度遍历的方式验证与其相关的各个节点
	 * 
	 * @param wsdl 入口节点WSDL
	 */
	public void validate(List<String> wsdl) {
		List<String> visitedNodes = new ArrayList<String>();
		List<String> nextRoundNodes = new ArrayList<String>();

		nextRoundNodes.addAll(wsdl);
		while (nextRoundNodes.size() > 0) {
			List<ValidationTask> tasks = getTasks(nextRoundNodes);
			List<Node> nodes = threadPoolService.invokeAll(tasks);

			visitedNodes.addAll(nextRoundNodes);
			nextRoundNodes.clear();
			getNextRoundNodes(nodes, visitedNodes, nextRoundNodes);
		}
	}

	private List<String> getNextRoundNodes(List<Node> nodes,
			List<String> visitedNodes, List<String> nextRoundNodes) {
		for (Node node : nodes) {
			for (String wsdl : node.getDependencies()) {
				if (!visitedNodes.contains(wsdl)) {
					nextRoundNodes.add(wsdl);
				}
			}
		}
		return nextRoundNodes;
	}

	private List<ValidationTask> getTasks(List<String> nodes) {
		List<ValidationTask> tasks = new ArrayList<ValidationTask>(nodes.size());
		for (String wsdl : nodes) {
			tasks.add(new ValidationTask(wsdl));
		}
		return tasks;
	}
}

 

4、AtomicInteger

对变量的读写操作都是原子操作(除了long或者double的变量),但像数值类型的++ --操作不是原子操作,像i++中包含了获得i的原始值、加1、写回i、返回原始值,在进行类似i++这样的操作时如果不进行同步问题就大了。好在java.util.concurrent.atomic为我们提供了很多工具类,可以以原子方式更新变量。

 

以AtomicInteger为例,提供了代替++ --的getAndIncrement()、incrementAndGet()、getAndDecrement()和decrementAndGet()方法,还有加减给定值的方法、当前值等于预期值时更新的compareAndSet()方法。

 

下面的例子中用AtomicInteger保存全局验证次数(第69行做了自增的操作),因为validateNode()方法会同时被多个线程调用,所以直接用int不同步是不行的,但用AtomicInteger在这种场合下就很合适。

package service.mock;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

import service.Node;

/**
 * 模拟执行节点验证的Mock类
 * 
 * @author DigitalSonic
 */
public class MockNodeValidator {
    public static final List<Node>         ENTRIES  = new ArrayList<Node>();
    private static final Map<String, Node> NODE_MAP = new HashMap<String, Node>();

    private static AtomicInteger           count    = new AtomicInteger(0);
    private static Logger                  logger   = Logger.getLogger("MockNodeValidator");

    /*
     * 构造模拟数据
     */
    static {
        Node node0 = new Node("NODE0", "http://node0/check?wsdl"); //入口0
        Node node1 = new Node("NODE1", "http://node1/check?wsdl");
        Node node2 = new Node("NODE2", "http://node2/check?wsdl");
        Node node3 = new Node("NODE3", "http://node3/check?wsdl");
        Node node4 = new Node("NODE4", "http://node4/check?wsdl");
        Node node5 = new Node("NODE5", "http://node5/check?wsdl");
        Node node6 = new Node("NODE6", "http://node6/check?wsdl"); //入口1
        Node node7 = new Node("NODE7", "http://node7/check?wsdl");
        Node node8 = new Node("NODE8", "http://node8/check?wsdl");
        Node node9 = new Node("NODE9", "http://node9/check?wsdl");

        node0.setDependencies(new String[] { node1.getWsdl(), node2.getWsdl() });
        node1.setDependencies(new String[] { node3.getWsdl(), node4.getWsdl() });
        node2.setDependencies(new String[] { node5.getWsdl() });
        node6.setDependencies(new String[] { node7.getWsdl(), node8.getWsdl() });
        node7.setDependencies(new String[] { node5.getWsdl(), node9.getWsdl() });
        node8.setDependencies(new String[] { node3.getWsdl(), node4.getWsdl() });

        node2.setResult("FAILED");

        NODE_MAP.put(node0.getWsdl(), node0);
        NODE_MAP.put(node1.getWsdl(), node1);
        NODE_MAP.put(node2.getWsdl(), node2);
        NODE_MAP.put(node3.getWsdl(), node3);
        NODE_MAP.put(node4.getWsdl(), node4);
        NODE_MAP.put(node5.getWsdl(), node5);
        NODE_MAP.put(node6.getWsdl(), node6);
        NODE_MAP.put(node7.getWsdl(), node7);
        NODE_MAP.put(node8.getWsdl(), node8);
        NODE_MAP.put(node9.getWsdl(), node9);

        ENTRIES.add(node0);
        ENTRIES.add(node6);
    }

    /**
     * 模拟执行远程验证返回节点,每次调用等待500ms
     */
    public static Node validateNode(String wsdl) {
        Node node = cloneNode(NODE_MAP.get(wsdl));
        logger.info("验证节点" + node.getName() + "[" + node.getWsdl() + "]");
        count.getAndIncrement();
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return node;
    }

    /**
     * 获得计数器的值
     */
    public static int getCount() {
        return count.intValue();
    }

    /**
     * 克隆一个新的Node对象(未执行深度克隆)
     */
    public static Node cloneNode(Node originalNode) {
        Node newNode = new Node();

        newNode.setName(originalNode.getName());
        newNode.setWsdl(originalNode.getWsdl());
        newNode.setResult(originalNode.getResult());
        newNode.setDependencies(originalNode.getDependencies());

        return newNode;
    }
}

 

上述代码还有另一个功能,就是构造测试用的节点数据,一共10个节点,有2个入口点,通过这两个点能够遍历整个系统。每次调用会模拟远程访问,等待500ms。环境间节点依赖如下:

环境依赖
Node0 [Node1, Node2]
Node1 [Node3, Node4]
Node2 [Node5]
Node6 [Node7, Node8]
Node7 [Node5, Node9]
Node8 [Node3, Node4]

 

5、CountDownLatch

CountDownLatch是一个一次性的同步辅助工具,允许一个或多个线程一直等待,直到计数器值变为0。它有一个构造方法,设定计数器初始值,即在await()结束等待前需要调用多少次countDown()方法。CountDownLatch的计数器不能重置,所以说它是“一次性”的,如果需要重置计数器,可以使用CyclicBarrier。在运行环境检查的主类中,使用了CountDownLatch来等待所有验证结束,在各个并发验证的线程完成任务结束前都会调用countDown(),因为有3个并发的验证,所以将计数器设置为3。

 

最后将所有这些类整合起来,运行环境检查的主类如下。它会创建线程池服务和验证服务,先做一次验证(相当于是对系统做次初始化),随后并发3个验证请求。系统运行完毕会显示实际执行的节点验证次数和执行时间。如果是顺序执行,验证次数应该是13*4=52,但实际的验证次数会少于这个数字(我这里最近一次执行了33次验证),因为如果同时有两个线程要验证同一节点时只会做一次验证。关于时间,如果是顺序执行,52次验证每次等待500ms,那么验证所耗费的时间应该是26000ms,使用了多线程后的实际耗时远小于该数字(最近一次执行耗时4031ms)。

package service.mock;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import service.Node;
import service.ThreadPoolService;
import service.ValidationService;

/**
 * 模拟执行这个环境的验证
 * 
 * @author DigitalSonic
 */
public class ValidationStarter implements Runnable {
    private List<String>      entries;
    private ValidationService validationService;
    private CountDownLatch    signal;

    public ValidationStarter(List<String> entries, ValidationService validationService,
            CountDownLatch signal) {
        this.entries = entries;
        this.validationService = validationService;
        this.signal = signal;
    }

    /**
     * 线程池大小为10,初始化执行一次,随后并发三个验证
     */
    public static void main(String[] args) {
        ThreadPoolService threadPoolService = new ThreadPoolService(10);
        ValidationService validationService = new ValidationService(threadPoolService);
        List<String> entries = new ArrayList<String>();
        CountDownLatch signal = new CountDownLatch(3);
        long start;
        long stop;

        for (Node node : MockNodeValidator.ENTRIES) {
            entries.add(node.getWsdl());
        }

        start = System.currentTimeMillis();

        validationService.validate(entries);
        threadPoolService.execute(new ValidationStarter(entries, validationService, signal));
        threadPoolService.execute(new ValidationStarter(entries, validationService, signal));
        threadPoolService.execute(new ValidationStarter(entries, validationService, signal));

        try {
            signal.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        stop = System.currentTimeMillis();
        threadPoolService.destoryExecutorService(1000);
        System.out.println("实际执行验证次数: " + MockNodeValidator.getCount());
        System.out.println("实际执行时间: " + (stop - start) + "ms");
    }

    @Override
    public void run() {
        validationService.validate(entries);
        signal.countDown();
    }

}
   

=================================我是分割线==============================

 

本文没有覆盖java.util.concurrent中的所有内容,只是挑选一些比较常用的东西,想要获得更多详细信息请阅读JavaDoc。自打有了“轮子”理论,重复造大轮子的情况的确少了,但还是有人会做些小轮子,例如编写多线程程序时用到的小工具(线程池、锁等等),如果可以,请让自己再“懒惰”一点吧~

分享到:
评论
29 楼 u011358822 2014-04-11  
28 楼 u011358822 2014-04-11  
[url][img][list]
[*]
引用

[/list][/img][/url]
27 楼 wuliupo 2010-01-28  
406656983 写道
要更详细的看那本火车头《java并发编程实践》就最好了,不过翻译的不好

我正在这本书,是翻译的。
是俞黎敏写的推荐序。
26 楼 BruceXX 2010-01-28  
thread1==>
lock.lock() ;
oper1();
lock.unlock();


thread2==>
lock.lock();
oper2();
lock.unlock();

thread1=>thread2

在thread1不结束前,是不会进入thread2的。。
PS===>lock 是synchonize 的显式锁,更灵活。。。
类似vc 里的critial section code....
25 楼 406656983 2009-10-29  
要更详细的看那本火车头《java并发编程实践》就最好了,不过翻译的不好
24 楼 KimShen 2009-09-19  
写的很好 不过比Think in java还是少了点什么 里面写的很详细
23 楼 DigitalSonic 2009-09-18  
<div class="quote_title">solonote 写道</div>
<div class="quote_div">
<pre name="code" class="java">else { 
                // <span style="color: #ff0000;">当前有别的线程正在验证该节点,等待结果</span> 
                logger.info("当前有别的线程正在验证节点" + node.getName() + "[" + wsdl + "],等待结果"); 
                lock.lock(); 
                lock.unlock(); 
            }  </pre>
<p> </p>
<p> </p>
<p>楼主,请教一下,既然你lock.trylock()失败了, 那不是没有获取锁吗? 干嘛要做</p>
<p>lock.lock();</p>
<p>lock.unlock();</p>
</div>
<p><br>要拿结果,一个比较简单的方法就是等着获取锁,锁被释放了自然就是之前的操作有结果了。</p>
<p>当然这只是示范中的一个应用,场景不同做法也会不同,并不是一定要这么做的</p>
22 楼 lucane 2009-09-15  
aquleo 写道

难道大家都不用处理这些问题?


从事的工作比较没有技术含量
几年估计才能写上几句和线程、同步相关的代码。。。
21 楼 solonote 2009-09-10  
<pre name="code" class="java">else { 
                // 当前有别的线程正在验证该节点,等待结果 
                logger.info("当前有别的线程正在验证节点" + node.getName() + "[" + wsdl + "],等待结果"); 
                lock.lock(); 
                lock.unlock(); 
            }  </pre>
<p> </p>
<p> </p>
<p>楼主,请教一下,既然你lock.trylock()失败了, 那不是没有获取锁吗? 干嘛要做</p>
<p>lock.lock();</p>
<p>lock.unlock();</p>
20 楼 asin 2009-09-10  
学习下。。。。。。。
19 楼 aquleo 2009-09-10  
很好的内容了...

怎么就没人顶呢...

难道大家都不用处理这些问题?
18 楼 wangpx 2009-04-23  
<div class="quote_title">DigitalSonic 写道</div>
<div class="quote_div">
<p>编写多线程的程序一直都是一件比较麻烦的事情,要考虑很多事情,处理不好还会出很多意想不到的麻烦。加上现在很多开发者接触到的项目都是打着企业级旗号的B/S项目,大多数人都很少涉及多线程,这又为本文的主角增加了一份神秘感。</p>
<p> </p>
<p>讲到Java多线程,大多数人脑海中跳出来的是Thread、Runnable、synchronized……这些是最基本的东西,虽然已经足够强大,但想要用好还真不容易。从JDK 1.5开始,增加了java.util.concurrent包,它的引入大大简化了多线程程序的开发(要感谢一下大牛Doug Lee)。</p>
<p> </p>
<p>java.util.concurrent包分成了三个部分,分别是java.util.concurrent、java.util.concurrent.atomic和java.util.concurrent.lock。内容涵盖了并发集合类、线程池机制、同步互斥机制、线程安全的变量更新工具类、锁等等常用工具。</p>
<p> </p>
<p>为了便于理解,本文使用一个例子来做说明,交代一下它的场景:</p>
<p>假设要对一套10个节点组成的环境进行检查,这个环境有两个入口点,通过节点间的依赖关系可以遍历到整个环境。依赖关系可以构成一张有向图,可能存在环。为了提高检查的效率,考虑使用多线程。</p>
<p> </p>
<p><strong>1、Executors</strong> </p>
<p>通过这个类能够获得多种线程池的实例,例如可以调用newSingleThreadExecutor()获得单线程的ExecutorService,调用newFixedThreadPool()获得固定大小线程池的ExecutorService。拿到ExecutorService可以做的事情就比较多了,最简单的是用它来执行Runnable对象,也可以执行一些实现了Callable&lt;T&gt;的对象。用Thread的start()方法没有返回值,如果该线程执行的方法有返回值那用ExecutorService就再好不过了,可以选择submit()、invokeAll()或者invokeAny(),根据具体情况选择合适的方法即可。</p>
<pre name="code" class="java">package service;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
* 线程池服务类
*
* @author DigitalSonic
*/
public class ThreadPoolService {
    /**
     * 默认线程池大小
     */
    public static final int  DEFAULT_POOL_SIZE    = 5;

    /**
     * 默认一个任务的超时时间,单位为毫秒
     */
    public static final long DEFAULT_TASK_TIMEOUT = 1000;

    private int              poolSize             = DEFAULT_POOL_SIZE;
    private ExecutorService  executorService;

    /**
     * 根据给定大小创建线程池
     */
    public ThreadPoolService(int poolSize) {
        setPoolSize(poolSize);
    }

    /**
     * 使用线程池中的线程来执行任务
     */
    public void execute(Runnable task) {
        executorService.execute(task);
    }

    /**
     * 在线程池中执行所有给定的任务并取回运行结果,使用默认超时时间
     *
     * @see #invokeAll(List, long)
     */
    public List&lt;Node&gt; invokeAll(List&lt;ValidationTask&gt; tasks) {
        return invokeAll(tasks, DEFAULT_TASK_TIMEOUT * tasks.size());
    }

    /**
     * 在线程池中执行所有给定的任务并取回运行结果
     *
     * @param timeout 以毫秒为单位的超时时间,小于0表示不设定超时
     * @see java.util.concurrent.ExecutorService#invokeAll(java.util.Collection)
     */
    public List&lt;Node&gt; invokeAll(List&lt;ValidationTask&gt; tasks, long timeout) {
        List&lt;Node&gt; nodes = new ArrayList&lt;Node&gt;(tasks.size());
        try {
            List&lt;Future&lt;Node&gt;&gt; futures = null;
            if (timeout &lt; 0) {
                futures = executorService.invokeAll(tasks);
            } else {
                futures = executorService.invokeAll(tasks, timeout, TimeUnit.MILLISECONDS);
            }
            for (Future&lt;Node&gt; future : futures) {
                try {
                    nodes.add(future.get());
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return nodes;
    }

    /**
     * 关闭当前ExecutorService
     *
     * @param timeout 以毫秒为单位的超时时间
     */
    public void destoryExecutorService(long timeout) {
        if (executorService != null &amp;&amp; !executorService.isShutdown()) {
            try {
                executorService.awaitTermination(timeout, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            executorService.shutdown();
        }
    }

    /**
     * 关闭当前ExecutorService,随后根据poolSize创建新的ExecutorService
     */
    public void createExecutorService() {
        destoryExecutorService(1000);
        executorService = Executors.newFixedThreadPool(poolSize);
    }

    /**
     * 调整线程池大小
     * @see #createExecutorService()
     */
    public void setPoolSize(int poolSize) {
        this.poolSize = poolSize;
        createExecutorService();
    }
}
</pre>
 
<p>这里要额外说明一下invokeAll()和invokeAny()方法。前者会执行给定的所有Callable&lt;T&gt;对象,等所有任务完成后返回一个包含了执行结果的List&lt;Future&lt;T&gt;&gt;,每个Future.isDone()都是true,可以用Future.get()拿到结果;后者只要完成了列表中的任意一个任务就立刻返回,返回值就是执行结果。</p>
<div class="quote_title">还有一个比较诡异的地方 </div>
<div class="quote_div">本代码是在JDK 1.6下编译测试的,如果在JDK 1.5下测试,很可能在invokeAll和invokeAny的地方出错。明明ValidationTask实现了 Callable&lt;Node&gt;,可是它死活不认,类型不匹配,这时可以将参数声明由List&lt;ValidationTask&gt;改为 List&lt;Callable&lt;Node&gt;&gt;。<br>造成这个问题的主要原因是两个版本中invokeAll和invokeAny的方法签名不同,1.6里是invokeAll(Collection&lt;? extends Callable&lt;T&gt;&gt; tasks),而1.5里是invokeAll(Collection&lt;Callable&lt;T&gt;&gt; tasks)。网上也有人遇到类似的问题(<a title="invokeAll() is not willing to acept a Collection&amp;amp;amp;amp;amp;amp;amp;amp;amp;amp;amp;amp;amp;lt;Callable&amp;amp;amp;amp;amp;amp;amp;amp;amp;amp;amp;amp;amp;lt;T&gt;&gt;" href="http://stackoverflow.com/questions/370707/invokeall-is-not-willing-to-acept-a-collectioncallablet" target="_blank">invokeAll() is not willing to acept a Collection&lt;Callable&lt;T&gt;&gt;</a> )。</div>
<p> </p>
<p>和其他资源一样,线程池在使用完毕后也需要释放,用shutdown()方法可以关闭线程池,如果当时池里还有没有被执行的任务,它会等待任务执行完毕,在等待期间试图进入线程池的任务将被拒绝。也可以用shutdownNow()来关闭线程池,它会立刻关闭线程池,没有执行的任务作为返回值返回。</p>
<p> </p>
<p><strong>2、Lock</strong> </p>
<p>多线程编程中常常要锁定某个对象,之前会用synchronized来实现,现在又多了另一种选择,那就是java.util.concurrent.locks。通过Lock能够实现更灵活的锁定机制,它还提供了很多synchronized所没有的功能,例如尝试获得锁(tryLock())。</p>
<p> </p>
<p>使用Lock时需要自己获得锁并在使用后手动释放,这一点与synchronized有所不同,所以通常Lock的使用方式是这样的:</p>
<pre name="code" class="java">Lock l = ...;
l.lock();
try {
// 执行操作
} finally {
l.unlock();
}
</pre>
<p> </p>
<p>java.util.concurrent.locks中提供了几个Lock接口的实现类,比较常用的应该是ReentrantLock。以下范例中使用了ReentrantLock进行节点锁定:</p>
<pre name="code" class="java">package service;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* 节点类
*
* @author DigitalSonic
*/
public class Node {
private String name;
private String wsdl;
private String result = "PASS";
private String[] dependencies = new String[] {};
private Lock lock = new ReentrantLock();
/**
* 默认构造方法
*/
public Node() {
}

/**
* 构造节点对象,设置名称及WSDL
*/
public Node(String name, String wsdl) {
this.name = name;
this.wsdl = wsdl;
}

/**
* 返回包含节点名称、WSDL以及验证结果的字符串
*/
@Override
public String toString() {
String toString = "Node: " + name + " WSDL: " + wsdl + " Result: " + result;
return toString;
}

// Getter &amp; Setter
public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getWsdl() {
return wsdl;
}

public void setWsdl(String wsdl) {
this.wsdl = wsdl;
}

public String getResult() {
return result;
}

public void setResult(String result) {
this.result = result;
}

public String[] getDependencies() {
return dependencies;
}

public void setDependencies(String[] dependencies) {
this.dependencies = dependencies;
}

public Lock getLock() {
return lock;
}

}
</pre>
<pre name="code" class="java">package service;

import java.util.concurrent.Callable;
import java.util.concurrent.locks.Lock;
import java.util.logging.Logger;

import service.mock.MockNodeValidator;

/**
* 执行验证的任务类
*
* @author DigitalSonic
*/
public class ValidationTask implements Callable&lt;Node&gt; {
    private static Logger logger = Logger.getLogger("ValidationTask");

    private String        wsdl;

    /**
     * 构造方法,传入节点的WSDL
     */
    public ValidationTask(String wsdl) {
        this.wsdl = wsdl;
    }

    /**
     * 执行针对某个节点的验证&lt;br/&gt;
     * 如果正有别的线程在执行同一节点的验证则等待其结果,不重复执行验证
     */
    @Override
    public Node call() throws Exception {
        Node node = ValidationService.NODE_MAP.get(wsdl);
        Lock lock = null;
        logger.info("开始验证节点:" + wsdl);
        if (node != null) {
            lock = node.getLock();
            if (lock.tryLock()) {
                // 当前没有其他线程验证该节点
                logger.info("当前没有其他线程验证节点" + node.getName() + "[" + wsdl + "]");
                try {
                    Node result = MockNodeValidator.validateNode(wsdl);
                    mergeNode(result, node);
                } finally {
                    lock.unlock();
                }
            } else {
                // 当前有别的线程正在验证该节点,等待结果
                logger.info("当前有别的线程正在验证节点" + node.getName() + "[" + wsdl + "],等待结果");
                lock.lock();
                lock.unlock();
            }
        } else {
            // 从未进行过验证,这种情况应该只出现在系统启动初期
            // 这时是在做初始化,不应该有冲突发生
            logger.info("首次验证节点:" + wsdl);
            node = MockNodeValidator.validateNode(wsdl);
            ValidationService.NODE_MAP.put(wsdl, node);
        }
        logger.info("节点" + node.getName() + "[" + wsdl + "]验证结束,验证结果:" + node.getResult());
        return node;
    }

    /**
     * 将src的内容合并进dest节点中,不进行深度拷贝
     */
    private Node mergeNode(Node src, Node dest) {
        dest.setName(src.getName());
        dest.setWsdl(src.getWsdl());
        dest.setDependencies(src.getDependencies());
        dest.setResult(src.getResult());
        return dest;
    }
}
</pre>
   
<p>请注意ValidationTask的call()方法,这里会先检查节点是否被锁定,如果被锁定则表示当前有另一个线程正在验证该节点,那就不用重复进行验证。第50行和第51行,那到锁后立即释放,这里只是为了等待验证结束。</p>
<p> </p>
<p>讲到Lock,就不能不讲Conditon,前者代替了synchronized,而后者则代替了Object对象上的wait()、notify()和notifyAll()方法(Condition中提供了await()、signal()和signalAll()方法),当满足运行条件前挂起线程。Condition是与Lock结合使用的,通过Lock.newCondition()方法能够创建与Lock绑定的Condition实例。JDK的JavaDoc中有一个例子能够很好地说明Condition的用途及用法:</p>
<pre name="code" class="java"> class BoundedBuffer {
   final Lock lock = new ReentrantLock();
   final Condition notFull  = lock.newCondition();
   final Condition notEmpty = lock.newCondition();

   final Object[] items = new Object[100];
   int putptr, takeptr, count;

   public void put(Object x) throws InterruptedException {
     lock.lock();
     try {
       while (count == items.length)
         notFull.await();
       items[putptr] = x;
       if (++putptr == items.length) putptr = 0;
       ++count;
       notEmpty.signal();
     } finally {
       lock.unlock();
     }
   }

   public Object take() throws InterruptedException {
     lock.lock();
     try {
       while (count == 0)
         notEmpty.await();
       Object x = items[takeptr];
       if (++takeptr == items.length) takeptr = 0;
       --count;
       notFull.signal();
       return x;
     } finally {
       lock.unlock();
     }
   }
}
</pre>
<p>说到这里,让我解释一下之前的例子里为什么没有选择Condition来等待验证结束。await()方法在调用时当前线程先要获得对应的锁,既然我都拿到锁了,那也就是说验证已经结束了。。。</p>
<p> </p>
<p><strong>3、并发集合类</strong> </p>
<p>集合类是大家编程时经常要使用的东西,ArrayList、HashMap什么的,java.util包中的集合类有的是线程安全的,有的则不是,在编写多线程的程序时使用线程安全的类能省去很多麻烦,但这些类的性能如何呢?java.util.concurrent包中提供了几个并发结合类,例如ConcurrentHashMap、ConcurrentLinkedQueue和CopyOnWriteArrayList等等,根据不同的使用场景,开发者可以用它们替换java.util包中的相应集合类。</p>
<p> </p>
<p>CopyOnWriteArrayList是ArrayList的一个变体,比较适合用在读取比较频繁、修改较少的情况下,因为每次修改都要复制整个底层数组。ConcurrentHashMap中为Map接口增加了一些方法(例如putIfAbsenct()),同时做了些优化,总之灰常之好用,下面的代码中使用ConcurrentHashMap来作为全局节点表,完全无需考虑并发问题。ValidationService中只是声明(第17行),具体的使用是在上面的ValidationTask中。</p>
<pre name="code" class="java">package service;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* 执行验证的服务类
*
* @author DigitalSonic
*/
public class ValidationService {
/**
* 全局节点表
*/
public static final Map&lt;String, Node&gt; NODE_MAP = new ConcurrentHashMap&lt;String, Node&gt;();

private ThreadPoolService threadPoolService;

public ValidationService(ThreadPoolService threadPoolService) {
this.threadPoolService = threadPoolService;
}

/**
* 给出一个入口节点的WSDL,通过广度遍历的方式验证与其相关的各个节点
*
* @param wsdl 入口节点WSDL
*/
public void validate(List&lt;String&gt; wsdl) {
List&lt;String&gt; visitedNodes = new ArrayList&lt;String&gt;();
List&lt;String&gt; nextRoundNodes = new ArrayList&lt;String&gt;();

nextRoundNodes.addAll(wsdl);
while (nextRoundNodes.size() &gt; 0) {
List&lt;ValidationTask&gt; tasks = getTasks(nextRoundNodes);
List&lt;Node&gt; nodes = threadPoolService.invokeAll(tasks);

visitedNodes.addAll(nextRoundNodes);
nextRoundNodes.clear();
getNextRoundNodes(nodes, visitedNodes, nextRoundNodes);
}
}

private List&lt;String&gt; getNextRoundNodes(List&lt;Node&gt; nodes,
List&lt;String&gt; visitedNodes, List&lt;String&gt; nextRoundNodes) {
for (Node node : nodes) {
for (String wsdl : node.getDependencies()) {
if (!visitedNodes.contains(wsdl)) {
nextRoundNodes.add(wsdl);
}
}
}
return nextRoundNodes;
}

private List&lt;ValidationTask&gt; getTasks(List&lt;String&gt; nodes) {
List&lt;ValidationTask&gt; tasks = new ArrayList&lt;ValidationTask&gt;(nodes.size());
for (String wsdl : nodes) {
tasks.add(new ValidationTask(wsdl));
}
return tasks;
}
}
</pre>
<p> </p>
<p><strong>4、AtomicInteger</strong> </p>
<p>对变量的读写操作都是原子操作(除了long或者double的变量),但像数值类型的++ --操作不是原子操作,像i++中包含了获得i的原始值、加1、写回i、返回原始值,在进行类似i++这样的操作时如果不进行同步问题就大了。好在java.util.concurrent.atomic为我们提供了很多工具类,可以以原子方式更新变量。</p>
<p> </p>
<p>以AtomicInteger为例,提供了代替++ --的getAndIncrement()、incrementAndGet()、getAndDecrement()和decrementAndGet()方法,还有加减给定值的方法、当前值等于预期值时更新的compareAndSet()方法。</p>
<p> </p>
<p>下面的例子中用AtomicInteger保存全局验证次数(第69行做了自增的操作),因为validateNode()方法会同时被多个线程调用,所以直接用int不同步是不行的,但用AtomicInteger在这种场合下就很合适。</p>
<pre name="code" class="java">package service.mock;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

import service.Node;

/**
* 模拟执行节点验证的Mock类
*
* @author DigitalSonic
*/
public class MockNodeValidator {
    public static final List&lt;Node&gt;         ENTRIES  = new ArrayList&lt;Node&gt;();
    private static final Map&lt;String, Node&gt; NODE_MAP = new HashMap&lt;String, Node&gt;();

    private static AtomicInteger           count    = new AtomicInteger(0);
    private static Logger                  logger   = Logger.getLogger("MockNodeValidator");

    /*
     * 构造模拟数据
     */
    static {
        Node node0 = new Node("NODE0", "http://node0/check?wsdl"); //入口0
        Node node1 = new Node("NODE1", "http://node1/check?wsdl");
        Node node2 = new Node("NODE2", "http://node2/check?wsdl");
        Node node3 = new Node("NODE3", "http://node3/check?wsdl");
        Node node4 = new Node("NODE4", "http://node4/check?wsdl");
        Node node5 = new Node("NODE5", "http://node5/check?wsdl");
        Node node6 = new Node("NODE6", "http://node6/check?wsdl"); //入口1
        Node node7 = new Node("NODE7", "http://node7/check?wsdl");
        Node node8 = new Node("NODE8", "http://node8/check?wsdl");
        Node node9 = new Node("NODE9", "http://node9/check?wsdl");

        node0.setDependencies(new String[] { node1.getWsdl(), node2.getWsdl() });
        node1.setDependencies(new String[] { node3.getWsdl(), node4.getWsdl() });
        node2.setDependencies(new String[] { node5.getWsdl() });
        node6.setDependencies(new String[] { node7.getWsdl(), node8.getWsdl() });
        node7.setDependencies(new String[] { node5.getWsdl(), node9.getWsdl() });
        node8.setDependencies(new String[] { node3.getWsdl(), node4.getWsdl() });

        node2.setResult("FAILED");

        NODE_MAP.put(node0.getWsdl(), node0);
        NODE_MAP.put(node1.getWsdl(), node1);
        NODE_MAP.put(node2.getWsdl(), node2);
        NODE_MAP.put(node3.getWsdl(), node3);
        NODE_MAP.put(node4.getWsdl(), node4);
        NODE_MAP.put(node5.getWsdl(), node5);
        NODE_MAP.put(node6.getWsdl(), node6);
        NODE_MAP.put(node7.getWsdl(), node7);
        NODE_MAP.put(node8.getWsdl(), node8);
        NODE_MAP.put(node9.getWsdl(), node9);

        ENTRIES.add(node0);
        ENTRIES.add(node6);
    }

    /**
     * 模拟执行远程验证返回节点,每次调用等待500ms
     */
    public static Node validateNode(String wsdl) {
        Node node = cloneNode(NODE_MAP.get(wsdl));
        logger.info("验证节点" + node.getName() + "[" + node.getWsdl() + "]");
        count.getAndIncrement();
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return node;
    }

    /**
     * 获得计数器的值
     */
    public static int getCount() {
        return count.intValue();
    }

    /**
     * 克隆一个新的Node对象(未执行深度克隆)
     */
    public static Node cloneNode(Node originalNode) {
        Node newNode = new Node();

        newNode.setName(originalNode.getName());
        newNode.setWsdl(originalNode.getWsdl());
        newNode.setResult(originalNode.getResult());
        newNode.setDependencies(originalNode.getDependencies());

        return newNode;
    }
}
</pre>
<p> </p>
<p>上述代码还有另一个功能,就是构造测试用的节点数据,一共10个节点,有2个入口点,通过这两个点能够遍历整个系统。每次调用会模拟远程访问,等待500ms。环境间节点依赖如下:</p>
<div class="quote_title">环境依赖 </div>
<div class="quote_div">Node0 [Node1, Node2]<br>Node1 [Node3, Node4]<br>Node2 [Node5]<br>Node6 [Node7, Node8]<br>Node7 [Node5, Node9]<br>Node8 [Node3, Node4]</div>
<p> </p>
<p><strong>5、CountDownLatch</strong> </p>
<p>CountDownLatch是一个一次性的同步辅助工具,允许一个或多个线程一直等待,直到计数器值变为0。它有一个构造方法,设定计数器初始值,即在await()结束等待前需要调用多少次countDown()方法。CountDownLatch的计数器不能重置,所以说它是“一次性”的,如果需要重置计数器,可以使用CyclicBarrier。在运行环境检查的主类中,使用了CountDownLatch来等待所有验证结束,在各个并发验证的线程完成任务结束前都会调用countDown(),因为有3个并发的验证,所以将计数器设置为3。</p>
<p> </p>
<p>最后将所有这些类整合起来,运行环境检查的主类如下。它会创建线程池服务和验证服务,先做一次验证(相当于是对系统做次初始化),随后并发3个验证请求。系统运行完毕会显示实际执行的节点验证次数和执行时间。如果是顺序执行,验证次数应该是13*4=52,但实际的验证次数会少于这个数字(我这里最近一次执行了33次验证),因为如果同时有两个线程要验证同一节点时只会做一次验证。关于时间,如果是顺序执行,52次验证每次等待500ms,那么验证所耗费的时间应该是26000ms,使用了多线程后的实际耗时远小于该数字(最近一次执行耗时4031ms)。</p>
<pre name="code" class="java">package service.mock;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import service.Node;
import service.ThreadPoolService;
import service.ValidationService;

/**
* 模拟执行这个环境的验证
*
* @author DigitalSonic
*/
public class ValidationStarter implements Runnable {
    private List&lt;String&gt;      entries;
    private ValidationService validationService;
    private CountDownLatch    signal;

    public ValidationStarter(List&lt;String&gt; entries, ValidationService validationService,
            CountDownLatch signal) {
        this.entries = entries;
        this.validationService = validationService;
        this.signal = signal;
    }

    /**
     * 线程池大小为10,初始化执行一次,随后并发三个验证
     */
    public static void main(String[] args) {
        ThreadPoolService threadPoolService = new ThreadPoolService(10);
        ValidationService validationService = new ValidationService(threadPoolService);
        List&lt;String&gt; entries = new ArrayList&lt;String&gt;();
        CountDownLatch signal = new CountDownLatch(3);
        long start;
        long stop;

        for (Node node : MockNodeValidator.ENTRIES) {
            entries.add(node.getWsdl());
        }

        start = System.currentTimeMillis();

        validationService.validate(entries);
        threadPoolService.execute(new ValidationStarter(entries, validationService, signal));
        threadPoolService.execute(new ValidationStarter(entries, validationService, signal));
        threadPoolService.execute(new ValidationStarter(entries, validationService, signal));

        try {
            signal.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        stop = System.currentTimeMillis();
        threadPoolService.destoryExecutorService(1000);
        System.out.println("实际执行验证次数: " + MockNodeValidator.getCount());
        System.out.println("实际执行时间: " + (stop - start) + "ms");
    }

    @Override
    public void run() {
        validationService.validate(entries);
        signal.countDown();
    }

}
</pre>
   
<p>=================================我是分割线==============================</p>
<p> </p>
<p>本文没有覆盖java.util.concurrent中的所有内容,只是挑选一些比较常用的东西,想要获得更多详细信息请阅读JavaDoc。自打有了“轮子”理论,重复造大轮子的情况的确少了,但还是有人会做些小轮子,例如编写多线程程序时用到的小工具(线程池、锁等等),如果可以,请让自己再“懒惰”一点吧~<img src="/images/smiles/icon_smile.gif" alt=""></p>
</div>
<p> </p>
17 楼 luanma 2009-04-15  
补充一点儿:
1、 Lock称之为显式锁,synchronized为隐式锁,通常情况下可以看做完全相同的东西。
编码中尽量使用隐式锁(即synchronized块)。原因他们性能上几乎没有差别,但是隐式锁编码的时候不容易出错,而Lock有可能会因为编码的疏忽而忘记释放锁。
只有在隐式锁不能满足你的逻辑的时候才是用Lock,比如Lock.tryLock()方法,等等。

2、多线程程序中应尽量的避免使用锁(包括隐式锁和显式锁),尽可能的使用concurrent包中提供的类来实现,譬如AtomicXXX对象、BlockingQueue、CountDownLatch等等。


最近的项目是网络通讯方面,所有并发用得特别多,多交流:)
16 楼 DigitalSonic 2009-04-09  
<div class="quote_title">fujohnwang 写道</div>
<div class="quote_div">
<div class="quote_title">DigitalSonic 写道</div>
<div class="quote_div">
<div class="quote_title">fujohnwang 写道</div>
<div class="quote_div">Condition那个“替代...”的描述感觉不是很贴切,enhance或许更好一些</div>
<p><br />JDK 1.5和1.6的JavaDoc里是这么说的:</p>
<div class="quote_title">JavaDoc 写道</div>
<div class="quote_div">Where a Lock <strong>replaces</strong> the use of synchronized methods and statements, a Condition <strong>replaces</strong> the use of the Object monitor methods. </div>
 </div>
<p> </p>
<p>你好像没有理解我的意思,使用原来的内部锁,你只能在一个condition上wait或者notify; 使用了新的Lock之后,你可以创建多个Condition,然后再不同的Condition上面wait或者notify</p>
<p> </p>
<p>所以,你说是不是增强了哪?</p>
</div>
<p>了解,其实就是有了Lock可以把之前的给扔了<img src="/images/smiles/icon_smile.gif" alt="" /></p>
15 楼 fujohnwang 2009-04-09  
<div class="quote_title">DigitalSonic 写道</div>
<div class="quote_div">
<div class="quote_title">fujohnwang 写道</div>
<div class="quote_div">Condition那个“替代...”的描述感觉不是很贴切,enhance或许更好一些</div>
<p><br />JDK 1.5和1.6的JavaDoc里是这么说的:</p>
<div class="quote_title">JavaDoc 写道</div>
<div class="quote_div">Where a Lock <strong>replaces</strong> the use of synchronized methods and statements, a Condition <strong>replaces</strong> the use of the Object monitor methods. </div>
 </div>
<p> </p>
<p>你好像没有理解我的意思,使用原来的内部锁,你只能在一个condition上wait或者notify; 使用了新的Lock之后,你可以创建多个Condition,然后再不同的Condition上面wait或者notify</p>
<p> </p>
<p>所以,你说是不是增强了哪?</p>
14 楼 boyingking 2009-04-09  
对多线程有深入研究的人确实越来越少了,不知道是不是国内软件发展的悲哀
13 楼 whaosoft 2009-04-08  
我先研究下去啊~ 在告诉你我的感受 先谢过了啊 正好我线程不好老的 ~
12 楼 DigitalSonic 2009-04-08  
<div class="quote_title">fujohnwang 写道</div>
<div class="quote_div"> Condition那个“替代...”的描述感觉不是很贴切,enhance或许更好一些</div>
<p><br />JDK 1.5和1.6的JavaDoc里是这么说的:</p>
<div class="quote_title">JavaDoc 写道</div>
<div class="quote_div">Where a Lock <strong>replaces</strong> the use of synchronized methods and statements, a Condition <strong>replaces</strong> the use of the Object monitor methods. </div>
 
11 楼 fujohnwang 2009-04-08  
Condition那个“替代...”的描述感觉不是很贴切,enhance或许更好一些
10 楼 yulenice 2009-04-08  
我最近也在看这方面api.
effective java 2e也有介绍.
不过item66感觉有点疑问

相关推荐

    实战Concurrent-BlockQueue

    《实战Concurrent-BlockQueue》 在Java并发编程领域,`Concurrent-BlockQueue`是一个重要的数据结构,它结合了线程安全与高效性能。本文将深入探讨`ConcurrentLinkedQueue`、`ArrayBlockingQueue`以及`...

    Doug Lea, Concurrent Programming in Java Design Principles and Patterns

    8. **实战应用**:通过大量实例和代码示例,读者可以了解到如何在实际项目中应用这些理论和技巧。 9. **并发编程的最佳实践**:书中给出了许多实用建议,帮助开发者编写出健壮、可维护和高效的并发代码。 总之,...

    使用Java并发编程Concurrent Programming Using Java

    Java平台提供了丰富的API支持并发编程,如`java.util.concurrent`包下的各种类和接口,这些工具可以帮助开发者更高效地管理多线程环境下的任务调度和数据共享问题。 ### Java并发编程基础 #### 1. 多线程基础 - **...

    java并发编程实战源码-concurrent-programming:《Java并发编程实战》源码整理

    《Java并发编程实战》这本书是Java并发编程领域的一本经典之作,它深入浅出地讲解了如何在Java环境中高效、安全地进行多线程编程。源码整理集合`concurrent-programming-master`提供了书中示例代码,对于学习和理解...

    Java 并发编程实战.pdf

    此外,该书可能会对Java中一些新的并发API进行探讨,如java.util.concurrent包下的工具类和接口,例如Executor框架、Future、CompletableFuture、ConcurrentHashMap、Semaphore等。这些工具类和接口在构建大规模并发...

    concurrent-programming:《实战java高并发程序设计》源码整理

    《实战java高并发程序设计》源码整理联系作者十三的java的学习交流QQ群: 881582471 , 658365129(已满)相关文章书籍封面目录第1章走入并行世界1.1何去何从的并行计算1.1.1忘掉那该死的并行1.1.2可怕的现实:摩尔...

    《java 并发编程实战高清PDF版》

    《Java并发编程实战》是一本深入探讨Java平台并发编程的权威指南。这本书旨在帮助开发者理解和掌握在Java环境中创建高效、可扩展且可靠的多线程应用程序的关键技术和实践。它涵盖了从基本概念到高级主题的广泛内容,...

    (PDF带目录)《Java 并发编程实战》,java并发实战,并发

    《Java 并发编程实战》是一本专注于Java并发编程的权威指南,对于任何希望深入了解Java多线程和并发控制机制的开发者来说,都是不可或缺的参考资料。这本书深入浅出地介绍了如何在Java环境中有效地管理和控制并发...

    java concurrent in practive

    《Java并发编程实战》还会讨论`java.util.concurrent`包中的高级并发工具,如`ExecutorService`和`Future`,它们可以方便地管理和控制线程池,提高系统的并行处理能力。`CountDownLatch`、`CyclicBarrier`和`...

    Java并发编程:设计原则与模式(Concurrent.Programming.in.Java)(中英版)

    这本书不仅提供了丰富的理论知识,还介绍了实战中的设计原则和模式,对于Java开发者来说是不可或缺的参考资源。 在Java并发编程领域,理解设计原则至关重要,因为它们指导我们编写高效、安全且可维护的多线程代码。...

    Concurrent Programming on Windows

    五、实战技巧与理论知识的完美结合 Steve Teixeira,微软Parallel Computing Platform部门的产品单元经理,对本书给予了高度评价。他认为本书既全面覆盖了Windows平台上的并发编程,又实用地介绍了可以直接应用于...

    python爬虫项目实战

    6. **多线程与异步IO**:通过`concurrent.futures`或者`asyncio`库实现多线程或异步爬取,提升爬虫效率。 7. **Scrapy框架应用**:当项目规模扩大时,使用Scrapy框架可以更好地组织代码,处理复杂的爬虫逻辑,如...

    实战Java高并发程序设计(高清版)

    4. **并发工具类**:Java并发包(java.util.concurrent)中包含了很多实用的工具类,如`Future`、`CompletableFuture`、`Semaphore`等,这些工具可以帮助开发者更好地管理并发任务。 5. **J.U.C框架**:Java并发 ...

    Java并发编程实战

    5.1.2 迭代器与Concurrent-ModificationException 5.1.3 隐藏迭代器 5.2 并发容器 5.2.1 ConcurrentHashMap 5.2.2 额外的原子Map操作 5.2.3 CopyOnWriteArrayList 5.3 阻塞队列和生产者-消费者模式 5.3.1 ...

    java并发编程实战高清版pdf

    《Java并发编程实战》是Java开发者深入理解和掌握并发编程的一本经典著作。这本书全面地介绍了Java平台上的并发和多线程编程技术,旨在帮助开发者在多核时代编写出高效、可伸缩且线程安全的代码。 并发编程是现代...

    c#多线程编程实战(原书第二版)源码

    《C#多线程编程实战(原书第二版)源码》是一本深入探讨C#中多线程技术的专业书籍,其源码提供了丰富的实践示例,帮助读者掌握并发编程的核心概念和技术。在C#中,多线程是实现高性能、响应式应用程序的关键组成部分...

Global site tag (gtag.js) - Google Analytics