閭程豪 荊一楠 何震瀛 王曉陽1,
1(復(fù)旦大學(xué)軟件學(xué)院 上海 201203)2(復(fù)旦大學(xué)計(jì)算機(jī)科學(xué)技術(shù)學(xué)院 上海 201203)3(上海市數(shù)據(jù)科學(xué)重點(diǎn)實(shí)驗(yàn)室(復(fù)旦大學(xué)) 上海 200433)
在分布式數(shù)據(jù)流處理中,一個(gè)典型的場景就是采用一個(gè)固定的數(shù)據(jù)分發(fā)方法將數(shù)據(jù)(根據(jù)其鍵值)發(fā)送到多個(gè)工作節(jié)點(diǎn)中。例如,新浪、Twitter等媒體門戶對當(dāng)下新聞中的熱點(diǎn)詞匯進(jìn)行實(shí)時(shí)統(tǒng)計(jì)[1],實(shí)現(xiàn)基于頻數(shù)統(tǒng)計(jì)的在線數(shù)據(jù)挖掘技術(shù)[2],或是基于分布式數(shù)據(jù)流進(jìn)行基于“group by”的實(shí)時(shí)查詢等。
在上述場景中,數(shù)據(jù)分發(fā)方法都是以最小化整體處理的延遲時(shí)間或者最大化整體處理的吞吐量為目標(biāo)的。為了實(shí)現(xiàn)這個(gè)目標(biāo),負(fù)載均衡和鍵值分離這兩個(gè)因素往往被考慮到。一方面,工作節(jié)點(diǎn)之間的負(fù)載越均衡,并行處理的效率就越高,整體處理的性能表現(xiàn)就越好。另一方面,為了使負(fù)載盡可能地均衡,一些分發(fā)方法將含有相同鍵值的數(shù)據(jù)分發(fā)到不同的節(jié)點(diǎn)上(例如round-robin數(shù)據(jù)分發(fā)方法)從而產(chǎn)生了鍵值分離。鍵值分離往往需要額外的歸并處理,因而產(chǎn)生額外開銷。
現(xiàn)有工作基于類似上述兩方面的考慮,針對不同特征的數(shù)據(jù)集提出了各種靜態(tài)數(shù)據(jù)分發(fā)方法[6-9]以優(yōu)化處理的整體性能。當(dāng)這些數(shù)據(jù)分發(fā)方法應(yīng)用于某些特定數(shù)據(jù)集時(shí),處理延遲時(shí)間或吞吐量得以最優(yōu)化。在處理分布特征不同的數(shù)據(jù)集或者數(shù)據(jù)特征不斷變化的數(shù)據(jù)集時(shí),這些靜態(tài)分發(fā)方法顯得力不從心。例如,我們對一周內(nèi)Wikimedia所有網(wǎng)站的訪問記錄進(jìn)行分析,發(fā)現(xiàn)每小時(shí)被訪問網(wǎng)頁的數(shù)量(標(biāo)記為K)與每小時(shí)內(nèi)最受歡迎網(wǎng)頁被訪問的次數(shù)占總次數(shù)的比例(標(biāo)記為p1)都會(huì)隨著時(shí)間發(fā)生較大變化,如圖1所示。這些值的大小對不同數(shù)據(jù)分發(fā)方法的性能表現(xiàn)有很大影響。此時(shí),任何一種只對某種數(shù)據(jù)特征進(jìn)行優(yōu)化的分發(fā)方法都無法使得查詢處理的延遲時(shí)間一直保持最小。

