`
837062099
  • 浏览: 112717 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

PipedInputStream/PipedOutputStream原理

阅读更多
      PipedInputStream类与PipedOutputStream类用于在应用程序中创建管道通信.一个PipedInputStream实例对象必须和一个PipedOutputStream实例对象进行连接而产生一个通信管道.PipedOutputStream可以向管道中写入数据,PipedIntputStream可以读取PipedOutputStream向管道中写入的数据.这两个类主要用来完成线程之间的通信.一个线程的PipedInputStream对象能够从另外一个线程的PipedOutputStream对象中读取数据.

ps:使用这组I/O流必须在多线程环境下.

        首先简单的介绍一下这两个类的实现原理,PipedInputStream和PipedOutputStream的实现原理类似于"生产者-消费者"原理,PipedOutputStream是生产者,PipedInputStream是消费者,在PipedInputStream中有一个buffer字节数组,默认大小为1024,作为缓冲区,存放"生产者"生产出来的东东.还有两个变量,in,out,in是用来记录"生产者"生产了多少,out是用来记录"消费者"消费了多少,in为-1表示消费完了,in==out表示生产满了.当消费者没东西可消费的时候,也就是当in为-1的时候,消费者会一直等待,直到有东西可消费.

        因为生产和消费的方法都是synchronized的,所以肯定是生产者先生产出一定数量的东西,消费者才可以开始消费,所以在生产的时候发现in==out,那一定是满了,同理,在消费的时候发现in==out,那一定是消费完了,因为生产的东西永远要比消费来得早,消费者最多可以消费和生产的数量相等的东西,而不会超出.

        好了,介绍完之后,看看SUN高手是怎么实现这些功能的.由于buffer(存放产品的通道)这个关键变量在PipedInputStream消费者这个类中,所以要想对buffer操作,只能通过PipedInputStream来操作,因此将产品放入通道的操作是在PipedInputStream中.

存放产品的行为:
    protected synchronized void receive(int b) throws IOException {// 这里好像有些问题,因为这个方法是在PipedOutputStream类中调用的,而这个方法是protected的,下面另一个receive方法就不是protected,可能是我的源码有些问题,也请大家帮我看看
        checkStateForReceive();// 检测通道是否连接,准备好接收产品
        writeSide = Thread.currentThread();// 当前线程是生产者
        if (in == out)
            awaitSpace();// 发现通道满了,没地方放东西啦,等吧~~
        if (in < 0) {// in<0,表示通道是空的,将生产和消费的位置都置成第一个位置
            in = 0;
            out = 0;
        }
        buffer[in++] = (byte) (b & 0xFF);
        if (in >= buffer.length) {// 如果生产位置到达了通道的末尾,为了循环利用通道,将in置成0
            in = 0;
        }
    }

    synchronized void receive(byte b[], int off, int len) throws IOException {// 看,这个方法不是protected的!
        checkStateForReceive();
        writeSide = Thread.currentThread();
        int bytesToTransfer = len;// 需要接收多少产品的数量
        while (bytesToTransfer > 0) {
            if (in == out)
                awaitSpace();
            int nextTransferAmount = 0;// 本次实际可以接收的数量
            if (out < in) {
                nextTransferAmount = buffer.length - in;// 如果消费的当前位置<生产的当前位置,则还可以再生产buffer.length-in这么多
            } else if (in < out) {
                if (in == -1) {
                    in = out = 0;// 如果已经消费完,则将in,out置成0,从头开始接收
                    nextTransferAmount = buffer.length - in;
                } else {
                    nextTransferAmount = out - in;// 如果消费的当前位置>生产的当前位置,而且还没消费完,那么至少还可以再生产out-in这么多,注意,这种情况是因为通道被重复利用而产生的!
                }
            }
            if (nextTransferAmount > bytesToTransfer)// 如果本次实际可以接收的数量要大于当前传过来的数量,
                nextTransferAmount = bytesToTransfer;// 那么本次实际只能接收当前传过来的这么多了
            assert (nextTransferAmount > 0);
            System.arraycopy(b, off, buffer, in, nextTransferAmount);// 把本次实际接收的数量放进通道
            bytesToTransfer -= nextTransferAmount;// 算出还剩多少需要放进通道
            off += nextTransferAmount;
            in += nextTransferAmount;
            if (in >= buffer.length) {// 到末尾了,该从头开始了
                in = 0;
            }
        }
    }


消费产品的行为:
    public synchronized int read() throws IOException {// 消费单个产品
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByReader) {
            throw new IOException("Pipe closed");
        } else if (writeSide != null && !writeSide.isAlive() && !closedByWriter
                && (in < 0)) {
            throw new IOException("Write end dead");
        }

        readSide = Thread.currentThread();
        int trials = 2;
        while (in < 0) {// in<0,表示通道是空的,等待生产者生产
            if (closedByWriter) {
                /**//* closed by writer, return EOF */
                return -1;// 返回-1表示生产者已经不再生产产品了,closedByWriter为true表示是由生产者将通道关闭的
            }
            if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
                throw new IOException("Pipe broken");
            }
            /**//* might be a writer waiting */
            notifyAll();
            try {
                wait(1000);
            } catch (InterruptedException ex) {
                throw new java.io.InterruptedIOException();
            }
        }
        int ret = buffer[out++] & 0xFF;
        if (out >= buffer.length) {
            out = 0;// 如果消费到通道的末尾了,从通道头开始继续循环消费
        }
        if (in == out) {
            /**//* now empty */
            in = -1;// 消费的位置和生产的位置重合了,表示消费完了,需要生产者生产,in置为-1
        }
        return ret;
    }

    public synchronized int read(byte b[], int off, int len) throws IOException {
        if (b == null) {
            throw new NullPointerException();
        } else if ((off < 0) || (off > b.length) || (len < 0)
                || ((off + len) > b.length) || ((off + len) < 0)) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;
        }

        /**//* possibly wait on the first character */
        int c = read();// 利用消费单个产品来检测通道是否连接,并且通道中是否有东西可消费
        if (c < 0) {
            return -1;// 返回-1表示生产者生产完了,消费者也消费完了,消费者可以关闭通道了
        }
        b[off] = (byte) c;
        int rlen = 1;

        // 这里没有采用receive(byte [], int ,
        // int)方法中System.arrayCopy()的方法,其实用System.arrayCopy()的方法也可以实现
        /**//*
         * 这是用System.arrayCopy()实现的方法 int bytesToConsume = len - 1; while
         * (bytesToConsume > 0 && in >= 0) { int nextConsumeAmount = 0; if (out <
         * in) { nextConsumeAmount = in - out; // System.arraycopy(buffer, out,
         * b, off, nextConsumeAmount); } else if (in < out) { nextConsumeAmount =
         * buffer.length - out; }
         * 
         * if (nextConsumeAmount > bytesToConsume) nextConsumeAmount =
         * bytesToConsume; assert (nextConsumeAmount > 0);
         * System.arraycopy(buffer, out, b, off, nextConsumeAmount);
         * bytesToConsume -= nextConsumeAmount; off += nextConsumeAmount; out +=
         * nextConsumeAmount; rlen += nextConsumeAmount; if (out >=
         * buffer.length) { out = 0; } if(in == out) { in = -1; } }
         */

        while ((in >= 0) && (--len > 0)) {
            b[off + rlen] = buffer[out++];
            rlen++;
            if (out >= buffer.length) {
                out = 0;
            }
            if (in == out) {
                /**//* now empty */
                in = -1;// in==out,表示满了,将in置成-1
            }
        }
        return rlen;
    }
分享到:
评论

相关推荐

    Java-Io流,练习

    - 双向流(如PipedInputStream/PipedOutputStream,PipedReader/PipedWriter):可以同时进行读写操作。 四、缓冲流(Buffered Stream) 为了提高IO操作的效率,Java提供了缓冲流,如`BufferedInputStream`和`...

    Java多线程设计模式(带源码)

    6. 管道模式:线程间通过管道(PipedInputStream/PipedOutputStream)进行数据传递,常用于构建生产者-消费者模型。 7. 原子操作模式:使用`java.util.concurrent.atomic`包中的原子类,如`AtomicInteger`、`...

    io流详解,字符流和字节流代码

    - **PipedInputStream/PipedOutputStream**:与PipedReader/PipedWriter类似,但处理字节数据。 6. **文件流**: - **FileInputStream/FileOutputStream**:用于直接读写文件的字节流。 - **FileReader/...

    JAVA期末复习知识点整理

    3. 流的连接:管道流(PipedInputStream/PipedOutputStream),缓冲流(BufferedReader/BufferedWriter)。 4. 转换流:InputStreamReader/OutputStreamWriter用于在字节流和字符流之间转换。 六、多线程 1. 线程的...

    2021-2022计算机二级等级考试试题及答案No.19438.docx

    22. Java中,管道流(PipedInputStream/PipedOutputStream)用于线程间通信,可以将数据从一个线程传递到另一个线程。 23. `for(i=0; i; i++)` 循环结束后,`i`的值为4,因为在循环结束前进行了最后一次自增操作,...

    2021-2022计算机二级等级考试试题及答案No.1509.docx

    17. Java线程通信:Java中的管道流(PipedInputStream/PipedOutputStream)可用于线程间的数据通信。 18. C++赋值语句:正确的赋值语句是`a[0] = 7`,给数组的第一个元素赋值。 19. 类的继承原则:在面向对象编程...

    Java毕业设计外文翻译

    - **PipedInputStream/PipedOutputStream**:这两个类用于实现管道流,允许线程之间的数据通信。 #### 目录列表与过滤 - **File类的list()方法**:可以列出指定目录下的所有文件和子目录。 - **FilenameFilter接口...

    2021-2022计算机二级等级考试试题及答案No.5114.docx

    20. Java线程通信:管道流(PipedInputStream/PipedOutputStream)用于Java线程间的通信。 21. 数据库系统特性:数据库系统支持抽象的数据模型,如关系模型。 22. Access软件:Access是由Microsoft公司开发的关系...

    Java管道流.pdf

    1. 管道输出流(PipedOutputStream)与管道输入流(PipedInputStream) PipedOutputStream是OutputStream的直接子类,负责向管道中写入字节数据;而PipedInputStream是InputStream的直接子类,负责从管道中读取字节...

    115个Java面试题和答案——终极(下)(1).rar

    - **流的连接与转换**:管道流(PipedInputStream/PipedOutputStream)和缓冲流(BufferedInputStream/BufferedReader)。 8. **NIO(New IO)** - **非阻塞I/O**:与传统的BIO相比,NIO提供了非阻塞的读写方式。...

    IO体系.java

    |--PipedOutputStream/:可以将管道输出流连接到管道输入流来创建通信管道。 | 用方法connect(PipedInputStream snk) 将此管道输出流连接到接收者。 同样使用多线程技术,避免死锁。 |--ByteArrayOutputStream/:...

    管道流说明和例子,例子很详细的

    它由`PipedInputStream`和`PipedOutputStream`组成。其中`PipedOutputStream`用于写入数据,而`PipedInputStream`则用于读取这些数据。 - **工作原理**: - 当一个线程通过`PipedOutputStream`写入数据时,这些数据...

    javaIO学习课件 很详细的讲解

    管道流如PipedInputStream和PipedOutputStream,常用于线程间通信。过滤器流(FilterInputStream, FilterOutputStream等)是基本流的包装,通过添加额外的功能,如数据压缩或加密。FilterReader和FilterWriter也提供...

    java代码-Java 输入输出命令

    10. **管道流(PipedInputStream/PipedOutputStream)** - 用于在两个线程间创建数据管道,实现数据的同步传输。 11. **数据流(DataInputStream/DataOutputStream)** - 用于读写基本数据类型的流,如int、float...

    JAVA程序设计:第14章 Java流式IO编程.ppt

    FileReader、FileWriter、FileInputStream、FileOutputStream、Memory Array、CharArrayReader、CharArrayWriter、ByteArrayInputStream、...、PipedWriter、PipedInputStream、PipedOutputStream等...

    JavaIO底层是如何工作的?Java开发Java经验技巧

    Java IO支持流的链接,如通过PipedInputStream和PipedOutputStream实现线程间的通信。FilterStream类可以用于创建带有额外功能的流,如数据转换、压缩或解压缩。 5. **NIO(New IO)** 自Java 1.4引入NIO以来,...

    2021-2022计算机二级等级考试试题及答案No.9832.docx

    - **结论**:正确答案为管道流(PipedInputStream/PipedOutputStream)。 #### 18. 线性表的顺序存储 - **题目描述**:线性表的顺序存储结构。 - **结论**:正确答案为A,线性表的顺序存储结构中,各元素所占的字节...

    java IO类学习一点通

    - PipedInputStream和PipedOutputStream:用于线程间通信,一个线程写入,另一个线程读取。 9. 序列化流: - ObjectInputStream和ObjectOutputStream:用于处理序列化对象的输入和输出。 10. NIO(New Input/...

    Javaio.zip

    如果包含多线程I/O的示例,那会涉及到PipedInputStream和PipedOutputStream,它们用于在不同的线程之间传递数据。 理解并熟练掌握Java IO是每个Java开发者的基础技能,通过实践这些示例,你可以更深入地了解IO流的...

    IO知识总结

    7. 多线程IO:在多线程环境中,使用PipedInputStream和PipedOutputStream可以实现线程间的通信。 8. NIO(非阻塞IO):Java 1.4引入了NIO,它提供了一种非阻塞的IO模型,通过选择器(Selector)和通道(Channel)...

Global site tag (gtag.js) - Google Analytics