馬兆輝 趙睿哲 溫秀梅,2*
(1.河北建筑工程學(xué)院,河北 張家口 075000;2.張家口市大數(shù)據(jù)技術(shù)創(chuàng)新中心,河北 張家口 075000)
Spark基于RDD實現(xiàn)了一體化、多元化的大數(shù)據(jù)處理體系,強大的計算能力以及高度集成化的特點使得Spark在大數(shù)據(jù)計算領(lǐng)域具有得天獨厚的優(yōu)勢.在Spark出現(xiàn)之前,Hadoop平臺下的MapReduce框架是最熱門的大數(shù)據(jù)計算框架,但是MapReduce框架仍暴露出很多缺點,其中最主要的是迭代計算的中間結(jié)果會不停寫入磁盤,造成了數(shù)據(jù)復(fù)制嚴重、磁盤開銷大等問題.同Spark框架相比,MapReduce框架表達能力有限,不得不借助第三方工具去完成更為復(fù)雜的任務(wù).Spark框架是為了解決這些問題而設(shè)計的,Spark框架不僅擁有更豐富的函數(shù),可以對更為復(fù)雜的海量數(shù)據(jù)進行快速操作,而且可以將中間結(jié)果存入內(nèi)存,通過RDD之間存在的依賴關(guān)系形成DAG圖進行轉(zhuǎn)換操作,實現(xiàn)流水線進程,使用戶不必再擔(dān)心底層數(shù)據(jù)的特性,減少了磁盤的開銷,提升了運行速度,提高了容錯性,同時還開發(fā)出完整的Spark生態(tài)系統(tǒng),減少了開發(fā)和維護成本,對大規(guī)模數(shù)據(jù)的處理更加方便快捷,其各個組件可以共同完成絕大部分的數(shù)據(jù)處理需求和場景.本文針對Spark中最為重要的核心組件之一RDD進行詳細介紹,并通過具體實驗進行說明.
RDD(Resilient Distributed Dataset)是彈性分布式數(shù)據(jù)集,是一種抽象的分布式內(nèi)存概念,是Spark平臺中運行計算的基本存儲單元[1].RDD具有強大的容錯功能,不僅可以并行處理元素,同時還是一個高度抽象的數(shù)據(jù)結(jié)構(gòu),包含多個分區(qū).其創(chuàng)建方式主要包括兩大類:一類是來自共享文件系統(tǒng)、HDFS、HBase的外部文件系統(tǒng),另一類則是通過任何數(shù)據(jù)源提供Hadoop Input Format.
RDD提供了豐富的操作來對集合中的元素進行操作.其支持兩種操作類型:Transformations和Actions.Transformations主要是從一個存在的RDD去產(chǎn)生一個新的RDD,而Actions的操作主要是在數(shù)據(jù)集上計算之后返回給Driver[2].
1)RDD的依賴
在對RDD進行轉(zhuǎn)換操作的過程中,每個操作都會在已有的RDD的基礎(chǔ)上產(chǎn)生新的RDD.由于RDD的惰性特性,新的RDD會依賴于原有的RDD,這樣RDD之間就會形成相應(yīng)的依賴關(guān)系.
RDD的依賴關(guān)系分為兩大類,如圖1所示.

圖1 RDD的依賴關(guān)系
①窄依賴:是指每個父RDD的一個Partition最多被子RDD的一個Partition所使用,例如map、filter、union等都會產(chǎn)生窄依賴;
②寬依賴:是指一個父RDD的Partition會被多個子RDD的Partition所使用,例如groupByKey、reduceByKey、sortByKey等操作都會產(chǎn)生寬依賴[3].
2)RDD之間的轉(zhuǎn)換關(guān)系
Spark中最核心的部分就是RDD,RDD是一個不可變、粗粒度的數(shù)據(jù)集合.在RDD抽象數(shù)據(jù)模型中提供了豐富的轉(zhuǎn)換操作,然而所有的轉(zhuǎn)換操作都不會實際的執(zhí)行,這正是由于其惰性特性,因此只會記錄操作的步驟,真正的計算發(fā)生在RDD的“動作”操作,如圖2所示.

