海納百川:Node.js Streams

YOTTA 你最專業的學習夥伴,提供優質內容與有趣觀點,擴大豐富你的視野。

訂閱李健榮 Simen的專欄,閱讀更多內容。


一、前言

前些日子,我發表了一篇「非同步程式碼之霧:Node.js 的事件迴圈與 EventEmitter」,受到社群很大的迴響,謝謝的大家對它的喜歡。


我心中主觀地認為,只要搞懂非同步程式碼、事件迴圈、EventEmitter 以及 Stream,對於 Node.js 的基本認識就應該有 6 ~ 7 成左右了,不僅 Node 核心程式碼實作,包含我們的開發工作,幾乎無時無刻都圍繞在這些概念之上。我相信對這些基礎的認識,能有助於開發者提升 Node.js 程式碼的撰寫品質,同時我也非常鼓勵新手在早期就能夠以偏向基礎的角度來認識 Node.js (我知道新手在越閱讀這些東西時,可能會有一點障礙。但是沒關係,多聽多看總是好的。體悟總有一天會降臨。)

因為 Streams 的原始碼很多,所以我盡量把它們的理念用畫圖的方式給表現出來,然後再以看圖說故事的方式,試著將 Streams 的內部運作給整理出來。原本還有放一些例如 read()、write() 與 push() 等等的原始碼,但後來覺得似乎沒必要,因為覺得看圖說故事就可以表達的不錯了。然後,假使你沒有時間閱讀這篇文章,可以直接跳到「以使用者角度看 Stream」一節,那裡整理的內容極簡,應該 2 分鐘就可以看完,純然只是如何使用標準介面而已,其實對普通的開發工作應該也算夠了。

然後,建議開始閱讀前,先深呼吸一下,因為這篇內容也蠻長的。但有靜下心來看,應該也是 20 ~ 25 分鐘左右可以搞定。如果你也看過「非同步程式碼之霧」,這篇長度有稍短一些,但也沒差多少。看這兩篇花不到 1 個小時,我個人其實覺得很划算。如果你現在還沒下定決心要看,我是建議就先不要看,等到靜下心時再看比較好,吸收的 CP值比較高。 (不覺得我有病嗎?連要人看一篇文章,都還要有工程師思維。是的我有病。)


由於這些技術內容並沒有人幫我校稿,內容難免有誤。在此要謝謝社群朋友們對「非同步程式碼之霧」一文的提醒、糾正以及幫忙測試,本篇同樣還是得請大家多多幫忙,讓我們能夠一起將文章的內容做到最正確。


(原本有計畫寫一系列如何撰寫 module 的文章,但我發現那類型的文章太多了,好像不差我那幾篇 XDD~ 所以只寫了第一篇我就決定先暫停,我真的不是故意找藉口不寫哦!好啦!如果真的有人想看,再請告訴我囉~)

二、為什麼要談 Stream

與 Unix 有著同樣的信仰,Node.js 同樣信奉「Small Modules」以及「Do One Thing and Do It Well」的哲學:

 Write programs that do one thing and do it well. Write programs to work together. Write programs to handle text streams, because that is a universal interface. - D. McIlroy

在類 Unix 的系統,有個東西叫 pipe (在命令行的符號是 | ),讓我們看看 wiki 怎麼說 (我不會說的比較好),這裡隨便寫一個範例吧!我們把對根目錄 / 給 ls 出來的結果,當成 sort 工具的輸入,做完反序後的輸出,當成 grep 的輸入,然後找出含有 e 字母的字串:


~$ ls / | sort -r | grep e
media
homeetc
dev


總之,因為 Stream 是一種標準介面,我們可以使用 pipe 輕易地將各種小工作給串聯起來,組合成更大的功能(或工作)。


Node.js 也 in-program 實現了這樣的哲學,名字正是 Stream,它不單單只是文字流(或數據流),也支援物件流。同時,它一樣是一種「Universal Interface」。在 Node 中,你可以寫出類似這樣的程式碼:


A.pipe(B).pipe(C).pipe(D);

例如 Node 官網上的檔案壓縮範例:


const gzip = zlib.createGzip();
const fs = require('fs');
const inp = fs.createReadStream('input.txt');
const out = fs.createWriteStream('input.txt.gz');
inp.pipe(gzip).pipe(out);


由於小模組的理念,再加上 Node.js 核心以及大量第三方函式庫皆以 Stream 實作,又或者提供有 Stream 的介面,以及 Node.js 在 Networking 的強項更是與 Stream 密不可分。這使得 Stream 幾乎是公認 Node.js 開發者必不可少的基礎知識之一。這正是為什麼我想要寫一篇文章來聊一下 Stream,而更重要的是巨人 Dominic Tarr 說的一句話:

 Streams are node's best and most misunderstood idea, ... - D. Tarr, EventStream

驅使著我想要好好搞懂 Node Streams。我將自己的學習過程以及理解整理起來,希望能為再為台灣 Node.js 開發者們帶來一篇值得一讀的文章。



三、console.log