圖1 Wikmedia數(shù)據(jù)集上數(shù)據(jù)特征隨著時(shí)間的變化
面對上述挑戰(zhàn),本文策略為:(1) 在基于mini-batch的分布式流處理模型中,提出了一種自適應(yīng)數(shù)據(jù)分發(fā)策略APS,以應(yīng)對數(shù)據(jù)特征不斷變化的流數(shù)據(jù)處理任務(wù)。(2) 為數(shù)據(jù)分發(fā)方法提供了一種叫做整體分發(fā)評估HPM的估計(jì),HPM綜合考慮了每個(gè)mini-batch中的負(fù)載均衡和鍵值分離情況,并為APS的調(diào)整提供依據(jù)。(3) 采用處理的延遲時(shí)間作為整體性能的評判標(biāo)準(zhǔn),并通過在Spark Streaming[5]上的大量實(shí)驗(yàn)證明了 APS的優(yōu)越性與HPM的準(zhǔn)確性。
現(xiàn)有工作中的數(shù)據(jù)分發(fā)方法主要基于最大負(fù)載和鍵值分離兩方面的考慮。文獻(xiàn)[6-7]提出MPC模型,通過綜合分析一個(gè)目標(biāo)查詢所需數(shù)據(jù)交換的輪數(shù)和每輪數(shù)據(jù)交換中的最大負(fù)載這兩個(gè)因素找到最佳的數(shù)據(jù)分發(fā)方法。Nasir等[8-9]和Katsipoulakis等[10]通過權(quán)衡數(shù)據(jù)分發(fā)的不平衡程度與鍵值分離產(chǎn)生的額外處理和存儲(chǔ)開銷來確定最優(yōu)的數(shù)據(jù)分發(fā)方法。本節(jié)我們將著重介紹并比較當(dāng)下最流行和最先進(jìn)的5種分發(fā)方法。
(1) Hash分發(fā)方法 Hash分發(fā)方法(HASH)使用一個(gè)哈希函數(shù)為每個(gè)鍵值映射一個(gè)特定的“編號(hào)”,并將數(shù)據(jù)發(fā)送到“編號(hào)”對應(yīng)的工作節(jié)點(diǎn)。“編號(hào)”與工作節(jié)點(diǎn)一一對應(yīng)且相同鍵值總是對應(yīng)相同的“編號(hào)”。因此HASH不會(huì)產(chǎn)生鍵值分離,但其負(fù)載均衡受數(shù)據(jù)集傾斜程度的影響較大。
(2) Round-robin分發(fā)方法 Round-robin分發(fā)方法(RR)不考慮數(shù)據(jù)的鍵值,將數(shù)據(jù)逐條輪流發(fā)送至每一個(gè)工作節(jié)點(diǎn)。RR會(huì)產(chǎn)生大量的鍵值分離,但每個(gè)節(jié)點(diǎn)上的負(fù)載幾乎相同。
(3) Power of Two Choices分發(fā)方法 Power of Two Choices分發(fā)方法(PoTC)[12]在數(shù)據(jù)分發(fā)過程中,令每個(gè)負(fù)責(zé)數(shù)據(jù)分發(fā)的載入節(jié)點(diǎn)各自記錄已發(fā)送過的鍵值與其對應(yīng)送往的“編號(hào)”。各個(gè)載入節(jié)點(diǎn)互相獨(dú)立,并分別實(shí)時(shí)更新并記錄送往每個(gè)“編號(hào)”對應(yīng)節(jié)點(diǎn)的負(fù)載條數(shù)。對含有新鍵值的數(shù)據(jù),PoTC使用兩個(gè)獨(dú)立的哈希函數(shù)產(chǎn)生兩個(gè)“編號(hào)”,將數(shù)據(jù)發(fā)送到當(dāng)前負(fù)載較小的“編號(hào)”對應(yīng)的節(jié)點(diǎn)中,并記錄該“編號(hào)”與鍵值的對應(yīng)關(guān)系;對含有舊鍵值的數(shù)據(jù),PoTC通過該鍵值對應(yīng)的“編號(hào)”對其進(jìn)行分發(fā)。Katsipoulakis等[10]指出,當(dāng)載入節(jié)點(diǎn)只有一個(gè)時(shí),PoTC不僅能避免鍵值分離,還能改善負(fù)載均衡。然而,現(xiàn)實(shí)的分布式應(yīng)用中,數(shù)據(jù)的分發(fā)往往由多個(gè)獨(dú)立的載入節(jié)點(diǎn)共同完成,因此同一個(gè)鍵值在不同載入節(jié)點(diǎn)中對應(yīng)的“編號(hào)”不一定相同,鍵值分離隨之產(chǎn)生。
(4) Partial Key Grouping分發(fā)方法 Partial Key Grouping分發(fā)方法[8](PK)在數(shù)據(jù)分發(fā)過程中,令每個(gè)載入節(jié)點(diǎn)實(shí)時(shí)更新記錄送往不同“編號(hào)”對應(yīng)節(jié)點(diǎn)的負(fù)載條數(shù)。對每條剛到達(dá)的數(shù)據(jù),PK使用兩個(gè)獨(dú)立的哈希函數(shù)產(chǎn)生兩個(gè)“編號(hào)”,將數(shù)據(jù)發(fā)送到當(dāng)前負(fù)載較小的“編號(hào)”對應(yīng)的節(jié)點(diǎn)中。Nasir等[8]指出,當(dāng)數(shù)據(jù)傾斜程度與工作節(jié)點(diǎn)數(shù)量滿足一定條件時(shí),PK獲得較好的負(fù)載均衡且只產(chǎn)生少量鍵值分離。然而當(dāng)數(shù)據(jù)傾斜過大,PK的負(fù)載均衡較差。此外,PK缺乏一定的拓展性。
(5) D-Choices分發(fā)方法與W-Choices分發(fā)方法 D-Choices分發(fā)方法(DC)和W-Choices分發(fā)方法(WC)是PK的兩種更高級(jí)的拓展[9](APK)。分發(fā)過程中,每個(gè)載入節(jié)點(diǎn)與PK類似,各自記錄發(fā)往下游工作節(jié)點(diǎn)的負(fù)載情況。同時(shí),APK根據(jù)鍵值出現(xiàn)的概率將所有鍵值分為heavy hitter與light key兩類。對含有l(wèi)ight key的數(shù)據(jù),APK使用兩個(gè)獨(dú)立的哈希函數(shù)產(chǎn)生兩個(gè)“編號(hào)”,將數(shù)據(jù)發(fā)送到當(dāng)前負(fù)載較小的“編號(hào)”對應(yīng)的節(jié)點(diǎn)中;對含有heavy hitter的數(shù)據(jù),APK為其提供更多“編號(hào)”的選擇(DC使用d個(gè)獨(dú)立哈希函數(shù)產(chǎn)生d個(gè)編號(hào),WC則提供所有的編號(hào)),并將數(shù)據(jù)發(fā)送到當(dāng)前負(fù)載最小的“編號(hào)”對應(yīng)的節(jié)點(diǎn)中。根據(jù)Nasir等[9]的分析,DC和WC均能獲得最佳的負(fù)載均衡且表現(xiàn)接近,同時(shí)會(huì)產(chǎn)生一定的鍵值分離。當(dāng)heavy hitter個(gè)數(shù)為0時(shí),APK退化成了PK;當(dāng)每個(gè)鍵值都是heavy hitter時(shí)(例如只有1個(gè)鍵值的情況),APK退化成了RR。為了討論方便,本文選用WC來代表APK。
1) HASH不會(huì)產(chǎn)生鍵值分離,且負(fù)載均衡程度由數(shù)據(jù)分別的特征決定。當(dāng)數(shù)據(jù)分布較均勻時(shí),一個(gè)理想的HASH方法可以獲得最佳的處理表現(xiàn)。
2) 在常見的含有多個(gè)載入節(jié)點(diǎn)的應(yīng)用中,PoTC與PK都為所有的鍵值提供了兩個(gè)選擇,鍵值分離程度類似。而PK在分發(fā)過程中為更多數(shù)據(jù)提供了兩個(gè)選擇,因此PK比PoTC有更好的負(fù)載均衡和整體表現(xiàn)。
3) APK通過給heavy hitter更多的分發(fā)選擇,不僅提升了PK的可拓展性,更是以有限的鍵值分離增加為代價(jià),解決了數(shù)據(jù)傾斜程度過大時(shí)的負(fù)載均衡問題。
4) APK和RR都能獲得最佳的負(fù)載均衡,并分別對heavy hitter和所有鍵值進(jìn)行全局的分發(fā)。因此APK的鍵值分離程度更少,整體表現(xiàn)性能更佳。
5) APK歸納或更優(yōu)于RR、PoTC和PK。但由于APK會(huì)產(chǎn)生鍵值分離,因此只有當(dāng)數(shù)據(jù)分布不均勻時(shí),APK會(huì)更優(yōu)于HASH獲得最佳表現(xiàn)。
綜上所述,HASH和APK有機(jī)會(huì)在不同的數(shù)據(jù)特征中獲得最佳性能表現(xiàn)。因此,本文選用HASH和APK作為參考和比較。
基于mini-batch的分布式流處理系統(tǒng)是當(dāng)下最流行的分布式流處理系統(tǒng)之一。以Spark Streaming[5]和Java Flume[11]為例,它們被廣泛應(yīng)用于實(shí)時(shí)或準(zhǔn)實(shí)時(shí)的分布式流處理應(yīng)用中,具有良好的錯(cuò)誤恢復(fù)能力。
基于上述系統(tǒng)的數(shù)據(jù)分發(fā)模型如圖2所示。模型根據(jù)系統(tǒng)時(shí)間將數(shù)據(jù)流劃分為一系列微小批次(mini-batch),并對mini-batch進(jìn)行串行處理。圖2將第t個(gè)批次中的一次數(shù)據(jù)分發(fā)抽象成了一個(gè)有向無環(huán)圖。圖中結(jié)點(diǎn)“L”代表接收和分發(fā)數(shù)據(jù)的載入節(jié)點(diǎn),結(jié)點(diǎn)“M”代表接收并處理數(shù)據(jù)的map工作節(jié)點(diǎn),每條有向線段代表數(shù)據(jù)的分發(fā)方向。一次分發(fā)完成后,系統(tǒng)對各個(gè)map工作節(jié)點(diǎn)的工作狀態(tài)進(jìn)行同步,并根據(jù)處理任務(wù)的需要決定下一步的操作(繼續(xù)分發(fā)、數(shù)據(jù)歸并或結(jié)果輸出等)。當(dāng)一個(gè)mini-batch處理結(jié)束后,系統(tǒng)對所有節(jié)點(diǎn)的工作狀態(tài)進(jìn)行同步。在當(dāng)前批次的數(shù)據(jù)處理完成且下一個(gè)批次的數(shù)據(jù)也收集完成之后,系統(tǒng)開始對下一個(gè)批次的數(shù)據(jù)進(jìn)行處理。

圖2 基于mini-batch的流數(shù)據(jù)分發(fā)模型
為了衡量數(shù)據(jù)分發(fā)方法的性能表現(xiàn),本文提供了一種叫作整體分發(fā)評估(HPM)的估計(jì)。
首先給出單個(gè)mini-batch中最大負(fù)載和鍵值分散度的定義來量化分發(fā)方法在單個(gè)mini-batch中的負(fù)載均衡程度和鍵值分離程度。接著,結(jié)合最大負(fù)載與鍵值分散度,本文給出分發(fā)方法在單個(gè)mini-batch中的整體分發(fā)評估。
定義1對于第t個(gè)mini-batch的數(shù)據(jù)分發(fā),收到最多數(shù)據(jù)的工作節(jié)點(diǎn)所接收的數(shù)據(jù)條數(shù)為mini-batcht上的最大負(fù)載,記作L(t)。
定義2對于第t個(gè)mini-batch的數(shù)據(jù)分發(fā),分發(fā)后每個(gè)工作節(jié)點(diǎn)含有鍵值數(shù)量的和減去被分發(fā)數(shù)據(jù)的鍵值數(shù)量為mini-batcht上的鍵值分散度,記作D(t)。
定義3對于第t個(gè)mini-batch的數(shù)據(jù)分發(fā),其最大負(fù)載和鍵值分散度的線性組合為mini-batcht上的整體分發(fā)評估,記作HPM(t):
HPM(t)=L(t)+λ·D(t)
(1)
式中:λ為組合系數(shù),用于衡量鍵值分離程度占整體處理開銷的影響比例。例如,進(jìn)行“union”操作時(shí),鍵值分離并不會(huì)影響結(jié)果的輸出,因此λ=0;進(jìn)行“group by”操作時(shí),由于鍵值分離,工作節(jié)點(diǎn)含有的部分結(jié)果需要進(jìn)一步聚合歸并,產(chǎn)生額外的開銷,因此λ>0,且λ與單條數(shù)據(jù)在聚合歸并時(shí)的處理時(shí)間以及單條數(shù)據(jù)在map工作節(jié)點(diǎn)中的處理時(shí)間密切相關(guān)。此外,對分離的鍵值進(jìn)行聚合歸并處理的節(jié)點(diǎn)越少時(shí),對部分結(jié)果聚合歸并的操作就慢,λ就越大。
為了幫助理解,圖3以在mini-batcht中的數(shù)據(jù)分發(fā)為例,解釋了上述概念。圖3中不同顏色的方片代表含有不同鍵值的數(shù)據(jù),根據(jù)定義,mini-batcht的最大負(fù)載L(t)=8,鍵值分散度D(t)=2。當(dāng)進(jìn)行“union”操作時(shí),λ=0,HPM(t)為8;當(dāng)進(jìn)行組合系數(shù)λ=1的“group by”操作時(shí),HPM(t)=8+1×2=10。

圖3 在mini-batch t上的數(shù)據(jù)分發(fā)
根據(jù)基于mini-batch的流處理模型的特點(diǎn),本文給出了數(shù)據(jù)分發(fā)方法整體分發(fā)評估的定義。
定義4在基于mini-batch的流處理分發(fā)模型中,各個(gè)mini-batch上整體分發(fā)評估的累加即為一個(gè)數(shù)據(jù)分發(fā)方法的全局整體分發(fā)評估,記作HPM:
(2)
綜合上述定義,單個(gè)mini-batch中的整體分發(fā)評估值越小,分發(fā)方法在該mini-batch中的表現(xiàn)越好。因此,全局整體分發(fā)評估的值越小,就意味著一個(gè)數(shù)據(jù)分發(fā)方法能提供給處理任務(wù)更好的整體性能表現(xiàn)。
因此,本文的優(yōu)化目標(biāo)為找到一種數(shù)據(jù)分發(fā)方法,使其在對數(shù)據(jù)分布特征不斷變化的數(shù)據(jù)流的分發(fā)中,獲得最小的全局整體分發(fā)評估HPM,從而使分布式流處理任務(wù)獲得最小的延遲時(shí)間。
基于上述優(yōu)化目標(biāo),本文提出了一種叫做自適應(yīng)數(shù)據(jù)分發(fā)策略(APS)的數(shù)據(jù)分發(fā)方法。APS采用了一系列被廣泛使用的靜態(tài)數(shù)據(jù)分發(fā)方法作為候選,根據(jù)對每個(gè)mini-batch數(shù)據(jù)分布特征的預(yù)測及各個(gè)候選方法在該mini-batch上整體分發(fā)評估的估計(jì),調(diào)整在每個(gè)mini-batch上的數(shù)據(jù)分發(fā)方法。對每個(gè)mini-batch的處理中,APS按以下4個(gè)步驟進(jìn)行:
1) 在開始某個(gè)mini-batcht的處理時(shí),每個(gè)載入節(jié)點(diǎn)分別獲得對mini-batcht數(shù)據(jù)分布特征的預(yù)測。
2) 每個(gè)載入節(jié)點(diǎn)分別遍歷所有的候選分發(fā)方法,根據(jù)mini-batcht的預(yù)測數(shù)據(jù)特征,選取HPM(t)估計(jì)值最小的數(shù)據(jù)分發(fā)方法。
3) 每個(gè)載入節(jié)點(diǎn)根據(jù)所選的數(shù)據(jù)分發(fā)方法,將當(dāng)前mini-batcht內(nèi)的數(shù)據(jù)分發(fā)到map工作節(jié)點(diǎn)進(jìn)行處理。
4) 若出現(xiàn)鍵值分離,map工作節(jié)點(diǎn)根據(jù)處理任務(wù)的需要,決定對數(shù)據(jù)的下一步的操作(例如聚合歸并)。
在步驟2)中,每個(gè)載入節(jié)點(diǎn)分別獲得相同的數(shù)據(jù)分布特征的預(yù)測并且采用相同的HPM估計(jì)方法,因此會(huì)調(diào)整至同一種數(shù)據(jù)分發(fā)方法。

