杜華明 ,張 鵬 ,徐克付 ,譚建龍 ,李 焱
(1.中國科學技術大學軟件學院 合肥 230051;2.中國科學院信息工程研究所 北京 100093;3.信息內容安全技術國家工程實驗室 北京 100093;4.國家計算機網絡應急技術處理協調中心 北京 100029)
隨著云計算、物聯網等技術的興起,數據正以前所未有的速度不斷增長和積累,大數據時代已經到來,其中典型的3個特點就是:規模性、多樣性和高速性[1]。同時,大數據主要的處理模式包括批處理和流處理兩種[2]。批處理是先存儲后處理,而流處理則是直接處理。流處理的基本理念是數據的價值會隨著時間的流逝而不斷減少,因此盡可能快地對最新的數據做出處理并且給出結果是所有流處理的共同目標。流處理系統從處理模型上可以分為集中式、分布式以及并行分布式。然而,無論哪種處理模型,提高流處理的可靠性都是其中的熱點和難點。故障容錯作為提高系統可靠性的一個方面已經被廣泛研究,并且出現了很多成熟的技術。總的來說,一個故障容錯協議必須包含兩個部分:一是節點的故障檢測和替換;二是故障節點丟失狀態的恢復。其中,檢測和替換是被動行為,而丟失狀態的恢復則是一種需要持續保存可能丟失狀態信息的主動行為。在參考文獻[3]中,給出了一個查詢算子的故障容錯的3種技術:主動備份技術、被動備份技術和上游備份技術。這3種技術的主要區別在于出現故障時保存可能丟失狀態信息的方式(也就是查詢算子的狀態)。
主動備份技術(或者主動復制)通過一個算子的備份節點提供算子的故障容錯,其中這個備份節點中的算子處理與主節點的算子相同的元組。也就是說,當主節點的算子出現故障時,可以使用這個備份節點的算子替換它。這種容錯技術會帶來較高的開銷,其中主要的開銷是保存副本的空間開銷,因為它們在數據處理的大部分時間中并沒有被利用。此外,元組必須發送到多個節點,這也會帶來額外的時間開銷。最后,備份節點的算子必須和主節點的算子保持相同的元組處理順序,這會產生額外的時間開銷。此外,當主節點出現故障時,主動備份技術需要把主節點的輸出流切換到備份節點,因此故障恢復的時間也較長。
被動備份技術把屬于要備份節點的算子狀態周期性地復制到備份節點上。復制可以不斷地在備份節點上或者在專用的節點上進行,當主節點出現故障時,這些備份被安裝到替換的備份節點上。一個算子的周期性的復制被稱為校驗。與主動備份相比,周期性的校驗減少了主節點和備份節點之間需要發送的元組個數,所以被動備份的時間開銷較少。另一方面,由于在最后一個校驗點和出現故障的這段時間內發送到主節點的所有元組都沒有在備份節點中被維護,所以這些元組需要重新發送到備份節點上,因此導致被動備份的故障恢復時間較長。
上游備份是一種不同的備份機制,它不需要使用任何備份節點,只依賴上游節點和下游節點。上游節點定義了一個協議用于維護其所輸出元組的狀態,直到下游節點確認這些元組可以被刪除。上游備份的核心思想是當主節點出現故障時,上游節點把所有在輸出隊列并且還沒有被下游節點確認的元組重新發送到替換的節點上。上游備份的唯一開銷就是維護上游節點發送的元組的空間開銷。然而,由于主節點狀態需要重建并且再次分別處理每個元組,因此上游備份的故障恢復時間會較長(恢復時間取決于主節點狀態的恢復時間)。
對于數據流處理系統,故障容錯仍然是其中的重點和難點。在工業界中,S4和Storm是當前流行的數據流處理系統,其中S4使用Zookeeper來協同集群中的任務分配,集群中活躍(active)節點被分配具體的任務,而空閑(idle)節點放在池中以在故障容錯或者負載均衡時使用。特別地,一個空閑節點可以注冊成為分配不同任務的多個活躍節點的備份節點。對于S4在運行一段時間后可能出現失敗、基礎設施更新、調度重新分配和應用更新等情況,參考文獻[4]中提出了相應的策略:高可用策略、基于檢查點的狀態恢復策略和低時延的處理策略。對于Storm,參考文獻[5]中介紹的容錯技術僅是將元組不停地重發,以保證每個元組至少得到一次完整的處理。
在學術界中,參考文獻[6]認為現有的故障容錯技術(無論被動備份還是主動備份)都會在運行時增加一些時間開銷。該文的一個主要工作是提出了一個預測模型,該模型會在輸入元組處理失敗時啟動故障容錯機制。這個模型需要分流器來持續監控一個節點狀態,并且把它們標記為正常(normal)、警告(alert)和失敗(failure)。當一個節點的狀態從正常變為警告時,分流器會啟動故障容錯機制。為了構建分類器,這個模型定義了一段訓練時間以預測未來可能出現的故障。當預測模型發現一個可能在短期內發生的故障時,它首先把故障節點的算子遷移到一個專用的節點上以減少故障和算子替換所造成的影響。當節點狀態是警告時,為了收集用來預測相同類型算子未來出現故障的信息,可能出現故障的算子的監控力度會增加。然而,該工作的不足在于實際中能夠被預測的故障類型是很少的。不僅如此,該文提出的故障容錯技術只有在預測模型所需的計算資源沒有超過其他故障容錯技術所需要的計算資源時才有效。
[7]和參考文獻[8]中,關注非確定性算子的故障容錯。在數據流中,算子執行的非確定性可以通過對輸入流元組到達次序敏感的函數或者依賴時間的函數來定義。如作者所述,用來提供這種算子故障容錯的維護信息包括到元組到達次序信息和依賴元組到達次序敏感的函數信息。該文作者也研究了如何有效維護副本,使其能夠克服嚴格同步副本中輸入元組的次序所帶來的限制,同時作者還研究了如何以多線程方式運行副本。
在參考文獻[9]中,作者提出了一種混合主動備份和被動備份的容錯協議,其中的核心思想是周期性地校驗查詢算子的狀態,這些狀態并不存放到專用的節點,而是存放到這個算子的空閑副本上。也就是說,算子的一個副本正在被其他節點所維護,這個副本不用接收主節點所處理的相同元組,它的狀態只需要通過增量的校驗點來持續地更新。當主節點出現臨時故障時,算子的副本被啟用并且開始處理和主節點所處理的相同元組。如果主節點出現永久故障時,那么這個查詢會得到指示,開始只向算子的副本發送元組并且開始使用它的輸出元組。
參考文獻[10]在前期工作[11]的基礎上提出了一種基于異步校驗機制(類似于模糊校驗)的容錯協議,該協議沒有對算子狀態和它的輸出隊列進行校驗,而是讓所有輸出元組都包含窗口校驗元組(專門用來描述窗口中間狀態的元組),并且只有輸出的數據流被持久化。當節點出現故障時,通過讀取該節點的算子輸出隊列來查找最近的窗口校驗信息,以計算出從哪個位置重新發送數據流的元組。如作者討論的一樣,由于這種校驗方式在重建故障節點的算子狀態時減少了重新發送元組的數量,所以可以減少故障恢復的時間。
StreamCloud[12]在參考文獻[10]的基礎上對故障容錯以下幾個方面的改進:
·出現故障時重新發送元組的時間點的信息earliest timestamp只包含在輸出元組的頭部,減少了存儲開銷;
·earliest timestamp是在線維護的,因此避免了在并行文件系統中查找數據的不必要的讀取操作;
·數據流持久化中通過采用一種對持久化信息自識別的命名方式避免了元數據的維護[9],減少了故障容錯過程對運行時的影響;
·基于earliest timestamp的容錯協議可以對重新部署期間發生的故障進行容錯。
然而,上述技術如果把故障粒度定為數據流中的每個元組,當元組數量很多時,跟蹤這些元組是否已經被處理的內存開銷會很大。因此需要一種既能節約內存又能夠保證需要處理的每個元組都被處理的可靠方案。為此本文提出了一種既能夠保證元組得到可靠處理又能夠節省內存開銷的元組跟蹤方法。該方法包括內存分配策略、元組跟蹤單元選擇策略和校驗值更新策略,這3個策略通過只保留元組標識符的異或校驗值而不是元組來減少內存開銷,同時通過改進一致性散列來實現元組跟蹤單元的負載均衡。
流是由具有相同數據模式且無界限的元組序列組成的。與傳統的數據庫系統不同,數據流處理系統處理的數據是沒有經過持久化的流數據,當且僅當滿足查詢條件時,數據流處理系統才會將查詢結果返回給用戶,所以該查詢又可以稱作連續查詢。一個查詢可以定義為一個有向無環圖,并且圖中每個節點都是一個操作算子,圖中每條邊可以表示的是數據流向。本文把查詢的起點稱為元組生成器(spring),處理單元稱為元組處理器(processor)。元組生成器可以產生并發送數據流,其中的數據流中的元組被稱為根元組。根元組的狀態分為正在處理狀態(pending)、處理失敗狀態(failure)和處理成功狀態(finished)3種。元組處理器接收數據流并且處理數據流,處理后的數據流也可以發送給其他的元組處理器。元組生成器和元組處理器在元組的處理過程中會啟動多個任務線程來并行處理元組。
元組生成器處理根元組后會產生多個元組,這些元組經過元組處理器處理后可能繼續產生新的元組,直到元組處理器不再產生新元組為止,所有元組所形成的一個樹狀結構被稱為元組樹。元組樹的根元組用springId唯一標識,其他元組用tupleId唯一標識。在元組樹形成的過程中,元組生成器和元組處理器會不斷地向元組跟蹤器發送消息,元組跟蹤器根據發送來的消息構造跟蹤記錄,并將跟蹤記錄存儲在元組跟蹤單元中(元組跟蹤單元的選擇策略在第2.2節介紹)。元組跟蹤單元(acker)是一個跟蹤元組處理過程的進程。跟蹤記錄是一個三元組,表示為

