`
zhang_xzhi_xjtu
  • 浏览: 536765 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

[code] 多个线程写入,单线程读出的Stream对

    博客分类:
  • java
阅读更多
今天想做一个System.out的重定向,想要得的结果是有很多个线程写System.out,
利用PipedStread重定向一个JPanel,结果拿到一堆IOException("Write end dead").
看看PipedStream的code,发现里面有些东东值得探讨。

原有的PipedOutputStream和PipedInputStream的工作原理是这样的。
发往PipedOutputStream的byte写到PipedInputStream的一个cache中,基于PipedInputStream
构造一个标准的生产者消费者的锁协议。

查看源代码,有如下问题。

	public void connect(PipedOutputStream src) throws IOException {
		src.connect(this);
	}

	public synchronized void connect(PipedInputStream snk) throws IOException {
		if (snk == null) {
			throw new NullPointerException();
		} else if (sink != null || snk.connected) {
			throw new IOException("Already connected");
		}
		sink = snk;
		snk.in = -1;
		snk.out = 0;
		snk.connected = true;
	}

PipedInputStream的connect方法是没有加锁的,PipedOutputStream的connect方法只对PipedOutputStream加锁,
这样当有两个线程同时调用同一个PipedInputStream的connect方法时是有问题的。改啊。

	public synchronized void connect(MyPipedInputStream snk) throws IOException {
		if (snk == null) {
			throw new NullPointerException();
		}
		if (sink != null) {
			throw new IOException("Already connected");
		}

		synchronized (snk) {
			if (snk.connected) {
				throw new IOException("Already connected");
			}
			sink = snk;
			snk.in = -1;
			snk.out = 0;
			snk.connected = true;
		}
	}


在PipedInputStream里面有Thread readSide和Thread writeSide来分别记录Read thread和write thread.
在每次接收bytes和cache满的时候都要检查readSide,以防止没有reader来读取,这点还是有用的。
但是writeSide的使用导致PipedStream的WriteThread必须在另一个线程读cache的时候为活的。这个也是有一定
意义的,因为pipedStream的doc,已经说明了适用的情况是一个线程写pipe,一个线程读pipe。

但是如果在多个线程写pipe的时候就是一个妨碍了。
看到这里,不是pipedStream不好,是我想要的和pipedStream的spec不吻合。
我想要的是一个可以多个线程写入,单线程读出的Stream对.

/**
 * 多线程写,单线程读的piped stream对.
 */
public class MyOutputStream extends OutputStream {

	private MyInputStream sink;

	private synchronized void connect(MyInputStream snk) throws IOException {
		if (snk == null) {
			throw new NullPointerException();
		}
		if (sink != null) {
			throw new IOException("Already connected");
		}

		synchronized (snk) {
			if (snk.isConnected()) {
				throw new IOException("Already connected");
			}

			snk.connect();
			sink = snk;
		}
	}

	public MyOutputStream(MyInputStream snk) throws IOException {
		connect(snk);
	}

	@Override
	public void write(int b) throws IOException {
		if (sink == null) {
			throw new IOException("Pipe not connected");
		}

		sink.receive(b);
	}

	@Override
	public void write(byte b[], int off, int len) throws IOException {
		if (sink == null) {
			throw new IOException("Pipe not connected");
		}

		sink.receive(b, off, len);
	}

	/**
	 * No need to close this stream, the piped stream pair should closed by read
	 * side.
	 */
	@Override
	public void close() {
	}
}

/**
 * 多线程写,单线程读的piped stream对.
 */
public class MyInputStream extends InputStream {

	private static final int BUFFER_SIZE = 1024;

	private byte buffer[] = new byte[BUFFER_SIZE];;

	private int in = 0;

	private int out = 0;

	private int dataLength = 0;

	private boolean connected = false;

	private boolean isClosed = false;

	public MyInputStream() {
	}

	private void checkState() throws IOException {
		if (!connected) {
			throw new IOException("Pipe not connected.");
		}
		if (isClosed) {
			throw new IOException("Stream is closed.");
		}
	}

	private int readCore() throws IOException {
		int ret = buffer[out] & 0xFF;

		if (out < buffer.length - 1) {
			out++;
		} else {
			out = 0;
		}
		dataLength--;

		return ret;
	}

	private void receiveCore(int b) throws IOException {

		buffer[in] = (byte) (b & 0xFF);
		if (in < buffer.length - 1) {
			in++;
		} else {
			in = 0;
		}
		dataLength++;
	}

	synchronized private void waitSpace() throws IOException {
		while (dataLength == buffer.length) {
			try {
				notifyAll();
				wait();
			} catch (InterruptedException e) {
				throw new InterruptedIOException();
			}
		}
	}

	synchronized private void waitData() throws IOException {
		while (dataLength == 0) {
			try {
				notifyAll();
				wait();
			} catch (InterruptedException e) {
				throw new InterruptedIOException();
			}
		}
	}

	synchronized boolean isConnected() {
		return connected;
	}

	synchronized void connect() {
		connected = true;
	}

	synchronized void receive(int b) throws IOException {
		checkState();

		waitSpace();

		receiveCore(b);

		notifyAll();
	}

	synchronized void receive(byte b[], int off, int len) throws IOException {

		if (b == null) {
			throw new NullPointerException();
		} else if ((off < 0) || (off > b.length) || (len < 0)
				|| ((off + len) > b.length) || ((off + len) < 0)) {
			throw new IndexOutOfBoundsException();
		} else if (len == 0) {
			return;
		}

		checkState();

		waitSpace();

		if (len <= buffer.length - dataLength) {
			for (int i = off; i < off + len; i++) {
				receiveCore(b[i]);
			}
			notifyAll();
		} else {
			int realTransfer = buffer.length - dataLength;
			for (int i = off; i < off + realTransfer; i++) {
				receiveCore(b[i]);
			}
			notifyAll();
			receive(b, off + realTransfer, len - realTransfer);
		}
	}

	@Override
	synchronized public int read() throws IOException {
		checkState();

		waitData();

		int ret = readCore();

		notifyAll();

		return ret;
	}

	@Override
	synchronized public int read(byte b[], int off, int len) throws IOException {

		if (b == null) {
			throw new NullPointerException();
		} else if (off < 0 || len < 0 || len > b.length - off) {
			throw new IndexOutOfBoundsException();
		} else if (len == 0) {
			return 0;
		}

		checkState();

		waitData();

		int realTransfer = Math.min(dataLength, len);

		for (int i = off; i < off + realTransfer; i++) {
			b[i] = (byte) (readCore() & 0xFF);
		}

		notifyAll();

		return realTransfer;
	}

	@Override
	synchronized public int available() throws IOException {
		checkState();
		return dataLength;
	}

	@Override
	synchronized public void close() {
		isClosed = true;
		buffer = null;
	}

	/**
	 * n<=Int.Max
	 */
	@Override
	synchronized public long skip(long n) throws IOException {

		checkState();

		if (n <= 0) {
			return 0;
		}

		int realSkip = (int) Math.min(dataLength, n);

		dataLength = dataLength - realSkip;

		out = (out + realSkip) % buffer.length;

		return realSkip;
	}

}
分享到:
评论
1 楼 ing_for_h 2016-01-02  
 

相关推荐

    linux_code.rar_linux 多线程_linux 线程_多线程编程

    - **信号量(Semaphore)**:信号量提供了一种更为灵活的同步机制,可以控制多个线程对资源的访问数量。 ```c #include sem_t *sem_open(const char *name, int oflag, mode_t mode, unsigned int value); ...

    python 多线程实现多个网址的多次快速访问

    本教程将详细讲解如何使用Python的多线程来实现对多个网址的快速访问,并记录访问结果。 首先,我们需要导入Python的`threading`模块,它是Python标准库中的多线程支持库。`threading.Thread`是创建新线程的类,而`...

    win32多线程程序 EXITCODE

    父进程可以使用`WaitForSingleObject`或`WaitForMultipleObjects`函数来等待一个或多个线程的结束,并通过`GetExitCodeThread`获取线程的`EXITCODE`。这样,父进程可以根据`EXITCODE`的值来判断子线程的执行情况,...

    C#多线程编程实战Code源代码

    C#语言提供了对线程的内置支持,通过System.Threading命名空间,我们可以创建和管理线程。Thread类是C#中表示线程的基本类,通过实例化Thread对象并调用其Start方法可以启动新线程。此外,还有ThreadPool类用于管理...

    C#.NET多线程实例6个(包括多线程基本使用,多线程互斥等全部多线程使用实例),可直接运行

    本资源包含六个C#.NET多线程的实例,涵盖了多线程的基本使用到更高级的概念,如线程互斥。以下是这些实例可能涉及的关键知识点: 1. **线程创建**:C#中创建线程主要有两种方式,一是通过`System.Threading.Thread`...

    C#线程参考手册相关code

    在C#编程中,多线程是一个非常关键的特性,特别是在现代高性能计算和并发操作中。C#提供了丰富的API来支持多线程编程,主要集中在`System.Threading`命名空间下。本篇将深入探讨C#线程相关的知识,并基于描述中的...

    多线程端口扫描 源码 sourcecode

    在端口扫描中,这可能是为了防止多个线程同时尝试写入扫描结果。 3. **线程池**:为避免频繁创建和销毁线程的开销,可以使用线程池。线程池预先创建一定数量的线程,任务来了就分配给空闲线程,提高了效率。在源码...

    多线程并行执行,汇总结果

    多线程并行执行是指在同一个程序中同时运行多个线程,每个线程负责不同的任务,以实现任务的并发执行。这种技术可以充分利用多核处理器的优势,将CPU的计算能力最大化,从而提高程序的运行效率。在Java中,可以通过...

    delphi 的多线程控件

    在Delphi编程环境中,多线程控件是一个强大的工具,它允许开发者在单个应用程序中同时执行多个任务,提升程序的效率和响应性。本文将深入探讨Delphi的多线程控件及其应用。 首先,我们需要理解什么是多线程。在...

    多线程下载的DELPHI源程序

    在IT行业中,多线程技术是一项关键的编程概念,它允许程序同时执行多个任务,显著提高了计算机系统的效率和响应速度。对于大型文件的下载,尤其是网络速度有限的情况下,多线程下载可以将文件分割成若干部分,每个...

    Halcon12+VisualStudio2013实现多线程

    首先,多线程是现代软件开发中的一个重要概念,它允许程序在同一时间执行多个独立的任务。在Halcon中,通过多线程可以并行处理图像,例如,一边进行图像采集,一边进行图像分析,这样可以极大地提高整体处理效率,...

    pin支持多线程的机制

    对多线程应用的模拟和分析工具要么过于强调单线程应用,要么在执行大型多线程程序时速度过慢。另外,当前的系统架构与支持的应用程序之间存在脱节。 为了在动态二进制插桩系统中支持大型多线程应用程序,Pin团队...

    Java多线程调用BlockingDeque跑批量数据的例子

    N个线程对完成的A表数据做最后处理 支持大数据量跑批,就是个例子,本来是公司发送促销邮件用的,欢迎提意见和建议。 运行DispatcherMain可以测试,库结构自己可以根据code随便改成父子表关系的就行

    多线程.zip

    C#多线程编程实战7644OT_Code.zip这部分内容可能是该书的配套源码,开发者可以通过阅读和运行这些代码来加深对书本理论的理解。源码是学习多线程的重要实践工具,它可以帮助我们直观地看到线程如何在程序中工作,...

    Android多线程下载实现方案

    1. **任务分割**:将大文件分成多个小块,每个小块由一个单独的线程负责下载。这样可以充分利用网络带宽,提高下载速度。 2. **并发执行**:多个线程同时工作,下载不同的数据块,有效利用CPU和网络资源。 3. **...

    多线程的VC++高速文件搜索代码

    在提供的压缩包文件名称列表`codefans.net`中,可能包含了一个或多个与多线程文件搜索相关的源代码示例。通过分析和学习这些代码,开发者可以深入理解如何在实际项目中应用上述理论知识。 总的来说,多线程的VC++...

    多线程网络下载

    线程是操作系统分配CPU时间的基本单位,一个进程可以包含多个线程。在单线程下载中,数据是从服务器到客户端的单一流程。而多线程下载则是将大文件分割成若干小块,每个小块由不同的线程单独下载,然后在客户端进行...

    mfc串口异步写入读出

    在MFC(Microsoft Foundation ...在实际项目中,你可能还需要考虑多线程安全、流控、错误恢复等高级特性。在`com_write_read`这个文件中,可能包含了相关的示例代码或详细教程,可以帮助你更好地理解和实现这一功能。

    win32多线程例子(c语言)

    多线程能提高程序的响应速度和效率,特别是在进行I/O操作或者CPU密集型任务时,不同线程可以并行处理,避免了单线程的等待时间。 二、Win32 API与线程 Windows API 提供了一套丰富的函数库,用于创建、管理和同步...

Global site tag (gtag.js) - Google Analytics