李光明,李垚周,李 頎
(陜西科技大學 電子信息與人工智能學院,陜西 西安 710021)
現如今政府之間業務聯系緊密,各級部門之間大量數據交流頻率越來越高,由于數據清洗[1]包含抽取[2]、轉換、加載[3]3個階段,而各部門之間所用系統獨立,在產生大量日志數據的同時,沒有合理的將日志數據進行抽取和轉換,日志數據[4]無法合理利用,導致政府領導不能得到更加精準的信息,對領導決策無法完全發揮相應的作用,在對數據進行分析時,出現數據分析[5]不精準等問題。為更好應對上述問題,分布式并行大數據清洗成為ETL(extraction-transformation-loading)中的重要手段。
在海量數據清洗領域中,MapReduce[6]日志清洗框架作為當前流行的并行框架而被大型互聯網公司奉為至寶,實驗結果表明,基于MapReduce的并行日志清洗框架在處理大數據時,具有高可擴展性和高容錯性[7],同時還能降低資源消耗。針對大數據清洗,已有不少學者正從事研究。劉心光等[8]從4個優化規則設計基于改進的鏈式MapReduce并行ETL,通過在Map和Reduce進行分工完成作業,來減少ETL清洗流程和I/O消耗,但是該框架沒有考慮到Shuffle、Sort過程,在面對大數據量時,如果不采用分區并行計算,容易造成數據傾斜[9]問題,導致數據清洗分布不均,時間消耗過長。李寧寧等[10]也對大數據清洗進行深入研究,采用基于任務合并的方法對清洗過程進行優化,利用多個相同文件進行合并,使其在Map階段進行一次Shuffle操作,從而提高轉換效率,但只是針對MapReduce并行清洗框架進行研究和設計,由于MapReduce只針對于離線數據[11]清洗,并且數據處理模式單一,只存在Map-Reduce操作,處理過程中還會造成大量磁盤I/O占用[12],導致清洗效率降低。針對上述問題,解書亮等[13]進行基于Spark的并行ETL研究與實踐,提出并行ETL方法,但并行處理是Spark集群自帶的配置設置,且聚合過程中,沒有考慮Join優化過程,只是單純運用Spark原理進行聚合操作,也沒考慮到數據傾斜等問題。基于上述問題,本文結合Spark分布式技術對日志清洗系統進行設計,主要貢獻如下:
(1)使用Hadoop集群[14]、Flume[15]、Kafka[16]、Spark Streaming[17]等大數據相關技術進行日志清洗系統架構設計,并將實時轉換過后的日志數據加載到HBase[18]數據庫中。
(2)針對數據抽取,采用決策對象識別算法將抽取分為3階段,實現對海量數據的快速過濾、去重。
(3)對Join操作進行優化,通過添加N以內的隨機數前綴,對數據較多的Key進行子擴展,先進行局部操作,再去除隨機數之后進行聚合操作,避免Join操作時出現數據傾斜問題。
Spark實時流計算框架具有快速、易用、通用、兼容性[19]等特點,該領域包含Spark Core[20]、Spark SQL[21]、GraphX[22]以及Spark Streaming等相關組件,本文主要運用Spark Streaming核心組件,首先Spark Streaming支持多種數據輸入源,如Kafka、Flume、Twitter[23]、以及簡單的TCP套接字[24]等,其次當日志數據在經過抽取后,可以通過Spark Streaming中內含的RDD(resilient-distributed-dataset)算子進行壓平、過濾、去重等操作,經過一系列算子進行轉換操作后可以加載到Hive[25]、HBase等多種存儲工具中。
Spark Streaming提供一個高級抽象,稱為DStream[26],內部使用RDD算子進行實現,當從外部持續數據時,Spark Streaming會將數據通過RDD算子進行操作轉換,每個RDD算子在執行時,都會將間隔內的數據存儲到內存中,其原理如圖1所示。

圖1 Spark Streaming原理
本系統采取Spark框架中的實時流技術進行架構設計,在數據源層面,采取Web前端旅游子系統產生的日志數據作為數據源,后經Tomcat服務器進行存儲,實時架構使用Flume進行數據采集,存儲到Kafka消息隊列中,然后經流式處理框架Spark Streaming進行數據轉換,實時存儲到Hbase數據倉庫中,整個架構按照分層模式分為視圖層、存儲層以及控制層和業務處理層,同時還包含兩個子系統模塊,系統架構設計如圖2所示。

