`
blueswind8306
  • 浏览: 126123 次
  • 来自: ...
社区版块
存档分类
最新评论

并发编程系列-并行子任务的超时控制

阅读更多
在并发编程中,经常会遇到提交多个子任务并行执行的场景。比如一个中心节点同时派发任务给多个子节点,然后中心节点等待所有子节点完成任务后继续主流程。在这个过程中,主节点需要设置一个最大等待超时,当达到超时时间后不再等待未返回的节点结果,做功能降级处理。

对于这种需求,如果子任务是阻塞执行的,则一般会使用一个线程池来执行子任务,但主任务如何唤醒超时呢?直接想到的方式是主任务在提交完所有子任务后进入一个循环,不断判断所有子任务是否已经完成或者到达超时了,但这种方式会导致主任务线程需要频繁唤醒,加大了上下文切换的开销。并且由于子任务是异步执行的,还需要考虑结果对象的安全发布问题,加大了编码的复杂性。

在j.u.c中有一个CompletionService接口恰好可以实现上述需求,并且避免了上下文切换的开销。其基本思路是用ExecutorCompletionService包装Executor,并在内部使用一个BlockingQueue保存所有已经完成的任务。当主任务调用ExecutorCompletionService.submit方法时包装一个FutureTask的子类对象QueueingFuture并传递给内部Executor,此对象覆盖了FutureTask的done方法。当线程池的Worker线程在任务完成后会回调这个done方法,然后这个方法将已经完成的任务注入到BlockingQueue中去。这样外部只需要调用BlockingQueue的take或poll方法就可以取到完成的任务了。

注意:
  • 由于最先完成的任务会先注入BlockingQueue中,所以主线程中取得的任务结果集是按照完成的先后顺序排序的。
  • 由于使用FutureTask,保证了对象在线程间的安全发布,所以主线程得到的任务结果对象不会出现一致性问题。


以下是我写的一个Demo,参考了《Java并发编程实践》第6章的6.3.6节的程序,并加入了超时等待、以及超时后的通知子线程任务中断的机制:
public class TestCompletionService {

	private static final ExecutorService executor = Executors.newFixedThreadPool(10);
	
	private static final Random r = new Random();	// 每个子任务随机等待一个时间,以模拟子任务的执行时间
	
	private static final int TASK_TIMEOUT = 5;	// 设定最长超时时间为5s
	
	/**
	 * @param args
	 */
	public static void main(String[] args) {
		
		final long startTime = System.currentTimeMillis();
		final long endTime = startTime + (TASK_TIMEOUT * 1000L);
		
		CompletionService<SubTaskResult> cs = new ExecutorCompletionService<SubTaskResult>(executor);
		List<Future<SubTaskResult>> futureList = new ArrayList<Future<SubTaskResult>>();
		for (int i = 0; i < 10; i++) {
			Future<SubTaskResult> f = cs.submit(new Callable<SubTaskResult>() {
				@Override
				public SubTaskResult call() throws Exception {
					try {
						// 子任务等待一个随机时间。如果这里不是+1而是+2,就可以模拟出现超时的情况
						long waitTime = (r.nextInt(TASK_TIMEOUT) + 1) * 1000L;
						Thread.sleep(waitTime);
						return new SubTaskResult(Thread.currentThread().getName() + 
								", sub thread sleepTime=" + waitTime + "ms");	
					} catch (InterruptedException e) {
						System.out.println(Thread.currentThread().getName() + 
								", catch an interrupted exception, interrupted status=" + Thread.interrupted());
						throw e;
					}
				}
			});
			futureList.add(f);
		}
		
		try {
			for (int i = 0; i < 10; i++) {
				long timeLeft = endTime - System.currentTimeMillis();
				try {
					// timeLeft可能为负数,由于j.u.c中所有负数与0等同,所以不用单独对负数做判断
					Future<SubTaskResult> responseFuture = cs.poll(timeLeft, TimeUnit.MILLISECONDS);
					if (responseFuture == null) {
						throw new TimeoutException("waiting timeout");
					}
					SubTaskResult response = responseFuture.get();
					System.out.println(response.getResult() + ", main thread waitFor: " + timeLeft);
				} catch (InterruptedException e) {
					e.printStackTrace();
					Thread.currentThread().interrupt();
				} catch (ExecutionException e) {
					e.printStackTrace();
				}	
			}			
		} catch (TimeoutException e) {
			// 如果超时则终止所有任务(注意cancel只是调用子线程的interrupt方法,至于能不能中断得看子线程是否支持)
			// 因为对于已经完成的任务调用Future.cancel不起效,所以不需要排除那些已经完成的任务
			for (Future<SubTaskResult> future : futureList) {
				future.cancel(true);
			}
			e.printStackTrace();
		} finally {
			executor.shutdown();
			System.out.println("used: " + (System.currentTimeMillis() - startTime));	
		}
	}

}

class SubTaskResult {
	private final String result;
	public SubTaskResult(String result) {
		this.result = result;
	}
	
	public String getResult() {
		return result;
	}
}
分享到:
评论

相关推荐

    go 并发编程实战-郝林

    《Go并发编程实战》是郝林撰写的一本深入解析Go语言并发编程的书籍,它以其深入浅出的讲解和丰富的实战示例深受程序员喜爱。在Go语言中,并发编程是其核心特性之一,也是实现高性能服务的关键技术。下面将详细讨论这...

    cpp-并行执行http请求支持超时设置

    总的来说,理解和实现"cpp-并行执行http请求支持超时设置"涉及到对网络编程基础、并发控制、超时管理以及HTTP协议的理解。通过这个主题,开发者可以提升其在C/C++环境中编写高效、健壮网络应用的能力。

    JAVA并发编程实践-线程执行-学习笔记

    总的来说,Java并发编程实践中的任务执行是一个涉及线程调度、线程池管理、任务生命周期控制、结果处理等多个方面的复杂主题。理解和掌握这些概念和技术,能够帮助开发者编写出更加高效、可靠的并发程序。

    JAVA并发编程实践-线程池-学习笔记

    Java并发编程实践中的线程池是一个关键的概念,它在多线程编程中扮演着至关重要的角色,有效地管理和调度线程资源,以提高系统的性能和效率。线程池通过复用已存在的线程来减少线程的创建和销毁开销,避免了频繁的上...

    JAVA并发编程艺术pdf版

    《JAVA并发编程艺术》是Java开发者深入理解和掌握并发编程的一本重要著作,它涵盖了Java并发领域的核心概念和技术。这本书详细阐述了如何在多线程环境下有效地编写高效、可靠的代码,对于提升Java程序员的技能水平...

    [中文]Java并发编程的艺术pdf

    《Java并发编程的艺术》这本书是Java开发者深入理解并发编程的重要参考。并发编程是现代多核处理器环境下不可或缺的技术,它能够充分利用系统资源,提高程序的执行效率。以下将详细阐述书中涉及的一些关键知识点。 ...

    多线程编程指南--c版

    《多线程编程指南--C版》是一本深入讲解C语言环境下多线程编程的教程。本书旨在帮助读者理解多线程的概念、工作原理,并通过丰富的示例代码,...通过实践这些示例,你将能够应对各种并发编程场景,提升软件开发能力。

    面试必问并发编程高级面试专题.zip

    并发编程是计算机科学中的一个重要领域,特别是在Java等多线程编程语言中,它涉及到如何在多个执行线程之间共享资源和协调任务。本面试专题主要针对高级并发编程的知识点进行探讨,旨在帮助求职者在面试中展现出深厚...

    Java并发编程全景图.pdf

    Java并发编程是Java语言中最为复杂且重要的部分之一,它涉及了多线程编程、内存模型、同步机制等多个领域。为了深入理解Java并发编程,有必要了解其核心技术点和相关实现原理,以下将详细介绍文件中提及的关键知识点...

    书---Java并发编程的艺术

    在讨论Java并发编程之前,首先需要区分并发与并行的概念。并发是指在同一时间段内同时处理多个任务的能力,而并行则是指在同一时间点同时执行多个任务。在Java中,我们通常所说的多线程编程指的是并发而非并行。 ##...

    Parallel_and_Concurrent_Programming_in_Haskell

    标题《Parallel and Concurrent Programming in Haskell》和描述表明,该文件是一本专注于Haskell语言在并行和并发编程领域的深入探讨。Haskell,作为一门纯粹的、懒惰的函数式编程语言,为开发者提供了强大的抽象...

    并发数据结构与多核编程21-22秋季1

    【并发数据结构与多核编程】是一门深入探讨面向多核系统的程序设计的课程,旨在让学生掌握并发编程的基础原理和实践经验。课程分为理论和实践两个部分,覆盖了从并发编程的基本概念到具体并发数据结构和算法的设计与...

    并发编程面试题(2020最新版)

    并发编程是当今软件开发中不可或缺的一部分,尤其是在多核处理器日益普及的背景下,合理地使用并发编程能够显著提升程序的性能和效率。然而,编写并发程序也带来了诸多挑战,包括线程安全、死锁、线程同步等问题。...

    Java并发编程图册Java并发编程图册

    Java并发编程是Java开发中的重要领域,它涉及到多线程、同步机制、线程池以及并发集合等核心概念。在Java编程中,理解和熟练掌握并发编程可以极大地提高程序的执行效率,同时避免出现数据不一致性和线程安全问题。本...

    java 并发编程教程

    Java并发编程是Java开发中的重要领域,涉及到多线程、内存模型、同步机制等多个核心概念。本教程将深入探讨这些知识点,帮助开发者更好地理解和利用Java进行高效并发编程。 首先,让我们从基础开始,理解Java中的多...

    c++并发编程.rar_C 并发编程_C++并发编程_售票_售票系统_并发

    C++并发编程是现代软件开发中的重要组成部分,它利用多核处理器的能力,使得程序能够同时执行多个任务,提高系统的效率和响应性。在本示例中,我们有一个简单的并发程序,模拟了飞机售票系统,该系统有三个售票终端...

    scala并发编程开发教程

    Scala并发编程是构建高效、可扩展的分布式系统的关键技术,特别是在大数据处理框架如Spark中。Akka是一个用Scala编写的库,它提供了基于Actor模型的并发解决方案,这使得Akka在构建高性能、高可靠的分布式应用中扮演...

Global site tag (gtag.js) - Google Analytics