呂鶴軒,黃 山,艾力卡木·再比布拉,吳思衡,段曉東
(1.大連民族大學計算機科學與工程學院,遼寧 大連 116600;2.大數據應用技術國家民委重點實驗室,遼寧 大連 116600; 3.大連市民族文化數字技術重點實驗室,遼寧 大連 116600)
近年來隨著信息技術的發展,各類門戶網站﹑搜索引擎和社交媒體軟件等產生的數據不斷膨脹,由互聯網數據中心IDC(Internet Data Center)發布的白皮書《數字化世界——從邊緣到核心》預測:到2025年,全球的數據量將增至175 ZB[1]。
大數據中有著非常豐富的數據信息,并且這些信息蘊含著很高的分析和使用價值,這也就提高了對數據挖掘實時性和準確性的需求。隨著大數據時代的到來,傳統的單機處理框架無法滿足在巨大數據量的情況下的計算需求,分布式大數據處理框架應運而生。首先由谷歌提出的MapReduce[2]編程模型,到第1代大數據框架Hadoop[3]的誕生,到由內存計算的批處理框架Spark[4],再到現在具有高吞吐、低延遲等特性的流處理框架Flink[5]。大數據計算框架隨著大數據計算需求的提高不斷地發展。
衡量大數據的數據挖掘性能有2個最重要的任務指標:一是實時性,如海量的數據規模需要實時分析并迅速反饋結果;二是準確性,需要從海量的數據中精準地提取出隱含在其中的用戶需要的有價值的信息。數據流的事件時間是數據流中事件實際發生的時間,他以附加在數據流中事件的時間戳為依據。事件時間將處理速度和結果徹底解耦,無論數據流的處理速度如何、事件時間到達算子的順序怎么樣,基于事件時間的窗口都會生成同樣的結果。
數據挖掘過程中通常需要窗口根據事件時間進行聚合計算,而數據從產生到流入Kafka[6]消息隊列再經過分布式處理框架數據源流入分布式處理框架進行計算,往往因網絡傳輸速度不同、分布式節點計算性能不同等原因,導致數據流入算子的先后順序和數據事件時間存在著局部的亂序或者數據延遲現象。為解決此問題,Flink提出了水位線機制。水位線是一個全局事件進度指標,通過設置最大允許亂序時間表示系統確信不會再有延遲事件到來的某個時間。窗口由事件的事件時間戳觸發開啟,窗口關閉和觸發計算則是通過水位線機制來主導。但是,水位線最大允亂序時間,也就是容錯值的大小,使計算必須在實時性和準確性之間進行取舍。水位線設置過小會導致大部分數據因為遲到無法參與窗口計算,大大降低計算的準確性;水位線設置過大時,雖然保證了大部分數據都能參與計算,但是為等待太多嚴重遲到的無價值數據會導致窗口計算觸發延遲,過長的等待時間大大降低了計算的實時性。綜上所述,在基于事件時間窗口的分布式計算中如何用更低的時延產生更少的遲到數據,使作業的窗口計算同時兼顧作業的準確性和實時性,是一個急需解決的問題。
在不確定彈性數據流亂序程度的情況下,傳統的水位線設置效率低下,無法保證計算的實時性和準確性。針對該問題,在不確定亂序程度的彈性流數據情況下,本文提出基于事件時間窗口的水位線動態調整策略。該策略在不確定亂序程度的彈性流數據情況下,可以在保證大部分有價值數據計算準確性的同時,有效提高計算的實時性。
本文的主要工作包括3個方面:
(1)提出了一種基于事件時間窗口的流數據微簇模型。該模型將事件時間的流數據按到達算子的順序分為微簇,此微簇的事件時間亂序程度代表當前時刻局部數據流的事件時間亂序程度。
(2)提出了基于局部事件時間亂序度的水位線動態調整策略。該策略根據局部數據的事件時間戳亂序程度動態調整水位線大小,即允許最大亂序時間的大小。
(3)在Apache Flink框架上的對水位線動態調整策略進行實驗。實驗結果表明,相比傳統水位線機制,水位線動態調整策略可以有效提高窗口計算的性能比。
在現實世界中,基于事件時間的流處理系統因為網絡帶寬不同、節點性能不同等原因存在不確定彈性亂序問題,現有的水位線機制無法在保證計算的準確性的同時保證計算的實時性。圖1所示為數據流在數據流時間模型的生命周期。