圖2 系統架構
如圖2所示:系統總體架構設計將從以下幾部分進行分解:
(1) 數據存儲層:實時架構在數據存儲層上采取HBase和Mysql進行數據存儲,同時將Web端采集的日志數據存儲在Kafka中,這樣做的優點在于,Kafka包含生產者消費者模型,可以實時接收數據,同時HBase具備列式存儲優勢,可存儲任意格式。
(2) 業務處理層:采用Spark Streaming流式處理框架,可以針對Kafka消費者組中的數據進行實時拉取,將數據按照DStream形式進行處理,而DStream內部設計采用RDD進行表示,最終會轉換成Operation操作,對重復的數據會按照Key值進行統計計算,然后經過聚合輸出到數據倉庫中。
(3) 控制層:通過引入決策對象識別算法和分區聚合算法對轉換的數據進行算法優化,提高處理效率。
(4) 視圖層:前端頁面數據會通過圖形化進行展示,在架構上采取前后端分離思想,采用MVVM模式,運用HTML、CSS(cascading-style-sheets)、JavaScript、JQuery、Ajax技術設計。
大數據日志清洗系統結合改進后的分區聚合算法對實時產生的數據進行清洗,通過提高Join操作的效率以保證日志數據的精準度。在用戶功能模塊設計上包含超級管理員和普通管理員模塊,在操作類模塊布局中包含日志清洗功能,配置管理功能,圖形統計和資源統計功能,同時各功能模塊中包含子功能。基于Spark的大數據清洗系統功能模塊如圖3所示。

圖3 大數據日志清洗系統功能模塊
日志清洗管理功能包含數據導入、清洗規則制定以及清洗流程解析子功能,通過對將導入的數據解析、驗證、轉換處理以到達清洗效果。
配置管理功能包含流式架構的集群配置,如Kakfa集群、Hadoop集群、Spark集群、Flume集群等。
資源統計功能主要包含各資源的調度、清洗后數據的展示以及在計算時文件的統計等子功能。
數據清洗簡稱ETL,其目的在于對不合理、不規則、有噪音有缺陷的數據進行去重、過濾,只保留合理有用的數據。在ETL操作中,分3個步驟,分別為抽取、轉換、加載,在本文中,數據抽取來自旅游子系統,產生的數據多為日志數據,所以主要工作是去除重復數據和補全因為并發過程中造成的字段缺失的數據。現階段針對海量數據處理問題,在大數據領域中有兩種處理方式,一種是通過MapReduce進行清洗,一種是使用Spark進行數據清洗。
MapReduce并行清洗技術采取分而治之的設計思想,將原始數據抽象成Map-Reduce操作對,輸入輸出采取

圖4 基于MapReduce的ETL流程
具體操作步驟如下:
(1)將原始數據劃分為多個Key/Value鍵值對。通過Map統計每個Key鍵,存入Value值中。
(2)Combiner整合Map結果,將Map的輸出作為Combiner的輸入,再將Combiner的輸出作為Reduce的輸入。
(3)Partition分割每個Map節點的結果,按照Key分別映射給不同的Reduce。
(4)Reduce完成最終的數據聚合,存入數據倉庫中。
雖然MapReduce在數據清洗方面有很大優勢,但每次進行MapReduce處理后,數據都存入磁盤中,對I/O消耗過大,且ETL的數據均為離線數據,無法實時處理,所以清洗效率不是很高。
Spark在進行數據清洗時,以彈性分布式數據集RDD為計算依據,RDD以一組分片為數據集的基本組成單元,每個分片為一個計算任務處理,分片的數量決定并行計算的粒度,而Spark Streaming內部DStream也是基于RDD算子來表示,清洗流程如圖5所示。

圖5 Spark并行大數據清洗流程
由圖5可知,Spark Streaming從Kafka消息隊列中讀取日志數據后,在其內部是由一個個DStream組成,DStream是用一系列連續的RDD來表示。每個RDD含有一段時間間隔內的數據,根據分片數量設置分區,并對數據進行實時抽取,采取并行分區方式處理數據,先通過RDD讀取到文本信息,對數據進行壓平分割,統計相同Key所對應Value值的個數,再通過Join操作將相同Key對應的Value進行聚合,輸出到數據倉庫中,最終達到清洗目的。相比于MapReduce框架,Spark框架更適合于企業對大數據的ETL處理,隨著業務需求的不斷變更和數據量的不斷增大,可以根據實際情況來實現功能、集群的擴展。
本文對基于Spark的數據清洗算法進行了研究和實踐,重點解決數據過濾、去重、數據傾斜等問題,因為在數據清洗過程中,對臟數據的處理是最耗時的階段,需要多步操作,通過決策對象識別算法進行快速過濾去重,提升數據的抽取效率,通過改進的Join操作對去重后數據進行聚合,避免數據傾斜。
假設決策表T具有n個不同的決策對象值,將兼容對象決策對象值用1,2,3,…n代表,不兼容對象決策對象值用n+1表示,我們可以設置決策表T由n+1個子決策表組成,即D1,D2,D3,…,Dn,每個子決策表包含相同的類對象,對象編號為n1,n2,n3,…,nn,因此,決策表T是一個兼容的決策表。假設對象a有不同的對象值,它被映射為1,2,3,…,r,可以得出