圖1 元組跟蹤器的交互過程
在元組跟蹤器啟動階段,首先是初始化元組跟蹤單元的數量,元組跟蹤單元的數量可以從配置文件中讀取,也可以在元組跟蹤器運行狀態下,通過對外接口來修改元組跟蹤單元數量。然后查看是否能夠產生元組跟蹤單元。如果能夠正常產生元組跟蹤單元,元組跟蹤器開始接收元組生成器發送的springId和taskId,以進入運行階段。否則,元組跟蹤器再次讀取配置文件重新生成相應數量的元組跟蹤單元。
元組跟蹤器的運行階段可以細劃為內存分配、元組跟蹤單元選擇和校驗值更新3個子階段。如圖1所示,當元組跟蹤器接收到元組生成器發來的springId和taskId后,首先進入內存分配階段,元組跟蹤器為每個元組分配大約20 byte的內存空間來構造跟蹤記錄;然后進入元組跟蹤單元選擇階段,通過元組跟蹤單元選擇策略將跟蹤記錄存儲在不同的元組跟蹤單元上;最后進入校驗值更新階段,元組生成器(元組處理器)會不斷地向元組跟蹤器發送消息(已處理的元組ID和新產生的元組ID),元組跟蹤器利用元組生成器(元組處理器)發來的消息不斷地更新跟蹤記錄中的校驗值,更新策略在第3.3節介紹,當校驗值為0時,說明跟蹤記錄所跟蹤的元組已經得到了完整的處理。此時,元組跟蹤器將會通知元組生成器相應的任務線程,任務線程將對應的元組的狀態修改為已完成(finished)。最后,元組生成器會將狀態為已完成的元組從內存中移除。當校驗值不為0時,任務線程將對應的元組的狀態修改為失敗(failure),最后,元組生成器會將狀態為失敗的元組重新發送。
元組跟蹤器和數據流處理引擎之間是松耦合關系,它可以獨立于數據流處理引擎運行,當不需要元組跟蹤器時,用戶可以通過命令行來終止元組跟蹤器所對應的進程。
為了實現節省內存、負載均衡和可靠的元組處理,元組跟蹤器采用的技術主要涉及內存分配、元組跟蹤單元選擇和校驗值更新,下面具體介紹這3個策略。
大數據具有規模大并且速度快的特點,要保證從元組生成器產生的每個根元組都得到至少一次完整的處理,則需要對每個根元組所形成的元組樹中的每個元組進行跟蹤,以確定元組樹中的所有元組是否都得到完整的處理。但是,如果元組樹中含有成千上萬個節點,對元組樹的跟蹤所占用的內存會隨著元組樹中節點數的增加而呈現指數級增長,這樣會導致內存溢出。針對跟蹤元組樹的內存開銷問題,本文提出一種節約內存的方法,該方法只保留元組標識符的異或校驗值而不是元組。其中,元組生成器接收根元組后,會向元組跟蹤器發送springId和taskId,然后,元組跟蹤器利用springId、taskId以及checkValue構造跟蹤記錄。當這個根元組得到完整處理時,元組跟蹤器會通知taskId對應的任務將根元組從內存中移除,否則,元組跟蹤器會通知taskId對應的任務重新發送該根元組。
由于元組生成器產生元組的數量多,如果僅使用一個元組跟蹤單元來跟蹤元組生成器產生的所有根元組,那么元組跟蹤單元的負載會很高。因此,元組跟蹤器需要使用多個元組跟蹤單元來跟蹤元組生成器產生的根元組。為了使各個元組跟蹤單元跟蹤元組的數量盡量均衡,需要一個將元組的跟蹤記錄分配到不同的元組跟蹤單元的分配策略,使得各個元組跟蹤單元負載相對均衡,這樣不僅可以減小單個元組跟蹤單元的負載壓力,同時也可以提高整體性能。如果某個元組跟蹤單元異常終止,元組跟蹤器會將該元組跟蹤單元的跟蹤記錄分配給其他的元組跟蹤單元。
本文中,元組跟蹤單元的選擇采用了改進的一致性散列策略,其中的原理如下:元組跟蹤器使用散列函數將元組生成器產生的根元組ID映射到環上的某一個值,環是一個由0~(232-1)的數值組成的空間。然后將一個元組跟蹤單元及其副本分別映射到環上的某一個值,每個元組跟蹤單元跟蹤逆時針方向上與它距離最近的元組,這樣每個元組跟蹤單元所跟蹤的元組數量就會相對均衡。具體的分配步驟如下。
首先將元組映射到一個32 bit的key值,該映射首先初始化全局變量hash=0,然后將字符串中每個字符從右到左順序執行如式(1)所示的計算式:

