秦子實
摘要:隨著機器學習算法在工業領域的大規模應用,企業內部網絡急需部署一個可以用于機器學習算法驗證、調試以及應用的系統。該系統應具備足夠的算力,同時支持研發人員并發運行多個計算任務,并在計算任務結束后返回結果。該文將使用消息隊列和分布式進程調度框架設計一個滿足企業內部需求的分布式機器學習平臺。
關鍵詞:分布式系統;機器學習;消息隊列
中圖分類號:TP393 文獻標識碼:A 文章編號:1009-3044(2018)01-0201-02
1 概述
隨著人工智能應用的日益成熟,越來越多的工業領域使用了機器學習算法解決實際問題。在企業的日常運作中會產生大量的業務數據,研究人員使用機器學習算法從這些業務數據中挖掘出有更價值的信息。因此,企業內部中需要一個能夠支持研發人員并發運行機器學習算法的平臺,本文將使用Python作為主要編程語言,利用Celery分布式進程調度系統分配計算任務、利用Redis臨時存儲計算作業臨時消息及計算結果、利用TensorFlow/Keras執行機器學習算法,最終通過Flask將平臺發布為網頁應用,向研發人員提供機器學習算法驗證、調試等服務。
2 Celery技術簡介
Celery是一個架構簡潔、配置靈活且具有高可用性的分布式任務隊列框架,它擅長通過分布式系統并行處理大量作業,并提供維護此類系統的必要工具。Celery專注于實時處理任務隊列,同時也支持任務調度。
Celery的架構由三部分組成,消息隊列系統(message broker)、任務執行單元(celery worker)和作業結果儲存(task store backend)。Celery本身雖然不提供消息服務,但是兼容大部分常見的消息隊列(如Redis、RabbitMQ等)。Celery worker可以運行在分布式系統的節點上。作業結果可以以內存數據庫(如Redis、memcached)、數據庫(MongoDB)、對象關系映射(SQLAlchemy、DjangoORM)等多種方式進行數據持久化。
此外,Celery還可以與gevent等框架集成,以支持大規模并發特性。同時也支持諸如pickle、json、yaml等多種序列化格式。
3 系統結構設計
分布式機器學習系統通過Celery進行作業調度,Celery接受前端的任務(例如通過Flask接受用戶提交的任務)后,在多臺運行Celery進程的主機上分發作業并執行。由于機器學習算法普遍具有運算量大、作業時間長等特點,所以各執行中的作業應定期將作業進度回寫至消息隊列或持久化系統中,以在前端(如用戶在Flask上的提交作業的頁面)實時更新作業進度。在作業結束時Celery worker將結果寫入消息隊列或持久化系統,前端可以從中讀取結果。
3 系統結構設計
本文以Flask項目為例組織代碼,使用Redis作為消息隊列,使用SQLAlchemy進行數據持久化,使用TensorFlow/Keras作為機器學習計算平臺。
3.1 Flask項目結構
Flask項目的源碼結構如下:
[- config.py
- run.py
- app/
|- app/__init__.py
|- app/models.py
|- app/views.py
- preprocess/
|- preprocess/__init__.py
|- preprocess/models.py
|- preprocess/views.py
- analysis/
|- analysis/__init__.py
|- analysis/models.py
|- analysis/views.py
|- analysis/datamodel.py
|- analysis/tasks.py ]
一個典型的分布式機器學習項目至少應包含項目配置(config.py)、自動化腳本(run.py)、用戶及權限(app模塊)、數據預處理(preprocess模塊)、模型訓練(analysis模塊)這五部分:
1) 項目配置包括Flask、SQLAlchemy、Redis、Celery、TensorFlow/Keras等模塊的全局配置,以及項目相關常量(如模型保存路徑、上傳類型限制等);
2) 自動化腳本包括項目初始化、部署、啟動、停止、升級、調試等命令行腳本;
3) 用戶及權限是B/S系統的必備功能,包括用戶-角色的多對多映射、用戶-角色的增刪改等功能;
4) 數據預處理模塊負責對用戶上傳的數據進行結構化處理,并將結構化數據寫入內存數據庫等持久化系統,以供之后的機器學習算法或前端可視化模塊快速調用;
5) 模型訓練模塊應具備模型建立(模型結構及模型規模參數可配置)、訓練配置(批訓練集及迭代次數等參數可調整)、訓練作業監控(模型誤差及迭代次數實時消息回寫)、訓練結果保存(模型及模型運算結果保存)等機器學習系統常見功能。
3.2 Redis消息隊列
Redis系統部署在消息服務器上,作為消息中間人(Message Broker)的角色,配合Celery在分布式系統中調度并發的機器學習作業。
此外,Redis本身具備鍵值對內存數據庫的功能,利用Redis極高的并發讀寫速度,可以用于暫存Celery作業的中間狀態,以供前端實時監控機器學習計算作業的進度。endprint
Redis以消息中間人的形式集成在Celery中:
[# - app/
# |- app/__init__.py
from celery import Celery
celery = Celery('mltasks', broker='redis://
使用Redis在Flask項目中進行數據保存與讀取:
[from redis import Redis
rds = Redis(host='
...
rds.set(result_uuid, result)
...
result = rds.get(result_uuid) ]
3.3 Celery進程調度
Celery進程運行在分布式系統的所有主機上,各Celery進程通過Redis消息隊列交換信息,協調資源。
在Flask項目中,Celery通常在程序入口處初始化:
[# - app/
# |- app/__init__.py
from celery import Celery
celery = Celery('mltasks', broker='redis://
項目中涉及機器學習模型訓練的代碼應該放在Celery作業中運行,這部分代碼應集中管理在作業模塊中,將每一個耗時計算封裝為獨立的函數,并給函數添加@celery.task修飾符,供Flask項目代碼異步調用。
函數首先使用訓練集訓練模型,訓練過程產生的日志可以通過“logging_uuid”參數實時寫入Redis消息隊列供前端顯示訓練進度;然后將訓練完成的模型保存至文件系統,同時使用訓練好的模型對驗證集進行預測,以查看模型泛化性能;最后將驗證結果保存在Redis消息系統中,供前端顯示訓練結果:
[# - analysis/
# |- analysis/tasks.py
from app import celery
from app import rds
@celery.task
def training_task(data, training_uuid, logging_uuid, epochs):
model = SomeModel()
vs = data['validate_set']
ts = data['training_set']
# 使用訓練集訓練模型,并設置此次訓練的日志回寫地址為logging_uuid
training_history = model.train(ts, logging_uuid, epochs)
# 使用驗證集測試模型
predict = model.predict(vs)
# 保存模型
model.save()
# 將訓練結果保存至Redis系統
training_result = json.dumps({'status': 'success',
'predict': predict})
rds.set(training_uuid, training_result)
# 將訓練日志狀態設為訓練完成,以通知前端更新顯示
logging_result = json.dumps({'model_state': 'trained'})
rds.set(logging_uuid, logging_result) ]
類似的耗時操作,如訓練模型、保存模型、加載模型、使用模型預測等耗時操作均可封裝在上述Celery作業中,當Flask項目需要執行這些耗時操作時,使用Celery作業提供的“delay”異步調用:
[# - analysis/
# |- analysis/views.py
from app import celery
from app import rds
from flask import request
from flask import Response
from config import HOST_ID
from analysis.tasks import training_task
import uuid
@app.route('/api/v1/analysis/somedata/
def data_model_training_service(data_id):
if request.args.get('action') == 'train':
data = some_get_data_function(data_id)
# 為celery的各作業生成唯一的uuid
training_uuid = HOST_ID + str(uuid.uuid1())
logging_uuid = HOST_ID + str(uuid.uuid1())
# 使用celery異步執行作業
training_task.delay(data, training_uuid, logging_uuid, epochs=int(request.args.get('epochs')))
# 執行其他操作并返回
return Response(json.dumps ({'status': 'success'})) ]
上述函數以異步的方式執行模型訓練,執行后即刻返回,并指定Redis系統中的日志回寫地址以及訓練結果回寫地址。使用“celery.task.delay”方式調用的函數均運行在Celery worker中,“delay”函數的參數列表就是“@celery.task”修飾符修飾對象的參數列表,每次“delay”調用均使用獨立進程。
這種異步調用可以保證在Flask項目中該路由請求不會被耗時操作阻塞,導致前端界面無響應。為了配合使用這類耗時操作的異步調用,前端及Flask項目需要改變異步操作相關代碼的編寫模式:
1) 每一個耗時操作的代碼應至少分為兩部分,第一部分負責設置回寫地址并異步執行耗時操作;第二部分負責從日志回寫地址中取回耗時操作執行進度。
2) 當前端需要執行耗時操作時,向Flask項目發送開始執行的請求,Flask生成唯一的Redis回寫地址并使用“delay”函數異步調用,在響應中通知前端回寫地址。
3) 前端程序應定期向Flask項目發送請求,使用日志回寫地址查詢日志信息,并在界面中跟新耗時操作的執行進度。
使用日志回寫地址取回耗時操作執行進度信息的代碼示例如下:
[# - analysis/
# |- analysis/views.py
from app import rds
@app.route('/api/v1/analysis/somedata/
def data_model_log_service(data_id):
return Response(r.get(request.args.get('redisLoggingTaskID'))) ]
完成各耗時操作函數的編寫后,將Flask項目及其運行環境分發在分布式系統的其他主機上,之后在這些主機上分別啟動celery worker,這些進程將通過Redis主機上的消息隊列交換數據并進行進程調度:
[celery worker -A app.celery --loglevel=debug ]
4 結束語
本文介紹了使用Celery分布式進程調度系統搭建用于企業內部網絡的機器學習計算平臺。該平臺架構簡潔,具備較強的可擴展性,容易通過添加主機的方式線性提升系統計算能力,在企業內部的機器學習算法驗證、調試及應用中發揮了重要作用,是一種易于部署實現的平臺。