Node.js 中的 Stream(流)

Node.js 中的 Stream(流)

本章我们来看 Node.js 当中一个比较重要的概念,那就是 Stream,也就是所谓的流,那么什么是 Stream 呢?

什么是 Stream

Stream 的概念最早来源于 Unix 系统,其可以将一个大型系统拆分成一些小的组件,然后将这些小的组件可以很好地运行,TCP/IP 协议中的 TCP 协议也用到了 Stream 的思想,进而可以进行流量控制、差错控制,在 unix 中通过 | 来表示流,而在 Node.js 中则是通过 pipe() 方法,Stream 可以认为数据就像管道一样,多次不断地被传递下去,而不是一次性全部传递给下游

Node.js 中的流

Node.js API 文档 中可以看到下面一段话

1
2
3
4
5
A stream is an abstract interface implemented by various objects in Node. 

For example a request to an HTTP server is a stream, as is stdout.

Streams are readable, writable, or both. All streams are instances of EventEmitter

简单来说

  • StreamNode.js 中一个非常重要的概念,被大量对象实现,尤其是 Node.js 中的 I/O 操作
  • Stream 是一个抽像的接口,一般不会直接使用,需要实现内部的某些抽象方法(例如 _read_write_transform)
  • StreamEventEmitter 的子类,实际上 Stream 的数据传递内部依然是通过事件(data)来实现的

流的分类

Node.js 中有四种类型的流,它们分别是 readablewriteableDuplextransform

  • readable,可读流,表示数据能够被消费,例如可以通过 fs.createReadStream() 方法创建可读流
  • writeable,可写流,表示数据能被写,例如可以通过 fs.createWriteStream() 方法创建可写流
  • duplex,即表示既是 Readable 流也是 Writable 流,如 TCP Socket
  • transform,它也是 Duplex 流,能够用来修改或转换数据,例如 zlib.createGzip 方法用来使用 gzip 压缩数据(你可以认为 transform 流是一个函数,它的输入是 Writable 流,输出是 Readable 流)
使用情景 需要重写的方法
只读 Readable _read
只写 Writable _write
双工 Duplex _read_write
操作被写入数据,然后读出结果 Transform _transform_flush

此外所有的流都是 EventEmitter 的实例,它们能够监听或触发事件,用于控制读取和写入数据,ReadableWritable 流支持的常见的事件和方法如下图所示

下面我们就一个一个来分类介绍

Readable

可读流(Readable Streams)是对提供数据的源头(source)的抽象,可读流事实上工作在下面两种模式之一 flowingpaused

  • flowing 模式下,可读流自动从系统底层读取数据,并通过 EventEmitter 接口的事件尽快将数据提供给应用
  • paused 模式下,必须显式调用 stream.read() 方法来从流中读取数据片段

那如何触发这两种模式呢

  • paused mode,调用 pause 方法(没有 pipe 方法)、移除 data 事件和释放所有 pipe
  • flowing mode,注册事件 data、调用 resume 方法、调用 pipe 方法

如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// data 事件触发 flowing mode
Readable.prototype.on = function (ev, fn) {
// ...

if (ev === 'data' && false !== this._readableState.flowing) {
this.resume()
}

// ...
}

// resume 触发 flowing mode
Readable.prototype.resume = function () {
var state = this._readableState
if (!state.flowing) {
debug('resume')
state.flowing = true
resume(this, state)
}
return this
}

// pipe 方法触发 flowing 模式
Readable.prototype.resume = function () {
if (!state.flowing) {
this.resume()
}
}

简单来说,两种模式取决于一个 flowing 字段

1
2
3
true ==> flowing mode

false ==> paused mode

上面三种方式最后均是通过 resume 方法,将 state.flowing = true,可读流的两种操作模式是一种简单抽象,它抽象了在可读流实现(Readable stream implementation)内部发生的复杂的状态管理过程,在任意时刻,任意可读流应确切处于下面三种状态之一