最后執行如式(2)所示的計算式:

所得的key值對應環中的某個值。
例如根元組、根元組2…根元組6和acker A、acker B,將這6個根元組和兩個acker的ID映射到環上,散列函數為key=hash(value),該函數封裝的是式(1)和式(2)的邏輯,value是springId或ackerId,然后將映射的結果封裝成location=
對于環中的每個根元組,從根元組的key值出發,沿順時針方向旋轉搜索,當遇到第一個acker時,將元組的location存儲在該acker上,因為springId和ackerId的散列值是固定的,因此這個元組和acker的關系必然是唯一和確定的。理想的散列結果是將所有元組均勻分配到各acker中,采取的策略就是將一個ackerId映射到兩(N)個位置,這樣可以保證跟蹤記錄相對均勻地分配到各個acker中。此時value為acker_id#1,acker_id#2,…,acker_id#N,hash 值 key=hash(value),hash 值在環上的分布以及分配結果如圖2(b)所示。
在元組跟蹤器運行過程中,由于某些原因導致acker的數量減少,根據圖 2(a)和圖2(b)所描述的映射方法,這時受影響的將僅是沿acker B1和acker B2逆時針遍歷直到下一個 acker(acker A2和 acker A1)之前的 location,也是本來映射到acker B上的那些location。那么,僅需要將根元組2對應的location分配給acker A2,根元組4和根元組6對應的location分配給acker A1即可,元組的重新分配如圖 2(c)所示。
假如因為元組的數目過多(僅有acker A和acker B記錄這些元組對應的location狀態負載過大)而增加acker或者通過修改元組跟蹤器的acker數量N值增加acker(假設新增acker C)。通過圖2(b)中所提到的將ackerId映射到環的方法,acker C的兩個hash值分別映射到根元組3和根元組5、根元組4和根元組6所對應的location之間,這時受影響的元組將僅是那些沿acker C1或acker C2逆時針遍歷直到下一個acker B2和acker A2之間的location(它們本來也是映射到 acker B1和 acker A2上的),將這些location重新映射到acker C1和acker C2上即可,映射的結果如圖 2(d)所示。
以上所描述的就是元組跟蹤單元選擇策略的全過程,通過使用改進的一致性散列將一個ackerId映射到環上的多個位置,不僅能夠保證元組的跟蹤記錄分配的相對均衡,同時也保證增加或者減少元組跟蹤單元只會影響小部分已經分配的跟蹤記錄。
3.3 校驗值更新策略
為了保證元組生成器產生的根元組至少得到一次完整的處理,當元組沒有被成功處理時,元組跟蹤器將會重發沒有得到成功處理的根元組。元組跟蹤器通過跟蹤記錄中的校驗值(0/非0)來判斷元組生成器產生的根元組是否得到完整的處理,如果某個元組沒有得到完整處理(校驗值為非0),那么元組跟蹤器會通知元組生成器中處理該元組的任務(task)重新發送該元組。
跟蹤記錄通過對校驗值判斷根元組是否已經得到完整的處理。不管這棵元組樹多大,它只是簡單地把這棵樹上的所有已處理的tupleId(根元組ID除外)和新產生的tupleId進行異或(XOR)運算,并以此結果更新校驗值。當校驗值為0時,表示跟蹤記錄中對應的根元組被完整地處理,因為元組樹中的每個tupleId都出現兩次,所以異或的結果為0,由此證明根元組產生的元組樹中的所有元組都得到完整處理。反之,沒有得到完整處理,這是因為如果在處理元組的過程中出現元組丟失,那么元組樹中每個tupleId不會出現兩次,所以結果不為0,此時通過跟蹤記錄中的taskId通知元組生成器中相應的任務重發springId對應的根元組。因為校驗值是64 bit的,所以元組樹存在未被處理的元組但是異或結果為0的概率是1/264,因此可以忽略。下面描述一下checkValue更新的過程。