每天苦惱的 Node 工程師總是會在程式裡寫上這麼一兩句:


 console.log('Fuxking explode here, go die fuxk face'); // 幹嘛跟電腦生氣呢!奇怪ㄋㄟ


我們到 /lib/console.js 看一下,疑?在 new 的時候傳了 process.stdout 跟 process.stderr 給它


module.exports = new Console(process.stdout, process.stderr);


function Console(stdout, stderr) {

 // ...

 prop.value = stdout;

 Object.defineProperty(this, '_stdout', prop);

 prop.value = stderr;

 Object.defineProperty(this, '_stderr', prop);

 // ...

}


當你呼叫 console.log 時,其實只是使用了 process.stdout.write() 啊!! 不過 console.log() 比較好心,會幫你換行 XDDD  (最後有個 '\n')


Console.prototype.log = function() {

 this._stdout.write(util.format.apply(null, arguments) + '\n');

};


那麼 process.stdout 跟 process.stderr 又在哪裡?這個我就不要列太多程式碼,你可以在 /src/node.js 的核心初始化程式碼的 stratup() => startup.processStdio() 找到它們。比較重要的就以下這 4 行:


  // ...

  stdout = createWritableStdioStream(1);

  // ...

  stderr = createWritableStdioStream(2);

  // ...

  var fd = 0;

  stdin = new tty.ReadStream(fd, ...);

  // ...


你可以知道 stdin, stdout, 與 stderr 即對應到作業系統的標準輸入串流 (fd =0, /dev/stdin)、標準輸出串流 (fd=1, /dev/stdout) 以及標準錯誤輸出串流 (fd=2, /dev/stderr)。


會以 console.log() 出發的用意是想告訴你,其實你一天到晚都在用 Stream,所以不要怕他啊!會使用 console.log() debug 的人一定是全天下使用 Stream 最多的人!哈哈哈... (我本人是也~) 所以不要再說你不會用 Stream 啦~(裝死...)

四、Stream 的結構

Stream 除了是資料的來源 (source) 或終點 (sink) 之外,它還是一種「資料流控制」的單元,由於牽涉到資料的控制、存取、流動與暫存,Stream 的每個實例內部都有維護有自己的狀態,每個 Stream 實例都是一個狀態機,這些狀態用於指明 Stream 目前是處在 Flowing 還是 Paused 模式等,Stream 於不同的模式有不同的行為。


由於它有點複雜,所以接下來我所引用的程式碼,只會剪出比較重要的片段,有達到我想表達的意思就好。Steam 的實作落在 Node 原始碼 /lib 目錄底下的 


  • _stream_readable.js
  • _stream_writable.js
  • _stream_transform.js
  • _stream_duplex.js
  • stream.js (以上四個都繼承了這個 Base Class)


我們從 stream.js 開始看起 (以 4.5.0 原始碼為例)。


[ stream.js ]


Stream 這個 Class 以靜態屬性 export 出了 Readable、Writable、Duplex、Transform 以及 PassThrough 這幾個 Classes。


module.exports = Stream;


const EE = require('events');    // <--- EventEmitter

const util = require('util');


util.inherits(Stream, EE);

Stream.Readable = require('_stream_readable');

Stream.Writable = require('_stream_writable');

Stream.Duplex = require('_stream_duplex');

Stream.Transform = require('_stream_transform');

Stream.PassThrough = require('_stream_passthrough');


function Stream() {

 EE.call(this);

}


然後你應該也發現到了,Stream 繼承了 EventEmitter!所以它可以發射事件、你也可以監聽它身上的事件。很重要,再複誦一遍:

Stream 是 EventEmitter

[ _stream_readable.js ]


Readable() 是一個 Constructor,它的每個實例都擁有自己的狀態 this._readableState,另外,還有一支必須由 ReadableStream 實作者所提供的 template method this._read()


function Readable(options) {

 // ...

 this._readableState = new ReadableState(options, this);  // <-- 狀態物件

 // ...

 if (options && typeof options.read === 'function')

   this._read = options.read;  // <-- template method, 由 Stream 實作者提供


 Stream.call(this);

}


我們來看一下狀態物件的建構子,它裡面的狀態非常的多,但是我這裡就剪出它最重要的一個:內部緩衝區


function ReadableState(options, stream) {

 // ...

 this.buffer = [];  // <-- 緩衝區僅僅是一個陣列

 // ...

}


嘿!注意到了嗎?內部緩衝區只是一個陣列,剛初始化時它空空的。由於 Stream 有一個稱為 highWaterMark (HWM) 的屬性,用來限制緩衝區最大可容納的 bytes 數,它的預設值是 16 kB,最大值不可超過 8 MB。這邊我們引出了第二個重點,我們再複誦一遍:

Stream 是 EventEmitter 與 Buffer 的組合

這我們後面看到「圖」的時候,會更有感覺!


[ _stream_xxx.js ]


雖然 Readable、Writable、Duplex、Transform 有不同的實作內容,但是關於 Stream 是 EventEmitter 與 Buffer 的組合,我們上面已經看過一個 Readable,其它的也具有相同性質,所以程式碼我想就不用再貼上來了啦。

