這篇文章主要介紹了Node.js中的流(Stream)介紹,本文講解了什么是流、pipe方法、流的分類、Readable流狀態(tài)的切換等內(nèi)容,需要的朋友可以參考下
什么是流?
說到流,就涉及到一個(gè)*nix的概念:管道——在*nix中,流在Shell中被實(shí)現(xiàn)為可以通過 |(管道符) 進(jìn)行橋接的數(shù)據(jù),一個(gè)進(jìn)程的輸出(stdout)可被直接作為下一個(gè)進(jìn)程的輸入(stdin)。
在Node中,流(Stream)的概念與之類似,代表一種數(shù)據(jù)流可供橋接的能力。
pipe
流化的精髓在于 .pipe()方法。可供橋接的能力,在于數(shù)據(jù)流的兩端(上游/下游 或稱為 讀/寫流)以一個(gè) .pipe()方法進(jìn)行橋接。
Node.js中的流(Stream)介紹 三聯(lián)
偽代碼的表現(xiàn)形式為:
代碼如下:
//上游.pipe(下游)
Readable.pipe(Writable);
流的分類
這里并不打算討論所謂的Node v0.4 之前的“經(jīng)典”流。那么,流分為這么幾類(皆為抽象接口:
1.stream.Readable 可讀流(需要實(shí)現(xiàn)_read方法,關(guān)注點(diǎn)在于對數(shù)據(jù)流讀取的細(xì)節(jié)
2.stream.Writable 可寫流(需要實(shí)現(xiàn)_write方法,關(guān)注點(diǎn)在于對數(shù)據(jù)流寫入的細(xì)節(jié)
3.stream.Duplex 可讀/寫流(需要實(shí)現(xiàn)以上兩接口,關(guān)注點(diǎn)為以上兩接口的細(xì)節(jié)
4.stream.Transform 繼承自Duplex(需要實(shí)現(xiàn)_transform方法,關(guān)注點(diǎn)在于對數(shù)據(jù)塊的處理
簡單來說:
1).pipe() 的擁有者一定具備 Readable 流(并不局限于)能力,它擁有 'readable'/'data'/'end'/'close'/'error' 一系列事件可供訂閱,也提供 .read()/.pause()/.resume()等一系列方法供調(diào)用;
2).pipe() 的參數(shù)一定具備Writable 流(并不局限于 )能力,它擁有 'drain'/'pipe'/'unpipe'/'error'/'finish' 事件可供訪問,也提供 .write()/.end() 等一系列方法供調(diào)用
什么鬼
有沒有一絲絲焦慮?別急,做為一個(gè)說人話的低級碼工,我會把Stream掰開了和您扯一扯的。
Stream類,在 Node.js的源碼 里,是這么定義的:
代碼如下:
var EE = require('events').EventEmitter;
var util = require('util');
util.inherits(Stream, EE);
function Stream() {
EE.call(this);
}
可以看出,本質(zhì)上,Stream是一個(gè)EventEmitter,那意味著它具備事件驅(qū)動的功能(.emit/.on...)。眾所周知,“Node.js 就是基于V8的事件驅(qū)動平臺”,實(shí)現(xiàn)了事件驅(qū)動的流式編程,具備了和Node一樣的異步回調(diào)的特征。
比如在 Readable 流中,有一個(gè) readable 事件,在一個(gè)暫停的只讀流中,只要有數(shù)據(jù)塊準(zhǔn)備好可讀時(shí),它就會被發(fā)送給訂閱者(Readable 流有哪些呢?express中的 req,ftp或者mutli-form上傳組件的req.part,系統(tǒng)中的標(biāo)準(zhǔn)輸入 process.stdin等)。有了readable 事件,我們可以做個(gè)處理shell 命令輸出的分析器之類的工具:
代碼如下:
process.stdin.on('readable', function(){
var buf = process.stdin.read();
if(buf){
var data = buf.toString();
// parsing data ...
}
});
這樣調(diào)用:
代碼如下:
head -10 some.txt | node parser.js
對于 Readable 流,我們還可以訂閱它的 data 和 end 事件,以獲取數(shù)據(jù)塊并在流枯竭時(shí)獲得通知,如 經(jīng)典socket示例 中那樣:
代碼如下:
req.on('connect', function(res, socket, head) {
socket.on('data', function(chunk) {
console.log(chunk.toString());
});
socket.on('end', function() {
proxy.close();
});
});
Readable流狀態(tài)的切換
需要注意的是,Readable 流有兩種狀態(tài):flowing mode(激流) 和 pause mode(暫停)。前者根本停不下來,誰被pipe上了就馬上不停的給;后者會暫停,直到下游顯式的調(diào)用 Stream.read() 請求才讀取數(shù)據(jù)塊。Readable 流初始化時(shí)是 pause mode的。
這兩種狀態(tài)可以互為切換的,其中,
有以下任一行為,pause 轉(zhuǎn) flowing:
1.對 Readable 流添加一個(gè)data事件訂閱
2.對 Readable 調(diào)用 .resume() 顯式開啟flowing
3.調(diào)用 Readable 流的 .pipe(writable) ,橋接到一個(gè) Writable 流上
有以下任一行為,flowing 轉(zhuǎn)回 pause:
1.Readable 流還沒有 pipe 到任何流上,可調(diào) .pause() 暫停
2.Readable 流已經(jīng) pipe 到了流上,需 remove 掉所有 data 事件訂閱,并且調(diào)用 .unpipe()方法逐一解除與下游流的關(guān)系
妙用
結(jié)合流的異步特性,我可以寫出這樣的應(yīng)用:直接將 用戶A 的輸出橋接到 用戶B 的頁面上輸出:
代碼如下:
router.post('/post', function(req, res) {
var destination = req.headers['destination']; //發(fā)給誰
cache[destionation] = req;
//是的,并不返回,所以最好是個(gè)ajax請求
});
用戶B請求的時(shí)候:
代碼如下:
router.get('/inbox', function(req, res){
var user = req.headers['user'];
cache.find(user, function(err, previousReq){ //找到之前存的req
var form = new multiparty.Form();
form.parse(previousReq); // 有文件給我
form.on('part', function (part) {
part.pipe(res); //流式大法好:)
part.on('error', function (err) {
console.log(err);
messaging.setRequestDone(uniqueID);
return res.end(err);
});
});
});
});
參考
how to write node programs with streams: stream-handbook
更多信息請查看IT技術(shù)專欄
2025國考·省考課程試聽報(bào)名