圖2 元組跟蹤單元選擇策略
首先,元組生成器產生具有64 bit ID的根元組,并將其置為pending狀態,然后元組生成器將springId和taskId發送給元組跟蹤器,元組跟蹤器用收到的springId、taskId和checkValue(初始化為0)構造跟蹤記錄后,將springId生成的tupleId進行異或運算,用得到的結果與checkValue做異或運算,用異或運算的結果更新checkValue。
然后,元組處理器每次處理元組后,會給元組跟蹤器發送處理元組的tupleId及新生成的元組的tupleId。同樣,將已處理的元組的tupleId和新生成的元組的tupleId進行異或運算,將得到的結果與checkValue進行異或運算以更新校驗值。
最后,當元組處理器不再產生新元組時,僅將輸入的tupleId做異或運算,將得到的結果與checkValue做異或運算,用異或運算的結果更新checkValue。
以上過程就是某一個跟蹤記錄中校驗值的變化過程。判斷跟蹤記錄第一個字段springId所代表的根元組是否重發的依據就是判斷校驗值是否為0。當校驗值為0時,元組跟蹤器會將該springId和taskId發回給元組生成器,元組生成器將會根據taskId把springId的元組狀態置為已完成狀態,并將該元組從內存中移出。否則,將會通知元組生成器更新taskId中springId的元組狀態為失敗,那么元組生成器會重發該元組。
下面以元組springId為01001為例,介紹它的跟蹤記錄校驗值的更新的全過程。如圖3所示,元組生成器產生springId為01001的根元組,元組生成器處理根元組并產生了tupleId為01010和01011兩個元組,此時沒有需要確認的元組,元組生成器將這兩個新產生的元組ID發送給相應的元組跟蹤單元,元組跟蹤單元將這兩個元組tupleId為01010和01011進行異或運算,將得到的結果與校驗值做異或運算。計算所得的結果更新校驗值。將tupleId為01010的元組被發送到元組處理器1中,將tupleId為01011的元組被發送到元組處理器2中,計算過程如圖3(a)所示。
元組處理器1處理01010這個元組,處理元組的結果是產生了新元組,其tupleId為01100。元組處理器1將已處理的元組tupleId和新產生的元組tupleId發送給相應的元組跟蹤單元,元組跟蹤單元將01010和01100進行異或運算,用所得的結果與校驗值做異或運算,用計算所得的結果更新校驗值,將tupleId為01100的元組傳到元組處理器3中,計算過程如圖3(b)所示。
元組處理器2處理01011這個元組,處理元組的結果是產生了新元組,其tupleId為01101。元組處理器2將已處理的元組tupleId和新產生的元組tupleId發送給相應的元組跟蹤單元,元組跟蹤單元將01011和01101進行異或運算,用所得的結果與校驗值做異或運算,用計算的結果更新校驗值,將tupleId為01101的元組發送到元組處理器3中,計算過程如圖3(c)所示。
元組處理器3處理01100和01101這兩個元組,不再有新的元組生成。那么,元組處理器3僅將已處理的元組tupleId發送給相應的元組跟蹤單元,元組跟蹤單元將01100和01101做異或運算,用所得的結果與校驗值做異或運算,用計算所得的結果更新校驗值,計算過程如圖3(d)所示。
下面通過實驗驗證元組跟蹤器的內存開銷和負載。本實驗環境采是用4臺4核主頻2.40 GHz的PC機做服務器,內存4 GB。本實驗所定義的查詢僅有一個映射(map)算子和一個聚合(aggregate)算子。數據源是文件中讀取的英文文本數據,本實驗的目標是計算出文本中不同的單詞在文本中出現的次數。元組生成器將文本中的每一行數據封裝為一個根元組,map算子就是將每行的文本以空格為分隔符將根元組分割成若干元組 (單詞),并將每個元組(單詞)發給aggregate算子做聚合計算。
實驗一驗證僅有一個元組跟蹤單元時,輸入元組的數量對元組跟蹤單元占用內存的影響。輸入的元組數從10萬到90萬不等,元組跟蹤單元的內存變化情況如圖4(a)所示,當輸入的元組個數是10萬時,上文已經介紹了一個跟蹤記錄僅消耗20 byte的內存,理論上10萬應該消耗1.9 MB內存,而實際僅消耗0.86 MB內存,內存消耗減少了55%左右。當輸入元組個數是20萬時,理論計算應該消耗3.8 MB左右,而實際上僅消耗1.59 MB,內存消耗減少了58%左右。出現以上現象是因為當元組跟蹤單元確定根元組已經得到完整的處理后,元組跟蹤器會通知元組生成器將對應根元組移除內存,與此同時元組跟蹤單元也會將相對應的跟蹤記錄移除內存,也就是說在不斷地構造新跟蹤記錄的同時也將已處理的跟蹤記錄移除內存,所以該階段內存的消耗的增長率較小,從圖4(a)中可以看出元組的數量較少時,元組跟蹤器的內存節開銷較小。當輸入的元組增大到40萬后,內存開銷增長率會平緩地增大,由于程序對元組的跟蹤能力有限,所以內存開銷會增加。當輸入元組增長到一定數量后,受機器內存的影響,元組跟蹤器的內存開銷將不會增加,內存開銷增長率為0。同時,從圖中可以看到,隨著輸入元組數量的增多,無元組跟蹤單元的數據流處理系統的內存開銷線性增長,當輸入元組個數為90萬時,無元組跟蹤單元對元組的跟蹤需要消耗大約69 MB內存,而元組跟蹤單元對元組的跟蹤僅僅消耗大約16 MB的內存。所以圖4(a)也從側面反映了另外的一個問題:即使使用單個元組跟蹤單元來跟蹤元組,內存開銷也是很小的。第4.2節觀察在使用多個元組跟蹤單元時,是否能夠進一步地減少單臺機器的內存開銷。

圖3 校驗值更新流程
實驗二是觀察隨著元組跟蹤單元的增加,各元組跟蹤單元的內存開銷(這里所說的內存開銷指的是集群中各機器內存平均開銷)情況。將輸入元組限定在50萬個,當元組跟蹤單元的數量為1時,可以從圖4(b)中看出,內存開銷略小于7 MB,當元組跟蹤單元增多時,元組跟蹤單元內存開銷不斷減少,這是因為隨著元組跟蹤單元數量的增加,原來分配到一個元組跟蹤單元的跟蹤記錄被多個跟蹤單元均勻分配,所以隨著元組跟蹤單元的增加,內存開銷逐漸減少。從圖中可以看出,當元組跟蹤單元的數量超過6個時,元組跟蹤單元的內存開銷減少幅度越來越小,這是被集群自身節點個數所限制,因為在集群中機器數量較少的情況下,隨著元組跟蹤單元的數量不斷增多,集群中每臺機器中運行的元組跟蹤單元數量相對較多,那么內存開銷會逐漸增大。
實驗三主要是驗證第3.2節的元組跟蹤單元選擇策略能否使得各個元組跟蹤單元存儲的跟蹤記錄相對均衡,要驗證的是輸入元組的數量對元組跟蹤單元負載均衡的影響,分別做3次實驗,每次實驗的元組跟蹤單元的數量都設置為6個,并且每次實驗輸入元組的數量分別為10萬個、20萬個和30萬個。實驗結果如圖4(c)所示。從圖中可以看出,隨著輸入元組數量的增多,曲線逐漸趨于橫線,這也說明了輸入的元組越多,各個元組跟蹤單元存儲的元組的總數量相對更均衡。這主要是因為元組跟蹤單元選擇策略是將各個元組跟蹤單元及其副本的映射值均勻分布在環上,采用元組跟蹤單元的副本策略可以使各個元組跟蹤單元在環上的分布盡量均衡,這樣輸入元組的數量越多,將元組映射到環上的位置占用率就越高,根據第3.2節所述的跟蹤記錄分配方法,在數量上每個元組跟蹤單元所存儲的跟蹤記錄也就相對地均衡。

圖4 元組跟蹤器的內存開銷和負載
規模性、多樣性和高速性是大數據處理必須要考慮的3個特征,很多研究工作圍繞這3個特征正在展開。針對規模性和高速性,本文提出了一種面向數據流處理的元組跟蹤方法,其中的最大特點是節約內存,該方法具體包括內存分配策略、元組跟蹤單元選擇策略和校驗值更新策略,這3個策略通過只保留元組標識符的異或校驗值而不是元組減少內存開銷,同時通過改進一致性散列實現元組跟蹤單元的負載均衡。內存開銷和負載均衡的相關實驗表明,該方法有效實現了元組的可靠處理。
參考文獻
1 孟小峰,慈祥.大數據管理:概念,技術與挑戰.計算機研究與發展,2013,50(1):146~169
2 Kumar R.Two computational paradigm for big data.http://kdd2012.sigkdd.org/sites/images/summerschool/Ravi.Kumar.pdf,2012
3 Hwang J H,Balazinska M,Rasin A,et al.High-availability algorithms for distributed stream processing.Proceedings of 21st International Conference on Data Engineering (ICDE 2005),Tokyo,Japan,2005
4 S4 distributed stream computing platform.http://incubator.apache.org/s4/doc/0.6.0/fault_tolerance/,2013
5 Guaranteeing message processing.https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing,2013
6 Gu X H,Papadimitriou S,Yu P S,et al.Toward predictive failure management for distributed stream processing systems.Proceedings of 2008 the 28th International Conference on Distributed Computing Systems (ICDCS’08),Washington,DC,USA,2008
7 Brito A,Fetzer C,Felber P.Minimizing latency in fault-tolerant distributed stream processing systems.Proceedings of 2009 the 29th IEEE International Conference on Distributed Computing Systems(ICDCS’09),Washington,DC,USA,2009
8 Brito A,Fetzer C,Felber P.Multithreading-enabled active replication for event stream processing operators.Proceedings of the 28th IEEE International Symposium on Reliable Distributed Systems(SRDS’09),Niagara Falls,New York,USA,2009
9 Zhang Z,Gu Y,Ye F,et al.A hybrid approach to high availability in stream processing systems.Proceedings of the 30th International Conference on Distributed Computing Systems(ICDCS’10),Washington,DC,USA,2010
10 Sebepou Z,Magoutis K.CEC:continuous eventual check pointing for data stream processing operators.Proceedings of 2011 IEEE/IFIP 41st International Conference on Dependable Systems Networks(DSN),Hong Kong,China,June 2011
11 Sebepou Z,Magoutis K.Scalable storage support for data stream processing.Proceedings of the 26th Symposium on Mass Storage Systems and Technologies(MSST),Incline Village,Nevada,May 2010
12 Gulisano V, Jimenez-Peris R, Patino-Martnez M, et al.StreamCloud:an elastic and scalable data streaming system.IEEE Transactions on Parallel and Distributed Systems,2012,23(12):2351~2365