五、Stream 的大意

Stream 的實作是單純的 JavaScript,它繼承自 EventEmitter,而且內部擁有一個緩衝區。緩衝區直接的目的是為了做流速控制,間接的好處是能夠使你的程式維持較精省的 memory footprint。

現在想像一個場景,你有一個數據來源,也有一個數據消耗者。若數據來源與消耗者都是 Stream 的實作,我們可以這樣說:

一個 ReadableStream 是數據產生者 (source)
一個 WritableStream 是數據消耗者 (sink)

流速控制 (Backpressure 與 Throttling)


當 sink 消耗速度慢,但 source 的產生速度快,這時候該怎麼辦?

如果是由我們自己來蠻幹,首先想到的辦法,大概也就是先將「速度快」的來源數據先「暫存(緩衝)」起來,然後再慢慢地消耗掉。(嘿!Stream 已經有緩衝區了呀!)


那麼,我怎麼會知道「消耗者」的速度比較慢?沒問題!一個 WritableStream  具有 Backpressure (背壓) 機制,當它的消耗速度跟不上輸入時,它會發事件通知你(在內部緩衝區溢出時),此時你有責任先暫停輸入數據。我們後面會把 Backpressure 講清楚。

另外一種情況,如果我想要將數據來源的速度調慢,調成我想要的速度呢?當成也可以!這樣的變速 (節流, throttling) 機制,也是靠一個緩衝區先將數據收集下來,再以我們想要的速度來輸出即可 (例如從 100 kB/s 降成 50 kB/s 的資料率)。當然,實作者有時候還得搭配 Backpressure 機制來做流量控制。(那能不能調快速度?當然可以,不過這取決於你到底想要什麼樣的數據。調快調慢,都可以靠 Transform 來完成。)


節省 Memory Footprint


這裡給個大家都很喜歡使用的範例,一看你就懂什麼意思了。假設我現在有一個「史詩級的大謎片檔案 xxx.avi」,它的大小有 2 GB (高畫質的喲!!)。我想要寫個 http server 來分享給我的朋友,以顯示我與眾不同的宅:


~$ ll xxx.avi
-rw-rw-r-- 1 simen simen 2097152000 Oct 14 11:13 xxx.avi


什麼??!!你沒有謎片可以測試 (睜眼說瞎話就對了)... 那用 dd 產生一個:


~$ dd if=/dev/zero of=xxx.avi bs=2097152000 count=1


[使用 fs.readFile() 來實作 ]


下面我們的祕密伺服器 server.js,它使用 fs.readFile() 來讀檔,當有 request 進來的時候,我們把檔案送過去給瀏覽器。我們順便檢測一下錯誤,若有錯誤就把它印出來:


var http = require('http');

var fs = require('fs');


var server = http.createServer(function (req, res) {

   fs.readFile('xxx.avi', function (err, data) {    // <-- 讀檔

       if (err) {

           console.log(err);

           res.writeHead(500);

           res.end(err.message);

       } else {

           res.writeHead(200, { 'Content-Type': 'video/avi' });

           res.end(data);

       }

   });

});


server.listen(3000, function () {

   console.log('Secret server is up');

});


執行 server.js 並開啟瀏覽器到 http://locahost:300 來下載檔案。結果令人失望,它爆炸了:



:在你的環境測試這個例子可能不會爆炸,但瀏覽器在準備收檔前等待的時間會卡的有點久,檔案越大,會卡越久。又或者你得到的錯誤可能跟我不同,例如你的物理記憶體根本不夠塞不下這麼大的檔案之類的,也會引發 create buffer 的錯誤。


為了讓這個錯誤出現,我有刻意退回較舊版本的 Node 來執行,原因是舊版本的 kMaxLength 是直接寫死為 0x3ffffff (1023 MB),新版本的 Node 會依據系統的整數指標長度來切換不同的 kMaxLength。你可以在 /src/node_buffer.h 看到 kMaxLength 這個參數:


static const unsigned int kMaxLength =

   sizeof(int32_t) == sizeof(intptr_t) ? 0x3fffffff : 0x7fffffff;



會不會爆炸反映在 /lib/fs.js:


 if (size > kMaxLength) {

   err = new RangeError('File size is greater than possible Buffer: ' +

                        `0x${kMaxLength.toString(16)} bytes`);

   return context.close(err);

 }


[ 改用 Stream 來實作 ]


var server = http.createServer(function (req, res) {

   res.writeHead(200, { 'Content-Type': 'video/avi' });

   fs.createReadStream('xxx.avi').pipe(res)  // <-- 將數據來源變成 ReadableStream

       .on('finish', function () {

           console.log('Sending done.');

       });

});



上面的程式碼看不到懂沒關係,等到這篇文章讀完,你一定能瞭若指掌!總之,我們再次執行看看,然後用瀏覽器下載。看!成功了!



上面兩種作法的差異