1
2
3
4
5
readable._readableState.flowing = null

readable._readableState.flowing = false

readable._readableState.flowing = true

readable._readableState.flowingnull,由于不存在数据消费者,可读流将不会产生数据,如果监听 data 事件,调用 readable.pipe() 方法,或者调用 readable.resume() 方法,readable._readableState.flowing 的值将会变为 true,这时,随着数据生成,可读流开始频繁触发事件

调用 readable.pause() 方法,readable.unpipe() 方法,或者接收背压(关于背压的概念我们会在下面进行介绍),将导致 readable._readableState.flowing 值变为 false,这将暂停事件流,但不会暂停数据生成,当 readable._readableState.flowing 值为 false 时, 数据可能堆积到流的内部缓存中

需要注意的是,应该选择其中『一种』来消费数据,而『不应该』在单个流使用多种方法来消费数据,对于大多数用户,建议使用 readable.pipe() 方法来消费流数据,因为它是最简单的一种实现,如果要精细地控制数据传递和产生的过程,可以使用 EventEmitterreadable.pause()/readable.resume() 提供的 API

paused mode

paused mode 下,需要手动地读取数据,并且可以直接指定读取数据的长度

1
2
3
4
5
6
7
8
9
var Read = require('stream').Readable
var r = new Read()

r.push('hello')
r.push('world')
r.push(null)

console.log(r.read(1).toString()) // h
console.log(r.read(3).toString()) // ell

还可以通过监听事件 readable,触发时手工读取 chunk 数据

1
2
3
4
5
6
7
8
9
10
11
12
13
var Read = require('stream').Readable
var r = new Read()

r.push('hello')
r.push('world')
r.push(null)

r.on('readable', function() {
var chunk = r.read()
console.log(chunk.toString())
})

// helloworld

需要注意的是,一旦注册了 readable 事件,必须手工读取 read 数据,否则数据就会流失,看看内部实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
function emitReadable_(stream) {
debug('emit readable')
stream.emit('readable')
flow(stream)
}

function flow(stream) {
var state = stream._readableState
debug('flow', state.flowing)
if (state.flowing) {
do {
var chunk = stream.read()
} while (null !== chunk && state.flowing)
}
}

Readable.prototype.read = function (n) {
// ...

var res = fromList(n, state)

if (!util.isNull(ret)) {
this.emit('data', ret)
}

// ...
}

flow 方法直接 read 数据,将得到的数据通过事件 data 交付出去,然而此处没有注册 data 事件监控,因此,得到的 chunk 数据并没有交付给任何对象,这样数据就白白流失了,所以在触发 emit('readable') 时,需要提前 read 数据

flowing mode

通过注册 datapiperesume 可以自动获取所需要的数据,比如通过事件 data 的方式

1
2
3
4
5
6
7
8
9
10
11
12
13
var Read = require('stream').Readable
var r = new Read()

r.push('hello')
r.push('world')
r.push(null)

r.on('data', function (chunk) {
console.log(chunk.toString())
})

// hello
// world

或者通过 pipe 的方式

1
2
3
4
5
6
7
8
9
10
var Read = require('stream').Readable
var r = new Read()

r.push('hello')
r.push('world')
r.push(null)

r.pipe(process.stdout)

// helloworld

以上两种 mode 总体如下

readable 与 data 事件

read()Readable 流的基石,无论流处于什么模式,只要是涉及读取数据最终都会转到 read() 上面来,它的主要功能是

  • 读取缓冲区数据并返回给消费者,并按需发射各种事件
  • 按需调用 _read()_read() 会从底层汲取数据,并填充缓冲区

它的流程大致如下

务必记住 read() 是『同步』的,因此它并不是直接从底层数据那里读取数据,而是从缓冲区读取数据,而缓冲区的数据则是由 _read() 负责补充

