999精品在线视频,手机成人午夜在线视频,久久不卡国产精品无码,中日无码在线观看,成人av手机在线观看,日韩精品亚洲一区中文字幕,亚洲av无码人妻,四虎国产在线观看 ?

基于無鎖并發優化物聯MQTT長連接通信及數據處理改進

2025-02-26 00:00:00王立旭何鳴一呂非宗偉康陳凱旋
現代信息科技 2025年1期
關鍵詞:數據處理

摘" 要:為了更好地保障省級電力物聯網終端設備的批量化接入,提升各類設備MQTT長連接通信的穩定性和數據處理的時效性,文章對現有設備連接接入和數據處理方式進行了優化改進。首先,基于無鎖并發實現了高性能的數據處理,提升了執行程序的健壯性,從而避免了數據阻塞導致MQTT連接超時中斷問題。其次,將應用間分層處理改進為應用內部線程解耦,利用事件源驅動機制實現并行處理,從而降低了系統復雜度,減少了層級依賴,保證了設備連接接入功能的輕量級、低功耗和高可靠性。最后,通過對不同MQTT消息報文大小進行分類測試,驗證了改進后的程序性能具備高吞吐、低延時的特點,大幅降低了鏈路通信的延遲和數據阻塞,整體效率較現有方案提升了3倍以上,具有一定的工程應用性。

關鍵詞:無鎖并發;MQTT長連接;數據處理;配電物聯網;事件驅動

中圖分類號:TN929.5" " 文獻標識碼:A" 文章編號:2096-4706(2025)01-0193-06

Optimized IoT MQTT Long Connection Communication and Data Processing Improvement Based on Lock-free Concurrency

Abstract: In order to better guarantee the batch access of provincial electric power Internet of Things terminal devices and improve the stability of MQTT long connection communication and the timeliness of data processing for various types of devices, the paper optimizes and improves the existing methods for device connection access and data processing. Firstly, high-performance data processing is realized based on lock-free concurrency, which improves the robustness of the execution program, thus avoiding the problem of MQTT connection timeout interruption caused by data blocking. Secondly, the hierarchical processing between applications is improved to intra-application thread decoupling, and the parallel processing is realized by using the event source driving mechanism, thus reducing the system complexity, decreasing the hierarchical dependency, and ensuring the lightweight, low power consumption, and high-reliability features of the device connection access function. Finally, through the classification test of different MQTT message sizes, it is verified that the improved program performance has the characteristics of high throughput and low latency. It greatly reduces the delay and data blocking of link communication, and the overall efficiency is improved by more than 3 times compared with the existing scheme, which has certain engineering applicability.

Keywords: lock-free concurrency; MQTT long connection; data processing; distribution power Internet of Things; event-driven

0" 引" 言

隨著國家電網有限公司提出的泛在電力物聯網建設和落地實踐,電力物聯終端的功能需求的不斷發展,消息隊列遙測傳輸協議(Message Queuing Telemetry Transport Protocol, MQTT)也廣泛應用到電力物聯通信之中[1-2]。其次配電智慧物聯體系中的“運管邊端”四層架構也主要基于MQTT通信協議,實現各類業務智能終端的統一接入和統一管控,實現邊緣智能[3]。以江蘇省級統一物聯管理平臺為例,在其規模化應用中,開展了輸配電數字化提升,推動了輸電、配電終端規模化接入,開展用電、變電、安監等各類場景試點。目前累計接入邊設備30萬臺,端設備300萬臺,日均處理130億條、9 TB采集數據。為了更高效促進全場景感知資源,實現數據源端共享,支撐能源互聯網實現全環節實時感知與交互,各類終端設備的并發接入、異端通信和海量數據處理的時效性成了關鍵問題。

目前,行業內針對電力物聯設備接入、通信連接以及數據處理的方式大同小異,主要通過連接單元和終端建立MQTT長連接,數據處理通過服務異步轉發實現。雖然采用微服務化設計,支持水平擴展,但省級各地市終端設備接入量日益增多、報文格式的變化多樣以及網絡通信不穩定等問題,當前實現方式不足以保障系統平臺對設備持續接入過程的可靠支撐,而且現有的擴展方式極大地增加了開發和運維成本。

