999精品在线视频,手机成人午夜在线视频,久久不卡国产精品无码,中日无码在线观看,成人av手机在线观看,日韩精品亚洲一区中文字幕,亚洲av无码人妻,四虎国产在线观看 ?

面向流式數據處理系統的高效故障恢復方法

2022-11-30 07:27:10劉陽張揚揚周號益
計算機應用 2022年11期
關鍵詞:故障系統

劉陽,張揚揚,周號益

面向流式數據處理系統的高效故障恢復方法

劉陽1,2,3,張揚揚1,2,周號益1,2,4*

(1.北京航空航天大學 大數據科學與腦機智能高精尖創新中心,北京 100191; 2.北京航空航天大學 計算機學院,北京 100191; 3.北京航空航天大學 未來空天技術學院/高等理工學院,北京 100191; 4.北京航空航天大學 軟件學院,北京 100191)(?通信作者電子郵箱haoyi@buaa.edu.cn)

針對流式數據處理系統Flink無法高效處理單點故障的問題,提出了一種基于增量狀態和備份的故障容錯系統Flink+。首先,提前建立備份算子和數據通路;然后,對數據流圖中的輸出數據進行緩存,必要時使用磁盤;其次,在系統快照時進行任務狀態同步;最后,在系統故障時使用備份任務和緩存的數據恢復計算。在系統實驗測試中,Flink+在無故障運行時沒有顯著增加額外容錯開銷;而在單機和分布式環境下處理單點故障時,與Flink系統相比,所提系統在單機8任務并行度下故障恢復時間減少了96.98%,在分布式16任務并行度下故障恢復時間減少了88.75%。實驗結果表明,增量狀態和備份方法一起使用可以有效減少流式系統單點故障的恢復時間,增強系統的魯棒性。

流式數據處理系統;故障恢復;分布式檢查點;狀態備份;Apache Flink

0 引言

大數據時代,隨著互聯網、物聯網等技術的快速發展,諸如工業監控、社交媒體、實時搜索引擎等應用場景產生了海量的數據,并對計算處理有了更嚴格的要求,需要數據處理系統提供低延遲的實時計算,而對實時計算的需求進一步促進了分布式數據處理系統從批處理(Batch Processing)模式逐步轉變為流處理(Stream Processing)模式。

批處理系統通過對輸入數據進行采樣收集,當數據規模達到設定閾值后利用批處理引擎計算累積的數據,可以反映一段時間內數據的特征,同時還能夠保證數據分析結果的正確性,也被廣泛用于機器學習、圖計算等領域,并采用分布式檢查點[1]、反應式故障恢復[2]等技術進行容錯。此外,批處理系統的框架相對簡單且易于擴展。雖然批處理可以達到很高的吞吐,但在實時性方面難以滿足當前大數據背景下各類實時應用的低延遲的需求。

相較于批處理模式,流式系統逐條地對輸入數據進行實時處理,可以捕獲動態實時數據的最新特征,更快地挖掘數據背后的價值。流式系統通常會對無界數據提供長期穩定且實時的計算處理,可以很好地滿足商業公司對數據處理的實時性需求。流式處理分為有狀態處理和無狀態處理,前者語義豐富可以表達更真實的數據場景,后者的使用場景較為簡單。在分布式系統中,有狀態的流數據處理完全依賴于前序計算狀態,如果在數據處理過程中發生故障,將導致前序狀態的丟失,從而導致流計算必須整體重新開始,嚴重情況下如持續大規模數據輸入場景,甚至無法實現狀態恢復,故障后系統恢復代價很高。因此,針對有狀態流數據處理的故障容錯需要系統在計算資源和處理速度上進行一定妥協。但隨著新興應用場景的出現,流式系統對數據處理延遲和備份開銷要求越來越嚴格,當前故障容錯機制面臨著新的挑戰。

目前,主流開源流式系統[3-11]和商業流式系統[12]根據系統設計與能力支持的不同,采用的容錯機制不盡相同,針對的應用場景和容錯能力也有較大差異。如Storm[10-11]采用消息確認的機制實現容錯,但僅支持至少一次語義支持;Spark Streaming[5-9]將流式數據視為一個個小的批數據,利用微批(Micro Batch)以支持流式計算,并復用Spark的血緣容錯機制,支持精確一次語義,但對其他的流式特性支持較差。Flink[3-4]采用了一種簡化的Chandy?Lamport分布式快照算法[13-14],在保證精確一次語義的同時,實現輕量級的容錯。當一次快照完成之后可視為數據成功處理,從而實現端到端的精確一次語義支持。當故障發生時,所有計算任務整體回滾到上一次快照狀態,并重新消費之前的數據。然而這種方法存在一個弊端,即使故障規模較小,甚至是單節點故障,也不得不將所有計算節點進行回滾,故障恢復時間長,并且需要重新計算因回滾丟失的進度。

針對這一問題,本文提出一種基于增量式狀態備份的高效故障恢復系統Flink+,通過增量式狀態同步實現快速狀態備份;利用上游輸出緩存和備份數據通路實現故障任務的快速恢復。Flink+基于Flink已有的分布式快照機制,利用已有的計算任務進行增量式的互相備份,一個計算任務在進行自己主計算的同時,也負責備份其他節點的任務狀態和計算邏輯。在創建計算圖時,備份任務提前建立好與主任務上下游任務的網絡連接,以降低故障恢復時任務切換的時間,該備份連接在無故障運行時并不進行傳輸;當快照時,備份任務增量式地同步主任務的狀態,降低狀態備份開銷。在故障發生后,備份任務立即啟動備份計算邏輯,并利用備份網絡連接接管上下游數據,實現快速的故障任務切換與恢復。為了進一步細化故障恢復的粒度,采用上游輸出緩存機制,在無故障運行時,上游任務的輸出會被保留一段時間,以防止下游故障之后需要從數據源頭進行重計算,只需要重新消費其上游的輸出數據,進一步降低故障恢復時間。

本文的主要工作如下:

1)設計了一種基于增量式狀態的快速備份方法,結合快照機制和增量狀態備份,實現對系統狀態的快速備份。

2)利用上游輸出緩存和備份數據通路實現故障任務的快速切換和狀態恢復,提高系統對單點故障的處理速度。

3)在開源流式系統Flink中進行了實現和實驗驗證,實驗結果驗證了本文方法的可行性和有效性,該方法在無故障運行時沒有顯著增加額外容錯開銷,同時實現了非常顯著的故障恢復加速效果,加速比可達6~8。

