YOTTA 你最專業的學習夥伴,提供優質內容與有趣觀點,擴大豐富你的視野。
訂閱李健榮 Simen的專欄,閱讀更多內容。
一、前言
前些日子,我發表了一篇「非同步程式碼之霧:Node.js 的事件迴圈與 EventEmitter」,受到社群很大的迴響,謝謝的大家對它的喜歡。
我心中主觀地認為,只要搞懂非同步程式碼、事件迴圈、EventEmitter 以及 Stream,對於 Node.js 的基本認識就應該有 6 ~ 7 成左右了,不僅 Node 核心程式碼實作,包含我們的開發工作,幾乎無時無刻都圍繞在這些概念之上。我相信對這些基礎的認識,能有助於開發者提升 Node.js 程式碼的撰寫品質,同時我也非常鼓勵新手在早期就能夠以偏向基礎的角度來認識 Node.js (我知道新手在越閱讀這些東西時,可能會有一點障礙。但是沒關係,多聽多看總是好的。體悟總有一天會降臨。)
因為 Streams 的原始碼很多,所以我盡量把它們的理念用畫圖的方式給表現出來,然後再以看圖說故事的方式,試著將 Streams 的內部運作給整理出來。原本還有放一些例如 read()、write() 與 push() 等等的原始碼,但後來覺得似乎沒必要,因為覺得看圖說故事就可以表達的不錯了。然後,假使你沒有時間閱讀這篇文章,可以直接跳到「以使用者角度看 Stream」一節,那裡整理的內容極簡,應該 2 分鐘就可以看完,純然只是如何使用標準介面而已,其實對普通的開發工作應該也算夠了。
然後,建議開始閱讀前,先深呼吸一下,因為這篇內容也蠻長的。但有靜下心來看,應該也是 20 ~ 25 分鐘左右可以搞定。如果你也看過「非同步程式碼之霧」,這篇長度有稍短一些,但也沒差多少。看這兩篇花不到 1 個小時,我個人其實覺得很划算。如果你現在還沒下定決心要看,我是建議就先不要看,等到靜下心時再看比較好,吸收的 CP值比較高。 (不覺得我有病嗎?連要人看一篇文章,都還要有工程師思維。是的我有病。)
由於這些技術內容並沒有人幫我校稿,內容難免有誤。在此要謝謝社群朋友們對「非同步程式碼之霧」一文的提醒、糾正以及幫忙測試,本篇同樣還是得請大家多多幫忙,讓我們能夠一起將文章的內容做到最正確。
(原本有計畫寫一系列如何撰寫 module 的文章,但我發現那類型的文章太多了,好像不差我那幾篇 XDD~ 所以只寫了第一篇我就決定先暫停,我真的不是故意找藉口不寫哦!好啦!如果真的有人想看,再請告訴我囉~)
二、為什麼要談 Stream
與 Unix 有著同樣的信仰,Node.js 同樣信奉「Small Modules」以及「Do One Thing and Do It Well」的哲學:
Write programs that do one thing and do it well. Write programs to work together. Write programs to handle text streams, because that is a universal interface. - D. McIlroy
在類 Unix 的系統,有個東西叫 pipe (在命令行的符號是 | ),讓我們看看 wiki 怎麼說 (我不會說的比較好),這裡隨便寫一個範例吧!我們把對根目錄 / 給 ls 出來的結果,當成 sort 工具的輸入,做完反序後的輸出,當成 grep 的輸入,然後找出含有 e 字母的字串:
~$ ls / | sort -r | grep e media homeetc dev
總之,因為 Stream 是一種標準介面,我們可以使用 pipe 輕易地將各種小工作給串聯起來,組合成更大的功能(或工作)。
Node.js 也 in-program 實現了這樣的哲學,名字正是 Stream,它不單單只是文字流(或數據流),也支援物件流。同時,它一樣是一種「Universal Interface」。在 Node 中,你可以寫出類似這樣的程式碼:
A.pipe(B).pipe(C).pipe(D);
例如 Node 官網上的檔案壓縮範例:
const gzip = zlib.createGzip(); const fs = require('fs'); const inp = fs.createReadStream('input.txt'); const out = fs.createWriteStream('input.txt.gz'); inp.pipe(gzip).pipe(out);
由於小模組的理念,再加上 Node.js 核心以及大量第三方函式庫皆以 Stream 實作,又或者提供有 Stream 的介面,以及 Node.js 在 Networking 的強項更是與 Stream 密不可分。這使得 Stream 幾乎是公認 Node.js 開發者必不可少的基礎知識之一。這正是為什麼我想要寫一篇文章來聊一下 Stream,而更重要的是巨人 Dominic Tarr 說的一句話:
Streams are node's best and most misunderstood idea, ... - D. Tarr, EventStream
驅使著我想要好好搞懂 Node Streams。我將自己的學習過程以及理解整理起來,希望能為再為台灣 Node.js 開發者們帶來一篇值得一讀的文章。
三、console.log
每天苦惱的 Node 工程師總是會在程式裡寫上這麼一兩句:
console.log('Fuxking explode here, go die fuxk face'); // 幹嘛跟電腦生氣呢!奇怪ㄋㄟ
我們到 /lib/console.js 看一下,疑?在 new 的時候傳了 process.stdout 跟 process.stderr 給它
module.exports = new Console(process.stdout, process.stderr);
function Console(stdout, stderr) {
// ...
prop.value = stdout;
Object.defineProperty(this, '_stdout', prop);
prop.value = stderr;
Object.defineProperty(this, '_stderr', prop);
// ...
}
當你呼叫 console.log 時,其實只是使用了 process.stdout.write() 啊!! 不過 console.log() 比較好心,會幫你換行 XDDD (最後有個 '\n')
Console.prototype.log = function() {
this._stdout.write(util.format.apply(null, arguments) + '\n');
};
那麼 process.stdout 跟 process.stderr 又在哪裡?這個我就不要列太多程式碼,你可以在 /src/node.js 的核心初始化程式碼的 stratup() => startup.processStdio() 找到它們。比較重要的就以下這 4 行:
// ...
stdout = createWritableStdioStream(1);
// ...
stderr = createWritableStdioStream(2);
// ...
var fd = 0;
stdin = new tty.ReadStream(fd, ...);
// ...
你可以知道 stdin, stdout, 與 stderr 即對應到作業系統的標準輸入串流 (fd =0, /dev/stdin)、標準輸出串流 (fd=1, /dev/stdout) 以及標準錯誤輸出串流 (fd=2, /dev/stderr)。
會以 console.log() 出發的用意是想告訴你,其實你一天到晚都在用 Stream,所以不要怕他啊!會使用 console.log() debug 的人一定是全天下使用 Stream 最多的人!哈哈哈... (我本人是也~) 所以不要再說你不會用 Stream 啦~(裝死...)
四、Stream 的結構
Stream 除了是資料的來源 (source) 或終點 (sink) 之外,它還是一種「資料流控制」的單元,由於牽涉到資料的控制、存取、流動與暫存,Stream 的每個實例內部都有維護有自己的狀態,每個 Stream 實例都是一個狀態機,這些狀態用於指明 Stream 目前是處在 Flowing 還是 Paused 模式等,Stream 於不同的模式有不同的行為。
由於它有點複雜,所以接下來我所引用的程式碼,只會剪出比較重要的片段,有達到我想表達的意思就好。Steam 的實作落在 Node 原始碼 /lib 目錄底下的
- _stream_readable.js
- _stream_writable.js
- _stream_transform.js
- _stream_duplex.js
- stream.js (以上四個都繼承了這個 Base Class)
我們從 stream.js 開始看起 (以 4.5.0 原始碼為例)。
[ stream.js ]
Stream 這個 Class 以靜態屬性 export 出了 Readable、Writable、Duplex、Transform 以及 PassThrough 這幾個 Classes。
module.exports = Stream;
const EE = require('events'); // <--- EventEmitter
const util = require('util');
util.inherits(Stream, EE);
Stream.Readable = require('_stream_readable');
Stream.Writable = require('_stream_writable');
Stream.Duplex = require('_stream_duplex');
Stream.Transform = require('_stream_transform');
Stream.PassThrough = require('_stream_passthrough');
function Stream() {
EE.call(this);
}
然後你應該也發現到了,Stream 繼承了 EventEmitter!所以它可以發射事件、你也可以監聽它身上的事件。很重要,再複誦一遍:
Stream 是 EventEmitter
[ _stream_readable.js ]
Readable() 是一個 Constructor,它的每個實例都擁有自己的狀態 this._readableState,另外,還有一支必須由 ReadableStream 實作者所提供的 template method this._read()。
function Readable(options) {
// ...
this._readableState = new ReadableState(options, this); // <-- 狀態物件
// ...
if (options && typeof options.read === 'function')
this._read = options.read; // <-- template method, 由 Stream 實作者提供
Stream.call(this);
}
我們來看一下狀態物件的建構子,它裡面的狀態非常的多,但是我這裡就剪出它最重要的一個:內部緩衝區
function ReadableState(options, stream) {
// ...
this.buffer = []; // <-- 緩衝區僅僅是一個陣列
// ...
}
嘿!注意到了嗎?內部緩衝區只是一個陣列,剛初始化時它空空的。由於 Stream 有一個稱為 highWaterMark (HWM) 的屬性,用來限制緩衝區最大可容納的 bytes 數,它的預設值是 16 kB,最大值不可超過 8 MB。這邊我們引出了第二個重點,我們再複誦一遍:
Stream 是 EventEmitter 與 Buffer 的組合
這我們後面看到「圖」的時候,會更有感覺!
[ _stream_xxx.js ]
雖然 Readable、Writable、Duplex、Transform 有不同的實作內容,但是關於 Stream 是 EventEmitter 與 Buffer 的組合,我們上面已經看過一個 Readable,其它的也具有相同性質,所以程式碼我想就不用再貼上來了啦。
五、Stream 的大意
Stream 的實作是單純的 JavaScript,它繼承自 EventEmitter,而且內部擁有一個緩衝區。緩衝區直接的目的是為了做流速控制,間接的好處是能夠使你的程式維持較精省的 memory footprint。
現在想像一個場景,你有一個數據來源,也有一個數據消耗者。若數據來源與消耗者都是 Stream 的實作,我們可以這樣說:
一個 ReadableStream 是數據產生者 (source)
一個 WritableStream 是數據消耗者 (sink)
流速控制 (Backpressure 與 Throttling)
當 sink 消耗速度慢,但 source 的產生速度快,這時候該怎麼辦?
如果是由我們自己來蠻幹,首先想到的辦法,大概也就是先將「速度快」的來源數據先「暫存(緩衝)」起來,然後再慢慢地消耗掉。(嘿!Stream 已經有緩衝區了呀!)
那麼,我怎麼會知道「消耗者」的速度比較慢?沒問題!一個 WritableStream 具有 Backpressure (背壓) 機制,當它的消耗速度跟不上輸入時,它會發事件通知你(在內部緩衝區溢出時),此時你有責任先暫停輸入數據。我們後面會把 Backpressure 講清楚。
另外一種情況,如果我想要將數據來源的速度調慢,調成我想要的速度呢?當成也可以!這樣的變速 (節流, throttling) 機制,也是靠一個緩衝區先將數據收集下來,再以我們想要的速度來輸出即可 (例如從 100 kB/s 降成 50 kB/s 的資料率)。當然,實作者有時候還得搭配 Backpressure 機制來做流量控制。(那能不能調快速度?當然可以,不過這取決於你到底想要什麼樣的數據。調快調慢,都可以靠 Transform 來完成。)
節省 Memory Footprint
這裡給個大家都很喜歡使用的範例,一看你就懂什麼意思了。假設我現在有一個「史詩級的大謎片檔案 xxx.avi」,它的大小有 2 GB (高畫質的喲!!)。我想要寫個 http server 來分享給我的朋友,以顯示我與眾不同的宅:
~$ ll xxx.avi -rw-rw-r-- 1 simen simen 2097152000 Oct 14 11:13 xxx.avi
什麼??!!你沒有謎片可以測試 (睜眼說瞎話就對了)... 那用 dd 產生一個:
~$ dd if=/dev/zero of=xxx.avi bs=2097152000 count=1
[使用 fs.readFile() 來實作 ]
下面我們的祕密伺服器 server.js,它使用 fs.readFile() 來讀檔,當有 request 進來的時候,我們把檔案送過去給瀏覽器。我們順便檢測一下錯誤,若有錯誤就把它印出來:
var http = require('http');
var fs = require('fs');
var server = http.createServer(function (req, res) {
fs.readFile('xxx.avi', function (err, data) { // <-- 讀檔
if (err) {
console.log(err);
res.writeHead(500);
res.end(err.message);
} else {
res.writeHead(200, { 'Content-Type': 'video/avi' });
res.end(data);
}
});
});
server.listen(3000, function () {
console.log('Secret server is up');
});
執行 server.js 並開啟瀏覽器到 http://locahost:300 來下載檔案。結果令人失望,它爆炸了:
註:在你的環境測試這個例子可能不會爆炸,但瀏覽器在準備收檔前等待的時間會卡的有點久,檔案越大,會卡越久。又或者你得到的錯誤可能跟我不同,例如你的物理記憶體根本不夠塞不下這麼大的檔案之類的,也會引發 create buffer 的錯誤。
為了讓這個錯誤出現,我有刻意退回較舊版本的 Node 來執行,原因是舊版本的 kMaxLength 是直接寫死為 0x3ffffff (1023 MB),新版本的 Node 會依據系統的整數指標長度來切換不同的 kMaxLength。你可以在 /src/node_buffer.h 看到 kMaxLength 這個參數:
static const unsigned int kMaxLength =
sizeof(int32_t) == sizeof(intptr_t) ? 0x3fffffff : 0x7fffffff;
會不會爆炸反映在 /lib/fs.js:
if (size > kMaxLength) {
err = new RangeError('File size is greater than possible Buffer: ' +
`0x${kMaxLength.toString(16)} bytes`);
return context.close(err);
}
[ 改用 Stream 來實作 ]
var server = http.createServer(function (req, res) {
res.writeHead(200, { 'Content-Type': 'video/avi' });
fs.createReadStream('xxx.avi').pipe(res) // <-- 將數據來源變成 ReadableStream
.on('finish', function () {
console.log('Sending done.');
});
});
上面的程式碼看不到懂沒關係,等到這篇文章讀完,你一定能瞭若指掌!總之,我們再次執行看看,然後用瀏覽器下載。看!成功了!
上面兩種作法的差異
使用 fs.readFile(pathToFile, function (err, data) {}) 這支 API,底層會將檔案內容先讀進系統緩衝之中,然後在 callback 透過 data 一次給你一大包,就像下面這張圖的感覺。
這樣的 API 也稱為 Bulk I/O,Bulk 是一大坨的意思。這意味著,當你使用這樣的 API 存取 I/O 時,你的 memory footprint 將有那麼一時半刻會衝到很高。如果剛剛好你的系統記憶體負載很繁重,那就只好眼睜睜看著它爆炸了 XDDD....
那如果我們使用 fs 的 Stream 工廠方法 createReadStream(),將數據來源(檔案) 變成是一個 ReadableStream 呢?現在的感覺會像下面這張圖,數據將會是一個 chunk 一個 chunk 地傳送給消耗者,原本一大坨的數據將被切成一小塊一小塊來傳送,這每一小塊的最大容量則是由 Stream 內部緩衝區的最大容量所決定 (預設為 16 kB)。
又或者說,Stream 可以是動態的,數據來就送、來就送,不必等所有數據都到達,然後才一股腦地丟出一大包。因此,在使用 Stream 的情況下,就不會有 memory footprint 瞬間飆很高的情況發生。如此一來,網路中 client/server 之間的互動反應就能更加即時 (只要一收到 request,第一個 chunk 就能透過 response 馬上飛奔出去)。
相信現在您對「Stream 的大意與好處」應該已經稍稍有概念了,接下來,我們就要進入更深一層的解析了。不過別擔心,更深一層也沒多深啦... XDDD (不要嗆我啊!!)
六、Stream1、Stream2、Stream3 與相關函式庫
在開始偷窺 Stream 的內褲 (內部啦!) 之前,先很簡單摘要一下 Stream 的版本變化。這裡大家先很約略看一就好,等稍後從內褲鑽出來之後,再回來看這裡,你就會發現有點臭臭的~ (靠夭啊....)
Node.js 的 Stream 自第一版的 Stream1 (Node v0.8) 演化至今,已經歷經 Stream2 (> v0.10) 而來到 Stream3 (> v0.12)。這裡我先很簡單地作一下摘要:
[ Stream1 ]
- 使用 Push Model,即有資料來,Stream 自動發射 'data' 事件伴隨 data chunk 以將資料推出去,這稱為 Flowing Mode (流動模式)。
[ Stream2 ]
- 預設為 Pull Model,即有資料來時,Stream 將資料累積於內部緩衝區中,並同時發射一個 'readable' 事件來通知 Stream 的使用者,將資料取走 (使用 read() 方法)。若使用者沒有取走,資料將繼續累積於內部緩衝區。
- Stream2 的預設模式也稱為 Paused Mode 或 Non-Flowing Mode (非流動模式)。
- 當 Stream 不再自動將資料推出來、使用者可以自己決定要不要拉出資料,意味著使用者控制「資料流動」的彈性增加了。
- 使用者仍可以將 Stream 轉回 Flowing Mode 來使用,只要掛上 'data' 事件的監聽器或呼叫 resume() 即可。
- Flowing/Non-flowing 模式只能擇一使用,即你的程式碼中不可以同時有 'data' 的監聽器以及 'readable' 的監聽器。若兩者兼有之,程式不會當掉,但是你可能會得到非預期的 Stream 行為。
[ Stream3 ]
- 混合模式(Mixed mode),推拉模型混在一起,程式碼中可以同時有 'data' 的監聽器以及 'readable' 的監聽器,行為容我後面再說明。
- 一般還是建議維持 Flowing / Non-Flowing 模式擇一使用的習慣,通常混著用的機會可能也不大 (或許是有很棒的使用時機?這要請高手指點一下~)。
- Stream3 開始,幾乎都是 performance 調整與 bug 的修正,Stream 的介面與行為看起來目前是穩定下來了。
[ 相關函式庫 ]
- readable-stream:Node Stream2 與 Stream3 實作的獨立抽離版本,有了它,你就能在瀏覽器端擁有 Stream 啦!
- mississippi:Stream 的輔助工具集合,可協助你快速實作 ReadableStream、WritableStream 等等(主要是擺脫一些 boilerplate code),當然你也能直接使用 from2、through2、duplexer2 這些工具來協助你
- 如果要做 Stream 的合併、串連等工作,可以使用 mutilpipe 或 pumpify 等工具
- 以上介紹的都是使用量破表的函式庫,另外還有一個我覺得超威的,就是 highland.js,它的夢想是將一切給 Stream 化,夠瘋狂吧!
接下來,我們終於可以開始來看 Stream 內部的運作機制了!因為數據就是從來源流向消耗者,所以一開始我們會花點篇幅在 ReadableStream 上面,接著 WritableStream 就快了,然後 Transform 跟 Duplex 就只是短短幾句話的事情而已!
我們會先從「內部原理與 Stream 實作者」的角度切入,先把各類 Streams 跑過一遍。隨後再以「Stream 使用者」的角度來看 Stream,這也是大部分開發者的角度,大多時候我們都是 Stream 的使用者!(礙於篇幅,我們不談如何實作自己的 Stream,但是你了解原理之後,會發現要利用 Stream 的基礎建設來設計自己的 Stream 就不會太難了。)
七、Readable Stream
我們先從 Readable Stream 的 Flowing Mode 看起 (掛上 'data' 事件監聽器來接收數據),我相信這也是很多人喜歡的方式。然後,我們再看 Non-flowing Mode (掛上 'readable' 監聽器來獲取數據到來的通知)。
Readable Stream:Flowing Mode
下面這張圖,藍色布景內的東西是 Stream 內部示意圖,它設有緩衝區水桶一個;鐵灰色的部分是 Stream 要求「Stream 實作者」應該要提供實作的部分。而藍色布景的外面,事件 'error' 以及 方法 pause() 這種東西,它們是 Readable Stream 的標準介面。之後我們遇到的圖,都是以這樣的方式來佈局。
接下來,依照圖中標示的 (1), (2), ... (6) 一個個看起,看看狀態處在 Flowing Mode 的 ReadStream 是如何工作的:
- Resource 是數據來源,例如一支檔案、一個 socket、或某種數據產生源。這包含在「實作者提供的實作」中,Stream Class 是基礎建設,數據源得由實作者決定呀!
- _read() 這支 template method 由 ReadableStream 實作者提供,實作內容正是鐵灰色區塊的部分。
- 當實作者從 Resource 拿到一些數據後,他必須使用 this.push(chunk) 將資料塊推入內部緩衝區。注意圖中有兩處 (3),上者表示數據推入緩衝區,下則對應到 _read() 的實作在某處一定會呼叫 this.push()。當水桶滿了 (達到 highWaterMark),呼叫 push() 將傳回 false,讓實作者知道水桶已經滿了,此時實作者必須暫時停止再從 Resource 讀資料出來。此機制稱為 Backpressure,不過這裡是實作者要去照顧的事情。我們在後面以「Stream 使用者」角度切入時,我們使用者需要照顧的是 WritableStream 的 Backpressure。
- 在 Flowing Mode,如果 ReadableStream 沒有被 pause() 住,那麼由 (3) push 進來的 chunk 就會馬上從白色 (4) 的水管噴出去,不會累積在緩衝區,因此稱為流動模式。噴出去的東西預設是 binaries (型別為 Buffer),如果有設定 encoding,也可以直接噴出字串。黑色 (4) 是指 ReadableStream 的使用者可以呼叫 resume() 或 pause() 來開關資料流動,水管不是 ON 就是 OFF。當 pause() 後,由 (3) 所推入的資料塊,就會在緩衝器中累積起來。由於 Stream2 開始,Stream 的預設模式就是 paused mode,因此我們會常常看到一些程式碼在一開頭都會先呼叫 readable.resume()。在 Stream2/3,其實你只要在 ReadableStream 掛上 'data' 事件監聽器,Stream 就會以 Flowing Mode 操作了,除非你有刻意呼叫 pause() 關閉水管。
- 當每次 push 資料進水桶時,Stream 都會檢查目前水位是否已經達到 highWaterMark (HWM),如果已滿,呼叫 push() 將傳回 false。highWaterMark 是可以自己設定的, 預設是 16 kB,最高可以設到 8 MB。注意,push() 方法是給「實作者」用的 API,而不是給「使用者」用的。
- 在 Flowing Mode「使用者」監聽 'data' 事件就可以拿到從 source 流過來的資料。
- 'error' 跟 'close' 是所有類型 Streams 都有的標準事件,'error' 通知錯誤,而 'close' 通知底層 Resource 的關閉,並不是所有的 ReadableStream 都有 'close' 事件,因為數據來源不一定總是 I/O 之類的資源。'end' 是 ReadableStream 的專屬事件,它用來通知「使用者」本次的這一大坨數據已經傳輸完畢 (一大坨是被拆成一小塊一小塊,所以全部傳完要通知一下啊!)。對於「實作者」而言,必須在最後一個 chunk 傳完之後,呼叫 this.push(null),這會讓 ReadableStream 知道應該引發 'end' 事件了!
Readable Stream:Non-flowing Mode
接下來我們來看 Non-flowing mode (paused mode),這是 ReadableStream 在 Stream2/Stream3 的預設模式。現在解釋起來就簡單了,我們一樣按照圖中僅有的 (1)、(2)、(3) 來說明:
- 因為預設是 Non-flowing 的,所以 this.push() 進來的 chunk 都會被累積在緩衝區。
- 每當 this.push() 被實作者呼叫時,ReadableStream 就會引發 'readable' 事件但是不挾帶數據,目的是通知「使用者」,快來哦!有資料來了哦!當然,「使用者」必須監聽這個事件,才收的到通知嘛~
- 當收到通知後,使用者在它的 'readable' 監聽器中,呼叫 readable.read([size]) 來把資料挖走。注意到了嗎?在挖資料的時候,可以用 size 參數來指定一次要挖「多少」, read() 這支 API 就如可調鬆緊的水龍頭,可以控制每次想取回的數據量啊!超讚!(封包剖析就很需要這種功能)。平常的使用大多不會指定 size,而是盡可能地一直呼叫 read() 把緩衝區現有的數據都給挖回來,反正遇到 read() 傳回 null 時,就知道緩衝區空了、挖光了。這我們在以「使用者角度」切入時,再來看!如果外面沒有人呼叫 read() 說要吸,那麼數據就會在緩衝區內累積下去。
關於 Stream3 的行為
- 此時此刻,我覺得我們只要看 Stream3 就可以了。 (Node 都已經到 v7.0 了啊!!!)
- 使用者可以同時掛上 'data' 與 'readable' 監聽器。
- 只要掛上 'data' 監聽器,ReadableStream 就會進入 Flowing mode,此時就算呼叫 pause() 把水管關閉,並不會使其回到 Non-flowing mode。
- 在 Flowing mode,不只 'data' 事件,同時 'readable' 事件一樣會發射,但呼叫 read() 會挖不到東西,因為數據已經伴隨 'data' 事件拋出去了。
- 在 Non-flowing mode,每一次 push(chunk) 都會引發一次 'readable' 事件,但是如果 ReadableStream 內部 source 產生的速度很快 (推入緩衝區的速度也很快),此時外部在聽到較早發生的 'readable' 事件時,用 read() 就會撈到「現在這當下」緩衝區內擁有的東西,然後你又撈光它的話,這會造成聽到較晚的 'readable' 事件時,使用 read() 反而挖不出東西來。這是正常的行為,不要以為有問題啊!因為你還是一樣拿到完整的數據了啊,對吧?
- 如果你想使用 Non-flowing mode,但又掛了 'data' 監聽器,你必須在 ReadableStream 造出來後立即顯式地呼叫 pause() 讓它進入非流動模式,之後當你呼叫 read() 時,'data' 事件會順便被引發,然後你在 'data' 監聽器會得到跟 read() 一樣的數據,也就是說你在 'readable' 與 'data' 兩個監聽器內都有辦法拿到同一份數據,YA~~
- 關於 Stream3 更細緻的行為,請參考官方手冊最準確!但我認為有上述的認識,就差不多了!
呼~ Readable Stream 說了很長,但這是必經之途呀!接下來,Writable Stream 就好講了。請再忍忍!
八、Writable Stream
一樣,看圖說故事!我們有 (1) ~ (6) 共六個號碼:
- Resource 是數據終點,例如一支檔案,一個 socket、或某種數據收取裝置,到底如何往 Resource 寫數據是實作者的事情。
- _write() 這支 template method 由這個 WritableStream 的實作者提供,這支方法主要是要告訴 Stream 如何將數據寫入 Resource。這裡範例的程式碼是用 fs.writeFile() 將數據寫進一個檔案。
- 「使用者」可以呼叫 write(chunk) 或 end(chunk) 往 WritableStream 寫入數據,end() 可在結束時順便寫入最後一筆 chunk。接著看 (4)
- 內部會先判斷緩衝區是否為空,如果是空的,那就直接往 Resource 寫去即可。
- 如果緩衝區不為空,那就進去跟大家一起排隊吧 (lastBufferRequest 是紀錄物件)!WritableStream 內部會在往 Resource 寫東西之後,回頭去檢查緩衝區內是否還有數據,試圖清空緩衝區 (clearBuffer()),直到消耗完畢。
- 當外部使用者呼叫 end() 之後,WritableStream 在來來回回從緩衝區拿出數據往 Resource 寫入,直到緩衝區空了,它知道你已經告訴它 end() 啦,所以在最後一次往 Resource 寫完資料之後,將引發 'finish' 事件通知你,終於寫完啦!爽!哈哈哈!那麼 'drain' 事件又是怎麼回事?這時就可以來聊聊 Backpressure 了!
Backpressure (背壓)
Backpressure 機制作圖如下,當外部呼叫 write(chunk) 或 end(chunk) 往 WritableStream 寫入數據,若內部緩衝區已經滿了則傳回 false,這個時候「你這個使用者」就必須暫停再往它身上寫東西,因為進不去了,不要硬塞!那什麼時候可以再寫東西進去?當 WritableStream 把它的緩衝區消耗完畢後,會引發 'drain' 通知使用者,來吧!Come On Baby!write() 回傳的 false 以及 'drain' 事件通知,它們一起構成了 Backpressure 的機制!
這裡我們要注意的是:
照顧 Writable Stream 的 Backpressure 是使用者的責任
蛤?~~責任?那是甚麼?這讓我想李敖大師說施明德先生的三不主義:「不主動、不拒絕、不負責」 XDDD....
接下來我們來看一下 pipe(),它除了可以幫你把一個 ReadableStream 的輸出水管給接到 WritableStream 的輸入之外,還會幫你扛起處理 Backpressure 的責任哦!
pipe
pipe() 的用途就是將一個 Readable Stream (source) 接給 Writable Stream、Transform Stream 或 DuplexStream 等 sink。pipe() 會再傳出一個 Readable Stream,以致於 pipe() 能夠一直串連下去,就好像我們在文章一開頭看到的 UNIX 命令範例:
$ ls / | sort -r | grep e
一個純然的 Writable Stream 因為沒有 Readable 介面,所以 pipe() 接到純 Writable Stream 就是終點了,無法再 pipe() 下去。
現在,我們來看一下 /lib/stream.js 中,pipe() 方法的實作。請注意,底下的程式碼我有將原始碼做了一點調整,為了是方便說明它的理念,原始碼並非如此。當我們使用 A.pipe(B) 時,A 是 source,B 是 destination (sink)。就看以下這三點:
- pipe() 幫你監聽了 source 的 'data' 事件,然後將收下來的 chunk 用 dest.write() 寫到 dest 去!同時它若檢測到 backpressure,會使用 source.pause() 自動將 source 暫停下來!
- 它也幫你監聽了 dest 的 'drain' 事件,當 dest 的緩衝區消耗完畢,它又幫你恢復 source 的資料流動!它幫你自動照顧 backpressure,根本超佛.... XDDD (pipe 裡面還有幫你照顧 'end' 跟 'close' 事件,我就不列啦!)
- pipe() 最後傳出 destination,若 destination 是純 Writable Stream,那就無法再串 pipe 下去。因為自 Stream2 起,已經嚴格限制必須要是 Readable 才能 pipe 給 Writable Stream。
Stream.prototype.pipe = function(dest, options) {
var source = this;
source.on('data', function (chunk) {
if (dest.writable) {
if (false === dest.write(chunk) && source.pause) {
source.pause();
}
}
});
dest.on('drain', function () {
if (source.readable && source.resume) {
source.resume();
}
});
dest.emit('pipe', source);
// Allow for unix-like usage: A.pipe(B).pipe(C)
return dest;
};
如果以撰寫應用程式的觀點來看,原本像以下的寫法:
var fs = require('fs');
var sourceStream = fs.createReadStream('source.txt');
var sinkStream = fs.createWriteStream('sink.txt');
sourceStream.on('data', function (chunk) {
sinkStream.write(chunk);
}).on('end', function () {
sinkStream.end();
});
使用 pipe() 來寫,將變成這樣:
var fs = require('fs'); fs.createReadStream('source.txt').pipe(fs.createWriteStream('sink.txt'));
真的超酷的啊!不覺得嗎?還有一點,我就把它總結成下面這句話吧:
pipe() 幫你串連數據來源與消耗者,同時會自動控制它們之間的 Backpressure,因此,整條 Streams Pipeline 的速度將受限於其中最低速的那一個
好的!有了以上的概念,接下來我們再看 Transform 與 Duplex,真的超簡單!隨便講個幾句你就能理解了!
九、Transform Stream
Transform 擁有 Writable (圖左側) 與 Readable (圖右側) 的標準介面,輸入與輸出彼此間的數據存在有轉換關係,且轉換前後的數據長度可以不一樣沒關係。例如壓縮,轉換後的長度變短;又如解壓縮,轉換後的長度變長。
TransformStream 的實作者必須提供 _transform() 與 _flush() 的內容,_transform() 告訴 TransformStream 如何轉換來源數據(例如壓縮),_transform() 的函式簽署與 _write() 一樣,只不過實作者在內部會呼叫 this.push() 將轉換後的數據推入內部緩衝區,這反而像 _read() 對吧!這是因為轉換後,將形成一個新的數據源,而非寫入某個 Resource。
_flush() 則是用於讓 TransformStream 在 Readable side 要發射 'end' 之前,有最後一次機會將一些殘餘數據推入緩衝區 (這點對於「使用者」其實不用太在意,若你是實作者,當你試著實作一個 Transform Stream 碰到時,就知道這什麼意思了)。
因為遵循著 Readable 與 Writable 的標準介面,所以,就如同使用 Readable Stream 與 Writable Stream 般地大膽地使用它吧!
Transform 完!
十、Duplex Stream
Duplex 擁有 Writable (圖左側) 與 Readable (圖右側) 的標準介面,輸入與輸出彼此間的數據可以毫無關係,例如一個 socket,寫出跟讀入可以完全沒關係,你可以將它的內部視為,由獨立的 Readable Stream 與 Writable Stream 所組合而成,各自使用的 Resource 可以是同一個、也可以不一樣。
因為遵循著 Readable 與 Writable 的標準介面,所以,就如同使用 Readable Stream 與 Writable Stream 般地大膽地使用它吧!
這就是 Duplex,打完收工!
十一、以使用者角度看 Stream
當你已經理解各種 Streams 的運作原理,站在使用者的角度,你會發現這根本沒什麼了啊!不過我們還是把它給總結一下:
使用 Readable Stream
介面事件:'error', 'close', 'end', 'data' (Flowing mode), 'readable' (Non-flowing mode)
介面方法:resume(), pause(), read()
[ 流動模式 ] 聽 'data' 事件,由 handler 收取 chunk
[ 非流動模式] 聽 'readable' 事件,在 handler 中使用 read() 方法收取 chunk,讀到 null 即內部緩衝區已空。平常用法就如綠色區塊內的範例,會使用 while() 直到把緩衝區掏光為止。
使用 Writable Stream
介面事件:'error', 'close', 'finish', 'drain'
介面方法:write(), end()
[使用模式]
檢查 write() 回傳值,backpressure 發生時要暫停寫入。監聽 'drain' 事件以繼續寫入數據。呼叫 end() 告訴 WritableStream 這一坨數據的最後一個 chunk 已經來了,接著當 'finish' 發生時,代表 WritableStream 已經將所有數據都往 Resource 寫入完畢。圖中綠色區塊範例的 fooWrite() 是經過包裝的 writer,它帶有照顧好 backpressure 的機制,你可以實作自己的處理方式。
使用 Transform、Duplex 與 PassThrough
這個我們就不用再畫圖了!因為遵循著 Readable 與 Writable 的標準介面,所以同樣再唱一次國歌:「就如同使用 Readable Stream 與 Writable Stream 般地大膽地使用它們吧!」
怎麼看都很眼熟?
如果你使用 Node 開發網路程式,你可能會常寫出類似以下的程式碼,你知道多數時候你一直在使用著 Stream 的標準介面嗎?
var server = http.createServer( (req, res) => {
res.writeHead(200, {'Content-Type': 'text/plain'}); // <- 註: writeHead 不是標準介面哦
res.write('Hello');
res.end('World!');
});
client.on('data', (data) => {
console.log(data.toString());
client.end();
});
Node 核心的許多模組,例如 fs, net, http, dgram, zlib, cryto 大多使用了 Stream 的實作,或提供有 Stream 的介面,所以你可能在很多地方多多少少都會遇見 Stream 的蹤影。當然,Stream 本身是 EventEmitter,因此你當然可以在它身上發射自己的事件,許多核心模組或第三方模組也規劃有自己特化後的事件、方法或者屬性,以符合模組的目的 (可能語意也會比較清晰,畢竟 Stream 的介面是很通用化的)。
再者,由於 Stream 的介面是 Universal 的,這帶給開發者最大的好處就是:只要搞懂 Stream 的介面,就可以在「不太需要詳閱文件」的情況下,自由運用許多基於 Stream 的工具!(需要了解目的,而不需要花太多時間再了解介面)
現在弄清楚 Stream 的介面了,準備好對你的 http server 進行惡搞了嗎? XDDD....
十二、我們沒有講的
你知道,光是要把 Stream 的運作機制給講清楚就已經要花這麼大篇幅了,因此恕我沒辦法再說更多了!這裡提幾點我們沒談到的,我相信只要您已經理解以上內容,自己去探索剩下的東西也就不那麼困難了。我們沒談到的有:
- 如何設定 encoding (存在內部緩衝區內的東西,預設一定是 Buffer 類型,當你塞字串進去時可設定編碼,預設是當成 utf8 將字串編成 binaries。當數據被挖出來的時候,也能設定編碼類型以噴出字串,而不是噴出 binaries)
- Stream 的 Object Mode (還記得 this.buffer = [] 嗎?它其實存的都是一個個 Buffer 類型的實例,也都是物件。但開啟 Object Mode 後,這個陣列就能塞一般物件啦)
- Writable Stream 的 'pipe' 及 'unpipe' 事件,這個我不知道怎麼講。太直覺簡單反而講不出所以然阿 XDDD
- Writable Stream 的 cork() 與 uncork() 方法,其實就是讓你有機會去堵住 Writable Stream 「內部緩衝區」,把它們當成 Readable Stream 的 pause() 與 resume() 你就瞭了
- Writable Stream 什麼時候會被 'unpipe'?例如有 error 發生時。請參閱官方文件
- 捕捉 pipeline 中的 error 以及如何 combine 與 merge streams 等 (前面有列一些輔助工具,請參閱那些工具的文件)
- 如何利用 Stream Classes 來實作自己的 streams
- Stream 更細緻的行為,也請參閱官方文件
這裡有一些很棒的文章/影片,有空可以看一看:
- The History of Node.js Streams - Dominic Tarr (Node 核心開發者)
- Streams - Isaac Schlueter (npm 創辦人/Node 核心與 Stream 的主要貢獻者)
- Streams - Mattias P. Johansson
- The Basics of Node.js Streams - Sandeep Panda
- Full Streams Ahead - David Guttman
- Harnessing The Awesome Power Of Streams - James Halliday (瘋瘋的 substack)
- Node.js Streams - David Gonzalez 與 Matteo Collina
這邊再次說一下,作文的目的是希望:對跟我一樣熱愛 JS 與 Node.js 的開發者有所啟發,然後繼續寫出更棒的程式碼。本文非常歡迎各界拿去分享、修修改改當教材,但我最在乎的是,希望大家有發現錯誤的話,能告訴我,讓我們一起把它修改得更好、更正確!
這次不打粉絲專頁的廣告了,其實根據經驗,往往沒什麼效果,哈哈哈~
這個小節就當成我們的結語吧!寫了 N 天。終於,完了!呼....
--
- 訂閱李健榮 Simen的專欄,看更多文章。
- 一起學習「EE心法|相量之道」。
內文圖片來源:李健榮 Simen
封面圖片來源:pexels