陳羽中,郭松榮,郭 昆,李國輝,林魏超
1(福州大學 數學與計算機科學學院,福州 350116) 2(福建省網絡計算與智能信息處理重點實驗室,福州 350116) 3(海西政務大數據應用協同創新中心,福州 350003) 4(空間數據挖掘與信息共享教育部重點實驗室,福州 350002)
近年來,隨著云計算、物聯網的快速發展,以智能傳輸系統、用電實時采集系統、網絡流量監測等為代表的實時數據流應用日益增多.海量數據的實時產生,使面向海量數據流的數據分析與挖掘成為數據挖掘研究的重要內容.隨著電網技術的發展,用電信息采集系統智能化電表的出現,用戶用電信息采集的頻率更加頻繁,智能終端接入量不斷增加.面對如此巨大的用戶量和數據,如何快速有效地進行分析挖掘已經成為建設智能電網的一項重要研究課題[1].
數據流是一種由數據項構成的序列,其具有以下特征:①動態性,數據項隨著時間的變化而不斷變化;②時效性,數據項的信息表達的是當前時間的狀態;③瞬時性,數據流的無限性和連續性使得當前的處理機沒辦法處理完整的數據流信息,無限的數據流沒法全部被物理機保存起來;④無限性,數據項隨著時間的推移不斷到達,數據量的增長沒有限制[2].傳統的聚類算法已經無法適應這種新型的海量的數據流環境,海量的數據流對聚類算法提出了新的挑戰[3,4]:數據只能訪問一次;數據具有時效性,需要能夠在有限的時間內對數據進行聚類;數據可能會隨著時間的推移而發生變化,需要考慮數據概念漂移的問題;要求具有處理離群點的能力等.因此對于數據流聚類算法的研究已經在學術界和工業界得到了廣泛的關注.
目前,數據流聚類算法研究已經取得不少成果, Aggarwal等提出了一個優秀的解決數據流聚類的CluStream算法[5],該算法首次提出了數據流處理的兩個階段,分別是在線微簇聚類和離線的宏簇聚類過程.朱蔚恒等提出了一種基于密度與空間的ACluStream算法,能夠進行任意形狀的聚類[6],但其在處理不屬于已有聚類塊的新數據點誤差較大.張建朋等利用改進的WAP將新檢測到的類的模式合并到聚類模型中,提出一種具有時態特征與近鄰傳播思想的數據流聚類算法TCAPStream[7],但難以適應海量大數據.邢長征等基于近鄰傳播思想提出了一種基于近鄰傳播與密度相融合的進化數據流聚類算法[8].陳晉音等提出了一種面向混合屬性數據流的基于密度的聚類算法研究方法[9].孫力娟等提出了一種基于權值衰減的數據流模糊微簇聚類算法[10].Zhang等提出了一種采用模糊聚類算法的數據流聚類算法,其聚類的結果克服硬聚類的缺點,能反應對象與類之間的實際關系[11].
本文提出一種新的數據流聚類算法DACluStream(Dynamic Adjustment CluStream),改進了CluStream聚類框架的在線微簇刪除、合并機制,同時對于CluStream不能夠增加在線微簇的個數的限制進行修改,可以根據在線微簇的情況進行動態的增加微簇個數.此外,還借助并行實時計算框架Spark Streaming實現DACluStream算法,使其可以適應于海量大數據的實時處理.本文后續章節安排如下:第2節介紹了CluStream算法基本思想、CluStream存在的缺陷以及主流的并行實時計算框架;第3節介紹了本文所提的算法的具體內容;第4節介紹了本文所做的相關實驗與結果;最后部分對本文所做的內容進行總結并提出未來的研究方向.
CluStream算法有兩個主要的處理階段程,分別為在線的微簇聚類和離線宏簇聚類兩個部分.其算法的主要包含以下幾個內容:
1)初始聚類:當數據量達到一定規模的時候進行初始的在線微簇聚類;
2)在線微簇更新:對于新到達的數據,判斷其是否屬于已有的微簇.若屬于則更新原有的微簇模型,否則作為一個新的微簇點替換掉原有比較早的微簇或合并兩個比較近的微簇,進而更新在線的微簇信息,并利用金字塔時間的存儲結構存儲在線微簇信息.
3)離線宏簇聚類:離線部分主要是使用一個宏簇的聚類過程,對在線聚類得到的微簇進行再次的聚類得到宏簇的聚類,同時根據用戶的要求進行聚類結果的分析.
CluStream提供一個解決數據聚類的框架,但是也存在一些問題.
在線微簇的聚類的結果的好壞會影響到用戶離線部分的宏簇聚類.CluStream在線聚類部分設定的微簇的個數q是固定不變的,當有新的微簇形成的時候,如果沒有可以刪除的微簇,會合并兩個最近的微簇,但隨著時間的推移,這樣會把原來兩個最近的不相關的微簇合并在一次,造成較大的誤差.如圖1所示,微簇1到微簇5的微簇中類別區分度比較明顯,如果為了合并而將兩個比較近的簇2和4合并的話,將會使合并之后的微簇變得稀疏.
面對海量的數據,單臺處理機已經難以適應,需要將數據分發到更多的處理機進行分布式并行計算.主流的并行數據處理框架有Hadoop[12]和Spark[13]等.而主流的并行實時計算框架有Storm、Spark Streaming等.
Storm通過設計一個用于實時計算的拓撲(topology),整個拓撲由stream,spout和bolt組成[14].Spark Streaming是Spark核心API的一個擴展,其接收實時流的數據,并將輸入數據流以時間片(秒級)為單位進行拆分,然后通過Spark Engine處理,每一批的數據都轉成Spark中的RDD(Resilient Distributed Datasets,RDD)進行處理[15].
針對2.2節所述CluStream算法存在的問題,可以在以下幾個方面進行改進:
1)根據在線微簇情況設計一種動態添加微簇個數的方法;
2)為了保持微簇的數量對CluStream的微簇刪除機制進行調整,定義時態特征,使用衰減系數對微簇里的數量進行隨著時間的推移而衰減.
3)將算法應用到并行實時計算框架中,使其能夠處理海量大規模實時數據.
為了更好的描述數據點對在線微簇的貢獻程度,提出微簇時態密度特征的概念,基于微簇的時態密度特征提出了新的在線微簇刪除、合并和增加機制.
數據點對于微簇的聚類的貢獻程度隨著時間的變化而變化,越早的數據對微簇的貢獻程度越低,相反越近的數據對微簇的貢獻程度越大[16].
微簇時態密度特征描述的是數據點對微簇的貢獻程度,新的數據點到達時,時態權重為1,之后隨著時間的推移而進行指數的衰減.若一個微簇一直有新的數據到達,則其時態密度(權重之和)會越來越大,反之其時態密度會逐漸減少.因此時態密度特征能夠直觀的反映一個微簇隨著時間的推移對整體的微簇產生的重要性,如果其一直未能有新的數據到達以更新該微簇信息則可以將該微簇安全的刪除掉.