_read() 可以是同步或者异步,Node.js 内部的实现经常会调用 read(0),因为参数是 0 所以不会破坏缓冲区中的数据和状态,但可以触发 _read() 来从底层汲取数据并填充缓冲区,_read() 是流实现者需要重写的函数,它从底层汲取数据并填充缓冲区(flowing 模式不会填充而是直接发送给消费者),它的大致流程如下

注意在 addChunk() 后会根据情况发射 readable 或者 data 事件,然后依次调用

1
read() ==> _read(0) ==> ... ==> addChunk()

从而形成一个循环,因为一旦调用了 _read() 之后,流就会默默在底层读取数据,直到数据都消耗完为止

readable 事件

文档上关于 readable 事件的描述如下

事实上,readable 事件表明流有了新的动态,要么是有了新的数据,要么是到了流的尾部,对于前者 stream.read() 将返回可用的数据,而对于后者 stream.read() 将返回 null

由此我们可以知道 readable 事件意味着

  • 流有了新的数据(注意,这里只说明有了新数据,至于新数据如何读取是调用者自己的事情)
  • 流到达了尾部

来看下面这个示例

1
2
3
4
5
6
7
// 可以将 size 设为 1 或 undefined 来进行测试
const size = 1
const rs = require('fs').createReadStream('./test.js')

rs.on('readable', () => {
console.log(rs.read(size))
})

总之,readable 只是负责通知用户流有了新的动态,事件发生的时候是否读取数据,如何读取数据则是调用者的事情(如果一直不读取事件,则数据会存在于缓冲区中),例如可以给 readable 注册一个回调函数,该回调函数调用无参的 read(),它会读取并清空缓冲区的全部数据,这样就使得每次 readable 发生的时候都可以读取到最新的数据

readable 的触发时机

readable 在以下几种情况会被触发

  • onEofChunk 中,且 _read() 从底层汲取的数据为空,这个场景意味着流中的数据已经全部消耗完
  • addChunk() 中,且 _read() 从底层汲取的数据不为空且处于 pause 模式,这个场景意味着流中有新数据
  • read(n) 中,且 n0 是的某些情况下
  • 通过 on()readable 添加监听器,如果此时缓冲区有数据则会触发,这个场景意味着流中已经有数据可供 read() 直接调用

data 事件

data 事件的意义则明确很多,文档上关于 data 事件的描述如下

The ‘data’ event is emitted whenever the stream is relinquishing ownership of a chunk of data to a consumer.

readable 不同的是,data 事件代表的意义清晰单一,流将数据交付给消费者时触发,并且会将对应的数据通过回调传递给用户

data 的触发时机

从源码来看,有两个地方会触发 data 事件

  • read() 中,如果缓冲区此时有数据可以返回给调用者,这种情况只会在调用 pipe() 时候发生,如果 readable() 被暂停过并重新启动,此时缓冲区内残留的数据会通过 read() 读出然后借助 data 事件传递出去
  • addChunk() 中,此时 _read() 从底层汲取的数据不为空,且满足以下条件
    • 处于 flowing 模式
    • 缓冲区为空
    • 处于异步调用模式

在这种情况下,数据直接就交付给消费者了,并没有在缓冲区缓存,而文档中的说法是

当流转换到 flowing 模式时会触发该事件,调用 readable.pipe()readable.resume() 方法,或为 data 事件添加回调可以将流转换到 flowing 模式, data 事件也会在调用 readable.read() 方法并有数据返回时触发

似乎两者不太一致?其实本质上调用 readable.pipe()readable.resume() 或为 data 事件添加回调,最终都会依次调用

1
read() ==> _read() ==> addChunk()

然后最终才进行发射 data 事件,结合 _read() 的流程图,可以发现,通过 on()readabledata 事件添加监听器后,程序就开始循环汲取底层数据直至消耗完为止

如果同时监听 readable 和 data 事件

如下示例

1
2
3
4
const rs = require('fs').createReadStream('./test.js')