本文通過上一個(gè)mini-batch的數(shù)據(jù)特征分布情況來預(yù)測當(dāng)前mini-batch的數(shù)據(jù)特征。由于文獻(xiàn)[13-14]中許多數(shù)據(jù)流特征估計(jì)方法的存在,本文并未對數(shù)據(jù)特征的預(yù)測方法展開深入討論。
在APS的調(diào)整中,數(shù)據(jù)分發(fā)方法在一個(gè)mini-batch上對HPM的估計(jì)非常關(guān)鍵。根據(jù)第1節(jié)中對現(xiàn)有分發(fā)方法的介紹和比較,本文選用HASH和APK組成APS的候選分發(fā)方法集合。本節(jié)將分別介紹這兩種分發(fā)方法在mini-batch上對HPM的估計(jì)。
假設(shè)系統(tǒng)共有m個(gè)接收并處理來自載入節(jié)點(diǎn)數(shù)據(jù)的map工作節(jié)點(diǎn)。mini-batcht含有M條待分發(fā)的數(shù)據(jù)和K個(gè)不同的鍵值,其中出現(xiàn)次數(shù)最多的鍵值出現(xiàn)的概率為p1, heavy hitter的個(gè)數(shù)為h。
3.2.1 HASH在mini-batch上對HPM的估計(jì)
HASH不會(huì)產(chǎn)生鍵值分離,因此D(t)=0。至于最大負(fù)載的估計(jì),HASH滿足帶權(quán)重單選擇的“balls-into-bins”模型[7]。其中,每個(gè)出現(xiàn)頻率不同的鍵值對應(yīng)模型中的不同重量的“ball”,m個(gè)map工作節(jié)點(diǎn)則分別對應(yīng)模型中m個(gè)“bin”。根據(jù)模型,mini-batcht中HASH的最大負(fù)載L(t)滿足:
(3)
式中:g(δ)=(1+δ)·ln(1+δ)-δ。

