嚴磊 汪小可

摘 要:基于Hadoop平臺的實時電影推薦系統在需要大量迭代計算時運行速度明顯變慢,無法根據用戶行為作出實時反饋。針對以上問題,設計基于Spark流式計算的實時電影推薦系統,可更好地滿足用戶實時需求。基于Spark流式計算的實時電影推薦系統將傳統電影推薦算法與Spark流式計算方法相結合,在線部分使用Spark Streaming實時接收用戶模擬評分,并使用Scoket編程模擬用戶瀏覽商品時產生的實時日志數據。日志數據包括用戶當前瀏覽電影、觀看電影次數、停留時間與是否購買該商品,再使用Spark Streaming構建實時數據處理系統,計算出當前用戶相關度最高的電影并進行推薦。實驗結果表明,基于Spark 平臺的電影實時推薦系統在離線推薦訓練過程中,訓練速度相對于Hadoop 平臺有明顯提高,能根據用戶行為作出實時反饋,并向用戶進行電影推薦。
關鍵詞:電影推薦;Spark Streaming;Spark;實時推薦
DOI:10. 11907/rjdk. 182121
中圖分類號:TP301 文獻標識碼:A 文章編號:1672-7800(2019)005-0044-05
Abstract:The real-time movie recommendation system of the Hadoop platform can't make the feedback in real time according to the users' behavior. The real-time movie recommendation system based on Spark flow calculation can better meet the users' real-time demand. The real time movie recommendation based on Spark flow calculation is to combine the traditional movie recommendation algorithm with the spark streaming computing film attention. The online part uses Scoket to simulate the user's browsing products to produce real time data. The data includes the movies that the user is currently browsing and the number and stay time of watching the movie and the purchase of the product. Then Spark Streaming is used to build real-time data processing system to calculate current users' biggest concerns about those movies. The implementation results show that compared to the Hadoop platform, Spark platform based on real-time recommendation system achieves the speed of the off-line recommendation training significantly higher than that of the Hadoop platform, and can make real-time feedback according to user behavior, and want users to carry out real-time recommendation.
Key Words:movie recommendations; Spark Streaming; Spark; real-time recommendation
1 Spark與Hadoop簡介
根據 IDC 發布的數字宇宙報告顯示,至 2020 年數字宇宙將超出預期,達到 40ZB,相當于地球上人均產生 5 247GB數據[1]。如何對海量數據進行及時、高效的存取并挖掘出其中的有效信息一直是學術界的研究熱點[2-3]。從計算的角度看,目前大數據處理框架主要分為Spark框架與MapReduce框架(屬于Hadoop生態系統)。
Hadoop是一個高效、可靠、可擴展的開源分布式軟件框架,主要用于大規模數據存儲與業務計算處理[4];Spark是一個具備低延遲、易用性等特點的大數據處理框架,并且引入了RDD(Resilient Distributed Datasets)[5]的抽象。因此,與Hadoop相比,其應用于內存中的運行速度提升了上百倍,在磁盤上的運行速度也得到了大幅提升。
很多學者對Spark平臺進行了大量研究,如王虹旭等[6]在 Spark 平臺上設計一個能夠對海量數據進行高效分析的并行數據分析系統;曹波等[7]在 Spark平臺上實現FP-Growth 算法的并行計算,利用車牌記錄跟蹤車輛;Lu等[8]創新性地在Spark上使用遠程內存提高對海量數據的處理速度;Yang等[9]研究分批處理的梯度下降算法在Spark 平臺上的并行計算問題,提升了深度置信網絡的訓練收斂速度。
隨著電子商務的快速發展,推薦系統得到了越來越多公司重視[10]。Amazon、Facebook和 Yahoo 是最早將 Spark應用于推薦領域的公司。例如:Amazon會根據用戶歷史瀏覽記錄在每個頁面下方作相應推薦,還會根據用戶最近一次商品瀏覽記錄,根據其它物品與該物品相似度作商品推薦。國內將 Spark 應用于推薦領域的公司有阿里、優酷土豆、豆瓣等。
2 Spark流式電影推薦系統設計
2.1 系統架構設計
Sprak平臺采用Spark Streanming技術,在用戶每次訪問網站時,Spark Streaming 的輸入數據按照 batch size(如1s)分成一段段數據(Discretized Stream,簡稱DStream)[11],每一段數據都轉換成 Spark中的 RDD,可根據訪問日志實時計算關注度,并與離線推薦結果合并進行推薦,從而使電影網站推薦結果可根據用戶行為實時改變。
如圖1所示,系統主要分為離線計算與在線計算兩部分[12]。離線部分使用基于Spark MLlib 平臺的協同過濾算法,首先對海量靜態數據進行處理,然后進行離線推薦;在線計算部分使用Spark流式計算電影關注度并進行推薦。
系統使用Java進行開發,整體架構如圖2 所示。
將基于Spark MLlib平臺的協同過濾算法推薦結果與Spark流式計算電影關注度相結合進行推薦。將離線模型推薦的前10部電影存儲到Redis數據庫中,利用Socket2實時計算用戶對電影的關注度,然后將Redis數據庫推薦列表中的前5部電影替換成關注度最高的5部電影,得到最后的實時推薦列表。
2.2 離線計算設計
離線部分使用基于Spark MLlib平臺的協同過濾算法,協同過濾可分為:基于用戶的協同過濾(UserCF)[13]、基于商品的協同過濾(ItemCF)[14]與基于模型的協同過濾(ModelCF)[15]。本文選用基于模型的協同過濾算法,根據用戶喜好電影數據集預測用戶可能喜歡的電影,然后進行推薦。
(1)數據集準備。數據集包含films.dat、score.dat、users.dat。films數據集格式為:電影ID::電影名稱::電影類型;score數據集格式為:用戶ID::電影ID::評分::時間戳;users數據集格式為:用戶ID::性別::年齡::職業編號:郵編。“我自己的評分數據”保存在my.txt中,格式為:我的ID::電影ID::我的評分::評分時間。數據集中總共包含6 039個用戶、3 952部電影,以及100多萬條評分數據。
(2)訓練數據集推薦。首先記載數據集,按照“::”切分數據,緩存之后統計得分最高的前10部電影,在Web界面的“猜你喜歡”欄目向未登錄用戶進行推薦。偽代碼片段如下:
//根據文件夾位置加載數據集
val scoreRdd = sc.textFile(數據位置)
//根據::切分數據,緩存
val score = scoreRdd.map(“::”)。cache
//統計得分最高的前10個電影
val topK10ScoreMovie = score.map(統計函數)。take(10)。foreach(println)
然后,訓練模型進行離線預測。按照score.dat數據集中的時間戳將數據集劃分為訓練(55%,加入用戶評分)、校驗 (15%)與測試(30%)3部分。設置多個訓練參數,其中ranks、lambdas、iters都設置兩個參數,以便于三層嵌套循環產生8個組合(也即8個推薦模型),MLlib使用交替最小二乘法(ALS)學習這些隱性因子[17]。一般使用RMSE(Root-Mean-Square Error)評估誤差是否收斂[18],如公式(2)所示。
其中,N為三元組
最后,剔除已觀看電影,并使用最佳模型推薦10部用戶可能感興趣的電影。離線推薦偽代碼如下:
//分別加載樣本評分數據、我的評分數據、電影數據
score = sc.textFile(數據位置)
myRatings = addRatings(數據位置)
movies = sc.textFile(數據位置)
//將樣本評分數據劃分為訓練(55%,加入用戶評分)、校驗 (15%)與測試(30%)數據,并進行緩存
training = socre.filter(x => x. _1 < 6). cache
validation = score.filter(x => x. _1 >= 6 && x. _1 < 8). cache
test = score.filter(x => x. _1 >= 8). cache
//設置ranks、num Iters、lambdas等參數,ranks 是模型中隱語義因子個數,num Iters為迭代次數,Lambdas為正則化參數
ranks = List(8, 12)
lambdas = List(0.1, 10.0)
numIters = List(10, 20)
//三層嵌套產生8個模型,計算RMSE值
model = ALS.train(training, rank, numIter, lambda)
bestModel=RMSE最小
//使用最佳模型預測評分,對用戶進行推薦
println("推薦前10的電影")
bestModel.get.predict(). collect.sortBy()
2.3 在線計算設計
Spark流式計算電影關注度Spark Streaming 是現有 Spark 核心 API 的一種擴展,適用于實時數據在可擴展、高吞吐、高容錯等特性下的流處理[19]。Spark Streaming的內部處理機制為:接收實時流數據,根據一定時間間隔拆分成一批批數據并通過Spark Engine進行處理,最終得到處理后的結果[20]。在線計算框架如圖3所示。
本文通過Java Socket編程模擬用戶瀏覽電影網站產生的實時日志數據。Socket1發送信息格式為:電影ID::瀏覽次數::停留時間::是否收藏::觀看次數。Spark Streaming 實時接收Socket1發送的用戶數據流,并將其劃分為 Batch(可理解為各個批次的數據塊)。引入Spark相關jar包,用Spark引擎處理Batch數據,再以Batch形式輸出。創建Socket2接收Socket1發送的數據,因為用戶不同行為對關注度的影響權重不同,所以需要定義一個計算公式。本文設定瀏覽次數權重為0.8,瀏覽時間權重為0.6,是否收藏權重為1,觀看次數權重為1。使用Spark Sreaming 實時接收模擬用戶日志信息并分析其關注度,得到推薦列表。偽代碼如下:
//先定義一個JavaStreamingContext
SparkConf sparkConf = new SparkConf(). setAppName("job的名字"). setMaster("local[2]")
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf,窗口時間);
//創建一個服務器端,監聽指定端口
ServerSocket SerScoket = new ServerSocket(端口號);
//獲取模擬數據
JavaReceiverInputDStream
//設定瀏覽次數權重為0.8,瀏覽時間權重為0.6,是否收藏權重為1,觀看次數權重為1
followValue = Double.parseDouble(lineSplit[1])*0.8+Double.parseDouble(lineSplit[2])*0.6 +Double.parseDouble(lineSplit[3])*1+Double.parseDouble(lineSplit[4])*1;
//對初始化的DStream進行事務級別的處理,通過updateStateByKey以Batch Interval為單位對歷史狀態進行更新
UpdateFollowValue = splitMess.updateStateByKey(函數操作)
//將
JavaPairRDD
在離線模型訓練完畢后得到離線推薦列表,將推薦列表的前10個推薦結果寫入Redis 緩存中,以提高數據存取速度,提升網站性能;然后啟動實時推薦任務,找到在線關注度最高的5部電影;根據用戶ID在Redis 緩存系統中找到離線推薦列表,以此為基礎構建新的推薦列表;去掉離線推薦列表的后5個推薦結果,將在線推薦的5部電影放在推薦列表開頭,構成最終的在線推薦列表。
3 實驗測試
由于Spark平臺在處理任務時,相對于Hadoop平臺在速度上更具有優勢,因此本文采用 Spark 平臺進行離線與在線推薦。為了測試 Hadoop與 Spark 平臺在處理計算任務時的性能差異,本文選用離線訓練方式對使用的電影數據集進行訓練,然后對兩個平臺執行不同任務的作業時間進行對比。實驗結果如圖4所示,結果表明在執行Word Count、User Based 及Item Based等迭代次數不多的任務時,Spark平臺運行效率相對于Hadoop平臺有明顯提升。ALS 模型在Hadoop與Spark平臺上的訓練性能對比如圖5所示,表明在迭代次數不斷增加的情況下,Spark平臺的優勢越來越明顯,運行效率是Hadoop平臺的10倍以上。
以上測試驗證了以Spark平臺作為系統基礎架構的優越性,繼續對系統性能進行測試。系統要求在Ubuntu 17.04 操作系統上運行,并安裝 JDK1.8、Tomcat1.7、MySQL5.5、Hadoop2.2.0、Scala2.10.4、Spark1.0.0、HBase- 0.98.11-hadoop2、eclipse等軟件,且客戶端與服務器需保持網絡連接通暢[21]。
首先對離線與在線部分分別進行測試。統計評分前10的電影,登錄后利用協同過濾算法為用戶作離線推薦,如圖6、圖7所示。
在線推薦部分測試如圖8、圖9所示,分別為Socket1模擬用戶新操作與Socket2計算關注度。
完成對系統各功能模塊的詳細設計后,接下來對系統整體進行測試,驗證實時推薦系統的可行性。對Web界面進行操作,分別測試系統功能是否符合預期。在Web上操作與Web界面反映的測試用例如圖10所示。
JavaWeb顯示電影推薦結果,用戶登錄后界面如圖11所示,用戶點擊刷新后界面如圖12所示。由于技術限制,只顯示了推薦列表的前9部電影。
實驗結果表明,根據用戶行為變化可對電影進行實時更新,相比于傳統電影推薦算法,本文創新地提出大數據下電影網站的實時推薦算法,將離線推薦結果與實時流計算的推薦結果進行融合,生成實時推薦列表。實驗驗證發現,Spark相比于Hadoop具有更快的運行速度,系統能正常運行并實時對用戶進行電影推薦。
4 結語
本文設計并實現了一套基于Spark平臺的電影推薦系統,可分析用戶行為日志信息并實時計算關注度,產生在線推薦列表,然后與離線推薦相結合對用戶進行推薦。但是系統尚有一些不足之處,本文在線計算中采用模擬器模擬用戶行為日志,將來需要加強系統對實際用戶行為日志的采集與傳輸。另外本系統沒有使用Spark集群對訓練任務進行分配,因而未能實現負載均衡,下一步需要研究并解決Spark的集群負載不均衡問題。
參考文獻:
[1] IDC. The digital universe of opportunities:rich data and the incdreasing value of the internet of things [EB/OL]. http://www.emc.com/leadership/digital-universe/2014iview/executive-summary.htm.
[2] CAO J,WU Z,WANG Y,et al. Hybrid collaborative filtering algorithm for bidirectional Web service recommendation[J].Knowledge and Information Systems,2013,36(3):607-627.
[3] GE Y, XIONG H, TUZHILIN A, et al. Cost-aware collaborative filtering for travel tour recommendations[J]. ACM Transactions on Information Systems,2014,32(1):479-496.
[4] 趙鐵柱,袁華強. 基于并發策略的分布式文件系統性能優化方案[J]. 網絡安全技術與應用,2013(7):17-18.
[5] REYNOLD X S, JOSH R, MATEI Z, et al. Shark: SQL and rich analytics at scale[J]. Computer Science, 2012:13-24.
[6] 王虹旭,吳斌,劉旸. 基于Spark的并行圖數據分析系統[J]. 計算機科學與探索,2015,9(9):1066-1074.
[7] 曹波,韓燕波,王桂玲. 基于車牌識別大數據的伴隨車輛組發現方法[J]. 計算機應用, 2015,35(11):3203-3207.
[8] LU X, RAHMAN M W U, ISLAM N, et al. Accelerating spark with RDMA for big data processing: early experiences[C]. Proceedings of the 22nd Annual Symposium on High-Performance Interconnects,2010:9-16.
[9] YANG J,HE SQ. The optimization of parallel DBN based on spark[C]. Proceedings of the 19th Asia Pacific Symposium on Intelligent and Evolutionary Systems,2016:157-169.
[10] 單明. 基于個性化推薦的電子商務推薦系統的設計與實現[D]. 長春:吉林大學, 2014.
[11] ZAHARIA M, DAS T, LI H, et al. Discretized streams: an efficient and fault-tolerant model for stream processing on large clusters[C]. Proceedings of the 4th USENIX conference on Hot Topics in Cloud Computing,2012:10.
[12] 張賢德. 基于Spark平臺的實時流計算推薦系統的研究與實現[D]. 鎮江:江蘇大學, 2016.
[13] 俞美華. 融合用戶興趣度與項目相關度的電影推薦算法研究[J]. 電腦知識與技術,2017,13(8):22-26.
[14] RESNICK P, IACOVOU N, SUCHAK M, et al. GroupLens:an open architecture for collaborative filtering of netnews[C]. ACM Conference on Computer Supported Cooperative Work. ACM,1994:175-186.
[15] SARWAR B, KARYPIS G, KONSTAN J, et al. Item-based collabora-tive filtering recommendation algorithms[C]. Proceedings of the 10th International Conference on World Wide Web. ACM,2001:285-295.
[16] 閻輝,張學工,李衍達. 支持向量機與最小二乘法的關系研究[J]. 清華大學學報:自然科學版, 2001,41(9):77-80.
[17] SU X, KHOSHGOFTAAR T M. A survey of collaborative filtering techniques[M]. Hindawi Publishing Corp,2009.
[18] DE REZENDE R. Giving flexibility to the nelson-siegel class of term structure models[R]. Available at SSRN1290784, 2011.
[19] 趙文芳, 劉旭林. Spark Streaming框架下的氣象自動站數據實時處理系統[J]. 計算機應用, 2018(1): 38-43.
[20] 李天喜. 基于Spark Streaming的試驗數據處理系統的研究與實現[D]. 西安:西安電子科技大學,2015.
[21] 周斯波,程廣,趙宇杰. 計算機軟件的測試方法和裝置[P]. CN 106126426 A,2016.
(責任編輯:黃 健)