`

《Java并发编程》之五:取消和关闭线程

    博客分类:
  • Java
阅读更多

Java没有提供任何机制来安全地终止线程,虽然Thread.stop和suspend等方法提供了这样的机制,但是存在严重的缺陷,应该避免使用这些方法。但是Java提供了中断Interruption机制,这是一种协作机制,能够使一个线程终止另一个线程的当前工作。

这种协作方式是必要的,我们很少希望某个任务线程或者服务立即停止,因为这种立即停止会时某个共享的数据结构处于不一致的状态。相反,在编写任务和服务的时候可以使用一种协作方式:当需要停止的时候,它们会先清除当前正在执行的工作,然后再结束。

 

7.1  任务取消

如果外部代码能够在某个操作正常完成之前将其置于 完成 状态,那么这个操作就可以称为可取消的Cancellable

其中一种协作机制是设置一个取消标志Cancellation Requested标志,而任务定期查看该标志。

@ThreadSafe
public class PrimeGenerator implements Runnable {
    private static ExecutorService exec = Executors.newCachedThreadPool();

    @GuardedBy("this")
    private final List<BigInteger> primes = new ArrayList<BigInteger>();
    private volatile boolean cancelled;

    public void run() {
        BigInteger p = BigInteger.ONE;
        while (!cancelled) {
            p = p.nextProbablePrime();
            synchronized (this) {
                primes.add(p);
            }
        }
    }

    public void cancel() {
        cancelled = true;
    }

    public synchronized List<BigInteger> get() {
        return new ArrayList<BigInteger>(primes);
    }

    static List<BigInteger> aSecondOfPrimes() throws InterruptedException {
        PrimeGenerator generator = new PrimeGenerator();
        exec.execute(generator);
        try {
            SECONDS.sleep(1);
        } finally {
            generator.cancel();
        }
        return generator.get();
    }
}

 在Java的API或语言规范中,并没有将中断与任何语义关联起来,但实际上,如果在取消之外的其他操作中使用中断,那么都是不合适的,并且很难支撑起更大的应用。

 

每个线程都有一个boolean类型的中断状态。但中断线程时,这个线程的中断状态将被设置成true。

 

Thread中的三个方法:

public void interrupt()   中断一个线程

public boolean isInterrupted()  获取中断标志,判断是否中断

public static boolean interrupted()  清楚中断状态,并返回它之前的状态值

 

线程在阻塞状态下发生中断的时候会抛出InterruptedException,例如Thread.sleep(), Thread.wait(), Thread.join()等方法。

当线程在非阻塞状态下中断的时候,它的中断状态将被设置,然后根据检查中断状态来判断是否中断。

 

调用interrupt并不意味着立即停止目标线程正在进行的工作,而只是传递了请求中断的消息,换句话说,仅仅修改了线程的isInterrupted标志字段。

 

通常,中断时实现取消的最合理方式。

public class PrimeProducer extends Thread {
    private final BlockingQueue<BigInteger> queue;

    PrimeProducer(BlockingQueue<BigInteger> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            BigInteger p = BigInteger.ONE;
            while (!Thread.currentThread().isInterrupted())
                queue.put(p = p.nextProbablePrime());
        } catch (InterruptedException consumed) {
            /* Allow thread to exit */
        }
    }

    public void cancel() {
        interrupt();
    }
}

 

7.1.3  响应中断

只有实现了线程中断策略的代码才可以屏蔽中断请求,在常规任务和库代码中都不应该屏蔽中断请求。

两种方法响应中断:

* 传递异常InterruptedException
* 恢复中断状态,从而使调用栈中上层代码能够对其进行处理
不可取消的任务在退出前恢复中断标志
public class NoncancelableTask {
    public Task getNextTask(BlockingQueue<Task> queue) {
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    return queue.take();
                } catch (InterruptedException e) {
                    interrupted = true;
                    // fall through and retry
                }
            }
        } finally {
            if (interrupted)
                Thread.currentThread().interrupt();
        }
    }

    interface Task {
    }
}
 
7.1.5  定时任务,通过Future来实现取消:
除非你清楚线程的中断策略,否则不要中断线程,那么在神马情况下调用cancel可以将参数指定为true呢。
如果任务的线程是由标准的Executor创建的,那么可以设置mayInterruptIfRunning。
public class TimedRun {
    private static final ExecutorService taskExec = Executors.newCachedThreadPool();

    public static void timedRun(Runnable r, long timeout, TimeUnit unit)
            throws InterruptedException {
        Future<?> task = taskExec.submit(r);
        try {
            task.get(timeout, unit);
        } catch (TimeoutException e) {
            // task will be cancelled below
        } catch (ExecutionException e) {
            // exception thrown in task; rethrow
            throw launderThrowable(e.getCause());
        } finally {
            // Harmless if task already completed
            task.cancel(true); // interrupt if running
        }
    }
}
 
7.1.6  处理不可中断的阻塞
对于这些线程,中断请求只能设置线程的中断状态,除此之外没有其他任何作用。
我们可以使用类似中断的手段来停止这些线程,但这要求知道线程阻塞的原因。
通过newTaskFor将非标准的取消操作封装在一个任务中:
 
public abstract class SocketUsingTask<T> implements CancellableTask<T> {
    @GuardedBy("this")
    private Socket socket;

    protected synchronized void setSocket(Socket s) {
        socket = s;
    }

    public synchronized void cancel() {
        try {
            if (socket != null)
                socket.close();
        } catch (IOException ignored) {
        }
    }

    public RunnableFuture<T> newTask() {
        return new FutureTask<T>(this) {
            public boolean cancel(boolean mayInterruptIfRunning) {
                try {
                    SocketUsingTask.this.cancel();
                } finally {
                    return super.cancel(mayInterruptIfRunning);
                }
            }
        };
    }
}


interface CancellableTask<T> extends Callable<T> {
    void cancel();

    RunnableFuture<T> newTask();
}
 

 7.2  停止基于线程的服务

应用程序通常会创建拥有多个线程的服务,如果应用程序准备退出,那么这些服务所拥有的线程也需要正确的结束,由于java没有抢占式方法停止线程,因此需要它们自行结束。

正确的封装原则是:除非拥有某个线程,否则不要对该线程进行操控,例如中断线程或者修改优先级等。

 

线程有个相应的所有者,即创建该线程的类,因此线程池是其工作者线程的所有者,如果要中断线程,那么应该使用线程池去中断。

线程的所有权是不可传递的。服务应该提供生命周期方法Lifecycle Method来关闭它自己以及它所拥有的线程。这样当应用程序关闭该服务的时候,服务就可以关闭所有的线程了。在ExecutorService中提供了shutdown和shutdownNow等方法,同样,在其他拥有线程的服务方法中也应该提供类似的关闭机制。

Tips:对于持有线程的服务,只要服务的存在时间大于创建线程的方法的存在时间,那么就应该提供生命周期方法。

 

7.2.1  示例:日志服务

我们通常会在应用程序中加入log信息,一般用的框架就是log4j。但是这种内联的日志功能会给一些高容量Highvolume应用程序带来一定的性能开销。另外一种替代方法是通过调用log方法将日志消息放入某个队列中,并由其他线程来处理。

public class LogService {
    private final BlockingQueue<String> queue;
    private final LoggerThread loggerThread;
    private final PrintWriter writer;
    @GuardedBy("this")
    private boolean isShutdown;
    @GuardedBy("this")
    private int reservations;

    public LogService(Writer writer) {
        this.queue = new LinkedBlockingQueue<String>();
        this.loggerThread = new LoggerThread();
        this.writer = new PrintWriter(writer);
    }

    public void start() {
        loggerThread.start();
    }

    public void stop() {
        synchronized (this) {
            isShutdown = true;
        }
        loggerThread.interrupt();
    }

    public void log(String msg) throws InterruptedException {
        synchronized (this) {
            if (isShutdown)
                throw new IllegalStateException(/*...*/);
            ++reservations;
        }
        queue.put(msg);
    }

    private class LoggerThread extends Thread {
        public void run() {
            try {
                while (true) {
                    try {
                        synchronized (LogService.this) {
                            if (isShutdown && reservations == 0)
                                break;
                        }
                        String msg = queue.take();
                        synchronized (LogService.this) {
                            --reservations;
                        }
                        writer.println(msg);
                    } catch (InterruptedException e) { /* retry */
                    }
                }
            } finally {
                writer.close();
            }
        }
    }
}

 

7.2.2  通过ExecutorService去关闭

简单的程序可以直接在main函数中启动和关闭全局的ExecutorService,而在复杂程序中,通常会将ExecutorService封装在某个更高级别的服务中,并且该服务提供自己的生命周期方法。下面我们利用ExecutorService重构上面的日志服务:

public class LogService {
    public void stop() throws InterruptedException {
        try {
            exec.shutdown(); exec.awaitTermination(TIMEOUT, UNIT);
        }
    }
}

 

7.2.3  利用Poison Pill对象关闭Producer-Consumer服务

 

7.2.5  当关闭线程池的时候,保存尚未开始的和开始后取消的任务数据,以备后面重新处理,下面是一个网页爬虫程序,关闭爬虫服务的时候将记录所有尚未开始的和已经取消的所有页面URL:

public abstract class WebCrawler {
    private volatile TrackingExecutor exec;
    @GuardedBy("this")
    private final Set<URL> urlsToCrawl = new HashSet<URL>();

    private final ConcurrentMap<URL, Boolean> seen = new ConcurrentHashMap<URL, Boolean>();
    private static final long TIMEOUT = 500;
    private static final TimeUnit UNIT = MILLISECONDS;

    public WebCrawler(URL startUrl) {
        urlsToCrawl.add(startUrl);
    }

    public synchronized void start() {
        exec = new TrackingExecutor(Executors.newCachedThreadPool());
        for (URL url : urlsToCrawl) submitCrawlTask(url);
        urlsToCrawl.clear();
    }

    public synchronized void stop() throws InterruptedException {
        try {
            saveUncrawled(exec.shutdownNow());
            if (exec.awaitTermination(TIMEOUT, UNIT))
                saveUncrawled(exec.getCancelledTasks());
        } finally {
            exec = null;
        }
    }

    protected abstract List<URL> processPage(URL url);

    private void saveUncrawled(List<Runnable> uncrawled) {
        for (Runnable task : uncrawled)
            urlsToCrawl.add(((CrawlTask) task).getPage());
    }

    private void submitCrawlTask(URL u) {
        exec.execute(new CrawlTask(u));
    }

    private class CrawlTask implements Runnable {
        private final URL url;

        CrawlTask(URL url) {
            this.url = url;
        }

        private int count = 1;

        boolean alreadyCrawled() {
            return seen.putIfAbsent(url, true) != null;
        }

        void markUncrawled() {
            seen.remove(url);
            System.out.printf("marking %s uncrawled%n", url);
        }

        public void run() {
            for (URL link : processPage(url)) {
                if (Thread.currentThread().isInterrupted())
                    return;
                submitCrawlTask(link);
            }
        }

        public URL getPage() {
            return url;
        }
    }
}

 

public class TrackingExecutor extends AbstractExecutorService {
    private final ExecutorService exec;
    private final Set<Runnable> tasksCancelledAtShutdown =
            Collections.synchronizedSet(new HashSet<Runnable>());

    public TrackingExecutor(ExecutorService exec) {
        this.exec = exec;
    }

    public void shutdown() {
        exec.shutdown();
    }

    public List<Runnable> shutdownNow() {
        return exec.shutdownNow();
    }

    public boolean isShutdown() {
        return exec.isShutdown();
    }

    public boolean isTerminated() {
        return exec.isTerminated();
    }

    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        return exec.awaitTermination(timeout, unit);
    }

    public List<Runnable> getCancelledTasks() {
        if (!exec.isTerminated())
            throw new IllegalStateException(/*...*/);
        return new ArrayList<Runnable>(tasksCancelledAtShutdown);
    }

    public void execute(final Runnable runnable) {
        exec.execute(new Runnable() {
            public void run() {
                try {
                    runnable.run();
                } finally {
                    if (isShutdown()
                            && Thread.currentThread().isInterrupted())
                        tasksCancelledAtShutdown.add(runnable);
                }
            }
        });
    }
}

 

7.3  处理非正常的线程终止

通过给应用程序提供一个UncaughtExceptionHandler异常处理器来处理未捕获的异常:

public class UEHLogger implements Thread.UncaughtExceptionHandler {
    public void uncaughtException(Thread t, Throwable e) {
        Logger logger = Logger.getAnonymousLogger();
        logger.log(Level.SEVERE, "Thread terminated with exception: " + t.getName(), e);
    }
}

 只有通过execute提交的任务,才能将它抛出的异常交给未捕获异常处理器。而通过submit提交的任务,无论是抛出未检查异常还是已检查异常,都将被认为是任务返回状态的一部分

 

7.4  JVM关闭的时候提供关闭钩子

在正常关闭JVM的时候,JVM首先调用所有已注册的关闭钩子Shutdown Hook。关闭钩子可以用来实现服务或者应用程序的清理工作,例如删除临时文件,或者清除无法由操作系统自动清除的资源。

最佳实践是对所有服务都使用同一个关闭钩子,并且在该关闭钩子中执行一系列的关闭操作。这确保了关闭操作在单个线程中串行执行,从而避免竞态条件的发生或者死锁问题。

Runtime.getRuntime().addShutdownHook(new Thread() {
    public void run() {
        try{LogService.this.stop();} catch(InterruptedException) {..}
    }
})

 

总结:在任务、线程、服务以及应用程序等模块中的生命周期结束问题,可能会增加它们在设计和实现的时候的复杂性。我们通过利用FutureTask和Executor框架,可以帮助我们构建可取消的任务和服务。

 

博客新地址:http://yidao620c.github.io

 

分享到:
评论

相关推荐

    java并发编程实战源码,java并发编程实战pdf,Java

    《Java并发编程实战》是Java并发编程领域的一本经典著作,它深入浅出地介绍了如何在Java平台上进行高效的多线程编程。这本书的源码提供了丰富的示例,可以帮助读者更好地理解书中的理论知识并将其应用到实际项目中。...

    java 并发编程的艺术pdf清晰完整版 源码

    通过阅读《Java并发编程的艺术》这本书,开发者不仅可以掌握Java并发编程的基础知识,还能了解到一些高级特性和技巧,从而在实际开发中游刃有余。同时,附带的源码将有助于加深理解,提供实际操作的机会。

    JAVA并发编程艺术pdf版

    《JAVA并发编程艺术》是Java开发者深入理解和掌握并发编程的一本重要著作,它涵盖了Java并发领域的核心概念和技术。这本书详细阐述了如何在多线程环境下有效地编写高效、可靠的代码,对于提升Java程序员的技能水平...

    Java 并发编程实战.pdf

    根据提供的信息,“Java 并发编程实战.pdf”这本书聚焦于Java并发编程的实践与应用,旨在帮助读者深入了解并掌握Java中的多线程技术及其在实际项目中的应用技巧。虽然部分内容未能提供具体章节或实例,但从标题及...

    java并发编程与实践

    "Java并发编程与实践"文档深入剖析了这一主题,旨在帮助开发者理解和掌握如何在Java环境中有效地实现并发。 并发是指在单个执行单元(如CPU)中同时执行两个或更多任务的能力。在Java中,这主要通过线程来实现,...

    java并发编程2

    以上知识点覆盖了Java并发编程的主要方面,包括线程管理、同步机制、并发工具、设计模式、并发集合以及并发编程的最佳实践等,是理解和掌握Java并发编程的关键。在实际开发中,理解和熟练运用这些知识可以编写出高效...

    java并发编程书籍

    Java并发编程是软件开发中的一个关键领域,尤其是在大型企业级应用和分布式系统中。通过学习相关的书籍,开发者可以深入理解如何有效地设计和实现高效的多线程应用程序,避免并发问题,如竞态条件、死锁、活锁等。...

    《java 并发编程实战高清PDF版》

    在Java并发编程中,多线程是核心概念之一。多线程允许程序同时执行多个任务,从而充分利用系统资源,提高程序性能。然而,多线程编程也带来了同步和竞态条件等问题,这需要开发者具备良好的线程管理和同步机制的知识...

    Java并发编程:设计原则与模式(第二版)-3

    《Java并发编程:设计原则与模式(第二版)》是一本深入探讨Java多线程编程技术的权威著作。这本书详细阐述了在Java平台中进行高效并发处理的关键概念、设计原则和实用模式。以下是对该书内容的一些核心知识点的概述...

    Java并发编程实践高清pdf及源码

    《Java并发编程实践》是一本深入探讨Java多线程编程的经典著作,由Brian Goetz、Tim Peierls、Joshua Bloch、Joseph Bowles和David Holmes等专家共同编写。这本书全面介绍了Java平台上的并发编程技术,是Java开发...

    java并发编程艺术

    总而言之,《Java并发编程艺术》这本书将系统性地介绍Java并发编程的各种技术和最佳实践,帮助读者提升在多线程环境下的编程能力,从而设计出更加健壮、高效的Java应用。无论你是初级开发者还是经验丰富的工程师,这...

    Java 并发核心编程

    ### Java 并发核心编程知识点解析 #### 一、Java并发概述 自Java诞生之初,其设计者就赋予了该语言强大的并发处理能力。Java语言内置了对线程和锁的支持,这...理解和掌握这些概念和技术是成功进行并发编程的关键。

    Java并发编程设计原则和模式

    本资料“Java并发编程设计原则和模式”深入探讨了如何在Java环境中有效地进行并发处理,以充分利用系统资源并避免潜在的并发问题。 一、并发编程基础 并发是指两个或多个操作在同一时间段内执行,但并不意味着这些...

    java并发编程内部分享PPT

    Java并发编程是Java开发中的重要领域,特别是在多核处理器和分布式系统中,高效地利用并发可以极大地提升程序的性能和响应速度。这份“java并发编程内部分享PPT”显然是一个深入探讨这一主题的资料,旨在帮助开发者...

    Java并发编程实战华章专业开发者书库 (Tim Peierls 等 美Brian Goetz).pdf

    《Java并发编程实战》是一本深入探讨Java平台并发编程的权威指南,由Tim Peierls等人与Brian Goetz合著,旨在帮助Java开发者理解和掌握在多线程环境中编写高效、安全的代码。这本书由拥有丰富经验的JDK并发大师及...

    java并发编程实战(英文版)

    综上所述,《Java并发编程实战》不仅涵盖了Java并发编程的基础知识和技术细节,还包含了丰富的实践经验和前瞻性的思考,是任何一位从事Java开发工作的程序员不可或缺的学习资源。无论是初学者还是有经验的开发者都能...

    (PDF带目录)《Java 并发编程实战》,java并发实战,并发

    《Java 并发编程实战》是一本专注于Java并发编程的权威指南,对于任何希望深入了解Java多线程和并发控制机制的开发者来说,都是不可或缺的参考资料。这本书深入浅出地介绍了如何在Java环境中有效地管理和控制并发...

    java并发编程

    Java并发编程是Java开发者必须掌握的关键技能之一,它涉及到如何在多线程环境中高效、安全地执行程序。并发编程能够充分利用多核处理器的计算能力,提高应用程序的响应速度和整体性能。《Java编程并发实战》这本书是...

Global site tag (gtag.js) - Google Analytics