圖2 RDD的轉(zhuǎn)換關(guān)系
3)RDD操作類型
RDD支持兩種操作:轉(zhuǎn)換(Transformation)和動作(Action,動作或行動).其中轉(zhuǎn)換操作用于對RDD的創(chuàng)建,是通過操作方法從已經(jīng)存在的數(shù)據(jù)集中創(chuàng)建一個新的數(shù)據(jù)集,動作操作是數(shù)據(jù)執(zhí)行部分,主要是指計算數(shù)據(jù)集里的數(shù)據(jù)后并將結(jié)果返回到Driver.
由于轉(zhuǎn)換操作都具有Lazy特性,即Spark不會立即進行實際的操作,只會記錄執(zhí)行的流程,只有發(fā)出Action操作的時候才會真正執(zhí)行[4].默認情況下,RDD的每個動作在執(zhí)行的時候,都會將之前的數(shù)據(jù)重新計算一遍,為了保證計算的高效性以及計算結(jié)果的可重用性,在實際計算過程中,根據(jù)實際情況,在特定的計算環(huán)節(jié)上執(zhí)行persist方法,將計算的中間結(jié)果持久化到內(nèi)存或者磁盤上.如果進行了持久化操作,那么在進行Action操作的時候,就會從內(nèi)存或者磁盤將已經(jīng)計算好的數(shù)據(jù)取出直接用于后續(xù)計算,這樣節(jié)省了計算步驟和時間,同時也提高了整體的計算效率.
4)RDD的彈性特性
RDD之所以被稱為彈性數(shù)據(jù)集,其主要體現(xiàn)在以下幾個方面.
①自動將存儲在內(nèi)存和磁盤中的數(shù)據(jù)切換.RDD是基于內(nèi)存的,但是當(dāng)內(nèi)存“滿”的時候,會將一部分數(shù)據(jù)放到磁盤,前提是持久化級別設(shè)置成MEMORY_AND_DISK.
②基于Lineage的高效容錯.若計算步驟很多,如果其中某個環(huán)節(jié)出錯,可從指定位置恢復(fù)已經(jīng)計算好的數(shù)據(jù),有效避免了重新計算.當(dāng)然可恢復(fù)的前提是在相應(yīng)位置進行了計算結(jié)果數(shù)據(jù)的持久化.
③Task如果失敗會自動進行特定次數(shù)的重試.
④Satge如果失敗會自動進行特定次數(shù)的重試,而且只會計算失敗的分區(qū).
⑤檢查點和持久化過程.在計算過程中,有的計算相對復(fù)雜,若計算鏈條相對較長或者其結(jié)果經(jīng)常被訪問,可以將其結(jié)果進行緩存,以便后續(xù)直接訪問,以此來節(jié)省計算時間,提高整體運行速度.
⑥數(shù)據(jù)調(diào)度、DAG調(diào)度、Task調(diào)度和資源管理無關(guān).Spark集群中任務(wù)調(diào)度和資源調(diào)度是分開的.
⑦數(shù)據(jù)分區(qū)的高度彈性.在計算過程中,當(dāng)數(shù)據(jù)分區(qū)較小時會降低處理效率,為了提高處理效率,需要將小的分區(qū)合并成一個較大分區(qū)進行處理;而當(dāng)數(shù)據(jù)分區(qū)較大時,由于內(nèi)存大小限制,需要把分區(qū)劃分成較小的數(shù)據(jù)分區(qū).可以根據(jù)不同的情況設(shè)置不同的分區(qū)數(shù)量和大小,提高或降低并行度[5].
5)RDD運行過程
(1)通過讀取集合或來自外部的數(shù)據(jù)源創(chuàng)建RDD對象;
(2)SparkContext通過RDD的相關(guān)操作構(gòu)建一個DAG作為邏輯執(zhí)行計劃;
(3)DAGScheduler根據(jù)ShuffleDenpendency將DAG劃分為多個階段,每個階段包含多個tasks,之后每個task會被TaskScheduler調(diào)度到不同節(jié)點的Executor上啟動執(zhí)行.如圖3所示.

