在并发编程中,经常会遇到提交多个子任务并行执行的场景。比如一个中心节点同时派发任务给多个子节点,然后中心节点等待所有子节点完成任务后继续主流程。在这个过程中,主节点需要设置一个最大等待超时,当达到超时时间后不再等待未返回的节点结果,做功能降级处理。
对于这种需求,如果子任务是阻塞执行的,则一般会使用一个线程池来执行子任务,但主任务如何唤醒超时呢?直接想到的方式是主任务在提交完所有子任务后进入一个循环,不断判断所有子任务是否已经完成或者到达超时了,但这种方式会导致主任务线程需要频繁唤醒,加大了上下文切换的开销。并且由于子任务是异步执行的,还需要考虑结果对象的安全发布问题,加大了编码的复杂性。
在j.u.c中有一个CompletionService接口恰好可以实现上述需求,并且避免了上下文切换的开销。其基本思路是用ExecutorCompletionService包装Executor,并在内部使用一个BlockingQueue保存所有已经完成的任务。当主任务调用ExecutorCompletionService.submit方法时包装一个FutureTask的子类对象QueueingFuture并传递给内部Executor,此对象覆盖了FutureTask的done方法。当线程池的Worker线程在任务完成后会回调这个done方法,然后这个方法将已经完成的任务注入到BlockingQueue中去。这样外部只需要调用BlockingQueue的take或poll方法就可以取到完成的任务了。
注意:
- 由于最先完成的任务会先注入BlockingQueue中,所以主线程中取得的任务结果集是按照完成的先后顺序排序的。
- 由于使用FutureTask,保证了对象在线程间的安全发布,所以主线程得到的任务结果对象不会出现一致性问题。
以下是我写的一个Demo,参考了《Java并发编程实践》第6章的6.3.6节的程序,并加入了超时等待、以及超时后的通知子线程任务中断的机制:
public class TestCompletionService {
private static final ExecutorService executor = Executors.newFixedThreadPool(10);
private static final Random r = new Random(); // 每个子任务随机等待一个时间,以模拟子任务的执行时间
private static final int TASK_TIMEOUT = 5; // 设定最长超时时间为5s
/**
* @param args
*/
public static void main(String[] args) {
final long startTime = System.currentTimeMillis();
final long endTime = startTime + (TASK_TIMEOUT * 1000L);
CompletionService<SubTaskResult> cs = new ExecutorCompletionService<SubTaskResult>(executor);
List<Future<SubTaskResult>> futureList = new ArrayList<Future<SubTaskResult>>();
for (int i = 0; i < 10; i++) {
Future<SubTaskResult> f = cs.submit(new Callable<SubTaskResult>() {
@Override
public SubTaskResult call() throws Exception {
try {
// 子任务等待一个随机时间。如果这里不是+1而是+2,就可以模拟出现超时的情况
long waitTime = (r.nextInt(TASK_TIMEOUT) + 1) * 1000L;
Thread.sleep(waitTime);
return new SubTaskResult(Thread.currentThread().getName() +
", sub thread sleepTime=" + waitTime + "ms");
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() +
", catch an interrupted exception, interrupted status=" + Thread.interrupted());
throw e;
}
}
});
futureList.add(f);
}
try {
for (int i = 0; i < 10; i++) {
long timeLeft = endTime - System.currentTimeMillis();
try {
// timeLeft可能为负数,由于j.u.c中所有负数与0等同,所以不用单独对负数做判断
Future<SubTaskResult> responseFuture = cs.poll(timeLeft, TimeUnit.MILLISECONDS);
if (responseFuture == null) {
throw new TimeoutException("waiting timeout");
}
SubTaskResult response = responseFuture.get();
System.out.println(response.getResult() + ", main thread waitFor: " + timeLeft);
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
} catch (TimeoutException e) {
// 如果超时则终止所有任务(注意cancel只是调用子线程的interrupt方法,至于能不能中断得看子线程是否支持)
// 因为对于已经完成的任务调用Future.cancel不起效,所以不需要排除那些已经完成的任务
for (Future<SubTaskResult> future : futureList) {
future.cancel(true);
}
e.printStackTrace();
} finally {
executor.shutdown();
System.out.println("used: " + (System.currentTimeMillis() - startTime));
}
}
}
class SubTaskResult {
private final String result;
public SubTaskResult(String result) {
this.result = result;
}
public String getResult() {
return result;
}
}
分享到:
相关推荐
总的来说,理解和实现"cpp-并行执行http请求支持超时设置"涉及到对网络编程基础、并发控制、超时管理以及HTTP协议的理解。通过这个主题,开发者可以提升其在C/C++环境中编写高效、健壮网络应用的能力。
《Go并发编程实战》是郝林撰写的一本深入解析Go语言并发编程的书籍,它以其深入浅出的讲解和丰富的实战示例深受程序员喜爱。在Go语言中,并发编程是其核心特性之一,也是实现高性能服务的关键技术。下面将详细讨论这...
总的来说,Java并发编程实践中的任务执行是一个涉及线程调度、线程池管理、任务生命周期控制、结果处理等多个方面的复杂主题。理解和掌握这些概念和技术,能够帮助开发者编写出更加高效、可靠的并发程序。
Java并发编程实践中的线程池是一个关键的概念,它在多线程编程中扮演着至关重要的角色,有效地管理和调度线程资源,以提高系统的性能和效率。线程池通过复用已存在的线程来减少线程的创建和销毁开销,避免了频繁的上...
《JAVA并发编程艺术》是Java开发者深入理解和掌握并发编程的一本重要著作,它涵盖了Java并发领域的核心概念和技术。这本书详细阐述了如何在多线程环境下有效地编写高效、可靠的代码,对于提升Java程序员的技能水平...
《Java并发编程的艺术》这本书是Java开发者深入理解并发编程的重要参考。并发编程是现代多核处理器环境下不可或缺的技术,它能够充分利用系统资源,提高程序的执行效率。以下将详细阐述书中涉及的一些关键知识点。 ...
《多线程编程指南--C版》是一本深入讲解C语言环境下多线程编程的教程。本书旨在帮助读者理解多线程的概念、工作原理,并通过丰富的示例代码,...通过实践这些示例,你将能够应对各种并发编程场景,提升软件开发能力。
并发编程是计算机科学中的一个重要领域,特别是在Java等多线程编程语言中,它涉及到如何在多个执行线程之间共享资源和协调任务。本面试专题主要针对高级并发编程的知识点进行探讨,旨在帮助求职者在面试中展现出深厚...
Java并发编程是Java语言中最为复杂且重要的部分之一,它涉及了多线程编程、内存模型、同步机制等多个领域。为了深入理解Java并发编程,有必要了解其核心技术点和相关实现原理,以下将详细介绍文件中提及的关键知识点...
在讨论Java并发编程之前,首先需要区分并发与并行的概念。并发是指在同一时间段内同时处理多个任务的能力,而并行则是指在同一时间点同时执行多个任务。在Java中,我们通常所说的多线程编程指的是并发而非并行。 ##...
标题《Parallel and Concurrent Programming in Haskell》和描述表明,该文件是一本专注于Haskell语言在并行和并发编程领域的深入探讨。Haskell,作为一门纯粹的、懒惰的函数式编程语言,为开发者提供了强大的抽象...
【并发数据结构与多核编程】是一门深入探讨面向多核系统的程序设计的课程,旨在让学生掌握并发编程的基础原理和实践经验。课程分为理论和实践两个部分,覆盖了从并发编程的基本概念到具体并发数据结构和算法的设计与...
并发编程是当今软件开发中不可或缺的一部分,尤其是在多核处理器日益普及的背景下,合理地使用并发编程能够显著提升程序的性能和效率。然而,编写并发程序也带来了诸多挑战,包括线程安全、死锁、线程同步等问题。...
Java并发编程是Java开发中的重要领域,涉及到多线程、内存模型、同步机制等多个核心概念。本教程将深入探讨这些知识点,帮助开发者更好地理解和利用Java进行高效并发编程。 首先,让我们从基础开始,理解Java中的多...
C++并发编程是现代软件开发中的重要组成部分,它利用多核处理器的能力,使得程序能够同时执行多个任务,提高系统的效率和响应性。在本示例中,我们有一个简单的并发程序,模拟了飞机售票系统,该系统有三个售票终端...
Scala并发编程是构建高效、可扩展的分布式系统的关键技术,特别是在大数据处理框架如Spark中。Akka是一个用Scala编写的库,它提供了基于Actor模型的并发解决方案,这使得Akka在构建高性能、高可靠的分布式应用中扮演...
Java并发编程涉及对多个线程的管理和协调,使得它们能够有效地共享资源并行执行任务。本文将深入探讨Java并发编程的基础概念、核心机制以及常见应用场景,帮助开发者更好地理解和掌握Java并发编程。 #### 一、并发...