朱蔚林,木偉民,金宗澤,王偉平
(1.中國科學院 信息工程研究所,北京 100093;2.中國科學院大學,北京 100049)
數據的價值隨著時間推移會慢慢降低,在社會生活中,特別是商業場景中這一現象更加顯著。流處理系統的出現讓用戶能夠快速地從龐雜的數據流中提煉出數據蘊含的價值。由于數據流的持續產生且體積龐大,給系統的存儲能力帶來了巨大挑戰。由此導致數據流統計具有one-pass訪問、有限內存和實時等特點,也為統計帶來了極大的挑戰。
現如今,大數據時代涌現了如S4[1-2]、Storm[3]、Spark Streaming[4]等一系列數據流處理平臺[5-6],這些平臺由于面向的應用場景不同而各有特點。S4由Yahoo在2010年提出,S4為了保證良好的可擴展性,采用去中心化結構,但是S4消息路由僅支持按照Key值進行分布,而且沒有提供消息處理的反饋機制,使得容錯成為了很大的問題。Storm面向純流式處理,以低延遲為核心設計目標,將數據流處理任務抽象為有向無環圖,稱作Topology,同時在at-least-once語義的基礎上實現了exactly-once語義,但是其弱中心化的結構也沒有徹底解決單點的問題。Spark Streaming構建在Spark的基礎上,沿用了Spark中創新性的存儲結構RDD(彈性分布式數據集),將數據流切分為一個個RDD,增大了處理粒度以提高吞吐量,但是也因此增加了延遲。
在數據流上有一種普遍的應用場景是在數據流上進行數據統計[7],例如對于電信網絡、社交網絡數據流等等。在這些數據流上的實時統計無論對于輿情控制,還是提供更高質量的服務都有重要意義。然而上述平臺并不能完美契合此類應用場景。
數據流統計場景下一個很典型的處理是分組統計。以社交網絡數據流為例,假設數據流S中每個元組包含uid,topic,time三個字段,uid唯一標識一個用戶個體,topic為用戶參與的話題,time為該條信息產生的時間分片,現在需要統計各個時間分片的熱點話題。如果將數據流類比為傳統的關系數據庫,會提交以下SQL語句:
SELECT topic, time, count(1)
FROM S GROUP BY topic, time
而對于數據流而言,數據流具有快速流動、易失的特性,絕不可能像數據庫一樣在一個時刻看到整個數據流的全部數據,而且熱點話題具有很強的時效性,統計結果延遲要盡量做到最小。同時,電信網絡、社交網絡由于用戶眾多,數據量往往很大,因此對流統計平臺的性能要求很高,如流統計的吞吐率和延遲等。
面對這樣的應用場景,系統主要的需求在于高吞吐率和低延遲。目前流行的通用分布式流處理平臺如Spark Streaming和Storm等,系統結構較復雜,在這種高吞吐量的實時連續統計場景上性能有所局限。對此,面向基于窗口的連續查詢需求,提出了一個高吞吐、低延遲的分布式數據流統計模型Mars,同時提供了強大的容錯性。
數據流可以理解為一個不斷增長的數據集合[8-9],將其定義如下:
定義1:設t表示任意時間戳,t時刻到來的數據集為dt,稱{…dt-1,dt,dt+1}為數據d的數據流。
數據流具有實時性、易失性、突發性、無序性、無限性等特征。在數據流上的統計查詢處理由于這些特征,劃分為不同的模型。
按照統計的時序范圍來劃分,數據流上的統計處理分為界標模型[10]、滑動窗口模型[11]以及跳動窗口模型[12]。
1.1.1 界標模型
設當前時間戳為t,s為一個指定的在t之前的時間戳,稱作界標,界標模型統計的是從界標時間戳s開始一直到當前時間戳t范圍內數據流{ds…dt}上的統計結果。
1.1.2 滑動窗口模型
設當前時間戳為t,滑動窗口模型統計的是當前時間戳之前一定時間范圍內的數據流,設滑動窗口大小為n,即時間范圍為n,滑動窗口模型所處理的數據流為{dt-n…dt}。
除了上述基于時間戳的滑動窗口模型,另外還有基于元組數量的滑動窗口,即在內存中保存一定數量的元組。
1.1.3 跳動窗口模型
跳動窗口是滑動窗口的一個延伸。在滑動窗口模型中,窗口以一定時間范圍或者元組數量為單位向前滑動,兩次連續的處理的數據集有重疊,而跳動窗口模型相鄰兩次處理數據集無重疊,這一次處理時間范圍的起點是上一次處理時間范圍的終點(以基于時間的窗口為例)。因此,每個跳動窗口的處理結果之間無交集,同時它們的并集就是整個數據集上全量的處理結果。
根據統計查詢提交形式的不同,查詢類型分為Ad-hoc查詢[13]和連續查詢[14]。
1.2.1 Ad-hoc查詢
Ad-hoc查詢也稱即席查詢,Ad-hoc查詢請求可以在統計系統運行時的任一時刻提交,統計系統接收到查詢請求后立即處理該查詢并在產生結果后立即返回查詢結果。目前處理Ad-hoc查詢的數據流處理平臺較少。
1.2.2 連續查詢
連續查詢請求在查詢統計系統啟動時或某一特定時間戳提交,在這一時間戳之后,直到用戶主動取消查詢請求,系統一直處理該查詢,每隔一定時間間隔輸出查詢結果。Mars面向的查詢類型即是連續查詢。
對數據流做統計處理時,最小的單位是元組。理想狀態下,數據流中的每一個元組都應該被處理一次,且僅僅處理一次。但是在系統層面,由于分布式系統本身的復雜性和不確定性,以及在某些應用場景下對系統需求的不同,往往會對精度做一定程度上的犧牲。根據對每一元組處理次數的保證,統計模型的語義分為三種。
1.3.1 至多一次(at most once)
至多一次語義是最松的約束,該語義模型盡力使得每一元組被處理到,但不保證對任意元組處理的必然性。由于約束寬松,實現這樣的模型系統開銷往往是最小的,由于這樣的特性,該語義適合對吞吐量要求較高、但不要求統計精確度的應用場景。
1.3.2 至少一次(at least once)
至少一次語義保證數據流中每一元組都至少被處理一次,但是在一些特殊情況發生時,有可能會造成重復處理某些元組。在這樣的語義約束下,統計模型實現時需要設計嚴格的容錯機制,確保在任何可控故障發生時,每個元組都會被處理,但是容錯性會帶來不菲的系統開銷。
1.3.3 精確一次(exactly once)
精確一次語義是理想狀態下的語義,也是最嚴格的語義約束。這一語義一般是在至少一次語義的模型基礎上,對統計結果進行去重而得到的。因此,系統實現時,精確一次語義比至少一次語義又多了一重開銷,只有在精確度要求極其苛刻的應用場景下會使用這一語義約束,例如銀行系統或證券系統。
Mars的系統架構如圖1所示。