(4)

(5)
3.2.2 APK在mini-batch上對HPM的估計(jì)
APK可以保證L(t)最優(yōu),即負(fù)載完全平均:
APK給了每個(gè)heavy hitter最多m個(gè)選擇,每個(gè)light key最多2個(gè)選擇。因此,一次分發(fā)后,每個(gè)heavy hitter最多可以提供(m-1)個(gè)額外的鍵值,每個(gè)light key最多可以提供1個(gè)額外的鍵值。所以mini-batch t中APK的鍵值分散度D(t)滿足:
D(t)≤(m-1)·h+1·(K-h)=K+(m-2)·h
(6)
APK在mini-batcht中對HPM的估計(jì)滿足:
(7)

實(shí)驗(yàn)在含有10臺(tái)機(jī)器的集群中進(jìn)行。每臺(tái)機(jī)器分別有2個(gè)12核2.1 GHz Intel Xeon處理器,64 GB內(nèi)存,運(yùn)行64位Ubuntu Server 14.04操作系統(tǒng)。集群上運(yùn)行Apache Spark 2.0.0與Apache Kafka 0.10.1.0。通過實(shí)驗(yàn),我們將驗(yàn)證APS的優(yōu)越性與HPM的準(zhǔn)確性。
(1) 基于mini-batch的分布式流處理系統(tǒng) 實(shí)驗(yàn)選用Spark系統(tǒng)[4]中的Spark Streaming模塊作為基于mini-batch的分布式流處理系統(tǒng)。Spark系統(tǒng)按照standalone的方式部署在10臺(tái)機(jī)器上,其中,1臺(tái)為master、9臺(tái)為worker。每個(gè)worker維護(hù)一個(gè)含有24個(gè)核的executor。因此,本實(shí)驗(yàn)最多可以同時(shí)使用216個(gè)工作節(jié)點(diǎn)。當(dāng)鍵值分離現(xiàn)象出現(xiàn)時(shí),根據(jù)任務(wù)的需要,系統(tǒng)決定是否將含有分離鍵值的數(shù)據(jù)聚合歸并到reduce節(jié)點(diǎn)中進(jìn)行下一步操作。實(shí)驗(yàn)選用3個(gè)載入節(jié)點(diǎn)、15個(gè)map工作節(jié)點(diǎn)和1個(gè)reduce節(jié)點(diǎn)。
(2) 模擬數(shù)據(jù)源 實(shí)驗(yàn)中,我們搭建了一個(gè)含有3臺(tái)機(jī)器的Kafka集群[3],并使用1個(gè)topic中的3個(gè)partition來部署數(shù)據(jù)集以模擬3個(gè)數(shù)據(jù)流。每個(gè)partition獨(dú)立地存儲(chǔ)數(shù)據(jù),且被設(shè)置為從offset的最小值開始讀取,并與3個(gè)載入節(jié)點(diǎn)一一對應(yīng)。因此,相同數(shù)據(jù)源在使用不同分發(fā)方法時(shí),分發(fā)數(shù)據(jù)的內(nèi)容和順序可以保持一致。
(3) 真實(shí)數(shù)據(jù)集 真實(shí)數(shù)據(jù)集WIKI是來自Wikimedia的開源數(shù)據(jù)。內(nèi)容是自 2016年1月1日至2016年1月7日的每個(gè)小時(shí)內(nèi)對所有Wikimedia網(wǎng)站的訪問記錄。我們將原數(shù)據(jù)集做一定的解析之后,得到含有168個(gè)小時(shí)級(jí)時(shí)間戳的4 490 000 000條記錄。每條記錄包括時(shí)間戳與其訪問的網(wǎng)址信息,并將網(wǎng)址信息視作鍵值。其數(shù)據(jù)分布的特征變化如圖1所示。實(shí)驗(yàn)中,我們對數(shù)據(jù)做了3%的均勻抽樣以模擬更快的數(shù)據(jù)特征的變化。
(4) 模擬數(shù)據(jù)集 模擬數(shù)據(jù)集ZF1、ZF2均服從ZipF分布。分別通過改變ZipF分布函數(shù)中的鍵值數(shù)量K和特征指數(shù)函數(shù)z,我們生成了數(shù)據(jù)集ZF1和ZF2。圖4展示了各個(gè)模擬數(shù)據(jù)集上數(shù)據(jù)分布的特征變化。固定ZF1中K=3 000,ZF2中z=0.8,通過控制系統(tǒng)讀入數(shù)據(jù)流的速度,分別保證系統(tǒng)在處理ZF1和ZF2時(shí),每個(gè)mini-batch上的特征分布能呈現(xiàn)圖中變化。