圖3 RDD在Spark中的運行過程
使用本地系統(tǒng)創(chuàng)建RDD,在IDEA中實現(xiàn)電商用戶頁面單跳轉(zhuǎn)化率統(tǒng)計實驗和電商熱門品類中Top10活躍Session統(tǒng)計實驗的具體案例,各軟件具體版本如表1所示.

表1 軟件及對應(yīng)版本表
2.1.1 使用程序中的集合創(chuàng)建RDD
RDD的數(shù)據(jù)來源可以是程序中的集合,在Spark中可以通過parallelize和makeRDD將集合轉(zhuǎn)化成RDD,SparkContext中的parallelize方法可以指定分區(qū)個數(shù).源碼如圖4所示.

圖4 使用程序中的集合創(chuàng)建RDD
2.1.2 使用本地文件系統(tǒng)創(chuàng)建RDD
RDD的數(shù)據(jù)來源也可以是本地的文件系統(tǒng),這對于程序中需要進行相對較大的數(shù)據(jù)量測試是很有必要的.在Spark中可以通過textFile方法來讀取本地文件系統(tǒng)創(chuàng)建RDD.源碼如圖5所示.

圖5 使用本地文件創(chuàng)建RDD
2.1.3 使用HDFS創(chuàng)建RDD
HDFS可以作為RDD的數(shù)據(jù)來源,而且從HDFS上讀取數(shù)據(jù)來創(chuàng)建RDD的方式也是目前Spark生產(chǎn)系統(tǒng)中最常用的方式.源碼如圖6所示.

圖6 使用HDFS創(chuàng)建RDD
頁面單跳轉(zhuǎn)化率是指一個用戶在一次電商購物過程中訪問的頁面路徑如首頁、產(chǎn)品列表頁、產(chǎn)品詳情頁、訂單頁面、支付頁面,首頁跳轉(zhuǎn)到產(chǎn)品列表頁叫一次單跳,訂單頁面跳到支付頁面也叫一次單跳.單跳轉(zhuǎn)化率就是統(tǒng)計頁面點擊的概率.根據(jù)頁面轉(zhuǎn)化率指標(biāo)的大小,產(chǎn)品經(jīng)理和運營總監(jiān)可以分析網(wǎng)站的產(chǎn)品和頁面的表現(xiàn),決定是否需要去優(yōu)化網(wǎng)站的布局.電商網(wǎng)頁頁面路徑圖如圖7所示.

圖7 電商網(wǎng)頁頁面路徑圖
2.2.1 實驗分析
實驗數(shù)據(jù)采集自電商的用戶行為數(shù)據(jù),共包括有180570條數(shù)據(jù),其中每一條主要包含用戶的4種行為:搜索、點擊、下單和支付.實驗數(shù)據(jù)如圖8所示.

圖8 電商用戶行為數(shù)據(jù)
數(shù)據(jù)中每一行表示用戶的一個行為數(shù)據(jù),采用“_”分割字段,如果搜索關(guān)鍵字是null即表示無效搜索;如果品類id和產(chǎn)品id為-1即表示為無效點擊;用戶可同時下單多個產(chǎn)品,即品類id和產(chǎn)品id可為多個,多個數(shù)據(jù)之間采用“,”進行分割,若不是下單行為即用null表示;用戶也可同時支付多個產(chǎn)品,與下單行為類似,若不是支付行為即用null表示.
實驗過程首先對數(shù)據(jù)進行處理,將每一行的數(shù)據(jù)分割開,讀取到規(guī)定的頁面后,通過reduceByKey轉(zhuǎn)換算子和countByKey行動算子統(tǒng)計出來每個頁面的訪問次數(shù)和每個用戶的頁面單跳跳轉(zhuǎn)路徑并按時間升序排序,然后過濾出單跳跳轉(zhuǎn)目標(biāo)相同的路徑并統(tǒng)計次數(shù),最后計算單跳轉(zhuǎn)化率.實驗過程如圖9所示.

圖9 實驗流程圖
2.2.2 實驗結(jié)果
實驗數(shù)據(jù)中共有49個頁面,頁面id由1到49,本文實驗結(jié)果選取id從1到20的頁面轉(zhuǎn)化率進行展示.實驗結(jié)果如圖10所示,每一行數(shù)據(jù)由單跳跳轉(zhuǎn)路徑和單跳轉(zhuǎn)化率構(gòu)成.