1 研究背景

1.1 語義支持

在流式系統中,數據的最小單位是消息,對消息的處理次數保證被稱為投遞語義(Delivery Semantic),為了更好地理解流式計算,需要首先介紹一下流處理的三種語義:

1)最多一次:對于數據中的每條消息,至多只進行一次處理,如果發生故障,消息會丟失,并且系統不會進行故障恢復計算,而是繼續處理后續到達的消息。這種語義放松了系統的計算保證,簡化了系統設計,適合能夠容忍數據丟失的應用。

2)至少一次:對于每條消息,至少會計算一次。考慮消息已被計算但未被確認的情況,若此時發生故障,系統重啟,數據源重新發送未被確認的消息,則會導致對同一條消息的多次計算。這種語義能夠保證數據的完整性,但需要上層應用處理數據重復的問題。

3)精確一次:不論是發生故障還是正常運行,對于每個消息,從系統整體的端到端來看都只會處理一次,即一份輸入數據對應一份輸出數據。這種語義提供最強的數據保證,可以滿足對數據有強一致性要求的應用,但增加了流式系統的復雜性。

1.2 容錯相關工作

流式系統的容錯機制[15-16]可分為三個類別:主動備份、被動備份以及混合備份[17-18]。

1)主動備份:系統中的計算節點都有一個獨立的備份節點,備份節點和主節點擁有一樣的資源和計算邏輯,系統正常運行時,二者處理一樣的數據流,因此主節點和備份節點的狀態可以實現同步,當然備份節點的輸出會被丟棄或者緩存起來,這取決于不同的實現方式,只有主節點的輸出會傳向下游計算節點。當系統中發生節點故障時,自動切換到備份節點,因為主節點和備份節點狀態是同步的,只有切換時間代價,沒有狀態恢復的時間代價。該策略可以實現最低延遲的故障處理,但僅考慮單一節點備份就需要消耗2倍的軟硬件資源,對于大型系統來說負擔較重。雙集群備份是一種典型的主動備份方法。

2)被動備份:對于系統中的每一個節點,定期將節點的狀態通過快照或者其他形式保存到備份節點上,備份節點可以獲取主節點的計算邏輯,當系統發生故障時,備份節點從最近一次緩存的狀態開始恢復,輸入數據可以一起放入快照或者緩存在輸出隊列中。該策略的具體的實現方式較多,但是不可避免的是節點狀態存在恢復過程,可以滿足對故障恢復時間要求不嚴格的場景需求,但仍無法滿足諸如“雙十一”等關鍵業務的嚴格需求。被動備份策略往往采用節點冷啟動的方案,因此資源需求相對較低。分布式檢查點是一種典型的被動備份方法。

3)混合備份[19-22]:該策略綜合考慮了上述不同策略的優缺點,通過部分熱啟動的備份節點實時進行狀態同步。系統正常工作時,備份節點會對同步的狀態進行恢復計算,發生故障時,主節點計算邏輯就可以直接切換到備份邏輯上。例如,相關工作FP4S(Fragment?based Parallel State Recovery for Stateful Stream Applications)[23]借鑒鏈式復制的思想將任務組織成環形一致性哈希(Consistent Hashing)進行互備,建立路由和鄰居表來選擇狀態的地理優先備份,減少網絡延遲;同時使用糾錯碼將任務內存狀態分塊寫入備份節點,在恢復時并行拉取狀態塊來提高恢復速度,其后續工作SR3(Customizable Recovery for Stateful Stream Processing Systems)[24]更進一步優化了狀態備份的選擇性。

混合策略可以有效結合各種容錯機制的優點,充分利用資源降低開銷,加速故障恢復。而開源流式系統目前還是使用單一容錯策略居多,因此存在進一步優化的空間。Apache Storm作為最早的流式系統,是原生的流式系統,采用被動備份和消息確認機制來實現容錯,但只能提供至少一次投遞語義,容錯能力薄弱,且消息確認機制導致系統吞吐量不高;Apache Spark的Spark streaming部分屬于流處理框架,底層采用彈性分布式數據集(Resilient Distributed Dataset, RDD)來實現流計算,但本質上是“微批”的處理思想,在處理延遲上存在不足(盡管Spark Streaming的底層已經開始向流處理原生框架遷移,但是在適配方面還是略微遜色);Apache Flink目前是最火熱的開源流處理框架之一,原生流處理框架使Flink可以實現毫秒級的事務,并且相比Storm又具有更高的吞吐量,但是Flink本身的容錯機制也存在一些問題,因此,本文針對Flink系統,提出了一種混合備份方法,設計并實現了Flink+系統,通過主動狀態同步與被動計算來實現高效的故障恢復。

2 Flink

Apache Flink屬于原生的流處理架構,但Flink也同時支持批處理,它把批處理當作流處理中的一種特殊情況,用流處理來模擬批處理,本文在此只討論Flink的流處理框架。

在Flink中,所有的數據都被看作流的一部分,這種抽象很接近于現實世界。互聯網數據往往都是事件流,從一個數據庫轉移到另外一個數據庫,進行一些操作,生成新的事件等,每個事件還往往伴隨著其被處理的時間,而傳統的批處理對時間信息不敏感,因此無法通過時間信息獲取更多數據信息,這在電商經濟中尤為凸顯,用戶點擊和購買對于商家的推薦有明顯的影響,并且推薦要實時,否則就可能錯失用戶。Flink作為最新一代的原生流處理框架,在事件處理時延上可以實現毫秒級別的延遲,其穩定性和可擴展性非常適合大規模集群的數據處理,本文的實驗也基于Flink進行。

2.1 Flink系統架構

Flink流處理框架由兩類運行時進程構成:JobManager和TaskManager。在后續討論過程中將前者稱之為作業管理器,后者稱之為執行管理器。

作業管理器負責協調申請各種資源、對流處理任務建模、執行快照等。調度器(Scheduler)負責調度子任務(SubTask)的運行,快照協調器(Checkpoint Coordinator)負責分布式快照相關的邏輯。執行管理器是任務實際運行的地方,執行管理器中的任務槽(Task Slot)作為最小任務執行單位,由作業管理器申請使用。每個Task Slot同一時刻只能處理一個任務。不同的Task Slot通過本地消息隊列或者網絡傳遞數據。Flink采用Akka作為底層高并發的運行時,各組件通過Actor模型通信。

