丁派克 曹芳芳 王曉玲
北京航天自動控制研究所,北京100854
隨著信息化、智能化、國產化的發展,航天設備產生的試驗數據日益增加,由此導致數據存儲和分析的可靠性要求和性能要求也在不斷提高。大規模數據的出現不僅對數據存儲產生了很大的壓力,同時也使數據分析、計算的難度大幅增加。“中興事件”、“棱鏡門”事件后,國家加強了自主可控的要求,基于核心技術堅持國產化原則,需要在國產平臺上實現大數據組件軟件的國產化適配和智能數據分析技術。但受限于國產化硬件性能較低的制約,國產化平臺的單機性能過低,為了提升系統的數據處理能力,只能通過增大處理器的數量來實現[1]。將基于Hadoop框架的MapReduce分布式計算框架的航天大數據處理軟件移植到國產硬件上運行時,出現數據解析速度過慢、數據處理性能不足的問題,無法滿足當前的型號需求。針對當前數據處理性能不足的問題,設計一種分布式數據預處理框架,并把預處理框架前移到數據計算模塊之前;在原有的MapReduce 分布式計算框架開發的基礎上,采用基于Spark 的數據計算框架。Spark 是一個針對超大數據集合的低延遲的集群分布式計算系統,通過采用彈性分布式數據集RDD,消除了MapReduce計算過程中的臨時計算數據結果的落地,有效減少了大量非必要的硬盤 IO開銷,且Spark 在數據實時處理、數據挖掘以及機器學習等尤其是需要大量迭代的計算方面具有先天的優勢。
MapReduce是一種分布式編程模型,是Hadoop的核心組件之一[2]。原數據處理軟件的數據解析算法位于MapReduce模塊,通過客戶端將大文件數據導入HDFS(Hadoop Distributed File System,分布式文件系統,Hadoop的核心組件之一,下文簡稱HDFS)文件系統后,提交解析任務給MapReduce模塊。按數據類型將任務分類、切塊后提交給Map,后調用解析算法,在Reduce階段根據業務需要進行上下文運算和排序處理后輸出到HDFS文件系統。原數據處理框架如圖1所示。

圖1 原數據處理框架
這種方式主要有2個問題:1)將數據預處理和解析同時進行,會導致啟停、調用MapReduce模塊頻繁,在Reduce模塊對文件進行上下文運算導致數據傾斜嚴重,內存消耗過大,最終導致數據解析效率不高。2)數據吞吐量不足,可能會出現數據預處理出現錯誤、產生壞數據的情況,導致數據解析的可靠性不足。
針對上述問題設計了分布式數據預處理框架。主要包括以下2個部分:
1)將數據解析算法前移,對數據進行規格化處理后,再進入數據解析模塊進行處理。有效減少了MapReduce的啟停調用頻率;
2)采用Kafka消息隊列,增加數據的吞吐量,可以保證數據的高質量、高可靠性預解析。Kafka是由LinkedIn開發的基于發布-訂閱(Pub-Sub)機制的分布式消息隊列,具有高吞吐、低延遲、易拓展的優勢,并可以對消息隊列進行持久化存儲[3]。優化后的分布式預處理框架如圖2所示。

圖2 分布式預處理框架
分布式預處理框架先按照一定協議規則對數據文件進行分類,然后通過切塊、打標簽等方法以多進程、多線程的方式高效地將原始數據通過Kafka消息隊列有序地解析成結構化、可讀性高的有效數據,以便于后續業務的查詢與分析。Kafka消息隊列對帶有標簽的數據塊按照一定規則進行排序,形成有序數據流,保證進入解析計算模塊的數據是規格化的數據,可以大幅提升數據解析效率。
通過調整預處理結構得到的規格化數據,進入數據解析框架。原軟件采用的MapReduce框架通過輸入加載模塊(InputFormat)對HDFS中的數據進行加載后,進行邏輯分塊存入HDFS,數據讀取模塊(RecordReader)將各個分塊數據從HDFS讀取出來以鍵值對的形式輸出作為Map函數的輸入MapReduce框架是把中間結果寫入到HDFS中,帶來了大量的數據復制、磁盤IO和序列化開銷[4]。這些大量的額外開銷產生了內存消耗過大的問題,導致軟件的數據解析計算性能不足。Spark將數據以“RDD轉換”的方式進行解析計算,結果保存在內存當中,不儲存中間結果,可以大幅減少內存資源消耗,從而提升解析效率。
RDD(Resilient Distributed Datasets,彈性分布式數據集,下文簡稱RDD)是Spark中最基本的數據抽象單位,本質上是一個只讀的分區記錄集合,不能直接修改,只能基于穩定的物理存儲中的數據集來創建RDD,或者通過在其他RDD上執行確定的轉換操作創建新的RDD。每個RDD可以分成多個分區,每個分區就是一個數據集片段(HDFS上的塊),并且一個RDD的不同分區可以被保存到集群中不同的節點上,從而可以在集群中的不同節點上進行并行計算[5]。
RDD的調用方法是非即時的,在計算之前的RDD轉換操作,Spark僅僅是記錄下了RDD轉換操作的行動軌跡以及全部RDD之間的依賴關系,在轉換結果未確定之前,不進行真正的計算。RDD的特殊調用機制,使Spark在處理數據時不必耗費資源存儲中間結果,直接獲得所需要的最終數據進行輸出與存儲。
Spark會根據RDD之間的轉化操作,區分出2種依賴關系:寬依賴于窄依賴。一個父RDD對應多個子RDD,圖3中RDD0、RDD1、RDD2、RDD3之間的關系,為寬依賴;一個或多個父RDD對應一個子RDD,如RDD4、RDD5、RDD6、RDD7之間的關系,為窄依賴。某一個RDD在發生數據丟失時,Spark會返回上一級RDD對數據進行重算[6]。

