李 濤 馮仲科 孫素芬 程文生
(1.北京林業大學精準林業北京市重點實驗室, 北京 100083; 2.北京市農林科學院農業科技信息研究所, 北京 100097)
海量的氣象數據可以通過物聯網天氣傳感器設備或者網絡爬蟲收集,這些數據生成源以連續的方式生成大量數據[1-3],這種氣候數據大多是傳統數據處理工具和技術無法處理的結構化、半結構化和非結構化數據[4-6]。傳統數據挖掘算法和統計方法難以存儲并處理這類數據[7-9],氣候數據需要一個可擴展的分布式框架來存儲和處理,并在季節性氣候中獲得更有意義的變化信息[10-11]。雖然國內外有許多氣候氣象WebGIS數據管理分析系統,但由于氣象站和計量中心在不間斷地產生新的實時數據,這些數據在傳統WebGIS平臺中是無法進行存儲與計算的[12-15]。因此,需要可擴展的分布式地理空間WebGIS系統來分析和利用氣象數據[16]。本文結合Hadoop的分布式計算和存儲技術、地理信息系統相關技術、數據庫技術,以實際需要的設計要求,實現氣象地理信息的采集、篩選、儲存、分析、顯示應用等功能,采用瀏覽器端進行數據的展示與分析。
氣象地理信息主要包括氣象屬性信息以及對應的地理空間信息。隨著互聯網技術的飛速發展,如今可以利用網絡爬蟲技術抓取相關的網絡平臺數據,通過篩選所需要的數據并轉換為云計算可用的數據結構,便可以積累海量的可進行分布式計算的氣象地理信息數據。
通過網絡爬蟲或者物聯網天氣傳感器設備獲取到的海量氣象地理信息數據大多是非結構化的文本格式數據,也可以通過其他方式獲取可以用于氣象分析與展示的柵格影像。這些數據一般是TB級以上的數據量,由于硬件資源限制,在單機環境下是無法進行處理或存儲的[17-21]。為了解決海量氣象數據的存儲問題,通常情況下是將數據分配到多個操作系統管理磁盤中,但是該種方式不便于工作人員的管理和維護,因此迫切需要一種能夠同時管理多臺機器上文件的分布式文件管理系統[22-25]。
分布式文件管理系統種類很多,但是所有的系統都是基于一次寫入、多次查詢的情況,不支持并發與寫入情況,本文采用Hadoop體系下開源的分布式文件管理系統HDFS,其采用主從架構來管理文件,即由一個名稱節點和多個數據節點組成了一個分布式文件系統(Hadoop distributed file system,HDFS)集群。名稱節點的作用為:負責客戶端請求和響應;元數據的管理,包括查詢和修改。數據節點的作用為:存儲管理用戶文件塊數據;定期向名稱節點匯報自身所持有的塊信息,即通過心跳信息上報自身情況。
在海量氣象數據的分析計算方面,傳統單節點WebGIS系統通過擴展到集群來分布式運行,將極大地增加程序的復雜度和開發難度,因此本文引入一個分布式運算程序的MapReduce編程框架,其核心功能是將用戶編寫的業務邏輯代碼和自帶默認組件整合成一個完整的分布式運算程序,并發運行在服務器集群上。開發人員可以將絕大部分工作集中到業務邏輯開發上,而將分布式計算中的復雜度交由框架來處理[26-27]。
本系統基于Hadoop生態體系進行搭建,包括數據獲取層、云計算層、云存儲層和前端顯示層。數據獲取層的數據來源于網絡爬蟲爬取的氣象數據[28]或者物聯網天氣傳感器設備中采集的數據。云存儲層、云計算層分別是Hadoop的分布式存儲框架HDFS和分布式計算框架MapReduce,主要功能是將當前的海量空間數據進行統一格式化處理,并將其存入到分布式文件系統中,通過并行處理框架可以對包含空間屬性的氣象數據進行大數據量的快速分布式計算得到分析結果。前端顯示層則是利用Cesium進行三維可視化展現。整個平臺結構如圖1所示。