2.2 Flink分布式快照

接下來詳細介紹Flink目前的容錯恢復機制[1]。Flink的容錯機制屬于被動備份策略,主要通過分布式快照實現,周期性地把系統算子狀態備份到遠端持久化存儲,當系統發生故障后,利用上一次快照的狀態重新計算并恢復狀態。

Flink的分布式快照算法是基于Chandy?Lamport算法[11]修改后實現的異步算法[12]。Chandy?Lamport算法用來對分布式系統狀態做快照,把分布式系統的全局狀態和鏈路狀態記錄下來用以故障恢復或死鎖檢測等。Flink對其進行修改后,實現了一個輕量級的異步分布式快照算法。

Flink流計算模型中包含Source operator作為數據輸入源算子、Transformation operator作為變換算子和Sink operator作為數據輸出算子三種算子類型。

Flink系統由一個快照協調器協調全局狀態快照,該協調器會周期性觸發快照消息給Source operator,該算子在收到消息后,會首先把自身狀態備份到快照中,然后生成一個barrier消息傳遞給下游全部算子。該barrier消息標有對應的序號并且和普通數據共享同一通路,當下游算子處理到barrier時,會觸發自身的快照,如果算子有多個輸入通道,那么當每個輸入通道都接收到對應的barrier后才會觸發算子的狀態快照,否則輸入通道會暫時阻塞該通道的輸入數據直到快照開始。下游算子把狀態備份到快照后同樣生成新的barrier,并且傳遞給自己的全部下游算子;當Sink operator收到輸入通道的全部對應一致序號的barrier后,本次快照完全結束,并且通知快照協調器。之后由協調器把備份的狀態發送到持久化存儲中。

Flink的這種異步快照方式可以在不影響其他算子正常計算的情況下完成整個執行圖的狀態備份,將快照和數據流處理完美地融合在一起。

針對上述提到的阻塞情況,如果因為系統負載不均,導致算子某個輸入通道被阻塞較長時間會對算子計算造成影響,因此Flink對這種情況設計了非對齊的快照算法:即當第一個輸入通道收到barrier后,就立即向下游算子廣播該barrier;同時立即開始狀態備份,還會把還沒有接收到對應標號的輸入通道的數據全部備份起來。這種情況下的快照算法需要算子記錄輸入流的部分信息,與原始Chandy? Lamport算法較為相似。這種非對齊的快照會導致備份狀態的數據量變大,同時在狀態恢復時需要對狀態和保存的輸入流數據重新計算,增加了恢復時間。

當發生單一節點故障時,Flink系統會計算故障區域,因為流計算算子聯系比較緊密,下游算子的故障會導致上游算子停止計算,陷入等待,進而使得整個流計算全部停止。Flink系統首先會把遠程存儲的快照狀態拉取到本地,釋放之前計算任務的資源,把之前的計算任務全部取消,然后重新拉起全部計算任務,分配資源,并且把快照狀態分配給對應的算子,將數據源消費偏移指向快照時刻的位置。因此,Flink在開啟快照后的數據源必須具備重放的能力,以滿足故障后重新處理部分輸入數據的需求。從Flink目前的容錯機制可以明顯看出其存在的部分問題:單一節點故障導致整個計算圖的重啟;狀態恢復過程后數據重新計算不可避免。

因為Flink以批量的數據進行快照,相比Storm而言更加輕量,但是不可避免的是在每一次恢復時數據會有回滾計算,而且對于某些場景,快照的周期不能過短,否則會給系統帶來較大的負擔,影響算子的計算性能,因此這種情況下故障恢復所需要的時間也比較長,可能無法滿足場景需求。以研究的應用場景為例,Flink系統可以實現容錯,但是無法滿足故障后快速恢復,如果將快照周期縮短,會給系統帶來較大的運行負擔,影響正常的算子計算。Flink提供至少一次和精確一次的處理語義

3 Flink+系統設計

3.1 整體設計

本文設計的容錯方案基于快照技術進行優化。一方面,針對大數據場景下全量狀態備份數據量巨大從而導致備份開銷大的問題,通過采用增量式的狀態備份(將快照間隔內的狀態改變量定義為狀態的增量變化,即狀態增量),減少快照數據傳輸量,加快快照過程,進而縮短快照周期,從而使得故障后回滾時間變短,加快故障恢復。之所以采用狀態增量是因為大數據流計算場景下,任務的狀態數據量通常很大,而快照時間間隔內的狀態改變量通常相對較小。如果每次快照都把全部狀態復制備份,那么相鄰兩次快照有很多冗余的備份以及網絡傳輸。采用增量狀態,就可以在每次快照時只對狀態改變量進行備份傳輸,在遠端存儲進行全量狀態的恢復,可以大幅減小快照時的網絡帶寬壓力。另一方面,針對故障后系統在恢復過程中的時間開銷問題,通過采用備份節點以及上游輸出緩存技術,實現故障后任務的快速切換,并且直接消費上游緩存的輸出數據進行狀態恢復,降低故障恢復時間代價,避免單點故障向上游擴散導致系統全局回滾。

將上述容錯方案應用到Flink系統,改進后的系統稱為Flink+,具體的設計分為以下兩個階段:

1)快照階段:采用RocksDB作為狀態后端的Flink系統,在增量快照時會將每個任務的快照周期內狀態改變量發送到持久化存儲遠端,基于此,可以利用持久化遠端存儲的增量狀態,在備份任務上提前進行重放(Replay),保證備份任務和主任務的檢查點狀態一致性。因為傳輸的快照狀態主要為周期內改變量,所以網絡帶寬占用會顯著下降。

2)故障恢復階段:為了加快故障恢復,在系統啟動時,提前在備份任務和上下游任務建立數據通路,而不是故障后再新建任務并建立通路。但該備份數據通路只在故障時工作,正常情況下不用于數據交換。故障發生時,通過備份數據通路可以做到快速的任務切換,而備份任務已有上次快照時的狀態,可直接利用上游緩存重新計算進行狀態追趕,避免計算狀態的全局回滾和恢復任務的啟動時間。

整個系統架構如圖1所示,Flink+將上層應用的流計算轉換為對應的工作流圖,同時選擇部分算子構建備份流圖。在每次快照時同步主算子和備份算子之間的狀態,構建備份算子和上游算子的數據通路。當有備份的算子發生故障時,系統可以利用備份算子和提前建立的數據通路以及上游輸出緩存來快速恢復丟失的狀態。

