Airflow adalah alat open-source populer yang menyediakan utilitas command line lengkap dan antarmuka pengguna (UI) yang mudah digunakan untuk mengatur dan menjadwalkan beban kerja sebagai DAG. Gunakan Airflow untuk mengelola pekerjaan ETL dan alur kerja data real-time di AnalyticDB for MySQL guna mengotomatiskan pemrosesan data dan meningkatkan efisiensi.
Prasyarat
Kluster AnalyticDB for MySQL Edisi Perusahaan, Edisi Dasar, atau Edisi Data Lakehouse telah dibuat.
-
Airflow telah diinstal. Untuk informasi selengkapnya, lihat dokumentasi Airflow.
-
Pastikan alamat IP server Airflow berada dalam daftar putih alamat IP kluster AnalyticDB for MySQL. Untuk informasi selengkapnya, lihat Konfigurasi daftar putih alamat IP.
Prosedur
-
Periksa apakah provider apache-airflow-providers-mysql telah diinstal.
-
Di UI Airflow, klik .
-
Di halaman Providers, periksa apakah apache-airflow-providers-mysql ada dalam daftar.
-
(Opsional) Jika provider apache-airflow-providers-mysql tidak ada dalam daftar, jalankan perintah berikut untuk menginstal it:
pip install apache-airflow-providers-mysqlPentingJika muncul error
OSError: mysql_config not found, jalankan perintahyum install mysql-develuntuk menginstal file pengembangan MySQL. Kemudian, jalankan kembali perintah instalasi untuk apache-airflow-providers-mysql.
-
-
Buat koneksi.
-
Di UI Airflow, klik .
-
Klik ikon
. Di halaman Add Connection, konfigurasikan parameter berikut.Parameter
Deskripsi
Connection ID
ID unik untuk koneksi tersebut.
Connection type
Pilih MySQL.
Host
Titik akhir kluster AnalyticDB for MySQL. Temukan titik akhir ini di halaman Cluster Information di Konsol.
Login
Username untuk AnalyticDB for MySQL.
Password
Password untuk AnalyticDB for MySQL.
Port
Port AnalyticDB for MySQL kluster AnalyticDB for MySQL. Nilainya tetap pada 3306.
CatatanParameter lain bersifat opsional. Konfigurasikan sesuai kebutuhan.
-
-
Buka direktori instalasi Airflow dan periksa parameter dags_folder dalam file
airflow.cfg.-
Buka direktori instalasi Airflow.
cd /root/airflow -
Periksa parameter dags_folder dalam file
airflow.cfg.cat airflow.cfg -
(Opsional) Jika folder tidak ada di path yang ditentukan oleh parameter dags_folder, jalankan perintah
mkdiruntuk membuat folder tersebut.CatatanMisalnya, jika path untuk dags_folder adalah
/root/airflow/dagstetapi folderdagstidak ada di direktori/root/airflow, buatlah folder tersebut.
-
-
Buat file DAG, misalnya
mysql_dags.py:from airflow import DAG from airflow.providers.mysql.operators.mysql import MySqlOperator from airflow.utils.dates import days_ago default_args = { 'owner': 'airflow', } dag = DAG( 'example_mysql', default_args=default_args, start_date=days_ago(2), tags=['example'], ) mysql_test = MySqlOperator( task_id='mysql_test', mysql_conn_id='test', sql='SHOW DATABASES;', dag=dag, ) mysql_test_task = MySqlOperator( task_id='mysql_test_task', mysql_conn_id='test', sql='SELECT * FROM test;', dag=dag, ) mysql_test >> mysql_test_task if __name__ == "__main__": dag.cli()Tabel berikut menjelaskan parameter-parameter tersebut.
-
mysql_conn_id: ID koneksi yang Anda buat pada Langkah 2. -
sql: pernyataan SQL yang ingin Anda eksekusi.
Untuk informasi selengkapnya tentang parameter tersebut, lihat dokumentasi Airflow.
-
-
Di UI Airflow, temukan DAG Anda dan klik ikon
di kolom Actions untuk menjalankannya.Setelah DAG dijalankan, klik lingkaran hijau di kolom Runs untuk melihat detail eksekusi.
Angka "1" dalam lingkaran hijau di samping example_mysql menunjukkan satu kali eksekusi DAG berhasil.
Di halaman Task Instances, Anda dapat melihat tiga catatan tugas. Jika State bernilai success, Dag ID adalah example_mysql, Task ID adalah mysql_test_task, dan Operator adalah MySqlOperator, maka tugas tersebut berhasil dijalankan.
PentingSecara default, Airflow menggunakan zona waktu Coordinated Universal Time (UTC). Artinya, waktu eksekusi yang ditampilkan tertinggal 8 jam dari China Standard Time (UTC+8).