全部产品
Search
文档中心

OpenLake:Migrasi dari Airflow ke DataWorks

更新时间:Jan 31, 2026

Topik ini menjelaskan cara memigrasikan alur tugas penjadwalan Airflow ke alur kerja DataWorks menggunakan tool migrasi penjadwalan. Prosesnya mencakup ekspor tugas Airflow, transformasi tugas penjadwalan, dan impor tugas ke DataWorks.

1. Ekspor alur tugas Airflow

Tool migrasi mengekspor metadata alur tugas penjadwalan Airflow dengan memanfaatkan library Python Airflow untuk memuat folder DAG, mengambil informasi mengenai Directed Acyclic Graph (DAG), tugas internalnya, serta dependensinya, lalu mengekspor informasi tersebut ke file JSON.

Tool migrasi mendukung ekspor alur tugas dari Airflow 2.x.

1. Lingkungan runtime

Fitur ekspor Airflow pada tool migrasi didasarkan pada MigrationX Airflow Reader (MigrationX). Tool ini memerlukan lingkungan Python 3.9.0 atau versi yang lebih baru.

Pilih salah satu lingkungan runtime yang direkomendasikan berikut ini untuk tool ekspor:

Opsi 1: Jalankan tool di lingkungan Python yang sama dengan penjadwal Airflow Anda. Lingkungan ini harus menggunakan Python 3.9.0 atau versi yang lebih baru.

Opsi 2: Siapkan lingkungan Python baru. Di lingkungan ini, instal versi library Python Airflow yang sama dengan yang digunakan di lingkungan produksi Anda. Pastikan library Airflow kompatibel dengan versi Python tersebut.

Untuk mengonfigurasi lingkungan Python baru seperti yang dijelaskan pada Opsi 2, ikuti langkah-langkah berikut. Contoh ini menggunakan Instance ECS Linux.

## Instal Conda dan buat lingkungan Python 3.9

# Unduh dan instal Conda
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
sh Miniconda3-latest-Linux-x86_64.sh

# Selama instalasi, Conda akan meminta Anda menentukan direktori instalasi. Setelah instalasi, jalankan perintah berikut untuk inisialisasi.
cd <conda_installation_directory>
conda init
conda config --set auto_activate_base false
. ~/.bashrc

# Buat lingkungan Python 3.9.0
conda create -n airflow python=3.9.0
conda activate airflow

# Instal Airflow di lingkungan Conda. Contoh ini menggunakan versi 2.5.3.
pip install apache-airflow=2.5.3

2. Prosedur

2.1. Unduh paket tool ekspor airflow-workflow-parser.zip

Tautan unduhan: airflow-workflow-parser

2.2. Ekstrak file airflow-workflow-parser.zip

tar -zxvf airflow-workflow-parser.zip

2.3. Atur PYTHONPATH

Jika Anda menjalankan tool ekspor di lingkungan Airflow Anda, Anda harus mengatur variabel lingkungan PYTHONPATH. Langkah ini dapat dilewati jika Anda menjalankan tool di lingkungan Conda.

Atur variabel lingkungan PYTHONPATH ke path direktori lib Python untuk Airflow. Contohnya:

export PYTHONPATH=/usr/local/lib/python3.6/site-packages

2.4. Ekspor tugas Airflow

Parser mengambil semua DAG dari direktori yang ditentukan. Untuk setiap DAG, parser menghasilkan file DataWorks Spec—sebuah file JSON yang mendefinisikan alur tugas DataWorks.

Definisi alur tugas DataWorks disebut DataWorks Spec.
https://github.com/aliyun/dataworks-spec/tree/master
python ./parser.py -d /root/airflow/airflow-workflow/dags -o ./result -m ./flowspec-airflowV2-transformer-config.json

Parameter

-d menentukan direktori file DAG Airflow.

-o menentukan path file JSON hasil ekspor.

-m menentukan path file konfigurasi aturan transformasi pemetaan node. Untuk informasi lebih lanjut tentang item konfigurasi transformasi, lihat Bagian 2.4.1.

Jika eksekusi gagal, biasanya disebabkan oleh dependensi yang tidak ditemukan dalam Directed Acyclic Graph (DAG). Gunakan perintah pip untuk menginstal dependensi yang tercantum dalam pesan error.

2.4.1. Templat konfigurasi transformasi

Tool ekspor dapat mentransformasi jenis node. Anda dapat menggunakan item konfigurasi untuk memetakan operator Airflow ke jenis node DataWorks. Templat berikut disediakan sebagai referensi.

Templat konfigurasi untuk migrasi ke DataWorks di MaxCompute (sebagai referensi):