圖1 Mars系統架構
由圖1可見,Mars依賴的外部組件有四個:分布式消息中間件提供消息服務,Mars的輸入和輸出都通過分布式消息中間件完成,統計任務數據庫儲存用戶的統計需求,分布式協調系統提供對分布式集群運行時的狀態管理等等,分布式緩存負責異步解耦和臨時緩存中間數據。
Mapper集群、Reducer集群是Mars的核心組件。Mapper集群從消息中間件拉取消息并處理,將中間結果順序緩存在分布式緩存中;Reducer集群從分布式緩存中順序讀取分布式緩存中的中間數據,處理后將最終的統計結果再發送回消息隊列。
Mapper和Reducer的靈感來自于MapReduce編程模型,盡管MapReduce編程模型是為了批處理場景而提出的,但是它將大規模數據處理過程抽象為Map和Reduce兩個階段,對于數據流統計問題同樣具有重要的指導意義。Mars將MapReduce模型擴展到了集群概念上,每一個Mapper或者Reducer計算單元都是分布式集群中的一個節點,分別稱作Mapper或Reducer。所有的Mapper節點組成Mapper集群,所有的Reducer節點組成Reducer集群。
Mapper集群采用去中心化結構,集群內各個節點是對等關系,使用去中心化結構的核心目標是將計算粒度切分,在進行較大窗口下的統計時,如果使用集中式結構,緩存窗口內的全部數據將造成極大的內存開銷和時間開銷。而Mars將這樣的大窗口切分為小窗口分布到集群上并行處理,有效解決了該問題。同時這一設計保證了Mapper集群良好的可擴展性,使得Mars的計算能力隨著集群規模的擴大可以得到近似于線性的增加。
Reducer集群的統計功能是將經過Mapper切分的細粒度統計結果合并為任務需求窗口大小的結果,因此Reducer集群采用主從架構。集群啟動時,各個節點首先借助分布式協調系統選舉出一個領導者,該領導者負責給各個節點分配任務,并監聽節點狀態,當集群規模發生改變時重新分配任務。同時,各個從節點也監聽主節點的狀態,當主節點發生故障時重新選舉領導者。
Mapper和Reducer通過分布式緩存傳遞數據[15]的協議設計是Mars的一個關鍵點。Mars使用了一種特殊的序號機制保證Mapper和Reducer協作步進,同時保證兩個階段異步運行。
初始狀態時,首先針對每個處理任務在分布式緩存中為Mapper集群和Reducer集群初始化一個序號,稱為SEQ。當Mapper節點處理完輸入的原始數據集后,將緩存中的SEQ自增1,使用自增操作的主要目的是使得多節點并行處理統一統計任務時不會得到統一SEQ,造成數據覆蓋。
而Reducer集群出于容錯性考慮,使用了延遲更新SEQ的策略。
容錯是所有分布式流處理系統應當關注的問題,雖然集群中每個節點發生故障的概率很小,但是一旦發生,由于數據流不斷流動的特點,丟失的數據便很難找回。大多數現有的故障恢復策略都是通過冗余備份策略實現的[16],Mars所采用的策略也類似。
在Mapper端,每一個節點拉取數據后都首先將數據存一份本地文件,同時將文件名與需要該數據的統計任務id列表的對應關系注冊到分布式緩存中,每當某一任務處理完該數據時,從列表中刪去該任務id,直到列表為空時刪除本地文件,最后向分布式消息中間件反饋ack消息。假如在ack之前該節點發生了宕機,由于消息中間件未接收到ack消息,當發生超時后,消息中間件會向其他節點重發該消息。這一機制保證了每一條元組都會被處理至少一次。
在Reducer端,采用延遲更新SEQ的方式來保證容錯性。以一個統計任務的處理過程為例,如圖2所示。Reducer集群有兩個節點,R1正在處理數據,初始R1從緩存中得到序號1,于是從緩存中得到序號為1的數據集并處理,處理完成后在內存中序號加1變為2,并不更新到緩存中。接著在緩存中讀取序號為2的數據集,假如在處理過程中,歸并后的統計結果尚未輸出之前R1節點發生宕機,經過新一輪的領導者選舉以及任務分配,該任務遷移到R2節點。這時R2節點從緩存中獲取到的序號依然為1,不會造成數據丟失。

