Airflow是比較流行的開源調度工具,提供了豐富的命令列工具和簡單易用的操作介面,可以實現各類工作負載的DAG編排與調度。您可以使用Airflow完成AnalyticDB for MySQL離線資料的ETL智能編排和即時資料處理流程的編排,實現資料處理過程的自動化,提高資料處理的效率。
前提條件
叢集的產品系列為企業版、基礎版或湖倉版。
已安裝Airflow。具體操作,請參見Airflow社區文檔。
已將運行Airflow的伺服器IP地址添加至AnalyticDB for MySQL叢集的白名單中。具體操作,請參見設定白名單。
操作步驟
查看是否存在apache-airflow-providers-mysql。
訪問Airflow Web介面,在頂部導覽列單擊。
在Providers頁面,查看是否存在apache-airflow-providers-mysql。
(條件必選)若不存在apache-airflow-providers-mysql,需執行以下命令手動安裝apache-airflow-providers-mysql。
pip install apache-airflow-providers-mysql重要執行該語句後,若出現
OSError: mysql_config not found報錯,需執行yum install mysql-devel命令安裝MySQL,再重新執行該命令安裝apache-airflow-providers-mysql。
建立串連。
在頂部導覽列單擊。
單擊
按鈕,在Add Connections頁面配置如下參數:參數
說明
Connection id
串連名稱。
Connection Type
選擇MySQL。
Host
AnalyticDB for MySQL叢集的串連地址。可通過控制台叢集資訊頁面,查看串連資訊。
Login
AnalyticDB for MySQL叢集的資料庫帳號。
Password
AnalyticDB for MySQL叢集資料庫帳號的密碼。
Port
AnalyticDB for MySQL叢集的連接埠號碼,固定為3306。
說明其他參數為選填參數,按需配置即可。
進入Airflow安裝目錄,在
airflow.cfg檔案中查看dags_folder參數。進入Airflow安裝目錄。
cd /root/airflow在
airflow.cfg檔案中查看dags_folder參數。cat file.cfg(條件必選)如果dags_folder參數指定路徑下無檔案夾,您需執行
mkdir命令建立檔案夾。說明例如,dags_folder參數指定的路徑為
/root/airflow/dags,如果/root/airflow路徑下無dags檔案夾,您可以在該路徑下建立dags檔案夾。
編寫DAG,本文的DAG檔案為
mysql_dags.py。from airflow import DAG from airflow.providers.mysql.operators.mysql import MySqlOperator from airflow.utils.dates import days_ago default_args = { 'owner': 'airflow', } dag = DAG( 'example_mysql', default_args=default_args, start_date=days_ago(2), tags=['example'], ) mysql_test = MySqlOperator( task_id='mysql_test', mysql_conn_id='test', sql='SHOW DATABASES;', dag=dag, ) mysql_test_task = MySqlOperator( task_id='mysql_test_task', mysql_conn_id='test', sql='SELECT * FROM test;', dag=dag, ) mysql_test >> mysql_test_task if __name__ == "__main__": dag.cli()參數說明:
mysql_conn_id:步驟2中配置的串連名稱。sql:業務具體的SQL語句。
其他參數請參見Airflow社區文檔。
在Airflow Web介面,單擊對應DAG右側
按鈕。執行成功後,您也可以單擊對應DAG右側的綠色圓圈,查看DAG執行的詳細資料。

重要Airflow工具預設使用UTC時區,所以DAG的執行時間顯示比北京時間(UTC+8)少8小時。