為了解決現存問題,本文針對MQTT長連接通信和數據處理兩方面進行了改進優化。主要基于無鎖并發的方式解決了數據傳輸的延遲和阻塞;利用內存預分配機制保證了高效的數據處理;通過事件源驅動模式解耦線程,提升了并行處理能力。因此,改進后的連接單元在長連接維持與數據處理方面有了顯著改善,能夠更好地保障了平臺接入能力的穩定性和可靠性,確保了平臺具備海量數據處理的時效性和并發性。可進一步為傳統電網業務賦能,提升電網數據通信和感知的互動能力,提升運行質效和社會綜合能效。

1" MQTT通信連接及過程特點

在當前智慧配電物管平臺的系統架構中,采用預分配和動態負載機制實現海量MQTT長連接和數據轉發,主要分為三層結構實現,如圖1所示。連接層和轉發層是實現海量設備接入和數據轉發的兩個核心層,其中連接層主要由MQTT連接單元組成,實現長連接接入管理;轉發層主要由轉發微服務實例構成,實現對接收到的MQTT報文數據進行處理轉發。

雖然每層中組件都是無狀態,支持動態水平擴展和自動感知負載,但隨著各類終端設備連接接入日益增多,系統平臺對連接單元的海量數據處理和MQTT數據轉發的能力要求也隨之提升,現有的實現方式也逐漸出現穩定性和并發性兩方面的瓶頸。現在主要是通過連接單元建立長連接和終端設備進行數據通信傳輸,使用數據轉發微服務實例對MQTT消息進行處理轉發至業務中間件,為應用之間數據解耦。結合上述問題,本文基于無鎖并行對現有長連接數據接入和處理轉發功能進行改進,消除分層處理的效率延遲,通過實現程序內部線程解耦,提升海量連接和數據處理轉發的性能。

1.1" MQTT連接通信

由于電力物聯網中的各類終端設備之間的通信環境往往受限,為了保證設備之間異端通信的可靠性和時效性,通常采用消息隊列遙測傳輸協議MQTT作為通信的基礎協議。MQTT協議是基于主題的消息生產和訂閱消費模式[4],生產者和訂閱者之間通過代理中間服務器進行解耦通信。電力物聯網中各類設備通過不同網絡與MQTT服務器建立連接并進行通信交互,通信模型如圖2所示。主要組成包括:1)MQTT服務器負責管理消息的代理服務器;2)消息生產者向MQTT服務器發送消息數據;3)消息訂閱者通過MQTT服務器接收主題消費生產者推送的消息。

MQTT協議之所以能夠為設備通信提供穩定可靠的通信交互,是因為MQTT基于TCP/IP之上實現,具備TCP的持久連接性,在數據傳輸完成后也能保持連接狀態,所以在不穩定的網絡環境下也無須重建連接,能夠快速響應通信狀態。MQTT通信是在客戶端與代理服務器之間建立連接,通常客戶端向代理服務器發送啟動連接指令,代理服務器收到后做出狀態響應建立連接。兩者之間成功建立連接后,只要客戶端不主動發送斷開指令,它們就會一直保持長連接狀態。其次基于TCP確認應答和重傳機制[5],MQTT通過進行報文控制和劃分服務質量(QoS)等級[6]來明確消息傳送次數、是否丟失以及重復發送等問題,從而確保消息傳輸的可靠性,如圖3所示。QoS等級從低到高不但表示消息可靠性的提高,也表示數據傳輸復雜度的提升,越大QoS對于收發系統消耗也越大,因此在應用中會根據實際的通信網絡情況和消息的重要程度對不同MQTT消息單獨設置QoS值。

1.2" 無鎖并發處理

傳統程序設計中,為了提升代碼的邏輯處理性能和響應速度,一般通過使用線程池進行多線程處理。而線程池本質是利用阻塞隊列,底層會使用鎖機制,因此隊列中會出現線程阻塞,從而吞吐量受限。Disruptor作為一種高性能異步處理模型[7],能夠實現線程間無鎖并發。Disruptor基于無鎖機制,底層通過CAS和自旋實現[8],結合事件驅動模式,高效實現隊列并發操作。Disruptor數據緩存是通過環形數組隊列RingBuffer實現,通過單線程寫和內存屏障阻止指令重排的方式實現無鎖[9]。RingBuffer主要用于不同線程之間數據傳遞的緩存,通過序號指向數組環中的下一個元素,如圖4所示。