圖10 頁面單跳轉(zhuǎn)化率實驗結(jié)果圖
品類是指產(chǎn)品的分類,部分電商品類可分為多級,此實驗中品類為一級,實驗按照每個品類的點擊、下單、支付的數(shù)量來統(tǒng)計出各個品類的數(shù)量,并選出數(shù)量排名前10的品類作為熱門品類.對于排名前10的品類,分別獲取每個品類點擊次數(shù)排名前10的SessionId,這個功能可以看到,某個用戶群體最感興趣的品類以及各個品類中最典型的用戶的Session的行為.
2.3.1 實驗分析
實驗數(shù)據(jù)仍使用電商的用戶行為數(shù)據(jù).
實驗過程首先分別統(tǒng)計每個品類點擊的次數(shù)、下單的次數(shù)和支付的次數(shù),通過遍歷全部日志數(shù)據(jù),根據(jù)品類id和操作類型分別累加各個品類的數(shù)量,遍歷完成之后就得到了每個品類id和操作類型的數(shù)量,按照點擊下單支付的順序來排序,得到Top10熱門品類.過程如圖11所示.

圖11 Top10熱門品類實驗過程圖
過濾出熱門品類Top10的日志,將熱門品類Top10的數(shù)據(jù)類型轉(zhuǎn)換為RDD[(categoryId,sessionId),1],并統(tǒng)計數(shù)量,將數(shù)據(jù)類型轉(zhuǎn)換為RDD[(categoryId,sessionId),count],統(tǒng)計出每個品類中Session的數(shù)量,接下來對每個品類中的Session數(shù)量進行排序,并取出前10.實驗過程如圖12所示.

圖12 Top10活躍Session實驗流程圖
2.3.2 實驗結(jié)果
Top10熱門品類實驗結(jié)果圖如圖13所示,結(jié)果圖中10個結(jié)果為Top10熱門品類,每一行由品類id、點擊量、下單量、支付量組成包裝類,再按照點擊量的順序降序來排序.

圖13 Top10熱門品類實驗結(jié)果圖
Top10活躍Session實驗結(jié)果圖如圖14所示,每一行數(shù)據(jù)由Top10熱門品類的品類ID和一個List組成,List中包含10個SessionInfo包裝類,包裝類中包含點擊數(shù)最高的SessionId和點擊數(shù),并根據(jù)SessionInfo中的點擊數(shù)降序排序.

圖14 Top10活躍Session實驗結(jié)果圖
Spark不僅含有Hadoop平臺下MapReduce框架所具備的優(yōu)點,也很好地解決了MapReduce中存在的一些問題,同時在功能上為更好的適應(yīng)現(xiàn)代大數(shù)據(jù)環(huán)境做了延伸和擴展,使其在操作更簡潔方便的前提下執(zhí)行速度提高了近百倍.其中,RDD是作為Spark技術(shù)中數(shù)據(jù)操作的基本單位.本文主要論述了RDD的屬性、RDD之間的依賴、常見的轉(zhuǎn)換關(guān)系、操作類型、彈性特性和運行原理,創(chuàng)建RDD的幾種常見的方式以及電商用戶頁面單跳轉(zhuǎn)化率統(tǒng)計實驗和電商熱門品類中Top10活躍Session統(tǒng)計實驗實現(xiàn).Spark不僅可以支持Scala、Java、Python等多種語言編程,還支持DataFrame、DataSet等多種數(shù)據(jù)類型,而且提供了一個完整而強大的生態(tài)系統(tǒng),其中有SQL查詢、流式計算、機器學(xué)習(xí)和圖計算組件,這些組件可以應(yīng)用在一個程序中,完成更加復(fù)雜的需求.在現(xiàn)代大數(shù)據(jù)環(huán)境下,為了更好解決實際生活中的復(fù)雜問題,充分理解并掌握RDD的運行可以讓Spark的運行節(jié)省大量的數(shù)據(jù)處理時間,從而有效地優(yōu)化數(shù)據(jù)處理過程,提高整個過程的效率.