杜政頡, 王 鵬, 黃 焱, 郎福通
(1.成都信息工程學(xué)院軟件工程學(xué)院并行計(jì)算實(shí)驗(yàn)室,四川成都610225;2.中國(guó)科學(xué)院成都計(jì)算機(jī)應(yīng)用研究所,四川 成都 610041;3.中國(guó)科學(xué)院大學(xué),北京 100049)
大數(shù)據(jù)處理分為兩類(lèi)模式,一類(lèi)是批處理模式,數(shù)據(jù)源為靜態(tài);一類(lèi)是流處理模式,數(shù)據(jù)源為動(dòng)態(tài)。批處理模式系統(tǒng)有Hadoop、Spark、Disco、HPCC等,流處理模式系統(tǒng)來(lái)自 Twitter的 Storm和來(lái)自 Yahoo的 S4系統(tǒng)[1]。MapReduce是Hadoop中被廣泛應(yīng)用的一種大數(shù)據(jù)處理模型,側(cè)重于批處理,Topology是來(lái)自于Storm系統(tǒng)中的一種編程模型,側(cè)重于流處理。
MapReduce無(wú)法解決具有迭代結(jié)構(gòu)的應(yīng)用程序,迭代結(jié)構(gòu)程序在實(shí)際應(yīng)用中很普遍,因此,有人基于MapReduce提出一種迭代MapReduce。文獻(xiàn)[2]提出一種名為T(mén)wister的迭代MapReduce處理方案,文獻(xiàn)[3]提出一種名為HaLoop的MapReduce迭代方案。
Storm是一款應(yīng)用于實(shí)時(shí)流處理領(lǐng)域的大數(shù)據(jù)處理工具。在Storm中,Nathan Mar提出一種新的并行編程模型Topology。這種模型改進(jìn)了MapReduce需要存儲(chǔ)中間數(shù)據(jù)這一繁瑣過(guò)程[2],采用類(lèi)似于流水線(xiàn)作業(yè)方式的任務(wù)分解模型,側(cè)重于處理動(dòng)態(tài)數(shù)據(jù)源的任務(wù),實(shí)時(shí)性更強(qiáng)。Storm編程模型與MapReduce一樣,并沒(méi)有考慮這種迭代結(jié)構(gòu)應(yīng)用程序的實(shí)現(xiàn)過(guò)程。對(duì)于這一缺陷,還沒(méi)有人提出一種改進(jìn)方案。因此,文中基于這種Topology編程模型,通過(guò)增加組件Receiver、IBolt、Checker組建迭代Topology,設(shè)計(jì)了一種新的可以解決迭代結(jié)構(gòu)應(yīng)用程序Topology模型,并對(duì)這種模型的新增組件和其對(duì)應(yīng)的API進(jìn)行了介紹和分析,在Storm系統(tǒng)架構(gòu)基礎(chǔ)上設(shè)計(jì)了一種迭代Topology的實(shí)現(xiàn)方案,描述了在這種實(shí)現(xiàn)方式下解決具有迭代結(jié)構(gòu)程序的具體過(guò)程,并使用這種模型實(shí)現(xiàn)了K-Means算法,實(shí)例論證這種迭代模型的可行性。
Storm編程模型原理[4]:一般任務(wù)都可以用流水線(xiàn)作業(yè)方式表現(xiàn)出來(lái),其中的組件就相當(dāng)于流水作業(yè)中的一個(gè)工人,不同的組件負(fù)責(zé)任務(wù)中不同的部分,一個(gè)組件處理完自己的工作即提交給下一個(gè)組件,直至整個(gè)任務(wù)處理完成。整個(gè)任務(wù)實(shí)現(xiàn)的過(guò)程可以用圖1的Topology來(lái)表示。這種模型的一個(gè)突出特點(diǎn):數(shù)據(jù)源可以靜態(tài)可以動(dòng)態(tài),動(dòng)態(tài)環(huán)境中它的表現(xiàn)更能體現(xiàn)出它的優(yōu)勢(shì)。一方面,這種模型采用消息傳遞方式交互數(shù)據(jù),數(shù)據(jù)量相比于從磁盤(pán)獲取要小,動(dòng)態(tài)環(huán)境中,數(shù)據(jù)量動(dòng)態(tài)讀取,每次讀取量小,很好滿(mǎn)足了這種模型的特點(diǎn);另一方面,這種模型是一種實(shí)時(shí)性處理模型,動(dòng)態(tài)環(huán)境中更能夠體現(xiàn)這種特點(diǎn)。所以,這種處理模型側(cè)重于流處理。模型包含兩類(lèi)組件:Spout和Bolt,Spout組件負(fù)責(zé)讀取數(shù)據(jù)源,Bolt組件負(fù)責(zé)實(shí)際的數(shù)據(jù)操作運(yùn)算。組件在實(shí)現(xiàn)中有對(duì)應(yīng)的API,Spout組件API為setSpout(),Bolt組件API為setBolt()。

圖1 Storm Topology
有許多并行算法內(nèi)部都帶有簡(jiǎn)單的迭代結(jié)構(gòu)。這些算法大多分布在數(shù)據(jù)聚類(lèi)、維度縮減、鏈接分析、機(jī)器學(xué)習(xí)和計(jì)算機(jī)視覺(jué)等領(lǐng)域。K-Means、確定性退火聚類(lèi)、PageRank和SMACOF算法就是其中的例子。這種具有迭代結(jié)構(gòu)的算法可以用以下公式來(lái)描述[3]:

其中,R0表示初始化時(shí)的結(jié)果,L表示一種不變的關(guān)系。這種公式表示的程序,只有當(dāng)?shù)竭_(dá)某檢查點(diǎn)時(shí)才將終止運(yùn)行。比如,迭代后的結(jié)果與前面結(jié)果相比已經(jīng)不會(huì)出現(xiàn)變化就可以作為一個(gè)檢查點(diǎn),這也是很多優(yōu)化算法迭代的檢查點(diǎn)。
迭代程序的一個(gè)關(guān)鍵點(diǎn)是結(jié)果與輸入具有相關(guān)性,上面的Topology中輸出結(jié)果無(wú)法再次作為輸入,所以這種迭代部分只能放入一個(gè)組件Bolt作為一個(gè)任務(wù)來(lái)處理,這樣就增加了該組件的處理負(fù)載。對(duì)于低配置集群,這種高負(fù)載一方面導(dǎo)致整體系統(tǒng)效率降低,另一方面還有可能使節(jié)點(diǎn)失效。因此,下面設(shè)計(jì)一種具有迭代結(jié)構(gòu)的Topology模型,在這種模型中,處理迭代結(jié)構(gòu)程序時(shí),就可以把迭代結(jié)構(gòu)部分按照功能拆分開(kāi)來(lái),而不是把整個(gè)迭代部分放入一個(gè)組件中,使得這種模型處理迭代問(wèn)題時(shí)更加靈活。
基于Storm的基礎(chǔ)編程模型,文中改進(jìn)其Topology,增加了迭代模塊,能夠解決迭代類(lèi)的問(wèn)題,如圖2所示。迭代Topology與之前Topology相比多了一個(gè)新拓?fù)?Iterator。Iterator負(fù)責(zé)處理具有迭代結(jié)構(gòu)部分程序。里面同時(shí)包含了IBolt組件、迭代檢查器Checker和外部數(shù)據(jù)源接收器Receiver。IBolt組件負(fù)責(zé)任務(wù)處理,迭代檢查器負(fù)責(zé)判斷迭代是否結(jié)束,外部數(shù)據(jù)接收器負(fù)責(zé)接收迭代器外部發(fā)來(lái)的數(shù)據(jù)。迭代Topology好比在流水線(xiàn)上的工人中增加一個(gè)迭代管理員,負(fù)責(zé)管理處理迭代任務(wù)的工人,并告訴他們什么時(shí)候進(jìn)行迭代操作,什么時(shí)候結(jié)束迭代。
圖2中描述的這種迭代模型看起來(lái)只能模仿Do-While循環(huán)模式,但While-Do或者Do-While在一定條件下可以互相轉(zhuǎn)化,所以這種模型其實(shí)可以解決任何迭代問(wèn)題。
迭代 Topology新增了 Receiver、IBolt、Checker 3個(gè)組件,通過(guò)新增的 3個(gè)API(setReceiver()、setIBolt()、setChecker())來(lái)實(shí)現(xiàn)。
(1)新增組件
Receiver組件:Receiver組件用來(lái)接收外部組件和內(nèi)部迭代組件Checker組件發(fā)送來(lái)的消息。將接收到的消息進(jìn)行排隊(duì)處理后發(fā)往迭代開(kāi)始的IBolt組件。Receiver組件一方面解決了Checker組件和Spout組件同時(shí)向IBolt組件發(fā)送消息的功能,另一方面也可以控制數(shù)據(jù)傳入IBolt組件的速率。
IBolt組件:IBolt組件與Bolt組件一樣,負(fù)責(zé)實(shí)際任務(wù)處理,但它處理的是需要迭代運(yùn)算的任務(wù),區(qū)別于非迭代功能的任務(wù)。
Checker組件:Checker組件是實(shí)現(xiàn)迭代過(guò)程的關(guān)鍵,迭代控制主要由它完成。主要功能是檢查迭代處理是否結(jié)束。與其它組件不同的是,有兩個(gè)發(fā)射口,一個(gè)是發(fā)往Receive組件,一個(gè)是發(fā)往外部Bolt組件。如果進(jìn)入下次迭代,消息發(fā)往Receiver;如果迭代結(jié)束,消息發(fā)往外部Bolt組件。

圖2 迭代Topology
(2)新增組件API
setReceiver():構(gòu)造1個(gè)接收器,2個(gè)參數(shù),參數(shù)1設(shè)定接收器名,參數(shù)2設(shè)定并行數(shù)目;
setIBolt():構(gòu)造1個(gè)迭代處理組件,3個(gè)參數(shù),參數(shù)1設(shè)定迭代處理組件名,參數(shù)2設(shè)定要處理的任務(wù),參數(shù)3設(shè)定并行數(shù)目;
setChecker():構(gòu)造1個(gè)迭代檢查器,4個(gè)參數(shù),參數(shù)1設(shè)定迭代檢查器名,參數(shù)2設(shè)定消息接收器名,參數(shù)3設(shè)定迭代檢查器任務(wù),參數(shù)4設(shè)定并行數(shù)目;
圖3為Storm中實(shí)現(xiàn)Topology模型的一種架構(gòu)[4]。該架構(gòu)由3個(gè)進(jìn)程組成:Nimbus進(jìn)程為主進(jìn)程,負(fù)責(zé)接收客戶(hù)端提交的代碼,并將代碼序列化,為客戶(hù)機(jī)分發(fā)任務(wù);Zookeeper進(jìn)程負(fù)責(zé)Nimbus進(jìn)程和Supervisor進(jìn)程之間的消息協(xié)同工作;Supervisor進(jìn)程負(fù)責(zé)接收任務(wù)并執(zhí)行任務(wù),將任務(wù)結(jié)果返回給用戶(hù)。
實(shí)現(xiàn)這種迭代Topology,仍然延用Storm系統(tǒng)基礎(chǔ)架構(gòu),需要改變的是系統(tǒng)的調(diào)度策略、組件類(lèi)型。一個(gè)完整迭代任務(wù)執(zhí)行過(guò)程如下:
(1)Nimbus進(jìn)程接收客戶(hù)端提交過(guò)來(lái)的具有迭代結(jié)構(gòu)的Topology,然后將每個(gè)組件序列化,并分發(fā)任務(wù)到Supervisor;
(2)Supervisor接收分發(fā)的任務(wù)并開(kāi)始執(zhí)行;
(3)Spout組件發(fā)射一條消息到迭代消息接收器Receiver,消息接收器將消息進(jìn)行排隊(duì),按照先來(lái)先服務(wù)策略,把消息發(fā)往第一個(gè)要開(kāi)始迭代操作的組件IBolt;
(4)IBolt組件處理完自己的任務(wù),根據(jù)用戶(hù)設(shè)定,決定任務(wù)結(jié)果是發(fā)往Checker組件還是發(fā)往下一個(gè)IBolt組件;
(5)Checker組件根據(jù)接收到的消息和用戶(hù)定義的檢查點(diǎn)決定是否繼續(xù)迭代操作,如果迭代完成則將消息發(fā)往外部Bolt組件,否則,將消息發(fā)往Receiver組件開(kāi)始下一次循環(huán)操作。

圖3 Storm框架
K-Means算法是數(shù)據(jù)挖掘中應(yīng)用廣泛的數(shù)據(jù)聚類(lèi)算法,算法程序結(jié)構(gòu)包含了迭代處理部分,這種具有迭代結(jié)構(gòu)的算法很多,為了驗(yàn)證迭代Topology模型的可行性,文中選用比較典型并且為大家熟知的K-Means算法來(lái)實(shí)現(xiàn)。
K-Means算法的核心思想[5]是找出K個(gè)聚類(lèi)中心c1,c2,…,ck,使每個(gè)數(shù)據(jù)點(diǎn)xi和與其最近的聚類(lèi)中心cv的平方距離和被最小化(該平方距離和被稱(chēng)為偏差D)。對(duì)n個(gè)樣本進(jìn)行聚類(lèi)的過(guò)程如下:
(1)初始化:隨機(jī)指定k個(gè)聚類(lèi)中心(c1,c2,…,ck);
(2)重復(fù)下面過(guò)程直到D收斂:
①分配xi:對(duì)每個(gè)樣本xi,找到離它最近的聚類(lèi)中心cv,并將其分配到cv所標(biāo)明類(lèi);
②修正cv:對(duì)每一個(gè)cv移動(dòng)到其標(biāo)明的類(lèi)中心;
藥士道:“此人之前乃是假死,蓋因體力透支嚴(yán)重,精神高度緊繃,再加之外傷失血過(guò)多,導(dǎo)致身體機(jī)能衰竭。如今能重新恢復(fù)氣息,實(shí)數(shù)罕見(jiàn)。容我為她配些養(yǎng)神滋補(bǔ)的草藥,至于她能不能徹底醒過(guò)來(lái),還要看她自身的造化了。”
因此,在迭代 Topology模型上設(shè)計(jì)K-Means算法的Topology可以由圖4表示。
該Topology中,Spout組件為spout,Receiver組件為receiver,Checker組件為checker,IBolt處理組件包括caldistance和move,Bolt組件為writeResult,K-Means算法在迭代Topology上的實(shí)現(xiàn)過(guò)程的具體描述如圖5所示。

圖4 K-Means算法的迭代Topology
該實(shí)現(xiàn)采用5個(gè)點(diǎn)作為點(diǎn)群,選取2個(gè)種子點(diǎn),圖5描述了這種歸類(lèi)的大致過(guò)程,具體實(shí)現(xiàn)過(guò)程如下:
(1)spout組件隨機(jī)產(chǎn)生5個(gè)點(diǎn)作為點(diǎn)群,隨機(jī)產(chǎn)生2個(gè)點(diǎn)作為基點(diǎn),然后將點(diǎn)群和基點(diǎn)作為數(shù)據(jù)輸入發(fā)往receiver組件;
(2)receiver組件接收到輸入后把輸入數(shù)據(jù)發(fā)往迭代開(kāi)始組件caldistance,該組件用來(lái)計(jì)算基點(diǎn)與各個(gè)點(diǎn)群之間的距離;
(3)caldistance將計(jì)算完后的距離以及點(diǎn)群和基點(diǎn)位置發(fā)往move組件,move組件根據(jù)距離計(jì)算出點(diǎn)群中心,并將基點(diǎn)移動(dòng)到點(diǎn)群中心,將移動(dòng)后的基點(diǎn)與點(diǎn)群作為數(shù)據(jù)輸入發(fā)往checker組件;
(4)checker組件將第一次接收到的數(shù)據(jù)與初始化的基點(diǎn)相比較,比較完后將接收到的基點(diǎn)代替初始化的基點(diǎn)作為下一次比較的對(duì)象,初始化的基點(diǎn)位置一般為(0,0),因此第一次比較必然要進(jìn)行迭代處理,所以checker組件將接收到的點(diǎn)群和基點(diǎn)發(fā)往receiver,然后開(kāi)始第二次重復(fù)處理過(guò)程;
下面是構(gòu)建K-Means迭代Topology的主要模擬實(shí)現(xiàn)代碼段:
TopologyBuilder builder=new TopologyBuilder();
builder.setSpout(“spout” ,new RandomProducePoints(5,2),2);
builder.setReceiver(“receiver” ,4);
builder.setIBolt(“caldistance” ,new calDistance(2),2).shufflegrouping(“ receiver”);
builder.setIBolt(“move”,new MovetoCenter(),2).shufflegrouping(“caldistance”);
builder.setChecker(“checker” ,”receiver” ,new checkChanges(),2).shufflegrouping(“move”);
builder.setBolt(“writeResult”,new writeResult(),1).shufflegrouping(“checker”);
以上代碼都是模擬仿真Storm系統(tǒng)中實(shí)現(xiàn)Topology的代碼來(lái)表述的。TopologyBuilder類(lèi)是構(gòu)建一個(gè)Topology圖需要的類(lèi),類(lèi)中包含了設(shè)置各種組件的方法,一個(gè)應(yīng)用的Topology就是通過(guò)這個(gè)類(lèi)來(lái)實(shí)現(xiàn)的,組件中的參數(shù)主要有4類(lèi):組件名;組件任務(wù);接收消息的方式;并行數(shù)目。接受消息有6種分組方式:隨機(jī)分組、字段分組、全部分組、全局分組、無(wú)分組、直接分組,示例中都為隨機(jī)分組。隨機(jī)分組有一個(gè)參數(shù),這個(gè)參數(shù)指定接受消息的組件名。隨機(jī)分組的意思就是隨機(jī)接受組件發(fā)送來(lái)的消息。例如,組件move有4個(gè)線(xiàn)程在同時(shí)工作,組件caldistance有5個(gè)線(xiàn)程同時(shí)工作,move隨機(jī)接收caldistance發(fā)送來(lái)的消息,就表示move的任意一個(gè)線(xiàn)程接受caldistance任意線(xiàn)程計(jì)算完后的結(jié)果。并行數(shù)目,指共同執(zhí)行該組件任務(wù)的線(xiàn)程數(shù)目。并行數(shù)目都需要用戶(hù)根據(jù)任務(wù)情況來(lái)設(shè)置。示例中receiver采用4個(gè)線(xiàn)程,writeResult采用1個(gè)線(xiàn)程,其它的都是2個(gè)線(xiàn)程。receiver線(xiàn)程數(shù)目為4,因?yàn)樗纫邮誷pout的消息還要接收checker的消息。

