全部產品
Search
文件中心

DataWorks:Airflow -> DataWorks

更新時間:Mar 11, 2026

本文介紹了基於LHM調度遷移工具將Airflow調度任務流遷移到DataWorks Workflow的方案與操作流程,包括Airflow任務匯出、調度任務轉換、DataWorks任務匯入。

一、匯出Airflow任務流

匯出Airflow調度任務流元資訊的基本原理是,利用Airflow的Python庫載入使用者的DAG Folder,擷取DAG及其內部任務資訊與依賴關係,整理為JSON檔案匯出。

遷移工具目前支援對Airflow 2.x的任務流匯出。

1 運行環境

遷移工具的Airflow匯出能力基於MigrationX Airflow Reader(MigrationX)實現,在Python>= 3.9.0的環境中執行。

推薦的匯出工具運行環境(二選一):

1、在Airflow調度所在Python環境中運行(需滿足Python >= 3.9.0的條件)

2、準備新的Python環境並安裝與生產環境同版本的Airflow Python庫,需注意Airflow庫與Python版本的對照關係。

方案二中提到的新Python環境可參考下列流程配置(以linux ecs為例):

## 安裝conda並建立Python3.9環境

# 下載並安裝conda
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
sh Miniconda3-latest-Linux-x86_64.sh

# 安裝過程中conda會要求指定安裝目錄,安裝完成後執行以下命令進行初始化
cd <conda安裝目錄>
conda init
conda config --set auto_activate_base false
. ~/.bashrc

# 建立Python3.9.0環境
conda create -n airflow python=3.9.0
conda activate airflow

# 在Conda環境中安裝airflow(以2.5.3為例)
pip install apache-airflow==2.5.3

2 操作流程

2.1 下載匯出工具包airflow-workflow-parser.zip:

下載連結:airflow-workflow-parser

2.2 解壓airflow-workflow-parser.zip:

unzip airflow-workflow-parser.zip

2.3 設定PTYHONPATH

若在Airflow環境中運行匯出工具,需配置PYTHONPATH;若在Conda環境中運行匯出工具,則可跳過此步驟。

指定PYTHONPATH到airflow的python lib目錄,例如:

export PYTHONPATH=/usr/local/lib/python3.6/site-packages

export AIRFLOW_HOME=/var/lib/airflow

## 注意,如果建立airflow沒有airflow.cfg檔案,可以通過 airflow config list --defaults 產生
export AIRFLOW_CONFIG=/var/run/cloudera-scm-agent/process/2531-airflow-AIRFLOW_SCHEDULER/airflow.cfg

2.4 執行airflow任務匯出

Parser會擷取指定目錄下的所有DAG,每個DAG對應產生一個DataWorks Spec檔案(DataWorks任務流描述定義檔案,json格式)。

DataWorks任務流描述定義 dataworks-spec
https://github.com/aliyun/dataworks-spec/tree/master
python ./parser.py -d /root/airflow/airflow-workflow/dags -o ./result -m ./flowspec-airflowV2-transformer-config.json

參數解釋:

-d 表示Airflow DAG檔案的存放目錄;

-o 表示匯出結果JSON檔案的存放路徑;

-m 表示設定檔路徑,可在其中配置節點映射轉換規則。具體轉換配置項見2.4.1小節。

若執行失敗,通常是由於DAG中有特殊依賴,按報錯使用pip命令依次安裝缺失的依賴即可。

2.4.1 轉換配置模板

匯出工具支援對節點類型進行轉換,可通過配置項將Airflow Operator與DataWorks節點類型映射,其模板如下。

遷移至DataWorks on MaxCompute的配置模板(參考配置):