圖4 模擬數(shù)據(jù)集的數(shù)據(jù)特征偏移

實(shí)驗(yàn)1將WIKI數(shù)據(jù)流基于mini-batch進(jìn)行鍵值聚合(類似對鍵值做詞頻統(tǒng)計(jì)),并分別記錄前60個(gè)mini-batch中系統(tǒng)分別使用HASH、APK和APS進(jìn)行數(shù)據(jù)分發(fā)時(shí)的性能表現(xiàn)。每個(gè)mini-batch長度為40 s,并含有732 000條輸入數(shù)據(jù)。為了更加清晰地展現(xiàn)數(shù)據(jù)分發(fā)方法對整體性能表現(xiàn)的影響,實(shí)驗(yàn)設(shè)置map工作節(jié)點(diǎn)上每條數(shù)據(jù)處理時(shí)間為0.1 ms,reduce節(jié)點(diǎn)上每條數(shù)據(jù)處理時(shí)間為0.3 ms,以模擬較為復(fù)雜的聚合任務(wù)。根據(jù)處理任務(wù)的類型(任務(wù)處理在各個(gè)節(jié)點(diǎn)上的延遲時(shí)間),組合系數(shù)λ設(shè)為3。
圖5展示了不同分發(fā)方法在每個(gè)mini-batch中的處理延遲時(shí)間變化。APS通過自適應(yīng)地調(diào)整選擇每個(gè)mini-batch中的分發(fā)方法,將處理延遲時(shí)間盡可能地保持在最佳水平。與HASH和APK相比,APS分別最多能將處理延遲時(shí)間降低26.66%和26.67%。同時(shí),注意到,由于本文對數(shù)據(jù)分布的預(yù)測存在誤差,APS在對第50個(gè)和第59個(gè)mini-batch上的調(diào)整存在一定的延遲。

圖5 不同分發(fā)方法在每個(gè)mini-batch上的處理延遲時(shí)間
圖6展示了不同分發(fā)方法在每個(gè)mini-batch中的整體分發(fā)評估變化。每個(gè)mini-batch中整體分發(fā)評估的變化與圖5非常相似,因此每個(gè)mini-batch上的延遲處理時(shí)間與整體分發(fā)評估具有很強(qiáng)的相關(guān)性。本文對HPM估計(jì)方法的準(zhǔn)確性得以驗(yàn)證。

