劉 恒,譚 良,2
1(四川師范大學(xué) 計(jì)算機(jī)科學(xué)學(xué)院,成都 610101)
2(中國科學(xué)院 計(jì)算技術(shù)研究所,北京 100190)
隨著數(shù)據(jù)量與日俱增,人們對數(shù)據(jù)處理效率的需求也逐步增加.Spark[1]是繼Hadoop[2]之后被提出基于內(nèi)存計(jì)算的可擴(kuò)展的高性能并行計(jì)算框架.針對Hadoop的不足[3,4],即大量的網(wǎng)絡(luò)傳輸和磁盤I/O使得效率低下,Spark使用內(nèi)存進(jìn)行數(shù)據(jù)計(jì)算,同樣的算法在Spark中的運(yùn)行速度比Hadoop快10倍~100倍[5],而且Spark廣泛的支持了多種的計(jì)算模式,包括批處理、迭代計(jì)算、交互式查詢和流處理.這使得Spark的應(yīng)用越來越廣泛,使用它的公司包括雅虎、百度和騰訊.但Spark性能方面仍需提高,特別是當(dāng)大量任務(wù)集中處理時(shí),有限的內(nèi)存資源會限制Spark系統(tǒng)整體性能的發(fā)揮.緩存替換是合理利用內(nèi)存資源、提高任務(wù)處理性能的關(guān)鍵技術(shù),所以針對緩存替換算法的研究便成為了熱點(diǎn)[6-13].
當(dāng)前Spark緩存替換算法是LRU[14].該算法的缺點(diǎn)是在進(jìn)行緩存替換時(shí),Spark不能預(yù)測RDD對象將來的使用順序,沒有考慮到RDD計(jì)算代價(jià)和大小等影響任務(wù)執(zhí)行效率的重要因素,度量方法不夠準(zhǔn)確.當(dāng)高重用但最近未使用或重新計(jì)算耗費(fèi)極高代價(jià)的RDD的分區(qū)被替換出去,應(yīng)用執(zhí)行的效率會降低.針對LRU在計(jì)算框架中的表現(xiàn)不夠理想,便有了一系列改進(jìn)措施被提出.如使用其他典型的替換算法包括:FIFO、LFU、LRFU、MIN來代替LRU等[9].其中權(quán)重RDD緩存替換算法是最重要的改進(jìn)措施之一.在計(jì)算過程中不同的RDD,由于計(jì)算代價(jià)不同、大小不同,所以對整個(gè)計(jì)算過程的影響便不同.然而當(dāng)前權(quán)重RDD緩存替換算法存在對權(quán)重值的計(jì)算結(jié)果不準(zhǔn)確和考慮因素不全面,對計(jì)算代價(jià)缺乏量化的問題.因此,研究RDD權(quán)重緩存替換具有一定的現(xiàn)實(shí)意義.
針對上述問題,本文提出了一種新的RDD分區(qū)權(quán)重緩存替換算法——WCSRP,WCSRP綜合考慮了RDD的計(jì)算代價(jià)、使用次數(shù)、分區(qū)的大小和生命周期四大因素,并進(jìn)行量化計(jì)算.相比于之前的研究,彌補(bǔ)了對RDD計(jì)算代價(jià)缺乏量化的缺陷,增加考慮了應(yīng)用在Spark中運(yùn)算時(shí)Task的locality level決定的輸入RDD位置,這時(shí)會涉及到網(wǎng)絡(luò)傳輸或磁盤I/O,對應(yīng)用執(zhí)行時(shí)間存在影響,另外相比較于使用RDD的權(quán)重值作為是否替換的度量工具,使用分區(qū)的權(quán)重值更符合Spark底層緩存機(jī)制.這樣使得權(quán)重值的計(jì)算更加準(zhǔn)確,提高了內(nèi)存利用率和作業(yè)執(zhí)行效率.
目前,國內(nèi)外對Spark系統(tǒng)的緩存替換算法做出了不少研究,取得了一些的研究成果.下面我們對這些成果進(jìn)行分析和總結(jié).
對于國外,文獻(xiàn)[15]提出了Tachyon,一種基于內(nèi)存的分布式文件系統(tǒng),能夠充分發(fā)揮分布式集群的內(nèi)存特性,能夠?yàn)镾park提供存儲服務(wù),但在Tachyon的實(shí)現(xiàn)中,替換算法還是采用LRU.文獻(xiàn)[10]提出了在內(nèi)存空間不足的情況下,在內(nèi)存上壓縮數(shù)據(jù)來節(jié)省內(nèi)存空間,然而壓縮數(shù)據(jù)又會帶來CPU資源消耗的問題,在對數(shù)據(jù)集進(jìn)行復(fù)雜處理時(shí)CPU資源也是非常寶貴的.文獻(xiàn)[11]中將Spark任務(wù)中同一個(gè)Stage的RDD看作同一組,當(dāng)緩存已滿或沒有足夠空間時(shí),移除緩存中和待緩存RDD中不屬于同一組的RDD,如果都屬于同一組,則溢出到磁盤進(jìn)行存儲,該方法在對RDD進(jìn)行替換時(shí)僅考慮了一個(gè)Stage,這種粗粒度的緩存機(jī)制存在局限性.文獻(xiàn)[16]提出了AWRP(Adaptive Weight Ranking Policy)算法為每個(gè)緩存對象計(jì)算權(quán)重,并優(yōu)先轉(zhuǎn)換權(quán)重值最低的緩存對象,但權(quán)重值計(jì)算方法沒有對Spark的RDD緩存進(jìn)行針對性的優(yōu)化,并不適用于分布式計(jì)算框架.
對于國內(nèi),文獻(xiàn)[12]提出了使用RDD權(quán)重值來進(jìn)行緩存替換,將RDD的計(jì)算代價(jià)、使用次數(shù)、自身大小和生命周期作為衡量RDD權(quán)重的因素.然而作者并沒有對計(jì)算代價(jià)這一因素提出合理的計(jì)算方法,因此影響了RDD權(quán)重計(jì)算的準(zhǔn)確性.文獻(xiàn)[9]提出了自適應(yīng)緩存管理策略,同樣使用RDD權(quán)重值作為緩存替換的度量工具,借鑒了文獻(xiàn)[12]中影響權(quán)重值的四大因素,提出使用RDD生成時(shí)長來代替計(jì)算代價(jià),并且忽略了和RDD大小有關(guān)的因素.這樣雖然使計(jì)算簡便了許多,然而并沒有解決RDD權(quán)重計(jì)算缺乏準(zhǔn)確性這一缺陷,影響因素考慮不全面.作者在做RDD置換的時(shí)候沒有考慮到Spark系統(tǒng)在進(jìn)行緩存置換的時(shí)候置換的單位是Block,對應(yīng)的便是RDD的一個(gè)分區(qū).文獻(xiàn)[13]分析了Spark計(jì)算框架的內(nèi)存管理和緩存機(jī)制,提出了基于RDD分區(qū)的權(quán)重計(jì)算,并以該權(quán)重值作為緩存置換的標(biāo)準(zhǔn).但是文中僅僅考慮了RDD分區(qū)大小、RDD分區(qū)被使用次數(shù)和RDD分區(qū)存在內(nèi)存中的時(shí)間,并沒有考慮影響一個(gè)RDD的其他因素,比如計(jì)算代價(jià).這樣得到的權(quán)重值是不準(zhǔn)確的,會影響Spark集群的計(jì)算效率.
總結(jié)起來,當(dāng)前針對Spark緩存替換的研究工作還不夠完善,已有的基于權(quán)重的緩存替換算法存在權(quán)重值計(jì)算不準(zhǔn)確,考慮因素不全面,度量方法不夠細(xì)致,影響了緩存的命中率和作業(yè)執(zhí)行的效率.本文結(jié)合現(xiàn)有大數(shù)據(jù)快速實(shí)時(shí)處理的需要,對Spark內(nèi)核中RDD對象的緩存替換進(jìn)行研究分析,提出了一種新的RDD分區(qū)權(quán)重緩存替換算法,包括權(quán)重計(jì)算和緩存替換的改進(jìn)措施.通過細(xì)化分析RDD分區(qū)對象的影響因素,并對這些因素進(jìn)行量化分析,使得 RDD分區(qū)權(quán)重值的計(jì)算更加精確.進(jìn)而在內(nèi)存資源不足的情況下,緩存中的RDD分區(qū)的替換更加合理.弱化了內(nèi)存資源對整體性能的影響,讓集群效能得到了極大的發(fā)揮.
Spark中最重要的抽象概念就是RDD,RDD通常是通過HDFS(或其他Hadoop支持的文件系統(tǒng))上的文件,或者驅(qū)動器中的Scala集合對象,來創(chuàng)建或轉(zhuǎn)換得到的.在Spark中一個(gè)RDD被劃分成若干個(gè)partition,因此對于RDDi可表示為RDDi={p1,p2,p3,…,pn}.用戶可以請求Spark將RDD緩存在內(nèi)存中,當(dāng)RDD被緩存在內(nèi)存中,由于內(nèi)存的容量是有限的,會出現(xiàn)內(nèi)存存儲資源不夠用的情況.

