丁夢(mèng)蘇,陳世敏
(計(jì)算機(jī)體系結(jié)構(gòu)國(guó)家重點(diǎn)實(shí)驗(yàn)室(中國(guó)科學(xué)院計(jì)算技術(shù)研究所),北京 100190)
(*通信作者電子郵箱chensm@ict.ac.cn)
輕量級(jí)大數(shù)據(jù)運(yùn)算系統(tǒng)Helius
丁夢(mèng)蘇,陳世敏*
(計(jì)算機(jī)體系結(jié)構(gòu)國(guó)家重點(diǎn)實(shí)驗(yàn)室(中國(guó)科學(xué)院計(jì)算技術(shù)研究所),北京 100190)
(*通信作者電子郵箱chensm@ict.ac.cn)
針對(duì)Spark數(shù)據(jù)集不可變,以及Java虛擬機(jī)(JVM)依賴環(huán)境引起的代碼執(zhí)行、內(nèi)存管理、數(shù)據(jù)序列化/反序列化等開銷過(guò)多的不足,采用C/C++語(yǔ)言,設(shè)計(jì)并實(shí)現(xiàn)了一種輕量級(jí)的大數(shù)據(jù)運(yùn)算系統(tǒng)——Helius。Helius支持Spark的基本操作,同時(shí)允許數(shù)據(jù)集整體修改;同時(shí),Helius利用C/C++優(yōu)化內(nèi)存管理和網(wǎng)絡(luò)傳輸,并采用stateless worker機(jī)制簡(jiǎn)化分布式計(jì)算平臺(tái)的容錯(cuò)恢復(fù)過(guò)程。實(shí)驗(yàn)結(jié)果顯示:5次迭代中,Helius運(yùn)行PageRank算法的時(shí)間僅為Spark的25.12%~53.14%,運(yùn)行TPCH Q6的時(shí)間僅為Spark的57.37%;在PageRank迭代1次的基礎(chǔ)上,運(yùn)行在Helius系統(tǒng)下時(shí),master節(jié)點(diǎn)IP接收和發(fā)送數(shù)據(jù)量約為運(yùn)行于Spark系統(tǒng)的40%和15%,而且200 s的運(yùn)行過(guò)程中,Helius占用的總內(nèi)存約為Spark的25%。實(shí)驗(yàn)結(jié)果與分析表明,與Spark相比,Helius具有節(jié)約內(nèi)存、不需要序列化和反序列化、減少網(wǎng)絡(luò)交互以及容錯(cuò)簡(jiǎn)單等優(yōu)點(diǎn)。
內(nèi)存計(jì)算;大數(shù)據(jù)運(yùn)算;分布式計(jì)算;有向無(wú)環(huán)圖調(diào)度;容錯(cuò)恢復(fù)
在科學(xué)研究和產(chǎn)業(yè)實(shí)踐中,MapReduce[1]集群編程模型已經(jīng)廣泛應(yīng)用于大規(guī)模數(shù)據(jù)處理。MapReduce系統(tǒng)把用戶編制的串行Map和Reduce程序自動(dòng)地分布并行執(zhí)行,在每次運(yùn)算前,系統(tǒng)需要從分布式文件系統(tǒng)中讀取輸入數(shù)據(jù),運(yùn)算完成后,系統(tǒng)要將計(jì)算結(jié)果寫入分布式文件系統(tǒng)中。如此一來(lái),多個(gè)MapReduce運(yùn)算之間只能通過(guò)分布式文件系統(tǒng)才能共享數(shù)據(jù),這不僅產(chǎn)生了大量的中間文件,而且反復(fù)讀寫磁盤大幅降低了運(yùn)算性能。隨著內(nèi)存容量指數(shù)級(jí)增長(zhǎng)和單位內(nèi)存價(jià)格不斷下降,大容量?jī)?nèi)存正成為服務(wù)器的標(biāo)準(zhǔn)配置,于是內(nèi)存計(jì)算逐漸被主流商用系統(tǒng)和開源工具所接受。以內(nèi)存計(jì)算為核心思想的Spark[2-4]在性能上遠(yuǎn)超基于MapReduce的Hadoop[5]:迭代計(jì)算性能和數(shù)據(jù)分析性能分別可以提高20倍和40倍。Spark在保持MapReduce自動(dòng)容錯(cuò)、位置感知調(diào)度、可擴(kuò)展性等優(yōu)點(diǎn)的同時(shí),高效地支持多個(gè)運(yùn)算通過(guò)內(nèi)存重用中間結(jié)果,從而避免了外存訪問(wèn)的開銷。Spark的基本數(shù)據(jù)模型是彈性分布式數(shù)據(jù)集(Resilient Distributed Dataset, RDD)[3]。一個(gè)RDD是一個(gè)只讀的數(shù)據(jù)集合,生成之后不能修改。RDD支持粗粒度的運(yùn)算,即集合中的每個(gè)數(shù)據(jù)元素都進(jìn)行統(tǒng)一的運(yùn)算。RDD可以劃分為分區(qū)分布在多個(gè)機(jī)器節(jié)點(diǎn)上,所以一個(gè)運(yùn)算可以在多個(gè)節(jié)點(diǎn)上分布式地執(zhí)行。Spark通過(guò)記錄計(jì)算間的沿襲(Lineage)以支持容錯(cuò),當(dāng)出現(xiàn)故障導(dǎo)致RDD分區(qū)丟失時(shí),Spark根據(jù)記錄的計(jì)算沿襲,重新計(jì)算并重建丟失的分區(qū)。
然而,Spark的設(shè)計(jì)和實(shí)現(xiàn)存在著一定的局限性。首先, RDD被設(shè)計(jì)成只讀的數(shù)據(jù)集,既不支持重寫,也不支持?jǐn)?shù)據(jù)追加。于是,Spark需要為每個(gè)新創(chuàng)建的RDD分配內(nèi)存空間,尤其在迭代計(jì)算時(shí),每個(gè)循環(huán)都產(chǎn)生一組新的RDD,這加大了內(nèi)存開銷。其次,Spark采用Scala程序設(shè)計(jì)語(yǔ)言實(shí)現(xiàn),在Java虛擬機(jī)(Java Virtual Machine, JVM)[6]上運(yùn)行,繼承了Java的一系列問(wèn)題。程序編譯后生成字節(jié)碼,執(zhí)行時(shí)再由JVM解釋執(zhí)行或進(jìn)行即時(shí)(Just-In-Time, JIT)編譯成為機(jī)器碼。內(nèi)存管理無(wú)法主動(dòng)釋放內(nèi)存,必須由JVM的垃圾回收機(jī)制才能釋放內(nèi)存。數(shù)據(jù)傳輸時(shí),需要經(jīng)歷數(shù)據(jù)的序列化和反序列化,不僅增加了轉(zhuǎn)換的計(jì)算代價(jià),而且序列化的數(shù)據(jù)通常增加了類型等信息,引起網(wǎng)絡(luò)傳輸數(shù)據(jù)量的增加。這些問(wèn)題在一定程度上限制了系統(tǒng)的性能。
為此,用C/C++語(yǔ)言設(shè)計(jì)并實(shí)現(xiàn)了一種輕量級(jí)的大數(shù)據(jù)運(yùn)算系統(tǒng)——Helius。Helius采用了一種類似于RDD的數(shù)據(jù)模型,稱為BPD(Bulk Parallel Dataset)。BPD與RDD的區(qū)別在于BPD可寫,RDD只可讀。用戶可以選擇重寫B(tài)PD,系統(tǒng)無(wú)須重新分配內(nèi)存,而是直接覆蓋原來(lái)的區(qū)域。這樣不僅節(jié)省了內(nèi)存開銷,而且提高了運(yùn)算性能。BPD在多個(gè)計(jì)算間提供了一種高效的共享方式,計(jì)算結(jié)果存入內(nèi)存,其他計(jì)算通過(guò)直接訪問(wèn)內(nèi)存快速地獲取輸入。
與Spark相同,Helius也采用master-worker分布式架構(gòu),通過(guò)記錄各個(gè)BPD操作之間的計(jì)算沿襲構(gòu)建依賴關(guān)系,動(dòng)態(tài)生成計(jì)算的有向無(wú)環(huán)圖(Directed Acyclic Graph, DAG)[7],劃分計(jì)算階段,在每個(gè)階段中多個(gè)計(jì)算任務(wù)并行執(zhí)行。Helius支持Spark的各項(xiàng)計(jì)算、自動(dòng)容錯(cuò)、感知調(diào)度和可擴(kuò)展性。相對(duì)Spark而言,Helius的優(yōu)勢(shì)具體如下:
1)降低內(nèi)存開銷。Helius采用C/C++實(shí)現(xiàn),程序運(yùn)行時(shí)能夠?qū)崟r(shí)回收內(nèi)存;此外,BPD的可變性支持系統(tǒng)在計(jì)算過(guò)程中充分利用已有的內(nèi)存空間,減少了不必要的內(nèi)存開銷。
2)不需要序列化和反序列化。數(shù)據(jù)在系統(tǒng)中以二進(jìn)制字節(jié)的方式存儲(chǔ),當(dāng)集群節(jié)點(diǎn)都是x86機(jī)器時(shí),網(wǎng)絡(luò)傳輸時(shí)可以直接發(fā)送二進(jìn)制數(shù)據(jù),不需要進(jìn)行Endian轉(zhuǎn)換和序列化/反序列化。
3)減少網(wǎng)絡(luò)交互。Helius使用一種類似于push的方式傳遞數(shù)據(jù),master直接操控?cái)?shù)據(jù)的傳輸,worker之間不需要互相發(fā)送請(qǐng)求,從而減少了網(wǎng)絡(luò)請(qǐng)求的交互。
4)簡(jiǎn)化容錯(cuò)恢復(fù)。Helius應(yīng)用了一種stateless worker的思想,worker遵循網(wǎng)絡(luò)請(qǐng)求進(jìn)行工作,請(qǐng)求包含計(jì)算所需的狀態(tài)信息,而worker除BPD數(shù)據(jù)分區(qū)外,不保存計(jì)算狀態(tài)。這樣,系統(tǒng)將多點(diǎn)故障集中到了對(duì)單點(diǎn)master的故障處理。
1.1 BPD數(shù)據(jù)模型
類似于Spark系統(tǒng)中的RDD,BPD是一種分布式的數(shù)據(jù)集合,可以劃分為多個(gè)數(shù)據(jù)分區(qū),存放在多個(gè)worker節(jié)點(diǎn)上。BPD支持粗粒度的運(yùn)算,集合中的每個(gè)數(shù)據(jù)元素都進(jìn)行統(tǒng)一的操作。這樣,不同worker節(jié)點(diǎn)上的BPD分區(qū)可以并行執(zhí)行相同的運(yùn)算。
與RDD不同,BPD是可變數(shù)據(jù)集。考慮一個(gè)簡(jiǎn)單的例子,對(duì)所有數(shù)據(jù)元素自增,因RDD只讀性的限制,Spark需要分配內(nèi)存空間,為這個(gè)操作創(chuàng)建一個(gè)新的RDD;而Helius避免了額外的空間開銷,新的結(jié)果可以直接填充覆蓋原始的數(shù)據(jù)集。
BPD遵循一套嚴(yán)格且靈活的可變機(jī)制。嚴(yán)格性是系統(tǒng)層考慮的問(wèn)題,體現(xiàn)在只有用戶計(jì)算產(chǎn)生的BPD可變,并且要求計(jì)算過(guò)程中新產(chǎn)生的數(shù)據(jù)元素占用的內(nèi)存空間維持不變。Helius針對(duì)少部分遵循可變機(jī)制的用戶計(jì)算函數(shù)(UDFListCombine函數(shù))實(shí)現(xiàn)更新接口,其他函數(shù)不提供BPD更新支持。靈活性針對(duì)用戶層而言,用戶調(diào)用支持BPD更新的函數(shù)(如UDFListCombine)時(shí),可以通過(guò)設(shè)置函數(shù)參數(shù)(真或假)指示該BPD是否在該運(yùn)算中更新。若不更新,系統(tǒng)將新創(chuàng)建一個(gè)BPD;若更新,新結(jié)果將覆蓋待計(jì)算的BPD,無(wú)需重新分配空間。對(duì)于一系列不改變數(shù)據(jù)結(jié)構(gòu)的操作而言,系統(tǒng)只需覆蓋相應(yīng)數(shù)值,在處理大量的數(shù)據(jù)時(shí)節(jié)省了時(shí)空開銷。
RDD的只讀性簡(jiǎn)化了數(shù)據(jù)一致性的實(shí)現(xiàn)。在Helius中則需要考慮如何保持BPD的一致性。與Spark相似,在Helius中,用戶提供一個(gè)主驅(qū)動(dòng)程序,BPD體現(xiàn)為程序中的特殊變量,變量之間的運(yùn)算對(duì)應(yīng)于BPD分布式運(yùn)算。Helius的master節(jié)點(diǎn)加載主驅(qū)動(dòng)程序,按照?qǐng)?zhí)行步驟,執(zhí)行相應(yīng)的分布式運(yùn)算。從概念上看,雖然BPD運(yùn)算是分布并行的,但是這個(gè)主驅(qū)動(dòng)程序?qū)嶋H上是一個(gè)串行程序(當(dāng)然可以包含循環(huán)、分支等控制流語(yǔ)句),它描述了BPD運(yùn)算步驟之間的串行執(zhí)行順序和依賴關(guān)系。所以,單一的主驅(qū)動(dòng)程序可以保證BPD數(shù)據(jù)的一致性。對(duì)于多個(gè)并發(fā)執(zhí)行的主驅(qū)動(dòng)程序,Helius禁止發(fā)生修改的BPD在多個(gè)并發(fā)程序之間共享,只有當(dāng)進(jìn)行修改操作的程序執(zhí)行完畢后,被修改的BPD才可以被其他程序所使用。
具體實(shí)現(xiàn)時(shí),master記錄BPD的元數(shù)據(jù),主要包含依賴關(guān)系、分區(qū)數(shù)據(jù)、分區(qū)劃分信息和存儲(chǔ)方式。依賴關(guān)系記錄了父子BPD之間的轉(zhuǎn)換關(guān)系,分區(qū)數(shù)據(jù)記錄了子分區(qū)與一個(gè)或多個(gè)父分區(qū)之間的生成規(guī)則。一個(gè)BPD的多個(gè)分區(qū)大小可以不等,存儲(chǔ)在worker節(jié)點(diǎn)上。worker把內(nèi)存劃分為等長(zhǎng)的數(shù)據(jù)塊,一個(gè)BPD分區(qū)由一個(gè)或多個(gè)數(shù)據(jù)塊組成,這些數(shù)據(jù)塊分布在內(nèi)存或文件中,具體的存儲(chǔ)位置由BPD的存儲(chǔ)方式?jīng)Q定。用戶可調(diào)用系統(tǒng)接口選擇數(shù)據(jù)的存儲(chǔ)方式。BPD可以按照用戶指定的劃分方式重新哈希散列成指定個(gè)數(shù)的分區(qū)。
1.2 BPD記錄數(shù)據(jù)類型
Helius系統(tǒng)將BPD按二進(jìn)制數(shù)據(jù)進(jìn)行存儲(chǔ)和處理。一個(gè)BPD數(shù)據(jù)集中的所有記錄都具有相同的結(jié)構(gòu),可以是鍵值對(duì)Key-Value元組,也可以是無(wú)key或是無(wú)value的單個(gè)元素。對(duì)于key或是value,它的結(jié)構(gòu)可以是定長(zhǎng)或變長(zhǎng)的數(shù)據(jù),可以表達(dá)C/C++中的原子類型(數(shù)值類型、字符串類型等)、struct和class數(shù)據(jù)(內(nèi)部不允許指針、沒有虛函數(shù))。key或value也可以進(jìn)一步有內(nèi)部嵌套結(jié)構(gòu),可以嵌套包含兩個(gè)值或是多個(gè)值。嵌套主要發(fā)生在Join等操作的結(jié)果BPD上。系統(tǒng)提供方法獲取BPD記錄的key或value的二進(jìn)制數(shù)據(jù),二進(jìn)制數(shù)據(jù)與正確的數(shù)值類型的轉(zhuǎn)換依賴于用戶的代碼。通常在C/C++程序中,只需要對(duì)相應(yīng)類型的指針變量賦值即可,不需要額外的轉(zhuǎn)換和拷貝。
1.3 編程模型
用戶將C/C++的主驅(qū)動(dòng)程序編譯成動(dòng)態(tài)庫(kù)后提交給master,與此同時(shí)指定master的運(yùn)行入口函數(shù)。master解析庫(kù)文件,依次執(zhí)行函數(shù)體內(nèi)的語(yǔ)句。在主驅(qū)動(dòng)程序中,一個(gè)BPD表現(xiàn)為一個(gè)可操作的C++對(duì)象。各種計(jì)算通過(guò)調(diào)用該對(duì)象相應(yīng)的方法而實(shí)現(xiàn),計(jì)算可以生成新的BPD對(duì)象,或者修改已有的BPD對(duì)象。而這些BPD對(duì)象上的操作,就被Helius對(duì)應(yīng)為對(duì)BPD多個(gè)分區(qū)上的分布式運(yùn)算。
Helius提供兩大類計(jì)算,用以處理BPD數(shù)據(jù)集:系統(tǒng)計(jì)算和用戶計(jì)算。
1)系統(tǒng)計(jì)算:完全由系統(tǒng)實(shí)現(xiàn)的計(jì)算,包括union、cartesianProduct、partitionBy、join、groupBy等,用戶可以直接調(diào)用系統(tǒng)計(jì)算函數(shù)處理BPD數(shù)據(jù)集,這些計(jì)算都不改變輸入的BPD數(shù)據(jù)集。
union(A,B) →A∪BcartesianProduct({
2)用戶計(jì)算:系統(tǒng)提供應(yīng)用程序編程接口(ApplicationProgrammingInterface,API),由用戶實(shí)現(xiàn)具體操作功能。用戶在處理數(shù)據(jù)集之前需要根據(jù)API實(shí)現(xiàn)函數(shù)接口。在用戶計(jì)算中,用戶可以選擇是否改變待計(jì)算的BPD數(shù)據(jù)集。
A=udfCompute(B,udf)
A=udfComputeMulti(B,C, …,udf)
A=udfListCombine(B,udf)
系統(tǒng)計(jì)算函數(shù)的語(yǔ)義很清晰。例如:join操作把兩組輸入BPD的Key-Value記錄,按照key進(jìn)行等值連接,輸出記錄的value部分是嵌套結(jié)構(gòu),由兩個(gè)匹配記錄的value部分組合形成;groupBy操作按照key進(jìn)行分組,把同一組的所有value表示成一個(gè)list,即一個(gè)包含多值的嵌套結(jié)構(gòu)。而用戶計(jì)算接口主要包括三類,都要求用戶提供一個(gè)根據(jù)相應(yīng)接口實(shí)現(xiàn)的函數(shù)(在表中以u(píng)df表示)。首先,udfCompute方法針對(duì)單個(gè)BPD數(shù)據(jù)集的每條Key-Value記錄進(jìn)行處理,例如可以實(shí)現(xiàn)WordCount中單詞的拆分。udfComputeMulti方法對(duì)多個(gè)BPD數(shù)據(jù)集的Key-Value記錄進(jìn)行某種運(yùn)算。實(shí)際上,多個(gè)輸入的BPD數(shù)據(jù)集進(jìn)行了一次join操作,系統(tǒng)對(duì)每個(gè)join的結(jié)果調(diào)用一次用戶實(shí)現(xiàn)的udfComputeMulti函數(shù)。這兩類操作對(duì)數(shù)據(jù)集的結(jié)構(gòu)沒有要求,可為1.2節(jié)提及的任意一種存在形式。udfListCombine與udfCompute的處理對(duì)象類似,不同的是數(shù)據(jù)集key、value必須同時(shí)存在,并且value為包含多值的嵌套結(jié)構(gòu)。它實(shí)現(xiàn)對(duì)每個(gè)key的多個(gè)value值進(jìn)行聚合的操作,類似MapReduce系統(tǒng)中的Reduce操作。
1.4 實(shí)例介紹
以WordCount為例,統(tǒng)計(jì)文本中所有單詞出現(xiàn)的次數(shù),用戶的主驅(qū)動(dòng)程序如下:
BPD*lines=sc.loadFile(file);BPD*words=udfCompute(lines,newmySplit());BPD*wordgroup=words->groupBy();BPD*wordcount=udfListCombine(wordgroup,newmyCombine());
loadFile函數(shù)用于從文本生成一個(gè)BPD對(duì)象——lines,它的每個(gè)記錄是一行文本。udfCompute函數(shù)調(diào)用用戶自定義的mySplit函數(shù)對(duì)每行文本記錄進(jìn)行處理,在該示例中表現(xiàn)為將字符串拆分成多個(gè)單詞,產(chǎn)生的words結(jié)果記錄中key為單詞,value為數(shù)值1。groupBy函數(shù)將Key-Value數(shù)據(jù)集按key進(jìn)行分組。在這里,每個(gè)不同的單詞為一組。最后,udfListCombine函數(shù)調(diào)用用戶自定義的myCombine函數(shù)對(duì)同一key的所有value進(jìn)行某種運(yùn)算(在該示例中為求和)。
下面以u(píng)dfListCombine函數(shù)為例,介紹udf函數(shù)的實(shí)現(xiàn)。用戶自定義實(shí)現(xiàn)的函數(shù)如下:
classmyCombine:publicUDFListCombine{voidcall(ValueIterator*it,Value*out){intsum=0;while(it->hasNext()){int*val=(int*)(it->next());sum+=*val;
}
out->put(&sum,sizeof(int));
}};
用戶實(shí)現(xiàn)了一個(gè)myCombine類,它繼承了UDFListCombine類,實(shí)現(xiàn)UDFListCombine中的虛函數(shù)call()。call()的第一個(gè)輸入?yún)?shù)是一個(gè)定義在輸入BPD記錄value列表上的Iterator迭代器。上述實(shí)現(xiàn)在while循環(huán)中通過(guò)這個(gè)Iterator依次訪問(wèn)列表中的每個(gè)value,把value的地址賦值給相應(yīng)類型的指針,就可以直接操作。call()的第二個(gè)參數(shù)用于輸出結(jié)果的BPD的value部分。在這里,把求和的結(jié)果寫入out。
從上面的示例可見,用戶可以使用C/C++程序簡(jiǎn)潔地表達(dá)大數(shù)據(jù)的運(yùn)算。
Helius分布式運(yùn)行的基礎(chǔ)是表達(dá)BPD運(yùn)算關(guān)系的DAG。用戶的主驅(qū)動(dòng)程序提交給master執(zhí)行時(shí),系統(tǒng)通過(guò)BPD變量獲取具體運(yùn)算及依賴關(guān)系,形成運(yùn)算DAG。然后,Helius把一個(gè)DAG劃分成多個(gè)階段,每個(gè)階段內(nèi)部的運(yùn)算可以在一起執(zhí)行,從而減少中間結(jié)果的生成。一個(gè)階段的輸出結(jié)果為另一個(gè)階段的輸入。其中,最后一個(gè)階段的輸入來(lái)自原始數(shù)據(jù)源(例如文件),第一個(gè)階段的計(jì)算結(jié)果是程序最終的輸出結(jié)果。按照這種層次依賴關(guān)系,系統(tǒng)自上而下檢查各個(gè)階段(首先檢查第一個(gè)階段),當(dāng)前階段運(yùn)行時(shí)將自動(dòng)檢測(cè)其依賴的其他階段,若其他階段準(zhǔn)備就緒,則提交該階段的任務(wù);否則,迭代檢查依賴的所有階段,直至所有依賴階段準(zhǔn)備就緒后提交。每個(gè)階段包含了一系列任務(wù),系統(tǒng)將這些任務(wù)分配到最佳節(jié)點(diǎn)位置,并確保所有數(shù)據(jù)就緒。
2.1 DAG的生成及階段的創(chuàng)建
在Helius系統(tǒng)中,DAG的生成過(guò)程以及階段的創(chuàng)建過(guò)程與Spark系統(tǒng)類似,都是根據(jù)用戶的主驅(qū)動(dòng)程序進(jìn)行的。用戶主驅(qū)動(dòng)程序執(zhí)行時(shí),系統(tǒng)先記錄BPD的運(yùn)算和依賴關(guān)系,并不立即執(zhí)行所對(duì)應(yīng)的分布式運(yùn)算,只有當(dāng)遇到lookup、collect和程序結(jié)束時(shí),才執(zhí)行之前記錄的所有BPD運(yùn)算。
與Spark不同的是,Helius將數(shù)據(jù)的shuffle操作單獨(dú)抽取出來(lái),顯示地表達(dá)在DAG中,而非表示在其他的操作里。這樣,DAG可以記錄shuffle的狀態(tài)信息,而不需要每個(gè)worker在實(shí)現(xiàn)BPD運(yùn)算(例如groupBy)時(shí),記錄shuffle的狀態(tài)信息。
記錄的BPD形成了一個(gè)運(yùn)算有向無(wú)環(huán)圖(DAG),如圖1所示。圖的每個(gè)頂點(diǎn)是一個(gè)BPD或者BPD的版本(若被修改),頂點(diǎn)之間的有向邊代表BPD運(yùn)算的生成關(guān)系。有向邊從輸入BPD指向結(jié)果BPD。
圖1中每個(gè)頂點(diǎn)代表一個(gè)BPD,其中BPD1和BPD2的union操作生成BPD3,BPD3的groupBy操作產(chǎn)生BPD4,BPD4是最終的計(jì)算目標(biāo)。系統(tǒng)在執(zhí)行用戶主驅(qū)動(dòng)程序時(shí),記錄BPD的運(yùn)算和依賴關(guān)系。在這個(gè)例子中,當(dāng)程序結(jié)束時(shí),才生成DAG開始分布式計(jì)算。需要注意,圖1中BPD3和BPD4之間的邊是虛線,實(shí)際上DAG中刪除了這條邊。這也正體現(xiàn)了Helius與Spark的不同點(diǎn)。因?yàn)間roupBy操作隱含地需要shuffle數(shù)據(jù),系統(tǒng)自動(dòng)生成了BPD5(圖中深色填充表示),并修改了圖,使BPD3的輸出指向BPD5,BPD5的輸出指向BPD4。

