馬在營 劉建新 徐彬
摘要摘要:MapReduce的優勢集中體現在并行計算上,而在迭代計算上則存在諸多不足??蒲腥藛T不斷對MapReduce并行計算模型進行迭代計算優化,使MapReduce可以支持顯示迭代式計算。介紹了傳統MapReduce框架與迭代式MapReduce框架,通過K-means算法測試了iMapReduce、Hadoop MapReduce的迭代性能,給出了實驗結果及分析。
關鍵詞關鍵詞:迭代式計算;MapReduce;Haloop;Hadoop;iMapReduce
DOIDOI:10.11907/rjdk.162775
中圖分類號:TP301
文獻標識碼:A文章編號文章編號:16727800(2017)005020703
0引言
在全球信息產業高速發展融合的背景下,網絡數據資源規模急劇膨脹,尤其是高能物理、互聯網應用、基因工程、電子商務以及計算機仿真等領域的數據量攀升速度驚人,現有數據分析工具很難滿足日益增長的海量密集型數據的信息處理需求?;诖?,Google實驗室專門設計了MapReduce編程模型,該模型在數據信息的批量處理上優勢明顯,但在迭代處理上問題也相當突出。逐次逼近是迭代計算的基本思想,迭代計算過程為先取近似值,然后采用遞推公式對近似值反復校正,直至精度達到要求為止。當數據量較小時,可以在單機上進行迭代計算,但當數據量非常大時,迭代處理則極其耗時。在數據挖掘、信息檢索、機器學習等領域,有很多算法需要迭代,傳統的迭代計算不能有效應對當前的大數據處理需求。經過幾年時間,科研工作者對MapReduce進行迭代計算改進,產生了一些迭代式計算框架,包括比較知名的Twister、Haloop等。
1傳統MapReduce模型框架
Google于2004年在論文《分布式計算:基于大型集群的數據簡化處理》中首次提出MapReduce框架,指出MapReduce框架在處理密集型應用數據過程中將處理程序簡化抽象為Map與Reduce兩個階段,用戶在進行分布式程序設計過程中,只需調用reduce()和map()兩個函數即可,無需過多考慮任務調度、設備通信、數據分片以及容錯等細節問題。在MapReduce框架內,這些問題都能得到很好的處理。作為MapReduce框架的主要思想,Map與Reduce來自函數編程語言,其原理如圖1所示。Map將數據打散,Reduce則負責聚集這些數據。在Map環節,maptask每讀取一個block,即采用map()函數對數據進行處理,同時將處理結果寫入本地磁盤;在Reduce環節,每個reduce task在對Map Task節點數據進行遠程讀取過程中,都會采用reduce()函數處理數據,同時將處理完成的數據寫入分布式文件系統內。在MapReduce框架內,用戶只要通過map和reduce端口,即可快速計算TB級數據,如數據挖掘和日志分析等常見應用。此外,MapReduce框架還適用于圓周率等科學數據的計算。
通過上述分析可知,在傳統MapReduce框架內,無論是map環節,還是reduce環節,數據處理結果均需寫入磁盤內,雖然這一過程會降低系統性能,但是能夠提高系統可靠性。也正因如此,傳統MapReduce在迭代計算處理上問題突出,若用戶強行在傳統MapReduce上進行迭代計算,系統性能則會變得非常差。
2迭代式MapReduce框架
2.1Twister
在Twister內,大文件不會被自動切割為單個block,所以用戶必須提前將文件分割為小文件才能進行task處理。在map環節,調用map()函數處理后,數據結果存儲于分布式內存之中,然后利用broker network將數據推送至reduce task(若Twister內存較大,則所有中間數據都能存儲其中);在reduce環節,通過combine對reducetask產生的結果進行歸并,此時用戶可依據條件作出是否結束迭代計算的決定。合并后的數據被分解傳送至map task,進行新一輪迭代計算。為有效提高系統容錯性,Twister會定時將reducetask和maptask產生的結果錄入磁盤內,避免task失敗后數據丟失。即使task失敗,依然可以從磁盤內調取數據重新進行迭代處理。Twister架構如圖2所示。
為避免用戶在迭代計算中對task的重建,Twister建立了一個taskpool,當用戶需要使用task時,可直接從pool中讀取。在Twister內,所有數據與消息均由broker network傳遞,brokernetwork是獨立模塊,現階段僅支持ActiveMQ和NaradaBroking。目前Twister尚屬于研究性項目,其設計策略決定了Twister還不能在實際中得到廣泛應用。例如Twister將數據存儲于分布式內存中,而缺乏分布式文件系統,僅提供tool對文件進行訪問與存儲,并存在迭代計算模型不夠抽象、支持應用類型偏少等問題。
2.2Haloop
Haloop是Haloop Mapreduce 的修改版,Haloop不僅擴展了分布迭代編程,并通過使用任務調度器loopaware添加各種緩存機制,大大提高了效率。其架構如圖3所示。
圖3有3個工作在運行,工作1、工作2、工作3。每個工作都有3個任務同時從slave節點上運行。為了適應迭代計算,Haloop以Hadoop為基礎作了一些調整:①Haloop提供了一個新的應用程序用戶編程接口,簡化了迭代表達式的表達;②Haloop的master節點包含一個新的loop控制模塊,可以指定迭代停止條件;③Haloop為迭代計算使用一個新的任務調度器,可以將數據本地化;④Haloop的緩存和索引應用程序在slave節點上。
(1)在Haloop內迭代式任務全部被抽象為:
Ro代表初始輸入,Ri代表第i次迭代的結果,L是迭代計算中保持不變的數據。Haloop主要編程接口為:
SetFixedPointThreshold:設置好迭代計算的結束條件,即距離差的閾值。
Set the number of iterations:設置迭代次數。
Setiterationinput:設置迭代變化輸入數據。
AddinvariantTable:設置不變的輸入數據。
(2) Loop-aware 任務調度。 Haloop 在初次迭代計算中會將不變的輸入數據存儲于計算節點中,以便后續的task調度,且數據應當盡量存儲于locality等固定節點中,使每次迭代計算時無需重新傳輸數據。
(3)Index與Cache。無論是map task的輸入還是輸出,reducetask的輸出都會通過緩存與建索引來加快迭代計算速度。其中,緩存主要指迭代計算結果被寫入磁盤以供循環迭代使用。
總體來看,與Twister相比,Haloop更為抽象,能夠支持多種計算。此外,Haloop是在Hadoop基礎上優化改進而成,因此具備Hadoop的優點。
2.3iMapReduce
張巖峰在論文《iMapReduce:分布式計算框架迭代計算》中提出iMapReduce概念,iMapReduce是以盡量減少系統開銷為目的而構建的高效迭代計算框架。MapReduce需要一系列的迭代作業來處理迭代計算,圖4左圖是MapReduce迭代處理過程。在iMapReduce中,迭代任務只發生在初始階段和結束階段,用戶僅需建立一個作業任務即能完成迭代計算,避免了對系統的反復開銷。通過對本地靜態數據的維護,減少因反復加載數據而產生的系統開銷,同時允許在單次迭代計算中異步執行map任務。iMapReduce極大地提高了迭代算法性能。圖4展示了iMapReduce的迭代計算過程。iMapReduce為用戶提供編程的API接口。iMapReduce兼容Hadoop MapReduce任務,用戶可根據實際決定是否啟用iMapReduce功能。iMapReduce將靜態數據和迭代數據區分開。圖5是iMapReduce迭代計算節點數據流,在初始環節中,靜態數據與迭代數據被加載至本地文件系統,其中靜態數據始終存儲于本地文件系統內,而迭代數據在調用map()函數處理前,需要同靜態數據聯合執行操作,即將靜態數據與迭代數據連接起來,作為map()函數輸入。
通過這些迭代優化,使iMapReduce更加適合大數據迭代處理,并且提供了編程接口API,兼容HadoopMapreduce任務,用戶可選擇是否啟用iMapReduce的迭代處理功能。
3MapReduce迭代計算迭代性能測試
3.1實驗設計
集群環境由5臺計算機組成(1個Master節點,4個Slave節點),系統為64位Windows7,處理器為i7-4790 3.60GHz,內存8GB,網速10Mb/s。
K-means 算法是聚類分析中使用最廣泛的算法之一,可用于測試不同平臺的迭代性能。已知觀測集(x1,x2,x3,...,xn),其中每個觀測都是一個d維實向量,k-平均聚類要將這n個觀測劃分到k個集合中(k≤n),使組內平方和(WCSS,Within-Cluster Sum of Squares)最小。換言之,它的目標是找到使式(1)滿足的聚類Si,其中μi是Si中所有點的均值。簡單描述為:不斷迭代計算各個數據簇的中心點,直到該中心點趨于穩定。
argminS=∑i=k∑x∈Six-μi2(1)
K-means步驟如下:①創建k個數據簇的中心點;②計算所有數據點到k個中心點的距離,將其劃歸到距離自己最近的中心點;③根據上次聚類結果,計算各個數據簇的算數平均值作為新的數據簇中心點;④將所有數據在新中心點上重新聚類;⑤重復第4步,直到中心點趨于穩定。
通過測試K-means算法在Hadoop Mapreduce、iMapReduce的運行情況,統計出不同平臺運行K-means的時間。
3.2實驗結果
從表1中可以看出,iMapReduce運行時間遠遠短于Hadoop MapReduce,大約是Hadoop MapReduce運行時間的25%~39%,這是由于Hadoop不支持迭代計算,花了大量時間在數據通信上,從而大大降低了執行速度。而iMapReduce通過避免傳輸靜態數據而減少了通信開銷,并在一次迭代內允許異步執行,因此對于迭代應用有良好的性能表現。
4結語
迭代式計算是大數據領域中一種重要的計算方法,Twister和Haloop的模型抽象度不夠高,支持的計算有限,iMapReduce有效提升了大數據迭代計算的性能。目前MapReduce迭代式計算還處于發展階段,在以后的研究中,如何優化提高分布式迭代計算的作業調度速度是亟待解決的問題。
參考文獻參考文獻:
[1]ZHANG Y, GAO Q, GAO L, et al. iMapReduce: a distributed computing framework for iterative computation[J]. Journal of Grid Computing, 2012, 10(1):11121121.
[2]EKANAYAKE J, LI H, ZHANG B, et al. Twister: a runtime for iterative MapReduce[C].ACM International Symposium on High Performance Distributed Computing,2010:810818
[3]Bu Y, Howe B, Balazinska M, et al. Haloop: efficient iterative data processing on large clusters[J]. Proceedings of the Vldb Endowment, 2010, 3(12):285296.
[4]張巖峰.云環境下大數據迭代計算研究[J].沈陽:東北大學,2012.
[5]易修文, 李天瑞, 張鈞波,等. 不同MapReduce運行系統的性能測試與分析[J].計算機科學, 2015,42(5):2427.
[6]董的博客. 傳統MapReduce框架介紹[EB/OL]. http://dongxicheng.org/mapreduce/traditionalmapreduceframework/.
[7]Kmeans算法的Hadoop實現[EB/OL].http://blog.csdn.net/chaaaa_wangyc/article/details/53426612?locationNum=9&fps=1.
責任編輯(責任編輯:黃健)