Batch Processing (3) – MapReduce Reduce-Side Joins and Grouping

Reduce-Side Joins and Grouping

當 MapReuce Job 執行時,它會讀取所有的輸入資料,相較於資料庫來說等於 全表掃描 (full table scan),在資料庫中,如果你查詢的資料量不大,資料庫通常會使用 index 快速定位至你的資料所在 (Log structured and Hash Index),如果查詢涉及 join,資料庫可能會查找多個 index,然而 MapReduce 中沒有 index 這種概念的東西 – 至少不是一般認知中的 index,所以若你想在 MapReduce Job 中過濾出少量的資料,其效能會遠低於使用 index 查找。

在分析查詢的場景下,大部份是從大量資料中做某種程度的聚合,故全表掃描是合理的,再加上 MapReduce 能並行的在多台機器執行,所以我們在談論批次處理中的 join 時,意味著會讀取所有相關的資料集做事,那我們這就來看看 MapReduce join 的細節吧!

例子:分析 user 活動事件

如下圖 10-2,左邊是 user 的活動事件,但我們想在分析時加入年齡維度,所以需要 join 右邊的 user 資料,找到生日。

Srot-merge joins

回憶一下 mapper 做的事,它會從每一筆 record 中提取 key 和 value 出來,因為要做 join,所以我們會有 2 個 mapper ,2 個 mapper 的 key 都是 user ID,整個 join 流程如下圖 10-3:

mapper 會以 key 為基準,完成資料的分區及排序,所以相同 user ID 的資料會相鄰的當做 reducer 的輸入,除了 key 的排序外,MapReduce Job 可以安排 user database 的資料在前面,接著後面的活動 event 再用 timestamp 做排序,這個技巧也稱為 二次排序 (secondary sort)

感謝二次排序,reducer 在處理時,它一次只需要保留第一筆的 user 生日在記憶體中,爾後直接附加生日年份到 event 中,這個算法也稱之為 sort-merge join:mapper 輸出用 key 排序的資料,然後 reducer 將再合併雙方已排序的資料(做 join)。

Group by

除了 join,另一個常在分析用到的功能就是 group by 了,像昨天分析 log 的例子就是以 URL 做為 key, group by 後計算次數,所以,MapReduce 幫我們在 mapper 階段做好 group by 了。

處理傾斜

看到傾斜 2 字就該警覺一下了,因為 MapReduce 的模式是,把所有相同 key 的資料都帶往同一個地方,若有幾個 key 的資料特別多呢?以社群網路舉例,你我的跟隨者可能才 100 人不到,但名人的跟隨者可能是上百萬,這種不成比例的資料也稱為 hot keys

當有單一 reducer 因為 hot key 的關係,比其他 reducer 收到更大量資料,稱為 傾斜 (skew) 或者 hot spots,MapReduce Job 只有在所有 mapper 和 reducer 都完成時才會完成,所以往後 workflow 的 job 都會因為某個 reducer 傾斜的關係,拖慢速度。

當 join 包含 hot keys 時,這裡介紹 2 個不同工具的解決辦法。

Pig 的 skewed join 方法是,它首先會執行一個簡單的 job 檢測哪個 key hot,當執行到 join 時,mapper 會把 hot key 中的資料以隨機方式送到多個 reducer 中,其他 join 的資料,皆會複製一份到所有分配到 hot key 資料的 reducer 中。

Hive 則是以不同方式優化 skewed join,它需要在 table 的 metadata 中明確指定 hot keys,然後把這些資料分散到不同的檔案中,當執行到 join 時,它使用 map-side join 處理。

Map-Side Joins

昨天講的 join 是在 reducer 端完成,所以也稱為 reduce-side joins,這個 join 方法的優點就是你不用對輸入資料做出任何假設,你只要知道關鍵資料就好了,而缺點就是所有的排序、複製到 reducer 和 reducer 合併所有輸入的操作都是昂貴的,端看你分配的硬體資源有多少,資料可能在 MapReduce 的階段中多次寫入到硬碟裡。

換個方向說,如果你能對輸入資料做出某些假設,它是有可能讓 join 變快速,稱之為 map-side join,這個方法使用縮減的 MapReduce Job,意味者 map-side join 未使用 reducer 和排序,每一個 mapper 就只讀取輸入資料,然後輸出,搞定!

Broadcast hash joins

執行 map-side join 的最簡單方法適用於大量數據集 join 小量數據集,精確一點的說,小量數據集需要小到能在每個 mapper 的記憶體中存一份。

一樣舉昨天圖 10-2 分析 user 活動事件的例子,將 user 的生日 join 到活動事件。

若 User 資料庫小到能存進記憶體中,當 mapper 啟動時,mapper 就能把 User 資料庫存成 hash table,爾後 mapper 開始掃描 user 活動事件時,就能快速從 hash table 查找 user 的生日年份了。

這簡單但有效的算法稱之為 broadcast hash join廣播 (broadcast) 代表每個 mapper 皆有小量數據集,hash 代表使用 hash table;此 join 在 Pig (叫做 replicated join) 和 Hive (叫做 MapJoin) 皆有支援。

Partitioned hash joins

如果 map-site join 所有用到的輸入資料都是用一樣的方法做 分區 (parition) 的話,你就能更進一步的縮小載入記憶體中的數據集大小,一樣用上圖 10-2 的例子,你可以安排所有的活動事件和 user 資料庫皆以 user ID 的個位數數字做分區,所以 mapper 3 就會載入所有 ID 尾數為 3 的資料,然後完成 join。

Partitioned hash joins 在 Hive 也被稱為 bucketed map joins

Map-side merge joins

另一個 map-side join 的變體是,不只所有輸入資料分區策略相同,還是以相同 key 排序過的資料。

在這個狀況下,小量數據集能否存進記憶體就不重要了,因為 mapper 也能執行相同的合併操作(這通常由 reducer 完成),漸進的讀取 join 2 邊的資料,然後完成 join。

若 map-side merge join 可以作業,它的輸入十之八九是由其他 MapReduce Job 的輸出產生,雖然大多情況下會像昨天那樣,由 reduce-side 搞定 join,然而,若該資料集可能會被其他的 join 用到,就很值得先做好分區 & 排好序,然後使用 map-side merge join 完成 join 作業。

Join 小結

在優化 join 策略時,不只是要知道編碼或資料夾命名為何,更重要的是要了解分散式檔案系統裡相關數據集的物理佈局,知道檔案怎麼儲存,用什麼 key 當做分區策略等等。

了解這些過後,才能幫助你選擇一個適合的 join 策略。

tshine73
tshine73
文章: 51

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *