張 鳳 盧居輝 朱海勇 吳 文
(廈門市美亞柏科信息股份有限公司乾坤大數(shù)據(jù)操作系統(tǒng)研究院,福建 廈門 361001)
2022年3月5日,李克強(qiáng)總理在第十三屆全國(guó)人民代表大會(huì)第五次會(huì)議上作《政府工作報(bào)告》[1],該報(bào)告明確提出“建設(shè)數(shù)字信息基礎(chǔ)設(shè)施,逐步構(gòu)建全國(guó)一體化大數(shù)據(jù)中心體系”。2022年4月10日,《中共中央國(guó)務(wù)院關(guān)于加快建設(shè)全國(guó)統(tǒng)一大市場(chǎng)的意見(jiàn)》發(fā)布,明確提出要加快培育統(tǒng)一的技術(shù)和數(shù)據(jù)市場(chǎng)[2]。2022年12月19日,《中共中央國(guó)務(wù)院關(guān)于構(gòu)建數(shù)據(jù)基礎(chǔ)制度更好發(fā)揮數(shù)據(jù)要素作用的意見(jiàn)》(以下簡(jiǎn)稱“數(shù)據(jù)二十條”)發(fā)布,從數(shù)據(jù)產(chǎn)權(quán)、流通交易、收益分配、安全治理等方面出發(fā),為構(gòu)建數(shù)據(jù)基礎(chǔ)制度提出20條政策舉措。“數(shù)據(jù)二十條”的出臺(tái),將充分發(fā)揮中國(guó)海量數(shù)據(jù)規(guī)模和豐富應(yīng)用場(chǎng)景的優(yōu)勢(shì),激活數(shù)據(jù)要素潛能,做強(qiáng)做優(yōu)做大數(shù)字經(jīng)濟(jì),增強(qiáng)經(jīng)濟(jì)發(fā)展新動(dòng)能[3]。因此,數(shù)據(jù)資源大一統(tǒng)將會(huì)成為接下來(lái)大數(shù)據(jù)領(lǐng)域研究的熱點(diǎn)和發(fā)展的趨勢(shì)[4]。
大數(shù)據(jù)領(lǐng)域的研究往往涉及海量數(shù)據(jù),而處理海量數(shù)據(jù)的技術(shù)也是研究的熱點(diǎn)問(wèn)題。其中,YARN 作為一種統(tǒng)一資源管理機(jī)制,因其可運(yùn)行多套計(jì)算框架而備受好評(píng)[5]。Spark 作為一種常用的大數(shù)據(jù)計(jì)算框架[6-7],也是一個(gè)通用的并行計(jì)算框架[8],Spark on YARN 運(yùn)行模式是基于YARN 彈性資源管理機(jī)制,確保用戶在YARN 集群中運(yùn)行的服務(wù)和資源能被完全隔離,從而實(shí)現(xiàn)對(duì)同時(shí)運(yùn)行在集群中的多個(gè)任務(wù)進(jìn)行管理。
在YARN 集群環(huán)境中,每個(gè)應(yīng)用實(shí)例都有一個(gè)Application Master 進(jìn)程,負(fù)責(zé)向集群資源管理器請(qǐng)求Container 資源。Spark 運(yùn)行架構(gòu)主要由Driver、Executor組成[9],Driver負(fù)責(zé)作業(yè)調(diào)度,Executor負(fù)責(zé)執(zhí)行具體的計(jì)算任務(wù)。根據(jù)Spark Driver 和Application Master 在運(yùn)行時(shí)所處的相對(duì)位置,Spark on YARN 可分為兩種模式,即YARN-Client 模式和YARN-Cluster模式[10]。
在YARN-Client 模式中,Spark Driver 單獨(dú)運(yùn)行在集群邊緣節(jié)點(diǎn)服務(wù)器上,通過(guò)部署在YARN集群上的Application Master 進(jìn)行通信,在申請(qǐng)Container 資源后,啟動(dòng)Spark Executor 來(lái)執(zhí)行具體任務(wù),二者分屬集群內(nèi)外兩個(gè)不同的進(jìn)程。在運(yùn)行過(guò)程中,前端用戶可與Spark Driver 保持在線連接,進(jìn)行更多交互操作,適合處理交互類型的Spark作業(yè)。
在YARN-Cluster 模式中,Spark Driver 運(yùn)行于YARN 集群的Application Master 中,共享同一個(gè)Container進(jìn)程。在任務(wù)提交后,Spark Driver斷開(kāi)與前端用戶的連接,其生命周期由YARN 控制,不適合運(yùn)行交互類型的作業(yè)。
為滿足與前端用戶的頻繁交互需求,實(shí)現(xiàn)對(duì)Spark on YARN運(yùn)行模式下的Spark作業(yè)生命周期的自主控制,在集群邊緣服務(wù)器上部署YARN-Client模式的Web服務(wù),通過(guò)Web API來(lái)接收、處理前端用戶發(fā)送的Spark作業(yè)請(qǐng)求[11]。由于Spark Driver 被集成在Web 服務(wù)中,相當(dāng)于該服務(wù)內(nèi)部提供一個(gè)Spark Session 入口,通過(guò)該入口將Spark 作業(yè)提交到Y(jié)ARN 集群上運(yùn)行,其作業(yè)運(yùn)行結(jié)果也將被同步返回給前端用戶,實(shí)現(xiàn)與前端用戶的在線交互需求。但該方案也有不足之處,在YARN-Client 模式下,Spark Driver 運(yùn)行在集群邊緣節(jié)點(diǎn)服務(wù)器上,由于Spark 任務(wù)要處理的數(shù)據(jù)量較大且耗時(shí)較長(zhǎng),在Spark Executor 數(shù)量較多的情況下,Spark Driver 與Spark Executor(s)的交互過(guò)程會(huì)占用大量的邊緣節(jié)點(diǎn)服務(wù)器系統(tǒng)資源,造成性能瓶頸,影響邊緣節(jié)點(diǎn)服務(wù)器上其他業(yè)務(wù)的正常執(zhí)行。
在YARN-Cluster 模式下,為避免由前端用戶直接訪問(wèn)集群內(nèi)部節(jié)點(diǎn)而造成的安全隱患,通常要在邊緣節(jié)點(diǎn)服務(wù)器上部署一個(gè)Web Server,通過(guò)保持與集群上Spark Driver 的長(zhǎng)連接會(huì)話關(guān)系,可實(shí)現(xiàn)前端用戶與Spark 的在線交互,但該方案仍無(wú)法避免集群間的網(wǎng)絡(luò)連接消耗。
為解決上述重量級(jí)客戶端方案存在的弊端,可以通過(guò)輕量級(jí)開(kāi)源應(yīng)用框架Spring Boot 來(lái)構(gòu)建Web 應(yīng)用服務(wù)[12],在內(nèi)部集成Spark Driver,以YARN-Cluster 模式將多個(gè)無(wú)差別的Web 服務(wù)部署到Y(jié)ARN 集群中,利用Restful API 來(lái)完成Spark 在線交互式作業(yè)的提交請(qǐng)求[13]。這些服務(wù)進(jìn)程將長(zhǎng)期駐留在集群內(nèi)部,可避免因頻繁申請(qǐng)Container資源而造成的時(shí)間消耗。
同時(shí),在邊緣節(jié)點(diǎn)服務(wù)器上部署高性能負(fù)載均衡和動(dòng)態(tài)代理組件HAProxy[14]。將Spring Boot服務(wù)所在的IP 地址和Restful API 端口通過(guò)HAProxy 動(dòng)態(tài)配置接口(HAProxy Data Plane API)實(shí)時(shí)注冊(cè)到HAProxy 后端代理配置中,利用HAProxy 的自動(dòng)重載機(jī)制來(lái)實(shí)現(xiàn)動(dòng)態(tài)加載后端配置,使前端用戶能在無(wú)感知情況下,通過(guò)HAProxy 的統(tǒng)一對(duì)外接口將Spark 作業(yè)無(wú)差別地提交到分散運(yùn)行在集群上的Spring Boot 服務(wù)中運(yùn)行,可避免外部用戶直接訪問(wèn)集群內(nèi)部節(jié)點(diǎn),在實(shí)現(xiàn)安全隔離的同時(shí),也實(shí)現(xiàn)多個(gè)Spring Boot服務(wù)間的并行調(diào)度,且互不干擾。
通過(guò)上述方法,邊緣節(jié)點(diǎn)服務(wù)器與集群節(jié)點(diǎn)之間無(wú)須保持長(zhǎng)連接會(huì)話,即可動(dòng)態(tài)實(shí)現(xiàn)Spark on Yarn 運(yùn)行模式下Spark 作業(yè)的異步提交及整個(gè)Spark作業(yè)生命周期的自主控制。
本研究通過(guò)提供一種輕量級(jí)客戶端提交Spark作業(yè)的實(shí)現(xiàn)方法來(lái)克服重量級(jí)客戶端方案帶來(lái)的弊端,系統(tǒng)具體實(shí)現(xiàn)框架如圖1所示。

