nodejs Stream使用手册(me:文是翻译的,en文为:https://github.com/substack/stream-handbook)
介绍
本文介绍了使用 node.js streams 开发程序的基本方法。
"We should have some ways of connecting programs like garden hose--screw in
another segment when it becomes necessary to massage data in
another way. This is the way of IO also."
Doug McIlroy. October 11, 1964
最早接触Stream是从早期的unix开始的
数十年的实践证明Stream 思想可以很简单的开发出一些庞大的系统。在unix里,Stream是通过 |
实现的;在node中,作为内置的stream模块,很多核心模块和三方模块都使用到。和unix一样,
node Stream主要的操作也是.pipe()
,使用者可以使用反压力机制来控制读和写的平衡。
Stream 可以为开发者提供可以重复使用统一的接口,通过抽象的Stream接口来控制Stream之间的读写平衡。
为什么使用Stream
node中的I/O是异步的,因此对磁盘和网络的读写需要通过回调函数来读取数据,下面是一个文件下载服务器
的简单代码:
var http = require('http');
var fs = require('fs');
var server = http.createServer(function (req, res) {
fs.readFile(__dirname + '/data.txt', function (err, data) {
res.end(data);
});
});
server.listen(8000);
这些代码可以实现需要的功能,但是服务在发送文件数据之前需要缓存整个文件数据到内存,如果"data.txt"文件很
大并且并发量很大的话,会浪费很多内存。因为用户需要等到整个文件缓存到内存才能接受的文件数据,这样导致
用户体验相当不好。不过还好(req, res)
两个参数都是Stream,这样我们可以用fs.createReadStream()
代替
fs.readFile()
:
var http = require('http');
var fs = require('fs');
var server = http.createServer(function (req, res) {
var stream = fs.createReadStream(__dirname + '/data.txt');
stream.pipe(res);
});
server.listen(8000);
.pipe()
方法监听fs.createReadStream()
的'data'
和'end'
事件,这样"data.txt"文件就不需要缓存整
个文件,当客户端连接完成之后马上可以发送一个数据块到客户端。使用.pipe()
另一个好处是可以解决当客户
端延迟非常大时导致的读写不平衡问题。如果想压缩文件再发送,可以使用三方模块实现:
var http = require('http');
var fs = require('fs');
var oppressor = require('oppressor');
var server = http.createServer(function (req, res) {
var stream = fs.createReadStream(__dirname + '/data.txt');
stream.pipe(oppressor(req)).pipe(res);
});
server.listen(8000);
这样文件就会对支持gzip和deflate的浏览器进行压缩。oppressor 模块会处理所有的content-encoding
。
Stream使开发程序变得简单。
基础概念
有五种基本的Stream: readable, writable, transform, duplex, and"classic”.
pipe
所有类型的Stream收是使用 .pipe()
来创建一个输入输出对,接收一个可读流src
并将其数据输出到可写流dst
,如下:
src.pipe(dst)
.pipe(dst)
方法为返回dst
流,这样就可以接连使用多个.pipe()
,如下:
a.pipe(b).pipe(c).pipe(d)
功能与下面的代码相同:
a.pipe(b);
b.pipe(c);
c.pipe(d);
这样的用法十分类似于unix命令下面用法:
a | b | c | d
readable streams
通过调用Readable streams的 .pipe()
方法可以把Readable streams的数据写入一个
Writable , Transform, 或者Duplex stream。
readableStream.pipe(dst)
创建 readable stream
这里我们创建一个readable stream!
var Readable = require('stream').Readable;
var rs = new Readable;
rs.push('beep ');
rs.push('boop\n');
rs.push(null);
rs.pipe(process.stdout);
$ node read0.js
beep boop
rs.push(null)
通知数据接收者数据已经发送完毕.
注意到我们在将所有数据内容压入可读流之前并没有调用rs.pipe(process.stdout);
,但是我们压入的所有数据
内容还是完全的输出了,这是因为可读流在接收者没有读取数据之前,会缓存所有压入的数据。但是在很多情况下,
更好的方法是只有数据接收着请求数据的时候,才压入数据到可读流而不是缓存整个数据。下面我们重写 一下
._read()
函数:
var Readable = require('stream').Readable;
var rs = Readable();
var c = 97;
rs._read = function () {
rs.push(String.fromCharCode(c++));
if (c > 'z'.charCodeAt(0)) rs.push(null);
};
rs.pipe(process.stdout);
``````
$ node read1.js
abcdefghijklmnopqrstuvwxyz
上面的代码通过重写_read()
方法实现了只有在数据接受者请求数据才向可读流中压入数据。_read()
方法
也可以接收一个size
参数表示数据请求着请求的数据大小,但是可读流可以根据需要忽略这个参数。注意我
们也可以用util.inherits()
继承可读流。为了说明只有在数据接受者请求数据时_read()
方法才被调用,我们
在向可读流压入数据时做一个延时,如下:
var Readable = require('stream').Readable;
var rs = Readable();
var c = 97 - 1;
rs._read = function () {
if (c >= 'z'.charCodeAt(0)) return rs.push(null);
setTimeout(function () {
rs.push(String.fromCharCode(++c));
}, 100);
};
rs.pipe(process.stdout);
process.on('exit', function () {
console.error('\n_read() called ' + (c - 97) + ' times');
});
process.stdout.on('error', process.exit);
用下面的命令运行程序我们发现_read()
方法只调用了5次:
$ node read2.js | head -c5
abcde
_read() called 5 times
使用计时器的原因是系统需要时间来发送信号来通知程序关闭管道。使用process.stdout.on('error', fn)
是为了处理系统因为header
命令关闭管道而发送SIGPIPE信号,因为这样会导致process.stdout
触发
EPIPE事件。如果想创建一个的可以压入任意形式数据的可读流,只要在创建流的时候设置参数objectMode
为true即可,例如:Readable({ objectMode: true })
。
读取readable stream数据
大部分情况下我们只要简单的使用pipe
方法将可读流的数据重定向到另外形式的流,但是在某些情况下也许
直接从可读流中读取数据更有用。如下所示:
process.stdin.on('readable', function () {
var buf = process.stdin.read();
console.dir(buf);
});
$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume0.js
<Buffer 61 62 63 0a>
<Buffer 64 65 66 0a>
<Buffer 67 68 69 0a>
null
当可读流中有数据可读取时,流会触发'readable'
事件,这样就可以调用.read()
方法来读取相
关数据,当可读流中没有数据可读取时,.read()
会返回null
,这样就可以结束.read()
的调用,
等待下一次'readable'
事件的触发。下面是一个使用.read(n)
从标准输入每次读取3个字节的例子:
process.stdin.on('readable', function () {
var buf = process.stdin.read(3);
console.dir(buf);
});
如下运行程序发现,输出结果并不完全!
$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume1.js
<Buffer 61 62 63>
<Buffer 0a 64 65>
<Buffer 66 0a 67>
这是应为额外的数据数据留在流的内部缓冲区里了,而我们需要通知流我们要读取更多的数据.read(0)
可以达到这个目的。
process.stdin.on('readable', function () {
var buf = process.stdin.read(3);
console.dir(buf);
process.stdin.read(0);
});
这次运行结果如下:
$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume2.js
<Buffer 61 62 63>
<Buffer 0a 64 65>
<Buffer 66 0a 67>
<Buffer 68 69 0a>
我们可以使用 .unshift()
将数据重新押回流数据队列的头部,这样可以接续读取押回的数据。如下面
的代码,会按行输出标准输入的内容:
var offset = 0;
process.stdin.on('readable', function () {
var buf = process.stdin.read();
if (!buf) return;
for (; offset < buf.length; offset++) {
if (buf[offset] === 0x0a) {
console.dir(buf.slice(0, offset).toString());
buf = buf.slice(offset + 1);
offset = 0;
process.stdin.unshift(buf);
return;
}
}
process.stdin.unshift(buf);
});
$ tail -n +50000 /usr/share/dict/american-english | head -n10 | node lines.js
'hearties'
'heartiest'
'heartily'
'heartiness'
'heartiness\'s'
'heartland'
'heartland\'s'
'heartlands'
'heartless'
'heartlessly'
当然,有很多模块可以实现这个功能,如:split 。
writable streams
writable streams只可以作为.pipe()
函数的目的参数。如下代码:
src.pipe(writableStream)
创建 writable stream
重写 ._write(chunk, enc, next)
方法就可以接受一个readable stream的数据。
var Writable = require('stream').Writable;
var ws = Writable();
ws._write = function (chunk, enc, next) {
console.dir(chunk);
next();
};
process.stdin.pipe(ws);
$ (echo beep; sleep 1; echo boop) | node write0.js
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>
第一个参数chunk
是数据输入者写入的数据。第二个参数end
是数据的编码格式。第三个参数next(err)
通过回调函数通知数据写入者可以写入更多的时间。如果readable stream写入的是字符串,那么字符串会默认转换为Buffer
,如果在创建流的时候设置Writable({ decodeStrings: false })
参数,那么不会做转换。如果readable stream写入的数据时对象,那么需要这样创建writable stream
Writable({ objectMode: true })
写数据到 writable stream
调用writable stream的.write(data)
方法即可完成数据写入。
process.stdout.write('beep boop\n');
调用.end()
方法通知writable stream 数据已经写入完成。
var fs = require('fs');
var ws = fs.createWriteStream('message.txt');
ws.write('beep ');
setTimeout(function () {
ws.end('boop\n');
}, 1000);
$ node writing1.js
$ cat message.txt
beep boop
如果需要设置writable stream的缓冲区的大小,那么在创建流的时候,需要设置opts.highWaterMark
,
这样如果缓冲区里的数据超过opts.highWaterMark
,.write(data)
方法会返回false。当缓冲区可写的
时候,writable stream会触发'drain'
事件。
classic streams
Classic streams比较老的接口了,最早出现在node 0.4版本中,但是了解一下其运行原理还是十分有好
处的。当一个流被注册了"data"
事件的回到函数,那么流就会工作在老版本模式下,即会使用老的API。
classic readable streams
Classic readable streams事件就是一个事件触发器,如果Classic readable streams有数据可读取,那
么其触发 "data"
事件,等到数据读取完毕时,会触发"end"
事件。.pipe()
方法通过检查stream.readable
的值确定流是否有数据可读。下面是一个使用Classic readable streams打印A-J字母的例子:
var Stream = require('stream');
var stream = new Stream;
stream.readable = true;
var c = 64;
var iv = setInterval(function () {
if (++c >= 75) {
clearInterval(iv);
stream.emit('end');
}
else stream.emit('data', String.fromCharCode(c));
}, 100);
stream.pipe(process.stdout);
$ node classic0.js
ABCDEFGHIJ
如果要从classic readable stream中读取数据,注册"data"
和"end"
两个事件的回调函数即可,代码如下:
process.stdin.on('data', function (buf) {
console.log(buf);
});
process.stdin.on('end', function () {
console.log('__END__');
});
$ (echo beep; sleep 1; echo boop) | node classic1.js
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>
__END__
需要注意的是如果你使用这种方式读取数据,那么会失去使用新接口带来的好处。比如你在往一个
延迟非常大的流写数据时,需要注意读取数据和写数据的平衡问题,否则会导致大量数据缓存在内
存中,导致浪费大量内存。一般这时候强烈建议使用流的.pipe()
方法,这样就不用自己监听"data"
和"end"
事件了,也不用担心读写不平衡的问题了。当然你也可以用 through代替自己监听
"data"
和"end"
事件,如下面的代码:
var through = require('through');
process.stdin.pipe(through(write, end));
function write (buf) {
console.log(buf);
}
function end () {
console.log('__END__');
}
$ (echo beep; sleep 1; echo boop) | node through.js
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>
__END__
或者也可以使用concat-stream来缓存整个流的内容:
var concat = require('concat-stream');
process.stdin.pipe(concat(function (body) {
console.log(JSON.parse(body));
}));
$ echo '{"beep":"boop"}' | node concat.js
{ beep: 'boop' }
当然如果你非要自己监听"data"
和"end"
事件,那么你可以在写数据的流不可写的
时候使用.pause()
方法暂停Classic readable streams继续触发"data"
事件。等到
写数据的流可写的时候再使用.resume()
方法通知流继续触发"data"
事件继续读取
数据。
classic writable streams
Classic writable streams 非常简单。只有 .write(buf)
, .end(buf)
和.destroy()
三个方法。.end(buf)
方法的buf参数是可选的,如果选择该参数,相当于stream.write(buf); stream.end()
这样的操作,需要注意的是当流的缓冲区写满即流不可写时.write(buf)
方法会返回false,如果流再次可写时,流会触发drain
事件。
transform
transform是一个对读入数据过滤然输出的流。
duplex
duplex stream是一个可读也可写的双向流,如下面的a就是一个duplex stream:
a.pipe(b).pipe(a)
read more
- core stream documentation
- You can use the readable-stream
module to make your streams2 code compliant with node 0.8 and below. Justrequire('readable-stream')
instead ofrequire('stream')
after younpm install readable-stream
.
+
+
+
-
-
=
=
相关推荐
本文介绍了使用 node.js streams 开发程序的基本方法。 <code class=hljs>We should have some ways of connecting programs like garden hose--screw in another segment when it becomes necessary to massage ...
《Node.js Stream 手册》是一本专注于Node.js流模块的详细指南,旨在帮助开发者深入理解和有效利用Node.js中的流机制。流是Node.js中一个关键特性,它允许数据以高效、低内存占用的方式进行处理,尤其适用于处理大量...
手册详细列出了Node.js的内置模块,如`fs`模块用于文件系统操作,`http`模块用于创建HTTP服务器和客户端,`path`模块提供路径处理功能,`util`模块包含各种实用工具函数,还有`crypto`模块用于加密和哈希操作,`...
6. **高级特性**:NodeJS还包含许多高级特性,如进程和线程管理(process模块)、性能计时器(perf_hooks模块)、流(stream模块)以及套接字编程(net模块)。这些特性使NodeJS能够处理更复杂的任务,如集群部署、...
NodeJS是一种基于Chrome V8引擎的JavaScript运行环境,它允许开发者在服务器端使用JavaScript进行编程,从而打破了JavaScript只能在浏览器中运行的传统。本"NodeJS整理手册文档"旨在为对NodeJS感兴趣的朋友们提供一...
这份手册涵盖了Node.js的核心API、模块系统、事件驱动编程模型以及第三方组件的使用,是Node.js初学者理想的入门参考资料。 1. **核心API**:Node.js的核心API包括文件系统操作(fs模块),网络通信(http和https...
《深入浅出node.js》和《nodejs手册中文版》会介绍流的概念和用法。 8. **包管理器npm**:npm是Node.js的包管理器,它允许开发者分享和复用代码。《NodeJS中文文档》会详细讲解npm的使用,包括安装依赖、发布包等。...
6. **流(Stream)**:Node.js 中的流接口允许数据以高效的方式进行处理,特别是在处理大文件或网络传输时,可以逐块读取或写入,而不是一次性加载整个数据。 7. **HTTP server**:Node.js 内置了 HTTP 服务器模块,...
Node.js是一种基于Chrome V8引擎的JavaScript运行环境,它允许开发者使用JavaScript编写服务器端应用程序。Node.js采用事件驱动、非阻塞I/O模型,使其轻量又高效,非常适合处理大量并发操作。 在nodejs中文帮助文档...
Node.js API 0.8.18离线手册是一份详尽的文档,涵盖了Node.js版本0.8.18的各个核心模块、API接口以及使用方法。Node.js是一款基于Chrome V8引擎的JavaScript运行环境,它使得开发者能够在服务器端使用JavaScript进行...
综上所述,Node.js v0.10.18-v0.10.26的手册和文档包含了这些版本中的所有更新和改进,是学习和开发Node.js应用程序的重要参考资料。对于开发者来说,了解这些版本的特性和变化,有助于更好地利用Node.js的潜力,...
如果你对FFmpeg命令行使用手册很熟悉,可以不使用这个包。 5. **VLC播放软件**:VLC是一个开源的媒体播放器,可以用来监控推流、转码和播放是否正常。 核心实现代码如下: ```javascript const ffmpegPath = "./...
Stream的使用可以减少内存占用,提高性能,尤其在处理大文件或网络传输时。 **8. NPM (Node Package Manager)** NPM是Node.js的包管理器,它提供了一个庞大的第三方模块库,可以极大地扩展Node.js的功能。开发者...
首先,我们有两份PDF文档——"nodeJS.pdf"和"nodejs教程.pdf"。这可能是两份不同的教程,分别从不同角度深入讲解Node.js。14页的文档可能侧重于快速入门,覆盖基础概念,如安装、环境配置、基本API使用等。50页的...
29. **stream(流)模块**:提供了一套抽象接口,用于处理流式数据。 30. **string_decoder(字符串解码器)模块**:用于解码Buffer对象中的数据。 31. **timer(定时器)模块**:提供了一种设置延迟和定期执行...
- 数据流处理:Node.js的Stream API可以与MongoDB的Cursor配合,处理大量数据。 - Web开发框架:Express等框架与MongoDB结合,构建RESTful API服务。 5. **课程实践项目** - 构建简单的CRUD应用:通过实际操作,...