圖1 平臺結構Fig.1 Platform structure
1.2.1數據獲取模塊
從物聯網天氣傳感器設備獲取數據,或利用網絡爬蟲抓取氣象相關網頁來獲取相關數據。利用爬蟲WebDriver和PhantomJs技術相應接口對得到的網頁內容進行解析,進而獲取需要的氣象地理信息。由于得到的信息很多雜亂無章,為了獲取真實需要的信息數據,在獲取元素信息后,還需要利用正則表達式對這些屬性信息進行篩選判別,同時進行格式統一處理,最后將輸出后的數據以GeoJson或者其他文本格式合并保存。
1.2.2云存儲模塊
由于HDFS平臺不適合管理小文件,所以首先對采集到的大量小文件進行合并。
小文件合并有以下3種方式:①在采集數據時,將小文件合并為大文件再上傳至HDFS。②在業務處理之前,使用HDFS上的MapReduce程序對小文件進行合并。③在利用MapReduce處理小文件時,采用conbineInputFormat提高效率。
1.2.3云計算分析模塊
由MapReduce的工作流程可知,Hadoop下的空間數據并行操作共需要6個步驟:
(1)MapReduce程序啟動時,最先啟動的是MRAppMaster(MapReduce程序啟動節點),MRAppMaster啟動后根據本次作業的描述信息,計算出需要的Map任務實例數量,然后向集群申請啟動相應數量的Map任務進程。
(2)利用客戶指定的輸入格式來獲取RecordReader并讀取數據,形成輸入鍵值對。
(3)將輸入鍵值對傳遞給客戶定義的Map方法,做邏輯運算,并將Map方法輸出的鍵值對收集到緩存中。
(4)將緩存中的鍵值對按照鍵值分區排序后不斷溢寫到磁盤文件中。
(5)MRAppMaster監控到所有Map任務完成后,根據客戶指定的參數啟動相應數量的Reduce任務進程,并告知Reduce任務進程要處理的數據范圍,進行數據分區。
(6)Reduce任務進程啟動之后,根據MRAppMaster告知的待處理數據所在位置,從若干臺運行Map任務的機器上獲取若干個輸出結果文件,并在本地進行重新歸并排序,然后按照相同鍵值的鍵值對為一個組,調用客戶定義的Reduce方法進行邏輯運算,并收集運算輸出的結果鍵值對,然后調用客戶指定的輸出個數將結果數據輸出到外部存儲,通過空間數據轉換接口將結果保存成GeoJson類型數據并存儲在各個HDFS節點中,整個并行操作過程就此結束。
氣象數據計算流程圖如圖2所示。