圖 1 Flink+系統架構

3.2 基于備份的快照技術

Flink系統的快照是基于Chandy?Lamport算法的異步改進版,通過引入barrier消息,實現了在不停止系統正常工作的情況下,完成系統整體狀態的備份。但是Flink系統在遇到節點故障后只能通過系統整體回滾到上一次快照狀態的方式來恢復系統狀態。因此可以將關鍵節點的狀態和輸入都備份起來,在故障發生時直接利用備份的信息恢復,這種做法可以控制回滾區域以及加快故障恢復的過程。具體介紹如下:

1)備份節點:本文基于Flink的快照技術,引入了備份節點的概念(在Flink中是備份任務),如圖2所示,備份節點擁有主節點的靜態資源,主要負責狀態同步,不進行計算,運行開銷較低。

圖 2 基于備份的快照

把快照的狀態增量同步給備份節點,備份節點通過合并增量狀態與上次同步的狀態,使主備份節點的狀態在每一次檢查點都是一致的;備份節點的數據通路在正常運行時處于關閉狀態,當系統某個任務發生故障時,備份任務的數據通路可以立即打開,并基于同步的狀態和上游任務緩存的輸出進行恢復,從而避免系統整體重啟。

2)鏈式備份:針對備份節點的組織形式,本文將流式處理系統中的有狀態任務組織成多條鏈式結構[25-26],這里借鑒鏈式復制的思想。假設每個任務連成一條鏈(其中為用戶指定的容錯參數,一般取=3)。成鏈的方法有多種選擇,比如考慮到故障問題,一條鏈的節點由不同機器上的節點組成,可以避免因機器故障導致鏈上全部節點故障;或者在考慮網絡帶寬的情況下,將鏈上的節點組織成為地理上直接連通的節點,減小網絡延遲。具體來說,可以根據任務有向無環圖(Directed Acyclic Graph,DAG)對成鏈方式進行選擇,對不同的需求適配不同的成鏈方式,成鏈方式的選擇作為本文的一個后續研究方向。

在無故障運行階段,鏈上的每一個節點(即任務)都周期性地向其后繼節點(即備份任務)同步任務狀態和計算邏輯(僅需同步一次),后繼節點作為前序節點的備份節點存在。考慮到Flink目前的三層執行圖模式,可以將成鏈邏輯放在Job Graph或者Execution Graph。因為任務具體執行時分配單位是Execution,所以成鏈邏輯在Execution Graph可以更加有效地利用分配方式選擇成鏈方式。在無故障運行階段,可以利用Flink流式系統中流式快照的機制;不同的是Flink快照技術中任務在接收到快照消息后僅將自己的狀態寫入到持久化存儲中,而Flink+系統鏈式地將狀態增量同步到鏈上的后繼節點直到鏈尾節點。備份任務擁有主任務的計算邏輯但無故障運行時不進行計算,只做狀態同步,為了降低內存占用,備份任務采用RocksDB作為狀態存儲后端。

在無故障運行時,任務會在每一次快照點把狀態信息發送給備份任務,備份任務基于主任務的狀態信息維護和主任務一致檢查點狀態。備份任務同時向后繼節點發送狀態信息,使整個鏈的檢查點狀態一致。這樣,鏈上的每個節點都會擁有前序節點的同步狀態,方便故障時的恢復計算。

3)上游輸出緩存:為了避免故障恢復的時候全局回滾,本方案采用了上游備份機制。如圖3所示,在正常運行過程中,任務向下游發送自己的輸出后,并不清理本地的這些輸出數據buffer,而是緩存下來,使用空間超過內存限制則溢出到磁盤,并在收到快照消息時記錄當前輸出的offset,當下游節點完成狀態同步后,會向上游發送清理輸出的消息,此時上游任務清理掉offset之前緩存的輸出數據以減少內存占用。當故障發生時,只將故障任務切換到備份任務,并重新消費上游備份的輸出,其他任務仍執行之前的計算。

4)消息ID去重:考慮到故障恢復后,備份節點重新消費數據并向下游發送輸出,而故障發生之前故障節點可能已經發送過部分相同的數據,此時下游節點則可能會處理相同的數據,從而不滿足精確一次的投遞語義。本文方案給每個消息都編上全局唯一的ID,并在每個任務中用RocksDB維護一個已處理消息ID的集合,當檢測到消息ID在這個集合中時,則直接丟棄不進行處理,為了加快檢測速度,可以采用布隆過濾器進行過濾,當布隆過濾器無法判斷時再訪問RocksDB進行確定。為了減少資源占用,系統會定期清理布隆過濾器。考慮到對于有精確一次投遞語義需求的事務,消息ID去重是必須的,而對于較寬松的投遞語義不是必要的,且對于事務型處理來說,精確一次的投遞語義有多種實現方式,因此本文實驗未對消息ID進行測試。

圖 3 上游備份

總之,雖然本文方案仍然采用快照的基本機制,但是通過備份節點的方式,利用部分額外資源對主節點狀態的同步備份,可以有效節省故障后系統重啟計算的代價,縮短快照周期,防止故障發生時不必要的全局數據回滾。消息去重對精確一次語義才會起作用,對于至少一次語義來說是不必要的,因此該功能可以根據上層應用對一致性的需求開啟或者關閉。

3.3 基于任務切換的故障恢復技術

當單點故障發生時,直接將故障任務切換到備份任務,此時備份任務啟動計算邏輯準備計算,同時上游任務將緩存的輸出恢復到上一次快照的offset,并向下游發送備份的輸出數據,備份任務基于同步的狀態重新消費上游輸出數據進行狀態恢復并向下游輸出。由于僅進行任務的切換,單點故障下流式處理可以被快速恢復。

通過提前建立備份任務和上游任務以及下游任務的數據通路,在故障恢復階段,系統可以快速地將故障節點切換到其鏈上的后繼節點,并重新消費計算圖中上游節點的備份輸出,恢復流式計算。同時在集群中重啟故障節點任務,重啟成功后的節點可以采用追趕備份任務狀態的方式,在二者狀態同步時再次切換;或者可以把重啟后的節點作為新的備份節點添加到鏈的末尾。整個過程如圖4所示,備份算子的存在使恢復過程可以在極短的時間內開始,而且不影響上游算子的正常工作。

圖 4 故障恢復