數組環通過預加載方式,初始化內存中一段連續的緩存,通過預分配緩存對象保證處理速度更快。數組環中的數據只會被覆蓋不會被清除,這樣降低了垃圾回收機制啟動的頻率,從而保證執行效率,解決了數據延遲問題。Disruptor核心是Sequence序列,一方面通過順序遞增的序號來編號,管理進行交換的事件數據,對于事件數據的處理會根據序號逐個遞增,從而保證了多線程并發處理的原子性;另一方面可以防止不同Sequence之間緩存偽共享[10]。當多線程中互相獨立的變量分配到同一個緩存行中時,若線程修改變量就會影響彼此性能,Disruptor通過填充緩存行來消除這種偽共享問題。如圖5所示,緩存行大小為64個字節,通過一個Long型序號占8個字節,在序號前后均插入7個Long型值,這樣確保每個序號都獨占一個緩存行。

2" 改進后的MQTT連接處理模型

在智慧配電物聯管理平臺現有的MQTT連接單元中,終端與MQTT服務器建立連接后,將不同數據全部發送到一個主題中,再由數據轉發微服務進行訂閱監聽主題中的數據,通過復雜的邏輯判斷和業務處理后在轉發至業務Kafka中,如圖6所示。

隨著設備類型的日益增多,業務復雜性也凸顯煩瑣,數據處理性能出現瓶頸,從而導致長連接通道阻塞和終端超時斷連。現有方式只能通過擴展集群規模來相對維持正常業務處理,但又帶來了較高的運維成本。因此,本文基于無鎖并發模型對現有的MQTT連接單元和數據轉發處理過程的改進,如圖7所示。

利用Disruptor異步處理模型實現應用內部線程解耦,消除應用間分層異步,減少應用間分層數據處理,高效提升程序并發處理性能。當各類終端設備通過網絡和MQTT服務器建立長連接后,通過不同業務主題進行數據交互,部分類型如表1所示。不同業務主題會單獨對應一個Disruptor隊列進行處理,而Disruptor內部通過多生產多消費模式進行線程異步解耦,直接將原始數據解析處理后轉發至應用Kaka,極大提升了處理性能,避免了連接阻塞和延遲。

無鎖并發的改造是將業務邏輯處理在內存中進行,通過事件源驅動的方式實現,核心基于Disruptor生產消費模型構建,主要包括事件消息載體對象、事件監聽消費對象、數據存儲容器和消息生產對象。其中事件消息載體對象就是傳輸的事件消息,需要通過事件工廠方法進行創建;事件監聽消費對象是對容器中的事件對象進行消費處理,充當數據的消費者實現主要的業務邏輯;數據存儲容器是一個環形數組容器,用于事件消息的存儲;消息生產對象就是事件消息的生產者,將消息發布傳輸至存儲容器中供消費者進行監聽消費。Disruptor對象實例化及模型構建的流程如圖8所示。

其核心代碼如下:

// 實例化模型對象

Disruptorlt;IotMsgDatagt; disruptor = new Disruptorlt;IotMsgDatagt;(

iotMsgDataFactory, // 事件對象工廠

ringBufferSize, // 有界環形數組長度

executor, // 線程池

ProducerType.SINGLE, // 生產者類型

new BlockingWaitStrategy()); // 等待策略

// 添加事件消費者

disruptor.handleEventsWith(new IotMsgDataHandler());

// 啟動Disruptor

disruptor.start();

// 獲取RingBuffer緩存容器

RingBufferlt;IotMsgDatagt; ringBuffer = disruptor.getRingBuffer();

// 實例化生產者

IotMsgDataProducer producer = new IotMsgDataProducer(ringBuffer);

// 發送數據至緩存容器

producer.sendData(data);

實現消息生產方法的創建首先需要獲取實際存儲數據的容器,從RingBuffer里面獲取一個可用序號,根據序號得到數據元素,從而進行業務邏輯操作,最終提交發布該事件至數組容器中。生產者往環形數組容器內發布數據的核心代碼如下:

public void sendData(IotMsgData data) {

// 1 從環形數組中獲取可用序號

long sequence = ringBuffer.next();

try {

// 2 獲取事件對象

IotMsgData event = ringBuffer.get(sequence);

// 3 對象賦值

event.setValue(IotMsgData.getLong(0));

} finally {

// 4 發布投遞

ringBuffer.publish(sequence);

}

}

3" 實驗結果分析

為了驗證基于無鎖并發改進的MQTT連接單元的處理性能和實時性,在服務器上搭建了壓測環境,其中包括改進后的單實例程序,MQTT Broker作為MQTT消息服務器端,模擬網關設備發送模擬數據,且數據報文遵循電力物聯網云邊交互規范,序列化之后的數據格式如圖9所示。

模擬數據按照MQTT協議服務質量Qos0進行構造,每條報文消息分別按照1 KB、3 KB、5 KB、7 KB和10 KB大小分類模擬,模擬數據量均300萬條,在不同報文大小的情況下進行3輪測試結果如表2所示。可以看出,改造前的程序處理效率會受到報文大小的影響,報文數據量越大效率越低,而改造后的處理效率受報文大小影響較小,整體性能比之前提升了3倍以上。

為了進一步優化改造后的程序性能,在Disruptor對象構建時,分別選用阻塞等待策略和自旋等待策略進行對比分析。由于無鎖并發的實現方式是基于內存,所以給程序分配了10 GB內存和12核CPU的資源進行測試對比,結果分別如圖10和圖11所示。觀察發現,阻塞等待策略對CPU資源的占用基本穩定,CPU和內存占用受到數據量大小影響較小,相對比較低耗;而使用自旋等待策略時,在程序啟動運行后CPU占用率明顯變高,因為底層采用自旋和yield機制,保證了消費線程優先占有CPU資源進行計算,而且為了保證線程處理的高吞吐量和低延遲,對內存的占用也相對較高。因此,正常的業務中選用阻塞等待策略構建無鎖并發模型進行數據處理,完全能夠保證程序的高并發高效率處理能力;而在部分極低延遲的業務場景下可采用自旋等待策略,并且在實例化Disruptor對象時事件處理線程數需要設置小于CPU核數,RingBuffer環形數組長度根據消息報文大小和機器資源進行合理分配,避免出現數組長度較大,導致每批次訂閱的數據一直存儲在單位數組空間內,大量對象不能夠及時被回收,容易出現內存溢出。

4" 結" 論

本文針對現有物聯管理平臺中MQTT長連接通信性能和數據處理并發性問題,提出一種基于無鎖并發的方式對傳統連接單元進行改進優化。通過消除應用分層通信的延遲,設計應用內部線程解耦,基于事件源驅動和生產消費模型的思想構建了高性能的連接單元,提升了海量數據的并發處理能力,解決了各類終端設備MQTT通信延遲和消息阻塞等問題,保證了MQTT長連接的穩定性和數據傳輸的時效性。雖然對連接單元的設備接入和數據處理能力方面進行了改進,但是無鎖并發是基于內存實現,改進的程序依賴運行環境資源,當選擇極低延遲的等待策略時CPU占用率較高,所以需要進一步對底層線程自旋等待的邏輯進行優化,能夠使其根據宿主機資源自適應調整,這也是下一步待完善的地方。

參考文獻:

[1] 高焜,劉澤輝,高偉,等.一種基于區塊鏈的MQTT協議優化算法 [J].電力信息與通信技術,2024,22(5):10-16.

[2] 劉海濤,祁升龍,蘆翔,等.基于MQTT的配電終端多核處理器核間通信設計 [J].電子器件,2022,45(5):1207-1213.

[3] 劉亮,李卉.邊緣計算網關的功能設計與系統實現 [J].電測與儀表,2021,58(8):42-48.

[4] 邵瑞雪,田秀霞.智能電網中基于MQTT協議的ABAC訪問控制方案 [J].計算機應用研究,2022,39(11):3436-3443.

[5] 趙菁.TCP/IP協議棧的安全性分析 [J].網絡安全技術與應用,2024(4):12-14.

[6] 熊風光,陳霖,韓慧妍,等.基于MQTT協議的輕量化文本信息分發技術研究 [J].計算機技術與發展,2024,34(2):90-97.

