莊 榮,李玲娟
(南京郵電大學 計算機學院,江蘇 南京 210023)
面對實時到達、連續、無限的流數據[1],傳統的數據挖掘算法難以滿足流數據挖掘的需求,因而流數據的分類挖掘[2]研究一直是熱門話題并且具有重大意義。概念適應快速決策樹算法(concept-adapting very fast decision tree,CVFDT)[3]是經典的流分類算法之一。CVFDT主要克服了VFDT(very fast decision tree)算法[4]對于數據樣本的不斷變化而不能更換模型的缺點,并且可以有效地解決概念漂移[5]的問題。與傳統的靜態大數據處理平臺Hadoop[6]不同,Spark[7]擴展了廣泛使用的MapReduce[8]模型,提出了基于內存的并行計算框架,通過將中間結果緩存在內存中以減少I/O磁盤操作,從而更高效地支持多次迭代式計算模式。為此,文中研究了基于Spark的CVFDT分類算法并行化,用以提高CVFDT算法對流數據的分類效率。
CVFDT屬于一種增量式的分類挖掘方法,即用新到樣本修正舊分類器,產生新分類器,以適應新環境。CVFDT在樹的所有節點上維持統計信息用于計算基于屬性值的信息增益測試,即統計測試,并基于Hoeffding不等式確定葉節點變成分支節點所需的樣本數目,對數據流建立分類決策樹。
以t為時間戳,xt表示t時刻到達的數據向量,數據流可表示為{…,xt-1,xt,xt+1,…}[9]。CVFDT算法的有關定義如下:


在拿到新的數據流樣本后,從上到下遍歷決策樹,并在樹的每個分支節點根據屬性取值等判斷進入不同的分支,最終到達樹的葉節點。隨著數據流樣本的不斷增多,信息增益測試為了滿足一定條件,其葉節點必須要以較高的置信度確定最佳劃分屬性,從而變成一個分支節點,這樣循環可以不斷地決策學習新的葉節點。如遇到概念漂移問題,CVFDT就會在相應分支節點上并行生成備選子樹,原子樹會隨著備選子樹的精度遠遠超過其本身時被替換和釋放。
Spark不同于Hadoop MapReduce的是,Job中間輸出結果可以保存在內存中,從而不再需要頻繁讀寫HDFS[12],可以顯著提高運行速度。Spark還提供了SparkSQL、Spark Streaming[13]、MLib[14]等計算模式組件,更適用于分布式平臺場景。Spark Streaming是構建在Spark上的處理Stream數據的框架,基本原理是將Stream數據分成小的時間片斷(幾秒),以類似batch批量處理的方式來處理這小部分數據。
彈性分布式數據集RDD[15]是Spark的核心和基礎。RDD是一種分布式的內存抽象,表示只讀的分區記錄的集合。它只能通過在穩定物理存儲中的數據集或其他已有的RDD上執行一些確定性操作(并行操作中的轉換操作)來創建,并且RDD僅支持粗粒度轉換,即在大量的記錄上執行單一的操作,因此省去了大量的磁盤I/O操作,對于需要多次迭代計算的機器學習算法、交互式數據挖掘來說,效率得到了極大地提升;同時它具有非常出色的容錯機制和調度機制,能夠有效確保系統的長時間穩定運行。所以Spark能夠更高效地支持交互式查詢以及迭代式計算等多種計算模式。
在CVFDT建樹過程中計算最佳分割點時,需要將Hoeffding邊界作為節點分裂條件找到最佳分割點,其首要任務是計算并比較各個屬性的最佳基尼分割指數。在面對含多種屬性類別的數據集時,線性串行的計算模式會大大降低運行效率。
因此,針對CVFDT算法的分割點計算過程,考慮到每個屬性的基尼分割指數求解是完全獨立計算,可以對這些計算進行同步,設計了如圖1所示的屬性間并行化方案。