rs.on('readable', () => console.log('readable 触发'))
rs.on('data', console.log)

运行结果如下

1
2
<Buffer 63 6f 6e 73 74 20 72 73 20 3d 20 72 65 71 75 69 72 65 28 27 66 73 27 29 2e 63 72 65 61 74 65 52 65 61 64 53 74 72 65 61 6d 28 27 2e 2f 74 65 73 74 2e ... >
readable 触发

从上面的流程图我们知道,在 addChunk() 中当有新数据到来的时候,redabledata 都有可能触发,那究竟触发哪个?让我们来看看 addChunk() 的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
function addChunk(stream, state, chunk, addToFront) {
// 如果处于 flowing 模式,且缓冲区为空,且为异步调用时候,触发 data 事件
if (state.flowing && state.length === 0 && !state.sync) {
state.awaitDrain = 0
stream.emit('data', chunk)
} else {
// 更新缓冲区已有数据数量
state.length += state.objectMode ? 1 : chunk.length
if (addToFront)
// 插入缓冲区头部
state.buffer.unshift(chunk)
else
// 插入缓冲区尾部
state.buffer.push(chunk)

if (state.needReadable)
// 触发 readable 事件
emitReadable(stream)
}
maybeReadMore(stream, state)
}

由于为 data 事件添加回调会使得流进入 flowing 模式,因此我们的例子中,有新数据时只会发射 data 事件,而 readable 事件则流结束的时候发射一次

Writable

所有 Writable 流都实现了 stream.Writable 类定义的接口,尽管特定的 Writable 流的实现可能略有差别,所有的 Writable streams 都可以按一种基本模式进行使用,如下

1
2
3
4
5
const myStream = getWritableStreamSomehow()

myStream.write('some data')
myStream.write('some more data')
myStream.end('done writing data')

本质上 只是需要实现的是 _write(data, enc, next) 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
const Writable = require('stream').Writable

const writable = Writable()

// 实现 _write 方法
// 这是将数据写入底层的逻辑
writable._write = function (data, enc, next) {
// 将流中的数据写入底层
process.stdout.write(data.toString().toUpperCase())
// 写入完成时,调用 next() 方法通知流传入下一个数据
process.nextTick(next)
}

// 所有数据均已写入底层
writable.on('finish', () => process.stdout.write('DONE'))

// 将一个数据写入流中
writable.write('a' + '\n')
writable.write('b' + '\n')
writable.write('c' + '\n')

// 再无数据写入流时,需要调用 end() 方法
writable.end()

上游通过调用 writable.write(data) 将数据写入可写流中,write() 方法会调用 _write()data 写入底层,在 _write 方法中,当数据成功写入底层后,必须调用 next([err]) 告诉流开始处理下一个数据

next 的调用既可以是同步的,也可以是异步的,上游必须调用 writable.end(data) 来结束可写流,data 是可选的,此后,不能再调用 write 新增数据,在 end 方法调用后,当所有底层的写操作均完成时,会触发 finish 事件

Readable Stream 与 Writeable Stream

二者的关系

  • Readable Stream 是提供数据的 Stream,外部来源的数据均会存储到内部的 Buffer 数组内缓存起来
  • writeable Stream 是消费数据的 Stream,从 readable stream 中获取数据,然后对得到的 chunk 块数据进行处理,至于如何处理,就依赖于具体实现(也就是 _write 的实现)

首先看看 Readdable Streamwriteable stream 二者之间的流动关系

pipe 的流程

stream 内部是从 readable stream 流到 writeable stream,有两种处理方法

pipe 连接两个 stream
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
var Read = require('stream').Readable
var Write = require('stream').Writable
var r = new Read()
var w = new Write()

r.push('hello')
r.push('world')
r.push(null)

w._write = function(chunk, ev, cb) {
console.log(chunk.toString())
cb()
}

r.pipe(w)

// hello
// world

