王寶軍 詹英
摘要: 對于許多應用領域不斷產生的數據流,面向數據流聚集查詢的應用最為廣泛。本文在構造壓縮桶的基礎上,提出了基于時間維度壓縮數據流的算法,來動態地形成壓縮數據流,并進一步給出了使用壓縮桶獲得數據流聚集查詢的數學方法。
關鍵詞: 數據流; 壓縮桶; 聚集查詢; 時間維度
中圖分類號:TP393文獻標識碼:A 文章編號:1006-8228(2012)04-29-03
Aggregate compression algorithm for data stream
Wang Baojun, Zhan Ying
(Zhejiang Institute of Communications, Hangzhou, Zhejiang 311112, China)
Abstract: In many fields, data stream continues to grow in terms of generation speed. Aggregate query for data stream was most widely used. By constructing compression buckets, the authors provides in this paper a compression algorithm for data stream based on time dimension, in order to dynamically form compression data stream, and give mathematical method of aggregate query for data stream, by use of compression buckets.
Key words: data stream; compression buckets; aggregate query; time dimension
0 引言
數據流是隨著網絡的廣泛應用而出現的一種新的數據形式。數據流聚集查詢是數據流管理與知識發現系統中一種重要的數據知識發現模型,但快速流動的流數據與有限的處理能力之間的矛盾使得流數據的聚集查詢分析比關系數據庫的聚集分析更困難。
目前國內外已經對數據流聚集查詢模式展開了研究。Dobra A等人研究利用隨機草圖技術,提取數據流的輪廓,減少數據的處理量來加快數據處理速度,并提出了一種草圖分割技術來提高算法的性能[1]。Gilbert A C等人研究采用小波技術對數據流進行壓縮,實現了近似聚集查詢[2]。Madden研究了傳感器網絡中的聚集查詢問題,重點是如何動態地建立路由樹,實現流水線聚集操作[3,4]。Ahnad Y提出了數據流查詢的分布式操作[5]。張冬冬等人提出了一種新的數據流傳輸方式,有效地減少網絡中分布式數據流的傳輸量[9]。傅鸝等人建立了基于數據流驅動的數據流連續查詢模型,設計并使用查詢算子在查詢鏈中的有序組合來構造出各種復雜的連續查詢語句[7]。李建中等人提出利用多元線性回歸方法來預測具有線性關系的數據流的未來聚集值,但如果數據不具有線性關系,該模型誤差就會增大[10]。
以上的數據流聚集查詢相關算法采用近似聚集、壓縮數據流等技術來提高查詢速度。由于數據流的“流”性和隨機性,使得流量的變化具有突發性,然而,商業活動中,普遍要求能夠實時地檢索面向數據流的聚集查詢結果,并獲得更高的準確率。
1 數據流壓縮
1.1 相關問題描述
數據流是一個以數據到達時間為戳的數據序列。流數據的聚集查詢分為預定義查詢(Predefined Query)和即席查詢(Real-time Query)兩類。預定義查詢主要針對數據流后續到來的數據計算查詢結果;而即席查詢是針對數據流中流過的所有數據。數據流源源不斷地流入系統,因此無法將所有數據流保存起來,為了獲得更為準確的即時查詢結果,在聚集查詢中,需要對數據流進行壓縮。由于數據流動態振蕩流動,面向數據流的數據流聚集查詢系統無法存儲所有流數據,而用戶有查詢分析過去與未來流數據的需求,因此需要不斷地壓縮數據流,來滿足用戶需求。壓縮后的數據流結構應該是簡單的,方便為用戶提供各類流數據聚集查詢,并能夠最大程度地反映原始流數據。壓縮后的數據流結構是對壓縮后的數據流的靜態特征的描述,它描述數據的內容和流數據之間的相互關系。
由于數據流連續無限地流動,數據流具有時間特征,因此可以在時間維度上壓縮數據流。本文采用基于對數尺度的時間傾斜框架模型[8]來壓縮數據流。面向數據流壓縮算法以增量的方式對壓縮數據流進行更新,從而提高數據流的壓縮速度,滿足數據流聚集查詢的實時性要求。用戶會根據需求向系統提出多種聚集查詢,這要求壓縮數據流盡可能地反映原數據流的信息。隨著時間的流逝,流過的流數據被不斷地壓縮,歷史流數據被不斷地拋棄。
1.2 相關定義
定義1. 設PT為時間分區長度。構造壓縮桶Buckcets(BuckcetsID=0…n),壓縮桶有三個抽屜drawer(drawerID=0…2),每個抽屜存放流數據的時間長度為2 BuckcetsID×PT。壓縮桶的結構如圖1所示。其中每個桶的2號抽屜是臨時存儲單元。如果0號抽屜是空的,則同一個桶的1號抽屜也空。
設i(i=0…n)為壓縮桶的編號, i號桶中的抽屜存儲流數據的時間長度為2i×PT。每個壓縮桶的第0號與第1號抽屜存放流數據,2號抽屜是臨時存儲空間,只有當這個桶中的第0號與第1號抽屜非空,此時只能將新流入的流數據臨時存放到2號抽屜,系統合并此桶的第0號與第1號抽屜,并推入下一桶后,新流入到2號抽屜的流數據被轉移到同一桶的0號抽屜。例如,第0號桶流入第3個PT時間長度的流數據,而第0號桶的第0號與第1號抽屜已經分別存儲了第1個和第2個PT時間長度,系統壓縮第0號桶的第0號與第1號抽屜,并將流數據推入第1號桶的第0號抽屜后,第3個PT時間長度的流數據才可以流入第0號桶的第0號抽屜。也就是說,桶號為i的流數據來源于桶號為i-1的桶,系統壓縮第i-1號桶的第0號與第1號抽屜,并將流數據推入第i號桶的第0號或第1號抽屜。壓縮桶間的數據壓縮與流動示意圖如圖2所示。
圖1壓縮桶的結構
圖2壓縮桶間的數據壓縮與流動示意圖
引理保存流數據的最大時間長度為LongTime,MaxBCount為保存LongTime時長的流數據所需壓縮桶的數量。則
⑴
證明:設m個桶最多可以存儲流數據的時間長度為MaxT(m),則
MaxT=(2×20+2×21+…+2×2m) ×PT
所以MaxT(m)=(2m+1-2)×PT
設m-1個桶最多可以存儲流數據的時間長度為MaxT(m-1),則
當時間長度LongTime滿足:
MaxT(m-1)<LongTime≤MaxT(m)
則存儲時間長度LongTime的流數據至少需要m個桶。所以:
證畢。
1.3 數據流壓縮算法
以商業零售實際業務數據流為例,本文將探索針對數據流的聚集查詢與壓縮方法。商業零售數據流結構如下:sale(ProductID,OrderQty),sale是超市商業零售數據流,ProductID表示產品編號,OrderQty表示訂貨量。用戶根據需求提交各類查詢,并請求實時獲得各類查詢結果。例如,系統根據用戶提交的產品號 ProductID,選擇相關產品進行壓縮。
定義初始數據流結構:
Datasourse(timestamp;productID;orderqty),timestamp記錄了流數
據到達的時間點。
定義壓縮后數據流的數據結構:
Compresssourse(starttime; productID; maxorderqty;minorderqty;sumorderqty; countorderqty),starttime表示壓縮的初
始時間; maxorderqty表示訂貨量的最大值;minorderqty表示訂
貨量的最小值;sumorderqty表示訂貨量的總和; countorderqty表
示訂貨次數。
算法1:數據流壓縮算法
輸入:初始數據流。
輸出:經過壓縮后的數據流存儲在桶中,每個抽屜存儲壓縮后的數據流。
定義桶的數據結構:
public struct buckets
{public compresssourse drawer0;
public compresssourse drawer1;
public compresssourse drawer2;}
根據需存儲的最大時間長度,計算需要的桶數MAXBcount;
定義桶DataS:
buckets[] DataS = new buckets[MAXBcount];
初始化桶中的所有抽屜;定義記錄時間長度的變量feng;定義時間分區PT;
While(true)
{根據用戶提交查詢的產品號ProductID 獲取原始數據流;
獲得產生數據流的當前時間;
if(接收的是第一個流數據)
{壓縮后直接推入0號桶2號抽屜,它的starttime為被推入流數據的
timestamp。接著進入下一循環等待下一個流數據;}
計算新流入的流數據的timestamp與0號桶2號抽屜的starttime相隔時間feng:
if (是同一個時間分區feng < PT)
{壓縮同一時間分區內的數據到0號桶2號抽屜;
回到循環開頭,繼續讀下一個數據;
continue;}
else
{ 記錄當前桶號碼;
while (DBcount < MAXBcount)
{if (桶0號抽屜有空)
{將桶2號抽屜的數據移到桶0號抽屜;
break;}
else
{if (桶1號抽屜有空)
{將桶2號抽屜的數據移到桶1號抽屜;
break; }
else
{if (不是最后一桶)
{將該桶的0號與1號抽屜合并后放入下一桶中的2號抽屜;
該桶的0號與1號抽屜變空;}
else
{丟棄該桶的0號與1號抽屜;}
合并后,該桶0號抽屜空出來,放入該桶2號抽屜的流數據; }}}
if (0號桶2號抽屜空))
{將新讀入的數據放入0桶2號抽屜;
重新設置starttime;}
else {break;}}}
2 獲得壓縮桶狀態的數學方法
當用戶向系統提出面向數據流的查詢請求時,系統首先判斷流數據被壓縮到哪些桶中,而壓縮流數據存儲了最大值、總和等聚集值,使得用戶獲得聚集值變得非常方便。
在壓縮過程的任意時刻,用戶均可能提出獲得流數據的聚集值,這要求系統能夠迅速判斷各個桶的狀態,也就是每個桶中的0號抽屜或1號抽屜是否存儲了壓縮數據。
假設j為最后流入桶中的時間分區流數據,求每個桶中含有數據的抽屜數。存儲第j個時間分區,需要BCount個桶。則:
,⑵
如果,BCount大于MaxBCount,則從MaxBCount+1到BCount號桶的流數據被丟棄。所以,
,j∈N+。
則,
,⑶
其中ai的取值僅為0或1,表示第i個桶中有ai+1個抽屜有流數據。ai=0表示0號抽屜存儲了壓縮流數據,ai=1表示0號與1號抽屜存儲了壓縮流數據。
例如j=33,表示持續流入數據流的時間長度為33×PT個時間長度。根據公式⑵,此時需要的桶數為5。根據公式⑶,得到33-25+1=2。則2=0×20+1×21+0×21+0×21+0×21,由此,我們可以得到壓縮桶的狀態為0號桶、2號桶、3號桶、4號桶的0號抽屜存儲了壓縮數據,1號桶的0號與1號抽屜存儲了壓縮數據。
3 結束語
本文提出了在時間維度上壓縮數據流的方法:不斷流入壓縮桶的流數據被不斷地以2為底的對數尺度進行壓縮。實驗表明,壓縮桶結構在滿足了壓縮數據的存儲需求的同時,大大減少了存儲空間,桶中的壓縮數據能夠隨著時間不斷地更新,基于時間傾斜的數據流壓縮算法能夠提高數據流的壓縮速度。能夠滿足數據流聚集查詢的實時性要求,也能夠提高數據流動態聚集查詢的效率及靈活性。
參考文獻:
[1] Dobra A,Garofalakis M,Gehrke J,et a1.Processing Complex Aggregate Queries over Data Streams[C].Proceedings of the 2002ACM SIGMOD International Conference on Management of Data,M acIison.W isconsin.2002.
[2] Gilbert A C,Kotidis M uthukrishnan S M ,et a1.Surfing Wavelets on Streams: One—pass Summaries for Approximate Aggregate Queries[C] .Proceedings of the 27th International conference on Very Large Data Bases.2001
[3] Madden S R,Franklin M J,Hellerstein J M ,et a1.TAG :A Tiny Aggregation Service for Ad—hoc Sensor Networks[C] .Proc.of the 5thSymp.on Operating Systems Design and Implementation,Boston,USA 2002.
[4] Madden S R.Szewczyk R.Franklin M J.et a1.Supporting Aggregate Queries Over Ad—hoc Wireless Sensor Networks[C].Proceedings of the Workshop on Mobile Computing and Systems Applications.Los Alamitos:IEEE Computer Press.2002.
[5] Ahnad Y,Berg B,Cetintemel U,et a1.Distributed operation in the borealis stream processing engine[C].Proc of ACM SIGMOD Conference.Baltimore:[s.n.],2005:882~884
[6] 詹英,吳春明,王寶軍.一種與緩沖區緊耦合的環形循環滑動窗口的數據流抽取算法[J].電子學報,2011.39(4):2262~2267
[7] 傅鸝,魯先志,蔡斌.一種基于數據流驅動的數據流連續查詢模型[J].重慶工學院學報(自然科學),2008.22(10)
[8] Jiawei Han,Micheline Kamber.Data Mining Concepts and Techniques[M].China Machine Press.
[9] 張冬冬,李建中,王偉平,等.分布式復式數據流的處理[J].計算機研究與發展,2004.41(10):1780~1785
[10] Li Jian -zhong,Guo Long-jiang,Zhang Dong-dong,et a1.Processing algorithms or predictive aggregate queries over data streams[J].Journal of Software,2005.16(7):1251~1261