任培花,蘇 銘
(山西大同大學 計算機與網絡工程學院,大同 037009)
“套牌車”是指未在交管部門辦理手續,偽造、冒用他人合法車輛和行駛駕照的車輛[1].近年來,隨著機動車數量的增加,車輛套牌現象屢見不鮮,嚴重損害了交通參與者的合法權益,增加了交管部門的工作難度,甚至增添了社會不穩定因素等.因此,套牌行為已成為當前交通和治安管理中的一個難點問題.
現今城市的各個重要路口,均有車輛監控系統可以記錄車牌號碼、經過地點、時間、現場圖片等.傳統的車輛套牌監測大多基于車輛監控系統的日志信息采用人工識別、牌照識別、或射頻識別等方法識別套牌車輛.然而伴隨車輛數量和出行量地持續增加,城市交通采集范圍在逐步擴大,按現有監控系統,每天會產生以億為單位的日志記錄,不管用哪種監測方法面對海量交通數據集,必然會有成本高、效率低、實時性差等問題.因此,為了解決這些難題,國內外很多人研究將大數據技術引入到車輛套牌稽查系統中,對這些數據進行實時分析和存儲.
隨著并行計算框架的產生,關于車輛套牌的大數據研究正在興起,很多學者專家紛紛展開相關研究.以“套牌”、“大數據”為檢索詞,從 2014-2019年期間,對中國知網、萬方數據庫、維普網公開發表的有關車輛套牌大數據文章進行檢索.發現車輛套牌監測的研究主要停留在監測方法方面,大數據技術在車輛套牌方面的應用研究不足,采用的大數據分析框架也比較老.另外,研究內容主要集中在數據分析方面,數據存儲方面的研究很少.
代表性的如文獻[1]提出了一種基于歷史車牌識別數據(ANPR)集的套牌車并行檢測方法TP-Finder,實現了基于整數劃分的數據分塊策略,可準確呈現所有疑似套牌車輛的歷史行車軌跡.文獻[2]提出一種基于路段閾值表和時間滑動窗口的套牌計算模型,可以實時甄別交通數據流中的套牌嫌疑車.文獻[3]將實際車輛記錄遷移到Hadoop集群的HBase中,然后從Hive從HBase中獲取同一車牌號碼的時空分布情況,通過校正因子獲取最終的嫌疑套牌車.文獻[4,5]提出一種針對大規模數據集的分布式計算模型MapReduce.文獻[6]提出一種新的基于Hadoop的MapReduce算法模型,可以有效地解決處理海量數據時面臨的性能瓶頸問題,該算法通過引入多臺硬件計算資源協同處理大規模數據下的套牌車檢測.文獻[7]提出一種比Hadoop實時性好的分布式實時計算框架Storm.該框架具有健壯性、容錯性、動態調整并行度等特性.
通過對文獻研究可知,目前普遍做法將實際的海量數據導入或并行連接到大數據平臺上,然后再進行數據分析,進而監測套牌車輛.但這些做法的實時性普遍不足,如文獻[1-3]均采用對歷史記錄的分析,車輛運行每時每刻在發生,這種做法顯然實時性很差.文獻[4-6]中雖然考慮了實時性,但MapReduce屬于批處理算法,等數據集到一個量的時候才啟動,勢必會有一個時間延遲.因此,本文充分考慮車輛套牌監測的實時性要求,借鑒文獻[7],引入Kafka (分布式消息隊列)和Storm(分布式實時大數據處理系統)來解決海量車牌數據的實時分析和信息存儲問題,Kafka作為中間件可以為日志信息提供緩沖機會,有效緩解了數據采集和數據分析的不同步問題,提高了數據的高可用性和實時性,避免了由于服務器故障而造成數據丟失的問題.Storm中的運行的是拓撲(topology)算法相對于MapReduce而言,進程是永駐的,只要有數據就可以進行實時處理,從而可以實現實時分析,實現套牌監測的實時性、準確性.
為了實現實時分析、存儲車輛套牌信息,本文提出一個基于Kafka和Storm的車輛套牌實時分析存儲系統(簡稱車輛套牌實時分析存儲系統).從系統功能模塊的邏輯結構和劃分兩個角度分別進行描述.
該系統包含的邏輯執行過程:日志獲取、日志緩沖、數據分析和數據存儲.圖1是邏輯過程分層結構圖.

