`
longgangbai
  • 浏览: 7330858 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

【转】java线程阻塞中断和LockSupport的常见问题

阅读更多

   转载自 http://www.iteye.com/topic/970055

    上周五和周末,工作忙里偷闲,在看java cocurrent中也顺便再温故了一下Thread.interrupt和java 5之后的LockSupport的实现。

 

在介绍之前,先抛几个问题。

 

  1. Thread.interrupt()方法和InterruptedException异常的关系?是由interrupt触发产生了InterruptedException异常?
  2. Thread.interrupt()会中断线程什么状态的工作? RUNNING or BLOCKING?
  3. 一般Thread编程需要关注interrupt中断不?一般怎么处理?可以用来做什么?
  4. LockSupport.park()和unpark(),与object.wait()和notify()的区别?
  5. LockSupport.park(Object blocker)传递的blocker对象做什么用?
  6. LockSupport能响应Thread.interrupt()事件不?会抛出InterruptedException异常?
  7. Thread.interrupt()处理是否有对应的回调函数?类似于钩子调用?
如果你都都能很明确的答上来了,说明你已经完全懂Thread.interrupt,可以不用往下看那了。

那如果不清楚的,带着这几个问题,一起来梳理下。
Thread的interrupt处理的几个方法:
  • public void interrupt() :  执行线程interrupt事件
  • public boolean isInterrupted() : 检查当前线程是否处于interrupt
  • public static boolean interrupted() : check当前线程是否处于interrupt,并重置interrupt信息。类似于resetAndGet()

理解:
1. 每个线程都有一个interrupt status标志位,用于表明当前线程是否处于中断状态
2. 一般调用Thread.interrupt()会有两种处理方式
  • 遇到一个低优先级的block状态时,比如object.wait(),object.sleep(),object.join()。它会立马触发一个unblock解除阻塞,并throw一个InterruptedException。
  • 其他情况,Thread.interrupt()仅仅只是更新了status标志位。然后你的工作线程通过Thread.isInterrrupted()进行检查,可以做相应的处理,比如也throw InterruptedException或者是清理状态,取消task等。
在interrupt javadoc中描述:



最佳实践
IBM上有篇文章写的挺不错。Java theory and practice: Dealing with InterruptedException , 里面提到了Interrupt处理的几条最佳实践。
  1. Don't swallow interrupts (别吃掉Interrupt,一般是两种处理:  继续throw InterruptedException异常。  另一种就是继续设置Thread.interupt()异常标志位,让更上一层去进行相应处理。
    Java代码 复制代码 收藏代码
    1. public class TaskRunner implements Runnable {   
    2.     private BlockingQueue<Task> queue;   
    3.   
    4.     public TaskRunner(BlockingQueue<Task> queue) {    
    5.         this.queue = queue;    
    6.     }   
    7.   
    8.     public void run() {    
    9.         try {   
    10.              while (true) {   
    11.                  Task task = queue.take(10, TimeUnit.SECONDS);   
    12.                  task.execute();   
    13.              }   
    14.          }   
    15.          catch (InterruptedException e) {    
    16.              // Restore the interrupted status   
    17.              Thread.currentThread().interrupt();   
    18.          }   
    19.     }   
    20. }  
    public class TaskRunner implements Runnable {
        private BlockingQueue<Task> queue;
    
        public TaskRunner(BlockingQueue<Task> queue) { 
            this.queue = queue; 
        }
    
        public void run() { 
            try {
                 while (true) {
                     Task task = queue.take(10, TimeUnit.SECONDS);
                     task.execute();
                 }
             }
             catch (InterruptedException e) { 
                 // Restore the interrupted status
                 Thread.currentThread().interrupt();
             }
        }
    }
     
  2. Implementing cancelable tasks with Interrupt (使用Thread.interrupt()来设计和支持可被cancel的task)
    Java代码 复制代码 收藏代码
    1. public class PrimeProducer extends Thread {   
    2.     private final BlockingQueue<BigInteger> queue;   
    3.   
    4.     PrimeProducer(BlockingQueue<BigInteger> queue) {   
    5.         this.queue = queue;   
    6.     }   
    7.   
    8.     public void run() {   
    9.         try {   
    10.             BigInteger p = BigInteger.ONE;   
    11.             while (!Thread.currentThread().isInterrupted())   
    12.                 queue.put(p = p.nextProbablePrime());   
    13.         } catch (InterruptedException consumed) {   
    14.             /* Allow thread to exit */  
    15.         }   
    16.     }   
    17.   
    18.     public void cancel() { interrupt(); } // 发起中断   
    19. }<SPAN style="WHITE-SPACE: normal"> </SPAN>  
    public class PrimeProducer extends Thread {
        private final BlockingQueue<BigInteger> queue;
    
        PrimeProducer(BlockingQueue<BigInteger> queue) {
            this.queue = queue;
        }
    
        public void run() {
            try {
                BigInteger p = BigInteger.ONE;
                while (!Thread.currentThread().isInterrupted())
                    queue.put(p = p.nextProbablePrime());
            } catch (InterruptedException consumed) {
                /* Allow thread to exit */
            }
        }
    
        public void cancel() { interrupt(); } // 发起中断
    } 

注册Interrupt处理事件(非正常用法)

一般正常的task设计用来处理cancel,都是采用主动轮询的方式检查Thread.isInterrupt(),对业务本身存在一定的嵌入性,还有就是存在延迟,你得等到下一个检查点(谁知道下一个检查点是在什么时候,特别是进行一个socket.read时,遇到过一个HttpClient超时的问题)。

 

来看一下,主动抛出InterruptedException异常的实现,借鉴于InterruptibleChannel的设计,比较取巧。

 

 

Java代码 复制代码 收藏代码
  1. interface InterruptAble { // 定义可中断的接口   
  2.   
  3.     public void interrupt() throws InterruptedException;   
  4. }   
  5.   
  6. abstract class InterruptSupport implements InterruptAble {   
  7.   
  8.     private volatile boolean interrupted = false;   
  9.     private Interruptible    interruptor = new Interruptible() {   
  10.   
  11.                                              public void interrupt() {   
  12.                                                  interrupted = true;   
  13.                                                  InterruptSupport.this.interrupt(); // 位置3   
  14.                                              }   
  15.                                          };   
  16.   
  17.     public final boolean execute() throws InterruptedException {   
  18.         try {   
  19.             blockedOn(interruptor); // 位置1   
  20.             if (Thread.currentThread().isInterrupted()) { // 立马被interrupted   
  21.                 interruptor.interrupt();   
  22.             }   
  23.             // 执行业务代码   
  24.             bussiness();   
  25.         } finally {   
  26.             blockedOn(null);   // 位置2   
  27.         }   
  28.   
  29.         return interrupted;   
  30.     }   
  31.   
  32.     public abstract void bussiness() ;   
  33.   
  34.     public abstract void interrupt();   
  35.   
  36.     // -- sun.misc.SharedSecrets --   
  37.     static void blockedOn(Interruptible intr) { // package-private   
  38.         sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(), intr);   
  39.     }   
  40. }  
interface InterruptAble { // 定义可中断的接口

    public void interrupt() throws InterruptedException;
}

abstract class InterruptSupport implements InterruptAble {

    private volatile boolean interrupted = false;
    private Interruptible    interruptor = new Interruptible() {

                                             public void interrupt() {
                                                 interrupted = true;
                                                 InterruptSupport.this.interrupt(); // 位置3
                                             }
                                         };

    public final boolean execute() throws InterruptedException {
        try {
            blockedOn(interruptor); // 位置1
            if (Thread.currentThread().isInterrupted()) { // 立马被interrupted
                interruptor.interrupt();
            }
            // 执行业务代码
            bussiness();
        } finally {
            blockedOn(null);   // 位置2
        }

        return interrupted;
    }

    public abstract void bussiness() ;

    public abstract void interrupt();

    // -- sun.misc.SharedSecrets --
    static void blockedOn(Interruptible intr) { // package-private
        sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(), intr);
    }
}

 

代码说明,几个取巧的点:

位置1:利用sun提供的blockedOn方法,绑定对应的Interruptible事件处理钩子到指定的Thread上。

位置2:执行完代码后,清空钩子。避免使用连接池时,对下一个Thread处理事件的影响。

位置3:定义了Interruptible事件钩子的处理方法,回调InterruptSupport.this.interrupt()方法,子类可以集成实现自己的业务逻辑,比如sock流关闭等等。

 

使用: 

 

Java代码 复制代码 收藏代码
  1. class InterruptRead extends InterruptSupport {   
  2.   
  3.     private FileInputStream in;   
  4.   
  5.     @Override  
  6.     public void bussiness() {   
  7.         File file = new File("/dev/urandom"); // 读取linux黑洞,永远读不完   
  8.         try {   
  9.             in = new FileInputStream(file);   
  10.             byte[] bytes = new byte[1024];   
  11.             while (in.read(bytes, 01024) > 0) {   
  12.                 // Thread.sleep(100);   
  13.                 // if (Thread.interrupted()) {// 以前的Interrupt检查方式   
  14.                 // throw new InterruptedException("");   
  15.                 // }   
  16.             }   
  17.         } catch (Exception e) {   
  18.             throw new RuntimeException(e);   
  19.         }   
  20.     }   
  21.   
  22.     public FileInputStream getIn() {   
  23.         return in;   
  24.     }   
  25.   
  26.     @Override  
  27.     public void interrupt() {   
  28.         try {   
  29.             in.getChannel().close();   
  30.         } catch (IOException e) {   
  31.             e.printStackTrace();   
  32.         }   
  33.     }   
  34.   
  35. }   
  36.   
  37. public static void main(String args[]) throws Exception {   
  38.         final InterruptRead test = new InterruptRead();   
  39.         Thread t = new Thread() {   
  40.   
  41.             @Override  
  42.             public void run() {   
  43.                 long start = System.currentTimeMillis();   
  44.                 try {   
  45.                     System.out.println("InterruptRead start!");   
  46.                     test.execute();   
  47.                 } catch (InterruptedException e) {   
  48.                     System.out.println("InterruptRead end! cost time : " + (System.currentTimeMillis() - start));   
  49.                     e.printStackTrace();   
  50.                 }   
  51.             }   
  52.         };   
  53.         t.start();   
  54.         // 先让Read执行3秒   
  55.         Thread.sleep(3000);   
  56.         // 发出interrupt中断   
  57.         t.interrupt();   
  58.     }  
class InterruptRead extends InterruptSupport {

    private FileInputStream in;

    @Override
    public void bussiness() {
        File file = new File("/dev/urandom"); // 读取linux黑洞,永远读不完
        try {
            in = new FileInputStream(file);
            byte[] bytes = new byte[1024];
            while (in.read(bytes, 0, 1024) > 0) {
                // Thread.sleep(100);
                // if (Thread.interrupted()) {// 以前的Interrupt检查方式
                // throw new InterruptedException("");
                // }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public FileInputStream getIn() {
        return in;
    }

    @Override
    public void interrupt() {
        try {
            in.getChannel().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

public static void main(String args[]) throws Exception {
        final InterruptRead test = new InterruptRead();
        Thread t = new Thread() {

            @Override
            public void run() {
                long start = System.currentTimeMillis();
                try {
                    System.out.println("InterruptRead start!");
                    test.execute();
                } catch (InterruptedException e) {
                    System.out.println("InterruptRead end! cost time : " + (System.currentTimeMillis() - start));
                    e.printStackTrace();
                }
            }
        };
        t.start();
        // 先让Read执行3秒
        Thread.sleep(3000);
        // 发出interrupt中断
        t.interrupt();
    }

 

 

jdk源码介绍: 

1. sun提供的钩子可以查看System的相关代码, line : 1125

 

System代码 复制代码 收藏代码
  1. sun.misc.SharedSecrets.setJavaLangAccess(new sun.misc.JavaLangAccess(){   
  2.             public sun.reflect.ConstantPool getConstantPool(Class klass) {   
  3.                 return klass.getConstantPool();   
  4.             }   
  5.             public void setAnnotationType(Class klass, AnnotationType type) {   
  6.                 klass.setAnnotationType(type);   
  7.             }   
  8.             public AnnotationType getAnnotationType(Class klass) {   
  9.                 return klass.getAnnotationType();   
  10.             }   
  11.             public <E extends Enum<E>>   
  12.             E[] getEnumConstantsShared(Class<E> klass) {   
  13.                 return klass.getEnumConstantsShared();   
  14.             }   
  15.             public void blockedOn(Thread t, Interruptible b) {   
  16.                 t.blockedOn(b);   
  17.             }   
  18.         });  
sun.misc.SharedSecrets.setJavaLangAccess(new sun.misc.JavaLangAccess(){
            public sun.reflect.ConstantPool getConstantPool(Class klass) {
                return klass.getConstantPool();
            }
            public void setAnnotationType(Class klass, AnnotationType type) {
                klass.setAnnotationType(type);
            }
            public AnnotationType getAnnotationType(Class klass) {
                return klass.getAnnotationType();
            }
            public <E extends Enum<E>>
		    E[] getEnumConstantsShared(Class<E> klass) {
                return klass.getEnumConstantsShared();
            }
            public void blockedOn(Thread t, Interruptible b) {
                t.blockedOn(b);
            }
        });

 

 2. Thread.interrupt()

 

Java代码 复制代码 收藏代码
  1. public void interrupt() {   
  2.     if (this != Thread.currentThread())   
  3.         checkAccess();   
  4.   
  5.     synchronized (blockerLock) {   
  6.         Interruptible b = blocker;   
  7.         if (b != null) {   
  8.         interrupt0();       // Just to set the interrupt flag   
  9.         b.interrupt(); //回调钩子   
  10.         return;   
  11.         }   
  12.     }   
  13.     interrupt0();   
  14.     }  
public void interrupt() {
	if (this != Thread.currentThread())
	    checkAccess();

	synchronized (blockerLock) {
	    Interruptible b = blocker;
	    if (b != null) {
		interrupt0();		// Just to set the interrupt flag
		b.interrupt(); //回调钩子
		return;
	    }
	}
	interrupt0();
    }

 

 

更多

更多关于Thread.stop,suspend,resume,interrupt的使用注意点,可以看一下sun的文档,比如http://download.oracle.com/javase/6/docs/technotes/guides/concurrency/threadPrimitiveDeprecation.html

 

 

最后来解答一下之前的几个问题:

问题1: Thread.interrupt()方法和InterruptedException异常的关系?是由interrupt触发产生了InterruptedException异常? 

答: Thread.interrupt()只是在Object.wait() .Object.join(), Object.sleep()几个方法会主动抛出InterruptedException异常。而在其他的的block常见,只是通过设置了Thread的一个标志位信息,需要程序自我进行处理。

 

Java代码 复制代码 收藏代码
  1. if (Thread.interrupted())  // Clears interrupted status!   
  2.     throw new InterruptedException();  
if (Thread.interrupted())  // Clears interrupted status!
    throw new InterruptedException();

 

问题2:Thread.interrupt()会中断线程什么状态的工作? RUNNING or BLOCKING?

答:Thread.interrupt设计的目的主要是用于处理线程处于block状态,比如wait(),sleep()状态就是个例子。但可以在程序设计时为支持task cancel,同样可以支持RUNNING状态。比如Object.join()和一些支持interrupt的一些nio channel设计。

 

问题3: 一般Thread编程需要关注interrupt中断不?一般怎么处理?可以用来做什么?

答: interrupt用途: unBlock操作,支持任务cancel, 数据清理等。

 

问题4: LockSupport.park()和unpark(),与object.wait()和notify()的区别?

答:

1.  面向的主体不一样。LockSuport主要是针对Thread进进行阻塞处理,可以指定阻塞队列的目标对象,每次可以指定具体的线程唤醒。Object.wait()是以对象为纬度,阻塞当前的线程和唤醒单个(随机)或者所有线程。

2.  实现机制不同。虽然LockSuport可以指定monitor的object对象,但和object.wait(),两者的阻塞队列并不交叉。可以看下测试例子。object.notifyAll()不能唤醒LockSupport的阻塞Thread.

 

 

 

问题5: LockSupport.park(Object blocker)传递的blocker对象做什么用?

答: 对应的blcoker会记录在Thread的一个parkBlocker属性中,通过jstack命令可以非常方便的监控具体的阻塞对象.

 

Java代码 复制代码 收藏代码
  1. public static void park(Object blocker) {   
  2.         Thread t = Thread.currentThread();   
  3.         setBlocker(t, blocker); // 设置Thread.parkBlocker属性的值   
  4.         unsafe.park(false, 0L);   
  5.         setBlocker(t, null);  // 清除Thread.parkBlocker属性的值   
  6.     }  
public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker); // 设置Thread.parkBlocker属性的值
        unsafe.park(false, 0L);
        setBlocker(t, null);  // 清除Thread.parkBlocker属性的值
    }

 具体LockSupport的javadoc描述也比较清楚,可以看下:

 

 

问题6: LockSupport能响应Thread.interrupt()事件不?会抛出InterruptedException异常?

答:能响应interrupt事件,但不会抛出InterruptedException异常。针对LockSupport对Thread.interrupte支持,也先看一下javadoc中的描述:


 

相关测试代码

 

Java代码 复制代码 收藏代码
  1. package com.agapple.cocurrent;   
  2.   
  3. import java.io.File;   
  4. import java.io.FileInputStream;   
  5. import java.lang.reflect.Field;   
  6. import java.util.concurrent.TimeUnit;   
  7. import java.util.concurrent.locks.LockSupport;   
  8.   
  9. public class LockSupportTest {   
  10.   
  11.     private static LockSupportTest blocker = new LockSupportTest();   
  12.   
  13.     public static void main(String args[]) throws Exception {   
  14.         lockSupportTest();   
  15.         parkTest();   
  16.         interruptParkTest();   
  17.         interruptSleepTest();   
  18.         interruptWaitTest();   
  19.     }   
  20.   
  21.     /**  
  22.      * LockSupport.park对象后,尝试获取Thread.blocker对象,调用其single唤醒  
  23.      *   
  24.      * @throws Exception  
  25.      */  
  26.     private static void lockSupportTest() throws Exception {   
  27.         Thread t = doTest(new TestCallBack() {   
  28.   
  29.             @Override  
  30.             public void callback() throws Exception {   
  31.                 // 尝试sleep 5s   
  32.                 System.out.println("blocker");   
  33.                 LockSupport.park(blocker);   
  34.                 System.out.println("wakeup now!");   
  35.             }   
  36.   
  37.             @Override  
  38.             public String getName() {   
  39.                 return "lockSupportTest";   
  40.             }   
  41.   
  42.         });   
  43.         t.start(); // 启动读取线程   
  44.   
  45.         Thread.sleep(150);   
  46.         synchronized (blocker) {   
  47.             Field field = Thread.class.getDeclaredField("parkBlocker");   
  48.             field.setAccessible(true);   
  49.             Object fBlocker = field.get(t);   
  50.             System.out.println(blocker == fBlocker);   
  51.             Thread.sleep(100);   
  52.             System.out.println("notifyAll");   
  53.             blocker.notifyAll();   
  54.         }   
  55.     }   
  56.   
  57.     /**  
  58.      * 尝试去中断一个object.wait(),会抛出对应的InterruptedException异常  
  59.      *   
  60.      * @throws InterruptedException  
  61.      */  
  62.     private static void interruptWaitTest() throws InterruptedException {   
  63.         final Object obj = new Object();   
  64.         Thread t = doTest(new TestCallBack() {   
  65.   
  66.             @Override  
  67.             public void callback() throws Exception {   
  68.                 // 尝试sleep 5s   
  69.                 obj.wait();   
  70.                 System.out.println("wakeup now!");   
  71.             }   
  72.   
  73.             @Override  
  74.             public String getName() {   
  75.                 return "interruptWaitTest";   
  76.             }   
  77.   
  78.         });   
  79.         t.start(); // 启动读取线程   
  80.         Thread.sleep(2000);   
  81.         t.interrupt(); // 检查下在park时,是否响应中断   
  82.     }   
  83.   
  84.     /**  
  85.      * 尝试去中断一个Thread.sleep(),会抛出对应的InterruptedException异常  
  86.      *   
  87.      * @throws InterruptedException  
  88.      */  
  89.     private static void interruptSleepTest() throws InterruptedException {   
  90.         Thread t = doTest(new TestCallBack() {   
  91.   
  92.             @Override  
  93.             public void callback() throws Exception {   
  94.                 // 尝试sleep 5s   
  95.                 Thread.sleep(5000);   
  96.                 System.out.println("wakeup now!");   
  97.             }   
  98.   
  99.             @Override  
  100.             public String getName() {   
  101.                 return "interruptSleepTest";   
  102.             }   
  103.   
  104.         });   
  105.         t.start(); // 启动读取线程   
  106.         Thread.sleep(2000);   
  107.         t.interrupt(); // 检查下在park时,是否响应中断   
  108.     }   
  109.   
  110.     /**  
  111.      * 尝试去中断一个LockSupport.park(),会有响应但不会抛出InterruptedException异常  
  112.      *   
  113.      * @throws InterruptedException  
  114.      */  
  115.     private static void interruptParkTest() throws InterruptedException {   
  116.         Thread t = doTest(new TestCallBack() {   
  117.   
  118.             @Override  
  119.             public void callback() {   
  120.                 // 尝试去park 自己线程   
  121.                 LockSupport.parkNanos(blocker, TimeUnit.SECONDS.toNanos(5));   
  122.                 System.out.println("wakeup now!");   
  123.             }   
  124.   
  125.             @Override  
  126.             public String getName() {   
  127.                 return "interruptParkTest";   
  128.             }   
  129.   
  130.         });   
  131.         t.start(); // 启动读取线程   
  132.         Thread.sleep(2000);   
  133.         t.interrupt(); // 检查下在park时,是否响应中断   
  134.     }   
  135.   
  136.     /**  
  137.      * 尝试去中断一个LockSupport.unPark(),会有响应  
  138.      *   
  139.      * @throws InterruptedException  
  140.      */  
  141.     private static void parkTest() throws InterruptedException {   
  142.         Thread t = doTest(new TestCallBack() {   
  143.   
  144.             @Override  
  145.             public void callback() {   
  146.                 // 尝试去park 自己线程   
  147.                 LockSupport.park(blocker);   
  148.                 System.out.println("wakeup now!");   
  149.             }   
  150.   
  151.             @Override  
  152.             public String getName() {   
  153.                 return "parkTest";   
  154.             }   
  155.   
  156.         });   
  157.   
  158.         t.start(); // 启动读取线程   
  159.         Thread.sleep(2000);   
  160.         LockSupport.unpark(t);   
  161.         t.interrupt();   
  162.     }   
  163.   
  164.     public static Thread doTest(final TestCallBack call) {   
  165.         return new Thread() {   
  166.   
  167.             @Override  
  168.             public void run() {   
  169.                 File file = new File("/dev/urandom"); // 读取linux黑洞   
  170.                 try {   
  171.                     FileInputStream in = new FileInputStream(file);   
  172.                     byte[] bytes = new byte[1024];   
  173.                     while (in.read(bytes, 01024) > 0) {   
  174.                         if (Thread.interrupted()) {   
  175.                             throw new InterruptedException("");   
  176.                         }   
  177.                         System.out.println(bytes[0]);   
  178.                         Thread.sleep(100);   
  179.                         long start = System.currentTimeMillis();   
  180.                         call.callback();   
  181.                         System.out.println(call.getName() + " callback finish cost : "  
  182.                                            + (System.currentTimeMillis() - start));   
  183.                     }   
  184.                 } catch (Exception e) {   
  185.                     e.printStackTrace();   
  186.                 }   
  187.             }   
  188.   
  189.         };   
  190.     }   
  191. }   
  192.   
  193. interface TestCallBack {   
  194.   
  195.     public void callback() throws Exception;   
  196.   
  197.     public String getName();   
  198. }  
package com.agapple.cocurrent;

import java.io.File;
import java.io.FileInputStream;
import java.lang.reflect.Field;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

public class LockSupportTest {

    private static LockSupportTest blocker = new LockSupportTest();

    public static void main(String args[]) throws Exception {
        lockSupportTest();
        parkTest();
        interruptParkTest();
        interruptSleepTest();
        interruptWaitTest();
    }

    /**
     * LockSupport.park对象后,尝试获取Thread.blocker对象,调用其single唤醒
     * 
     * @throws Exception
     */
    private static void lockSupportTest() throws Exception {
        Thread t = doTest(new TestCallBack() {

            @Override
            public void callback() throws Exception {
                // 尝试sleep 5s
                System.out.println("blocker");
                LockSupport.park(blocker);
                System.out.println("wakeup now!");
            }

            @Override
            public String getName() {
                return "lockSupportTest";
            }

        });
        t.start(); // 启动读取线程

        Thread.sleep(150);
        synchronized (blocker) {
            Field field = Thread.class.getDeclaredField("parkBlocker");
            field.setAccessible(true);
            Object fBlocker = field.get(t);
            System.out.println(blocker == fBlocker);
            Thread.sleep(100);
            System.out.println("notifyAll");
            blocker.notifyAll();
        }
    }

    /**
     * 尝试去中断一个object.wait(),会抛出对应的InterruptedException异常
     * 
     * @throws InterruptedException
     */
    private static void interruptWaitTest() throws InterruptedException {
        final Object obj = new Object();
        Thread t = doTest(new TestCallBack() {

            @Override
            public void callback() throws Exception {
                // 尝试sleep 5s
                obj.wait();
                System.out.println("wakeup now!");
            }

            @Override
            public String getName() {
                return "interruptWaitTest";
            }

        });
        t.start(); // 启动读取线程
        Thread.sleep(2000);
        t.interrupt(); // 检查下在park时,是否响应中断
    }

    /**
     * 尝试去中断一个Thread.sleep(),会抛出对应的InterruptedException异常
     * 
     * @throws InterruptedException
     */
    private static void interruptSleepTest() throws InterruptedException {
        Thread t = doTest(new TestCallBack() {

            @Override
            public void callback() throws Exception {
                // 尝试sleep 5s
                Thread.sleep(5000);
                System.out.println("wakeup now!");
            }

            @Override
            public String getName() {
                return "interruptSleepTest";
            }

        });
        t.start(); // 启动读取线程
        Thread.sleep(2000);
        t.interrupt(); // 检查下在park时,是否响应中断
    }

    /**
     * 尝试去中断一个LockSupport.park(),会有响应但不会抛出InterruptedException异常
     * 
     * @throws InterruptedException
     */
    private static void interruptParkTest() throws InterruptedException {
        Thread t = doTest(new TestCallBack() {

            @Override
            public void callback() {
                // 尝试去park 自己线程
                LockSupport.parkNanos(blocker, TimeUnit.SECONDS.toNanos(5));
                System.out.println("wakeup now!");
            }

            @Override
            public String getName() {
                return "interruptParkTest";
            }

        });
        t.start(); // 启动读取线程
        Thread.sleep(2000);
        t.interrupt(); // 检查下在park时,是否响应中断
    }

    /**
     * 尝试去中断一个LockSupport.unPark(),会有响应
     * 
     * @throws InterruptedException
     */
    private static void parkTest() throws InterruptedException {
        Thread t = doTest(new TestCallBack() {

            @Override
            public void callback() {
                // 尝试去park 自己线程
                LockSupport.park(blocker);
                System.out.println("wakeup now!");
            }

            @Override
            public String getName() {
                return "parkTest";
            }

        });

        t.start(); // 启动读取线程
        Thread.sleep(2000);
        LockSupport.unpark(t);
        t.interrupt();
    }

    public static Thread doTest(final TestCallBack call) {
        return new Thread() {

            @Override
            public void run() {
                File file = new File("/dev/urandom"); // 读取linux黑洞
                try {
                    FileInputStream in = new FileInputStream(file);
                    byte[] bytes = new byte[1024];
                    while (in.read(bytes, 0, 1024) > 0) {
                        if (Thread.interrupted()) {
                            throw new InterruptedException("");
                        }
                        System.out.println(bytes[0]);
                        Thread.sleep(100);
                        long start = System.currentTimeMillis();
                        call.callback();
                        System.out.println(call.getName() + " callback finish cost : "
                                           + (System.currentTimeMillis() - start));
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

        };
    }
}

interface TestCallBack {

    public void callback() throws Exception;

    public String getName();
}

 

 

最后

发觉文章越写越长,那就索性发到了论坛,大家一起讨论下.毕竟文章中描述的都是一些使用层面的东东,并没有从操作系统或者sun native实现上去介绍Thread的一些机制,熟悉这块的大牛门也可以出来发表下高见.

 

本文仅当抛砖引玉,欢迎发言!

分享到:
评论

相关推荐

    java线程阻塞中断与LockSupport使用介绍

    Java线程阻塞中断是Java并发编程中的一个重要概念,它涉及到线程的生命周期管理以及协作。`Thread.interrupt()` 方法是用于向线程发送中断请求,而`LockSupport` 是Java 5引入的一个低级别的线程同步工具,提供了比`...

    详解Java多线程编程中LockSupport类的线程阻塞用法

    在Java多线程编程中,LockSupport类是一个重要的工具,它提供了一种低级别的线程阻塞和唤醒机制。LockSupport并不像synchronized或java.util.concurrent.locks包中的Lock接口那样提供锁的完整功能,但它提供了两个...

    Java并发编程之LockSupport、Unsafe详解.docx

    在Java并发编程中,LockSupport和Unsafe是两个关键的工具类,它们提供了底层的线程控制功能,使得开发者能够深入地管理和控制线程的行为。LockSupport是Java并发库中的一个核心工具类,它提供了线程的阻塞和唤醒功能...

    Java中LockSupport的使用.docx

    LockSupport的核心功能是提供线程阻塞和唤醒的原语,这些功能在Java的并发编程中扮演着重要角色。Java的锁和同步器框架AbstractQueuedSynchronizer(AQS)就是通过LockSupport的`park()`和`unpark()`方法来管理线程...

    Java 多线程与并发(9-26)-JUC锁- LockSupport详解.pdf

    LockSupport是Java中用于多线程同步的一个工具类,它提供了一组基础的线程阻塞和解除阻塞的方法。这个类位于java.util.concurrent.locks包下,是实现并发编程中AQS(AbstractQueuedSynchronizer)框架的重要基础之一...

    浅析JAVA多线程.pdf

    Java 5.0版本中引入了java.util.concurrent.locks包,其中提供了LockSupport类,它是创建锁和其他同步类的基本线程阻塞原语。LockSupport类中的park方法用于阻塞当前线程,直到获取许可证;unpark方法用于释放指定...

    Java多线程和并发知识整理

    LockSupport是线程阻塞和唤醒的基础,用于构建锁和其他同步组件。 以上内容涵盖了Java多线程和并发编程的主要知识点,从理论到实践,从基础到高级,全面解析了并发编程的核心概念和工具。掌握这些知识,开发者可以...

    Java多线程超级详解(看这篇就足够了).pdf

    创建Java线程主要有五种方式: 1. 继承`Thread`类:创建一个新的类继承自`Thread`,并重写`run()`方法,然后创建该类的实例并调用`start()`。 2. 实现`Runnable`接口:创建一个实现`Runnable`接口的类,重写`run()`...

    java Thread

    本篇将深入探讨Java线程的核心概念、创建方式以及常见操作。 1. **线程的基本概念** - 进程:操作系统分配资源的基本单位,每个进程有自己的内存空间。 - 线程:进程中执行任务的实体,多个线程共享同一进程的...

    java线程入门

    上版本中,线程的状态模型进行了扩展,包括了新建(NEW)、可运行(RUNNABLE)、阻塞(BLOCKED)、等待(WAITING)、定时等待(TIMED_WAITING)和终止(TERMINATED)六种状态。这些状态描述了线程在其生命周期中的...

    LockSupportTester.zip

    LockSupport是Java并发编程中的一个重要工具类,位于`java.util.concurrent.locks`包下,它提供了线程阻塞和唤醒的底层支持。在Java多线程编程中,LockSupport主要用于构建锁和其他同步机制,比如ReentrantLock和...

    java学习资料-线程

    Java线程有五种基本状态:NEW、RUNNABLE、BLOCKED、WAITING 和 TERMINATED。线程从新建(NEW)开始,调用start()后变为可运行(RUNNABLE)。在执行过程中,线程可能进入阻塞(BLOCKED)状态,如等待锁或I/O完成。...

    java+多线程+同步详解Java源码

    综上所述,Java的多线程和同步机制是实现并发编程的基础,通过合理利用这些机制,开发者可以编写出高效、安全的多线程应用程序。在实际工作中,不仅需要理解这些概念,还要能够熟练地应用到项目中,解决多线程环境下...

    多线程思维导图总结

    4. **条件变量(Condition)**:配合锁使用,线程可以等待某个条件成立再继续执行,如`LockSupport.park()`和`LockSupport.unpark()`。 5. **等待/通知机制**:Java中的`Object`类提供了`wait()`, `notify()`和`...

    基于JDK源码解析Java领域中的并发锁之设计与实现.pdf

    LockSupport是线程阻塞和唤醒的低级工具,提供了park()和unpark()方法,用于线程的挂起和恢复。它是基于 Unsafe 类实现的,可以实现非阻塞的线程挂起,提高并发效率。 三、Condition接口的设计与实现 Condition接口...

    Java 多线程共享模型之管程(下).doc

    `park()`和`unpark()`是`java.util.concurrent.locks.LockSupport`类中的方法,它们提供了一种低级别的线程阻塞和唤醒机制。`park()`可以使当前线程暂停执行,直到被`unpark()`或者其他外部因素唤醒。`unpark()`则...

    JUC学习.docx Java

    `park`和`unpark`是`LockSupport`提供的方法,用于线程的阻塞和唤醒控制。`yield`让当前线程自愿放弃CPU时间片,但并不保证其他线程会被立即调度。`join`等待其他线程结束,`wait`和`notify`是对象监视器(Monitor)...

    Java concurrency之锁_动力节点Java学院

    4. **LockSupport**:这是一个低级别的线程阻塞和唤醒工具,提供`park()`和`unpark()`方法,避免了`Thread.suspend()`和`Thread.resume()`可能导致的死锁问题。 5. **Condition接口**:Condition允许更精确的线程间...

    [bugfix]重新理解Thread的InterruptedException

    Java中的线程中断是一种协作机制,它并不强制停止一个线程,而是通过设置线程的中断状态来通知线程应该停止当前的工作。当一个线程调用`interrupt()`方法时,它不会立即停止,而是设置一个中断标志。这个标志可以在...

    java多线程之线程同步七种方式代码示例

    `LockSupport`工具类提供了低级别的线程阻塞和唤醒功能。`park()`方法会阻塞当前线程,`unpark(Thread thread)`则会唤醒指定的线程。 6. Condition(条件变量) `java.util.concurrent.locks.Condition`接口提供了...

Global site tag (gtag.js) - Google Analytics