在Flink系統中,由于采用RocksDB作為狀態存儲,備份任務可能同時備份多個節點的狀態,進一步還受限于所在機器的內存、CPU等資源,計算可能比較慢,效率相比主節點可能會比較低,有可能觸發系統的反壓機制,降低系統整體性能。因此本文方案會同時原地重啟主任務并接管原有RockDB狀態,即和備份任務同步狀態,若無法原地重啟,則在其他機器上重啟,此時該狀態為空,可以將其掛到備份任務的鏈尾。然后主任務通過其前序節點進行狀態追趕。當完成狀態同步后,在備份任務接收到快照消息并完成狀態同步后,將重啟的主任務重新恢復成鏈頭,并將計算切換到主任務。至此,整個故障恢復完成。

4 系統實現

4.1 輕量級容錯機制

Flink原始容錯過程如圖5所示。主任務在計算過程中發生故障,任務管理器在感知到故障后會首先釋放故障任務的資源并停止整個執行圖的計算,然后開始推導需要恢復的算子區域;之后,重新拉起故障區域的任務,并基于快照存儲的上一次狀態和可重放數據源進行任務的狀態恢復,系統重啟后重新消費自上一個快照開始的數據。

可以看出Flink原方案存在的問題在于:單個算子故障導致系統整體重啟,并且回滾后系統需整體重新處理自上一次快照的數據。對于簡單的流處理任務,系統整體重啟的代價相對較小,但是對于較大規模的流理系統,系統算子大規模重啟的代價對于實時性來說是不可接受的。

圖 5 Flink系統在單點故障發生后的流程

改進后Flink的容錯過程如圖6所示。本文設計的容錯機制分為狀態備份、主備份任務切換、上游輸出緩存、備份任務狀態恢復幾個模塊。

圖 6 Flink+系統在單點故障發生后的流程

4.2 狀態備份

Flink的快照機制會把執行圖算子狀態保存到持久化存儲中,以便在故障時拉取用于恢復,本方案基于此,在每次快照做持久化的同時把狀態同步到備份任務,使備份任務維持和主任務一樣的快照狀態。為了方便實現,改進措施直接利用Flink原有的狀態恢復過程,在快照時,備份節點進行狀態“恢復”,即同步狀態。在成鏈方式上,因為備份節點和主節點之間的聯系屬于備份層面的,并不是實際流處理的數據通路,因此成鏈作為在Flink三層圖結構中Execution的一個單獨抽象來對待,通過記錄節點之間的成鏈關系,在快照時同步狀態,在故障時切換,把上游節點的輸出切換到備份節點的通路上。同時,因為使用了RocksDB作為狀態后端,在快照時,可以利用RocksDB本身的changelog實現增量式的狀態備份,即:在每次快照時只發送新增的或者壓縮的狀態文件,未改變的狀態文件不再進行傳輸。增量形式使快照傳輸的網絡流量大幅降低,后續實驗結果也體現了這一點。

4.3 主備份任務切換

備份任務在正常情況下啟動后就直接掛起,只在狀態同步時工作,不觸發任何計算邏輯,不占用CPU資源。當主任務發生故障時,系統檢測到后會立即把主任務掛起,把數據通路切換到備份任務上,并且啟動備份計算邏輯。這種情況下,備份節點時刻處于熱啟動狀態,但是幾乎不占用CPU資的計算資源。

在Flink系統中,正常的流處理執行圖中的上下游任務會通過數據通路channel實現數據交換,且上下游任務之間的channel是一一對應關系;發生故障后,備份任務可以通過重新建立和上游任務以及下游任務的數據通路來代替主任務執行計算,但是這一過程需要花費一定的時間,且隨著計算圖規模變大,花費時間越多。因此,為了實現快速的任務切換,本文的方案會讓備份任務和主任務一樣,提前把數據通路建立好并通過標記來控制數據的交換。

Flink的數據通路在向下游發送數據時,可以通過添加標記來控制數據是否被實際發送,改進后的系統同樣通過一個flag實現數據通路的控制,在正常情況下,該flag值為true,會使得上游算子的數據只發送給主任務,發生故障后,會將flag置為false,使數據可以被備份任務的數據通路接受。

4.4 上游輸出緩存

Flink的每個任務會把輸出緩存在buffer中,由管理器通知下游算子buffer數據可以被消費,本文基于Flink的buffer,將其緩存下來,在每個快照周期清理一次,僅保存上一次快照期間的輸出數據,以滿足故障恢復的需求。對于快照時的offset,Flink會一起保存到每一次快照中,因此在故障后恢復狀態時,offset會自動指向上一次快照時的輸出位置;改進措施則是在備份節點中同步記錄該offset值,在故障時直接使用備份節點的offset。如果buffer使用的內存超過了限額,Flink會使用磁盤來緩存。有了上游輸出備份,在故障后下游算子可以直接消費上游算子緩存的數據恢復狀態,而不需要從頭開始消費數據,很大程度上減弱了故障的影響。Flink系統本身提供了從持久化存儲中拉取到本地的狀態文件來恢復狀態的API,同時,因為Flink的不同模塊的通信由Akka提供,兩個任務之間沒有直接通信方式,均通過執行管理器來調度;因此為了方便實現狀態同步,在系統設計中,基于上述API進行改進,結合快照協調器的消息,當快照結束后,備份任務收到執行管理器的消息便開始從持久化遠端拉取對應的狀態文件并在本地恢復。這種方式比主任務直接把快照狀態文件傳輸給備份任務要花費更多的時間,但是實現更加簡單。

4.5 備份任務狀態恢復

故障后備份任務基于上游緩存的數據和之前同步的主任務狀態來進行狀態恢復,但是此時上游算子的計算并沒有被停止,系統整體仍處于運行狀態,故障算子則在恢復后從故障時間點開始重新處理上游緩存的數據,追趕備份任務的狀態。考慮到這一點,系統可能會因為故障算子的重復處理產生反壓問題,但考慮到故障能夠被很快恢復,反壓問題可能并不嚴重。

Flink+在面臨算子子任務崩潰時的恢復過程整體變得較為復雜,但是避免了整個執行圖的重啟,只對故障的算子進行重啟,同時備份算子可以立即切換過來執行計算任務,加快了恢復過程

5 實驗與結果分析

