`
brandNewUser
  • 浏览: 456070 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Java多线程简单实现取消和进度

阅读更多

Java中简单实现多线程调度时的可取消和显示进度

 

一个简单的多线程调度实现,统一开始,为了使得所有线程统一开始,类似运动员在听到发令枪时一起进行,使用了CountDownLatch进行控制。

CountDownLatch beginLatch = new CountDownLatch(1);
CountDownLatch endLatch = new CountDownLatch(personCount);

主线程建立线程池,并进行调度,由于要在最后进行汇总结果,使用了FutureTask

List<FutureTask<String>> futureTaskList = new ArrayList<FutureTask<String>>();
for (int i = 0; i < personCount; i++) {
	futureTaskList.add(new FutureTask<String>(new ExecuteCallable(beginLatch, endLatch,i)));
}

ExecutorService execService = Executors.newFixedThreadPool(threadCount);
	for (FutureTask<String> futureTask : futureTaskList) {
		execService.execute(futureTask);
	}
beginLatch.countDown();

 这样所有线程就会统一开始执行,执行完成后,汇总结果,并关闭线程池。

 

endLatch.await();
System.out.println("--------------");
for (FutureTask<String> futureTask : futureTaskList) {
	System.out.println(futureTask.get());
}
execService.shutdown();

 对于每个线程的执行,都需要共享变量beginLatch和endLatch,各线程代码:

public class ExecuteCallable implements Callable<String> {

	private int id;
	private CountDownLatch beginLatch;
	private CountDownLatch endLatch;

	public ExecuteCallable(CountDownLatch beginLatch, CountDownLatch endLatch,
			Exchanger<Integer> exchanger, int id,
			ConcurrentTaskExecutor concurrentTaskExecutor) {
		this.beginLatch = beginLatch;
		this.endLatch = endLatch;
		this.id = id;
	}

	@Override
	public String call() throws Exception {
		beginLatch.await();
		long millis = (long) (Math.random() * 10 * 1000);
		String result = String.format("Player :%s arrived, use %s millis", id, millis);
		Thread.sleep(millis);
		System.out.println(result);
		endLatch.countDown();
		return result;
	}

}

 每个线程在开始等待发令枪(beginLatch),随机等待一段时间(模拟执行时间),最后通知endLatch减一(执行完毕通知),并返回结果。

到这里只是一个简单的实现,我们并不能在主线程中实时了解各线程的执行情况,除非到了所有线程执行完毕(endLatch解除阻塞状态)。这时候我们使用Exchanger机制来进行线程之间数据的交换,在每个线程执行完成后,将其完成的数据量传给主线程进行刷新(模拟进度条工作)。

主线程ConcurrentTaskExecutor类中:

Exchanger<Integer> exchanger = new Exchanger<Integer>();
beginLatch.countDown();

Integer totalResult = Integer.valueOf(0);
for (int i = 0; i < personCount; i++) {
	Integer partialResult = exchanger.exchange(Integer.valueOf(0));
        if(partialResult != 0){
	        totalResult = totalResult + partialResult;
		System.out.println(String.format("Progress: %s/%s", totalResult, personCount));
	}
}

endLatch.await();

 线程类ExecuteCallable构造函数加入exchanger

@Override
public String call() throws Exception {
        beginLatch.await();
	long millis = (long) (Math.random() * 10 * 1000);
	String result = String.format("Player :%s arrived, use %s millis", id, millis);
	Thread.sleep(millis);
	System.out.println(result);
	exchanger.exchange(1);
	endLatch.countDown();
	return result;
}

 在执行完成进行数据交换,返回本次执行进度给主线程(当前默认设置成1,可修改),主线程在所有线程执行完成前,endLatch.await()必定是阻塞状态的,这样主线程就能实时拿到子线程执行完成的进度数据。

 

下面我们再加入一个可以取消的功能,加入系统随机在某个时间点进行取消操作,那么开始执行的线程是无法进行实时响应了,只能等待当前操作执行完毕;如果线程还没有开始执行,那么就取消其行为。

更改的ExecuteCallable执行方法如下:

@Override
public String call() throws Exception {
	beginLatch.await();
	if(concurrentTaskExecutor.isCanceled()){
		endLatch.countDown();
		exchanger.exchange(0);
		return String.format("Player :%s is given up", id);
	}
	long millis = (long) (Math.random() * 10 * 1000);
	String result = String.format("Player :%s arrived, use %s millis", id, millis);
	Thread.sleep(millis);
	System.out.println(result);
	exchanger.exchange(1);
	endLatch.countDown();
	return result;
}

 其中concurrentTaskExecutor类中加入一个类型为boolean的canceled变量,注意这个变量必须是volatile的,以便能够在线程间共享数据,并且该变量的setter和getter方法也是原子性的。

我们的取消操作不能放在主线程中操作,需要额外建立一个线程,并且这个线程也不能通过线程池进行调度,新建的InterruptRunnable类:

public class InterruptRunnable implements Runnable {

	private CountDownLatch beginLatch;
	private ConcurrentTaskExecutor concurrentTaskExecutor;

	public InterruptRunnable(ConcurrentTaskExecutor currConcurrentTaskExecutor, CountDownLatch beginLatch) {
		this.beginLatch = beginLatch;
		this.concurrentTaskExecutor = currConcurrentTaskExecutor;
	}

	@Override
	public void run() {
		try {
			beginLatch.await();
			long millis = (long) (Math.random() * 10 * 1000);
			System.out.println(String.format("System need sleep %s millis", millis));
			Thread.sleep(millis);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		concurrentTaskExecutor.setCanceled(true);
	}

}

 更改后的ConcurrentTaskExecutor,在执行发令前,先让该中断线程启动,以便一起等待开始命令:

new Thread(new InterruptRunnable(this, beginLatch)).start();

beginLatch.countDown();

 最后执行结果(取决于中断线程的随机时间长短):

System need sleep 2920 millis
Player :4 arrived, use 917 millis
Progress: 1/10
Player :5 arrived, use 1076 millis
Progress: 2/10
Player :3 arrived, use 2718 millis
Progress: 3/10
Player :1 arrived, use 4013 millis
Progress: 4/10
Player :0 arrived, use 8541 millis
Progress: 5/10
Player :2 arrived, use 8570 millis
Progress: 6/10
Player :6 arrived, use 7261 millis
Progress: 7/10
Player :7 arrived, use 7015 millis
Progress: 8/10
--------------
Player :0 arrived, use 8541 millis
Player :1 arrived, use 4013 millis
Player :2 arrived, use 8570 millis
Player :3 arrived, use 2718 millis
Player :4 arrived, use 917 millis
Player :5 arrived, use 1076 millis
Player :6 arrived, use 7261 millis
Player :7 arrived, use 7015 millis
Player :8 is given up
Player :9 is given up

 

最后,附上最终的程序代码

ConcurrentTaskExecutor:

public class ConcurrentTaskExecutor {

	private volatile boolean canceled = false;

	public void executeTask() throws Exception {
		int personCount = 10;
		int threadCount = 5;

		CountDownLatch beginLatch = new CountDownLatch(1);
		CountDownLatch endLatch = new CountDownLatch(personCount);
		Exchanger<Integer> exchanger = new Exchanger<Integer>();

		List<FutureTask<String>> futureTaskList = new ArrayList<FutureTask<String>>();
		for (int i = 0; i < personCount; i++) {
			futureTaskList.add(new FutureTask<String>(new ExecuteCallable(beginLatch, endLatch, exchanger, i, this)));
		}

		ExecutorService execService = Executors.newFixedThreadPool(threadCount);
		for (FutureTask<String> futureTask : futureTaskList) {
			execService.execute(futureTask);
		}
		
		new Thread(new InterruptRunnable(this, beginLatch)).start();

		beginLatch.countDown();

		Integer totalResult = Integer.valueOf(0);
		for (int i = 0; i < personCount; i++) {
			Integer partialResult = exchanger.exchange(Integer.valueOf(0));
			if(partialResult != 0){
				totalResult = totalResult + partialResult;
				System.out.println(String.format("Progress: %s/%s", totalResult, personCount));
			}
		}

		endLatch.await();
		System.out.println("--------------");
		for (FutureTask<String> futureTask : futureTaskList) {
			System.out.println(futureTask.get());
		}
		execService.shutdown();
	}

	public boolean isCanceled() {
		return canceled;
	}
	
	public void setCanceled(boolean canceled){
		this.canceled = canceled;
	}

	public static void main(String[] args) throws Exception {
		ConcurrentTaskExecutor executor = new ConcurrentTaskExecutor();
		executor.executeTask();
	}

}

 ExecuteCallable

public class ExecuteCallable implements Callable<String> {

	private int id;
	private CountDownLatch beginLatch;
	private CountDownLatch endLatch;
	private Exchanger<Integer> exchanger;
	private ConcurrentTaskExecutor concurrentTaskExecutor;

	public ExecuteCallable(CountDownLatch beginLatch, CountDownLatch endLatch,
			Exchanger<Integer> exchanger, int id,
			ConcurrentTaskExecutor concurrentTaskExecutor) {
		this.beginLatch = beginLatch;
		this.endLatch = endLatch;
		this.exchanger = exchanger;
		this.id = id;
		this.concurrentTaskExecutor = concurrentTaskExecutor;
	}

	@Override
	public String call() throws Exception {
		beginLatch.await();
		if(concurrentTaskExecutor.isCanceled()){
			endLatch.countDown();
			exchanger.exchange(0);
			return String.format("Player :%s is given up", id);
		}
		long millis = (long) (Math.random() * 10 * 1000);
		String result = String.format("Player :%s arrived, use %s millis", id, millis);
		Thread.sleep(millis);
		System.out.println(result);
		exchanger.exchange(1);
		endLatch.countDown();
		return result;
	}

}

 InterruptRunnable

public class InterruptRunnable implements Runnable {

	private CountDownLatch beginLatch;
	private ConcurrentTaskExecutor concurrentTaskExecutor;

	public InterruptRunnable(ConcurrentTaskExecutor currConcurrentTaskExecutor, CountDownLatch beginLatch) {
		this.beginLatch = beginLatch;
		this.concurrentTaskExecutor = currConcurrentTaskExecutor;
	}

	@Override
	public void run() {
		try {
			beginLatch.await();
			long millis = (long) (Math.random() * 10 * 1000);
			System.out.println(String.format("System need sleep %s millis", millis));
			Thread.sleep(millis);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		concurrentTaskExecutor.setCanceled(true);
	}

}

 

 

 

 

 

 

 

 

 

 

 

分享到:
评论

相关推荐

    java多线程下载器

    Java多线程下载器是一种利用Java编程语言实现的高效文件下载工具,它通过将大文件分割成多个小部分,然后创建多个线程同时下载这些部分,以提高下载速度。这种技术在处理大文件或者网络带宽有限的情况下尤其有用,...

    java多线程下载图片

    在这个场景中,"java多线程下载图片"意味着我们将探讨如何使用Java来实现一个能够异步下载多个图片的系统。 首先,我们需要理解Java中的线程是如何创建和运行的。Java提供了两种创建线程的方式:继承Thread类和实现...

    Java多线程文件下载

    Java多线程文件下载是一种高效的下载策略,它通过将大文件分割成多个部分,然后创建多个线程分别下载这些部分,来实现并行下载。这种技术可以显著提高下载速度,尤其是在网络条件不稳定或者带宽有限的情况下。下面...

    Java多线程下载网络图片

    总之,"Java多线程下载网络图片"这个案例涵盖了多线程编程的多个核心知识点,包括线程创建、同步机制、线程池管理和异常处理等。通过实践这些技术,开发者可以编写出更加高效、稳定、健壮的并发程序。

    java多线程下载源代码

    Java多线程下载是利用Java编程语言实现的一种高效下载大文件的技术。在传统的单线程下载方式中,如果网络环境不稳定或文件较大,下载过程可能会很慢,甚至中断。而多线程下载则是将文件分割成多个部分,每个部分由一...

    java多线程断点下载

    实现Java多线程断点下载的步骤如下: 1. **检查已下载部分**:首先,我们需要读取本地已有的文件,获取其当前大小,这将作为断点续传的起点。 2. **创建线程池**:使用`ExecutorService`创建一个线程池,线程数量...

    WHUT-java多线程实验-第三周-文件上传和下载.zip

    在“WHUT-java多线程实验-第三周-文件上传和下载.zip”这个实验中,我们将重点探讨如何在多线程环境中实现文件的上传和下载功能。这个实验基于IntelliJ IDEA开发环境,它是一个流行的Java集成开发环境,提供了丰富的...

    Java 多线程断点下载文件

    Java多线程断点下载文件是一种高效的文件下载方式,它允许在下载过程中暂停并从上次停止的地方继续,尤其适用于大...在实际应用中,上述知识点结合具体的代码实现,可以构建一个功能完善的Java多线程断点下载文件系统。

    java多线程下载

    Java多线程下载是一种高效的文件下载技术,它利用了Java多线程编程的优势来提高文件下载的速度和效率。在单线程下载中,文件的下载是连续的,而在多线程下载中,文件被分割成多个部分,每个部分由一个独立的线程负责...

    Java实现多线程下载源代码

    本文将深入探讨如何使用Java实现多线程下载,并通过"Java实现多线程下载源代码"这一主题,详细解析其背后的原理和实践方法。 首先,我们要理解什么是多线程。在单线程环境下,程序执行是顺序的,一次只能执行一个...

    使用java实现http多线程下载

    在Java编程中,实现HTTP多线程下载是一项常见的任务,特别是在处理大文件或者需要提高下载速度的情况下。这个过程涉及到并发编程、网络I/O以及文件操作等多个领域的知识。下面,我们将详细探讨如何使用Java来实现这...

    Java多线程下载工具

    Java多线程下载工具是一种利用Java编程语言实现的软件,它可以将大文件分割成多个部分并行下载,从而显著提高下载速度。这种技术是通过利用Java的多线程特性来实现的,对于处理网络资源的大量请求,尤其是在带宽有限...

    DownloadThread.zip_ DownloadThread_DownloadThread_Java 多线程 下载_ja

    4. `java多线程下载程序.txt`和`用Java设计下载软件.txt`:这两个文本文件可能是项目文档,包含了关于如何设计和实现多线程下载程序的详细说明,包括设计理念、实现方法以及可能遇到的问题和解决方案。 在Java中...

    java Swing 多线程下载器

    Java Swing多线程下载器是一种利用Java Swing库构建的图形用户界面(GUI)应用程序,它具备多线程下载功能,并支持断点续传。这样的工具类似于我们熟知的迅雷下载管理器,允许用户同时下载多个文件,提高下载速度,...

    JAVA多线程端点续传下载MultiDown.

    Java多线程端点续传下载(MultiDown)是一种高效的文件下载技术,它结合了多线程和断点续传的概念,以优化网络资源利用,提高大文件下载速度和可靠性。在Java中实现这样的功能,我们需要理解以下几个核心知识点: 1...

    Java多线程下载 实例

    总结来说,Java多线程下载实例涉及了Java线程编程、HTTP请求、文件操作以及错误处理等多个方面的知识。通过合理地划分任务、使用线程池和`Future`对象,我们可以构建一个高效、可靠的多线程下载程序。

    java多线程多功能下载工具

    Java多线程下载工具是一种利用Java编程语言实现的高效文件下载程序,它通过并发执行多个下载任务来提高文件下载速度。这种技术的核心是利用Java的多线程特性,将一个大文件分割成多个小部分,然后同时下载这些部分,...

    DownTheradsUitl.zip_java 多线程下载

    虽然`DownThreadsUitl.java`并未直接涉及,但实际的多线程下载工具通常会有一个用户界面,展示下载进度、速度等信息,允许用户暂停、恢复或取消下载。 9. **配置参数**: 允许用户自定义下载线程数和下载目录,...

    javafx多线程实现界面实时刷新

    为了解决这个问题,JavaFX提供了对Java多线程特性的良好支持,使我们能够在后台线程中执行这些任务,同时保持UI的流畅和响应。本篇文章将深入探讨如何在JavaFX中通过多线程实现界面实时刷新。 首先,我们需要理解...

Global site tag (gtag.js) - Google Analytics