使用 fs.readFile(pathToFile, function (err, data) {}) 這支 API,底層會將檔案內容先讀進系統緩衝之中,然後在 callback 透過 data 一次給你一大包,就像下面這張圖的感覺。



圖片來源


這樣的 API 也稱為 Bulk I/O,Bulk 是一大坨的意思。這意味著,當你使用這樣的 API 存取 I/O 時,你的 memory footprint 將有那麼一時半刻會衝到很高。如果剛剛好你的系統記憶體負載很繁重,那就只好眼睜睜看著它爆炸了 XDDD....

那如果我們使用 fs 的 Stream 工廠方法 createReadStream(),將數據來源(檔案) 變成是一個 ReadableStream 呢?現在的感覺會像下面這張圖,數據將會是一個 chunk 一個 chunk 地傳送給消耗者,原本一大坨的數據將被切成一小塊一小塊來傳送,這每一小塊的最大容量則是由 Stream 內部緩衝區的最大容量所決定 (預設為 16 kB)。


圖片來源


又或者說,Stream 可以是動態的,數據來就送、來就送,不必等所有數據都到達,然後才一股腦地丟出一大包。因此,在使用 Stream 的情況下,就不會有 memory footprint 瞬間飆很高的情況發生。如此一來,網路中 client/server 之間的互動反應就能更加即時 (只要一收到 request,第一個 chunk 就能透過 response 馬上飛奔出去)。

相信現在您對「Stream 的大意與好處」應該已經稍稍有概念了,接下來,我們就要進入更深一層的解析了。不過別擔心,更深一層也沒多深啦... XDDD (不要嗆我啊!!)

六、Stream1、Stream2、Stream3 與相關函式庫


在開始偷窺 Stream 的內褲 (內部啦!) 之前,先很簡單摘要一下 Stream 的版本變化。這裡大家先很約略看一就好,等稍後從內褲鑽出來之後,再回來看這裡,你就會發現有點臭臭的~ (靠夭啊....)


Node.js 的 Stream 自第一版的 Stream1 (Node v0.8) 演化至今,已經歷經 Stream2 (>  v0.10) 而來到 Stream3 (> v0.12)。這裡我先很簡單地作一下摘要:

[ Stream1 ]


  • 使用 Push Model,即有資料來,Stream 自動發射 'data' 事件伴隨 data chunk 以將資料推出去,這稱為 Flowing Mode (流動模式)


[ Stream2 ]


  • 預設為 Pull Model,即有資料來時,Stream 將資料累積於內部緩衝區中,並同時發射一個 'readable' 事件來通知 Stream 的使用者,將資料取走 (使用 read() 方法)。若使用者沒有取走,資料將繼續累積於內部緩衝區。
  • Stream2 的預設模式也稱為 Paused ModeNon-Flowing Mode (非流動模式)
  • 當 Stream 不再自動將資料推出來、使用者可以自己決定要不要拉出資料,意味著使用者控制「資料流動」的彈性增加了。
  • 使用者仍可以將 Stream 轉回 Flowing Mode 來使用,只要掛上 'data' 事件的監聽器或呼叫 resume() 即可。
  • Flowing/Non-flowing 模式只能擇一使用,即你的程式碼中不可以同時有 'data' 的監聽器以及 'readable' 的監聽器。若兩者兼有之,程式不會當掉,但是你可能會得到非預期的 Stream 行為。


[ Stream3 ]


  • 混合模式(Mixed mode),推拉模型混在一起,程式碼中可以同時有 'data' 的監聽器以及 'readable' 的監聽器,行為容我後面再說明。
  • 一般還是建議維持 Flowing / Non-Flowing 模式擇一使用的習慣,通常混著用的機會可能也不大 (或許是有很棒的使用時機?這要請高手指點一下~)。
  • Stream3 開始,幾乎都是 performance 調整與 bug 的修正,Stream 的介面與行為看起來目前是穩定下來了。


[ 相關函式庫 ]


  • readable-stream:Node Stream2 與 Stream3 實作的獨立抽離版本,有了它,你就能在瀏覽器端擁有 Stream 啦!
  • mississippi:Stream 的輔助工具集合,可協助你快速實作 ReadableStream、WritableStream 等等(主要是擺脫一些 boilerplate code),當然你也能直接使用 from2through2duplexer2 這些工具來協助你
  • 如果要做 Stream 的合併、串連等工作,可以使用 mutilpipepumpify 等工具
  • 以上介紹的都是使用量破表的函式庫,另外還有一個我覺得超威的,就是 highland.js,它的夢想是將一切給 Stream 化,夠瘋狂吧!


接下來,我們終於可以開始來看 Stream 內部的運作機制了!因為數據就是從來源流向消耗者,所以一開始我們會花點篇幅在 ReadableStream 上面,接著 WritableStream 就快了,然後 Transform 跟 Duplex 就只是短短幾句話的事情而已!

