李 成 許胤龍 郭 帆 吳 思
(中國科學技術大學計算機科學與技術學院 安徽 合肥 230027) (安徽省高性能計算重點實驗室 安徽 合肥 230027)
?
基于MapReduce的內存并行Join算法研究
李成許胤龍郭帆吳思
(中國科學技術大學計算機科學與技術學院安徽 合肥 230027) (安徽省高性能計算重點實驗室安徽 合肥 230027)
摘要傳統的并行Join算法缺少必要的容錯能力,且數據劃分不均往往導致單個線程的阻塞成為整個任務執行的瓶頸。針對以上問題,分析內存連接的各個階段對Join算法性能的影響,提出一種可利用MapReduce的動態機制,避免了傳統并行連接算法的數據任務分派不均和容錯問題。算法使用MapReduce編程框架,并通過封裝分塊標記減少MapReduce Join執行過程中標記和排序的計算開銷,使算法性能顯著提高。實驗結果表明,該算法在共享內存體系結構下,性能上相比已有算法有顯著改進。
關鍵詞內存連接數據封裝MapReduce
0引言
當前,隨著大數據時代的來臨,MapReduce由于其具有良好的可擴展性和容錯性,已經被廣泛應用于面向數據處理的應用中。MapReduce最初是由谷歌工程師Dean等人在2004年推出[1],其最初的設計目的是處理公司大規模的網絡日志數據訪問。MapReduce編程模式通過提供一種簡單的編程接口實現在普通機器上的并行和大規模分布式計算,并能將并行數據處理中容錯、負載均衡和數據分布的復雜細節隱藏起來,自動完成[2]。
Join 算法是進行兩個或者多個數據集聚集連接的操作,是面向數據處理應用的常用算法。優化Join執行的效率,可以有效提升數據分析任務的性能。Join在分布式環境下使用MapReduce并行化的研究成果相對豐富[3-5],但針對內存共享環境下,使用MapReduce對Join算法進行并行化的研究卻仍十分少見[10]。然而,隨著多核處理器的普及和內存數據庫的流行,研究MapReduce在共享內存環境下實現內存Join算法的并行具有重要意義。
傳統的內存Join算法,研究內容主要集中在均攤并行處理數據的多個線程的任務和優化cache訪問兩個方向。其中,一種流行的最新技術是使用Radix-Cluster Hash Join[9]。但是,針對內存多核體系結構,當有多個線程并行處理已經存放到內存的數據時,直接使用以上策略并不能充分發揮其性能。傳統并行方式通過多個線程并行執行任務子集,將帶來單個線程任務過大時會成為整個查詢事務的瓶頸,特別是單個線程查詢失敗將導致整個查詢失敗。
本文提出通過引入MapReduce的機制解決傳統并行Join算法單個線程成為算法瓶頸或者導致整個任務失敗的問題。在標準MapReduce Join算法的基礎上,結合多核體系架構的特性,提出了基于MapReduce的 Radix Join優化算法。在該算法中考慮了cache 命中率和MapReduce執行過程中數據分片規模對算法的影響,在減少中間結果規模的同時,保證算法具有良好的cache 敏感特性。在CMP和SMP環境下的實驗結果表明,該算法無論是對比傳統內存共享并行Join算法還是常用的標準MapReduce Join算法,性能均具有較大提升。
1內存Join算法
內存Join算法優化的研究眾多[6-8],并提出了多種針對不同情形下的優秀算法,其中,Radix Join 便是針對等值Join的突出代表。下面將介紹該算法串行及其并行算法的執行過程。
1.1Radix Join
Balkesen等[9]證實了當哈希表大于cache的大小時,幾乎每個訪問都導致一次cache訪問缺失。因此,切分哈希表,使每個哈希表的大小能夠小于cache的大小,可以提升系統性能。Albutiu等[7]借鑒該思想,通過考慮傳輸后備緩沖器(TLB)對性能的影響,提出了多次劃分的算法思想。現在該思想已經成為Radix Join算法的標準組成。
完整的Radix Join說明如圖1所示。兩個輸入都是通過使用兩次Radix數據劃分的方式劃分到合適的大小。每個ri由基于哈希劃分輸入R得到, ri會根據哈希函數進行第二次劃分。所有的sj劃分的分區會被遍歷并與ri所劃分成的哈希子表中的表項進行連接匹配。在Radix Join中,為了取得良好的cache特性,避免一次過多的數據片劃分產生,兩個輸入表都需要經過多段的數據劃分處理。

