朱子龍,李玲娟
(南京郵電大學(xué) 計算機(jī)學(xué)院,江蘇 南京 210023)
聚類是將未知數(shù)據(jù)集合分組成由較高相似度的對象組成的若干類或者簇的過程[1]。聚類算法大致分為劃分方法、層次方法、基于密度的方法、基于網(wǎng)格的方法和基于模型的方法等五類[2-3]。DBSCAN算法作為典型的基于密度的方法,具有聚類速度快、有效處理“噪聲”點并且能夠發(fā)現(xiàn)任意形狀的簇等優(yōu)點[4],因此研究用該算法高效并行化處理海量數(shù)據(jù)具有重要的現(xiàn)實意義。
Hadoop作為近年來比較流行的大數(shù)據(jù)處理平臺,利用HDFS分布式海量存儲和MapReduce分布式計算框架并行處理,其每次map計算過程中產(chǎn)生的中間結(jié)果需要反復(fù)讀寫本地磁盤,在進(jìn)行大量迭代計算時,MapReduce計算模型將會耗費大量讀寫時間[5]。2009年加州大學(xué)伯克利分校創(chuàng)立基于內(nèi)存的Spark大數(shù)據(jù)處理計算框架,更好地支持交互式查詢和迭代算法,擴(kuò)展了MapReduce計算框架,并且支持內(nèi)存式存儲和高效的容錯機(jī)制。
文中研究了DBSCAN算法在Spark平臺的并行化實現(xiàn)方案,并通過實驗驗證了方案的高效性。
DBSCAN算法是一種典型的基于密度的聚類算法,它將簇定義為高密度相連的點的最大集合。該算法能夠?qū)⒏呙芏鹊臄?shù)據(jù)區(qū)域分為不同的簇,能夠在具有“噪聲”的數(shù)據(jù)集中識別出任意形狀的聚類[3]。對于數(shù)據(jù)量為n的數(shù)據(jù)集合,按照空間索引方法,DBSCAN的計算復(fù)雜度是O(nlogn),否則其計算復(fù)雜度為O(n2)。
(1)DBSCAN算法涉及到的定義。
定義1(Eps鄰域):對指定數(shù)據(jù)集D中以x為圓心的半徑Eps內(nèi)的球形區(qū)域稱為該點x的Eps鄰域。
定義2(核心對象):D中任意一點x的Eps鄰域內(nèi)包含大于或等于最小數(shù)目MinPts個對象,則稱該點x為核心對象。
定義3(邊界對象):D中任意一個不是核心對象的點,當(dāng)其在其他核心對象的Eps鄰域中時,稱它為邊界對象。不同核心對象的Eps鄰域中可能會有相同的邊界對象。
定義4(直接密度可達(dá)):如果D中的點y在點x的Eps鄰域中而且點x是核心對象,則稱點y是從點x關(guān)于參數(shù)Eps和MinPts直接密度可達(dá)的[6]。
定義5(密度可達(dá)):在給定半徑Eps和MinPts的數(shù)據(jù)集D中,存在對象鏈m1,m2,…,mn,其中m1=x,mn=y,對mi∈R(1≤i≤n),如果mi+1是從mi直接密度可達(dá)的,則點y是從點x關(guān)于Eps和MinPts密度可達(dá)的。密度可達(dá)關(guān)系是不對稱的。
定義6(密度相連):如果D中的點x和點y是從點p關(guān)于Eps和MinPts密度可達(dá)的,則稱點x和點y是密度相連的[6]。
定義7(簇):基于密度的DBSCAN算法的簇是密度相連的數(shù)據(jù)點的最大集合[7]。對給定數(shù)據(jù)集D的任意一個非空子集R,如果稱之為簇,必須滿足如下條件:
①最大性:任意對象點x,y∈D,若x∈R,且點y是從x密度可達(dá)的,則y∈R。
②連通性:任意對象點x,y∈R,則點x,y是密度相連的。
定義8(噪聲):如果D中的某點不被包含在任意簇中,則稱為噪聲。
綜上所述:當(dāng)基于密度聚類時,數(shù)據(jù)集中的簇看作是被低密度區(qū)域分隔開的高密度數(shù)據(jù)區(qū)域[8]。數(shù)據(jù)集中的核心對象一定屬于某簇,而且密度相連的點在同一簇中。噪聲點是數(shù)據(jù)集中的干擾數(shù)據(jù),會被舍棄。
(2)DBSCAN算法的基本思想。
首先從給定數(shù)據(jù)對象集中隨機(jī)選定一個點x,在該點給定半徑Eps的區(qū)域?qū)ふ揖垲悺H绻cx的Eps鄰域內(nèi)至少包含MinPts個對象,那么以點x為核心對象創(chuàng)建一個新簇,接著反復(fù)基于這些核心對象查找直接密度可達(dá)的數(shù)據(jù)對象,查找過程可能會涉及密度可達(dá)簇的相關(guān)合并[3]。直到?jīng)]有新的點被合并到其他簇時,算法結(jié)束。
(3)DBSCAN算法的具體步驟。
輸入:包含n個數(shù)據(jù)對象的數(shù)據(jù)集D={x1,x2,…,xn},半徑Eps和最小對象數(shù)目MinPts。
輸出:簇集合{R1,R2,…,Rn}。
①輸入待處理數(shù)據(jù)集D后,任意選擇一個數(shù)據(jù)點x,檢查該對象的Eps鄰域;
②如果數(shù)據(jù)點x的Eps區(qū)域內(nèi)至少包含最小對象數(shù)MinPts,則以點x為核心對象形成新簇并從點x出發(fā)尋找所有密度可達(dá)的數(shù)據(jù)點,隨之更新簇;
③如果數(shù)據(jù)點x不是核心對象,將點x當(dāng)作噪聲點處理;
④repeat以上步驟;
⑤until無新的數(shù)據(jù)點加入任何簇。
不同于Hadoop的MapReduce計算模型,Spark能夠使job中間結(jié)果保存在內(nèi)存中,減少對磁盤的大量讀寫操作,從而可以高效低延遲處理大型數(shù)據(jù)集[9-11]。Spark引入了彈性分布式數(shù)據(jù)集(resilient distributed dataset,RDD)概念,實現(xiàn)了任務(wù)調(diào)度、分發(fā)和處理等,同時提供了更多計算模式組件,如SparkSQL、Spark Streaming、MLib和GraghX等,可以適用于多種分布式平臺場景。
Spark的RDD核心概念,讓其能夠以基本一致的操作方式去處理不同的應(yīng)用場景。RDD本質(zhì)上是一個不可變只讀的分布式元素集合,每個RDD包含不同的分區(qū),這些分區(qū)就是多個dataset片段,它們分別運行在不同的集群節(jié)點上可被同時并行處理。實際上,Spark并行框架計算流程就是通過待處理數(shù)據(jù)創(chuàng)建RDD、轉(zhuǎn)化成新的RDD和調(diào)用RDD行動操作求值得到結(jié)果[12]。RDD支持兩種操作類型:轉(zhuǎn)化(transformation)和行動(action)。其中轉(zhuǎn)化操作是將現(xiàn)有的RDD轉(zhuǎn)化成一個全新的RDD,Spark中轉(zhuǎn)化操作都是惰性求值,只有在行動操作實際用到這些RDD時才會被計算。而行動操作會觸發(fā)實際計算,并且向驅(qū)動器程序返回結(jié)果或者將結(jié)果存入外部存儲系統(tǒng)中。通常情況下,每個轉(zhuǎn)化過的RDD會在對其執(zhí)行行動操作時被重新計算,但是大數(shù)據(jù)迭代算法中經(jīng)常會多次使用同一RDD,為避免多次調(diào)用行動操作對同一RDD計算而帶來的開銷,Spark支持對RDD進(jìn)行持久化,計算出RDD的節(jié)點會分別保存其所求得的分區(qū)數(shù)據(jù)。
Spark在分布式環(huán)境中采用了主從結(jié)構(gòu)計算模型,其中包含驅(qū)動器(driver)節(jié)點和執(zhí)行節(jié)點(見圖1)。

