李團結+從新法+李光明
【摘要】 日志對于每個系統來說都是不可或缺的一部分,而現階段對日志的處理效率卻不盡如人意。實時性日志考驗的是大數據處理框架的實時計算能力,基于Storm 并借助開源框架 Kafka,設計了一個實時數據收集與處理的系統,將數據轉為流的形式,對收集來的數據直接在內存以流的形式進行計算,輸出有價值的信息保存到Redis。最后對系統進行性能測試以及計算能力的測試。實驗結果表明,該系統可擴展性良好,且并行計算能力穩定,適合大量實時數據處理。
【關鍵字】 Storm Kafka Redis
一、引言
大數據時代,與互聯網行業息息相關的諸多領域中用戶數量和其產生的數據在不斷地累加,為之提供支撐的服務器端存放的日志信息量也隨之劇增,如何準確及時的篩選海量日志中的關鍵信息成為了亟待解決的問題。眾所周知,Hadoop架構可以使用戶可以在不了解分布式底層細節的情況下,開發分布式程序。充分利用集群的威力進行高速運算和存儲,但是對于實時性極強的流式數據,顯然流處理框架Strom更適合,并且處理效率客觀。
二、Storm計算框架
Storm是由BackType開發并被Twitter于2011 年開源的分布式實時計算系統[1],能夠很容易可靠地處理無界持續的流數據,進行實時計算 [2]。
任務拓撲是Storm的邏輯單元,一個實時的應用打包為拓撲后發送,拓撲是由Spout和Bolt組成,其二者的關系如圖1所示。Spout節點從數據源中源源不斷的消費數據并把數據發送到后面的Bolt節點,而Topology是將Spout和Bolt組合在一起完成一項具體的計算任務。Topology一旦提交就會一直執行。
Storm主從架構圖包含一個主節點Nimbus和多個從節點Supervisor,Zookeeper完成兩者之間的協調。每個 Worker都執行且只執行任務拓撲中的一個子集, 在每個Worker 內部,會有多個 Executor,每個 Executor對應一個任務,負責具體數據的計算,即用戶所實現的 Spout /Bolt 實例。
三、日志綜合管理平臺基于Storm的實現方案
3.1開發環境及采用的測試數據集
硬件環境包括Storm集群,Kakfa集群,Zookeeper集群,Storm包括1個Nimbus和4個Supervisor;Kafka集群包括5個節點;Zookeeper集群也包括5個節點,集體配置如表1所示。

軟件環境:jdk-1.7.0_79、logstash-2.3.4、elasticsearch-2.3.4、storm-0.9.5、kafka_2.9.1-0.8.2.0
zookeeper-3.3.5、python-2.7.12。
操作系統:Linux version 3.10.0-327.el7.x86_64
數據集:用戶話單日志信息(約2 billon/day)。
3.2平臺架構及處理流程
日志綜合處理平臺主要由三層組成,包括:數據采集層、數據分析及存儲層以及數據展示層。可以實現對日志從采集到分析處理的全過程并在頁面監控平臺顯示。
本實驗方案使用 Kafka為消息中間件傳遞消息。Kafka是一種高吞吐量的分布式發布訂閱消息系統,其依賴Zookeeper保存每組消費者消費的相應Topic的偏移量。
SpoutA接收待初始化的數據,并將其發K-means&DBSCANBolt 通過數據簇形態識別以初始化微簇;SpoutB從Kafka中接收初始化后待處理的流數據,將其發送至LocalBolt進行局部微聚類;SpoutC用作處理時間戳,每單位時間向LocalBolt發送一次信息,當接收到時間戳消息,將局部微聚類更新結果存放到Redis做實時局部微聚類更新結果的保存,并合并原有的增量信息發送到GlobalBolt;SpoutD通過消息中間件 Kafka接收用戶發送的查詢參數。
K-means&DBSCANBolt接收 SpoutA傳輸的待初始化數據與聚類參數 k(簇數),進行標準 k-means聚類或者DBSCAN聚類,聚類的結果以微簇形式發送至 LocalBolt隨后根據時間戳信息保存結果到Redis,并由滑動窗口觸發機制合并局部微簇到全局微簇GlobalBolt。RL-DSCA算法的微簇在線維護微簇進行的在線增量更新是由LocalBolt來實現的,體現了RL-DSCA算法分布式數據的處理,到達的待處理流數據將會分配到各個LocalBolt節點,這些節點具體的功能均不相同,LocalBolt各節點處理流程如圖2所示。主要處理Bolt的實現功能如下。
extractBolt:該Bolt主要實現從初始化后的數據流中篩選目標信息,并將篩選出來的數據發送到下一個處理bolt。
judgeSysTimeBolt:該Bolt用來判斷系統時間和時間戳的關系檢測拓撲停止工作的異常情況,如出現拓撲異常,系統時間>時間戳時間,對時間戳補齊并進行更新(updateTimestampBolt)結果存放到Redis。
judgeLogTimeBolt:改Bolt主要是判斷來的日志是實時日志還是歷史日志,如果日志時間在時間戳范圍內即為實時日志,否則按照歷史日志來處理。
sendAndUpdateRedisBolt:實時日志的發送,根據SpoutC傳來的時間戳消息,將局部微聚類更新結果存放到Redis。
submitLastValueBolt:該Bolt用于處理歷史日志的最后一個時間戳,根據來的一條正常日志觸發將歷史日志的微簇發送到Redis。
abnormalHandleBolt:該Bolt主要對歷史日志進行處理,避免影響實時流數據的處理,并將歷史日志的處理結果合并到Redis供全局微簇的合并。
現將該平臺的主要功能概述如下:
接收 K-means&DBSCANBolt生成的初始化微簇生成初始緩存集Kafka;對于到達拓撲的待處理的數據流,LocalBolt按照單位時間生成局部聚類增量,并將該中間結果發送至Redis供合并;Redis實現RL-DSCA算法的合并部分,即合并局部增量結果進行全局微簇增量更新:接收LocalBolt生成的初始化微簇生成初始全局微簇;緩存各局部線程傳輸的中間結果;使用滑動窗口觸發機制,達到觸發時間點則合并暫存的中間結果,將結果打上相應時間標記Tag,生成實時全局微簇快照發送至GlobalBolt。GlobalBolt實現RL-DSCA算法的查詢輸出;接收GlobalBolt生成的全局微簇快照,將其存儲至金字塔時間幀結構中供后續查詢;當用戶輸入查詢參數時,通過SpoutD接收查詢參數,查找金字塔時間幀結構中的相應數據,將查詢結果發送至SendBolt 進行輸出。
四、結束語
本文設計開發了流數據計算平臺 Storm 的計算架構處理海量數據日志綜合管理平臺,結合Kafka和Redis對日志進行了實時性的分析和處理。滿足了用戶對大數據量日志信息的使用需要,并達到了客觀的處理效率。
參 考 文 獻
[1] The Apaehe Foundation. Storm official website- [EB/OL].https://storm.apache.org/.
[2] Github Inc. Storm Wiki[EB/OL]. https://github.com/apache/storm.