圖1 系統(tǒng)實(shí)現(xiàn)框架
輕量級(jí)Web應(yīng)用服務(wù)(Spring Boot)內(nèi)部集成執(zhí)行引擎Spark Driver,通過(guò)Spark on YARN 的Cluster模式在YARN 集群內(nèi)部運(yùn)行,由Restful API 來(lái)接收處理請(qǐng)求,并在Spring Boot 服務(wù)內(nèi)部直接通過(guò)Spark Driver 來(lái)調(diào)度執(zhí)行器Spark Executor(s),從而完成Spark作業(yè)的執(zhí)行和其他相關(guān)請(qǐng)求操作。該服務(wù)長(zhǎng)期駐留在YARN 集群內(nèi),以在線交互的方式及時(shí)處理不同類型的Spark 作業(yè),可避免頻繁啟動(dòng)Spark容器造成的時(shí)間消耗。
HAProxy(可提供高性能負(fù)載均衡、基于TCP和HTTP 應(yīng)用的動(dòng)態(tài)代理)部署在YARN 集群的邊緣服務(wù)器節(jié)點(diǎn)上,代理前端用戶發(fā)送Spark 作業(yè)相關(guān)操作請(qǐng)求,根據(jù)負(fù)載均衡策略將其發(fā)送到分散在后端YARN 集群上的Spring Boot 服務(wù)上進(jìn)行相應(yīng)處理。前端用戶只需通過(guò)HAProxy 統(tǒng)一訪問(wèn)接口就可無(wú)差別使用后端YARN 集群上的Spring Boot 服務(wù),從而實(shí)現(xiàn)前端用戶與YARN集群的安全隔離。
HAProxy Data Plane API 是一種能實(shí)現(xiàn)HAP-roxy 配置動(dòng)態(tài)更新的Restful API。在啟動(dòng)Spring Boot 成功后,將其監(jiān)聽(tīng)的IP 地址和端口通過(guò)HAP-roxy Data Plane API 動(dòng)態(tài)注冊(cè)到HAProxy 代理配置中,HAProxy Data Plane API 會(huì)根據(jù)配置的變更情況來(lái)自動(dòng)重載HAProxy 服務(wù),使前端用戶能在無(wú)感知的情況下訪問(wèn)所有后端服務(wù)。
服務(wù)啟動(dòng)器(Launcher)用于監(jiān)聽(tīng)并保證在YARN 集群中始終運(yùn)行一定數(shù)量的Spring Boot 服務(wù)。Launcher 會(huì)將失效的Spring Boot 服務(wù)接口從HAProxy Data Plane API 中動(dòng)態(tài)刪除。當(dāng)YARN 集群上Spring Boot 服務(wù)數(shù)量不足時(shí),Launcher 會(huì)啟動(dòng)新的Spring Boot服務(wù),直至滿足數(shù)量要求。
Spring Boot 服務(wù)在接收到Spark 作業(yè)提交請(qǐng)求后,會(huì)直接將其寫入輕量級(jí)的消息隊(duì)列服務(wù)中,排隊(duì)等待處理,實(shí)現(xiàn)前端用戶請(qǐng)求與后端執(zhí)行引擎的解耦與異步化。
NoSQL 數(shù)據(jù)庫(kù)用于保存和更新Spark 作業(yè)的執(zhí)行進(jìn)度,并在作業(yè)運(yùn)行完畢后,會(huì)保存一定數(shù)量的采樣數(shù)據(jù)或結(jié)果數(shù)據(jù),用于向前端用戶展示。
使用動(dòng)態(tài)代理服務(wù)的好處是其能屏蔽前端用戶與后端YARN 集群間的直接交互訪問(wèn),保證內(nèi)網(wǎng)安全,同時(shí)使Spark Driver 分散運(yùn)行于YARN 集群上,減少集群邊緣節(jié)點(diǎn)的運(yùn)行壓力,從而實(shí)現(xiàn)輕客戶端服務(wù)的目的,具體處理流程如圖2所示。

