張笑燕,劉志浩,杜曉峰,陸天波
(北京郵電大學計算機學院(國家示范性軟件學院),北京 100876)
隨著信息社會飛速發展,數據產生的速度越來越快。現實中存在一類常見的業務,即將大量源源不斷到達的流數據S先與存儲在磁盤上的關系表R進行連接,對S進行半流連接關聯更新操作(如去重、修正等)后再將其寫入數據倉庫[1]。該過程被定義為S∞CR[2]。其中,C代表某種半流連接操作。
由于源數據系統的不同,邏輯上相同的元組可能具有不同的值,在數據進入數據倉庫時,需要通過關聯表進行統一,保證數據的一致性。
為了更具體地理解這種半流連接關聯更新操作,圖1 展示了數據倉庫中涉及該操作的一個例子。由于數據來源的不同或數據的延遲到達,系統中同一個id 可能具有不同的名稱,在數據進入數據倉庫之前,需要使用一張關系表將id 轉換為系統內部的id,對不同的名稱進行統一。

圖1 數據倉庫關聯更新示意
在真實的數據倉庫系統中,R保存在磁盤上,一般占用空間較大,無法全部放入內存;S包含不斷到達的流數據,其中的每個元組都需要和R進行連接操作[3-5]。這便產生了一個問題:S中的元組以流數據的形式不斷快速到達,而R中的元組需要通過磁盤I/O 以相對較低的速率讀取,造成更新時需要消耗大量時間等待磁盤I/O,導致數據處理的延遲。
事實上,為了解決流數據連接操作中的各種問題,與此有關的研究從未中斷。Polyzotis 等[6]提出了MESHJOIN 算法,通過對關系表的分組讀取和對流數據的分頁讀取,把對磁盤的I/O 時間分攤到了若干個流數據元組的讀取中,平衡了關系表的讀取速率和流數據的到達速率的差異。但是MESHJOIN 算法并沒有考慮流數據S的特征和關系表R的組織,因此在處理傾斜數據時表現較差[7]。Vaidehi 等[8]提出了分布式嵌套循環連接處理(DNLJP,distributed nested loop join processing)算法,對流數據的來源進行分組,根據其所定義的不同查詢成本、查詢時間等對連接查詢操作進行優化,降低了分布式集群中各節點之間通信的成本,提高了對海量數據連接查詢的效率。DNLJP 是可以用于分布式集群的算法,而 MESHJOIN 和CACHEJOIN 算法只適于單機環境,無法直接應用于分布式環境[9-10]。Naeem 等[11]提出 的CACHEJOIN 算法考慮了數據傾斜的問題,通過引入一個緩存,將關系表中較常用的部分存儲在緩存中,其中的元組有較高的概率和流數據匹配成功,減少了磁盤的I/O 次數。Jeon 等[12]提出了DS-join方案,通過使用幾種流處理引擎的微批處理模式,控制數據分區,減少了各節點之間的網絡通信頻率,同時引入緩存模塊對連接操作并行處理,減少磁盤I/O 次數,并且擁有自動調整緩存區大小的優化設置。為了保證分布式流連接操作中處理結果具有完整性和一致性,Yuan 等[13]測試了數據倉庫接收到的流數據出現錯誤時連接操作結果的質量優劣,提出了基于有序傳播模型的、應用于分布式流連接系統的Eunomia 方法,保證了所有連接器的元組到達順序的一致性,能夠消除數據傳播過程中的某些錯誤,提高了分布式流連接處理的擴展性和吞吐率。狄程等[14]設計并實現了對多源異構流數據的處理系統,在一定程度上消除了數據的結構不同對數據引入、連接處理造成的復雜問題,使流處理服務化、模塊化,減少了流數據處理程序的開發成本。
因此,本文在已有研究的基礎上進行改進和優化,提出了應用于分布式環境的D-CACHEJOIN 算法。D-CACHEJOIN 算法的關鍵在于采用一致性哈希函數[15]的策略,將R分區存儲在不同節點,并在后續的匹配過程中使用這一計算結果,達到快速將R與S進行匹配的目的。同時,本文使用服從Zipfian 分布的模擬數據[16-17],定量計算算法執行的成本開銷,以優化算法的執行效率[18]。實驗表明,在擁有一定的可用內存時,D-CACHEJOIN 算法在處理流數據時具有良好的實時處理性能,同時具有易于擴展的特性。
與CACHEJOIN 算法類似,D-CACHEJOIN 算法擁有2 個階段——流檢測階段和磁盤檢測階段。流檢測階段使用流數據S作為輸入,磁盤檢測階段使用關系表R作為輸入,由于內存大小的限制,兩階段每次都分別只處理S和R的一部分。
D-CACHEJOIN 算法執行架構如圖2 所示[11],關系表R和流數據S是外部輸入,HS用于分次存儲S中的元組,實際中占用較大的空間;HR是緩存模塊,用于存儲R中匹配頻繁的元組[19]。

