馬 可,李玲娟,孫杜靖
(南京郵電大學 計算機學院,江蘇 南京 210003)
分布式并行化數據流頻繁模式挖掘算法
馬 可,李玲娟,孫杜靖
(南京郵電大學 計算機學院,江蘇 南京 210003)
為了提高數據流頻繁模式挖掘的效率,文中基于經典的數據流頻繁模式挖掘算法FP-Stream和分布式并行計算原理,設計了一種分布式并行化數據流頻繁模式挖掘算法—DPFP-Stream (Distributed Parallel Algorithm of Mining Frequent Pattern on Data Stream)。該算法將建立頻繁模式樹的任務分為local和global兩部分,并設置了參數“當前時間”;將到達的流數據平均分配到多個不同的local節點,各local節點使用FP-Growth算法產生該單位時間內本節點的候選頻繁項集,并按照單位時間將候選頻繁項集及其支持度計數打包發送至global節點;global節點按“當前時間”合并各local節點的中間結果并更新模式樹Pattern-Tree。在分布式數據流計算平臺Storm上進行的算法實現和性能測試結果表明,DPFP-Stream算法的計算效率能夠隨著local節點或local bolt線程的增加而提高,適用于高效挖掘數據流中的頻繁模式。
數據流;頻繁模式;分布式并行化;Storm
數據流是按時間順序到達的數據所組成的一個序列,其中的數據是動態的,數據量潛在無界、數據到達速率快。對此類數據的收集過程和挖掘過程是同時進行的,不允許反復掃描歷史數據,需要用一次掃描算法(single-scan algorithm)來處理[1]。
流數據的挖掘有分類、聚類、關聯分析等多種任務[2-4]。在流數據的關聯規則挖掘算法中,經典的FP-Stream算法實現了對流數據的頻繁模式挖掘。該算法將挖掘任務分為在線挖掘單位時間的候選頻繁項集與離線處理歷史頻繁項集兩個部分,通過傾斜時間框架存儲候選頻繁項集,并可以按照用戶輸入的參數查詢相應時間的頻繁項集[5-7]。
MapReduce是一種分布式計算框架,將一個算法抽象成Map和Reduce兩個階段進行處理,非常適合數據密集型計算,但是它是批處理的。Storm是一種典型的在線流式數據分布式計算架構,可以用來在線處理源源不斷流進來的數據,也可以通過設置滑動時間窗口等機制,在實時處理到達數據的同時,實現類似MapReduce的功能[8]。
文中首先基于經典數據流頻繁模式挖掘算法FP-Stream[9]和分布式并行計算的思想,設計了一種分布式并行化數據流頻繁模式挖掘算法(Distributed Parallel algorithm of mining Frequent Pattern on data Stream,DPFP-Stream)。接著,考慮到將流挖掘算法部署到流平臺上運行是算法實用化的前提,進一步基于Storm集群進行了DPFP-Stream算法的實現。為了評價該算法的性能,設計了線程處理壓力測試實驗,并分析了實驗效果。
FP-Tree(頻繁模式樹)是FP-Growth算法建立的一種數據結構,雖然它不能直接用于數據流的關聯規則挖掘,但通過對FP-Tree加以改進,可以將其運用在數據流上[10-11]。基于此思想,Giannella.C等提出了FP-Stream模型,將頻繁模式挖掘算法分為挖掘單位時間頻繁項集與記錄各時間段頻繁項集兩個部分[12]。第一部分設置了參數最大支持度誤差(該值小于挖掘頻繁項集的最小支持度),使用FP-Growth算法對單位時間內的數據進行挖掘,挖掘出支持度大于支持度誤差的項集即候選頻繁項集供第二部分處理;第二部分以FP-Tree為基礎,引入傾斜時間窗口[13]建立Pattern-Tree,用來記錄不同時間粒度的頻繁項集中間結果;算法對外設置了接口供用戶輸入參數,用戶可以自由地設置最小支持度、置信度與查詢時間,根據不同時間段參數方便地查詢頻繁項集及關聯規則。
圖1給出了FP-Stream算法在一個單位時間內的處理流程。