圖1 Spark計算模型
驅(qū)動器節(jié)點運行Application中的main()方法,創(chuàng)建SparkContext、創(chuàng)建RDD和對RDD轉(zhuǎn)化、執(zhí)行行動操作,它作為應(yīng)用邏輯執(zhí)行起點負(fù)責(zé)將用戶程序轉(zhuǎn)化為多個物理執(zhí)行單元,然后將其分發(fā)到不同執(zhí)行器節(jié)點執(zhí)行處理,并且會根據(jù)執(zhí)行器節(jié)點任務(wù)處理情況,負(fù)責(zé)執(zhí)行器節(jié)點的任務(wù)調(diào)度以盡可能地提高效率。執(zhí)行器節(jié)點接收到驅(qū)動器節(jié)點指令后,主要負(fù)責(zé)分區(qū)中任務(wù)執(zhí)行并將任務(wù)執(zhí)行結(jié)果反饋給驅(qū)動器節(jié)點,執(zhí)行器節(jié)點可以為需要緩存的RDD進(jìn)行內(nèi)存式存儲,從而可以提高運行效率。
DBSCAN算法定義類簇是密度相連點的最大集合,需要反復(fù)迭代尋找核心對象的密度可達(dá)數(shù)據(jù)點[13],所以可以使用Spark大數(shù)據(jù)計算框架RDD實現(xiàn)。在Spark應(yīng)用中一個驅(qū)動器(Driver)程序定義了集群中執(zhí)行器(executor)節(jié)點上的分布數(shù)據(jù)集,并實現(xiàn)任務(wù)分發(fā)、調(diào)度、執(zhí)行和聚合結(jié)果等操作。文中設(shè)計的DBSCAN算法并行化方案如下:
(1)配置Spark。
首先,驅(qū)動器程序創(chuàng)建SparkConf對象,配置Spark如何連接到相關(guān)集群中,然后創(chuàng)建SparkContext對象來連接訪問Spark。如前文所說,Spark并行框架計算流程實際上是通過待處理數(shù)據(jù)創(chuàng)建RDD,轉(zhuǎn)化成新的RDD,并調(diào)用RDD行動操作求值得到結(jié)果。一般可以通過兩種方式創(chuàng)建RDD:讀取外部文件系統(tǒng)中的數(shù)據(jù)集或者在驅(qū)動器程序中對數(shù)據(jù)集進(jìn)行并行化,同時Spark支持常用文件格式和文件存儲系統(tǒng),比如HDFS、Amazon S3、HBase等。讀取待處理數(shù)據(jù)集創(chuàng)建RDD后,分發(fā)到集群中各個執(zhí)行器節(jié)點中,轉(zhuǎn)化為Spark中數(shù)據(jù)塊保存在內(nèi)存或者磁盤中,并通過BlockManager進(jìn)行管理。RDD中partition是邏輯數(shù)據(jù)塊,分別對應(yīng)BlockManager管理的物理分區(qū)中相應(yīng)的Block。由于Spark采用惰性求值的方式節(jié)省集群中的內(nèi)存使用,只有RDD行動操作才能觸發(fā)Job的提交計算。RDD的Action算子觸發(fā)Job提交,Spark收到Job后生成具有邏輯性的有向循環(huán)圖(RDD DAG),隨后DAGScheduler會對DAG進(jìn)行Stage劃分。對應(yīng)的每個Stage都會生成一組Task集合并提交到TaskScheduler中,會由TaskScheduler將Task調(diào)度分發(fā)到各個執(zhí)行器節(jié)點的線程池中執(zhí)行。
(2)多執(zhí)行器節(jié)點并行執(zhí)行DBSCAN算法。
DBSCAN算法基于Spark的并行化流程見圖2。

