Java中简单实现多线程调度时的可取消和显示进度
一个简单的多线程调度实现,统一开始,为了使得所有线程统一开始,类似运动员在听到发令枪时一起进行,使用了CountDownLatch进行控制。
CountDownLatch beginLatch = new CountDownLatch(1); CountDownLatch endLatch = new CountDownLatch(personCount);
主线程建立线程池,并进行调度,由于要在最后进行汇总结果,使用了FutureTask
List<FutureTask<String>> futureTaskList = new ArrayList<FutureTask<String>>(); for (int i = 0; i < personCount; i++) { futureTaskList.add(new FutureTask<String>(new ExecuteCallable(beginLatch, endLatch,i))); } ExecutorService execService = Executors.newFixedThreadPool(threadCount); for (FutureTask<String> futureTask : futureTaskList) { execService.execute(futureTask); } beginLatch.countDown();
这样所有线程就会统一开始执行,执行完成后,汇总结果,并关闭线程池。
endLatch.await(); System.out.println("--------------"); for (FutureTask<String> futureTask : futureTaskList) { System.out.println(futureTask.get()); } execService.shutdown();
对于每个线程的执行,都需要共享变量beginLatch和endLatch,各线程代码:
public class ExecuteCallable implements Callable<String> { private int id; private CountDownLatch beginLatch; private CountDownLatch endLatch; public ExecuteCallable(CountDownLatch beginLatch, CountDownLatch endLatch, Exchanger<Integer> exchanger, int id, ConcurrentTaskExecutor concurrentTaskExecutor) { this.beginLatch = beginLatch; this.endLatch = endLatch; this.id = id; } @Override public String call() throws Exception { beginLatch.await(); long millis = (long) (Math.random() * 10 * 1000); String result = String.format("Player :%s arrived, use %s millis", id, millis); Thread.sleep(millis); System.out.println(result); endLatch.countDown(); return result; } }
每个线程在开始等待发令枪(beginLatch),随机等待一段时间(模拟执行时间),最后通知endLatch减一(执行完毕通知),并返回结果。
到这里只是一个简单的实现,我们并不能在主线程中实时了解各线程的执行情况,除非到了所有线程执行完毕(endLatch解除阻塞状态)。这时候我们使用Exchanger机制来进行线程之间数据的交换,在每个线程执行完成后,将其完成的数据量传给主线程进行刷新(模拟进度条工作)。
主线程ConcurrentTaskExecutor类中:
Exchanger<Integer> exchanger = new Exchanger<Integer>(); beginLatch.countDown(); Integer totalResult = Integer.valueOf(0); for (int i = 0; i < personCount; i++) { Integer partialResult = exchanger.exchange(Integer.valueOf(0)); if(partialResult != 0){ totalResult = totalResult + partialResult; System.out.println(String.format("Progress: %s/%s", totalResult, personCount)); } } endLatch.await();
线程类ExecuteCallable构造函数加入exchanger
@Override public String call() throws Exception { beginLatch.await(); long millis = (long) (Math.random() * 10 * 1000); String result = String.format("Player :%s arrived, use %s millis", id, millis); Thread.sleep(millis); System.out.println(result); exchanger.exchange(1); endLatch.countDown(); return result; }
在执行完成进行数据交换,返回本次执行进度给主线程(当前默认设置成1,可修改),主线程在所有线程执行完成前,endLatch.await()必定是阻塞状态的,这样主线程就能实时拿到子线程执行完成的进度数据。
下面我们再加入一个可以取消的功能,加入系统随机在某个时间点进行取消操作,那么开始执行的线程是无法进行实时响应了,只能等待当前操作执行完毕;如果线程还没有开始执行,那么就取消其行为。
更改的ExecuteCallable执行方法如下:
@Override public String call() throws Exception { beginLatch.await(); if(concurrentTaskExecutor.isCanceled()){ endLatch.countDown(); exchanger.exchange(0); return String.format("Player :%s is given up", id); } long millis = (long) (Math.random() * 10 * 1000); String result = String.format("Player :%s arrived, use %s millis", id, millis); Thread.sleep(millis); System.out.println(result); exchanger.exchange(1); endLatch.countDown(); return result; }
其中concurrentTaskExecutor类中加入一个类型为boolean的canceled变量,注意这个变量必须是volatile的,以便能够在线程间共享数据,并且该变量的setter和getter方法也是原子性的。
我们的取消操作不能放在主线程中操作,需要额外建立一个线程,并且这个线程也不能通过线程池进行调度,新建的InterruptRunnable类:
public class InterruptRunnable implements Runnable { private CountDownLatch beginLatch; private ConcurrentTaskExecutor concurrentTaskExecutor; public InterruptRunnable(ConcurrentTaskExecutor currConcurrentTaskExecutor, CountDownLatch beginLatch) { this.beginLatch = beginLatch; this.concurrentTaskExecutor = currConcurrentTaskExecutor; } @Override public void run() { try { beginLatch.await(); long millis = (long) (Math.random() * 10 * 1000); System.out.println(String.format("System need sleep %s millis", millis)); Thread.sleep(millis); } catch (InterruptedException e) { e.printStackTrace(); } concurrentTaskExecutor.setCanceled(true); } }
更改后的ConcurrentTaskExecutor,在执行发令前,先让该中断线程启动,以便一起等待开始命令:
new Thread(new InterruptRunnable(this, beginLatch)).start(); beginLatch.countDown();
最后执行结果(取决于中断线程的随机时间长短):
System need sleep 2920 millis Player :4 arrived, use 917 millis Progress: 1/10 Player :5 arrived, use 1076 millis Progress: 2/10 Player :3 arrived, use 2718 millis Progress: 3/10 Player :1 arrived, use 4013 millis Progress: 4/10 Player :0 arrived, use 8541 millis Progress: 5/10 Player :2 arrived, use 8570 millis Progress: 6/10 Player :6 arrived, use 7261 millis Progress: 7/10 Player :7 arrived, use 7015 millis Progress: 8/10 -------------- Player :0 arrived, use 8541 millis Player :1 arrived, use 4013 millis Player :2 arrived, use 8570 millis Player :3 arrived, use 2718 millis Player :4 arrived, use 917 millis Player :5 arrived, use 1076 millis Player :6 arrived, use 7261 millis Player :7 arrived, use 7015 millis Player :8 is given up Player :9 is given up
最后,附上最终的程序代码
ConcurrentTaskExecutor:
public class ConcurrentTaskExecutor { private volatile boolean canceled = false; public void executeTask() throws Exception { int personCount = 10; int threadCount = 5; CountDownLatch beginLatch = new CountDownLatch(1); CountDownLatch endLatch = new CountDownLatch(personCount); Exchanger<Integer> exchanger = new Exchanger<Integer>(); List<FutureTask<String>> futureTaskList = new ArrayList<FutureTask<String>>(); for (int i = 0; i < personCount; i++) { futureTaskList.add(new FutureTask<String>(new ExecuteCallable(beginLatch, endLatch, exchanger, i, this))); } ExecutorService execService = Executors.newFixedThreadPool(threadCount); for (FutureTask<String> futureTask : futureTaskList) { execService.execute(futureTask); } new Thread(new InterruptRunnable(this, beginLatch)).start(); beginLatch.countDown(); Integer totalResult = Integer.valueOf(0); for (int i = 0; i < personCount; i++) { Integer partialResult = exchanger.exchange(Integer.valueOf(0)); if(partialResult != 0){ totalResult = totalResult + partialResult; System.out.println(String.format("Progress: %s/%s", totalResult, personCount)); } } endLatch.await(); System.out.println("--------------"); for (FutureTask<String> futureTask : futureTaskList) { System.out.println(futureTask.get()); } execService.shutdown(); } public boolean isCanceled() { return canceled; } public void setCanceled(boolean canceled){ this.canceled = canceled; } public static void main(String[] args) throws Exception { ConcurrentTaskExecutor executor = new ConcurrentTaskExecutor(); executor.executeTask(); } }
ExecuteCallable
public class ExecuteCallable implements Callable<String> { private int id; private CountDownLatch beginLatch; private CountDownLatch endLatch; private Exchanger<Integer> exchanger; private ConcurrentTaskExecutor concurrentTaskExecutor; public ExecuteCallable(CountDownLatch beginLatch, CountDownLatch endLatch, Exchanger<Integer> exchanger, int id, ConcurrentTaskExecutor concurrentTaskExecutor) { this.beginLatch = beginLatch; this.endLatch = endLatch; this.exchanger = exchanger; this.id = id; this.concurrentTaskExecutor = concurrentTaskExecutor; } @Override public String call() throws Exception { beginLatch.await(); if(concurrentTaskExecutor.isCanceled()){ endLatch.countDown(); exchanger.exchange(0); return String.format("Player :%s is given up", id); } long millis = (long) (Math.random() * 10 * 1000); String result = String.format("Player :%s arrived, use %s millis", id, millis); Thread.sleep(millis); System.out.println(result); exchanger.exchange(1); endLatch.countDown(); return result; } }
InterruptRunnable
public class InterruptRunnable implements Runnable { private CountDownLatch beginLatch; private ConcurrentTaskExecutor concurrentTaskExecutor; public InterruptRunnable(ConcurrentTaskExecutor currConcurrentTaskExecutor, CountDownLatch beginLatch) { this.beginLatch = beginLatch; this.concurrentTaskExecutor = currConcurrentTaskExecutor; } @Override public void run() { try { beginLatch.await(); long millis = (long) (Math.random() * 10 * 1000); System.out.println(String.format("System need sleep %s millis", millis)); Thread.sleep(millis); } catch (InterruptedException e) { e.printStackTrace(); } concurrentTaskExecutor.setCanceled(true); } }
相关推荐
Java多线程下载器是一种利用Java编程语言实现的高效文件下载工具,它通过将大文件分割成多个小部分,然后创建多个线程同时下载这些部分,以提高下载速度。这种技术在处理大文件或者网络带宽有限的情况下尤其有用,...
在这个场景中,"java多线程下载图片"意味着我们将探讨如何使用Java来实现一个能够异步下载多个图片的系统。 首先,我们需要理解Java中的线程是如何创建和运行的。Java提供了两种创建线程的方式:继承Thread类和实现...
Java多线程文件下载是一种高效的下载策略,它通过将大文件分割成多个部分,然后创建多个线程分别下载这些部分,来实现并行下载。这种技术可以显著提高下载速度,尤其是在网络条件不稳定或者带宽有限的情况下。下面...
总之,"Java多线程下载网络图片"这个案例涵盖了多线程编程的多个核心知识点,包括线程创建、同步机制、线程池管理和异常处理等。通过实践这些技术,开发者可以编写出更加高效、稳定、健壮的并发程序。
Java多线程下载是利用Java编程语言实现的一种高效下载大文件的技术。在传统的单线程下载方式中,如果网络环境不稳定或文件较大,下载过程可能会很慢,甚至中断。而多线程下载则是将文件分割成多个部分,每个部分由一...
实现Java多线程断点下载的步骤如下: 1. **检查已下载部分**:首先,我们需要读取本地已有的文件,获取其当前大小,这将作为断点续传的起点。 2. **创建线程池**:使用`ExecutorService`创建一个线程池,线程数量...
在“WHUT-java多线程实验-第三周-文件上传和下载.zip”这个实验中,我们将重点探讨如何在多线程环境中实现文件的上传和下载功能。这个实验基于IntelliJ IDEA开发环境,它是一个流行的Java集成开发环境,提供了丰富的...
Java多线程断点下载文件是一种高效的文件下载方式,它允许在下载过程中暂停并从上次停止的地方继续,尤其适用于大...在实际应用中,上述知识点结合具体的代码实现,可以构建一个功能完善的Java多线程断点下载文件系统。
Java多线程下载是一种高效的文件下载技术,它利用了Java多线程编程的优势来提高文件下载的速度和效率。在单线程下载中,文件的下载是连续的,而在多线程下载中,文件被分割成多个部分,每个部分由一个独立的线程负责...
本文将深入探讨如何使用Java实现多线程下载,并通过"Java实现多线程下载源代码"这一主题,详细解析其背后的原理和实践方法。 首先,我们要理解什么是多线程。在单线程环境下,程序执行是顺序的,一次只能执行一个...
在Java编程中,实现HTTP多线程下载是一项常见的任务,特别是在处理大文件或者需要提高下载速度的情况下。这个过程涉及到并发编程、网络I/O以及文件操作等多个领域的知识。下面,我们将详细探讨如何使用Java来实现这...
Java多线程下载工具是一种利用Java编程语言实现的软件,它可以将大文件分割成多个部分并行下载,从而显著提高下载速度。这种技术是通过利用Java的多线程特性来实现的,对于处理网络资源的大量请求,尤其是在带宽有限...
4. `java多线程下载程序.txt`和`用Java设计下载软件.txt`:这两个文本文件可能是项目文档,包含了关于如何设计和实现多线程下载程序的详细说明,包括设计理念、实现方法以及可能遇到的问题和解决方案。 在Java中...
Java Swing多线程下载器是一种利用Java Swing库构建的图形用户界面(GUI)应用程序,它具备多线程下载功能,并支持断点续传。这样的工具类似于我们熟知的迅雷下载管理器,允许用户同时下载多个文件,提高下载速度,...
Java多线程端点续传下载(MultiDown)是一种高效的文件下载技术,它结合了多线程和断点续传的概念,以优化网络资源利用,提高大文件下载速度和可靠性。在Java中实现这样的功能,我们需要理解以下几个核心知识点: 1...
总结来说,Java多线程下载实例涉及了Java线程编程、HTTP请求、文件操作以及错误处理等多个方面的知识。通过合理地划分任务、使用线程池和`Future`对象,我们可以构建一个高效、可靠的多线程下载程序。
在 Java 中实现多线程下载,可以使用 `java.util.concurrent` 包中的 `ExecutorService` 和 `Future` 类。每个下载任务作为一个线程,通过 `ExecutorService` 创建和管理,`Future` 对象则用于监控任务状态,如是否...
Java多线程下载工具是一种利用Java编程语言实现的高效文件下载程序,它通过并发执行多个下载任务来提高文件下载速度。这种技术的核心是利用Java的多线程特性,将一个大文件分割成多个小部分,然后同时下载这些部分,...
虽然`DownThreadsUitl.java`并未直接涉及,但实际的多线程下载工具通常会有一个用户界面,展示下载进度、速度等信息,允许用户暂停、恢复或取消下载。 9. **配置参数**: 允许用户自定义下载线程数和下载目录,...
为了解决这个问题,JavaFX提供了对Java多线程特性的良好支持,使我们能够在后台线程中执行这些任务,同时保持UI的流畅和响应。本篇文章将深入探讨如何在JavaFX中通过多线程实现界面实时刷新。 首先,我们需要理解...