季航旭,姜 蘇,趙宇海,吳 剛,王國仁
(1.東北大學計算機科學與工程學院,遼寧 沈陽 110819;2.北京理工大學計算機學院,北京 100081)
隨著信息技術的快速發展,各個領域積累的數據量日漸增多。數據量的增加以及數據挖掘算法的研究與普及,使得人們越來越重視數據中隱含的價值,因此如何快速地從數據中獲取有價值的信息成為各個研究領域的關注點。為了應對快速增長的數據,人們開發出了一代又一代大數據處理系統并產生了大量相關的優化技術。目前比較流行的大數據處理系統有Hadoop[1]、Storm[2]、Samza[3]、Spark[4,5]和Flink[6]等,它們都采用分布式集群的方式進行平臺的搭建和系統的部署,并有著各自獨特的優勢。
目前,大數據計算系統已經普及,基于它們的數據查詢和數據分析等任務也日益復雜化、多樣化,如實時智能推薦、復雜事件處理等。分布式計算系統經常面臨的挑戰是資源分配與作業調度,這是分布式環境與生俱來的問題。由于分布式環境存在計算資源異構、帶寬異構和單個作業內部計算方式復雜等情況,作業執行過程中經常出現由于資源分配不合理、調度優化不足導致的效率低、吞吐量低等缺點。更加令人堪憂的是,分布式計算具有計算結點規模大、計算任務復雜等特點,計算引擎往往要同時運行復雜繁多的分布式多作業,也就是所謂的分布式多作業。分布式多作業相比單作業在作業執行過程中將更加不利于計算資源的充分利用,這對于分布式大數據任務的執行將更加雪上加霜。目前,仍然沒有一個完美的資源分配與管理機制滿足分布式多作業的需求,因此資源的合理分配與回收仍然是提升大數據處理系統計算性能的關鍵。
現在最常用的大數據計算系統(如Flink、Spark)都是以多層執行圖(Graph)的方式表示作業的具體信息與執行過程。多層執行圖是計算系統在作業提交與作業執行之間生成的一系列有向無環圖DAG(Directed Acyclic Graph),也是計算引擎中最核心的數據結構,它們決定了分布式作業在每個節點上的資源部署。也就是說,分布式任務的執行都是根據執行圖中的信息在每個節點上進行任務部署。因此,如何在多作業執行過程中使DAG的合并達到最優,以及如何優化作業的提交順序與調度策略,將是高效執行多作業的重要保障。
本文通過對主流的大數據處理系統的研究和探索,結合目前流行的大數據處理系統優化技術,提出并實現了在作業層面上的多作業合并算法與調度策略。本文的主要貢獻點在于:
(1)提出了一種啟發式作業合并算法。通過采集到的作業特征,以作業并行度為基礎分析DAG結構上的差異性,合并浪費資源的作業,釋放占用資源較少的作業資源,提高集群資源的利用率。
(2)提出了一種基于負載均衡的多作業調度算法。根據基于作業特征的多路K-means聚類算法的分析結果使用基于負載均衡的多作業自平衡輪詢調度算法提交作業,進一步達到系統負載均衡。
(3)使用目前最新一代大數據計算系統Flink對本文提出的作業合并算法與多作業調度算法的有效性進行了驗證。結果表明,2種作業合并算法都可以減少作業的運行時間,提高系統吞吐量;基于負載均衡的多作業調度算法在最好情況下可減少40%的線程啟動數。
DAG是分布式計算領域中很常見的一種數據結構,通常由一系列用戶自定義的算子組成,在各種大數據處理系統中都能發現它的身影,比如Storm、Spark和Flink等。DAG計算將計算任務分解成為若干個子任務[7],并將這些子任務之間的邏輯關系或順序構建成DAG結構。大數據計算引擎中的DAG計算通??梢猿橄鬄?層結構:應用表達層、執行引擎層和物理執行層。應用表達層位于最上層,定義相關算子和轉換,將計算任務分解成由若干子任務形成的DAG結構,其優點是表達的便捷性,便于開發者快速描述或構建大數據應用。執行引擎層介于應用表達層和物理執行層之間,將應用表達層構建的DAG計劃任務通過轉換和映射,部署到下層的物理機集群中運行,任務的調度[8]、底層的容錯恢復機制、數據與集群信息的傳遞等都要依賴執行引擎層。下層是物理執行層,即物理集群。
Flink是Apache 開發的一個同時用于處理批數據和流數據的有狀態的計算框架和分布式處理引擎。Flink使用4層DAG結構來描述和表達作業的執行流程,每一層都對作業執行流程做了不同程度的封裝、優化和相關屬性的配置。DAG結構是Flink作業執行和部署的核心,主要包含數據流圖(StreamGraph)、作業圖(JobGraph)、執行圖(ExecutionGraph)和物理執行圖,Flink正是通過這4層圖結構把整個作業的優化、資源分配和算子部署進行分離。Flink的4層DAG結構如圖1所示。