圖1 DAG生成過(guò)程
階段的創(chuàng)建由目標(biāo)頂點(diǎn)和shuffle操作確定。在圖1所示的DAG基礎(chǔ)上,master開始自下而上創(chuàng)建階段,如圖2所示。master首先為目標(biāo)頂點(diǎn)BPD4創(chuàng)建一個(gè)階段(記為階段0),并從該位置開始迭代遍歷其父BPD。檢測(cè)發(fā)現(xiàn)BPD4依賴于shuffle的結(jié)果,而shuffle必然需要網(wǎng)絡(luò)傳輸,所以master以shuffle對(duì)應(yīng)的BPD5為目的創(chuàng)建一個(gè)新的階段(記為階段1)。依此類推,master將DAG以shuffle為邊界分為多個(gè)階段,每個(gè)階段內(nèi)的BPD運(yùn)算可以整合在一起執(zhí)行,以提高運(yùn)算的性能。

圖2 階段創(chuàng)建過(guò)程
所有階段創(chuàng)建完畢后,master從上向下依次遞歸提交階段:在嘗試提交階段0,master檢測(cè)到該階段依賴于階段1,于是master掛起階段0重新提交階段1;由于階段1無(wú)依賴階段,因此階段1順利被提交;master開始提交階段1對(duì)應(yīng)的所有任務(wù),階段1完成后遞歸提交階段0;目標(biāo)階段完成后,結(jié)束調(diào)度。
2.2 任務(wù)提交
當(dāng)一個(gè)階段成功提交后,master將為該階段的目標(biāo)BPD創(chuàng)建并提交任務(wù)。BPD的每個(gè)分區(qū)作為一個(gè)任務(wù),各個(gè)分區(qū)獨(dú)立地執(zhí)行相同的計(jì)算,這使得多個(gè)任務(wù)可以在多個(gè)worker節(jié)點(diǎn)上并行執(zhí)行。
在分布式運(yùn)算環(huán)境下,基于位置感知分配任務(wù)到存儲(chǔ)數(shù)據(jù)的節(jié)點(diǎn)會(huì)大幅提高運(yùn)算的性能,減小網(wǎng)絡(luò)傳輸?shù)膸挕elius提供位置感知調(diào)度。在DAG的基礎(chǔ)上,master進(jìn)一步確定父子BPD每個(gè)分區(qū)之間的映射關(guān)系(shuffle過(guò)程除外)。在分配任務(wù)時(shí),遞歸計(jì)算該任務(wù)所在的分區(qū)依賴的父分區(qū)的位置,直到找到已經(jīng)緩存的父分區(qū)后,將該任務(wù)發(fā)送到該父分區(qū)所在的worker節(jié)點(diǎn),完成位置感知調(diào)度。
如果一個(gè)任務(wù)同時(shí)依賴兩個(gè)父分區(qū),并且兩個(gè)父分區(qū)均已緩存時(shí),那么默認(rèn)將該任務(wù)分配到第一個(gè)依賴的父分區(qū)上。當(dāng)兩個(gè)父分區(qū)的數(shù)據(jù)在不同節(jié)點(diǎn)上,并且第二個(gè)父分區(qū)的數(shù)據(jù)量遠(yuǎn)大于第一個(gè)父分區(qū)時(shí),將任務(wù)分發(fā)給第一個(gè)父分區(qū)所處的工作節(jié)點(diǎn)會(huì)增加網(wǎng)絡(luò)開銷,降低系統(tǒng)性能。一種優(yōu)化方法是根據(jù)多個(gè)依賴的父分區(qū)的數(shù)據(jù)量確定最佳分配節(jié)點(diǎn)。
2.3 數(shù)據(jù)傳輸
當(dāng)一個(gè)任務(wù)依賴多個(gè)數(shù)據(jù)源(多個(gè)父BPD分區(qū)),并且多個(gè)數(shù)據(jù)源不在同一工作節(jié)點(diǎn)時(shí),worker節(jié)點(diǎn)需要獲得所有的輸入數(shù)據(jù),才能開始計(jì)算任務(wù)。
Spark提供一種類似pull的獲取方式,如圖3所示。workerA向master請(qǐng)求數(shù)據(jù),master定位數(shù)據(jù)所在的workerB,由workerB將數(shù)據(jù)發(fā)送給workerA。workerA完成任務(wù)后回答master。