由此,可以得出通過如下公式計(jì)算出RDD分區(qū)的權(quán)重:
w=α0CRDDi+α1FRDDi+α2Sp+α3LTRDDi+α4ILRDDi
(1)
公式(1)中 ,w表示該RDD分區(qū)的權(quán)重值,CRDDi表示該RDD的計(jì)算代價(jià),F(xiàn)RDDi表示該RDD的使用次數(shù),Sp表示該分區(qū)的大小,LTRDDi表示該RDD的生命周期,ILRDDi表示計(jì)算該RDD輸入RDD的位置。A={α0,α1,α2,α3,α4}中的元素是常數(shù),分別是CRDDi、FRDDi、Sp、LTRDDi和ILRDDi的歸一化權(quán)重,權(quán)重值得選取由用戶的具體任務(wù)Task需求決定。
下面,我們將分別對CRDDi、FRDDi、Sp、LTRDDi和ILRDDi的權(quán)重進(jìn)行量化計(jì)算。
在Spark中RDD作為計(jì)算的參與者,在宏觀上,所有的計(jì)算過程都是根據(jù)RDD進(jìn)行的,但從微觀上來看,算子的操作其實(shí)是作用在RDD的不同分區(qū)上.當(dāng)一個(gè)RDD經(jīng)過轉(zhuǎn)化操作派生出新的RDD時(shí),Spark會使用譜系圖(lineage graph)來記錄這些不同RDD之間的依賴關(guān)系,Spark需要用這些信息來按需計(jì)算每個(gè)RDD,也可以依靠譜系圖通過用緩存中的RDD來恢復(fù)下游的RDD,而不需要從頭計(jì)算[17].因此RDD之間的依賴關(guān)系便成了整個(gè)應(yīng)用運(yùn)行的關(guān)鍵.在Spark源碼中Dependency類是依賴關(guān)系的基類,其中NarrowDependency和ShuffleDependency均繼承自該基類.前者表示RDD之間是窄依賴關(guān)系,這個(gè)的RDD會被劃到同一個(gè)Stage中,這樣就可以以管道的方式迭代執(zhí)行;后者表示RDD之間是寬依賴關(guān)系,依賴上游的RDD不止一個(gè),且多個(gè)子分區(qū)會依賴于同一個(gè)父RDD分區(qū).
由于依賴關(guān)系的不同,在計(jì)算一個(gè)RDD的時(shí)候需要的進(jìn)行計(jì)算的次數(shù)也是不同的.如圖1所示.