圖6 不同分發(fā)方法在每個(gè)mini-batch上的整體分發(fā)評估
4.3.1 不同數(shù)據(jù)特征偏移對APS性能提升影響
實(shí)驗(yàn)2分別將ZF1和ZF2數(shù)據(jù)流基于mini-batch進(jìn)行鍵值聚合,并選用每個(gè)mini-batch上的整體分發(fā)評估值作為性能指標(biāo)。通過計(jì)算使用APS獲得的HPM較使用HASH或APK獲得的HPM所降低的百分比,我們得到APS較HASH和APK的處理性能提升比率。實(shí)驗(yàn)中,每個(gè)mini-batch長度為10 s,含有45 000條數(shù)據(jù)(被3個(gè)載入節(jié)點(diǎn)平均接收),數(shù)據(jù)特征變化如圖7所示。此外,實(shí)驗(yàn)通過設(shè)置map工作節(jié)點(diǎn)和reduce節(jié)點(diǎn)上每條數(shù)據(jù)的處理時(shí)間,將組合系數(shù)λ設(shè)為1。
圖7展示了APS在ZF1和ZF2中相比HASH和APK獲得的性能提升比率。其中,“vs HASH avg”、“vs HASH max”和“vs APK avg”、“vs APK max”分別代表實(shí)驗(yàn)過程中APS較HASH性能提升比率的平均值、最大值以及較APK性能提升比率的平均值、最大值。

圖7 在ZF1和ZF2上使用APS獲得的性能提升
實(shí)驗(yàn)表明,在對擁有不同數(shù)據(jù)特征偏移的流數(shù)據(jù)集進(jìn)行分布式處理時(shí),相比于候選集中的靜態(tài)數(shù)據(jù)分發(fā)方法,APS能讓整體處理性能獲得巨大的提升。
4.3.2 不同任務(wù)類型對APS性能提升的影響
實(shí)驗(yàn)3使用不同的組合系數(shù)λ來表示不同的任務(wù)類型,并使用不同的λ值對ZF1數(shù)據(jù)集進(jìn)行類似實(shí)驗(yàn)2的多次模擬。同時(shí),實(shí)驗(yàn)仍然選用每個(gè)mini-batch上的整體分發(fā)評估值作為性能指標(biāo),計(jì)算APS較HASH和APK的處理性能提升比率。實(shí)驗(yàn)假設(shè),處理任務(wù)中鍵值分離產(chǎn)生的性能開銷與λ的值成正比。
圖8展示了APS在不同組合系數(shù)λ中的表現(xiàn)結(jié)果。λ的值越大,APS較HASH的提升比率越低,較APK的提升比率越高;λ的值越小,APS較HASH的提升比率越高,較APK的提升比率越低。

圖8 在不同任務(wù)類型中使用APS獲得的性能提升
實(shí)驗(yàn)表明,當(dāng)處理任務(wù)中鍵值分離的開銷很高時(shí),APS相比產(chǎn)生鍵值分離的APK有巨大的性能提升,故傾向于調(diào)整為HASH。當(dāng)處理任務(wù)中鍵值分離的開銷很低時(shí),APS相比負(fù)載偏移較多的HASH有巨大的性能提升,故傾向于調(diào)整為負(fù)載更加均衡的APK。
本文提出了一種叫做自適應(yīng)數(shù)據(jù)分發(fā)策略(APS)的分發(fā)方法,為基于mini-batch的分布式流處理任務(wù)提供更好的性能表現(xiàn)。同時(shí),本文還為數(shù)據(jù)分發(fā)方法的表現(xiàn)性能提供了一種叫作整體分發(fā)評估的估計(jì)方法。
通過真實(shí)數(shù)據(jù)集上的實(shí)驗(yàn)分析,本文驗(yàn)證了APS相比現(xiàn)有被廣泛使用的靜態(tài)分發(fā)方法的優(yōu)越性和整體分發(fā)評估的準(zhǔn)確性。通過模擬數(shù)據(jù)集上的實(shí)驗(yàn)分析,本文進(jìn)一步分析了APS在不同實(shí)驗(yàn)設(shè)定下的表現(xiàn)能力。