A stream pair for multiple writer threads and a single reader thread

Posted: 2009-11-06
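Today I wanted to redirect System.out. The goal was to have many threads writing to System.out
and to use a PipedStream to redirect that output to a JPanel, but all I got was a pile of IOException("Write end dead").
Reading the PipedStream code, I found a few things worth discussing.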

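The stock PipedOutputStream and PipedInputStream work like this:
bytes sent to the PipedOutputStream are written into a buffer held by the PipedInputStream, and a standard
producer-consumer locking protocol is built on the PipedInputStream's monitor.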
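
For reference, the usage the stock classes are designed for (one writer thread, one reader thread) can be sketched roughly as follows. The class name PipedPairDemo and the helper roundTrip are made up for this illustration; only the java.io classes are real. The writer closes its end before exiting, which is what lets the reader see a clean end of stream.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of the original post.
public class PipedPairDemo {

    /** Sends text through a pipe from one writer thread and reads it back on the calling thread. */
    static String roundTrip(String text) throws Exception {
        PipedInputStream in = new PipedInputStream();
        PipedOutputStream out = new PipedOutputStream(in);

        // Single producer: writes everything, then closes its end before the thread exits.
        Thread producer = new Thread(() -> {
            try (PipedOutputStream o = out) {
                o.write(text.getBytes(StandardCharsets.UTF_8));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        producer.start();

        // Single consumer: reads until the writer's close() is reported as -1.
        ByteArrayOutputStream collected = new ByteArrayOutputStream();
        int b;
        while ((b = in.read()) != -1) {
            collected.write(b);
        }
        producer.join();
        return new String(collected.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("hello"));
    }
}
```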

Looking at the source code, I found the following problem.

	public void connect(PipedOutputStream src) throws IOException {
		src.connect(this);
	}

	public synchronized void connect(PipedInputStream snk) throws IOException {
		if (snk == null) {
			throw new NullPointerException();
		} else if (sink != null || snk.connected) {
			throw new IOException("Already connected");
		}
		sink = snk;
		snk.in = -1;
		snk.out = 0;
		snk.connected = true;
	}

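PipedInputStream.connect takes no lock, and PipedOutputStream.connect locks only the PipedOutputStream.
So when two threads connect to the same PipedInputStream at the same time (through two different PipedOutputStream
instances), both can pass the snk.connected check and both end up connected. So I changed it: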

	public synchronized void connect(MyPipedInputStream snk) throws IOException {
		if (snk == null) {
			throw new NullPointerException();
		}
		if (sink != null) {
			throw new IOException("Already connected");
		}

		synchronized (snk) {
			if (snk.connected) {
				throw new IOException("Already connected");
			}
			sink = snk;
			snk.in = -1;
			snk.out = 0;
			snk.connected = true;
		}
	}


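PipedInputStream keeps Thread readSide and Thread writeSide to record the reading thread and the writing thread.
readSide is checked each time bytes are received and whenever the buffer is full, to guard against there being no reader left to drain the pipe, and that check is useful.
The use of writeSide, however, means that the writing thread must still be alive while another thread reads from the buffer. That also makes
some sense, because the PipedStream documentation already states that the intended use is one thread writing to the pipe and one thread reading from it.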
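
The writeSide check can be reproduced with a small experiment, sketched below. The class name WriteEndDeadDemo and the helper secondReadMessage are made up for this illustration. It assumes the behaviour of the JDK source discussed here: the pipe remembers the last thread that wrote to it, and a read on an empty buffer fails once that thread has exited without closing the stream.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

// Hypothetical demo class, not part of the original post.
public class WriteEndDeadDemo {

    /** Returns the message of the IOException raised by the second read, or null if none is raised. */
    static String secondReadMessage() throws Exception {
        PipedInputStream in = new PipedInputStream();
        PipedOutputStream out = new PipedOutputStream(in);

        // The writer thread writes one byte and exits WITHOUT closing the stream.
        Thread writer = new Thread(() -> {
            try {
                out.write('a');
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        writer.start();
        writer.join();

        in.read(); // drains the only buffered byte
        try {
            in.read(); // buffer is empty and the recorded writer thread is no longer alive
            return null;
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(secondReadMessage());
    }
}
```

With several short-lived threads writing to System.out, this situation comes up constantly, which would explain the flood of exceptions.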

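With multiple threads writing to the pipe, however, this becomes an obstacle: writeSide only remembers the most recent writer, so once that thread exits the reader can fail even though other writers are still running.
So the point is not that PipedStream is bad; what I want simply does not match its spec.
What I want is a stream pair that many threads can write to and a single thread can read from.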

/**
 * A piped stream pair: multiple writer threads, single reader thread.
 */
public class MyOutputStream extends OutputStream {

	private MyInputStream sink;

	private synchronized void connect(MyInputStream snk) throws IOException {
		if (snk == null) {
			throw new NullPointerException();
		}
		if (sink != null) {
			throw new IOException("Already connected");
		}

		synchronized (snk) {
			if (snk.isConnected()) {
				throw new IOException("Already connected");
			}

			snk.connect();
			sink = snk;
		}
	}

	public MyOutputStream(MyInputStream snk) throws IOException {
		connect(snk);
	}

	@Override
	public void write(int b) throws IOException {
		if (sink == null) {
			throw new IOException("Pipe not connected");
		}

		sink.receive(b);
	}

	@Override
	public void write(byte b[], int off, int len) throws IOException {
		if (sink == null) {
			throw new IOException("Pipe not connected");
		}

		sink.receive(b, off, len);
	}

	/**
	 * Intentionally a no-op: the piped stream pair is closed from the read
	 * side.
	 */
	@Override
	public void close() {
	}
}

/**
 * A piped stream pair: multiple writer threads, single reader thread.
 */
public class MyInputStream extends InputStream {

	private static final int BUFFER_SIZE = 1024;

	private byte buffer[] = new byte[BUFFER_SIZE];

	private int in = 0;

	private int out = 0;

	private int dataLength = 0;

	private boolean connected = false;

	private boolean isClosed = false;

	public MyInputStream() {
	}

	private void checkState() throws IOException {
		if (!connected) {
			throw new IOException("Pipe not connected.");
		}
		if (isClosed) {
			throw new IOException("Stream is closed.");
		}
	}

	private int readCore() throws IOException {
		int ret = buffer[out] & 0xFF;

		if (out < buffer.length - 1) {
			out++;
		} else {
			out = 0;
		}
		dataLength--;

		return ret;
	}

	private void receiveCore(int b) throws IOException {

		buffer[in] = (byte) (b & 0xFF);
		if (in < buffer.length - 1) {
			in++;
		} else {
			in = 0;
		}
		dataLength++;
	}

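	synchronized private void waitSpace() throws IOException {
		// Re-check the closed flag on every wake-up so that a blocked writer
		// never touches the buffer of a closed pipe.
		while (!isClosed && dataLength == buffer.length) {
			try {
				notifyAll();
				wait();
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				throw new InterruptedIOException();
			}
		}
		checkState();
	}

	synchronized private void waitData() throws IOException {
		// Same pattern for the reader: stop waiting once the pipe is closed.
		while (!isClosed && dataLength == 0) {
			try {
				notifyAll();
				wait();
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				throw new InterruptedIOException();
			}
		}
		checkState();
	}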

	synchronized boolean isConnected() {
		return connected;
	}

	synchronized void connect() {
		connected = true;
	}

	synchronized void receive(int b) throws IOException {
		checkState();

		waitSpace();

		receiveCore(b);

		notifyAll();
	}

	synchronized void receive(byte b[], int off, int len) throws IOException {

		if (b == null) {
			throw new NullPointerException();
		} else if ((off < 0) || (off > b.length) || (len < 0)
				|| ((off + len) > b.length) || ((off + len) < 0)) {
			throw new IndexOutOfBoundsException();
		} else if (len == 0) {
			return;
		}

		// Copy in chunks: each pass waits for free space, then transfers as
		// much as fits. A loop replaces the original recursion so that a large
		// write against a slow reader cannot overflow the stack.
		// Note: waitSpace() releases the monitor while waiting, so chunks from
		// different writer threads may interleave.
		int end = off + len;
		while (off < end) {
			checkState();

			waitSpace();

			int realTransfer = Math.min(end - off, buffer.length - dataLength);
			for (int i = 0; i < realTransfer; i++) {
				receiveCore(b[off++]);
			}
			notifyAll();
		}
	}

	@Override
	synchronized public int read() throws IOException {
		checkState();

		waitData();

		int ret = readCore();

		notifyAll();

		return ret;
	}

	@Override
	synchronized public int read(byte b[], int off, int len) throws IOException {

		if (b == null) {
			throw new NullPointerException();
		} else if (off < 0 || len < 0 || len > b.length - off) {
			throw new IndexOutOfBoundsException();
		} else if (len == 0) {
			return 0;
		}

		checkState();

		waitData();

		int realTransfer = Math.min(dataLength, len);

		for (int i = off; i < off + realTransfer; i++) {
			b[i] = (byte) (readCore() & 0xFF);
		}

		notifyAll();

		return realTransfer;
	}

	@Override
	synchronized public int available() throws IOException {
		checkState();
		return dataLength;
	}

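	@Override
	synchronized public void close() {
		isClosed = true;
		// The buffer is deliberately kept allocated: threads blocked in
		// waitSpace()/waitData() still evaluate it when they wake up.
		// Wake them so they can notice that the pipe has been closed.
		notifyAll();
	}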

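	/**
	 * Skips at most the number of bytes currently buffered; never blocks.
	 */
	@Override
	synchronized public long skip(long n) throws IOException {

		checkState();

		if (n <= 0) {
			return 0;
		}

		// The result never exceeds dataLength, so the cast to int is safe.
		int realSkip = (int) Math.min(dataLength, n);

		dataLength = dataLength - realSkip;

		out = (out + realSkip) % buffer.length;

		// Skipping frees space, so wake any writers blocked in waitSpace().
		notifyAll();

		return realSkip;
	}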

}
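
To see the idea in action without the full classes above, here is a condensed, self-contained sketch of the same wait/notify ring buffer with several writer threads and one reader. Everything in it (MultiWriterPipeDemo, Pipe, put, take, transfer) is a name invented for this illustration, and it leaves out connect/close handling entirely. The point is only that no writer thread is tracked, so writers can come and go freely.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;

// Hypothetical condensed demo, not the MyInputStream/MyOutputStream classes themselves.
public class MultiWriterPipeDemo {

    /** Bounded ring buffer guarded by its own monitor; exposes a stream on each side. */
    static final class Pipe {
        private final byte[] buffer = new byte[16];
        private int in, out, count;

        final OutputStream sink = new OutputStream() {
            @Override public void write(int b) throws IOException { put(b); }
        };
        final InputStream source = new InputStream() {
            @Override public int read() throws IOException { return take(); }
        };

        synchronized void put(int b) throws IOException {
            while (count == buffer.length) await(); // block while the buffer is full
            buffer[in] = (byte) b;
            in = (in + 1) % buffer.length;
            count++;
            notifyAll();
        }

        synchronized int take() throws IOException {
            while (count == 0) await(); // block while the buffer is empty
            int b = buffer[out] & 0xFF;
            out = (out + 1) % buffer.length;
            count--;
            notifyAll();
            return b;
        }

        // Only called from put/take, i.e. while holding this object's monitor.
        private void await() throws IOException {
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new InterruptedIOException();
            }
        }
    }

    /** Starts the given number of writer threads, then reads back every byte on the calling thread. */
    static int transfer(int writers, int perWriter) throws Exception {
        Pipe pipe = new Pipe();
        Thread[] threads = new Thread[writers];
        for (int i = 0; i < writers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int j = 0; j < perWriter; j++) pipe.sink.write('x');
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
            threads[i].start();
        }
        int total = writers * perWriter;
        int received = 0;
        while (received < total && pipe.source.read() == 'x') received++;
        for (Thread t : threads) t.join();
        return received;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(transfer(4, 100));
    }
}
```

For the original goal, the write side of such a pipe could be wrapped in a PrintStream and passed to System.setOut, with the single reader thread feeding the JPanel.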