圖1 窄依賴和寬依賴結(jié)構(gòu)圖Fig.1 Narrow dependency and wide dependency
其中,RDD A和RDD B屬于窄依賴,故RDD A的分區(qū)經(jīng)過轉(zhuǎn)換操作一產(chǎn)生RDD B中的分區(qū).有圖可以看出RDD A和RDD B的分區(qū)數(shù)均是4,故計(jì)算產(chǎn)生RDD B時(shí),需要在4個(gè)分區(qū)上進(jìn)行計(jì)算,我們可以把這個(gè)計(jì)算的過程當(dāng)做RDD B的計(jì)算代價(jià),故RDD B的計(jì)算代價(jià)值為4.也就是說通過窄依賴得到的RDD的計(jì)算代價(jià)就是該RDD的父RDD分區(qū)數(shù).RDD C和RDD D屬于寬依賴,RDD C的每個(gè)分區(qū)都可能被RDD D所使用,RDD D的分區(qū)中的數(shù)據(jù)同樣來自RDD C所有的分區(qū),在進(jìn)行計(jì)算的時(shí)候?qū)DD C的每個(gè)分區(qū)的操作次數(shù)等于RDD D的分區(qū)數(shù).從圖1可以知道RDD C的分區(qū)數(shù)為3,RDD D的分區(qū)數(shù)為3,那么總的計(jì)算次數(shù)便是9,和窄依賴相同我們也可以得到RDD D的計(jì)算代價(jià),所以RDD D的計(jì)算代價(jià)值是9.這樣便得到了通過寬依賴得到的RDD的計(jì)算代價(jià)就是該RDD的分區(qū)數(shù)與父RDD的分區(qū)數(shù)的乘積.
由此,我們給出RDD計(jì)算代價(jià)的計(jì)算公式如下:
(2)
其中,parspRDDi表示屬于寬依賴的父RDD的分區(qū)數(shù),a表示是否存在寬依賴,若不存在寬依賴a=0,若存在a=1,parnpRDDj表示屬于窄依賴的父RDD的分區(qū)數(shù),b表示是否存在窄依賴,若不存在窄依賴b=0,若存在b=1.
由于Spark的懶加載機(jī)制,使得Spark可以在Action算子觸發(fā)SparkContext.runJob之前,程序在運(yùn)行時(shí)并沒有引入數(shù)據(jù)進(jìn)行計(jì)算,此時(shí)系統(tǒng)通過分析代碼的邏輯,初始化通過Transformation算子產(chǎn)生的RDD(此時(shí)這些RDD中并沒有實(shí)際的數(shù)據(jù)).又因?yàn)槊總€(gè)RDD都有其依賴(除了最頂級RDD的依賴是空列表),所以可以確定各個(gè)RDD之間的依賴關(guān)系,然后根據(jù)RDD之間的依賴關(guān)系構(gòu)建成DAG[18]圖,將不同的RDD串聯(lián)起來.在DAG圖中點(diǎn)對應(yīng)RDD,邊對應(yīng)一個(gè)算子.
下面,我們給出一個(gè)PageRank的例子,代碼如圖2所示.

