`

线程任务的取消

阅读更多

转自: http://www.blogjava.net/killme2008/archive/2007/09/03/142344.html

当外部代码能够在活动自然完成之前,把它的状态更改为完成状态,那么这个活动被称为可取消(cancellable)。取消任务是一个很常见的需求,无论是由于用户请求还是系统错误引起的服务关闭等等原因。最简单的任务取消策略就是在线程中维持一个bool变量,在run方法中判断此变量的bool值来决定是否取消任务。显然,这个bool变量需要声明为volatile,以保持多线程环境下可见性(所谓可见性,就是当一个线程修改共享对象的某个状态变量后,另一个线程可以马上看到修改结果)。下面是一个来自《java并发编程实践》的例子:

<!--<br /> <br /> Code highlighting produced by Actipro CodeHighlighter (freeware)<br /> http://www.CodeHighlighter.com/<br /> <br /> -->package net.rubyeye.concurrency.chapter7;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class PrimeGenerator implements Runnable {
    
private final List<BigInteger> primes = new ArrayList<BigInteger>();

    
private volatile boolean cancelled;
   
public void run() {
        BigInteger p 
= BigInteger.ONE;
        
while (!cancelled) {
            p 
= p.nextProbablePrime();
            
synchronized (this) {
                primes.add(p);
            }
        }
    }
   
public void cancel() {
        cancelled 
= true;
    }
   
public synchronized List<BigInteger> get() {
        
return new ArrayList<BigInteger>(primes);
    }

   
public static void main(String args[]) throws InterruptedException {
        PrimeGenerator generator 
= new PrimeGenerator();
        
new Thread(generator).start();
        
try {
            TimeUnit.SECONDS.sleep(
1);
        } 
finally {
            generator.cancel();
        }
    }
}

    main中启动一个素数生成的任务,线程运行一秒就取消掉。通过线程中的cancelled变量来表征任务是否继续执行。既然是最简单的策略,那么什么是例外情况?显然,阻塞操作下(比如调用join,wait,sleep方法),这样的策略会出问题。任务因为调用这些阻塞方法而被阻塞,它将不会去检查volatile变量,导致取消操作失效。那么解决办法是什么?中断!考虑我们用BlockingQueue去保存生成的素数,BlockingQueue的put方法是阻塞的(当BlockingQueue满的时候,put操作会阻塞直到有元素被take),让我们看看不采用中断,仍然采用简单策略会出现什么情况:

<!--<br /> <br /> Code highlighting produced by Actipro CodeHighlighter (freeware)<br /> http://www.CodeHighlighter.com/<br /> <br /> -->package net.rubyeye.concurrency.chapter7;

import java.math.BigInteger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BrokenPrimeProducer extends Thread {
    
static int i = 1000;

    
private final BlockingQueue<BigInteger> queue;

    
private volatile boolean cancelled = false;

    BrokenPrimeProducer(BlockingQueue
<BigInteger> queue) {
        
this.queue = queue;
    }

    
public void run() {
        BigInteger p 
= BigInteger.ONE;
        
try {
            
while (!cancelled) {
                p 
= p.nextProbablePrime();
                queue.put(p);
            }
        } 
catch (InterruptedException cusumed) {
        }
    }

    
public void cancel() {
        
this.cancelled = false;
    }

    
public static void main(String args[]) throws InterruptedException {
        BlockingQueue
<BigInteger> queue = new LinkedBlockingQueue<BigInteger>(
                
10);
        BrokenPrimeProducer producer 
= new BrokenPrimeProducer(queue);
        producer.start();
        
try {
            
while (needMorePrimes())
                queue.take();
        } 
finally {
            producer.cancel();
        }
    }

    
public static boolean needMorePrimes() throws InterruptedException {
        
boolean result = true;
        i
--;
        
if (i == 0)
            result 
= false;
        
return result;
    }
}

    我们在main中通过queue.take来消费产生的素数(虽然仅仅是取出扔掉),我们只消费了1000个素数,然后尝试取消产生素数的任务,很遗憾,取消不了,因为产生素数的线程产生素数的速度大于我们消费的速度,我们在消费1000后就停止消费了,那么任务将被queue的put方法阻塞,永远也不会去判断cancelled状态变量,任务取消不了。正确的做法应当是使用中断(interrupt):

<!--<br /> <br /> Code highlighting produced by Actipro CodeHighlighter (freeware)<br /> http://www.CodeHighlighter.com/<br /> <br /> -->package net.rubyeye.concurrency.chapter7;

import java.math.BigInteger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PrimeProducer extends Thread {
    
static int i = 1000;

    
private final BlockingQueue<BigInteger> queue;

    
private volatile boolean cancelled = false;

    PrimeProducer(BlockingQueue
<BigInteger> queue) {
        
this.queue = queue;
    }

    
public void run() {
        BigInteger p 
= BigInteger.ONE;
        
try {
            
while (!Thread.currentThread().isInterrupted()) {
                p 
= p.nextProbablePrime();
                queue.put(p);
            }
        } 
catch (InterruptedException cusumed) {
        }
    }

    
public void cancel() {
        interrupt();
    }

    
public static void main(String args[]) throws InterruptedException {
        BlockingQueue
<BigInteger> queue = new LinkedBlockingQueue<BigInteger>(
                
10);
        PrimeProducer producer 
= new PrimeProducer(queue);
        producer.start();
        
try {
            
while (needMorePrimes())
                queue.take();
        } 
finally {
            producer.cancel();
        }
    }

    
public static boolean needMorePrimes() throws InterruptedException {
        
boolean result = true;
        i
--;
        
if (i == 0)
            result 
= false;
        
return result;
    }
}

   在run方法中,通过Thread的isInterrupted来判断interrupt status是否已经被修改,从而正确实现了任务的取消。关于interrupt,有一点需要特别说明,调用interrupt并不意味着必然停止目标线程的正在进行的工作,它仅仅是传递一个请求中断的信号给目标线程,目标线程会在下一个方便的时刻中断。而对于阻塞方法产生的InterruptedException的处理,两种选择:要么重新抛出让上层代码来处理,要么在catch块中调用Thread的interrupt来保存中断状态。除非你确定要让工作线程终止(如上所示代码),否则不要仅仅是catch而不做任务处理工作(生吞了InterruptedException),更详细可以参考这里。如果不清楚外部线程的中断策略,生搬硬套地调用interrupt可能产生不可预料的后果,可参见书中7.1.4例子。

   另外一个取消任务的方法就是采用Future来管理任务,这是JDK5引入的,用于管理任务的生命周期,处理异常等。比如调用ExecutorService的sumit方法会返回一个Future来描述任务,而Future有一个cancel方法用于取消任务。
   那么,如果任务调用了不可中断的阻塞方法,比如Socket的read、write方法,java.nio中的同步I/O,那么该怎么处理呢?简单地,关闭它们!参考下面的例子:

<!--<br /> <br /> Code highlighting produced by Actipro CodeHighlighter (freeware)<br /> http://www.CodeHighlighter.com/<br /> <br /> -->package net.rubyeye.concurrency.chapter7;

import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;

/**
 * 展示对于不可中断阻塞的取消任务 通过关闭socket引发异常来中断
 * 
 * 
@author Admin
 * 
 
*/
public abstract class ReaderThread extends Thread {
    
private final Socket socket;

    
private final InputStream in;

    
public ReaderThread(Socket socket) throws IOException {
        
this.socket = socket;
        
this.in = socket.getInputStream();
    }

    
// 重写interrupt方法
    public void interrupt() {
        
try {
            socket.close();
        } 
catch (IOException e) {
        } 
finally {
            
super.interrupt();
        }
    }

    
public void run() {
        
try {
            
byte[] buf = new byte[1024];
            
while (true) {
                
int count = in.read(buf);
                
if (count < 0)
                    
break;
                
else if (count > 0)
                    processBuff(buf, count);
            }
        } 
catch (IOException e) {
        }
    }

    
public abstract void processBuff(byte[] buf, int count);
}

    Reader线程重写了interrupt方法,其中调用了socket的close方法用于中断read方法,最后,又调用了super.interrupt(),防止当调用可中断的阻塞方法时不能正常中断。

分享到:
评论

相关推荐

    停止线程和取消线程任务的案例代码

    这个资源文件中,包含了停止线程和取消线程任务的案例代码,介绍了四五种停止线程、取消线程任务的方式,有Thread.stop()、Thread.interrupt()、future,cancel(true)等等,也可以查看文章...获取。

    winform 多线程 多任务管理

    它可能包含了对线程的创建、监控、暂停、恢复和取消等功能。可以设计一个`Task`类来表示每个任务,包含任务状态、优先级、回调方法等信息。然后,`TaskManager`可以维护一个任务列表,提供添加、删除任务以及启动、...

    多线程实现任务管理器

    在编程领域,多线程是一种常见且强大的技术,它允许应用程序同时执行多个任务,从而提高系统效率和响应性。在C#中,多线程的使用尤其广泛,尤其是在需要进行并发处理或者异步操作的场景下。本篇将深入探讨如何在C#中...

    BlockingQueue队列自定义超时时间取消线程池任务

    `FutureTask`是线程池处理任务的一种方式,它不仅包含了一个`Runnable`或`Callable`任务,还提供了检查任务是否完成、获取结果、取消任务等能力。当我们提交一个`FutureTask`到线程池时,可以通过`Future`接口的方法...

    taskTimer 线程 任务

    线程任务,特别是定时任务,在IT领域中是至关重要的,特别是在服务器端的后台处理和自动化流程中。"taskTimer 线程任务"这个主题主要关注如何利用编程语言中的线程机制来创建和管理定时执行的任务。下面我们将深入...

    IOS-多线程多任务下载

    7. **管理任务**: 为了控制多任务下载,可以使用数组来存储所有任务,根据需要暂停、恢复或取消特定任务。当达到某个限制(如网络连接状况、内存使用情况等)时,可以暂停部分任务,待条件改善后再继续。 8. **更新...

    多线程多任务下载,可查看任务进度

    8. **用户界面**:提供友好的用户界面,展示任务列表、每个任务的进度、速度和预计完成时间,以及暂停、恢复、取消等操作。 9. **下载队列管理**:对下载任务进行排队,根据优先级和资源情况决定哪个任务先下载,...

    MFC实现多任务多线程下载软件

    当某个任务完成或需要取消时,可以通过这个容器找到并操作相应的线程。 在错误处理方面,MFC提供了异常处理机制,可以捕获并处理线程中的异常,避免整个程序因单个任务失败而崩溃。此外,还需要考虑网络连接问题,...

    VC++ 多任务多线程下载程序源码

    6. **错误处理**:在下载过程中,可能会遇到网络中断、服务器错误等问题,需要有相应的错误处理机制,例如重试、断点续传、取消任务等。 7. **资源释放**:当所有任务完成后,需要确保释放所有占用的资源,如关闭...

    MFC多线程的创建,包括工作线程和用户界面线程

    - **联系**:定时器可以结合多线程使用,例如在工作者线程中设置定时器来定期执行某个任务,或者在用户界面线程中设置定时器来定期刷新界面。 #### 五、线程消息传递 在多线程环境中,线程间通信非常重要。MFC提供...

    易语言多线程控制线程数量源码

    易语言提供了诸如`等待线程`、`唤醒线程`、`取消线程`等命令来协调线程之间的执行顺序。同时,还可以利用`互斥量`、`信号量`等同步机制确保共享资源的安全访问。 5. **控制线程数量**: 为了优化系统性能和避免...

    C#多线程,多任务下载文件工具,带断点续传

    同时,可能还需要一个用户界面来展示当前的下载状态,包括每个任务的进度条、速度等信息,以及控制任务的启动、暂停、恢复和取消。 在项目"MyDownloader_src"中,我们可以期待找到以下组件: 1. `Downloader` 类:...

    C# 如何挂起线程、休眠线程和终止线程(源码例)

    通常,更安全的做法是通过设置共享标志(例如使用volatile关键字修饰的变量)来通知线程自行终止,或者在执行到某个已知点时检查取消标志。 在实际应用中,多线程操作需要谨慎处理,因为线程间的交互可能引发竞态...

    C#判断线程池中所有的线程是否已经完成

    线程池中的线程通常用于执行异步任务,因此在某些场景下,我们需要判断线程池中所有的线程是否已经完成工作。这个问题可以通过注册一个等待单个对象(RegisterWaitForSingleObject)并配合定期检查线程池状态来解决...

    Android线程结束——合理的结束你想结束的线程

    在Android开发中,线程管理是一项重要的任务,尤其是在处理耗时操作时,如网络请求、数据库操作或大计算量的任务。本篇文章将深入探讨如何合理地结束Android中的线程,以确保应用程序的性能和稳定性。 首先,理解...

    多线程开发,异步任务asyntask android的demo,实现从网络下载图片,亲测可用,改了bug

    2. **任务取消**:可以通过`isCancelled()`检查任务是否被取消,避免不必要的资源消耗。 3. **版本兼容性**:自Android 3.0(API level 11)开始,AsyncTask执行方式发生了变化。对于低于API level 11的版本,建议...

    Posix线程编程指南

    该指南将详细介绍POSIX线程编程的基本知识点,包括线程创建、线程间通信、线程取消、线程私有数据、线程同步、线程终止和杂项函数等内容。 线程与进程是操作系统中两种不同的执行流。进程是资源管理的基本单位,而...

    多线程发邮件

    `FutureTask`不仅可以代表一个Callable的结果,还可以用于取消任务或者检查任务是否完成。 提到ExecutorService,它是Java并发包`java.util.concurrent`中的核心接口,它提供了一种管理和控制线程的方法。在这个...

    QNX环境下多线程编程

    取消的状态有两种:PTHREAD_CANCEL_ENABLE表示将线程设为取消状态,PTHREAD_CANCEL_DISABLE表示忽略取消状态;取消的类型也有两种:THREAD_CANCEL_DEFFERED表示执行到取消点取消,PTHREAD_CANCEL_ASYCHRONOUS表示...

    线程池c++,通过线程控制实现线程集动态调整线程个数

    - **任务超时和取消**:提供一种方式让提交的任务能够在一定时间内完成或被取消。 - **扩展性**:设计线程池时要考虑其可扩展性,以便在未来添加更多功能,如任务优先级队列、线程池的大小限制策略等。 总结来说,...

Global site tag (gtag.js) - Google Analytics