岳敏
(重慶遠通電子技術開發有限公司,重慶 400000)
在供水與排水網中,有大量的無人值守泵站,7×24小時工作。為了保證這些泵站的正常工作,建立了聯網監控系統。這些系統主要采用比較傳統的自動化技術構建,包括使用組態軟件作為現場數據采集的核心軟件、使用Modbus 作為現場數據采集及向中心站的數據傳輸協議、使用關系型數據庫存儲工況數據,如圖1 所示。

圖1 以組態軟件為核心的監控平臺架構
在應用中,該方案雖然解決了無人泵站的遠方監控問題,但也表現出較多的問題,主要表現在以下方面。
(1)軟件架構方面。組態軟件因架構限制,阻礙了監控系統的智能化拓展。因應用需求,需在基礎監控之上進一步開發泵站能耗分析、故障診斷等智能應用。但組態軟件架構比較封閉,提供的開放集成接口較少,既不能有效提供數據、分析技術等方面的基礎支持,也不能方便地將智能應用集成進組態應用中,已不能滿足智能應用開發需求,從整體軟件架構上,阻礙了整個系統的智能化拓展。
(2)網絡傳輸協議方面。Modbus 協議在安全、功能、效率等多方面已不能滿足需求。Modbus 作為應用最廣泛的工控網絡協議,在現場設備數據采集方面具有最好的兼容性,具有不可替代的價值。但其協議幾乎不具備安全機制,因此,在監控站向中心站的廣域數據傳輸方面有嚴重的安全隱患。同時在新開發計劃中,擬進一步建立分布式的設備群事件機制,開發泵站的全生命周期管理應用,Modbus 的輪詢模式從功能性上來說無法支持這些新功能的開發,并在監控站數量不斷增加的情況下,出現了明顯的效率瓶頸。
為解決以上問題,以面向大量無人值守泵站群為目標,采用新的物聯網技術及IT 技術,設計實現了新的無人值守泵站群物聯網平臺。以下將從平臺架構、協議設計、應用接口等方面對新的設計進行詳述。
以數據流處理為基本設計思想,采用MQTT 為通信協議,各單元間以標準的集成方式進行集成的新架構,如圖2 所示。

圖2 以數據流處理為核心的系統架構
為解決Modbus 協議帶來的諸多架構局限,采用了以MQTT 為通信協議的通信架構。MQTT 是目前物聯網系統中應用最廣的協議,訂閱/發布是MQTT 的基本通訊模式。在該模式下,云中心通過訂閱設備端的相關消息,實現設備端向云中心的數據發送;同時,設備端通過訂閱云中心的相關消息,實現接收云中心的控制命令。訂閱/發布模式在大量遠方設備情況下,相對輪詢模式具有明顯的性能優勢,節約大量的網絡資源。
平臺架構以處理數據流作為基本思想。數據流的主要特征是數據處理單元將數據看作無邊界的數據集合,在數據傳輸的過程中即完成數據轉發、過濾、分析、轉存等操作,相對于將數據按時間間隔存儲后再進行處理的批處理系統,數據流系統具有處理延遲小、計算負荷分配均勻(從而更適合規模擴展)等特點。Apache Storm、Apache Flink 等流處理系統的廣泛應用,已證明了數據流處理系統在數據密集應用中的優越性和實用性。
在泵站監控中,主要包括三種數據流:(1)設備數據流:即設備的工況數據,是典型時間序列數據;(2)事件數據流:即監控系統中常見的報警事件、越限事件等事件數據。有效設計、處理這類數據,形成事件驅動體系,是提升系統智能性的關鍵之一;(3)控制數據流:主要為云中心對監控站的控制信號,也包括中心對現場系統的運維信號,例如,設備升級數據包、遠方提取日志命令等。
針對不同數據流的結構特征差異,平臺使用了混合的數據存儲方案:對于工況數據這類時間序列數據,使用時間序列數據庫存儲,例如,實際采用了TDEngine;事件數據因具有比較明確的結構特征,仍采用關系型數據庫進行存儲,例如,采用了PostgreSQL。
在監控站端使用LF EDGE ekuiper 作為流處理核心。LF EDGE 是Linux 基金會發起的開源組織,旨在建立獨立于硬件、芯片、云或操作系統的一個開放的、可互操作的邊緣計算框架。作為LF EDGE 下孵化的項目,ekuiper 是一款輕量級物聯網邊緣分析、流式處理開源軟件,可以運行在各類資源受限的邊緣設備上。eKuiper 參考了Apache Spark、Apahce Storm 等云端流式處理項目的架構與實現,結合邊緣流式數據處理的特點,采用了編寫基于源(Source),SQL(業務邏輯處理),目標(Sink)的規則引擎來實現邊緣端的流式數據處理。ekuiper 的基本架構如圖3 所示。