圖2 D-CACHEJOIN 算法執行架構
下面對本文算法的執行架構進行說明。算法在流檢測階段,讀取流緩沖區的數據元組并與HR中存儲的高頻訪問元組進行連接匹配,如果在HR中找到所需的元組,則算法生成該元組作為輸出,只有在無法匹配時,才會將該元組存儲到HS中并將其指針加入隊列,以便進行后續的匹配操作。當HS滿或S中已無數據元組時,流檢測階段結束,磁盤檢測階段開始。在磁盤檢測階段,算法將R中的部分元組讀取到內存,得以分攤消耗資源較多的磁盤I/O 操作。讀取完成后,算法會將其與HS中的元組進行匹配,匹配成功后,算法生成連接后的元組作為輸出。與此同時,匹配次數超過閾值的元組會被緩存到HR中用于流檢測。經歷數次迭代后,HS中最先進入的元組已經完成了和R中所有元組的匹配,算法會將其刪除,此時HS中有了新的空閑位置,可以繼續存儲S中待匹配的元組,因此算法接下來再次切換到流檢測階段。
D-CACHEJOIN 算法的偽代碼實現如算法1所示[11],其中w為流檢測階段讀取S的元組數,b為磁盤檢測階段讀取R的元組數。
算法1D-CACHEJOIN

20) 刪除HS中匹配完成的元組及其隊列中對應的指針
21) end while
為了完整地處理流數據,算法的外層是無限循環,主要包括2 個部分,流檢測階段和磁盤檢測階段,二者交替進行。
在步驟2)~步驟9)的流檢測階段,算法在流緩沖區讀取w個元組,對w中的每個元組t,算法首先檢測檢測t是否在HR中,如果是則將其連接并輸出結果;否則將t加入HS中,同時在隊列中加入t的指針。
在步驟10)~步驟20)的流檢測階段,算法在磁盤緩沖區讀取b個元組,對b中的每個元組r,算法首先檢測r是否在HS中,如果是則將其連接并輸出結果;否則將忽略該元組。由于HS需要存儲流中的元組,因此HS是一個多映射,r可能會成功匹配到HS中的多個元組,記該成功匹配的次數為f;如果某個r對應的f大于閾值threshold Value,就將r加入HR中,如果HR已滿,則先隨機刪除HR中的一個元組,然后再將r加入HR。在每一次的迭代中,如果緩存HR未滿,說明閾值設置過高,此時可以自動降低閾值;如果過于頻繁地從HR中刪除元組,說明閾值設置過低,此時可以自動提高閾值。
1.1 節說明了本文算法在單節點環境中的一般過程,本節將算法應用到分布式環境中。為了敘述簡潔,本節以擁有2 個節點的數據倉庫為例。
數據倉庫接收從數據源經網絡傳輸的流數據時,數據會隨機地分配到各節點上。數據向各節點的隨機分配是為了實現集群整體上各節點抽取-轉換-加載(ETL,extract-transform-load)的負載均衡,在實際處理中,可以在隨機選取第一個節點后,按照某個固定的順序依次選取各個節點作為數據的接收方,消除因選取節點而產生的額外的CPU 計算開銷。節點在接收到數據元組后,直接通過一致性哈希函數計算,如果元組屬于該節點,就將其保留,否則將元組轉發到對應的節點,同時將計算得到的哈希值一并發送,避免后續產生重復的計算。D-CACHEJOIN 算法的整體架構流程如圖 3 所示。對于 MESHJOIN 算法和CACHEJOIN 算法,由于其相當于只在圖3 中的“連接”環節發揮作用,而數據元組在分布式環境中的接收、轉發等都在此之前就已經完成,因此可以相對簡單地一并擴展到分布式環境之中。
在基于本文算法的處理流程中,流數據S、關系表R和數據倉庫集群中各節點均采用同一個哈希函數進行映射,有利于集群的擴展。同時對于節點接收到的不屬于自身、需要轉發的元組,通過計算得到的哈希值將會隨元組一同轉發,避免了重復的計算。
從圖3 可以看出,數據倉庫將流數據隨機地依次分配給集群中的任一節點,從宏觀上看,集群中的所有節點在某一時間段內同時讀取數據并進行計算,保留屬于自身的數據、轉發屬于其他節點的數據。這樣的設計沒有屬于中心地位的節點,發揮了分布式集群并行處理數據的優勢。數據不需要首先經過中心節點的處理,而是直接在各節點之間進行流轉,避免了由于存在中心節點導致系統出現性能瓶頸的問題。

