吳 奔, 李喜旺, 周心圓
1(中國科學院 沈陽計算技術研究所, 沈陽 110168)
2(中國科學院大學, 北京 100049)
3(吉林大學, 長春 130012)
電力調度網是電網調度自動化、信息化的基礎,是確保電網安全、穩定、經濟運行的重要手段, 是電力系統的重要基礎設施, 傳統的電力調度網安全監測,主要是依靠工程師對網絡設備進行排查或依靠網管對管理信息庫及參數的分析進行定位.
隨著電力系統信息化進程的加快, 持續推動了實時監測系統、現場移動檢修系統、測控一體化系統、智能變電站和電力信息管理系統的擴建和應用, 使電力行業正逐漸步入到由復雜及異構數據源廣泛存在和驅動的電力大數據時代. 電力學術領域開始利用云計算技術解決智能電網海量數據, 但還是無法達到很好的實時處理能力. 想要真正實現海量實時監測, 需要研究其他大數據處理技術, 例如利用內存計算, 大數據流計算等技術, 如目前主流的大數據流計算框架Hadoop、Storm、Spark等[1–3], 采用流計算對產生的數據流進行實時處理, 并將數據在內存數據庫中緩存, 通過內存計算的方式加速數據的處理速度[4], 提高分析處理的性能.
相較于傳統的數據處理方式, 流計算的技術特點主要體現在流入系統的數據流是實時的, 流計算能夠對流入的數據進行實時處理, 并將數據在內存數據庫中緩存, 通過內存計算的方式加速數據的處理速度, 提高分析處理的性能. 流數據處理的一般過程如圖1所示.

圖1 流計算處理一般過程
目前著名的開源數據流計算框架有Hadoop平臺的 MapReduce 計算框架, Apache Storm 計算框架和Apache Spark計算框架, 他們是目前最常見的處理海量數據的開源框架.
Hadoop是磁盤級計算, 而Storm和Spark是內存級計算, 磁盤訪問延遲約為內存訪問的75 000倍, 因此Storm和Spark更快. 對于Storm和Spark這兩個高性能并行計算引擎的最大區別在于實時性: Spark是準實時, 先收集一段時間再處理, 實時計算延遲是秒級;而Storm是純實時, 實時計算延遲是毫秒級. 但Spark擁有更高的吞吐量, Spark還有一個特別的地方是,Spark 的軟件棧允許將一些 library (Spark SQL, MLlib,GrapnX)與數據流相結合[5], 提供便捷的一體化編程模型. Spark的各個組件如圖2所示.

圖2 Spark 軟件棧
Spark計算框架解決了大數據處理遇到的批處理,實時流處理和交互式查詢等難題, 結合Spark高度抽象的 RDD (Resilient Distribute Dataset, 彈性分布式數據集)概念[6], 針對多種不同的數據處理場合, 基于Spark的編程模式將被同一成相同的處理方式, Spark統一了技術棧, 降低了研發成本. 另外 Spark 擁有更清晰, 等級更高的API.
為滿足對電力調度數據網實時監測分析的實時性和高吞吐量的要求, 基于流計算的大數據實時處理分析基礎平臺以電力調度網絡的大量實時監測數據為處理對象, 主要包括: 數據接入模塊, 訓練模塊, 實時計算模塊, 分布式存儲及可視化模塊. 分布式存儲使用內存數據庫和分布式文件數據庫, 完成對實時推送數據進行存儲, 實現實時分析結果存儲, 以及離線處理功能.流計算網絡監測模型圖, 如圖3所示.

圖3 網絡監測模型圖
基于流數據的實時處理分析基礎平臺以電力調度網的大量實時監測數據為處理對象, 主要包括數據源接入, 實時流計算, 以及分布式存儲展示三個基本過程.其中考慮到調度數據網中產生的實時監測數據的源頭很多, 而且數據源只有接入實時處理系統后, 才可以進行流計算處理, 這里數據源是通過自適應采集獲取的特定類型的數據. 結合數據流處理流向, 實時流計算系統框架圖如圖4所示.

圖4 流計算實時處理系統整體架構圖
2.3.1 數據接入模塊
使用分布式消息隊列系統Kafka作為系統的數據接入模塊[7], 發揮其發布訂閱消息傳遞機制及海量消息緩存特性, 為實時監測數據的連續流計算提供數據保障. 由于數據流的生成方式采用的是Kafka分布式消息隊列, 因此數據在進行整合發送時, 還需要根據發送數據的類型, 將數據添加話題字段, 同一Topic內部的消息按照一定的key和算法被分區到不同的服務器上.本系統可以包含多種數據源, 如調度網設備運行狀態信息, 調度網網絡流量特征等, 發布信息時流數據產生系統作為Kafka消息數據的生產者將數據流分發給Kafka消息主題, 流計算系統 Spark Streaming實時消費并計算數據. Kafka分布式集群架構如圖5所示.
2.3.2 實時流計算模塊
系統平臺的實時流計算模塊主要是基于Spark Streaming的分布式流計算框架構成, 它將流式計算分解成一系列短小的批處理作業[8], 將Kafka中每一個話題的連續數據源定義為一個數據流DStream, 而DStream為每個時間段所對應的RDD的集合, 每一段數據都轉化成Spark中的RDD彈性分布式數據集.Dstream數據流的定義如圖6所示.
然后將 Spark Streaming中對 DStream的Transformation操作變為針對Spark中對RDD的Transformation操作[9], 將RDD經過操作變成中間結果保存在內存中. 整個流式計算根據業務的需求可以對中間的結果進行疊加, 或者存儲到外部設備. Spark Streaming的運行流程如圖7所示.
2.3.3 分布式存儲模塊
為提高數據分析處理和數據監測預警的實時性,對于數據的存儲模塊則選用內存數據庫實現, 這里使用分布式內存數據庫Redis將實時處理分析的結果進行數據key/value存儲. 由于內存數據庫存儲容量限制,對于訪問頻率較低, 數據量較大, 用以進行定期離線分析的數據,則需要借助分布式文件數據庫HBase對其進行存儲, 確保數據存儲的可靠性, 高并發, 及擴展能力.
網絡流量異常監測是網絡安全防護至關重要的方法, 由于網絡攻擊具有突發性, 要求我們能夠及時發現可疑網絡流量, 從而采取網絡防護措施. 網絡流量異常監測主要實現方法[10], 首先獲取正常通訊下的網絡數據和攻擊下的異常網絡數據, 將采集到的網絡數據作為帶標簽的訓練樣本[11], 可以結合Spark軟件棧中的MLib機器學習函數庫應用于流數據分析中[12], 通過聚類算法對訓練樣本進行聚類, 建立網絡流量分類模型.結合流處理框架Spark Streaming, 程序加載分類模型對新增的流量數據數據進行分類, 對大規模網絡流量準實時監測[13,14].

圖5 Kafka 分布式集群架構圖

圖6 Dstream 的定義

圖7 Spark Streaming 運行流程圖
對于網絡流量的特征向量, 采用基于機器學習的流量異常監測方法最常用的是聚類算法對數據集樣本進行訓練[15]. 對于如K-means等傳統的劃分聚類方法僅能發現球狀簇, 它們很難發現任意形狀的簇, 無法避免地將噪聲或離群點包含進簇中. 為了發現任意形狀的簇, 可以把簇看做數據空間中被稀疏區域分開的稠密區域, 即基于密度實現聚類. 對于對象o的密度則可以用靠近o的對象數度量. DBSCAN (Density Based Spatial Clustering of Application with Noise, 具有噪聲應用的基于密度的空間聚類)則是基于密度聚類算法的典型代表[7]. 該算法指定參數ε來表示每個對象的鄰域半徑, 對象o的ε鄰域則是以o為中心、以ε為半徑的空間. 鄰域的大小由參數ε確定, 因此鄰域的密度可以簡單地用鄰域內的對象數度量. DBSCAN通過另一參數MinPts, 即指定稠密區域的密度閾值, 來衡量鄰域是否稠密. DBSCAN算法在發現簇的過程如下文.
(1) 首先將給定數據集D中的所有對象都標記為“unvisited”.
(2) DBSCAN隨機地選擇一個未訪問的對象p, 標記 p 為“visited”, 并檢查 p 的ε-鄰域是否至少包含MinPts個對象. 如果不是, 則p被標記為噪聲點.
(3) 否則為 p創建一個新的簇 C, 并且把 p的ε-鄰域中的所有對象都放到候選集合N中. DBSCAN迭代地把N中不屬于其他簇的對象添加到C中.
在此過程中, 對于N中標記為“unvisited”的對象,DBSCAN 把它標記為“visited”, 并且檢查它的ε-鄰域.如果的ε-鄰域至少有MinPts個對象, 則的ε-鄰域中的對象都被添加到N中. DBSCAN繼續添加對象到C,直到C不能再擴展, 即直到N為空. 此時, 簇C完全生成, 于是被輸出. 為了尋找下一個簇, DBSCAN 從剩下的對象中隨機地選擇一個未訪問的對象. 聚類過程繼續, 直到所有對象都被訪問.
系統實驗環境所使用的Spark集群搭建在基于Hadoop的基于分布式的安裝, 集群有3個節點, 其中將一個節點配置為Master, 其他2個配置為Slave, 每個節點的配置都是內存8 GB, 并搭載Centos操作系統, 相關軟件版本如表1所示.

