将线程安全性委托给现有的线程安全类,委托时创建线程安全类一个最有效的策略:只需让现有的线程安全类管理所有的状态即可。
5.1 同步容器类
早起JDK的同步容器类包括Vector和Hashtable等,这些同步封装器类是由Collections.synchronizedXxx等工厂方法创建的。
同步容器类的问题是,但有复合操作:迭代、跳转、条件运算的时候,如果有其他线程并发地修改容器的时候,它们可能会出现意料之外的行为。
如果加锁的话,就会牺牲性能和伸缩性。
当容器作为另一个容器的元素或者键值时,就会出现调用hashCode equals等方法,间接调用迭代操作,而可能会产生ConcurrentModificationException异常。同样containsAll、removeAll、retainAll等方法以及把容器作为参数的构造函数都会对容器进行迭代。
5.2 并发容器
JDK5提供了多种并发容器来改进同步容器的性能。上面讲的Vector和Hashtable等是同步容器,它们对所有容器状态访问进行串行化,代价是性能严重低下。
而并发容器是针对多个线程并发访问设计的,在JDK5中,ConcurrentHashMap代替同步且基于散列的Map,CopyOnWriteArrayList用于在遍历操作为主要操作情况下代替同步的List。
注:通过并发容器来代替同步容器,可以极大提高伸缩性和性能并且降低风险
ConcurrentHashMap
CopyOnWriteArrayList
Queue
BlockingQueue:构建生产者-消费者模式的阻塞队列
LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列,二者分别与LinkedList和ArrayList类似,但比同步的List拥有更好的并发性。
PriorityBlockingQueue是一个按优先级排序队列
SynchronizedQueue,实际上它不是一个真正的队列,因为它不会为队列中元素维护存储空间。它维护一组线程,这些线程在等待着把元素加入或者移出队列。 这种区别相当于将文件直接交给同事(SynchronizedQueue),还是将文件放入她的邮箱并希望她能尽快拿到文件。
将桌面搜索问题分解为:文件遍历和建立索引两个独立操作,使用阻塞队列可简化编程模式:
public class ProducerConsumer { static class FileCrawler implements Runnable { private final BlockingQueue<File> fileQueue; private final FileFilter fileFilter; private final File root; public FileCrawler(BlockingQueue<File> fileQueue, final FileFilter fileFilter, File root) { this.fileQueue = fileQueue; this.root = root; this.fileFilter = new FileFilter() { public boolean accept(File f) { return f.isDirectory() || fileFilter.accept(f); } }; } private boolean alreadyIndexed(File f) { return false; } public void run() { try { crawl(root); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private void crawl(File root) throws InterruptedException { File[] entries = root.listFiles(fileFilter); if (entries != null) { for (File entry : entries) if (entry.isDirectory()) crawl(entry); else if (!alreadyIndexed(entry)) fileQueue.put(entry); } } } static class Indexer implements Runnable { private final BlockingQueue<File> queue; public Indexer(BlockingQueue<File> queue) { this.queue = queue; } public void run() { try { while (true) indexFile(queue.take()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } public void indexFile(File file) { // Index the file... } } private static final int BOUND = 10; private static final int N_CONSUMERS = Runtime.getRuntime().availableProcessors(); public static void startIndexing(File[] roots) { BlockingQueue<File> queue = new LinkedBlockingQueue<File>(BOUND); FileFilter filter = new FileFilter() { public boolean accept(File file) { return true; } }; for (File root : roots) new Thread(new FileCrawler(queue, filter, root)).start(); for (int i = 0; i < N_CONSUMERS; i++) new Thread(new Indexer(queue)).start(); } }
爬虫程序负责往队列中添加File,属于生产者,而索引线程负责从队列中拿出File,然后建立索引,属于消费者。所有同步管理完全交由容器,相当的爽啊。
对象池利用了串行线程封闭,将对象借给一个请求线程,只要对象池包含足够的内部同步来安全发布池中对象,并且客户端不会发布池中对象或者使用完后返回池中对象,就可以安全地在线程间传递所有权了。
Latch的用法:门闩
public class TestHarness { public long timeTasks(int nThreads, final Runnable task) throws InterruptedException { final CountDownLatch startGate = new CountDownLatch(1); final CountDownLatch endGate = new CountDownLatch(nThreads); for (int i = 0; i < nThreads; i++) { Thread t = new Thread() { public void run() { try { startGate.await(); try { task.run(); } finally { endGate.countDown(); } } catch (InterruptedException ignored) { } } }; t.start(); } long start = System.nanoTime(); startGate.countDown(); endGate.await(); long end = System.nanoTime(); return end - start; } }
5.5.3 信号量:
计数信号量 counting semaphore用来控制同时访问某个特定资源的操作数量,或同时执行某个指定操作的数量。还可以用来实现某种资源池,或者对容器施加边界。
public class BoundedHashSet<T> { private final Set<T> set; private final Semaphore sem; public BoundedHashSet(int bound) { this.set = Collections.synchronizedSet(new HashSet<T>()); sem = new Semaphore(bound); } public boolean add(T o) throws InterruptedException { sem.acquire(); boolean wasAdded = false; try { wasAdded = set.add(o); return wasAdded; } finally { if (!wasAdded) sem.release(); } } public boolean remove(Object o) { boolean wasRemoved = set.remove(o); if (wasRemoved) sem.release(); return wasRemoved; } }
5.5.4 栅栏 Barrier
上面演示的Latch是一次性对象,一旦进入终止状态,就不能被重置。 Barrier跟Latch类似,它能阻塞一组线程直到某个事件发生。Barrier与Latch的区别在于所有线程必须同时到达Barrier位置,才能继续执行。
Latch用来等待事件,而Barrier用来等待其他线程。
Barrier用来实现一些协议,例如几个家庭决定在某个地方集合: 所有人6:00在地铁洛溪站碰头,到了后要等其他人,之后再讨论下一步怎么做。
CyclicBarrier在并行迭代算法中非常有用,当线程到达barrier位置时候调用await方法,这个方法阻塞直到所有线程都达到barrier位置。如果所有线程都到达barrier的位置,那么barrier will open and release all threads,and then reset state for the next usage。如果对await调用超时或者await阻塞的线程被中断,那么barrier就被认为打破了,所有阻塞的await调用都将终止并抛出BrokenBarrierException。
细胞自动衍生模拟:
public class CellularAutomata { private final Board mainBoard; private final CyclicBarrier barrier; private final Worker[] workers; public CellularAutomata(Board board) { this.mainBoard = board; int count = Runtime.getRuntime().availableProcessors(); this.barrier = new CyclicBarrier(count, new Runnable() { public void run() { mainBoard.commitNewValues(); } }); this.workers = new Worker[count]; for (int i = 0; i < count; i++) workers[i] = new Worker(mainBoard.getSubBoard(count, i)); } private class Worker implements Runnable { private final Board board; public Worker(Board board) { this.board = board; } public void run() { while (!board.hasConverged()) { for (int x = 0; x < board.getMaxX(); x++) for (int y = 0; y < board.getMaxY(); y++) board.setNewValue(x, y, computeValue(x, y)); try { barrier.await(); } catch (InterruptedException ex) { return; } catch (BrokenBarrierException ex) { return; } } } private int computeValue(int x, int y) { // Compute the new value that goes in (x,y) return 0; } } public void start() { for (int i = 0; i < workers.length; i++) new Thread(workers[i]).start(); mainBoard.waitForConvergence(); } interface Board { int getMaxX(); int getMaxY(); int getValue(int x, int y); int setNewValue(int x, int y, int value); void commitNewValues(); boolean hasConverged(); void waitForConvergence(); Board getSubBoard(int numPartitions, int index); } }
5.6 构建高效可伸缩性的结果缓存:
public class Memoizer<A, V> implements Computable<A, V> { private final ConcurrentMap<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>(); private final Computable<A, V> c; public Memoizer(Computable<A, V> c) { this.c = c; } public V compute(final A arg) throws InterruptedException { while (true) { Future<V> f = cache.get(arg); if (f == null) { Callable<V> eval = new Callable<V>() { public V call() throws InterruptedException { return c.compute(arg); } }; FutureTask<V> ft = new FutureTask<V>(eval); f = cache.putIfAbsent(arg, ft); if (f == null) { f = ft; ft.run(); } } try { return f.get(); } catch (CancellationException e) { cache.remove(arg, f); } catch (ExecutionException e) { throw LaunderThrowable.launderThrowable(e.getCause()); } } } }
有了这个之后,可以在因式分解中使用这个Memoizer来缓存之前的计算结果,不仅高效而且可扩展性更强:
@ThreadSafe public class Factorizer extends GenericServlet implements Servlet { private final Computable<BigInteger, BigInteger[]> c = new Computable<BigInteger, BigInteger[]>() { public BigInteger[] compute(BigInteger arg) { return factor(arg); } }; private final Computable<BigInteger, BigInteger[]> cache = new Memoizer<BigInteger, BigInteger[]>(c); public void service(ServletRequest req, ServletResponse resp) { try { BigInteger i = extractFromRequest(req); encodeIntoResponse(resp, cache.compute(i)); } catch (InterruptedException e) { encodeError(resp, "factorization interrupted"); } } void encodeIntoResponse(ServletResponse resp, BigInteger[] factors) { } void encodeError(ServletResponse resp, String errorString) { } BigInteger extractFromRequest(ServletRequest req) { return new BigInteger("7"); } BigInteger[] factor(BigInteger i) { // Doesn't really factor return new BigInteger[]{i}; } }
本人博客已搬家,新地址为:http://yidao620c.github.io/
相关推荐
本文将深入探讨"Java并发编程-线程安全与基础构建模块"这一主题,旨在帮助开发者理解如何有效地处理并发问题,提高程序性能和稳定性。 首先,线程安全是并发编程中的核心概念,指的是多个线程访问同一资源时,无论...
第5章 基础构建模块 5.1 同步容器类 5.1.1 同步容器类的问题 5.1.2 迭代器与Concurrent-ModificationException 5.1.3 隐藏迭代器 5.2 并发容器 5.2.1 ConcurrentHashMap 5.2.2 额外的原子Map操作 5.2.3 ...
《java并发编程实战》读书笔记-第3章-对象的共享,脑图形式,使用xmind8制作 包括同步容器类、并发容器类、阻塞队列和生产者消费者模式、阻塞和中断方法、同步工具类。最后是构建高效且可伸缩的结果缓存
第5章 基础构建模块 5.1 同步容器类 5.1.1 同步容器类的问题 5.1.2 迭代器与Concurrent-ModificationException 5.1.3 隐藏迭代器 5.2 并发容器 5.2.1 ConcurrentHashMap 5.2.2 额外的原子Map操作 5.2.3 ...
Java编程模式与范例:基础开发技巧是针对Java开发者的一本深入学习指南,旨在提升开发者在实际编程中的技能和效率。这本书涵盖了多种重要的编程模式和最佳实践,为初学者和有经验的Java开发者提供了丰富的知识资源。...
Java并发编程实战,第1章 简介,第2章 线程安全性 第3章 对象的共享 第4章 对象的组合 第5章 基础构建模块 第6章 任务执行 第7章 取消与关闭 第8章 线程池的使用 第9章 图形用户界面应用程序 第10章 避免...
Java并发编程实战是针对Java并发编程的详细指南,随着多核处理器的普及,多线程并发编程成为了提升应用程序性能的关键技术之一。从Java 5和Java 6版本开始,Java在并发编程方面取得了显著的进步,不仅改进了Java...
《Java并发编程实战》个人读书笔记,非常详细: 1 简介 2 线程安全性 3 对象的共享 4 对象的组合 5 基础构建模块 6 任务执行 7 取消与关闭 8 线程池的使用 9 图形用户界面应用程序 10 避免活跃性危险 11 性能与可...
通过学习《Java完美编程(第三版)》的这些章节,读者将能够构建坚实的基础,掌握从基础到高级的Java编程技能,并有能力解决实际开发中的各种问题。这本书对于初学者和有经验的开发者都是宝贵的资源,帮助他们在Java...
在“JAVA 网络编程之QQ 项目编程实训”中,我们主要关注的是使用Java语言进行网络编程,特别是构建类似QQ这样的实时通讯系统。这个实训项目是学习网络编程的一个优秀起点,它涵盖了多种关键技术和概念,对于初学者...
学习这门课程将帮助你构建坚实的Java编程基础,掌握核心概念和技术,为将来进一步深入学习Java技术和框架,乃至整个软件开发领域打下坚实的基础。同时,了解Java的发展历程也有助于理解其设计理念和在技术领域的影响...
16. **并发编程**:Java并发库中的并发工具类,如CountDownLatch、Semaphore、CyclicBarrier等。 17. **Lambda表达式**:Java 8引入的新特性,简化函数式编程。 18. **Stream API**:处理集合的新方式,提供链式...
《Java编程宝典:十年典藏版6》是一本深入探讨Java编程技术的权威著作,旨在为读者提供全面、深入...这本典藏版不仅包含了传统的Java编程知识,还反映了近年来Java技术的发展趋势,是每个Java开发者书架上的必备之作。
11. **模块化系统**:学习Java 9引入的模块系统,理解模块的定义、依赖关系和模块化项目的构建。 12. **Lambda表达式和函数式编程**:熟悉Java 8引入的Lambda表达式的语法,以及如何使用Stream API进行函数式编程。...
《JAVA并发编程实践》这本书深入浅出地讲解了Java并发工具类、线程池、锁机制以及并发容器等内容,帮助开发者规避并发陷阱,提升系统性能。 Spring框架是Java企业级应用的事实标准,其源码解析有助于开发者更深入地...
1. **基础模块**:这些项目通常涉及Java基础语法、面向对象编程(OOP)概念,如类、对象、封装、继承和多态。学习者可以在此基础上巩固Java的基础知识。 2. **数据结构与算法**:可能包含链表、栈、队列、树、图等...
《Java核心技术 卷1 基础知识 原书第8版》是Java开发者必读的经典之作,全面深入地讲解了Java编程的基础概念和技术。这本书涵盖了从Java语言的语法到面向对象编程的核心原理,再到类库的使用,是学习Java开发的权威...