圖2 PageRank的Spark實(shí)現(xiàn)代碼圖Fig.2 Implement PageRank in Spark
分析代碼可知,在調(diào)用saveAsTextFile這個(gè)Action算子之前,所有的Transformation算子都是為了構(gòu)建RDD之間的依賴關(guān)系.而這些依賴關(guān)系也就構(gòu)成了DAG圖,圖3便是PageRank在做兩輪迭代時(shí)的DAG圖.因?yàn)镈AG圖中邊對應(yīng)一個(gè)算子,那么也就是說這時(shí)該RDD被使用了一次,在圖3中我們可以看出links出去有三個(gè)箭頭,那么可以知道此時(shí)links被使用了三次.
所以我們可以通過統(tǒng)計(jì)DAG圖中某個(gè)RDD的出度,便可以得到該RDD的使用次數(shù).具體公式如下:
FRDDi=N
(3)
其中,N表示RDDi的出度.
在Spark中使用BlockManager來緩存RDD數(shù)據(jù),程序內(nèi)部定義了抽象類BlockStore,用于制定所有存儲類型的規(guī)范.目前BlockStore的具體實(shí)現(xiàn)包括MemoryStore、DiskStore和TachyonStore.其中MemoryStore是負(fù)責(zé)將沒有序列化的Java對象或序列化的ByteBuffer存儲到內(nèi)存中.開發(fā)者使用 persist()或者 cache()函數(shù)來標(biāo)記一個(gè) RDD 是持久化的,當(dāng)這個(gè) RDD 在被一個(gè)action觸發(fā)的作業(yè)提交計(jì)算后,它就會緩存在內(nèi)存中.我們都知道,RDD 的運(yùn)算是基于partition,每個(gè)task代表一個(gè)分區(qū)上一個(gè)stage內(nèi)的運(yùn)算閉包,task被分別調(diào)度到多個(gè)executor上去運(yùn)行,運(yùn)算過程中的RDD若需要存儲則會將在該executor上運(yùn)行的partition緩存下來對應(yīng)的就是Block,Spark中對存儲內(nèi)容的讀取就是根據(jù)Block進(jìn)行的.本文討論的是在內(nèi)存中的緩存,故在緩存的時(shí)候會調(diào)用到MemoryStore中的實(shí)現(xiàn)方法putIteratorAsValues或putIteratorAsBytes,這兩個(gè)方法的區(qū)別在于一個(gè)是負(fù)責(zé)嘗試將沒有序列化的Java對象放入內(nèi)存,另一個(gè)是試著將序列化的ByteBuffer放入內(nèi)存,這兩個(gè)均是嘗試待緩存的數(shù)據(jù)大小是否超過當(dāng)前剩余的空閑內(nèi)存,如果未超過則進(jìn)行存儲操作,完成緩存動作.