圖3 D-CACHEJOIN 算法的整體架構流程
對于數據倉庫的ETL 過程而言,主要關注的性能指標包括系統吞吐率、內存消耗等。本文研究中使用的分布式系統中,各個節點的作用主要是參與計算和數據存儲等,大量運算如備份、保證數據一致性等只在內部進行,對內提供支持,對外只提供將多節點集群環境虛擬為單節點環境的接口,集群可以視為一個單節點環境。引入分布式系統直接導致算法增加的開銷是一致性哈希函數計算的開銷,為了實現負載均衡,令各節點隨機接收流數據,然后相互轉發數據。文獻[6]中已經給出MESHJOIN 算法的成本模型,本文參考該模型,建立D-CACHEJOIN 算法的性能評估模型,模型參數如表1 所示。

表1 算法性能評估模型參數
根據表1 中的參數,下面逐個計算每個過程的開銷,最終得到D-CACHEJOIN 算法的整體吞吐率μ(單位為元組/秒)。
1) 數據在網絡中傳輸的開銷。假設分布式系統由n個節點構成,單位時間內需轉發的節點數為,因此網絡中數據總的傳輸開銷為。
3) 讀取關系表R中k個分頁的開銷為CI/O(k)。
4) 移除w個匹配完成的元組及其指針的開銷為wCE。
5) 從流緩沖區中讀取w個元組的開銷為wCS。
6) 將w個元組加入HS的開銷為wCA。
7) 將HS中元組與R中元組匹配的開銷為。
8) 輸出結果的開銷。結果輸出時的開銷和連接的成功率有關,成功率越高,需要輸出的結果元組就越多,因此,最終輸出結果的開銷為。

由此可得最終D-CACHEJOIN 算法的整體吞吐率為

用nSP表示流探測階段中每次迭代處理的元組數量,nDP表示磁盤探測階段中每次迭代處理的元組數量,則算法的吞吐率還可以表示為

式(1)和式(2)能否精確表示流數據和關系表進行連接操作的開銷的前提是算法必須只需要有限的內存就可以持續運行,同時流數據S中新到達的元組進入系統時,需要有剩余的緩沖區內存對其進行保存。更加詳細的關于關系表R的分頁數和流緩沖區中元組個數的理論論證可參考文獻[6]。
由式(2)可知,算法的吞吐率與R的設置情況、S的設置情況、流和磁盤緩沖區等因素有關,本節通過定量計算來描述算法的成本。
1)nSP的模型計算
在流檢測階段中,S加載到流緩沖區后立即與HR中的元組進行匹配,且優先與HR中較多次與R中元組成功匹配的元組進行,如果匹配成功,則直接生成結果輸出;如果匹配失敗,則該元組在后續的磁盤檢測階段嘗試進行匹配。由此,nSP與HR、R中元組大小、R中元組個數、流緩沖區大小有關,其中流緩沖區大小在后續實驗中得知極小(始終在0.1 MB 以下),因此忽略不計。
根據表1,nR為R中元組個數,為HR中元組個數。現實中在構建R和S時,由于其數據分布并不均勻,需要考慮其扭曲分布系數Zipf,如果Zipf 值為0,那么連接屬性值完全均勻分布,隨著Zipf 值的增大,數據的不均勻分布程度就越大。假設流數據S有S~Zipfian(1,n),其概率密度函數為,累積分布函數為,其中,Hx,1同理[20]。通過歸一化處理,流檢測階段的匹配概率p1可表示為


其中,x和y是未知數。
當nhR變為原來的2 倍時,式(4)變為

將式(5)代入式(4),得φ1=2x,即x=lbφ1。
當nR變為原來的2 倍時,式(4)變為

將式(6)代入式(4),得ψ1=2y,即y=lbψ1。
因此,式(4)變為

若在n次迭代中算法處理的流元組總量為Sn,則有

2)nDP的模型計算
在流檢測階段,算法已經通過HR連接匹配了S中的部分元組,在磁盤檢測階段中,將使用常規方式進行關系表R和流數據S中元組的連接。由于R較大,算法分段對R進行讀取,每次讀取磁盤緩沖區大小nRB的數據段,由于S~Zipfian(1,n),因此可以逐段計算匹配概率后相加,表示為

記R中元組的分段數為N,由匹配概率之和經歸一化后可得平均匹配概率p2為