圖1 FP-Stream算法單位時間內的處理流程
2.1 基本思想
FP-Stream算法無法直接運用于分布式環境,因為當到達的數據流速過快,算法第一部分(用FP-Growth算法挖掘單位時間內的候選頻繁項集)無法快速產生結果,為了提高挖掘速度必須提高最大支持度誤差值,但這會影響挖掘精度。
針對這個問題,文中設計了DPFP-Stream算法。其基本思想是:將挖掘任務分為local和global兩大部分,相應地,設置多個local節點和一個global節點,local節點為局部計算節點,global節點為全局合并節點。到達的流數據平均分配到不同的local節點,各local節點使用FP-Growth算法產生該單位時間內本節點的候選頻繁項集,按照單位時間將候選頻繁項集及其支持度計數打包發送至global節點;global節點合并各local節點的中間結果并發送至Pattern-Tree。此外,設置參數“當前時間”來保證被合并數據在時間上的對應性。
2.2 候選頻繁項集的分布式并行化挖掘
候選頻繁項集的挖掘以分布式并行化方式進行,到達的數據平均分配到各個local節點,每個節點設置一個緩存,接收一個單位時間的流數據,當接收時間到達一個單位時間,對這一塊數據建立FP-Tree,根據算法設定的最大支持度誤差閾值,找到該單位時間內的候選頻繁項集(支持度大于最大支持度誤差的項集),并將其按照時間打包發送至global模塊進行處理。圖2為基于單位時間內的數據生成FP-Tree的流程。

圖2 建立單位時間內的FP-Tree的流程
建立了單位時間內的FP-Tree之后,使用經典的FP-Growth算法[14]挖掘該棵FP-Tree中支持度大于最大支持度誤差的頻繁項集,即候選頻繁項集,具體過程可以描述如下:
輸入:待挖掘的FP-Tree;
輸出:所有的頻繁項集。
步驟:
遞歸地挖掘每個條件FP-Tree,累加后綴頻繁項集,直到找到FP-Tree為空或者FP-Tree只有一條路徑,首先調用FP-Growth(Tree,null)。
過程FP-Growth (Tree,x)可以描述如下:
procedureFP-Growth(Tree,x)
ifTree含單個路徑P
then{
for路徑P中節點的每個組合(記作b)
產生模式b∪a,其支持度support為b中節點的最小支持度;
}
else{
foreachai在Tree的頭部(按照支持度計數由低到高順序進行掃描)
{
產生一個模式b=ai∪a,其支持度support=ai.support;構造b的條件模式基(即順著headertable中item的鏈表,找出所有包含該item的前綴路徑,這些前綴路徑就是條件模式基),然后構造b的條件FP-Tree,即Treeb;
ifTreeb不為空
then調用FP_Growth(Treeb,b);
}
}
local節點在生成一個單位時間內的候選頻繁項集之后,將該單位時間內的所有頻繁項集與記錄總數、當前時間一起打包發送至global節點進行合并。
2.3 分布式并行化挖掘結果的合并
由于處理能力會有所不同,各local節點處理生成中間結果并發送至global節點的速度可能不一致,這使得global節點會錯誤地將不同時間段的中間結果合并至相同時間段。為了防止此類情況的發生,DPFP-Stream算法對global節點與各local節點設置了參數“當前時間”,global節點依據各local節點發送的“當前時間”對中間結果進行合并。
global節點設置了閾值threshold,其作用是控制Pattern-Tree的合并。global節點合并一個local節點的中間結果至Pattern-Tree的過程可以描述如下:
輸入:全局Pattern-Tree,單個local節點的中間結果MR,合并閾值threshold;
輸出:合并后的全局Pattern-Tree。
步驟:
比較MR與Pattern-Tree的當前時間;
if MR.time==PatterTree.time
then{
for each frequent item in MR
{
在Pattern-Tree中找到相應節點,將支持度計數加入該節點的第一塊時間窗口(若Pattern-Tree中無相應節點,則新建節點插入相應信息);
}
將該單位時間內記錄總數加入Root節點的第一塊時間窗口;
}
else if MR.time==PatternTree.time+1
then{
for each frequent item in MR
{
在Pattern-Tree中找到相應節點,將節點內窗口的數據向后滑動,并將支持度計數加入該節點的第一塊時間窗口(若Pattern-Tree中無相應節點,則新建節點插入相應信息),Root節點內窗口的數據向后滑動,并將記錄總數加入Root節點的第一塊時間窗口;
}
Pattern-Tree.time+1;
}
else if MR.time then{ 依據PatternTree.time-MR.time,找到該事件對應到傾斜時間窗口的具體位置,對MR中的所有頻繁項集,在Pattern-Tree中進行更新; } else{ 將Pattern-Tree當前時間發送至local節點,更新local節點當前時間,使之與global節點一致; } 3.1 Storm系統 Storm[15]是Twitter支持開發的一款分布式的、開源的、實時的、主從式大數據流式計算系統,是一種典型的流式數據計算架構,數據在任務拓撲中被計算,并輸出有價值的信息。 任務拓撲是Storm的邏輯單元,一個實時應用的計算任務將被打包為任務拓撲后發布,任務拓撲一旦提交后就會一直運行,除非顯式地去中止。一個任務拓撲是由一系列Spout和Bolt構成的有向無環圖,通過數據流實現Spout和Bolt之間的關聯。如圖3所示,Spout負責從外部數據源不間斷地讀取數據,并以元組形式發送給相應的Bolt,Bolt負責對接收到的數據流進行計算,可以級聯,也可以向外發送數據流。 圖3 Storm拓撲示例 3.2 DPFP-Stream算法在Storm上的部署 基于Storm流計算框架的編程模型,文中設計了DPFP-Stream算法在Storm上的部署方案。 圖4為DPFP-Stream算法在Storm上的拓撲示意圖。 圖4 DPFP-Stream的Storm拓撲圖 如圖4所示,Kakfa[16]作為消息中間件,接收用戶發送的配置參數與查詢參數,發送至InputSpout供后續計算;DataSpout接收待挖掘數據,將到達的數據打上時間戳標記并平均發送至各Local Bolt線程進行計算;InputSpout接收用戶輸入的配置參數與查詢參數,將所有參數(支持度、支持度誤差、置信度、查詢時間)發送至Global Bolt供挖掘計算與查詢,同時將參數最大支持度誤差發送至Local Bolt供生成候選頻繁項集;Local Bolt為算法在Storm上實現的并行化部分,按時間戳對單位時間內到達的數據使用FP-Growth算法挖掘候選頻繁項集;Global Bolt為算法在Storm上實現的合并部分,對各Local Bolt生成的中間結果進行合并,生成最新的Pattern-Tree;用戶可向系統輸入查詢參數查詢最新Pattern-Tree。 為了測試DPFP-Stream算法的分布式并行化效果,設計了如下實驗。 (1)實驗數據集與環境。 實驗數據集是預處理過的超市購物數據集,實驗中分別使用經典的FP-Stream算法與文中設計的DPFP-Stream算法對該數據集進行頻繁模式挖掘。 實驗環境:1個Nimbus節點、2個Supervisor的Storm集群,每臺機器內存8 GB,處理器為主頻2.70 GHz的i7處理器,操作系統為CentOS 6.4。使用Kafka作為消息中間件,設置一個producer每秒選取數據集中有特定關聯規則的數據,打上相應時間戳,按照每秒10 000條的速率發送至Kafka,算法的Storm拓撲從Kafka中獲取數據進行相應挖掘計算。支持度設為0.5,置信度設為0.8,經典FP-Stream算法的支持度誤差設為0.3,DPFP-Stream算法的支持度誤差設為0.1,線程數設為3。 (2)實驗結果與分析。 在FP-Stream算法與DPFP-Stream算法的對比方面:各算法分別處理完100萬條數據后,兩種算法的挖掘結果一致,而Storm系統的StormUI中顯示FP-Stream相應線程的capacity(線程處理壓力)為0.143,DPFP-Stream相應線程的平均capacity為0.113。這說明,盡管DPFP-Stream的參數支持度誤差設置的比較小,但是在得出一致的挖掘結果的同時,單個線程所承受的計算壓力反而減小了。 在DPFP-Stream算法的線程處理壓力隨算法參數設置和線程數的變化方面:實驗結果如圖5所示,在每秒到達拓撲的數據流速率為10 000條不變的情況下,無論支持度誤差為0.1或是0.3,DPFP-Stream算法的capacity都隨著Local Bolt線程個數的增加呈倒數減小,說明算法的處理能力可隨線程數的增加呈近線性增加;而當支持度誤差為0.1時,雖然線程處理壓力比支持度誤差為0.3時大,但是可以挖掘出更多的頻繁項集。 圖5 DPFP-Stream算法多線程測試結果 從圖中可以看出,DPFP-Stream算法能有效地降低計算處理壓力,并且不影響挖掘結果。此外,在線程處理壓力不變的情況下,由于DPFP-Stream算法可以設置更低的支持度誤差,故能在一定的情況下挖掘出FP-Stream算法挖掘不到的結果。 基于FP-Stream算法和分布式并行計算思想,文中設計了一種分布式并行化數據流頻繁模式挖掘算法(DPFP-Stream),并在流計算平臺Storm上進行了算法實現與性能測試。結果表明,該算法借助分布式并行化機制,能以更小的線程處理壓力獲得同樣的挖掘精度,也說明了文中對FP-Stream算法所做的基于Storm的分布式并行化工作的可行性和有效性。 [1] Li Lingjuan,Li Xiong.An improved online stream data clustering algorithm[C]//Proceedings of second international conference on business computing and global informatization.Shanghai,China:[s.n.],2012:526-529. [2] Gaber M,Zaslavsky A,Krishnaswamy S.Mining data streams:a review[J].SIGMOD Record,2005,34(2):18-26. [3] Han J,Kamber M,Pei J.Data mining:concepts and techniques[M].[s.l.]:Elsevier,2006:242-248. [4] 孫大為,張廣艷,鄭緯民.大數據流式計算:關鍵技術及系統實例[J].軟件學報,2014,25(4):839-862. [5] 孫玉芬,盧炎生.流數據挖掘綜述[J].計算機科學,2007,34(1):1-5. [6] Charikar M,Chen K,Farach-Colton M.Finding frequent items in data streams[C]//Proceedings of automata,languages and programming.Berlin:Springer,2002:693-703. [7] 李國徽,陳 輝.挖掘數據流任意滑動時間窗口內頻繁模式[J].軟件學報,2008,19(10):2585-2596. [8] Ma Ke,Li Lingjuan,Ji Yimu,et al.Research on parallelized stream data micro clustering algorithm[C]//Proceedings of ICCAET 2015.Zhengzhou,China:[s.n.],2015:629-634. [9] Giannella C,Han J,Pei J,et al.Mining frequent patterns in data streams at multiple time granularities[J].Next Generation Data Mining,2003,212:191-212. [10] 唐耀紅.數據流環境中關聯規則挖掘技術的研究[D].北京:北京交通大學,2012. [11] 劉學軍,徐宏炳,董逸生,等.挖掘數據流中的頻繁模式[J].計算機研究與發展,2015,42(12):2192-2198. [12] 程轉流,王本年.數據流中的頻繁模式挖掘[J].計算機技術與發展,2007,17(12):53-55. [13] Jin R,Agrawal G.An algorithm for in-core frequent itemset mining on streaming data[C]//Proceedings of fifth IEEE international conference on data mining.[s.l.]:IEEE,2005:210-217. [14] Han J,Pei J,Yin Y,et al.Mining frequent patterns without candidate generation:a frequent-pattern tree approach[J].Data Mining and Knowledge Discovery,2004,8(1):53-87. [15] Marz N.Storm:distributed and fault-tolerant realtime computation[EB/OL].2012.http://storm.apache.org. [16] Apache.Apache Kafka:a high-throughput,distributed,publish-subscribe messaging system[EB/OL].2015.http://kafka.Apache.org. Distributed Parallel Algorithm of Mining Frequent Pattern on Data Stream MA Ke,LI Ling-juan,SUN Du-jing (School of Computer,Nanjing University of Posts and Telecommunications,Nanjing 210003,China) In order to improve the efficiency of mining frequent pattern on data stream,a Distributed Parallel Algorithm of Mining Frequent Pattern on Data Stream,named DPFP-Stream,is designed in this paper based on the ideas of classical FP-Stream and the distributed parallel computing.It divides the task of building frequent pattern tree into two parts:local and global,and introduces a new parameter “current time”.The arrival data will be equally distributed into different local nodes.Then every local node uses FP-Growth algorithm to produce candidate frequent items,and packages them with relevant support count according to unit time,and sends them to the global node.The global node combines the results produced by local nodes according to the “current time” and updates the global Pattern-Tree.The results of implementing DPFP-Stream algorithm and testing its performance on Storm,a distribution data stream computing platform,show that the computing efficiency of DPFP-Stream can increase linearly with the increasing of local nodes or the local bolts,and DPFP-Stream is applicable to effectively mine frequent pattern from data stream. data stream;frequent pattern;distributed parallelization;Storm 2015-10-10 2016-01-20 時間:2016-06-22 國家自然科學基金資助項目(61302158,61571238);中興通訊產學研項目 馬 可(1991-),男,碩士研究生,CCF會員,研究方向為流數據挖掘、信息安全;李玲娟,教授,CCF會員,通訊作者,研究方向為數據挖掘、信息安全、分布式計算。 http://www.cnki.net/kcms/detail/61.1450.TP.20160621.1701.014.html TP311 A 1673-629X(2016)07-0075-05 10.3969/j.issn.1673-629X.2016.07.163 DPFP-Stream算法在Storm平臺上的實現


4 實驗與結果分析

5 結束語