王中華,柴小麗
(中國電子科技集團公司第三十二研究所,上海 201808)
隨著信息科學和云計算技術的飛速發展,智能設備的持續普及,存儲設備性能的提升和網絡帶寬的增長為大數據的存儲和流通提供了物質基礎,云計算技術通過將分散的數據集中在數據中心,從而可以更為集中有效地處理和分析大數據信息.云計算技術為海量數據存儲和分散的用戶訪問提供了必要的空間和途徑[1].大數據主要有4 種計算模式,分別是批量計算,流式計算,圖計算和交互計算.其中,適用于大數據分析的計算模式主要是批量計算和流式計算這兩種,并且由于批量計算和流式計算針對的數據流類型不同,所適用的大數據應用場景也不一樣[2].
批量計算會將先將數據信息統一收集起來,然后把大量的數據信息存儲到本地或云端地數據庫中,最后對數據進行批量的處理.由此可見,批處理適用的數據一般是靜態數據,即保存在本地或云端數據庫中的信息,任務可一次性完成.因此,批量計算一般應用在實時性要求不高,離線計算的場景下,進行數據分析或實現離線報表等[3–5].
數據流是一組有序的,有起點和終點的字節的數據序列,一般包含輸入流和輸出流,在實時通信領域中,數據的價值與時間成反比,即處理數據的時間越短,數據的價值越大.因此,必須對實時的數據信息給出毫秒級的響應.流式計算就是應用在實時場景下,或對時效性要求比較高的場景,如實時推薦,業務監控[6–9].
Hadoop是一個由Apache 基金會開發的分布式系統基礎架構,實現了一個分布式文件系統HDFS,提供高吞吐量來訪問應用程序的數據,適合有著超大數據集的應用程序,HDFS為海量數據提供存儲,MapReduce為海量數據提供計算[10].Apache 開發Flink 開源流處理框架,核心是用Java和Scala 編寫的分布式數據流引擎,Flink 通過支持數據并行和流水線方式,可執行任意流數據程序[11],Flink的流水線運行方式支持系統執行批處理和流處理程序,同時支持迭代算法的執行.Twitter 開發了Storm 框架提供分布式的,高容錯的實時計算系統,支持多語言:Java,Python,Ruby 等[12],可實現亞秒級的低延遲.Storm 提供較高的可靠性,所有信息都可保證至少處理一次,確保不會丟失信息.
由于在項目研發過程中,需要8 臺機器對視頻流執行解碼任務,另外8 臺執行目標識別的推理任務,并且由于兩組機器都缺乏硬件條件完成其他任務.傳統的平均分配和單機任務指定策略不能滿足項目需求,本文通過設計新調度算法來實現對兩組機器的任務分配.
如圖1所示,Storm 集群采用主從式架構,集群中的節點主要分為以下4 類[13]:
主節點(Master node):通過運行Storm nimbus 命令啟動Storm的主節點,nimbus是Storm 系統的主控節點,主要用于向集群中提交作業,通過讀取ZooKeeper的工作信息分配集群的任務,以及監控整個集群狀態(有進程級的也有線程級別的).
工作節點(Worker node):通過運行Storm supervisor命令啟動Storm的工作節點.通過設定的端口與Zoo-Keeper 進行信息交互,讀取主控節點分配的任務信息,下載作業副本,管理屬于自己的Worker 進程,如啟動,暫停或撤銷任務的工作進程及其線程.一個工作進程中可運行多個線程,每個線程中又可運行多個任務(task).
控制臺節點(Web console node):通過運行Storm ui 命令啟動用戶界面服務節點,默認的服務端口號為8080,可在storm.yaml 中設定ui.port 進行修改.可以在ui 界面上查看已提交的作業狀態,包括集群的整體狀態,已使用的節點數,每個節點的運行情況和作業的執行狀態,支持手動停止正在執行的作業.
協調節點:通過運行ZooKeeper server start 命令啟動ZooKeeper 進程的節點,實現numbus和supervisor之間的協調管理,包含分布式狀態維護和分布式配置管理等.

圖1 Storm 模型架構
Storm 使用ZooKeeper 來保證集群的一致性.Storm的所有的狀態信息都是保存在ZooKeeper 里面,nimbus通過在ZooKeeper 上面寫狀態信息來分配任務[14].
Supervisor,task 通過從ZooKeeper 中讀狀態來領取任務,同時supervisor,task 也會定期發送心跳信息到ZooKeeper,使得nimbus 可以監控整個Storm 集群的狀態,從而可以重啟一些停止的task.
在每臺機器上設置myid 文件來分配機器在集群中的id,如圖2所示,51 表示式第幾臺服務器,10.0.0.1表示服務器的IP 地址,2888:3888 表示服務器中與集群中leader 交換信息的端口.

圖2 拓撲結構
Topology是Storm 中運行的一個實時應用程序,因為各個組件間的消息流動形成邏輯上的一個拓撲結構[15],如圖2所示.主要由以下幾部分組成:
Spout:在一個topology 中產生源數據流的組件.通常情況下spout 會從外部數據源中讀取數據,然后轉換為topology 內部的源數據.Spout是一個主動的角色,其接口中有個nextTuple()函數,Storm 框架會不停地調用此函數,源源不斷地發送數據.Spout 另一個重要的方法時ack和fail,Storm 監控到tuple 從spout 發送到toplogy 成功完成或失敗時調用ack和fail,保證數據的可靠性.
Bolt:在一個topology 中接受數據然后執行處理的組件.Bolt 可以執行過濾,函數操作,合并,寫數據庫等任何操作.Bolt是一個被動的角色,其接口中有個execute(Tuple input)函數,在接受到消息后會調用此函數,用戶可以在其中執行自己想要的操作[16].
Tuple:Storm spout,bolt 組件消息傳遞的基本單元(數據模型),Tuple是包含名稱的列表,Storm 支持所有原生類型,字節數組為Tuple 字段傳遞,如果要傳遞自定義對象,需要實現接口serializer[17].
Stream:源源不斷傳遞的Tuple 就組成了stream.
Storm 集群默認的調度器是EventScheduler[18–21],采用輪詢策略來搜索集群中所有拓撲結構的工作節點,將資源較為均勻的分配給任務進程.具體分配流程如下:
先由nimbus 來計算拓撲的工作量,及計算多少個task,task的數目是指spout和bolt的并發度的分別的和nimbus 會把計算好的工作分配給supervisor 去做,工作分配的單位是task,即把計算好的一堆task 分配給supervisor 去做,即將task-id 映射到supervisor-id+port 上去,具體分配算法如算法1.

算法1.傳統任務調度算法1)從ZooKeeper 上獲得已有的assignment(新提交的topology為空).2)查找所有可用的slot,slot 就是可用的worker,在所有supervisor上配置的多個worker的端口.3)將任務均勻地分配給可用的worker,supervisor 會根據nimbus 分配給他的任務信息來讓自己的worker 做具體的工作,worker 會到ZooKeeper 上去查找給他分配了哪些task,并且根據這些task-id 來找到相應的spout/bolt,它還需要計算出這些spout/bolt 會給哪些task 發送消息,然后建立與這些task的連接,然后在需要發消息的時候就可以給相應的task 發消息.
而在當前項目應用過程中,需要將視頻解碼和目標識別任務分別運行在兩組機器上,并且由于任務的硬件需求,解碼的任務不能在處理目標識別的機器上運行.
為了實現多任務分組調度,實現了算法2.

算法2.多任務分組調度算法1)從ZooKeeper 上獲得已有的assignment(新提交的topology為空).2)在配置集群時為每臺機器設置supervisor 名稱,如下圖所示,通過supervisor.scheduler.meta 設置節點名稱.3)采用循環的方式,通過判斷meta.get(“name”)==”supervisor51”或meta.get(“name”)==”supervisor52”等得到匹配的supervisor 列表.4)通過componentToExecutors.get("decode")獲得解碼任務的線程數.5)通過getAvailableSlots 函數提取上述指定supervisor的所有可用節點.6)構建map<WorkSlot,List<ExecutorDetails>>將可用節點與線程情況相匹配.7)通過cluster.assign 函數將匹配情況提交給集群,集群將按照對應關系分配線程,其余任務采用平均分配,由于已經將解碼機器組的可用節點全部占滿,剩余的推理任務將自動均勻地分配到推理機器組.
實驗共使用16 臺服務器,且均使用Linux 系統,其中8 臺服務器作為視頻解碼組,搭載Arm v8 多核處理器和中科睿芯解碼卡;另外8 臺服務器作為推理組,搭載Arm v8 多核處理器和寒武紀MUL100 加速卡.
Storm 集群由1 個nimbus和16 個supervisor 節點組成(為了實現資源充分使用,其中一個服務器既作為nimbus 用來分發任務和監控集群狀態,也用來處理任務).
拓撲結構:
Spout:用于讀取視頻文件作為輸入,組件命名為filename-reader,共8 個線程;
Bolt1:用于實現視頻解碼任務,組件命名為decode,共23 個線程;
Bolt2:用于實現推理任務(目標識別),組件命名為inference,共32 個線程.
(1)任務分配情況
由表1可知,視頻解碼組同時執行文件讀取和視頻解碼任務,推理組全部執行推理任務(目標識別),具體ui 結果如圖3所示.其中,在視頻解碼組中預留一個節點,用于預防解碼任務時可能出現的節點阻塞.
(2)作業執行情況
由圖4知整體拓撲的數據處理情況,10 分鐘時處理了48 685 條數據,3 小時處理了872 481 條數據.整個拓撲運行24 小時并未發生中斷,可見調度算法的穩定性.

表1 集群任務分配情況

圖3 多任務分組調度結果

圖4 作業執行情況
本文研究了Storm 環境下,多任務在兩組機器上分別運行并存在信息交互的情況,提出了多任務分組調度策略,該機制可以將存在不同需求的兩個任務分別分配到對應的機器群組中,以達到運行和資源分配最優情況.實驗證明,該調度機制可實現視頻解碼和推理任務的分組運行,并且通過持續運行拓撲24 小時,驗證了該調度機制的穩定性.后續的工作將繼續完善該調度機制,當存在3 個或更多任務需要指定機器群組資源運行時,能夠實現多個指定任務分配到指定機器群組中,以實現資源的正確分配與最優分配.