張嘉新,趙建平,蔣振剛
(長春理工大學(xué) 計算機(jī)科學(xué)技術(shù)學(xué)院,長春 130022)
隨著信息技術(shù)和互聯(lián)網(wǎng)的發(fā)展,人們逐漸從信息匱乏的時代走入了信息過載的時代。在這個時代,無論是信息消費(fèi)者還是信息生產(chǎn)者都遇到了很大的挑戰(zhàn),作為信息消費(fèi)者,如何從大量信息中找到自己感興趣的信息是一件非常困難的事情,作為信息生產(chǎn)者,如何讓自己生產(chǎn)的信息脫穎而出,受到廣大用戶的關(guān)注,也是一件非常困難的事情。推薦系統(tǒng)就是解決這一矛盾的重要工具。推薦系統(tǒng)的任務(wù)就是聯(lián)系用戶和信息,一方面幫助用戶發(fā)現(xiàn)對自己有價值的信息,另一方面讓信息能夠展現(xiàn)在對它感興趣的用戶面前,從而實(shí)現(xiàn)信息消費(fèi)者和信息產(chǎn)生者的雙贏。推薦算法能夠?qū)⒖赡苁芟埠玫馁Y訊或?qū)嵨铮ɡ纾弘娪?、電視?jié)目、音樂、書籍、新聞、圖片、網(wǎng)頁)推薦給使用者。推薦算法中,協(xié)同過濾算法是目前使用最多的一種算法?!皡f(xié)同過濾”最初的應(yīng)用是于 1992 年由Goldberg、Nicols、Oki及Terry提出[1],應(yīng)用于Xerox公司在Paloma Alto研究中心資訊過載的問題,最初的推薦系統(tǒng)Tapestry只是處理公司內(nèi)部郵件,數(shù)據(jù)量較少。隨著互聯(lián)網(wǎng)的飛速發(fā)展,一些電商一天的數(shù)據(jù)量超過50TB,其中有4億條產(chǎn)品信息和2億多名用戶的活動信息,這無疑給推薦算法與大數(shù)據(jù)處理帶來了挑戰(zhàn),基于模型ALS(Alternating Least Squares)協(xié)同過濾算法提供了良好的降維方法,而最新一代的大數(shù)據(jù)處理引擎Spark非常適合處理ALS的復(fù)雜迭代計算,以逐漸取代Hadoop大數(shù)據(jù)計算平臺[2-6]。Spark雖然計算速度快,但在數(shù)據(jù)量過大時,其效率很難達(dá)到滿意程度,本文基于Spark底層運(yùn)行機(jī)制與ALS算法特點(diǎn)對Spark作業(yè)進(jìn)行資源優(yōu)化與Shuffle優(yōu)化,通過合理分配資源與合并Shuffle過程產(chǎn)生的文件,達(dá)到降低內(nèi)存存儲壓力與減少磁盤I/O的目的,從減少了推薦時間提升了推薦效率。
基于隱語義模型[7]的推薦算法所研究的核心問題就是如何將數(shù)據(jù)抽象出的高維矩陣進(jìn)行矩陣分解的問題,它本質(zhì)上是對用戶與商品的關(guān)系矩陣進(jìn)行降維。本文針對基于ALS模型的推薦算法進(jìn)行優(yōu)化研究。
ALS,譯為交替最小二乘法。將User-Item-Rating(用戶-商品-評分)的數(shù)據(jù)集建立一個User×Item(用戶-商品)的m×n矩陣,由于不是每個用戶都對每個商品進(jìn)行過評分,所以這個m×n的矩陣往往是稀疏的,因此ALS的核心就是將用戶對商品的評分矩陣分解為2個低秩矩陣,分別是用戶對商品隱含特征的偏好矩陣,和商品所包含的隱含特征矩陣,在矩陣分解的過程中,缺失的評分項得到了補(bǔ)充,基于這些補(bǔ)充的評分就可以給用戶進(jìn)行商品推薦了。
對于矩陣R(m×n),ALS旨在找到兩個低秩矩陣X(m×k)和矩陣Y(k×n),來近似的逼近R(m*n),即

其中,R(m×n)代表用戶對商品的品分矩陣,X(m×k)代表用戶對隱含特征的偏好矩陣,Y(k×n)表示商品所包含隱含特征的矩陣,其中k<<min(m,n),為了使得矩陣X和Y的乘積盡可能地逼近R,采用最小化平方誤差損失函數(shù):

其中,rui表示第u個用戶對第i個物品的評分,xu表示用戶u的偏好隱含特征向量,yi表示商品i的隱含特征向量,xTuyi為用戶u對物品評分的近似,為了防止過擬合,加入正則化項:

xu和yi耦合在一起,并不好求解,故引用ALS,先固定Y,將損失函數(shù)L(X,Y)對xu求偏導(dǎo),即:


同理固定X,由對稱性得:

反復(fù)進(jìn)行以上兩步的計算,引入均方根誤差RMSE作為迭代終止的條件參數(shù)。

