`
DavyJones2010
  • 浏览: 155457 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

Java Concurrency: Cancellation and Shutdown

阅读更多

Dealing well with failure, shutdown, and cancellation is one of the characteristics that distinguishes a well-behaved application from one that merely works.

 

1> Task Cancellation:

     An activity is cancellable if external code can move it to completion before its normal completion. There are a number of reasons why you might want to cancel an activity:

     1> User-requested cancellation

     2> Time-limited activities

     3> Application events.

     4> Errors

     5> Shutdown

     There is no safe way to preemptively stop a thread in Java, and therefore no safe way to preemptively stop a task. There are only cooperative mechanisms, by which the task and the code requesting cancellation follow an agreed-upon protocol.

     A task that wants to be cancellable must have a cancellation policy that specifies the "how", "when" and "what" of cancellation.

     "HOW": How other code can request cancellation.

     "WHEN": When the task checks the cancellation has been requested.

     "WHAT": What actions the task takes in response to a cancellation request.

     Consider the real-world example of stopping payment on a check. Banks have rules about how to submit a stop-payment request, what responsiveness guaranteed it makes in processing such requests, and what procedures it follows when payment is actually stopped(such as notifying the other bank involved in the transaction and assessing a fee against the payor's account). Taken together, these procedures and guarantees comprise the cancellation policy for check payment.

     1> Setting a "cancellation requested" flag (which must be volatile).

package edu.xmu.jcip;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

public class PrimeGenerator implements Runnable {
	private volatile boolean cancelled;
	private final List<BigInteger> primes = new ArrayList<BigInteger>();

	@Override
	public void run() {
		BigInteger p = BigInteger.ONE;
		while (!cancelled) {
			p = p.nextProbablePrime();
			synchronized (this) {
				primes.add(p);
			}
		}
	}

	public void cancel() {
		cancelled = true;
	}

	public synchronized List<BigInteger> get() {
		return new ArrayList<BigInteger>(primes);
	}
}
package edu.xmu.jcip;

import org.junit.Test;

public class PrimeGeneratorTest {
	@Test
	public void test() {
		PrimeGenerator primeGenerator = new PrimeGenerator();
		new Thread(primeGenerator).start();
		try {
			Thread.sleep(1000L);
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			primeGenerator.cancel();
		}
		System.out.println(primeGenerator.get());
	}
}

    But if a task that uses this approach calls a blocking method such as "BlockingQueue.put", we could have a more serious problem-the task might never check the cancellation flag and therefore might never terminate. Example below will never terminate because generator is blocked in blockingQueue.put(), and client stoped consume, thus producer do not even have chance to check while(!cancelled);

package edu.xmu.jcip;

import java.math.BigInteger;
import java.util.concurrent.BlockingQueue;

public class BrokenPrimeGenerator implements Runnable {
	private final BlockingQueue<BigInteger> queue;
	private volatile boolean cancelled = false;

	public BrokenPrimeGenerator(BlockingQueue<BigInteger> queue) {
		this.queue = queue;
	}

	public void run() {
		try {
			BigInteger p = BigInteger.ONE;
			while (!cancelled) {
				p = p.nextProbablePrime();
				queue.put(p);
			}
		} catch (InterruptedException exception) {
		}
	}

	public void cancel() {
		this.cancelled = true;
	}
}
package edu.xmu.jcip;

import java.math.BigInteger;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BrokenPrimeGeneratorTest {
	public static void main(String[] args) throws InterruptedException {
		BlockingQueue<BigInteger> primes = new ArrayBlockingQueue<BigInteger>(
				10);
		BrokenPrimeGenerator brokenPrimeGenerator = new BrokenPrimeGenerator(
				primes);
		new Thread(brokenPrimeGenerator).start();
		Thread.sleep(1000);
		try {
			while (needMorePrimes()) {
				System.out.println(primes.take());
			}
		} finally {
			brokenPrimeGenerator.cancel();
		}
	}

	public static boolean needMorePrimes() {
		// Dummy Method called by client
		return false;
	}
}

    How can we solve the problem mentioned above? Certain blocking library methods support interruption. Thread interruption is a cooperative mechanism for a thread to signal another thread that it should, at its convernience and if it feels like it, stope what it is doing and do something else.

 

     2> Using Thread.interrupt(). Interruption is usually the most sensible way to implement cancellation.

     In practice, using interruption for anything but cancellation is fragile and difficult to sustain in larger appliations.

public class Thread{
    public void interrupt(){...} // 1) set interrupted to true 2) Call native method to stop current thread.
    public boolean isInterrupted() {...}
    public static boolean interrupted() {...} // Clears the interrupted status of the thread and returns its previous value
}

   Each thread has a boolean interrupted status in its native peer.

   1) interrput(): The interrupt method interrupts the target thread, set the interrupted to true.

   2) isInterrupted(): Returns the interrupted status of the target thread.

   3) interrupted(): 1) Clears the interrupted status? 2) Return its previous value. This is the only way to clear the interrupted status. (What means clear the interrupted status? Set its isInterrupted to false?)

   Situation 1: A thread is interrupted when it is blocked. Such as calling Thread.sleep, Object.wait or BlockingQueue.take/poll

       Blocking library methods like Thread.sleep and Object.wait try to detect when a thread has been interrupted and return early. They respond to interruption by clearing the interrupted status and throwing InterruptedException, indicating that the blocking operation completed early due to interruption. The JVM makes no guarantees on how quickly a blocking method will detect interruption, but in practice this happens reasonably quickly.

package edu.xmu.jcip;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingInterruptTest {
	public static void main(String[] args) throws InterruptedException {
		Thread t = new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					Thread.sleep(1000L);
				} catch (InterruptedException e) {
					System.out.println(Thread.currentThread().isInterrupted()); // "false" will be printed
					e.printStackTrace();
				}
			}
		});
		t.start();
		Thread.sleep(10L);
		t.interrupt();

		final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1);
		queue.put("A");
		t = new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					queue.put("B");
				} catch (InterruptedException e) {
					System.out.println(Thread.currentThread().isInterrupted()); //"false" will be printed
					e.printStackTrace();
				}
			}
		});
		t.start();
		Thread.sleep(10L);
		t.interrupt();
	}
}

    Situation B: A thread is interrupted when it is not blocked.

       Current thread's interrupted status is set, and it is up to the activity being cancelled to poll the interrupted status to detect interruption. In this way interruption is "sticky" if it doesn't trigger an InterruptionException, evidence of interruption persists until someone deliberately clears the interrupted status.

       "Calling interrupt does not necessarily stop the target thread from doing what it is doing; it merely delivers the messages that interruption has been requested."

public class NonBlockingInterruptTest {
	public static void main(String[] args) throws InterruptedException {
		Thread t = new Thread(new Runnable() {
			@Override
			public void run() {
				while (true) {
					System.out.println("Hello");
					System.out.println(Thread.currentThread().isInterrupted());
				}
			}
		});
		t.start();
		Thread.sleep(10L);
		t.interrupt();
	}
}

   Example above, thread t will run forever, but for the first 10ms, isInterrupted is false, and after that isInterrupted is true.

   An enhanced verson of PrimeProducer using interrput:

package edu.xmu.jcip;

import java.math.BigInteger;
import java.util.concurrent.BlockingQueue;

public class PrimeProducer extends Thread {
	private final BlockingQueue<BigInteger> queue;

	public PrimeProducer(BlockingQueue<BigInteger> queue) {
		this.queue = queue;
	}

	public void run() {
		BigInteger p = BigInteger.ONE;
		try {
			while (!isInterrupted()) {
				p = p.nextProbablePrime();
				System.out.println("Put " + p + " into queue");
				queue.put(p);
			}
		} catch (InterruptedException exception) {
			exception.printStackTrace();
		}
	}

	public void cancel() {
		interrupt();
	}
}
package edu.xmu.jcip;

import java.math.BigInteger;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PrimeProducerTest {
	public static void main(String[] args) throws InterruptedException {
		BlockingQueue<BigInteger> primes = new ArrayBlockingQueue<BigInteger>(1);
		PrimeProducer primeProducer = new PrimeProducer(primes);
		primeProducer.start();
		Thread.sleep(10L);
		primeProducer.cancel();
	}
}

    Example above, when primeProducer is interrupted when executing/blocking queue.put(); then an InterruptedException will be thrown, then we will jump out of the while loop, and finished run() method.

    When primeProducer is interrupted when not executing/blocking queue.put(), then its isInterrupted() would return true, then we will jump out of the while loop as well and finish the run() method.

    Also, we can even swallow this exception and let the producer loops forever:

public void run() {
	BigInteger p = BigInteger.ONE;
	while (!isInterrupted()) {
		p = p.nextProbablePrime();
		try {
			queue.put(p);
			System.out.println("Put " + p + " into queue");
		} catch (InterruptedException exception) {
			// When InterruptedException is thrown by queue.put, it will set
			// isInterrupted to false
			exception.printStackTrace();
		}
	}
}

    A good way to think about interruption is that it does not actually interrupt a running thread; it just requests that the thread interrupt itself at the next convenient opportunity. (These opportunities are called cancellation points.) Some methods, such as wait, sleep and join, take such request seriously, throwing an exception when they receive an interrupt request or encounter an already set interrupt status upon entry.

 

2> Thread Cancellation

    It is important to distinguish between how tasks and threads should react to interruption. A single interrupt request may have more than one desired recipient, interrupting a worker thread in a thread pool can mean both to "cancel the current task" and "shut down the worker thread".

    Tasks do not execute in threads they own; they borrow threads owned by a service such as a thread pool. Code that doesn't own the thread (for a thread pool, any code outside of the thread pool implementation) should be careful to preserve the interrupted status so that the owning code can eventually act on it, even if the "guest" code acts on the interruption as well.

    This is why most blocking library methods simply throw InterruptedException in response to an interrupt. They will never execute in a thread they own, so they implement the most reasonable cancelation policy for task or library code: get out of the way as quickly as possible and communicate the interruption back to the caller so that code higher up on the call stack can take further action.

    A task needn't necessarily drop everything when it detects an interruption request, it can choose to postpone it until a more opportune time by remembering that it was interrputed, finishing the task it was performing, and then throwing InterruptedException or otherwise indicating interruption. This technique can protect data structures from corruption when an activity is interrupted in the middle of an update.

    Rule: You should know a thread's interruption policy before interrupting it.

package edu.xmu.jcip;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TimeRunTest {
	private static final ScheduledExecutorService service = Executors
			.newScheduledThreadPool(1);

	public static void main(String[] args) throws InterruptedException {
		final Thread taskThread = new Thread(new Runnable() {
			@Override
			public void run() {
				final Thread currentThread = Thread.currentThread();

				System.out.println(currentThread.getName() + ": "
						+ currentThread.isAlive() + ": "
						+ currentThread.isInterrupted() + " finished work");
			}
		});
		timedRun(taskThread, 10, TimeUnit.SECONDS);
		Thread.sleep(100L);
		System.out.println(taskThread.getName() + ": " + taskThread.isAlive()
				+ ": " + taskThread.isInterrupted() + " finished work");
		service.shutdown();
	}

	private static void timedRun(final Thread taskThread, long time,
			TimeUnit unit) throws InterruptedException {
		service.schedule(new Runnable() {
			@Override
			public void run() {
				taskThread.interrupt();
			}
		}, time, unit);
		taskThread.start();
		System.out.println(taskThread.getName() + ": " + taskThread.isAlive()
				+ ": " + taskThread.isInterrupted());
	}
}

    In example above, the taskThread finished its task and isAlive == false, then after 10s, the scheduler called taskThread.interrupt(), there is no meaning anymore to interrupt it right now.

    Since timedRun can be called from an arbitrary thread, it cannot know the calling thread's interruption policy.

    1> If the task completes before the timeout, the cancellation task that interrupts the thread in which timedRun was called could go off after timedRun has returned to its caller.We don't know what code will be running when that happens, but the result won't be good. (It is possible but suprisingly tricky to eliminate this risk by using the ScheduledFuture returned by schedule to cancel the cancellation task).

package edu.xmu.jcip;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TimeRunTest2 {
	private static final ScheduledExecutorService service = Executors
			.newScheduledThreadPool(1);

	public static void main(String[] args) throws InterruptedException {
		final Runnable task = new Runnable() {
			@Override
			public void run() {
				while (true) {
					// working
				}
			}
		};
		Thread t = new Thread(new Runnable() {
			@Override
			public void run() {
				System.out.println(System.currentTimeMillis());
				timedRun(task, 1, TimeUnit.SECONDS);
				System.out.println(System.currentTimeMillis());
			}
		});
		t.start();
		Thread.sleep(100L);
		service.shutdown();
	}

	private static void timedRun(final Runnable task, long time, TimeUnit unit) {
		final Thread taskThread = Thread.currentThread();
		service.schedule(new Runnable() {
			@Override
			public void run() {
				taskThread.interrupt();
			}
		}, time, unit);
		task.run();
	}
}

    2> If the task is not responsive to interruption, timedRun will not return until the task finishes, which may be long after the desired timeout. A timed run service that doesn't return after the specified time is likely to be irritating to its caller. Example above, thread t will run forever, and timedRun will never return because the runnable is insensitive to interruption.

    3> Cancellation via Future:

public class TimeRunTest3 {
	private static final ExecutorService service = Executors
			.newSingleThreadExecutor();

	public static void main(String[] args) throws InterruptedException {
		final Runnable task = new Runnable() {
			@Override
			public void run() {
				while (!Thread.currentThread().isInterrupted()) {
					// working
				}
			}
		};
		Thread t = new Thread(new Runnable() {
			@Override
			public void run() {
				System.out.println(System.currentTimeMillis());
				try {
					timedRun(task, 1, TimeUnit.SECONDS);
				} catch (Throwable e) {
					e.printStackTrace();
				}
				System.out.println(System.currentTimeMillis());
			}
		});
		t.start();
		Thread.sleep(100L);
		service.shutdown();
	}

	private static void timedRun(final Runnable task, long timeout,
			TimeUnit unit) throws Throwable {
		Future<?> future = service.submit(task);
		try {
			future.get(timeout, unit);
		} catch (TimeoutException e) {
			// Task will be cancelled below
		} catch (ExecutionException e) {
			// Exception thrown in task; rethrow
			throw new Throwable(e.getCause());
		} finally {
			// Harmless if task already completed
			future.cancel(true); // Interrupt if running
		}
	}
}

    You should not interrupt a pool thread directly when attempting to cancel a task, because you won't know what task is running when the interrupt request is delivered, do this only through the task's Future.

 

 

Reference Links:

1) Java Concurrency In Practice

2) http://stackoverflow.com/questions/3590000/what-does-java-lang-thread-interrupt-do

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics