劉思宇+梁毅+陳誠(chéng)
摘要:Spark是一種新型分布式海量數(shù)據(jù)處理平臺(tái),在應(yīng)用執(zhí)行過(guò)程中,Spark以任務(wù)作為最小執(zhí)行單元。因此,任務(wù)執(zhí)行時(shí)間預(yù)測(cè)是指導(dǎo)Spark進(jìn)行性能分析、優(yōu)化資源調(diào)度以及故障監(jiān)控的基礎(chǔ)。在Spark平臺(tái)中,由于計(jì)算數(shù)據(jù)分布不均及網(wǎng)絡(luò)資源的共享,導(dǎo)致同樣計(jì)算邏輯的任務(wù)在不同計(jì)算節(jié)點(diǎn)上執(zhí)行的時(shí)間可能產(chǎn)生很大差異,需根據(jù)實(shí)時(shí)運(yùn)行環(huán)境進(jìn)行動(dòng)態(tài)預(yù)測(cè)。通過(guò)結(jié)合任務(wù)在不同節(jié)點(diǎn)所需數(shù)據(jù)量以及集群網(wǎng)絡(luò)狀況,對(duì)任務(wù)在不同節(jié)點(diǎn)的執(zhí)行時(shí)間進(jìn)行預(yù)測(cè)。實(shí)驗(yàn)表明,該方法對(duì)任務(wù)進(jìn)行預(yù)估,誤差可保證在19%以內(nèi),任務(wù)執(zhí)行時(shí)間預(yù)估算法對(duì)Spark調(diào)優(yōu)有一定的指導(dǎo)作用。
關(guān)鍵詞:大數(shù)據(jù);Spark;預(yù)測(cè);分布式;任務(wù)
DOIDOI:10.11907/rjdk.171509
中圖分類(lèi)號(hào):TP306
文獻(xiàn)標(biāo)識(shí)碼:A 文章編號(hào):1672-7800(2017)012-0019-03
Abstract:Spark is a new distributed big data processing platform. In the implementation of Spark, task is the minimum execution unit. Therefore, the prediction of the execution time of the task can guide Spark to perform performance analysis, optimize resource scheduling and fault monitoring. In Spark platform, due to the uneven distribution of computing data and the sharing of network resources, the task with same computing logic may have different execution time in different nodes, and it needs to be dynamically predicted according to the real time environment. Currently on the Spark platform, the prediction technology is rarely studied. This paper predicts the execution time of each task at different nodes by combing the amount of data required by different tasks in each node and the status of cluster network. Experiment show that the method can be used to estimate the tasks in the task set, the error can guarantee less than 19%. Therefore, the task execution time estimation algorithm proposed in this paper has some guiding effect on Spark tuning.
Key Words:big data; Spark; prediction; distributed; task
0 引言
大數(shù)據(jù)時(shí)代,新型海量數(shù)據(jù)處理平臺(tái)大量涌現(xiàn)。其中,以Spark數(shù)據(jù)處理平臺(tái)為典型代表的分布式內(nèi)存計(jì)算平臺(tái)得到廣泛關(guān)注 [1]。Spark是繼Hadoop之后提出的一種基于內(nèi)存的分布式大數(shù)據(jù)處理平臺(tái),被譽(yù)為可以取代Map/Reduce的下一代大數(shù)據(jù)處理核心技術(shù)[2-3]。與Hadoop Map/Reduce相比,Spark基于內(nèi)存的運(yùn)算可提升100倍處理速度[4]。
任務(wù)是Spark的最小執(zhí)行單元。由于不同任務(wù)所需數(shù)據(jù)可能存在于集群各節(jié)點(diǎn)上,且數(shù)據(jù)量不盡相同,導(dǎo)致同樣計(jì)算邏輯任務(wù)在不同的計(jì)算節(jié)點(diǎn)上執(zhí)行的時(shí)間產(chǎn)生很大差異,需要根據(jù)應(yīng)用運(yùn)行的實(shí)時(shí)環(huán)境進(jìn)行動(dòng)態(tài)預(yù)測(cè)[5]。對(duì)任務(wù)執(zhí)行時(shí)間的有效預(yù)測(cè)可以指導(dǎo)Spark進(jìn)行性能分析、優(yōu)化資源調(diào)度以及監(jiān)控平臺(tái)故障。目前,在Spark平臺(tái)還沒(méi)有任務(wù)執(zhí)行時(shí)間預(yù)估技術(shù)。
本文分析了Spark平臺(tái)任務(wù)的拉取、執(zhí)行過(guò)程,結(jié)合各任務(wù)在不同節(jié)點(diǎn)所需的數(shù)據(jù)量以及集群網(wǎng)絡(luò)狀況,對(duì)各任務(wù)執(zhí)行時(shí)間進(jìn)行預(yù)測(cè),為優(yōu)化資源調(diào)度、應(yīng)用性能分析奠定基礎(chǔ)。
1 相關(guān)技術(shù)
1.1 Spark系統(tǒng)模型
Spark 是由 UC Berkeley AMP 實(shí)驗(yàn)室開(kāi)發(fā)的開(kāi)源通用的海量數(shù)據(jù)處理平臺(tái),是對(duì)Map/Reduce型海量數(shù)據(jù)處理平臺(tái)的創(chuàng)新與豐富,其構(gòu)架如圖1所示。
Spark平臺(tái)采用Master/Slave結(jié)構(gòu),其中集群管理器作為Master端,負(fù)責(zé)平臺(tái)中的應(yīng)用與資源管理;計(jì)算節(jié)點(diǎn)作為Slave端,負(fù)責(zé)啟動(dòng)任務(wù)執(zhí)行器Executor,由Executor負(fù)責(zé)任務(wù)的實(shí)際計(jì)算。在執(zhí)行應(yīng)用時(shí),會(huì)在相應(yīng)節(jié)點(diǎn)運(yùn)行Driver進(jìn)程,負(fù)責(zé)整個(gè)應(yīng)用的執(zhí)行和管理。
Spark大數(shù)據(jù)處理平臺(tái)的計(jì)算模型使用有向無(wú)環(huán)圖(Directed Acyclic Graph, DAG),描述復(fù)雜數(shù)據(jù)處理邏輯并提供更豐富的數(shù)據(jù)操作原語(yǔ)。Spark引入新的分布式數(shù)據(jù)集抽象表達(dá)模型——彈性分布式數(shù)據(jù)集(Resilient Distributed Datasets,RDDs)。RDD作為Spark平臺(tái)的核心概念,用于描述分布存儲(chǔ)于多個(gè)節(jié)點(diǎn)的海量數(shù)據(jù)集。
1.2 Spark任務(wù)執(zhí)行模型
Spark計(jì)算模型中,依據(jù)數(shù)據(jù)操作類(lèi)型,作業(yè)分為多個(gè)階段(Stage),各階段以一定的拓?fù)浣Y(jié)構(gòu)執(zhí)行。在單個(gè)階段內(nèi)部,為了操作海量數(shù)據(jù)集,Spark會(huì)并行執(zhí)行一組完全相同的任務(wù)來(lái)處理RDD的每一分片。任務(wù)執(zhí)行完成后,輸出結(jié)果會(huì)進(jìn)行分區(qū)處理,具有相同分區(qū)值的元組會(huì)傳給后繼階段中同一個(gè)任務(wù)進(jìn)行相關(guān)計(jì)算。不同Stage之間的數(shù)據(jù)傳輸過(guò)程被稱(chēng)為混洗(Shuffle)。同一階段任務(wù)啟動(dòng)時(shí),會(huì)首先選擇合適的節(jié)點(diǎn),并拉取其它節(jié)點(diǎn)上屬于該任務(wù)的數(shù)據(jù)進(jìn)行計(jì)算。
執(zhí)行任務(wù)時(shí),Spark首先并行拉取遠(yuǎn)程數(shù)據(jù),而后對(duì)拉取來(lái)的數(shù)據(jù)進(jìn)行處理,Spark的任務(wù)執(zhí)行模型如圖2所示。
2 任務(wù)執(zhí)行時(shí)間預(yù)測(cè)方法
本節(jié)介紹任務(wù)執(zhí)行時(shí)間預(yù)測(cè)技術(shù),主要是不同節(jié)點(diǎn)上各任務(wù)數(shù)據(jù)量獲取及各任務(wù)執(zhí)行時(shí)間計(jì)算。
2.1 各節(jié)點(diǎn)數(shù)據(jù)量獲取
當(dāng)新的Stage開(kāi)始時(shí),各任務(wù)需要根據(jù)上一階段不同任務(wù)寫(xiě)入Spark既有模塊MapStatus中的文件位置信息,尋找合適的啟動(dòng)位置,本方案擬定義新的方法對(duì)Map任務(wù)輸出到各節(jié)點(diǎn)的數(shù)據(jù)量進(jìn)行統(tǒng)計(jì),核心算法如下:
2.3 系統(tǒng)實(shí)現(xiàn)
系統(tǒng)基于Spark 1.6平臺(tái)實(shí)現(xiàn),主要針對(duì)Spark平臺(tái)中既有的各模塊進(jìn)行改造,并新增了數(shù)據(jù)采集模塊和任務(wù)執(zhí)行代價(jià)預(yù)測(cè)模塊,圖3為系統(tǒng)架構(gòu)。
在Executor端,新增了用以獲取Executor間網(wǎng)絡(luò)通信狀況以及Executor輸出數(shù)據(jù)信息的ExecutorMonitor。
在Driver端,新增了PartitionSizeGetter組件以及TaskCostEstimator組件。其中PartitionSizeGetter模塊負(fù)責(zé)在任務(wù)啟動(dòng)之前訪問(wèn)Spark既有的組件MapOutputTracker,統(tǒng)計(jì)出該任務(wù)在不同節(jié)點(diǎn)所需拉取的數(shù)據(jù)量,為任務(wù)執(zhí)行代價(jià)估計(jì)模塊做準(zhǔn)備。TaskCostEstimator通過(guò)PartitionSizeGetter模塊獲取不同任務(wù)在不同節(jié)點(diǎn)上所需的數(shù)據(jù)量,以及在Driver上的網(wǎng)絡(luò)通信狀況,通過(guò)計(jì)算模型進(jìn)行綜合,對(duì)任務(wù)在不同位置的執(zhí)行時(shí)間進(jìn)行預(yù)估。
3 性能評(píng)估
3.1 實(shí)驗(yàn)環(huán)境及負(fù)載選擇
本系統(tǒng)基于Apache Spark 1.6實(shí)現(xiàn),所用操作系統(tǒng)為CentOS6.2,實(shí)驗(yàn)測(cè)試環(huán)境由 4臺(tái)物理節(jié)點(diǎn)構(gòu)成,每臺(tái)節(jié)點(diǎn)的硬件配置、網(wǎng)絡(luò)狀況、操作系統(tǒng)以及JVM版本等如表1所示。在本測(cè)試環(huán)境中,1臺(tái)節(jié)點(diǎn)作為提交節(jié)點(diǎn),其余3臺(tái)作為數(shù)據(jù)計(jì)算節(jié)點(diǎn)。
任務(wù)執(zhí)行時(shí)間通過(guò)Spark提供的UI界面監(jiān)控獲得,選擇BigDataBench中的標(biāo)準(zhǔn)負(fù)載WordCount進(jìn)行實(shí)驗(yàn)。
3.2 預(yù)測(cè)結(jié)果分析
監(jiān)測(cè)WordCount負(fù)載中執(zhí)行Count操作的各任務(wù)所花費(fèi)的時(shí)間,圖4為任務(wù)執(zhí)行時(shí)間預(yù)測(cè)值與實(shí)際值的對(duì)比情況。
使用本文方法對(duì)各任務(wù)的執(zhí)行時(shí)間進(jìn)行預(yù)測(cè),所得到的預(yù)測(cè)曲線與實(shí)際曲線基本吻合,任務(wù)預(yù)測(cè)時(shí)間與實(shí)際執(zhí)行時(shí)間的平均誤差為13%,最大誤差不超過(guò)19%。
4 結(jié)語(yǔ)
本文面向Spark海量數(shù)據(jù)處理平臺(tái),設(shè)計(jì)并實(shí)現(xiàn)了任務(wù)執(zhí)行時(shí)間的預(yù)測(cè)模型,該模型充分考慮了任務(wù)的數(shù)據(jù)拉取代價(jià)與數(shù)據(jù)處理代價(jià),對(duì)任務(wù)執(zhí)行時(shí)間進(jìn)行了綜合預(yù)測(cè)。實(shí)驗(yàn)表明,本文預(yù)測(cè)方法可有效預(yù)測(cè)任務(wù)執(zhí)行時(shí)間,最大誤差不超過(guò)19%。
參考文獻(xiàn):
[1] AVAILABLE.Apache spark [EB/OL]. https://spark.apache.org.
[2] AVAILABLE.Apache hadoop [EB/OL]. http://hadoop.apache.org/.
[3] DEAN J, GHEMAWAT S. MapReduce: simplified data processing on large clusters[EB/OL]. http://blog.csdn.net/cnlht/article/details/6181799.
[4] KRISH K R, ANWAR A, BUTT A R. HatS: a heterogeneity-aware tiered storage for hadoop[C].Ieee/acm International Symposium on Cluster, Cloud and Grid Computing. IEEE,2014:502-511.
[5] ZAHARIA M, CHOWDHURY M, DAS T, et al. Resilient distributed datasets[C].A fault-tolerant abstraction for in-memory cluster computing,2014.
(責(zé)任編輯:杜能鋼)