全部產品
Search
文件中心

AnalyticDB:Airflow調度XIHE SQL

更新時間:Aug 08, 2024

Airflow是比較流行的開源調度工具,提供了豐富的命令列工具和簡單易用的操作介面,可以實現各類工作負載的DAG編排與調度。您可以使用Airflow完成AnalyticDB for MySQL離線資料的ETL智能編排和即時資料處理流程的編排,實現資料處理過程的自動化,提高資料處理的效率。

前提條件

  • 叢集的產品系列為企業版、基礎版或湖倉版

  • 已安裝Airflow。具體操作,請參見Airflow社區文檔

  • 已將運行Airflow的伺服器IP地址添加至AnalyticDB for MySQL叢集的白名單中。具體操作,請參見設定白名單

操作步驟

  1. 查看是否存在apache-airflow-providers-mysql

    1. 訪問Airflow Web介面,在頂部導覽列單擊Admin > Providers

    2. Providers頁面,查看是否存在apache-airflow-providers-mysql

    3. (條件必選)若不存在apache-airflow-providers-mysql,需執行以下命令手動安裝apache-airflow-providers-mysql

      pip install apache-airflow-providers-mysql
      重要

      執行該語句後,若出現OSError: mysql_config not found報錯,需執行yum install mysql-devel命令安裝MySQL,再重新執行該命令安裝apache-airflow-providers-mysql

  2. 建立串連。

    1. 在頂部導覽列單擊Admin > Connections

    2. 單擊image按鈕,在Add Connections頁面配置如下參數:

      參數

      說明

      Connection id

      串連名稱。

      Connection Type

      選擇MySQL

      Host

      AnalyticDB for MySQL叢集的串連地址。可通過控制台叢集資訊頁面,查看串連資訊。

      Login

      AnalyticDB for MySQL叢集的資料庫帳號。

      Password

      AnalyticDB for MySQL叢集資料庫帳號的密碼。

      Port

      AnalyticDB for MySQL叢集的連接埠號碼,固定為3306。

      說明

      其他參數為選填參數,按需配置即可

  3. 進入Airflow安裝目錄,在airflow.cfg檔案中查看dags_folder參數。

    1. 進入Airflow安裝目錄。

      cd /root/airflow
    2. airflow.cfg檔案中查看dags_folder參數。

      cat file.cfg
    3. (條件必選)如果dags_folder參數指定路徑下無檔案夾,您需執行mkdir命令建立檔案夾。

      說明

      例如,dags_folder參數指定的路徑為/root/airflow/dags,如果/root/airflow路徑下無dags檔案夾,您可以在該路徑下建立dags檔案夾。

  4. 編寫DAG,本文的DAG檔案為mysql_dags.py

    from airflow import DAG
    from airflow.providers.mysql.operators.mysql import MySqlOperator
    from airflow.utils.dates import days_ago
    
    default_args = {
        'owner': 'airflow',
    }
    
    dag = DAG(
        'example_mysql',
        default_args=default_args,
        start_date=days_ago(2),
        tags=['example'],
    )
    
    mysql_test = MySqlOperator(
        task_id='mysql_test',
        mysql_conn_id='test',
        sql='SHOW DATABASES;',
        dag=dag,
    )
    mysql_test_task = MySqlOperator(
        task_id='mysql_test_task',
        mysql_conn_id='test',
        sql='SELECT * FROM test;',
        dag=dag,
    )
    mysql_test >> mysql_test_task
    if __name__ == "__main__":
        dag.cli()
    

    參數說明:

    • mysql_conn_id:步驟2中配置的串連名稱。

    • sql:業務具體的SQL語句。

    其他參數請參見Airflow社區文檔

  5. Airflow Web介面,單擊對應DAG右側image按鈕

    執行成功後,您也可以單擊對應DAG右側的綠色圓圈,查看DAG執行的詳細資料。

    image

    image

    重要

    Airflow工具預設使用UTC時區,所以DAG的執行時間顯示比北京時間(UTC+8)少8小時。