












摘" 要:為了更好地保障省級電力物聯(lián)網(wǎng)終端設(shè)備的批量化接入,提升各類設(shè)備MQTT長連接通信的穩(wěn)定性和數(shù)據(jù)處理的時效性,文章對現(xiàn)有設(shè)備連接接入和數(shù)據(jù)處理方式進(jìn)行了優(yōu)化改進(jìn)。首先,基于無鎖并發(fā)實現(xiàn)了高性能的數(shù)據(jù)處理,提升了執(zhí)行程序的健壯性,從而避免了數(shù)據(jù)阻塞導(dǎo)致MQTT連接超時中斷問題。其次,將應(yīng)用間分層處理改進(jìn)為應(yīng)用內(nèi)部線程解耦,利用事件源驅(qū)動機(jī)制實現(xiàn)并行處理,從而降低了系統(tǒng)復(fù)雜度,減少了層級依賴,保證了設(shè)備連接接入功能的輕量級、低功耗和高可靠性。最后,通過對不同MQTT消息報文大小進(jìn)行分類測試,驗證了改進(jìn)后的程序性能具備高吞吐、低延時的特點,大幅降低了鏈路通信的延遲和數(shù)據(jù)阻塞,整體效率較現(xiàn)有方案提升了3倍以上,具有一定的工程應(yīng)用性。
關(guān)鍵詞:無鎖并發(fā);MQTT長連接;數(shù)據(jù)處理;配電物聯(lián)網(wǎng);事件驅(qū)動
中圖分類號:TN929.5" " 文獻(xiàn)標(biāo)識碼:A" 文章編號:2096-4706(2025)01-0193-06
Optimized IoT MQTT Long Connection Communication and Data Processing Improvement Based on Lock-free Concurrency
Abstract: In order to better guarantee the batch access of provincial electric power Internet of Things terminal devices and improve the stability of MQTT long connection communication and the timeliness of data processing for various types of devices, the paper optimizes and improves the existing methods for device connection access and data processing. Firstly, high-performance data processing is realized based on lock-free concurrency, which improves the robustness of the execution program, thus avoiding the problem of MQTT connection timeout interruption caused by data blocking. Secondly, the hierarchical processing between applications is improved to intra-application thread decoupling, and the parallel processing is realized by using the event source driving mechanism, thus reducing the system complexity, decreasing the hierarchical dependency, and ensuring the lightweight, low power consumption, and high-reliability features of the device connection access function. Finally, through the classification test of different MQTT message sizes, it is verified that the improved program performance has the characteristics of high throughput and low latency. It greatly reduces the delay and data blocking of link communication, and the overall efficiency is improved by more than 3 times compared with the existing scheme, which has certain engineering applicability.
Keywords: lock-free concurrency; MQTT long connection; data processing; distribution power Internet of Things; event-driven
0" 引" 言
隨著國家電網(wǎng)有限公司提出的泛在電力物聯(lián)網(wǎng)建設(shè)和落地實踐,電力物聯(lián)終端的功能需求的不斷發(fā)展,消息隊列遙測傳輸協(xié)議(Message Queuing Telemetry Transport Protocol, MQTT)也廣泛應(yīng)用到電力物聯(lián)通信之中[1-2]。其次配電智慧物聯(lián)體系中的“運管邊端”四層架構(gòu)也主要基于MQTT通信協(xié)議,實現(xiàn)各類業(yè)務(wù)智能終端的統(tǒng)一接入和統(tǒng)一管控,實現(xiàn)邊緣智能[3]。以江蘇省級統(tǒng)一物聯(lián)管理平臺為例,在其規(guī)模化應(yīng)用中,開展了輸配電數(shù)字化提升,推動了輸電、配電終端規(guī)模化接入,開展用電、變電、安監(jiān)等各類場景試點。目前累計接入邊設(shè)備30萬臺,端設(shè)備300萬臺,日均處理130億條、9 TB采集數(shù)據(jù)。為了更高效促進(jìn)全場景感知資源,實現(xiàn)數(shù)據(jù)源端共享,支撐能源互聯(lián)網(wǎng)實現(xiàn)全環(huán)節(jié)實時感知與交互,各類終端設(shè)備的并發(fā)接入、異端通信和海量數(shù)據(jù)處理的時效性成了關(guān)鍵問題。
目前,行業(yè)內(nèi)針對電力物聯(lián)設(shè)備接入、通信連接以及數(shù)據(jù)處理的方式大同小異,主要通過連接單元和終端建立MQTT長連接,數(shù)據(jù)處理通過服務(wù)異步轉(zhuǎn)發(fā)實現(xiàn)。雖然采用微服務(wù)化設(shè)計,支持水平擴(kuò)展,但省級各地市終端設(shè)備接入量日益增多、報文格式的變化多樣以及網(wǎng)絡(luò)通信不穩(wěn)定等問題,當(dāng)前實現(xiàn)方式不足以保障系統(tǒng)平臺對設(shè)備持續(xù)接入過程的可靠支撐,而且現(xiàn)有的擴(kuò)展方式極大地增加了開發(fā)和運維成本。
為了解決現(xiàn)存問題,本文針對MQTT長連接通信和數(shù)據(jù)處理兩方面進(jìn)行了改進(jìn)優(yōu)化。主要基于無鎖并發(fā)的方式解決了數(shù)據(jù)傳輸?shù)难舆t和阻塞;利用內(nèi)存預(yù)分配機(jī)制保證了高效的數(shù)據(jù)處理;通過事件源驅(qū)動模式解耦線程,提升了并行處理能力。因此,改進(jìn)后的連接單元在長連接維持與數(shù)據(jù)處理方面有了顯著改善,能夠更好地保障了平臺接入能力的穩(wěn)定性和可靠性,確保了平臺具備海量數(shù)據(jù)處理的時效性和并發(fā)性。可進(jìn)一步為傳統(tǒng)電網(wǎng)業(yè)務(wù)賦能,提升電網(wǎng)數(shù)據(jù)通信和感知的互動能力,提升運行質(zhì)效和社會綜合能效。
1" MQTT通信連接及過程特點
在當(dāng)前智慧配電物管平臺的系統(tǒng)架構(gòu)中,采用預(yù)分配和動態(tài)負(fù)載機(jī)制實現(xiàn)海量MQTT長連接和數(shù)據(jù)轉(zhuǎn)發(fā),主要分為三層結(jié)構(gòu)實現(xiàn),如圖1所示。連接層和轉(zhuǎn)發(fā)層是實現(xiàn)海量設(shè)備接入和數(shù)據(jù)轉(zhuǎn)發(fā)的兩個核心層,其中連接層主要由MQTT連接單元組成,實現(xiàn)長連接接入管理;轉(zhuǎn)發(fā)層主要由轉(zhuǎn)發(fā)微服務(wù)實例構(gòu)成,實現(xiàn)對接收到的MQTT報文數(shù)據(jù)進(jìn)行處理轉(zhuǎn)發(fā)。
雖然每層中組件都是無狀態(tài),支持動態(tài)水平擴(kuò)展和自動感知負(fù)載,但隨著各類終端設(shè)備連接接入日益增多,系統(tǒng)平臺對連接單元的海量數(shù)據(jù)處理和MQTT數(shù)據(jù)轉(zhuǎn)發(fā)的能力要求也隨之提升,現(xiàn)有的實現(xiàn)方式也逐漸出現(xiàn)穩(wěn)定性和并發(fā)性兩方面的瓶頸。現(xiàn)在主要是通過連接單元建立長連接和終端設(shè)備進(jìn)行數(shù)據(jù)通信傳輸,使用數(shù)據(jù)轉(zhuǎn)發(fā)微服務(wù)實例對MQTT消息進(jìn)行處理轉(zhuǎn)發(fā)至業(yè)務(wù)中間件,為應(yīng)用之間數(shù)據(jù)解耦。結(jié)合上述問題,本文基于無鎖并行對現(xiàn)有長連接數(shù)據(jù)接入和處理轉(zhuǎn)發(fā)功能進(jìn)行改進(jìn),消除分層處理的效率延遲,通過實現(xiàn)程序內(nèi)部線程解耦,提升海量連接和數(shù)據(jù)處理轉(zhuǎn)發(fā)的性能。
1.1" MQTT連接通信
由于電力物聯(lián)網(wǎng)中的各類終端設(shè)備之間的通信環(huán)境往往受限,為了保證設(shè)備之間異端通信的可靠性和時效性,通常采用消息隊列遙測傳輸協(xié)議MQTT作為通信的基礎(chǔ)協(xié)議。MQTT協(xié)議是基于主題的消息生產(chǎn)和訂閱消費模式[4],生產(chǎn)者和訂閱者之間通過代理中間服務(wù)器進(jìn)行解耦通信。電力物聯(lián)網(wǎng)中各類設(shè)備通過不同網(wǎng)絡(luò)與MQTT服務(wù)器建立連接并進(jìn)行通信交互,通信模型如圖2所示。主要組成包括:1)MQTT服務(wù)器負(fù)責(zé)管理消息的代理服務(wù)器;2)消息生產(chǎn)者向MQTT服務(wù)器發(fā)送消息數(shù)據(jù);3)消息訂閱者通過MQTT服務(wù)器接收主題消費生產(chǎn)者推送的消息。
MQTT協(xié)議之所以能夠為設(shè)備通信提供穩(wěn)定可靠的通信交互,是因為MQTT基于TCP/IP之上實現(xiàn),具備TCP的持久連接性,在數(shù)據(jù)傳輸完成后也能保持連接狀態(tài),所以在不穩(wěn)定的網(wǎng)絡(luò)環(huán)境下也無須重建連接,能夠快速響應(yīng)通信狀態(tài)。MQTT通信是在客戶端與代理服務(wù)器之間建立連接,通常客戶端向代理服務(wù)器發(fā)送啟動連接指令,代理服務(wù)器收到后做出狀態(tài)響應(yīng)建立連接。兩者之間成功建立連接后,只要客戶端不主動發(fā)送斷開指令,它們就會一直保持長連接狀態(tài)。其次基于TCP確認(rèn)應(yīng)答和重傳機(jī)制[5],MQTT通過進(jìn)行報文控制和劃分服務(wù)質(zhì)量(QoS)等級[6]來明確消息傳送次數(shù)、是否丟失以及重復(fù)發(fā)送等問題,從而確保消息傳輸?shù)目煽啃裕鐖D3所示。QoS等級從低到高不但表示消息可靠性的提高,也表示數(shù)據(jù)傳輸復(fù)雜度的提升,越大QoS對于收發(fā)系統(tǒng)消耗也越大,因此在應(yīng)用中會根據(jù)實際的通信網(wǎng)絡(luò)情況和消息的重要程度對不同MQTT消息單獨設(shè)置QoS值。
1.2" 無鎖并發(fā)處理
傳統(tǒng)程序設(shè)計中,為了提升代碼的邏輯處理性能和響應(yīng)速度,一般通過使用線程池進(jìn)行多線程處理。而線程池本質(zhì)是利用阻塞隊列,底層會使用鎖機(jī)制,因此隊列中會出現(xiàn)線程阻塞,從而吞吐量受限。Disruptor作為一種高性能異步處理模型[7],能夠?qū)崿F(xiàn)線程間無鎖并發(fā)。Disruptor基于無鎖機(jī)制,底層通過CAS和自旋實現(xiàn)[8],結(jié)合事件驅(qū)動模式,高效實現(xiàn)隊列并發(fā)操作。Disruptor數(shù)據(jù)緩存是通過環(huán)形數(shù)組隊列RingBuffer實現(xiàn),通過單線程寫和內(nèi)存屏障阻止指令重排的方式實現(xiàn)無鎖[9]。RingBuffer主要用于不同線程之間數(shù)據(jù)傳遞的緩存,通過序號指向數(shù)組環(huán)中的下一個元素,如圖4所示。
數(shù)組環(huán)通過預(yù)加載方式,初始化內(nèi)存中一段連續(xù)的緩存,通過預(yù)分配緩存對象保證處理速度更快。數(shù)組環(huán)中的數(shù)據(jù)只會被覆蓋不會被清除,這樣降低了垃圾回收機(jī)制啟動的頻率,從而保證執(zhí)行效率,解決了數(shù)據(jù)延遲問題。Disruptor核心是Sequence序列,一方面通過順序遞增的序號來編號,管理進(jìn)行交換的事件數(shù)據(jù),對于事件數(shù)據(jù)的處理會根據(jù)序號逐個遞增,從而保證了多線程并發(fā)處理的原子性;另一方面可以防止不同Sequence之間緩存?zhèn)喂蚕韀10]。當(dāng)多線程中互相獨立的變量分配到同一個緩存行中時,若線程修改變量就會影響彼此性能,Disruptor通過填充緩存行來消除這種偽共享問題。如圖5所示,緩存行大小為64個字節(jié),通過一個Long型序號占8個字節(jié),在序號前后均插入7個Long型值,這樣確保每個序號都獨占一個緩存行。
2" 改進(jìn)后的MQTT連接處理模型
在智慧配電物聯(lián)管理平臺現(xiàn)有的MQTT連接單元中,終端與MQTT服務(wù)器建立連接后,將不同數(shù)據(jù)全部發(fā)送到一個主題中,再由數(shù)據(jù)轉(zhuǎn)發(fā)微服務(wù)進(jìn)行訂閱監(jiān)聽主題中的數(shù)據(jù),通過復(fù)雜的邏輯判斷和業(yè)務(wù)處理后在轉(zhuǎn)發(fā)至業(yè)務(wù)Kafka中,如圖6所示。
隨著設(shè)備類型的日益增多,業(yè)務(wù)復(fù)雜性也凸顯煩瑣,數(shù)據(jù)處理性能出現(xiàn)瓶頸,從而導(dǎo)致長連接通道阻塞和終端超時斷連。現(xiàn)有方式只能通過擴(kuò)展集群規(guī)模來相對維持正常業(yè)務(wù)處理,但又帶來了較高的運維成本。因此,本文基于無鎖并發(fā)模型對現(xiàn)有的MQTT連接單元和數(shù)據(jù)轉(zhuǎn)發(fā)處理過程的改進(jìn),如圖7所示。
利用Disruptor異步處理模型實現(xiàn)應(yīng)用內(nèi)部線程解耦,消除應(yīng)用間分層異步,減少應(yīng)用間分層數(shù)據(jù)處理,高效提升程序并發(fā)處理性能。當(dāng)各類終端設(shè)備通過網(wǎng)絡(luò)和MQTT服務(wù)器建立長連接后,通過不同業(yè)務(wù)主題進(jìn)行數(shù)據(jù)交互,部分類型如表1所示。不同業(yè)務(wù)主題會單獨對應(yīng)一個Disruptor隊列進(jìn)行處理,而Disruptor內(nèi)部通過多生產(chǎn)多消費模式進(jìn)行線程異步解耦,直接將原始數(shù)據(jù)解析處理后轉(zhuǎn)發(fā)至應(yīng)用Kaka,極大提升了處理性能,避免了連接阻塞和延遲。
無鎖并發(fā)的改造是將業(yè)務(wù)邏輯處理在內(nèi)存中進(jìn)行,通過事件源驅(qū)動的方式實現(xiàn),核心基于Disruptor生產(chǎn)消費模型構(gòu)建,主要包括事件消息載體對象、事件監(jiān)聽消費對象、數(shù)據(jù)存儲容器和消息生產(chǎn)對象。其中事件消息載體對象就是傳輸?shù)氖录ⅲ枰ㄟ^事件工廠方法進(jìn)行創(chuàng)建;事件監(jiān)聽消費對象是對容器中的事件對象進(jìn)行消費處理,充當(dāng)數(shù)據(jù)的消費者實現(xiàn)主要的業(yè)務(wù)邏輯;數(shù)據(jù)存儲容器是一個環(huán)形數(shù)組容器,用于事件消息的存儲;消息生產(chǎn)對象就是事件消息的生產(chǎn)者,將消息發(fā)布傳輸至存儲容器中供消費者進(jìn)行監(jiān)聽消費。Disruptor對象實例化及模型構(gòu)建的流程如圖8所示。
其核心代碼如下:
// 實例化模型對象
Disruptorlt;IotMsgDatagt; disruptor = new Disruptorlt;IotMsgDatagt;(
iotMsgDataFactory, // 事件對象工廠
ringBufferSize, // 有界環(huán)形數(shù)組長度
executor, // 線程池
ProducerType.SINGLE, // 生產(chǎn)者類型
new BlockingWaitStrategy()); // 等待策略
// 添加事件消費者
disruptor.handleEventsWith(new IotMsgDataHandler());
// 啟動Disruptor
disruptor.start();
// 獲取RingBuffer緩存容器
RingBufferlt;IotMsgDatagt; ringBuffer = disruptor.getRingBuffer();
// 實例化生產(chǎn)者
IotMsgDataProducer producer = new IotMsgDataProducer(ringBuffer);
// 發(fā)送數(shù)據(jù)至緩存容器
producer.sendData(data);
實現(xiàn)消息生產(chǎn)方法的創(chuàng)建首先需要獲取實際存儲數(shù)據(jù)的容器,從RingBuffer里面獲取一個可用序號,根據(jù)序號得到數(shù)據(jù)元素,從而進(jìn)行業(yè)務(wù)邏輯操作,最終提交發(fā)布該事件至數(shù)組容器中。生產(chǎn)者往環(huán)形數(shù)組容器內(nèi)發(fā)布數(shù)據(jù)的核心代碼如下:
public void sendData(IotMsgData data) {
// 1 從環(huán)形數(shù)組中獲取可用序號
long sequence = ringBuffer.next();
try {
// 2 獲取事件對象
IotMsgData event = ringBuffer.get(sequence);
// 3 對象賦值
event.setValue(IotMsgData.getLong(0));
} finally {
// 4 發(fā)布投遞
ringBuffer.publish(sequence);
}
}
3" 實驗結(jié)果分析
為了驗證基于無鎖并發(fā)改進(jìn)的MQTT連接單元的處理性能和實時性,在服務(wù)器上搭建了壓測環(huán)境,其中包括改進(jìn)后的單實例程序,MQTT Broker作為MQTT消息服務(wù)器端,模擬網(wǎng)關(guān)設(shè)備發(fā)送模擬數(shù)據(jù),且數(shù)據(jù)報文遵循電力物聯(lián)網(wǎng)云邊交互規(guī)范,序列化之后的數(shù)據(jù)格式如圖9所示。
模擬數(shù)據(jù)按照MQTT協(xié)議服務(wù)質(zhì)量Qos0進(jìn)行構(gòu)造,每條報文消息分別按照1 KB、3 KB、5 KB、7 KB和10 KB大小分類模擬,模擬數(shù)據(jù)量均300萬條,在不同報文大小的情況下進(jìn)行3輪測試結(jié)果如表2所示。可以看出,改造前的程序處理效率會受到報文大小的影響,報文數(shù)據(jù)量越大效率越低,而改造后的處理效率受報文大小影響較小,整體性能比之前提升了3倍以上。
為了進(jìn)一步優(yōu)化改造后的程序性能,在Disruptor對象構(gòu)建時,分別選用阻塞等待策略和自旋等待策略進(jìn)行對比分析。由于無鎖并發(fā)的實現(xiàn)方式是基于內(nèi)存,所以給程序分配了10 GB內(nèi)存和12核CPU的資源進(jìn)行測試對比,結(jié)果分別如圖10和圖11所示。觀察發(fā)現(xiàn),阻塞等待策略對CPU資源的占用基本穩(wěn)定,CPU和內(nèi)存占用受到數(shù)據(jù)量大小影響較小,相對比較低耗;而使用自旋等待策略時,在程序啟動運行后CPU占用率明顯變高,因為底層采用自旋和yield機(jī)制,保證了消費線程優(yōu)先占有CPU資源進(jìn)行計算,而且為了保證線程處理的高吞吐量和低延遲,對內(nèi)存的占用也相對較高。因此,正常的業(yè)務(wù)中選用阻塞等待策略構(gòu)建無鎖并發(fā)模型進(jìn)行數(shù)據(jù)處理,完全能夠保證程序的高并發(fā)高效率處理能力;而在部分極低延遲的業(yè)務(wù)場景下可采用自旋等待策略,并且在實例化Disruptor對象時事件處理線程數(shù)需要設(shè)置小于CPU核數(shù),RingBuffer環(huán)形數(shù)組長度根據(jù)消息報文大小和機(jī)器資源進(jìn)行合理分配,避免出現(xiàn)數(shù)組長度較大,導(dǎo)致每批次訂閱的數(shù)據(jù)一直存儲在單位數(shù)組空間內(nèi),大量對象不能夠及時被回收,容易出現(xiàn)內(nèi)存溢出。
4" 結(jié)" 論
本文針對現(xiàn)有物聯(lián)管理平臺中MQTT長連接通信性能和數(shù)據(jù)處理并發(fā)性問題,提出一種基于無鎖并發(fā)的方式對傳統(tǒng)連接單元進(jìn)行改進(jìn)優(yōu)化。通過消除應(yīng)用分層通信的延遲,設(shè)計應(yīng)用內(nèi)部線程解耦,基于事件源驅(qū)動和生產(chǎn)消費模型的思想構(gòu)建了高性能的連接單元,提升了海量數(shù)據(jù)的并發(fā)處理能力,解決了各類終端設(shè)備MQTT通信延遲和消息阻塞等問題,保證了MQTT長連接的穩(wěn)定性和數(shù)據(jù)傳輸?shù)臅r效性。雖然對連接單元的設(shè)備接入和數(shù)據(jù)處理能力方面進(jìn)行了改進(jìn),但是無鎖并發(fā)是基于內(nèi)存實現(xiàn),改進(jìn)的程序依賴運行環(huán)境資源,當(dāng)選擇極低延遲的等待策略時CPU占用率較高,所以需要進(jìn)一步對底層線程自旋等待的邏輯進(jìn)行優(yōu)化,能夠使其根據(jù)宿主機(jī)資源自適應(yīng)調(diào)整,這也是下一步待完善的地方。
參考文獻(xiàn):
[1] 高焜,劉澤輝,高偉,等.一種基于區(qū)塊鏈的MQTT協(xié)議優(yōu)化算法 [J].電力信息與通信技術(shù),2024,22(5):10-16.
[2] 劉海濤,祁升龍,蘆翔,等.基于MQTT的配電終端多核處理器核間通信設(shè)計 [J].電子器件,2022,45(5):1207-1213.
[3] 劉亮,李卉.邊緣計算網(wǎng)關(guān)的功能設(shè)計與系統(tǒng)實現(xiàn) [J].電測與儀表,2021,58(8):42-48.
[4] 邵瑞雪,田秀霞.智能電網(wǎng)中基于MQTT協(xié)議的ABAC訪問控制方案 [J].計算機(jī)應(yīng)用研究,2022,39(11):3436-3443.
[5] 趙菁.TCP/IP協(xié)議棧的安全性分析 [J].網(wǎng)絡(luò)安全技術(shù)與應(yīng)用,2024(4):12-14.
[6] 熊風(fēng)光,陳霖,韓慧妍,等.基于MQTT協(xié)議的輕量化文本信息分發(fā)技術(shù)研究 [J].計算機(jī)技術(shù)與發(fā)展,2024,34(2):90-97.
[7] 王梓,梁正和,吳瑩瑩.基于Kafka、Disruptor技術(shù)對傳統(tǒng)ETL的改進(jìn) [J].計算機(jī)技術(shù)與發(fā)展,2018,28(11):26-29.
[8] 周沭玲,金楠,侯海平.基于WPF消息機(jī)制的多UI線程并發(fā)阻塞問題解決方案 [J].通化師范學(xué)院學(xué)報,2021,42(4):90-96.
[9] 李浩,張曉強.基于單線程的無錨點目標(biāo)檢測模型 [J].計算機(jī)工程,2021,47(5):229-235+243.
[10] 燕保躍,姜博.一種面向持久化內(nèi)存的熱點變量快速寫入算法 [J].小型微型計算機(jī)系統(tǒng),2022,43(11):2442-2448.