圖1 Radix Join執行過程[7]
1.2并行Radix Join
對于通過將劃分過程中產生的數據子集由相互獨立的多個線程并行執行,串行的Radix Join 算法可以實現算法的并行化[8]。在第一階段,由單獨的線程劃分數據,并對于每個線程將會產生自己專用的部分數據的數據區域[7]。在第一步數據劃分完成以后,各個任務已經具有足夠的獨立性,可以很好地并行完成各自的工作。線程工作的任務分發通過任務隊列實現。通過以上方法,對于一個p核系統,該算法的時間復雜度可以期望為單核的1/p。
上述算法在數據均勻分布時具有較好的并行特性。但是,算法在進行數據劃分時,很可能導致劃分數據的失衡,從而導致在并行執行階段中,數據處理時間最長的線程成為整個任務的瓶頸。更重要的是,在并行執行階段,一旦某個線程處理出了問題,將會導致整個查詢任務的失敗。
2MapReduce Join算法及其優化
MapReduce的動態調度機制和容錯機制,可以很好地解決傳統并行內存連接算法的問題。根據MapReduce的特征,MapReduce Join算法可以有兩類實現:Map-side Join和Reduce-side Join[11]。由于Map-side Join算法要求數據是有序的[11],因此,本文只關注適用范圍更廣的Reduce-side Join。
2.1樸素的MapReduce Join算法
如圖2所示,Reduce-side Join 算法將輸入數據的表項通過Map函數產生中間數據。為了區分R表與S表的表項,通過使用添加標簽的方式,產生對應的鍵值對。標記的鍵值對以進行連接的項作為鍵。Map函數的輸出將按鍵的值進行排序。所有的具有相同鍵的數據會被劃歸為一組,交由一個Reducer處理。執行過程如算法1所描述。

圖2 Reduce-side Join 數據流[11]
算法1樸素的MapReduce Join算法
Require: Input relations R and S for Join operations
1.Map(Key k, Value v)
//map 階段
2. if(v comes from R)
//標記R表和S表表項
3. tag=1 and Join_key=v.a
4. else
5. tag=2 and Join_key=v.b
6. Output.collect( Join_key, T+tag)
//輸出帶標簽鍵值對
7.end Map
8.Reduce( key key,List values)
//reduce 階段
9. for each v in values
10. if(v.tag=1)
//根據標記將數據加入相應數據集
11. add v to ArrayList_R
12. else
13. add v to ArrayList_S
14. for each val1 in ArrayList_R
15. for each val2 in ArrayList_S
16. result=val1 Join val2
//執行join
17. collect(key, result)
18.end reduce
MapReduce的引入,在解決傳統并行Radix Join算法問題的同時,也帶來了新的挑戰。由于在鍵值對生成過程中,需要以添加標簽的方式,讓Reducer區分是R表還是S表的表項,使得添加的標簽處理太多。同時,標準MapReduce編程框架要求中間結果將按鍵值進行排序,需要排序的數據規模太大,將嚴重影響算法的執行性能。另外,為了保持Radix的cache特性,數據的最終劃分也需要合理的選擇。因此,本文提出了一種新的改進方法。
2.2MapReduce Join算法優化
在上文中介紹了樸素的MapReduce Join算法的實現及其執行過程,并提出了內存Join算法在使用MapReduce框架時帶來的挑戰。
在多核體系架構下,樸素的MapReduce的算法設計暴露出其弊端。內存和數據的訪問執行是通過如圖3所示的體系完成的?;谠擉w系架構內的通信代價幾乎是可以忽略的,但是MapReduce標準執行中的標記及排序操作將成為算法的主要開銷。分布式MapReduce環境下的Join算法優化,大多關注于網絡通信代價的優化,缺少對于內存共享環境下,結合計算機多核特性的Join算法的深入研究。基于以上原因,本文將分析Join 算法的執行過程,從而提出并實現適合內存共享環境下的MapReduce Join算法。針對如圖4所示的數據流,根據算法2的執行過程,分別對Map和Reduce兩個階段對算法進行優化。

圖3 三層cache的多核體系結構