圖1 邏輯層次結構圖
數據采集主要用來收集用戶操作產生的車輛日志信息.數據緩存減少了流量超峰給系統帶來的壓力,該模塊使用Flume[8](分布式日志收集系統)、Kafka、Storm、Redis[9](云數據庫)等大數據框架搭建后端服務架構.從結構圖(如圖1)可知,道路監控系統產生的日志信息,先通過Kafka進行備份緩存,然后將Kafka、Storm進行整合,將數據導入Storm運行框架中.Storm中Spout (Storm消息源)會源源不斷地從Kafka上某個主題獲取數據,并對數據進行封裝發送到下游的Bolt (Storm消息處理者)計算節點,Bolt節點對數據進行實時計算,判斷是否出現套牌車輛,算法邏輯都會在Bolt節點中進行運算處理.最后系統將實時計算后的結果數據存儲到服務器的文件中,最后一次的車輛信息存入Redis中.
車輛套牌實時分析存儲系統的功能包括日志緩存與獲取、日志信息切分、套牌監測和信息存儲.
(1)日志緩存與獲取
通過連接道路監控系統,在數據實時采集與數據實時處理之間搭建一個Kafka消息隊列進行緩存,解決數據實時采集與數據實時處理之間速度不同步和數據丟失的問題,進一步將道路車輛監控系統產生的實時日志數據通過Kafka導入系統.其次通過Kafka與Storm進行整合,然后將數據導入Storm運行框架中.
(2)日志信息切分
對采集到數據進行切分,獲取系統需要使用的有效數據.
(3)套牌監測
通過車輛id從Redis數據庫中快速讀取道路車輛監控系統對應的最近監控歷史記錄,計算當前車輛的區間速度,如果車輛速度超過區間速度值就將對應車牌號碼標記為套牌號牌.
(4)信息存儲
從實時的道路監控記錄中提取出必要的交通信息,并將實時交通信息存入數據庫中.
本系統涉及的主要實體類包括:KafkaTopo、SplitBolt、SpeedBolt、StorageBolt、JedisPoolUtils.下面是整個系統的類圖結構,如圖2所示.
實體類介紹:
(1)KafkaTopo類主要是將Kafka與Storm進行整合,這個類既具備緩沖特點又具備實時計算的特點.
(2)JedisPoolUtils類主要是編寫對Redis進行讀寫的Java客戶端代碼,讀取連接池配置文件,從而提供訪問Redis的接口.
(3)SplitBolt類主要是對收到的日志信息進行拆分,提取有用信息,發送到SpeedBolt.
(4)SpeedBolt類主要是對SplitBolt發送來的信息與Redis中的信息進行對比,計算出動態車速,并且檢測出套牌車輛,將套牌車輛信息發送到StorageBolt.
(5)StorageBolt類主要是對SpeedBolt發送來的套牌車輛信息進行存儲.

圖2 系統類圖
KafkaTopo類是整個系統的主要控制部分,通過將數據分析與計算流程串聯起來;首先是設置Kafka的主題與配置其Broker的主機地址,通過配置可以將日志數據獲取進來;接下來設置3個Bolt組件,分別完成數據切分、動態車速計算與數據存儲的功能;最后將整個工程打成jar包提交到配置好的服務器上,來完成實時的車輛套牌檢測功能.其具體的功能時序圖如圖3所示.
本系統實現按照日志緩存與獲取、日志信息切分、套牌監測和信息存儲4個功能展開.
日志緩存與獲取是通過Kafka來實現,首先將Kafka分別安裝到3臺Linux服務器上,由于Kafka的高可用是通過Zookeeper[10]來實現的,因此需要在3臺服務器上安裝Zookeeper集群,Zookeeper集群的IP將作為Kafka的Broker的配置地址.接下來通過Storm提供的SpoutConfig來將Broker的主機地址、Kafka的主題、Zookeeper集群的服務器地址、Storm的SpoutId進行設置,從而將整個數據的接收所需要依賴的環境搭建起來.接下來對Storm從kafka獲取數據的方式進行設置,將其設置成從數據流的起始位置開始讀取數據;這些配置設置完成后,最終實例化一個Topology Builder對象來將之前的配置信息進行整合,從而使其成功獲取從車輛監控系統獲取的數據.

圖3 系統時序圖
整個數據流的緩存是通過Kafka來實現的.因為Kafka的broker結點上會有消費者機制與生產者機制,通過設置生產者所生產消息的長度來控制消費者的消費.使得整個數據流經過Kafka都會有一個緩存的過程,如果數據出現丟失,也可以通過設置ACK來實現回滾,使得消息不會產生丟失.整個Kafka的機制實現了日志信息數據流傳輸的高可用性.
其整個獲取與緩存的流程圖如圖4所示.
通過從Storm的Spout將信息發送到指定的SplitBolt進行切分字段,SplitBolt繼承BaseBasicBolt,通過對其execute()方法進行重寫來實現日志信息的切分.通過其方法的Tuple參數接收到Spout發送來的日志信息,將接收到的信息通過toString()方法轉換成字符串類型,從而進一步使用split()方法來對整個字符串信息進行切分,獲取車輛的id、車牌、坐標、行進方向、拍攝時間等字段信息.之后調用參數中Collector對象的emit()方法來將切分出來的字段信息進行封裝,發送到下一個Bolt.
整個日志信息切分過程時序圖如圖5所示.

圖4 日志信息獲取與緩存流程圖