圖3 PageRank的DAG圖Fig.3 DAG graph of PageRank
因此可以通過監(jiān)聽,獲得準(zhǔn)備緩存的分區(qū)的大小,公式如下:
Sp=sacquireMemory
(4)
其中,sacquireMemory表示該分區(qū)申請內(nèi)存的大小.
RDD的生命周期(LTRDDi)就是該RDD存活的時(shí)間段,如果一個(gè)RDD不會再被使用我們可以看做該RDD已經(jīng)死亡,反之一個(gè)RDD還可以通過轉(zhuǎn)換操作產(chǎn)生新的RDD那么該RDD還處于存活狀態(tài).對于生命周期的計(jì)算可以通過DAG圖結(jié)合應(yīng)用的執(zhí)行過程進(jìn)行分析.Spark在進(jìn)行任務(wù)處理的時(shí)候,計(jì)算框架內(nèi)管理線程級別的Task,在進(jìn)行任務(wù)調(diào)度的時(shí)候,一個(gè)Stage中的Task會被分別調(diào)度到計(jì)算節(jié)點(diǎn).Task在運(yùn)行的時(shí)候可以看做pipeline.一個(gè)RDD由多個(gè)partition組成,每個(gè)partition經(jīng)過Transformation操作產(chǎn)生新的RDD中對應(yīng)的partition,依次進(jìn)行下去直到該Stage結(jié)束.這樣的過程便是一個(gè)Task的運(yùn)算過程,當(dāng)Task在一個(gè)Executor上進(jìn)行運(yùn)算時(shí),它是串行執(zhí)行的.故某個(gè)RDD的生命周期是該RDD的第一個(gè)partition產(chǎn)生后一直到最后一個(gè)使用該RDD所在的Stage完成計(jì)算的這段時(shí)間.如圖4所示.