表1 集群的軟件配置
本論文使用的數據為從電力調度數據網通過自適應采集及預處理過的網絡流量數據, 每個網絡連接的統計信息, 數據集的大小約為708 M, 包含490萬個連接. 數據集中每個連接信息包括發送的字節數, 登錄次數, TCP 錯誤數等. 數據集包含 38 個特征, 下面是其中的一個連接的樣例:
2, tcp, http, SF, 1684, 363, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0,0, 0, 0, 0, 0, 0, 1, 1, 0.00, 0.00, 0.00, 0.00, 1.00, 0.00,0.00, 104, 66, 0.63, 0.03, 0.01, 0.00, 0.00, 0.00, 0.00,0.00, normal.
數據集中每個連接的信息包括發送的字節數, 登陸次數, TCP 錯誤數等. 以上代表一個 TCP 連接, 他訪問http服務, 發送了1684字節的數據, 收到數據363字節, 用戶登錄成功等. 許多特征值取值為0或1, 比如第15列的su_attemted,它們代表某種行為出現與否.最后的字段表示類別標號, 大多數為normal.
在建立監測模型時, 由于每個特征的屬性值, 和閾值不同, 我們需要將數據集進行數據歸一化(數據標準化)處理, 數據歸一化的標準采用的是z-score歸一化方法, z-score方法是基于數據集的均值和標準差σ計算歸一化后的結果, 計算方式如公式(1)所示:

實驗首先經過Kafka客戶端讀取數據集特征數據通過創建話題的方式生產主題, 發送給Spark Streaming消費, 這里使用Direct方式讀取并計算分析, 將特征數據以DBSCAN聚類學習算法進行聚類, 使用Spark-Mlib中的DbscanModel的變體StreamingDbsan[16].StreamingDbscan模型可以根據增量對簇進行更新. 我們分別就網絡流量異常監測的準確性和平臺計算的實時性進行測試. 準確率通過合并各個SparkStreaming輸出數據來計算. 計算每個類簇所含的主要攻擊種類個數與數據總數的比值.
某個類簇的準確率p的計算公式如公式(2)所示:

其中,m為類簇中數量占第一位的數據總數, 即主要攻擊的類型個數,w為類簇的數據總數.
數據的總準確率P的計算公式如公式(3)所示:

其中,M為所有類簇中數量占第一位的數量總數,W為所有類簇的所有數據的總和.
表2是經過SparkStreaming結合Dbscan數據聚類分析的得出的結果.

表2 流量數據聚類檢測結果
從表2可以看出, 經過聚類分析將數據分為19類,通過公式(3)可以得出總的準確率P為97.48%. 準確率較高.
實驗分別在云計算和流計算處理平臺, 分別以每100萬條數據, 5個測試等級對應時間出來開銷, 分別測試并對最終獲得結果, 從圖8所示的實驗結果可知,與云計算方式的系統架構對比, 使用流計算的系統框架具備了分布式流處理的高吞吐的性能, 能夠滿足海量數據實時處理分析的性能需求.

圖8 云計算方式與流計算方式吞吐量對比
本文提出了基于流計算的處理方式, 針對電網調度數據網海量數據監測分析, 構建實時監測分析平臺,兼具高吞吐量高實時性及容錯性和可擴展性的優勢,該系統基于電網調度數據網流量數據實現了流量異常的監測, 結合流計算技術實現了海量實時數據的計算分析處理及存儲的需求, 同時為電力調度網的自動化運維等其他需求提供有效可靠的借鑒思路. 但本文只是對已知的網絡攻擊進行分析, 還需加強未知類型攻擊的算法模型創建, 系統仍然需要更加深入的改進.