圖3 Spark 數(shù)據(jù)傳輸機(jī)制
值得注意的是,在Spark系統(tǒng)中,對(duì)依賴數(shù)據(jù)的獲取是計(jì)算任務(wù)的一部分,worker在運(yùn)行提交的任務(wù)時(shí),可能需要遠(yuǎn)程獲取數(shù)據(jù)。在發(fā)送消息1和消息4之間,workerA需要保持相應(yīng)的狀態(tài)信息,使worker的工作和故障處理變得相對(duì)復(fù)雜。
Helius提出一種statelessworker的機(jī)制,對(duì)數(shù)據(jù)的獲取過(guò)程類似于push。在該機(jī)制下,worker不負(fù)責(zé)獲取數(shù)據(jù),而是由master指示其進(jìn)行操作。worker對(duì)于每個(gè)網(wǎng)絡(luò)請(qǐng)求,只完成相應(yīng)的操作,而在網(wǎng)絡(luò)請(qǐng)求之間,不記錄額外的狀態(tài)。如果一個(gè)任務(wù)所需數(shù)據(jù)在本節(jié)點(diǎn)不存在時(shí),向master報(bào)錯(cuò)。
將提交任務(wù)分成兩個(gè)步驟:傳輸數(shù)據(jù)和提交作業(yè)。數(shù)據(jù)傳輸?shù)倪^(guò)程如圖4所示:master告訴workerB傳輸數(shù)據(jù)給workerA,workerB傳輸指定的數(shù)據(jù),workerA接收完數(shù)據(jù)后回復(fù)master傳輸完成;master接收到傳輸完畢信號(hào)后,緊接著提交作業(yè)。這樣一來(lái),系統(tǒng)保證了在分配工作之前,工作節(jié)點(diǎn)有需要的數(shù)據(jù)支持工作的進(jìn)行,同時(shí)worker不需要保持額外的狀態(tài)。這種statelessworker的機(jī)制簡(jiǎn)化了系統(tǒng)的容錯(cuò)處理,由于worker嚴(yán)格地按照master的指示工作,worker的工作機(jī)制相對(duì)來(lái)說(shuō)簡(jiǎn)單了許多,在該點(diǎn)的故障及故障處理隨之簡(jiǎn)化。系統(tǒng)將故障處理主要集中在master執(zhí)行。

圖4 Helius數(shù)據(jù)傳輸機(jī)制
2.4 數(shù)據(jù)重組
不同于數(shù)據(jù)傳輸(transfer)操作,數(shù)據(jù)重組(shuffle)操作需要將數(shù)組重組分發(fā)到所有的工作節(jié)點(diǎn),可能會(huì)占用大量的內(nèi)存空間和網(wǎng)絡(luò)帶寬。
為了減少shuffle對(duì)系統(tǒng)性能的影響,采用一種基于雙緩沖的邊計(jì)算邊發(fā)送的策略。worker為每個(gè)shuffle目標(biāo)worker節(jié)點(diǎn)都維持著一個(gè)緩沖區(qū),包含2個(gè)數(shù)據(jù)塊空間(分區(qū)數(shù)據(jù)由多個(gè)等長(zhǎng)數(shù)據(jù)塊組成)。在處理shuffle時(shí),將數(shù)據(jù)寫入相應(yīng)worker的緩沖區(qū)的數(shù)據(jù)塊中。當(dāng)緩沖區(qū)中一個(gè)數(shù)據(jù)塊已滿,可以發(fā)送這個(gè)數(shù)據(jù)塊,同時(shí)將數(shù)據(jù)寫入另一個(gè)數(shù)據(jù)塊。
圖5呈現(xiàn)的是針對(duì)workerA單方面shuffle產(chǎn)生的數(shù)據(jù)發(fā)送的過(guò)程。圖中的連線表示worker之間的連接狀態(tài),填充灰色部分代表該部分內(nèi)存已滿。

圖5 數(shù)據(jù)shuffle過(guò)程
3.1 實(shí)驗(yàn)環(huán)境
集群環(huán)境由5臺(tái)服務(wù)器組成,其中1臺(tái)master,4臺(tái)worker,服務(wù)器的處理器為IntelXeonCPUES- 2650v2 @2.60GHz×8, 內(nèi)存128GB, 硬盤1TB,操作系統(tǒng)為Ubuntu14.04 64位。集群中的工作節(jié)點(diǎn)均單線程運(yùn)行。Helius和Spark的實(shí)驗(yàn)版本分別為0.0.1 和1.6.1。Helius編譯器為G++ 4.8.1, -o2選項(xiàng)優(yōu)化,Spark編譯器為Sbt0.13.12。
實(shí)驗(yàn)以PageRank[8-9]算法和TPCH[10]基準(zhǔn)為例,從時(shí)間、網(wǎng)絡(luò)、內(nèi)存三方面開銷比較Helius和Spark的性能,并在最后對(duì)BPD的更新性能以及Helius的可擴(kuò)展性進(jìn)行評(píng)估。
3.2 PageRank
實(shí)驗(yàn)輸入文本為1.1GB, 包含網(wǎng)頁(yè)4 847 570個(gè),鏈接記錄68 993 773條。Spark集群運(yùn)行PageRank算法的配置選項(xiàng)為:spark.driver.memory=16g,spark.executor.memory=16g。
3.2.1 時(shí)間開銷
運(yùn)行PageRank算法時(shí),分別記錄迭代1、2、3、4、5次的時(shí)間開銷。表1呈現(xiàn)的實(shí)現(xiàn)結(jié)果表明,在迭代5次的過(guò)程中,Helius運(yùn)行PageRank算法的時(shí)間僅為Spark的25.12%~53.14%。因?yàn)镠elius在實(shí)現(xiàn)PageRank算法時(shí),采用的是一種建立在數(shù)據(jù)塊內(nèi)有序、塊間無(wú)序的基礎(chǔ)上優(yōu)化join操作的策略,在每次更新rank值時(shí)直接重寫舊值,而非重新創(chuàng)建新的BPD。

表1 Helius和Spark迭代時(shí)間對(duì)比
3.2.2 網(wǎng)絡(luò)開銷
在PageRank迭代1次的基礎(chǔ)上,記錄master節(jié)點(diǎn)在程序運(yùn)行過(guò)程中接收到的字節(jié)數(shù)和發(fā)送的字節(jié)數(shù),結(jié)果如表2所示。在分布式環(huán)境中,運(yùn)行在Helius系統(tǒng)下時(shí),master節(jié)點(diǎn)IP接收和發(fā)送數(shù)據(jù)量約為運(yùn)行于Spark系統(tǒng)的40%和15%。

表2 Helius和Spark網(wǎng)絡(luò)開銷對(duì)比
3.2.3 內(nèi)存開銷
在PageRank迭代1次的基礎(chǔ)上, 每隔5s記錄worker節(jié)點(diǎn)內(nèi)存剩余情況。表3呈現(xiàn)的是以20s為間隔記錄的worker節(jié)點(diǎn)使用的內(nèi)存量(單位:MB)。Helius在50s左右運(yùn)行結(jié)束,逐漸回收內(nèi)存;此時(shí),Spark仍處于工作狀態(tài),直到210s左右結(jié)束。在worker運(yùn)行的過(guò)程中,Helius占用內(nèi)存6 758MB,Spark占用內(nèi)存26 648MB,Helius約為Spark的25%。