Figure 1 Four-layer DAG structure of Flink
圖1中,數據流圖是用戶通過API接口編寫的、用來表達用戶所要執行的計劃任務的邏輯結構。作業圖是在數據流圖的基礎上進行優化以及調整各種參數配置后的數據結構,它裹挾著作業運行期間所需的必要信息。這些信息被客戶端提交到集群中的協調中心,即作業管理器(JobManager)。執行圖可以被視作并行化的作業圖,當接收到一個新的作業圖時,會把其中的每一個算子按照其并行度轉化成多個可實際部署的子任務(在執行圖中以Execution表示)。當執行圖中的一系列子任務真正在從結點機器上運行的時候,才會構成物理執行圖。
目前最流行的大數據處理平臺默認情況下都以FIFO的形式調度作業。Wang等[9]為了解決在虛擬化云環境中同時運行的多個作業之間的干擾問題,開發了數據驅動分析模型,估計多個作業之間的干擾對作業執行時間的影響,并為此提出了一種干擾感知作業調度算法。黃廷輝等[10]通過對分布式系統關鍵技術的分析,得出I/O和CPU的不匹配是影響計算性能的一個重要因素,提出合并文件的運行方式,可以減少緩存文件的數量,提高I/O效率,不過仍存在內存成本高的弊端。
Flink系統中資源是按處理槽(Slot)進行劃分的,支持多種已有的成熟的資源管理器,例如Yarn和Mesos等。Storm作為曾經最流行的流式大數據處理系統,默認是采用輪詢的調度方式管理作業的[11]。Qian等[12]為了解決Storm集群中擴展更多新計算機時帶來的負載不均衡問題,設計了S-Storm,為負載均衡群集中均勻分配Slot??傊?,目前的分布式作業調度算法和資源分配算法都是基于作業對資源的需求或者集群中結點資源的使用情況,進行作業的調度和資源的分配的,它們面向的是單個作業,并沒有考慮作業間的關系對集群性能的影響。
一個復雜的DAG通常由多種類型的算子組成,有些算子只涉及本地運算,它們以內存共享的方式傳輸數據,運行效率高,給系統增加的負載小。也有些算子會通過網絡協議棧傳輸數據,除了網絡本身的不可靠性會增加延遲,還有網絡緩沖數據、序列化、反序列化和用戶態/內核態之間的切換等耗時操作持續地占用系統資源。為了便于描述,本文定義了全局算子和本地算子這2個概念。
定義1(全局算子) 全局算子指在分布式集群中,需要從其他結點獲取數據進行處理的算子,如Join和Reduce等。
定義2(本地算子) 本地算子指在分布式集群環境中,不需要從其他結點獲取數據,只對本地數據進行處理的算子,如Filter、Map和FlatMap等。
本文在研究作業合并和作業調度時需要提取DAG的相關特征量,計算作業之間的差異性并通過聚類算法對作業進行區分。聚類算法是一種經典的群分析方法[13],它以數據間距度量數據相似性[14],把相似的數據集中到一起,是一種發現數據集內部結構特征的無監督學習算法[15]。聚類算法按聚類思想可以分為:劃分法聚類、密度法聚類[16]、圖論聚類法[17]和網格法聚類等。
本文采用的K-means算法是一種典型的劃分聚類法,其思想是預先指定聚類數目和聚類中心,計算點與點之間的距離,把每一個點歸類到與其距離最近的聚類中心。距離的度量方式很多,本文使用歐氏距離(式(1))和曼哈頓距離(式(2))相結合的方式度量作業之間的距離,其中n為樣本點維度。
(1)
(2)
歐氏距離從幾何空間的角度衡量元素間的距離,常用于測量度量標準一樣的數據間的距離;曼哈頓距離用來計算數據在多維屬性上的差之和,可以減弱離群數據帶來的影響。
本節詳細介紹基于啟發的作業合并算法。首先對作業進行分析,解析作業的DAG圖,以及作業任務量與作業分配到的內存資源之間的關系;然后分別采用基于并行度的作業合并算法和基于DAG結構差異性的作業合并算法,對占用系統內存資源較多的作業進行合并,從而提高系統的吞吐量。
本文采用廣度優先遍歷的方式提取作業執行圖中相關的信息,一個典型的作業執行圖如圖2所示,主要包含以下信息:數據源文件路徑、作業并行度和算子總數等。

Figure 2 Job execution graph
處理的數據量和作業分配到的內存資源需要通過計算獲得。算法根據文件路徑信息訪問文件大小,從系統配置文件中讀取為Slot分配的內存大小。作業的分類貫穿于信息采集過程,算法根據數據來源、文件大小、作業分配到的內存資源大小和作業的執行邏輯將作業分為可合并型作業與不可合并型作業。在作業執行流的遍歷過程中,算法以矩陣結構存儲頂點間的連接信息,元素值的大小表示算子間的連接數。表1是對圖2的信息提取。

Table 1 Statistics of the number of connections between operators
并行度決定了作業在執行時所占集群內存資源的總量,且和集群中的Slot是對應的,意味著并行度相同的作業將分配到相同大小的內存資源。因此,對于沒有充分占用內存資源的作業,合并并行度相同的作業,可使2個作業共用1個作業的內存資源,同時不會對作業執行邏輯造成影響。
影響作業執行的因素有很多,定義3~定義5的3個度量:任務量大小比值(F)、DAG最大深度比值(D)和DAG全局算子數比值(G),決定作業的特征。
定義3(任務量大小比值(F)) 任務量大小比值是表示2個作業處理任務量大小差異性的重要指標之一,其計算如式(3)所示:
(3)
其中,x和y分別表示2個作業所處理的數據集數量,wf_mi、wf_mj分別表示2個不同作業處理的文件集合中單個文件的大小。通過實驗得知,F的閾值取值為[0.5,2]。
定義4(DAG最大深度比值(D)) 表示2個作業的執行圖中最長算子鏈長度的比值,它是反映2個作業DAG差異性最明顯的指標,其計算如式(4)所示:
(4)
其中,dept_m和dept_n分別表示2個作業執行圖的最大深度。DAG深度越大的作業執行時間越長,因此合并后的作業在數據量相當的情況下,其執行時間取決于合并前DAG深度較大的作業。D的閾值取值為[0.5,2]。
定義5(DAG全局算子數比值(G)) 表示2個作業圖在全局算子數量上的差異。全局算子和數據傳輸緊密相關,是影響作業執行速度的重要指標之一,體現2個作業在傳輸上的差異。其計算如式(5)所示:
(5)
其中,G表示2個并行度相同的作業的全局算子數的比值,gol_m和gol_n分別表示2個作業中全局算子的個數。DAG中全局算子的個數越多,執行時間越長。通過實驗得知,G的閾值取值為[0.5,2]。
基于并行度的作業合并算法執行過程如算法1所示。
算法1基于并行度的作業合并算法
輸入:待合并作業j;不包含j的待合并作業集合Jobs。
輸出:合并后的作業mergeJob。
1.forjobinJobsdo
2.ifjob.parallelism==j.parallelism
3.計算j與job任務量比值F;
4.ifF∈[0.5,2]do
5. 計算j與job的DAG圖最大深度比值D;
6.ifD∈[0.5,2]do
7. 計算j與job的全局算子的比值G;
8.endif
9.ifG∈[0.5,2]do
10.mergeJob=merge(j,job);
11.removejobfromJobs,returnmergeJob;
12.endif
13.endif
14.endif
15.endfor
(1)首先從待合并作業緩沖池的作業集中取出一個作業j,然后遍歷Jobs,從中取出一個與j并行度相同的作業job。
(2)使用3個度量值衡量作業job與j的匹配程度,如果job與j在上述3個比值上都能落到對應的閾值空間,兩者匹配,調用merge函數合并job與j,返回合并后的結果,終止循環;否則繼續循環。
(3)循環結束后,檢查mergeJob的值是否為空,如果mergeJob的值為空,說明Jobs中沒有與j并行度相同并且符合3個條件的job,那么j會轉而參與基于DAG圖結構差異性的作業合并計算。
對于作業緩沖池中剩余的由于F、D、G取值落在閾值空間以外而無法合并的作業,采用基于DAG結構差異性的作業合并算法處理。
算法以DAG結構差異性為切入點,Slot只隔離內存資源,因此為了避免作業對CPU資源的爭搶,盡量選擇異構程度高的作業進行合并。算法增加2個度量為基于DAG結構差異性的作業合并算法提供支持。
定義6(作業并行度比值(P)) 作業并行度是作業最明顯的特征之一,并行度比值是衡量2個作業在并行度上的差異最明顯的指標。其計算如式(6)所示:
(6)
其中,P表示2個作業并行度的比值,parallelism_m和parallelism_n表示2個作業的并行度。并行度是對應于集群中的Slot數量,因此基于DAG的作業合并算法在合并作業時首先需要考慮的就是作業并行度。通過實驗得知,P的閾值取值為[0.5,2]。
定義7(DAG結構相似性(S)) DAG結構相似性反映2個作業在執行邏輯上的差異,以歐氏距離為基礎定義了DAG結構相似性,其計算如式(7)所示:
(7)
其中,o表示算子的數量。
在特征提取過程中使用矩陣保存作業執行流程圖的基本信息,M和N分別表示存儲作業執行流程圖基本信息的矩陣,Mij和Nij分別表示矩陣中的元素。算法執行過程如算法2所示。
算法2基于DAG結構差異性的作業合并算法
輸入:待合并作業j;不包含j的待合并作業集合Jobs。
輸出:合并后的作業mergeJob。
1.按并行度大小給Jobs中的作業從小到大排序
2.中間作業集合為midJobs;
3.forjobinJobsdo
4. 計算j與job任務量比值F;
5.ifF∈[0.5,2]do
6. 計算j與job的DAG圖最大深度比值D
7.ifD∈[0.5,2]do
8. 計算j與job的全局算子的比值G
9.ifG∈[0.5,2]do
10. 計算j與job并行度比值P
11.ifP∈[0.5,2]do
12. addjobtomidJobs
13.endif
14.endif
15.endif
16.endif
17.endfor
18.forjobinmidJobs
19.計算j與jobDAG圖矩陣間的歐氏距離U;
20.更新U獲取最小值,并記錄相應的job;
21.endfor
22.mergeJob=merge(j,job)
23.returnmergeJob
(1)從待合并作業中取出一個作業j,然后遍歷Jobs,獲取一個與j并行度相同的作業job;
(2)在循環中使用4個度量值衡量作業job與作業j的匹配程度,如果符合對應的閾值空間,則把作業job加入到中間作業集midJobs中;
(3)遍歷中間作業集合midJobs,使用歐氏距離從中間數據集合中選出與作業j在歐氏距離上相似度最小的作業job,合并作業j與job并返回結果。
除了作業合并之外,作業的執行順序與調度策略也是影響多作業執行效率的重要因素。因此,本文提出基于負載均衡的多作業調度算法,其由3部分組成:
(1)預處理模塊:進行相關特征的提取工作,包括作業并行度、算子個數和算子類型等;(2)分類模塊:采用K-means聚類算法根據提取的特征信息對作業進行聚類分析,聚類算法在負載均衡方面應用廣泛[18,19],經過聚類把作業分成3個類別:大作業、中等作業和小作業;(3)調度模塊:調度模塊根據聚類結果,使用自平衡輪詢調度算法計算作業的提交順序,同時充分利用集群的Slot資源,防止Slot閑置。
基于負載均衡的多作業調度算法主要使用作業并行度、算子總數、各類型算子個數和作業圖深度為參考,通過遍歷對信息進行采集。該算法執行過程如算法3所示。
算法3DAG特征提取算法
輸入:作業DAG結構圖Plan。
輸出:提取到的信息集合infoList。
1.fornodeinPlando
2.max=Math.max(max,BFS(node));
3.ifnodeis not visited
4. add node’s characters toinfoList,node.visited=true;
5.node相連接的未被訪問的頂點入隊列Q;
6.whileQis not emptydo
7.v=Q頭元素出隊列;
8.addv’s characters toinfoList,v.visited= true;
9.v相連接的未被訪問的頂點入隊列Q;
10.endwhile
11.endif
12.infoList.max=max
13.endfor
14.returninfoList
(1)使用深度優先搜索DFS(Depth First Search)計算從Sink算子到距離最遠的Source算子的距離,并記錄在max中;如果node頂點未被訪問過,將頂點信息存入infoList中。
(2)將與node頂點相連的頂點加入隊列Q,如果Q不為空,從Q中取出一個頂點v,將v的信息記錄到infoList中,與v相連的未訪問過的頂點加入隊列。
(3)更新infoList中的DAG深度,for循環直到遍歷完Plan中的所有頂點,返回infoList。
聚類分析模塊將根據特征信息對作業進行分類,使用4種數據度量作業之間的相似性,分別是作業并行度、各類算子個數、作業執行流程圖深度和全局算子的個數。算法采用歐氏距離與曼哈頓距離相結合的方式測量作業間的距離。ope[i]是以數組的形式存儲,dept、全局算子ops的大小是衡量作業流程復雜性的度量標準。
定義8(作業在不同算子類型上的差異性) 算子及算子類型最能區分作業的不同,算子類型的差異性反映了作業的總體差異性,其計算如式(8)所示:
(8)
其中,mope[i]與nope[i]分別為作業m與作業n的不同類型的算子的個數。
定義9(作業在DAG結構深度上的差異性) DAG結構深度是作業最明顯的特征之一,它描述了作業運行時數據流通的最大路徑,其計算如式(9)所示:
distancedept(m,n)=|mdept-ndept|
(9)
其中,mdept與ndept分別為作業m與作業n的DAG結構深度。
定義10(作業在Task線程數上的差異性) 作業在集群中開啟的線程數直接反映作業對系統CPU資源的占用量,作業在Task線程數上的差異性計算如式(10)所示:
distancetasknum(m,n)=|mpara*mops-npara*nops|
(10)
其中,mpara與npara分別為作業m與作業n的并行度,mops與nops分別為作業m與作業n的全局算子數量。
定義11(作業的差異性) 本文從3個角度衡量了作業之間的差異性,其計算如式(11)所示:
distance(m,n)=distanceope(m,n)+distancedept(m,n)+distancetasknum(m,n)
(11)
本文提出的基于作業特征的多路K-means聚類分析算法如算法4所示。
算法4基于作業特征的多路K-means聚類分析算法
輸入:作業及其特征集合PlanList。
輸出:聚類結果ClusterResult。
1. 根據并行度乘以算子總數的大小對PlanList進行排序;
2. 獲取初始聚類中心點集合;
3.fori=1 to 3do
4.center_i=K_means(PlanList,center_i);
5.endfor
6.fori=1 to 3do
7. 計算每個聚類中心點將PlanList劃分的程度;
8.endfor
9.center=K_means(PlanList,center);
10.根據center將PlanList分組并放入ClusterResult中
11.returnClusterResult
(1)對作業及其特征集合PlanList按并行度乘以算子總數大小進行排序。
(2)從排好序的PlanList中選擇3個作業作為聚類中心;以排好序的PlanList的隊列頭作業、隊列尾作業和中間作業作為聚類中心;從排好序的PlanLsit中分別取隊列頭3個作業、隊列中間3個作業、隊列尾部3個作業,取其平均值作為聚類中心。
(3)調用K_means算法循環更新每個聚類中心的值;計算每個聚類中心將PlanList劃分的程度,劃分程度度量標準為,聚類結果每類作業的數量越平均越好。選取聚類結果好的2個聚類中心取其平均值,調用K_means算法進行最后聚類;計算聚類結果,并輸出結果。
通過多路聚類的方式優化了聚類中心點的選取,通過基于作業特征的多路K-means聚類分析可以把作業集合根據聚類中心點聚集成3個作業類別,為算法提供可靠的支持。
本文以輪詢調度法[20 - 23]為基礎實現了多作業的提交優化,目的是在不浪費集群Slot資源的情況下,使集群開啟的Task線程數量保持平穩,以此達到在多作業情況下平衡集群性能的目的。集群中作業工作的線程數量是由作業并行度和算子個數決定的,因此控制作業的提交順序,可以達到控制集群開啟的Task線程數量的目的。作業能否提交成功取決于集群剩余并行度是否滿足作業的并行度需求,如果作業的并行度比集群中可用并行度大,作業就會被拒絕,因此輪詢的作業提交方式并不會嚴格執行,而且集群空閑的Slot資源會隨著作業的提交和結束動態地變化。針對這種情況本文設計了自平衡的輪詢調度算法,如算法5所示。
算法5基于負載均衡的多作業自平衡輪詢調度算法
輸入:聚類結果ClusterResult;最后的聚類中心center。
輸出:下一個提交的作業Job。
1. 對K-means聚類結果收集排序;
2. 平分排好序的作業到3個隊列中,并設置指針p;
3.翻轉隊列midQueue、minQueue,查詢集群剩余Slot;
4.ifslotNum> 0do
5.ifjobNum> 0do
6.pre=p;queue=Queue[p];
7.whilequeueis not emptydo
8.max= 0;
9.foreleminqueuedo
10.ifelem.parallelism≤slotNumdo
11.ifmax 12.job=elem;max=elem.parallelism; 13.endif 14.endif 15.endfor 16.endwhile 17.ifmax!= 0do 18.p=(p++)%3; 19.endif 20.ifmax== 0do 21.p=(p++)%3; 22.ifp==predo返回 4; 23.endif 24. 返回 7; 25.endif 26.endif 27.endif (1)對K-means聚類產生的3個集合中的元素按元素距離聚類中心點的距離大小進行排序;比較3個聚類中心點的大小,按聚類中心點的大小,從大到小合并3個排好序的作業集合。 (2)將合并后的集合平均分成3份,并放入3個隊列中,將midQueue和minQueue隊列進行逆轉。 (3)每隔5 s查詢一次集群剩余Slot資源,從指針指向的隊列開始,遍歷隊列中的元素找到集群中空閑Slot資源能滿足的最大并行度的作業提交。每次提交作業后,修改指針指向下一隊列。 (4)對3個集合進行判斷,如果出現隊列為空,并且總作業的數量大于2,按順序收集3個集合中的隊列,再平分所有的作業到3個集合中,并更改指針使其指向midQueue,否則不再進行作業收集。 本文使用2種類型的作業來進行對比實驗,一種是單詞統計(WordCount),另一種是表連接(Join)。因為全局算子中最復雜的算子就是Join類型算子,其他簡單類型的算子使用最多的是Filter、Map和FlatMap,因此WordCount作業和Join作業足以覆蓋實際應用中的大部分場景。 本文實驗采用大數據測試基準TPC-H生成的數據集,是事務性能管理委員會TPC(Transaction Processing Performance Council)發布的權威數據庫評測基準,可以保證生成的模擬數據具有真實性、客觀性與健壯性。在WordCount實驗中本文選用5個基本的大數據集來模擬批處理環境中的大規模數據處理。在表連接實驗中,本文選取TPC-H生成的Lineitem表和Orders表作為數據源,其中Lineitem有16個字段,前3個字段Orderkey、Partkey和Suppkey是主鍵。Orders表有9個字段,前2個字段Orderkey和Custkey是主鍵。 實驗的評估指標有3個: (1)作業運行時間:在相同硬件條件下,任務量相同、處理邏輯相同的作業處理速度越快,表明系統性能越好。 (2)作業吞吐量:單位時間內集群處理的平均數據量大小,即被處理的總數量(totalDataVolume)與運行總時間(totalProcessTime)的比值,其定義如式(12)所示: (12) (3)集群開啟的最高Task線程數:本文提出的基于負載均衡的多作業調度算法以降低集群同一時刻開啟的最高Task線程數為首要目標。 本文所描述的相關技術細節均在Flink 1.8.0版本中進行實現,實驗運行的軟硬件環境如下所示: (1)硬件環境:采用的分布式環境由4臺服務器組成,1臺主結點,3臺從結點,結點之間通過千兆以太網連接。配置為:CPU:Intel Xeon E5-2603 V4 *2,核心數目:6核心;內存:128 GB(從結點64 GB);硬盤:400 GB SSD。 (2)軟件環境:操作系統:CentOs 7;Flink版本:1.8.0,JDK版本:1.8.0;存儲環境:Hadoop 2.7.5。 (1)基于并行度的作業合并算法實驗。 作業合并算法實驗對一對相同的WordCount作業和一對不同的Join作業分別進行順序執行和合并執行。表2展示了作業的基本信息。 Table 2 Job sets information for experiment 1 圖3對比了2個WordCount作業在相同實驗環境、相同數據集上順序執行和合并執行的執行結果。其中圖3a對比了執行時間,合并執行的執行時間減少了5%~23%。在內存資源足夠使用的前提下,單個WordCount程序對集群CPU的利用沒有達到時刻滿負荷運行的狀態,所以作業合并不僅能提高集群的內存資源利用,也能提升集群CPU資源的利用。圖3b對比了吞吐量,采用了作業合并算法后系統可以更快地到達吞吐量峰值。 Figure 3 Results of WordCount job merging based on the number of parallelism 圖4對比了Join1和Join2作業在相同實驗環境、相同數據集上順序執行和合并執行的執行結果。其中圖4a對比了運行時間,圖4b對比了系統吞吐量。盡管效果不如WordCount作業明顯,但基于并行度的作業合并算法對運行時間仍有一定縮減,吞吐量提升效果在4.5%~20%。 Figure 4 Results of Join job merging based on the number of parallelism (2)基于DAG結構差異的作業合并算法實驗。 實驗先后提交了2個并行度不同的WordCount作業和Join作業,來模擬基于DAG結構差異性的作業合并。 圖5和圖6從運行時間和吞吐量2個方面展示了作業合并算法的提升效果。合并執行的執行時間明顯低于順序執行的總時間,并且差距明顯,因為本實驗不是在滿并行度的條件下進行的,實際執行時可能會出現不同情況,對于WordCount作業,基于DAG結構差異性的作業合并算法具有明顯的優勢。 Figure 5 Results of WordCount job merging based on DAG structure difference Figure 6 Results of Join job merging based on DAG structure difference (3)基于負載均衡的多作業調度算法實驗 對于多作業調度算法,實驗以4個作業為基礎,表3列出了作業算子的基本信息,這些作業特征信息是衡量作業之間差異性的關鍵。實驗模擬了7個任務量大小不同的作業,采用隨機的方式模擬了作業的提交順序,將其執行結果與多作業調度算法的結果進行比較。表4展示了作業對應的編號以及其處理任務量信息,表5展示了優化前后作業的提交順序。 Table 3 Job sets information for experiment 3 Table 4 Job number and corresponding processing task volume Table 5 Order of job submission 圖7展示了基于負載均衡的多作業調度算法的提升效果。從圖7a可以看出,通過調優作業的提交順序可縮短作業處理的時間,但存在某些按FIFO提交模式的順序比優化后的輪詢提交順序要好,該情況的出現是因為算法在執行過程中并未考慮到任務量的大小。從圖7b可以看出,基于負載均衡的多作業調度算法在吞吐量性能上提升了5%左右。圖7c 展示了集群開啟的Task線程數對比,基于負載均衡的多作業調度算法執行作業集時,集群開啟的最大線程數在多數情況下有所減少,最好情況下減少了40%的線程。 Figure 7 Running results of multi-job scheduling algorithm based on load balancing 本文通過分析作業與系統資源之間的關系,以及作業與作業之間的關系,提出并實現了提高集群資源利用率和負載均衡能力的算法,本文提出的優化主要包含2個方面: (1)提出了啟發式的作業合并算法,通過分析作業任務量和作業分配到的集群資源之間的關系,合并對集群資源利用率低的作業,使它們共用同一個作業的資源。該算法解決了集群部分作業資源利用率低的問題,并通過實驗驗證了該算法在不同類型作業上對集群性能提升的有效性。 (2)提出了基于負載均衡的多作業調度算法,首先對作業進行特征提??;然后通過多路K-means聚類算法將作業分為3類:大作業、中等作業和小作業;之后采用自平衡輪詢調度算法提交分類好的作業,保證大作業不會在集群中集中執行,降低了集群由于開啟過多線程造成集群性能下降的概率,并通過實驗驗證了該算法的有效性。 分布式系統在多作業執行層面還有許多需要優化和提高的部分,未來可繼續研究的問題有: (1)動態調度。目前的分布式大數據處理系統未能做到在作業執行過程中動態地調整作業的執行流程,這種方式不利于資源的動態回收與共享。針對這一問題,系統需要做出相應的優化和改進。 (2)優化多作業并行度。并行度是作業執行的關鍵,目前在分布式大數據處理平臺的應用中,一般都是從業人員根據數據與業務特性手動優化并行度,這樣就給并行度的優化帶來了很多困難。因此,研究和設計出一套并行度設置的優化方案,也是分布式大數據系統應用方面的一個研究課題。6 實驗
6.1 數據集與評估指標
6.2 實驗環境設置
6.3 實驗結果與分析









7 結束語