魏連秋,張義紅,張建光,馬 倩,邢春燕,劉 偉
(1.衡水學院數學與計算機科學系,河北 衡水 053000;2.衡水學院馬克思主義學院,河北 衡水 053000)
消息傳輸過程中,可能會出現網絡異常、程序異常、機器異常等各種突發情況,無法保證消息正常傳輸。一條消息從生產到消費的整個過程中有生產者生產、消息代理存儲、消費者消費三個過程需要保證消息的可靠性。通過對消息傳輸流程進行分析,發現若能保證生產過程可靠性、消息代理存儲過程可靠性、消費過程可靠性,則基本可以實現消息傳輸的可靠性。本文通過對現有消息中間件實現機制的分析,在遵循AMQP協議的基礎上,結合實際業務應用場景,權衡消息的可靠傳輸,對消息生產過程、消息代理存儲過程、消息消費過程三個過程進行分析,提出一種分布式系統中消息可靠性傳輸方案,實現消息傳輸的高可靠性[1]。
在生產者發出消息之后可能會出現網絡擁塞、服務器宕機等突發問題,從而導致消息的丟失。如果不對消息生產過程的可靠性加以設計,生產者將無法感知消息是否已正常送達消息代理,進而將導致消息的丟失。如果消息在生產過程中,能感知到傳輸過程中消息傳輸失敗,生產者可以進行重新傳輸等后續動作以確保消息生產過程的可靠性。因此,在消息生產過程中,設計一種高級發布確認方案以保證消息生產可靠性是十分必要的。
RabbitMQ提供了publisher confirm機制來避免消息發送到MQ過程中丟失,這種機制要給每個消息指定一個唯一ID,消息發送到MQ以后,會返回一個結果給發送者,表示消息是否發送成功。
publisher發送消息的配置為publisher-confirmtype。這里支持兩種類型:一種是simple:同步等待confirm結果,直到超時;另一種是correlated:異步回調,定義ConfirmCallback,MQ返回結果時會回調這個ConfirmCallback。相對而言,異步確認性價比較高,因而在此采用correlated:異步回調。
(1)消息成功發送到交換機,如圖1所示返回ack。

圖1 消息成功投遞到交換機
(2)消息未成功投遞到交換機,如圖2所示返回nack。

圖2 消息未成功發送到交換機
在僅開啟publisher confirm機制的情況下,交換機接收到消息后,會直接給消息生產者發送確認信息,如果發現該消息不可路由,那么消息會被直接丟棄,此時生產者是不知道消息被丟棄了。這顯然不是我們所希望的,于是提出mandatory+publish-return方案。
設置mandatory參數可以在當消息傳輸過程中不可達目的地時將消息返回給生產者。可以使用如圖3所示方案,當把mandatory參數設置為true時,如果交換機無法將消息進行路由時,會將該消息返回給生產者,當把該參數設置為false時,如果發現消息無法進行路由,則直接丟棄。

圖3 mandatory+publish-return方案
在只使用mandatory參數的情況下,如果交換機無法將消息進行路由時會提示Returned message but no callback available,說明設置mandatory參數后,如果消息無法被路由,則會返回給生產者,此時是通過回調的方式進行的,但由于還沒有回調函數,所以生產者需要設置相應的回調函數才能接受該消息。此時就需要設置生產者回執:publisher-return,實現一個ReturnCallback接口,這樣當消息傳輸到交換機而沒有路由到隊列時,就返回ACK及路由失敗原因。此時再運行則Returned message but no callback available提示消息,而可以看到生產者接收到了被退回的消息,并帶上了消息被退回的原因:NO_ROUTE。
有了mandatory+publisher-return方案,我們獲得了對無法路由消息的感知能力,此時我們可以通過日志的形式得到反饋并手動重新處理。但實際上,這樣做無形中增加了生產者的復雜性,需要添加處理這些被退回的消息的邏輯。如果既不想增加生產者的復雜性,又不丟失消息,怎么辦?在RabbitMQ中,有一種備份交換機的機制存在,可以較好地解決這一問題。
消息消費可靠性是指在分布式系統中,保證消息在傳輸和處理過程中不會丟失或重復,為了實現可靠的消息消費,一般采取如下措施。
RabbitMQ為了能夠把消息正確發送給消費者,提供了消息確認機制,即告訴RabbitMQ消息已經收到,使用消息確認機制可以使消費者成功處理消息后再從隊列中刪除該消息。
消費者獲取消息后,應該向RabbitMQ發送ACK回執,表明自己已經處理該消息。若出現這樣的場景:RabbitMQ傳輸消息給消費者,消費者獲取消息后返回ACK,RabbitMQ刪除消息,此時消費者突然宕機消息未成功處理,這樣,消息就丟失了。因此消費者返回ACK的時機非常重要。一般RabbitMQ允許配置三種確認模式:①none:關閉ACK,RabbitMQ假定消費者獲取消息后會成功處理,消息傳輸后立即被刪除,此種模式下消息傳輸是不可靠的,可能丟失,因而不可??;②auto:自動ACK,由spring監測listener代碼是否出現異常,沒有異常則返回ACK。此種模式類似于事務機制;③manual:手動ACK,需要在業務代碼結束后,手動調用API發送ACK,這種模式需要根據具體情況,判斷什么時候發送ACK,使用較為復雜。綜合考慮推薦使用默認的auto模式[2]。
當消息數量變得過多或處理速度變慢時,可以使用消費者端限流來控制消息的傳遞速率。通過限制消息的最大數量或大小避免消費者宕機,從而提高消費者的可靠性。
為了保障分布式系統中消息可靠性傳輸,筆者設計了如下解決方案。
生產者發送消息給交換機,若不能發送成功則利用publisher confirm機制重新傳輸,若交換機不能成功將消息路由到隊列則利用備份交換機機制將消息傳輸到指定隊列進行消費;若消費者不能成功消費消息則利用延遲重試機制在指定次數內進行循環延遲重試;若循環結束仍舊不能成功消費則利用死信交換機將消息傳輸到死信隊列進行消費。其中的交換機、隊列設置均設為持久的,消息視具體情況而定,使用如圖4所示傳輸方案,這樣可最大限度保證消息可靠性傳輸與消費。

圖4 發布確認+帶重試次數延遲消費的可靠性傳輸方案
生產端核心代碼如下:
在消費端對消息的處理邏輯中,還應考慮冪等性,即對同一條消息多次處理所產生的結果應該是相同的,這樣出現重復處理的情況也不會對系統產生影響。另外,在分布式環境中,我們還可以采用副本集群模式,即在RabbitMQ集群中,多個節點會同步存儲隊列中的消息,如果某個節點發生故障,其他節點可以接替它繼續處理數據,從而防止消息丟失。通過以上解決方案的合理選擇和組合,可以提高RabbitMQ性能和可靠性,從而確保分布式系統的穩定性和可用性。