Dealing well with failure, shutdown, and cancellation is one of the characteristics that distinguishes a well-behaved application from one that merely works.
1> Task Cancellation:
An activity is cancellable if external code can move it to completion before its normal completion. There are a number of reasons why you might want to cancel an activity:
1> User-requested cancellation
2> Time-limited activities
3> Application events.
4> Errors
5> Shutdown
There is no safe way to preemptively stop a thread in Java, and therefore no safe way to preemptively stop a task. There are only cooperative mechanisms, by which the task and the code requesting cancellation follow an agreed-upon protocol.
A task that wants to be cancellable must have a cancellation policy that specifies the "how", "when" and "what" of cancellation.
"HOW": How other code can request cancellation.
"WHEN": When the task checks the cancellation has been requested.
"WHAT": What actions the task takes in response to a cancellation request.
Consider the real-world example of stopping payment on a check. Banks have rules about how to submit a stop-payment request, what responsiveness guaranteed it makes in processing such requests, and what procedures it follows when payment is actually stopped(such as notifying the other bank involved in the transaction and assessing a fee against the payor's account). Taken together, these procedures and guarantees comprise the cancellation policy for check payment.
1> Setting a "cancellation requested" flag (which must be volatile).
package edu.xmu.jcip; import java.math.BigInteger; import java.util.ArrayList; import java.util.List; public class PrimeGenerator implements Runnable { private volatile boolean cancelled; private final List<BigInteger> primes = new ArrayList<BigInteger>(); @Override public void run() { BigInteger p = BigInteger.ONE; while (!cancelled) { p = p.nextProbablePrime(); synchronized (this) { primes.add(p); } } } public void cancel() { cancelled = true; } public synchronized List<BigInteger> get() { return new ArrayList<BigInteger>(primes); } }
package edu.xmu.jcip; import org.junit.Test; public class PrimeGeneratorTest { @Test public void test() { PrimeGenerator primeGenerator = new PrimeGenerator(); new Thread(primeGenerator).start(); try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } finally { primeGenerator.cancel(); } System.out.println(primeGenerator.get()); } }
But if a task that uses this approach calls a blocking method such as "BlockingQueue.put", we could have a more serious problem-the task might never check the cancellation flag and therefore might never terminate. Example below will never terminate because generator is blocked in blockingQueue.put(), and client stoped consume, thus producer do not even have chance to check while(!cancelled);
package edu.xmu.jcip; import java.math.BigInteger; import java.util.concurrent.BlockingQueue; public class BrokenPrimeGenerator implements Runnable { private final BlockingQueue<BigInteger> queue; private volatile boolean cancelled = false; public BrokenPrimeGenerator(BlockingQueue<BigInteger> queue) { this.queue = queue; } public void run() { try { BigInteger p = BigInteger.ONE; while (!cancelled) { p = p.nextProbablePrime(); queue.put(p); } } catch (InterruptedException exception) { } } public void cancel() { this.cancelled = true; } }
package edu.xmu.jcip; import java.math.BigInteger; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BrokenPrimeGeneratorTest { public static void main(String[] args) throws InterruptedException { BlockingQueue<BigInteger> primes = new ArrayBlockingQueue<BigInteger>( 10); BrokenPrimeGenerator brokenPrimeGenerator = new BrokenPrimeGenerator( primes); new Thread(brokenPrimeGenerator).start(); Thread.sleep(1000); try { while (needMorePrimes()) { System.out.println(primes.take()); } } finally { brokenPrimeGenerator.cancel(); } } public static boolean needMorePrimes() { // Dummy Method called by client return false; } }
How can we solve the problem mentioned above? Certain blocking library methods support interruption. Thread interruption is a cooperative mechanism for a thread to signal another thread that it should, at its convernience and if it feels like it, stope what it is doing and do something else.
2> Using Thread.interrupt(). Interruption is usually the most sensible way to implement cancellation.
In practice, using interruption for anything but cancellation is fragile and difficult to sustain in larger appliations.
public class Thread{ public void interrupt(){...} // 1) set interrupted to true 2) Call native method to stop current thread. public boolean isInterrupted() {...} public static boolean interrupted() {...} // Clears the interrupted status of the thread and returns its previous value }
Each thread has a boolean interrupted status in its native peer.
1) interrput(): The interrupt method interrupts the target thread, set the interrupted to true.
2) isInterrupted(): Returns the interrupted status of the target thread.
3) interrupted(): 1) Clears the interrupted status? 2) Return its previous value. This is the only way to clear the interrupted status. (What means clear the interrupted status? Set its isInterrupted to false?)
Situation 1: A thread is interrupted when it is blocked. Such as calling Thread.sleep, Object.wait or BlockingQueue.take/poll
Blocking library methods like Thread.sleep and Object.wait try to detect when a thread has been interrupted and return early. They respond to interruption by clearing the interrupted status and throwing InterruptedException, indicating that the blocking operation completed early due to interruption. The JVM makes no guarantees on how quickly a blocking method will detect interruption, but in practice this happens reasonably quickly.
package edu.xmu.jcip; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BlockingInterruptTest { public static void main(String[] args) throws InterruptedException { Thread t = new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(1000L); } catch (InterruptedException e) { System.out.println(Thread.currentThread().isInterrupted()); // "false" will be printed e.printStackTrace(); } } }); t.start(); Thread.sleep(10L); t.interrupt(); final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1); queue.put("A"); t = new Thread(new Runnable() { @Override public void run() { try { queue.put("B"); } catch (InterruptedException e) { System.out.println(Thread.currentThread().isInterrupted()); //"false" will be printed e.printStackTrace(); } } }); t.start(); Thread.sleep(10L); t.interrupt(); } }
Situation B: A thread is interrupted when it is not blocked.
Current thread's interrupted status is set, and it is up to the activity being cancelled to poll the interrupted status to detect interruption. In this way interruption is "sticky" if it doesn't trigger an InterruptionException, evidence of interruption persists until someone deliberately clears the interrupted status.
"Calling interrupt does not necessarily stop the target thread from doing what it is doing; it merely delivers the messages that interruption has been requested."
public class NonBlockingInterruptTest { public static void main(String[] args) throws InterruptedException { Thread t = new Thread(new Runnable() { @Override public void run() { while (true) { System.out.println("Hello"); System.out.println(Thread.currentThread().isInterrupted()); } } }); t.start(); Thread.sleep(10L); t.interrupt(); } }
Example above, thread t will run forever, but for the first 10ms, isInterrupted is false, and after that isInterrupted is true.
An enhanced verson of PrimeProducer using interrput:
package edu.xmu.jcip; import java.math.BigInteger; import java.util.concurrent.BlockingQueue; public class PrimeProducer extends Thread { private final BlockingQueue<BigInteger> queue; public PrimeProducer(BlockingQueue<BigInteger> queue) { this.queue = queue; } public void run() { BigInteger p = BigInteger.ONE; try { while (!isInterrupted()) { p = p.nextProbablePrime(); System.out.println("Put " + p + " into queue"); queue.put(p); } } catch (InterruptedException exception) { exception.printStackTrace(); } } public void cancel() { interrupt(); } }
package edu.xmu.jcip; import java.math.BigInteger; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class PrimeProducerTest { public static void main(String[] args) throws InterruptedException { BlockingQueue<BigInteger> primes = new ArrayBlockingQueue<BigInteger>(1); PrimeProducer primeProducer = new PrimeProducer(primes); primeProducer.start(); Thread.sleep(10L); primeProducer.cancel(); } }
Example above, when primeProducer is interrupted when executing/blocking queue.put(); then an InterruptedException will be thrown, then we will jump out of the while loop, and finished run() method.
When primeProducer is interrupted when not executing/blocking queue.put(), then its isInterrupted() would return true, then we will jump out of the while loop as well and finish the run() method.
Also, we can even swallow this exception and let the producer loops forever:
public void run() { BigInteger p = BigInteger.ONE; while (!isInterrupted()) { p = p.nextProbablePrime(); try { queue.put(p); System.out.println("Put " + p + " into queue"); } catch (InterruptedException exception) { // When InterruptedException is thrown by queue.put, it will set // isInterrupted to false exception.printStackTrace(); } } }
A good way to think about interruption is that it does not actually interrupt a running thread; it just requests that the thread interrupt itself at the next convenient opportunity. (These opportunities are called cancellation points.) Some methods, such as wait, sleep and join, take such request seriously, throwing an exception when they receive an interrupt request or encounter an already set interrupt status upon entry.
2> Thread Cancellation
It is important to distinguish between how tasks and threads should react to interruption. A single interrupt request may have more than one desired recipient, interrupting a worker thread in a thread pool can mean both to "cancel the current task" and "shut down the worker thread".
Tasks do not execute in threads they own; they borrow threads owned by a service such as a thread pool. Code that doesn't own the thread (for a thread pool, any code outside of the thread pool implementation) should be careful to preserve the interrupted status so that the owning code can eventually act on it, even if the "guest" code acts on the interruption as well.
This is why most blocking library methods simply throw InterruptedException in response to an interrupt. They will never execute in a thread they own, so they implement the most reasonable cancelation policy for task or library code: get out of the way as quickly as possible and communicate the interruption back to the caller so that code higher up on the call stack can take further action.
A task needn't necessarily drop everything when it detects an interruption request, it can choose to postpone it until a more opportune time by remembering that it was interrputed, finishing the task it was performing, and then throwing InterruptedException or otherwise indicating interruption. This technique can protect data structures from corruption when an activity is interrupted in the middle of an update.
Rule: You should know a thread's interruption policy before interrupting it.
package edu.xmu.jcip; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class TimeRunTest { private static final ScheduledExecutorService service = Executors .newScheduledThreadPool(1); public static void main(String[] args) throws InterruptedException { final Thread taskThread = new Thread(new Runnable() { @Override public void run() { final Thread currentThread = Thread.currentThread(); System.out.println(currentThread.getName() + ": " + currentThread.isAlive() + ": " + currentThread.isInterrupted() + " finished work"); } }); timedRun(taskThread, 10, TimeUnit.SECONDS); Thread.sleep(100L); System.out.println(taskThread.getName() + ": " + taskThread.isAlive() + ": " + taskThread.isInterrupted() + " finished work"); service.shutdown(); } private static void timedRun(final Thread taskThread, long time, TimeUnit unit) throws InterruptedException { service.schedule(new Runnable() { @Override public void run() { taskThread.interrupt(); } }, time, unit); taskThread.start(); System.out.println(taskThread.getName() + ": " + taskThread.isAlive() + ": " + taskThread.isInterrupted()); } }
In example above, the taskThread finished its task and isAlive == false, then after 10s, the scheduler called taskThread.interrupt(), there is no meaning anymore to interrupt it right now.
Since timedRun can be called from an arbitrary thread, it cannot know the calling thread's interruption policy.
1> If the task completes before the timeout, the cancellation task that interrupts the thread in which timedRun was called could go off after timedRun has returned to its caller.We don't know what code will be running when that happens, but the result won't be good. (It is possible but suprisingly tricky to eliminate this risk by using the ScheduledFuture returned by schedule to cancel the cancellation task).
package edu.xmu.jcip; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class TimeRunTest2 { private static final ScheduledExecutorService service = Executors .newScheduledThreadPool(1); public static void main(String[] args) throws InterruptedException { final Runnable task = new Runnable() { @Override public void run() { while (true) { // working } } }; Thread t = new Thread(new Runnable() { @Override public void run() { System.out.println(System.currentTimeMillis()); timedRun(task, 1, TimeUnit.SECONDS); System.out.println(System.currentTimeMillis()); } }); t.start(); Thread.sleep(100L); service.shutdown(); } private static void timedRun(final Runnable task, long time, TimeUnit unit) { final Thread taskThread = Thread.currentThread(); service.schedule(new Runnable() { @Override public void run() { taskThread.interrupt(); } }, time, unit); task.run(); } }
2> If the task is not responsive to interruption, timedRun will not return until the task finishes, which may be long after the desired timeout. A timed run service that doesn't return after the specified time is likely to be irritating to its caller. Example above, thread t will run forever, and timedRun will never return because the runnable is insensitive to interruption.
3> Cancellation via Future:
public class TimeRunTest3 { private static final ExecutorService service = Executors .newSingleThreadExecutor(); public static void main(String[] args) throws InterruptedException { final Runnable task = new Runnable() { @Override public void run() { while (!Thread.currentThread().isInterrupted()) { // working } } }; Thread t = new Thread(new Runnable() { @Override public void run() { System.out.println(System.currentTimeMillis()); try { timedRun(task, 1, TimeUnit.SECONDS); } catch (Throwable e) { e.printStackTrace(); } System.out.println(System.currentTimeMillis()); } }); t.start(); Thread.sleep(100L); service.shutdown(); } private static void timedRun(final Runnable task, long timeout, TimeUnit unit) throws Throwable { Future<?> future = service.submit(task); try { future.get(timeout, unit); } catch (TimeoutException e) { // Task will be cancelled below } catch (ExecutionException e) { // Exception thrown in task; rethrow throw new Throwable(e.getCause()); } finally { // Harmless if task already completed future.cancel(true); // Interrupt if running } } }
You should not interrupt a pool thread directly when attempting to cancel a task, because you won't know what task is running when the interrupt request is delivered, do this only through the task's Future.
Reference Links:
1) Java Concurrency In Practice
2) http://stackoverflow.com/questions/3590000/what-does-java-lang-thread-interrupt-do
相关推荐
通过`concurrency::cancellation_token`和`concurrency::cancellation_token_source`,可以在运行时请求任务取消。如果任务在取消请求发出后仍未完成,它将在适当的时候停止执行。 6. **并行算法** PPL还包括一...
Get an easy introduction to reactive streams in Java to handle concurrency, data streams, and the propagation of change in today's applications. This compact book includes in-depth introductions to ...
Java Concurrency in Practice 英文无水印pdf pdf所有页面使用FoxitReader和PDF-XChangeViewer测试都可以打开 本资源转载自网络,如有侵权,请联系上传者或csdn删除 本资源转载自网络,如有侵权,请联系上传者...
Basic concepts of concurrency and thread safety Techniques for building and composing thread-safe classes Using the concurrency building blocks in java.util.concurrent Performance optimization dos ...
本教程"JavaConcurrency:Java并发教程"旨在深入探讨Java平台上的并发机制和最佳实践。 Java并发的核心概念包括线程、同步、互斥、死锁以及线程安全。在Java中,线程是并发执行的基本单元,通过创建Thread对象或者...
《Java并发编程实践》是Java开发者必读的经典之作,由Brian Goetz等多位专家共同撰写。这本书深入浅出地探讨了Java平台上的并发问题,帮助读者理解和掌握如何编写高效、可靠且可维护的多线程应用程序。以下是该书...
《并发的艺术》(The Art of Concurrency: A Thread Monkey's Guide to Writing Parallel Applications)是一本面向程序员的专业书籍,旨在深入讲解并发编程的核心概念和技术。本书由Clay Breshears撰写,并于2009年...
Java Concurrency in practice
This concise book empowers all Java developers to master the complexity of the Java thread APIs and concurrency utilities. This knowledge aids the Java developer in writing correct and complex ...
<<java并行编程>>英文版chm格式,英文名称<Java Concurrency in Practice>,一直想买这本书,但总是缺货,找到了电子版,分享给大家。 Java Concurrency in Practice By Brian Goetz, Tim Peierls, Joshua Bloch,...
Concurrency in Go: Tools and Techniques for Developers by Katherine Cox-Buday English | 19 July 2017 | ISBN: 1491941197 | ASIN: B0742NH2SG | 240 Pages | AZW3 | 1.15 MB Concurrency can be notoriously...
《Java并发编程实践》(Java Concurrency in Practice)是一本深度探讨Java多线程和并发编程的经典著作。这本书由Brian Goetz、Tim Peierls、Joshua Bloch、David Holmes和Doug Lea合著,旨在帮助Java开发者理解和解决...
Java Concurrency in Practice JAVA并发编程实践中文版(全)第二部分
《Java Concurrency in Practice》是Java并发编程领域的一本经典著作,由Brian Goetz、Tim Peierls、Joshua Bloch、Joseph Bowles和Doug Lea等专家共同编写。这本书深入探讨了Java平台上的多线程和并发编程,旨在...
java8 源码 并发操作合集 这是一个关于并发的系列。以实战为驱动,了解并发编程中的那些骚操作。文中的示例代码和部分解释来源于网络,你可以把这个系列当做一本工具书,想不起来的时候来看一看,顺便star一发也是...
本项目"Java-Concurrency:Java并发学习演示"旨在提供一个深入理解Java并发机制的实践平台。以下是对相关知识点的详细说明: 1. **线程与进程**:在计算机系统中,进程是资源分配的基本单位,而线程是执行的基本单位...