Airflow is a popular open source scheduler that provides a variety of command-line tools and an easy-to-use web interface. Airflow can orchestrate and schedule various workflows as directed acyclic graphs (DAGs). You can use Airflow to intelligently orchestrate extract, transform, load (ETL) jobs for offline data and real-time data processing in AnalyticDB for MySQL. This automates data processing and improves processing efficiency.
Prerequisites
An AnalyticDB for MySQL Data Lakehouse Edition cluster is created.
Airflow is installed. For more information, see Installation of Airflow.
The IP address of the server that runs Airflow is added to an IP address whitelist of the AnalyticDB for MySQL cluster. For more information, see Configure an IP address whitelist.
Procedure
Check whether the apache-airflow-providers-mysql package is displayed.
Access the Airflow web interface. In the top navigation bar, choose Admin > Providers.
On the Providers page, check whether the apache-airflow-providers-mysql package is displayed.
(Conditionally required) If the apache-airflow-providers-mysql package is not displayed, run the following command to install it:
pip install apache-airflow-providers-mysql
Important: If the OSError: mysql_config not found error message is returned after you run the preceding command, run the yum install mysql-devel command to install the MySQL development libraries, and then rerun the preceding command to install the apache-airflow-providers-mysql package.
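As an optional check, you can also list the installed providers from the command line instead of the web interface. The following command assumes the Airflow 2.x CLI and filters the provider list for the MySQL package:
airflow providers list | grep mysql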
Establish a connection to the AnalyticDB for MySQL cluster.
In the top navigation bar, choose Admin > Connections.
Click the + icon. On the Add Connections page, configure the following parameters:
Connection id: The name of the connection.
Connection Type: The type of the connection. Select MySQL.
Host: The endpoint that is used to connect to the AnalyticDB for MySQL cluster. You can obtain the endpoint on the Cluster Information page of the AnalyticDB for MySQL console.
Login: The name of the database account of the AnalyticDB for MySQL cluster.
Password: The password of the database account of the AnalyticDB for MySQL cluster.
Port: The port number that is used to connect to the AnalyticDB for MySQL cluster. Set the value to 3306.
Note: Other parameters are optional. Configure them based on your business requirements.
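Alternatively, you can create the same connection with the Airflow CLI. The following command is a minimal sketch: the connection name test matches the mysql_conn_id value that is used in the DAG example later in this topic, and the angle-bracket values are placeholders that you must replace with your own endpoint and database account:
airflow connections add 'test' \
    --conn-type 'mysql' \
    --conn-host '<AnalyticDB for MySQL endpoint>' \
    --conn-login '<database account>' \
    --conn-password '<password>' \
    --conn-port 3306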
Go to the Airflow installation directory and check the dags_folder parameter in the airflow.cfg file.
Go to the Airflow installation directory.
cd /root/airflow
Check the dags_folder parameter in the airflow.cfg file.
cat airflow.cfg
(Conditionally required) If the folder specified by the dags_folder parameter does not exist, run the mkdir command to create it.
Note: For example, if the dags_folder parameter specifies the /root/airflow/dags path and the /root/airflow path does not contain a folder named dags, create the dags folder in the /root/airflow path.
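The check and the folder creation can also be performed with two shell commands. This sketch assumes the /root/airflow installation path that is used in this example:
grep dags_folder /root/airflow/airflow.cfg
mkdir -p /root/airflow/dags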
Write a DAG file. In this example, a DAG file named mysql_dags.py is used.
from airflow import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
}

dag = DAG(
    'example_mysql',
    default_args=default_args,
    start_date=days_ago(2),
    tags=['example'],
)

# Lists the databases of the AnalyticDB for MySQL cluster.
mysql_test = MySqlOperator(
    task_id='mysql_test',
    mysql_conn_id='test',
    sql='SHOW DATABASES;',
    dag=dag,
)

# Queries the test table.
mysql_test_task = MySqlOperator(
    task_id='mysql_test_task',
    mysql_conn_id='test',
    sql='SELECT * FROM test;',
    dag=dag,
)

# Run mysql_test before mysql_test_task.
mysql_test >> mysql_test_task

if __name__ == "__main__":
    dag.cli()
Parameters:
mysql_conn_id: the name of the connection that is established in Step 2.
sql: the business-specific SQL statement.
For information about other parameters, see DAGs.
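Before you trigger the DAG in the web interface, you can run a single task from the command line to verify the DAG file and the connection. The following command is a sketch that assumes mysql_dags.py is stored in the dags_folder path; the trailing date is an arbitrary example execution date:
airflow tasks test example_mysql mysql_test 2022-01-01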
On the Airflow web interface, click the DAGs tab and click the DAG that you want to manage. In the upper-right corner of the page that appears, click the Trigger DAG icon.
After the execution is successful, you can click the green circle to the right of the DAG name to view the execution details of the DAG.
Important: The displayed execution time of the DAG is 8 hours earlier than the corresponding time in the UTC+8 time zone because the Airflow scheduler uses the UTC time zone.
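If you want the web interface to display times in UTC+8 instead, you can change the display time zone. This is a sketch that assumes Airflow 2.x, where the default_ui_timezone option in the [webserver] section of the airflow.cfg file controls the time zone that the web interface displays; the scheduler itself continues to run in UTC:
airflow config get-value webserver default_ui_timezone
# Example: set default_ui_timezone = Asia/Shanghai in airflow.cfg to display UTC+8 times.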