我們會先從「內部原理與 Stream 實作者」的角度切入,先把各類 Streams 跑過一遍。隨後再以「Stream 使用者」的角度來看 Stream,這也是大部分開發者的角度,大多時候我們都是 Stream 的使用者!(礙於篇幅,我們不談如何實作自己的 Stream,但是你了解原理之後,會發現要利用 Stream 的基礎建設來設計自己的 Stream 就不會太難了。)

七、Readable Stream

我們先從 Readable Stream 的 Flowing Mode 看起 (掛上 'data' 事件監聽器來接收數據),我相信這也是很多人喜歡的方式。然後,我們再看 Non-flowing Mode (掛上 'readable' 監聽器來獲取數據到來的通知)。



Readable Stream:Flowing Mode


下面這張圖,藍色布景內的東西是 Stream 內部示意圖,它設有緩衝區水桶一個;鐵灰色的部分是 Stream 要求「Stream 實作者」應該要提供實作的部分。而藍色布景的外面,事件 'error' 以及 方法 pause() 這種東西,它們是 Readable Stream 的標準介面。之後我們遇到的圖,都是以這樣的方式來佈局。


接下來,依照圖中標示的 (1), (2), ... (6) 一個個看起,看看狀態處在 Flowing Mode 的 ReadStream 是如何工作的:


  1. Resource 是數據來源,例如一支檔案、一個 socket、或某種數據產生源。這包含在「實作者提供的實作」中,Stream Class 是基礎建設,數據源得由實作者決定呀!

  2. _read() 這支 template method 由 ReadableStream 實作者提供,實作內容正是鐵灰色區塊的部分。

  3. 當實作者從 Resource 拿到一些數據後,他必須使用 this.push(chunk) 將資料塊推入內部緩衝區。注意圖中有兩處 (3),上者表示數據推入緩衝區,下則對應到 _read() 的實作在某處一定會呼叫 this.push()。當水桶滿了 (達到 highWaterMark),呼叫 push() 將傳回 false,讓實作者知道水桶已經滿了,此時實作者必須暫時停止再從 Resource 讀資料出來。此機制稱為 Backpressure,不過這裡是實作者要去照顧的事情。我們在後面以「Stream 使用者」角度切入時,我們使用者需要照顧的是 WritableStream 的 Backpressure。

  4. 在 Flowing Mode,如果 ReadableStream 沒有被 pause() 住,那麼由 (3) push 進來的 chunk 就會馬上從白色 (4) 的水管噴出去,不會累積在緩衝區,因此稱為流動模式。噴出去的東西預設是 binaries (型別為 Buffer),如果有設定 encoding,也可以直接噴出字串。黑色 (4) 是指 ReadableStream 的使用者可以呼叫  resume() 或 pause() 來開關資料流動,水管不是 ON 就是 OFF。當 pause() 後,由 (3) 所推入的資料塊,就會在緩衝器中累積起來。由於 Stream2 開始,Stream 的預設模式就是 paused mode,因此我們會常常看到一些程式碼在一開頭都會先呼叫 readable.resume()。在 Stream2/3,其實你只要在 ReadableStream 掛上 'data' 事件監聽器,Stream 就會以 Flowing Mode 操作了,除非你有刻意呼叫 pause() 關閉水管。

  5. 當每次 push 資料進水桶時,Stream 都會檢查目前水位是否已經達到 highWaterMark (HWM),如果已滿,呼叫 push() 將傳回 false。highWaterMark 是可以自己設定的, 預設是 16 kB,最高可以設到 8 MB。注意,push() 方法是給「實作者」用的 API,而不是給「使用者」用的。

  6. 在 Flowing Mode「使用者」監聽 'data' 事件就可以拿到從 source 流過來的資料。

  7. 'error' 跟 'close' 是所有類型 Streams 都有的標準事件,'error' 通知錯誤,而 'close' 通知底層 Resource 的關閉,並不是所有的 ReadableStream 都有 'close' 事件,因為數據來源不一定總是 I/O 之類的資源。'end' 是 ReadableStream 的專屬事件,它用來通知「使用者」本次的這一大坨數據已經傳輸完畢 (一大坨是被拆成一小塊一小塊,所以全部傳完要通知一下啊!)。對於「實作者」而言,必須在最後一個 chunk 傳完之後,呼叫 this.push(null),這會讓 ReadableStream 知道應該引發 'end' 事件了!


Readable Stream:Non-flowing Mode


接下來我們來看 Non-flowing mode (paused mode),這是 ReadableStream 在 Stream2/Stream3 的預設模式。現在解釋起來就簡單了,我們一樣按照圖中僅有的 (1)、(2)、(3) 來說明:


  1. 因為預設是 Non-flowing 的,所以 this.push() 進來的 chunk 都會被累積在緩衝區

  2. 每當 this.push() 被實作者呼叫時,ReadableStream 就會引發 'readable' 事件但是不挾帶數據,目的是通知「使用者」,快來哦!有資料來了哦!當然,「使用者」必須監聽這個事件,才收的到通知嘛~

  3. 當收到通知後,使用者在它的 'readable' 監聽器中,呼叫 readable.read([size]) 來把資料挖走。注意到了嗎?在挖資料的時候,可以用 size 參數來指定一次要挖「多少」, read() 這支 API 就如可調鬆緊的水龍頭,可以控制每次想取回的數據量啊!超讚!(封包剖析就很需要這種功能)。平常的使用大多不會指定 size,而是盡可能地一直呼叫 read() 把緩衝區現有的數據都給挖回來,反正遇到 read() 傳回 null 時,就知道緩衝區空了、挖光了。這我們在以「使用者角度」切入時,再來看!如果外面沒有人呼叫 read() 說要吸,那麼數據就會在緩衝區內累積下去


