汪志峰 ,趙宇海* ,王國仁
(1.東北大學計算機科學與工程學院,沈陽,110169;2.北京理工大學計算機學院,北京,100081)
近年來,由于數據量呈指數式增長,處理大數據[1]的方式也發生了很大變化,迄今已經歷三代引擎的改進:第一代以Hadoop[2]為代表,利用MapReduce[3]進行大數據處理;第二代以Spark[4]為代表,是基于RDD(基于內存計算)的微批流處理框架;第三代是以Flink 為代表的面向流,保證Exactly Once 的實時流處理框架.Flink[5]的計算平臺可以實現毫秒級延遲下每秒處理上億次的消息或者事件;同時,Flink 還提供一種Exactlyonce[6]的一致性語義,保證數據的正確性,使Flink大數據引擎可以提供金融級的數據處理能力.高吞吐、低延遲的性能使Flink 成為目前流處理的首選.與此同時,各個公司處理大數據的方式也發生了很大變化,例如阿里巴巴、滴滴出行、美團都大規模使用Flink 集群,阿里巴巴還開源自己的Blink 給Flink 社區,給Flink 帶來了更好的性能優化以及方便的SQL 環境.如圖1 所示的流程中,上游是事務處理、日志、點擊事件等,經過Flink 的流處理到達下游;下游是處理之后的數據存儲到數據庫里或直接被應用所利用.Flink 在其中可以提供低延遲、高吞吐、Exactly Once 的處理.

圖1 Flink 應用場景Fig.1 Flink application scenario
任務調度是Flink 很重要的功能.Flink 通過JobManger 任務調度器管理Slot,把任務分配到合適的Slot 等待TaskManager 執行.Flink 的任務調度圖如圖2 所示,Flink 集群啟動后首先會啟動一個JobManger 和一個或多個TaskManager,Client提交任務給JobManager,JobManager 再調度任務到各個TaskManager 去執行.在這個過程中TaskManager 把心跳和任務處理信息匯報給Job-Manager,TaskManager 之間以流的形式進行數據傳輸.在集群運行的過程中,如果流任務失敗則利用快照加檢查點的形式恢復,如果批任務失敗則重新開始.在把任務分配到具體的Slot 的過程中,優先選擇符合Local 屬性的節點.如果有Slot-SharingGroup 限制,則在SlotSharingGroup 里再創建一個SimpleSlot;如果有CoLocationGroup 限制,則必須在同一個CoLocationGroup 里創建一個SimpleSlot;如果沒有上述限制,則從Slot 集合里挑一個.