圖2 DBSCAN算法并行化流程
每個執(zhí)行器節(jié)點通過多線程方式對其RDD分區(qū)內(nèi)容中的數(shù)據(jù)使用DBSCAN算法進(jìn)行計算,首先讀取數(shù)據(jù)集形成RDD_1,RDD_1啟動Sample算子,逐一隨機(jī)選取某數(shù)據(jù)點x作為起始點并轉(zhuǎn)化為RDD_2。設(shè)計map函數(shù),計算點x的Eps鄰域內(nèi)是否包含大于或等于MinPts個數(shù)據(jù)對象,以判斷其是否為核心對象,如果是則形成新簇,生成RDD_3。RDD_3啟動collectAsMap算子,將已處理的相同類簇匯總到RDD_4同一數(shù)據(jù)分片中,RDD_4啟動reduceByKey算子,尋找從核心對象出發(fā)直接密度可達(dá)的數(shù)據(jù)點,此過程中可能存在密度可達(dá)的類簇合并,重復(fù)迭代,直到無新的數(shù)據(jù)點加入任何簇時輸出聚類結(jié)果。
執(zhí)行過程中,執(zhí)行器節(jié)點會將需要緩存的RDD緩存在內(nèi)存中,并將各自處理數(shù)據(jù)匯總到驅(qū)動器節(jié)點,并再次迭代計算后終止。此時驅(qū)動器節(jié)點調(diào)用saveAsTextFile算子,將結(jié)果存儲到分布式存儲系統(tǒng)HDFS中。最后,通過調(diào)用SparkContext的stop方法退出Spark應(yīng)用。由于Spark大數(shù)據(jù)計算框架的核心數(shù)據(jù)模型RDD是分發(fā)到不同executer節(jié)點上并行計算的,并將中間結(jié)果緩存到內(nèi)存中,因此對于需要大量迭代計算的DBSCAN算法可以節(jié)省大量時間。
為了測試和分析基于Spark的并行化DBSCAN算法的性能,分別用單機(jī)DBSCAN算法和基于Spark的并行DBSCAN算法對實驗數(shù)據(jù)集進(jìn)行聚類操作,在不同運行模式下,對聚類效果準(zhǔn)確度和時間效率進(jìn)行對比。
實驗搭建的Spark集群包含1臺驅(qū)動器節(jié)點,2臺執(zhí)行器節(jié)點。每個節(jié)點的CPU為Intel CORE i5-4210H,每個節(jié)點配有2個處理器,硬盤數(shù)據(jù)讀寫速度為600.00MB/s,其中驅(qū)動器節(jié)點擁有6G運行內(nèi)存,執(zhí)行器節(jié)點擁有2G運行內(nèi)存。操作系統(tǒng)為centos6.5;Java版本為JDK1.7.0_13;Spark版本為1.6.0;Scala版本為2.10.4。
具體節(jié)點配置見表1。