[7] 王梓,梁正和,吳瑩瑩.基于Kafka、Disruptor技術對傳統ETL的改進 [J].計算機技術與發展,2018,28(11):26-29.

[8] 周沭玲,金楠,侯海平.基于WPF消息機制的多UI線程并發阻塞問題解決方案 [J].通化師范學院學報,2021,42(4):90-96.

[9] 李浩,張曉強.基于單線程的無錨點目標檢測模型 [J].計算機工程,2021,47(5):229-235+243.

[10] 燕保躍,姜博.一種面向持久化內存的熱點變量快速寫入算法 [J].小型微型計算機系統,2022,43(11):2442-2448.

猜你喜歡
數據處理
驗證動量守恒定律實驗數據處理初探
認知診斷缺失數據處理方法的比較:零替換、多重插補與極大似然估計法*
心理學報(2022年4期)2022-04-12 07:38:02
ILWT-EEMD數據處理的ELM滾動軸承故障診斷
水泵技術(2021年3期)2021-08-14 02:09:20
ADS-B數據處理中心的設計與實現
電子測試(2018年4期)2018-05-09 07:28:12
MATLAB在化學工程與工藝實驗數據處理中的應用
基于希爾伯特- 黃變換的去噪法在外測數據處理中的應用
大數據處理中基于熱感知的能源冷卻技術
計算機工程(2015年4期)2015-07-05 08:28:04
Matlab在密立根油滴實驗數據處理中的應用
數據處理能力在求職中起關鍵作用
我國首個“突發事件基礎數據處理標準”發布
主站蜘蛛池模板: 网友自拍视频精品区| 无码AV日韩一二三区| 亚洲伊人天堂| 国产主播一区二区三区| 欧美高清国产| 国产主播喷水| 亚洲精品大秀视频| 国产日韩欧美精品区性色| 亚洲a级在线观看| 国产精品亚洲专区一区| 精品国产亚洲人成在线| 亚洲乱码精品久久久久..| 成人免费网站久久久| 亚洲天堂777| 国产成人精品男人的天堂| 国产特一级毛片| 久久五月天国产自| 一区二区午夜| 国产精品久久久久久搜索| 亚洲欧美另类中文字幕| 98超碰在线观看| 亚洲无码四虎黄色网站| 色综合a怡红院怡红院首页| 久久中文字幕2021精品| 91人人妻人人做人人爽男同| 91成人在线免费观看| 亚洲欧美日韩成人在线| 国产中文在线亚洲精品官网| 欧美a网站| 九九热视频精品在线| 玖玖精品在线| 国产精品专区第一页在线观看| 久久综合久久鬼| 国产成人资源| 国模视频一区二区| 8090午夜无码专区| 伊人久久综在合线亚洲91| 国产成a人片在线播放| 黄色网页在线播放| 国产波多野结衣中文在线播放| 国产午夜福利亚洲第一| 99热亚洲精品6码| 欧美中出一区二区| 91蜜芽尤物福利在线观看| 国产成人精品高清不卡在线| a级毛片免费网站| 在线国产毛片| 亚洲六月丁香六月婷婷蜜芽| 精品人妻AV区| 日本欧美视频在线观看| 成AV人片一区二区三区久久| 久久9966精品国产免费| 国产在线97| aaa国产一级毛片| 国产黑人在线| 亚洲欧洲日产国产无码AV| 中文字幕 欧美日韩| 伊人激情综合网| 亚洲第一香蕉视频| 欧美性久久久久| 久久黄色一级视频| 国产一级毛片在线| 国产第二十一页| 久久这里只有精品23| 又粗又硬又大又爽免费视频播放| 亚洲自拍另类| 91福利国产成人精品导航| 精品少妇人妻一区二区| 毛片最新网址| 亚洲av无码牛牛影视在线二区| 日韩毛片免费| 伦精品一区二区三区视频| 国产男女XX00免费观看| 最近最新中文字幕在线第一页 | 国产18页| 亚洲aaa视频| 亚洲欧美天堂网| 五月激情综合网| 精品一区二区无码av| 一本久道久久综合多人| 亚洲免费成人网| 欧美一级片在线|