圖2 Reducer容錯機制示意圖
綜上,Mars強大的容錯機制保證了Mars的at-least-once語義。
使用Java語言實現了Mars,并在典型的分組統計使用場景下對Mars進行了測試,同時與Storm和Spark Streaming進行對比。
實驗采用由30臺服務器組成的集群。其中,消息中間件擁有3個服務節點和30個數據節點,分布式緩存為15節點的主備集群,分布式協調服務擁有7個服務節點。
實驗的數據源選用了模擬的網絡數據流S,數據流的每個元組包含20個字段,其中關鍵字段有timestamp、type、sip、dip、port、location等。
每個字段的內容根據一個已知的集合中以均勻概率隨機生成,生成后每個元組的平均大小為150字節。共生成元組10億條,整個數據集大小140 G。在每個單元實驗前,提前將數據集加載到分布式消息中間件中。
在上述數據流上,構建了如下的統計場景:
SELECT sip, type, timestamp, count(*)
FROM S
GROUP BY sip, type, timestamp
WINDOWING 60 s
其中,WINDOWING關鍵字表示以60 s大小的跳動窗口進行統計。需要特別說明的是,對時間戳進行分組統計是為了使統計結果具有應用價值,Mapper會對時間戳以窗口大小,即60 s為單位進行歸一化。
性能一般以吞吐量為表征,吞吐量計算公式如下:
T=tps×bs×ts
其中,T表示吞吐量;tps表示消息中間件每秒處理的事務數,實驗中Mapper集群作為消息中間件的消費者,tps相當于每秒消費的數據集個數;bs表示每個數據集包含的元組數;ts表示每個元組的大小(平均)。
分別在不同的數據集大小和不同的集群規模下對上述統計需求進行實驗,結果如下所述。
3.4.1 集群規模固定,數據集大小不同
實驗在20個節點的集群上完成,吞吐量取統計過程中整個集群吞吐量的平均值。由圖3可見,隨著每個數據集所包含的元組數量的增長,一開始吞吐量上升很快,當數據及大小達到5 000時,吞吐量達到峰值,隨著數據及大小繼續增加,吞吐量呈緩慢下降的趨勢。不難分析出,當數據集大小為1這種極端情況時,每次網絡開銷只傳輸一個元組,效率極低;當數據集大小增大到5 000個元組時,網絡開銷和解包開銷達到一個平衡點,故性能達到最優;當數據集大小繼續增大時,雖然每次網絡開銷得到的數量足夠大,但是反序列化數據流會占用大量的CPU資源,導致用于統計的系統資源減少,從而吞吐量下降。

