All Products
Search
Document Center

AnalyticDB:Orkestrasi SQL AnalyticDB for MySQL dengan Airflow

Last Updated:Jun 22, 2026

Airflow adalah alat open-source populer yang menyediakan utilitas command line lengkap dan antarmuka pengguna (UI) yang mudah digunakan untuk mengatur dan menjadwalkan beban kerja sebagai DAG. Gunakan Airflow untuk mengelola pekerjaan ETL dan alur kerja data real-time di AnalyticDB for MySQL guna mengotomatiskan pemrosesan data dan meningkatkan efisiensi.

Prasyarat

  • Kluster AnalyticDB for MySQL Edisi Perusahaan, Edisi Dasar, atau Edisi Data Lakehouse telah dibuat.

  • Airflow telah diinstal. Untuk informasi selengkapnya, lihat dokumentasi Airflow.

  • Pastikan alamat IP server Airflow berada dalam daftar putih alamat IP kluster AnalyticDB for MySQL. Untuk informasi selengkapnya, lihat Konfigurasi daftar putih alamat IP.

Prosedur

  1. Periksa apakah provider apache-airflow-providers-mysql telah diinstal.

    1. Di UI Airflow, klik Admin > Providers.

    2. Di halaman Providers, periksa apakah apache-airflow-providers-mysql ada dalam daftar.

    3. (Opsional) Jika provider apache-airflow-providers-mysql tidak ada dalam daftar, jalankan perintah berikut untuk menginstal it:

      pip install apache-airflow-providers-mysql
      Penting

      Jika muncul error OSError: mysql_config not found, jalankan perintah yum install mysql-devel untuk menginstal file pengembangan MySQL. Kemudian, jalankan kembali perintah instalasi untuk apache-airflow-providers-mysql.

  2. Buat koneksi.

    1. Di UI Airflow, klik Admin > Connections.

    2. Klik ikon image. Di halaman Add Connection, konfigurasikan parameter berikut.

      Parameter

      Deskripsi

      Connection ID

      ID unik untuk koneksi tersebut.

      Connection type

      Pilih MySQL.

      Host

      Titik akhir kluster AnalyticDB for MySQL. Temukan titik akhir ini di halaman Cluster Information di Konsol.

      Login

      Username untuk AnalyticDB for MySQL.

      Password

      Password untuk AnalyticDB for MySQL.

      Port

      Port AnalyticDB for MySQL kluster AnalyticDB for MySQL. Nilainya tetap pada 3306.

      Catatan

      Parameter lain bersifat opsional. Konfigurasikan sesuai kebutuhan.

  3. Buka direktori instalasi Airflow dan periksa parameter dags_folder dalam file airflow.cfg.

    1. Buka direktori instalasi Airflow.

      cd /root/airflow
    2. Periksa parameter dags_folder dalam file airflow.cfg.

      cat airflow.cfg
    3. (Opsional) Jika folder tidak ada di path yang ditentukan oleh parameter dags_folder, jalankan perintah mkdir untuk membuat folder tersebut.

      Catatan

      Misalnya, jika path untuk dags_folder adalah /root/airflow/dags tetapi folder dags tidak ada di direktori /root/airflow, buatlah folder tersebut.

  4. Buat file DAG, misalnya mysql_dags.py:

    from airflow import DAG
    from airflow.providers.mysql.operators.mysql import MySqlOperator
    from airflow.utils.dates import days_ago
    default_args = {
        'owner': 'airflow',
    }
    dag = DAG(
        'example_mysql',
        default_args=default_args,
        start_date=days_ago(2),
        tags=['example'],
    )
    mysql_test = MySqlOperator(
        task_id='mysql_test',
        mysql_conn_id='test',
        sql='SHOW DATABASES;',
        dag=dag,
    )
    mysql_test_task = MySqlOperator(
        task_id='mysql_test_task',
        mysql_conn_id='test',
        sql='SELECT * FROM test;',
        dag=dag,
    )
    mysql_test >> mysql_test_task
    if __name__ == "__main__":
        dag.cli()
    

    Tabel berikut menjelaskan parameter-parameter tersebut.

    • mysql_conn_id: ID koneksi yang Anda buat pada Langkah 2.

    • sql: pernyataan SQL yang ingin Anda eksekusi.

    Untuk informasi selengkapnya tentang parameter tersebut, lihat dokumentasi Airflow.

  5. Di UI Airflow, temukan DAG Anda dan klik ikon image di kolom Actions untuk menjalankannya.

    Setelah DAG dijalankan, klik lingkaran hijau di kolom Runs untuk melihat detail eksekusi.

    Angka "1" dalam lingkaran hijau di samping example_mysql menunjukkan satu kali eksekusi DAG berhasil.

    Di halaman Task Instances, Anda dapat melihat tiga catatan tugas. Jika State bernilai success, Dag ID adalah example_mysql, Task ID adalah mysql_test_task, dan Operator adalah MySqlOperator, maka tugas tersebut berhasil dijalankan.

    Penting

    Secara default, Airflow menggunakan zona waktu Coordinated Universal Time (UTC). Artinya, waktu eksekusi yang ditampilkan tertinggal 8 jam dari China Standard Time (UTC+8).