Topik ini menjelaskan cara memigrasikan alur tugas penjadwalan Airflow ke alur kerja DataWorks menggunakan tool migrasi penjadwalan. Prosesnya mencakup ekspor tugas Airflow, transformasi tugas penjadwalan, dan impor tugas ke DataWorks.
1. Ekspor alur tugas Airflow
Tool migrasi mengekspor metadata alur tugas penjadwalan Airflow dengan memanfaatkan library Python Airflow untuk memuat folder DAG, mengambil informasi mengenai Directed Acyclic Graph (DAG), tugas internalnya, serta dependensinya, lalu mengekspor informasi tersebut ke file JSON.
Tool migrasi mendukung ekspor alur tugas dari Airflow 2.x.
1. Lingkungan runtime
Fitur ekspor Airflow pada tool migrasi didasarkan pada MigrationX Airflow Reader (MigrationX). Tool ini memerlukan lingkungan Python 3.9.0 atau versi yang lebih baru.
Pilih salah satu lingkungan runtime yang direkomendasikan berikut ini untuk tool ekspor:
Opsi 1: Jalankan tool di lingkungan Python yang sama dengan penjadwal Airflow Anda. Lingkungan ini harus menggunakan Python 3.9.0 atau versi yang lebih baru.
Opsi 2: Siapkan lingkungan Python baru. Di lingkungan ini, instal versi library Python Airflow yang sama dengan yang digunakan di lingkungan produksi Anda. Pastikan library Airflow kompatibel dengan versi Python tersebut.
Untuk mengonfigurasi lingkungan Python baru seperti yang dijelaskan pada Opsi 2, ikuti langkah-langkah berikut. Contoh ini menggunakan Instance ECS Linux.
## Instal Conda dan buat lingkungan Python 3.9
# Unduh dan instal Conda
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
sh Miniconda3-latest-Linux-x86_64.sh
# Selama instalasi, Conda akan meminta Anda menentukan direktori instalasi. Setelah instalasi, jalankan perintah berikut untuk inisialisasi.
cd <conda_installation_directory>
conda init
conda config --set auto_activate_base false
. ~/.bashrc
# Buat lingkungan Python 3.9.0
conda create -n airflow python=3.9.0
conda activate airflow
# Instal Airflow di lingkungan Conda. Contoh ini menggunakan versi 2.5.3.
pip install apache-airflow=2.5.32. Prosedur
2.1. Unduh paket tool ekspor airflow-workflow-parser.zip
Tautan unduhan: airflow-workflow-parser
2.2. Ekstrak file airflow-workflow-parser.zip
tar -zxvf airflow-workflow-parser.zip2.3. Atur PYTHONPATH
Jika Anda menjalankan tool ekspor di lingkungan Airflow Anda, Anda harus mengatur variabel lingkungan PYTHONPATH. Langkah ini dapat dilewati jika Anda menjalankan tool di lingkungan Conda.
Atur variabel lingkungan PYTHONPATH ke path direktori lib Python untuk Airflow. Contohnya:
export PYTHONPATH=/usr/local/lib/python3.6/site-packages2.4. Ekspor tugas Airflow
Parser mengambil semua DAG dari direktori yang ditentukan. Untuk setiap DAG, parser menghasilkan file DataWorks Spec—sebuah file JSON yang mendefinisikan alur tugas DataWorks.
Definisi alur tugas DataWorks disebut DataWorks Spec.
https://github.com/aliyun/dataworks-spec/tree/master
python ./parser.py -d /root/airflow/airflow-workflow/dags -o ./result -m ./flowspec-airflowV2-transformer-config.jsonParameter
-d menentukan direktori file DAG Airflow.
-o menentukan path file JSON hasil ekspor.
-m menentukan path file konfigurasi aturan transformasi pemetaan node. Untuk informasi lebih lanjut tentang item konfigurasi transformasi, lihat Bagian 2.4.1.
Jika eksekusi gagal, biasanya disebabkan oleh dependensi yang tidak ditemukan dalam Directed Acyclic Graph (DAG). Gunakan perintah pip untuk menginstal dependensi yang tercantum dalam pesan error.
2.4.1. Templat konfigurasi transformasi
Tool ekspor dapat mentransformasi jenis node. Anda dapat menggunakan item konfigurasi untuk memetakan operator Airflow ke jenis node DataWorks. Templat berikut disediakan sebagai referensi.
Templat konfigurasi untuk migrasi ke DataWorks di MaxCompute (sebagai referensi):
{
"workflowPathPrefix": "Airflow_import_V3/",
"typeMapping": {
"EmptyOperator": "VIRTUAL",
"DummyOperator": "VIRTUAL",
"ExternalTaskSensor": "VIRTUAL",
"BashOperator": "DIDE_SHELL",
"HiveToMySqlTransfer": "DI",
"PrestoToMySqlTransfer": "DI",
"PythonOperator": "PYTHON",
"HiveOperator": "ODPS_SQL",
"SqoopOperator": "DI",
"SparkSqlOperator": "ODPS_SQL",
"SparkSubmitOperator": "ODPS_SPARK",
"SQLExecuteQueryOperator": "MySQL",
"PostgresOperator": "Postgresql",
"MySqlOperator": "MySQL",
"default": "PYTHON"
},
"settings": {
"workflow.converter.target.schedule.resGroupIdentifier": "Serverless_res_group_651147510078336_710919704558240"
}
}Contoh templat konfigurasi untuk migrasi ke DataWorks di EMR:
{
"workflowPathPrefix": "Airflow_import_V3/",
"typeMapping": {
"EmptyOperator": "VIRTUAL",
"DummyOperator": "VIRTUAL",
"ExternalTaskSensor": "VIRTUAL",
"BashOperator": "EMR_SHELL",
"HiveToMySqlTransfer": "DI",
"PrestoToMySqlTransfer": "DI",
"PythonOperator": "PYTHON",
"HiveOperator": "EMR_HIVE",
"SqoopOperator": "EMR_SQOOP",
"SparkSqlOperator": "EMR_SPARK_SQL",
"SparkSubmitOperator": "EMR_SPARK",
"SQLExecuteQueryOperator": "MySQL",
"PostgresOperator": "Postgresql",
"MySqlOperator": "MySQL",
"default": "PYTHON"
},
"settings": {
"workflow.converter.target.schedule.resGroupIdentifier": "Serverless_res_group_651147510078336_710919704558240"
}
}Deskripsi parameter
Parameter | Deskripsi |
workflowPathPrefix | Path tempat alur tugas disimpan setelah diimpor ke DataWorks. |
typeMapping | Aturan pemetaan antara operator Airflow dan jenis node DataWorks. Untuk informasi lebih lanjut tentang jenis node DataWorks, lihat kelas Enumerasi ini: https://github.com/aliyun/dataworks-spec/blob/b0f4a4fd769215d5f81c0bbe990addd7498df5f4/spec/src/main/java/com/aliyun/dataworks/common/spec/domain/dw/types/CodeProgramType.java#L180 |
workflow.converter.target.schedule.resGroupIdentifier | ID kelompok sumber daya DataWorks. |
Di panel navigasi sebelah kiri halaman detail ruang kerja di DataWorks, buka halaman Resource Group. Lampirkan kelompok sumber daya dan dapatkan ID-nya.
2.5. Contoh
Setelah Anda mengunduh dan mengekstrak tool, struktur direktori berikut dibuat:
.
├── airflow_workflow
│ ├── common
│ ├── connections.py
│ ├── converter
│ ├── dag_parser.py
│ ├── downloader.py
│ ├── get_dags.py
│ ├── __init__.py
│ ├── miscs
│ ├── models
│ ├── patch
│ ├── __pycache__
│ └── test
├── airflow-workflow.tgz
├── dags
│ ├── example.py
│ └── __pycache__
├── flowspec-airflowV2-transformer-config.json
├── parser.py
├── README.MD
└── resultJalankan perintah.
python ./parser.py -d /root/airflow/airflow-workflow/dags -o ./result -m ./flowspec-airflowV2-transformer-config.jsonLog operasional berikut dikembalikan:
Lihat hasil transformasi:
2.6. FAQ
2.6.1. Muncul error: TIMEZONE = pendulum.tz.timezone("UTC") TypeError: 'module' object is not callable
Untuk Airflow 2.x, pastikan versi pendulum adalah 2.1.2 atau lebih lama.
Untuk Airflow 3.x, library pendulum tidak diperlukan karena Airflow 3.x tidak lagi memiliki dependensi pada pendulum.
Solusi:
pip uninstall pendulum -y
pip install pendulum==2.1.22.6.2. Terjadi error saat menguraikan dependensi file DAG
Jika penguraian gagal, kegagalan biasanya disebabkan oleh dependensi khusus dalam DAG. Gunakan perintah `pip` untuk menginstal dependensi yang hilang sesuai yang tercantum dalam pesan error.
2. Impor ke DataWorks
Tool ekspor Airflow menghasilkan file JSON yang mendeskripsikan alur tugas penjadwalan. Struktur data file ini sesuai dengan DataWorks Spec.
Untuk informasi lebih lanjut, lihat Impor DataWorks Spec kustom ke DataWorks.