圖5 日志信息切分時序圖
文獻[1]中提出一種利用車輛時空矛盾關系判斷套牌的算法,借鑒該算法.本文認為正常行駛的車輛在區間內的動態車速在一個限速范圍內,如果某輛車出現了套牌情況,必然會出現在相同時間點,坐標位置不同的情況,而且算出的區間動態車速遠高于標準車速,因此可斷定該車輛為可疑套牌車輛.
算法步驟:
Step 1.創建Jedis客戶端,配置相關數據,建立連接池,連接Redis數據庫接口.
Step 2.獲取當前車輛id、車牌、坐標、運行方向、獲取時間等信息
Step 3.從Redis中通過id獲取車輛信息,若有,拿出來通過動態車速對比,看是否發生套牌.若沒有,將信息存入Redis中
其主要業務邏輯是:從上一Bolt中獲取“id”,“registId”,“hangId”,“x”,“y”,“dir”,“time”和“info”信息,通過id判斷Redis中是否有該車輛信息,若沒有,則將該車輛整條信息保存進入Redis中;若有,則將車輛信息拿出來切分,從而獲取“x”,“y”,“dir”,“time”等字段信息,將兩次的信息進行對比計算出動態車速;若此車速遠大于區間車速,則該車為套牌車輛.
(1)動態車速計算
從上一Bolt中獲取車輛信息,通過車輛id判斷Redis中是否有該車輛信息,若沒有,則將該車輛整條信息保存進入Redis中;若有,則將車輛信息拿出來切分,從而獲取坐標、時間等字段信息,將兩次的信息進行對比計算出動態車速;若此車速遠大于區間車速,則該車為套牌車輛.
具體的業務處理是在execute()方法中,通過其Tuple參數來接收SplitBolt發送過來的各個字段信息和車輛的整條info日志信息.通過創建carInfo、hangId、x、y、dir、time等字段來獲取車輛信息;之后通過調用Jedis對象的get()方法,將last_hangId傳入,看返回結果是否為空,若可以獲取到車輛信息,則將獲取到的車輛信息進行再次切分,拿到上次記錄到的字段信息,分別設置為 last_x、last_y、last_dir、last_time;之后調用String對象的equals()方法來判斷相同車輛兩次的行駛方向是否相同,若相同并且為x方向,則將x于last_x相減取絕對值,其結果就是車輛的行駛路徑;將兩次時間相減取絕對值并進行單位換算,從而獲得時間.路程與時間相除即可得到車輛的動態車速.若行駛方向為y,計算方法相同.最后將車輛信息存入Redis中,替換掉之前的車輛信息.若從Redis中獲取數據為空,則說明此車輛還沒有被車輛監控系統記錄過,因此直接存入Redis中.
(2)套牌判定
獲取動態車速后,接下來就是與此路段的區間標準車速進行比較,若獲取的動態車速遠大于區間車速,則懷疑出現了兩輛相同車牌的車輛,將其判定為套牌車輛.并將此車輛信息通過調用Collector的emit()方法將套牌車輛信息發送出去.最后仍然要調用declare OutputFields()方法,來指定發送字段為info,即套牌車輛信息.
對以上整個車速計算和套牌車的檢測所做出的功能時序圖如圖6所示.

圖6 套牌監測功能時序圖
將檢測出的套牌車輛信息通過編寫輸出代碼和修改相關配置文件,存入分布式服務器上指定的文檔中,從而可以完成信息的分布式存儲.
首先,通過Tuple對象獲取SpeedBolt發送過來的套牌車輛信息,調用FileWriter的write()方法,來對日志信息進行寫操作;然后調用其flush()方法來刷新數據流,從而使得之前寫入的數據能完整輸出到指定的文件中.信息存儲的功能時序圖如圖7所示.

圖7 套牌車輛信息存儲時序圖
通過對4個功能模塊的實現過程敘述與部分功能時序圖的介紹,對系統的整個功能進行了詳細的實現.對接收到的日志信息進行了詳細的分析與計算,整個系統的業務邏輯實現完畢,接下來工程的部署模塊可以直接將工程打成jar包來進行實時的運行.至此,系統實現已全部完成.
本系統的實驗環境包括集群搭建和數據庫服務器安裝,主要包括Zookeeper集群、Kafka集群、Storm集群、Redis安裝等.實驗環境配置完畢,將工程打成jar包上傳到搭建好的集群運行,通過連接車輛模擬系統產生實時數據,可以看見Redis數據庫上存儲所有收集到的車輛信息,如圖8所示.

圖8 Redis測試數據圖
/home/hadoop/stormoutput目錄下存儲有套牌車輛嫌疑的車輛信息,如圖9所示.以這些信息為基礎,交管人員只需后期和車輛具體核實,即可按照相關交通法規進行處理.

圖9 套牌車輛信息圖
針對當前海量車輛數據套牌檢測實時性差的難點,引入了大數據技術,經過文獻對比研究,發現Kafka作為中間件進行緩存,不僅保證了數據采集的效率而且還保證了數據的高可用性.再加上,Storm比Hadoop實時性處理能力強,所以Kafka結合Storm提高了車輛套牌監測的實時性.最后通過集群環境的搭建,實驗分析發現系統性能已達到設計目標.在今后的研究工作中,將探索研究套牌識別的精準度,進而更有效地幫助交管部門完成套牌識別工作.