袁海飛
(徐州徐工挖掘機械有限公司信息化管理部 江蘇省徐州市 221000)
工業數據的存儲管理是工業信息化應用、推進智能制造的前提和基礎[1],然而海量的生產設備聯網數據、物聯網數據、實時監控數據為工業數據的存儲管理帶來了效率問題,同時也對設備監控數據存儲計算的實時性、高效性提出了更高的要求[2]。
在數據存儲方面,現有的設備監控系統[3-4]往往將數據存儲在如MySQL等結構化數據庫中,頻繁的讀寫請求給數據庫帶來了極大壓力。有的應用研究[4]采用“一主多從、讀寫分離”的存儲架構,通過一個主數據庫接收寫請求,多個從數據庫同步數據并處理讀請求,這種方式緩解了數據庫的壓力,但仍存在復雜表需要垂直切分或水平切分的情況,在數據分析或查詢過程中表現欠佳。
大數據平臺Hadoop的出現為設備聯網監控數據管理提供了新的方向[5-8],Hadoop平臺中的存儲組件HBase是一種分布式的列式數據庫,針對設備監控數據存儲具有結構靈活、可拓展、存儲效率高等優勢,被廣泛地應用在數據存儲和分析過程中[5-6]。而基于HBase存儲的時序數據庫OpenTSDB支持以毫秒精度、每秒數百萬次寫入,在設備監控數據方面具有良好的表現[8]。
然而,不同于普通應用系統,設備監控系統因其特殊的應用場景會產生大量的實時數據[2],如設備、儀表參數、定位、指標等。這些實時增量不斷增長的時序數據為數據存儲的可擴展性提出了要求。此外,在數萬臺機器毫秒級監控的場景中,服務器每秒需要處理GB級的數據,傳統通過負載均衡進行實時計算的處理方式已經達到瓶頸。
為了解決以上問題,本文提出了一種基于分布式實時計算架構的生產設備數據分析平臺。首先,基于Hadoop平臺構建數據存儲計算集群;然后,利用OpenTSDB對設備實時監測數據進行時序化存儲管理;最后,基于Kafka和Flink對實時設備監測數據進行實時計算分析,實現高并發設備監控場景下的低延遲響應。
OpenTSDB[8]是一個可擴展時間序列數據庫,能夠在不丟失粒度的情況下存儲和提供大量時間序列數據。OpenTSDB可以基于Hadoop平臺運行,支持以毫秒精度、每秒數百萬次寫入,通過添加節點來增加數據庫容量。OpenTSDB的底層由HBase進行存儲,由此可以提供高可用、可擴展、高效查詢等數據操作。OpenTSDB還提供GUI系統,可以根據用戶篩選條件展示數據直方圖。同時,OpenTSDB采用HTTP API的形式提供數據接口,方便異構系統的數據通信以及如Grafana等數據可視化組件進行數據監控展示。
Apache Flink[9]是一個框架和分布式處理引擎,用于對無界和有界數據流進行有狀態計算。Flink基于Flink流式執行模型(Streaming execution model),能夠支持流處理和批處理兩種應用類型。流處理和批處理所提供的服務等級協議完全不相同,流處理一般需要支持低延遲、Exactly-once保證,而批處理需要支持高吞吐、高效處理,所以在實現的時候通常是分別給出兩套實現方法,或者通過一個獨立的開源框架來實現其中每一種處理方案。如:實現批處理的開源方案MapReduce、Spark;實現流處理的開源方案Storm;微批處理方案Spark Streaming。與傳統方案不同,Flink在實現流處理和批處理時,將二者統一起來,把批處理被作為一種特殊的流處理,即有界的數據流。這種批、流一體的架構使得Flink在執行實時數據計算時具有極低的延遲。
基于分布式實時計算架構的生產設備數據分析平臺總體架構共包括:數據采集層、數據存儲層、資源管理層、數據計算層和應用層。如圖1所示為系統架構圖。
系統的數據源主要包括:設備實時監測上傳數據、用戶操作日志流數據及通過API調用的相關業務數據。此外,還可以通過ETL導入數據作為系統的數據源。
系統的數據存儲根據數據類型和應用場景分為基礎業務庫、時序數據庫和內存數據庫。基礎業務庫通過MySQL存放系統業務結構化信息,時序數據庫存儲生產設備監控數據。此外,服務器將經常訪問的數據緩存在內存數據庫Redis中,從而提高訪問速度和計算效率。
系統通過由Yarn進行資源管理,負責在有數據計算請求時根據集群狀況分配計算資源和計算節點,從而提供MapReduce、Spark、Flink等組件的計算環境。
系統對于并發產生的設備監測數據、訂單業務數據等,通過Flink進行實時計算,實現異常判斷、實時統計和數據存儲寫入功能,并由Phoenix進行HBase中數據的結構化查詢計算。
系統通過Web的形式提供用戶交互界面,實現對設備實時監測、歷史狀態查詢、統計報表、設備管理、系統管理和日志管理等。