圖2 氣象數據計算流程圖Fig.2 Flow chart of weather data calculation
1.2.4海量氣象數據結構
由于云計算需要的數據類型應該是非格式化的數據,地理信息常用的格式shp或者dbf都是格式化數據,因此不滿足云計算的數據要求。在地理空間非格式化數據中,GeoJson基于Json(Javascript 對象簡譜),數據以鍵值對的形式進行存儲,可以充分滿足這種數據結構要求,也符合開放地理空間信息聯盟(OGC)標準。另一方面,在前端進行三維可視化時,通過AJAX(異步 Javascript和XML)也可以很方便地使用這種格式。GeoJson數據的geometry屬性中的type字段包含了點、線、面、多點、多線、多面等常用的地理信息系統幾何類型,因此本研究采用GeoJson作為Hadoop中的分布式存儲管理格式。示例數據中具有地理實體的唯一標識符號id、地理實體坐標信息coordinates、地理實體的氣象屬性信息數組properties等。其數據格式為:{
"type": "FeatureCollection",
"totalFeatures": 1376,∥要素數量合計
"features": [{
"type": "Feature",∥要素類型
"id": "china_air_quality20171216_0.1",∥要素編號
"geometry": {∥幾何屬性
"type": "Point",∥幾何類型為點狀要素
"coordinates": [116.366, 39.8673]∥要素的經緯度坐標},
"geometry_name": "the_geom",
"properties": {∥要素的屬性表
"監測點編碼": "1001A","監測點名稱": "YN001","經度": 116.366,"緯度": 39.8673,"日期": 20161216,"時間": 0,"AQI": 41,"PM2_5": 14,"PM2_5_24h": 46,"PM10": 41,"PM10_24h": 41,"SO2": 1,"SO2_24h": 6,"NO2": 7,"NO2_24h": 32,"O3": 60,"O3_24h": 63,"O3_8h": 59,"O3_8h_24h": 60,"CO": 0.2,"CO_24h": 0.7}
},...],
"crs": {"type": "name",
"properties":{"name":"urn:ogc:def:crs:EPSG::4326"}}}
本文使用網絡爬蟲爬取的氣象數據包括逐日監測點編碼、監測點名稱、經緯度、獲取日期、AQI、PM2.5含量、PM10含量、SO2含量、NO2含量、O3含量、CO含量、氣溫、降水量、相對濕度、日照時數等參數。使用的數據為從爬取的數據中篩選出的2016年云南省氣象臺站歷史數據。
系統代碼編寫工具為Ecplise,版本為Mars。使用Maven作為項目管理構建工具。
獲取PhantomJSDriver的工具類實例代碼如下
public static WebDriver getPhantomJs() {
String osname=System.getProperties().getProperty("os.name");
if (osname.equals("Linux")) {
System.setProperty("phantomjs.binary.path", "/usr/bin/phantomjs");
} else {
System.setProperty("phantomjs.binary.path", "./phantomjs/win/phantomjs.exe");}
DesiredCapabilities=DesiredCapabilities.phantomjs();
desiredCapabilities.setCapability("phantomjs.page.settings.userAgent", "Mozilla/5.0 (Windows NT 6.3; Win64; x64; rv:50.0) Gecko/20100101 Firefox/50.0");
desiredCapabilities.setCapability("phantomjs.page.customHeaders.User-Agent", "Mozilla/5.0 (Windows NT 6.3; Win64;x64;rv:50.0)Gecko/20100101 Firefox/50.0");
if (Constant.isProxy) {
org.openqa.selenium.Proxy proxy=new org.openqa.selenium.Proxy(); proxy.setProxyType(org.openqa.selenium.Proxy.ProxyType.MANUAL);
proxy.setAutodetect(false);
String proxyStr="";
do {
proxyStr=ProxyUtil.getProxy();
} while (proxyStr.length()==0);
proxy.setHttpProxy(proxyStr);
desiredCapabilities.setCapability(CapabilityType.PROXY, proxy);
}
return new PhantomJSDriver(desiredCapabilities);
}try{
WebDriver=PhantomJsUtil.getPhantomJs();
webDriver.get(url);
SleepUtil.sleep(Constant.SEC_5);
PhantomJsUtil.screenshot(webDriver);
WebDriverWait wait=new WebDriverWait(webDriver, 10);
wait.until(ExpectedConditions.presenceOfElementLocated(By.id(inputId)));
Document=Jsoup.parse(webDriver.getPageSource());
}finally{
if (webDriver != null) {
webDriver.quit();}}
針對海量影像數據的存儲,利用HDFS技術對2TB左右容量的全球柵格地圖進行分節點管理。對于海量文本格式的氣象地理信息數據,利用MapReduce框架實現分布式計算功能以及云南省空氣質量監測站點空間位置及其氣象參數的快速查詢展示。
2.2.1集群部署
Linux環境下Centos版本為7.4,Hadoop版本為Hadoop3.0,JDK版本為Java 1.8_161。本次試驗采用8個服務器節點組成的集群。配置8個節點的IP地址、機器名稱以及其代表的角色、網絡配置,如表1所示。