W(pj,tk+1)=2-λ(tk+1-tpj)=2-λ(tk+1-tk+tk-tpj)
2-λ(tk+1-tk)2-λ(tk-tpj)=2-λ(tk+1-tk)W(pj,tk)
(1)
又假設t1時刻新到達的數據量為Δn則有:
(2)
由公式(2)可知,對于任意的微簇i其時態密度由兩部分組成,一部分為之前存在的原有的數據的衰減后得到的密度,另一部分為剛到達的該微簇i的新的數據的密度.同時這種增量的計算方式,能夠有效的避免每次重新計算所有點的密度,進而有效提高了計算的速度.
3.2.1 微簇刪除
根據定義1可知,微簇時態密度特征,可以表示數據點對微簇的貢獻情況.當一個微簇一直未能有新的數據點到達,可見其是一個比較早的微簇,可以將該微簇安全的刪除,以防止無限的增加微簇的數量.
定義2.設tk時刻新到達的數據量為Δn,在線微簇的數量為q,刪除的閾值為δ,微簇i的時態密度之和為D(i,tk),則D(i,tk)<(Δn*δ/q)時,則可以安全的將微簇刪除.
3.2.2 微簇的合并
定義3.設當前在線微簇中,微簇i和微簇u的簇中心的距離最近為dmini,u,微簇i的最大覆蓋范圍為dic,微簇的u的最大覆蓋范圍為duc,則當(dic+duc)>dmini,u時可以將微簇i和微簇u進行合并為一個新的微簇e,否則增加一個新的微簇.
3.2.3 微簇的增加
定義3表明兩個相對較近的微簇的距離在可控的覆蓋范圍內是較近的,將兩者進行合并并不會使其覆蓋范圍擴大得比較大,可以解決CluStream中存在的問題.同時為限制在線微簇個數q的無限增長設定一個閾值θ,當在線微簇的數量達到θq時停止微簇的增長.
基于前面的討論,給出改進的DACluStream算法的主要步驟如下:
3.3.1 初始化微簇聚類(initKmeans)
當數據量達到minInitPoints初始聚類的最小數據量的時候開始進行在線微簇的初始化.
3.3.2 在線微簇更新(updateMicroClusters)
根據已有的在線微簇模型,對新到達的數據流信息進行在線微簇的更新,根據3.2的微簇刪除、合并與增加機制對微簇信息進行更新.
3.3.3 離線宏簇聚類(offlineMacroClusters)
在線得到的numQ個微簇,根據用戶的需求進行k個宏簇的聚類,并對結果進行分析.
函數1.initKmeans
輸入:currentN當前ti時刻到達的數據量,q初始的微簇的數量,minInitPoints初始聚類最小的數據量,minInitPoints為初始聚類最小的數據量
輸出:初始聚類得到的模型
1.IF(sumOfN 2.Wait for data;//等待新的數據的到達 3.sumOfN=sumOfN+currentN;//累加新到達的數據 4.ELSE 5.使用sumOfN所有的數據進行Kmeans模型iter的迭代構建; 6.END IF 函數2.updateMicroClusters 輸入:聚類模型,currentRdd當前到達的數據 輸出:新的聚類模型 1.計算Rmsd作為微簇的覆蓋范圍; 2.初始化離群點集合OutList; 3.初始化可以歸并的集合AddList; 4.sumOfN+=currentRdd.count; 5.currentRdd.Map( 6.FOR(微簇i <-所有微簇)DO 7.計算數據點Pj到微簇i的簇中心的距離; 8.找到距離最近的微簇d,最近的距離為Distj,d; 9.END FOR 10.IF(Distj,d>=Rmsdd) THEN 11.OutList.add(Pj);//數據點Pj不能歸入已有的微簇中; 12.ELSE 13.AddList.add((d,Pj));//數據點Pj可以歸入已有的微簇d中; 14.END IF) 15.遍歷AddList更新已有的微簇的簇中心; 16.初始化可以安全刪除的微簇DeleteList; 17.初始化待合并的微簇MergeList; 18.IF(OutList不為空) THEN 19.FOR(微簇i<- 所有微簇) DO 20. 根據定義2計算微簇i是否可以刪除; 21. IF(微簇i可以刪除) THEN 22.DeleteList.add(i); 23. ELSEMergeList.add(i); 24. END IF 25.END FOR 26.初始化新的簇的集合newMCList;// 用于存儲新的不可歸并的數據 27.OutList.map(//將離群點q和新的微簇newMC進行比較,看能否歸入到新的微簇中 28.IF(新的微簇為null)THEN 29. IF(DeleteList!= null)THEN 30.將離群點p作為一個新的微簇,并計算其Rmsd; 31.DeleteList.remove(0); 32. ELSE//合并最近的兩個微簇 33.將離群點p作為一個新的微簇,并計算其Rmsd; 34.根據定義3判斷是否兩個微簇可以合并或者增加新的微簇; 35.END IF 36.ELSE 37.FOR(微簇i<- 所有新建的微簇) DO 38. 計算數據離群點p到微簇i的簇中心的距離; 39. 找到距離最近的微簇d,最近的距離為Distp,d;} 40. IF(Distp,d<=Rmsdd) THEN 41. 更新微簇d的微簇信息; 42. ELSE 43. IF(新的微簇為null)THEN 44. IF(DeleteList!= null)THEN 45.將離群點p作為一個新的微簇,并計算其Rmsd; 46.DeleteList.remove(0); 47.ELSE{//合并最近的兩個微簇 48. 將離群點p作為一個新的微簇,并計算其Rmsd; 49.根據定義3判斷是否兩個微簇可以合并或者增加新的微簇; END IF 50. END IF 51. END IF 52. END FOR 53. END IF ) 54.END IF 函數3.offlineMacroClusters 輸入:在線得到的numQ個微簇,numPoints用于聚類的數據,k聚類類別的個數 輸出:離線聚類得到的模型 1.FOR(微簇i<-numQ個微簇)DO 2.Weighti=Ni/sumOfN;//微簇i的權重,Ni表示微簇i里的數據量的個數 3.END FOR 4.從numQ個微簇中根據其權重有放回的抽取numPoints的數據點作為points,權重越大選中的概率越大; 5.從numQ個微簇中根據其權重無放回的抽取k個數據作為初始種子點seeds,權重越大選中的概率越大; 6.使用points和seeds構建Kmeans的offIters迭代聚類; 7.輸出聚類的模型. 函數initKmeans的時間復雜度主要是在微簇的在線Kmeans聚類上,聚類的個數q、數據量的大小minInitPoints和迭代次數iter,其時間復雜度為O(q×minInitPoints×iter). 函數updateMicroClusters因為每個時間片到達的數據量n遠大于已有的微簇的個數numQ,所以易知其時間復雜度O(numQ×n). 函數offlineMacroClusters3的時間復雜度在于宏簇的Kmeans聚類上,聚類個數k,數據量大小numPoints,迭代次數offIters,易知其時間復雜度為O(numPoints×k×offIters). 根據前面所述DACluStream算法主要分為在線微簇更新和離線宏簇聚類兩個部分,兩個部分可以獨立分開的.在在線微簇更新部分算法時間復雜度為O(numQ×n),而在離線宏簇部分算法復雜度為O(numPoints×k×offIters). 實驗數據主要分為兩個部分分別為人工數據集和真實數據集.其中人工數據集為51673條記錄規模,具有5個維度的30個類別的服從高斯分布的數據集且生成的數據大小介于[0,1]之間.真實數據集來自于某電力公司提供的某省的部分電力負荷數據達到528628條記錄規模,其中包含用戶每天24個小時的電力負荷值.實驗的硬件環境為9臺Ubuntu系統的虛擬機組成的集群,每臺虛擬機的配置為:2個核心,16G內存和100G的硬盤大小.軟件環境為Ubuntu 版本號,JDK 7.0,Spark 版本號為1.5.2.實驗發現minInitPoints取不同值對聚類精度影響很小,因此在實驗中統一取minInitPoints=3000,實驗的數據流速度設置為3000條/時間片,衰減速度 ,為了保障在線微簇數據的完整性設置刪除閾值 ,同時因在線微簇的初始聚類數量(人工數據集q=150,真實數據集q=100)比較大,為限制其變化太多影響聚類效果,設置自動增長的. 對于聚類質量的評估主要使用的是SSQ(sum of square distance),數據點Pj到聚類i的類中心Ci的距離記作 ,計算當前時間批次內到達的所有數據點到其最近的簇中心的距離 的平方和為該時間片的SSQ如公式(3)所示. (3) 其中n為聚類的個數,ni為屬于聚類i的數據點個數,SSQ是一種常用的評價K-劃分聚類質量的方法,該值越小說明聚類的質量越好[6]. 為了防止實驗誤差,本文通過多次實驗取平均值作為實驗結果.實驗中都是取20次的實驗結果的平均值作為實驗結果,人工數據集平均SSQ如表1、圖2所示,電力數據集實驗結果如下頁表2、圖3所示. 表1 人工數據集平均SSQTable 1 Average SSQ of artificial data sets 4.3.1 人工數據集上的實驗結果 從圖2可以發現,在人工數據集上,本文所提的DACluS-tream算法聚類得到的質量相對于CluStream算法聚類得到的質量有顯著性的提高,DACluStream的SSQ相對于CluStream的SSQ降低了4倍左右,說明DACluStream聚類得到的結果的各個簇內數據比較緊密,聚類質量高.這是因為算法在在線微簇更新部分,對微簇的刪除、合并做了修改,且算法可以動態添加微簇的個數,使各個微簇之間的數據更加緊密,聚類效果更好.且DACluStream算法聚類得到的SSQ的波動性比較小,說明DACluStream算法具有較好的穩定性,當在線微簇數達到一定程度時,所有新到達的數據能夠歸入到已有的微簇,使其微簇內整體微簇個數達到穩定的時候,穩定性變高. 圖2 人工數據集平均SSQFig.2 Average SSQ of artificial data sets 4.3.2 真實數據集上的實驗結果 從圖3中可以發現,在真實用電數據集上,本文所提的DACluStream算法聚類得到的質量比CluStream算法聚類得到的質量高,隨著時間片的推移DACluStream聚類得到的結果都比CluStream聚類得到結果質量高.DACluStream算法能夠根據在線微簇的情況動態添加微簇的個數,提出了新的微簇的刪除、合并機制,使各個微簇里的數據更加緊密,提高聚類質量. 表2 真實數據集平均SSQTable 2 Average SSQ of real data sets 圖3 真實數據集平均SSQFig.3 Average SSQ of real data sets 通過實驗可以發現,本文所提的DACluStream聚類算法,能夠在在線微簇更新過程中,得到一個更好的微簇,使其在離線宏簇聚類的時候得到一個更好的聚類結果. 面對數據流實時到達的特性,如何有效的對數據進行聚類分析具有重要的意義.CluStream是一個優秀的處理實時聚類框架,但其存在一定的缺陷.本文提出一種改進CluStream的DACluStream聚類算法,使用微簇時態密度之和對微簇的有效性進行描述,算法根據在線微簇的聚類情況進行動態的微簇添加操作,通過實驗分析對比,可以發現本文所提的DACluStream能夠得到更好的聚類質量.同時通過應用Spark Streaming實現算法的并行化擴展,能夠適應于如今海量大數據挖掘的需求.接下來,將進一步在使算法能夠處理更多類型的數據流,如非數值型的數據流,同時引入更多的優化策略,進一步提升算法的精度等方面展開研究. [1] Jing Jie-feng,Li Shan-shan.Research and development of real-time data mining system in power network[J].Hebei Electric Power,2007,(S1):33-35. [2] Xu Fei.The research of real-time processing based on big data stream[D].Wuxi:Jiangnan University,2015 . [3] Hassani M,Spaus P,Gaber M M,et al.Density-based projected clustering of data streams[C].International Conference on Scalable Uncertainty Management,Springer Berlin Heidelberg,2012:311-324. [4] Brief A,Holmes G,Pfahringer B.MOA:massive online analysis,a framework for stream classification and clustering[C].JMLR:Workshop and Conference Proceedings,2010,11:44-50. [5] Aggarwal C C,Han J,Wang J,et al.A framework for clustering evolving data streams[C].Proceedings of the 29th International Conference on Very Large Data Bases-Volume 29,VLDB Endowment,2003:81-92. [6] Zhu Wei-heng,Yin Jian,Xie Yi-huang.Arbitrary shape cluster algorithm for clustering data stream[J].Journal of Software,2006,17(3):379-386. [7] Zhang Jian-peng,Chen Fu-cai,Li Shao-mei,et al.Online clustering of evolution data stream based on affinity propagation clustering[J].Pattern Recognition & Artificial Intelligence,2014,27(5):443-451. [8] Xin Chang-zheng,Liu Jian.Evolutionary data stream clustering algorithm based on integration of affinity propagation and density[J].Journal of Computer Applications,2015,35(7):1927-1932. [9] Chen Jin-yin,He Hui-hao,Yang Dong-yong.Density-based heterogeneous data stream clustering algorithm[J].Journal of Chinese Computer Systems,2016,37(1):43-47. [10] Sun Li-juan,Chen Xiao-dong,Han Chong,et al.New fuzzy-clustering algorithm for data stream[J].Journal of Electronics & Information Technology,2015,37(7):1620-1625. [11] Zhang B,Qin S,Wang W,et al.Data stream clustering based on fuzzy c-mean algorithm and entropy theory[J].Signal Processing,2016,126(2):111-116. [12] Xie Gui-lan,Luo Sheng-xian.Study on application of MapReduce model based on Hadoop[J].Microcomputer & Its Applications,2010,29(8):4-7. [13] Karau H,Konwinski A,Wendell P,et al.Learning spark:lightning-fast big data analysis[M]." O′Reilly Media,Inc.",2015. [14] Toshniwal A,Taneja S,Shukla A,et al.Storm@ twitter[C].Proceedings of the 2014 ACM SIGMOD International Conference on Management of Data,ACM,2014:147-156. [15] Liu X,Iftikhar N,Xie X.Survey of real-time processing systems for big data[C].Proceedings of the 18th International Database Engineering & Applications Symposium,ACM,2014:356-361. [16] Yang Ning,Tang Chang-jie,Wang Yue,et al.Clustering algorithm on data stream with skew distribution based on temporal Density[J].Journal of Software,2010,21(5):1031-1041. 附中文參考文獻: [1] 景杰峰,李姍姍.電網實時數據挖掘系統的研究與開發[J].河北電力技術,2007,(S1):33-35. [2] 徐 飛.大數據流的實時處理研究[D].無錫:江南大學,2015. [6] 朱蔚恒,印 鑒,謝益煌.基于數據流的任意形狀聚類算法[J].軟件學報,2006,17(3):379-386. [7] 張建朋,陳福才,李邵梅,等.基于仿射傳播的進化數據流在線聚類算法[J].模式識別與人工智能,2014,27(5):443-451. [8] 邢長征,劉 劍.基于近鄰傳播與密度相融合的進化數據流聚類算法[J].計算機應用,2015,35(7):1927-1932. [9] 陳晉音,何輝豪,楊東勇.一種面向混合屬性數據流的基于密度的聚類算法研究[J].小型微型計算機系統,2016,37(1):43-47. [10] 孫力娟,陳小東,韓 崇,等.一種新的數據流模糊聚類方法[J].電子與信息學報,2015,37(7):1620-1625. [12] 謝桂蘭,羅省賢.基于Hadoop MapReduce模型的應用研究[J].微型機與應用,2010,29(8):4-7. [16] 楊 寧,唐常杰,王 悅,等.一種基于時態密度的傾斜分布數據流聚類算法[J].軟件學報,2010,21(5):1031-1041.3.4 復雜性分析
4 實 驗
4.1 實驗數據及環境配置
4.2 評價指標
4.3 實驗結果與分析




5 結 論