圖3 不同數據集大小時的吞吐量變化曲線
3.4.2 數據集大小固定,集群規模變化
在上述實驗得出的最優數據集大小下,吞吐量取統計過程中整個集群吞吐量的平均值。由圖4可見,吞吐量隨著集群規模的逐步增大,幾乎呈線性增長,當集群規模增大到20個節點時,性能達到最優,此時整個集群吞吐量達到3.5 GB/s,可以計算得單節點吞吐量為179 MB/s。

圖4 不同集群規模時的吞吐量變化曲線
當集群規模繼續擴大時,吞吐量并未繼續增加,這是由于從分布式消息中間件消費數據時是讀盤操作。
測試數據中使用的數據是提前發送并緩存在消息隊列上的。系統從消息隊列中消費數據時,消息隊列會從磁盤上讀取數據。由于消息隊列特性,每個消息隊列節點只會從一塊磁盤上讀取數據。一共有35個消息隊列節點,磁盤讀取速度上限大約是100 Mbps,因此整個消息隊列所能提供的最大消費速率約為3.5 G。當集群規模大于20時,由于消息隊列磁盤讀取速度已達上限,速度無法繼續增加。
同時,將上述的統計需求分別使用Spark Streaming和Storm的編程接口進行了實現,二者同樣使用Mars的分布式消息中間件作為輸入和輸出。
3.5.1 性能對比
由于Spark Streaming和Storm本身已經對數據集進行了抽象,故無需在不同數據集大小的情況下進行對比。在不同的集群規模下,實驗結果如圖5所示。

圖5 Mars,Spark Streaming,Storm性能對比曲線
由圖5可見,與Storm和Spark Streaming相比,Mars在分組統計需求下具有較明顯的性能優勢。在集群規模為20時,Mars的吞吐量是Spark Streaming的1.46倍,是Storm的2.82倍。
3.5.2 實時性對比
實時性方面,實驗計算了部分處理日志中記錄的每個元組的平均延遲。Storm專門為了流處理場景設計,平均延遲最小,為653 ms;Spark Streaming由于需要“攢”數據,平均延遲達到了2 383 ms;Mars介于兩者之間,平均延遲為1 372 ms。
3.5.3 語義準確性對比
由于統計需求是在連續跳動窗口上的分組統計,沒有過濾對數據量產生變化的計算,因此如果統計過程保證了exactly-once語義,那么統計結果中分組統計量的和應與原始記錄數量保持一致。
該實驗使用上述實驗數據中一個4 000萬數據量的子集完成,實驗集群在實驗過程中分別將其中三個節點斷網以模擬故障,實驗結果如表1所示。