表3 Helius和Spark內(nèi)存開銷對(duì)比
3.3 TPCH Q6性能
以TPCH的ForecastingRevenueChangeQuery(Q6)為例,取ScaleFactor為100(文本79.8GB),測(cè)試Helius和Spark的運(yùn)行時(shí)間。Spark在該例中為默認(rèn)配置。實(shí)驗(yàn)結(jié)果為Helius花費(fèi)271.595s,Spark花費(fèi)473.382s,Helius消耗時(shí)間僅為Spark的57.37%。
Helius從文本獲取輸入數(shù)據(jù)是一種篩選-丟棄的過(guò)程,根據(jù)用戶提供的查詢字段的列值,在讀取文本記錄時(shí)選取相應(yīng)的字段值構(gòu)成數(shù)據(jù)集,后續(xù)所有操作都建立在已篩選字段的數(shù)據(jù)集的基礎(chǔ)上;而Spark程序在加載文件時(shí)沒有對(duì)字段進(jìn)行篩選,運(yùn)行過(guò)程中,所有的數(shù)據(jù)集中的每條記錄都保持了輸入文本的所有字段。
3.4 BPD更新性能
在PageRank迭代1次的基礎(chǔ)上,測(cè)試在BPD更新與不更新的情況下,worker運(yùn)行UDFListCombine函數(shù)的開銷時(shí)間,以及master運(yùn)行用戶提交的驅(qū)動(dòng)程序所用的總時(shí)間。PageRank在迭代1次的基礎(chǔ)上會(huì)運(yùn)行1次UDFListCombine函數(shù)。
從表4可以看出,在BPD更新的情況下,worker運(yùn)行UDFListCombine的速度比不更新稍快;master運(yùn)行整個(gè)程序也稍快。就表4的結(jié)果而言,BPD更新在運(yùn)行時(shí)間方面的性能提升不大,這種結(jié)果很大程度上受到Helius實(shí)現(xiàn)的限制,我們將在后續(xù)的工作中進(jìn)一步研究BPD的更新。

表4 有否BPD更新時(shí)運(yùn)行時(shí)間對(duì)比
3.5 可擴(kuò)展性
以3.3節(jié)中的TPCHQ6為例,測(cè)試Helius集群分別搭建在2、4、6、8臺(tái)worker的運(yùn)行時(shí)間,結(jié)果如表5所示。在當(dāng)前實(shí)驗(yàn)條件考慮的擴(kuò)展情況下,當(dāng)worker節(jié)點(diǎn)數(shù)增加1倍時(shí),Helius運(yùn)行任務(wù)所需的時(shí)間減少50%左右。

