肖穎
摘 要: 重分區連接查詢是基于傳統MapReduce框架的最常用的連接查詢算法之一。在討論基于傳統MapReduce框架的標準重分區連接算法及減小數據緩存的改進算法的基礎上,提出了在數據文件分塊階段進行預篩選以精簡MapReduce框架中處理的數據量的方法。該方法能有效減少框架內部各個階段處理的數據總量,進一步壓縮緩存的使用空間并降低不同階段之間數據傳輸的網絡開銷。
關鍵詞: MapReduce; 連接查詢; 重分區連接; 預篩選
中圖分類號:TP312 文獻標志碼:A 文章編號:1006-8228(2016)04-09-03
0 引言
近年來,隨著移動互聯網、電子商務及社交媒體快速發展,網絡的數據信息量呈指數型增長。為了能更快更好地分析處理這些龐大的數據信息,很多企業選擇將數據遷移到價格相對低廉且容錯性能較強的云環境[1]中進行處理。MapReduce框架[2]是云計算最為核心的技術之一。作為海量數據的并行處理平臺,MapReduce編程模型[3]簡單,隱藏了并發、容錯、分布式計算和負載平衡等復雜繁瑣的細節,并具有較高的可擴展性和容錯性,現已廣泛應用于海量數據的分析和處理領域。
但在MapReduce框架中,連接查詢運算仍然過程復雜、工序繁瑣,同時面臨數據傾斜、分布式環境數據傳輸等問題,效率較低。如果能提高MapReduce的連接查詢效率,則可進一步提高數據分析效率和用戶體驗滿意度。
本文就現有的基于傳統MapReduce框架的重分區連接查詢方法進行深入探討和研究,并進一步討論可能的優化策略。
1 傳統MapReduce框架實現機制
傳統MapReduce框架將所有面向海量數據的計算劃分成兩個階段:Map階段和Reduce階段,每個階段可由用戶自行定義其處理函數,且都以(K,V)二元組的形式進行輸入和輸出。但由于大部分Mapper與Reducer并非執行在相同節點上,因此MapReduce框架需要一個介于Map函數和Reduce函數之間的Shuffle過程來實現它們之間的數據整理和傳輸。以下是傳統MapReduce框架的具體工作步驟[4]。
⑴ 準備工作
MapReduce框架將大量輸入數據分割成M個大小固定的塊。
⑵ Map階段
Mapper讀取分配給它的塊信息,并從中分離出各條記錄。
Mapper從每條記錄中抽象出二元組(K1,V1),并傳遞給用戶自定義的Map函數執行生成二元組(K1',V1')。由此塊信息經由Map階段處理得到一個輸出序列{(K1',V1'),(K2',V2'),…,(Kn',Vn')},同時這些數據將被存入緩存。
⑶ MapReduce框架的Shuffle過程
(a) 為了使Reduce函數獲得有序的輸入信息,Shuffle過程負責將Map階段的輸出序列進行排序分組歸并,使得具有相同鍵值K'的數據V'集中在一起,形成(K',list(V')),且list(V')中的值按V'進行排序。因為數據量巨大,所以該階段可能使用外部排序。
(b) 將處理好的(K',list(V'))發送給Reduce函數。
⑷ Reduce階段
執行Reduce函數,生成最終的執行結果(K'',V''),并作為輸出結果寫入文件。
2 重分區連接查詢算法及其優化探討
在數據爆炸的今天,有些大型的互聯網公司每天需要利用高達TB甚至PB級別的日志信息來分析數據,以獲取有利于其發展的統計信息。但其中大部分操作都是對巨型數據表(例如用戶表User和日志表Log)進行連接查詢操作:
SELECT User.Col1, User.Col2, …, User.Coln,
Log.Col1, Log.Col2, …, Log.Colm
FROM User, Log
WHERE User.UserID=Log.UserID AND CUser AND CLog
其中CUser表示僅和表User相關的篩選條件,CLog表示僅和表Log相關的篩選條件,User.Col1,User.Col2,…,User.Coln表示表User中的n個列(表User的列數≥n),Log.Col1,Log.Col2,…,Log.Colm表示表Log中的m個列(表Log的列數≥m)。假設若表User 共有mU行,每行的數據量為lenU字節;表Log共有mL行,每行的數據量為lenL字節,則執行該連接查詢將面臨為(mU×lenU)×(mL×lenL)級別的巨大數據量。
在此我們討論基于傳統MapReduce框架的最常用的連接查詢算法之一——重分區連接查詢算法[5]。該算法類似于并發數據庫管理系統中的分塊歸并排序連接,同時繼承了傳統MapReduce框架的容錯性能和負載均衡性。
2.1 標準重分區連接算法
標準重分區連接操作在一個單獨的MapReduce工作中完成:Map階段進行數據的預處理,Reduce階段進行連接查詢操作。其具體執行步驟如下。
⑴ MapReduce框架將巨型表User和Log分割成M個大小固定的塊。
⑵ 在Map階段,每個Mapper讀取一個塊,繼而提取出該塊中每個記錄的連接鍵值join-key;同時生成含有表標記tag的記錄tagrecord,用以識別該記錄來自于哪一張表。Mapper輸出該塊的(join-key, tagrecord)序列并存入緩存。
⑶ MapReduce框架的Shuffle過程對(join-key, tagrecord)序列進行分組、排序和歸并。相同join-key的記錄被分到一組,并輸出給Reducer。
⑷ 在Reduce階段,每個Reducer首先按tagrecord信息(該記錄來自于表User或Log)將輸入的記錄分為兩組,并分別存入各自的緩存BU和BL中,然后將兩組信息進行笛卡爾積運算,進而實現查詢。
標準重分區連接中存在的問題是:User或Log表中的所有記錄都必須寫入緩存。然而若|User|<<|Log|時,來自于Log中的記錄可能導致內存溢出。
2.2 改進的重分區連接算法
為解決標準重分區連接中可能存在的緩存溢出問題,標準重分區算法可做如下改進。
⑴ 在map函數中,將輸出的二元組序列( join-key, tagrecord )改為(join-key-tag, tagrecord),加入表標識tag保證來自于User的記錄一定排列在Log的記錄之前。
⑵ 在MapReduce框架的shuffle階段自定義分區函數,使得后續所有計算只根據join-key-tag中的join-key部分來進行。
做出改進后,表User中的記錄一定會在Log記錄之前,所以只有User中的記錄需要存入緩存BU,而Log中的記錄則以數據流的形式快速讀出并與相關的User中的記錄進行連接并輸出結果。
改進的重分區連接算法雖然有效地改進了標準算法中的緩存問題,降低了內存溢出的可能,但在mapreduce的shuffle階段仍需對表User和Log進行排序并通過網絡傳輸數據信息,該操作是連接查詢的主要執行開銷,會大幅降低其執行效率。
2.3 改進重分區連接算法的預處理
在重分區連接中,如果表User和Log中的數據信息在進行連接操作之前已經按連接鍵值分區完成,則shuffle階段的開銷就能實現有效降低。該預處理可以通過以下方式實現:表Log在日志記錄生成時根據join-key進行分區,而User表則在將其加載到分布式文件系統中時根據join-key進行預分區。從而在查詢時,User和Log中相互匹配的分區就能直接進行連接查詢。
對比平行關系數據庫管理系統,由于分布式文件系統獨立決定每一個數據塊的存放位置,所以上述方法不能保證表User和Log的相互匹配的分區存放在同一個物理節點中。因此,查詢時必須使用直接連接策略。即每個map任務在Log的一個片段Li上進行。在初始化階段,Mapper從分布式文件系統中取出表User的一個片段Ui,若其尚未進入本地存儲系統則將為其建立內存哈希表HUi;然后map函數掃描Li中的每個記錄并嘗試連接哈希表HUi。由于分區的數量是可選的,因此該方法確保每一個Ui都能裝入內存。
3 精簡連接查詢數據量的預篩選
上述三種重分區連接算法都是從如何減少運算過程中產生的緩存及傳輸的數據量的角度來提高連接查詢效率,但卻忽略了連接查詢本身的計算數據量的精簡,即無論使用上述哪一種算法進行重分區連接查詢,其對應的關系代數都沒發生實質性的優化,而始終為:
換言之,進入MapReduce框架的數據量即最初分塊處理和Mapper都仍然面臨著(mU×lenU)+(mL×lenL)字節的數據量,而Reduce階段的笛卡爾積運算仍將產生具有 (mU×lenU)×(mL×lenL)字節的龐大的中間結果,并需對其進行最終結果篩選。
但根據現實數據的處理情況可知,在MapReduce框架上實現的多個大型表之間的連接運算在大多數情況下仍是等值連接,并且最終從查詢結果中獲取的也只是其中某幾個列的信息。因此,基于MapReduce框架的重分區連接算法還可以通過對大量數據信息進行篩預選處理的方法來降低進行連接查詢的數據量,從而進一步減少緩存的使用空間并有效降低shuffle階段的數據傳輸造成的網絡開銷。
根據關系代數優化的典型啟發式規則,上述關系代數表達式可優化為:
若查詢結果中不包含重復列的信息,則該關系代數能進一步優化為自然連接運算:
其中,為自然連接運算符。
上述優化表達式說明表User和Log在進入MapReduce框架進行連接查詢之前,可以先對大量數據進行數據的預篩選,使與結果無關的數據不參與龐大的連接運算。根據傳統MapReduce框架的工作原理,該篩選操作可以加載在該框架最初的文件分塊階段中。具體操作步驟如下。
⑴ 將表User分塊的同時將其進行一遍掃描:篩選出滿足查詢條件CUser的行的同時,投影出該行中最終查詢結果所需的分量Col1,Col2,…,Coln和連接列分量UserID,構成一個中間結果行,將其存入到分塊中。當一個塊放滿后,將中間結果寫入下一個塊。
⑵ 同理地,將表Log分塊的同時將其進行一遍掃描:篩選出滿足查詢條件CLog的行的同時,投影出該行數據中最終查詢結果所需的分量Col1,Col2,…,Colm和連接列分量UserID構成一個中間結果行,將其存入到分塊中。
⑶ 而后再將這些塊分配給Mapper進行后續操作。
若表User中滿足查詢條件CUser的元組共有mU'行,每行所需的分量Col1,Col2,…,Coln和UserID的數據量為lenU'字節;表Log中滿足查詢條件CLog的元組共有mL'行,每行所需的分量Col1,Col2,…,Colm和UserID的數據量為lenL'字節。則由此可知,進入MapReduce框架的數據量減少為(mU'×lenU')+(mL'×lenL')字節,而最終的連接查詢面對的中間結果也減少為(mU'×lenU')×(mL'×lenL')字節。
若有mU'< 4 未來工作展望 本文提出的預篩選的方法在一定程度上能提高整個MapReduce框架的連接查詢的執行效率,但其算法復雜度并沒有得到質的提升。即若表User或Log中所有的行都分別滿足查詢條件CUser和CLog,且要求查詢兩張表連接之后所有列,則預篩選方法對數據信息量的降低將起不到明顯作用。后續的研究是對該問題進行深入探討,以找出降低算法復雜度的方式,從本質上提高整個查詢運算的效率。 參考文獻(References): [1] VMware vCAT團隊.VMware vCAT權威指南:成功構建云 環境的核心技術和方法[M].機械工業出版社,2014. [2] 董西成.Hadoop技術內幕:深入解析MapReduce架構設計 與實現原理[M].機械工業出版社,2013. [3] Donald Miner, Adam Shook. MapReduce設計模式[M].人民 郵電出版社,2014. [4] Dean J, Ghemawat S. MapReduce: Simplified Data Processing on Large Clusters[C]. Proc. of OSDI'04. San Francisco: [S. n.],2004. [5] Blanas S, Rao J, TianY, et a1. A comparison of joinalgorithms for log processing in MapReduce[C]. Proceedings of the 2010 ACM SIGM0D International Conference on Management of Data,2010.