圖4 內存MapReduce Join的數據流
算法2改進的MapReduce Join算法
Require: Input relations R and S for Join operations
1.Map(Key k, Values Vs)
//map 階段
2. /*使用Radix hash進行第一次劃分后封裝*/
3. Use the hash1 to split the Vs into blocks Ts
4. if(T comes from R)
//標記R表和S表數據塊
5. tag=1 Join_key=hash1(T.a)
6. else
7. tag=2 Join_key=hash1(T.b)
8. Output.collect( Join_key, T+tag)
//輸出帶標簽鍵值對
9.end map
10.reduce (Key k’, List blocks)
//Reduce 階段
11. for each T in blocks
12. if(T.tag=1)
//根據標記將數據塊對應合并
13. add T to ArrayList_R
14. else
15. add T to ArrayList_S
16. /*使用Radix hash對兩個數據集進行第二次劃分*/
17. split ArrayList_R with hasp into ArrayList_R’
18. split ArrayList_R with hasp into ArrayList_S’
19. for each val1 in ArrayList_R’
20. for each val2 in ArrayList_S’
21. do result=val1 Join val2
//執行join
22. collect(key, result)
23.end reduce
2.2.1Map階段優化
在數據劃分之后,相互獨立的Map任務并行處理分配給自己的數據。各個Map任務通過使用添加標簽的方式,對數據表中的每個進行連接的表項進行處理,并產生對應的鍵值對。在鍵值對生成過程中,鍵值對將按鍵值進行插入排序。由于對單個鍵值對添加標簽,使得添加的標簽處理太多,并且,中間需要排序的數據規模太大,嚴重影響算法的執行性能,本文將對此進行改進。
針對Map階段的優化,本文使用封裝標記法減少算法執行過程中的計算開銷。因為在Map階段需要將所有數據是來自R表還是S表進行標記,針對每個表項進行標記執行代價太高。由于面向的是等值連接,可以使用哈希的方式進行初次的數據劃分。如算法2的第3~5行描述,劃分后的數據塊封裝成一個整體,將該數據塊的哈希值作為鍵,包含有封裝數據地址的數據結構作為值,生成鍵值對。通過這種封裝的方式將每個鍵值對的標記將是對每個數據集進行整體標記,減少了大量標記操作。由于中間的結果數據需要進行排序,適當的封裝同時也減少了中間結果需要排序的數量。
為了具備良好的cache特性,算法利用Radix Hash函數對數據進行劃分。但是對于大規模數據而言(GB級以上),如果Radix哈希的劃分分片太少,將不能充分地發揮MapReduce動態調度的優勢??赡軐е聰祿峙洳痪?,使得單個Reducer任務成為瓶頸而無法充分發揮并行能力。為了解決此問題,本文通過多次劃分的方式解決。在Map階段進行適當規模的數據劃分,初次劃分的規模,經過實驗驗證,如圖5所示。每次數據劃分使用一個字節進行劃分,這樣產生256個分組,將得到性能最接近最優的劃分。以字節的方式劃分,可以通過整個字節截取的方式,在減少了中間數據存儲開銷的同時,也使得一次可以有更多的數據放入cache中,提高cache命中率。對于任務可能的分布不均,將對數據規模超過參數限制(本文中為標準值8倍)的數據塊進行多一次的劃分。