表1 Mars Spark Streaming,Storm語義準確性對比
由表1可見,Mars雖然僅實現了at-least-once語義,但由于其良好的容錯性,在發生節點故障時并沒有造成數據丟失或重復,實現了與Storm相同級別的語義限制。
提出了一個面向基于窗口的連續查詢需求的分布式數據流統計模型。該模型在保證at-least-once語義的前提下,實現了優異的性能,尤其在基于窗口的分組統計這一統計場景下,相比目前流行的分布式數據流處理平臺具有較明顯的優勢。同時,Mars具有良好的可擴展性,使其在面對不同規模的數據場景時有良好的適應性。
與此同時,由于該統計模型是面向大規模實時流統計場景的,因此對于需要進行迭代計算、復雜計算的流處理場景支持并不完善。該模型的下一步發展和完善應當是面對更多流處理場景,進行通用化拓展。
[1] CHAUHAN J, CHOWDHURY S A, MAKAROFF D. Performance evaluation of Yahoo! S4:a first look[C]//Seventh international conference on P2P,parallel,grid,cloud and internet computing.[s.l.]:IEEE,2012:58-65.
[2] NEUMEYER L,ROBBINS B,NAIR A,et al.S4:distributed stream computing platform[C]//Proceedings of the 10th IEEE international conference on data mining workshops.Sydney:IEEE Press,2010:170-177.
[3] SIMONCELLI D,DUSI M,GRINGOLI F,et al.Scaling out the performance of service monitoring applications with BlockMon[C]//Proceedings of the 14th international conference on passive and active measurement.Hong Kong:IEEE Press,2013:253-255.
[4] ZAHARIA M,DAS T,LI H,et al.Discretized streams:an efficient and fault-tolerant model for stream processing on large clusters[C]//USENIX conference on hot topics in cloud computing.[s.l.]:USENIX,2012.
[5] 張 鵬,李鵬霄,任 彥,等.面向大數據的分布式流處理技術綜述[J].計算機研究與發展,2014,51:1-9.
[6] 崔星燦,禹曉輝,劉 洋,等.分布式流處理技術綜述[J].計算機研究與發展,2015,52(2):318-332.
[7] GüNDüZ S,?ZSU M T.A web page prediction model based on click-stream tree representation of user behavior[C]//Proceedings of the ninth ACM SIGKDD international conference on knowledge discovery and data mining.[s.l.]:ACM,2003:535-540.
[8] BABCOCK B,BABU S,DATAR M,et al.Models and issues in data stream systems[C]//Proceedings of the twenty-first ACM SIGMOD-SIGACT-SIGART symposium on principles of database systems.[s.l.]:ACM,2002:1-16.
[9] 孫大為,張廣艷,鄭緯民.大數據流式計算:關鍵技術及系統實例[J].軟件學報,2014,25(4):839-862.
[10] PERNG C S,WANG H,ZHANG S R,et al.Landmarks:a new model for similarity-based pattern querying in time series databases[C]//16th international conference on data engineering.[s.l.]:IEEE,2000:33-42.
[11] BABCOCK B,DATAR M,MOTWANI R.Sampling from a moving window over streaming data[C]//Proceedings of the thirteenth annual ACM-SIAM symposium on discrete algorithms.[s.l.]:[s.n.],2002:633-634.
[12] ZHU Y,SHASHA D.Statstream:statistical monitoring of thousands of data streams in real time[C]//Proceedings of the 28th international conference on very large data bases.[s.l.]:[s.n.],2002:358-369.
[13] 熊全洪,魏 娟,劉 武.即席查詢研究[J].現代商貿工業,2008,20(12):345-346.
[14] CHANDRASEKARAN S,FRANKLIN M J.Streaming queries over streaming data[C]//Proceedings of the 28th international conference on very large data bases.[s.l.]:[s.n.],2002:203-214.
[15] 何小東,尹海波.基于共享緩沖區的數據流處理框架設計與實現[J].計算機工程與設計,2012,33(11):4398-4401.
[16] 陳晗鳴,羅 威,李明輝.分布式系統中基于主/副版本的實時容錯調度綜述[J].計算機應用研究,2012,29(11):4017-4022.