pipe 是一种最简单直接的方法连接两个 stream,内部实现了数据传递的整个过程,在开发的时候不需要关注内部数据的流动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Readable.prototype.pipe = function (dest, pipeOpts) {
var src = this

// ...

src.on('data', ondata)

function ondata(chunk) {
var ret = dest.write(chunk)
if (false === ret) {
debug('false write response, pause',
src._readableState.awaitDrain)
src._readableState.awaitDrain++
src.pause()
}
}

// ...

}

附一张 pipe() 的流程图

事件 data + 事件 drain 联合实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
var Read = require('stream').Readable
var Write = require('stream').Writable
var r = new Read()
var w = new Write()

r.push('hello')
r.push('world')
r.push(null)

w._write = function(chunk, ev, cb) {
console.log(chunk.toString())
cb()
}

r.on('data', function(chunk) {
if (!w.write(chunk)) {
r.pause()
}
})

w.on('drain', function() {
r.resume()
})

// hello
// world

Duplex

Duplex 实际上就是继承了 ReadableWritable 的一类流,所以,一个 Duplex 对象既可当成可读流来使用(需要实现 _read 方法),也可当成可写流来使用(需要实现 _write 方法)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
var Duplex = require('stream').Duplex

var duplex = Duplex()

// 可读端底层读取逻辑
duplex._read = function () {
this._readNum = this._readNum || 0
if (this._readNum > 1) {
this.push(null)
} else {
this.push(' ' + (this._readNum++))
}
}

// 可写端底层写逻辑
duplex._write = function (buf, enc, next) {
// a, b
process.stdout.write('_write ' + buf.toString() + '\n')
next()
}

// 0, 1
duplex.on('data', data => console.log('ondata', data.toString()))

duplex.write('a')
duplex.write('b')

duplex.end()

上面的代码中实现了 _read 方法,所以可以监听 data 事件来消耗 Duplex 产生的数据,同时,又实现了 _write 方法,可作为下游去消耗数据,因为它既可读又可写,所以它有两端,可写端和可读端,可写端的接口与 Writable 一致,作为下游来使用,可读端的接口与 Readable 一致,作为上游来使用,下面是另外一个示例,读取从 AZ 的字母

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
const { Duplex } = require('stream')

const inoutStream = new Duplex({
write(chunk, encoding, callback) {
console.log(chunk.toString())
callback()
},

read() {
this.push(String.fromCharCode(this.currentCharCode++))
if (this.currentCharCode > 90) {
this.push(null)
}
}
})

inoutStream.currentCharCode = 65

process.stdin.pipe(inoutStream).pipe(process.stdout)

inoutStream.end()

我们将可读的 stdin 流传输到 duplex stream 当中以使用 callback(),然后将 duplex stream 本身传输到可写的 stdout 流以查看我们的输出结果

Transform

Tranform 继承自 Duplex,并已经实现了 _read_write 方法,我们只需要实现将两者结合起来的 transform 方法即可,它具有 write 方法,我们可以使用它来推送数据

下面是一个简单的 transform stream 示例,它会将你的输入结果转换为大写

1
2
3
4
5
6
7
8
9
10
const { Transform } = require('stream')

const upperCaseTr = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase())
callback()
}
})

process.stdin.pipe(upperCaseTr).pipe(process.stdout)

内置的 transform stream

Node.js 有一些内置的 transform stream,比如 zlibcrypto,下面是一个使用 zlib.createGzip() 方法结合 fsreadable/writable 流实现的一个文件压缩示例

1
2
3
4
5
6
7
const fs = require('fs')
const zlib = require('zlib')
const file = process.argv[2]

fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream(file + '.gz'))

上述示例可以将传递进来的文件进行 gzip 压缩,下面我们来稍微扩展一下,比如我们希望用户在运行时可以看到进度结果,并且在完成的时侯看到已经完成的提示