圖5 Radix哈希分塊使用位數變化對性能的影響
2.2.2Reduce階段優化
本文不僅關注Map執行階段的優化,還關注Reduce階段的優化。Reducers等待所有的Map任務的返回結果。中間結果中針對每個hash值的數據,都會調用一次Reduce 任務。
每個Reduce任務負責處理多個Reduce數據塊(如圖4所示)。每個Map任務處理部分的數據,Reducer歸并所有部分的數據放在合適的緩存中。Reducer需要計算所有具有相同鍵值的中間值,并輸出數據最終的處理結果。正如本文前面介紹的Radix Join所示,需要設計合理的調度策略來優化數據的訪問性能。
如Map階段所描述,為了減少中間結果排序時間等,將Map階段劃分的數據塊數控制在一個較小的規模。這使得每個Reduce任務需要處理的數據規模過大。本文通過對數據進行二次劃分,并將數據切分到可以容納至最低cache層,進行對多個數據集的并行執行。
為了最優化內存駐留的算法,選擇一個簡單并且合適的內存訪問體系模型將非常重要。盡管本文選用的多核結構的內存體系(如圖3所示),相對于真實的內存體系稍微簡單,但其是不同平臺共有的基本框架[12]。本文的算法是cache敏感的,其優化主要針對第一層cache,也就是L1層的cache優化。因為對于不同的平臺,高層的cache多有變化,而上層的cache訪問往往不會影響最低層的cache訪問性能,這使得本文的研究具有普遍的適用性。本文將進行連接的最終數據規模劃分到不能超過L1層的cache大小,這樣既避免了最低層的cache震蕩,還可以適度地優化高層的cache命中率。
3實驗及結果分析
為了評價本文的算法,我們在不同環境下實現了文中所提到的樸素算法和改進算法,并對比已有的傳統Radix Join算法并行方法。實驗運行在一個具有16核的 Intel Xeon SMP 硬件系統上,操作系統為64位版本的CentOS Linux 6.4。
3.1實驗數據
為了更好地對比,本文采用的數據集與文獻[8]中使用的數據集相似。輸入的數據全部是整數,特別是已有算法的假設數據場景是面向列式存儲的,所以我們將數據表執行定義為每行僅是鍵值對的表項,每條記錄長度為16 B,鍵和值的數據類型是8個字節的長整數。實驗所測試的數據R表與S表數據規模相等,由于硬件條件的限制,為了防止數據刷到虛擬內存,影響算法性能的測量, 最大數據集為兩個表各5億條記錄,共16 GB。
3.2實驗設置
本文通過機器和任務參數配置來模擬不同場景下的內存Join算法。使用不同大小的數據輸入表進行不同數據規模下的性能對比。實驗結果顯示,本文的算法性能在所有測試情況下都優于其他的算法。
在前兩組實驗中,依次在CMP(8核)和SMP(16核)環境下固定核數,使用數據規模為1、2、4、8、16 GB的不同規模輸入的數據集進行實驗。最后的一組實驗將評估本文算法隨核數變化的可擴展性。通過固定R表與S表規模(共1 GB條記錄),改變處理數據使用的核數,評估算法對于處理器核數變化的可擴展性方面的性能。
3.3實驗結果及分析
圖6展現了在固定核數的情況下,本文算法與已有相關算法在CMP環境下隨著數據規模輸入的增大,執行時間的變化。由圖可知,在各個數據集下,改進的MapReduce join 算法均優于標準的Radix Join 并行化實現,而樸素的MapReduce Join算法性能最差。隨著實驗數據規模的增大,各個算法的執行時間都有顯著的增大。并且隨著數據規模的增大,改進算法相對標準并行Radix Join和樸素MapReduce Join性能提升更加明顯,由1 GB數據規模的性能分別提升了28.1%和77.3%,到16 GB數據規模的性能分別提升了46.7%和77.9%。這是因為隨著數據規模的增加,MapReduce動態調度更能突出其優勢,而樸素的MapReduce Join算法因為大量地添加標簽操作以及中間數據排序操作花費了太多時間。實驗結果表明,將原分布式環境下MapReduce編程模型簡單搬到內存共享環境下并不能取得突出的性能表現,需要根據環境特征重新設計算法,才能取得良好的性能。

圖6 Join算法在8核CMP系統上的性能對比
圖7展示了SMP環境下(16核)處理與CMP(8核)環境下相同數據集的各個算法的處理時間。由圖可知,各算法在16核環境下執行時間都有不同程度的減少,但在SMP環境下,改進MapReduce Join算法相對其他兩種算法的性能仍然有很大提升。以實驗數據1 GB的第一組實驗為例,改進的MapReduce Join的執行時間由CMP環境下的0.983 s下降到0.6196 s,相對于標準并行Radix Join和樸素MapReduce Join性能分別提升了26.9%和76.6%。相對CMP環境下提升雖然略有下降,但基本上和CMP(8核)環境下取得了一致的結果。

圖7 Join算法在16核SMP系統上的性能對比
圖8展示了對于同一數據集,各個算法隨著計算核數變化的執行時間變化。該擴展性測試顯示,各個算法隨著核數的增加,執行時間逐漸減少,而本文所提出算法的執行時間隨核數增加而下降最為迅速。因為隨著實際使用核數的增加,將會有更多的線程同時在共享數據的情況下進行數據處理,使得每個Map任務或者Reduce任務處理的數據規模減少。在單核環境下,雖然算法的執行時間稍遜色于標準并行Radix Join,但當核數增多后,由于并行處理數據劃分等原因,改進的MapReduce Join算法表現的性能開始超過標準并行Radix Join。并且最終隨著核數的增加,算法性能的提升呈現保持的趨勢。圖8展示的對于擴展性能的測試結果驗證了改進后的MapReduce Join算法不僅具有高效性,還具有良好的可擴展性。