圖2 動(dòng)態(tài)代理服務(wù)處理流程
動(dòng)態(tài)代理服務(wù)管理執(zhí)行步驟如下。
(1)在YARN 邊緣節(jié)點(diǎn)服務(wù)器上啟動(dòng)Launcher服務(wù),啟動(dòng)并保持一定數(shù)量的Spring Boot 服務(wù)。Launcher 服務(wù)會(huì)定期通過(guò)HAProxy Data Plane API來(lái)獲取HAProxy 當(dāng)前已配置的Spring Boot 后端服務(wù)接口。在Launcher 與Spring Boot 進(jìn)行服務(wù)通信時(shí),檢查所有后端服務(wù)的有效性,將當(dāng)前有效的Spring Boot服務(wù)數(shù)量N與實(shí)際需要啟動(dòng)的服務(wù)數(shù)量M進(jìn)行比較。若N (2)啟動(dòng)成功的Spring Boot 服務(wù)會(huì)將其所在IP和監(jiān)聽(tīng)端口通過(guò)HAProxy Data Plane API 注冊(cè)到HAProxy后端配置中。 (3)HAProxy Data Plane API 會(huì)根據(jù)最新配置動(dòng)態(tài)重載HAProxy,并對(duì)外提供統(tǒng)一代理服務(wù),將Spark 相關(guān)請(qǐng)求轉(zhuǎn)發(fā)到分散在YARN 集群中的Spring Boot服務(wù)上進(jìn)行處理。 通過(guò)動(dòng)態(tài)代理HAProxy 提供的統(tǒng)一訪問(wèn)接口,前端用戶Client將Spark作業(yè)Job提交到后端YARN集群的Spring Boot 服務(wù)上執(zhí)行,并異步獲取作業(yè)的執(zhí)行進(jìn)度和處理結(jié)果。提交作業(yè)處理流程如圖3所示。 圖3 提交作業(yè)流程 提交作業(yè)具體執(zhí)行步驟如下。 (1)Client 向HAProxy 統(tǒng)一訪問(wèn)接口發(fā)送Job 處理請(qǐng)求。 (2)HAProxy 在接收到Job 處理請(qǐng)求后,根據(jù)負(fù)載均衡策略選擇一個(gè)后端服務(wù)(如Spring Boot 1)來(lái)處理當(dāng)前請(qǐng)求。 (3)Spring Boot 1服務(wù)會(huì)直接將Job處理請(qǐng)求信息寫入到消息隊(duì)列中,通知前端用戶該Job 已提交成功,進(jìn)入調(diào)度隊(duì)列等待后續(xù)處理。 (4)YARN 集群中其他空閑的后端服務(wù)(如Spring Boot 2)從消息隊(duì)列中獲取Job 處理請(qǐng)求信息。 (5)利用Spring Boot 2 服務(wù)內(nèi)部集成的Spark Driver 調(diào)用集群中的Spark Executor(s)對(duì)Job 進(jìn)行調(diào)度執(zhí)行。 (6)Spring Boot 可定時(shí)將Job 處理進(jìn)度寫入到NoSQL 數(shù)據(jù)庫(kù),并將Job 的最終處理結(jié)果和采樣數(shù)據(jù)寫入NoSQL數(shù)據(jù)庫(kù)。 (7)Client 從步驟(3)接收到Job 提交成功的信息后,向HAProxy 統(tǒng)一訪問(wèn)接口發(fā)送獲取Job 實(shí)時(shí)進(jìn)度和處理結(jié)果的請(qǐng)求信息。 (8)對(duì)獲取到的Job 進(jìn)度和結(jié)果請(qǐng)求信息,HAProxy 按照負(fù)載均衡策略選擇一個(gè)后端服務(wù)(如Spring Boot 3)進(jìn)行處理。 (9)Spring Boot 3 在接收到請(qǐng)求信息后,從NoSQL 數(shù)據(jù)庫(kù)中獲取Job 實(shí)時(shí)進(jìn)度和處理結(jié)果,并經(jīng)HAProxy 返回給前端用戶,完成此次作業(yè)的提交過(guò)程。 Spark 作業(yè)Job 在被提交到集群上的Spring Boot 服務(wù)后,因某種原因,前端用戶要提前終止作業(yè)執(zhí)行。此時(shí),可通過(guò)HAProxy 提供的統(tǒng)一訪問(wèn)接口來(lái)及時(shí)撤銷Spark 作業(yè)。取消作業(yè)流程如圖4 所示。 圖4 取消作業(yè)流程 取消作業(yè)具體執(zhí)行步驟如下。 (1)前端用戶向HAProxy 統(tǒng)一訪問(wèn)接口提交取消Job的請(qǐng)求。 (2)HAProxy 按照負(fù)載均衡策略將請(qǐng)求分發(fā)到后端服務(wù)(如Spring Boot 2)上執(zhí)行。 (3)Spring Boot 2從NoSQL數(shù)據(jù)庫(kù)中獲取Job執(zhí)行進(jìn)度和Job 當(dāng)前運(yùn)行時(shí)所在的Spring Boot 服務(wù)信息。 (4)Spring Boot 2 根據(jù)Job 的進(jìn)度狀態(tài)來(lái)執(zhí)行不同的取消操作。①若Job 執(zhí)行進(jìn)度不存在,則說(shuō)明該Job 未執(zhí)行,Job 信息仍在消息隊(duì)列中,直接刪除消息隊(duì)列中的Job 信息,標(biāo)記Job 已取消,繼續(xù)執(zhí)行步驟(5);②若Job 執(zhí)行進(jìn)度為已完成狀態(tài)(如成功、失敗、取消等終結(jié)狀態(tài)),則說(shuō)明該Job 已執(zhí)行完畢,無(wú)須處理,直接標(biāo)記Job 已取消,繼續(xù)執(zhí)行步驟(5);③若Job 執(zhí)行進(jìn)度為未完成狀態(tài),則說(shuō)明該Job 正在執(zhí)行。將取消Job 的請(qǐng)求信息轉(zhuǎn)發(fā)到Job 運(yùn)行時(shí)所在的后端服務(wù)(如Spring Boot 1)中進(jìn)行處理,繼續(xù)執(zhí)行步驟④;④利用Spring Boot 1 內(nèi)部集成的Spark Driver 來(lái)向集群提交Job 取消指令,完成后將NoSQL 數(shù)據(jù)庫(kù)中的Job執(zhí)行進(jìn)度更改為取消狀態(tài),標(biāo)記Job 已取消,并將取消結(jié)果反饋給Spring Boot 2。 (5)Spring Boot 2 將Job 取消結(jié)果通過(guò)HAProxy反饋給前端用戶,結(jié)束操作。 基于輕量級(jí)Web 應(yīng)用框架集成的Spark Driver 功能,采用YARN-Cluster 模式將多個(gè)無(wú)差別的Web 服務(wù)部署到資源充足的YARN 集群上,能分散運(yùn)行,并行調(diào)度,減輕邊緣節(jié)點(diǎn)服務(wù)器的資源使用負(fù)擔(dān),降低故障發(fā)生頻率。 通過(guò)代理組件來(lái)實(shí)現(xiàn)動(dòng)態(tài)配置更新與自動(dòng)重載機(jī)制,代理后端多個(gè)無(wú)差別Web 服務(wù),對(duì)外提供統(tǒng)一的服務(wù)訪問(wèn)接口,能在用戶對(duì)后端無(wú)感知的情況下,接收和分發(fā)Spark 作業(yè)的交互式提交請(qǐng)求,從而完成對(duì)Spark 作業(yè)生命周期的管理。 代理組件僅提供Spark請(qǐng)求信息和結(jié)果數(shù)據(jù)的高性能代理和轉(zhuǎn)發(fā)功能,無(wú)須在邊緣節(jié)點(diǎn)服務(wù)器與集群之間保持長(zhǎng)連接會(huì)話,具體請(qǐng)求由后端Web服務(wù)之間自動(dòng)協(xié)作完成,能實(shí)現(xiàn)輕客戶端的高效交互效果。 綜上所述,本研究通過(guò)在邊緣節(jié)點(diǎn)服務(wù)器上部署高性能負(fù)載均衡和動(dòng)態(tài)代理組件HAP-roxy,提供一種輕量級(jí)客戶端方式來(lái)提交Spark作業(yè)的實(shí)現(xiàn)方法,實(shí)現(xiàn)對(duì)Spark 作業(yè)進(jìn)行動(dòng)態(tài)調(diào)度與全生命周期的管理。通過(guò)Spark on YARN模式將多個(gè)具有相同功能但相互之間獨(dú)立運(yùn)行的Rest 服務(wù)部署到Y(jié)ARN 集群上,利用HAProxy的自動(dòng)重載機(jī)制來(lái)動(dòng)態(tài)更新和加載后端服務(wù)配置,使前端用戶能在對(duì)后端變動(dòng)無(wú)感知的情況下;通過(guò)HAProxy 的統(tǒng)一對(duì)外接口,將Spark 作業(yè)提交到分散運(yùn)行在YARN 集群上無(wú)差別的Rest服務(wù)中執(zhí)行。該方法無(wú)須在邊緣節(jié)點(diǎn)服務(wù)器與集群節(jié)點(diǎn)服務(wù)器之間保持長(zhǎng)連接會(huì)話,通過(guò)HAProxy 能有效避免外部用戶直接訪問(wèn)集群內(nèi)部節(jié)點(diǎn),達(dá)到集群內(nèi)外安全隔離的效果,同時(shí)實(shí)現(xiàn)Spark on YARN 運(yùn)行模式下Spark 作業(yè)的交互式提交與異步調(diào)度,完成對(duì)Spark 作業(yè)全生命周期的自主控制。該方法在解決傳統(tǒng)重客戶端與Spark 應(yīng)用服務(wù)保持長(zhǎng)連接會(huì)話弊端的同時(shí),也能滿足前端用戶的頻繁交互需求。3 提交作業(yè)

4 取消作業(yè)

5 結(jié)語(yǔ)