1
2
3
4
5
6
7
8
9
const fs = require('fs')
const zlib = require('zlib')
const file = process.argv[2]

fs.createReadStream(file)
.pipe(zlib.createGzip())
.on('data', () => process.stdout.write('.'))
.pipe(fs.createWriteStream(file + '.zz'))
.on('finish', () => console.log('Done'))

在上面示例当中,我们也可以不使用 on 去监听其中的数据事件,而只需创建一个 transform stream 来追踪进度,然后将 .on() 方法替换为另一个 .pipe() 即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
const fs = require('fs')
const zlib = require('zlib')
const file = process.argv[2]

const { Transform } = require('stream')

const reportProgress = new Transform({
transform(chunk, encoding, callback) {
process.stdout.write('.')
callback(null, chunk)
}
})

fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(reportProgress)
.pipe(fs.createWriteStream(file + '.zz'))
.on('finish', () => console.log('Done'))

注意上面示例当中的 callback() 方法的第二个参数,这样写是为了优先推送数据,在或者,我们需要在 gzip 压缩之前或之后对文件进行加密,和上面的示例一样,我们只需要按照我们想要的顺序添加另外一个 transform stream 即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
const crypto = require('crypto')
const fs = require('fs')
const zlib = require('zlib')
const file = process.argv[2]

const { Transform } = require('stream')

const reportProgress = new Transform({
transform(chunk, encoding, callback) {
process.stdout.write('.')
callback(null, chunk)
}
})

fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(crypto.createCipher('aes192', 'a_secret'))
.pipe(reportProgress)
.pipe(fs.createWriteStream(file + '.zz'))
.on('finish', () => console.log('Done'))

当然,为了解压上面我们压缩过的内容,我们只需要以相反的顺序执行 cryptozlib 即可

1
2
3
4
5
6
fs.createReadStream(file)
.pipe(crypto.createDecipher('aes192', 'a_secret'))
.pipe(zlib.createGunzip())
.pipe(reportProgress)
.pipe(fs.createWriteStream(file.slice(0, -3)))
.on('finish', () => console.log('Done'))

自定义流

自定义流的实现很简单,只要实现相应的内部待实现方法就可以了,具体来说

  • readable stream,实现 _read 方法来解决数据的获取问题
  • writeable stream,实现 _write 方法来解决数据的去向问题
  • tranform stream,实现 _tranform 方法来解决数据存放在 Buffer 前的转换工作

代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 自定义 readable stream 的实现
var Stream = require('stream')
var Read = Stream.Readable
var util = require('util')

util.inherits(MyReadStream, Read)

function MyReadStream(data, opt) {
Read.call(this, opt)
this.data = data || []
}

MyReadStream.prototype._read = function () {
var _this = this
this.data.forEach(function (d) {
_this.push(d)
})
this.push(null)
}

var data = ['aa', 'bb', 'cc']
var r = new MyReadStream(data)

r.on('data', function (chunk) {
console.log(chunk.toString())
})

背压

我们在上面介绍可读流(Readable Streams)的时候,提及了一个背压的概念,下面我们就来看看到底什么是背压,本小结内容主要是参考 Backpressuring in Streams 这个文章整合而成

数据流中的积压问题

通常在数据处理的时候我们会遇到一个普遍的问题,那就是背压,意思是在数据传输过程中有一大堆数据在缓存之后积压着,每次当数据到达结尾又遇到复杂的运算,又或者无论什么原因它比预期的慢,这样累积下来,从源头来的数据就会变得很庞大,像一个塞子一样堵塞住

为解决这个问题,必须存在一种适当的代理机制,确保流从一个源流入另外一个的时候是平滑顺畅的,不同的社区组织针对他们各自的问题单独做了解决,好例子比如 Unix 的管道和 TCPSocket,在 Node.js 中,流(stream)已经是被采纳的解决方案

数据太多,速度太快

