- 浏览: 562944 次
- 性别:
- 来自: 北京
最新评论
-
anqinghaozi:
请问 你这weblogic jms 如何部署到tomcat上去 ...
Spring+weblogic接收JMS消息 -
cjliang:
1456746014@qq.com 我也要
jqGrid学习 ----------------- 第一个实例 -
Moy_Yeung:
Moy_Yeung 写道 您好 我最近在学习使用这个插件 能麻 ...
jqGrid学习 ----------------- 第一个实例 -
Moy_Yeung:
您好 我最近在学习使用这个插件 能麻烦博主发份demo吗 谢 ...
jqGrid学习 ----------------- 第一个实例 -
十叶木竹:
最近在自学这个插件,麻烦博主,发一份源码,以供学习,谢谢博主啦 ...
jqGrid学习 ----------------- 第一个实例
一篇介绍java线程的文章,个人感觉说的非常的好。
地址:http://www.liaoxuefeng.com/it-d225a33ad6e947cea997cc02b1826e7f-1
Java多线程设计模式
线程的创建和启动
Java语言已经内置了多线程支持,所有实现Runnable接口的类都可被启动一个新线程,新线程会执行该实例的run()方法,当run()方法执行完毕后,线程就结束了。一旦一个线程执行完毕,这个实例就不能再重新启动,只能重新生成一个新实例,再启动一个新线程。
Thread类是实现了Runnable接口的一个实例,它代表一个线程的实例,并且,启动线程的唯一方法就是通过Thread类的start()实例方法:
view sourceprint?
1.Thread t = new Thread();
2.t.start();
start()方法是一个native方法,它将启动一个新线程,并执行run()方法。Thread类默认的run()方法什么也不做就退出了。注意:直接调用run()方法并不会启动一个新线程,它和调用一个普通的java方法没有什么区别。
因此,有两个方法可以实现自己的线程:
方法1:自己的类extend Thread,并复写run()方法,就可以启动新线程并执行自己定义的run()方法。例如:
view sourceprint?
1.public class MyThread extends Thread {
2. public run() {
3. System.out.println("MyThread.run()");
4. }
5.}
在合适的地方启动线程:new MyThread().start();
方法2:如果自己的类已经extends另一个类,就无法直接extends Thread,此时,必须实现一个Runnable接口:
view sourceprint?
1.public class MyThread extends OtherClass implements Runnable {
2. public run() {
3. System.out.println("MyThread.run()");
4. }
5.}
为了启动MyThread,需要首先实例化一个Thread,并传入自己的MyThread实例:
view sourceprint?
1.MyThread myt = new MyThread();
2.Thread t = new Thread(myt);
3.t.start();
事实上,当传入一个Runnable target参数给Thread后,Thread的run()方法就会调用target.run(),参考JDK源代码:
view sourceprint?
1.public void run() {
2. if (target != null) {
3. target.run();
4. }
5.}
线程还有一些Name, ThreadGroup, isDaemon等设置,由于和线程设计模式关联很少,这里就不多说了。
线程的同步
由于同一进程内的多个线程共享内存空间,在Java中,就是共享实例,当多个线程试图同时修改某个实例的内容时,就会造成冲突,因此,线程必须实现共享互斥,使多线程同步。
最简单的同步是将一个方法标记为synchronized,对同一个实例来说,任一时刻只能有一个synchronized方法在执行。当一个方法正在执行某个synchronized方法时,其他线程如果想要执行这个实例的任意一个synchronized方法,都必须等待当前执行 synchronized方法的线程退出此方法后,才能依次执行。
但是,非synchronized方法不受影响,不管当前有没有执行synchronized方法,非synchronized方法都可以被多个线程同时执行。
此外,必须注意,只有同一实例的synchronized方法同一时间只能被一个线程执行,不同实例的synchronized方法是可以并发的。例如,class A定义了synchronized方法sync(),则不同实例a1.sync()和a2.sync()可以同时由两个线程来执行。
Java锁机制
多线程同步的实现最终依赖锁机制。我们可以想象某一共享资源是一间屋子,每个人都是一个线程。当A希望进入房间时,他必须获得门锁,一旦A获得门锁,他进去后就立刻将门锁上,于是B,C,D...就不得不在门外等待,直到A释放锁出来后,B,C,D...中的某一人抢到了该锁(具体抢法依赖于 JVM的实现,可以先到先得,也可以随机挑选),然后进屋又将门锁上。这样,任一时刻最多有一人在屋内(使用共享资源)。
Java语言规范内置了对多线程的支持。对于Java程序来说,每一个对象实例都有一把“锁”,一旦某个线程获得了该锁,别的线程如果希望获得该锁,只能等待这个线程释放锁之后。获得锁的方法只有一个,就是synchronized关键字。例如:
view sourceprint?
1.public class SharedResource {
2. private int count = 0;
3.
4. public int getCount() { return count; }
5.
6. public synchronized void setCount(int count) { this.count = count; }
7.}
注意,如果将synchronized关键字标记在方法上,例如上面的:
view sourceprint?
1.public synchronized void setCount(int count) { ... }
那么,锁住的是哪个对象呢?答案是this对象,因此,以上方法事实上完全等同于下面的写法:
view sourceprint?
1.public void setCount(int count) {
2. synchronized(this) { // 在此获得this锁
3. this.count = count;
4. } // 在此释放this锁
5.}
synchronized {}括号内的部分表示需要同步的代码段,该区域为“危险区域”,如果两个以上的线程同时执行,会引发冲突,因此,要更改SharedResource的内部状态,必须先获得SharedResource实例的锁。
退出synchronized块时,线程拥有的锁自动释放,于是,别的线程又可以获取该锁了。
为了提高性能,不一定要锁定this,例如,SharedResource有两个独立变化的变量:
view sourceprint?
1.public class SharedResouce {
2. private int a = 0;
3. private int b = 0;
4.
5. public synchronized void setA(int a) { this.a = a; }
6. public synchronized void setB(int b) { this.b = b; }
7.}
若同步整个方法,则setA()的时候无法setB(),setB()时无法setA()。为了提高性能,可以使用不同对象的锁:
view sourceprint?
01.public class SharedResouce {
02. private int a = 0;
03. private int b = 0;
04. private Object sync_a = new Object();
05. private Object sync_b = new Object();
06.
07. public void setA(int a) {
08. synchronized(sync_a) {
09. this.a = a;
10. }
11. }
12.
13. public synchronized void setB(int b) {
14. synchronized(sync_b) {
15. this.b = b;
16. }
17. }
18.}
如果将synchronized关键字标记在静态方法上,由于静态方法不可能访问this实例,那么,锁住的是哪个对象呢?答案是当前类的Class对象,原因是每个对象的Class实例是唯一且不可变的。比如:
view sourceprint?
1.public synchronized static void sync() { ... }
事实上完全等同于下面的写法:
view sourceprint?
1.public static void sync() {
2. synchronized(SharedResource.class) {
3. ...
4. }
5.}
wait/notify机制
通常,多线程之间需要协调工作。例如,浏览器的一个显示图片的线程displayThread想要执行显示图片的任务,必须等待下载线程 downloadThread将该图片下载完毕。如果图片还没有下载完,displayThread可以暂停,当downloadThread完成了任务后,再通知displayThread“图片准备完毕,可以显示了”,这时,displayThread继续执行。
以上逻辑简单的说就是:如果条件不满足,则等待。当条件满足时,等待该条件的线程将被唤醒。在Java中,这个机制的实现依赖于wait/notify。等待机制与锁机制是密切关联的。例如:
view sourceprint?
1.synchronized(obj) {
2. while(!condition) {
3. obj.wait();
4. }
5. obj.doSomething();
6.}
当线程A获得了obj锁后,发现条件condition不满足,无法继续下一处理,于是线程A就wait()。
在另一线程B中,如果B更改了某些条件,使得线程A的condition条件满足了,就可以唤醒线程A:
view sourceprint?
1.synchronized(obj) {
2. condition = true;
3. obj.notify();
4.}
需要注意的概念是:
# 调用obj的wait(), notify()方法前,必须获得obj锁,也就是必须写在synchronized(obj) {...} 代码段内。
# 调用obj.wait()后,线程A就释放了obj的锁,否则线程B无法获得obj锁,也就无法在synchronized(obj) {...} 代码段内唤醒A。
# 当obj.wait()方法返回后,线程A需要再次获得obj锁,才能继续执行。
# 如果A1,A2,A3都在obj.wait(),则B调用obj.notify()只能唤醒A1,A2,A3中的一个(具体哪一个由JVM决定)。
# obj.notifyAll()则能全部唤醒A1,A2,A3,但是要继续执行obj.wait()的下一条语句,必须获得obj锁,因此,A1,A2,A3只有一个有机会获得锁继续执行,例如A1,其余的需要等待A1释放obj锁之后才能继续执行。
# 当B调用obj.notify/notifyAll的时候,B正持有obj锁,因此,A1,A2,A3虽被唤醒,但是仍无法获得obj锁。直到B退出synchronized块,释放obj锁后,A1,A2,A3中的一个才有机会获得锁继续执行。
wait/sleep的区别
Thread还有一个sleep()静态方法,它也能使线程暂停一段时间。sleep与wait的不同点是:sleep并不释放锁,并且sleep的暂停和wait暂停是不一样的。obj.wait会使线程进入obj对象的等待集合中并等待唤醒。
但是wait()和sleep()都可以通过interrupt()方法打断线程的暂停状态,从而使线程立刻抛出InterruptedException。
如果线程A希望立即结束线程B,则可以对线程B对应的Thread实例调用interrupt方法。如果此刻线程B正在 wait/sleep/join,则线程B会立刻抛出InterruptedException,在catch() {} 中直接return即可安全地结束线程。
需要注意的是,InterruptedException是线程自己从内部抛出的,并不是interrupt()方法抛出的。对某一线程调用 interrupt()时,如果该线程正在执行普通的代码,那么该线程根本就不会抛出InterruptedException。但是,一旦该线程进入到 wait()/sleep()/join()后,就会立刻抛出InterruptedException。
Worker Pattern
前面谈了多线程应用程序能极大地改善用户相应。例如对于一个Web应用程序,每当一个用户请求服务器连接时,服务器就可以启动一个新线程为用户服务。
然而,创建和销毁线程本身就有一定的开销,如果频繁创建和销毁线程,CPU和内存开销就不可忽略,垃圾收集器还必须负担更多的工作。因此,线程池就是为了避免频繁创建和销毁线程。
每当服务器接受了一个新的请求后,服务器就从线程池中挑选一个等待的线程并执行请求处理。处理完毕后,线程并不结束,而是转为阻塞状态再次被放入线程池中。这样就避免了频繁创建和销毁线程。
Worker Pattern实现了类似线程池的功能。首先定义Task接口:
view sourceprint?
1.public interface Task {
2. void execute();
3.}
线程将负责执行execute()方法。注意到任务是由子类通过实现execute()方法实现的,线程本身并不知道自己执行的任务。它只负责运行一个耗时的execute()方法。
具体任务由子类实现,我们定义了一个CalculateTask和一个TimerTask:
view sourceprint?
01.// CalculateTask.java
02.public class CalculateTask implements Task {
03. private static int count = 0;
04. private int num = count;
05. public CalculateTask() {
06. count++;
07. }
08. public void execute() {
09. System.out.println("[CalculateTask " + num + "] start...");
10. try {
11. Thread.sleep(3000);
12. }
13. catch(InterruptedException ie) {}
14. System.out.println("[CalculateTask " + num + "] done.");
15. }
16.}
17.
18.// TimerTask.java
19.public class TimerTask implements Task {
20. private static int count = 0;
21. private int num = count;
22. public TimerTask() {
23. count++;
24. }
25. public void execute() {
26. System.out.println("[TimerTask " + num + "] start...");
27. try {
28. Thread.sleep(2000);
29. }
30. catch(InterruptedException ie) {}
31. System.out.println("[TimerTask " + num + "] done.");
32. }
33.}
以上任务均简单的sleep若干秒。
TaskQueue实现了一个队列,客户端可以将请求放入队列,服务器线程可以从队列中取出任务:
view sourceprint?
01.import java.util.*;
02.
03.public class TaskQueue {
04. private List queue = new LinkedList();
05. public synchronized Task getTask() {
06. while(queue.size()==0) {
07. try {
08. this.wait();
09. }
10. catch(InterruptedException ie) {
11. return null;
12. }
13. }
14. return (Task)queue.remove(0);
15. }
16. public synchronized void putTask(Task task) {
17. queue.add(task);
18. this.notifyAll();
19. }
20.}
终于到了真正的WorkerThread,这是真正执行任务的服务器线程:
view sourceprint?
01.public class WorkerThread extends Thread {
02. private static int count = 0;
03. private boolean busy = false;
04. private boolean stop = false;
05. private TaskQueue queue;
06. public WorkerThread(ThreadGroup group, TaskQueue queue) {
07. super(group, "worker-" + count);
08. count++;
09. this.queue = queue;
10. }
11. public void shutdown() {
12. stop = true;
13. this.interrupt();
14. try {
15. this.join();
16. }
17. catch(InterruptedException ie) {}
18. }
19. public boolean isIdle() {
20. return !busy;
21. }
22. public void run() {
23. System.out.println(getName() + " start.");
24. while(!stop) {
25. Task task = queue.getTask();
26. if(task!=null) {
27. busy = true;
28. task.execute();
29. busy = false;
30. }
31. }
32. System.out.println(getName() + " end.");
33. }
34.}
前面已经讲过,queue.getTask()是一个阻塞方法,服务器线程可能在此wait()一段时间。此外,WorkerThread还有一个shutdown方法,用于安全结束线程。
最后是ThreadPool,负责管理所有的服务器线程,还可以动态增加和减少线程数:
view sourceprint?
01.import java.util.*;
02.
03.public class ThreadPool extends ThreadGroup {
04. private List threads = new LinkedList();
05. private TaskQueue queue;
06. public ThreadPool(TaskQueue queue) {
07. super("Thread-Pool");
08. this.queue = queue;
09. }
10. public synchronized void addWorkerThread() {
11. Thread t = new WorkerThread(this, queue);
12. threads.add(t);
13. t.start();
14. }
15. public synchronized void removeWorkerThread() {
16. if(threads.size()>0) {
17. WorkerThread t = (WorkerThread)threads.remove(0);
18. t.shutdown();
19. }
20. }
21. public synchronized void currentStatus() {
22. System.out.println("-----------------------------------------------");
23. System.out.println("Thread count = " + threads.size());
24. Iterator it = threads.iterator();
25. while(it.hasNext()) {
26. WorkerThread t = (WorkerThread)it.next();
27. System.out.println(t.getName() + ": " + (t.isIdle() ? "idle" : "busy"));
28. }
29. System.out.println("-----------------------------------------------");
30. }
31.}
currentStatus()方法是为了方便调试,打印出所有线程的当前状态。
最后,Main负责完成main()方法:
view sourceprint?
01.public class Main {
02. public static void main(String[] args) {
03. TaskQueue queue = new TaskQueue();
04. ThreadPool pool = new ThreadPool(queue);
05. for(int i=0; i<10; i++) {
06. queue.putTask(new CalculateTask());
07. queue.putTask(new TimerTask());
08. }
09. pool.addWorkerThread();
10. pool.addWorkerThread();
11. doSleep(8000);
12. pool.currentStatus();
13. pool.addWorkerThread();
14. pool.addWorkerThread();
15. pool.addWorkerThread();
16. pool.addWorkerThread();
17. pool.addWorkerThread();
18. doSleep(5000);
19. pool.currentStatus();
20. }
21. private static void doSleep(long ms) {
22. try {
23. Thread.sleep(ms);
24. }
25. catch(InterruptedException ie) {}
26. }
27.}
main()一开始放入了20个Task,然后动态添加了一些服务线程,并定期打印线程状态,运行结果如下:
worker-0 start.
[CalculateTask 0] start...
worker-1 start.
[TimerTask 0] start...
[TimerTask 0] done.
[CalculateTask 1] start...
[CalculateTask 0] done.
[TimerTask 1] start...
[CalculateTask 1] done.
[CalculateTask 2] start...
[TimerTask 1] done.
[TimerTask 2] start...
[TimerTask 2] done.
[CalculateTask 3] start...
-----------------------------------------------
Thread count = 2
worker-0: busy
worker-1: busy
-----------------------------------------------
[CalculateTask 2] done.
[TimerTask 3] start...
worker-2 start.
[CalculateTask 4] start...
worker-3 start.
[TimerTask 4] start...
worker-4 start.
[CalculateTask 5] start...
worker-5 start.
[TimerTask 5] start...
worker-6 start.
[CalculateTask 6] start...
[CalculateTask 3] done.
[TimerTask 6] start...
[TimerTask 3] done.
[CalculateTask 7] start...
[TimerTask 4] done.
[TimerTask 7] start...
[TimerTask 5] done.
[CalculateTask 8] start...
[CalculateTask 4] done.
[TimerTask 8] start...
[CalculateTask 5] done.
[CalculateTask 9] start...
[CalculateTask 6] done.
[TimerTask 9] start...
[TimerTask 6] done.
[TimerTask 7] done.
-----------------------------------------------
Thread count = 7
worker-0: idle
worker-1: busy
worker-2: busy
worker-3: idle
worker-4: busy
worker-5: busy
worker-6: busy
-----------------------------------------------
[CalculateTask 7] done.
[CalculateTask 8] done.
[TimerTask 8] done.
[TimerTask 9] done.
[CalculateTask 9] done.
仔细观察:一开始只有两个服务器线程,因此线程状态都是忙,后来线程数增多,7个线程中的两个状态变成idle,说明处于wait()状态。
思考:本例的线程调度算法其实根本没有,因为这个应用是围绕TaskQueue设计的,不是以Thread Pool为中心设计的。因此,Task调度取决于TaskQueue的getTask()方法,你可以改进这个方法,例如使用优先队列,使优先级高的任务先被执行。
如果所有的服务器线程都处于busy状态,则说明任务繁忙,TaskQueue的队列越来越长,最终会导致服务器内存耗尽。因此,可以限制 TaskQueue的等待任务数,超过最大长度就拒绝处理。许多Web服务器在用户请求繁忙时就会拒绝用户:HTTP 503 SERVICE UNAVAILABLE
从JDK 5开始,java.util.concurrent包已经内置了Worker线程模式(即java.util.concurrent.Executors),无需我们手动编写上述代码。不过,理解Worker模式的原理非常重要。
ReadWriteLock模式
多线程读写同一个对象的数据是很普遍的,通常,要避免读写冲突,必须保证任何时候仅有一个线程在写入,有线程正在读取的时候,写入操作就必须等待。简单说,就是要避免“写-写”冲突和“读-写”冲突。但是同时读是允许的,因为“读-读”不冲突,而且很安全。
要实现以上的ReadWriteLock,简单的使用synchronized就不行,我们必须自己设计一个ReadWriteLock类,在读之前,必须先获得“读锁”,写之前,必须先获得“写锁”。举例说明:
DataHandler对象保存了一个可读写的char[]数组:
view sourceprint?
01.public class DataHandler {
02. // store data:
03. private char[] buffer = "AAAAAAAAAA".toCharArray();
04.
05. private char[] doRead() {
06. char[] ret = new char[buffer.length];
07. for(int i=0; i<buffer.length; i++) {
08. ret[i] = buffer[i];
09. sleep(3);
10. }
11. return ret;
12. }
13.
14. private void doWrite(char[] data) {
15. if(data!=null) {
16. buffer = new char[data.length];
17. for(int i=0; i<buffer.length; i++) {
18. buffer[i] = data[i];
19. sleep(10);
20. }
21. }
22. }
23.
24. private void sleep(int ms) {
25. try {
26. Thread.sleep(ms);
27. }
28. catch(InterruptedException ie) {}
29. }
30.}
doRead()和doWrite()方法是非线程安全的读写方法。为了演示,加入了sleep(),并设置读的速度大约是写的3倍,这符合通常的情况。
为了让多线程能安全读写,我们设计了一个ReadWriteLock:
view sourceprint?
01.public class ReadWriteLock {
02. private int readingThreads = 0;
03. private int writingThreads = 0;
04. private int waitingThreads = 0; // waiting for write
05. private boolean preferWrite = true;
06.
07. public synchronized void readLock() throws InterruptedException {
08. while(writingThreads>0 || (preferWrite && waitingThreads>0))
09. this.wait();
10. readingThreads++;
11. }
12.
13. public synchronized void readUnlock() {
14. readingThreads--;
15. preferWrite = true;
16. notifyAll();
17. }
18.
19. public synchronized void writeLock() throws InterruptedException {
20. waitingThreads++;
21. try {
22. while(readingThreads>0 || writingThreads>0)
23. this.wait();
24. }
25. finally {
26. waitingThreads--;
27. }
28. writingThreads++;
29. }
30.
31. public synchronized void writeUnlock() {
32. writingThreads--;
33. preferWrite = false;
34. notifyAll();
35. }
36.}
readLock()用于获得读锁,readUnlock()释放读锁,writeLock()和writeUnlock()一样。由于锁用完必须释放,因此,必须保证lock和unlock匹配。我们修改DataHandler,加入ReadWriteLock:
view sourceprint?
01.public class DataHandler {
02. // store data:
03. private char[] buffer = "AAAAAAAAAA".toCharArray();
04. // lock:
05. private ReadWriteLock lock = new ReadWriteLock();
06.
07. public char[] read(String name) throws InterruptedException {
08. System.out.println(name + " waiting for read...");
09. lock.readLock();
10. try {
11. char[] data = doRead();
12. System.out.println(name + " reads data: " + new String(data));
13. return data;
14. }
15. finally {
16. lock.readUnlock();
17. }
18. }
19.
20. public void write(String name, char[] data) throws InterruptedException {
21. System.out.println(name + " waiting for write...");
22. lock.writeLock();
23. try {
24. System.out.println(name + " wrote data: " + new String(data));
25. doWrite(data);
26. }
27. finally {
28. lock.writeUnlock();
29. }
30. }
31.
32. private char[] doRead() {
33. char[] ret = new char[buffer.length];
34. for(int i=0; i<buffer.length; i++) {
35. ret[i] = buffer[i];
36. sleep(3);
37. }
38. return ret;
39. }
40. private void doWrite(char[] data) {
41. if(data!=null) {
42. buffer = new char[data.length];
43. for(int i=0; i<buffer.length; i++) {
44. buffer[i] = data[i];
45. sleep(10);
46. }
47. }
48. }
49. private void sleep(int ms) {
50. try {
51. Thread.sleep(ms);
52. }
53. catch(InterruptedException ie) {}
54. }
55.}
public方法read()和write()完全封装了底层的ReadWriteLock,因此,多线程可以安全地调用这两个方法:
view sourceprint?
01.// ReadingThread不断读取数据:
02.public class ReadingThread extends Thread {
03. private DataHandler handler;
04. public ReadingThread(DataHandler handler) {
05. this.handler = handler;
06. }
07. public void run() {
08. for(;;) {
09. try {
10. char[] data = handler.read(getName());
11. Thread.sleep((long)(Math.random()*1000+100));
12. }
13. catch(InterruptedException ie) {
14. break;
15. }
16. }
17. }
18.}
19.
20.// WritingThread不断写入数据,每次写入的都是10个相同的字符:
21.public class WritingThread extends Thread {
22. private DataHandler handler;
23. public WritingThread(DataHandler handler) {
24. this.handler = handler;
25. }
26. public void run() {
27. char[] data = new char[10];
28. for(;;) {
29. try {
30. fill(data);
31. handler.write(getName(), data);
32. Thread.sleep((long)(Math.random()*1000+100));
33. }
34. catch(InterruptedException ie) {
35. break;
36. }
37. }
38. }
39. // 产生一个A-Z随机字符,填入char[10]:
40. private void fill(char[] data) {
41. char c = (char)(Math.random()*26+'A');
42. for(int i=0; i<data.length; i++)
43. data[i] = c;
44. }
45.}
最后Main负责启动这些线程:
view sourceprint?
01.public class Main {
02. public static void main(String[] args) {
03. DataHandler handler = new DataHandler();
04. Thread[] ts = new Thread[] {
05. new ReadingThread(handler),
06. new ReadingThread(handler),
07. new ReadingThread(handler),
08. new ReadingThread(handler),
09. new ReadingThread(handler),
10. new WritingThread(handler),
11. new WritingThread(handler)
12. };
13. for(int i=0; i<ts.length; i++) {
14. ts[i].start();
15. }
16. }
17.}
我们启动了5个读线程和2个写线程,运行结果如下:
Thread-0 waiting for read...
Thread-1 waiting for read...
Thread-2 waiting for read...
Thread-3 waiting for read...
Thread-4 waiting for read...
Thread-5 waiting for write...
Thread-6 waiting for write...
Thread-4 reads data: AAAAAAAAAA
Thread-3 reads data: AAAAAAAAAA
Thread-2 reads data: AAAAAAAAAA
Thread-1 reads data: AAAAAAAAAA
Thread-0 reads data: AAAAAAAAAA
Thread-5 wrote data: EEEEEEEEEE
Thread-6 wrote data: MMMMMMMMMM
Thread-1 waiting for read...
Thread-4 waiting for read...
Thread-1 reads data: MMMMMMMMMM
Thread-4 reads data: MMMMMMMMMM
Thread-2 waiting for read...
Thread-2 reads data: MMMMMMMMMM
Thread-0 waiting for read...
Thread-0 reads data: MMMMMMMMMM
Thread-4 waiting for read...
Thread-4 reads data: MMMMMMMMMM
Thread-2 waiting for read...
Thread-5 waiting for write...
Thread-2 reads data: MMMMMMMMMM
Thread-5 wrote data: GGGGGGGGGG
Thread-6 waiting for write...
Thread-6 wrote data: AAAAAAAAAA
Thread-3 waiting for read...
Thread-3 reads data: AAAAAAAAAA
......
可以看到,每次读/写都是完整的原子操作,因为我们每次写入的都是10个相同字符。并且,每次读出的都是最近一次写入的内容。
如果去掉ReadWriteLock:
view sourceprint?
01.public class DataHandler {
02.
03. // store data:
04. private char[] buffer = "AAAAAAAAAA".toCharArray();
05.
06. public char[] read(String name) throws InterruptedException {
07. char[] data = doRead();
08. System.out.println(name + " reads data: " + new String(data));
09. return data;
10. }
11. public void write(String name, char[] data) throws InterruptedException {
12. System.out.println(name + " wrote data: " + new String(data));
13. doWrite(data);
14. }
15.
16. private char[] doRead() {
17. char[] ret = new char[10];
18. for(int i=0; i<10; i++) {
19. ret[i] = buffer[i];
20. sleep(3);
21. }
22. return ret;
23. }
24. private void doWrite(char[] data) {
25. for(int i=0; i<10; i++) {
26. buffer[i] = data[i];
27. sleep(10);
28. }
29. }
30. private void sleep(int ms) {
31. try {
32. Thread.sleep(ms);
33. }
34. catch(InterruptedException ie) {}
35. }
36.}
运行结果如下:
Thread-5 wrote data: AAAAAAAAAA
Thread-6 wrote data: MMMMMMMMMM
Thread-0 reads data: AAAAAAAAAA
Thread-1 reads data: AAAAAAAAAA
Thread-2 reads data: AAAAAAAAAA
Thread-3 reads data: AAAAAAAAAA
Thread-4 reads data: AAAAAAAAAA
Thread-2 reads data: MAAAAAAAAA
Thread-3 reads data: MAAAAAAAAA
Thread-5 wrote data: CCCCCCCCCC
Thread-1 reads data: MAAAAAAAAA
Thread-0 reads data: MAAAAAAAAA
Thread-4 reads data: MAAAAAAAAA
Thread-6 wrote data: EEEEEEEEEE
Thread-3 reads data: EEEEECCCCC
Thread-4 reads data: EEEEEEEEEC
Thread-1 reads data: EEEEEEEEEE
从最后4行可以看到在Thread-6写入EEEEEEEEEE的过程中,3个线程读取的内容是不同的。
思考
java的synchronized提供了最底层的物理锁,要在synchronized的基础上,实现自己的逻辑锁,就必须仔细设计ReadWriteLock。
Q: lock.readLock()为什么不放入try { }内?
A: 因为readLock()会抛出InterruptedException,导致readingThreads++不执行,而readUnlock()在 finally { }中,导致readingThreads--执行,从而使readingThread状态出错。writeLock()也是类似的。
Q: preferWrite有用吗?
A: 如果去掉preferWrite,线程安全不受影响。但是,如果读取线程很多,上一个线程还没有读取完,下一个线程又开始读了,就导致写入线程长时间无法获得writeLock;如果写入线程等待的很多,一个接一个写,也会导致读取线程长时间无法获得readLock。preferWrite的作用是让读 /写交替执行,避免由于读线程繁忙导致写无法进行和由于写线程繁忙导致读无法进行。
Q: notifyAll()换成notify()行不行?
A: 不可以。由于preferWrite的存在,如果一个线程刚读取完毕,此时preferWrite=true,再notify(),若恰好唤醒的是一个读线程,则while(writingThreads>0 || (preferWrite && waitingThreads>0))可能为true导致该读线程继续等待,而等待写入的线程也处于wait()中,结果所有线程都处于 wait ()状态,谁也无法唤醒谁。因此,notifyAll()比notify()要来得安全。程序验证notify()带来的死锁:
Thread-0 waiting for read...
Thread-1 waiting for read...
Thread-2 waiting for read...
Thread-3 waiting for read...
Thread-4 waiting for read...
Thread-5 waiting for write...
Thread-6 waiting for write...
Thread-0 reads data: AAAAAAAAAA
Thread-4 reads data: AAAAAAAAAA
Thread-3 reads data: AAAAAAAAAA
Thread-2 reads data: AAAAAAAAAA
Thread-1 reads data: AAAAAAAAAA
Thread-5 wrote data: CCCCCCCCCC
Thread-2 waiting for read...
Thread-1 waiting for read...
Thread-3 waiting for read...
Thread-0 waiting for read...
Thread-4 waiting for read...
Thread-6 wrote data: LLLLLLLLLL
Thread-5 waiting for write...
Thread-6 waiting for write...
Thread-2 reads data: LLLLLLLLLL
Thread-2 waiting for read...
(运行到此不动了)
注意到这种死锁是由于所有线程都在等待别的线程唤醒自己,结果都无法醒过来。这和两个线程希望获得对方已有的锁造成死锁不同。因此多线程设计的难度远远高于单线程应用。
从JDK 5开始,java.util.concurrent包就已经包含了ReadWriteLock,使用更简单,无需我们自行实现上述代码。但是,理解ReadWriteLock的原理仍非常重要。
地址:http://www.liaoxuefeng.com/it-d225a33ad6e947cea997cc02b1826e7f-1
Java多线程设计模式
线程的创建和启动
Java语言已经内置了多线程支持,所有实现Runnable接口的类都可被启动一个新线程,新线程会执行该实例的run()方法,当run()方法执行完毕后,线程就结束了。一旦一个线程执行完毕,这个实例就不能再重新启动,只能重新生成一个新实例,再启动一个新线程。
Thread类是实现了Runnable接口的一个实例,它代表一个线程的实例,并且,启动线程的唯一方法就是通过Thread类的start()实例方法:
view sourceprint?
1.Thread t = new Thread();
2.t.start();
start()方法是一个native方法,它将启动一个新线程,并执行run()方法。Thread类默认的run()方法什么也不做就退出了。注意:直接调用run()方法并不会启动一个新线程,它和调用一个普通的java方法没有什么区别。
因此,有两个方法可以实现自己的线程:
方法1:自己的类extend Thread,并复写run()方法,就可以启动新线程并执行自己定义的run()方法。例如:
view sourceprint?
1.public class MyThread extends Thread {
2. public run() {
3. System.out.println("MyThread.run()");
4. }
5.}
在合适的地方启动线程:new MyThread().start();
方法2:如果自己的类已经extends另一个类,就无法直接extends Thread,此时,必须实现一个Runnable接口:
view sourceprint?
1.public class MyThread extends OtherClass implements Runnable {
2. public run() {
3. System.out.println("MyThread.run()");
4. }
5.}
为了启动MyThread,需要首先实例化一个Thread,并传入自己的MyThread实例:
view sourceprint?
1.MyThread myt = new MyThread();
2.Thread t = new Thread(myt);
3.t.start();
事实上,当传入一个Runnable target参数给Thread后,Thread的run()方法就会调用target.run(),参考JDK源代码:
view sourceprint?
1.public void run() {
2. if (target != null) {
3. target.run();
4. }
5.}
线程还有一些Name, ThreadGroup, isDaemon等设置,由于和线程设计模式关联很少,这里就不多说了。
线程的同步
由于同一进程内的多个线程共享内存空间,在Java中,就是共享实例,当多个线程试图同时修改某个实例的内容时,就会造成冲突,因此,线程必须实现共享互斥,使多线程同步。
最简单的同步是将一个方法标记为synchronized,对同一个实例来说,任一时刻只能有一个synchronized方法在执行。当一个方法正在执行某个synchronized方法时,其他线程如果想要执行这个实例的任意一个synchronized方法,都必须等待当前执行 synchronized方法的线程退出此方法后,才能依次执行。
但是,非synchronized方法不受影响,不管当前有没有执行synchronized方法,非synchronized方法都可以被多个线程同时执行。
此外,必须注意,只有同一实例的synchronized方法同一时间只能被一个线程执行,不同实例的synchronized方法是可以并发的。例如,class A定义了synchronized方法sync(),则不同实例a1.sync()和a2.sync()可以同时由两个线程来执行。
Java锁机制
多线程同步的实现最终依赖锁机制。我们可以想象某一共享资源是一间屋子,每个人都是一个线程。当A希望进入房间时,他必须获得门锁,一旦A获得门锁,他进去后就立刻将门锁上,于是B,C,D...就不得不在门外等待,直到A释放锁出来后,B,C,D...中的某一人抢到了该锁(具体抢法依赖于 JVM的实现,可以先到先得,也可以随机挑选),然后进屋又将门锁上。这样,任一时刻最多有一人在屋内(使用共享资源)。
Java语言规范内置了对多线程的支持。对于Java程序来说,每一个对象实例都有一把“锁”,一旦某个线程获得了该锁,别的线程如果希望获得该锁,只能等待这个线程释放锁之后。获得锁的方法只有一个,就是synchronized关键字。例如:
view sourceprint?
1.public class SharedResource {
2. private int count = 0;
3.
4. public int getCount() { return count; }
5.
6. public synchronized void setCount(int count) { this.count = count; }
7.}
注意,如果将synchronized关键字标记在方法上,例如上面的:
view sourceprint?
1.public synchronized void setCount(int count) { ... }
那么,锁住的是哪个对象呢?答案是this对象,因此,以上方法事实上完全等同于下面的写法:
view sourceprint?
1.public void setCount(int count) {
2. synchronized(this) { // 在此获得this锁
3. this.count = count;
4. } // 在此释放this锁
5.}
synchronized {}括号内的部分表示需要同步的代码段,该区域为“危险区域”,如果两个以上的线程同时执行,会引发冲突,因此,要更改SharedResource的内部状态,必须先获得SharedResource实例的锁。
退出synchronized块时,线程拥有的锁自动释放,于是,别的线程又可以获取该锁了。
为了提高性能,不一定要锁定this,例如,SharedResource有两个独立变化的变量:
view sourceprint?
1.public class SharedResouce {
2. private int a = 0;
3. private int b = 0;
4.
5. public synchronized void setA(int a) { this.a = a; }
6. public synchronized void setB(int b) { this.b = b; }
7.}
若同步整个方法,则setA()的时候无法setB(),setB()时无法setA()。为了提高性能,可以使用不同对象的锁:
view sourceprint?
01.public class SharedResouce {
02. private int a = 0;
03. private int b = 0;
04. private Object sync_a = new Object();
05. private Object sync_b = new Object();
06.
07. public void setA(int a) {
08. synchronized(sync_a) {
09. this.a = a;
10. }
11. }
12.
13. public synchronized void setB(int b) {
14. synchronized(sync_b) {
15. this.b = b;
16. }
17. }
18.}
如果将synchronized关键字标记在静态方法上,由于静态方法不可能访问this实例,那么,锁住的是哪个对象呢?答案是当前类的Class对象,原因是每个对象的Class实例是唯一且不可变的。比如:
view sourceprint?
1.public synchronized static void sync() { ... }
事实上完全等同于下面的写法:
view sourceprint?
1.public static void sync() {
2. synchronized(SharedResource.class) {
3. ...
4. }
5.}
wait/notify机制
通常,多线程之间需要协调工作。例如,浏览器的一个显示图片的线程displayThread想要执行显示图片的任务,必须等待下载线程 downloadThread将该图片下载完毕。如果图片还没有下载完,displayThread可以暂停,当downloadThread完成了任务后,再通知displayThread“图片准备完毕,可以显示了”,这时,displayThread继续执行。
以上逻辑简单的说就是:如果条件不满足,则等待。当条件满足时,等待该条件的线程将被唤醒。在Java中,这个机制的实现依赖于wait/notify。等待机制与锁机制是密切关联的。例如:
view sourceprint?
1.synchronized(obj) {
2. while(!condition) {
3. obj.wait();
4. }
5. obj.doSomething();
6.}
当线程A获得了obj锁后,发现条件condition不满足,无法继续下一处理,于是线程A就wait()。
在另一线程B中,如果B更改了某些条件,使得线程A的condition条件满足了,就可以唤醒线程A:
view sourceprint?
1.synchronized(obj) {
2. condition = true;
3. obj.notify();
4.}
需要注意的概念是:
# 调用obj的wait(), notify()方法前,必须获得obj锁,也就是必须写在synchronized(obj) {...} 代码段内。
# 调用obj.wait()后,线程A就释放了obj的锁,否则线程B无法获得obj锁,也就无法在synchronized(obj) {...} 代码段内唤醒A。
# 当obj.wait()方法返回后,线程A需要再次获得obj锁,才能继续执行。
# 如果A1,A2,A3都在obj.wait(),则B调用obj.notify()只能唤醒A1,A2,A3中的一个(具体哪一个由JVM决定)。
# obj.notifyAll()则能全部唤醒A1,A2,A3,但是要继续执行obj.wait()的下一条语句,必须获得obj锁,因此,A1,A2,A3只有一个有机会获得锁继续执行,例如A1,其余的需要等待A1释放obj锁之后才能继续执行。
# 当B调用obj.notify/notifyAll的时候,B正持有obj锁,因此,A1,A2,A3虽被唤醒,但是仍无法获得obj锁。直到B退出synchronized块,释放obj锁后,A1,A2,A3中的一个才有机会获得锁继续执行。
wait/sleep的区别
Thread还有一个sleep()静态方法,它也能使线程暂停一段时间。sleep与wait的不同点是:sleep并不释放锁,并且sleep的暂停和wait暂停是不一样的。obj.wait会使线程进入obj对象的等待集合中并等待唤醒。
但是wait()和sleep()都可以通过interrupt()方法打断线程的暂停状态,从而使线程立刻抛出InterruptedException。
如果线程A希望立即结束线程B,则可以对线程B对应的Thread实例调用interrupt方法。如果此刻线程B正在 wait/sleep/join,则线程B会立刻抛出InterruptedException,在catch() {} 中直接return即可安全地结束线程。
需要注意的是,InterruptedException是线程自己从内部抛出的,并不是interrupt()方法抛出的。对某一线程调用 interrupt()时,如果该线程正在执行普通的代码,那么该线程根本就不会抛出InterruptedException。但是,一旦该线程进入到 wait()/sleep()/join()后,就会立刻抛出InterruptedException。
Worker Pattern
前面谈了多线程应用程序能极大地改善用户相应。例如对于一个Web应用程序,每当一个用户请求服务器连接时,服务器就可以启动一个新线程为用户服务。
然而,创建和销毁线程本身就有一定的开销,如果频繁创建和销毁线程,CPU和内存开销就不可忽略,垃圾收集器还必须负担更多的工作。因此,线程池就是为了避免频繁创建和销毁线程。
每当服务器接受了一个新的请求后,服务器就从线程池中挑选一个等待的线程并执行请求处理。处理完毕后,线程并不结束,而是转为阻塞状态再次被放入线程池中。这样就避免了频繁创建和销毁线程。
Worker Pattern实现了类似线程池的功能。首先定义Task接口:
view sourceprint?
1.public interface Task {
2. void execute();
3.}
线程将负责执行execute()方法。注意到任务是由子类通过实现execute()方法实现的,线程本身并不知道自己执行的任务。它只负责运行一个耗时的execute()方法。
具体任务由子类实现,我们定义了一个CalculateTask和一个TimerTask:
view sourceprint?
01.// CalculateTask.java
02.public class CalculateTask implements Task {
03. private static int count = 0;
04. private int num = count;
05. public CalculateTask() {
06. count++;
07. }
08. public void execute() {
09. System.out.println("[CalculateTask " + num + "] start...");
10. try {
11. Thread.sleep(3000);
12. }
13. catch(InterruptedException ie) {}
14. System.out.println("[CalculateTask " + num + "] done.");
15. }
16.}
17.
18.// TimerTask.java
19.public class TimerTask implements Task {
20. private static int count = 0;
21. private int num = count;
22. public TimerTask() {
23. count++;
24. }
25. public void execute() {
26. System.out.println("[TimerTask " + num + "] start...");
27. try {
28. Thread.sleep(2000);
29. }
30. catch(InterruptedException ie) {}
31. System.out.println("[TimerTask " + num + "] done.");
32. }
33.}
以上任务均简单的sleep若干秒。
TaskQueue实现了一个队列,客户端可以将请求放入队列,服务器线程可以从队列中取出任务:
view sourceprint?
01.import java.util.*;
02.
03.public class TaskQueue {
04. private List queue = new LinkedList();
05. public synchronized Task getTask() {
06. while(queue.size()==0) {
07. try {
08. this.wait();
09. }
10. catch(InterruptedException ie) {
11. return null;
12. }
13. }
14. return (Task)queue.remove(0);
15. }
16. public synchronized void putTask(Task task) {
17. queue.add(task);
18. this.notifyAll();
19. }
20.}
终于到了真正的WorkerThread,这是真正执行任务的服务器线程:
view sourceprint?
01.public class WorkerThread extends Thread {
02. private static int count = 0;
03. private boolean busy = false;
04. private boolean stop = false;
05. private TaskQueue queue;
06. public WorkerThread(ThreadGroup group, TaskQueue queue) {
07. super(group, "worker-" + count);
08. count++;
09. this.queue = queue;
10. }
11. public void shutdown() {
12. stop = true;
13. this.interrupt();
14. try {
15. this.join();
16. }
17. catch(InterruptedException ie) {}
18. }
19. public boolean isIdle() {
20. return !busy;
21. }
22. public void run() {
23. System.out.println(getName() + " start.");
24. while(!stop) {
25. Task task = queue.getTask();
26. if(task!=null) {
27. busy = true;
28. task.execute();
29. busy = false;
30. }
31. }
32. System.out.println(getName() + " end.");
33. }
34.}
前面已经讲过,queue.getTask()是一个阻塞方法,服务器线程可能在此wait()一段时间。此外,WorkerThread还有一个shutdown方法,用于安全结束线程。
最后是ThreadPool,负责管理所有的服务器线程,还可以动态增加和减少线程数:
view sourceprint?
01.import java.util.*;
02.
03.public class ThreadPool extends ThreadGroup {
04. private List threads = new LinkedList();
05. private TaskQueue queue;
06. public ThreadPool(TaskQueue queue) {
07. super("Thread-Pool");
08. this.queue = queue;
09. }
10. public synchronized void addWorkerThread() {
11. Thread t = new WorkerThread(this, queue);
12. threads.add(t);
13. t.start();
14. }
15. public synchronized void removeWorkerThread() {
16. if(threads.size()>0) {
17. WorkerThread t = (WorkerThread)threads.remove(0);
18. t.shutdown();
19. }
20. }
21. public synchronized void currentStatus() {
22. System.out.println("-----------------------------------------------");
23. System.out.println("Thread count = " + threads.size());
24. Iterator it = threads.iterator();
25. while(it.hasNext()) {
26. WorkerThread t = (WorkerThread)it.next();
27. System.out.println(t.getName() + ": " + (t.isIdle() ? "idle" : "busy"));
28. }
29. System.out.println("-----------------------------------------------");
30. }
31.}
currentStatus()方法是为了方便调试,打印出所有线程的当前状态。
最后,Main负责完成main()方法:
view sourceprint?
01.public class Main {
02. public static void main(String[] args) {
03. TaskQueue queue = new TaskQueue();
04. ThreadPool pool = new ThreadPool(queue);
05. for(int i=0; i<10; i++) {
06. queue.putTask(new CalculateTask());
07. queue.putTask(new TimerTask());
08. }
09. pool.addWorkerThread();
10. pool.addWorkerThread();
11. doSleep(8000);
12. pool.currentStatus();
13. pool.addWorkerThread();
14. pool.addWorkerThread();
15. pool.addWorkerThread();
16. pool.addWorkerThread();
17. pool.addWorkerThread();
18. doSleep(5000);
19. pool.currentStatus();
20. }
21. private static void doSleep(long ms) {
22. try {
23. Thread.sleep(ms);
24. }
25. catch(InterruptedException ie) {}
26. }
27.}
main()一开始放入了20个Task,然后动态添加了一些服务线程,并定期打印线程状态,运行结果如下:
worker-0 start.
[CalculateTask 0] start...
worker-1 start.
[TimerTask 0] start...
[TimerTask 0] done.
[CalculateTask 1] start...
[CalculateTask 0] done.
[TimerTask 1] start...
[CalculateTask 1] done.
[CalculateTask 2] start...
[TimerTask 1] done.
[TimerTask 2] start...
[TimerTask 2] done.
[CalculateTask 3] start...
-----------------------------------------------
Thread count = 2
worker-0: busy
worker-1: busy
-----------------------------------------------
[CalculateTask 2] done.
[TimerTask 3] start...
worker-2 start.
[CalculateTask 4] start...
worker-3 start.
[TimerTask 4] start...
worker-4 start.
[CalculateTask 5] start...
worker-5 start.
[TimerTask 5] start...
worker-6 start.
[CalculateTask 6] start...
[CalculateTask 3] done.
[TimerTask 6] start...
[TimerTask 3] done.
[CalculateTask 7] start...
[TimerTask 4] done.
[TimerTask 7] start...
[TimerTask 5] done.
[CalculateTask 8] start...
[CalculateTask 4] done.
[TimerTask 8] start...
[CalculateTask 5] done.
[CalculateTask 9] start...
[CalculateTask 6] done.
[TimerTask 9] start...
[TimerTask 6] done.
[TimerTask 7] done.
-----------------------------------------------
Thread count = 7
worker-0: idle
worker-1: busy
worker-2: busy
worker-3: idle
worker-4: busy
worker-5: busy
worker-6: busy
-----------------------------------------------
[CalculateTask 7] done.
[CalculateTask 8] done.
[TimerTask 8] done.
[TimerTask 9] done.
[CalculateTask 9] done.
仔细观察:一开始只有两个服务器线程,因此线程状态都是忙,后来线程数增多,7个线程中的两个状态变成idle,说明处于wait()状态。
思考:本例的线程调度算法其实根本没有,因为这个应用是围绕TaskQueue设计的,不是以Thread Pool为中心设计的。因此,Task调度取决于TaskQueue的getTask()方法,你可以改进这个方法,例如使用优先队列,使优先级高的任务先被执行。
如果所有的服务器线程都处于busy状态,则说明任务繁忙,TaskQueue的队列越来越长,最终会导致服务器内存耗尽。因此,可以限制 TaskQueue的等待任务数,超过最大长度就拒绝处理。许多Web服务器在用户请求繁忙时就会拒绝用户:HTTP 503 SERVICE UNAVAILABLE
从JDK 5开始,java.util.concurrent包已经内置了Worker线程模式(即java.util.concurrent.Executors),无需我们手动编写上述代码。不过,理解Worker模式的原理非常重要。
ReadWriteLock模式
多线程读写同一个对象的数据是很普遍的,通常,要避免读写冲突,必须保证任何时候仅有一个线程在写入,有线程正在读取的时候,写入操作就必须等待。简单说,就是要避免“写-写”冲突和“读-写”冲突。但是同时读是允许的,因为“读-读”不冲突,而且很安全。
要实现以上的ReadWriteLock,简单的使用synchronized就不行,我们必须自己设计一个ReadWriteLock类,在读之前,必须先获得“读锁”,写之前,必须先获得“写锁”。举例说明:
DataHandler对象保存了一个可读写的char[]数组:
view sourceprint?
01.public class DataHandler {
02. // store data:
03. private char[] buffer = "AAAAAAAAAA".toCharArray();
04.
05. private char[] doRead() {
06. char[] ret = new char[buffer.length];
07. for(int i=0; i<buffer.length; i++) {
08. ret[i] = buffer[i];
09. sleep(3);
10. }
11. return ret;
12. }
13.
14. private void doWrite(char[] data) {
15. if(data!=null) {
16. buffer = new char[data.length];
17. for(int i=0; i<buffer.length; i++) {
18. buffer[i] = data[i];
19. sleep(10);
20. }
21. }
22. }
23.
24. private void sleep(int ms) {
25. try {
26. Thread.sleep(ms);
27. }
28. catch(InterruptedException ie) {}
29. }
30.}
doRead()和doWrite()方法是非线程安全的读写方法。为了演示,加入了sleep(),并设置读的速度大约是写的3倍,这符合通常的情况。
为了让多线程能安全读写,我们设计了一个ReadWriteLock:
view sourceprint?
01.public class ReadWriteLock {
02. private int readingThreads = 0;
03. private int writingThreads = 0;
04. private int waitingThreads = 0; // waiting for write
05. private boolean preferWrite = true;
06.
07. public synchronized void readLock() throws InterruptedException {
08. while(writingThreads>0 || (preferWrite && waitingThreads>0))
09. this.wait();
10. readingThreads++;
11. }
12.
13. public synchronized void readUnlock() {
14. readingThreads--;
15. preferWrite = true;
16. notifyAll();
17. }
18.
19. public synchronized void writeLock() throws InterruptedException {
20. waitingThreads++;
21. try {
22. while(readingThreads>0 || writingThreads>0)
23. this.wait();
24. }
25. finally {
26. waitingThreads--;
27. }
28. writingThreads++;
29. }
30.
31. public synchronized void writeUnlock() {
32. writingThreads--;
33. preferWrite = false;
34. notifyAll();
35. }
36.}
readLock()用于获得读锁,readUnlock()释放读锁,writeLock()和writeUnlock()一样。由于锁用完必须释放,因此,必须保证lock和unlock匹配。我们修改DataHandler,加入ReadWriteLock:
view sourceprint?
01.public class DataHandler {
02. // store data:
03. private char[] buffer = "AAAAAAAAAA".toCharArray();
04. // lock:
05. private ReadWriteLock lock = new ReadWriteLock();
06.
07. public char[] read(String name) throws InterruptedException {
08. System.out.println(name + " waiting for read...");
09. lock.readLock();
10. try {
11. char[] data = doRead();
12. System.out.println(name + " reads data: " + new String(data));
13. return data;
14. }
15. finally {
16. lock.readUnlock();
17. }
18. }
19.
20. public void write(String name, char[] data) throws InterruptedException {
21. System.out.println(name + " waiting for write...");
22. lock.writeLock();
23. try {
24. System.out.println(name + " wrote data: " + new String(data));
25. doWrite(data);
26. }
27. finally {
28. lock.writeUnlock();
29. }
30. }
31.
32. private char[] doRead() {
33. char[] ret = new char[buffer.length];
34. for(int i=0; i<buffer.length; i++) {
35. ret[i] = buffer[i];
36. sleep(3);
37. }
38. return ret;
39. }
40. private void doWrite(char[] data) {
41. if(data!=null) {
42. buffer = new char[data.length];
43. for(int i=0; i<buffer.length; i++) {
44. buffer[i] = data[i];
45. sleep(10);
46. }
47. }
48. }
49. private void sleep(int ms) {
50. try {
51. Thread.sleep(ms);
52. }
53. catch(InterruptedException ie) {}
54. }
55.}
public方法read()和write()完全封装了底层的ReadWriteLock,因此,多线程可以安全地调用这两个方法:
view sourceprint?
01.// ReadingThread不断读取数据:
02.public class ReadingThread extends Thread {
03. private DataHandler handler;
04. public ReadingThread(DataHandler handler) {
05. this.handler = handler;
06. }
07. public void run() {
08. for(;;) {
09. try {
10. char[] data = handler.read(getName());
11. Thread.sleep((long)(Math.random()*1000+100));
12. }
13. catch(InterruptedException ie) {
14. break;
15. }
16. }
17. }
18.}
19.
20.// WritingThread不断写入数据,每次写入的都是10个相同的字符:
21.public class WritingThread extends Thread {
22. private DataHandler handler;
23. public WritingThread(DataHandler handler) {
24. this.handler = handler;
25. }
26. public void run() {
27. char[] data = new char[10];
28. for(;;) {
29. try {
30. fill(data);
31. handler.write(getName(), data);
32. Thread.sleep((long)(Math.random()*1000+100));
33. }
34. catch(InterruptedException ie) {
35. break;
36. }
37. }
38. }
39. // 产生一个A-Z随机字符,填入char[10]:
40. private void fill(char[] data) {
41. char c = (char)(Math.random()*26+'A');
42. for(int i=0; i<data.length; i++)
43. data[i] = c;
44. }
45.}
最后Main负责启动这些线程:
view sourceprint?
01.public class Main {
02. public static void main(String[] args) {
03. DataHandler handler = new DataHandler();
04. Thread[] ts = new Thread[] {
05. new ReadingThread(handler),
06. new ReadingThread(handler),
07. new ReadingThread(handler),
08. new ReadingThread(handler),
09. new ReadingThread(handler),
10. new WritingThread(handler),
11. new WritingThread(handler)
12. };
13. for(int i=0; i<ts.length; i++) {
14. ts[i].start();
15. }
16. }
17.}
我们启动了5个读线程和2个写线程,运行结果如下:
Thread-0 waiting for read...
Thread-1 waiting for read...
Thread-2 waiting for read...
Thread-3 waiting for read...
Thread-4 waiting for read...
Thread-5 waiting for write...
Thread-6 waiting for write...
Thread-4 reads data: AAAAAAAAAA
Thread-3 reads data: AAAAAAAAAA
Thread-2 reads data: AAAAAAAAAA
Thread-1 reads data: AAAAAAAAAA
Thread-0 reads data: AAAAAAAAAA
Thread-5 wrote data: EEEEEEEEEE
Thread-6 wrote data: MMMMMMMMMM
Thread-1 waiting for read...
Thread-4 waiting for read...
Thread-1 reads data: MMMMMMMMMM
Thread-4 reads data: MMMMMMMMMM
Thread-2 waiting for read...
Thread-2 reads data: MMMMMMMMMM
Thread-0 waiting for read...
Thread-0 reads data: MMMMMMMMMM
Thread-4 waiting for read...
Thread-4 reads data: MMMMMMMMMM
Thread-2 waiting for read...
Thread-5 waiting for write...
Thread-2 reads data: MMMMMMMMMM
Thread-5 wrote data: GGGGGGGGGG
Thread-6 waiting for write...
Thread-6 wrote data: AAAAAAAAAA
Thread-3 waiting for read...
Thread-3 reads data: AAAAAAAAAA
......
可以看到,每次读/写都是完整的原子操作,因为我们每次写入的都是10个相同字符。并且,每次读出的都是最近一次写入的内容。
如果去掉ReadWriteLock:
view sourceprint?
01.public class DataHandler {
02.
03. // store data:
04. private char[] buffer = "AAAAAAAAAA".toCharArray();
05.
06. public char[] read(String name) throws InterruptedException {
07. char[] data = doRead();
08. System.out.println(name + " reads data: " + new String(data));
09. return data;
10. }
11. public void write(String name, char[] data) throws InterruptedException {
12. System.out.println(name + " wrote data: " + new String(data));
13. doWrite(data);
14. }
15.
16. private char[] doRead() {
17. char[] ret = new char[10];
18. for(int i=0; i<10; i++) {
19. ret[i] = buffer[i];
20. sleep(3);
21. }
22. return ret;
23. }
24. private void doWrite(char[] data) {
25. for(int i=0; i<10; i++) {
26. buffer[i] = data[i];
27. sleep(10);
28. }
29. }
30. private void sleep(int ms) {
31. try {
32. Thread.sleep(ms);
33. }
34. catch(InterruptedException ie) {}
35. }
36.}
运行结果如下:
Thread-5 wrote data: AAAAAAAAAA
Thread-6 wrote data: MMMMMMMMMM
Thread-0 reads data: AAAAAAAAAA
Thread-1 reads data: AAAAAAAAAA
Thread-2 reads data: AAAAAAAAAA
Thread-3 reads data: AAAAAAAAAA
Thread-4 reads data: AAAAAAAAAA
Thread-2 reads data: MAAAAAAAAA
Thread-3 reads data: MAAAAAAAAA
Thread-5 wrote data: CCCCCCCCCC
Thread-1 reads data: MAAAAAAAAA
Thread-0 reads data: MAAAAAAAAA
Thread-4 reads data: MAAAAAAAAA
Thread-6 wrote data: EEEEEEEEEE
Thread-3 reads data: EEEEECCCCC
Thread-4 reads data: EEEEEEEEEC
Thread-1 reads data: EEEEEEEEEE
从最后4行可以看到在Thread-6写入EEEEEEEEEE的过程中,3个线程读取的内容是不同的。
思考
java的synchronized提供了最底层的物理锁,要在synchronized的基础上,实现自己的逻辑锁,就必须仔细设计ReadWriteLock。
Q: lock.readLock()为什么不放入try { }内?
A: 因为readLock()会抛出InterruptedException,导致readingThreads++不执行,而readUnlock()在 finally { }中,导致readingThreads--执行,从而使readingThread状态出错。writeLock()也是类似的。
Q: preferWrite有用吗?
A: 如果去掉preferWrite,线程安全不受影响。但是,如果读取线程很多,上一个线程还没有读取完,下一个线程又开始读了,就导致写入线程长时间无法获得writeLock;如果写入线程等待的很多,一个接一个写,也会导致读取线程长时间无法获得readLock。preferWrite的作用是让读 /写交替执行,避免由于读线程繁忙导致写无法进行和由于写线程繁忙导致读无法进行。
Q: notifyAll()换成notify()行不行?
A: 不可以。由于preferWrite的存在,如果一个线程刚读取完毕,此时preferWrite=true,再notify(),若恰好唤醒的是一个读线程,则while(writingThreads>0 || (preferWrite && waitingThreads>0))可能为true导致该读线程继续等待,而等待写入的线程也处于wait()中,结果所有线程都处于 wait ()状态,谁也无法唤醒谁。因此,notifyAll()比notify()要来得安全。程序验证notify()带来的死锁:
Thread-0 waiting for read...
Thread-1 waiting for read...
Thread-2 waiting for read...
Thread-3 waiting for read...
Thread-4 waiting for read...
Thread-5 waiting for write...
Thread-6 waiting for write...
Thread-0 reads data: AAAAAAAAAA
Thread-4 reads data: AAAAAAAAAA
Thread-3 reads data: AAAAAAAAAA
Thread-2 reads data: AAAAAAAAAA
Thread-1 reads data: AAAAAAAAAA
Thread-5 wrote data: CCCCCCCCCC
Thread-2 waiting for read...
Thread-1 waiting for read...
Thread-3 waiting for read...
Thread-0 waiting for read...
Thread-4 waiting for read...
Thread-6 wrote data: LLLLLLLLLL
Thread-5 waiting for write...
Thread-6 waiting for write...
Thread-2 reads data: LLLLLLLLLL
Thread-2 waiting for read...
(运行到此不动了)
注意到这种死锁是由于所有线程都在等待别的线程唤醒自己,结果都无法醒过来。这和两个线程希望获得对方已有的锁造成死锁不同。因此多线程设计的难度远远高于单线程应用。
从JDK 5开始,java.util.concurrent包就已经包含了ReadWriteLock,使用更简单,无需我们自行实现上述代码。但是,理解ReadWriteLock的原理仍非常重要。
发表评论
-
python学习摘要
2011-04-18 15:27 1409学习一门脚本语言是很 ... -
tmux快捷键
2011-04-16 07:39 1558摘要 http://rainbird.blog.51cto.c ... -
eclipse subclipse javahl 库加载错误
2011-04-13 17:31 2387网上搜集 http://islandlinux.org/how ... -
使用Msmtp mutt shell发邮件 (转)
2010-11-26 09:32 2842原文地址:http://fdsazi.blog.51cto.c ... -
Ubuntu10.0.4 Maven环境变量设置
2010-08-15 14:02 34711. 下载并解压缩apache-maven-2.2.1-bin ... -
Ubuntu10.0.4 Java环境变量设置
2010-08-15 14:00 4069Ubuntu10.0.4 下手工安装jdk及其环境变量设置 ... -
Managing Hierarchical Data in MySQL(转)
2010-07-09 10:01 1186http://dev.mysql.com/tech-resou ... -
JS的encode跟decode
2010-05-21 16:03 10639网上看到的,感觉能用得到,收藏下吧 /** * * URL ... -
Apache ActiveMQ
2009-12-25 15:46 3434一、特点 支持各种语言和协议的客户端。如:Java、C、C++ ... -
mysql数据的备份恢复命令
2009-12-14 15:37 1323记录下命令,害怕忘记! 导出整个数据库命令: D:\mys ... -
Struts2.18 的 interceptor
2009-12-08 08:20 3036首先定义我们自己的Interceptor package ... -
修改非安装版本mysql字符集
2009-12-08 08:08 1918如果我们的msyql是免安装版本,在windows系统下,my ... -
jqGrid学习 --------------自定义搜索
2009-12-06 15:45 15315定义自己的查询 <div id="myse ... -
jqGrid学习 -------------- 搜索工具栏
2009-12-06 15:13 13096搜索工具栏只是在列标题下面构造一个输入框。且使用表格的url进 ... -
jqGrid学习 -------------- 搜索
2009-12-06 13:32 9419表格中所有的列都可以作为搜索条件。 所用到的语言包文件 $ ... -
jqGrid学习 -------------- 自定义格式化类型
2009-12-06 13:04 8675<script> jQuery(" ... -
jqGrid学习 -------------- 格式化
2009-12-06 11:29 12479jqGrid的格式化是定义在 ... -
jqGrid学习 -------------- 自定义按钮
2009-12-06 11:14 16086用法: <script> ... jQue ... -
jqGrid学习 -------------- 翻页(2)
2009-12-06 10:32 8054jqGrid的翻页导航是一个方法,你可以事先定义一些其他操作, ... -
jqGrid学习 -------------- 翻页
2009-12-05 21:45 5376jqGrid的翻页要定义在html里,通常是在grid的下面, ...
相关推荐
Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式...
Java多线程设计模式是Java开发中的重要领域,它涉及到如何在并发环境下高效、安全地管理资源和控制程序执行流程。本资料集包含了清晰完整的PDF版书籍和源代码,为学习和理解Java多线程设计模式提供了丰富的素材。 ...
标题和描述均指向了一个关于Java多线程设计模式的PDF文档的下载链接,这暗示了文档的主要内容将围绕Java中的多线程编程及其设计模式展开。在Java领域,多线程是一个核心概念,它允许程序执行多个任务同时进行,极大...
书中包含Java线程的介绍导读、12个重要的线程设计模式和全书总结以及丰富的附录内容。第一章相关线程设计模式的介绍,都举一反三使读者学习更有效。最后附上练习问题,让读者可以温故而知新,能快速地吸收书中的...
标题“java多线程设计模式详解.pdf”中提到的知识点是关于Java多线程编程中设计模式的应用。Java多线程是并发编程的重要组成部分,设计模式则是软件工程中用于解决特定问题的最佳实践。将两者结合起来,意味着此文件...
Java多线程设计模式是Java开发中的重要领域,它涉及到如何高效、安全地利用系统资源进行并发处理。在这个主题中,我们将深入探讨单线程、生产者与消费者模型以及Java中实现多线程的各种方法。 首先,单线程是程序...
### Java多线程设计模式详解 #### 一、Java多线程基础 Java语言自诞生以来,便内置了对多线程的支持。多线程可以让应用程序在同一时间处理多个任务,提高程序的执行效率和响应速度。Java中创建线程有两种基本方式...
本书《JAVA多线程设计模式》针对Java语言的多线程编程进行深入讲解,采用易于理解的方式介绍了与Java线程相关的多个设计模式,并通过实例程序与UML图示辅助阐述。书中的关键代码片段都有标注,易于读者理解与学习,...
java多线程设计模式,作者是:结城 浩,由 博硕文化 译。2005年4月,由中国铁道出版社出版。内附带部分源代码。
JAVA多线程设计模式_中国铁道出版社 本书浅显易懂的介绍了JAVA线程相关的设计模式,通过程序范例和UML图示来一一解说,书中代码的重要部分加了标注以使读者更加容易理解,再加上图文并茂,对于初学者还是程序设计...
Java 多线程 设计模式
Java多线程设计模式是Java并发编程中的一种高级技巧,它可以帮助开发者在处理并发问题时,提高代码的可读性、可维护性和性能。多线程设计模式是基于Java的并发API,如Thread、Runnable、ExecutorService等,通过特定...