`
fantaxy025025
  • 浏览: 1317528 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类

nodejs Stream使用手册

 
阅读更多

 

nodejs Stream使用手册(me:文是翻译的,en文为:https://github.com/substack/stream-handbook

介绍

本文介绍了使用 node.js streams 开发程序的基本方法。

cc-by-3.0

"We should have some ways of connecting programs like garden hose--screw in
another segment when it becomes necessary to massage data in
another way. This is the way of IO also."

Doug McIlroy. October 11, 1964

doug mcilroy


最早接触Stream是从早期的unix开始的 
数十年的实践证明Stream 思想可以很简单的开发出一些庞大的系统。在unix里,Stream是通过 
|实现的;在node中,作为内置的stream模块,很多核心模块和三方模块都使用到。和unix一样, 
node Stream主要的操作也是.pipe(),使用者可以使用反压力机制来控制读和写的平衡。 
Stream 可以为开发者提供可以重复使用统一的接口,通过抽象的Stream接口来控制Stream之间的读写平衡。


为什么使用Stream

node中的I/O是异步的,因此对磁盘和网络的读写需要通过回调函数来读取数据,下面是一个文件下载服务器

的简单代码:

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    fs.readFile(__dirname + '/data.txt', function (err, data) {
        res.end(data);
    });
});
server.listen(8000);

这些代码可以实现需要的功能,但是服务在发送文件数据之前需要缓存整个文件数据到内存,如果"data.txt"文件很 
大并且并发量很大的话,会浪费很多内存。因为用户需要等到整个文件缓存到内存才能接受的文件数据,这样导致 
用户体验相当不好。不过还好(req, res)两个参数都是Stream,这样我们可以用fs.createReadStream()代替

fs.readFile():

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.pipe(res);
});
server.listen(8000);

.pipe()方法监听fs.createReadStream()'data' 和'end'事件,这样"data.txt"文件就不需要缓存整 
个文件,当客户端连接完成之后马上可以发送一个数据块到客户端。使用.pipe()另一个好处是可以解决当客户 
端延迟非常大时导致的读写不平衡问题。如果想压缩文件再发送,可以使用三方模块实现:

var http = require('http');
var fs = require('fs');
var oppressor = require('oppressor');

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.pipe(oppressor(req)).pipe(res);
});
server.listen(8000);

这样文件就会对支持gzip和deflate的浏览器进行压缩。oppressor 模块会处理所有的content-encoding

Stream使开发程序变得简单。

基础概念

有五种基本的Stream: readable, writable, transform, duplex, and"classic”.

pipe

所有类型的Stream收是使用 .pipe() 来创建一个输入输出对,接收一个可读流src并将其数据输出到可写流dst,如下:

src.pipe(dst)

.pipe(dst)方法为返回dst流,这样就可以接连使用多个.pipe(),如下:

a.pipe(b).pipe(c).pipe(d)

功能与下面的代码相同:

a.pipe(b);
b.pipe(c);
c.pipe(d);

这样的用法十分类似于unix命令下面用法:

a | b | c | d

readable streams

通过调用Readable streams的 .pipe()方法可以把Readable streams的数据写入一个

Writable , Transform, 或者Duplex stream。

readableStream.pipe(dst)

创建 readable stream

这里我们创建一个readable stream!

var Readable = require('stream').Readable;

var rs = new Readable;
rs.push('beep ');
rs.push('boop\n');
rs.push(null);

rs.pipe(process.stdout);


$ node read0.js
beep boop

rs.push(null) 通知数据接收者数据已经发送完毕. 
注意到我们在将所有数据内容压入可读流之前并没有调用rs.pipe(process.stdout);,但是我们压入的所有数据 
内容还是完全的输出了,这是因为可读流在接收者没有读取数据之前,会缓存所有压入的数据。但是在很多情况下, 
更好的方法是只有数据接收着请求数据的时候,才压入数据到可读流而不是缓存整个数据。下面我们重写 一下

._read()函数:

var Readable = require('stream').Readable;
var rs = Readable();

var c = 97;
rs._read = function () {
    rs.push(String.fromCharCode(c++));
    if (c > 'z'.charCodeAt(0)) rs.push(null);
};

rs.pipe(process.stdout);
``````
$ node read1.js
abcdefghijklmnopqrstuvwxyz

上面的代码通过重写_read()方法实现了只有在数据接受者请求数据才向可读流中压入数据。_read()方法 
也可以接收一个size参数表示数据请求着请求的数据大小,但是可读流可以根据需要忽略这个参数。注意我 
们也可以用util.inherits()继承可读流。为了说明只有在数据接受者请求数据时_read()方法才被调用,我们 
在向可读流压入数据时做一个延时,如下:

var Readable = require('stream').Readable;
var rs = Readable();

var c = 97 - 1;

rs._read = function () {
    if (c >= 'z'.charCodeAt(0)) return rs.push(null);

    setTimeout(function () {
        rs.push(String.fromCharCode(++c));
    }, 100);
};

rs.pipe(process.stdout);

process.on('exit', function () {
    console.error('\n_read() called ' + (c - 97) + ' times');
});
process.stdout.on('error', process.exit);

用下面的命令运行程序我们发现_read()方法只调用了5次:

$ node read2.js | head -c5
abcde
_read() called 5 times

使用计时器的原因是系统需要时间来发送信号来通知程序关闭管道。使用process.stdout.on('error', fn) 
是为了处理系统因为header命令关闭管道而发送SIGPIPE信号,因为这样会导致process.stdout触发 
EPIPE事件。如果想创建一个的可以压入任意形式数据的可读流,只要在创建流的时候设置参数objectMode 
为true即可,例如:Readable({ objectMode: true })

读取readable stream数据

大部分情况下我们只要简单的使用pipe方法将可读流的数据重定向到另外形式的流,但是在某些情况下也许 
直接从可读流中读取数据更有用。如下所示:

process.stdin.on('readable', function () {
    var buf = process.stdin.read();
    console.dir(buf);
});



$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume0.js 
<Buffer 61 62 63 0a>
<Buffer 64 65 66 0a>
<Buffer 67 68 69 0a>
null

当可读流中有数据可读取时,流会触发'readable' 事件,这样就可以调用.read()方法来读取相 
关数据,当可读流中没有数据可读取时,.read() 会返回null,这样就可以结束.read() 的调用, 
等待下一次'readable' 事件的触发。下面是一个使用.read(n)从标准输入每次读取3个字节的例子:

process.stdin.on('readable', function () {
    var buf = process.stdin.read(3);
    console.dir(buf);
});

如下运行程序发现,输出结果并不完全!

$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume1.js 
<Buffer 61 62 63>
<Buffer 0a 64 65>
<Buffer 66 0a 67>

这是应为额外的数据数据留在流的内部缓冲区里了,而我们需要通知流我们要读取更多的数据.read(0) 
可以达到这个目的。

process.stdin.on('readable', function () {
    var buf = process.stdin.read(3);
    console.dir(buf);
    process.stdin.read(0);
});

这次运行结果如下:

$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume2.js 
<Buffer 61 62 63>
<Buffer 0a 64 65>
<Buffer 66 0a 67>
<Buffer 68 69 0a>

我们可以使用 .unshift() 将数据重新押回流数据队列的头部,这样可以接续读取押回的数据。如下面 
的代码,会按行输出标准输入的内容:

var offset = 0;

process.stdin.on('readable', function () {
    var buf = process.stdin.read();
    if (!buf) return;
    for (; offset < buf.length; offset++) {
        if (buf[offset] === 0x0a) {
            console.dir(buf.slice(0, offset).toString());
            buf = buf.slice(offset + 1);
            offset = 0;
            process.stdin.unshift(buf);
            return;
        }
    }
    process.stdin.unshift(buf);
});

$ tail -n +50000 /usr/share/dict/american-english | head -n10 | node lines.js 
'hearties'
'heartiest'
'heartily'
'heartiness'
'heartiness\'s'
'heartland'
'heartland\'s'
'heartlands'
'heartless'
'heartlessly'

当然,有很多模块可以实现这个功能,如:split 。

writable streams

writable streams只可以作为.pipe()函数的目的参数。如下代码:

src.pipe(writableStream)

创建 writable stream

重写 ._write(chunk, enc, next) 方法就可以接受一个readable stream的数据。

var Writable = require('stream').Writable;
var ws = Writable();
ws._write = function (chunk, enc, next) {
    console.dir(chunk);
    next();
};

process.stdin.pipe(ws);

$ (echo beep; sleep 1; echo boop) | node write0.js 
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>

第一个参数chunk是数据输入者写入的数据。第二个参数end是数据的编码格式。第三个参数next(err)通过回调函数通知数据写入者可以写入更多的时间。如果readable stream写入的是字符串,那么字符串会默认转换为Buffer,如果在创建流的时候设置Writable({ decodeStrings: false })参数,那么不会做转换。如果readable stream写入的数据时对象,那么需要这样创建writable stream

Writable({ objectMode: true })

写数据到 writable stream

调用writable stream的.write(data)方法即可完成数据写入。

process.stdout.write('beep boop\n');

调用.end()方法通知writable stream 数据已经写入完成。

var fs = require('fs');
var ws = fs.createWriteStream('message.txt');

ws.write('beep ');

setTimeout(function () {
    ws.end('boop\n');
}, 1000);

$ node writing1.js 
$ cat message.txt
beep boop

如果需要设置writable stream的缓冲区的大小,那么在创建流的时候,需要设置opts.highWaterMark, 
这样如果缓冲区里的数据超过opts.highWaterMark.write(data)方法会返回false。当缓冲区可写的 
时候,writable stream会触发'drain' 事件。

classic streams

Classic streams比较老的接口了,最早出现在node 0.4版本中,但是了解一下其运行原理还是十分有好 
处的。当一个流被注册了"data" 事件的回到函数,那么流就会工作在老版本模式下,即会使用老的API。

classic readable streams

Classic readable streams事件就是一个事件触发器,如果Classic readable streams有数据可读取,那 
么其触发 "data" 事件,等到数据读取完毕时,会触发"end" 事件。.pipe() 方法通过检查stream.readable 
的值确定流是否有数据可读。下面是一个使用Classic readable streams打印A-J字母的例子:

var Stream = require('stream');
var stream = new Stream;
stream.readable = true;

var c = 64;
var iv = setInterval(function () {
    if (++c >= 75) {
        clearInterval(iv);
        stream.emit('end');
    }
    else stream.emit('data', String.fromCharCode(c));
}, 100);

stream.pipe(process.stdout);


$ node classic0.js
ABCDEFGHIJ

如果要从classic readable stream中读取数据,注册"data" 和"end"两个事件的回调函数即可,代码如下:

process.stdin.on('data', function (buf) {
    console.log(buf);
});
process.stdin.on('end', function () {
    console.log('__END__');
});


$ (echo beep; sleep 1; echo boop) | node classic1.js 
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>
__END__

需要注意的是如果你使用这种方式读取数据,那么会失去使用新接口带来的好处。比如你在往一个 
延迟非常大的流写数据时,需要注意读取数据和写数据的平衡问题,否则会导致大量数据缓存在内 
存中,导致浪费大量内存。一般这时候强烈建议使用流的.pipe()方法,这样就不用自己监听"data" 
"end"事件了,也不用担心读写不平衡的问题了。当然你也可以用 through代替自己监听

"data" 和"end" 事件,如下面的代码:

var through = require('through');
process.stdin.pipe(through(write, end));

function write (buf) {
    console.log(buf);
}
function end () {
    console.log('__END__');
}

$ (echo beep; sleep 1; echo boop) | node through.js 
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>
__END__

或者也可以使用concat-stream来缓存整个流的内容:

var concat = require('concat-stream');
process.stdin.pipe(concat(function (body) {
    console.log(JSON.parse(body));
}));


$ echo '{"beep":"boop"}' | node concat.js 
{ beep: 'boop' }

当然如果你非要自己监听"data" 和"end"事件,那么你可以在写数据的流不可写的 
时候使用.pause()方法暂停Classic readable streams继续触发"data" 事件。等到 
写数据的流可写的时候再使用.resume() 方法通知流继续触发"data" 事件继续读取 
数据。

classic writable streams

Classic writable streams 非常简单。只有 .write(buf).end(buf).destroy()三个方法。.end(buf) 方法的buf参数是可选的,如果选择该参数,相当于stream.write(buf); stream.end() 这样的操作,需要注意的是当流的缓冲区写满即流不可写时.write(buf)方法会返回false,如果流再次可写时,流会触发drain事件。

transform

transform是一个对读入数据过滤然输出的流。

duplex

duplex stream是一个可读也可写的双向流,如下面的a就是一个duplex stream:

a.pipe(b).pipe(a)

read more

  • core stream documentation
  • You can use the readable-stream 
    module to make your streams2 code compliant with node 0.8 and below. Just 
    require('readable-stream') instead of require('stream') after you 
    npm install readable-stream.

 

+

+

+

-

-

=

=

 

分享到:
评论

相关推荐

    Nodejs Stream 数据流使用手册

    本文介绍了使用 node.js streams 开发程序的基本方法。 &lt;code class=hljs&gt;We should have some ways of connecting programs like garden hose--screw in another segment when it becomes necessary to massage ...

    nodejs-stream-手册.pdf

    《Node.js Stream 手册》是一本专注于Node.js流模块的详细指南,旨在帮助开发者深入理解和有效利用Node.js中的流机制。流是Node.js中一个关键特性,它允许数据以高效、低内存占用的方式进行处理,尤其适用于处理大量...

    nodejs api离线帮助手册

    手册详细列出了Node.js的内置模块,如`fs`模块用于文件系统操作,`http`模块用于创建HTTP服务器和客户端,`path`模块提供路径处理功能,`util`模块包含各种实用工具函数,还有`crypto`模块用于加密和哈希操作,`...

    NodeJS学习手册

    6. **高级特性**:NodeJS还包含许多高级特性,如进程和线程管理(process模块)、性能计时器(perf_hooks模块)、流(stream模块)以及套接字编程(net模块)。这些特性使NodeJS能够处理更复杂的任务,如集群部署、...

    NodeJS整理手册文档

    NodeJS是一种基于Chrome V8引擎的JavaScript运行环境,它允许开发者在服务器端使用JavaScript进行编程,从而打破了JavaScript只能在浏览器中运行的传统。本"NodeJS整理手册文档"旨在为对NodeJS感兴趣的朋友们提供一...

    nodejs中文手册

    这份手册涵盖了Node.js的核心API、模块系统、事件驱动编程模型以及第三方组件的使用,是Node.js初学者理想的入门参考资料。 1. **核心API**:Node.js的核心API包括文件系统操作(fs模块),网络通信(http和https...

    NodeJs图书8本

    《深入浅出node.js》和《nodejs手册中文版》会介绍流的概念和用法。 8. **包管理器npm**:npm是Node.js的包管理器,它允许开发者分享和复用代码。《NodeJS中文文档》会详细讲解npm的使用,包括安装依赖、发布包等。...

    Node.js 中文手册.chm

    6. **流(Stream)**:Node.js 中的流接口允许数据以高效的方式进行处理,特别是在处理大文件或网络传输时,可以逐块读取或写入,而不是一次性加载整个数据。 7. **HTTP server**:Node.js 内置了 HTTP 服务器模块,...

    nodejs中文帮助文档

    Node.js是一种基于Chrome V8引擎的JavaScript运行环境,它允许开发者使用JavaScript编写服务器端应用程序。Node.js采用事件驱动、非阻塞I/O模型,使其轻量又高效,非常适合处理大量并发操作。 在nodejs中文帮助文档...

    Node.js API 0.8.18离线手册

    Node.js API 0.8.18离线手册是一份详尽的文档,涵盖了Node.js版本0.8.18的各个核心模块、API接口以及使用方法。Node.js是一款基于Chrome V8引擎的JavaScript运行环境,它使得开发者能够在服务器端使用JavaScript进行...

    Node.js v0.10.18-v0.10.26 手册 & 文档

    综上所述,Node.js v0.10.18-v0.10.26的手册和文档包含了这些版本中的所有更新和改进,是学习和开发Node.js应用程序的重要参考资料。对于开发者来说,了解这些版本的特性和变化,有助于更好地利用Node.js的潜力,...

    详解NODEJS基于FFMPEG视频推流测试

    如果你对FFmpeg命令行使用手册很熟悉,可以不使用这个包。 5. **VLC播放软件**:VLC是一个开源的媒体播放器,可以用来监控推流、转码和播放是否正常。 核心实现代码如下: ```javascript const ffmpegPath = "./...

    Node.js 中文手册

    Stream的使用可以减少内存占用,提高性能,尤其在处理大文件或网络传输时。 **8. NPM (Node Package Manager)** NPM是Node.js的包管理器,它提供了一个庞大的第三方模块库,可以极大地扩展Node.js的功能。开发者...

    node.pdf学习文档

    首先,我们有两份PDF文档——"nodeJS.pdf"和"nodejs教程.pdf"。这可能是两份不同的教程,分别从不同角度深入讲解Node.js。14页的文档可能侧重于快速入门,覆盖基础概念,如安装、环境配置、基本API使用等。50页的...

    node中文api文档.pdf

    29. **stream(流)模块**:提供了一套抽象接口,用于处理流式数据。 30. **string_decoder(字符串解码器)模块**:用于解码Buffer对象中的数据。 31. **timer(定时器)模块**:提供了一种设置延迟和定期执行...

    M101JS:MongoDB 大学的 NodeJS 课程 MongoDB

    - 数据流处理:Node.js的Stream API可以与MongoDB的Cursor配合,处理大量数据。 - Web开发框架:Express等框架与MongoDB结合,构建RESTful API服务。 5. **课程实践项目** - 构建简单的CRUD应用:通过实际操作,...

Global site tag (gtag.js) - Google Analytics