闭锁/栅栏/信号量/FutureTask分析及使用
1、闭锁
用途:可用于命令一组线程在同一个时刻开始执行某个任务,或者等待一组相关的操作结束。尤其适合计算并发执行某个任务的耗时。
public class CountDownLatchTest { public void timeTasks(int nThreads, final Runnable task) throws InterruptedException{ final CountDownLatch startGate = new CountDownLatch(1); final CountDownLatch endGate = new CountDownLatch(nThreads); for(int i = 0; i < nThreads; i++){ Thread t = new Thread(){ public void run(){ try{ startGate.await(); try{ task.run(); }finally{ endGate.countDown(); } }catch(InterruptedException ignored){ } } }; t.start(); } long start = System.nanoTime(); System.out.println("打开闭锁"); startGate.countDown(); endGate.await(); long end = System.nanoTime(); System.out.println("闭锁退出,共耗时" + (end-start)); } public static void main(String[] args) throws InterruptedException{ CountDownLatchTest test = new CountDownLatchTest(); test.timeTasks(5, test.new RunnableTask()); } class RunnableTask implements Runnable{ @Override public void run() { System.out.println("当前线程为:" + Thread.currentThread().getName()); } }
执行结果为: 打开闭锁 当前线程为:Thread-0 当前线程为:Thread-3 当前线程为:Thread-2 当前线程为:Thread-4 当前线程为:Thread-1 闭锁退出,共耗时1109195
2、栅栏
用途:用于阻塞一组线程直到某个事件发生。所有线程必须同时到达栅栏位置才能继续执行下一步操作,且能够被重置以达到重复利用。而闭锁式一次性对象,一旦进入终止状态,就不能被重置
public class CyclicBarrierTest { private final CyclicBarrier barrier; private final Worker[] workers; public CyclicBarrierTest(){ int count = Runtime.getRuntime().availableProcessors(); this.barrier = new CyclicBarrier(count, new Runnable(){ @Override public void run() { System.out.println("所有线程均到达栅栏位置,开始下一轮计算"); } }); this.workers = new Worker[count]; for(int i = 0; i< count;i++){ workers[i] = new Worker(i); } } private class Worker implements Runnable{ int i; public Worker(int i){ this.i = i; } @Override public void run() { for(int index = 1; index < 3;index++){ System.out.println("线程" + i + "第" + index + "次到达栅栏位置,等待其他线程到达"); try { //注意是await,而不是wait barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); return; } catch (BrokenBarrierException e) { e.printStackTrace(); return; } } } } public void start(){ for(int i=0;i<workers.length;i++){ new Thread(workers[i]).start(); } } public static void main(String[] args){ new CyclicBarrierTest().start(); } }
执行结果为: 线程0第1次到达栅栏位置,等待其他线程到达 线程1第1次到达栅栏位置,等待其他线程到达 线程2第1次到达栅栏位置,等待其他线程到达 线程3第1次到达栅栏位置,等待其他线程到达 所有线程均到达栅栏位置,开始下一轮计算 线程3第2次到达栅栏位置,等待其他线程到达 线程2第2次到达栅栏位置,等待其他线程到达 线程0第2次到达栅栏位置,等待其他线程到达 线程1第2次到达栅栏位置,等待其他线程到达 所有线程均到达栅栏位置,开始下一轮计算
3、信号量
用途:用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。计数信号量可以用来实现某种资源池,或者对容器施加边界。
public class SemaphoreTest<T> { private final Set<T> set; private final Semaphore sema; public SemaphoreTest(int bound){ this.set = Collections.synchronizedSet(new HashSet<T>()); this.sema = new Semaphore(bound); } public boolean add(T o) throws InterruptedException{ sema.acquire(); boolean wasAdded = false; try{ wasAdded = set.add(o); return wasAdded; }finally{ if(!wasAdded){ sema.release(); } } } public boolean remove(T o){ boolean wasRemoved = set.remove(o); if(wasRemoved){ sema.release(); } return wasRemoved; } public static void main(String[] args) throws InterruptedException{ int permits = 5; int elements = permits + 1; SemaphoreTest<Integer> test = new SemaphoreTest<Integer>(permits); for(int i = 0;i < elements; i++){ test.add(i); } } }
输出结果:由于实际待添加的元素个数大于信号量所允许的数量,因此最后一次添加时,会一直阻塞。
4、巧用FutureTask缓存计算过程
当一个计算的代价比较高,譬如比较耗时,或者耗资源,为了避免重复计算带来的浪费,当第一次计算后,通常会将结果缓存起来。比较常见的方式就是使用synchronized进行同步,但该方式带来的代价是被同步的代码只能被串行执行,如果有多个线程在排队对待计算结果,那么针对最后一个线程的计算时间可能比没有使用缓存的时间会更长。
第二种方式是采用ConcurrentHashMap,但对于耗时比较长的计算过程来说,该方式也存在一个漏洞。如果在第一个线程正在计算的过程中,第二个线程开始获取结果,会发现缓存里没有缓存结果,因此第二个线程又启动了同样的计算,这样就导致重复计算,违背了缓存的初衷。计算过程越长,则出现这种重复计算的几率就会越大。
通过第二种方式的缺点分析,得知真正要缓存的应该是计算是否已被启动,而不是等待漫长的计算过程结束后,再缓存结果。一旦从缓存中得知某个计算过程已被其他线程启动,则当前线程不需要再重新启动计算,只需要阻塞等待计算结果的返回。FutureTask就是实现该功能的最佳选择。
public class Memoizer<A, V> implements Computable<A, V> { private ConcurrentMap<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>(); private final Computable<A, V> c; public Memoizer(Computable<A, V> c) { this.c = c; } @Override public V compute(final A a) { while (true) { Future<V> f = cache.get(a); if (null == f) { Callable<V> eval = new Callable<V>() { @Override public V call() throws Exception { return c.compute(a); } }; FutureTask<V> ft = new FutureTask<V>(eval); f = cache.putIfAbsent(a, ft); if (null == f) { f = ft; ft.run(); } } try { return f.get(); } catch (InterruptedException e) { e.printStackTrace(); cache.remove(a, f); } catch (ExecutionException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException, ExecutionException { Computable<Integer, String> c = new Computable<Integer, String>() { @Override public String compute(Integer a) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } String result = "由线程:" + Thread.currentThread().getName() + "计算得到" + a + "的结果"; return result; } }; Memoizer<Integer, String> memoizer = new Memoizer<Integer, String>(c); ExecutorService executorService = Executors.newFixedThreadPool(3); for (int i = 0; i < 3; i++) { Task<Integer, String> task = new Task<Integer, String>(memoizer, 10); String result = executorService.submit(task).get(); System.out.println(result); } executorService.shutdown(); } }
结果如下: 由线程:pool-1-thread-1计算得到10的结果 由线程:pool-1-thread-1计算得到10的结果 由线程:pool-1-thread-1计算得到10的结果 当然如果仅仅只是采用FutureTask,仅仅只是减小了启动同一个计算过程的概率。当两个线程同时经过compute方法时,还是会出现重复启动同一个计算的情况,上面的例子通过结合ConcurrentHashMap 的putIfAbsent方法解决了这个问题
相关推荐
FutureTask用法及使用场景介绍 FutureTask是一种异步获取执行结果或取消执行任务的机制,它可以用来处理耗时的计算任务,使主线程不需要等待计算结果,而是继续执行其他任务。下面将详细介绍FutureTask的用法和使用...
在本篇文章中,我们将深入探讨`ThreadPoolTaskExecutor`的配置及其使用,并结合`FutureTask`来讨论异步任务处理。 首先,让我们了解`ThreadPoolTaskExecutor`的基本配置。在Spring中,我们通常通过在配置文件或Java...
Java FutureTask类使用案例解析 Java FutureTask类是一种异步计算的工具,用于执行长时间的任务并获取结果。它实现了Runnable和Future接口,既可以作为一个Runnable对象提交给Executor执行,也可以作为一个Future...
- `FutureTask`内部使用了一个`volatile`变量`state`来管理任务的状态,确保多线程环境下的可见性和有序性。 - 当`FutureTask`被提交给`Executor`时,`Executor`实际上是在另一个线程中调用`run()`方法。 - 如果...
FutureTask 底层实现分析 FutureTask 是 Java 中的一种非常重要的多线程设计模式,用于异步计算线程之间的结果传递。在 JDK 中,FutureTask 类是 Future 模式的实现,它实现了 Runnable 接口,作为单独的线程运行。...
`Future`、`FutureTask`、`Callable`和`Runnable`是Java并发编程中的核心接口和类,它们在Android开发中同样有着广泛的应用。下面将详细介绍这些概念以及它们如何协同工作。 1. `Runnable`: 这是Java中最基础的多...
主要介绍了futuretask源码分析(推荐),小编觉得还是挺不错的,这里给大家分享下,供各位参考。
Java中的`Future`和`FutureTask`是并发编程中重要的工具,它们允许程序异步执行任务并获取结果。`Future`接口提供了对异步计算结果的访问和控制,而`FutureTask`是`Future`的一个具体实现,它还同时实现了`Runnable`...
在实际开发中,ExecutorService(如ThreadPoolExecutor)经常与Future和FutureTask一起使用,因为ExecutorService可以提交Runnable或Callable任务,并返回Future,从而实现对任务的异步处理和结果获取。例如: ```...
《揭密FutureTask:Java异步编程的核心工具》 在Java并发编程中,FutureTask扮演着至关重要的角色,它是实现异步计算的关键组件。本文将深入探讨FutureTask的原理和用法,帮助开发者更好地理解和利用这个强大的工具...
本文将详细介绍Future和FutureTask的关系、使用和分析。 一、Future介绍 Future位于java.util.concurrent包下,是一个接口,定义了五个方法: 1. `cancel(boolean mayInterruptIfRunning)`: 取消任务,取消成功则...
2、 线程池使用一个线程,执行这个FutureTask任务,线程执行任务过程比较简单,最终会调用Callable.call()或者是Runnable.run()方法,然后得到一个结果,把结果存储在FutureTask实例的outcome属性中,同时把状态修改...
例如,使用latch闭锁、FutureTask、Semaphore计数信号量等来实现线程安全。 latch闭锁是指在多线程环境下,使用闭锁来实现线程安全。例如,在桌⾯搜索时,使用闭锁来实现线程安全,以避免出现死锁、竞态条件等问题...
FutureTask原始码解析 一,FutureTask是什么? FutureTask是可取消的异步的计算任务,它可以通过线程池和线程对象执行,一般来说是FutureTask用于耗时的计算。 二,FutureTask继承图 三,未来任务源码 FutureTask的...
在这个简谈中,我们将深入探讨`FutureTask`的实现原理和使用方式。 首先,`FutureTask`实现了`Runnable`接口,这意味着它可以直接被`Executor`服务执行。同时,它也实现了`Future`接口,提供了检查任务是否完成、...
1. **异步处理**:对于耗时较长的操作,如数据库查询、网络请求等,可以使用`FutureTask`来异步执行这些任务,并在主程序中等待结果返回。 2. **线程池管理**:`FutureTask`可以作为任务提交给线程池执行,利用...
JAVA线程总结,包含线程池,显示使用线程实现异步编程,基于JDK中的Future实现异步编程,JDK中的FutureTask等
在Java并发编程中,通常我们会使用`ExecutorService`来提交任务,而`Future`和`FutureTask`就是与这些任务交互的关键。 首先,`Future`是一个接口,它提供了一种机制来检查异步计算是否完成,以及在计算完成后获取...
Semaphore(信号量,流量控制) ReentrantReadWriteLock (读写锁) BlockingQueue(阻塞队列) 线程池 池化技术 线程池的优势 线程池的特点 线程池三大方法 线程池七大参数 线程池四种拒绝策略 ForkJoin 异步回调 ...
Runnable、Callable、Future、FutureTask有什么关联.docx