表5 Helius可擴(kuò)展性性能
本文介紹了一種輕量級(jí)的基于內(nèi)存計(jì)算的大數(shù)據(jù)運(yùn)算系統(tǒng)Helius。Helius由C/C++語(yǔ)言實(shí)現(xiàn),避免了Spark因JVM運(yùn)行環(huán)境引起的開銷,利用數(shù)據(jù)集整體修改這一特性實(shí)現(xiàn)高效計(jì)算,采用一種statelessworker的機(jī)制簡(jiǎn)化容錯(cuò)處理,并通過(guò)維持一套嚴(yán)格的修改機(jī)制確保了數(shù)據(jù)一致性。Helius在時(shí)間、網(wǎng)絡(luò)、內(nèi)存三方面性能相對(duì)Spark均有所提升。就數(shù)據(jù)集更新性能而言,Helius存在很大的提升空間。此外,目前Helius還未實(shí)現(xiàn)節(jié)點(diǎn)故障恢復(fù),故障處理以及深層次的一致性管理問(wèn)題有待后續(xù)深入研究。
)
[1]DEANJ,GHEMAWATS.MapReduce:simplifieddataprocessingonlargecluster[J].CommunicationoftheACM— 50thAnniversaryIssue: 1958-2008, 2008, 51(1): 107-113.
[2]ZAHARIAM.Anarchitectureforfastandgeneraldataprocessingonlargeclusters,UCB/EECS- 2014- 12 [R].Berkeley:UniversityofCaliforniaatBerkeley, 2014.
[3]ZAHARIAM,CHOWDHURYM,DAST,etal.Resilientdistributeddatasets:afault-tolerantabstractionforin-memoryclustercomputing[C]//NSDI’12:Proceedingsofthe9thUSENIXConferenceonNetworkedSystemsDesignandImplementation.Berkeley,CA:USENIXAssociation, 2012: 15-28.
[4]TheApacheSoftwareFoundation.ApacheSpark[EB/OL].[2016- 05- 30].http://spark.apache.org/.
[5]TheApacheSoftwareFoundation.ApacheHadoop[EB/OL].[2016- 05- 30].http://hadoop.apache.org/.
[6]SARIMBEKOVA,STADLERL,BULEJL,etal.WorkloadcharacterizationofJVMlanguages[J].Software:PracticeandExperience, 2016, 46(8): 1053-1089.
[7]ISARDM,BUDIUM,YUY,etal.Dryad:distributeddata-parallelprogramsforsequentialbuildingblocks[C]//EuroSys’07:Proceedingsofthe2ndACMSIGOPS/EuroSysEuropeanConferenceonComputerSystems2007.NewYork:ACM, 2007: 59-72.
[8]BERKHIUTJ.Google’sPageRankalgorithmforrankingnodesingeneralnetworks[C]//Proceedingsofthe2016 13thInternationalWorkshoponDiscreteEventSystems.Piscataway,NJ:IEEE, 2016: 163-172.
[9]PAGEL,BRINS,MOTWANIR,etal.ThePageRankcitationranking:bringingordertotheWeb,TechnicalReport1999- 66 [R/OL].California:StanfordUniversity, 1999 [2016- 04- 11].http://ilpubs.stanford.edu:8090/422/1/1999- 66.pdf.
[10]TransactionProcessingPerformanceCouncil.TPCBenchmarkTMHStandardSpecificationRevision2.17.1 [S/OL].[2016- 05- 30].http://www.tpc.org/tpc_documents_current_versions/pdf/tpc-h_v2.17.1.pdf.
[11]MALEWIEZG,AUSTEMMH,BIKAJC,etal.Pregel:asystemforlarge-scalegraphprocessing[C]//SIGMOD’10:Proceedingsofthe2010ACMSIGMODInternationalConferenceonManagementofData.NewYork:ACM, 2010: 135-146.
[12]CARSTOIUD,LEPADATUE,GASPARM.Hbase-non-SQLdatabase,performancesevaluation[J].InternationalJournalofAdvancementsinComputingTechnology, 2010: 2(5): 42-52.
ThisworkispartiallysupportedbytheCASHundredTalentsProgram,theGeneralProjectoftheNationalNaturalScienceFoundationofChina(61572468),theInnovativeCommunityProjectoftheNationalNaturalScienceFoundationofChina(61521092).
DING Mengsu, born in 1993, M.S.candidate.Her research interests include big data processing, parallel distributed computing.
CHEN Shimin, born in 1973, Ph.D., professor.His research interests include data management system, big data processing, computer architecture.
Helius: a lightweight big data processing system
DING Mengsu, CHEN Shimin*
(KeyLaboratoryofComputerSystemandArchitecture(InstituteofComputingTechnology,ChineseAcademyofSciences),Beijing100190,China)
Concerning the limitations of Spark, including immutable datasets and significant costs of code execution, memory management and data serialization/deserialization caused by running environment of Java Virtual Machine (JVM), a light-weight big data processing system, named Helius, was implemented in C/C++.Helius supports the basic operations of Spark, while allowing the data set to be modified as a whole.In Helius, the C/C++ is utilized to optimize the memory management and network communication, and a stateless worker mechanism is utilized to simplify the fault tolerance and recovery process of the distributed computing platform.The experimental results showed that in 5 iterations, the running time in Helius was only 25.12% to 53.14% of that in Spark when running PageRank iterative jobs, and the running time in Helius was only 57.37% of that in Spark when processing TPCH Q6.On the basis of one iteration of PageRank, the IP incoming and outcoming data sizes of master node in Helius were about 40% and 15% of those in Sparks, and the total memory consumed in the worker node in Helius was only 25% of that in Spark.Compared with Spark, Helius has the advantages of saving memory, eliminating the need for serialization and deserialization, reducing network interaction and simplifying fault tolerance.
in-memory computation; big data processing; distributed computation; Directed Acyclic Graph (DAG) scheduling; fault tolerance and recovery
2016- 08- 12;
2016- 10- 22。
中國(guó)科學(xué)院“百人計(jì)劃”項(xiàng)目;國(guó)家自然科學(xué)基金面上項(xiàng)目(61572468);國(guó)家自然科學(xué)基金創(chuàng)新群體項(xiàng)目(61521092)。
丁夢(mèng)蘇(1993—),女,江西吉安人,碩士研究生,主要研究方向:大數(shù)據(jù)處理、并行分布式計(jì)算; 陳世敏(1973—),男,北京人,研究員,博士,主要研究方向:數(shù)據(jù)管理系統(tǒng)、大數(shù)據(jù)處理、計(jì)算機(jī)體系結(jié)構(gòu)。
1001- 9081(2017)02- 0305- 06
10.11772/j.issn.1001- 9081.2017.02.0305
TP311.133.1
A