NodeJS 里的 stream(流)

流文件?文件流?其实很多语言都有,这里只是对于 NodeJS 里的一些流的用法做一个记录。

关于“流”的定义,百度百科的介绍也没有说出个所以然。我个人理解,就是顾名思义,类似于“水流”般的存在和运动,只不过实体不是水,而是二进制。作用是啥,缓冲效果,积少成多。对于大文件的读写,或者网络传输其实都用到流。

这里先分享下两篇大佬的文章:

文章里也介绍了 stream 的一些用法,这里再记录巩固一下。这里也有我个人对于流操作的实操:《文件切片上传(断点续传)》。

另外,官方文档镇楼:《Node.js v20.5.0 documentation》。

1、简单的文件流读取写入,记得是 读取流.pipe(写入流)

const fs = require('fs');
const readStream = fs.createReadStream(读取文件地址);
const writeStream = fs.createWriteStream(写入文件地址);
// 通过 pipe 执行拷贝,数据流转
readStream.pipe(writeStream);
// 数据读取完成监听,即拷贝完成
readStream.on('end', function () {
  console.log('拷贝完成');
});

2、网络响应也可以用 stream 方式返回。也就是 response 对象是一个写入流,写回到请求的浏览器前。

const fs = require('fs');
const http = require('http');
const server = http.createServer(function (req, res) {
  const readStream = fs.createReadStream(读取文件地址);
  readStream.pipe(res); // 将 res 作为 stream 的 dest
});
server.listen(8000);

3、根据上面文章对于 buffer 的介绍,测试确实不占内存

const fs = require('fs');
const json = {};
for(let i = 0; i < 100; i++) {
  json[i] = fs.createReadStream('package-lock.json');
  // json[i] = fs.readFileSync('package-lock.json');
}
for (const [key,value] of Object.entries(process.memoryUsage())){
  console.log(`Memory usage by ${key}, ${value/1000000}MB `);
}

所以 node 做 server 的时候对于文件操作可以用流形式,减少内存泄露的可能性。
依然大佬文章分享《Node.js内存溢出时如何处理?》。

4、可以在运输过程中修改内容

如果有用过 gulp 打包过前端代码的话,可能会有印象它的用法就有流用法如 src.pipe(transform).pipe(dest) 这样子。
也就是数据可以在流管道中进行修改。靠的就是 stream.Transform 流。用法也很简单:

const stream = require('stream');
const myTransform = new stream.Transform({
  transform(chunk, encoding, callback) {
    const content = 
`/* prefix */
${chunk.toString()}
/* suffix */`
    callback(null, content);
  }
});

src.pipe(dest) 这样返回的是后者的写入流,也就是能写在中间的 src.pipe(transform/duplex).pipe(dest) 必须是个双工流,既可以是读取流也可以是写入流。

5、一些事件示例

readStream.on('data', (chunk) => { // chunk 传输的流二进制
  console.log('读取流 data 事件在传输时触发');
});
readStream.on('end', () => {
  console.log('读取流 end 事件在消费完毕时触发');
});
readStream.on('close', () => {
  console.log('读取流 close 事件在 end 事件后触发');
});
writeStream.on('drain', () => {
  console.log('写入流 drain 事件在可以接收更多的数据时触发');
});
writeStream.on('pipe', (src) => { // src => 读取流对象
  console.log('写入流 pipe 事件在读取流导入写入流的 pipe 操作时触发');
});
writeStream.on('error', (error) => { // src => 读取流对象
  console.log('写入流在流通道关闭后再写入就会报错',  error);
});
writeStream.on('finish', () => {
  console.log('写入流 finish 事件调用 end 方法触发'); 
});
writeStream.on('close', () => {
  console.log('写入流 close 事件在 finish 事件后触发');
});

最后再放两篇国外大佬写的介绍文章,不必要全看,看看代码示例也可以了。