有太多的例子证明有时 Readable 传输给 Writable 的速度远大于它接受和处理的速度,如果发生了这种情况,消费者开始为后面的消费而将数据列队形式积压起来,写入队列的时间越来越长,也正因为如此,更多的数据不得不保存在内存中知道整个流程全部处理完毕,写入磁盘的速度远比从磁盘读取数据慢得多,因此当我们试图压缩一个文件并写入磁盘时,积压的问题也就出现了,因为写磁盘的速度不能跟上读磁盘的速度

1
2
// 数据将会在读入侧堆积,这样写入侧才能和数据流的读入速度保持同步
inp.pipe(gzip).pipe(outputFile)

这就是为什么说积压机制很重要,如果积压机制不存在,进程将用完你全部的系统内存,从而对其它进程产生显著影响,它独占系统大量资源直到任务完成为止,这最终将会导致一些问题

  • 明显使得其它进程处理变慢
  • 太多繁重的垃圾回收
  • 内存耗尽

pipe 的背压平衡机制

假设现在有一对 ReadableWritable,要求编程实现从 Readable 里面读取数据然后写到 Writable 中,那么面临的问题很有可能就是如果两者对数据的产生/消费速度不一致,那么需要手动协调两者速度使得任务可以完成,思路可能这样

  1. Readable 进入 flowing 模式,然后进入步骤 2
  2. data 事件,一旦有数据到达则进入步骤 2,如果捕捉到 end 事件就结束任务
  3. 数据写入到 Writable,如果返回 true 进入步骤 1,否则进入步骤 3
  4. Readable 进入 pause 模式,并等待 Writable 发射 drain 事件
  5. Writable 发射了 drain 事件,则返回步骤 1

而事实上 pipe() 的过程和上述很相似,它的源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
Readable.prototype.pipe = function (dest, pipeOpts) {

// ...

var ondrain = pipeOnDrain(src)
// 当写操作返回 false 的时候,正常情况下必然会在稍后触发一个 drain 事件
dest.on('drain', ondrain)
src.on('data', ondata)
function ondata(chunk) {
var ret = dest.write(chunk)
// 如果写操作的返回值为 false,则暂停 readable 流
if (ret === false) {
if (((state.pipesCount === 1 && state.pipes === dest) ||
(state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)) &&
!cleanedUp) {
state.awaitDrain++
}
src.pause()
}
}

// ...

return dest
}

function pipeOnDrain(src) {
return function () {
var state = src._readableState
if (state.awaitDrain)
state.awaitDrain--
if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
// 将流重新设为 flowing 模式
state.flowing = true
// 将缓冲区中残留的数据读取并重新触发 data 事件
flow(src)
}
}
}

通过上面的代码我们可以看到

  • 当向 dest 写入数据返回 false 时,马上调用 src.pause() 暂停流,src.pause() 将暂停事件流,但不会暂停数据生成
  • 也就是说 src 此时依然汲取底层数据填充缓冲区,只是暂停发射 data 事件,等到缓冲区的数据量超过警戒线才会停止汲取
  • 因为写入数据返回 false,因此在稍后的某个时候 dest 必然会发射 drain 事件
  • drain 事件发生后,src 再次进入 flowing 模式自动产生数据,同时将缓冲区中的残留数据写入 dest

.pipe() 的生命周期

为了对积压有一个更好的理解,这里有一副 Readable 流正通过 piped 流入 Writable 流的整个生命周期图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
                                                     +===================+
