本文介紹了基於LHM調度遷移工具將Airflow調度任務流遷移到DataWorks Workflow的方案與操作流程,包括Airflow任務匯出、調度任務轉換、DataWorks任務匯入。
一、匯出Airflow任務流
匯出Airflow調度任務流元資訊的基本原理是,利用Airflow的Python庫載入使用者的DAG Folder,擷取DAG及其內部任務資訊與依賴關係,整理為JSON檔案匯出。
遷移工具目前支援對Airflow 2.x的任務流匯出。
1 運行環境
遷移工具的Airflow匯出能力基於MigrationX Airflow Reader(MigrationX)實現,在Python>= 3.9.0的環境中執行。
推薦的匯出工具運行環境(二選一):
1、在Airflow調度所在Python環境中運行(需滿足Python >= 3.9.0的條件)
2、準備新的Python環境並安裝與生產環境同版本的Airflow Python庫,需注意Airflow庫與Python版本的對照關係。
方案二中提到的新Python環境可參考下列流程配置(以linux ecs為例):
## 安裝conda並建立Python3.9環境
# 下載並安裝conda
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
sh Miniconda3-latest-Linux-x86_64.sh
# 安裝過程中conda會要求指定安裝目錄,安裝完成後執行以下命令進行初始化
cd <conda安裝目錄>
conda init
conda config --set auto_activate_base false
. ~/.bashrc
# 建立Python3.9.0環境
conda create -n airflow python=3.9.0
conda activate airflow
# 在Conda環境中安裝airflow(以2.5.3為例)
pip install apache-airflow==2.5.32 操作流程
2.1 下載匯出工具包airflow-workflow-parser.zip:
2.2 解壓airflow-workflow-parser.zip:
unzip airflow-workflow-parser.zip2.3 設定PTYHONPATH
若在Airflow環境中運行匯出工具,需配置PYTHONPATH;若在Conda環境中運行匯出工具,則可跳過此步驟。
指定PYTHONPATH到airflow的python lib目錄,例如:
export PYTHONPATH=/usr/local/lib/python3.6/site-packages
export AIRFLOW_HOME=/var/lib/airflow
## 注意,如果建立airflow沒有airflow.cfg檔案,可以通過 airflow config list --defaults 產生
export AIRFLOW_CONFIG=/var/run/cloudera-scm-agent/process/2531-airflow-AIRFLOW_SCHEDULER/airflow.cfg2.4 執行airflow任務匯出
Parser會擷取指定目錄下的所有DAG,每個DAG對應產生一個DataWorks Spec檔案(DataWorks任務流描述定義檔案,json格式)。
DataWorks任務流描述定義 dataworks-spec
https://github.com/aliyun/dataworks-spec/tree/master
python ./parser.py -d /root/airflow/airflow-workflow/dags -o ./result -m ./flowspec-airflowV2-transformer-config.json參數解釋:
-d 表示Airflow DAG檔案的存放目錄;
-o 表示匯出結果JSON檔案的存放路徑;
-m 表示設定檔路徑,可在其中配置節點映射轉換規則。具體轉換配置項見2.4.1小節。
若執行失敗,通常是由於DAG中有特殊依賴,按報錯使用pip命令依次安裝缺失的依賴即可。
2.4.1 轉換配置模板
匯出工具支援對節點類型進行轉換,可通過配置項將Airflow Operator與DataWorks節點類型映射,其模板如下。
遷移至DataWorks on MaxCompute的配置模板(參考配置):
{
"workflowPathPrefix": "Airflow匯入_V3/",
"typeMapping": {
"EmptyOperator": "VIRTUAL",
"DummyOperator": "VIRTUAL",
"ExternalTaskSensor": "VIRTUAL",
"BashOperator": "DIDE_SHELL",
"HiveToMySqlTransfer": "DI",
"PrestoToMySqlTransfer": "DI",
"PythonOperator": "PYTHON",
"HiveOperator": "ODPS_SQL",
"SqoopOperator": "DI",
"SparkSqlOperator": "ODPS_SQL",
"SparkSubmitOperator": "ODPS_SPARK",
"SQLExecuteQueryOperator": "MySQL",
"PostgresOperator": "Postgresql",
"MySqlOperator": "MySQL",
"default": "PYTHON"
},
"settings": {
"workflow.converter.target.schedule.resGroupIdentifier": "Serverless_res_group_651147510078336_710919704558240"
}
}至DataWorks on EMR的配置模板(參考配置):
{
"workflowPathPrefix": "Airflow匯入_V3/",
"typeMapping": {
"EmptyOperator": "VIRTUAL",
"DummyOperator": "VIRTUAL",
"ExternalTaskSensor": "VIRTUAL",
"BashOperator": "EMR_SHELL",
"HiveToMySqlTransfer": "DI",
"PrestoToMySqlTransfer": "DI",
"PythonOperator": "PYTHON",
"HiveOperator": "EMR_HIVE",
"SqoopOperator": "EMR_SQOOP",
"SparkSqlOperator": "EMR_SPARK_SQL",
"SparkSubmitOperator": "EMR_SPARK",
"SQLExecuteQueryOperator": "MySQL",
"PostgresOperator": "Postgresql",
"MySqlOperator": "MySQL",
"default": "PYTHON"
},
"settings": {
"workflow.converter.target.schedule.resGroupIdentifier": "Serverless_res_group_651147510078336_710919704558240"
}
}參數說明:
參數名 | 描述 |
workflowPathPrefix | 任務流匯入DataWorks後的儲存位置。 |
typeMapping | Airflow Operator與DataWorks節點類型的映射規則。 DataWorks節點類型可參考此枚舉類:https://github.com/aliyun/dataworks-spec/blob/b0f4a4fd769215d5f81c0bbe990addd7498df5f4/spec/src/main/java/com/aliyun/dataworks/common/spec/domain/dw/types/CodeProgramType.java#L180 |
workflow.converter.target.schedule.resGroupIdentifier | DataWorks資源群組Id |
由DataWorks工作空間詳情頁左側功能表列進入資源群組頁面,綁定資源群組,並擷取資源群組ID。
2.5 操作樣本
下載工具到本地後,解壓縮得到如下目錄:
.
├── airflow_workflow
│ ├── common
│ ├── connections.py
│ ├── converter
│ ├── dag_parser.py
│ ├── downloader.py
│ ├── get_dags.py
│ ├── __init__.py
│ ├── miscs
│ ├── models
│ ├── patch
│ ├── __pycache__
│ └── test
├── airflow-workflow.tgz
├── dags
│ ├── example.py
│ └── __pycache__
├── flowspec-airflowV2-transformer-config.json
├── parser.py
├── README.MD
└── result執行命令。
python ./parser.py -d /root/airflow/airflow-workflow/dags -o ./result -m ./flowspec-airflowV2-transformer-config.json作業記錄如下:
查看轉換結果:
2.6 常見問題
2.6.1 報錯 TIMEZONE = pendulum.tz.timezone("UTC") TypeError: 'module' object is not callable
如果是Airflow 2.x,需確保 pendulum 版本 ≤ 2.1.2。
如果是Airflow 3.x,則無需 pendulum(Airflow 3.x 已移除對 pendulum 的依賴)。
解決辦法:
pip uninstall pendulum -y
pip install pendulum==2.1.22.6.2 dag檔案解析依賴報錯
如果解析失敗, 一般是dag中有相對應特殊依賴,按照報錯使用pip依次安裝缺失的依賴即可。
二、匯入DataWorks
Airflow匯出工具將輸出調度任務流的描述檔案,JSON格式,其資料結構符合DataWorks Spec規範。
使用者可參考如下文檔完成DataWorks匯入:自訂DataWorks Spec匯入DataWorks