類似于nSP的計算方式,p2可以用nRB、nR和nhR表示。現考察nRB、nhR和nR分別變為2 倍時p2的變化情況。假設此時p2分別增大一個常數因子θ2、增大一個常數因子φ2和減小一個常數因子ψ2,且有

和nSP中處理方法相同,此時可得x=lbφ2,y=lbψ2,z=lbθ2。
因此,式(11)可以變為

若哈希表中存儲的流元組總量為,則有

確定nSP和nDP后,即可通過式(2)對算法進行調整。
實驗環境為4 臺IBM POWER x-236 8841 服務器,Intel Xeon Gold 6248R CPU,64 GB 內存,Linux 64 位操作系統,具體配置情況如表2 所示。

表2 實驗環境
環境部署的規模主要從數據訪問峰值、吞吐量要求和響應時間等綜合考量。基本思路是讀寫分離。通過解析查詢語句,對于僅包括讀操作的配置一個連接字符串到讀服務器,對于寫操作則配置另一個連接字符串到寫服務器,將大規模數據的訪問分流到多臺服務器上,使應用中讀取數據的速度和并發量顯著提高,增加了系統響應速度,減少了服務器的壓力,能夠有效增加系統的穩定性和擴展性。在讀寫分離的基礎上再進行負載均衡,具體過程是各節點通過專用網絡進行連接,對每個服務器進行監測,獲取資源占用情況,整個集群可以視為一臺具有超高性能的獨立服務器。
實驗數據來自某運營商內部的原始話單數據,以此構建流數據S和關系表R,各包含約2 000 萬條數據。
由于本文算法具有緩存模塊和自動控制緩存中元組換入換出的閾值的功能,因此需要一個熱身階段。由于在實際場景中,程序開始后會持續地運行,因此熱身階段是可以接受的,后續實驗中如果沒有特殊說明,該系統熱身階段都會忽略。
本節設計實驗測試了本文算法的性能,實驗的具體軟硬件條件已在表2 中列出,在此環境上搭建了具有8 個虛擬節點的分布式集群。通過將本文算法與傳統的MESHJOIN 算法、CACHEJOIN 算法和較先進的Eunomia 分布式連接方法進行比較,對算法的執行效率進行實驗考察,各實驗運行5 次后取平均值作為實驗結果。
實驗1不同數據量對算法效率的影響
首先令集群開啟4 個節點,處理不同數量級的數據,吞吐率對比如圖4 所示,D-CACHEJOIN都取得了比其他3 種算法更好的吞吐率,這一方面是因為實驗數據并不是均勻分布,而是服從Zipfian 分布的,其中一部分數據具有比其他數據更多的出現次數,沒有對此進行額外考慮的算法無法適應該環境;另一方面是因為D-CACHEJOIN具有緩存模塊,能夠較好地處理數據傾斜的情況。同時,D-CACHEJOIN 的吞吐率在10 萬、100 萬和1 000 萬條數據的情況下分別是Eunomia的1.30 倍、1.48 倍和1.65 倍,算法的優勢隨著數據量的增多而逐漸增大,這是因為數據量增多時,可能對磁盤的 I/O 次數逐漸增大,D-CACHEJOIN 越來越多地減少了實際上對于磁盤的I/O 次數。

圖4 不同數據量下算法的吞吐率
現實中大多數的大型數據集是服從Zipfian 分布的[21],包括本文實驗的數據集。相對于均勻分布的數據集,對服從Zipfian 分布的數據集進行實驗有更大的實際意義,但為了實驗的完整性,給出其他實驗條件相同時對均勻數據的實驗結果,如圖5 所示。

圖5 不同數據量下算法的吞吐率(均勻分布)
實驗2可用內存總量對算法效率的影響
本文實驗中可用內存的總量為磁盤緩沖區、流緩沖區、隊列、HR和HS的大小之和,但本文實驗中有兩點不變:流緩沖區基本不變而且可以忽略,因為在所有情況下流緩沖區使用的內存不超過0.1 MB;H R保持不變,因為HR的大小會在很大程度上影響本文算法的效率,所以在后續實驗中專門對HR的大小對算法效率的影響進行研究,故在本文實驗中HR保持為R的1%,一旦HR滿,如果再次有元組的匹配頻率超過threshold Value,將可能會對HR中已有元組進行替換并可能引起threshold Value 的動態調整。可用內存總量對算法效率的影響如圖6 所示。