當(dāng)均方根誤差RMSE的值變化很小,小于一個預(yù)設(shè)值時,就可以認(rèn)為結(jié)果已經(jīng)收斂,停止迭代,也可以預(yù)設(shè)迭代次數(shù),當(dāng)達(dá)到預(yù)設(shè)迭代次數(shù)時,迭代停止。
如圖1所示,由于Spark集群是主從式分布結(jié)構(gòu),即在集群中只有一個Master節(jié)點(diǎn)負(fù)責(zé)整個集群中Worker節(jié)點(diǎn)的資源調(diào)度和分配,一旦Master節(jié)點(diǎn)出現(xiàn)異常就會導(dǎo)致整個集群無法工作,本文用Zookeeper技術(shù)來解決單點(diǎn)故障問題。

圖1 Spark主從結(jié)構(gòu)圖
Zookeeper[12]是一個分布式的,開放源碼的分布式應(yīng)用程序協(xié)調(diào)服務(wù),它包含一個簡單的原語集,分布式應(yīng)用程序可以基于它實(shí)現(xiàn)同步服務(wù),配置維護(hù)和命名服務(wù)等。由Zookeeper維護(hù)的Spark集群與HDFS集群可以達(dá)到高可用的狀態(tài)(High Availability)。

圖2 Spark HA(High Availability)集群
圖2所示,在HA集群中,可設(shè)置多個Standby狀態(tài)的Master節(jié)點(diǎn),當(dāng)Master節(jié)點(diǎn)發(fā)生異常時,Standby Master會切換為Master狀態(tài),處于Standby狀態(tài)的Master在接收到org.apache.spark.deploy.master.ZooKeeperLeaderElectionAgent發(fā)送的ElectedLeader消息后,就開始通過ZK中保存的Application,Driver和Worker的元數(shù)據(jù)信息進(jìn)行故障恢復(fù)。
由于ALS算法多次迭代的特點(diǎn),Spark作業(yè)的性能主要消耗在了Shuffle階段,同時該環(huán)節(jié)還包含了大量的磁盤I/O、序列化、網(wǎng)絡(luò)傳輸?shù)炔僮?,因此要讓ALS推薦算法的性能提高,就要對Shuffle過程進(jìn)行優(yōu)化。Shuffle,譯為“洗牌”,在MapReduce過程中需要各節(jié)點(diǎn)的同一類數(shù)據(jù)匯集到某一節(jié)點(diǎn)進(jìn)行計算[11],把這些分布在不同節(jié)點(diǎn)的數(shù)據(jù)按照一定的規(guī)則聚集到一起的過程稱為Shuffle。

圖3 無文件合并Shuffle流程
如圖3所示,該圖為Spark集群中的某個節(jié)點(diǎn)的架構(gòu)圖,此節(jié)點(diǎn)上運(yùn)行了4個ShuffleMapTask,節(jié)點(diǎn)的CPU Core個數(shù)為2。每個ShuffleMapTask都會為每個ResultTask創(chuàng)建一份BK(Bucket)緩存,以及對應(yīng)的SBF(ShuffleBlockFile)文件磁盤文件(下文統(tǒng)稱Bucket與ShuffleBlockFile為輸出文件),ShuffleMapTask的輸出會作為MapStatus發(fā)送到DAGScheduler的MapOutTrackerMaster中,MapStatus包含了每個ResultTask要拉取數(shù)據(jù)的大小,每個ResultTask會用BlockStoreShuffleFetcher去MapOutputTrackerMaster獲取自己要拉取的數(shù)據(jù)信息,然后通過BlockManager將數(shù)據(jù)拉取過來。每個Result-Task拉取過來的數(shù)據(jù)會組成一個內(nèi)部RDD,即ShuffleRDD,如果內(nèi)存不夠,則寫入磁盤,隨后每個ResultTask針對數(shù)據(jù)進(jìn)行聚合,生成MapPartitionsRDD。假設(shè)有100個MapTask,100個ResultTask,那么本地磁盤要產(chǎn)生10000個文件,由此可以看出Spark的Shuffle過程會消耗大量的磁盤I/O(輸入輸出)資源,嚴(yán)重影響推薦性能。
通過優(yōu)化ConsolidateFiles,即map端輸出文件合并機(jī)制,可以解決map端輸出文件過多的問題,如圖4所示。