關於 Stream3 的行為


  • 此時此刻,我覺得我們只要看 Stream3 就可以了。 (Node 都已經到 v7.0 了啊!!!)

  • 使用者可以同時掛上 'data' 與 'readable' 監聽器。

  • 只要掛上 'data' 監聽器,ReadableStream 就會進入 Flowing mode,此時就算呼叫 pause() 把水管關閉,並不會使其回到 Non-flowing mode。

  • 在 Flowing mode,不只 'data' 事件,同時 'readable' 事件一樣會發射,但呼叫 read() 會挖不到東西,因為數據已經伴隨 'data' 事件拋出去了。

  • 在 Non-flowing mode,每一次 push(chunk) 都會引發一次 'readable' 事件,但是如果 ReadableStream 內部 source 產生的速度很快 (推入緩衝區的速度也很快),此時外部在聽到較早發生的 'readable' 事件時,用 read() 就會撈到「現在這當下」緩衝區內擁有的東西,然後你又撈光它的話,這會造成聽到較晚的 'readable' 事件時,使用 read() 反而挖不出東西來。這是正常的行為,不要以為有問題啊!因為你還是一樣拿到完整的數據了啊,對吧?

  • 如果你想使用 Non-flowing mode,但又掛了 'data' 監聽器,你必須在 ReadableStream 造出來後立即顯式地呼叫 pause() 讓它進入非流動模式,之後當你呼叫 read() 時,'data' 事件會順便被引發,然後你在 'data' 監聽器會得到跟 read() 一樣的數據,也就是說你在 'readable' 與 'data' 兩個監聽器內都有辦法拿到同一份數據,YA~~

  • 關於 Stream3 更細緻的行為,請參考官方手冊最準確!但我認為有上述的認識,就差不多了!


呼~ Readable Stream 說了很長,但這是必經之途呀!接下來,Writable Stream 就好講了。請再忍忍!

八、Writable Stream

一樣,看圖說故事!我們有 (1) ~ (6) 共六個號碼:


  1. Resource 是數據終點,例如一支檔案,一個 socket、或某種數據收取裝置,到底如何往 Resource 寫數據是實作者的事情。

  2. _write() 這支 template method 由這個 WritableStream 的實作者提供,這支方法主要是要告訴 Stream 如何將數據寫入 Resource。這裡範例的程式碼是用 fs.writeFile() 將數據寫進一個檔案。

  3. 「使用者」可以呼叫 write(chunk) 或 end(chunk) 往 WritableStream 寫入數據,end() 可在結束時順便寫入最後一筆 chunk。接著看 (4)

  4. 內部會先判斷緩衝區是否為空,如果是空的,那就直接往 Resource 寫去即可。

  5. 如果緩衝區不為空,那就進去跟大家一起排隊吧 (lastBufferRequest 是紀錄物件)!WritableStream 內部會在往 Resource 寫東西之後,回頭去檢查緩衝區內是否還有數據,試圖清空緩衝區 (clearBuffer()),直到消耗完畢。

  6. 當外部使用者呼叫 end() 之後,WritableStream 在來來回回從緩衝區拿出數據往 Resource 寫入,直到緩衝區空了,它知道你已經告訴它 end() 啦,所以在最後一次往 Resource 寫完資料之後,將引發 'finish' 事件通知你,終於寫完啦!爽!哈哈哈!那麼 'drain' 事件又是怎麼回事?這時就可以來聊聊 Backpressure 了!


Backpressure (背壓)


Backpressure 機制作圖如下,當外部呼叫 write(chunk) 或 end(chunk) 往 WritableStream 寫入數據,若內部緩衝區已經滿了則傳回 false,這個時候「你這個使用者」就必須暫停再往它身上寫東西,因為進不去了,不要硬塞!那什麼時候可以再寫東西進去?當 WritableStream 把它的緩衝區消耗完畢後,會引發 'drain' 通知使用者,來吧!Come On Baby!write() 回傳的 false 以及 'drain' 事件通知,它們一起構成了 Backpressure 的機制!



這裡我們要注意的是:

照顧 Writable Stream 的 Backpressure 是使用者的責任

蛤?~~責任?那是甚麼?這讓我想李敖大師說施明德先生的三不主義:「不主動、不拒絕、不負責」 XDDD....


接下來我們來看一下 pipe(),它除了可以幫你把一個 ReadableStream 的輸出水管給接到 WritableStream 的輸入之外,還會幫你扛起處理 Backpressure 的責任哦!