圖3 ekuiper 的基本架構
監控站的數采單元通過modbus 協議采集二次供水泵站PLC 和其他傳感器的數據,并將數據轉換為MQTT 數據包,發布到監控站的MQTT broker(使用Mosquitto)。ekuiper 通過向Mosquitto 訂閱這些數據,獲得Mosquitto 轉發的設備數據流。根據規則設定,ekuiper 將一部分需要在云中心即時顯示的數據,直接通過MQTT 轉發布到云中心的MQTT Broker,例如,泵站實時出水壓力、實時功率等;將不需要在云中心實時展示的數據,例如,變頻器頻率、電機三相電流等,存儲在本地,供本地智能應用分析使用。
監控站各單元產生的事件,也通過ekuiper 匯總后發送給云中心的MQTT Broker,構成事件數據流。ekuiper 通過MQTT 接收云中心發送來的控制數據流,并根據規則轉發給特定單元,執行相應控制命令。
基于MQTT 的消息命名機制,實現了對不同數據流的分類標識,具體設計將在第3 節中詳述。云中心接收到監控端的數據流后,流處理引擎根據分流標識對數據進行分流:工況數據流將存入時間序列數據庫;事件數據流將存入關系數據庫。
一方面,監控應用需求不斷擴展,為因應需求變化,新增或更新相應智能應用,應是平臺在生命周期內的常態化要求。另一方面,IT 技術發展迅速,如何將最適合的IT 技術不斷引入平臺內賦能,是不斷拓展平臺功能,提升平臺性能的重要保證。因此,保證單元接口的標準化是平臺設計的重要原則。
通過對技術體系現狀的分析,確定了兩個層面的集成接口機制:
(1) 基 于Restful API 的 服 務 集 成。Restful API 是目前使用最為廣泛的服務集成方式。通過使用Restful API 能夠實現微服務架構,解耦服務間的緊耦合,提升平臺的可擴展性。平臺集成的幾個核心模塊,例如,ekuiper、TDengine 均提供了Restful API,以實現對其功能的調用。在本平臺中,目前主要在兩種場景中使用Restful API 集成。第一種是在集成調用以上核心模塊的功能時。第二種是在作為控制單元的控制臺(包括監控站與云中心的控制臺)對外提供對系統的操作、配置、展示等功能時,均以Restful API 方式提供。通過該方式可實現控制臺核心控制邏輯與使用方式的解耦,例如,當實現控制臺界面時,可以靈活選擇各種不同的前端技術(例如基于瀏覽器的Web UI,或者基于桌面的UI),還可以通過腳本實現對系統的自動化運維。
(2)基于ZeroMQ 的操作系統內進程間通信。當實現同一操作系統內不同智能應用間的交互時,例如,故障診斷應用調用能耗分析應用的計算結果作為其故障診斷的依據時,采用基于ZeroMQ 的操作系統內進程間通信方式可獲得更高的性能。ZeroMQ 是一種基于消息隊列的多線程底層網絡庫,支持線程間,進程間、機器間的消息通訊,提供線程安全的同步接口。
在2.2 節中已提到將數據流根據其性質進行分流是平臺的核心思想。在具體實現上,主要依賴對MQTT 數據包的主題(Topic)設計,以及流處理引擎對主題的解析進而進行對不同數據流的分流。每個MQTT 數據包包括主題與數據體(Payload)兩部分。MQTT 規范除了規定#與*兩個通配符外,并未對主題的設計做出其他規定。因此,利用這種靈活性,設計了如下的主題結構:
應用名稱/設備名稱/數據流類型/…
應用名稱是指整個平臺的名稱,例如,對于二次供水監控平臺,采用拼音首字母命名模式,命名為“eg”。設備名稱指具體一臺二次供水設備的編號,例如,可命名為“d101”,其中101 是設備編號,在前面加入字母是為未來在該應用平臺中加入其他類型設備預留空間。數據流類型即為以上所述的數據流類型,包括設備數據流(d)、事件數據流(e)、控制數據流(c)。
基于以上規則,對于編號為101 的二次供水泵站的MQTT 數據包主題可命名如下:
(1)工況數據,“eg/d101/d”,具體的數據格式在數據體中采用JSON 格式表達。
(2)事件數據,“eg/d101/e/event_name”,其中“event_name”為具體的事件名稱,例如,“online”表示設備上線事件;“offline”表示設備下線事件,跟事件相關的數據包含在數據體中,例如,“eg/d101/e/online”事件的數據體中可包括系統的開機自檢報告。
(3)控制數據,“eg/d101/c/command_name”,其中“command_name”為具體的控制命令,例如,“reboot”表示設備重啟命令,收到“eg/d101/c/reboot”命令后,101 號設備應自啟。
從以上可見,該命名方式具有極大的可擴展空間,能夠勝任平臺未來不斷擴展的需要。流處理引擎在接收到MQTT 消息后,將首先對主題按照命名規則進行分析,分析出數據包的來源設備,數據流的類型,并根據不同類型數據流的特定處理方式對數據進行處理,實現數據流的分流。
目前,該平臺已接入超過100 臺二次供水設備。在實現基礎的遠程監控功能基礎上,進一步支撐了智能應用的快速開發和可靠運行,包括實現了發明專利(CN202010099925.1)《一種邊云協同的供水設備能耗監測與能效評價系統和方法》中的二次供水設備能效評價算法,為有效降低二次供水設備能耗奠定了數據基礎。未來,平臺將進一步加大二次供水設備接入量,并通過核心單元集群配置、云原生部署等方式提升平臺的可靠性和管理效率。