圖6 可用內存總量對算法效率的影響
從圖6 可以看出,幾種流連接算法的吞吐率都會隨著可用內存總量的增加而提高,由于引入了緩存模塊,CACHEJOIN 和D-CACHEJOIN 的吞吐率都要比MESHJOIN 高2 倍以上,而Eunomia 由于需要基于時間戳對流數據進行檢查,因此需要較多的額外內存才能表現出較高的效率,內存較少時執行效率不足。另有其他實驗顯示,當提供大量內存時,Eunomia 的吞吐率已經超過了CACHEJOIN,但仍低于D-CACHEJOIN。考慮到實驗的完整性,補充了內存較少時的情況,由于D-CACHEJOIN 需要存儲哈希函數的計算結果以便后續使用,因此當可用內存較少時,D-CACHEJOIN 的吞吐率要略低于CACHEJOIN。由于本文研究的支撐系統具有充足的內存可供使用,同時鑒于現代分布式系統中如果有計算需要,通常都會配備充足的內存,因此一般不會出現內存嚴重不足的情況。當可用內存增大后,D-CACHEJOIN 的吞吐率逐漸超過CACHEJOIN,且隨著內存的增加,前者吞吐率的增加速度要高于后者;當內存達到100 MB 時,D-CACHEJOIN 的吞吐率相比于CACHEJOIN 增加了10%以上。
實驗3R的緩存比例對算法效率的影響
由于算法引入了緩存模塊,因此可對R中頻繁匹配的元組進行緩存,能夠大大提升算法的效率,本文實驗固定可用內存大小,研究R的緩存比例對算法效率的影響,實驗結果如圖7 所示。

圖7 R 的緩存比例對算法效率的影響
根據圖7 可以看出,當R的緩存比例達到10%時,D-CACHEJOIN 算法的吞吐率是MESHJOIN 算法的10 倍以上。當緩存比例較低時,D-CACHEJOIN算法的吞吐率仍然是MESHJOIN 算法的2 倍以上,由于沒有引入緩存策略,MESHJOIN 和Eunomia的吞吐率保持不變。這說明引入緩存模塊可以大大提升算法的性能。
實驗4D-CACHEJOIN 算法在分布式集群中的擴展性
為了測試算法在分布式集群中的擴展情況,將集群中的節點數n由4 增加到8,并且改變算法處理的數據量,考察此時的執行效率,實驗結果如圖8 所示。

圖8 算法吞吐率隨節點擴展的變化情況
總體而言,各算法的吞吐率隨著節點數的增多而增大,且D-CACHEJOIN 保持著最佳的執行效率。圖8 中從左至右,D-CACHEJOIN 的吞吐率依次是Eunomia 的1.29 倍、1.15 倍、1.48 倍、1.36 倍、1.65 倍、1.50 倍,這表示隨著集群中節點數的增加,D-CACHEJOIN 相對于Eunomia 在執行效率上的優勢變小了(對其他2 種算法也有這一結論)。這是因為隨著節點數的增多,每個節點上關系表R的片段就越小,用于磁盤 I/O 的時間就越短,而D-CACHEJOIN 恰恰減少了磁盤I/O 的次數,因此這一優勢在節點數增加時變小了。但與對內存的訪問相比,磁盤I/O 是非常消耗時間的操作,因此即使集群進一步擴容,只要磁盤上的關系表無法全部放入內存中,D-CACHEJOIN 算法仍然會有更優的執行效率。
綜上,D-CACHEJOIN 算法總體上保持了最優的執行效率。當ETL 系統中數據的數量級發生比較大的變化時,由于對數據庫的訪問時間,即對磁盤的I/O 所需時間會隨之增大,算法的吞吐率會隨之下降,但本文算法的優勢也會逐漸增大,因為本文算法減少了對磁盤的I/O 次數。為了擁有更高的效率,算法往往會進行更多消耗內存的額外工作,在現代處理海量數據的分布式系統中,通常都會配備足夠的內存,同時本文算法擁有一定的自由度,可以根據實際情況設置緩存的大小,更好地貼合具體場景,本文算法一般能夠很好地適應環境并良好運行。當分布式集群中新增或下線節點時,本文算法保持了更優的執行效率;當處理海量數據時,本文算法在分布式集群中具有良好的拓展性。
本文介紹了流數據接收過程中面臨的主要問題,并簡單介紹了已有的流連接算法的處理策略,在此基礎上對算法進行改進,提出了一種把使用緩存的流連接策略應用于分布式環境的D-CACHEJOIN算法。此外,本文詳細描述了該算法的執行架構并計算其性能開銷,通過實驗展示了該算法的執行效率,說明了該算法具有較好的適應性,在一般的大數據處理系統中能夠良好運行,并且在分布式集群中具有良好的拓展性。