`
frank-liu
  • 浏览: 1684569 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Cancel, Reject tasks in ThreadPool

 
阅读更多

简介

    在前面的文章中我们讨论过提交task到线程池中间执行。那里主要是考虑任务提交后一切都正常执行的情况。但是,如果在某些情况下如果我们想要取消或者拒绝task的提交和执行。则需要用到一些其他的特性。

Cancel Task

    Cancel task在线程池里有比较方便的支持。我们提交task的时候,一般会期待线程池的执行返回Future<T>类型的结果。在执行线程池的submit方法后我们就可以取得Future<T>结果的引用了。所以如果我们想要取消运行的话,可以直接调用该引用的cancel方法。

    我们来看一个具体的示例:

    首先我们定义一个可以提交给线程池的task,它需要实现Callable接口。

import java.util.concurrent.Callable;

public class Task implements Callable<String> {
	@Override
	public String call() throws Exception {
		while(true) {
			System.out.printf("Task: Test\n");
			Thread.sleep(100);
		}
	}
}

     这部分代码里有一个看起来比较诡异的地方。就是我们在call方法的实现里并没有返回任何String类型的结果。它只是一个无限循环。看起来它可能会报错。实际上因为会一直循环下去。代码还是可以通过编译并且运行的。

    然后,我们再来定义使用它的场景代码:

import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

	public static void main(String[] args) {
		ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
		Task task = new Task();
		System.out.printf("Main: Executing the Task\n");
		Future<String> result = executor.submit(task);
		
		try {
			TimeUnit.SECONDS.sleep(2);
		} catch(InterruptedException e) {
			e.printStackTrace();
		}
		System.out.printf("Main: Canceling the task\n");
		result.cancel(true);
		System.out.printf("Main: Canceled: %s\n", result.isCancelled());
		System.out.printf("Main: Done: %s\n", result.isDone());
		executor.shutdown();
		System.out.printf("Main: The executor has finished\n");
	}
}

     在前面的代码里,我们创建了一个线程池,然后通过submit方法提交task。在sleep两秒钟之后又通过result.cancel(true)方法取消线程的执行。下面是程序的执行结果:

Main: Executing the Task
Task: Test
Task: Test
Task: Test
Task: Test
Task: Test
Task: Test
Task: Test
Task: Test
Task: Test
Task: Test
Task: Test
Task: Test
Task: Test
Task: Test
Task: Test
Task: Test
Task: Test
Task: Test
Task: Test
Task: Test
Main: Canceling the task
Main: Canceled: true
Main: Done: true
Main: The executor has finished

 

Reject Task

    在前面cancel task的场景中,我们是基于一个thread pool可以接收task并且能够执行。在某些情况下thread pool可能会拒绝task的执行。一般来说,当我们使用线程池结束的时候会调用线程池的shutdown方法。这个方法不会马上结束,它会等待里面的线程执行结束之后再结束。在这个时候,如果再向线程池提交task的话会被线程池拒绝。在thread pool里面提供了一种机制,当提交的task被拒绝之后,它将会被调用。

     在线程池里有一个setRejectedExecutionHandler,这个方法相当于一个事件的注册机制。它接收RejectedExecutionHandler类型的参数。在线程被reject之后该接口的rejectedExecution方法会被触发。

    我们现在来看看具体的实现示例:

    首先,实现一个继承RejectedExecutionHandler的类:

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;


public class RejectedTaskController implements RejectedExecutionHandler {
	@Override
	public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
		System.out.printf("RejectedTaskController: The task %s has" +
				"been rejected\n", r.toString());
		System.out.printf("RejectedTaskController: %s\n", executor.toString());
		System.out.printf("RejectedTaskController: Terminating: %s\n", 
				executor.isTerminating());
		System.out.printf("RejectedTaskController: Terminated: %s\n", 
				executor.isTerminated());
	}
}

     然后定义一个提交的线程:

import java.util.concurrent.TimeUnit;


public class Task implements Runnable {
	private String name;
	public Task(String name) {
		this.name = name;
	}
	
	@Override
	public void run() {
		System.out.printf("Task " + name + ": Starting");
		
		try {
			long duration = (long)(Math.random() * 10);
			System.out.printf("Task %s: ReportGenerator: Generating " +
					"a report during %d seconds\n", name, duration);
			TimeUnit.SECONDS.sleep(duration);
		} catch(InterruptedException e) {
			e.printStackTrace();
		}
		System.out.printf("Task %s: Ending\n", name);
	}
	
	public String toString() {
		return name;
	}
}

   该线程只是打印一些相关的信息并sleep几秒钟。

    具体调用实现的方法如下:

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class Main {

	public static void main(String[] args) {
		RejectedTaskController controller = 
				new RejectedTaskController();
		ThreadPoolExecutor executor = 
				(ThreadPoolExecutor) Executors.newCachedThreadPool();
		executor.setRejectedExecutionHandler(controller);
		System.out.printf("Main: Starting.\n");
		for(int i = 0; i < 3; i++) {
			Task task = new Task("Task" + i);
			executor.submit(task);
		}
		
		System.out.printf("Main: Shutting down the Executor.\n");
		executor.shutdown();
		
		System.out.printf("Main: Sending another Task.\n");
		Task task = new Task("RejectedTask");
		executor.submit(task);
		System.out.printf("Main: End.\n");
	}
}

    前面启动了3个线程让他们正常执行。然后再调用线程池的shutdown方法来中止线程池的运行。后面当我们再尝试提交线程的时候,我们可以看到rejectedExecution方法被触发。 

     下面是程序执行的结果:

Main: Starting.
Task Task0: StartingMain: Shutting down the Executor.
Task Task2: StartingTask Task1: StartingMain: Sending another Task.
RejectedTaskController: The task java.util.concurrent.FutureTask@18aa5e75 hasbeen rejected
Task Task1: ReportGenerator: Generating a report during 2 seconds
Task Task2: ReportGenerator: Generating a report during 8 seconds
Task Task0: ReportGenerator: Generating a report during 3 seconds
RejectedTaskController: java.util.concurrent.ThreadPoolExecutor@4b26f29f[Shutting down, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 0]
RejectedTaskController: Terminating: true
RejectedTaskController: Terminated: false
Main: End.
Task Task1: Ending
Task Task0: Ending
Task Task2: Ending

 

总结

    对于提交给线程池执行的线程来说,如果我们希望对线程的执行有更多的控制,比如说取消或者拒绝线程的执行。我们可以通过取得的Future<T>引用调用cancel方法来实现。另外,在线程池已经调用shutdown方法之后,我们继续提交的task会被拒绝。为了知晓线程提交后被拒绝的消息以及做后续的处理,我们可以通过实现接口RejectedExecutionHandler,并注册该实现到线程池中。

参考材料

Java 7 concurrency cookbook

1
3
分享到:
评论

相关推荐

    reject-用于Java和Android的依赖注入框架

    ** Reject 框架概述** `Reject` 是一个专为Java和Android开发设计的轻量级依赖注入(Dependency Injection,简称DI)框架。依赖注入是一种软件设计模式,它可以帮助开发者解耦组件,使得代码更易于测试和维护。通过...

    ABAP流程处理的命令说明(stop,exit,return,check,reject)

    本文将深入解析ABAP中的五个关键流程处理命令:`STOP`, `EXIT`, `RETURN`, `CHECK`, 和 `REJECT`,这些命令用于控制程序的执行流程,帮助开发者更灵活地管理程序的运行状态。 ### STOP 命令 `STOP` 命令用于立即...

    Conveyor FIFO (first in first out) w-Reject.rar

    在本文中,我们将深入探讨一个使用AB PLC实现的Conveyor FIFO(先进先出)系统,并附加了w-Reject功能,即当系统检测到不合格产品时能够自动将其剔除。 **一、Conveyor FIFO(先进先出)原理** Conveyor FIFO系统...

    ipt_REJECT.rar_IS

    标题中的“ipt_REJECT.rar_IS”暗示我们正在讨论与iptables相关的拒绝策略,这在网络安全和Linux系统管理中是非常重要的一个方面。iptables是一个用于在Linux内核中实现包过滤和网络地址转换(NAT)的工具,它允许...

    NR网络拒绝码-5gmm_cause = 15(No suitable cells in tracking area).docx

    标题中的“NR网络拒绝码-5gmm_cause = 15(No suitable cells in tracking area)”是指在5G NR网络中,UE(用户设备)接收到一个特定的网络拒绝码,即5gmm_cause #15,表示在当前追踪区域(Tracking Area)内没有适合...

    nf_reject.rar_run

    标题“nf_reject.rar_run”和描述中的“Select calibration to run for Linux v2.13.6.”暗示了我们正在处理一个与Linux内核相关的项目,特别是涉及到网络过滤和校准的部分。在这个场景中,“nf_reject”可能指的是...

    nft_reject.rar_Apples

    标题中的“nft_reject.rar_Apples”暗示了这是一个与苹果(Apple)设备相关的软件更新或驱动程序包,其中可能包含对“nft_reject”功能的修改或增强。描述指出这是“Backlight Driver for Intel-based Apples”,这...

    refuse-,reject的用法区别参考.doc

    此外,"deny" 与 "refuse" 和 "reject" 有所不同,"deny" 主要用于否认事实或拒绝给予某人所需,比如:"He denied being involved in the incident."(他否认参与了那起事件。)或 "She denied him the chance to ...

    js代码-promise resolve reject

    当我们调用 `resolve` 时,Promise的状态会从 pending 变为 fulfilled,而调用 `reject` 时,则会从 pending 变为 rejected。 在 `main.js` 文件中,我们可以预期看到如何使用Promise的示例代码。通常,Promise的...

    nft_reject_bridge.rar_return

    标题中的“nft_reject_bridge.rar_return”暗示了我们正在讨论与网络过滤和包处理相关的主题,特别是与Netfilter(NFT)框架下的拒绝规则和桥接(bridge)有关的内容。在Linux系统中,Netfilter是内核的一部分,用于...

    libipt_REJECT.rar_iptables

    Shared library add-on to iptables to add customized REJECT support.

    band-reject-filter.rar_dsp_filter_processing

    band reject filter advanced signal processing

    Promise.all中对于reject的处理方法

    然而,如果任何一个Promise被拒绝(reject),Promise.all则会立即拒绝,不再等待其他的Promise。 在实际应用中,网络请求可能会因为各种原因失败,比如网络超时、服务器错误等。在这些情况下,如果我们依赖于...

    Reject Service Worker-crx插件

    "Reject Service Worker-crx插件"显然是针对这个技术的扩展程序,它的主要功能是阻止Service Worker的注册和运行。 首先,我们要理解Service Worker的作用。Service Worker是现代Web应用程序的关键组成部分,它可以...

    nf_reject_ipv6.rar_Too Short_nf_reject_ipv4.ko

    IP header checks: fragment, too short.

    nf_reject_ipv4.rar_Harder_skb

    标题 "nf_reject_ipv4.rar_Harder_skb" 指涉的是与网络过滤和IP路由相关的编程问题,特别是涉及到Linux内核中的网络子系统。描述 "ip_route_me_harder expects skb-&gt;dst to be set." 提示我们这个问题的核心在于`ip_...

    Accept_Reject_Packets.zip_Destination

    "Accept_Reject_Packets.zip_Destination" 文件压缩包提供了一个示例代码,帮助我们理解如何让路由器根据特定条件接受或拒绝来自某个目的地的数据包。这个场景常见于网络安全、访问控制以及流量管理等应用。 1. **...

    Why-Reject, 苹果AppStore被拒理由大全.zip

    7. **内购和订阅**:应用内购买和订阅需要遵循苹果的规则,不能绕过IAP(In-App Purchase)系统,否则会被拒。 8. **更新策略**:应用长时间未更新,或者忽视了用户的反馈和评论,可能会被视为维护不足,影响其在...

    js代码-面试题3:实现PromiseA+规范,实现all、resolve、reject、finally、race等方法

    面试中,要求实现Promise的核心功能,如all、resolve、reject、finally和race等方法,这是一项检验开发者对Promise深入理解的任务。下面我们将逐一探讨这些方法的实现原理与应用。 首先,Promise对象有三种状态:...

Global site tag (gtag.js) - Google Analytics