表1 集群各節點地址及其角色Tab.1 Cluster node addresses and their roles
在Master節點上的hosts文件中添加集群中各節點的主機名和IP地址。安裝jdk1.8環境、Hadoop3.0環境到名稱節點并遠程復制到其余7個數據節點上。在數據節點上修改Hadoop目錄下的/etc/hadoop/workers為數據節點的機器名稱。最后配置Hadoop集群環境:①core-site.xml是Hadoop的核心配置文件,這里需要配置兩個屬性,fs.default.name配置Hadoop的HDFS系統的名稱,位置為主機的9000端口。hadoop.tmp.dir配置Hadoop的臨時目錄根位置。②hdfs-site.xml是HDFS的配置文件,dfs.http.address配置HDFS的http訪問位置,dfs.replication配置文件塊的副本數,一般不大于從機的個數。③配置文件mapred-site.xml是MapReduce任務的配置,由于hadoop2.x使用了Yarn框架,所以要實現分布式部署,必須在mapreduce.framework.name屬性下配置為Yarn。其中mapred.map.tasks和mapred.reduce.tasks分別為Map和Reduce的任務數。④配置節點yarn-site.xml,該文件為Yarn框架的配置,為一些任務的啟動位置。
為了方便集群的維護,Hadoop自帶了一個歷史服務器,可以通過歷史服務器查看已經運行完的MapReduce作業記錄,比如用了多少個Map或者Reduce、作業提交時間、作業啟動時間、作業完成時間等信息。默認情況下,Hadoop歷史服務器是沒有啟動的,可以通過/hadoop-3.0.0/sbin/mr-jobhistory-daemon.sh start historyserver命令來啟動Hadoop歷史服務器。這樣就可以訪問主機的19888端口,查看已經運行完的氣象數據分析作業情況。
2.2.2MapReduce分布式計算代碼
(1)Map階段代碼
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line=value.toString();
String year=line.substring(15, 19);
int airTemperature;
if(line.charAt(14)=='+' )
{airTemperature=Integer.parseInt(line.substring(11, 12));}
else{
airTemperature=Integer.parseInt(line.substring(15, 16));}
String temperature= line.substring(19, 21); if(airTemperature != MISSING && temperature.matches("[01459]"))
{ context.write(new Text(year), new IntWritable(airTemperature));}}
(2)Reduce階段代碼
public void reduce(Text key, Iterable〈IntWritable〉 values, Context context) throws IOException, InterruptedException {
int minValue=Integer.MAX_VALUE;
for(IntWritable value:values)
{minValue=Math.min(minValue, value.get()); } context.write(key, new IntWritable(minValue));}
MapReduce程序的運行步驟為:啟動HDFS和Yarn,然后在集群中的任意一臺服務器上啟動執行程序[hadoop@lt mapreduce]$ hadoop jarhadoop-mapreduce-gis.jar geojson/ncep1979-2017.gz/geojson/out。
采用Cesium框架的三維可視化進行前端結果展示,實現如下功能:①海量矢量格式數據的查詢渲染展示,利用Cesium的Entity來實現任意渲染,有諸多點、線、面渲染形式可供選擇,點擊可查看管理其屬性,也可以利用字段模糊查詢得到結果,使用Cesium的Infobox模塊可以實現,最終以三維可視化的方式展示給用戶。②發布分布式文件系統中的影像地圖為Geoserver地圖服務,并加載顯示。③利用Ceisum調用OpenStreetMap的開源興趣點(POI)搜索庫,實現氣象站點的搜索定位功能。④利用AJAX技術調用Geoserver的Web 地圖要素服務(WFS)、Web地圖服務(WMS)、地理標記語言服務(GML),可以獲取并發布通過云計算分析得到的矢量數據、元數據、圖例等信息,用于動態展示。⑤空間分析功能,在查詢輸入框內輸入需求可以獲得最終的分析結果,并且疊加在三維地球上以圖形的形式展示。⑥利用Cesium實現常用的測量、標繪等功能。⑦其他輔助性功能,如地圖縮放功能支持底圖顯示18個級別,點擊三維底圖上的矢量數據實體可以提取出該實體的所有氣象相關屬性信息并展示在右上角的小窗口中。可視化界面如圖3所示。