Figure 1 Life cycle of stream data 圖1 流數據生命周期
Shukla 等人[7]針對數據流不同的操作時間類型和統計的窗口技術進行討論。Li等人[8]針對流數據亂序問題提出了一種新的核心流代數操作的物理實現策略,包括基于堆棧的數據結構和相關的清除算法,采用物理的形式一定程度解決了數據流亂序問題,實驗結果表明,物理策略可以有效解決流數據亂序問題。Bhatt等人[9]設計了一個可以處理流數據延遲的流處理模型,并提供了一個端到端的低延遲系統,通過實驗表明,當窗口大小等于數據到達率時,系統延遲可以有效減少。Affetti 等人[10]針對流處理的實時性以及窗口和時間的概念提出了Dataflow Model模型。Akidau等人[11]提出了針對時間語義進行詳細檢測的數據流模型,解決了流數據無界和亂序的問題。Bhatt等人[12]通過分析水印和觸發方法來解決無界數據中的亂序和其他非常規問題。Bhatt等人[13]提出了在大數據處理中使用適當的管道和水印來處理延遲和吞吐量的方法。高自娟等人[14]提出了一種基于變尺度滑動窗口的流數據聚類算法。該算法采用動態變化的滑動窗口來采集流數據,利用帶有平均時間戳與平均權值的混合指數直方圖來支持數據處理,從而能更好地捕獲動態變化的流數據。徐江等人[15]結合基于分區與基于時間2種滑動窗口思想,構建單位時間周期下融合子流處理結果的滑動窗口模型,采取并行實時運算模式實現了實時流數據處理。
針對處理不確定彈性流數據亂序問題,雖然前人已經做了相應的研究并且獲得了一定的效果,但針對不確定亂序程度或彈性亂序流數據情況下的計算優化研究仍有很大空間。
Flink是第3代流處理引擎,它支持精確的流處理,同時能滿足各種規模下對高吞吐和低延遲的要求。Flink對底層的操作進行了封裝,從而為用戶提供了流處理Datastream API和批處理DataSet API 2個接口。用戶使用這些接口就可以完成基本的流數據處理任務和批數據處理任務。
3.2.1 Flink的時間語義
時間語義是Flink的四大基石之一。在Flink的流處理中會涉及到不同的時間概念,根據用戶不同事件類型需求,Flink的時間主要分為事件時間、進入時間和處理時間3類。事件時間是指事件創建的時間,它通常由事件所攜帶的時間戳表示,例如事件的生成時間。進入時間是指數據流入Flink處理框架的時間。處理時間是指數據流入Flink框架后本節點對本條數據進行計算操作的本地時間。
以上3種時間語義中,因為事件時間將計算速度和計算結果內容徹底解耦,無論數據流的處理速度如何、事件時間到達算子的順序怎么樣,基于事件時間的窗口都會生成同樣的結果,而且事件時間相對其他2種時間更具有研究價值,所以大多數流處理任務都會選擇使用數據的事件時間語義。
3.2.2 Flink的窗口分類
對于流處理系統而言,流入的消息不存在上限,處理的流數據可能是一個持續到達且無窮的事件流。所以,對于聚合操作和連接操作而言,流處理操作需要對流入的消息進行分段處理,然后基于分段后的每一段消息進行聚合計算或者連接等操作。此時的分段和分端口計算操作即為窗口。窗口是流式處理計算中一類十分常見的操作。Flink窗口操作主要分為滾動窗口、滑動窗口、會話窗口和全局窗口4類。滾動窗口將無限的數據流按固定大小拆分成不同的窗口,不同的窗口之間的事件數據沒有交叉重疊。滑動窗口有2個參數,分別是窗口大小和滑動大小,不同窗口之間可以有事件數據交叉重疊。會話窗口用一個固定的時間間隔閾值來劃分不同的窗口。全局窗口把所有相同鍵值放入同一個窗口,全局窗口沒有起止的時間,需要自定義觸發計算,否則窗口永遠不會進行聚合計算。
3.2.3 基于事件時間的水位線
在使用事件時間處理流數據的時候會遇到數據亂序的問題,流處理從事件產生、流經Source、再到操作算子需要一定的時間。理論情況下,傳輸到操作算子的數據都是按照事件時間產生時的時間順序而來的,但是分布式環境下會因為網絡延遲、計算節點性能不同和數據背壓等原因而導致亂序的產生,特別是使用分布式消息隊列系統時,因此,大部分情況下多個分區之間的數據無法保證有序進入窗口計算。圖2為數據流在系統中的理想順序和真實順序。但是,在進行窗口計算的時候,不能無限期地等下去,必須要有一個機制來保證在特定的時間后觸發窗口計算。

