Transmitting Event Streams
最後一個章節是 串流處理 (stream processing),先前講的 批次處理 (batch processing) 之輸入都是有限的,而串流處理要做的就是處理無限的輸入,放棄固定以時間片段執行,選擇當事件發生時執行。
但其實我們可以把時間切成極細的片段(每 1 奈秒),這樣批次處理就會等於串流處理了 XDD。
Messaging Systems
一般來說,串流 (stream) 是指會隨著時間逐漸增加且可用的數據,此概念有被運用在很多地方,如 Unix 系統的 stdin
和 stdout
,程式語言的 lazy list
等等。
事件 (event) 可以是字串、JSON、或二進位數據,端看系統使用的編碼方式為何,而事件代表的意義很廣泛,以串流處理的角度來看,事件是按日歷鐘時間逐漸發生的數據;以 Java 的 FileInputStream 來看,事件就是每一行的 byte;以 Day 23 的 log 分析例子來看,事件就是每一筆 log,它們都是相同的概念:一個小的、自我包含的、不可變的對象。
在串流處理的術語中,一個事件被 生產者 (producer) 產生,然後會有多個 消費者 (consumer) 處理事件,相關的事件通常被組合到同一個 topic 中。
直接訊息傳遞
一般常見方法是使用 訊息系統 (messaging system) 來通知消費者有新事件發生,然而有一些訊息系統不會透過中間層節點,生產者透過網路直接將訊息通知消費者,像無代理訊息系統 ZeroMQ 和 nanomsg 透過 TCP 或 IP multicast 實作 發佈/訂閱 (publish/subscribe) 模型。
訊息代理
另一個被廣泛使用的寄送訊息之選擇就是 訊息代理 (message broker) 啦,生產者和消費者都透過 broker 作業。
當多個消費者想從同一個 topic 讀取訊息時,這裡有 2 種訊息傳遞模式可選擇,如下圖 11-1:
- Load balancing 每一條訊息只會傳遞給其中之一個消費者,所以消費者可以並行分散的處理訊息,適合用在訊息處理是昂貴的場景上。
- Fan-out 每一條訊息都會傳遞給所有消費者,這個就像同時從同一個輸入資料中執行多個批次處理作業一樣,消費者之間不會彼此干擾。
但實務上會組合這 2 個模式,像 Kafka 就能多個消費者群組註冊同一個 topic,每條訊息都會傳遞給這些消費者群組,消費者群組中是並行的接收訊息來處理。
Acknowledgments and redelivery
老樣子,消費者任何時間都有可能故障,有可能發生 訊息代理 (message broker) 傳遞訊息給消費者後,它卻沒有處理或只處理到一半,為了確保訊息不遺失,broker 使用 acknowledgments:當消費者在處理完訊息後,必須明確的通知 broker,如此 broker 就可以將訊息從 queue 中刪除。
消費者因故障消失時,broker 會將未 acknowledgments 的訊息傳遞給其他消費者,此時若你的傳遞策略是 load balance 時,會發生如下圖 11-2 的影響:訊息 m3 和 m4 傳遞的順序跟事件發生順序不同。
Consumer 2 在處理 m3 到一半時故障,同時 Consumer 1 正在處理 m4, 非 ack 的訊息 m3 會再傳遞給 Consumer 1,其消費訊息的順序為 m4, m3, m5,與發生順序不同。
如果訊息彼此間有因果關係,避免此問題的方法就是為每個消費者使用單獨的 queue,例如不要用 load balance 傳遞策略。
Partitioned Logs
如果你是使用 AMQP/JMS 風格的訊息模型,當訊息被 acknowledgments 後會從 broker 中刪除,因為它們是以訊息傳遞的思維來建構,如果你在此系統中註冊新增一個消費者,它無法接收註冊前的訊息,相較於資料庫或檔案系統來說就沒這問題,因此,一個混合有耐用性儲存及低延遲訊息通知系統就誕生了:基於 log 的訊息代理 (log-based message broker)。
使用 log 儲存訊息
我們在之前討論過 Log-structured 儲存引擎 和 數據複製,我們可以用相同的架構實作訊息代理:生產者的訊息會附加在 log 中,消費者透過讀取 log 來接收訊息,當消費者讀完訊息了,就後待新訊息附加至 log 後的通知。這就像 Unix 工具的 tail -f
一樣,當檔案被寫入後你會同步看到內容一樣。
這裡 log 也能使用 partition 來應付更高的吞吐量,不同分區由不同的節點負責,每一個分區都是獨立的讀取和寫入,一個主題可以被定義為一組分區,這種方法如下圖 11-3 所示:
每一個分區,broker 會分配一個逐漸累加的序列號到每一條訊息上,稱為 offset,就是下圖方框內的數字,因為 log 只能附加上去,所以每一個分區的訊息皆是完全排序,但不同分區的順序無法保證。
Apache Kafka、Amazon Kinesis Streams 和 Twitter DistributedLog 皆是基於 log 的訊息代理。
消費者 offset
offset 讓消費訊息的追蹤更簡單了,比較一下各訊息的 offset 跟消費者當前的 offset 大小就好,也因此 broker 就不用在花費力氣追蹤各訊息的 acknowledgments 了,它只要定期記錄消費者的 offset 就好,進而提高訊息代理的吞吐量。
保持同步
資料工程師修煉之路走到現在,真的沒有一個系統能同足滿足資料儲存、查詢和邏輯處理,現實世界的應用程式都是由多個不同的系統組件搭建起來;舉例來說我們會使用 OLTP 資料庫來服務客戶,使用快取系統幫 request 加速,使用文字檢索搜尋引擎處理搜尋需求,最後使用資料倉儲做 OLAP,每一個系統組件都為了特地目的而優化。
當你的應用程式需要因應事件發生,需要同步資料到所有系統組件上時該怎麼做呢?
資料倉儲的同步一般都是使用 ETL,取得資料庫的複製並轉換,也就是 批次處理。
如果預期資料庫的 dump 很慢,我們的另一個選擇就是使用 雙重寫入 (dual writes) :當資料改變時同時寫到所有系統組件中。舉例來說就是當資料庫第一次被寫入時,同時更新搜尋引擎索引、快取和其他系統組件。
然而雙重寫入可能會發生如下圖 11-4 的 競爭條件 (race condition) 寫入:2 個 Client 並發的更新資料庫和索引,因為網路或其他理由的時間差,最終導致 2 邊的值不一致。
除非你在這 2 個系統間額外實做並發寫入檢測機制 (Detecting Concurrent Writes),否則你根本不會知道並發寫入發生。
另一個雙重寫入的問題是可能會有部份寫入成功,也是導致 2 邊的值不一致,想當然爾,面對這種 atomic commit 問題還是有方法可以解,但在多個系統上實做的代價很昂貴 (可參考 Atomic Commit and Two-Phase Commit 的介紹)。
Change Data Capture
變更資料補獲 (change data capture – CDC) 是一個能觀察資料寫入至資料庫並提取它們讓其他系統能複製的過程。
如下圖 11-5,CDC 能補獲所有資料庫的變更,並寫入到一個 log 裡,其他的系統組件擔任消費者的角色,以 相同順序 取得資料,我們稱這些 log 消費者為 衍生數據系統 (derived data systems) 。
實現變更資料補獲
衍生數據系統視為資料的多種不同的面向,CDC 機制能確保所有的變更能如實的反應在衍生數據系統上。
至於實現 CDC 的一個好選擇就是用 基於 log 的訊息代理 (log-based message broker) 啦!它很適合從源頭資料庫傳遞變更並保留訊息的順序。