{
  "workflowPathPrefix": "Airflow_import_V3/",
  "typeMapping": {
    "EmptyOperator": "VIRTUAL",
    "DummyOperator": "VIRTUAL",
    "ExternalTaskSensor": "VIRTUAL",
    "BashOperator": "DIDE_SHELL",
    "HiveToMySqlTransfer": "DI",
    "PrestoToMySqlTransfer": "DI",
    "PythonOperator": "PYTHON",
    "HiveOperator": "ODPS_SQL",
    "SqoopOperator": "DI",
    "SparkSqlOperator": "ODPS_SQL",
    "SparkSubmitOperator": "ODPS_SPARK",
    "SQLExecuteQueryOperator": "MySQL",
    "PostgresOperator": "Postgresql",
    "MySqlOperator": "MySQL",
    "default": "PYTHON"
  },
  "settings": {
    "workflow.converter.target.schedule.resGroupIdentifier": "Serverless_res_group_651147510078336_710919704558240"
  }
}

Contoh templat konfigurasi untuk migrasi ke DataWorks di EMR:

{
  "workflowPathPrefix": "Airflow_import_V3/",
  "typeMapping": {
    "EmptyOperator": "VIRTUAL",
    "DummyOperator": "VIRTUAL",
    "ExternalTaskSensor": "VIRTUAL",
    "BashOperator": "EMR_SHELL",
    "HiveToMySqlTransfer": "DI",
    "PrestoToMySqlTransfer": "DI",
    "PythonOperator": "PYTHON",
    "HiveOperator": "EMR_HIVE",
    "SqoopOperator": "EMR_SQOOP",
    "SparkSqlOperator": "EMR_SPARK_SQL",
    "SparkSubmitOperator": "EMR_SPARK",
    "SQLExecuteQueryOperator": "MySQL",
    "PostgresOperator": "Postgresql",
    "MySqlOperator": "MySQL",
    "default": "PYTHON"
  },
  "settings": {
    "workflow.converter.target.schedule.resGroupIdentifier": "Serverless_res_group_651147510078336_710919704558240"
  }
}

Deskripsi parameter

Parameter

Deskripsi

workflowPathPrefix

Path tempat alur tugas disimpan setelah diimpor ke DataWorks.

typeMapping

Aturan pemetaan antara operator Airflow dan jenis node DataWorks.

Untuk informasi lebih lanjut tentang jenis node DataWorks, lihat kelas Enumerasi ini: https://github.com/aliyun/dataworks-spec/blob/b0f4a4fd769215d5f81c0bbe990addd7498df5f4/spec/src/main/java/com/aliyun/dataworks/common/spec/domain/dw/types/CodeProgramType.java#L180

workflow.converter.target.schedule.resGroupIdentifier

ID kelompok sumber daya DataWorks.

Di panel navigasi sebelah kiri halaman detail ruang kerja di DataWorks, buka halaman Resource Group. Lampirkan kelompok sumber daya dan dapatkan ID-nya.

2.5. Contoh

Setelah Anda mengunduh dan mengekstrak tool, struktur direktori berikut dibuat:

.
├── airflow_workflow
│   ├── common
│   ├── connections.py
│   ├── converter
│   ├── dag_parser.py
│   ├── downloader.py
│   ├── get_dags.py
│   ├── __init__.py
│   ├── miscs
│   ├── models
│   ├── patch
│   ├── __pycache__
│   └── test
├── airflow-workflow.tgz
├── dags
│   ├── example.py
│   └── __pycache__
├── flowspec-airflowV2-transformer-config.json
├── parser.py
├── README.MD
└── result

Jalankan perintah.

python ./parser.py -d /root/airflow/airflow-workflow/dags -o ./result -m ./flowspec-airflowV2-transformer-config.json

Log operasional berikut dikembalikan:

Lihat hasil transformasi:

2.6. FAQ

2.6.1. Muncul error: TIMEZONE = pendulum.tz.timezone("UTC") TypeError: 'module' object is not callable

Untuk Airflow 2.x, pastikan versi pendulum adalah 2.1.2 atau lebih lama.

Untuk Airflow 3.x, library pendulum tidak diperlukan karena Airflow 3.x tidak lagi memiliki dependensi pada pendulum.

Solusi:

pip uninstall pendulum -y
pip install pendulum==2.1.2
2.6.2. Terjadi error saat menguraikan dependensi file DAG

Jika penguraian gagal, kegagalan biasanya disebabkan oleh dependensi khusus dalam DAG. Gunakan perintah `pip` untuk menginstal dependensi yang hilang sesuai yang tercantum dalam pesan error.

2. Impor ke DataWorks

Tool ekspor Airflow menghasilkan file JSON yang mendeskripsikan alur tugas penjadwalan. Struktur data file ini sesuai dengan DataWorks Spec.

Untuk informasi lebih lanjut, lihat Impor DataWorks Spec kustom ke DataWorks.