圖5 迭代 Topology實(shí)現(xiàn)K-Means算法過(guò)程
迭代 Topology在Storm Topology原型基礎(chǔ)上增加了 Receiver、IBolt、Checker組件,Receiver和Checker組件與IBolt組件連接組成了一個(gè)具有迭代功能的Topology圖,使用這種迭代Topology圖成功解決了具有迭代功能的K-Means算法,這種方案因?yàn)槭窃谝郧盎A(chǔ)上添加組件完成,所以就很好保留了原Topology的特點(diǎn)。這種迭代Topology在實(shí)現(xiàn)方式上,保留了Storm基礎(chǔ)架構(gòu),只是調(diào)整了主進(jìn)程的調(diào)度策略和組件類(lèi)型,實(shí)現(xiàn)上降低了后續(xù)開(kāi)發(fā)難度。
致謝:感謝成都市科技局創(chuàng)新發(fā)展戰(zhàn)略研究項(xiàng)目(11RKYB016ZF)對(duì)本文的資助
[1] 孟小峰,慈祥.大數(shù)據(jù)管理:概念,技術(shù)與挑戰(zhàn)[J].計(jì)算機(jī)研究與發(fā)展,2013,50(1):146-169.
[2] Ekanayake J,Li H,Zhang B,et al.Twister:a runtime for iterative mapreduce[C].Proceedingsof the 19th ACM International Symposium on High Performance Distributed Computing.ACM,2010:810-818.
[3] Bu Y,Howe B,Balazinska M,et al.HaLoop:Efficient iterative data processing on large clusters[J].Proceedings of the VLDB Endowment,2010,3(1-2):285-296.
[4] Storm-wiki.[EB/OL].http://github.com/nathanmarz/storm/wiki/,2013-06-10.
[5] 孫吉貴,劉杰,趙連宇.聚類(lèi)算法研究[J].軟件學(xué)報(bào),2008,19(1):48-61.
[6] Dean J,Ghemawat S.MapReduce:simplified data processing on large clusters[J].Communications of the ACM,2008,51(1):107-113.
[7] Neumeyer L,Robbins B,Nair A,et al.S4:Distributed stream computing platform[C].Data Mining Workshops(ICDMW),2010 IEEE International Conference on.IEEE,2010:170-177.
[8] Cherniack M,Balakrishnan H,Balazinska M,et al.Scalable Distributed Stream Processing[C].CIDR.2003,3:257-268.
[9] Nathanmarz-blog[EB/OL].http://nathanmarz.com/.2013-07-10.
[10] Storm-berkeley[EB/OL].storm-berkeley.pdf.2013-09-01.
[11] 張建萍,劉希玉.基于聚類(lèi)分析的K-means算法研究及應(yīng)用[J].計(jì)算機(jī)應(yīng)用研究,2007,24(5):166-168.
[12] 鄧華鋒,劉云生,肖迎元.分布式數(shù)據(jù)流處理系統(tǒng)的動(dòng)態(tài)負(fù)載平衡技術(shù)[J].計(jì)算機(jī)科學(xué),2007,34(7):120-123.
[13] 亓開(kāi)元,趙卓峰,房俊,等.針對(duì)高速數(shù)據(jù)流的大規(guī)模數(shù)據(jù)實(shí)時(shí)處理方法[J].計(jì)算機(jī)學(xué)報(bào),2012,35(3):477-490.
[14] Getting Started with Storm[EB/OL].Getting Started with Storm.pdf.2013-08-18.
[15] S4 vs Storm[EB/OL].s4vStorm.pdf.2013-08-08.