圖2 Flink 任務調度圖Fig.2 Flink task scheduling diagram
所以,Flink 的默認調度策略是輪詢,每個任務需要去可用的Slot 集合順序選擇一個Slot 分配給這個Task.在調度過程中,屬于同一個Task 的所有SubTask 不能分配在Slot 里面,因為任務需要分布式運行,所以不同的子任務必須分配在不同的Slot 里.
Flink 默認的調度策略把所有節點認作同等性能(這里的性能指CPU 的處理能力、內存等),但在實際的集群搭建部署過程中,集群中的節點可能性能相差較大.若在這種異構的集群中采用輪詢的調度策略就可能因為沒有考慮每個節點的不同負載能力,使性能較差的節點和性能較好的節點被分配同樣的任務.性能較差的節點負載過多的Task 會影響整體Job 的運行效率,使集群的吞吐降低、延時增加,延長整個Job 的運行時間,因此在異構集群中根據集群各個節點的不同性能調整任務的分配成為提升異構集群整體性能的關鍵.基于此,本文提出自適應負載均衡算法,提升系統的吞吐量、降低延時、減少Job 的運行時間,使集群的整體性能有較大的提升.
本文的主要貢獻:
(1)通過對Hadoop 和Spark 的研究發現,在異構集群中應用默認的輪詢調度策略沒有考慮節點的處理性能,導致集群執行效率的下降.本文通過實驗證明Flink 系統中也會出現這種現象,即異構Flink 集群中可能存在由于任務分配不均衡導致Job 完成時間被拖長、吞吐量降低、延時增加.
(2)提出異構集群中平滑加權輪詢任務調度算法(Smooth Weighted Polling Task Scheduling,SWPTS).根據集群中每個節點的性能權重Ew,按從大到小的次序,依次給每個節點的Slot 分配SubTask.與此同時,記錄節點當前的權重Cw,降低Cw最大的節點來避免有效權重大的節點被連續多次選中,從而使集群在開始調度時就能保持負載均衡.
(3)提出基于蟻群算法的任務調度算法(Task Scheduling Algorithm Based on Ant Colony Algorithm,ACTS).在集群運行過程中,當集群資源的使用高于預設值ε時則使用ACTS 尋找全局最優任務分配方案,使每個SubTask 被分配到合適的Slot 上,讓整個集群的效率達到最高.
(4)在真實數據集和人工數據集上進行了實驗分析和驗證,結果表明,在Flink 集群里應用SWPTS 和ACTS 確實對縮短Job 運行時間、提高吞吐量、降低延時有很好的效果.
在流計算框架中任務調度是很重要的一個模塊,負責把Task 調度給Slave 的Slot 執行,不同的流計算引擎盡管實現方式不同,但實現的功能都是把任務根據某種算法分配給指定的Slave 執行.
Hadoop 提供可插拔的任務調度器,它根據用戶的需求選擇合適的調度方案,并可以隨時切換.Hadoop 的任務調度算法有三種.(1)FIFO 調度器[7].FIFO 調度器是最開始被集成到Hadoop里的,Task 按FIFO 的順序進入大工作隊列,Job-Tracker 從工作隊列里取最先到達的Task.這種調度策略沒有考慮作業的優先級和作業的大小,但這種策略最容易實現,也是有效率的.(2)公平調度器(Fair Scheduler)[8].公平調度是一種分配作業資源的策略,目的是讓所有的作業隨著時間的推移都能平均地獲取等同的共享資源,所有Job 享有相同的資源.(3)計算能力調度器(Capacity Scheduler)[9]支持多個隊列,每個隊列可配置一定的資源量并采用FIFO 調度策略.為防止同一個用戶的作業獨占隊列中的資源,該調度器會對同一用戶提交的作業所占的資源量進行限定,優先執行占用最小、優先級高的作業.
Lee and Chung[10]提出一個針對Hadoop 的截止時間約束調度算法,使用統計學方法來測量數據節點的性能,并基于該信息創建檢查點來監視作業的進度;根據每個檢查點的作業進度,算法將任務分配給不同的作業隊列.Kc and Anyanwu[11]提出基于工作執行代價模型來滿足用戶規定的數據處理截止日期的調度算法,這些工作執行代價包括map 和reduce 兩個階段的運行時間、map 和reduce 輸入數據的規模和分布.
Spark 是當下十分流行的流計算框架.默認情況下,Spark 調度器按照FIFO(先進先出)[12]的形式來調度任務,每個工作被分為多個“階段”(如map 和reduce 語句).對于所有可用的資源,第一個工作的優先級最高,其任務即被啟動;之后是第二個,依次類推.如果集群不需要隊列頭的工作,后面的工作將被立刻啟動;如果隊列頭的工作很大,則后面的工作可能被大大推遲.Spark 后來的版本模仿Hadoop 的公平調度器[13]也添加了公平的調度策池,不同的工作可以被分到不同的組,每組對應一個任務池,不同的任務池設置不同的調度選項(權重).
Mao et al[14]提出用機器學習領域的增強學習和神經網絡,在無須手動設置最小工作完成時間的情況下得到一種在指定工作負載情況下的調度算法,在這個過程中設計可擴展的RL 模型,并發明RL 訓練方法處理連續到來的隨機Job.Ren et al[15]設計了第一個分散感知調度器Hopper,為了提供可拓展性和可預測性,Hopper 被設計成一個分散的Job 調度器,它把資源分配給Job 的同時也能預測拖慢工作進度的子任務,從而采取合理算法降低系統的延時.
Storm 里也有很多對Storm 的調度器進行改進的工作.Peng et al[16]在2015 年提出R-Storm 資源感知調度器,通過增加最大化資源利用率來提升總吞吐量,同時最小化網絡延遲,在任務調度時R-Storm 可以滿足軟資源和硬資源的限制以及最小化組件之間的網絡通信代價,在標準的Yahoo基準測試下吞吐量提高30%~47%,CPU 利用率提高69%~350%.Chen and Lee[17]發現Storm 默認使用輪詢的算法來分配任務,這對異構計算環境不是最佳,于是提出G-Storm 調度算法.GStorm 利用集群節點GPU 來加速整體性能,實驗結果表明,與原始Storm 調度算法相比,G-Storm在輕量級和高負載拓撲上可以實現1.65 倍至2.04 倍的性能提升.
目前針對異構集群的負載均衡算法沒有通用的高效算法,大部分還是通過感應集群中某種資源來作出動態調度,或利用集群的其他計算能力(如GPU)來負載均衡,通常存在三個問題:(1)這些算法只從單一的性能指標去實現負載均衡,沒有綜合多個性能指標來思考;(2)雖然其中有加權輪詢的算法,但這種加權在選擇節點時沒有考慮平滑,導致有些權重大的節點被連續選中,造成短時間內局部負載過重,影響節點效率;(3)這些算法只考慮負載均衡,沒有考慮集群負載均衡算法后已使用的資源高于設定的閾值時的處理.
針對以上問題,本文改進和實現如下:(1)考慮CPU 利用率、內存利用率和總內存這三個關鍵性能指標,而非單一指標,并且動態地監控三個性能指標來作出調度決策;(2)不僅考慮多個性能指標的加權,而且在加權過程中進行平滑處理,即避免讓權重大的節點連續被選中造成局部短時間內負載過重;(3)在集群已使用的資源高于設定閾值后采用ACTS,能夠在一定的迭代次數內得到全局最優任務分配方案,按照這個方案調度任務來重新負載均衡,也能使集群處于最佳狀態.
本研究考慮Flink 集群CPU 利用率和內存利用率等性能指標,在初始調度階段采用SWPTS,在集群已使用資源高于設定閾值的情況下采用ACTS,經過實驗驗證,能夠在Job 的運行時間、延時、吞吐量等性能指標上有明顯提升.
定義1 內存使用率(Mu)這是描述計算機的重要性能指標之一,可描述集群中已使用的內存占用總內存的比重.定義如下:

其中,memfree表示剩余的內存,memtotal表示總內存大小.
定義2 CPU 使用率(Cu)它描述CPU 當前被占用的情況,使用率越大表示CPU 越忙,在這種情況下CPU 很難空出時鐘周期運行即將提交該節點的任務.定義如下:

定義3 有效權重(Ew)這是在初始調度時根據CPU 使用率、內存使用率等性能指標得到的一個綜合權重,可衡量節點的總體資源使用情況.計算方式如下:

定義4 當前權重(Cw)它可以用來衡量運行過程中每個節點被選中的權重,每經過一次調度當前權重都會發生變化,可以用來挑選最合適的節點.定義如下:

其中,Cwlast表示上次調度時的有效權重.
定義5 任務執行時間(timeMatrix[ i ][ j ])它表示任務i分配給節點j所需的時間,在算法執行的初始時刻初始化.定義如下:

定義6 最大信息素概率(maxphe)它主要用來確定螞蟻的臨界下標(indexbound),即確定一個臨界螞蟻編號,在這個編號之前的螞蟻選擇信息素最大的節點從而把任務分配給該節點,而這個節點之后的螞蟻選擇一個隨機的節點,把任務分配給該節點.定義如下:

其中,pheMatrix[i]是節點i的信息素.臨界下標的定義如下:

本節詳細介紹集群資源監控、SWPTS 和ACTS.集群資源監控主要負責監控集群中每個節點的CPU 使用率、內存使用率,并把這些數據通過Http 協議[18]發送給Flink 的JobManager 作為性能數據來確定權重.與傳統的負載均衡算法不同,SWPTS 和ACTS 是基于集群資源的動態負載均衡算法.
3.1 集群資源監控集群資源監控主要負責為調度器提供性能數據,Flink 集群的JobManager根據性能數據來確定每個Slave 的權重,調度器利用權重來實現整體的調度負載均衡.集群資源監控整體的架構如圖3 所示,每個slave 節點利用正則表達式解析/proc 目錄下的cpuinfo 和meminfo 這兩個文件得到每個時刻CPU 的使用率和內存使用率,然后通過Socket 通信把數據發送給Master.Master 中的Redis[19]完成性能數據的持久化處理,同時通過Web 技術對外提供Http 接口,Flink 集群可以通過RPC 調用獲取集群中各個節點的實時性能數據.

圖3 資源調度架構圖Fig.3 The diagram of resource scheduling architecture
3.2 SWPTS 總覽初始化bestInstance和total-Weight.其中bestInstance用來保存最后選中的最好的DataMetric,它在每次循環過程中都記錄當前節點為止Cw最大的DataMetric.totalWeight則在每次循環過程中累加每個DataMetric的Ew,循環結束時,totalWeight保存所有節點Ew的和.
在每次循環過程中,每個DataMetric的Cw都會加上它自身的Ew,并用total累加當前Ew.如果當前DataMetric的Cw大于bestInstance的Cw,則記錄bestInstance到目前為止具有最大Cw的DataMetric.
在循環完成之后,找到當前的最優的Data-Metric,它有最大的Cw.為使該DataMetric不被重復選中,需要降低它的Cw.即用bestInstance的Cw減去total,并返回這個最優的bestInstance.具體見算法1.

SWPTS 主要是在Job 運行的初始階段把任務分配給TaskManager 的Slot.為減少頻繁Rpc帶來的時間影響,在初始的調度過程中JobManager 沒有一直請求集群資源監控的性能數據,而是在第一次請求后緩存數據,在后續任務到來時SWPTS 利用初始緩存的性能數據,再用ACTS選擇一個節點的Slot 分配給Task.
以一個有三個Slave 節點的集群為例,假設weight(Slave1)∶weight(Slave2)∶weight(Slave3)=3∶1∶2,集群的前六次調度如表1 所示.
可以看到,Slave1 的權重最大,被調度的次數為3;Slave3 的權重次之,被調度的次數為2;Slave2 的權重最小,被調度的次數為1.上述調度體現了加權,即權重大的節點被調度選中的次數也多;同時也體現了平滑的特點,權重大的節點沒有被連續選中,而是被間斷地選中.綜上所述,SWPTS 能使集群調度負載均衡且避免權重較大的節點在局部短時間內負載過重.

表1 Flink 任務調度過程Table 1 Flink task scheduling process
3.3 ACTS 總覽SWPTS 主要用在初始調度,但當集群中已使用資源高于設定閾值ε時,如果不考慮全局的最優調度,則資源有可能進一步降低,負載持續不均衡.此時采用ACTS,可以得到一種全局的任務分配方案,按照這個方案把Task分配給指定的Slot 運行能使集群處于最佳運行狀態,此時集群資源的利用率最好.
蟻群算法(Ant Colony Algorithm)是一種模擬螞蟻覓食行為的模擬優化算法,由意大利學者Dorigo and Gambardella[20]于1991 年首先提出,并首先使用在解決TSP(旅行商問題)[21]上.蟻群算法的基本原理如下:
(1)螞蟻在隨意行走的路徑上釋放信息素[22],這些信息素有利于后面的螞蟻繼續尋找路徑.
(2)碰到沒走過的路口就隨機挑選一條路走,同時釋放與路徑長度有關的信息素.
(3)信息素濃度與路徑長度成反比.后來的螞蟻再次碰到該路口時,選擇信息素濃度較高的路徑.
(4)隨著螞蟻覓食過程中信息素的不斷累積,最優路徑上的信息素濃度越來越大,讓后續的螞蟻更多地選擇信息素濃度高的這條路徑.
(5)一段時間后,蟻群找到最優覓食路徑,是一條全局的最短路徑,即距離食物源最近.

圖4 蟻群算法示意圖Fig.4 Ant colony algorithm
ACTS 就是根據蟻群算法改造的.經典的蟻群算法是尋找一條最短路徑,而ACTS 的目標是尋找一種全局最優分配方案,使Task 的完成時間最短、資源使用率最好、集群重新負載均衡.ACTS 以Task 的運行時間為衡量信息素增減的標準,由每只螞蟻把每個任務分配給對應的節點.由于分布式并行的特點,每只螞蟻取運行時間最長的Task 所需要的時間作為Job 的完成時間.
算法的整體框架如算法2 所示,主要包括三部分:第一部分是初始化任務集合,第二部分是初始化信息素矩陣,第三部分是迭代搜索.

步驟1,初始化任務矩陣.timeMatrix[i][j]表示任務i分配給節點j所需的時間.根據任務集合tasks 和節點集合nodes,應用式(5)計算每個任務在每個節點完成需要的時間.表2 是一個示例,Task[ 1 ],Task[ 2 ],Task[ 3 ]對應的數據規模大小分別是10,8,6;Node[ 1 ],Node[ 2 ],Node[ 3 ]對應的處理能力分別為2,2,1;每個任務在每個節點完成需要的時間如表2 所示.

表2 初始化任務矩陣Table 2 Initialization task matrix
步驟2,初始化信息素矩陣.將負載均衡調度過程中的一次任務分配看作一條路徑,因此pheromoneMatrix[i][j]表示將任務i分配給節點j這條路徑的信息素濃度.初始化信息素矩陣,將所有值置為1,因為初始時所有的路徑都沒有螞蟻選擇,默認為1.
步驟3,迭代搜索.這是ACTS 最關鍵的一步,由三部分組成:迭代分配任務、計算每只螞蟻的任務處理時間、更新信息素.整個迭代搜索的算法框架如算法3 所示.

算法3 中,第2 行表示初始化Path_AllAnt,它是一個三維數組,用來存保第iteratorCount次迭代過程中第antCount只螞蟻將i個任務分配給j個節點處理.第4 行是初始化Path_OneAnt,表示第antCount只螞蟻將i任務分配給j節點處理.第5~8 行表示循環每個任務,為每個任務通過任務分配函數asignTask 分配到一個節點node,給相應的Path_OneAnt,Path_AllAnt賦值.第9 行和第10 行是計算本次迭代中所有螞蟻的任務處理時間,并將所有螞蟻的任務處理時間加入總結果集resultData.第11 行是更新信息素.最后返回resultData.
在整個蟻群算法中,共進行iteratorNum次迭代.每次迭代都會產生當前的最優分配策略,即“局部最優解”,迭代的次數越多,局部最優解就越接近全局最優解.但迭代次數過多會造成調度器大量的時間和性能開銷,無法滿足海量任務的調度;而迭代次數太少,則得到的可能不是全局最優解.本文采用固定迭代次數為100 次.
任務分配函數負責將一個指定的任務按照某種策略分配給某一節點處理.分配策略有兩種:(1)按信息素濃度分配,即是將任務分配給本行中信息素濃度最高的節點處理.例如:當前的任務編號是taskCount,當前的信息素濃度矩陣是phe-romoneMatrix,則任務會分配給pheromoneMatrix[taskCount]這一行中信息素濃度最高的節點.(2)隨機分配,即將任務隨意分配給某個節點處理,一般根據螞蟻的編號antNum來選擇.boundPointMatrix[i]=5 表示編號為0~5 的螞蟻在分配任務i的時候采用“按信息素濃度”的方式分配,即將任務i分配給信息素濃度最高的節點處理;而編號為6~9 的螞蟻在分配任務i時,采用隨機分配策略.
當閥門結構是非平衡式軟密封單座結構時,閥芯導向在閥門套筒內且全行程導向,從而可以保證閥門打開的時候介質在閥芯側向流動,該處閥芯外表面不參與密封。軟密封閥芯鑲嵌在閥芯體內,閥芯密封面在凹槽底部,閥座密封面使用的是一個凸臺,兩者接觸時為平面密封。當閥門關到底部處于關閉狀態時,閥座的凸起表面嵌入軟密封環中,該處的密封材料變形包圍住閥座凸起表面,由此確保可靠的密封效果,同時閥桿也不用承受過大推力。由于整個密封墊片鑲嵌在凹槽內,可以使得閥芯閥座接觸后的軟密封材料PTFE變形得到有效控制,從而保證閥芯閥座在有效行程內密封可靠。
蟻群算法有三個問題需要注意:
(1)計算任務處理問題.由于Flink 集群里任務都是并行運行的,因此在計算任務處理時通常以最晚完成的任務的時間為整個Job 的完成時間.
(2)更新信息素問題.將所有路徑的信息素降低m%,表示信息素的揮發;找出所有螞蟻的最短路徑,則該路徑的信息素提升n%,表示該路徑是最短路徑,信息素不斷提升.
(3)更新boundPointMatrix問題.bound-PointMatrix表示臨界螞蟻下標集合,在該下標之前的螞蟻選擇信息素濃度最高的節點分配,在該下標之后的螞蟻選擇隨機一個節點分配,計算方式如式(6)和式(7)所示.
本文的調度器及算法均已在Flink 中實現.通過修改Flink runtime 包下面的調度模塊,可添加數據HTTP Api 數據訪問和數據解析器.整個系統依賴Redis 和maskmonitor 性能監視應用.使用SWPTS 和使用默認調度算法的Flink 系統在延時、吞吐量、運行時間方面做了對比和分析,并在不同的數據集和不同并行度上進行了實驗驗證.實驗使用Flink 自帶的實例WordCount.由于臨界資源閾值是個經驗值,不同閾值效率對比如圖5.根據圖5 可以看出閾值為80%效率最高,因此本文實驗默認閾值設定為80%.