圖4 文件合并Shuffle流程
在并行運(yùn)行了ShuffleMapTask、后,每個ShuffleMapTask都創(chuàng)建了4個BK緩存,以及對應(yīng)的SBF磁盤文件,ShuffleMapTask、執(zhí)行完成后,Shuffle-MapTask、在并行運(yùn)行時不會重新創(chuàng)建新的輸出文件,而是復(fù)用之前的ShuffleMapTask、創(chuàng)建的輸出文件,并將數(shù)據(jù)寫入ShuffleMapTask、的輸出文件中,ResultTask在拉取數(shù)據(jù)時,只是拉取少量數(shù)據(jù),每個輸出文件中可能包含了多個ShuffleMapTask給自己的輸出文件,這樣輸出文件數(shù)量就減少了一倍。在實(shí)際應(yīng)用中假設(shè)有100個節(jié)點(diǎn),每個節(jié)點(diǎn)一個Executor,每個Executor分配 2個CPU Core,分別有1000個ShuffleMapTask和ResultTask,將會產(chǎn)生100萬個輸出文件,磁盤對I/O的壓力非常大。在優(yōu)化ConsolidateFiles后,每個Executor執(zhí)行10個ShuffleMapTask,那么每個節(jié)點(diǎn)輸出文件的數(shù)量是2000個,100個節(jié)點(diǎn)輸出文件的總數(shù)減少至20萬個,磁盤I/O壓力減小了5倍。
本文中所搭建的Spark大數(shù)據(jù)集群為高可用集群,即在集群服務(wù)器架構(gòu)中,當(dāng)主服務(wù)器故障時,備份服務(wù)器能夠自動接管主服務(wù)器的工作,并及時切換過去,以實(shí)現(xiàn)對用戶的不間斷服務(wù),本文所使用的存儲系統(tǒng)是Hadoop中的HDFS(分布式文件系統(tǒng)),具體版本如表1所示。

表1 集群配置
Spark大數(shù)據(jù)集群的優(yōu)勢就是可以不用專業(yè)的、價格高昂的服務(wù)器,而用普通的個人電腦即可,本文實(shí)驗(yàn)中所用的計算機(jī)配置如表2所示。

表2 節(jié)點(diǎn)配置
如表3所示,節(jié)點(diǎn)glance02、glance03為Master節(jié)點(diǎn),即在程序運(yùn)行中,一旦其中一個Active狀態(tài)的Master節(jié)點(diǎn)出現(xiàn)故障,另一個Standby狀態(tài)的Master節(jié)點(diǎn)會以秒級的速度切換為Active狀態(tài),使程序繼續(xù)運(yùn)行。

表3 節(jié)點(diǎn)角色
本文所使用的數(shù)據(jù)集為MovieLens數(shù)據(jù)集,它是由明尼蘇達(dá)大學(xué)的GroupLens研究組組織收集的,數(shù)據(jù)集中包括用戶數(shù)據(jù),電影數(shù)據(jù)與電影的評分?jǐn)?shù)據(jù),主要用于推薦系統(tǒng)的研究,數(shù)據(jù)集包含幾種級別數(shù)量級的數(shù)據(jù),本文使用的數(shù)據(jù)大小為:電影數(shù)據(jù)10618條,電影評分?jǐn)?shù)據(jù)為1000萬條。
解決針對新用戶的冷啟動問題:在數(shù)據(jù)集中選出最受歡迎的50部電影,在這50部電影中隨機(jī)選取10部電影,讓新用戶打分,根據(jù)用戶打分情況,生成用戶的特征向量,并使用ALS協(xié)同過濾推薦算法向用戶推薦50部電影。
通過三組實(shí)驗(yàn)分別是無優(yōu)化,資源優(yōu)化與Shuffle優(yōu)化,觀察四項指標(biāo)(RMSE:均方根誤差,Iteration:迭代次數(shù),Rank:矩陣維度,Assessment:推薦基準(zhǔn)線)與運(yùn)行時間的關(guān)系。

圖5 不同優(yōu)化方式的時間-參數(shù)對比圖
其中,圖5(a)是三組實(shí)驗(yàn)中RMSE的變化值與運(yùn)行時間的關(guān)系;圖5(b)是三組實(shí)驗(yàn)的最佳迭代次數(shù)與運(yùn)行時間的關(guān)系;圖5(c)是三組實(shí)驗(yàn)的最佳矩陣維度與運(yùn)行時間的關(guān)系,圖5(d)中Assessment是高出推薦基準(zhǔn)評估線的百分比,由這三組實(shí)驗(yàn)可以看出,在RMSE與Assessment的值基本不變的情況下,通過資源配置的改進(jìn)推薦時間由21分鐘縮短至14分鐘,提升效率為33.3%,通過Shuffle的map端文件合并的優(yōu)化,推薦時間由14分鐘縮短至9.5分鐘提升的總效率為54.8%。
針對于應(yīng)用行性強(qiáng)的推薦算法,性能的提高不能僅僅需要考慮推薦算法本身,在數(shù)據(jù)量急劇增加今天,大數(shù)據(jù)處理技術(shù)的提高與優(yōu)化也是不可或缺的,本文提出的Shuffle過程中map端的文件輸出合并優(yōu)化是實(shí)際生產(chǎn)中容易忽視而又重要的問題,通過實(shí)驗(yàn)證明,本文提供的改進(jìn)算法是可行、有效的。
Spark是基于內(nèi)存計算的快速的大數(shù)據(jù)計算引擎,要想使推薦效率進(jìn)一步提高,在Spark的優(yōu)化上還有很多值得研究的地方,接下來可以從分布式數(shù)據(jù)的網(wǎng)絡(luò)傳輸,ShuffleMapTask的執(zhí)行并行度及實(shí)際項目重構(gòu)RDD架構(gòu)等方面進(jìn)行更深一步的研究。