`

Java多线程设计模式

    博客分类:
  • JAVA
 
阅读更多

Introduction 1  Java语言的线程

开始执行Java程序后,至少会有一个线程开始操作,有操作的是被称为主线程的线程,主线程执行输入类的main()。当main()里的所有处理均结束后,则主线程也同时结束。

当应用程序的规模大到一定程度,程序里的多线程会以每种形式存在。以下是几个常见的范例:
1)  GUI应用程序。
2)  比较花费时间的I/O处理。
3)  多个客户端。

如欲启动线程时,有下列两种方法:
1)  利用Thread类的子类的实例,启动线程。
2)  利用Runnable接口的实现类的实例,启动线程。

要注意的是,“Thread的实例”和“线程本身”是两个不同的部分。即使建立了Thread的实例,也还没有启动线程,而且就算线程已经结束,Thread实例也不会就这样消失。

建立一个实现Runnable接口的类,将该类的实例传给Thread的构造函数,调用start()...,这就是利用Runnable接口来启动线程的方法。

记住:无论哪种方式,启动新线程的方法永远是Thread类的start()。

利用Thread类的sleep()可以暂停线程的执行。执行下面的语句时,即可让当前的线程(执行此语句的线程)暂时停止越1000ms。
    Thread.sleep(1000);

当实例方法加上关键字synchronized声明之后,就可以只让一个线程操作某类实例的这类方法。“让一个线程操作”并不是说只能让某一个特定的线程操作而已,而是指一次只能让一个线程执行。这种方法称为synchronized方法(同步方法)。

synchronized实例方法是使用this锁定去做线程的共享互斥。synchronized类方法是使用该类的类对象的锁定去做线程的共享互斥。

线程的协调(这三种方法执行的前提是执行线程手中有obj的锁):
1)  obj.wait()是把现在的线程放到obj的wait set;
2)  obj.notify()是从obj的wait set里唤醒一个线程;
3)  obj.notifyAll()是唤醒所有在obj的wait set里的线程。
被唤醒的线程只是处于等锁状态(但已经不再wait set中了),当时的obj的锁还掌握在执行notify()或notifyAll()的线程手上。
当唤醒的线程得到锁之后,将从wait()之后继续执行。

Introduction 2  多线程程序的评量标准

安全性--不损坏对象。
生存性--进行必要的处理。(liveness)
复用性--可再利用类。
性  能--能快速、大量进行处理。

第1章  Single Threaded Execution - 能通过这座桥的,只有一个人

Single Threaded Execution是指“以一个线程执行”的意思。就象细独木桥只能允许一个人通过一样,这个模式用来限制只让一个线程运行。

在 Single Threaded Execution Pattern中,我们将unsafeMethod加以防卫,限制同时只能有一个线程可以调用它 (加上synchronized)。这个必须让单线程执行的程序范围,我们称为“临界区”(critical section)。

使用Single Threaded Execution Pattern时,可能会有发生死锁(deadlock)的危险。

当SharedResource的字段开放给子类访问时,可能会因为子类写出unsafeMethod而导致丧失安全性。

synchronized方法和synchronized块,无论碰到return或是异常,都会确实解除锁定。

结论,Java语言规范中:
1)  基本类型、引用类型的指定、引用是原子的操作。
2)  但是long和double的指定、引用是可以分割的。
3)  要在线程间共享long或double的字段时,必须在synchronized中操作,或是声明成volatile。

第2章 Immutable - 想破坏它也没办法

类被声明为final的,属性都是final和private的。属性只能通过构造方法来设置,没有修改属性值的方法。

Thread.currentThread().getName()是用来得到自己这个线程(所对应的java.lang.Thread类实例)的名称。

当字符串和类实例表达式以+运算符连接的时候,会自动调用实例表达式的toString()方法(这是Java的规定)。

类图中,类的字段名称中加上{frozen}的限制,这是UML中表现“建立实例并初始化字段后就不能再次修改其值”的方式。
类图中,类的方法名称中加上{concurrent}的限制,这是UML中表现“多个线程同时执行也无妨”的方式(指Java中不需要加上synchronized的方法)。

当类声明为final中,这个类无法被继承。

当实例方法被声明为final中,这个方法无法被子类所覆盖(override)。
当类方法被声明为final中,这个方法无法被子类所隐藏(hide)。

final的字段的值只能指定一次:
1)  对于final的实例字段,要么在声明字段时就直接赋初值,要么在构造方法中将值赋给字段。
2)  对于final的类字段,要么在声明字段时就直接赋初值,要么在static块(静态初始化子)中将值赋给字段。

final的局部变量的值(在函数内)只能够指定一次。
final参数的值(在函数内)则一次都不能指定,因为当方法被调用时,已经有值指定进去了。

第3章  Guarded Suspension - 要等到我准备好喔

对象图或交互图中,如果类的边框用粗线,则代表该对象和线程有相关性,意即该对象可以主动地调用方法(这是UML规定的表示法)。被称为主动对象。

第4章  Balking - 不需要的话,就算了吧

Thread类(以及其子类)的实例,一旦调用start()后,就会变成“结束start”状态。
如果start()被再次调用时,就会进行balk让线程的启动不会再次执行,并且抛出IllegalThreadStateException。
也就是说,Thread的start()就是本章所说的“不能执行两次以上”,即start()使用了Balking模式。

第5章  Producer-Consumer - 我来做,你来用

上面所说的思维若整理成口诀,可以得到这样两句:
1)  线程的合作要想“放在中间的东西”。
2)  线程的互斥要想“应该保护的东西”。

如果是多个Producer和一个Consumer的话,只有Comsumer线程会访问的范围,就不需要考虑共享互斥了,这样可以提升程序的性能。

习惯编写Java多线程代码后,就会习惯去注意方法的后面有没有throw InterruptedException。如果方法有这个,通常告诉我们下面两件事:
1)  这是“需要花点时间”的方法。
2)  这是“可以取消”的方法。

Java的标准库中,后面接着throw InterruptedException的方法的有这三个:
1)  java.lang.Object的wait()。
2)  java.lang.Thread的sleep()。
3)  java.lang.Thread的join()。

当sleep中的线程被调用interrupt()时,就会放弃暂停的状态,(sleep线程)并抛出InterruptedException异常。
当对wait中的线程调用interrupt()时,(wait线程)会先重新获得锁定,在抛出InterruptedException异常。
当join(等待其他线程结束)的线程被调用interrupt()时,就会放弃等待状态,(join线程)并抛出InterruptedException异常。

notify()和interrupt()对wait中的线程调用时意义有点相近,但是仍有差异:
1)  notify/notifyAll 是java.lang.Object的方法,是该实例的wait set调用的。而不是对线程直接调用。notify/notifyAll所唤醒的线程, 会前进到wait()的下一条语句。另外,执行notify/notifyAll方法需要获得类的实例。
2)  interrupt是 java.lang.Thread的方法,是对该线程直接调用的,当被interrupt的线程正在sleep、wait或join时,会抛出 InterruptException异常。执行interrupt(取消其他线程),不需要获取该线程的锁定。

interrupt()方法只会改变被interrupt的线程的中断状态(interrupt status)而已。
线程在执行sleep、wait、join时,是在这些方法内不断检查interrupt status的值,如果true了,则自己抛出InterruptedException。
如果在线程还没有执行sleep、wait、join前,就去interrupt它,并不会影响它的正常工作,只有它执行到sleep、wait、join时,才马上抛出InterruptedException。

Thread类的实例方法isInterrupt()可以用来检查指定线程的interrupt status。
Thread类的  类方法interrupted()会检查当前线程的interrupt status并清楚之。

第6章  Read-Wirte Lock - 大家想看就看吧,不过看的时候不能写喔

Java语言中,使用finally可以避免忘记解锁。

Berfore/After Patern的结构:
  before();
  try {
    execute();
  } finally {
    after();
  }
在此,before()在try之外,表示“如果在before()的执行过程中发生异常,就不执行execute()和after()”。

第7章 Thread-Per-Message - 这个工作交给你了

一般而言,我们可以说一个进程里面可以建立多条的线程。

进程和线程的最大的差异在于内存能否共享。
因为线程间的内存是共享的,所以线程之间的沟通可以使用很自然、简单的方式做到。而因为同一个实例可由多个线程同时访问,所以需要正确地进行共享互斥。

切换执行中的线程时,线程和进程一样,需要进行context-switch的操作。然而,线程所管理的context信息比进程要来得少,一般而言线程的context-switch操作要比进程快得多。

第8章  Worker Thread - 等到工作来,来了就工作

invocation与execution分离的用处:
1)  提 高响应性:如果invocation与execution无法分离,当execution很花时间时,invocation的操作将被牵连。如果将 invocation与execution事先分离,即使execution花时间,invocation也可以继续自己前进。这样能提高程序的响应性。
2)  控 制实行顺序:如果invocation与execution无法分离,一旦invoke出来,就必须直接把它execute完。但,如果 invocation与execution分离,execute的顺序就可以于invoke的次序无关。也就是说,我们可以对Request设立优先级, 控制Channel传递Request给Worker的顺序。
3)  可取消和可重复执行:若能分离invocation与execution,就有办法做到“虽然invocation了,但将execution取消”的功能。同样,如果把Request保存下来,就可以做到重复execute。
4)  分散处理的第一步:因为invocation与execution分离了,所以invoke与execute的操作也容易拆开在两台计算机上执行。相当于Request的对象,可通过网络传送到另一台计算机。

Runnable对象,可以作为方法的自变量传递、堆到队列里、通过网络传递、甚至存进文件中。而这样的一个Runnable对象可以经过多次传来传去,最后传到某台计算机的某条线程上,才真正交付执行。
这时,Runnable接口就可以看作是GoF的Command Pattern中的Command。

当Swing组建一旦被实现,可能改变组件状态的程序代码、依赖于状态的程序代码,都必须交给Event-dispatching thread执行。

第9章  Future - 先给您这张提货单

第10章  Two-Phase Termination - 块把玩具收拾好,去睡觉吧

等待指定的线程结束时,要使用join()方法。另外,检查指定的线程现在是否结束了,可以使用java.lang.Thread的isAlive()方法。若返回值是true,该线程还活着,反之表示线程已经结束了。

java.lang.Runtime 的实例方法addShutdownHook()会在Java执行环境全部结束时(调用System.exit()方法或所有非Daemon线程都结束 时),调用指定Thread的start()方法(这时的Thread称为shutdown hook)。使用这个方法,可以编写整个程序的终止处理。

调用interrupt()方法后,可以中断掉线程。这里所说的中断掉线程,是指下面其中一种结果:
1)  线程变成“中断状态”对“状态”的反应。
2)  抛出“异常InterruptException”对“控制的反应”。
通常会是1)。只有线程在sleep、wait、join时会是2)(这个时候不会变成“中断状态”)。
然而,1)和2)是可以互相转换的。

中断状态-->InterruptedException异常的转换:
  if  (Thread.interrupted()) {
    throw new InterruptedException();
  }
  在花时间的处理前,先加上这个if语句,可提高程序对中断的响应性,可以避免不知道自己已经被中断,还开始进行花时间的操作。
  调用Thread.interrupted()方法后,当前线程就不是中断状态了,也就是说,只要调用一次Thread.interrpupted()方法后,中断状态就会被清除。
  如果不想清除中断状态,而要检查当前线程是否被中断,要使用inInterrupted()实例方法,使用方式如下:
  if  (Thread.currentThread().inInterrupted()) {
    // 若为中断状态时需要进行的处理(中断状态不会清除)
  }

InterruptedException异常-->转换为中断状态:
  try {
    Thread.sleep(1000);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
  }
  这样可以将收到的InterruptedException,转换为中断状态的形式。
  
InterruptedException异常-->转换为InterruptedException异常:
  收到的InterruptedException异常,也可以不马上抛出去,而留下来晚点再抛:
  InterruptedException savedException = null;
  ...
  try {
    Thread.sleep(1000);
  } catch (InterruptedException e) {
    savedException = e;
  }
  ...
  if (savedException != null) {
    throw savedException;
  }

第11章  Thread-Specific Storage - 每个线程的保管箱

Thread-Specific Storage Pattern是只有一个入口,但内部会对每个线程提供特有存储空间的Pattern。
如果使用Java标准链接库时,可以使用java.lang.ThreadLocal类加以实现。

java.lang.ThreadLocal的实例可以想像成一种集合结构或许会比较好理解。
ThreadLocal的set()方法,可以将参数所指定的实例,存放到调用set()的当前线程所对应的存储空间。
ThreadLocal的get()方法,可以将调用get()的当前线程所对应的存储空间中的对应的实例,取出返回(如果没有set过,则返回null)。

手边有一个假定单线程作为执行环境的对象。现在我们想将这个对象放在多线程环境下执行,又不想修改使用端的线程,也不能改变对象的接口。这时就使用Thread-Specfic Storage Pattern。
在 此将目的对象当作TSObject,并建立与TSObject具有相同接口的TSObjectProxy,另外,为了管理 “Client-->TSObject”的对照表,又加上TSObjectCollection。TSObjectProxy会使用 TSObjectCollection取得当前线程所对应的TSObject,而将工作委托给这个TSObject。

第12章  Active Objects - 接受异步消息的主动对象

在多线程程序设计中,千万要随时意识到每个方法是由哪个线程调用的。

与平常不同的是,在这里的方法是由Client以外的线程在执行的。也就是说,Active Object Pattern做到了“异步的方法”。也可以说是做到了“异步的消息”。

再看看ActiveObject包的属性,可以看到,通过ActiveObject包中的所有的参与者的相互协调,组合成一个“主动对象”,这个主动对象,具有:
1)  具有接口(API)-ActiveObject interface定义了API。
2)  可以获取异步消息-Proxy参与者会将方法调用转换成MethodRequest对象,存放在ActivationQueue里。
3)  与Client调用不同的线程-Scheduler提供线程。
4)  可以执行处理-Servant可以单线程执行处理。
5)  可以返回返回值-以Future作为返回值的提货单。
请回想本章前面的比喻,许多人互相协调,可以构成一个“法人”。Active Object Pattern则是协调许多对象,构成一个“主动对象”。

Active Object Pattern是否适合使用要考虑问题的处理量的大小。处理量太小不适合使用的原因,是因为Proxy建立ConcreteMethodRequest与ActivationQueue进行沟通,这些工作的需要花费的时间也不容忽视。

我们可以把Active Object Pattern看作是在以单线程为前提的Servant外面包上一层皮,使多线程的Client可以使用它。

总结 - 多线程程序设计的模式语言

Single Threaded Execution Pattern - 能通过这座桥的,只有一个人
别名:Critical Section,Critical Region
背景:多个线程共享一个实例。
问题:若多个线程都擅自更改实例的状态,实例会丧失安全性。
解决方式:
首先,仔细找出实例状态不稳定的范围(临界区域)。并对临界区域加以防护,使同时执行的线程保持在只有一条的情况。这样一来,就能够保护住实例的安全性。
实现:Java语言里,可使用synchronized来实现出临界区间。
相关:
当实例的状态不会改变时,为了提升性能,可使用Immutable Pattern。
想要将引用实例状态的线程和改变实例状态的线程拆开,以提高性能使,可使用Read-Write Lock Pattern。

Immutable Pattern - 想破坏它也没办法
背景:多个线程共享一个实例,实例的状态不会改变。
问题:使用Single Threaded Execution Pattern,会降低性能。
解决方式:
当实例建立后状态就不会变化时,就要停止使用Single Threaded Execution Pattern。
为了避免失误造成更改了实例的状态,故将类写成无法由线程更改。另外,删除实例里所有用来更新状态的方法(setter)。引用实例状态用的方法(getter)就无妨。
使用Immutable Pattern可提高性能,但是要持续保证不变性(Immutability)并不简单,记得在帮助文件中也要注明这是一个immutable的类。
实现:在Java中使用private来隐藏字段,此外,由于无法确保不可更改,因此还要使用final。
相关:
当多个线程进行共享互斥,可使用Single Threaded Execution Pattern。
当修改用的线程数量比用来读取的线程数量多时,可考虑使用Read-Write Lock Pattern。

Guarded Suspension Pattern - 要等到我准备好喔
别名:Spin Lock,Guarded Wait
背景:多个线程共享一个实例。
问题:若多个线程都擅自更改实例的状态,实例会丧失安全性。
解决方式:
当实例的状态不恰当时,就要求线程等待到适合的状态。首先,以“警戒条件”来表示实例的“适当的状态”。并且在进行有安全性疑惑的操作前,都要检查是否警戒条件满足。如果警戒条件不成立,就要求线程等待到成立为止。
使用Guarded Suspension Pattern,能以警戒条件限制方法的执行。不过,如果警戒条件一直不成立,线程会永远等待下去,会使程序丧失生命性。
实现:Java语言中,检验警戒条件时使用while语句,而要让线程等待时则使用wait()。并使用notify()/notifyAll()通知警戒条件的改变。检验、修改警戒条件时,会使用到Single Threaded Execution Pattern。
相关:
当警戒条件不成立时想要马上退出,就使用Balking Pattern。
Guarded Suspension Pattern中检验、更改警戒条件的部分,会用到Single Threaded Execution Pattern。

Balking Pattern - 不需要的话,就算了吧
背景:多个线程共享一个实例。
问题:若多个线程都擅自更改实例的状态,实例会丧失安全性。可以一直等待安全的时机,又会使程序响应性降低。
解决方式:
当实例的状态不适合时,就中断掉处理的进行。首先,以“警戒条件”来表示实例的“适当的状态”,并且在进行有安全性疑惑的操作前,都要检查是否满足警戒条件。只有在警戒条件成立时,才会继续执行;如果警戒条件不成立,就直接中断(Balk)执行,马上退出。
实现:Java语言中,检验警戒条件时要使用if 语句。当要balk时,可使用return退出方法。或使用throw抛出异常。检验、更改警戒条件时,会使用到Single Threaded Execution Pattern。
相关:
当想要等到警戒条件成立再执行时,可使用Guarded Suspension Pattern。
Balking Pattern中检验、更改警戒条件的部分,会使用到Single Threaded Execution Pattern。

Producer-Consumer Pattern - 我来做,你来用
背景:当要从某个线程(Producer)将数据传给其他线程(Consumer)时。
问题:当Producer与Consumer处理的速度不同时,速度慢的会扯速度快的后腿,而降低程序的性能。另外,当Producer要写入数据时,Consumer若同时读数据,数据会失去安全性。
解决方式:
在 Producer与Consumer之间,加上中继用的Channel,并让Channel存放多条数据。这样一来,就可以缓冲Producer和 Consumer之间处理速度的差异。另外,只要在在Channel里进行共享互斥,数据就不会丧失安全性。于是性能可以不降低,又可在多个线程间安全地 传送数据。
相关:
Channel安全传递数据的部分,使用了Guarded Suspension Pattern。
Future Pattern在传递返回值的时候,使用了Producer-Consumer Pattern。
Worker Pattern在传递请求时,使用了Producer-Consumer Pattern。

Read-Wirte Lock Pattern- 大家想看就看吧,不过看的时候不能写喔
别名:Reader/Writer Lock,Readers/Writers Lock
背景:多条线程共享一个实例,并会有参考实例状态的线程(Reader)与会改变实例状态的线程(Writer)。
问题:若线程之间不进行共享互斥,会丧失安全性。但使用Single Threaded Execution Pattern会是程序西工内能降低。
解决方式:
首 先,将“控制Reader的锁定”与“控制Writer的锁定”分开,加入ReaderWriteLock,以提供两种不同的锁定。 ReadWriteLock会对“Writer-Writer”、“Reader-Writer”进行互斥控制。但对“Reader-Reader”不进 行共享互斥。这样可以在不影响安全性的前提小提高性能。
实现:Java语言可以使用finally块避免忘记解除锁定。
相关:
Read-Write Lock Pattern中,ReadWriteLock进行共享互斥的地方,使用了Guarded Suspension Pattern。
完全没有Writer的时候,可以使用Immutable Pattern。

Thread-Per-Message Pattern - 这个工作交给你了
背景:线程(Client)要调用实例(Host)的方哪国发。
问题:在方法的属性处理完之前,控制权不会从Host退出,如果方法的处理很花时间,程序的响应性会降低。
解决方式:
在Host里,启动新的线程,并且将方法应该进行的工作,交给这个新线程。这样Client的线程就可以继续执行下一个操作了。这样做,不用更改Client的程序代码,并能提高程序的响应性。
实现:Java语言中,为了简化启动线程的程序,可使用匿名的内部类。
相关:
想节省启动线程所花费的时间,可以使用Worker Thread Pattern。
想要将处理的结果返回给Client时,可以使用Future Pattern。

Worker Thread Pattern- 等到工作来,来了就工作
别名:Thread Pool,Background Thread
背景:线程(Client)要调用实例(Host)的方法。
问题:如果问题的处理很花时间,程序的响应会降低。为了提高响应性,而启动新的线程来处理方时,启动线程所花时间又会降低性能。另外,当送出的请求态度时,会启动过多的线程,这会使承受量变差。
解决方式:
首先,我们事先启动一些用来进行处理的线程(Worker Thread)。并将代表请求的实例传给Worker线程,这样就不需要每次都重新启动新的线程了。
相关:
想要获取Worker线程的处理结果时,可以使用Future Pattern。
想要将代表的实例传递给Wroker线程时,可以使用Producer-Consumer Pattern。

Future Pattern- 先给您这张提货单
背景:线程(Client)会将工作委托给其他线程,而Client希望得到处理的结果。
问题:将工作委托给别人时,如果又等待执行结果,会使响应性降低。
解决方式:
首先,建立一个与处理结果具有相同接口的Future,在处理开始时,先把Future当作返回值返回。处理的结果事后再设置给Future。这样Client就可以在适当的时机,通过Future获得(等待)处理的结果。
相关:
等待Client的处理结果时,会使用Guarded Suspension Pattern。
Future Pattern可用在Thread-Per-Message想要获取处理结果时。
Future Pattern可用在Worker Thread Pattern想要获取处理结果时。

Two-Phase Termination Pattern- 快把玩具收拾好,去睡觉吧
背景:想要结束运行中的线程。
问题:从外部忽然结束掉线程,会丧失安全性。
解决方式:
首先,适合进行终止的时机,还是要交给线程自己判断。所以,定义一个送出“终止请求”的方法用来结束线程。这个方法事实上只会将标识设置成“收到终止请求”而已,线程要在每个可以开始终止处理的地方检查这个标识,如果检查结果为真,就开始进行终止处理。
实 现:Java语言中,不但要设置表示收到终止请求的标识,还要使用interrupt()中断掉wait、sleep、join的等待状态。因为线程到 wait、sleep、join抛出InterruptedException以后,就不是中断状态了,所以若是使用isInterrupted()来检 查终止请求,必须特别小心。
相关:
进行终止处理时,为了禁止其他操作,可使用Balking Pattern。
为了确实进行终止处理,使用了Before/After Pattern。

Thread-Specific Storage Pattern - 每个线程的保管箱
别名:Per-Thread Attribute,Thread-Specific Data、Thread-Specific Field、Thread-Local Storage
背景:想要将假定在单线程环境下运行的对象(TSObject),在多线程的环境下使用。
问 题:想要使用TSObject并不简单。要将TSObject改写成支持多线程,可能一不小心就丢掉安全性和生命性了。而且,TSObject可能根本不 能改写,而我们也不想改写使用TSObject的对象(Client)的程序代码,所以也不想修改TSObject的接口。
解决方式:
建立线程独有的空间,并管理这些空间与线程的对照关系。
首先,建立一个与TSObject具有相同接口的TSObjectProxy参与者。并建立TSObjectCollention,管理“Client-->TSObject”的对照关系。
TSObjectProxy会通过TSObjectCollention获取当前线程所对应的TSObject,并将工作委托给TSObject。Client会拿TSObjectProxy来代替TSObject使用。
这样一来,每个TSObject一定只会有特定的一个线程调用他,所以TSObject不需要进行共享互斥。关于多线程的部分,都隐藏在TSObjectCollention里了。另外,TSObject的接口也不必修改。
不过,使用Thread-Specific Pattern等于是在程序里加上隐性的context,有程序的可读性可能变差的危险性。
实现:Java语言中,使用java.lang.ThreadLocal类担任TSObjectCollention。
相关:
要对多数的线程进行共享互斥时,要使用Single Threaded Execution Pattern。

Active Objects Pattern- 接受异步消息的主动对象
别名:Actor,Concurrent Object
背景:这里有送出请求的线程(Client)与时机用来进行处理的对象(Servant)。可是Servant是假定在单线程环境下运行开发出来的。
问题:想要以多个Client使用Srevant,而Servant又不是线程安全。如果Servant处理的操作很花时间,还会拖垮Client的响应性。
解决方式:
我们在此建立一个可接受异步消息的主动对象,让他拥有独立于Client的线程。
首先,在这里加入一个Scheduler。Servant是有Scheduler调用的。这是只有一个Worker线程的Worker Thread Pattern。这样一来,Servant就不需要支持多线程,也能处理多个Client的请求。
Client送出请求的操作实现时是调用Proxy。Proxy会将请求转换成一个对象,并使用Producer-Consumer Pattern,传给Scheduler。这样,就算Servant进行处理需要花时间,也不会拖慢Client的响应性。
接下来,选杂要执行的请求,则是Scheduler的工作。执行请求的顺序是由Scheduler决定的。
执行结果可使用Future Pattern返回给Client。
相关:
事先Scheduler的部分,使用了Worker Thread Pattern。
从Proxy将请求传给Scheduler的部分,使用了Producer-Consumer Pattern。
对Client返回执行结果时,使用了Future Pattern。

附录B  Java 的内存模型

Java 的内存模型分为主存储器(main memory)和工作存储器(working memory)两种。主存储器是对象实例位置所在的区域,所有的实例都 存在于主存储器内。主存储器为所有的线程所共用。工作存储器为各线程独立拥有的工作区,在工作存储器中存在有主存储器必要部分的拷贝,称之为工作拷贝 (working copy)。

线程无法对主存储器直接进行操作,因此它无法直接引用/指定字段的值。当线程预引用字段的值时,会一次将 值从主存储器拷贝到该线程的工作存储器,线程就可以引用这个工作拷贝。但再度引用同一字段的值时,线程是使用前一次的工作拷贝,还是重新从主存储器同步新 的工作拷贝,则是由Java执行处理系统决定的。类似的,当线程预将值指定给字段时,会一次将值指定给位于工作存储器上的工作拷贝,但工作拷贝什么时候会 映像到主存储器,则是由Java执行处理系统决定的,当线程反复指定同一字段,有可能每次都同步到主存储器,也可能只有最后一次的工作拷贝被同步到主存储 器,这还是由Java执行处理系统决定的。

主存储器好比是大家都看得到的黑板;而工作存储器则象是每个学生的笔记本。

synchronized的两项功能:
1)  当线程要进入synchronized时:
a)  如果工作存储器有未映像到主存储器的工作拷贝,则该内容就会被强制写入主存储器。这样之前的计算结果会被全部写入主存储器,因而可以被其他线程看到。
b)  紧接着,工作存储器上的工作拷贝将会被全部丢弃。之后,欲引用主存储器上的值,必定会从主存储器将值拷贝到工作拷贝。
2)  当线程要退出synchronized时:
如果如果工作存储器有未映像到主存储器的工作拷贝,则该内容就会被强制写入主存储器。这样之前的计算结果会被全部写入主存储器,因而可以被其他线程看到。

也就是说,在内存同步方面,当线程欲进出synchronized时,便会将自己工作存储器的内容完全映像到主存储器。

volatile的两项功能:
1)  进行内存的同步。当对volatile的字段进行读操作前,该字段会主存储器拷贝到工作存储器,当volatile的字段进行写操作后,该字段会工作存储器拷贝到主存储器。
2)  以atomic的方式来进行long、double的指定。
atomic和内存同步是两回事,所以,被数个线程进行变更/引用的字段,即使是int型,也必须以synchronized或volatile来保护。

Double Checked Locking Pattern的危险性
为了提高性能,可能会写出下面这样的Double Checked Locking Pattern的代码:
class MySystem {
  private static MySystem instance = null;
  …
  public static MySystem getInstance() {
    if (instance == null) {
      synchronized (MySystem.class) {
        if (instance == null) {
          instance = new MySystem();  // (d)
        }
      }
    }
    return instance;
  }
}
但 这样的代码是有安全性的问题的,原因在于,在刚执行完(d)的工作且还未退出synchronized时,初始化完成信息还存在于当前线程的工作存储器 上,不保证映像到主存储器。而这是如果其他线程获得控制权,会因为instance不为null(这里意思是instance已经被映像到主存储器,而 MySystem的其他实例字段可能还为映像到主存储器),而直接访问主存储器中的instance实例的字段信息,但这些信息在主存储器中还是无效的。

正确的解决方法是要么牺牲性能:
class MySystem {
  private static MySystem instance = null;
  …
  public static synchronized MySystem getInstance() {
    if (instance == null) {
      instance = new MySystem();  // (d)
    }
    return instance;
  }
}
要么更加简化:
class MySystem {
  private static MySystem instance = new MySystem();
  …
  public static synchronized MySystem getInstance() {
    return instance;
  }
}

 


java语言已经内置了多线程支持,所有实现Runnable接口的类都可被启动一个新线程,新线程会执行该实例的run()方法,当run()方法执行完毕后,线程就结束了。一旦一个线程执行完毕,这个实例就不能再重新启动,只能重新生成一个新实例,再启动一个新线程。

Thread类是实现了Runnable接口的一个实例,它代表一个线程的实例,并且,启动线程的唯一方法就是通过Thread类的start()实例方法:

Thread t = new Thread();
t.start();

start()方法是一个native方法,它将启动一个新线程,并执行run()方法。Thread类默认的run()方法什么也不做就退出了。注意:直接调用run()方法并不会启动一个新线程,它和调用一个普通的java方法没有什么区别。

因此,有两个方法可以实现自己的线程:

方法1:自己的类extend Thread,并复写run()方法,就可以启动新线程并执行自己定义的run()方法。例如:

public class MyThread extends Thread {
    public run() {
        System.out.println("MyThread.run()");
    }
}

在合适的地方启动线程:new MyThread().start();

方法2:如果自己的类已经extends另一个类,就无法直接extends Thread,此时,必须实现一个Runnable接口:

public class MyThread extends OtherClass implements Runnable {
    public run() {
        System.out.println("MyThread.run()");
    }
}

为了启动MyThread,需要首先实例化一个Thread,并传入自己的MyThread实例:

MyThread myt = new MyThread();
Thread t = new Thread(myt);
t.start();

事实上,当传入一个Runnable target参数给Thread后,Thread的run()方法就会调用target.run(),参考JDK源代码:

public void run() {
    if (target != null) {
        target.run();
    }
}

线程还有一些Name, ThreadGroup, isDaemon等设置,由于和线程设计模式关联很少,这里就不多说了。

由于同一进程内的多个线程共享内存空间,在Java中,就是共享实例,当多个线程试图同时修改某个实例的内容时,就会造成冲突,因此,线程必须实现共享互斥,使多线程同步。

最简单的同步是将一个方法标记为synchronized,对同一个实例来说,任一时刻只能有一个synchronized方法在执行。当一个方法 正在执行某个synchronized方法时,其他线程如果想要执行这个实例的任意一个synchronized方法,都必须等待当前执行 synchronized方法的线程退出此方法后,才能依次执行。

但是,非synchronized方法不受影响,不管当前有没有执行synchronized方法,非synchronized方法都可以被多个线程同时执行。

此外,必须注意,只有同一实例的synchronized方法同一时间只能被一个线程执行,不同实例的synchronized方法是可以并发的。 例如,class A定义了synchronized方法sync(),则不同实例a1.sync()和a2.sync()可以同时由两个线程来执行。

多线程同步的实现最终依赖锁机制。我们可以想象某一共享资源是一间屋子,每个人都是一个线程。当A希望进入房间时,他必须获得门锁,一旦A获得门 锁,他进去后就立刻将门锁上,于是B,C,D...就不得不在门外等待,直到A释放锁出来后,B,C,D...中的某一人抢到了该锁(具体抢法依赖于 JVM的实现,可以先到先得,也可以随机挑选),然后进屋又将门锁上。这样,任一时刻最多有一人在屋内(使用共享资源)。

Java语言规范内置了对多线程的支持。对于Java程序来说,每一个对象实例都有一把“锁”,一旦某个线程获得了该锁,别的线程如果希望获得该锁,只能等待这个线程释放锁之后。获得锁的方法只有一个,就是synchronized关键字。例如:

public class SharedResource {
    private int count = 0;

    public int getCount() { return count; }

    public synchronized void setCount(int count) { this.count = count; }

}

同步方法public synchronized void setCount(int count) { this.count = count; } 事实上相当于:

public void setCount(int count) {
    synchronized(this) { // 在此获得this锁
         this.count = count;

    } // 在此释放this锁
}

红色部分表示需要同步的代码段,该区域为“危险区域”,如果两个以上的线程同时执行,会引发冲突,因此,要更改SharedResource的内部状态,必须先获得SharedResource实例的锁。

退出synchronized块时,线程拥有的锁自动释放,于是,别的线程又可以获取该锁了。

为了提高性能,不一定要锁定this,例如,SharedResource有两个独立变化的变量:

public class SharedResouce {
    private int a = 0;
    private int b = 0;

    public synchronized void setA(int a) { this.a = a; }

    public synchronized void setB(int b) { this.b = b; }
}

若同步整个方法,则setA()的时候无法setB(),setB()时无法setA()。为了提高性能,可以使用不同对象的锁:

public class SharedResouce {
    private int a = 0;
    private int b = 0;
    private Object sync_a = new Object();
    private Object sync_b = new Object();

    public void setA(int a) {
        synchronized(sync_a) {
            this.a = a;
        }
    }

    public synchronized void setB(int b) {
        synchronized(sync_b) {
            this.b = b;
        }
    }
}

通常,多线程之间需要协调工作。例如,浏览器的一个显示图片的线程displayThread想要执行显示图片的任务,必须等待下载线程 downloadThread将该图片下载完毕。如果图片还没有下载完,displayThread可以暂停,当downloadThread完成了任务 后,再通知displayThread“图片准备完毕,可以显示了”,这时,displayThread继续执行。

以上逻辑简单的说就是:如果条件不满足,则等待。当条件满足时,等待该条件的线程将被唤醒。在Java中,这个机制的实现依赖于wait/notify。等待机制与锁机制是密切关联的。例如:

synchronized(obj) {
    while(!condition) {
        obj.wait();
    }
    obj.doSomething();
}

当线程A获得了obj锁后,发现条件condition不满足,无法继续下一处理,于是线程A就wait()。

在另一线程B中,如果B更改了某些条件,使得线程A的condition条件满足了,就可以唤醒线程A:

synchronized(obj) {
    condition = true;
    obj.notify();
}

需要注意的概念是:

# 调用obj的wait(), notify()方法前,必须获得obj锁,也就是必须写在synchronized(obj) {...} 代码段内。

# 调用obj.wait()后,线程A就释放了obj的锁,否则线程B无法获得obj锁,也就无法在synchronized(obj) {...} 代码段内唤醒A。

# 当obj.wait()方法返回后,线程A需要再次获得obj锁,才能继续执行。

# 如果A1,A2,A3都在obj.wait(),则B调用obj.notify()只能唤醒A1,A2,A3中的一个(具体哪一个由JVM决定)。

# obj.notifyAll()则能全部唤醒A1,A2,A3,但是要继续执行obj.wait()的下一条语句,必须获得obj锁,因此,A1,A2,A3只有一个有机会获得锁继续执行,例如A1,其余的需要等待A1释放obj锁之后才能继续执行。

# 当B调用obj.notify/notifyAll的时候,B正持有obj锁,因此,A1,A2,A3虽被唤醒,但是仍无法获得obj锁。直到B退出synchronized块,释放obj锁后,A1,A2,A3中的一个才有机会获得锁继续执行。


前面讲了wait/notify机制,Thread还有一个sleep()静态方法,它也能使线程暂停一段时间。sleep与wait的不同点是: sleep并不释放锁,并且sleep的暂停和wait暂停是不一样的。obj.wait会使线程进入obj对象的等待集合中并等待唤醒。

但是wait()和sleep()都可以通过interrupt()方法打断线程的暂停状态,从而使线程立刻抛出InterruptedException。

如果线程A希望立即结束线程B,则可以对线程B对应的Thread实例调用interrupt方法。如果此刻线程B正在 wait/sleep/join,则线程B会立刻抛出InterruptedException,在catch() {} 中直接return即可安全地结束线程。

需要注意的是,InterruptedException是线程自己从内部抛出的,并不是interrupt()方法抛出的。对某一线程调用 interrupt()时,如果该线程正在执行普通的代码,那么该线程根本就不会抛出InterruptedException。但是,一旦该线程进入到 wait()/sleep()/join()后,就会立刻抛出InterruptedException。


GuardedSuspention模式主要思想是:

当条件不满足时,线程等待,直到条件满足时,等待该条件的线程被唤醒。

我们设计一个客户端线程和一个服务器线程,客户端线程不断发送请求给服务器线程,服务器线程不断处理请求。当请求队列为空时,服务器线程就必须等待,直到客户端发送了请求。

先定义一个请求队列:Queue

package com.crackj2ee.thread;

import java.util.*;

public class Queue {
    private List queue = new LinkedList();

    public synchronized Request getRequest() {
        while(queue.size()==0) {
            try {
                this.wait();
            }
            catch(InterruptedException ie) {
                return null;
            }
        }
        return (Request)queue.remove(0);
    }

    public synchronized void putRequest(Request request) {
        queue.add(request);
        this.notifyAll();
    }

}

蓝色部分就是服务器线程的等待条件,而客户端线程在放入了一个request后,就使服务器线程等待条件满足,于是唤醒服务器线程。

客户端线程:ClientThread

package com.crackj2ee.thread;

public class ClientThread extends Thread {
    private Queue queue;
    private String clientName;

    public ClientThread(Queue queue, String clientName) {
        this.queue = queue;
        this.clientName = clientName;
    }

    public String toString() {
        return "[ClientThread-" + clientName + "]";
    }

    public void run() {
        for(int i=0; i<100; i++) {
            Request request = new Request("" + (long)(Math.random()*10000));
            System.out.println(this + " send request: " + request);
            queue.putRequest(request);
            try {
                Thread.sleep((long)(Math.random() * 10000 + 1000));
            }
            catch(InterruptedException ie) {
            }
        }
        System.out.println(this + " shutdown.");
    }
}

服务器线程:ServerThread

package com.crackj2ee.thread;
public class ServerThread extends Thread {
    private boolean stop = false;
    private Queue queue;

    public ServerThread(Queue queue) {
        this.queue = queue;
    }

    public void shutdown() {
        stop = true;
        this.interrupt();
        try {
            this.join();
        }
        catch(InterruptedException ie) {}
    }

    public void run() {
        while(!stop) {
            Request request = queue.getRequest();
            System.out.println("[ServerThread] handle request: " + request);
            try {
                Thread.sleep(2000);
            }
            catch(InterruptedException ie) {}
        }
        System.out.println("[ServerThread] shutdown.");
    }
}

服务器线程在红色部分可能会阻塞,也就是说,Queue.getRequest是一个阻塞方法。这和java标准库的许多IO方法类似。

最后,写一个Main来启动他们:

package com.crackj2ee.thread;

public class Main {

    public static void main(String[] args) {
        Queue queue = new Queue();
        ServerThread server = new ServerThread(queue);
        server.start();
        ClientThread[] clients = new ClientThread[5];
        for(int i=0; i<clients.length; i++) {
            clients[i] = new ClientThread(queue, ""+i);
            clients[i].start();
        }
        try {
            Thread.sleep(100000);
        }
        catch(InterruptedException ie) {}
        server.shutdown();
    }
}

我们启动了5个客户端线程和一个服务器线程,运行结果如下:

[ClientThread-0] send request: Request-4984
[ServerThread] handle request: Request-4984
[ClientThread-1] send request: Request-2020
[ClientThread-2] send request: Request-8980
[ClientThread-3] send request: Request-5044
[ClientThread-4] send request: Request-548
[ClientThread-4] send request: Request-6832
[ServerThread] handle request: Request-2020
[ServerThread] handle request: Request-8980
[ServerThread] handle request: Request-5044
[ServerThread] handle request: Request-548
[ClientThread-4] send request: Request-1681
[ClientThread-0] send request: Request-7859
[ClientThread-3] send request: Request-3926
[ServerThread] handle request: Request-6832
[ClientThread-2] send request: Request-9906
......

可以观察到ServerThread处理来自不同客户端的请求。

思考

Q: 服务器线程的wait条件while(queue.size()==0)能否换成if(queue.size()==0)?

A: 在这个例子中可以,因为服务器线程只有一个。但是,如果服务器线程有多个(例如Web应用程序有多个线程处理并发请求,这非常普遍),就会造成严重问题。

Q: 能否用sleep(1000)代替wait()?

A: 绝对不可以。sleep()不会释放锁,因此sleep期间别的线程根本没有办法调用getRequest()和putRequest(),导致所有相关线程都被阻塞。

Q: (Request)queue.remove(0)可以放到synchronized() {}块外面吗?

A: 不可以。因为while()是测试queue,remove()是使用queue,两者是一个原子操作,不能放在synchronized外面。

总结

多线程设计看似简单,实际上必须非常仔细地考虑各种锁定/同步的条件,稍不小心,就可能出错。并且,当线程较少时,很可能发现不了问题,一旦问题出现又难以调试。

所幸的是,已有一些被验证过的模式可以供我们使用,我们会继续介绍一些常用的多线程设计模式。

前面谈了多线程应用程序能极大地改善用户相应。例如对于一个Web应用程序,每当一个用户请求服务器连接时,服务器就可以启动一个新线程为用户服务。

然而,创建和销毁线程本身就有一定的开销,如果频繁创建和销毁线程,CPU和内存开销就不可忽略,垃圾收集器还必须负担更多的工作。因此,线程池就是为了避免频繁创建和销毁线程。

每当服务器接受了一个新的请求后,服务器就从线程池中挑选一个等待的线程并执行请求处理。处理完毕后,线程并不结束,而是转为阻塞状态再次被放入线程池中。这样就避免了频繁创建和销毁线程。

Worker Pattern实现了类似线程池的功能。首先定义Task接口:

package com.crackj2ee.thread;
public interface Task {
    void execute();
}

线程将负责执行execute()方法。注意到任务是由子类通过实现execute()方法实现的,线程本身并不知道自己执行的任务。它只负责运行一个耗时的execute()方法。

具体任务由子类实现,我们定义了一个CalculateTask和一个TimerTask:

// CalculateTask.java
package com.crackj2ee.thread;
public class CalculateTask implements Task {
    private static int count = 0;
    private int num = count;
    public CalculateTask() {
        count++;
    }
    public void execute() {
        System.out.println("[CalculateTask " + num + "] start...");
        try {
            Thread.sleep(3000);
        }
        catch(InterruptedException ie) {}
        System.out.println("[CalculateTask " + num + "] done.");
    }
}

// TimerTask.java
package com.crackj2ee.thread;
public class TimerTask implements Task {
    private static int count = 0;
    private int num = count;
    public TimerTask() {
        count++;
    }
    public void execute() {
        System.out.println("[TimerTask " + num + "] start...");
        try {
            Thread.sleep(2000);
        }
        catch(InterruptedException ie) {}
        System.out.println("[TimerTask " + num + "] done.");
    }
}

以上任务均简单的sleep若干秒。

TaskQueue实现了一个队列,客户端可以将请求放入队列,服务器线程可以从队列中取出任务:

package com.crackj2ee.thread;
import java.util.*;
public class TaskQueue {
    private List queue = new LinkedList();
    public synchronized Task getTask() {
        while(queue.size()==0) {
            try {
                this.wait();
            }
            catch(InterruptedException ie) {
                return null;
            }
        }
        return (Task)queue.remove(0);
    }
    public synchronized void putTask(Task task) {
        queue.add(task);
        this.notifyAll();
    }
}

终于到了真正的WorkerThread,这是真正执行任务的服务器线程:

package com.crackj2ee.thread;
public class WorkerThread extends Thread {
    private static int count = 0;
    private boolean busy = false;
    private boolean stop = false;
    private TaskQueue queue;
    public WorkerThread(ThreadGroup group, TaskQueue queue) {
        super(group, "worker-" + count);
        count++;
        this.queue = queue;
    }
    public void shutdown() {
        stop = true;
        this.interrupt();
        try {
            this.join();
        }
        catch(InterruptedException ie) {}
    }
    public boolean isIdle() {
        return !busy;
    }
    public void run() {
        System.out.println(getName() + " start.");       
        while(!stop) {
            Task task = queue.getTask();
            if(task!=null) {
                busy = true;
                task.execute();
                busy = false;
            }
        }
        System.out.println(getName() + " end.");
    }
}

前面已经讲过,queue.getTask()是一个阻塞方法,服务器线程可能在此wait()一段时间。此外,WorkerThread还有一个shutdown方法,用于安全结束线程。

最后是ThreadPool,负责管理所有的服务器线程,还可以动态增加和减少线程数:

package com.crackj2ee.thread;
import java.util.*;
public class ThreadPool extends ThreadGroup {
    private List threads = new LinkedList();
    private TaskQueue queue;
    public ThreadPool(TaskQueue queue) {
        super("Thread-Pool");
        this.queue = queue;
    }
    public synchronized void addWorkerThread() {
        Thread t = new WorkerThread(this, queue);
        threads.add(t);
        t.start();
    }
    public synchronized void removeWorkerThread() {
        if(threads.size()>0) {
            WorkerThread t = (WorkerThread)threads.remove(0);
            t.shutdown();
        }
    }
    public synchronized void currentStatus() {
        System.out.println("-----------------------------------------------");
        System.out.println("Thread count = " + threads.size());
        Iterator it = threads.iterator();
        while(it.hasNext()) {
            WorkerThread t = (WorkerThread)it.next();
            System.out.println(t.getName() + ": " + (t.isIdle() ? "idle" : "busy"));
        }
        System.out.println("-----------------------------------------------");
    }
}

currentStatus()方法是为了方便调试,打印出所有线程的当前状态。

最后,Main负责完成main()方法:

package com.crackj2ee.thread;
public class Main {
    public static void main(String[] args) {
        TaskQueue queue = new TaskQueue();
        ThreadPool pool = new ThreadPool(queue);
        for(int i=0; i<10; i++) {
            queue.putTask(new CalculateTask());
            queue.putTask(new TimerTask());
        }
        pool.addWorkerThread();
        pool.addWorkerThread();
        doSleep(8000);
        pool.currentStatus();
        pool.addWorkerThread();
        pool.addWorkerThread();
        pool.addWorkerThread();
        pool.addWorkerThread();
        pool.addWorkerThread();
        doSleep(5000);
        pool.currentStatus();
    }
    private static void doSleep(long ms) {
        try {
            Thread.sleep(ms);
        }
        catch(InterruptedException ie) {}
    }
}

main()一开始放入了20个Task,然后动态添加了一些服务线程,并定期打印线程状态,运行结果如下:

worker-0 start.
[CalculateTask 0] start...
worker-1 start.
[TimerTask 0] start...
[TimerTask 0] done.
[CalculateTask 1] start...
[CalculateTask 0] done.
[TimerTask 1] start...
[CalculateTask 1] done.
[CalculateTask 2] start...
[TimerTask 1] done.
[TimerTask 2] start...
[TimerTask 2] done.
[CalculateTask 3] start...
-----------------------------------------------
Thread count = 2
worker-0: busy
worker-1: busy
-----------------------------------------------
[CalculateTask 2] done.
[TimerTask 3] start...
worker-2 start.
[CalculateTask 4] start...
worker-3 start.
[TimerTask 4] start...
worker-4 start.
[CalculateTask 5] start...
worker-5 start.
[TimerTask 5] start...
worker-6 start.
[CalculateTask 6] start...
[CalculateTask 3] done.
[TimerTask 6] start...
[TimerTask 3] done.
[CalculateTask 7] start...
[TimerTask 4] done.
[TimerTask 7] start...
[TimerTask 5] done.
[CalculateTask 8] start...
[CalculateTask 4] done.
[TimerTask 8] start...
[CalculateTask 5] done.
[CalculateTask 9] start...
[CalculateTask 6] done.
[TimerTask 9] start...
[TimerTask 6] done.
[TimerTask 7] done.
-----------------------------------------------
Thread count = 7
worker-0: idle
worker-1: busy
worker-2: busy
worker-3: idle
worker-4: busy
worker-5: busy
worker-6: busy
-----------------------------------------------
[CalculateTask 7] done.
[CalculateTask 8] done.
[TimerTask 8] done.
[TimerTask 9] done.
[CalculateTask 9] done.

仔细观察:一开始只有两个服务器线程,因此线程状态都是忙,后来线程数增多,6个线程中的两个状态变成idle,说明处于wait()状态。

思考:本例的线程调度算法其实根本没有,因为这个应用是围绕TaskQueue设计的,不是以Thread Pool为中心设计的。因此,Task调度取决于TaskQueue的getTask()方法,你可以改进这个方法,例如使用优先队列,使优先级高的任务先被执行。

如果所有的服务器线程都处于busy状态,则说明任务繁忙,TaskQueue的队列越来越长,最终会导致服务器内存耗尽。因此,可以限制 TaskQueue的等待任务数,超过最大长度就拒绝处理。许多Web服务器在用户请求繁忙时就会拒绝用户:HTTP 503 SERVICE UNAVAILABLE

多线程读写同一个对象的数据是很普遍的,通常,要避免读写冲突,必须保证任何时候仅有一个线程在写入,有线程正在读取的时候,写入操作就必须等待。简单说,就是要避免“写-写”冲突和“读-写”冲突。但是同时读是允许的,因为“读-读”不冲突,而且很安全。

要实现以上的ReadWriteLock,简单的使用synchronized就不行,我们必须自己设计一个ReadWriteLock类,在读之前,必须先获得“读锁”,写之前,必须先获得“写锁”。举例说明:

DataHandler对象保存了一个可读写的char[]数组:

package com.crackj2ee.thread;

public class DataHandler {
    // store data:
    private char[] buffer = "AAAAAAAAAA".toCharArray();

    private char[] doRead() {
        char[] ret = new char[buffer.length];
        for(int i=0; i<buffer.length; i++) {
            ret[i] = buffer[i];
            sleep(3);
        }
        return ret;
    }

    private void doWrite(char[] data) {
        if(data!=null) {
            buffer = new char[data.length];
            for(int i=0; i<buffer.length; i++) {
                buffer[i] = data[i];
                sleep(10);
            }
        }
    }

    private void sleep(int ms) {
        try {
            Thread.sleep(ms);
        }
        catch(InterruptedException ie) {}
    }
}

doRead()和doWrite()方法是非线程安全的读写方法。为了演示,加入了sleep(),并设置读的速度大约是写的3倍,这符合通常的情况。

为了让多线程能安全读写,我们设计了一个ReadWriteLock:

package com.crackj2ee.thread;
public class ReadWriteLock {
    private int readingThreads = 0;
    private int writingThreads = 0;
    private int waitingThreads = 0; // waiting for write
    private boolean preferWrite = true;

    public synchronized void readLock() throws InterruptedException {
        while(writingThreads>0 || (preferWrite && waitingThreads>0))
            this.wait();
        readingThreads++;
    }

    public synchronized void readUnlock() {
        readingThreads--;
        preferWrite = true;
        notifyAll();
    }

    public synchronized void writeLock() throws InterruptedException {
        waitingThreads++;
        try {
            while(readingThreads>0 || writingThreads>0)
                this.wait();
        }
        finally {
            waitingThreads--;
        }
        writingThreads++;
    }

    public synchronized void writeUnlock() {
        writingThreads--;
        preferWrite = false;
        notifyAll();
    }
}

readLock()用于获得读锁,readUnlock()释放读锁,writeLock()和writeUnlock()一样。由于锁用完必须释放,因此,必须保证lock和unlock匹配。我们修改DataHandler,加入ReadWriteLock:

package com.crackj2ee.thread;
public class DataHandler {
    // store data:
    private char[] buffer = "AAAAAAAAAA".toCharArray();
    // lock:
    private ReadWriteLock lock = new ReadWriteLock();

    public char[] read(String name) throws InterruptedException {
        System.out.println(name + " waiting for read...");
        lock.readLock();
        try {
            char[] data = doRead();
            System.out.println(name + " reads data: " + new String(data));
            return data;
        }
        finally {
            lock.readUnlock();
        }
    }

    public void write(String name, char[] data) throws InterruptedException {
        System.out.println(name + " waiting for write...");
        lock.writeLock();
        try {
            System.out.println(name + " wrote data: " + new String(data));
            doWrite(data);
        }
        finally {
            lock.writeUnlock();
        }
    }

    private char[] doRead() {
        char[] ret = new char[buffer.length];
        for(int i=0; i<buffer.length; i++) {
            ret[i] = buffer[i];
            sleep(3);
        }
        return ret;
    }
    private void doWrite(char[] data) {
        if(data!=null) {
            buffer = new char[data.length];
            for(int i=0; i<buffer.length; i++) {
                buffer[i] = data[i];
                sleep(10);
            }
        }
    }
    private void sleep(int ms) {
        try {
            Thread.sleep(ms);
        }
        catch(InterruptedException ie) {}
    }
}

public方法read()和write()完全封装了底层的ReadWriteLock,因此,多线程可以安全地调用这两个方法:

// ReadingThread不断读取数据:
package com.crackj2ee.thread;
public class ReadingThread extends Thread {
    private DataHandler handler;
    public ReadingThread(DataHandler handler) {
        this.handler = handler;
    }
    public void run() {
        for(;;) {
            try {
                char[] data = handler.read(getName());
                Thread.sleep((long)(Math.random()*1000+100));
            }
            catch(InterruptedException ie) {
                break;
            }
        }
    }
}

// WritingThread不断写入数据,每次写入的都是10个相同的字符:
package com.crackj2ee.thread;
public class WritingThread extends Thread {
    private DataHandler handler;
    public WritingThread(DataHandler handler) {
        this.handler = handler;
    }
    public void run() {
        char[] data = new char[10];
        for(;;) {
            try {
                fill(data);
                handler.write(getName(), data);
                Thread.sleep((long)(Math.random()*1000+100));
            }
            catch(InterruptedException ie) {
                break;
            }
        }
    }
    // 产生一个A-Z随机字符,填入char[10]:
    private void fill(char[] data) {
        char c = (char)(Math.random()*26+'A');
        for(int i=0; i<data.length; i++)
            data[i] = c;
    }
}

最后Main负责启动这些线程:

package com.crackj2ee.thread;
public class Main {
    public static void main(String[] args) {
        DataHandler handler = new DataHandler();
        Thread[] ts = new Thread[] {
                new ReadingThread(handler),
                new ReadingThread(handler),
                new ReadingThread(handler),
                new ReadingThread(handler),
                new ReadingThread(handler),
                new WritingThread(handler),
                new WritingThread(handler)
        };
        for(int i=0; i<ts.length; i++) {
            ts[i].start();
        }
    }
}

我们启动了5个读线程和2个写线程,运行结果如下:

Thread-0 waiting for read...
Thread-1 waiting for read...
Thread-2 waiting for read...
Thread-3 waiting for read...
Thread-4 waiting for read...
Thread-5 waiting for write...
Thread-6 waiting for write...
Thread-4 reads data: AAAAAAAAAA
Thread-3 reads data: AAAAAAAAAA
Thread-2 reads data: AAAAAAAAAA
Thread-1 reads data: AAAAAAAAAA
Thread-0 reads data: AAAAAAAAAA
Thread-5 wrote data: EEEEEEEEEE
Thread-6 wrote data: MMMMMMMMMM
Thread-1 waiting for read...
Thread-4 waiting for read...
Thread-1 reads data: MMMMMMMMMM
Thread-4 reads data: MMMMMMMMMM
Thread-2 waiting for read...
Thread-2 reads data: MMMMMMMMMM
Thread-0 waiting for read...
Thread-0 reads data: MMMMMMMMMM
Thread-4 waiting for read...
Thread-4 reads data: MMMMMMMMMM
Thread-2 waiting for read...
Thread-5 waiting for write...
Thread-2 reads data: MMMMMMMMMM
Thread-5 wrote data: GGGGGGGGGG
Thread-6 waiting for write...
Thread-6 wrote data: AAAAAAAAAA
Thread-3 waiting for read...
Thread-3 reads data: AAAAAAAAAA
......

可以看到,每次读/写都是完整的原子操作,因为我们每次写入的都是10个相同字符。并且,每次读出的都是最近一次写入的内容。

如果去掉ReadWriteLock:

package com.crackj2ee.thread;
public class DataHandler {

    // store data:
    private char[] buffer = "AAAAAAAAAA".toCharArray();

    public char[] read(String name) throws InterruptedException {
        char[] data = doRead();
        System.out.println(name + " reads data: " + new String(data));
        return data;
    }
    public void write(String name, char[] data) throws InterruptedException {
        System.out.println(name + " wrote data: " + new String(data));
        doWrite(data);
    }

    private char[] doRead() {
        char[] ret = new char[10];
        for(int i=0; i<10; i++) {
            ret[i] = buffer[i];
            sleep(3);
        }
        return ret;
    }
    private void doWrite(char[] data) {
        for(int i=0; i<10; i++) {
            buffer[i] = data[i];
            sleep(10);
        }
    }
    private void sleep(int ms) {
        try {
            Thread.sleep(ms);
        }
        catch(InterruptedException ie) {}
    }
}

运行结果如下:

Thread-5 wrote data: AAAAAAAAAA
Thread-6 wrote data: MMMMMMMMMM
Thread-0 reads data: AAAAAAAAAA
Thread-1 reads data: AAAAAAAAAA
Thread-2 reads data: AAAAAAAAAA
Thread-3 reads data: AAAAAAAAAA
Thread-4 reads data: AAAAAAAAAA
Thread-2 reads data: MAAAAAAAAA
Thread-3 reads data: MAAAAAAAAA
Thread-5 wrote data: CCCCCCCCCC
Thread-1 reads data: MAAAAAAAAA
Thread-0 reads data: MAAAAAAAAA
Thread-4 reads data: MAAAAAAAAA
Thread-6 wrote data: EEEEEEEEEE
Thread-3 reads data: EEEEECCCCC
Thread-4 reads data: EEEEEEEEEC
Thread-1 reads data: EEEEEEEEEE

可以看到在Thread-6写入EEEEEEEEEE的过程中,3个线程读取的内容是不同的。

思考

java的synchronized提供了最底层的物理锁,要在synchronized的基础上,实现自己的逻辑锁,就必须仔细设计ReadWriteLock。

Q: lock.readLock()为什么不放入try{ } 内?
A: 因为readLock()会抛出InterruptedException,导致readingThreads++不执行,而readUnlock()在 finally{ } 中,导致readingThreads--执行,从而使readingThread状态出错。writeLock()也是类似的。

Q: preferWrite有用吗?
A: 如果去掉preferWrite,线程安全不受影响。但是,如果读取线程很多,上一个线程还没有读取完,下一个线程又开始读了,就导致写入线程长时间无法 获得writeLock;如果写入线程等待的很多,一个接一个写,也会导致读取线程长时间无法获得readLock。preferWrite的作用是让读 /写交替执行,避免由于读线程繁忙导致写无法进行和由于写线程繁忙导致读无法进行。

Q: notifyAll()换成notify()行不行?
A: 不可以。由于preferWrite的存在,如果一个线程刚读取完毕,此时preferWrite=true,再notify(),若恰好唤醒的是一个读 线程,则while(writingThreads>0 || (preferWrite && waitingThreads>0))可能为true导致该读线程继续等待,而等待写入的线程也处于wait()中,结果所有线程都处于wait ()状态,谁也无法唤醒谁。因此,notifyAll()比notify()要来得安全。程序验证notify()带来的死锁:

Thread-0 waiting for read...
Thread-1 waiting for read...
Thread-2 waiting for read...
Thread-3 waiting for read...
Thread-4 waiting for read...
Thread-5 waiting for write...
Thread-6 waiting for write...
Thread-0 reads data: AAAAAAAAAA
Thread-4 reads data: AAAAAAAAAA
Thread-3 reads data: AAAAAAAAAA
Thread-2 reads data: AAAAAAAAAA
Thread-1 reads data: AAAAAAAAAA
Thread-5 wrote data: CCCCCCCCCC
Thread-2 waiting for read...
Thread-1 waiting for read...
Thread-3 waiting for read...
Thread-0 waiting for read...
Thread-4 waiting for read...
Thread-6 wrote data: LLLLLLLLLL
Thread-5 waiting for write...
Thread-6 waiting for write...
Thread-2 reads data: LLLLLLLLLL
Thread-2 waiting for read...
(运行到此不动了)

注意到这种死锁是由于所有线程都在等待别的线程唤醒自己,结果都无法醒过来。这和两个线程希望获得对方已有的锁造成死锁不同。因此多线程设计的难度远远高于单线程应用。

 
 
分享到:
评论

相关推荐

    Java多线程设计模式上传文件

    Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式...

    Java多线程设计模式_清晰完整PDF版 Java多线程设计模式源代码

    Java多线程设计模式是Java开发中的重要领域,它涉及到如何在并发环境下高效、安全地管理资源和控制程序执行流程。本资料集包含了清晰完整的PDF版书籍和源代码,为学习和理解Java多线程设计模式提供了丰富的素材。 ...

    JAVA多线程设计模式.pdf 下载

    标题和描述均指向了一个关于Java多线程设计模式的PDF文档的下载链接,这暗示了文档的主要内容将围绕Java中的多线程编程及其设计模式展开。在Java领域,多线程是一个核心概念,它允许程序执行多个任务同时进行,极大...

    java多线程设计模式详解(PDF及源码)

    书中包含Java线程的介绍导读、12个重要的线程设计模式和全书总结以及丰富的附录内容。第一章相关线程设计模式的介绍,都举一反三使读者学习更有效。最后附上练习问题,让读者可以温故而知新,能快速地吸收书中的...

    java多线程设计模式详解.pdf

    标题“java多线程设计模式详解.pdf”中提到的知识点是关于Java多线程编程中设计模式的应用。Java多线程是并发编程的重要组成部分,设计模式则是软件工程中用于解决特定问题的最佳实践。将两者结合起来,意味着此文件...

    java多线程设计模式详解(PDF及源码).zip

    Java多线程设计模式是Java开发中的重要领域,它涉及到如何高效、安全地利用系统资源进行并发处理。在这个主题中,我们将深入探讨单线程、生产者与消费者模型以及Java中实现多线程的各种方法。 首先,单线程是程序...

    java多线程设计模式

    ### Java多线程设计模式详解 #### 一、Java多线程基础 Java语言自诞生以来,便内置了对多线程的支持。多线程可以让应用程序在同一时间处理多个任务,提高程序的执行效率和响应速度。Java中创建线程有两种基本方式...

    JAVA多线程设计模式

    本书《JAVA多线程设计模式》针对Java语言的多线程编程进行深入讲解,采用易于理解的方式介绍了与Java线程相关的多个设计模式,并通过实例程序与UML图示辅助阐述。书中的关键代码片段都有标注,易于读者理解与学习,...

    java多线程设计模式详解(pdf版)

    java多线程设计模式,作者是:结城 浩,由 博硕文化 译。2005年4月,由中国铁道出版社出版。内附带部分源代码。

    JAVA多线程设计模式_中国铁道出版社_源码

    JAVA多线程设计模式_中国铁道出版社 本书浅显易懂的介绍了JAVA线程相关的设计模式,通过程序范例和UML图示来一一解说,书中代码的重要部分加了标注以使读者更加容易理解,再加上图文并茂,对于初学者还是程序设计...

    Java 多线程设计模式

    Java 多线程 设计模式

    java 多线程设计模式

    Java多线程设计模式是Java并发编程中的一种高级技巧,它可以帮助开发者在处理并发问题时,提高代码的可读性、可维护性和性能。多线程设计模式是基于Java的并发API,如Thread、Runnable、ExecutorService等,通过特定...

Global site tag (gtag.js) - Google Analytics