(1)

假設可辨別對象由兩個對象組成,即決策對象和條件對象。如果兩個對象的決策值不同,則條件對象a的屬性值也不同,那么a便可以識別這兩個對象,即具有相對的可識別能力。一個對象所能識別的對象數目越多,相對識別能力就越強,就可以用相對識別數來衡量相對識別能力。在兼容性決策表T中A∈C,a可辨別一對屬性的對象,表示如下
ObjPq={
(2)
式中:ObjPq為可辨別的對象,M為決策對象,N為條件對象。f(M,q),f(N,q)為決策值。

(3)
式中:p,q,r為被映射的對象值,vi為等價類中的元素。
在兼容性決策表T中A?C, c∈C∪D,其中對象c為新增識別能力,定義為A1,A2,…,Ar之和,表示如下
ObjSU,A∪{c}|ObjSU,A=ObjSA1,{c}∪…∪ObjSAr,{c}
(4)
式中:A,C,U為對象,c為新增的識別能力。

算法1:
輸入:對象A,識別能力c,保留對象D
輸出:重復數據,重復次數

(1) Main1←Objectkey, Textvalue//傳入參數, 創建Main1函數
(2) For each a int {ObjPq}
(3) A_Class←getObject(value, a)//將獲得的對象傳給導入類A
(4) B_Class←getObject(value, b)
(5) For each a int {ObjSU, A}
(6) context.write(B_Class,3)//輸出重復數據
(7) context.write(C_Class,1)//輸出重復次數
算法2:
輸入:重復數據,重復次數的集合
輸出:過濾后總條數
(1) Main2←Stringstr, Vectorv//傳入參數, 創建Main2函數
(2) totalNum←foreach
(3) Context.write(str, new IntWritable(totalNum))//輸出顧慮后總條數
輸入: 決策表T
輸出: 去重、過濾后的表Table
(1) Main←new JobConf//創建主函數Main, 然后new出SparkStreamingContext類
(2) Job←JobClient.runJob(conf)
(3) Table←For each c int{ObjSU,AU{c}|ObjSU,A}//循環遍歷出兩個對象集中日志條數
(4) Table←Table-{c} //得出去重、 過濾后的表
經過上步操作后,需要在數據倉庫中對數據進行聚合處理,但如果使用傳統的Map Reduce進行數據聚合操作,那在聚合時將產生大量的I/O請求,會降低聚合效率,現基于Spark環境,可以針對RDD算子使用分區聚合算法進行處理。通過Join操作對數據中Key值相同的記錄進行聚合,以達到最終清洗的結果,在數據量少的情況下進行Join運算,不會發生數據傾斜等問題,但假設在RDD1和RDD2中,RDD1中的Key數據量過大,將會使Task1處理時間過長,這將導致數據傾斜等問題。現通過對Join操作進行優化改進,算法實現步驟如下:
步驟1 對大數據量相同的Key的RDD1,使用Sample采樣出一份樣本,統計每個Key的數量,并計算出最大的Key。
步驟2 從RDD1中分離出過大的Key通過以N為隨機數對它標號,使之形成獨立的RDD1_1,數據量正常的Key標號為RDD1_2。同理將RDD2也進行上述操作,得到RDD2_1和RDD2_2。
步驟3 將RDD1_1和RDD2_1進行Join操作,此時原先的Key將被分散成多份,分散到多個任務中進行Join操作,Join的結果記為Result,然后將隨機數前綴去掉,得到Result1,同理RDD1_2和RDD2_2進行操作,結果記為Result2。
步驟4 最后將Join1和Join2通過union算子合并,就計算出最終結果Result3。
算法流程如圖6所示。

圖6 改進的Join操作
從圖6中可以看出,當對Join操作進行算法優化后,RDD1上原有的一個任務被分成2個任務,每個任務進行1×2次運算,因為是并行處理,所以1×2次運算的總時間和2個任務并行運算次的時間是一樣的,在避免數據傾斜的同時,提高了數據清洗效率。
基于Spark的大數據清洗系統主要進行實時數據清洗,流程如圖7所示。

圖7 大數據清洗系統流程
旅游子系統每天實時產生有1 G的日志數據,需要采用流式計算的方式對該日志數據進行清洗,通過Flume采集Web子系統產生的日志格式數據,存儲到Kafka消息隊列中,Spark Streaming實時拉取Kafka消費者組中的日志數據進行清洗,實現政府人員對實時產生數據的精準把控。
為對比數據清洗的效率,本實驗搭建出Hadoop集群和Spark集群,以便進行實驗對比。Hadoop集群和Spark集群均采用5臺服務器進行部署,分別命名為Spark01至Spark05,其中Spark01為主節點,Spark02至Spark05為從節點,每臺服務器都會部署Centos-7,Hadoop-2.6.4,Jdk-8u191-linux-x64,Spark-2.2.-1-bin-hadoop2.6,Zookee-per-3.4.5,Hbase-1.2.12,同時服務器內存配置為4 G、處理器內核總數為2核,每個處理器的內核數量為4個,磁盤大小為50 G,集群開發工具使用IDEA。
本文實驗數據來自于智慧咸陽大數據分析平臺中的一個旅游子系統產生的日志數據,需要按照咸陽市政府的要求對產生的旅游日志數據進行清洗,該旅游子系統包含數據集大小10 G-50 G不等,實驗使用精簡數據集ml-1m、ml-3m、ml-5m與ml-7m進行對比實驗,分別包括100萬、300萬、500萬與700萬條日志數據。
本實驗將從3個方面進行大數據清洗效率對比。
(1)Hadoop集群MapReduce清洗框架和Spark集群Spark清洗框架在不同數據樣例下的運行速率,實驗結果如圖8所示。

圖8 Spark清洗與MapReduce清洗運行速率對比
由實驗結果可以看出,當測試數據依次為100萬、300萬、500萬、700萬時,基于MapRduce的清洗系統明顯比基于Spark的清洗系統所用時間更長,經實驗對比,運用Spark清洗框架清洗數據時,時間相比MapReduce清洗框架縮短50%左右,表明使用Spark構建大數據日志清洗系統更加合理。
(2)為驗證數據傾斜的改善效果,現將本文改進后的算法與傳統的基于Spark的并行ETL數據清洗進行實驗對比,根據均方根誤差RMSE作為算法改善后的評價標準,其公式為
(5)
式中:N代表測試數據的條數,pi表示實際清洗時間,ri表示算法預測的清洗時間,當(pi-ri)的值越小,則整體RMSE的值越小,表明預測清洗的時間與實際的時間的偏差越小,即數據傾斜的改善效果更明顯,大數據清洗系統的精準度和清洗效率越高。
1)在Spark平臺下,將運用決策對象識別算法和對Join操作進行優化后的算法與傳統基于Spark的并行ETL數據清洗算法進行實驗對比,如圖9所示。