為了驗證本文提出的輕量級故障恢復方案,在單機模式和分布式環境下分別對Flink系統(1.13.0版本)和改進后的Flink+系統進行測試。實驗采用WordCount任務來測試流式系統的計算、備份、故障恢復能力,數據源由英文版《哈利波特》構成。

1)實驗目的。針對流式系統備份開銷問題和故障恢復延遲問題,本文設計一種基于增量狀態備份的快照容錯方案,并在Apache Flink系統上實現了原型,本實驗的目的是為了驗證該方案的可行性和有效性,探究增量狀態對快照速度的影響以及上游輸出備份和狀態備份對故障恢復速度的影響。

2)實驗內容。利用WordCount流式計算任務,對Flink+系統的故障容錯能力進行驗證。實驗對單機和分布式集群下的Flink系統和Flink+系統進行對比探究,基于不同任務并行度,測試了Flink和Flink+的故障恢復速度。為了驗證改進部件對Flink系統本身的影響可以忽略不計,實驗也對同一任務下兩個系統的CPU占用率和內存占用率進行了測試。

3)單點故障。Flink中最小運行單位是Execution任務,會被分配給一個Java虛擬機(Java Virtual Machine, JVM)線程執行,本文將其在運算過程發生異常導致自身崩潰的問題定義為單點故障。

4)機器故障。單個機器上可能運行多個JVM,每個JVM可以運行多個執行管理器(TaskManager),每個執行管理器可以調度運行多個Execution任務。機器的故障會導致多個JVM的故障,進而導致多個單點故障。機器故障可以具化為多個單點故障,因此機器故障導致的多個單點故障如果相互無關,那么可以被視為多個單點故障的處理;如果多個單點故障有關系,可以把有聯系的單點故障認為是一個統一的大的單點故障,其恢復流程和普通的單點故障基本一致。因此實驗過程只針對單點故障。

5.1 實驗環境

實驗用到的物理機均為16核32線程的Linux機器,CPU為Intel Xeon E5?2650,主頻為2.00 GHz,每臺機器的內存為256 GB,操作系統均為Ubuntu 16.04.10。

Flink系統的部署主要分為Taskmanager和Jobmanager兩個部件:Jobmanager負責整個流式任務的調度執行;Taskmanager負責最小粒度單位“并行任務”的執行,其內部有slot,是用戶代碼實際執行的地方。

單機實驗模式下,實驗環境由1臺16核32線程的機器構成,Flink組件由1個Jobmanager和16個Taskmanager構成,每個Taskmanager僅包含一個slot執行單位。

分布式集群實驗模式下,實驗環境由8臺16核32線程的機器構成,Flink組件由1個Jobmanager(部署在控制機器上1上)和16個Taskmanager(每臺機器部署2個Taskmanager)構成,同樣的,每個Taskmanager僅由一個slot構成。

這里每個Taskmanager僅分配一個slot是為了使任務之間通過網絡通信而不是Taskmanager內部的內存空間通信,盡可能模擬真實大數據場景下的網絡通信場景。

5.2 實驗結果

從表1的實驗結果可以看出,基于本文設計改進后的Flink+系統在單機模式下的整體表現都明顯優于原Flink系統,其在系統故障后的恢復時間可以達到數十毫秒到數百毫秒級,而Flink系統面對單點故障需要注銷原有資源,重新拉起任務,恢復時間在秒級。

表 1 單機模式下Flink和Flink+系統恢復時間對比

為了探究并行度對故障恢復的影響,本文針對不同的并行度進行了實驗測試。實驗發現,對于同一任務的不同并行度,隨著任務并行度的增大,恢復時間減小比例基本上在逐漸增大,說明改進后的系統在高并行度下表現更加良好,相較于原Flink系統,更能適應并行化任務。觀察實驗結果還能發現隨著并行度的增加,兩個系統的故障恢復時間都呈現先減小后增大的趨勢。

造成這一現象的主要原因:單機下資源總數受限,一方面,提高并行度可以充分利用CPU的并行能力加快故障恢復,縮短恢復時間;而另一方面,并行任務增多意味著CPU負荷變大,一定程度上會延長恢復時間。當并行度逐漸接近CPU核數時,提高并行度帶來的增益被多任務的負荷抵消,甚至產生負增益,導致故障恢復時間增加。因此,可以看出流式任務的并行度并不是越大越好,適中的并行度有利于故障后的恢復。

考慮到實際環境中流計算任務往往是分布式進行,本文對分布式環境下的Flink+系統和Flink系統的故障恢復表現進行了實驗測試。從表2的實驗結果可以看出,改進后的Flink+系統在分布式環境下同樣表現優異,基本能夠維持在200 ms以內的故障恢復時間。但是因為分布式環境下不可避免的網絡通信延遲,系統在高并行度任務上并沒有單機模式那樣優異的表現,本研究認為這主要是網絡延遲帶來的影響。

表 2 分布式環境下Flink和Flink+系統恢復時間對比

因為單機模式和分布式集群的區別主要在于Taskmanager的部署位置不同,二者對于資源的使用大致相同,而實驗中給每個Taskmanager僅配置了1個slot資源,因此在分布式環境下當并行度達到32時,并行任務之間需要共享slot資源,此時,并行任務帶來的負增益大于并行計算帶來的正增益,因此整體恢復時間變長,正如實驗結果顯示,在并行度為32時,Flink系統和改進后的Flink+系統表現均有所下降。

為了驗證本文的容錯機制對于流式系統本身并沒有較大的影響,即可以在滿足輕量性的同時加快系統在故障時的恢復,本文針對Flink系統和改進后的Flink+系統進行了CPU和內存測試實驗。

實驗由2臺16核32線程的機器構成的集群進行,機器1負責kafka數據讀入和Jobmanager執行,機器2負責Taskmanager任務執行。通過對機器2上運行的Taskmanager的檢測,實驗獲取了執行同樣任務處理同樣數據情況下兩個系統的CPU和內存占用率結果,為了方便獲取數據,本實驗在較小規模下進行,僅在機器2上開啟了兩個Taskmanager進行測試。

如圖7所示,Flink和Flink+的整體執行過程幾乎完美吻合,二者在任務過程中占用率幾乎相等,CPU占用率維持在200%~300%,這說明改進后的Flink+并沒有給系統帶來較大的CPU負擔。通過計算兩個系統的平均CPU占用率可得:Flink系統的平均CPU占用率為264%,Flink+系統的平均占用率是258.8%,考慮到其他進程以及CPU占用率的波動,可以認為二者占用率基本一致,即本文的改進沒有給Flink系統帶來較大的負擔。

在內存方面,二者的占用率也維持在相同的水平,實驗結果顯示并無差距。

綜合實驗結果,可以得出如下結論:本文提出的輕量級快照容錯方案可以大幅減少故障恢復時間;流式任務的并行度太高會導致故障后恢復時間增加;本文提出的方案不會給流式系統本身帶來顯著CPU壓力和內存壓力。

圖 7 Flink與Flink+的CPU占用率對比

6 結語

本文針對流式系統分布式快照機制故障恢復慢的問題,提出了一種基于增量狀態備份的故障恢復方法Flink+,通過增量式狀態同步實現了快速狀態備份,通過上游輸出緩存與提前網絡連接進一步細化了故障恢復粒度并實現了快速故障恢復。實現結果表明,Flink+能夠在不顯著增加額外CPU、內存開銷的同時實現6~8倍的故障恢復加速。

[1] ZHANG Y Y, LI J X, ZHANG Y M, et al. FreeLauncher: lossless failure recovery of parameter servers with ultralight replication[C]// Proceedings of the IEEE 41st International Conference on Distributed Computing Systems. Piscataway: IEEE, 2021: 472-482.

[2] ZHANG Y Y, LI J X, SUN C G, et al. HotML: a DSM?based machine learning system for social networks[J]. Journal of Computational Science, 2018, 26: 478-487.

[3] CARBONE P, KATSIFODIMOS A, EWEN S, et al. Apache Flink: stream and batch processing in a single engine[J]. Bulletin of the IEEE Computer Society Technical Committee on Data Engineering, 2015, 38(4): 28-38.

[4] CARBONE P, EWEN S, FóRA G, et al. State management in Apache Flink: consistent stateful distributed stream processing[J]. Proceedings of the VLDB Endowment, 2017, 10(12): 1718-1729.

[5] GARCíA?GIL D, RAMíREZ?GALLEGO S, GARCíA S, et al. A comparison on scalability for batch big data processing on Apache Spark and Apache Flink[J]. Big Data Analytics, 2017, 2: No.1.

[6] MENG X R, BRADLEY J, YAVUZ B, et al. MLlib: machine learning in Apache Spark[J]. Journal of Machine Learning Research, 2016, 17: 1-7.

[7] ZAHARIA M, DAS T, LI H Y, et al. Discretized streams: fault?tolerant streaming computation at scale[C]// Proceedings of the 24th ACM Symposium on Operating Systems Principles. New York: ACM, 2013: 423-438.

[8] ZAHARIA M, CHOWDHURY M, DAS T, et al. Resilient distributed datasets: a fault?tolerant abstraction for in?memory cluster computing[C]// Proceedings of the 9th USENIX Symposium on Networked Systems Design and Implementation. Berkeley: USENIX Association, 2012: 15-28.

[9] ZAHARIA M, DAS T, LI H Y, et al. Discretized streams: an efficient and fault?tolerant model for stream processing on large clusters[C]// Proceedings of the 4th USENIX Workshop on Hot Topics in Cloud Computing. Berkeley: USENIX Association, 2012: No.10.

[10] TOSHNIWAL A, TANEJA S, SHUKLA A, et al. Storm@Twitter[C]// Proceedings of the 2014 ACM SIGMOD international Conference on Management of Data. New York: ACM, 2014: 147-156.

[11] IQBAL M H, SOOMRO T R. Big data analysis: Apache Storm perspective[J]. International Journal of Computer Trends and Technology, 2015, 19(1): 9-14.

[12] NOGHABI S A, PARAMASIVAM K, PAN Y, et al. Samza: stateful scalable stream processing at LinkedIn[J]. Proceedings of the VLDB Endowment, 2017, 10(12): 1634-1645.

[13] CHANDY K M, LAMPORT L. Distributed snapshots: Determining global states of distributed systems[J]. ACM Transactions on Computer Systems, 1985, 3(1): 63-75.

[14] CARBONE P, FóRA G, EWEN S, et al. Lightweight asynchronous snapshots for distributed dataflows[EB/OL]. (2015-06-29)[2021-12-15].https://arxiv.org/pdf/1506.08603.pdf.

[15] 段澤源. 大數據流式處理系統負載均衡與容錯機制的研究[D]. 北京:華北電力大學, 2017: 28-30.(DUAN Z Y. Research on load balancing and fault tolerant mechanism of big data stream processing system[D]. Beijing: North China Electric Power University, 2017:28-30.)

[16] 孫大為,張廣艷,鄭緯民. 大數據流式計算:關鍵技術及系統實例[J]. 軟件學報, 2014, 25(4):839-862.(SUN D W, ZHANG G Y, ZHENG W M. Big data stream computing: technologies and instances[J]. Journal of Software, 2014, 25(4): 839-862.)

[17] LI H L, WU J, JIANG Z, et al. Integrated recovery and task allocation for stream processing[C]// Proceedings of the IEEE 36th International Performance Computing and Communications Conference. Piscataway: IEEE, 2017: 1-8.

[18] LI H L, WU J, JIANG Z, et al. Task allocation for stream processing with recovery latency guarantee[C]// Proceedings of the 2017 IEEE International Conference on Cluster Computing. Piscataway: IEEE, 2017: 379-383.

[19] AKIDAU T, BALIKOV A, BEKIRO?LU K, et al. MillWheel: fault?tolerant stream processing at Internet scale[J]. Proceedings of the VLDB Endowment, 2013, 6(11): 1033-1044.

[20] GUO J, AGRAWAL G. Smart Streaming: a high?throughput fault? tolerant online processing system[C]// Proceedings of the 2020 IEEE International Parallel and Distributed Processing Symposium Workshops. Piscataway: IEEE, 2020: 396?405.

[21] LIN C F, ZHAN J J, CHEN H H, et al. Ares: a high performance and fault?tolerant distributed stream processing system[C]// Proceedings of the IEEE 26th International Conference on Network Protocols. Piscataway: IEEE, 2018: 176-186.

[22] VENKATARAMAN S, PANDA A, OUSTERHOUT K, et al. Drizzle: fast and adaptable stream processing at scale[C]// Proceedings of the 26th Symposium on Operating Systems Principles. New York: ACM, 2017: 374-389.

[23] LIU P C, XU H L, DA SILVA D, et al. FP4S: fragment?based parallel state recovery for stateful stream applications[C]// Proceedings of the 2020 IEEE International Parallel and Distributed Processing Symposium. Piscataway: IEEE, 2020: 1102-1111.