圖5 Flink 臨界閾值效率對比Fig.5 Comparison of Flink critical threshold efficiency
4.1 數據集針對WordCount 使用真實數據集和模擬數據集.真實數據集是TPC-C[23],用九個表生成模擬五種事務處理,產生三個大數據集,模擬真實場景下的批計算.此外,自己寫程序生成三個只包括字符串的模擬數據集,用以擴充實驗測試.數據集的來源和規模如表3 所示.

表3 數據集規模Table 3 The size of datasets
4.2 實驗環境及配置SWPTS 和ACTS 均基于Flink1.4.2,編程語言為Java.實驗所用的集群硬件配置和參數如下:
實驗環境:分布式集群由一臺服務器和兩臺虛擬主機構成,兩臺虛擬主機模擬異構集群的效果,服務器主機為Master 節點,兩臺虛擬機為Slave 節點.
Master 節點的配置如下:CPU 為Intel(R)Core(TM)i7-6700,4 Core,主頻3.40 GHz;內存為64 GB 2133 MHz;機械硬盤為WDC WD10 EZEX -08WN4A0 1 TB;編程環境為IntelliJ IDEA 2018,Maven,Git;操作系統為Ubuntu 16.04.5;Flink 版本1.4.2,JDK 版本1.8.0_151,Hadoop 版本2.7.5.
Slave1 的配置如下:CPU 為Intel(R)Core(TM)i5-4690,4 Core,主頻3.50 GHz;內存為4 GB 2133 MHz;機械硬盤60 GB;編程環境為IntelliJIDEA 2018,Maven,Git;操作系統為Ubuntu 16.04.5;Flink 版本1.4.2,JDK 版本1.8.0_151,Hadoop 版本2.7.5.
Slave2 的配置如下:CPU 為Intel(R)Core(TM)i5-4690,2 Core,主頻3.50 GHz;內存為2 GB 2133 MHz;機械硬盤60 GB;編程環境為IntelliJ IDEA 2018,Maven,Git;操作系統為Ubuntu 16.04.5;Flink 版本1.4.2,JDK 版本1.8.0_151,Hadoop 版本2.7.5.
4.3 實驗結果及分析通過修改Flink1.4.2 的默認任務調度器為自適應任務調度器,新增SWPTS 和ACTS 兩種負載均衡調度算法.在異構集群中不同的數據集下分別與Flink1.4.2 在運行時間、延時、吞吐量[24]方面做性能對比,可以看出改進之后的算法在三個方面都有所提升.
4.3.1 運行時間對比分析通常運行時間是一個可以讓用戶對比算法性能的最直觀的指標.運行快說明算法節省了Job 的完成時間,加快了用戶響應時間,對提升性能非常重要.SWPTS 和ACTS 能將任務盡可能多地分配給資源豐富的節點,應用到自適應任務調度器中能使資源豐富的節點更多地接收Task,比Flink 默認調度器的負載更均衡,運行時間也相應縮短.
在并行度為8 和16 時,在自己程序生成的數據集(圖6)和TPC-C 數據集(圖7)上進行對比實驗,可以看到,應用自適應任務調度器(LoadbalanceFlink)之后的運行時間比默認任務調度器(NativeFlink)平均減少8%左右,這是因為前者將任務均勻分配,資源多的節點可以完成盡可能多的Task,縮短了整體的完成時間,而默認任務調度器讓資源少的節點完成和其他節點同等的任務,拖慢了整體工作的完成進度.
4.3.2 吞吐量對比分析吞吐量(Throughput)指單位時間內由計算引擎成功處理的數據量,反映系統的負載能力.吞吐量常用于資源規劃,也能協助分析系統性能瓶頸,從而進行相應的資源調整以保證系統達到用戶要求的處理能力.

圖6 Flink 運行時間對比(自定義數據集)Fig.6 Flink runtime on custom dataset

圖7 Flink 運行時間對比(TPC-C 數據集)Fig.7 Flink runtime on TPC-C dataset
實驗使用阿里巴巴提供的advertising 工具,它是標準流測試工具yahoo stream benchmark 的簡化版.測試原理是隨機產生兩個廣告流,把ad_id 相同的join 起來存放到Redis 里,通過單位時間內在Redis 讀到多少條數據來計算吞吐量.計算如式(8)所示:

其中,currentNum代表當前讀到的數據編號,即已經讀到多少條數據;lastNum表示前一次讀到的數據編號;currentTime表示當前時間,lastTime表示上一次的時間.
從實驗結果(圖8)可以看出,改進之后的Flink 自適應負載均衡算法比默認任務調度算法的吞吐量更高,原因同前,本文算法能使負載均衡,即資源多的節點負載較多的任務,則整個集群在單位時間內能完成更多的任務,吞吐量增大.

圖8 Flink 不同并行度下的吞吐量對比Fig.8 Throughput of Flink with different parallelism
4.3.3 延時對比分析延時(latency)指數據從進入數據窗口的時間到真正被處理的時間間隔,單位為毫秒(ms),反映系統處理的實時性.金融交易分析等大量實時計算業務對延遲要求較高,因為延時越小,數據實時性越強.
實驗使用阿里巴巴提供的advertising 工具,原理是隨機產生兩個廣告流,把ad_id 相同的join起來存放到Redis 里.計算如式(9)所示:

其中,handleTime表示某條記錄實際被處理的時間,windowTime表示流里面該記錄屬于的時間窗口開始時間.
從實驗結果(圖9)可以看出,本文改進之后的Flink 自適應負載均衡算法比默認的調度算法延時更小,原因亦同前,本文算法能夠實現負載均衡,使資源多的節點能負載較多的任務,則每個任務在被處理之前需要等待的時間也相應變短,即延時變小.

圖9 Flink 不同并行度的延時對比Fig.9 Delay of Flink with different parallelism
上述在運行時間、吞吐量、延時三方面的實驗表明:SWPTS 和ACTS 改變了任務的默認分配策略,可以盡量按動態資源的大小將任務優先分配給資源較多的節點,解決異構集群負載不均衡的問題.
本文提出的自適應負載均衡算法由平滑加權輪詢任務調度算法(SWPTS)和基于蟻群算法的任務調度算法(ACTS)組成.經過實驗驗證,在異構Flink 集群的環境下,自適應負載均衡算法的運行時間、吞吐量和延時與默認調度算法相比都有所提升.在運行初期,利用SWPTS 負載均衡,使得任務在初始分配的時候負載均衡.在運行過程中,當集群已使用資源高于設定閾值時,采用ACTS 尋找一種全局最優分配方案,也能重新均衡負載.等已使用資源低于設定閾值時,則繼續采用之前的算法進行調度.