Figure 2 Diagram of flow data out of order圖2 流數據亂序情況圖
水位線是一個進度指標,本質上也是一種時間戳,根據水位線的插入類型分為有序水位線、無序水位線和多并行水位線。水位線表示系統確信不會再有延遲事件到來的某個時間點。水位線的工作原理是向數據流中插入當前最大事件時間戳減去最大等待時間的時間戳,此時間戳即為水位線,計算窗口通過處理后的水位線來決定是否觸發窗口計算。等待時間也稱為容錯值,即水位線的大小。例如,當一個窗口算子接收到T的水位線,就可以認為不會再有任何時間戳小于T的事件到來了。本質上,水位線提供了一個邏輯時鐘,用來告知窗口當前的事件時間,以及代替原來的數據事件時間觸發窗口關閉計算,但是窗口的開啟還是由數據的事件時間來觸發。圖3為水位線工作原理圖,系統按照規則向流數據中插入代表水位線的時間戳,當水位線大小設為0 s時,水位線時間戳會緊挨著插入流數據,當水位線大小設為2 s時,水位線時間戳會在數據到達后等待2 s然后插入流數據中。

Figure 3 Principle of watermark圖3 水位線工作原理
水位線可用于平衡延遲和結果的完整性。較小的水位線保證了低延遲,但隨之而來的是低準確性。該情況下,會有大部分有價值數據因為遲到無法參與窗口計算。反之,如果水位線過大,雖然可信度得以保證,但可能會無謂地增加處理延遲。當前的水位線雖然維持了低延遲和高準確率的平衡,但是系統在面對未知亂序程度的數據流或者彈性亂序的數據流時無法做出彈性應對。水位線只允許在結果的準確性和延遲之間做出取舍,現有的方法并不能二者兼得。
本節先對傳統水位線機制進行簡單分析,然后對基于事件時間窗口的流數據處理建立模型,之后介紹基于事件時間的局部亂序度算法的設計與實現,最后介紹針對事件時間窗口水位線動態調整策略的設計與實現。
傳統的水位線常為根據流數據亂序情況和計算需求而設置的靜態值,但現實中多數流數據無法估計其亂序程度,而且大部分流數據內部亂序情況也不是穩定不變,而是會有局部彈性亂序的現象出現。
如圖4所示。白色空心圓表示遲到程度較小的數據,灰色實心圓表示遲到程度中等的數據,黑色實心圓表示遲到嚴重的數據。橫坐標為事件時間,即數據產生時間;縱坐標為到達窗口計算算子時間,理想水位線表示數據產生即實時到達,小水位線表示窗口關閉觸發計算等待較短時間,大水位線表示窗口關閉觸發計算等待較長時間。靜態的水位線設置過小會導致大部分有價值的遲到不嚴重的數據無法參與窗口計算,從而降低窗口計算的準確性。水位線設置過大,雖然保證了大部分遲到嚴重的數據也能參與窗口計算,但在流數據亂序情況穩定或者不亂序的情況下延長了窗口計算的等待時間,從而降低了窗口計算的實時性。綜上所述,需要一個可以根據數據流局部亂序情況動態調整水位線大小的策略來應對彈性亂序流數據,以保證水位線在流數據亂序嚴重的情況下升高使大部分有價值的數據也可以參與計算,水位線在流數據亂序穩定的情況下降低,使窗口計算觸發不必等待過長的時間。