圖3 氣象數據三維可視化界面Fig.3 3D visualization interface of meteorological data
Cesium三維可視化渲染代碼如下
function ShowAttribute(attribute) {
viewer.entities.removeAll();
var coordinate;
var attributeSizeArray=[];
for (var i=0; i < shp_data.length; i++) {
attributeSizeArray.push(shp_data[i].properties[attribute]);
}
var minattributeSize=attributeSizeArray.min();
var maxattributeSize=attributeSizeArray.max();
for ( var i=0; i < shp_data.length; i++) {
∥判斷該屬性如果是負數,則改為正數,并且渲染為柱狀圖
var size=((shp_data[i].properties[attribute]-minattributeSize)/(maxattributeSize-minattributeSize)) *100;
∥ alert(maxattributeSize);
if (shp_data[i].geometry.type == "Point") {
∥ alert(shp_data[i].geometry.type);
coordinate=shp_data[i].geometry.coordinates;
∥ alert(coordinate);
}
if (shp_data[i].geometry.type == "MultiPolygon") {
∥ alert(shp_data[i].geometry.type);
coordinate=shp_data[i].geometry.coordinates[0][0][0];
∥ alert(coordinate);
}
addentity(coordinate,size,shp_data[i].properties[attribute],shp_data[i].properties.NAME);
}
}
本文主要針對傳統WebGIS服務器與Hadoop集群環境下海量氣象數據的存儲與計算進行性能對比。選擇8臺服務器節點作為集群運行環境,節點CPU為i5處理器,頻率為2.7 GHz,內存均為8 GB,硬盤容量為500 GB。試驗數據為1996—2016年云南省氣象信息,數據量約為4.6 GB。為了對比集群中節點個數對氣象數據存儲及管理的影響,使用4種方案配置節點,集群節點個數分別為單節點、2個節點、4個節點、8個節點,集群中部分節點啟動后的頁面如圖4所示。

圖4 集群運行頁面Fig.4 Cluster operation page
隨著節點數的變化,數據集中的氣溫最大值、最小值、平均值的計算消耗時間如圖5所示。可以看出,集群隨著節點數的增加,計算性能增加,但是節點越多,數據傳輸通信時間成本越大,因此計算性能隨節點數的增大速率降低。

圖5 計算消耗時間與節點數的關系Fig.5 Relationship of calculating time with number of nodes
隨Map任務并行度變化的集群計算性能試驗結果如圖6所示,通過試驗發現,每個節點的最優并行度為13~15個Map任務,每個Map任務的執行時間至少1 min。如果每個作業的Map任務或者 Reduce任務的運行時間都只有30~40 s,那么就減少該作業的Map任務或者Reduce任務數量。因為調度器在調度任務時,中間過程可能要花費幾秒鐘,如果每個任務都非常快就跑完了,則會浪費太多中轉調度時間。

圖6 計算消耗時間與Map任務數的關系Fig.6 Relationship of calculating time with number of Map tasks
配置作業的Java虛擬機重用可以改善上述問題,Java虛擬機重用技術不是指同一作業的兩個或兩個以上的任務可以同時運行于同一Java虛擬機上,而是排隊按順序執行。mapred.job.reuse.jvm.num.task,默認是1,表示一個Java虛擬機上最多可以順序執行的任務數目是1,也就是說一個任務啟用一個Java虛擬機。在mapred-default.xml文件中配置塊容量,如果輸入的文件非常大,比如1TB,可以考慮將HDFS上的每個塊容量設大,比如設成256 MB或者512 MB。Reduce任務的并行度同樣影響整個作業的執行并發度和執行效率,但與Map任務的并發數由切片數決定不同,Reduce任務數量可以直接手動設置,默認值是1,可以手動設置為4,即job.setNumReduceTasks(4)。如果數據分布不均勻,就有可能在Reduce任務階段產生數據傾斜,因此要注意Reduce任務數量并不是任意設置,還要考慮業務邏輯需求,有些情況下,需要計算全局匯總結果,就只能有一個Reduce任務。
如圖7所示,集群隨著節點增加,存儲性能變高,最重要的是如果數據量超出了單節點服務器硬盤容量,則無法進行存儲,而集群多節點架構可以解決這一問題。

