Node.js中的流十分強(qiáng)大,它對處理潛在的大文件提供了支持,也抽象了一些場景下的數(shù)據(jù)處理和傳遞。正因?yàn)樗绱撕糜?,所以在?shí)戰(zhàn)中我們常常基于它來編寫一些工具 函數(shù)/庫 ,但往往又由于自己對流的某些特性的疏忽,導(dǎo)致寫出的 函數(shù)/庫 在一些情況會達(dá)不到想要的效果,或者埋下一些隱藏的地雷。本文將會提供兩個在編寫基于流的工具時,私以為有些用的兩個tips。
一,警惕EVENTEMITTER內(nèi)存泄露
在一個可能被多次調(diào)用的函數(shù)中,如果需要給流添加事件監(jiān)聽器來執(zhí)行某些操作。那么則需要警惕添加監(jiān)聽器而導(dǎo)致的內(nèi)存泄露:
'use strict';
const fs = require('fs');
const co = require('co');
function getSomeDataFromStream (stream) {
let data = stream.read();
if (data) return Promise.resolve(data);
if (!stream.readable) return Promise.resolve(null);
return new Promise((resolve, reject) => {
stream.once('readable', () => resolve(stream.read()));
stream.on('error', reject);
stream.on('end', resolve);
})
}
let stream = fs.createReadStream('/Path/to/a/big/file');
co(function *() {
let chunk;
while ((chunk = yield getSomeDataFromStream(stream)) !== null) {
console.log(chunk);
}
}).catch(console.error);
在上述代碼中,getSomeDataFromStream函數(shù)會在通過監(jiān)聽error事件和end事件,來在流報錯或沒有數(shù)據(jù)時,完成這個Promise。然而在執(zhí)行代碼時,我們很快就會在控制臺中看到報警信息:(node) warning: possible EventEmitter memory leak detected. 11 error listeners added. Use emitter.setMaxListeners() to increase limit.,因?yàn)槲覀冊诿看握{(diào)用該函數(shù)時,都為傳入的流添加了一個額外的error事件監(jiān)聽器和end事件監(jiān)聽器。為了避免這種潛在的內(nèi)存泄露,我們要確保每次函數(shù)執(zhí)行完畢后,清除所有此次調(diào)用添加的額外監(jiān)聽器,保持函數(shù)無污染:
function getSomeDataFromStream (stream) {
let data = stream.read();
if (data) return Promise.resolve(data);
if (!stream.readable) return Promise.resolve(null);
return new Promise((resolve, reject) => {
stream.once('readable', onData);
stream.on('error', onError);
stream.on('end', done);
function onData () {
done();
resolve(stream.read());
}
function onError (err) {
done();
reject(err);
}
function done () {
stream.removeListener('readable', onData);
stream.removeListener('error', onError);
stream.removeListener('end', done);
}
})
}
二,保證工具函數(shù)的回調(diào)在處理完畢數(shù)據(jù)后才被調(diào)用
工具函數(shù)往往會對外提供一個回調(diào)函數(shù)參數(shù),待處理完流中的所有數(shù)據(jù)后,帶著指定值觸發(fā),通常的做法是將回調(diào)函數(shù)的調(diào)用掛在流的end事件中,但如果處理函數(shù)是耗時的異步操作,回調(diào)函數(shù)則可能在所有數(shù)據(jù)處理完畢前被調(diào)用:
'use strict';
const fs = require('fs');
let stream = fs.createReadStream('/Path/to/a/big/file');
function processSomeData (stream, callback) {
stream.on('data', (data) => {
// 對數(shù)據(jù)進(jìn)行一些異步耗時操作
setTimeout(() => console.log(data), 2000);
});
stream.on('end', () => {
// ...
callback()
})
}
processSomeData(stream, () => console.log('end'));
以上的代碼callback回調(diào)可能會在數(shù)據(jù)并未被全部處理時就被調(diào)用,因?yàn)榱鞯膃nd事件的觸發(fā)時機(jī)僅僅是在流中的數(shù)據(jù)被讀完時。所以我們需要額外地對數(shù)據(jù)是否已處理完進(jìn)行檢查:
function processSomeData (stream, callback) {
let count = 0;
let finished = 0;
let isEnd = false;
stream.on('data', (data) => {
count++;
// 對數(shù)據(jù)進(jìn)行一些異步耗時操作
setTimeout(() => {
console.log(data);
finished++;
check();
}, 2000);
});
stream.on('end', () => {
isEnd = true;
// ...
check();
})
function check () {
if (count === finished && isEnd) callback()
}
}
這樣一來,回調(diào)便會在所有數(shù)據(jù)都處理完畢后觸發(fā)了。