`
miman2008
  • 浏览: 38165 次
  • 性别: Icon_minigender_1
  • 来自: 武汉
社区版块
存档分类
最新评论
阅读更多

java语言已经内置了多线程支持,所有实现Runnable接口的类都可被启动一个新线程,新线程会执行该实例的run()方法,当run()方法执行完毕后,线程就结束了。一旦一个线程执行完毕,这个实例就不能再重新启动,只能重新生成一个新实例,再启动一个新线程。

Thread类是实现了Runnable接口的一个实例,它代表一个线程的实例,并且,启动线程的唯一方法就是通过Thread类的start()实例方法:

Thread t = new Thread();
t.start();

 

start()方法是一个native方法,它将启动一个新线程,并执行run()方法。Thread类默认的run()方法什么也不做就退出了。注意:直接调用run()方法并不会启动一个新线程,它和调用一个普通的java方法没有什么区别。

因此,有两个方法可以实现自己的线程:

方法1:自己的类extend Thread,并复写run()方法,就可以启动新线程并执行自己定义的run()方法。例如:

 

public class MyThread extends Thread {
public run() {
System.out.println("MyThread.run()");
}
}

 

在合适的地方启动线程:new MyThread().start();

方法2:如果自己的类已经extends另一个类,就无法直接extends Thread,此时,必须实现一个Runnable接口:

public class MyThread extends OtherClass implements Runnable {
public run() {
System.out.println("MyThread.run()");
}
}

 

为了启动MyThread,需要首先实例化一个Thread,并传入自己的MyThread实例:

MyThread myt = new MyThread();
Thread t = new Thread(myt);
t.start();

 

 

事实上,当传入一个Runnable target参数给Thread后,Thread的run()方法就会调用target.run(),参考JDK源代码:

 

public void run() {
if (target != null) {
target.run();
}
}

 

线程还有一些Name, ThreadGroup, isDaemon等设置,由于和线程设计模式关联很少,这里就不多说了。

由于同一进程内的多个线程共享内存空间,在Java中,就是共享实例,当多个线程试图同时修改某个实例的内容时,就会造成冲突,因此,线程必须实现共享互斥,使多线程同步。

最简单的同步是将一个方法标记为synchronized,对同一个实例来说,任一时刻只能有一个synchronized方法在执行。当一个方法正在执行某个synchronized方法时,其他线程如果想要执行这个实例的任意一个synchronized方法,都必须等待当前执行 synchronized方法的线程退出此方法后,才能依次执行。

但是,非synchronized方法不受影响,不管当前有没有执行synchronized方法,非synchronized方法都可以被多个线程同时执行。

此外,必须注意,只有同一实例的synchronized方法同一时间只能被一个线程执行,不同实例的synchronized方法是可以并发的。例如,class A定义了synchronized方法sync(),则不同实例a1.sync()和a2.sync()可以同时由两个线程来执行。

多线程同步的实现最终依赖锁机制。我们可以想象某一共享资源是一间屋子,每个人都是一个线程。当A希望进入房间时,他必须获得门锁,一旦A获得门锁,他进去后就立刻将门锁上,于是B,C,D...就不得不在门外等待,直到A释放锁出来后,B,C,D...中的某一人抢到了该锁(具体抢法依赖于 JVM的实现,可以先到先得,也可以随机挑选),然后进屋又将门锁上。这样,任一时刻最多有一人在屋内(使用共享资源)。

Java语言规范内置了对多线程的支持。对于Java程序来说,每一个对象实例都有一把“锁”,一旦某个线程获得了该锁,别的线程如果希望获得该锁,只能等待这个线程释放锁之后。获得锁的方法只有一个,就是synchronized关键字。例如:

public class SharedResource {
private int count = 0;

public int getCount() { return count; }

public synchronized void setCount(int count) { this.count = count; }

}

 

同步方法public synchronized void setCount(int count) { this.count = count; } 事实上相当于:

public void setCount(int count) {
synchronized(this) { // 在此获得this锁
this.count = count;
} // 在此释放this锁
}

 

如果两个以上的线程同时执行,会引发冲突,因此,要更改SharedResource的内部状态,必须先获得SharedResource实例的锁。

退出synchronized块时,线程拥有的锁自动释放,于是,别的线程又可以获取该锁了。

为了提高性能,不一定要锁定this,例如,SharedResource有两个独立变化的变量:

public class SharedResouce {
private int a = 0;
private int b = 0;

public synchronized void setA(int a) { this.a = a; }

public synchronized void setB(int b) { this.b = b; }
}

 

若同步整个方法,则setA()的时候无法setB(),setB()时无法setA()。为了提高性能,可以使用不同对象的锁:

public class SharedResouce {
private int a = 0;
private int b = 0;
private Object sync_a = new Object();
private Object sync_b = new Object();

public void setA(int a) {
synchronized(sync_a) {
this.a = a;
}
}

public synchronized void setB(int b) {
synchronized(sync_b) {
this.b = b;
}
}
}

 

 

通常,多线程之间需要协调工作。例如,浏览器的一个显示图片的线程displayThread想要执行显示图片的任务,必须等待下载线程 downloadThread将该图片下载完毕。如果图片还没有下载完,displayThread可以暂停,当downloadThread完成了任务后,再通知displayThread“图片准备完毕,可以显示了”,这时,displayThread继续执行。

以上逻辑简单的说就是:如果条件不满足,则等待。当条件满足时,等待该条件的线程将被唤醒。在Java中,这个机制的实现依赖于wait/notify。等待机制与锁机制是密切关联的。例如:

synchronized(obj) {
while(!condition) {
obj.wait();
}
obj.doSomething();
}

 

 

当线程A获得了obj锁后,发现条件condition不满足,无法继续下一处理,于是线程A就wait()。

在另一线程B中,如果B更改了某些条件,使得线程A的condition条件满足了,就可以唤醒线程A:

synchronized(obj) {
condition = true;
obj.notify();
}

 

 

需要注意的概念是:

# 调用obj的wait(), notify()方法前,必须获得obj锁,也就是必须写在synchronized(obj) {...} 代码段内。

# 调用obj.wait()后,线程A就释放了obj的锁,否则线程B无法获得obj锁,也就无法在synchronized(obj) {...} 代码段内唤醒A。

# 当obj.wait()方法返回后,线程A需要再次获得obj锁,才能继续执行。

# 如果A1,A2,A3都在obj.wait(),则B调用obj.notify()只能唤醒A1,A2,A3中的一个(具体哪一个由JVM决定)。

# obj.notifyAll()则能全部唤醒A1,A2,A3,但是要继续执行obj.wait()的下一条语句,必须获得obj锁,因此,A1,A2,A3只有一个有机会获得锁继续执行,例如A1,其余的需要等待A1释放obj锁之后才能继续执行。

# 当B调用obj.notify/notifyAll的时候,B正持有obj锁,因此,A1,A2,A3虽被唤醒,但是仍无法获得obj锁。直到B退出synchronized块,释放obj锁后,A1,A2,A3中的一个才有机会获得锁继续执行。

前面讲了wait/notify机制,Thread还有一个sleep()静态方法,它也能使线程暂停一段时间。sleep与wait的不同点是: sleep并不释放锁,并且sleep的暂停和wait暂停是不一样的。obj.wait会使线程进入obj对象的等待集合中并等待唤醒。

但是wait()和sleep()都可以通过interrupt()方法打断线程的暂停状态,从而使线程立刻抛出InterruptedException。

如果线程A希望立即结束线程B,则可以对线程B对应的Thread实例调用interrupt方法。如果此刻线程B正在 wait/sleep/join,则线程B会立刻抛出InterruptedException,在catch() {} 中直接return即可安全地结束线程。

需要注意的是,InterruptedException是线程自己从内部抛出的,并不是interrupt()方法抛出的。对某一线程调用 interrupt()时,如果该线程正在执行普通的代码,那么该线程根本就不会抛出InterruptedException。但是,一旦该线程进入到 wait()/sleep()/join()后,就会立刻抛出InterruptedException。

GuardedSuspention模式主要思想是:

当条件不满足时,线程等待,直到条件满足时,等待该条件的线程被唤醒。

我们设计一个客户端线程和一个服务器线程,客户端线程不断发送请求给服务器线程,服务器线程不断处理请求。当请求队列为空时,服务器线程就必须等待,直到客户端发送了请求。

先定义一个请求队列:Queue

package com.crackj2ee.thread;

import java.util.*;

public class Queue {
private List queue = new LinkedList();

public synchronized Request getRequest() {
while(queue.size()==0) {
try {
this.wait();
}
catch(InterruptedException ie) {
return null;
}
}
return (Request)queue.remove(0);
}

public synchronized void putRequest(Request request) {
queue.add(request);
this.notifyAll();
}

}

 

客户端线程在放入了一个request后,就使服务器线程等待条件满足,于是唤醒服务器线程。

客户端线程:ClientThread

package com.crackj2ee.thread;

public class ClientThread extends Thread {
private Queue queue;
private String clientName;

public ClientThread(Queue queue, String clientName) {
this.queue = queue;
this.clientName = clientName;
}

public String toString() {
return "[ClientThread-" + clientName + "]";
}

public void run() {
for(int i=0; i<100; i++) {
Request request = new Request("" + (long)(Math.random()*10000));
System.out.println(this + " send request: " + request);
queue.putRequest(request);
try {
Thread.sleep((long)(Math.random() * 10000 + 1000));
}
catch(InterruptedException ie) {
}
}
System.out.println(this + " shutdown.");
}
}

 

服务器线程:ServerThread

package com.crackj2ee.thread;
public class ServerThread extends Thread {
private boolean stop = false;
private Queue queue;

public ServerThread(Queue queue) {
this.queue = queue;
}

public void shutdown() {
stop = true;
this.interrupt();
try {
this.join();
}
catch(InterruptedException ie) {}
}

public void run() {
while(!stop) {
Request request = queue.getRequest();
System.out.println("[ServerThread] handle request: " + request);
try {
Thread.sleep(2000);
}
catch(InterruptedException ie) {}
}
System.out.println("[ServerThread] shutdown.");
}
}

 

Queue.getRequest是一个阻塞方法。这和java标准库的许多IO方法类似。

最后,写一个Main来启动他们:

package com.crackj2ee.thread;

public class Main {

public static void main(String[] args) {
Queue queue = new Queue();
ServerThread server = new ServerThread(queue);
server.start();
ClientThread[] clients = new ClientThread[5];
for(int i=0; i<clients.length; i++) {
clients[i] = new ClientThread(queue, ""+i);
clients[i].start();
}
try {
Thread.sleep(100000);
}
catch(InterruptedException ie) {}
server.shutdown();
}
}

 

我们启动了5个客户端线程和一个服务器线程,运行结果如下:

[ClientThread-0] send request: Request-4984
[ServerThread] handle request: Request-4984
[ClientThread-1] send request: Request-2020
[ClientThread-2] send request: Request-8980
[ClientThread-3] send request: Request-5044
[ClientThread-4] send request: Request-548
[ClientThread-4] send request: Request-6832
[ServerThread] handle request: Request-2020
[ServerThread] handle request: Request-8980
[ServerThread] handle request: Request-5044
[ServerThread] handle request: Request-548
[ClientThread-4] send request: Request-1681
[ClientThread-0] send request: Request-7859
[ClientThread-3] send request: Request-3926
[ServerThread] handle request: Request-6832
[ClientThread-2] send request: Request-9906
......

可以观察到ServerThread处理来自不同客户端的请求。

思考

Q: 服务器线程的wait条件while(queue.size()==0)能否换成if(queue.size()==0)?

A: 在这个例子中可以,因为服务器线程只有一个。但是,如果服务器线程有多个(例如Web应用程序有多个线程处理并发请求,这非常普遍),就会造成严重问题。

Q: 能否用sleep(1000)代替wait()?

A: 绝对不可以。sleep()不会释放锁,因此sleep期间别的线程根本没有办法调用getRequest()和putRequest(),导致所有相关线程都被阻塞。

Q: (Request)queue.remove(0)可以放到synchronized() {}块外面吗?

A: 不可以。因为while()是测试queue,remove()是使用queue,两者是一个原子操作,不能放在synchronized外面。

总结

多线程设计看似简单,实际上必须非常仔细地考虑各种锁定/同步的条件,稍不小心,就可能出错。并且,当线程较少时,很可能发现不了问题,一旦问题出现又难以调试。

所幸的是,已有一些被验证过的模式可以供我们使用,我们会继续介绍一些常用的多线程设计模式。

前面谈了多线程应用程序能极大地改善用户相应。例如对于一个Web应用程序,每当一个用户请求服务器连接时,服务器就可以启动一个新线程为用户服务。

然而,创建和销毁线程本身就有一定的开销,如果频繁创建和销毁线程,CPU和内存开销就不可忽略,垃圾收集器还必须负担更多的工作。因此,线程池就是为了避免频繁创建和销毁线程。

每当服务器接受了一个新的请求后,服务器就从线程池中挑选一个等待的线程并执行请求处理。处理完毕后,线程并不结束,而是转为阻塞状态再次被放入线程池中。这样就避免了频繁创建和销毁线程。

Worker Pattern实现了类似线程池的功能。首先定义Task接口:

package com.crackj2ee.thread;
public interface Task {
void execute();
}

 

线程将负责执行execute()方法。注意到任务是由子类通过实现execute()方法实现的,线程本身并不知道自己执行的任务。它只负责运行一个耗时的execute()方法。

具体任务由子类实现,我们定义了一个CalculateTask和一个TimerTask:

// CalculateTask.java

package com.crackj2ee.thread;
public class CalculateTask implements Task {
private static int count = 0;
private int num = count;
public CalculateTask() {
count++;
}
public void execute() {
System.out.println("[CalculateTask " + num + "] start...");
try {
Thread.sleep(3000);
}
catch(InterruptedException ie) {}
System.out.println("[CalculateTask " + num + "] done.");
}
}

// TimerTask.java
package com.crackj2ee.thread;
public class TimerTask implements Task {
private static int count = 0;
private int num = count;
public TimerTask() {
count++;
}
public void execute() {
System.out.println("[TimerTask " + num + "] start...");
try {
Thread.sleep(2000);
}
catch(InterruptedException ie) {}
System.out.println("[TimerTask " + num + "] done.");
}
}

 

以上任务均简单的sleep若干秒。

TaskQueue实现了一个队列,客户端可以将请求放入队列,服务器线程可以从队列中取出任务:

package com.crackj2ee.thread;
import java.util.*;
public class TaskQueue {
private List queue = new LinkedList();
public synchronized Task getTask() {
while(queue.size()==0) {
try {
this.wait();
}
catch(InterruptedException ie) {
return null;
}
}
return (Task)queue.remove(0);
}
public synchronized void putTask(Task task) {
queue.add(task);
this.notifyAll();
}
}

 

终于到了真正的WorkerThread,这是真正执行任务的服务器线程:

package com.crackj2ee.thread;
public class WorkerThread extends Thread {
private static int count = 0;
private boolean busy = false;
private boolean stop = false;
private TaskQueue queue;
public WorkerThread(ThreadGroup group, TaskQueue queue) {
super(group, "worker-" + count);
count++;
this.queue = queue;
}
public void shutdown() {
stop = true;
this.interrupt();
try {
this.join();
}
catch(InterruptedException ie) {}
}
public boolean isIdle() {
return !busy;
}
public void run() {
System.out.println(getName() + " start."); 
while(!stop) {
Task task = queue.getTask();
if(task!=null) {
busy = true;
task.execute();
busy = false;
}
}
System.out.println(getName() + " end.");
}
}

 

前面已经讲过,queue.getTask()是一个阻塞方法,服务器线程可能在此wait()一段时间。此外,WorkerThread还有一个shutdown方法,用于安全结束线程。

最后是ThreadPool,负责管理所有的服务器线程,还可以动态增加和减少线程数:

 

package com.crackj2ee.thread;
import java.util.*;
public class ThreadPool extends ThreadGroup {
private List threads = new LinkedList();
private TaskQueue queue;
public ThreadPool(TaskQueue queue) {
super("Thread-Pool");
this.queue = queue;
}
public synchronized void addWorkerThread() {
Thread t = new WorkerThread(this, queue);
threads.add(t);
t.start();
}
public synchronized void removeWorkerThread() {
if(threads.size()>0) {
WorkerThread t = (WorkerThread)threads.remove(0);
t.shutdown();
}
}
public synchronized void currentStatus() {
System.out.println("-----------------------------------------------");
System.out.println("Thread count = " + threads.size());
Iterator it = threads.iterator();
while(it.hasNext()) {
WorkerThread t = (WorkerThread)it.next();
System.out.println(t.getName() + ": " + (t.isIdle() ? "idle" : "busy"));
}
System.out.println("-----------------------------------------------");
}
}

 

currentStatus()方法是为了方便调试,打印出所有线程的当前状态。

最后,Main负责完成main()方法:

package com.crackj2ee.thread;
public class Main {
public static void main(String[] args) {
TaskQueue queue = new TaskQueue();
ThreadPool pool = new ThreadPool(queue);
for(int i=0; i<10; i++) {
queue.putTask(new CalculateTask());
queue.putTask(new TimerTask());
}
pool.addWorkerThread();
pool.addWorkerThread();
doSleep(8000);
pool.currentStatus();
pool.addWorkerThread();
pool.addWorkerThread();
pool.addWorkerThread();
pool.addWorkerThread();
pool.addWorkerThread();
doSleep(5000);
pool.currentStatus();
}
private static void doSleep(long ms) {
try {
Thread.sleep(ms);
}
catch(InterruptedException ie) {}
}
}

 

main()一开始放入了20个Task,然后动态添加了一些服务线程,并定期打印线程状态,运行结果如下:

worker-0 start.
[CalculateTask 0] start...
worker-1 start.
[TimerTask 0] start...
[TimerTask 0] done.
[CalculateTask 1] start...
[CalculateTask 0] done.
[TimerTask 1] start...
[CalculateTask 1] done.
[CalculateTask 2] start...
[TimerTask 1] done.
[TimerTask 2] start...
[TimerTask 2] done.
[CalculateTask 3] start...
-----------------------------------------------
Thread count = 2
worker-0: busy
worker-1: busy
-----------------------------------------------
[CalculateTask 2] done.
[TimerTask 3] start...
worker-2 start.
[CalculateTask 4] start...
worker-3 start.
[TimerTask 4] start...
worker-4 start.
[CalculateTask 5] start...
worker-5 start.
[TimerTask 5] start...
worker-6 start.
[CalculateTask 6] start...
[CalculateTask 3] done.
[TimerTask 6] start...
[TimerTask 3] done.
[CalculateTask 7] start...
[TimerTask 4] done.
[TimerTask 7] start...
[TimerTask 5] done.
[CalculateTask 8] start...
[CalculateTask 4] done.
[TimerTask 8] start...
[CalculateTask 5] done.
[CalculateTask 9] start...
[CalculateTask 6] done.
[TimerTask 9] start...
[TimerTask 6] done.
[TimerTask 7] done.
-----------------------------------------------
Thread count = 7
worker-0: idle
worker-1: busy
worker-2: busy
worker-3: idle
worker-4: busy
worker-5: busy
worker-6: busy
-----------------------------------------------
[CalculateTask 7] done.
[CalculateTask 8] done.
[TimerTask 8] done.
[TimerTask 9] done.
[CalculateTask 9] done.

仔细观察:一开始只有两个服务器线程,因此线程状态都是忙,后来线程数增多,6个线程中的两个状态变成idle,说明处于wait()状态。

思考:本例的线程调度算法其实根本没有,因为这个应用是围绕TaskQueue设计的,不是以Thread Pool为中心设计的。因此,Task调度取决于TaskQueue的getTask()方法,你可以改进这个方法,例如使用优先队列,使优先级高的任务先被执行。

如果所有的服务器线程都处于busy状态,则说明任务繁忙,TaskQueue的队列越来越长,最终会导致服务器内存耗尽。因此,可以限制 TaskQueue的等待任务数,超过最大长度就拒绝处理。许多Web服务器在用户请求繁忙时就会拒绝用户:HTTP 503 SERVICE UNAVAILABLE

多线程读写同一个对象的数据是很普遍的,通常,要避免读写冲突,必须保证任何时候仅有一个线程在写入,有线程正在读取的时候,写入操作就必须等待。简单说,就是要避免“写-写”冲突和“读-写”冲突。但是同时读是允许的,因为“读-读”不冲突,而且很安全。

要实现以上的ReadWriteLock,简单的使用synchronized就不行,我们必须自己设计一个ReadWriteLock类,在读之前,必须先获得“读锁”,写之前,必须先获得“写锁”。举例说明:

DataHandler对象保存了一个可读写的char[]数组:

 

package com.crackj2ee.thread;

public class DataHandler {
// store data:
private char[] buffer = "AAAAAAAAAA".toCharArray();

private char[] doRead() {
char[] ret = new char[buffer.length];
for(int i=0; i<buffer.length; i++) {
ret[i] = buffer[i];
sleep(3);
}
return ret;
}

private void doWrite(char[] data) {
if(data!=null) {
buffer = new char[data.length];
for(int i=0; i<buffer.length; i++) {
buffer[i] = data[i];
sleep(10);
}
}
}

private void sleep(int ms) {
try {
Thread.sleep(ms);
}
catch(InterruptedException ie) {}
}
}

 

doRead()和doWrite()方法是非线程安全的读写方法。为了演示,加入了sleep(),并设置读的速度大约是写的3倍,这符合通常的情况。

为了让多线程能安全读写,我们设计了一个ReadWriteLock:

package com.crackj2ee.thread;
public class ReadWriteLock {
private int readingThreads = 0;
private int writingThreads = 0;
private int waitingThreads = 0; // waiting for write
private boolean preferWrite = true;

public synchronized void readLock() throws InterruptedException {
while(writingThreads>0 || (preferWrite && waitingThreads>0))
this.wait();
readingThreads++;
}

public synchronized void readUnlock() {
readingThreads--;
preferWrite = true;
notifyAll();
}

public synchronized void writeLock() throws InterruptedException {
waitingThreads++;
try {
while(readingThreads>0 || writingThreads>0)
this.wait();
}
finally {
waitingThreads--;
}
writingThreads++;
}

public synchronized void writeUnlock() {
writingThreads--;
preferWrite = false;
notifyAll();
}
}

 

readLock()用于获得读锁,readUnlock()释放读锁,writeLock()和writeUnlock()一样。由于锁用完必须释放,因此,必须保证lock和unlock匹配。我们修改DataHandler,加入ReadWriteLock:

package com.crackj2ee.thread;
public class DataHandler {
// store data:
private char[] buffer = "AAAAAAAAAA".toCharArray();
// lock:
private ReadWriteLock lock = new ReadWriteLock();

public char[] read(String name) throws InterruptedException {
System.out.println(name + " waiting for read...");
lock.readLock();
try {
char[] data = doRead();
System.out.println(name + " reads data: " + new String(data));
return data;
}
finally {
lock.readUnlock();
}
}

public void write(String name, char[] data) throws InterruptedException {
System.out.println(name + " waiting for write...");
lock.writeLock();
try {
System.out.println(name + " wrote data: " + new String(data));
doWrite(data);
}
finally {
lock.writeUnlock();
}
}

private char[] doRead() {
char[] ret = new char[buffer.length];
for(int i=0; i<buffer.length; i++) {
ret[i] = buffer[i];
sleep(3);
}
return ret;
}
private void doWrite(char[] data) {
if(data!=null) {
buffer = new char[data.length];
for(int i=0; i<buffer.length; i++) {
buffer[i] = data[i];
sleep(10);
}
}
}
private void sleep(int ms) {
try {
Thread.sleep(ms);
}
catch(InterruptedException ie) {}
}
}

 

public方法read()和write()完全封装了底层的ReadWriteLock,因此,多线程可以安全地调用这两个方法:

// ReadingThread不断读取数据:

package com.crackj2ee.thread;
public class ReadingThread extends Thread {
private DataHandler handler;
public ReadingThread(DataHandler handler) {
this.handler = handler;
}
public void run() {
for(;;) {
try {
char[] data = handler.read(getName());
Thread.sleep((long)(Math.random()*1000+100));
}
catch(InterruptedException ie) {
break;
}
}
}
}

 

// WritingThread不断写入数据,每次写入的都是10个相同的字符:

package com.crackj2ee.thread;
public class WritingThread extends Thread {
private DataHandler handler;
public WritingThread(DataHandler handler) {
this.handler = handler;
}
public void run() {
char[] data = new char[10];
for(;;) {
try {
fill(data);
handler.write(getName(), data);
Thread.sleep((long)(Math.random()*1000+100));
}
catch(InterruptedException ie) {
break;
}
}
}
// 产生一个A-Z随机字符,填入char[10]:
private void fill(char[] data) {
char c = (char)(Math.random()*26+'A');
for(int i=0; i<data.length; i++)
data[i] = c;
}
}

 

最后Main负责启动这些线程:

package com.crackj2ee.thread;
public class Main {
public static void main(String[] args) {
DataHandler handler = new DataHandler();
Thread[] ts = new Thread[] {
new ReadingThread(handler),
new ReadingThread(handler),
new ReadingThread(handler),
new ReadingThread(handler),
new ReadingThread(handler),
new WritingThread(handler),
new WritingThread(handler)
};
for(int i=0; i<ts.length; i++) {
ts[i].start();
}
}
}

 

 

我们启动了5个读线程和2个写线程,运行结果如下:

Thread-0 waiting for read...
Thread-1 waiting for read...
Thread-2 waiting for read...
Thread-3 waiting for read...
Thread-4 waiting for read...
Thread-5 waiting for write...
Thread-6 waiting for write...
Thread-4 reads data: AAAAAAAAAA
Thread-3 reads data: AAAAAAAAAA
Thread-2 reads data: AAAAAAAAAA
Thread-1 reads data: AAAAAAAAAA
Thread-0 reads data: AAAAAAAAAA
Thread-5 wrote data: EEEEEEEEEE
Thread-6 wrote data: MMMMMMMMMM
Thread-1 waiting for read...
Thread-4 waiting for read...
Thread-1 reads data: MMMMMMMMMM
Thread-4 reads data: MMMMMMMMMM
Thread-2 waiting for read...
Thread-2 reads data: MMMMMMMMMM
Thread-0 waiting for read...
Thread-0 reads data: MMMMMMMMMM
Thread-4 waiting for read...
Thread-4 reads data: MMMMMMMMMM
Thread-2 waiting for read...
Thread-5 waiting for write...
Thread-2 reads data: MMMMMMMMMM
Thread-5 wrote data: GGGGGGGGGG
Thread-6 waiting for write...
Thread-6 wrote data: AAAAAAAAAA
Thread-3 waiting for read...
Thread-3 reads data: AAAAAAAAAA
......

可以看到,每次读/写都是完整的原子操作,因为我们每次写入的都是10个相同字符。并且,每次读出的都是最近一次写入的内容。

如果去掉ReadWriteLock:

package com.crackj2ee.thread;
public class DataHandler {

// store data:
private char[] buffer = "AAAAAAAAAA".toCharArray();

public char[] read(String name) throws InterruptedException {
char[] data = doRead();
System.out.println(name + " reads data: " + new String(data));
return data;
}
public void write(String name, char[] data) throws InterruptedException {
System.out.println(name + " wrote data: " + new String(data));
doWrite(data);
}

private char[] doRead() {
char[] ret = new char[10];
for(int i=0; i<10; i++) {
ret[i] = buffer[i];
sleep(3);
}
return ret;
}
private void doWrite(char[] data) {
for(int i=0; i<10; i++) {
buffer[i] = data[i];
sleep(10);
}
}
private void sleep(int ms) {
try {
Thread.sleep(ms);
}
catch(InterruptedException ie) {}
}
}

 

运行结果如下:

Thread-5 wrote data: AAAAAAAAAA
Thread-6 wrote data: MMMMMMMMMM
Thread-0 reads data: AAAAAAAAAA
Thread-1 reads data: AAAAAAAAAA
Thread-2 reads data: AAAAAAAAAA
Thread-3 reads data: AAAAAAAAAA
Thread-4 reads data: AAAAAAAAAA
Thread-2 reads data: MAAAAAAAAA
Thread-3 reads data: MAAAAAAAAA
Thread-5 wrote data: CCCCCCCCCC
Thread-1 reads data: MAAAAAAAAA
Thread-0 reads data: MAAAAAAAAA
Thread-4 reads data: MAAAAAAAAA
Thread-6 wrote data: EEEEEEEEEE
Thread-3 reads data: EEEEECCCCC
Thread-4 reads data: EEEEEEEEEC
Thread-1 reads data: EEEEEEEEEE

可以看到在Thread-6写入EEEEEEEEEE的过程中,3个线程读取的内容是不同的。

思考

java的synchronized提供了最底层的物理锁,要在synchronized的基础上,实现自己的逻辑锁,就必须仔细设计ReadWriteLock。

Q: lock.readLock()为什么不放入try{ } 内?
A: 因为readLock()会抛出InterruptedException,导致readingThreads++不执行,而readUnlock()在 finally{ } 中,导致readingThreads--执行,从而使readingThread状态出错。writeLock()也是类似的。

Q: preferWrite有用吗?
A: 如果去掉preferWrite,线程安全不受影响。但是,如果读取线程很多,上一个线程还没有读取完,下一个线程又开始读了,就导致写入线程长时间无法获得writeLock;如果写入线程等待的很多,一个接一个写,也会导致读取线程长时间无法获得readLock。preferWrite的作用是让读 /写交替执行,避免由于读线程繁忙导致写无法进行和由于写线程繁忙导致读无法进行。

Q: notifyAll()换成notify()行不行?
A: 不可以。由于preferWrite的存在,如果一个线程刚读取完毕,此时preferWrite=true,再notify(),若恰好唤醒的是一个读线程,则while(writingThreads>0 || (preferWrite && waitingThreads>0))可能为true导致该读线程继续等待,而等待写入的线程也处于wait()中,结果所有线程都处于wait ()状态,谁也无法唤醒谁。因此,notifyAll()比notify()要来得安全。程序验证notify()带来的死锁:

Thread-0 waiting for read...
Thread-1 waiting for read...
Thread-2 waiting for read...
Thread-3 waiting for read...
Thread-4 waiting for read...
Thread-5 waiting for write...
Thread-6 waiting for write...
Thread-0 reads data: AAAAAAAAAA
Thread-4 reads data: AAAAAAAAAA
Thread-3 reads data: AAAAAAAAAA
Thread-2 reads data: AAAAAAAAAA
Thread-1 reads data: AAAAAAAAAA
Thread-5 wrote data: CCCCCCCCCC
Thread-2 waiting for read...
Thread-1 waiting for read...
Thread-3 waiting for read...
Thread-0 waiting for read...
Thread-4 waiting for read...
Thread-6 wrote data: LLLLLLLLLL
Thread-5 waiting for write...
Thread-6 waiting for write...
Thread-2 reads data: LLLLLLLLLL
Thread-2 waiting for read...
(运行到此不动了)

注意到这种死锁是由于所有线程都在等待别的线程唤醒自己,结果都无法醒过来。这和两个线程希望获得对方已有的锁造成死锁不同。因此多线程设计的难度远远高于单线程应用。

http://todjiang.iteye.com/blog/67338感谢原博主,非常精辟!

分享到:
评论
1 楼 yzmduncan 2010-10-22  
没看懂贴个毛啊

相关推荐

    RT_Thread学习资源

    RT_Thread学习资源包含了一系列关于RT_thread实时操作系统的深入学习材料,是个人在探索与实践RT_thread过程中积累的宝贵笔记。RT_thread是一个强大的、开源的、轻量级的实时操作系统,适用于各种嵌入式设备,从微...

    RT-thread 学习资料大全

    本压缩包中的学习资料大全涵盖了RT-thread操作系统的各个方面,旨在帮助开发者从入门到精通,掌握其核心特性和应用。 1. **RT-thread 操作系统用户手册**: 用户手册是了解RT-thread操作系统功能和使用方法的重要...

    RTThread学习资料

    本套“RTThread学习资料”是为开发者提供的一系列关于RTThread的学习资源,旨在帮助初学者快速掌握这一操作系统的核心概念和实际应用。 RTThread的主要特点包括: 1. **实时性**:作为一款实时操作系统,RTThread...

    rt_thread学习手册

    RT-Thread是一款由中国人开发的开源实时操作系统(RTOS),其设计目标是追求易用性、可伸缩性和可裁剪性,使其适用于多种应用场景。RT-Thread不仅提供了操作系统的常规功能,还特别注重系统的实时性和可靠性,适合...

    Android Thread学习笔记

    ### Android Thread 学习笔记详解 #### Android单线程模型的核心原则 在深入探讨Android中的线程使用之前,我们首先需要理解其核心的单线程模型原则,这为后续的多线程操作提供了基础框架: 1. **不要阻塞UI线程*...

    RT-Thread学习笔记.xmind

    自己做的笔记

    RT-Thread学习笔记总结.tar.gz

    本学习笔记总结将全面介绍RT-Thread的核心概念、功能特性、开发环境搭建以及实际应用案例,帮助初学者快速入门。 一、RT-Thread核心概念 1. 线程:RT-Thread中的线程相当于操作系统的任务,负责执行特定的功能。...

    VC6.0 下Thread学习

    在编程领域,线程是操作系统中的一个基本概念,它代表了程序执行的单一顺序控制流。在多线程环境中,一个进程可以同时拥有多个线程,每个线程都有...通过学习和实践,你可以更好地掌握线程的运用,提高软件的并发性能。

    STM32+RT Thread学习笔记

    在学习STM32与RT-Thread操作系统(OS)的集成过程中,首先需要掌握如何搭建运行环境,包括必要的软件安装、项目初始化、开发板配置、以及如何使用Scons工具自动生成Keil MDK项目文件。以下是详细的知识点梳理。 1. ...

    RT-Thread学习笔记(1)- 内核移植 - 代码

    本文将深入探讨RT-Thread的学习笔记,主要关注内核移植的相关代码实践。 在内核移植的过程中,我们需要考虑以下几个关键点: 1. **处理器架构适配**:RT-Thread支持多种处理器架构,如ARM Cortex-M、Cortex-A、...

    Rtthread学习笔记(二十)RT-Thread使用Esp8266,连接远端服务器IP端口发送数据

    将RTThread移植到STM32上,添加esp8266,连接wifi,从而实现stm32与服务器通讯。其中STM32做客户端,在华为云服务器上开的网络调试助手(具有固定IP端口)做服务器,esp8266的作用是将串口数据透传到网络上,是...

    ThreadX中文学习手册

    ### ThreadX中文学习手册知识点概览 #### 一、ThreadX概述 - **定义与特点**:ThreadX是由Express Logic公司开发的一款高性能实时操作系统(RTOS)内核,专门针对嵌入式应用设计。与传统的RTOS相比,ThreadX具有更...

    一起来学RT-Thread教程连载-更新到第20章

    - **连载教程**:作者jiezhi320分享了一套详细的RT-Thread学习教程,该教程采取连载形式,从内核线程、线程间的通信机制以及各种组件的使用等方面进行介绍。适合初学者逐步掌握RT-Thread的基本操作。 - **硬件平台**...

    RTThread互斥量实验.rar

    这个实验是基于Rt-Thread学习笔记系列的第六部分,旨在帮助开发者理解如何在RTThread中使用互斥量。 互斥量是一种同步机制,用于保护共享资源免受并发访问。在多任务环境下,当多个任务试图同时访问同一资源时,...

    嵌入式操作系统教程教程,包括FreeRTOS,uCOS,RT-Thread,RTX

    Thread教程连载-更新到第20章.pdf》、《RT-Thread编程指南.pdf》以及《正点原子-RT-Thread 环境快速搭建入门教程.pdf》提供了从基础到高级的RT-Thread学习路径,适合新手和进阶者。 **RTX** 是Keil公司开发的实时...

    STM32L496驱动AP3216C光强接近传感器【RT-Thread工程,支持STM32L4系列单片机】.zip

    STM32L496驱动AP3216C光强接近传感器的开发是一个典型的嵌入式系统项目,涉及到的主要技术点包括STM32L496微控制器、RT-Thread实时操作系统、AP3216C传感器以及硬件驱动开发。 STM32L496是意法半导体...

    5,RT Thread操作系统教程及资料_rtthread_RT-Thread_

    这个压缩包中包含的“5,RT Thread操作系统教程及资料(第三方合作伙伴)”很可能是由RT Thread社区或者合作伙伴提供的学习资源,旨在帮助开发者更好地理解和使用RT Thread。 1. **RT Thread简介** RT Thread是一...

    用于threadx学习的c语言代码

    threadx是优秀的实时操作系统,本资源是学习threadx的很好例子,在vs2005上运行,根据信号量,信号队列,字节池,互斥量等threadx的各种资源编写了14个不同的程序,这些程序都已经运行通过,可以使用

    ThreadX学习笔记.pdf

    ### ThreadX学习笔记知识点概述 #### 一、ThreadX启动过程详解 ThreadX是一个轻量级的实时操作系统(RTOS),广泛应用于嵌入式系统中。本节将详细解析ThreadX从启动到正常运行的全过程。 ##### 1. 注册中断向量表...

Global site tag (gtag.js) - Google Analytics