All Products
Search
Document Center

AnalyticDB:Gunakan Airflow untuk menjadwalkan pekerjaan SQL XIHE

Last Updated:Jun 26, 2025

Airflow adalah penjadwal sumber terbuka populer yang menyediakan berbagai alat baris perintah dan antarmuka web yang mudah digunakan. Airflow dapat mengoordinasikan dan menjadwalkan berbagai alur kerja sebagai grafik asiklik terarah (DAG). Anda dapat menggunakan Airflow untuk secara cerdas mengoordinasikan pekerjaan ekstraksi, transformasi, dan pemuatan (ETL) untuk pemrosesan data offline dan data real-time di AnalyticDB for MySQL. Ini mengotomatiskan pemrosesan data dan meningkatkan efisiensi pemrosesan.

Prasyarat

  • Sebuah kluster Enterprise Edition, Basic Edition, atau Data Lakehouse Edition dari AnalyticDB for MySQL telah dibuat.

  • Airflow telah diinstal. Untuk informasi lebih lanjut, lihat Pemasangan Airflow.

  • Alamat IP server yang menjalankan Airflow ditambahkan ke daftar putih alamat IP dari kluster AnalyticDB for MySQL. Untuk informasi lebih lanjut, lihat Konfigurasikan daftar putih alamat IP.

Prosedur

  1. Periksa apakah paket apache-airflow-providers-mysql ditampilkan.

    1. Akses antarmuka web Airflow. Di bilah navigasi atas, pilih Admin > Providers.

    2. Di halaman Providers, periksa apakah paket apache-airflow-providers-mysql ditampilkan.

    3. (Kondisional diperlukan) Jika paket apache-airflow-providers-mysql tidak ditampilkan, jalankan perintah berikut untuk menginstal paket apache-airflow-providers-mysql secara manual:

      pip install apache-airflow-providers-mysql
      Penting

      Jika pesan kesalahan OSError: mysql_config not found muncul setelah Anda menjalankan perintah sebelumnya, jalankan perintah yum install mysql-devel untuk menginstal MySQL dan kemudian jalankan kembali perintah sebelumnya untuk menginstal paket apache-airflow-providers-mysql.

  2. Buat koneksi ke kluster AnalyticDB for MySQL.

    1. Di bilah navigasi atas, pilih Admin > Connections.

    2. Klik ikon image. Di halaman Add Connections, konfigurasikan parameter yang dijelaskan dalam tabel berikut.

      Parameter

      Deskripsi

      Connection id

      Nama koneksi.

      Connection Type

      Tipe koneksi. Pilih MySQL.

      Host

      Titik akhir yang digunakan untuk terhubung ke kluster AnalyticDB for MySQL. Anda bisa mendapatkan titik akhir di halaman Cluster Information dari konsol AnalyticDB for MySQL.

      Login

      Nama akun basis data dari kluster AnalyticDB for MySQL.

      Password

      Kata sandi akun basis data dari kluster AnalyticDB for MySQL.

      Port

      Nomor port dari kluster AnalyticDB for MySQL. Atur nilainya menjadi 3306.

      Catatan

      Parameter lainnya bersifat opsional. Konfigurasikan parameter berdasarkan kebutuhan bisnis Anda.

  3. Pergi ke direktori instalasi Airflow dan periksa parameter dags_folder di file airflow.cfg.

    1. Pergi ke direktori instalasi Airflow.

      cd /root/airflow
    2. Periksa parameter dags_folder di file airflow.cfg.

      cat file.cfg
    3. (Kondisional diperlukan) Jika tidak ada folder di jalur yang ditentukan oleh parameter dags_folder, jalankan perintah mkdir untuk membuat folder.

      Catatan

      Sebagai contoh, jika parameter dags_folder menentukan jalur /root/airflow/dags dan jalur /root/airflow tidak berisi folder bernama dags, Anda dapat membuat folder dags di jalur /root/airflow.

  4. Tulis file DAG. Dalam contoh ini, sebuah file DAG bernama mysql_dags.py digunakan.

    from airflow import DAG
    from airflow.providers.mysql.operators.mysql import MySqlOperator
    from airflow.utils.dates import days_ago
    
    default_args = {
        'owner': 'airflow',
    }
    
    dag = DAG(
        'example_mysql',
        default_args=default_args,
        start_date=days_ago(2),
        tags=['example'],
    )
    
    mysql_test = MySqlOperator(
        task_id='mysql_test',
        mysql_conn_id='test',
        sql='SHOW DATABASES;',
        dag=dag,
    )
    mysql_test_task = MySqlOperator(
        task_id='mysql_test_task',
        mysql_conn_id='test',
        sql='SELECT * FROM test;',
        dag=dag,
    )
    mysql_test >> mysql_test_task
    if __name__ == "__main__":
        dag.cli()
    

    Parameter:

    • mysql_conn_id: nama koneksi yang dibuat di Langkah 2.

    • sql: pernyataan SQL spesifik bisnis.

    Untuk informasi tentang parameter lainnya, lihat DAGs.

  5. Di antarmuka web Airflow, klik tab DAGs dan klik DAG yang ingin Anda kelola. Di pojok kanan atas halaman yang muncul, klik ikon image.

    Setelah eksekusi berhasil, Anda dapat mengklik lingkaran hijau di sebelah kanan nama DAG untuk melihat detail eksekusi DAG.

    image

    image

    Penting

    Waktu eksekusi DAG yang ditampilkan adalah 8 jam lebih awal daripada waktu yang sesuai di zona waktu UTC+8 karena penjadwal Airflow menggunakan zona waktu UTC.