pipe


pipe() 的用途就是將一個 Readable Stream (source) 接給 Writable Stream、Transform Stream 或 DuplexStream 等 sink。pipe() 會再傳出一個 Readable Stream,以致於 pipe() 能夠一直串連下去,就好像我們在文章一開頭看到的 UNIX 命令範例:


$ ls / | sort -r | grep e


一個純然的 Writable Stream 因為沒有 Readable 介面,所以 pipe() 接到純 Writable Stream 就是終點了,無法再 pipe() 下去。


現在,我們來看一下 /lib/stream.js 中,pipe() 方法的實作。請注意,底下的程式碼我有將原始碼做了一點調整,為了是方便說明它的理念,原始碼並非如此。當我們使用 A.pipe(B) 時,A 是 source,B 是 destination (sink)。就看以下這三點:


  • pipe() 幫你監聽了 source 的 'data' 事件,然後將收下來的 chunk 用 dest.write() 寫到 dest 去!同時它若檢測到 backpressure,會使用 source.pause() 自動將 source 暫停下來!
  • 它也幫你監聽了 dest 的 'drain' 事件,當 dest 的緩衝區消耗完畢,它又幫你恢復 source 的資料流動!它幫你自動照顧 backpressure,根本超佛....  XDDD (pipe 裡面還有幫你照顧 'end' 跟 'close' 事件,我就不列啦!)
  • pipe() 最後傳出 destination,若 destination 是純 Writable Stream,那就無法再串 pipe 下去。因為自 Stream2 起,已經嚴格限制必須要是 Readable 才能 pipe 給 Writable Stream。


Stream.prototype.pipe = function(dest, options) {

 var source = this;

 source.on('data', function (chunk) {

   if (dest.writable) {

     if (false === dest.write(chunk) && source.pause) {

       source.pause();

     }

   }

 });


 dest.on('drain',   function () {

   if (source.readable && source.resume) {

     source.resume();

   }

 });


 dest.emit('pipe', source);


 // Allow for unix-like usage: A.pipe(B).pipe(C)

 return dest;

};


如果以撰寫應用程式的觀點來看,原本像以下的寫法:


var fs = require('fs');

var sourceStream = fs.createReadStream('source.txt');

var sinkStream = fs.createWriteStream('sink.txt');


sourceStream.on('data', function (chunk) {

 sinkStream.write(chunk);

}).on('end', function () {

 sinkStream.end();

});


使用 pipe() 來寫,將變成這樣:


var fs = require('fs');
fs.createReadStream('source.txt').pipe(fs.createWriteStream('sink.txt'));


真的超酷的啊!不覺得嗎?還有一點,我就把它總結成下面這句話吧:

 pipe() 幫你串連數據來源與消耗者,同時會自動控制它們之間的 Backpressure,因此,整條 Streams Pipeline 的速度將受限於其中最低速的那一個

好的!有了以上的概念,接下來我們再看 Transform 與 Duplex,真的超簡單!隨便講個幾句你就能理解了!

九、Transform Stream

Transform 擁有  Writable (圖左側) 與 Readable (圖右側) 的標準介面,輸入與輸出彼此間的數據存在有轉換關係,且轉換前後的數據長度可以不一樣沒關係。例如壓縮,轉換後的長度變短;又如解壓縮,轉換後的長度變長。


TransformStream 的實作者必須提供 _transform()_flush() 的內容,_transform() 告訴 TransformStream 如何轉換來源數據(例如壓縮),_transform() 的函式簽署與 _write() 一樣,只不過實作者在內部會呼叫 this.push() 將轉換後的數據推入內部緩衝區,這反而像 _read() 對吧!這是因為轉換後,將形成一個新的數據源,而非寫入某個 Resource。


_flush() 則是用於讓 TransformStream 在 Readable side 要發射 'end' 之前,有最後一次機會將一些殘餘數據推入緩衝區 (這點對於「使用者」其實不用太在意,若你是實作者,當你試著實作一個 Transform Stream 碰到時,就知道這什麼意思了)。

因為遵循著 Readable 與 Writable 的標準介面,所以,就如同使用 Readable Stream 與 Writable Stream 般地大膽地使用它吧!


Transform 完!

十、Duplex Stream

Duplex 擁有  Writable (圖左側) 與 Readable (圖右側) 的標準介面,輸入與輸出彼此間的數據可以毫無關係,例如一個 socket,寫出跟讀入可以完全沒關係,你可以將它的內部視為,由獨立的 Readable Stream 與 Writable Stream 所組合而成,各自使用的 Resource 可以是同一個、也可以不一樣。



因為遵循著 Readable 與 Writable 的標準介面,所以,就如同使用 Readable Stream 與 Writable Stream 般地大膽地使用它吧!


這就是 Duplex,打完收工!

十一、以使用者角度看 Stream

當你已經理解各種 Streams 的運作原理,站在使用者的角度,你會發現這根本沒什麼了啊!不過我們還是把它給總結一下:


使用 Readable Stream