{
  "workflowPathPrefix": "Airflow匯入_V3/",
  "typeMapping": {
    "EmptyOperator": "VIRTUAL",
    "DummyOperator": "VIRTUAL",
    "ExternalTaskSensor": "VIRTUAL",
    "BashOperator": "DIDE_SHELL",
    "HiveToMySqlTransfer": "DI",
    "PrestoToMySqlTransfer": "DI",
    "PythonOperator": "PYTHON",
    "HiveOperator": "ODPS_SQL",
    "SqoopOperator": "DI",
    "SparkSqlOperator": "ODPS_SQL",
    "SparkSubmitOperator": "ODPS_SPARK",
    "SQLExecuteQueryOperator": "MySQL",
    "PostgresOperator": "Postgresql",
    "MySqlOperator": "MySQL",
    "default": "PYTHON"
  },
  "settings": {
    "workflow.converter.target.schedule.resGroupIdentifier": "Serverless_res_group_651147510078336_710919704558240"
  }
}

至DataWorks on EMR的配置模板(參考配置):

{
  "workflowPathPrefix": "Airflow匯入_V3/",
  "typeMapping": {
    "EmptyOperator": "VIRTUAL",
    "DummyOperator": "VIRTUAL",
    "ExternalTaskSensor": "VIRTUAL",
    "BashOperator": "EMR_SHELL",
    "HiveToMySqlTransfer": "DI",
    "PrestoToMySqlTransfer": "DI",
    "PythonOperator": "PYTHON",
    "HiveOperator": "EMR_HIVE",
    "SqoopOperator": "EMR_SQOOP",
    "SparkSqlOperator": "EMR_SPARK_SQL",
    "SparkSubmitOperator": "EMR_SPARK",
    "SQLExecuteQueryOperator": "MySQL",
    "PostgresOperator": "Postgresql",
    "MySqlOperator": "MySQL",
    "default": "PYTHON"
  },
  "settings": {
    "workflow.converter.target.schedule.resGroupIdentifier": "Serverless_res_group_651147510078336_710919704558240"
  }
}

參數說明:

參數名

描述

workflowPathPrefix

任務流匯入DataWorks後的儲存位置。

typeMapping

Airflow Operator與DataWorks節點類型的映射規則。

DataWorks節點類型可參考此枚舉類:https://github.com/aliyun/dataworks-spec/blob/b0f4a4fd769215d5f81c0bbe990addd7498df5f4/spec/src/main/java/com/aliyun/dataworks/common/spec/domain/dw/types/CodeProgramType.java#L180

workflow.converter.target.schedule.resGroupIdentifier

DataWorks資源群組Id

由DataWorks工作空間詳情頁左側功能表列進入資源群組頁面,綁定資源群組,並擷取資源群組ID。

2.5 操作樣本

下載工具到本地後,解壓縮得到如下目錄:

.
├── airflow_workflow
│   ├── common
│   ├── connections.py
│   ├── converter
│   ├── dag_parser.py
│   ├── downloader.py
│   ├── get_dags.py
│   ├── __init__.py
│   ├── miscs
│   ├── models
│   ├── patch
│   ├── __pycache__
│   └── test
├── airflow-workflow.tgz
├── dags
│   ├── example.py
│   └── __pycache__
├── flowspec-airflowV2-transformer-config.json
├── parser.py
├── README.MD
└── result

執行命令。

python ./parser.py -d /root/airflow/airflow-workflow/dags -o ./result -m ./flowspec-airflowV2-transformer-config.json

作業記錄如下:

查看轉換結果:

2.6 常見問題

2.6.1 報錯 TIMEZONE = pendulum.tz.timezone("UTC") TypeError: 'module' object is not callable

如果是Airflow 2.x,需確保 pendulum 版本 ≤ 2.1.2。

如果是Airflow 3.x,則無需 pendulum(Airflow 3.x 已移除對 pendulum 的依賴)。

解決辦法:

pip uninstall pendulum -y
pip install pendulum==2.1.2
2.6.2 dag檔案解析依賴報錯

如果解析失敗, 一般是dag中有相對應特殊依賴,按照報錯使用pip依次安裝缺失的依賴即可。

二、匯入DataWorks

Airflow匯出工具將輸出調度任務流的描述檔案,JSON格式,其資料結構符合DataWorks Spec規範。

使用者可參考如下文檔完成DataWorks匯入:自訂DataWorks Spec匯入DataWorks