圖1 針對分割點計算過程的屬性間并行化方案
如圖1所示,首先計算每個任務的最佳基尼指數和Hoedding邊界,從而找到每個任務的最佳分割點;然后在每個任務計算完成后進行比較,獲取最佳分割點。這種改進的計算模式可以有效地降低時間復雜度。
1.Spark的并行計算過程簡述。
RDD為了讓海量數據分散在不同的計算節點上進行并行處理,會橫向地拆分成N個分區,因此對RDD進行計算操作時,集群會對每個分區進行計算,然后由相應的集群控制器將結果進行匯總,最終統計整個RDD結果。因此,Spark的并行屬于“橫向”并行化。
2.針對建樹過程的基于Spark的橫向并行化。
在3.1提出的屬性間并行化的基礎上,基于Spark的橫向并行化,針對CVFDT算法的建樹過程做如下并行化改造:
(1)If(都是同類屬性的數值||獲取的屬性列表個數不超過閾值)
(2)將含有同屬性最多數值的類復制給節點N的Decision類并且返回;
(3)Else
(4)得到節點N的屬性列表AttList,將所有屬性列表轉化為對應的RDD;
(5)計算由每個RDD生成的并行化任務,匯總并比較每個最佳分割點;
(6)再計算Hoeffding邊界產生節點分裂條件,找到最佳分割點;
(7)在AttList中找到該分割點相應屬性的屬性列表并刪除,然后對其余屬性表進行分裂,得到屬性表Attlist1,Attlist2,…,AttlistN;
(8)新的子節點N1,N2,…,Nn由節點N生成,并將屬性列表Attlist1,Attlist2,…,AttlistN分別賦給N1,N2,…,Nn;
(9)執行buildTree(n1),buildTree(n2),…,buildTree(n)操作,遞歸建立決策樹。
選擇傳統單機CVFDT算法和基于Spark集群的CFVDT并行化算法對實驗數據集進行分類操作,算法運行環境是:
集群硬件環境:1個Master節點,2個Slave節點。
集群操作系統:centos6.6。
集群軟件環境:JRE1.7.0_13、Scala_2.11.6、Spark1.3.1。
單機環境:eclipse_4.5.0、JRE1.7.0_13、Windows7、2.13 GHz、4 GB內存。
目的是借助流數據處理平臺提高CVFDT算法的執行效率。為了驗證所設計的CVFDT算法基于Spark的并行化方案的可行性和有效性,對比了單機和集群并行環境下,CVFDT算法處理不同數據量所需的時間。
實驗使用的數據集源自Kaggle比賽,基于美國UGC網站Stumbleupon提供的歷史數據,設計分類模型,預測該網站提供的網頁是否長期流行。訓練集樣本數目為10 706個,測試集樣本大小為5 171 M。
為了考慮算法的實用性,選取了200k、300k、500k、800k、1 000k條這五種不同規模的數據集,測試結果如表1所示。
表1 建樹時間測試結果

數據大小/條 現有算法/s并行化算法/s綜合效率提高/%200k6766.90.13300k110106.43.37500k197187.15.28800k310289.67.011 000k401367.49.14
當選取200k條規模數據集時,算法(建樹)效率提升微乎其微,并且10 000條數據規模下,建樹時間反而增加。原因是資源管理、網絡傳輸會伴隨著Spark運行集群而產生額外開銷。當數據規模達到300k以上時,或者海量數據規模時,基于Spark的CVFDT并行化有明顯的效率提升。
圖2對比了單機和Spark集群環境下在數據規模分別為200 M、300 M、500 M、800 M、1 000 M時所需的時間。

圖2 對應不同數據量的處理時間測試結果
在單機環境下,隨著數據規模的擴大,數據處理時間急劇增加;在Spark集群環境下,在200 M數據規模下,處理時間提升不明顯,除了上一節提到的原因之外,主要還存在求解每個分裂條件的基尼系數時,會依照分裂條件對RDD進行過濾,然后再調用count()函數來統計個數,每一次的count操作都是在Spark集群中完成,然后將計算結果傳輸給虛擬機。當屬性列表數據條數N不是很大時,Spark的優勢無法體現。300 M數據量之后,可以看到并行化算法的數據處理時間明顯減少,當數據量到1 000 M時,處理時間縮減了66.6%。
由表1和圖2可以看出,基于Spark集群的并行化CVFDT算法在處理規模較大的流式數據時,運行效率有所提高,并且在數據規模增大時,其效果會越發明顯。并且并行化CVFDT算法相對于單機環境在處理海量數據時處理效率有顯著提高,而且合理設定RDD過濾可使處理效率進一步提高。
將經典的流數據分類挖掘算法CVFDT部署于流數據處理平臺Spark上,借助構建在Spark之上的實時計算框架Spark Streaming來實現對流數據的并行化分類。對CVFDT算法進行了屬性間并行化改造,并且基于Spark的RDD進行了CFVDT算法在建樹流程上的橫向化并行。測試結果證明了該設計思想的正確性和方案的有效性,也說明了基于Spark的并行化CVFDT算法對大規模流數據有良好的適應能力。
參考文獻:
[1] DING Shifei,WU Fulin,QIAN Jun,et al.Research on data stream clustering algorithms[J].Artificial Intelligence Review,2015,43(4):593-600.
[2] ABURROUS M,HOSSAIN M A,DAHAL K,et al.Predicting phishing websites using classification mining techniques with experimental case studies[C]//7th international conference on information technology.Las Vegas,NV,USA:IEEE,2010:176-181.
[3] 王 濤,李舟軍,顏躍進.一種基于哈希鏈表的高效概念漂移連續屬性處理算法[J].計算機工程與科學,2008,30(8):65-68.
[4] 袁 磊,張 陽,李 梅,等.在數據流管理系統中實現快速決策樹算法[J].計算機科學與探索,2010,4(8):673-682.
[5] MINKU L L,WHITE A P,YAO Xin.The impact of diversity on online ensemble learning in the presence of concept drift[J].IEEE Transactions on Knowledge & Data Engineering,2010,22(5):730-742.
[6] DITTRICH J,QUIANéRUIZ J A,JINDAL A,et al.Hadoop++:making a yellow elephant run like a cheetah (without it even noticing)[J].Proceedings of the VLDB Endowment,2010,3(1-2):515-529.
[7] 黎文陽.大數據處理模型Apache Spark研究[J].現代計算機,2015(8):55-60.
[8] 沈 超,鄧彩鳳.論Storm分布式實時計算工具[J].中國科技縱橫,2014(3):53.
[9] RAAHEMI B,ZHONG Weicai,LIU Jing.Peer-to-peer traffic identification by mining IP layer data streams using concept-adapting very fast decision tree[C]//Proceedings of the 20th IEEE international conference on tools with artificial intelligence.Dayton,OH,USA:IEEE,2008.
[10] 張發揚,李玲娟,陳 煜.VFDT算法基于Storm平臺的實現方案[J].計算機技術與發展,2016,26(9):192-196.
[11] YIN Chunyong,FENG Lu,MA Luyu,et al.A feature selection algorithm of dynamic data-stream based on Hoeffding inequality[C]//International conference on advanced information technology and sensor application.[s.l.]:IEEE,2015:92-95.
[12] 歐陽永.運營商大數據系統建設的分析與研究[D].南京:南京郵電大學,2016.
[13] 管祥青.大數據可視化模型的協同過濾算法研究及應用[D].長沙:湖南大學,2015.
[14] XIN R S,GONZALEZ J E,FRANKLIN M J,et al.GraphX:a resilient distributed graph system on Spark[C]//International workshop on graph data management experiences and systems.New York,NY,USA:ACM,2013.
[15] 劉志強,顧 榮,袁春風,等.基于SparkR的分類算法并行化研究[J].計算機科學與探索,2015,9(11):1281-1294.