ActiveMQ之三 -- 使用ActiveMQ来传送文件
Java的每一个对象除了有一个相关的monitor以外(用做synchronized lock),还有一个相关的wait set,用以存放处于WAITING状态的线程
- wait set是线程的集合
- 当Java对象创建的时候,其wait set是空的。对于wait set操作(将线程加入或移出wait set)都是原子操作
- 对于wait set的操作(加入或移出),而且只能通过Object.wait,Object.notify,Object.notifyAll这三个操作来进行。当线程执行到Object.wait指令后,就会进入到wait set集合中;而执行到Object.notify,Object.notifyAll指令后,则通知处于wait set中的线程,条件满足了。
- 一个线程需要某个条件继续执行,并且认为别的线程会创造这个条件。于是这个线程就呆在wait set里面等待
- 当别的线程创造了这个条件,则通知等待条件的线程
void wait()
该方法必须在synchronized方法或synchronized块中呼叫 |
void wait(long timeout)
等待条件发生。但和wait()方法不同之处在于,如果在timeout(毫秒)的时间内,通知没有到来,则自己略过条件等待,移出wait set,继续执行。
该方法必须在synchronized方法或synchronized块中呼叫 |
void wait(long timeout,int nanos)
等待条件发生。观念和wait(long timeout)是一样的。唯一的差别timeout的时间,到了纳秒级别。
该方法必须在synchronized方法或synchronized块中呼叫 |
void notify()
通知位于wait set中的某一个线程,条件已经发生了。
该方法必须在synchronized方法或synchronized块中呼叫 |
void notifyAll()
通知位于wait set中的所有线程,条件已经发生了。
该方法必须在synchronized方法或synchronized块中呼叫 |
public class ProducerConsumerV6 { private List<Integer> list = null; private Random random = null; private Object lock; public ProducerConsumerV6() { list = new ArrayList<Integer>(); random = new Random(); lock = new Object(); } public void produce() throws InterruptedException { synchronized (lock) { if (list.size() == 0) {// 这个对象的wait set里面有consumer线程 lock.notifyAll();// notify() can work } System.out.print(Thread.currentThread().getName() + "\t\t\t" + list + "\t\t\t"); int size = list.size(); for (int i = 0; i < 10 - size; i++) { list.add(random.nextInt(1000)); } System.out.println(list); } } /** * 这里有个问题,当producer产生的食物总量,比consumer的数量少的时候 则有可能发生发生一场状况 * * @throws InterruptedException */ public void consume() throws InterruptedException { synchronized (lock) { while (list.size() == 0) {// 测试竞争条件 // 注意这里必须用while,当一个consumer线程醒来的时候,还必须 // 继续检查list是否为空,如果是则继续等待。否则就继续下一条指 // 令执行 lock.wait(); } System.out.print(Thread.currentThread().getName() + "\t\t\t" + list + "\t\t\t"); list.remove(0); System.out.println(list); } } public static void main(String[] args) throws Exception { ProducerConsumerV6 pc = new ProducerConsumerV6(); for (int i = 0; i < 50; i++) { Thread thread = new Thread(pc.new Consumer()); thread.setName("Consumer " + i); thread.start(); } for (int i = 0; i < 2; i++) { Thread thread = new Thread(pc.new Producer()); thread.setName("Producer " + i); thread.start(); } } public class Producer implements Runnable { @Override public void run() { while (true) { try { produce(); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); return; } } } } public class Consumer implements Runnable { @Override public void run() { while (true) { try { consume(); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); return; } } } } }
public class ProducerConsumerV7 { private List<Integer> list = null; private Random random = null; public ProducerConsumerV7() { list = new ArrayList<Integer>(); random = new Random(); } public synchronized void produce() throws InterruptedException { if (list.size() == 0) {//这个对象的wait set里面有consumer线程 notifyAll();//notify() can work } System.out.print(Thread.currentThread().getName() + "\t\t\t" + list + "\t\t\t"); int size = list.size(); for (int i = 0; i < 10 - size; i++) { list.add(random.nextInt(1000)); } System.out.println(list); } /** * 这里有个问题,当producer产生的食物总量,比consumer的数量少的时候 * 则有可能发生发生一场状况 * @throws InterruptedException */ public synchronized void consume() throws InterruptedException { while(list.size() == 0) {//测试竞争条件 //注意这里必须用while,当一个consumer线程醒来的时候,还必须 //继续检查list是否为空,如果是则继续等待。否则就继续下一条指 //令执行 wait(); } System.out.print(Thread.currentThread().getName() + "\t\t\t" + list + "\t\t\t"); list.remove(0); System.out.println(list); } public static void main(String[] args) throws Exception { ProducerConsumerV7 pc = new ProducerConsumerV7(); for (int i = 0; i < 50; i++) { Thread thread = new Thread(pc.new Consumer()); thread.setName("Consumer " + i); thread.start(); } for (int i = 0; i <2; i++) { Thread thread = new Thread(pc.new Producer()); thread.setName("Producer " + i); thread.start(); } } public class Producer implements Runnable { @Override public void run() { while (true) { try { produce(); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); return; } } } } public class Consumer implements Runnable { @Override public void run() { while (true) { try { consume(); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); return; } } } } }
- Object.wait方法和Thread.sleep方法的不同之处在于,Object.wait方法需要获得对象的synchronized 锁――即对象的Monitor。Object.wait方法执行时,线程就会进入对象的wait set中,synchronized锁就会被自动释放掉(由JVM内部完成这个操作)。一旦接受到通知,线程从Object.wait中返回之前,必须首先获得synchronized锁。所以Java线程Wait-And-Notification机制,必须有赖于synchronized锁。所以上面表格中所列的几个方法必须在synchronized方法或synchronized块中执行。
- 当线程从wait方法中唤醒以后,这时已经获取了synchronized锁,就会接着wait指令后面的指令继续执行。
- notify() 和 notifyAll()方法类似,区别在于notifyAll会“唤醒”处于wait set中的所有的线程,只是这些线程仍需竞争synchronized lock。只有获取了synchronized lock的线程才能继续执行。(似乎这些线程的状态发生了迁移,从WAITING态迁移到了BLOCKED状态),而notify只是唤醒一个线程,但具体是哪个线程由JVM决定,我们自己对此无能为力。