表1 節(jié)點配置
實驗采用了UCI實驗室提供的Wine數(shù)據(jù)集、Car Evaluation數(shù)據(jù)集和Adult數(shù)據(jù)集[14-15],詳情見表2。

表2 數(shù)據(jù)集
以正確聚類的樣本數(shù)占總樣本數(shù)的百分比作為準(zhǔn)確率,對DBSCAN算法的聚類結(jié)果進(jìn)行評估,得到的結(jié)果如表3所示。

表3 準(zhǔn)確度對比
可以看出,對3個數(shù)據(jù)集,基于Spark的并行DBSCAN算法比傳統(tǒng)單機(jī)運行模式的DBSCAN算法的聚類準(zhǔn)確率都有所提高。算法準(zhǔn)確度不會因為數(shù)據(jù)分布式處理而影響聚類結(jié)果,說明分布式數(shù)據(jù)處理具有良好的穩(wěn)定性和準(zhǔn)確度。
圖3展示了傳統(tǒng)單節(jié)點DBSCAN算法和基于Spark的并行DBSCAN算法處理不同數(shù)據(jù)集時的耗時情況。

圖3 算法運行效率對比
可以看出,在數(shù)據(jù)量比較少的情況下,傳統(tǒng)單節(jié)點DBSCAN算法運行時間比基于Spark的DBSCAN算法運行時間要少,因為Spark平臺啟動時的初始化需要消耗一定時間。伴隨著數(shù)據(jù)量逐漸增大,單節(jié)點DBSCAN算法執(zhí)行時間漲幅明顯,因為數(shù)據(jù)逐步增多所消耗的處理器和內(nèi)存等資源也在逐步增多,單個工作節(jié)點由于資源限制,運行速度會逐步降低導(dǎo)致消耗更多時間;相反,基于Spark的DBSCAN算法運行時間隨著數(shù)據(jù)量增加漲幅明顯低于單節(jié)點運行模式,此時分布式集群優(yōu)勢逐漸顯示出來。可以得出這樣的結(jié)論:在處理規(guī)模較大的數(shù)據(jù)時,基于Spark平臺的DBSCAN算法聚類時效性更好。
對DBSCAN算法如何在Spark平臺進(jìn)行并行化實現(xiàn)進(jìn)行了研究。通過設(shè)計該算法在Spark集群中的并行化實現(xiàn)方案和對3個數(shù)據(jù)集進(jìn)行聚類處理,驗證了在大規(guī)模的數(shù)據(jù)處理中,基于Spark平臺的DBSCAN算法并行化能夠有效完成聚類工作,并且具有更高的準(zhǔn)確度和更好的執(zhí)行效率。
參考文獻(xiàn):
[1] HAN Jiawei,KAMBER M.數(shù)據(jù)挖掘概念與技術(shù)[M].范明,譯.北京:機(jī)械工業(yè)出版社,2001:232-236.
[2] 張 麗.無參數(shù)網(wǎng)格聚類算法的研究[D].鄭州:鄭州大學(xué),2009.
[3] 林建仁.聚類算法的研究與應(yīng)用[D].上海:復(fù)旦大學(xué),2007.
[4] CHEN Yixin,TU Li.Density-based clustering for real-time stream data[C]//Proceedings of the13th ACM SIGKDD international conference on knowledge discovery and data mining.San Jose,California,USA:ACM,2007:133-142.
[5] 李 帥,吳 斌,杜修明,等.基于Spark的BIRCH算法并行化的設(shè)計與實現(xiàn)[J].計算機(jī)工程與科學(xué),2017,39(1):35-41.
[6] 張世安.分布式環(huán)境下的數(shù)據(jù)挖掘算法的研究與實現(xiàn)[D].上海:復(fù)旦大學(xué),2004.
[7] 范 敏,李澤明,石 欣.一種基于區(qū)域中心點的聚類算法[J].計算機(jī)工程與科學(xué),2014,36(9):1817-1822.
[8] 高 兵.基于密度的數(shù)據(jù)流聚類方法研究[D].哈爾濱:哈爾濱工程大學(xué),2014.
[9] 張 泉.面向云計算數(shù)據(jù)中心的存儲服務(wù)質(zhì)量技術(shù)研究[D].武漢:華中科技大學(xué),2014.
[10] JAIN A K.Data clustering:50years beyond K-Means[J].Pattern Recognition Letters,2010,31(8):651-666.
[11] PAN Donghua,ZHAO Lilei.Uncertain data cluster based on DBSCAN[C]//Proceedings of IEEE international conference on multimedia technology.Hangzhou,China:IEEE,2011:3781-3784.
[12] KELLNER D,KLAPPSTEIN J,DIETMAYER K.Grid-based DBSCAN for clustering extended objects in radar data[C]//Intelligent vehicles symposium.Alcala de Henares,Spain:IEEE,2012:365-370.
[13] 唐振坤.基于Spark的機(jī)器學(xué)習(xí)平臺設(shè)計與實現(xiàn)[D].廈門:廈門大學(xué),2014.
[14] 羅啟福.基于云計算的DBSCAN算法研究[D].武漢:武漢理工大學(xué),2013.
[15] ZAHARIA M,CHOWDHURY M,FRANKLIN M J,et al.Spark:cluster computing with working sets[C]//Proceedings of the2nd USENIX conference on hot topics in cloud computing.Boston,MA:[s.n.],2010:1765-1773.