Batch Processing (2) – MapReduce Job Execution

MapReduce and Distributed Filesystems

MapReduce 有點像 上一篇 講的 Unix 工具,它通常不會修改到輸入檔案,除了輸出以外它沒有副作用 (side effect),最大差別是它能分散到上千台機器執行。

MapReuce 讀取和寫入都在 分散式檔案系統 (distributed filesystem) 上作業,像實作了 MapReduce 的 Hadoop 那樣,Hadoop 的檔案系統稱之為 HDFS (Hadoop Distributed File System),一個重新實現 Google File System (GFS) 的 open source。

HDFS 以 shard-nothing 原則為基礎,此原則不需要特殊的硬體,只要透過網路把傳統數據中心的電腦連起來就好了,在每一台機器上都會執行 守護行程 (daemon process) – DataNode,再找一台 Master 執行 NameNode 就可追蹤所有檔案區塊在哪一台 DataNode 上。

為了容錯,檔案會被切成多個小塊 (file blocks),然後複製到多台節點上,這概念就像 RAID,透過同台機器 冗餘 (redundancy) 的硬碟預防硬碟壞掉,HDFS 的架構如下圖,有興趣的就自己去研究啦,今天主角是 MapReduce。

MapReduce Job Execution

MapReduce 是一個程式框架讓你能寫少少的程式就能在分散式檔案系統上處理很大量的資料,我們延用 上一篇 的分析 nginx log 例子,瞧瞧 MapReduce Job 的資料處理模式為何,它們其實很相似:

  1. 讀取輸入檔案,然後分解成 records,以 log 例子來看,records 等於行(也就是用 \n 當分隔)。
  2. 呼叫 mapper 函式去每一行 record 提取 key 和 value 出來,以 log 例子來看就是 awk '{print $7}' 步驟。
  3. 用 key 排序所有 key-value 組合,以 log 例子來看就是第一次的 sort 命令。
  4. 呼叫 reducer 函式去迭代已排序 key-value 組合,如果同一個 key 出現多次,排序步驟使它們在 list 中相鄰,所以可以很簡單的把它們整合在一起,且不用在記憶體中保存太多狀態,以 log 例子來看就是 uniq -c 步驟。

這 4 步就是 MapReduce 的 Job,第 2 步的 map 和 第 4 步的 reduce 是你需要寫程式的地方,第 1 步由 parser 處理,第 3 步由 MapReduce 處理,因為 mapper 在輸出資料給 reducer 前會做好排序。

昨天的 log 例子還有第 5, 6 步,從 MapReudce 的觀點來看,這會是第 2 次的 MapReduce Job 了。

MapReduce 的分散式執行 (Distributed execution of MapReduce)

跟 Unix 命令主要的差異是 MapReduce 能夠 並行 (paralleize) 的在多台機器上執行,卻不用寫並行相關程式,mapper 和 reducer 一次只處理一筆 record,它們並不在意輸入從哪來,輸出到哪去,所以這個框架能在很多台機器中處理複雜的資料。

下圖 10-1 展示了 Hadoop MapReduce Job 的資料流,它的並行以 分區 (partition) 為基礎,job 的輸入源通常是 HDFS 資料夾,資料夾中的每一個 file block,可視為由多個 map 任務分別處理的分區,map task 標記為 m1, m2, m3

每一個 map 任務的輸入資料通常是幾百 mb,MapReduce 的資源調度器 (scheduler) 會嘗試在有輸入資料存在的機器上執行 Map 任務,此原則也稱為 putting the computation near the data ,省下了透過網路做數據複製的時間,增加本地性。

而 reudce 階段也會做 分區 (partition),為了確保所有 key-value 組合中相同 key 的資料最終能抵達同個 reducer,框架會使用 key 的 hash 值 來決定 key-value 該去哪個 reducer。

Key-value 組合必須要排序完,但資料集若太大就不適合在單台機器上用傳統的排序演算法排序,取而代之的是,排序會分階段進行,首先每一個 map 任務會以 key 的 hash 值做分區,然後每個分區會把資料寫入至已排序檔案中,存到 mapper 機器的本地硬碟上,類似之前講的 SSLTables and LSM-Trees 的方式。

當 mapper 讀完 & 寫完已排序檔案後,MapReduce 資源調度器就會通知 reducer 可以開始去每一台 mapper 複製下載資料,這個以 reducer 分區、排序、複製資料的過程也被稱為 shuffle

接下來 reducer 就開始合併資料,保留資料順序,然後就開始迭代資料,執行你想計算的邏輯啦,最後就看你要不要把輸出資料寫到分散式檔案系統中了。

tshine73
tshine73
文章: 51

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *