王 梓,梁正和,吳瑩瑩
(河海大學 計算機與信息學院,江蘇 南京 211110)
大數據時代的到來,信息已成為各行各業發展的決策依據。世界計算機數據信息總量海量增長,已經有越來越多的企業、教育、科研機構和生廠商們意識到這些數據中所包含的重要價值,ETL的發展已成為必然趨勢[1]。企業在使用各種應用系統期間往往積累了大量的信息、數據資源,計算機技術的快速發展使用戶可以對這些信息進行采集、分析、處理,它們成為了企業發展的決策性依據,構成了企業發展的寶貴財富。隨著時間的推移,技術的進步,原有舊系統不斷升級從而被新系統完全取代,在此使用期間往往會積累大量珍貴的歷史數據,這些數據也是新系統成功啟用的關鍵。同時,這些從舊系統到新系統使用過程中的歷史數據也是企業進行決策發展,制定戰略方向的重要依據。
如今,隨著數據的大量累積,越來越多的企業、廠商都在構建數據倉庫(data warehouse)來滿足企業在發展過程中的戰略需要,他們需要將這些可能來自不同的軟硬件平臺、操作系統、數據模型乃至地理上分布、管理上自治和模式上異構的數據源進行集成。一個或多個不同數據源的相關數據可以進行綜合集中放入數據倉庫[2],在數據倉庫中可以針對不同的主題和匯總數據進行統計和分析,從而為決策人員提供數據支持。在構建數據倉庫的過程中,首先需要將各種分布的、異構的數據源中的數據抽取出來,在此過程中進行清洗、轉換、集成最后加載到數據倉庫中,這個過程叫做抽取轉換裝載即ETL(extraction transformation loading)[3]數據遷移。ETL在構建過程中需要面對傳輸效率、準確性、數據異構性、多目標等問題。傳統的ETL在解決不同操作系統之間使用不同的編程語言問題方面,設置一個專有的轉換引擎置于數據源和目標數據倉庫[4]之間,用于運行所有的轉換程序。但在數據轉換過程中,專有引擎執行所有轉換工作成為“瓶頸”。隨著企業的發展,數據需要從結構化數據源(關系數據庫),非結構化數據源(PDF文件、郵件等),半結構化數據源(XML和其他標記語言),遺留系統(主機)、應用程序包(SAP)等異構數據源中提取,同時數據量也呈現出遞增式的增長,對數據的存儲[5],數據的異構性、并發性進行研究已成為當前的主要研究方向。
在ETL構建過程中,盡量降低ETL過程的設計與維護代價,提升ETL過程的執行效率,是企業在實際項目中重要考慮的問題,因此,設計一種優秀的ETL工具對數據倉庫[6]非常有益。利用ETL工具可以對異構數據源中的業務數據進行抽取和轉換,并將其裝載到數據倉庫[7],其主要作用是對各類業務數據的清理、轉換和裝載,為基于數據倉庫的決策分析應用提供高質量的數據。截止目前,生產的數據量大大提高,傳統數據處理和數據倉庫技術已不能滿足海量數據[8]處理的現實需求,因此基于Kafka和Disruptor,提出一種對傳統ETL進行改進的模型[9],并就某教育企業對全國高校系統數據遷移進行了實驗研究。
ETL是對數據進行抽取、清洗、轉換和裝載[10]的過程,數據從異構數據源中抽取,遷移到指定的目標庫。其間,數據的抽取、清洗、轉換和裝載形成串行或并行的過程。T過程是ETL的核心,也是數據的轉換,而抽取和裝載一般可以作為轉換的輸入和輸出,或者作為一個單獨的部件,其復雜程度沒有轉換部件高。ETL是構建數據倉庫的重要組成部分,用戶從數據源抽取出所需的數據,經過數據清洗,最終按照預先定義好的數據倉庫模型,將數據加載到數據倉庫中。傳統的ETL上手快,易操作,可是當數據海量增長時,傳統的ETL在性能和數據處理的準確性、多樣性、并發性等方面卻大打折扣。現在數據的ETL過程經常會選擇Kafka作為消息中間件應用在離線和實時的場景中。針對上述問題,文中基于Kafka和Disruptor并發框架對傳統ETL進行了改進。
Disruptor是一個高性能的異步處理框架,一般被設計在生產者—消費者(producer-consumer problem,PCP)問題上,可以獲得盡量高的吞吐量(TPS)和盡量低的延遲。針對“并發、緩沖區、生產者—消費者模型、事務處理”這些元素的程序來說,Disruptor是一種大幅提升性能(TPS)的方案。它本質上是個ringbuffer,buffer(就是數組)做過優化防止JVM偽共享,lock free是通過CAS自旋[11],多線程[12]并發獲取buffer中的序號,這里需要CAS,把事件放入槽中,工作線程調度交給jdk線程池,只要buffer中有事件,就不停提交給線程池,不需要鎖進程,解決了多線程讀寫,實現讀寫同步,解決了數據延遲的問題。
(1)disruptor沒有鎖,所以效率高,速度快。
(2)所有訪問者都記錄自己序號的實現方式[13],允許多個生產者與多個消費者共享相同的數據結構。
近年來,Kafka作為一個新興的分布式消息系統,受到了眾多企業、科研機構的青睞。Kafka在分布式集群應用中作為多種類型的數據管道和消息系統[14]而應用廣泛。流數據是大多數集群統計和實時數據采集過程中所產生的數據,可能包括頁面訪問量、物聯網傳感器采集數據等方式。Kafka用作LinkedIn的活動流(activity stream)和運營數據處理[15]管道(pipeline)的基礎。在消息保存中Kafka根據每個topic進行歸類,消息發送者稱為producer,接收消息者稱為consumer,此外Kafka集群[16]由多個Kafka實例組成,每個實例(server)稱為broker。
(1)持久化能力的高效性。對TB級以上的數據也可以在常數時間復雜度讀取和寫入硬盤。
(2)支持broker間的消息分區,并保證分區中消息讀取的有序性。
(3)分布式系統,易于向外擴展。所有的producer、broker和consumer都會有多個,均為分布式的。無需停機即可擴展機器。
(4)消息被處理的狀態是在consumer端維護,而不是由server端維護。當失敗時能自動平衡。
(5)異步:Kafka分布式[17]消息系統采用異步通信機制,消息進入系統緩存后系統無需立刻應答或處理,可以根據用戶需求和配置情況選擇。
(6)持久性、可靠性:消息被持久化到本地磁盤,并且支持數據備份,防止數據丟失。
基于Kafka和Disruptor數據處理技術對傳統ETL的改進如圖1所示。

圖1 改進的ETL模型
實驗中,Kafka結合Disruptor技術,數據通道在啟動后,會啟動一個restful數據被動接收接口,前端埋點或其他的數據收集服務會將收集到的數據以restful+json的方式傳到接口,接口在接收到數據后,會根據配置的類型,將數據發送到指定的接收源,接口源可以是配置的一個read-type,是Kafka的job,實現數據之間的快速傳輸。從多種數據源中抽取數據,將數據發送到Disruptor的RingBuffer環形區域,Disruptor數據消費者發送數據到Kafka服務器,由于Disruptor高性能[18]、低延遲的特點,從而提高了目標源數據到Kafka中數據的速度,因此在傳輸速度方面對傳統的ETL有很大的改進,極大節約了時間[19]。再利用Kafka高吞吐和異步的特性,Disruptor可持續發送數據到Kafka中,Kafka消費者處理數據,節省了數據等待傳輸時間,同時該模型可接受多種不同數據源,實現數據多樣性的傳輸。
為此,在研發創新方面,華岳每年投入大量資金,不斷提升產品質量和智能化程度,同時也收獲了寶貴的自主知識產權。截至目前,公司共獲得48項專利,其中發明專利7項,實用新型38項,有力支撐了產品更新升級和向高端化發展。
啟動數據通道,在通道啟動一個restful數據被動接收接口,解析配置文件,程序初始化操作→啟動消費者線程→生產者進程發布事件到Disruptor,從目標源中讀取數據到Disruptor的RingBuffer環形隊列中,Disruptor消費者將RingBuffer環形隊列[20]收集到的數據通過restful+json的方式傳到接口,接口在接收到數據后將數據發送到Kafka。由于broker的增加或者減少都會觸發Consumer Rebalance,數據通過Kafka Consumer開始處理partition里面的message,實驗中接收源為jdbc,數據發送到指定的接收源過程中對數據進行了清洗和裝載。由于Kafka高吞吐、異步性的特點,可以將數據存放在Kafka服務器端,隨時處理服務器端的數據。
Disruptor讀取源數據和發送數據給Kafka broker服務器,在程序中的主要流圖如圖2所示。

圖2 Disruptor讀寫過程
Kafka原理結構如圖3所示。

圖3 Kafka原理
圖4所示為改進后的ETL將MYSQL數據庫中的數據遷移到Postgresql數據庫中。t_ampa_useraction分別在MYSQL不同的數據庫下,在job配置文件中配置了目標源和目標數據庫,啟動項目,開始數據遷移。實驗中使用了4張表同時進行,由開始時間和接入時間可見,極大地節約了數據傳輸時間。

圖4 MYSQL中數據導入Postgresql
圖5所示為傳統ETL將MYSQL數據庫中的數據遷移到Postgresql數據庫中,兩個ETL同時啟用。

圖5 傳統ETL MYSQL中數據導入Postgresql
通過實驗對比,現在數據的ETL過程經常會選擇Kafka作為消息中間件應用在離線和實時的場景中,結合Kakfa的特性和Disruptor高并發、高吞吐的特點,Disruptor消費者發送數據到Kafka服務器中,實現數據的高效傳輸。傳統的ETL當數據量上升到一定程度時,傳輸的記錄有時會缺失,處理時間過長,無法實現并發存儲,對此做出了改進。在實際開發過程中,因為同時對各個高校的數據進行遷移,數據量急劇增長,因此使用Kafka高吞吐量、低延遲、異步的特性,可以極大提高數據的傳輸效率。
文中利用Kafka和Disruptor并發框架兩種數據處理技術快速構建數據ETL通道,憑借高吞吐量、低延遲的特點,極大節約了數據之間的傳輸時間。實驗采用分布式消息系統作為大規模流數據的緩存,提高了平臺對動態流數據輸入數據量突發性變化的適應能力。針對多種數據源如http、txt、jdbc等的處理,對傳統ETL進行了改進,實現對大量數據的并發處理,使不同數據庫之間的數據能夠快速、同步、多樣地傳輸。
雖然對傳統ETL在處理速度和吞吐量方面進行了改進,但是在排序、分頁等功能上做得還不夠完善,當job上升到10個以上時,xml文件解析便容易出錯,對這些問題將有待進一步完善。