楊建
(江西工程學院 江西新余 338000)
現階段,MQTT協議主要有兩種類型,一種應用于TCP、IP 網絡,另一種應用于傳感器網絡,能夠提供有序、無損、雙向連接。目前,MQTT協議主要針對M2M、物聯網應用開發,在設計初期充分遵循簡單、輕量原則,減少對網絡帶寬與設備資源的實際需求,確保可靠性、保證性地交付。MQTT協議使用發布、訂閱消息模式,將有效信息分配給一個或多個接收者,在實現同應用程序解耦的過程中,屏蔽負載內容,現已廣泛應用于IoT、Mobile Internet、智能家居、穿戴和辦公等智能硬件、車載自組織網絡和電力能源等行業。
MQTT協議中存在發布者、代理者和訂閱者3種角色,發布者負責向代理者發布消息,代理者負責訂閱者轉發消息,其中,消息發布者、訂閱者均為客戶端,消息代理者為服務端。MQTT協議報文通常由Fixed header、Variable header 與Payload 共同組成,主要消息類型有11 種。MQTT 協議能夠提供質量截然不同的3 種消息傳遞服務。在QOS0 至多一次服務等級中,MQTT 協議消息按照基礎TCP、IP網絡交付,有可能發生丟失或者產生重復。這種消息傳遞服務質量等級可以應用在非重要情況下。在QOS1 至少一次服務等級中,MQTT 協議消息可以確保及時送達,有可能產生重復。在QOS2恰好一次服務等級中,MQTT 協議消息可以確保送達且僅被送達一次[1]。
在MQTT 協議消息系統的功能性需求上,主要針對一般用戶和系統用戶實現。通過對MQTT協議消息系統的設計,IoT物模型管理能夠對設備功能實施數字化定義,云端建構實體數據模型后,描述模型功能,便捷化管理設備。系統可以提供IoT設備狀態管理,通過上線、下線、離線監控,實時獲取設備運行狀態、健康狀態的數據參數,據此可以搭建設備IoT數據平臺,為診斷設備故障、預警夯實基礎。端到端的消息路由可以實現對數據的靈活控制,提高邊緣計算節點的安全性,便于數據在設備、函數應用與IoT Hub間流轉[2]。
在數據、監控狀態上報、匯聚到系統中的規則引擎后,根據預先設定規則轉發消息至主體設備、業務系統、App、數據庫、消息中間件等。系統安全模塊通過在設備連接階段進行設備身份認證,對非法連接請求進行拒絕,確保連接階段設備合法。與此同時,消息系統通過控制訂閱主題進行權限區分,防止因數據泄密、越級訪問產生網絡阻塞等。為保證MQTT協議消息系統的可用性始終處于高水平,系統需要在集群工作管理模式下,對外部提供一致性服務,確保集群工作過程中的消息發布和消息訂閱操作,能夠通過正確處理消息的方式,確保消息在各節點的共享。同時,系統可以通過IoT 設備調用服務、IoT 設備遠程完整性監控等通用信息服務能力,支持IoT中心所接收的實時傳感器數據進行可視化展示、IoT 場景數據采集存儲服務、第三方服務等業務相關應用,在系統信息監控過程中,對主題建模最佳數量、序列與角度、CPU使用率、連接總數、活躍數、最大并發數、客戶端間發布和訂閱信息,監控服務器實時運行狀態。
為保證MQTT 協議消息系統的實用價值,在非功能性需求上,需要在消息遲延時間、程序并發性和系統可擴展性幾方面進行對應約束[3]。MQTT 協議消息系統在傳遞IoT場景中設備消息方面發揮主要作用,傳遞消息的及時性,直接決定IoT設備是否能夠通過相關指令迅速進行對應操作,因而,需要MQTT協議消息系統在獲得消息后,通過端到端的消息路由實時分發消息,系統性能越好。在設計MQTT 協議消息系統過程中,需要側重考慮系統在程序上的并發性,一旦系統所接入的設備數量大規模增長,并發數逐漸增高,需要考慮并發連接支持高并發的連接數,以此保證消息系統的穩定程度。伴隨業務擴展、新需求到來,為便于MQTT協議消息系統在設計過程中有效實施擴展,需要通過集群方式對系統進行部署,開發新功能、提供統一接口。
Netty 是經由JBOSS 應用服務器所提供的Java 網絡開源應用框架,功能強大。Netty提供全新方式開發Web server、客戶端過程,該方式易用性、擴展性顯著增強,在復雜的內部環境基礎上,允許更高的吞吐量。一以貫之,Netty 能夠提供異步通信方式、事件驅動策略的Web 應用程序框架與工具,快速實現Web server、客戶端管理程序的高性能、高可靠性開發。Netty使用傳輸控制協議,為多客戶端提供消息查詢服務,促使該過程相對簡單。現階段,Netty 的核心組件主要包括由java.nio.channels包定義的Channel通道、Callback回調、異步模型Future、可擴展的事件模型和Netty、代碼的主要擴展與定制點Channel Handler[4]。其中,Channel 通道通常代表一個實體的開放性連接,異步模型Future提供在操作完成過程中通知應用程序的方式,其能夠在未來某一時刻完成,提供對結果的訪問,Channel Handler是一個父接口,本身并未提供過多方法。
現階段,MQTT協議消息系統是IoT重要的傳輸協議,應用廣泛,負責進行設備接入和采集數據,需要根據業務實際需求將獲取的數據存儲至Mysql 數據庫、統一上報到相關業務系統等。在搭建MQTT協議消息系統過程中主要以集群方式實現,通過集群的方式可以解決基于MQTT協議的Rabbit MQ消息收發,實現遠程收發、AMQP與MQTT間的收發。單個節點能夠提供完整服務,可以支撐MQTT協議消息的傳遞,多節點協調對外提供一致性服務。
每單個節點的MQTT 代理服務端在搭建過程中,使用Netty 技術建構Boss Event Loop 線程模型,用以處理客戶端連接請求,建構Work Event Loop線程模型,用以處理客戶端的讀寫請求。消息入站后進行字節解碼,消息出站前將MQTT 客戶端對象編譯成為字節[5]。Engine x 是高性能Web 服務器,其作為云上數據庫的存儲引擎,可以通過數據流代理Stream 模塊實現協議的代理轉發tcp報文,主要用于數據流代理和負載均衡等。分布式應用程序協調服務軟件zookeeper 作為服務集群,MQTT 協議消息系統的總體設計主要由傳感器、車載裝置、智慧工廠等物模型與各個主題模塊、設備運行狀態管理模塊、消息路由模塊、安全模塊(SE)、集群模塊、監控模塊組成。
服務器終端能夠在消息系統中接受、發送、解析和計算IoT設備控制命令與數據指令,全監控IoT設備與數據存儲設備信息,主要在下述幾個模塊進行。
模塊一,傳感器、車載裝置、智慧工廠等物模型與各個主題模塊。消息系統通過TSL 語言描述物模型,采取JSON格式對物模型數據進行類似格式上報,在主題實現過程中將IoT 設備主題劃分為上行、下行兩類數據。
模塊二,設備運行狀態管理模塊。該模塊是后續消息發布和訂閱操作實現的基礎,需要在連接設備時通過發送報文連接消息系統。消息系統通過定義Session信息實現持久和非持久會話的存儲[6]。一旦設備成功與消息系統建立連接,通過維護TCP長連接,檢測設備運行狀態,及時將不夠活躍的連接斷開,充分將服務器內存資源釋放出來。基于MQTT 協議規范,始終保持連接的設備需要主動向服務節點定時傳送PING 包REQ心跳請求,系統接收請求后及時進行將心跳響應(PINGRESP)返回至客戶端,進而保持設備連接狀態,成功測試網絡處于通順狀態。為實現設備心跳,需要使用Netty 技術實現連接讀寫事件、空閑事件檢查,根據設備心跳時間觸發Netty核心組件,找到設備對應連接并予以關閉,釋放服務器資源。
模塊三,消息路由模塊。在實現即時消息推送過程中,通過成功連接設備端、系統,首先發送報文請求,進行協議處理,提取消息發布主題、消息傳遞服務質量級別、消息內容,進行封裝后調用發布權限,驗證設備是否具備。在消息推送前需要進行主題訂閱匹配查詢,找出各層級設備訂閱列表和訂閱該主題消息的設備,寫入消息到設備對應消息隊列后,將消息推送給指定客戶端。在實現離線消息推送過程中,需要結合MQTT協議規范存儲設備離線消息。在消息推送后進行主題訂閱匹配查詢,通過設備狀態查詢模塊查找設備在線情況,便于系統通過離線數據寫入模塊,將設備離線消息寫入數據庫[7]。一旦系統初始化開始,為保證離線消息毫秒級查詢,需要在處理邏輯中調用generate Row Key 生成消息的行鍵。最終,將消息存儲在列表里批量提交,通過后臺程序及時刪除推送過的消息。在橋接數據到Kafka開源流處理平臺時,需要根據預設規則將主題消息篩選,通過寫入對應主題,促使其他業務訂閱平臺主題消費消息。
模塊四,安全模塊(SE)。使用“一型號一密鑰”的方式驗證設備注冊情況,使用一機器一密鑰的方式驗證設備認證情況。設備連接時,通過提取clientid、username和password這3個參數解析連接報文,向設備身份驗證服務發送請求。使用標識、IP、用戶名、密碼和主題為參數發起請求權限,處理ACL HTTP請求。
模塊五,集群模塊。集群通常由節點構成,節點不規則分布于JVM、物理機之上,一旦集群系統被觸發,節點將自動加入集群,節點狀態變化通過gossip 通信協議傳輸至集群每個節點,確保集群狀態一致性[8]。因MQTT 協議消息系統通過去中心化集群方式搭建,存在多個服務節點,IoT 設備發布消息后,節點需要精準轉發消息,確保消息路由推送消息的正確與完整程度。
模塊六,監控模塊。該模塊通過變量方式存儲各節點設備連接數、主題訂閱數、客戶端等信息,在數據庫中,一旦需要查看相應信息,可以依靠腳本及時獲取信息并定時更新。
在通信過程,需要設備和服務器間的通信協議、服務器與客戶端間利用AMQPAMQP傳輸的通信協議、服務器與移動通信終端間利用MQTT協議傳輸的通信協議[9]。實現MQTT協議消息系統代理服務器工作,需要科學管理已經訂閱的消息,通過多終端轉發消息,查詢存儲信息,同時,通過設置用戶權限實現“1&X”服務關系。在多終端服務端口,用戶可以通過ID查詢訂閱成功的MQTT 主題,通過多終端對設備實施遠程控制。
Netty 的MQTT 協議消息系統的設計與實現,使用客戶端工具進行消息系統的功能測試。在測試過程中,客戶端工具通過創建多樣化連接設置的客戶端滿足測試需求,選用MQTT Box 作為主測試工具,驗證MQTT協議消息系統是否滿足系統功能需求。在功能測試過程中,主要對IoT 設備身份認證ID2、發布消息、消息訂閱主題(Subscribe)、接收推送消息、自定義心跳包結構體、消息重投功能(republish)進行[10]。
針對發布消息、消息訂閱主題、接收推送消息、自定義心跳包結構體、消息重投功能的測試通常由用例編號、用例名稱、用例標題、測試目的、測試步驟(預置條件、測試步驟)、預期結果、測試結果等共同組成。選用MQTT.fx,使用Java 語言編寫的客戶端工具,針對MQTT 客戶端進行Topic 訂閱和消息發布,同時啟動客戶端A 和客戶端B,A 用于消息發布,B 用于Topic 訂閱,測試MQTT 協議消息系統是否支持QOS2 恰好一次服務等級質量[11]。
在單節點性能測試上,可以通過兩臺機器執行程序模擬,多次執行命令設置連接數,通過Web監控頁面連接成功率,5 000 連接數的CPU 占用率為10.2%,10 000連接數的CPU占用率為18.5%,20 000連接數的CPU 占用率為28.3%,30 000 連接數的CPU 占用率為41.6%,38 000 連接數的CPU 占用率為55.1%,結果均為成功。可見連接數在38 000內,設備連接較為穩定。在集群性能測試過程中,通過Jmeter 測試軟件模擬設置并發連接數及相關參數、點擊執行,生成測試報告。MQTT協議消息系統在集群環境下可以處理更多設備連接數,節點集群處理100 000 連接非常穩定,CPU 負載較低,消息時延在1 s內,滿足對系統需求。
綜上所述,Netty 提供API 接口從網絡處理代碼中解耦核心業務邏輯與輔助功能,完全基于Java NIO(無阻塞的輸入/輸出)中的Buffer 實現內部可擴展性解決方案,在NIO通道進行交互的過程中,將數據移進移出通道,架構編程服務器與客戶端框架結構,實現服務端、客戶端要求的功能。以Netty 技術為支撐的MQTT協議消息系統屬于IoT服務運營平臺組成部分之一,與IoT設備管理子系統同處于IoT服務運營平臺接入層底部,作為平臺重要的基礎,MQTT協議消息系統可以為平臺上層應用提供強大的數據支撐,與接入層的規則引擎、IoT 應用微服務、數據存儲管理系統通過HTTP API 網關、RPC 服務器等實現服務間的協同。測試結果表明,MQTT協議消息系統能夠滿足預期設計目標,可利用在多種物聯網系統中。