Figure 4 Contrast diagram of watermark 圖4 水位線對比圖
本節針對彈性亂序流數據情況下基于事件時間的窗口計算建立數據流微簇模型。首先對本文所用到的變量進行定義。
ti:第i個到達數據所攜帶的事件時間戳。
N:當前時間點所到達數據中的最大事件時間戳,如式(1)所示:
N=max(ti)
(1)
M:設置水位線時的最大等待時間,即水位線的動態范圍。
Tk:當數據流入計算數據流局部亂序度算子時,Tk表示當前數據流微簇,如式(2)所示:
Tk={ti+1,ti+2,…,ti+k}
(2)
其中,k為微簇大小。
P:當前時間微簇亂序程度,由微簇攜帶信息求得。
Wi:插入到數據流中攜帶時間戳的水位線。當前時間插入到數據流中的水位線Wi的計算如式(3)所示:
Wi=N-M*P
(3)
該模型針對事件時間窗口計算的水位線而動態調整,主要提出微簇機制來表示當前時刻流數據的亂序狀態,微簇Tk中按數據流入算子順序存儲并實時更新數據流到達當前算子的最新k個時間戳,按照進入算子的先后順序丟棄舊的時間戳,使微簇內部始終保持k個最新時間戳。亂序數據多為數據背壓情況下各節點間網絡傳輸速度不同、各節點性能和計算速度不同所導致的,因此不能確定的彈性流數據具有亂序程度局部聚集特性[16]。即嚴重遲到的數據幾乎會聚集到同一時間段,同理亂序情況較弱的流數據也呈局部聚集狀態,因此微簇中部分最新數據的亂序情況可以有效地表示當前時刻流數據的粗略亂序情況。使用該模型可以比較方便地求解。
以建立最優水位線調整模型為基礎,可以設計出基于事件時間的局部亂序度算法,具體如算法1所示。
算法1基于事件時間的局部亂序度算法
輸入:當前最新微簇Tk,微簇Tk攜帶當前最新k個已到達數據所攜帶時間戳信息和到達先后順序信息。
輸出:0~1的亂序度P。
Step1根據數據流微簇模型,算子內部創建狀態存放長度為k的微簇Tk、當前亂序度P及當前數據流最大事件時間戳,微簇按到達算子先后順序存儲當前最新k個到達算子的數據事件時間戳。
Step2當有新的數據到達當前算子時更新微簇Tk內部事件時間戳和算子最大事件時間戳N,使微簇始終保持按到達先后順序存儲最新事件時間戳,如圖5所示為微簇狀態更新圖。

Figure 5 Updating of microcluster state 圖5 微簇狀態更新圖
Step3當微簇更新狀態時,計算亂序度P,亂序度計算方式如式(4)所示:
(4)
其中,P表示當前微簇的亂序程度,其值在0~1,1表示微簇完全正序,0表示微簇完全逆序。
Step4更新亂序度P,即代表當前數據流局部亂序程度。
該算法根據流數據微簇模型中微簇的狀態實時計算當前數據流的亂序度P。亂序度P由數據進入微簇的先后順序和數據所攜帶的時間戳決定,亂序度越大表示當前微簇亂序程度越低,亂序度越小表示當前微簇亂序程度越高。
在并行集群中,流數據從數據源流入系統將根據編程邏輯結構被分配到不同節點的不同算子上,但因為計算資源不同和網絡傳輸速度不同等原因會導致數據事件時間局部亂序或延遲等情況發生。為保證計算任務高效率完成,需要更準確的策略來調整水位線容錯值。本文提出一種基于事件時間局部亂序度算法的水位線動態調整策略,其流程圖如圖6所示。