圖3 RDD的寬依賴與窄依賴
Spark通過記錄RDD之間的依賴關系,可以提升其容錯性,從而提升Spark的執行速度。
在Spark執行計算全部任務時,會根據RDD之間的依賴關系生成流程圖,再通過流程圖的信息劃分任務階段。分解的具體依據是:
1)從流程圖的末端進行反向分析,遇到窄依賴就把當前RDD的計算任務劃入當前階段。由于窄依賴的轉化關系僅存在一個丟失數據的子RDD,在重算時對于父類RDD的利用率為100%,具有很高的計算效率,Spark會盡量將全部窄依賴都分入一個階段進行解析。
2)遇到寬依賴就斷開。對于寬依賴,一個父RDD對應多個子RDD,某一個子RDD發生數據丟失時,Spark返回上一級父RDD,并對所有的子RDD進行再次運算,無論其他子RDD是否發生數據丟失。Spark在寬依賴關系的RDD產生數據丟失時,會對其他未丟失的子RDD的再次計算產生了大量多余的計算,與窄依賴放在同一階段進行計算會降低計算效率[7]。
Spark根據寬窄依賴將計算任務分成多個階段,其中窄依賴的轉化關系類似于管道運輸,RDD之間相互不受影響,將其劃分到同一任務階段進行分布式計算,可以大幅提升計算效率。
基于RDD調用機制以及Spark內存計算機制的優越性,設計如圖4所示基于Spark的數據處理框架。從HDFS取出數據文件,獲得初始數據集RDDOrigin后,對數據進行預處理。預處理階段先根據數據包頭分成5種數據類型并打上標簽,再把數據進行切塊形成若干個RDD。
以Net類型數據為例,RDDNet被切塊后形成若干個帶有標簽的小塊RDD,然后對小塊RDD進行解析,解析完畢后將小塊RDD進行合并、排序操作,最終得到RDDResult為輸出結果,存入到HDFS。Spark任務階段規劃如圖4所示。Spark從流程圖的末端開始分析,RDD的合并排序以及RDD的解析轉換均為窄依賴,可劃分到同一階段;而初始數據集RDDOrigin的按數據類型分塊與RDD1553、RDDAD、RDDNet、RDDTCP/IP、RDDIO的切塊這2個轉化操作均為寬依賴,被劃分到不同的任務階段中。由于Spark在劃分任務階段時是從末端開始分析,那么實際計算的任務階段順序應該是反過來的。RDDOrigin按數據類型分塊的任務在階段1,RDD1553、RDDAD、RDDNet、RDDTCP/IP、RDDIO的切塊在階段2,解析、數據的合并、排序任務在階段3。全部計算任務完成后,輸出最終的RDDResult并存入HDFS。

圖4 基于Spark的數據處理框架
大數據軟件在中標麒麟5.0操作系統環境和龍芯3A3000刀片服務器集群的硬件平臺上適配運行;采用的大數據組件主要包括Hadoop 2.7.2,HDFS 2.7.3,Spark 2.0.2,Kafka 0.8;以某航天武器裝備控制系統產生的試驗數據為解析對象,數據類型主要為1553、AD、IO等二進制數據類型。將100M,300M,1G,3G大小的航天裝備試驗數據分別用舊方案與優化后的方案進行解析,并記錄處理時間,最后把2個方案所消耗的時間進行對比。
通過對航天裝備試驗數據進行解析比對,得出如表1所示結果:
通過試驗數據對比可以看出:優化后的方案在處理100M、300M、1G、3G大小的數據均較原始方案性能優越,數據處理速度有了明顯提升。
通過對數據預處理框架和Spark內存計算技術的研究,提出了一種基于Spark的國產化海量數據預處理與數據計算方法,對航天武器裝備控制系統產生的試驗數據進行快速解析和處理。經過與原數據處理軟件框架的對比分析驗證,結果表明,本文的優化方法可以有效提升國產大數據平臺的數據處理性能,利用RDD調用機制與Spark內存計算能力,解決數據在MapReduce框架中解析性能不足的問題,并已應用到部分航天武器裝備系統中,有效解決了國產平臺下海量數據的快速處理分析需求。

表1 數據解析結果比對