圖9 傳統算法與改進后算法清洗時間效率對比

2)在Spark平臺下,將運用決策對象識別算法和對Join操作進行改進后的算法與傳統基于Spark的并行ETL數據清洗算法進行實驗對比,如圖9所示。
由表1可得:本文改進后的算法較基于MapReduce日志清洗算法和傳統的基于Spark并行ETL算法有較低的RMSE值,則時間復雜度較低,算法精準度提升,清洗效率提升,數據傾斜得到很好的改善。

表1 傳統算法與改進后算法RMSE對比
(3)通過測試Reduce節點個數來驗證算法改進后與傳統算法對數據清洗速度的影響,實驗指定測試數據分別為100萬條、300萬條、500萬條、700萬條時。對應實驗結果如圖10~圖13所示。

圖10 100萬條數據對應不同的Reduce個數在算法改進前后的對比

圖11 300萬條數據對應不同的Reduce個數在算法改進前后的對比

圖12 500萬條數據對應不同的Reduce個數在算法改進前后的對比

圖13 700萬條數據對應不同的Reduce個數在算法改進前后的對比
實驗結果如圖10~圖13所示,改變集群中Reduce的個數,發現當數據為100萬條,Reduce為1,數據為300萬條,Reduce為3,數據為500萬條,Reduce為5時,數據為700萬條,Reduce為5時,日志數據清洗時間最短,且算法改進后比傳統算法所用時間更少,清洗效率更高。
由此可以得出:當數據量給定時,改變Reduce節點個數,算法改進前后所用時間明顯不同,且改進后的算法較傳統算法所用時間更短。但當Reduce個數繼續增多時,清洗時間反而增加,這是因為在給定數據量下,Reduce的數量已經確定,而一味提升并行化程度只會造成更多任務分配的開銷。
本文實現了基于Spark平臺的大數據日志清洗系統,實驗結果表明相比于傳統清洗系統或MapReduce清洗系統來說,清洗效率大大提升,能夠高效、快速地完成大數據的清洗任務。
(1)通過Hadoop、Flume、Kafka、Spark Streaming等大數據組件進行系統搭建,提出決策對象識別算法,將抽取分為3階段,分別為辨別等價類中重復數據,對等價類重復數據過濾處理,將過濾后數據存儲在內存之中。
(2)對Join操作進行優化,通過加入隨機數前綴,將數據先進行局部聚合,再全局聚合,降低發生數據傾斜概率。
下一步可以對Spark Streaming中的updateStateByKey算子進行優化,針對每個key的狀態編寫對接實際業務的清洗代碼,使該日志清洗系統更加完善。