Figure 6 Flowchart of dynamic watermark adjustment strategy圖6 水位線動態調整策略流程
具體步驟為:
(1)用戶提交作業后,計算引擎會根據用戶定義的數據源將數據流讀取到分布式環境之中,并將數據按照編程的邏輯傳遞到各個節點上。
(2)流數據從數據源流入系統以后,系統內部會根據處理數據的內部邏輯對流數據進行分區、計算和處理。
(3)為不同并行度上的算子分配算子狀態。該方法基于數據流微簇模型將當前算子流中的數據分為微簇,微簇時間戳數據存儲在當前算子狀態中。
(4)亂序度計算。當有新的數據流入算子時,根據微簇中的時間戳和基于事件時間的局部亂序度算法計算當前時刻的亂序度。
(5)當有新的數據流流入算子時,算子內部首先根據數據事件時間更新微簇內時間戳狀態,然后根據微簇計算當前時間的局部亂序度。
(6)更新水位線最大允許亂序時間值。當計算出新的局部亂序度以后,根據局部亂序度大小調整水位線最大允許亂序時間值。調整方式如式(5)所示:
Wi=N-M*P
(5)
(7)更新當前最大水位線Wi,確保插入數據流中的水位線呈正序增加狀態。
(8)當得到新的水位線以后,按照程序中設定的水位線周期將水位線插入數據流中并傳遞給下一個算子。
(9)當事件時間窗口收到大于窗口關閉的水位線值后觸發窗口關閉并執行程序。
如圖7所示,當水位線的允許亂序時間設置過大,會出現窗口無效等待的情況,從而降低了系統的計算速度;當水位線的允許亂序時間設置過小會使窗口丟失數據影響窗口計算的準確性。而本策略可以使最大允許等待時間動態變化,在保證系統計算準確性的同時降低計算延遲。

Figure 7 Window function圖7 窗口函數
本文提出的水位線動態調整策略在Apache Flink 1.12中進行了實現,本節介紹實驗環境和實驗形式并根據實驗結果進行分析,以證明該策略的有效性。
5.1.1 實驗環境
實驗環境是由4臺機器搭建的Flink集群,集群具體參數配置如表1所示。集群設置1臺主節點Master,3臺從節點Node1、Node2和Node3。集群環境為CentOS 7.4.1系統,Java 1.8.0 版本。并使用Java語言進行程序編寫。

Table 1 Configuration of flink cluster 表1 Flink集群配置
Flink集群支持Standalone cluster、Flink on Yarn和Flink on Kubernetes 3種模式,本實驗采用常用的Standalone cluster模式進行集群部署。
5.1.2 實驗設計
因為本文所提出的策略主要是針對窗口計算處理不同情況所導致的局部亂序流任務,所以數據集采用模擬生成的彈性亂序數據集,數據強度分別設置為10萬,20萬,50萬,80萬,100萬和150萬條流數據作為測試數據集,每次實驗用盡數據集。為驗證該策略和亂序度算法消耗的系統資源較小且在任務中可忽略不計,特設置每1 ms生成一條數據,根據馬卿云等人[17]對同等級集群帶寬傳輸開銷測試,數據亂序程度分別采用3 s, 5 s, 8 s, 10 s和12 s延遲上下浮動。算法亂序度分別采用3 s, 5 s, 8 s, 10 s和12 s進行對比。實驗采用流處理數據密集型任務WordCount作為窗口任務,窗口滑動距離20 s。并行度分別設為1,4,8,12,20和32。
因為本文設計的實驗為窗口任務處理不同亂序程度的流數據,故主要選取性能比作為評估標準,從有效性、魯棒性、可擴展性和抗壓性4個方面進行對比實驗。
性能比根據式(6)計算:
(6)
其中,Ni為參與窗口計算的數據量,Ni越大即計算完整沒有遲到數據或者低價值遲到數據越少,說明窗口計算準確性高;Di為窗口從開始到觸發計算結束所用時長,即窗口計算時延,Di越小說明窗口計算時延越低,即實時性越高。m為開啟窗口個數。性能比E即為平均計算數據量比平均窗口計算時長。性能比越高,說明計算兼顧準確性和實時性的能力越強。
5.3.1 有效性測評
本節分別為動態水位線算法和傳統固定水位線算法設置不同的最大等待時間,實驗結果如表2所示。

