最近在工作上需要寫一隻會高併發的API。為了不讓DB被灌爆,所以在DB前面再加一層redis當緩存,並且設定TTL過期,因此整個API程式流程大致上變成:
- 從redis取資料 -> 成功取到資料 -> 返回
如果今天redis因為資料TTL過期導致取不到資料,這時候才訪問DB:
- 從redis取資料 -> 沒有取到資料 -> 從DB取資料 -> 成功取到資料,將資料回存redis並設定TTL -> 返回
轉換成程式碼的話看起來也並沒有特別的複雜:
const item = await redisClient.hgetall(itemKey); if(item) { return item; } const sql = 'SELECT * FROM `Product` WHERE `id` = ?'; const [rows, fields] = await dbConnection.query(sql, [itemKey]); if(rows.length > 0) { const [item] = rows; await redisClient .multi() .hmset(itemKey, item) .expire(itemKey, 60) .exec(); return item; } return null;
不過,魔鬼藏在細節裡。Node.js雖然說是使用單執行緒在執行,但因為用到了async/await
的語法,所以實際上當遇到異步函式時,Node.js會將異步函式丟進event loop等有空時才執行,並且當promise被resolve或reject後才從剛剛await
處接續往下。這就讓在高併發的情況下,會發生如下圖中的狀況。
使用者1、使用者2、使用者3、使用者4在極短的時間內同時呼叫了此API,請求取得item1的資訊。由於他們在redis中找不到想要的資料,所以三個人又去訪問DB撈資料,於是DB就處理了三份同樣的請求(node是單執行緒所以補充一下圖中不是parallelism,而是concurrent)。接下來三個人又將重複的內容設定回redis,結果受益的只有使用者4有成功使用到緩存的資源,使用者1、使用者2、使用者3在緩存失效時做了重工,這就是典型的緩存擊穿(cache breakdown)的案例。
從這個情境看來,如果我們可以改善當緩存失效時使用者1、使用者2、使用者3只需其中一個人訪問DB撈資料,其餘的人等待這個人所拿回來的資料的話,就可以大大降低DB做重功的負擔。於是終於要進入本篇的主題 – Mutex(互斥鎖)。
Mutex 互斥鎖
Mutex在程式的執行流程上希望能實現控管一段程式碼區段在同一時刻最多只能被一個人(執行緒)執行,如果critical section(臨界區段)目前已經有人(執行緒)進入了,則必須在進入點前等待。有點難理解嗎,沒關係,我們來看看以下程式碼一步步說明:
mutex.acquire('item1'); // 在item1上獲取互斥鎖 // critical section (臨界區段) // 一次只會有一個人進入執行 ... mutex.release('item1'); // 釋放item1上的互斥鎖
第一行,程式在進入critical section前會先嘗試取得mutex,如果當下沒有成功取得鎖的人(執行緒)會被blocking在這邊等待下一個時機點。如果很幸運地在第一行有成功取得mutex的人(執行緒),則會繼續往下執行進入了critical section。
當critical section執行完畢後,在第7行會呼叫release釋放已取得的mutex。這時原先被blocking在第一行的人(執行緒)們就可以重新嘗試獲取mutex,並且進入critical section。大家可以自行腦補百貨公司週年慶剛開門時,民眾一窩蜂要搶著擠進旋轉門入內,但旋轉門一次只限制一位顧客依序進入的畫面。
註: 由於Node.js是單執行緒搭配event loop切換執行的產物,用執行緒一詞來形容執行者也不太貼切,所以這邊用”人”來稱呼正在操作執行程式的主體。
現在我們知道mutex物件會有兩個函式可以操作,一個是獲取鎖、一個則是釋放鎖,接下來我們就用Node.js內建的Event Emitter來實作mutex吧。首先,新增一個檔案叫Lock.js
:
(因為篇幅的關係,所以完整的程式碼就先附上,然後我們用講解的方式帶過整段程式碼)
const { EventEmitter } = require('events'); const lock = () => { const locked = {}; const eventEmitter = new EventEmitter(); eventEmitter.setMaxListeners(0); return { acquire: (key) => new Promise((resolve) => { if (!locked[key]) { locked[key] = true; resolve(); return; } const tryAcquire = () => { if (!locked[key]) { locked[key] = true; eventEmitter.removeListener(key, tryAcquire); resolve(); } }; eventEmitter.on(key, tryAcquire); }), release: (key) => { delete locked[key]; setImmediate(() => eventEmitter.emit(key)); } }; }; module.exports = lock;
在這個檔案中,總共就只有這個lock建構函式(constructor)供創立mutex物件使用,這個建構函式會回傳acquire(key)
及release(key)
兩個函式給mutex物件的外部操作mutex。我們接著先來說說閉包的私有變數locked
(第4行)跟eventEmitter
(第5行)。
locked
是一個物件,我們拿它來記錄哪些key
正在處於上鎖狀態,這樣操作同一個mutex物件就可以同時對不同的key進行上鎖。eventEmitter
是Node.js內建的事件監聽器。這邊的用途是當沒有取得鎖,在critical section前面等待的人會向eventEmitter
進行註冊監聽,只要有人離開critical section釋放鎖時就會被即時notify再重新嘗試獲取鎖。
acquire(key) 獲取鎖
熟悉了閉包的私有變數後,我們來看acquire(key)
內部的運作原理吧。在進入acquire(key)
後會先判斷locked[ key ]
有沒有東西(第10行),如果是false
沒有東西代表目前沒有人對key
上鎖,那就可以放心地把key
設為上鎖狀態,優雅地resolve()
這個promise並return了(第13行)。
if (!locked[key]) { locked[key] = true; resolve(); return; }
如果今天這個if不成立,代表這個key
已經有人上鎖了,那我們就前往第16行,定義一個叫tryAcquire
的函式。這個tryAcquire
函式在第24行會被註冊進eventEmitter
裡面,當有人離開critical section釋放鎖時,eventEmitter
就會通知監聽的人執行tryAcquire()
。而tryAcquire()
顧名思義就是想要嘗試獲取鎖,所以它做的動作就跟第10~14行一樣,只差在第19行多了從removeListener
,因為當這個人成功取得鎖後就不需要再監聽嘗試獲取鎖了。
const tryAcquire = () => { if (!locked[key]) { locked[key] = true; eventEmitter.removeListener(key, tryAcquire); resolve(); } }; eventEmitter.on(key, tryAcquire);
release(key) 釋放鎖
我們已經實作完獲取鎖的部份了,現在來看看怎麼釋放鎖。
釋放鎖比較簡單只有兩個步驟,第一步先將正在處於上鎖狀態的key從記錄上鎖狀態的物件locked
內移除(第27行),接下來在第28行呼叫eventEmitter.emit(key)
通知其他被拒於critical section門外等待監聽重試獲取鎖的人可以重試獲取了,於是最後終於輪到下一個幸運兒可以進入critical section了。
release: (key) => { delete locked[key]; setImmediate(() => eventEmitter.emit(key)); }
應用在緩存擊穿上
回到最開頭被緩存擊穿的程式碼上我們來做點修改,我們在第1行new一個剛剛寫好的mutex物件,並且在當緩存miss時要去DB取資料前上鎖(第9行)。上鎖後進入critical section的第一件事就是再查一次緩存(11行),因為說不定在成功取到鎖時的前一個人已經將最新的資料寫入緩存了,這樣我們就可以直接使用不用再查一次。
如果在mutex上鎖後第二次查詢緩存還是miss,那就代表你是那第一個要去DB替大家服務取資料的幸運兒了,第13~22行就乖乖去DB取資料並寫回緩存,最後在要return資料前的第27行記得將鎖解開,好放下一個人進來。
const mutex = new lock(); let item = null; item = await redisClient.hgetall(itemKey); if(item) { return item; } await mutex.acquire(itemKey); try { item = await redisClient.hgetall(itemKey); if(!item) { const sql = 'SELECT * FROM `Product` WHERE `id` = ?'; const [rows, fields] = await dbConnection.query(sql, [itemKey]); if(rows.length > 0) { item = rows[0]; await redisClient .multi() .hmset(itemKey, item) .expire(itemKey, 60) .exec(); } } } catch (error) { console.error(error); } finally { await mutex.release(itemKey); } return item;
經過這樣子的修改,我們的程式就不會再發生使用者1、使用者2、使用者3因為同時在redis找不到資料所以對DB執行了三次一模一樣的查詢做重功了。
本篇只適用於啟單個app server使用。若同時有n個app server在redis找不到資料,也會發生對DB執行了n次一模一樣的查詢,這時就需要在redis實做分散式(Redlock)。
item = await redisClient.hgetall(itemKey);
获取两次 item ,似乎显得很呆,有改进空间吗?