介面事件:'error', 'close', 'end', 'data' (Flowing mode), 'readable' (Non-flowing mode)

介面方法:resume(), pause(), read()


[ 流動模式 ] 聽 'data' 事件,由 handler 收取 chunk



[ 非流動模式] 聽 'readable' 事件,在 handler 中使用 read() 方法收取 chunk,讀到 null 即內部緩衝區已空。平常用法就如綠色區塊內的範例,會使用 while() 直到把緩衝區掏光為止。



使用 Writable Stream


介面事件:'error', 'close', 'finish', 'drain'

介面方法:write(), end()

[使用模式] 


檢查 write() 回傳值,backpressure 發生時要暫停寫入。監聽 'drain' 事件以繼續寫入數據。呼叫 end() 告訴 WritableStream 這一坨數據的最後一個 chunk 已經來了,接著當 'finish' 發生時,代表 WritableStream 已經將所有數據都往 Resource 寫入完畢。圖中綠色區塊範例的 fooWrite() 是經過包裝的 writer,它帶有照顧好 backpressure 的機制,你可以實作自己的處理方式。



使用 Transform、Duplex 與 PassThrough


這個我們就不用再畫圖了!因為遵循著 Readable 與 Writable 的標準介面,所以同樣再唱一次國歌:「就如同使用 Readable Stream 與 Writable Stream 般地大膽地使用它們吧!

 

怎麼看都很眼熟?


如果你使用 Node 開發網路程式,你可能會常寫出類似以下的程式碼,你知道多數時候你一直在使用著 Stream 的標準介面嗎?


var server = http.createServer( (req, res) => {

 res.writeHead(200, {'Content-Type': 'text/plain'});  // <- 註: writeHead 不是標準介面哦

 res.write('Hello');

 res.end('World!');

});


client.on('data', (data) => {

 console.log(data.toString());

 client.end();

});



Node 核心的許多模組,例如 fs, net, http, dgram, zlib, cryto 大多使用了 Stream 的實作,或提供有 Stream 的介面,所以你可能在很多地方多多少少都會遇見 Stream 的蹤影。當然,Stream 本身是 EventEmitter,因此你當然可以在它身上發射自己的事件,許多核心模組或第三方模組也規劃有自己特化後的事件、方法或者屬性,以符合模組的目的 (可能語意也會比較清晰,畢竟 Stream 的介面是很通用化的)。

再者,由於 Stream 的介面是 Universal 的,這帶給開發者最大的好處就是:只要搞懂 Stream 的介面,就可以在「不太需要詳閱文件」的情況下,自由運用許多基於 Stream 的工具!(需要了解目的,而不需要花太多時間再了解介面)

現在弄清楚 Stream 的介面了,準備好對你的 http server 進行惡搞了嗎? XDDD....

十二、我們沒有講的

你知道,光是要把 Stream 的運作機制給講清楚就已經要花這麼大篇幅了,因此恕我沒辦法再說更多了!這裡提幾點我們沒談到的,我相信只要您已經理解以上內容,自己去探索剩下的東西也就不那麼困難了。我們沒談到的有:


  • 如何設定 encoding  (存在內部緩衝區內的東西,預設一定是 Buffer 類型,當你塞字串進去時可設定編碼,預設是當成 utf8 將字串編成 binaries。當數據被挖出來的時候,也能設定編碼類型以噴出字串,而不是噴出 binaries)

  • Stream 的 Object Mode (還記得 this.buffer = [] 嗎?它其實存的都是一個個 Buffer 類型的實例,也都是物件。但開啟 Object Mode 後,這個陣列就能塞一般物件啦)
  • Writable Stream 的 'pipe' 及 'unpipe' 事件,這個我不知道怎麼講。太直覺簡單反而講不出所以然阿 XDDD

  • Writable Stream 的 cork() 與 uncork() 方法,其實就是讓你有機會去堵住 Writable Stream 「內部緩衝區」,把它們當成 Readable Stream 的 pause() 與 resume() 你就瞭了

  • Writable Stream 什麼時候會被 'unpipe'?例如有 error 發生時。請參閱官方文件

  • 捕捉 pipeline 中的 error 以及如何 combine 與 merge streams 等 (前面有列一些輔助工具,請參閱那些工具的文件)

  • 如何利用 Stream Classes 來實作自己的 streams

  • Stream 更細緻的行為,也請參閱官方文件

這裡有一些很棒的文章/影片,有空可以看一看:


這邊再次說一下,作文的目的是希望:對跟我一樣熱愛 JS 與 Node.js 的開發者有所啟發,然後繼續寫出更棒的程式碼。本文非常歡迎各界拿去分享、修修改改當教材,但我最在乎的是,希望大家有發現錯誤的話,能告訴我,讓我們一起把它修改得更好、更正確!


這次不打粉絲專頁的廣告了,其實根據經驗,往往沒什麼效果,哈哈哈~

這個小節就當成我們的結語吧!寫了 N 天。終於,完了!呼....


--




內文圖片來源:李健榮 Simen

封面圖片來源:pexels