[24] XU H L, LIU P C, CRUZ?DIAZ S, et al. SR3: customizable recovery for stateful stream processing systems[C]// Proceedings of the 21st International Middleware Conference. New York: ACM, 2020: 251-264.

[25] RENESSE R van, SCHNEIDER F B. Chain replication for supporting high throughput and availability[C]// Proceedings of the 6th Symposium on Operating Systems Design and Implementation. Berkeley: USENIX Association, 2004: 91-104.

[26] TERRACE J, FREEDMAN M J. Object storage on CRAQ: high? throughput chain replication for read?mostly workloads[C]// Proceedings of the 2009 USENIX Annual Technical Conference. Berkeley: USENIX Association, 2009: No.11.

Efficient failure recovery method for stream data processing system

LIU Yang1,2,3, ZHANG Yangyang1,2, ZHOU Haoyi1,2,4*

(1,,100191,;2,,100191,;3,,100191,;4,,100191,)

Focusing on the issue that the single point of failure cannot be efficiently handled by streaming data processing system Flink, a new fault?tolerant system based on incremental state and backup, Flink+, was proposed. Firstly, backup operators and data paths were established in advance. Secondly, the output data in the data flow diagram was cached, and disks were used if necessary. Thirdly, task state synchronization was performed during system snapshots. Finally, backup tasks and cached data were used to recover calculation in case of system failure. In the system experiment and test, Flink+ dose not significantly increase the additional fault tolerance overhead during fault?free operation; when dealing with the single point of failure in both single?machine and distributed environments, compared with Flink system, the proposed system has the failure recovery time reduced by 96.98% in single?machine 8?task parallelism and by 88.75% in distributed 16?task parallelism. Experimental results show that using incremental state and backup method together can effectively reduce the recovery time of the single point of failure of the stream system and enhance the robustness of the system.

stream data processing system; failure recovery; distributed checkpoint; state backup; Apache Flink

This work is partially supported by National Natural Science Foundation of China (U20B2053, 61872022), Open Project of State Key Laboratory of Software Development Environment (SKLSDE?2020ZX?12).

LIU Yang, born in 1999, Ph. D. candidate. His research interests include distributed systems, graph processing systems.

ZHANG Yangyang, born in 1991, Ph. D. candidate. His research interests include distributed systems, machine learning, graph processing.

ZHOU Haoyi, born in 1991, Ph. D., lecturer. His research interests include big data system, machine learning.

1001-9081(2022)11-3337-09

10.11772/j.issn.1001-9081.2021122108

2021?12?15;

2022?02?27;

2022?03?04。

國家自然科學基金資助項目(U20B2053, 61872022);軟件開發環境國家重點實驗室開放課題(SKLSDE?2020ZX?12)。

TP311.5

A

劉陽(1999—),男,山西大同人,博士研究生,CCF會員,主要研究方向:分布式系統、圖計算系統;張揚揚(1991—),男,河北保定人,博士研究生,CCF會員,主要研究方向:分布式系統、機器學習、圖計算;周號益(1991—),男,四川德陽人,講師,博士,CCF會員,主要研究方向:大數據系統、機器學習。

猜你喜歡
故障系統
Smartflower POP 一體式光伏系統
工業設計(2022年8期)2022-09-09 07:43:20
WJ-700無人機系統
ZC系列無人機遙感系統
北京測繪(2020年12期)2020-12-29 01:33:58
故障一點通
基于PowerPC+FPGA顯示系統
半沸制皂系統(下)
連通與提升系統的最后一塊拼圖 Audiolab 傲立 M-DAC mini
奔馳R320車ABS、ESP故障燈異常點亮
故障一點通
故障一點通
主站蜘蛛池模板: 亚洲日韩国产精品无码专区| 亚洲精品动漫在线观看| 免费不卡视频| 亚洲熟女中文字幕男人总站| 九色在线视频导航91| 日韩国产高清无码| 国产女人在线视频| 日本一区二区三区精品国产| 麻豆精品在线播放| 成人a免费α片在线视频网站| 亚洲v日韩v欧美在线观看| 新SSS无码手机在线观看| 国产成人av一区二区三区| 91久久偷偷做嫩草影院精品| 日韩资源站| 99性视频| 国产成年无码AⅤ片在线| 福利在线一区| 亚洲欧美不卡| 综合久久久久久久综合网| 日韩欧美中文字幕在线精品| 8090成人午夜精品| 99热这里只有免费国产精品| 日本人妻丰满熟妇区| 亚洲无线一二三四区男男| 欧美精品一区二区三区中文字幕| 国产精品区网红主播在线观看| 亚洲色无码专线精品观看| 色悠久久久久久久综合网伊人| YW尤物AV无码国产在线观看| 日韩av无码DVD| 一本无码在线观看| 国产一区二区精品福利| 午夜福利在线观看成人| 青青草久久伊人| 国产精品蜜臀| 亚洲色欲色欲www网| 日韩二区三区无| 亚洲第一色网站| 国产欧美视频一区二区三区| 亚洲天堂福利视频| 国产小视频免费观看| 高清国产在线| 国产精品污视频| 天堂久久久久久中文字幕| 国产白浆一区二区三区视频在线| 日韩在线视频网站| 久久永久视频| 色综合久久无码网| 国产精女同一区二区三区久| 国产精品久久久久久搜索| 国产在线精彩视频论坛| 国产微拍一区| 成AV人片一区二区三区久久| 国产精品视频999| 日本免费a视频| 黄色网站在线观看无码| 国产日韩精品欧美一区喷| 欧美午夜网站| 国产精品专区第1页| 国产精品自拍合集| 亚洲码一区二区三区| 99视频只有精品| 亚洲国产中文欧美在线人成大黄瓜| 国产一区二区三区在线精品专区| 狠狠色综合网| 免费视频在线2021入口| 亚洲免费播放| 日韩黄色大片免费看| 鲁鲁鲁爽爽爽在线视频观看| 性视频一区| 亚洲第一中文字幕| 精品自拍视频在线观看| 国产99欧美精品久久精品久久| 欧美福利在线观看| 日韩欧美国产精品| 免费人成在线观看成人片| www.99在线观看| 国产综合网站| 在线精品亚洲国产| 久久精品无码专区免费| 久久久久久国产精品mv|