圖1:系統總體架構圖
時序數據庫OpenTSDB的存儲管理沒有提供Java的SDK進行方法調用,而實基于HTTP請求的方式進行操作。系統將設備實時監測時序化數據存儲在OpenTSDB中,其主要存儲結構設計如下。

OpenTSDB底層由HBase負責按照時間戳作為Rowkey進行存儲,每條數據記錄包括監控項名稱metric、時間戳(秒/毫秒級)、監控項值和監控項相關信息(tags)。OpenTSDB是面向列式存儲的數據庫,其監控項信息tags可記錄多條也可不記錄,這些tags可供用戶進行數據后期查詢和篩選使用。
對高并發產生的設備監控數據,系統通過基于Kafka和Flink進行設備數據的實時計算處理,實現異常判斷、實時統計、數據寫入等操作,具體步驟如下:
(1)系統基于消息隊列Kafka對設備數據進行傳輸,達到高并發、低延遲和消峰的作用。首先通過命令行創建Kafka消息訂閱的topic,表示一條設備監控的數據記錄,設計topic名為“devinfo”,定義副本數2個,分區數9個。多副本的設計提高了Kafka的可用性,多分區的設計使得消費者能夠對多個分區同時處理,實現數據負載均衡,提高實時處理效率。
bin/kafka-topics.sh --zookeeper node01:2181 --create --replicationfactor 3 --partitions 9 --topic devinfo
(2)設備監控數據匯聚在設備通信網關,由網關進行協議格式解析,然后構建Kafka生產者KafkaProducer,將設備監控數據發送到Kafka broker的“devinfo”這個topic下。同時,利用回調函數監測是否發送成功,異常則觸發報警。因為Kafka多分區并行消費的設計,同一分區下的數據會順序消費,但不同分區數據的處理會產生數據錯亂的情況。因此,網關生產者根據消息設備id作為分區的key進行分區,將同一設備數據分到同一個Kafka分區中,從而保證同一個設備的消息順序性。

表1:分布式節點配置情況

表2:OpenTSDB寫入性能測試

圖2:OpenTSDB時序數據查詢
(3)基于實時計算引擎Flink創建Kafka的消費者線程池,實時處理設備數據記錄并將其寫入OpenTSDB。
首先,配置Flink流式處理環境,設置Flink定期執行CheckPoint將數據持久化到內存中,設置周期設為1s,設置檢查點模式為Exactly-once,即有且僅有一次。同時,若執行CheckPoint時間超過60s,則丟棄該檢查點。
其次,通過Kafka Flink Connector API實現Flink消費Kafka處理,配置Kafka的相關信息,如:Zookeeper的集群、Kafka的broker集群以及Kafka消息者組。
然后,配置Kafka消息的
最后,添加配置的數據源作為Flink流式環境的source,執行DataStream流的map過程對每一條消息進行實時處理,如:異常判斷、實時統計等,最后分別調用OpenTSDB API執行數據寫入操作。
服務器端首先搭建部署Hadoop集群,包括HDFS、Yarn、Zookeeper、Kafka、HBase等組件的部署、監控管理。此外,還部署了Flink計算組件、MySQL主備節點等。Hadoop集群應用環境選用1個主節點和3個計算節點,各節點配置情況如表1所示。
在性能測試方面針對OpenTSDB的數據寫入性能進行實驗。實驗使用多線程寫入的方式模擬網關節點,并通過Flink計算寫入數據庫,模擬100臺設備,每臺設備監測數據為50個字段,其中30個字段為String型格式,20個為Double型格式,采樣頻率為每秒一次,網關節點每10s一次性寫入數據庫。持續模擬1個小時,共計60*60/10*50*100=180萬個數據點,記錄平均每批次(10s)數據寫入平均耗時并計算吞吐量如表2所示。
OpenTSDB提供了一個Web應用實現數據的簡單查詢和篩選,用戶可以根據監控項名稱和查詢時間范圍獲取到對應監控值的直方圖,通過監控項的tag可以對結果進行篩選。如圖2所示為監控設備PLC的電流變化曲線圖。然而,此界面所查詢的數據為秒級,對于毫秒級的時序數據雖然已經存儲在數據庫中,但仍需要通過Http API的方式進行查詢。
本文設計并實現基于分布式實時計算架構的生產設備數據分析平臺,對實時監控數據庫的存儲管理和實時分析計算進行優化。首先,構建基于Hadoop的數據集群平臺;然后,基于OpenTSDB實現對設備監控數據進行分布式、可擴展的時間序列數據管理;最后,基于Kafka和Flink對實時設備監測數據進行實時計算分析,實現高并發設備監控場景下的低延遲響應。實驗對OpenTSDB的寫入性能進行測試,結果表明本系統能夠有效進行設備監控系統的數據存儲和實時計算過程。