章琦皓 王 楓 王月婷
1(中科院等離子體物理研究所 安徽 合肥 230031)2(中國科學技術大學 安徽 合肥 230026)
MDSplus作為EAST聚變實驗數據存儲的主要工具之一,每年有大量的聚變科學家對MDSplus實驗數據進行訪問[1-2]。伴隨著實驗室的MDSplus存儲的數據量日益增長,訪問MDSplus的用戶也隨之增加。防止用戶惡意訪問MDSplus中某單一節點數據從而導致服務器負載過大、監控MDSplus服務器的流量的出入變得尤為重要。目前正在使用的MDSplus服務,其日志系統只是單一的記錄了用戶的TCP/IP連接記錄,并沒有記錄用戶任何其他相關的操作記錄,這給MDSplus的監控帶來一定的盲區。如果能對MDSplus上所有用戶的操作進行監控,并且及時、沒有偏差地記錄下來,就可以通過統計知道用戶對聚變實驗某些數據的偏好,體現該信號量所具有的研究價值。對MDSplus日志進行數據分析,提取有效的日志信息,采用現有的大數據技術上的機器學習等方法,搭建出一套可用的MDSplus日志應用平臺。
據統計目前每天EAST上的MDSplus日志大概有3萬條日志記錄,這些還只是單一的TCP/IP記錄,如果通過完善目前的MDSplus日志系統,可以記錄所有用戶的操作,那么每天會有百萬條日志記錄。當然這一日志記錄很有可能在實驗期間某一時間段呈現爆發式增長,在秒級別內產生百萬條日志記錄,在為了應對未來海量的數據日志消息的產生,本文借助大數據技術進行海量日志的分析。
另外,MDSplus聚變實驗數據存儲量很快要達到PB級,未來聚變實驗數據很有可能使用類似于Hadoop這樣的大數據框架進行存儲,使用大數據技術進行日志分析迎合了未來數據存儲的發展趨勢。
目前國內外所有使用MDSplus的實驗室或者研究機構沒有針對MDSplus日志這一項功能進行相關的技術上的完善,更沒有相關的日志上數據的分析,所以在原有日志的基礎上構建一個基于大數據技術的MDSplus日志分析系統具有技術上的挑戰和實際實驗中的意義和價值。
作為EAST實驗數據重要的存儲工具,MDSplus日志系統需要改變以往的簡單的記錄方式。EAST上MDSplus access日志原有的格式如下:
1) {date} (pid number) Connection received{or disconnected} from {username}@{ipAddress}
2) Invalid message
格式只是單一的記錄下了簡單的遠程用戶的連接記錄,包括用戶名和IP地址,其次還有一些無效的日志信息參雜在日志中。實際情況中,MDSplus日志系統需要記錄下更多有效的日志信息。如表1所示,希望能夠記錄更多關于用戶在MDSplus上的數據操作類型,如GetData、GetSegment等操作。

表1 MDSplus日志設計需求
完善現有的MDSplus日志系統,增加更多的日志記錄信息之后才能建立一個集離線與實時于一體的日志分析系統。整個架構系統能夠達到線上實時預警、流量監控。線下提取有效信息,采取應對手段的功能。整個工作分成四步:
1) 完善的MDSplus日志功能。
2) 針對MDSplus日志進行離線。
3) 針對MDSplus日志進行實時分析。
4) 日志數據可視化。
日志分析系統整個軟件架構如圖1所示。整個系統的設計是在擁有完整的日志信息前提下借助于現有的大數據技術對日志信息進行處理。

圖1 系統總體架構圖
系統用到的大數據技術和概念包含以下幾個方面:
1) Flume:分布式、可靠、高可用的海量日志聚合的系統,支持在系統中定制各類數據發送方,用于收集數據[3]。
2) Kafka:高吞吐量的分布式發布訂閱消息系統,形成流式數據,供Spark Streaming 進行流式計算[4]。
3) Hadoop:大數據分布式開發框架,使用HDFS進行數據存儲。
4) Spark Streaming:基于Spark生態圈的準實時流數據計算框架[5]。
5) Zeppelin:大數據可視化工具,除了能夠接入傳統的MYSQL數據源以外,還能很好地接入Hadoop和Spark的數據源。
6) Web端:傳統的數據展示手段,其中使用到了Echarts等開源插件。
由圖1可看出,遠程用戶針對MDSplus服務器進行數據訪問,產生大量的數據訪問日志。該日志信息實時被Flume服務監聽,監聽到日志的變化,一方面將其發送到Hadoop集群中的HDFS(Hadoop分布式文件系統)中進行持久化存儲,方便日后的離線處理,另一方面將日志信息發送到Kafka服務中,轉換成實時數據流供Spark Streaming進行實時的流數據處理。因為原有的MDSplus日志信息在一段時間后會自動被新的日志信息給覆蓋,所以在數據采集方面為了保證原有的MDSplus日志信息持久化,在數據收集時先在Flume端進行自定義正則過濾器,將不必要的日志信息過濾掉。一方面在離線數據存儲時候將日志信息按照年月日時間被歸分到HDFS存儲下不同的目錄中,另一方面在實時數據傳輸中將日志信息按照不同的日志類型存儲在Kafka不同的Topic中。采用這樣方式進行數據采集,將傳統的服務器日志信息和新生的大數據采集框架有機地結合起來,從離線和實時兩個方面使得日志信息的收集存儲有很好的條理性和邏輯性。數據還可以持久化到MYSQL數據庫中,中間利用到Zeppelin數據可視化工具和Web數據展示工具。至此,整個流程將日志的產生、處理、展現綜合起來,形成了一個完整的EAST實驗數據訪問日志分析系統。
根據現有的MDSplus源碼接口,采用鉤子函數監聽的方式對整個MDSplus服務器進行監聽[6]。設計對應的鉤子函數可以對所需要的信息進行鉤取。將需要監聽MDSplus的用戶操作使用枚舉的方式存儲,然后對應到相應的Notify通知中,針對不同的操作調用CallHookback函數,通知到MDSplus的日志文件中去。整個MDSplus日志架構和流程如圖2所示。

圖2 MDSplus日志完善架構圖
1) 遠程客戶發出請求,連接到MDSplus服務器;
2) 客戶在服務器上進行一系列的TreeOpen、GetData等操作;
3) 對應的操作觸發TreeCallHook函數;
4) TreeCallHook函數觸發libTreeShrHook.so動態鏈接庫;
5) 動態鏈接庫將相應的日志內容以刷新緩沖的方式打印到日志文件中;
6) 客戶端斷開與服務器的連接。
完善的日志系統中調用的動態鏈接庫LibTreeShrHook.so使用到的鉤子函數算法如下(偽代碼):
int Notify(TreeshrHookType Htype, char *tree, int shot, int nid)
{
SomeVarDefine();
//定義一些記錄信息變量
switch (Htype) {
//匹配對應的數據操作類型
case OpenTree:
name=″OpenTree″;
Operation_1();
break;
…………………… 省略……………………
case GetSegment:
name=″GetSegment″;
Opreation_n();
break;
}
printf(Meassage);
//打印日志信息
fflush(stdout);
//刷新日志信息到標準輸出中
if (path != na && path != (char *)0)
free(path);
//釋放節點路徑
return 1;
}
值得注意的是,目前鉤子函數的觸發條件是遠程的客戶端連接方式,暫不支持本地操作日志記錄功能。該算法基本能夠實現目前所需要的MDSplus Log的功能。圖3是目前完善后的日志文件能夠記錄到的日志內容,新增加了{date} (pid number) HookType called for {node absolutely path}日志格式,使得日志信息更加完整、可靠。

圖3 MDSplus日志內容
MDSplus日志內容作為Flume的代理對象Agent的數據來源,將日志信息緩沖到Channel中。采用了Flume的選擇分流模式,將事件流向兩個目的地。在離線模式下,Channel1介質設置為磁盤介質,一旦達到緩沖大小,就將日志內容發送到下游Sink1指定的HDFS中進行存儲。分流模式如圖4所示。

圖4 Flume分流模式
HDFS中的存儲內容是Flume過濾后以天數為單位存儲的日志信息,使用Hadoop的MapReduce計算框架將日志信息分解成兩種不同的數據模型[7]:
(1) 客戶模型。記錄著當前用戶的信息,包括用戶連接或者斷開連接的時間、當前連接的進程號、用戶名、IP地址、當前用戶狀態等信息:
client(linkTime:String,pid:Integer,user:String,host:String, status:String)
(2) 操作模型。記錄著用戶連接MDSplus服務器后一系列的操作信息,包括操作時間、進程號、數據操作類型、操作的樹名、炮號名等信息:
operation(linkTime:String,pid:Integer,hooktype:String, tree:String, shot:Integer, nodepath:String)
考慮到日志信息中含有多種不同類型的日志信息種類,所以在MapReduce的map過程需要接收兩種不同的輸入數據類型進行序列化,分別是client數據類型和operation數據類型。然后繼承Hadoop接口中的GenericWritable類,將兩種數據類型結合起來,這樣就解決了map過程中可能出現不同的數據類型的情況,具體如下:
public class logWritable extends GenericWritable {
private static Class extends Writable>[] CLASSES=null;
static {
CLASSES=(Class extends Writable>[]) new Class[] {
org.apache.hadoop.io.Text.class,
ClientWritable.class,
//自定義client類型
OperationWritable.class
//自定義operation類型
};
}
……………………………省略……………………………
}
經過數據的ETL過程,可以看到MDSplus日志信息被提取出來放在以下兩個數據庫表中。每個表中部分信息如圖5所示。

圖5 離線處理結果
考慮到要從MDSplus日志中獲取到實時的信息,從而即時地采取手段進行應對惡意的服務器攻擊等行為。關于實時計算框架在Spark Streaming和Storm之間的選擇,可以清楚地看見,Storm對于消息的處理是純實時的,是一條一條消息進行處理,但是相比較于Spark Streaming吞吐量比較低[8]。基于以下幾點的考慮,實時處理框架最終選擇了Spark Streaming計算模型:
1) MDSplus日志分析不需要達到純實時的精確度。
2) Spark生態對實時計算、離線批處理、交互式查詢等業務功能可拓展性強。
3) Spark生態圈很容易和現有的Hadoop生態圈結合。
結合圖4,很容易看到MDSplus的日志流的下一個目的地是Kafka,其中采用的緩沖通道Channel2是內存緩沖。為了避免Flume直接將日志文件直接發送給Spark Streaming處理導致的計算框架崩潰的情況,其將消息流先發送給Kafka這個消息中間件,日志數據以發布-訂閱的模式實時記錄到對應的topic里,Spark Streaming從相對應的Topic中讀取數據流進行流數據計算。
在整個準實時數據處理流程中,采用Spark原生的編程語言Scala進行編程,降低了代碼的冗余。處理的過程中,根據流數據的內容進行過濾,提取日志內容中有效字段。將原有的RDD(resilient distributed dataset)轉換成以RDD為基礎的分布式數據集的DataFrame形式。其中DataFrame應用于使用SQL處理數據的場景,在系統中采用了Spark的SQLContext類,將處理后的字段寫入到MySQL數據庫中。部分處理過程如下所示:
//開始處理整個日志內容
logs.foreachRDD(logs=>{
//創建一個sqlcontext單例模式
val sQLContext=
SQLContextSingleton.getInstance(logs.sparkContext)
import sQLContext.implicits._
//client日志內容處理
var flag =″OFF″
val logClient=logs.filter({s=>
s.contains(″Connection″)
}).map({k=>
k.split(″ ″)
}).map({t=>
if(t(9)==″received″)
flag=″ON″
else
flag=″OFF″
new client(
linkTime=t(0)+″ ″+t(1)+″ ″+t(2)+″ ″+t(3)+″ ″+t(4),
//后面轉換成timeStamp
pid=t(7).replace(″)″,″″).toInt,
user=t(11).split(″@″)(0),
host=t(11).split(″@″)(1),
status=flag
)
}).toDF()
logClient.registerTempTable(″client″)
經過SparkStreaming處理后提取出來的字段放置在不同的DataFrame中,最終的結果存到MYSQL數據庫中,供數據展示前端進行可視化。
無論是離線的數據處理,還是涉及到的實時數據處理,都需要將數據進行可視化,方便大家快速直觀地了解目前MDSplus服務相關的信息。在前端的展示上采取了Zeppelin數據可視化工具和傳統的Web展示工具兩種方式相結合的手段。Zeppelin作為大數據可視化工具,不僅能夠很好地支持Spark和Hadoop,還能和傳統的MYSql相互連接。Web展現的方式采用了Echarts插件,將MDSplus服務器狀態能夠直觀展現出來[9]。圖6是Web端日志數據可視化的內容之一,顯示當前各國在線人數以及當前最長在線的用戶。

圖6 數據可視化Web展示
系統測試過程中,采取多線程并發式模擬多用戶訪問MDSplus數據庫,并對MDSplus數據庫進行各種數據讀取等操作。模擬并發用戶量1 000多名,每個用戶的操作平均產生20條日志,共計產生約3萬條數據。測試采取兩種不同的方式進行日志處理,分別是MapReduce方式的離線數據處理、Spark Streaming的準實時數據處理方式。表2是兩種不同的處理方式的時間上的對比。

表2 MDSplus日志處理方式對比
由于離線數據在處理的過程中,需要啟動系統的資源,所以耗費比較長的時間,但是在數據量達到海量時,該處理方式具有一定的優勢。而準實時處理是按照時間切片進行數據拉取和處理,所以在實時性方面占有優勢。該工作系第一次對MDSplus日志進行功能完善和日志信息處理的工作,目前還沒有其他相關的工作對MDSplus進行日志完善和處理,屬于原創性工作,所以暫時沒有和其他的工作在時間和結果上進行對比。
本文介紹了利用大數據技術對EAST數據訪問日志分析系統的設計和實踐。該系統極大地方便了聚變科研人員對EAST實驗數據的管理。首次對MDSplus的日志系統進行改進,完善了MDSplus日志信息。針對用戶行為產生的海量日志數據,使用大數據技術中比較成熟的HadoopMR、SparkStreaming等技術很好地完成了日志的離線和在線的解析。這項工作不僅為 聚變領域中數據訪問工作提供了借鑒,還對其他的海量日志的處理工作具有一定的參考價值。