`
CherryRemind
  • 浏览: 54683 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论
阅读更多

java语言已经内置了多线程支持,所有实现Runnable接口的类都可被启动一个新线程,新线程会执行该实例的run()方法,当run()方法执行完毕后,线程就结束了。一旦一个线程执行完毕,这个实例就不能再重新启动,只能重新生成一个新实例,再启动一个新线程。

Thread类是实现了Runnable接口的一个实例,它代表一个线程的实例,并且,启动线程的唯一方法就是通过Thread类的start()实例方法:

Thread t = new Thread();
t.start();

start()方法是一个native方法,它将启动一个新线程,并执行run()方法。Thread类默认的run()方法什么也不做就退出了。注意:直接调用run()方法并不会启动一个新线程,它和调用一个普通的java方法没有什么区别。

因此,有两个方法可以实现自己的线程:

方法1:自己的类extend Thread,并复写run()方法,就可以启动新线程并执行自己定义的run()方法。例如:

public class MyThread extends Thread {
public run() {
System.out.println("MyThread.run()");
}
}

在合适的地方启动线程:new MyThread().start();

方法2:如果自己的类已经extends另一个类,就无法直接extends Thread,此时,必须实现一个Runnable接口:

public class MyThread extends OtherClass implements Runnable {
public run() {
System.out.println("MyThread.run()");
}
}

为了启动MyThread,需要首先实例化一个Thread,并传入自己的MyThread实例:

MyThread myt = new MyThread();
Thread t = new Thread(myt);
t.start();

事实上,当传入一个Runnable target参数给Thread后,Thread的run()方法就会调用target.run(),参考JDK源代码:

public void run() {
if (target != null) {
target.run();
}
}

线程还有一些Name, ThreadGroup, isDaemon等设置,由于和线程设计模式关联很少,这里就不多说了。

由于同一进程内的多个线程共享内存空间,在Java中,就是共享实例,当多个线程试图同时修改某个实例的内容时,就会造成冲突,因此,线程必须实现共享互斥,使多线程同步。

最简单的同步是将一个方法标记为synchronized,对同一个实例来说,任一时刻只能有一个synchronized方法在执行。当一个方法正在执行某个synchronized方法时,其他线程如果想要执行这个实例的任意一个synchronized方法,都必须等待当前执行 synchronized方法的线程退出此方法后,才能依次执行。

但是,非synchronized方法不受影响,不管当前有没有执行synchronized方法,非synchronized方法都可以被多个线程同时执行。

此外,必须注意,只有同一实例的synchronized方法同一时间只能被一个线程执行,不同实例的synchronized方法是可以并发的。例如,class A定义了synchronized方法sync(),则不同实例a1.sync()和a2.sync()可以同时由两个线程来执行。

多线程同步的实现最终依赖锁机制。我们可以想象某一共享资源是一间屋子,每个人都是一个线程。当A希望进入房间时,他必须获得门锁,一旦A获得门锁,他进去后就立刻将门锁上,于是B,C,D...就不得不在门外等待,直到A释放锁出来后,B,C,D...中的某一人抢到了该锁(具体抢法依赖于 JVM的实现,可以先到先得,也可以随机挑选),然后进屋又将门锁上。这样,任一时刻最多有一人在屋内(使用共享资源)。

Java语言规范内置了对多线程的支持。对于Java程序来说,每一个对象实例都有一把“锁”,一旦某个线程获得了该锁,别的线程如果希望获得该锁,只能等待这个线程释放锁之后。获得锁的方法只有一个,就是synchronized关键字。例如:

public class SharedResource {
private int count = 0;

public int getCount() { return count; }

public synchronized void setCount(int count) { this.count = count; }

}

同步方法public synchronized void setCount(int count) { this.count = count; } 事实上相当于:

public void setCount(int count) {
synchronized(this) { // 在此获得this锁
this.count = count;
} // 在此释放this锁
}

红色部分表示需要同步的代码段,该区域为“危险区域”,如果两个以上的线程同时执行,会引发冲突,因此,要更改SharedResource的内部状态,必须先获得SharedResource实例的锁。

退出synchronized块时,线程拥有的锁自动释放,于是,别的线程又可以获取该锁了。

为了提高性能,不一定要锁定this,例如,SharedResource有两个独立变化的变量:

public class SharedResouce {
private int a = 0;
private int b = 0;

public synchronized void setA(int a) { this.a = a; }

public synchronized void setB(int b) { this.b = b; }
}

若同步整个方法,则setA()的时候无法setB(),setB()时无法setA()。为了提高性能,可以使用不同对象的锁:

public class SharedResouce {
private int a = 0;
private int b = 0;
private Object sync_a = new Object();
private Object sync_b = new Object();

public void setA(int a) {
synchronized(sync_a) {
this.a = a;
}
}

public synchronized void setB(int b) {
synchronized(sync_b) {
this.b = b;
}
}
}

通常,多线程之间需要协调工作。例如,浏览器的一个显示图片的线程displayThread想要执行显示图片的任务,必须等待下载线程 downloadThread将该图片下载完毕。如果图片还没有下载完,displayThread可以暂停,当downloadThread完成了任务后,再通知displayThread“图片准备完毕,可以显示了”,这时,displayThread继续执行。

以上逻辑简单的说就是:如果条件不满足,则等待。当条件满足时,等待该条件的线程将被唤醒。在Java中,这个机制的实现依赖于wait/notify。等待机制与锁机制是密切关联的。例如:

synchronized(obj) {
while(!condition) {
obj.wait();
}
obj.doSomething();
}

当线程A获得了obj锁后,发现条件condition不满足,无法继续下一处理,于是线程A就wait()。

在另一线程B中,如果B更改了某些条件,使得线程A的condition条件满足了,就可以唤醒线程A:

synchronized(obj) {
condition = true;
obj.notify();
}

需要注意的概念是:

# 调用obj的wait(), notify()方法前,必须获得obj锁,也就是必须写在synchronized(obj) {...} 代码段内。

# 调用obj.wait()后,线程A就释放了obj的锁,否则线程B无法获得obj锁,也就无法在synchronized(obj) {...} 代码段内唤醒A。

# 当obj.wait()方法返回后,线程A需要再次获得obj锁,才能继续执行。

# 如果A1,A2,A3都在obj.wait(),则B调用obj.notify()只能唤醒A1,A2,A3中的一个(具体哪一个由JVM决定)。

# obj.notifyAll()则能全部唤醒A1,A2,A3,但是要继续执行obj.wait()的下一条语句,必须获得obj锁,因此,A1,A2,A3只有一个有机会获得锁继续执行,例如A1,其余的需要等待A1释放obj锁之后才能继续执行。

# 当B调用obj.notify/notifyAll的时候,B正持有obj锁,因此,A1,A2,A3虽被唤醒,但是仍无法获得obj锁。直到B退出synchronized块,释放obj锁后,A1,A2,A3中的一个才有机会获得锁继续执行。

前面讲了wait/notify机制,Thread还有一个sleep()静态方法,它也能使线程暂停一段时间。sleep与wait的不同点是: sleep并不释放锁,并且sleep的暂停和wait暂停是不一样的。obj.wait会使线程进入obj对象的等待集合中并等待唤醒。

但是wait()和sleep()都可以通过interrupt()方法打断线程的暂停状态,从而使线程立刻抛出InterruptedException。

如果线程A希望立即结束线程B,则可以对线程B对应的Thread实例调用interrupt方法。如果此刻线程B正在 wait/sleep/join,则线程B会立刻抛出InterruptedException,在catch() {} 中直接return即可安全地结束线程。

需要注意的是,InterruptedException是线程自己从内部抛出的,并不是interrupt()方法抛出的。对某一线程调用 interrupt()时,如果该线程正在执行普通的代码,那么该线程根本就不会抛出InterruptedException。但是,一旦该线程进入到 wait()/sleep()/join()后,就会立刻抛出InterruptedException。

GuardedSuspention模式主要思想是:

当条件不满足时,线程等待,直到条件满足时,等待该条件的线程被唤醒。

我们设计一个客户端线程和一个服务器线程,客户端线程不断发送请求给服务器线程,服务器线程不断处理请求。当请求队列为空时,服务器线程就必须等待,直到客户端发送了请求。

先定义一个请求队列:Queue

package com.crackj2ee.thread;

import java.util.*;

public class Queue {
private List queue = new LinkedList();

public synchronized Request getRequest() {
while(queue.size()==0) {
try {
this.wait();
}
catch(InterruptedException ie) {
return null;
}
}
return (Request)queue.remove(0);
}

public synchronized void putRequest(Request request) {
queue.add(request);
this.notifyAll();
}

}

蓝色部分就是服务器线程的等待条件,而客户端线程在放入了一个request后,就使服务器线程等待条件满足,于是唤醒服务器线程。

客户端线程:ClientThread

package com.crackj2ee.thread;

public class ClientThread extends Thread {
private Queue queue;
private String clientName;

public ClientThread(Queue queue, String clientName) {
this.queue = queue;
this.clientName = clientName;
}

public String toString() {
return "[ClientThread-" + clientName + "]";
}

public void run() {
for(int i=0; i<100; i++) {
Request request = new Request("" + (long)(Math.random()*10000));
System.out.println(this + " send request: " + request);
queue.putRequest(request);
try {
Thread.sleep((long)(Math.random() * 10000 + 1000));
}
catch(InterruptedException ie) {
}
}
System.out.println(this + " shutdown.");
}
}

服务器线程:ServerThread

package com.crackj2ee.thread;
public class ServerThread extends Thread {
private boolean stop = false;
private Queue queue;

public ServerThread(Queue queue) {
this.queue = queue;
}

public void shutdown() {
stop = true;
this.interrupt();
try {
this.join();
}
catch(InterruptedException ie) {}
}

public void run() {
while(!stop) {
Request request = queue.getRequest();
System.out.println("[ServerThread] handle request: " + request);
try {
Thread.sleep(2000);
}
catch(InterruptedException ie) {}
}
System.out.println("[ServerThread] shutdown.");
}
}

服务器线程在红色部分可能会阻塞,也就是说,Queue.getRequest是一个阻塞方法。这和java标准库的许多IO方法类似。

最后,写一个Main来启动他们:

package com.crackj2ee.thread;

public class Main {

public static void main(String[] args) {
Queue queue = new Queue();
ServerThread server = new ServerThread(queue);
server.start();
ClientThread[] clients = new ClientThread[5];
for(int i=0; i<clients.length; i++) {
clients[i] = new ClientThread(queue, ""+i);
clients[i].start();
}
try {
Thread.sleep(100000);
}
catch(InterruptedException ie) {}
server.shutdown();
}
}

我们启动了5个客户端线程和一个服务器线程,运行结果如下:

[ClientThread-0] send request: Request-4984
[ServerThread] handle request: Request-4984
[ClientThread-1] send request: Request-2020
[ClientThread-2] send request: Request-8980
[ClientThread-3] send request: Request-5044
[ClientThread-4] send request: Request-548
[ClientThread-4] send request: Request-6832
[ServerThread] handle request: Request-2020
[ServerThread] handle request: Request-8980
[ServerThread] handle request: Request-5044
[ServerThread] handle request: Request-548
[ClientThread-4] send request: Request-1681
[ClientThread-0] send request: Request-7859
[ClientThread-3] send request: Request-3926
[ServerThread] handle request: Request-6832
[ClientThread-2] send request: Request-9906
......

可以观察到ServerThread处理来自不同客户端的请求。

思考

Q: 服务器线程的wait条件while(queue.size()==0)能否换成if(queue.size()==0)?

A: 在这个例子中可以,因为服务器线程只有一个。但是,如果服务器线程有多个(例如Web应用程序有多个线程处理并发请求,这非常普遍),就会造成严重问题。

Q: 能否用sleep(1000)代替wait()?

A: 绝对不可以。sleep()不会释放锁,因此sleep期间别的线程根本没有办法调用getRequest()和putRequest(),导致所有相关线程都被阻塞。

Q: (Request)queue.remove(0)可以放到synchronized() {}块外面吗?

A: 不可以。因为while()是测试queue,remove()是使用queue,两者是一个原子操作,不能放在synchronized外面。

总结

多线程设计看似简单,实际上必须非常仔细地考虑各种锁定/同步的条件,稍不小心,就可能出错。并且,当线程较少时,很可能发现不了问题,一旦问题出现又难以调试。

所幸的是,已有一些被验证过的模式可以供我们使用,我们会继续介绍一些常用的多线程设计模式。

前面谈了多线程应用程序能极大地改善用户相应。例如对于一个Web应用程序,每当一个用户请求服务器连接时,服务器就可以启动一个新线程为用户服务。

然而,创建和销毁线程本身就有一定的开销,如果频繁创建和销毁线程,CPU和内存开销就不可忽略,垃圾收集器还必须负担更多的工作。因此,线程池就是为了避免频繁创建和销毁线程。

每当服务器接受了一个新的请求后,服务器就从线程池中挑选一个等待的线程并执行请求处理。处理完毕后,线程并不结束,而是转为阻塞状态再次被放入线程池中。这样就避免了频繁创建和销毁线程。

Worker Pattern实现了类似线程池的功能。首先定义Task接口:

package com.crackj2ee.thread;
public interface Task {
void execute();
}

线程将负责执行execute()方法。注意到任务是由子类通过实现execute()方法实现的,线程本身并不知道自己执行的任务。它只负责运行一个耗时的execute()方法。

具体任务由子类实现,我们定义了一个CalculateTask和一个TimerTask:

// CalculateTask.java
package com.crackj2ee.thread;
public class CalculateTask implements Task {
private static int count = 0;
private int num = count;
public CalculateTask() {
count++;
}
public void execute() {
System.out.println("[CalculateTask " + num + "] start...");
try {
Thread.sleep(3000);
}
catch(InterruptedException ie) {}
System.out.println("[CalculateTask " + num + "] done.");
}
}

// TimerTask.java
package com.crackj2ee.thread;
public class TimerTask implements Task {
private static int count = 0;
private int num = count;
public TimerTask() {
count++;
}
public void execute() {
System.out.println("[TimerTask " + num + "] start...");
try {
Thread.sleep(2000);
}
catch(InterruptedException ie) {}
System.out.println("[TimerTask " + num + "] done.");
}
}

以上任务均简单的sleep若干秒。

TaskQueue实现了一个队列,客户端可以将请求放入队列,服务器线程可以从队列中取出任务:

package com.crackj2ee.thread;
import java.util.*;
public class TaskQueue {
private List queue = new LinkedList();
public synchronized Task getTask() {
while(queue.size()==0) {
try {
this.wait();
}
catch(InterruptedException ie) {
return null;
}
}
return (Task)queue.remove(0);
}
public synchronized void putTask(Task task) {
queue.add(task);
this.notifyAll();
}
}

终于到了真正的WorkerThread,这是真正执行任务的服务器线程:

package com.crackj2ee.thread;
public class WorkerThread extends Thread {
private static int count = 0;
private boolean busy = false;
private boolean stop = false;
private TaskQueue queue;
public WorkerThread(ThreadGroup group, TaskQueue queue) {
super(group, "worker-" + count);
count++;
this.queue = queue;
}
public void shutdown() {
stop = true;
this.interrupt();
try {
this.join();
}
catch(InterruptedException ie) {}
}
public boolean isIdle() {
return !busy;
}
public void run() {
System.out.println(getName() + " start.");
while(!stop) {
Task task = queue.getTask();
if(task!=null) {
busy = true;
task.execute();
busy = false;
}
}
System.out.println(getName() + " end.");
}
}

前面已经讲过,queue.getTask()是一个阻塞方法,服务器线程可能在此wait()一段时间。此外,WorkerThread还有一个shutdown方法,用于安全结束线程。

最后是ThreadPool,负责管理所有的服务器线程,还可以动态增加和减少线程数:

package com.crackj2ee.thread;
import java.util.*;
public class ThreadPool extends ThreadGroup {
private List threads = new LinkedList();
private TaskQueue queue;
public ThreadPool(TaskQueue queue) {
super("Thread-Pool");
this.queue = queue;
}
public synchronized void addWorkerThread() {
Thread t = new WorkerThread(this, queue);
threads.add(t);
t.start();
}
public synchronized void removeWorkerThread() {
if(threads.size()>0) {
WorkerThread t = (WorkerThread)threads.remove(0);
t.shutdown();
}
}
public synchronized void currentStatus() {
System.out.println("-----------------------------------------------");
System.out.println("Thread count = " + threads.size());
Iterator it = threads.iterator();
while(it.hasNext()) {
WorkerThread t = (WorkerThread)it.next();
System.out.println(t.getName() + ": " + (t.isIdle() ? "idle" : "busy"));
}
System.out.println("-----------------------------------------------");
}
}

currentStatus()方法是为了方便调试,打印出所有线程的当前状态。

最后,Main负责完成main()方法:

package com.crackj2ee.thread;
public class Main {
public static void main(String[] args) {
TaskQueue queue = new TaskQueue();
ThreadPool pool = new ThreadPool(queue);
for(int i=0; i<10; i++) {
queue.putTask(new CalculateTask());
queue.putTask(new TimerTask());
}
pool.addWorkerThread();
pool.addWorkerThread();
doSleep(8000);
pool.currentStatus();
pool.addWorkerThread();
pool.addWorkerThread();
pool.addWorkerThread();
pool.addWorkerThread();
pool.addWorkerThread();
doSleep(5000);
pool.currentStatus();
}
private static void doSleep(long ms) {
try {
Thread.sleep(ms);
}
catch(InterruptedException ie) {}
}
}

main()一开始放入了20个Task,然后动态添加了一些服务线程,并定期打印线程状态,运行结果如下:

worker-0 start.
[CalculateTask 0] start...
worker-1 start.
[TimerTask 0] start...
[TimerTask 0] done.
[CalculateTask 1] start...
[CalculateTask 0] done.
[TimerTask 1] start...
[CalculateTask 1] done.
[CalculateTask 2] start...
[TimerTask 1] done.
[TimerTask 2] start...
[TimerTask 2] done.
[CalculateTask 3] start...
-----------------------------------------------
Thread count = 2
worker-0: busy
worker-1: busy
-----------------------------------------------
[CalculateTask 2] done.
[TimerTask 3] start...
worker-2 start.
[CalculateTask 4] start...
worker-3 start.
[TimerTask 4] start...
worker-4 start.
[CalculateTask 5] start...
worker-5 start.
[TimerTask 5] start...
worker-6 start.
[CalculateTask 6] start...
[CalculateTask 3] done.
[TimerTask 6] start...
[TimerTask 3] done.
[CalculateTask 7] start...
[TimerTask 4] done.
[TimerTask 7] start...
[TimerTask 5] done.
[CalculateTask 8] start...
[CalculateTask 4] done.
[TimerTask 8] start...
[CalculateTask 5] done.
[CalculateTask 9] start...
[CalculateTask 6] done.
[TimerTask 9] start...
[TimerTask 6] done.
[TimerTask 7] done.
-----------------------------------------------
Thread count = 7
worker-0: idle
worker-1: busy
worker-2: busy
worker-3: idle
worker-4: busy
worker-5: busy
worker-6: busy
-----------------------------------------------
[CalculateTask 7] done.
[CalculateTask 8] done.
[TimerTask 8] done.
[TimerTask 9] done.
[CalculateTask 9] done.

仔细观察:一开始只有两个服务器线程,因此线程状态都是忙,后来线程数增多,6个线程中的两个状态变成idle,说明处于wait()状态。

思考:本例的线程调度算法其实根本没有,因为这个应用是围绕TaskQueue设计的,不是以Thread Pool为中心设计的。因此,Task调度取决于TaskQueue的getTask()方法,你可以改进这个方法,例如使用优先队列,使优先级高的任务先被执行。

如果所有的服务器线程都处于busy状态,则说明任务繁忙,TaskQueue的队列越来越长,最终会导致服务器内存耗尽。因此,可以限制 TaskQueue的等待任务数,超过最大长度就拒绝处理。许多Web服务器在用户请求繁忙时就会拒绝用户:HTTP 503 SERVICE UNAVAILABLE

多线程读写同一个对象的数据是很普遍的,通常,要避免读写冲突,必须保证任何时候仅有一个线程在写入,有线程正在读取的时候,写入操作就必须等待。简单说,就是要避免“写-写”冲突和“读-写”冲突。但是同时读是允许的,因为“读-读”不冲突,而且很安全。

要实现以上的ReadWriteLock,简单的使用synchronized就不行,我们必须自己设计一个ReadWriteLock类,在读之前,必须先获得“读锁”,写之前,必须先获得“写锁”。举例说明:

DataHandler对象保存了一个可读写的char[]数组:

package com.crackj2ee.thread;

public class DataHandler {
// store data:
private char[] buffer = "AAAAAAAAAA".toCharArray();

private char[] doRead() {
char[] ret = new char[buffer.length];
for(int i=0; i<buffer.length; i++) {
ret[i] = buffer[i];
sleep(3);
}
return ret;
}

private void doWrite(char[] data) {
if(data!=null) {
buffer = new char[data.length];
for(int i=0; i<buffer.length; i++) {
buffer[i] = data[i];
sleep(10);
}
}
}

private void sleep(int ms) {
try {
Thread.sleep(ms);
}
catch(InterruptedException ie) {}
}
}

doRead()和doWrite()方法是非线程安全的读写方法。为了演示,加入了sleep(),并设置读的速度大约是写的3倍,这符合通常的情况。

为了让多线程能安全读写,我们设计了一个ReadWriteLock:

package com.crackj2ee.thread;
public class ReadWriteLock {
private int readingThreads = 0;
private int writingThreads = 0;
private int waitingThreads = 0; // waiting for write
private boolean preferWrite = true;

public synchronized void readLock() throws InterruptedException {
while(writingThreads>0 || (preferWrite && waitingThreads>0))
this.wait();
readingThreads++;
}

public synchronized void readUnlock() {
readingThreads--;
preferWrite = true;
notifyAll();
}

public synchronized void writeLock() throws InterruptedException {
waitingThreads++;
try {
while(readingThreads>0 || writingThreads>0)
this.wait();
}
finally {
waitingThreads--;
}
writingThreads++;
}

public synchronized void writeUnlock() {
writingThreads--;
preferWrite = false;
notifyAll();
}
}

readLock()用于获得读锁,readUnlock()释放读锁,writeLock()和writeUnlock()一样。由于锁用完必须释放,因此,必须保证lock和unlock匹配。我们修改DataHandler,加入ReadWriteLock:

package com.crackj2ee.thread;
public class DataHandler {
// store data:
private char[] buffer = "AAAAAAAAAA".toCharArray();
// lock:
private ReadWriteLock lock = new ReadWriteLock();

public char[] read(String name) throws InterruptedException {
System.out.println(name + " waiting for read...");
lock.readLock();
try {
char[] data = doRead();
System.out.println(name + " reads data: " + new String(data));
return data;
}
finally {
lock.readUnlock();
}
}

public void write(String name, char[] data) throws InterruptedException {
System.out.println(name + " waiting for write...");
lock.writeLock();
try {
System.out.println(name + " wrote data: " + new String(data));
doWrite(data);
}
finally {
lock.writeUnlock();
}
}

private char[] doRead() {
char[] ret = new char[buffer.length];
for(int i=0; i<buffer.length; i++) {
ret[i] = buffer[i];
sleep(3);
}
return ret;
}
private void doWrite(char[] data) {
if(data!=null) {
buffer = new char[data.length];
for(int i=0; i<buffer.length; i++) {
buffer[i] = data[i];
sleep(10);
}
}
}
private void sleep(int ms) {
try {
Thread.sleep(ms);
}
catch(InterruptedException ie) {}
}
}

public方法read()和write()完全封装了底层的ReadWriteLock,因此,多线程可以安全地调用这两个方法:

// ReadingThread不断读取数据:
package com.crackj2ee.thread;
public class ReadingThread extends Thread {
private DataHandler handler;
public ReadingThread(DataHandler handler) {
this.handler = handler;
}
public void run() {
for(;;) {
try {
char[] data = handler.read(getName());
Thread.sleep((long)(Math.random()*1000+100));
}
catch(InterruptedException ie) {
break;
}
}
}
}

// WritingThread不断写入数据,每次写入的都是10个相同的字符:
package com.crackj2ee.thread;
public class WritingThread extends Thread {
private DataHandler handler;
public WritingThread(DataHandler handler) {
this.handler = handler;
}
public void run() {
char[] data = new char[10];
for(;;) {
try {
fill(data);
handler.write(getName(), data);
Thread.sleep((long)(Math.random()*1000+100));
}
catch(InterruptedException ie) {
break;
}
}
}
// 产生一个A-Z随机字符,填入char[10]:
private void fill(char[] data) {
char c = (char)(Math.random()*26+'A');
for(int i=0; i<data.length; i++)
data[i] = c;
}
}

最后Main负责启动这些线程:

package com.crackj2ee.thread;
public class Main {
public static void main(String[] args) {
DataHandler handler = new DataHandler();
Thread[] ts = new Thread[] {
new ReadingThread(handler),
new ReadingThread(handler),
new ReadingThread(handler),
new ReadingThread(handler),
new ReadingThread(handler),
new WritingThread(handler),
new WritingThread(handler)
};
for(int i=0; i<ts.length; i++) {
ts[i].start();
}
}
}

我们启动了5个读线程和2个写线程,运行结果如下:

Thread-0 waiting for read...
Thread-1 waiting for read...
Thread-2 waiting for read...
Thread-3 waiting for read...
Thread-4 waiting for read...
Thread-5 waiting for write...
Thread-6 waiting for write...
Thread-4 reads data: AAAAAAAAAA
Thread-3 reads data: AAAAAAAAAA
Thread-2 reads data: AAAAAAAAAA
Thread-1 reads data: AAAAAAAAAA
Thread-0 reads data: AAAAAAAAAA
Thread-5 wrote data: EEEEEEEEEE
Thread-6 wrote data: MMMMMMMMMM
Thread-1 waiting for read...
Thread-4 waiting for read...
Thread-1 reads data: MMMMMMMMMM
Thread-4 reads data: MMMMMMMMMM
Thread-2 waiting for read...
Thread-2 reads data: MMMMMMMMMM
Thread-0 waiting for read...
Thread-0 reads data: MMMMMMMMMM
Thread-4 waiting for read...
Thread-4 reads data: MMMMMMMMMM
Thread-2 waiting for read...
Thread-5 waiting for write...
Thread-2 reads data: MMMMMMMMMM
Thread-5 wrote data: GGGGGGGGGG
Thread-6 waiting for write...
Thread-6 wrote data: AAAAAAAAAA
Thread-3 waiting for read...
Thread-3 reads data: AAAAAAAAAA
......

可以看到,每次读/写都是完整的原子操作,因为我们每次写入的都是10个相同字符。并且,每次读出的都是最近一次写入的内容。

如果去掉ReadWriteLock:

package com.crackj2ee.thread;
public class DataHandler {

// store data:
private char[] buffer = "AAAAAAAAAA".toCharArray();

public char[] read(String name) throws InterruptedException {
char[] data = doRead();
System.out.println(name + " reads data: " + new String(data));
return data;
}
public void write(String name, char[] data) throws InterruptedException {
System.out.println(name + " wrote data: " + new String(data));
doWrite(data);
}

private char[] doRead() {
char[] ret = new char[10];
for(int i=0; i<10; i++) {
ret[i] = buffer[i];
sleep(3);
}
return ret;
}
private void doWrite(char[] data) {
for(int i=0; i<10; i++) {
buffer[i] = data[i];
sleep(10);
}
}
private void sleep(int ms) {
try {
Thread.sleep(ms);
}
catch(InterruptedException ie) {}
}
}

运行结果如下:

Thread-5 wrote data: AAAAAAAAAA
Thread-6 wrote data: MMMMMMMMMM
Thread-0 reads data: AAAAAAAAAA
Thread-1 reads data: AAAAAAAAAA
Thread-2 reads data: AAAAAAAAAA
Thread-3 reads data: AAAAAAAAAA
Thread-4 reads data: AAAAAAAAAA
Thread-2 reads data: MAAAAAAAAA
Thread-3 reads data: MAAAAAAAAA
Thread-5 wrote data: CCCCCCCCCC
Thread-1 reads data: MAAAAAAAAA
Thread-0 reads data: MAAAAAAAAA
Thread-4 reads data: MAAAAAAAAA
Thread-6 wrote data: EEEEEEEEEE
Thread-3 reads data: EEEEECCCCC
Thread-4 reads data: EEEEEEEEEC
Thread-1 reads data: EEEEEEEEEE

可以看到在Thread-6写入EEEEEEEEEE的过程中,3个线程读取的内容是不同的。

思考

java的synchronized提供了最底层的物理锁,要在synchronized的基础上,实现自己的逻辑锁,就必须仔细设计ReadWriteLock。

Q: lock.readLock()为什么不放入try{ } 内?
A: 因为readLock()会抛出InterruptedException,导致readingThreads++不执行,而readUnlock()在 finally{ } 中,导致readingThreads--执行,从而使readingThread状态出错。writeLock()也是类似的。

Q: preferWrite有用吗?
A: 如果去掉preferWrite,线程安全不受影响。但是,如果读取线程很多,上一个线程还没有读取完,下一个线程又开始读了,就导致写入线程长时间无法获得writeLock;如果写入线程等待的很多,一个接一个写,也会导致读取线程长时间无法获得readLock。preferWrite的作用是让读 /写交替执行,避免由于读线程繁忙导致写无法进行和由于写线程繁忙导致读无法进行。

Q: notifyAll()换成notify()行不行?
A: 不可以。由于preferWrite的存在,如果一个线程刚读取完毕,此时preferWrite=true,再notify(),若恰好唤醒的是一个读线程,则while(writingThreads>0 || (preferWrite && waitingThreads>0))可能为true导致该读线程继续等待,而等待写入的线程也处于wait()中,结果所有线程都处于wait ()状态,谁也无法唤醒谁。因此,notifyAll()比notify()要来得安全。程序验证notify()带来的死锁:

Thread-0 waiting for read...
Thread-1 waiting for read...
Thread-2 waiting for read...
Thread-3 waiting for read...
Thread-4 waiting for read...
Thread-5 waiting for write...
Thread-6 waiting for write...
Thread-0 reads data: AAAAAAAAAA
Thread-4 reads data: AAAAAAAAAA
Thread-3 reads data: AAAAAAAAAA
Thread-2 reads data: AAAAAAAAAA
Thread-1 reads data: AAAAAAAAAA
Thread-5 wrote data: CCCCCCCCCC
Thread-2 waiting for read...
Thread-1 waiting for read...
Thread-3 waiting for read...
Thread-0 waiting for read...
Thread-4 waiting for read...
Thread-6 wrote data: LLLLLLLLLL
Thread-5 waiting for write...
Thread-6 waiting for write...
Thread-2 reads data: LLLLLLLLLL
Thread-2 waiting for read...
(运行到此不动了)

注意到这种死锁是由于所有线程都在等待别的线程唤醒自己,结果都无法醒过来。这和两个线程希望获得对方已有的锁造成死锁不同。因此多线程设计的难度远远高于单线程应用。

分享到:
评论

相关推荐

    thread thread thread thread

    thread thread thread thread thread thread thread thread thread thread thread thread thread thread thread thread thread thread thread thread thread thread thread thread thread thread thread thread ...

    RT-Thread常见函数.zip_RTT_rt thread_rt-thread函数_rt_thread函数_手册

    《RT-Thread常见函数》是针对RT-Thread实时操作系统中常用函数的一份详细参考资料,旨在帮助开发者更好地理解和应用RTT的API。RT-Thread(简称RTT)是一款成熟、稳定且功能丰富的开源实时操作系统,广泛应用于物联网...

    5,RT Thread操作系统教程及资料_rtthread_RT-Thread_

    RT Thread是一款轻量级、高性能、组件丰富的实时操作系统(RTOS),尤其适合于嵌入式系统。这个压缩包中包含的“5,RT Thread操作系统教程及资料(第三方合作伙伴)”很可能是由RT Thread社区或者合作伙伴提供的学习...

    rt-thread-3.1.3_rtthread_RT-Thread_nano_rtthreadopenocd_RT-Threa

    RT-Thread是一个开源的实时操作系统(RTOS),专为嵌入式系统设计,具有高可靠性、低功耗和轻量级的特点。"rt-thread-3.1.3" 是该操作系统的特定版本,代表着该版本在功能和性能上的优化与改进。 RT-Thread Nano是...

    C#多线程之Thread中Thread.IsAlive属性用法分析

    `Thread`类是.NET框架提供的用于创建和管理线程的核心类。其中,`Thread.IsAlive`属性是一个非常重要的成员,它允许开发者检查线程的当前状态,以确定线程是否已经启动并且仍在运行。在理解`Thread.IsAlive`属性的...

    RT-Thread中文文档

    RT-Thread是一款由中国开源社区贡献的物联网操作系统,它具备极强的可伸缩性。可伸缩性体现在其核心可以在最小的ARMCortex-M0微控制器上运行,到中等规模的ARMCortex-M3/M4/M7系统,以及更高性能的MIPS32和ARMCortex...

    RTThread Stm32 标准库参考工程.zip_RT-Thread STD库_rtos stm32_rtthread_rt

    RTThreadSTM32标准库参考工程是一个专门为基于STM32微控制器的实时操作系统(RTOS)RT-Thread设计的示例项目。RT-Thread是一个开源、轻量级且功能强大的RTOS,广泛应用于物联网(IoT)设备和嵌入式系统。这个压缩包...

    threadx中文资料

    ### ThreadX概述与嵌入式实时系统 #### 一、ThreadX简介 ThreadX是一款由ExpressLogic公司开发的高性能实时操作系统(RTOS),主要用于嵌入式系统开发领域。该操作系统以其出色的稳定性和实时性著称,并且具备较高的...

    ThreadX中文学习手册

    ### ThreadX中文学习手册知识点概览 #### 一、ThreadX概述 - **定义与特点**:ThreadX是由Express Logic公司开发的一款高性能实时操作系统(RTOS)内核,专门针对嵌入式应用设计。与传统的RTOS相比,ThreadX具有更...

    ThreadX-中文文档

    ### ThreadX实时操作系统知识点 #### 一、ThreadX概述 - **定义与特点**:ThreadX是一款由ExpressLogic公司开发的高性能实时操作系统内核。它以其高性能、通用性及强大的可移植性著称,适用于各种嵌入式系统,尤其...

    rt-thread入门教程PPT

    RT-Thread 入门教程 PPT RT-Thread 是一款国产的嵌入式操作系统,诞生于 2006 年,经过十多年的积淀,已经成为一款知名度较高、口碑极佳、高度稳定可靠的实时操作系统。RT-Thread 作为一款中间件平台,具有极强的...

    AT32F435移植THreadX;AT32F437移植THreadX,移植最新版THreadX

    STM32可以有STM32CubeMX轻松移植THreadX,雅特力从AT43F435/437系列开始不能完全兼容STM32,用不了THreadX了。 本移植采用最新(截至2023年3月12号)的AT32F435/437固件库V2.1.2,THreadX版本V6.2.1;依照雅特力官方...

    threadX的VC开发环境

    ThreadX是一款广泛应用于嵌入式系统的实时操作系统(RTOS),它为开发者提供了高效、可靠的多线程环境,以实现复杂的实时任务调度。ThreadX的核心特性包括轻量级内核、优先级继承、时间片轮转、内存管理以及中断处理...

    ThreadX5.1 Win32 Demo

    ThreadX是一款嵌入式实时操作系统(RTOS),由Express Logic公司开发。这个"ThreadX5.1 Win32 Demo"是2009年的最新版本,它提供了在Win32平台上运行ThreadX操作系统的演示环境。了解ThreadX对于深入学习嵌入式系统...

    实时操作系统ThreadX剖析

    实时操作系统(RTOS)ThreadX剖析 实时操作系统ThreadX是一种专门为嵌入式系统设计的操作系统,它能够满足实时性要求较高的任务调度和管理需求。在嵌入式系统的领域中,实时性是系统能否及时响应外部事件并进行处理...

    RT-Thread API参考手册.pdf

    RT-Thread是一款由国内团队开发并维护的嵌入式实时操作系统,拥有完全的自主知识产权。自成立以来,RT-Thread经过长时间的发展,已经成为了一个功能强大、组件丰富的物联网操作系统,尤其适应于资源受限的微控制器...

    RT-Thread常见问题

    RT-Thread常见问题解答 本文将对RT-Thread操作系统开发中常见的问题进行解答,涵盖了线程控制、日志输出、线程删除、静态和动态线程定义等方面。 一、线程控制问题 1. 创建线程后,使用list_thread()显示线程的...

    RT-THREAD 编程指南 中文手册

    RT-Thread是一个流行的开源实时操作系统,广泛应用于嵌入式系统和物联网领域。它具有高度模块化、可裁剪的内核设计,适合资源有限的微控制器使用。以下是根据提供的文件信息,详细的RT-Thread编程指南相关知识点: ...

    threadx 在arm9 上的移植指导

    ### ThreadX在ARM9上的移植指导 #### 一、ThreadX简介与工作原理 ThreadX是一款由Express Logic公司开发的实时操作系统(RTOS),以其高效、可靠及轻量级等特点著称,广泛应用于嵌入式系统领域。ThreadX采用抢占式...

    RT-Thread.pdf

    RT-Thread是一款面向嵌入式设备的开源实时微操作系统(RTOS),其具备高度的模块化和可配置性,非常适合资源受限的单片机平台。作为一款轻量级操作系统,RT-Thread提供了完整的实时操作系统所需的功能,包括线程管理...

Global site tag (gtag.js) - Google Analytics