Table 2 Performance comparison of watermark algorithm with different algorithms out-of-order degrees表2 水位線算法在不同算法亂序度下性能對比
從表2中可以看出,最初最大等待時間太小限制了動態水位線算法的動態范圍,所以性能比略差于傳統固定水位線算法的;但是,隨著最大等待時間設置的增大,動態水位線算法動態范圍變大,可以更靈活地處理亂序數據,性能比也慢慢高于傳統固定水位線算法算法的。
5.3.2 魯棒性測評
本節分別為動態水位線算法和傳統固定水位線算法設置不同程度的亂序數據,實驗結果如表3所示。從表3中可以看出,面對不同亂序程度的數據流,動態水位線算法性能比均高于傳統固定水位線算法的。但是,當數據流亂序程度高于動態水位線算法最大等待時間范圍時,性能比會略微下降。這是由于不論動態水位線多靈活,也是有界限范圍的,超出動態范圍的嚴重遲到數據還是無法進入窗口計算。延遲超大的不確定彈性數據還是無法保證全部進入窗口計算。為了均衡實時性和準確性,有時不得不舍棄一些嚴重遲到的數據。而且,嚴重遲到的數據往往價值不大,大部分情況下舍去低價值數據避免過高的時延是非常有必要的。

Table 3 Performance comparison of watermark algorithm with different data out-of-order degrees表3 水位線算法在不同數據亂序度下性能對比
5.3.3 可擴展性測評
本節分別為動態水位線算法和傳統固定水位線算法設置不同的并行度,其中模擬數據亂序程度設為8 s,算法最大等待時間設為8 s。實驗結果如表4所示。從表4中可以看出,在不同并行度的情況下,動態水位線算法性能比上下浮動小,且均優于傳統固定水位線算法的。這說明動態水位線算法和調整策略效果穩定,具有一定的可擴展性。

Table 4 Performance comparison of watermark algorithms with different parallelisms表4 水位線算法在不同并行度下性能對比
5.3.4 抗壓性測評
本文分別為動態水位線算法和傳統固定水位線算法設置不同強度的數據量,其中模擬數據亂序程度設為8 s,算法最大等待時間設為8 s。因不同數據量和固定的窗口長度導致性能比無法對比,所以性能比按數據量比例采用平均每10萬條數據量進行計算。實驗結果如表5所示。從表5中可以看出,面對不同強度的數據量,動態水位線算法的性能比均高于傳統固定水位線算法的,且性能穩定。這說明動態水位線算法和調整策略在現有強度測試下效果穩定,具有一定的抗壓性。

Table 5 Performance comparison of watermark algorithms with different data sizes表5 水位線算法在不同數據量下性能對比
本文針對傳統固定水位線算法針對基于事件時間的不確定彈性亂序流數據的窗口計算無法同時兼顧準確性和實時性的問題,提出了基于局部亂序度算法的水位線動態調整策略。該策略基于流數據時間模型和局部亂序度算法對水位線進行動態調整。通過模擬流數據進行實驗測評,分析對比了2種水位線算法的準確性和平均時長。實驗結果表明,系統在保證大部分數據不丟失的前提下,縮短了窗口等待時間,提高了窗口計算的實時性,為因計算資源不同和網絡傳輸速率不同等原因導致的數據彈性亂序情況下的窗口計算提供了可靠的保證。
未來將考慮在異構集群環境中不同節點計算性能差距明顯的情況下進一步優化流計算模型,并考慮在更大的數據集上進行實驗驗證,以及在真實場景下進行應用,以測試系統平臺效果,并優化流計算算法。