x--> Piping functions +--> src.pipe(dest) |
x are set up during |===================|
x the .pipe method. | Event callbacks |
+===============+ x |-------------------|
| Your Data | x They exist outside | .on('close', cb) |
+=======+=======+ x the data flow, but | .on('data', cb) |
| x importantly attach | .on('drain', cb) |
| x events, and their | .on('unpipe', cb) |
+---------v---------+ x respective callbacks. | .on('error', cb) |
| Readable Stream +----+ | .on('finish', cb) |
+-^-------^-------^-+ | | .on('end', cb) |
^ | ^ | +-------------------+
| | | |
| ^ | |
^ ^ ^ | +-------------------+ +=================+
^ | ^ +----> Writable Stream +---------> .write(chunk) |
| | | +-------------------+ +=======+=========+
| | | |
| ^ | +------------------v---------+
^ | +-> if (!chunk) | Is this chunk too big? |
^ | | emit.end() | Is the queue busy? |
| | +-> else +-------+----------------+---+
| ^ | emit.write() | |
| ^ ^ +--v---+ +---v---+
| | ^-----------------------------------< No | | Yes |
^ | +------+ +---v---+
^ | |
| ^ emit.pause() +=================+ |
| ^---------------^-----------------------+ return false <-----+---+
| +=================+ |
| |
^ when queue is empty +============+ |
^------------^-----------------------< Buffering | |
| |============| |
+> emit.drain() | ^Buffer^ | |
+> emit.resume() +------------+ |
| ^Buffer^ | |
+------------+ add chunk to queue |
| <---^---------------------<
+============+

注意,如果你创建一些管道准备把一些流串联起来从而操纵数据,你应该实现 Transform 流,在这种情况下,从 Readable 流中的输出进入 Transform,并且会被管道输送进入 Writable

1
Readable.pipe(Transformable).pipe(Writable)

积压将被自动应用,但是同时请注意输入和输出 Transform 的水准值,可以手动控制,并且会影响到积压系统,如果想要了解更多,可以参考 通过源码解析 Node.js 中导流(pipe)的实现 这篇文章

总结

Node.js 中有四种类型的流,它们是 readablewriteableduplextransform

  • readable,可读流,表示数据能够被消费,是对提供数据的源头(source)的抽象,工作在下面两种模式之一
    • flowing 模式下,可读流自动从系统底层读取数据,并通过 EventEmitter 接口的事件尽快将数据提供给应用
      • 通过注册 datapiperesume 可以自动获取所需要的数据,比如通过事件 data 的方式
    • paused 模式下,必须显式调用 stream.read() 方法来从流中读取数据片段
      • paused mode 下,需要手动地读取数据,并且可以直接指定读取数据的长度
      • 还可以通过监听事件 readable,触发时手工读取 chunk 数据
      • 一旦注册了 readable 事件,必须手工读取 read 数据,否则数据就会流失
    • 如何触发
      • paused mode,调用 pause 方法(没有 pipe 方法)、移除 data 事件和释放所有 pipe
      • flowing mode,注册事件 data、调用 resume 方法、调用 pipe 方法
  • writeable,可写流,表示数据能被写,所有 Writable 流都实现了 stream.Writable 类定义的接口
    • 本质上 只是需要实现的是 _write(data, enc, next) 方法
    • _write 方法中,当数据成功写入底层后,必须调用 next([err]) 告诉流开始处理下一个数据
    • next 的调用既可以是同步的,也可以是异步的
    • 上游必须调用 writable.end(data) 来结束可写流,data 是可选的,此后,不能再调用 write 新增数据
    • end 方法调用后,当所有底层的写操作均完成时,会触发 finish 事件
  • duplex,实际上就是继承了 ReadableWritable 的一类流
    • 一个 duplex 对象既可当成可读流来使用(需要实现 _read 方法),也可当成可写流来使用(需要实现 _write 方法)
  • transform,它也是 duplex 流,能够用来修改或转换数据,例如 zlib.createGzip 方法用来使用 gzip 压缩数据(内置的 transform stream
    • 你可以认为 transform 流是一个函数,它的输入是 Writable 流,输出是 Readable
    • Tranform 继承自 duplex,并已经实现了 _read_write 方法
    • 只需要实现将两者结合起来的 transform 方法即可,它具有 write 方法,我们可以使用它来推送数据

推荐一下这篇文章 Node Stream,干货很多

参考

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×