圖4 Stage中Task的執(zhí)行過程圖Fig.4 Execution of the Task in the Stage
故對于RDDi的生命周期就是RDD生成第一個(gè)分區(qū)的時(shí)間fpTRDDi與最后一次使用該RDD的Stage完成計(jì)算時(shí)間eTstagei之差.我們給出的公式如下:
LTRDDi=eTstagei-fpTRDDi
(5)
在Spark中任務(wù)的處理也要考慮數(shù)據(jù)的本地性,Spark目前支持PROCESS_LOCAL(本地進(jìn)程)、NODE_LOCAL(本地節(jié)點(diǎn))、NO_PREE(沒有喜好)、PACK_LOCAL(本地機(jī)架)、ANY(任何).Spark的設(shè)置中有關(guān)于本地進(jìn)程、本地節(jié)點(diǎn)和本地機(jī)架等的等待時(shí)間.當(dāng)某一個(gè)Task在根據(jù)自己的locality level執(zhí)行時(shí),由于啟動失敗,然后以自己當(dāng)前的locality level等待第二次被調(diào)度,若等待時(shí)間超過了該本地化級別的默認(rèn)等待時(shí)間,則該Task會通過降低自己的locality level來嘗試被再次啟動.我們知道,對于很多Task來說,執(zhí)行時(shí)間往往比網(wǎng)絡(luò)傳輸和磁盤I/O的耗時(shí)要短得多.本文將不同的Locality Level進(jìn)行量化依次是:1、2、3、4、5,隨著數(shù)字的增大代表著Task計(jì)算節(jié)點(diǎn)與Task的輸入數(shù)據(jù)的節(jié)點(diǎn)距離越來越遠(yuǎn),這樣也就產(chǎn)生了網(wǎng)絡(luò)傳輸對Task執(zhí)行時(shí)間的影響,進(jìn)而也影響著RDD的計(jì)算效率.通過在Spark源碼中內(nèi)嵌代碼,統(tǒng)計(jì)RDDi的各個(gè)partition對應(yīng)的Task運(yùn)行的Locality Level,然后進(jìn)行疊加計(jì)算出RDDi的輸入RDD位置值.由于在Task運(yùn)行的過程中,是在一個(gè)executor上進(jìn)行的串行式的計(jì)算,所以除Task起始的那個(gè)RDD其他RDD的輸入位置都是本地進(jìn)程.
由此,我們給出公式如下:
(6)
其中,pLk表示RDDi中的第k個(gè)分區(qū)的輸入節(jié)點(diǎn)的Locality Level.
由公式(1)-公式(6)可得:
(7)
根據(jù)公式(7)我們提出基于WCSRP模型的緩存替換算法.具體操作如算法1所示.
算法1. 基于WCSRP模型的緩存替換算法
輸入: RDD分區(qū)權(quán)重集合 wList
空閑緩存大小 freememory
待緩存RDD分區(qū)的大小 size
待緩存RDD分區(qū)的權(quán)重 weight
輸出:緩存成功 true; 緩存失敗 false
初始化:
//待替換列表
rptlist<-new List
//待替換列表的總大小
rptlistsize = 0
//替換列表
rplist<-new List
for i=0 to wList.length-1 do
if weight > wList[i].weight then
rptlist.add(wList[i]);
rptlistsize+=wList[i].size;
end if
end for
if rptlist.Length==0 then
return false;
end if
if rptlistsize+freememory rptlist.clean(); return false; else rptlist.orderByWeight(); // 重排序 for i=0 to rptlist.Length-1 do rplist.add(rptlist[i]); freememory+=rptlist[i].size; if freememory>size then for j=0 to rplist.Length-1 do //移除替換列表中的Block delete(rplist[j]); end for rplist.clean(); return true; end if end for rptlist.clean(); return false; end if 算法1的具體實(shí)施過程如下: 1.首先,獲取需要緩存的RDD分區(qū)的大小和該RDD分區(qū)的權(quán)重值. 2.然后對緩存中的Block進(jìn)行過濾,將權(quán)重值小于待緩存RDD分區(qū)權(quán)重值的加入待替換列表. 3.若待替換列表為空,則停止替換,不緩存該RDD的分區(qū).若不為空,判斷待替換列表的大小和空閑空間的和是否小于申請大小,若小于則停止替換并清空待替換列表,若大于則將列表按權(quán)重值從小到大的順序排列. 4.遍歷待替換列表中的Block,依次加入替換列表,直到空閑內(nèi)存與替換列表中Block總大小的和大于等于待緩存RDD分區(qū)的大小,則停止遍歷,將替換列表中的Block替換出緩存,將待緩存RDD分區(qū)加入緩存,并清空替換列表和待替換列表. 5.若待替換列表中的Block的總大小與空閑內(nèi)存空間的大小的和小于待緩存RDD分區(qū)的大小,則停止替換,不緩存該RDD的分區(qū),清空待替換列表. 當(dāng)前Spark權(quán)重緩存替換算法,通常涉及RDD計(jì)算代價(jià)、RDD使用頻率、RDD分區(qū)大小和RDD生命周期等關(guān)鍵參數(shù).下面我們將本文提出的WCSRP與文獻(xiàn)[9]提出SACM、文獻(xiàn)[13]提出的DWRP以及Spark平臺現(xiàn)有的置換算法LRU進(jìn)行比較.如表1所示. 表1 相關(guān)緩存置換算法對比Table 1 Comparison of correlation cache replacement algorithms 對于SACM,計(jì)算權(quán)重時(shí)考慮了RDD使用頻率和RDD計(jì)算代價(jià),提供參數(shù)校準(zhǔn).和本文提出的WCSRP相比,考慮的因素不夠全面,另外其在進(jìn)行內(nèi)存置換時(shí)的置換目標(biāo)是RDD,當(dāng)出現(xiàn)內(nèi)存置換時(shí),會影響其他Task的執(zhí)行.因此,在采用本文WCSRP替換算法,不會出現(xiàn)Task運(yùn)行時(shí)所使用的RDD分區(qū)被替換出去需要重新計(jì)算的情況,應(yīng)用的運(yùn)行時(shí)間會比SACM有所降低.當(dāng)運(yùn)行長作業(yè)時(shí),降低的幅度尤為明顯. 對于DWRP,計(jì)算權(quán)重時(shí)考慮的因素有RDD使用頻率、RDD分區(qū)大小和RDD生命周期.和WCSRP相比較,其在進(jìn)行權(quán)重計(jì)算時(shí)僅考慮了三個(gè)因素,而RDD計(jì)算代價(jià)和Locality Level這兩個(gè)因素對應(yīng)用的執(zhí)行時(shí)間有著很大影響,另外權(quán)重計(jì)算公式中沒有提供參數(shù)校準(zhǔn).所以最終計(jì)算得到的權(quán)重值沒有本文提供的公式計(jì)算的權(quán)重值精確,在進(jìn)行內(nèi)存置換時(shí)對Spark作業(yè)執(zhí)行效率的提升沒有本文WCSRP明顯. 最后,對于Spark自帶的LRU置換算法,在進(jìn)行內(nèi)存置換時(shí)忽略了RDD分區(qū)的差異性,僅考慮當(dāng)前內(nèi)存中的RDD分區(qū)的訪問順序,而當(dāng)高重用但最近未使用或重新計(jì)算耗費(fèi)極高代價(jià)的RDD的分區(qū)被替換出去,應(yīng)用執(zhí)行的耗時(shí)增加,整個(gè)框架作業(yè)運(yùn)行效率會降低.而本文的WCSRP替換算法綜合考慮了影響RDD分區(qū)的多個(gè)因素,使得在進(jìn)行替換時(shí)不是盲目的僅根據(jù)當(dāng)前的訪問順序.這樣可以省去不必要的重新計(jì)算所花費(fèi)的時(shí)間,提高應(yīng)用的運(yùn)行效率. 為了實(shí)驗(yàn)驗(yàn)證WCSRP,我們在Spark平臺上實(shí)現(xiàn)了WCSRP,WCSRP各因素初始的權(quán)重值是A={а0,а1,а2,а3,а4}={0.3,0.2,0.2,0.2,0.1}.為了與本文提出的WCSRP形成對照,我們選取Spark自帶的緩存替換算法LRU來作對比. 實(shí)驗(yàn)在一臺服務(wù)器DELL PowerEdge T620(Intel Xeon E5-2620*2/32G/1T)上進(jìn)行,操作系統(tǒng)使用Debian 8.7,在服務(wù)器上虛擬出了4臺主機(jī),利用這四臺主機(jī)搭建了Spark集群.其中一個(gè)主機(jī)作為Spark的Master節(jié)點(diǎn),其他三個(gè)作為worker節(jié)點(diǎn).應(yīng)用提交后的運(yùn)行時(shí)間可以通過Spark的日志文件得到.實(shí)驗(yàn)數(shù)據(jù)選擇由SNAP提供的標(biāo)準(zhǔn)數(shù)據(jù)集Amazon0601,實(shí)驗(yàn)是測試使用不同的緩存替換機(jī)制,在不同迭代次數(shù)下PageRank算法的執(zhí)行時(shí)間.選擇PageRank算法來進(jìn)行實(shí)驗(yàn)的原因是因?yàn)镻ageRank算法是典型的數(shù)據(jù)密集型算法,會涉及到多次迭代,當(dāng)使用緩存后會有效的提升計(jì)算的效率. 首先測試PageRank算法使用Spark默認(rèn)的緩存替換算法,記錄不同的迭代次數(shù)進(jìn)行實(shí)驗(yàn)的結(jié)果,每個(gè)迭代次數(shù)進(jìn)行5次實(shí)驗(yàn),分別記錄下執(zhí)行的時(shí)間,然后得出平均值便是該迭代次數(shù)的執(zhí)行時(shí)間.然后使用新的權(quán)重緩存替換算法進(jìn)行同樣的實(shí)驗(yàn),對記錄的數(shù)據(jù)求平均值.實(shí)驗(yàn)結(jié)果如表2所示. 表2 LRU和WCSRP對比試驗(yàn)的統(tǒng)計(jì)結(jié)果Table 2 Result of Comparative test between LRU and WCSRP 單位:秒(S) 通過上表,便可得到圖5. 圖5 PageRank使用不同緩存替換算法對比試驗(yàn)Fig.5 PageRank uses different cache replacement algorithms to experiment 實(shí)驗(yàn)結(jié)果顯示,在相同的迭代次數(shù)使用不同的緩存替換算法的情況下,PageRank在Spark框架中的執(zhí)行時(shí)間是不同的.盡管WCSRP和LRU均隨著迭代次數(shù)的增加,執(zhí)行時(shí)間也在增加.但是使用WCSRP算法作為緩存替換算法時(shí)應(yīng)用的執(zhí)行時(shí)間明顯比使用LRU算法作為緩存替換算法時(shí)有所降低.因?yàn)樵谑褂肔RU算法時(shí),僅僅只是考慮內(nèi)存中的Block被訪問的時(shí)間,長期未被訪問的便會被置換出去,它忽略了該Block對整個(gè)應(yīng)用的價(jià)值.而在使用WCSRP算法時(shí)我們不僅綜合考慮RDD的計(jì)算代價(jià)、使用次數(shù)、分區(qū)的大小和生命周期四大因素對權(quán)重的影響,而且還增加考慮了Task執(zhí)行時(shí)locality level這個(gè)因素,準(zhǔn)確計(jì)算出了緩存中的每個(gè)Block的權(quán)重,將對該應(yīng)用執(zhí)行最有價(jià)值的Block繼續(xù)緩存在內(nèi)存中,防止在需要的時(shí)候進(jìn)行重復(fù)計(jì)算,使得有限的內(nèi)存資源得到了充分的利用,減少了重復(fù)計(jì)算的出現(xiàn),也便提高了整個(gè)應(yīng)用執(zhí)行的效率.所以才出現(xiàn)了實(shí)驗(yàn)結(jié)果中的執(zhí)行時(shí)間減少的現(xiàn)象. 本文提出了新的權(quán)重緩存替換算法,綜合考慮了影響RDD緩存的各大因素,改進(jìn)了之前提出的RDD權(quán)重緩存替換策略,用細(xì)粒度的RDD分區(qū)權(quán)重值來作為替換的度量標(biāo)準(zhǔn),改進(jìn)了各影響因素的量化方式.通過這種方法,提高了Spark框架應(yīng)用執(zhí)行的效率,減少了因內(nèi)存異常導(dǎo)致任務(wù)執(zhí)行失敗的情況出現(xiàn).理論和實(shí)驗(yàn)結(jié)果均證明了新的權(quán)重緩存替換算法對Spark框架有著明顯的優(yōu)化作用.5 理論分析與實(shí)驗(yàn)評價(jià)
5.1 理論比較分析

5.2 實(shí)驗(yàn)評價(jià)


6 總 結(jié)