圖7 存儲消耗時間與節點數的關系Fig.7 Relationship of storage time with number of nodes
農業數據類型包括農業生產數據、資源數據、技術數據、市場經濟數據以及政策法規數據等,分為非結構化數據和結構化數據。在地理因素和季節等因素影響下,農業數據表現出了特殊的數據離散性和實效性。隨著農業信息化程度不斷提升,我國農業現代化的步伐也隨之加快,轉型升級在逐步進行,農業及其相關數據正在被大量收集、歸納、整理。隨著農田中物聯網設備的大量布署,農業數據源源不斷地產生,形成農業大數據。該數據有以下特性:①數據量非常巨大,并且會連續產生。②因為作物生長在時間方面具有季節性,故農業大數據必須具有時效性,需及時處理數據并且反饋結果。③農業大數據的種類繁雜。④數據量巨大造成數據價值密度較低,但是價值量非常大。如果可以有效利用這些數據,將會大大加快農業信息化的進程。
利用氣象大數據分析昆明市種植適宜地,使用ID3決策樹分類方法對氣象大數據信息進行分析,決策樹分類是常用的分類挖掘模型,本次試驗在海量的AQI、PM2.5濃度、降水量、相對濕度、日照時數等大數據中挖掘、分析出具體種植適宜地,可以為農業生產適宜區域的選擇提供決策支持。
ID3分類方法是以信息增益來評判屬性,選擇屬性分裂后信息屬性增益最大的進行分裂,采用貪心思想遍歷所有決策空間。使用云計算架構,分類計算步驟為:
(1)輸入樣本屬性集A,樣本類別集B,樣本訓練集C。其中樣本訓練集C如表2所示。
(2)創建樣本節點R,如果訓練集C為空,則返回父節點中多數類標記R;如果訓練集C中樣本屬于同一類別B,則標記類B的節點R為該葉子節點;如果A為空則返回C中的多數類標記;如果計算得出了A中增益率最大的屬性為S,則用S標記節點R。
(3)根據計算出的S的值{si|i=1,2,…,m}將訓練集C分成{Ci|i=1,2,…,m}。
(4)遞歸執行ID3TREE(R-S,B,C1), ID3TREE(R-S,B,C2),…,ID3TREE(R-S,B,Cn),直至最終計算結果中的元組屬于同一類,信息增益是原信息和新的需求信息的差,樣本集信息熵的計算公式為
(1)
式中I——信息熵
Pi——訓練集C中任意元組屬于類Bi的概率輸入對應的鍵值,通過文中的云計算架構,對上述數據集進行分類屬性的選擇,建立ID3決策樹,分別得出氣溫、天氣、降水量、相對濕度等信息的信息增益,依據上述計算流程進行集群分布式運算,前端對運算結果進行可視化展示,該應用利用海量氣象信息精準判斷某種植區域是否適宜種植作物。

表2 樣本訓練集Tab.2 Agricultural data set
在實際農業生產中,氣象災害評估的實時性和準確性面臨極大考驗,氣象災害的發生會導致農作物生長受到影響,產生巨大的經濟損失。利用大數據分析則可以及時察覺即將到來的氣象災害,對災害進行分類和災害等級評估,提前采取預防措施,減少經濟損失。對此參考部分氣象災害的等級指標,建立其災害等級指標,利用本平臺大數據計算分析模塊處理海量農業氣象數據集,得到可用于氣象災害評估的信息。試驗以影響作物生長的低溫災害指數為例,利用基于最近鄰法(KNN)組合分類器分布式計算模塊,將溫度數據代入低溫災害指數公式中計算,將其轉換為低溫災害指數,其低溫災害指數為

(2)
式中f——低溫災害指數
t——當前最小溫度,℃
u、σ——正態分布參數
最后利用分類后的低溫災害風險指數進行災害等級預測以及作物受損程度評估。利用云南省1996—2016年低溫氣象數據,通過計算得到昆明市的低溫災害風險指數,建立低溫災害評估等級,如表3所示,其中1級災害作物受損較輕,還可正常生長,4級災害時,作物生長停止或者死亡,此時會發出預警,農戶則可及時做相應預防措施減少損失。

表3 低溫災害評估Tab.3 Low temperature disaster assessment
基于分布式氣象大數據分析的GIS平臺采用Hadoop體系架構,利用數據爬取技術在互聯網上獲取海量氣象數據,并通過云存儲技術進行分布式存儲,解決了傳統單節點服務器WebGIS 系統硬件受限制的問題。在氣象大數據的分析計算方面,試驗結果表明,多節點集群下效率更高,在查詢遍歷性能方面也比傳統WebGIS單節點服務器高;通過對海量氣象數據采用云計算技術進行分析可以幫助有關部門進行決策;利用Cesium對計算得到的氣象信息進行三維可視化,可以直觀看到氣象站點歷史氣象參數的變化情況和計算決策結果,但是在具體分析功能擴展方面還需要完善。