圖8 Join算法擴展性測試
4結語
本文提出一種新的內存Join算法,且該算法在多核共享內存體系結構下可以取得高效性能。該算法借助MapReduce編程框架,并利用Radix算法的特性,在標準實現上加以改進,解決了傳統并行Join算法單個線程阻塞成為整個任務瓶頸以及缺少容錯性的問題。通過在Map階段劃分后封裝減少中間結果數據規模,解決了因引入MapReduce方式帶來中間結果標記和排序開銷過大的問題,使得算法在具有了MapReduce良好容錯性的同時,具有高效性。新的MapReduce Join算法在多核內存共享環境下,相對于原有算法,在計算性能和良好的可擴展性方面均具有突出的優勢。
參考文獻
[1] Dean J,Ghemawat S.MapReduce:simplified data processing on large clusters[J].Communications of the ACM,2008,51(1):107-113.
[2] Ranger C,Raghuraman R,Penmetsa A,et al.Evaluating MapReduce for multi-core and multiprocessor systems[C]//High Performance Computer Architecture,2007.HPCA 2007.IEEE 13th International Symposium on.IEEE,2007:13-24.
[3] Afrati F N,Ullman J D.Optimizing Joins in a Map-Reduce environment[C]//Proceedings of the 13th International Conference on Extending Database Technology.ACM,2010:99-110.
[4] Blanas S,Patel J M,Ercegovac V,et al.A comparison of Join algorithms for log processing in MapReduce[C]//Proceedings of the 2010 ACM SIGMOD International Conference on Management of data.ACM,2010:975-986.
[5] Afrati F N,Ullman J D.Optimizing multiway Joins in a Map-Reduce environment[J].Knowledge and Data Engineering,IEEE Transactions on,2011,23(9):1282-1298.
[6] Blanas S,Li Y,Patel J M.Design and evaluation of main memory hash Join algorithms for multi-core CPUs[C]//Proceedings of the 2011 ACM SIGMOD International Conference on Management of data.ACM,2011:37-48.
[7] Albutiu M C,Kemper A,Neumann T.Massively parallel sort-merge Joins in main memory multi-core database systems[J].Proceedings of the VLDB Endowment,2012,5(10):1064-1075.
[8] Balkesen C,Teubner J,Alonso G,et al.Main-Memory Hash Joins on Modern Processor Architectures[J].Knowledge and Data Engineering,IEEE Transactions on,2014,26(3):99-113.
[9] Balkesen C,Teubner J,Alonso G,et al.Main-memory hash Joins on multi-core CPUs: Tuning to the underlying hardware[C]//Data Engineering (ICDE),2013 IEEE 29th International Conference on.IEEE,2013:362-373.
[10] Jiang W,Ravi V T,Agrawal G.A map-reduce system with an alternate api for multi-core environments[C]//Proc of the 10th Int Conf on Cluster,Cloud,and Grid Computing.IEEE,2010:84-93.
[11] Jadhav V,Aghav J,Dorwani S.Join Algorithms Using MapReduce:A Survey[C]//International Conference on Electrical Engineering and Computer Science,21st.2013.
[12] Boncz P A,Manegold S,Kersten M L.Database architecture optimized for the new bottleneck:Memory access[C]//VLDB,1999,99:54-65.
收稿日期:2015-02-11。李成,碩士生,主研領域:數據庫優化查詢。許胤龍,教授。郭帆,碩士生。吳思,博士生。
中圖分類號TP3
文獻標識碼A
DOI:10.3969/j.issn.1000-386x.2016.07.059
RESEARCH ON MAPREDUCE-BASED IN-MEMORY PARALLEL JOIN ALGORITHM
Li ChengXu YinlongGuo FanWu Si
(SchoolofComputerScienceandTechnology,UniversityofScienceandTechnologyofChina,Hefei230027,Anhui,China) (TheKeyLaboratoryonHighPerformanceComputing,Hefei230027,Anhui,China)
AbstractTraditional parallel Join algorithms lack the necessary fault tolerance capability, and data partitioning inequality often leads to a single thread obstruction which in turn becomes the bottleneck of the whole task execution. In light of the above problem, this paper dissects the influence of each phase of in-memory join on the performance of Join algorithm, and proposes a dynamic mechanism in which the MapReduce is applicable, thus avoids the problems of traditional parallel Join algorithm implementation in unequal data tasks allocation and fault tolerance. The algorithm uses MapReduce programming framework, and reduces the computational cost of tagging and ranking in execution process of MapReduce Join through encapsulating the blocking tags, this makes the performance of the algorithm improve remarkably. Experimental results show that this algorithm has evident improvement in performance for shared-memory architecture.
KeywordsIn-memory JoinData encapsulationMapReduce