概述
构建复杂程序的时候,通常会将系统拆解成若干功能,这些功能的之间的接口遵循一定的规范,以实现组合连接,共同完成复杂任务。例如管道运算符 |
。
在 nodejs 中,实现各种功能,总避免不了和“数据”打交道,这些数据可能是 Buffer、字符串、数组等等。但当处理大量数据的时候,如何保证程序的稳健性?如何不让内存爆掉呢?nodejs 提供了 stream 模块,来让开发者更优雅地处理数据。这需要开发者理解“流”的含义,遵循“流”的相关规范,来进行“流”编程。
相较于其他的模块系列,需要理解的概念偏多。本文主要从以下几个方面深入 stream 模块:
- 什么是“流”?
- 流有哪几种类型?
- 内部缓冲的作用?
- 流动模式 vs 暂停模式
- 背压问题
- 如何产生的?
- 如何解决背压问题?
- 如何定制流
- 实现可写流
- 实现可读流
- 实现双工和转换流
什么是“流”?
流是数据的集合。但它不一定是一次性全部读入内存的,这和程序中的变量不同。举个例子,一个 100GB 的文件,可以通过 fs.createReadStream()
来将文件二进制一点点读入,每次读入的“一点点二进制”,就是流。
乍一看,这样做好像并没有什么显而易见的好处。一点点读入的流,还要一点点处理。但是,处理难度的提高换来的是程序性能提升。如果通过fs.readFile()
一次性将 100GB 大文件读入内存,那么可能会发生以下几个问题:
- 内存错误。本机内存不够,或者超过了 nodejs 的内存限制。
- 程序性能下降。过高的内存占用可能会导致频繁触发 GC,影响其他程序的运行。
借助流,每次仅读入一小部分的数据,待这部分数据被“消费”后,再读入新的数据。转换思路,不一定必须将要用到的数据一次性全部装入内存。
流有哪几种类型?
stream 提供了 4 种基本的流类型:
- Writable:可写入数据流。
- Readable:可读取数据流。
- Duplex:双工流,可读又可写。例如:net.Socket
- Transform:转换流,它是 Duplex 流的一种。它用于在读写过程中,加工数据。例如:zlib
内部缓冲的作用?
在文档开头部分,就用一节专门提到了“缓冲”。可读流和可写流都会在内部缓存器存储数据,Duplex 和 Transform 也在内部维护了缓存器。在开发者基于流开发时,可以通过传递highWaterMark
参数,来修改默认缓冲的大小。
理解缓冲的作用,就要先搞明白缓冲的处理流程:
- 可写流中:
- 调用 write()向流中写入数据
- 数据进入可写流缓冲
- 数据等待被消费
- 可读流中:
- 调用 read()向流中读取数据
- 数据进入可读缓存
- 数据等待被消费
在这个过程中,如果可写/可读缓冲中的数据总大小超过了 highWaterMark
:
- 可写流的 write()会返回 false,直到缓冲可以继续写入,触发
drain
事件 - 可读流会停止从底层资源读取数据
有了内部缓冲机制,就可以限制流的读写速度,防止内存被压垮,解决背压问题。
流动模式 vs 暂停模式
这是可读流的两种模式。可读流开始时是处于暂停模式,之后根据监听的事件、调用的 api,来进行两种模式的切换。文档上写的很详细,但是也会让初学者感到困扰。这里直接从编码风格触发,来学习这两种模式。
编码风格一:监听 readable 事件 + read()
如果可读流监听了 readable 事件,那么处于暂停模式。readable 事件回调触发的条件:
- 有新的数据
- 流到达尽头
由于处于暂停模式,因此在事件回调函数中,需要使用 read()来读取数据。
1 | const fs = require("fs"); |
编码风格二(官方推荐):监听 data 事件 + pause() + resume()
如果可读流监听 data 事件,那么会自动切换至流动模式,事件回调中可以得到数据。不需要调用 read(),换句话说 read()只在暂停模式下调用。
1 | const fs = require("fs"); |
如果配合可写流进行更复杂操作,发生了背压问题,没有可用的消费者来处理数据,则数据将会丢失。为了方便理解,可以认为它是自动调用 read()进行消费。此时使用 pause()来切换到暂停模式,待消费者可以处理时,再调用 resume()恢复流动模式。
背压问题
如何产生的?
当处理数据的时候,如果数据生产者产生数据的速度 > 数据消费者处理数据的速度,那么由于速度差异没被消耗完的数据就会持续堆积下来,这个现象就是背压(也称积压)。
它会导致资源过度占用,内存耗尽,也会增加 GC 的负担。
如何解决背压问题?
结合前面对缓冲的讲解,在向可写流写入数据的时候,如果超过可写缓存,应该暂停数据读取,等待缓存中数据被消耗完毕后,再继续流动可读流。
下面是一个基于 stream,复制大文件的函数:
1 | function copyBigFile(src, dest, callback) { |
更好的解决方案是,使用可读流上的pipe()
函数,或者 stream 模块的pipeline()
函数。
pipe 函数实现了以下几个功能:
- 不断从来源可读流中获得一个指定长度的数据。
- 将获取到的数据写入目标可写流。
- 平衡读取和写入速度,防止读取速度大大超过写入速度时,出现大量滞留数据。
用它来处理背压问题非常简单。前面复制文件的函数改写为:
1 | function copyBigFile(src, dest, callback) { |
pipeline 函数是 pipe 的语法糖,它的参数可以是多个流+一个回调函数:
1 | const { pipeline } = require("stream"); |
如何定制流?
在实现自己的类库的时候,可以借助流来处理大容量数据。nodejs 提供给开发者 API,来定制 4 种类型的流。
实现可写流
继承 Writable 类,需要重写_write()方法。并且在实现中必须调用callback()
函数,无论成功失败。
下面实现了一个解码器,并且能将解码结果处理成小写:
1 | const { Writable } = require("stream"); |
实现可读流
继承 Readable 类,需要重写_read 方法。内部通过 push 方法来推入数据。
为了方便演示,先实现一个产生数据的类,它继承自 EventEmitter :
1 | const EventEmitter = require("events"); |
最后实现一个可读流,并在读取过程中,将字符转成小写:
1 | const { Readable } = require("stream"); |
实现双工和转换流
双工流的实现就是既要重写可写流的_write 方法,也要重写可读流的_read 方法。
转换流的实现是不同的,它需要重写_transform 方法,来接收并产生输出。下面是一段利用转换流来给输出添加标志符的代码:
1 | const { Transform } = require("stream"); |
效果如下: